Fehler beim Lesen der Bereich "Themen": java.nio.BufferUnderflowException in Kafka
9.0-client zu verbrauchen, Nachrichten von zwei Maklern, die auf einem remote-system.Mein Produzent ist in Ordnung, und ist in der Lage, zum senden von Nachrichten an den broker, aber meine Verbraucher ist nicht in der Lage zu konsumieren, diese Nachrichten.Von Konsumenten und Produzenten laufen auf meinem lokalen system und dem zwei-Broker sind auf der aws-Plattform.
Immer wenn ich versuche zu laufen Verbraucher. Folgende Fehlermeldung erscheint auf dem broker-Protokolle.
ERROR Closing socket for /122.172.17.81 because of error (kafka.network.Processor)
org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'topics': java.nio.BufferUnderflowException
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:66)
at org.apache.kafka.common.requests.JoinGroupRequest.parse(JoinGroupRequest.java:85)
at kafka.api.JoinGroupRequestAndHeader$.readFrom(JoinGroupRequestAndHeader.scala:29)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.api.RequestKeys$$anonfun$12.apply(RequestKeys.scala:50)
at kafka.network.RequestChannel$Request.<init>(RequestChannel.scala:50)
at kafka.network.Processor.read(SocketServer.scala:450)
at kafka.network.Processor.run(SocketServer.scala:340)
at java.lang.Thread.run(Thread.java:745)
Meine Consumer-code ist wie folgt>
package Kafka1.K1;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
public class HelloKafkaConsumer
{
public static void main(String args[]) throws InterruptedException, ExecutionException {
String a[] = new String[]{"loader1"};
//topik.add("loader1");
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"IP1:9092,IP2:9093");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put("heartbeat.interval.ms", "500");
props.put("session.timeout.ms", "1000");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "10000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(a));
while (true) {
//Poll for ConsumerRecords for a certain amount of time
ConsumerRecords<String, String> records = consumer.poll(1000);
//Process the ConsumerRecords, if any, that came back
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
System.out.println(key+":"+value);
//Do something with message
}
}
}
}
Kann jemand helfen?
- Welche version des Kafka-Broker verwenden Sie?
- 2.11.0.10.0 Kafka
- Meine Vermutung ist, dass einer der Ihren broker version ist 0.8.x. Check KAFKA-2496
Du musst angemeldet sein, um einen Kommentar abzugeben.
Dieses Problem tritt auf, wenn die kafka-cluster, die auf Ihrem Computer ist eine ältere version ich.e 0.8.x.x, wo als client verwendet wird, um Zugriff auf Daten aus dem cluster eine höhere version habe ich.e 0.9.x.x.
Gibt es zwei einfache Lösungen, die auf Anforderungen basieren: