当前位置: 代码迷 >> 综合 >> 【RocketMQ高可用】RocketMQ 双主双从集群搭建
  详细解决方案

【RocketMQ高可用】RocketMQ 双主双从集群搭建

热度:61   发布时间:2023-09-21 00:07:37.0

目录

一、集群架构

二、四种集群模式

2.1 单 Master 模式

2.2 多 Master 模式

2.3 多 Master 多 Slave 模式(异步)

2.4 多 Master 多 Slave 模式(同步)

三、双主双从集群搭建

3.1 总体架构图

3.2 工作流程

3.3 双主双从集群搭建流程

3.3.1 防火墙开放端口

3.3.2 配置环境变量

3.3.3 创建消息存储路径

3.3.4 配置 broker 配置文件

3.3.5 开放对应端口

3.3.6 创建脚本文件

四、集群监控平台

4.1 概述

4.2 编译和打包


一、集群架构

  描述
NameServer 集群 NameServer 是一个无状态的节点,可集群部署,节点都是各自独立的,无任何信息同步
Broker 集群

① Broker 分为 Master 与 Slave,一个 Master 可以对应多个 Slave,但一个 Slave 只能对应一 个Master

② Master 与 Slave 的对应关系通过制定相同的 BrokerName,不同的 BrokerID 来定义,id 为 0 表示 Master,         非 0 表示 Slave

③ 可以部署多个 Master 实现 Broker 集群,即多组 Master - Slaves 的 Broker 节点

④ Master 通常用于写入数据,Slaves 用于读取数据

⑤ 每个 Broker 与 NameServer 集群中的所有节点建立长连接,定时注册 Topic 信息到所有 NameServer

Producer 集群

① Producer 为消息的生产者,都是各自独立的无状态的节点,可以认为只要向 mq 中推送消息的节点都算作 Producer 节点。

② Producer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master 建立长连接。且定时向 Master 发送心跳。

Customer 集群

① Customer 为消息的消费者,都是各自独立的无状态的节点,可以认为只要向 mq 中获取消息的节点都算作 Customer 节点。

② Customer 节点与 NameServer 集群中的随机一个节点建立长连接,定期从 NameServer 取出 Topic 路由信息,并向提供 Topic 服务的 Master、Slave 建立长连接。且定时向 Master、Slave 发送心跳。

③ Customer 节点既可以从 Master 订阅消息,也可以从 Slave 订阅消息,订阅规则由 Broker 配置决定

       【RocketMQ高可用】RocketMQ 双主双从集群搭建

 

二、四种集群模式

2.1 单 Master 模式

部署简单,但风险较大,一旦 Broker 重启或者宕机时,会导致服务不可用,不建议在线上环境使用,可以在本地测试

 

2.2 多 Master 模式

Broker 集群中无 Slave,全是 Master

优点:

配置简单,且单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复的情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢),性能最高

缺点:

单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅和消费,消费实时性会受到影响

 

2.3 多 Master 多 Slave 模式(异步)

每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采取异步复制方式,主备有短暂消息延迟(毫秒级)。即提供者将消息发送到 Master 后,Master 即刻返回结果给提供者。同时异步复制到 Slave 中

优点:

即使磁盘损坏,丢失的消息非常少,且消息的实时性不会受到影响。且 Master 宕机后,消费者仍然可以从 Slave 消费,而且此过程对应用透明,不需要人工干预,性能同多 Master 模式几乎一样

缺点:

Master 宕机,磁盘损坏的情况下会丢失少量消息

 

2.4 多 Master 多 Slave 模式(同步)

每个Master配置一个Slave,有多对Master-Slave,HA采用同步双写方式,即只有主备都写成功,才向应用返回成功。即提供者将消息发送到 Master后,Master 要将这条消息同步到所有的 Slave 上,同步全部完成后,才会返回结果给提供者。

优点:

数据与服务都无单点故障,在 Master 宕机的情况下消息无延迟,服务可用性与数据可用性都非常高

消息安全高,高可用性

缺点:

性能比异步复制模式略低(约低10%左右),发送单个消息的RT会略高,且在主节点宕机后,备机不能自动切换为主机,需要人为干涉

 

三、双主双从集群搭建

