如何将数据帧传递到气流任务的临时表中

Posted

技术标签:

【中文标题】如何将数据帧传递到气流任务的临时表中【英文标题】:How to pass dataframes into temp tables for airflow tasks 【发布时间】:2021-11-21 14:39:06 【问题描述】:

所以我有一段代码在 python 中具有多个函数,用于我们开发的工作流。此工作流采用 CSV 并将其通过数据框。然后,此数据帧通过多个函数传递,这些函数对数据帧应用各种转换。

但是,当将此代码写入气流环境时,由于数据帧在虚拟环境中的工作方式以及数据跨多台机器运行,我将无法通过每个函数传递我的数据帧,并且必须存储它们什么地方?

有谁知道如何在 bigquery 中设置一个临时表,以便为我的每个函数传递一个数据框,以便我可以使用气流任务为它们运行我的 ETL?

【问题讨论】:

【参考方案1】:

如果您正在寻找从数据帧输入开始的 Airflow 任务,那么您使用它是错误的。如果您想将脚本作为一个单元执行,您可以使用PythonOperatorBashOperator,但是如果您想将代码分解为多个任务,您可能需要进行一些重构。

要从GCS 上的csv 创建BigQuery 外部表,您可以将external_table 中的external_table 设置为:

from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
load_csv = GCSToBigQueryOperator(
        task_id='gcs_to_bigquery_example',
        bucket='cloud-samples-data',
        source_objects=['bigquery/us-states/us-states.csv'],
        destination_project_dataset_table=f"DATASET_NAME.TABLE_NAME",
        schema_fields=[
            'name': 'name', 'type': 'STRING', 'mode': 'NULLABLE',
            'name': 'post_abbr', 'type': 'STRING', 'mode': 'NULLABLE',
        ],
        write_disposition='WRITE_TRUNCATE',
        external_table=True,
    )

我不知道您的工作流程中数据框的功能是什么(我假设它是对csv 进行某种转换),因为您可以使用GCSFileTransformOperator(请参阅source code)。此操作符将数据从源 GCS 位置复制到本地文件系统上的临时位置。按照指定的对此文件运行转换 转换脚本并将输出上传到目标存储桶。如果未指定输出存储桶,则会覆盖原始文件。

所以您的工作流程可能是:

    GCS 中的文件登陆 运行GCSFileTransformOperator 处理和清理记录。 在 BigQuery 中使用GCSToBigQueryOperator 创建一个表

【讨论】:

所以这是可行的,但它是否是与 csv 转换一样的最佳解决方案,每个函数,列被删除,列名被更改,列被添加和删除等. 并且附加到具有列参数的数据框表的更改可能会导致在上传到大查询时出现问题,因为架构总是在不断变化。 @MizanurChoudhury 这取决于您的特定 ETL。在将原始 csv 转储到 GCS 之前,您还可以清理所有内容。这更多的是您可以控制哪个组件以及可以在哪里进行更改的问题。不过,这些都是大问题 - 这是整个流程的设计,超出了 *** 问题的范围。

以上是关于如何将数据帧传递到气流任务的临时表中的主要内容,如果未能解决你的问题,请参考以下文章

具有动态结果的存储过程到临时表中

如何将数据设置为临时表中的变量?

如何将临时表作为参数传递到单独的存储过程中

TSQL 将数据库 BLOB 提取到临时表中

需要将数据全部插入到临时表中

Azure 数据工厂 - 查找值以在 ForEach 复制任务中选择语句