消息队列核心原理

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了消息队列核心原理相关的知识,希望对你有一定的参考价值。

参考技术A 消息队列已经逐渐成为分布式应用场景、内部通信、以及秒杀等高并发业务场景的核心手段,它具有低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。
无论是 RabbitMQ、RocketMQ、ActiveMQ、Kafka还是其它等,都有的一些基本原理、术语、机制等,总结分享出来,希望大家在使用消息队列技术的时候能够快速理解。

1.消息生产者Producer:发送消息到消息队列。
2.消息消费者Consumer:从消息队列接收消息。
3.Broker:概念来自与Apache ActiveMQ,指MQ的服务端,帮你把消息从发送端传送到接收端。
4.消息队列Queue:一个先进先出的消息存储区域。消息按照顺序发送接收,一旦消息被消费处理,该消息将从队列中删除。

1)消息的转储:在更合适的时间点投递,或者通过一系列手段辅助消息最终能送达消费机。
2)规范一种范式和通用的模式,以满足解耦、最终一致性、错峰等需求。
3)其实简单理解就是一个消息转发器,把一次RPC做成两次RPC。发送者把消息投递到broker,broker再将消息转发一手到接收端。
总结起来就是两次RPC加一次转储,如果要做消费确认,则是三次RPC。

点对点模型 用于 消息生产者 和 消息消费者 之间 点到点 的通信。
点对点模式包含三个角色:

发布订阅模型包含三个角色:

生产者发送一条消息到队列queue,只有一个消费者能收到。
发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。

基于Queue消息模型,利用FIFO先进先出的特性,可以保证消息的顺序性。

即消息的Ackownledge确认机制,为了保证消息不丢失,消息队列提供了消息Acknowledge机制,即ACK机制,当Consumer确认消息已经被消费处理,发送一个ACK给消息队列,此时消息队列便可以删除这个消息了。如果Consumer宕机/关闭,没有发送ACK,消息队列将认为这个消息没有被处理,会将这个消息重新发送给其他的Consumer重新消费处理。

主要是用“记录”和“补偿”的方式。
1.本地事务维护业务变化和通知消息,一起落地,然后RPC到达broker,在broker成功落地后,RPC返回成功,本地消息可以删除。否则本地消息一直靠定时任务轮询不断重发,这样就保证了消息可靠落地broker。
2.broker往consumer发送消息的过程类似,一直发送消息,直到consumer发送消费成功确认。
3.我们先不理会重复消息的问题,通过两次消息落地加补偿,下游是一定可以收到消息的。然后依赖状态机版本号等方式做判重,更新自己的业务,就实现了最终一致性。
4.如果出现消费方处理过慢消费不过来,要允许消费方主动ack error,并可以与broker约定下次投递的时间。
5.对于broker投递到consumer的消息,由于不确定丢失是在业务处理过程中还是消息发送丢失的情况下,有必要记录下投递的IP地址。决定重发之前询问这个IP,消息处理成功了吗?如果询问无果,再重发。
6.事务:本地事务,本地落地,补偿发送。本地事务做的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。消息只要成功落地,很大程度上就没有丢失的风险。

消息的收发处理支持事务,例如:在任务中心场景中,一次处理可能涉及多个消息的接收、处理,这应该处于同一个事务范围内,如果一个消息处理失败,事务回滚,消息重新回到队列中。

消息的持久化,对于一些关键的核心业务来说是非常重要的,启用消息持久化后,消息队列宕机重启后,消息可以从持久化存储恢复,消息不丢失,可以继续消费处理。

在实际生产环境中,使用单个实例的消息队列服务,如果遇到宕机、重启等系统问题,消息队列就无法提供服务了,因此很多场景下,我们希望消息队列有高可用性支持,例如RabbitMQ的镜像集群模式的高可用性方案,ActiveMQ也有基于LevelDB+ZooKeeper的高可用性方案,以及Kafka的Replication机制等。

rabbitmq - (消息队列) 的基本原理介绍

介绍

MQ全称为Message Queue, 是一种分布式应用程序的的通信方法,它是消费-生产者模型的一个典型的代表,producer往消息队列中不断写入消息,而另一端consumer则可以读取或者订阅队列中的消息。RabbitMQ是MQ产品的典型代表,是一款基于AMQP协议可复用的企业消息系统

系统架构

Rabbitmq系统最核心的组件是Exchange和Queue,Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在应用端。

原理大致图(MQ:Message Queue):

技术分享图片

Queue

消息队列,提供了FIFO的处理机制,具有缓存消息的能力。rabbitmq中,队列消息可以设置为持久化,临时或者自动删除。

  1. 设置为持久化的队列,queue中的消息会在server本地硬盘存储一份,防止系统crash,数据丢失
  2. 设置为临时队列,queue中的数据在系统重启之后就会丢失
  3. 设置为自动删除的队列,当不存在用户连接到server,队列中的数据会被自动删除

Exchange

Exchange类似于数据通信网络中的交换机,提供消息路由策略。rabbitmq中,producer不是通过信道直接将消息发送给queue,而是先发送给Exchange。一个Exchange可以和多个Queue进行绑定,producer在传递消息的时候,会传递一个ROUTING_KEY,Exchange会根据这个ROUTING_KEY按照特定的路由算法,将消息路由给指定的queue。和Queue一样,Exchange也可设置为持久化,临时或者自动删除。

