activemq的使用
Posted liuyi13535496566
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq的使用相关的知识,希望对你有一定的参考价值。
一.常用的消息队列:
1 activemq java,apache
2 rabbitmq c的mq
3 kafuka 大数据mq
4 zeromq 简单版的mq
5 mateMq 基于amqp
6 RocketMQ 阿里
二.mq的使用
1 解压和启动mq
activeMq start
三 .mq的角色
producer消息的发送者
Comsumer消息的消费者
queue方式; 把消息发给activemq服务器,消费端监听到只要有一个执行完成其他就不会再执行了.
topic方式: 把消息发给activemq服务器,消费端监听到都会执行.
四: 加入Pom依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.2</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
五:整合activemq客户端到项目中
@Configuration
public class ActiveMQConfig {
@Value("${spring.activemq.broker-url:disabled}")
String brokerURL ;
@Value("${activemq.listener.enable:disabled}")
String listenerEnable;
@Bean
public ActiveMQUtil getActiveMQUtil() throws JMSException {
if(brokerURL.equals("disabled")){
return null;
}
ActiveMQUtil activeMQUtil=new ActiveMQUtil();
activeMQUtil.init(brokerURL);
return activeMQUtil;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory ) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
if(!listenerEnable.equals("true")){
return null;
}
factory.setConnectionFactory(activeMQConnectionFactory);
//设置并发数
factory.setConcurrency("5");
//重连间隔时间
factory.setRecoveryInterval(5000L);
factory.setSessionTransacted(false);
factory.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
return factory;
}
@Bean
public ActiveMQConnectionFactory activeMQConnectionFactory ( ){
/* if((url==null||url.equals(""))&&!brokerURL.equals("disabled")){
url=brokerURL;
}*/
ActiveMQConnectionFactory activeMQConnectionFactory =
new ActiveMQConnectionFactory( brokerURL);
return activeMQConnectionFactory;
}
}
public class ActiveMQUtil {
PooledConnectionFactory pooledConnectionFactory=null;
public ConnectionFactory init(String brokerUrl) {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);
//加入连接池
pooledConnectionFactory=new PooledConnectionFactory(factory);
//出现异常时重新连接
pooledConnectionFactory.setReconnectOnException(true);
//
pooledConnectionFactory.setMaxConnections(5);
pooledConnectionFactory.setExpiryTimeout(10000);
return pooledConnectionFactory;
}
public ConnectionFactory getConnectionFactory(){
return pooledConnectionFactory;
}
}
例子:
支付controller 层支付成功后,发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等,例如在支付的service层发送消息告诉订单需要更改状态, 然后在订单服务里面写component注解,里面实现监听更改业务即可.
@RequestMapping("alipay/callback/return")
public String alipay_callback(Model model, HttpServletRequest request){
String alipay_trade_no = request.getParameter("trade_no");//支付宝的交易单号
String order_sn = request.getParameter("out_trade_no");// 外部订单号total_amount
String pay_amount = request.getParameter("total_amount");
// 更新支付信息
PaymentInfo paymentInfo = new PaymentInfo();
// 交易单号
// 支付状态
String payment_status = "已支付";
// 回调内容
String callback_content = request.getQueryString();
// 回调时间
Date callback_time = new Date();
paymentInfo.setOrderSn(order_sn);
paymentInfo.setPaymentStatus(payment_status);
paymentInfo.setCallbackTime(callback_time);
paymentInfo.setAlipayTradeNo(alipay_trade_no);
paymentInfo.setCallbackContent(callback_content);
paymentInfo.setTotalAmount(new BigDecimal(pay_amount));
paymentService.updatePayment(paymentInfo);
// 发送系统消息 =》 更新订单状态 锁定商品库存 物流订单等等
paymentService.sendPaymentResult(paymentInfo);
return "finish";
}
@Autowired
ActiveMQUtil activeMQUtil;
@Override
public void sendPaymentResult(PaymentInfo paymentInfo) {
ConnectionFactory connectionFactory = activeMQUtil.getConnectionFactory();
Connection connection = null;
Session session = null;// 开启消息事务
Queue paymentResultQueue = null; // 队列
MessageProducer producer = null;
try {
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
paymentResultQueue = session.createQueue("PAYMENT_SUCCESS_QUEUE");
//text文本格式,map键值格式
MapMessage mapMessage=new ActiveMQMapMessage();
mapMessage.setString("out_trade_no",paymentInfo.getOrderSn());
mapMessage.setDouble("pay_amount",paymentInfo.getTotalAmount().doubleValue());
producer = session.createProducer(paymentResultQueue);// 消息的生成者
producer.send(mapMessage);
session.commit();
} catch (JMSException e) {
e.printStackTrace();
}finally {
try {
producer.close();
session.close();
connection.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
@Component
public class OrderConsumer {
@Autowired
OrderService orderService;
@JmsListener(containerFactory = "jmsQueueListener",destination = "PAYMENT_SUCCESS_QUEUE")
public void consumePaymentSuccess(MapMessage mapMessage) throws JMSException {
String out_trade_no = mapMessage.getString("out_trade_no");
Double pay_amount = mapMessage.getDouble("pay_amount");
// 根据支付状态,更新订单信息
OmsOrder omsOrder = new OmsOrder();
omsOrder.setPayAmount(new BigDecimal(pay_amount));
omsOrder.setPaymentTime(new Date());
omsOrder.setOrderSn(out_trade_no);
omsOrder.setStatus("1");
orderService.updateOrder(omsOrder);
System.out.println("已监听到"+out_trade_no+"号订单,订单消费PAYMENT_SUCCESS_QUEUE队列");
}
}
以上是关于activemq的使用的主要内容,如果未能解决你的问题,请参考以下文章