In Pika oder RabbitMQ, Wie kann ich überprüfen, ob alle Verbraucher sind derzeit in Anspruch genommen?

Ich würde gerne überprüfen, ob eine Verbraucher/Arbeitnehmer anwesend ist, um zu konsumieren, ein Nachricht ich bin zu senden.

Wenn es keine Arbeiter, würde ich anfangen, einige Arbeiter (sowohl die Konsumenten als auch Publisher sind auf einer einzigen Maschine) und gehen Sie dann über die Veröffentlichung Nachrichten.

Wenn es eine Funktion gibt wie connection.check_if_has_consumers würde ich umsetzen, es etwas wie dieses -

import pika
import workers

# code for publishing to worker queue
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

# if there are no consumers running (would be nice to have such a function)
if not connection.check_if_has_consumers(queue="worker_queue", exchange=""):
    # start the workers in other processes, using python's `multiprocessing`
    workers.start_workers()

# now, publish with no fear of your queues getting filled up
channel.queue_declare(queue="worker_queue", auto_delete=False, durable=True)
channel.basic_publish(exchange="", routing_key="worker_queue", body="rockin",
                            properties=pika.BasicProperties(delivery_mode=2))
connection.close()

Aber ich bin nicht in der Lage, jede Funktion zu finden mit check_if_has_consumers Funktionalität in pika.

Gibt es eine Möglichkeit, dies zu erreichen, verwenden pika? oder vielleicht, indem Sie reden zu Die Kaninchen direkt?

Ich bin nicht ganz sicher, aber ich glaube wirklich, dass RabbitMQ würde sich bewusst sein, die Zahl der Verbraucher abonniert verschiedenen Warteschlangen, denn es macht die Versand Nachrichten zu Ihnen und akzeptiert acks

Ich habe gerade angefangen mit RabbitMQ 3 Stunden... jede Hilfe ist willkommen...

hier ist die workers.py code, den ich schrieb, wenn Ihr keine Hilfe....

import multiprocessing
import pika


def start_workers(num=3):
    """start workers as non-daemon processes"""
    for i in xrange(num):    
        process = WorkerProcess()
        process.start()


class WorkerProcess(multiprocessing.Process):
    """
    worker process that waits infinitly for task msgs and calls
    the `callback` whenever it gets a msg
    """
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.stop_working = multiprocessing.Event()

    def run(self):
        """
        worker method, open a channel through a pika connection and
        start consuming
        """
        connection = pika.BlockingConnection(
                              pika.ConnectionParameters(host='localhost')
                     )
        channel = connection.channel()
        channel.queue_declare(queue='worker_queue', auto_delete=False,
                                                    durable=True)

        # don't give work to one worker guy until he's finished
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='worker_queue')

        # do what `channel.start_consuming()` does but with stopping signal
        while len(channel._consumers) and not self.stop_working.is_set():
            channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
        return 0

    def signal_exit(self):
        """exit when finished with current loop"""
        self.stop_working.set()

    def exit(self):
        """exit worker, blocks until worker is finished and dead"""
        self.signal_exit()
        while self.is_alive(): # checking `is_alive()` on zombies kills them
            time.sleep(1)

    def kill(self):
        """kill now! should not use this, might create problems"""
        self.terminate()
        self.join()


def callback(channel, method, properties, body):
    """pika basic consume callback"""
    print 'GOT:', body
    # do some heavy lifting here
    result = save_to_database(body)
    print 'DONE:', result
    channel.basic_ack(delivery_tag=method.delivery_tag)

EDIT:

Habe ich, um vorwärts zu bewegen, so dass hier ein workaround, das werde ich nehmen, es sei denn, ein besserer Ansatz kommt,

So, RabbitMQ hat diese HTTP-apis, Sie arbeiten nach, die Sie gedreht haben, auf die management-plugin und an der Mitte des HTTP-apis-Seite gibt es

/api/verbindungen - Eine Liste aller offenen verbindungen.

/api/verbindungen/name - Einer einzelnen Verbindung. DELETEing es wird die Verbindung zu schließen.

So, wenn ich meine Verbindung Arbeitnehmer und meine Produziert beide von verschiedenen Verbindung Namen /user, ich werde in der Lage sein zu überprüfen, ob die Worker-Verbindung offen ist... (es könnte Probleme geben, wenn Arbeiter stirbt...)

warten auf eine bessere Lösung...

EDIT:

fand gerade diese in der rabbitmq-docs, aber das wäre hacky zu tun in python:

shobhit@oracle:~$ sudo rabbitmqctl -p vhostname list_queues name consumers
Listing queues ...
worker_queue    0
...done.

so, ich könnte etwas tun,

subprocess.call("echo password|sudo -S rabbitmqctl -p vhostname list_queues name consumers | grep 'worker_queue'")

hacky... hoffe immer noch, pika hat einige python-Funktion, dies zu tun...

Dank,

InformationsquelleAutor Optimus | 2012-10-23
Schreibe einen Kommentar