Hadoop MapReduce Art reduzieren Ausgabe mit dem Schlüssel

da unten ist eine map-reduce-Programm zählen Wörter, die aus mehreren text-Dateien.
Mein Ziel ist es, das Ergebnis in absteigender Reihenfolge in Bezug auf die Menge Partys.

Leider das Programm sortiert die Ausgabe lexikographisch, indem Sie die-Taste. Ich möchte eine Natürliche Ordnung der integer-Wert.

So dass ich Hinzugefügt eine benutzerdefinierte Komparator mit job.setSortComparatorClass(IntComparator.class). Aber dies funktioniert nicht wie erwartet. Ich bin immer folgende exception:

java.lang.Exception: java.nio.BufferUnderflowException
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:404)
Caused by: java.nio.BufferUnderflowException
    at java.nio.Buffer.nextGetIndex(Buffer.java:498)
    at java.nio.HeapByteBuffer.getInt(HeapByteBuffer.java:355)
    at WordCount$IntComparator.compare(WordCount.java:128)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:987)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:100)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:64)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1277)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1174)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:609)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:675)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:266)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
    at java.util.concurrent.FutureTask.run(FutureTask.java:166)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

Jede mögliche Hilfe würde geschätzt! 🙂

Ich aufgeführt habe, das ganze Programm unter wie kann es einen Grund für die Ausnahme, die ich offensichtlich nicht kenne. Wie Sie sehen können ich bin mit der neuen mapreduce api (org.apache.hadoop.mapreduce.*).

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Counts the words in several text files.
 */
public class WordCount {
  /**
   * Maps lines of text to (word, amount) pairs.
   */
  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable amount = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      String textLine = value.toString();

      StringTokenizer tokenizer = new StringTokenizer(textLine);
      while (tokenizer.hasMoreElements()) {
        word.set((String) tokenizer.nextElement());

        context.write(word, amount);
      }
    }

  }

  /**
   * Reduces (word, amount) pairs to (amount, word) list.
   */
  public static class Reduce extends
      Reducer<Text, IntWritable, IntWritable, Text> {

    private IntWritable amount = new IntWritable();
    private int sum;

    @Override
    protected void reduce(Text key, Iterable<IntWritable> valueList,
        Context context) throws IOException, InterruptedException {
      sum = 0;

      for (IntWritable value : valueList) {
        sum += value.get();
      }

      amount.set(sum);
      context.write(amount, key);
    }
  }

  public static class IntComparator extends WritableComparator {
    public IntComparator() {
      super(IntWritable.class);
    }

    private Integer int1;
    private Integer int2;

    @Override
    public int compare(byte[] raw1, int offset1, int length1, byte[] raw2,
        int offset2, int length2) {
      int1 = ByteBuffer.wrap(raw1, offset1, length1).getInt();
      int2 = ByteBuffer.wrap(raw2, offset2, length2).getInt();

      return int2.compareTo(int1);
    }

  }

  /**
   * Job configuration.
   * 
   * @param args
   * @throws IOException
   * @throws ClassNotFoundException
   * @throws InterruptedException
   */
  public static void main(String[] args) throws IOException,
      ClassNotFoundException, InterruptedException {
    Path inputPath = new Path(args[0]);
    Path outputPath = new Path(args[1]);

    Configuration configuration = new Configuration();
    configuration.addResource(new Path("/etc/hadoop/conf/core-site.xml"));
    Job job = new Job(configuration);
    job.setJobName("WordCount");
    job.setJarByClass(WordCount.class);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(IntWritable.class);

    job.setOutputKeyClass(IntWritable.class);
    job.setOutputValueClass(Text.class);

    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputFormatClass(TextOutputFormat.class);

    job.setSortComparatorClass(IntComparator.class);

    FileInputFormat.setInputPaths(job, inputPath);

    FileSystem.get(configuration).delete(outputPath, true);
    FileOutputFormat.setOutputPath(job, outputPath);

    job.waitForCompletion(true);
  }
}
Würde es Ihnen etwas ausmachen zu erklären, was der natural order of the integer value ein string ist? Derzeit Sie sind versucht zu vergleichen, die ersten 4 bytes aus einem string (wtf?).
Nicht die Zeichenfolge, die IntWritable, das ist der Schlüssel.
Aber Ihr Schlüssel ist text, es gibt keine int sortiert werden, überall.
Die Ausgabe der reducer verwendet IntWritable als key-Typ. Sehen context.write(amount, key); mit Menge wird eine IntWritable.
Die Sortierung geschieht nicht nach dem Druckminderer.

InformationsquelleAutor fyaa | 2013-06-16

Schreibe einen Kommentar