如何在 CPU 内核上执行函数,并在完成后获得回调?
Posted
技术标签:
【中文标题】如何在 CPU 内核上执行函数,并在完成后获得回调?【英文标题】:How can I execute a function on a CPU core, and get a callback when it has completed? 【发布时间】:2021-09-30 21:21:49 【问题描述】:上下文
我正在接收流:
symbols = ['ABC', 'DFG', ...] # 52 of these
handlers = symbol: Handler(symbol) for symbol in symbols
async for symbol, payload in lines: # 600M of these
handlers[symbol].feed(payload)
我需要使用多个 CPU 内核来加快速度。
handler['ABC']
(例如)保持状态,但它与(例如)handler['DFG']
的状态不相交
基本上我不能同时运行 2 个内核,例如handler['ABC']
.
我目前的方法
我想出了以下解决方案,但它是部分伪代码,因为我看不到如何实现它。
NCORES = 4
symbol_curr_active_on_core = [None]*NCORES
NO_CORES_FREE = -1
def first_free_core():
for i, symbol in enumerate(symbol_curr_active_on_core):
if not symbol:
return i
return NO_CORES_FREE
for symbol, payload in lines:
# wait for avail core to handle it
while True:
sleep(0.001)
if first_free_core() == NO_CORES_FREE:
continue
if symbol in symbol_curr_active_on_core:
continue
core = first_free_core()
symbol_curr_active_on_core[core] = symbol
cores[core].execute(
processor[symbol].feed(payload),
on_complete=lambda core_index: \
symbol_curr_active_on_core[core_index] = None
)
所以我的问题是:如何将最后一条语句转换为工作 Python 代码?
cores[core].execute(
processor[symbol].feed(payload),
on_complete=lambda core_index: \
symbol_curr_active_on_core[core_index] = None
)
PS 更一般地说,我的方法是最优的吗?
【问题讨论】:
我可以理解您为什么不希望两个单独的进程处理同一个符号。但是,假设两个不同的进程相互隔离,为什么不能安排两个不同的进程处理它们不同的符号集在同一个内核上运行呢? 如果我在进程之间划分我的符号,我会因执行时间的变化而失去效率。但这就是我现在所做的,而且效果很好! 如果您有 4 个进程并且每个都准备好运行,例如,不等待 I/O 完成,并且您至少有 4 个物理内核没有运行其他工作,它们都将在 4 个不同的内核上并行运行(这都是一个很大的 if)。但是,不能保证给定进程在调度时始终在同一核心上运行。据我所知,Python 中没有办法指定 CPU 核心亲和性来指定给定进程只能在特定核心上运行。如果可以的话,指定这样的亲和力在性能方面会弄巧成拙。 但听起来你甚至不需要同一个进程总是处理同一个符号。我说对了吗? 【参考方案1】:假设以下方法应该是可行的:
-
您的
Handler
类可以“腌制”并且
Handler
类没有携带如此多的状态信息,以至于它在每次工作人员调用中的序列化成本过高。
主进程创建一个handlers
字典,其中键是 52 个符号之一,值是具有两个键的字典:“handler”,其值是符号的处理程序,“处理”,其值是True
或 False
取决于进程当前是否正在处理该符号的一个或多个有效负载。
池中的每个进程都使用另一个 queue_dict
字典进行初始化,该字典的键是 52 个符号之一,其值是 multiprocessing.Queue
实例,该实例将保存为该符号处理的有效负载实例。
主进程迭代输入的每一行以获得下一个符号/有效负载对。有效负载排队到当前符号的适当队列中。访问handlers
字典以确定是否已将任务排入处理池以通过检查当前符号的processing
标志来处理当前符号的符号特定处理程序。如果此标志为True
,则无需进一步操作。否则,processing
标志设置为 True
并调用 apply_async
作为参数传递此符号的处理程序。
每次主任务将有效负载写入 52 个处理程序队列之一时,都会保留入队任务(即有效负载)的计数并递增。指定为apply_async
参数的工作函数采用其 handler 参数,并从中推断出需要处理的队列。对于它在队列中找到的每个有效负载,它都会调用处理程序的feed
方法。然后它返回一个元组,该元组由更新的处理程序和从队列中删除的有效负载消息的数量组成。 apply_async
方法的回调函数 (1) 更新 handlers
字典中的处理程序并 (2) 将适当符号的 processing
标志重置为 False
。最后,它会根据已删除的有效负载消息的数量来减少入队任务的数量。
当主进程在对有效负载进行排队后检查当前是否有进程正在运行此符号的处理程序并看到processing
标志为True
并且在此基础上不通过@ 提交新任务987654342@,有一个小窗口,该工作人员已经完成处理其队列中的所有有效负载并且即将返回或已经返回,并且回调函数尚未将processing
标志设置为False
。在这种情况下,有效负载将在队列中未处理,直到从输入中读取该符号的下一个有效负载并进行处理。但是,如果该符号没有进一步的输入行,那么当所有任务都完成后,我们将拥有未处理的有效负载。但是我们也会有一个非零的入队任务计数,这表明我们遇到了这种情况。因此,与其尝试实现一个复杂的多进程同步协议,不如通过重新创建一个新池并检查 52 个队列中的每一个来检测这种情况并进行处理。
from multiprocessing import Pool, Queue
import time
from queue import Empty
from threading import Lock
# This class needs to be Pickle-able:
class Handler:
def __init__(self, symbol):
self.symbol = symbol
self.counter = 0
def feed(self, payload):
# For testing just increment counter by payload:
self.counter += payload
def init_pool(the_queue_dict):
global queue_dict
queue_dict = the_queue_dict
def worker(handler):
symbol = handler.symbol
q = queue_dict[symbol]
tasks_removed = 0
while True:
try:
payload = q.get_nowait()
handler.feed(payload)
tasks_removed += 1
except Empty:
break
# return updated handler:
return handler, tasks_removed
def callback_result(result):
global queued_tasks
global lock
handler, tasks_removed = result
# show done processing this symbol by updating handler state:
d = handlers[handler.symbol]
# The order of the next two statements matter:
d['handler'] = handler
d['processing'] = False
with lock:
queued_tasks -= tasks_removed
def main():
global handlers
global lock
global queued_tasks
symbols = [
'A','B','C','D','E','F','G','H','I','J','K','L','M','AA','BB','CC','DD','EE','FF','GG','HH','II','JJ','KK','LL','MM',
'a','b','c','d','e','f','g','h','i','j','k','l','m','aa','bb','cc','dd','ee','ff','gg','hh','ii','jj','kk','ll','mm'
]
queue_dict = symbol: Queue() for symbol in symbols
handlers = symbol: 'processing': False, 'handler': Handler(symbol) for symbol in symbols
lines = [
('A',1),('B',1),('C',1),('D',1),('E',1),('F',1),('G',1),('H',1),('I',1),('J',1),('K',1),('L',1),('M',1),
('AA',1),('BB',1),('CC',1),('DD',1),('EE',1),('FF',1),('GG',1),('HH',1),('II',1),('JJ',1),('KK',1),('LL',1),('MM',1),
('a',1),('b',1),('c',1),('d',1),('e',1),('f',1),('g',1),('h',1),('i',1),('j',1),('k',1),('l',1),('m',1),
('aa',1),('bb',1),('cc',1),('dd',1),('ee',1),('ff',1),('gg',1),('hh',1),('ii',1),('jj',1),('kk',1),('ll',1),('mm',1)
]
def get_lines():
# Emulate 52_000 lines:
for _ in range(10_000):
for line in lines:
yield line
POOL_SIZE = 4
queued_tasks = 0
lock = Lock()
# Create pool of POOL_SIZE processes:
pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
for symbol, payload in get_lines():
# Put some limit on memory utilization:
while queued_tasks > 10_000:
time.sleep(.001)
d = handlers[symbol]
q = queue_dict[symbol]
q.put(payload)
with lock:
queued_tasks += 1
if not d['processing']:
d['processing'] = True
handler = d['handler']
pool.apply_async(worker, args=(handler,), callback=callback_result)
# Wait for all tasks to complete
pool.close()
pool.join()
if queued_tasks:
# Re-create pool:
pool = Pool(POOL_SIZE, initializer=init_pool, initargs=(queue_dict,))
for d in handlers.values():
handler = d['handler']
d['processing'] = True
pool.apply_async(worker, args=(handler,), callback=callback_result)
pool.close()
pool.join()
assert queued_tasks == 0
# Print results:
for d in handlers.values():
handler = d['handler']
print(handler.symbol, handler.counter)
if __name__ == "__main__":
main()
打印:
A 10000
B 10000
C 10000
D 10000
E 10000
F 10000
G 10000
H 10000
I 10000
J 10000
K 10000
L 10000
M 10000
AA 10000
BB 10000
CC 10000
DD 10000
EE 10000
FF 10000
GG 10000
HH 10000
II 10000
JJ 10000
KK 10000
LL 10000
MM 10000
a 10000
b 10000
c 10000
d 10000
e 10000
f 10000
g 10000
h 10000
i 10000
j 10000
k 10000
l 10000
m 10000
aa 10000
bb 10000
cc 10000
dd 10000
ee 10000
ff 10000
gg 10000
hh 10000
ii 10000
jj 10000
kk 10000
ll 10000
mm 10000
【讨论】:
【参考方案2】:这远非唯一(甚至可能是“最佳”)方法,但根据我对您其他帖子的评论,这是一个让特定子进程处理特定“符号”的示例
from multiprocessing import Process, Queue
from queue import Empty
from math import ceil
class STOPFLAG: pass
class Handler:
def __init__(self, symbol):
self.counter = 0 #maintain some state for each "Handler"
self.symbol = symbol
def feed(self, payload):
self.counter += payload
return self.counter
class Worker(Process):
def __init__(self, out_q):
self.handlers =
self.in_q = Queue()
self.out_q = out_q
super().__init__()
def run(self):
while True:
try:
symbol = self.in_q.get(1)
except Empty:
pass #put break here if you always expect symbols to be available and a timeout "shouldn't" happen
else:
if isinstance(symbol, STOPFLAG):
#pass back the handlers with their now modified state
self.out_q.put(self.handlers)
break
else:
self.handlers[symbol[0]].feed(symbol[1])
def main():
n_workers = 4
# Just 8 for testing:
symbols = ['ABC', 'DEF', 'GHI', 'JKL', 'MNO', 'PQR', 'STU', 'VWX']
workers = []
out_q = Queue()
for i in range(n_workers):
workers.append(Worker(out_q))
symbol_worker_mapping =
for i, symbol in enumerate(symbols):
workers[i%n_workers].handlers[symbol] = Handler(symbol)
symbol_worker_mapping[symbol] = i%n_workers
for worker in workers: worker.start() #start processes
# Just a few for testing:
lines = [
('ABC', 1),
('DEF', 1),
('GHI', 1),
('JKL', 1),
('MNO', 1),
('PQR', 1),
('STU', 1),
('VWX', 1),
('ABC', 1),
('DEF', 1),
('GHI', 1),
('JKL', 1),
('MNO', 1),
('PQR', 1),
('STU', 1),
('VWX', 1),
]
#putting this loop in a thread could allow results to be collected while inputs are still being fed in.
for symbol, payload in lines: #feed in tasks
worker = workers[symbol_worker_mapping[symbol]] #select the correct worker
worker.in_q.put([symbol, payload]) #pass the inputs
results = [] #results are handler dicts from each worker
for worker in workers:
worker.in_q.put(STOPFLAG()) #Send stop signal to each worker
results.append(out_q.get()) #get results (may be out of order)
for worker in workers: worker.join() #cleanup
for result in results:
for symbol, handler in result.items():
print(symbol, handler.counter)
if __name__ == "__main__":
main()
每个子进程处理“符号”的子集,并且每个子进程都有自己的输入队列。这与普通的pool
不同,其中每个孩子都是相同的,它们都共享一个输入队列,下一个可用的孩子总是接受下一个输入。然后他们都将结果放到一个共享的输出队列中返回给主进程。
一个完全不同的解决方案可能是在主进程中保存所有状态,为每个符号维护一个锁,并在必要的状态被发送给工作人员直到收到结果时持有锁,并且状态在主进程已更新。
【讨论】:
是否保证每个进程使用不同的核心? 进程通常会根据操作系统调度程序的判断在内核之间跳转。 python 没有一种简单的方法来告诉操作系统将进程保留在特定的物理内核上,但这通常并不重要,因为操作系统会尝试相对有效地管理上下文切换。以上是关于如何在 CPU 内核上执行函数,并在完成后获得回调?的主要内容,如果未能解决你的问题,请参考以下文章