对RDD进行Spark重复数据删除以获得更大的RDD

Posted

技术标签:

【中文标题】对RDD进行Spark重复数据删除以获得更大的RDD【英文标题】:Spark deduplication of RDD to get bigger RDD 【发布时间】:2017-06-07 15:51:26 【问题描述】:

我有一个从磁盘加载的数据框

df_ = sqlContext.read.json("/Users/spark_stats/test.json")

它包含 500k 行。 我的脚本在这个大小上运行良好,但我想在 5M 行上测试它,有没有办法将 df 复制 9 次? (对我来说,在 df 中有重复并不重要)

我已经在使用 union 但它真的太慢了​​(因为我认为它每次都从磁盘读取)

df = df_
for i in range(9): 
    df = df.union(df_)

您是否有一个干净的方法来做到这一点?

谢谢

【问题讨论】:

从数据源读取数据后使用 .cache()。 谢谢,谢谢,效果很好 【参考方案1】:

你可以使用爆炸。它应该只从原始磁盘读取一次:

from pyspark.sql.types import *
from pyspark.sql.functions import *

schema = StructType([StructField("f1", StringType()), StructField("f2", StringType())])

data = [("a", "b"), ("c", "d")]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)

# Create an array with as many values as times you want to duplicate the rows
dups_array = [lit(i) for i in xrange(9)]
duplicated = df.withColumn("duplicate", array(*dups_array)) \
               .withColumn("duplicate", explode("duplicate")) \
               .drop("duplicate")

【讨论】:

以上是关于对RDD进行Spark重复数据删除以获得更大的RDD的主要内容,如果未能解决你的问题,请参考以下文章

Spark记录-Spark性能优化(开发资源数据shuffle)

Spark分区

Spark分区

Spark——RDD算子

从 spark RDD 中删除空字符串

教你删除Oracle数据库中重复没用的数据