说明
前面介绍过ActiveMQ 的p2p(点对点)模式,p2p模式只允许一条消息给一个消费者使用,能够保证消息不被重复消费(开启事务时需要特殊处理才能保证)。今天我再介绍一种发布订阅模式,即一条消息能够同时让多个消费者同时消费,只要消费者订阅了topic消息,那么只要有消息发送过来,所有订阅了topic的消费者能同时接收到,但是这个需要保证消费者实时在线。
依赖
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.10</version>
</dependency>
1.生产者:
public class TopicSender {public static void main(String[] args) throws JMSException {// 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第一个参数为是否开启事务//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Destination topic = session.createTopic("lance");//5. 从会话中创建消息提供者MessageProducer producer = session.createProducer(topic);int i = 1;try {while (i <= 20) {//从会话中创建文本消息(也可以创建其它类型的消息体)TextMessage textMessage = session.createTextMessage("hello" + i);producer.send(textMessage);i++;}} catch (JMSException e) {e.printStackTrace();} finally {// 关闭连接connection.close();}System.out.println("exit");}
}
2.消费者:
public class TopicRecviver {public static void main(String[] args) throws JMSException {// 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616"); //2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //第一个参数为是否开启事务//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Destination topic = session.createTopic("lance");//5. 从会话中创建消息提供者MessageConsumer consumer = session.createConsumer(topic);//6. 持续的消费try {while (true) {TextMessage textMessage = (TextMessage) consumer.receive();System.out.println("消费:" + textMessage.getText());}} catch (Exception e) {e.printStackTrace();} finally {//关闭连接connection.close();}}
}