当明确给出 s3 路径时,模式合并不起作用

Posted

技术标签:

【中文标题】当明确给出 s3 路径时,模式合并不起作用【英文标题】:Schema Merging not working when the s3 paths are given explicitly 【发布时间】:2019-10-09 17:57:02 【问题描述】:

当显式指定 s3 路径以读取不在同一目录中的 parquet 文件时,不会推断分区方案,因此我在 spark 数据框中没有得到分区列。

我在读取 parquet 文件时尝试了选项 merge schema = True,但在给定根文件夹而不是显式路径时,这似乎有效。

我当前的 s3 目录结构。

bucket
    folder1
        .....|- col=val1/file1.parquet
        .....|- col=val2/file2.parquet
        .....|- col=val3/file3.parquet
        .....
    folder2
        .....|- col=val1/file4.parquet
        .....|- col=val2/file5.parquet
        .....|- col=val3/file6.parquet
        .....
    .....

我想读取键 col 值为 val1 和 val2 的所有文件。 因此,我将这些路径的 s3 路径作为一个列表并明确地阅读它们。

s3paths = ['s3://bucket/folder1/col=val1/file1.parquet',
's3://bucket/folder1/col=val2/file2.parquet',
's3://bucket/folder1/col=val1/file4.parquet',
's3://bucket/folder2/col=val2/file5.parquet']

df = spark.read.parquet(*s3paths) 

# another option
df = spark.read.options("mergeSchema", True).parquet(*s3paths) 

我能够读取文件,但列 col 没有出现在 spark 数据框中。

我希望列“col”出现在我的数据框中。

【问题讨论】:

在将数据写入存储时按特定列对数据进行分区后,该列将被排除在数据之外。如果您阅读父目录,您仍然可以过滤列。除了过滤之外,您还需要该列吗? 我想对这些文件进行聚合。我必须在分区列上使用 group by,以便为每个分区进行聚合。文件数量太多,无法一次全部读取。 (230000 个文件 - 1.2Tb)。 然后您可以使用 pyspark.sql.functions.input_file_name() (spark.apache.org/docs/latest/api/python/…) 获取 parquet 文件的目录并编写正则表达式以从中提取感兴趣的列的值。 我可以使用 withColumn 将其添加为我的数据框中的列吗?我唯一的疑问是,一个单独的 spark 任务可以处理多个文件,对吗?如果是,它返回哪个文件名? 正确,可以使用withColumn。转换将返回保存每一行记录的文件的目录。 【参考方案1】:
This is the folder structure of my data.
    data
        .....|- period_id=201909/file1.csv
        .....|- period_id=201910/file2.csv
file1.csv
---------
Name,Age
Amit,28
Ashish,30
Swaraj,27

file2.csv
----------
Name,Age,Sal
Amit,28,10000
Ashish,30,20000
Swaraj,27,15000

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import input_file_name
spark = SparkSession.builder.appName("Test").getOrCreate()


current_period = "201910"
prev_period = "201909"
mergedDF = spark.read.format("csv").option("mergeSchema", "true").option("header", "true")\
    .option("inferSchema", "true")\
    .load(["data/period_id=current_period/*".format(current_period=current_period),
           "data/period_id=prev_period/*".format(prev_period=prev_period)]) \
    .withColumn("filename", input_file_name())

splitDF = mergedDF.withColumn("period_with_file_name", F.split(F.col("filename"), "=").getItem(1))\
    .withColumn("period_date_yyyymm",F.split(F.col("period_with_file_name"), "/").getItem(0).astype("int"))\
    .drop("filename", "period_with_file_name")

splitDF.printSchema()

splitDF.show(truncate=False)

Output
----------
root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Sal: integer (nullable = true)
 |-- period_date_yyyymm: integer (nullable = true)

+------+---+-----+------------------+
|Name  |Age|Sal  |period_date_yyyymm|
+------+---+-----+------------------+
|Amit  |28 |10000|201910            |
|Ashish|30 |20000|201910            |
|Swaraj|27 |15000|201910            |
|Amit  |28 |null |201909            |
|Ashish|30 |null |201909            |
|Swaraj|27 |null |201909            |
+------+---+-----+------------------+

The column 'col' i.e 201910, 201909 here is present in the dataframe.

This code is as per my case. Just change based on your requirement.

【讨论】:

以上是关于当明确给出 s3 路径时,模式合并不起作用的主要内容,如果未能解决你的问题,请参考以下文章

c#:为啥当我尝试在模拟中使用合并运算符时它不起作用

CSS 类中给出的图像 url 不起作用

AWS S3+Cloudfront 静态网站子目录不起作用

与jquery合并/压缩时jquery Confirm插件不起作用

当 Windows 进入睡眠模式然后唤醒时 setTimeout 不起作用

当 Windows 进入睡眠模式然后唤醒时 setTimeout 不起作用