当前位置: 代码迷 >> 综合 >> Kafka Consumer 参数
  详细解决方案

Kafka Consumer 参数

热度:57   发布时间:2023-09-05 18:35:52.0
参数名称 注释 默认值
group.id 消费者组 “”
max.poll.record 最大拉取消息数量 500
max.poll.interval.ms 两次拉取的最大间隔时间,如果超过这个时间没有poll,则会触发rebalance. 300000ms
session.timeout.ms 会话超时时间,超过这个时间没有收到心跳,则broker会移除当前消费者并触发rebalance. The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms. 10000ms
heartbeat.interval.ms 心跳间隔时间,超时则会触发rebalance,设置应小于session.timeout.ms,通常不大于会话过期时间的1/3 3000ms
enable.auto.commit 是否允许自动提交 true
auto.commit.interval.ms 自动提交间隔时间 5000ms
partition.assignment.strategy 分区指定策略 Collections.singletonList(RangeAssignor.class)
auto.offset.reset 无初始化偏移量或者当前偏移量不存在时,重置偏移量,lastest:最新偏移量,earliest:最早偏移量,none:如果消费者群组中没有更早的offset则抛出异常,anything else:抛出异常 lastest
fetch.min.bytes 从服务器fetch最少数据量,如果没达到则等待 1 byte
fetch.max.bytes 从服务器fetch最大数据量。The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. 50 * 1024 * 1024 byte
fetch.max.wait.ms 达到fetch.min.bytes之前的最大等待时间 500ms
max.partition.fetch.bytes 每个分区返回最大字节 1 * 1024 * 1024 byte
key.deserializer key反序列化类,实现了org.apache.kafka.common.serialization.Deserializer接口的反序列化类
value.deserializer value反序列化类
check.crcs 自动检查所使用记录的CRC32。这确保不会发生消息的联机或磁盘损坏。这种检查增加了一些开销,因此在寻求极端性能的情况下可能会禁用它。 true
interceptor.classes 拦截器类,实现了org.apache.kafka.clients.consumer.ConsumerInterceptor的类
exclude.internal.topics 内部topic(如偏移量)记录是否暴露给消费者,如果设为true则只能通过订阅主题来获取 true
internal.leave.group.on.close 消费者关闭时是否离开群组,如果设为false,则会根据会话过期时间触发rebalance true
isolation.level poll消息的事务级别 read_committed:poll返回已提交的消息,read_uncommitted:返回所有消息,包括已终止的消息 read_uncommitted

关于max.poll.record和fetch.max.bytes

Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。

那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。

关于session.timeout.ms、heartbeat.interval.ms和max.poll.interval.ms

Kafka引入了独立的心跳线程,可以在轮询消息的空档发送心跳。这样依赖,发送心跳的频率与消息轮询的频率之间相互独立。但是会出现活锁问题,活锁:消费者发送心跳,但并没有poll数据。
这种情况 通过session.timeout.ms和max.poll.interval.ms避免发生活锁,如果长时间没有poll,会认为消费者失效,触发rebalance。