当前位置: 代码迷 >> 综合 >> 精通RocketMQ系列:万字深度剖析RocketMQ Consumer start启动流程源码
  详细解决方案

精通RocketMQ系列:万字深度剖析RocketMQ Consumer start启动流程源码

热度:86   发布时间:2024-01-09 19:10:36.0

一、概述

RocketMQ的消息消费包含两种模式:推push和拉pull。对于拉模式官方已经不推荐使用,所以我们主要介绍推模式。
特别说明:本文的源码基于RocketMQ4.8

二、Push模式启动流程

1、consumer代码片段

package com.example.demo.rocketmq;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** @author Saint*/
public class Consumer {
    public static void main(String[] args) throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("study-consumer");consumer.setNamesrvAddr("127.0.0.1:9876");// topic , 过滤器 * 表示不过滤consumer.subscribe("saint-study-topic", "*");consumer.setConsumeTimeout(20L);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);// 消息传播模式consumer.setMessageModel(MessageModel.CLUSTERING);consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
    for (MessageExt msg : msgs) {
    System.out.println(new String(msg.getBody()));}// ack机制return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.println("Consumer start。。。。。。");}
}

2、确定启动流程入口

在consumer.start()行我们F7进入方法发现它所有的逻辑都是DefaultMQPushConsumerImpl类中的start()方法中做的,从这里我们可以确定入口就是DefaultMQPushConsumerImpl#start()。
在这里插入图片描述
在这里插入图片描述
到这肯定有很多小机灵鬼要问了,这个traceDispatcher是干嘛用的?满脸黑人问号。

从注释中我们看出,它是用来异步传输数据的,默认情况下它是null,也就是说正常我们使用不到它,所以不需要专门花费过多精力去看它。
在这里插入图片描述

3、启动流程逻辑

接着上面,我们继续F7步入方法,可以看到此时consumer服务的状态处于CREATE_JUST,然后我们继续深入剖析一把start()方法的内部,拔开它的底裤。
在这里插入图片描述
秉持了广大网友的习惯,我们先把源码和相应注释贴出来,方便大家先了解一下。
其实大家看RocketMQ相对新点的版本会发现,注释就像是珍稀动物一下,那是真的少。可想而知在开源之前大部分都是中文注释,开源了中文注释指定不能留,外国人看不懂。所以自己啃吧,好在RocketMQ的代码逻辑很贴近中国人的思维,没那么多设计模式;相对比较好理解。

1)DefaultMQPushConsumerImpl#start()方法

