ActiveMQ使用
Posted lzylcf
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ使用相关的知识,希望对你有一定的参考价值。
ActiveMQ介绍
Active是一种消息中间件,有两种模式,一种点对点模式 发布者将发布的消息发送给服务器,等待用户监听并接受数据;第二种订阅模式 发布者将消息发布给消息服务器,让服务器将所有的数据直接转发给再监听的用户,进行一对多通信(类似微信公众号)。
点对点模式:
发布者发布8条信息,这时有3个用户在监听服务器消息,则3个用户共同消费这8条消息。服务器中的每条消息只能被一个用户消费,这种模式服务器会存储发布者发布的数据,当未被用户接收的数据则会留在服务器中,等待下个监听服务器的用户接收数据。
订阅模式(持久订阅模式/非持久订阅模式):
发布者发布消息给消息服务器,消息服务器则将消息直接转发给监听的用户,这要求发布者发布消息的同时用户也在监听消息,若没有用户监听, 则不保留数据,认为数据已发送完成。也就是发布者发布时,用户没在监听消息,则不会在收到该数据。即使用户以后再监听也接收不到
持久订阅模式:订阅者会注册一个clientId,当订阅者离线时,ActiveMQ会为这个 ID 保存所有拥有这个ID的主题的消息,当订阅者连接时,则会通过自己的clientId得到所有自己处于离线时所要接收主题消息
非持久订阅模式:只有当订阅者处于连接状态才会接收到发布者发布出来的消息,并且发送完成后ActiveMQ则将消息丢弃。
在程序中使用ActiveMQ
从官网中下载activeMQ,下载地址:http://activemq.apache.org/download.html
解压后,打开目录下的bin,根据自己的系统选择win32或win64安装Active服务,并开启activeMQ
开启后浏览器访问该地址:http://127.0.0.1:8161/,选择 Manage ActiveMQ broker,输入账号密码,默认都是admin
创建一个maven项目,在pom.xml文件中引入jar包
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency>
点对点模式
创建一个消费者和一个生产者的类
Consumer.java 消费者通过消息监听器监听服务器上的信息
package cn.lcf.activeMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Hello world! * */ public class Consumer { //设置连接地址 private static final String url = "tcp://127.0.0.1:61616"; //设置消息队列名称 private static final String queueName = "queue-text"; public static void main(String[] args) throws JMSException { // 1、创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2、创建连接对象 Connection createConnection = connectionFactory.createConnection(); // 3、启动连接 createConnection.start(); // 4、创建会话 createSession第一个参数表示是否支持事务,第二个参数是客户端接收确认模式,Session.AUTO_ACKNOWLEDGE是自动确认,Session.CLIENT_ACKNOWLEDGE 客户端通过调用消息的 acknowledge 方法签收消息。 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、创建消息目标 Queue createQueue = createSession.createQueue(queueName); // 6 、创建消费者 MessageConsumer createConsumer = createSession.createConsumer(createQueue); // 7、设置消费者监听 createConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息为" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); } }
producer.java 生产者跟消费者差不多,这是第6步开始,变成创建生产者,并发送消息,发送完成之后需要关闭连接。
package cn.lcf.activeMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; /** * Hello world! * */ public class Producer { private static final String url = "tcp://127.0.0.1:61616"; private static final String queueName = "queue-text"; public static void main( String[] args ) throws JMSException { // 1、创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); // 2、创建连接对象 Connection createConnection = connectionFactory.createConnection(); // 3、启动连接 createConnection.start(); // 4、创建会话 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5、创建消息目标 Queue createQueue = createSession.createQueue(queueName); // 6、创建生产者 MessageProducer createProducer = createSession.createProducer(createQueue); for (int i=0;i<100;++i) { // 7、创建消息 TextMessage textMessage = createSession.createTextMessage("666 " + i); // 8、发布消息 createProducer.send(textMessage); System.out.println("发送的消息为:" + "666 " + i); } // 9、关闭连接 createConnection.close(); } }
运行下生产者的类 Producer.java,将消息存到ActiveMQ服务器上
查看ActiveMQ中的队列信息
拥有100条信息,未出列,这时候运行一个消费者(Consumer.java)去消费这100条信息
查看ActiveMQ上的信息,100条信息全被这个1个消费者接收
清空,然后同时运行三个消费者(Consumer.java执行三次后可在console切换不同类的控制台),在运行一个生产者
可以看到这100条消息被这三个消费者平分了。
点对点模式主要用于消除程序高并发高峰对数据库造成的巨大压力,可以通过使用消息队列,让消费者进程从消息队列中获取数据,然后异步将数据写入数据库,由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。
订阅模式(非持久订阅)
将消费者(Consumer.java)修改成以下内容,写法相同,只是session不再是创建队列消费者,而是创建主题消费者
package cn.lcf.TestActiveMQ; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消费者 * */ public class Consumer { private static final String URL = "tcp://127.0.0.1:61616"; //订阅模式名称 private static final String topicName = "topic-name"; public static void main(String[] args) throws JMSException { //创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //创建连接 Connection createConnection = connectionFactory.createConnection();//打开连接 createConnection.start(); //创建会话 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建发布/订阅模式消息 Topic createTopic = createSession.createTopic(topicName);// 非持久订阅 //创建消费者 MessageConsumer createConsumer = createSession.createConsumer(createTopic); //设置消费者监听 createConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收的消息为:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } }); } }
将生产者(Producer.java)修改成以下内容
package cn.lcf.TestActiveMQ; import java.util.Enumeration; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; import org.apache.activemq.ActiveMQConnectionFactory; /** * 生产者 * */ public class Producer { private static final String URL = "tcp://127.0.0.1:61616"; //发布/订阅模式名称 private static final String topicName = "topic-name"; public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); Connection createConnection = connectionFactory.createConnection(); createConnection.start(); Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //发布/订阅模式 Topic createTopic = createSession.createTopic(topicName); MessageProducer createProducer = createSession.createProducer(createTopic); for (int i = 0; i < 100; i++) { TextMessage textMessage = createSession.createTextMessage("666 " + i); createProducer.send(textMessage); System.out.println("发送的消息为:" + textMessage.getText()); } createConnection.close(); } }
之后先运行消费者,在运行生产者,消费者才能接受到信息,否则生产者发布信息时若没有在监听的消费者则会将信息丢弃,这样消费者是接收不到信息的。
同时运行多个消费者,在运行生产者,消费者将获取生产者发布的所有消息
订阅模式(持久订阅)
持久订阅模式的客户端需要创建一个链接id,以保证服务器确认该客户端是否已消费信息,创建完订阅模式,之后不再是创建一个消费者,而是创建一个带有id的用户,这个用户id是唯一的,若有两个相同的id连接,则会报错。
public static void main(String[] args) throws JMSException { //创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); //创建连接 Connection createConnection = connectionFactory.createConnection(); //创建客户端ID createConnection.setClientID("333"); //打开连接 createConnection.start(); //创建会话 Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建发布/订阅模式消息 Topic createTopic = createSession.createTopic(topicName); //创建持久订阅 即未在发布者发布时监听消息,在之后也能接收消息 TopicSubscriber subscriber = createSession.createDurableSubscriber(createTopic, "333"); subscriber.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { TextMessage textMessage = (TextMessage) message; try { System.out.println("接受消息:" + textMessage.getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }); }
生产者需要将消息模式设为持久订阅模式
public static void main(String[] args) throws JMSException { ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); Connection createConnection = connectionFactory.createConnection(); createConnection.start(); Session createSession = createConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //发布/订阅模式 Topic createTopic = createSession.createTopic(topicName); MessageProducer createProducer = createSession.createProducer(createTopic); //设置为持久订阅模式 createProducer.setDeliveryMode(DeliveryMode.PERSISTENT); for (int i = 0; i < 100; i++) { TextMessage textMessage = createSession.createTextMessage("666 " + i); createProducer.send(textMessage); System.out.println("发送的消息为:" + textMessage.getText()); } createConnection.close(); }
运行用户(Consumer.java),创建连接id,之后将用户连接关闭,启动生产者(Producer.java)发布消息,最后在重新连接用户获取信息。当用户离线状态时,发布者发布的消息会将信息存在activeMQ服务器上,等待用户监听时将消息发送给用户。
生成的用户会在subscribers中显示
运行发布者(Producer.java),并且用户处于离线状态,则会显示消息等待出列。
最后再次连接上用户(Consumer.java),则用户的能立即获取消息。
持久传输和非持久传输最大的区别是:采用持久传输时,传输的消息会保存到磁盘中(messages are persisted to disk/database),即“存储转发”方式。先把消息存储到磁盘中,然后再将消息“转发”给订阅者。
采用非持久传输时,发送的消息不会存储到磁盘中。
采用持久传输时,当Borker宕机 恢复后,消息还在。采用非持久传输,Borker宕机重启后,消息丢失。比如,当生产者将消息投递给Broker后,Broker将该消息存储到磁盘中,在Broker将消息发送给Subscriber之前,Broker宕机了,如果采用持久传输,Broker重启后,从磁盘中读出消息再传递给Subscriber;如果采用非持久传输,这条消息就丢失了。
以上是关于ActiveMQ使用的主要内容,如果未能解决你的问题,请参考以下文章