在 apache 梁管道 Python 中跳过步骤

Posted

技术标签:

【中文标题】在 apache 梁管道 Python 中跳过步骤【英文标题】:Skipping step in an apache beam pipeline Python 【发布时间】:2020-12-04 00:24:30 【问题描述】:

所以我正在构建一个 apache 光束管道,并且在跳过 python SDK 中的其余步骤时遇到了一些麻烦。这是一个我遇到问题的简化示例:

import apache_beam as beam
import os 

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = API_KEY
def foo(message):
    pass

options = 
    'streaming': True


runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=my_sub))
    result = (sub_message | 'foo' >> beam.Map(foo))
    result | 'print' >> beam.Map(print)

    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()

所以根据这个:Apache Beam - skip pipeline step 在 Java 中如果我的函数没有返回任何内容,那么 apache_beam 应该跳过其余的步骤。如果我错了,请纠正我,但在 python 中这与返回 None 相同,所以我的 pass 可以替换为 return None 并且完全相同。但是当我使用passreturn None 运行此代码时,结果确实会进入下一步。也就是说,当它不应该打印任何东西时,它会继续打印None,因为它应该跳过所有后续步骤。任何帮助表示赞赏。

【问题讨论】:

【参考方案1】:

有趣的是,我一发布此内容,就在文档中找到了答案。所以看起来在我提供的链接中,我提供的等价物是像我一样使用 ParDo 而不是地图。所以真的应该是这样的:

import apache_beam as beam
import os 

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = credentials
class TestFn(beam.DoFn):
    def process(self, element):
        print('hi')
        pass

options = 
    'streaming': True


runner = 'DirectRunner'
opts = beam.pipeline.PipelineOptions(flags=[], **options)
with beam.Pipeline(runner, options=opts) as p:
    sub_message = (p | 'sub' >> beam.io.ReadFromPubSub(subscription=mysub))
    result = (sub_message | 'foo' >> beam.ParDo(TestFn()))
    result | 'print' >> beam.Map(print)

    job = p.run()
    if runner == 'DirectRunner':
        job.wait_until_finish()

【讨论】:

请注意,在上下文中使用 p 会自动调用 p.run().wait_until_finish(),因此这可能最终会提交您的管道两次。

以上是关于在 apache 梁管道 Python 中跳过步骤的主要内容,如果未能解决你的问题,请参考以下文章

在 Vim 中跳过撤消步骤

Maven 发布插件 - 在发布中跳过快照版本更新:准备步骤

Apache梁管道Java:记录未按顺序写入目标文件

Python 3.2 在 csv.DictReader 中跳过一行

在循环中跳过一组值(在数组中) - Python3

如何使用 Selenium 和 Python 在控制台中跳过调试日志