消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

Posted eguid

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)相关的知识,希望对你有一定的参考价值。

1、实现功能

希望使用一套 API,同时实现 Topic 和 Queue 两种模式下的消息发送和接收功能,方便业务程序调用。

1、发送Topic

2、发送Queue

3、接收Topic

4、接收Queue

2、接口设计

根据功能设计公共调用接口

  1. /**
  2. * 数据分发接口(用于发送、接收消息队列数据)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public interface MsgDistributeInterface {
  8.  
  9. /**
  10. * 发送到主题
  11. *
  12. * @param topicName -主题
  13. * @param data -数据
  14. * @return
  15. */
  16. public boolean sendTopic(String topicName, byte[] data);
  17.  
  18. /**
  19. * 发送到主题
  20. * @param topicName -主题
  21. * @param data-数据
  22. * @param offset -偏移量
  23. * @param length -长度
  24. * @return
  25. */
  26. boolean sendTopic(String topicName, byte[] data, int offset, int length);
  27.  
  28. /**
  29. * 发送到队列
  30. *
  31. * @param queueName -队列名称
  32. * @param data -数据
  33. * @return
  34. */
  35. public boolean sendQueue(String queueName, byte[] data);
  36.  
  37. /**
  38. * 发送到队列
  39. * @param queueName -队列名称
  40. * @param data -数据
  41. * @param offset
  42. * @param length
  43. * @return
  44. */
  45. public boolean sendQueue(String queueName, byte[] data,int offset, int length);
  46.  
  47. /**
  48. * 接收队列消息
  49. * @param queueName 队列名称
  50. * @param listener
  51. * @throws JMSException
  52. */
  53. void receiveQueue(String queueName, MessageListener listener) throws JMSException;
  54.  
  55. /**
  56. * 订阅主题
  57. * @param topicName -主题名称
  58. * @param listener
  59. * @throws JMSException
  60. */
  61. void receiveTopic(String topicName, MessageListener listener) throws JMSException;
  62. }

