Spark - Random Number Generation
Ich habe geschrieben eine Methode, die berücksichtigen muss, eine Zufallszahl zu simulieren, die eine Bernoulli-Verteilung. Ich bin mit random.nextDouble
generiert eine Zahl zwischen 0 und 1 ist, dann macht meine Entscheidung basierend auf diesem Wert, da meine Wahrscheinlichkeit parameter.
Mein problem ist, dass Funke die Erzeugung der gleichen Zufallszahlen innerhalb jeder iteration von meiner for-Schleife mapping-Funktion. Ich bin mit der DataFrame
API. Mein code hat Folgendes format:
val myClass = new MyClass()
val M = 3
val myAppSeed = 91234
val rand = new scala.util.Random(myAppSeed)
for (m <- 1 to M) {
val newDF = sqlContext.createDataFrame(myDF
.map{row => RowFactory
.create(row.getString(0),
myClass.myMethod(row.getString(2), rand.nextDouble())
}, myDF.schema)
}
Hier ist die Klasse:
class myClass extends Serializable {
val q = qProb
def myMethod(s: String, rand: Double) = {
if (rand <= q) //do something
else //do something else
}
}
Brauche ich eine neue Zufallszahl jedes mal myMethod
genannt wird. Ich habe auch versucht, die Generierung der Nummer in meiner Methode mit java.util.Random
(scala.util.Random
v10 nicht verlängern Serializable
) wie unten, aber ich bin noch immer die gleichen zahlen innerhalb der einzelnen for-Schleife
val r = new java.util.Random(s.hashCode.toLong)
val rand = r.nextDouble()
Ich habe einige der Forschung getan, und es scheint, dies hat zu tun mit Funken deterministische Natur.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Der Grund, warum die gleiche Sequenz wiederholt wird, ist, dass der Zufallsgenerator wird erstellt und initialisiert, mit einem Samen, bevor die Daten partitioniert ist. Jede partition beginnt dann, aus den gleichen random-seed. Vielleicht nicht der effizienteste Weg, es zu tun, aber Folgendes sollte funktionieren:
java.util.Random
für serializeability Gründen.Verwenden Sie einfach die SQL-Funktion
rand
:Laut dieser Beitrag, die beste Lösung ist, nicht zu den
new scala.util.Random
im inneren der Karte, noch völlig außerhalb (dh. in der Treiber-code), sondern in einem ZwischenschrittmapPartitionsWithIndex
:Verwendung von Spark-Dataset API, vielleicht für den Einsatz in einem Akkumulator: