Warum hat Spark-Anwendung fehlschlägt mit "ClassNotFoundException: Fehler beim suchen von Daten Quelle: kafka" als uber-jar mit sbt Baugruppe?
Ich versuche, eine Probe wie https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala. Angefangen habe ich mit der Funke Strukturierte Streaming Programming guide bei http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html.
Mein code ist
package io.boontadata.spark.job1
import org.apache.spark.sql.SparkSession
object DirectKafkaAggregateEvents {
val FIELD_MESSAGE_ID = 0
val FIELD_DEVICE_ID = 1
val FIELD_TIMESTAMP = 2
val FIELD_CATEGORY = 3
val FIELD_MEASURE1 = 4
val FIELD_MEASURE2 = 5
def main(args: Array[String]) {
if (args.length < 3) {
System.err.println(s"""
|Usage: DirectKafkaAggregateEvents <brokers> <subscribeType> <topics>
| <brokers> is a list of one or more Kafka brokers
| <subscribeType> sample value: subscribe
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(bootstrapServers, subscribeType, topics) = args
val spark = SparkSession
.builder
.appName("boontadata-spark-job1")
.getOrCreate()
import spark.implicits._
//Create DataSet representing the stream of input lines from kafka
val lines = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option(subscribeType, topics)
.load()
.selectExpr("CAST(value AS STRING)")
.as[String]
//Generate running word count
val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
//Start running the query that prints the running counts to the console
val query = wordCounts.writeStream
.outputMode("complete")
.format("console")
.start()
query.awaitTermination()
}
}
Habe ich noch folgende sbt-Dateien:
bauen.sbt:
name := "boontadata-spark-job1"
version := "0.1"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-streaming_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql_2.11" % "2.0.2" % "provided"
libraryDependencies += "org.apache.spark" % "spark-sql-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.spark" % "spark-streaming-kafka-0-10_2.11" % "2.0.2"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "0.10.1.1"
libraryDependencies += "org.apache.kafka" % "kafka_2.11" % "0.10.1.1"
//META-INF discarding
assemblyMergeStrategy in assembly := {
{
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
}
Ich habe auch Projekt - /Montage.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.3")
Dies schafft eine Uber jar mit der nicht provided
Gläser.
Reiche ich mit der folgenden Zeile:
spark-submit boontadata-spark-job1-assembly-0.1.jar ks1:9092,ks2:9092,ks3:9092 subscribe sampletopic
aber bekomme ich diesen Laufzeitfehler:
Exception in thread "main" java.lang.ClassNotFoundException: Failed to find data source: kafka. Please find packages at https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projects
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79)
at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80)
at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80)
at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents$.main(StreamingJob.scala:41)
at io.boontadata.spark.job1.DirectKafkaAggregateEvents.main(StreamingJob.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:736)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:185)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:210)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:124)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassNotFoundException: kafka.DefaultSource
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5.apply(DataSource.scala:132)
at scala.util.Try.orElse(Try.scala:84)
at org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:132)
... 18 more
16/12/23 13:32:48 INFO spark.SparkContext: Invoking stop() from shutdown hook
Gibt es eine Möglichkeit zu wissen, welche Klasse nicht gefunden, so dass ich die Suche maven.org repo für diese Klasse.
Den lookupDataSource
source-code zu sein scheint, an der Linie 543 auf https://github.com/apache/spark/blob/83a6ace0d1be44f70e768348ae6688798c84343e/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala aber ich konnte nicht finden einen direkten link mit Kafka Datenquelle...
Komplette source code ist hier: https://github.com/boontadata/boontadata-streams/tree/ad0d0134ddb7664d359c8dca40f1d16ddd94053f
Hi @jithinpt, bitte siehe Kommentare in der Antwort als "Antwort" gekennzeichnet.
InformationsquelleAutor benjguin | 2016-12-23
Du musst angemeldet sein, um einen Kommentar abzugeben.
Ich habe versucht, wie diese es ist, für mich zu arbeiten. Einreichen wie diese, und lassen Sie mich wissen, wenn Sie irgendwelche Probleme
Würde ich es nennen, ein workaround und nicht zu erklären, warum die uber-jar funktioniert nicht. Die Ursache ist
assemblyMergeStrategy
imbuild.sbt
, verwirft alleMETA-INF
Dateien inkl. - Anmeldungen. Siehe meine Antwort unten.InformationsquelleAutor Sree Eedupuganti
Die Frage ist der folgende Abschnitt in
build.sbt
:Er sagt, dass alle
META-INF
Einträge verworfen werden sollte, einschließlich der "code", lässt Datenquelle Aliase (z.B.kafka
) arbeiten.Aber die
META-INF
- Dateien sind sehr wichtig fürkafka
(und andere Aliase von streaming data sources) zu arbeiten.Für
kafka
alias zu arbeiten, Funken-SQL verwendet META-INF/services/org.apache.spark.sql.Quellen.DataSourceRegister mit dem folgenden Eintrag:KafkaSourceProvider
verantwortlich ist, sich zu registrierenkafka
alias mit dem richtigen streaming-Datenquelle, d.h. KafkaSource.Nur um zu überprüfen, dass der wahre code ist zwar verfügbar, aber der "code", macht der alias eingetragen ist Sie nicht, Sie könnten die
kafka
Datenquelle mit dem voll qualifizierten Namen (nicht den alias) wie folgt:Sehen Sie andere Probleme, die durch fehlende Optionen wie
kafka.bootstrap.servers
, aber...wir sind abgeschweift.Lösung ist
MergeStrategy.concat
alleMETA-INF/services/org.apache.spark.sql.sources.DataSourceRegister
(das wäre ein uber-jar mit allen Datenquellen, inkl. diekafka
Datenquelle).InformationsquelleAutor Jacek Laskowski
In meinem Fall bekam ich auch diese Fehlermeldung beim kompilieren mit sbt, die Ursache war, dass
sbt assembly
war auch nicht dasspark-sql-kafka-0-10_2.11
Artefakt als Teil des fat-jar.(Ich wäre sehr willkommen, um Kommentare hier. Die Abhängigkeit wurde nicht angegeben, ein Bereich, daher sollte es nicht angenommen werden, "sofern").
Also änderte ich die Bereitstellung eines normal (schlank) jar-einschließlich der Abhängigkeiten mit der
--jars
Parameter Funke zu übermitteln.Um zu sammeln, alle Abhängigkeiten in einem Ort, können Sie hinzufügen
retrieveManaged := true
zu Ihrem sbt-Projekt-Einstellungen, oder Sie können in der sbt-Konsole, Ausgabe:Bringt alle Abhängigkeiten auf die
lib_managed
Ordner.Dann können Sie kopieren Sie alle diese Dateien (mit einem bash-Befehl können Sie zum Beispiel so etwas wie das
InformationsquelleAutor ssice
Ich bin mit der Funke 2.1 und vor der sehr gleiche problem
mein workaround ist
1)
spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.1.0
2)
cd ~/.ivy2/jars
hier sind Sie ,alle benötigten jars sind in diesem Ordner jetzt3) kopieren Sie alle jar-Dateien in diesen Ordner, um alle Knoten(können erstellen Sie einen bestimmten Ordner zu halten)
4) fügen Sie den Namen des Ordners zu
spark.driver.extraClassPath
undspark.driver.extraClassPath
z.B.spark.driver.extraClassPath=/opt/jars/*:your_other_jars
5
spark-submit --class ClassNm --Other-Options YourJar.jar
funktioniert jetztInformationsquelleAutor dalin qin
Ich löste es durch das herunterladen der jar-Datei an den Antrieb. Von dort aus lieferte ich die jar zu Funken reichen mit --jar-option.
Ebenfalls zu beachten ist, dass ich die Verpackung war die ganze Funke 2.1 Umgebung, in meine uber jar (da mein cluster ist noch auf 1.6.1) aus irgendeinem Grund nicht abgeholt, wenn in uber jar.
InformationsquelleAutor Gyan
Dies ist im Hinblick auf Jacek Laskowski Antwort.
Denen Sie den Aufbau Ihrer Projekt auf maven kann das ausprobieren.
Fügen Sie die Zeile, die unten erwähnt werden, um Ihre maven-shade-plugin.
META-INF/services/org.apache.spark.sql.Quellen.DataSourceRegister
Hab ich legte den plugin-code für die pom-Datei als Beispiel, um zu zeigen, wo Sie die Zeile hinzufügen.
Bitte entschuldigen Sie meine Formatierung Fähigkeiten.
InformationsquelleAutor Algomeister