如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple

Posted

技术标签:

【中文标题】如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple【英文标题】:How to apply a DoFn PTransform to a PCollectionTuple in Apache Beam 【发布时间】:2020-06-14 22:10:08 【问题描述】:

我正在尝试将 PTransform 应用于 PCollectionTuple,但无法弄清楚编译器为何抱怨。

我想这样做是为了将加入一些 csv 行所需的多个步骤抽象成一个 PTransform(PCollectionTuple 中的每个 PCollection 都包含要加入的 csv 行),而我遇到的问题不在于加入本身,但如何将 PTransform 应用于 PCollectionTuple。

这是我的代码:

static class JoinCsvLines extends DoFn<PCollectionTuple, String[]> 
        @ProcessElement
        public void processElement(ProcessContext context) 
            PCollectionTuple element = context.element();
            // TODO: Implement the output
        
    

我这样称呼 PTransform:

TupleTag<String[]> tag1 = new TupleTag<>();
TupleTag<String[]> tag2 = new TupleTag<>();
PCollectionTuple toJoin = PCollectionTuple.of(tag1, csvLines1).and(tag2, csvLines2);

// Can't compile this line
PCollection<String[]> joinedLines = toJoin.apply("JoinLines", ParDo.of(new JoinCsvLines()));

当我将鼠标悬停在未编译的行上方时,IntelliJ IDEA 会输出以下内容:

Required type:
PTransform
<? super PCollectionTuple,
OutputT>
Provided:
SingleOutput
<PCollectionTuple,
String[]>
reason: no instance(s) of type variable(s) InputT exist so that PCollectionTuple conforms to PCollection<? extends InputT>

如何将 PTransform 应用于 PCollectionTuple?

【问题讨论】:

【参考方案1】:

DoFn&lt;PCollectionTuple, String[]&gt; 表示您想为每条记录应用“DoFn”,因此您不应使用 PCollectionTuple 作为输入类型。相反,您应该使用“csvLines1”和“csvLines2”的类型。

如果您的意图是合并两个 PCollection,您可以检查 Flatten transform:https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java#L41

【讨论】:

以上是关于如何将 DoFn PTransform 应用于 Apache Beam 中的 PCollectionTuple的主要内容,如果未能解决你的问题,请参考以下文章

包括自定义 PTransform 导致在 GCP 的 Dataflow 作业中找不到依赖项

将列表转换为 PCollection

Apache Beam:DoFn 和 SimpleFunction 有啥区别?

python Dataflow DoFn生命周期中的光束设置()刷新多长时间?

Apache Beam Stateful DoFn 周期性输出所有 K/V 对

如何将 CSS 规则应用于直接相邻的兄弟元素?