自定义气流运算符以在 Salesforce 表中运行查询

Posted

技术标签:

【中文标题】自定义气流运算符以在 Salesforce 表中运行查询【英文标题】:Custom airflow operator to run query in Salesforce table 【发布时间】:2021-02-12 11:16:33 【问题描述】:

我是气流新手,我想安排一个作业,其中来自不同数据库记录计数的两个表必须检查它是否匹配。一种来源是 GCP,另一种来源是 Salesforce。

所以我找到了 BigQueryOperator 在 GCP 端点击查询并返回 Count 结果,但我找不到任何看起来像 SalesforceQueryOperator 的运算符,我可以在 Airflow 任务中分配。

所以基本上,我说的是我们可以用来带来计数结果的这个:

t1 = BigQueryOperator(
        task_id='bigquery_test',
        bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
        destination_dataset_table=False,
        bigquery_conn_id='bigquery_default',             
        google_cloud_storage_conn_id='bigquery_default',
        delegate_to=False,
        udf_config=False,
        dag=dag,
 )

我知道我们可以创建一个函数、导入库、创建与 Salesforce 的连接并运行查询以获取 Count 结果,但我不想遵循下面给出的这种方法(代码的一部分),我已经试过了。

def salesforcequery_count():
from simple_salesforce import Salesforce
import requests

session = requests.Session()
# manipulate the session instance (optional)
sf = Salesforce(
   username='user@example.com', password='password', organizationId='OrgId',
   session=session)
   count_record = sf.query("SELECT count(id) FROM Contact")
//   for row in data:
//   process(row)
    return 'count_record'

我想创建一个看起来像 SalesforceQueryOperator 并且应该像 BigQueryOperator 一样工作的自定义运算符,以在 Salesforce 表中点击查询并带来结果。

这是参考:https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html

任何帮助将不胜感激。

【问题讨论】:

你有什么问题? 我的问题是如何在 Airflow 中创建一个可以连接 Salesforce 表以运行查询的自定义运算符?我想将此操作员分配给 Airflow 中的任务。 【参考方案1】:

您可以使用现有的SalesforceHook 创建自己的自定义运算符。

这是一个例子:

from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class SalesforceQueryOperator(BaseOperator):
    """
    Make a query against Salesforce
    Return result as dict.
    """
    template_fields = ("query",)

    @apply_defaults
    def __init__(self,
                 conn_id,
                 query=None,
                 *args,
                 **kwargs
                 ):
        super(SalesforceQueryOperator, self).__init__(*args, **kwargs)

        self.conn_id = conn_id
        self.query = query

    def execute(self, context):
        sf_hook = SalesforceHook(conn_id=self.conn_id)

        results = sf_hook.make_query(self.query)

        return results

然后在你的 DAG 中使用它:

t2 = SalesforceQueryOperator(
        task_id='salesforce_test',
        query='SELECT count(id) FROM Contact',
        conn_id='salesforce_default',             
        dag=dag,
 )

salesforce_default 是您在 AirFlow 中添加的连接。您可以在此处查看如何添加它:Salesforce Connection

【讨论】:

以上是关于自定义气流运算符以在 Salesforce 表中运行查询的主要内容,如果未能解决你的问题,请参考以下文章

java中运算符优先级

Cometd/bayeux 客户端 + Salesforce 流 API 问题

sales force看不到审批状态

JavaScript中运算符的优先级

无法在 C++ 中重载流提取运算符 (>>)

如何构建包含 ListView.Builder 的自定义行,以在颤动中构建 ElevatedButton?