如何在 Python 中将表行 PCollections 转换为键、值 PCollections?
Posted
技术标签:
【中文标题】如何在 Python 中将表行 PCollections 转换为键、值 PCollections?【英文标题】:How do I convert table row PCollections to key,value PCollections in Python? 【发布时间】:2018-05-14 21:37:42 【问题描述】:没有关于如何将 pCollections 转换为输入 .CoGroupByKey() 所需的 pCollections 的文档
上下文 本质上,我有两个大的 pCollections,我需要能够找到两者之间的差异,用于 II 型 ETL 更改(如果它在 pColl1 中不存在,则添加到 pColl2 中找到的嵌套字段),以便我能够保留 BigQuery 中这些记录的历史记录。
管道架构:
-
将 BQ 表读入 2 个 pCollections:dwsku 和 product。
对两个集合应用 CoGroupByKey() 以返回 --> 结果
解析结果以查找 dwsku 中的所有更改并将其嵌套到产品中。
我们会推荐任何帮助。我在 SO 上找到了一个 java 链接,它完成了我需要完成的同样的事情(但 Python SDK 上没有任何内容)。
Convert from PCollection<TableRow> to PCollection<KV<K,V>>
是否有针对 Apache Beam,尤其是 Python SDK 的文档/支持?
【问题讨论】:
【参考方案1】:为了使CoGroupByKey()
正常工作,您需要拥有tuples
中的PCollections
,其中第一个元素是键,第二个元素是数据强>。
在您的情况下,您说您有 BigQuerySource
,在当前版本的 Apache Beam 中输出 PCollection of dictionaries
(code),其中每个条目代表已读取的表中的一行。如上所述,您需要将此 PCollections 映射到元组。使用ParDo
很容易做到这一点:
class MapBigQueryRow(beam.DoFn):
def process(self, element, key_column):
key = element.get(key_column)
yield key, element
data1 = (p
| "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
| "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))
data2 = (p
| "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
| "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))
co_grouped = ("data1": data1, "data2": data2 | beam.CoGroupByKey())
# do your processing with co_grouped here
顺便说一句,Apache Beam 的 Python SDK 文档可以在 here 找到。
【讨论】:
以上是关于如何在 Python 中将表行 PCollections 转换为键、值 PCollections?的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Boot 中将嵌套的 JSON 对象映射为 SQL 表行