线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?
Posted
技术标签:
【中文标题】线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?【英文标题】:How does thread pooling works, and how to implement it in an async/await env like NodeJS? 【发布时间】:2019-08-07 00:46:08 【问题描述】:我需要运行一个带有 10_000 个参数的函数 int f(int i)
,由于 I/O 时间,它需要大约 1 秒来执行。
在像 Python 这样的语言中,我可以使用线程(或 async/await
,我知道,但我稍后会谈到)来并行化这个任务。
如果我想始终有 10 个正在运行的线程,并在它们之间拆分任务,我可以使用 ThreadingPool :
def f(p):
x = [...]
return x
p = ThreadPool()
xs = p.map(f, range(10_000))
但是它是如何工作的?如果我想用 NodeJS 和f = http("www.google.com", callback)
实现类似的东西,我应该从哪里开始? 这类问题的算法是什么?
同样,我想同时获得 10 个请求,当一个请求完成时,下一个应该开始。
到目前为止我一直在想什么(丑陋,因为回调正在开始对 f() 函数的新调用):
queue = ["www.google.com", "www.facebook.com"]
var f = function(url)
http.get(url, (e) =>
const newUrl = queue.pop();
f(newUrl);
);
;
for (var i = 0; i < 10; i++)
f(queue.pop());
【问题讨论】:
在 Node.js 中您不需要线程池来执行此操作。 (如果您的任务受 CPU 限制,您会这样做,但 Node 是关于单个线程上的事件 I/O。)无论如何,请参阅 bluebirdjs.com/docs/api/promise.map.html 中的concurrency
选项,为了方便起见可能还有 npmjs.com/package/request-promise。
Promise 是回调的不错选择,但您无法控制要开始的任务数量。这里我不想同时运行 f()
函数 10K 次。
不仅可以您控制要开始的任务数量,我链接到的concurrency
选项将为您完成所有工作。
好吧,我写我的时候你的评论没有刷新。我的问题更多的是关于它是如何完成的,而不是它可以用一些工具来完成。无论如何感谢您的链接
【参考方案1】:
不确定 ThreadPool 和其他库是如何实现的,但这里有一个提示:使用队列来计算正在运行的任务/线程的数量。 我没有尝试这段代码,但它可以给你一个想法:如果我们应该启动另一个线程,我们会每 0.2 秒创建一个线程检查。 然而,这意味着很多上下文切换并且可能效率不高。
class Pool:
def __init__(self, func: Callable, params: list, thread_max = 10):
self.func = func
self.params = params
self.running = 0
self.finished = []
self.thread_max = thread_max
self.threads = []
def start(self):
Thread(target=check, args=(0.2)).start()
def check(self, t_sleep=0.5):
done = False
while not done:
sleep(t_sleep)
# first check for finished threads
for t in threads:
if not t.isAlive():
# do something with return value
# ...
self.threads.remove(t)
if not len(self.params): # mean there is no more task left to LAUNCH
done = len(self.threads) # gonna be 0 when every tasks is COMPLETE
continue # avoid the next part (launching thread)
# now start some threads if needed
while len(self.threads) < self.thread_max:
arg = self.params.pop()
thread = Thread(target=self.func, args=(arg, ))
threads.insert(thread)
thread.start()
但是我没有任何关于 async/await 的线索(关键字现在在 python 中可用)
【讨论】:
感谢您帮助我了解池化。但是您的解决方案并不是很有效,是吗?我也想了解更多关于 async/await 的信息。【参考方案2】:重新实现我链接到的那个 Bluebird 函数:
const mapWithConcurrency = async (values, concurrency, fn) =>
let i = 0;
let results = values.map(() => null);
const work = async () =>
while (i < values.length)
const current = i++;
results[current] = await fn(values[current]);
;
await Promise.all(Array.from(length: concurrency, work));
return results;
;
mapWithConcurrency(Array.from(length: 30 * 15, (_, i) => i), 10, async i =>
const el = document.body.appendChild(document.createElement('i'));
el.style.left = 5 * (i % 30) + 'px';
el.style.top = 5 * (i / 30 | 0) + 'px';
await new Promise(resolve => setTimeout(resolve, Math.random() * 500); );
el.style.background = 'black';
return 2 * i;
).then(results =>
console.log(results.length, results.every((x, i) => x === 2 * i));
);
i
background: grey;
transition: background 0.3s ease-out;
position: absolute;
width: 5px;
height: 5px;
【讨论】:
【参考方案3】:在python中,线程池只使用1个cpu核心。但由于您的任务是 I/O 受限的,它会比串行执行 10k 函数调用做得更好。
为了做得更好,您可以尝试使用多个内核的进程池。甚至将 asyncio 与进程结合起来。根据您的问题,使用这两种方法可能会或可能不会进一步加速,使用线程池作为基线。
见this example of combining thread/process with asyncio。它应该直接适用于您的情况。您的函数f
相当于他们的函数block
。
在 Python 3.6 中,异步代码的一般形式是创建一个事件循环来运行异步函数。一个非常简单的例子是
import asyncio
async def coroutine():
print('in coroutine')
coro = coroutine()
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(coro)
event_loop.close()
为简单起见,您可以将async def
函数的返回视为要执行的东西(协程),然后循环执行它。如果有 N 个任务要异步执行,你可以用 N 个 async def
函数定义它们,另一个 await
s 函数定义它们。最后一个async
函数定义了N 个任务的“完成”意味着什么。例如,“完成”可能意味着所有 N 个任务都已完成,或者只要其中 1 个任务完成,等等。循环执行第 N+1 个函数。
在 Python 3.7 中,异步 API 发生了一些变化,并且不需要显式创建循环。 您可以在my blog post 中找到一些示例。
【讨论】:
【参考方案4】:迟到的答案,但我通常处理最大线程限制为X
的多个线程的方式如下:
import threading
import requests, json
import time
from urllib.parse import urlparse
final_dict = # will hold final results
def parser(u):
try:
parsed_uri = urlparse(u) # parse url to get domain name that'l be used as key in final_dict
domain = "uri.netloc".format(uri=parsed_uri)
x = requests.get(u)
status_code = x.status_code
headers = x.headers
cookies = x.cookies
# OR cookies = ";".join(f"k:v" for k,v in x.cookies.iteritems())
html = x.text
# do something with the parsed url, in this case, I created a dictionary containing info about the parsed url: timestamp, url, status_code, html, headers and cookies
if not domain in final_dict:
final_dict[domain] = []
final_dict[domain].append( 'ts': time.time(), 'url': u, 'status': status_code , 'headers': str(headers), 'cookies': str(cookies), 'html': html )
except Exception as e:
pass
print(e)
return
max_threads = 10
urls = ['https://google.com','https://www.facebook.com', 'https://google.com/search?q=hello+world', 'https://www.facebook.com/messages/', 'https://google.com/search?q=learn+python', 'https://www.facebook.com/me/photos', 'https://google.com/search?q=visit+lisboa', 'https://www.facebook.com/me/photos_albums']
for u in urls:
threading.Thread(target=parser, args=[u]).start()
tc = threading.active_count()
while tc == max_threads:
tc = threading.active_count()
time.sleep(0.2)
while tc != 1: # wait for threads to finish, when tc == 1 no more threads are running apart from the main process.
tc = threading.active_count()
time.sleep(0.2)
print(json.dumps(final_dict))
'''
# save to file
with open("output.json", "w") as f:
f.write(json.dumps(final_dict))
# load from file
with open("output.json") as f:
_json = json.loads(f.read())
'''
输出:
-
请检查上面生成的
json
:https://jsoneditoronline.org/?id=403e55d841394a5a83dbbda98d5f2ccd
上面的代码在某种程度上是“我自己的代码”,我的意思是它在以前的项目中使用过,它可能无法完全回答您的问题,但希望它对未来的用户来说是一个很好的资源。
在Linux
上,我通常将max_threads
设置为250
,在Windows
上设置为
150
附近。
【讨论】:
【参考方案5】:要具有与 nodejs 类似的行为,您必须使用响应式 x 编程。您正在寻找的是 rxpy 。 https://github.com/ReactiveX/RxPY
【讨论】:
【参考方案6】:看看我新发布的模块:concurrency-controller
它可以以给定的并发度同时调用函数。
【讨论】:
以上是关于线程池是如何工作的,以及如何在 NodeJS 之类的 async/await 环境中实现它?的主要内容,如果未能解决你的问题,请参考以下文章