Getting Started with MapReduce
I. MapReduce Overview
MapReduce is a programming framework for distributed computing programs and the core framework for developing "Hadoop-based data analysis applications".
Its core function is to combine the user's business logic code with the framework's built-in default components into a complete distributed program that runs concurrently on a Hadoop cluster.
1. Advantages
- MapReduce is easy to program.
- Good scalability.
- High fault tolerance.
- Well suited to offline processing of massive data sets.
2. Disadvantages
- Not suited to real-time computation: it cannot return results within milliseconds or seconds the way MySQL can.
- Not suited to streaming computation: MapReduce's input data set is static.
- Not suited to DAG (directed acyclic graph) computation: if the output of every MapReduce job is written to disk, the heavy disk I/O makes performance very poor.
3. Core programming ideas of MapReduce

- 1) A distributed computing program usually needs to be divided into at least two phases.
- 2) The concurrent MapTask instances of the first phase run fully in parallel and are independent of one another.
- 3) The concurrent ReduceTask instances of the second phase are also independent of one another, but their data depends on the output of all the MapTask instances of the previous phase.
- 4) The MapReduce programming model can contain only one Map phase and one Reduce phase; if the user's business logic is more complex, the only option is to run several MapReduce programs in series, one after another (see the driver sketch below).
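As a rough illustration of point 4, here is a minimal driver sketch that runs two jobs in series; the ChainDriver class name, the package, the intermediate step1 directory, and the commented-out Mapper/Reducer slots are assumptions for illustration, not code from this article:

```java
package cn.buildworld.mapreduce.chain; // hypothetical package name

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class ChainDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Job 1: the first Map/Reduce pass; its output directory becomes job 2's input.
        Job job1 = Job.getInstance(conf, "step-1");
        job1.setJarByClass(ChainDriver.class);
        // job1.setMapperClass(...); job1.setReducerClass(...);  // your first-phase classes
        FileInputFormat.setInputPaths(job1, new Path(args[0]));
        FileOutputFormat.setOutputPath(job1, new Path(args[1] + "/step1"));

        // Run the jobs serially: job 2 is only submitted after job 1 has finished.
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: the second Map/Reduce pass, reading job 1's output.
        Job job2 = Job.getInstance(conf, "step-2");
        job2.setJarByClass(ChainDriver.class);
        // job2.setMapperClass(...); job2.setReducerClass(...);  // your second-phase classes
        FileInputFormat.setInputPaths(job2, new Path(args[1] + "/step1"));
        FileOutputFormat.setOutputPath(job2, new Path(args[1] + "/final"));

        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}
```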
4. MapReduce processes
When a complete MapReduce program runs in distributed mode, it has three kinds of instance processes:
1) MrAppMaster: responsible for scheduling the whole program and coordinating its state.
2) MapTask: responsible for the entire data-processing flow of the Map phase.
3) ReduceTask: responsible for the entire data-processing flow of the Reduce phase.
5. MapReduce programming conventions
1) Mapper stage
- The user-defined Mapper must extend the Mapper parent class provided by Hadoop.
- The Mapper's input data are key-value (KV) pairs, and the key and value types can be customized.
- The business logic is written in the map() method.
- The Mapper's output data are also KV pairs with customizable types.
- The map() method (inside the MapTask process) is called once for every input <K, V> pair.
2) Reducer stage
- The user-defined Reducer must extend the Reducer parent class provided by Hadoop.
- The Reducer's input data type corresponds to the Mapper's output data type, and is also KV pairs.
- The business logic is written in the reduce() method.
- The reduce() method (inside the ReduceTask process) is called once for every group of <K, V> pairs that share the same key.
3) Driver stage
The Driver is used to submit the Job object that encapsulates the runtime parameters of the MapReduce program.
II. WordCount in Practice
This example analyzes how often each word appears in a file and counts the number of occurrences; it is also the official example program.
1. WcMapper: responsible for splitting the data (each input line into individual words)
```java
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WcMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    private Text word = new Text();
    private IntWritable one = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key is the byte offset of the line; value is the line itself.
        String line = value.toString();
        String[] words = line.split(" ");
        for (String word : words) {
            // Reuse the Text/IntWritable objects instead of creating new ones per record.
            this.word.set(word);
            context.write(this.word, this.one);
        }
    }
}
```
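For example, given the input line `hello world hello`, this mapper emits the pairs (hello, 1), (world, 1) and (hello, 1); the framework then sorts and groups all map output by key before handing it to the reducer.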
2. WcReducer: responsible for aggregating the counts
```java
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WcReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // Sum all the 1s emitted by the mappers for this word.
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);
        context.write(key, this.total);
    }
}
```
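Continuing the example above, the reducer is called once per key: it receives (hello, [1, 1]) and (world, [1]) and writes out hello 2 and world 1.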
3. WcDriver: the code here is fairly boilerplate; it is responsible for submitting our Job
```java
package cn.buildworld.mapreduce.wordcount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WcDriver {

    public static void main(String[] args)
            throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Create the job and set the jar, mapper and reducer classes.
        Job job = Job.getInstance(new Configuration());
        job.setJarByClass(WcDriver.class);
        job.setMapperClass(WcMapper.class);
        job.setReducerClass(WcReducer.class);

        // 2. Declare the mapper's output types and the final output types.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 3. Set the input and output paths from the command-line arguments.
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 4. Submit the job and wait for it to finish.
        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
```
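Once the three classes are packaged into a jar, the job can be submitted to the cluster with the standard `hadoop jar` command, for example `hadoop jar wordcount.jar cn.buildworld.mapreduce.wordcount.WcDriver /input /output` (the jar name and the HDFS paths here are placeholders). Note that the output directory must not already exist, otherwise the job fails immediately.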
III. Hadoop Serialization
Java's built-in serialization cannot be used here (it is a heavyweight framework that carries a lot of extra header and inheritance information, which is inefficient to transmit between nodes); instead, the custom bean object must implement Hadoop's own serialization interface, Writable.
Example code:
```java
package cn.buildworld.mapreduce.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable {

    private long upFlow;    // upstream traffic
    private long downFlow;  // downstream traffic
    private long sumFlow;   // total traffic

    // An empty constructor is required so the framework can create the bean via reflection.
    public FlowBean() {
    }

    public void set(long upFlow, long downFlow) {
        this.upFlow = upFlow;
        this.downFlow = downFlow;
        this.sumFlow = upFlow + downFlow;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    public long getSumFlow() {
        return sumFlow;
    }

    public void setSumFlow(long sumFlow) {
        this.sumFlow = sumFlow;
    }

    @Override
    public String toString() {
        return "FlowBean{" +
                "upFlow=" + upFlow +
                ", downFlow=" + downFlow +
                ", sumFlow=" + sumFlow +
                '}';
    }

    // Serialization: write the fields to the output stream.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(upFlow);
        out.writeLong(downFlow);
        out.writeLong(sumFlow);
    }

    // Deserialization: read the fields back in exactly the same order they were written.
    @Override
    public void readFields(DataInput in) throws IOException {
        upFlow = in.readLong();
        downFlow = in.readLong();
        sumFlow = in.readLong();
    }
}
```
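As a rough illustration of how such a bean is used, here is a minimal sketch of a mapper that emits FlowBean as its output value; the FlowMapper class name, the tab-separated input format, and the column positions are assumptions, not part of the original code:

```java
package cn.buildworld.mapreduce.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

// Hypothetical mapper: emits (phone number, FlowBean) pairs.
// Assumes tab-separated lines whose last columns are "... upFlow downFlow status".
public class FlowMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

    private Text phone = new Text();
    private FlowBean flow = new FlowBean();

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        phone.set(fields[1]);                                        // assumed phone-number column
        long upFlow = Long.parseLong(fields[fields.length - 3]);     // assumed column positions
        long downFlow = Long.parseLong(fields[fields.length - 2]);
        flow.set(upFlow, downFlow);
        context.write(phone, flow);   // FlowBean is serialized via its write() method
    }
}
```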