So speichern Sie die Ausgabe von spark sql-Abfrage in eine text-Datei
Ich bin das schreiben eines einfachen Verbraucher-Programm mit spark streaming. Mein code speichern Sie einige der Daten in der Datei, aber nicht ALLE Daten. Kann mir jemand helfen wie dieses Problem zu beheben. Ich bin nicht sicher, wo ich bin verlieren der Daten. Ich bekomme die Daten von kafka Thema, dann habe ich mit meinem schema von java-Bean-Klasse.
public class ConsumerFile {
public static void main(String[] args){
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
String topic = args[0];
final String path=new String(args[2]);
String broker = args[1];
SparkConf sparkConf = new SparkConf().setAppName("SparkConsumer").setMaster("local[*]");;
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
HashSet<String> topicsSet = new HashSet<String>(Arrays.asList(topic.split(",")));
HashMap<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", broker);
JavaPairInputDStream<String, String> kafkaStream = KafkaUtils.createDirectStream(
ssc, String.class, String.class,StringDecoder.class,StringDecoder.class,kafkaPrams,
topicsSet
);
JavaDStream<String> words = kafkaStream.map(new Function<Tuple2<String, String>, String>()
{
public String call(Tuple2<String, String> message)
{
return message._2();}});
words.foreachRDD(
new Function2<JavaRDD<String>, Time, Void>() {
public Void call(JavaRDD<String> rdd, Time time) {
SQLContext sqlContext = JavaSQLContextSingleton.getInstance(rdd.context());
//Convert RDD[String] to RDD[case class] to DataFrame
JavaRDD<JavaRow> rowRDD = rdd.map(new Function<String, JavaRow>() {
public JavaRow call(String line) throws Exception{
String[] fields = line.split(",");
JavaRow record = new JavaRow(fields[0], fields[1],fields[2] );
return record;
}
});
DataFrame wordsDataFrame = sqlContext.createDataFrame(rowRDD, JavaRow.class);
wordsDataFrame.registerTempTable("Data");
DataFrame wDataFrame = sqlContext.sql(" select * from Data");
if(!wDataFrame.rdd().isEmpty()){
wDataFrame.rdd().coalesce(1,true,null).saveAsTextFile(path); }
return null;
}} );
ssc.start();
ssc.awaitTermination();}
}
InformationsquelleAutor Hoda Moradi | 2016-03-17
Du musst angemeldet sein, um einen Kommentar abzugeben.
Das könnte sein, weil Sie nicht die Angabe des Modus des Schreibens. Stattdessen verwenden Sie diese,
P. s: ich bin nicht tun es in java, die ich gab, ist eine scala/python-äquivalent
InformationsquelleAutor avrsanjay
wird es geschrieben als einen partitionierten text-Datei, so müssen Sie Ihre Ergebnisse Abstand unter einem Haufen von Dateien mit der Bezeichnung Teil-00000, aber es wird da sein.
InformationsquelleAutor Jerry Schirmer
Fand ich heraus, warum es das macht, falls jemand anderes das selbe problem hat. Wenn Sie das tun foreachRDD es führt im wesentlichen die Funktion auf jeden RDD von der DStream Sie speichern Sie Sie alle auf die gleiche Datei. Also Sie überschreiben jeweils anderen Daten und das erste oder Letzte writer wins. Der einfachste fix ist, speichern Sie Sie in eine Datei mit einem eindeutigen Namen. So habe ich saveAsTextFile(Weg + Zeit().Millisekunden().toString()) und das problem behoben. Aber hätte Sie den gleichen Zeitstempel zweimal, so daß ich diese noch einzigartiger, indem Sie hinzufügen einer Zufallszahl.
InformationsquelleAutor Hoda Moradi