hadoop-MapReduce-shuffle
一. MapReduce 整体流程
mapreduce编程套路:数据加载读取:FileInputFormatTextInputFormatRecordReaderLineRecordReader返回:LongWritable getCurrentKey()返回:Text getCurrentValue()Mapper<>map(LongWritable key,Text value,Context context){
//获取 拆分 封装 发送context.write(k,v);}shuffle:1.分区:决定reducetask的并行度,减轻reduce压力;继承Partitioner2.排序:默认按照map输出的key字典升序;实现WritableComparator3.分组:默认按照map输出的key分组,相同key分到一组4.combiner:默认没有,优化,局部聚合组件(针对每个maptask的输出结果做聚合)贯穿整个shuffle过程中,并非某特定步骤,过程中都有用到.Reducer<>reduce(key,values,context){
context.write(k,v);}输出(文件写出):FileOutputFormatTextOutputFormatRecordWriterLineRecordWriter输出 ---> HDFS
二. Shuffle 过程详解
( 一 ) shuffle 整体
图来自于大佬博客: https://blog.csdn.net/zhongqi2513/article/details/78321664
整个过程拆分成四小部分进行操作:
-
读取数据
MRAppMaster 解析程序找到map端文件输入路径与reduce端文件输出路径
经过 InputFormat.getSplit( ), 进行逻辑切片, 确定mapTask的个数
map端真正读取文件的代码: InputFormat.getRecordReader.nextKeyValue()
读取文件数据, 执行map端逻辑后输出, 至此开始进入map端shuffle
-
map 端shuffle
-
reduce 端shuffle
-
输出到 HDFS
reduceTask 的个数与map端 输出时, 分区个数相同
? 即: int numPartition = Partition.getPartition(outKey)
有几个reduce, 就输出到 HDFS 几个文件
( 二 ) shuffle 详细
-
map 端 shuffle
-
map 端执行代码逻辑后, 输出数据首先经过分区: Partition.getPartition(outKey)
-
分区后进入内存中的环形缓冲区
环形缓冲区在mapTask这个jvm进行启动时, 就初始化好了
? 初始化一个 kvBuffer 字节数组, 临时存储数据
? 初始化一个 spillThread 线程, 执行任务
? 环形缓冲区, 默认大小100M (生产环境中会根据处理数据量进行改变)
-
当数据写入达到 环形缓冲区大小80% 时, 首先进行排序(根据分区编号以及key进行排序)然后将其刷写到磁盘中, 形成一个个spill文件
-
将一个个磁盘文件进行 Merge 合并为一个磁盘文件
-
为其创建索引文件, 记录每个分区的起始与结束偏移量
至此 map端shuffle 结束, 进入 网络传输阶段
-
-
reduce 端 shuffle
-
在有一个 maptask 执行完成后, reduceTask 就会进行拉取数据, 一边拉取一边合并
-
根据建立的索引文件, 拉取对应分区的数据首先到内存中
若拉取数据量小, 则直接保存在内存中, reduce端直接在内存中进行读取
若拉取数据量大, 则超过内存中量时, 刷写到磁盘中, 形成一个个磁盘文件, 再合并为一个, reduce 端读取该磁盘文件的数据
至此 reduce端shuffle 结束
-