当前位置: 代码迷 >> 综合 >> Canal-实时同步MySQL与ES数据
  详细解决方案

Canal-实时同步MySQL与ES数据

热度:12   发布时间:2023-12-27 16:14:49.0

Canal-实时同步MySQL与ES数据

  • Canal-实时同步MySQL与ES数据
    • 安装步骤
      • 1. MySQL安装
        • 1.1 Docker安装MySQL
        • 1.2 配置MySQL
        • 1.3 验证Binlog是否开启
        • 1.4 创建用户
      • 2. ES安装
        • ES 操作
      • 3. ES-Head
      • 4. Kibana
      • 5. Canal安装
        • 5.1 安装Canal Server
        • 5.2 安装Canal Adapter

Canal-实时同步MySQL与ES数据

Canal的工作原理就是把自己伪装成MySQL Slave,模拟MySQL Slave的交互协议向MySQL Mater发送 Dump协议,MySQL Mater收到Canal发送过来的Dump请求,开始推送Binlog给Canal,然后Canal解析Binlog,再发送到存储目的地,比如MySQL,Kafka,Elastic Search等等。

安装步骤

1. MySQL安装

1.1 Docker安装MySQL

mkdir -p mysql/conf
docker run -p 3306:3306 --name mysql -v ${pwd}/mysql/conf:/etc/mysql/conf.d -e MYSQL_ROOT_PASSWORD=root -d mysql
# docker cp mysql:/etc/mysql/my.cnf ${pwd}/mysql/conf

1.2 配置MySQL

cat > ${pwd}/mysql/conf/my.cnf << EOF [mysqld] # 打开Binlog log-bin = mysql-bin # 选择ROW(行)模式 binlog-format = ROW # MySQL Replication定义 server_id = 1001 EOF
docker restart mysql

注意:service_id范围为1到232?12^{32}-1232?1

1.3 验证Binlog是否开启

[root@192 ~]# docker exec -it mysql bash
root@10eaf43f300e:/# mysql -uroot -proot
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin       | ON    |
+---------------+-------+
1 row in set (0.00 sec)
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      156 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

1.4 创建用户

# 创建用户 用户名:canal 密码:Canal@123456
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
# 授权 *.*表示所有库
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新并应用
FLUSH PRIVILEGES;
create database canal;
use canal;
create table users (id int auto_increment,name varchar(64),primary key(id)
);
insert into users(name) values('zhangsan'),('lisi');

2. ES安装

docker pull elasticsearch:7.17.1
mkdir -p /root/elasticsearch/config
mkdir -p /root/elasticsearch/data
chmod -R 777 /root/elasticsearch
echo "http.host: 0.0.0.0 http.cors.enabled: true http.cors.allow-origin: \"*\"" > /root/elasticsearch/config/elasticsearch.yml
docker run --name elasticsearch -p 9200:9200  -p 9300:9300 \-e "discovery.type=single-node" \-e ES_JAVA_OPTS="-Xms84m -Xmx512m" \-v /root/elasticsearch/config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \-v /root/elasticsearch/data:/usr/share/elasticsearch/data \-v /root/elasticsearch/plugins:/usr/share/elasticsearch/plugins \--restart=always -d elasticsearch:7.17.1

ES 操作

### 创建索引
PUT http://192.168.0.131:9200/users_index
Content-Type: application/json{
    "mappings": {
    "properties": {
    "id": {
    "type": "keyword"},"name": {
    "type": "text"}}}
}### 插入数据
POST http://192.168.0.131:9200/users_index/_doc
Content-Type: application/json{
    "id": 1,"name": "zhangsan"
}### 查询数据
GET http://192.168.0.131:9200/users_index/_search
Content-Type: application/json{
    "query": {
    "constant_score": {
    "filter": {
    "term": {
     "id": "1" }}}}
}

3. ES-Head

docker run --restart=always --name elasticsearch-head -di -p 9100:9100 mobz/elasticsearch-head:5-alpine
docker exec -it -u root elasticsearch-head shvi _site/vendor.js# 6886行
contentType: "application/x-www-form-urlencoded",
# 改成
contentType: "application/json;charset=UTF-8",
# 7573行
var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
# 改成
var inspectData = s.contentType === "application/json;charset=UTF-8" &&docker restart elasticsearch-head

4. Kibana

docker pull kibana:7.17.1
mkdir -p /root/kibana    # 创建挂载配置目录
chmod 777 /root/kibana   # 赋值读写执行权限
echo "server.host: 0.0.0.0 elasticsearch.hosts: http://192.168.0.131:9200" > /root/kibana/kibana.yml
docker run --name kibana --restart=always -v /root/kibana/kibana.yml:/usr/share/kibana/config/kibana.yml -p 5601:5601 -d kibana:7.17.1 

5. Canal安装

5.1 安装Canal Server

# 创建Canal配置文件位置
mkdir -p /root/canal/conf
# 启动Canal-Server
docker run --name canal-server -d canal/canal-server
# 复制容器中的配置文件到本地
docker cp canal-server:/home/admin/canal-server/conf/canal.properties /root/canal/conf
docker cp canal-server:/home/admin/canal-server/conf/example/instance.properties /root/canal/conf
# 删除Canal-Server容器
docker stop canal-server && docker rm canal-server

修改配置

vi /root/canal/conf/instance.properties

在这里插入图片描述

# 重新启动Canal-Server容器
docker run --name canal-server --restart=always -p 11111:11111 \
-v /root/canal/conf/instance.properties:/home/admin/canal-server/conf/example/instance.properties \
-v /root/canal/conf/canal.properties:/home/admin/canal-server/conf/canal.properties \
-d canal/canal-server

5.2 安装Canal Adapter

mkdir -p /root/canal/adapter/conf/es7
cat > /root/canal/adapter/conf/application.yml << EOF server:port: 8081 spring:jackson:date-format: yyyy-MM-dd HH:mm:sstime-zone: GMT+8default-property-inclusion: non_null canal.conf:mode: tcpflatMessage: truezookeeperHosts:syncBatchSize: 1000retries: 0timeout:accessKey:secretKey:consumerProperties:# canal tcp consumercanal.tcp.server.host: 192.168.0.131:11111canal.tcp.batch.size: 500canal.tcp.username:canal.tcp.password:srcDataSources:defaultDS:url: jdbc:mysql://192.168.0.131:3306/canal?useUnicode=trueusername: canalpassword: canalcanalAdapters:- instance: example # canal instance Name or mq topic namegroups:- groupId: g1outerAdapters:- name: logger- name: es7 # 该版本发现只能是es7/es6hosts: 192.168.0.131:9200 # 127.0.0.1:9300 for rest transportproperties:mode: rest # or transportcluster.name: elasticsearch EOF

cluster.name通过访问http://192.168.0.131:9200获取

cat > /root/canal/adapter/conf/es7/users_index.yml << EOF dataSourceKey: defaultDS destination: example groupId: g1 esMapping:_index: users_index_id: _id_type: _docupsert: true# pk: idsql: "SELECT a.id as _id, a.name as name from users a "# objFields:# _labels: array:;# etlCondition: "where a.>={}"commitBatch: 3000 EOF
docker run --name canal-adapter --restart=always -p 8081:8081 -v /root/canal/adapter/conf:/opt/canal-adapter/conf  -d slpcat/canal-adapter:v1.1.5
  相关解决方案