一步一步完成 MIT-6.824-Lab1 : MapReduce 之一
GitHub代码仓库:Mit-6.824-Lab1-MapReduce
回顾上一篇博文中提到了 MapReduce 论文, 本次的 MIT 的 Lab1-MapReduce, 可以根据论文中提到的完成一个 MapReduce 系统的步骤来完成,此处大致列下步骤,当然,当然,没有列出细节部分。本次实验就根据这个步骤来一步一步的完成:
- MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。
- 存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。
- 被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。
- 定期的,这些内存中的中间值会经过一个用户自定义的 Pratition 分割函数,分成 N 份,(即 reduce task 的数量)。然后写到本地的磁盘中。这些文件的存放位置需要发送给 Master, 以保证能够被正确找到,进行 reduce 任务。
- 当一个 worker 被分配了 reduce 任务后,通过远程程序调用,读取 map worker 存放在其本地的中间文件。当读取了所有的中间值后,reduce worker 对中间值按照键值对的 key 进行排序。 如果中间值太大以至于内存容纳不下,那么,一个可能就需要一个外部的排序。
- reduce worker 重复迭代排序后的中间值。reduce worker接下来对中间值经过用户自定义的 Reduce 来处理。得到一个 output 文件。
- 当所有的 Map 和 Reduce 完成后,程序正常退出。
接下来,我们按照上述的7个步骤一步一步地完成 MIT-6.824 Lab1:MapReduce。
步骤一
目标
MapReduce 库将用户成据的输入分割成 M 快,每块通常 16-64 MB。
我们的实现
通过查看实验文件,我们可以的得知,本次 MapReduce 实验的待处理输入文件是 /main
下的一系列以 pg-*.txt
。阅读官网上的实验要求:
The pg-*.txt arguments to mrmaster.go are the input files; each file corresponds to one "split", and is the input to one Map task.
每一个 pg-*.txt
都已经是一个 "split"
后的,传给单个 Map task 的输入。所以说,此时我们的目标已经达成。文件中一共有8个txt待处理,所以本实验,正常情况存在 8 个 Map 任务。
步骤2
目标
存在一个 Master, 和多个 Worker。存在 M 个 map 任务和 N 个 reduce 任务等待分配。 Master 从worker中选择空闲的worker来完成任务。
我们的实现
查看项目文件,在 /main
下存在 mrmaster.go
,该 go 文件即会创建一个 Master。阅读官网说明,运行 Master的方法是:
go run mrmaster.go pg-*.txt
可以看出,mrmaster.go 接受一系列 pg-*.txt
文件作为输入。查看 mrmaster.go 内部:
m := mr.MakeMaster(os.Args[1:], 10)
在查看 package mr
下的 MakeMaster
:
// create a Master.
// main/mrmaster.go calls this function.
//
func MakeMaster(files []string, nReduce int) *Master {
m := Master{
}// Your code here.m.server()return &m
}
创建 Master
可以看出,我们运行一个 master 的时候,传递的两个参数,第一个:os.Arg[1:]
表示split
了的input文件。 第二个 10
代表 nReduce
,也就是值 Reduce 任务的数量。所以,我们此时创建了一个 master,并要求 reduce 的任务数量为 10。
接下来,我们完成 Masrer 需要的数据。
//在 master.go 文件中,完善现在这个阶段 Master struct// Master struct
type Master struct {
// Your definitions here.AllFilesName map[string]int // splited filesMapTaskNumCount int // curr map task numberNReduce int // n reduce taskMapFinished boolReduceTaskStatus map[int]int // about reduce tasks' statusReduceFinished boolRWLock *sync.RWMutex
}
以上是现在这个阶段 Master 需要的一些数据。 用一个 map[string]int
类型的数据来存放 input 文件。其中,map的 key 就代表 inuput 文件名,int
类型的 value 代表目前这个 input 文件的状态。同样,我们使用一个 map[int]int
的数据来存放 reduce 任务。 int
类型的 key 代表 reduce 任务的编号, int
类型的 value 代表目前这个编号的 reduce 任务的状态。 另外用 int
类型的数据存放 reduce 任务数 和当前 Map 任务编号。用 bool
类型的数据存放是否完成整个 Map 和 Reduce。
目前这个Master的结构还不完善,我们在后续的步骤中继续补充。
接下来,关于Map 和 Reduce 任务的状态, 我们用 int
类型来表示
// master.go// tasks' status
// UnAllocated ----> UnAllocated to any worker
// Allocated ----> be allocated to a worker
// Finished ----> worker finish the map task
const (UnAllocated = iotaAllocatedFinished
)
接下来,在函数 MakeMaster
中完成初始化:
// master.go -----func MakeMaster()// your code here
m.AllFilesName = make(map[string]int)
m.MapTaskNumCount = 0
m.NReduce = nReduce
m.MapFinished = false
m.ReduceFinished = false
m.ReduceTaskStatus = make(map[int]int)
m.RWLock = new(sync.RWMutex)
for _,v := range files {
m.AllFilesName[v] = UnAllocated
}for i := 0; i<nReduce; i++ {
m.ReduceTaskStatus[i] = UnAllocated
}
到此,我们完善了一个基本的 Master 的结构。
master 生成 Map 任务
接下来, 我们让 master 来成生 map 与 reduce 任务。
为了提高任务生产 worker 领取任务的效率,我们此时使用两个 channel 适用与 map与reduce 任务。并让一个函数与主进程并行执行,该进程用来生成任务。
// master.go// global
var maptasks chan string // chan for map task
var reducetasks chan int // chan for reduce task// generateTask : create tasks
func (m *Master) generateTask() {
for k,v := range m.AllFilesName {
if v == UnAllocated {
maptasks <- k // add task to channel}}ok := falsefor !ok {
ok = checkAllMapTask(m) // check if all map tasks have finished}m.MapFinished = truefor k,v := range m.ReduceTaskStatus {
if v == UnAllocated {
reducetasks <- k}}ok = falsefor !ok {
ok = checkAllReduceTask(m)}m.ReduceFinished = true
}// checkAllMapTask : check if all map tasks are finished
func checkAllMapTask(m *Master) bool {
m.RWLock.RLock()defer m.RWLock.RUnlock()for _,v := range m.AllFilesName {
if v != Finished {
return false}}return true
}func checkAllReduceTask(m *Master) bool {
m.RWLock.RLock()defer m.RWLock.RUnlock()for _, v := range m.ReduceTaskStatus {
if v != Finished {
return false}}return true
}
以上则是 master 用来生成 map 和 reduce 任务的相关代码。将生成的任务,写入到相应的channel中去。worker 在向 master 请求任务的时候, master 从channel中获取到任务,发送给 worker , worker执行。代码中还会一直监视任务的状态,来判断是否任务都完成了。
当然,我们还要初始化这两个 channel, 以及注册 RPC 服务, 并行运行 generateTask():
// master.go --- func server()// start a thread that listens for RPCs from worker.go
//
func (m *Master) server() {
// init channels maptasks = make(chan string, 5)reducetasks = make(chan int, 5)rpc.Register(m)rpc.HandleHTTP()// parallel run generateTask()go m.generateTask()//l, e := net.Listen("tcp", ":1234")sockname := masterSock()os.Remove(sockname)l, e := net.Listen("unix", sockname)if e != nil {
log.Fatal("listen error:", e)}go http.Serve(l, nil)
}
到现在,我们已经成功将 master 完成了 RPC 的注册,并完成了任务生成进程的运行。
接下来,初步完善任务分配部分。
完成本步骤最后的要求,对 worker 寻求任务的要求作出响应。我们通过创建一个RPC handler 来实现。 worker call这个handler, 获取任务。
// master.go// MyCallHandler func
// Your code here -- RPC handlers for the worker to call.
func (m *Master) MyCallHandler(args *MyArgs, reply *MyReply) error {
msgType := args.MessageType // worker 发送的消息的类型switch(msgType) {
case MsgForTask:select {
case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename) // 定时函数,后面步骤解释return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}return nil
}
本函数目前也不完善,后续步骤中一步一步完善函数内容。本 handler 通过处理我们自己定义的 worker 与 master 之间的消息类型, 作出不同的处理。 此处, master 检查 worker 发送的消息是否是 MsgForTask(这是一个int类型数据,后文会提到),然后向 worker 发送任务。
接下来是步骤三
一步一步完成MIT-6.824-Lab1:MapReduce 之二