Apache ActiveMQ教程二 (消息主题订阅)
Posted Share_Boy
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ActiveMQ教程二 (消息主题订阅)相关的知识,希望对你有一定的参考价值。
Do not talk nonsense
Look at my code
服务端:
package mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
import com.sun.xml.internal.ws.wsdl.writer.UsingAddressing;
public class Publisher
static int i = 0;
public static void main(String[] args) throws JMSException, InterruptedException
/**
* 创建ActiveMQ的连接工厂
* ActiveMQConnection.DEFAULT_USER, 默认用户名
* ActiveMQConnection.DEFAULT_PASSWORD, 默认密码
* "tcp://127.0.0.1:61616" 连接地址
*/
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
/**
* 从连接工厂中获得JMS连接并启动
*/
Connection connection = connectionFactory.createConnection();
connection.start();
/**
* 创建一个连接的session
* 第一个参数设定是否需要事务支持
*/
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
/**
* 发送消息 而订阅者接收消息必须有个topic,名字也叫msg.714303584
*/
Destination sendTopic = new ActiveMQTopic("msg.714303584");
/**
* 订阅消息 而订阅者发送消息必须有个receive,名字也叫msg.714303584
*/
Destination sendReceive = new ActiveMQTopic("msg.receive");
/**
* 根据主题创建一个发送者
*/
final MessageProducer producer = session.createProducer(sendTopic);
/**
* 根据主题创建一个接收者
*/
final MessageConsumer consumer = session.createConsumer(sendReceive);
Thread sender = new Thread(new Runnable()
@Override
public void run()
while (true)
/**
* TextMessage: 创建一个字符串消息
* 使用producer发送消息,session提交消息
*/
try
TextMessage smsg = session.createTextMessage("我是服务端:"+i++);
System.out.println("发送消息"+i);
producer.send(smsg);
session.commit();
Thread.sleep(1000);
catch (JMSException | InterruptedException e)
// TODO 自动生成的 catch 块
e.printStackTrace();
);
Thread receiver = new Thread(new Runnable()
@Override
public void run()
while (true)
/**
*consumer 接收消息
*记得要commit提交事务否则会出现重复读取
*/
try
TextMessage gmsg = (TextMessage) consumer.receive();
System.out.println("我是服务端的接受"+gmsg.getText());
session.commit();
catch (Exception e)
// TODO: handle exception
);
sender.start();
receiver.start();
客户端:
package mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;
public class Subscriber
static int i = 0;
public static void main(String[] args) throws JMSException, InterruptedException
/**
* 创建ActiveMQ的连接工厂
* ActiveMQConnection.DEFAULT_USER, 默认用户名
* ActiveMQConnection.DEFAULT_PASSWORD, 默认密码
* "tcp://127.0.0.1:61616" 连接地址
*/
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
/**
* 从连接工厂中获得JMS连接并启动
*/
Connection connection = connectionFactory.createConnection();
connection.start();
/**
* 创建一个连接的session
* 第一个参数设定是否需要事务支持
*/
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
/**
* 发送消息 而订阅者接收消息必须有个topic,名字也叫msg.714303584
*/
Destination sendTopic = new ActiveMQTopic("msg.receive");
/**
* 订阅消息 而订阅者发送消息必须有个receive,名字也叫msg.714303584
*/
Destination sendReceive = new ActiveMQTopic("msg.714303584");
/**
* 根据主题创建一个发送者
*/
final MessageProducer producer = session.createProducer(sendTopic);
/**
* 根据主题创建一个接收者
*/
final MessageConsumer consumer = session.createConsumer(sendReceive);
Thread sender = new Thread(new Runnable()
@Override
public void run()
while (true)
/**
* TextMessage: 创建一个字符串消息
* 使用producer发送消息,session提交消息
*/
try
TextMessage smsg = session.createTextMessage("我是客户端:"+i++);
producer.send(smsg);
session.commit();
Thread.sleep(1000);
catch (JMSException | InterruptedException e)
// TODO 自动生成的 catch 块
e.printStackTrace();
);
Thread receiver = new Thread(new Runnable()
@Override
public void run()
while (true)
/**
*consumer 读取消息
*注意commit提交
*/
try
TextMessage gmsg = (TextMessage) consumer.receive();
System.out.println("我是客户端的接受"+gmsg.getText());
session.commit();
catch (Exception e)
// TODO: handle exception
);
sender.start();
receiver.start();
以上是关于Apache ActiveMQ教程二 (消息主题订阅)的主要内容,如果未能解决你的问题,请参考以下文章
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅
WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