PySpark,按键交叉
Posted
技术标签:
【中文标题】PySpark,按键交叉【英文标题】:PySpark, intersection by Key 【发布时间】:2016-06-08 11:52:19 【问题描述】:例如,我在 PySpark 中有两个 RDD:
((0,0), 1)
((0,1), 2)
((1,0), 3)
((1,1), 4)
第二个就是
((0,1), 3)
((1,1), 0)
我希望第一个 RDD 与第二个 RDD 有交集。实际上,第二个 RDD 必须为第一个 RDD 扮演掩码的角色。输出应该是:
((0,1), 2)
((1,1), 4)
它表示来自第一个 RDD 的值,但仅适用于来自第二个 RDD 的键。两个RDD的长度不同。
我有一些解决方案(必须证明),但是是这样的:
rdd3 = rdd1.cartesian(rdd2)
rdd4 = rdd3.filter(lambda((key1, val1), (key2, val2)): key1 == key2)
rdd5 = rdd4.map(lambda((key1, val1), (key2, val2)): (key1, val1))
我不知道,这个解决方案的效率如何。想听听经验丰富的 Spark 程序员的意见....
【问题讨论】:
【参考方案1】:也许我们不应该把这个过程看作是加入。您不是真的要加入两个数据集,而是要从另一个数据集中减去一个数据集?
我将说明我对您的问题的假设
-
您根本不关心第二个数据集中的值。
您只想保留键值对出现在第二个数据集中的第一个数据集中的值。
想法 1:Cogroup(我认为可能是最快的方式)。它基本上是计算两个数据集的交集。
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
intersection = rdd1.cogroup(rdd2).filter(lambda x: x[1][0] and x[1][1])
final_rdd = intersection.map(lambda x: (x[0], list(x[1][0]))).map(lambda (x,y): (x, y[0]))
想法2:按键减法
rdd1 = sc.parallelize([((0,0), 1), ((0,1), 2), ((1,0), 3), ((1,1), 4)])
rdd2 = sc.parallelize([((0,1), 3), ((1,1), 0)])
unwanted_rows = rdd1.subtractByKey(rdd2)
wanted_rows = rdd1.subtractByKey(unwanted_rows)
我不确定这是否比您的方法更快。它确实需要两个subtractByKey
操作,这可能很慢。此外,此方法不保留顺序(例如,((0, 1), 2)
,尽管在您的第一个数据集中排名第一,但在最终数据集中排名第二)。但我无法想象这很重要。
至于哪个更快,我认为这取决于您的cartersian join 需要多长时间。映射和过滤往往比subtractByKey
所需的shuffle 操作更快,但cartesian
当然是一个耗时的过程。
无论如何,我想你可以试试这个方法,看看它是否适合你!
性能改进的旁注,具体取决于 RDD 的大小。
如果rdd1
足够小,可以保存在主内存中,那么如果您广播它然后流rdd2
对它进行流式传输,则减法过程可以大大加快。但是,我承认这种情况很少见。
【讨论】:
非常感谢。我想,这应该是一种根据键结果对我的 RDD 进行分类的方法。再次感谢您。 我还建议您查看intersection
。现在它似乎只在整行匹配时才有效,这不适用于您的用例,但我想知道您是否可以操纵 intersection
仅在键值对的交集上工作。如果我弄清楚了,我会更新我的答案。
intersection 是另外一回事,我真的在寻找键的交集,而不是值(当然也是你告诉我的)。但我会花点时间分析你的建议。谢谢
我在cogroup
上添加了一些信息,它允许您在保持值的同时仅匹配键。以上是关于PySpark,按键交叉的主要内容,如果未能解决你的问题,请参考以下文章