1.为什么要用事务?
消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性。
可以在connection的createSession方法中指定一个布尔值开启,如果消息确认机制是事务确认,那么在发送message的过程中session就会开启事务(实际上broker的),不用用户显示调用 beginTransaction,这时所有通过session发送的消息都被缓存下来,用户调用session.commit时会发送所有消息,当发送出现异常时用户可以调用rollback进行回滚操作,只有在开启事务状态下有效。
为什么commit之后,不会有持久的消息重新传送呢?
原因在于commit操作会自动将为签收确认的消息进行签收确认,如果是当前接收但未签收确认的消息,都会被确认处理。因而在commit之后不会有持久化的消息出现。
2.activeMQ支持的事务:
ActiveMQ有支持两种事务,
- JMS transactions - the commit() / rollback() methods on a Session (which is like doing commit() / rollback() on a JDBC connection)
- XA Transactions - where the XASession acts as an XAResource by communicating with the Message Broker, rather like a JDBC Connection takes place in an XA transaction by communicating with the database.
在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。所以ActiveMq的事务是针对broker而不是producer的,不管session是否commit,broker都会收到message。
如果producer发送模式选择了persistent,那么message过期后会进入死亡队列。在message进入死亡队列之前,ActiveMQ会删除message中的transaction ID,这样过期的message就不在事务中了,不会保存在transaction store中,会直接进入死亡队列。具体删除transaction ID的地方是在:org.apache.activemq.util.BrokerSupport的doResend,将transaction ID保存在了originalTransactionID中,删除了transaction ID。
在下面的介绍中我用的是JMS transactions.
JMS transactions事务的配置:
①建立JMS事务,并引入关联链接事务。
②.设置一个jmsTamplat,并关联监听容器。
-
- <bean id="jmsTransactionManager"
- class="org.springframework.jms.connection.JmsTransactionManager">
- <property name="connectionFactory" ref="connectionFactory" />
- </bean>
- <tx:annotation-driven transaction-manager="jmsTransactionManager" />
-
-
-
- <bean id="jmsContainerOne" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
- <property name="connectionFactory" ref="connectionFactory" />
- <property name="destination" ref="destinationOne" />
- <property name="messageListener" ref="consumerMessageListenerOfOne" />
-
-
- <property name="transactionManager" ref="jmsTransactionManager" />
- <property name="sessionAcknowledgeMode" value="4"></property>
-
- <property name="concurrency" value="2-3" />
- </bean>
上面配置文件配置完成后,在接收着那边接受消息失败后,进行事务回滚。
具体实现:
- public void onMessage(Message message, Session session) {
- TextMessage textMsg = (TextMessage) message;
- try {
- System.out.println(1);
- String endStr = textMsg.getText();
- Integer endInt = Integer.parseInt(endStr);
- System.out.println("消息:==="+endInt);
- //只要被确认后 就会出队,接受失败没有确认成功,会在原队列里面
- textMsg.acknowledge();
-
- } catch (Exception e) {
- try {
- session.rollback();
- System.out.println("测试回滚");
- e.printStackTrace();
- System.out.println("异常信息是:===:" + e.getMessage());
- }
- }
这就介绍完了。
另外,activeMQ还有一种JtaTransactionManager 事务控制。
事务控制: (这里让接收消息和数据库访问处于同一事务中)我们就可以配置一个外部的事务管理同时配置一个支持外部事务管理的消息监听容器(如DefaultMessageListenerContainer) 。要配置这样一个参与分布式事务管理的消息监听容器,我们可以配置一个JtaTransactionManager,当然底层的JMS ConnectionFactory需要能够支持分布式事务管理, 并正确地注册我们的JtaTransactionManager。这样消息监听器进行消息接收和对应的数据库访问就会处于同一数据库控制下,当消息接收失败或数据库访问失败都会进行事务回滚操作。
这种事务控制,我在配置的时候,失败了,所以选用了JMS事务控制。
待我详细了解JtaTransactionManager 后,再说吧。
原文地址:http://blog.csdn.net/dly1580854879/article/details/68945997