数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection
Posted
技术标签:
【中文标题】数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection【英文标题】:Dataflow: Using Top module with Python SDK: single-element PCollection 【发布时间】:2016-11-01 20:37:10 【问题描述】:我正在查看 incubator-beam 存储库(链接自 Dataflow 文档)上的 word_counting.py 示例,我想修改它以获得出现次数最多的 n。这是我的管道:
counts = (lines
| 'split' >> (beam.ParDo(WordExtractingDoFn())
.with_output_types(unicode))
| 'pair_with_one' >> beam.Map(lambda x: (x, 1))
| 'group' >> beam.GroupByKey()
| 'count' >> beam.Map(lambda (word, ones): (word, sum(ones)))
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c) # 'top' is the only added line
output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
output | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))
我使用 Top.Of() 方法添加了一行,但它似乎返回了一个 PCollection,其中一个数组作为单个元素(我正在等待一个有序的 PCollection,但似乎在查看文档PCollections 是无序的集合。
当管道运行时,beam.Map 仅循环一个元素(即整个数组)并且在“格式”中,lambda 函数会引发错误,因为它无法将整个数组映射到元组(word,c )
我应该如何处理这个单元素 PCollection 而不会在这一步中断管道?
【问题讨论】:
【参考方案1】:如果您想将可迭代对象的PCollection
扩展为这些可迭代对象的元素的PCollection
,可以使用FlatMap
,其参数是从元素到结果可迭代的函数:在您的情况下,元素本身就是可迭代的,所以我们使用标识函数。
counts = ...
| 'top' >> beam.combiners.Top.Of('top', 10, key=lambda (word, c): c)
| 'expand' >> beam.FlatMap(lambda word_counts: word_counts) # sic!
output = counts | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
...
【讨论】:
以上是关于数据流:将 Top 模块与 Python SDK 一起使用:单元素 PCollection的主要内容,如果未能解决你的问题,请参考以下文章
Facebook-sdk python模块没有属性GraphAPI