Wie parse Json formatiert Kafka Nachricht in spark streaming

Habe ich JSON-Nachrichten auf Kafka wie diese:

{"id_post":"p1", "message":"blablabla"}

und ich will Parsen der Nachricht und drucken (oder verwenden Sie für die weitere Berechnung) die message element.
Mit folgendem code Drucke ich die json -

val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
    rdd.foreach(record => {
      println(record)
    }
}

aber ich kann nicht verwalten, um das einzelne element.
Ich habe versucht ein paar JSON-parser, aber kein Glück.
Irgendeine Idee?

update:
ein paar Fehler mit verschiedenen JSON-parser
dies ist der code-und-Ausgang mit circe parser:

val parsed_record = parse(record)

und die Ausgabe:

14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
        at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
        at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
        at jawn.StringParser.parseString(StringParser.scala:15)
        at jawn.Parser.rparse(Parser.scala:397)
        at jawn.Parser.parse(Parser.scala:338)
        at jawn.SyncParser.parse(SyncParser.scala:24)
        at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)

usw.. an die Zeile in die ich parse(record)
wie es aussieht, keinen Zugriff auf und/oder analysieren der Zeichenfolge record.

Gleiche, wenn ich lift-json
am parse(record) den error-Ausgang ist mehr oder weniger das gleiche:

16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)

in der scala.Sammlung.AbstractIterator.foreach(Iterator.scala:1157)

Welches problem hast du Erfahrungen mit dem JSON-parser?
Was ist, wenn die JSON-hat mehrere Zeilenvorschub-Zeichen \n?

InformationsquelleAutor salvob | 2016-04-20

Schreibe einen Kommentar