3、基于ActiveMQ的接口实现

  1. /**
  2. * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)
  3. *
  4. * @author eguid
  5. *
  6. */
  7. public class ActiveMQImpl implements MsgDistributeInterface {
  8.  
  9. private String userName;
  10. private String password;
  11. private String brokerURL;
  12. private boolean persistentMode;//持久化模式
  13. //连接工厂
  14. ConnectionFactory connectionFactory;
  15. //发送消息的线程
  16. Connection connection;
  17. // 事务管理
  18. Session session;
  19.  
  20. //存放各个线程订阅模式生产者
  21. ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
  22. //存放各个线程队列模式生产者
  23. ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
  24.  
  25. public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
  26. this(userName, password, brokerURL, true);
  27. }
  28.  
  29. public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
  30. this.userName = userName;
  31. this.password = password;
  32. this.brokerURL = brokerURL;
  33. this.persistentMode=persistentMode;
  34. init();
  35. }
  36.  
  37. public void init() throws JMSException {
  38. try {
  39. // 创建一个链接工厂
  40. connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
  41. // 从工厂中创建一个链接
  42. connection = connectionFactory.createConnection();
  43. // 开启链接
  44. connection.start();
  45. // 创建一个事务(订阅模式,事务采用自动确认方式)
  46. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  47. } catch (JMSException e) {
  48. throw e;
  49. }
  50. }
  51.  
  52. @Override
  53. public boolean sendTopic(String topicName, byte[] data) {
  54. return sendTopic(topicName, data, 0, data.length);
  55. }
  56.  
  57. @Override
  58. public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
  59. return send(true, topicName, data, offset, length);
  60. }
  61.  
  62. @Override
  63. public boolean sendQueue(String queueName, byte[] data) {
  64. return sendQueue(queueName, data, 0, data.length);
  65. }
  66.  
  67. @Override
  68. public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
  69. return send(false, queueName, data, offset, length);
  70. }
  71.  
  72. /**
  73. * 发送数据
  74. *
  75. * @param name
  76. * @param data
  77. * @param offset
  78. * @param length
  79. * @param type
  80. * -类型
  81. * @return
  82. */
  83. private boolean send(boolean type, String name, byte[] data, int offset, int length) {
  84. try {
  85. MessageProducer messageProducer = getMessageProducer(name, type);
  86.  
  87. BytesMessage msg = createBytesMsg(data, offset, length);
  88. System.err.println(Thread.currentThread().getName()+"发送消息");
  89. // 发送消息
  90. messageProducer.send(msg);
  91. } catch (JMSException e) {
  92. return false;
  93. }
  94. return false;
  95. }
  96.  
  97. public void receive(String topicName) throws JMSException {
  98. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  99. Topic topic =session.createTopic(topicName);
  100. MessageConsumer consumer=session.createConsumer(topic);
  101. consumer.setMessageListener(new MessageListener() {
  102. @Override
  103. public void onMessage(Message message) {
  104. BytesMessage msg=(BytesMessage) message;
  105. System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());
  106. }
  107. });
  108.  
  109. }
  110. /**
  111. * 创建字节数组消息
  112. *
  113. * @param data
  114. * @param offset
  115. * @param length
  116. * @return
  117. * @throws JMSException
  118. */
  119. private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
  120. BytesMessage msg = session.createBytesMessage();
  121. msg.writeBytes(data, offset, length);
  122. return msg;
  123. }
  124.  
  125. /**
  126. * 创建对象序列化消息
  127. * @param obj
  128. * @return
  129. * @throws JMSException
  130. */
  131. private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
  132. // MapMessage msg = session.createMapMessage();//key-value形式的消息
  133. ObjectMessage msg = session.createObjectMessage(obj);
  134. return msg;
  135. }
  136.  
  137. /**
  138. * 创建字符串消息
  139. * @param text
  140. * @return
  141. * @throws JMSException
  142. */
  143. private TextMessage createTextMsg(String text) throws JMSException {
  144. TextMessage msg = session.createTextMessage(text);
  145. return msg;
  146. }
  147.  
  148.  
  149. /**
  150. * 获取创建者
  151. *
  152. * @param name -名称(主题名称和队列名称)
  153. * @param type -类型(true:topic,false:queue)
  154. * @return
  155. * @throws JMSException
  156. */
  157. private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
  158. return type?getTopicProducer(name):getQueueProducer(name);
  159. }
  160.  
  161. /**
  162. * 创建或获取队列
  163. * @param queueName
  164. * @return
  165. * @throws JMSException
  166. */
  167. private MessageProducer getQueueProducer(String queueName) throws JMSException {
  168. MessageProducer messageProducer = null;
  169. if ((messageProducer = queueThreadLocal.get()) == null) {
  170. Queue queue = session.createQueue(queueName);
  171. messageProducer = session.createProducer(queue);
  172. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
  173. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  174. queueThreadLocal.set(messageProducer);
  175. }
  176. return messageProducer;
  177. }
  178.  
  179. /**
  180. * 创建或获取主题
  181. * @param topicName
  182. * @return
  183. * @throws JMSException
  184. */
  185. private MessageProducer getTopicProducer(String topicName) throws JMSException {
  186. MessageProducer messageProducer = null;
  187. if ((messageProducer = topicThreadLocal.get()) == null) {
  188. Topic topic = session.createTopic(topicName);
  189. messageProducer = session.createProducer(topic);
  190. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
  191. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
  192. topicThreadLocal.set(messageProducer);
  193. }
  194. return messageProducer;
  195. }
  196.  
  197. public String getPassword() {
  198. return password;
  199. }
  200.  
  201. public void setPassword(String password) {
  202. this.password = password;
  203. }
  204.  
  205. @Override
  206. public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
  207. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  208. Queue topic =session.createQueue(queueName);
  209. MessageConsumer consumer=session.createConsumer(topic);
  210. consumer.setMessageListener(listener);
  211.  
  212. }
  213.  
  214. @Override
  215. public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
  216. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
  217. Topic topic =session.createTopic(topicName);
  218. MessageConsumer consumer=session.createConsumer(topic);
  219. consumer.setMessageListener(listener);
  220. }

