线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?

Posted

技术标签:

【中文标题】线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?【英文标题】:How does thread pooling works, and how to implement it in an async/await env like NodeJS? 【发布时间】:2019-08-07 00:46:08 【问题描述】:

我需要运行一个带有 10_000 个参数的函数 int f(int i),由于 I/O 时间,它需要大约 1 秒来执行。 在像 Python 这样的语言中,我可以使用线程(或 async/await,我知道,但我稍后会谈到)来并行化这个任务。 如果我想始终有 10 个正在运行的线程,并在它们之间拆分任务,我可以使用 ThreadingPool :

def f(p):
    x = [...]
    return x

p = ThreadPool()
xs = p.map(f, range(10_000))

但是它是如何工作的?如果我想用 NodeJS 和f = http("www.google.com", callback) 实现类似的东西,我应该从哪里开始? 这类问题的算法是什么? 同样,我想同时获得 10 个请求,当一个请求完成时,下一个应该开始。

到目前为止我一直在想什么(丑陋,因为回调正在开始对 f() 函数的新调用):

queue = ["www.google.com", "www.facebook.com"]
var f = function(url) 
  http.get(url, (e) => 
    const newUrl = queue.pop();
    f(newUrl);
  );
;

for (var i = 0; i < 10; i++) 
  f(queue.pop());

【问题讨论】:

在 Node.js 中您不需要线程池来执行此操作。 (如果您的任务受 CPU 限制,您会这样做,但 Node 是关于单个线程上的事件 I/O。)无论如何,请参阅 bluebirdjs.com/docs/api/promise.map.html 中的 concurrency 选项,为了方便起见可能还有 npmjs.com/package/request-promise。 Promise 是回调的不错选择,但您无法控制要开始的任务数量。这里我不想同时运行 f() 函数 10K 次。 不仅可以您控制要开始的任务数量,我链接到的concurrency 选项将为您完成所有工作。 好吧,我写我的时候你的评论没有刷新。我的问题更多的是关于它是如何完成的,而不是它可以用一些工具来完成。无论如何感谢您的链接 【参考方案1】:

不确定 ThreadPool 和其他库是如何实现的,但这里有一个提示:使用队列来计算正在运行的任务/线程的数量。 我没有尝试这段代码,但它可以给你一个想法:如果我们应该启动另一个线程,我们会每 0.2 秒创建一个线程检查。 然而,这意味着很多上下文切换并且可能效率不高。

class Pool:
    def __init__(self, func: Callable, params: list, thread_max = 10):
        self.func = func
        self.params = params
        self.running = 0
        self.finished = []
        self.thread_max = thread_max
        self.threads = []

    def start(self):
        Thread(target=check, args=(0.2)).start()

    def check(self, t_sleep=0.5):
        done = False
        while not done:
            sleep(t_sleep)
            # first check for finished threads
            for t in threads:
                if not t.isAlive():
                    # do something with return value
                    # ...
                    self.threads.remove(t)

            if not len(self.params): # mean there is no more task left to LAUNCH
                done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
                continue # avoid the next part (launching thread)

            # now start some threads if needed
            while len(self.threads) < self.thread_max:
                arg = self.params.pop()
                thread = Thread(target=self.func, args=(arg, ))
                threads.insert(thread)
                thread.start()

但是我没有任何关于 async/await 的线索(关键字现在在 python 中可用)

【讨论】:

感谢您帮助我了解池化。但是您的解决方案并不是很有效,是吗?我也想了解更多关于 async/await 的信息。【参考方案2】:

重新实现我链接到的那个 Bluebird 函数:

const mapWithConcurrency = async (values, concurrency, fn) => 
    let i = 0;
    let results = values.map(() => null);

    const work = async () => 
        while (i < values.length) 
            const current = i++;
            results[current] = await fn(values[current]);
        
    ;

    await Promise.all(Array.from(length: concurrency, work));

    return results;
;

