当前位置: 代码迷 >> 综合 >> Flume三:多路复用(ChannelSelector之Multiplexing)+自定义拦截器
  详细解决方案

Flume三:多路复用(ChannelSelector之Multiplexing)+自定义拦截器

热度:49   发布时间:2024-01-25 15:38:09.0

案例:

自定义拦截器 

pom.xml

<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId>
</dependency>

拦截器类

package com.flume.interceptors;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.velocity.runtime.directive.Foreach;import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** Created by HP on 2020/1/17.*/
public class DistributeLogsInterceptor implements Interceptor{//声明一个添加了头信息的event容器private List<Event> addHeaderEventList;@Overridepublic void initialize() {addHeaderEventList=new ArrayList<>();}/*a1.sources = r1a1.channels = c1 c2 c3 c4a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = statea1.sources.r1.selector.mapping.CZ = c1a1.sources.r1.selector.mapping.US = c2 c3a1.sources.r1.selector.default = c4*///单个事件的拦截器@Overridepublic Event intercept(Event event) {//获取事件中的头信息// a1.sources.r1.selector.header = stateMap<String, String> headers = event.getHeaders();//获取事件中的body信息String body=new String(event.getBody());if(body.contains("PVUV")){//前台页面访问日志//添加头信息//a1.sources.r1.selector.header = state//a1.sources.r1.selector.mapping.CZ = c1headers.put("state","CZ");}else if(body.contains("ORDER")){//后台下单日志//添加头信息//a1.sources.r1.selector.header = state//a1.sources.r1.selector.mapping.US = c1headers.put("state","US");}else{//a1.sources.r1.selector.default = c4//前两个判断没有进入则进入c4,flume.conf中不配置的话,则代表数据被抛弃掉}return event;}@Overridepublic List<Event> intercept(List<Event> list) {//清空listaddHeaderEventList.clear();for (Event event : list) {addHeaderEventList.add(intercept(event));}return addHeaderEventList;}@Overridepublic void close() {}//静态内部类,用于返回拦截器实体public static class Builder implements Interceptor.Builder{@Overridepublic Interceptor build() {return new DistributeLogsInterceptor();}@Overridepublic void configure(Context context) {}}
}

打包后将 jar 上传到 /opt/apache-flume-1.7.0-bin/lib/ 目录(Flume 启动时会把 lib 目录下的 jar 加入 classpath)

agent配置文件

node2:agent1

# Agent a1 (runs on node2): reads lines from a netcat source, lets the custom
# interceptor set the "state" header, and routes events to one of two avro
# sinks through a multiplexing channel selector.

# Component names
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2

# Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node2
a1.sources.r1.port = 44444

# Channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100

# Channel selector: pick the channel from the value of the "state" header
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = state
a1.sources.r1.selector.mapping.CZ = c1
a1.sources.r1.selector.mapping.US = c2
# Events that match no mapping could be sent to a default channel instead of
# being dropped; c3 would also have to be declared in a1.channels.
#a1.sources.r1.selector.default = c3

# Interceptor: the custom interceptor that sets the "state" header
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.flume.interceptors.DistributeLogsInterceptor$Builder

# Sinks
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = node3
a1.sinks.k1.port = 4141

a1.sinks.k2.type = avro
a1.sinks.k2.hostname = node3
a1.sinks.k2.port = 4142

# Bindings: each sink drains its own channel (k1 <- c1, k2 <- c2)
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

node3:agent2

# Agent a2 (runs on node3): receives avro events on port 4141 and logs them.

# Component names
a2.sources = r1
a2.sinks = k1
a2.channels = c1

# Source
a2.sources.r1.type = avro
a2.sources.r1.bind = node3
a2.sources.r1.port = 4141

# Channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100

# Sink: print events to the console log
a2.sinks.k1.type = logger

# Bindings
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

node3:agent3

# Agent a3 (runs on node3): receives avro events on port 4142 and logs them.

# Component names
a3.sources = r1
a3.sinks = k1
a3.channels = c1

# Source
a3.sources.r1.type = avro
a3.sources.r1.bind = node3
a3.sources.r1.port = 4142

# Channel
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

# Sink: print events to the console log
a3.sinks.k1.type = logger

# Bindings
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1

启动

依次执行node3节点
flume-ng agent -n a3 -c conf -f /opt/flume_conf/flume13.conf  -Dflume.root.logger=INFO,console
flume-ng agent -n a2 -c conf -f /opt/flume_conf/flume12.conf  -Dflume.root.logger=INFO,console

node2节点
flume-ng agent -n a1 -c conf -f /opt/flume_conf/flume11.conf

 

 

  相关解决方案