当前位置: 代码迷 >> 综合 >> k8s 上运行我们的 springboot 服务之——flume 同步数据到到 clickHouse
  详细解决方案

k8s 上运行我们的 springboot 服务之——flume 同步数据到到 clickHouse

热度:21   发布时间:2024-01-30 13:23:00.0

k8s 上运行我们的 springboot 服务之——flume同步数据到到clickHouse

clickHouse的简单介绍,详细介绍请查看官网或者百度

1)clickhouse非hadoop体系

2)使用sql语句,对于熟悉关系数据的人员入门相对简单

3)clickhouse最好用来读,不要用来变更,写用批量的方式

4)各种日志数据我们可以用flume同步到clickhouse来统一管理和做用户行为分析

5)mysql 增量同步到clickhouse,这里有一个思考:系统日志,交易日志,用户行为日志,已生成订单等不变的数据似乎可以同步到clickhouse来做报表、统计、数据分析等。

由于用户经常查询和操作一般都是最近的或者最新的数据,可以把这部分变更的有事务要求的数据放到mysql中。把mysql数据同步到clickhouse,近期最新和变更的数据在mysql中操作,其他大部分数据在clickhouse中操作,这样来减轻关系型数据库的性能瓶颈。

6)面对错综复杂的数据源我们似乎可以使用flink来把数据统一归集到clickhouse

以上都需要根据实际情况去测试使用。毕竟实践是检验真理的唯一标准

flume的简单介绍,详细介绍请查看官网或者百度

1)高可用的,高可靠的,分布式的海量日志采集、聚合和传输的系统

2)支持监听多种方式多种类型的文件或者文件目录数据的变更,以获得变更的数据并把这部分数据推送到不同的数据接收中间件

3)flume提供了多种插件来完成2中的需求,常用的例如:监听TCP的端口做为数据源,监听目录下日志文件的变更等

4)flume可以把变更的数据同步队列中,然后队列把数据分发到我们各种数据仓库中间件中,也可以不通过队列直接把数据同步存储到数据仓库中间件中

5)我们也可以自定义flume的ng来满足我们自己特殊的数据变更同步需求

本文主要讲解内容如下:

1、自定义实现flume ng

2、测试我们系统生成的日志通过1中实现的flume ng同步到 clickhouse

下面是核心代码:

1)在clickhouse中创建表:

<pre lang="text" data-origin="pm_code_preview" style="margin: 0px; padding: 0px; list-style: none; font-family: Courier, &quot;Courier New&quot;, monospace; display: block;">CREATE TABLE default.sys_log (`id` String,`sys_name` String,`level` String,`msg` String,`thread` String,`create_date` DateTime,`exe_date` DateTime) ENGINE = MergeTree()PARTITION BY toYYYYMM(create_date)ORDER BY exe_date;

2)自定义flume sink pom.xml

<pre lang="text" data-origin="pm_code_preview" style="margin: 0px; padding: 0px; list-style: none; font-family: Courier, &quot;Courier New&quot;, monospace; display: block;"><dependencies><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-core --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId></dependency><dependency><groupId>com.google.guava</groupId><artifactId>guava</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-configuration --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-configuration</artifactId></dependency><!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk --><dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-sdk</artifactId></dependency><dependency><groupId>com.opencsv</groupId><artifactId>opencsv</artifactId><version>4.2</version></dependency><dependency><groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>
  1. flume 自定义sink 核心类
