Featured image of post MIT 6.5840 分布式系统

MIT 6.5840 分布式系统

著名的分布式系统课程

前言

这是一门由MIT的Robert Morris主讲的分布式系统课程,课程主页链接,课程视频链接

翻译版

课程内容包括Golang的学习、分布式系统研究动机、一致性协议算法、著名分布式系统软件论文讲解和一致性算法 (Raft算法)的实现机制及实验。

是一门不可多得的实践性质比较强的课程。

本文主要记录自己做lab时候的一些心得体会,以及对课程的一些理解。

Lab1:MapReduce

论文:MapReduce: Simplified Data Processing on Large Clusters

Lab链接:Lab1

设计思路

MapReduce是一个分布式计算框架,它的设计思路是将计算任务分为两个阶段:Map阶段和Reduce阶段。

这是MapReduce的一个简单的示意图:

工作流程示意图

这是论文中讲解MapReduce的流程图,基本上解释清楚了。

输入数据以文件形式进入系统。一些进程运行map任务,拆分了原任务,产生了一些中间体,这些中间体可能以键值对形式存在。一些进程运行了reduce任务,利用中间体产生最终输出。master进程用于分配任务,调整各个worker进程。

输入数据能够产生中间体,这说明原任务是可拆的,也就才有了写成分布式的可能性。若原问题不是可拆的,MapReduce也就无从谈起。

中间体应均匀地分配给各个reduce任务,每个reduce任务整合这些中间体,令中间体个数减少,直至无法再减少,从中整合出最终结果。

输入数据以什么形式进入系统,原任务应如何拆分,中间体如何保存和传输,master和worker之间如何通信和调度,中间体如何转化为最终输出。这些都是设计的考量,没有一定之规。

Lab要求我们做一个对txt文件里的wordcount,那么输入数据就是txt文件,原任务就是对txt文件里的单词进行统计,中间体就是单词和对应的出现次数,最终输出就是单词和对应的出现次数。

示意图如下:

MapReduce WordCount

实现细节

queue.go

首先定义一堆结构体(其实也不是首先,在编写过程中会不断地修改和更新结构体,不过在实现一个功能之前必须理清对应的数据结构):

// queue.go
type listNode struct {
	data interface{} // data can be any type
	next *listNode
	prev *listNode
}

type LinkedList struct {
	head  listNode
	count int
}

type BlockQueue struct {
	list *LinkedList
	cond *sync.Cond
}

由于篇幅限制这里暂时省略了对应的方法,不过重点也不是方法就是了

第一个结构体是链表的节点,第二个结构体是链表,第三个结构体是阻塞队列,阻塞队列是用链表实现的,链表的每个节点都是一个interface{}类型的数据,这样就可以存储任意类型的数据了。

同时阻塞队列还有一个条件变量,用于实现阻塞队列的阻塞功能。

mapset.go

type MapSet struct {
	mapbool map[interface{}]bool
	count   int
}

这个结构体的作用是用于存储键值对,键值对的键是任意类型的,值是bool类型的,这样就可以实现set的功能了。

worker.go

这部分就是重点了,我们这里先不直接抛出代码,先思考一下worker的工作流程。

worker的工作流程如下:

  1. 从master获取任务

  2. 执行任务

  3. 将任务执行结果发送给master

  4. 重复1-3

  5. 如果有空闲的worker,master会将任务分配给空闲的worker(才能体现多线程和分布式的优点)这一步暂时取名为Join吧

在获取任务之前,首先应该进行一些初始化,通过下面两个函数,worker能够确定自己在map阶段和reduce阶段的任务是什么,以及自己的id是多少。

func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func keyReduceIndex(key string, nReduce int) int {
	// use ihash(key) % NReduce to choose the reduce
	return ihash(key) % nReduce
}

然后我们创造一个worker对象,我们接下来的操作都是对这个对象进行操作。

type Aworker struct {
	// a struct for worker
	mapf    func(string, string) []KeyValue
	reducef func(string, []string) string
	// true: Map
	// false: Reduce
	MapOrReduce bool
	//exit if true
	DoneFlag bool

	WorkerId int
}

初始化完成后,worker开始向coordinator发送请求,获取任务。

func (worker *Aworker) askMapTask() *MapTaskReply {
	/* ask for a map task then return the reply */
	args := MapTaskArgs{
		WorkerId: worker.WorkerId,
	}
	reply := MapTaskReply{}

	worker.logPrintf("Asking for a map task...\n")
	call("Coordinator.GiveMapTask", &args, &reply)

	// obtain a worker id
	worker.WorkerId = reply.WorkerId
	worker.logPrintf("Got a map task, filename: %s, fileId: %v", reply.FileName, reply.FileId)

	if reply.FileId == -1 {
		// refused to give a task
		if reply.DoneFlag {
			worker.logPrintf("No more map tasks, switching to reduce tasks...\n")
			return nil
		} else {
			worker.logPrintf("No map tasks available, waiting...\n")
			return &reply
		}
	}
	worker.logPrintf("got a map task, filename: %s, fileId: %v\n", reply.FileName, reply.FileId)
	return &reply
}

这个函数比较简单,就是向coordinator发送请求,获取任务,如果没有任务就等待,如果有任务就返回任务。没有什么特别需要注意的。

如果coordinator发现所有任务都在进行但是还有空闲的worker,那么就会将任务分配给空闲的worker,这个过程就是Join过程。

func (worker *Aworker) joinMapTask(fileId int) {
	// notify coordinator that map task is done, the worker can join another map task
	args := MapTaskJoinArgs{
		FileId:   fileId,
		WorkerId: worker.WorkerId,
	}

	reply := MapTaskJoinReply{}
	worker.logPrintf("Joining map task...\n")
	call("Coordinator.JoinMapTask", &args, &reply)
	if reply.Accepted {
		worker.logPrintf("\033[33mAccepted\033[0m to join map task!\n")
	} else {
		worker.logPrintf("Failed to join map task!\n")
	}
}

可以看到,上面几个函数都是通过rpc调用coordinator的函数来实现的。

Licensed under CC BY-NC-SA 4.0
最后更新于 Aug 16, 2023 18:23 CST