在 pika / RabbitMQ 中处理长时间运行的任务

Posted

技术标签:

【中文标题】在 pika / RabbitMQ 中处理长时间运行的任务【英文标题】:Handling long running tasks in pika / RabbitMQ 【发布时间】:2013-01-12 09:08:30 【问题描述】:

我们正在尝试建立一个基本的定向队列系统,其中生产者将生成多个任务,而一个或多个消费者将一次获取一个任务,对其进行处理并确认消息。

问题是,处理可能需要 10-20 分钟,而我们当时没有响应消息,导致服务器断开我们的连接。

这是我们消费者的一些伪代码:

#!/usr/bin/env python
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def callback(ch, method, properties, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

第一个任务完成后,在 BlockingConnection 深处的某个地方抛出异常,抱怨套接字被重置。此外,RabbitMQ 日志显示消费者因未及时响应而断开连接(为什么它重置连接而不是发送 FIN 很奇怪,但我们不会担心)。

我们搜索了很多,因为我们认为这是 RabbitMQ 的正常用例(有很多长时间运行的任务应该在许多消费者之间分配),但似乎没有其他人真正遇到过这个问题。最后,我们偶然发现了一个线程,建议使用心跳并在单独的线程中生成long_running_task()

所以代码变成了:

#!/usr/bin/env python
import pika
import time
import threading

connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost',
        heartbeat_interval=20))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'

def thread_func(ch, method, body):
    long_running_task(connection)
    ch.basic_ack(delivery_tag = method.delivery_tag)

def callback(ch, method, properties, body):
    threading.Thread(target=thread_func, args=(ch, method, body)).start()

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

这似乎有效,但它非常混乱。我们确定ch 对象是线程安全的吗?此外,假设long_running_task() 正在使用该连接参数将任务添加到新队列(即这个漫长过程的第一部分已经完成,让我们将任务发送到第二部分)。因此,线程正在使用connection 对象。那个线程安全吗?

更重要的是,这样做的首选方式是什么?我觉得这很混乱,可能不是线程安全的,所以也许我们做得不对。谢谢!

【问题讨论】:

我也有同样的问题。文档说 pika 连接不是线程安全的 pika.readthedocs.org/en/latest/faq.html 谢谢你的问题,遇到了同样的问题。 【参考方案1】:

目前,您最好的选择是关闭心跳,如果您阻塞太久,这将阻止 RabbitMQ 关闭连接。我正在试验 pika 的核心连接管理和在后台线程中运行的 IO 循环,但它不够稳定,无法发布。

在pika v1.1.0 这是ConnectionParameters(heartbeat=0)

【讨论】:

正如@Gavin 所说,目前最好的选择是在建立连接时关闭 pika 的心跳。 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', virtual_host='TestVirtualHost', credentials=credentials, heartbeat_interval=0, port=5672)) 鼠兔0.12.0有更好的解决方案,请看this answer 谢谢,它有效。如果它仍然不适合你。请注意,heartbeat 参数应设置为对等方(消费者和生产者)。这就是我的情况。 设置ConnectionParameters(heartbeat=0) 是安全的。因为当你杀死这个进程时,连接会立即自动关闭。你可以去rabbit_mq_server:15672/#/connections验证一下。【参考方案2】:

我遇到了同样的问题。 我的解决方案是:

    关闭服务器端的心跳 评估任务可能花费的最长时间 将客户端心跳超时设置为从第 2 步获得的时间

为什么会这样?

当我测试以下情况时:

案例一
    服务器心跳开启,1800s 客户端未设置

任务运行很长时间还是报错 -->1800

案例二
    关闭服务器心跳 关闭客户端心跳

客户端没有错误,除了一个问题--当客户端崩溃时(我的操作系统因某些故障而重新启动),在Rabbitmq管理插件中仍然可以看到tcp连接。而且很混乱。

案例三
    关闭服务器心跳 打开客户端心跳,将其设置为可预见的最大运行时间

在这种情况下,我可以动态更改单个客户端上的每个热信号。事实上,我在经常死机的机器上设置了心跳。而且,我可以通过Rabbitmq Manangement插件看到离线机器。

环境

操作系统:centos x86_64 鼠兔:0.9.13 rabbitmq: 3.3.1

【讨论】:

如何开启客户端心跳?找不到任何有关如何操作的信息。 你可以试试这样的:params = pika.ConnectionParameters(host=self.__host, port=self.__port, credentials=credentials, heartbeat_interval=<your-interval-in-seconds>) 我应该先试试你的方法,让我省了很多头痛和头发。感谢您提供有用的见解。【参考方案3】:

不要禁用心跳。 最好的解决方案是在单独的线程中运行任务,并将prefetch_count 设置为1,以便消费者只获得 1 条未确认消息 使用类似channel.basic_qos(prefetch_count=1)

https://github.com/pika/pika/issues/753#issuecomment-318124510 https://www.rabbitmq.com/consumer-prefetch.html

【讨论】:

【参考方案4】:
    你可以在你的long_running_task(connection)中定期调用connection.process_data_events(),这个函数被调用时会向服务器发送心跳,并保持pika客户端远离关闭。 在您的鼠兔BlockingConnection 中设置大于调用connection.process_data_events() 周期的心跳值。

【讨论】:

connection.process_data_events()帮帮我【参考方案5】:

请不要禁用心跳!

从 Pika 0.12.0 开始,请使用 this example code 中描述的技术在单独的线程上运行您的长时间运行的任务,然后确认来自该线程的消息。


注意:RabbitMQ 团队监控the rabbitmq-users mailing list,并且有时只回答 *** 上的问题。

【讨论】:

为什么禁用心跳是一件坏事? RabbitMQ 和您的应用程序都不会检测到丢失的 TCP 连接,直到对该连接尝试下一个操作。 我明白了,但根据用例,这不一定是坏事。在我的情况下,我宁愿在尝试确认消息时出错,因为连接丢失,而不是在尝试确认消息时出错,因为我的处理时间太长而导致连接关闭。这就是为什么不禁用心跳的一般警告似乎是不合理的 IMO。这一切都取决于用例,所以我相信说为什么你可能不应该考虑禁用它们而不是在没有任何进一步信息的情况下“禁用 = 坏”会更有成效。 @LukeBakken 有没有办法将此方法与 channel.basic_get 一起使用?我需要我的消费者消费一条消息,确认它然后死/退出,我可以让它只消费一条带有 basic_get 的消息,但是我不能让它确认(长时间运行的)消息。 这是最好的和正确的解决方案。谢谢@LukeBakken。【参考方案6】:

您也可以建立一个新线程,在这个新线程中处理消息,并在该线程处于活动状态时在连接上调用.sleep,以防止丢失心跳。这是从 github 中的@gmr 获取的示例代码块,以及指向该问题的链接以供将来参考。

import re
import json
import threading

from google.cloud import bigquery
import pandas as pd
import pika
from unidecode import unidecode

def process_export(url, tablename):
    df = pd.read_csv(csvURL, encoding="utf-8")
    print("read in the csv")
    columns = list(df)
    ascii_only_name = [unidecode(name) for name in columns]
    cleaned_column_names = [re.sub("[^a-zA-Z0-9_ ]", "", name) for name in ascii_only_name]
    underscored_names = [name.replace(" ", "_") for name in cleaned_column_names]
    valid_gbq_tablename = "test." + tablename
    df.columns = underscored_names

    # try:
    df.to_gbq(valid_gbq_tablename, "some_project", if_exists="append", verbose=True, chunksize=10000)
    # print("Finished Exporting")
    # except Exception as error:
    #     print("unable to export due to: ")
    #     print(error)
    #     print()

def data_handler(channel, method, properties, body):
    body = json.loads(body)

    thread = threading.Thread(target=process_export, args=(body["csvURL"], body["tablename"]))
    thread.start()
    while thread.is_alive():  # Loop while the thread is processing
        channel._connection.sleep(1.0)
    print('Back from thread')
    channel.basic_ack(delivery_tag=method.delivery_tag)


def main():
    params = pika.ConnectionParameters(host='localhost', heartbeat=60)
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    channel.queue_declare(queue="some_queue", durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(data_handler, queue="some_queue")
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    channel.close()

if __name__ == '__main__':
    main()

链接: https://github.com/pika/pika/issues/930#issuecomment-360333837

【讨论】:

请参考 Luke Bakken 提供的解决方案。它是线程安全的,参考 pika 文档中的官方示例。【参考方案7】:

这是使用线程处理此问题的一种更简单的方法。如果消费者应用程序在当前作业完成之前不应使用另一个作业,则特别有用。 ack 可以随时发送 - 在这种情况下,我选择仅在作业完成时发送它(线程不再活动)。

在自己的线程中启动长时间运行的进程,然后通过调用 channel.process_data_events() 在循环中监视该线程。在主线程中保留对连接对象的引用,因为它不是线程安全的。本质上:

import time
import pika
from threading import Thread
from functools import partial

rmqconn = pika.BlockingConnection( ... )
rmqchan = rmqconn.channel()
rmqchan.basic_consume(
    queue='test',
    on_message_callback=partial(launch_process,rmqconn)
)
rmqchan.start_consuming()

def launch_process(conn,ch,method,properties,body):
    runthread = Thread(target=run_process,args=body)
    runthread.start()
    while runthread.is_alive():
        time.sleep(2)
        conn.process_data_events()
    ch.basic_ack(delivery_tag=method.delivery_tag)

def run_process(body):
    #do the long-running thing
    time.sleep(10)

【讨论】:

以上是关于在 pika / RabbitMQ 中处理长时间运行的任务的主要内容,如果未能解决你的问题,请参考以下文章

Pika - 处理RabbitMQ连接丢失

RabbitMQ 实现实现基本通信及订单处理

RabbitMQ 的 Pika 超时问题

当我尝试使用 pika (python) 向 RabbitMQ 确认消息时出现错误“未知的传递标签”

python采用pika库使用rabbitmq --工作队列

python使用pika操作rabbitmq总结