Exchange有4种类型:direct(默认),fanout, topic, 和headers,不同类型的Exchange转发消息的策略有所区别:

  1. Direct直接交换器,工作方式类似于单播,Exchange会将消息发送完全匹配ROUTING_KEY的Queue
  2. fanout广播是式交换器,不管消息的ROUTING_KEY设置为什么,Exchange都会将消息转发给所有绑定的Queue
  3. topic主题交换器,工作方式类似于组播,Exchange会将消息转发和ROUTING_KEY匹配模式相同的所有队列,比如,ROUTING_KEY为user.stock的Message会转发给绑定匹配模式为 * .stock,user.stock, * . * 和#.user.stock.#的队列。( * 表是匹配一个任意词组,#表示匹配0个或多个词组)
  4. headers消息体的header匹配(ignore)

Binding

所谓绑定就是将一个特定的 Exchange 和一个特定的 Queue 绑定起来。Exchange 和Queue的绑定可以是多对多的关系

通信过程

假设P1和C1注册了相同的Broker,Exchange和Queue。P1发送的消息最终会被C1消费。基本的通信流程大概如下所示:

  1. P1生产消息,发送给服务器端的Exchange
  2. Exchange收到消息,根据ROUTINKEY,将消息转发给匹配的Queue1
  3. Queue1收到消息,将消息发送给订阅者C1
  4. C1收到消息,发送ACK给队列确认收到消息
  5. Queue1收到ACK,删除队列中缓存的此条消息

Consumer收到消息时需要显式的向rabbit broker发送basic.ack消息或者consumer订阅消息时设置auto_ack参数为true。在通信过程中,队列对ACK的处理有以下几种情况:

  1. 如果consumer接收了消息,发送ack,rabbitmq会删除队列中这个消息,发送另一条消息给consumer。
  2. 如果cosumer接受了消息, 但在发送ack之前断开连接,rabbitmq会认为这条消息没有被deliver,在consumer在次连接的时候,这条消息会被redeliver。
  3. 如果consumer接受了消息,但是程序中有bug,忘记了ack,rabbitmq不会重复发送消息。
  4. rabbitmq2.0.0和之后的版本支持consumer reject某条(类)消息,可以通过设置requeue参数中的reject为true达到目地,那么rabbitmq将会把消息发送给下一个注册的consumer。

php 生产者示例

先用 composer 加载 mq 拓展文件

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.7.*" //增加这行 
  } 
}

 

<?php

require vendor/autoload.php;

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$conf = [
        host => 127.0.0.1,
        port => 5672,
        user => kd_dev,
        pwd  => kd_dev,
        vhost => /,
];
$exchangeName = kd_sms_send_ex; //交换机名
$queueName    = kd_sms_send_q; //队列名称
$routingKey   = sms_send; //路由关键字(也可以省略)

$conn = new AMQPStreamConnection( //建立生产者与mq之间的连接
    $conf[host], $conf[port], $conf[user], $conf[pwd], $conf[vhost]
);
$channel = $conn->channel(); //在已连接基础上建立生产者与mq之间的通道


$channel->exchange_declare($exchangeName, direct, false, true, false); //声明初始化交换机
$channel->queue_declare($queueName, false, true, false, false); //声明初始化一条队列
$channel->queue_bind($queueName, $exchangeName, $routingKey); //将队列与某个交换机进行绑定,并使用路由关键字

$msgBody = json_encode(["name" => "iGoo", "age" => 22]);
$msg = new AMQPMessage($msgBody, [content_type => text/plain, delivery_mode => 2]); //生成消息
$r   = $channel->basic_publish($msg, $exchangeName, $routingKey); //推送消息到某个交换机
$channel->close();
$conn->close();

 php 消费者代码示例

<?php

  $bindingkey
=‘sms_send‘;   

  
//连接RabbitMQ   $conn_args = array( host=>127.0.0.1 , port=> 5672, login=>kd_dev , password=> kd_dev,vhost =>/);   $conn = new AMQPConnection($conn_args);   $conn->connect();   
  
//设置queue名称,使用exchange,绑定routingkey   $channel = new AMQPChannel($conn); // 声明一个通道   $q = new AMQPQueue($channel); // 声明一个队列   $q->setName(‘kd_sms_send_q‘); // 路由名
  $q
->setFlags(AMQP_DURABLE);
  $q
->declare();
  $q
->bind(‘kd_sms_send_ex‘,$bindingkey); // 队列绑定交换机

  //消息获取   $messages = $q->get(AMQP_AUTOACK) ;   if ($messages){     var_dump(json_decode($messages->getBody(), true ));   }
  $conn
->disconnect();

 











以上是关于消息队列核心原理的主要内容,如果未能解决你的问题,请参考以下文章

MQ消息队列的12点核心原理总结

高并发架构系列:MQ消息队列的12点核心原理总结

消息队列事务型消息原理浅析

高性能消息队列 CKafka 核心原理介绍(上)

消息队列工作原理

rabbitmq - (消息队列) 的基本原理介绍