当前位置: 代码迷 >> 综合 >> Flume六:自定义 Sink
  详细解决方案

Flume六:自定义 Sink

热度:36   发布时间:2024-01-26 08:34:07.0

案例

自定义实现类

package com.flume.sinks;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;/*** Created by HP on 2020/1/19.*/
public class MySink extends AbstractSink implements Configurable{//定义logger对象Logger logger;//定义两个属性private String prefix;private String subfix;@Overridepublic void configure(Context context) {//获取logger对象,用于将数据打印到控制台,方便测试查看数据输出的效果logger=LoggerFactory.getLogger(MySink.class);//读取配置文件,为参数赋值prefix=context.getString("prefix","prefix");subfix=context.getString("subfix");}/*** 1、定义返回状态* 2、获取channel* 3、从channel中获取 事务和数据* 4、发送数据** @return* @throws EventDeliveryException*/@Overridepublic Status process() throws EventDeliveryException {//1、定义返回状态Status status=Status.BACKOFF;//2、获取当前 Sink 绑定的 ChannelChannel channel = getChannel();//3、获取事务Transaction transaction = channel.getTransaction();//4、开启事务transaction.begin();try {//5、读取 Channel 中的事件,直到读取到事件结束循环//声明事件Event event ;while (true) {event = channel.take();if (event != null) {break;}}//6、处理数据:此处就是业务逻辑代码了,将数据写到何方//从事件中获取bodyString body = new String(event.getBody());//打印事件中的bodylogger.info("["+prefix+" -- "+body+" -- "+subfix+"]");//7、提交事务transaction.commit();//8、修改状态status=Status.READY;} catch (ChannelException e) {//回滚数据transaction.rollback();status=Status.BACKOFF;e.printStackTrace();} finally {//关闭事务transaction.close();}return status;}}

agent配置文件

#定义
a1.sources = r1
a1.channels = c1
a1.sinks = k1#source
a1.sources.r1.type = netcat
a1.sources.r1.bind = node2
a1.sources.r1.port = 44444#channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100#sink
a1.sinks.k1.type = com.flume.sinks.MySink
a1.sinks.k1.prefix = kaishi
a1.sinks.k1.subfix = ceshi#bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动

flume-ng agent -n a1 -c conf -f /opt/flume_conf/flume_sink.conf -Dflume.root.logger=INFO,console

 

 

  相关解决方案