Python multiprocessing mit einer Update-Warteschlange und eine output-queue
Wie kann ich ein Skript, ein Python-Multiprozess-mit zwei Warteschlangen wie diese?:
- man als Arbeits-Warteschlange, beginnt mit einigen Daten und, die, je nach den Bedingungen der Funktionen parallelisiert werden, erhält weitere Aufgaben on-the-fly,
- anderen, sammelt Ergebnisse und wird verwendet, um zu notieren Sie das Ergebnis nach der Verarbeitung abgeschlossen ist.
Ich im Grunde brauchen, um einige weitere Aufgaben in der aktiven Warteschlange, je nach dem, was ich in seiner ursprünglichen Elemente. Das Beispiel, das ich post unten ist albern (ich verwandeln könnte der Artikel, wie ich Sie gerne und setzen es direkt in die Ausgabe-Warteschlange), aber seine mechanik sind klar und spiegeln Teil des Konzepts, die ich brauche, sich zu entwickeln.
Hiermit mein Versuch:
import multiprocessing as mp
def worker(working_queue, output_queue):
item = working_queue.get() #I take an item from the working queue
if item % 2 == 0:
output_queue.put(item**2) # If I like it, I do something with it and conserve the result.
else:
working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue
if __name__ == '__main__':
static_input = range(100)
working_q = mp.Queue()
output_q = mp.Queue()
for i in static_input:
working_q.put(i)
processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?).
for proc in processes:
proc.start()
for proc in processes:
proc.join()
for result in iter(output_q.get, None):
print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible.
Dieser nicht zu Ende, noch drucken ohne Ergebnis.
Am Ende des gesamten Prozesses, würde ich mag, um sicherzustellen, dass die Arbeits-Warteschlange leer ist, und dass alle parallelen Funktionen fertig geschrieben haben, um die Ausgabe-Warteschlange vor der später iteriert wird, um die Ergebnisse. Haben Sie Vorschläge, wie man damit es funktioniert?
Du musst angemeldet sein, um einen Kommentar abzugeben.
Haben Sie einen Tippfehler in der Zeile schafft, dass die Prozesse. Es sollte
mp.Process
, nichtmp.process
. Dies ist, was die Ursache der Ausnahme, die Sie erhalten.Außerdem sind Sie nicht Schleifen Sie in Ihre Mitarbeiter, so dass Sie tatsächlich verbrauchen nur ein einzelnes Element aus der queue und beendet sich dann. Ohne zu wissen, mehr über die erforderliche Logik, es ist nicht einfach, um spezifische Beratung, aber Sie werden wahrscheinlich wollen, schließen Sie den Körper Ihres
worker
Funktion innerhalb einerwhile True
Schleife und fügen Sie eine Bedingung in der Körper zu beenden, wenn die Arbeit getan ist.Bitte beachten Sie, dass, wenn Sie nicht hinzufügen eine Bedingung, die explizit die Ausfahrt aus der Schleife Ihrer Mitarbeiter einfach stall, für immer, wenn die Warteschlange leer ist. Sie sollten erwägen, mit Hilfe der so genannten poison pill Technik zu signalisieren, die Arbeiter, die Sie möglicherweise verlassen. Sie finden ein Beispiel und einige nützliche Diskussion in der PyMOTW Artikel auf Die Kommunikation Zwischen Prozessen.
Als für die Anzahl der Prozesse zu verwenden, müssen Sie die benchmark-etwas zu finden, was für Sie arbeitet, aber im Allgemeinen, ein Prozess pro core ist ein guter Ausgangspunkt, wenn Sie Ihre Arbeitsauslastung CPU-gebunden. Wenn Ihre Arbeitsbelastung ist IO gebunden, die Sie haben könnten bessere Ergebnisse mit einer höheren Anzahl von Beschäftigten.
Den folgenden code erreicht die erwarteten Ergebnisse. Es folgt die Anregungen von @tawmas.
Dieser code ermöglicht die Nutzung mehrerer Kerne in einem Prozess, der erfordert, dass die Warteschlange, die Daten-feeds, um die Arbeitnehmer können aktualisiert werden, indem Sie während der Verarbeitung:
try: \n print result \n except Empty: \n break \n
Dies ist der Druck der insgesamt erwarteten Ergebnisse, aber die Ausgabe in der Konsole ist noch beschweren sich über die Ausnahme. Ich glaube ich bin nicht richtig damit umgehen.