Kafka: Überwachen Sie die Verzögerung für die Verbraucher, Partitionen zugewiesen Thema

Arbeite ich mit Kafka 0.9.1 neue Verbraucher-API. Der Verbraucher wird manuell eine partition zugewiesen. Für diese Verbraucher ich würde gerne sehen, seine Fortschritte (im Sinne der lag). Da ich Hinzugefügt die Gruppen-id, Verbraucher-tutorial als Eigenschaft, die ich davon ausgegangen, dass ich mit dem Befehl

bin/kafka-consumer-groups.sh --new-consumer --describe --group consumer-tutorial --bootstrap-server localhost:9092

(wie hier erläutert http://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0.9-consumer-client)

Leider, meinen die Verbraucher-Gruppe-details wird nicht angezeigt mit dem oben genannten Befehl. Daher kann ich den Fortschritt meiner Verbraucher (es lag). Wie kann ich überwachen, die lag in der oben beschriebenen Szenario (manuell zugewiesene partition)?

Der code ist:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "consumer-tutorial");
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);


        String topic = "my-topic";
        TopicPartition topicPartition = new TopicPartition(topic, 0);
        consumer.assign(Arrays.asList(topicPartition));
        consumer.seekToBeginning(topicPartition);
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    for (ConsumerRecord<String, String> record : records)
     System.out.println(record.offset() + ": " + record.value());
  consumer.commitSynch();
  }
} finally {
  consumer.close();
}
InformationsquelleAutor florins | 2016-06-09
Schreibe einen Kommentar