无法在嵌套循环中使用 pandas 附加更大的数据帧。如何更改为 numpy 向量化?

Posted

技术标签:

【中文标题】无法在嵌套循环中使用 pandas 附加更大的数据帧。如何更改为 numpy 向量化?【英文标题】:Failing to append bigger data frames with pandas in nested loops. How to change to numpy vectorization? 【发布时间】:2022-01-09 05:26:21 【问题描述】:

我需要从旧的 postgres 数据库中加载一个巨大的表 (6 gb),其中包含一些我需要在加载时删除的错误值。所以我写了一个循环,出于性能原因尝试加载更大的块,但逐步减少以隔离和丢弃坏值。通常这是可行的,但在大约 50 万条记录之后,性能会迅速下降。

我已经发现不建议使用 pandas 处理更大的数据集。这就是我尝试使用 numpy 的原因。但这并没有改变什么。然后我尝试使用列表推导,但由于我必须使用异常来尝试以较小的块进行迭代而失败了。

从我的角度来看,numpy 向量化看起来是个好主意,但我不知道如何让它发挥作用。

https://towardsdatascience.com/how-to-make-your-pandas-loop-71-803-times-faster-805030df4f06

总的来说,这部分我想加快速度。

df = pds.read_sql_query(sql,conn,params=[(i * chunksize), chunksize])
appended_df.append(df)
products_df = pds.concat(appended_df, ignore_index=True)

如果上面的 sn-p 没有足够的上下文,你会发现更多。

# set autocommit = True
conn = pyodbc.connect(conn_str, autocommit=True)

cur = conn.cursor()

# count rows for chunking
sql_count = """\
select count("item_no") from "products" 
"""
cur.execute(sql_count)
sql_row_counter = cur.fetchone()[0]
print("Total rows: " + str(sql_row_counter))

# define chunksize and calculate chunks
chunksize = 35000
chunk_divisor = 100
if chunksize / chunk_divisor < 1:
    chunk_divisor = chunksize
print("Chunk devisor on error: " + str(chunk_divisor))
chksz_lvl2 = int(chunksize / chunk_divisor)
if chksz_lvl2 < 1:
    chksz_lvl2 = 1
chksz_lvl3 = int(chksz_lvl2 / chunk_divisor)
if chksz_lvl3 < 1:
    chksz_lvl3 = 1
# print settings for iteration
print("Chunksize: " + str(chunksize) + "\nChunksize Level 2: " +
       str(chksz_lvl2) + "\nChunksize Level 3: " + str(chksz_lvl3))
chunks = int(sql_row_counter / chunksize)
# Uncomment next row for testpurposes
chunks = 25
print("Chunks: " + str(chunks) + "\n")
error_counter = 0
# iterate chunks
appended_df = []
print("Starting to iterate chunks.\nPlease wait...")

for i in range(0, chunks):
            # try to iterate in full speed
            print("\nNext chunk starts from " + str((i * chunksize)) +
                  " with an limit of " + str(chunksize) + ".")
            try:
                # start runtime measurment
                i_start = time.time()
                # sql statement
                sql = """\
                select "item_no", "description_1", "description_2", "description_3" FROM "products" order by "item_no" offset ? limit ?"""
                # store into dataframe
                df = pds.read_sql_query(sql,
                                        conn,
                                        params=[(i * chunksize), chunksize])
                # get first and last value from dataframe
                head = df["item_no"].iloc[0]
                tail = df["item_no"].iloc[-1]
                # store query
                # Appending data frames via pandas.append() suddenly becomes slower by a factor of 10 from approx. 500,000 data records per 4 columns.
                appended_df.append(df)
                # stop runtime measurement
                i_end = time.time()
                # print result
                print(
                    str(i + 1) + " out of " + str(chunks) + " chunks in " +
                    ":5.3fs".format(i_end - i_start) + " processed.")
            except:
                # collect error information
                print(
                    "\nChunk " + str(i + 1) +
                    " cannot be selected due to an error. Reduce chunk size from "
                    + str(chunksize) + " to " + str(chksz_lvl2) +
                    ". Entering level 2.\nFirst working item_no of last working chunk "
                    + str(head) +
                    "\nLast working item_no of last working chunk " +
                    str(tail))
                ### 2 ### Successively reduce the chunks to narrow down and isolate errors.
                for j in range(0, chunk_divisor):
                     
                  and so on...
                             ...
                                ...
                                   ...
# Merge chunks
print("\nNote: Chunkzize = from row_no to row_no. Could be 1,2,3,4 = range of 4 or compleley different. Ex. 2,45,99,1002 = range of 4.\n\nConcatinate chunks.")
products_df = pds.DataFrame()
products_df = pds.concat(appended_df, ignore_index=True)
print("Done. " + str(error_counter) +
" rows had to be skipped. Details can be found in the full error log.")

conn.close()

【问题讨论】:

你有没有试着站在 dask 的一边? dask.org 您要处理哪些“不良价值观”?除非出现任何任意问题,否则我所看到的都是空的。 不知何故,同事设法将无法通过 utf8 和 win1250 编码的值输入数据库,这是强制性的。这些干扰了选择。已经在各个地方进行了调整,例如在驱动器等处。这种方式是迄今为止最稳定的。因此,我想遵循它。在 500,000 条记录中,只有少数是有问题的。我还没有听说过Dask。我不是本地开发人员,因此一点一点地进入这个话题。 :) @Tbaki 我现在已经尝试切换到 dask。一般来说,现在正在使用 dask,但性能方面没有任何变化。 df = pds.read_sql_query(sql,conn,params=[(i * chunksize), chunksize]) dask_df = from_pandas(df, npartitions=4) appended_df.append(dask_df) products_df = multi.concat(appended_df, ignore_index=True) products_df=products_df.compute() @Tbaki 我能够在 20 分钟内加载 1.4 个 mio 记录。还有一些地方需要改进,但总的来说已经很好了。因此,我最终可以确认,是的,它有效。 :) 【参考方案1】:

我刚刚注意到 python 脚本已经按预期运行。 Dask 等其他框架没有任何机会改进这一点。在我的情况下,我想获取一些数据的源 Postgres DB(在我的情况下为 v. 9.x)在查询大表期间存在关于同时使用 limitorder by 的问题。

我无法直接检测到这一点,因为即使您想查询整个表,我的 SQL 查询工具 (DBeaver) 也只会加载要显示的子集。因此结果是一个假朋友。如果您想通过排序正确地检查带有相当大的offsetlimit 的短选择。

偏移量约为。 500 k 的记录在我的情况下,仅选择一条记录的时间大约需要 10 秒。

解决方案是在我的嵌入式 SQL 脚本的“try”部分删除order by

【讨论】:

以上是关于无法在嵌套循环中使用 pandas 附加更大的数据帧。如何更改为 numpy 向量化?的主要内容,如果未能解决你的问题,请参考以下文章

无法在 python pandas 数据框中附加嵌套的 JSON 值

使用 Pandas 嵌套 JSON

如何在通过 Pandas 在 csv 中写入多个 for 循环的数据时在单个单元格中附加数据?

更快的 For 循环在 Pandas 中处理数据

如何在 Python 中使用 Pandas 数据结构附加多个 CSV 文件

Python/Pandas,.count 不适用于更大的数据框