ActiveMQ之队列和主题发布订阅实例
Posted zengnansheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ之队列和主题发布订阅实例相关的知识,希望对你有一定的参考价值。
JMS 消息模型
JMS消息服务应用程序结构支持两种模型:点对点模型,发布者/订阅者模型。
(1)点对点模型(Queue)
一个生产者向一个特定的队列发布消息,一个消费者从这个队列中依次读取消息。
模型特点:只有一个消费者获得消息。
(2)发布者/订阅者模型(Topic)
0个或多个订阅者可以接受特定主题的消息。
模型特点:多个消费者可获得消息。
Topic和Queue的最大区别在于Topic是以广播的形式,通知所有在线监听的客户端有新的消息,没有监听的客户端将收不到消息;而Queue则是以点对点的形式通知多个处于监听状态的客户端中的一个。
JMS消息格式
-
MapMessage -- key-value键值对
-
TextMessage -- 字符串对象
-
ObjcetMessage -- 一个序列化的Java对象
-
ByteMessage -- 一个未解释字节的数据流
-
StreamMessage -- Java原始值的数据流
增加maven依赖
<dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.10.0</version> </dependency>
点对点
生产者
package com.zns.activemq.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; // 消息创建者 MessageProducer messageProducer; try { factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createQueue("Queue1"); // 创建消息生产者 messageProducer = session.createProducer(destination); // 创建TextMessage消息实体 TextMessage message = session.createTextMessage("hello,world!"); messageProducer.send(message); session.commit(); System.out.println("生产了消息..."); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消费者
package com.zns.activemq.queue; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 获取连接实例 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(消费者就不需要开启事务了) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建队列(返回一个消息目的地) destination = session.createQueue("Queue1"); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); //注册消息监听 consumer.setMessageListener(new MyListerner()); } catch (JMSException e) { e.printStackTrace(); } } }
消息监听器
package com.zns.activemq.queue; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class MyListerner implements MessageListener{ public void onMessage(Message message) { try { System.out.println(((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
发布订阅
生产者
package com.zns.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) { // 连接工厂 ConnectionFactory factory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; // 消息创建者 MessageProducer messageProducer; try { factory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 获取连接实例 connection = factory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(创建session的时候定义是否要启用事务,且事务类型是Auto_ACKNOWLEDGE也就是消费者成功在Listern中获得消息返回时,会话自动确定用户收到消息) session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // 创建Topic destination = session.createTopic("Topic1"); // 创建消息生产者 messageProducer = session.createProducer(destination); // 创建TextMessage消息实体 TextMessage message = session.createTextMessage("hello,world!"); messageProducer.send(message); session.commit(); System.out.println("生产了消息..."); } catch (JMSException e) { e.printStackTrace(); } finally { if (connection != null) { try { connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
消费者
package com.zns.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer1 { public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 获取连接实例 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(消费者就不需要开启事务了) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建Topic destination = session.createTopic("Topic1"); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); //注册消息监听 consumer.setMessageListener(new Listerner1()); } catch (JMSException e) { e.printStackTrace(); } } } package com.zns.activemq.topic; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer2 { public static void main(String[] args) { // 连接工厂 ConnectionFactory connectionFactory; // 连接实例 Connection connection = null; // 收发的线程实例 Session session; // 消息发送目标地址 Destination destination; try { // 实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); // 获取连接实例 connection = connectionFactory.createConnection(); // 启动连接 connection.start(); // 创建接收或发送的线程实例(消费者就不需要开启事务了) session = connection.createSession(Boolean.FALSE,Session.AUTO_ACKNOWLEDGE); // 创建Topic destination = session.createTopic("Topic1"); // 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); //注册消息监听 consumer.setMessageListener(new Listerner2()); } catch (JMSException e) { e.printStackTrace(); } } }
消息监听器
package com.zns.activemq.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listerner1 implements MessageListener{ public void onMessage(Message message) { try { System.out.println("订阅者1接收到消息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } } package com.zns.activemq.topic; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listerner2 implements MessageListener{ public void onMessage(Message message) { try { System.out.println("订阅者2接收到消息:" + ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
以上是关于ActiveMQ之队列和主题发布订阅实例的主要内容,如果未能解决你的问题,请参考以下文章
如何使用虚拟目的地创建多个 activemq 主题订阅者实例?
漏洞复现 - ActiveMQ反序列化漏洞(CVE-2015-5254)