Java ExecutorService: awaitTermination aller rekursiv erstellte Aufgaben
Benutze ich ein ExecutorService
zum ausführen einer Aufgabe. Diese Aufgabe kann rekursiv erzeugen andere Aufgaben, die einzureichen sind die gleichen ExecutorService
und diese untergeordneten Aufgaben können das auch.
Ich habe jetzt das problem, dass ich warten möchte, bis alle Aufgaben erledigt sind (das heißt, dass alle tasks fertig sind und Sie nicht Einreichen), bevor ich weiter.
Kann ich nicht nennen ExecutorService.shutdown()
im Haupt-thread, da dies verhindert, dass neue Aufgaben akzeptiert werden, die von der ExecutorService
.
Und Ruft ExecutorService.awaitTermination()
scheint, nichts zu tun, wenn shutdown
wurde noch nicht genannt.
So bin ich irgendwie hier hängengeblieben. Es kann nicht sein, dass hart für die ExecutorService
zu sehen, dass alle Arbeiter frei sind, kann es? Den nur unelegant Lösung könnte ich kommen mit, um direkt mit einem ThreadPoolExecutor
und Abfrage seiner getPoolSize()
jeder einmal in eine Weile. Gibt es wirklich keine bessere Art und Weise tun Sie das?
InformationsquelleAutor Christoph | 2011-02-10
Du musst angemeldet sein, um einen Kommentar abzugeben.
Wenn die Anzahl der Aufgaben in der Struktur der rekursiven Aufgaben ist zunächst nicht bekannt, vielleicht die einfachste Möglichkeit wäre, eine eigene Synchronisierung implementieren primitive, eine Art "inverse semaphore", und teilen Sie Sie zu Ihren Aufgaben. Vor dem Absenden jeder Aufgabe, die Sie Inkrementieren einen Wert, wenn die Aufgabe abgeschlossen ist, dekrementiert den Wert, und warten Sie, bis der Wert 0 ist.
Implementierung als separate primitive explizit genannt von Aufgaben entkoppelt diese Logik aus dem thread-pool-Implementierung, und ermöglicht Ihnen das senden mehrere voneinander unabhängige Bäume von rekursiven Aufgaben in der gleichen pool.
Etwas wie dieses:
Beachten Sie, dass
taskCompleted()
sollte aufgerufen werden, innerhalb einesfinally
block, um es immun gegen mögliche Ausnahmen.Beachten Sie auch, dass
beforeSubmit()
sollte aufgerufen werden, durch die Einreichung der thread vor die Aufgabe gestellt wird, nicht durch die Aufgabe selbst, zu vermeiden "falschen Abschluss", wenn alte Aufgaben abgeschlossen und neue noch nicht angefangen.EDIT: Wichtiges problem in der Verwendung der Muster fixiert.
Mit AtomicInteger Sie kann nicht warten, für die Fertigstellung ohne busy waiting.
Es ist ein Tippfehler, du tust lock-- statt--Wert
oh peter bearbeitet hat, um es zu korrigieren
dass nicht helfen, diese Antwort, obwohl becauase es müsste Synchronisierung für das warten.
InformationsquelleAutor axtavt
Dies ist wirklich ein Idealer Kandidat für einen Phaser-Effekt. Java 7 kommt mit dieser neuen Klasse. Seine flexible CountdonwLatch/CyclicBarrier. Sie erhalten eine stabile version auf JSR 166-Interest-Site.
So, wie es ist ein flexibler CountdownLatch/CyclicBarrier ist, weil es in der Lage, nicht nur die Unterstützung einer unbekannten Anzahl von Parteien (threads), aber es ist auch wiederverwendbar (das ist, wo die phase Teil kommt in)
Für jede Aufgabe, die Sie Einreichen, Sie würden registrieren, wenn die Aufgabe abgeschlossen ist, wenn Sie ankommen. Diese kann getan werden, rekursiv.
Edit: Danke @depthofreality für den Hinweis auf die race-condition in meinem vorherigen Beispiel. Ich bin eine Aktualisierung es so, dass die Ausführung thread wartet nur noch auf vorher von der aktuellen phase, wie Sie Blöcke für die rekursive Funktion zu vervollständigen.
Phase Anzahl nicht Reise, bis die Anzahl der
arrive
s ==register
s. Da vor jedem rekursiven Aufruf, ruftregister
eine phase Inkrement wird geschehen, wenn alle Aufrufe abgeschlossen sind.Ich bin mir bewusst, es wurde geschrieben vor langer Zeit. Immer noch Frage ich mich, ob es die race-Bedingung hier. Nicht doWork() abgeschlossen ist, bevor recursiveRunnable registriert mit phaser?
Das ist ein großer Punkt. Du hast Recht es wäre sicherlich ein Rennen hier (muss es übersehen haben, als ich dieses Beispiel schnell zusammen). Ich werde diese jetzt aktualisieren.
vielen Dank für die Klarstellung und Korrektur. Ich denke, es ist ein anderes Problem jetzt. Einige Parteien ankommen kann ohne registriert zu sein.
Ich betrachtet, aber das wäre nicht der Fall. Die erste Partei zu registrieren, erfolgt in
doWork
, da die phaser unten, nichtarriveAndAwaitAdvance
dierecursiveRunnable
braucht, um anzukommen (was er macht). Beachten Sie auch die runnableregister
s vor der Ausführung der ExecutorServiceInformationsquelleAutor John Vint
Wow, Ihr seid schnell:)
Danke für alle Anregungen. Futures nicht leicht zu integrieren, mit meinem Modell, denn ich weiß nicht, wie viele runnables sind geplant vorher. Also, wenn ich halten eine übergeordnete Aufgabe, lebendig, nur zu warten, bis es die rekursive Kind Aufgaben zu erledigen, habe ich eine Menge Müll herumliegen.
Ich mein problem gelöst mit der AtomicInteger Vorschlag. Im Grunde ist mir untergeordnet ThreadPoolExecutor und erhöhen Sie den Zähler für Aufrufe von execute() und decrement auf Anrufe zu afterExecute(). Wenn der Zähler wird 0, die ich Aufruf von shutdown(). Dies scheint zu funktionieren für meine Probleme, nicht sicher, ob das generell eine gute Möglichkeit, das zu tun. Vor allem, ich gehe davon aus, dass Sie nur mit execute() zum hinzufügen von Runnables.
Als eine Seite Knoten: ich habe zunächst versucht, zu überprüfen, in afterExecute() die Nummer des Runnables in der Warteschlange und die Zahl der Arbeitnehmer, die aktiv sind, und Herunterfahren, wenn diese 0 sind; aber das hat nicht funktioniert, weil nicht alle Runnables, zeigte sich in der Warteschlange und die getActiveCount() hat nicht das getan was ich erwartet habe.
Jedenfalls, hier ist meine Lösung: (wenn jemand findet ernste Probleme mit diesem, lassen Sie es mich bitte wissen:)
Sie haben das problem, dass der Testamentsvollstrecker nicht wissen, Wann Sie aufhören Zugabe Aufgaben. Wenn Sie zu irgendeinem Zeitpunkt alle Ihre Aufgaben zu beenden, bevor Sie fertig sind das hinzufügen von Ihnen, diese Aufgaben werden abgelehnt, wie der pool wurde heruntergefahren.
Richtig, aber es gibt eine triviale Lösung: erhöhen Sie den Zähler zunächst an und verringern ihn, wenn Sie fertig mit dem hinzufügen. Oder verwenden Sie ein "Zusatz-Aufgabe" für das hinzufügen aller Aufgaben.
InformationsquelleAutor Christoph
Könnte man erstellen Sie Ihre eigenen thread-pool erstreckt sich ThreadPoolExecutor. Sie möchten wissen, wenn eine Aufgabe abgegeben wurde und wenn es vollendet.
InformationsquelleAutor Peter Lawrey
Verwenden Zukunft für Ihre Aufgaben (anstelle der Einreichung
Runnable
's), eine callback aktualisiert den Zustand, wenn es fertig ist, so können Sie Zukunft.isDone zu verfolgen, den Zustand aller Ihrer Aufgaben.Wenn er sagt callback-er bedeutet, dass der Wert, den Sie der Rückkehr aus der Methode call
Tun Sie, was ich meine ist, dass die "fertig" - flag gesetzt ist (über ein callback). Ich habe also die Antwort, um diese deutlicher.
ok, so hat er noch zu Umfrage isDone? Will nur sicher. @John V. - die call-Methode auf ein callable gibt das Ergebnis zurück, wartet darauf, dass es, wenn nötig. Sie bekommen eine Zukunft zurück, wenn Sie uns ein Callable. Vielleicht ist unsere definition von callback unterscheidet.
Nein, du hast Recht SB vermisse ich Lesen deiner Frage auf, was er meinte.
InformationsquelleAutor Joel
(mea culpa: es ist ein 'bisschen' hinter meinem Schlafenszeit 😉 aber hier ist ein Erster Versuch, ein dynamisches latch):
Dieser Riegel stellt eine neue Methode join (), um die vorhandenen (geklont) CountDownLatch-API, die verwendet wird, indem Sie Aufgaben, um zu signalisieren Ihrem Eintritt in die größere Aufgabe der Gruppe.
Den Riegel pass um von übergeordneten Aufgaben an untergeordnete Aufgabe. Jede Aufgabe, pro Suraj Muster, ersten 'join()' den Riegel, der seine Aufgabe(), und dann countDown().
Zur Behandlung von Situationen, wo der main-thread startet den task-Gruppe und dann sofort erwartet() -- vor der Aufgabe, die Fäden haben eine chance hatte, sich selbst () --
topLatch
verwendet wird int innerenSync
Klasse. Dies ist ein latch, das wird nach unten gezählt, auf jedem join(); nur der erste countdown ist natürlich groß, da alle nachfolgenden NOPs für.Die erste Implementierung oben führt zu einem semantischen Falten des möglichen, da die tryAcquiredShared(int) ist nicht zugelassen werden, wirft eine InterruptedException, aber dann müssen wir befassen sich mit der interrupt-auf das warten auf die topLatch.
Ist dies eine Verbesserung gegenüber OP ' s eigene Lösung mit Atomarer Zähler? Ich würde sagen, wahrscheinlich nicht, wennn er ist beharrlich auf der Vollzieher, aber es ist, glaube ich, eine gleichermaßen gültige alternative Ansatz durch die AQS in diesem Fall, und ist verwendbar mit generischen threads sowie.
Crit Weg fellow Hacker.
InformationsquelleAutor alphazero
Wenn Sie verwenden möchten JSR166y Klassen - z.B. Phaser oder Fork/Join - entweder, die vielleicht Arbeit für Sie, können Sie immer laden Sie die Java 6-backport aus: http://gee.cs.oswego.edu/dl/concurrency-interest/ und verwenden es als Grundlage eher als das schreiben einer komplett homebrew-Lösung. Dann, wenn 7 kommt können Sie einfach die Abhängigkeit von den portieren und ein paar Paket-Namen.
(Full disclosure: Wir haben mit der LinkedTransferQueue in prod für eine Weile jetzt. Keine Probleme)
InformationsquelleAutor kittylyst
Muss ich sagen, dass Lösungen, die oben beschrieben problem mit den rekursiven aufrufen von task und warten Sie, bis Ende Unterordnung Aufgaben nicht erfüllen mich. Da ist meine Lösung, inspiriert durch die original-Dokumentation von Oracle gibt es: CountDownLatch und Beispiel: Human resources CountDownLatch.
Den ersten roten Faden im Prozess im Instanz der Klasse HRManagerCompact hat warten Riegel für die zwei Tochter-threads, die darauf warten Riegel für den späteren 2 Tochter threads... usw.
Natürlich", "latch" eingestellt werden können, auf der anderen Wert als 2 (im Konstruktor von CountDownLatch), sowie die Anzahl der runnable-Objekte hergestellt werden können, die in iteration also ArrayList, aber es muss übereinstimmen (Nummer des count-downs gleich sein müssen, die parameter in CountDownLatch Konstruktor).
Vorsichtig sein, die Anzahl der Verriegelungen erhöht sich exponentiell nach Einschränkung Bedingung:
'Ebene.get() < 2', sowie die Anzahl der Objekte. 1, 2, 4, 8, 16... und latches 0, 1, 2, 4... Wie Sie sehen können, für die vier Ebenen (Ebene.get() < 4) es werden 15 wartenden threads und 7 Riegel in der Zeit, als peak-16 threads ausgeführt werden.
Möglich kommentierte Ausgabe (mit einiger Wahrscheinlichkeit):
InformationsquelleAutor
Verwenden CountDownLatch.
Übergeben Sie die CountDownLatch-Objekts, um jede Ihrer Aufgaben und code werden Ihre Aufgaben etwas wie unten.
In der Erwägung, dass der thread muss warten sollte führen Sie den folgenden code:
Aber natürlich, dies setzt Voraus, Sie bereits wissen, die Anzahl der untergeordneten Aufgaben, so dass Sie könnte Initialisierung des latch zählen.
CountDownLatch hat ein problem, Sie können es nicht zurücksetzen der Zähler, sobald es erstellt wurde. Ich nehme an, er kennt Sie nicht, die Anzahl der Aufgaben, die das system aufrufen.
ja...ich weiß, bin ich davon ausgegangen, dass er wissen konnte die Anzahl der Aufgaben vor der hand
Tatsächlich, ich don T wissen, die Anzahl der Aufgaben, die im Voraus:) trotzdem Danke!
InformationsquelleAutor Suraj Chandran
Verwenden Sie
shutdown() ,
awaitTermination()and shutdownNow()
Methoden in der richtigen Reihenfolge.shutdown()
: Initiiert einen geordneten shutdown, in dem zuvor eingereichten Aufgaben werden ausgeführt, aber es werden keine neuen Aufgaben angenommen werden.awaitTermination()
:Blockiert, bis alle Aufgaben abgeschlossen haben, der Ausführung nach eine shutdown-Anforderung, oder das timeout Auftritt, oder der aktuelle thread unterbrochen wird, je nachdem, was zuerst Eintritt.shutdownNow()
: Versuche zu stoppen, alle aktiv ausgeführten Aufgaben, Stoppt die Verarbeitung der wartenden Aufgaben, und gibt eine Liste der Aufgaben, die auf Ihre Hinrichtung warteten.Empfohlene Methode aus der oracle-Dokumentation Seite ExecutorService:
Kann man ersetzen, wenn die Bedingung bei while-Bedingung im Fall von langer Dauer in der Erledigung von Aufgaben wie folgt:
Ändern
Zu
Finden Sie andere alternativen (außer
join()
, die verwendet werden kann, mit der eigenständigen thread ), in :warten Sie, bis alle threads Ihre Arbeit beenden im java
InformationsquelleAutor Ravindra babu