Apache ActiveMQ教程二 (消息主题订阅)

Posted Share_Boy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache ActiveMQ教程二 (消息主题订阅)相关的知识,希望对你有一定的参考价值。


Do not talk nonsense


Look at my code


服务端:

package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

import com.sun.xml.internal.ws.wsdl.writer.UsingAddressing;



public class Publisher 
	
	static int i = 0;
	public static void main(String[] args) throws JMSException, InterruptedException 
		/**
		 * 创建ActiveMQ的连接工厂
		 * ActiveMQConnection.DEFAULT_USER, 默认用户名
		 * ActiveMQConnection.DEFAULT_PASSWORD,  默认密码
		 * "tcp://127.0.0.1:61616" 连接地址
		 */
		ConnectionFactory connectionFactory = 
				new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
		
		/**
		 * 从连接工厂中获得JMS连接并启动
		 */
		Connection connection = connectionFactory.createConnection();  
        connection.start();
        /**
         * 创建一个连接的session
         * 第一个参数设定是否需要事务支持
         */
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        /**
         * 发送消息 而订阅者接收消息必须有个topic,名字也叫msg.714303584
         */
        Destination sendTopic = new ActiveMQTopic("msg.714303584");
        
        /**
         * 订阅消息 而订阅者发送消息必须有个receive,名字也叫msg.714303584
         */
        Destination sendReceive = new ActiveMQTopic("msg.receive");
        
        /**
         * 根据主题创建一个发送者
         */
        final MessageProducer  producer = session.createProducer(sendTopic);
        
        /**
         * 根据主题创建一个接收者
         */
        final MessageConsumer consumer = session.createConsumer(sendReceive);
        
        
        Thread sender = new Thread(new Runnable() 
			@Override
			public void run() 
				while (true) 
					/**
		        	 * TextMessage: 创建一个字符串消息
		        	 *				使用producer发送消息,session提交消息
		        	 */
					try 
						TextMessage smsg = session.createTextMessage("我是服务端:"+i++);
						System.out.println("发送消息"+i);
						producer.send(smsg);
						session.commit();
						Thread.sleep(1000);
					 catch (JMSException | InterruptedException e) 
						// TODO 自动生成的 catch 块
						e.printStackTrace();
					
					
				
				
			
		);
        
        
        Thread receiver = new Thread(new Runnable() 
			
			@Override
			public void run() 
				while (true) 
					/**
		        	 *consumer 接收消息
		        	 *记得要commit提交事务否则会出现重复读取
		        	 */
					try 
						TextMessage gmsg = (TextMessage) consumer.receive();
						System.out.println("我是服务端的接受"+gmsg.getText());
						session.commit();
					 catch (Exception e) 
						// TODO: handle exception
					
				
				
			
		);
        
        sender.start();
        
        receiver.start();
		
		
	
	



客户端:


package mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQTopic;

public class Subscriber 
	  
    static int i = 0;
	
	public static void main(String[] args) throws JMSException, InterruptedException 
		/**
		 * 创建ActiveMQ的连接工厂
		 * ActiveMQConnection.DEFAULT_USER, 默认用户名
		 * ActiveMQConnection.DEFAULT_PASSWORD,  默认密码
		 * "tcp://127.0.0.1:61616" 连接地址
		 */
		ConnectionFactory connectionFactory = 
				new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
		
		/**
		 * 从连接工厂中获得JMS连接并启动
		 */
		Connection connection = connectionFactory.createConnection();  
        connection.start();
        /**
         * 创建一个连接的session
         * 第一个参数设定是否需要事务支持
         */
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        /**
         * 发送消息 而订阅者接收消息必须有个topic,名字也叫msg.714303584
         */
        Destination sendTopic = new ActiveMQTopic("msg.receive");
        
        /**
         * 订阅消息 而订阅者发送消息必须有个receive,名字也叫msg.714303584
         */
        Destination sendReceive = new ActiveMQTopic("msg.714303584");
        
        /**
         * 根据主题创建一个发送者
         */
        final MessageProducer  producer = session.createProducer(sendTopic);
        /**
         * 根据主题创建一个接收者
         */
        final MessageConsumer consumer = session.createConsumer(sendReceive);
      
        
        Thread sender = new Thread(new Runnable() 
			@Override
			public void run() 
				while (true) 
					/**
		        	 * TextMessage: 创建一个字符串消息
		        	 *				使用producer发送消息,session提交消息
		        	 */
					try 
						TextMessage smsg = session.createTextMessage("我是客户端:"+i++);
						producer.send(smsg);
						session.commit();
						Thread.sleep(1000);
					 catch (JMSException | InterruptedException e) 
						// TODO 自动生成的 catch 块
						e.printStackTrace();
					
					
				
				
			
		);
        
        
        Thread receiver = new Thread(new Runnable() 
			
			@Override
			public void run() 
				while (true) 
					/**
		        	 *consumer 读取消息
		        	 *注意commit提交
		        	 */
					try 
						TextMessage gmsg = (TextMessage) consumer.receive();
						System.out.println("我是客户端的接受"+gmsg.getText());
						session.commit();
					 catch (Exception e) 
						// TODO: handle exception
					
				
				
			
		);
        
        sender.start();
        
        receiver.start();
		
	






以上是关于Apache ActiveMQ教程二 (消息主题订阅)的主要内容,如果未能解决你的问题,请参考以下文章

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

WSO2 ESB 5.0.0 配置 JMS 传输(ActiveMQ)- 主题消息发布与订阅

Apache ActiveMQ消息中间件的安装

ActiveMQ发布-订阅消息模式

ActiveMQ