如何使用云功能触发数据流? (Python SDK)
Posted
技术标签:
【中文标题】如何使用云功能触发数据流? (Python SDK)【英文标题】:How to trigger a dataflow with a cloud function? (Python SDK) 【发布时间】:2020-02-23 13:38:05 【问题描述】:我有一个由云 Pub/Sub 触发的云功能。我想要使用 Python SDK 的相同函数触发数据流。这是我的代码:
import base64
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
print('Message of pubsub : '.format(message))
我是这样部署函数的:
gcloud beta functions deploy hello_pubsub --runtime python37 --trigger-topic topic1
【问题讨论】:
数据流管道的代码在哪里?在模板中?在外部文件中? 在 python 文件中。我在谷歌外壳上工作 【参考方案1】:您必须将管道 python 代码嵌入到您的函数中。当你的函数被调用时,你只需调用管道 python 主函数,它会在你的文件中执行管道。
如果您在 Cloud Shell 中开发并试用了您的管道,并且您已经在 Dataflow 管道中运行了它,那么您的代码应该具有以下结构:
def run(argv=None, save_main_session=True):
# Parse argument
# Set options
# Start Pipeline in p variable
# Perform your transform in Pipeline
# Run your Pipeline
result = p.run()
# Wait the end of the pipeline
result.wait_until_finish()
因此,使用正确的参数(尤其是 runner=DataflowRunner
)调用此函数,以允许 python 代码在 Dataflow 服务中加载管道。
删除最后的result.wait_until_finish()
,因为您的函数不会长期存在所有数据流过程。
如果你愿意,你也可以使用模板。
【讨论】:
我在 DirectRunner 上运行它。实际上我有多个 python 文件,每个文件都包含不同的管道。我想编写一个每次流水线运行的云函数。我还尝试对以这种方式执行管道的命令行进行子处理: tmp = subprocess.run(["python", "./defaultTrigger.py --network 'test' --input_topic 'projects/... ../suscriptions/sub1' -output_topic 'projects/.../topics/topic2'"]) print(tmp) 它不起作用。我会试试你的解决方案,非常感谢 1/2 @Rim,注意不要混淆。 Dataflow 是一个平台,Beam 是一个框架。当您在 DirectRunner 中运行 Beam 管道时,您不会使用 Dataflow 平台来执行它,而是使用当前系统(在您的情况下是您的功能环境)来运行 Beam 代码。不建议这样做,除非您为 Beam 管道设置 2Gb 的内存以获得最高的 CPU 功率。 2/2 另一部分是您希望在 Function 中执行子进程并在此子进程中调用“python”。请记住,您处于无服务器架构中,您不知道底层服务器、操作系统和平台是什么。如果调用被接受,执行子进程 python 可能会导致意外的事情(python 2 或 3?哪些依赖项?...)。实际上,您需要为请求处理时间付费。如果你 fork 进程并运行后台线程,那么计费是不公平的。这就是为什么禁止进行这种操作的原因。我建议您重新设计您的应用程序。【参考方案2】:您可以使用Cloud Dataflow templates 启动您的工作。您将需要编写以下步骤:
检索凭据 生成数据流服务实例 获取 GCP PROJECT_ID 生成模板正文 执行模板这是一个使用您的基本代码的示例(请随意拆分为多个方法以减少 hello_pubsub 方法中的代码)。
from googleapiclient.discovery import build
import base64
import google.auth
import os
def hello_pubsub(event, context):
if 'data' in event:
message = base64.b64decode(event['data']).decode('utf-8')
else:
message = 'hello world!'
credentials, _ = google.auth.default()
service = build('dataflow', 'v1b3', credentials=credentials)
gcp_project = os.environ["GCLOUD_PROJECT"]
template_path = gs://template_file_path_on_storage/
template_body =
"parameters":
"keyA": "valueA",
"keyB": "valueB",
,
"environment":
"envVariable": "value"
request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
response = request.execute()
print(response)
在 template_body 变量中,参数值是将发送到您的管道的参数,环境值由 Dataflow 服务(服务帐户、工作人员和网络配置)使用。
LaunchTemplateParameters documentation
RuntimeEnvironment documentation
【讨论】:
模板正文中的参数将是运行数据流所需的参数。正确的?如果数据流作业将输入作为 GCS 位置,那么在参数中它应该是 "input": "GCS loacation" 。对吗? @PriyaAgarwal 是的,你是对的。通过模板发送的参数需要使用 ValueProvider 接口在作业中检索。这家伙允许管道接受运行时参数。 beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/…beam.apache.org/releases/pydoc/2.19.0/… 快速提问——是否每次触发云功能时都会创建一个新的数据流作业? @MineshBarot 是的以上是关于如何使用云功能触发数据流? (Python SDK)的主要内容,如果未能解决你的问题,请参考以下文章
如何从事件触发的云功能中检索 delivery_attempt?