ActiveMq--学习日记

Posted 飞翔的兔子哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMq--学习日记相关的知识,希望对你有一定的参考价值。

activeMq 简介

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

特性:

  • 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, php。应用协议:OpenWire,Stomp REST,WS Notification,XMPP,AMQP完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

  • 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

  • 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5
    resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

  • 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

  • 支持通过JDBC和journal提供高速的消息持久化

  • 从设计上保证了高性能的集群,客户端-服务器,点对点

  • 支持Ajax

  • 支持与Axis的整合

  • 可以很容易得调用内嵌JMS provider,进行测试

JMS简介

JMS源于企业应用对于消息中间件的需求,使应用程序可以通过消息进行异步处理而互不影响。Sun公司和它的合作伙伴设计的JMS API定义了一组公共的应用程序接口和相应语法,使得Java程序能够和其他消息组件进行通信。JMS有四个组成部分:JMS服务提供者、消息管理对象、消息的生产者消费者和消息本身。

1. JMS服务提供者

JMS服务提供者实现消息队列和通知,同时实现消息管理的API。JMS已经是J2EE API的一部分,J2EE服务器都提供JMS服务。

2. 消息管理对象

消息管理对象提供对消息进行操作的API。JMS AP中有两个消息管理对象:创建jms连接使用的工厂(ConnectionFactory)和目的地(Destination),根据消息的消费方式的不同ConnectionFactory可以分为QueueConnectionFactory和TopicConnectionFactory,目的地(Destination)可以分为队列(Queue)和主题(Topic)两种。

3. 消息的生产者消费者

消息的产生由JMS的客户端完成,JMS服务提供者负责管理这些消息,消息的消费者可以接收消息。消息的生产者可以分为――点对点消息发布者(P2P)和主题消息发布者(TopicPublisher)。所以,消息的消费者分为两类:主题消息的订阅者(TopicSubscriber)和点对点消息的接收者(queue receiver)。

4. JMS消息

JMS消息是服务提供者和客户端之间传递信息所使用的信息单元。JMS消息由以下三部分组成:消息头(header)、属性(property)和消息体(body)。

5. 消息标头

消息标头是消息的信封,包含为使消息到达目的地所需要的所有信息,可以直接控制其中一些字段的值,其它值则由JMS提供程序填写。

JMS消息头包含了许多字段,它们是消息发送后由JMS提供者或消息发送者产生,用来表示消息、设置优先权和失效时间等等,并且为消息确定路由。

JMSDestination: 由Send方法设置。指定消息的目的地,由JMS提供程序填写

JMSDeliveryMode: 由Send方法设置。提交消息的模式-持续或非持续。发送消息后JMS提供程序填写该字段。

JMSMessageID: 由Send方法设置。包含消息的唯一标识符。发送过程中由JMS提供程序填写

JMSTimeStamp: 由Send 方法设置。记录消息被传递给send方法的时间。发送过程中由JMS提供程序填写

JMSCorrelationID: 由客户端设置。包含用于将消息连接在一起的ID。客户端一般将其置为所引用消息的ID

JMSReplyTo: 由客户端设置。响应消息的目的地,如果客户端期望得到响应消息,则填写该字段

JMSRedelivered: 由JMS提供程序设置。指出该消息先前被发送过

JMSType: 由客户端设置。包含由客户端提供的消息类型标识符。是否需要该字段,不同的提供程序有不同要求

JMSExpiration: Send 方法设置。一个根据客户端提供的年龄计算出来的值,如果GMT比该过期值晚,则销毁消息

JMSPriority: Send 方法设置。包含客户端在发送消息时所设置有限级值

5. 消息属性

消息属性,用来添加删除消息头以外的附加信息。除了上面的属性,还可以自定义属性,以便进行消息的选择 。一般通过setXXXProperty方法来定义消息属性,XXX取值为:Boolean、Byte、Double、Float、Int、Long、Object、Short及String。每一属性均由字符串名字和相关的值组成 ,例如:

TextMessage msg = tsession.createTextMessage();

msg.setStringProperty(“CUSTOMER_NAME”,”MyCustomer”);

String customer = msg.getStringProperty(“CUSTOMER_NAME”);

