4.2.2 Redis协议与异步方式

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4.2.2 Redis协议与异步方式相关的知识,希望对你有一定的参考价值。

Linux C/C++服务器

Redis协议与异步方式

redis网络层

io多路复用(单reactor)+非阻塞io


哪个管道先构成一个完整的数据包(读事件),谁就先得到处理;
1.一个数据包可能由多个读事件才能组装完成
2.管道就是连接
3.人推车相当于网络线程

redis pipeline

redis pipeline 是一个客户端提供的机制(异步请求),而不是服务端提供的;
pipeline 不具备事务性
目的:节约网络传输时间

redis事务

事务:用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;

命令

  • MULTI(开启事务)
  • EXEC (提交事务)
  • DISCARD (取消事务)
  • WATCH (检测事务) 检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现(cas);若被取消则事务返回 nil;

应用

事务实现 z pop

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

事务实现 加倍操作

WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC

lua脚本

lua 脚本实现原子性;
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;
lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会有其他命令或者脚本被执行;
lua 脚本当中的命令会直接修改数据状态;
lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;

AMQP协议与RabbitMQ

什么是AMQP?

异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,
AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。

AMQP 中包含的主要元素

生产者Producer):Exchange发布消息的应用。

消费者Consumer):从消息队列queue中消费消息的应用。

消息队列Message Queue):服务器组件,用于保存消息,直到发送给消费者。

Queue:消息载体;每个消息都会被投入到一个或多个队列。

消息Message):传输的内容。

 

交换器exchange:路由组件,接收Producer发送的消息,并根据Routing Key转发消息队列queue

Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue

 

虚拟主机Virtual Host: 用作不同用户权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。

Broker AMQP服务端称为Broker

连接Connection:一个网络连接,比如TCP/IP套接字连接。

信道Channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。

绑定器Binding:把exchangequeue按照路由规则绑定起来。

 

exchange Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKeyExchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3exchangeDirect exchange, Fanout exchange, Topic exchange  


Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue 


Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding所有Queues中去,其实是一种广播行为 


topic exchange 通配路由,是direct exchange通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持#*的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue 


需要注意的一点只有queue具有保存消息的功能,exchange能保存消息。

 

AMQP 如何实现通信的

(1)建立连接Connection。由producerconsumer创建连接,连接到broker的物理节点上。 

(2)建立消息ChannelChannel是建立在Connection之上的,一个Connection可以建立多个Channelproducer连接Virtual Host 建立ChannelConsumer连接到相应的queue上建立Channel 

(3)发送消息。由Producer发送消息到Broker中的exchange中。 

(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。 

(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。 

(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的QueueQueue收到ACK信息后,才会认为消息处理成功,并将消息从Queue移除;如果在对应的Channel断开Queue没有收到这条消息的ACK信息,该消息将被发送给另外Channel 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性

技术分享图片

 

消息队列的使用大概过程

(1)客户端连接Connection到消息队列服务器Broker,打开一个channel
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange

 

RabbitMQ中 exchange、route、queue的关系

技术分享图片

MessageQueue、Exchange和Binding构成了AMQP协议的核心。

  声明MessageQueue 

 

  在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:

 

  a)消费者是无法订阅或者获取不存在的MessageQueue中信息。

 

  b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。

 

  在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。

  这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。

 

       如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:

 

a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。

 

b)   Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。

 

 c)   Durable:持久化,这个会在后面作为专门一个章节讨论。

 

d)  其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。

 

  

 

exchange Queue 的路由机制

生产者在发送消息时,都需要指定一个RoutingKeyExchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息


exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3exchangeDirect exchange, Fanout exchange, Topic exchange  


Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue 


Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding所有Queues中去,其实是一种广播行为 


topic exchange 通配路由,是direct exchange通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持#*的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue 


需要注意的一点只有queue具有保存消息的功能,exchange能保存消息。

 

AMQP的应用场景


AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:


异步处理


比如公司新入职一个员工,需要开通系统账号,有几件事情要做,开通系统账号,发短信通知用户,发邮件给员工,在公司内部通讯系统中发送消息给员工。其中发短信,发邮件,发内部通讯系统消息,这三件事情可以串行也可以并行,并行的好处就是可以提高效率,这时可以应用MQ来实现并行。

应用解耦

在公司内部系统中,有人事系统,OA系统,财务系统,外围应用系统等等,当人事发生变动的时候(离职入职调岗),人事系统需要将这些变动通知给其他系统,这时只需人事系统发送一条消息,各个外围系统订阅该消息,就可得知人事变动,与实时服务调用相比,如果人事系统挂掉,各个外围系统不会受到影响,继续运行;如果是实时服务调用,比如人事系统被各个服务调用,人事系统挂了,调用人事系统的服务都会受到影响

流量缓冲


在有些流量会瞬间暴增的场景下,如秒杀,为了防止流量突然增大而使得应用挂掉,可以引入MQ,将请求存入MQ中,如果超过了MQ的长度,就把请求丢弃掉,这样来限制流量

日志处理


将消息队列引入到日志处理中,如kafka的应用,解决了大量日志的传输问题。日志客户端负责采集日志数据,并定期写入kafka队列,kafka负责接收,存储和转发日志,日志处理系统订阅并消费kafka中的日志数据。


SpringBoot+RabbitMQ的简单demo

 https://www.cnblogs.com/theRhyme/p/10071781.html

 

 

来源:

https://blog.csdn.net/letempsar/article/details/52565020

https://blog.csdn.net/ztx114/article/details/78410727

https://www.cnblogs.com/linkenpark/p/5393666.html











以上是关于4.2.2 Redis协议与异步方式的主要内容,如果未能解决你的问题,请参考以下文章

python测试开发django-157.celery异步与redis环境搭建

Redis、Kafka或RabbitMQ:哪个作为微服务消息代理最合适?

Redis Stream 流的深度解析与实现高级消息队列一万字

Redis Stream 流的深度解析与实现高级消息队列一万字

Redis Stream 流的深度解析与实现高级消息队列一万字

zookpeer 和 redis 集群内一致性协议 及 选举 对比