Apache-Beam 将序列号添加到 PCollection

Posted

技术标签:

【中文标题】Apache-Beam 将序列号添加到 PCollection【英文标题】:Apache-Beam add sequence number to a PCollection 【发布时间】:2018-05-16 23:05:14 【问题描述】:

我正在尝试构建一个 ETL 来加载维度表。我是使用 Python 和 DataFlow 以及 BigQuery 的 Apache Bea。

我需要为 pcollection 的每个元素分配一个序列号,以便将其加载到 BigQuery 中,但我找不到任何方法。

我认为我需要 DataFlow 进行先前的聚合并加入以获取我的最终 pcollection 以添加序列号,但此时我需要停止并行处理并将我的 pcollection 转换为列表(如在 Spark 中使用.collect()) 然后做一个简单的循环来分配序列号。对吗?

这是我编写的管道:

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

我读过没有办法从 pcollection 中获取列表: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?

我怎样才能实现它?有什么帮助吗?

【问题讨论】:

你能发布你到目前为止尝试过的东西和代码吗? 这是我第一次使用 Beam。我要添加我的一段代码,但我没有找到任何方法。 您能否详细说明您认为需要添加序列号的原因?您计划在 BigQuery 中执行哪些需要此序列号的操作? 我需要它来识别维表中的一个pk 听起来你只需要一个唯一的键,而不一定是序列号?可以使用随机生成的 GUID 吗? 【参考方案1】:

如果您想要获取包含PCollection 中每个元素的列表,您可以使用侧面输入。请记住,这会从您的结果中删除所有并行性,并且您的管道可能会变慢。

如果你还想这样做,那么:

side_input_coll = beam.pvalue.AsIterable(my_collection)

(p 
 | beam.Create([0]) 
 | beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
               my_seq=side_input_coll))

但不要忘记,为了保持并行性,最好简单地生成一个随机 ID。请记住,PCollections 本质上是无序的。

要了解有关侧输入的更多信息,请参阅Beam Programming Guide on Side Inputs

【讨论】:

以上是关于Apache-Beam 将序列号添加到 PCollection的主要内容,如果未能解决你的问题,请参考以下文章

如何组合两个结果并将其传递到 apache-beam 管道中的下一步

使用Apache-beam在Python中删除字典中的第一项[重复]

在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”

apache-beam 从 GCS 存储桶的多个文件夹中读取多个文件并加载它 bigquery python

如何将类型添加到 GWT 的序列化策略白名单?

将变量添加到序列化缓冲区而不使用 protobuf.net 对其进行序列化