自定义气流运算符以在 Salesforce 表中运行查询
Posted
技术标签:
【中文标题】自定义气流运算符以在 Salesforce 表中运行查询【英文标题】:Custom airflow operator to run query in Salesforce table 【发布时间】:2021-02-12 11:16:33 【问题描述】:我是气流新手,我想安排一个作业,其中来自不同数据库记录计数的两个表必须检查它是否匹配。一种来源是 GCP,另一种来源是 Salesforce。
所以我找到了 BigQueryOperator
在 GCP 端点击查询并返回 Count 结果,但我找不到任何看起来像 SalesforceQueryOperator
的运算符,我可以在 Airflow 任务中分配。
所以基本上,我说的是我们可以用来带来计数结果的这个:
t1 = BigQueryOperator(
task_id='bigquery_test',
bql='SELECT COUNT(userId) FROM [events:EVENTS_20160501]',
destination_dataset_table=False,
bigquery_conn_id='bigquery_default',
google_cloud_storage_conn_id='bigquery_default',
delegate_to=False,
udf_config=False,
dag=dag,
)
我知道我们可以创建一个函数、导入库、创建与 Salesforce 的连接并运行查询以获取 Count 结果,但我不想遵循下面给出的这种方法(代码的一部分),我已经试过了。
def salesforcequery_count():
from simple_salesforce import Salesforce
import requests
session = requests.Session()
# manipulate the session instance (optional)
sf = Salesforce(
username='user@example.com', password='password', organizationId='OrgId',
session=session)
count_record = sf.query("SELECT count(id) FROM Contact")
// for row in data:
// process(row)
return 'count_record'
我想创建一个看起来像 SalesforceQueryOperator
并且应该像 BigQueryOperator
一样工作的自定义运算符,以在 Salesforce 表中点击查询并带来结果。
这是参考:https://airflow.apache.org/docs/apache-airflow/stable/howto/custom-operator.html
任何帮助将不胜感激。
【问题讨论】:
你有什么问题? 我的问题是如何在 Airflow 中创建一个可以连接 Salesforce 表以运行查询的自定义运算符?我想将此操作员分配给 Airflow 中的任务。 【参考方案1】:您可以使用现有的SalesforceHook 创建自己的自定义运算符。
这是一个例子:
from airflow.contrib.hooks.salesforce_hook import SalesforceHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class SalesforceQueryOperator(BaseOperator):
"""
Make a query against Salesforce
Return result as dict.
"""
template_fields = ("query",)
@apply_defaults
def __init__(self,
conn_id,
query=None,
*args,
**kwargs
):
super(SalesforceQueryOperator, self).__init__(*args, **kwargs)
self.conn_id = conn_id
self.query = query
def execute(self, context):
sf_hook = SalesforceHook(conn_id=self.conn_id)
results = sf_hook.make_query(self.query)
return results
然后在你的 DAG 中使用它:
t2 = SalesforceQueryOperator(
task_id='salesforce_test',
query='SELECT count(id) FROM Contact',
conn_id='salesforce_default',
dag=dag,
)
salesforce_default
是您在 AirFlow 中添加的连接。您可以在此处查看如何添加它:Salesforce Connection
【讨论】:
以上是关于自定义气流运算符以在 Salesforce 表中运行查询的主要内容,如果未能解决你的问题,请参考以下文章