在 Python3 中使用 for 循环进行多线程/多处理
Posted
技术标签:
【中文标题】在 Python3 中使用 for 循环进行多线程/多处理【英文标题】:Multithreading / Multiprocessing with a for-loop in Python3 【发布时间】:2022-01-16 00:06:54 【问题描述】:我有这个任务同时受 I/O 限制和 CPU 限制。
基本上,我从用户那里获取查询列表,谷歌搜索它们(通过 custom-search-api),将每个查询结果存储在 .txt 文件中,并将所有结果存储在 results.txt 文件中。
我在想也许并行性在这里可能是一个优势。 我的整个任务都用一个包含 2 个成员字段的对象包装,我应该在所有线程/进程(一个列表和一个字典)中使用它们。
因此,当我使用多处理时,我会得到奇怪的结果(我认为这是因为我的共享资源)。
即:
class MyObject(object):
_my_list = []
_my_dict =
_my_dict
包含 key:value
对 "query_name":list()
。
_my_list
是要在 google 中搜索的查询列表。假设它没有被写入是安全的。
对于每个查询:我在 google 上搜索它,获取顶部结果并将其存储在 _my_dict
我想并行执行此操作。我认为线程可能很好,但似乎它们会减慢工作速度..
我是如何尝试的(这是每个查询完成整个工作的方法):
def _do_job(self, query):
""" search the query on google (via http)
save results on a .txt file locally. """
这是应该为所有查询并行执行所有作业的方法:
def find_articles(self):
p = Pool(processes=len(self._my_list))
p.map_async(self._do_job, self._my_list)
p.close()
p.join()
self._create_final_log()
上述执行不起作用,我得到损坏的结果...
但是,当我使用多线程时,结果很好,但速度很慢:
def find_articles(self):
thread_pool = []
for vendor in self._vendors_list:
self._search_validate_cache(vendor)
thread = threading.Thread(target=self._search_validate_cache, args=. (vendor,))
thread_pool.append(thread)
thread.start()
for thread in thread_pool:
thread.join()
self._create_final_log()
任何帮助将不胜感激,谢谢!
【问题讨论】:
如果您的任务受 CPU 限制(或者在 IO 任务期间可能不释放 GIL),线程无法帮助您,因为每个进程一次只允许运行一个线程(因为python的内存管理不是线程安全的)。multiprocessing
通常是解决这个问题的方法,但与线程不同;进程不共享内存空间。您需要特殊的共享数据结构来在进程之间共享数据。普通列表和字典不起作用。
我提供的答案是否有运气测试?
【参考方案1】:
我过去在做类似项目时遇到过这种情况(多处理效率不高,单线程太慢,每个查询启动一个线程太快并且出现瓶颈)。我发现完成此类任务的一种有效方法是创建一个线程数量有限的线程池。从逻辑上讲,完成此任务的最快方法是使用尽可能多的网络资源而不会出现瓶颈,这就是为什么同时活跃的线程主动发出请求的原因。
在您的情况下,使用带有回调函数的线程池循环查询列表将是浏览所有数据的快速简便的方法。显然,有很多因素会影响这一点,例如网络速度和找到正确大小的线程池以避免瓶颈,但总的来说我发现这很好用。
import threading
class MultiThread:
def __init__(self, func, list_data, thread_cap=10):
"""
Parameters
----------
func : function
Callback function to multi-thread
threads : int
Amount of threads available in the pool
list_data : list
List of data to multi-thread index
"""
self.func = func
self.thread_cap = thread_cap
self.thread_pool = []
self.current_index = -1
self.total_index = len(list_data) - 1
self.complete = False
self.list_data = list_data
def start(self):
for _ in range(self.thread_cap):
thread = threading.Thread(target=self._wrapper)
self.thread_pool += [thread]
thread.start()
def _wrapper(self):
while not self.complete:
if self.current_index < self.total_index:
self.current_index += 1
self.func(self.list_data[self.current_index])
else:
self.complete = True
def wait_on_completion(self):
for thread in self.thread_pool:
thread.join()
import requests #, time
_my_dict =
base_url = "https://www.google.com/search?q="
s = requests.sessions.session()
def example_callback_func(query):
global _my_dict
# code to grab data here
r = s.get(base_url+query)
_my_dict[query] = r.text # whatever parsed results
print(r, query)
#start_time = time.time()
_my_list = ["examplequery"+str(n) for n in range(100)]
mt = MultiThread(example_callback_func, _my_list, thread_cap=30)
mt.start()
mt.wait_on_completion()
# output queries to file
#print("Time::2f".format(time.time()-start_time))
您还可以打开文件并输出您需要的任何内容,或者在最后输出数据。显然,我在这里的复制品并不完全是您所需要的,但它是一个坚固的样板,具有我制作的轻量级功能,这将大大减少所需的时间。它使用线程池调用回调到默认函数,该函数采用单个参数(查询)。
在我的测试中,它在大约 2 秒内完成了 100 个循环的查询。在我发现瓶颈之前,我绝对可以使用线程帽并降低时间。
【讨论】:
以上是关于在 Python3 中使用 for 循环进行多线程/多处理的主要内容,如果未能解决你的问题,请参考以下文章