消息中间件MQ及ActiveMQ介绍
Posted 踩踩踩从踩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息中间件MQ及ActiveMQ介绍相关的知识,希望对你有一定的参考价值。
前言
本篇文章会首先介绍MQ的概念,及核心概念及核心设计来理解出为什么需要消息中间件,消息中间间的作用,然后会接着介绍常用的activeMq的安装,以及在spring中集成,一些比较常用的命令。
无论是在传统还是互联网中,都是协作的。系统之间有联系,系统之间需要交流的方式分为两种同步调用和异步调用。对应起来马上响应和延时响应
对于同步和异步效果对比 上来说:
- 同步-串行:
下单过后 需要等待所有服务完成,才能完成数据
缺点:耗时比较长 180ms
- 异步-并行
利用线程池异步方式处理,降低处理时间
缺点是:高耦合、需要线程池管理。 写订单服务代码 需要把所有的物流服务。。。都和订单服务耦合起来
- 异步-MQ
对应起来mq系统做通知得作用,所有服务订阅,通知需要得服务
最后达到异步解耦得效果
通过消息中间件可以达到异步解耦所有系统得效果。
消息中间件MQ
概述
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据结构。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。
特点:
- 利用 高效可靠得消息传递机制 进行平台无关的数据交流;
- 并基于数据通信来进行分布式系统的集成;
- 通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间通信;
消息中间件的应用场景
跨系统数据传递、高并发流量削峰(缓冲)、数据异步处理、系统解耦。。。等等
常用的消息中间件
ActiveMq、RabbitMq、Kafka、RoctetMq
对于message-Queue消息中间件中的消息对应的是数据对象,有可能是服务这些。
为什么要用Queue:也是来源于队列的特性先进先出,不用栈这些结构,消息是有顺序,有时间先后的。
消息中间件设计
本质
一种具备接收请求、保存数据、发送数据等功能的网络应用。和一般网络应用程序的区别是它负责数据的接收和传递,所以性能一般高于普通程序。
核心构成
协议、持久化机制、消息分发机制、高可用设计、高可靠设计。
它因为需要接受保存发送数据,有了几个核心机制。 所有的中间件都会包括这几个构成。
协议
协议是计算机之间通信时共同遵从一组约定,都遵从相同的约定,计算机之间才能相互交流。是对数据格式和计算机之间交换数据时必须遵守的规则的正式描述。
协议三要素:
- 语法:即数据与控制信息的结构或格式。
- 语义:即需要发出何种控制信息,完成何种动作以及做出何种响应;
- 时序:即事件实现顺序的详细描述。
也就是计算机之间相互交流约定的术语。
协议三要素举例:
语法: http规定了请求报文和响应报文的具体格式。
语义:客户端主动发起的操作称为请求;
时序:— 个请求对应— 个响应
至于在消息中间件中不用http,大家肯定能想到,http的消息头也好,还是cookie等等字段,太繁杂了,导致效率不高,而消息中间件业务场景比较专一,所以不需要这种重协议,不用什么都考虑到。
AMQP协议
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
事务支持、持久化支持、出生金融行业、在可靠消息处理上具备天然的优势。
MQTT协议
MQTT协议消息队列遥测传输是IBM开发的一个即时通讯协议,物联网系统架构中的重要组成部分。
MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一个消息中间件 。
MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
特性:
轻量、结构简单、传输快、没有事务支持、没有持久化相关设计
应用场景:
适用于计算能力有限、低带宽、网络不稳定的场景。
Open message协议
Open message是近一两年由阿里发起、与雅虎、滴滴出行、Streamlio等公司共同参与创立的分布式消息中间件、流处理领域的应用开发标准。是国内首个在全球范围内发起的分布式消息领域国际标准。
特性:
结构简单、解析快、有事务设计、有持久化设计
Kafka协议
kafka协议是基于TCP的二进制协议。消息内部是通过长度来分隔,由一些基本数据类型组成。Kafka专有协议
特性
结构简单、解析快、无事务设计、有持久化设计。
持久化
持久化是将程序数据在持久状态和瞬时状态间转换的机制。通俗的讲,就是瞬时数据(比如内存中的数据,是不能永久保存的)持久化为持久数据(比如持久化至数据库中,能够长久保存)。
常用的持久化方式就两种:
一是在磁盘中,二是在文件系统中。
大部分消息中间支持的是文件系统中,在数据库中支持比较重,并且繁杂
消息分发
产生消息分发策略的原因
也是在于 消费的方式 服务在去处理时,是多种多样的
需要不同系统上进行消费,然后以及在业务场景二中,并不需要这么繁杂
需要一个确认机制。
常见的分发策略
- 发布订阅: 发一次数据,被所有订阅的服务都收到
- 轮询分发: 发一条消息,根据服务进行分发数据,有些是按权重进行轮询
- 公平分发:一般是权重进行公平分发
- 重发:重发概念,需要有一个有一个消息确认机制,来保证数据安全
- 消息拉取:由服务决定,而不是进行分发,服务想要时去拉取
高可用
“高可用性”(High Availability)通常来描述一个系统经过专门的设计,从而减少停工时间,而保持其服务的高度可用性。
在这里描述的是指产品在规定的条件和规定的时刻或时间区间内处于可执行规定功能状态的能力。当业务量大时,一台消息中间件服务器可能无法满足需求。所以需要消息中间件能够部署集群,达到高可用的目的。
主从方案共享数据的方式
通过共享数据方式来保证数据的一致性。
Master-Slave主从同步部署方式
也是只能在主节点进行操作数据
Blocker-Cluster多主从集群同步部署方式
Blocker-Cluster多主从集群转发部署方式
将数据进行分开。
高可靠
最主要数据不能丢失
高可靠性是指系统可以无故障地持续运行。比如— 个系统从来不崩溃、报错,或者崩溃、
报错的几率较低,那就是高可靠。
在高并发业务场景下,如果不能保证系统的高可靠,那造成的损失将会非常严重。
- 保证消息中间件的高可靠性,可以从以下几方面考虑
- 消息传输可靠:通过协议来保证系统间数据解析的正确性。
- 消息存储可靠:通过持久化来保证消息的存储可靠性
ActiveMQ
ActiveMQ是Apache出品的,比较经典的消息中间件。
官网地址:activemq.apache.org
Apache ActiveMQ™ is the most popular open source, multi-protocol, Java-based messaging
server. It supports industry standard protocols so users get the benefits of client choices across a
broad range of languages and platforms. Connectivity from C, C++, Python, .Net, and more is
available. Integrate your multi-platform applications using the ubiquitous AMQP protocol.
Exchange messages between your web applications using STOMP over websockets. Manage your
IoT devices using MQTT. Support your existing JMS infrastructure and beyond. ActiveMQ offers
the power and flexibility to support any messaging use-case.
- 用AMQP工业标准协议进行多平台应用集成;
- Web应用可基于websocket用STOMP协议与ActiveMQ直接交互
- 物联网设备用MQTT协议
- 基于JMS的已有基础设施也支持
- 还有更多…
ActiveMQ Artemis 是高性能目标为非阻塞的版本,基于事件驱动,这里介绍是 ActiveMQ 5 "Classic" 经典版本
ActiveMQ作为一个老牌的消息中间件,其提供了详细的官方文档。
在官方文档中
包含了一些学习目录文档,包含了使用
以及包含了 特性 以及使用说明等等
ActiveMQ常用的应用场景
- 用AMQP工业标准协议进行多平台应用集成;
- Web应用可基于websocket的stomp协议与ActiveMQ直接交互。
- 物联网设备MQTT协议
- 基于JMS的已有基础设施也支持
- 还有一些更多
ActiveMQ安装
ActiveMQ是用JAVA开发的,跨平台的,开箱即可使用
- 虚拟机软件:Oracle VM VirtualBox 下载地址:https://www.virtualbox.org/wiki/Downloads
- Linux: Centos 7 CentOS-7-x86_64-Minimal-1810.iso 阿里云镜像下载地址:https://mirrors.aliyun.com/centos/7.6.1810/isos/x86_64/
- Jdk 8 jdk-8u221-linux-x64.rpm
安装包对应下载:https://activemq.apache.org/components/classic/download/
也可在linux机器上直接下载:
wget –c http://mirror.bit.edu.cn/apache/activemq/5.15.9/apache-activemq-5.15.9- bin.tar.gz
安装
- 创建安装目录
mkdir /usr/activemq
- 解压安装包到安装目录
tar -zxvf apache-activemq-5.15.9-bin.tar.gz -C /usr/activemq
- 为方便配置时书写,创建软链接
ln -s /usr/activemq/apache-activemq-5.15.9 /usr/activemq/latest
- 熟悉activemq的目录构成:
cd /usr/activemq/latest/bin
./activemq console
./activemq start
INFO: Loading '/usr/activemq/apache-activemq-5.15.9//bin/env' INFO: Using java '/usr/bin/java' INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details INFO: pidfile created : '/usr/activemq/apache-activemq- 5.15.9//data/activemq.pid' (pid '14887')
Apache ActiveMQ 5.15.9 (localhost, ID:ntbk11111-50816-1428933306116-0:1) started | org.apache.activemq.broker.BrokerService | main
[root@localhost latest]# jps 25778 activemq.jar 25805 Jps
#Web管理端口默认为8161,通讯端口默认为61616
firewall-cmd --zone=public --add-port=8161/tcp --permanent
firewall-cmd --zone=public --add-port=61616/tcp --permanent
systemctl restart firewalld.service
- 以普通用户activemq 身份来运行
useradd activemq
chown -R activemq:users /usr/activemq
- 创建全局默认的配置文件,并配置activemq
cp /usr/activemq/latest/bin/env /etc/default/activemq
sed -i '~s/^ACTIVEMQ\\_USER=""/ACTIVEMQ\\_USER="activemq"/'
/etc/default/activemq
vim /etc/default/activemq
# Active MQ installation dirs # ACTIVEMQ_HOME="<Installationdir>/"
# ACTIVEMQ_BASE="$ACTIVEMQ_HOME" # ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf"
# ACTIVEMQ_DATA="$ACTIVEMQ_BASE/data" # ACTIVEMQ_TMP="$ACTIVEMQ_BASE/tmp"
# Set jvm memory configuration (minimal/maximum amount of memory) ACTIVEMQ_OPTS_MEMORY="-Xms64M -Xmx1G"
chmod 644 /etc/default/activemq
-
安装启动脚本
ln -snf /usr/activemq/latest/bin/activemq /etc/init.d/activemq
-
激活启动服务
# RHEL
chkconfig --add activemq
chkconfig activemq on
systemctl enable activemq
- 手动启动服务
systemctl start activemq
activeMQ管理台
自带的管理台,在浏览器中访问http://服务IP:8161/admin即可进入
- broker 展示的实例 MQ的信息
- queue 队列中数据信息 ,包含入队 消息,出队信息,以及 消费者信息
- topics 发布订阅的主题信息
- 订阅者 subscribers
以及后面的订阅者网络连接等等,都是需要使用到的
send a jms message 控制台
ActiveMQ 配置
这里面包含 webapps 和配置文件,以及 data 日志文件 以及数据文件地。
配置文件
web服务控制台 的 配置文件 jetty.xml
- 打开有端口号等的配置,沿用spring的配置,包含安全配置,端口号配置
- real.properties 配置对应的用户信息
以及 group.properties 对于消息的配置 , log4j.properties 配置日志 文件。
ActiveMQ使用
直接添加maven依赖即可,其实现在基本的使用方式都是采用这种方式,从而达到引入 引入activemq-all.jar 的目的
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
使用方式
创建一个消费者,进行 请求消费数据,这是最原始的,但我们一般不使用这种方式
try {
// brokerURL
// http://activemq.apache.org/connection-configuration-uri.html
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(this.brokerUrl);
// 2、创建连接对象
conn = connectionFactory.createConnection(); conn.start();
// 一定要启动
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息消费目标(Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、创建消息消费者 http://activemq.apache.org/destination- options.html
consumer = session.createConsumer(destination);
// 6、接收消息(没有消息就持续等待)
Message message = consumer.receive();
if (message instanceof TextMessage) {
System.out.println("收到文本消息:" + ((TextMessage) message).getText());
} else {
System.out.println(message);
}consumer.close();
session.close();
conn.close();
以及生产者
public class Producer {
public static void main(String[] args) {
new ProducerThread("tcp://mq.study.com:61616", "queue1").start();
new ProducerThread("tcp://mq.study.com:61616", "queue1").start();
}
static class ProducerThread extends Thread {
String brokerUrl;
String destinationUrl;
public ProducerThread(String brokerUrl, String destinationUrl) {
this.brokerUrl = brokerUrl;
this.destinationUrl = destinationUrl;
}
@Override
public void run() {
ActiveMQConnectionFactory connectionFactory;
Connection conn;
Session session;
try {
// 1、创建连接工厂
connectionFactory = new ActiveMQConnectionFactory(brokerUrl);
connectionFactory.setUseAsyncSend(true);
// 2、创建连接
conn = connectionFactory.createConnection();
conn.start(); // 一定要start
// 3、创建会话(可以创建一个或者多个session)
session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4、创建消息发送目标 (Topic or Queue)
Destination destination = session.createQueue(destinationUrl);
// 5、用目的地创建消息生产者
MessageProducer producer = session.createProducer(destination);
// 设置递送模式(持久化 / 不持久化)
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 6、创建一条文本消息
String text = "Hello world! From: " + Thread.currentThread().getName() + " : "
+ System.currentTimeMillis();
TextMessage message = session.createTextMessage(text);
// 7、通过producer 发送消息
System.out.println("Sent message: " + text);
CountDownLatch cdl = new CountDownLatch(1);
((ActiveMQMessageProducer) producer).send(message, new AsyncCallback() {
@Override
public void onException(JMSException exception) {
// TODO Auto-generated method stub
}
@Override
public void onSuccess() {
try {
System.out.println(Thread.currentThread().getName() + " 异步发送完成:messageId: "
+ message.getJMSMessageID() + " " + text);
} catch (JMSException e) {
}
}
});
// cdl.await();
// 8、 清理、关闭连接
session.close();
conn.close();
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
在topic服务器上,会将消息单独发送 ,为每个消费者都维护了一个队列。
在使用过程中
- 在发布订阅模式下,如果在订阅在发布之后,不应该收到消息。
- 持久订阅 MQ登记有名的订阅,消费者 挂了,会持久等待
spring boot 中使用ActiveMQ
引用包中
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置activemq broker连接参数 (application.yml)
spring:
activemq:
broker-url: tcp://mq.study.com:61616
#user: admin
#password: secret
使用方式很简单
@Autowired
private JmsTemplate jmsTemplate;
@PostConstruct
public void sendMessage() {
// Send a message with a POJO - the template reuse the message converter
System.out.println("Sending an email message.");
jmsTemplate.convertAndSend("mailbox", new Email("info@example.com", "Hello"));
}
public static void main(String[] args) {
SpringApplication.run(Producer.class, args);
}
以上是关于消息中间件MQ及ActiveMQ介绍的主要内容,如果未能解决你的问题,请参考以下文章