其中的”CUSTOMER_NAME”和”MyCustomer”就是消息当中对应的key和value。

6. 消息主体

消息主体包含了消息的核心数据。

JMS 定义了5中消息类型: TextMessage、MapMessage、BytesMessage、

StreamMessage和ObjectMessage

选择最合适的消息类型可以使JMS最有效 的处理消息。

  • TextMessage(文本消息)

将数据作为简单字符串存放在主体中(XML就可以作为字符串发)

TextMessage msg = session.createTextMessage();

msg.setText(text);

有些厂商支持一种XML专用的消息格式,带来了便利,但是不是标准的JMS类型,影响移植性。只自己定义了两个方法setText(String s)、getText()

  • MapMessage(映射消息)

使用一张映射表来存放其主体内容(参照Jms API)

MapMessage msg = session.createMapMessage();

msg.setString(“CUSTOMER_NAME”,”John”);

msg.setInt(“CUSTOMER_AGE”,12);

String s = msg.getString(“CUSTOMER_NAME”);

int age = msg.getInt(“CUSTOMER_AGE”);

  • BytesMessage(字节消息)

将字节流存放在消息主体中。适合于下列情况:必须压缩发送的大量数据、需要与现有

消息格式保持一致等(参照Jms API)

byte[] data;

BytesMessage msg = session.createBytesMessage();

msg.wirte(data);

byte[] msgData = new byte[256];

int bytesRead = msg.readBytes(msgData);

  • StreamMessage(流消息)

用于处理原语类型。这里也支持属性字段和MapMessage所支持的数据类型。使用这种

消息格式时,收发双方事先协商好字段的顺序,以保证写读顺序相同(参照Jms API)

StringMessage msg = session.createStreamMessage();

msg.writeString(“John”);

msg.writeInt(12);

String s = msg.readString();

Int age = msg.readInt();

(PS:个人认为有点像socket的信息收发)

  • ObjectMessage(对象消息)

用于往消息中写入可序列化的对象。

消息中可以存放一个对象,如果要存放多个对象,需要建立一个对象集合,然后把这个集合写入消息。

客户端接收到一个ObjectMessage时,是read-only模式。如果一个客户端试图写message,将会抛出MessageNotWriteableException。如果调用了clearBody方法,message既可以读又可以写自己只单独定义了两个方法:getObject()和setObject(Serializable s)

ObjectMessage包含的只是object的一个快照,set之后object的修改对ObjectMessage的body无效 (从两个方法可以看出,这种消息已经强制要你实现java.io. Serializable接口)

Message只读时被set抛出MessageNotWriteableException;

set和get时,如果对象序列化失败抛出MessageFormatException

7. 消息的通信方式(点对点通信和发布/订阅方式)

点对点方式(point-to-point)

点对点的消息发送方式主要建立在 Message Queue、Sender、Receiver上,

Message Queue 存贮消息,Sender 发送消息,Receiver接收消息.具体点就是Sender Client发送Message Queue ,而 Receiver Client从Queue中接收消息和”发送消息已接受”到Queue,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行。
发布/订阅方式(publish/subscriber Messaging)

发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户

端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

编程模式

消息产生者向JMS发送消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection建立会话Session
  4. 使用会话Session和管理对象Destination创建消息生产者MessageSender
  5. 使用消息生产者MessageSender发送消息

消息消费者从JMS接受消息的步骤

  1. 创建连接使用的工厂类JMS ConnectionFactory
  2. 使用管理对象JMS ConnectionFactory建立连接Connection
  3. 使用连接Connection 建立会话Session
  4. 使用会话Session和管理对象Destination创建消息消费者MessageReceiver
  5. 使用消息消费者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver 消息消费者必须实现了MessageListener接口,需要定义onMessage事件方法。

ActiveMQ运行

ActiveMQ5.3版本默认启动时,启动了内置的jetty服务器,提供一个demo应用和用于监控ActiveMQ的admin应用。运行%activemq_home%bin/目录下的 activemq.bat , 之后你会看见如下一段话表示启动成功。

打开http://localhost:8161/admin/ ,可以查看消息队列的管理控台,如下截图:
这里写图片描述

点对点通讯模式demo

消息发送者:

package com.ainong.demo.p2pqueue;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
 * 
 * <b>function:</b> Queue 方式消息发送者
 * 
 * @author dong.gang
 * 
 */

public class QueueSender {
    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";

    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String DESTINATION_PAYMENT = "mq.queue.payment";
    public static final String DESTINATION_QUERY = "mq.queue.query";

    public QueueSender() {

    }

    /**
     * 
     * <b>function:</b> 发送消息
     * 
     * @throws Exception
     */

    public void sendMessage(QueueSession session, javax.jms.QueueSender sender) throws Exception {

        for (int i = 0; i < SEND_NUM; i++) {

            JSONObject msgJson = new JSONObject();
            msgJson.put("seqId", i+1);
            msgJson.put("content", "发送第" + (i + 1) + "条消息");

            MapMessage map = session.createMapMessage();
            map.setString("msg", msgJson.toJSONString());
            map.setLong("time", System.currentTimeMillis());
            System.out.println(map);
            sender.send(map);
        }
    }

    public void run(String mode) throws Exception {

        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);
            String queueDis = "1".equals(mode) ? DESTINATION_PAYMENT : DESTINATION_QUERY;
            // 创建一个消息队列
            Queue queue = session.createQueue(queueDis);
            // 创建消息发送者
            javax.jms.QueueSender sender = session.createSender(queue);
            // 设置持久化模式
            sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            sendMessage(session, sender);
            // 提交会话
            session.commit();
        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {

        QueueSender sender = new QueueSender();
        // sender.run("1");
        sender.run("2");
    }
}

消息接收者:

package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import com.alibaba.fastjson.JSONObject;
/**
 * 
 * <b>function:</b> 消息接收者
 * 
 * @author donggang
 * 
 */

public class QueueReceiver {

    // tcp 地址
    public static final String BROKER_URL = "tcp://localhost:61616";
    // 目标,在ActiveMQ管理员控制台创建 http://localhost:8161/admin/queues.jsp
    public static final String TARGET_PAYMENT = "mq.queue.payment";
    public static final String TARGET_QUERY = "mq.queue.query";

