使用一个 pcollection 作为另一个 pcollection 的输入

Posted

技术标签:

【中文标题】使用一个 pcollection 作为另一个 pcollection 的输入【英文标题】:Use a pcollection as input of another pcollection 【发布时间】:2018-02-06 19:02:13 【问题描述】:

在谷歌数据流中使用python sdk,我想做这样的查询:

query_a_and_b = "从表 A 中选择 a、b"

此查询返回我想用来执行更多查询的元组列表:

query_param = SELECT * from TableA WHERE a = and b = .format(a, b) (这里我设置了 TableA,但它也将用于与 TableA 内部连接的 TableB、C 和 D...)

所以我想做什么:

coll = (p
    | 'read a_b_tuples' >> beam.io.Read(beam.io.BigQuerySource(query=query_a_and_b, use_standard_sql=True)) 
    | 'Build SQL' >> beam.Map(lambda x: query_param.format(x['a'], x['b'])) 
    | 'Query pardo' >> beam.ParDo(lambda q: [beam.io.Read(beam.io.BigQuerySource(query=q, use_standard_sql=True))])
    | 'Save' >> beam.io.WriteToText('results.csv')
)

我不确定最好的方法,它不起作用。在数据流中实现这一目标的首选方法是什么?

最终,这些查询中的每一个都将返回少量行(少于 5k),我想将它们加载到 pandas 数据帧中进行过滤/处理,然后为每个查询组合所有 TableA、B、C、D tuple (a,b) 并将每个 tuple datafarm 写入一个 csv 文件。

从某种意义上说,我可能错误地减少了问题,我可以使用光束函数按 a 和 b 分组,然后进行处理...?

【问题讨论】:

难道您不能使用通用表表达式将您的 SQL 查询写入 BigQuery 并在 SQL 中执行所有联接 + 使用 UNION ALL 组合结果,以便将 Apache Beam 复杂性保持在最低限度( + 你将有单一来源) 由于计算量大,查询会有点复杂。我们有 5 个不同结构的不同表。独立处理它们并合并汇总结果似乎更容易。我已经有一些代码可以手动在一个 (a,b) 元组上运行,所以我想遍历所有其他元组并利用数据流自动扩展。 【参考方案1】:

Beam 尚不直接支持 BigQuery。其他一些转换支持类似的用例,例如JdbcIO.readAll() 可以查询数据库以获取查询参数集合,TextIO.readAll() 可以读取文件名集合 - 但BigQueryIO 还没有这样做,在 Java 和 Python SDK 中都没有。

在您的“查询 pardo”中,您可以改为明确地与 BigQuery REST API 交谈 - 应该没问题,因为您的查询返回少量结果。

【讨论】:

嗯,这意味着我需要实现指数回退和 sdk 中免费提供的其他一些东西....不是我需要的 :(

以上是关于使用一个 pcollection 作为另一个 pcollection 的输入的主要内容,如果未能解决你的问题,请参考以下文章

在 PCollection 上使用 python 在数据流中执行 sql 查询

数据流如何组合 PCollections 替换对象

如何从 pcollection 将多个值写入红移表

如何从 PCollection 中过滤出无值

如何从谷歌数据流管道中的多个输入 PCollection 生成一个输出 PCollection?

将列表转换为 PCollection