消息系统之Apache ActiveMQ
Posted cac2020
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息系统之Apache ActiveMQ相关的知识,希望对你有一定的参考价值。
一、下载运行MQ服务
1、下载ActiveMQ :http://activemq.apache.org/
2、解压缩:
进入bin目录 win32和win64对应不同位的操作系统,选择进入 点击activemq.bat 运行即可启动ActiveMQ服务。
在浏览器输入ActiveMQ 服务地址:http://127.0.0.1:8161/admin/ 默认用户名/密码 admin/admin
二、开发
jar:activemq-all-5.11.1.jar 在ActiveMQ安装目录下面就有 拷贝到工程即可
1、点对点模式
package com.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息生产者 * @author Administrator */ public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) { ConnectionFactory connfactory;//连接工厂 Connection conn = null;//连接 Session session;//接收或者发送消息的线程 Destination dest;//消息的目的地 MessageProducer producer;//消息的生产者 //创建连接工厂 connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL); try { conn = connfactory.createConnection();//获取连接 conn.start();//启动连接 session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话 dest = session.createQueue("FirstQueue1");//创建消息队列 producer = session.createProducer(dest);//创建消息生产者 sendMessage(session, producer);//生产并发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发现哦那个消息 * @param session * @param messageProducer * @throws JMSException */ private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for(int i=1;i<=10;i++) { TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息 messageProducer.send(text);//MessageProducer用来发送消息 } } }
package com.activemq; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; /** * 消息消费者 * @author Administrator * */ public class JMSConsumer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) { ConnectionFactory connfactory;//连接工厂 Connection conn = null;//连接 Session session;//接收或者发送消息的线程 Destination dest;//消息的目的地 MessageConsumer messageConsumer;//消息消费者 //创建连接工厂 connfactory = new ActiveMQConnectionFactory(JMSConsumer.USERNAME,JMSConsumer.PASSWORD,JMSConsumer.BROKEURL); try { conn = connfactory.createConnection();//获取连接 conn.start();//启动连接 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话 dest = session.createQueue("FirstQueue1");//创建消息队列 messageConsumer = session.createConsumer(dest); //receive模式 // while(true) // { // TextMessage text = (TextMessage)messageConsumer.receive(100000); // if (text != null) // { // System.out.println("receive模式接收:"+text.getText()); // } // else // { // break; // } // } //监听模式 messageConsumer.setMessageListener(new Listener());// 注册消息监听 } catch (Exception e) { e.printStackTrace(); } //后期不能关闭 要一直处于监听模式 需要conn一直开启 } }
package com.activemq; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("监听模式接收:"+ ((TextMessage)message).getText()); } catch (JMSException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
2、发布订阅模式
package com.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; public class JMSProducer { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) { ConnectionFactory connfactory;//连接工厂 Connection conn = null;//连接 Session session;//接收或者发送消息的线程 Destination dest;//消息的目的地 MessageProducer producer;//消息的生产者 //创建连接工厂 connfactory = new ActiveMQConnectionFactory(JMSProducer.USERNAME,JMSProducer.PASSWORD,JMSProducer.BROKEURL); try { conn = connfactory.createConnection();//获取连接 conn.start();//启动连接 session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话 dest = session.createTopic("FirstTopic1");//创建主题 与队列的区别 producer = session.createProducer(dest);//创建消息生产者 sendMessage(session, producer);//生产并发送消息 session.commit(); } catch (Exception e) { e.printStackTrace(); } finally { if (conn != null) { try { conn.close(); } catch (JMSException e) { e.printStackTrace(); } } } } /** * 发现哦那个消息 * @param session * @param messageProducer * @throws JMSException */ private static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException { for(int i=1;i<=10;i++) { TextMessage text = session.createTextMessage("生产消息:"+i);//session用来生产消息 messageProducer.send(text);//MessageProducer用来发送消息 } } }
package com.activemq2; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import com.activemq.Listener; public class JMSConsumer1 { private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;//默认连接用户名 private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;//默认连接密码 private static final String BROKEURL = ActiveMQConnection.DEFAULT_BROKER_URL;//默认连接地址 public static void main(String[] args) { ConnectionFactory connfactory;//连接工厂 Connection conn = null;//连接 Session session;//接收或者发送消息的线程 Destination dest;//消息的目的地 MessageConsumer messageConsumer;//消息消费者 //创建连接工厂 connfactory = new ActiveMQConnectionFactory(JMSConsumer1.USERNAME,JMSConsumer1.PASSWORD,JMSConsumer1.BROKEURL); try { conn = connfactory.createConnection();//获取连接 conn.start();//启动连接 session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);//以事务模式获取会话 dest = session.createTopic("FirstTopic1");//创建消息主题 messageConsumer = session.createConsumer(dest); //监听模式 messageConsumer.setMessageListener(new Listener1());// 注册消息监听 } catch (Exception e) { e.printStackTrace(); } //后期不能关闭 要一直处于监听模式 需要conn一直开启 } }
package com.activemq2; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; public class Listener1 implements MessageListener { @Override public void onMessage(Message message) { try { System.out.println("监听模式1接收:"+ ((TextMessage)message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }
注意:
1、点对点和发布订阅模式的主要区别就是
dest = session.createQueue("FirstQueue1");//创建消息队列 dest = session.createTopic("FirstTopic1");//创建消息主题
2、发布订阅模式必须先订阅 再发布才能接收到。
参考
以上是关于消息系统之Apache ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章