如何将不同的模式应用于单个数据集中的 csvs?
Posted
技术标签:
【中文标题】如何将不同的模式应用于单个数据集中的 csvs?【英文标题】:How to apply different schemas to csvs within a single dataset? 【发布时间】:2021-12-14 03:38:38 【问题描述】:我从一个大的 csv 压缩文件开始,我在 Palantir Foundry 中解压缩了它。
我现在有一个由多个 csv 组成的数据集(每年一个),其中 csv 几乎是相同的架构,但有一些差异。如何将架构单独应用于每个 csv 或规范化它们之间的架构?
【问题讨论】:
【参考方案1】:如果您的文件已解压缩并且只是在数据集中以.csv
s 的形式存在,您可以使用 Spark 的原生 spark_session.read.csv
方法,类似于我对 here 的回答。
这将如下所示:
from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many
def read_files(spark_session, paths):
parsed_dfs = []
for file_name in paths:
parsed_df = spark_session.read.format('csv').load(file_name)
parsed_dfs += [parsed_df]
output_df = union_many(*parsed_dfs, how="wide")
return output_df
@transform(
the_output=Output("my.awesome.output"),
the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
session = ctx.spark_session
input_filesystem = the_input.filesystem()
hadoop_path = input_filesystem.hadoop_path
files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
output_df = read_files(session, files)
the_output.write_dataframe(output_df)
请注意,union_many
动词会将您的架构堆叠在一起,因此如果您有许多具有不同架构的 many 文件,许多行将为空,因为它们只会存在于一个中文件。
如果您知道每个架构的公共字段,并且知道只有一列会更改文件之间的名称,您可以更改逻辑以重命名 parsed_df
中的列以协调架构。这取决于您希望对架构强制执行多少要求。
我还将包含一个与其他 response 相同的测试方法,以便您可以快速验证正确的解析行为。
【讨论】:
以上是关于如何将不同的模式应用于单个数据集中的 csvs?的主要内容,如果未能解决你的问题,请参考以下文章