Serialisierung Ausnahme auf spark
Treffe ich ein sehr seltsames problem auf Spark über die Serialisierung.
Der code ist wie folgt:
class PLSA(val sc : SparkContext, val numOfTopics : Int) extends Serializable
{
def infer(document: RDD[Document]): RDD[DocumentParameter] = {
val docs = documents.map(doc => DocumentParameter(doc, numOfTopics))
docs
}
}
dem Dokument ist definiert als:
class Document(val tokens: SparseVector[Int]) extends Serializable
und DocumentParameter ist:
class DocumentParameter(val document: Document, val theta: Array[Float]) extends Serializable
object DocumentParameter extends Serializable
{
def apply(document: Document, numOfTopics: Int) = new DocumentParameter(document,
Array.ofDim[Float](numOfTopics))
}
SparseVectoris eine serialisierbare Klasse breeze.linalg.SparseVector
.
Dies ist eine einfache map-Verfahren, und alle Klassen, die serialisierbar sind, aber ich bekomme diese exception:
org.apache.spark.SparkException: Task not serializable
Aber wenn ich entfernen Sie die numOfTopics
parameter, das heißt:
object DocumentParameter extends Serializable
{
def apply(document: Document) = new DocumentParameter(document,
Array.ofDim[Float](10))
}
- und nennen es wie folgt:
val docs = documents.map(DocumentParameter.apply)
und es scheint OK.
Ist der Typ Int nicht serialisierbar? Aber ich sehe, dass einige code ist so geschrieben, dass.
Ich bin nicht sicher, wie man diesen Fehler beheben kann.
#AKTUALISIERT#:
Danke @samthebest. Ich will hinzufügen, mehr details über ihn.
stack trace:
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.rdd.RDD.map(RDD.scala:270)
at com.topicmodel.PLSA.infer(PLSA.scala:13)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:37)
at $iwC$$iwC$$iwC.<init>(<console>:39)
at $iwC$$iwC.<init>(<console>:41)
at $iwC.<init>(<console>:43)
at <init>(<console>:45)
at .<init>(<console>:49)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:483)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
... 46 more
Den stack trace gibt die Allgemeine information der Ausnahme, habe ich es entfernt.
Ich den code auszuführen, der in der Funke-shell.
//suppose I have get RDD[Document] for docs
val numOfTopics = 100
val plsa = new PLSA(sc, numOfTopics)
val docPara = plsa.infer(docs)
Könnten Sie mir einige tutorials oder Tipps serialisierbar?
Du musst angemeldet sein, um einen Kommentar abzugeben.
Anonyme Funktionen serialize Ihrer übergeordneten Klasse. Wenn Sie
map {doc => DocumentParameter(doc, numOfTopics)}
, die einzige Möglichkeit, die es geben kann, dass die Funktion Zugriff aufnumOfTopics
zu serialisieren, diePLSA
Klasse. Und diese Klasse kann nicht serialisiert werden, weil (wie Sie sehen können aus dem stacktrace) es enthält dieSparkContext
was nicht serialisierbar (die Schlechten Dinge, die passieren würde, wenn die einzelnen cluster-Knoten Zugriff auf den Kontext und könnte z.B. neue Arbeitsplätze zu schaffen in einem mapper).In der Regel versuchen zu vermeiden, die Speicherung der
SparkContext
Sie in Ihren Klassen (edit: oder zumindest, stellen Sie sicher, es ist sehr klar, welche Art von Klassen enthalten, dieSparkContext
und welche nicht); es ist besser, es weiterzugeben, als eine (möglicherweiseimplicit
) - parameter, um einzelne Methoden, die es brauchen. Alternativ verschieben Sie die Funktion{doc => DocumentParameter(doc, numOfTopics)}
in eine andere Klasse vonPLSA
, eine, die wirklich serialisiert werden kann.(Wie mehrere Leute haben vorgeschlagen, es für möglich zu halten
SparkContext
in der Klasse, aber markiert als@transient
so, dass es nicht serialisiert werden. Ich weiß nicht empfehlen, diesen Ansatz; es bedeutet die Klasse "magisch" ändern Ihren Zustand, wenn Sie serialisiert und verlieren dieSparkContext
), und so könnten Sie am Ende mit NPEs, wenn Sie versuchen, Zugriff auf dieSparkContext
aus einem serialisierten job. Es ist besser, eine klare Unterscheidung von Klassen, die nur in der "control" - code (und möglicherweise verwenden Sie dieSparkContext
) und Klassen, die serialisiert werden auf dem cluster ausführen (muss nicht dieSparkContext
)).@transient
vorval sc : SparkContext
, dannSparkContext
wird nicht serialisiert werden.SparkContext
in Ihren Klassen ganz (aber dennoch von Ihnen positiv bewertet werden). Wenn Sie nicht speichern Sie diese in Umfang, dann können Sie am Ende immer parameter aufblasen (die ist hässlich, auch wenn die Verwendung der impliziten Parameter). Die einzige alternative ist, zu stecken es einige Globale singleton irgendwo die Probleme schafft eigene (gefürchteten null-Zeiger).Dies ist in der Tat ein komischer Kauz, aber ich denke, ich kann erraten, das problem. Aber Erstens haben Sie nicht das absolute minimum, um das problem zu lösen (ich vermute es mal, denn ich habe gesehen, 100s von diesen vor). Hier sind einige Probleme, die mit Ihrer Frage:
Diese Methode nicht zurück
RDD[DocumentParameter]
es gibtUnit
. Sie muss kopiert und eingefügt code falsch.Zweitens haben Sie nicht, sofern der gesamte stack-trace? Warum? Es gibt keinen Grund, NICHT zu geben Sie den vollständigen stack-trace, und der volle stack-trace mit der Meldung ist notwendig zu verstehen, die Fehler - braucht man die ganzen Fehler zu verstehen, was der Fehler ist. In der Regel ein nicht serializable exception sagt Sie was ist nicht serialisierbar.
Drittens haben Sie nicht uns gesagt, wo Methode
infer
ist, machst du das in einer shell? Was ist das enthaltende Objekt/Klasse/Eigenschaft etc.infer
?Wie auch immer, ich werde erraten, dass durch die übergabe der
Int
Ihre und verursacht eine Kette von Dingen serialisiert erhalten, die Sie nicht erwarten, ich kann Ihnen nicht mehr Informationen als das, bis Sie das minimum-code, so können wir voll und ganz verstehen, Ihr problem.