在给定开始和结束时间的情况下计算并发会话
Posted
技术标签:
【中文标题】在给定开始和结束时间的情况下计算并发会话【英文标题】:Calculating concurrent sessions given a start and end time 【发布时间】:2020-06-03 14:50:22 【问题描述】:我需要能够根据如下数百万行数据计算出在任何给定时间每分钟正在运行的会话数。
我尝试融合数据框并创建了一个等于 1 或 -1 的新列,具体取决于它是开始还是结束。将其总结并按时间分组,我希望会奏效。
问题是,很多会话可能会在前一天开始,因为我只能在今天查询,所以当这些会话结束时,我在凌晨时会得到 MINUS 200 万。
有没有一种好的方法可以做到这一点并绘制图表,以便显示任何给定时间的会话数?
2020-05-31 00:00:01 | 2020-05-31 00:00:31
2020-05-31 00:01:01 | 2020-05-31 00:02:01
2020-05-31 00:02:01 | 2020-05-31 00:06:03
2020-05-31 00:03:01 | 2020-05-31 00:04:01
2020-05-31 00:04:01 | 2020-05-31 00:34:01
【问题讨论】:
您能否澄清一下,您共享会话的开始和结束时间的两列? 是的开始时间和结束时间 【参考方案1】:所以我有一个尝试,也许有人可以借鉴(否则它可能会打扰到足以提供更好答案的人?)?这是你的数据,我刚刚添加了列名:
In[1]: df
Out[1]:
Session_Starts Session_Ends
0 2020-05-31 00:00:01 2020-05-31 00:00:31
1 2020-05-31 00:01:01 2020-05-31 00:02:01
2 2020-05-31 00:02:01 2020-05-31 00:06:03
3 2020-05-31 00:03:01 2020-05-31 00:04:01
4 2020-05-31 00:04:01 2020-05-31 00:34:01
我将每个会话的开始和结束时间缩短一分钟,然后在这些新的开始时间和结束时间之间创建一个date_range
(以分钟为频率)。当每个会话处于活动状态时,这会给出一组独特的分钟数数组。然后,我将此列表解压缩为 Series
并获取 value_counts()
。
import pandas as pd
import numpy as np
from itertools import chain
session_starts = (x - pd.Timedelta(seconds=x.second) for x in df['Session_Starts'])
session_ends = (x - pd.Timedelta(seconds=x.second) for x in df['Session_Ends'])
ranges = (pd.date_range(x,y,freq='1T') for x,y in zip(session_starts,session_ends))
ranges = pd.Series(chain.from_iterable(ranges))
output = ranges.value_counts(sort=False).sort_index()
输出:
2020-05-31 00:00:00 1
2020-05-31 00:01:00 1
2020-05-31 00:02:00 2
2020-05-31 00:03:00 2
2020-05-31 00:04:00 3
2020-05-31 00:05:00 2
2020-05-31 00:06:00 2
2020-05-31 00:07:00 1
2020-05-31 00:08:00 1
...
2020-05-31 00:33:00 1
2020-05-31 00:34:00 1
dtype: int64
问题在于规模,也就是您所说的数百万次观察。我正在尝试使用长度低于 100 万的玩具数据,但它已经开始需要很长时间了:
SIZE = 100000
dr = pd.date_range(start='01-01-2020',end='1-02-2020',freq='1T')
col1 = np.random.choice(dr, SIZE)
deltas = pd.Series([pd.Timedelta(minutes = r) for r in np.random.randint(0,10,size=SIZE)])
col2 = col1 + deltas
df = pd.DataFrame('Session_Starts':col1,'Session_Ends':col2)
使用timeit
,通过上面相同的代码运行这个df
需要超过20秒。我相信时间与行数成线性关系。
我想不出更好的办法,但我相信一定有;我很想知道如何改进它(或者只是一个更好的解决方案)。希望这能有所帮助,或者至少能让事情顺利进行。
【讨论】:
非常感谢您的回复。我将尝试在 Spark 上的 Koalas 中运行它,看看我们是否可以通过并行化来缩短执行时间:)【参考方案2】:我最初的方法是创建一个DatetimeIndex
,它表示包含数据中所有事件的时间段,然后为每个事件创建一个与索引具有相同维度的数组,其值为1
或True
,当事件发生时,0
或False
,否则。添加这些数组会产生每次的并发事件总数。 much better approach 仅考虑新事件开始 (+1
) 或结束 (-1
) 的时间,然后计算这些更改的累积总和。我们可以通过重新索引和填充将这些结果扩展到包含事件的整个时期。
加载数据
import pandas as pd
# Data from the question
data = [['2020-05-31 00:00:01', '2020-05-31 00:00:31'],
['2020-05-31 00:01:01', '2020-05-31 00:02:01'],
['2020-05-31 00:02:01', '2020-05-31 00:06:03'],
['2020-05-31 00:03:01', '2020-05-31 00:04:01'],
['2020-05-31 00:04:01', '2020-05-31 00:34:01']]
# The data as a DataFrame
df = pd.DataFrame(data, columns=['Start time', 'End time'], dtype='datetime64[ns]')
创建DatetimeIndex
频率与事件时间戳的时间粒度相匹配是有意义的。
min_time = df['Start time'].min()
max_time = df['End time'].max()
ts_index = pd.date_range(min_time, max_time, freq = 's')
计算并发
在前两个方法中,我们创建了一个数据结构,它对应于一个与每个事件的索引具有相同维度的数组。这些数组指示事件发生的时间。如果有很多事件,最好创建一个迭代器,否则我们可能会耗尽内存。第三种方法侧重于事件的开始和结束,而不是对整个时期的单个事件进行表征。
1.有一个系列
这个小例子没有内存不足的风险,所以我们创建了一系列数组并添加它们。
concurrency_array = df.apply(lambda e: ((ts_index >= e[0]) & (ts_index <= e[1])).astype(int), axis='columns').sum()
concurrency = pd.Series(concurrency_array, index = ts_index)
2.使用迭代器
这将避免一次将所有数组加载到内存中。请注意,这里我们使用 python map
和 sum
函数而不是 pandas 构造。
concurrency_iter = map(lambda e: (ts_index >= e[0]) & (ts_index <= e[1]), df.values)
concurrency = pd.Series(sum(concurrency_iter), index = ts_index)
3.有一系列的唯一变化(Best)
这种方法比我想出的任何方法都快大大,而且总的来说它更好。我是从this answer 那里得到这个想法的。
基本上,我们创建一个系列,其中包含所有事件的所有开始和结束时间,开始时间的值为1
,结束时间的值为-1
。然后我们groupby
索引值和sum
,这会产生一个包含所有更改(即事件开始、结束以及两者的任意组合)的系列。然后我们取累积总和 (cumsum
),它会在并发事件发生变化时产生总并发事件,也就是说,在至少一个事件开始或结束的时间。要获得整个周期的结果,我们只需 reindex
使用我们之前创建的索引并向前填充 (ffill
)。
starts = pd.Series(1, df['Start time'])
ends = pd.Series(-1, df['End time'] + pd.Timedelta('1 sec')) # Include last second
concurrency_changes = pd.concat([starts, ends]) \
.groupby(level=0).sum() \
.cumsum()
concurrency = concurrency_changes.reindex(ts_index, method='ffill')
结果
上述所有方法的结果是一个系列,其索引是我们之前创建的DatetimeIndex
,其值是我们数据中并发事件的总数。
重采样
现在我们有了一个包含并发数据的 Series,我们可以在方便时重新采样。例如,如果我们正在调查某个资源的最大利用率,我们可能会这样做:
In [5]: concurrency.resample('5T').max()
Out[5]:
2020-05-31 00:00:00 3
2020-05-31 00:05:00 2
2020-05-31 00:10:00 1
2020-05-31 00:15:00 1
2020-05-31 00:20:00 1
2020-05-31 00:25:00 1
2020-05-31 00:30:00 1
Freq: 5T, dtype: int64
【讨论】:
以上是关于在给定开始和结束时间的情况下计算并发会话的主要内容,如果未能解决你的问题,请参考以下文章
给定具有开始和结束时间的事件,如何使用 Spark 计算同时发生的事件数?
在给定日期时间连续性的情况下,Pandas 输出日期、开始和结束时间以及事件状态