Kafka-partition-Taste funktioniert nicht richtig

Bin ich mit zu kämpfen, wie die Verwendung der partition-key-Mechanismus richtig. Meine Logik ist, stellen Sie die partition Nummer 3, dann erstellen Sie drei-partition als Schlüssel "0", "1", "2", verwenden Sie dann die partition Schlüssel zum erstellen von drei KeyedMessage wie

  • KeyedMessage(Thema, "0", Nachricht)
  • KeyedMessage(Thema, "1", Nachricht)
  • KeyedMessage(Thema, "2", Nachricht)

Danach erstellen eines Produzenten-Instanz zu senden, alle KeyedMessage.

Ich erwartete KeyedMessage sollten Sie die EINGABETASTE, um verschiedene Partitionen entsprechend auf die andere partition Tasten, was bedeutet, dass

  • KeyedMessage(Thema, "0", Nachricht) gehen Sie auf Partition 0
  • KeyedMessage(Thema, "1", Nachricht) gehen Sie auf Partition 1
  • KeyedMessage(Thema, "2", Nachricht) gehen Sie auf Partition 2

Ich bin mit Kafka-web-Konsole zu beobachten, das Thema status, aber das Ergebnis ist nicht wie das, was ich erwarte. KeyedMessage noch gehen, um die Partitionen nach dem Zufallsprinzip, einige Male zwei KeyedMessage wird, geben Sie die gleiche partition auch Sie anderen partition keys.

Meine Frage mehr klar, ich möchte nach einigen Scala-codes derzeit habe ich, und ich bin mit Kafka 0.8.2-beta und Scala 2.10.4.

Hier ist die Hersteller-codes, ich nicht verwenden, die benutzerdefinierte partitioner.class :

  val props = new Properties()

  val codec = if(compress) DefaultCompressionCodec.codec else NoCompressionCodec.codec

  props.put("compression.codec", codec.toString)
  props.put("producer.type", if(synchronously) "sync" else "async")
  props.put("metadata.broker.list", brokerList)
  props.put("batch.num.messages", batchSize.toString)
  props.put("message.send.max.retries", messageSendMaxRetries.toString)
  props.put("request.required.acks",requestRequiredAcks.toString)
  props.put("client.id",clientId.toString)

  val producer = new Producer[AnyRef, AnyRef](new ProducerConfig(props))

  def kafkaMesssage(message: Array[Byte], partition: Array[Byte]): KeyedMessage[AnyRef, AnyRef] = {
     if (partition == null) {
       new KeyedMessage(topic,message)
     } else {
       new KeyedMessage(topic,partition,message)
     }
  }

  def send(message: String, partition: String = null): Unit = send(message.getBytes("UTF8"), if (partition == null) null else partition.getBytes("UTF8"))

  def send(message: Array[Byte], partition: Array[Byte]): Unit = {
    try {
      producer.send(kafkaMesssage(message, partition))
    } catch {
      case e: Exception =>
        e.printStackTrace
        System.exit(1)
    }       
  }

Und hier ist, wie ich die Produzenten, erstellen eines Produzenten-Instanz und verwenden Sie anschließend diese Instanz zu senden, drei-Nachricht. Derzeit erstelle ich die partition key als Integer, dann konvertieren Sie es in Byte-Arrays:

  val testMessage = UUID.randomUUID().toString
  val testTopic = "sample1"
  val groupId_1 = "testGroup"

  print("starting sample broker testing")
  val producer = new KafkaProducer(testTopic, "localhost:9092")

  val numList = List(0,1,2);
  for (a <- numList) {
    //Create a partition key as Byte Array
    var key = java.nio.ByteBuffer.allocate(4).putInt(a).array()
    //Here I give a Array[Byte] key
    //so the second "send" function of producer will be called
    producer.send(testMessage.getBytes("UTF8"), key)
  }

Nicht sicher, ob meine Logik falsch ist oder ich nicht verstehen, die partition-key-Mechanismus korrekt. Könnte jemand bietet einige Beispiel-code oder Erklärung wäre Super!!!

  • Ich nehme an, Sie sind mit stealthly/scala-kafka Bibliothek? Es sieht aus wie ein bug, können Sie eröffnen Sie bitte ein issue auf github für, die? Ich werde versuchen, um dieses Problem bis Ende der Woche oder so.
  • Ja, die codes, die war kommen aus diesem repository. Ich denke, der Hersteller sollte definiert werden, ein link zu dieser "neuen Producer[String, String](someConfig)", dann wird der interne Standard-Partitionierer sollte funktionieren. Öffnen werde ich das Thema auf github von heute. Dank
Schreibe einen Kommentar