当前位置: 代码迷 >> 综合 >> 【RocketMQ源码】二、NameServer 源码学习
  详细解决方案

【RocketMQ源码】二、NameServer 源码学习

热度:36   发布时间:2023-09-21 00:08:23.0

一、架构设计

1.1 消息中间件设计思路

消息中间件的设计思路一般是基于主题订阅发布机制,即生产者发送某一个主题消息到服务器,消息服务器负责将消息持久化存储,消费者订阅感兴趣的主题,由服务器主动推送到消费者(Push模式)或消费者主动向消息服务器拉取(Pull模式),从而实现生产者和消费者解耦。

【RocketMQ源码】二、NameServer 源码学习

 

1.2 NameServer 解决了什么

1.2.1 存在的问题

为了增加消息中间件的高可用性,避免消息服务器的单点故障导致整个系统瘫痪,通常会部署多台消息服务器共同承担消息的存储。那么问题随之而来:

  • 消息生产者怎么知道消息要发送到哪台消息服务器呢?
  • 若某一台消息服务器宕机了,消息生产者如何动态感知呢?

NameServer 就是为了解决以上问题设计的

 

1.2.2 解决方式

【RocketMQ源码】二、NameServer 源码学习

NameServer 就像一个寻址表,负责 broker 地址的记录:

  • Broker 在启动时向所有存活的 NameServer 注册,NameServer 会记录当前这个 Broker 的 IP 地址等相关信息
  • 消息生产者在发送消息前,先从 NameServer 获取 Broker 服务器地址列表,然后根据负载均衡算法从列表中选择一台服务器进行发送消息
  • NameServer 和每一个注册的 Broekr 都保持长连接,每隔 30s 检测 Broker 是否存活,若检测到 Broker 宕机,则从路由注册表中删除。为了降低 NameServer 实现的复杂度,路由变化不会马上通知消息生产者,而是通过消息发送端的容错起止保证消息发送的可用性
  • NameServer 的高可用是通过部署多台 NameServer 实现的,但 NameServer 服务器彼此之间不通讯,即各个 NameServer 服务器之间在某一时刻的数据并不完全相同,可是这对消息的发送不会造成任何影响 (因为已记录的 broker 仍能接收消息)

 

二、源代码跟踪

2.1 启动流程分析

【RocketMQ源码】二、NameServer 源码学习

2.1.1 启动配置获取和加载

【RocketMQ源码】二、NameServer 源码学习

【RocketMQ源码】二、NameServer 源码学习

点进去可以看到这两个启动类的配置内容:

NamesrvConfig: 

【RocketMQ源码】二、NameServer 源码学习

 

2.1.2 初始化方法 initialize

【RocketMQ源码】二、NameServer 源码学习

 

2.1.3 broker 扫描和移除

查看初始化方法,在初始化方法中定义了定时器。

【RocketMQ源码】二、NameServer 源码学习

【RocketMQ源码】二、NameServer 源码学习

继续往下看,看它是如何删除的

【RocketMQ源码】二、NameServer 源码学习

【RocketMQ源码】二、NameServer 源码学习

 

2.2 路由分析

NameServer的主要作用是为消息的生产者和消息消费者提供关于主题Topic的路由信息,那么NameServer需要存储路由的基础信息,还要管理Broker节点,包括路由注册、路由删除等。

2.2.1 路由管理类

路由管理类在创建 NamesrvController 的时候就被创建了

【RocketMQ源码】二、NameServer 源码学习

【RocketMQ源码】二、NameServer 源码学习

【RocketMQ源码】二、NameServer 源码学习

2.2.1.1 topicQueueTable

主要记录了有哪些broker有订阅当前的topic,同时记录了读队列和写队列(均默认4个)

【RocketMQ源码】二、NameServer 源码学习

2.2.1.2 brokerAddrTable

主要记录在 nameserver 上注册了的 broker 的信息,包括名称、所属集群以及集群内的节点地址等

【RocketMQ源码】二、NameServer 源码学习

2.2.1.3 clusterAddrTable

主要记录了集群列表,以及集群中的成员名称

【RocketMQ源码】二、NameServer 源码学习

2.2.1.4 brokerLiveTable

brokerLiveTable 中记录了所有活跃的 broker 地址,最后接收到的心跳时间,连接通道等数据

【RocketMQ源码】二、NameServer 源码学习

 

2.2.2 路由注册流程

路由注册是通过 Broker 与 NameServer 的心跳功能实现的。

  • Broker 启动时向所有的 NameSever 发送心跳信息,且每隔 30s 想集群中所有的 NameServer 发送心跳包,NameServer 收到心跳包时会更新 brokerLiveTable 缓存中对应的 broker 信息。
  • NameServer 每隔 10s 会扫描 brokerLiveTable 表,若连续 2min 内没有收到心跳包,则会将这个 Broker 路由信息移除,并关闭连接

同时,由于 NameServer 之间是无状态的,这种设计也让新加入的 NameServer 节点能够快速与其他 NameServer 保持数据同步。

2.2.2.1 Broker 定时上报信息

代码:BrokerController#start

【RocketMQ源码】二、NameServer 源码学习

 

2.2.2.2 上报信息的代码解析

代码:BrokerOuterAPI#registerBrokerAll

【RocketMQ源码】二、NameServer 源码学习

 

2.2.2.3 NameServer 处理心跳包

【RocketMQ源码】二、NameServer 源码学习

代码:DefaultRequestProcessor#processRequest

【RocketMQ源码】二、NameServer 源码学习

继续往下跟进:

代码:DefaultRequestProcessor#registerBroker

获取写锁,并开始写操作

【RocketMQ源码】二、NameServer 源码学习

 

2.2.3 路由删除流程

路由删除有两个触发点:

  • NameServer 每隔 10s 会扫描 brokerLiveTable 上的所有 broker 节点,查看上次心跳包与系统时间差是否大于 120s, 若超过则认为这个 broker 不可用,移除这个 broker
  • Broker 正常关闭的情况下,执行 unregisterBroker 指定,主动删除路由

2.2.3.1 NameServer 扫描发现

代码:NamesrvController#initialize

【RocketMQ源码】二、NameServer 源码学习

代码:RouteInfoManager#scanNotActiveBroker

在onChannelDestory中会维护各个表数据

【RocketMQ源码】二、NameServer 源码学习

 

2.2.4 路由发现

路由发现并非实时的,即当 Topic 路由 发生变化,NameServer 不会主动推送给客户端,而是由客户端定时拉取主题最新的路由

代码:DefaultRequestProcessor#getRouteInfoByTopic

【RocketMQ源码】二、NameServer 源码学习

 

2.3 小结

【RocketMQ源码】二、NameServer 源码学习

 

三、知识点

3.1 优雅地关闭线程池 

在 start 函数内,可以发现调用了 Runtime.getRuntime().addShutdownHook 

public static NamesrvController start(final NamesrvController controller) throws Exception {if (null == controller) {throw new IllegalArgumentException("NamesrvController is null");}boolean initResult = controller.initialize();if (!initResult) {controller.shutdown();System.exit(-3);}// 设置钩子,在JVM进程关闭前会自动调用,先将线程池关闭,及时释放资源Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {@Overridepublic Void call() throws Exception {// 释放controllercontroller.shutdown();return null;}}));controller.start();return controller;
}