1 需求
承接上一篇:kafka实战文档 https://blog.csdn.net/m0_37813354/article/details/107599928
flum 收集日志到HDFS
flum 实战文档里有一张流程图
https://blog.csdn.net/m0_37813354/article/details/107595893
下面是向下补齐后成这个样子
现在要做的是这一块
消费kafka的数据到HDFS
2 准备一台flum
具体细节参照
flum 实战
https://blog.csdn.net/m0_37813354/article/details/107595893
拷贝cluster2-slave2上flume安装文件到cluster2-master
scp -r flume-1.7.0 hadoop@cluster2-master:/home/hadoop/liucf/module
3 配置flume文件
## 组件
a1.sources=r1 r2
a1.channels=c1 c2
a1.sinks=k1 k2
## source1
## 5000条一个批次,或者不够5000条时间已经过去2秒也算一个批次,r1 source 从topic_start topic取数据
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize = 5000
a1.sources.r1.batchDurationMillis = 2000
a1.sources.r1.kafka.bootstrap.servers = cluster2-cluster:9092,cluster2-slave1:9092,cluster2-slave2:9092
a1.sources.r1.kafka.topics=topic_start
## source2
## 5000条一个批次,或者不够5000条时间已经过去2秒也算一个批次,r1 source 从topic_event topic取数据
a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.batchSize = 5000
a1.sources.r2.batchDurationMillis = 2000
a1.sources.r2.kafka.bootstrap.servers = cluster2-cluster:9092,cluster2-slave1:9092,cluster2-slave2:9092
a1.sources.r2.kafka.topics=topic_event
## channel1
## channel 类型是 file 检查索引位置checkpointDir,实际数据位置dataDirs
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/hadoop/liucf/module/flume-1.7.0/checkpoint/behavior1
a1.channels.c1.dataDirs = /home/hadoop/liucf/module/flume-1.7.0/data/behavior1/
a1.channels.c1.maxFileSize = 2146435071
a1.channels.c1.capacity = 1000000
a1.channels.c1.keep-alive = 6
## channel2
## channel 类型是 file 检查索引位置checkpointDir,实际数据位置dataDirs
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /home/hadoop/liucf/module/flume-1.7.0/checkpoint/behavior2
a1.channels.c2.dataDirs = /home/hadoop/liucf/module/flume-1.7.0/data/behavior2/
a1.channels.c2.maxFileSize = 2146435071
a1.channels.c2.capacity = 1000000
a1.channels.c2.keep-alive = 6
## sink1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /data/liucf/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-
##sink2
a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = /data/liucf/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
## 不要产生大量小文件
## 不配置会一个event一个小文件,下面配置,一个小时一个文件,或者一小没到大小128M了也形成一个文件,rollCount = 0 不启用event个数来拆分文件
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k2.hdfs.rollInterval = 10
a1.sinks.k2.hdfs.rollSize = 134217728
a1.sinks.k2.hdfs.rollCount = 0
## 控制输出文件是原生文件。
## 控制文件是压缩流,具体哪种压缩方式由codeC控制
a1.sinks.k1.hdfs.fileType = CompressedStream
a1.sinks.k2.hdfs.fileType = CompressedStream
a1.sinks.k1.hdfs.codeC = lzop
a1.sinks.k2.hdfs.codeC = lzop
## 拼装
a1.sources.r1.channels = c1
a1.sinks.k1.channel= c1
a1.sources.r2.channels = c2
a1.sinks.k2.channel= c2
注意:
如果flum和hadoop不在同一台机器上时,HDFS路径需要指明NameNode地址
比如:
a1.sinks.k1.hdfs.path = hdfs://NameNode-Ip:900/data/liucf/log/topic_start/%Y-%m-%d
8020是namenode节点active状态下的端口号;
9000端口:是fileSystem默认的端口号:
4 Flume 组件
4.1 FileChannel 和 MemoryChannel 区别
MemoryChannel 传输数据速度更快,但因为数据保存在 JVM 的堆内存中,Agent 进程 挂掉会导致数据丢失,适用于对数据质量要求不高的需求。 FileChannel 传输速度相对于 Memory 慢,但数据安全保障高,Agent 进程挂掉也可以从 失败中恢复数据。 选型: 金融类公司、对钱要求非常准确的公司通常会选择 FileChannel 传输的是普通日志信息(京东内部一天丢 100 万-200 万条,这是非常正常的),通常 选择 MemoryChannel。
4.2 FileChannel 优化
通过配置 dataDirs 指向多个路径,每个路径对应不同的硬盘,增大 Flume 吞吐量。
官方说明如下: Comma separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel peformance
checkpointDir 和 backupCheckpointDir 也尽量配置在不同硬盘对应的目录中,保证 checkpoint 坏掉后,可以快速使用 backupCheckpointDir 恢复数据
4.3 Sink:HDFS Sink
(1)HDFS 存入大量小文件,有什么影响?
元数据层面:
每个小文件都有一份元数据,其中包括文件路径,文件名,所有者,所属 组,权限,创建时间等,这些信息都保存在 Namenode 内存中。所以小文件过多,会占用 Namenode 服务器大量内存,影响 Namenode 性能和使用寿命
计算层面:
默认情况下 MR 会对每个小文件启用一个 Map 任务计算,非常影响计算性 能。同时也影响磁盘寻址时间。
(2)HDFS 小文件处理
官方默认的这三个参数配置写入 HDFS 后会产生小文件,
- hdfs.rollInterval、
- hdfs.rollSize、
- hdfs.rollCount
基于以上 hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0 几个参数综 合作用,效果如下:
- a 文件在达到 128M 时会滚动生成新文件
- b 文件创建超 3600 秒时会滚动生成新文件