Angepasster Thread-Pool in parallelem Java 8-Stream
Ist es möglich, zum angeben einer benutzerdefinierten thread-pool für Java 8 parallel stream? Ich kann es nirgends finden.
Vorstellen, dass ich eine server-Anwendung, und ich möchte die parallele streams. Aber die Anwendung ist groß und multi-threaded, so will ich es aufteilen. Ich will nicht einen langsam Laufenden Aufgabe in einem Modul der applicationblock Aufgaben von einem anderen Modul.
Wenn ich nicht verwenden können, die verschiedenen thread-pools für die verschiedenen Module, das bedeutet, ich kann sicher nicht die parallele streams in den meisten realen Situationen.
Versuchen Sie das folgende Beispiel. Es gibt einige CPU-intensive Aufgaben ausgeführt, die in separaten threads.
Die Aufgaben nutzen die parallele streams. Die erste Aufgabe ist gebrochen, so dass jeder Schritt dauert 1 Sekunde (simuliert durch den thread schlafen). Das Problem ist, dass andere threads stecken und warten, bis die defekte Aufgabe zu beenden. Das ist erfundenes Beispiel, aber stellen Sie sich ein servlet-app und jemand der Einreichung eine lange laufende Aufgabe der gemeinsamen fork-join-pool.
public class ParallelTest {
public static void main(String[] args) throws InterruptedException {
ExecutorService es = Executors.newCachedThreadPool();
es.execute(() -> runTask(1000)); //incorrect task
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.execute(() -> runTask(0));
es.shutdown();
es.awaitTermination(60, TimeUnit.SECONDS);
}
private static void runTask(int delay) {
range(1, 1_000_000).parallel().filter(ParallelTest::isPrime).peek(i -> Utils.sleep(delay)).max()
.ifPresent(max -> System.out.println(Thread.currentThread() + " " + max));
}
public static boolean isPrime(long n) {
return n > 1 && rangeClosed(2, (long) sqrt(n)).noneMatch(divisor -> n % divisor == 0);
}
}
InformationsquelleAutor der Frage Lukas | 2014-01-16
Du musst angemeldet sein, um einen Kommentar abzugeben.
Gibt es eigentlich einen trick, wie das ausführen einer parallelen Betrieb in einem bestimmten fork-join-pool. Wenn Sie ausführen, es als eine Aufgabe in einem fork-join-pool, bleibt es dort und nicht die häufigste.
Der trick basiert auf ForkJoinTask.Gabel, die angibt: "Ordnet asynchron ausführen dieser Aufgabe in den pool der aktuellen Aufgabe ausgeführt wird, falls anwendbar, oder über den ForkJoinPool.commonPool (), wenn nicht inForkJoinPool()"
InformationsquelleAutor der Antwort Lukas
Den parallelen streams verwenden Sie die Standard -
ForkJoinPool.commonPool
die standardmäßig hat man weniger threads, wie Sie Prozessoren, zurückgegeben durchRuntime.getRuntime().availableProcessors()
(Das bedeutet, dass parallele streams verwenden Sie alle Ihre Prozessoren, da Sie auch mit dem Haupt-thread):Dies bedeutet auch, wenn Sie geschachtelte parallel-streams oder mehrere parallele streams gleichzeitig begann, Sie alle teilen dem gleichen pool. Vorteil: Sie werden nie mehr als die Vorgabe (Anzahl der verfügbaren Prozessoren). Nachteil: man kann nicht "alle Prozessoren" zugeordnet, um jeden parallelen stream zu initiieren (wenn Sie geschehen, um mehr als eine). (Anscheinend können Sie ein ManagedBlocker zu umgehen.)
Verändern die Art und Weise parallele streams ausgeführt werden, können Sie entweder
yourFJP.submit(() -> stream.parallel().forEach(soSomething)).get();
oderSystem.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20")
für ein Ziel Parallelität von 20 threads.Beispiel des letzteren auf meiner Maschine, die hat 8 Prozessoren. Wenn ich das folgende Programm ausführen:
Ausgabe:
Damit Sie sehen können, dass die parallele stream-Prozesse 8 items gleichzeitig, D. H. es nutzt 8 threads. Wenn ich allerdings kommentieren Sie die auskommentierte Zeile, die Ausgabe ist:
Dieser Zeit, der parallelen Streams benutzt hat 20 threads und alle 20 Elemente in dem stream wurden gleichzeitig verarbeitet.
InformationsquelleAutor der Antwort assylias
Alternativ zu dem trick mit der Auslösung der parallelen Berechnung in Ihrem eigenen forkJoinPool Sie kann auch passieren, dass der pool der CompletableFuture.supplyAsync Methode, wie in:
InformationsquelleAutor der Antwort Mario Fusco
Mit einem ForkJoinPool und reichen für einen parallelen stream nicht zuverlässig nutzen alle threads. Wenn man sich diese ( Parallel stream von einem HashSet nicht parallel laufen ) und diese ( Warum wird der parallele Datenstrom nicht alle threads von den ForkJoinPool? ), werden Sie sehen, die Argumentation.
Kurze version: wenn ForkJoinPool/reichen nicht für Sie arbeiten, verwenden Sie
InformationsquelleAutor der Antwort Tod Casasent
Bis jetzt habe ich die beschriebenen Lösungen in den Antworten dieser Frage. Nun, ich kam mit einer kleinen Bibliothek namens Parallel-Stream-Unterstützung:
Aber wie @PabloMatiasGomez darauf hingewiesen, in den Kommentaren, gibt es Nachteile bezüglich der Trennung Mechanismus der parallelen Ströme, die hängt stark von der Größe der gemeinsamen pool. Sehen Parallel stream von einem HashSet nicht parallel laufen .
Bin ich mit dieser Lösung nur über separate pools für unterschiedliche Arten von Arbeit, aber ich kann nicht die Größe des common-pool 1, auch wenn ich Sie nicht nutze.
InformationsquelleAutor der Antwort Stefan Ferstl
Zur Messung der tatsächlichen Anzahl der verwendeten threads, Sie können überprüfen
Thread.activeCount()
:Diese produzieren können auf einen 4-Kern-CPU eine Ausgabe wie:
Ohne
.parallel()
es gibt:InformationsquelleAutor der Antwort charlie
Gehen, um AbacusUtil. Thread-Anzahl kann durch die angegebene, für die parallele Streams. Hier ist der Beispielcode:
Offenlegung: ich bin der Entwickler des AbacusUtil.
InformationsquelleAutor der Antwort user_3380739
Wenn Sie nichts dagegen haben, mit einem Drittanbieter-Bibliothek, mit cyclops-reagieren können Sie mischen sequentielle und parallele Streams in der gleichen Rohrleitung und bieten maßgeschneiderte ForkJoinPools. Zum Beispiel
Oder wenn wir wollten weiterhin die Bearbeitung innerhalb einer sequentiellen Stream
[Offenlegung ich bin der leitende Entwickler des Zyklopen-reagieren]
InformationsquelleAutor der Antwort John McClean