如何从 PCollection Apache Beam Python 创建 N 个元素组

Posted

技术标签:

【中文标题】如何从 PCollection Apache Beam Python 创建 N 个元素组【英文标题】:How to create groups of N elements from a PCollection Apache Beam Python 【发布时间】:2018-09-04 19:50:21 【问题描述】:

我正在尝试完成这样的事情: Batch PCollection in Beam/Dataflow

以上链接中的答案是 Java,而我正在使用的语言是 Python。因此,我需要一些帮助来获得类似的构造。

特别是我有这个:

 p = beam.Pipeline (options = pipeline_options)
 lines = p | 'File reading' >> ReadFromText (known_args.input)

在此之后,我需要创建另一个 PCollection,但使用 N 行“行”的 List,因为我的用例需要一组行。我不能逐行操作。

我尝试了一个ParDo 函数,该函数使用与计数器 N 行关联的计数变量,并在 groupBy 之后使用 Map。但是这些会每 1000 条记录重置一次,所以这不是我正在寻找的解决方案。我阅读了链接中的示例,但我不知道如何在 Python 中执行类似的操作。

我尝试将计数器保存在 Datastore 中,但是 Dataflow 与 Datastore 读取和写入的速度差异非常显着。

这样做的正确方法是什么?我不知道如何处理它。 问候。

【问题讨论】:

您能改述一下您想要实现的目标吗?您希望能够将整个输入文件(所有行)作为单个列表吗? 嗨@MarcinZablocki,不,我想要输入文件中包含N行的列表的PCollection,例如:如果N是2并且输入是“1,2,3,4,5, 6,7,8",其中逗号是换行符,我想要一个类似这样的 PCollection:PCollection[List(1,2), List(3,4), List(5,6), List( 7,8)] 那么如果输入是“1,2,3,4,5,6,7”并且N=2呢?输出 PCollection 应该是什么样子? PCollection 是无序的。除非您的输入包含订单信息(例如 ReadFromText 返回 (sequence number, element) 的元组),否则这种确定性分组对于使用梁很棘手(需要 State 或数据驱动的触发器)。如果您的管道不需要确定性分组,您可以在 DoFn 中维护一个大小为 N 的缓冲区,并在每次缓冲区满时(或在 finish_bundle 中)刷新缓冲区。 This 的问题似乎与您的类似 - 答案是使用 Top transform。 【参考方案1】:

假设分组顺序不重要,您可以在DoFn 内分组。

class Group(beam.DoFn):
  def __init__(self, n):
     self._n = n
     self._buffer = []

  def process(self, element):
     self._buffer.append(element)
     if len(self._buffer) == self._n:
        yield list(self._buffer)
        self._buffer = []

  def finish_bundle(self):
     if len(self._buffer) != 0:
        yield list(self._buffer)
        self._buffer = []

lines = p | 'File reading' >> ReadFromText(known_args.input)
          | 'Group' >> beam.ParDo(Group(known_args.N)
          ...

【讨论】:

以上是关于如何从 PCollection Apache Beam Python 创建 N 个元素组的主要内容,如果未能解决你的问题,请参考以下文章

Apache-Beam 将序列号添加到 PCollection

如何从 Dataflow 中的 PCollection 读取 bigQuery

如何从 pcollection 将多个值写入红移表

如何从 PCollection 中过滤出无值

如何从光束中的 PCollection<string> 获取所有文件元数据

如何在Apache Beam / Google Dataflow中使用ParseJsons?