当前位置: 代码迷 >> 综合 >> MapReduce-shuffle 流程详解
  详细解决方案

MapReduce-shuffle 流程详解

热度:21   发布时间:2023-12-15 14:16:07.0

hadoop-MapReduce-shuffle

一. MapReduce 整体流程

mapreduce编程套路:数据加载读取:FileInputFormatTextInputFormatRecordReaderLineRecordReader返回:LongWritable  getCurrentKey()返回:Text          getCurrentValue()Mapper<>map(LongWritable key,Text value,Context context){
    //获取 拆分 封装 发送context.write(k,v);}shuffle:1.分区:决定reducetask的并行度,减轻reduce压力;继承Partitioner2.排序:默认按照map输出的key字典升序;实现WritableComparator3.分组:默认按照map输出的key分组,相同key分到一组4.combiner:默认没有,优化,局部聚合组件(针对每个maptask的输出结果做聚合)贯穿整个shuffle过程中,并非某特定步骤,过程中都有用到.Reducer<>reduce(key,values,context){
    context.write(k,v);}输出(文件写出):FileOutputFormatTextOutputFormatRecordWriterLineRecordWriter输出 ---> HDFS

二. Shuffle 过程详解

( 一 ) shuffle 整体

在这里插入图片描述
图来自于大佬博客: https://blog.csdn.net/zhongqi2513/article/details/78321664

整个过程拆分成四小部分进行操作:

  1. 读取数据

    MRAppMaster 解析程序找到map端文件输入路径与reduce端文件输出路径

    经过 InputFormat.getSplit( ), 进行逻辑切片, 确定mapTask的个数

    map端真正读取文件的代码: InputFormat.getRecordReader.nextKeyValue()

    读取文件数据, 执行map端逻辑后输出, 至此开始进入map端shuffle

  2. map 端shuffle

  3. reduce 端shuffle

  4. 输出到 HDFS

    reduceTask 的个数与map端 输出时, 分区个数相同

    ? 即: int numPartition = Partition.getPartition(outKey)

    有几个reduce, 就输出到 HDFS 几个文件

( 二 ) shuffle 详细
  • map 端 shuffle

    1. map 端执行代码逻辑后, 输出数据首先经过分区: Partition.getPartition(outKey)

    2. 分区后进入内存中的环形缓冲区

      环形缓冲区在mapTask这个jvm进行启动时, 就初始化好了

      ? 初始化一个 kvBuffer 字节数组, 临时存储数据

      ? 初始化一个 spillThread 线程, 执行任务

      ? 环形缓冲区, 默认大小100M (生产环境中会根据处理数据量进行改变)

    3. 当数据写入达到 环形缓冲区大小80% 时, 首先进行排序(根据分区编号以及key进行排序)然后将其刷写到磁盘中, 形成一个个spill文件

    4. 将一个个磁盘文件进行 Merge 合并为一个磁盘文件

    5. 为其创建索引文件, 记录每个分区的起始与结束偏移量

    至此 map端shuffle 结束, 进入 网络传输阶段

  • reduce 端 shuffle

    1. 在有一个 maptask 执行完成后, reduceTask 就会进行拉取数据, 一边拉取一边合并

    2. 根据建立的索引文件, 拉取对应分区的数据首先到内存中

      若拉取数据量小, 则直接保存在内存中, reduce端直接在内存中进行读取
      若拉取数据量大, 则超过内存中量时, 刷写到磁盘中, 形成一个个磁盘文件, 再合并为一个, reduce 端读取该磁盘文件的数据

    至此 reduce端shuffle 结束

  相关解决方案