对RDD进行Spark重复数据删除以获得更大的RDD
Posted
技术标签:
【中文标题】对RDD进行Spark重复数据删除以获得更大的RDD【英文标题】:Spark deduplication of RDD to get bigger RDD 【发布时间】:2017-06-07 15:51:26 【问题描述】:我有一个从磁盘加载的数据框
df_ = sqlContext.read.json("/Users/spark_stats/test.json")
它包含 500k 行。 我的脚本在这个大小上运行良好,但我想在 5M 行上测试它,有没有办法将 df 复制 9 次? (对我来说,在 df 中有重复并不重要)
我已经在使用 union 但它真的太慢了(因为我认为它每次都从磁盘读取)
df = df_
for i in range(9):
df = df.union(df_)
您是否有一个干净的方法来做到这一点?
谢谢
【问题讨论】:
从数据源读取数据后使用 .cache()。 谢谢,谢谢,效果很好 【参考方案1】:你可以使用爆炸。它应该只从原始磁盘读取一次:
from pyspark.sql.types import *
from pyspark.sql.functions import *
schema = StructType([StructField("f1", StringType()), StructField("f2", StringType())])
data = [("a", "b"), ("c", "d")]
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema)
# Create an array with as many values as times you want to duplicate the rows
dups_array = [lit(i) for i in xrange(9)]
duplicated = df.withColumn("duplicate", array(*dups_array)) \
.withColumn("duplicate", explode("duplicate")) \
.drop("duplicate")
【讨论】:
以上是关于对RDD进行Spark重复数据删除以获得更大的RDD的主要内容,如果未能解决你的问题,请参考以下文章