如何使用云功能触发数据流? (Python SDK)

Posted

技术标签:

【中文标题】如何使用云功能触发数据流? (Python SDK)【英文标题】:How to trigger a dataflow with a cloud function? (Python SDK) 【发布时间】:2020-02-23 13:38:05 【问题描述】:

我有一个由云 Pub/Sub 触发的云功能。我想要使​​用 Python SDK 的相同函数触发数据流。这是我的代码:

import base64
def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'
    print('Message of pubsub : '.format(message))

我是这样部署函数的:

gcloud beta functions deploy hello_pubsub  --runtime python37 --trigger-topic topic1

【问题讨论】:

数据流管道的代码在哪里?在模板中?在外部文件中? 在 python 文件中。我在谷歌外壳上工作 【参考方案1】:

您必须将管道 python 代码嵌入到您的函数中。当你的函数被调用时,你只需调用管道 python 主函数,它会在你的文件中执行管道。

如果您在 Cloud Shell 中开发并试用了您的管道,并且您已经在 Dataflow 管道中运行了它,那么您的代码应该具有以下结构:

def run(argv=None, save_main_session=True):
  # Parse argument
  # Set options
  # Start Pipeline in p variable
  # Perform your transform in Pipeline
  # Run your Pipeline
  result = p.run()
  # Wait the end of the pipeline
  result.wait_until_finish()

因此,使用正确的参数(尤其是 runner=DataflowRunner)调用此函数,以允许 python 代码在 Dataflow 服务中加载管道。

删除最后的result.wait_until_finish(),因为您的函数不会长期存在所有数据流过程。

如果你愿意,你也可以使用模板。

【讨论】:

我在 DirectRunner 上运行它。实际上我有多个 python 文件,每个文件都包含不同的管道。我想编写一个每次流水线运行的云函数。我还尝试对以这种方式执行管道的命令行进行子处理: tmp = subprocess.run(["python", "./defaultTrigger.py --network 'test' --input_topic 'projects/... ../suscriptions/sub1' -output_topic 'projects/.../topics/topic2'"]) print(tmp) 它不起作用。我会试试你的解决方案,非常感谢 1/2 @Rim,注意不要混淆。 Dataflow 是一个平台,Beam 是一个框架。当您在 DirectRunner 中运行 Beam 管道时,您不会使用 Dataflow 平台来执行它,而是使用当前系统(在您的情况下是您的功能环境)来运行 Beam 代码。不建议这样做,除非您为 Beam 管道设置 2Gb 的内存以获得最高的 CPU 功率。 2/2 另一部分是您希望在 Function 中执行子进程并在此子进程中调用“python”。请记住,您处于无服务器架构中,您不知道底层服务器、操作系统和平台是什么。如果调用被接受,执行子进程 python 可能会导致意外的事情(python 2 或 3?哪些依赖项?...)。实际上,您需要为请求处理时间付费。如果你 fork 进程并运行后台线程,那么计费是不公平的。这就是为什么禁止进行这种操作的原因。我建议您重新设计您的应用程序。【参考方案2】:

您可以使用Cloud Dataflow templates 启动您的工作。您将需要编写以下步骤:

检索凭据 生成数据流服务实例 获取 GCP PROJECT_ID 生成模板正文 执行模板

这是一个使用您的基本代码的示例(请随意拆分为多个方法以减少 hello_pubsub 方法中的代码)。

from googleapiclient.discovery import build
import base64
import google.auth
import os

def hello_pubsub(event, context):   
    if 'data' in event:
        message = base64.b64decode(event['data']).decode('utf-8')
    else:
        message = 'hello world!'

    credentials, _ = google.auth.default()
    service = build('dataflow', 'v1b3', credentials=credentials)
    gcp_project = os.environ["GCLOUD_PROJECT"]

    template_path = gs://template_file_path_on_storage/
    template_body = 
        "parameters": 
            "keyA": "valueA",
            "keyB": "valueB",
        ,
        "environment": 
            "envVariable": "value"
        
    

    request = service.projects().templates().launch(projectId=gcp_project, gcsPath=template_path, body=template_body)
    response = request.execute()

    print(response)

在 template_body 变量中,参数值是将发送到您的管道的参数,环境值由 Dataflow 服务(服务帐户、工作人员和网络配置)使用。

LaunchTemplateParameters documentation

RuntimeEnvironment documentation

【讨论】:

模板正文中的参数将是运行数据流所需的参数。正确的?如果数据流作业将输入作为 GCS 位置,那么在参数中它应该是 "input": "GCS loacation" 。对吗? @PriyaAgarwal 是的,你是对的。通过模板发送的参数需要使用 ValueProvider 接口在作业中检索。这家伙允许管道接受运行时参数。 beam.apache.org/releases/javadoc/2.19.0/org/apache/beam/sdk/…beam.apache.org/releases/pydoc/2.19.0/… 快速提问——是否每次触发云功能时都会创建一个新的数据流作业? @MineshBarot 是的

以上是关于如何使用云功能触发数据流? (Python SDK)的主要内容,如果未能解决你的问题,请参考以下文章

使用 pubsub 推送触发器运行云功能

如何使用云功能或某些自动触发器将视图中的数据插入表中。

如何从事件触发的云功能中检索 delivery_attempt?

使用IBM云功能的BOX SDK

具有实时数据库触发器的 Firebase 云功能:如何更新源节点?

是否可以通过 Pub/Sub 主题上的消息子集触发云功能?