Java Spark Streaming JSON-parsing

Habe ich angefangen zu lernen, spark-streaming von Spark-Motor und sehr neu in data analytics und Funken. Ich will einfach nur, um zu erstellen ein kleines IOT-Anwendung in die ich möchte, um eine Prognose über die zukünftigen Daten.

Habe ich Tiva hardware sendet Echtzeit-sensor JSON-Daten wie folgt,

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]

In diesem t unix-Zeitstempel, zu dem Daten gesendet werden.
sensoren sensoren mit jedem sensor('s') Daten als 'd'.

Was ich will zu tun ist, verbrauchen diese Daten und erstellen von Objekten, die spark-streaming und übergeben Sie dann alle Daten, wenn spark Mlib (machine learning) oder gleichwertig-Bibliothek für die Prognose zukünftiger Daten.

Möchte ich eine Allgemeine Idee, ob dies möglich sein wird, mit allen technischen Entscheidungen

  1. Ich habe beschlossen, zu verwenden?
  2. Wie kann ich verbrauchen die verschachtelten JSON? Ich habe versucht, mit SQLContext bekam aber keinen Erfolg.
  3. Allgemeine Richtlinien zu erreichen, was ich hier zu tun versuchen.

Hier ist der code mit dem ich arbeite, um zu konsumieren Nachrichten von KAFKA.

SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");

    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));

    //TODO: processing pipeline
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
    Set<String> topics = Collections.singleton("RAH");


    JavaPairInputDStream<String, String> directKafkaStream = 
            KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
                    StringDecoder.class, kafkaParams, topics);


    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
        public String call(Tuple2<String,String> message) throws Exception {
            System.out.println(message._2());
            return message._2();
        };
    });


    System.out.println(" json is  0------ 0"+ json);



    json.foreachRDD(rdd -> {
        rdd.foreach(
                record -> System.out.println(record));
    });

    ssc.start();
    ssc.awaitTermination(); 

PS: ich möchte das in Java zu erhalten Linearität und gute Leistung.

  • Können Sie nach dem code, was du bisher ausprobiert? sein ist möglich Sie mit Spark SQL-und-Streaming.
  • Gepostet code in Frage.
  • wenn Sie versuchen sqlContext zum Lesen der json-string, was ist das problem, das Sie konfrontiert? ist diese Aufgabe nicht serialisierbar Problem?
InformationsquelleAutor Rahul Borkar | 2016-11-04
Schreibe einen Kommentar