1. http server端
// Command main runs a minimal HTTP server that answers requests for /test
// with the body "hello world".
package main

import (
	"log"
	"net/http"
	"time"
)

// sayHello writes the fixed body "hello world" to the response.
func sayHello(w http.ResponseWriter, r *http.Request) {
	if _, err := w.Write([]byte("hello world")); err != nil {
		// Nothing more can be sent to the client at this point; just record it.
		log.Println("write response failed,err:", err)
	}
}

// addr is the TCP address the server listens on.
var addr = "0.0.0.0:9090"

func main() {
	// Create the router.
	mux := http.NewServeMux()
	// Register the route.
	mux.HandleFunc("/test", sayHello)

	server := &http.Server{
		Addr: addr,
		// Bound how long a client may take to send its request, so a
		// stalled connection cannot be held open indefinitely.
		ReadTimeout:  time.Second * 3,
		WriteTimeout: time.Second * 3,
		Handler:      mux,
	}
	log.Println("Start to serve at : ", addr)

	// Start the service. ListenAndServe always returns a non-nil error.
	if err := server.ListenAndServe(); err != nil {
		log.Fatal("Failed to start http server,err:", err)
	}
}
1.1 http server 源码分析
关键步骤:
注册路由
启动服务
连接处理
1.2 http server 源码走读
1.2.1 注册路由
// NewServeMux returns a pointer to a freshly allocated, zero-valued ServeMux.
func NewServeMux() *ServeMux {
	return &ServeMux{}
}
ServeMux 结构体
type ServeMux struct {mu sync.RWMutexm map[string]muxEntryes []muxEntry // slice of entries sorted from longest to shortest.hosts bool // whether any patterns contain hostnames
}
muxEntry结构体
type muxEntry struct {h Handlerpattern string
}
Handler : 定义了 ServeHTTP 方法的接口,任何实现了该方法的类型都可以作为 handler
// Handler is implemented by any type that can serve an HTTP request
// through its ServeHTTP method.
type Handler interface {
	ServeHTTP(ResponseWriter, *Request)
}
// 注册handler
// The HandlerFunc type is an adapter to allow the use of
// ordinary functions as HTTP handlers. If f is a function
// with the appropriate signature, HandlerFunc(f) is a
// Handler that calls f.
type HandlerFunc func(ResponseWriter, *Request)// HandleFunc registers the handler function for the given pattern.
func (mux *ServeMux) HandleFunc(pattern string, handler func(ResponseWriter, *Request)) {if handler == nil {panic("http: nil handler")}mux.Handle(pattern, HandlerFunc(handler))
}
路由注册具体实现
// Handle registers the handler for the given pattern.
// If a handler already exists for pattern, Handle panics.
func (mux *ServeMux) Handle(pattern string, handler Handler) {mux.mu.Lock()defer mux.mu.Unlock()// 检查pattern 、handler是否不为空,且pattern 是否已注册过if pattern == "" {panic("http: invalid pattern")}if handler == nil {panic("http: nil handler")}if _, exist := mux.m[pattern]; exist {panic("http: multiple registrations for " + pattern)}// 初始化mux.m map[string]muxEntryif mux.m == nil {mux.m = make(map[string]muxEntry)}e := muxEntry{h: handler, pattern: pattern}// map key为pattern,value 为muxEntrymux.m[pattern] = eif pattern[len(pattern)-1] == '/' {mux.es = appendSorted(mux.es, e)}if pattern[0] != '/' {mux.hosts = true}
}
因此,注册路由实际上就是构造一个map[pattern] = muxEntry
1.2.2 启动服务
// ListenAndServe listens on the TCP network address srv.Addr and then
// calls Serve to handle requests on incoming connections.
// Accepted connections are configured to enable TCP keep-alives.
//
// If srv.Addr is blank, ":http" is used.
//
// ListenAndServe always returns a non-nil error. After Shutdown or Close,
// the returned error is ErrServerClosed.
func (srv *Server) ListenAndServe() error {if srv.shuttingDown() {return ErrServerClosed}addr := srv.Addrif addr == "" {addr = ":http"}ln, err := net.Listen("tcp", addr)if err != nil {return err}return srv.Serve(tcpKeepAliveListener{ln.(*net.TCPListener)})
}
func (srv *Server) Serve(l net.Listener) error {......l = &onceCloseListener{Listener: l}defer l.Close()......var tempDelay time.Duration // how long to sleep on accept failurefor {// 建立socket 拿到connrw, e := l.Accept()if e != nil {// 接收关闭服务器信号select {case <-srv.getDoneChan():return ErrServerCloseddefault:}......}tempDelay = 0// 包装一个conn结构体c := srv.newConn(rw)c.setState(c.rwc, StateNew) // before Serve can returngo c.serve(ctx)}
}
conn 结构体
// A conn represents the server side of an HTTP connection.
type conn struct {server *Serverrwc net.Conn remoteAddr stringr *connReaderbufr *bufio.Readerbufw *bufio.Writermu sync.Mutex}
1.2.3 处理连接
// ServeHTTP dispatches the request to the handler whose
// pattern most closely matches the request URL.
func (mux *ServeMux) ServeHTTP(w ResponseWriter, r *Request) {if r.RequestURI == "*" {if r.ProtoAtLeast(1, 1) {w.Header().Set("Connection", "close")}w.WriteHeader(StatusBadRequest)return}h, _ := mux.Handler(r)h.ServeHTTP(w, r)
}
// Handler returns the handler to use for the given request together with
// the pattern it was selected by. Depending on the path it may return a
// redirect handler instead of a registered one.
func (mux *ServeMux) Handler(r *Request) (h Handler, pattern string) {

	// CONNECT requests are not canonicalized.
	if r.Method == "CONNECT" {
		// If r.URL.Path is /tree and its handler is not registered,
		// the /tree -> /tree/ redirect applies to CONNECT requests
		// but the path canonicalization does not.
		if u, ok := mux.redirectToPathSlash(r.URL.Host, r.URL.Path, r.URL); ok {
			return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
		}

		return mux.handler(r.Host, r.URL.Path)
	}

	// All other requests have any port stripped and path cleaned
	// before passing to mux.handler.
	host := stripHostPort(r.Host)
	path := cleanPath(r.URL.Path)

	// If the given path is /tree and its handler is not registered,
	// redirect for /tree/.
	if u, ok := mux.redirectToPathSlash(host, path, r.URL); ok {
		return RedirectHandler(u.String(), StatusMovedPermanently), u.Path
	}

	// The cleaned path differs from the requested one: redirect to it.
	if path != r.URL.Path {
		_, pattern = mux.handler(host, path)
		url := *r.URL
		url.Path = path
		return RedirectHandler(url.String(), StatusMovedPermanently), pattern
	}

	return mux.handler(host, r.URL.Path)
}
// handler is the main implementation of Handler.
// The path is known to be in canonical form, except for CONNECT methods.
func (mux *ServeMux) handler(host, path string) (h Handler, pattern string) {mux.mu.RLock()defer mux.mu.RUnlock()// Host-specific pattern takes precedence over generic onesif mux.hosts {h, pattern = mux.match(host + path)}if h == nil {h, pattern = mux.match(path)}if h == nil {h, pattern = NotFoundHandler(), ""}return
}
match 方法在第一步注册路由时构建的 map 中查找与请求路径匹配的 handler
// Find a handler on a handler map given a path string.
// Most-specific (longest) pattern wins.
// It returns a nil handler and an empty pattern when nothing matches.
func (mux *ServeMux) match(path string) (h Handler, pattern string) {
	// Check for exact match first.
	v, ok := mux.m[path]
	if ok {
		return v.h, v.pattern
	}

	// Check for longest valid match.  mux.es contains all patterns
	// that end in / sorted from longest to shortest.
	for _, e := range mux.es {
		if strings.HasPrefix(path, e.pattern) {
			return e.h, e.pattern
		}
	}
	return nil, ""
}
主要逻辑如下:
mux.ServeHTTP -> mux.Handler(r) -> mux.handler(host, r.URL.Path) -> mux.match()
获取到h Handler 之后一步步返回,最终调用h.ServeHTTP(w,r) ,也就相当于调用了第一步注册路由时传入的处理函数(HandlerFunc(f) 是一个会调用 f 的 Handler,本例中 f 即 sayHello)
2. http client端
// Command main issues a single GET request against the example server and
// prints the response body.
package main

