在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件
Posted
技术标签:
【中文标题】在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件【英文标题】:Perform group by on RDD in Spark and write each group as individual Parquet file 【发布时间】:2016-02-16 23:21:17 【问题描述】:我在内存中有一个 RDD。我想使用一些任意函数对 RDD 进行分组,然后将每个单独的组写成单独的 Parquet 文件。
例如,如果我的 RDD 由以下形式的 JSON 字符串组成:
"type":"finish","resolution":"success","csr_id": 214
"type":"create","resolution":"failure","csr_id": 321
"type":"action","resolution":"success","csr_id": 262
我想按“类型”属性对 JSON 字符串进行分组,并将具有相同“类型”的每组字符串写入同一个 Parquet 文件。
我可以看到 DataFrame API 可以按如下方式写出 Parquet 文件(例如,如果 RDD 由 JSON 字符串组成):
final JavaRDD<String> rdd = ...
final SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
final DataFrame dataFrame = sqlContext.read().json(rdd);
dataFrame.write().parquet(location);
这意味着整个 DataFrame 被写入 Parquet 文件,因此 Parquet 文件将包含具有不同“type”属性值的记录。
Dataframe API 还提供 groupBy 函数:
final GroupedData groupedData = dataFrame.groupBy(this::myFunction);
但是 GroupedData API 似乎没有提供将每个组写入单个文件的任何功能。
有什么想法吗?
【问题讨论】:
【参考方案1】:您不能写入GroupedData
,但您可以在写入时对数据进行分区:
dataFrame.write.partitionBy("type").format("parquet").save("/tmp/foo")
每种类型都会以$column=$value
格式写入自己的目录。这些可以单独加载:
sqlContext.read.parquet("/tmp/foo/type=action").show
// +------+----------+
// |csr_id|resolution|
// +------+----------+
// | 262| success|
// +------+----------+
【讨论】:
以上是关于在 Spark 中对 RDD 执行 group by 并将每个组写入单独的 Parquet 文件的主要内容,如果未能解决你的问题,请参考以下文章
我可以将 Pyspark RDD 用作 Pandas DataFrame 吗? Pyspark/spark 在数据分析中对 Pandas 的限制?