从云函数触发数据流管道时,无法从主类执行超过 1 个函数

Posted

技术标签:

【中文标题】从云函数触发数据流管道时,无法从主类执行超过 1 个函数【英文标题】:Unable to execute more than 1 function from main class while triggering dataflow pipeline from cloud function 【发布时间】:2019-08-07 13:26:19 【问题描述】:

由于我在 WriteToBigQuery 操作之后执行了一些任务,因此我编写了两个单独的函数并在主类中执行它们以按顺序运行我的数据流管道。我已经在云存储上创建了相同的模板,并尝试从云功能触发这个自定义管道模板,但它直接执行第二个功能,而不是第一个。

这是我的主要课程:

if __name__ == '__main__':
    print "Starting Dataflow Pipeline"
    writetobq()
    writetocsv()

部分云功能代码:

BODY = 
        "jobName": "jobname".format(jobname=JOBNAME),
        "parameters": 
        ,
        "environment": 
            "tempLocation": "gs://bucket/temp".format(bucket=BUCKET),
            "zone": "europe-west1-b"
        
    
request = service.projects().templates().launch(projectId=PROJECT, gcsPath=GCSPATH, body=BODY)
response = request.execute()

触发此管道时我没有收到任何错误,但它只执行 writetocsv(),我希望它首先执行 writetobq(),然后执行 writetocsv()。如果我在本地运行这个管道代码,那么它会给我预期的输出。

【问题讨论】:

【参考方案1】:

数据流模板(我假设这就是您所说的模板)不执行您的主类。它只存储一个 Dataflow 管道并执行该管道。因此,如果您需要将第二个函数作为模板的一部分执行,它必须是管道的一部分(例如,ParDo 步骤)。

在您的情况下,您似乎需要遵循 WriteToBigQuery'. Unfortunately this is not possible today sinceWriteToBigQuery` 步骤的内容,但不会返回结果。

您能否在启动模板的代码中添加执行第二个函数(作为本地执行或第二个 Dataflow 作业)?

【讨论】:

请尝试引用此网址:***.com/questions/46992455/… 在我的数据流模板中,这就是我正在做的,就像在主类中有 2 个单独的函数一样。 def writetobq(): table_schema = 'schema' JOB_NAME = 'test' pipeline_options = 我的管道选项 options = PipelineOptions.from_dictionary(pipeline_options) interaction_query = """myquery""" 与梁。管道(选项=选项)作为 p:行 = p | '从 BQ 读取' >> beam.io.Read(beam.io.BigQuerySource(query=interaction_query,use_standard_sql=True)) final_data = (rows | "写入 BQ" >> beam.io.WriteToBigQuery(table=processedDetails, schema=table_schema,project=project, dataset=dataset_id,write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)) def writetocsv(): table_schema = 'schema' JOB_NAME = 'test' pipeline_options = options options = PipelineOptions.from_dictionary(pipeline_options) del_query = """delquery""" with beam.Pipeline( options=options) as p: delete_rows = p | '读取' >> beam.io.Read(beam.io.BigQuerySource(query=delete_query,use_standard_sql=True)) required_data = delete_rows | 'Retriving' >> beam.ParDo(RequiredData()) data_conversion = required_data | '转换' >> beam.ParDo(DictToCSV()) write_to_csv = data_conversion | “写”>> beam.io.WriteToText(文件)

以上是关于从云函数触发数据流管道时,无法从主类执行超过 1 个函数的主要内容,如果未能解决你的问题,请参考以下文章

参数 5:无法从 'System.Drawing.Image' 转换为 'string' - 从主类调用 class1

如何从云函数内部运行查询?

从主类访问数组

无法从云形成 yaml 中的条件函数返回整数

Insert 是一个变量,但用作方法 - 从主类调用 class1 [关闭]

由于无法加载主类错误而无法运行 JAR 文件