Spark-Erstellen von Benutzerdefinierten Spalte, Funktion, Benutzer-definierte Funktion
Ich bin mit Scala und bauen wollen, meine eigene Funktion zu DataFrame. Zum Beispiel möchte ich bei der Behandlung einer Spalte wie ein array Durchlaufen und jedes element, und eine Berechnung.
Um zu beginnen, ich bin zu versuchen, mein eigenes getMax Methode. Also Spalte x die Werte [3,8,2,5,9], und die zu erwartende Ausgang der Verfahren wäre 9.
Hier ist, was es sieht aus wie in Scala
def getMax(inputArray: Array[Int]): Int = {
var maxValue = inputArray(0)
for (i <- 1 until inputArray.length if inputArray(i) > maxValue) {
maxValue = inputArray(i)
}
maxValue
}
Dies ist, was ich habe, so weit, und bekomme diese Fehlermeldung
"value length is not a member of org.apache.spark.sql.column",
und ich weiß nicht, wie sonst zum Durchlaufen der Spalte.
def getMax(col: Column): Column = {
var maxValue = col(0)
for (i <- 1 until col.length if col(i) > maxValue){
maxValue = col(i)
}
maxValue
}
Sobald ich in der Lage bin umzusetzen, meine eigene Methode erstelle ich eine Spalte mit Funktion
val value_max:org.apache.spark.sql.Column=getMax(df.col(“value”)).as(“value_max”)
Und dann hoffe ich in der Lage sein, diese in einer SQL-Anweisung, beispielsweise
val sample = sqlContext.sql("SELECT value_max(x) FROM table")
und die erwartete Ausgabe wäre 9, Eingang Spalte [3,8,2,5,9]
Bin ich nach einer Antwort aus einem anderen thread Spark-Scala - Wie kann ich die Zeilen iterieren, die in dataframe, und fügen Sie die berechneten Werte als neue Spalten des data Frames, wo Sie eine private Methode für die Standardabweichung.
Die Berechnungen, die ich tun wird, sein, mehr komplexer als diese, (e.g ich vergleichen jedes element in der Spalte) , werde ich in den richtigen Weg oder sollte ich suchen in benutzerdefinierten Funktionen?
- Bitte zeigen Sie Ihre input-und output/erwartete dataframes. Verwenden
show
. - Hi @JacekLaskowski danke für den Kommentar, ich habe es bearbeitet die Frage, um es klarer, was ich erreichen möchte.
Du musst angemeldet sein, um einen Kommentar abzugeben.
In einem Spark-DataFrame, kann man nicht iterieren über die Elemente einer Spalte mithilfe der Ansätze, die Sie dachte, da eine Spalte nicht iterierbar Objekt.
Jedoch die Verarbeitung der Werte einer Spalte, haben Sie einige Optionen und die richtige ist, hängt von Ihrer Aufgabe:
1) Nutzung der vorhandenen built-in-Funktionen
Spark SQL verfügt bereits über viele nützliche Funktionen für die Bearbeitung von Spalten, einschließlich aggregation und transformation-Funktionen. Die meisten von Ihnen finden Sie in der
functions
- Paket (Dokumentation hier). Einige andere (binäre Funktionen im Allgemeinen) finden Sie direkt in derColumn
Objekt (Dokumentation hier). Also, wenn Sie Sie verwenden können, ist es in der Regel die beste option. Hinweis: vergessen Sie nicht, die Fenster-Funktionen.2) Erstellen einer UDF -
Wenn Sie nicht abschließen können Ihre Aufgabe mit der built-in-Funktionen, können Sie erwägen, die die Definition einer UDF (User Defined Function). Sie sind nützlich, wenn Sie Bearbeiten können jedes Element einer Spalte unabhängig und Sie erwarten, um zu produzieren, eine neue Spalte mit der gleichen Anzahl von Zeilen wie das original (nicht die aggregierte Spalte). Diese Vorgehensweise ist Recht einfach: zunächst definieren Sie eine einfache Funktion, dann registrieren Sie es als eine UDF, dann benutzen Sie es. Beispiel:
Weitere Informationen hier ein schöner Artikel.
3) Mit einem UDAF
Wenn Ihre Aufgabe ist es, aggregierte Daten anlegen, können Sie definieren, UDAF (benutzerdefinierte Aggregation-Funktion). Ich habe nicht viel Erfahrung damit, aber ich kann Ihnen ein nettes tutorial:
https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/
4) zurückgreifen, um RDD Verarbeitung
Wenn Sie wirklich können nicht, verwenden Sie die Optionen oben, oder wenn Sie die Bearbeitung der Aufgabe hängt von verschiedenen Zeilen für die Verarbeitung ein und es ist nicht eine aggregation, dann denke ich, Sie müssten wählen Sie die Spalte, die Sie möchten und über die entsprechende RDD. Beispiel:
So, es gab die Optionen, die ich denken konnte. Ich hoffe, es hilft.
groupBy
, also es kann sich wieder ein aggregierter Wert für jeden einzelnen Wert in den Spalten übergebengroupBy
(ähnlich wie ein einfachesdf.groupBy("key").agg(avg("value"))
funktioniert). Allerdings, wenn Sie nicht verwenden Sie groupBy, die UDAF, liefern nur einen Wert.Ein einfaches Beispiel ist gegeben in der ausgezeichnete Dokumentation, wo ein ganzer Abschnitt gewidmet ist UDFs: