RabbitMQ使用介绍2—Work queues
Posted venvive
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ使用介绍2—Work queues相关的知识,希望对你有一定的参考价值。
Work queues
接下来是part2
在这一项中,我们创建一个工作队列,用于在多个工作者之间分配耗时的任务。
Work Queues的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务,当你管理许多工作节点时,任务就会在他们之间共享。
这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务
下面我们发送一个特殊的String,用Thread.sleep()辅助,来模拟一些耗时的工作,用点来简单表示一个任务的复杂度,例如Hello.表示此任务需要两秒进行处理。
我们将在原来生产者的基础上修改一些代码,文件名叫send_work.py:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
'localhost'))#默认端口5672,可不写
#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()
#在管道里声明queue
channel.queue_declare(queue='hello')
message = ''.join(sys.argv[1:]) or "Hello World"
#一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
channel.basic_publish(exchange='',
routing_key='hello',
body=message)#设置routing_key(消息队列的名称)和body(发送的内容)
print("[x] Sent %r" % message)
connection.close()#关闭连接,队列关闭
我们将在原来消费者的基础上修改一些代码,文件名叫receive_work.py:
#receiving(消费者接收者)
import pika
import time
#创建一个连接
connection = pika.BlockingConnection(
pika.ConnectionParameters('localhost'))#默认端口5672,可不写
#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()
#把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
#为什么又声明了一个hello队列
#如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
channel.queue_declare(queue='hello')
#回调函数get消息体
def callback(ch,method,properties,body):#四个参数为标准格式
#管道内存对象,内容相关信息
print("打印看下是什么:",ch,method,properties) #打印看下是什么
print(" [x] Received %r" % body)
time.sleep(body.count(b'.'))
print("[x] Done")
#消费消息
channel.basic_consume(
queue='hello',#你要从那个队列里收消息
on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
#一般不写,宕机则生产者检测到发给其他消费者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听
对于上面的,开启两个控制台的消费者,开启一个生产者的控制台,运行的结果如下:
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)
消息确认(消息答复)
完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下。如果我们强制关闭了一个工作节点。我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理
通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能够将这些消息传递给其他存活节点进行处理
确保消息能够不丢失,
RabbitMQ支持message acknowledgments(消息确认),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除
如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他消费者。这种处理流程保证了信息不会丢失,即使偶尔有消费者挂掉
没有任何消息超时,
RabbitMQ将在某个消费者挂掉后重新传递消息,即使处理消息需要很长时间
以上是关于RabbitMQ使用介绍2—Work queues的主要内容,如果未能解决你的问题,请参考以下文章
2. RabbitMQ 之Work Queues (工作队列)