当前位置: 代码迷 >> 综合 >> Flume-kafka source和kafka sink
  详细解决方案

Flume-kafka source和kafka sink

热度:71   发布时间:2024-02-01 01:36:32.0

Flume-kafka source和kafka sink

一、Flume-kafka source

a1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource  //kafka类型
a1.sources.source1.kafka.bootstrap.servers = kafka-1:9093,kafka-2:9093,kafka-3:9093 //kafka集群地址,高可靠性,服务器地址用逗号隔开
a1.sources.source1.kafka.topics = mytopic  //主题名称
a1.sources.source1.kafka.consumer.group.id = flume-consumer   //消费者组ID#Example for topic subscription by comma-separated topic list.tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource  //kafka类型
tier1.sources.source1.channels = channel1 //channel通道
tier1.sources.source1.batchSize = 5000 //每个批次写入channel通道的数量
tier1.sources.source1.batchDurationMillis = 2000  //批次持续写入时间
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 //kafka集群地址
tier1.sources.source1.kafka.topics = test1, test2  //多个主题之间用逗号隔开
tier1.sources.source1.kafka.consumer.group.id = custom.g.id  //消费者组ID#Example for topic subscription by regextier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource  //kafka类型
tier1.sources.source1.channels = channel1  //channel通道
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092 //kafka集群地址
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$  //正则表达式,匹配主题名称
# the default kafka.consumer.group.id=flume is usedagent.sources.kafkaSource.zookeeperConnect = 127.0.0.1:2181  //kafka的zookeeper的地址

二、Flume-kafka sink

a1.sinks.k1.channel = c1  //channel通道
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink  //kafka类型
a1.sinks.k1.kafka.topic = mytopic  //主题名称
a1.sinks.k1.kafka.bootstrap.servers = localhost:9092  //kafka集群地址
a1.sinks.k1.kafka.flumeBatchSize = 20 //每次从chanel中获取的数量
a1.sinks.k1.kafka.producer.acks = 1  //表示只要写入leader成功,就表示消息发送成功(0不管是否写入/-1所有节点都必须写入)
a1.sinks.k1.kafka.producer.linger.ms = 1 //每个批次发送的间隔时间
a1.sinks.k1.kafka.producer.compression.type = snappy  //发送消息使用snappy压缩
a1.sinks.k1.kafka.producer.batch.size=16384 //默认16K,可以根据生产进行压测,适当增大
  相关解决方案