在 dask 产生的进程中调用 dask

Posted

技术标签:

【中文标题】在 dask 产生的进程中调用 dask【英文标题】:Calling dask inside dask spawned process 【发布时间】:2017-06-02 01:49:20 【问题描述】:

我们有一个包含大量任务的大型项目。我们使用 dask 图来安排每个任务。该图的一个小样本如下。请注意,dask 设置为多处理模式。

dask_graph:

  universe: !!python/tuple [gcsstrategies.svc.business_service.UniverseService.load_universe_object, CONTEXT]
  raw_market_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_market_data, CONTEXT, universe]
  raw_fundamental_data: !!python/tuple [gcsstrategies.svc.data_loading_service.RDWLoader.load_fundamental_data, CONTEXT, universe]

dask_keys: [raw_fundamental_data]

现在其中一个任务raw_fundamental_data 使用@delay 懒惰地安排dask 任务并使用dask.compute() 运行它们。选择这种设计的原因是,dask 在raw_fundamental_data 中将被调度和延迟运行的任务列表是在运行时根据运行时参数动态选择的。

我们看到的错误是:

守护进程不允许有子进程

我们理解这是因为一个衍生的进程试图产生子进程。这个问题有什么解决办法吗? dask 是否有任何方法允许通过 daskgraph 调度的任务使用@delay 或其他方法来调度和延迟运行自己的任务。

请注意,在我们的系统中,有许多任务将使用多处理来运行它们自己的任务。所以顺序执行不是一种选择。

【问题讨论】:

【参考方案1】:

多处理调度程序不支持这种操作。但是,distributed scheduler 是(您也可以轻松地在单台机器上使用分布式调度程序。

相关文档页面在这里:

http://distributed.readthedocs.io/en/latest/task-launch.html http://dask.readthedocs.io/en/latest/scheduler-choice.html

这是一个小例子

In [1]: from dask.distributed import Client, local_client

In [2]: def f(n):
   ...:     with local_client() as lc:
   ...:         futures = [lc.submit(lambda x: x + 1, i) for i in range(n)]
   ...:         total = lc.submit(sum, futures)
   ...:         return total.result()
   ...:     

In [3]: c = Client()  # start processes on local machine

In [4]: future = c.submit(f, 10)

In [5]: future.result()
Out[5]: 55

这使用 concurrent.futures 接口来 dask 而不是 dask.delayed,但您也可以使用 dask.delayed。见http://distributed.readthedocs.io/en/latest/manage-computation.html

【讨论】:

非常感谢您的帮助。如此干净和简单。您的解决方案完美运行。虽然我无法让@delayed 工作,所以我选择使用你使用 local_client() 的方法。 @MRocklin 如果我从延迟/提交的函数内部连接到同一个 Dask 分布式集群,这是一个有效的用例吗? (即lc = Client('127.0.0.1:8786'))当我尝试这样做并使用“已发布”数据集时,我似乎陷入了僵局;我应该举报吗? 见distributed.worker_client @MRocklin 完美!每次我卡在 Dask 的某个地方时,我只需要重新阅读文档,因为 Dask 的发展速度快得超出我的想象 :)

以上是关于在 dask 产生的进程中调用 dask的主要内容,如果未能解决你的问题,请参考以下文章

compute() 在 dask 中做啥?

Dask Worker 进程内存不断增长

使用 Dask 从多个 Python 进程编写 Parquet 文件

使用 Dask 将大于内存的数据帧缓存到本地磁盘

无法在 dask 任务中启动多处理池

如何将 Dask.DataFrame 转换为 pd.DataFrame?