celery: 守护进程不允许有子进程
Posted
技术标签:
【中文标题】celery: 守护进程不允许有子进程【英文标题】:celery: daemonic processes are not allowed to have children 【发布时间】:2015-08-17 21:51:27 【问题描述】:在 Python (2.7) 中,我尝试在 celery 任务(celery 3.1.17)中创建进程(使用多处理),但它给出了错误:
daemonic processes are not allowed to have children
谷歌搜索,我发现最新版本的台球修复了“错误”,但我有最新版本(3.3.0.20)并且错误仍在发生。我也尝试在我的 celery 任务中实现this workaround,但它给出了同样的错误。
有人知道怎么做吗? 任何帮助表示赞赏, 帕特里克
编辑:sn-ps 代码
任务:
from __future__ import absolute_import
from celery import shared_task
from embedder.models import Embedder
@shared_task
def embedder_update_task(embedder_id):
embedder = Embedder.objects.get(pk=embedder_id)
embedder.test()
人工测试功能(from here):
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
print("Creating %i (daemon) workers and jobs in child." % num_procs)
pool = mp.Pool(num_procs)
result = pool.map(sleepawhile,
[randint(1, 5) for x in range(num_procs)])
# The following is not really needed, since the (daemon) workers of the
# child's pool are killed when the child is terminated, but it's good
# practice to cleanup after ourselves anyway.
pool.close()
pool.join()
return result
def test(self):
print("Creating 5 (non-daemon) workers and jobs in main process.")
pool = MyPool(5)
result = pool.map(work, [randint(1, 5) for x in range(5)])
pool.close()
pool.join()
print(result)
我的真实函数:
import mulitprocessing as mp
def test(self):
self.init()
for saveindex in range(self.start_index,self.start_index+self.nsaves):
self.create_storage(saveindex)
# process creation:
procs = [mp.Process(name="Process-"+str(i),target=getattr(self,self.training_method),args=(saveindex,)) for i in range(self.nproc)]
for p in procs: p.start()
for p in procs: p.join()
print "End of task"
init 函数定义了一个多处理数组和一个共享相同内存的对象,以便我的所有进程可以同时更新同一个数组:
mp_arr = mp.Array(c.c_double, np.random.rand(1000000)) # example
self.V = numpy.frombuffer(mp_arr.get_obj()) #all the processes can update V
调用任务时产生的错误:
[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process.
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda] raised unexpected: AssertionError('daemonic processes are not allowed to have children',)
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__
return self.run(*args, **kwargs)
File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task
embedder.test()
File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test
pool = MyPool(5)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__
self._repopulate_pool()
File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool
w.start()
File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start
'daemonic processes are not allowed to have children'
AssertionError: daemonic processes are not allowed to have children
【问题讨论】:
请使用导致异常和完整异常的代码的 sn-p 更新您的问题。 添加了我的真实代码(相对于人工代码)。感谢 scytale 的帮助,非常感谢。 hm.... 里面有很多 OO(你是 Java 人吗?:-) OO 和分布式处理会导致过度复杂化。另外,您缺少类定义(test()
是类的方法,对吗?)。另外,您应该尝试显示training_method()
(或它的代表性示例)。是否可以将training_method()
的功能移动到一个函数中?这将使它更容易与 celery 集成(或多处理)。
哦等等...test()
是Embedder
类的一个方法,对吧?如果是这样,那么为了清楚起见,它们应该在同一个代码块中。 Embedder 是一个 Django 模型,对吧?你应该把这一切都说清楚——这非常相关。
是的。 Embedder 是一个 Django 模型,而 test() 是它的方法之一。 multiprocessing 模块的唯一用途是 Embedder init() 方法(创建多处理数组)和 test() 方法(创建进程、启动和加入)。 self.training_method 引用了用于学习的函数(test() 实际上是 learn())。
【参考方案1】:
我在 django 中尝试从 Celery 任务调用多处理方法时遇到类似错误。我解决了使用台球而不是多处理
import billiard as multiprocessing
希望对你有帮助。
【讨论】:
对于windows,这是另一个奇怪的错误!不知道如何解决这个问题.. 对不起@SauravKumar,如果我知道可能会出现什么错误,我不会使用 Windows 机器【参考方案2】:billiard
和 multiprocessing
是不同的库 - billiard
是 Celery 项目自己的 multiprocessing
的分支。您需要导入 billiard
并使用它而不是 multiprocessing
但是,更好的答案可能是您应该重构代码,以便生成更多 Celery 任务,而不是使用两种不同的方式来分配您的工作。
你可以使用 Celery canvas 来做到这一点
from celery import group
@app.task
def sleepawhile(t):
print("Sleeping %i seconds..." % t)
time.sleep(t)
return t
def work(num_procs):
return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs)])
def test(self):
my_group = group(work(randint(1, 5)) for x in range(5))
result = my_group.apply_async()
result.get()
我已经尝试为您的代码制作一个使用画布原语而不是多处理的工作版本。但是,由于您的示例非常人为,因此很难想出有意义的东西。
更新:
这是你使用 Celery 画布的真实代码的翻译:
tasks.py
:
@shared_task
run_training_method(saveindex, embedder_id):
embedder = Embedder.objects.get(pk=embedder_id)
embedder.training_method(saveindex)
models.py
:
from tasks import run_training_method
from celery import group
class Embedder(Model):
def embedder_update_task(self):
my_group = []
for saveindex in range(self.start_index, self.start_index + self.nsaves):
self.create_storage(saveindex)
# Add to list
my_group.extend([run_training_method.subtask((saveindex, self.id))
for i in range(self.nproc)])
result = group(my_group).apply_async()
【讨论】:
谢谢scytale,我试试!你知道在 billiard 中是否有一个等价的 multiprocessing.array(参见我的 init() 方法),以便所有任务可以共享同一个内存变量? 不,没有 - celery 工作人员不能假设能够共享任何东西,因为它们可能在不同的主机上运行。如果您打算只在一台机器上运行,那么可能只使用多处理是一种更好的方法,因为您拥有共享内存的便利。如果您需要在多台机器上运行,那么可能只使用 celery 并对您的数据进行 mapreduce 样式处理【参考方案3】:如果您正在使用已包含多处理的子模块/库,则设置 worker 的 -P threads
参数可能更有意义:
celery worker -P threads
https://github.com/celery/celery/issues/4525#issuecomment-566503932
更新:在 celery v5.1.1 中的命令行解析中存在一个错误,即使支持 -P threads
,它也不允许。它固定在 >= v5.1.1
中。从v4.4
开始正式支持。
【讨论】:
谢谢!我认为现在应该将其视为已接受的答案。 谢谢你。最佳答案。【参考方案4】:当我在 Celery 4.2.0 和 Python3.6 中使用多处理时,我得到了这个。 用台球解决了这个问题。
我更改了源代码
from multiprocessing import Process
到
from billiard.context import Process
解决了这个错误。
注意,导入源是billiard.context
而不是billiard.process
【讨论】:
更新:from billiard import Process
现在似乎可以工作了!以上是关于celery: 守护进程不允许有子进程的主要内容,如果未能解决你的问题,请参考以下文章
Django celery 和 celery-beat 守护进程脚本错误
Centos7 使用 Supervisor 守护进程 Celery
使用 AWS Elastic Beanstalk for python 3.6 在后台运行 celery worker 作为守护进程?