ActiveMQ

Posted zzzzn

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ActiveMQ相关的知识,希望对你有一定的参考价值。

 一、MQ产品的分类

1、RabbitMQ

  是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQPXMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。同时实现了一个经纪人(Broker)构架,这意味着消息在发送给客户端时先在中心队列排队。对路由(Routing),负载均衡(Load balance)或者数据持久化都有很好的支持。

2、Redis

  是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。对于RabbitMQRedis的入队和出队操作,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes512Bytes1K10K四个不同大小的数据。实验表明:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而如果数据大小超过了10KRedis则慢的无法忍受;出队时,无论数据大小,Redis都表现出非常好的性能,而RabbitMQ的出队性能则远低于Redis

 

入队

出队

 

128B

512B

1K

10K

128B

512B

1K

10K

Redis

16088

15961

17094

25

15955

20449

18098

9355

RabbitMQ

10627

9916

9370

2366

3219

3174

2982

1588

3、ZeroMQ

  号称最快的消息队列系统,尤其针对大吞吐量的需求场景。ZMQ能够实现RabbitMQ不擅长的高级/复杂的队列,但是开发人员需要自己组合多种技术框架,技术上的复杂度是对这MQ能够应用成功的挑战。ZeroMQ具有一个独特的非中间件的模式,你不需要安装和运行一个消息服务器或中间件,因为你的应用程序将扮演了这个服务角色。你只需要简单的引用ZeroMQ程序库,可以使用NuGet安装,然后你就可以愉快的在应用程序之间发送消息了。但是ZeroMQ仅提供非持久性的队列,也就是说如果down机,数据将会丢失。其中,TwitterStorm中使用ZeroMQ作为数据流的传输。

4、ActiveMQ

  Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQZeroMQActiveMQ均支持常用的多种语言客户端 C++Java.Net,PythonphpRuby等。

5、Jafka/Kafka

  Kafka是Apache下的一个子项目,是一个高性能跨语言分布式Publish/Subscribe消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现复杂均衡;支持Hadoop数据并行加载,对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka通过Hadoop的并行加载机制来统一了在线和离线的消息处理,这一点也是本课题所研究系统所看重的。Apache Kafka相对于ActiveMQ是一个非常轻量级的消息系统,除了性能非常好之外,还是一个工作良好的分布式系统。

  其他一些队列列表HornetQ、Apache Qpid、Sparrow、Starling、Kestrel、Beanstalkd、Amazon SQS就不再一一分析。

二、消息中间件概述

1、消息中间件产生的背景

  在客户端与服务器进行通讯时.客户端调用后,必须等待服务对象完成处理返回结果才能继续执行。

  客户与服务器对象的生命周期紧密耦合,客户进程和服务对象进程都都必须正常运行;如果由于服务对象崩溃或者网络故障导致用户的请求不可达,客户会受到异常

  点对点通信: 客户的一次调用只发送给某个单独的目标对象。

2、什么是消息中间件

  面向消息的中间件(MessageOrlented MiddlewareMOM)较好的解决了以上问题。发送者将消息发送给消息服务器,消息服务器将消感存放在若千队列中,在合适的时候再将消息转发给接收者。

  这种模式下,发送和接收是异步的,发送者无需等待;

  二者的生命周期未必相同:发送消息的时候接收者不一定运行,接收消息的时候发送者也不一定运行;

  一对多通信:对于一个消息可以有多个接收者。

三、JMS介绍

1、什么JMS

  JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

2、什么是消息模型 

  ○ Point-to-Point(P2P) --- 点对点

  ○ Publish/Subscribe(Pub/Sub)---  发布订阅

 3、P2P (点对点)

  ①、P2P模式图 

    技术图片

  ②、涉及到的概念 

    消息队列(Queue)

    发送者(Sender)

    接收者(Receiver)

    每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

  ③、P2P的特点

    每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

    发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

    接收者在成功接收消息之后需向队列应答成功

  如果你希望发送的每个消息都应该被成功处理的话,那么你需要P2P模式。

  ④、应用场景

    A用户与B用户发送消息

3、Pub/Sub (发布与订阅)

  ①、Pub/Sub模式图 

    技术图片

  ②、涉及到的概念 

     主题(Topic)

    发布者(Publisher)

    订阅者(Subscriber) 
    客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

  ③、Pub/Sub的特点

    每个消息可以有多个消费者

    发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。

    为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

  如果你希望发送的消息可以不被做任何处理、或者被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型

  ④、消息的消费   

    JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。 

  ○ 同步 
    订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞 
  ○ 异步 
    订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

    ⑤、应用场景:

    用户注册、订单修改库存、日志存储

