在 PCollection 上使用 python 在数据流中执行 sql 查询
Posted
技术标签:
【中文标题】在 PCollection 上使用 python 在数据流中执行 sql 查询【英文标题】:Executing a sql query inside a dataflow using python on PCollection 【发布时间】:2018-06-12 11:13:17 【问题描述】:我正在尝试将一个 sql 查询实现为数据流中的转换。我从 bigquery 加载了一个表作为 PCollection。我想像下面的查询一样聚合我的数据。
SELECT
name,
user_id,
place,
SUM(amount) as some_amount ,
SUM(cost) as sum_cost
FROM
[project:test.day_0_test]
GROUP BY 1,2,3
我如何轻松实现它。我听说使用 Java 的数据流支持在 P Collection 上运行 sql kind 查询,但正确地 python 不支持。谁能帮我解决这个问题
注意:
我想在 P 集合上实现这个查询.. 不直接从 bigquery 读取
【问题讨论】:
您需要使用GroupByKey
。见这里:cloud.google.com/dataflow/model/group-by-key#groupbykey
【参考方案1】:
(当您评论不想直接在 BigQuery 中运行 SQL 查询时,我编辑了我的答案)
我模拟了一个文件input.csv
,其中包含:
#input.csv
name1,1,place1,2.,1.5
name1,1,place1,3.,0.5
name1,1,place2,1.,1
name1,2,place3,2.,1.5
name2,2,place3,3.,0.5
这似乎是您从 BQ 检索的数据。您的 SQL 查询可以在 Beam 中实现,例如:
def sum_l(l):
s0, s1 = 0, 0
for i in range(len(l)):
s0 += l[i][0]
s1 += l[i][1]
return [s0, s1]
with beam.Pipeline(options=po) as p:
(p | 'Read Input' >> beam.io.ReadFromText("input.csv")
| 'Split Commas' >> beam.Map(lambda x: x.strip().split(','))
| 'Prepare Keys' >> beam.Map(lambda x: (x[:-2], map(float, x[-2:])))
| 'Group Each Key' >> beam.GroupByKey()
| 'Make Summation' >> beam.Map(lambda x: [x[0], sum_l([e for e in x[1]])])
| 'Write Results' >> beam.io.WriteToText('results.csv'))
结果是:
#results.csv-00000-of-00001
[[u'name1', u'1', u'place2'], [1.0, 1.0]]
[[u'name1', u'2', u'place3'], [2.0, 1.5]]
[[u'name1', u'1', u'place1'], [5.0, 2.0]]
[[u'name2', u'2', u'place3'], [3.0, 0.5]]
这基本上是查询的直接 MapReduce 实现:为每一行构建一个键,它们被组合在一起,最后的求和发生在使用函数 sum_l
的 Map
操作中。
我不确定您为什么要在 Beam 而不是 BigQuery 中运行查询操作。我建议尝试这两种方法,因为在这种情况下,Beam 的效率可能不如 BigQuery 的效率。
【讨论】:
我想在 PCOLlection 上实现这个查询。不要直接从 bigquery 中读取 我刚刚编辑了答案以在 Python Beam 操作中实现您的 SQL 查询,希望这就是您要寻找的。span> 我们可以像变量一样保存 p 集合以备将来使用吗? 当然,只需将结果赋给另一个变量,例如:other_po = (p | transforms...)
。然后您可以使用other_po
进行未来的转换
谢谢威廉。现在我想合并两个 P 集合 - 就像从一个 P 集合中一样,我必须获取 id 并在第二个 P 集合中获取相应的行 - 我如何使用 python 执行此连接操作(来自 PCollection1 的 id,(id,value1,value2,value3 ) 来自 PCollection 2)以上是关于在 PCollection 上使用 python 在数据流中执行 sql 查询的主要内容,如果未能解决你的问题,请参考以下文章
如何从 PCollection Apache Beam Python 创建 N 个元素组
Apache-Beam 将序列号添加到 PCollection