如何在 dask 分布式工作人员上设置日志记录?
Posted
技术标签:
【中文标题】如何在 dask 分布式工作人员上设置日志记录?【英文标题】:How to set up logging on dask distributed workers? 【发布时间】:2017-05-19 10:02:07 【问题描述】:将 dask 升级到 1.15.0 版后,我的日志记录停止工作。
我使用 logging.config.dictConfig 来初始化 python 日志记录工具,之前这些设置传播到所有工作人员。但是升级后就不行了。
如果我在每个工作人员的每次日志调用之前执行 dictConfig,它可以工作,但这不是一个正确的解决方案。
所以问题是它如何在我的计算图开始执行之前初始化每个工作人员的日志记录并且每个工作人员只执行一次?
更新:
这个 hack 对一个虚拟示例起作用,但对我的系统没有任何影响:
def init_logging():
# logging initializing happens here
...
client = distributed.Client()
client.map(lambda _: init_logging, client.ncores())
更新 2:
在仔细阅读文档后解决了这个问题:
client.run(init_logging)
所以现在的问题是:这是解决这个问题的正确方法吗?
【问题讨论】:
【参考方案1】:从 1.15.0 版开始,我们现在从一个干净的进程中派生工作人员,因此您在调用 Client()
之前对进程所做的更改不会影响派生的工作人员。欲了解更多信息,请在此处搜索forkserver
:https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
您使用Client.run
的解决方案对我来说看起来不错。 Client.run 目前(从 1.15.0 版开始)是在所有当前活动的工作人员上调用函数的最佳方式。
分布式系统
值得注意的是,您在这里设置的是从单台计算机上的同一进程派生的客户端。您在上面使用的技巧在分布式设置中不起作用。如果有人遇到这个问题,询问如何在集群上下文中使用 Dask 处理日志记录,我将添加此注释。
通常 Dask 不会移动日志。相反,通常用于启动 Dask 的任何机制都会处理此问题。像 SGE/SLURM/Torque/PBS 这样的作业调度程序都是这样做的。像 YARN/Mesos/Marathon/Kubernetes 这样的云系统都是这样做的。 dask-ssh
工具可以做到这一点。
【讨论】:
所以这意味着如果我想记录任何东西,我必须在任何应该在工作人员上运行的代码块中配置日志记录,例如。到远程系统日志? Client.run 方法不够好。在我发现在实践中可以终止和重新启动工作进程(不详述)之后,我不得不替换集群中 Client.run() 的所有用法。新开始的工人错过了由 Client.run() 准备的状态。因此,我使用 Client.scatter(global_data, broadcast=True) 并在提交给工作人员的每个方法中检查是否在工作人员上执行了初始化。我将自己的“worker state”字段存储在使用distributed.get_worker() 获得的worker 中。有分布式的就好了。worker_init(method) 沃尔夫冈,看看Client.register_worker_callbacks。 我认为分布式作业日志应该与工作日志分开。前者是关于分布式图处理发生了什么。后者是关于发生在工人身上的事情。前者具有与客户端和分布式作业调度程序相关的上下文。后者具有与集群管理器相关的上下文。如果 dask 有一种本地方式来捕获来自图形节点的所有日志并将它们聚合成一个随意的跟踪,那就太好了。 Chris 非常感谢 Client.register_worker_callbacks()。不幸的是,我没有在 dask 文档中找到它(可能是“回调”名称让我感到困惑),并且它非常适合基于其文档的所有当前和未来的新工作人员初始化:“该函数将立即在所有当前连接的工作人员上运行。它将将来添加的任何工作人员也可以在连接时运行。”以上是关于如何在 dask 分布式工作人员上设置日志记录?的主要内容,如果未能解决你的问题,请参考以下文章
我们如何在 dask 分布式中为每个工作人员选择 --nthreads 和 --nprocs?