在 ParDo 中访问 sideinput

Posted

技术标签:

【中文标题】在 ParDo 中访问 sideinput【英文标题】:Access sideinput inside ParDo 【发布时间】:2022-01-24 03:27:43 【问题描述】:

我是 apache beam 的新手,我正在对我们的一个用例使用 sideinput 进行一些调查。下面是代码。

PipelineOptions options =
            PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
    Pipeline pipeline = Pipeline.create(options);
    final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
    final List<String> input = Arrays.asList("a", "b", "c", "d");
    PCollectionView<List<String>> sideinput =
            pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());
    pipeline.apply("read", Create.of(input))
            .apply("process", ParDo.of(new DoFn<String, String>() 
                @ProcessElement public void process(ProcessContext pc) 
                    System.out.println("processing element:" + pc.element());
                    List<String> list = pc.sideInput(sideinput);
                    for (String element : list) 
                        System.out.print(element);
                    
                    System.out.println("");
                
            ).withSideInputs(sideinput));
    pipeline.run();

我希望它会在每个元素之后打印出所有 sideinput 元素,例如

processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234

但是每次结果都不一样:

processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32

或者

processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31

31

【问题讨论】:

【参考方案1】:

这是意料之中的,因为在分布式环境中,无法保证输入元素处理的顺序和聚合系统输出的顺序。您可能希望连接主要元素和侧面输入元素,并一次性将其写出来,以获得您期望的输出。

【讨论】:

这是正确的。此外,许多 Beam runner 会尝试在多个线程中处理元素,这可能会导致多个打印调用相互干扰,就像上面的示例一样。 (这是按照您的建议进行操作并一次性输出所有内容的另一个原因。)

以上是关于在 ParDo 中访问 sideinput的主要内容,如果未能解决你的问题,请参考以下文章

SideInput I/O 会影响性能

侧输入的高效 ParDo 设置或 start_bundle

输出类型中 beam.ParDo 和 beam.Map 的区别?

当 ParDo 函数出现错误时,NACK 不会从 Dataflow 发送回 Google Cloud Pub/Sub

流计算技术实战 - 超大维表问题

Cloud Dataflow 中的“辅助输入”是不是支持从 BigQuery 视图中读取?