当前位置: 代码迷 >> 综合 >> 【RocketMQ 高可用】RocketMQ 高可用系列
  详细解决方案

【RocketMQ 高可用】RocketMQ 高可用系列

热度:96   发布时间:2023-09-21 00:09:36.0

目录

一、消息存储机制

1.1 介绍

1.2 存储介质类型和对比

1.3 消息存储机制

1.3.1 概述

1.3.2 顺序读写和随机读写对于机械硬盘来说为什么性能差异巨大?

1.3.3 消息存储结构和流程

1.3.4 消息存储结构

1.3.5 刷盘机制

二、高可用

2.1 NameServer 高可用

2.2 BrokerServer 高可用

2.3 消息消费高可用

2.4 消息发送高可用

2.5 消息主从复制

2.5.1 同步复制和异步复制

2.5.2 配置方式

2.5.3 实际应用

三、负载均衡

3.1 Provider 负载均衡

3.1.1 概述

3.1.2 配置

3.2 Customer 负载均衡

3.2.1 集群模式

3.2.2 广播模式

四、消息重试

4.1 顺序消息重试

4.1.1 概述

4.2 无序消息重试

4.2.1 概述

4.2.2 重试次数

4.2.3 消息重试相关配置

五、死信队列

5.1 死信队列概念

5.2 死信特性

5.3 查看死信信息和重发

六、消息幂等性

6.1 消费幂等

6.2 消费幂等的必要性

6.3 处理方式

6.3.1 分析

6.3.2 设置和获取


一、消息存储机制

1.1 介绍

由于消息队列有高可靠性的要求,故要对队列中的数据进行持久化存储。
【RocketMQ 高可用】RocketMQ 高可用系列

如图:

  1. 消息生产者先向 MQ 发送消息
  2. MQ 收到消息,将消息进行持久化,并在存储系统中新增一条记录
  3. 返回ACK(确认字符)给生产者
  4. MQ 推送消息给对应的消费者,等待消费者返回ACK(确认字符,确认消费)
  5. 若这条消息的消费者在等待时间内成功返回ACK,则 MQ 认为消息消费成功,删除存储中的消息
  6. 若 MQ 在指定时间内没有收到ACK,则认为消息消费失败,会尝试重新推送消息

 

1.2 存储介质类型和对比

常用的存储类型分为关系型数据库存储分布式KV存储 和 文件系统存储

   关系型数据库存储 分布式KV存储 文件系统存储
简介 选用 JDBC 方式实现消息持久化,只需要简单地配置 xml 即可实现 JDBC 消息存储 kv存储即 Key-Value 型存储中间件,如 Redis 和 RocksDB,将消息存储到这些中间件中 将消息存储到文件系统中
性能 存在性能瓶颈,如mysql在单表数据量达到千万级别的情况下,IO读写性能下降 通过高并发的中间件存储和处理消息,速度必然优于数据库存储方式 将消息刷盘至所部属虚拟化/物理机的文件系统来实现消息持久化,效率更高
可靠性 该方案十分依赖DB,一旦DB出现故障,MQ消息无法落盘存储,从而导致线上故障 相较DB来说更加安全可靠 除非部署 MQ 的机器本身或是本地磁盘挂了,否则一般不会出现无法持久化的问题
项目使用 ActiveMQ Redis、RockDB RocketMQ、Kafaka、RabbitMQ

存储效率:文件系统 > 分布式KV存储 > 关系型数据库DB

开发难度和集成:关系型数据库DB > 分布式KV存储 > 文件系统

 

1.3 消息存储机制

1.3.1 概述

目前的高性能磁盘,顺序写速度可以达到 600MB/s,足以满足一般网卡的传输速度,而磁盘随机读写的速度只有约 100KB/s,与顺序写的性能相差了 6000 倍。故好的消息队列系统都会采用顺序写的方式

 

1.3.2 顺序读写和随机读写对于机械硬盘来说为什么性能差异巨大?

引用自:https://blog.csdn.net/u010087886/article/details/54405934

  顺序读写 随机读写
文件数目 读取一个大文件 读取多个小文件
  比较:明显顺序读写只读取一个大文件,耗时更少。而随机读写需要打开多个文件,写进行多次的训导和旋转延迟,标绿远低于顺序读写
