Pyspark 在元组列表上设置

Posted

技术标签:

【中文标题】Pyspark 在元组列表上设置【英文标题】:Pyspark set on list of tuples 【发布时间】:2019-05-01 19:26:51 【问题描述】:

我对 Pyspark 比较陌生。我有一个由元组列表组成的 RDD。我想在 RDD 上调用一个函数,它与 python set() 函数等效,可以消除任何重复项。

我也试图在分布式系统上从理论上理解这一点。如果 RDD 分布在多个 worker 中,set 函数如何工作?如果它使用的是简单 RDD 而不是 Pair RDD,它如何确定什么是重复的?

给定一个唯一元组的输入 RDD,其中元组中的元素必须是唯一的,即顺序无关紧要。

输入:

myTup = [('cat', 'dog'), , ('mouse', 'duck'), ('duck', 'cat'), ('cat', 'dog'), ('dog', 'cat'), ('dog', 'horse'), ('cat', 'duck'), ('dog', 'horse'), ('dog', 'horse')]

我想要类似的东西:

tuple_fix = list(set([tuple(sorted(t)) for t in my_Tup ]))

并得到输出:

[('cat', 'dog'), ('mouse', 'duck'), ('duck', 'cat'), ('dog', 'horse')]

感谢您抽出宝贵时间!

【问题讨论】:

【参考方案1】:

这是一个高级解释,希望能解释它如何在分布式系统中工作1

首先从myTup 中创建一个rdd

rdd = sc.parallelize(myTup)
print(rdd.collect())
#[('cat', 'dog'),
# ('mouse', 'duck'),
# ('duck', 'cat'),
# ('cat', 'dog'),
# ('dog', 'cat'),
# ('dog', 'horse'),
# ('cat', 'duck'),
# ('dog', 'horse'),
# ('dog', 'horse')]

每个元组都可以独立排序。每个工作人员都可以获取行的子集并进行排序——这非常简单。

sorted_rdd = rdd.map(lambda t: tuple(sorted(t)))
print(sorted_rdd.collect())
#[('cat', 'dog'),
# ('duck', 'mouse'),
# ('cat', 'duck'),
# ('cat', 'dog'),
# ('cat', 'dog'),
# ('dog', 'horse'),
# ('cat', 'duck'),
# ('dog', 'horse'),
# ('dog', 'horse')]

为了从sorted_rdd 中获取不同的元素,您可以使用distinct()。这可以通过分布式方式完成的方式是通过散列。散列算法用于决定哪个工作人员(reducer)获取每一行。这将在您的执行程序之间拆分数据,同时确保所有重复项都将发送到同一台机器。

最后,每台机器只是从它已发送的数据中发出不同的元组。

print(sorted_rdd.distinct().collect())
#[('cat', 'dog'), ('duck', 'mouse'), ('dog', 'horse'), ('cat', 'duck')]

注意事项

1:我不确定这究竟是如何实现的,但这是一种实现方式。

【讨论】:

以上是关于Pyspark 在元组列表上设置的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

使用 Pyspark 从单词列表的行条目创建元组并使用 RDD 计数

pyspark reduce键是一个元组值嵌套列表

Pyspark 使用 ArrayWritable

将列表转换为 pyspark 数据框

PySpark Dataframe 将两列转换为基于第三列值的元组新列