SQL-Abfrage in Spark/scala Größe überschreitet Integer.MAX_VALUE

Ich bin versucht, zu erstellen eine einfache sql-Abfrage auf S3 Ereignisse mit Spark. Ich bin be ~30GB von JSON-Dateien wie folgt:

val d2 = spark.read.json("s3n://myData/2017/02/01/1234");
d2.persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK);
d2.registerTempTable("d2");

Dann bin ich versucht zu schreiben, um die Datei, die das Ergebnis meiner Abfrage:

val users_count = sql("select count(distinct data.user_id) from d2");
users_count.write.format("com.databricks.spark.csv").option("header", "true").save("s3n://myfolder/UsersCount.csv");

Aber der Funke werfen die folgende Ausnahme:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
at sun.nio.ch.FileChannelImpl.map(FileChannelImpl.java:869)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:103)
at org.apache.spark.storage.DiskStore$$anonfun$getBytes$2.apply(DiskStore.scala:91)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1287)
at org.apache.spark.storage.DiskStore.getBytes(DiskStore.scala:105)
at org.apache.spark.storage.BlockManager.getLocalValues(BlockManager.scala:439)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:672)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

Beachten Sie, dass die gleiche Abfrage funktioniert für kleinere Mengen von Daten. Was ist hier das problem?

wahrscheinlich Problem mit der Größe der partition von Grenzen überschreiten, versuchen .repartition(100) etc, das sollte es lösen
nach dem Lesen der Daten versuchen, neu zu partitionieren val d2 = spark.read.json("s3n://myData/2017/02/01/1234").repartition(1000) Verweis issues.apache.org/jira/browse/SPARK-1476
Als seitliche Anmerkung, möchten Sie vielleicht zu schauen, mit den neueren s3a statt s3n; siehe z.B. stackoverflow.com/questions/33356041/...
Vielen Dank für die Antworten. Die Abfrage arbeitete für 30GB. Nun möchte ich eine Abfrage ausführen, auf ±200 GB an Daten und ich sehe das: Failed to send RPC 6395111411946395180 to /x.x.x.x:yyyy: java.nio.channels.ClosedChannelException Und auch Versucht zu bekommen Testamentsvollstrecker Verlust Grund für die executor id 165 bei RPC-Adresse x.x.x.x:yyyyyy, bekam aber keine Antwort. Kennzeichnung als Sklavin verloren." Irgendwelche Ideen? Ich bin das laden von Daten in 100 repartitions.

InformationsquelleAutor eexxoo | 2017-02-15

Schreibe einen Kommentar