MQ 之 ActiveMQ

Posted binfooo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了MQ 之 ActiveMQ相关的知识,希望对你有一定的参考价值。

ActiveMQ 基本定义

什么是消息中间件?

消息中间件顾名思义实现的就是在两个系统或两个客户端之间进行消息传送。
技术图片

什么是ActiveMQ?

ActiveMQ 是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,ActiveMQ的设计目标是提供标准的,面向消息的,能够跨越多语言和多系统的应用集成消息通信中间件。
ActiveMQ 常被应用与系统业务的解耦,异步消息的推送,增加系统并发量,提高用户体验。

如何使用ActiveMQ?

AcitveMQ 数据传送流程

技术图片

ActiveMQ 两种消息传递类型

(1)点对点传输,即一个生产者对应一个消费者,生产者向broke推送数据,数据存储在broke的一个队列中,当消费者接受该条队列里的数据。
(2)基于发布/订阅模式的传输,即根据订阅话题来接收相应数据,一个生产者可向多个消费者推送数据。

ActiveMQ 两种消费方式

(1)同步阻塞方式(receive):订阅者或者接收者调用MessageConsumer的receive方法来接受消息,receive方法在能够接受到消息之前将会一直阻塞。
(2)异步非阻塞方式(监听器):订阅者或者接收者通过MessageConsumer的setMessageListener注册一个消息监听器,当消息到达之后,系统自动调用监听器MessageListener的onMessage方法。

ActiveMQ 安装

获取安装包

下载地址:http://activemq.apache.org/components/classic/download/
技术图片

Linux下安装下载包

wget http://mirror.bit.edu.cn/apache//activemq/5.15.11/apache-activemq-5.15.11-bin.tar.gz
tar -xvzf apache-activemq-5.15.11-bin.tar.gz

技术图片

启动 activeMQ

./activemq start
./activemq status
ps -ef | grep active | grep -v grep
netstat -anp | grep 61616

技术图片

前台登陆

登陆地址:http://111.230.116.197:8161/index.html
默认账户密码:admin/admin
技术图片
技术图片

测试案例

新建Maven项目模块

导入Maven架包依赖

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.11</version>
    <scope>compile</scope>
</dependency>

测试案例 - 生产者

package com.personal.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JmsProduct0 {

    public final static String ACTIVEMQ_URL   = "tcp://111.230.116.197:61616";
    public final static String ACTIVEMQ_QUEUE = "QUEUE02";

    public static void main(String[] args) throws JMSException {

        // 1. 创建链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUserName("admin");
        activeMQConnectionFactory.setPassword("admin");

        // 2. 通过连接工厂,创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        System.out.println("创建成功.");

        // 3. 创建会话 Session
        // 参数一:事务
        // 参数二:签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE);

        MessageProducer messageProducer = session.createProducer(queue);
        for (int i = 0; i < 20000; i++) {
            System.out.println("开始发送 Message :" + i);
            TextMessage textMessage = session.createTextMessage("Message ----- " + i);
            messageProducer.send(textMessage);
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("消息发送到ActiveMQ完成.");
    }
}

测试案例 - 消费者[普通接收接口] - DEMO-0

package com.personal.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;

public class JmsConsumer0 {

    public final static String ACTIVEMQ_URL   = "tcp://111.230.116.197:61616";
    public final static String ACTIVEMQ_QUEUE = "QUEUE02";

    public static void main(String[] args) throws JMSException {

        // 1. 创建链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUserName("admin");
        activeMQConnectionFactory.setPassword("admin");

        // 2. 通过连接工厂,创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 3. 创建会话 Session
        // 参数一:事务
        // 参数二:签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE);
        MessageConsumer consumer = session.createConsumer(queue);

        while (true) {
            Message message = consumer.receive(20000);
            TextMessage textMessage = (TextMessage) message;
            if (textMessage != null) {
                System.out.println("[" + textMessage.getJMSMessageID() + "] " + textMessage.getText());
            } else {
                break;
            }
        }

        consumer.close();
        session.close();
        connection.close();
        System.out.println("消费成功.");
    }
}

测试案例 - 消费者[监听器接口] - DEMO-1

package com.personal.test;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.IOException;

public class JmsConsumer1 {

    public final static String ACTIVEMQ_URL   = "tcp://111.230.116.197:61616";
    public final static String ACTIVEMQ_QUEUE = "QUEUE02";

    public static void main(String[] args) throws JMSException, IOException {

        // 1. 创建链接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUserName("admin");
        activeMQConnectionFactory.setPassword("admin");

        // 2. 通过连接工厂,创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();

        // 3. 创建会话 Session
        // 参数一:事务
        // 参数二:签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE);
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("[" + textMessage.getJMSMessageID() + "] " + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        System.in.read();
        consumer.close();
        session.close();
        connection.close();
        System.out.println("消费成功.");
    }
}

发送进程启动:
技术图片

消费进程启动:
技术图片

消息队列处理情况:
技术图片

消费者三种消费情况

一个消费者、一个生产者,先启动消费者,普通消费

一个消费者、一个生产者,先启动生产者,普通消费

多个消费者、一个生产者,先启动消费者,消费均分

生产十二条记录:
技术图片
消费实例一,共消费六条
技术图片
消费实例二,共消费六条
技术图片

以上是关于MQ 之 ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

JMS 之 Active MQ 启动嵌入式Broke

JMS 之 Active MQ 的消息传输

消息中间件MQ之ActiveMQ学习总结(上)

MQ框架的比较

JMS 之 Active MQ 消息存储

JMS 之 Active MQ 消息存储