Dead letter-Warteschlange (DLQ) für Kafka mit Feder-kafka
Was ist der beste Weg zur Umsetzung Dead letter-Warteschlange (DLQ) Konzept in Spring-Boot-2.0-Anwendung mit spring-kafka 2.1.x alle Nachrichten konnte nicht verarbeitet werden @KafkaListener Methode von einigen bean geschickt, um einige vordefinierte Kafka DLQ Thema nicht zu verlieren und die einzelne Nachricht?
So verbraucht Kafka Datensatzes entweder:
- erfolgreich verarbeitet,
- konnte nicht verarbeitet werden und wird an der DLQ Thema,
- konnte nicht verarbeitet werden, werden nicht an den DLQ Thema (aufgrund der unerwarteten problem), so werden verbraucht durch den Hörer wieder auf.
Habe ich versucht zu erstellen, listener-container mit der individuellen Umsetzung der ErrorHandler senden von Datensätzen konnte nicht verarbeitet werden DLQ-Thema mit KafkaTemplate. Verwendung von " disable auto-commit und DATENSATZ AckMode.
spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ErrorHandler {
@Autowired
private KafkaTemplate<Object, Object> kafkaTemplate;
@Value("${dlqTopic}")
private String dlqTopic;
@Override
public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
log.error("Error, sending to DLQ...");
kafkaTemplate.send(dlqTopic, record.key(), record.value());
}
}
Scheint es, dass diese Umsetzung nicht garantieren Element #3. Wenn eine Ausnahme ausgeworfen wird, in DlqErrorHandler Datensatz wird nicht konsumiert werden, indem Sie den Hörer wieder auf.
Wird die Nutzung des Transaktions-listener-container helfen?
factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);
Gibt es eine bequeme Möglichkeit zum implementieren von DLQ-Konzept mit Spring Kafka?
UPDATE 28/03/2018
Dank Gary Russell ' s Antwort, die ich war in der Lage zu erreichen, das gewünschte Verhalten durch die Implementierung DlqErrorHandler wie folgt
@Configuration
public class KafkaConfig {
@Bean
ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
...
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
return factory;
}
}
@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
...
@Override
public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
Consumerrecord<?, ? record = records.get(0);
try {
kafkaTemplate.send("dlqTopic", record.key, record.value());
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
//Other records may be from other partitions, so seek to current offset for other partitions too
//...
} catch (Exception e) {
consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
//Other records may be from other partitions, so seek to current offset for other partitions too
//...
throw new KafkaException("Seek to current after exception", thrownException);
}
}
}
Diese Weise, wenn der Verbraucher Umfrage gibt 3 Datensätze (1, 2, 3) und 2. man kann nicht verarbeitet werden:
- 1 verarbeitet werden
- 2 kann nicht verarbeitet werden und an die DLQ
- 3 durch Verbraucher suchen zu erfassen.offset() + 1, es wird geliefert werden, um die Zuhörer
Wenn das senden an DLQ fehl Verbraucher sucht, um den Rekord.offset (Versatz) und der Datensatz wird erneut übermittelt, um die Zuhörer (und senden an DLQ wahrscheinlich wird zurückgezogen).
- Seit version 2.2 des spring-kafka es ist eine Klasse, die DeadLetterPublishingRecoverer und Kapitel Publishing Dead Letter Records in der offiziellen Dokumentation.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Sehen die
SeekToCurrentErrorHandler
.Wenn eine exception Auftritt, sucht der Verbraucher, so dass alle unverarbeiteten Datensätze nachgeliefert, auf die nächste Umfrage.
Können Sie das gleiche Verfahren verwenden (z.B. eine Unterklasse) zu schreiben, um die DLQ und suchen die aktuellen Offsets (und andere unverarbeitete) wenn der DLQ schreiben schlägt fehl, und suchen nur die verbleibenden Datensätze, wenn der DLQ schreiben gelingt.