我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?
Posted
技术标签:
【中文标题】我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?【英文标题】:Can I join two tables in Dataflow (Apache Beam) on multiple keys (join condition)? 【发布时间】:2019-07-22 09:36:22 【问题描述】:我有两张表,例如:
ID Name Age
1 Alex 20
2 Sarah 21 and so on
.....................
ID Name Marks
1 Alex 80
2 Sarah 78 and so on
.....................
我想在多个键(连接条件)上使用 Cloud Dataflow (Apache Beam) 连接这两个表。 e. ID 和 Name 都是公共列。我该怎么做?
我尝试使用一个键(一个公共列)加入它,但我不知道如何使用多个键
我已将此代码用作参考:
https://github.com/GoogleCloudPlatform/professional-services/blob/master/examples/dataflow-python-examples/dataflow_python_examples/data_lake_to_mart.py
class JoinTables:
def add_key_details(self, row, key_details):
result = row.copy()
try:
result.update(key_details[row['name']])
except KeyError as err:
traceback.print_exc()
logging.error("Name Not Found error: %s", err)
return result
def run(argv=None):
jointables = JoinTables()
table1= (p
| 'Read table1 details from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(
query='SELECT * FROM `dataset.table1`',
use_standard_sql=True
)
)
| 'Key Details 1' >> beam.Map(lambda row: (row['name'], row))
)
table2 = (p
| 'Read table2 details from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(
query='SELECT * FROM `dataset.table2`',
use_standard_sql=True
)
)
| 'Join data with side input 1' >> beam.Map(jointables.add_key_details, AsDict(table1))
)
【问题讨论】:
【参考方案1】:TLDR :您需要使用元组键 (ID, name)
映射 table1
,然后使用这两个值访问行。
# Map using tuple
| 'Key Details 1' >> beam.Map(lambda row: ((row['id'], row['name']), row))
# Access using tuple
result.update(key_details[(row['id'], row['name'])])
解释:
加入这里基本上是:
-
将 table1 转换为 KV 对,其中 K 是字段,V 是行
beam.Map(lambda row: (row['name'], row))
-
传表1as a side input as a dictionary
beam.Map(jointables.add_key_details, AsDict(table1))
-
对于 table2 的每一行,使用相同的 key 和 update table2 row 获取 table1 等效项
result.update(key_details[row['name']])
-
返回包含新字段的新行。
因此,您在第 1 步和第 3 步中使用的字段是“名称”。如果您想使用其他名称,只需调用名称以外的名称(例如:row['id']
)。获取多个字段的技巧是使用元组作为键。这样,只需将您的行映射到(row['id'], row['name'])
并在add_key_details
中使用它来访问正确的table1
行。
希望这会有所帮助!
【讨论】:
【参考方案2】:是的,你可以这样做。从两个 BigQuery 源获得两个 PCollection 后,您需要进行一些预处理,使用 CoGroupByKey,然后按照 Rafael 的回答取消嵌套记录。如果你需要在多个地方进行类似的连接,你也可以使用复合转换来抽象所有这些,然后只传递你想要连接的列名 PCollection。这在blog 中有很好的描述。
【讨论】:
以上是关于我可以在多个键(连接条件)上连接 Dataflow(Apache Beam)中的两个表吗?的主要内容,如果未能解决你的问题,请参考以下文章