python多线程应用挂起
Posted
技术标签:
【中文标题】python多线程应用挂起【英文标题】:python multithreaded application hanging 【发布时间】:2018-09-02 04:17:51 【问题描述】:我编写了一个程序,用于对在多线程批量写入条件下执行的 mongodb 数据库进行基准测试。
问题是程序挂起并且没有完成执行。
我很确定问题是由于将 530838 条记录写入数据库并使用 10 个线程一次批量写入 50 条记录。这留下了 38 条记录的模值,但是 run 方法从队列中获取 50 条记录,因此当写入 530800 条记录时进程挂起并且永远不会写入最后的 38 条记录,因为以下代码永远不会完成执行
for object in range(50):
objects.append(self.queue.get())
我希望程序一次写入 50 条记录,直到剩余的记录少于 50 条,此时它应该将剩余的记录写入队列,然后在队列中没有记录时退出线程。
提前谢谢:)
import threading
import Queue
import json
from pymongo import MongoClient, InsertOne
import datetime
#Set the number of threads
n_thread = 10
#Create the queue
queue = Queue.Queue()
#Connect to the database
client = MongoClient("mongodb://mydatabase.com")
db = client.threads
class ThreadClass(threading.Thread):
def __init__(self, queue):
threading.Thread.__init__(self)
#Assign thread working with queue
self.queue = queue
def run(self):
while True:
objects = []
#Get next 50 objects from queue
for object in range(50):
objects.append(self.queue.get())
#Insert the queued objects into the database
db.threads.insert_many(objects)
#signals to queue job is done
self.queue.task_done()
#Create number of processes
threads = []
for i in range(n_thread):
t = ThreadClass(queue)
t.setDaemon(True)
#Start thread
t.start()
#Start timer
starttime = datetime.datetime.now()
#Read json object by object
content = json.load(open("data.txt","r"))
for jsonobj in content:
#Put object into queue
queue.put(jsonobj)
#wait on the queue until everything has been processed
queue.join()
for t in threads:
t.join()
#Print the total execution time
endtime = datetime.datetime.now()
duration = endtime-starttime
print(divmod(duration.days * 86400 + duration.seconds, 60))
【问题讨论】:
【参考方案1】:从docs on Queue.get
可以看到,默认设置为block=True
和timeout=None
,这会导致在空队列中等待下一个可以获取的项目。
您可以使用get_nowait
或get(False)
来确保您没有被阻止。如果您希望阻塞以队列是否有 50 个项目、是否为空或其他条件为条件,您可以使用 Queue.empty
和 Queue.qsize
,但请注意,它们不提供竞争条件证明保证非阻塞行为...它们只是关于是否将block=False
与get
一起使用的启发式方法。
类似这样的:
def run(self):
while True:
objects = []
#Get next 50 objects from queue
block = self.queue.qsize >= 50
for i in range(50):
try:
item = self.queue.get(block=block)
except Queue.Empty:
break
objects.append(item)
#Insert the queued objects into the database
db.threads.insert_many(objects)
#signals to queue job is done
self.queue.task_done()
另一种方法是设置timeout
并使用try
... except
块来捕获引发的任何Empty
异常。这样做的好处是您可以决定等待多长时间,而不是启发式地猜测何时立即返回,但它们是相似的。
还请注意,我将循环变量从 object
更改为 i
...您应该避免让循环变量成为全局 object
类的幻影。
【讨论】:
以上是关于python多线程应用挂起的主要内容,如果未能解决你的问题,请参考以下文章