Dask - 如何取消并重新提交停滞的任务?
Posted
技术标签:
【中文标题】Dask - 如何取消并重新提交停滞的任务?【英文标题】:Dask - How to cancel and resubmit stalled tasks? 【发布时间】:2020-03-09 09:51:04 【问题描述】:我经常遇到一个问题,即 Dask 在几个任务上随机停顿,通常与从我的网络上的不同节点读取数据有关(有关此内容的更多详细信息,请参见下文)。这可能在运行脚本几个小时后没有问题发生。它将以如下所示的形式无限期挂起(否则此循环需要几秒钟才能完成):
在这种情况下,我看到只有少数停滞的进程,并且都在一个特定的节点 (192.168.0.228) 上:
该节点上的每个工作人员都在几个 read_parquet 任务上停滞不前:
这是使用以下代码调用的,并且使用的是 fastparquet:
ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)
我的集群正在运行 Ubuntu 19.04 以及 Dask 和 Distributed 的所有最新版本(截至 11/12)以及所需的软件包(例如,tornado、fsspec、fastparquet 等)
.228 节点尝试访问的数据位于我集群中的另一个节点上。 .228 节点通过 CIFS 文件共享访问数据。我在运行脚本的同一节点上运行 Dask 调度程序(不同于 .228 节点和数据存储节点)。该脚本使用 Paramiko 通过 SSH 将工作人员连接到调度程序:
ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
' --name ' + comp_name_decode +
' --nprocs ' + str(nproc_int) +
' --nthreads 10 ' +
self.dask_scheduler_ip, get_pty=True)
.228 节点与调度程序和数据存储节点的连接看起来都很健康。 .228 节点可能会在尝试处理 read_parquet 任务时遇到某种短暂的连接问题,但如果发生这种情况,那么 .228 节点与调度程序和 CIFS 共享的连接在该短暂时刻之后不会受到影响。在任何情况下,日志都不会显示任何问题。这是来自 .228 节点的整个日志:
distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445
distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445
distributed.worker - INFO - dashboard at: 192.168.0.228:37751
distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Threads: 2
distributed.worker - INFO - Memory: 14.53 GB
distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig
distributed.worker - INFO - -------------------------------------------------
distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786
distributed.worker - INFO - -------------------------------------------------
暂且不说这是 Dask 还是我的代码/网络中的错误,是否可以为调度程序处理的所有任务设置一般超时?或者,是否可以:
-
识别停滞的任务,
复制停滞的任务并将其移动到另一个工作人员,然后
取消停滞的任务?
【问题讨论】:
【参考方案1】:是否可以为调度程序处理的所有任务设置一般超时?
不幸的是,截至 2019 年 11 月 13 日,答案是否定的。
如果任务正常失败,那么您可以使用client.retry(...)
重试该任务,但没有自动方式让任务在一定时间后自行失败。这是您必须自己将其写入 Python 函数的内容。不幸的是,很难在另一个线程中中断 Python 函数,这也是未实现的部分原因。
如果工人倒下了,那么事情将在其他地方进行尝试。但是,从您所说的听起来一切都很健康,只是任务本身可能会永远持续下去。不幸的是,很难确定这是一个失败案例。
【讨论】:
要求工作人员关闭或重新启动,以便重新安排任务是否合理? 是的。如果所有错误任务都在一个工作人员身上,那么重新启动该工作人员应该可以解决问题。 谢谢你们。 client.retry 不能解决问题 - 它不会重试有问题的密钥。我还尝试关闭并重新连接停滞的工作人员,但 client.retire_workers() 并不优雅,因此集群会丢失持久数据并且无法恢复。我发现的最佳解决方案是在 dask-worker 命令上使用 --lifetime 标志,调度程序会优雅地重新启动该工作人员并将密钥/数据移动到另一个工作人员。但是,显然,这并不是针对有问题的工人。是否可以以类似于 --lifetime 的方式使用分布式类重新启动工作人员? 我不能确定,但我相信我遇到了与 dan 类似的问题(记录在这里:github.com/dask/dask/issues/7543)。这里的描述是我在 GH 或 SO 上发现的其他各种挂起集群或停止任务的情况中最相似的。我理解这不是一个“失败案例”的逻辑w.r.t。当前定义了健康的工作人员/调度程序通信的方式。如果是这种情况,@MRocklin 你能推荐配置设置或参数给客户端、集群、自适应或其他任何可以使集群对这种性质的停顿更加健壮的东西吗?以上是关于Dask - 如何取消并重新提交停滞的任务?的主要内容,如果未能解决你的问题,请参考以下文章
JAVA 线程池 其中一个线程执行失败 则线程重新执行或者重新提交任务 急