PySpark 中具有多列的日期算术

Posted

技术标签:

【中文标题】PySpark 中具有多列的日期算术【英文标题】:Date Arithmetic with Multiple Columns in PySpark 【发布时间】:2016-04-12 00:11:39 【问题描述】:

我正在尝试使用 PySpark 数据框中的多个列进行一些中等复杂的日期算术。基本上,我有一个名为number 的列,它表示我需要过滤的created_at 时间戳之后的周数。在 PostgreSQL 中,您可以乘以 interval based on the value in a column,但我似乎无法弄清楚如何在 PySpark 中使用 SQL API 或 Python API 来做到这一点。在这里的任何帮助将不胜感激!

import datetime
from pyspark.sql import SQLContext
from pyspark.sql import Row
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)
start_date = datetime.date(2020,1,1)

my_df = sc.parallelize([
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=1,  metric=10),
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=2,  metric=10),
        Row(id=1, created_at=datetime.datetime(2020, 1, 1), number=3,  metric=10),
        Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=1,  metric=20),
        Row(id=2, created_at=datetime.datetime(2020, 1, 15), number=2,  metric=20),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=7,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=8,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=9,  metric=30),
        Row(id=3, created_at=datetime.datetime(2020, 7, 1), number=10, metric=30),
    ]).toDF()


# This doesn't work!
new_df = my_df.where("created_at + interval 7 days * number > '" + start_date.strftime("%Y-%m-%d") +"'")
# Neither does this!
new_df = my_df.filter(my_df.created_at + datetime.timedelta(days=my_df.number * 7)).date() > start_date.date()

有一个可能的solution here 需要将日期转换为字符串,使用python 中的datetime 库将字符串转换为datetime 对象,然后执行操作,但这似乎很疯狂。

【问题讨论】:

【参考方案1】:

好的,我找到了使用expr 和内置date_add 函数的方法。

from pyspark.sql.functions import expr, date_add
new_df = my_df.withColumn('test', expr('date_add(created_at, number*7)'))
filtered = new_df.filter(new_df.test > start_date)
filtered.show()

如果其他人想要添加,我希望能深入了解它是如何/为什么以一般方式工作的!

【讨论】:

以上是关于PySpark 中具有多列的日期算术的主要内容,如果未能解决你的问题,请参考以下文章

python + pyspark:在pyspark中进行多列比较的内部连接错误

Pyspark - 如何检查两条记录中哪一条具有最新日期及其列值?

从pyspark的多列中选择非空值

Pyspark:在UDF中传递多列以及参数

PySpark 一次替换多列中的值

Pyspark - 多列聚合