在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列
Posted
技术标签:
【中文标题】在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列【英文标题】:Add a column during getting data with ThreadPoolExecutor in Python 【发布时间】:2022-01-14 07:36:04 【问题描述】:我想使用 ThreadPoolExecutor 从下面的链接中读取具有不同编号的不同页面,并将相关编号作为新列保存到数据帧中。
https://booking.snav.it/api/v1/rates/1030/2019-02-25/1042/2019-02-25?lang=1
数字变化如下:
from concurrent.futures import ThreadPoolExecutor, as_completed
from pandas import json_normalize
import pandas as pd
import requests
def download_file(url):
url_info = requests.get(url, stream=True)
jdata = url_info.json()
return jdata
nums = [1030,1031,1040,1050,1020,1021,1010,1023]
urls= [f"https://booking.snav.it/api/v1/rates/i/2019-02-25/1042/2019-02-25?lang=1" for i in nums]
with ThreadPoolExecutor(max_workers=14) as executor:
for url in urls:
sleep(0.1)
processes.append(executor.submit(download_file, url))
for index, task in enumerate(as_completed(processes)):
jdata = task.result()
tmp = json_normalize(jdata)
tmp["num"] = nums[index]
df = df.append(tmp)
print(df.head())
在上面的代码中,我尝试使用多线程读取数据,并将每个 json 响应的相关编号作为df
数据帧的新列读取。但是这段代码不起作用,因为使用多线程,nums
的数字顺序与抓取的 json 响应不同。我该怎么办?
【问题讨论】:
您可以enumerate()
urls 并发送(index, url)
并返回(index, jdata)
,以便稍后您可以使用index
以正确的顺序对结果进行排序。
【参考方案1】:
试试这个:
from concurrent.futures import ThreadPoolExecutor
...
with ThreadPoolExecutor(max_workers=14) as executor:
rv = executor.map(download_file, urls)
for index, jdata in enumerate(rv):
tmp = json_normalize(jdata)
tmp["num"] = nums[index]
df.append(tmp)
print(df.head())
【讨论】:
我应该导入什么来使用mp
?
我犯了一个错误,现在你编辑了你的问题,一切看起来都很好。 map
在结果中保留为参数传递的迭代的顺序。检查starmap
以获得其他有用的界面。
我写了它,但是当我想运行 for index, task in enumerate(as_completed(rv)):
时遇到错误:Exception has occurred: TypeError x unhashable type: 'list'
for next for
循环
您必须消除as_completed
调用。阅读map
文档:返回值具有函数调用的结果。
哦,天哪...我总是在代码中留下小错误。 rv
等价于rv= [download_file(f) for f in urls]
,但使用的是线程池。以上是关于在 Python 中使用 ThreadPoolExecutor 获取数据期间添加一列的主要内容,如果未能解决你的问题,请参考以下文章
Java Executor源码解析—ThreadPoolExecutor线程池其他方法的源码