消息队列 发送 接受
Posted xiaolei2017
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列 发送 接受相关的知识,希望对你有一定的参考价值。
package service; import java.util.Random; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.log4j.Logger; import org.junit.Test; /** * 消息队列 发送 接受 测试 * * @author fenglei.ma 2016-03-31 13:20 * http://localhost:8162/admin/queues.jsp,默认的用户名和密码:admin/admin * * 公司 代码 地址 连接 tcp://192.168.250.200:61616 * 公司 网页 消息 查看 地址 http://192.168.250.200:8161/admin/queues.jsp * * 本地测试地址 tcp://0.0.0.0:61616 * 本地查看消息地址 http://localhost:8161/admin/queues.jsp */ public class ActiveMqSendReceiveTest { public static final Logger LOG = Logger .getLogger(ActiveMqSendReceiveTest.class); private static final String RUL = "tcp://192.168.250.200:61616"; private static final String QUEUE_NAME = "dp.cjs.queue"; /* 消息发送 发送同一个管道下的所有消息 同一个管道下部分 消息类型*/ @Test public void sendMessage() throws JMSException { // JMS 客户端到JMSProvider 的连接 Connection connection = null; try { // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(RUL); // 从连接工厂得到连接对象 connection = (Connection) connectionFactory.createConnection(); // 启动连接 connection.start(); // 获取操作连接 发送或接收消息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地,消息发送到那个队列 Destination destination = session.createQueue(QUEUE_NAME); // 创建消息发送者 MessageProducer producer = session.createProducer(destination); // 设置是否持久化 // DeliveryMode.NON_PERSISTENT:不持久化 DeliveryMode.PERSISTENT:持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送 for (int i = 0; i < 1; i++) { String msg = "第" + i + "次发送的消息:" + new Random(); // 发送消息内容 TextMessage messeage = session.createTextMessage(msg); Thread.sleep(1000); System.out.println("发送消息:" + msg); producer.send(messeage); } // session.commit(); } catch (Exception e) { e.printStackTrace(); LOG.error("消息发送异常", e); } finally { try { if (connection != null) { connection.close(); } } catch (Exception ex) { LOG.error("连接关闭异常", ex); } } } /* 消息接收 发送同一个管道下的所有消息 同一个管道下部分 消息类型*/ @Test public void receiveMessage() throws JMSException { // JMS 客户端到JMSProvider 的连接 Connection connection = null; try { // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(RUL); // 从连接工厂得到连接对象 connection = (Connection) connectionFactory.createConnection(); // 启动连接 connection.start(); // 获取操作连接 发送或接收消息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地,消息发送到那个队列 Destination destination = session.createQueue(QUEUE_NAME); // 消息接收者,也就是消费者 MessageConsumer messageConsumer = session .createConsumer(destination); while (true) { // 1秒接受一次 Message message = messageConsumer.receive(1000); if (message != null) { String id = message.getJMSMessageID(); System.out.println("消息接受 " + id); } else { // break; } } } catch (Exception e) { e.printStackTrace(); LOG.error("消息接受异常", e); } finally { try { if (connection != null) { connection.close(); } } catch (Exception ex) { LOG.error("连接关闭异常", ex); } } } // ************************************************************************************************************************************ /* 消息发送 同一个管道下 ,分为不同的消费者 */ @Test public void sendMessageOne() throws JMSException { // JMS 客户端到JMSProvider 的连接 Connection connection = null; try { // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(RUL); // 从连接工厂得到连接对象 connection = (Connection) connectionFactory.createConnection(); // 启动连接 connection.start(); // 获取操作连接 发送或接收消息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地,消息发送到那个队列 Destination destination = session.createQueue(QUEUE_NAME); // 创建消息发送者 MessageProducer producer = session.createProducer(destination); // 设置是否持久化 // DeliveryMode.NON_PERSISTENT:不持久化 DeliveryMode.PERSISTENT:持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 发送 for (int i = 1; i <= 1; i++) { // 创建一个发送的文本文本消息 TextMessage message = session.createTextMessage("A-张三-" + i); //同一个管道下 C这个消费者 C type message.setStringProperty("type", "C"); producer.send(message); System.out.println("已发送" + message); //同一个管道下 D 消费者 type D TextMessage message1 = session.createTextMessage("B-李四-" + i); message1.setStringProperty("type", "D"); producer.send(message1); System.out.println("已发送" + message); } // session.commit(); } catch (Exception e) { e.printStackTrace(); LOG.error("消息发送异常", e); } finally { try { if (connection != null) { connection.close(); } } catch (Exception ex) { LOG.error("连接关闭异常", ex); } } } /* 消息接收 接受同一个管道下的不同的消费者 */ @Test public void receiveMessageOne() throws JMSException { // JMS 客户端到JMSProvider 的连接 Connection connection = null; try { // 创建连接工厂 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(RUL); // 从连接工厂得到连接对象 connection = (Connection) connectionFactory.createConnection(); // 启动连接 connection.start(); // 获取操作连接 发送或接收消息 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 消息的目的地,消息发送到那个队列 Destination destination = session.createQueue(QUEUE_NAME); // 消息接收者,也就是消费者 管道下的A消费者 // "type=‘D‘" 要看 公司环境具体配置的消费者名称 和 类型 MessageConsumer messageConsumer = session.createConsumer(destination, "type=‘D‘"); // messageConsumer.setMessageListener(new MessageListener() { // @Override // public void onMessage(Message message) { // TextMessage textMessage = (TextMessage) message; // try { // System.out.println("B:"+textMessage.getText()); // } catch (JMSException e) { // e.printStackTrace(); // } // } // }); while (true) { // 1秒接受一次 Message message = messageConsumer.receive(1000); if (message != null) { String id = message.getJMSMessageID(); System.out.println("消息接受 " + id); } else { break; } } } catch (Exception e) { e.printStackTrace(); LOG.error("消息接受异常", e); } finally { try { if (connection != null) { connection.close(); } } catch (Exception ex) { LOG.error("连接关闭异常", ex); } } } }
以上是关于消息队列 发送 接受的主要内容,如果未能解决你的问题,请参考以下文章