Frühjahr SimpleMessageListenerContainer für RabbitMQ ist Abbruch auf ungültige Nachricht

Ich bin mit Federn SimpleMessageListenerContainer zu konsumieren-Nachrichten von einem RabbitMQ Warteschlange. Alles funktioniert gut, aber wenn eine ungültige Nachricht an die Warteschlange gesendet wird (z.B. ungültige json) den Hörer einfach abgebrochen wird, ist das Herunterfahren des Arbeitnehmers nicht annimmt, keine weiteren Nachrichten.

Ist es möglich, konfigurieren Sie es so, dass es verwirft die defekte Nachricht und hält hören weitere Nachrichten?

Ich bin mit sprint-rabbit-1.6.1.RELEASE.jar

Meine Konfiguration sieht wie folgt aus:

@Bean
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
                                                MessageListenerAdapter listenerAdapter,
                                                MessageConverter messageConverter) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames("my.queue");
    container.setMessageListener(listenerAdapter);
    container.setMessageConverter(messageConverter);
    return container;
}

@Bean
public MessageConverter messageConverter() {
    return new Jackson2JsonMessageConverter();
}

@Bean
MessageListenerAdapter listenerAdapter(Worker worker) {
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage");
    messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter());
    return messageListenerAdapter;
}

Die Erklärung meines listener-Methode:

   public void processMessage(Map<String, String> message) {

Wenn ich eine Nachricht senden wie '"routeId":"7"}' (broken json), dann bekomme ich die Exception:

2016-09-02 08:10:35.821  WARN 35841 --- [   container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed.

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at     org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.828 ERROR 35841 --- [   container-29]     o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing

org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101]
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 1 common frames omitted
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String)
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101]
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na]
... 12 common frames omitted

2016-09-02 08:10:35.833 ERROR 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish.
2016-09-02 08:10:35.833  INFO 35841 --- [   container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish.

Fatal Exception in SimpleMessageListenerContainer geworfen wird, hier:

catch (ListenerExecutionFailedException ex) {
                    //Continue to process, otherwise re-throw
                    if (ex.getCause() instanceof NoSuchMethodException) {
                        throw new FatalListenerExecutionException("Invalid listener", ex);
                    }
                }

So scheint es, soll es heruntergefahren, wenn der Behälter konfiguriert ist, mit einer nicht-existenten Methode. Aber im Falle eines gebrochenen-Nachricht, es zu versuchen, rufen Sie die Methode mit einem falschen parameter geben, die auch verursacht eine NoSuchMethodException. Dies bedeutet, dass jeder Erzeuger, der töten kann meine Verbraucher mit einem gebrochenen Nachricht.

Vielen Dank für alle Vorschläge!

  • Ich bin mir nicht sicher, was du damit meinst But in case of a broken message, it's trying to call the method with a wrong parameter type, which also causes a NoSuchMethodException.. Sie benötigen zum anzeigen den kompletten stack-trace, nicht eine bearbeitete und welche version verwenden Sie? Normalerweise, schlechte JSON Ursachen MessageConversionExceptions, die speziell behandelt werden und solche Meldungen werden zurückgewiesen.
  • Vielen Dank für Ihren Kommentar. Ich habe den kompletten stacktrace, und auch die weitere Konfiguration. Es scheint er versucht zu rufen Sie meine listener-Methode ('processMessage') mit einem String als argument, anstatt nur nicht früher, wenn Sie versuchen zu Parsen von json.
  • Siehe meine Antwort für eine Arbeit, die rund - um- öffnete ich auch ein JIRA-Issue.
InformationsquelleAutor johannesv | 2016-09-01
Schreibe einen Kommentar