python-multiprocessing: einige Funktionen nicht zurückgegeben werden, wenn Sie komplett sind (Warteschlange material zu groß)
Ich bin mit multiprocessing -, Prozess-und Queue.
Ich starte mehrere Funktionen parallel und die meisten Verhalten sich freundlich: Sie beenden, Ihren Ausgang geht in Ihre Warteschlange, und Sie zeigen, wie .is_alive() == False. Aber aus irgendeinem Grund, einige Funktionen sind nicht Verhalten. Sie zeigen immer .is_alive() == True, wird auch nach der letzten Zeile in die Funktion (eine print-Anweisung, die sagen, "Fertig") abgeschlossen ist. Dies geschieht unabhängig von der Menge der Funktionen, die ich starten, auch dort ist nur eine. Wenn nicht parallel laufen, die Funktionen Verhalten sich fein und Rückkehr in der Regel. Was Art von was könnte das problem sein?
Hier ist die generische Funktion verwende ich zum verwalten der jobs. Bin ich aber nicht zeigen, ist die Funktionen, die ich bin übergeben Sie. Sie sind lang, oft matplotlib, manchmal startet einige shell-Befehle, aber ich kann nicht herausfinden, was die versagt haben in common.
def runFunctionsInParallel(listOf_FuncAndArgLists):
"""
Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.
"""
from multiprocessing import Process, Queue
def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
que.put(fff(*theArgs)) #we're putting return value into queue
print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
# We get this far even for "bad" functions
return
queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
for job in jobs: job.start() # Launch them all
import time
from math import sqrt
n=1
while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
n+=1
time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
print('---------------------------------------------------\n')
# I never get to the following line when one of the "bad" functions is running.
for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
# And now, collect all the outputs:
return([queue.get() for queue in queues])
- Kompletter Schuss in der Dunkelheit: Tun das diejenigen, die hängen, einen Wert zurückzugeben? (buchstäblich, haben Sie
return
in Ihnen?) - Alle Funktionen, die guten und die schlechten, die Rückkehr einer einzigen (langen) string.
- Wenn ich allerdings vermeiden den Einsatz von Warteschlangen, das problem geht Weg. So... eine Warteschlange gefüllt worden. Ich kann es sehen, und es sieht gut aus, aber irgendwie ist der job endet nicht, wenn es eine zugeordnete Warteschlange (und nur für "schlechte" Funktion).
- Es kann aufgrund der Größe von dem, was ich bin, indem in der Warteschlange. stackoverflow.com/questions/10028809/... Wenn ich einen großen festen Zeichenkette in der Warteschlange ist, den job nicht zu Ende. Wenn es klein ist, es funktioniert. Ich verstehe nicht, wie man um dieses doch...
- Versuchen Sie, stoßen sich die Größe der Warteschlange. Sie könnten auch Ihre Teilprozesse im wesentlichen imitieren die Warteschlange blockiert/timeout-Funktionalität, indem Sie schreiben an die Warteschlange mit einer expliziten timeout, dann fangen die Vollständige Ausnahme und drucken, dann die Wiederholung der put. Versuchen Sie die Segmentierung der string schreibt, um die Warteschlange zu.
- Das klingt Komplex. Gibt es wirklich keine einfachere Möglichkeit zu Puffern, diese Dinge? Trotzdem, aus meiner Experimentieren, es scheint, dass ich brauche nicht zum aufteilen der Eingangs - (wüsste nicht wie, da die Art der Rückgabewert ist beliebig). Auch wenn die Warteschlange nicht unbedingt voll, wenn Funktionen hingen, nur tun einige .get()s auf das andere Ende zieht Sachen durch und ermöglicht die Prozesse zu beenden. Ich habe jetzt realisiert, dass in meiner vorgeschlagenen Antwort.
- Upping Sie die Größe der Warteschlange löst das gleiche problem. Es ist ein einfaches argument, um die Queue Konstruktor.
- Danke. Ich glaube nicht, ich würde wissen, wie groß, um es im Voraus. Meine Allgemeine-Funktions-Werkzeug erfordert eine allgemeinere Lösung.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Okay, es scheint, dass das Rohr zum füllen der Warteschlange eingesteckt werden, wenn die Ausgabe einer Funktion ist zu groß (meinem groben Verständnis? Dies ist eine ungelöste/geschlossen Fehler? http://bugs.python.org/issue8237). Ich veränderte den code in meiner Frage, so dass es eine gewisse Pufferung (Warteschlangen werden regelmäßig geleert, während Prozesse, die ausgeführt werden), das löst alle meine Probleme. So, jetzt dauert dies eine Sammlung von Aufgaben (Funktionen und deren Argumente), startet Sie, und sammelt die Ausgänge. Ich wünschte, es wäre einfacher /sauberer suchen.
Bearbeiten (2014 Sep; update 2017 Nov: neu geschrieben, für bessere Lesbarkeit): ich aktualisiere den code mit den Erweiterungen, die ich gemacht habe seit. Der neue code (gleiche Funktion, aber bessere Funktionen) ist hier:
https://gitlab.com/cpbl/cpblUtilities/blob/master/parallel.py
Den Aufruf Beschreibung ist auch unten.
multiprocessing
setzt auf Beizen zu übergeben von Objekten zwischen Prozessen (einschließlich Resultate).