当前位置: 代码迷 >> 综合 >> 【分布式】Kafka-clients 入门
  详细解决方案

【分布式】Kafka-clients 入门

热度:94   发布时间:2023-12-22 01:40:26.0

目录

1.消息队列

2.Kafka是什么?

2.Kafka架构概念

2.1 集群角色

2.2 逻辑-物理数据结构

2.2.1 Topic主题

2.2.2 Partition分区

2.2.3 message消息

3.设计原理

3.1 生产者发送消息

3.1.1 生产者配置

3.2 Consumer消费流程

3.2.1 poll方法

3.2.2 coordnator协调器

?3.3 Broker 设计原理

4.Kafka特性

4.1 Kafka如何保证幂等性

4.2 Kafka高性能特性

4.3 生产者优化


1.消息队列

MQ(Message Queue,消息队列)通常用来业务解耦流量削峰异步等等,市面上有rabbitmq、kafka、ActiveMQ、RocketMQ等等。接下来介绍消息中间件Kafka。更详细参考

2.Kafka是什么?

Kafka是一个分布式的消息引擎。具有以下特征

  1. 能够发布和订阅消息流(类似于消息队列)

  2. 以容错的、持久的方式存储消息流

  3. 多分区概念,提高了并行能力

2.Kafka架构概念

 

2.1 集群角色

  1. Broker Kafka 集群为了提高可用性,一般有多个服务实例(进程),每一个服务实例都是一个Broker,用于存储消息
  2. Producer:向Kafka发送消息,生产者会根据topic分发消息到Topic上的哪一个分区。最简单的方式从分区列表中轮流选择。
  3. Cousumer:负责订阅和消费消息。消费者用consumerGroup来标识自己。
  4. CousumerGroup:同一个Consumer Group可有多个Consumers,Kafka每个消息可以发给多个组,但同组只有一个Consumer消费消息。
  5. zookeeper:集群管理,管理以上三种角色的中间组件,保存kafka系统信息,例如消费进度offset,消费组Group等。

2.2 逻辑-物理数据结构

 

2.2.1 Topic主题

消息主题,包含多个分区Partition(队列),每一个消息都有它的topic,Kafka通过topic对消息进行归类。

Kafka中可以将Topic从物理上划分成一个或多个分区(Partition),每个分区在物理上对应一个文件夹,,该dir包含了这个分区的所有消息(.log)和索引文件(.index),这使得Kafka的吞吐率可以水平扩展。

2.2.2 Partition分区

每个分区都是一个 顺序的、不可变的消息逻辑队列,并且可以持续的添加。

每个分区在物理上对应一个文件夹,以”topicName_partitionIndex”的命名方式命名。

2.2.3 message消息

消息是最小的订阅单元,分区中的消息都被分了一个序列号,称之为偏移量(offset),在每个分区中此偏移量都是唯一的。

3.设计原理

3.1 生产者发送消息

producer在发布消息的时候,可以为每条消息指定Key,这样消息被发送到broker时,会根据分区算法把消息存储到对应的分区中TopicPartition tp,如果分区规则设置的合理,那么所有的消息将会被均匀的分布到不同的分区中,这样就实现了负载均衡

1.序列化消息&&.计算partition

根据key和value的配置(默认String)对消息进行序列化,

然后计算partition:ProducerRecord对象中如果指定了partition,就使用这个partition。

否则根据key和topic的partition数目取余

如果key也没有的话就随机生成一个counter,使用这个counter来和partition数目取余。这个counter每次使用的时候递增。

2.发送到batch&&唤醒Sender 线程

根据topic-partition获取对应的batchs(Dueue<ProducerBatch>),然后将消息append到batch中;

如果有batch满了则唤醒Sender 线程。队列的操作是加锁执行,所以batch内消息时有序的。后续的Sender操作当前方法异步操作。

3.Sender把消息有序发到 broker(tp replia leader)

