Funke: Erhöhung der Anzahl von Partitionen, ohne dass ein shuffle?
Wenn die Verringerung der Anzahl von Partitionen kann man verwenden coalesce
, das ist toll, weil es nicht zu einem shuffle-und scheint zu funktionieren sofort (keine zusätzliche job-Phase).
Möchte ich das Gegenteil tun manchmal, aber repartition
induziert einen shuffle. Ich denke, vor ein paar Monaten habe ich diese arbeiten, die durch die Nutzung CoalescedRDD
mit balanceSlack = 1.0
- also, was passieren würde, ist es aufgeteilt würde eine partition, so dass die sich ergebenden Partitionen Ort, wo alle auf dem gleichen Knoten (so klein net-IO).
Diese Art von Funktionalität ist die automatische Hadoop, man nur tweaks die split-Größe. Es scheint nicht zu funktionieren auf diese Weise in der Funke, es sei denn, man ist eine Verringerung der Anzahl von Partitionen. Ich denke die Lösung könnte sein, schreiben Sie eine benutzerdefinierte Partitionierer zusammen mit einem custom-RDD, wo wir definieren getPreferredLocations
... aber ich dachte, das ist eine so einfache und gewöhnliche Sache zu tun, doch es muss ein straight forward Weise, es zu tun?
Sachen ausprobiert:
.set("spark.default.parallelism", partitions)
auf meine SparkConf
, und wenn in den Kontext des Lesens Parkett ich habe versucht sqlContext.sql("set spark.sql.shuffle.partitions= ...
, die auf 1.0.0 verursacht einen Fehler UND nicht wirklich wollen, ich will, ich will die partition Nummer zu ändern, die über alle Arten von Arbeit, nicht nur mischt.
- Viel Glück finden eine Lösung dafür?
Du musst angemeldet sein, um einen Kommentar abzugeben.
Watch this space
https://issues.apache.org/jira/browse/SPARK-5997
Diese Art von wirklich einfachen, offensichtlichen Funktion irgendwann umgesetzt werden - ich denke nur, nachdem Sie beenden Sie alle unnötigen Funktionen in
Dataset
s.Ich nicht genau verstehe, was dein Punkt ist. Meinst du, Sie haben jetzt 5 Partitionen, aber nach der nächsten operation, die Sie wünschen, dass Daten verteilt auf 10? Weil mit 10, aber immer noch mit 5 macht nicht viel Sinn... den Prozess Der das senden von Daten, um neue Partitionen zu geschehen hat irgendwann.
Wenn dabei
coalesce
können Sie loswerden der unsued Partitionen, zum Beispiel: wenn Sie zunächst 100, dann aber nach reduceByKey du hast 10 (als dort, wo nur 10-Schlüssel), können Sie festlegencoalesce
.Wenn Sie möchten, dass der Prozess in die andere Richtung gehen, Sie könnten nur Kraft einer Art von Partitionierung:
Ich bin nicht sicher, dass das, was du suchst, aber hoffe ja.
repartition
oder Ihren code ein, um 10 Partitionen, das wird schieben Sie die Daten - das sind Daten, die für jede der 5 Knoten passieren können, die über das Netzwerk auf andere Knoten. Was ich will, ist, dass Spark einfach teilt jeder partition in 2 ohne verschieben von Daten um - das ist, was passiert in Hadoop als tweaking-split-Einstellungen..forEachNode
Funktion. Aber ich habe nie etwas gesehen wie diese. Und ich bin mir nicht sicher, ob es kann leicht implementiert werden. Der Partitionierer wieder die gleiche partition für das gleiche Objekt jedes mal. Standardmäßig Funken die VerwendungHashPartitioner
, die hashCode modulo number_of_partitions. Wenn Sie nur aufteilen der Daten in zwei neue Partitionen, Sie würde auf jeden Fall am Ende nicht in Ihre Orte. Das ist der Grund, warum shuffle ist notwendig. Vielleicht, wenn Sie Ihre eigenen Partitionierer, es könnte die Erhöhung der Anzahl von Partitionen ohne shuffling-over-net.Wie Sie wissen pyspark eine Art von "faulen" Weg laufen. Es wird nur die Berechnung, wenn es einige action zu tun (zum Beispiel ein "df.count()" oder ein "df.show()". Also, was Sie tun können, ist zu definieren, die eine shuffle-partition zwischen diesen Aktionen.
Können Sie schreiben :
spark.sql.shuffle.partitions
haben nur einen Effekt auf das mischen von Operationen wie Verknüpfungen, aggegation und Sortieren... aber nicht auf die Filterung