当前位置: 代码迷 >> 综合 >> flume自定义组件(source,Interceptor,sink)
  详细解决方案

flume自定义组件(source,Interceptor,sink)

热度:43   发布时间:2023-11-22 16:22:15.0

Source

原理

每次Agent启动后,会调用PollableSourceRunner.start(),开启一个PollableSourceRunner线程!

这个线程会初始化PollableSource(可以轮询地去读取数据源中的数据)对象!

? PollableSource由所在的PollingRunner线程控制,调用PollableSource的process()方法,来探测是否有新的数据,将新的数据封装为event,存储到channel中!

package com.zhengkw.flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.ArrayList;
import java.util.List;/*** @ClassName:MySource* @author: zhengkw* @description: flume自定义source* @date: 20/03/13下午 10:20* @version:1.0* @since: jdk 1.8*/
public class MySource extends AbstractSource implements Configurable, PollableSource {
    String name;/*** @descrption: 调用此方法处理封装处理event* Status 为枚举类 ready backoff 2个状态!* <p>* 最核心的方法,读取数据,封装为event,写入到channel* 如果读到数据,封装为event,返回ready,否则如果当前没有读到数据,返回backoff* 每间隔5s,自动封装10个event,10个event的内容为{zhengkw:i}* @return: org.apache.flume.PollableSource.Status* @date: 20/03/13 下午 10:24* @author: zhengkw*/@Overridepublic Status process() throws EventDeliveryException {
    Status status = Status.READY;List<Event> events = new ArrayList<>();//一次封装10个for (int i = 0; i < 10; i++) {
    SimpleEvent event = new SimpleEvent();//封装数据event.setBody(("zhengkw" + i).getBytes());//添加events.add(event);}try {
    //获取channelprocessorChannelProcessor processor = getChannelProcessor();//将数据放入putlist中processor.processEventBatch(events);// 每次执行完,让线程休息5sThread.sleep(5000);} catch (Exception e) {
    //如果处理异常就回滚!status = Status.BACKOFF;e.printStackTrace();}return status;}/*** @descrption: 单位ms 控制无数据读取时睡眠时间增量 与countGroup获取的Long做乘积* Thread.sleep(Math.min(* counterGroup.incrementAndGet("runner.backoffs.consecutive")** source.getBackOffSleepIncrement(),* source.getMaxBackOffSleepInterval()));* @return: long* @date: 20/03/13 下午 10:24* @author: zhengkw*/@Overridepublic long getBackOffSleepIncrement() {
    //1秒return 1000;}/*** @descrption:单位ms 控制最大睡眠时间* @return: long* @date: 20/03/13 下午 10:25* @author: zhengkw*/@Overridepublic long getMaxBackOffSleepInterval() {
    //最长5秒return 5000;}/*** @param context* @descrption: 获取 flume中agent配置文件上下文* @return: void* @date: 20/03/13 下午 10:30* @author: zhengkw*/@Overridepublic void configure(Context context) {
    //如果没有通过name获取到值 则获取默认值!name = context.getString("name", "i'm default");}
}

Interceptor

在存入channel之前拦截,如果有多个拦截器,按照配置文件从上至下的顺序依次执行拦截!

package com.zhengkw.flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;
import java.util.Map;/*** @ClassName:MyInterceptor* @author: zhengkw* @description: flume自定义拦截器* @date: 20/03/13下午 10:21* @version:1.0* @since: jdk 1.8*/
public class MyInterceptor implements Interceptor {
    // 初始化,会在拦截器创建完成后,调用一次@Overridepublic void initialize() {
    }/*** @param event* @descrption: 实现一个往event header中添加一个时间戳* @return: org.apache.flume.Event* @date: 20/03/13 下午 11:52* @author: zhengkw*/@Overridepublic Event intercept(Event event) {
    Map<String, String> headers = event.getHeaders();headers.put("time", System.currentTimeMillis() + "");//将封装好的header set到event中!event.setHeaders(headers);return event;}/*** @param events* @descrption: 拦截一批event* 每个处理方法直接调用拦截单个的方法intercept(Event event)!* @return: java.util.List<org.apache.flume.Event>* @date: 20/03/13 下午 11:49* @author: zhengkw*/@Overridepublic List<Event> intercept(List<Event> events) {
    for (Event event : events) {
    intercept(event);}return events;}/*** @descrption: 因为自定义拦截器实现的接口,* 接口中没有构造器!* 必须实现 Interceptor内部接口才能Builder* 才能返回拦截器的一个实例* @return:* @date: 20/03/13 下午 11:59* @author: zhengkw*/public static class Builder implements Interceptor.Builder {
    /*** @descrption: 实现接口中的build方法* @return: org.apache.flume.interceptor.Interceptor* @date: 20/03/14 上午 12:04* @author: zhengkw*/@Overridepublic Interceptor build() {
    // Builder implementations MUST have a no-arg constructorreturn new MyInterceptor();}@Overridepublic void configure(Context context) {
    }}@Overridepublic void close() {
    }
}

Sink

原理

