分布式--ActiveMQ 消息中间件

Posted 凌浩雨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式--ActiveMQ 消息中间件相关的知识,希望对你有一定的参考价值。


1. ActiveMQ

1). ActiveMQ

ActiveMQ是Apache所提供的一个开源的消息系统,完全采用Java来实现,因此,它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。JMS是一组Java应用程序接口,它提供消息的创建、发送、读取等一系列服务。JMS提供了一组公共应用程序接口和响应的语法,类似于Java数据库的统一访问接口JDBC,它是一种与厂商无关的API,使得Java程序能够与不同厂商的消息组件很好地进行通信。

2). Java Message Service(JMS)

JMS支持两种消息发送和接收模型。

  • 一种称为P2P(Ponit to Point)模型,即采用点对点的方式发送消息。P2P模型是基于队列的,消息生产者发送消息到队列,消息消费者从队列中接收消息,队列的存在使得消息的异步传输称为可能,P2P模型在点对点的情况下进行消息传递时采用。

    图1.png
  • 另一种称为Pub/Sub(Publish/Subscribe,即发布-订阅)模型,发布-订阅模型定义了如何向一个内容节点发布和订阅消息,这个内容节点称为topic(主题)。主题可以认为是消息传递的中介,消息发布这将消息发布到某个主题,而消息订阅者则从主题订阅消息。主题使得消息的订阅者与消息的发布者互相保持独立,不需要进行接触即可保证消息的传递,发布-订阅模型在消息的一对多广播时采用。

    分布式--ActiveMQ 消息中间件(一)
    图2.png
3). JMS术语
  • Provider/MessageProvider:生产者

  • Consumer/MessageConsumer:消费者

  • PTP:Point To Point,点对点通信消息模型

  • Pub/Sub:Publish/Subscribe,发布订阅消息模型

  • Queue:队列,目标类型之一,和PTP结合

  • Topic:主题,目标类型之一,和Pub/Sub结合

  • ConnectionFactory:连接工厂,JMS用它创建连接

  • Connnection:JMS Client到JMS Provider的连接

  • Destination:消息目的地,由Session创建

  • Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的

4). ActiveMQ下载


分布式--ActiveMQ 消息中间件(一)
图3.png


  • bin (windows下面的bat(分32、64位)和unix/linux下面的sh)

  • conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

  • data (默认是空的)

  • docs (index,replease版本里面没有文档,-.-b不知道为啥不带)

  • example (几个例子)

  • lib (activemMQ使用到的lib)

  • webapps 注意ActiveMQ自带Jetty提供Web管控台

  • webapps-demo 示例

  • activemq-all-5.15.3.jar

  • LICENSE.txt

  • README.txt

5). 配置
  • Web控制台账号和密码(apache-activemq-5.15.3conf)

    分布式--ActiveMQ 消息中间件(一)
    图4.png
  • 网络端口(apache-activemq-5.15.3conf)--默认为8161

    分布式--ActiveMQ 消息中间件(一)
    图5.png
6). 启动

apache-activemq-5.15.3inwin64目录下双击activemq.bat文件,在浏览器中输入http://localhost:8161/admin/, 用户名和密码输入admin即可


分布式--ActiveMQ 消息中间件(一)
图6.png


7). 消息中间件(MOM:Message Orient middleware)

消息中间件有很多的用途和优点:

  • 1 将数据从一个应用程序传送到另一个应用程序,或者从软件的一个模块传送到另外一个模块;

  • 2. 负责建立网络通信的通道,进行数据的可靠传送。

  • 3. 保证数据不重发,不丢失

  • 4. 能够实现跨平台操作,能够为不同操作系统上的软件集成技工数据传送服务

8).什么情况下使用ActiveMQ?
  • 多个项目之间集成
    (1) 跨平台
    (2) 多语言
    (3) 多项目

  • 降低系统间模块的耦合度,解耦
    (1) 软件扩展性

  • 系统前后端隔离
    (1) 前后端隔离,屏蔽高安全区

