在 Spark 中创建分箱直方图

Posted

技术标签:

【中文标题】在 Spark 中创建分箱直方图【英文标题】:Creating binned histograms in Spark 【发布时间】:2015-12-29 06:18:39 【问题描述】:

假设我有一个包含以下两列的数据框 (df) (Pandas) 或 RDD (Spark):

timestamp, data
12345.0    10 
12346.0    12

在 Pandas 中,我可以很容易地创建不同 bin 长度的 bin 直方图。例如,要创建超过 1 小时的直方图,我执行以下操作:

df =  df[ ['timestamp', 'data'] ].set_index('timestamp')
df.resample('1H',how=sum).dropna()

从 Spark RDD 迁移到 Pandas df 对我来说非常昂贵(考虑到数据集)。因此,我更愿意尽可能地留在 Spark 域中。

有没有办法在 Spark RDD 或数据帧中做同样的事情?

【问题讨论】:

Spark RDD 或 DataFrame 遗憾地没有索引,并且 spark 不提供低级操作,因为 pandas 绝对没有重新采样 ts 。 最近有一个关于时间序列的 Cloudera Spark 包,它也有 Python doc here。我不知道它是否是您正在寻找的东西,但它确实说它是时间序列的类似熊猫的功能。 WoodChopper:“没有索引”是什么意思?您指的是 Pandas 中可用的“set_index”功能吗? 【参考方案1】:

火花 >= 2.0

你可以使用window函数

from pyspark.sql.functions import window

(df
    .groupBy(window("timestamp", "3 minute").alias("ts"))
    .sum()
    .orderBy("ts")
    .show())
## +--------------------+---------+
## |                  ts|sum(data)|
## +--------------------+---------+
## |2000-01-01 00:00...|        3|
## |2000-01-01 00:03...|       12|
## |2000-01-01 00:06...|       21|
## +--------------------+---------+

(df
    .groupBy(window("timestamp", "3 minute").alias("ts"))
    .sum()
    .orderBy("ts")
    .show())

## +--------------------+---------+
## |                  ts|sum(data)|
## +--------------------+---------+
## |2000-01-01 00:00...|       36|
## +--------------------+---------+

火花

在这种特殊情况下,您只需要 Unix 时间戳和基本算术:

from pyspark.sql.functions import timestamp_seconds

def resample_to_minute(c, interval=1):
    t = 60 * interval
    # For Spark < 3.1 
    # return (floor(c / t) * t).cast("timestamp")
    return timestamp_seconds(floor(c / t) * t)

def resample_to_hour(c, interval=1):
    return resample_to_minute(c, 60 * interval)

df = sc.parallelize([
    ("2000-01-01 00:00:00", 0), ("2000-01-01 00:01:00", 1),
    ("2000-01-01 00:02:00", 2), ("2000-01-01 00:03:00", 3),
    ("2000-01-01 00:04:00", 4), ("2000-01-01 00:05:00", 5),
    ("2000-01-01 00:06:00", 6), ("2000-01-01 00:07:00", 7),
    ("2000-01-01 00:08:00", 8)
]).toDF(["timestamp", "data"])

(df.groupBy(resample_to_minute(unix_timestamp("timestamp"), 3).alias("ts"))
    .sum().orderBy("ts").show(3, False))

## +---------------------+---------+
## |ts                   |sum(data)|
## +---------------------+---------+
## |2000-01-01 00:00:00.0|3        |
## |2000-01-01 00:03:00.0|12       |
## |2000-01-01 00:06:00.0|21       |
## +---------------------+---------+

(df.groupBy(resample_to_hour(unix_timestamp("timestamp")).alias("ts"))
    .sum().orderBy("ts").show(3, False))
## +---------------------+---------+
## |ts                   |sum(data)|
## +---------------------+---------+
## |2000-01-01 00:00:00.0|36       |
## +---------------------+---------+

来自pandas.DataFrame.resample documentation 的示例数据。

一般情况见Making histogram with Spark DataFrame column

【讨论】:

【参考方案2】:

这是一个使用 RDD 而不是数据帧的答案:

# Generating some data to test with 
import random
import datetime

startTS = 12345.0
array = [(startTS+60*k, random.randrange(10, 20)) for k in range(150)]

# Initializing a RDD
rdd = sc.parallelize(array)

# I first map the timestamps to datetime objects so I can use the datetime.replace 
# method to round the times
formattedRDD = (rdd
                .map(lambda (ts, data): (datetime.fromtimestamp(int(ts)), data))
                .cache())

# Putting the minute and second fields to zero in datetime objects is 
# exactly like rounding per hour. You can then reduceByKey to aggregate bins.
hourlyRDD = (formattedRDD
             .map(lambda (time, msg): (time.replace(minute=0, second=0), 1))
             .reduceByKey(lambda a, b : a + b))

hourlyHisto = hourlyRDD.collect()
print hourlyHisto
> [(datetime.datetime(1970, 1, 1, 4, 0), 60), (datetime.datetime(1970, 1, 1, 5, 0), 55), (datetime.datetime(1970, 1, 1, 3, 0), 35)]

为了进行每日汇总,您可以使用 time.date() 而不是 time.replace(...)。此外,要从非圆形日期时间对象开始每小时装箱,您可以将原始时间增加增量到最接近的圆形小时。

【讨论】:

以上是关于在 Spark 中创建分箱直方图的主要内容,如果未能解决你的问题,请参考以下文章

如何在 R 中绘制预分箱直方图

在 javascript 中将数组分箱以获得直方图

R语言plotly可视化:plotly可视化多个数据集归一化直方图(historgram)设置不同的直方图使用不同的分箱大小(bin size)在直方图的底部边缘添加边缘轴须图rug

使用gnuplot的直方图?

R语言plotly可视化:可视化多个数据集归一化直方图(historgram)并在直方图中添加密度曲线kde设置不同的直方图使用不同的分箱大小(bin size)在直方图的底部边缘添加边缘轴须图

使用分箱 X 值 Python 制作条形图