Spark:保存按“虚拟”列分区的 DataFrame

Posted

技术标签:

【中文标题】Spark:保存按“虚拟”列分区的 DataFrame【英文标题】:Spark: save DataFrame partitioned by "virtual" column 【发布时间】:2016-02-16 16:07:50 【问题描述】:

我正在使用 PySpark 执行经典的 ETL 作业(加载数据集、处理它、保存它),并希望将我的 Dataframe 保存为由“虚拟”列分区的文件/目录;我所说的“虚拟”是指我有一个 Timestamp 列,它是一个包含 ISO 8601 编码日期的字符串,我想按年/月/日进行分区;但我实际上在 DataFrame 中没有年、月或日列;我有这个时间戳,我可以从中派生这些列,但我不希望我的结果项序列化这些列之一。

将 DataFrame 保存到磁盘产生的文件结构应如下所示:

/ 
    year=2016/
        month=01/
            day=01/
                part-****.gz

有没有办法用 Spark / Pyspark 做我想做的事?

【问题讨论】:

【参考方案1】:

用于分区的列不包含在序列化数据本身中。例如,如果您像这样创建DataFrame

df = sc.parallelize([
    (1, "foo", 2.0, "2016-02-16"),
    (2, "bar", 3.0, "2016-02-16")
]).toDF(["id", "x", "y", "date"])

并写成如下:

import tempfile
from pyspark.sql.functions import col, dayofmonth, month, year
outdir = tempfile.mktemp()

dt = col("date").cast("date")
fname = [(year, "year"), (month, "month"), (dayofmonth, "day")]
exprs = [col("*")] + [f(dt).alias(name) for f, name in fname]

(df
    .select(*exprs)
    .write
    .partitionBy(*(name for _, name in fname))
    .format("json")
    .save(outdir))

单个文件不包含分区列:

import os

(sqlContext.read
    .json(os.path.join(outdir, "year=2016/month=2/day=16/"))
    .printSchema())

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)

分区数据仅存储在目录结构中,不会在序列化文件中重复。只有当您读取完整或部分目录树时才会附加它:

sqlContext.read.json(outdir).printSchema()

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)
##  |-- year: integer (nullable = true)
##  |-- month: integer (nullable = true)
##  |-- day: integer (nullable = true)

sqlContext.read.json(os.path.join(outdir, "year=2016/month=2/")).printSchema()

## root
##  |-- date: string (nullable = true)
##  |-- id: long (nullable = true)
##  |-- x: string (nullable = true)
##  |-- y: double (nullable = true)
##  |-- day: integer (nullable = true)

【讨论】:

我是 python 新手。有没有办法在路径中没有 year=、month= 和 day= 的情况下做到这一点?我了解大部分内容 嗨@deanw,您找到解决“年=”“月=”等问题的方法了吗? @Pablo A 不幸的是没有。 我需要进一步分区,所以按:年-月-日-上午/下午。知道怎么做吗?

以上是关于Spark:保存按“虚拟”列分区的 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

按重叠列分区时的高效 Spark 数据集操作

无分区列性能的 Spark 下推过滤器

“Exchange hashpartitioning”如何在 spark 中工作

pyspark:如何以“yyyy-MM-dd HH”格式按日期列分区

Spark重新分区不均匀分布记录

将 PubSub 流保存到 GCS 中的分区拼花文件