DataFrame filtern, basierend auf der zweiten Dataframe
Verwendung von Spark-SQL, ich habe zwei dataframes, Sie werden erstellt von einer, wie:
df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]
Möchte ich filtern df1, wenn ein Teil von Ihr " Weg " ist jeder Pfad in df2.
Also, wenn df1 hat die Zeile mit dem Pfad "a/b/c/d/e" würde ich heraus finden, ob in df2 ist eine Zeile, Pfad "a/b/c".
In SQL sollte es sein, wie
SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)
wo udf user defined function, kürzen ursprünglichen Pfad von df1.
Naive Lösung ist die Verwendung von JOIN und dann die Ergebnisse filtern, aber es ist langsam, seit df1 und df2 haben jeweils mehr als 10mil Zeilen.
Ich habe auch versucht folgenden code, aber Erstens hatte ich zum erstellen von broadcast-variable aus df2
static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext
sqlContext.createDataFrame(df1.javaRDD().filter(
new Function<Row, Boolean>(){
@Override
public Boolean call(Row row) throws Exception {
String foo = shortenPath(row.getString(0));
return bdf.value().filter("path = '"+foo+"'").count()>0;
}
}
), myClass.class)
das problem das ich habe ist, dass die Funke zu stecken, wenn die Rückkehr ausgewertet wurde/bei der Filterung von df2 durchgeführt wurde.
Ich würde gerne wissen, wie die Arbeit mit zwei dataframes, dies zu tun.
Ich möchte wirklich vermeiden, BEITRETEN. Irgendwelche Ideen?
EDIT>>
In meinem ursprünglichen code df1 hat alias 'ersten' und df2 "zweiten". Diese Verknüpfung ist nicht Kartesisch, und es ist auch nicht broadcast.
df1 = df1.as("first");
df2 = df2.as("second");
df1.join(df2, df1.col("first.path").
lt(df2.col("second.path"))
, "left_outer").
filter("isPrefix(first.path, second.path)").
na().drop("any");
isPrefix ist udf
UDF2 isPrefix = new UDF2<String, String, Boolean>() {
@Override
public Boolean call(String p, String s) throws Exception {
//return true if (p.length()+4==s.length()) and s.contains(p)
}};
shortenPath - es schneidet die letzten beiden Zeichen im Pfad
UDF1 shortenPath = new UDF1<String, String>() {
@Override
public String call(String s) throws Exception {
String[] foo = s.split("/");
String result = "";
for (int i = 0; i < foo.length-2; i++) {
result += foo[i];
if(i<foo.length-3) result+="/";
}
return result;
}
};
Beispiel der Rekorde. Pfad ist einzigartig.
a/a/a/b/c abc
a/a/a qwe
a/b/c/d/e abc
a/b/c qwe
a/b/b/k foo
a/b/f/a bar
...
So df1 bestehend aus
a/a/a/b/c abc
a/b/c/d/e abc
...
und df2 aus
a/a/a qwe
a/b/c qwe
...
- In Bezug auf: Wie können wir an zwei Spark SQL dataframes mit einem SQL-artige "GEFÄLLT mir" - Kriterium?
- Frage bearbeitet wurde. Btw UNION macht Sinn, auch für mich. Aber Funken tut nicht Unterstützung von geschachtelten Abfragen wie "SELECT Pfad FROM blabla WHERE value LIKE 'abc' UND Eltern(Pfad) IN (SELECT Pfad FROM blabla, WO Wert 'qwe')". Es ist auch nicht unterstützt, durch die Verwendung von DataFrame api.
- Haben Sie versucht, eine Filter-pattern? Natürlich würden Sie brauchen, um anzupassen, das gegebene Beispiel, aber ich denke, es kann die Antwort sein
Du musst angemeldet sein, um einen Kommentar abzugeben.
Gibt es zumindest wenige Probleme mit dem code:
DataFrame
einfach nicht funktionieren, und Sie sollten eine Ausnahme.join
Sie verwenden, ausgeführt wird, als ein Kartesisches Produkt gefolgt von filter. Da Funke ist mitHashing
für joins nur die Gleichheit basierte Verknüpfungen können effizient ausgeführt, ohne kartesischen. Es ist leicht mit Bezug auf Warum eine UDF-Datei in eine SQL-Abfrage führt zu kartesischen Produkt?DataFrames
sind relativ groß und haben eine ähnliche Größe dann broadcasting ist unwahrscheinlich, nützlich zu sein. Sehen Warum meine BroadcastHashJoin ist langsamer als ShuffledHashJoin SparkisPrefix
scheint falsch. Insbesondere sieht es aus wie es passen beide Präfix-und suffix -col("first.path").lt(col("second.path"))
Zustand sieht falsch aus. Ich nehme an, Sie wollena/a/a/b/c
ausdf1
matcha/a/a
ausdf2
. Wenn es so sein solltegt
nichtlt
.Wahrscheinlich die beste Sache, die Sie tun können, ist etwas ähnlich wie diese:
Können Sie versuchen, die broadcast-einer der Tabellen wie diese (Funke >= 1.5.0 nur):
erhöhen und das auto broadcast-Grenzen, aber wie ich schon oben erwähnte, es wird wahrscheinlich werden weniger effizient als die einfache
HashJoin
.Als eine Möglichkeit der Implementierung
IN
mit Unterabfrage, dieLEFT SEMI JOIN
können verwendet werden:Dem Physischen Plan von solchen Abfrage wird wie folgt Aussehen:
Wird es mit dem LeftSemiJoinBNL für die eigentliche join-operation, sollte die broadcast-Werte intern. Mehr details beziehen sich auf die tatsächliche implementation in Spark - LeftSemiJoinBNL.scala
P. S. ich habe nicht ganz verstehen die Notwendigkeit für das entfernen der letzten beiden Zeichen, aber wenn das nötig ist, es kann getan werden, wie @zero323 vorgeschlagen (mit
regexp_extract
).contains
(oder sogarstartsWith
undendsWith
) ist, dass es nicht optimiert werden kann. Also, wenn Sie reduzieren das problem auf die Geschlechter zu prüfen ist, wie signifikante performance-boost (LeftSemiJoinHash
vsLeftSemiJoinBNL
).