<pre lang="text" data-origin="pm_code_preview" style="margin: 0px; padding: 0px; list-style: none; font-family: Courier, &quot;Courier New&quot;, monospace; display: block;">private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSink.class);private BalancedClickhouseDataSource dataSource = null;private SinkCounter sinkCounter = null;private String host = null;private String port = null;private String user = null;private String password = null;private String database = null;private String table = null;private int batchSize;@Overridepublic Status process() throws EventDeliveryException {Status status = null;Channel ch = getChannel();Transaction txn = ch.getTransaction();txn.begin();List<LogLogbackVo> insertData = new ArrayList<>();try {ClickHouseConnectionImpl conn = (ClickHouseConnectionImpl) dataSource.getConnection();int count;for (count = 0; count < batchSize; ++count) {Event event = ch.take();if (event == null) {break;}insertData.add(StringUtil.buildLog(new String(event.getBody())));}if (count <= 0) {sinkCounter.incrementBatchEmptyCount();txn.commit();return Status.BACKOFF;} else if (count < batchSize) {sinkCounter.incrementBatchUnderflowCount();} else {sinkCounter.incrementBatchCompleteCount();}sinkCounter.addToEventDrainAttemptCount(count);ClickHouseStatement sth = conn.createStatement();sth.write().table(String.format(" %s.%s", database, table)).data(new ByteArrayInputStream(JsonUtil.t2JsonString(insertData).getBytes()), ClickHouseFormat.JSONEachRow).addDbParam(ClickHouseQueryParam.MAX_PARALLEL_REPLICAS, MAX_PARALLEL_REPLICAS_VALUE).send();sinkCounter.incrementEventDrainSuccessCount();status = Status.READY;txn.commit();} catch (Throwable t) {txn.rollback();LOGGER.error(t.getMessage(), t);status = Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error) t;}} finally {txn.close();}return status;}@Overridepublic void configure(Context context) {if (sinkCounter == null) {sinkCounter = new SinkCounter(getName());}Preconditions.checkArgument(context.getString(HOST) != null && context.getString(HOST).length() > 0, "ClickHouse host must be specified!");this.host = context.getString(HOST);if (!this.host.startsWith(CLICK_HOUSE_PREFIX)) {this.host = CLICK_HOUSE_PREFIX + this.host;}Preconditions.checkArgument(context.getString(DATABASE) != null && context.getString(DATABASE).length() > 0, "ClickHouse database must be specified!");this.database = context.getString(DATABASE);Preconditions.checkArgument(context.getString(TABLE) != null && context.getString(TABLE).length() > 0, "ClickHouse table must be specified!");this.table = context.getString(TABLE);this.port = context.getString(PORT, DEFAULT_PORT);this.user = context.getString(USER, DEFAULT_USER);this.password = context.getString(PASSWORD, DEFAULT_PASSWORD);this.batchSize = context.getInteger(BATCH_SIZE, DEFAULT_BATCH_SIZE);}@Overridepublic void start() {LOGGER.info("clickHouse sink {} starting", getName());String jdbcUrl = String.format("%s:%s/%s", this.host, this.port, this.database);ClickHouseProperties properties = new ClickHouseProperties().withCredentials(this.user, this.password);this.dataSource = new BalancedClickhouseDataSource(jdbcUrl, properties);sinkCounter.start();super.start();LOGGER.info("clickHouse sink {} started", getName());}@Overridepublic void stop() {LOGGER.info("clickHouse sink {} stopping", getName());sinkCounter.incrementConnectionClosedCount();sinkCounter.stop();super.stop();LOGGER.info("clickHouse sink {} stopped", getName());}

4)flume 对应配置:

<pre lang="text" data-origin="pm_code_preview" style="margin: 0px; padding: 0px; list-style: none; font-family: Courier, &quot;Courier New&quot;, monospace; display: block;"># 指定Agent的组件名称 a1.sources = r1 a1.sinks = sink1 a1.channels = c1 a1.sources.r1.type=spooldira1.sources.r1.spoolDir=/home/spark/flume/data/loga1.sources.r1.channels=c1a1.sources.r1.fileHeader = falsea1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = timestamp# 指定Flume sink a1.sinks.sink1.type = com.zhy.frame.newsql.clickhouse.sink.sink.ClickHouseSinka1.sinks.sink1.host = localhosta1.sinks.sink1.port = 8123a1.sinks.sink1.database = defaulta1.sinks.sink1.table = sys_loga1.sinks.sink1.batchSize = 10000a1.sinks.sink1.user = defaulta1.sinks.sink1.password = # 指定Flume channel a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 # 绑定source和sink到channel上 a1.sources.r1.channels = c1 a1.sinks.sink1.channel = c1

5)系统logback配置

