如何将 RDD 保存到单个镶木地板文件中?
Posted
技术标签:
【中文标题】如何将 RDD 保存到单个镶木地板文件中?【英文标题】:How can i save RDD to a single parquet file? 【发布时间】:2016-10-04 17:20:43 【问题描述】:我使用 pyspark 2.0、hadoop 2.7.2。 这是我的代码:
def func(df):
new_df = pd.DataFrame(df['id'])
new_df['num'] = new_df['num'] * 12
return new_df
set = sqlContext.read.parquet("data_set.parquet")
columns = set.columns
map_res = set.rdd.mapPartitions(lambda iter_: func(pd.DataFrame(list(iter_),
columns=columns)))
现在,我需要将 map_res RDD 保存为 parquet 文件 new.parquet。 有什么办法可以在保存之前不创建大型数据框的情况下做到这一点?或者有没有可能单独保存RDD的每个分区,然后合并所有保存的文件?
附:由于它的大小非常大,我想在不创建数据框的情况下进行管理。
【问题讨论】:
@santon 似乎需要将所有单个数据帧合并成一个保留模式的大数据帧。将它们保留为 RDD 的元素将不允许像使用 DataFrame 那样对结果进行操作。 @ИванСудос 正确,所以我不希望所有数据都移动到一个节点 @santon 当您将管道制作为单个镶木地板文件时,因为参数更容易处理 【参考方案1】:只有两种方法可以做到这一点:
一个是使用"coalesce(1)"
这将确保所有数据都保存到 1 个文件而不是多个文件中(200 是 spark 默认分区数)使用dataframe.write.save("/this/is/path")
。
另一种选择是将输出写入配置单元表,然后使用hive -e "select * from table" > data.tsv
,它将用制表符分隔。
【讨论】:
【参考方案2】:我建议这样做:
dataframes = []
#creating index
map_res = map_res.zipWithIndex()
# setting index as key
map_res = map_res.map(lambda x: (x[1],x[0]))
# creating one spark df per element
for i in range(0, map_res.count()):
partial_dataframe_pd = map_res.lookup(i)
partial_dataframe = sqlContext.createDataFrame(partial_dataframe_pd)
dataframes.append(partial_dataframe)
# concatination
result_df = dataframes.pop()
for df in dataframes:
result_df.union(df)
#saving
result_df.write.parquet("...")
如果您的分区数量较少(2-100),那么它应该工作得相当快。
【讨论】:
【参考方案3】:要以 Parquet 格式保存文件,您需要将 Rdd 转换为 DataFrame,因为 Parquet 文件总是需要一个模式来处理。
【讨论】:
【参考方案4】:你可以使用:
set.coalesce(1).write.parquet("myFile.parquet")
【讨论】:
以上是关于如何将 RDD 保存到单个镶木地板文件中?的主要内容,如果未能解决你的问题,请参考以下文章