如何从 PCollection Apache Beam Python 创建 N 个元素组
Posted
技术标签:
【中文标题】如何从 PCollection Apache Beam Python 创建 N 个元素组【英文标题】:How to create groups of N elements from a PCollection Apache Beam Python 【发布时间】:2018-09-04 19:50:21 【问题描述】:我正在尝试完成这样的事情: Batch PCollection in Beam/Dataflow
以上链接中的答案是 Java,而我正在使用的语言是 Python。因此,我需要一些帮助来获得类似的构造。
特别是我有这个:
p = beam.Pipeline (options = pipeline_options)
lines = p | 'File reading' >> ReadFromText (known_args.input)
在此之后,我需要创建另一个 PCollection
,但使用 N 行“行”的 List
,因为我的用例需要一组行。我不能逐行操作。
我尝试了一个ParDo
函数,该函数使用与计数器 N 行关联的计数变量,并在 groupBy
之后使用 Map
。但是这些会每 1000 条记录重置一次,所以这不是我正在寻找的解决方案。我阅读了链接中的示例,但我不知道如何在 Python 中执行类似的操作。
我尝试将计数器保存在 Datastore 中,但是 Dataflow 与 Datastore 读取和写入的速度差异非常显着。
这样做的正确方法是什么?我不知道如何处理它。 问候。
【问题讨论】:
您能改述一下您想要实现的目标吗?您希望能够将整个输入文件(所有行)作为单个列表吗? 嗨@MarcinZablocki,不,我想要输入文件中包含N行的列表的PCollection,例如:如果N是2并且输入是“1,2,3,4,5, 6,7,8",其中逗号是换行符,我想要一个类似这样的 PCollection:PCollection[List(1,2), List(3,4), List(5,6), List( 7,8)] 那么如果输入是“1,2,3,4,5,6,7”并且N=2呢?输出 PCollection 应该是什么样子? PCollection 是无序的。除非您的输入包含订单信息(例如ReadFromText
返回 (sequence number, element)
的元组),否则这种确定性分组对于使用梁很棘手(需要 State
或数据驱动的触发器)。如果您的管道不需要确定性分组,您可以在 DoFn 中维护一个大小为 N 的缓冲区,并在每次缓冲区满时(或在 finish_bundle
中)刷新缓冲区。
This 的问题似乎与您的类似 - 答案是使用 Top transform。
【参考方案1】:
假设分组顺序不重要,您可以在DoFn
内分组。
class Group(beam.DoFn):
def __init__(self, n):
self._n = n
self._buffer = []
def process(self, element):
self._buffer.append(element)
if len(self._buffer) == self._n:
yield list(self._buffer)
self._buffer = []
def finish_bundle(self):
if len(self._buffer) != 0:
yield list(self._buffer)
self._buffer = []
lines = p | 'File reading' >> ReadFromText(known_args.input)
| 'Group' >> beam.ParDo(Group(known_args.N)
...
【讨论】:
以上是关于如何从 PCollection Apache Beam Python 创建 N 个元素组的主要内容,如果未能解决你的问题,请参考以下文章
Apache-Beam 将序列号添加到 PCollection
如何从 Dataflow 中的 PCollection 读取 bigQuery