ActiveMQ第一个示例

Posted 码上学习

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ第一个示例相关的知识,希望对你有一定的参考价值。

首先先安装ActiveMQ:https://www.cnblogs.com/hejianliang/p/9149590.html 

创建Java项目,把 activemq-all-5.15.4.jar 包导入到项目。

本次案例主要有两个角色,分别是 新闻发布者(NewsPublisher)、新闻订阅者(NewsSubscriber);发布者相当于 生产者,负责生产消息,订阅者相当于 消费者,负责接收消息。

 

新闻发布者(NewsPublisher)

package edu.activemq.publisher;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 新闻发布者(生产者)
 * @author Administrator
 *
 */
public class NewsPublisher {

	/**
	 * ActiveMQ连接用户名
	 */
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	
	/**
	 * ActiveMQ连接密码
	 */
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	
	/**
	 * ActiveMQ连接地址
	 */
	private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
	
	/**
	 * 消息队列名称
	 */
	private static final String QUEUE_NAME = "MY_QUEUE";
	
	/**
	 * 发布数量
	 */
	private static final Integer SEND_NUMBER = 5000;
	
	/**
	 * 发布新闻
	 */
	public static void main(String[] args) {
		// 声明连接工厂
		ConnectionFactory connectionFactory;
		
		// 声明连接
		Connection connection = null;
		
		// 声明会话,接收或者发送信息的线程
		Session session;
		
		// 声明消息的目的地
		Destination destination;
		
		// 声明消息生产者
		MessageProducer messageProducer;
		
		try {
			// 实例化连接工厂
			connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
			
			// 通过连接工厂创建 连接
			connection = connectionFactory.createConnection();
			
			// 启动连接
			connection.start();
			
			// 创建session 参数1:开启事物  参数2:消息确认方式
			session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
			
			// 创建消息队列
			destination = session.createQueue(QUEUE_NAME);
			
			// 创建消息生产者
			messageProducer = session.createProducer(destination);
			
			// 发送消息
			sendMessage(session, messageProducer);
			
			// 提交事务
			session.commit();
		} catch (Exception e) {
			System.out.println("发布新闻失败.");
			e.printStackTrace();
		} finally {
			// 关闭资源
			if (null != connection) {
				try {
					connection.close();
				} catch (Exception e) {
					System.out.println("关闭资源失败.");
					e.printStackTrace();
				}
			}
		}
	}
	
	/**
	 * 发送消息
	 * @param session
	 * @param messageProducer
	 */
	private static void sendMessage(Session session, MessageProducer messageProducer) {
		// 声明消息对象
		TextMessage textMessage;
		
		try {
			for (int i = 0; i <= SEND_NUMBER; i++) {
				// 创建消息
				textMessage = session.createTextMessage("当前发送的新闻xxx, 编号为: " + i);
				
				// 发送消息
				messageProducer.send(textMessage);
			}
		} catch (Exception e) {
			System.out.println("发送新闻消息失败.");
			e.printStackTrace();
		}
	}
	
}

 

新闻订阅者(NewsSubscriber)

package edu.activemq.subscriber;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;

import edu.activemq.listener.NewsSubscriberListener;

/**
 * 新闻订阅者(消费者)
 * @author Administrator
 *
 */
public class NewsSubscriber {

	/**
	 * ActiveMQ连接用户名
	 */
	private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
	
	/**
	 * ActiveMQ连接密码
	 */
	private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
	
	/**
	 * ActiveMQ连接地址
	 */
	private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
	
	/**
	 * 消息队列名称
	 */
	private static final String QUEUE_NAME = "MY_QUEUE";
	
	public static void main(String[] args) {
		// 声明连接工厂
		ConnectionFactory connectionFactory;
		
		// 连接
		Connection connection = null;
		
		// 会话
		Session session;
		
		// 消息目的地
		Destination destination;
		
		// 消息的消费者
		MessageConsumer messageConsumer;
		
		// 实例化连接工厂
		connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
		try {
			// 获取连接
			connection = connectionFactory.createConnection();
			// 启动连接
			connection.start(); 
			
			// 获取Session
			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
			
			// 创建连接的消息队列
			destination = session.createQueue(QUEUE_NAME);
			 
			// 创建消息的消费者
			messageConsumer = session.createConsumer(destination);
			
			// 注册消息监听
			messageConsumer.setMessageListener(new NewsSubscriberListener());
			
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
	
}

 

新闻监听器(NewsSubscriberListener)

package edu.activemq.listener;

import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

/**
 * 新闻监听器
 * @author Administrator
 *
 */
public class NewsSubscriberListener implements MessageListener {

	@Override
	public void onMessage(Message message) {
		try {
			// 创建文本消息对象
			TextMessage textMessage = (TextMessage) message;
			
			// 输出监听到的消息
			System.out.println("NewsSubscriberListener 监听到的消息: " + textMessage.getText());
		} catch (Exception e) {
 			e.printStackTrace();
		}
	}

}

  

新闻发布者(NewsPublisher):负责生产消息,将消息发送到队列里面去。

新闻订阅者(NewsSubscriber):负责消费消息,从消息队列里面取出消息。

新闻监听器(NewsSubscriberListener):新闻订阅者的一个监听器,辅助订阅者监听队列的消息。

 

运行结果:

发布新闻:

 

订阅者消费:

 

 

 

 

运行后,队列里的消息已经被订阅者消费了。

 

以上是关于ActiveMQ第一个示例的主要内容,如果未能解决你的问题,请参考以下文章

Java连接ActiveMQ代码示例(Producer和Consumer)

进程内 ActiveMQ 生产者/消费者示例?

ACTIVEMQ-发布者订阅者hello world示例

ActiveMQ消息生产者自动注入报错:Could not autowire. No beans of 'JmsMessagingTemplate' type found(示例代码(代

在运行时加载第 3 方 DLL 失败并出现未处理异常(Log4CXX、ActiveMQ)

ActiveMQ简单示例