ActiveMq--学习日记
Posted 飞翔的兔子哥
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMq--学习日记相关的知识,希望对你有一定的参考价值。
activeMq 简介
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
特性:
多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, php。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5
resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
支持通过JDBC和journal提供高速的消息持久化
从设计上保证了高性能的集群,客户端-服务器,点对点
支持Ajax
支持与Axis的整合
可以很容易得调用内嵌JMS provider,进行测试
JMS简介
JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。
1. JMS服务提供者
JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。
2. 消息管理对象
消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。
3. 消息的生产者消费者
消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。
4. JMS消息
JMS消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。
5. 消息标头
消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其中一些字段的值,其它值则由JMS提供程序填写。
JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。
JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写
JMSDeliveryMode: 由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。
JMSMessageID: 由Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写
JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写
JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID
JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段
JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过
JMSType: 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求
JMSExpiration: Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息
JMSPriority: Send 方法设置。包含客户端在发送消息时所设置有限级值
5. 消息属性
消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属性,以便进行消息的选择 。一般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、Double、Float、Int、Long、Object、Short及String。每一属性均由字符串名字和相关的值组成 ,例如:
TextMessage msg = tsession.createTextMessage();
msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);
String customer = msg.getStringProperty(“CUSTOMER_NAME”);
其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。
6. 消息主体
消息主体包含了消息的核心数据。
JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、
StreamMessage和ObjectMessage
选择最合适的消息类型可以使JMS最有效 的处理消息。
- TextMessage(文本消息)
将数据作为简单字符串存放在主体中(XML就可以作为字符串发)
TextMessage msg = session.createTextMessage();
msg.setText(text);
有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响移植性。只自己定义了两个方法setText(String s)、getText()
- MapMessage(映射消息)
使用一张映射表来存放其主体内容(参照Jms API)
MapMessage msg = session.createMapMessage();
msg.setString(“CUSTOMER_NAME”,”John”);
msg.setInt(“CUSTOMER_AGE”,12);
String s = msg.getString(“CUSTOMER_NAME”);
int age = msg.getInt(“CUSTOMER_AGE”);
- BytesMessage(字节消息)
将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有
消息格式保持一致等(参照Jms API)
byte[] data;
BytesMessage msg = session.createBytesMessage();
msg.wirte(data);
byte[] msgData = new byte[256];
int bytesRead = msg.readBytes(msgData);
- StreamMessage(流消息)
用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种
消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API)
StringMessage msg = session.createStreamMessage();
msg.writeString(“John”);
msg.writeInt(12);
String s = msg.readString();
Int age = msg.readInt();
(PS:个人认为有点像socket的信息收发)
- ObjectMessage(对象消息)
用于往消息中写入可序列化的对象。
消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个集合写入消息。
客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写自己只单独定义了两个方法:getObject()和setObject(Serializable s)
ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)
Message只读时被set抛出MessageNotWriteableException;
set和get时,如果对象序列化失败抛出MessageFormatException
7. 消息的通信方式(点对点通信和发布/订阅方式)
点对点方式(point-to-point)
点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,
Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
发布/订阅方式(publish/subscriber Messaging)
发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户
端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。
编程模式
消息产生者向JMS发送消息的步骤
- 创建连接使用的工厂类JMS ConnectionFactory
- 使用管理对象JMS ConnectionFactory建立连接Connection
- 使用连接Connection建立会话Session
- 使用会话Session和管理对象Destination创建消息生产者MessageSender
- 使用消息生产者MessageSender发送消息
消息消费者从JMS接受消息的步骤
- 创建连接使用的工厂类JMS ConnectionFactory
- 使用管理对象JMS ConnectionFactory建立连接Connection
- 使用连接Connection 建立会话Session
- 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
- 使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。
ActiveMQ运行
ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。
打开http://localhost:8161/admin/ ,可以查看消息队列的管理控台,如下截图:
点对点通讯模式demo
消息发送者:
package com.ainong.demo.p2pqueue;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> Queue 方式消息发送者
*
* @author dong.gang
*
*/
public class QueueSender {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
public static final String DESTINATION_PAYMENT = "mq.queue.payment";
public static final String DESTINATION_QUERY = "mq.queue.query";
public QueueSender() {
}
/**
*
* <b>function:</b> 发送消息
*
* @throws Exception
*/
public void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {
for (int i = 0; i < SEND_NUM; i++) {
JSONObject msgJson = new JSONObject();
msgJson.put("seqId", i+1);
msgJson.put("content", "发送第" + (i + 1) + "条消息");
MapMessage map = session.createMapMessage();
map.setString("msg", msgJson.toJSONString());
map.setLong("time", System.currentTimeMillis());
System.out.println(map);
sender.send(map);
}
}
public void run(String mode) throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建一个连接
connection = factory.createQueueConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createQueueSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
String queueDis = "1".equals(mode) ? DESTINATION_PAYMENT : DESTINATION_QUERY;
// 创建一个消息队列
Queue queue = session.createQueue(queueDis);
// 创建消息发送者
javax.jms.QueueSender sender = session.createSender(queue);
// 设置持久化模式
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
sendMessage(session, sender);
// 提交会话
session.commit();
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueSender sender = new QueueSender();
// sender.run("1");
sender.run("2");
}
}
消息接收者:
package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
*
* <b>function:</b> 消息接收者
*
* @author donggang
*
*/
public class QueueReceiver {
// tcp 地址
public static final String BROKER_URL = "tcp://localhost:61616";
// 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
public static final String TARGET_PAYMENT = "mq.queue.payment";
public static final String TARGET_QUERY = "mq.queue.query";
public void run(String mode) throws Exception {
QueueConnection connection = null;
QueueSession session = null;
try {
// 创建链接工厂
QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
// 通过工厂创建一个连接
connection = factory.createQueueConnection();
// 启动连接
connection.start();
// 创建一个session会话
session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
// 创建一个消息队列
String queueTarg = "1".equals(mode) ? TARGET_PAYMENT : TARGET_QUERY;
Queue queue = session.createQueue(queueTarg);
// 创建消息制作者
javax.jms.QueueReceiver receiver = session.createReceiver(queue);
receiver.setMessageListener(new MessageListener() {
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
JSONObject jsonObj = JSONObject.parseObject(map.getString("msg"));
if ("3".equals(jsonObj.getString("seqId"))) {
System.out.println(map.getLong("time") + "接收#" + map.getString("msg"));
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 提交会话
session.commit();
// 休眠1s再关闭
Thread.sleep(1000);
} catch (Exception e) {
throw e;
} finally {
// 关闭释放资源
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
}
}
public static void main(String[] args) throws Exception {
QueueReceiver receiver = new QueueReceiver();
// receiver.run("1");
receiver.run("2");
}
}
其实上边的消息接收者已经集成了消息监听类,如果我们需要分离业务操作,可以 receiver.setMessageListener()参数中设为我们的业务监听处理类(需要实现MessageListener类):
package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
public class QueueMsgListenner implements MessageListener {
@Override
public void onMessage(Message msg) {
if (msg != null) {
MapMessage map = (MapMessage) msg;
try {
System.out.println(map.getLong("time") + "接收#" + map.getString("id000"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
发布/订阅模式 demo
消息发布者:
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;
import com.alibaba.fastjson.JSONObject;
public class Publisher {
protected static String brokerURL = "tcp://localhost:61616";
protected static transient ConnectionFactory factory;
protected transient Connection connection;
protected transient Session session;
protected transient MessageProducer producer;
public Publisher() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
try {
// 持久化订阅(消息会保留,特定的消费者可以一段时间后进行消费)
connection.setClientID("client1");
connection.start();
} catch (JMSException jmse) {
connection.close();
throw jmse;
}
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
producer = session.createProducer(null);
// 发送消息时用使用持久模式(不设置,默认就是持久的)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
protected void sendMessage(JSONObject msg) throws JMSException {
Destination destination = session.createTopic("demo");
Message message = createMessage(msg, session);
System.out.println("消息发送: "
+ ((ActiveMQMapMessage) message).getContentMap()
+ " on destination: " + destination);
producer.send(destination, message);
}
protected Message createMessage(JSONObject msg, Session session)
throws JMSException {
MapMessage message = session.createMapMessage();
message.setString("id", msg.getString("id"));
message.setString("content", msg.getString("content"));
return message;
}
public static void main(String[] args) throws JMSException,
InterruptedException {
Publisher publisher = new Publisher();
for (int i = 0; i < 3; i++) {
JSONObject msgObject = new JSONObject();
msgObject.put("id", "msg00" + i);
msgObject.put("content", "第" + i + "条消息");
publisher.sendMessage(msgObject);
Thread.sleep(3000);
}
publisher.close();
}
}
消息订阅者1(持久化订阅)
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 持久化订阅模式(消息会保留,消费者可以在任意时间消费消息)
*
* @author DG
*
*/
public class Consumer1 {
private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
public Consumer1() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.setClientID("client1");
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public Session getSession() {
return session;
}
public static void main(String[] args) throws JMSException {
Consumer1 consumer = new Consumer1();
Topic topic = consumer.getSession().createTopic("demo");
// 普通订阅
// MessageConsumer messageConsumer =
// consumer.getSession().createConsumer(
// destination);
// 持久化订阅
MessageConsumer messageConsumer =consumer.getSession().createDurableSubscriber(topic,"client1"); //持久订阅
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消费者1,消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
常规订阅者:
package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 常规订阅模式(消费者必须在消息生产前就已经启动监听消息,否则错过消息之后,消费就会失效,不会被处理)
*
* @author DG
*
*/
public class Consumer2 {
private static String brokerURL = "tcp://localhost:61616";
private static transient ConnectionFactory factory;
private transient Connection connection;
private transient Session session;
public Consumer2() throws JMSException {
factory = new ActiveMQConnectionFactory(brokerURL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void close() throws JMSException {
if (connection != null) {
connection.close();
}
}
public Session getSession() {
return session;
}
public static void main(String[] args) throws JMSException {
Consumer2 consumer = new Consumer2();
Destination destination = consumer.getSession().createTopic("demo");
MessageConsumer messageConsumer = consumer.getSession().createConsumer(
destination);
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
MapMessage map = (MapMessage) message;
String id = map.getString("id");
String content = map.getString("content");
System.out.println("消费者2:消息接收:id = " + id + ";content = "
+ content);
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
上面只是我初识activemq时的一些心得,附代码源码:
http://download.csdn.net/detail/donggang1992/9561041
以上是关于ActiveMq--学习日记的主要内容,如果未能解决你的问题,请参考以下文章