四、ActiveMQ

  ActiveMQ是Apache下的一个子项目。 类似于ZeroMQ,它能够以代理人和点对点的技术实现队列。同时类似于RabbitMQ,它少量代码就可以高效地实现高级应用场景。RabbitMQ、ZeroMQ、ActiveMQ均支持常用的多种语言客户端 C++、Java、.Net,、Python、 Php、 Ruby

五、windows安装ActiveMQ

  1、 解压,进入apache-activemq-5.11.1inwin64

    技术图片

  2、 启动,双击activeMQ.bat脚本启动,启动窗口不要关闭,可以设置后台启动

    技术图片

  3、 启动完成后,如果发送消息或者消费消息通过61616端口进行 后台查看信息通过8161端口查看

    技术图片

  4、 进入后台登陆:默认用户名和密码都是admin

    技术图片

     技术图片

  5、 登陆成功进入主页面

    技术图片

 六、实现点对点通讯模式

1、目录展示

   技术图片

2、导入依赖

  技术图片

3、提供者P2P_Provider

package com.zn.p2p;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 点对点提供者
 */
public class P2P_Provider {
    public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory();
        //步骤二:创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建队列
        Queue quque = session.createQueue("MyQueue");
        //创建消息生产者
        MessageProducer producer = session.createProducer(quque);
        //消息持久化  参数1为不做持久化  2为持久化
        producer.setDeliveryMode(2);
        //模拟消息
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ!");
        //发送消息
        producer.send(textMessage);
        System.out.println("生产者生成消息完毕!");
        //回收资源
        session.close();
        connection.close();
    }
}

4、消费者P2P_Consumer

package com.zn.p2p;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * P2P消费者
 */
public class P2P_Consumer {
    public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //步骤二:创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建队列
        Queue quque = session.createQueue("MyQueue");
        //创建消费者
        MessageConsumer consumer = session.createConsumer(quque);
        //循环获取消息
        while (true){
            TextMessage message = (TextMessage)consumer.receive();
            if (message!=null){
                System.out.println("消费者获取信息:"+message.getText());
            }else {
                break;
            }
        }
        //回收资源
        session.close();
        connection.close();
    }
}

5、启动提供者

  技术图片

  技术图片

6、启动消费者

  技术图片

   技术图片

七、实现发布订阅模式(消费者先订阅Topic制图,再生产消息)

1、目录展示

  技术图片

2、导入依赖

  技术图片

3、发布订阅消费者

package com.zn.pubsub;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 发布订阅消费者
 */
public class PubSub_Consumer {
    public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //步骤二:创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建主题
        Topic topic = session.createTopic("MyQueue");
        //创建消费者
        MessageConsumer consumer = session.createConsumer(topic);
        //循环获取消息
        while (true){
            TextMessage message = (TextMessage)consumer.receive();
            if (message!=null){
                System.out.println("消费者获取信息:"+message.getText());
            }else {
                break;
            }
        }
        //回收资源
        session.close();
        connection.close();
    }
}

4、发布订阅提供者

package com.zn.pubsub;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 发布订阅提供者
 */
public class PubSub_Provider {
    public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER, ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        //步骤二:创建连接
        Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
        connection.start();
        //步骤四:获取会话工厂
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建主题
        Topic topic = session.createTopic("MyQueue");
        //创建消息生产者
        MessageProducer producer = session.createProducer(null);
        //消息持久化  参数1为不做持久化  2为持久化
        producer.setDeliveryMode(2);
        //模拟消息
        TextMessage textMessage = session.createTextMessage("Hello ActiveMQ--PubSub!");
        //发送消息
        producer.send(topic,textMessage);
        System.out.println("生产者生成消息完毕!");
        //回收资源
        session.close();
        connection.close();
    }
}

5、启动消费者订阅消息

  技术图片

 

 

   技术图片

   技术图片

6、启动提供者

  技术图片

   技术图片

   技术图片

 

以上是关于ActiveMQ的主要内容,如果未能解决你的问题,请参考以下文章

wildfly 实践5 ---分布式服务中的JMS服务访问

ActiveMQ的安全机制使用及其源代码分析 [转]

ActiveMQ——activemq的使用java代码实例

ActiveMQ入门系列二:入门代码实例(点对点模式)

ActiveMQ入门案例-生产者代码实现

ActiveMQ queue 代码示例