如何从气流传感器中提取 xcom 值?

Posted

技术标签:

【中文标题】如何从气流传感器中提取 xcom 值?【英文标题】:How can i pull xcom value from Airflow sensor? 【发布时间】:2021-04-20 22:13:14 【问题描述】:

主要问题:我正在尝试创建 BigQuery 表(如果不存在)。

方法:使用BigQueryTableSensor检查表是否存在,根据返回值,使用BigQueryCreateEmptyTableOperator创建新表。

问题:我无法使用 xcom 获取 BigQueryTableSensor 传感器的返回值。我们知道,poke 方法需要返回一个布尔值。

这就是我创建任务的方式:

check_if_table_exists = BigQueryTableSensor(
        task_id='check_if_table_exists',
        project_id='my_project',
        dataset_id='my_dataset',
        table_id='my_table',
        bigquery_conn_id='bigquery_default',
        timeout=120,
        do_xcom_push=True,
    )

# Output: INFO - Success criteria met. Exiting.

get_results = BashOperator(
        task_id='get_results',
        bash_command="echo  ti.xcom_pull(task_ids='check_if_table_exists') "
    )

# Output: INFO - Running command: echo None

查看 Airflow 界面,我检查了 BigQueryTableSensor 没有推送任何内容:(

问题:

有没有办法让我获得传感器的返回值?

有没有更好的方法来解决我的主要问题?也许使用 BigQueryOperator 和类似“CREATE TABLE IF NOT EXISTS”这样的 sql 查询。

【问题讨论】:

可能您可以将答案“是的,可能”更改为已接受的答案,我认为这将有助于更好的人找到问题标题的答案。 (当前接受的答案特定于 BigQueryTableSensor) 【参考方案1】:

是的,这是可能的,我让它像这样工作:

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True

这是一个完整的 DAG 示例:

import os
import sys
from datetime import datetime
from airflow import DAG, settings
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


dag = DAG('my_dag_name',
          description='DAG ',
          schedule_interval=None,
          start_date=datetime(2021, 1, 7),
          tags=["samples"],
          catchup=False)

class MyCustomSensor(BaseSensorOperator):

    @apply_defaults
    def __init__(self,
                 *args,
                 **kwargs):
        super(MyCustomSensor, self).__init__(*args, **kwargs)

    def poke(self, context):
        application_id = context['ti'].xcom_pull(key='application_id')
        print("We found " + application_id)
        return True


def launch_spark_job(**kwargs):
    application_id = "application_1613995447156_11473"
    kwargs['ti'].xcom_push(key='application_id', value=application_id)


launch_spark_job_op = PythonOperator(task_id='test_python',
                                     python_callable=launch_spark_job,
                                     provide_context=True,
                                     dag=dag)

wait_spark_job_sens = MyCustomSensor(task_id='wait_spark_job',
                                     dag=dag,
                                     mode="reschedule")

launch_spark_job_op >> wait_spark_job_sens

【讨论】:

什么冠军!!【参考方案2】:

这不是传感器的用例。 传感器使工作流程等待某些事情发生。在您的情况下,BigQueryTableSensor 将等到由其他进程创建的表,然后才会继续执行下游任务。

您正在寻找的是:

    使用BigQueryCheckOperator 运行返回布尔值的查询(如果表存在则为True,否则为False)然后您将能够从BashOperator 中的XCOM 中提取布尔值。 工作流所在的分支运算符(如:BranchSQLOperator) 基于检查表的 SQL 查询结果的分支 存在。在该选项中,无需使用 XCOM。

【讨论】:

【参考方案3】:

我知道其他答案很好,但我只想分享另一个解决方案,以防它可能对其他人有所帮助。我知道这个解决方案适用于 Apache Airflow 2.x.x,但不确定 1.x.x。

您可以在 PythonSensor 中使用 jinja 模板和“op_kwargs”参数(查看 this),因此 Python 脚本至少可以读取这些值。

例子:

waiting_sjs_job = PythonSensor(
    task_id="waiting_job",
    python_callable=waiting_job,
    mode="reschedule",
    poke_interval=60,
    timeout=60*60*2,
    op_kwargs='sjs_url': ' ti.xcom_pull(key="sjs_url") ', 'job_id': ' ti.xcom_pull(key="job_id") ',
)

def waiting_job(sjs_url, job_id):
   # those lines commented below are equivalent to the parameters passed to this function
   # ti = kwargs['ti']
   # sjs_url = ti.xcom_pull(key="sjs_url")
   # job_id = ti.xcom_pull(key="job_id")
   r = requests.get( sjs_url + '/jobs/' + job_id)
   json_data = r.json()
   if json_data['status'] != 'RUNNING':
      return True
   r.raise_for_status()
   return False

【讨论】:

以上是关于如何从气流传感器中提取 xcom 值?的主要内容,如果未能解决你的问题,请参考以下文章

气流测试模式 xcom 拉/推不工作

如何从 kinect 深度图像中获取深度强度,因为它表示像素到传感器的距离

从多条曲线中提取特征

无需配对即可通过蓝牙提取移动(android 和 IOS)传感器数据

如何从目标 C 更改 CBAdvertisementDataLocalNameKey 值?

如何提取学习的 ML 模型以实现不同的实现?