当前位置: 代码迷 >> 综合 >> JMS(RabbitMQ)的进阶四:Fanout(发布订阅)
  详细解决方案

JMS(RabbitMQ)的进阶四:Fanout(发布订阅)

热度:45   发布时间:2023-12-15 19:14:17.0

??这个模式跟Topic相似,但是它不用routerkey匹配,直接就将队列绑在交换机上了,但凡发到这个交换机上的消息都会统统转到所有绑定到这个交换机上的队列。而且要注意的是,如果有个同名的绑到其他交换机上的队列也被绑到此fanout交换机,它也会收到!

1、常量

//广播public final static String SMS_EXCHANGE_FANOUT ="sms-exchange-fanout";public final static String SMS_QUEUE_FANOUT_1 ="sms-queue_fanout1";public final static String SMS_QUEUE_FANOUT_2 ="sms-queue_fanout2";

2、接收方

 //订阅接收@RabbitListener(bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_FANOUT_1),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_FANOUT,type = ExchangeTypes.FANOUT)))@RabbitHandlerprotected void onFanOutReceive1(Message message, SMSBean sms, Channel channel) throws IOException {
    log.info("=========onFanOutReceive1已接收===========");if (consumer!=null) {
    consumer.accept("FanOut-"+SMSConstants.SMS_QUEUE_FANOUT_1, sms);}}//订阅接收@RabbitListener(bindings = @QueueBinding(value=@Queue(SMSConstants.SMS_QUEUE_FANOUT_2),exchange = @Exchange(value = SMSConstants.SMS_EXCHANGE_FANOUT,type = ExchangeTypes.FANOUT)))@RabbitHandlerprotected void onFanOutReceive2(Message message, SMSBean sms, Channel channel) throws IOException {
    log.info("=========onFanOutReceive2已接收===========");if (consumer!=null) {
    consumer.accept("FanOut-"+SMSConstants.SMS_QUEUE_FANOUT_2, sms);}}

3、发送方

3.a 初始化

//如果交换机不存在,则定义if (!jmsDeclare.isExChangeExists(SMSConstants.SMS_EXCHANGE_FANOUT)){
    jmsDeclare.declareExchange(jmsDeclare.newFanoutExchange(SMSConstants.SMS_EXCHANGE_FANOUT));}if (!jmsDeclare.isQueueExists(SMSConstants.SMS_QUEUE_FANOUT_1)){
    jmsDeclare.initQueue(SMSConstants.SMS_QUEUE_FANOUT_1);jmsDeclare.bindQueue(SMSConstants.SMS_QUEUE_FANOUT_1,SMSConstants.SMS_EXCHANGE_FANOUT,"",null);}if (!jmsDeclare.isQueueExists(SMSConstants.SMS_QUEUE_FANOUT_2)){
    jmsDeclare.initQueue(SMSConstants.SMS_QUEUE_FANOUT_2);jmsDeclare.bindQueue(SMSConstants.SMS_QUEUE_FANOUT_2,SMSConstants.SMS_EXCHANGE_FANOUT,"",null);}

3.b 发送

 @Overridepublic void fanOutSendSMS(SMSBean sms) {
    Message message=ackRabbitTemplate.getMessageConverter().toMessage(sms,null);CorrelationData correlationData= new CorrelationData(message.getMessageProperties().getMessageId());smsConfig.saveSMS(sms,correlationData.getId());    amqpTemplate.convertAndSend(SMSConstants.SMS_EXCHANGE_FANOUT,"",message);}

测试

@Testpublic void testFanOut(){
    receiveSMSService.setOnMessage((s,sms)->{
    System.out.println(s+" 接收:"+sms.toString());});SMSBean sms=SMSUtil.genSMS();sms.setSubject("这是Fanout的消息,欢迎订阅");sendSMSService.fanOutSendSMS(sms);SMSUtil.sleep(2000);}

结果

2022-01-10 12:10:01.531  INFO 19388 --- [ntContainer#6-1] c.f.j.r.service.ReceiveSMSServiceImp     : ============onReceived2接收=========
2022-01-10 12:10:01.531  INFO 19388 --- [ntContainer#1-1] c.f.j.r.service.ReceiveSMSServiceImp     : ============onReceived接收=========
sms.direct-sms-queue-direct 接收:SMSBean(subject=这是Fanout的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 12:09:56 CST 2022)
sms.direct-sms-queue-direct2 接收:SMSBean(subject=这是Fanout的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 12:09:56 CST 2022)
2022-01-10 12:10:01.539  INFO 19388 --- [ntContainer#4-1] c.f.j.r.service.ReceiveSMSServiceImp     : =========onFanOutReceive1已接收===========
FanOut-sms-queue_fanout1 接收:SMSBean(subject=这是Fanout的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 12:09:56 CST 2022)
2022-01-10 12:10:03.439  INFO 19388 --- [ntContainer#2-1] c.f.j.r.service.ReceiveSMSServiceImp     : =========onFanOutReceive2已接收===========
FanOut-sms-queue_fanout2 接收:SMSBean(subject=这是Fanout的消息,欢迎订阅, content=this is a sms message, from=sender1, to=sendto1, timestamp=Mon Jan 10 12:09:56 CST 2022)

代码链接,觉得有用的话请点个Star: https://gitee.com/tigera15/jms-samples