当前位置: 代码迷 >> 综合 >> RabbitMQ -- part II
  详细解决方案

RabbitMQ -- part II

热度:94   发布时间:2023-10-13 11:52:31.0

目录:

  1. 消息如何保障100%的投递成功?
  2. 幂等性概念详解
  3. Confirm确认消息
  4. Return返回消息
  5. 自定义消费者
  6. 消息端的限流
  7. 消息的ACK与重回队列
  8. TTL消息
  9. 死信队列

一.消息如何保障100%的投递成功?

什么是生产端的可靠性投递?

  • 保障消息的成功发出
  • 保障MQ节点的成功接收
  • 发送端收到MQ节点(Broker) 确认应答
  • 完善的消息进行补偿机制

生产端-可靠性投递(一)

BAT/TMD互联网大厂的解决方案:

  • **方案1:**消息落库,对消息状态进行打标
  • **方案2:**消息的延迟投递,做二次确认,回调检查

第一种方案:消息落库方案:

RabbitMQ -- part II

上图解释:

Step1:首先将要执行的业务和消息记录进行DB的持久化(不成功的情况下会有相应的快速失败处理机制)

Step2:在Step1成功以后才发送消息到Broker

Step3:Broker会发送一个成功落库的一个响应,生产端会异步的监听这个回送的响应

Step4:如果Step3中没有问题,那么就修改DB里这条消息的状态码为成功。

Step5:如果消息因为各种原因导致消息未能消费成功,那么导致在MSG DB中的消息状态一直为失败,得到这个失败的消息。

Step6:这个分布式定时任务检测消息是否正常执行,比如在5分钟内仍未成功,那么就进行消息的重新发送。

Step7:但是这个重发不能无限制的重发,如果确实是因为路由规则或者业务逻辑导致的错误,那么就不应该持续的重发,所以我们一般设定一个重发次数,在到达这个次数以后仍未成功的,后续人工进行补偿性处理。

思考:

保障MQ我们思考如果第一 种可靠性投递,在高并发的场景下是否适合?

我们不应该对消息也进行一个持久化机制,一般只考虑将业务进行持久化,因为在高并发情况下对消息持久化会对磁盘进行频繁读写,那么也会造成效率问题。

第二种方案:消息的延迟投递,做二_次确认,回调检查

主要目的:减少数据库持久化操作,其实也就是减少了第一种方案的Step1的时间成本。

RabbitMQ -- part II

一定要先对业务消息进行入库以后才能发送消息。另外一点就是一般来讲更关注的是性能,而不是百分之百的消息投递成功

上图流程说明:

首先对业务消息进行持久化

Step1、Step2:同时生成两个消息,一个首先发送,另一个为延迟检查消息,比如5分钟后发送这个延迟消息

Step3:下游消费端DownStream Service监听消息的消费。

Step4:当消息消费成功,这时候会产生一个确认消息,将这个确认消息发送到Broker的指定队列。

Step5:Callback Service监听这个存放确认消息的指定队列,当监听到这个消息已经成功被下游服务消费,那么这个时候就去将这个消息进行一个数据库持久化。

Step6:比如5分钟之后Second Send Delay Check延迟检查的消息发送到指定队列,回调服务监听到了这个消息,这个时候如果Step5一直没有得到正确返回,或者数据库中一直没有更新成功,那么此时Callback Service就会发送一个RPC的重发命令,告诉上游服务消息没有被正常的消费,需要重新发送。

二.幂等性概念详解

RabbitMQ -- part II

乐观锁:比如我们要更新库存表的时候先要去查询库存,当A、B两个并发的事务同一时间点假设查询到的都是version=1,此时A先提交了更新操作,并且将version进行了加1操作,此时Version=2,当事务B再进行更新操作时,查询不到Version=1的记录,所以更新失败。

幂等性:在高并发的业务下保障业务的正常执行。

幂等性的保障:

1.在海量订单产生的业务高峰期,如何避免消息的重复消费问题?

消费端实现幂等性,就意味着,我们的消息永远不会消费多次,即时我们收到了多条一样的消息