    public void run(String mode) throws Exception {

        QueueConnection connection = null;
        QueueSession session = null;
        try {
            // 创建链接工厂
            QueueConnectionFactory factory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD, BROKER_URL);
            // 通过工厂创建一个连接
            connection = factory.createQueueConnection();
            // 启动连接
            connection.start();
            // 创建一个session会话
            session = connection.createQueueSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
            // 创建一个消息队列
            String queueTarg = "1".equals(mode) ? TARGET_PAYMENT : TARGET_QUERY;
            Queue queue = session.createQueue(queueTarg);

            // 创建消息制作者
            javax.jms.QueueReceiver receiver = session.createReceiver(queue);
            receiver.setMessageListener(new MessageListener() {
                public void onMessage(Message msg) {
                    if (msg != null) {
                        MapMessage map = (MapMessage) msg;
                        try {
                            JSONObject jsonObj = JSONObject.parseObject(map.getString("msg"));
                            if ("3".equals(jsonObj.getString("seqId"))) {
                                System.out.println(map.getLong("time") + "接收#" + map.getString("msg"));                             
                            }
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            // 提交会话
            session.commit();
            // 休眠1s再关闭
            Thread.sleep(1000);

        } catch (Exception e) {
            throw e;
        } finally {
            // 关闭释放资源
            if (session != null) {
                session.close();
            }
            if (connection != null) {
                connection.close();
            }
        }
    }

    public static void main(String[] args) throws Exception {

        QueueReceiver receiver = new QueueReceiver();
        // receiver.run("1");
        receiver.run("2");
    }
}

其实上边的消息接收者已经集成了消息监听类,如果我们需要分离业务操作,可以 receiver.setMessageListener()参数中设为我们的业务监听处理类(需要实现MessageListener类):

package com.ainong.demo.p2pqueue;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageListener;

public class QueueMsgListenner implements MessageListener {

    @Override
    public void onMessage(Message msg) {
        if (msg != null) {
            MapMessage map = (MapMessage) msg;
            try {
                System.out.println(map.getLong("time") + "接收#" + map.getString("id000"));
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

发布/订阅模式 demo

消息发布者:

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQMapMessage;

import com.alibaba.fastjson.JSONObject;

public class Publisher {
    protected static String brokerURL = "tcp://localhost:61616";
    protected static transient ConnectionFactory factory;
    protected transient Connection connection;
    protected transient Session session;
    protected transient MessageProducer producer;

    public Publisher() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        try {
            // 持久化订阅(消息会保留,特定的消费者可以一段时间后进行消费)
            connection.setClientID("client1");
            connection.start();
        } catch (JMSException jmse) {
            connection.close();
            throw jmse;
        }

        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        producer = session.createProducer(null);
        // 发送消息时用使用持久模式(不设置,默认就是持久的)
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    protected void sendMessage(JSONObject msg) throws JMSException {
        Destination destination = session.createTopic("demo");
        Message message = createMessage(msg, session);
        System.out.println("消息发送: "
                + ((ActiveMQMapMessage) message).getContentMap()
                + " on destination: " + destination);
        producer.send(destination, message);
    }

    protected Message createMessage(JSONObject msg, Session session)
            throws JMSException {

        MapMessage message = session.createMapMessage();
        message.setString("id", msg.getString("id"));
        message.setString("content", msg.getString("content"));
        return message;
    }

    public static void main(String[] args) throws JMSException,
            InterruptedException {
        Publisher publisher = new Publisher();
        for (int i = 0; i < 3; i++) {
            JSONObject msgObject = new JSONObject();
            msgObject.put("id", "msg00" + i);
            msgObject.put("content", "第" + i + "条消息");
            publisher.sendMessage(msgObject);
            Thread.sleep(3000);
        }
        publisher.close();
    }
}

消息订阅者1(持久化订阅)

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 持久化订阅模式(消息会保留,消费者可以在任意时间消费消息)
 * 
 * @author DG
 *
 */
public class Consumer1 {

    private static String brokerURL = "tcp://localhost:61616";
    private static transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Session session;

    public Consumer1() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        connection.setClientID("client1");
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    public Session getSession() {
        return session;
    }

    public static void main(String[] args) throws JMSException {
        Consumer1 consumer = new Consumer1();
        Topic topic = consumer.getSession().createTopic("demo");
        // 普通订阅
        // MessageConsumer messageConsumer =
        // consumer.getSession().createConsumer(
        // destination);
        // 持久化订阅
        MessageConsumer messageConsumer =consumer.getSession().createDurableSubscriber(topic,"client1"); //持久订阅
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MapMessage map = (MapMessage) message;
                    String id = map.getString("id");
                    String content = map.getString("content");
                    System.out.println("消费者1,消息接收:id = " + id + ";content = "
                            + content);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

常规订阅者:

package com.ainong.demo.publish_subscrib;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 常规订阅模式(消费者必须在消息生产前就已经启动监听消息,否则错过消息之后,消费就会失效,不会被处理)
 * 
 * @author DG
 *
 */
public class Consumer2 {

    private static String brokerURL = "tcp://localhost:61616";
    private static transient ConnectionFactory factory;
    private transient Connection connection;
    private transient Session session;

    public Consumer2() throws JMSException {
        factory = new ActiveMQConnectionFactory(brokerURL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void close() throws JMSException {
        if (connection != null) {
            connection.close();
        }
    }

    public Session getSession() {
        return session;
    }

    public static void main(String[] args) throws JMSException {
        Consumer2 consumer = new Consumer2();
        Destination destination = consumer.getSession().createTopic("demo");
        MessageConsumer messageConsumer = consumer.getSession().createConsumer(
                destination);
        messageConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MapMessage map = (MapMessage) message;
                    String id = map.getString("id");
                    String content = map.getString("content");
                    System.out.println("消费者2:消息接收:id = " + id + ";content = "
                            + content);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

上面只是我初识activemq时的一些心得,附代码源码:
http://download.csdn.net/detail/donggang1992/9561041

以上是关于ActiveMq--学习日记的主要内容,如果未能解决你的问题,请参考以下文章

Python学习日记之练习代码

Wildfly - 配置 ActiveMQ 以使用 Postgres 日志

python学习日记:day15:------内置函数

Python学习日记 3/10

Python 学习日记 第九天

Python 学习日记第七篇 -- 函数