如何将不同的模式应用于单个数据集中的 csvs?

Posted

技术标签:

【中文标题】如何将不同的模式应用于单个数据集中的 csvs?【英文标题】:How to apply different schemas to csvs within a single dataset? 【发布时间】:2021-12-14 03:38:38 【问题描述】:

我从一个大的 csv 压缩文件开始,我在 Palantir Foundry 中解压缩了它。

我现在有一个由多个 csv 组成的数据集(每年一个),其中 csv 几乎是相同的架构,但有一些差异。如何将架构单独应用于每个 csv 或规范化它们之间的架构?

【问题讨论】:

【参考方案1】:

如果您的文件已解压缩并且只是在数据集中以.csvs 的形式存在,您可以使用 Spark 的原生 spark_session.read.csv 方法,类似于我对 here 的回答。

这将如下所示:

from transforms.api import transform, Output, Input
from transforms.verbs.dataframes import union_many


def read_files(spark_session, paths):
    parsed_dfs = []
    for file_name in paths:
        parsed_df = spark_session.read.format('csv').load(file_name)
        parsed_dfs += [parsed_df]
    output_df = union_many(*parsed_dfs, how="wide")
    return output_df


@transform(
    the_output=Output("my.awesome.output"),
    the_input=Input("my.awesome.input"),
)
def my_compute_function(the_input, the_output, ctx):
    session = ctx.spark_session
    input_filesystem = the_input.filesystem()
    hadoop_path = input_filesystem.hadoop_path
    files = [hadoop_path + "/" + file_name.path for file_name in input_filesystem.ls()]
    output_df = read_files(session, files)
    the_output.write_dataframe(output_df)

请注意,union_many 动词会将您的架构堆叠在一起,因此如果您有许多具有不同架构的 many 文件,许多行将为空,因为它们只会存在于一个中文件。

如果您知道每个架构的公共字段,并且知道只有一列会更改文件之间的名称,您可以更改逻辑以重命名 parsed_df 中的列以协调架构。这取决于您希望对架构强制执行多少要求。

我还将包含一个与其他 response 相同的测试方法,以便您可以快速验证正确的解析行为。

【讨论】:

以上是关于如何将不同的模式应用于单个数据集中的 csvs?的主要内容,如果未能解决你的问题,请参考以下文章

如何在单个 iOS 项目/应用程序中为发布和调试模式(测试和生产环境)制作不同的 Firebase 数据库?

如何将字典键应用于值项目到火花数据集中的列?

Bigquery - 为每个 id 添加完整的日期范围

如何识别字符串数据集中的文本模板模式?

SQL:利用标准表,将不同类型的标准应用于不同的字段

如何制作将相同命令应用于子目录的单个makefile?