2.业界主流的幂等性操作:

  • 唯一ID +指纹码机制,利用数据库主键去重
  • 利用Redis的原子性去实现

RabbitMQ -- part II

分摊数据库的压力

RabbitMQ -- part II

三.Confirm确认消息

理解Confirm消息确认机制:

  • 消息的确认,是指生产者投递消息后,如果Broker收到消息,则会给我们生产者一个应答。
  • 生产者进行接收应答,用来确定这条消息是否正常的发送到Broker,这种方式也是消息的可靠性投递的核心保障!

RabbitMQ -- part II

如何实现Confirm确认消息?

  • 第一步:在channel.上开启确认模式: channel.confirmSelect()
  • 第二步:在channel.上添加监听: addConfirmListener, 监听成功和失败的返回结果,根据具体的结果对消息进行重新发送、或记录日志等后续处理!

代码示例:

生产者代码:

public class Producer {
    public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取C onnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();//4 指定我们的消息投递模式: 消息的确认模式 channel.confirmSelect();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.save";//5 发送一条消息String msg = "Hello RabbitMQ Send confirm message!";channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());//6 添加一个确认监听channel.addConfirmListener(new ConfirmListener() {
    @Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {
    System.err.println("-------no ack!-----------");}@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {
    System.err.println("-------ack!-----------");}});}

消费端代码:

public class Consumer {
    public static void main(String[] args) throws Exception {
    //1 创建ConnectionFactoryConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");//2 获取ConnectionConnection connection = connectionFactory.newConnection();//3 通过Connection创建一个新的ChannelChannel channel = connection.createChannel();String exchangeName = "test_confirm_exchange";String routingKey = "confirm.#";String queueName = "test_confirm_queue";//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Keychannel.exchangeDeclare(exchangeName, "topic", true);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);//5 创建消费者 QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){
    Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费端: " + msg);}}
}

四.Return返回消息

Return消息机制

  • Return Listener用于处理一些不可路由的消息!
  • 我们的消息生产者,通过指定- -个Exchange 和Routingkey,把消息送达到某一个队列中去,然后我们的消费者监听队列,进行消费处理操作!
  • 但是在某些情况下,如果我们在发送消息的时候,当前的exchange不存在或者指定的路由key路由不到,这个时候如果我们需要监听这种不可达的消息,就要使用Return Listener !
  • 在基础API中有一个关键的配置项: .Mandatory:如果为true,则监听器会接收到路由不可达的消息,然后进行后续处理,如果为false,那么broker端自动删除该消息!

RabbitMQ -- part II

示例程序:

生产者程序:

public class Producer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_return_exchange";String routingKey = "return.save";String routingKeyError = "abc.save";String msg = "Hello RabbitMQ Return Message";channel.addReturnListener(new ReturnListener() {
    @Overridepublic void handleReturn(int replyCode, String replyText, String exchange,String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("---------handle return----------");System.err.println("replyCode: " + replyCode);System.err.println("replyText: " + replyText);System.err.println("exchange: " + exchange);System.err.println("routingKey: " + routingKey);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}});//设置一个错误的路由规则就可以返回return消息//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());//设置正确的路由规则就会正常消费消息channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}
}

消费者程序:

public class Consumer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_return_exchange";String routingKey = "return.#";String queueName = "test_return_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);QueueingConsumer queueingConsumer = new QueueingConsumer(channel);channel.basicConsume(queueName, true, queueingConsumer);while(true){
    QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();String msg = new String(delivery.getBody());System.err.println("消费者: " + msg);}}
}

五.自定义消费者

  • 我们一 般就是在代码中编写while循环,进行consumer.nextDelivery方法进行获取下一条消息,然后进行消费处理!
  • 但是我们使用自定义的Consumer更加的方便,解耦性更加的强,也是在实际工作中最常用的使用方式!

使用:继承DefaultConsumer,重写handleDelivery(…)方法即可

示例程序:

生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_consumer_exchange";String routingKey = "consumer.save";String msg = "Hello RabbitMQ Consumer Message";for(int i =0; i<5; i ++){
    channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_consumer_exchange";String routingKey = "consumer.#";String queueName = "test_consumer_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);channel.basicConsume(queueName, true, new MyConsumer(channel));}
}

