How to write the contents of a df to a csv file using multiprocessing in Python
I have a function that writes the contents of a df to a csv file.
import pandas as pd

def writeToCSV(outDf, defFile, toFile, retainFlag=True, delim=' ', quotechar='"'):
    headers = []
    fid = open(defFile, 'r')
    for line in fid:
        # first whitespace-separated token on each schema line is the column name
        headers.append(line.replace('\n', '').split('\r')[0].split(' ')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]
    df.to_csv(toFile, sep=delim, quotechar=quotechar, index=False, encoding='utf-8')
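For context, defFile is evidently a schema definition file whose first whitespace-separated token on each line is a column name. A hypothetical example of calling the function (the schema contents, column names, and file paths below are made up for illustration):

# schema.def (hypothetical contents, one column definition per line):
#   customer_id STRING
#   amount FLOAT
outDf = pd.DataFrame({'customer_id': [1, 2], 'amount': [10.5, 3.2]})
writeToCSV(outDf, 'schema.def', 'output.tsv')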
How can I parallelize this process? Currently I am using the following code:
import multiprocessing
from multiprocessing import Process, Queue

def writeToSchemaParallel(outDf, defFile, toFile, retainFlag=True, delim=' ', quotechar='"'):
    logInfo('Start writingtoSchema in parallel...', 'track')
    headers = []
    fid = open(defFile, 'r')
    for line in fid:
        headers.append(line.replace('\n', '').split('\r')[0].split(' ')[0])
    df = pd.DataFrame([], columns=headers)
    for header in outDf.columns.values:
        if header in headers:
            df[header] = outDf[header]
    out_Names = Queue()
    cores = min([int(multiprocessing.cpu_count() / 2), int(len(outDf) / 200000) + 1])
    # cores = 4
    logInfo(str(cores) + 'cores are used...', 'track')
    # split the data for parallel computation
    outDf = splitDf(df, cores)
    # process the chunks in parallel
    logInfo('splitdf called are df divided...', 'track')
    Filenames = []
    procs = []
    fname = toFile.split("_Opera_output")[0]
    for i in range(0, cores):
        filename = fname + "_" + str(i) + ".tsv"
        proc = Process(target=writeToSchema, args=(outDf[i], defFile, filename, retainFlag, delim, quotechar, i))
        procs.append(proc)
        proc.start()
        print 'processing ' + str(i)
        Filenames.append(filename)
    # combine all returned chunks
    # outDf = out_Names.get()
    # for i in range(1, cores):
    #     outDf = outDf.append(out_q.get(), ignore_index=True)
    for proc in procs:
        proc.join()
    logInfo('Now we merge files...', 'track')
    print Filenames
    with open(toFile, 'w') as outfile:
        for fname in Filenames:
            with open(fname) as infile:
                for line in infile:
                    outfile.write(line)
But it does not work and gives the following error:
2017-12-17 16:02:55,078 - track - ERROR: Traceback (most recent call last):
2017-12-17 16:02:55,078 - track - ERROR: File "C:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Backup/Trunk/code/runMapping.py", line 257, in <module>
2017-12-17 16:02:55,089 - track - ERROR: writeToSchemaParallel(outDf, defFile, toFile, retainFlag, delim=' ', quotechar='"')
2017-12-17 16:02:55,153 - track - ERROR: File "C:\Users\sudhir.tiwari\Documents\AMEX2\Workspace\Backup\Trunk\code\utils.py", line 510, in writeToSchemaParallel
2017-12-17 16:02:55,163 - track - ERROR: with open(fname) as infile:
2017-12-17 16:02:55,198 - track - ERROR: IOError:
2017-12-17 16:02:55,233 - track - ERROR: [Errno 2] No such file or directory: 'C:/Users/sudhir.tiwari/Documents/AMEX2/Workspace/Input/work/Schindler_20171130/Schindler_20171130_0.tsv'
And it is not writing the files at all, because when I search in that location no files are found. I am using multiprocessing to write the dataframe to multiple files and then merge all the files. splitDf splits the dataframe into n parts.
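splitDf is not shown in the question; a minimal sketch of what such a helper might look like, assuming it only needs to slice the frame into n roughly equal row chunks (the implementation below is my assumption, not the asker's code):

import numpy as np

def splitDf(df, n):
    # Hypothetical helper: split the DataFrame into n roughly equal row chunks.
    # np.array_split copes with lengths that are not divisible by n.
    return np.array_split(df, n)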
Answer
Using multiprocessing will take more time than the default way (saving directly). By using synchronization between processes, with Process and Lock, you can parallelize the writing. Below is a sample POC.
import pandas as pd
import numpy as np
from multiprocessing import Lock, Process
from time import time

def writefile(df, l):
    l.acquire()
    df.to_csv('dataframe-multiprocessing.csv', index=False, mode='a', header=False)
    l.release()

if __name__ == '__main__':
    a = np.random.randint(1, 1000, 10000000)
    b = np.random.randint(1, 1000, 10000000)
    c = np.random.randint(1, 1000, 10000000)
    df = pd.DataFrame(data={'a': a, 'b': b, 'c': c})

    print('Iterative way:')
    print()
    new = time()
    df.to_csv('dataframe-conventional.csv', index=False, mode='a', header=False)
    print(time() - new, 'seconds')
    print()

    print('Multiprocessing way:')
    print()
    new = time()
    l = Lock()
    p = Process(target=writefile, args=(df, l))
    p.start()
    p.join()
    print(time() - new, 'seconds')
    print()

    df1 = pd.read_csv('dataframe-conventional.csv')
    df2 = pd.read_csv('dataframe-multiprocessing.csv')
    print('If both file same or not:')
    print(df1.equals(df2))
Result:
C:\Users\Ariff\Documents\GitHub\testing-code>python pandas_multip.py
Iterative way:
18.323541402816772 seconds
Multiprocessing way:
20.14128303527832 seconds
If both file same or not:
True
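The POC above launches only a single worker process. A sketch of how the same Lock-plus-Process pattern could be extended so that several workers each append one slice of the frame to the same file (the chunking, worker count, and output file name are my assumptions; note that the row order in the output then depends on which worker acquires the lock first):

import numpy as np
import pandas as pd
from multiprocessing import Lock, Process

def write_chunk(chunk, lock, path):
    # each worker appends its slice under the lock so writes do not interleave
    with lock:
        chunk.to_csv(path, index=False, mode='a', header=False)

if __name__ == '__main__':
    df = pd.DataFrame({'a': np.random.randint(1, 1000, 1000000)})
    lock = Lock()
    workers = [Process(target=write_chunk, args=(chunk, lock, 'dataframe-parallel.csv'))
               for chunk in np.array_split(df, 4)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()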
Another answer
If you want to write the file to disk, there is no point in parallelizing it. Writing data to disk is not inherently parallel, and the file is always written to disk by the OS, so you cannot gain any performance here by writing higher-level application code.