使用pyspark分区时循环覆盖模式

Posted

技术标签:

【中文标题】使用pyspark分区时循环覆盖模式【英文标题】:Overwrite mode in loop when partition using pyspark 【发布时间】:2019-12-13 20:34:51 【问题描述】:
#Start and End is a range of dates. 
start = date(2019, 1, 20)
end = date(2019, 1, 22)

for single_date in daterange(start, end):
  query = "(SELECT ID, firstname,lastname,date FROM dbo.emp WHERE date = '%s' ) emp_alias" %((single_date).strftime("%Y-%m-%d %H:%M:%S")) 
  df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
  df.write.format("parquet").mode("ignore").partitionBy("Date").save("/mnt/data/empData.parquet")

我有一个表中的天数数据,我需要按日期分区的镶木地板文件。我必须在循环中按天保存,因为数据量很大,而且我不能将所有天数(如年份数据)放在一个数据框中。我尝试了所有保存模式。在“忽略”模式下,它会保存第一天。在“覆盖”模式下,它会保存最后一天。在“附加”模式下,它添加数据。我需要的是,如果那天有可用的数据,它应该忽略那一天并保留已经存在的数据,但如果数据不可用,则在按日期分区的镶木地板文件中创建。请帮忙。

【问题讨论】:

【参考方案1】:

如果您还想使用 Hive 分区(当您调用方法 @987654321 @)。请注意,可以选择执行相反的操作,即覆盖某些分区中的数据,同时保留 DataFrame 中没有数据的分区(将配置设置 "spark.sql.sources.partitionOverwriteMode" 设置为 "dynamic" 并使用 SaveMode.Overwrite写数据集时)。

您仍然可以通过首先创建一组所有现有分区来实现您想要的。您可以使用 PySpark 或使用任何允许您在文件系统(如 Azure Data Lake Storage Gen2)或键值存储(如 AWS S3)中执行列表操作的库来做到这一点。获得该列表后,您可以使用它来过滤新数据集以获取您仍要写入的数据。这是一个仅使用 PySpark 的示例:

In [1]: from pyspark.sql.functions import lit
   ...: df = spark.range(3).withColumn("foo", lit("bar"))
   ...: dir = "/tmp/foo"
   ...: df.write.mode("overwrite").partitionBy("id").parquet(dir)  # initial seeding
   ...: ! tree /tmp/foo
   ...: 
   ...: 
/tmp/foo                                                                        
├── id=0
│   └── part-00001-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=1
│   └── part-00002-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=2
│   └── part-00003-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
└── _SUCCESS

3 directories, 4 files

In [2]: df2 = spark.range(5).withColumn("foo", lit("baz"))
   ...: existing_partitions = spark.read.parquet(dir).select("id").distinct()
   ...: df3 = df2.join(existing_partitions, "id", how="left_anti")
   ...: df3.write.mode("append").partitionBy("id").parquet(dir)
   ...: spark.read.parquet(dir).orderBy("id").show()
   ...: 
   ...: 
+---+---+                                                                       
|foo| id|
+---+---+
|bar|  0|
|bar|  1|
|bar|  2|
|baz|  3|
|baz|  4|
+---+---+

如您所见,只添加了 2 个分区。已经存在的那些都被保留了。

现在,获取existing_partitions DataFrame 需要读取数据。 Spark 实际上不会读取所有数据,只是读取分区列和元数据。如前所述,您也可以使用与数据存储位置相关的任何 API 来获取这些数据。在我和你的特殊情况下,看到你如何写入 Databricks 上的 /mnt 文件夹,我可以简单地使用内置 Python 函数 os.walk:dirnames = next(os.walk(dir))[1],并创建一个 DataFrame从此。

顺便说一句,你得到你所看到的行为的原因是:

    忽略模式

    在“忽略”模式下,它会保存第一天。

    因为您使用的是 for 循环并且输出目录最初可能不存在,所以将写入第一个日期分区。在 for 循环的所有后续迭代中,DataFrameWriter 对象将不再写入,因为它认为那里已经有一些数据(一个分区,用于第一个日期)。

    覆盖模式

    在“覆盖”模式下,它会保存最后一天。

    实际上,它在 for 循环的每次迭代中都保存了一个分区,但是由于您正在指示 DataFrameWriter 覆盖,它会删除目录中所有先前存在的分区。所以看起来只写了最后一个。

    追加模式

    在“追加”模式下,它添加数据 这个不用多解释了。

一个建议:可能不需要多次从数据库中读取数据(使用 for 循环创建多个不同的查询和 jdbc 连接)。您可能可以将查询更新为 WHERE date BETWEEN %(start) AND %(end),完全删除 for 循环并享受高效的写入。

【讨论】:

以上是关于使用pyspark分区时循环覆盖模式的主要内容,如果未能解决你的问题,请参考以下文章

pyspark - 分区数据的计算(使用“附加”模式创建)慢

PySpark 使用“覆盖”模式保存到 Redshift 表会导致删除表?

使用 PySpark 读取 CSV 时是不是可以仅覆盖一种列类型?

使用自定义分区器对 Pyspark 中的数据框进行分区

使用 pyspark 插入镶木地板文件时,Hive 表需要对每个新分区进行“修复”

用修改后的 PySpark DataFrame 覆盖现有 Parquet 数据集