当前位置: 代码迷 >> 综合 >> Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了
  详细解决方案

Canal+Msql+RabbitMq数据库同步配置,看这一篇就够了

热度:741   发布时间:2023-11-06 05:22:02.0

Canal数据库同步配置,看这一篇就够了

  • 总述
    • 使用背景
    • 技术选择
    • 环境布置
    • Canal服务端部署
    • Canal客户端配置

总述

最近公司需要实现从数据库到数据库实时同步的方案,Canal方案作为其中之一,在实现过程中踩了一些坑,包括官方上有记录的和没记录的。Mark一下。

使用背景

数据从Mysql(主库)传到Mysql(从库),并且满足以下条件

  1. 主库宕机后恢复,继续向从库传输同步数据,且保持数据一致性;
  2. 从库宕机后恢复,继续从主库同步数据,且保持数据一致性;

注:如果不是这样的需求 以下内容可能不适合你

技术选择

  1. canal.deployer-1.1.5.tar ----Canal服务端
  2. canal.adapter-1.1.5.tar ----Canal客户端
  3. rabbitMq ----消息队列

关于引入rabbitMq 需要解释一下。
按照正常配置,仅使用Canal服务端+Canal客户端可以实现正常的数据同步功能,在配置完成后,进行如下的尝试。
1)将主库关闭并重启,对整体流程没有影响
2)将Canal服务端关闭后重启,客户端丢失链接且不再重连
3)将Canal客户端关闭后重启,对整体流程没有影响
4)将从库关闭后重启,执行过程出错,重启后继续执行,对整体流程没有影响,且满足数据一致性要求

其中第二点的问题,存在一定的风险,所以采取引入rabbitMq的策略

环境布置

工欲善其事,必先利其器。由于有很多东西需要下载
1). 快速访问https://github.com,请点击这里

  1. Mysql主库配置,无非就是bin_log配置,创建用户并授权,具体步骤参考其他人
  2. java jdk安装 (法外狂徒张三:我下载下来canal不能运行?cannal:JAVA是我爸爸)
  3. 下载cannal服务端 https://github.com/alibaba/canal/releases (具体配置见下文)
  4. 下载canal客户端 位置同上(具体配置见下文)
  5. 安装Erlang环境,参考其他人(法外狂徒张三:装这个干嘛的? rabbitMq:Erlang是我爸爸)
  6. 安装rabbitMq,参考其他人(法外狂徒张三:安装步骤没错啊,怎么运行不起来? rabbitMq:是不是Erlang爸爸不是我的Erlang爸爸?)
    注1:不要使用初始guest账户,在canal连接时,权限会不够
    注2:密码中除了下划线,不要使用特殊字符,否则canal消费端配置会报错
  7. rabbitMq+rabbitMq 版本对照表在此 https://www.rabbitmq.com/which-erlang.html

Canal服务端部署

服务端主要配置两个文件canal.properties(canal属性配置)、example文件夹中的instance.properties(canal实例属性配置)

canal.properties(canal属性配置) ------着重注意带中文描述的配置
注:配置改改就行 不要觉得用不着就删掉,还有排版缩进格式不要调整

#################################################
######### common argument #############
#################################################
# tcp bind ip
canal.ip = 
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
# canal.user = canal
# canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config
#canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
#canal.admin.register.auto = true
#canal.admin.register.cluster =
#canal.admin.register.name =canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, rocketMQ, rabbitMQ 一共四种模式 使用了什么插件就配置什么
canal.serverMode = rabbitMQ
# flush meta cursor/parse position to file
canal.file.data.dir = ${
    canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false
canal.instance.filter.dml.insert = false
canal.instance.filter.dml.update = false
canal.instance.filter.dml.delete = false# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
canal.instance.get.ddl.isolation = false# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${
    canal.file.data.dir:../conf}/${
    canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${
    canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
# 此处配置读取mysql二进制文件的用户名 不需要带引号
canal.instance.tsdb.dbUsername = XXX
# 此处配置读取mysql二进制文件的密码 不需要带引号
canal.instance.tsdb.dbPassword = XXX
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360#################################################
######### destinations #############
#################################################
# 配置同步实例名称 在进行消费端配置时需要对应
canal.destinations = asyncdb
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
canal.auto.scan = true
canal.auto.scan.interval = 5
# set this value to 'true' means that when binlog pos not found, skip to latest.
# WARN: pls keep 'false' in production env, or if you know what you want.
canal.auto.reset.latest.pos.mode = falsecanal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${
    canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### MQ Properties #############
##################################################
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
canal.aliyun.uid=
# true 生产到kafka的消息就是json的, false就是protobuf二进制的
canal.mq.flatMessage = true
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = localcanal.mq.database.hash = true
canal.mq.send.thread.size = 30
canal.mq.build.thread.size = 8##################################################
######### Kafka #############
##################################################
kafka.bootstrap.servers = 127.0.0.1:9092
kafka.acks = all
kafka.compression.type = none
kafka.batch.size = 16384
kafka.linger.ms = 1
kafka.max.request.size = 1048576
kafka.buffer.memory = 33554432
kafka.max.in.flight.requests.per.connection = 1
kafka.retries = 0kafka.kerberos.enable = false
kafka.kerberos.krb5.file = "../conf/kerberos/krb5.conf"
kafka.kerberos.jaas.file = "../conf/kerberos/jaas.conf"##################################################
######### RocketMQ #############
##################################################
rocketmq.producer.group = test
rocketmq.enable.message.trace = false
rocketmq.customized.trace.topic =
rocketmq.namespace =
rocketmq.namesrv.addr = 127.0.0.1:9876
rocketmq.retry.times.when.send.failed = 0
rocketmq.vip.channel.enabled = false
rocketmq.tag = ##################################################
######### RabbitMQ #############
##################################################
# mq的ip地址 不可带端口号 加了端口号会报ipv6的错
rabbitmq.host = 127.0.0.1
# 就填个/就行
rabbitmq.virtual.host = /
# 填写mq队列相对应的交换机名称
rabbitmq.exchange = asyncdbcharge
# mq用户名
rabbitmq.username = admin
# mq密码
rabbitmq.password = !Abc@123
# 2 表示Durable
rabbitmq.deliveryMode = 2

instance.properties(canal实例属性配置) ------着重注意带中文描述的配置

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0# enable gtid use true/false
canal.instance.gtidon=false# position info 此处配置mysql主库的ip地址,带端口
canal.instance.master.address=127.0.0.1:3306
# 配置bin_log文件 如果配置了 只能获取指定的 不配置的话会默认获取当前 建议不配置
canal.instance.master.journal.name=
# 配置bin_log文件初始偏移量 不配置的话会默认获取当前 建议不配置
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=# table meta tsdb info
canal.instance.tsdb.enable= false
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=# username/password 用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
# 格式UTF_8格式
canal.instance.connectionCharset=UTF-8
#canal.instance.defaultDatabaseName=maintaindb
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex 表过滤 (.*\\..*)所有库所有表 (xxx\\..*)指定库所有表 (xxx\\.xxx指定库指定表) 如果多个用英文逗号隔开
canal.instance.filter.regex=.*\\..*
# table black regex #mysql\\.slave_.*
# canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
# canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config 此处配置mq中队列所配置的Key
canal.mq.topic = AsyncDB_Exchanges_Key
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#################################################

配置完成后,运行服务端项目。验证是否配置成功。
配置成功时,日志中canal日志没有错误,生成实例名称配置的文件夹,example会生成mate文件。
如果没出现类似描述,说明配置不成功

配置成功后,在主库中进行数据操作,并打开mq进行数据监测,如果mq中存在数据波动,说明服务端配置完毕

Canal客户端配置

Canal客户端配置也包含两部分。application.yml(应用配置)和rdb文件夹中xxx.yml配置(数据映射配置)

application.yml(应用配置)------着重注意带中文描述的配置
注:不要乱删,不要动排版,注释不要写在配置后面

server:port: 8081
spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_nullcanal.conf:#tcp kafka rocketMQ rabbitMQ 模式和canal服务端对应mode: rabbitMQ #接收数据的模式 和canal服务端对应 如果不对应 客户端消费数据是 只打日志 不执行数据操作flatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 127.0.0.1:11111canal.tcp.zookeeper.hosts:canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:# kafka consumerkafka.bootstrap.servers: 127.0.0.1:9092kafka.enable.auto.commit: falsekafka.auto.commit.interval.ms: 1000kafka.auto.offset.reset: latestkafka.request.timeout.ms: 40000kafka.session.timeout.ms: 30000kafka.isolation.level: read_committedkafka.max.poll.records: 1000# rocketMQ consumerrocketmq.namespace:rocketmq.namesrv.addr: 127.0.0.1:9876rocketmq.batch.size: 1000rocketmq.enable.message.trace: falserocketmq.customized.trace.topic:rocketmq.access.channel:rocketmq.subscribe.filter:# rabbitMQ consumer mq服务IP地址 还是不带端口号rabbitmq.host: 127.0.0.1# 默认斜杠 /rabbitmq.virtual.host: /# mq用户名密码rabbitmq.username: guestrabbitmq.password: guest# 默认空rabbitmq.resource.ownerId:srcDataSources:defaultDS:# 默认数据库主库 数据库连接字符串 注意带上区时 否则可能会报超时错误url: jdbc:mysql://127.0.0.1:3306/maintaindb?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8&autoReconnect=true# 主库用户名密码username: canalpassword: canalcanalAdapters:# canal实例名称 和canal服务端相对应- instance: testqueue # canal instance Name or mq topic namegroups:# 分组名称 随便取- groupId: testqueueouterAdapters:- name: logger- name: rdb# 填写mq队列的key 和instance.properties中canal.mq.topic配置一致key: testexchangekeyproperties:jdbc.driverClassName: com.mysql.jdbc.Driver# 从库链接 注意区时配置jdbc.url: jdbc:mysql://127.0.0.1:3306/subdb1?useUnicode=true&useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf-8&autoReconnect=true# 数据库名称和密码 带增删改权限jdbc.username: XXXjdbc.password: XXX
# - name: rdb
# key: oracle1
# properties:
# jdbc.driverClassName: oracle.jdbc.OracleDriver
# jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
# jdbc.username: mytest
# jdbc.password: m121212
# - name: rdb
# key: postgres1
# properties:
# jdbc.driverClassName: org.postgresql.Driver
# jdbc.url: jdbc:postgresql://localhost:5432/postgres
# jdbc.username: postgres
# jdbc.password: 121212
# threads: 1
# commitSize: 3000
# - name: hbase
# properties:
# hbase.zookeeper.quorum: 127.0.0.1
# hbase.zookeeper.property.clientPort: 2181
# zookeeper.znode.parent: /hbase
# - name: es
# hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
# properties:
# mode: transport # or rest
# # security.auth: test:123456 # only used for rest mode
# cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address

rdb文件夹中xxx.yml配置(数据映射配置)------着重注意带中文描述的配置

须知1:rdb文件夹中默认有一个yml文件,文件名称可以修改
须知2:rdb文件夹中默认有一个yml文件,仅可编辑一张表的映射关系
须知3:如果要实现多表映射,需要添加其他的yml文件
yml文件模板配置如下


# Mirror schema synchronize config 对应application.yml中的配置 默认为defaultDS,不需要调整
dataSourceKey: defaultDS
# application.yml中的instance配置(canal实例)
destination: testqueue
# application.yml中的分组名称
groupId: testqueue
# application.yml中的分组下的key
outerAdapterKey: testexchangekey
concurrent: true
dbMapping:
# mirrorDb: false# 主库数据库名称database: maintaindb# 主库表table: table1# 从库的目标表 不需要带数据库名称 否则会出现 库名.库名.表名的错targetTable: table1# id间主键映射 如果某表主库中主键为aaid,在从库中为bbid,则下面的id: id要更改为 aaid:bbid。此处配置失败,会出现目标id不匹配的错targetPk:id: id#映射关系 true 为全映射 false为非全映射,若为非全映射 则需要编辑下面的配置来进行字段映射mapAll: true
# targetColumns:
# id:
# name:
# role_id:
# c_time:
# test1:etlCondition: "where c_time>={}"commitBatch: 3000 # 批量提交的大小

配置完成后运行程序,检查mq中队列是否被消费,检查程序的控制台是否有日志输出,检查数据是否落入到数据库。

到此时,配置已经完成!!!