Data wrangling using CPU workers and training xgboost using GPU workers with dask
Posted 2021-11-11 20:55:31

I'm trying to read 200 parquet files from HDFS and then train a model using 4 GPUs. I also have 48 vcores on my machine. If I start the cluster with GPU workers only, the reading part is very slow, because it only uses the 4 CPU threads assigned to the GPU workers, and you can't really run more workers than the number of GPUs you have unless you run them in separate shells, which gets messy because you then have to handle memory management yourself. I want to read the files with CPU workers, wrangle the data with CPU workers, and then train the xgboost model with GPU workers. I read the documentation here on how to start workers with different resources and assign them to different tasks. I also saw this question, but I'm a bit confused.
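For context, per the resources documentation referenced above, the GPU workers can be started with a resource tag (e.g. dask-worker tcp://scheduler-address:8786 --resources "GPU=1", one per GPU) while the CPU workers are started without one. Here is a small sketch, with a placeholder scheduler address, to check from the client side what each worker advertises:

from dask.distributed import Client

client = Client("tcp://scheduler-address:8786")
# each worker entry reports the resources it was started with;
# GPU workers should show {'GPU': 1}, plain CPU workers an empty dict
for addr, info in client.scheduler_info()["workers"].items():
    print(addr, info.get("resources"))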
This is the code I'm trying to run to read the .parquet files:
import dask.dataframe as dd

df = dd \
    .read_parquet(
        "hdfs://address/to/the/*.parquet",
        storage_options={
            "user": user,
            "kerb_ticket": kerb_ticket,
        },
        engine='pyarrow') \
    .persist()
This will automatically use all the CPU and GPU workers, which is fine. After this, I need to create my training data and labels. Say I have X_train, y_train, and params. Here I convert them to dask_cudf:
import dask_cudf

X_train = dask_cudf.from_dask_dataframe(X_train)
y_train = dask_cudf.from_dask_dataframe(y_train)
This is the part where I only need to use the GPU workers:
Xy = dxgb.DaskDMatrix(client, X_train, y_train)
To follow the documentation, I should convert this to:
Xy = client.submit(dxgb.DaskDMatrix, client, X_train, y_train, resources={'GPU': 1})
But then I get this error:
distributed.protocol.pickle - INFO - Failed to serialize (<Client: 'tcp://169.68.236.35:8786' processes=52 threads=52, memory=1.97 TiB>, <dask_cudf.DataFrame | 19200 tasks | 200 npartitions>, <dask_cudf.Series | 600 tasks | 200 npartitions>). Exception: cannot pickle 'socket' object
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
/envs/dask/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
48 buffers.clear()
---> 49 result = pickle.dumps(x, **dump_kwargs)
50 if len(result) < 1000:
/envs/dask/lib/python3.8/socket.py in __getstate__(self)
271 def __getstate__(self):
--> 272 raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
273
TypeError: cannot pickle 'socket' object
During handling of the above exception, another exception occurred:
TypeError Traceback (most recent call last)
<ipython-input-12-0d6a943365a9> in <module>
1 # Xy = dxgb.DaskDMatrix(client, X_train, y_train)
2 # Xy = dxgb.DaskDeviceQuantileDMatrix(client, X_train, y_train)
----> 3 Xy = client.submit(dxgb.DaskDMatrix, client, X_train, y_train, resources={'GPU': 1})
4 # Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)
/envs/dask/lib/python3.8/site-packages/distributed/client.py in submit(self, func, key, workers, resources, retries, priority, fifo_timeout, allow_other_workers, actor, actors, pure, *args, **kwargs)
1629 dsk = {skey: (func,) + tuple(args)}
1630
-> 1631 futures = self._graph_to_futures(
1632 dsk,
1633 [skey],
/envs/dask/lib/python3.8/site-packages/distributed/client.py in _graph_to_futures(self, dsk, keys, workers, allow_other_workers, priority, user_priority, resources, retries, fifo_timeout, actors)
2646 # Pack the high level graph before sending it to the scheduler
2647 keyset = set(keys)
-> 2648 dsk = dsk.__dask_distributed_pack__(self, keyset, annotations)
2649
2650 # Create futures before sending graph (helps avoid contention)
/envs/dask/lib/python3.8/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, client, client_keys, annotations)
1045 "__module__": layer.__module__,
1046 "__name__": type(layer).__name__,
-> 1047 "state": layer.__dask_distributed_pack__(
1048 self.get_all_external_keys(),
1049 self.key_dependencies,
/envs/dask/lib/python3.8/site-packages/dask/highlevelgraph.py in __dask_distributed_pack__(self, all_hlg_keys, known_key_dependencies, client, client_keys)
424 for k, v in dsk.items()
425
--> 426 dsk = toolz.valmap(dumps_task, dsk)
427 return "dsk": dsk, "dependencies": dependencies
428
/envs/dask/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/envs/dask/lib/python3.8/site-packages/cytoolz/dicttoolz.pyx in cytoolz.dicttoolz.valmap()
/envs/dask/lib/python3.8/site-packages/distributed/worker.py in dumps_task(task)
3784 return d
3785 elif not any(map(_maybe_complex, task[1:])):
-> 3786 return "function": dumps_function(task[0]), "args": warn_dumps(task[1:])
3787 return to_serialize(task)
3788
/envs/dask/lib/python3.8/site-packages/distributed/worker.py in warn_dumps(obj, dumps, limit)
3793 def warn_dumps(obj, dumps=pickle.dumps, limit=1e6):
3794 """Dump an object to bytes, warn if those bytes are large"""
-> 3795 b = dumps(obj, protocol=4)
3796 if not _warn_dumps_warned[0] and len(b) > limit:
3797 _warn_dumps_warned[0] = True
/envs/dask/lib/python3.8/site-packages/distributed/protocol/pickle.py in dumps(x, buffer_callback, protocol)
58 try:
59 buffers.clear()
---> 60 result = cloudpickle.dumps(x, **dump_kwargs)
61 except Exception as e:
62 logger.info("Failed to serialize %s. Exception: %s", x, e)
/envs/dask/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dumps(obj, protocol, buffer_callback)
71 file, protocol=protocol, buffer_callback=buffer_callback
72 )
---> 73 cp.dump(obj)
74 return file.getvalue()
75
/envs/dask/lib/python3.8/site-packages/cloudpickle/cloudpickle_fast.py in dump(self, obj)
561 def dump(self, obj):
562 try:
--> 563 return Pickler.dump(self, obj)
564 except RuntimeError as e:
565 if "recursion" in e.args[0]:
/envs/dask/lib/python3.8/socket.py in __getstate__(self)
270
271 def __getstate__(self):
--> 272 raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
273
274 def dup(self):
TypeError: cannot pickle 'socket' object
Does anyone know how to fix this?
Answer 1:

The problem is that the dask.Client is not serializable, so it cannot be submitted. You can work around this by using dask.distributed.get_client to access the dask.Client inside the task:
from dask.distributed import get_client

def create_dmatrix(X_train, y_train):
    client = get_client()
    return dxgb.DaskDMatrix(client, X_train, y_train)

Xy = client.submit(create_dmatrix, X_train, y_train, resources={'GPU': 1})
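The same pattern extends to the training call itself, so it also runs on the GPU workers. A minimal sketch, assuming params is the xgboost parameter dict from the question (for GPU training it would typically include tree_method='gpu_hist') and num_boost_round=100 is just a placeholder:

def train_model(Xy, params):
    client = get_client()
    # dxgb.train returns a dict containing the trained 'booster' and the evaluation 'history'
    return dxgb.train(client, params, Xy, num_boost_round=100)

result = client.submit(train_model, Xy, params, resources={'GPU': 1}).result()
booster = result['booster']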