3.1 总体架构图

双主双从集群(2m-2s)同步双写方式,即使用两组 Master-Slave 形成集群

【RocketMQ高可用】RocketMQ 双主双从集群搭建

 

3.2 工作流程

1)启动NameServer,NameServer起来后监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。

2)Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有

      Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。

3)收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。

4)Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪 

      些Broker上,轮询从队列列表中选择一个队列,然后与队列所在的Broker建立长连接从而向Broker发消息。

5)Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker

      建立连接通道,开始消费消息。

 

3.3 双主双从集群搭建流程

3.3.1 防火墙开放端口

在实际工作中,应只对外暴露需要被外部访问的端口。

  • nameserver: 默认使用 9876 端口
  • master: 默认使用 10911 端口
  • slave: 默认使用 11011 端口

开放命令:

# 开放name server默认端口
firewall-cmd --zone=public --add-port=9876/tcp --permanent# 开放master默认端口
firewall-cmd --zone=public --add-port=10911/tcp --permanent# 开放slave默认端口 (当前集群模式可不开启)
firewall-cmd --zone=public --add-port=11011/tcp --permanent# 重启防火墙
firewall-cmd --reload# 查看已开放的端口
firewall-cmd --list-ports

 

3.3.2 配置环境变量

进入配置文件

vim /etc/profile

在 profile 文件末尾增加

#set rocketmq
ROCKETMQ_HOME=/usr/local/rocketmq
PATH=$PATH:$ROCKETMQ_HOME/bin
export ROCKETMQ_HOME PAT

让配置立刻生效

source /etc/profile

 

3.3.3 创建消息存储路径

mq 获取到消息后,broker 会默认将消息持久化,持久化目录默认为 /home,故可以修改消息存储路径

# 创建消息存储文件夹
cd /usr/local/rocketmqmkdir store
mkdir ./store/commitlog
mkdir ./store/consumequeue
mkdir ./store/index

创建slave文件夹,避免和master共用一个存储文件夹,否则会报 Lock failed,MQ already started

# 创建 slave 存储的文件夹
cd /usr/local/rocketmqmkdir s_store
mkdir ./s_store/commitlog
mkdir ./s_store/consumequeue
mkdir ./s_store/index

 

3.3.4 配置 broker 配置文件

rocketmq 已经提供了一些集群模式的配置文件模板

cd /usr/local/rocketmq/conf

 【RocketMQ高可用】RocketMQ 双主双从集群搭建

修改集群配置:

序号 IP 角色 架构模式
1 192.168.234.135 nameserver、brokerserver Master1、Slave2
2 192.168.234.139 nameserver、brokerserver Master2、Slave1

1)Master1 配置(192.168.234.135)

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a.properties

修改配置文件内容:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.168.234.135
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

2)Slave2 配置(192.168.234.135)

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b-s.properties

修改配置文件内容:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.168.234.135
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/s_store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/s_store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/s_store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/s_store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/s_store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/s_store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

3)Master2 配置(192.168.234.139)

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-b.properties

修改配置文件内容:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
brokerIP1=192.168.234.139
#nameServer地址,分号分割
namesrvAddr=192.168.234.139:9876;192.168.234.135:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=10911
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SYNC_MASTER
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=SYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

4)Slave1 配置(192.168.234.139)

vi /usr/soft/rocketmq/conf/2m-2s-sync/broker-a-s.properties

修改配置文件内容:

#所属集群名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此处不同的配置文件填写的不一样
brokerName=broker-a
#0 表示 Master,>0 表示 Slave
brokerId=1
brokerIP1=192.168.234.139
#nameServer地址,分号分割
namesrvAddr=192.168.234.135:9876;192.168.234.139:9876
#在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4
#是否允许 Broker 自动创建Topic,建议线下开启,线上关闭
autoCreateTopicEnable=true
#是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true
#Broker 对外服务的监听端口
listenPort=11011
#删除文件时间点,默认凌晨 4点
deleteWhen=04
#文件保留时间,默认 48 小时
fileReservedTime=120
#commitLog每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每个文件默认存30W条,根据业务情况调整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
#存储路径
storePathRootDir=/usr/local/rocketmq/s_store
#commitLog 存储路径
storePathCommitLog=/usr/local/rocketmq/s_store/commitlog
#消费队列存储路径存储路径
storePathConsumeQueue=/usr/local/rocketmq/s_store/consumequeue
#消息索引存储路径
storePathIndex=/usr/local/rocketmq/s_store/index
#checkpoint 文件存储路径
storeCheckpoint=/usr/local/rocketmq/s_store/checkpoint
#abort 文件存储路径
abortFile=/usr/local/rocketmq/s_store/abort
#限制的消息大小
maxMessageSize=65536
#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 异步复制Master
#- SYNC_MASTER 同步双写Master
#- SLAVE
brokerRole=SLAVE
#刷盘方式
#- ASYNC_FLUSH 异步刷盘
#- SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#发消息线程池数量
#sendMessageThreadPoolNums=128
#拉消息线程池数量
#pullMessageThreadPoolNums=128

3.3.5 开放对应端口

根据自己在 Master 文件中设定的 ListenPort 值,计算需要开放的端口:

1) Master1

【RocketMQ高可用】RocketMQ 双主双从集群搭建

在 Master1 中定义的 listenPort 为 10911,故需要暴露:

vip通道端口:10911 - 2 = 10909

HA 高可用端口:10911 + 1 = 10912

firewall-cmd --zone=public --add-port=10909/tcp --permanent
firewall-cmd --zone=public --add-port=10912/tcp --permanent
firewall-cmd --reload

 

2) Master2

【RocketMQ高可用】RocketMQ 双主双从集群搭建

在 Master2 中定义的 listenPort 为 11011,故需要暴露:

vip通道端口:11011 - 2 = 11009

HA 高可用端口:11011 + 1 = 11012

firewall-cmd --zone=public --add-port=11009/tcp --permanent
firewall-cmd --zone=public --add-port=11012/tcp --permanent
firewall-cmd --reload

 

3.3.6 创建脚本文件

nameserver 脚本:

# 创建启动文件
vim name_start.sh# 添加内容
nohup sh ./bin/mqnamesrv &
echo "nameserver 启动成功 .."
tail -f ~/logs/rocketmqlogs/namesrv.log
# 创建关闭文件
vim name_stop.sh# 添加内容
sh bin/mqshutdown namesrv
echo "nameserver 关闭成功..."
echo "再次搜索结果为:"
sleep 3
jps

 

在135服务器上:

# 编写master1启动脚本
vim b_master1_start.sh# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a.properties &
tail -f ~/logs/rocketmqlogs/broker.log
# 编写slave2启动脚本
vim b_slave2_start.sh# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

 

在139服务器上:

# 编写master2启动脚本
vim b_master2_start.sh# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-b.properties &
tail -f ~/logs/rocketmqlogs/broker.log
# 编写slave1启动脚本
vim b_slave1_start.sh# 添加内容
nohup sh ./bin/mqbroker -c /usr/local/rocketmq/conf/2m-2s-sync/broker-a-s.properties &
tail -f ~/logs/rocketmqlogs/broker.log

 

四、集群监控平台

4.1 概述

RocketMQ有一个对其扩展的开源项目incubator-rocketmq-externals,这个项目中有一个子模块叫rocketmq-console,这个便是管理控制台项目了,先将incubator-rocketmq-externals拉到本地,因为我们需要自己对rocketmq-console进行编译打包运行。

 

4.2 编译和打包

下载项目:

git clone https://github.com/apache/rocketmq-externals

修改配置文件:

【RocketMQ高可用】RocketMQ 双主双从集群搭建

打包:

用git打包界面,输入下面的命令

mvn clean package -Dmaven.test.skip=true

【RocketMQ高可用】RocketMQ 双主双从集群搭建

将打包后的jar包放到服务器上,启动控制台

java -jar rocketmq-console-ng-1.0.0.jar

如果服务器的防火墙没有开放 8080、11099 端口,则需要打开:

firewall-cmd --zone=public --add-port=8080/tcp --permanent
firewall-cmd --zone=public --add-port=11009/tcp --permanent
firewall-cmd --reload

启动成功后,在浏览器访问 localhost:8080 进入控制台界面即可

【RocketMQ高可用】RocketMQ 双主双从集群搭建