如何将 PCollection 转移到普通列表

Posted

技术标签:

【中文标题】如何将 PCollection 转移到普通列表【英文标题】:How to transfer PCollection to a normal List 【发布时间】:2015-10-13 16:21:56 【问题描述】:

在进行 Bigquery 处理后,我有一个 PCollection 作为管道的结果,现在我想使用与管道分开的部分数据。如何将 PCollection 传输到 List 以便我可以遍历它并使用其内容。

我在概念上做错了吗?

【问题讨论】:

【参考方案1】:

在 Dataflow 管道内完成数据处理后,您可能希望将数据写入持久存储,例如 Cloud Storage (GCS) 中的文件、BigQuery 中的表等。

然后,您可以在 Dataflow 之外使用数据,例如,将其读入列表。显然,它需要适合该特定操作的内存。

【讨论】:

【参考方案2】:

我要做的是创建“侧面输出”(https://cloud.google.com/dataflow/model/par-do),这是您与主流程一起创建的另一个 PCollection,因此最终您将拥有 2 个 PCollection 作为 BQ 流程的结果。

只需确保在您的流程功能上创建一个条件以将元素添加到侧输出集合。像这样的:

public final void processElement(final ProcessContext context) throws Exception 
  context.output(bqProcessResult);
  if (condition) 
    context.sideOutput(myFilterTag, bqProcessResult);
  

该过程的结果不是 PCollection,而是 PCollectionTuple,因此您只需执行以下操作:

PCollectionTuple myTuples = previous process using the function above...;
PCollection<MyType> bqCollection = myTuples.get(bqTag);
PCollection<MyType> filteredCollection = myTuples.get(myFilterTag);

【讨论】:

以上是关于如何将 PCollection 转移到普通列表的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Pcollection<String> 变量转换为字符串

如何从 pcollection 将多个值写入红移表

Apache-Beam 将序列号添加到 PCollection

如何将 Realm 数据库转移到应用程序组

使用一个 pcollection 作为另一个 pcollection 的输入

如何从 PCollection 中过滤出无值