使用多线程加速 Pandas 数据框的创建

Posted

技术标签:

【中文标题】使用多线程加速 Pandas 数据框的创建【英文标题】:Use Multithreading To Speed Up Pandas Dataframe Creation 【发布时间】:2018-03-22 17:17:35 【问题描述】:

我遇到的问题,似乎没有任何答案,是我需要处理一个非常大的文本文件(来自 GUDID 的 gmdnTerms.txt 文件),操作数据以合并行重复 ID,为键值对创建适当的列,并将结果转储到 CSV 文件。除了实现多线程之外,我已经尽我所能来提高效率。我需要能够对迭代文本文件和构建数据框的过程进行多线程处理。多线程教程没有太大帮助。希望有经验的Python程序员能给出明确的答案。下面是整个程序。请帮忙,在 4.7GHz proc(8 核)、16GB RAM 和 SSD 上,当前运行时间超过 20 小时。

#Assumptions this program makes:
#That duplicate product IDs will immediately follow each other
#That the first line of the text file contains only the keys and no values
#That the data lines are delimited by a "\n" character
#That the individual values are delimited by a "|" character
#The first value in each line will always be a unique product ID
#Each line will have exactly 3 values
#Each line's values will always be in the same order

#Import necessary libraries
import os
import pandas as pd
import mmap
import time

#Time to run
startTime = time.time()

#Parameters of the program
fileLocation = "C:\\Users\User\....\GMDNTest.txt"
outCSVFile = "GMDNTermsProcessed.csv"
encodingCSVFile = "utf-8"

#Sets up variables to be used later on
df = pd.DataFrame()
keys = []
idx = 0
keyNum = 0
firstLine = True
firstValue = True
currentKey = ''

#This loops over each line in text file and collapses lines with duplicate Product IDs while building new columns for appropriate keys and values
#These collapsed lines and new columns are stored in a dataframe
with open (fileLocation, "r+b") as myFile:
    map = mmap.mmap(myFile.fileno(), 0, access=mmap.ACCESS_READ)
    for line in iter(map.readline, ""):

        #Gets keys from first line, splits them, stores in list
        if firstLine == True:
            keyRaw = line.split("|")
            keyRaw = [x.strip() for x in keyRaw]
            keyOne = keyRaw[0]
            firstLine = False

        #All lines after first go through this
        #Collapses lines by comparing the unique ID
        #Stores collapsed KVPs into a dataframe
        else:
            #Appends which number of key we are at to the key and breaks up the values into a list
            keys = [x + "_" + str(keyNum) for x in keyRaw]
            temp = line.split("|")
            temp = [x.strip() for x in temp]

            #If the key is the same as the key on the last line this area is run through
            #If this is the first values line it also goes through here
            if temp[0] == currentKey or firstValue == True:

                #Only first values line hits this part; gets first keys and builds first new columns
                if firstValue == True:
                    currentKey = temp[0]
                    df[keyOne] = ""
                    df.at[idx, keyOne] = temp[0]
                    df[keys[1]] = ""
                    df.at[idx, keys[1]] = temp[1]
                    df[keys[2]] = ""
                    df.at[idx, keys[2]] = temp[2]
                    firstValue = False

                #All other lines with the same key as the last line go through here
                else:
                    headers = list(df.columns.values)
                    if keys[1] in headers:
                        df.at[idx, keys[1]] = temp[1]
                        df.at[idx, keys[2]] = temp[2]
                        else:
                        df[keys[1]] = ""
                        df.at[idx, keys[1]] = temp[1]
                        df[keys[2]] = ""
                        df.at[idx, keys[2]] = temp[2]

            #If the current line has a different key than the last line this part is run through
            #Sets new currentKey and adds values from that line to the dataframe
            else:
                idx+=1
                keyNum = 0
                currentKey = temp[0]
                keys = [x + "_" + str(keyNum) for x in keyRaw]
                df.at[idx, keyOne] = temp[0]
                df.at[idx, keys[1]] = temp[1]
                df.at[idx, keys[2]] = temp[2]

        #Don't forget to increment that keyNum      
        keyNum+=1

#Dumps dataframe of collapsed values to a new CSV file
df.to_csv(outCSVFile, encoding=encodingCSVFile, index=False)

