Wie parse Json formatiert Kafka Nachricht in spark streaming
Habe ich JSON-Nachrichten auf Kafka wie diese:
{"id_post":"p1", "message":"blablabla"}
und ich will Parsen der Nachricht und drucken (oder verwenden Sie für die weitere Berechnung) die message
element.
Mit folgendem code Drucke ich die json -
val kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, inputGroup, topicMap)
val postStream = kafkaStream.map(_._2)
postStream.foreachRDD((rdd, time) => {
val count = rdd.count()
if (count > 0){
rdd.foreach(record => {
println(record)
}
}
aber ich kann nicht verwalten, um das einzelne element.
Ich habe versucht ein paar JSON-parser, aber kein Glück.
Irgendeine Idee?
update:
ein paar Fehler mit verschiedenen JSON-parser
dies ist der code-und-Ausgang mit circe parser:
val parsed_record = parse(record)
und die Ausgabe:
14:45:00,676 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.Predef$.ArrowAssoc(Ljava/lang/Object;)Ljava/lang/Object;
at io.circe.jawn.CirceSupportParser$$anon$1$$anon$4.add(CirceSupportParser.scala:36)
at jawn.CharBasedParser$class.parseString(CharBasedParser.scala:90)
at jawn.StringParser.parseString(StringParser.scala:15)
at jawn.Parser.rparse(Parser.scala:397)
at jawn.Parser.parse(Parser.scala:338)
at jawn.SyncParser.parse(SyncParser.scala:24)
at jawn.SupportParser$$anonfun$parseFromString$1.apply(SupportParser.scala:15)
usw.. an die Zeile in die ich parse(record)
wie es aussieht, keinen Zugriff auf und/oder analysieren der Zeichenfolge record
.
Gleiche, wenn ich lift-json
am parse(record)
den error-Ausgang ist mehr oder weniger das gleiche:
16:58:20,425 ERROR Executor:95 - Exception in task 0.0 in stage 4.0 (TID 4)
java.lang.NoSuchMethodError: scala.runtime.ObjectRef.create(Ljava/lang/Object;)Lscala/runtime/ObjectRef;
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:144)
at net.liftweb.json.JsonParser$$anonfun$2.apply(JsonParser.scala:141)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:80)
at net.liftweb.json.JsonParser$.parse(JsonParser.scala:45)
at net.liftweb.json.package$.parse(package.scala:40)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:98)
at SparkConsumer$$anonfun$main$1$$anonfun$apply$1.apply(SparkConsumer.scala:95)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
in der scala.Sammlung.AbstractIterator.foreach(Iterator.scala:1157)
Was ist, wenn die JSON-hat mehrere Zeilenvorschub-Zeichen
\n
?
InformationsquelleAutor salvob | 2016-04-20
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ich das Problem gelöst, also Schreibe ich hier für die Zukunft Verweise:
Abhängigkeiten, der Abhängigkeiten, der Abhängigkeiten!
Ich wählen, um lift-json, aber das gilt für jede JSON-parser und/oder Rahmen.
Die SPARK-version, die ich verwende (v1.4.1) ist der kompatibel mit scala 2.10, hier die Abhängigkeiten von pom.xml:
und einigen anderen Bibliotheken. Ich war mit dem lift-json-version für scala 2.11 ... und das ist FALSCH.
So, für die Zukunft mir, und wenn du dieses Thema Lesen: im Einklang mit der scala-version und unter Abhängigkeiten.
Im lift-json Fall:
InformationsquelleAutor salvob
gleiche problem mit Ihnen.
Aber ich löste dieses problem, indem die
fastjson
.SBT dependency :
//http://mvnrepository.com/artifact/com.alibaba/fastjson
libraryDependencies += "com.alibaba" % "fastjson" % "1.2.12"
oder
Maven dependency :
<!-- http://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
Können Sie einen Versuch haben. Hoffe, dass dies hilfreich sein würde.
InformationsquelleAutor Xiang Chen
Extrahieren der Daten aus dem JSON-String in Scala/Apache Spark
Unten ist die Maven Anstand
InformationsquelleAutor Suresh