好久没写博客了,最近有时间研究了下JMS,顺便花了点时间用springboot搭了一个Demo,不得不再次感叹Springboot的简易方便以及功能强大,废话说不多说进入正题吧(PS:请忽略我的排版啊,csdn的排序实在玩不转,.. 将就看下吧~)
首先我就不啰嗦什么是消息队列、以及JMS的一些概念了~google上面一大堆~
1、安装ActiveMQ
我是在windows系统上搭建的环境,首先去Apache官网下载安装包,然后解压到本地,附上下载链接http://activemq.apache.org/download.html
解压之后的目录结构如下:
然后进入bin 目录下会有win32跟win64两个文件夹。两个文件夹下的activemq.bat就是对应32位跟64位操作系统的启动脚本了,双击activemq.bat运行结果如下
这样就是启动成功了!但是有一点要注意,计算机名不能是中文,否则就启动失败,下午就踩过这样的坑。。。
成功之后在浏览器输入http://127.0.0.1:8161/地址,可以看到ActiveMQ的管理页面,用户名和密码默认都是admin,如下:
2、新建一个Maven工程,并在pom文件添加包,配置如下:
<!-- JMS activemq --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency>
这里使用的是ActiveMQ 。
3、application.yml 配置
spring:# activemq 消息队列配置activemq:broker-url: tcp://127.0.0.1:61616user: adminpassword: adminjms:pub-sub-domain: false #为true时是topic模式,为false是queue模式,若想同时支持两种模式可替换queue和topic的ContainerFactory,@JmsListener注解后指定CotainerFactory
4、消息的发送方式
消息发送方式有两种方式
①、queue模式
②、topic模式
@Configuration
public class ActiveMqQueueConfig {@Beanpublic Queue queue(){return new ActiveMQQueue("queue");}@Beanpublic Topic topic(){return new ActiveMQTopic("topic");}}
5、消息生产者,这里queue模式跟topic模式是一起写的
package com.kayak.demo.jms;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;import javax.jms.Queue;
import javax.jms.Topic;/*** @author: wd* @Description: TODO*/
@RestController
@RequestMapping(value = "/api", method= RequestMethod.POST)
public class ActiveMqClient {private Logger log = LoggerFactory.getLogger(ActiveMqClient.class);//JmsMessagingTemplate 对 jmsTemplate 进行了封装@Autowiredprivate JmsMessagingTemplate jmsTemplate;@Autowiredprivate Queue queue;@Autowiredprivate Topic topic;/*** queue模式 生产者** @param message*/@RequestMapping("/sendQueue")public void sendQueue(@RequestBody String message){log.info("生产了一条点对点消息:" + message);jmsTemplate.convertAndSend(queue, message);}/*** topic模式 生产者** @param message*/@RequestMapping("/sendTopic")public void sendTopic(@RequestBody String message){log.info("生产了一条订阅消息:" + message);jmsTemplate.convertAndSend(topic, message);}}
6、消息消费者
package com.kayak.demo.jms;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;/*** @author: wd* @Description: TODO*/
@Component
public class ActiveMqServer {private Logger log = LoggerFactory.getLogger(ActiveMqServer.class);@JmsListener(destination = "queue")public void receiveQueue(String message) {log.info("receiveQueue监听到Queue数据:" + message);}@JmsListener(destination = "topic")public void receiveTopic_1(String message) {log.info("receiveTopic_1 监听到Topic数据:" + message);}@JmsListener(destination = "topic")public void receiveTopic_2(String message) {log.info("receiveTopic_2 监听到Topic数据:" + message);}@JmsListener(destination = "topic")public void receiveTopic_3(String message) {log.info("receiveTopic_3 监听到Topic数据:" + message);}@JmsListener(destination = "topic")public void receiveTopic_4(String message) {log.info("receiveTopic_4 监听到Topic数据:" + message);}
}
7、单元测试
测试queue模式
package com.kayak.demo;import com.kayak.demo.jms.ActiveMqClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** @author: wd* @Description: TODO*/@RunWith(SpringJUnit4ClassRunner.class) // SpringJUnit支持,由此引入Spring-Test框架支持!
@SpringBootTest(classes = DemoApplication.class) // 指定我们SpringBoot工程的Application启动类
public class MQTest {@AutowiredActiveMqClient activeMqClient;@Testpublic void doBusi() throws Exception {//for(int i=0; i<10; i++){activeMqClient.sendQueue("这是一条Queue的消息");//activeMqClient.sendTopic("这是一条Topic的消息");//}}}
日志如下:
测试topic模式
修改application.yml 配置
pub-sub-domain: true
package com.kayak.demo;import com.kayak.demo.jms.ActiveMqClient;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;/*** @author: wd* @Description: TODO*/@RunWith(SpringJUnit4ClassRunner.class) // SpringJUnit支持,由此引入Spring-Test框架支持!
@SpringBootTest(classes = DemoApplication.class) // 指定我们SpringBoot工程的Application启动类
public class MQTest {@AutowiredActiveMqClient activeMqClient;@Testpublic void doBusi() throws Exception {//for(int i=0; i<10; i++){//activeMqClient.sendQueue("这是一条Queue的消息");activeMqClient.sendTopic("这是一条Topic的消息");//}}}
日志如下: