RocketMQ and Canal Notes

Published: 2023-12-29 16:20:21

Installing and starting RocketMQ

After unpacking the archive, go into bin and start the name server directly:

nohup sh mqnamesrv & 
tail -f ~/logs/rocketmqlogs/namesrv.log

Configure the broker in conf/broker.conf

Start the broker (the paths below are relative to the RocketMQ root directory):

nohup sh bin/mqbroker -n 192.168.1.73:9876 -c conf/broker.conf autoCreateTopicEnable=true &
tail -f ~/logs/rocketmqlogs/broker.log
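
Besides tailing the two logs, a quick reachability check of the listening ports can confirm that both processes are up. The sketch below is in Python and assumes the default ports: 9876 for the name server (as used in the command above) and 10911 for the broker. The helper name port_open is hypothetical, and the broker port will differ if listenPort is changed in conf/broker.conf.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, unreachable host, and timeouts.
        return False
```

For example, port_open("192.168.1.73", 9876) should return True once the name server is listening, and port_open("192.168.1.73", 10911) once the broker is up (assuming the default broker port).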

Check the result: [screenshot not preserved]

RocketMQ web console

Download:

https://github.com/apache/rocketmq-externals/tree/release-rocketmq-console-1.0.0/rocketmq-console

Build a jar and run it

Build the jar:

mvn clean package -Dmaven.test.skip=true

Run:

java -jar rocketmq-console-ng-1.0.0.jar --server.port=8080 --rocketmq.config.namesrvAddr=192.168.171.129:9876

Installing and deploying Canal

The official documentation works well:
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

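Pay attention to the configuration items for the RocketMQ integration. [screenshot not preserved]
In the MQ settings, enable sending data in JSON format by setting canal.mq.flatMessage to true. [screenshot not preserved]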

Create the topic in the RocketMQ web console.

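If a large number of duplicate messages show up in the RocketMQ queue, the MySQL binlog settings need to be configured in instance.properties. The original screenshots are not preserved; the binlog-related entries in the instance.properties listing below are canal.instance.master.journal.name and canal.instance.master.position.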

canal.properties, for reference

Append the following to the original configuration file:

# ...
# Options: tcp (default), kafka, RocketMQ
canal.serverMode = RocketMQ
# ...
# kafka/rocketmq cluster addresses, e.g. 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
canal.mq.servers = 192.168.1.80:9876
canal.mq.retries = 0
# In flatMessage mode this value can be increased, but do not exceed the MQ maximum message body size
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# In flatMessage mode, increase this value; 50-200 is recommended
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal batch size, default 50K; because of Kafka's maximum message body limit, do not exceed 1M (keep it under 900K)
canal.mq.canalBatchSize = 50
# Timeout for Canal's get call, in milliseconds; leave empty for no timeout
canal.mq.canalGetTimeout = 100
# Whether to send flat JSON objects
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
# Whether Kafka message delivery uses transactions
canal.mq.transaction = false

instance.properties

#################################################
## mysql serverId , v1.0.26+ will autoGen
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=192.168.1.12:3306
canal.instance.master.journal.name=mysql-bin.000005
canal.instance.master.position=1081
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=root
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch
# mq config
#canal.mq.topic=test-wy
# dynamic topic route by schema or table regex
canal.mq.dynamicTopic=spark
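
To get a feel for what the two table filters above select, the following Python sketch approximates the matching: each comma-separated pattern is matched against the full schema.table name, and a table counts as captured when it matches canal.instance.filter.regex and does not match canal.instance.filter.black.regex. This is only an illustration under those assumptions, not Canal's actual filter code; the function names are hypothetical, and details such as case handling are ignored. Note that \\. in the properties file is read as \. by the Java properties loader, which is the form used below.

```python
import re

# Filter values as they look after the properties loader has unescaped "\\." to "\."
INCLUDE = r".*\..*"
EXCLUDE = r"mysql\.slave_.*"


def compile_filters(spec):
    """Split a comma-separated filter string into compiled regex patterns."""
    return [re.compile(p.strip()) for p in spec.split(",") if p.strip()]


def is_captured(schema, table, include=INCLUDE, exclude=EXCLUDE):
    """Return True if schema.table matches the include filter but not the exclude filter."""
    name = f"{schema}.{table}"
    included = any(p.fullmatch(name) for p in compile_filters(include))
    excluded = any(p.fullmatch(name) for p in compile_filters(exclude))
    return included and not excluded
```

With the values from this file, is_captured("spark", "orders") is True, while is_captured("mysql", "slave_master_info") is False because the black regex excludes it.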

Other commands that may be useful

show global variables like "%binlog_format%";

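Consumer (to be filled in later)

Important: the consumer must connect over the internal network. Internal network, internal network!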
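
Since canal.mq.flatMessage is set to true, each message body the consumer receives is a JSON object. As a starting point for the consumer side, here is a minimal Python sketch that turns one such body into per-row changes. The field names (database, table, type, isDdl, data, old) follow Canal's flat message format; the RowChange class, the parse_flat_message function, and any sample values are made up for illustration, and pulling the message from RocketMQ is deliberately left out.

```python
import json
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class RowChange:
    database: str
    table: str
    op: str                         # e.g. INSERT, UPDATE, DELETE
    row: Dict[str, Any]             # column values carried in "data"
    old: Optional[Dict[str, Any]]   # previous values of changed columns, if present


def parse_flat_message(body):
    """Parse one flat-message JSON body (str or bytes) into a list of RowChange."""
    msg = json.loads(body)
    if msg.get("isDdl"):
        return []  # DDL statements are skipped in this sketch
    rows = msg.get("data") or []
    olds = msg.get("old") or []
    changes = []
    for i, row in enumerate(rows):
        # "old" is index-aligned with "data" when present; otherwise leave it as None
        old = olds[i] if i < len(olds) else None
        changes.append(RowChange(msg["database"], msg["table"], msg["type"], row, old))
    return changes
```

Because the same binlog event may be delivered more than once (see the duplicate-message note earlier), whatever is done with each RowChange is best written so that applying it twice is harmless.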