当前位置: 代码迷 >> 综合 >> ActiveMQ学习3-Spirng集成ActiveMQ/Spirng boot集成ActiveMQ
  详细解决方案

ActiveMQ学习3-Spirng集成ActiveMQ/Spirng boot集成ActiveMQ

热度:83   发布时间:2023-12-16 05:00:38.0

Spirng集成ActiveMQ/Spirng boot集成ActiveMQ

  • 18 Spring 集成 ActiveMQ
    • 18.1 点对点同步接收
    • 18.2 点对点异步接收
    • 18.3 发布订阅同步接收
    • 18.4 发布订阅异步接收
  • 19 Spring boot 集成 ActiveMQ
    • 19.1 点对点同步接收
    • 19.2 点对点异步接收
    • 19.3 发布订阅同步接收
    • 19.4 发布订阅异步接收

??

18 Spring 集成 ActiveMQ

18.1 点对点同步接收

1、加入Spring集成jms依赖

<dependencies><!-- JMS规范的jar依赖 --><dependency><groupId>javax.jms</groupId><artifactId>javax.jms-api</artifactId><version>2.0.1</version></dependency><!-- activeMQ对jms具体实现的jar依赖 --><dependency><groupId>org.apache.activemq</groupId><artifactId>activemq-client</artifactId><version>5.15.8</version></dependency><!--slf4j日志的简易实现--><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-simple</artifactId><version>1.7.21</version></dependency><!--spring-jms依赖--><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.13.RELEASE</version></dependency>
</dependencies>

2、创建Spring配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd"><!--导入jms配置文件--><import resource="applicationContext-jms.xml"></import></beans>
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:context="http://www.springframework.org/schema/context"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsdhttp://www.springframework.org/schema/contexthttp://www.springframework.org/schema/context/spring-context.xsd"><!--扫描service包,将service对象注入到spring容器中--><context:component-scan base-package="com.activemq.service.sender"></context:component-scan><!-- 配置一个连接工厂 --><bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.64.128:61616"/><!--<property name="userName" value="system"/>--><!--<property name="password" value="A123456"/>--></bean><!-- 配置JmsTemplate --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /><property name="defaultDestinationName" value="mySpringQueue" /></bean><!--在spring中配置jms和配置Redis非常相似-->
</beans>

3、创建消息发送类

@Service
public class MessageSendService {
    @AutowiredJmsTemplate jmsTemplate;/*** 发送消息*/public void sendMessage(){
    jmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage("Hello-Spring-ActiveMQ");}});}
}

4、创建消息接受类

@Service
public class MessageReciveService {
    @AutowiredJmsTemplate jmsTemplate;public void reciveMessage(){
    //同步接收,阻塞方法,没有接收到消息就等待,接到了消息就返回Message message = jmsTemplate.receive();if(message instanceof TextMessage){
    try {
    String text = ((TextMessage) message).getText();System.out.println("接收到的消息为:" + text);} catch (JMSException e) {
    e.printStackTrace();}}}
}

5、测试类

