What is the best way to extract time resolution from a timestamp for a specific value in a column?

Posted: 2021-11-06 16:32:56

Question:

Suppose I have the following Spark DataFrame:

+--------------------------+-----+
|timestamp                 |name |
+--------------------------+-----+
|2021-11-06 16:29:00.004204|Alice|
|2021-11-06 16:29:00.004204|Bob  |
+--------------------------+-----+

Now I want to count the records/rows for the specific name == 'Alice' based on the timestamp, bucketed into the following work shifts, and return the result as a Spark DataFrame:

- 1st 12-hour shift (00:00-11:59:59)
- 2nd 12-hour shift (12:00-23:59:59)
- 1st 8-hour shift (00:00-07:59:59)
- 2nd 8-hour shift (08:00-15:59:59)
- 3rd 8-hour shift (16:00-23:59:59)

I tried the following without success:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import dayofmonth, dayofweek
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# sample rows; timestamp and date are filled in below with current_timestamp()/current_date()
data = [{'timestamp': None, 'date': None, 'name': 'Alice'},
        {'timestamp': None, 'date': None, 'name': 'Bob'}]

schema = StructType([
    StructField("timestamp", TimestampType(), True),
    StructField("date",      StringType(),    True),
    StructField("name",      StringType(),    True),
])

# create a Spark DataFrame
spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(data=data, schema=schema)
sdf.printSchema()
sdf.show(truncate=False)

#Generate date and timestamp columns
new_df = sdf.withColumn('timestamp',    F.current_timestamp().cast("timestamp")) \
            .withColumn('date',         F.current_date().cast("date")) \
            .withColumn('day_of_month', dayofmonth('timestamp')) \
            .withColumn('day_of_week', ((dayofweek('timestamp')+5)%7)+1)  # start of the week as a Monday = 1 (by default is Sunday = 1)
            #.withColumn("No. records in 1st 12-hrs",from_unixtime(unix_timestamp(col("timestamp"),"yyyy-MM-dd HH:mm:ss"),"HH:mm:ss")) \
            #.filter(col("timestamp").between("00:00","11:59")) \
            #.groupBy("No. records in 1st 12-hrs", "name").sum("Count") \
            #.withColumn("No. records in 1st 12-hrs",from_unixtime(unix_timestamp(col("timestamp"),"yyyy-MM-dd HH:mm:ss"),"HH:mm:ss")) \
            #.filter(col("timestamp").between("12:00","23:59")) \
            #.groupBy("No. records in 1st 12-hrs" , "name").sum("Count") \

            #.withColumn('# No. records in 1st 8-hrs shift (00:00-07:59:59)', ????('timestamp')) \
            #.withColumn('# No. records in 2nd 8-hrs shift (08:00-15:59:59)', ????('timestamp')) \
            #.withColumn('# No. records in 3rd 8-hrs shift (16:00-23:59:59)', ????('timestamp')) \
new_df.show(truncate = False)

So far, my output is as follows (you can try it in the Colab notebook):

+--------------------------+----------+-----+------------+-----------+
|timestamp                 |date      |name |day_of_month|day_of_week|
+--------------------------+----------+-----+------------+-----------+
|2021-11-06 16:17:43.698815|2021-11-06|Alice|6           |6          |
|2021-11-06 16:17:43.698815|2021-11-06|Bob  |6           |6          |
+--------------------------+----------+-----+------------+-----------+
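
Presumably the hour component can be pulled out the same way as day_of_month and day_of_week above, for example (a minimal sketch; hour_of_day is just an illustrative column name):

new_df = new_df.withColumn('hour_of_day', F.hour('timestamp'))  # 0-23, e.g. 16 for the rows above

What I can't work out is how to go from that hour to the per-shift counts without a UDF.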

I also looked at Spark based data filtering from some posts, as well as a cool answer and Group spark dataframe by date, to apply to the main Spark frame for the specific name, but none of them cover the work-shift ranges.

Note that I'm not interested in using a UDF or hacking it via toPandas().

So the expected result should be, for the specific name == 'Alice':

+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|No. records in 1st 12-hrs |No. records in 2nd 12-hrs |No. records in 1st 8-hrs  |No. records in 2nd 8-hrs  |No. records in 3rd 8-hrs  |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+
|                          |                          |                          |                          |                          |
+--------------------------+--------------------------+--------------------------+--------------------------+--------------------------+

Comments:

Answer 1:

You can do this by checking whether the hour part of the timestamp is between [0, 11], [12, 23], and so on...

import pyspark.sql.functions as F

new_df = sdf.groupBy("name").agg(
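    # each between(...) yields a boolean; cast to int gives 1/0, so the sum counts
    # how many of this name's rows fall inside the corresponding time window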
    F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("1st-12-hrs"),
    F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("2nd-12-hrs"),
    F.sum(F.hour("timestamp").between(0, 7).cast("int")).alias("1st-8-hrs"),
    F.sum(F.hour("timestamp").between(8, 15).cast("int")).alias("2nd-8-hrs"),
    F.sum(F.hour("timestamp").between(16, 23).cast("int")).alias("3rd-8-hrs"),
)

new_df.show()

#+-----+----------+----------+---------+---------+---------+
#|name |1st-12-hrs|2nd-12-hrs|1st-8-hrs|2nd-8-hrs|3rd-8-hrs|
#+-----+----------+----------+---------+---------+---------+
#|Bob  |0         |1         |0        |0        |1        |
#|Alice|0         |1         |0        |0        |1        |
#+-----+----------+----------+---------+---------+---------+
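
If only the counts for name == 'Alice' are needed, the same aggregation can be run on a filtered frame first — a minimal sketch reusing the sdf and column names from the question (alice_df is just an illustrative name):

alice_df = (
    sdf.filter(F.col("name") == "Alice")   # keep only Alice's rows
       .groupBy("name")
       .agg(
           F.sum(F.hour("timestamp").between(0, 11).cast("int")).alias("1st-12-hrs"),
           F.sum(F.hour("timestamp").between(12, 23).cast("int")).alias("2nd-12-hrs"),
           F.sum(F.hour("timestamp").between(0, 7).cast("int")).alias("1st-8-hrs"),
           F.sum(F.hour("timestamp").between(8, 15).cast("int")).alias("2nd-8-hrs"),
           F.sum(F.hour("timestamp").between(16, 23).cast("int")).alias("3rd-8-hrs"),
       )
)
alice_df.show()

Filtering after the aggregation with .filter("name = 'Alice'") would work just as well.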

Comments:
