当前位置: 代码迷 >> 综合 >> Storm-Kafka源代码解析
  详细解决方案

Storm-Kafka源代码解析

热度:97   发布时间:2023-12-09 03:36:49.0

Storm-Kafka源代码解析


说明:本文所有代码基于Storm 0.10版本,本文描述内容只涉及KafkaSpout和KafkaBolt相关,不包含trident特性。

Kafka Spout

KafkaSpout的构造函数如下:

public KafkaSpout(SpoutConfig spoutConf) {_spoutConfig = spoutConf;
}

其构造参数来自于SpoutConfig对象,Spout中用到的所有参数都来自于该对象。该对象参数说明如下:

SpoutConfig

SpoutConfig继承自KafkaConfig。两个类内部所有参数及说明如下:

/*** Kafka地址和分区关系对应信息* 在kafka的分区信息和地址信息都很清楚的情况下,可以以直接使用StaticHosts* 但是该对象参数很难构建,需要的信息很多,所以我们一般情况下并不使用它。* 我们主要用的是ZKHosts的实例。可以在其中设置Zookeeper地址等信息,然后动态获取kafka元数据* ZKHost的参数信息见下面一段。* 必选参数**/
public final BrokerHosts hosts;
/*** 要从kafka中读取的topic队列名称* 必选参数**/
public final String topic;
/*** Kafka的客户端id参数,该参数一般不需要设置* 默认值为kafka.api.OffsetRequest.DefaultClientId()* 空字符串**/
public final String clientId;
/*** Kafka Consumer每次请求获取的数据量大小* 每次获取的数据消费完毕之后,才会再获取数据* 默认1MB**/
public int fetchSizeBytes = 1024 * 1024;
/*** Kafka SimpleConsumer 客户端和服务端连接的超时时间* 单位:毫秒**/
public int socketTimeoutMs = 10000;
/*** Consumer每次获取数据的超时时间* 单位:毫秒**/
public int fetchMaxWait = 10000;
/*** Consumer通过网络IO获取数据的socket buffet大小,* 默认1MB**/
public int bufferSizeBytes = 1024 * 1024;
/*** 该参数有两个作用:* 1:申明输出的数据字段 declareoutputFileds* 2:对从kafka中读到的数据进行反序列化,即将byte字节数组转为tuple对象。* 对kafka存入数据的key和message都比较关心的,可以使用KeyValueSchemeAsMultiScheme,* 如果不关心,可以使用SchemeAsMultiScheme* 默认接口实现一般都只会输出一个字段或者两个字段,很多时候,我们需要直接从kafka中读取到数据之后,就将每个字段解析了,然后进行简单处理再emit* 这个时候,建议自己实现MultiScheme接口* 必选参数**/
public MultiScheme scheme = new RawMultiScheme();
/*** 在拓扑提交之后,KafkaSpout会从zookeeper中读取以前的offset值,以便沿着上次位置继续读取数据。* KafkaSpout会检查拓扑ID和zookeeper中保存的拓扑id是否相同。* 如果不同,并且ignoreZkOffsets=true,那么就会从startOffsetTime参数位置读取数据* 否则,沿着zookeeper中保存的offset位置继续读取数据。* 也就是说,当ignoreZkOffsets=true的时候,kafkaspout只能保证在拓扑不杀掉的情况下,当worker进程异常退出的时候,会沿着上次读取位置继续读取数据,当拓扑重新提交的时候,就会从队列最早位置开始读取数据。* 这样就会存在重复读取数据的问题,所以正式场景,该参数还是应该设置为false。以保证任何场景数据的只被读取一次。**/
public boolean ignoreZkOffsets = false;
/*** 拓扑第一次提交,zookeeper中没有保存对应offset的情况下,默认从kafka中读取的offset位置。默认从队列最早位置开始读取数据,即从队列最开始位置读取数据。**/
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
/*** * 如果当前的(offset值-failed offsets中最小值) < maxOffsetBehind* 那么就会清理failed列表中所有大于maxOffsetBehind的offset值。* 这是为了防止failed过多,重发太多导致内存溢出* 不过默认为了保证数据不丢失,所以maxOffsetBehind设置的最大**/
public long maxOffsetBehind = Long.MAX_VALUE;