Getting error when trying to process links in multithread

Posted: 2016-02-01 00:10:03

Question:

I get this error when I try to process 100k URLs with 20 threads using the asyncio and concurrent.futures modules in Python 3.4. It appears 2-5 minutes after the script starts running.

concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
Task exception was never retrieved
future: <Task finished coro=<main() done, defined at async.py:173> exception=BrokenProcessPool('A process in the process pool was terminated abruptly while the future was running or pending.',)>

I have tried optimizing my code, but I still get the error described above.

Code:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor
from grab import Grab
import random
import psycopg2

# Open connection to the database
connection = psycopg2.connect(database="<....>",
                              user="<....>",
                              password="<....>",
                              host="127.0.0.1",
                              port="5432")

# Create a new cursor for it
c = connection.cursor()

# Select settings from database
c.execute("SELECT * FROM <....> WHERE id=1;")
data = c.fetchall()

# Get time starting script
start_time = time.time()

def operation(link):
    # init grab framework
    g = Grab()
    # try to find some elements on the page
    try:
        # open link
        g.go(link)
        # some link processing
        <....>
    except:
        pass


@asyncio.coroutine
def main(item):
    yield from loop.run_in_executor(p, operation, item)

# Create async loop, declare number of threads
loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(data[0][13])  # =20

# Init tasks list - empty
tasks = []

# Select all urls which need to process
c.execute ("SELECT url FROM <....> ORDER BY id;")

# Forming tasks
for item in c.fetchall():
    tasks.append(main(item[0]))

# Close main connection to the database
connection.close()
# Run async tasks
loop.run_until_complete(asyncio.wait(tasks))
# Close loop
loop.close()
# Get script finish time
print("--- %s seconds ---" % (time.time() - start_time))

Answer 1:

Add p.shutdown() after loop.close() so that the script waits for all submitted tasks to finish.
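
A minimal sketch of that ordering, assuming a current Python version and therefore written with async/await rather than the @asyncio.coroutine style of Python 3.4 used in the question. Here operation() is a hypothetical stand-in for the real link processing, and process_all() and run() are illustrative names that do not appear in the original script. asyncio.run() closes the event loop on return, so the shutdown() call in the finally block happens after the loop is closed.

```python
import asyncio
from concurrent.futures import Executor


def operation(link):
    # Hypothetical stand-in for the real page processing in the question.
    return len(link)


async def process_all(links, executor: Executor):
    loop = asyncio.get_running_loop()
    # Hand every link to the pool; each call returns an awaitable future.
    futures = [loop.run_in_executor(executor, operation, link) for link in links]
    # gather() keeps the results in the same order as the input links.
    return await asyncio.gather(*futures)


def run(links, executor: Executor):
    try:
        # asyncio.run() creates the loop, runs the coroutine, then closes the loop.
        return asyncio.run(process_all(links, executor))
    finally:
        # Block until every submitted task has finished, then free the workers.
        executor.shutdown(wait=True)
```

With the pool from the question this would be called as run(urls, ProcessPoolExecutor(max_workers=20)), ideally under an if __name__ == "__main__": guard so that worker processes can import the module safely.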

Comments:

Still getting the same error. The traceback points to this line: yield from loop.run_in_executor(p, operation, item). What is wrong now?

Ahh. You have an unhandled exception in the operation() call.
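
One way to find out which link fails inside operation() is to report errors back to the parent instead of discarding them with a bare except: pass. The sketch below is only an illustration under stated assumptions: fetch() is a hypothetical stand-in for the Grab-based processing, and the (link, result, error) tuple is an invented return shape. An ordinary exception raised in a worker is normally delivered through the future, whereas BrokenProcessPool is raised when a worker process itself terminates abruptly, so gather(..., return_exceptions=True) is used here to hand back either kind as a value rather than leaving it unretrieved.

```python
import asyncio
import traceback


def fetch(link):
    # Hypothetical stand-in for the Grab-based processing in the question.
    if not link.startswith("http"):
        raise ValueError("unsupported link: %r" % link)
    return len(link)


def operation(link):
    """Run fetch() and report a failure instead of silently swallowing it."""
    try:
        return link, fetch(link), None
    except Exception:
        # Send the traceback back as text, which can always be pickled.
        return link, None, traceback.format_exc()


async def process_all(links, executor):
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(executor, operation, link) for link in links]
    # Pool-level errors (for example BrokenProcessPool) come back as values
    # in the result list instead of being left unretrieved on the task.
    return await asyncio.gather(*futures, return_exceptions=True)
```

Each entry in the returned list is either a (link, result, error) tuple or an exception object, so the caller can log the failing links and decide whether to retry them.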
