一、概述
1、flume是什么
- Flume提供一个分布式的,可靠的,对大数据量的日志进行高效收集、聚集、移动的服务,Flume只能在Linux环境下运行。
- Flume基于流式架构,容错性强,也很灵活简单,架构简单。
- Flume、Kafka用来实时进行数据收集,Spark、Storm用来实时处理数据,impala用来实时查询。
的
2、flume三层架构
1)flume架构图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-aJyoUX5E-1595242184263)(http://www.luran.name/upload/5574c4cb12b24f1499be1b8e3a4bea3c_22.png)]
?
2)flume数据流图
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jzxw6kwf-1595242184266)(http://www.luran.name/upload/81167bfcbe6641bd9473c8b06511a3db_11.png)]
?
3)多source多channel多sink
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2ul9qh1k-1595242184268)(http://www.luran.name/upload/72d0b31e5a304c6aa2e5130157c81c8d_33.png)]
首先在每个数据源上都会部署一个 flume agent ,这个agent就是用来采取数据的。
这个agent由3个组件组成:source,channel,sink。而在flume中,数据传输的基本单位是event。下面讲讲这几个概念
(1)source
用于从数据源采集数据,并将数据传输在channel中。source支持多种数据源采集方式。比如监听端口采集数据,从文件中采集,从目录中采集,从http服务中采集等。
(2)channel
位于source和sink之间,是数据的一个暂存区域。一般情况下,从source流出数据的速率和sink流出的数据的速率会有所差异。所以需要一个空间暂存那些还没办法传输到sink进行处理的数据。所以channel类似于一个缓冲区,一个队列。
(3)sink
从channel获取数据,并将数据写到目标源。目标源支持多种,比如本地文件、hdfs、kafka、下一个flume agent的source等均可。
(4)event
传输单元,flume传输的基本单位,包括 headers和body两部分,header可以添加一些头部信息,body则是数据。
二、flume的使用(实战)
1、flume部署
flume的程序本身的部署非常简单,
(1)部署jdk1.8
(2)解压flume的程序压缩包到指定目录,然后添加环境变量即可
(3)修改agent配置文件(来自生产环境真实配置)
简单来说其实就是对source,channel,sink三大组件的工作属性的配置,笔者的业务是接收syslog类型的数据(flume支持的source类型),通过管道channel(1、memory(推荐) 2、File 3、SPILLABLEMEMORY),输出到kafka中(flume支持的sink类型)
################自定以名字################
syslog.sources = src-tcp-1
syslog.channels = ch-tcp-1
syslog.sinks = sink-tcp-1##########################################
## syslog tcp
########################################### 描述/配置tcp源 监听25000端口
syslog.sources.src-tcp-1.type = syslogtcp
syslog.sources.src-tcp-1.port = 25000
syslog.sources.src-tcp-1.host = 0.0.0.0# channel中缓冲内存配置
syslog.channels.ch-tcp-1.type = memory
syslog.channels.ch-tcp-1.capacity = 10000
syslog.channels.ch-tcp-1.transactionCapacity = 1000# 描述tcp的sink
syslog.sinks.sink-tcp-1.type = org.apache.flume.sink.kafka.KafkaSink
syslog.sinks.sink-tcp-1.kafka.bootstrap.servers = localhost:9092
syslog.sinks.sink-tcp-1.kafka.topic = ida_log_recv
syslog.sinks.sink-tcp-1.kafka.producer.acks = 1
syslog.sinks.sink-tcp-1.flumeBatchSize = 20# 将sources和sinks绑定到channel
syslog.sources.src-tcp-1.channels = ch-tcp-1
syslog.sinks.sink-tcp-1.channel = ch-tcp-1
三、flume运行和监控(实战)
1、flume的启动
./flume-ng agent -c /usr/local/apache-flume-1.9.0-bin/conf -f /opt/apps/flume/conf/avro.conf -n a1 -Dflume.root.logger=INFO,console
做成run脚本
#日志配置
-Dlog4j.configuration=File:/usr/local/xxxx/syslog/config/log4j.properties \
#flume监控
-Dflume.monitoring.type=http -Dflume.monitoring.port=25001 \
-cp "/usr/local/apache-flume-1.9.0-bin/conf:/usr/local/apache-flume-1.9.0-bin/lib/*" \org.apache.flume.node.Application \
#指向flume-app.conf文件的绝对路径
-f /usr/local/xxxx/syslog/config/syslog.config \
#flume-app.conf文件中agent的名字
-n "syslog"
2、测试发送数据
通过socket发送数据
public static void send(){Socket client = null;OutputStream out = null;try {client = new Socket("127.0.0.1", 25000);out = client.getOutputStream();Date s = new Date();String a = String.valueOf(s);String s1 = a.substring(4,19);String event = "<14>"+s1+ " TIP {\"log_type\":\"mdevice_info\"}\n";out.write(event.getBytes());out.flush();System.out.println("发送成功 ");} catch (UnknownHostException e) {e.printStackTrace();} catch (IOException e) {e.printStackTrace();} finally {try {out.close();} catch (IOException e) {e.printStackTrace();}try {client.close();} catch (IOException e) {e.printStackTrace();}}}
模拟发送50条,写入了kafka
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EGjEa1Uz-1595242184270)(C:\Users\luran\AppData\Roaming\Typora\typora-user-images\1577093917709.png)]
3、flume监控
总所周知,任何大数据组件都需要监控,flume实时收集数据动态信息的界面,包括flume成功收集的日志数量、成功发送的日志数量、flume启动时间、停止时间、以及flume一些具体的配置信息,像通道容量等。在遇到数据收集瓶颈或者数据丢失的时候,通过分析监控数据来分析、解决问题。
(1)Http监控
上面的启动脚本中配置了这种监控方式,指定了25001端口,启动flume后,通过 http://ip:25001/metrics 就可以得到flume的一个json格式的监控数据。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GcCukwc6-1595242184271)(C:\Users\luran\AppData\Roaming\Typora\typora-user-images\1577094223602.png)]