如何有效地迭代 Pandas 数据帧的连续块
Posted
技术标签:
【中文标题】如何有效地迭代 Pandas 数据帧的连续块【英文标题】:How to iterate over consecutive chunks of Pandas dataframe efficiently 【发布时间】:2014-10-31 04:59:07 【问题描述】:我有一个大数据框(数百万行)。
我希望能够对其执行 groupby 操作,但只是按任意连续(最好是大小相等)的行子集进行分组,而不是使用各个行的任何特定属性来决定它们进入哪个组。
用例:我想通过 IPython 中的并行映射将函数应用于每一行。哪些行进入哪个后端引擎并不重要,因为该函数一次根据一行计算结果。 (至少在概念上;实际上它是矢量化的。)
我想出了这样的东西:
# Generate a number from 0-9 for each row, indicating which tenth of the DF it belongs to
max_idx = dataframe.index.max()
tenths = ((10 * dataframe.index) / (1 + max_idx)).astype(np.uint32)
# Use this value to perform a groupby, yielding 10 consecutive chunks
groups = [g[1] for g in dataframe.groupby(tenths)]
# Process chunks in parallel
results = dview.map_sync(my_function, groups)
但这似乎很冗长,并且不能保证大小相等的块。特别是如果索引是稀疏或非整数或其他。
对更好的方法有什么建议吗?
谢谢!
【问题讨论】:
【参考方案1】:使用numpy的array_split():
import numpy as np
import pandas as pd
data = pd.DataFrame(np.random.rand(10, 3))
for chunk in np.array_split(data, 5):
assert len(chunk) == len(data) / 5, "This assert may fail for the last chunk if data lenght isn't divisible by 5"
【讨论】:
这是最优雅的方法。只是一个简单的内置函数调用,应该是公认的答案。 当数据帧的长度不能被块的数量整除时,该断言将不成立,但这将按预期运行——最后几个数据帧都将比第一个。【参考方案2】:import pandas as pd
def batch(iterable, batch_number=10):
"""
split an iterable into mini batch with batch length of batch_number
supports batch of a pandas dataframe
usage:
for i in batch([1,2,3,4,5], batch_number=2):
print(i)
for idx, mini_data in enumerate(batch(df, batch_number=10)):
print(idx)
print(mini_data)
"""
l = len(iterable)
for idx in range(0, l, batch_number):
if isinstance(iterable, pd.DataFrame):
# dataframe can't split index label, should iter according index
yield iterable.iloc[idx:min(idx+batch_number, l)]
else:
yield iterable[idx:min(idx+batch_number, l)]
【讨论】:
【参考方案3】:Chunks 用于迭代 pandas 数据框和系列的生成器函数
块函数的生成器版本如下所示。此外,此版本适用于 pd.DataFrame 或 pd.Series 的自定义索引(例如浮点类型索引)
import numpy as np
import pandas as pd
df_sz = 14
df = pd.DataFrame(np.random.rand(df_sz,4),
index=np.linspace(0., 10., num=df_sz),
columns=['a', 'b', 'c', 'd']
)
def chunker(seq, size):
for pos in range(0, len(seq), size):
yield seq.iloc[pos:pos + size]
chunk_size = 6
for i in chunker(df, chunk_size):
print(i)
chnk = chunker(df, chunk_size)
print('\n', chnk)
print(next(chnk))
print(next(chnk))
print(next(chnk))
输出是
A B C D 0.000000 0.560627 0.665897 0.683055 0.611884 0.769231 0.241871 0.357080 0.841945 0.340778 1.538462 0.065009 0.234621 0.250644 0.552410 2.307692 0.431394 0.235463 0.755084 0.114852 3.076923 0.173748 0.189739 0.148856 0.031171 3.846154 0.772352 0.697762 0.557806 0.254476 A B C D 4.615385 0.901200 0.977844 0.250316 0.957408 5.384615 0.400939 0.520841 0.863015 0.177043 6.153846 0.356927 0.344220 0.863067 0.400573 6.923077 0.375417 0.156420 0.897889 0.810083 7.692308 0.666371 0.152800 0.482446 0.955556 8.461538 0.242711 0.421591 0.005223 0.200596 A B C D 9.230769 0.735748 0.402639 0.527825 0.595952 10.000000 0.420209 0.365231 0.966829 0.514409 - 生成器对象分块器位于 0x7f503c9d0ba0 第一个“下一个()”: A B C D 0.000000 0.560627 0.665897 0.683055 0.611884 0.769231 0.241871 0.357080 0.841945 0.340778 1.538462 0.065009 0.234621 0.250644 0.552410 2.307692 0.431394 0.235463 0.755084 0.114852 3.076923 0.173748 0.189739 0.148856 0.031171 3.846154 0.772352 0.697762 0.557806 0.254476 第二个“下一个()”: A B C D 4.615385 0.901200 0.977844 0.250316 0.957408 5.384615 0.400939 0.520841 0.863015 0.177043 6.153846 0.356927 0.344220 0.863067 0.400573 6.923077 0.375417 0.156420 0.897889 0.810083 7.692308 0.666371 0.152800 0.482446 0.955556 8.461538 0.242711 0.421591 0.005223 0.200596 第三个“下一个()”: A B C D 9.230769 0.735748 0.402639 0.527825 0.595952 10.000000 0.420209 0.365231 0.966829 0.514409【讨论】:
有重叠的版本可以在这里找到:***.com/a/61799061/501852【参考方案4】:实际上,您不能保证大小相等的块。行数 (N) 可能是素数,在这种情况下,您只能在 1 或 N 处获得相同大小的块。因此,现实世界的分块通常使用固定大小并允许在最后使用更小的块。我倾向于将数组传递给groupby
。起点:
>>> df = pd.DataFrame(np.random.rand(15, 5), index=[0]*15)
>>> df[0] = range(15)
>>> df
0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 13 0.510273 0.051608 0.230402 0.756921
0 14 0.950544 0.576539 0.642602 0.907850
[15 rows x 5 columns]
我故意将索引设置为 0 以使其不提供信息,我们只需确定我们的大小(此处为 10)并将数组除以它:
>>> df.groupby(np.arange(len(df))//10)
<pandas.core.groupby.DataFrameGroupBy object at 0xb208492c>
>>> for k,g in df.groupby(np.arange(len(df))//10):
... print(k,g)
...
0 0 1 2 3 4
0 0 0.746300 0.346277 0.220362 0.172680
0 1 0.657324 0.687169 0.384196 0.214118
0 2 0.016062 0.858784 0.236364 0.963389
[...]
0 8 0.241049 0.246149 0.241935 0.563428
0 9 0.493819 0.918858 0.193236 0.266257
[10 rows x 5 columns]
1 0 1 2 3 4
0 10 0.037693 0.370789 0.369117 0.401041
0 11 0.721843 0.862295 0.671733 0.605006
[...]
0 14 0.950544 0.576539 0.642602 0.907850
[5 rows x 5 columns]
当索引与其不兼容时,基于对 DataFrame 进行切片的方法可能会失败,尽管您始终可以使用 .iloc[a:b]
来忽略索引值并按位置访问数据。
【讨论】:
这就是我的想法!从技术上讲,“df.groupby(np.arange(len(df)) // (len(df) / 10))”可以获得固定数量的组(每个核心 1 个)而不是固定大小。由于某种原因,我没有想到分组键实际上根本不需要与索引相关...... 值得一提的是,为了提高效率,最好使用“迭代器”(pandas.pydata.org/pandas-docs/stable/generated/…)和“块大小”读取原始文件,以便 read_csv 函数读取每个片段如@Ryan 所述,可以传递给单独的进程【参考方案5】:良好环境的标志是有很多选择,所以我从Anaconda Blaze添加这个,真正使用Odo
import blaze as bz
import pandas as pd
df = pd.DataFrame('col1':[1,2,3,4,5], 'col2':[2,4,6,8,10])
for chunk in bz.odo(df, target=bz.chunks(pd.DataFrame), chunksize=2):
# Do stuff with chunked dataframe
【讨论】:
不幸的是,Odo 似乎不再被维护。在撰写本文时,最后一次提交是在 11 个月前,贡献图已经逐渐减少到零。【参考方案6】:我不确定这是否正是您想要的,但我发现another SO thread 上的这些分组函数对于创建多处理器池非常有用。
这是来自该线程的一个简短示例,它可能会执行您想要的操作:
import numpy as np
import pandas as pds
df = pds.DataFrame(np.random.rand(14,4), columns=['a', 'b', 'c', 'd'])
def chunker(seq, size):
return (seq[pos:pos + size] for pos in xrange(0, len(seq), size))
for i in chunker(df,5):
print i
这给了你这样的东西:
a b c d
0 0.860574 0.059326 0.339192 0.786399
1 0.029196 0.395613 0.524240 0.380265
2 0.235759 0.164282 0.350042 0.877004
3 0.545394 0.881960 0.994079 0.721279
4 0.584504 0.648308 0.655147 0.511390
a b c d
5 0.276160 0.982803 0.451825 0.845363
6 0.728453 0.246870 0.515770 0.343479
7 0.971947 0.278430 0.006910 0.888512
8 0.044888 0.875791 0.842361 0.890675
9 0.200563 0.246080 0.333202 0.574488
a b c d
10 0.971125 0.106790 0.274001 0.960579
11 0.722224 0.575325 0.465267 0.258976
12 0.574039 0.258625 0.469209 0.886768
13 0.915423 0.713076 0.073338 0.622967
希望对你有帮助。
编辑
在这种情况下,我以(大约)这种方式将此函数与pool of processors 一起使用:
from multiprocessing import Pool
nprocs = 4
pool = Pool(nprocs)
for chunk in chunker(df, nprocs):
data = pool.map(myfunction, chunk)
data.domorestuff()
我认为这应该与使用 IPython 分布式机器非常相似,但我还没有尝试过。
【讨论】:
这当然可以。我仍然有点坚持一些整洁的 groupby 单线,但如果没有这样的事情发生,你会得到奖品 :-)以上是关于如何有效地迭代 Pandas 数据帧的连续块的主要内容,如果未能解决你的问题,请参考以下文章
如何在迭代 pandas 分组 df 时修复 ValueError?
从增加的位置迭代 Pandas 数据框的所有列后,如何再次返回第一列?