<pre lang="text" data-origin="pm_code_preview" style="margin: 0px; padding: 0px; list-style: none; font-family: Courier, &quot;Courier New&quot;, monospace; display: block;"><?xml version="1.0" encoding="UTF-8"?><!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 --><!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true --><!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 --><!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 --><configuration scan="true" scanPeriod="7 seconds"><contextName>logback</contextName><!--系统名称--><property name="sysName" value="frameSimple"/><!--日志大小 KB MB--><property name="logMaxFileSize" value="1KB"/><!--日志保留天数--><property name="logMaxHistory" value="7"/><property name="logging.path" value="sysLog"/><!-- 彩色日志 --><!-- 彩色日志依赖的渲染类 --><conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter"/><conversionRule conversionWord="wex"converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter"/><conversionRule conversionWord="wEx"converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter"/><!-- 彩色日志格式 --><property name="CONSOLE_LOG_PATTERN"value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/><!--输出到控制台--><appender name="CONSOLE_APPENDER" class="ch.qos.logback.core.ConsoleAppender"><!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息--><filter class="ch.qos.logback.classic.filter.ThresholdFilter"><level>info</level></filter><encoder><Pattern>${CONSOLE_LOG_PATTERN}</Pattern><!-- 设置字符集 --><charset>UTF-8</charset></encoder></appender><!--输出到文件--><!-- 时间滚动输出 level为 DEBUG 日志 --><appender name="DEBUG_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文件的路径及文件名 --><file>${logging.path}/debug_${sysName}.log</file><!--日志文件输出格式--><encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"><jsonGeneratorDecoratorclass="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/><providers><pattern><pattern>{"sysName":"${sysName}","thread":"%thread","exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg": "%msg"}</pattern></pattern></providers></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 日志归档 --><fileNamePattern>${logging.path}/debug/debug_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>${logMaxFileSize}</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文件保留天数--><maxHistory>${logMaxHistory}</maxHistory></rollingPolicy><!-- 此日志文件只记录debug级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>DEBUG</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 时间滚动输出 level为 INFO 日志 --><appender name="INFO_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文件的路径及文件名 --><file>${logging.path}/info_${sysName}.log</file><encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"><jsonGeneratorDecoratorclass="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/><providers><pattern><pattern>{"sysName":"${sysName}","thread":"%thread","exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg": "%msg"}</pattern></pattern></providers></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><!-- 每天日志归档路径以及格式 --><fileNamePattern>${logging.path}/info/info_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>${logMaxFileSize}</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文件保留天数--><maxHistory>${logMaxHistory}</maxHistory></rollingPolicy><!-- 此日志文件只记录info级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>INFO</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 时间滚动输出 level为 WARN 日志 --><appender name="WARN_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文件的路径及文件名 --><file>${logging.path}/warn_${sysName}.log</file><!--日志文件输出格式--><encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"><jsonGeneratorDecoratorclass="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/><providers><pattern><pattern>{"sysName":"${sysName}","thread":"%thread","exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg": "%msg"}</pattern></pattern></providers></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${logging.path}/warn/warn_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>${logMaxFileSize}</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文件保留天数--><maxHistory>${logMaxHistory}</maxHistory></rollingPolicy><!-- 此日志文件只记录warn级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>WARN</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><!-- 时间滚动输出 level为 ERROR 日志 --><appender name="ERROR_APPENDER" class="ch.qos.logback.core.rolling.RollingFileAppender"><!-- 正在记录的日志文件的路径及文件名 --><file>${logging.path}/error_${sysName}.log</file><!--日志文件输出格式--><encoder class="net.logstash.logback.encoder.LoggingEventCompositeJsonEncoder"><jsonGeneratorDecoratorclass="net.logstash.logback.decorate.FeatureJsonGeneratorDecorator"/><providers><pattern><pattern>{"sysName":"${sysName}","thread":"%thread","exeDate": "%date{yyyy-MM-dd HH:mm:ss.SSS}","level":"%level","msg": "%msg"}</pattern></pattern></providers></encoder><!-- 日志记录器的滚动策略,按日期,按大小记录 --><rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy"><fileNamePattern>${logging.path}/error/error_${sysName}_%d{yyyy-MM-dd}.%i.log</fileNamePattern><timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP"><maxFileSize>${logMaxFileSize}</maxFileSize></timeBasedFileNamingAndTriggeringPolicy><!--日志文件保留天数--><maxHistory>${logMaxHistory}</maxHistory></rollingPolicy><!-- 此日志文件只记录ERROR级别的 --><filter class="ch.qos.logback.classic.filter.LevelFilter"><level>ERROR</level><onMatch>ACCEPT</onMatch><onMismatch>DENY</onMismatch></filter></appender><root level="INFO"><appender-ref ref="CONSOLE_APPENDER"/><appender-ref ref="DEBUG_APPENDER"/><appender-ref ref="INFO_APPENDER"/><appender-ref ref="WARN_APPENDER"/><appender-ref ref="ERROR_APPENDER"/></root></configuration>