问题描述
我有两个消息侦听器,每个侦听器都在同一RabbitMQ服务器上侦听自己的消息队列。
现在,当第一个侦听器在其队列中接收到一条消息时,它必须询问其他侦听器是否在其队列中有任何消息要处理,并且如果有,请等待第二个侦听器处理此消息,然后再继续自己的消息执行。
我需要这样的东西:
SimpleMessageListenerContainer first;
SimpleMessageListenerContainer second;
if(second.hasReceivedMessageButItsStillNotProcessed){
Thread.waitUntilSecondProcessesAMessage();
}
或像这样
while(rabbitAdmin.getQueueProperties("qName").get("UNACKED_MSG_COUNT")>0){
Thread.waitUntilSecondProcessesAMessage();
}
任何帮助表示赞赏。
1楼
如果您使用的是Spring,则可以 (第二个答案)中使用此来获取队列消息大小。 您可以尝试这样的事情:
Spring队列配置:
<jms:listener-container
connection-factory="jmsFactory"
destination-type="queue1">
<jms:listener destination="queue1" ref="consumer1" method="consume"/>
</jms:listener-container>
<bean id="consumer1" class="m.p.e.MessageConsumer1" scope="tenant"/>
<jms:listener-container
connection-factory="jmsFactory"
destination-type="queue2" >
<jms:listener destination="queue2" ref="consumer2" method="consume"/>
</jms:listener-container>
<bean id="consumer2" class="m.p.e.MessageConsumer2" scope="tenant"/>
StatsCounter作为 (第二个答案) :
public class QueueStatsProcessor {
@Autowired
private RabbitAdmin admin;
@Autowired
private List<Queue> rabbitQueues;
public Integer getCounts(String queueName){
//find your queue from rabbitQueues
//return the
props = admin.getQueueProperties(queue.getName());
return Integer.parseInt(props.get("QUEUE_MESSAGE_COUNT");
}
}
您的第二个使用者必须等待,直到queue1
上没有消息queue1
:
public class MessageConsumer2{
@Autowired
QueueStatsProcessor queueStatesProcesor;
public void consume(){
while(queueStatesProcesor.getCounts("queue1") > 0){
Thread.sleep(1000);
}
//do your own work
}
}
我认为这种逻辑“手动”管理起来有点复杂,我建议您使用诸如或类的东西。