Lesen Avro Nachrichten von Kafka mit Spark 2.0.2 (strukturierte streaming)

Ich habe eine spark-2.0-Anwendung, die Nachrichten liest, von kafka mit spark streaming (mit spark-streaming-kafka-0-10_2.11).

Strukturierte streaming-sieht wirklich cool aus also ich wollte versuchen, und migrieren Sie den code, aber ich kann nicht herausfinden, wie es zu benutzen.

in der regulären streaming ich verwendet kafkaUtils zu createDstrean und in die Parameter, die ich übergeben, es wurde der Wert deserializer.

in der Strukturierten streaming der doc sagt, ich soll Deserialisieren mit DataFrame Funktionen, aber ich kann nicht herausfinden, was das bedeutet.

Schaute ich auf Beispiele wie diese Beispiel aber meine Avro-Objekt in Kafka verlassen Komplex und nicht einfach gegossen, wie der String in dem Beispiel..

Bisher habe ich versucht, diese Art von code (was ich hier gesehen habe in einer anderen Frage):

import spark.implicits._

  val ds1 = spark.readStream.format("kafka").
    option("kafka.bootstrap.servers","localhost:9092").
    option("subscribe","RED-test-tal4").load()

  ds1.printSchema()
  ds1.select("value").printSchema()
  val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()  
  val query = ds2.writeStream
    .outputMode("append")
    .format("console")
    .start()

und ich erhalte die Meldung "data type mismatch: cannot cast "BinaryType" - zu StructType(StructField(...."

wie kann ich das Deserialisieren der Wert?

  • Hat jemanden zu finden, eine funktionierende Lösung? Keine der unten arbeiten für mich!
  • Diese Bibliothek unterstützt die strukturierte Bäche mit Avro als Nutzlast und kann helfen: ABRiS (Avro Brücke für Funke). Es befindet sich noch in der Entwicklung, sondern unterstützt Ihre Verwendung. OFFENLEGUNG: ich arbeite für ABSA und ich bin der Hauptentwickler hinter dieser Bibliothek.
InformationsquelleAutor Tal Joffe | 2016-11-20
Schreibe einen Kommentar