#1 为什么需要 mapreduce?
在实际的业务场景中我们常常需要从不同的 rpc 服务中获取相应属性来组装成复杂对象。
比如要查询商品详情:
商品服务-查询商品属性
库存服务-查询库存属性
价格服务-查询价格属性
营销服务-查询营销属性
如果是串行调用的话响应时间会随着 rpc 调用次数呈线性增长,所以我们要优化性能一般会将串行改并行。
简单的场景下使用 waitGroup 也能够满足需求,但是如果我们需要对 rpc 调用返回的数据进行校验、数据加工转换、数据汇总呢?继续使用 waitGroup 就有点力不从心了,go 的官方库中并没有这种工具(java 中提供了 CompleteFuture),go-zero 作者依据 mapReduce 架构思想实现了进程内的数据批处理 mapReduce 并发工具类。
#2 核心思想
数据生产 generate
数据加工 mapper
数据聚合 reducer
其中数据生产是不可或缺的阶段,数据加工、数据聚合是可选阶段,数据生产与加工支持并发调用,数据聚合基本属于纯内存操作单协程即可。不同阶段的数据处理由不同的 goroutine 执行,不同 goroutine 之间通过 channel 通信,通过 goroutine 中监听一个全局的结束channel 和调用方提供的 ctx 即能实现整个流程的随时终止 。
#3 快速入门
首先安装 mapreduce.
目前维护了两个版本 v1, v2,其中 v2 为泛型版本.
go get github.com/kevwan/mapreduce
# go get github.com/kevwan/mapreduce/v2
官方提供的一个求平方和的简单例子如下:
package mainimport ("fmt""log""github.com/kevwan/mapreduce"
)func main() {val, err := mapreduce.MapReduce(func(source chan<- interface{}) {// generatorfor i := 0; i < 10; i++ {source <- i}}, func(item interface{}, writer mapreduce.Writer, cancel func(error)) {// mapperi := item.(int)writer.Write(i * i)}, func(pipe <-chan interface{}, writer mapreduce.Writer, cancel func(error)) {// reducervar sum intfor i := range pipe {sum += i.(int)}writer.Write(sum)})if err != nil {log.Fatal(err)}fmt.Println("result:", val)
}
使用起来简单方便, 重要的是源码只有几百行, 可以看看源码作者是如何把 goroutine 与 channel 结合的这么精妙, 另外也可以根据自己的业务需要改进代码.
#4 参考资料
https://github.com/kevwan/mapreduce
《酷Go推荐》招募:
各位Gopher同学,最近我们社区打算推出一个类似GoCN每日新闻的新栏目《酷Go推荐》,主要是每周推荐一个库或者好的项目,然后写一点这个库使用方法或者优点之类的,这样可以真正的帮助到大家能够学习到
新的库,并且知道怎么用。
大概规则和每日新闻类似,如果报名人多的话每个人一个月轮到一次,欢迎大家报名!戳「阅读原文」,即可报名
扫码也可以加入 GoCN 的大家族哟~