activeMQ学习---------点对点发布订阅的消息代码实现
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了activeMQ学习---------点对点发布订阅的消息代码实现相关的知识,希望对你有一定的参考价值。
以下是个人在学习activemq时从网上找到的资料, 总结留给自己以后复习
点对点的实现
2 @Test
public void sendMessage(){ 3 try { 4 // 创建一个连接工厂 5 String url = "tcp://localhost:61616"; 6 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 7 // 设置用户名和密码,这个用户名和密码在conf目录下的credentials.properties文件中,也可以在activemq.xml中配置 8 /* connectionFactory.setUserName("system"); 9 10 11 // 创建连接 12 Connection connection = connectionFactory.createConnection(); 13 connection.start(); 14 15 // 创建Session,参数解释: 16 // 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。 17 // 第二个参数消息的确认模式: 18 // AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 19 // CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) 20 // DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。 21 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 22 // 创建目标,就创建主题也可以创建队列 23 Destination destination = session.createQueue("test11"); 24 // 创建消息生产者 25 MessageProducer producer = session.createProducer(destination); 26 27 // 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT 28 //DeliveryMode.PERSISTENT 消息持久化,当activemq重启或者传送的过程中出现在问题,消息会被保存下来,当消费者连接时还是会收到消息 29 //DeliveryMode.NOT_PERSISTENT 消息不是持久化,当activemq重启或者传送的过程中出现在问题,消息会不会被保存下来,因此容易失去消息,不可靠的连接 30 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 31 // 创建消息 32 String text = "Hello ActiveMQ!"; 33 TextMessage message = session.createTextMessage(text); 34 // 发送消息到ActiveMQ 35 producer.send(message); 36 System.out.println("Message is sent!"); 37 // 关闭资源 38 session.close(); 39 connection.close(); 40 } 41 catch (Exception e) { 42 e.printStackTrace(); 43 } 44 } 45 46 @Test 47 public void getMessage(){ 48 try{ 49 String url = "tcp://localhost:61616"; 50 ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); 51 54 // 创建连接 55 Connection connection = connectionFactory.createConnection(); 56 connection.start(); 57 // 创建Session 58 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 59 // 创建目标,就创建主题也可以创建队列 60 Destination destination = session.createQueue("test11"); 61 // 创建消息消费者 62 MessageConsumer consumer = session.createConsumer(destination); 63 // 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null 64 Message message = consumer.receive(1000); 65 if (message instanceof TextMessage) { 66 TextMessage textMessage = (TextMessage) message; 67 String text = textMessage.getText(); 68 System.out.println("Received: " + text); 69 } else { 70 System.out.println("Received: " + message); 71 } 72 consumer.close(); 73 session.close(); 74 connection.close(); 75 } catch (Exception e) { 76 e.printStackTrace(); 77 } 78 }
下面是发布订阅的实现
1 /** 2 * topic消息的发送者 3 * 4 * @throws Exception 5 */ 6 @Test 7 public void sendMessage() throws Exception { 8 9 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 10 11 Connection connection = activeMQConnectionFactory.createConnection(); 12 connection.start(); 13 14 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); 15 16 Topic topic = session.createTopic("topicMessage"); 17 18 //创建一个topic的生产者 19 MessageProducer producer = session.createProducer(topic); 20 21 //消息持久化 22 producer.setDeliveryMode(DeliveryMode.PERSISTENT); 23 24 String text = "send topic message seccess"; 25 26 TextMessage textMessage = session.createTextMessage(); 27 textMessage.setText(text); 28 textMessage.setStringProperty("property", "property message"); 29 // 生产者发送一条textMessage 30 producer.send(textMessage); 31 System.out.println("seccess"); 32 33 session.commit(); 34 session.close(); 35 connection.close(); 36 37 } 38 39 /** 40 * 订阅topic发送的消息 41 * @throws Exception 42 */ 43 @Test 44 public void receiveMessage() throws Exception { 45 String clientId = "client_id"; 46 47 //连接active 48 ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); 49 50 Connection connection = activeMQConnectionFactory.createConnection(); 51 52 //客户端ID,持久订阅需要设置 53 connection.setClientID(clientId); 54 connection.start(); 55 56 Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE); 57 58 Topic topic = session.createTopic("topicMessage"); 59 //实例化一个消费者 60 MessageConsumer messageConsumer = session.createDurableSubscriber(topic, clientId); 61 62 //listener 生产者发送的消息 63 messageConsumer.setMessageListener(new MessageListener() { 64 // 订阅接收方法 65 public void onMessage(Message message) { 66 67 TextMessage textMessage = (TextMessage) message; 68 try { 69 System.out.println("Received message: " + textMessage.getText() + ":" + textMessage.getStringProperty("property")); 70 } catch (JMSException e) { 71 e.printStackTrace(); 72 } 73 } 74 }); 75 }
以上是关于activeMQ学习---------点对点发布订阅的消息代码实现的主要内容,如果未能解决你的问题,请参考以下文章