如何使用 TimeGrouper 遍历包含不同范围的多个文件
Posted
技术标签:
【中文标题】如何使用 TimeGrouper 遍历包含不同范围的多个文件【英文标题】:How to iterate over mulltiple files containing different ranges with TimeGrouper 【发布时间】:2017-11-01 12:24:28 【问题描述】:我有一组文件。每个文件有 1 秒的数据。此外,这些文件不是定期的,即它们不是每日文件。例如,一个文件可能包含一天半的数据,而下一个文件可能包含 3 天 2 小时;文件之间和文件内部可能存在间隙。另一个问题是同时加载内存中的所有文件是不切实际的。
这是一个显示问题的具体示例。以下数据框有一天半 1 秒的数据:
index = pd.date_range('now', periods=60*60*24*1.5, freq='1S')
data_a = pd.DataFrame(np.random.rand(len(index)), index=index, columns=['data'])
下一个数据帧从前一个数据帧停止的地方开始,它有两天的数据:
index = pd.date_range(data_a.index[-1] + pd.Timedelta('1S'), periods=60*60*24*2, freq='1S')
data_b = pd.DataFrame(np.random.rand(len(index)), index=index, columns=['data'])
让我们在每个数据帧上创建 10 分钟的迭代器并 chain 它们:
ia = iter(data_a.groupby(TimeGrouper('10Min')))
ib = iter(data_b.groupby(TimeGrouper('10Min')))
iaib = chain(ia, ib)
如果我们迭代 iaib
,我们期望的行为是只查看每个组键(及其数据)一次,但事实并非如此。
seen =
for name, group in iaib:
count = seen.get(name, 0)
seen[name] = count + 1
seen_twice = key: value for key, value in seen.items() if value > 1
seen_twice
的内容是:
Timestamp('2017-06-02 08:50:00', freq='10T'): 2
在本例中,2017-06-02 08:50:00
是最后一组 data_a
和第一组 data_b
的键。
如何以 10 分钟为单位对所有文件进行迭代,而不在文件边缘重复组?
【问题讨论】:
感谢您的反馈,希望我已经改进了这个问题。 【参考方案1】:解决方案有两部分:一是将所有文件作为单个数据集处理;另一个是考虑到一个 10 分钟的组可以在一个文件的结尾和下一个文件的开始之间分割。
这些是必需的导入:
from itertools import chain
import pandas as pd
from pandas.tseries.resample import TimeGrouper
将所有文件作为单个数据集处理
此函数返回给定文件的 10 分钟组的迭代器:
def make_iterator(file):
df = pd.read_csv(file, index_col='timestamp', parse_dates=['timestamp'])
return iter(df.groupby(TimeGrouper('10Min')))
上述函数用于创建带有itertools.chain
的迭代器的迭代器。给定一个文件列表,可以像这样创建对所有 10 分钟文件集合组的单个迭代器:
files = ... # list obtained by os.listdir() or glob.glob()
iterator_of_single_file_group_iterators = map(make_iterator, files)
chained_file_group_iterator = chain.from_iterable(iterator_of_single_file_group_iterators)
考虑到一个组可以在一个文件的结尾和下一个文件的开头之间分割
但是,上面的迭代器不知道跨越两个文件的 10 分钟组。下面的课程解决了这个问题:
class TimeGrouperChainDecorator(object):
def __init__(self, iterator):
self.iterator = iterator
self._has_more = True
self._last_item = next(self.iterator)
def __iter__(self):
return self
def __next__(self):
if not self._has_more:
raise StopIteration
try:
return self._next()
except StopIteration:
self._has_more = False
if self._last_item is not None:
return self._last_item
raise StopIteration
def _next(self):
new_key, new_data = next(self.iterator)
last_key, last_data = self._last_item
if new_key == last_key:
data = pd.concat([last_data, new_data])
try:
self._last_item = next(self.iterator)
except StopIteration:
self._has_more = False
return new_key, data
else:
self._last_item = new_key, new_data
return last_key, last_data
请注意,实现完全依赖于 pandas groupby
API。要使用它,请使用上述链式迭代器创建该类的实例:
iterator = TimeGrouperChainDecorator(chained_file_group_iterator)
for name, group in iterator:
# do something with each 10 minute group
我的实现可能并不完美,因此欢迎任何反馈。我已经发布了snippet with 3 tests。
【讨论】:
以上是关于如何使用 TimeGrouper 遍历包含不同范围的多个文件的主要内容,如果未能解决你的问题,请参考以下文章
循环遍历范围,如果单元格包含值,则复制到列中的下一个空单元格