spring boot rabbitmq MappingJackson2MessageConverter benutzerdefinierten Objekts der Konvertierung
Ich bin versucht, zu erstellen eine einfache spring-boot-Anwendung mit spring-boot, die "produzieren" - Nachrichten an einen rabbitmq exchange/queue und ein weiteres Beispiel spring-boot-app, die sich "verbrauchen" diese Nachrichten.
Also ich habe zwei apps (oder microservices, wenn Sie möchten).
1) "Produzent" microservice
2) "Verbraucher" microservice
Den "Hersteller" hat 2 domain-Objekte. Foo und Bar, die konvertiert werden sollen zu json und senden rabbitmq.
Der "Verbraucher" erhalten sollten, und zum konvertieren der json-Nachricht, die in einer Domäne Foo und Bar jeweils.
Aus irgendeinem Grund kann ich nicht machen, diese einfache Aufgabe. Es gibt nicht viele Beispiele dazu.
Für die Meldung converter, die ich verwenden möchten, org.springframework.messaging.converter.MappingJackson2MessageConverter
Hier ist was ich habe, so weit:
PRODUZENT MICROSERVICE
package demo.producer;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.core.RabbitMessagingTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.stereotype.Service;
@SpringBootApplication
public class ProducerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class, args);
}
@Bean
Queue queue() {
return new Queue("queue", false);
}
@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}
@Bean
Binding binding(Queue queue, TopicExchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("queue");
}
@Bean
public MappingJackson2MessageConverter jackson2Converter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
return converter;
}
@Autowired
private Sender sender;
@Override
public void run(String... args) throws Exception {
sender.sendToRabbitmq(new Foo(), new Bar());
}
}
@Service
class Sender {
@Autowired
private RabbitMessagingTemplate rabbitMessagingTemplate;
@Autowired
private MappingJackson2MessageConverter mappingJackson2MessageConverter;
public void sendToRabbitmq(final Foo foo, final Bar bar) {
this.rabbitMessagingTemplate.setMessageConverter(this.mappingJackson2MessageConverter);
this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", foo);
this.rabbitMessagingTemplate.convertAndSend("exchange", "queue", bar);
}
}
class Bar {
public int age = 33;
}
class Foo {
public String name = "gustavo";
}
VERBRAUCHER MICROSERVICE
package demo.consumer;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.stereotype.Service;
@SpringBootApplication
@EnableRabbit
public class ConsumerApplication implements CommandLineRunner {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class, args);
}
@Autowired
private Receiver receiver;
@Override
public void run(String... args) throws Exception {
}
}
@Service
class Receiver {
@RabbitListener(queues = "queue")
public void receiveMessage(Foo foo) {
System.out.println("Received <" + foo.name + ">");
}
@RabbitListener(queues = "queue")
public void receiveMessage(Bar bar) {
System.out.println("Received <" + bar.age + ">");
}
}
class Foo {
public String name;
}
class Bar {
public int age;
}
Und hier ist die Ausnahme, ich bin immer:
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void demo.consumer.Receiver.receiveMessage(demo.consumer.Bar)]
Bean [demo.consumer.Receiver@1672fe87]
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:116)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:93)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:756)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:170)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1257)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:660)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1021)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1005)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$700(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1119)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: No converter found to convert to class demo.consumer.Bar, message=GenericMessage [payload=byte[10], headers={amqp_receivedRoutingKey=queue, amqp_receivedExchange=exchange, amqp_deliveryTag=1, amqp_deliveryMode=PERSISTENT, amqp_consumerQueue=queue, amqp_redelivered=false, id=87cf7e06-a78a-ddc1-71f5-c55066b46b11, amqp_consumerTag=amq.ctag-msWSwB4bYGWVO2diWSAHlw, contentType=application/json;charset=UTF-8, timestamp=1433989934574}]
at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:115)
at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:77)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:127)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:100)
at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:113)
... 12 common frames omitted
Ausnahme sagt, es ist kein Konverter, und das ist wahr, mein problem ist, dass ich keine Ahnung habe, wie die MappingJackson2MessageConverter Konverter in der consumer-Seite (bitte beachten Sie, dass ich verwenden möchten org.springframework.messaging.converter.MappingJackson2MessageConverter und nicht org.springframework.amqp.Unterstützung.converter.JsonMessageConverter)
Irgendwelche Gedanken ?
Nur für den Fall, können Sie Gabel dieses Beispiel-Projekt an:
https://github.com/gustavoorsi/rabbitmq-consumer-receiver
- Schauen Sie hier: stackoverflow.com/questions/29337550/...
- In diesem Beispiel verwendet die org.springframework.amqp.Unterstützung.converter.Jackson2JsonMessageConverter (das gehört zu Abhängigkeit spring-amqp), in meinem Fall möchte ich mit org.springframework.messaging.converter.MappingJackson2MessageConverter (die gehört zum Frühling-messaging).
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ok, ich habe endlich dieses arbeiten.
Frühjahr verwendet eine PayloadArgumentResolver zu extrahieren, zu konvertieren und die konvertierten Nachricht zu der Methode, die parameter kommentiert mit @RabbitListener. Irgendwie müssen wir um die mappingJackson2MessageConverter in diesem Objekt.
So, in den CONSUMER-app, die wir implementieren müssen, RabbitListenerConfigurer. Durch das überschreiben configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) wir können einen benutzerdefinierten DefaultMessageHandlerMethodFactory, in diese Fabrik setzen wir die message converter, und die Fabrik wird unsere PayloadArgumentResolver mit der korrekten konvertieren.
Hier ist ein Ausschnitt von dem code habe ich auch aktualisiert, die git Projekt.
ConsumerApplication.java
So, wenn Ihr der Produzent microservice es werden 2 Nachrichten in der Warteschlange. Eine, die ein Foo-Objekt und eine andere, die ein Bar-Objekt.
Durch ausführen der Verbraucher microservice Sie werden sehen, dass beide verbraucht werden, die durch die jeweilige Methode in der Empfänger Klasse.
Aktualisierte Ausgabe:
Es ist ein konzeptionelles problem, über das queuing von meiner Seite, denke ich. Was ich wollte erreicht, kann nicht möglich sein, durch die Erklärung 2 Methoden, annotiert mit @RabbitListener, dass Punkte auf die gleiche Warteschlange. Die Lösung wurde oben nicht richtig funktioniert. Wenn Sie senden zu rabbitmq, sagen wir, 6 Foo Nachrichten und 3 Bar Nachrichten, die Sie nicht erhalten 6 mal durch den Hörer, die mit Foo parameter. Es scheint, dass die Hörer sind aufgerufen, parallel, sodass es keine Möglichkeit gibt zu unterscheiden, die Zuhörer aufrufen, auf der Grundlage der Methode argument-Typ.
Meine Lösung (und ich bin nicht sicher, ob dies der beste Weg ist, ich bin offen für Vorschläge hier) erstellen Sie eine Warteschlange für jede Entität.
So, jetzt habe ich Warteschlange.bar und Warteschlange.foo, und aktualisieren @RabbitListener(queues = "Warteschlange.foo")
Noch einmal, ich habe aktualisiert, der code und Sie können check it out in meinem git-repository.
@RabbitListener
zusammen mit@RabbitHandler
auf die einzelnen Methoden, die diesen Anwendungsfall unterstützt.Dies nicht getan haben, mich aber, wie es scheint, müssen Sie sich die entsprechenden Konvertierungen durch die Einrichtung einer RabbitTemplate. Werfen Sie einen Blick auf Abschnitt 3.6.2 in in diesem Frühjahr-Dokumentation. Ich weiß, dass es konfiguriert ist, mit dem AMQP-Klassen, aber wenn die messaging-Klasse, die Sie erwähnen kompatibel ist, es gibt keinen Grund können Sie nicht ersetzen. Sieht aus wie diese Referenz erklärt, wie Sie könnte es tun, unter Verwendung der Java-Konfiguration anstatt XML. Ich habe nicht wirklich die Kaninchen so habe ich keine persönliche Erfahrung, aber ich würde gerne hören, was Sie herausfinden.