当前位置: 代码迷 >> 综合 >> 【RocketMQ】(二) 顺序消息
  详细解决方案

【RocketMQ】(二) 顺序消息

热度:108   发布时间:2023-11-17 12:48:23.0

消息有序:

        指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。 

 顺序消费的原理解析

         在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。

         如果控制发送的顺序消息只依次发送到同一个queue 中,消费的时候只从这个queue上依次拉取,则就保证了顺序。

        (1)当发送和消费参与的 queue只有一个,则是全局有序;

       (2)如果多个queue参与,则为分区有序,即相对每个 queue,消息都是有序的。

我个人的理解就是

       默认情况下,发送消息是采用轮询的方式去发送消息,发送不同的queue(队列)中,假如数据库有一张user表中没有任何数据,此时新增一条数据:id=1,name="张三";再修改为id=1,name="李四";最后再删除id=1,name="李四";也就是id为1的这条数据,经过了三次数据库操作,如果数据库的每一次操作都需要发送消息到队列当中,假设有queueA,queueB,queueC,这时候会将三条消息发送到三个不同的队列当中,消费的话,也是在三个queue中进行拉取,rocketmq这么做,你在业务处理是没有办法,判断是先新增呢,还是先修改,还是先删除,这个顺序你就没有办法去保证;

     有两种办法去解决,<1>如果不考虑性能的话,全局消息应该是最容易想到的一种办法,因为队列天然的先进先出的优势就可以满足这点;新增一条数据:id=1,name="张三";再修改为id=1,name="李四";最后再删除id=1,name="李四";进去队列是这个顺序,取出来也肯定是这个顺序。<2>分区消息:就是根据某一个key,比如orderId,userId等这种sharding-key来做分区,拿上面的例子来说,这个id就可以作为一个sharding-key,根据不同的sharding-key发送到不同的队列当中,因此在每个队列当中,这个数据都是有序的。

下面来一张分区消息原理的图片:

这个原理图是采用订单的创建、付款、完成的例子,根据orderId这个key,将orderId=2001订单的一系列操作发送到其中一个queue,将orderId=3001订单的一系列操作发送到另一个queue中;这个分配的规则,可以通过取模;

 

例子:张三和李四到银行去存取钱,然后银行发送短信告诉用户,余额多少?

分析:肯定是先存钱,账户有钱后,才能取钱,取完钱或者存完钱后,银行给用户发送短信,这个顺序是不能乱的,总不可能用户账户没有钱,还没有存,就可以取钱了或者就收到了存钱或者取钱短信了。

 生产者代码:

/*** @author lucifer* @date 2020/4/14 14:15* @description 顺序消息 -生产者*/
public class OrderedProducer {public static final String NAME_SERVER_ADDR = "192.168.160.131:9876";public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {//1.创建生产者对象,并指定组名DefaultMQProducer defaultMQProducer = new DefaultMQProducer("GROUP_TEST");//2.指定NameServer地址defaultMQProducer.setNamesrvAddr(NAME_SERVER_ADDR);//3.启动生产者defaultMQProducer.start();//设置异步发送失败重试次数,默认为2次defaultMQProducer.setRetryTimesWhenSendAsyncFailed(0);//4.定义消息队列选择器MessageQueueSelector messageQueueSelector = new MessageQueueSelector() {/*** 消息队列选择器,保证同一条业务数据的消息在同一队列当中* @param list topic中所有队列的集合* @param message   发送的消息* @param o 此参数是本示例中defaultMQProducer.send()中的第三个参数* @return*/@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {Integer id = (Integer) o;//通过id与topic中所有队列的集合的大小进行取模,求出索引值indexint index = id % list.size();//分区顺序:同一个index的消息在同一个队列当中return list.get(index);//全局消息,放在同一个queue//return list.get(0);}};String[] tags = new String[]{"TagA", "TagB", "TagC"};List<Map> bizData = getBizData();//5.循环发送消息for (int i = 0; i < bizData.size(); i++) {Map bizMap = bizData.get(i);// keys:业务数据的ID,比如用户ID、订单编号等等Message message = new Message("TopicTest", tags[i % tags.length],bizMap.get("userId") + "",bizMap.toString().getBytes(RemotingHelper.DEFAULT_CHARSET));//发送有序消息SendResult sendResult = defaultMQProducer.send(message, messageQueueSelector, bizMap.get("userId"));System.out.printf("%s, body:%s%n", sendResult, bizMap);}}private static List<Map> getBizData() {List<Map> orders = new ArrayList<>();HashMap orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "银行存钱,存1000");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "短信提示:存1000,账户余额1000");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "银行取钱,取300");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "短信提示:取300,账户余额700");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "银行存钱,存400");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1000);orderData.put("userName", "张三");orderData.put("description", "短信提示:存款400,账户余额1100");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1001);orderData.put("userName", "李四");orderData.put("description", "银行存钱,存600");orders.add(orderData);orderData = new HashMap();orderData.put("userId", 1001);orderData.put("userName", "李四");orderData.put("description", "短信提示:存款600,账户余额600");orders.add(orderData);return orders;}}

