介绍
MapReduce是一种编程模式,以及与之相关的用于处理和生成大数据集的实现。其运作方式可以简单概括为以下步骤:一个大的输入被分成很多个小的输入块,同时,一个分布式系统中存在的多个计算机,组成一个大的计算机集群,这些分出来的小的输入块将会被集群中的计算机来执行,由一个 master 机器来分发人物。这些用于计算的机器成为 worker。master 将这个小的输入块分配给 worker,接下来就由worker来进行计算。我们称这个计算的过程为 Map,Map 会处理输入数据并生成一组结果。整个 Map 过程会产生很多个计算结果,我们称这些计算结果为中间值 (imtermediate value)。接下来,这些中间值就会被分配到 worker 中来执行 Reduce 任务,这个过程同样也是用 master 来分配的。关于 Reduce 任务,其主要目标即是合并具有相关联的中间值(key/value对)key值的中间值对,计算出结果。
上述过程即是一个 MapReduce 任务的基本步骤。当然,过程的Map函数和Reduce函数是可由用户指定的。在 MapReduce 架构下设计的程序代码将会自动的并行运行并运行在一个分布式集群里。我们接下来要关心的则是:输入数据的划分,程序运行中机器的调度,机器计算等处理过程出错,以及管理机器之间的数据交流。在本篇笔记中,就论文中处理这些问题的方法做记录与思考。
关于编程模型
在介绍部分,我们已经提到,MapReduce的过程基本分为:输入数据->
输入数据分块->
master 分配 map 任务到 workers ->
workers 产生中间值->
master 分配 reduce 任务到 workers ->
中间值会作为参数传入到 reduce 任务中 ->
程序结束,获取最终结果。
在这个过程中,还存在着一些细节:
- 首先,关于输入数据分块这个阶段,整个 MapReduce 程序接收的输入是一个很大的数据,比如TB级别的输入。我们会将输入数据分为小块,一般大小为 16-64 MB。
- workers产生的中间值的形式为 (key/value) 格式的数据。而关于 Input 的数据的格式并没有固定,毕竟在 Map 函数中完全可以从数据中提取出 key 来形成 (key/value) 的中间值。
- workers 生成中间值之后,显然,可能存在很多具有相关联的key值的中间值对。那么,直接将这些中间值传给 Reduce 函数则会降低效率,那么,在这个时候,MapReduce 框架则会将这些具有相关联的key值的中间值组合起来,注意,这个过程仅仅是组合中间值,即将 (key/value) 的中间值组合成 (key/list(value)) 这样。然后再将这个中间值作为 Reduce 的输入传入。
- workers 在产生了中间值之后(MapReduce 框架组合中间值之后),会将中间值缓存在内存中,一段时间之后,将会记录在本地磁盘的文件系统中。这样做有很多的好处,一是方便中间值的存放,因为会产生很多的中间值,可能会出现在内存中存放不下,存放在磁盘中更利于保存,另一个好处是可以节省网络带宽。同时,在 workers 自己的机器上存储的中间值的存放位置将会发回给 master 机器。因为 master 机器也要负责分配 Reduce 任务。需要告诉执行 Reduce 任务的 workers 中间值的位置。
- 中间值 (key/list(value)) 对会经由一个迭代器传入到 Reduce 函数中。这有利于帮助我们处理那些大到内存中难以容纳的数据。
伪代码示例,此伪代码展示了使用mapreduce来计数一个文档中每个单词出现的次数:
//map函数接收 (key/value) 对作为输入,对于句子中的每一个单词,都会产生一个中间值 (key/value) 对。
map(String key, String value)://key: document name//value: document contentsfor each word w in value:EmitIntermediate(w,"1");//接下来会由 MapReduce 库来完成中间值的综合,此步与 reduce 存在差别,库完成综合时只是将具有比如说相同key值的中间值对的 value 存入到一个 list 里去,而 reduce 函数则是接收中间值输入,产生更少甚至一个的最终结果。//reduce函数接收一组由 MapReduce 库处理后的中间值作为输入,Iterator 代表迭代器,使用迭代器将 values 的 list 传入到 reduce 函数中去。
reduce(String key, Iterator values)://key: a word//values: a list of countsint result = 0;for each v in values:result += ParseInt(v);Emit(AsString(result));
继续深入实现细节
下图展示了 MapReduce 整个的运作过程:
关于文件存储,整个分布式系统中存在一个内置的分布式文件系统,用来管理存储在这些磁盘上的数据。文件系统使用复制在不可靠硬件上提供可用性和可靠性。
输入数据被分隔成为 M 个输入块,这个分隔的过程可以有不同的机器并行执行。
关于 master 和 workers 的形成,当 MapReduce 库完成对输入数据的分隔后,接下来,整个集群中的机器都会获得一份程序代码的拷贝。其中一份拷贝是特别的,它就是 master 机器,剩余的机器则都成为 workers。
当一个 reduce worker 被 master 告知中间值的存储位置时,worker 通过远程过程调用,从 map workers 的本地磁盘中获取存储的中间值。当 reduce worker 成功读取了这些中间数据后,对数据进行综合,将具有相同key值的中间值对合并,这也就是我们在上一部分提到的 MapReduce 库需要完成的工作。
每个 Reduce 函数产生一个 output 文件,最终,这些文件会加到一个最终文件中去。但通常,我们没必要将这些 output 文件合并到一个最终文件中去,而是把它们作为另一个 MapReduce 操作的输入或者是把它们用于另一个分布式应用中去。
map worker 中生成的中间值信息会传给 master,在 master 中,存储的即为中间值的位置和大小。
容错
master 会对每个worker每隔一段时间进行心跳检测,不免会存在一些worker失去连接的情况,就代表着这个 worker failed,即出错了。那么如果这个 worker 正在执行一个 map 任务怎么办呢,答案就是被标记为 failed 的worker执行结束它的任务后,比如正在执行一个 map 任务,被标记为 failed,那么这个 map 任务将在执行完成后重置为等待执行的状态,因此其他的 worker 就可以继续调用这个任务。
那么,执行 reduce 任务的 workers 如何知道一个 worker failed 并相应变化处理呢,过程是这样,一个 worker A failed后,它所执行的 map 任务由另一个 worker B 调用,执行,而所有的 reduce workers都会收到这个 map 任务被重新执行的消息,它们就会向 B 去调用中间值。
这个再执行的过程即为容错的方法之一。master 通过再执行,让被出错的 worker 执行的任务得到正确 worker 的执行,使得程序继续运行直至结束。
master出错
master 节点可以周期性地设置 checkpoint ,如果master出错了,那么一份新的程序的 copy 将从上一个检查点处的数据开始作为 master 来运行。但如果是在只有一个 master 机器的集群里面,如果 master 出错了,通常的做法都是中止 MapReduce 程序。当然,客户端也可以检查这种情况,然后重试 MapReduce 操作。
本地化
网络带宽在我们的计算环境中是相对比较稀缺的资源,所以我们需要在我们的整个程序的运行中要尽量节约网络带宽。在我们的 MapReduce 程序中,通过调用存储在本地的文件来实现节约带宽的目的。程序的输入数据,被存储在集群中的机器上的本地磁盘里面,我们的文件管理系统,将这个大的输入文件,分成多个64MB的小块,每块复制若干份(通常为3份)存储在不同的机器上。master 调用拥有相应区块输入数据复制的 worker 执行 map 任务,如果 worker 出错了,就会尝试将 map 任务调度到临近这些数据的机器上(例如与保存数据的机器位于同一网关下的机器中)。
备用执行任务
MapReduce 程序中存在着"落后者"现象,一个 worker 在完成前几个 map 或 reduce 任务是花费了不正常的较长时间是使得整个 MapReduce 耗时变长的主要原因。比如:一个 worker 的disk损坏,导致读写速度变慢;或者一个 worker 在执行 mapreduce 任务的同时,机器上还有其他任务在执行,拖慢了速度;再比如代码本身存在的 bug 造成无法缓存中间数据之类的情况等等。
那么,关于备用执行任务,当一个 MapReduce 程序在临近完成的时候,master 就会启动备用执行,将还在执行用的 map 或者 reduce 任务调度给其他的空闲的机器执行,作为这个任务的备用执行。只要原始执行机器和备用执行机器中一个完成了这个任务,那么这个任务就会被标记为完成。这个机制,仅耗费少量的计算资源,就能大大的减少 MapReduce 程序的运行时间。
技巧
关于划分函数
用户可以指定整个程序的 reduce 任务或输出文件的数量(之前提到过,最终文件是可以合并为一个文件的,但我们通常都不合并,原因在上文也有提到,故会存在 R 个最终文件)。通过划分函数,将数据按照中间值 key 划分给各个 reduce 任务。默认的划分函数是散列函数,比如:hash(key) mod R。在不同的情况下使用不同的划分函数可能更有效率。
顺序保证
在给定的划分中,中间值对是按增序排列的(经过 sort 步骤)。这使得每个划分产生一个有序的输出文件变得更容易。
合并函数
用户可指定一个合并函数,在将数据通过网络发送给 reduce 任务前进行局部合并。即是对 map 任务产生的中间值进行合并这一操作,我们在上文也已提到其方式和目的。
输入输出数据类型
MapReduce 库支持多种数据类型比如 “text” , "(key/value)对"等等。用户可以田间对其他数据类型的支持,只需要完成一个 reader 接口的实现。
略过坏记录
MapReduce提供一种可选的执行模式,当某些记录即数据在运行时会产生阻塞等情况,可以选择跳过这个记录,让程序能够继续运行下去。
在用户的程序中,安装一个信号处理器,用来捕捉错误等信号。在调用用户的 map 或 reduce 操作前,MapReduce 将用于验证的序列号保存在全局变量中,如果用户代码产生了信号,信号处理器就向 master 发送包含这个序列号的最后一步的 UDP 包发送给 master。如果 master 在某一个记录上收到了多个错误信号,那么下次执行 map 或 reduce 任务时就应该跳过这个记录。
状态信息
主节点中内置了一个 HTTP 服务器,可以将当前状态输出为网页供用户查看和使用
计数器机制
MapReduce 提供了计数器机制,可用于统计多种事件的发送次数。
工作节点上的计数器值会发送给主节点,可以附在主节点发送的心跳检测的回应中。在合并数据时,master 会忽略同一个 map 或 reduce 任务重复发来的数据,避免多次叠加。
总结
MapReduce 的原理和实现细节在上文已得到较为细致的讲解。包括 MapReduce 的运行流程,其中的数据管理,存储和流动方式。以及错误处理,即 workers 出错和 master 出错的情况。还有任务的备用执行等其他细节。
此处附上原论文连接:
《MapReduce:Simplified Data Processing on Large Cluster》