Tipps für das richtig die Verwendung von großen broadcast-Variablen?
Ich bin mit einem broadcast-variable über 100 MB gebeizt Größe, die ich bin, die Annäherung mit:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
Läuft auf einem cluster mit 3 c3.2xlarge Vollzieher, und ein m3.große Treiber mit dem folgenden Befehl starten Sie die interaktive Sitzung:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In einem RDD, wenn ich anhalten, einen Verweis auf dieses broadcast-Variablen, die Speicher-Nutzung explodiert. Für 100 Verweise auf eine 100-MB-Variablen, auch wenn es kopiert wurden 100 mal, ich würde erwarten, dass die Daten-Nutzung nicht mehr als 10 GB insgesamt (geschweige denn 30 GB über 3 Knoten). Allerdings sehe ich die out-of-memory-Fehler, wenn ich den folgenden test ausführen:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
Den stack-trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Ich habe gesehen früheren threads über die Speicherauslastung Gurke Deserialisierung ein Problem. Allerdings würde ich erwarten, dass Sie ein broadcast-variable nur deserialisiert werden (und in den Arbeitsspeicher geladen, auf einen Testamentsvollstrecker) einmal, und die nachfolgenden Verweise auf .value
zu verweisen, dass in-memory-Adresse. Das scheint nicht der Fall zu sein, jedoch. Bin ich etwas fehlt?
Die Beispiele, die ich gesehen habe mit broadcast-Variablen haben Sie als Wörterbücher, eine Zeit, zu transformieren eine Reihe von Daten (D. H. ersetzen Sie Flughafen Abkürzungen mit Flughafen-Namen). Die motivation hinter persistierenden Sie hier ist, um Objekte zu erstellen mit dem wissen einer broadcast-variable und wie man mit ihm interagieren, bestehen die Objekte, und führen Sie mehrere Berechnungen mit Ihnen (mit spark aufpassen, hält Sie im Speicher).
Was sind einige Tipps für die Verwendung von großen (100 MB+) broadcast-Variablen? Anhalten einer Sendung variable fehlgeleitet? Ist dies ein Thema ist, das möglicherweise spezifisch für PySpark?
Danke! Ihre Hilfe wird sehr geschätzt.
Hinweis, ich habe auch gepostet, diese Frage auf die databricks Foren
Edit - Nachtrag-Frage:
Es wurde vorgeschlagen, dass die Standard-Funke-serializer hat eine batch-Größe von 65337. Serialisiert Objekte in verschiedenen Chargen sind nicht identifiziert als bei demselben und zugewiesen sind, verschiedenen Speicher-Adressen, die hier untersucht werden, über die eingebauten id
Funktion. Allerdings auch mit einem größeren broadcast-variable, die würde in der Theorie nehmen die 256 Reihen zu serialisieren, ich sehe immer noch nur 2 verschiedene Exemplare. Sollte ich nicht sehen, viele mehr? Ist mein Verständnis von, wie batch-Serialisierung arbeitet falsch?
>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Du musst angemeldet sein, um einen Kommentar abzugeben.
Gut, der Teufel steckt im detail. Um den Grund zu verstehen, warum dies passieren kann, werden wir noch einen genaueren Blick auf die PySpark serialisierungsprogramme. Zunächst können erstellen
SparkContext
mit Standard-Einstellungen:und überprüfen Sie, was ist eine Standard-serializer:
Sagt es uns drei verschiedene Dinge:
AutoBatchedSerializer
serializerPickleSerializer
durchzuführen eigentlichen jobbestSize
des serialisierten Batch ist 65536 bytesEinen kurzen Blick im source-code wird Ihnen zeigen, dass diese serialisieren passt die Anzahl der Datensätze, die serialisiert in der Zeit auf die Laufzeit und versucht zu halten, batch-Größe von weniger als 10 *
bestSize
. Der wichtige Punkt ist, dass nicht alle Datensätze in der partition sind, die serialisiert in der gleichen Zeit.Wir können überprüfen, dass experimentell wie folgt:
Wie man sehen kann, sogar in diesem einfachen Beispiel nach Serialisierung-Deserialisierung bekommen wir zwei unterschiedliche Objekte. Sie können beobachten, dass ein ähnliches Verhalten arbeiten direkt mit
pickle
:Werte serialisiert in der gleichen batch-Referenz, nach unpickling, das gleiche Objekt. Werte aus verschiedenen Chargen zeigen Sie auf verschiedene Objekte.
In der Praxis Funken mehrere serialisiert und unterschiedliche Serialisierung Strategien. Sie können zum Beispiel verwenden von batches von unendlicher Größe:
Können Sie ändern serializer durch die übergabe
serializer
- und /oderbatchSize
ParameterSparkContext
Konstruktor:Auswahl verschiedener serialisierungsprogramme und Dosier-Strategien, die Ergebnisse in verschiedenen trade-offs (die Geschwindigkeit, die Fähigkeit zur Serialisierung beliebiger Objekte, Arbeitsspeicher, etc.).
Sollten Sie auch daran denken, dass broadcast-Variablen in Spark nicht geteilt zwischen executor-threads so auf die gleiche Person kann existieren mehrere deserialisiert Kopien gleichzeitig.
Darüber hinaus werden Sie sehen, ein ähnliches Verhalten zu diesem, wenn Sie führen Sie eine transformation, die erfordert zu mischen.
AutoBatchedSerializer
passen Sie die batch-Größe auf der Flucht. Überprüfen Sie die Quelle für die exakte Logik, aber es hält Größe und Anzahl der Elemente, die schon zuvor serialisiert.