PySpark,按键交叉

Posted

技术标签:

【中文标题】PySpark,按键交叉【英文标题】:PySpark, intersection by Key 【发布时间】:2016-06-08 11:52:19 【问题描述】:

例如,我在 PySpark 中有两个 RDD:

((0,0), 1)
((0,1), 2)
((1,0), 3)
((1,1), 4)

第二个就是

((0,1), 3)
((1,1), 0)

我希望第一个 RDD 与第二个 RDD 有交集。实际上,第二个 RDD 必须为第一个 RDD 扮演掩码的角色。输出应该是:

((0,1), 2)
((1,1), 4)

它表示来自第一个 RDD 的值,但仅适用于来自第二个 RDD 的键。两个RDD的长度不同。

我有一些解决方案(必须证明),但是是这样的:

rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda((key1, val1), (key2, val2)): key1 == key2)
rdd5 = rdd4.map(lambda((key1, val1), (key2, val2)): (key1, val1))

我不知道,这个解决方案的效率如何。想听听经验丰富的 Spark 程序员的意见....

【问题讨论】:

【参考方案1】:

也许我们不应该把这个过程看作是加入。您不是真的要加入两个数据集,而是要从另一个数据集中减去一个数据集?

我将说明我对您的问题的假设

    您根本不关心第二个数据集中的值。 您只想保留键值对出现在第二个数据集中的第一个数据集中的值。

想法 1:Cogroup(我认为可能是最快的方式)。它基本上是计算两个数据集的交集。

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda (x,y): (x, y[0]))

想法2:按键减法

rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])

unwanted_rows = rdd1.subtractByKey(rdd2)
wanted_rows = rdd1.subtractByKey(unwanted_rows)

我不确定这是否比您的方法更快。它确实需要两个subtractByKey 操作,这可能很慢。此外,此方法不保留顺序(例如,((0, 1), 2),尽管在您的第一个数据集中排名第一,但在最终数据集中排名第二)。但我无法想象这很重要。

至于哪个更快,我认为这取决于您的cartersian join 需要多长时间。映射和过滤往往比subtractByKey 所需的shuffle 操作更快,但cartesian 当然是一个耗时的过程。

无论如何,我想你可以试试这个方法,看看它是否适合你!


性能改进的旁注,具体取决于 RDD 的大小。

如果rdd1 足够小,可以保存在主内存中,那么如果您广播它然后流rdd2 对它进行流式传输,则减法过程可以大大加快。但是,我承认这种情况很少见。

【讨论】:

非常感谢。我想,这应该是一种根据键结果对我的 RDD 进行分类的方法。再次感谢您。 我还建议您查看intersection。现在它似乎只在整行匹配时才有效,这不适用于您的用例,但我想知道您是否可以操纵 intersection 仅在键值对的交集上工作。如果我弄清楚了,我会更新我的答案。 intersection 是另外一回事,我真的在寻找键的交集,而不是值(当然也是你告诉我的)。但我会花点时间分析你的建议。谢谢 我在cogroup 上添加了一些信息,它允许您在保持值的同时仅匹配键。

以上是关于PySpark,按键交叉的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何改善空间交叉点?

pyspark中的交叉验证

PySpark 中的分层交叉验证

pyspark - 如何交叉验证几种 ML 算法

Pyspark交叉验证后如何获得最佳超参数值?

使用 pyspark 交叉组合两个 RDD