如何在 Python 中将表行 PCollections 转换为键、值 PCollections?

Posted

技术标签:

【中文标题】如何在 Python 中将表行 PCollections 转换为键、值 PCollections?【英文标题】:How do I convert table row PCollections to key,value PCollections in Python? 【发布时间】:2018-05-14 21:37:42 【问题描述】:

没有关于如何将 pCollections 转换为输入 .CoGroupByKey() 所需的 pCollections 的文档

上下文 本质上,我有两个大的 pCollections,我需要能够找到两者之间的差异,用于 II 型 ETL 更改(如果它在 pColl1 中不存在,则添加到 pColl2 中找到的嵌套字段),以便我能够保留 BigQuery 中这些记录的历史记录。

管道架构:

    将 BQ 表读入 2 个 pCollections:dwsku 和 product。 对两个集合应用 CoGroupByKey() 以返回 --> 结果 解析结果以查找 dwsku 中的所有更改并将其嵌套到产品中。

我们会推荐任何帮助。我在 SO 上找到了一个 java 链接,它完成了我需要完成的同样的事情(但 Python SDK 上没有任何内容)。

Convert from PCollection<TableRow> to PCollection<KV<K,V>>

是否有针对 Apache Beam,尤其是 Python SDK 的文档/支持?

【问题讨论】:

【参考方案1】:

为了使CoGroupByKey() 正常工作,您需要拥有tuples 中的PCollections,其中第一个元素是,第二个元素是数据强>。

在您的情况下,您说您有 BigQuerySource,在当前版本的 Apache Beam 中输出 PCollection of dictionaries (code),其中每个条目代表已读取的表中的一行。如上所述,您需要将此 PCollections 映射到元组。使用ParDo 很容易做到这一点:

class MapBigQueryRow(beam.DoFn):
    def process(self, element, key_column):
        key = element.get(key_column)
        yield key, element


data1 = (p
            | "Read #1 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #1"))
            | "Map #1 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_1"))

data2 = (p
            | "Read #2 BigQuery table" >> beam.io.Read(beam.io.BigQuerySource(query="your query #2"))
            | "Map #2 to KV" >> beam.ParDo(MapBigQueryRow(), key_column="KEY_COLUMN_IN_TABLE_2"))

co_grouped = ("data1": data1, "data2": data2 | beam.CoGroupByKey())

# do your processing with co_grouped here

顺便说一句,Apache Beam 的 Python SDK 文档可以在 here 找到。

【讨论】:

以上是关于如何在 Python 中将表行 PCollections 转换为键、值 PCollections?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Spring Boot 中将嵌套的 JSON 对象映射为 SQL 表行

如何在Oracle中将表行拆分为固定大小的块

在 OBIEE 中将总计添加到数据透视表行的末尾

在 SQL 中将值列表与表行连接起来

在 python 金字塔 web 框架中,如何在播种之前删除所有数据库表行?

如何从 BeautifulSoup ( Python ) 中的表中获取第一个子表行