Pyspark 在元组列表上设置
Posted
技术标签:
【中文标题】Pyspark 在元组列表上设置【英文标题】:Pyspark set on list of tuples 【发布时间】:2019-05-01 19:26:51 【问题描述】:我对 Pyspark 比较陌生。我有一个由元组列表组成的 RDD。我想在 RDD 上调用一个函数,它与 python set() 函数等效,可以消除任何重复项。
我也试图在分布式系统上从理论上理解这一点。如果 RDD 分布在多个 worker 中,set 函数如何工作?如果它使用的是简单 RDD 而不是 Pair RDD,它如何确定什么是重复的?
给定一个唯一元组的输入 RDD,其中元组中的元素必须是唯一的,即顺序无关紧要。
输入:
myTup = [('cat', 'dog'), , ('mouse', 'duck'), ('duck', 'cat'), ('cat', 'dog'), ('dog', 'cat'), ('dog', 'horse'), ('cat', 'duck'), ('dog', 'horse'), ('dog', 'horse')]
我想要类似的东西:
tuple_fix = list(set([tuple(sorted(t)) for t in my_Tup ]))
并得到输出:
[('cat', 'dog'), ('mouse', 'duck'), ('duck', 'cat'), ('dog', 'horse')]
感谢您抽出宝贵时间!
【问题讨论】:
【参考方案1】:这是一个高级解释,希望能解释它如何在分布式系统中工作1。
首先从myTup
中创建一个rdd
:
rdd = sc.parallelize(myTup)
print(rdd.collect())
#[('cat', 'dog'),
# ('mouse', 'duck'),
# ('duck', 'cat'),
# ('cat', 'dog'),
# ('dog', 'cat'),
# ('dog', 'horse'),
# ('cat', 'duck'),
# ('dog', 'horse'),
# ('dog', 'horse')]
每个元组都可以独立排序。每个工作人员都可以获取行的子集并进行排序——这非常简单。
sorted_rdd = rdd.map(lambda t: tuple(sorted(t)))
print(sorted_rdd.collect())
#[('cat', 'dog'),
# ('duck', 'mouse'),
# ('cat', 'duck'),
# ('cat', 'dog'),
# ('cat', 'dog'),
# ('dog', 'horse'),
# ('cat', 'duck'),
# ('dog', 'horse'),
# ('dog', 'horse')]
为了从sorted_rdd
中获取不同的元素,您可以使用distinct()
。这可以通过分布式方式完成的方式是通过散列。散列算法用于决定哪个工作人员(reducer)获取每一行。这将在您的执行程序之间拆分数据,同时确保所有重复项都将发送到同一台机器。
最后,每台机器只是从它已发送的数据中发出不同的元组。
print(sorted_rdd.distinct().collect())
#[('cat', 'dog'), ('duck', 'mouse'), ('dog', 'horse'), ('cat', 'duck')]
注意事项:
1:我不确定这究竟是如何实现的,但这是一种实现方式。
【讨论】:
以上是关于Pyspark 在元组列表上设置的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回