当明确给出 s3 路径时,模式合并不起作用
Posted
技术标签:
【中文标题】当明确给出 s3 路径时,模式合并不起作用【英文标题】:Schema Merging not working when the s3 paths are given explicitly 【发布时间】:2019-10-09 17:57:02 【问题描述】:当显式指定 s3 路径以读取不在同一目录中的 parquet 文件时,不会推断分区方案,因此我在 spark 数据框中没有得到分区列。
我在读取 parquet 文件时尝试了选项 merge schema = True,但在给定根文件夹而不是显式路径时,这似乎有效。
我当前的 s3 目录结构。
bucket
folder1
.....|- col=val1/file1.parquet
.....|- col=val2/file2.parquet
.....|- col=val3/file3.parquet
.....
folder2
.....|- col=val1/file4.parquet
.....|- col=val2/file5.parquet
.....|- col=val3/file6.parquet
.....
.....
我想读取键 col 值为 val1 和 val2 的所有文件。 因此,我将这些路径的 s3 路径作为一个列表并明确地阅读它们。
s3paths = ['s3://bucket/folder1/col=val1/file1.parquet',
's3://bucket/folder1/col=val2/file2.parquet',
's3://bucket/folder1/col=val1/file4.parquet',
's3://bucket/folder2/col=val2/file5.parquet']
df = spark.read.parquet(*s3paths)
# another option
df = spark.read.options("mergeSchema", True).parquet(*s3paths)
我能够读取文件,但列 col 没有出现在 spark 数据框中。
我希望列“col”出现在我的数据框中。
【问题讨论】:
在将数据写入存储时按特定列对数据进行分区后,该列将被排除在数据之外。如果您阅读父目录,您仍然可以过滤列。除了过滤之外,您还需要该列吗? 我想对这些文件进行聚合。我必须在分区列上使用 group by,以便为每个分区进行聚合。文件数量太多,无法一次全部读取。 (230000 个文件 - 1.2Tb)。 然后您可以使用 pyspark.sql.functions.input_file_name() (spark.apache.org/docs/latest/api/python/…) 获取 parquet 文件的目录并编写正则表达式以从中提取感兴趣的列的值。 我可以使用 withColumn 将其添加为我的数据框中的列吗?我唯一的疑问是,一个单独的 spark 任务可以处理多个文件,对吗?如果是,它返回哪个文件名? 正确,可以使用withColumn。转换将返回保存每一行记录的文件的目录。 【参考方案1】:This is the folder structure of my data.
data
.....|- period_id=201909/file1.csv
.....|- period_id=201910/file2.csv
file1.csv
---------
Name,Age
Amit,28
Ashish,30
Swaraj,27
file2.csv
----------
Name,Age,Sal
Amit,28,10000
Ashish,30,20000
Swaraj,27,15000
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import input_file_name
spark = SparkSession.builder.appName("Test").getOrCreate()
current_period = "201910"
prev_period = "201909"
mergedDF = spark.read.format("csv").option("mergeSchema", "true").option("header", "true")\
.option("inferSchema", "true")\
.load(["data/period_id=current_period/*".format(current_period=current_period),
"data/period_id=prev_period/*".format(prev_period=prev_period)]) \
.withColumn("filename", input_file_name())
splitDF = mergedDF.withColumn("period_with_file_name", F.split(F.col("filename"), "=").getItem(1))\
.withColumn("period_date_yyyymm",F.split(F.col("period_with_file_name"), "/").getItem(0).astype("int"))\
.drop("filename", "period_with_file_name")
splitDF.printSchema()
splitDF.show(truncate=False)
Output
----------
root
|-- Name: string (nullable = true)
|-- Age: string (nullable = true)
|-- Sal: integer (nullable = true)
|-- period_date_yyyymm: integer (nullable = true)
+------+---+-----+------------------+
|Name |Age|Sal |period_date_yyyymm|
+------+---+-----+------------------+
|Amit |28 |10000|201910 |
|Ashish|30 |20000|201910 |
|Swaraj|27 |15000|201910 |
|Amit |28 |null |201909 |
|Ashish|30 |null |201909 |
|Swaraj|27 |null |201909 |
+------+---+-----+------------------+
The column 'col' i.e 201910, 201909 here is present in the dataframe.
This code is as per my case. Just change based on your requirement.
【讨论】:
以上是关于当明确给出 s3 路径时,模式合并不起作用的主要内容,如果未能解决你的问题,请参考以下文章
与jquery合并/压缩时jquery Confirm插件不起作用