参数名称 | 注释 | 默认值 |
---|---|---|
group.id | 消费者组 | “” |
max.poll.record | 最大拉取消息数量 | 500 |
max.poll.interval.ms | 两次拉取的最大间隔时间,如果超过这个时间没有poll,则会触发rebalance. | 300000ms |
session.timeout.ms | 会话超时时间,超过这个时间没有收到心跳,则broker会移除当前消费者并触发rebalance. The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms . |
10000ms |
heartbeat.interval.ms | 心跳间隔时间,超时则会触发rebalance,设置应小于session.timeout.ms,通常不大于会话过期时间的1/3 | 3000ms |
enable.auto.commit | 是否允许自动提交 | true |
auto.commit.interval.ms | 自动提交间隔时间 | 5000ms |
partition.assignment.strategy | 分区指定策略 | Collections.singletonList(RangeAssignor.class) |
auto.offset.reset | 无初始化偏移量或者当前偏移量不存在时,重置偏移量,lastest:最新偏移量,earliest:最早偏移量,none:如果消费者群组中没有更早的offset则抛出异常,anything else:抛出异常 | lastest |
fetch.min.bytes | 从服务器fetch最少数据量,如果没达到则等待 | 1 byte |
fetch.max.bytes | 从服务器fetch最大数据量。The maximum amount of data the server should return for a fetch request. Records are fetched in batches by the consumer, and if the first record batch in the first non-empty partition of the fetch is larger than this value, the record batch will still be returned to ensure that the consumer can make progress. As such, this is not a absolute maximum. The maximum record batch size accepted by the broker is defined via message.max.bytes (broker config) or max.message.bytes (topic config). Note that the consumer performs multiple fetches in parallel. |
50 * 1024 * 1024 byte |
fetch.max.wait.ms | 达到fetch.min.bytes之前的最大等待时间 | 500ms |
max.partition.fetch.bytes | 每个分区返回最大字节 | 1 * 1024 * 1024 byte |
key.deserializer | key反序列化类,实现了org.apache.kafka.common.serialization.Deserializer接口的反序列化类 | |
value.deserializer | value反序列化类 | |
check.crcs | 自动检查所使用记录的CRC32。这确保不会发生消息的联机或磁盘损坏。这种检查增加了一些开销,因此在寻求极端性能的情况下可能会禁用它。 | true |
interceptor.classes | 拦截器类,实现了org.apache.kafka.clients.consumer.ConsumerInterceptor的类 | 无 |
exclude.internal.topics | 内部topic(如偏移量)记录是否暴露给消费者,如果设为true则只能通过订阅主题来获取 | true |
internal.leave.group.on.close | 消费者关闭时是否离开群组,如果设为false,则会根据会话过期时间触发rebalance | true |
isolation.level | poll消息的事务级别 read_committed:poll返回已提交的消息,read_uncommitted:返回所有消息,包括已终止的消息 | read_uncommitted |
关于max.poll.record和fetch.max.bytes
Consumer读取partition中的数据是通过调用发起一个fetch请求来执行的。而从KafkaConsumer来看,它有一个poll方法。但是这个poll方法只是可能会发起fetch请求。原因是:Consumer每次发起fetch请求时,读取到的数据是有限制的,通过配置项max.partition.fetch.bytes来限制的。而在执行poll方法时,会根据配置项个max.poll.records来限制一次最多pool多少个record。
那么就可能出现这样的情况: 在满足max.partition.fetch.bytes限制的情况下,假如fetch到了100个record,放到本地缓存后,由于max.poll.records限制每次只能poll出15个record。那么KafkaConsumer就需要执行7次才能将这一次通过网络发起的fetch请求所fetch到的这100个record消费完毕。其中前6次是每次pool中15个record,最后一次是poll出10个record。
关于session.timeout.ms、heartbeat.interval.ms和max.poll.interval.ms
Kafka引入了独立的心跳线程,可以在轮询消息的空档发送心跳。这样依赖,发送心跳的频率与消息轮询的频率之间相互独立。但是会出现活锁问题,活锁:消费者发送心跳,但并没有poll数据。
这种情况 通过session.timeout.ms和max.poll.interval.ms避免发生活锁,如果长时间没有poll,会认为消费者失效,触发rebalance。