当前位置: 代码迷 >> java >> 春天-等待Rabbitmq消息监听器完成
  详细解决方案

春天-等待Rabbitmq消息监听器完成

热度:51   发布时间:2023-07-17 21:04:41.0

我有两个消息侦听器,每个侦听器都在同一RabbitMQ服务器上侦听自己的消息队列。

现在,当第一个侦听器在其队列中接收到一条消息时,它必须询问其他侦听器是否在其队列中有任何消息要处理,并且如果有,请等待第二个侦听器处理此消息,然后再继续自己的消息执行。

我需要这样的东西:

SimpleMessageListenerContainer first;
SimpleMessageListenerContainer second;
if(second.hasReceivedMessageButItsStillNotProcessed){
  Thread.waitUntilSecondProcessesAMessage();
}

或像这样

while(rabbitAdmin.getQueueProperties("qName").get("UNACKED_MSG_COUNT")>0){
  Thread.waitUntilSecondProcessesAMessage();
}

任何帮助表示赞赏。

如果您使用的是Spring,则可以 (第二个答案)中使用此来获取队列消息大小。 您可以尝试这样的事情:

Spring队列配置:

<jms:listener-container 
    connection-factory="jmsFactory"  
    destination-type="queue1">
        <jms:listener destination="queue1" ref="consumer1" method="consume"/>
</jms:listener-container>
<bean id="consumer1" class="m.p.e.MessageConsumer1" scope="tenant"/>
<jms:listener-container 
    connection-factory="jmsFactory"  
    destination-type="queue2" >
        <jms:listener destination="queue2" ref="consumer2" method="consume"/>
</jms:listener-container>
<bean id="consumer2" class="m.p.e.MessageConsumer2" scope="tenant"/>

StatsCounter作为 (第二个答案)

public class QueueStatsProcessor {
    @Autowired
    private RabbitAdmin admin;
    @Autowired
    private List<Queue> rabbitQueues;

    public Integer getCounts(String queueName){
        //find your queue from rabbitQueues
        //return the 
        props = admin.getQueueProperties(queue.getName());
        return Integer.parseInt(props.get("QUEUE_MESSAGE_COUNT");        
    }
}

您的第二个使用者必须等待,直到queue1上没有消息queue1

public class MessageConsumer2{
@Autowired
QueueStatsProcessor queueStatesProcesor;

public void consume(){
    while(queueStatesProcesor.getCounts("queue1")  > 0){
        Thread.sleep(1000);
    }
    //do your own work
}

}

我认为这种逻辑“手动”管理起来有点复杂,我建议您使用诸如或类的东西。