Wie zu Fragen, RabbitMQ zu wiederholen, wenn business-Ausnahme tritt im Frühjahr Asynchrone MessageListener Anwendungsfall
Ich habe eine Spring AMQP message-listener läuft.
public class ConsumerService implements MessageListener {
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void onMessage(Message message) {
try {
testService.process(message); //This process method can throw Business Exception
} catch (BusinessException e) {
//Here we can just log the exception. How the retry attempt is made?
} catch (Exception e) {
//Here we can just log the exception. How the retry attempt is made?
}
}
}
Wie Sie sehen können, könnte es sein, Ausnahme coming-out während des Prozesses. Ich möchte wiederholen, weil ein bestimmten Fehler im Catch-block. Ich kann nicht durch die Ausnahme in der onMessage.
Wie zu sagen, RabbitMQ ist es eine Ausnahme, und wiederholen?
- Ich habe bearbeitet meine Antwort zur Abdeckung Ihrer spezifischen Frage. Lassen Sie mich wissen, wenn Sie benötigen weitere Abklärungen.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Seit
onMessage()
nicht erlauben zu werfen, checked exceptions Sie können wickeln Sie die Ausnahme in einemRuntimeException
und wieder werfen.Beachten Sie jedoch, dass dadurch möglicherweise die Nachricht erneut zugestellt auf unbestimmte Zeit. Hier ist, wie das funktioniert:
RabbitMQ unterstützt zurückweisen einer Nachricht und fragt den Makler zu requeue es. Dies wird angezeigt,hier. Aber RabbitMQ nicht nativ über einen Mechanismus verfügen, wiederholen Politik, Z. B. Einstellung max Wiederholungen, Verzögerung, usw.
Bei der Verwendung von Spring AMQP, "requeue Sie auf ablehnen," ist die Standardoption. Spring
SimpleMessageListenerContainer
wird standardmäßig dies tun, wenn es ist eine unbehandelte Ausnahme. Also in deinem Fall, die Sie gerade brauchen, um neu zu werfen die Ausnahme gefangen. Beachten Sie jedoch, dass, wenn Sie eine Nachricht nicht verarbeiten kann, und Sie werfen immer die Ausnahme von dieser re-geliefert auf unbestimmte Zeit und führt in eine Endlosschleife.Können Sie überschreiben, dieses Verhalten pro Nachricht durch das werfen einer
AmqpRejectAndDontRequeueException
Ausnahme, in dem Fall, dass die Nachricht nicht requeued.Können Sie auch ausschalten "requeue Sie auf ablehnen," Verhalten von
SimpleMessageListenerContainer
völlig durch EinstellungWenn eine Nachricht zurückgewiesen wird und nicht requeued es ist entweder verloren oder übertragen werden, um eine DLQ, wenn man in RabbitMQ.
Wenn Sie eine Richtlinie für Wiederholungsversuche mit max versucht, delay, etc der einfachste Weg ist die Einrichtung einer Feder "stateless"
RetryOperationsInterceptor
wird das tun alle Wiederholungen in den thread (mitThread.sleep()
) ohne ablehnen der Nachricht auf jede Wiederholung (so, ohne wieder zu RabbitMQ für jede Wiederholung). Wenn Wiederholungen erschöpft sind, wird standardmäßig eine Warnung protokolliert und die Nachricht wird verbraucht. Wenn Sie möchten, senden Sie eine DLQ Sie benötigen entweder einRepublishMessageRecoverer
oder eine benutzerdefinierteMessageRecoverer
, dass die Nachricht abgelehnt, ohne requeuing (in diesem letzteren Fall sollten Sie auch setup ein RabbitMQ DLQ in der Warteschlange). Beispiel mit default-message-verwertenden:Dies hat natürlich den Nachteil, dass Sie besetzen den Thread für die gesamte Dauer der Wiederholungen. Sie haben auch die Möglichkeit, eine "stateful"
RetryOperationsInterceptor
, die senden Sie die Nachricht zurück zu RabbitMQ für jede Wiederholung, aber die Verzögerung wird immer noch umgesetzt werden mitThread.sleep()
innerhalb der Anwendung, plus die Einrichtung einer stateful-interceptor ist ein bisschen komplizierter.Deshalb, wenn Sie wollen Wiederholungen mit Verzögerungen beanspruchen
Thread
benötigen Sie ein viel mehr beteiligt benutzerdefinierte Lösung mithilfe von TTL auf RabbitMQ Warteschlangen. Wenn Sie nicht möchten, exponential-backoff (also Verzögerung nicht erhöht auf jeden wiederholen) es ist ein bisschen einfacher. Eine solche Lösung zu implementieren, die Sie im Grunde erstellen Sie eine andere Warteschlange auf rabbitMQ mit Argumenten:"x-message-ttl": <delay time in milliseconds>
und"x-dead-letter-exchange":"<name of the original queue>"
. Dann auf das Haupt-queue-Sie setzen"x-dead-letter-exchange":"<name of the queue with the TTL>"
. So, jetzt, wenn Sie ablehnen und nicht requeue eine Nachricht RabbitMQ leitet es an die zweite Warteschlange. Wenn die TTL ausläuft, wird es umgeleitet werden, um die ursprüngliche Warteschlange und somit nachgeliefert, um die Anwendung. So, jetzt müssen Sie wiederholen interceptor, der lehnt die Botschaft zu RabbitMQ nach jedem Versagen und verfolgt auch die Wiederholungsanzahl. To avoid the need To keep state in der Anwendung (weil, wenn Ihre Anwendung in einem Cluster befindet, müssen Sie replizieren Zustand) können Sie berechnen, die Anzahl der Wiederholungen aus derx-death
header, RabbitMQ-sets. Finden Sie mehr info über diesen header hier. Also an dieser Stelle die Implementierung einer benutzerdefinierten interceptor ist einfacher als die Anpassung der Frühling stateful-interceptor mit diesem Verhalten.Überprüfen Sie auch der Abschnitt über die Wiederholungen in das Spring AMQP-Referenz.