RabbitMQ: schnell Produzent und Verbraucher langsam
Ich habe eine Anwendung, die verwendet RabbitMQ als message-queue zum senden/empfangen von Nachrichten zwischen zwei Komponenten: sender und Empfänger. Der sender sendet Nachricht in eine sehr schnelle Art und Weise. Der Empfänger erhält die Nachricht und macht dann einige sehr zeitaufwändige Aufgabe (vor allem Datenbank schreiben für sehr große Daten-Größe). Da der Empfänger nimmt eine sehr lange Zeit, um die Aufgabe abzuschließen und dann rufen Sie die nächste Nachricht in der Warteschlange, der sender, halten, füllt sich die Warteschlange schnell. Also meine Frage ist: Wird diese Ursache der message-queue-überlauf?
Den message consumer sieht wie folgt aus:
public void onMessage() throws IOException, InterruptedException {
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare("allDataCase", true, false, false, null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
JSONObject json = new JSONObject(message);
String caseID = json.getString("caseID");
//following takes very long time
dao.saveToDB(caseID);
}
}
Jede empfangene Nachricht durch den Verbraucher enthält eine caseID. Für jede caseID, es wird speichern große Menge von Daten in der Datenbank, das dauert sehr lange Zeit. Derzeit wird nur ein Verbraucher für die RabbitMQ seit dem producer/consumer-verwenden Sie die gleiche Warteschlange für die publish/subscribe-der caseID. Also, wie kann ich die Geschwindigkeit des consumer-Durchsatz, so dass die Verbraucher können aufholen mit dem Produzenten und vermeiden Sie die Meldung überlauf der Warteschlange? Sollte ich das multithreading in den consumer-Teil, um die Geschwindigkeit der Verbrauch? Oder sollte ich mehrere Verbraucher zu konsumieren, die eingehende Nachricht simutaneously? Oder ist es so asynchron zu lassen, die Verbraucher konsumieren die Nachricht asynchron, ohne zu warten, es zu beenden? Alle Vorschläge sind willkommen.
InformationsquelleAutor der Frage tonga | 2014-10-28
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ja. RabbitMQ wird Sie in einen Zustand des "flow-control", um zu verhindern, dass übermäßig viel Speicher Verbrauch als die Länge der Warteschlange erhöht. Es startet auch die anhaltenden Nachrichten auf der Festplatte, anstatt halten Sie in Erinnerung.
Haben Sie 2 Möglichkeiten:
Nicht, es sei denn, Sie haben eine durchdachte Lösung. Das hinzufügen von Parallelität zu einer Anwendung hinzufügen, wird eine Menge Aufwand auf der consumer-Seite. Sie können am Ende anstrengend, der ThreadPool oder die Drosselung memory-usage.
Beim Umgang mit AMQP, die Sie wirklich brauchen, zu prüfen, die business-Anforderung für jeden Prozess, um die optimale Lösung zu finden. Wie die Zeit-sensibel sind, Ihre eingehenden Nachrichten? Sie müssen beibehalten werden, um die DB so schnell wie möglich, oder macht es einen Unterschied, um Ihre Benutzer, ob oder nicht, dass Daten sofort verfügbar sind?
Wenn die Daten brauchen nicht beibehalten werden sofort, Sie könnte sich ändern Sie Ihre Anwendung so, dass der Verbraucher(s) entfernen Sie einfach die Nachrichten aus der Warteschlange und speichert Sie in einem Cache-Kollektion, die in Redis, zum Beispiel. Führen Sie einen zweiten Prozess, dann liest und verarbeitet der zwischengespeicherten Nachrichten nacheinander. Dadurch wird sichergestellt, dass Ihr queue-Länge wächst nicht ausreichend, um das Ergebnis im flow-control, während verhindert wird, dass Ihre DB aus bombardiert mit schreiben, Anfragen, die sind in der Regel teurer als read-Anforderungen. Ihre Verbraucher(s) nun entfernen Sie einfach die Nachrichten aus der Warteschlange, die behandelt werden, die von einem anderen Prozess später.
InformationsquelleAutor der Antwort Paul Mooney
"Also, wie kann ich die Geschwindigkeit des consumer-Durchsatz, so dass die Verbraucher können aufholen mit dem Produzenten und vermeiden Sie die Meldung überlauf in die Warteschlange?" Dies ist die Antwort, "verwenden mehrerer Verbraucher zu konsumieren, die eingehende Nachricht gleichzeitig", Verwendung von multi-threading und parallel laufen diese Verbraucher Implementierung von shared-nothing-Prinzip, http://www.eaipatterns.com/CompetingConsumers.html
InformationsquelleAutor der Antwort voutrin
Haben Sie viele Möglichkeiten, um Ihre Leistung zu erhöhen.
Können Sie erstellen einen worker-Warteschlange mit mehr Produzenten, die auf diese Weise erstellen Sie sich ein einfaches load-balance-system. verwenden Sie nicht die exchange---> Warteschlange, aber nur Warteschlange. Lesen Sie diese post, RabbitMQ Non-Round-Robin-Versand
Wenn Sie eine Nachricht erhalten, können Sie erstellen eine poolthread für die Daten einfügen, die auf Ihre Datenbank, aber in diesem Fall haben Sie zum verwalten des Scheiterns.
Aber ich denke, das grundsätzliche problem ist die Datenbank und nicht RabbitMQ. Mit einem guten tuning, multi-threading und worker-Warteschlange Sie können eine skalierbare und schnelle Lösung.
Lassen Sie mich wissen,
InformationsquelleAutor der Antwort Gabriele
Während es wahr ist, indem mehr Verbraucher kann die Dinge beschleunigen, das eigentliche Problem wird sein, das speichern in die Datenbank.
Gibt es schon viele Antworten hier, dass die Diskussion über das hinzufügen von Verbrauchern (threads, und oder Maschinen) und die änderung der QoS-so werde ich nicht wiederholen, dass. Stattdessen sollten Sie ernsthaft erwägen, mit dem Aggregator Muster aggregieren der Nachrichten in einer Gruppe von Nachrichten und dann die batch legen Sie die Gruppe in Ihre Datenbank auf einen Schlag.
Ihre aktuelle code für jede Nachricht, die wahrscheinlich öffnet eine Verbindung ein, fügt die Daten und schließt die Verbindung (bzw. der Rückkehr auf den pool). Schlimmer kann es auch sein, mithilfe von Transaktionen unterstützt werden.
Durch mit dem aggregator pattern Ihr im wesentlichen die Pufferung der Daten, bevor Sie Spülen.
Schreiben nun einen guten aggregator ist schwierig. Sie müssen entscheiden, wie Sie wollen-Puffer (dh jeder Arbeiter hat seinen eigenen Puffer-oder zentral-Puffer wie Redis). Spring integration ist ein aggregator, glaube ich.
InformationsquelleAutor der Antwort Adam Gent
Als Antwort schlage ich vor: sowohl als auch.
Können Sie die Vorteile aus, die mehrere Empfänger, sowie das einrichten jeder Empfänger zum ausführen der Aufgabe in einem separaten Thread, so dass der Empfänger zu akzeptieren, die nächste Nachricht in der Warteschlange.
Natürlich dieser Ansatz geht davon aus, dass das Ergebnis jeder operation (das schreiben auf die db, wenn ich das richtig verstanden habe) nicht beeinflussen in irgendeiner Weise das Ergebnis der nachfolgenden Operationen in Reaktion von anderen Nachrichten.
InformationsquelleAutor der Antwort mbera