自定义Consumer实现:

public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
    super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}

六.消费端的限流

假设一个场景,首先,我们Rabbitmq服务器有上万条未处理的消息,我们随便打开-一个消费者客户端,会出现下面情况:巨量的消息瞬间全部推送过来,但是我们单个客户端无法同时处理这么多数据!

RabbitMQ提供了一种qos (服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设Qos的值)未被确认前,不进行消费新的消息。

void BasicQos(uint prefetchSize, ushort prefetchCount, bool global);

  • prefetchSize: 0
  • prefetchCount:会告诉RabbitMQ不要同时给一个消费者推送多于N个消息,即一-旦有N个消息还没有ack,则该consumer将block掉,直到有消息ack
  • global: true\false 是否将上面设置应用于channel
    简单点说,就是上面限制是channe|级别的还是consumer级别

RabbitMQ -- part II

示例程序:

生产者:

public class Producer {
    public static void main(String[] args) throws Exception{
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.98.155.175");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_qos_exchange";String routingKey = "qos.save";String msg = "Hello RabbitMQ QOS Message";for(int i =0; i<5; i ++){
    channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());}}
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("47.98.155.175");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_qos_exchange";String queueName = "test_qos_queue";String routingKey = "qos.#";//声明和绑定channel.exchangeDeclare(exchangeName,"topic",true,false,null);channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName,exchangeName,routingKey);//1 限流方式 第一件事就是 autoAck设置为 false,每次给消费者推送一个消息channel.basicQos(0, 1, false);channel.basicConsume(queueName, false, new MyConsumer(channel));}
}

重写的消费者:

public class MyConsumer extends DefaultConsumer {
    private Channel channel;public MyConsumer(Channel channel) {
    super(channel);this.channel=channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));//确认消息的方法,回调成功以后再执行下一条,表示这条消息我已经处理完了,你可以给我下一条了。false表示不批量签收channel.basicAck(envelope.getDeliveryTag(),false);}

当将channel.basicAck(envelope.getDeliveryTag(),false);这段代码注释掉以后出现下面的状态,显示未确认的有一条消息,准备好的有四条消息。

RabbitMQ -- part II

七.消费端ACK与重回队列

消费端的手工ACK和NACK

  • 消费端进行消费的时候,如果由于业务异常我们可以进行日志的记录,然后进行补偿!
  • 如果由于服务器宕机等严重问题,那我们就需要手工进行ACK保障消费端消费成功!

消费端的重回队列

  • 消费端重回队列是为了对没有处理成功的消息,把消息重新会递给Broker!
  • 一般我们在实际应用中,都会关闭重回队列,也就是设置为False

示例程序:

生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_ack_exchange";String routingKey = "ack.save";for(int i =0; i<5; i ++){
    Map<String, Object> headers = new HashMap<String, Object>();headers.put("num", i);AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").headers(headers).build();String msg = "Hello RabbitMQ ACK Message " + i;channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchangeName = "test_ack_exchange";String queueName = "test_ack_queue";String routingKey = "ack.#";channel.exchangeDeclare(exchangeName, "topic", true, false, null);channel.queueDeclare(queueName, true, false, false, null);channel.queueBind(queueName, exchangeName, routingKey);// 手工签收 必须要关闭 autoAck = falsechannel.basicConsume(queueName, false, new MyConsumer(channel));}
}

重写的消费实现:

public class MyConsumer extends DefaultConsumer {
    private Channel channel ;public MyConsumer(Channel channel) {
    super(channel);this.channel = channel;}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");System.err.println("body: " + new String(body));try {
    Thread.sleep(2000);} catch (InterruptedException e) {
    e.printStackTrace();}if((Integer)properties.getHeaders().get("num") == 0) {
    //multiple:是否批量 requeue:true 是否重回队列,确认失败,重新将消息放到队列尾部执行。channel.basicNack(envelope.getDeliveryTag(), false, true);} else {
    channel.basicAck(envelope.getDeliveryTag(), false);}}}

