当前位置: 代码迷 >> 综合 >> Flum实战文档-Kafka Source File Channel HDFS Sink
  详细解决方案

Flum实战文档-Kafka Source File Channel HDFS Sink

热度:90   发布时间:2024-02-02 05:19:27.0

1 需求

承接上一篇:kafka实战文档   https://blog.csdn.net/m0_37813354/article/details/107599928

flum 收集日志到HDFS

flum 实战文档里有一张流程图

https://blog.csdn.net/m0_37813354/article/details/107595893

下面是向下补齐后成这个样子

完整流程图

现在要做的是这一块

消费kafka的数据到HDFS

2 准备一台flum

具体细节参照

flum 实战

https://blog.csdn.net/m0_37813354/article/details/107595893

拷贝cluster2-slave2上flume安装文件到cluster2-master

scp -r flume-1.7.0 hadoop@cluster2-master:/home/hadoop/liucf/module

3 配置flume文件

## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
## 5000条一个批次,或者不够5000条时间已经过去2秒也算一个批次,r1 source 从topic_start topic取数据
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cluster2-cluster:9092,cluster2-slave1:9092,cluster2-slave2:9092
a1.sources.r1.kafka.topics=topic_start
## source2
## 5000条一个批次,或者不够5000条时间已经过去2秒也算一个批次,r1 source 从topic_event topic取数据
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cluster2-cluster:9092,cluster2-slave1:9092,cluster2-slave2:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
## channel 类型是 file 检查索引位置checkpointDir,实际数据位置dataDirs
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/liucf/module/flume-1.7.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /home/hadoop/liucf/module/flume-1.7.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
## channel 类型是 file 检查索引位置checkpointDir,实际数据位置dataDirs
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /home/hadoop/liucf/module/flume-1.7.0/checkpoint/behavior2
a1.channels.c2.dataDirs = /home/hadoop/liucf/module/flume-1.7.0/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/liucf/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /data/liucf/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要产生大量小文件
## 不配置会一个event一个小文件,下面配置,一个小时一个文件,或者一小没到大小128M了也形成一个文件,rollCount = 0 不启用event个数来拆分文件
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是原生文件。
## 控制文件是压缩流,具体哪种压缩方式由codeC控制
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2

注意:

如果flum和hadoop不在同一台机器上时,HDFS路径需要指明NameNode地址

比如:

a1.sinks.k1.hdfs.path = hdfs://NameNode-Ip:900/data/liucf/log/topic_start/%Y-%m-%d

8020是namenode节点active状态下的端口号;

9000端口:是fileSystem默认的端口号:

4 Flume 组件

4.1 FileChannel 和 MemoryChannel 区别

MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程 挂掉会导致数据丢失,适用于对数据质量要求不高的需求。 FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从 失败中恢复数据。 选型: 金融类公司、对钱要求非常准确的公司通常会选择 FileChannel 传输的是普通日志信息(京东内部一天丢 100 万-200 万条,这是非常正常的),通常 选择 MemoryChannel。

4.2 FileChannel 优化

通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。

官方说明如下: Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance

checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据

4.3 Sink:HDFS Sink 

(1)HDFS 存入大量小文件,有什么影响?

元数据层面:

              每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属 组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命

计算层面:

            默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性 能。同时也影响磁盘寻址时间。

(2)HDFS 小文件处理

官方默认的这三个参数配置写入 HDFS 后会产生小文件,

  • hdfs.rollInterval、
  • hdfs.rollSize、
  • hdfs.rollCount

基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综 合作用,效果如下:

  • a  文件在达到 128M 时会滚动生成新文件
  • b 文件创建超 3600 秒时会滚动生成新文件

 

 

  相关解决方案