在 Spark 中获取上周一

Posted

技术标签:

【中文标题】在 Spark 中获取上周一【英文标题】:Get Last Monday in Spark 【发布时间】:2016-10-26 20:45:09 【问题描述】:

我正在使用带有 Python API 的 Spark 2.0。

我有一个包含 DateType() 类型列的数据框。我想在包含最近星期一的数据框中添加一列。

我可以这样做:

reg_schema = pyspark.sql.types.StructType([
    pyspark.sql.types.StructField('AccountCreationDate', pyspark.sql.types.DateType(), True),
    pyspark.sql.types.StructField('UserId', pyspark.sql.types.LongType(), True)
])
reg = spark.read.schema(reg_schema).option('header', True).csv(path_to_file)
reg = reg.withColumn('monday',
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Mon',
        reg.AccountCreationDate).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate,'E') == 'Tue',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 1)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Wed',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 2)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Thu',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 3)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Fri',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 4)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sat',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 5)).otherwise(
    pyspark.sql.functions.when(pyspark.sql.functions.date_format(reg.AccountCreationDate, 'E') == 'Sun',
        pyspark.sql.functions.date_sub(reg.AccountCreationDate, 6))
        )))))))

然而,这似乎是一个相当简单的东西的大量代码。有更简洁的方法吗?

【问题讨论】:

【参考方案1】:

您可以使用next_day 确定下一个日期并减去一周。需要的函数可以按如下方式导入:

from pyspark.sql.functions import next_day, date_sub

又如:

def previous_day(date, dayOfWeek):
    return date_sub(next_day(date, "monday"), 7)

最后一个例子:

from pyspark.sql.functions import to_date

df = sc.parallelize([
    ("2016-10-26", )
]).toDF(["date"]).withColumn("date", to_date("date"))

df.withColumn("last_monday", previous_day("date", "monday"))

结果:

+----------+-----------+
|      date|last_monday|
+----------+-----------+
|2016-10-26| 2016-10-24|
+----------+-----------+

【讨论】:

非常感谢!这个功能真的好用【参考方案2】:

我发现pyspark的函数trunc也可以。

import pyspark.sql.functions as f

df = spark.createDataFrame([
    (datetime.date(2020, 10, 27), ),
    (datetime.date(2020, 12, 21), ),
    (datetime.date(2020, 10, 13), ),
    (datetime.date(2020, 11, 11), ),
], ["date_col"])
df = df.withColumn("first_day_of_week", f.trunc("date_col", "week"))

【讨论】:

【参考方案3】:
import pyspark.sql.functions as f

df = df.withColumn('days_from_monday', f.dayofweek(f.col('transaction_timestamp'))-2)      
df = df.withColumn('transaction_week_start_date', f.expr("date_sub(transaction_timestamp, days_from_monday)"))

【讨论】:

以上是关于在 Spark 中获取上周一的主要内容,如果未能解决你的问题,请参考以下文章

查询获取上周添加的产品详情

如何重置周日晚上 11:59(周一开始前的时间)从数据库中获取的记录

如何获取上周日到周六的日期

java获取某个时间的上周一和周日

如何在php中获取上周的日期(星期二或其他日期)?

mysql 获取当前日期周一和周日