文件预读 顺序读写时磁盘会预读文件,即在读取的起始地址连续读取多个页面,若被预读的页面被使用,则无需再去读取 由于数据不在一起,无法预读
  比较:在大并发的情况下,磁盘预读能够免去大量的读操作,处理速度肯定更快
系统的overhead 只需要找到一个文件,并对这个文件进行属性和权限的检查 需要找到多个文件,并对每个文件进行属性和权限检查
  比较:只寻找一个文件,并确认属性和权限,肯定优于处理多个文件
写入数据 写入新文件时,需要寻找磁盘可用空间 写入新文件时,需要寻找磁盘可用空间。但由于一个文件的存储量更小,这个操作触发频率更多
  比较:顺序读写创建新文件,只需要创建一个大文件就可以用很久,而随机读写可能频繁创建文件。创建文件时需要进行寻找磁盘可用空间等一些列操作,肯定更加耗时

 

1.3.3 消息存储结构和流程

Linux操作系统分为【用户态】和【内核态】,文件操作、网络操作需要涉及这两种形态的切换,免不了进行数据复制。

一台服务器 把本机磁盘文件的内容发送到客户端,一般分为两个步骤:

  1. read;读取本地文件内容
  2. write;将读取的内容通过网络发送出去

这两个看似简单的操作,实际进行了4 次数据复制,分别是:

  1. 从磁盘复制数据到内核态内存
  2. 从内核态内存复 制到用户态内存
  3. 然后从用户态 内存复制到网络驱动的内核态内存
  4. 最后是从网络驱动的内核态内存复 制到网卡中进行传输

【RocketMQ 高可用】RocketMQ 高可用系列

通过使用mmap的方式,可以省去向用户态的内存复制,提高速度。这种机制在Java中是通过MappedByteBuffer实现的

RocketMQ充分利用了上述特性,也就是所谓的“零拷贝”技术,提高消息存盘和网络发送的速度。

这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因了

 

1.3.4 消息存储结构

RocketMQ 消息的存储是由 ConsumeQueue 和 CommitLog 配合实现的,CommitLog 负责将消息存储在真正的物理存储文件,而 ConsumeQueue 则是消息的逻辑队列,存储对应消息指向的物理存储的地址。

每个 Topic 下的每个 Message Queue 都有对应的一个 ConsumeQueue 文件

查看文件:

【RocketMQ 高可用】RocketMQ 高可用系列

【RocketMQ 高可用】RocketMQ 高可用系列

CommitLog:存储消息的元数据,同时也保存了 ConsumerQueue,可以恢复 ConsumerQueue

ConsumerQueue:存储消息在CommitLog的索引,且会被加载到内存中,加快读取速度

IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

 

1.3.5 刷盘机制

RocketMQ 的消息是存储在磁盘上的,这样做有两个优点:

  • 保证断点后恢复
  • 让存储的消息量超出内存的限制

Rocketmq 在保证顺序写时,在通过 Producer 写入 RocketMQ 的时候,支持两种写磁盘方式:同步刷盘和异步刷盘

【RocketMQ 高可用】RocketMQ 高可用系列

  同步刷盘 异步刷盘
 消息情况 在返回写成功状态时,消息已经被写入磁盘中。即消息被写入内存的PAGECACHE 中后,立刻通知刷新线程刷盘,等待刷盘完成,才会唤醒等待的线程并返回成功状态 在返回写成功状态时,消息可能只是被写入内存的 PAGECACHE 中。当内存的消息量积累到一定程度时,触发写操作快速写入
性能 需要等待刷盘才能返回结果 消息写入内存后立刻返回结果,吞吐量更高
可靠性 可以保持MQ的消息状态和生产者/消费者的消息状态一致 Master宕机,磁盘损坏的情况下,会丢失少量的消息, 导致MQ的消息状态和生产者/消费者的消息状态不一致

设置方式:

  【RocketMQ 高可用】RocketMQ 高可用系列

 

二、高可用

【RocketMQ 高可用】RocketMQ 高可用系列

2.1 NameServer 高可用

