Python:使用celery处理多个服务器上的参数列表

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python:使用celery处理多个服务器上的参数列表相关的知识,希望对你有一定的参考价值。

我正在尝试使用Celery来处理输入列表。我想只处理一次输入。问题是我的服务器都是超级计算机集群的一部分。我可以向每个服务器发送一个命令来启动进程。一旦该服务器被安排为我的用户名执行工作(将来会在某个随机时间发生),它将启动该进程(因此任何时候运行的服务器数量都是不确定的)。我希望所有正在执行我的用户名的服务器共享可用的工作,直到完成所有必需的工作。

但是,我很困惑,关于如何精确地编排这个。

这是我的app.py,它概述了服务器要使用的任务:

from celery import Celery

app = Celery('tasks',
  backend='redis://localhost:6379/0',
  broker='redis://localhost:6379/0')

@app.task
def add(x, y):
  with open('results.txt', 'a') as out:
    out.write(str(x + y) + '\n')

这是安排工作的脚本(worker.py):

'''Worker node; executes tasks outlined in app.py'''
from app import add

# run the add function and pass in arguments
for i in range(10000):
  result = add.apply_async(args=[1,i]).get()

在我的本地机器上,如果我在终端中运行celery worker -l info -A app,那将启动芹菜应用程序。如果我然后运行worker.py,我看到正在搅拌的工作。

如何让多个不同的主机使用未完成的任务?每个服务器都可以访问将运行Redis的静态IP。我是否向每个主机提交celery worker -l info -A app命令?如果是这样的话,每个主人在上网时会不会完全消耗未完成的作品?我非常感谢其他人可以通过这些高级问题提供的任何帮助!

答案

为了回答上面的问题,我创建了一个名为app.py的文件,并将其加载到我可以ssh的前端节点。此文件概述了各个服务器上的各个工作程序将处理的功能:

from celery import Celery

app = Celery('tasks',
  backend='redis://daccssfe.crc.nd.edu:6379/0',
  broker='redis://daccssfe.crc.nd.edu:6379/0')

@app.task
def log(*args):

  # have all workers write their results to a common outfile
  with open('/scratch365/dduhaime/celery-test.txt', 'a') as out:
    out.write('-'.join([str(i).strip() for i in args]) + '\n')

接下来我定义了一个函数schedule_work.py来安排要完成的工作:

'''Worker node; executes tasks outlined in app.py'''
from app import log

# run the add function and pass in arguments
for i in range(10000):
  print('* processing', i)

  result = log.apply_async(args=[str(i)]).get()

此文件创建10,000个工作单元,并将每个整数0:10000-1传递给工作队列。当工作人员上线时,他们将处理此队列。

为了添加工作者,我使用我大学的超级计算系统来创建10个工作程序,每个工作程序启动app.py文件,这将使工作人员从堆栈中消耗工作。要使用Sun Grid Engine队列系统(我正在使用的超级计算机将其用作作业提交协议),我将以下内容保存在文件start_workers.sh中:

#!/bin/bash
#$ -N celery
#$ -o logs/celery.log
#$ -t 1-10:1
#$ -pe smp 4
#$ -q long
#$ -r y
source ~/.bash_profile
source celery-env/bin/activate
# add a new worker
celery worker -l info -A app

然后我提交了这些工作(qsub start_workers.sh),这些工作开始了10名工人,每个工作人员都要从工作清单中提取。最后,他们都将他们的主机地址和参数从要完成的工作列表记录到他们都有权访问的所请求的文件中。正如我们在结果文件中看到的那样,10个工作主机集中的不同主机使用不同的输入:

# /scratch365/dduhaime/celery-test.txt content
10.32.77.210-0
10.32.77.210-1
10.32.77.132-2
10.32.77.210-3
10.32.77.142-4
10.32.77.132-5
10.32.77.210-6
10.32.77.192-7
10.32.77.116-8
10.32.77.142-9
10.32.77.132-10
...

以上是关于Python:使用celery处理多个服务器上的参数列表的主要内容,如果未能解决你的问题,请参考以下文章

Python之celery的简介与使用

python-celery专注于实现分布式异步任务处理任务调度的插件!

Python 最强大的任务调度框架 Celery!

celery django 守护进程上的多个工作人员和多个队列

celery

多个工作节点上的 Django + Celery 任务