Python ThreadPoolExecutor 异常中止解决方案
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python ThreadPoolExecutor 异常中止解决方案相关的知识,希望对你有一定的参考价值。
参考技术A通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运行。线程池方案下, Ctrl-C 失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt 。
上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True) ,同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__() 方法,接下来进行异常中止的基础。
适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。
程序运行中,只需 sign = 1 或者 exiting.set() ,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。
提交给线程池的每个线程任务 task 加入 threadPool 中,方便后续对 task 进行操作。当 for 循环内的 task 全部提交后,线程会再后台运行,而进程运行至 while 中堵塞,直至 threadPool 中最后一个线程是否 .done() 。若进程堵塞在 while 中接收到 Ctrl+C 的 KeyboardInterrupt 异常,则从后往前取消 threadPool 中所有任务,达到中止目的。
在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列
【中文标题】在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列【英文标题】:Add a column during getting data with ThreadPoolExecutor in Python 【发布时间】:2022-01-14 07:36:04 【问题描述】:我想使用 ThreadPoolExecutor 从下面的链接中读取具有不同编号的不同页面,并将相关编号作为新列保存到数据帧中。
https://booking.snav.it/api/v1/rates/1030/2019-02-25/1042/2019-02-25?lang=1
数字变化如下:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pandas import json_normalize
import pandas as pd
import requests
def download_file(url):
url_info = requests.get(url, stream=True)
jdata = url_info.json()
return jdata
nums = [1030,1031,1040,1050,1020,1021,1010,1023]
urls= [f"https://booking.snav.it/api/v1/rates/i/2019-02-25/1042/2019-02-25?lang=1" for i in nums]
with ThreadPoolExecutor(max_workers=14) as executor:
for url in urls:
sleep(0.1)
processes.append(executor.submit(download_file, url))
for index, task in enumerate(as_completed(processes)):
jdata = task.result()
tmp = json_normalize(jdata)
tmp["num"] = nums[index]
df = df.append(tmp)
print(df.head())
在上面的代码中,我尝试使用多线程读取数据,并将每个 json 响应的相关编号作为df
数据帧的新列读取。但是这段代码不起作用,因为使用多线程,nums
的数字顺序与抓取的 json 响应不同。我该怎么办?
【问题讨论】:
您可以enumerate()
urls 并发送(index, url)
并返回(index, jdata)
,以便稍后您可以使用index
以正确的顺序对结果进行排序。
【参考方案1】:
试试这个:
from concurrent.futures import ThreadPoolExecutor
...
with ThreadPoolExecutor(max_workers=14) as executor:
rv = executor.map(download_file, urls)
for index, jdata in enumerate(rv):
tmp = json_normalize(jdata)
tmp["num"] = nums[index]
df.append(tmp)
print(df.head())
【讨论】:
我应该导入什么来使用mp
?
我犯了一个错误,现在你编辑了你的问题,一切看起来都很好。 map
在结果中保留为参数传递的迭代的顺序。检查starmap
以获得其他有用的界面。
我写了它,但是当我想运行 for index, task in enumerate(as_completed(rv)):
时遇到错误:Exception has occurred: TypeError x unhashable type: 'list'
for next for
循环
您必须消除as_completed
调用。阅读map
文档:返回值具有函数调用的结果。
哦,天哪...我总是在代码中留下小错误。 rv
等价于rv= [download_file(f) for f in urls]
,但使用的是线程池。以上是关于Python ThreadPoolExecutor 异常中止解决方案的主要内容,如果未能解决你的问题,请参考以下文章
[python] ThreadPoolExecutor线程池 python 线程池
如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?
python爬虫 threading 多线程 ThreadPoolExecutor线程池
python线程池ThreadPoolExecutor.submit的数据丢失问题