气流 - 脚本更改文件名变量

Posted

技术标签:

【中文标题】气流 - 脚本更改文件名变量【英文标题】:Airflow - Script changes the filename variable 【发布时间】:2019-06-24 14:51:42 【问题描述】:

我在气流中创建了一个进程,我需要每 10 分钟从 SQL Server 数据库导出一个新文件并播放到 BigQuery!生成的文件是一个 csv,它自动包含带有 YYYYMMDDHHMMSS 格式的处理日期的文件名。

当我从第 1 步(导出)到第 2 步(在 BigQuery 中插入)时,气流中继再次每个脚本都会更改文件名变量名,并且处理日期与第 1 步不同!

示例: 第一步:test_20190624113656.csv 第二步:test_20190624113705.csv

在这种情况下,我想在第一步中保留文件名。

nm_arquivo = 'test_' + datetime.today().strftime('%Y%m%d%H%M%S') + '.csv'

def insert_bigquery(ds, **kwargs):
    bigquery_client = bigquery.Client(project="project_name")
    dataset_ref = bigquery_client.dataset('test_dataset')
    job_config = bigquery.LoadJobConfig()
    job_config.schema = [
        bigquery.SchemaField('id','INTEGER',mode='REQUIRED'),
        bigquery.SchemaField('sigla','STRING',mode='REQUIRED'),
        bigquery.SchemaField('nome_en','STRING',mode='REQUIRED'),
        bigquery.SchemaField('nome_pt','STRING',mode='REQUIRED'),
    ]   
    job_config.source_format = bigquery.SourceFormat.CSV
    time_partitioning = bigquery.table.TimePartitioning()
    job_config.time_partitioning = time_partitioning
    job_config.clustering_fields = ["id", "sigla"]
    uri = "gs://bucket_name/"+nm_arquivo
    load_job = bigquery_client.load_table_from_uri(
        uri,
        dataset_ref.table('bdb'),
        job_config=job_config
        )
    print('Starting job '.format(load_job.job_id))
    load_job.result()
    print('Job finished.')

#step1      
import_orders_op = MsSqlToGoogleCloudStorageOperator(
    task_id='import_orders',
    mssql_conn_id='mssql_conn',
    google_cloud_storage_conn_id='gcp_conn',
    sql="""select * from bdb""",
    bucket='bucket_name',
    filename=nm_arquivo,
    dag=dag) 

#step2
run_this = PythonOperator(
    task_id='insert_bigquery',
    provide_context=True,
    python_callable=insert_bigquery,
    dag=dag,
)

run_this.set_upstream(import_orders_op)

【问题讨论】:

你能分享你的 insert_bigquery 函数吗?我认为您不应该使用 datetime.today().strftime('%Y%m%d%H%M%S') 而是使用 ds_nodash 变量,因为它将是常量 @GeorgesLorré,我编辑了代码... 您可以在两个步骤中尝试使用:filename='test_ ts_nodash .csv' 吗?有关宏的更多信息here 【参考方案1】:

您应该使用 DAG 的执行时间。

您可以使用 ts_nodash Airflow 宏。它格式化execution_date.isoformat()(例如:2018-01-01T00:00:00+00:00)以删除-:,例如:20180101T000000。此宏可用于任何模板化参数。

有关更多信息和所有其他可用变量的列表:

https://airflow.apache.org/1.10.3/macros.html#default-variables 气流宏 - https://airflow.apache.org/1.10.3/macros.html

【讨论】:

函数第一步被识别了,但是在PythonOperator的第二步却没有识别到​​这个宏,又失败了! 我只改变了这个:nm_arquivo = 'test_ ts_nodash .csv' 但在第二步中,函数试图找到 test_ ts_nodash .csv 而不是文件 test_20180101T000000.csv 我相信只有在 PythonOperator 中不起作用,但在其他功能中气流工作没有问题! @FelipeFB 你可以在 PythonOperator 中使用 kwargs["ts_nodash"]【参考方案2】:

您可以使用文件来存储文件名:

import pickle

nm_arquivo = 'test_' + datetime.today().strftime('%Y%m%d%H%M%S') + '.csv'

#step 1
with open('filename.pickle', 'wb') as handle:
    pickle.dump(nm_arquivo, handle)

#step 2
with open('filename.pickle', 'rb') as handle:
    nm_arquivo = pickle.load(handle)

【讨论】:

我想我不能将它插入到 Airflow 函数中。

以上是关于气流 - 脚本更改文件名变量的主要内容,如果未能解决你的问题,请参考以下文章

Bash 操作员错误:气流中没有这样的文件或目录

有没有办法在创建后编辑气流操作符?

使用 bash 脚本编辑 .tf 变量文件

使用 Apache 气流存储和访问密码

bigqueryoperator 气流上的 Bigquery 脚本

将 python 脚本转换为气流 dag