使用pyspark分区时循环覆盖模式
Posted
技术标签:
【中文标题】使用pyspark分区时循环覆盖模式【英文标题】:Overwrite mode in loop when partition using pyspark 【发布时间】:2019-12-13 20:34:51 【问题描述】:#Start and End is a range of dates.
start = date(2019, 1, 20)
end = date(2019, 1, 22)
for single_date in daterange(start, end):
query = "(SELECT ID, firstname,lastname,date FROM dbo.emp WHERE date = '%s' ) emp_alias" %((single_date).strftime("%Y-%m-%d %H:%M:%S"))
df = spark.read.jdbc(url=jdbcUrl, table=query, properties=connectionProperties)
df.write.format("parquet").mode("ignore").partitionBy("Date").save("/mnt/data/empData.parquet")
我有一个表中的天数数据,我需要按日期分区的镶木地板文件。我必须在循环中按天保存,因为数据量很大,而且我不能将所有天数(如年份数据)放在一个数据框中。我尝试了所有保存模式。在“忽略”模式下,它会保存第一天。在“覆盖”模式下,它会保存最后一天。在“附加”模式下,它添加数据。我需要的是,如果那天有可用的数据,它应该忽略那一天并保留已经存在的数据,但如果数据不可用,则在按日期分区的镶木地板文件中创建。请帮忙。
【问题讨论】:
【参考方案1】:如果您还想使用 Hive 分区(当您调用方法 @987654321 @)。请注意,可以选择执行相反的操作,即覆盖某些分区中的数据,同时保留 DataFrame 中没有数据的分区(将配置设置 "spark.sql.sources.partitionOverwriteMode"
设置为 "dynamic"
并使用 SaveMode.Overwrite
写数据集时)。
您仍然可以通过首先创建一组所有现有分区来实现您想要的。您可以使用 PySpark 或使用任何允许您在文件系统(如 Azure Data Lake Storage Gen2)或键值存储(如 AWS S3)中执行列表操作的库来做到这一点。获得该列表后,您可以使用它来过滤新数据集以获取您仍要写入的数据。这是一个仅使用 PySpark 的示例:
In [1]: from pyspark.sql.functions import lit
...: df = spark.range(3).withColumn("foo", lit("bar"))
...: dir = "/tmp/foo"
...: df.write.mode("overwrite").partitionBy("id").parquet(dir) # initial seeding
...: ! tree /tmp/foo
...:
...:
/tmp/foo
├── id=0
│ └── part-00001-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=1
│ └── part-00002-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
├── id=2
│ └── part-00003-5d14d286-81e1-4eb1-969e-c0d8089712ce.c000.snappy.parquet
└── _SUCCESS
3 directories, 4 files
In [2]: df2 = spark.range(5).withColumn("foo", lit("baz"))
...: existing_partitions = spark.read.parquet(dir).select("id").distinct()
...: df3 = df2.join(existing_partitions, "id", how="left_anti")
...: df3.write.mode("append").partitionBy("id").parquet(dir)
...: spark.read.parquet(dir).orderBy("id").show()
...:
...:
+---+---+
|foo| id|
+---+---+
|bar| 0|
|bar| 1|
|bar| 2|
|baz| 3|
|baz| 4|
+---+---+
如您所见,只添加了 2 个分区。已经存在的那些都被保留了。
现在,获取existing_partitions
DataFrame 需要读取数据。 Spark 实际上不会读取所有数据,只是读取分区列和元数据。如前所述,您也可以使用与数据存储位置相关的任何 API 来获取这些数据。在我和你的特殊情况下,看到你如何写入 Databricks 上的 /mnt
文件夹,我可以简单地使用内置 Python 函数 os.walk
:dirnames = next(os.walk(dir))[1]
,并创建一个 DataFrame从此。
顺便说一句,你得到你所看到的行为的原因是:
忽略模式
在“忽略”模式下,它会保存第一天。
因为您使用的是 for 循环并且输出目录最初可能不存在,所以将写入第一个日期分区。在 for 循环的所有后续迭代中,DataFrameWriter 对象将不再写入,因为它认为那里已经有一些数据(一个分区,用于第一个日期)。
覆盖模式
在“覆盖”模式下,它会保存最后一天。
实际上,它在 for 循环的每次迭代中都保存了一个分区,但是由于您正在指示 DataFrameWriter 覆盖,它会删除目录中所有先前存在的分区。所以看起来只写了最后一个。
追加模式
在“追加”模式下,它添加数据 这个不用多解释了。
一个建议:可能不需要多次从数据库中读取数据(使用 for 循环创建多个不同的查询和 jdbc 连接)。您可能可以将查询更新为 WHERE date BETWEEN %(start) AND %(end)
,完全删除 for 循环并享受高效的写入。
【讨论】:
以上是关于使用pyspark分区时循环覆盖模式的主要内容,如果未能解决你的问题,请参考以下文章
pyspark - 分区数据的计算(使用“附加”模式创建)慢
PySpark 使用“覆盖”模式保存到 Redshift 表会导致删除表?
使用 PySpark 读取 CSV 时是不是可以仅覆盖一种列类型?