Python ThreadPoolExecutor 异常中止解决方案

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python ThreadPoolExecutor 异常中止解决方案相关的知识,希望对你有一定的参考价值。

参考技术A

通常情况,我们利用 Ctrl+C 让程序触发 KeyboardInterrupt 异常,中止程序运行。线程池方案下, Ctrl-C 失效,当线程池里的线程任务跑完后,才会触发 KeyboardInterrupt

上下文管理协议相当于隐性地省略了 threadPool.shutdown(wait=True) ,同时,程序正常执行完成或出现异常中断的时候,就会调用 __exit__() 方法,接下来进行异常中止的基础。

适用于 Django 等 WEB 应用框架,本身自带多线程,修改全局变量简单,但要注意线程安全。

程序运行中,只需 sign = 1 或者 exiting.set() ,worker 函数则跳过主要运算部分,剩余线程任务将迅速完成,变相达到中止多线程任务的目的。

提交给线程池的每个线程任务 task 加入 threadPool 中,方便后续对 task 进行操作。当 for 循环内的 task 全部提交后,线程会再后台运行,而进程运行至 while 中堵塞,直至 threadPool 中最后一个线程是否 .done() 。若进程堵塞在 while 中接收到 Ctrl+C KeyboardInterrupt 异常,则从后往前取消 threadPool 中所有任务,达到中止目的。

在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列

【中文标题】在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列【英文标题】:Add a column during getting data with ThreadPoolExecutor in Python 【发布时间】:2022-01-14 07:36:04 【问题描述】:

我想使用 ThreadPoolExecutor 从下面的链接中读取具有不同编号的不同页面,并将相关编号作为新列保存到数据帧中。

https://booking.snav.it/api/v1/rates/1030/2019-02-25/1042/2019-02-25?lang=1

数字变化如下:

from concurrent.futures import ThreadPoolExecutor, as_completed
from pandas import json_normalize
import pandas as pd
import requests


def download_file(url):
    url_info = requests.get(url, stream=True)
    jdata = url_info.json()
    return jdata


nums = [1030,1031,1040,1050,1020,1021,1010,1023]
urls= [f"https://booking.snav.it/api/v1/rates/i/2019-02-25/1042/2019-02-25?lang=1" for i in nums]
with ThreadPoolExecutor(max_workers=14) as executor:
     for url in urls:
         sleep(0.1)
         processes.append(executor.submit(download_file, url))

for index, task in enumerate(as_completed(processes)):
    jdata = task.result()
    tmp = json_normalize(jdata)
    tmp["num"] = nums[index]
df = df.append(tmp)
print(df.head())

在上面的代码中,我尝试使用多线程读取数据,并将每个 json 响应的相关编号作为df 数据帧的新列读取。但是这段代码不起作用,因为使用多线程,nums 的数字顺序与抓取的 json 响应不同。我该怎么办?

【问题讨论】:

您可以enumerate() urls 并发送(index, url) 并返回(index, jdata),以便稍后您可以使用index 以正确的顺序对结果进行排序。 【参考方案1】:

试试这个:

from concurrent.futures import ThreadPoolExecutor

...

with ThreadPoolExecutor(max_workers=14) as executor:
     rv = executor.map(download_file, urls)

for index, jdata in enumerate(rv):
    tmp = json_normalize(jdata)
    tmp["num"] = nums[index]
    df.append(tmp)

print(df.head())

【讨论】:

我应该导入什么来使用mp 我犯了一个错误,现在你编辑了你的问题,一切看起来都很好。 map 在结果中保留为参数传递的迭代的顺序。检查starmap 以获得其他有用的界面。 我写了它,但是当我想运行 for index, task in enumerate(as_completed(rv)): 时遇到错误:Exception has occurred: TypeError x unhashable type: 'list' for next for 循环 您必须消除as_completed 调用。阅读map 文档:返回值具有函数调用的结果。 哦,天哪...我总是在代码中留下小错误。 rv 等价于rv= [download_file(f) for f in urls],但使用的是线程池。

以上是关于Python ThreadPoolExecutor 异常中止解决方案的主要内容,如果未能解决你的问题,请参考以下文章

[python] ThreadPoolExecutor线程池 python 线程池

如何在 python 3 中将队列与并发未来的 ThreadPoolExecutor 一起使用?

python爬虫 threading 多线程 ThreadPoolExecutor线程池

python线程池ThreadPoolExecutor.submit的数据丢失问题

Python ThreadPoolExecutor - 回调是不是保证与提交的函数在同一个线程中运行?

Python ThreadPoolExecutor 异常中止解决方案