ActiveMQ-初识MQ--主题topic
Posted 闲言博客
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ-初识MQ--主题topic相关的知识,希望对你有一定的参考价值。
1.导入依赖
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
2.编写生产者代码
编写生产者步骤
- 获取连接工厂
- 获取连接 connection
- 获取session
- 创建目的地 queue
- 创建创建生产者
- 创建消息
- 发送消息
- 释放资源
public class JmsProduce_topic
private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private static String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception
//1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接 connection 并启动访问。
Connection connection = factory.createConnection();
connection.start();
// 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地:topic
Topic topic = session.createTopic(TOPIC_NAME);
// 5 创建消息的生产者
MessageProducer producer = session.createProducer(topic);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
TextMessage textMessage = session.createTextMessage();
for (int i = 0; i < 5; i++)
// 7 创建消息
textMessage.setText("这是"+i+"条消息");
// 8 通过messageProducer发送给mq
producer.send(textMessage);
// 9 关闭资源
producer.close();
session.close();
connection.close();
注意:生产者生产时,topic不保存消息它是无状态的不落地,假如无人订阅就去生产,那就是一条废消息
,所以,一般先启动消费者再启动生产者
。
3.编写消费者代码(while阻塞方式)
编写生产者步骤
- 获取连接工厂
- 获取连接 connection
- 获取session
- 创建目的地 queue
- 创建创建消费者
- 消费消息
- 释放资源【处于阻塞状态的话不会释放】
public class JmsConsumer_topic
private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private static String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException
//获取工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//获取连接
Connection connection = factory.createConnection();
connection.start();
//获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 创建目的地:主题
Topic topic = session.createTopic(TOPIC_NAME);
//创建消费者
MessageConsumer consumer = session.createConsumer(topic);
/* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
*/
while (true)
// reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
// 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
TextMessage receive = (TextMessage)consumer.receive();
if (receive != null)
String msg = receive.getText();
System.out.println(msg);
else
break;
consumer.close();
session.close();
connection.close();
3.编写消费者代码(监听器方式)
public class JmsConsumerTopicAsyn
private static String ACTIVEMQ_URL = "tcp://127.0.0.1:61616";
private static String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException, IOException
//获取工厂
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//获取连接
Connection connection = factory.createConnection();
connection.start();
//获取session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//4 创建目的地 (两种 : 队列/主题 这里用主题)
Topic topic = session.createTopic(TOPIC_NAME);
//创建消费者
MessageConsumer consumer = session.createConsumer(topic);
/* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
*/
consumer.setMessageListener(new JmsConsumerListener());
System.in.read();
consumer.close();
session.close();
connection.close();
public class JmsConsumerListener implements MessageListener
//有消息时会触发该方法
@Override
public void onMessage(Message message)
//监听
TextMessage textMessage = (TextMessage) message;
if (null != textMessage)
try
System.out.println(textMessage.getText());
catch (JMSException e)
e.printStackTrace();
4.查看后台
启动消费者前
启动消费者后
启动消费者后,再启动生产者,这样生产的消息才会被消费到
。
以上是关于ActiveMQ-初识MQ--主题topic的主要内容,如果未能解决你的问题,请参考以下文章
消息中间件ActiveMQ学习笔记 [Java编码MQ,消费者生产者基本模型]