如何使用字典修复多线程/多处理?
Posted
技术标签:
【中文标题】如何使用字典修复多线程/多处理?【英文标题】:How to fix multithreading/multiprocessing with dictionaries? 【发布时间】:2019-09-14 07:34:40 【问题描述】:我正在对一个 api 进行超过 100K 次调用,使用 2 个函数我使用第一个函数访问 api 并获取每个主机的 sysinfo(a dict),然后使用第二个函数通过 sysinfo 并获取IP 地址。我正在寻找一种方法来加快速度,但之前从未使用过多处理/线程(目前大约需要 3 小时)。
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool
#pool = ThreadPool(4)
p = Pool(5)
#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down.
def get_sys_info(self, host_id, appliance):
sysinfo = self.hx_request("https://:3000//hx/api/v3/hosts//sysinfo"
return sysinfo
def get_ips_from_sysinfo(self, sysinfo):
sysinfo = sysinfo["data"]
network_array = sysinfo.get("networkArray", )
network_info = network_array.get("networkInfo", [])
ips = []
for ni in network_info:
ip_array = ni.get("ipArray", )
ip_info = ip_array.get("ipInfo", [])
for i in ip_info:
ips.append(i)
return ips
if __name__ == "__main__":
for i in ids:
sysinfo = rr.get_sys_info(i, appliance)
hostname = sysinfo.get("data", ).get("hostname")
try:
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
except Exception as e:
rr.logger.error("Exception on -- ".format(hostname, e))
continue
#Tried calling it here
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
我必须经历超过 100,000 次这样的 api 调用,而这确实是减慢一切的部分。
我想我已经尝试了所有方法并得到了所有可能的可迭代、缺少参数的错误。
我真的很感激任何类型的帮助。谢谢!
【问题讨论】:
你得到有效的结果了吗?您可能想尝试concurrent.futures 模块 - 它有一个不错的 api。 ... 为什么你的函数有self
参数?
这两个函数是一个类的一部分,整个程序确实给了我准确的结果,但它需要的时间太长了。我见过的所有简单示例似乎都有他们经过的列表......
你能从 api 中得到sysinfo
并在一个函数中提取你想要的数据吗?以便您可以在每个主机名上调用/映射该单个函数?或者你知道瓶颈是否只是 api 调用,sysinfo.get("data", ).get("hostname")
是一个相当快的过程?
是的...我们确实考虑过这一点,我们将在未来的项目中从 sysinfo 中获取其他信息,但我今晚回家后会尝试。
你的外部循环(在ids
,不管是什么)是串行的,所以除非在get_sys_info
上花费的时间很少,否则你无法加快速度。
【参考方案1】:
您可以使用线程和队列进行通信,首先您将启动get_ips_from_sysinfo
作为单个线程来监视和处理任何已完成的sysinfo
,它将将输出存储在output_list
然后触发所有get_sys_info
线程,小心不要用 100k 个线程耗尽内存
from threading import Thread
from queue import Queue
jobs = Queue() # buffer for sysinfo
output_list = [] # store ips
def get_sys_info(self, host_id, appliance):
sysinfo = self.hx_request("https://:3000//hx/api/v3/hosts//sysinfo"
jobs.put(sysinfo) # add sysinfo to jobs queue
return sysinfo # comment if you don't need it
def get_ips_from_sysinfo(self):
"""it will run contineously untill finish all jobd"""
while True:
# get sysinfo from jobs queue
sysinfo = jobs.get() # it will wait here for new entry
if sysinfo == 'exit':
print('we are done here')
break
sysinfo = sysinfo["data"]
network_array = sysinfo.get("networkArray", )
network_info = network_array.get("networkInfo", [])
ips = []
for ni in network_info:
ip_array = ni.get("ipArray", )
ip_info = ip_array.get("ipInfo", [])
for i in ip_info:
ips.append(i)
output_list.append(ips)
if __name__ == "__main__":
# start our listner thread
Thread(target=rr.get_ips_from_sysinfo)
threads = []
for i in ids:
t = Thread(target=rr.get_sys_info, args=(i, appliance))
threads.append(t)
t.start()
# wait for threads to finish then terminate get_ips_from_sysinfo() by send 'exit' flag
for t in threads:
t.join()
jobs.put('exit')
【讨论】:
【参考方案2】:正如@wwii 评论的那样,concurrent.futures
提供了一些可以帮助您的便利,特别是因为这看起来像是一个批处理作业。
看来您的性能损失最有可能来自网络调用,因此多线程可能更适合您的用例(here 是与多处理的比较)。如果没有,您可以使用相同的 API 将池从线程切换到进程。
from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs
def thread_worker(instance, host_id, appliance):
"""Wrapper for your class's `get_sys_info` method"""
sysinfo = instance.get_sys_info(host_id, appliance)
return sysinfo, instance
# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds
in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable
if __name__ == "__main__":
with ThreadPoolExecutor(max_workers=50) as executor: # assuming 10 threads per core; your example uses 5 processes
pool = executor.submit(thread_worker, instance, _id, _app): (_id, _app)
for _id, _app in zip(instances, all_host_ids, all_appliances)
# handle the `sysinfo` dicts as they arrive
for future in as_completed(pool):
_result = future.result()
if isinstance(_sysinfo, Exception): # just one way of handling exceptions
# do something
print(f"pool[future] raised future.result()")
else:
# enqueue results for parallel processing in a separate stage, or
# process the results serially
_sysinfo, _instance = _result
ips = _instance.get_ips_from_sysinfo(_sysinfo)
# do something with `ips`
您可以通过将您的方法重构为函数来简化此示例,如果它们确实没有像您的代码那样使用状态。
如果提取sysinfo
数据的成本很高,您可以将结果排入队列,然后将这些结果提供给ProcessPoolExecutor
,该get_ips_from_sysinfo
在排队的字典上调用get_ips_from_sysinfo
。
【讨论】:
【参考方案3】:无论出于何种原因,我对在多个线程中调用实例方法有点担心——但它似乎有效。我使用concurrent.futures 制作了这个玩具示例 - 希望它能够很好地模拟您的实际情况。这会将 4000 个实例方法调用提交到(最多)500 个工作人员的线程池。玩弄max_workers
值,我发现执行时间的改进非常线性,最多可达 1000 名工作人员,然后改进ratio开始减少。
import concurrent.futures, time, random
a = [.001*n for n in range(1,4001)]
class F:
def __init__(self, name):
self.name = f'name:self.__class__.__name__'
def apicall(self,n):
wait = random.choice(a)
time.sleep(wait)
return (n,wait, self.name)
f = F('foo')
if __name__ == '__main__':
nworkers = 500
with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
# t = time.time()
futures = [executor.submit(f.apicall, n) for n in range(4000)]
results = [future.result() for future in concurrent.futures.as_completed(futures)]
# t = time.time() - t
# q = sum(r[1] for r in results)
# print(f'# workers:nworkers - ratio:q/t')
我没有考虑方法调用期间可能引发的异常,但文档中的示例非常清楚如何处理。
【讨论】:
【参考方案4】:所以...看了几天这里的建议(非常感谢!!!)和一些外部阅读(Fluent Python Ch 17 和 Effective Python 59 Specific Ways..)
def get_ips_from_sysinfo(urls):
sysinfo = lx_request(urls)
ip_dict =[]
sysinfo = sysinfo["data"]
hostname = sysinfo.get("hostname")
network_array = sysinfo.get("networkArray", )
network_info = network_array.get("networkInfo", [])
ips = []
entry =
entry["hostname"] = hostname
entry["ip_addrs"] = []
for ni in network_info:
ip_array = ni.get("ipArray", )
ip_info = ip_array.get("ipInfo", [])
for ip in ip_info:
ip_addr = ip.get("ipAddress", None)
if not ip_addr:
ip_addr = ip.get("ipv6Address", None)
if ip is None:
continue
if not is_ip_private(ip_addr):
entry["ip_addrs"].append(ip_addr)
if len(entry["ip_addrs"]) == 0:
continue
else:
ip_dict.append(entry)
return ip_dict
urls = get_sys_info(appliance, ids)
def main():
pool = ThreadPoolExecutor(max_workers = 15)
results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
with open("ip_array.json", "w+") as f:
json.dump(results, f, indent=2, sort_keys=True)
main()
*修改后现在可以使用了,希望对其他人有帮助
【讨论】:
以上是关于如何使用字典修复多线程/多处理?的主要内容,如果未能解决你的问题,请参考以下文章