SQL-Abfrage in Spark/scala Größe überschreitet Integer.MAX_VALUE
Ich bin versucht, zu erstellen eine einfache sql-Abfrage auf S3 Ereignisse mit Spark. Ich bin be ~30GB von JSON-Dateien wie folgt:
val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");
Dann bin ich versucht zu schreiben, um die Datei, die das Ergebnis meiner Abfrage:
val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");
Aber der Funke werfen die folgende Ausnahme:
java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Beachten Sie, dass die gleiche Abfrage funktioniert für kleinere Mengen von Daten. Was ist hier das problem?
wahrscheinlich Problem mit der Größe der partition von Grenzen überschreiten, versuchen
nach dem Lesen der Daten versuchen, neu zu partitionieren
Als seitliche Anmerkung, möchten Sie vielleicht zu schauen, mit den neueren
Vielen Dank für die Antworten. Die Abfrage arbeitete für 30GB. Nun möchte ich eine Abfrage ausführen, auf ±200 GB an Daten und ich sehe das:
.repartition(100)
etc, das sollte es lösennach dem Lesen der Daten versuchen, neu zu partitionieren
val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000)
Verweis issues.apache.org/jira/browse/SPARK-1476Als seitliche Anmerkung, möchten Sie vielleicht zu schauen, mit den neueren
s3a
statt s3n
; siehe z.B. stackoverflow.com/questions/33356041/...Vielen Dank für die Antworten. Die Abfrage arbeitete für 30GB. Nun möchte ich eine Abfrage ausführen, auf ±200 GB an Daten und ich sehe das:
Failed to send RPC 6395111411946395180 to /x.x.x.x:yyyy: java.nio.channels.ClosedChannelException
Und auch Versucht zu bekommen Testamentsvollstrecker Verlust Grund für die executor id 165 bei RPC-Adresse x.x.x.x:yyyyyy, bekam aber keine Antwort. Kennzeichnung als Sklavin verloren." Irgendwelche Ideen? Ich bin das laden von Daten in 100 repartitions.InformationsquelleAutor eexxoo | 2017-02-15
Du musst angemeldet sein, um einen Kommentar abzugeben.
Kein Funke shuffle block kann größer als 2 GB (ganze Zahl.MAX_VALUE bytes), damit Sie brauchen, mehr /kleinere Partitionen.
Stellen Sie Funken.Standard.Parallelität und Funken.sql.shuffle.Partitionen (Standard 200), so dass die Anzahl der Partitionen Platz für Ihre Daten zu erreichen, ohne die 2 GB-Grenze (Sie könnten versuchen, mit dem Ziel für 256MB /- partition, so dass für 200GB Sie bekommen 800-Partitionen). Tausende von Partitionen ist sehr Häufig, so scheuen Sie sich nicht, um neu zu partitionieren, um 1000 als vorgeschlagen.
FYI, können Sie überprüfen die Anzahl der Partitionen für eine RDD mit so etwas wie rdd.getNumPartitions (also d2.rdd.getNumPartitions)
Es ist eine Geschichte, die zu verfolgen sich die Mühe der Auseinandersetzung mit den verschiedenen 2GB-Grenzen (offen gewesen für eine Weile jetzt): https://issues.apache.org/jira/browse/SPARK-6235
Sehen http://www.slideshare.net/cloudera/top-5-mistakes-to-avoid-when-writing-apache-spark-applications/25 für mehr info über diesen Fehler.
InformationsquelleAutor Traian