- 应用实例
- 源码分析
-
- 调用链路分析
- workerPool 的 start 和 stop
- workerPool 的 Serve
- 逻辑架构图
这篇文章主要讲解fasthttp 这个http库关于协程池做的优化原理;
应用实例
fasthttp 是一个非常优秀的web server框架,github上的benchmark号称比官方的net/http快10倍以上。fasthttp用了很多黑魔法。我们今天通过源码来看一看它的goroutine pool的实现。
这里还是以一个例子开头来说明 fasthttp 的使用方法:
package mainimport ("github.com/valyala/fasthttp""fmt"
)// request handler in fasthttp style, i.e. just plain function.
func fastHTTPHandler(ctx *fasthttp.RequestCtx) {fmt.Fprintf(ctx, "Hi there! RequestURI is %q", ctx.RequestURI())
}func main() {
// pass plain function to fasthttpfasthttp.ListenAndServe(":8081", fastHTTPHandler)
}
可以看到 fasthttp 使用起来也是非常的方便。这里需要有一点注意, fasthttp 和 net/http 的 Handler 接口不一样,没有使用request 和 response style的Handler,而是使用了 context 这样一个角色,当然,context 里面肯定是包含request 和 response的。
下面我们从源码出发;理解其协程池的使用原理。
源码分析
调用链路分析
入口函数是 ListenAndServe() 下面我们从该函数出发分析整个调用链路(只保留最核心主链路代码):
func ListenAndServe(addr string, handler RequestHandler) error {s := &Server{Handler: handler,}return s.ListenAndServe(addr)
}func (s *Server) ListenAndServe(addr string) error {ln, err := net.Listen("tcp4", addr)if err != nil {return err}return s.Serve(ln)
}func (s *Server) Serve(ln net.Listener) error {......wp := &workerPool{WorkerFunc: s.serveConn,MaxWorkersCount: maxWorkersCount,Logger: s.logger(),connState: s.setState,}wp.Start()for {if c, err = acceptConn(s, ln, &lastPerIPErrorTime); err != nil {wp.Stop()if err == io.EOF {return nil}return err}if !wp.Serve(c) {s.writeFastError(c, StatusServiceUnavailable,"The connection cannot be served because Server.Concurrency limit exceeded")c.Close()s.setState(c, StateClosed)}c = nil}
}
fasthttp 首先监听基于TCP的网络端口,然后创建一个 workerPool 也就是一个协程池,协程池的代码定义与实现在 workerpool.go 中。
之后主线程就处于死循环中,调用 acceptConn() 函数接收TCP连接的请求,如果没有请求到来就阻塞。然后调用 Serve() 函数处理连接。这里有一点类似于 Reactor 的线程模型。
我们先看看 workerPool 的定义(只保留核心数据域):
type workerPool struct {// Function for serving server connections.// It must leave c unclosed.WorkerFunc ServeHandlerMaxWorkersCount intMaxIdleWorkerDuration time.DurationLogger Loggerlock sync.MutexworkersCount intmustStop boolready []*workerChanstopCh chan struct{}workerChanPool sync.PoolconnState func(net.Conn, ConnState)
}type workerChan struct {lastUseTime time.Timech chan net.Conn
}
- 成员 WorkerFunc 是每个TCP Conn 的处理函数,类似net/http包中的ServeHTTP,因为在fasthttp中所有conn的处理函数都是一样的,所以WorkerFunc不需要和传入的每个conn绑定,整个worker pool共用一个。
- workerChanPool是sync.Pool对象池。
- MaxIdleWorkerDuration是worker空闲的最长时间,超过就将worker关闭。
- workersCount是worker的数量。
- ready是可用的worker列表,也就是说所有goroutine worker是存放在一个数组里面的。这个数组模拟一个类似栈的FILO队列,也就是说我们每次使用的worker都从队列的尾部开始取。
workerPool 的 start 和 stop
wp.Start()启动worker pool。wp.Stop()是出错处理。wp.Serve?是对conn进行处理的函数。我们先看一下wp.Start() 和 wp.Stop()。
func (wp *workerPool) Start() {if wp.stopCh != nil {panic("BUG: workerPool already started")}wp.stopCh = make(chan struct{})stopCh := wp.stopChgo func() {var scratch []*workerChanfor {wp.clean(&scratch)select {case <-stopCh:returndefault:time.Sleep(wp.getMaxIdleWorkerDuration())}}}()
}func (wp *workerPool) Stop() {if wp.stopCh == nil {panic("BUG: workerPool wasn't started")}close(wp.stopCh)wp.stopCh = nilwp.lock.Lock()ready := wp.readyfor i, ch := range ready {ch.ch <- nilready[i] = nil}wp.ready = ready[:0]wp.mustStop = truewp.lock.Unlock()
}
简单来说,workerPool 启动时候开了一个 goroutine 来定期清理worker pool中过期worker(过期=未使用时间超过MaxIdleWorkerDuration)。清理操作都在wp.clean()函数中完成,这里就不继续往下看了。
wp.Stop() 负责停止worker pool的处理工作,包括关闭stopCh,清理闲置的worker列表(这时候还有一部分worker在处理conn,待其处理完成通过判断wp.mustStop来停止)。这里需要注意的一点是做资源清理的时候,对于channel需要置nil。
workerPool 的 Serve
下面看看最重要的函数 wp.Serve() 的调用链路。 wp.Serve() 负责处理主线程接收到的每一个 TCP 连接。
先看源码:
func (wp *workerPool) Serve(c net.Conn) bool { ch := wp.getCh() if ch == nil { return false } ch.ch <- c return true } func (wp *workerPool) getCh() *workerChan { var ch *workerChan createWorker := false wp.lock.Lock() ready := wp.ready n := len(ready) - 1 if n < 0 { if wp.workersCount < wp.MaxWorkersCount { createWorker = true wp.workersCount++ } } else { ch = ready[n] ready[n] = nil wp.ready = ready[:n] } wp.lock.Unlock() if ch == nil { if !createWorker { return nil } vch := wp.workerChanPool.Get() if vch == nil { vch = &workerChan{ ch: make(chan net.Conn, workerChanCap), } } ch = vch.(*workerChan) go func() { wp.workerFunc(ch) wp.workerChanPool.Put(vch) }() } return ch }
func (wp *workerPool) Serve(c net.Conn) bool {ch := wp.getCh()if ch == nil {return false}ch.ch <- creturn true
}func (wp *workerPool) getCh() *workerChan {var ch *workerChancreateWorker := falsewp.lock.Lock()ready := wp.readyn := len(ready) - 1if n < 0 {if wp.workersCount < wp.MaxWorkersCount {createWorker = truewp.workersCount++}} else {ch = ready[n]ready[n] = nilwp.ready = ready[:n]}wp.lock.Unlock()if ch == nil {if !createWorker {return nil}vch := wp.workerChanPool.Get()if vch == nil {vch = &workerChan{ch: make(chan net.Conn, workerChanCap),}}ch = vch.(*workerChan)go func() {wp.workerFunc(ch)wp.workerChanPool.Put(vch)}()}return ch
}
Serve() 最主要也最核心的就是调用 getCh() 它从worker pool的可用空闲worker列表尾部取出一个可用的worker。然后 Serve() 将待处理的连接 conn 存入该可用worker的channel。
getCh() 首先从worker pool 的worker队列的队尾获取一个可用的worker,这里有几点需要注意:
- 先从worker pool的队尾获取可用worker
- 如果没有可用的worker,就新建一个worker(比如处理第一个conn是,worker pool还是空的)
- 如果worker达到上限,则直接不处理这个连接(这个地方感觉处理不是很好,应该加入一定的策略,或者加一个Hook)
这里重点需要关注一下新建worker的过程:
- 首先从workerChanPool 里面获取一个 workerChan
- 使用 go 关键字新建一个协程来处理这个workerChan
我们来看看新建的协程是怎么处理workerChan的:直接调用 wp.workerFunc(ch) 来处理,我们跟踪进去调用链路:
func (wp *workerPool) workerFunc(ch *workerChan) {var c net.Connvar err errorfor c = range ch.ch {if c == nil {break}if err = wp.WorkerFunc(c); err != nil && err != errHijacked {........}if err == errHijacked {wp.connState(c, StateHijacked)} else {c.Close()wp.connState(c, StateClosed)}c = nilif !wp.release(ch) {break}}wp.lock.Lock()wp.workersCount--wp.lock.Unlock()
}
可以看到,当我们新建一个worker协程的时候,该协程就会进入一个死循环中。 这个死循环的逻辑非常熟悉,就是一个典型的协程池的实现逻辑。
- 从channel阻塞获取待处理的任务;
- 调用任务里面封装的函数;
因为前面的wp.Serve()函数只处理一个conn,所以for循环执行一次我们就可以把worker放到空闲队列中去等待下一次conn过来。release(ch 函数就是将workChan放回空闲队列的末尾(可算和上面呼应上了)。还有上面提到的mustStop,如果worker pool停止了,mustStop就为true,那么workerFunc就要跳出循环,也就是goroutine结束了。
逻辑架构图
从上面图中我们可以分析出:
- fasthttp 采用协程池避免创建goroutine和go runtime schedule goroutine的性能成本,提高性能
- 才用goroutine + channel 的形式解决多线程并发数据竞争问题
- 复用TCP连接,直至超时断开,提升性能