Java Spark Streaming JSON-parsing
Habe ich angefangen zu lernen, spark-streaming von Spark-Motor und sehr neu in data analytics und Funken. Ich will einfach nur, um zu erstellen ein kleines IOT-Anwendung in die ich möchte, um eine Prognose über die zukünftigen Daten.
Habe ich Tiva hardware sendet Echtzeit-sensor JSON-Daten wie folgt,
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
In diesem t unix-Zeitstempel, zu dem Daten gesendet werden.
sensoren sensoren mit jedem sensor('s') Daten als 'd'.
Was ich will zu tun ist, verbrauchen diese Daten und erstellen von Objekten, die spark-streaming und übergeben Sie dann alle Daten, wenn spark Mlib (machine learning) oder gleichwertig-Bibliothek für die Prognose zukünftiger Daten.
Möchte ich eine Allgemeine Idee, ob dies möglich sein wird, mit allen technischen Entscheidungen
- Ich habe beschlossen, zu verwenden?
- Wie kann ich verbrauchen die verschachtelten JSON? Ich habe versucht, mit SQLContext bekam aber keinen Erfolg.
- Allgemeine Richtlinien zu erreichen, was ich hier zu tun versuchen.
Hier ist der code mit dem ich arbeite, um zu konsumieren Nachrichten von KAFKA.
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
//TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
System.out.println(message._2());
return message._2();
};
});
System.out.println(" json is 0------ 0"+ json);
json.foreachRDD(rdd -> {
rdd.foreach(
record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
PS: ich möchte das in Java zu erhalten Linearität und gute Leistung.
- Können Sie nach dem code, was du bisher ausprobiert? sein ist möglich Sie mit Spark SQL-und-Streaming.
- Gepostet code in Frage.
- wenn Sie versuchen
sqlContext
zum Lesen der json-string, was ist das problem, das Sie konfrontiert? ist diese Aufgabe nicht serialisierbar Problem?
Du musst angemeldet sein, um einen Kommentar abzugeben.
Da Sie mit SPark 2.0, von SparkSession, können Sie Lesen Sie die JSON -
ODER Sie können konvertieren Sie die rdd zu RDD von der Reihe, dann können Sie createDataFrame Methode.
Verschachtelte JSON-Verarbeitung möglich ist, aus dem DF, Sie können Sie diese Artikel.
Auch, sobald Sie konvertieren Sie Ihre json-DF, können Sie es in jedem spark-Module ( wie spark sql, ML)
JavaSparkContext
zu SQLContext(javasparkContext)Antwort auf Ihre Fragen:
1), Ob dies möglich sein wird, mit allen Technologie-Entscheidungen, die ich beschlossen haben, zu verwenden?
2) Wie kann ich verbrauchen die verschachtelten JSON? Ich habe versucht, mit SQLContext bekam aber keinen Erfolg.
3) Allgemeine Richtlinien zu erreichen, was ich hier zu tun versuchen.
Ans: Consuming messages through kafka seems fine, but only a limited machine learning algorithms are supported through streaming.
Wenn Sie verwenden möchten anderen machine-learning-algorithmen oder Drittanbieter-Bibliothek, vielleicht sollten Sie erwägen, die Erstellung des Modells als batch-job-emmiting sich das Modell am Ende. Die streaming-Auftrag sollte das Modell laden und bekommen Strom von Daten und Vorhersagen nur.