使用 pool.apply_async 测试 ML 模型的并行处理不允许访问结果

Posted

技术标签:

【中文标题】使用 pool.apply_async 测试 ML 模型的并行处理不允许访问结果【英文标题】:Parallel processing for testing ML model with pool.apply_async does not allow access to results 【发布时间】:2021-06-23 18:48:16 【问题描述】:

我有一个包含 270 万个样本的数据集,我需要在这些数据集上测试我的 ML 模型。我的笔记本电脑上有 8 个内核,我想尝试并行化我的测试代码以节省时间。这是测试功能:

def testMTGP(x_sample, y_sample, ind, model, likelihood):
    x_sample = x_sample.view(1, -1)
    y_sample = y_sample.view(1, -1)
    model.eval()
    likelihood.eval()
    with torch.no_grad():
        prediction = likelihood(model(x_sample))
        mean = (prediction.mean).detach().numpy()
        prewhiten_error = (y_sample.detach().numpy()) - mean
        cov_matrix = (prediction.covariance_matrix).detach().numpy()
        white_error, matcheck = Whiten(prewhiten_error, cov_matrix)
    return (
        ind,
        
            "prediction": mean,
            "prewhiten_error": prewhiten_error,
            "white_error": white_error,
            "cov_matrix": cov_matrix,
            "matcheck": matcheck,
        ,
    )

我返回与我测试的样本相对应的索引以及与模型为测试所做的计算相关的数据字典。函数Whiten(prewhiten_error, cov_matrix)也是我自己定义的,在代码文件开头导入的,所以全局可用。它只是接受输入,转换 cov_matrix 并将其与 prewhiten_error 相乘并返回答案,以及一个指示有关 cov_matrix 的一些状态信息的变量。

对于多处理,想法是首先将整个数据集分成大小大致相等的块;挑选每个块并将一个样本发送到每个核心进行处理。我正在使用pool.apply_async。这是代码:

test_X = torch.load(test_X_filename) #torch tensor of shape 2.7M x 3
test_Y = torch.load(test_Y_filename) #torch tensor of shape 2.7M x 3
cores = mp.cpu_count()
chunk_size = int(test_X.shape[0] / cores)
start_time = time.time()
parent_list = []
for start_ind in range(0, test_X.shape[0], chunk_size):
    pool = mp.Pool(processes=cores)
    proc_data_size = int(chunk_size / cores)
    stop_ind = min(test_X.shape[0], start_ind + chunk_size)
    results = [
        pool.apply_async(
            testMTGP, (test_X[i].detach(), test_Y[i].detach(), i, model, likelihood,)
        )
        for i in range(start_ind, stop_ind)
    ]
    for res in results:
        print("Length of results list= ", len(results))
        print("Data type of res is: ", type(res))
        res_dict = res.get()
        parent_list.append(res_dict)
    pool.close()

test_X[i]test_Y[i] 都是形状为 (3,) 的张量。在执行我得到的代码时:

回溯(最近一次通话最后一次): 文件“multiproc_async.py”,第 288 行,在 res_dict = res.get() # [1] 文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py”, 第 771 行,在获取中 提高自我价值 文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/pool.py”, 第 537 行,在 _handle_tasks 放(任务) 文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/connection.py”, 第 206 行,在发送中 self._send_bytes(_ForkingPickler.dumps(obj)) 文件 “/home/aman/anaconda3/envs/thesis/lib/python3.8/multiprocessing/reduction.py”, 第 51 行,转储中 cls(buf, 协议).dump(obj) AttributeError:无法腌制本地对象 MultitaskGaussianLikelihood.__init__.<locals>.<lambda>

我是多处理新手,谷歌搜索此错误并没有真正帮助(其中一些不相关,有些超出我的理解)。有人可以帮我理解我犯了什么错误吗?

【问题讨论】:

请将堆栈跟踪作为文本发布,而不是图像 另外,建议使用上下文管理器而不是池,pool.close (with multiprocessing.Pool(processes=3) as pool:) 【参考方案1】:

让我们将您的问题简化为问题的根本原因。对于多处理部分,我们需要一个工作示例,否则我们没有可重现的示例来帮助您。然后你可以在实际训练模型中进行修补。

让我们使用这个虚拟函数:

def testMTGP(x_sample, y_sample, ind, model, likelihood):
    return (
        ind,
        
            "prediction": 1,
            "prewhiten_error": 1,
            "white_error": 1,
            "cov_matrix": 1,
            "matcheck": 1,
        ,
    )

那么一个工作和干净的例子是:

if __name__ == '__main__':
    cores = mp.cpu_count()
    args = [(None, None, i, None, None,) for i in range(0, 5)]

    start_time = time.time()
    with mp.Pool(processes=3) as pool:
        results = pool.starmap(testMTGP, args)
        
    end_time = time.time()
    
    print(results)
    print("it took %s" % (end_time-start_time))

试试这个,一点一点地引入训练模型所需的实际逻辑。我建议您从每次传递所需的实际参数开始,最后更新 testMTGP 函数(替换虚拟函数)。

当您找出导致代码崩溃的原因和/或发布堆栈跟踪时,我可以提供更多帮助。

【讨论】:

【参考方案2】:

这个问题相当复杂,我从未使用过 Torch,而且我也不是多处理方面的专家。但我确实对这里的概念有很好的掌握,所以我会尽力解释哪里出了问题,但你可能需要想出修复方法,因为它取决于你的最终目标。

注意:我注意到您只是在输入 python。看起来这是 Ubuntu 的 Windows 应用商店版本,如果是这种情况,您可能希望使用 python3 运行该程序。 (如果您重新映射了别名,请忽略。)

所以堆栈跟踪中的最终错误,Can't picke local object 'MultitaskGaussianLikelihood.__init__.<locals>.<lambda>';这是指库Pickle,它是一个序列化程序库。如果您不熟悉序列化,它基本上是跨系统重建某些东西的标准格式。例如,JSON 是一个很常见的序列化器;它允许您将多个变量作为数组跨多种编程语言传输。 Pickle 允许对象的序列化,以便它们可以转移到另一个程序。我相信res.get() 在这里序列化的原因是由于 python 中的功能有限,核心能够相互通信,这在多处理文档中很明显。

问题是 MultitaskGaussianLikelihood 类似乎使用 lambda 作为其参数之一,并且根据该 AttributeError,pickle 无法序列化 lambda。这意味着它不能序列化 MultitaskGaussianLikelihood,因为它包含一个。我这里没有所有代码,所以我看不到 MultitaskGaussianLikelihood 对象在您返回的位置,但我想说您需要从该类中提取您需要的所有信息并返回该数据而不是返回该类并在事后提取它。

希望我解释得很好!

【讨论】:

以上是关于使用 pool.apply_async 测试 ML 模型的并行处理不允许访问结果的主要内容,如果未能解决你的问题,请参考以下文章

无法使用 python 的多处理 Pool.apply_async() 腌制 <type 'instancemethod'>

如何使用multiprocessing.Pool.apply_async登录到单个文件

Pool.apply_async():嵌套函数未执行

将管道/连接作为上下文参数传递给多处理 Pool.apply_async()

如何从 pool.apply_async 调用中累积结果?

在 python2.7 中使用 pool.apply_async 不将值插入 MySQL 表