So speichern Sie die Ausgabe von spark sql-Abfrage in eine text-Datei

Ich bin das schreiben eines einfachen Verbraucher-Programm mit spark streaming. Mein code speichern Sie einige der Daten in der Datei, aber nicht ALLE Daten. Kann mir jemand helfen wie dieses Problem zu beheben. Ich bin nicht sicher, wo ich bin verlieren der Daten. Ich bekomme die Daten von kafka Thema, dann habe ich mit meinem schema von java-Bean-Klasse.

public class ConsumerFile {
public static void main(String[] args){
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
String topic = args[0];
final String path=new String(args[2]);
String broker = args[1];
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
    JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));

    HashMap<String, String> kafkaParams = new HashMap<String, String>();

    kafkaParams.put("metadata.broker.list", broker);
    JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
    ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);

JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>() 

                                         {
                   public String call(Tuple2<String, String> message)

                                             {
                                                 return message._2();}});
words.foreachRDD(
          new Function2<JavaRDD<String>, Time, Void>() {
       public Void call(JavaRDD<String> rdd, Time time) {
   SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
              //Convert RDD[String] to RDD[case class] to DataFrame
              JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
                public JavaRow call(String line) throws Exception{
                String[] fields = line.split(",");
                  JavaRow record = new JavaRow(fields[0], fields[1],fields[2]  );

                  return record;

                }

              });

              DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
              wordsDataFrame.registerTempTable("Data");
              DataFrame wDataFrame = sqlContext.sql(" select * from Data");  
              if(!wDataFrame.rdd().isEmpty()){
             wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
              return null;
            }} );
ssc.start();
 ssc.awaitTermination();}

}

InformationsquelleAutor Hoda Moradi | 2016-03-17

Schreibe einen Kommentar