Python und RabbitMQ - Beste Weg, um hören zu konsumieren Ereignisse, die von mehreren Kanälen?
Habe ich zwei, separate RabbitMQ-Instanzen. Ich bin versuchen zu finden der beste Weg, um zu hören, um Ereignisse aus beiden.
Ich kann zum Beispiel verbrauchen Ereignisse zu eins mit dem folgenden:
credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()
Habe ich einen zweiten host, "host2",, ich würde gerne hören, wie gut. Ich dachte über die Schaffung von zwei separate threads, um dies zu tun, aber von dem was ich gelesen habe, pika ist nicht thread-sicher. Gibt es einen besseren Weg? Oder würde die Schaffung von zwei separate threads, die jedes hören zu einem anderen Kaninchen Instanz (host1 und host2) ausreichend sein?
InformationsquelleAutor blindsnowmobile | 2015-02-16
Du musst angemeldet sein, um einen Kommentar abzugeben.
Die Antwort auf die Frage "was ist der beste Weg" ist, hängt stark von Ihrem Nutzungsverhalten von Warteschlangen und was meinen Sie mit "am besten". Da kann ich nicht kommentieren, Fragen noch, ich werde nur versuchen, Ihnen einige mögliche Lösungen.
In jedem Beispiel werde ich davon ausgehen, Austausch ist bereits deklariert.
Threads
Können Sie verbrauchen die Nachrichten von zwei Warteschlangen, die auf separaten hosts in einzelnen Prozess mit
pika
.Du hast Recht - als seine eigene FAQ Staaten,
pika
ist nicht thread-sicher, aber es kann verwendet werden, in multi-threaded-Weise durch die Schaffung von verbindungen zu RabbitMQ-hosts pro thread. Machen Sie dieses Beispiel laufen in threads mitthreading
Modul sieht wie folgt aus:Habe ich erklärt
callback_func
als eine Methode, rein zu verwendenConsumerThread.name
beim drucken message-body. Es könnte genauso gut eine Funktion außerhalb derConsumerThread
Klasse.Prozesse
Alternativ können Sie immer nur das ausführen eines Prozesses mit consumer-code pro Warteschlange, die Sie konsumieren wollen events.
und dann ausführen von:
Wenn die Arbeit, die Sie tun auf Nachrichten aus Warteschlangen ist CPU-heavy und solange die Anzahl der Kerne in Ihrer CPU >= Zahl der Verbraucher, es ist in der Regel besser für die Verwendung dieses Ansatzes - es sei denn, Ihrer Warteschlangen sind leer, die meisten von der Zeit und dem Verbraucher nicht nutzen, CPU-Zeit*.
Async
Andere alternative ist es, beinhalten einige asynchrone framework (zum Beispiel
Verdreht
) und läuft die ganze Sache im single-thread.Können Sie nicht mehr nutzen
BlockingConnection
im asynchronen code; glücklicherweisepika
hat adapter fürTwisted
:Dieser Ansatz wäre sogar noch besser, je mehr Warteschlangen, die Sie verbrauchen würde und die weniger CPU-gebunden sind, die arbeiten ausführen, die von Verbrauchern ist*.
Python 3
Da Sie erwähnt haben
pika
habe ich eingeschränkt, mir zu Python 2.x-basierten Lösungen, weilpika
ist noch nicht portiert.Aber in Fall, dass Sie verschieben wollen, zu >=3.3, eine mögliche option ist die Verwendung
asyncio
mit einem AMQP-Protokoll (das Protokoll, die Sie sprechen mit RabbitMQ) , z.B.asynqp
oderaioamqp
.* - bitte beachten Sie, dass diese sind sehr flach - Tipps- in den meisten Fällen ist die Wahl nicht so offensichtlich; was wird für Sie die beste ist, hängt von Warteschlangen "Sättigung" (Nachrichten/die Zeit) was tun Sie nach Erhalt dieser Nachrichten, in welcher Umgebung Sie führen den Verbraucher in etc.; es gibt keinen Weg, um sicher zu sein, als benchmark alle Implementierungen
Du bist herzlich willkommen. 🙂 Ich habe auch darauf hingewiesen, eine weitere Sache, die in der edit.
InformationsquelleAutor Unit03
Unten ist ein Beispiel, wie ich mit einer rabbitmq-Instanz zu hören 2 queues gleichzeitig:
InformationsquelleAutor NasimBM