当前位置: 代码迷 >> 综合 >> 【Canal】Canal学习笔记
  详细解决方案

【Canal】Canal学习笔记

热度:81   发布时间:2023-09-21 00:30:03.0

目录

一、Canal 介绍

1.1 Canal 概述

1.2 Canal 原理

二、Canal 安装和使用

2.1 mysql 准备

2.1.1 开启 mysql 的 binlog 模式

2.1.2 创建同步账号

2.1.3 重启 mysql 容器

2.2 Canal 容器安装和使用

2.2.1 下载镜像和安装

2.2.2 Canal 配置

2.3 Canal 微服务搭建

2.3.1 安装 Canal 开源项目到本地 maven 仓库

2.3.3 创建 CanalDataEventListerner.java :

2.3.4 新建启动类

2.3.5 编写微服务配置文件

2.4 测试


一、Canal 介绍

1.1 Canal 概述

Canal是阿里巴巴旗下的一款开源项目,纯Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。

简单来说就是用来监控数据库数据的变化,从而获得新增数据或修改数据的项目框架,目前主要支持了MySQL。

 

1.2 Canal 原理

【Canal】Canal学习笔记

原理:

  • canal 模拟 mysql slave 的交互协议,伪装自己为 mysql slave, 向mysql master 发送 dump 协议
  • mysql master 收到 dump 请求,开始推送 binary log 给 slave(即 canal)
  • canal 解析 binary log 对象(原始为byte流)

由于 canal是基于mysql的主从模式实现的,所以必须先开启 binlog

 

二、Canal 安装和使用

2.1 mysql 准备

2.1.1 开启 mysql 的 binlog 模式

由于 mysql 是使用 docker 部署的,进入 docker 修改 mysql 的配置

# 进入 mysql 的 docker 容器内
docker exec -it mysql /bin/bash# 进入 mysql 对应的配置文件
cd /etc/mysql/mysql.conf.d# 修改配置文件
vim mysqld.cnf

  【Canal】Canal学习笔记

log-bin=/var/lib/mysql/mysql-bin # 指定日志文件存储的位置
server-id=12345 # 当前这个mysql数据库的唯一标识

 

2.1.2 创建同步账号

由于 Canal 要进入 mysql 读取数据,出于安全考虑,需要对 Canal 创建用户并赋予权限

# 创建账号 %表示能在任意机器登录  by 'xxx' 表示密码为canal
create user canal@'%' IDENTIFIED by 'canal';# 授权 依次为:查询权限、主从复制权限、主从复制客户端权限、超级权限 
# *.*  表示 任意数据库.任意表  都拥有相关权限
# 结合表示: 任意机器上以canal账号登录的用户对mysql中任意数据库和任意表都拥有上述权限
GRANT SELECT, PEPLICATION SLAVE, REPLICATION CLIENT, SUPER ON *.* TO 'canal'@'%'';# 刷新数据库
FLUSH PRIVILEGES;

 

2.1.3 重启 mysql 容器

docker restart canal

 

2.2 Canal 容器安装和使用

2.2.1 下载镜像和安装

# 下载镜像
docker pull docker.io/canal/canal-server# 安装
docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server
#-p 端口映射

2.2.2 Canal 配置

进入 Canal 容器

# 进入 canal 容器
docker exec -it canal /bin/bash# 进入配置文件夹
cd /home/admin/canal-server/conf

  【Canal】Canal学习笔记

修改 Canal 的数据库唯一标识,改成唯一的

  【Canal】Canal学习笔记

修改同步配置

vi ./example/instance.properties

  【Canal】Canal学习笔记

  【Canal】Canal学习笔记

  【Canal】Canal学习笔记

  【Canal】Canal学习笔记

 

2.3 Canal 微服务搭建

2.3.1 安装 Canal 开源项目到本地 maven 仓库

在搭建 Canal 时使用了一个开源项目,实现了 Springboot 和 Canal 的集成。

https://github.com/chenqian56131/spring-boot-starter-canal

搭建:

找到项目目录,使用命令安装到本地 maven 仓库

mvn install

  【Canal】Canal学习笔记  

  【Canal】Canal学习笔记

 

2.3.2 创建微服务项目

创建 maven 项目,并加入依赖

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!--   canal依赖     --><dependency><groupId>com.xpand</groupId><artifactId>starter-canal</artifactId><version>0.0.1-SNAPSHOT</version></dependency>
</dependencies>

 

2.3.3 创建 CanalDataEventListerner.java :

【Canal】Canal学习笔记

package com.changgou.canal;import com.alibaba.otter.canal.protocol.CanalEntry;
import com.changgou.content.feign.ContentFeign;
import com.xpand.starter.canal.annotation.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;/*** 实现mysql数据监听*/
@CanalEventListener
@Slf4j
public class CanalDataEventListener222 {@Autowiredprivate ContentFeign contentFeign;@Autowiredprivate StringRedisTemplate stringRedisTemplate;/*** @InsertListenPoint 增加监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@InsertListenPointpublic void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("增加: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @UpdateListenPoint 修改监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@UpdateListenPointpublic void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {log.info("===================================================================================");for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("修改前: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}log.info("===================================================================================");for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("修改后: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @DeleteListenPoint 删除监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@DeleteListenPointpublic void onEventDel(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("删除前:列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}/*** @ListenPoint 自定义监听* @param eventType 当前操作的类型: 增加数据* @param rowData 发生变更的一行数据*/@ListenPoint(destination = "example", // 指定Canal实例的地址schema = {"changgou_content"}, // 指定监听的数据库table = {"tb_content", "tb_content_category"}, // 指定监控的表eventType = {CanalEntry.EventType.DELETE,CanalEntry.EventType.UPDATE,CanalEntry.EventType.INSERT} // 监听类型)public void onEventCustomUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData) {for (CanalEntry.Column column : rowData.getBeforeColumnsList()) {log.info("自定义操作前: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}for (CanalEntry.Column column : rowData.getAfterColumnsList()) {log.info("自定义操作后: 列名:" + column.getName() + "------ 变更的数据:" + column.getValue());}}
}

 

2.3.4 新建启动类

import com.xpand.starter.canal.annotation.EnableCanalClient;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;/*** Canal微服务,监听数据库变化并响应*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
@EnableEurekaClient
@EnableCanalClient
public class CanalApplication {public static void main(String[] args) {SpringApplication.run(CanalApplication.class, args);}
}

 

2.3.5 编写微服务配置文件

【Canal】Canal学习笔记

# 192.168.47.142 为安装了redis、canal的远端服务器
server:port: 18082
spring:application:name: canalredis:host: 192.168.47.142port: 6379
eureka:client:service-url:defaultZone: http://127.0.0.1:7001/eurekainstance:prefer-ip-address: true
feign:hystrix:enabled: true
hystrix:command:default:execution:timeout:# 若enabled设置为false,则请求超时交给ribbon控制enabled: trueisolation:strategy: SEMAPHORE
canal:client:instances:example:host: 192.168.47.142port: 11111

 

2.4 测试

项目运行后,修改数据库即可看到控制台打印出修改的内容了。

【Canal】Canal学习笔记