目录
一、MQ 概述
1.1 什么是 MQ
1.2 MQ 的常见场景
1.2.1 应用解耦
1.2.2 流量削峰
1.2.3 数据分发
1.3 MQ 的优缺点
1.4 MQ 产品比较
二、RocketMQ 使用
2.1 RocketMQ 角色介绍
2.2 基本搭建
2.2.1 下载
2.2.2 安装和配置
2.2.3 启动和日志
2.2.4 关闭 rocketmq
2.2.5 测试 RocketMQ
2.3 编写启动和关闭脚本
2.3.1 nameserver 脚本
2.3.2 brokerserver 脚本
三、RocketMQ 示例代码
3.1 提供者示例
3.1.1 引入依赖
3.1.2 发送同步消息
3.1.3 发送异步消息
3.1.4 发送单向消息
3.2 消费者示例
3.2.1 负载均衡模式(默认)
3.2.2 广播模式
四、消息类型
4.1 顺序消息
4.1.1 原理解析
4.1.2 示例代码
4.2 延时消息
4.2.1 常用场景
4.2.2 实现原理
4.2.3 示例代码
4.3 批量消息
4.4 过滤消息
4.4.1 常见语法
4.4.2 多个 Tag 消费
4.4.3 SQL 语法过滤
4.5 事务消息
4.5.1 事务消息处理流程图
4.5.2 三种状态
4.5.3 示例代码
一、MQ 概述
1.1 什么是 MQ
MQ(Message Queue) 即消息队列,是一种先进先出的结构,主要应用于应用解耦、流量削峰、数据分发等场景,能够极大降低硬件需求,是大并发处理系统中不可缺少的一个环节
1.2 MQ 的常见场景
1.2.1 应用解耦
系统设计时要求高内聚,低耦合。系统耦合性越高,容错性就越低。
举个例子:
在订单系统中,需要调用支付系统、库存系统、物流系统
当有一天物流系统发生故障或升级,则会对订单系统造成影响,造成用户下单异常。即使物流系统修复,也可能造成订单的丢失。
使用消息队列后,订单系统只需要将数据发送到 MQ 中,就可以返回给用户了。而消息存储在 MQ 中,即使系统发生异常,等系统重启后,也会从 MQ 中取出对应数据,并处理。
1.2.2 流量削峰
应用系统若遇到系统请求量猛增时,可能会对数据库造成极大压力,甚至导致系统崩溃。同时使用流量削峰,能够降低硬件成本。
例如:
A系统是一个订单秒杀系统,订单秒杀只有每天的晚上6点才会有大流量压入。经过预估,A系统每秒能承受的并发量为3k,超出可能造成延时甚至系统崩溃。这时候若选择升级硬件,由于订单秒杀系统只有晚6点才会有大流量,就会造成成本增加和资源浪费。若放任不管,则要承担系统崩溃的风险。
若增加了消息队列,则可以将请求缓存起来,让系统逐步处理。通过控制每秒获取的请求数,可以让A系统安全地处理完这些请求。
1.2.3 数据分发
通过消息队列,可以让数据在多个系统之间进行流通。数据产生方不需要关心谁来使用数据,只需要将数据发送到消息队列中。而数据的使用方只要直接从消息队列中获取数据使用即可。
1.3 MQ 的优缺点
优点:解耦、削峰、数据分发,降低成本
缺点:
系统可用性降低
过度依赖 MQ,一旦 MQ 宕机,则会对业务造成巨大影响
系统复杂度提高
MQ 的加入大大提高了系统复杂度,以前系统是同步的远程调用,现在是通过MQ进行异步调用。如何保证消息没有被重复消费?如何处理消息丢失的情况?如何保证消息的有序性?
一致性问题
A系统处理完业务,通过MQ给B、C、D三个系统,若B、C系统处理成功,D系统处理失败,如何保证消息数据处理的一致性?
1.4 MQ 产品比较
常见的产品有 ActiveMQ、RabbitMQ、RocketMQ、Kafka
二、RocketMQ 使用
2.1 RocketMQ 角色介绍
- Producer:消息发送者
- Consumer:消息接受者
- Broker: 暂存和传输消息
- NameServer:管理Broker
- Topic:区分消息的种类,一个发送者可以发送消息给一个或多个 Topic;一个消息的接收者可以订阅一个或多个 Topic 消息
- Message Queue:相当于 Topic的分区,用于并行发送消息和并行接收消息
2.2 基本搭建
2.2.1 下载
官网下载:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.5.1/rocketmq-all-4.5.1-bin-release.zip
百度网盘下载:
链接:https://pan.baidu.com/s/1nUNp4E9c7l_K6X5j4fcyfA
提取码:ck22
2.2.2 安装和配置
安装rocketmq
# 解压安装包
unzip rocketmq-all-4.5.1-bin-release.zip# 重命名
mv rocketmq-all-4.5.1-bin-release rocketmq
目录解析:
- benchmark:官方自带的压测工具
- bin:启动脚本
- conf:实例配置脚本,包括 broker 配置文件、 logback 配置文件等
- lib:依赖的jar包,包括 Netty、commons-lang、FastJSON等
修改配置文件:
由于 rocketmq 默认占用的虚拟机内存为8G,可能与实际需求不符,故需要修改 rocketmq 占用的内存大小
vim ./bin/runserver.sh
vim runbroker.sh
配置属性:
- -Xms 堆内存的最小大小,默认为物理内存的1/64
- -Xmx jvm最大可用堆内存的大小,默认为物理内存的1/4
- -Xmn 堆内新生代的大小。
- -XX:MetaspaceSize=128m //持久代的初始大小
- -XX:MaxMetaspaceSize=320m //持久代的上限
- 整个JVM内存大小=新生代大小 + 年老代大小 + 持久代大小
2.2.3 启动和日志
启动时最好先启动 NameServer,后启动 Broker。因为 NameServer 是用于记录 Broker 地址的,每个 Broker 启动时都会向 NameServer 上报自己的地址
启动 NameServer
# 后台启动NameServer
nohup sh bin/mqnamesrv &# 查看启动日志
tail -f ~/logs/rocketmqlogs/namesrv.log
启动 Broker
# 启动Broker
nohup sh bin/mqbroker -n localhost:9876 &# 查看启动日志
tail -f ~/logs/rocketmqlogs/broker.log
验证
jps
2.2.4 关闭 rocketmq
# 1.关闭NameServer
sh bin/mqshutdown namesrv# 2.关闭Broker
sh bin/mqshutdown broker
2.2.5 测试 RocketMQ
发送消息
# 设置环境变量
export NAMESRV_ADDR=localhost:9876# 使用安装包的Demo发送消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
接收消息
# 设置环境变量
export NAMESRV_ADDR=localhost:9876# 接收消息
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
2.3 编写启动和关闭脚本
2.3.1 nameserver 脚本
# 创建启动脚本文件
vim name_start.sh# 添加脚本内容
nohup sh ./bin/mqnamesrv &
echo "nameserver 启动成功 .."
tail -f ~/logs/rocketmqlogs/namesrv.log
# 创建关闭脚本文件
vim name_stop.sh# 添加脚本内容
sh bin/mqshutdown namesrv
echo "nameserver 关闭成功..."
echo "再次搜索结果为:"
sleep 3
jps
2.3.2 brokerserver 脚本
# 创建启动脚本文件
vim broker_start.sh# 添加脚本内容
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
# 创建关闭脚本文件
vim broker_stop.sh# 添加脚本内容
sh bin/mqshutdown broker
echo "brokerserver 关闭中..."
echo "再次搜索结果为:"
sleep 5
jps
三、RocketMQ 示例代码
3.1 提供者示例
3.1.1 引入依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.4.0</version>
</dependency>
3.1.2 发送同步消息
同步消息即消息生产者给 mq 发送消息,发送后发送线程会阻塞,直到 mq 返回结果。这种发送方式可靠性较高,通常用于重要的消息中,如发送短信等。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.TimeUnit;/*** 发送同步消息*/
public class SyncProducer {public static void main(String[] args) throws Exception {// 创建消息生产者producer,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");// 指定 Nameserver 地址producer.setSendMsgTimeout(6000);producer.setVipChannelEnabled(false);producer.setNamesrvAddr("192.168.234.134:9876;192.168.234.139:9876");// 启动 producerproducer.start();for (int i = 0; i < 1; i++) {// 创建消息对象,指定主体 Topic、Tag 和消息体Message message = new Message("base", "Tag1", ("Hello World" + i).getBytes());// 发送消息SendResult result = producer.send(message);// 发送状态、ID以及接受消息的队列的IDSendStatus status = result.getSendStatus();String msgId = result.getMsgId();int queueId = result.getMessageQueue().getQueueId();System.out.println("发送状态: " + status + " 消息ID: " + msgId + " 队列: " + queueId);// TimeUnit.SECONDS.sleep(1);}// 关闭 producerproducer.shutdown();}
}
3.1.3 发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待 Broker 的响应
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.concurrent.TimeUnit;public class AsyncProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("group1");// 指定 Nameserver 地址producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 启动Producer实例producer.start();producer.setRetryTimesWhenSendAsyncFailed(0);for (int i = 0; i < 300; i++) {final int index = i;// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("base2", "Tag1", ("Hello World" + i).getBytes());producer.send(msg, new SendCallback() {public void onSuccess(SendResult sendResult) {System.out.println("发送结果:" + sendResult);}public void onException(Throwable e) {System.out.println("发送异常:" + e);
// e.printStackTrace();}});}Thread.sleep(100000);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}
3.1.4 发送单向消息
这种方式主要用在不特别关心发送结果的场景,例如日志发送
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class OnewayProvider {public static void main(String[] args) throws Exception {// 创建消息生产者producer,并指定生产者组名DefaultMQProducer producer = new DefaultMQProducer("group1");// 指定 Nameserver 地址producer.setSendMsgTimeout(6000);producer.setVipChannelEnabled(false);producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 启动 producerproducer.start();for (int i = 0; i < 100; i++) {// 创建消息对象,指定主体 Topic、Tag 和消息体Message message = new Message("base2", "Tag1", ("Hello World" + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送单向消息,不返回任何结果producer.sendOneway(message);System.out.println("发送单向消息");
// TimeUnit.SECONDS.sleep(1);}// 关闭 producerproducer.shutdown();}
}
3.2 消费者示例
消费者消费分为广播模式和负载均衡模式
3.2.1 负载均衡模式(默认)
负载均衡模式即有多个消费者共同消费队列信息,每个消费者获得的消息都不同。即队列中有a,b,c三条消息,消费者1拿到的是a,b,这两条消息,并进行消费,而消费者2拿到c这条消息,并消费。他们共同协作消费掉队列中的消息
示例代码:
通过设置消费方式:
// 设置为负载均衡模式消费
consumer.setMessageModel(MessageModel.CLUSTERING);
完整代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;import java.util.List;/*** @File: Consumer* @Description:* @Author: tom* @Create: 2020-07-01 09:17**/
public class Consumer {public static void main(String[] args) throws Exception {// 创建消费者 consumer,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定 Nameserver 地址consumer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 订阅主题 Topic 和 Tagconsumer.subscribe("base2", "Tag1");// 设置为负载均衡模式消费consumer.setMessageModel(MessageModel.CLUSTERING);// 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {// 接收消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumerconsumer.start();}
}
3.2.2 广播模式
广播模式即只要每个消费者订阅的topic和tag相同,则每个消费者获取到的消息都是相同的。如队列中有 a,b,c 这三条消息,则只要订阅了的消费者均能拿到 a,b,c 三条消息
示例代码:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class Consumer {public static void main(String[] args) throws Exception {// 创建消费者 consumer,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定 Nameserver 地址consumer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 订阅主题 Topic 和 Tagconsumer.subscribe("base2", "Tag1");// 设置回调函数,处理消息consumer.registerMessageListener(new MessageListenerConcurrently() {// 接收消息内容public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumerconsumer.start();}
}
四、消息类型
4.1 顺序消息
消息有序是指可以按照消息的发送顺序来消费(FIFO),RoecketMQ 可以保证消息有序,分为分区有序和全局有序。
4.1.1 原理解析
由于消息发送是采用轮询的方式将消息发送到不同的 queue 中,而消费者消费消息是通过多线程,同时从多个 queue 上拉取消息,这种情况下很难保证消息的发送和消费有序。
如图,可以看到最后消费者得到的消息是乱序的
Rocketmq又如何保证有序呢?
如上图,我们只需要保证发送者中需要保证消息有序的消息,都发送到同一个 queue 中去就行了。即小王的所有消息都发送到同一个队列中,小李的所有消息都发送到同一个队列中。在代码实现中,可以通过一个唯一标识选择一个队列,之后的消息都发到对应队列中。
全局有序:只有一个队列,所有的消息都被推送到这个唯一的队列中
分区有序:存在多个队列,但要保证的有序的消息都会被扔到同一个队列中
当然,这只是简单的实现了顺序消息,实际情况总是比这复杂,如多消费者的情况下,如何保证消息顺序
4.1.2 示例代码
producer:
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;import java.util.List;/*** @File: Producer* @Description:* @Author: tom* @Create: 2020-07-01 10:37**/
public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");producer.start();List<OrderStep> orderSteps = OrderStep.buildOrders();int i = 0;for (OrderStep orderStep : orderSteps) {String body = orderStep.toString();Message msg = new Message("OrderTopic", "Order", "i = " + i, body.getBytes());SendResult sendResult = producer.send(msg, new MessageQueueSelector() {/**** @param mqs 队列集合* @param message 消息对象* @param arg 业务标识的参数* @return*/public MessageQueue select(List<MessageQueue> mqs, Message message, Object arg) {long orderId = Long.valueOf(String.valueOf(arg));long index = orderId % mqs.size();return mqs.get((int) index);}}, orderStep.getOrderId());System.out.println("发送结果:" + sendResult);i++;}producer.shutdown();}
}
Consumer:
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;/*** @File: Consumer* @Description:* @Author: tom* @Create: 2020-07-01 10:54**/
public class Consumer {public static void main(String[] args) throws Exception {// 创建消费者 consumer,指定消费者组名DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");// 指定 Nameserver 地址consumer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 订阅主题 Topic 和 Tagconsumer.subscribe("OrderTopic", "Order");// 设置为负载均衡模式消费
// consumer.setMessageModel(MessageModel.BROADCASTING);// 处理消息consumer.registerMessageListener(new MessageListenerOrderly() {// 对于一个队列的消息,只采用一个线程去处理public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("线程名称:【" + Thread.currentThread().getName() + "】:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});// 启动消费者consumerconsumer.start();System.out.println("消费者启动");}
}
4.2 延时消息
延时消息即消息被提供者发送到队列中后,不会被立即消费,而是在过了设定的时间后,才会被消费者消费
4.2.1 常用场景
延时消费大量用于电商项目,例如:用户要对商品进行付款,若用户一直没有付款,则这个订单就会一直存在于数据库中变成脏数据。这时候可以发送一条延时消息到队列,1h后消费者获取到这条消息,去检查订单状态,若订单还是在未付款状态则取消订单,释放内存。
4.2.2 实现原理
https://blog.csdn.net/hosaos/article/details/90577732
4.2.3 示例代码
ScheduledMessageProducer
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static void main(String[] args) throws Exception {// 实例化一个生产者来产生延时消息DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 启动生产者producer.start();for (int i = 0; i < 10; i++) {Message message = new Message("DelayTopic", ("Hello scheduled message " + i).getBytes());// 设置延时等级3,这个消息将在10s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)message.setDelayTimeLevel(3);// 发送消息SendResult sendResult = producer.send(message);SendStatus status = sendResult.getSendStatus();System.out.println("发送结果: " + status);}// 关闭生产者producer.shutdown();}
}
ScheduledMessageConsumer
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;/*** @File: ScheduledMessageConsumer* @Description:* @Author: tom* @Create: 2020-07-01 11:34**/
public class ScheduledMessageConsumer {public static void main(String[] args) throws Exception {// 实例化消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");consumer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");// 订阅Topicsconsumer.subscribe("DelayTopic", "*");// 注册消息监听者consumer.registerMessageListener(new MessageListenerConcurrently() {public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {for (MessageExt message : messages) {// Print approximate delay time periodSystem.out.println("Receive message[msgId=" + message.getMsgId() + "]:延迟时间 = " + (System.currentTimeMillis() - message.getStoreTimestamp()) + "ms later");}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 启动消费者consumer.start();System.out.println("消费者启动");}
}
4.3 批量消息
批量发送消息能够显著提高传递小消息的性能,限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4M。
只需要将要批量发送的消息放在 List 集合中即可
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.common.message.Message;import java.util.ArrayList;
import java.util.List;public class Producer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("group1");producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");producer.start();List<Message> msgs = new ArrayList<Message>();Message msg1 = new Message("BatchTopic", "Tag1", ("Hello World" + 1).getBytes());Message msg2 = new Message("BatchTopic", "Tag1", ("Hello World" + 2).getBytes());Message msg3 = new Message("BatchTopic", "Tag1", ("Hello World" + 3).getBytes());msgs.add(msg1);msgs.add(msg2);msgs.add(msg3);SendResult sendResult = producer.send(msgs);SendStatus status = sendResult.getSendStatus();System.out.println("发送结果: " + status);Thread.sleep(1);}
}
若是批量消息可能大于4M,则需要对消息进行分割
public class ListSplitter implements Iterator<List<Message>> {private final int SIZE_LIMIT = 1024 * 1024 * 4;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Override public boolean hasNext() {return currIndex < messages.size();}@Override public List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}tmpSize = tmpSize + 20; // 增加日志的开销20字节if (tmpSize > SIZE_LIMIT) {//单个消息超过了最大的限制//忽略,否则会阻塞分裂的进程if (nextIndex - currIndex == 0) {//假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环nextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}
}
使用:
//把大的消息分裂成若干个小的消息
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) {try {List<Message> listItem = splitter.next();producer.send(listItem);} catch (Exception e) {e.printStackTrace();//处理error}
}
4.4 过滤消息
4.4.1 常见语法
ocketMQ只定义了一些基本语法来支持这个特性。你也可以很容易地扩展它。
符号:
* 数值比较,比如:**>,>=,<,<=,BETWEEN,=;**
* 字符比较,比如:**=,<>,IN;**
* **IS NULL** 或者 **IS NOT NULL;**
* 逻辑符号 **AND,OR,NOT;**
常量支持类型为:
* 数值,比如:**123,3.1415;**
* 字符,比如:**'abc',必须用单引号包裹起来;**
* **NULL**,特殊的常量
* 布尔值,**TRUE** 或 **FALSE**
只有使用push模式的消费者才能用使用SQL92标准的sql语句,接口如下:
public void subscribe(finalString topic, final MessageSelector messageSelector)
4.4.2 多个 Tag 消费
consumer.subscribe("FilterTagTopic", "Tag1 || Tag2");
4.4.3 SQL 语法过滤
Provider:
// 创建消息对象,指定主体 Topic、Tag 和消息体
Message message = new Message("FilterTagTopic", "Tag1", ("Hello World" + i).getBytes());
message.putUserProperty("i",String.valueOf(i));
Consumer:
consumer.subscribe("FilterTagTopic", MessageSelector.bySql("i>5"));
4.5 事务消息
4.5.1 事务消息处理流程图
上图说明了事务消息的大致方案,其中分为两个流程:正常事务消息的发送及提交、事务消息的补偿流程。
1)事务消息发送及提交
(1) 发送消息(half消息)。
(2) 服务端响应消息写入结果。
(3) 根据发送结果执行本地事务(如果写入失败,此时half消息对业务不可见,本地逻辑不执行)。
(4) 根据本地事务状态执行Commit或者Rollback(Commit操作生成消息索引,消息对消费者可见)
2)事务补偿
(1) 对没有Commit/Rollback的事务消息(pending状态的消息),从服务端发起一次“回查”
(2) Producer收到回查消息,检查回查消息对应的本地事务的状态
(3) 根据本地事务状态,重新Commit或者Rollback
其中,补偿阶段用于解决消息Commit或者Rollback发生超时或者失败的情况。
4.5.2 三种状态
事务消息共有三种状态,提交状态、回滚状态、中间状态:
-
TransactionStatus.CommitTransaction: 提交事务,它允许消费者消费此消息。
-
TransactionStatus.RollbackTransaction: 回滚事务,它代表该消息将被删除,不允许被消费。
-
TransactionStatus.Unknown: 中间状态,它代表需要检查消息队列来确定状态。
4.5.3 示例代码
事务消息监听类:
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class TransactionListenerImpl implements TransactionListener {public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {System.out.println("执行本地事务");if (StringUtils.equals("Tag1", msg.getTags())) {return LocalTransactionState.COMMIT_MESSAGE;} else if (StringUtils.equals("Tag2", msg.getTags())) {return LocalTransactionState.ROLLBACK_MESSAGE;} else {return LocalTransactionState.UNKNOW;}}public LocalTransactionState checkLocalTransaction(MessageExt msg) {System.out.println("MQ检查消息Tag【"+msg.getTags()+"】的本地事务执行结果");return LocalTransactionState.COMMIT_MESSAGE;}
}
生产者中使用:
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.producer.*;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.tomcat.jni.Local;import java.util.concurrent.TimeUnit;/*** 发送事务消息*/
public class Producer {public static void main(String[] args) throws Exception {//创建事务监听器TransactionListener transactionListener = new TransactionListenerImpl();// 创建消息生产者producer,并指定生产者组名TransactionMQProducer producer = new TransactionMQProducer("group2");// 指定 Nameserver 地址producer.setSendMsgTimeout(6000);producer.setVipChannelEnabled(false);producer.setNamesrvAddr("gtom.top:9876;gtom2.top:9876");//绑定监听器producer.setTransactionListener(transactionListener);String[] tags = {"Tag1", "Tag2", "Tag3"};// 启动 producerproducer.start();for (int i = 0; i < 3; i++) {// 创建消息对象,指定主体 Topic、Tag 和消息体Message message = new Message("TranscationTopic", tags[i], ("TranscationTopic index = " + i).getBytes());// 发送消息SendResult result = producer.sendMessageInTransaction(message, null);// 发送状态、ID以及接受消息的队列的IDSendStatus status = result.getSendStatus();String msgId = result.getMsgId();int queueId = result.getMessageQueue().getQueueId();System.out.println("发送状态: " + status + " 消息ID: " + msgId + " 队列: " + queueId);TimeUnit.SECONDS.sleep(1);}// 关闭 producer
// producer.shutdown();System.out.println("生产者启动");}
}