将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件
Posted
技术标签:
【中文标题】将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件【英文标题】:Apply function to each group where the group are splitted in multiple files without concatenating all the files 【发布时间】:2021-10-04 04:43:20 【问题描述】:我的数据来自 BigQuery,以 CSV 文件的形式导出到 GCS 存储桶,如果文件非常大,BigQuery 会自动将数据拆分成几个块。考虑到时间序列,时间序列可能分散在不同的文件中。我有一个自定义函数,我想将其应用于每个 TimeseriesID
。
以下是数据的一些约束:
数据按TimeseriesID
和TimeID
排序
每个文件的行数可能不同,但至少 1 行(这不太可能)
TimeID
的开头并不总是 0
每个时间序列的长度可能会有所不同,但最多只能分散在 2 个文件中。没有时间序列分散在 3 个不同的文件中。
这是说明问题的初始设置:
# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
return np.mean(x)
# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])
如果我可以 concat
所有文件,这应该是非常微不足道的,但问题是如果我 concat
所有数据帧,那么它就无法放入内存。
我想要的输出应该与此类似,但没有 concat
所有文件。
pd.concat([df1,df2,df3],axis=0).groupby('TimeseriesID').agg("value":simple_func)
我也知道vaex
和dask
,但我想暂时坚持使用简单的熊猫。
我也愿意接受涉及修改 BigQuery 以更好地拆分文件的解决方案。
【问题讨论】:
你说你正在使用简单的熊猫。您使用的是普通的Panda
还是Panda GBQ?它不适合记忆,你有什么错误吗?
关于“TimeID”相关性,正如问题中提到的,它只是一个用于排序的值,它可能并不总是 0,但除此之外,没有任何关键信息可能表明它用于所需的输出计算。唯一起作用的列是“TimeseriesID”和“value”
@PjoterS 我不是直接从 BigQuery 读取数据。我将 bigquery 表导出到 GCS 存储桶中的 csv 文件,之后我使用 pd.read_csv 从 GCS 存储桶中读取数据。我将编辑我的问题。
还有关于 TimeID,因为数据已经按 TimeID 排序,是的,使用 TimeID 可能没有用。我只是把它说出来,以便人们了解数据的性质。
【参考方案1】:
op 提出的对数百万条记录使用 concat 的方法对于内存/其他资源来说太过分了。
我已经使用 Google Colab Notebooks 测试了 OP 代码,这是一个糟糕的方法
import pandas as pd
import numpy as np
import time
# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
return np.mean(x)
# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])
start = time.time()
df = pd.concat([df1,df2,df3]).groupby('TimeseriesID').agg("value":custom_func)
elapsed = (time.time() - start)
print(elapsed)
print(df.head())
输出将是:
0.023952960968017578
value
TimeseriesID A 11.666667
B 16.250000
C 20.000000
D 18.333333
如您所见,“concat”需要时间来处理。由于很少有记录,这没有被察觉。 方法应该如下:
-
获取包含您要处理的数据的文件。即:只有可行的列。
根据已处理文件的键和值创建字典。如有必要,获取必要文件中每个键的值。您可以将结果以 json/csv 格式存储在“结果”目录中:
A.csv 将包含所有关键的“A”值 ... n.csv 将包含所有关键的“n”值
-
遍历结果目录并开始在字典中构建最终输出。
'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30 ]
-
对每个键值列表应用自定义函数。
'A': 11.666666666666666, 'B': 16.25, 'C': 20.0, 'D': 18.333333333333332
您可以使用以下代码检查逻辑,我使用 json 存储数据:
from google.colab import files
import json
import pandas as pd
#initial dataset
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])
#get unique keys and its values
df1.groupby('TimeseriesID')['value'].apply(list).to_json('df1.json')
df2.groupby('TimeseriesID')['value'].apply(list).to_json('df2.json')
df3.groupby('TimeseriesID')['value'].apply(list).to_json('df3.json')
#as this is an example you can download the output as jsons
files.download('df1.json')
files.download('df2.json')
files.download('df3.json')
2021 年 6 月 10 日更新 我已经针对 OP 的需求调整了代码。这部分创建精炼文件。
from google.colab import files
import json
#you should use your own function to get the data from the file
def retrieve_data(uploaded,file):
return json.loads(uploaded[file].decode('utf-8'))
#you should use your own function to get a list of files to process
def retrieve_files():
return files.upload()
key_list =[]
#call a function that gets a list of files to process
file_to_process = retrieve_files()
#read every raw file:
for file in file_to_process:
file_data = retrieve_data(file_to_process,file)
for key,value in file_data.items():
if key not in key_list:
key_list.append(key)
with open(f'key.json','w') as new_key_file:
new_json = json.dumps(key:value)
new_key_file.write(new_json)
else:
with open(f'key.json','r+') as key_file:
raw_json = key_file.read()
old_json = json.loads(raw_json)
new_json = json.dumps(key:old_json[key]+value)
key_file.seek(0)
key_file.write(new_json)
for key in key_list:
files.download(f'key.json')
print(key_list)
2021 年 7 月 10 日更新 我已经更新了代码以避免混淆。这部分处理精炼文件。
import time
import numpy as np
#Once we get the refined values we can use it to apply custom functions
def custom_func(x):
return np.mean(x)
#Get key and data content from single json
def get_data(file_data):
content = file_data.popitem()
return content[0],content[1]
#load key list and build our refined dictionary
refined_values = []
#call a function that gets a list of files to process
file_to_process = retrieve_files()
start = time.time()
#read every refined file:
for file in file_to_process:
#read content of file n
file_data = retrieve_data(file_to_process,file)
#parse and apply function per file read
key,data = get_data(file_data)
func_output = custom_func(data)
#start building refined list
refined_values.append([key,func_output])
elapsed = (time.time() - start)
print(elapsed)
df = pd.DataFrame.from_records(refined_values,columns=['TimerSeriesID','value']).sort_values(by=['TimerSeriesID'])
df = df.reset_index(drop=True)
print(df.head())
输出将是:
0.00045609474182128906
TimerSeriesID value
0 A 11.666667
1 B 16.250000
2 C 20.000000
3 D 18.333333
总结:
在处理大型数据集时,您应该始终关注您将要使用的数据并将其保持在最低限度。只使用可行的值。
当操作由基本运算符或 python 本机库执行时,处理时间会更快。
【讨论】:
感谢您的回答,喜欢您的清晰回答。所以基本上对于第一部分,我将为我的每个文件执行一个循环以将其转换为 json.file。对于第二部分,由于我可以访问 GCS 存储桶,而不是一个一个地上传每个文件,我可以列出我的所有文件(使用 glob 之类的东西)并将它们附加到 json。但是这种方法会破坏值的顺序,我认为我们需要修改它以附加时间索引和值对,以便稍后对其进行排序。而且由于您的解决方案对每个文件和每个 json 项使用循环,我认为我们需要在这里进行多处理。 您好 Vinson Ciawandy,请检查更新后的代码。 感谢您的更新。我只是意识到在#read every refined file
中创建refined_values
变量,它与仅读取所有数据但仅以json 格式读取本质上不一样吗?它不会破坏记忆吗?
我认为申请custom_func
的过程可以在这个循环for file in file_to_process:
上完成。它应该工作正常吗?所以不需要收集所有数据,只需为每个 file_data
应用函数,因为每个 file_data
已经包含一个完整的系列?
您好 Vinson Ciawandy,我已经更新了代码来回答您的问题。简而言之,是的,正确的方法应该是单独处理每个文件以避免内存消耗。即使以这种方式处理消耗仍然很高,您也应该将块限制在可管理的大小。此外,您可以使用多处理来加速此过滤过程。以上是关于将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件的主要内容,如果未能解决你的问题,请参考以下文章