Spring整合activeMQ消息队列
Posted 依米欧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring整合activeMQ消息队列相关的知识,希望对你有一定的参考价值。
1.配置JMS
<!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory"/> </bean> <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供--> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://localhost:61616"/> </bean> <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory --> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory --> <property name="targetConnectionFactory" ref="targetConnectionFactory"/> </bean>
发送信息到activeMQ
@Override public void addNotifyCashToMq(final String notifyUrl, final String cashId, final String reqSn, final String callResult,int count) { //发送的参数final String callBackUrl = SuperAppConstant.TRANSACTION_CALLBACK_PREFIX_URL + notify_url_notifyCash + notifyUrl + "&cashId=" + cashId + "&reqSn=" + reqSn + "&callResult=" + callResult + "&count=" + _count;
//发送消息到queue_notifuCash_serial消息队列 jmsTemplate.send(queue_notifyCash_serial, new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("notifyUrl=" + notifyUrl + ",cashId=" + cashId + ",reqSn=" + reqSn + ",callResult=" + callResult + ",_count=" + _count); } HashMap map = new HashMap(); map.put("callBackUrl", callBackUrl); ObjectMessage objectMessage = session.createObjectMessage();//创建消息 objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);//延时,delay为延时时长,以毫秒为单位
return objectMessage; } }); }
xml配置信息
<!-- ActiveMQ 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.broker_url}" /> </bean> <!-- Spring Caching 连接工厂 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory" /> <property name="sessionCacheSize" value="10" /> </bean> <!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory" /> </bean>
2.destination消息队列定义
<description>Queue定义</description> <bean id="queue_callback_serial" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>queue_callback_serial</value> </constructor-arg> </bean>
3。监听器BatchJob
3.1 jms.xml
<description>JMS简单应用配置</description> <!-- ActiveMQ 连接工厂 --> <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="${jms.broker_url}" /> </bean> <!-- Spring Caching 连接工厂 --> <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory"> <property name="targetConnectionFactory" ref="connectionFactory" /> <property name="sessionCacheSize" value="10" /> </bean> <!-- Queue定义 --> <bean id="orderQueueProducer" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg value="order.queue.producer" /> </bean> <!-- Spring JMS Template --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory" ref="cachingConnectionFactory" /> <property name="defaultDestination" ref="orderQueueProducer" /> </bean> <!-- 使用Spring JmsTemplate的消息生产者 --> <bean id="orderProducerJmsService" class="com.gmall88.server.jms.order.impl.OrderProducerJmsServiceImpl"> <property name="jmsTemplate" ref="jmsTemplate" /> </bean> <!-- 定义消息队列 --> <bean id="orderQueueListener" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg> <value>order.queue.listener</value> </constructor-arg> </bean>
3.2 监听器impl
import java.util.Map; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.ObjectMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.gmall88.server.wxpay.RF; import net.sf.json.JSONObject; public class NotifyCashManagerImpl implements MessageListener { private Logger logger = LoggerFactory.getLogger(getClass()); @Override public void onMessage(Message message) { if(logger.isDebugEnabled()){ logger.debug("new callback start.."); } if(message !=null){ if(message instanceof ObjectMessage){ ObjectMessage objectMessage = (ObjectMessage) message;//监听消息 try { Map param = (Map)objectMessage.getObject(); String callBackUrl = (String)param.get("callBackUrl");//取出消息里的参数 if (logger.isInfoEnabled()) { logger.info("callBackUrl=" + callBackUrl); } JSONObject jsonObject = RF.httpsRequestJson(callBackUrl, "POST", "");//通过http回调方法 if(jsonObject != null){ logger.info("code:"+jsonObject.getString("code")); logger.info("message="+jsonObject.getString("message")); } } catch (Exception e) { logger.error(e.getMessage(),e); } }else{ logger.error("Unknown message, type=" + message.getClass().getName()); } }else{ logger.error("message is null"); } } }
回调方法:
@RequestMapping(value = "/notifyCash", method = RequestMethod.POST) @ResponseBody public Object notifyCash(String notifyUrl, String cashId, String reqSn, String cashResult,int count) { ReturnResult returnResult = new ReturnResult(); String clientId = "superApp_notifyOrder"; try { clientId += cashId; returnResult = recordRequestCheck(clientId); if(returnResult != null){ return returnResult; } returnResult = new ReturnResult(); try{ // 回调业务系统 try { superAppServerManager.notifyCash(notifyUrl, cashId, reqSn, cashResult); } catch (Exception e) { // 回调失败,做延时回调 logger.error(e.getMessage(), e); superAppServerManager.addNotifyCashToMq(notifyUrl, cashId, reqSn, cashResult, count); } }finally{ recordRequestEnd(clientId); } } catch (GmallException e) { returnResult.setCodeNum(e.getCode()); returnResult.setMessage(e.getMessage()); } catch (Exception e) { logger.error(e.getMessage(), e); returnResult.setCode(ReturnCodeType.FAILURE) .setMessage(e.getMessage()); } logger.info("called.."); return returnResult; }
整理了一下整个流程如图所示:
4.ActiveMq持久化(这里只考虑持久化为mysql方式)
把mysql的驱动方法ActiveMQ的lib文件下,如:mysql-connector-java-5.0.4-bin.jar
配置文件添加:
<persistenceAdapter> <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/> </persistenceAdapter>
ActiveMq连接数据库相关配置
<bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close"> <property name="driverClassName" value="com.mysql.jdbc.Driver"/> <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> <property name="username" value="activemq"/> <property name="password" value="activemq"/> <property name="maxActive" value="200"/> <property name="poolPreparedStatements" value="true"/> </bean>
以上是关于Spring整合activeMQ消息队列的主要内容,如果未能解决你的问题,请参考以下文章
框架篇——Spring整合ActiveMQ(MQ服务端与消费端演示)