KafkaAvroSerializer zum serialisieren Avro ohne schema.Registrierung.url
Ich bin ein noob auf Kafka und Avro. So habe ich versucht, um das Producer/Consumer läuft. So weit ich in der Lage gewesen, zu produzieren und zu konsumieren, einfach Bytes und Strings, die mit den folgenden :
Konfiguration für den Produzenten :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1000; i++) {
GenericData.Record avroRecord = new GenericData.Record(schema);
avroRecord.put("str1", "Str 1-" + i);
avroRecord.put("str2", "Str 2-" + i);
avroRecord.put("int1", i);
byte[] bytes = recordInjection.apply(avroRecord);
ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
producer.send(record);
Thread.sleep(250);
}
producer.close();
}
Nun das ist ja alles schön und gut, das problem kommt, wenn ich versuche zu serialisieren ein POJO.
Also , ich war in der Lage, um die AvroSchema aus dem POJO mit dem Dienstprogramm mit Avro.
Hardcoded das schema, und dann versucht, erstellen Sie einen Generischen Datensatz zu senden, durch die KafkaProducer
der Produzent ist jetzt :
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); //this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
dies ist, wo das problem ist : der moment benutze ich KafkaAvroSerializer, die Hersteller kommen nicht durch :
missing mandatory parameter : schema.Registrierung.url
Ich Lesen, warum dieses erforderlich ist, so dass der Verbraucher in der Lage ist, zu entziffern, was die Hersteller senden zu mir.
Aber ist das nicht das schema bereits eingebettet in die AvroMessage?
Wäre wirklich toll, wenn jemand kann teilen ein funktionsfähiges Beispiel KafkaProducer mit der KafkaAvroSerializer ohne Angabe von schema.Registrierung.url
wäre auch wirklich zu schätzen wissen, alle Erkenntnisse/Ressourcen auf den nutzen der schema registry.
Dank!
- haben Sie versucht, Frühling-kafka avro deserializer ? Hier ein Lernprogramm, wie gut.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Hinweis zuerst:
KafkaAvroSerializer
ist nicht in vanilla apache kafka - das wird durch die Konfluent Plattform. (https://www.confluent.io/), als Teil Ihrer open-source-Komponenten (http://docs.confluent.io/current/platform.html#confluent-schema-registry)Schnelle Antwort: Nein, wenn Sie
KafkaAvroSerializer
Sie benötigen eine schema registry. Siehe einige Beispiele hier:http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html
Die grundlegende Idee, die mit der schema-Registrierung ist, dass jedes Thema wird auf eine avro-schema (dh, Sie werden nur in der Lage, Daten zu senden, im Einklang mit einander. Aber ein schema kann mehrere version, so dass Sie noch brauchen, um zu erkennen, das schema für jeden Datensatz)
Wollen wir nicht zu schreiben, das schema für everydata wie Sie implizieren - Häufig-schema ist größer als Ihre Daten! Das wäre eine Verschwendung von Zeit Analyse es jedes mal beim Lesen, und eine Verschwendung von Ressourcen (Netzwerk, Festplatte, cpu)
Statt, eine schema registry-Instanz wird eine verbindliche
avro schema <-> int schemaId
und den serializer schreibt dann nur diese id vor den Daten, nachdem ich es aus der registry (und Zwischenspeichern für eine spätere Verwendung).So innerhalb von kafka, wird Ihr Datensatz wird
[<id> <bytesavro>]
(und magic byte aus technischen Gründen), das ist ein overhead von nur 5 Byte (zu vergleichen, um die Größe des schema -)Und beim Lesen Ihrer Verbraucher finden das entsprechende schema um die id, und deserializer avro bytes in Bezug auf es. Sie finden viel mehr in konfluent doc
Wenn du wirklich eine Anwendung haben, wo Sie möchten, schreiben Sie das schema für jeden Datensatz, benötigen Sie eine andere serializer (ich denke, das schreiben Ihrer eigenen, aber es wird leicht sein, nur die Wiederverwendung von https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java und entfernen Sie die schema registry-Teil, ersetzen Sie es mit dem schema, das gleiche gilt für das Lesen). Aber wenn Sie avro, würde ich wirklich raten von dieser - einen Tag später, werden Sie brauchen, um etwas umzusetzen, wie avro registry zu verwalten, Versionierung
Während die Antwort überprüft, ist alles korrekt, es sollte auch erwähnt werden, dass - schema-Registrierung kann deaktiviert werden.
Einfach
auto.register.schemas
zufalse
.Können Sie jederzeit Ihre Wert-Klassen zu implementieren
Serialiser<T>
,Deserialiser<T>
(undSerde<T>
für Kafka-Streams) manuell. Java-Klassen sind in der Regel generiert, die von Avro-Dateien so Bearbeiten, dass Sie direkt ist nicht eine gute Idee, aber die Verpackung ist vielleicht die ausführliche, aber möglich Weg.Ein anderer Weg ist die Melodie Arvo generator-Vorlagen verwendet, für die Java-Klassen generation und generieren die Umsetzung aller diese Schnittstellen automatisch. Beide Avro maven-und gradle-plugins unterstützt benutzerdefinierte Vorlagen, so ist es sollte leicht zu konfigurieren.
Habe ich https://github.com/artemyarulin/avro-kafka-deserializable das hat sich geändert-template-Dateien und einfache CLI-tool, das Sie verwenden können, für die Datei-Generierung