如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)
Posted
技术标签:
【中文标题】如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)【英文标题】:How to resample (Downsample) the time series big data, from 10 Hz (miliseconds) wants to convert to 1 Hz (seconds) using pyspark 【发布时间】:2021-12-28 13:04:10 【问题描述】:我正在使用 pyspark 处理时间序列大数据,我有 GB(100 GB 或更多)的数据,行数以百万或十亿为单位。我是使用 pyspark 的大数据新手。想要重新采样(下采样)数据原始数据以毫秒为单位的时间戳为 10 Hz 我想以秒为单位将此数据转换为 1 Hz。如果您能给我一些想法,那将非常有帮助。如果您可以向我推荐任何我可以用来使用 spark 处理(大数据)大数据的文档/解决方案,那也很棒。以下是样本数据。 DF=
start_timestamp | end_timestamp | value |
---|---|---|
2020-11-05 03:25:02.088 | 2020-11-05 04:10:19.288 | 0.0 |
2020-11-05 04:24:25.288 | 2020-11-05 04:24:25.218 | 0.4375 |
2020-11-05 04:24:25.218 | 2020-11-05 04:24:25.318 | 1.0625 |
2020-11-05 04:24:25.318 | 2020-11-05 04:24:25.418 | 1.21875 |
2020-11-05 04:24:25.418 | 2020-11-05 04:24:25.518 | 1.234375 |
2020-11-05 04:24:25.518 | 2020-11-05 04:24:25.618 | 1.265625 |
2020-11-05 04:24:25.618 | 2020-11-05 04:24:25.718 | 1.28125 |
我尝试了我得到的代码:PySpark: how to resample frequencies
这是我的示例代码:
day = 1 #60 * 60 * 24
epoch = (col("start_timestamp").cast("bigint") / day).cast("bigint") * day
with_epoch = distinctDF.withColumn("epoch", epoch)
min_epoch, max_epoch = with_epoch.select(min_("epoch"), max_("epoch")).first()
ref = spark.range(
min_epoch, max_epoch + 1, day
).toDF("epoch")
(ref
.join(with_epoch, "epoch", "left")
.orderBy("epoch")
.withColumn("start_timestamp_resampled", timestamp_seconds("epoch"))
.show(15, False))
代码正在运行,但我不确定它是否正确:输出如下所示。但它是否在列中显示空值。
epoch | start_timestamp | end_timestamp | value | start_timestamp_resampled |
---|---|---|---|---|
1604546702 | 2020-11-05 03:25:02.088 | 2020-11-05 04:10:19.288 | 0.0 | 2020-11-05 03:25:02 |
1604546703 | null | null | null | 2020-11-05 03:25:03 |
1604546704 | null | null | null | 2020-11-05 03:25:04 |
1604546705 | null | null | null | 2020-11-05 03:25:05 |
1604546706 | null | null | null | 2020-11-05 03:25:06 |
1604546707 | null | null | null | 2020-11-05 03:25:07 |
【问题讨论】:
欢迎来到Stack Overflow.!要求有关问题方法的一般指导的问题通常过于宽泛,不适合本网站。人们有自己解决问题的方法,因此不可能有正确的答案。仔细阅读 Where to Start 和 Minimal Reproducible Example,然后编辑您的帖子。 10Hz的频率不是毫秒周期,所以你的问题标题已经自相矛盾了。 @UlrichEckhardt 我的时间戳看起来像这样“2020-11-05 03:25:02.088”所以 0.088 以毫秒为单位,所有数据都以 10 Hz 记录。所以想将其下采样到 1Hz(即毫秒到秒)。 Hold on:有单位(如1.52s中的秒)、分辨率(如1/50秒或0.02s)和采样率(如20Hz频率或0.05s周期)。请注意,您的时间戳没有单位,而是不同单位的混合形式(不确定正确的术语)。然后,您可以下采样但仍保持分辨率。您还可以转换单位,例如从那种混合形式秒或分钟。此外,当然,您可以放弃精度并舍入到更小的分辨率。你到底想要什么?也许,对于一组示例输入,您可以提供预期的输出。 【参考方案1】:在进行下采样时,您必须考虑如何处理丢失的数据。
使用连接,您只会在时间戳匹配时获取数据。但您也可以决定使用以下方法聚合数据点:均值、最大值、最小值、总和...
我会怎么做:
import pyspark.sql.functions as F
df = df.withColumn("Timestamp_resampled", F.date_trunc(timestamp, format='yyyy-MM-dd HH:mm:ss'))
df = df.groupby("Timestamp_resampled").agg(<function of your choice>)
然后,一旦重新采样,如果您缺少时间戳,您可以使用带有 join
和 epoch_range
的方法来填充缺少的时间戳,并确保每秒都有一个。
【讨论】:
谢谢,因为我的主 DF 列“start_timestamp”以毫秒为单位(格式='yyyy-MM-dd HH:mm:ss.sss)。我有点困惑并得到错误:“语法错误:位置参数遵循关键字参数”。根据您的解决方案,此代码应该进行下采样,对吗? ''' DF1 = DF.withColumn("start_timestamp", F.date_trunc(format='yyyy-MM-dd HH:mm:ss', timestamp)) df = DF1.groupby("start_timestamp").agg(mean) ''' @SSS,尝试传递“时间戳”参数拳头,然后传递“格式”,这就是错误的含义。 感谢解决方案,它运行良好。 :) @SSS,我已经更新了答案,如果可行,请随时接受!以上是关于如何重新采样(下采样)时间序列大数据,从 10 Hz(毫秒)想要使用 pyspark 转换为 1 Hz(秒)的主要内容,如果未能解决你的问题,请参考以下文章