4.2.2 Redis协议与异步方式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了4.2.2 Redis协议与异步方式相关的知识,希望对你有一定的参考价值。
Linux C/C++服务器
Redis协议与异步方式
redis网络层
io多路复用(单reactor)+非阻塞io
哪个管道先构成一个完整的数据包(读事件),谁就先得到处理;
1.一个数据包可能由多个读事件才能组装完成
2.管道就是连接
3.人推车相当于网络线程
redis pipeline
redis pipeline 是一个客户端提供的机制(异步请求),而不是服务端提供的;
pipeline 不具备事务性
目的:节约网络传输时间
redis事务
事务:用户定义一系列数据库操作,这些操作视为一个完整的逻辑处理工作单元,要么全部执行,要么全部不执行,是不可分割的工作单元。
MULTI 开启事务,事务执行过程中,单个命令是入队列操作,直到调用 EXEC 才会一起执行;
命令
- MULTI(开启事务)
- EXEC (提交事务)
- DISCARD (取消事务)
- WATCH (检测事务) 检测 key 的变动,若在事务执行中,key 变动则取消事务;在事务开启前调用,乐观锁实现(cas);若被取消则事务返回 nil;
应用
事务实现 z pop
WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC
事务实现 加倍操作
WATCH score:10001
val = GET score:10001
MULTI
SET score:10001 val*2
EXEC
lua脚本
lua 脚本实现原子性;
redis 中加载了一个 lua 虚拟机;用来执行 redis lua 脚本;
lua 脚本的执行是原子性的;当某个脚本正在执行的时候,不会有其他命令或者脚本被执行;
lua 脚本当中的命令会直接修改数据状态;
lua 脚本 mysql 存储区别:MySQL存储过程不具备事务性,所以也不具备原子性;
注意:如果项目中使用了 lua 脚本,不需要使用上面的事务命令;
AMQP协议与RabbitMQ
什么是AMQP?
在异步通讯中,消息不会立刻到达接收方,而是被存放到一个容器中,当满足一定的条件之后,消息会被容器发送给接收方,这个容器即消息队列,而完成这个功能需要双方和容器以及其中的各个组件遵守统一的约定和规则,
AMQP就是这样的一种协议,消息发送与接受的双方遵守这个协议可以实现异步通讯。这个协议约定了消息的格式和工作方式。
AMQP 中包含的主要元素
生产者(Producer):向Exchange发布消息的应用。
消费者(Consumer):从消息队列queue中消费消息的应用。
消息队列(Message Queue):服务器组件,用于保存消息,直到发送给消费者。
Queue:消息载体;每个消息都会被投入到一个或多个队列。
消息(Message):传输的内容。
交换器(exchange):路由组件,接收Producer发送的消息,并根据Routing Key转发给消息队列queue。
Routing Key:路由关键字,exchange根据这个Routing Key进行消息投递到队列queue。
虚拟主机(Virtual Host): 用作不同用户的权限分离;一批交换器,消息队列和相关对象。虚拟主机是共享相同身份认证和加密环境的独立服务器域。
Broker :AMQP的服务端称为Broker。
连接(Connection):一个网络连接,比如TCP/IP套接字连接。
信道(Channel):消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务;多路复用连接中的一条独立的双向数据流通道,为会话提供物理传输介质。
绑定器(Binding):把exchange和queue按照路由规则绑定起来。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
AMQP 如何实现通信的
(1)建立连接Connection。由producer和consumer创建连接,连接到broker的物理节点上。
(2)建立消息Channel。Channel是建立在Connection之上的,一个Connection可以建立多个Channel。producer连接Virtual Host 建立Channel,Consumer连接到相应的queue上建立Channel。
(3)发送消息。由Producer发送消息到Broker中的exchange中。
(4)路由转发。exchange收到消息后,根据一定的路由策略routing key,将消息转发到相应的queue中去。
(5)消息接收。Consumer会监听相应的queue,一旦queue中有可以消费的消息,queue就将消息发送给Consumer端。
(6)消息确认。当Consumer完成某一条消息的处理之后,需要发送一条ACK消息给对应的Queue。Queue收到ACK信息后,才会认为消息处理成功,并将消息从Queue中移除;如果在对应的Channel断开后,Queue没有收到这条消息的ACK信息,该消息将被发送给另外的Channel。 至此一个消息的发送接收流程走完了。消息的确认机制提高了通信的可靠性。
消息队列的使用大概过程
(1)客户端连接Connection到消息队列服务器Broker,打开一个channel。
(2)客户端声明一个exchange,并设置相关属性。
(3)客户端声明一个queue,并设置相关属性。
(4)客户端使用routing key,在exchange和queue之间建立好绑定关系。
(5)客户端投递消息到exchange。
RabbitMQ中 exchange、route、queue的关系
MessageQueue、Exchange和Binding构成了AMQP协议的核心。
声明MessageQueue
在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。
这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。Rabbit MQ中,可以通过queue.declare命令声明一个队列,可以设置该队列以下属性:
a) Exclusive:排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
b) Auto-delete:自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
c) Durable:持久化,这个会在后面作为专门一个章节讨论。
d) 其他选项,例如如果用户仅仅想查询某一个队列是否已存在,如果不存在,不想建立该队列,仍然可以调用queue.declare,只不过需要将参数passive设为true,传给queue.declare,如果该队列已存在,则会返回true;如果不存在,则会返回Error,但是不会创建新的队列。
exchange 与 Queue 的路由机制
生产者在发送消息时,都需要指定一个RoutingKey和Exchange,Exchange在接到该RoutingKey以后,会判断该ExchangeType,然后转发到对应的Queue中,所以发消息不需要指定Queue,似乎消费者可以指定Queue绑定到某个Exchange、某个RoutingKey,也可以不指定Queue,就只根据某个Exchange和某个RoutingKey接受到消息。
exchange 将消息发送到哪一个queue是由exchange type 和 Binding绑定规则决定的,目前常用的有3种exchange,Direct exchange, Fanout exchange, Topic exchange 。
Direct exchange 直接转发路由,其实现原理是会将消息中的RoutingKey与该Exchange关联的所有Binding中的BindingKey进行比较,如果相等,则发送到该Binding对应的Queue中。
Fanout exchange 复制分发路由,该路由不需要RoutingKey,会将消息发送给所有与该 Exchange 定义过Binding的所有Queues中去,其实是一种广播行为。
topic exchange 通配路由,是direct exchange的通配符模式,消息中的RoutingKey可以写成通配的模式,exchange支持“#”和“*” 的通配。收到消息后,将消息转发给所有符合匹配正则表达式的Queue。
需要注意的一点只有queue具有保存消息的功能,exchange不能保存消息。
AMQP的应用场景
AMQP是实现消息机制的一种协议,消息队列主要有以下几种应用场景:
异步处理
应用解耦
流量缓冲
日志处理
SpringBoot+RabbitMQ的简单demo
https://www.cnblogs.com/theRhyme/p/10071781.html
来源:
https://blog.csdn.net/letempsar/article/details/52565020
https://blog.csdn.net/ztx114/article/details/78410727
https://www.cnblogs.com/linkenpark/p/5393666.html
以上是关于4.2.2 Redis协议与异步方式的主要内容,如果未能解决你的问题,请参考以下文章
python测试开发django-157.celery异步与redis环境搭建
Redis、Kafka或RabbitMQ:哪个作为微服务消息代理最合适?
Redis Stream 流的深度解析与实现高级消息队列一万字
Redis Stream 流的深度解析与实现高级消息队列一万字