如何比较pyspark中的日期时间行对象

Posted

技术标签:

【中文标题】如何比较pyspark中的日期时间行对象【英文标题】:How to compare datetime row objects in pyspark 【发布时间】:2019-07-29 20:25:19 【问题描述】:

我有一个带有 normalized_date 列的数据框(beaconsDF):

+--------+--------------------+--------------------+
|isActive|             company|     Normalized_Date|
+--------+--------------------+--------------------+
|    true|[593b0d9f3f21f9dd...|09/25/2018 00:00:...|
|    true|[593b0d9f3f21f9dd...|11/29/2017 00:00:...|
|    true|[593b0d9f3f21f9dd...|04/01/2019 00:00:...|
|    true|[593b0d9f3f21f9dd...|09/25/2018 00:00:...|
|    true|[593b0d9f3f21f9dd...|11/20/2018 00:00:...|
|    true|[593b0d9f3f21f9dd...|09/25/2018 00:00:...|
|    true|[593b0d9f3f21f9dd...|01/04/2019 00:00:...|
|    true|[593b0d9f3f21f9dd...|01/04/2019 00:00:...|
+--------+--------------------+--------------------+

我想在这个简单的数据框(calendarDF2)中当日期等于 normalized_date 时过滤它:

+--------------------+
|     Normalized_Date|
+--------------------+
|11/28/2017 00:00:...|
+--------------------+

我认为这几行代码可以工作:

tempRow = calendarDF2.take(1)
beaconsDF = beaconsDF.filter(beaconsDF.Normalized_Date == tempRow)

但是我在这些行中遇到了一个很长的错误:

py4j.protocol.Py4JJavaError: 调用 o214.equalTo 时出错。 : java.lang.RuntimeException: 不支持的文字类型类 java.util.ArrayList [[11/28/2017 00:00:00 AM]]

我认为我的问题在于日期的格式,因为我对不同类型的格式感到非常困惑。我打印了两个数据框中的值来比较它们并得到了这个:

[Row(Normalized_Date=u'11/28/2017 00:00:00 AM')]  
[Row(Normalized_Date=u'04/01/2019 00:00:00 AM')]

看起来应该可以正确比较它们,对吧?我的问题也可能与我如何比较它们有关。我在某处看到我可能需要使用 3 个等号?我试过了,但没有用。我还尝试将 tempRow 设为文字,但这没有用。有什么想法吗?

编辑:我还想在将来按小于或等于 tempRow 日期进行过滤

【问题讨论】:

【参考方案1】:

使用collect,它返回一个list,从中提取的元素可用于比较。到目前为止,您正在将 list 与标量值进行比较,因此会出现错误。

tempRow = calendarDF2.collect()[0]['Normalized_Date']
beaconsDF = beaconsDF.filter(beaconsDF.Normalized_Date == tempRow)

【讨论】:

不幸的是,这导致了与以前相同的错误 成功了! '=' 将如何改变过滤?他们似乎改变了它,但不是以预期的方式 您应该将它们转换为datetime 类型以进行<=>= 比较,前提是它们不是以datetime 开头的。【参考方案2】:

除了使用 collect 从 DF 中获取 normalized_date 之外,我还建议使用带有已定义架构的模块 datetime 进行比较:

import datetime as dt

format = '%m/%d/%y %I:%M:%S %p'

calendar_date = calendarDF2.collect()[0]['Normalized_Date']
normalized_date = dt.datetime.strptime(calendar_date, format)

beaconsDF = beaconsDF.filter(dt.datetime.strptime(beaconsDF.Normalized_Date) == normalized_date)

【讨论】:

我收到错误“TypeError: strptime() argument 1 must be string, not Column”,我该如何解决?

以上是关于如何比较pyspark中的日期时间行对象的主要内容,如果未能解决你的问题,请参考以下文章

如何计算pyspark中的日期差异?

如何比较python中的日期时间对象?

如何将日期转换为 PySpark Dataframe 列中的第一天?

按日期将pyspark数据集分成两个[重复]

如何将所有日期格式转换为日期列的时间戳?

PySpark 中具有多列的日期算术