
Using Canal

Published: 2024-01-26 15:14:14

I. Modify the database configuration

 1. Edit my.cnf:

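log-bin=mysql-bin    # enable the binary log; the value is the base name (or path) of the log files
binlog-format=ROW    # row-based logging: records the content of every row change
server_id=6          # must be unique and must differ from canal's slaveId; recent canal versions (1.1.4 included) auto-generate the slaveId, so it need not be set on the canal side
#binlog-do-db=       # databases to log; if omitted, all databases are logged
#binlog-ignore-db=   # databases to exclude from the binary log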
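
A quick way to sanity-check these settings before starting canal is to parse the option file and look for the three required entries. The following is a minimal sketch only: `BinlogConfigCheck` is a hypothetical helper, not part of canal or MySQL, and it ignores `!include` directives and group scoping. It only reports what is written in the text it is given, not what the running server actually uses.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of canal or MySQL): checks my.cnf-style text
// for the three settings listed above.
public class BinlogConfigCheck {

    // Parses "key=value" lines, dropping '#' comments and normalizing '-' to '_'
    // (MySQL accepts both spellings in option names).
    static Map<String, String> parse(String text) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String rawLine : text.split("\\R")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue; // blank line, comment-only line, or a [group] header
            }
            int eq = line.indexOf('=');
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim().replace('-', '_');
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            options.put(key.toLowerCase(), value);
        }
        return options;
    }

    // Returns a list of problems found in the text; an empty list means all three settings are present.
    static List<String> problems(String text) {
        Map<String, String> options = parse(text);
        List<String> problems = new ArrayList<>();
        if (!options.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(options.get("binlog_format"))) {
            problems.add("binlog-format is not ROW");
        }
        if (!options.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "[mysqld]",
                "log-bin=mysql-bin",
                "binlog-format=ROW",
                "server_id=6");
        System.out.println(problems(sample)); // prints []
    }
}
```

In practice the text would come from the actual my.cnf file; the sample string here simply mirrors the settings shown above.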

2. Create a user and grant it replication privileges, the databases and tables it covers, and the hosts it may connect from

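  Create the canal user:

          CREATE USER canal IDENTIFIED BY 'canal';

  Grant canal the privileges to query and to read the binlog as a replica, on all databases and all tables, from any IP address:

         GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

         *.*: all databases and all tables    %: any host

  Reload the privilege tables:

         FLUSH PRIVILEGES;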

Check that the user has been granted the privileges:

select u.User,u.Host,u.Repl_slave_priv,u.Repl_client_priv from mysql.user u;

II. Modify the canal configuration

     Download: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

     Version: 1.1.4

     

canal.properties (system configuration):

Edit instance.properties (instance configuration):
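
The exact values used in this setup are not reproduced here, so the following is only an illustrative sketch of the instance.properties keys that usually need attention. It assumes the canal account created in step I.2 (user `canal`, password `canal`); the MySQL address `127.0.0.1:3306` is a placeholder, not a value from this article. The class `InstancePropertiesSketch` is hypothetical and only uses `java.util.Properties` to show how the file's escaping is read.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical sketch: loads instance.properties-style text and prints the
// keys that typically have to be adapted. Values are placeholders.
public class InstancePropertiesSketch {

    static final String SAMPLE = String.join("\n",
            "canal.instance.master.address=127.0.0.1:3306", // placeholder MySQL host:port
            "canal.instance.dbUsername=canal",              // account created in step I.2
            "canal.instance.dbPassword=canal",
            "canal.instance.filter.regex=.*\\\\..*");       // written as .*\\..* in the file

    static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(SAMPLE);
        String[] keys = {
            "canal.instance.master.address",
            "canal.instance.dbUsername",
            "canal.instance.filter.regex"
        };
        for (String key : keys) {
            String value = props.getProperty(key);
            System.out.println(key + " = " + (value == null ? "<missing>" : value));
        }
    }
}
```

Note that the properties format treats a backslash as an escape character, which is why the table filter is written with a doubled backslash in the file and is read back as the regular expression `.*\..*`.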

Startup succeeded:

III. Write Java test code

package com.zxy.loglearn.controller;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @USER: zhouxy
 * @DATE: 2020/1/15 15:32
 **/
public class TestMain {

    public static void main(String[] args) throws Exception {
        // Connect to the "example" instance on the canal server
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("10.3.130.16", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        System.out.println("Connecting");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to every table in every database
            connector.rollback();           // restart from the last acknowledged position
            int totalEmptyCount = 120;
            // Exit after 120 consecutive empty polls (about two minutes)
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored: keep polling
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException(
                        "ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format(
                    "================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue()
                    + "    update=" + column.getUpdated());
        }
    }
}
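
The test client acknowledges every batch unconditionally. When the handler can fail, one common alternative is to acknowledge only after processing succeeds and to roll the batch back otherwise, so that it is delivered again. The sketch below illustrates that control flow without a canal server: `BatchSource`, `Batch`, and `InMemorySource` are hypothetical stand-ins that mimic the `getWithoutAck` / `ack(batchId)` / `rollback(batchId)` calls of `CanalConnector`, and the redelivery behaviour of the in-memory source is an assumption made for the demo, not a description of the server's internals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class AckRollbackSketch {

    // Hypothetical stand-in for the three CanalConnector calls used in the loop.
    interface BatchSource {
        Batch getWithoutAck(int batchSize);
        void ack(long batchId);
        void rollback(long batchId);
    }

    static final class Batch {
        final long id;              // -1 means "nothing available"
        final List<String> entries;
        Batch(long id, List<String> entries) { this.id = id; this.entries = entries; }
    }

    // Polls until maxEmptyPolls consecutive empty batches are seen. A batch is
    // acknowledged only after the handler returns, and rolled back if it throws.
    // A handler that always fails on the same batch would retry forever.
    static int drain(BatchSource source, Consumer<List<String>> handler, int maxEmptyPolls) {
        int handled = 0;
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            Batch batch = source.getWithoutAck(1000);
            if (batch.id == -1 || batch.entries.isEmpty()) {
                emptyPolls++;
                continue; // a real client would sleep here before polling again
            }
            emptyPolls = 0;
            try {
                handler.accept(batch.entries);
                source.ack(batch.id);
                handled++;
            } catch (RuntimeException e) {
                source.rollback(batch.id); // the batch will be fetched again
            }
        }
        return handled;
    }

    // In-memory source for the demo: a rolled-back batch goes back to the front of the queue.
    static final class InMemorySource implements BatchSource {
        private final Deque<Batch> pending = new ArrayDeque<>();
        private Batch inFlight;
        final List<Long> acked = new ArrayList<>();

        InMemorySource(List<Batch> batches) { pending.addAll(batches); }

        public Batch getWithoutAck(int batchSize) {
            inFlight = pending.poll();
            return inFlight != null ? inFlight : new Batch(-1, new ArrayList<>());
        }

        public void ack(long batchId) { acked.add(batchId); inFlight = null; }

        public void rollback(long batchId) {
            if (inFlight != null) { pending.addFirst(inFlight); inFlight = null; }
        }
    }

    public static void main(String[] args) {
        InMemorySource source = new InMemorySource(Arrays.asList(
                new Batch(1, Arrays.asList("insert t1")),
                new Batch(2, Arrays.asList("update t1"))));
        boolean[] failedOnce = {false};
        int handled = drain(source, entries -> {
            if (entries.contains("update t1") && !failedOnce[0]) {
                failedOnce[0] = true;
                throw new IllegalStateException("simulated failure");
            }
        }, 3);
        System.out.println("handled=" + handled + " acked=" + source.acked);
        // prints handled=2 acked=[1, 2]
    }
}
```

With the real client, the same shape would wrap `printEntry(...)` in the try block and call `connector.rollback(batchId)` in the catch block; whether retrying indefinitely is acceptable depends on the application.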

IV. canal-admin

1. Download: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

    Version: 1.1.4, same as above

2. Create the database for canal-admin

3. Start it and check that it came up

Account: admin   Password: 123456

V. canal-adapter: