我可以在 Dask/Distributed 中使用从 .py 文件导入的函数吗?

Posted

技术标签:

【中文标题】我可以在 Dask/Distributed 中使用从 .py 文件导入的函数吗?【英文标题】:Can I use functions imported from .py files in Dask/Distributed? 【发布时间】:2017-01-10 17:38:48 【问题描述】:

我有一个关于序列化和导入的问题。

函数应该有自己的导入吗? like I've seen done with PySpark 以下是完全错误的吗? mod.py 是否需要是 conda/pip 包? mod.py 已写入共享文件系统。

In [1]: from distributed import Executor

In [2]: e = Executor('127.0.0.1:8786')

In [3]: e
Out[3]: <Executor: scheduler="127.0.0.1:8786" processes=2 cores=2>

In [4]: import socket

In [5]: e.run(socket.gethostname)
Out[5]: '172.20.12.7:53405': 'n1015', '172.20.12.8:53779': 'n1016'

In [6]: %%file mod.py
   ...: def hostname():
   ...:     return 'the hostname'
   ...: 
Overwriting mod.py

In [7]: import mod

In [8]: mod.hostname()
Out[8]: 'the hostname'

In [9]: e.run(mod.hostname)
distributed.utils - ERROR - No module named 'mod'

【问题讨论】:

【参考方案1】:

快速解答

将您的 mod.py 文件上传给您的所有工作人员。您可以使用用于设置 dask.distributed 的任何机制来执行此操作,也可以使用 upload_file 方法

e.upload_file('mod.py')

或者,如果你的函数是在 IPython 中创建的,而不是作为模块的一部分,它会毫无问题地被发送出去。

长答案

这一切都与函数在 Python 中的序列化方式有关。来自模块的函数通过它们的模块名和函数名进行序列化

In [1]: from math import sin

In [2]: import pickle

In [3]: pickle.dumps(sin)
Out[3]: b'\x80\x03cmath\nsin\nq\x00.'

因此,如果客户端机器想要引用 math.sin 函数,它会将这个字节串(您会注意到其中包含 'math''sin' 以及其他字节)发送到工作机器。工人看着这个字节串并说“好的,我想要的功能在某个模块中,让我去我的本地文件系统中找到它。如果模块不存在,那么它会引发错误,就像您在上面收到的一样。

对于动态创建的函数(您在 IPython 中创建的函数),它使用完全不同的方法,将所有代码捆绑在一起。这种方法通常效果很好。

一般而言,Dask 假设工作人员和客户都拥有相同的软件环境。通常,这主要由设置集群的人使用 Docker 等其他工具来处理。当您的文件或脚本更新更频繁时,upload_file 之类的方法可以填补空白。

【讨论】:

谢谢,这正是我所需要的。所以更好的方法可能是使用setup.py install --develop? 如何安装 dask.distributed 与这个问题无关。如果您指的是您的 mod.py 软件,那可能取决于您的各种 dask-worker 进程是否都能看到您将软件安装到的任何位置。例如,这可能适用于网络文件系统,但如果 dask-worker 进程完全位于不同的文件系统上,则不会起作用。【参考方案2】:

要在集群上运行工作人员环境中不可用的导入函数,您还可以从导入的函数创建本地函数。然后这个本地函数将被cloudpickle 腌制。在 Python 2 中,您可以使用 new.function 来实现这一点(参见 new module)。对于 Python 3,这可以通过 types module 实现,但我还没有尝试过。

您上面的示例将如下所示:

In [3]: import mod

In [4]: import new

In [5]: def remote(func):
   ...:     return new.function(func.func_code, func.func_globals, closure=func.func_closure)
   ...:

In [6]: e.run(remote(mod.hostname))
Out[6]: 'tcp://10.0.2.15:44208': 'the hostname'

【讨论】:

【参考方案3】:

将模块的目录添加到 PYTHONPATH 对我有用

【讨论】:

以上是关于我可以在 Dask/Distributed 中使用从 .py 文件导入的函数吗?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用dask.distributed并行化嵌套循环?

尝试连接到 dask 仪表板时出现“404 Not found”

Dask 中的 KilledWorker 异常是啥意思?

我可以在存储规则中使用云功能吗?

我可以在 activex 控件中使用 CView 派生类吗?

我可以在 C 程序中使用 C++ 库吗?