在具有非唯一索引列日期的 Dask 数据框中提取最新值

Posted

技术标签:

【中文标题】在具有非唯一索引列日期的 Dask 数据框中提取最新值【英文标题】:Extracting latest values in a Dask dataframe with non-unique index column dates 【发布时间】:2022-01-19 07:12:57 【问题描述】:

我对 pandas 数据帧非常熟悉,但我对 Dask 还是很陌生,所以我仍在努力解决我的代码并行化问题。 我已经使用 pandas 和 pandarallel 获得了我想要的结果,所以我想弄清楚我是否可以使用 Dask 扩大任务或以某种方式加速它。

假设我的数据框将日期时间作为非唯一索引、值列和 id 列。

time                        value   id
2021-01-01 00:00:00.210281  28.08   293707
2021-01-01 00:00:00.279228  28.07   293708
2021-01-01 00:00:00.697341  28.08   293709
2021-01-01 00:00:00.941704  28.08   293710
2021-01-01 00:00:00.945422  28.07   293711
...     ...     ...
2021-01-01 23:59:59.288914  29.84   512665
2021-01-01 23:59:59.288914  29.83   512666
2021-01-01 23:59:59.288914  29.82   512667
2021-01-01 23:59:59.525227  29.84   512668
2021-01-01 23:59:59.784754  29.84   512669

我要提取的是每秒的最新值。例如如果2021-01-01 00:00:01之前的价格是索引为2021-01-01 00:00:00.945422的行,则最新值为28.07

在我的情况下,有时索引值不是唯一的,因此作为决胜局,我想使用id 列。 id 编号最大的值将被视为最新值。对于在时间2021-01-01 23:59:59.288914 绑定的三个值的情况,将选择值29.82,因为该日期的最大id 将是512667。另请注意,id 在整个数据集中并不一致,我不能只依靠它来排序我的数据。

在 pandas 中,我只是通过获取最后一个索引来做到这一点

last_index = df.loc[date_minus60: date_curr].index[-1]
last_values = df.loc[last_index]

然后如果last_values.index.is_unique的值为false,我最后执行last_values.sort_values('id').iloc[-1]

我一直很难将这段代码翻译成 Dask,因为我遇到了关于我的延迟函数的问题,导致他们需要计算才能再次重新索引我的数据帧。

我想知道是否有处理此类问题的最佳做法。

【问题讨论】:

【参考方案1】:

下面的 sn-p 表明它是一个非常相似的语法:

import dask

# generate dask dataframe
ddf = dask.datasets.timeseries(freq="500ms", partition_freq="1h")

# generate a pandas dataframe
df = ddf.partitions[0].compute()  # pandas df for example

# sample dates
date_minus60 = "2000-01-01 00:00:00.000"
date_curr = "2000-01-01 00:00:02.000"

# pandas code
last_index_pandas = df.loc[date_minus60:date_curr].index[-1]
last_values_pandas = df.loc[last_index_pandas]

# dask code
last_index_dask = ddf.loc[date_minus60:date_curr].compute().index[-1]
last_values_dask = ddf.loc[last_index_dask].compute()

# check equality of the results
print(last_values_pandas == last_values_dask)

注意,区别在于dask 版本中的两个.compute 步骤,因为需要计算两个惰性值:第一个是找出正确的索引位置,第二个是获取实际值。这也假设数据已经被时间戳索引,如果不是,最好在加载到dask之前索引数据,因为.set_index通常是一个缓慢的操作。

然而,这取决于你在这之后的真实情况,dask 的使用可能不是很好。如果基本思想是进行快速查找,那么更好的解决方案是使用索引数据库(包括专门的时间序列数据库)。

最后,上面的 sn-p 使用了唯一索引。如果实际数据具有非唯一索引,则在计算 last_values_dask 后,应使用类似这样的方法(伪代码,预计不会立即工作)来处理按最大 id 选择的要求:

def get_largest_id(last_values):
    return last_values.sort_values('id').tail(1)

last_values_dask = get_largest_id(last_values_dask)

如果查找是针对批次(而不是特定的采样日期),则可以设计更好的管道。

【讨论】:

感谢您澄清双 .compute 步骤!我最初尝试过,结果非常缓慢且效率低下。因此,似乎最好将此逻辑卸载到管道的其他部分。我只使用 CSV 和 pandas 使逻辑与多年的数据一起工作,所以现在看来​​我要么必须在使用 dask 之前添加预处理,要么根本不使用 dask 如果你已经用pandas实现了一个逻辑,那么也许可以使用dask.delayed来提高效率,但是不看剩下的代码就不确定了。

以上是关于在具有非唯一索引列日期的 Dask 数据框中提取最新值的主要内容,如果未能解决你的问题,请参考以下文章

合并具有非唯一索引的多个数据帧

命名 Dask 数据框中返回的聚合列

如何绘制日期时间索引数据框中特定列的手动箱线图?

Pandas Dataframe .loc + 更新非唯一日期时间索引?

唯一与非唯一索引

上采样日期时间 - ValueError:无法使用方法或限制重新索引非唯一索引