Tipps für das richtig die Verwendung von großen broadcast-Variablen?

Ich bin mit einem broadcast-variable über 100 MB gebeizt Größe, die ich bin, die Annäherung mit:

>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896

Läuft auf einem cluster mit 3 c3.2xlarge Vollzieher, und ein m3.große Treiber mit dem folgenden Befehl starten Sie die interaktive Sitzung:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g

In einem RDD, wenn ich anhalten, einen Verweis auf dieses broadcast-Variablen, die Speicher-Nutzung explodiert. Für 100 Verweise auf eine 100-MB-Variablen, auch wenn es kopiert wurden 100 mal, ich würde erwarten, dass die Daten-Nutzung nicht mehr als 10 GB insgesamt (geschweige denn 30 GB über 3 Knoten). Allerdings sehe ich die out-of-memory-Fehler, wenn ich den folgenden test ausführen:

data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))

Den stack-trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): 

org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError


  at org.apache.spark.api.python.PythonRDD$$anon$1.read(PythonRDD.scala:138)
  at org.apache.spark.api.python.PythonRDD$$anon$1.<init>(PythonRDD.scala:179)
  at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
  at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at org.apache.spark.scheduler.Task.run(Task.scala:88)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
  at java.lang.Thread.run(Thread.java:745)

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
      7 joined_rdd.persist()
      8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))

/usr/lib/spark/python/pyspark/rdd.py in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/usr/lib/spark/python/pyspark/rdd.py in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)

  at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
  at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
  at py4j.Gateway.invoke(Gateway.java:259)
  at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
  at py4j.commands.CallCommand.execute(CallCommand.java:79)
  at py4j.GatewayConnection.run(GatewayConnection.java:207)
  at java.lang.Thread.run(Thread.java:745)

Ich habe gesehen früheren threads über die Speicherauslastung Gurke Deserialisierung ein Problem. Allerdings würde ich erwarten, dass Sie ein broadcast-variable nur deserialisiert werden (und in den Arbeitsspeicher geladen, auf einen Testamentsvollstrecker) einmal, und die nachfolgenden Verweise auf .value zu verweisen, dass in-memory-Adresse. Das scheint nicht der Fall zu sein, jedoch. Bin ich etwas fehlt?

Die Beispiele, die ich gesehen habe mit broadcast-Variablen haben Sie als Wörterbücher, eine Zeit, zu transformieren eine Reihe von Daten (D. H. ersetzen Sie Flughafen Abkürzungen mit Flughafen-Namen). Die motivation hinter persistierenden Sie hier ist, um Objekte zu erstellen mit dem wissen einer broadcast-variable und wie man mit ihm interagieren, bestehen die Objekte, und führen Sie mehrere Berechnungen mit Ihnen (mit spark aufpassen, hält Sie im Speicher).

Was sind einige Tipps für die Verwendung von großen (100 MB+) broadcast-Variablen? Anhalten einer Sendung variable fehlgeleitet? Ist dies ein Thema ist, das möglicherweise spezifisch für PySpark?

Danke! Ihre Hilfe wird sehr geschätzt.

Hinweis, ich habe auch gepostet, diese Frage auf die databricks Foren

Edit - Nachtrag-Frage:

Es wurde vorgeschlagen, dass die Standard-Funke-serializer hat eine batch-Größe von 65337. Serialisiert Objekte in verschiedenen Chargen sind nicht identifiziert als bei demselben und zugewiesen sind, verschiedenen Speicher-Adressen, die hier untersucht werden, über die eingebauten id Funktion. Allerdings auch mit einem größeren broadcast-variable, die würde in der Theorie nehmen die 256 Reihen zu serialisieren, ich sehe immer noch nur 2 verschiedene Exemplare. Sollte ich nicht sehen, viele mehr? Ist mein Verständnis von, wie batch-Serialisierung arbeitet falsch?

>>> sc.serializer.bestSize
65536
>>> import cPickle as pickle
>>> broadcast_data = {k: v for (k, v) in enumerate(range(int(1e6)))}
>>> len(pickle.dumps(broadcast_data))
16777786
>>> len(pickle.dumps({k: v for (k, v) in enumerate(range(int(1e6)))})) / sc.serializer.bestSize
256
>>> bd = sc.broadcast(broadcast_data)
>>> rdd = sc.parallelize(range(100), 1).map(lambda _: bd.value)
>>> rdd.map(id).distinct().count()
1
>>> rdd.cache().count()
100
>>> rdd.map(id).distinct().count()
2
Schreibe einen Kommentar