数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection

Posted

技术标签:

【中文标题】数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection【英文标题】:Dataflow: Using Top module with Python SDK: single-element PCollection 【发布时间】:2016-11-01 20:37:10 【问题描述】:

我正在查看 incubator-beam 存储库(链接自 Dataflow 文档)上的 word_counting.py 示例,我想修改它以获得出现次数最多的 n。这是我的管道:

  counts = (lines
        | 'split' >> (beam.ParDo(WordExtractingDoFn())
                      .with_output_types(unicode))
        | 'pair_with_one' >> beam.Map(lambda x: (x, 1))
        | 'group' >> beam.GroupByKey()
        | 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))

我使用 Top.Of() 方法添加了一行,但它似乎返回了一个 PCollection,其中一个数组作为单个元素(我正在等待一个有序的 PCollection,但似乎在查看文档PCollections 是无序的集合。

当管道运行时,beam.Map 仅循环一个元素(即整个数组)并且在“格式”中,lambda 函数会引发错误,因为它无法将整个数组映射到元组(word,c )

我应该如何处理这个单元素 PCollection 而不会在这一步中断管道?

【问题讨论】:

【参考方案1】:

如果您想将可迭代对象的PCollection 扩展为这些可迭代对象的元素的PCollection,可以使用FlatMap,其参数是从元素到结果可迭代的函数:在您的情况下,元素本身就是可迭代的,所以我们使用标识函数。

  counts = ...
        | 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
        | 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!

  output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
  ...

【讨论】:

以上是关于数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection的主要内容,如果未能解决你的问题,请参考以下文章

《团对-爬取豆瓣电影TOP250-开发环境搭建过程》

团队-Python 爬取豆瓣电影top250-需求分析

第一个爬虫经历----豆瓣电影top250(经典案例)

Facebook-sdk python模块没有属性GraphAPI

可以将 Python 与 Intel 的 Atom Developer SDK (C/C++) 一起使用吗?

团队-爬取豆瓣电影top250-模块开发过程