4、测试一下Topic和Queue

  1. public static void main(String[] args) throws JMSException{
  2. //如果创建失败会立即抛出异常
  3. MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
  4. Test testMq = new Test();
  5. try {
  6. Thread.sleep(1000);
  7. } catch (InterruptedException e) {
  8. e.printStackTrace();
  9. }
  10. //Thread 1
  11. new Thread(testMq.new ProductorMq(producter)).start();
  12. //Thread 2
  13. new Thread(testMq.new ProductorMq(producter)).start();
  14. //Thread 3
  15. new Thread(testMq.new ProductorMq(producter)).start();
  16. //Thread 4
  17. new Thread(testMq.new ProductorMq(producter)).start();
  18. //Thread 5
  19. new Thread(testMq.new ProductorMq(producter)).start();
  20. //Thread 6
  21. new Thread(testMq.new ProductorMq(producter)).start();
  22.  
  23. //订阅接收线程Thread 1
  24. new Thread(new Runnable() {
  25. @Override
  26. public void run() {
  27. try {
  28. producter.receiveTopic("eguid-topic",new MessageListener() {
  29. @Override
  30. public void onMessage(Message message) {
  31. BytesMessage msg=(BytesMessage) message;
  32. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
  33. }
  34. });
  35. } catch (JMSException e) {
  36. // TODO Auto-generated catch block
  37. e.printStackTrace();
  38. }
  39. }
  40. }).start();
  41. //订阅接收线程Thread 2
  42. new Thread(new Runnable() {
  43. @Override
  44. public void run() {
  45. try {
  46. producter.receiveTopic("eguid-topic",new MessageListener() {
  47. @Override
  48. public void onMessage(Message message) {
  49. BytesMessage msg=(BytesMessage) message;
  50. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
  51. }
  52. });
  53. } catch (JMSException e) {
  54. // TODO Auto-generated catch block
  55. e.printStackTrace();
  56. }
  57. }
  58. }).start();
  59. //队列消息生产线程Thread-1
  60. new Thread(testMq.new QueueProductor(producter)).start();
  61. //队列消息生产线程Thread-2
  62. new Thread(testMq.new QueueProductor(producter)).start();
  63. //队列接收线程Thread 1
  64. new Thread(new Runnable() {
  65. @Override
  66. public void run() {
  67. try {
  68. producter.receiveQueue("eguid-queue",new MessageListener() {
  69. @Override
  70. public void onMessage(Message message) {
  71. BytesMessage msg=(BytesMessage) message;
  72. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
  73. }
  74. });
  75. } catch (JMSException e) {
  76. // TODO Auto-generated catch block
  77. e.printStackTrace();
  78. }
  79. }
  80. }).start();
  81. //队列接收线程Thread2
  82. new Thread(new Runnable() {
  83. @Override
  84. public void run() {
  85. try {
  86. producter.receiveQueue("eguid-queue",new MessageListener() {
  87. @Override
  88. public void onMessage(Message message) {
  89. BytesMessage msg=(BytesMessage) message;
  90. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
  91. }
  92. });
  93. } catch (JMSException e) {
  94. // TODO Auto-generated catch block
  95. e.printStackTrace();
  96. }
  97. }
  98. }).start();
  99. }
  100.  
  101. private class ProductorMq implements Runnable{
  102. Jtt809MsgProducter producter;
  103. public ProductorMq(Jtt809MsgProducter producter){
  104. this.producter = producter;
  105. }
  106.  
  107. @Override
  108. public void run() {
  109. while(true){
  110. try {
  111. String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
  112. producter.sendTopic("eguid-topic",wang.getBytes());
  113.  
  114. Thread.sleep(2000);
  115. } catch (InterruptedException e) {
  116. e.printStackTrace();
  117. }
  118. }
  119. }
  120. }
  121.  
  122. private class QueueProductor implements Runnable{
  123. Jtt809MsgProducter producter;
  124. public QueueProductor(Jtt809MsgProducter producter){
  125. this.producter = producter;
  126. }
  127.  
  128. @Override
  129. public void run() {
  130. while(true){
  131. try {
  132. String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
  133. producter.sendQueue("eguid-queue",eguid.getBytes());
  134. Thread.sleep(2000);
  135. } catch (InterruptedException e) {
  136. e.printStackTrace();
  137. }
  138. }
  139. }
  140. }

-------------------End--------------------

以上是关于消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)的主要内容,如果未能解决你的问题,请参考以下文章:

消息队列ActiveMQ的使用详解

JMS消息队列ActiveMQ(点对点模式)

ActiveMQ(04):JMS的模型

JMS消息队列ActiveMQ(发布/订阅模式)

Java消息队列ActiveMQ --JMS基本概念

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 队列消息生产与消费