Python - 从多个线程附加到同一个文件

Posted

技术标签:

【中文标题】Python - 从多个线程附加到同一个文件【英文标题】:Python - appending to same file from multiple threads 【发布时间】:2012-08-12 14:41:32 【问题描述】:

我正在编写一个应用程序,它可以从多个线程将行附加到同一个文件。

我遇到了一个问题,其中一些行被附加而没有新行。

有什么解决办法吗?

class PathThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print(f, file=output)

    def run(self):
        while True:
            path = self.queue.get()
            self.printfiles(path)
            self.queue.task_done()


pathqueue = Queue.Queue()
paths = getThisFromSomeWhere()

output = codecs.open('file', 'a')

# spawn threads
for i in range(0, 5):
    t = PathThread(pathqueue)
    t.setDaemon(True)
    t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()

【问题讨论】:

贴一些代码,会有帮助的。 写入时检查文件最后一个字符是否为换行符。如果不是,请附加一个。当然,这需要用 r+ 而不是 a 打开,这可能不是你想要的。 【参考方案1】:

解决方案是只在一个线程中写入文件。

import Queue  # or queue in Python 3
import threading

class PrintThread(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def printfiles(self, p):
        for path, dirs, files in os.walk(p):
            for f in files:
                print(f, file=output)

    def run(self):
        while True:
            result = self.queue.get()
            self.printfiles(result)
            self.queue.task_done()

class ProcessThread(threading.Thread):
    def __init__(self, in_queue, out_queue):
        threading.Thread.__init__(self)
        self.in_queue = in_queue
        self.out_queue = out_queue

    def run(self):
        while True:
            path = self.in_queue.get()
            result = self.process(path)
            self.out_queue.put(result)
            self.in_queue.task_done()

    def process(self, path):
        # Do the processing job here

pathqueue = Queue.Queue()
resultqueue = Queue.Queue()
paths = getThisFromSomeWhere()

output = codecs.open('file', 'a')

# spawn threads to process
for i in range(0, 5):
    t = ProcessThread(pathqueue, resultqueue)
    t.setDaemon(True)
    t.start()

# spawn threads to print
t = PrintThread(resultqueue)
t.setDaemon(True)
t.start()

# add paths to queue
for path in paths:
    pathqueue.put(path)

# wait for queue to get empty
pathqueue.join()
resultqueue.join()

【讨论】:

在 ProcessThread 中,行 - result = self.process(path) ?你不在那里配置 process() 方法.. 你应该定义过程方法来做你想做的事。我只是修改代码来澄清这一点。 有必要一直旋转吗?还是建议在队列中使用阻塞get() 代替? 我们也可以使用Lock 来完成它吗?【参考方案2】:

您永远不会在同一行上看到混乱的文本或在一行中间看到新行这一事实表明您实际上不需要同步附加到文件。问题是您使用 print 写入单个文件句柄。我怀疑print 实际上是在一次调用中对文件句柄执行 2 次操作,并且这些操作在线程之间竞速。基本上print 正在做类似的事情:

file_handle.write('whatever_text_you_pass_it')
file_handle.write(os.linesep)

并且由于不同的线程同时在同一个文件句柄上执行此操作,有时一个线程将进入第一次写入,然后另一个线程将进入第一次写入,然后您将连续获得两个回车符。或者实际上是这些的任何排列。

解决这个问题的最简单方法是停止使用print,直接使用write。试试这样的:

output.write(f + os.linesep)

这对我来说仍然很危险。我不确定所有使用相同文件句柄对象并争夺其内部缓冲区的线程可以期待什么保证。个人 id 旁观整个问题,只是让每个线程都有自己的文件句柄。另请注意,这是有效的,因为写入缓冲区刷新的默认设置是行缓冲的,因此当它对文件进行刷新时,它以os.linesep 结束。强制它使用行缓冲发送1 作为open 的第三个参数。你可以像这样测试它:

#!/usr/bin/env python
import os
import sys
import threading

def hello(file_name, message, count):
  with open(file_name, 'a', 1) as f:
    for i in range(0, count):
      f.write(message + os.linesep)

if __name__ == '__main__':
  #start a file
  with open('some.txt', 'w') as f:
    f.write('this is the beginning' + os.linesep)
  #make 10 threads write a million lines to the same file at the same time
  threads = []
  for i in range(0, 10):
    threads.append(threading.Thread(target=hello, args=('some.txt', 'hey im thread %d' % i, 1000000)))
    threads[-1].start()
  for t in threads:
    t.join()
  #check what the heck the file had
  uniq_lines = set()
  with open('some.txt', 'r') as f:
    for l in f:
      uniq_lines.add(l)
  for u in uniq_lines:
    sys.stdout.write(u)

输出如下:

hey im thread 6
hey im thread 7
hey im thread 9
hey im thread 8
hey im thread 3
this is the beginning
hey im thread 5
hey im thread 4
hey im thread 1
hey im thread 0
hey im thread 2

【讨论】:

set() 不保留顺序,因此“开始”行实际上可能不会写在第一个线程之后 @crypdick 是的,这是故意的。上面的测试让每个线程将一百万条相同的行写入文件。该集合然后收集所有唯一的行,我们不关心输出的顺序我们想要证明的是没有一行被多个线程同时写入同一个文件而混乱【参考方案3】:

也许还有一些不应该出现的换行符?

您应该牢记这样一个事实,即一次不应由多个线程访问共享资源,否则可能会发生不可预知的后果(在使用线程时称为“原子操作”)。

看看这个页面有点直觉:Thread Synchronization Mechanisms in Python

【讨论】:

以上是关于Python - 从多个线程附加到同一个文件的主要内容,如果未能解决你的问题,请参考以下文章

遇到多线程问题同时连接到多个设备

如何在 Python 中使用 Pandas 数据结构附加多个 CSV 文件

从列表框中选择多个项目以附加到电子邮件中

如何将多个唯一元素附加到Python中的同一组列?

将多个文本文件合并到一个Excel工作表中

从文本文件中读取句子并使用 Python 3 附加到列表中 [关闭]