 启动生产者,控制台打印:

(1).如果是全局消息,这里的queueId是一样的,放在了同一个queue中;

SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2464469920000, offsetMsgId=C0A8A08300002A9F000000000000C1CB, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=26], body:{description=银行存钱,存1000, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2464469980001, offsetMsgId=C0A8A08300002A9F000000000000C2E6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=27], body:{description=短信提示:存1000,账户余额1000, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24644699B0002, offsetMsgId=C0A8A08300002A9F000000000000C414, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=28], body:{description=银行取钱,取300, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24644699D0003, offsetMsgId=C0A8A08300002A9F000000000000C52E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=29], body:{description=短信提示:取300,账户余额700, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24644699F0004, offsetMsgId=C0A8A08300002A9F000000000000C65A, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=30], body:{description=银行存钱,存400, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2464469A10005, offsetMsgId=C0A8A08300002A9F000000000000C776, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=31], body:{description=短信提示:存款400,账户余额1100, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2464469A30006, offsetMsgId=C0A8A08300002A9F000000000000C8A4, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=32], body:{description=银行存钱,存600, userName=李四, userId=1001}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC2464469A50007, offsetMsgId=C0A8A08300002A9F000000000000C9C0, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=0], queueOffset=33], body:{description=短信提示:存款600,账户余额600, userName=李四, userId=1001}

 (1).如果是分区消息,这里的queueId是不一样的,放在了不同queue中;

SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E14E0000, offsetMsgId=C0A8A08300002A9F000000000000E653, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=42], body:{description=银行存钱,存1000, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E1540001, offsetMsgId=C0A8A08300002A9F000000000000E76E, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=43], body:{description=短信提示:存1000,账户余额1000, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E1560002, offsetMsgId=C0A8A08300002A9F000000000000E89C, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=44], body:{description=银行取钱,取300, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E1590003, offsetMsgId=C0A8A08300002A9F000000000000E9B6, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=45], body:{description=短信提示:取300,账户余额700, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E15C0004, offsetMsgId=C0A8A08300002A9F000000000000EAE2, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=46], body:{description=银行存钱,存400, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E15F0005, offsetMsgId=C0A8A08300002A9F000000000000EBFE, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=8], queueOffset=47], body:{description=短信提示:存款400,账户余额1100, userName=张三, userId=1000}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E1620006, offsetMsgId=C0A8A08300002A9F000000000000ED2C, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=9], queueOffset=45], body:{description=银行存钱,存600, userName=李四, userId=1001}
SendResult [sendStatus=SEND_OK, msgId=24098A1E7A3ECE402C3D4AADC1A10079000018B4AAC24648E1650007, offsetMsgId=C0A8A08300002A9F000000000000EE48, messageQueue=MessageQueue [topic=TopicTest, brokerName=localhost.localdomain, queueId=9], queueOffset=46], body:{description=短信提示:存款600,账户余额600, userName=李四, userId=1001}

消费者代码:

/*** @author lucifer* @date 2020/4/14 14:50* @description 顺序消息 -消费者*/
public class OrderedConsumer {public static final String NAME_SERVER_ADDR = "192.168.160.131:9876";public static void main(String[] args) throws MQClientException {//1.创建消费者DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("GROUP_TEST");//2.设置NameServer地址defaultMQPushConsumer.setNamesrvAddr(NAME_SERVER_ADDR);//设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费// 如果不是第一次启动,那么按照上次消费的位置继续消费defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);//3.订阅对应的主题和TagdefaultMQPushConsumer.subscribe("TopicTest", "TagA || TagB || TagC");defaultMQPushConsumer.setMaxReconsumeTimes(-1);//顺序消费消息defaultMQPushConsumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list,ConsumeOrderlyContext consumeOrderlyContext) {consumeOrderlyContext.setAutoCommit(true);doBiz(list.get(0));return ConsumeOrderlyStatus.SUCCESS;}});// 5. 启动消费者(必须在注册完消息监听器后启动,否则会报错)defaultMQPushConsumer.start();System.out.println("已启动消费者");}/*** 模拟处理业务** @param message*/public static void doBiz(Message message) {try {System.out.printf("线程:%-25s 接收到新消息 %s --- %s %n", Thread.currentThread().getName(), message.getTags(), new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
}

 启动消费者,控制台打印:

已启动消费者
线程:ConsumeMessageThread_1    接收到新消息 TagA --- {description=银行存钱,存1000, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagB --- {description=短信提示:存1000,账户余额1000, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagC --- {description=银行取钱,取300, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagA --- {description=短信提示:取300,账户余额700, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagB --- {description=银行存钱,存400, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagC --- {description=短信提示:存款400,账户余额1100, userName=张三, userId=1000} 
线程:ConsumeMessageThread_1    接收到新消息 TagA --- {description=银行存钱,存600, userName=李四, userId=1001} 
线程:ConsumeMessageThread_1    接收到新消息 TagB --- {description=短信提示:存款600,账户余额600, userName=李四, userId=1001}