RabbitMQ使用介绍2—Work queues

Posted venvive

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ使用介绍2—Work queues相关的知识,希望对你有一定的参考价值。

Work queues

接下来是part2

技术图片

在这一项中,我们创建一个工作队列,用于在多个工作者之间分配耗时的任务。

Work Queues的主要思想是,避免立即执行资源密集的任务而不得不等待其执行完成。我们将任务封装为消息并将其发送到队列中,在后台运行的一个工作进程将会弹出任务并最终执行该任务,当你管理许多工作节点时,任务就会在他们之间共享。

这个概念在web应用程序中尤其有用,因为在一个短HTTP请求窗口中不可能处理复杂的任务

下面我们发送一个特殊的String,用Thread.sleep()辅助,来模拟一些耗时的工作,用点来简单表示一个任务的复杂度,例如Hello.表示此任务需要两秒进行处理。

我们将在原来生产者的基础上修改一些代码,文件名叫send_work.py:

import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
    'localhost'))#默认端口5672,可不写

#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()
#在管道里声明queue
channel.queue_declare(queue='hello')
message = ''.join(sys.argv[1:]) or "Hello World"
#一条消息永远不能直接发送到队列,它总需要经过一个交换exchange
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)#设置routing_key(消息队列的名称)和body(发送的内容)
print("[x] Sent %r" % message)
connection.close()#关闭连接,队列关闭

我们将在原来消费者的基础上修改一些代码,文件名叫receive_work.py:

#receiving(消费者接收者)
import pika
import time
#创建一个连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost'))#默认端口5672,可不写
#创建通道,声明一个管道,在管道里发送消息
channel = connection.channel()

#把消息队列的名字为hello,把消费者和queue绑定起来,生产者和queue的也是hello
#为什么又声明了一个hello队列
#如果确定已经声明了,可以不声明。但是你不知道那个机器先运行,所以要声明两次
channel.queue_declare(queue='hello')

#回调函数get消息体
def callback(ch,method,properties,body):#四个参数为标准格式
    #管道内存对象,内容相关信息
    print("打印看下是什么:",ch,method,properties) #打印看下是什么
    print(" [x] Received %r" % body)
    time.sleep(body.count(b'.'))
    print("[x] Done")
    
#消费消息
channel.basic_consume(
    queue='hello',#你要从那个队列里收消息
    on_message_callback=callback,#如果收到消息,就调用callback函数来处理消息
    auto_ack=True #写的话,如果接收消息,机器宕机消息就丢了
    #一般不写,宕机则生产者检测到发给其他消费者
)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming() #创建死循环,监听消息队列,可使用CTRL+C结束监听

对于上面的,开启两个控制台的消费者,开启一个生产者的控制台,运行的结果如下:

默认情况下,RabbitMQ将按顺序将每个消息发送给下一个消费者,平均每个消费者将得到相同数量的消息。这种分发消息的方式称为循环(平均分配)

消息确认(消息答复)

完成一项任务可能需要一定的时间。你可能会想,如果一个消费者开始一项长时间的任务,并且只完成了一部分,那么会发生什么。在我们当前的代码中,一旦RabbitMQ向客户发送一条消息,它立即将其标记为删除。在这种情况下。如果我们强制关闭了一个工作节点。我们将丢失它正在处理的消息。我们还将丢失发送给这个特定工作者的所有消息,但是还没有处理

通常我们不希望因为一个节点挂掉而丢失任何消息,而希望能够将这些消息传递给其他存活节点进行处理

确保消息能够不丢失,

RabbitMQ支持message acknowledgments(消息确认),一条特定的消息被接收后,返回一个ack告诉RabbitMQ可以随意地进行删除

如果一个消费者挂了(通道关闭,TCP连接关闭)就不能发送ack给RabbitMQ,此时RabbitMQ就会意识到,某条消息没有被处理完成,那么就会将其重新发送到其他消费者。这种处理流程保证了信息不会丢失,即使偶尔有消费者挂掉

没有任何消息超时,

RabbitMQ将在某个消费者挂掉后重新传递消息,即使处理消息需要很长时间

以上是关于RabbitMQ使用介绍2—Work queues的主要内容,如果未能解决你的问题,请参考以下文章

2. RabbitMQ 之Work Queues (工作队列)

PHP rabbitmq Work queues

Rabbitmq_02 Work Queues

你了解RabbitMQ吗?我这就教你学会Work-Queue模式下的RabbitMQ。

RabbitMQ - Work queues

RabbitMQ的work queue