当前位置: 代码迷 >> 综合 >> canal.client 源码分析
  详细解决方案

canal.client 源码分析

热度:6   发布时间:2023-12-08 16:51:03.0

文章目录

  • 一、整体设计
    • 1.1 功能概述
    • 1.2 目录结构
    • 1.3 核心类
  • 二、类设计
    • 2.1 CanalConnector
      • 2.1.1 接口介绍
      • 2.2.2 接口实现
        • 2.2.2.1 SimpleCanalConnector
        • 2.2.2.2 ClusterCanalConnector
  • 三、HA 实现
    • 3.1 实现原理
      • 3.1.1 ZooKeeper 分布式锁原理
      • 3.1.2 Client HA 具体实现

一、整体设计

1.1 功能概述

封装与 Canal Server 进行交互的客户端,提供两种实现给外部使用:

  • 简单连接:直接通过 socket 与 server 进行交互,实现连接、订阅、批量获取、提交和回滚等操作。
  • 有 HA 的 Cluster 连接:基于简单连接方式进行封装,通过 ZooKeeper 实现 client 端的 HA。

1.2 目录结构

其中 kafka 包和 rocketmq 包实现了接收到的消息直接发送到消息队列中,目前先重点关注与服务端的交互和消息的处理流程,后续会重点关注 kafka 包的实现,为目前生产上 C/S 端分离 ==> 集成 的改造做准备。

1.3 核心类

核心类类图

  • ClientIdentity
    canal client 和 server 交互之间的身份标识,目前 clientId 写死为 1001(目前 canal server 上的一个 instance 只能有一个 client 消费,clientId 的设计是为 1 个 instance 多 client 消费模式而预留的,暂时不需要理会)。
  • CanalConnector
    SimpleCanalConnector / ClusterCanalConnector:两种 connector 的实现,simple 针对的是简单的 IP 直连模式,cluster 针对多 IP 的模式,可依赖 CanalNodeAccessStrategy 进行 failover 控制。
  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy / ClusterNodeAccessStrategy:两种 failover 的实现,simple 针对给定的初始 IP 列表进行failover选择,cluster 基于 ZooKeeper 上的 cluster 节点动态选择正在运行的 canal server。
  • ClientRunningMonitor / ClientRunningListener / ClientRunningData
    client running 相关控制,主要为解决 client 自身的 failover 机制。canal client 允许同时启动多个 canal client,通过 running 机制,可保证只有一个 client 在工作,其他 client 做为冷备。当运行中的 client 挂了,running 会控制让冷备中的 client 转为工作模式,这样就可以确保 canal client 也不会是单点。保证整个系统的高可用性。ClientRunningData 对应 ZooKeeper 上的 /otter/canal/destinations/xxx/1001 节点数据。

二、类设计

2.1 CanalConnector

2.1.1 接口介绍

CanalConnector 的作用是与服务端进行交互,支持连接、订阅、获取数据、回滚、断开等操作。一个 CanalConnector 对应一个指定的目标库(destination),如果需要订阅多个目标库需要创建多个 CanalConnector。

以下是 CanalConnector 接口的方法介绍:

CanalConnector 方法列表

  • void connect():与 Canal 服务端建立连接。
  • void disconnect():与 Canal 服务端断开连接。
  • boolean checkValid():检查连接是否合法。
    • 连接服务端失败,一直没有一个可用的连接时,返回 false
    • 当前客户端在进行 running 节点抢占时,作为备份节点存在,并非作为工作节点,返回 false
  • void subcribe():订阅服务端,可以传入一个 filter 字符串用于过滤物理库或表。
  • void unsubscribe():取消订阅服务端。
  • Message getWithoutAck():不需要指定 position 就可以获取服务端数据,Canal 会记住此 client 的最新 position。如果是第一次fetch,则会从 Canal 中保存的最老一条数据开始输出。
    • (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待。
    • (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
  • Message get():从服务端获取订阅的消费数据,在执行 getWithoutAck 之后 自动提交确认。
    • (int batchSize):从服务端获取 batchSize 大小的数据,有多少取多少,不会阻塞等待
    • (int batchSize, int timeout, TimeUnit unit):从服务端获取 batchSize 大小的数据,阻塞等待直到拿够 batchSize 条记录或者等待时间达到 timeout。
  • void ack(int batchId):通过传入 batchId 向服务端确认消息已经消费成功,小于等于此 batchId 的 Message 都会被确认。
  • void rollback(int batchId):基于 batchId 回滚对应的 get/getWithoutAck 请求,重新获取一次数据。

2.2.2 接口实现

CanalConnector 接口有两种实现,分别是 SimpleCanalConnector 和 ClusterCanalConnector。前者负责通过指定地址直接连接服务端,后者则可以通过指定一个地址列表或是 ZK 地址来连接高可用的服务端集群。

2.2.2.1 SimpleCanalConnector

该类负责实现与服务端的交互逻辑,包括与服务端进行握手、认证、发送与接收数据包等,以及自身运行状态的控制。下面以 connect 方法为例,举例分析该类与服务端的交互流程。

单节点直连时序图

首先,程序会执行 waitClientRunning() 检测自己当前的角色,如果自己是工作节点,那么继续运行,如果是备用节点则线程挂起,等待角色切换,如果是单节点直连则直接设置自己为运行状态并返回。

接下来,程序会执行 doConnect() 方法开始执行真正的连接过程。

  • 第一步,建立 socket 连接:开启一个 SocketChannel,并设置 socket 的连接超时时间,然后与服务端 TCP 三次握手建立连接。
  • 第二步,与服务端进行握手认证:读取并解析服务端发送的报文,其中报文头包含了协议的版本、类型等,报文体包含了握手所需要的信息,如压缩方式、认证需要的 seed 等。调用加密工具类,用 seed 加密传入的 password,随后生成一个 ClientAuth 类,设置 username、password 密文等信息,并将它组装为一个客户端认证报文,发送给服务端。接收并解析服务端的响应报文,判断是否认证成功,若成功则继续设置当前状态为已连接(connected = true),返回本地 socket 地址对象,若失败则抛出异常。

连接建立后,随后的 subscribe/unsubscribe、get/getWithoutAck、ack/rollback 等操作无需再执行握手认证,这些操作实际上大同小异,无非是根据传入参数,向服务端发送一个相应的报文(指定报文类型、构建报文体),然后解析响应报文,返回或执行后续操作。客户端操作、报文头、报文类型的对应关系如下:

方法名 报文类型 报文体格式类 含义
doConnect PacketType.HandShake
PacketType.CLIENTAUTHENTICATION
PacketType.Ack
-
ClientAuth
Ack
握手
客户端认证
服务端确认
subscribe PacketType.SUBSCRIPTION Sub 订阅
unsubscribe PacketType.UNSUBSCRIPTION Unsub 取消订阅
get/getWithoutAck PacketType.GET Get 获取数据
ack PacketType.CLIENTACK ClientAck 客户端确认
rollback PacketType.CLIENTROLLBACK ClientRollback 客户端回滚
2.2.2.2 ClusterCanalConnector

1.4 核心类 中所介绍,ClusterCanalConnector 基于 SimpleCanalConnector、CanalNodeAccessStrategy 以及 impl.running 包内的类来实现客户端连接 HA 的服务端以及客户端自身的 HA。

ClusterCanalConnector 内部维护了一个 currentConnector 对象,这是 SimpleCanalConnector 的一个实例,并且是当前正在工作的实例。类似后者,该类同样实现了 CanalConnector 与用于和服务端交互的方法,其实现方式为内部调用 currentConnecotr 的对象方法来真正地交互,而它自己只负责维护 currentConnector 即可。

该类支持 SimpleNodeAccessStrategy 和 ClusterNodeAccessStrategy 两种节点访问策略,前者需要手动指定服务端地址的列表,而后者基于 ZK 注册与发现,监听 ZK 上的节点,自动更新内存中的地址列表和当前工作节点地址。

ZooKeeper 上的相关节点

以 ClusterNodeAccessStrategy 为例,它维护了 currentAddress 和 runningAddress 两个属性。在构造器初始化时,程序会去订阅监听 ZK 上的 destination/cluster 节点和 destination/running 节点,前者包含服务端地址列表,后者则是正在工作的服务端节点地址,分别对应它的两个属性。当上述两个节点的数据更改或者删除后,程序会对这两个属性做相应的变更,具体如下图所示。

连接 HA 服务端

三、HA 实现

3.1 实现原理

服务端和客户端 HA 的实现都是利用了 ZooKeeper 可作为分布式锁的特性。

3.1.1 ZooKeeper 分布式锁原理

  • 临时节点(EPHEMERAL):与客户端会话绑定,一旦客户端会话失效(如宕机),这个客户端所创建的所有临时节点都会被移除。

  • Watcher 机制:

    ZooKeeper Watcher 机制

    ZooKeeper 客户端向 ZooKeeper 服务器注册 watcher 的同时,会将 watcher 对象存储在客户端的 WatcherManager;ZooKeeper 服务器触发 watcher 事件后,会向客户端发送通知,客户端线程从 WatcherManager 中回调 watcher 执行相应的功能。

3.1.2 Client HA 具体实现

基于 ZooKeeper 的 Client HA

  • 1001/cursor:最新的 client 确认消费的信息
  • 1001/filter:client 订阅的过滤规则
  • 1001/running:正在运行中的 client 地址

client 启动时会订阅 running 节点的变更事件,然后去尝试创建 running 节点:

  • 创建成功则将自己的信息写入 running 节点,然后继续运行;
  • 创建失败则线程挂起,等待 running 节点释放,然后再次尝试创建 running 节点。
  相关解决方案