当前位置: 代码迷 >> 综合 >> Spark学习笔记(五)——Flume日志收集、Flume安装配置、工作流程、avro source、taildir source、HDFS sink、拦截器(Interceptors)
  详细解决方案

Spark学习笔记(五)——Flume日志收集、Flume安装配置、工作流程、avro source、taildir source、HDFS sink、拦截器(Interceptors)

热度:76   发布时间:2024-02-12 23:48:12.0

Flume

  • Flume日志收集
    • 一、Apache Flume简介
    • 二、Flume架构
    • 三、Flume安装配置
      • 1、安装
      • 2、首次测试
    • 四、Flume学习
      • 1、Flume工作流程
      • 2、Flume构成
    • 五、Source
      • 1、exec source
      • 2、spooling directory source
      • 3、http source
      • 4、avro source
      • 5、taildir source
    • 六、channel
    • 七、Sink
      • 1、avro sink
      • 2、HDFS sink
      • 3、hive sink
    • 八、Flume Sink组
    • 九、拦截器(Interceptors)

Flume日志收集

Flume配置参照官方文档:http://flume.apache.org/

中文版:https://flume.liyifeng.org/

一、Apache Flume简介

  • Flume用于将多种来源的日志以流的方式传输至Hadoop或者其它目的地
    • 一种可靠、可用的高效分布式数据收集服务
  • Flume拥有基于数据流上的简单灵活架构,支持容错、故障转移与恢复

二、Flume架构

  • Client:客户端,数据产生的地方,如Web服务器
  • Event:事件,指通过Agent传输的单个数据包,如日志数据通常对应一行数据
  • Agent:代理,一个独立的JVM进程
    • Flume以一个或多个Agent部署运行
    • Agent包含三个组件
      • Source
      • Channel
      • Sink

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ovpFYHpd-1597923171204)(D:%5Csoftware%5Ctypora%5Cimg%5Cimage-20200804161826509.png)]

三、Flume安装配置

1、安装

(1)首先安装netcat:yum install nmap-ncat.x86_64 -y

(2)安装Flume

解压移动

[root@zjw opt]# tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz [root@zjw opt]# mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160

配置:

[root@zjw conf]# cp flume-env.sh.template flume-env.sh [root@zjw conf]# vi flume-env.sh

//添加配置
export JAVA_HOME=/opt/soft/jdk180

配置环境变量:

# FLUME_HOME export FLUME_HOME=/opt/soft/flume export PATH=$PATH:$FLUME_HOME/bin

验证安装:

[root@zjw conf]# flume-ng version
Flume 1.6.0-cdh5.14.2
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 50436774fa1c7eaf0bd9c89ac6ee845695fbb687
Compiled by jenkins on Tue Mar 27 13:55:10 PDT 2018
From source with checksum 30217fe2b34097676ff5eabb51f4a11d

2、首次测试

官网指南:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html

创建测试配置文件夹

[root@zjw conf]# mkdir /opt/soft/flume160/flumeconf

编写配置:

[root@zjw flumeconf]# vi conf_0808_simple.properties

# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

进入另一个界面运行:

启动监控:

[root@zjw ~]# flume-ng agent -n a1 -c conf -f /opt/soft/flume160/flumeconf/conf_0808_simple.properties -Dflume.root.logger=INFO,console

启动netcat:

[root@zjw flumeconf]# nc -l 6666

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eny50plL-1597923171207)(../../../software/typora/img/image-20200804173510913.png)]

当在netcat中输出时会在flume中监控到输入流

四、Flume学习

1、Flume工作流程

  • Source->Channel
    • 主动模式
    • 数据由Source发送
  • Channel->Sink
    • 数据由Sink拉取

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kLbVxBP5-1597923171208)(../../../software/typora/img/image-20200817093415327.png)]

flume的主要组件包括:

Source,SourceRunner,Interceptor,Channel,ChannelSelector,ChannelProcessor,Sink,SinkRunner,SinkProcessor,SinkSelector等

工作流程包含两个部分:

source->channel,数据由source写入channel,主动模式,主要步骤如下:
一个SourceRunner包含一个Source对象,一个Source对象包含一个ChannelProcessor对象,一个ChannelProcessor对象包含多个Interceptor对象和一个ChannelSelector对象
1)SourceRunner启动Source,Source接收Event
2) Source调用ChannelProcessor
3)ChannelProcessor调用Interceptor进行过滤Event操作
4)ChannelProcessor调用ChannelSelector对象根据配置的策略选择Event对应的Channel(replication和multiplexing两种)
5)Source将Event发送到对应的Channel中
channel->sink,数据由sink主动从channel中拉取(将压力分摊到sink,这一点类似于kafka的consumer)
一个SinkRunner对象包含一个SinkProcessor对象,一个SinkProcessor包含多个Sink或者一个SinkSelector
1)SinkRunner启动SinkProcessor(DefaultSinkProcessor,FailoverSinkProcessor,LoadBalancingSinkProcessor 3种)
2)如果是DefaultSinkProcessor的话,直接启动单个Sink
3)FailoverSinkProcessor,LoadBalancingSinkProcessor对应的是SinkGroup
4)FailoverSinkProcessor从SinkGroup中选择出Sink并启动
5)LoadBalancingSinkProcessor包含SinkSelector,会根据SinkSelector在SinkGroup中选择Sink并启动
6)Sink 从Channel中消费Event信息

2、Flume构成

  • Source
  • SourceRunner
  • Interceptor
  • Channel
  • ChannelSelector
  • ChannelProcessor
  • Sink
  • SinkRunner
  • SinkProcessor
  • SinkSelector

五、Source

1、exec source

  • 执行Linux指令,并消费指令返回的结果,如“tail -f”
属性 默认值 描述
channels -
type - 组件类型名称,需要被执行
command - 要执行的命令
shell - 用于运行命令的shell程序调用。例如/ bin / sh -c。仅对于依赖于shell功能(如通配符,反斜杠,管道等)的命令才需要。
restartThrottle 10000 尝试重新启动之前等待的时间(以毫秒为单位)
restart false 如果执行的cmd停止运行,是否应该重新启动
logStdErr false 是否应记录命令的stderr
batchSize 20 一次读取并发送到通道的最大行数
batchTimeout 3000 如果未达到缓冲区大小,则等待时间(以毫秒为单位),然后将数据推送到下游
selector.type replicating 复制或多路复用
selector.* 取决于selector.type值
interceptors - 以空格分隔的拦截器列表
interceptors.*

测试:

a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为exec 
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /opt/data/exectest.txt#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
flume-ng agent -f conf_hello_flume.properties -n a1 -Dflume.root.logger=INFO,console

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nsMKXnbw-1597923171209)(../../../software/typora/img/image-20200817102235760.png)]

2、spooling directory source

  • 从磁盘文件夹中获取文件数据,可避免重启或者发送失败后数据丢失,还可用于监控文件夹新文件
属性 缺省值 描述
type - spooldir
spoolDir - 需读取的文件夹
fileSuffix .COMPLETED 文件读取完成后添加的后缀
deletePolicy never 文件完成后删除策略:never和immediate
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为spooldir 
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data/spooldirTest#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f spoolDir.properties -n a1 -Dflume.root.logger=INFO,console

将文件拖至文件夹:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FZwAWnny-1597923171211)(../../../software/typora/img/image-20200817101945179.png)]

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YtIIz3AI-1597923171212)(../../../software/typora/img/image-20200817101954021.png)]

3、http source

  • 用于接收HTTP的Get和Post请求
属性 缺省值 描述
type - http
port - 监听端口
bind 0.0.0.0 绑定IP
handler org.apache.flume.source.http.JSONHandler 数据处理程序类全名
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为http
a1.sources.s1.type = http
a1.sources.s1.port = 5140#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f http.properties -n a1 -Dflume.root.logger=INFO,console

然后在另一个界面输出:

[root@zjw ~]# curl -XPOST localhost:5140 -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello flume"}]'

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-t9dVXPMB-1597923171213)(../../../software/typora/img/image-20200817104233441.png)]

4、avro source

  • 监听Avro端口,并从外部Avro客户端接收events
属性 默认值 描述
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: avro
bind - 监听的服务器名hostname或者ip
port - 监听的端口
threads - 生成的最大工作线程数量
selector.type 可选值:replicatingmultiplexing ,分别表示: 复制、多路复用
selector.* channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同
interceptors - 该source所使用的拦截器,多个用空格分开
interceptors.* 拦截器的相关属性
compression-type none 可选值: nonedeflate 。这个类型必须跟Avro Source相匹配
ssl false 设置为 true 可启用SSL加密,如果为true必须指定下面的 keystorekeystore-password
keystore - SSL加密使用的Java keystore文件路径
keystore-password - Java keystore的密码
keystore-type JKS Java keystore的类型. 可选值有 JKSPKCS12
exclude-protocols SSLv3 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除
ipFilter false 设置为true可启用ip过滤(netty方式的avro)
ipFilterRules - netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子)

