将写入操作映射到 Dataframe 行组到不同的增量表

Posted

技术标签:

【中文标题】将写入操作映射到 Dataframe 行组到不同的增量表【英文标题】:Map write operation to groups of Dataframe rows to different delta tables 【发布时间】:2021-08-19 13:01:12 【问题描述】:

我有一个数据框,其中包含将保存到不同目标表的行。现在,我正在寻找唯一的参数组合来确定目标表,遍历 Dataframe 并进行过滤,然后进行编写。

类似的东西:

df = spark.load.json(directory).repartition('client', 'region')

unique_clients_regions = [(group.client, group.region) for group in df.select('client', 'region').distinct().collect()]

for client, region in unique_clients_regions:
  (df
   .filter(f"client = 'client' and region = 'region'")
   .select(
     ...
   )
   .write.mode("append")
   .saveAsTable(f"client_region_data") 
  )

有没有办法将写入操作映射到不同的groupBy 组,而不必遍历不同的集合?我确保通过clientregion 重新分区以尝试加快过滤器的性能。

【问题讨论】:

【参考方案1】:

凭良心,我无法使用此解决方案为您提供任何建议。实际上,这是一个非常糟糕的数据架构。 您应该只有一个表和按客户端和区域划分的分区。这将为每个客户/区域创建不同的文件夹。最后只需要写一次,不需要循环也不需要收集。

spark.read.json(directory).write.saveAsTable(
    "data",
    mode="append",
    partitionBy=['client', 'region']
)

【讨论】:

感谢@Steven,在学习 Spark 和这种架构的过程中。问题是每个客户端/区域的上游有不同的列/模式,这些列/模式会随着时间的推移而变化,因此每个不同的表都是有意义的。 @TomNash 如果输入是一个 unic json 文件,则意味着每对客户端/区域的架构都是相同的。所以不需要有多个表。 不是。这就是问题所在。一个文件中不同模式的多个 JSON 记录。尝试处理每个人。 @TomNash 这不是你处理它们的方式。从您执行此操作的那一刻起df = spark.load.json(directory),df 将拥有一个全局模式,统一您拥有的所有 json。这意味着最终表也将具有此统一模式。您可以根据需要进行过滤,它不会更改架构。 是的,这就是我们遇到的问题,我可以spark.load.text 然后在过滤集上处理它们get_json_object 以获取该组的本地化架构,尽管没有定义全局架构。问题仍然存在,如何在不循环的情况下将一个 Dataframe 的行写入不同的目的地。另一种方法是一次将一个文件加载到它自己的 Dataframe 中,然后一次写入一行。

以上是关于将写入操作映射到 Dataframe 行组到不同的增量表的主要内容,如果未能解决你的问题,请参考以下文章

我如何将每个Parquet行组读入一个单独的分区?

JAXB 继承,解组到编组类的子类

Spark操作dataFrame进行写入mysql,自定义sql的方式

将相同的行组聚合为一行

使用 pySpark 将 DataFrame 写入 mysql 表

Pandas dataframe数据写入文件和数据库