本文字数:5380 字
精读时间:12 分钟
也可在 6 分钟内完成速读
作者郑伟,小米信息技术部架构组
01
前言
gRPC Name Resolver(名称解析)是 gRPC 核心功能之一,目前大部分 gRPC Name Resolver 都采用 ETCD 来实现,通过引入 ETCD Client sdk,和 ETCD Server 之间通过 gRPC 双向流的方式进行数据交互。服务端定时上报服务名称、实例数据至 ETCD 实现服务注册,客户端进行监听指定服务名称对应实例变化来实现服务发现。
基于 ETCD 实现的 Name Resolver 已有很多相关的文章,同时 github 上也有很多相关类库,本文不做赘述。
有些公司内部已有成熟的 Name Server,比如我们小米内部 SOA 平台,已稳定运行多年。所以我们没有采用直连 ETCD 的方案,而是基于该 Name Server 来做适配。下文会给大家介绍下实现原理。
02
实现自定义 Name Resolver
gRPC 支持将 DNS 作为默认的 Name System,同时也提供了一些 API 方便开发者构建和使用自定义的 Resolver。本文所有代码均基于 grpc@v1.26.0 实现。自定义 gRPC Name Resolver 源码结构大概如下所示:
整个 resolver 代码比较简单,包含三个 go 文件:resolver.go
、resolver_build.go
、dail.go
。
ns # 自定义 resolver 包名
├── dial.go # 封装了 gRPC 包的 grpc.DialContext() 方法,严格来说 dail.go 不应该放在 ns 包下,本例中这么做只是为简化包布局,方便读者理解
├── resolver.go # 实现了 gRPC resolver 包 Resolver 接口的 nsResolver
└── resolver_builder.go # 实现了 gRPC resolver 包 ResolverBuilder 接口的 nsResolverBuilder
03
定义 nsResolver
主要逻辑在 resolver.go
:
package nsimport ("context""encoding/json""fmt""strings""time""mypkg/internal/logz" // 私有日志包,基于 uber 开源的 zap 实现sdk "mypkg/internal/soa-sdk" // 私有 ns sdk 包,封装了内部 soa 平台进行服务发现的 sdk_ "google.golang.org/grpc""google.golang.org/grpc/resolver""google.golang.org/grpc/serviceconfig"
)const (// syncNSInterval 定义了从 NS 服务同步实例列表的周期syncNSInterval = 1 * time.Second
)// nsResolver 实现了 resolver.Resolver 接口
type nsResolver struct {target resolver.Targetcc resolver.ClientConnctx context.Contextcancel context.CancelFunc...
}// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {r.updateCC()ticker := time.NewTicker(syncNSInterval)for {select {// 当* nsResolver Close 时退出监听case <-r.ctx.Done():ticker.Stop()returncase <-ticker.C:// 调用* nsResolver.updagteCC() 方法,更新实例地址r.updateCC()}}
}// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {// 从 NS 服务获取指定 target 的实例列表instances, err := r.getInstances(r.target)// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表if err != nil || len(instances.CalleeIns) == 0 {logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))return}...// 组装实例列表 []resolver.Address// resolver.Address 结构体表示 grpc server 端实例地址var newAddrs []resolver.Addressfor k := range instances.CalleeIns {newAddrs = append(newAddrs, instances.CalleeIns)}...// 更新实例列表// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}// ResolveNow 实现了 resolver.Resolver.ResolveNow 方法
func (*nsResolver) ResolveNow(o resolver.ResolveNowOption) {}// Close 实现了 resolver.Resolver.Close 方法
func (r *nsResolver) Close() {r.cancel()
}// instances 包含调用方服务名、被调方服务名、被调方实例列表等数据
type instances struct {callerService stringcalleeService stringcalleeIns []string
}// getInstances 获取指定服务所有可用的实例列表
func (r *nsResolver) getInstances(target resolver.Target) (s *instances, e error) {auths := strings.Split(target.Authority, "@")// auths[0] 为 callerService 名,target.Endpoint 为 calleeService 名// 通过自定义 sdk 从内部 NameServer 查询指定 calleeService 对应的实例列表ins, e := sdk.GetInstances(auths[0], target.Endpoint)if e != nil {return nil, e}return &instances{callerService: auths[0],calleeService: target.Endpoint,calleeIns: ins.Instances,}, nil
}
04
定义 nsResolverBuilder
ns/resolver_builder.go
构建 nsResolver 时,我们参考 google.golang.org/grpc/resolver/dns/dns_resolver.go
源码,采用 Builder 设计模式:
package nsimport ("context""fmt""google.golang.org/grpc/resolver"
)// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {resolver.Register(NewBuilder())
}// NewBuilder 构造 nsResolverBuilder
func NewBuilder() resolver.Builder {return &nsResolverBuilder{}
}// nsResolverBuilder 实现了 resolver.Builder 接口,用来构造定义好的 Resolver Bulder
type nsResolverBuilder struct{}// URI 返回某个服务的统一资源描述符(URI),这个 URI 可以从 nsResolver 中查询实例列表
// URI 设计时可以遵循 RFC-3986(https://tools.ietf.org/html/rfc3986) 规范,
// 比如本例中 ns 格式为:ns://callerService:@calleeService
// 其中 ns 为协议名,callerService 为订阅方服务名(即主调方服务名),calleeService 为发布方服务名(即被调方服务名)
func URI(callerService, calleeService string) string {return fmt.Sprintf("ns://%s:@%s", callerService, calleeService)
}// Build 实现了 resolver.Builder.Build 方法
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {ctx, cancel := context.WithCancel(context.Background())r := &nsResolver{target: target,cc: cc,ctx: ctx,cancel: cancel,}// 启动协程,响应指定 Name 服务实例变化go r.watcher()return r, nil
}// Scheme 实现了 resolver.Builder.Scheme 方法
// Scheme 方法定义了 ns resolver 的协议名
func (*nsResolverBuilder) Scheme() string {return "ns"
}
05
封装 gRPC.Dial() 方法
实现 nsResolver
nsResolverBuilder
后,我们还需要对 grpc.Dial()
方法进行封装,方便业务方适用。封装后 dial.go
代码如下所示(严格来说 dial.go
不应该放在 ns
包中,本例中这么做只是为简化包布局,方便读者理解):
package ns// Dial 封装 `grpc.Dial()` 方法以供业务方代码初始化 *grpc.ClientConn。
// 业务方可使用此 Dial 方法基于主调方服务名、被调方服务名等参数构造 *grpc.ClientConn 实例,
// 随后可在业务代码中使用 *grpc.ClientConn 实例构造桩代码中生成的 grpcServiceClient 并发起 RPC 调用。
func Dial(callerService, calleeService string, dialOpts ...grpc.DialOption) (*grpc.ClientConn, error) {// 根据 callerService 和 calleeService 构造对应的 URIURI := ns.URI(callerService, calleeService)ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)defer cancel()// 设置拨号配置opts := []grpc.DialOption{grpc.WithBlock(),grpc.WithInsecure(),}dialOpts = append(dialOpts, dialOpts...)// 调用 grpc.DialContext() 方法拨号conn, err := grpc.DialContext(ctx,URI,opts...,)if err != nil {logz.Warn("did not connect", logz.Any("target", URI), logz.E(err))return nil, err}return conn, err
}
06
gRPC resolver 原理
看完上面的实现有些同学可能仍然不太明白 Name Resolution 流程,我们结合 grpc resolver 源码来做简单分析。
上一节在自定义包 ns
中定义了两个 go 文件, resolver.go
和 resolver_builder.go
。
前者是整个功能最核心的代码,通过自定义
nsResolver
将服务名解析成对应实例。后者是采用 Builder 模式在包初始化时创建并注册构造
nsResover
的nsResolverBuilder
实例。当客户端通过Dial
方法对指定服务进行拨号时,grpc resolver 查找注册的 Builder 实例调用其Build()
方法构建自定义nsResolver
。
我们试着从 grpc 源码出发,从客户端拨号开始直到发起 RPC 调用来了解 gRPC Name Resolution 流程。
假设我们有如下 demo.pb
文件:
syntax = "proto3";package demo;service DemoService {rpc SayHi(HiRequest) returns (HiResponse);
}message HiRequest {string name = 1;
}message HiResponse {string message = 1;
}
生成的桩代码 demo.pb.go
可能如下:
package demo...
type DemoServiceClient interface {SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error)
}type demoServiceClient struct {cc *grpc.ClientConn
}// NewDemoServiceClient 业务代码中此方法来构造 *demoServiceClient 实例
func NewDemoServiceClient(cc *grpc.ClientConn) DemoServiceClient {return &demoServiceClient{cc}
}func (c *demoServiceClient) SayHiOK(ctx context.Context, in *HiRequest, opts ...grpc.CallOption) (*HiResponse, error) {out := new(HiResponse)err := c.cc.Invoke(ctx, "/proto.DemoService/SayHiOK", in, out, opts...)if err != nil {return nil, err}return out, nil
}
...
客户端业务代码中构造 *grpc.ClientConn
再发起 RPC 调用代码如下:
import "mypkg/internal/ns"...
// 使用上节中封装的 ns.Dial 方法构造 *grpc.ClientConn
conn, _ := ns.Dial("my-caller-service", "my-callee-service")
// 构造 *demoServiceClient
cli := demo.NewDemoServiceClient(conn)
// 使用 *demoServiceClient 发起 RPC 调用
res, _ := cli.SayHiOK(ctx, &proto.HiRequest{Name: "world"})
...
业务代码 import "mypkg/internal/ns"
包后,在 ns/resolver_builder.go
的 init 阶段会通过 Register()
方法将 nsResolverBuilder
注册到 grpc 内部的一个全局 map 中:
// m 定义为一个全局 map,用于存放 [resolver 协议名 -> resolverBuilder] 键值对
var m = make(map[string]Builder)// Register 方法将指定 [resolver 协议名 -> resolverBuilder] 键值对存入 map
func Register(b Builder) {m[b.Scheme()] = b
}// Get 方法根据传入的 resolver 协议名返回对应的 resolverBuilder
func Get(scheme string) Builder {if b, ok := m[scheme]; ok {return b}return nil
}
ns.Dial()
方法使用 callerService 和 calleeService 构造服务 URI,并使用此 URI 作为参数调用 grpc.DialContext()
方法,来构造 *grpc.ClientConn
实例。
grpc.DialContext()
方法接收三个参数:ctx、target、opts,
target:就是根据我们自定义的协议名、callerService、CalleeService 生成的 URI,比如本例中 target 参数值为 ns://my-caller-service:@my-callee-service
,其中 ns
为协议名。grpc 可通过协议名查表来获取对应的 resolverBuilder。opts:是一个变长参数,表示拨号配置选项。
grpc.DialContext()
内部逻辑比较复杂,我们挑重点讲:
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {// 构造 ClientConn 实例cc := &ClientConn{target: target,csMgr: &connectivityStateManager{},conns: make(map[*addrConn]struct{}),dopts: defaultDialOptions(),blockingpicker: newPickerWrapper(),czData: new(channelzData),firstResolveEvent: grpcsync.NewEvent(),}cc.retryThrottler.Store((*retryThrottler)(nil))cc.ctx, cc.cancel = context.WithCancel(context.Background())for _, opt := range opts {opt.apply(&cc.dopts)}...// 如果用户指定了 timeout 超时配置,那么初始化一个带超时的 ctx// 如果 defer 阶段已超时,则抛出 j 错误if cc.dopts.timeout > 0 {var cancel context.CancelFuncctx, cancel = context.WithTimeout(ctx, cc.dopts.timeout)defer cancel()}defer func() {select {case <-ctx.Done():conn, err = nil, ctx.Err()default:}}()...// Name Resolver 核心逻辑,初始化 resolverBuilder,代码中首先会判断下用户是否指定 resolverBuilder// - 如果有指定 resolverBuilder,则直接使用此 resolverBuilder。// - 如果用户没有指定 resolverBuilder,那么 grpc 做如下操作:// - 通过 parseTarget 方法解析用户传入的 target,本例中即 `ns://my-caller-service:@my-callee-service`,获取 Scheme(协议名)、authority(包含 callerService、calleeService)。// - 查询指定协议对应的 resolverBuilder。if cc.dopts.resolverBuilder == nil {// 解析用户传入的 targetcc.parsedTarget = parseTarget(cc.target)// 通过协议名查表获取对应的 resolverBuildercc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)// 如果表中没查到对应的 resolverBuilder,则使用默认协议查询对应的 resolverBuilder// 默认协议为 `passthrough`,它会从用户解析的 target 中直接读取 endpoint 地址if cc.dopts.resolverBuilder == nil {cc.parsedTarget = resolver.Target{Scheme: resolver.GetDefaultScheme(),Endpoint: target,}cc.dopts.resolverBuilder = resolver.Get(cc.parsedTarget.Scheme)}} else {cc.parsedTarget = resolver.Target{Endpoint: target}}...// 使用上面初始化的 resolverBuilder 构建 resolver// 初始化 resolverWrapperrWrapper, err := newCCResolverWrapper(cc)if err != nil {return nil, fmt.Errorf("failed to build resolver: %v", err)}cc.mu.Lock()cc.resolverWrapper = rWrappercc.mu.Unlock()// 如果客户端配置了 WithBlock option,则会轮询 ClientConn 状态,如果 ClientConn 就绪,则返回 ClientConn。// 如果直到 ctx 超时或被 Cancel ClientConn 依然未就绪,则抛出 ctx.Err() 错误。if cc.dopts.block {for {s := cc.GetState()// 1. 如果 ClientConn 状态为 Ready 则返回此 ClientConn// 2. 如果 ClientConn 状态并非 Ready,且用户配置了 FailOnNonTempDialError,当前 ClientConn 状态为 TransientFailure,且 lbPicker 尝试和服务端实例建立连接时产生错误。根据错误性质做如下处理:// 2.1. 如果此错误是非临时性的错误,则抛出此错误// 2.2. 如果此错误是临时性的错误,则继续轮询 ClientConn 状态,直至 ctx 超时或被外部 Cancelif s == connectivity.Ready {break} else if cc.dopts.copts.FailOnNonTempDialError && s == connectivity.TransientFailure {if err = cc.blockingpicker.connectionError(); err != nil {terr, ok := err.(interface {Temporary() bool})if ok && !terr.Temporary() {return nil, err}}}if !cc.WaitForStateChange(ctx, s) {return nil, ctx.Err()}}}return cc, nil
}
我们再看下 newCCResolverWrapper()
方法内部实现:
func newCCResolverWrapper(cc *ClientConn) (*ccResolverWrapper, error) {...ccr := &ccResolverWrapper{cc: cc,addrCh: make(chan []resolver.Address, 1),scCh: make(chan string, 1),}var err error// rb.Build() 调用指定 resolveBuilder 的 Build 方法,本例中会执行我们定义的 nsResolverBuilder.Builder() 方法ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, resolver.BuildOption{DisableServiceConfig: cc.dopts.disableServiceConfig})if err != nil {return nil, err}return ccr, nil
}
前面章节中在 ns/resolver_builder.go
中已经给出了 nsResolverBuilder
实现,我们再看下 nsResolverBuilder.Builder()
方法内部逻辑:
package ns// init 将定义好的 NS Builder 注册到 resolver 包中
func init() {resolver.Register(NewBuilder())
}...
func (*nsResolverBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOption) (resolver.Resolver, error) {ctx, cancel := context.WithCancel(context.Background())r := &nsResolver{target: target,cc: cc,ctx: ctx,cancel: cancel,}// 启动协程,轮询并更新指定 CalleeService 服务的实例变化go r.watcher()return r, nil
}
...
前面章节中在 ns/resolver.go
中给出了 nsResolver
实现,我们再看下 nsResolver.watch()
方法:
package ns...
// watcher 轮询并更新指定 CalleeService 服务的实例变化
func (r *nsResolver) watcher() {r.updateCC()ticker := time.NewTicker(syncNSInterval)for {select {// 当* nsResolver Close 时退出case <-r.ctx.Done():ticker.Stop()returncase <-ticker.C:// 调用* nsResolver.updagteCC() 方法,更新实例地址r.updateCC()}}
}// updateCC 更新 resolver.Resolver.ClientConn 配置
func (r *nsResolver) updateCC() {// 从 NS 服务获取指定 target 的实例列表instances, err := r.getInstances(r.target)// 如果获取实例列表失败,或者实例列表为空,则不更新 resolver 中实例列表if err != nil || len(instances.CalleeIns) == 0 {logz.Warn("[mis] error retrieving instances from Mis", logz.Any("target", r.target), logz.Error(err))return}...// 组装实例列表 []resolver.Address// resolver.Address 结构体表示 grpc server 端实例地址var newAddrs []resolver.Addressfor k := range instances.calleeIns {newAddrs = append(newAddrs, resolver.Address{Addr: instances.CalleeIns[k],})}...// 更新实例列表// grpc 底层 LB 组件对每个服务端实例创建一个 subConnection。并根据设定的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。// 此处代码比较复杂,后续在 LB 相关原理文章中再做概述。r.cc.UpdateState(resolver.State{Addresses: newAddrs})
}
...
07
总结
文章末尾我们对自定义 grpc Resolver 做个总结,整个工作流大概如下所示:
客户端启动时,引入自定义的 resolver 包(比如本例中我们自定义的
ns
包)-
引入
ns
包,在init()
阶段,构造自定义的 resolveBuilder,并将其注册到 grpc 内部的 resolveBuilder 表中(其实是一个全局 map,key 为协议名,比如ns
;value 为构造的 resolveBuilder,比如nsResolverBuilder
)。
客户端启动时通过自定义
Dail()
方法构造 grpc.ClientConn 单例-
grpc.DialContext()
方法内部解析 URI,分析协议类型,并从 resolveBuilder 表中查找协议对应的 resolverBuilder。比如本例中我们定义的 URI 协议类型为ns
,对应的 resolverBuilder 为nsResolverBuilder
找到指定的 resolveBuilder 后,调用 resolveBuilder 的
Build()
方法,构建自定义 resolver,同时开启协程,通过此 resolver 更新被调服务实例列表。Dial()
方法接收主调服务名和被调服务名,并根据自定义的协议名,基于这两个参数构造服务的 URIDial()
方法内部使用构造的 URI,调用grpc.DialContext()
方法对指定服务进行拨号
grpc 底层 LB 库对每个实例均创建一个 subConnection,最终根据相应的 LB 策略,选择合适的 subConnection 处理某次 RPC 请求。
至此为止,我们基本上捋清了 gRPC Name Resolver 大概原理。由于篇幅原因,底层的很多细节没有介绍到,比如 grpc 是如何管理服务端的多个实例连接的,服务端有多个实例时负载均衡是如何实现的。下篇文章将给大家介绍下 gRPC LB 原理以及如何自定义动态的 LB 策略来实现流量灰度功能。
聪明又努力的 Gophers,让我知道你“在看”