Spring AMQP RabbitMQ wie man zwei parallele Verbraucher nicht greifen gleiche Aufgabe in der gleichen Zeit?
Habe ich zwei Systeme integriert sind, mit RabbitMQ.
Hintergrund
Client-senden Sie multiple-request-Nachrichten von Spring-AMQP outbound-Gateway zu RabbitMQExchange, die rabbitmq-DirectExchange verwenden round-robin - Versand dieser Nachrichten an Mehrere Arbeitnehmer(die Arbeiter sind unabhängige, die sich auf verschiedenen desktops die parallel läuft Arbeitnehmer code für die Verarbeitung verschiedene Nachrichten von RabbitExchange, die durch die Nutzung simpleMessageListner.
Logic Flow
Ähnlich mit Rabbitmq Tutorial multiWorker-DirectExchange.
Client-----sendRequests (5tasks)- - - - >RabbitMQ-DirectExchange
dann Rabbitmq-DirectExchange verteilen diese 5 Aufgaben für die Arbeitnehmer
PC1 ( Worker1 ), PC2 ( Worker2 )
ExchangeType & meine Bindungen
<!-- rabbit connection factory, rabbit template, and rabbit admin -->
<rabbit:connection-factory
id="connectionFactory"
host="local IP address"
username="guest"
password="guest"
channel-cache-size="10" />
<rabbit:template id="amqpTemplate"
connection-factory="connectionFactory"
reply-timeout="600000"
exchange="JobRequestDirectExchange"/>
<rabbit:admin connection-factory="connectionFactory" id="rabbitAdmin" />
<rabbit:direct-exchange name="taskRequests"
auto-delete="false"
durable="true" >
<rabbit:bindings>
<rabbit:binding queue="jobRequests" key="request.doTask" />
</rabbit:bindings>
</rabbit:direct-exchange>
<rabbit:queue name="jobRequests" auto-delete="false" durable="true" />
Worker-Der-consumer-Konfiguration
<rabbit:listener-container id="workerContainer"
acknowledge="auto"
prefetch="1"
connection-factory="connectionFactory">
<rabbit:listener ref="taskWorker" queue-names="jobRequests" />
</rabbit:listener-container>
Der Arbeiter-Klasse ist die einfache POJO-wer verarbeitet die Anfrage und Aufgabe.
Verwenden: RabbitMQ 3.2.2 mit Spring-Integration-Amqp 2.2
, Was ich erwarten
Erwarte ich, dass Worker1 können Sie erhalten einige Aufgaben, während Worker2 können wählen Sie die restlichen Aufgaben ( die anderen Aufgaben ).
Ich Wünsche der Arbeitnehmer kann parallel zusammen insgesamt 5 Aufgaben. Jedes mal, wenn jeder Arbeiter nur tun, eine Aufgabe, die, nach dem Ziel zu verteilen, andere Aufgaben nacheinander. (Kaninchen-listner festgelegt wurde prefetch=1)
Wie
worker1: t2 t3 t5
worker2: t1-t4
Aber
Nach vielen Laufzeit-test, irgendwann wird es die Aufgabe richtig.
Worker1------ task4 task1
Worker2------ task2 task3 task5
Irgendwann während es die falsch Art und Weise, wie diese:
Worker1------ task4 task1
Worker2------ task4 task1 task2
Sich, die task4 und task1 sind wieder durch worker1 und worker2 sametime.
Runtime-test:
Habe ich überprüft, dass die client richtig senden task1 task2 task3 task4 task5 request-Nachricht an RabbitExchange. Aber jedesmal, wenn jeder Arbeiter erhalten verschiedene Aufgaben. Es ist eine gemeinsame Falle, dass auslösen können falsch dispathcing.
Gibt es 5tasks (t1,t2,t3,t4,t5) bei RabbitmqExchange, und Sie werden schicken 2 parallel Arbeitnehmer (w1,w2).
w1 bekam Aufgaben: t2 t1 t4
w2 bekam Aufgaben: t3 t1
Als Round-Robin-Versand-Methode, w1 und w2 in der Folge bekam Aufgaben.
w1 bekam t2 und w2 bekam t3.
Während t2 und t3 läuft, RabbitmqExchange senden t1 zu w1 und warten auf ack vom w1.
Angenommen t2 verbringen Sie mehr Zeit, um zu beenden Aufgabe als t3 tut und w2 ist kostenlos, wenn w1 tun t1.
w2 finish t3 Aufgabe erhalten RabbitmqExchange ausgelöst t1 weil w2 nicht ausgelastet ist und RabbitExchange nicht erhalten t1 beendet Aufgabe ack-Nachricht.
Mein Verständnis ist
Beide w1 und w2 tun gleiche Aufgabe t1. Entweder einer von Ihnen einmal beenden t1 senden ack zurück zu RabbitmqExchange, dann RabbitmqExchange dequeue eine task-Nachricht. Als t1 beendet worden zweimal, RabbitmqExchange dequeue eine weitere Nachricht, dass es sein sollte. So in dieser Art und Weise t5 Nachricht aus der Warteschlange entfernt werden, weil t1 wurde zweimal durchgeführt. Obwohl 5 Meldungen im RabbitmqExchange sind acked-und dequeue-finish. Aber die zwei Arbeiter fehlen, t5 und tun t1 zweimal.
Was soll ich tun, verhindern können, dass parallel zwei Arbeiter packen die gleiche Nachricht vom gleichen Kaninchen Warteschlange?
Ich versucht auto-ack Art und Weise, die Nachricht korrekt acked. Aber während der Zeit, die der server warten, bis ein Arbeiter ack, rabbitmq kann redispatch wird die Nachricht, die nicht-acked aber bereits verteilt zu einem anderen Arbeiter.
Denken auch über synchronisiert die gesendeten Nachrichten oder vorrangig auf gesendete Nachrichten. Aber tun Sie nicht haben eine klare vision, wie das zu erreichen.
Ich bin dankbar zu hören, irgendwelche Ideen zu diesem problem.Dank
Danke Gary für deine schnelle Antwort. Firstime ich dieses problem auftreten, dachte ich, ich sollte noch die manuelle ack auf Arbeitnehmer-Seite, um zu lösen, einige Nachrichten wurden umverteilen auf andere Arbeitnehmer. Aber nach Forschung, ich verstehe, dass spring-amqp auto-ack es richtig gemacht hast acknowldegement. Das eigentliche problem ist, während der ein Arbeitnehmer arbeitet auf die Aufgabe, RabbitmqBroker kann wieder an andere Arbeiter, denn diese Aufgabe wurde nicht beendet (broker nicht empfangen ack) und die anderen Arbeiter frei zu bekommen job. Auf diese Weise werden zwei Verbraucher die gleiche Nachricht erhalten. Ich bin noch auf der Suche, wie dieses problem zu lösen.
>>RabbitmqBroker kann wieder an andere Arbeiter, denn diese Aufgabe wurde nicht abgeschlossen - das ist einfach nicht wahr. Der Makler wird nicht liefern eine un-acked-Nachricht an einen anderen Verbraucher (es sei denn, wie @cvbarros sagt in seiner Antwort, die erste consumer stirbt oder lehnt die Nachricht), in dem Fall, statt un-acked, die Nachricht ist requeued.
InformationsquelleAutor Erica Zhang | 2014-01-23
Du musst angemeldet sein, um einen Kommentar abzugeben.
Eine Sache, die ich denken kann, verursacht dieser duplizierten Nachrichten für die Verbraucher ist, wenn ein Verbraucher schließt den Kanal vor dem senden einer ack-Nachricht.
In diesem Fall, die RabbitMQ-broker requeue die Nachricht und legen Sie es nachgeliefert Flagge zu wahr. Von RabbitMQ docs:
Ist, wenn Sie die Prüfung beim schließen eines der worker-Prozesse, die vor dem senden eines ack -, oder in Fall, dass Sie Schuld, das wird sehr wahrscheinlich passieren. Sie können versuchen, zu prüfen, die nachgeliefert - flag, um zu vermeiden, dass es erneut verarbeitet werden, durch einen anderen Verbraucher, wenn das der Fall ist.
Andere Sache, die ich bemerkt habe, ist die prefetch Einstellung in Ihrem consumer-Konfiguration. Sollten Sie hier einen höheren Wert (tune es für Ihre Bedürfnisse, anstatt es nur 1. Sie können lernen, mehr über prefetch hier.
Hoffe, das hilft!
InformationsquelleAutor cvbarros
Versuchte ich lange Zeit, um herauszufinden SpringConfigured-Weg, um diese Funktion zu implementieren, scheiterte aber.
Während ich die funktionierende Lösung mit RabbitMQ-Java-Client-API.
Mit Spring-Asynchron-Gateway mit QuartzScheduler, es haben immer problem mit Nachricht senden als nötig. Ich denke, es hat einen Grund für multi-threads Sortieren.
Am Anfang dachte ich, es da, dass der Kanal-Instanz zugegriffen gleichzeitig von mehreren threads. Auf diese Weise bestätigt werden nicht korrekt behandelt.
Oben aus http://www.rabbitmq.com/javadoc/com/rabbitmq/client/Channel.html
Schließlich ich entscheiden, aufgeben nutzen Spring Weg und wechselt wieder zu verwenden RabbitMQ-API(Bevor ich mit Spring-XML-Konfiguration des Gateways/- Kanäle, jetzt verwenden RabbitMQ-JavaClient java-Programmierung Art und Weise erklären, Austausch mit den Kanälen.). Und fügen Sie die Verwendung von RabbitMQRPC für asynchrone callback. Nun funktioniert alles wunderbar für die aktuelle Anforderung.
So zusammenfassend, die endgültige Lösung für meine Anforderung ist:
Verwenden RabbitMQ JAVAClient API zu erklären, exchange/Kanäle/Bindung/routingKey.
Für beide client-und server-Seite.
Verwenden RabbitMQ RPC für das implementieren von asynchronen callback-Funktion.
(Ich Folge RabbitMQ-java tutorial, verwenden Sie diesen link: http://www.rabbitmq.com/tutorials/tutorial-six-java.html)
InformationsquelleAutor Erica Zhang
Haben Sie versucht, die Einstellung concurrentConsumers Eigenschaft des listener-container, wie besprochen,hier?
InformationsquelleAutor user3400371