MapReduce进阶(一)–框架原理
MapReduce数据流
2、MapTask并行度决定机制
数据块:Block是HDFS物理上把数据分成一块一块。
数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。
- 1)一个Job的Map阶段并行度由客户端在提交Job时的切片数决定
- 2)每一个Split切片分配一个MapTask并行实例处理
- 3)默认情况下,切片大小=BlockSize
- 4)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
3、Job提交流程源码解析

1)源码解析
2)切片机制
- (1)简单地按照文件的内容长度进行切片
- (2)切片大小,默认等于Block大小
- (3)切片时不考虑数据集整体,而是逐个针对每一个文件单独切片
3)切片大小的参数配置

5、小文件切片–CombineTextInputFormat切片机制
生成切片过程包括:虚拟存储过程和切片过程二部分。

(1)虚拟存储过程:
将输入目录下所有文件大小,依次和设置的setMaxInputSplitSize
值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2倍,此时将文件均分成2个虚拟存储块(防止出现太小切片)。
(2)切片过程:
1 2 3 4 5 6
| (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
(b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。
(c)测试举例:有4个小文件大小分别为1.7M、5.1M、3.4M以及6.8M这四个小文件,则虚拟存储之后形成6个文件块,大小分别为:1.7M,(2.55M、2.55M),3.4M以及(3.4M、3.4M) 最终会形成3个切片,大小分别为:(1.7+2.55)M,(2.55+3.4)M,(3.4+3.4)M
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package cn.buildworld.mapreduce.inputformat;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import java.io.IOException;
public class WholeFileInputFormat extends FileInputFormat<Text, BytesWritable> {
@Override protected boolean isSplitable(JobContext context, Path filename) { return false; }
@Override public RecordReader<Text, BytesWritable> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new WholeFileRecordReader(); } }
|
2)自定义RecordReader–WholeFileRecordReader
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126
| package cn.buildworld.mapreduce.inputformat;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import java.io.IOException;
public class WholeFileRecordReader extends RecordReader<Text, BytesWritable> {
private boolean notRead = true; private Text key = new Text(); private BytesWritable value = new BytesWritable(); private FSDataInputStream inputStream; private Path path; private FileSplit fs;
@Override public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { fs = (FileSplit) split; path = fs.getPath(); FileSystem fileSystem = path.getFileSystem(context.getConfiguration()); inputStream = fileSystem.open(path); }
@Override public boolean nextKeyValue() throws IOException, InterruptedException {
if (notRead) { key.set(fs.getPath().toString());
byte[] buf = new byte[(int) fs.getLength()]; inputStream.read(buf); value.set(buf, 0, buf.length);
notRead = false; return true; } else {
return false; } }
@Override public Text getCurrentKey() throws IOException, InterruptedException { return key; }
@Override public BytesWritable getCurrentValue() throws IOException, InterruptedException { return value; }
@Override public float getProgress() throws IOException, InterruptedException { return notRead ? 0 : 1; }
@Override public void close() throws IOException { IOUtils.closeStream(inputStream); } }
|