将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件

Posted

技术标签:

【中文标题】将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件【英文标题】:Apply function to each group where the group are splitted in multiple files without concatenating all the files 【发布时间】:2021-10-04 04:43:20 【问题描述】:

我的数据来自 BigQuery,以 CSV 文件的形式导出到 GCS 存储桶,如果文件非常大,BigQuery 会自动将数据拆分成几个块。考虑到时间序列,时间序列可能分散在不同的文件中。我有一个自定义函数,我想将其应用于每个 TimeseriesID

以下是数据的一些约束:

数据按TimeseriesIDTimeID排序 每个文件的行数可能不同,但至少 1 行(这不太可能) TimeID 的开头并不总是 0 每个时间序列的长度可能会有所不同,但最多只能分散在 2 个文件中。没有时间序列分散在 3 个不同的文件中。

这是说明问题的初始设置:

# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID
def custom_func(x):
    return np.mean(x) 

# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])

如果我可以 concat 所有文件,这应该是非常微不足道的,但问题是如果我 concat 所有数据帧,那么它就无法放入内存。

我想要的输出应该与此类似,但没有 concat 所有文件。

pd.concat([df1,df2,df3],axis=0).groupby('TimeseriesID').agg("value":simple_func) 

我也知道vaexdask,但我想暂时坚持使用简单的熊猫。 我也愿意接受涉及修改 BigQuery 以更好地拆分文件的解决方案。

【问题讨论】:

你说你正在使用简单的熊猫。您使用的是普通的Panda 还是Panda GBQ?它不适合记忆,你有什么错误吗? 关于“TimeID”相关性,正如问题中提到的,它只是一个用于排序的值,它可能并不总是 0,但除此之外,没有任何关键信息可能表明它用于所需的输出计算。唯一起作用的列是“TimeseriesID”和“value” @PjoterS 我不是直接从 BigQuery 读取数据。我将 bigquery 表导出到 GCS 存储桶中的 csv 文件,之后我使用 pd.read_csv 从 GCS 存储桶中读取数据。我将编辑我的问题。 还有关于 TimeID,因为数据已经按 TimeID 排序,是的,使用 TimeID 可能没有用。我只是把它说出来,以便人们了解数据的性质。 【参考方案1】:

op 提出的对数百万条记录使用 concat 的方法对于内存/其他资源来说太过分了。

我已经使用 Google Colab Notebooks 测试了 OP 代码,这是一个糟糕的方法

import pandas as pd
import numpy as np
import time

# Please take note this is just for simplicity. The actual goal is not to calculate mean for all group, but to apply a custom_func to each Timeseries ID

def  custom_func(x):
    return np.mean(x)

# Please take note this is just for simplicity. In actual, I read the file one by one since reading all the data is not possible
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])

start = time.time()
df = pd.concat([df1,df2,df3]).groupby('TimeseriesID').agg("value":custom_func)
elapsed = (time.time() - start)

print(elapsed)
print(df.head())

输出将是:

0.023952960968017578 
                value 
TimeseriesID A 11.666667 
             B 16.250000 
             C 20.000000 
             D 18.333333

如您所见,“concat”需要时间来处理。由于很少有记录,这没有被察觉。 方法应该如下:

    获取包含您要处理的数据的文件。即:只有可行的列。 根据已处理文件的键和值创建字典。如有必要,获取必要文件中每个键的值。您可以将结果以 json/csv 格式存储在“结果”目录中:

A.csv 将包含所有关键的“A”值 ... n.csv 将包含所有关键的“n”值

    遍历结果目录并开始在字典中构建最终输出。

'A': [10, 20, 5], 'B': [30, 10, 20, 5], 'C': [30, 10], 'D': [20, 5, 30 ]

    对每个键值列表应用自定义函数。

'A': 11.666666666666666, 'B': 16.25, 'C': 20.0, 'D': 18.333333333333332

您可以使用以下代码检查逻辑,我使用 json 存储数据:

from google.colab import files
import json
import pandas as pd

#initial dataset
df1 = pd.DataFrame("TimeseriesID":['A','A','A','B'],"TimeID":[0,1,2,4],"value":[10,20,5,30])
df2 = pd.DataFrame("TimeseriesID":['B','B','B','C'],"TimeID":[5,6,7,8],"value":[10,20,5,30])
df3 = pd.DataFrame("TimeseriesID":['C','D','D','D'],"TimeID":[9,1,2,3],"value":[10,20,5,30])

#get unique keys and its values
df1.groupby('TimeseriesID')['value'].apply(list).to_json('df1.json')
df2.groupby('TimeseriesID')['value'].apply(list).to_json('df2.json')
df3.groupby('TimeseriesID')['value'].apply(list).to_json('df3.json')

#as this is an example you can download the output as jsons
files.download('df1.json')
files.download('df2.json')
files.download('df3.json')

2021 年 6 月 10 日更新 我已经针对 OP 的需求调整了代码。这部分创建精炼文件。

from google.colab import files
import json

#you should use your own function to get the data from the file
def retrieve_data(uploaded,file):
  return json.loads(uploaded[file].decode('utf-8'))

#you should use your own function to get a list of files to process
def retrieve_files():
  return files.upload()

key_list =[]
#call a function that gets a list of files to process
file_to_process = retrieve_files()

#read every raw file:
for file in file_to_process: 
  file_data = retrieve_data(file_to_process,file)

  for key,value in file_data.items(): 
    if key not in key_list: 
      key_list.append(key)
      with open(f'key.json','w') as new_key_file:
        new_json = json.dumps(key:value)
        new_key_file.write(new_json)

    else:
      with open(f'key.json','r+') as key_file:
        raw_json = key_file.read()
        old_json = json.loads(raw_json)
        new_json = json.dumps(key:old_json[key]+value)

        key_file.seek(0)
        key_file.write(new_json)

for key in key_list:
  files.download(f'key.json')

print(key_list)

2021 年 7 月 10 日更新 我已经更新了代码以避免混淆。这部分处理精炼文件。

import time
import numpy as np

#Once we get the refined values we can use it to apply custom functions
def custom_func(x):
    return np.mean(x) 

#Get key and data content from single json
def get_data(file_data):
    content = file_data.popitem()
    return content[0],content[1]

#load key list and build our refined dictionary
refined_values = []

#call a function that gets a list of files to process
file_to_process = retrieve_files()

start = time.time()
#read every refined file:
for file in file_to_process: 
  #read content of file n
  file_data = retrieve_data(file_to_process,file)
  
  #parse and apply function per file read
  key,data = get_data(file_data)
  func_output = custom_func(data)

  #start building refined list
  refined_values.append([key,func_output])

elapsed = (time.time() - start)
print(elapsed)
  
df = pd.DataFrame.from_records(refined_values,columns=['TimerSeriesID','value']).sort_values(by=['TimerSeriesID'])
df = df.reset_index(drop=True)
print(df.head())

输出将是:

0.00045609474182128906
  TimerSeriesID      value
0             A  11.666667
1             B  16.250000
2             C  20.000000
3             D  18.333333

总结:

在处理大型数据集时,您应该始终关注您将要使用的数据并将其保持在最低限度。只使用可行的值。

当操作由基本运算符或 python 本机库执行时,处理时间会更快。

【讨论】:

感谢您的回答,喜欢您的清晰回答。所以基本上对于第一部分,我将为我的每个文件执行一个循环以将其转换为 json.file。对于第二部分,由于我可以访问 GCS 存储桶,而不是一个一个地上传每个文件,我可以列出我的所有文件(使用 glob 之类的东西)并将它们附加到 json。但是这种方法会破坏值的顺序,我认为我们需要修改它以附加时间索引和值对,以便稍后对其进行排序。而且由于您的解决方案对每个文件和每个 json 项使用循环,我认为我们需要在这里进行多处理。 您好 Vinson Ciawandy,请检查更新后的代码。 感谢您的更新。我只是意识到在#read every refined file 中创建refined_values 变量,它与仅读取所有数据但仅以json 格式读取本质上不一样吗?它不会破坏记忆吗? 我认为申请custom_func的过程可以在这个循环for file in file_to_process:上完成。它应该工作正常吗?所以不需要收集所有数据,只需为每个 file_data 应用函数,因为每个 file_data 已经包含一个完整的系列? 您好 Vinson Ciawandy,我已经更新了代码来回答您的问题。简而言之,是的,正确的方法应该是单独处理每个文件以避免内存消耗。即使以这种方式处理消耗仍然很高,您也应该将块限制在可管理的大小。此外,您可以使用多处理来加速此过滤过程。

以上是关于将功能应用于每个组,其中组被拆分为多个文件,而不连接所有文件的主要内容,如果未能解决你的问题,请参考以下文章

将字符串拆分为具有多个分隔符的多个字符串而不删除?

如何将具有一组子命令的Click命令拆分为多个文件?

如何将数据帧拆分为多个数据帧,其中每个数据帧包含相等但随机的数据[重复]

将 SVG 组转换为多个“单独的”PN​​G 文件

在 Power Query 中拆分列而不转换为文本?

如何将每个带有一组子命令的 Click 命令拆分为多个文件?