消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)
Posted eguid
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)相关的知识,希望对你有一定的参考价值。
1、实现功能
希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用
1、发送Topic
2、发送Queue
3、接收Topic
4、接收Queue
2、接口设计
根据功能设计公共调用接口
-
/**
-
* 数据分发接口(用于发送、接收消息队列数据)
-
*
-
* @author eguid
-
*
-
*/
-
public interface MsgDistributeInterface {
-
-
/**
-
* 发送到主题
-
*
-
* @param topicName -主题
-
* @param data -数据
-
* @return
-
*/
-
public boolean sendTopic(String topicName, byte[] data);
-
-
/**
-
* 发送到主题
-
* @param topicName -主题
-
* @param data-数据
-
* @param offset -偏移量
-
* @param length -长度
-
* @return
-
*/
-
boolean sendTopic(String topicName, byte[] data, int offset, int length);
-
-
/**
-
* 发送到队列
-
*
-
* @param queueName -队列名称
-
* @param data -数据
-
* @return
-
*/
-
public boolean sendQueue(String queueName, byte[] data);
-
-
/**
-
* 发送到队列
-
* @param queueName -队列名称
-
* @param data -数据
-
* @param offset
-
* @param length
-
* @return
-
*/
-
public boolean sendQueue(String queueName, byte[] data,int offset, int length);
-
-
/**
-
* 接收队列消息
-
* @param queueName 队列名称
-
* @param listener
-
* @throws JMSException
-
*/
-
void receiveQueue(String queueName, MessageListener listener) throws JMSException;
-
-
/**
-
* 订阅主题
-
* @param topicName -主题名称
-
* @param listener
-
* @throws JMSException
-
*/
-
void receiveTopic(String topicName, MessageListener listener) throws JMSException;
-
}
3、基于ActiveMQ的接口实现
-
/**
-
* 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)
-
*
-
* @author eguid
-
*
-
*/
-
public class ActiveMQImpl implements MsgDistributeInterface {
-
-
private String userName;
-
private String password;
-
private String brokerURL;
-
private boolean persistentMode;//持久化模式
-
//连接工厂
-
ConnectionFactory connectionFactory;
-
//发送消息的线程
-
Connection connection;
-
// 事务管理
-
Session session;
-
-
//存放各个线程订阅模式生产者
-
ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
-
//存放各个线程队列模式生产者
-
ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
-
-
public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
-
this(userName, password, brokerURL, true);
-
}
-
-
public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
-
this.userName = userName;
-
this.password = password;
-
this.brokerURL = brokerURL;
-
this.persistentMode=persistentMode;
-
init();
-
}
-
-
public void init() throws JMSException {
-
try {
-
// 创建一个链接工厂
-
connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
-
// 从工厂中创建一个链接
-
connection = connectionFactory.createConnection();
-
// 开启链接
-
connection.start();
-
// 创建一个事务(订阅模式,事务采用自动确认方式)
-
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
} catch (JMSException e) {
-
throw e;
-
}
-
}
-
-
-
public boolean sendTopic(String topicName, byte[] data) {
-
return sendTopic(topicName, data, 0, data.length);
-
}
-
-
-
public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
-
return send(true, topicName, data, offset, length);
-
}
-
-
-
public boolean sendQueue(String queueName, byte[] data) {
-
return sendQueue(queueName, data, 0, data.length);
-
}
-
-
-
public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
-
return send(false, queueName, data, offset, length);
-
}
-
-
/**
-
* 发送数据
-
*
-
* @param name
-
* @param data
-
* @param offset
-
* @param length
-
* @param type
-
* -类型
-
* @return
-
*/
-
private boolean send(boolean type, String name, byte[] data, int offset, int length) {
-
try {
-
MessageProducer messageProducer = getMessageProducer(name, type);
-
-
BytesMessage msg = createBytesMsg(data, offset, length);
-
System.err.println(Thread.currentThread().getName()+"发送消息");
-
// 发送消息
-
messageProducer.send(msg);
-
} catch (JMSException e) {
-
return false;
-
}
-
return false;
-
}
-
-
public void receive(String topicName) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Topic topic =session.createTopic(topicName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());
-
}
-
});
-
-
}
-
/**
-
* 创建字节数组消息
-
*
-
* @param data
-
* @param offset
-
* @param length
-
* @return
-
* @throws JMSException
-
*/
-
private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
-
BytesMessage msg = session.createBytesMessage();
-
msg.writeBytes(data, offset, length);
-
return msg;
-
}
-
-
/**
-
* 创建对象序列化消息
-
* @param obj
-
* @return
-
* @throws JMSException
-
*/
-
private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
-
// MapMessage msg = session.createMapMessage();//key-value形式的消息
-
ObjectMessage msg = session.createObjectMessage(obj);
-
return msg;
-
}
-
-
/**
-
* 创建字符串消息
-
* @param text
-
* @return
-
* @throws JMSException
-
*/
-
private TextMessage createTextMsg(String text) throws JMSException {
-
TextMessage msg = session.createTextMessage(text);
-
return msg;
-
}
-
-
-
/**
-
* 获取创建者
-
*
-
* @param name -名称(主题名称和队列名称)
-
* @param type -类型(true:topic,false:queue)
-
* @return
-
* @throws JMSException
-
*/
-
private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
-
return type?getTopicProducer(name):getQueueProducer(name);
-
}
-
-
/**
-
* 创建或获取队列
-
* @param queueName
-
* @return
-
* @throws JMSException
-
*/
-
private MessageProducer getQueueProducer(String queueName) throws JMSException {
-
MessageProducer messageProducer = null;
-
if ((messageProducer = queueThreadLocal.get()) == null) {
-
Queue queue = session.createQueue(queueName);
-
messageProducer = session.createProducer(queue);
-
//是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
-
messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-
queueThreadLocal.set(messageProducer);
-
}
-
return messageProducer;
-
}
-
-
/**
-
* 创建或获取主题
-
* @param topicName
-
* @return
-
* @throws JMSException
-
*/
-
private MessageProducer getTopicProducer(String topicName) throws JMSException {
-
MessageProducer messageProducer = null;
-
if ((messageProducer = topicThreadLocal.get()) == null) {
-
Topic topic = session.createTopic(topicName);
-
messageProducer = session.createProducer(topic);
-
//是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
-
messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
-
topicThreadLocal.set(messageProducer);
-
}
-
return messageProducer;
-
}
-
-
public String getPassword() {
-
return password;
-
}
-
-
public void setPassword(String password) {
-
this.password = password;
-
}
-
-
-
public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Queue topic =session.createQueue(queueName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(listener);
-
-
}
-
-
-
public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
-
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
-
Topic topic =session.createTopic(topicName);
-
MessageConsumer consumer=session.createConsumer(topic);
-
consumer.setMessageListener(listener);
-
}
4、测试一下Topic和Queue(以下 main 方法及两个内部类 ProductorMq、QueueProductor 均位于测试类 Test 中)
-
public static void main(String[] args) throws JMSException{
-
//如果创建失败会立即抛出异常
-
MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
-
Test testMq = new Test();
-
try {
-
Thread.sleep(1000);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
//Thread 1
-
new Thread(testMq.new ProductorMq(producter)).start();
-
//Thread 2
-
new Thread(testMq.new ProductorMq(producter)).start();
-
//Thread 3
-
new Thread(testMq.new ProductorMq(producter)).start();
-
//Thread 4
-
new Thread(testMq.new ProductorMq(producter)).start();
-
//Thread 5
-
new Thread(testMq.new ProductorMq(producter)).start();
-
//Thread 6
-
new Thread(testMq.new ProductorMq(producter)).start();
-
-
//订阅接收线程Thread 1
-
new Thread(new Runnable() {
-
-
public void run() {
-
try {
-
producter.receiveTopic("eguid-topic",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
-
}
-
});
-
} catch (JMSException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
}).start();
-
//订阅接收线程Thread 2
-
new Thread(new Runnable() {
-
-
public void run() {
-
try {
-
producter.receiveTopic("eguid-topic",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
-
}
-
});
-
} catch (JMSException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
}).start();
-
//队列消息生产线程Thread-1
-
new Thread(testMq.new QueueProductor(producter)).start();
-
//队列消息生产线程Thread-2
-
new Thread(testMq.new QueueProductor(producter)).start();
-
//队列接收线程Thread 1
-
new Thread(new Runnable() {
-
-
public void run() {
-
try {
-
producter.receiveQueue("eguid-queue",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
-
}
-
});
-
} catch (JMSException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
}).start();
-
//队列接收线程Thread2
-
new Thread(new Runnable() {
-
-
public void run() {
-
try {
-
producter.receiveQueue("eguid-queue",new MessageListener() {
-
-
public void onMessage(Message message) {
-
BytesMessage msg=(BytesMessage) message;
-
System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
-
}
-
});
-
} catch (JMSException e) {
-
// TODO Auto-generated catch block
-
e.printStackTrace();
-
}
-
}
-
}).start();
-
}
-
-
private class ProductorMq implements Runnable{
-
Jtt809MsgProducter producter;
-
public ProductorMq(Jtt809MsgProducter producter){
-
this.producter = producter;
-
}
-
-
-
public void run() {
-
while(true){
-
try {
-
String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
-
producter.sendTopic("eguid-topic",wang.getBytes());
-
-
Thread.sleep(2000);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-
-
private class QueueProductor implements Runnable{
-
Jtt809MsgProducter producter;
-
public QueueProductor(Jtt809MsgProducter producter){
-
this.producter = producter;
-
}
-
-
-
public void run() {
-
while(true){
-
try {
-
String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
-
producter.sendQueue("eguid-queue",eguid.getBytes());
-
Thread.sleep(2000);
-
} catch (InterruptedException e) {
-
e.printStackTrace();
-
}
-
}
-
}
-
}
-------------------End--------------------
以上是关于消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)的主要内容,如果未能解决你的问题,请参考以下文章