当前位置: 代码迷 >> 综合 >> SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认
  详细解决方案

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

热度:50   发布时间:2023-09-29 03:01:11.0

相关知识

交换器

  • direct:一对一
  • fanout:一对多
  • topic:模式匹配

目标

整合 Spring boot 提供的 spring-boot-starter-amqp,实现消息发送、消息消费、确认

准备工作

安装RabbitMQ

请自行根据windows或者Linux进行安装

操作步骤

添加依赖

添加后的整体依赖如下

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

配置

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

编码(消息发送方)

定义 Exchange、Queue并将两者进行关联

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

 

Controller 层代码

引入 spring-boot-starter-amqp 时,会自动注册 RabbitTemplate 到 Spring 容器,

消息发送可以借助其提供的 convertAndSend 方法

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

启动类

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

 

编码(消息消费方)

定义 Queue

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

定义消费类

  • 在类上使用注解 @RabbitListener,声明监听的队列
  • 在处理方法上使用注解 @RabbitHandler,标记该方法为回调处理方法

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

启动类

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

测试消费消息

使用 Application 类启动项目,在日志中可以看到消费消息时产生的日志

DirectReceiver消费者收到消息 : {msgId=ccf1f1c0-f8c5-483a-933e-ed3d77d59333, msgData=null, sendTime=2020-01-07 19:50:23}

去 RabbitMQ 管理后台,查看 Queues 标签页,可以看到队列 TestDirectQueue 的 Ready 值变成了 0,表示消息已经被消费。

编码(消息消费方实现消息确认)

消息接收的确认机制主要存在三种模式:

  1. 自动确认,默认值,RabbitMQ 将消息发送给应用程序,即认为消费成功,如果应用程序在消费消息的过程中,发生异常,RabbitMQ 是无法感知的,依然会将该消息从队列中删除,但实际上应用程序又没有消费成功,相当于丢失了消息。
  2. 不确认,忽略
  3. 手动确认,实际生产多数选择的模式,应用程序接收到消息并进行处理后,返回一个响应(ACK),RabbitMQ 接收到这个响应后,判断是消费成功,还是失败,并调用相应的回调方法进行处理。
  • basic.ack 用于肯定确认
  • basic.nack 用于否定确认
  • basic.reject 用于否定确认,但与 basic.nack 相比有一个限制,一次只能拒绝单条消息

配置

设置消息消费 ACK 模式为自动模式

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

定义消费类

basicReject 的第二个参数是 requeue,意思是是否重新加入队列,

如果为 true,则表示本次消费不成功,并将当前消息重新加入至当前队列,

如果为 false,则表示本次消费不成功,并将当前消息丢弃,如果有设置死信队列,则会进入死信队列(关于死信队列,在下一章会讲)

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

源码地址

本章源码 :https://github.com/caiyuanzi-song/boot.git

结束语

消息队列是实际生产中是必备组件,用于保证系统高可用、高性能、可扩展

  • 解耦:类比观察者模式,比如用户注册成功后,需要记录日志、发送短信,原始操作就是在用户注册成功后,分别调用记录日志及发送短信的方法,使用消息队列后,则可以在用户注册成功后,发送一个注册成功的消息,而注册成功的后续操作则可以订阅该消息队列,分别实现各自的处理逻辑。如果业务变更,增加了新业务,则不用修改原来的代码,而只需要增加一个订阅即可。
  • 异步:用户注册成功后需要发送短信,而发送短信是一个非常耗时的操作,这不仅会影响到用户体验,因为短信发送过程中,程序长时间阻塞,还可能造成后台资源紧张。使用消息队列,异步处理短信发送,则可以完美解决这个问题。这个保证系统高性能的一种常用手段。
  • 削峰/限流:用户注册成功后需要发送短信,而发送短信是一个非常耗时的操作,如果此时有大量请求请求该接口,可能会导致系统挂掉。使用消息队列,异步进行处理,则可以避免这个问题,如果流量特别大,异步一个一个处理都会导致系统宕机,还可以限流,在发送消息时先检测队列是否已满,如果已满,则直接给前端返回错误。

扩展

发送 Fanout 消息

定义队列

Fanout 交换器没有 Routing,直接将队列与交换器进行关联即可

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

发送方法

也是调用 convertAndSend 方法,只是 routing 参数传值 null

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

发送 Topic 消息

定义队列

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

发送方法

跟使用 direct 交换器一样,只是 topic 交互器会根据 routing 进行匹配,然后决定将消息发送至哪些队列

SpringBoot2.0实战(11)整合RabbitMQ之ACK消息确认

  相关解决方案