Schiefe dataset mitmachen Funke?
Ich bin das verbinden von zwei großen Datensätzen mithilfe von Spark RDD. Ein dataset ist sehr viel schief, so dass einige der Testamentsvollstrecker Aufgaben nehmen eine lange Zeit, um den job zu beenden. Wie kann ich lösen dieses Szenario?
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ziemlich guten Artikel über, wie es getan werden kann: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
Kurze version:
small_rdd_transformed = small_rdd.cartesian(sc.parallelize(range(0, N))).map(lambda x: ((x[0][0], x[1]), x[0][1])).coalesce(num_parts).cache() # replicate the small rdd
im scala-x ist ein tuple2, was so tut, x[0][0] bedeutet beispielsweise?..mapPartitionWithIndex
fehltJe nach der bestimmten Art neigen Sie erleben, möglicherweise gibt es verschiedene Möglichkeiten, es zu lösen. Die Grundidee ist:
Den "Kampf der Skew In der Funke" Artikel verwiesen, die in LiMuBei s Antwort ist eine gute Technik, wenn die schiefe der Daten nimmt in der Verknüpfung. In meinem Fall, skew verursacht wurde durch eine sehr große Anzahl von null-Werten im join-Spalte. Die null-Werte wurden nicht an die anzuschließen, aber da Spark Partitionen auf die join-Spalte, die post-join Partitionen waren sehr schief sein, wie es war eine riesige partition, die alle von der null-Werte.
Ich löste es durch hinzufügen einer neuen Spalte, die geändert, werden alle null-Werte zu einem gut verteilten temporären Wert, wie "NULL_VALUE_X", wobei X ersetzt wird durch zufällige zahlen zwischen 1 und 10.000, z.B. (in Java):
Dann der Beitritt auf diese neue Spalte und dann nach der Verknüpfung:
Sagen, Sie haben zum verknüpfen von zwei Tabellen A und B on A. id=B. id. Nehmen wir an, Eine Tabelle hat die skew auf id=1.
d.h. select A. id from A join B on A. id = B. id
Gibt es zwei grundlegende Ansätze zur Lösung der skew join Problem:
Ansatz 1:
Break your query/dataset in 2 Teile - mit nur neigen und die anderen, die nicht verzerrte Daten.
In dem oben genannten Beispiel. Abfrage wird -
Die erste Abfrage wird sich nicht neigen, so dass alle Aufgaben von ResultStage fertig zu ungefähr der gleichen Zeit.
Wenn wir davon ausgehen, dass B nur einige wenige Zeilen mit B. id = 1, dann wird es passen in den Speicher. So den Zweiten Abfrage werden in einer Sendung mitmachen. Dies wird auch als Map-side-join-in-Struktur.
Referenz: https://cwiki.apache.org/confluence/display/Hive/Skewed+Join+Optimierung
Die teilweise die Ergebnisse der beiden Abfragen können dann zusammengeführt werden, um die endgültigen Ergebnisse.
Ansatz 2:
Auch erwähnt LeMuBei oben, der 2. Ansatz versucht, mischen Sie die join-Schlüssel durch anfügen von zusätzlichen Spalte.
Schritte:
Fügen Sie eine Spalte in der größeren Tabelle (A), sagen skewLeft und füllen es mit Zufallszahlen zwischen 0 bis N-1 für alle Zeilen.
Fügen Sie eine Spalte in der kleineren Tabelle (B), sagen skewRight. Replizieren Sie den kleineren Tisch N-mal. Also Werte in neuen skewRight Spalte variiert von 0 bis N-1 für jede Kopie der original-Daten. Für diese, die Sie verwenden können, die explodieren sql/dataset Betreiber.
Nach 1 und 2, kommen die 2 Datensätzen/Tabellen mit join-Bedingung aktualisiert-
Referenz: https://datarus.wordpress.com/2015/05/04/fighting-the-skew-in-spark/
Könnten Sie versuchen, neu zu partitionieren die "schiefe" RDD mehr Partitionen, oder erhöhen Sie die
spark.sql.shuffle.partitions
(das ist standardmäßig 200).In deinem Fall würde ich versuchen, die Anzahl der Partitionen zu werden, viel höher als die Zahl der Vollzieher.