python [Spark中的简单时间戳聚合] #spark
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python [Spark中的简单时间戳聚合] #spark相关的知识,希望对你有一定的参考价值。
import datetime as dt
from pyspark.sql.types import *
from context import initialize
MY_SCHEMA = StructType([
StructField('ts', TimestampType(), True),
])
if __name__ == '__main__':
data = [
(dt.datetime(2017, 9, 1, hour, minute),)
for hour in range(24)
for minute in range(0, 60)
]
sc, sqlContext = initialize()
rdd = sc.parallelize(data)
df = sqlContext.createDataFrame(rdd, schema=MY_SCHEMA)
df.createOrReplaceTempView('df')
agg5m = sqlContext.sql("""
select window(ts, '5 minutes').start as ts_5min
from df
group by window(ts, '5 minutes')
""")
agg1h = sqlContext.sql("""
select window(ts, '1 hour').start as ts_1hour
from df
group by window(ts, '1 hour')
""")
rows5m = agg5m.sort(agg5m.ts_5min).collect()
rows1h = agg1h.sort(agg1h.ts_1hour).collect()
以上是关于python [Spark中的简单时间戳聚合] #spark的主要内容,如果未能解决你的问题,请参考以下文章
Spark SQL:与时间窗口聚合
Spark结构化流 - 使用模式从文件中读取时间戳
Spark Streaming:如何获取一天的时间戳计数?
如何将数组传递给 Spark (UDAF) 中的用户定义聚合函数
使用聚合管道聚合 MongoDB 中的时间戳集合
从 bigQuery 中的时间戳聚合变量