Python 多处理以僵尸结尾

Posted

技术标签:

【中文标题】Python 多处理以僵尸结尾【英文标题】:Python multiprocessing ending with zombies 【发布时间】:2022-01-03 19:00:39 【问题描述】:

将在 ubuntu 上处理 36 000 个目标。经过 13-14 小时的计算和 5814 个目标后,进程数(最初为 120 个)下降,进程正在变成僵尸。

我这样实现了多处理:

from multiprocessing import Process
import gc
import traceback

from scrapy.crawler import CrawlerProcess

from scrapy.settings import Settings
crawler_settings = Settings()
crawler_settings.setmodule(my_settings)

from scrapy.spiders.sales import SalesSpider

def format_target(seller):
    return f"xxxxxxxxxxxsellerxxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)
            del formated_seller
            del process
    except:
        print(traceback.format_exc())

def process_x(urls_lst, process_nb):

    list_process = [None] * process_nb
    while urls_lst:
        for i in range(process_nb):
            if not (list_process[i] and list_process[i].is_alive()):
                list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))
                list_process[i].start()
                gc.collect()
                break

    ## Wait all thread end
    for process in list_process:
        if process:
            process.join()
            gc.collect()

## MAIN
sellers = [...] ## 36k objects
process_x(sellers,120)

这是第一次发生这种情况。我已经让它运行了好几天,并且完全没有任何问题。

如何预防?

尝试 1

def format_target(seller):
    return f"xxxxxxxxxxxsellerxxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)
    except:
        print(traceback.format_exc())

if __name__=="__main__":
    
    n_cpu = multiprocessing.cpu_count()
    processes =  int(math.ceil(2.5 * n_cpu))

    sellers = get_sellers()         ## Returning psycopg2.extras.Record, can't pickle
    sellers = [[seller[0],seller[1],seller[2]] for seller in sellers]

    chunksize, remainder = divmod(len(sellers), processes)
    if remainder:
        chunksize += 1
    pool = multiprocessing.Pool(processes)
    pool.imap_unordered(partial(launch_crawler, SalesSpider),
                        sellers,
                        chunksize=chunksize)
    pool.close()
    pool.join()

导致错误:

[INFO] Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2021-11-30 15:51:48 [scrapy.extensions.logstats] INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
Traceback (most recent call last):
  File "scraping/ebayspy/main.py", line 320, in launch_crawler
    process.start(stop_after_crawl=True)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/scrapy/crawler.py", line 327, in start
    reactor.run(installSignalHandlers=False)  # blocking call
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 1317, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 1299, in startRunning
    ReactorBase.startRunning(cast(ReactorBase, self))
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 843, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable

【问题讨论】:

【参考方案1】:

我认为问题出在以下代码部分:

            if not (list_process[i] and list_process[i].is_alive()):
                list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))

如果list_process[i] 不是None 但进程已终止,则list_process[i].is_alive() 将返回False,并且您的if 语句正在测试的完整布尔表达式将是True,您最终将替换@ 987654327@ 和一个新的 Process 实例从未加入之前由 list_process[i] 引用的进程。这将导致僵尸进程。所以修改代码如下:

            p = list_process[i]
            if not (p and p.is_alive()):
                if p:
                    p.join()
                list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))

更新

如果您有充分的理由并行运行scrapy,那么使用多处理池可能会大大有助于解决您的僵尸问题。将N_PROCESSES 设置为所需的并发进程数。由于scrapy 进程在很大程度上等待网络请求完成,所以这个数字没有理由不能大于您拥有的 CPU 内核数。但请注意,进程确实会占用大量资源,例如内存。所以 120 个进程可能过大

from multiprocessing import Pool
import traceback
from functools import partial

from scrapy.crawler import CrawlerProcess
from scrapy.settings import Settings
from scrapy.spiders.sales import SalesSpider

crawler_settings = Settings()
crawler_settings.setmodule(my_settings)


def format_target(seller):
    return f"xxxxxxxxxxxsellerxxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)
            del formated_seller
            del process
    except:
        print(traceback.format_exc())

if __name__ == '__main__':
    sellers = [...] ## 36k objects
    N_PROCESSES = 120 # Really this many?
    chunksize, remainder = divmod(len(sellers), 4 * N_PROCESSES)
    if remainder:
        chunksize += 1
    pool = Pool(N_PROCESSES)
    pool.imap_unordered(partial(launch_crawler, SalesSpider),
                        sellers,
                        chunksize=chunksize)
    pool.close()
    pool.join()

【讨论】:

测试后又报错:assert self._popen is not None, 'can only join a started process' AssertionError: can only join a started process (on p.join()) 当您向list_process[i] 添加进程时,下一条语句是list_process[i].start()。所以我看不出你如何在list_process 中有任何未启动的进程。您的AssertionError 似乎表明异常发生在其他地方。您是否仔细查看了堆栈跟踪? 如何查看我的堆栈跟踪?对不起,我不熟悉这个。我目前正在运行:while urls_lst: for i in range(process_nb): p = list_process[i] if not (p and p.is_alive()): list_process[i] = Process(target=launch_crawler, args=( SalesSpider, urls_lst.pop(0))) list_process[i].start() if p: p.join() 堆栈跟踪是导致异常的行号和语句的列表,具体来说 assert self._popen is not None, 'can only join a started process' AssertionError: can only join a开始进程。其中一些语句将在您的代码中,而其他语句可能在您的代码调用的函数中,具体取决于调用异常的位置。查看程序中的哪个语句导致了异常。 但我有问题要问你:我对Scrapy 不是很熟悉,但它没有内置支持抓取多个 URL,它为你处理所有多线程,所以真的需要自己创建多个进程吗?如果你觉得有必要,为什么不使用多线程,因为这些爬虫似乎适合多线程(这就是 Scrapy 使用的)。

以上是关于Python 多处理以僵尸结尾的主要内容,如果未能解决你的问题,请参考以下文章

多任务编程 -- 孤儿进程和僵尸进程

用 Python 实现植物大战僵尸代码!

Python植物大战僵尸

戏说守护僵尸孤儿进程

PHP多进程编程之僵尸进程问题

linux C/C++多进程教程(多进程原理以及多进程的应用以多连接socket服务端为例(fork子进程处理socket_fd),同时介绍了僵尸进程产生原因与解决方法)(getpidfork)