Python - 使用巨大的数据集避免内存错误

Posted

技术标签:

【中文标题】Python - 使用巨大的数据集避免内存错误【英文标题】:Python - avoiding memory error with HUGE data set 【发布时间】:2017-01-27 09:44:39 【问题描述】:

我有一个连接到 PostGreSQL 数据库的 python 程序。在这个数据库中,我有很多数据(大约 12 亿行)。幸运的是,我不必同时分析所有这些行。

这 12 亿行分布在多个表(大约 30 个)上。目前我正在访问一个名为 table_3 的表,我想在其中访问所有具有特定“did”值的行(因为该列被调用)。

我已经使用 SQL 命令计算了行数:

SELECT count(*) FROM table_3 WHERE did='356002062376054';

返回 1.57 亿行。

我将对所有这些行执行一些“分析”(提取 2 个特定值)并对这些值进行一些计算,然后将它们写入字典,然后将它们保存回 PostGreSQL 的另一个表中。

问题是我正在创建大量列表和字典来管理所有这些,即使我使用的是 64 位 Python 3 并且有 64 GB 的 RAM,我最终也会耗尽内存。

一些代码:

CONNECTION = psycopg2.connect('<psycopg2 formatted string>')
CURSOR = CONNECTION.cursor()

DID_LIST = ["357139052424715",
            "353224061929963",
            "356002064810514",
            "356002064810183",
            "358188051768472",
            "358188050598029",
            "356002061925067",
            "358188056470108",
            "356002062376054",
            "357460064130045"]

SENSOR_LIST = [1, 2, 3, 4, 5, 6, 7, 8, 9,
               10, 11, 12, 13, 801, 900, 901,
               902, 903, 904, 905, 906, 907,
               908, 909, 910, 911]

for did in did_list:
    table_name = did
    for sensor_id in sensor_list:
        rows = get_data(did, sensor_id)
        list_object = create_standard_list(sensor_id, rows)  # Happens here
        formatted_list = format_table_dictionary(list_object) # Or here
        pushed_rows = write_to_table(table_name, formatted_list) #write_to_table method is omitted as that is not my problem.

def get_data(did, table_id):
    """Getting data from postgresql."""
    table_name = "table_0".format(table_id)
    query = """SELECT * FROM 0 WHERE did='1'
               ORDER BY timestamp""".format(table_name, did)

    CURSOR.execute(query)
    CONNECTION.commit()
    
    return CURSOR

def create_standard_list(sensor_id, data):
    """Formats DB data to dictionary"""
    list_object = []

    print("Create standard list")
    for row in data: # data is the psycopg2 CURSOR
        row_timestamp = row[2]
        row_data = row[3]

        temp_object = "sensor_id": sensor_id, "timestamp": row_timestamp,
                       "data": row_data

        list_object.append(temp_object)

    return list_object


def format_table_dictionary(list_dict):
    """Formats dictionary to simple data
       table_name = (dates, data_count, first row)"""
    print("Formatting dict to DB")
    temp_today = 0
    dict_list = []
    first_row = 
    count = 1

    for elem in list_dict:
        # convert to seconds
        date = datetime.fromtimestamp(elem['timestamp'] / 1000)
        today = int(date.strftime('%d'))
        if temp_today is not today:
            if not first_row:
                first_row = elem['data']
            first_row_str = str(first_row)
            dict_object = "sensor_id": elem['sensor_id'],
                           "date": date.strftime('%d/%m-%Y'),
                           "reading_count": count,
                           # size in MB of data
                           "approx_data_size": (count*len(first_row_str)/1000),
                           "time": date.strftime('%H:%M:%S'),
                           "first_row": first_row

            dict_list.append(dict_object)
            first_row = 
            temp_today = today
            count = 0
        else:
            count += 1

    return dict_list

我的错误发生在创建代码中标有 cmets 的两个列表中的任何一个的某个地方。它代表我的计算机停止响应,并最终将我注销。如果这很重要,我正在运行 Windows 10。

我知道我使用“create_standard_list”方法创建的第一个列表可能会被排除在外,并且该代码可以在“format_table_dictionary”代码中运行,从而避免内存中包含 157 个 mio 元素的列表,但我认为其中一些我将遇到的其他表也会有类似的问题并且可能更大,所以我现在想优化它,但我不确定我能做什么?

我想写入文件并没有太大帮助,因为我必须读取该文件,从而将其全部重新放入内存中?

极简主义示例

我有一张桌子

---------------------------------------------------------------
|Row 1 | did | timestamp | data | unused value | unused value |
|Row 2 | did | timestamp | data | unused value | unused value |
....
---------------------------------

table = [ values from above row1 ,  values from above row2,...]

connection = psycopg2.connect(<connection string>)
cursor = connection.cursor()

table = cursor.execute("""SELECT * FROM table_3 WHERE did='356002062376054'
                          ORDER BY timestamp""")

extracted_list = extract(table)
calculated_list = calculate(extracted_list)
... write to db ...

def extract(table):
    """extract all but unused values"""
    new_list = []
    for row in table:
        did = row[0]
        timestamp = row[1]
        data = row[2]

        a_dict = 'did': did, 'timestamp': timestamp, 'data': data
        new_list.append(a_dict)

    return new_list


