Teilen viele queues unter-Prozesse in Python
Ich bin mir bewusst multiprocessing.Manager()
und wie es verwendet werden kann zum erstellen von gemeinsam genutzten Objekten, insbesondere von Warteschlangen, die geteilt werden können zwischen den Beschäftigten. Es ist diese Frage, diese Frage, diese Frage und sogar eine meiner eigenen Fragen.
Allerdings brauche ich, um zu definieren, sehr viele Warteschlangen, von denen jede Verknüpfung ein bestimmtes paar von Prozessen. Sagen, dass jedes paar von Prozessen und deren Verknüpfung Warteschlange wird ermittelt, indem der variable key
.
Möchte ich ein Wörterbuch für den Zugriff auf meine Warteschlangen, wenn ich put-und get-Daten. Ich kann nicht diese Arbeit machen. Ich habe versucht eine Reihe von Dingen. Mit multiprocessing
importiert mp
:
Definition ein dict wie for key in all_keys: DICT[key] = mp.Queue
in einer config-Datei, die importiert werden, das multiprocessing-Modul (nennen wir es multi.py
) keinen Fehler zurück, aber die Schlange DICT[key]
wird nicht zwischen den Prozessen, jeder scheint Ihre eigene Kopie der Warteschlange und somit keine Kommunikation erfolgt.
Wenn ich versuchen zu definieren, die DICT
am Anfang der main-multiprocessing-Funktion, definiert die Prozesse und startet Sie, wie
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Queue()
Bekomme ich die Fehlermeldung
RuntimeError: Queue objects should only be shared between processes through
inheritance
Wechsel zu
DICT = mp.Manager().dict()
for key in all_keys:
DICT[key] = mp.Manager().Queue()
macht nur alles schlimmer. Versuchen ähnliche Definitionen an den Kopf multi.py
eher als innerhalb der main-Funktion liefert den gleichen Fehler.
Es muss ein Weg gefunden werden, teilen viele Warteschlangen zwischen Prozessen, ohne Sie explizit angeben, jeweils in den code. Irgendwelche Ideen?
Bearbeiten
Hier ist eine grundlegende schema des Programms:
1 legen Sie das erste Modul, die definiert einige Variablen, Importe multi
startet multi.main()
und lädt ein weiteres Modul startet eine Kaskade von Modul-Lasten-und code-Ausführung. In der Zwischenzeit...
2- multi.main
sieht wie folgt aus:
def main():
manager = mp.Manager()
pool = mp.Pool()
DICT2 = manager.dict()
for key in all_keys:
DICT2[key] = manager.Queue()
proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file
proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,)
Eher als pool
und manager
war ich auch starten von Prozessen mit den folgenden:
mp.Process(target=targ1, args=(DICT[key],))
3 - Die Funktion targ1
nimmt die Eingabe-Daten, die in (sortiert nach key
) von der Haupt-Prozess. Es soll zu passieren, das Ergebnis zu DICT[key]
so targ2
seine Arbeit erledigen kann. Dies ist der Teil, der nicht funktioniert. Es gibt eine beliebige Anzahl von targ1
s targ2
s, etc. und daher eine beliebige Anzahl von Warteschlangen.
4 - Die Ergebnisse dieser Prozesse gesendet werden, um eine Reihe von verschiedenen arrays /pandas dataframes, die auch indiziert durch key
, und ich würde Sie gerne erreichbar beliebige Prozesse, auch solche, startete in einem anderen Modul. Ich muß noch schreiben, dieser Teil und es könnte eine andere Frage. (Ich erwähne es hier, weil die Antwort auf die 3 oben genannten könnten auch lösen 4 schön.)
- Wie werden Sie das starten von child-Prozessen? Können Sie instanziieren die
Queue
vor dem Start der Prozesse? Sind die Paare von Prozessen, Sie sprechen über potentiell zwei Kind-Prozesse, oder ist es immer ein Eltern-Kind-Beziehung? - Hey Mann, ich habe ein edit, das beschreibt mein Programm im kurzen überblick. Ich bin instanziieren
Queue
vor dem Start der Prozesse. Ich bin nicht sicher, wie Sie Sie zu unterscheiden, ein Kind von parent-Prozess, so kann nicht die Antwort auf Ihre Letzte Frage... - Warum ist
DICT2
einemanager.dict()
in deinem code oben? Es sieht nicht wie Sie tatsächlich versuchen, passieren dieDICT2
Objekt für jedes Kind. Könnte es nicht einfach sein, eine regelmäßige dict mitmp.Manager().Queue()
Instanzen? - Ich habe versucht
DICT2={}
früher (nicht nur inmulti.main()
aber überall, wo ich denken könnte, könnte funktionieren), bekomme ich die gleicheRunTime
Fehler. Was meinst du mit 'pass in dieDICT2
Objekt zu Kindern"? Ist nicht das, was ich Tue, macht es ein argument vontarg2
? - Hey, also ich denke, dass ich einige Fortschritte gemacht, inspiriert durch deinen zweiten Kommentar.
DICT2[key]
gedacht, um gelesen werdentarg2
, und geschrieben in einer Funktion, dietarg1
Berufung war. Aber wenn ich mirDICT2[key]
intarg1
als argument (nebenDICT1[key]
), das Verfahren funktioniert undtarg2
war in der Lage das Lesen von Daten aus der WarteschlangeDICT2[key]
. Hat dieses Abkommen mit Ihrer Erfahrung und Ihrem Sinn machen? Ist es das, was du meintest von Eltern/Kinder-Prozessen (proc_1
ist die Muttergesellschaft vonproc_2
)? - Ich denke, es immer noch nicht verstehen, was Sie zu tun versuchen. Was gespeichert wird, in
DICT1
genau? Auch sind Sie nicht bestehen entweder dict-auf die Kind-Prozesse. Wenn Sie das tunpool.apply_async(targ1,(DICT1[key],)
Sie sind nicht vorbeiDICT1
um das Kind, das Sie vorbei sind, was Objekt ist gespeichert inDICT1[key]
. Auchproc_1
undproc_2
sind Geschwister; Sie sind beide Kinder von deinem Haupt-script. Ich bin mir auch nicht sicher, warumproc_1
muss, um Informationen zuproc_2
, warum nicht ein einzelner Prozess, der sich um alle arbeiten, die getan werden muss, und haben eine Reihe von diese identische Prozesse, die parallel laufen? - Sorry für die Verspätung, ich habe ziemlich zugeschlagen. Ich habe den code-arbeiten durch die übergabe
DICT[key]
als Argumente zu den Verfahren, wie ich schon in meinem vorherigen Kommentar. Mein Beispiel-code ist eher obskuren, weil ich versucht habe (mit wenig Erfolg) zu extrahieren, die Essenz von meinen Schwierigkeiten. Die anderen Aspekte des Kodex sind nicht wichtig. Ich verstehe den Unterschied zwischen untergeordneten und übergeordneten Prozessen, sehr intuitiv. Danke.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Es klingt wie Sie Ihre Probleme begannen, als Sie versuchte eine
multiprocessing.Queue()
durch übergeben es als argument. Sie können dies umgehen, durch Schaffung einer verwaltete Warteschlange statt:Wenn Sie ein manager, um es zu schaffen, Sie sind die Speicherung und Weitergabe um eine proxy der Warteschlange, anstatt die queue selbst, so dass selbst wenn das Objekt, das Sie passieren, um Ihren worker-Prozesse ist kopiert, es wird immer noch an der gleichen zugrunde liegenden Daten-Struktur: die Warteschlange. Es ist sehr ähnlich wie (in-Konzept) zu Zeigern in C/C++. Wenn Sie erstellen Sie Ihre Warteschlangen diese Weise werden Sie in der Lage, Sie zu übergeben, wenn Sie starten Sie einen Arbeitsprozess.
Da können Sie Warteschlangen, um jetzt brauchen Sie nicht mehr über Ihr Wörterbuch verwaltet werden. Ein normales Wörterbuch in main gespeichert werden alle Zuordnungen, und geben nur Ihren worker-Prozesse, die Warteschlangen, die Sie benötigen, so dass Sie nicht brauchen Zugang zu allen Zuordnungen.
Ich geschrieben habe, ein Beispiel von diesem hier. Es sieht aus wie Sie sind übergeben von Objekten zwischen Ihrer Arbeitnehmer, so dass das, was getan hier. Sich vorstellen, wir haben zwei Stufen der Verarbeitung, und die Daten sowohl beginnt und endet bei der Kontrolle der
main
. Schauen Sie, wie können wir die Warteschlangen, die eine Verbindung der Arbeiter wie eine pipeline, sondern indem Sie Sie nur Warteschlangen, die Sie benötigen, es gibt keine Notwendigkeit für Sie zu wissen, über alle Zuweisungen:Den code erzeugt diese Ausgabe:
Ich nicht beinhalten ein Beispiel der Speicherung der Warteschlangen oder
AsyncResults
Objekte in Wörterbüchern, weil ich noch nicht ganz verstanden, wie dein Programm funktionieren soll. Aber jetzt, können Sie übergeben Sie Ihre queues frei, Sie können bauen Sie Ihr Wörterbuch zum speichern der Warteschlange/Prozess-mappings, wie gebraucht.In der Tat, wenn Sie wirklich tun, bauen eine pipeline zwischen mehreren Arbeitnehmern, die Sie gar nicht benötigen, um einen Verweis auf die "inter-worker" - Warteschlangen in
main
. Erstellen von Warteschlangen, übergeben Sie an Ihre Arbeitnehmer, dann nur behalten, Verweise auf Warteschlangenmain
verwenden. Ich würde definitiv empfehlen, versuchen zu lassen alte Warteschlangen, Müll gesammelt, so schnell wie möglich, wenn Sie wirklich tun haben "eine beliebige Anzahl" von Warteschlangen.mp.Manager()
Definitionen explizit, und stattdessen setzen viele Warteschlangen in einem dict. @dano half mir lösen, was ich falsch machte in den Kommentaren oben. Ich muss etwas fehlt in deinem Beispiel, weil ich nicht sehe den Punkt, der mit einem verwalteten Warteschlange. Wenn ich die Stelle des verwalteten queues mit Stammkunden und nehmen sich die manager und pool und nur normale multiprocessing syntax, dein Beispiel funktioniert auch. Was ist der manger hinzufügen?multiprocessing.Process
Klasse, Sie können Sie weitergeben normalen Warteschlangen. Ich konzentrierte mich darauf, die Warteschlangen, um die Arbeit für Sie mit einem multiprocessing-pool, weil es Klang wie das ist, was Sie benutzen wollte. Wenn Sie wollen stick mit den pools, die Sie benötigen, um verwaltete Warteschlangen übergeben Sie als Argumente.mp.Process()
seitpool
gab mir so viel Mühe. Eine Sache habe ich bemerkt ist, dass, wenn ich eine gemeinsame dict mit dem manager, er windet sich wirklich verlangsamt meinen app. Ich habe keine Zeit, aber die Verlangsamung kann ein Faktor von 1000 oder mehr. Ist das zu erwarten, wenn, sagen wir, 10 Prozesse, die alle schreiben zu einem gemeinsamen dict etwa 10 mal pro Sekunde?pool
und erwähnt das verwaltete Warteschlangen, ich bin akzeptieren Sie Ihre Antwort. Hoffentlich jemand Lesen wird dieses Zeug bekommen etwas Gebrauch aus ihm heraus.