如何在 Dask 中停止正在运行的任务?

Posted

技术标签:

【中文标题】如何在 Dask 中停止正在运行的任务?【英文标题】:How do I stop a running task in Dask? 【发布时间】:2018-08-18 13:52:41 【问题描述】:

在使用 Dask 的分布式调度程序时,我有一个任务正在我想停止的远程工作人员上运行。

我该如何阻止它?我知道取消方法,但是如果任务已经开始执行,这似乎不起作用。

【问题讨论】:

【参考方案1】:

如果它还没有运行

如果任务尚未开始运行,您可以通过取消关联的未来来取消它

future = client.submit(func, *args)  # start task
future.cancel()                      # cancel task

如果您使用的是 dask 集合,那么您可以使用 client.cancel 方法

x = x.persist()   # start many tasks 
client.cancel(x)  # cancel all tasks

如果它正在运行

但是,如果您的任务已经开始在工作线程中的线程上运行,那么您无法中断该线程。不幸的是,这是 Python 的限制。

建立一个明确的停止条件

您能做的最好的事情就是使用您自己的自定义逻辑在您的函数中构建某种停止标准。您可能会考虑在循环中检查共享变量。在这些文档中查找“变量”:http://dask.pydata.org/en/latest/futures.html

from dask.distributed import Client, Variable

client = Client()
stop = Varible()
stop.put(False)

def long_running_task():
    while not stop.get():
        ... do stuff

future = client.submit(long_running_task)

... wait a while

stop.put(True)

【讨论】:

变量的生命周期是多少?如果没有客户端引用它,它是垃圾收集的吗?另外:对于长时间运行的任务,我改为使用长期运行任务的键设置元数据,以便“如果键对应于任务,那么当调度程序忘记任务时,该键将被清理”跨度> 是的,如果不存在对变量的引用,则会清理变量。 这似乎是旧信息。我可以通过调用 Future.cancel() 来停止正在运行的任务。 好吧,我可以确认会发生这种情况。我很惊讶!

以上是关于如何在 Dask 中停止正在运行的任务?的主要内容,如果未能解决你的问题,请参考以下文章

如何查看Dask Compute任务的进度?

Dask - 如何取消并重新提交停滞的任务?

Dask分布式中如何高效提交大参数任务?

如果应用程序正在启动,如何停止后台任务?

如何在Matlab中停止正在运行的脚本[重复]

Dask Worker 进程内存不断增长