mapWithConcurrency(Array.from(length: 30 * 15, (_, i) => i), 10, async i => 
    const el = document.body.appendChild(document.createElement('i'));
    el.style.left = 5 * (i % 30) + 'px';
    el.style.top = 5 * (i / 30 | 0) + 'px';
    await new Promise(resolve =>  setTimeout(resolve, Math.random() * 500); );
    el.style.background = 'black';
    return 2 * i;
).then(results => 
    console.log(results.length, results.every((x, i) => x === 2 * i));
);
i 
    background: grey;
    transition: background 0.3s ease-out;
    position: absolute;
    width: 5px;
    height: 5px;

【讨论】:

【参考方案3】:

在python中,线程池只使用1个cpu核心。但由于您的任务是 I/O 受限的,它会比串行执行 10k 函数调用做得更好。

为了做得更好,您可以尝试使用多个内核的进程池。甚至将 asyncio 与进程结合起来。根据您的问题,使用这两种方法可能会或可能不会进一步加速,使用线程池作为基线。

见this example of combining thread/process with asyncio。它应该直接适用于您的情况。您的函数f 相当于他们的函数block

在 Python 3.6 中,异步代码的一般形式是创建一个事件循环来运行异步函数。一个非常简单的例子是

import asyncio

async def coroutine():
    print('in coroutine')

coro = coroutine()
event_loop = asyncio.get_event_loop()

event_loop.run_until_complete(coro)
event_loop.close()

为简单起见,您可以将async def 函数的返回视为要执行的东西(协程),然后循环执行它。如果有 N 个任务要异步执行,你可以用 N 个 async def 函数定义它们,另一个 awaits 函数定义它们。最后一个async 函数定义了N 个任务的“完成”意味着什么。例如,“完成”可能意味着所有 N 个任务都已完成,或者只要其中 1 个任务完成,等等。循环执行第 N+1 个函数。

在 Python 3.7 中,异步 API 发生了一些变化,并且不需要显式创建循环。 您可以在my blog post 中找到一些示例。

【讨论】:

【参考方案4】:

迟到的答案,但我通常处理最大线程限制为X 的多个线程的方式如下:

import threading
import requests, json
import time
from urllib.parse import urlparse

final_dict =  # will hold final results

def parser(u):
    try:
        parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
        domain = "uri.netloc".format(uri=parsed_uri)
        x = requests.get(u)
        status_code = x.status_code
        headers = x.headers
        cookies = x.cookies
        # OR cookies = ";".join(f"k:v" for k,v in x.cookies.iteritems())
        html = x.text
        # do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
        if not domain in final_dict:
            final_dict[domain] = []
        final_dict[domain].append( 'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html )

    except Exception as e:
        pass
        print(e)
        return 

max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']

for u in urls:
    threading.Thread(target=parser, args=[u]).start()
    tc = threading.active_count()
    while tc == max_threads:
        tc = threading.active_count()
        time.sleep(0.2)

while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
    tc = threading.active_count()
    time.sleep(0.2)

print(json.dumps(final_dict))

'''
# save to file
with open("output.json", "w") as f:
    f.write(json.dumps(final_dict))

# load from file
with open("output.json") as f:
    _json = json.loads(f.read())
'''

输出:

    请检查上面生成的json:https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd 上面的代码在某种程度上是“我自己的代码”,我的意思是它在以前的项目中使用过,它可能无法完全回答您的问题,但希望它对未来的用户来说是一个很好的资源。 在Linux 上,我通常将max_threads 设置为250,在Windows 上设置为 150 附近。

【讨论】:

【参考方案5】:

要具有与 nodejs 类似的行为,您必须使用响应式 x 编程。您正在寻找的是 rxpy 。 https://github.com/ReactiveX/RxPY

【讨论】:

【参考方案6】:

看看我新发布的模块:concurrency-controller

它可以以给定的并发度同时调用函数。

【讨论】:

以上是关于线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?的主要内容,如果未能解决你的问题,请参考以下文章

面试官问:线程池是如何工作的?我给他讲了个白话故事!

nodejs如何开启多线程

你知道线程池是如何退出程序的吗?

你知道线程池是如何退出程序的吗?

线程池是如何重复利用空闲的线程来执行任务的?

线程池是如何重复利用空闲线程的?