3.1 确定tp relica leader 所在的broker 

  • 每台broker都保存了kafka集群的metadata信息,metadata信息里包括了每个topic的所有partition的信息: leader, leader_epoch, controller_epoch, isr, replicas等;Kafka客户端从任一broker都可以获取到需要的metadata信息;sender线程通过metadata信息可以知道tp leader的brokerId
  • producer也保存了metada信息,同时根据metadata更新策略(定期更新metadata.max.age.ms、失效检测,强制更新:检查到metadata失效以后,调用metadata.requestUpdate()强制更新

4. Sender处理broker发来的produce response

 一旦broker处理完Sender的produce请求,就会发送produce response给Sender,此时producer将执行我们为send()设置的回调函数。至此producer的send执行完毕。

3.1.1 生产者配置

buffer.memory:buffer设置大了有助于提升吞吐性,但是batch太大会增大延迟,可搭配linger_ms参数使用
linger_ms:如果batch太大,或者producer qps不高,batch添加的会很慢,我们可以强制在linger_ms时间后发送batch数据
ack:producer收到多少broker的答复才算真的发送成功
         0表示producer无需等待leader的确认(吞吐最高、数据可靠性最差)
        1代表需要leader确认写入它的本地log并立即确认
       -1/all 代表所有的ISR都完成后确认(吞吐最低、数据可靠性最高)

3.2 Consumer消费流程

如下图所示,消费者以消费者组为单位 订阅主题,一个主题的每个消息可以发给多个消费者组,一个消费者组内仅有一个消费者能够消费消息。就像团体奖项,每个组都要选个组长去领奖一样,消费者组 组员之间需要用协调器进行协调选择一个成员去拉取消息。

  

3.2.1 poll方法

消费者先确保协调器已经分配好分区让当前消费者拉取,然后通过fetcher拉消息(单线程)

3.2.2 coordnator协调器

每个consumer group在broker上都有一个coordnator来管理,消费者加入和退出,消息处理的负载均衡,以及消费消息的位移都由coordnator处理。

消费者通过AbstractCoordinator$HeartbeatTask心跳线程来与broker保持心跳,超时会认为挂掉。

负载均衡reblance

一个消费者组包含多个消费者,为了保证消费者的负载比较均衡,使用协调器进行均衡。协调器会负责协调组中的leader,分配哪些成员接收哪个分区的消息。

当一些原因导致consumer对partition消费不再均匀时,kafka会自动执行reblance,使得consumer对partition的消费再次平衡。

rebalance的时机

  • 组订阅topic数变更

  • topic partition数变更

  • consumer成员变更: consumer 加入群组或者离开群组(consumer被检测为崩溃)

举例 consumer加入引起的reblance

  1. 使用join协议,表示有consumer 要加入到group中

  2. 使用sync 协议,根据分配规则进行分配

(上图图片摘自网络)

位移管理

consumer的消息位移代表了当前grouptopic-partition的消费进度,consumer宕机重启后可以继续从该offset开始消费。

在kafka0.8之前,位移信息存放在zookeeper上,由于zookeeper不适合高并发的读写;

新版本Kafka把位移信息当成消息,主题是__consumers_offsets 默认有50个分区

消息的key =groupId+topic_partition,value =offset.


3.3 Broker 设计原理

Broker 是Kafka 集群中的服务实例。

主要功能:

  1. 处理生产者发送过来的消息,
  2. 消费者消费的拉取请求。
  3. 集群节点的管理等

(1)broker消息存储

  • Kafka的消息以二进制的方式紧凑地存储,节省空间

  • 此外消息存在ByteBuffer而不是堆内存,这样broker进程挂掉时,数据不会丢失,同时避免了gc问题

  • 通过零拷贝和顺序寻址,让消息存储和读取速度都非常快

  • 处理fetch请求的时候通过zero-copy 加快速度

(2)broker状态数据

broker设计中,每台机器都保存了相同的状态数据。主要包括以下:

  • 所有broker的信息:比如每台broker的ID、机架信息以及配置的若干组连接信息

  • 所有分区的信息:所谓分区信息指的是分区的leader、ISR和AR信息以及当前处于offline状态的副本集合。这部分数据按照topic-partitionID进行分组,可以快速地查找到每个分区的当前状态。(注:AR表示assigned replicas,即创建topic时为该分区分配的副本集合)

broker负载均衡

分区数量负载:各台broker的partition数量应该均匀

partition Replica分配算法如下:

  1. 将所有Broker(假设共n个Broker)和待分配的Partition排序

  2. 将第i个Partition分配到第(i mod n)个Broker上

  3. 将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上

4.Kafka特性

4.1 Kafka如何保证幂等性

不丢消息

  • 首先kafka保证了对已提交消息的at least保证

  • Sender有重试机制

  • producer业务方在使用producer发送消息时,注册回调函数。在onError方法中重发消息

  • consumer 拉取到消息后,处理完毕再commit,保证commit的消息一定被处理完毕

不重复

  • consumer拉取到消息先保存,commit成功后删除缓存数据

4.2 Kafka高性能特性

  • partition提升了并发

  • zero-copy

  • 顺序写入

  • 消息聚集batch

  • 页缓存

4.3 生产者优化

  • 增大producer数量

  • ack配置

  • batch