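I. Modify the database configuration
1. Edit my.cnf:
log-bin=mysql-bin # enable the binlog and set the base name of the log files
binlog-format=ROW # log mode: ROW records the content of every row change
server_id=6 # must be unique and must differ from canal's slaveId (since canal 1.1.4 the slaveId no longer needs to be set on the canal side because it is generated automatically)
#binlog-do-db= # databases to sync; if omitted, all databases are synced
#binlog-ignore-db= # databases to exclude from syncing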
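The three active settings above are easy to get wrong, so a quick sanity check can help before restarting MySQL. The following is only a minimal sketch: the class name MyCnfCheck and its methods are hypothetical helpers, not part of MySQL or canal, and the parsing is deliberately simplified (it cuts everything after a "#", and treats "-" and "_" in option names as the same character, as MySQL does).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper: checks a my.cnf text for the binlog settings canal relies on.
class MyCnfCheck {

    // Collects options from the text; a bare flag such as "log-bin" is stored with an empty value.
    static Map<String, String> parse(String cnfText) {
        Map<String, String> settings = new HashMap<>();
        for (String rawLine : cnfText.split("\\R")) {
            int hash = rawLine.indexOf('#');
            String line = (hash >= 0 ? rawLine.substring(0, hash) : rawLine).trim();
            if (line.isEmpty() || line.startsWith("[")) {
                continue; // blank line, pure comment, or section header such as [mysqld]
            }
            int eq = line.indexOf('=');
            String key = (eq >= 0 ? line.substring(0, eq) : line).trim();
            String value = eq >= 0 ? line.substring(eq + 1).trim() : "";
            // Normalise the key so "log-bin" and "log_bin" are treated alike.
            settings.put(key.replace('-', '_').toLowerCase(Locale.ROOT), value);
        }
        return settings;
    }

    // Returns a human-readable list of missing or unsuitable settings (empty if all is well).
    static List<String> problems(String cnfText) {
        Map<String, String> s = parse(cnfText);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog-format is not ROW");
        }
        if (!s.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "log-bin=mysql-bin\nbinlog-format=ROW\nserver_id=6\n";
        System.out.println(problems(cnf)); // prints []
    }
}
```

An empty list only says that the text contains the settings; it does not confirm that the running server has picked them up.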
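2. Create a user and grant it replication privileges on the databases and tables, with access allowed from any address
Create the canal user:
CREATE USER canal IDENTIFIED BY 'canal';
Grant canal the privileges to query and to replicate the binlog, on all databases and all tables, from any IP address:
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
*.* means all databases and all tables; % means any IP address.
Reload the privileges:
FLUSH PRIVILEGES;
Check that the user has been granted the privileges:
SELECT u.User, u.Host, u.Repl_slave_priv, u.Repl_client_priv FROM mysql.user u;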
II. Modify the canal configuration
Download: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
Version: 1.1.4
canal.properties (system configuration):
Modify instance.properties (instance configuration):
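As an illustration of what the instance configuration usually has to line up with, here is a minimal sketch that loads a small instance.properties fragment with java.util.Properties. The fragment is an assumption, not the author's actual file: the key names are the ones I believe canal's default instance.properties uses, the address 127.0.0.1:3306 is a placeholder, and the credentials simply repeat the canal/canal account created in section I. The class name InstancePropertiesDemo is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical demo: reads an assumed instance.properties fragment the way a .properties file is parsed.
class InstancePropertiesDemo {

    // Assumed fragment. In the file itself the filter is written with two backslashes: .*\\..*
    static final String FRAGMENT = String.join("\n",
            "canal.instance.master.address=127.0.0.1:3306",
            "canal.instance.dbUsername=canal",
            "canal.instance.dbPassword=canal",
            "canal.instance.filter.regex=.*\\\\..*");

    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(FRAGMENT);
        System.out.println(props.getProperty("canal.instance.master.address"));
        System.out.println(props.getProperty("canal.instance.dbUsername"));
        // The properties parser collapses the doubled backslash into one,
        // so the regex that results is .*\..* (any schema, a literal dot, any table).
        System.out.println(props.getProperty("canal.instance.filter.regex"));
    }
}
```

The point of the last line is the escaping: a filter written with a single backslash in the file would lose it during parsing and the dot would then match any character.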
Successful startup:
III. Write the Java test code
package com.zxy.loglearn.controller;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

/**
 * @USER: zhouxy
 * @DATE: 2020/1/15 15:32
 **/
public class TestMain {

    public static void main(String[] args) throws Exception {
        // Connect to the canal server (host, port), destination "example", user/password canal/canal
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("10.3.130.16", 11111), "example", "canal", "canal");
        int batchSize = 1000;
        int emptyCount = 0;
        System.out.println("start connecting");
        try {
            connector.connect();
            connector.subscribe(".*\\..*"); // subscribe to all tables in all databases
            connector.rollback();           // go back to the last acknowledged position
            int totalEmptyCount = 120;
            // Exit after 120 consecutive empty polls (about two minutes)
            while (emptyCount < totalEmptyCount) {
                Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    emptyCount++;
                    System.out.println("empty count : " + emptyCount);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignored: keep polling
                    }
                } else {
                    emptyCount = 0;
                    printEntry(message.getEntries());
                }
                connector.ack(batchId); // acknowledge the batch
            }
            System.out.println("empty too many times, exit");
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<CanalEntry.Entry> entries) {
        for (CanalEntry.Entry entry : entries) {
            // Skip transaction begin/end markers; only row changes are printed
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            CanalEntry.RowChange rowChange = null;
            try {
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            CanalEntry.EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
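The argument passed to connector.subscribe in the test code is a filter over names of the form schema.table. The sketch below illustrates how such a filter narrows the subscription. It is an approximation under stated assumptions: it treats the filter as a comma-separated list of regular expressions that must match the whole name and uses java.util.regex, whereas canal has its own filter implementation whose details (for example case handling) may differ. The class TableFilterDemo and the schema and table names shop, crm, orders and customers are made up for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical demo: approximates a canal-style table filter with java.util.regex.
class TableFilterDemo {

    // True if "schema.table" fully matches at least one of the comma-separated patterns.
    static boolean accepts(String filter, String schema, String table) {
        String fullName = schema + "." + table;
        for (String pattern : filter.split(",")) {
            if (Pattern.matches(pattern.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // All tables in all schemas, as in the test code above.
        System.out.println(accepts(".*\\..*", "shop", "orders"));                 // true
        // Only tables in the shop schema.
        System.out.println(accepts("shop\\..*", "shop", "orders"));               // true
        System.out.println(accepts("shop\\..*", "crm", "customers"));             // false
        // One specific table plus every table in crm.
        System.out.println(accepts("shop.orders,crm\\..*", "crm", "customers"));  // true
    }
}
```

In Java source the backslash has to be doubled inside the string literal, so ".*\\..*" is the regular expression .*\..* at runtime.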
IV. canal-admin
1. Download: wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
Version: 1.1.4, same as above
2. Create the database for canal-admin
3. Start it and check that it is running
Account: admin, password: 123456
V. canal-adapter: