Teilen viele queues unter-Prozesse in Python

Ich bin mir bewusst multiprocessing.Manager() und wie es verwendet werden kann zum erstellen von gemeinsam genutzten Objekten, insbesondere von Warteschlangen, die geteilt werden können zwischen den Beschäftigten. Es ist diese Frage, diese Frage, diese Frage und sogar eine meiner eigenen Fragen.

Allerdings brauche ich, um zu definieren, sehr viele Warteschlangen, von denen jede Verknüpfung ein bestimmtes paar von Prozessen. Sagen, dass jedes paar von Prozessen und deren Verknüpfung Warteschlange wird ermittelt, indem der variable key.

Möchte ich ein Wörterbuch für den Zugriff auf meine Warteschlangen, wenn ich put-und get-Daten. Ich kann nicht diese Arbeit machen. Ich habe versucht eine Reihe von Dingen. Mit multiprocessing importiert mp:

Definition ein dict wie for key in all_keys: DICT[key] = mp.Queue in einer config-Datei, die importiert werden, das multiprocessing-Modul (nennen wir es multi.py) keinen Fehler zurück, aber die Schlange DICT[key] wird nicht zwischen den Prozessen, jeder scheint Ihre eigene Kopie der Warteschlange und somit keine Kommunikation erfolgt.

Wenn ich versuchen zu definieren, die DICT am Anfang der main-multiprocessing-Funktion, definiert die Prozesse und startet Sie, wie

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Queue()

Bekomme ich die Fehlermeldung

RuntimeError: Queue objects should only be shared between processes through
 inheritance

Wechsel zu

DICT = mp.Manager().dict()    
for key in all_keys:
    DICT[key] = mp.Manager().Queue()

macht nur alles schlimmer. Versuchen ähnliche Definitionen an den Kopf multi.py eher als innerhalb der main-Funktion liefert den gleichen Fehler.

Es muss ein Weg gefunden werden, teilen viele Warteschlangen zwischen Prozessen, ohne Sie explizit angeben, jeweils in den code. Irgendwelche Ideen?

Bearbeiten

Hier ist eine grundlegende schema des Programms:

1 legen Sie das erste Modul, die definiert einige Variablen, Importe multi startet multi.main() und lädt ein weiteres Modul startet eine Kaskade von Modul-Lasten-und code-Ausführung. In der Zwischenzeit...

2- multi.main sieht wie folgt aus:

def main():
    manager = mp.Manager()
    pool = mp.Pool()
    DICT2 = manager.dict()

    for key in all_keys:
        DICT2[key] = manager.Queue()
        proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
        proc_2 =  pool.apply_async(targ2,(DICT2[key], otherargs,) 

Eher als pool und manager war ich auch starten von Prozessen mit den folgenden:

mp.Process(target=targ1, args=(DICT[key],))

3 - Die Funktion targ1 nimmt die Eingabe-Daten, die in (sortiert nach key) von der Haupt-Prozess. Es soll zu passieren, das Ergebnis zu DICT[key] so targ2 seine Arbeit erledigen kann. Dies ist der Teil, der nicht funktioniert. Es gibt eine beliebige Anzahl von targ1s targ2s, etc. und daher eine beliebige Anzahl von Warteschlangen.

4 - Die Ergebnisse dieser Prozesse gesendet werden, um eine Reihe von verschiedenen arrays /pandas dataframes, die auch indiziert durch key, und ich würde Sie gerne erreichbar beliebige Prozesse, auch solche, startete in einem anderen Modul. Ich muß noch schreiben, dieser Teil und es könnte eine andere Frage. (Ich erwähne es hier, weil die Antwort auf die 3 oben genannten könnten auch lösen 4 schön.)

  • Wie werden Sie das starten von child-Prozessen? Können Sie instanziieren die Queue vor dem Start der Prozesse? Sind die Paare von Prozessen, Sie sprechen über potentiell zwei Kind-Prozesse, oder ist es immer ein Eltern-Kind-Beziehung?
  • Hey Mann, ich habe ein edit, das beschreibt mein Programm im kurzen überblick. Ich bin instanziieren Queue vor dem Start der Prozesse. Ich bin nicht sicher, wie Sie Sie zu unterscheiden, ein Kind von parent-Prozess, so kann nicht die Antwort auf Ihre Letzte Frage...
  • Warum ist DICT2 eine manager.dict() in deinem code oben? Es sieht nicht wie Sie tatsächlich versuchen, passieren die DICT2 Objekt für jedes Kind. Könnte es nicht einfach sein, eine regelmäßige dict mit mp.Manager().Queue() Instanzen?
  • Ich habe versucht DICT2={} früher (nicht nur in multi.main() aber überall, wo ich denken könnte, könnte funktionieren), bekomme ich die gleiche RunTime Fehler. Was meinst du mit 'pass in die DICT2 Objekt zu Kindern"? Ist nicht das, was ich Tue, macht es ein argument von targ2?
  • Hey, also ich denke, dass ich einige Fortschritte gemacht, inspiriert durch deinen zweiten Kommentar. DICT2[key] gedacht, um gelesen werden targ2, und geschrieben in einer Funktion, die targ1 Berufung war. Aber wenn ich mir DICT2[key] in targ1 als argument (neben DICT1[key]), das Verfahren funktioniert und targ2 war in der Lage das Lesen von Daten aus der Warteschlange DICT2[key]. Hat dieses Abkommen mit Ihrer Erfahrung und Ihrem Sinn machen? Ist es das, was du meintest von Eltern/Kinder-Prozessen (proc_1 ist die Muttergesellschaft von proc_2)?
  • Ich denke, es immer noch nicht verstehen, was Sie zu tun versuchen. Was gespeichert wird, in DICT1 genau? Auch sind Sie nicht bestehen entweder dict-auf die Kind-Prozesse. Wenn Sie das tun pool.apply_async(targ1,(DICT1[key],) Sie sind nicht vorbei DICT1 um das Kind, das Sie vorbei sind, was Objekt ist gespeichert in DICT1[key]. Auch proc_1 und proc_2 sind Geschwister; Sie sind beide Kinder von deinem Haupt-script. Ich bin mir auch nicht sicher, warum proc_1 muss, um Informationen zu proc_2, warum nicht ein einzelner Prozess, der sich um alle arbeiten, die getan werden muss, und haben eine Reihe von diese identische Prozesse, die parallel laufen?
  • Sorry für die Verspätung, ich habe ziemlich zugeschlagen. Ich habe den code-arbeiten durch die übergabe DICT[key] als Argumente zu den Verfahren, wie ich schon in meinem vorherigen Kommentar. Mein Beispiel-code ist eher obskuren, weil ich versucht habe (mit wenig Erfolg) zu extrahieren, die Essenz von meinen Schwierigkeiten. Die anderen Aspekte des Kodex sind nicht wichtig. Ich verstehe den Unterschied zwischen untergeordneten und übergeordneten Prozessen, sehr intuitiv. Danke.

InformationsquelleAutor Wapiti | 2015-04-30
Schreibe einen Kommentar