How to fix multithreading/multiprocessing with dictionaries?


【Posted】: 2019-09-14 07:34:40 【Question】:

I'm making over 100K calls to an API using two functions: the first hits the API and fetches the sysinfo (a dict) for each host, and the second walks that sysinfo and pulls out the IP addresses. I'm looking for a way to speed this up, but I've never used multiprocessing/threading before (right now it takes about 3 hours).

from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool

#pool = ThreadPool(4)
p = Pool(5)

#obviously I removed a lot of the code that generates some of these
#variables, but this is the part that slooooows everything down. 

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000/hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    return sysinfo

def get_ips_from_sysinfo(self, sysinfo):
    sysinfo = sysinfo["data"]
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    ips = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for i in ip_info:
            ips.append(i)
    return ips

if __name__ == "__main__":
    for i in ids:
        sysinfo = rr.get_sys_info(i, appliance)
        hostname = sysinfo.get("data", {}).get("hostname")
        try:
            ips = p.map(rr.get_ips_from_sysinfo(sysinfo))
        except Exception as e:
            rr.logger.error("Exception on {} -- {}".format(hostname, e))
            continue

#Tried calling it here
ips = p.map(rr.get_ips_from_sysinfo(sysinfo))

I have to make over 100,000 of these API calls, and this really is the part that slows everything down.

I think I've tried everything and gotten every possible "argument is not iterable" and "missing arguments" error.
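(For context on those errors: Pool.map expects the function itself plus an iterable of arguments, so writing p.map(rr.get_ips_from_sysinfo(sysinfo)) calls the function first and passes its return value as the "function", with no iterable at all. A minimal sketch with a made-up square helper:)

from multiprocessing.dummy import Pool as ThreadPool

def square(n):
    return n * n

pool = ThreadPool(4)
# pass the uncalled function and an iterable of inputs;
# pool.map(square(2)) would raise the missing-iterable TypeError
results = pool.map(square, [1, 2, 3, 4])  # -> [1, 4, 9, 16]
pool.close()
pool.join()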

I'd really appreciate help of any kind. Thanks!

【Comments】:

Are you getting valid results back? You might want to try the concurrent.futures module - it has a nice api. ... Why do your functions have a self parameter?

The two functions are part of a class; the whole program does give me accurate results, it just takes far too long. All the simple examples I've seen seem to have lists they iterate over...

Could you get the sysinfo from the api and extract the data you want in a single function, so that you can call/map that one function on each hostname? Or do you know whether the bottleneck is just the api call, and sysinfo.get("data", {}).get("hostname") is a reasonably fast operation?

Yes... we did consider that; we'll be pulling other information out of sysinfo in future projects, but I'll try it when I get home tonight.

Your outer loop (over ids, whatever that is) is serial, so unless very little time is spent in get_sys_info you can't speed anything up.

【Solution 1】:

You can use threads plus a queue for communication: first start get_ips_from_sysinfo as a single thread that watches for and processes any finished sysinfo, storing the output in output_list, then fire off all the get_sys_info threads - taking care not to exhaust memory with 100k threads.

from threading import Thread
from queue import Queue

jobs = Queue()  # buffer for sysinfo
output_list = []  # store ips

def get_sys_info(self, host_id, appliance):
    sysinfo = self.hx_request("https://{}:3000/hx/api/v3/hosts/{}/sysinfo".format(appliance, host_id))
    jobs.put(sysinfo)  # add sysinfo to jobs queue
    return sysinfo  # comment if you don't need it

def get_ips_from_sysinfo(self):
    """it will run contineously untill finish all jobd"""
    while True:
        # get sysinfo from jobs queue
        sysinfo = jobs.get()  # it will wait here for new entry
        if sysinfo == 'exit':
            print('we are done here')
            break

        sysinfo = sysinfo["data"]
        network_array = sysinfo.get("networkArray", {})
        network_info = network_array.get("networkInfo", [])
        ips = []
        for ni in network_info:
            ip_array = ni.get("ipArray", {})
            ip_info = ip_array.get("ipInfo", [])
            for i in ip_info:
                ips.append(i)
        output_list.append(ips)


if __name__ == "__main__":
    # start our listener thread (it must actually be started, not just created)
    Thread(target=rr.get_ips_from_sysinfo).start()

    threads = []
    for i in ids:
        t = Thread(target=rr.get_sys_info, args=(i, appliance))
        threads.append(t)
        t.start()

    # wait for the threads to finish, then terminate get_ips_from_sysinfo() by sending the 'exit' flag
    for t in threads:
        t.join()

    jobs.put('exit')
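To keep from actually starting 100k threads at once, one option (a sketch; NUM_WORKERS is an assumption) is a fixed pool of worker threads that consume host ids from a second queue, reusing the rr instance and the jobs queue above:

from queue import Queue
from threading import Thread

NUM_WORKERS = 50  # assumed cap on concurrent API calls
id_queue = Queue()

def fetch_worker():
    while True:
        host_id = id_queue.get()
        if host_id is None:  # poison pill: no more ids
            break
        rr.get_sys_info(host_id, appliance)  # puts each sysinfo on `jobs`

workers = [Thread(target=fetch_worker) for _ in range(NUM_WORKERS)]
for w in workers:
    w.start()
for i in ids:
    id_queue.put(i)
for _ in workers:
    id_queue.put(None)  # one pill per worker
for w in workers:
    w.join()
jobs.put('exit')  # then shut down the listener thread as before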

【Comments】:

【Solution 2】:

As @wwii commented, concurrent.futures offers some conveniences that can help you, especially since this looks like a batch job.

It looks like your performance hit most likely comes from the network calls, so multithreading will probably suit your use case better (here is a comparison with multiprocessing). If not, you can switch the pool from threads to processes with the same API.

from concurrent.futures import ThreadPoolExecutor, as_completed
# You can import ProcessPoolExecutor instead and use the same APIs

def thread_worker(instance, host_id, appliance):
    """Wrapper for your class's `get_sys_info` method"""
    sysinfo = instance.get_sys_info(host_id, appliance)
    return sysinfo, instance

# instantiate the class that contains the methods in your example code
# I will call it `RR`
instances = (RR(*your_args, **your_kwds) for your_args, your_kwds 
    in zip(iterable_of_args, iterable_of_kwds))
all_host_ids = another_iterable
all_appliances = still_another_iterable

if __name__ == "__main__":
   with ThreadPoolExecutor(max_workers=50) as executor:  # assuming 10 threads per core; your example uses 5 processes
        pool = executor.submit(thread_worker, instance, _id, _app): (_id, _app)
            for _id, _app in zip(instances, all_host_ids, all_appliances)

        # handle the `sysinfo` dicts as they arrive
        for future in as_completed(pool):
            _result = future.result()
            if isinstance(_result, Exception):  # just one way of handling exceptions
                # do something
                print(f"{pool[future]} raised {future.result()}")
            else:
                # enqueue results for parallel processing in a separate stage, or
                # process the results serially
                _sysinfo, _instance = _result
                ips = _instance.get_ips_from_sysinfo(_sysinfo)
                # do something with `ips`

You could simplify this example by refactoring your methods into functions, if they really don't use state as in your example code.

If extracting the sysinfo data turns out to be expensive, you can enqueue the results and feed them to a ProcessPoolExecutor that calls get_ips_from_sysinfo on the queued dicts.
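A rough sketch of that two-stage idea, under the assumption that the parsing step is rewritten as a module-level pure function (process pools need picklable callables, so the instance method is refactored out; parse_ips below just re-implements the dict walking from the question, and rr, ids, and appliance are assumed from the question's context):

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed

def parse_ips(sysinfo):
    # pure dict walking (the question's loops), so it pickles cleanly for a process pool
    ips = []
    for ni in sysinfo["data"].get("networkArray", {}).get("networkInfo", []):
        ips.extend(ni.get("ipArray", {}).get("ipInfo", []))
    return ips

if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=50) as io_pool, ProcessPoolExecutor() as cpu_pool:
        # stage 1: I/O-bound fetches run in threads
        sysinfo_futures = [io_pool.submit(rr.get_sys_info, _id, appliance) for _id in ids]
        # stage 2: hand each finished sysinfo dict to a worker process as it completes
        ip_futures = [cpu_pool.submit(parse_ips, f.result()) for f in as_completed(sysinfo_futures)]
        all_ips = [f.result() for f in ip_futures]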

【Comments】:

【Solution 3】:

For whatever reason I was a little apprehensive about calling an instance method from many threads - but it seems to work. I made this toy example using concurrent.futures - hopefully it mimics your actual situation well enough. It submits 4000 instance method calls to a thread pool of (at most) 500 workers. Playing around with the max_workers value, I found that execution time improved pretty linearly up to about 1000 workers, after which the improvement ratio started to fall off.

import concurrent.futures, time, random

a = [.001*n for n in range(1,4001)]

class F:
    def __init__(self, name):
        self.name = f'{name}:{self.__class__.__name__}'
    def apicall(self,n):
        wait = random.choice(a)
        time.sleep(wait)
        return (n,wait, self.name)

f = F('foo')

if __name__ == '__main__':
    nworkers = 500
    with concurrent.futures.ThreadPoolExecutor(nworkers) as executor:
#        t = time.time()
        futures = [executor.submit(f.apicall, n) for n in range(4000)]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
#        t = time.time() - t
#    q = sum(r[1] for r in results)
#        print(f'# workers:{nworkers} - ratio:{q/t}')

I didn't account for exceptions that might be raised during the method calls, but the example in the docs is pretty clear about how to handle them.
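For completeness, one way to surface those exceptions - future.result() re-raises whatever apicall raised, so the loop over the toy example's futures could be wrapped like this:

results, errors = [], []
for future in concurrent.futures.as_completed(futures):
    try:
        results.append(future.result())  # re-raises anything apicall raised
    except Exception as exc:
        errors.append(exc)  # or log / retry as appropriate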

【Comments】:

【Solution 4】:

So... after a few days of looking at the suggestions here (thank you so much!!!) and some outside reading (Fluent Python ch. 17 and Effective Python: 59 Specific Ways...), this is what I ended up with:

import json

from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm

def get_ips_from_sysinfo(urls):
    sysinfo = lx_request(urls)
    ip_dict = []
    sysinfo = sysinfo["data"]
    hostname = sysinfo.get("hostname")
    network_array = sysinfo.get("networkArray", {})
    network_info = network_array.get("networkInfo", [])
    entry = {}
    entry["hostname"] = hostname
    entry["ip_addrs"] = []
    for ni in network_info:
        ip_array = ni.get("ipArray", {})
        ip_info = ip_array.get("ipInfo", [])
        for ip in ip_info:
            ip_addr = ip.get("ipAddress", None)
            if not ip_addr:
                ip_addr = ip.get("ipv6Address", None)
            if ip_addr is None:
                continue
            if not is_ip_private(ip_addr):
                entry["ip_addrs"].append(ip_addr)
    if len(entry["ip_addrs"]) > 0:
        ip_dict.append(entry)
    return ip_dict

urls = get_sys_info(appliance, ids)

def main():
    with ThreadPoolExecutor(max_workers=15) as pool:
        results = list(tqdm(pool.map(get_ips_from_sysinfo, urls), total=len(urls)))
    with open("ip_array.json", "w+") as f:
        json.dump(results, f, indent=2, sort_keys=True)

main()

*Modified - it works now; hopefully it helps someone else.

【Comments】:
