Groupby、Split 和 Count 组合(删除循环以提高 pytorch 性能)

Posted

技术标签:

【中文标题】Groupby、Split 和 Count 组合(删除循环以提高 pytorch 性能)【英文标题】:Groupby, Split, and Count Combinations (removing loop for pytorch performance) 【发布时间】:2021-10-31 09:09:56 【问题描述】:

我正在尝试提高以下 sn-p 代码在 GPU 上执行的性能:

[torch.combinations(doc_ids, r=2) for doc_ids in torch.split(X, C)]

如果为了更适合 GPU 使用而移除此处的循环,那就太好了。我想看看是否有人对此有任何建议。

我知道这里出了点问题,因为 GPU 基准测试比 CPU 上的相同代码慢得多。

最终,我正在处理的问题陈述类似于“给定一个形状为 n_docs x m_features 的矩阵 X,计算文档之间共享特征的数量”。

我为此拥有的实际代码稍微复杂一些(在 sn-p 中进行一些连接和排序 - 见下文),但似乎包含 fortorch.combinations 确实降低了性能。

这里有一小段代码可以重现我正在做的一个方面,以及一些基准测试:

import torch
import multiprocessing as mp
from tqdm import tqdm
import numpy as np
import pandas as pd
import itertools
import perfplot

# Dummy Data
############


def load_data(n_docs=100000,
              doc_id_start=13340,
              max_feats_per_doc=20):
    # Document IDs each have an undetermined number of features
    doc_range = np.arange(doc_id_start, n_docs+doc_id_start)
    doc_ids = np.concatenate([[docid]*np.random.randint(max_feats_per_doc)
                              for docid in doc_range]).ravel()

    feature_ids = np.random.randint(100000, 900000, size=len(doc_ids))

    # DataFrame of starting data (Doc_id, feature_id)
    df = pd.DataFrame(np.stack([doc_ids, feature_ids]).T,
                      columns=["doc_id", "feature_id"])

    # Every `int_rate` number of documents, look to pair some features with
    # another document
    int_rate = 27
    # Approximate percentage of features that may be shared
    prob_share = 0.2
    for doc_ix, (doc_id, gdf) in enumerate(df.groupby("doc_id")):
        if doc_ix % int_rate == 0:
            this_ix = gdf.index #df[df.doc_id == doc_id].index
            # Random other document
            other_doc = np.random.choice(doc_ids)
            other_features = df[df.doc_id == other_doc].feature_id
            # Insert a few items from the other document
            new_features = [f if prob_share < np.random.rand()
                              else np.random.choice(other_features)
                            for f in gdf.feature_id]
            df.loc[this_ix, "feature_id"] = new_features

    print("Initial Dataframe of (Doc_id, feature_id) data:")
    print(df)
    print(df.feature_id.value_counts())
    return df


# Groupby feature and count shared docs
#######################################

def gpu_groupby_features(frame,
                         min_collisions=2,
                         max_collisions=40,
                         force_cpu=False,
                         do_combs=True):
    torch.autograd.set_grad_enabled(False)
    device = torch.device("cuda:0" if torch.cuda.is_available()
                          and not force_cpu else "cpu")

    # Detach from computational graph (this isn't a NN)
    X = torch.from_numpy(frame.values).to(device).detach()

    # Removing the rows which do not have a duplicated feature_id
    #   *really* improves the speed (~20x) (less iterations in the for loop)
    U, I, C = torch.unique(X[:, 1], return_counts=True, return_inverse=True)
    # Number of docs sharing each feature
    D = C.gather(index=I, dim=0)
    # Good row (doc_id-feature_id pair) boolean index
    B = (min_collisions <= D)# & (D <= max_collisions)

    # Filter to only the relevant (duplicated feature_id) data
    X = X[B]

    # Sort the (document_id, feature_id) pairs by the feature ids
    srt_ix = X[:, 1].argsort()
    X = X[srt_ix]

    ###############################################
    # Apparent Bottleneck
    ###############################################
    # Now that we have sorted X by feature_id, if we get the counts for each,
    #   this will give us a list of sizes to split the array
    U, C = torch.unique(X[:, 1], return_counts=True)
    C = list(C.cpu().numpy())
    if len(C)>0 and do_combs:
        doc_pairs = torch.cat(
                   [torch.combinations(doc_ids.unique(), r=2).sort()[0]
                    for doc_ids in torch.split(X[:, 0], split_size_or_sections=C)],
                   dim=0)

        # now count the document pairings
        U, C = torch.unique(doc_pairs, dim=0, return_counts=True)
        out = torch.cat([U, torch.unsqueeze(C, 1)], dim=1)
        srt_ix = out[:, 2].argsort()
        out = out[srt_ix]
        out = pd.DataFrame(out.cpu().numpy(),
                            columns=["docid_0", "docid_1", "count"])
        return out.sort_values(["count", "docid_0", "docid_1"])
    elif len(C)==0:
        print("No duplicates present")
        return
    else:
        return


def get_combs(x):
    min_collisions=2
    feat, doc_id_df = x
    if min_collisions <= len(doc_id_df):#& len(doc_id_df) <= max_collisions
        return [sorted(pair) for pair in
                itertools.combinations(set(doc_id_df.doc_id.tolist()), r=2)]
    else:
        return


def pandas_groupby_features(df,
                            min_collisions=2,
                            max_collisions=40):

    results = []
    with mp.Pool(processes=mp.cpu_count()) as p:
        for docpair_list in p.imap(get_combs, df.groupby("feature_id")):
            if docpair_list:
                results += docpair_list

    results = pd.DataFrame(results, columns=["docid_0", "docid_1"])
    results = results.value_counts(["docid_0", "docid_1"])
    results.name = "count"
    results = results.reset_index()
    return results.sort_values(["count", "docid_0", "docid_1"])


n_range = [10000, 100000, 200000, 300000, 400000, 500000, 1000000]
df = load_data(n_range[-1])
test1 = gpu_groupby_features(df[:n_range[-1]])
print(test1)
test2 = pandas_groupby_features(df[:n_range[-1]])
print(test2)
print(np.all(test1.values==test2.values))

perfplot.show(
    setup=lambda n: df[:n],
    kernels=[
        lambda a: gpu_groupby_features(a),
        lambda a: gpu_groupby_features(a, do_combs=False),
        lambda a: gpu_groupby_features(a, force_cpu=True),
        lambda a: gpu_groupby_features(a, force_cpu=True, do_combs=False),
        lambda a: pandas_groupby_features(a)
    ],
    equality_check=None,
    labels=["GPU_torch", "GPU_torch_without_combos",
            "CPU_torch", "CPU_torch_without_combos",
            "pandas_multiprocessing"],
    n_range=n_range,
    max_time=120,
    xlabel="n_docs"
)

此基准测试代码的输出显示如下:

您可能会注意到,GPU 实现比 CPU (pytorch) 实现慢得多。我已经包含了without_combos 以进行比较,以表明大部分处理时间都进入了代码的torch.combinationstorch.split 部分。 pandas 的实现也被包括在内,作为解决问题的简单方法的参考点。

我注意到,首先删除不重复的 feature_id 对性能有很大帮助。因此,将问题分成每个 feature_id 具有相同数量的 doc_id 的数据组可能会大大提高速度,但我不确定最简单的方法是什么。

【问题讨论】:

【参考方案1】:

似乎我发现通过迭代过滤到仅阵列的视图,其中某个特征由 n 个文档共享,可以显着提高性能。在这样做时(以​​及按特征排序),您可以使用跳过每 n 行的切片语法。 然后可以通过对组合索引进行切片来实现组合 - 但现在它一次收集整个数组,而不是遍历每个组。

上面的文字可能没有任何意义,所以这是我最终使用的函数:

def doc_pairs_with_n_docs_per_shingle(X, nps=2):
    U, I, C = torch.unique(X[:, 1],
                           return_counts = True,
                           return_inverse = True)
    # Filter array by number of documents shared by each feature
    #   nps is the *desired* number of documents shared by each feature
    D = C.gather(index=I, dim=0)
    B = D == nps

    X = X[B]
    X = X[X[:, 1].argsort()]
    ixs = list(itertools.combinations(range(nps), r=2))
    doc_pairs = torch.vstack([torch.column_stack(
                                (X[ix[0]::nps, 0],
                                 X[ix[1]::nps, 0])).sort(axis=1)[0]
                              for ix in ixs])
    return doc_pairs

def gpu_doc_pairs(frame,
                  min_collisions=2,
                  max_collisions=40,
                  force_cpu=False,
                  do_combs=True):
    torch.autograd.set_grad_enabled(False)
    device = torch.device("cuda:0" if torch.cuda.is_available()
                          and not force_cpu else "cpu")

    # Detach from computational graph (this isn't a NN)
    X = torch.from_numpy(frame.values).to(device).detach()

    # Removing the rows which do not have a duplicated feature_id
    #   *really* improves the speed (~20x) (less iterations in the for loop)
    U, I, C = torch.unique(X[:, 1], return_counts=True, return_inverse=True)
    # Number of docs sharing each feature
    D = C.gather(index=I, dim=0)
    # Good row (doc_id-feature_id pair) boolean index
    B = (min_collisions <= D)# & (D <= max_collisions)

    # Filter to only the relevant (duplicated feature_id) data
    X = X[B]

    # Sort the (document_id, feature_id) pairs by the feature ids
    srt_ix = X[:, 1].argsort()
    X = X[srt_ix]

    results = []
    for n_docs_per_feature in range(min_collisions):#, max_collisions):
        doc_pairs = doc_pairs_with_n_docs_per_shingle(X,
                                                      nps=n_docs_per_feature)
        if len(doc_pairs)>0:
            # Now count the document pairings
            U, C = torch.unique(doc_pairs, dim=0, return_counts=True)
            C = torch.unsqueeze(C, 1)
            arr = torch.cat([U, C], dim=1)
            srt_ix = arr[:, 2].argsort()
            arr = arr[srt_ix]
            results += [arr.cpu().numpy()]

    results = pd.DataFrame(np.vstack(results),
                        columns=["docid_0", "docid_1", "count"])
    return results.sort_values(["count", "docid_0", "docid_1"])

为了达到这个性能(GPU_group_approach):

并放大:

我知道目前这很丑。希望我可以清理它并使其更通用(例如,torch.groupby 类型的函数会很好),但现在如果有人遇到类似问题,他们可能会对此有所了解。

【讨论】:

请注意,与在 CPU 上相比,查找唯一值是一种在 GPU 上难以高效完成的计算(尤其是在使用多线程完成时)。即使是最先进的算法也不是那么好。不过,如果您的 GPU 是高端的,而您的 CPU 也不是很强大,那么 GPU 实现可以更快(尽管它应该消耗更多的能量)。 @JérômeRichard 感谢您的评论,我会记住这一点。我真的很希望能在 GPU 上运行它以加快速度,但你可能是对的。也许我会对相同算法的 pytorch CPU 版本进行基准测试并进行比较。我想如果 CPU 版本在这里始终优于 GPU 版本,我可以简单地完全删除 pytorch 依赖项以进行多处理,因为 nvidia 不断地破坏一切(tf、pytorch,基本上所有 python 开发),每次更新都会发生重大变化。跨度>

以上是关于Groupby、Split 和 Count 组合(删除循环以提高 pytorch 性能)的主要内容,如果未能解决你的问题,请参考以下文章

具有两个分类变量的数据帧上的 Groupby 和 count() [重复]

如何在熊猫中执行选择性 groupby().count()?

itertools.groupby 的反面?

Linq:GroupBy、Sum 和 Count

hive grouping sets 等聚合函数

groupby 的妙用(注意size和count)