目录
一、Canal 介绍
1.1 Canal 概述
1.2 Canal 原理
二、Canal 安装和使用
2.1 mysql 准备
2.1.1 开启 mysql 的 binlog 模式
2.1.2 创建同步账号
2.1.3 重启 mysql 容器
2.2 Canal 容器安装和使用
2.2.1 下载镜像和安装
2.2.2 Canal 配置
2.3 Canal 微服务搭建
2.3.1 安装 Canal 开源项目到本地 maven 仓库
2.3.3 创建 CanalDataEventListerner.java :
2.3.4 新建启动类
2.3.5 编写微服务配置文件
2.4 测试
一、Canal 介绍
1.1 Canal 概述
Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。
简单来说就是用来监控数据库数据的变化,从而获得新增数据或修改数据的项目框架,目前主要支持了MySQL。
1.2 Canal 原理
原理:
- canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave, 向mysql master 发送 dump 协议
- mysql master 收到 dump 请求,开始推送 binary log 给 slave(即 canal)
- canal 解析 binary log 对象(原始为byte流)
由于 canal是基于mysql的主从模式实现的,所以必须先开启 binlog
二、Canal 安装和使用
2.1 mysql 准备
2.1.1 开启 mysql 的 binlog 模式
由于 mysql 是使用 docker 部署的,进入 docker 修改 mysql 的配置
# 进入 mysql 的 docker 容器内
docker exec -it mysql /bin/bash# 进入 mysql 对应的配置文件
cd /etc/mysql/mysql.conf.d# 修改配置文件
vim mysqld.cnf
log-bin=/var/lib/mysql/mysql-bin # 指定日志文件存储的位置
server-id=12345 # 当前这个mysql数据库的唯一标识
2.1.2 创建同步账号
由于 Canal 要进入 mysql 读取数据,出于安全考虑,需要对 Canal 创建用户并赋予权限
# 创建账号 %表示能在任意机器登录 by 'xxx' 表示密码为canal
create user canal@'%' IDENTIFIED by 'canal';# 授权 依次为:查询权限、主从复制权限、主从复制客户端权限、超级权限
# *.* 表示 任意数据库.任意表 都拥有相关权限
# 结合表示: 任意机器上以canal账号登录的用户对mysql中任意数据库和任意表都拥有上述权限
GRANT SELECT, PEPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%'';# 刷新数据库
FLUSH PRIVILEGES;
2.1.3 重启 mysql 容器
docker restart canal
2.2 Canal 容器安装和使用
2.2.1 下载镜像和安装
# 下载镜像
docker pull docker.io/canal/canal-server# 安装
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
#-p 端口映射
2.2.2 Canal 配置
进入 Canal 容器
# 进入 canal 容器
docker exec -it canal /bin/bash# 进入配置文件夹
cd /home/admin/canal-server/conf
修改 Canal 的数据库唯一标识,改成唯一的
修改同步配置
vi ./example/instance.properties
2.3 Canal 微服务搭建
2.3.1 安装 Canal 开源项目到本地 maven 仓库
在搭建 Canal 时使用了一个开源项目,实现了 Springboot 和 Canal 的集成。
https://github.com/chenqian56131/spring-boot-starter-canal
搭建:
找到项目目录,使用命令安装到本地 maven 仓库
mvn install
2.3.2 创建微服务项目
创建 maven 项目,并加入依赖
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- canal依赖 --><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency>
</dependencies>
2.3.3 创建 CanalDataEventListerner.java :
package com.changgou.canal;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.content.feign.ContentFeign;
import com.xpand.starter.canal.annotation.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;/*** 实现mysql数据监听*/
@CanalEventListener
@Slf4j
public class CanalDataEventListener222 {@Autowiredprivate ContentFeign contentFeign;@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** @InsertListenPoint 增加监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@InsertListenPointpublic void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("增加: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @UpdateListenPoint 修改监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@UpdateListenPointpublic void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {log.info("===================================================================================");for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("修改前: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}log.info("===================================================================================");for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("修改后: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @DeleteListenPoint 删除监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@DeleteListenPointpublic void onEventDel(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("删除前:列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @ListenPoint 自定义监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@ListenPoint(destination = "example", // 指定Canal实例的地址schema = {"changgou_content"}, // 指定监听的数据库table = {"tb_content", "tb_content_category"}, // 指定监控的表eventType = {CanalEntry.EventType.DELETE,CanalEntry.EventType.UPDATE,CanalEntry.EventType.INSERT} // 监听类型)public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("自定义操作前: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("自定义操作后: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}
}
2.3.4 新建启动类
import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;/*** Canal微服务,监听数据库变化并响应*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableEurekaClient
@EnableCanalClient
public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}
}
2.3.5 编写微服务配置文件
# 192.168.47.142 为安装了redis、canal的远端服务器
server:port: 18082
spring:application:name: canalredis:host: 192.168.47.142port: 6379
eureka:client:service-url:defaultZone: http://127.0.0.1:7001/eurekainstance:prefer-ip-address: true
feign:hystrix:enabled: true
hystrix:command:default:execution:timeout:# 若enabled设置为false,则请求超时交给ribbon控制enabled: trueisolation:strategy: SEMAPHORE
canal:client:instances:example:host: 192.168.47.142port: 11111
2.4 测试
项目运行后,修改数据库即可看到控制台打印出修改的内容了。