每个Sink都由一个SinkRunner线程负责调用其process()方法,完成从channel抽取数据,存储到外部设备的逻辑!

package com.zhengkw.flume;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @ClassName:MySink* @author: zhengkw* @description: 自定义Sink* @date: 20/03/13下午 10:21* @version:1.0* @since: jdk 1.8*/
public class MySink extends AbstractSink implements Configurable {
    //前缀private String prefix;//后缀private String suffix;//用org.slf4j.Logger在控制台上打印 监控Sink!private Logger logger = LoggerFactory.getLogger(MySink.class);/*** @descrption: 最核心的方法,这个方法负责从channel中获取event,* 将event写到指定的设备* 如果成功传输了一个或多个event,就返回ready,* 否则如果从channel中获取不到event,返回backoff* @return: org.apache.flume.Sink.Status* @date: 20/03/13 下午 11:09* @author: zhengkw*/@Overridepublic Status process() throws EventDeliveryException {
    Status status = Status.READY;//获取channelChannel channel = getChannel();//获取事务对象!Transaction transaction = channel.getTransaction();try {
    //开启事务transaction.begin();Event e = null;//channel调用take方法将数据传给 takelist//将take到的地址值给ee = channel.take();if (e == null) {
    status = Status.BACKOFF;} else {
    //用Logger在控制台上打印info级别的信息//打印event头,打印data byte数组转字符串! 前后缀logger.info("Header:" + e.getHeaders() + " Body:" + prefix + e.getBody().toString() + suffix);}//提交事务!transaction.commit();} catch (ChannelException ex) {
    transaction.rollback();status = Status.BACKOFF;ex.printStackTrace();} finally {
    //关闭事务!transaction.close();}return status;}/*** @param context* @descrption: //从agent的配置文件中获取配置* @return: void* @date: 20/03/13 下午 11:42* @author: zhengkw*/@Overridepublic void configure(Context context) {
    prefix = context.getString("prefix", "kevin:");suffix = context.getString("suffix", ":come on!");}
}

conf

a1.sources = r1
a1.sinks = k1
a1.channels = c1# 自定义source,type必须是类的全类名
a1.sources.r1.type = com.zhengkw.flume.MySource
a1.sources.r1.name = merryme:# 配置sink
a1.sinks.k1.type =com.zhengkw.flume.MySink
a1.sinks.k1.prefix = jack:
a1.sinks.k1.suffix = :back!#为source添加拦截器
a1.sources.r1.interceptors = i1
#type必须写Bulider的全类名
a1.sources.r1.interceptors.i1.type =com.zhengkw.flume.MyInterceptor$Builder# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000# 绑定和连接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

测试

在这里插入图片描述

总结

注意事项

  • 自定义的Source中没有写事务开启,提交,关闭!而在自定义sink中需要实现!channelprocessor中来管理事务的所以source中不需要写!
  • 配置拦截器时,注意全类名用的是内部类的全类名
    com.zhengkw.flume.MyInterceptor$Builder $ 符号代表内部类,因为编译后的class文件名,内部类用此区分!

在这里插入图片描述

  • 因为自定义拦截器实现的是接口, 接口中没有构造器
    必须实现 Interceptor内部接口才能Builder才能返回拦截器的一个实例

在这里插入图片描述

  • Flume中的事务代表的是一批要以原子性写入到channel或要从channel中移除的events!
  • ? Flume在设计时,采取的是at least once语义!因此在没有故障时,flume只会写入一次数据!
  • Source和Sink中的process方法都有可能发生异常,所以可能会发生数据重复的问题!

事务的特点?

  • 事务的实现由channel提供!source和sink在put和 take数据时,只是先获取channel中已经定义好的事务!
    ?

  • 不同的channel的事务可能实现方式是不同的,但是原理和目的是一样的!都是为了put和take一批events的原子性!
    ?

  • put事务,不需要source操作,而是由ChannelProcessor进行操作!

  • 假如一个事务中,一部分event已经写入到目的地,但是随着事务的回滚,这些event可能重复写入!

  • 如果数据重复怎么办?

  • 如果对数据不敏感,重复无所谓!

  • 如果对数据敏感,要求数据最好有一个唯一的id字段,在具体使用数据时,进行去重!

  • Source核心

put事务

?

  • put事务指source将封装好的event,交给ChannelProcessor,ChannelProcessor在处理这批events时,先把events放入到putList中,放入完成后,一次性commit(),这批events就可以成功写入到channel!

  • 写入成功后,执行清空putList操作!?

  • 如果在过程中,发生任何异常,此时执行rollback(),rollback()会回滚putList,回滚也会直接清空putList!

take事务

  • ? take事务指sink不断从channel中获取event!,每获取一批event中的一个,都会将这个event放入takeList中!
  • 一般一批evnet全部写入,执行commit()方法,这个方法会清空takeList!如果在此期间,发生了异常,执行rollback(),此时会回滚takeList中的这批event到channel!

回滚

  • 回滚只是清空put/takeList中的数据!!
  • 具体重新放入event到List中是channel所实现的!!!!
  • 因为channel提供的事务!!
  相关解决方案