Ist das hinzufügen von Aufgaben zu BlockingQueue von ThreadPoolExecutor ratsam?
In der JavaDoc für ThreadPoolExecutor ist unklar, ob es akzeptabel ist, fügen Sie die Aufgaben direkt auf die BlockingQueue
sichern der Vollstrecker. Die docs sagen aufrufen executor.getQueue()
"soll in Erster Linie für debugging und monitoring".
Ich Baue eine ThreadPoolExecutor
mit meinen eigenen BlockingQueue
. Ich behalten Sie einen Verweis auf die Warteschlange, so kann ich Aufgaben hinzufügen, um es direkt. Die gleiche Warteschlange zurückgegeben wird getQueue()
so nehme ich die Ermahnung in getQueue()
gilt für einen Verweis auf die Unterstützung Warteschlange erworben durch meine bedeutet.
Beispiel
Allgemeinen Muster des Codes:
int n = ...; //number of threads
queue = new ArrayBlockingQueue<Runnable>(queueSize);
executor = new ThreadPoolExecutor(n, n, 1, TimeUnit.HOURS, queue);
executor.prestartAllCoreThreads();
//...
while (...) {
Runnable job = ...;
queue.offer(job, 1, TimeUnit.HOURS);
}
while (jobsOutstanding.get() != 0) {
try {
Thread.sleep(...);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
executor.shutdownNow();
queue.offer()
vs executor.execute()
Als ich verstehen es, die typische Verwendung ist das hinzufügen von Aufgaben über executor.execute()
. Der Ansatz in meinem obigen Beispiel hat den Vorteil, blockieren die Warteschlange in der Erwägung, dass execute()
versagt sofort, wenn die Warteschlange voll ist und lehnt meine Aufgabe. Ich mag auch, dass das senden von Aufträgen interagiert mit einer blocking-queue; fühlt sich mehr der "reinen" Produzenten-Konsumenten zu mir.
Eine Implikation hinzufügen von tasks an die Warteschlange direkt: ich muss anrufen prestartAllCoreThreads()
sonst keine worker-threads ausgeführt werden. Vorausgesetzt, dass keine anderen Wechselwirkungen mit dem Testamentsvollstrecker, nichts wird die überwachung der Warteschlange (Prüfung der ThreadPoolExecutor
Quelle bestätigt dies). Dies bedeutet auch für die direkte einreihen, die die ThreadPoolExecutor
muss zusätzlich konfiguriert werden > 0 core threads und darf nicht so konfiguriert werden, dass Kern-threads, timeout.
tl;dr
Gegeben ThreadPoolExecutor
wie folgt konfiguriert:
- core threads > 0
- core-threads sind nicht erlaubt timeout
- core-threads werden prestarted
- einen Verweis auf die
BlockingQueue
sichern der Testamentsvollstrecker
Ist es akzeptabel, Aufgaben hinzufügen, direkt an der Warteschlange anstelle von aufrufen executor.execute()
?
Verwandte
Diese Frage ( producer/consumer-Arbeit Warteschlangen ) ist ähnlich, aber nicht gezielt ein cover hinzufügen, um die Warteschlange direkt.
- "Ich behalte mir einen Verweis auf die Warteschlange, so kann ich Aufgaben hinzufügen, um es direkt" aber warum sollte Sie das tun? Warum nicht einfach senden Sie Sie an die executor?
- siehe oben, wo ich Schreibe "Der Ansatz in meinem obigen Beispiel hat den Vorteil, dass die Blockierung in der Warteschlange..."
Du musst angemeldet sein, um einen Kommentar abzugeben.
Wenn es nach mir ginge, würde ich lieber mit
Executor#execute()
überQueue#offer()
einfach, denn ich bin mit alles ausjava.util.concurrent
schon.Deine Frage ist gut, und es hat mein Interesse geweckt, so nahm ich einen Blick auf die Quelle für
ThreadPoolExecutor#execute()
:Können wir sehen, dass führen Sie selbst nennt
offer()
auf der Warteschlange arbeiten, aber nicht, bevor Sie einige schöne, leckere pool Manipulationen, wenn nötig. Aus diesem Grund, würde ich denken, es wäre ratsam, zu verwendenexecute()
; nicht verwenden können (obwohl ich nicht weiß, für bestimmte) Ursache der pool zu betreiben in einem nicht-optimalen Weg. Aber ich glaube nicht, dass mitoffer()
wird Pause der Testamentsvollstrecker - wie es aussieht-Aufgaben gezogen werden aus der Warteschlange mit den folgenden (auch von ThreadPoolExecutor):Diese
getTask()
Methode wird nur aufgerufen innerhalb einer Schleife, so dass, wenn der Testamentsvollstrecker ist nicht heruntergefahren, es würde blockieren, bis eine neue Aufgabe gegeben wurde, um die Warteschlange (unabhängig davon, von wo es kam).Hinweis: obwohl ich gebucht habe, code-snippets von Quelle hier, wir können nicht auf Sie verlassen für eine definitive Antwort - wir sollten nur Codierung auf die API. Wir wissen nicht, wie die Umsetzung der
execute()
im Laufe der Zeit verändern.Ein trick ist, die zum implementieren einer benutzerdefinierten Unterklasse der ArrayBlockingQueue und überschreiben das Angebot () - Methode aufrufen, Ihre blockierende version, dann können Sie immer noch die normale code-Pfad.
(wie Sie wahrscheinlich erraten, ich denke, fordern Angebot direkt auf die Warteschlange als Ihre normale code-Pfad ist wahrscheinlich eine schlechte Idee).
Kann man eigentlich konfigurieren Verhalten des Pools, wenn die Warteschlange voll ist, werden durch die Angabe einer
RejectedExecutionHandler
bei der Instanziierung.ThreadPoolExecutor
definiert vier Strategien, wie innere Klassen, einschließlichAbortPolicy
,DiscardOldestPolicy
,DiscardPolicy
sowie mein persönlicher Favorit,CallerRunsPolicy
, läuft der neue job in der controlling-thread.Beispiel:
Das gewünschte Verhalten in der Frage gewonnen werden können, mit so etwas wie:
Irgendwann die Warteschlange muss zugegriffen werden. Der beste Ort, um dies zu tun, eine eigenständige
RejectedExecutionHandler
, die speichert alle code-Duplizierung oder potenial Fehler, die sich aus direkten manipulation der Warteschlange auf den Geltungsbereich des pool-Objekts. Beachten Sie, dass die Prozeduren enthalten inThreadPoolExecutor
selbstgetQueue()
.Es ist eine sehr wichtige Frage, wenn die Warteschlange, die Sie verwenden, ist eine ganz andere Umsetzung vom standard-in-memory -
LinkedBlockingQueue
oderArrayBlockingQueue
.Zum Beispiel, wenn Sie die Implementierung des producer-consumer-pattern mit mehreren Produzenten auf verschiedenen Maschinen, und verwenden Sie ein queuing-Mechanismus basiert auf einem separaten Persistenz-subsystem (wie Redis), dann wird die Frage relevant, auf seine eigene, selbst wenn Sie nicht wollen, dass eine Blockierung
offer()
wie der OP.Also die gegebene Antwort, dass
prestartAllCoreThreads()
aufgerufen werden (oder oft genugprestartCoreThread()
) für die worker-threads zur Verfügung stehen und ausgeführt wird, ist wichtig genug betont werden.Falls erforderlich, können wir auch ein Parkplatz trennt die Haupt-Verarbeitung von abgelehnten Aufgaben -