使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询
Posted
技术标签:
【中文标题】使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询【英文标题】:Using Celery for Realtime, Synchronous External API Querying with Gevent 【发布时间】:2013-04-26 01:15:47 【问题描述】:我正在开发一个 Web 应用程序,该应用程序将接收来自用户的请求,并且必须点击多个外部 API 来组合该请求的答案。这可以直接从主网络线程使用 gevent 之类的东西来扇出请求来完成。
另外,我在想,我可以将传入的请求放入队列并使用工作人员来分配负载。这个想法是尽量保持实时,同时将请求分配给几个工作人员。这些工作人员中的每一个都将只查询众多外部 API 中的一个。然后,他们收到的响应将经过一系列转换,保存到数据库中,转换为通用模式并保存在通用数据库中,最终组合成一个大响应,该响应将通过 Web 请求返回。 Web 请求很可能一直被阻塞,用户在等待,所以保持 尽可能快的排队和出队很重要。
外部 API 调用可以轻松转换为单独的任务。我认为链接 从一个 api 任务到转换为数据库保存任务可以使用链等来完成,最终结果将所有结果组合在一起使用和弦返回到 web 线程。
一些问题:
这可以(并且应该)用芹菜来完成吗? 我正在使用 django。我应该尝试在普通芹菜上使用 django-celery 吗? 这些任务中的每一项都可能衍生出其他任务 - 例如记录刚刚记录的内容 发生或其他类型的分支。这可能吗? 任务是否可以返回他们获得的数据 - 即可能通过 celery(在本例中为 redis 作为底层)的 Kb 数据,或者他们应该写入数据库,然后只传递指向该数据的指针? 每个任务主要是 I/O 绑定的,最初只是要使用来自 Web 线程的 gevent 来扇出请求并跳过整个队列设计,但事实证明它将被重用于不同的组件。试图通过 Qs 实时保持整个往返行程可能需要许多工作人员确保队列大部分是空的。或者是吗?运行 gevent 工作程序池会对此有所帮助吗? 我必须编写 gevent 特定任务还是使用 gevent 池自动处理网络 IO? 是否可以为某些任务分配优先级? 如何保持它们井井有条? 我应该跳过芹菜而只使用昆布吗? 似乎 celery 更适合于可以推迟的“任务” 对时间不敏感。我是不是想保持这个实时性? 我还应该考虑哪些其他技术?更新:试图进一步解决这个问题。我在 Kombu 上做了一些阅读,它似乎能够做我想做的事,尽管水平比 celery 低得多。这是我想到的图表。
使用 Kombu 可访问的原始队列似乎可以实现许多工作人员订阅广播消息的能力。如果使用队列,则发布者不需要知道类型和编号。可以使用 Celery 实现类似的功能吗?似乎如果你想制作和弦,你需要在运行时知道和弦中将涉及哪些任务,而在这种情况下,你可以简单地将听众添加到广播中,并简单地确保他们宣布他们在运行以将响应添加到最终队列。
更新 2:我看到有 ability to broadcast 你能把它和和弦结合起来吗?一般来说,你能把芹菜和生海带结合起来吗?这听起来像是一个关于冰沙的问题。
【问题讨论】:
我个人会使用 rabbitMQ + kombu 来解决这个问题。最好为将由多个线程处理的事件设置侦听器,最后在收集所有数据后发送响应。它通常听起来像是一个事件驱动的架构,并且包含一些好的队列/观察者机制是内在的。 哦!当然 - 如果应用程序很小并且没有多个层,您可以在 Python-twisted 中实现它,这对于从自身发出所有这些阻塞请求非常有用。 @wojciechz 感谢您的建议。我一直在研究 Kombu。看来 Kombu 的扇出交换将是我正在寻找的。是否有可能将它与在另一边工作的 celery 应用程序/任务结合起来? 坦率地说 - 我不知道。但只要我们在谈论编写软件的和平 - 是的 - 这是可能的:) 你是怎么解决这个问题的? 【参考方案1】:我会尽可能多地回答问题。
这可以(并且应该)用芹菜来完成吗?
是的,你可以
我正在使用 django。我应该尝试在普通芹菜上使用 django-celery 吗?
Django 对 celery 有很好的支持,可以让开发过程中的生活更轻松
这些任务中的每一项都可能衍生出其他任务 - 例如日志记录 刚刚发生的事情或其他类型的分支。这可能吗?
您可以从使用 ignore_result = true 的任务开始子任务,仅用于副作用
任务可能会返回他们获得的数据 - 即可能是 Kb 通过 celery 的数据(在这种情况下作为底层的 redis)或者他们应该 写入数据库,然后只传递指向该数据的指针?
我建议将结果放在 db 中,然后传递 id 会让你的经纪人和工人高兴。减少数据传输/酸洗等。
每个任务主要是 I/O 绑定的,最初只是打算使用 来自网络线程的 gevent 以扇出请求并跳过整个请求 排队设计,但事实证明它会被重复使用 不同的组件。试图保持整个往返行程 Qs 实时可能需要许多工人确保 queueus 大多是空的。或者是吗?会运行 gevent 工作者 对此有帮助吗?
由于进程是 io 绑定的,因此 gevent 肯定会在这里提供帮助。但是,gevent pool'd worker 的并发性应该是多少,我也在寻找答案。
我是否必须编写 gevent 特定任务或将使用 gevent 池 自动处理网络IO?
当您在池中使用 Gevent 时,它会自动修补猴子。但是您使用的库应该可以很好地与 gevent 配合使用。否则,如果您使用 simplejson(用 c 编写)解析一些数据,那么这将阻止其他 gevent greenlets。
是否可以为某些任务分配优先级?
您不能为某些任务分配特定的优先级,而是将它们路由到不同的队列,然后让不同数量的工作人员监听这些队列。特定队列的工作人员越多,该队列上该任务的优先级就越高。
如何保持它们井井有条?
链是维持秩序的一种方式。和弦是一个很好的总结方式。芹菜会照顾它,所以你不必担心它。即使使用 gevent 池,最后也可以推断出任务执行的顺序。
我应该跳过芹菜而只使用kombu吗?
如果您的用例不会随着时间的推移变得更复杂,并且您愿意自己通过 celeryd + supervisord 管理您的流程,那么您可以。另外,如果你不关心celerymon、flower等工具自带的任务监控。
似乎 celery 更适合于可以完成的“任务” 延迟且对时间不敏感。
Celery 也支持计划任务。如果这就是你所说的那句话的意思。
我是不是想保持这个实时性?
我不这么认为。只要您的消费者足够快,它就会像实时一样好。
我还应该看哪些其他技术?
关于芹菜,你应该明智地选择结果存储。我的建议是使用 cassandra。它适用于实时数据(无论是写入还是查询)。您也可以使用 redis 或 mongodb。他们带有自己的一系列问题作为结果存储。但是随后对配置进行一些调整可能会有很长的路要走。
如果您的意思与 celery 完全不同,那么您可以研究 asyncio (python3.5) 和 zeromq 来实现相同的目的。不过,我无法对此发表更多评论。
【讨论】:
我有与这个问题相同的要求,还有 1 个请求,除了问题中的所有内容之外,我如何从多个外部 API 获取 wss:// websocket 数据,感谢您的回答前进 我认为,由于它是 websocket,您可能希望将这些连接保持打开很长时间,并将来自这些 api 的消息作为较小的任务发送/接收。有多种方法可以编排这个客户端池,使用 celery 只是其中的一部分。我能想到的一种方法是实现一个命令模式,其中执行器逻辑将是单独的进程池,而 celery 任务将简单地通过某种 ipc 或 api 将命令传送到该池。无论哪种方式,您都不应该从 celery 任务中创建连接,因为每个任务都将创建它自己的,这是不可取的 感谢您的回答!既然您提到每个任务都会创建自己的连接,那么如果您要说无休止地运行 celery 工人,这也会是一个问题 通常,如果您使用像 pymongo 这样的库来为您处理池,那么不会有太大问题。如果有很多任务,每个任务都要求一个新的连接,就会出现问题。因此,您应该创建一个可以在任务之间共享的线程本地连接对象,这是我对这种情况的看法。以上是关于使用 Celery 通过 Gevent 进行实时、同步的外部 API 查询的主要内容,如果未能解决你的问题,请参考以下文章
使用 gevent 执行池的 celery 任务的 SynchronousOnlyOperation
我应该在 celery 中使用 prefork、eventlet 或 gevent 哪个池类?