Apache-Beam 将序列号添加到 PCollection
Posted
技术标签:
【中文标题】Apache-Beam 将序列号添加到 PCollection【英文标题】:Apache-Beam add sequence number to a PCollection 【发布时间】:2018-05-16 23:05:14 【问题描述】:我正在尝试构建一个 ETL 来加载维度表。我是使用 Python 和 DataFlow 以及 BigQuery 的 Apache Bea。
我需要为 pcollection 的每个元素分配一个序列号,以便将其加载到 BigQuery 中,但我找不到任何方法。
我认为我需要 DataFlow 进行先前的聚合并加入以获取我的最终 pcollection 以添加序列号,但此时我需要停止并行处理并将我的 pcollection 转换为列表(如在 Spark 中使用.collect()) 然后做一个简单的循环来分配序列号。对吗?
这是我编写的管道:
p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy)
我读过没有办法从 pcollection 中获取列表: How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?
我怎样才能实现它?有什么帮助吗?
【问题讨论】:
你能发布你到目前为止尝试过的东西和代码吗? 这是我第一次使用 Beam。我要添加我的一段代码,但我没有找到任何方法。 您能否详细说明您认为需要添加序列号的原因?您计划在 BigQuery 中执行哪些需要此序列号的操作? 我需要它来识别维表中的一个pk 听起来你只需要一个唯一的键,而不一定是序列号?可以使用随机生成的 GUID 吗? 【参考方案1】:如果您想要获取包含PCollection
中每个元素的列表,您可以使用侧面输入。请记住,这会从您的结果中删除所有并行性,并且您的管道可能会变慢。
如果你还想这样做,那么:
side_input_coll = beam.pvalue.AsIterable(my_collection)
(p
| beam.Create([0])
| beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)],
my_seq=side_input_coll))
但不要忘记,为了保持并行性,最好简单地生成一个随机 ID。请记住,PCollections
本质上是无序的。
要了解有关侧输入的更多信息,请参阅Beam Programming Guide on Side Inputs
【讨论】:
以上是关于Apache-Beam 将序列号添加到 PCollection的主要内容,如果未能解决你的问题,请参考以下文章
如何组合两个结果并将其传递到 apache-beam 管道中的下一步
使用Apache-beam在Python中删除字典中的第一项[重复]
在 mac zsh 终端上安装 apache-beam[gcp] 时出错 - “zsh: no match found: apache-beam[gcp]”