如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)

Posted

技术标签:

【中文标题】如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)【英文标题】:How to resample (Downsample) the time series big data, from 10 Hz (miliseconds) wants to convert to 1 Hz (seconds) using pyspark 【发布时间】:2021-12-28 13:04:10 【问题描述】:

我正在使用 pyspark 处理时间序列大数据,我有 GB(100 GB 或更多)的数据,行数以百万或十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果您能给我一些想法,那将非常有帮助。如果您可以向我推荐任何我可以用来使用 spark 处理(大数据)大数据的文档/解决方案,那也很棒。以下是样本数据。 DF=

start_timestamp end_timestamp value
2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0
2020-11-05 04:24:25.288 2020-11-05 04:24:25.218 0.4375
2020-11-05 04:24:25.218 2020-11-05 04:24:25.318 1.0625
2020-11-05 04:24:25.318 2020-11-05 04:24:25.418 1.21875
2020-11-05 04:24:25.418 2020-11-05 04:24:25.518 1.234375
2020-11-05 04:24:25.518 2020-11-05 04:24:25.618 1.265625
2020-11-05 04:24:25.618 2020-11-05 04:24:25.718 1.28125

我尝试了我得到的代码:PySpark: how to resample frequencies

这是我的示例代码:

day = 1   #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day

with_epoch = distinctDF.withColumn("epoch", epoch)

min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()


ref = spark.range(
    min_epoch, max_epoch + 1, day
).toDF("epoch")  
(ref
    .join(with_epoch, "epoch", "left")
    .orderBy("epoch")
    .withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
    .show(15, False))

代码正在运行,但我不确定它是否正确:输出如下所示。但它是否在列中显示空值。

epoch start_timestamp end_timestamp value start_timestamp_resampled
1604546702 2020-11-05 03:25:02.088 2020-11-05 04:10:19.288 0.0 2020-11-05 03:25:02
1604546703 null null null 2020-11-05 03:25:03
1604546704 null null null 2020-11-05 03:25:04
1604546705 null null null 2020-11-05 03:25:05
1604546706 null null null 2020-11-05 03:25:06
1604546707 null null null 2020-11-05 03:25:07

【问题讨论】:

欢迎来到Stack Overflow.!要求有关问题方法的一般指导的问题通常过于宽泛,不适合本网站。人们有自己解决问题的方法,因此不可能有正确的答案。仔细阅读 Where to Start 和 Minimal Reproducible Example,然后编辑您的帖子。 10Hz的频率不是毫秒周期,所以你的问题标题已经自相矛盾了。 @UlrichEckhardt 我的时间戳看起来像这样“2020-11-05 03:25:02.088”所以 0.088 以毫秒为单位,所有数据都以 10 Hz 记录。所以想将其下采样到 1Hz(即毫秒到秒)。 Hold on:有单位(如1.52s中的秒)、分辨率(如1/50秒或0.02s)和采样率(如20Hz频率或0.05s周期)。请注意,您的时间戳没有单位,而是不同单位的混合形式(不确定正确的术语)。然后,您可以下采样但仍保持分辨率。您还可以转换单位,例如从那种混合形式秒或分钟。此外,当然,您可以放弃精度并舍入到更小的分辨率。你到底想要什么?也许,对于一组示例输入,您可以提供预期的输出。 【参考方案1】:

在进行下采样时,您必须考虑如何处理丢失的数据。

使用连接,您只会在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:均值、最大值、最小值、总和...

我会怎么做:

import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)

然后,一旦重新采样,如果您缺少时间戳,您可以使用带有 joinepoch_range 的方法来填充缺少的时间戳,并确保每秒都有一个。

【讨论】:

谢谢,因为我的主 DF 列“start_timestamp”以毫秒为单位(格式='yyyy-MM-dd HH:mm:ss.sss)。我有点困惑并得到错误:“语法错误:位置参数遵循关键字参数”。根据您的解决方案,此代码应该进行下采样,对吗? ''' DF1 = DF.withColumn("start_timestamp", F.date_trunc(format='yyyy-MM-dd HH:mm:ss', timestamp)) df = DF1.groupby("start_timestamp").agg(mean) ''' @SSS,尝试传递“时间戳”参数拳头,然后传递“格式”,这就是错误的含义。 感谢解决方案,它运行良好。 :) @SSS,我已经更新了答案,如果可行,请随时接受!

以上是关于如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)的主要内容,如果未能解决你的问题,请参考以下文章

猪:如何重新采样时间序列数据?

如何在管道中重新采样文本(不平衡组)?

如何在不更改特定列的情况下对数据框中的数据进行重新采样?

如何在不更改特定列的情况下对数据框中的数据进行重新采样?

如何将 pandas Dataframe 时间序列数据从 8hz 重新采样到 16hz?

python中数据的随机采样