在 ParDo 中访问 sideinput
Posted
技术标签:
【中文标题】在 ParDo 中访问 sideinput【英文标题】:Access sideinput inside ParDo 【发布时间】:2022-01-24 03:27:43 【问题描述】:我是 apache beam 的新手,我正在对我们的一个用例使用 sideinput 进行一些调查。下面是代码。
PipelineOptions options =
PipelineOptionsFactory.fromArgs().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(options);
final List<String> sideInput = Arrays.asList("1", "2", "3", "4");
final List<String> input = Arrays.asList("a", "b", "c", "d");
PCollectionView<List<String>> sideinput =
pipeline.apply("readInput", Create.of(sideInput)).apply(View.asList());
pipeline.apply("read", Create.of(input))
.apply("process", ParDo.of(new DoFn<String, String>()
@ProcessElement public void process(ProcessContext pc)
System.out.println("processing element:" + pc.element());
List<String> list = pc.sideInput(sideinput);
for (String element : list)
System.out.print(element);
System.out.println("");
).withSideInputs(sideinput));
pipeline.run();
我希望它会在每个元素之后打印出所有 sideinput 元素,例如
processing element:d
1234
processing element:c
1234
processing element:a
1234
processing element:b
1234
但是每次结果都不一样:
processing element:d
processing element:a
processing element:c
processing element:b
44441113312
2
32
32
或者
processing element:c
processing element:d
processing element:b
processing element:a
444422233211
31
31
【问题讨论】:
【参考方案1】:这是意料之中的,因为在分布式环境中,无法保证输入元素处理的顺序和聚合系统输出的顺序。您可能希望连接主要元素和侧面输入元素,并一次性将其写出来,以获得您期望的输出。
【讨论】:
这是正确的。此外,许多 Beam runner 会尝试在多个线程中处理元素,这可能会导致多个打印调用相互干扰,就像上面的示例一样。 (这是按照您的建议进行操作并一次性输出所有内容的另一个原因。)以上是关于在 ParDo 中访问 sideinput的主要内容,如果未能解决你的问题,请参考以下文章
输出类型中 beam.ParDo 和 beam.Map 的区别?