def calculate(a_list):
    """perform calculations on values"""
    dict_list = []
    temp_today = 0
    count = 0
    for row in a_list:
        date = datetime.fromtimestamp(row['timestamp'] / 1000) # from ms to sec
        today = int(date.strfime('%d'))
        if temp_today is not today:
            new_dict = 'date': date.strftime('%d/%m-%Y'),
                        'reading_count': count,
                        'time': date.strftime('%H:%M:%S')
            dict_list.append(new_dict)

    return dict_list
        
        

【问题讨论】:

可以直接使用像额外 psycopg2 包中提供的 dict cursor 类就足以满足您的要求吗? 您正在从数据库中提取大量数据,进行一些相当简单的处理,然后将结果推回数据库。您是否考虑过在数据库中使用纯 SQL 进行所有这些操作?它会更有效,具体取决于您的实际逐行处理是否像您的示例所暗示的那样简单。 @SergeBallesta 可能是。我会看看文档。 @dnswlt 除了请求和写入之外,我在 SQL 方面并不那么坚定,tbh。事实上我不知道这是可能的。 @Zeliax,很有可能 :) 我现在已经同意了,但如果仍然相关,我可能会稍后创建一些草图 SQL 脚本。 【参考方案1】:

create_standard_list()format_table_dictionary() 可以构建生成器(yielding 每个项目而不是 returning 完整列表),这将停止将整个列表保存在内存中,因此应该可以解决您的问题,例如:

def create_standard_list(sensor_id, data):
    for row in data:
        row_timestamp = row[2]
        row_data = row[3]

        temp_object = "sensor_id": sensor_id, "timestamp": row_timestamp,
                       "data": row_data
        yield temp_object
       #^ yield each item instead of appending to a list

有关generators 和yield keyword 的更多信息。

【讨论】:

所以我应该重写这两个函数以产生它们的结果,然后立即将该结果写入我的数据库?我已经阅读了有关生成器和 yield 函数的内容,据我所知,我必须创建变量 yielded1=create_standard_list(data)yielded2=format_table_dictionary(yielded1) 然后直接将它们写入数据库。但是我假设我必须使用生成器创建一个 for 循环? 是的,要写入数据,您可以遍历生成器并逐项写入,例如:for item in yielded1: out_file.write(item) 因此,如果我在两个函数中都使用 yield,我必须创建一个嵌套的 for 循环,我假设要遍历两个结果? (我承认我在上面的评论中存在误解,应该按照您的建议进行。)我阅读了一些文档,但没有发现 yield 函数之后的代码是否也运行。是吗? @Zeliax 是的,yield 不会终止函数(就像return 一样)。 yield 在这个意义上更像print @Zeliax 是的,这将是 yield 值的元组【参考方案2】:

IIUC,您在这里尝试做的是在 Python 代码中模拟 SQL GROUP BY 表达式。这永远不会像直接在数据库中那样快速和高效。 您的示例代码似乎有一些问题,但我将其理解为:您想要 对于给定did 发生的每一天,计算每天的行数。还有,你是 对每组值(即每一天)的最小(或最大,或中值,无关紧要)时间感兴趣。

让我们建立一个小示例表(在 Oracle 上测试):

create table t1 (id number primary key, created timestamp, did number, other_data varchar2(200));  

insert into t1 values (1, to_timestamp('2017-01-31 17:00:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some text');
insert into t1 values (2, to_timestamp('2017-01-31 19:53:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'some more text');
insert into t1 values (3, to_timestamp('2017-02-01 08:10:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day');
insert into t1 values (4, to_timestamp('2017-02-01 15:55:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'another day, rainy afternoon');
insert into t1 values (5, to_timestamp('2017-02-01 15:59:00', 'YYYY-MM-DD HH24:MI:SS'), 9002, 'different did');
insert into t1 values (6, to_timestamp('2017-02-03 01:01:00', 'YYYY-MM-DD HH24:MI:SS'), 9001, 'night shift');

我们有一些行,分布在几天内,9001。 did 9002 也有一个值,我们将 忽视。现在让我们以简单的SELECT .. GROUP BY 的形式将要写入第二个表的行:

select 
    count(*) cnt, 
    to_char(created, 'YYYY-MM-DD') day, 
    min(to_char(created, 'HH24:MI:SS')) min_time 
from t1 
where did = 9001
group by to_char(created, 'YYYY-MM-DD')
;

我们按created 列(时间戳)的日期对所有行进行分组。我们选择 每组的行数,当天本身,以及 - 只是为了好玩 - 每个的最短时间部分 团体。结果:

cnt day         min_time
2   2017-02-01  08:10:00
1   2017-02-03  01:01:00
2   2017-01-31  17:00:00

所以现在您的第二张桌子是SELECT。从中创建一个表很简单:

create table t2 as
select
    ... as above
;

HTH!

【讨论】:

谢谢,是的。你理解正确。差不多,因为我得到的时间是我在给定日期内获得的第一个数据点的时间。我会试试这个,希望我能得到一些工作。我会告诉你。谢谢! :D 很高兴听到!每个给定日期内的“第一个数据点的时间”正是我上面的“分钟”将选择的。祝你好运!

以上是关于Python - 使用巨大的数据集避免内存错误的主要内容,如果未能解决你的问题,请参考以下文章

使用大型数据结构时,避免 Java(eclipse) 中的“内存不足错误”?

在 Pandas 数据框中加载大表时,如何避免 EC2 中的内存错误?

make 中惰性集和立即集之间的巨大内存使用差异

python:类与元组巨大的内存开销(?)

避免使用promises中的内存泄漏并在咖啡脚本中循环(无需等待)

大矩阵和内存问题