python [Spark中的简单时间戳聚合] #spark

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python [Spark中的简单时间戳聚合] #spark相关的知识,希望对你有一定的参考价值。

import datetime as dt
from pyspark.sql.types import *

from context import initialize

MY_SCHEMA = StructType([
    StructField('ts', TimestampType(), True),
])


if __name__ == '__main__':
    data = [
        (dt.datetime(2017, 9, 1, hour, minute),)
        for hour in range(24)
        for minute in range(0, 60)
    ]

    sc, sqlContext = initialize()
    rdd = sc.parallelize(data)
    df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
    df.createOrReplaceTempView('df')

    agg5m = sqlContext.sql("""
      select window(ts, '5 minutes').start as ts_5min 
      from df
      group by window(ts, '5 minutes')
    """)

    agg1h = sqlContext.sql("""
      select window(ts, '1 hour').start as ts_1hour 
      from df
      group by window(ts, '1 hour')
    """)

    rows5m = agg5m.sort(agg5m.ts_5min).collect()
    rows1h = agg1h.sort(agg1h.ts_1hour).collect()

以上是关于python [Spark中的简单时间戳聚合] #spark的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL:与时间窗口聚合

Spark结构化流 - 使用模式从文件中读取时间戳

Spark Streaming:如何获取一天的时间戳计数?

如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数

使用聚合管道聚合 MongoDB 中的时间戳集合

从 bigQuery 中的时间戳聚合变量