案例:
自定义拦截器
pom.xml
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId> </dependency>
拦截器类
package com.flume.interceptors;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import org.apache.velocity.runtime.directive.Foreach;import java.util.ArrayList; import java.util.List; import java.util.Map;/*** Created by HP on 2020/1/17.*/ public class DistributeLogsInterceptor implements Interceptor{//声明一个添加了头信息的event容器private List<Event> addHeaderEventList;@Overridepublic void initialize() {addHeaderEventList=new ArrayList<>();}/*a1.sources = r1a1.channels = c1 c2 c3 c4a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = statea1.sources.r1.selector.mapping.CZ = c1a1.sources.r1.selector.mapping.US = c2 c3a1.sources.r1.selector.default = c4*///单个事件的拦截器@Overridepublic Event intercept(Event event) {//获取事件中的头信息// a1.sources.r1.selector.header = stateMap<String, String> headers = event.getHeaders();//获取事件中的body信息String body=new String(event.getBody());if(body.contains("PVUV")){//前台页面访问日志//添加头信息//a1.sources.r1.selector.header = state//a1.sources.r1.selector.mapping.CZ = c1headers.put("state","CZ");}else if(body.contains("ORDER")){//后台下单日志//添加头信息//a1.sources.r1.selector.header = state//a1.sources.r1.selector.mapping.US = c1headers.put("state","US");}else{//a1.sources.r1.selector.default = c4//前两个判断没有进入则进入c4,flume.conf中不配置的话,则代表数据被抛弃掉}return event;}@Overridepublic List<Event> intercept(List<Event> list) {//清空listaddHeaderEventList.clear();for (Event event : list) {addHeaderEventList.add(intercept(event));}return addHeaderEventList;}@Overridepublic void close() {}//静态内部类,用于返回拦截器实体public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new DistributeLogsInterceptor();}@Overridepublic void configure(Context context) {}} }
打包后将生成的 jar 上传到 /opt/apache-flume-1.7.0-bin/lib/ 目录(Flume 启动时会加载 lib 目录下的 jar)
agent配置文件
node2:agent1
# Agent a1 (node2): reads lines from a netcat source, tags them with the custom
# interceptor, and fans them out to two avro sinks through a multiplexing selector.

# Component names
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node2
a1.sources.r1.port = 44444

# Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Channel selector: route by the value of the "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
# Events the interceptor leaves untagged could be routed to a c3 channel
#a1.sources.r1.selector.default = c3

# Interceptor: the custom interceptor that sets the "state" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptors.DistributeLogsInterceptor$Builder

# Sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 4142

# Bindings: each sink drains its own channel.
# k2 must read from c2; binding it to c1 would leave c2 undrained.
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
node3:agent2
# Agent a2 (node3): receives avro events on port 4141 and logs them to the console.

# Component names
a2.sources = r1
a2.channels = c1
a2.sinks = k1

# Source
a2.sources.r1.type = avro
a2.sources.r1.bind = node3
a2.sources.r1.port = 4141

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Sink: print events to the console log
a2.sinks.k1.type = logger

# Bindings
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1
node3:agent3
# Agent a3 (node3): receives avro events on port 4142 and logs them to the console.

# Component names
a3.sources = r1
a3.channels = c1
a3.sinks = k1

# Source
a3.sources.r1.type = avro
a3.sources.r1.bind = node3
a3.sources.r1.port = 4142

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Sink: print events to the console log
a3.sinks.k1.type = logger

# Bindings
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
启动
依次执行以下命令(先启动 node3 上的下游 agent,再启动 node2 上的上游 agent):
node3节点:
flume-ng agent -n a3 -c conf -f /opt/flume_conf/flume13.conf -Dflume.root.logger=INFO,console
flume-ng agent -n a2 -c conf -f /opt/flume_conf/flume12.conf -Dflume.root.logger=INFO,console
node2节点:
flume-ng agent -n a1 -c conf -f /opt/flume_conf/flume11.conf