DataFrame filtern, basierend auf der zweiten Dataframe

Verwendung von Spark-SQL, ich habe zwei dataframes, Sie werden erstellt von einer, wie:

df = sqlContext.createDataFrame(...);
df1 = df.filter("value = 'abc'"); //[path, value]
df2 = df.filter("value = 'qwe'"); //[path, value]

Möchte ich filtern df1, wenn ein Teil von Ihr " Weg " ist jeder Pfad in df2.
Also, wenn df1 hat die Zeile mit dem Pfad "a/b/c/d/e" würde ich heraus finden, ob in df2 ist eine Zeile, Pfad "a/b/c".
In SQL sollte es sein, wie

SELECT * FROM df1 WHERE udf(path) IN (SELECT path FROM df2)

wo udf user defined function, kürzen ursprünglichen Pfad von df1.
Naive Lösung ist die Verwendung von JOIN und dann die Ergebnisse filtern, aber es ist langsam, seit df1 und df2 haben jeweils mehr als 10mil Zeilen.

Ich habe auch versucht folgenden code, aber Erstens hatte ich zum erstellen von broadcast-variable aus df2

static Broadcast<DataFrame> bdf;
bdf = sc.broadcast(df2); //variable 'sc' is JavaSparkContext 

sqlContext.createDataFrame(df1.javaRDD().filter(
         new Function<Row, Boolean>(){
             @Override
             public Boolean call(Row row) throws Exception {
                 String foo = shortenPath(row.getString(0));
                 return bdf.value().filter("path = '"+foo+"'").count()>0;
             }
          }
    ), myClass.class)

das problem das ich habe ist, dass die Funke zu stecken, wenn die Rückkehr ausgewertet wurde/bei der Filterung von df2 durchgeführt wurde.

Ich würde gerne wissen, wie die Arbeit mit zwei dataframes, dies zu tun.
Ich möchte wirklich vermeiden, BEITRETEN. Irgendwelche Ideen?


EDIT>>

In meinem ursprünglichen code df1 hat alias 'ersten' und df2 "zweiten". Diese Verknüpfung ist nicht Kartesisch, und es ist auch nicht broadcast.

df1 = df1.as("first");
df2 = df2.as("second");

    df1.join(df2, df1.col("first.path").
                                lt(df2.col("second.path"))
                                      , "left_outer").
                    filter("isPrefix(first.path, second.path)").
                    na().drop("any");

isPrefix ist udf

UDF2 isPrefix = new UDF2<String, String, Boolean>() {
        @Override
        public Boolean call(String p, String s) throws Exception {
            //return true if (p.length()+4==s.length()) and s.contains(p)
        }};

shortenPath - es schneidet die letzten beiden Zeichen im Pfad

UDF1 shortenPath = new UDF1<String, String>() {
        @Override
        public String call(String s) throws Exception {
            String[] foo = s.split("/");
            String result = "";
            for (int i = 0; i < foo.length-2; i++) {
                result += foo[i];
                if(i<foo.length-3) result+="/";
            }
            return result;
        }
    };

Beispiel der Rekorde. Pfad ist einzigartig.

a/a/a/b/c abc
a/a/a     qwe
a/b/c/d/e abc
a/b/c     qwe
a/b/b/k   foo
a/b/f/a   bar
...

So df1 bestehend aus

a/a/a/b/c abc
a/b/c/d/e abc
...

und df2 aus

a/a/a     qwe
a/b/c     qwe
...
  • In Bezug auf: Wie können wir an zwei Spark SQL dataframes mit einem SQL-artige "GEFÄLLT mir" - Kriterium?
  • Frage bearbeitet wurde. Btw UNION macht Sinn, auch für mich. Aber Funken tut nicht Unterstützung von geschachtelten Abfragen wie "SELECT Pfad FROM blabla WHERE value LIKE 'abc' UND Eltern(Pfad) IN (SELECT Pfad FROM blabla, WO Wert 'qwe')". Es ist auch nicht unterstützt, durch die Verwendung von DataFrame api.
  • Haben Sie versucht, eine Filter-pattern? Natürlich würden Sie brauchen, um anzupassen, das gegebene Beispiel, aber ich denke, es kann die Antwort sein
InformationsquelleAutor HR.AD | 2015-12-16
Schreibe einen Kommentar