python多线程应用挂起

Posted

技术标签:

【中文标题】python多线程应用挂起【英文标题】:python multithreaded application hanging 【发布时间】:2018-09-02 04:17:51 【问题描述】:

我编写了一个程序,用于对在多线程批量写入条件下执行的 mongodb 数据库进行基准测试。

问题是程序挂起并且没有完成执行。

我很确定问题是由于将 530838 条记录写入数据库并使用 10 个线程一次批量写入 50 条记录。这留下了 38 条记录的模值,但是 run 方法从队列中获取 50 条记录,因此当写入 530800 条记录时进程挂起并且永远不会写入最后的 38 条记录,因为以下代码永远不会完成执行

for object in range(50): objects.append(self.queue.get())

我希望程序一次写入 50 条记录,直到剩余的记录少于 50 条,此时它应该将剩余的记录写入队列,然后在队列中没有记录时退出线程。

提前谢谢:)

import threading
import Queue
import json
from pymongo import MongoClient, InsertOne
import datetime

#Set the number of threads
n_thread = 10
#Create the queue
queue = Queue.Queue()

#Connect to the database
client = MongoClient("mongodb://mydatabase.com")
db = client.threads

class ThreadClass(threading.Thread):

    def __init__(self, queue):
        threading.Thread.__init__(self)
    #Assign thread working with queue
        self.queue = queue


    def run(self):

        while True:
            objects = []

        #Get next 50 objects from queue
            for object in range(50):
                objects.append(self.queue.get())

                #Insert the queued objects into the database           
            db.threads.insert_many(objects)

            #signals to queue job is done
            self.queue.task_done()


#Create number of processes
threads = []

for i in range(n_thread):
    t = ThreadClass(queue)
    t.setDaemon(True)
    #Start thread
    t.start()

#Start timer
starttime = datetime.datetime.now()

#Read json object by object
content = json.load(open("data.txt","r"))
for jsonobj in content:
    #Put object into queue
    queue.put(jsonobj)
#wait on the queue until everything has been processed 
queue.join()

for t in threads:
    t.join()

#Print the total execution time
endtime = datetime.datetime.now()
duration = endtime-starttime
print(divmod(duration.days * 86400 + duration.seconds, 60))

【问题讨论】:

【参考方案1】:

从docs on Queue.get 可以看到,默认设置为block=Truetimeout=None,这会导致在空队列中等待下一个可以获取的项目。

您可以使用get_nowaitget(False) 来确保您没有被阻止。如果您希望阻塞以队列是否有 50 个项目、是否为空或其他条件为条件,您可以使用 Queue.emptyQueue.qsize,但请注意,它们不提供竞争条件证明保证非阻塞行为...它们只是关于是否将block=Falseget 一起使用的启发式方法。

类似这样的:

def run(self):

    while True:
        objects = []

        #Get next 50 objects from queue
        block = self.queue.qsize >= 50
        for i in range(50):
            try:
                item = self.queue.get(block=block)
            except Queue.Empty:
                break
            objects.append(item)

        #Insert the queued objects into the database           
        db.threads.insert_many(objects)

        #signals to queue job is done
        self.queue.task_done()

另一种方法是设置timeout 并使用try ... except 块来捕获引发的任何Empty 异常。这样做的好处是您可以决定等待多长时间,而不是启发式地猜测何时立即返回,但它们是相似的。

还请注意,我将循环变量从 object 更改为 i ...您应该避免让循环变量成为全局 object 类的幻影。

【讨论】:

以上是关于python多线程应用挂起的主要内容,如果未能解决你的问题,请参考以下文章

python之多线程

Python多线程

简述python(threading)多线程

Python 多处理与多线程相结合

Python核心编程——多线程threading和队列

如何在多线程应用程序中使用 aiopg 池?