2. ActiveMQ 示例

1). P2P 示例

I. 导包--activemq-all-5.15.3.jar
II. Producer

 1/**
2 * 定义消息的生产者
3 * @author mazaiting
4 */

5public class Producer {
6    // 用户名
7    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
8    // 密码
9    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
10    // 链接
11    private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;
12
13    /**
14     * 定义消息并发送,等待消息的接收者(消费者)消费此消息
15     * @param args
16     * @throws JMSException 
17     */

18    public static void main(String[] args) throws JMSException {
19        // 消息中间件的链接工厂
20        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
21                USERNAME, PASSWORD, BROKENURL);
22        // 连接
23        Connection connection = null;
24        // 会话
25        Session session = null;
26        // 消息的目的地
27        Destination destination = null;
28        // 消息生产者
29        MessageProducer messageProducer = null;
30
31        try {
32            // 通过连接工厂获取链接
33            connection = connectionFactory.createConnection();
34            // 创建会话,进行消息的发送
35            // 参数一:是否启用事务
36            // 参数二:设置自动签收
37            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
38            // 创建消息队列
39            destination = session.createQueue("talkWithMo");
40            // 创建一个消息生产者
41            messageProducer = session.createProducer(destination);
42            // 设置持久化/非持久化, 如果非持久化,MQ重启后可能后导致消息丢失
43            messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
44            // 模拟发送消息
45            for (int i = 0; i < 5; i++) {
46                TextMessage textMessage = session.createTextMessage("给妈妈发送的消息:"+i);
47                System.out.println("textMessage: " + textMessage);
48                messageProducer.send(textMessage);
49            }
50
51            // 如果设置了事务,会话就必须提交
52            session.commit();
53        } catch (JMSException e) {
54            e.printStackTrace();
55        } finally {
56            if (null != connection) {
57                connection.close();
58            }
59        }
60    }
61}

III. Consumer

 1/**
2 * 定义消息的消费者
3 * @author mazaiting
4 */

5public class Consumer {
6    // 用户名
7    private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
8    // 密码
9    private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
10    // 链接
11    private static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;
12
13    /**
14     * 接收消息
15     * @param args
16     * @throws JMSException 
17     */

18    public static void main(String[] args) throws JMSException {
19        // 消息中间件的链接工厂
20        ConnectionFactory connectionFactory = null;
21        // 链接
22        Connection connection = null;
23        // 会话
24        Session session = null;
25        // 消息的目的地
26        Destination destination = null;
27        // 消息的消费者
28        MessageConsumer messageConsumer = null;
29        // 实例化链接工厂,创建一个链接
30        connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKENURL);
31
32        try {
33            // 通过工厂获取链接
34            connection = connectionFactory.createConnection();
35            // 启动链接
36            connection.start();
37            // 创建会话,进行消息的接收
38            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
39            // 创建消息队列
40            destination = session.createQueue("talkWithMo");
41            // 创建一个消息的消费者
42            messageConsumer = session.createConsumer(destination);
43
44            // 模拟接收消息
45            while (true) {
46                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
47                if (null != textMessage) {
48                    System.out.println("收到消息: " + textMessage);
49                } else {
50                    break;
51                }
52            }
53            // 提交
54            session.commit();
55        } catch (JMSException e) {
56            e.printStackTrace();
57        } finally {
58            if (null != connection) {
59                connection.close();
60            }
61        }
62    }
63}

IV. 测试

  • 先运行生产者Producer

    分布式--ActiveMQ 消息中间件(一)
    图7.png

ActiveMQ控制台


分布式--ActiveMQ 消息中间件(一)
图8.png


  • 再运行消费者Consumer

    分布式--ActiveMQ 消息中间件(一)
    图9.png


    ActiveMQ控制台

    分布式--ActiveMQ 消息中间件(一)
    图10.png

