Dead letter-Warteschlange (DLQ) für Kafka mit Feder-kafka

Was ist der beste Weg zur Umsetzung Dead letter-Warteschlange (DLQ) Konzept in Spring-Boot-2.0-Anwendung mit spring-kafka 2.1.x alle Nachrichten konnte nicht verarbeitet werden @KafkaListener Methode von einigen bean geschickt, um einige vordefinierte Kafka DLQ Thema nicht zu verlieren und die einzelne Nachricht?

So verbraucht Kafka Datensatzes entweder:

  1. erfolgreich verarbeitet,
  2. konnte nicht verarbeitet werden und wird an der DLQ Thema,
  3. konnte nicht verarbeitet werden, werden nicht an den DLQ Thema (aufgrund der unerwarteten problem), so werden verbraucht durch den Hörer wieder auf.

Habe ich versucht zu erstellen, listener-container mit der individuellen Umsetzung der ErrorHandler senden von Datensätzen konnte nicht verarbeitet werden DLQ-Thema mit KafkaTemplate. Verwendung von " disable auto-commit und DATENSATZ AckMode.

spring.kafka.enable-auto-ack=false
spring.kafka.listener.ack-mode=RECORD

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ErrorHandler {

    @Autowired
    private KafkaTemplate<Object, Object> kafkaTemplate;

    @Value("${dlqTopic}")
    private String dlqTopic;

    @Override
    public void handle(Exception thrownException, ConsumerRecord<?, ?> record) {
        log.error("Error, sending to DLQ...");
        kafkaTemplate.send(dlqTopic, record.key(), record.value());
    }
}

Scheint es, dass diese Umsetzung nicht garantieren Element #3. Wenn eine Ausnahme ausgeworfen wird, in DlqErrorHandler Datensatz wird nicht konsumiert werden, indem Sie den Hörer wieder auf.

Wird die Nutzung des Transaktions-listener-container helfen?

factory.getContainerProperties().setTransactionManager(kafkaTransactionManager);

Gibt es eine bequeme Möglichkeit zum implementieren von DLQ-Konzept mit Spring Kafka?

UPDATE 28/03/2018

Dank Gary Russell ' s Antwort, die ich war in der Lage zu erreichen, das gewünschte Verhalten durch die Implementierung DlqErrorHandler wie folgt

@Configuration
public class KafkaConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = ...
        ...
        factory.getContainerProperties().setAckOnError(false);
        factory.getContainerProperties().setErrorHandler(dlqErrorHandler);
        return factory;
    }
}

@Component
public class DlqErrorHandler implements ContainerAwareErrorHandler {
    ...
    @Override
    public void handle(Exception thrownException, list<ConsumerRecord<?, ?> records, Consumer<?, ?> consumer, MessageListenerContainer container) {
        Consumerrecord<?, ? record = records.get(0);
        try {
            kafkaTemplate.send("dlqTopic", record.key, record.value());
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset() + 1);
            //Other records may be from other partitions, so seek to current offset for other partitions too
            //...
        } catch (Exception e) {
            consumer.seek(new TopicPartition(record.topic(), record.partition()), record.offset());
            //Other records may be from other partitions, so seek to current offset for other partitions too
            //...
            throw new KafkaException("Seek to current after exception", thrownException);
        }
    }
}

Diese Weise, wenn der Verbraucher Umfrage gibt 3 Datensätze (1, 2, 3) und 2. man kann nicht verarbeitet werden:

  • 1 verarbeitet werden
  • 2 kann nicht verarbeitet werden und an die DLQ
  • 3 durch Verbraucher suchen zu erfassen.offset() + 1, es wird geliefert werden, um die Zuhörer

Wenn das senden an DLQ fehl Verbraucher sucht, um den Rekord.offset (Versatz) und der Datensatz wird erneut übermittelt, um die Zuhörer (und senden an DLQ wahrscheinlich wird zurückgezogen).

Schreibe einen Kommentar