activemq的使用
Posted by lin-nest
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activemq的使用相关的知识,希望对你有一定的参考价值。
1. activemq的使用(点对点)
(1) 创建父工程(pom工程),定义activemq的版本号
<!-- ActiveMQ version, declared once in the parent pom. Presumably this element sits inside the parent's <properties> section so that child modules can refer to it as ${activemq.version}; confirm against the full pom. -->
<activemq.version>5.13.4</activemq.version>
(2) 创建子工程(war工程),引入activemq依赖
<dependencies>
    <!-- ActiveMQ JMS client. The version is taken from the activemq.version
         property defined in the parent pom; without an explicit version (or a
         dependencyManagement entry in the parent) Maven cannot resolve it. -->
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-client</artifactId>
        <version>${activemq.version}</version>
    </dependency>
</dependencies>
(3) 创建producer(生产者)
package com.demo.activemq.producer.service; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqProducer { public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息生产者 MessageProducer producer = session.createProducer(queue); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemq世界"); //8.发送消息 producer.send(textMessage); System.out.println("ActivemqProducer发送了消息"); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
(4) 创建consumer(消费者)
package com.demo.activemq.producer.service; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqConsumer { public static void main(String[] args) throws JMSException, IOException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建队列对象 Queue queue = session.createQueue("test-queue"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(queue); System.out.println("消费者正在运行"); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
(5) 先运行consumer(消费者)
(6) 运行producer(生产者)
(7) 消费者运行结果
2. activemq的使用(发布/订阅)
(1) 创建生产者
package com.demo.activemq.producer.service; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqTopicProducer { public static void main(String[] args) throws JMSException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息生产者 MessageProducer producer = session.createProducer(topic); //7.创建消息 TextMessage textMessage = session.createTextMessage("欢迎来到神奇的activemqTopic世界"); //8.发送消息 producer.send(textMessage); System.out.println("ActivemqTopicProducer发送了消息"); //9.关闭资源 producer.close(); session.close(); connection.close(); } }
(2) 创建topic消费者1
package com.demo.activemq.producer.service; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqTopicConsumer1 { public static void main(String[] args) throws JMSException, IOException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); System.out.println("消费者1正在运行"); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
(3) 创建topic消费者2
package com.demo.activemq.producer.service; import java.io.IOException; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; public class ActivemqTopicConsumer2 { public static void main(String[] args) throws JMSException, IOException { //1.创建连接工厂 ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://120.79.92.203:61616"); //2.获取连接 Connection connection = connectionFactory.createConnection(); //3.启动连接 connection.start(); //4.获取session (参数1:是否启动事务,参数2:消息确认模式) Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //5.创建主题对象 Topic topic = session.createTopic("test-topic"); //6.创建消息消费 MessageConsumer consumer = session.createConsumer(topic); System.out.println("消费者2正在运行"); //7.监听消息 consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { TextMessage textMessage=(TextMessage)message; try { System.out.println("接收到消息:"+textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); //8.等待键盘输入 System.in.read(); //9.关闭资源 consumer.close(); session.close(); connection.close(); } }
(4) 运行,结果略
以上是关于activemq的使用的主要内容,如果未能解决你的问题,请参考以下文章