Was ist eine optimierte Möglichkeit der Verbindung von großen Tabellen in Spark SQL
Habe ich gebraucht, die das verknüpfen von Tabellen mit Spark SQL oder Dataframe API. Müssen wissen, was hätte optimiert werden, so erreichen wir es.
Szenario ist:
- Alle Daten in Hive in ORC-format (Base-Dataframe-und Referenz-Dateien).
- Ich brauche um an einem Base-Datei (Dataframe) Lesen von Hive mit 11-13 anderen Referenz-Datei zu erstellen, die eine große in-memory-Struktur (400 Spalten) (etwa 1 TB)
Was kann man am besten Vorgehen um dies zu erreichen? Bitte teilen Sie Ihre Erfahrungen, wenn jemand hat, treten ähnliche problem.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Mein Standard-Beratung zur Optimierung von joins:
Verwenden eine broadcast beitreten, wenn Sie können (siehe dieses notebook). Bei deiner Frage scheint es, Ihre Tabellen groß sind und eine broadcast-Beitritt ist keine option.
Prüfen, mit einem sehr großen cluster (es ist billiger als Sie vielleicht denken). $250 jetzt (6/2016) kauft über 24 Stunden 800 Kerne mit 6 TB RAM und viele SSDs auf dem EC2-spot-instance-Markt. Wenn Sie sich Gedanken über die Gesamtkosten eines big-data-Lösung, finde ich, dass Menschen oft erheblich unterschätzen Ihre Zeit.
Verwenden Sie die gleichen Partitionierer. Sehen diese Frage Informationen zum co-gruppiert Verknüpfungen.
Wenn die Daten riesig ist und/oder Ihre Cluster nicht wachsen können, so dass auch (3) oben führt zu OOM, verwenden Sie ein zwei-pass-Ansatz. Erstens, re-partition, die Daten und speichern Sie mit partitionierten Tabellen (
dataframe.write.partitionBy()
). Dann join sub-Partitionen nacheinander in einer Schleife, "anfügen", um die selben final-Ergebnis-Tabelle.Seite Hinweis: ich sagen Sie "Anhängen" da oben in Produktion, die ich nie verwenden
SaveMode.Append
. Es ist nicht idempotent, und das ist eine gefährliche Sache. Ich benutzeSaveMode.Overwrite
tief in die Unterstruktur einer partitionierten Tabelle-Baum-Struktur. Vor 2.0.0 und 1.6.2 haben Sie zu löschen_SUCCESS
oder Metadaten-Dateien oder dynamische partition Entdeckung ersticken.Hoffe, das hilft.
df.withColumn("par_id", id % 256).repartition(256, 'par_id).write.partitionBy("par_id")...
Dann Durchlaufenpersisted.select('par_id).distinct.collect
Eintritt jede partition + persistierende wieder. Dann union.Partition der Quell-Verwendung hash-Partitionen oder range-Partitionen oder schreiben Sie benutzerdefinierte Partitionen, wenn Sie wissen besser über den Beitritt Felder. Partition wird Ihnen helfen, um neu zu partitionieren während des joins als Funke die Daten von der gleichen partition auf mehrere Tabellen existieren, im gleichen Ort.
ORC wird auf jeden Fall helfen, die Ursache.
WENN dies immer noch zu verschütten, versuchen Sie es mit tachyon, die wird schneller sein als Datenträger