将写入操作映射到 Dataframe 行组到不同的增量表
Posted
技术标签:
【中文标题】将写入操作映射到 Dataframe 行组到不同的增量表【英文标题】:Map write operation to groups of Dataframe rows to different delta tables 【发布时间】:2021-08-19 13:01:12 【问题描述】:我有一个数据框,其中包含将保存到不同目标表的行。现在,我正在寻找唯一的参数组合来确定目标表,遍历 Dataframe 并进行过滤,然后进行编写。
类似的东西:
df = spark.load.json(directory).repartition('client', 'region')
unique_clients_regions = [(group.client, group.region) for group in df.select('client', 'region').distinct().collect()]
for client, region in unique_clients_regions:
(df
.filter(f"client = 'client' and region = 'region'")
.select(
...
)
.write.mode("append")
.saveAsTable(f"client_region_data")
)
有没有办法将写入操作映射到不同的groupBy
组,而不必遍历不同的集合?我确保通过client
和region
重新分区以尝试加快过滤器的性能。
【问题讨论】:
【参考方案1】:凭良心,我无法使用此解决方案为您提供任何建议。实际上,这是一个非常糟糕的数据架构。 您应该只有一个表和按客户端和区域划分的分区。这将为每个客户/区域创建不同的文件夹。最后只需要写一次,不需要循环也不需要收集。
spark.read.json(directory).write.saveAsTable(
"data",
mode="append",
partitionBy=['client', 'region']
)
【讨论】:
感谢@Steven,在学习 Spark 和这种架构的过程中。问题是每个客户端/区域的上游有不同的列/模式,这些列/模式会随着时间的推移而变化,因此每个不同的表都是有意义的。 @TomNash 如果输入是一个 unic json 文件,则意味着每对客户端/区域的架构都是相同的。所以不需要有多个表。 不是。这就是问题所在。一个文件中不同模式的多个 JSON 记录。尝试处理每个人。 @TomNash 这不是你处理它们的方式。从您执行此操作的那一刻起df = spark.load.json(directory)
,df 将拥有一个全局模式,统一您拥有的所有 json。这意味着最终表也将具有此统一模式。您可以根据需要进行过滤,它不会更改架构。
是的,这就是我们遇到的问题,我可以spark.load.text
然后在过滤集上处理它们get_json_object
以获取该组的本地化架构,尽管没有定义全局架构。问题仍然存在,如何在不循环的情况下将一个 Dataframe 的行写入不同的目的地。另一种方法是一次将一个文件加载到它自己的 Dataframe 中,然后一次写入一行。以上是关于将写入操作映射到 Dataframe 行组到不同的增量表的主要内容,如果未能解决你的问题,请参考以下文章
Spark操作dataFrame进行写入mysql,自定义sql的方式