在 PySpark 中为镶木地板文件过滤日期时间范围和时区

Posted

技术标签:

【中文标题】在 PySpark 中为镶木地板文件过滤日期时间范围和时区【英文标题】:Filter between datetime ranges with timezone in PySpark for parquet files 【发布时间】:2020-08-25 14:52:19 【问题描述】:

根据here 的建议,我想知道如何使用 PySpark 过滤带有时区的日期时间范围。

我的数据如下所示:

ABC,2020-06-22T19:17:16.428+0000

DEF,2020-06-22T19:17:16.435+0000

JKL,2020-06-22T19:17:16.468+0000

移动网络运营商,2020-06-22T19:17:16.480+0000

XYZ,2020-06-22T19:17:16.495+0000

在这种情况下,我只想提取那些毫秒数在 400-450 之间的记录。

试过了,但没用:

import pyspark.sql.functions as func
df = df.select(func.to_date(df.UpdatedOn).alias("time"))
sf = df.filter(df.time > '2020-06-22T19:17:16.400').filter(df.time < '2020-06-22T19:17:16.451')

【问题讨论】:

你能给出df的schema吗? df.printSchema() root |-- 时间:日期(可为空=真) 【参考方案1】:

当您使用to_date 时,它会截断小时,因此您必须使用to_timestamp 并进行比较。

df.withColumn('date', to_timestamp('date')) \
  .filter("date between to_timestamp('2020-06-22T19:17:16.400') and to_timestamp('2020-06-22T19:17:16.451')") \
  .show(10, False)

+---+-----------------------+
|id |date                   |
+---+-----------------------+
|ABC|2020-06-22 19:17:16.428|
|DEF|2020-06-22 19:17:16.435|
+---+-----------------------+

【讨论】:

我将代码中的日期列替换为“UpdatedOn”,这是我在 OP 中的列,但出现错误:无法解析给定输入列的“UpdatedOn 测试将您的代码与 to_timestamp 一起使用并修改您的过滤器。 您是如何读取数据并制作数据框的?如果可能,请在您发布的内容的前面部分添加更多代码。 我从 Source 数据库中读取数据并将其编写为 parquet 文件。现在,对于 DWH 中的增量加载,我只想提取从昨天更改的记录。正如我在另一篇关于层次结构(表/年/月/日)的帖子中提到的,我想传递一个日期并读取仅匹配该日期之后的记录的镶木地板文件。

以上是关于在 PySpark 中为镶木地板文件过滤日期时间范围和时区的主要内容,如果未能解决你的问题,请参考以下文章

带有 hive 的 pyspark - 无法正确创建分区并从数据框中保存表

无法将数据框保存到镶木地板 pyspark

如何将 csv 文件转换为镶木地板

将 avro 转换为镶木地板(也许使用 hive?)

转换为镶木地板的 csv 文件将“e0”添加到值的末尾

PySpark - Parquet - 调用 None.None 时发生错误