Spark mehreren Kontexten
In kurzen :
EC2-cluster: 1 master-3 slaves
Spark version : 1.3.1
Ich wollen, verwenden Sie die option Funken.- Treiber.allowMultipleContexts, einem lokalen Kontext (nur master) und ein cluster (master und slaves).
Bekomme ich diese stacktrace des Fehlers (Zeile 29 ist, wo rufe ich das Objekt, initialisiert die zweite sparkcontext) :
fr.entry.Main.main(Main.scala)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1812)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1$$anonfun$apply$10.apply(SparkContext.scala:1808)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1808)
at org.apache.spark.SparkContext$$anonfun$assertNoOtherContextIsRunning$1.apply(SparkContext.scala:1795)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkContext$.assertNoOtherContextIsRunning(SparkContext.scala:1795)
at org.apache.spark.SparkContext$.setActiveContext(SparkContext.scala:1847)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:1754)
at fr.entry.cluster$.<init>(Main.scala:79)
at fr.entry.cluster$.<clinit>(Main.scala)
at fr.entry.Main$delayedInit$body.apply(Main.scala:29)
at scala.Function0$class.apply$mcV$sp(Function0.scala:40)
at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.App$$anonfun$main$1.apply(App.scala:71)
at scala.collection.immutable.List.foreach(List.scala:318)
at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:32)
at scala.App$class.main(App.scala:71)
at fr.entry.Main$.main(Main.scala:14)
at fr.entry.Main.main(Main.scala)
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/2 is now LOADING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app- 20150928153330-0036/0 is now RUNNING
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/1 is now RUNNING
15/09/28 15:33:30 INFO SparkContext: Starting job: sum at Main.scala:29
15/09/28 15:33:30 INFO DAGScheduler: Got job 0 (sum at Main.scala:29) with 2 output partitions (allowLocal=false)
15/09/28 15:33:30 INFO DAGScheduler: Final stage: Stage 0(sum at Main.scala:29)
15/09/28 15:33:30 INFO DAGScheduler: Parents of final stage: List()
15/09/28 15:33:30 INFO DAGScheduler: Missing parents: List()
15/09/28 15:33:30 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29), which has no missing parents
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(2264) called with curMem=0, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 2.2 KB, free 51.8 GB)
15/09/28 15:33:30 INFO MemoryStore: ensureFreeSpace(1656) called with curMem=2264, maxMem=55566516879
15/09/28 15:33:30 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1656.0 B, free 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40476 (size: 1656.0 B, free: 51.8 GB)
15/09/28 15:33:30 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
15/09/28 15:33:30 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:839
15/09/28 15:33:30 INFO AppClient$ClientActor: Executor updated: app-20150928153330-0036/2 is now RUNNING
15/09/28 15:33:30 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MapPartitionsRDD[2] at numericRDDToDoubleRDDFunctions at Main.scala:29)
15/09/28 15:33:30 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
15/09/28 15:33:45 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
15/09/28 15:34:00 WARN TaskSchedulerImpl: Initial job has not accepted any resources; check your cluster UI to ensure that workers are registered and have sufficient resources
Mehr details :
Ich würde gerne ein Programm das macht zwei Dinge. Zum einen habe ich eine sparkContext lokal (auf dem master), mache ich eine RDD und einige Operationen. Zweitens habe ich eine zweite sparkContext initialisieren mit einem master und 3 slaves, die auch eine RDD und hat einige Operationen.
Also im ersten Fall will ich mit den 16 Kernen des Meisters-und der zweite Fall, den ich verwenden möchten, die 8cores x 3 der Sklaven.
Einfaches Beispiel :
val arr = Array(Array(1, 2, 3, 4, 5, 6, 7, 8), Array(1, 2, 3, 4, 5, 6, 7, 8))
println(local.sparkContext.makeRDD(arr).count())
println(cluster.sparkContext.makeRDD(arr).map(l => l.sum).sum)
Meine zwei SparkContexts :
object local {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster("local[16]")
.setAppName("Local")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar"))
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "45g")
val sparkContext = new SparkContext(sc)
}
object cluster {
val project = "test"
val version = "1.0"
val sc = new SparkConf()
.setMaster(masterURL) //ec2-XX-XXX-XXX-XX.compute-1.amazonaws.com
.setAppName("Cluster")
.set("spark.local.dir", "/mnt")
.setJars(Seq("target/scala-2.10/" + project + "-assembly-" + version + ".jar", "target/scala-2.10/" + project + "_2.10-" + version + "-tests.jar") ++ otherJars)
.setSparkHome("/root/spark")
.set("spark.driver.allowMultipleContexts", "true")
.set("spark.executor.memory", "35g")
val sparkContext = new SparkContext(sc)
}
Wie kann ich dieses Problem beheben?
- Können Sie zählen, warum Sie verwenden wollen, zwei Kontexten? Mehr als oft nicht, ist dies nicht erforderlich
- Sagen wir, ich habe 3 Programme : prog1 und prog3 parallel laufen können und prog2 sequenziell sein. Prog1 ' s, Ausgang 15 RDDs (Datensätze). Prog2 ist ein machine-learning-Algorithmus, die muss ich 15 mal laufen. Als prog2 ausführen auf 1 Kern vor Ort. Ich machte einen kleinen hack was ist ein RDD-enthält die 15 gesammelten Datensätze . Ich Karte auf dieser RDD und führen prog2 auf jeden Datensatz. Prog3 nehmen Sie die 15 Ergebnisse von prog2 und einige Vorgänge parallel. Ich hoffe, das ist klar? Ich denke in meinem Fall ist dies erforderlich, aber wenn es nicht, ich bin auch daran interessiert zu wissen, die Antwort von meinem kleinen Beispiel.
Du musst angemeldet sein, um einen Kommentar abzugeben.
Obwohl Konfigurationsoption Funken.- Treiber.allowMultipleContexts vorhanden ist, ist es irreführend, weil die Nutzung von mehreren Funken Kontexten wird abgeraten. Diese option ist nur für Funke-internen tests und sollte nicht verwendet werden, in Benutzer-Programme. Sie können unerwartete Ergebnisse erhalten, während der Ausführung mehr als einen Funken Kontext, in einer einzigen JVM.
Wenn die Koordination ist es erforderlich, zwischen 2 Programmen, dann wäre es besser zu machen, Teil eines einzigen Funken Anwendung zu nutzen, Funken interne Optimierungen und vermeiden Sie unnötige i/o.
Zweitens, wenn 2 Anwendungen nicht koordinieren müssen, in irgendeiner Weise, die Sie starten können 2 separate Programme. Da Sie mit Amazon EC2/EMR, können Sie GARN als Ressource-manager ohne erhebliche Zeit-Investition, wie beschrieben,hier.
Wenn Sie eine Notwendigkeit, um die Arbeit mit vielen der Funke zusammenhängen, können Sie auf spezielle option [MultipleContexts] (1) , aber es ist nur für die Funke internen tests und sollte nicht verwendet werden, in Benutzer-Programme. Sie erhalten unerwartetes Verhalten während der Ausführung mehr als einen Funken Kontext, in einer einzigen JVM [SPARK-2243] (2).
Es ist jedoch möglich, verschiedene Kontexte in getrennten JVMs und verwalten von Kontexten im SparkConf Ebene, die optimal passen Sie die ausführbare Jobs.
Sieht es wie folgt aus:
Nebel schafft jede neue Sparkcontext in seiner eigenen JVM.
Es ist eine middleware, die auf der Oberseite des Spark - [Mist]. Es verwaltet Funke Kontexten und mehrere JVMs, so könnten Sie haben verschiedene Aufgaben, wie zum Beispiel ETL-pipeline, eine schnelle Prognose job, eine ad-hoc-Hive-Abfrage und Spark-streaming-Anwendungen laufen parallel auf dem gleichen cluster.
1> github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/SparkContextSuite.scala#L67
2> issues.apache.org/jira/browse/SPARK-2243
Java:
+
Es funktioniert für mich.