import (
	"fmt"
	"io/ioutil"
	"log"
	"net"
	"net/http"
	"time"
)

// fetch performs a GET on url with client and returns the whole response
// body. On any failure it returns a nil slice and a non-nil error.
func fetch(client *http.Client, url string) ([]byte, error) {
	resp, err := client.Get(url)
	if err != nil {
		// resp is nil here, so it must not be touched (in particular
		// resp.Body.Close must not be deferred before this check).
		return nil, fmt.Errorf("GET %s failed: %v", url, err)
	}
	defer resp.Body.Close()

	data, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("read content failed: %v", err)
	}
	return data, nil
}

func main() {
	// Create the connection pool.
	transport := &http.Transport{
		DialContext: (&net.Dialer{
			Timeout:   time.Second * 30, // connect timeout
			KeepAlive: time.Second * 30, // keep-alive probe interval
		}).DialContext,
		MaxIdleConns:          100,              // maximum number of idle connections
		IdleConnTimeout:       time.Second * 90, // idle connection timeout
		TLSHandshakeTimeout:   time.Second * 10, // TLS handshake timeout
		ExpectContinueTimeout: time.Second * 1,  // timeout waiting for a 100-continue status
	}
	// Create the client.
	client := &http.Client{
		Transport: transport,
		Timeout:   time.Second * 30, // overall request timeout
	}
	var url = "http://127.0.0.1:9090/test"

	// Request the data, then print what was read.
	data, err := fetch(client, url)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("Data : ", string(data))
}
2.1 http client 源码分析
主要结构体
type Client struct {// Transport specifies the mechanism by which individual// HTTP requests are made.// If nil, DefaultTransport is used.Transport RoundTripperCheckRedirect func(req *Request, via []*Request) errorJar CookieJar// Timeout specifies a time limit for requests made by this// Client. The timeout includes connection time, any// redirects, and reading the response body. The timer remains// running after Get, Head, Post, or Do return and will// interrupt reading of the Response.Body.//// A Timeout of zero means no timeout.Timeout time.Duration
}
// RoundTripper is an interface representing the ability to execute a
// single HTTP transaction, obtaining the Response for a given Request.
//
// A RoundTripper must be safe for concurrent use by multiple
// goroutines.type RoundTripper interface {// RoundTrip executes a single HTTP transaction, returning// a Response for the provided Request.RoundTrip(*Request) (*Response, error)
}
type Request struct {Method stringURL *url.URLProto string // "HTTP/1.0"Header HeaderBody io.ReadCloserGetBody func() (io.ReadCloser, error)ContentLength int64TransferEncoding []stringClose boolHost stringForm url.ValuesPostForm url.ValuesMultipartForm *multipart.FormRemoteAddr stringRequestURI stringTLS *tls.ConnectionStateCancel <-chan struct{}Response *Responsectx context.Context
}
主要处理流程
2.2 http client 源码走读
2.2.1 client.Get
func (c *Client) Get(url string) (resp *Response, err error) {req, err := NewRequest("GET", url, nil)if err != nil {return nil, err}return c.Do(req)
}
// Do sends req by delegating to the unexported c.do.
func (c *Client) Do(req *Request) (*Response, error) {
	return c.do(req)
}
func (c *Client) do(req *Request) (retres *Response, reterr error) {......var (deadline = c.deadline()reqs []*Requestresp *ResponsecopyHeaders = c.makeHeadersCopier(req)reqBodyClosed = false // have we closed the current req.Body?// Redirect behavior:redirectMethod stringincludeBody bool)reqs = append(reqs, req)var err errorvar didTimeout func() boolif resp, didTimeout, err = c.send(req, deadline); err != nil {// c.send() always closes req.Body......}......
}
// didTimeout is non-nil only if err != nil.
func (c *Client) send(req *Request, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {......resp, didTimeout, err = send(req, c.transport(), deadline)......return resp, nil, nil
}
// send issues an HTTP request.
// Caller should close resp.Body when done reading from it.
// This is an abridged excerpt; elided code is marked with "......".
func send(ireq *Request, rt RoundTripper, deadline time.Time) (resp *Response, didTimeout func() bool, err error) {
	req := ireq // req is either the original request, or a modified fork
	// ......
	resp, err = rt.RoundTrip(req)
	// ......
	return resp, nil, nil
}
2.2.2 连接处理roundTrip()
2.2.2.1 主要逻辑
// roundTrip implements a RoundTripper over HTTP.
func (t *Transport) roundTrip(req *Request) (*Response, error) {......for {// Get the cached or newly-created connection to either the// host (for http or https), the http proxy, or the http proxy// pre-CONNECTed to https server. In any case, we'll be ready// to send it requests.pconn, err := t.getConn(treq, cm)......var resp *Responseif pconn.alt != nil {// HTTP/2 path.t.decHostConnCount(cm.key()) // don't count cached http2 conns toward conns per hostt.setReqCanceler(req, nil) // not cancelable with CancelRequestresp, err = pconn.alt.RoundTrip(req)} else {resp, err = pconn.roundTrip(treq) // 拿到持久化连接后调用该方法}......}
}
获取空闲连接
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (*persistConn, error) {// 获取空闲连接if pc, idleSince := t.getIdleConn(cm); pc != nil {// set request canceler to some non-nil function so we// can detect whether it was cleared between now and when// we enter roundTript.setReqCanceler(req, func(error) {})return pc, nil}......handlePendingDial := func() {testHookPrePendingDial()go func() {if v := <-dialc; v.err == nil {t.putOrCloseIdleConn(v.pc)} else {t.decHostConnCount(cmKey)}testHookPostPendingDial()}()}cancelc := make(chan error, 1)t.setReqCanceler(req, func(err error) { cancelc <- err })if t.MaxConnsPerHost > 0 {select {case <-t.incHostConnCount(cmKey): // 确认每个主机是否有限制case pc := <-t.getIdleConnCh(cm): // 等待释放的空闲连接if trace != nil && trace.GotConn != nil {trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})}return pc, nilcase <-req.Cancel: // 监听取消事件return nil, errRequestCanceledConncase <-req.Context().Done():return nil, req.Context().Err()case err := <-cancelc:if err == errRequestCanceled {err = errRequestCanceledConn}return nil, err}}// 如果获取不到空闲连接,异步创建连接go func() {pc, err := t.dialConn(ctx, cm)dialc <- dialRes{pc, err}}()idleConnCh := t.getIdleConnCh(cm)select {// 新连接创建成功case v := <-dialc:// Our dial finished.if v.pc != nil {if trace != nil && trace.GotConn != nil && v.pc.alt == nil {trace.GotConn(httptrace.GotConnInfo{Conn: v.pc.conn})}return v.pc, nil}// Our dial failed. See why to return a nicer error// value.t.decHostConnCount(cmKey)select {case <-req.Cancel:// It was an error due to cancelation, so prioritize that// error value. 
(Issue 16049)return nil, errRequestCanceledConncase <-req.Context().Done():return nil, req.Context().Err()case err := <-cancelc:if err == errRequestCanceled {err = errRequestCanceledConn}return nil, errdefault:// It wasn't an error due to cancelation, so// return the original error message:return nil, v.err}// 获取到空闲连接case pc := <-idleConnCh:// Another request finished first and its net.Conn// became available before our dial. Or somebody// else's dial that they didn't use.// But our dial is still going, so give it away// when it finishes:handlePendingDial()if trace != nil && trace.GotConn != nil {trace.GotConn(httptrace.GotConnInfo{Conn: pc.conn, Reused: pc.isReused()})}return pc, nilcase <-req.Cancel:handlePendingDial()return nil, errRequestCanceledConncase <-req.Context().Done():handlePendingDial()return nil, req.Context().Err()case err := <-cancelc:handlePendingDial()if err == errRequestCanceled {err = errRequestCanceledConn}return nil, err}
}
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (*persistConn, error) {pconn := &persistConn{t: t,cacheKey: cm.key(),reqch: make(chan requestAndChan, 1),writech: make(chan writeRequest, 1),closech: make(chan struct{}),writeErrCh: make(chan error, 1),writeLoopDone: make(chan struct{}),} ...... pconn.br = bufio.NewReader(pconn)pconn.bw = bufio.NewWriter(persistConnWriter{pconn})go pconn.readLoop() // 监听pc.reqch rc := <-pc.reqchgo pconn.writeLoop() // 监听writech wr := <-pc.writechreturn pconn, nil
}
func (t *Transport) getIdleConn(cm connectMethod) (pconn *persistConn, idleSince time.Time) {key := cm.key()t.idleMu.Lock()defer t.idleMu.Unlock()for {pconns, ok := t.idleConn[key]if !ok {return nil, time.Time{}}// 只有一个pconn 直接返回if len(pconns) == 1 {pconn = pconns[0]delete(t.idleConn, key)} else {// LRU 算法获取最后使用的一个pconnpconn = pconns[len(pconns)-1]t.idleConn[key] = pconns[:len(pconns)-1]}t.idleLRU.remove(pconn)......return pconn, pconn.idleAt}
}
type persistConn struct {t *TransportcacheKey connectMethodKeyconn net.Connbr *bufio.Reader // from connbw *bufio.Writer // to connreqch chan requestAndChan // 由 roundTrip写入; 由 readLoop 读取writech chan writeRequest // 由 roundTrip写入;由 writeLoop 读取mu sync.Mutex }
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {......// Write the request concurrently with waiting for a response,// in case the server decides to reply before reading our full// request body.startBytesWritten := pc.nwritewriteErrCh := make(chan error, 1)pc.writech <- writeRequest{req, writeErrCh, continueCh} //将请求写入到writechresc := make(chan responseAndError)pc.reqch <- requestAndChan{ // 写pc.reqchreq: req.Request,ch: resc,addedGzip: requestedGzip,continueCh: continueCh,callerGone: gone,}var respHeaderTimer <-chan time.TimecancelChan := req.Request.CancelctxDoneChan := req.Context().Done()for {testHookWaitResLoop()select {case err := <-writeErrCh:......case <-pc.closech:......case <-respHeaderTimer:......case <-cancelChan:......case <-ctxDoneChan:......}}
}