5、taildir source

  • 可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。

用于服务宕机了,重启服务之后会在上次记录的节点继续读。

属性名 默认值 描述
channels - 与Source绑定的channel,多个用空格分开
type - 组件类型,这个是: TAILDIR.
filegroups - 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔
filegroups. - 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名
positionFile ~/.flume/taildir_position.json 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。
headers.. - 给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对。
byteOffsetHeader false 是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset
skipToEnd false 如果在 positionFile 里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取
idleTimeout 120000 关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开
writePosInterval 3000 positionFile 记录文件的读取位置的间隔时间(毫秒)
batchSize 100 一次读取数据行和写入channel的最大数量,通常使用默认值就很好
backoffSleepIncrement 1000 在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒)
maxBackoffSleep 5000 每次重新尝试轮询新数据时的最大时间延迟(毫秒)
cachePatternMatching true 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。 缓存匹配文件列表可以提高性能。 消耗文件的顺序也将被缓存。 要求文件系统支持以至少秒级跟踪修改时间。
fileHeader false 是否在header里面存储文件的绝对路径
fileHeaderKey file 文件的绝对路径存储到header里面使用的key
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为taildir
a1.sources.s1.type =TAILDIR
a1.sources.s1.filegroups = f1 f2
# 配置filegroups的f1
a1.sources.s1.filegroups.f1 = /opt/data/tail_1/example.log
# 配置filegroups的f2
a1.sources.s1.filegroups.f2 = /opt/data/tail_2/.*log.*#指定position的位置
a1.sources.s1.positonFile = /opt/data/tail_position/taildir_position.json# 指定headers
a1.sources.s1.headers.f1.headerKey1 = value1
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.headers.f3.headerKey2 = value3a1.sources.s1.fileHeader = true#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f taildir.properties -n a1 -Dflume.root.logger=INFO,console

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-01RBoqCn-1597923171214)(../../../software/typora/img/image-20200817111853168.png)]

六、channel

  • Memory Channel
    • event保存在Java Heap中。如果允许数据小量丢失,推荐使用
  • File Channel
    • event保存在本地文件中,可靠性高,但吞吐量低于Memory Channel
  • Kafka Channel
  • JDBC Channel
    • event保存在关系数据中,一般不推荐使用

七、Sink

  • Sink负责从Channel收集数据
  • 常用Sink
    • avro sink
    • HDFS sink
    • Hive sink
    • Kafka sink

1、avro sink

  • 作为avro客户端向avro服务端发送avro事件
属性 默认值 描述
channel - 与 Sink 绑定的 channel
type - 组件类型,这个是: avro.
hostname - 监听的服务器名(hostname)或者 IP
port - 监听的端口
batch-size 100 每次批量发送的 Event 数
connect-timeout 20000 第一次连接请求(握手)的超时时间,单位:毫秒
request-timeout 20000 请求超时时间,单位:毫秒
reset-connection-interval none 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。
compression-type none 压缩类型。可选值: nonedeflate 。压缩类型必须与上一级Avro Source 配置的一致
compression-level 6 Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高
ssl false 设置为 true 表示Sink开启 SSL 下面的 truststoretruststore-passwordtruststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs
trust-all-certs false 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。
truststore - 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。
truststore-password - 上面配置的信任库的密码
truststore-type JKS Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型
exclude-protocols SSLv3 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。
maxIoWorkers 2 * 机器上可用的处理器核心数量 I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。

avro_sink:

a1.sources = s1
a1.channels = c1
a1.sinks = sk1# Describe/configure the source
#设置source类型为exec
a1.sources.s1.type = exec
#设置命令,查看文件内容
a1.sources.s1.command = tail -f /root/data/customers.csv#source和channel连接
a1.sources.s1.channels = c1# 指定sink
a1.sinks.sk1.type = avro
a1.sinks.sk1.hostname = localhost
a1.sinks.sk1.port = 44444
#sink和channel 连接
a1.sinks.sk1.channel = c1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

avro_sources:

a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为avro
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 44444#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

2、HDFS sink

  • HDFS sink
    • 将事件写入Hadoop分布式文件系统(HDFS)

配置项:

属性名 默认值 描述
channel - 与 Sink 连接的 channel
type - 组件类型,这个是: hdfs
hdfs.path - HDFS目录路径(例如:hdfs://namenode/flume/webdata/)
hdfs.filePrefix FlumeData Flume在HDFS文件夹下创建新文件的固定前缀
hdfs.inUseSuffix .tmp Flume正在写入的临时文件后缀
hdfs.rollInterval 30 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒
hdfs.rollSize 1024 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节
hdfs.rollCount 10 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件)
hdfs.idleTimeout 0 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒
hdfs.batchSize 100 向 HDFS 写入内容时每次批量操作的 Event 数量
hdfs.fileType SequenceFile 文件格式,目前支持: SequenceFileDataStreamCompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数
hdfs.maxOpenFiles 5000 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭
hdfs.minBlockReplicas - 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置
hdfs.writeFormat Writable 文件写入格式。可选值: TextWritable 。在使用 Flume 创建数据文件之前设置为 Text,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件
hdfs.threadsPoolSize 10 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等)
hdfs.round false 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符
a2.channels=c2
a2.sources=s2
a2.sinks=k2a2.sources.s2.type=spooldir
a2.sources.s2.spoolDir=/opt/data/datas
a2.sources.s2.fileHeader=truea2.channels.c2.type=memory
a2.channels.c2.capacity=20000
a2.channels.c2.transactionCapacity=1000a2.sinks.k2.type=hdfs
a2.sinks.k2.hdfs.path=hdfs://192.168.253.150:9000/tmp/customs
a2.sinks.k2.hdfs.rollCount=5000
a2.sinks.k2.hdfs.rollSize=600000
a2.sinks.k2.hdfs.batchSize=500a2.sinks.k2.channel=c2
a2.sources.s2.channels=c2

3、hive sink

  • Hive sink
    • 包含分隔文本或JSON数据流事件直接进入Hive表或分区
    • 传入的事件数据字段映射到Hive表中相应的列
channel - 与 Sink 连接的 channel
type - 组件类型,这个是: hive
hive.metastore - Hive metastore URI (eg thrift://a.b.com:9083 )
hive.database - Hive 数据库名
hive.table - Hive表名
hive.partition - 逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country :string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。
hive.txnsPerBatchAsk 100 Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。
heartBeatInterval 240 发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。
autoCreatePartitions true Flume 会自动创建必要的 Hive分区以进行流式传输
batchSize 15000 写入一个 Hive事务中最大的 Event 数量
callTimeout 10000 Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。
round false 是否启用时间戳舍入机制
roundUnit minute 舍入值的单位,可选值:secondminutehour
roundValue 1 舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30;
timeZone Local Time 应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等
useLocalTimeStamp false 替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp )

八、Flume Sink组

  • sink组是用来创建逻辑上的一组sink
  • sink组的行为是由sink处理器(processor)决定的,它决定了event的路由策略
  • processor包括故障转移和负载均衡两类
//故障转移
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
//负载均衡
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

九、拦截器(Interceptors)

  • 拦截器可修改或丢弃事件
    • 设置在source和channel之间
  • 内置拦截器
    • HostInterceptor:在event header中插入“hostname”
    • TimestampInterceptor:插入时间戳
    • StaticInceptor:插入key-value
    • UUIDInceptor:插入UUID
  • 自定义拦截器

正则:

a3.channels=c3
a3.sources=s3
a3.sinks=k3a3.sources.s3.type=spooldir
a3.sources.s3.spoolDir=/opt/data/datas
a3.sources.s3.interceptors=userid_filter
a3.sources.s3.interceptors.userid_filter.type=regex_filter
a3.sources.s3.interceptors.userid_filter.regex=userid.*
a3.sources.s3.interceptors.userid_filter.encludeEvents=truea3.channels.s3.type=memorya3.sinks.k3.type=loggera3.sources.s3.channels=c3
a3.sinks.k3.channel=c3

代码实现拦截器:

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class UserInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String str = new String(body, Charset.forName("UTF-8"));String[] value = str.split(",");switch (value[2]){case "男" :value[2]="1";break;case "女" :value[2]="2";break;default:value[2]="0";}String newStr = value[0]+","+value[1]+","+value[2]+","+value[3];event.setBody(newStr.getBytes());return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new UserInterceptor();}@Overridepublic void configure(Context context) {}}
}

配置:

a4.channels=c4
a4.sources=s4
a4.sinks=k4a4.sources.s4.type=spooldir
a4.sources.s4.spoolDir=/opt/data/datas
a4.sources.s4.interceptors=myInterceptor
a4.sources.s4.interceptors.myInterceptor.type=com.zjw.UserInterceptor$Buildera4.channels.c4.type=memorya4.sinks.k4.type=loggera4.sinks.k4.channel=c4
a4.sources.s4.channels=c4
  相关解决方案