流程图:
在这里插入图片描述

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
    case CREATE_JUST:log.info("the consumer [{}] start beginning. messageModel={}, isUnitMode={}", this.defaultMQPushConsumer.getConsumerGroup(),this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());// 0、将消费者服务状态预设置为 "启动失败"this.serviceState = ServiceState.START_FAILED;// 1、校验一堆配置,例如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费--CLUSTERING)、并发消费线程数量。this.checkConfig();// 2、copy订阅关系,监听重投队列%RETRY%TOPIC。this.copySubscription();// 3、如果消息传播方式是集群模式,将消费者实例的name 修改为PIDif (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();}// 4、初始化MQ客户端连接工厂,此处的MQClientManager使用了饿汉式单例模式this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);// 5、 消息重新负载消费// 指定消费组this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());// 指定消息传播方式this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());// 队列分配算法,指定如何将消息队列分配给每个使用者客户端。this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());// 指定MQClient工厂this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);// 6、指定Pull模型请求包装器this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());// 注册消息过滤钩子this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);// 7、指定消费进度(偏移量)if (this.defaultMQPushConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();} else {
    switch (this.defaultMQPushConsumer.getMessageModel()) {
    // 广播模式offset保存在本地case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;// 集群模式offset保存在服务器case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());break;default:break;}this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();// 8、创建消费服务if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
    // 顺序消费this.consumeOrderly = true;this.consumeMessageService =new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
    // 并行消费this.consumeOrderly = false;this.consumeMessageService =new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());}// 启动消费服务--定时任务this.consumeMessageService.start();// 向broker注册自己(consumer)boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;this.consumeMessageService.shutdown(defaultMQPushConsumer.getAwaitTerminationMillisWhenShutdown());throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK.", this.defaultMQPushConsumer.getConsumerGroup());// 将consumer的状态修改为 "运行中"this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PushConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}// 从nameServer中获取监听的topic路由信息,若变更则修改。this.updateTopicSubscribeInfoWhenSubscriptionChanged();// 检查消费者是否注册到broker中this.mQClientFactory.checkClientInBroker();// 向所有broker发送心跳信息、并上传FilterClass的源文件this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();// 唤醒ReBalance服务线程this.mQClientFactory.rebalanceImmediately();}

我们刚启用一个consumer的时候,consumer客户端的状态是CREATE_JUST,在Switch case逻辑中,当serverState是CREATE_JUST时,会执行以下逻辑:
在这里插入图片描述

(1)将消费者服务状态预设置为 “启动失败”。这个操作相信很多看过JUC的源码的大佬都会记得,JUC坐着道格.李老爷子的编程习惯:先预置状态,后续逻辑成功直接提交,否者就回滚。

在这里插入图片描述

(2)然后我们真正进入RocketMQ的启动流程,第一步是很常规的校验操作,校验一堆配置,比如:consumerGroup配置规则、消息传播方式不能为null(默认为集群消费–CLUSTERING)、并发消费线程数量等。感兴趣的老哥可以自己跟进去,你会发现这个逻辑太像我们平时写的代码了。

在这里插入图片描述

(3)第二步:复制copy订阅关系,监听重投队列%RETRY%TOPIC。这一步对于我们整体consumer的启动流程来讲意义不大,所以不要专进入不要专进入不要专进入。重要的事说三遍。
在这里插入图片描述
(4)第三步:判断消息传播方式是否为集群模式,是就将消费者实例的name 修改为PID。
在这里插入图片描述
(5)第四步:初始化MQ客户端连接工厂–MQClientManagerFactory,进而初始化MQClient,此处的MQClientManager使用了饿汉式单例模式。MQClientInstance封装了 RocketMQ 网络处理 API,是消息生产者、消息消费者与 NameServer、Broker 打交道的网络通道。另外:同一个 JVM 中的不同消费者和不同生产者在启动时获取到的 MQClientInstance 实例都是同一个
在这里插入图片描述

(6)第五步:指定消息重新重新负载的相关配置,比如:消费组、消息传播方式、队列负载策略、MQClient工厂等。
在这里插入图片描述

(7)第六步:指定创建Pull模型请求包装器(PullAPIWrapper),它是拉取Broker消息的API操作包装器。
在这里插入图片描述

(8)第七步:指定消息消费进度OffsetStore对象,初始化消息消费进度。集群模式下消息消费进度offset保存在broker、广播模式下消息消费进度offset保存在client消费者端,即本地文件中。如果是广播模式,紧接着会从本磁盘中加载消费进度文件。
在这里插入图片描述
从这里我们可以看到本地的文件的命名规则为:RocketMQ运行目录 / MQClientInstance的ID / groupName / offsets.json

在这里插入图片描述

(9)第八步:根据是否为顺序消费创建ConsumeMessageOrderlyService实现或ConsumeMessageConcurrentlyService实现的不同ConsumeMessageService对象并开启消费消息服务----这是个定时任务。consumeMessageService主要负责消息消费,内部维护一个线程池,可以通过参数配置最大和最小核心线程数、注意的它的阻塞队列是无界的。
在这里插入图片描述

(10)第九步:consumer向broker注册自己,注册失败则将consumer服务实例的状态回滚到CREATE_JUST,并将已经启动的消费消息的定时任务取消。否则将consumer的状态修改为RUNNING,并启动MQClientInstance。

嚯,启动MQClientInstance都干了什么呢?
在这里插入图片描述
卧槽,居然有注释。我们看一下它的意思:

public void start() throws MQClientException {
    synchronized (this) {
    switch (this.serviceState) {
    case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;// 如果nameserver地址为空,会去`http:// + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP`获取,// WS_DOMAIN_NAME由配置参数rocketmq.namesrv.domain设置,WS_DOMAIN_SUBG由配置参数rocketmq.namesrv.domain.subgroup设置if (null == this.clientConfig.getNamesrvAddr()) {
    this.mQClientAPIImpl.fetchNameServerAddr();}// 开启请求和响应通道,即远程通信服务,生产者和消费者客户端处理消息发送和消费的API。this.mQClientAPIImpl.start();/*** 1.定时2min拉取最新的nameServer信息* 2.默认定时30秒拉取最新的broker和topic路由信息(可配置)* 3.默认定时30s向broker发送心跳包(可配置)* 4.默认定时5s持久化consumer的offset(可配置)* 5.定时1分钟,动态调整线程池线程数量*/this.startScheduledTask();// 启动消息拉取服务this.pullMessageService.start();// 启动负载均衡服务this.rebalanceService.start();// 启动producer消息推送服务this.defaultMQProducer.getDefaultMQProducerImpl().start(false);log.info("the client factory [{}] start OK", this.clientId);this.serviceState = ServiceState.RUNNING;break;case START_FAILED:throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);default:break;}}
}
  1. 开启MQClientAPIImpl远程通信服务,生产者和消费者客户端处理消息发送和消费的API。
  2. 开启各种各样的定时任务,比如定时拉取最新的nameServer、broker、topic信息,向broker发送心跳,持久化offset和调整线程池数量等。
  3. 开启从Broker拉取消息服务,供消费端消息消费。
  4. 开启消费者和消费队列关于消息消费的负载均衡服务。

这地方展开了说会很多,我们后面专门聊一下broker相关的内容。

可能大家会困惑这个serviceState的状态不是修改为了START_FAILED吗?这边不就抛异常直接退出了!!!!注意这里的serviceState是MQClientInstance自己的,而不是上文说的DefaultMQPushConsumerImpl中的那个serviceState。
在这里插入图片描述

后面无论consumer的状态是什么都会执行:
在这里插入图片描述
(11)第十步:从namesrv获取topic路由信息,若变更则修改。

在这里插入图片描述
(12)第十一步:向broker端校验客户端,检查client是否注册到broker。

在这里插入图片描述
(13)第十二步:向所有broker发送心跳信息、并上传FilterClass的源文件给FilterServer。

在这里插入图片描述
(14)第十二步:唤醒ReBalance服务线程,立即负载队列。

对于(11)—(14)步的详细介绍我们放在下一篇消费者subscribe流程中介绍。

三、Pull模式启动流程

Pull模式的启动流程主要体现在DefaultMQPullConsumerImpl类中,下面我们贴出其start()方法:

public synchronized void start() throws MQClientException {
    switch (this.serviceState) {
    case CREATE_JUST:this.serviceState = ServiceState.START_FAILED;this.checkConfig();this.copySubscription();if (this.defaultMQPullConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPullConsumer.changeInstanceNameToPID();}this.mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(this.defaultMQPullConsumer, this.rpcHook);this.rebalanceImpl.setConsumerGroup(this.defaultMQPullConsumer.getConsumerGroup());this.rebalanceImpl.setMessageModel(this.defaultMQPullConsumer.getMessageModel());this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPullConsumer.getAllocateMessageQueueStrategy());this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);this.pullAPIWrapper = new PullAPIWrapper(mQClientFactory,this.defaultMQPullConsumer.getConsumerGroup(), isUnitMode());this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);if (this.defaultMQPullConsumer.getOffsetStore() != null) {
    this.offsetStore = this.defaultMQPullConsumer.getOffsetStore();} else {
    switch (this.defaultMQPullConsumer.getMessageModel()) {
    case BROADCASTING:this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;case CLUSTERING:this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPullConsumer.getConsumerGroup());break;default:break;}this.defaultMQPullConsumer.setOffsetStore(this.offsetStore);}this.offsetStore.load();boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPullConsumer.getConsumerGroup(), this);if (!registerOK) {
    this.serviceState = ServiceState.CREATE_JUST;throw new MQClientException("The consumer group[" + this.defaultMQPullConsumer.getConsumerGroup()+ "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),null);}mQClientFactory.start();log.info("the consumer [{}] start OK", this.defaultMQPullConsumer.getConsumerGroup());this.serviceState = ServiceState.RUNNING;break;case RUNNING:case START_FAILED:case SHUTDOWN_ALREADY:throw new MQClientException("The PullConsumer service state not OK, maybe started once, "+ this.serviceState+ FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),null);default:break;}}

从代码上来看,几乎和DefaultMQPushConsumerImpl类的start()方法一样,区别在于DefaultMQPullConsumerImpl类的start()方法中在switch case逻辑后多了如下片段:

// 从nameServer中获取监听的topic路由信息,若变更则修改。
this.updateTopicSubscribeInfoWhenSubscriptionChanged();
// 检查消费者是否注册到broker中
this.mQClientFactory.checkClientInBroker();
// 向所有broker发送心跳信息、并上传FilterClass的源文件
this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
// 唤醒ReBalance服务线程
this.mQClientFactory.rebalanceImmediately();

四、总结

1、几个关键类的作用

DefaultMQPushConsumerImpl是供客户端进行消息消费的,它创建了ConsumeMessageService消息消费服务、消息进度保存对象OffsetStore,消息消费的监听器对象MessageListener

MQClientInstance开启了请求和响应通道、即远程通信服务;开启了消息拉取服务PullMessageService、从Broker拉取消息;负载均衡服务RebalanceService,给consumer和消息队列做负载均衡。

另外:DefaultMQPushConsumerImplMQClientInstance都是部署在客户端的;像从Broker拉取消息,消息队列的负载均衡都是在客户端完成的。

2、consumer启动流程关键点

主要就是检查配置参数;
获取MQClientInstance;
给重新负载服务设置消费组、消息传播模式、负载策略等属性,
创建pullAPIWrapper采用长轮询的方式拉取消息;
根据消息传播方式加载offsetStore;
根据是否为顺序消费选择对应的ConsumerMessageService消费服务并启动;
启动MQClientInstance、给broker心跳、唤醒ReBalance服务线程–立即负载队列。

至此国庆嗨皮完回归学习状态。
在这里插入图片描述

  相关解决方案