新手快速搭建ActiveMQ服务「消息队列」

Posted 爪哇笔记

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了新手快速搭建ActiveMQ服务「消息队列」相关的知识,希望对你有一定的参考价值。

前言

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

在生产项目中,很多时候需要消息中间件来进行分布式系统间的通信。它具有低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能。本篇主要介绍ActiveMQ 相关概念以及安装说明,后面会着重介绍 SpringBoot 集成实现秒杀消息队列

概念

JMS消息模式

点对点或队列模式

包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

  • 每个消息只有一个消费者(Consumer),即一旦被消费,消息就不再在消息队列中

  • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

  • 接收者在成功接收消息之后需向队列应答成功

Pub/Sub 发布/订阅模式

包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

  • 每个消息可以有多个消费者

  • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息。

  • 为了消费消息,订阅者必须保持运行的状态。

为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

JMS消息基本组件

ConnectionFactory

创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

Destination

Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。 所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

Connection

Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

Session

Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

消息的生产者

消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

消息消费者

消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

MessageListener

消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

Transport传输方式

ActiveMQ目前支持的Transport有:VM Transport、TCP Transport、NIO Transport、SSL Transport、Peer Transport、UDP Transport、Multicast Transport、HTTP and HTTPS Transport、WebSockets Transport、Failover Transport、Fanout Transport、Discovery Transport、ZeroConf Transport等。

  • VM Transport:允许客户端和Broker直接在VM内部通信,采用的连接不是Socket连接,而是直接的方法调用,从而避免了网络传输的开销。应用场景也仅限于Broker和客户端在同一JVM环境下。

  • TCP Transport:客户端通过TCP Socket连接到远程Broker。配置语法:

  • tcp://hostname:port?transportOptions

  • HTTP and HTTPS Transport:允许客户端使用REST或者Ajax的方式进行连接。这意味着可以直接使用javascript向ActiveMQ发送消息。

  • WebSockets Transport:允许客户端通过html5标准的WebSockets方式连接到Broker。

  • Failover Transport:青龙系统MQ采用的就是这种连接方式。这种方式具备自动重新连接的机制,工作在其他Transport的上层,用于建立可靠的传输。允许配置任意多个的URI,该机制将会自动选择其中的一个URI来尝试连接。配置语法:

  • failover:(tcp://localhost:61616,tcp://localhost:61617,.....)?transportOptions

  • Fanout Transport:主要适用于生产消息发向多个代理。如果多个代理出现环路,可能造成消费者接收重复的消息。所以,使用该协议时,最好将消息发送给多个不相连接的代理。

Persistence持久化存储

AMQ Message Store

ActiveMQ 5.0 的缺省持久化存储方式。

Kaha Persistence

这是一个专门针对消息持久化的解决方案。它对典型的消息使用模式进行了优化。

JDBC Persistence

目前支持的数据库有:Apache Derby, Axion, DB2, HSQL, Informix, MaxDB, mysql, Oracle, Postgresql, SQLServer, Sybase。

Disable Persistence

不应用持久化存储。

集群方案(Master / Slave)

Pure Master Slave

  • 无单点故障;

  • 不需要依赖共享文件系统或是共享数据库,使用 KahaDB的方式持久化存储;

  • 一个Master只能带一个Slave;

  • Master工作期间,会将消息状况自动同步到Slave;

  • Master一旦崩溃,Slave自动接替其工作,已发送并尚未消费的消息继续有效;

  • Slave接手后,必须停止Slave才能重启先前的Master;

Shared File System Master Slave

JDBC Master Slave

  • 配置上,不存在Master和Slave的区分,多个共享数据源的Broker构成JDBC Master Slave;

  • 首先抢到资源(数据库锁)的Broker成为Master,其他Broker定期尝试抢占资源;

  • 一旦Master崩溃,其他Broker抢占资源,最终只有一台抢到,立刻成为Master,之前的Master即便重启成功,也只能作为Slave等待;

安装说明

这里使用Docker安装,查询Docker镜像:

 
   
   
 
  1. docker search activemq

下载Docker镜像:

 
   
   
 
  1. docker pull webcenter/activemq

创建&运行ActiveMQ容器:

 
   
   
 
  1. docker run -d --name myactivemq -p 61617:61616 -p 8162:8161 webcenter/activemq

61616是 activemq 的容器使用端口(映射为61617),8161是 web 页面管理端口(对外映射为8162)

查看创建的容器,如果存在说明安装成功:

 
   
   
 
  1. docker ps

查看WEB管理页面:

浏览器输入http://ip:8162 点击Manage ActiveMQ broker使用默认账号/密码:admin/admin进入查看。

配置访问密码

进入Docker容器:

 
   
   
 
  1. docker exec -it myactivemq /bin/bash

控制台界面设置用户名和密码:

 
   
   
 
  1. # 位于根目录 conf 目录下

  2. vi jetty-realm.properties

  3. # 修改密码

  4. # username: password [,rolename ...]

  5. admin: admin, admin

配置连接密码

编辑activemq.xml文件,放置到 shutdownHooks 下方即可。

 
   
   
 
  1. <!-- 添加访问ActiveMQ的账号密码 -->

  2. <plugins>

  3.    <simpleAuthenticationPlugin>

  4.        <users>

  5.            <authenticationUser username="${activemq.username}" password="${activemq.password}" groups="users,admins"/>

  6.        </users>

  7.    </simpleAuthenticationPlugin>

  8. </plugins>

修改conf中credentials.properties文件进行密码设置:

 
   
   
 
  1. activemq.username=admin

  2. activemq.password=123456

  3. guest.password=123456

注意事项

如果是云服务器,记得开放相关端口(61617/8160)

参考

http://activemq.apache.org/ 

https://www.cnblogs.com/xdouby/p/5934965.html

以上是关于新手快速搭建ActiveMQ服务「消息队列」的主要内容,如果未能解决你的问题,请参考以下文章

ActiveMQ消息队列集群的搭建

框架篇——Spring整合ActiveMQ(MQ服务端与消费端演示)

消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

AMQP消息队列之RabbitMQ简单示例

JAVAEE——宜立方商城08:Zookeeper+SolrCloud集群搭建搜索功能切换到集群版Activemq消息队列搭建与使用

三kafka搭建