Python multiprocessing ending with zombies
【Posted】: 2022-01-03 19:00:39
【Question】: The job processes 36,000 targets on Ubuntu. After 13-14 hours of computation and 5,814 targets, the number of processes (initially 120) drops and the remaining processes are turning into zombies.
I implemented the multiprocessing like this:
from multiprocessing import Process
import gc
import traceback
from scrapy.crawler import CrawlerProcess
from scrapy.settings import Settings

crawler_settings = Settings()
crawler_settings.setmodule(my_settings)

from scrapy.spiders.sales import SalesSpider

def format_target(seller):
    return f"xxxxxxxxxxx{seller}xxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)

            del formated_seller
            del process
    except:
        print(traceback.format_exc())

def process_x(urls_lst, process_nb):
    list_process = [None] * process_nb

    while urls_lst:
        for i in range(process_nb):
            if not (list_process[i] and list_process[i].is_alive()):
                list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))
                list_process[i].start()
                gc.collect()
                break

    ## Wait for all processes to end
    for process in list_process:
        if process:
            process.join()
            gc.collect()

## MAIN
sellers = [...]  ## 36k objects
process_x(sellers, 120)
This is the first time this has happened; I have let the script run for days before without any problem at all.
How can I prevent it?
Attempt 1
import math
import multiprocessing
from functools import partial

def format_target(seller):
    return f"xxxxxxxxxxx{seller}xxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)
    except:
        print(traceback.format_exc())

if __name__ == "__main__":
    n_cpu = multiprocessing.cpu_count()
    processes = int(math.ceil(2.5 * n_cpu))

    sellers = get_sellers()  ## Returns psycopg2.extras.Record rows, which can't be pickled
    sellers = [[seller[0], seller[1], seller[2]] for seller in sellers]

    chunksize, remainder = divmod(len(sellers), processes)
    if remainder:
        chunksize += 1

    pool = multiprocessing.Pool(processes)
    pool.imap_unordered(partial(launch_crawler, SalesSpider),
                        sellers,
                        chunksize=chunksize)
    pool.close()
    pool.join()
which results in the error:
[INFO] Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
2021-11-30 15:51:48 [scrapy.extensions.logstats] INFO: Crawled 0 pages (at 0 pages/min), scraped 0 items (at 0 items/min)
Traceback (most recent call last):
  File "scraping/ebayspy/main.py", line 320, in launch_crawler
    process.start(stop_after_crawl=True)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/scrapy/crawler.py", line 327, in start
    reactor.run(installSignalHandlers=False)  # blocking call
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 1317, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 1299, in startRunning
    ReactorBase.startRunning(cast(ReactorBase, self))
  File "/home/ubuntu/.local/lib/python3.8/site-packages/twisted/internet/base.py", line 843, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable
【Question comments】:
【Answer 1】: I think the problem is in the following section of code:
if not (list_process[i] and list_process[i].is_alive()):
    list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))
If list_process[i] is not None but the process has terminated, then list_process[i].is_alive() will return False, the full boolean expression your if statement is testing will be True, and you will end up replacing list_process[i] with a new Process instance without ever having joined the process that list_process[i] previously referenced. That is what leaves zombie processes behind. So modify the code as follows:
p = list_process[i]
if not (p and p.is_alive()):
    if p:
        p.join()
    list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))
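For context, here is a minimal sketch of how that fix might slot into the process_x loop from the question (keeping the same launch_crawler, SalesSpider, and gc.collect() calls as above; this is only an illustration, not part of the original answer):

def process_x(urls_lst, process_nb):
    list_process = [None] * process_nb

    while urls_lst:
        for i in range(process_nb):
            p = list_process[i]
            if not (p and p.is_alive()):
                if p:
                    p.join()  # reap the finished process before dropping the last reference to it
                list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0)))
                list_process[i].start()
                gc.collect()
                break

    ## Wait for the remaining processes to end
    for process in list_process:
        if process:
            process.join()
            gc.collect()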
Update
If you have a good reason to run scrapy in parallel, then using a multiprocessing pool may go a long way toward solving your zombie problem. Set N_PROCESSES to the desired number of concurrent processes. Since a scrapy process spends most of its time waiting for network requests to complete, there is no reason this number cannot be larger than the number of CPU cores you have. Note, however, that processes do take up significant resources, such as memory, so 120 processes may be overkill.
from multiprocessing import Pool
import traceback
from functools import partial
from scrapy.crawler import CrawlerProcess
from scrapy.settings import Settings
from scrapy.spiders.sales import SalesSpider

crawler_settings = Settings()
crawler_settings.setmodule(my_settings)

def format_target(seller):
    return f"xxxxxxxxxxx{seller}xxxxxxxxxxxxxx"

def launch_crawler(crawler, seller):
    try:
        formated_seller = format_target(seller[1])
        if formated_seller:
            process = CrawlerProcess(crawler_settings)
            process.crawl(crawler, seller[0], formated_seller, seller[2])
            process.start(stop_after_crawl=True)

            del formated_seller
            del process
    except:
        print(traceback.format_exc())

if __name__ == '__main__':
    sellers = [...]  ## 36k objects
    N_PROCESSES = 120  # Really this many?
    chunksize, remainder = divmod(len(sellers), 4 * N_PROCESSES)
    if remainder:
        chunksize += 1
    pool = Pool(N_PROCESSES)
    pool.imap_unordered(partial(launch_crawler, SalesSpider),
                        sellers,
                        chunksize=chunksize)
    pool.close()
    pool.join()
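One caveat to flag as an editorial observation, not part of the answer above: the ReactorNotRestartable traceback from the question is raised whenever process.start() runs a second time inside the same OS process, because Twisted's reactor cannot be restarted once it has stopped. A pool worker that is handed more than one seller will therefore hit it. A hedged workaround, assuming the pool approach is kept, is to let the pool recycle each worker after a single task:

# maxtasksperchild=1 makes every worker exit after one launch_crawler call,
# so each crawl gets a fresh process and therefore a fresh, startable reactor.
pool = Pool(N_PROCESSES, maxtasksperchild=1)

Note that chunksize would then need to be 1 as well, since all items in a chunk are handed to one worker as a single task.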
【Comments】:
After testing, I am getting another error: assert self._popen is not None, 'can only join a started process' AssertionError: can only join a started process (on p.join())
When you add a process to list_process[i], the very next statement is list_process[i].start(). So I don't see how you could have any process in list_process that has not been started. Your AssertionError seems to indicate that the exception is occurring somewhere else. Have you looked carefully at the stack trace?
How do I look at my stack trace? Sorry, I am not familiar with that. I am currently running: while urls_lst: for i in range(process_nb): p = list_process[i] if not (p and p.is_alive()): list_process[i] = Process(target=launch_crawler, args=(SalesSpider, urls_lst.pop(0))) list_process[i].start() if p: p.join()
A stack trace is the list of line numbers and statements that led up to the exception, specifically assert self._popen is not None, 'can only join a started process' AssertionError: can only join a started process. Some of those statements will be in your code, while others may be in functions that your code calls, depending on where the exception was raised. Look at which statement in your program caused the exception.
But I have a question for you: I am not very familiar with Scrapy, but doesn't it have built-in support for crawling multiple URLs, handling all of the multithreading for you? So is it really necessary to create multiple processes yourself? And if you do feel it is necessary, why not use multithreading instead, since these crawlers seem well suited to it (which is what Scrapy itself uses)?
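As a rough illustration of what that last comment is alluding to (a sketch only, assuming SalesSpider takes the same arguments as in the question), a single CrawlerProcess can queue many crawls and let Twisted run them concurrently inside one process; in practice you would probably batch the sellers rather than queue all 36,000 at once:

process = CrawlerProcess(crawler_settings)
for seller in sellers:
    formated_seller = format_target(seller[1])
    if formated_seller:
        # crawl() only schedules the spider; nothing runs until start() is called
        process.crawl(SalesSpider, seller[0], formated_seller, seller[2])
process.start()  # starts the reactor once and runs all queued crawls concurrently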