文章目录
- 一、整体设计
-
- 1.1 功能概述
- 1.2 目录结构
- 1.3 核心类
- 二、类设计
-
- 2.1 CanalConnector
-
- 2.1.1 接口介绍
- 2.2.2 接口实现
-
- 2.2.2.1 SimpleCanalConnector
- 2.2.2.2 ClusterCanalConnector
- 三、HA 实现
-
- 3.1 实现原理
-
- 3.1.1 ZooKeeper 分布式锁原理
- 3.1.2 Client HA 具体实现
一、整体设计
1.1 功能概述
封装与 Canal Server 进行交互的客户端,提供两种实现给外部使用:
- 简单连接:直接通过 socket 与 server 进行交互,实现连接、订阅、批量获取、提交和回滚等操作。
- 有 HA 的 Cluster 连接:基于简单连接方式进行封装,通过 ZooKeeper 实现 client 端的 HA。
1.2 目录结构
其中 kafka 包和 rocketmq 包实现了接收到的消息直接发送到消息队列中,目前先重点关注与服务端的交互和消息的处理流程,后续会重点关注 kafka 包的实现,为目前生产上 C/S 端分离 ==> 集成 的改造做准备。
1.3 核心类
- ClientIdentity
canal client 和 server 交互之间的身份标识,目前 clientId 写死为 1001(目前 canal server 上的一个 instance 只能有一个 client 消费,clientId 的设计是为 1 个 instance 多 client 消费模式而预留的,暂时不需要理会)。 - CanalConnector
SimpleCanalConnector / ClusterCanalConnector:两种 connector 的实现,simple 针对的是简单的 IP 直连模式,cluster 针对多 IP 的模式,可依赖 CanalNodeAccessStrategy 进行 failover 控制。 - CanalNodeAccessStrategy
SimpleNodeAccessStrategy / ClusterNodeAccessStrategy:两种 failover 的实现,simple 针对给定的初始 IP 列表进行failover选择,cluster 基于 ZooKeeper 上的 cluster 节点动态选择正在运行的 canal server。 - ClientRunningMonitor / ClientRunningListener / ClientRunningData
client running 相关控制,主要为解决 client 自身的 failover 机制。canal client 允许同时启动多个 canal client,通过 running 机制,可保证只有一个 client 在工作,其他 client 做为冷备。当运行中的 client 挂了,running 会控制让冷备中的 client 转为工作模式,这样就可以确保 canal client 也不会是单点。保证整个系统的高可用性。ClientRunningData 对应 ZooKeeper 上的 /otter/canal/destinations/xxx/1001 节点数据。
二、类设计
2.1 CanalConnector
2.1.1 接口介绍
CanalConnector 的作用是与服务端进行交互,支持连接、订阅、获取数据、回滚、断开等操作。一个 CanalConnector 对应一个指定的目标库(destination),如果需要订阅多个目标库需要创建多个 CanalConnector。
以下是 CanalConnector 接口的方法介绍:
- void connect():与 Canal 服务端建立连接。
- void disconnect():与 Canal 服务端断开连接。
- boolean checkValid():检查连接是否合法。
- 连接服务端失败,一直没有一个可用的连接时,返回 false
- 当前客户端在进行 running 节点抢占时,作为备份节点存在,并非作为工作节点,返回 false
- void subcribe():订阅服务端,可以传入一个 filter 字符串用于过滤物理库或表。
- void unsubscribe():取消订阅服务端。
- Message getWithoutAck():不需要指定 position 就可以获取服务端数据,Canal 会记住此 client 的最新 position。如果是第一次fetch,则会从 Canal 中保存的最老一条数据开始输出。
- (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待。
- (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
- Message get():从服务端获取订阅的消费数据,在执行 getWithoutAck 之后 自动提交确认。
- (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待
- (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
- void ack(int batchId):通过传入 batchId 向服务端确认消息已经消费成功,小于等于此 batchId 的 Message 都会被确认。
- void rollback(int batchId):基于 batchId 回滚对应的 get/getWithoutAck 请求,重新获取一次数据。
2.2.2 接口实现
CanalConnector 接口有两种实现,分别是 SimpleCanalConnector 和 ClusterCanalConnector。前者负责通过指定地址直接连接服务端,后者则可以通过指定一个地址列表或是 ZK 地址来连接高可用的服务端集群。
2.2.2.1 SimpleCanalConnector
该类负责实现与服务端的交互逻辑,包括与服务端进行握手、认证、发送与接收数据包等,以及自身运行状态的控制。下面以 connect 方法为例,举例分析该类与服务端的交互流程。
首先,程序会执行 waitClientRunning() 检测自己当前的角色,如果自己是工作节点,那么继续运行,如果是备用节点则线程挂起,等待角色切换,如果是单节点直连则直接设置自己为运行状态并返回。
接下来,程序会执行 doConnect() 方法开始执行真正的连接过程。
- 第一步,建立 socket 连接:开启一个 SocketChannel,并设置 socket 的连接超时时间,然后与服务端 TCP 三次握手建立连接。
- 第二步,与服务端进行握手认证:读取并解析服务端发送的报文,其中报文头包含了协议的版本、类型等,报文体包含了握手所需要的信息,如压缩方式、认证需要的 seed 等。调用加密工具类,用 seed 加密传入的 password,随后生成一个 ClientAuth 类,设置 username、password 密文等信息,并将它组装为一个客户端认证报文,发送给服务端。接收并解析服务端的响应报文,判断是否认证成功,若成功则继续设置当前状态为已连接(connected = true),返回本地 socket 地址对象,若失败则抛出异常。
连接建立后,随后的 subscribe/unsubscribe、get/getWithoutAck、ack/rollback 等操作无需再执行握手认证,这些操作实际上大同小异,无非是根据传入参数,向服务端发送一个相应的报文(指定报文类型、构建报文体),然后解析响应报文,返回或执行后续操作。客户端操作、报文头、报文类型的对应关系如下:
方法名 | 报文类型 | 报文体格式类 | 含义 |
---|---|---|---|
doConnect | PacketType.HandShake PacketType.CLIENTAUTHENTICATION PacketType.Ack |
- ClientAuth Ack |
握手 客户端认证 服务端确认 |
subscribe | PacketType.SUBSCRIPTION | Sub | 订阅 |
unsubscribe | PacketType.UNSUBSCRIPTION | Unsub | 取消订阅 |
get/getWithoutAck | PacketType.GET | Get | 获取数据 |
ack | PacketType.CLIENTACK | ClientAck | 客户端确认 |
rollback | PacketType.CLIENTROLLBACK | ClientRollback | 客户端回滚 |
2.2.2.2 ClusterCanalConnector
如 1.4 核心类 中所介绍,ClusterCanalConnector 基于 SimpleCanalConnector、CanalNodeAccessStrategy 以及 impl.running 包内的类来实现客户端连接 HA 的服务端以及客户端自身的 HA。
ClusterCanalConnector 内部维护了一个 currentConnector 对象,这是 SimpleCanalConnector 的一个实例,并且是当前正在工作的实例。类似后者,该类同样实现了 CanalConnector 与用于和服务端交互的方法,其实现方式为内部调用 currentConnecotr 的对象方法来真正地交互,而它自己只负责维护 currentConnector 即可。
该类支持 SimpleNodeAccessStrategy 和 ClusterNodeAccessStrategy 两种节点访问策略,前者需要手动指定服务端地址的列表,而后者基于 ZK 注册与发现,监听 ZK 上的节点,自动更新内存中的地址列表和当前工作节点地址。
以 ClusterNodeAccessStrategy 为例,它维护了 currentAddress 和 runningAddress 两个属性。在构造器初始化时,程序会去订阅监听 ZK 上的 destination/cluster
节点和 destination/running
节点,前者包含服务端地址列表,后者则是正在工作的服务端节点地址,分别对应它的两个属性。当上述两个节点的数据更改或者删除后,程序会对这两个属性做相应的变更,具体如下图所示。
三、HA 实现
3.1 实现原理
服务端和客户端 HA 的实现都是利用了 ZooKeeper 可作为分布式锁的特性。
3.1.1 ZooKeeper 分布式锁原理
-
临时节点(EPHEMERAL):与客户端会话绑定,一旦客户端会话失效(如宕机),这个客户端所创建的所有临时节点都会被移除。
-
Watcher 机制:
ZooKeeper Watcher 机制
ZooKeeper 客户端向 ZooKeeper 服务器注册 watcher 的同时,会将 watcher 对象存储在客户端的 WatcherManager;ZooKeeper 服务器触发 watcher 事件后,会向客户端发送通知,客户端线程从 WatcherManager 中回调 watcher 执行相应的功能。
3.1.2 Client HA 具体实现
- 1001/cursor:最新的 client 确认消费的信息
- 1001/filter:client 订阅的过滤规则
- 1001/running:正在运行中的 client 地址
client 启动时会订阅 running 节点的变更事件,然后去尝试创建 running 节点:
- 创建成功则将自己的信息写入 running 节点,然后继续运行;
- 创建失败则线程挂起,等待 running 节点释放,然后再次尝试创建 running 节点。