#Show us the approx runtime
print("--- %s seconds ---" % (time.time() - startTime))

【问题讨论】:

您的输入文件有多大? 输入样本和输出样本也会有所帮助。 @Steve 这是测试输入数据pastebin.com/z8nKX22t,这是该数据的结果(以 CSV 格式打开)pastebin.com/dhmtKbGE 啊好的,输入文件有多大?我想知道它是否适合内存 @Steve 抱歉,忘记包含实际文件的大小。它的大小约为 880MB。 【参考方案1】:

我不能保证这会更快,但请尝试一下,让我知道它是如何运行的,它会根据您的示例数据正确且快速地运行

import csv
import itertools
import sys

input_filename = sys.argv[1]
output_filename = sys.argv[2]

with open(input_filename, 'r') as input_file, \
     open(output_filename, 'w') as output_file:
    input_reader = csv.reader(input_file, delimiter='|')
    header = next(input_reader)
    header_1_base = header[1]
    header_2_base = header[2]
    header[1] = header_1_base + '_0'
    header[2] = header_2_base + '_0'
    current_max_size = 1
    data = 
    for line in input_reader:
        line[0] = line[0].strip()
        # line[1] = line[1].strip()
        # line[2] = line[2].strip()
        if line[0] in data:
            data[line[0]].append(line[1:])
            if len(data[line[0]]) > current_max_size:
                current_max_size += 1
                header.append('0_1'.format(header_1_base, current_max_size - 1))
                header.append('0_1'.format(header_2_base, current_max_size - 1))
        else:
            data[line[0]] = [line[1:]]

    output_writer = csv.writer(output_file, lineterminator='\n')
    output_writer.writerow(header)
    for id in data:
        output_writer.writerow(itertools.chain([id], itertools.chain(*data[id])))

它没有使用 pandas 数据框,因为您的目标似乎是转换为 csv 格式,而是使用简单的 python 字典。此版本中也没有多线程,但如有必要,稍后可以添加一些。我猜你会遇到的最大瓶颈是如果你的系统内存不足并开始交换,那么我们可以考虑其他方法来加速它。

更新 - 以上是python3将其转换为python2更改:

output_writer.writerow(itertools.chain([id], itertools.chain(*data[id])))

output_writer.writerow([x for x in itertools.chain([id], itertools.chain(*data[id]))])

【讨论】:

我在本地对同一个数据集运行它,但我得到了一个 'output_writer.writerow(itertools.chain([id], itertools.chain(*data[id])))_csv.Error:预期序列'我在 Python 2.7.14 上运行它 啊,这是为python3写的 难以置信的结果!它的运行速度提高了两个数量级(从 1.19 秒降至 0.019 秒)。你认为是什么导致了我原来的方法速度变慢?熊猫?比较标题的所有循环?别的东西?非常感谢您的帮助,真的帮助我在这里学习(并为我自己和我的公司节省了很多时间)。 很多东西 :) 如果你真的想的话,你可能会得到更多的性能。我没有分析,所以这只是猜测...1)使用 csv 库来遍历文件,而不是读取每一行并将其拆分 2)pandas 跟踪大量信息以提供其随附的所有功能一些内存和性能成本 3) 计算每一行的键 4) python 为每个条调用创建一个新字符串 5) 大量临时列表 6) 减少内存占用 - 这会消耗你所有的内存,它会减慢很多. 再次感谢您的解释。将来我会考虑这些因素,并且我将不再使用 Pandas 进行简单的文本到 csv 操作,就像这里一样。我认为我真的没有使用正确/最有效的工具来完成这项工作。干杯!

以上是关于使用多线程加速 Pandas 数据框的创建的主要内容,如果未能解决你的问题,请参考以下文章

是否可以使用多线程加速脚本?

为啥使用多线程更新数组没有加速

Python:使用多线程修改pandas DataFrame时,Spyder会发生错误

[备忘]不用许可证 多线程直接操作界面组件比如超级列表框的实现

Python多线程线程在继续之前不等待.join()

pandas df 在多线程中附加更改变量:为 df 创建初始索引时出现问题,pd 是不是是正确的工具?