一步一步完成 MIT-6.824-Lab1 : MapReduce 之二
GitHub代码仓库:Mit-6.824-Lab1-MapReduce
接上文
一步一步完成MIT-6.824-Lab1:MapReduce 之一
步骤三
目标
被分配了 map 任务的 worker, 读取对应的 input 中的内容。通过 Map 函数,完成对输入内容的解析。解析的结果是一系列的 key/value 对。这些 key/value 对被称为中间值,被暂存在内存中。
我们的实现
我们目前未完善的
// master.go
Master 的 Struct 还未完全完善
MyCallHandler() 还未完全完善
完善 worker 与 master 的通信数据格式
接下来我们在 rpc.go
完善 worker 与 master 之间的数据通讯格式。
首先,我们列举出可能的消息类型,比如:请求一个任务,完成了一个任务等等。
// rpc.go// Add your RPC definitions here.
const (MsgForTask = iota // ask a taskMsgForInterFileLoc // send intermediate files' location to masterMsgForFinishMap // finish a map taskMsgForFinishReduce // finish a reduce task
)
如上,类型的用途见注释。
接下来,定义 RPC 发送消息的 struct 和 接受回复的 struct。 按照我们的上面代码中的消息类型, 我们吧发送消息的 struct 写成两类, 一类单独用来传中间文件的位置给master。
// Add your RPC definitions here.type MyArgs struct {
MessageType intMessageCnt string
}// send intermediate files' filename to master
type MyIntermediateFile struct {
MessageType intMessageCnt stringNReduceType int
}
MyIntermediateFile
专用于发送中间文件的位置到master。即消息类型为MsgForInterFileLoc
的使用 MyIntermediateFile。其他消息类型使用 MyArgs
。注意:其中NReduceType字段是值经过我们自定义的分割函数后,得到了分割后的intermediate 文件交由哪类 reduce 任务的编号。关于该partation函数后文介绍。
然后定义 reply 的struct:
type MyReply struct {
Filename string // get a filenameMapNumAllocated intNReduce intReduceNumAllocated int TaskType string // refer a task type : "map" or "reduce"ReduceFileList []string // File list about
}
所有 RPC 请求的 reply 均使用该类型的 reply。
struct 中包括: 被分配的任务的类型,map 任务的话,FIlename字段装载input文件名, reduce 任务的话, ReduceFileList 字段装载文件名的list。 MapNumAllocated 代表 map 任务被分配的任务的编号。 ReduceNumAllocated 代表 reduce 任务被分配的任务的编号。NReduce 字段代表总 reduce 任务数。
完善 master 结构体和 MyCallHandler
定义了 worker 与 master 之间的消息格式, 接下来,我们需要再完善一下 Master 结构体和 MyCallHandler。
我们注意到,我们的消息中包括中间值的存放位置,这也是 worker 发送给 master 的。所以,我们需要在 master 中对这些位置做记录:
// master.go ---- Master struct
// Master struct
type Master struct {
// Your definitions here.AllFilesName map[string]intMapTaskNumCount intNReduce int // n reduce task// InterFIlename [][]string // store location of intermediate filesMapFinished boolReduceTaskStatus map[int]int // about reduce tasks' statusReduceFinished bool // Finish the reduce taskRWLock *sync.RWMutex
}
我们在 MakeMaster 中初始化。
// master.go ----- func MakeMaster()// Your code here
m.InterFIlename = make([][]string, m.NReduce)
然后,完善 MyCallHandler()。我们的消息类型还有 MsgForFinishMap
; MsgForFinishReduce
; MsgForInterFileLoc
这些,我们也需要对这些进行处理:
// master.go ----- func MyCallHandler()switch(msgType) {
case MsgForTask:select {
case filename := <- maptasks:// allocate map taskreply.Filename = filenamereply.MapNumAllocated = m.MapTaskNumCountreply.NReduce = m.NReducereply.TaskType = "map"m.RWLock.Lock()m.AllFilesName[filename] = Allocatedm.MapTaskNumCount++m.RWLock.Unlock()go m.timerForWorker("map",filename)return nilcase reduceNum := <- reducetasks:// allocate reduce taskreply.TaskType = "reduce"reply.ReduceFileList = m.InterFIlename[reduceNum]reply.NReduce = m.NReducereply.ReduceNumAllocated = reduceNumm.RWLock.Lock()m.ReduceTaskStatus[reduceNum] = Allocatedm.RWLock.Unlock()go m.timerForWorker("reduce", strconv.Itoa(reduceNum))return nil}
case MsgForFinishMap:// finish a map taskm.RWLock.Lock()defer m.RWLock.Unlock()m.AllFilesName[args.MessageCnt] = Finished // set status as finish
case MsgForFinishReduce:// finish a reduce task index, _ := strconv.Atoi(args.MessageCnt)m.RWLock.Lock()defer m.RWLock.Unlock()m.ReduceTaskStatus[index] = Finished // set status as finish
}
如上, master 里判断 args里的消息类型。 如果是 MsgForTask
的话,就向 worker 传一个 task。该 task 也是由 master 生产的。很明显,在map执行完之前,reduce任务是不会执行的。这个从我们之前的代码 generateTask 中可以看出。
如果消息类型是 MsgForFinishMap
或 MsgForFinishReduce
的话,将对应的 task 的状态设置为 Finished。
如果消息类型是 MsgForInterFileLoc
的话, 我们这里另外写一个函数,供 worker 调用,处理该消息类型:
// master.go ----- func MyInnerFileHandler// MyInnerFileHandler : intermediate files' handler
func (m *Master) MyInnerFileHandler(args *MyIntermediateFile, reply *MyReply) error {
nReduceNUm := args.NReduceType;filename := args.MessageCnt;// store themmm.InterFIlename[nReduceNUm] = append(m.InterFIlename[nReduceNUm], filename)return nil
}
通过读取参数中的 NReduceType 字段,获取该文件应该由哪个编号的 reduce 任务处理,存放在相应的位置。
另外,MyCallHandler 代码中的 go m.timerForWorker()
后文再做介绍。
接下来完成 worker 的 map 部分
worker 从 master 那里获取到 map 任务后,即开始自己的 map 任务。
首先,关于 worker 向 master 请求任务的细节:
我们观察 /main
下的 mrworker.go
, 按照官网方法运行一个 mrworker.go, 只会产生一个 worker。最开始, Worker 里的代码是这样:
// main/mrworker.go calls this function.
//
func Worker(mapf func(string, string) []KeyValue,reducef func(string, []string) string) {
// Your worker implementation here.// uncomment to send the Example RPC to the master.//CallExample()// RPC callCallForTask()
}
很明显,这样的worker会在一次call后就退出,这样肯定不对,worker应该一直运行,向master获取任务,然后执行,然后再获取,直到所有任务执行完,或者master断开。Worker的完善我们将在后面的步骤给出
另外,我们需要写好我们自己定义的与 master 交流了的函数,即发送 MsgForTask
, MsgForInnerFileLoc
等等。
// CallForTask : my RPC call function
func CallForTask(msgType int,msgCnt string) MyReply {
args := MyArgs{
}args.MessageType = msgTypeargs.MessageCnt = msgCntreply := MyReply{
}// callres := call("Master.MyCallHandler", &args, &reply)if res {
fmt.Printf("reply.type %v\n",reply.TaskType)} else {
return MyReply{
TaskType:""}}return reply
}// SendInterFiles : send intermediate files' location (filenames here) to master
func SendInterFiles(msgType int, msgCnt string, nReduceType int) MyReply {
args := MyIntermediateFile{
}args.MessageType = msgTypeargs.MessageCnt = msgCntargs.NReduceType = nReduceTyperepley := MyReply{
}res := call("Master.MyInnerFileHandler", &args, &repley)if !res {
fmt.Println("error sending intermediate files' location")}return repley
}
上述两个函数,即代表 worker 向 master 交流的函数。 CallForTask 是 MsgForTask
,MsgForFinishMap
和MsgForFinishReduce
使用。 SendInterFiles 是 MsgForInnerFileLoc
使用。两个函数中的call函数原有代码已经给出了。 CallForTask 的对于请求任务的消息的reply中会包含任务相关的信息,对于告知master 任务完成的reply消息中不会有内容,而且我们此时不会用到这个reply。 SendInterFiles 的 reply 也是没有内容的,因为我们 master 不需要此处不需要 reply。
接下来就可以进行对任务的处理了。
我们这里先完成对 map 任务的处理:
// worker.go// mapInWorker : workers do the map phase
func mapInWorker(reply *MyReply,mapf func(string, string) []KeyValue) {
file, err := os.Open(reply.Filename)defer file.Close()if err != nil {
log.Fatalf("cannot open %v", reply.Filename)}content, err := ioutil.ReadAll(file)if err != nil {
log.Fatalf("cannot read %v", reply.Filename)}// map function, get intermediate keyvalue pairskva := mapf(reply.Filename, string(content))// partition function. finish the partition taskkvas := Partition(kva, reply.NReduce)// write to temp local filefor i := 0; i<reply.NReduce; i++ {
filename := WriteToJSONFile(kvas[i], reply.MapNumAllocated, i)_ = SendInterFiles(MsgForInterFileLoc, filename, i)}_ = CallForTask(MsgForFinishMap, reply.Filename)
}
以上则是 map 任务的处理函数。读取文件内容, 然后执行事先制定好了的 plugin 中的 map 函数,生成中间值对。 此时,就需要用到我们的分割函数了,将中间值经过分割函数分割,得到用于不同 reduce 任务的中间值。然后写入到本地磁盘中,并将文件location 发送给 master .
关于分割函数,发送location 这些内容,我们接下来在步骤4讨论。
步骤4
一步一步完成MIT-6.824-Lab1:MapReduce 之三