public class MessageSendTest {
    public static void main(String[] args) {
    //加载spring容器ApplicationContext ac = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");//获得serviceBean对象MessageSendService messageSendService = (MessageSendService) ac.getBean("messageSendService");//调用发消息的方法messageSendService.sendMessage();}
}

18.2 点对点异步接收

1、创建maven工程,加入相关依赖(同上)
2、创建spring配置文件(同上)
3、创建jms配置文件

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://www.springframework.org/schema/beanshttp://www.springframework.org/schema/beans/spring-beans.xsd"><!-- 配置一个连接工厂 --><bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.187.128:61616"/></bean><!-- 配置JmsTemplate 无法实现异步接收 --><!--<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /><property name="defaultDestinationName" value="springQueue" /></bean>--><!--在spring下集成jms 与spring集成redis 非常相似--><!--实现异步接收消息--><!-- 配置一个sping监听器的容器 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destinationName" value="springQueue"/><!--消息监听器--><property name="messageListener" ref="myMessageListener" /></bean><!--自定义的一个消息监听器--><bean id="myMessageListener" class="com.activemq.listener.MyMessageListener"/></beans>

4、创建消息监听器

public class MyMessageListener implements MessageListener {
    /*** 是spring中的监听器容器监听到消息后自动回调该onMessage方法,并且将监听到的消息传给该方法** @param message*/public void onMessage(Message message) {
    if (message instanceof TextMessage) {
    String text = null;try {
    text = ((TextMessage) message).getText();} catch (JMSException e) {
    e.printStackTrace();}//可以进行业务的处理(省略业务处理,比如插入数据库,更新状态等都属于业务处理)System.out.println("异步->接收到的消息为:" + text);}}
}

5、发送与接收测试

18.3 发布订阅同步接收

<!-- 配置JmsTemplate --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="connectionFactory" /><property name="defaultDestinationName" value="mySpringQueue" /><!--订阅模式 默认 为false(点对点)--><property name="pubSubDomain" value="true"/></bean>

18.4 发布订阅异步接收

<!--实现异步接收消息--><!-- 配置一个sping监听器的容器 --><bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="connectionFactory"/><property name="destinationName" value="springQueue"/><!--消息监听器--><property name="messageListener" ref="myMessageListener" /><!--false为点对点--><property name="pubSubDomain" value="true"/></bean>

19 Spring boot 集成 ActiveMQ

19.1 点对点同步接收

1、创建Springboot项目添加相关依赖

<!--springboot集成activemq的起步依赖-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId>
</dependency>

2、配置springboot核心配置文件

#activemq的连接地址
spring.activemq.broker-url=tcp://192.168.64.128:61616#消息的目的地
spring.jms.template.default-destination=springbootQueue#退出main方法的主程序
spring.jms.cache.enabled=fals

3、消息发送类

@Service
public class SendMessageService {
    /*** JmsTemplate是springboot框架自动帮我们配置的,我们可以直接注入使用** springboot自动配置前提条件:* 1、加入spring-jms,activemq相关的jar包依赖* 2、application.properties配置了相关连接信息*/@Autowiredprivate JmsTemplate jmsTemplate;/*** 发送消息**/public void sendMessage () {
    jmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    //创建消息return session.createTextMessage("Hello Spring ActiveMQ....");}});}
}

4、消息接收类

@Service
public class ReceiveMessageService {
    @Autowiredprivate JmsTemplate jmsTemplate;/*** 接收消息**/public void receiveMessage () {
    //同步接收Message message = jmsTemplate.receive();if (message instanceof TextMessage) {
    try {
    String text = ((TextMessage) message).getText();System.out.println("接收到的消息是:" + text);} catch (JMSException e) {
    e.printStackTrace();}}}
}

5、测试类

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
    //run方法运行后会启动一个spring容器ApplicationContext context = SpringApplication.run(Application.class, args);//从spring容器中获取发送消息的service的beanSendMessageService sms = context.getBean("sendMessageService", SendMessageService.class);//调用bean中的发送消息的方法实现消息发送sms.sendMessage();//接收消息的service//ReceiveMessageService rms = context.getBean("receiveMessageService", ReceiveMessageService.class);//接收消息//rms.receiveMessage();}
}

19.2 点对点异步接收

1、第一种实现方式

@Component
public class MessageReceiveService {
    /*** 实现异步接收消息* JmsTemplate无法实现异步接收* 使用@JmsListener注解的方法,就是一个监听器方法,可以实现消息的异步接收*/@JmsListener(destination="${spring.jms.template.default-destination}")public void receiveMessage (Message message) {
    if (message instanceof TextMessage) {
    try {
    String text = ((TextMessage) message).getText();System.out.println("@JmsListener注解实现异步接收消息:" + text);} catch (JMSException e) {
    e.printStackTrace();}}}
}

2、第二种方式

@Component
public class MyMessageListener implements MessageListener {
    /*** 是spring中的监听器容器监听到消息后自动回调该onMessage方法,并且将监听到的消息传给该方法** @param message*/public void onMessage(Message message) {
    if (message instanceof TextMessage) {
    String text = null;try {
    text = ((TextMessage) message).getText();} catch (JMSException e) {
    e.printStackTrace();}//可以进行业务的处理(省略业务处理,比如插入数据库,更新状态等都属于业务处理)System.out.println("springboot异步->接收到的消息为:" + text);}}
}

2、配置类

@Configuration // 该注解相当于是原来一个spring的xml配置文件
public class ActiveMQConfig {
    /*** <!-- 配置一个连接工厂 -->* <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">* <property name="brokerURL" value="tcp://192.168.187.128:61616"/>* </bean>** 该bean的配置在springboot框架下是自动配置好的,不需要我们手动配置*/@Autowiredprivate ConnectionFactory connectionFactory;@Value("${spring.jms.template.default-destination}")private String destination;@Autowiredprivate MyMessageListener myMessageListener;/*** <!--实现异步接收消息-->** <!-- 配置一个sping监听器的容器 -->* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory"/>* <property name="destinationName" value="springTopic"/>* <!--消息监听器-->* <property name="messageListener" ref="myMessageListener" />* <!--pubSubDomain=false表示点对点消息,true表示发布订阅消息,默认是false-->* <property name="pubSubDomain" value="true"/>* </bean>*/@Beanpublic DefaultMessageListenerContainer defaultMessageListenerContainer () {
    DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();defaultMessageListenerContainer.setConnectionFactory(connectionFactory);defaultMessageListenerContainer.setDestinationName(destination);defaultMessageListenerContainer.setMessageListener(myMessageListener);return defaultMessageListenerContainer;}/*** <!--自定义的一个消息监听器-->* <bean id="myMessageListener" class="com.activemq.listener.MyMessageListener"/>** 该配置通过注解已经实现了*/
}

19.3 发布订阅同步接收

#activemq的连接地址
spring.activemq.broker-url=tcp://192.168.64.128:61616#消息的目的地
spring.jms.template.default-destination=springbootQueue#退出main方法的主程序
spring.jms.cache.enabled=fals#开启订阅模式,默认是点对点
spring.jms.pub-sub-domain=true

19.4 发布订阅异步接收

```java
@Component
public class MyMessageListener implements MessageListener {
    /*** 是spring中的监听器容器监听到消息后自动回调该onMessage方法,并且将监听到的消息传给该方法** @param message*/public void onMessage(Message message) {
    if (message instanceof TextMessage) {
    String text = null;try {
    text = ((TextMessage) message).getText();} catch (JMSException e) {
    e.printStackTrace();}//可以进行业务的处理(省略业务处理,比如插入数据库,更新状态等都属于业务处理)System.out.println("springboot异步->接收到的消息为:" + text);}}
}

2、配置类

@Configuration // 该注解相当于是原来一个spring的xml配置文件
public class ActiveMQConfig {
    /*** <!-- 配置一个连接工厂 -->* <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">* <property name="brokerURL" value="tcp://192.168.187.128:61616"/>* </bean>** 该bean的配置在springboot框架下是自动配置好的,不需要我们手动配置*/@Autowiredprivate ConnectionFactory connectionFactory;@Value("${spring.jms.template.default-destination}")private String destination;@Value("${spring.jms.pub-sub-domain}")private boolean pubSubDomain;@Autowiredprivate MyMessageListener myMessageListener;/*** <!--实现异步接收消息-->** <!-- 配置一个sping监听器的容器 -->* <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">* <property name="connectionFactory" ref="connectionFactory"/>* <property name="destinationName" value="springTopic"/>* <!--消息监听器-->* <property name="messageListener" ref="myMessageListener" />* <!--pubSubDomain=false表示点对点消息,true表示发布订阅消息,默认是false-->* <property name="pubSubDomain" value="true"/>* </bean>*/@Beanpublic DefaultMessageListenerContainer defaultMessageListenerContainer () {
    DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();defaultMessageListenerContainer.setConnectionFactory(connectionFactory);defaultMessageListenerContainer.setDestinationName(destination);defaultMessageListenerContainer.setMessageListener(myMessageListener);//发布订阅模式true,点对点模式falsedefaultMessageListenerContainer.setPubSubDomain(pubSubDomain);return defaultMessageListenerContainer;}/*** <!--自定义的一个消息监听器-->* <bean id="myMessageListener" class="com.activemq.listener.MyMessageListener"/>** 该配置通过注解已经实现了*/
}
  相关解决方案