ActiveMQ queue 代码示例
Posted MIC2016
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ queue 代码示例相关的知识,希望对你有一定的参考价值。
生产者:
package com.111.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //发送的消息数量 private static final int SENDNUM = 10; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory; //连接 Connection connection = null; //会话 接受或者发送消息的线程 Session session = null; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; //消息队列名称 String queueName = "helloWord"; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建一个连接自定义队列名称的消息队列 destination = session.createQueue(queueName); //创建消息生产者 messageProducer = session.createProducer(destination); //发送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection != null){ try { session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发送消息 * @param session * @param messageProducer 消息生产者 * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < SENDNUM; i++) { //创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i); System.out.println("发送消息:Activemq 发送消息" + i); //通过消息生产者发出消息 messageProducer.send(message); } } }
消费者:
package com.111.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumer { //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; public static void main(String[] args) { //连接工厂 ConnectionFactory connectionFactory; //连接 Connection connection = null; //会话 接受或者发送消息的线程 Session session; //消息的目的地 Destination destination; //消息的消费者 MessageConsumer messageConsumer; //消息队列名称 String queueName = "helloWord"; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个连接自定义队列名称的消息队列 destination = session.createQueue(queueName); //创建消息消费者 messageConsumer = session.createConsumer(destination); while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }
多线程生产者:
package com.111.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducerMultithreading implements Runnable{ //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; //发送的消息数量 private static final int SENDNUM = 3; /** * 发送消息 * @param session * @param messageProducer 消息生产者 * @throws Exception */ public static void sendMessage(Session session,MessageProducer messageProducer) throws Exception{ for (int i = 0; i < SENDNUM; i++) { //获取当前线程id String threadId = Thread.currentThread().getId()+""; //创建一条文本消息 TextMessage message = session.createTextMessage("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId); //控制台打印 System.out.println("ActiveMQ 发送消息" +i+"生产者线程编号="+threadId); //通过消息生产者发出消息 messageProducer.send(message); } } @Override public void run() { //连接工厂 ConnectionFactory connectionFactory; //连接 Connection connection = null; //会话 接受或者发送消息的线程 Session session = null; //消息的目的地 Destination destination; //消息生产者 MessageProducer messageProducer; //消息队列名称 String queueName = "Multithreading"; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); //创建一个名称为HelloWorld的消息队列 destination = session.createQueue(queueName); //创建消息生产者 messageProducer = session.createProducer(destination); //发送消息 sendMessage(session, messageProducer); session.commit(); } catch (Exception e) { e.printStackTrace(); }finally{ if(connection != null){ try { session.close(); connection.close(); } catch (JMSException e) { e.printStackTrace(); } } } } }
多线程消费者:
package com.111.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSConsumerMultithreading implements Runnable{ //默认连接用户名 private static final String USERNAME = ActiveMQConnection.DEFAULT_USER; //默认连接密码 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD; //默认连接地址 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL; @Override public void run() { ConnectionFactory connectionFactory;//连接工厂 Connection connection = null;//连接 Session session;//会话 接受或者发送消息的线程 Destination destination;//消息的目的地 MessageConsumer messageConsumer;//消息的消费者 //消息队列名称 String queueName = "Multithreading"; //实例化连接工厂 connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEURL); try { //通过连接工厂获取连接 connection = connectionFactory.createConnection(); //启动连接 connection.start(); //创建session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建一个连接HelloWorld的消息队列 destination = session.createQueue(queueName); //创建消息消费者 messageConsumer = session.createConsumer(destination); String threadId = Thread.currentThread().getId()+""; while (true) { TextMessage textMessage = (TextMessage) messageConsumer.receive(100000); if(textMessage != null){ System.out.println("收到的消息:" + textMessage.getText()+" 消费者线程编号="+threadId); }else { break; } } } catch (JMSException e) { e.printStackTrace(); } } }
多线程生产者测试类:
package com.111.activemq; public class JMSProducerMultithreadingTest { public static void main(String[] args) { JMSProducerMultithreading jpm = new JMSProducerMultithreading(); //启动10个生产者线程 for(int i = 0 ; i < 10 ; i++){ Thread t = new Thread(jpm); t.start(); } } }
多线程消费者测试类:
package com.111.activemq; public class JMSConsumerMultithreadingTest { public static void main(String[] args) { JMSConsumerMultithreading jcm = new JMSConsumerMultithreading(); //启动3个消费者者线程 for(int i = 0 ; i < 3 ; i++){ Thread t = new Thread(jcm); t.start(); } } }
以上是关于ActiveMQ queue 代码示例的主要内容,如果未能解决你的问题,请参考以下文章
Springboot 2.2.1 与activeMq 集成2 queue 消息