python-multiprocessing: einige Funktionen nicht zurückgegeben werden, wenn Sie komplett sind (Warteschlange material zu groß)

Ich bin mit multiprocessing -, Prozess-und Queue.
Ich starte mehrere Funktionen parallel und die meisten Verhalten sich freundlich: Sie beenden, Ihren Ausgang geht in Ihre Warteschlange, und Sie zeigen, wie .is_alive() == False. Aber aus irgendeinem Grund, einige Funktionen sind nicht Verhalten. Sie zeigen immer .is_alive() == True, wird auch nach der letzten Zeile in die Funktion (eine print-Anweisung, die sagen, "Fertig") abgeschlossen ist. Dies geschieht unabhängig von der Menge der Funktionen, die ich starten, auch dort ist nur eine. Wenn nicht parallel laufen, die Funktionen Verhalten sich fein und Rückkehr in der Regel. Was Art von was könnte das problem sein?

Hier ist die generische Funktion verwende ich zum verwalten der jobs. Bin ich aber nicht zeigen, ist die Funktionen, die ich bin übergeben Sie. Sie sind lang, oft matplotlib, manchmal startet einige shell-Befehle, aber ich kann nicht herausfinden, was die versagt haben in common.

def  runFunctionsInParallel(listOf_FuncAndArgLists):
    """
    Take a list of lists like [function, arg1, arg2, ...]. Run those functions in parallel, wait for them all to finish, and return the list of their return values, in order.   
    """
    from multiprocessing import Process, Queue

    def storeOutputFFF(fff,theArgs,que): #add a argument to function for assigning a queue
        print 'MULTIPROCESSING: Launching %s in parallel '%fff.func_name
        que.put(fff(*theArgs)) #we're putting return value into queue
        print 'MULTIPROCESSING: Finished %s in parallel! '%fff.func_name
        # We get this far even for "bad" functions
        return

    queues=[Queue() for fff in listOf_FuncAndArgLists] #create a queue object for each function
    jobs = [Process(target=storeOutputFFF,args=[funcArgs[0],funcArgs[1:],queues[iii]]) for iii,funcArgs in enumerate(listOf_FuncAndArgLists)]
    for job in jobs: job.start() # Launch them all
    import time
    from math import sqrt
    n=1
    while any([jj.is_alive() for jj in jobs]): # debugging section shows progress updates
        n+=1
        time.sleep(5+sqrt(n)) # Wait a while before next update. Slow down updates for really long runs.
        print('\n---------------------------------------------------\n'+ '\t'.join(['alive?','Job','exitcode','Func',])+ '\n---------------------------------------------------')
        print('\n'.join(['%s:\t%s:\t%s:\t%s'%(job.is_alive()*'Yes',job.name,job.exitcode,listOf_FuncAndArgLists[ii][0].func_name) for ii,job in enumerate(jobs)]))
        print('---------------------------------------------------\n')
    # I never get to the following line when one of the "bad" functions is running.
    for job in jobs: job.join() # Wait for them all to finish... Hm, Is this needed to get at the Queues?
    # And now, collect all the outputs:
    return([queue.get() for queue in queues])
  • Kompletter Schuss in der Dunkelheit: Tun das diejenigen, die hängen, einen Wert zurückzugeben? (buchstäblich, haben Sie return in Ihnen?)
  • Alle Funktionen, die guten und die schlechten, die Rückkehr einer einzigen (langen) string.
  • Wenn ich allerdings vermeiden den Einsatz von Warteschlangen, das problem geht Weg. So... eine Warteschlange gefüllt worden. Ich kann es sehen, und es sieht gut aus, aber irgendwie ist der job endet nicht, wenn es eine zugeordnete Warteschlange (und nur für "schlechte" Funktion).
  • Es kann aufgrund der Größe von dem, was ich bin, indem in der Warteschlange. stackoverflow.com/questions/10028809/... Wenn ich einen großen festen Zeichenkette in der Warteschlange ist, den job nicht zu Ende. Wenn es klein ist, es funktioniert. Ich verstehe nicht, wie man um dieses doch...
  • Versuchen Sie, stoßen sich die Größe der Warteschlange. Sie könnten auch Ihre Teilprozesse im wesentlichen imitieren die Warteschlange blockiert/timeout-Funktionalität, indem Sie schreiben an die Warteschlange mit einer expliziten timeout, dann fangen die Vollständige Ausnahme und drucken, dann die Wiederholung der put. Versuchen Sie die Segmentierung der string schreibt, um die Warteschlange zu.
  • Das klingt Komplex. Gibt es wirklich keine einfachere Möglichkeit zu Puffern, diese Dinge? Trotzdem, aus meiner Experimentieren, es scheint, dass ich brauche nicht zum aufteilen der Eingangs - (wüsste nicht wie, da die Art der Rückgabewert ist beliebig). Auch wenn die Warteschlange nicht unbedingt voll, wenn Funktionen hingen, nur tun einige .get()s auf das andere Ende zieht Sachen durch und ermöglicht die Prozesse zu beenden. Ich habe jetzt realisiert, dass in meiner vorgeschlagenen Antwort.
  • Upping Sie die Größe der Warteschlange löst das gleiche problem. Es ist ein einfaches argument, um die Queue Konstruktor.
  • Danke. Ich glaube nicht, ich würde wissen, wie groß, um es im Voraus. Meine Allgemeine-Funktions-Werkzeug erfordert eine allgemeinere Lösung.

InformationsquelleAutor CPBL | 2012-08-07
Schreibe einen Kommentar