如何将 RDD 保存到单个镶木地板文件中?

Posted

技术标签:

【中文标题】如何将 RDD 保存到单个镶木地板文件中?【英文标题】:How can i save RDD to a single parquet file? 【发布时间】:2016-10-04 17:20:43 【问题描述】:

我使用 pyspark 2.0、hadoop 2.7.2。 这是我的代码:

def func(df):
    new_df = pd.DataFrame(df['id'])
    new_df['num'] = new_df['num'] * 12
    return new_df

set = sqlContext.read.parquet("data_set.parquet")
columns = set.columns
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_), 
                                                   columns=columns)))

现在,我需要将 ma​​p_res RDD 保存为 parquet 文件 new.parquet。 有什么办法可以在保存之前不创建大型数据框的情况下做到这一点?或者有没有可能单独保存RDD的每个分区,然后合并所有保存的文件?

附:由于它的大小非常大,我想在不创建数据框的情况下进行管理。

【问题讨论】:

@santon 似乎需要将所有单个数据帧合并成一个保留模式的大数据帧。将它们保留为 RDD 的元素将不允许像使用 DataFrame 那样对结果进行操作。 @ИванСудос 正确,所以我不希望所有数据都移动到一个节点 @santon 当您将管道制作为单个镶木地板文件时,因为参数更容易处理 【参考方案1】:

只有两种方法可以做到这一点:

一个是使用"coalesce(1)" 这将确保所有数据都保存到 1 个文件而不是多个文件中(200 是 spark 默认分区数)使用dataframe.write.save("/this/is/path")

另一种选择是将输出写入配置单元表,然后使用hive -e "select * from table" > data.tsv,它将用制表符分隔。

【讨论】:

【参考方案2】:

我建议这样做:

dataframes = []
#creating index
map_res = map_res.zipWithIndex()
# setting index as key
map_res = map_res.map(lambda x: (x[1],x[0]))
# creating one spark df per element
for i in range(0, map_res.count()):
    partial_dataframe_pd  = map_res.lookup(i)
    partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd)
    dataframes.append(partial_dataframe)
# concatination
result_df = dataframes.pop()
for df in dataframes:
    result_df.union(df)   
#saving
result_df.write.parquet("...")

如果您的分区数量较少(2-100),那么它应该工作得相当快。

【讨论】:

【参考方案3】:

要以 Parquet 格式保存文件,您需要将 Rdd 转换为 DataFrame,因为 Parquet 文件总是需要一个模式来处理。

【讨论】:

【参考方案4】:

你可以使用:

set.coalesce(1).write.parquet("myFile.parquet")

【讨论】:

以上是关于如何将 RDD 保存到单个镶木地板文件中?的主要内容,如果未能解决你的问题,请参考以下文章

如何将单个镶木地板文件从 s3 读入 dask 数据帧?

如何从 hadoopish 文件夹加载镶木地板文件

如何使用镶木地板在火花中读取和写入同一个文件?

如何使用 Spark 将镶木地板文件加载到 Hive 表中?

将数据保存到HDFS的格式是什么?

如何加入两个镶木地板数据集?