Verständnis Funke physischen plan
Ich versuche zu verstehen, physischen Pläne der Funke-aber ich bin nicht zu verstehen, einige Teile, weil Sie scheinen, unterscheiden sich von traditionellen rdbms. Zum Beispiel, in diesem plan unten, es ist ein plan über eine Abfrage über eine hive-Tabelle. Die Abfrage ist:
select
l_returnflag,
l_linestatus,
sum(l_quantity) as sum_qty,
sum(l_extendedprice) as sum_base_price,
sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
avg(l_quantity) as avg_qty,
avg(l_extendedprice) as avg_price,
avg(l_discount) as avg_disc,
count(*) as count_order
from
lineitem
where
l_shipdate <= '1998-09-16'
group by
l_returnflag,
l_linestatus
order by
l_returnflag,
l_linestatus;
== Physical Plan ==
Sort [l_returnflag#35 ASC,l_linestatus#36 ASC], true, 0
+- ConvertToUnsafe
+- Exchange rangepartitioning(l_returnflag#35 ASC,l_linestatus#36 ASC,200), None
+- ConvertToSafe
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Final,isDistinct=false),(sum(l_extendedpr#32),mode=Final,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Final,isDistinct=false),(sum(((l_extendedprice#32 * (1.0l_discount#33)) * (1.0 + l_tax#34))),mode=Final,isDistinct=false),(avg(l_quantity#31),mode=Final,isDistinct=false),(avg(l_extendedprice#32),mode=Fl,isDistinct=false),(avg(l_discount#33),mode=Final,isDistinct=false),(count(1),mode=Final,isDistinct=false)], output=[l_returnflag#35,l_linestatus,sum_qty#0,sum_base_price#1,sum_disc_price#2,sum_charge#3,avg_qty#4,avg_price#5,avg_disc#6,count_order#7L])
+- TungstenExchange hashpartitioning(l_returnflag#35,l_linestatus#36,200), None
+- TungstenAggregate(key=[l_returnflag#35,l_linestatus#36], functions=[(sum(l_quantity#31),mode=Partial,isDistinct=false),(sum(l_exdedprice#32),mode=Partial,isDistinct=false),(sum((l_extendedprice#32 * (1.0 - l_discount#33))),mode=Partial,isDistinct=false),(sum(((l_extendedpri32 * (1.0 - l_discount#33)) * (1.0 + l_tax#34))),mode=Partial,isDistinct=false),(avg(l_quantity#31),mode=Partial,isDistinct=false),(avg(l_extendedce#32),mode=Partial,isDistinct=false),(avg(l_discount#33),mode=Partial,isDistinct=false),(count(1),mode=Partial,isDistinct=false)], output=[l_retulag#35,l_linestatus#36,sum#64,sum#65,sum#66,sum#67,sum#68,count#69L,sum#70,count#71L,sum#72,count#73L,count#74L])
+- Project [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_returnflag#35]
+- Filter (l_shipdate#37 <= 1998-09-16)
+- HiveTableScan [l_discount#33,l_linestatus#36,l_tax#34,l_quantity#31,l_extendedprice#32,l_shipdate#37,l_returnflag#35], astoreRelation default, lineitem, None
Für das, was ich bin das Verständnis in dem plan ist:
-
Beginnt zuerst mit einer Hive-Tabelle Scannen
-
Dann filter verwenden, bei denen die Bedingung
-
Dann Projekt, um die Spalten, wir wollen
-
Dann TungstenAggregate?
-
Dann TungstenExchange?
-
Dann TungstenAggregate wieder?
-
Dann ConvertToSafe?
-
Sortiert dann das endgültige Ergebnis
Aber ich bin nicht das Verständnis der 4, 5, 6 und 7 Schritte. Wissen Sie, was Sie sind? Ich bin auf der Suche nach Informationen über dieses, so kann ich verstehen, der plan, aber ich bin nicht etwas zu finden, konkrete.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Lets Blick auf die Struktur der SQL-Abfrage, die Sie verwenden:
Wie Sie bereits vermuten:
Filter (...)
entspricht Prädikate inWHERE
- Klausel (#4
)Project ...
begrenzt die Anzahl der Spalten, in denen eine Vereinigung von (#1
und#2
, und#4
/#6
wenn nicht imSELECT
)HiveTableScan
entsprichtFROM
- Klausel (#3
)Übrigen Teile können wie folgt zugerechnet:
#2
ausSELECT
- Klausel -functions
Feld inTungstenAggregates
GROUP BY
- Klausel (#5
):TungstenExchange
/hash-Partitionierungkey
Feld inTungstenAggregates
#6
-ORDER BY
- Klausel.Wolfram-Projekt im Allgemeinen beschreibt eine Reihe von Optimierungen verwendet von Spark
DataFrames
(-sets
) darunter:sun.misc.Unsafe
. Es bedeutet "native" (off-heap) memory-Nutzung und explizite Speicherreservierung /Befreiung außerhalb GC management. Diese Konvertierungen entsprechenConvertToUnsafe
/ConvertToSafe
Schritte in der Ausführung plan. Sie können lernen, einige interessante details über unsichere aus Verständnis Sonne.misc.UnsicherErfahren Sie mehr über Wolfram im Allgemeinen aus Projekt Wolfram: Die Apache Spark Näher auf Blankem Metall. Apache Spark 2.0: Einfacher, Schneller und Smarter enthält einige Beispiele, code-Generierung.
TungstenAggregate
zweimal vorkommt, weil die Daten zunächst aggregiert, die lokal auf jeder partition, die als gemischt, und schließlich zusammengeführt. Wenn Sie vertraut sind mit RDD-API dieser Prozess ist in etwa vergleichbar mitreduceByKey
.Wenn Ausführungsplan ist nicht klar, können Sie auch versuchen, zu konvertieren resultierenden
DataFrame
zuRDD
und analysieren Sie die Ausgabe vontoDebugString
.Functions
Feld listet alle Aggregationen, die durchgeführt werden in einer bestimmten Phase, währendKey
Feld beschreibt die Gruppierung. es istdf.groupBy(*key).agg(*functions)
.GROUP BY clause (#5):
. Ich möchte zum Bearbeiten von mir, aber stackOverflow mir zu zeigen, dassEdits must be at least 6 characters; is there something else to improve in this post?
Wolfram ist die neue Speicher-engine Spark seit 1.4, das die Verwaltung der Daten außerhalb der JVM zu sparen, GC overhead. Können Sie sich vorstellen, Sie beinhaltet, dass das kopieren von Daten von und zu JVM. Das ist es. In Spark 1.5 kann man wiederum Wolfram aus durch
spark.sql.tungsten.enabled
dann sehen Sie die "alten" plan, Spark 1,6 I denken, Sie können ihn nicht ausschalten, nicht mehr.