当前位置: 代码迷 >> 综合 >> 使用 logstash 将 MySQL 数据以增量方式,同步到 ES 搜索引擎
  详细解决方案

使用 logstash 将 MySQL 数据以增量方式,同步到 ES 搜索引擎

热度:47   发布时间:2023-11-20 09:31:08.0

1.环境准备

  • elasticsearch-7.7.0
  • kibana-7.7.0
  • logstash-7.7.0
  • mysql-connector-java-5.1.13.jar

2.准备核心配置文件

2.1 查询的 sql 文件
SELECTa.id AS topicId,a.topic_code AS topicCode,a.`code` AS userCode,a.topic_title AS topicTitle,a.create_time AS createTime,a.update_time AS update_time,b.topic_comments AS topicContent,a.like_count AS topicLikeCount,a.comment_count AS topicCommentCount,a.browse_count AS topicBrowseCount 
FROMth_topic aLEFT JOIN ( SELECT * FROM th_topic_resources WHERE resource_type = 1 AND deleted = 1 ) b ON a.topic_code = b.topic_code 
WHEREa.deleted = 1 AND update_time >= : sql_last_value

说明:如果是以增量方式同步数据的话,则需要在 sql 最后加上 update_time >= : sql_last_value

2.2 logstash.conf 配置文件

logstash.conf截图在这里插入图片描述

input {stdin {}jdbc {# 索引类型type => "tihu_topic"# 设置 MySql/MariaDB 数据库url以及数据库名称jdbc_connection_string => "jdbc:mysql://localhost:3306/qp_th_baskstage_test?useAffectedRows=true&useUnicode=true&characterEncoding=utf8&allowMultiQueries=true&serverTimezone=UTC&useSSL=false&verifyServerCertificate=false&autoReconnct=true&autoReconnectForPools=true&allowPublicKeyRetrieval=true&zeroDateTimeBehavior=convertToNull"# 用户名和密码jdbc_user => "root"jdbc_password => "root"# 数据库驱动所在位置,可以是绝对路径或者相对路径jdbc_driver_library => "/usr/local/elasticsearch/logstash-7.7.0/sync/mysql-connector-java-5.1.41.jar"# 驱动类名jdbc_driver_class => "com.mysql.jdbc.Driver"jdbc_default_timezone => "Asia/Shanghai"# 开启分页jdbc_paging_enabled => "true"# 分页每页数量,可以自定义jdbc_page_size => "1000"# 执行的sql文件路径statement_filepath => "/usr/local/elasticsearch/logstash-7.7.0/sync/s-t.sql"# 设置定时任务间隔  含义:分、时、天、月、年,全部为*默认含义为每分钟跑一次任务schedule => "* * * * *"#record_last_run :记录上次执行时间record_last_run => true# 是否开启记录上次追踪的结果,也就是上次更新的时间,这个会记录到 last_run_metadata_path 的文件last_run_metadata_path => "/usr/local/elasticsearch/logstash-7.7.0/sync/log_last_time.txt"use_column_value => true# 如果 use_column_value 为true, 配置本参数,追踪的 column 名,可以是自增id或者时间##指定追踪的字段,在此我设置的追踪的字段为updateTime(这个是es的对应字段不是sql)tracking_column => "updateTime"# tracking_column 对应字段的类型tracking_column_type => "timestamp"# 是否清除 last_run_metadata_path 的记录,true则每次都从头开始查询所有的数据库记录clean_run => false# 数据库字段名称大写转小写lowercase_column_names => false}
}output {#如果上面的input中有多个同步的索引,则这里可以通过使用if[type]=="XXX"{}来指定使用哪个index;#如果只有一个索引,则下面的 if 判断可以省略if [type]=="tihu_topic" {elasticsearch {# es地址hosts => ["localhost:9200"]# 同步的索引名index => "th-topic"# 设置_docID和数据相同document_id => "%{topicId}"# document_id => "%{itemId}"}}# 日志输出stdout {codec => json_lines}
}

配置参数说明:

  • input 参数
statement_filepath:读取SQL语句位置
schedule :这里配置每分钟执行一次
type :类型,写入ES的标识
lowercase_column_names :字段是否转小写
record_last_run :记录上次执行时间
use_column_value :使用列的值
tracking_column :根据写入ES的updateTime字段区分增量数据
tracking_column_type :区分的字段类型
  • output 参数
hosts :ES服务地址
index :Index名称,类比理解数据库名称
document_type :Type名称,类比理解表名称
准备好上述2个文件后,我们在 logstash 下新建一个文件夹 sync ,把这两个文件连同准备好的 mysql 的 jar 包一起上传到 sync 中。

在这里插入图片描述

3.启动

  1. 在启动之前我们需要开启 elasticsearch;进入我们安装的 es 目录,进入 bin 目录,切换为 es 用户并 输入命令*./elasticsearch -d*
  2. 开启 es 之后,进入 logstash 目录下的 bin 目录,并执行以下命令
[root@iZ2zeepnt3ok05tr3d5qc4Z bin]#./logstash -f ../sync/logstash.conf

等待启动成功后就可以看到有日志成功输出啦
在这里插入图片描述

注意:通过logstash 同步mysql 数据到 ES 中时,必须在 ES 中提前创建好相应索引才能同步成功!!!

  相关解决方案