V. 消息类型

  • StreamMessage Java原始值的数据流

  • MapMessage 一套名称-键值对

  • TextMessage 一个字符串对象

  • ObjectMessage 一个序列号的Java对象

  • BytesMessage 一个未解释字节的数据流
    VI. 控制台 Queue

  • Messages Enqueued:表示生产了多少条消息,记做P

  • Messages Dequeued:表示消费了多少条消息,记做C

  • Number Of Consumers:表示在该队列上还有多少消费者在等待接受消息

  • Number Of Pending Messages:表示还有多少条消息没有被消费,实际上是表示消息的积压程度,就是P-C
    VII. 签收
    签收就是消费者接受到消息后,需要告诉消息服务器,我收到消息了。当消息服务器收到回执后,本条消息将失效。因此签收将对PTP模式产生很大影响。如果消费者收到消息后,并不签收,那么本条消息继续有效,很可能会被其他消费者消费掉!

  • AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收

  • CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收

  • DUPS_OK_ACKNOWLEDGE:签不签收无所谓了,只要消费者能够容忍重复的消息接受,当然这样会降低Session的开销

2). request/reply模型

I. 实现思路


分布式--ActiveMQ 消息中间件(一)
图11.png


Client的Producer发出一个JMS message形式的request,request上附加了一些额外的属性:



  • correlation ID(用来和返回的correlation ID对比进行验证),

  • JMSReplyTo属性(放置jms message的destination,这样worker的Consumer获得jms message就能得到destination)

Worker的consumer收到requset,处理request并用producer发出reply,destination就从requset的JMSReplyTo属性中得到。

II. Server代码

 1public class Server implements MessageListener {
2    // 经纪人链接
3    private static final String BROKER_URL = "tcp://localhost:61616";
4    // 请求队列
5    private static final String REQUEST_QUEUE = "requestQueue";
6    // 经纪人服务
7    private BrokerService brokerService;
8    // 会话
9    private Session session;
10    // 生产者
11    private MessageProducer producer;
12    // 消费者
13    private MessageConsumer consumer;
14
15    private void start() throws Exception {
16        createBroker();
17        setUpConsumer();
18    }
19
20    /**
21     * 创建经纪人
22     * @throws Exception 
23     */

24    private void createBroker() throws Exception {
25        // 创建经纪人服务
26        brokerService = new BrokerService();
27        // 设置是否持久化
28        brokerService.setPersistent(false);
29        // 设置是否使用JMX
30        brokerService.setUseJmx(false);
31        // 添加链接
32        brokerService.addConnector(BROKER_URL);
33        // 启动
34        brokerService.start();
35    }
36
37    /**
38     * 设置消费者
39     * @throws JMSException 
40     */

41    private void setUpConsumer() throws JMSException {
42        // 创建连接工厂
43        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
44        // 创建连接
45        Connection connection = connectionFactory.createConnection();
46        // 启动连接
47        connection.start();
48        // 创建Session
49        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
50        // 创建队列
51        Destination adminQueue = session.createQueue(REQUEST_QUEUE);
52        // 创建生产者
53        producer = session.createProducer(null);
54        // 设置持久化模式
55        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
56        // 创建消费者
57        consumer = session.createConsumer(adminQueue);
58        // 消费者设置消息监听
59        consumer.setMessageListener(this);
60    }
61
62    public void stop() throws Exception {
63        producer.close();
64        consumer.close();
65        session.close();
66        brokerService.stop();
67    }
68
69    @Override
70    public void onMessage(Message message) {
71        try {
72            // 创建新消息
73            TextMessage response = this.session.createTextMessage();
74
75            // 判断消息是否是文本消息
76            if (message instanceof TextMessage) {
77                // 强转为文本消息 
78                TextMessage textMessage = (TextMessage) message;
79                // 获取消息内容
80                String text = textMessage.getText();
81                // 设置消息
82                response.setText(handleRequest(text));
83            }
84            response.setJMSCorrelationID(message.getJMSCorrelationID());
85            producer.send(message.getJMSReplyTo(), response);
86        } catch (JMSException e) {
87            e.printStackTrace();
88        }
89    }
90
91    /**
92     * 构建消息内容
93     * @param text 文本
94     * @return
95     */

96    private String handleRequest(String text) {
97        return "Response to '" + text + "'";
98    }
99
100    public static void main(String[] args) throws Exception {
101        Server server = new Server();
102        // 启动
103        server.start();
104        System.out.println();
105        System.out.println("Press any key to stop the server");
106        System.out.println();
107        System.in.read();
108        server.stop();
109    }
110}

