Source
原理
每次Agent启动后,会调用PollableSourceRunner.start(),开启一个PollableSourceRunner线程!
这个线程会初始化PollableSource(可以轮询地去读取数据源中的数据)对象!
? PollableSource由所在的PollingRunner线程控制,调用PollableSource的process()方法,来探测是否有新的数据,将新的数据封装为event,存储到channel中!
package com.zhengkw.flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.PollableSource;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;import java.util.ArrayList;
import java.util.List;/*** @ClassName:MySource* @author: zhengkw* @description: flume自定义source* @date: 20/03/13下午 10:20* @version:1.0* @since: jdk 1.8*/
public class MySource extends AbstractSource implements Configurable, PollableSource {
String name;/*** @descrption: 调用此方法处理封装处理event* Status 为枚举类 ready backoff 2个状态!* <p>* 最核心的方法,读取数据,封装为event,写入到channel* 如果读到数据,封装为event,返回ready,否则如果当前没有读到数据,返回backoff* 每间隔5s,自动封装10个event,10个event的内容为{zhengkw:i}* @return: org.apache.flume.PollableSource.Status* @date: 20/03/13 下午 10:24* @author: zhengkw*/@Overridepublic Status process() throws EventDeliveryException {
Status status = Status.READY;List<Event> events = new ArrayList<>();//一次封装10个for (int i = 0; i < 10; i++) {
SimpleEvent event = new SimpleEvent();//封装数据event.setBody(("zhengkw" + i).getBytes());//添加events.add(event);}try {
//获取channelprocessorChannelProcessor processor = getChannelProcessor();//将数据放入putlist中processor.processEventBatch(events);// 每次执行完,让线程休息5sThread.sleep(5000);} catch (Exception e) {
//如果处理异常就回滚!status = Status.BACKOFF;e.printStackTrace();}return status;}/*** @descrption: 单位ms 控制无数据读取时睡眠时间增量 与countGroup获取的Long做乘积* Thread.sleep(Math.min(* counterGroup.incrementAndGet("runner.backoffs.consecutive")** source.getBackOffSleepIncrement(),* source.getMaxBackOffSleepInterval()));* @return: long* @date: 20/03/13 下午 10:24* @author: zhengkw*/@Overridepublic long getBackOffSleepIncrement() {
//1秒return 1000;}/*** @descrption:单位ms 控制最大睡眠时间* @return: long* @date: 20/03/13 下午 10:25* @author: zhengkw*/@Overridepublic long getMaxBackOffSleepInterval() {
//最长5秒return 5000;}/*** @param context* @descrption: 获取 flume中agent配置文件上下文* @return: void* @date: 20/03/13 下午 10:30* @author: zhengkw*/@Overridepublic void configure(Context context) {
//如果没有通过name获取到值 则获取默认值!name = context.getString("name", "i'm default");}
}
Interceptor
在存入channel之前拦截,如果有多个拦截器,按照配置文件从上至下的顺序依次执行拦截!
package com.zhengkw.flume;import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;import java.util.List;
import java.util.Map;/*** @ClassName:MyInterceptor* @author: zhengkw* @description: flume自定义拦截器* @date: 20/03/13下午 10:21* @version:1.0* @since: jdk 1.8*/
public class MyInterceptor implements Interceptor {
// 初始化,会在拦截器创建完成后,调用一次@Overridepublic void initialize() {
}/*** @param event* @descrption: 实现一个往event header中添加一个时间戳* @return: org.apache.flume.Event* @date: 20/03/13 下午 11:52* @author: zhengkw*/@Overridepublic Event intercept(Event event) {
Map<String, String> headers = event.getHeaders();headers.put("time", System.currentTimeMillis() + "");//将封装好的header set到event中!event.setHeaders(headers);return event;}/*** @param events* @descrption: 拦截一批event* 每个处理方法直接调用拦截单个的方法intercept(Event event)!* @return: java.util.List<org.apache.flume.Event>* @date: 20/03/13 下午 11:49* @author: zhengkw*/@Overridepublic List<Event> intercept(List<Event> events) {
for (Event event : events) {
intercept(event);}return events;}/*** @descrption: 因为自定义拦截器实现的接口,* 接口中没有构造器!* 必须实现 Interceptor内部接口才能Builder* 才能返回拦截器的一个实例* @return:* @date: 20/03/13 下午 11:59* @author: zhengkw*/public static class Builder implements Interceptor.Builder {
/*** @descrption: 实现接口中的build方法* @return: org.apache.flume.interceptor.Interceptor* @date: 20/03/14 上午 12:04* @author: zhengkw*/@Overridepublic Interceptor build() {
// Builder implementations MUST have a no-arg constructorreturn new MyInterceptor();}@Overridepublic void configure(Context context) {
}}@Overridepublic void close() {
}
}
Sink
原理
每个Sink都由一个SinkRunner线程负责调用其process()方法,完成从channel抽取数据,存储到外部设备的逻辑!
package com.zhengkw.flume;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** @ClassName:MySink* @author: zhengkw* @description: 自定义Sink* @date: 20/03/13下午 10:21* @version:1.0* @since: jdk 1.8*/
public class MySink extends AbstractSink implements Configurable {
//前缀private String prefix;//后缀private String suffix;//用org.slf4j.Logger在控制台上打印 监控Sink!private Logger logger = LoggerFactory.getLogger(MySink.class);/*** @descrption: 最核心的方法,这个方法负责从channel中获取event,* 将event写到指定的设备* 如果成功传输了一个或多个event,就返回ready,* 否则如果从channel中获取不到event,返回backoff* @return: org.apache.flume.Sink.Status* @date: 20/03/13 下午 11:09* @author: zhengkw*/@Overridepublic Status process() throws EventDeliveryException {
Status status = Status.READY;//获取channelChannel channel = getChannel();//获取事务对象!Transaction transaction = channel.getTransaction();try {
//开启事务transaction.begin();Event e = null;//channel调用take方法将数据传给 takelist//将take到的地址值给ee = channel.take();if (e == null) {
status = Status.BACKOFF;} else {
//用Logger在控制台上打印info级别的信息//打印event头,打印data byte数组转字符串! 前后缀logger.info("Header:" + e.getHeaders() + " Body:" + prefix + e.getBody().toString() + suffix);}//提交事务!transaction.commit();} catch (ChannelException ex) {
transaction.rollback();status = Status.BACKOFF;ex.printStackTrace();} finally {
//关闭事务!transaction.close();}return status;}/*** @param context* @descrption: //从agent的配置文件中获取配置* @return: void* @date: 20/03/13 下午 11:42* @author: zhengkw*/@Overridepublic void configure(Context context) {
prefix = context.getString("prefix", "kevin:");suffix = context.getString("suffix", ":come on!");}
}
conf
a1.sources = r1
a1.sinks = k1
a1.channels = c1# 自定义source,type必须是类的全类名
a1.sources.r1.type = com.zhengkw.flume.MySource
a1.sources.r1.name = merryme:# 配置sink
a1.sinks.k1.type =com.zhengkw.flume.MySink
a1.sinks.k1.prefix = jack:
a1.sinks.k1.suffix = :back!#为source添加拦截器
a1.sources.r1.interceptors = i1
#type必须写Bulider的全类名
a1.sources.r1.interceptors.i1.type =com.zhengkw.flume.MyInterceptor$Builder# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000# 绑定和连接组件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
测试
总结
注意事项
- 自定义的Source中没有写事务开启,提交,关闭!而在自定义sink中需要实现!channelprocessor中来管理事务的所以source中不需要写!
- 配置拦截器时,注意全类名用的是内部类的全类名
com.zhengkw.flume.MyInterceptor$Builder $ 符号代表内部类,因为编译后的class文件名,内部类用此区分!
- 因为自定义拦截器实现的是接口, 接口中没有构造器!
必须实现 Interceptor内部接口才能Builder才能返回拦截器的一个实例
- Flume中的事务代表的是一批要以原子性写入到channel或要从channel中移除的events!
- ? Flume在设计时,采取的是at least once语义!因此在没有故障时,flume只会写入一次数据!
- Source和Sink中的process方法都有可能发生异常,所以可能会发生数据重复的问题!
事务的特点?
-
事务的实现由channel提供!source和sink在put和 take数据时,只是先获取channel中已经定义好的事务!
? -
不同的channel的事务可能实现方式是不同的,但是原理和目的是一样的!都是为了put和take一批events的原子性!
? -
put事务,不需要source操作,而是由ChannelProcessor进行操作!
-
假如一个事务中,一部分event已经写入到目的地,但是随着事务的回滚,这些event可能重复写入!
-
如果数据重复怎么办?
-
如果对数据不敏感,重复无所谓!
-
如果对数据敏感,要求数据最好有一个唯一的id字段,在具体使用数据时,进行去重!
-
Source核心
put事务
?
-
put事务指source将封装好的event,交给ChannelProcessor,ChannelProcessor在处理这批events时,先把events放入到putList中,放入完成后,一次性commit(),这批events就可以成功写入到channel!
-
写入成功后,执行清空putList操作!?
-
如果在过程中,发生任何异常,此时执行rollback(),rollback()会回滚putList,回滚也会直接清空putList!
take事务
- ? take事务指sink不断从channel中获取event!,每获取一批event中的一个,都会将这个event放入takeList中!
- 一般一批evnet全部写入,执行commit()方法,这个方法会清空takeList!如果在此期间,发生了异常,执行rollback(),此时会回滚takeList中的这批event到channel!
回滚
- 回滚只是清空put/takeList中的数据!!
- 具体重新放入event到List中是channel所实现的!!!!
- 因为channel提供的事务!!