我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?

Posted

技术标签:

【中文标题】我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?【英文标题】:Can I join two tables in Dataflow (Apache Beam) on multiple keys (join condition)? 【发布时间】:2019-07-22 09:36:22 【问题描述】:

我有两张表,例如:

ID Name  Age
1  Alex  20
2  Sarah 21 and so on
.....................
ID Name  Marks
1  Alex  80
2  Sarah 78 and so on
.....................

我想在多个键(连接条件)上使用 Cloud Dataflow (Apache Beam) 连接这两个表。 e. ID 和 Name 都是公共列。我该怎么做?

我尝试使用一个键(一个公共列)加入它,但我不知道如何使用多个键

我已将此代码用作参考:

https://github.com/GoogleCloudPlatform/professional-services/blob/master/examples/dataflow-python-examples/dataflow_python_examples/data_lake_to_mart.py

class JoinTables:
    def add_key_details(self, row, key_details):
        result = row.copy()
        try:
            result.update(key_details[row['name']])
        except KeyError as err:
            traceback.print_exc()
            logging.error("Name Not Found error: %s", err)
        return result

def run(argv=None):
    jointables = JoinTables()

    table1= (p 
        | 'Read table1 details from BigQuery ' >> beam.io.Read(
             beam.io.BigQuerySource(
                  query='SELECT * FROM `dataset.table1`',
                  use_standard_sql=True
             )
        )
        | 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
    )

    table2 = (p 
        | 'Read table2 details from BigQuery ' >> beam.io.Read(
            beam.io.BigQuerySource(
                query='SELECT * FROM `dataset.table2`',
                use_standard_sql=True
            )
        )
        | 'Join data with side input 1' >> beam.Map(jointables.add_key_details, AsDict(table1))
    )

【问题讨论】:

【参考方案1】:

TLDR :您需要使用元组键 (ID, name) 映射 table1,然后使用这两个值访问行。

# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))

# Access using tuple
result.update(key_details[(row['id'], row['name'])])

解释:

加入这里基本上是:

    将 table1 转换为 KV 对,其中 K 是字段,V 是行
beam.Map(lambda row: (row['name'], row))
    传表1as a side input as a dictionary
beam.Map(jointables.add_key_details, AsDict(table1))
    对于 table2 的每一行,使用相同的 key 和 update table2 row 获取 table1 等效项
result.update(key_details[row['name']])
    返回包含新字段的新行。

因此,您在第 1 步和第 3 步中使用的字段是“名称”。如果您想使用其他名称,只需调用名称以外的名称(例如:row['id'])。获取多个字段的技巧是使用元组作为键。这样,只需将您的行映射到(row['id'], row['name']) 并在add_key_details 中使用它来访问正确的table1 行。

希望这会有所帮助!

【讨论】:

【参考方案2】:

是的,你可以这样做。从两个 BigQuery 源获得两个 PCollection 后,您需要进行一些预处理,使用 CoGroupByKey,然后按照 Rafael 的回答取消嵌套记录。如果你需要在多个地方进行类似的连接,你也可以使用复合转换来抽象所有这些,然后只传递你想要连接的列名 PCollection。这在blog 中有很好的描述。

【讨论】:

以上是关于我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?的主要内容,如果未能解决你的问题,请参考以下文章

列上加索引时事有条件

使用 Dataflow Java 代码加入嵌套结构表

在多个连接条件下将数据从第二个表插入一个表

在映射器的单个输出上运行多个减速器

带有 JdbcIO 编写器的 ApacheBeam/DataFlow 运行器创建了太多连接

使用 TPL-Dataflow 进行聚合和连接(内、外、左……)?