一、MapReduce工作流程概述
MapReduce是用于分布式数据处理的编程模型。它采用了划分-映射-合并的思想,将大型数据集分成小块,由多个计算节点并行处理,并将小块结果合并成一个最终结果。
MapReduce的工作流程包括两个阶段:Map(映射)和Reduce(合并)。
二、Map阶段详解
在(MapReduce中,映射)阶段,首先需要对输入数据进行划分,将数据划分为若干份较小的数据块,每个数据块交由一个Map任务处理。Map任务将数据块转化为一系列键值对,并输出为一个新的键值对序列。
// Map示例代码 public static class Map extends Mapper<Object, Text, Text, IntWritable>{ private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { word.set(itr.nextToken()); context.write(word, one); } } }
三、Shuffle和排序详解
Shuffle是MapReduce的一个重要步骤,它负责将Map输出的键值对按照键进行分组,将同一组内的记录发给同一个Reduce任务进行处理。
Shuffle完成后,Reduce任务接收到的记录已经按照键值进行了分组,只需要对每个键值组进行合并和处理即可。
MapReduce默认的排序方式是根据键值对的键对记录进行排序。如果需要自定义排序方式,可以实现WritableComarable接口,并重写compareTo方法。
四、Reduce阶段详解
Reduce阶段的处理对象是Map阶段输出的键值对序列,Reduce任务将同一组内的记录进行合并,形成一个更加小的序列,直至处理完所有的记录。
// Reduce示例代码 public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
五、Combiner优化
为了减少Reduce任务的负担,在Map输出的键值对序列传输到Reduce之前,可以在Map本地进行一些合并操作。这种方式称之为Combiner。
Combiner可大大减少Reduce任务所需要处理的数据量,从而提高整个MapReduce任务的效率。
// Combiner示例代码 public static class Reduce extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } } public static class Comb extends Reducer<Text,IntWritable,Text,IntWritable> { private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } result.set(sum); context.write(key, result); } }
六、MapReduce作业提交
MapReduce作业提交有两种方式:命令行和代码。其中命令行方式为Hadoop自带的hadoop命令,代码方式需要先创建一个配置对象,指定Hadoop集群地址、作业名等,然后将MapReduce任务的输入输出路径和类名封装到一个Job对象中,最后通过job.waitForCompletion方法提交作业。
// MapReduce作业提交示例代码 Configuration conf = new Configuration(); conf.set("fs.defaultFS", "hdfs://localhost:9000"); Job job = new Job(conf, "word count"); job.setJarByClass(WordCount.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); job.setMapperClass(Map.class); job.setCombinerClass(Combiner.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); System.exit(job.waitForCompletion(true) ? 0 : 1);