将 xcom 值传递给 Airflow 中的 JiraOperator

Posted

技术标签:

【中文标题】将 xcom 值传递给 Airflow 中的 JiraOperator【英文标题】:Passing xcom value to JiraOperator in Airflow 【发布时间】:2021-01-22 14:25:17 【问题描述】:
def f_get_value_via_xcom(**context):
    var = context['ti'].xcom_pull(dag_id='bash_execute', key='return_value')  
    
bash_execute = BashOperator(
    task_id='bash_execute',
    bash_command='some bash command'
    xcom_push=True,
    priority_weight=100,
    start_date = datetime(2021, 1, 21))
    
def setUp(self):
    configuration.load_test_config()
    args = 
         'owner': 'airflow',
         'start_date': DEFAULT_DATE
    
dag = DAG(dag_id='test_jira_operator',
          default_args=default_args,
          schedule_interval='15 9 * * *',
          dagrun_timeout=timedelta(seconds=120))
db.merge_conn(
    Connection(
        conn_id='jira username', conn_type='jira',
        host='jira server', port=443,
        extra='"verify": "False", "project": "AIRFLOW"'))          
        
        
jira_ticket_search_operator = JiraOperator(task_id='test_jira_operator',
               jira_conn_id='airflow_jira',
                                           jira_method="create_issue",
                                           python_callable=f_get_value_via_xcom,
                                           provide_context=True,
                                           jira_method_args=
                                                   'project': 'id': 10000,
                                                   'summary': 'New test issue',
                                                   'description': " ti.xcom_pull('php_thelia') ",
                                                   'issuetype': 'name': 'Bug',
                                           ,
                                           dag=dag)         
                                                  
                                                   
bash_execute >> jira_ticket_search_operator

我正在尝试创建一个 DAG,它创建一个新的 Jira 任务,其描述是 bash 命令的控制台输出,但我收到以下错误

*[2021-01-22 12:57:28,109] taskinstance.py:1152 ERROR - Object of type 'Issue' is not JSON serializable*
                 

我不明白为什么。我已经在我当前的气流脚本上方发布了。你能帮帮我吗。

【问题讨论】:

【参考方案1】:

这是因为从 Airflow 2.0 开始,出于安全原因,[core] enable_xcom_pickling 的默认值已更改为 False

XCom 消息的 pickle 类型已默认替换为 JSON,以防止 RCE 攻击。请注意,JSON 序列化比酸洗更严格,因此例如,如果您想通过 XCom 传递原始字节,则必须使用 base64 之类的编码对它们进行编码。如果您了解风险并仍想使用酸洗,请在 Airflow 配置的核心部分设置 enable_xcom_pickling = True

就像文档中提到的那样,如果您对风险感到满意并且知道没有人会渗透到您的环境中,您可以设置 enable_xcom_pickling = True 以使用 Pickling 来存储 XCom 而不是 JSON。然后你不会得到那个错误。 JSON 序列化不支持任意对象——这就是不支持Object of type Issue 的原因。

https://github.com/apache/airflow/blob/master/UPDATING.md#the-default-value-for-core-enable_xcom_pickling-has-been-changed-to-false

【讨论】:

以上是关于将 xcom 值传递给 Airflow 中的 JiraOperator的主要内容,如果未能解决你的问题,请参考以下文章

Airflow踩坑XCom大数据传递反序列化失败

Airflow踩坑XCom大数据传递反序列化失败

Airflow踩坑Xcom任务间数据传递反序列化失败

Airflow踩坑Xcom任务间数据传递反序列化失败

Apache Airflow 中的分布式日志记录

Airflow:如何从 Postgres Operator 推送 xcom 价值?