当前位置: 代码迷 >> 综合 >> ActiveMQ--与Spring和Springboot的整合
  详细解决方案

ActiveMQ--与Spring和Springboot的整合

热度:51   发布时间:2023-09-06 11:54:03.0

一、整合Spring

首先导入pom文件

<properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><spring-version>4.3.7.RELEASE</spring-version></properties>
<dependencies><!--  activeMQ  jms 的支持  --><dependency><groupId>org.springframework</groupId><artifactId>spring-jms</artifactId><version>4.3.23.RELEASE</version></dependency><dependency>    <!--  pool 池化包相关的支持  --><groupId>org.apache.activemq</groupId><artifactId>activemq-pool</artifactId><version>5.15.9</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-beans</artifactId><version>${spring-version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context</artifactId><version>${spring-version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-context-support</artifactId><version>${spring-version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-core</artifactId><version>${spring-version}</version></dependency><dependency><groupId>org.springframework</groupId><artifactId>spring-expression</artifactId><version>${spring-version}</version></dependency><!-- AOP --><!-- 基于代理的AOP支持 --><dependency><groupId>org.springframework</groupId><artifactId>spring-aop</artifactId><version>${spring-version}</version></dependency><!-- 提供与AspectJ的集成 --><dependency><groupId>org.springframework</groupId><artifactId>spring-aspects</artifactId><version>${spring-version}</version></dependency></dependencies>

applicationcontext.xml文件

<!--    注解扫描--><context:component-scan base-package="com.xijian.spring"></context:component-scan>
<!--    配置连接工厂--><bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory"  destroy-method="stop"><property name="connectionFactory"><bean class="org.apache.activemq.ActiveMQConnectionFactory"><property name="brokerURL" value="tcp://192.168.6.10:61616"></property></bean></property><property name="maxConnections" value="100"></property></bean><!-- 队列目的地 --><bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue"><constructor-arg index="0" value="spring-active-queue"></constructor-arg></bean>
<!--    主题目的地--><bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic"><constructor-arg index="0" value="spring-active-topic"></constructor-arg></bean><!--  jms 的工具类,需要用哪种目的地将其配置在defaultDestination中即可 --><bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"><property name="connectionFactory" ref="jmsFactory"/><property name="defaultDestination" ref="destinationQueue"/><property name="messageConverter"><bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/></property></bean>

生产者代码

@Service
public class SpringProducer {@Autowiredprivate JmsTemplate jmsTemplate;public static void main(String[] args) {//加载Spring配置文件ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("applicationcontext.xml");//获取容器中的SpringProduceSpringProducer springProducer = (SpringProducer) ac.getBean("springProducer");springProducer.jmsTemplate.send(new MessageCreator() {@Override//消息创建public Message createMessage(Session session) throws JMSException {TextMessage textMessage = session.createTextMessage("spring 和activeMQ的整合");return textMessage;}});System.out.println("生产者生产消息完毕");}
}

消费者代码

@Service
public class SpringConsumer {@Autowiredprivate JmsTemplate jmsTemplate;public static void main(String[] args) {ClassPathXmlApplicationContext ac = new ClassPathXmlApplicationContext("applicationcontext.xml");SpringConsumer springConsumer = (SpringConsumer)ac.getBean("springConsumer");//这里spring会将消息类型自动转换String str = (String)springConsumer.jmsTemplate.receiveAndConvert();System.out.println(str);}
}

在Spring中如果消费者通过监听模式进行消息的消费,不将消费者线程启动也可以进行消费,需要在spring配置文件中另行配置。

!--配置监听程序--><bean id="jsmContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"><property name="connectionFactory" ref="jmsFactory"></property><property name="destination" ref="destinationTopic"></property><property name="messageListener" ref="myMessageListener"></property></bean>

其中myMessageListener为自己写的监听类,需要继承MessageListener

@Component
public class myMessageListener  implements MessageListener {@Overridepublic void onMessage(Message message) {if(message != null && message instanceof TextMessage){TextMessage textMessage = (TextMessage) message;try {System.out.println(textMessage.getText());} catch (JMSException e) {e.printStackTrace();}}}
}

配置监听器后,启动服务器后立刻消费。

二、整合Springboot

由于queue与topic方式在使用springboot的时候只有配置文件中的内容不一样,其他无差别,所以下文以queue为例子,讲解都在注释中阐明。

1.创建一个Springboot工程,我这里使用的Spring initializer创建的,导入了ActiveMQ5的启动器

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xijian</groupId><artifactId>springboot_activemq</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot_activemq</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-activemq</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope><exclusions><exclusion><groupId>org.junit.vintage</groupId><artifactId>junit-vintage-engine</artifactId></exclusion></exclusions></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

2.配置application.properties配置文件

server.port=7777#连接ActiveMQ服务器
spring.activemq.broker-url=tcp://192.168.6.10:61616
spring.activemq.user=admin
spring.activemq.password=admin#不启用topic模式,不配置默认也是false,需要使用topic模式时再启动
spring.jms.pub-sub-domain=false#队列的名字,如需要使用topic则自行修改即可
myqueue=boot-activemq-queue

3.Spring的主启动类不做过多描述,但加了一个@EnableScheduling是为了开启定时任务,在生产者生产消息的时候为了测试方便,所以设置了每三秒生产一条消息。

package com.xijian.springboot_activemq;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@SpringBootApplication
//开启定时任务功能
@EnableScheduling
public class SpringbootActivemqApplication {public static void main(String[] args) {SpringApplication.run(SpringbootActivemqApplication.class, args);}}

4.写一个配置类,用于注入目的地(queue/topic)对象

//从配置文件中读取属性,并将要操作的目的地注入到容器中
@Component
//开启Jms支持
@EnableJms
public class ConfigBean {//获取配置文件中的myqueue的值@Value("${myqueue}")private String queue;//将目的地对象注入到容器中@Beanpublic ActiveMQQueue queue() throws Exception {return new ActiveMQQueue(queue);}
}

5.生产者

@Component
public class Producer {//注入Springboot提供的操作消息队列的工具类@Autowiredprivate JmsMessagingTemplate jmsMessagingTemplate;//注入目的地对象@Autowiredprivate ActiveMQQueue activeMQQueue;//每三秒发送一条消息@Scheduled(fixedDelay = 3000)public void produce(){jmsMessagingTemplate.convertAndSend(activeMQQueue,"******:"+ UUID.randomUUID().toString().substring(0,6));System.out.println("success");}}

6.消费者,使用监听器去监听消息。

@Component
public class Consumer {//启用监听模式消费消息@JmsListener(destination = "${myqueue}")public void recieve(TextMessage textMessage) throws  Exception{System.out.println("收到消息:"+textMessage.getText());}
}

7.测试,将Springboot的启动类启动,每隔三秒生产者向队列中生产一条消息,消费者监听到就消费。

ActiveMQ--与Spring和Springboot的整合

 

  相关解决方案