由于 NameServer 节点是无状态的,且各个节点直接的数据是一致的,故存在多个 NameServer 节点的情况下,部分 NameServer 不可用也可以保证 MQ 服务正常运行

 

2.2 BrokerServer 高可用

RocketMQ是通过 Master 和 Slave 的配合达到 BrokerServer 模块的高可用性的

一个 Master 可以配置多个 Slave,同时也支持配置多个 Master-Slave 组。

当其中一个 Master 出现问题时:

  • 由于Slave只负责读,当 Master 不可用,它对应的 Slave 仍能保证消息被正常消费
  • 由于配置多组 Master-Slave 组,其他的 Master-Slave 组也会保证消息的正常发送和消费

 

2.3 消息消费高可用

Consumer 的高可用是依赖于 Master-Slave 配置的,由于 Master 能够支持读写消息,Slave 支持读消息,当 Master 不可用或繁忙时, Consumer 会被自动切换到从 Slave 读取(自动切换,无需配置)。故当 Master 的机器故障后,消息仍可从 Slave 中被消费

 

2.4 消息发送高可用

在创建Topic的时候,把Topic的多个Message Queue创建在多个Broker组上(相同Broker名称,不同 brokerId的机器组成一个Broker组),这样当一个Broker组的Master不可用后,其他组的Master仍然可用,Producer仍然可以发送消息。 RocketMQ目前还不支持把Slave自动转成Master,如果机器资源不足, 需要把Slave转成Master,则要手动停止Slave角色的Broker,更改配置文 件,用新的配置文件启动Broker。

【RocketMQ 高可用】RocketMQ 高可用系列

 

2.5 消息主从复制

2.5.1 同步复制和异步复制

若一个 Broker 组有一个 Master 和 Slave,消息需要从 Master 复制到 Slave 上,有同步复制和异步复制两种方式

  同步复制 异步复制
概念 即等 Master 和 Slave 均写成功后才反馈给客户端写成功状态 只要 Master 写成功,就反馈客户端写成功状态
可靠性 可靠性高,若 Master 出现故障,Slave 上有全部的备份数据,容易恢复 若 Master 出现故障,可能存在一些数据还没来得及写入 Slave,可能会丢失
效率 由于是同步复制,会增加数据写入延迟,降低系统吞吐量 由于只要写入 Master 即可,故数据写入延迟较低,吞吐量较高

 

2.5.2 配置方式

可以对 broker 配置文件里的 brokerRole 参数进行设置,提供的值有:

ASYNC_MASTER:异步复制

SYNC_MASTER:同步复制

SLAVE:表明当前是从节点,无需配置 brokerRole

 

2.5.3 实际应用

在实际应用中,由于同步刷盘方式会频繁触发磁盘写操作,明显降低性能,故通常配置为:

刷盘方式:ASYNC_FLUSH(异步刷盘)

主从复制:SYNC_MASTER(同步复制)

异步刷盘能够避免频繁触发磁盘写操作,除非服务器宕机,否则不会造成消息丢失。

主从同步复制能够保证消息不丢失,即使 Master 节点异常,也能保证 Slave 节点存储所有消息并被正常消费掉。

 

三、负载均衡

3.1 Provider 负载均衡

3.1.1 概述

在实例发送消息时,默认会轮询所有订阅了改 Topic 的 broker 节点上的 message queue,让消息平均落在不同的 queue 上,而由于这些 queue 散落在不同的 broker 节点中,即使某个 broker 节点异常,其他存在订阅了这个 Topic 的 message queue 的 broker 依然能消费消息

【RocketMQ 高可用】RocketMQ 高可用系列

 

3.1.2 配置

打开 rocketmq-console,在 Topic 中新建主题,并指定要在哪些 broker 内订阅这些 Topic

【RocketMQ 高可用】RocketMQ 高可用系列

【RocketMQ 高可用】RocketMQ 高可用系列

发消息时的数据结果,可以看到 RocketMQ 集群都在同时消费这些消息

【RocketMQ 高可用】RocketMQ 高可用系列

可以看到不同的队列在处理这些消息

【RocketMQ 高可用】RocketMQ 高可用系列

 

3.2 Customer 负载均衡

3.2.1 集群模式

在集群消费模式下,存在多个消费者同时消费消息,同一条消息只会被某一个消费者获取。即消息只需要被投递到订阅了这个 Topic 的消费者Group下的一个实例中即可,消费者采用主动拉去的方式拉去并消费,在拉取的时候需要明确指定拉取那一条消息队列中的消息。

每当有实例变更,都会触发一次所有消费者实例的负载均衡,这是会按照queue的数量和实例的数量平均分配 queue 给每个实例。

【RocketMQ 高可用】RocketMQ 高可用系列

注意:

1)在集群模式下,一个 queue 只允许分配给一个消费者实例,这是由于若多个实例同时消费一个 queue 的小,由于拉取操作是由 consumer 主动发生的,可能导致同一个消息在不同的 consumer 实例中被消费。故算法保证了一个 queue 只会被一个 consumer 实例消费,但一个 consumer 实例能够消费多个 queue

2)控制 consumer 数量,应小于 queue 数量。这是由于一个 queue 只允许分配给一个 consumer 实例,若 consumer 实例数量多于 queue,则多出的 consumer 实例无法分配到 queue消费,会浪费系统资源

 

3.2.2 广播模式

广播模式其实不是负载均衡,由于每个消费者都能够拿到所有消息,故不能达到负载均衡的要求

【RocketMQ 高可用】RocketMQ 高可用系列

 

四、消息重试

4.1 顺序消息重试

4.1.1 概述

对于顺序消息,为了保证消息消费的顺序性,当消费者消费消息失败后,消息队列会自动不断进行消息重试(每次间隔时间为1s),这时会导致消息消费被阻塞的情况,故必须保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的发生

 

4.2 无序消息重试

4.2.1 概述

无序消息即普通、定时、延时、事务消息,当消费者消费消息失败时,可以通过设置返回状态实现消息重试

注意:无序消息的重试只针对集群消费方式生效,广播方式不提供失败重试特性,即消费失败后,失败的消息不再重试,而是继续消费新消息

 

4.2.2 重试次数

消息队列 RocketMQ 默认允许每条消息最多重试 16 次,每次重试的间隔时间如下:

第几次重试 与上次重试的间隔时间 第几次重试 与上次重试的间隔时间
1 10 秒 9 7 分钟
2 30 秒 10 8 分钟
3 1 分钟 11 9 分钟
4 2 分钟 12 10 分钟
5 3 分钟 13 20 分钟
6 4 分钟 14 30 分钟
7 5 分钟 15 1 小时
8 6 分钟 16 2 小时

如果消息重试 16 次后仍然失败,消息将不再投递。如果严格按照上述重试时间间隔计算,某条消息在一直消费失败的前提下,将会在接下来的 4 小时 46 分钟之内进行 16 次重试,超过这个时间范围消息将不再重试投递。

注意: 一条消息无论重试多少次,这些重试消息的 Message ID 不会改变。

 

 

4.2.3 消息重试相关配置

1)消息失败后,进行重试

集群消费方式下,消息消费失败后期望消息重试,需要在消息监听器接口的实现中明确进行配置

public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {//处理消息doConsumeMessage(message);// 推荐//方式1:返回 Action.ReconsumeLater,消息将重试return Action.ReconsumeLater;//不推荐//方式2:返回 null,消息将重试return null;//方式3:直接抛出异常, 消息将重试throw new RuntimeException("Consumer Message exceotion");}
}

2)消息失败后,不进行重试

集群消费方式下,消息失败后期望消息不重试,需要捕获消费逻辑中可能抛出的异常,最终返回 Action.CommitMessage,此后这条消息将不会再重试。

public class MessageListenerImpl implements MessageListener {@Overridepublic Action consume(Message message, ConsumeContext context) {try {doConsumeMessage(message);} catch (Throwable e) {//捕获消费逻辑中的所有异常,并返回 Action.CommitMessage;return Action.CommitMessage;}//消息处理正常,直接返回 Action.CommitMessage;return Action.CommitMessage;}
}

3)自定义消息最大重试次数

消息队列 RocketMQ 允许 Consumer 启动的时候设置最大重试次数,重试时间间隔将按照如下策略:

  • 最大重试次数小于等于 16 次,则重试时间间隔同上表描述。
  • 最大重试次数大于 16 次,超过 16 次的重试时间间隔均为每次 2 小时。

设置方式:

consumer.setMaxReconsumeTimes(17);

注意:

  • 消息最大重试次数设置,对相同 Group ID 下的所有 Consumer 实例有效。
  • 如果只对相同 Group ID 下两个 Consumer 实例中的其中一个设置了 MaxReconsumeTimes,那么该配置对两个 Consumer 实例均生效。
  • 配置采用覆盖的方式生效,即最后启动的 Consumer 实例会覆盖之前的启动实例的配置

4)获取消息重试次数

消费者收到消息后,可以获取到消息的重试次数

设置方式:

@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt ext : msgs) {System.out.println(ext.getReconsumeTimes(););}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}

 

五、死信队列

5.1 死信队列概念

在正常情况下无法被消费(超过最大重试次数)的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列就称为死信队列(Dead-Letter Queue)

当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的死信队列中。

 

5.2 死信特性

死信消息有以下特点:

  • 不会再被消费者正常消费
  • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。故死信消息应在产生的 3 天内及时处理

死信队列有以下特点:

  • 一个死信队列对应一个消费者组,而不是对应单个消费者实例
  • 一个死信队列包含了对应的 Group ID 所产生的所有死信消息,不论该消息属于哪个 Topic
  • 若一个 Group ID 没有产生过死信消息,则 RocketMQ 不会为其创建相应的死信队列

 

5.3 查看死信信息和重发

在控制台查看死信队列的主题信息

【RocketMQ 高可用】RocketMQ 高可用系列

【RocketMQ 高可用】RocketMQ 高可用系列

重发消息

【RocketMQ 高可用】RocketMQ 高可用系列

 

六、消息幂等性

6.1 消费幂等

消费幂等即无论消费者消费多少次,其结果都是一样的。RocketMQ 是通过业务上的唯一 Key 来对消息做幂等处理

 

6.2 消费幂等的必要性

在网络环境中,由于网络不稳定等因素,消息队列的消息有可能出现重复,大概有以下几种:

  • 发送时消息重复

    当一条消息已被成功发送到服务端并完成持久化,此时出现了网络闪断或者客户端宕机,导致服务端对客户端应答失败。 如果此时生产者意识到消息发送失败并尝试再次发送消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 投递时消息重复

    消息消费的场景下,消息已投递到消费者并完成业务处理,当客户端给服务端反馈应答的时候网络闪断。 为了保证消息至少被消费一次,消息队列 RocketMQ 的服务端将在网络恢复后再次尝试投递之前已被处理过的消息,消费者后续会收到两条内容相同并且 Message ID 也相同的消息。

  • 负载均衡时消息重复(包括但不限于网络抖动、Broker 重启以及订阅方应用重启)

    当消息队列 RocketMQ 的 Broker 或客户端重启、扩容或缩容时,会触发 Rebalance,此时消费者可能会收到重复消息。

结合三种情况,可以发现消息重发的最后结果都是,消费者接收到了重复消息,那么,我们只需要在消费者端统一进行幂等处理就能够实现消息幂等。

 

6.3 处理方式

6.3.1 分析

RocketMQ 只能够保证消息丢失但不能保证消息不重复,且由于高可用和高性能的考虑,不应在 RocketMQ 中实现消息幂等性的处理。由于 6.2 可知,应该在消费端实现消息幂等性。

在消费端通过业务逻辑实现幂等性操作,最常用的方式就是唯一ID的形式,若已经消费过的消息就不进行处理。例如在商城系统中使用订单ID作为关键ID,分布式系统中常用雪花算法生成ID。

雪花算法: https://blog.csdn.net/qq_34416331/article/details/107337331

 

6.3.2 设置和获取

在发送消息时,可以对 Message 设置标识:

Message message = new Message();# 设置唯一标识,标识由雪花算法生成
message.setKey(idWorker.nextId());

订阅方收到消息时,可以获取到这个 Key

consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt ext : msgs) {System.out.println(ext.getKeys());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});