在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件

Posted

技术标签:

【中文标题】在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件【英文标题】:Perform group by on RDD in Spark and write each group as individual Parquet file 【发布时间】:2016-02-16 23:21:17 【问题描述】:

我在内存中有一个 RDD。我想使用一些任意函数对 RDD 进行分组,然后将每个单独的组写成单独的 Parquet 文件。

例如,如果我的 RDD 由以下形式的 JSON 字符串组成:

"type":"finish","resolution":"success","csr_id": 214
"type":"create","resolution":"failure","csr_id": 321
"type":"action","resolution":"success","csr_id": 262

我想按“类型”属性对 JSON 字符串进行分组,并将具有相同“类型”的每组字符串写入同一个 Parquet 文件。

我可以看到 DataFrame API 可以按如下方式写出 Parquet 文件(例如,如果 RDD 由 JSON 字符串组成):

final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);

这意味着整个 DataFrame 被写入 Parquet 文件,因此 Parquet 文件将包含具有不同“type”属性值的记录。

Dataframe API 还提供 groupBy 函数:

final GroupedData groupedData = dataFrame.groupBy(this::myFunction);

但是 GroupedData API 似乎没有提供将每个组写入单个文件的任何功能。

有什么想法吗?

【问题讨论】:

【参考方案1】:

您不能写入GroupedData,但您可以在写入时对数据进行分区:

dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")

每种类型都会以$column=$value 格式写入自己的目录。这些可以单独加载:

sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// |   262|   success|
// +------+----------+

【讨论】:

以上是关于在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章

我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?

在spark sql中对窗口函数使用having子句的语义是什么?

Apache Spark:地图与地图分区?

深入理解spark streaming

spark进行groupby之后值转成list

Spark-RDD 转换算子(Value 类型)