Fehler beim Lesen der Bereich "Themen": java.nio.BufferUnderflowException in Kafka

9.0-client zu verbrauchen, Nachrichten von zwei Maklern, die auf einem remote-system.Mein Produzent ist in Ordnung, und ist in der Lage, zum senden von Nachrichten an den broker, aber meine Verbraucher ist nicht in der Lage zu konsumieren, diese Nachrichten.Von Konsumenten und Produzenten laufen auf meinem lokalen system und dem zwei-Broker sind auf der aws-Plattform.
Immer wenn ich versuche zu laufen Verbraucher. Folgende Fehlermeldung erscheint auf dem broker-Protokolle.

ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
        at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
        at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
        at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
        at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
        at kafka.network.Processor.read(SocketServer.scala:450)
        at kafka.network.Processor.run(SocketServer.scala:340)
        at java.lang.Thread.run(Thread.java:745)

Meine Consumer-code ist wie folgt>

package Kafka1.K1;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

public class HelloKafkaConsumer 
{
    public static void main(String args[]) throws InterruptedException, ExecutionException {
        String a[] = new String[]{"loader1"};
        //topik.add("loader1");
Properties props = new Properties();
 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP1:9092,IP2:9093");
 props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
 props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
 props.put("heartbeat.interval.ms", "500");
 props.put("session.timeout.ms", "1000");
 props.put("enable.auto.commit", "true");
 props.put("auto.commit.interval.ms", "10000");
 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
 consumer.subscribe(Arrays.asList(a));
 while (true) {
        //Poll for ConsumerRecords for a certain amount of time
        ConsumerRecords<String, String> records = consumer.poll(1000);

        //Process the ConsumerRecords, if any, that came back
        for (ConsumerRecord<String, String> record : records) {
                String key = record.key();
                String value = record.value();
                System.out.println(key+":"+value);
                //Do something with message
        }
      }

    }
}

Kann jemand helfen?

  • Welche version des Kafka-Broker verwenden Sie?
  • 2.11.0.10.0 Kafka
  • Meine Vermutung ist, dass einer der Ihren broker version ist 0.8.x. Check KAFKA-2496
InformationsquelleAutor user2966021 | 2016-06-01
Schreibe einen Kommentar