III. Client代码

 1public class Client implements MessageListener {
2    // 经纪人链接
3    private static final String BROKER_URL = "tcp://localhost:61616";
4    // 请求队列
5    private static final String REQUEST_QUEUE = "requestQueue";
6    // 连接
7    private Connection connection;
8    // 会话
9    private Session session;
10    // 生产者
11    private MessageProducer producer;
12    // 消费者
13    private MessageConsumer consumer;
14    // 请求队列
15    private Queue tempDest;
16
17    public void start() throws JMSException {
18        // 连接工厂
19        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
20        // 创建连接
21        connection = activeMQConnectionFactory.createConnection();
22        // 开启连接
23        connection.start();
24        // 创建会话
25        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
26        // 创建队列
27        Destination adminQueue = session.createQueue(REQUEST_QUEUE);
28        // 创建生产者
29        producer = session.createProducer(adminQueue);
30        // 设置持久化模式
31        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
32        // 创建模板队列
33        tempDest = session.createTemporaryQueue();
34        // 创建消费者
35        consumer = session.createConsumer(tempDest);
36        // 设置消息监听
37        consumer.setMessageListener(this);      
38    }
39
40    /**
41     * 停止
42     * @throws JMSException 
43     */

44    public void stop() throws JMSException {
45        producer.close();
46        consumer.close();
47        session.close();
48    }
49
50    /**
51     * 请求
52     * @param request
53     * @throws JMSException 
54     */

55    public void request(String request) throws JMSException {
56        System.out.println("Request: " + request);
57        // 创建文本消息
58        TextMessage textMessage = session.createTextMessage();
59        // 设置文本内容
60        textMessage.setText(request);
61        // 设置回复
62        textMessage.setJMSReplyTo(tempDest);
63        // 获取UUID
64        String correlationId = UUID.randomUUID().toString();
65        // 设置JMS id
66        textMessage.setJMSCorrelationID(correlationId);
67        // 发送消息
68        this.producer.send(textMessage);
69    }
70
71    @Override
72    public void onMessage(Message message) {
73        try {
74            System.out.println("Received response for: " + ((TextMessage)message).getText());
75        } catch (JMSException e) {
76            e.printStackTrace();
77        }
78    }
79
80    public static void main(String[] args) throws JMSException, InterruptedException {
81        Client client = new Client();
82        // 启动
83        client.start();
84        int i = 0;
85        while(i++ < 10) {
86            client.request("REQUEST- " + i);
87        }
88        Thread.sleep(3000);
89        client.stop();
90    }
91}

IV. 测试

  • 启动Server

    图12.png
  • 启动Client

    图13.png

文章中涉及到的网络链接,请点击阅读原文进行查看。

以上是关于分布式--ActiveMQ 消息中间件的主要内容,如果未能解决你的问题,请参考以下文章

Java中间消息件——ActiveMQ入门级运用

分布式--ActiveMQ 消息中间件

分布式消息通信ActiveMQ

分布式--ActiveMQ 消息中间件

学习dubbo:消息中间件activemq -简介

高并发分布式消息中间件技术ActiveMQ事务