Kafka-partition-Taste funktioniert nicht richtig
Bin ich mit zu kämpfen, wie die Verwendung der partition-key-Mechanismus richtig. Meine Logik ist, stellen Sie die partition Nummer 3, dann erstellen Sie drei-partition als Schlüssel "0", "1", "2", verwenden Sie dann die partition Schlüssel zum erstellen von drei KeyedMessage wie
- KeyedMessage(Thema, "0", Nachricht)
- KeyedMessage(Thema, "1", Nachricht)
- KeyedMessage(Thema, "2", Nachricht)
Danach erstellen eines Produzenten-Instanz zu senden, alle KeyedMessage.
Ich erwartete KeyedMessage sollten Sie die EINGABETASTE, um verschiedene Partitionen entsprechend auf die andere partition Tasten, was bedeutet, dass
- KeyedMessage(Thema, "0", Nachricht) gehen Sie auf Partition 0
- KeyedMessage(Thema, "1", Nachricht) gehen Sie auf Partition 1
- KeyedMessage(Thema, "2", Nachricht) gehen Sie auf Partition 2
Ich bin mit Kafka-web-Konsole zu beobachten, das Thema status, aber das Ergebnis ist nicht wie das, was ich erwarte. KeyedMessage noch gehen, um die Partitionen nach dem Zufallsprinzip, einige Male zwei KeyedMessage wird, geben Sie die gleiche partition auch Sie anderen partition keys.
Meine Frage mehr klar, ich möchte nach einigen Scala-codes derzeit habe ich, und ich bin mit Kafka 0.8.2-beta und Scala 2.10.4.
Hier ist die Hersteller-codes, ich nicht verwenden, die benutzerdefinierte partitioner.class :
val props = new Properties()
val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec
props.put("compression.codec", codec.toString)
props.put("producer.type", if(synchronously) "sync" else "async")
props.put("metadata.broker.list", brokerList)
props.put("batch.num.messages", batchSize.toString)
props.put("message.send.max.retries", messageSendMaxRetries.toString)
props.put("request.required.acks",requestRequiredAcks.toString)
props.put("client.id",clientId.toString)
val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))
def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
if (partition == null) {
new KeyedMessage(topic,message)
} else {
new KeyedMessage(topic,partition,message)
}
}
def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))
def send(message: Array[Byte], partition: Array[Byte]): Unit = {
try {
producer.send(kafkaMesssage(message, partition))
} catch {
case e: Exception =>
e.printStackTrace
System.exit(1)
}
}
Und hier ist, wie ich die Produzenten, erstellen eines Produzenten-Instanz und verwenden Sie anschließend diese Instanz zu senden, drei-Nachricht. Derzeit erstelle ich die partition key als Integer, dann konvertieren Sie es in Byte-Arrays:
val testMessage = UUID.randomUUID().toString
val testTopic = "sample1"
val groupId_1 = "testGroup"
print("starting sample broker testing")
val producer = new KafkaProducer(testTopic, "localhost:9092")
val numList = List(0,1,2);
for (a <- numList) {
//Create a partition key as Byte Array
var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
//Here I give a Array[Byte] key
//so the second "send" function of producer will be called
producer.send(testMessage.getBytes("UTF8"), key)
}
Nicht sicher, ob meine Logik falsch ist oder ich nicht verstehen, die partition-key-Mechanismus korrekt. Könnte jemand bietet einige Beispiel-code oder Erklärung wäre Super!!!
- Ich nehme an, Sie sind mit
stealthly/scala-kafka
Bibliothek? Es sieht aus wie ein bug, können Sie eröffnen Sie bitte ein issue auf github für, die? Ich werde versuchen, um dieses Problem bis Ende der Woche oder so. - Ja, die codes, die war kommen aus diesem repository. Ich denke, der Hersteller sollte definiert werden, ein link zu dieser "neuen Producer[String, String](someConfig)", dann wird der interne Standard-Partitionierer sollte funktionieren. Öffnen werde ich das Thema auf github von heute. Dank
Du musst angemeldet sein, um einen Kommentar abzugeben.
Hatte das gleiche Problem - wechseln Sie einfach auf die ByteArrayParitioner:
ByteArrayPartitioner
Klasse in 0.9.x-version.Menschen nehmen oft eine Partitionierung ist eine Möglichkeit zum trennen von geschäftlichen Daten auf business-Kategorien, aber dies ist nicht der richtige Blickwinkel der partition.
Partitionierung direkt beeinflussen diese Themen:
-Leistung (jede partition kann verzehrt werden parallel auf anderen Partitionen)
-Nachrichten, die Reihenfolge der Nachrichten garantiert nur auf partition-Ebene)
Werde ich ein Beispiel geben, wie wir Partitionen erstellen:
Haben Sie ein Thema, sagen MyMessagesToWorld
Sie übertragen möchten dieses Thema (alle MyMessagesToWorld) für einige Verbraucher.
Ihnen "Gewicht" die ganze "Masse" der MyMessagesToWorld und fand, dass dies 10 kg.
Haben Sie folgende "business" - Kategorien in "MyMessagesToWorld ":
-Nachrichten zu Papa (D)
-Nachrichten, Mutter (M)
-Nachrichten an die sis (S)
-Meldungen, die Oma (G)
-Meldungen, die an Lehrer (T)
-Meldungen zu girl friend (F)
Denken Sie, wer ist Verbraucher und festgestellt, dass Ihre Verbraucher sind Zwerge, die in der Lage verbrauchen wie 1 Kg Nachrichten, die in je einer Stunde.
Können Sie beschäftigen bis zu 2 solcher Zwerge.
1 gnome braucht 10 Stunden zu verbrauchen 10 kg Nachrichten, 2 Zwerge brauchen 5 Stunden.
So dass Sie entscheiden, nutzen alle verfügbaren Zwerge um Zeit zu sparen.
Erstellen 2 "Kanäle" für diese 2 Zwerge, die Sie erstellen, auf 2 Partitionen zu diesem Thema auf Kafka. Wenn Sie invision mehr GNOME, mehr Partitionen.
Haben Sie 6 business-Kategorien innen-und 2 aufeinander unabhängige Verbraucher - GNOME (consumer-threads).
Was ist zu tun?
Kafkas Ansatz ist folgende:
Angenommen, Sie haben 2 kafka-Instanzen in cluster.
(Das gleiche Beispiel, OK , wenn Sie mehr instaces im cluster)
Legen Sie die partition Nummer 2 auf Kafka, B.(verwenden Kafka 0.8.2.1 als Beispiel):
Definieren Sie Ihr Thema in Kafka, erzählt, dass Sie 2 Partitionen für das Thema:
Nun das Thema MyMessagesToWorld hat 2 Partitionen: P(0) und P(1).
Sie wählte die Nummer 2 (Partitionen), weil Sie wissen, Sie haben(invision) nur 2 verbraucht GNOME.
Können Sie weitere Partitionen später, wenn mehr Verbraucher GNOME eingesetzt werden.
Nicht zu verwechseln Kafka Verbraucher mit solchen gnome.
Kafka Verbraucher einsetzen können, N Zwerge. (N parallele threads)
Nun erstellen Sie die Schlüssel für Ihre Nachrichten.
Müssen Sie die SCHLÜSSEL zu verteilen Sie Ihre Nachrichten zwischen den Partitionen.
Schlüssel werden diese Briefe von "business-Kategorien", die Sie vorher definiert:
D,M,S,G,T,F, denken Sie, solche Briefe sind OK-ID.
Aber im Allgemeinen Fall, was auch immer, kann als Schlüssel verwendet:
(komplexe Objekte und byte-arrays, etwas...)
Wenn Sie KEIN Partitionierer, das standardmäßig verwendet wird.
Den Standard-Partitionierer ist ein bisschen blöd.
Dauert es hashcode der einzelnen Tasten und teilt es durch die Anzahl der verfügbaren Partitionen , die "Erinnerung" wird defind die Nummer der partition für die Schlüssel.
Beispiel:
Als Sie sich vorstellen können, mit solchen partitionierers im besten Fall haben Sie 3 business-Kategorien der Landung in jede partition.
Im schlimmeren Fall können Sie alle business-Kategorien der Landung in 1 partition.
Wenn Sie hatte 100000 business-Kategorien, wird es statistisch OK, um Sie zu verbreiten, die von solchen Algorithmus.
Aber nur wenige Kategorien haben, können Sie nicht sehr gerechte Verteilung.
So schreiben Sie Partitionierer und verteilen Sie Ihre business-Kategorien ein bisschen klüger.
Gibt es ein Beispiel:
Dieser Partitionierer vertreibt business-Kategorien gleichmäßig auf die verfügbaren Partitionen.
}
Den Standard-Partitionierer schaut auf den Schlüssel (als byte array) und verwendet (% numPartitions) zu konvertieren, den Wert in einen integer-Wert zwischen 0 und der Anzahl der Partitionen-1 inklusive. Der resultierende integer ist, was bestimmt die partition, auf die die Nachricht geschrieben wird, nicht der Wert des Schlüssels, als du tust.