Lesen Avro Nachrichten von Kafka mit Spark 2.0.2 (strukturierte streaming)
Ich habe eine spark-2.0-Anwendung, die Nachrichten liest, von kafka mit spark streaming (mit spark-streaming-kafka-0-10_2.11).
Strukturierte streaming-sieht wirklich cool aus also ich wollte versuchen, und migrieren Sie den code, aber ich kann nicht herausfinden, wie es zu benutzen.
in der regulären streaming ich verwendet kafkaUtils zu createDstrean und in die Parameter, die ich übergeben, es wurde der Wert deserializer.
in der Strukturierten streaming der doc sagt, ich soll Deserialisieren mit DataFrame Funktionen, aber ich kann nicht herausfinden, was das bedeutet.
Schaute ich auf Beispiele wie diese Beispiel aber meine Avro-Objekt in Kafka verlassen Komplex und nicht einfach gegossen, wie der String in dem Beispiel..
Bisher habe ich versucht, diese Art von code (was ich hier gesehen habe in einer anderen Frage):
import spark.implicits._
val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","RED-test-tal4").load()
ds1.printSchema()
ds1.select("value").printSchema()
val ds2 = ds1.select($"value".cast(getDfSchemaFromAvroSchema(Obj.getClassSchema))).show()
val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()
und ich erhalte die Meldung "data type mismatch: cannot cast "BinaryType" - zu StructType(StructField(...."
wie kann ich das Deserialisieren der Wert?
- Hat jemanden zu finden, eine funktionierende Lösung? Keine der unten arbeiten für mich!
- Diese Bibliothek unterstützt die strukturierte Bäche mit Avro als Nutzlast und kann helfen: ABRiS (Avro Brücke für Funke). Es befindet sich noch in der Entwicklung, sondern unterstützt Ihre Verwendung. OFFENLEGUNG: ich arbeite für ABSA und ich bin der Hauptentwickler hinter dieser Bibliothek.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ich bin noch nicht so super vertraut, wie die Spark-Serialisierung funktioniert in Kombination mit der neuen/experimentellen Strukturierte Streaming, aber der Ansatz unten funktioniert-obwohl ich bin nicht sicher, ob es der beste Weg (IMHO der Ansatz hat eine etwas peinliche look 'n feel).
Ich werde versuchen, Ihre Frage zu beantworten am Beispiel eines benutzerdefinierten Datentyps (hier: ein
Foo
Fall Klasse) statt, die speziell Avro, aber ich hoffe, es hilft dir sowieso. Die Idee ist die Verwendung von Kryo-Serialisierung serialisieren/Deserialisieren Ihre benutzerdefinierte Typ finden Tuning: - Daten-Serialisierung in der Funke Dokumentation.Stellen Sie sich vor Sie haben folgende
Foo
Fall Klasse, wie Sie Ihre benutzerdefinierten Typ (TL;DR Hinweis: um zu verhindern, laufen in komischen Funken die Serialisierung von Beschwerden/Fehlern sollten Sie den code in einem separatenFoo.scala
- Datei):Nun haben Sie folgende Strukturierten Streaming-code zum Lesen von Daten von Kafka, wobei die Eingabe Thema enthält Kafka Nachrichten, deren Meldung Wert eine Binär codierte
String
, und Ihr Ziel ist es zu schaffenFoo
- Instanzen basierend auf diese Nachricht Werte (also ähnlich wie du würde Deserialisieren von binären Daten in Instanzen von einem Avro-Klasse):Nun sind wir Deserialisieren die Werte in Instanzen der benutzerdefinierten
Foo
geben, für die müssen wir zuerst definieren eine impliziteEncoder[Foo]
:Gehen Sie zurück zu Ihrem Avro Frage, was Sie würden tun müssen, ist:
Encoder
für Ihre Bedürfnisse.Foo(new String(row.getAs[Array[Byte]]("value"))
mit dem code zum Deserialisieren Ihre Binär codierte Avro Daten in Avro POJOs, d.h. dem code, der Ihre Binär codierte Avro Daten aus der Nachricht mit dem Wert (row.getAs[Array[Byte]]("value")
) und zurück, sagen wir, eine AvroGenericRecord
oder was auch immerSpecificCustomAvroObject
Sie haben an anderer Stelle definierten.Wenn jemand anderes weiß, der eine präziser/besser/... Antwort Tal der Frage, bin ich alle Ohren. 🙂
Siehe auch:
Wie oben erwähnt, als der Funke 2.1.0 gibt es Unterstützung für die avro mit der batch-reader aber nicht mit SparkSession.readStream(). Hier ist, wie ich es geschafft habe in Scala basiert auf die anderen Antworten. Ich habe vereinfacht das schema für die Kürze.
Also tatsächlich jemand in meiner Firma das Problem gelöst für mich, Also poste ich es hier für zukünftige Leser.
im Grunde, was ich habe auf der Spitze von dem, was miguno vorgeschlagen, ist die decode-Teil:
jetzt können Sie Lesen Nachrichten aus kafka und decodieren Sie Sie wie so:
*
KafkaMessage
ist einfach nur Klasse mit dem generischen Objekt, das Sie erhalten, wenn das Lesen von Kafka(key,value,topic,partition,offset,timestamp)
AvroTo<YourObject>Decoder
ist Klasse, dass werde Dekodieren, Ihr Objekt in einem gegebenen schema Registrierungs-url.Beispielsweise mit Konfluent ist
KafkaAvroDeserializer
und schema-Registrierung.Aus diesen, nennen
.deserialize(topicName, bytes).asInstanceOf[GenericRecord]
um eine avro-Objekt.Hoffe, dies hilft jemand
Verwenden Sie die folgenden Schritte aus:
Kafka Nachricht:
Kafka Verbraucher:
Update
Utils:
TypeTag
? Was, wenn Sie nicht wollen, hart-code-reader-schema string und was zu verwenden die Schema-ID in der Nachricht?schemaStr
... Diese sollte nicht notwendig sein, als dieKafkaAvroDeserializer
Klasse ist in der Lage zu erkennen das Magic Byte + Schema-ID, die Konfluent die Avro-Kodierung verwendet, dann wird ein lookup vor der Registrierung für dieses schema string. In anderen Worten, ich denke, es sollte möglich sein, nicht "hard-code" ein Leser-schema string, und lassen Sie es dynamisch extrahiert aus der registry