有没有办法在创建后编辑气流操作符?

Posted

技术标签:

【中文标题】有没有办法在创建后编辑气流操作符?【英文标题】:Is there any way to edit an airflow operator after creation? 【发布时间】:2019-04-04 11:54:49 【问题描述】:

我有一个 python 脚本,它基于映射每个所需选项的 JSON 文件动态创建任务(气流操作员)和 DAG。 该脚本还专门用于创建所需的任何运算符。 有时我想根据映射激活一些条件选项...例如在 bigqueryOperator 中有时我需要 time_partitioning 和 destination_table,但我不想在每个映射任务上都设置。

我尝试阅读有关 BaseOperator 的文档,但看不到任何类似 java 的 set 方法。

返回运算符的函数,例如 bigQuery 一个

def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
##destination_dataset_table=project+'.'+dataset+'.'+mappedTask.get('target'),
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        ##time_partitioning=mappedTask.get('time_partitioning'),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 

没有分区的json文件中的mappedTask

        
            "task_id": "TEST_TASK_ID",
            "sql": "some fancy query",
            "type": "bqOperator",
            "dependencies": [],
            "write_disposition": "WRITE_APPEND",
            "allow_large_results": true,
            "createDisposition": "CREATE_IF_NEEDED",
            "use_legacy_sql": false
        ,

带有分区的json文件中的mappedTask

        
            "task_id": "TEST_TASK_ID_PARTITION",
            "sql": "some fancy query",
            "type": "bqOperator",
            "dependencies": [],
            "write_disposition": "WRITE_APPEND",
            "allow_large_results": true,
            "createDisposition": "CREATE_IF_NEEDED",
            "use_legacy_sql": false,
                        "targetTable": "TARGET_TABLE",
            "time_partitioning": 
                "field": "DATE_TO_PART",
                "type": "DAY"
            
        ,

【问题讨论】:

【参考方案1】:

如下更改bqOperator 来处理这种情况,基本上当它在您的json 中找不到该字段时它会传递None:

def bqOperator(mappedTask):
    try:
        return BigQueryOperator(
        task_id=mappedTask.get('task_id'),
        sql=mappedTask.get('sql'),  
        destination_dataset_table="..".format(project, dataset, mappedTask.get('target')) if mappedTask.get('target', None)  else None,
        write_disposition=mappedTask.get('write_disposition'),
        allow_large_results=mappedTask.get('allow_large_results'),
        time_partitioning=mappedTask.get('time_partitioning', None),
        use_legacy_sql=mappedTask.get('use_legacy_sql'),
        dag=dag,
        )
    except Exception as e:
        error = 'Error creating BigQueryOperator for task : ' + mappedTask.get('task_id')
        logger.error(error)
        raise Exception(error) 

【讨论】:

感谢您的回答,它给了我一些我直到现在才得到的pythonic语法,并且可能解决了这个问题,我会试试这个!【参考方案2】:

python中没有私有方法或字段,所以可以直接设置和获取字段,如

op.use_legacy_sql = True

鉴于我强烈反对这样做,因为这是一种真正的代码味道。相反,您可以修改您的工厂类以将一些默认值应用于您的 json 数据。 或者更好的是,在 json 本身上应用默认值。比保存并使用更新的 json。这将使事情更加可预测。

【讨论】:

感谢您的回答,它给了我要考虑的语法,但我想我会遵循 kaxil 的回答,因为我不想让映射 json 太难编辑。最终目标是为非开发人员提供一个简单的工具,以便在此作曲家流程中创建新任务。谢谢。

以上是关于有没有办法在创建后编辑气流操作符?的主要内容,如果未能解决你的问题,请参考以下文章

以编程方式在 dockerized apache 气流 python 操作符内创建 SSH 隧道

气流 DAG EMR EmrCreateJobFlowOperator 不执行任何操作

有没有办法确定 Sonata\AdminBundle\Admin\Admin::configureFormFields() 中的当前操作(创建或编辑)?

气流 SSH 操作错误:[Errno 2] 没有这样的文件或目录:

Bash 操作员错误:气流中没有这样的文件或目录

气流:如何删除 DAG?