1.导依赖包
<dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-all</artifactId><version>5.15.10</version>
</dependency>
2.消息生产者
public class Sender {
public static void main(String[] args) throws JMSException, InterruptedException {
// 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Queue queue = session.createQueue("lance");//5. 从会话中创建消息提供者MessageProducer producer = session.createProducer(queue);int i = 1;try {
while (true) {
//从会话中创建文本消息(也可以创建其它类型的消息体)TextMessage textMessage = session.createTextMessage("hello" + i);Thread.sleep(1000);// 通过消息提供者发送消息到ActiveMQproducer.send(textMessage);i++;}} catch (JMSException e) {
e.printStackTrace();} catch (InterruptedException e) {
e.printStackTrace();} finally {
// 关闭连接connection.close();}System.out.println("exit");}
}
3.消息消费者
public class Recviver {
public static void main(String[] args) throws JMSException {
// 1. 建立工厂对象ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_BROKER_URL,ActiveMQConnectionFactory.DEFAULT_PASSWORD,"tcp://localhost:61616");//2. 从工厂里获取一个连接Connection connection = factory.createConnection();connection.start();//3. 从连接中获取Session(会话)Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);//4. 从会话中获取目的地(Destination),消费者会从这个目的地取消息Destination destination = session.createQueue("lance");//5. 从会话中创建消息消费者MessageConsumer consumer = session.createConsumer(destination);//6. 持续的消费try {
while (true) {
TextMessage textMessage = (TextMessage) consumer.receive();System.out.println("消费:" + textMessage.getText());}} catch (Exception e) {
e.printStackTrace();} finally {
//关闭连接connection.close();}}
}