channel.basicNack(envelope.getDeliveryTag(), false, true);

//multiple:是否批量 requeue:true 是否重回队列,确认失败,重新将消息放到队列尾部执行。

八.TTL消息

  • TTL是Time To Live的缩写,也就是生存时间
  • RabbitMQ支持消息的过期时间,在消息发送时可以进行指定
  • RabbitMQ支持队列的过期时间,从消息入队列开始计算,只要超过了队列的超时时间配置,那么消息会自动的清除

示例:

1.在控制台新建一个exchange和queue

RabbitMQ -- part II

2.添加绑定规则

RabbitMQ -- part II

3.测试发送消息,查看对应过期时间。

注意这里的过期时间不同于前面AMQP.BasicProperties设置过期时间,这里的ttl指的是队列的过期时间,即到这个队列的所有消息十秒后过期,而前面的是指的是消息的过期时间。

九.死信队列

死信队列: DLX, Dead-Letter-Exchange

利用DLX,当消息在一个队列中变成死信(dead message)之后,它能被重新publish到另一个Exchange, 这个Exchange就是DLX

消息变成死信有一下几种情况

  • 消息被拒绝(basic.reject/ basic.nack)并且requeue=false(即消息虽然消息消费失败,但是又不想让消息重新回到队列中)
  • 消息TTL过期
  • 队列达到最大长度

死信队列

  • DLX也是一个正常的Exchange, 和一般的Exchange没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。
  • 当这个队列中有死信时,RabbitMQ就会 自动的将这个消息重新发布到
    设置的Exchange.上去,进而被路由到另一个队列。
  • 可以监听这个队列中消息做相应的处理,这个特性可以弥补RabbitMQ3.0以前支持的immediate参数的功能。

死信队列前提

RabbitMQ -- part II

示例程序:

生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();String exchange = "test_dlx_exchange";String routingKey = "dlx.save";String msg = "Hello RabbitMQ DLX Message";for(int i =0; i<1; i ++){
    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").expiration("10000").build();channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());}}
}

消费者:

public class Consumer {
    public static void main(String[] args) throws Exception {
    ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.11.76");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 这就是一个普通的交换机 和 队列 以及路由String exchangeName = "test_dlx_exchange";String routingKey = "dlx.#";String queueName = "test_dlx_queue";channel.exchangeDeclare(exchangeName, "topic", true, false, null);Map<String, Object> agruments = new HashMap<String, Object>();agruments.put("x-dead-letter-exchange", "dlx.exchange");//这个agruments属性,要设置到声明队列上channel.queueDeclare(queueName, true, false, false, agruments);channel.queueBind(queueName, exchangeName, routingKey);//要进行死信队列的声明:channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);channel.queueDeclare("dlx.queue", true, false, false, null);channel.queueBind("dlx.queue", "dlx.exchange", "#");channel.basicConsume(queueName, true, new MyConsumer(channel));}
}

重写的消费方式代码:

public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
    super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}

运行消费端可以看到我们定义的两个队列和两个交换机:

RabbitMQ -- part II

当我们停止消费端,再去启动发送端的时候,这个时候由于没有消费端进行消费,经过10秒后,这个消息变为了死信队列。如下图。

RabbitMQ -- part II

第三章完结。。。

重写的消费方式代码:

public class MyConsumer extends DefaultConsumer {
    public MyConsumer(Channel channel) {
    super(channel);}@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.err.println("-----------consume message----------");System.err.println("consumerTag: " + consumerTag);System.err.println("envelope: " + envelope);System.err.println("properties: " + properties);System.err.println("body: " + new String(body));}}

运行消费端可以看到我们定义的两个队列和两个交换机:

[外链图片转存中…(img-xaaMHgWs-1631458058357)]

当我们停止消费端,再去启动发送端的时候,这个时候由于没有消费端进行消费,经过10秒后,这个消息变为了死信队列。如下图。

[外链图片转存中…(img-4OkeDIOx-1631458058358)]

第三章完结。。。

  相关解决方案