Flume
- Flume日志收集
- 一、Apache Flume简介
- 二、Flume架构
- 三、Flume安装配置
- 1、安装
- 2、首次测试
- 四、Flume学习
- 1、Flume工作流程
- 2、Flume构成
- 五、Source
- 1、exec source
- 2、spooling directory source
- 3、http source
- 4、avro source
- 5、taildir source
- 六、channel
- 七、Sink
- 1、avro sink
- 2、HDFS sink
- 3、hive sink
- 八、Flume Sink组
- 九、拦截器(Interceptors)
Flume日志收集
Flume配置参照官方文档:http://flume.apache.org/
中文版:https://flume.liyifeng.org/
一、Apache Flume简介
- Flume用于将多种来源的日志以流的方式传输至Hadoop或者其它目的地
- 一种可靠、可用的高效分布式数据收集服务
- Flume拥有基于数据流上的简单灵活架构,支持容错、故障转移与恢复
二、Flume架构
- Client:客户端,数据产生的地方,如Web服务器
- Event:事件,指通过Agent传输的单个数据包,如日志数据通常对应一行数据
- Agent:代理,一个独立的JVM进程
- Flume以一个或多个Agent部署运行
- Agent包含三个组件
- Source
- Channel
- Sink
三、Flume安装配置
1、安装
(1)首先安装netcat
:yum install nmap-ncat.x86_64 -y
(2)安装Flume
解压移动
[root@zjw opt]# tar -zxf flume-ng-1.6.0-cdh5.14.2.tar.gz [root@zjw opt]# mv apache-flume-1.6.0-cdh5.14.2-bin/ soft/flume160
配置:
[root@zjw conf]# cp flume-env.sh.template flume-env.sh [root@zjw conf]# vi flume-env.sh
//添加配置
export JAVA_HOME=/opt/soft/jdk180
配置环境变量:
# FLUME_HOME export FLUME_HOME=/opt/soft/flume export PATH=$PATH:$FLUME_HOME/bin
验证安装:
[root@zjw conf]# flume-ng version
Flume 1.6.0-cdh5.14.2
Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git
Revision: 50436774fa1c7eaf0bd9c89ac6ee845695fbb687
Compiled by jenkins on Tue Mar 27 13:55:10 PDT 2018
From source with checksum 30217fe2b34097676ff5eabb51f4a11d
2、首次测试
官网指南:http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html
创建测试配置文件夹
[root@zjw conf]# mkdir /opt/soft/flume160/flumeconf
编写配置:
[root@zjw flumeconf]# vi conf_0808_simple.properties
# example.conf: A single-node Flume configuration# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 6666# Describe the sink
a1.sinks.k1.type = logger# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
进入另一个界面运行:
启动监控:
[root@zjw ~]# flume-ng agent -n a1 -c conf -f /opt/soft/flume160/flumeconf/conf_0808_simple.properties -Dflume.root.logger=INFO,console
启动netcat:
[root@zjw flumeconf]# nc -l 6666
当在netcat中输出时会在flume中监控到输入流
四、Flume学习
1、Flume工作流程
- Source->Channel
- 主动模式
- 数据由Source发送
- Channel->Sink
- 数据由Sink拉取
flume的主要组件包括:
Source,SourceRunner,Interceptor,Channel,ChannelSelector,ChannelProcessor,Sink,SinkRunner,SinkProcessor,SinkSelector等
工作流程包含两个部分:
source->channel,数据由source写入channel,主动模式,主要步骤如下:
一个SourceRunner包含一个Source对象,一个Source对象包含一个ChannelProcessor对象,一个ChannelProcessor对象包含多个Interceptor对象和一个ChannelSelector对象
1)SourceRunner启动Source,Source接收Event
2) Source调用ChannelProcessor
3)ChannelProcessor调用Interceptor进行过滤Event操作
4)ChannelProcessor调用ChannelSelector对象根据配置的策略选择Event对应的Channel(replication和multiplexing两种)
5)Source将Event发送到对应的Channel中
channel->sink,数据由sink主动从channel中拉取(将压力分摊到sink,这一点类似于kafka的consumer)
一个SinkRunner对象包含一个SinkProcessor对象,一个SinkProcessor包含多个Sink或者一个SinkSelector
1)SinkRunner启动SinkProcessor(DefaultSinkProcessor,FailoverSinkProcessor,LoadBalancingSinkProcessor 3种)
2)如果是DefaultSinkProcessor的话,直接启动单个Sink
3)FailoverSinkProcessor,LoadBalancingSinkProcessor对应的是SinkGroup
4)FailoverSinkProcessor从SinkGroup中选择出Sink并启动
5)LoadBalancingSinkProcessor包含SinkSelector,会根据SinkSelector在SinkGroup中选择Sink并启动
6)Sink 从Channel中消费Event信息
2、Flume构成
- Source
- SourceRunner
- Interceptor
- Channel
- ChannelSelector
- ChannelProcessor
- Sink
- SinkRunner
- SinkProcessor
- SinkSelector
五、Source
1、exec source
- 执行Linux指令,并消费指令返回的结果,如“tail -f”
属性 | 默认值 | 描述 |
---|---|---|
channels | - | |
type | - | 组件类型名称,需要被执行 |
command | - | 要执行的命令 |
shell | - | 用于运行命令的shell程序调用。例如/ bin / sh -c。仅对于依赖于shell功能(如通配符,反斜杠,管道等)的命令才需要。 |
restartThrottle | 10000 | 尝试重新启动之前等待的时间(以毫秒为单位) |
restart | false | 如果执行的cmd停止运行,是否应该重新启动 |
logStdErr | false | 是否应记录命令的stderr |
batchSize | 20 | 一次读取并发送到通道的最大行数 |
batchTimeout | 3000 | 如果未达到缓冲区大小,则等待时间(以毫秒为单位),然后将数据推送到下游 |
selector.type | replicating | 复制或多路复用 |
selector.* | 取决于selector.type值 | |
interceptors | - | 以空格分隔的拦截器列表 |
interceptors.* |
测试:
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为exec
a1.sources.s1.type = exec
a1.sources.s1.command = tail -f /opt/data/exectest.txt#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
flume-ng agent -f conf_hello_flume.properties -n a1 -Dflume.root.logger=INFO,console
2、spooling directory source
- 从磁盘文件夹中获取文件数据,可避免重启或者发送失败后数据丢失,还可用于监控文件夹新文件
属性 | 缺省值 | 描述 |
---|---|---|
type | - | spooldir |
spoolDir | - | 需读取的文件夹 |
fileSuffix | .COMPLETED | 文件读取完成后添加的后缀 |
deletePolicy | never | 文件完成后删除策略:never和immediate |
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为spooldir
a1.sources.s1.type = spooldir
a1.sources.s1.spoolDir = /opt/data/spooldirTest#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f spoolDir.properties -n a1 -Dflume.root.logger=INFO,console
将文件拖至文件夹:
3、http source
- 用于接收HTTP的Get和Post请求
属性 | 缺省值 | 描述 |
---|---|---|
type | - | http |
port | - | 监听端口 |
bind | 0.0.0.0 | 绑定IP |
handler | org.apache.flume.source.http.JSONHandler | 数据处理程序类全名 |
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为http
a1.sources.s1.type = http
a1.sources.s1.port = 5140#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f http.properties -n a1 -Dflume.root.logger=INFO,console
然后在另一个界面输出:
[root@zjw ~]# curl -XPOST localhost:5140 -d '[{"headers":{"h1":"v1","h2":"v2"},"body":"hello flume"}]'
4、avro source
- 监听Avro端口,并从外部Avro客户端接收events
属性 | 默认值 | 描述 |
---|---|---|
channels | - | 与Source绑定的channel,多个用空格分开 |
type | - | 组件类型,这个是: avro |
bind | - | 监听的服务器名hostname或者ip |
port | - | 监听的端口 |
threads | - | 生成的最大工作线程数量 |
selector.type | 可选值:replicating 或 multiplexing ,分别表示: 复制、多路复用 |
|
selector.* | channel选择器的相关属性,具体属性根据设定的 selector.type 值不同而不同 | |
interceptors | - | 该source所使用的拦截器,多个用空格分开 |
interceptors.* | 拦截器的相关属性 | |
compression-type | none | 可选值: none 或 deflate 。这个类型必须跟Avro Source相匹配 |
ssl | false | 设置为 true 可启用SSL加密,如果为true必须指定下面的 keystore 和 keystore-password 。 |
keystore | - | SSL加密使用的Java keystore文件路径 |
keystore-password | - | Java keystore的密码 |
keystore-type | JKS | Java keystore的类型. 可选值有 JKS 、 PKCS12 。 |
exclude-protocols | SSLv3 | 指定不支持的协议,多个用空格分开,SSLv3不管是否配置都会被强制排除 |
ipFilter | false | 设置为true可启用ip过滤(netty方式的avro) |
ipFilterRules | - | netty ipFilter的配置(参考下面的ipFilterRules详细介绍和例子) |
5、taildir source
- 可实时监控一批文件,并记录每个文件最新消费位置,agent进程重启后不会有重复消费的问题。
用于服务宕机了,重启服务之后会在上次记录的节点继续读。
属性名 | 默认值 | 描述 |
---|---|---|
channels | - | 与Source绑定的channel,多个用空格分开 |
type | - | 组件类型,这个是: TAILDIR . |
filegroups | - | 被监控的文件夹目录集合,这些文件夹下的文件都会被监控,多个用空格分隔 |
filegroups. | - | 被监控文件夹的绝对路径。正则表达式(注意不会匹配文件系统的目录)只是用来匹配文件名 |
positionFile | ~/.flume/taildir_position.json | 用来设定一个记录每个文件的绝对路径和最近一次读取位置inode的文件,这个文件是JSON格式。 |
headers.. | - | 给某个文件组下的Event添加一个固定的键值对到header中,值就是value。一个文件组可以配置多个键值对。 |
byteOffsetHeader | false | 是否把读取数据行的字节偏移量记录到Event的header里面,这个header的key是byteoffset |
skipToEnd | false | 如果在 positionFile 里面没有记录某个文件的读取位置,是否直接跳到文件末尾开始读取 |
idleTimeout | 120000 | 关闭非活动文件的超时时间(毫秒)。如果被关闭的文件重新写入了新的数据行,会被重新打开 |
writePosInterval | 3000 | 向 positionFile 记录文件的读取位置的间隔时间(毫秒) |
batchSize | 100 | 一次读取数据行和写入channel的最大数量,通常使用默认值就很好 |
backoffSleepIncrement | 1000 | 在最后一次尝试未发现任何新数据时,重新尝试轮询新数据之前的时间延迟增量(毫秒) |
maxBackoffSleep | 5000 | 每次重新尝试轮询新数据时的最大时间延迟(毫秒) |
cachePatternMatching | true | 对于包含数千个文件的目录,列出目录并应用文件名正则表达式模式可能非常耗时。 缓存匹配文件列表可以提高性能。 消耗文件的顺序也将被缓存。 要求文件系统支持以至少秒级跟踪修改时间。 |
fileHeader | false | 是否在header里面存储文件的绝对路径 |
fileHeaderKey | file | 文件的绝对路径存储到header里面使用的key |
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为taildir
a1.sources.s1.type =TAILDIR
a1.sources.s1.filegroups = f1 f2
# 配置filegroups的f1
a1.sources.s1.filegroups.f1 = /opt/data/tail_1/example.log
# 配置filegroups的f2
a1.sources.s1.filegroups.f2 = /opt/data/tail_2/.*log.*#指定position的位置
a1.sources.s1.positonFile = /opt/data/tail_position/taildir_position.json# 指定headers
a1.sources.s1.headers.f1.headerKey1 = value1
a1.sources.s1.headers.f2.headerKey1 = value2
a1.sources.s1.headers.f3.headerKey2 = value3a1.sources.s1.fileHeader = true#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
[root@zjw flumeconf]# flume-ng agent -f taildir.properties -n a1 -Dflume.root.logger=INFO,console
六、channel
- Memory Channel
- event保存在Java Heap中。如果允许数据小量丢失,推荐使用
- File Channel
- event保存在本地文件中,可靠性高,但吞吐量低于Memory Channel
- Kafka Channel
- JDBC Channel
- event保存在关系数据中,一般不推荐使用
七、Sink
- Sink负责从Channel收集数据
- 常用Sink
- avro sink
- HDFS sink
- Hive sink
- Kafka sink
1、avro sink
- 作为avro客户端向avro服务端发送avro事件
属性 | 默认值 | 描述 |
---|---|---|
channel | - | 与 Sink 绑定的 channel |
type | - | 组件类型,这个是: avro . |
hostname | - | 监听的服务器名(hostname)或者 IP |
port | - | 监听的端口 |
batch-size | 100 | 每次批量发送的 Event 数 |
connect-timeout | 20000 | 第一次连接请求(握手)的超时时间,单位:毫秒 |
request-timeout | 20000 | 请求超时时间,单位:毫秒 |
reset-connection-interval | none | 重置连接到下一跳之前的时间量(秒)。 这将强制 Avro Sink 重新连接到下一跳。 这将允许Sink在添加了新的主机时连接到硬件负载均衡器后面的主机,而无需重新启动 Agent。 |
compression-type | none | 压缩类型。可选值: none 、 deflate 。压缩类型必须与上一级Avro Source 配置的一致 |
compression-level | 6 | Event的压缩级别 0:不压缩,1-9:进行压缩,数字越大,压缩率越高 |
ssl | false | 设置为 true 表示Sink开启 SSL 下面的 truststore 、 truststore-password 、 truststore-type 就是开启SSL后使用的参数,并且可以指定是否信任所有证书( trust-all-certs ) |
trust-all-certs | false | 如果设置为true, 不会检查远程服务器(Avro Source)的SSL服务器证书。不要在生产环境开启这个配置,因为它使攻击者更容易执行中间人攻击并在加密的连接上进行“监听”。 |
truststore | - | 自定义 Java 信任库文件的路径。 Flume 使用此文件中的证书颁发机构信息来确定是否应该信任远程 Avro Source 的 SSL 身份验证凭据。 如果未指定,将使用缺省 Java JSSE 证书颁发机构文件(通常为Oracle JRE中的“jssecacerts”或“cacerts”)。 |
truststore-password | - | 上面配置的信任库的密码 |
truststore-type | JKS | Java 信任库的类型。可以配成 JKS 或者其他支持的 Java 信任库类型 |
exclude-protocols | SSLv3 | 要排除的以空格分隔的 SSL/TLS 协议列表。 SSLv3 协议不管是否配置都会被排除掉。 |
maxIoWorkers | 2 * 机器上可用的处理器核心数量 | I/O工作线程的最大数量。这个是在 NettyAvroRpcClient 的 NioClientSocketChannelFactory 上配置的。 |
avro_sink:
a1.sources = s1
a1.channels = c1
a1.sinks = sk1# Describe/configure the source
#设置source类型为exec
a1.sources.s1.type = exec
#设置命令,查看文件内容
a1.sources.s1.command = tail -f /root/data/customers.csv#source和channel连接
a1.sources.s1.channels = c1# 指定sink
a1.sinks.sk1.type = avro
a1.sinks.sk1.hostname = localhost
a1.sinks.sk1.port = 44444
#sink和channel 连接
a1.sinks.sk1.channel = c1# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
avro_sources:
a1.sources = s1
a1.channels = c1
a1.sinks = sk1#设置Source为avro
a1.sources.s1.type = avro
a1.sources.s1.bind = 0.0.0.0
a1.sources.s1.port = 44444#source和channel连接
a1.sources.s1.channels = c1
a1.channels.c1.type = memory#指定Sink
a1.sinks.sk1.type = logger
#sink和channel进行连接
a1.sinks.sk1.channel = c1a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
2、HDFS sink
- HDFS sink
- 将事件写入Hadoop分布式文件系统(HDFS)
配置项:
属性名 | 默认值 | 描述 |
---|---|---|
channel | - | 与 Sink 连接的 channel |
type | - | 组件类型,这个是: hdfs |
hdfs.path | - | HDFS目录路径(例如:hdfs://namenode/flume/webdata/) |
hdfs.filePrefix | FlumeData | Flume在HDFS文件夹下创建新文件的固定前缀 |
hdfs.inUseSuffix | .tmp | Flume正在写入的临时文件后缀 |
hdfs.rollInterval | 30 | 当前文件写入达到该值时间后触发滚动创建新文件(0表示不按照时间来分割文件),单位:秒 |
hdfs.rollSize | 1024 | 当前文件写入达到该大小后触发滚动创建新文件(0表示不根据文件大小来分割文件),单位:字节 |
hdfs.rollCount | 10 | 当前文件写入Event达到该数量后触发滚动创建新文件(0表示不根据 Event 数量来分割文件) |
hdfs.idleTimeout | 0 | 关闭非活动文件的超时时间(0表示禁用自动关闭文件),单位:秒 |
hdfs.batchSize | 100 | 向 HDFS 写入内容时每次批量操作的 Event 数量 |
hdfs.fileType | SequenceFile | 文件格式,目前支持: SequenceFile 、 DataStream 、 CompressedStream 。 1. DataStream 不会压缩文件,不需要设置hdfs.codeC 2. CompressedStream 必须设置hdfs.codeC参数 |
hdfs.maxOpenFiles | 5000 | 允许打开的最大文件数,如果超过这个数量,最先打开的文件会被关闭 |
hdfs.minBlockReplicas | - | 指定每个HDFS块的最小副本数。 如果未指定,则使用 classpath 中 Hadoop 的默认配置 |
hdfs.writeFormat | Writable | 文件写入格式。可选值: Text 、 Writable 。在使用 Flume 创建数据文件之前设置为 Text ,否则 Apache Impala(孵化)或 Apache Hive 无法读取这些文件 |
hdfs.threadsPoolSize | 10 | 每个HDFS Sink实例操作HDFS IO时开启的线程数(open、write 等) |
hdfs.round | false | 是否应将时间戳向下舍入(如果为true,则影响除 %t 之外的所有基于时间的转义符 |
a2.channels=c2
a2.sources=s2
a2.sinks=k2a2.sources.s2.type=spooldir
a2.sources.s2.spoolDir=/opt/data/datas
a2.sources.s2.fileHeader=truea2.channels.c2.type=memory
a2.channels.c2.capacity=20000
a2.channels.c2.transactionCapacity=1000a2.sinks.k2.type=hdfs
a2.sinks.k2.hdfs.path=hdfs://192.168.253.150:9000/tmp/customs
a2.sinks.k2.hdfs.rollCount=5000
a2.sinks.k2.hdfs.rollSize=600000
a2.sinks.k2.hdfs.batchSize=500a2.sinks.k2.channel=c2
a2.sources.s2.channels=c2
3、hive sink
- Hive sink
- 包含分隔文本或JSON数据流事件直接进入Hive表或分区
- 传入的事件数据字段映射到Hive表中相应的列
channel | - | 与 Sink 连接的 channel |
type | - | 组件类型,这个是: hive |
hive.metastore | - | Hive metastore URI (eg thrift://a.b.com:9083 ) |
hive.database | - | Hive 数据库名 |
hive.table | - | Hive表名 |
hive.partition | - | 逗号分隔的要写入的分区信息。 比如hive表的分区是(continent: string, country :string, time : string), 那么“Asia,India,2014-02-26-01-21”就表示数据会写入到continent=Asia,country=India,time=2014-02-26-01-21这个分区。 |
hive.txnsPerBatchAsk | 100 | Hive从Flume等客户端接收数据流会使用多次事务来操作,而不是只开启一个事务。这个参数指定处理每次请求所开启的事务数量。来自同一个批次中所有事务中的数据最终都在一个文件中。 Flume会向每个事务中写入 batchSize 个 Event,这个参数和 batchSize 一起控制着每个文件的大小,请注意,Hive最终会将这些文件压缩成一个更大的文件。 |
heartBeatInterval | 240 | 发送到 Hive 的连续心跳检测间隔(秒),以防止未使用的事务过期。设置为0表示禁用心跳。 |
autoCreatePartitions | true | Flume 会自动创建必要的 Hive分区以进行流式传输 |
batchSize | 15000 | 写入一个 Hive事务中最大的 Event 数量 |
callTimeout | 10000 | Hive、HDFS I/O操作的超时时间(毫秒),比如:开启事务、写数据、提交事务、取消事务。 |
round | false | 是否启用时间戳舍入机制 |
roundUnit | minute | 舍入值的单位,可选值:second 、 minute 、 hour |
roundValue | 1 | 舍入到小于当前时间的最高倍数(使用 roundUnit 配置的单位) 例子1:roundUnit=second,roundValue=10,则14:31:18这个时间戳会被舍入到14:31:10; 例子2:roundUnit=second,roundValue=30,则14:31:18这个时间戳会被舍入到14:31:00,14:31:42这个时间戳会被舍入到14:31:30; |
timeZone | Local Time | 应用于解析分区中转义序列的时区名称,比如:America/Los_Angeles、Asia/Shanghai、Asia/Tokyo等 |
useLocalTimeStamp | false | 替换转义序列时是否使用本地时间戳(否则使用Event header中的timestamp ) |
八、Flume Sink组
- sink组是用来创建逻辑上的一组sink
- sink组的行为是由sink处理器(processor)决定的,它决定了event的路由策略
- processor包括故障转移和负载均衡两类
//故障转移
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
//负载均衡
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
九、拦截器(Interceptors)
- 拦截器可修改或丢弃事件
- 设置在source和channel之间
- 内置拦截器
- HostInterceptor:在event header中插入“hostname”
- TimestampInterceptor:插入时间戳
- StaticInceptor:插入key-value
- UUIDInceptor:插入UUID
- 自定义拦截器
正则:
a3.channels=c3
a3.sources=s3
a3.sinks=k3a3.sources.s3.type=spooldir
a3.sources.s3.spoolDir=/opt/data/datas
a3.sources.s3.interceptors=userid_filter
a3.sources.s3.interceptors.userid_filter.type=regex_filter
a3.sources.s3.interceptors.userid_filter.regex=userid.*
a3.sources.s3.interceptors.userid_filter.encludeEvents=truea3.channels.s3.type=memorya3.sinks.k3.type=loggera3.sources.s3.channels=c3
a3.sinks.k3.channel=c3
代码实现拦截器:
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
public class UserInterceptor implements Interceptor {@Overridepublic void initialize() {}@Overridepublic Event intercept(Event event) {byte[] body = event.getBody();String str = new String(body, Charset.forName("UTF-8"));String[] value = str.split(",");switch (value[2]){case "男" :value[2]="1";break;case "女" :value[2]="2";break;default:value[2]="0";}String newStr = value[0]+","+value[1]+","+value[2]+","+value[3];event.setBody(newStr.getBytes());return event;}@Overridepublic List<Event> intercept(List<Event> list) {for (Event event : list) {intercept(event);}return list;}@Overridepublic void close() {}public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new UserInterceptor();}@Overridepublic void configure(Context context) {}}
}
配置:
a4.channels=c4
a4.sources=s4
a4.sinks=k4a4.sources.s4.type=spooldir
a4.sources.s4.spoolDir=/opt/data/datas
a4.sources.s4.interceptors=myInterceptor
a4.sources.s4.interceptors.myInterceptor.type=com.zjw.UserInterceptor$Buildera4.channels.c4.type=memorya4.sinks.k4.type=loggera4.sinks.k4.channel=c4
a4.sources.s4.channels=c4