Apache Beam - 跳过管道步骤

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Beam - 跳过管道步骤相关的知识,希望对你有一定的参考价值。

我正在使用Apache Beam来建立一个由两个主要步骤组成的管道:

  • 使用Beam Transform转换数据
  • 将转换后的数据加载到BigQuery

管道设置如下所示:

myPCollection = (org.apache.beam.sdk.values.PCollection<myCollectionObjectType>)myInputPCollection
                .apply("do a parallel transform"),
                     ParDo.of(new MyTransformClassName.MyTransformFn()));

 myPCollection
    .apply("Load BigQuery data for PCollection",
            BigQueryIO.<myCollectionObjectType>write()
            .to(new MyDataLoadClass.MyFactTableDestination(myDestination))
            .withFormatFunction(new MyDataLoadClass.MySerializationFn())

我看过这个问题:

Apache Beam: Skipping steps in an already-constructed pipeline

这表明我可以以某种方式动态地改变我可以将数据传递到哪个输出,遵循步骤1中的并行变换。

我该怎么做呢?我不知道如何选择是否将myPCollection从第1步传递到第2步。如果来自第1步的myPCollection中的对象是null,我需要跳过第2步。

答案

当你在下一步中不想要它时,你就不会从MyTransformClassName.MyTransformFn中发出元素,例如:

class MyTransformClassName.MyTransformFn extends...
  @ProcessElement
  public void processElement(ProcessContext c, ...) {
    ...
    result = ...
    if (result != null) {
       c.output(result);   //only output something that's not null
    }
  }

这样,空值无法进入下一步。

有关详细信息,请参阅指南的ParDo部分:https://beam.apache.org/documentation/programming-guide/#pardo

以上是关于Apache Beam - 跳过管道步骤的主要内容,如果未能解决你的问题,请参考以下文章

数据流管道上的 Apache Beam StatusRuntimeException

如何组合两个结果并将其传递到 apache-beam 管道中的下一步

Apache Beam TextIO.ReadAll(),处理丢失的文件名?

Apache Beam实战指南 | 大数据管道(pipeline)设计及实践

Apache Beam - 即使程序连续执行,也会捕获并抛出异常。如何停止该进程或在管道中处理

使用 Python 处理 Apache Beam 管道中的异常