多处理异步:生产者-消费者模型

Posted

技术标签:

【中文标题】多处理异步:生产者-消费者模型【英文标题】:Asyncio with multiprocessing : Producers-Consumers model 【发布时间】:2021-04-30 11:23:25 【问题描述】:

我正在尝试检索股票价格并在价格出现时对其进行处理。我是并发初学者,但我认为这种设置似乎适合异步生产者-消费者模型,其中每个生产者检索股票价格,并将其通过队列传递给消费者。现在消费者已经并行处理股票价格(多处理),因为这项工作是 CPU 密集型的。因此,我将有多个消费者已经在工作,而并非所有生产者都已完成检索数据。此外,我想实施一个步骤,如果消费者发现它正在处理的股票价格无效,我们会为该股票生成一个新的消费者工作。

到目前为止,我有以下玩具代码可以让我到达那里,但我的 process_data 函数(消费者)存在问题。

from concurrent.futures import ProcessPoolExecutor
import asyncio
import random
import time
random.seed(444)

#producers
async def retrieve_data(ticker, q):
    '''
    Pretend we're using aiohttp to retrieve stock prices from a URL
    Place a tuple of stock ticker and price into asyn queue as it becomes available
    '''
    start = time.perf_counter() # start timer
    await asyncio.sleep(random.randint(4, 8)) # pretend we're calling some URL
    price = random.randint(1, 100) # pretend this is the price we retrieved
    print(f'ticker : price retrieved in time.perf_counter() - start:0.1f seconds') 
    await q.put((ticker, price)) # place the price into the asyncio queue
    

#consumers
async def process_data(q):
    while True:
        data = await q.get()
        print(f"processing: data")
        with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, data_processor, data)
            #if output of data_processing failed, send ticker back to queue to retrieve data again
            if not result[2]: 
                print(f'result[0] data invalid. Retrieving again...')
                await retrieve_data(result[0], q) # add a new task
                q.task_done() # end this task
            else:
                q.task_done() # so that q.join() knows when the task is done
            
async def main(tickers):       
    q = asyncio.Queue()
    producers = [asyncio.create_task(retrieve_data(ticker, q)) for ticker in tickers]
    consumers = [asyncio.create_task(process_data(q))]
    await asyncio.gather(*producers)
    await q.join()  # Implicitly awaits consumers, too. blocks until all items in the queue have been received and processed
    for c in consumers:
        c.cancel() #cancel the consumer tasks, which would otherwise hang up and wait endlessly for additional queue items to appear
    

    
'''
RUN IN JUPYTER NOTEBOOK
'''
start = time.perf_counter()
tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
await main(tickers)
print(f'total elapsed time: time.perf_counter() - start:0.2f')

'''
RUN IN TERMINAL
'''
# if __name__ == "__main__":
#     start = time.perf_counter()
#     tickers = ['AAPL', 'AMZN', 'TSLA', 'C', 'F']
#     asyncio.run(main(tickers))
#     print(f'total elapsed time: time.perf_counter() - start:0.2f')

由上面的 process_data() 调用的 data_processor() 函数需要位于 Jupyter notebook 中的不同单元格或单独的模块中(据我了解,以避免 PicklingError)

from multiprocessing import current_process

def data_processor(data):
    ticker = data[0]
    price = data[1]
    
    print(f'Started ticker - current_process().name')
    start = time.perf_counter() # start time counter
    time.sleep(random.randint(4, 5)) # mimic some random processing time
    
    # pretend we're processing the price. Let the processing outcome be invalid if the price is an odd number
    if price % 2==0:
        is_valid = True
    else:
        is_valid = False
    
    print(f"ticker's price price validity: --is_valid--"
          f' Elapsed time: time.perf_counter() - start:0.2f seconds')
    return (ticker, price, is_valid)

问题

    我没有使用 python 的多处理模块,而是使用了 concurrent.futures 的 ProcessPoolExecutor,我读到它与 asyncio (What kind of problems (if any) would there be combining asyncio with multiprocessing?) 兼容。但似乎我必须在检索执行程序调用的函数的输出 (result) 和能够并行运行多个子进程之间做出选择。使用下面的构造,子进程按顺序运行,而不是并行运行。

    with ProcessPoolExecutor() as executor:
            loop = asyncio.get_running_loop()
            result = await loop.run_in_executor(executor, data_processor, data)  
    

删除result = await 前面的result = await 允许并行运行多个消费者,但是我无法从父进程收集他们的结果。为此我需要await。然后当然剩下的代码块会失败。

如何让这些子流程并行运行并提供输出?也许它需要一个不同于生产者-消费者模型的结构或其他东西

    请求再次检索无效股票价格的代码部分有效(前提是我可以从上面得到结果),但它在调用它的子进程中运行并阻止创建新消费者,直到请求被履行。有没有办法解决这个问题?

    #if output of data_processing failed, send ticker back to queue to retrieve data again
    if not result[2]: 
            print(f'result[0] data invalid. Retrieving again...')
            await retrieve_data(result[0], q) # add a new task
            q.task_done() # end this task
        else:
            q.task_done() # so that q.join() knows when the task is done
    

【问题讨论】:

【参考方案1】:

但似乎我必须在检索执行程序调用的函数的输出(结果)和能够并行运行多个子进程之间做出选择。

幸运的是,情况并非如此,您也可以使用asyncio.gather() 一次等待多个项目。但是您从队列中一个一个地获取数据项,因此您没有一批要处理的项目。最简单的解决方案是启动多个消费者。替换

# the single-element list looks suspicious anyway
consumers = [asyncio.create_task(process_data(q))]

与:

# now we have an actual list
consumers = [asyncio.create_task(process_data(q)) for _ in range(16)]

每个消费者都将等待一个单独的任务完成,但这没关系,因为您将有一个完整的池并行工作,这正是您想要的。

另外,您可能希望将executor 设为全局变量,而使用with,以便所有消费者共享进程池,并且与程序一样长。这样,消费者将重用已经生成的工作进程,而不必为从队列中接收到的每个作业生成一个新进程。 (这就是拥有进程“池”的全部意义。)在这种情况下,您可能希望在程序中不再需要执行程序的位置添加 executor.shutdown()

【讨论】:

啊!以最少的代码编辑产生如此大的影响!谢谢用户48!这实际上同时解决了这两个问题。 快速提问。在 htop 中,我可以看到进程 PID 在代码运行完成后仍然存在。需要清理吗? @annon “代码完成运行”是指整个程序,还是只是使用多处理的程序部分?另外,您是否执行了最后一段中的建议? 整个程序,是的,我拔出了执行程序并使其成为全球性的。但现在我认为问题仅仅是因为我正在执行来自 Jupyter 的代码。 @annon 尝试在程序末尾添加executor.shutdown(),或者在您不再需要它的位置。

以上是关于多处理异步:生产者-消费者模型的主要内容,如果未能解决你的问题,请参考以下文章

多线程

生产者消费者模型 与多线程

JUC并发编程 多线程设计模式 -- 异步模式之生产者/消费者

多线程生产者消费者模型

7.2.6 - 并发多线程 生产者,消费者

Python多进程,多线程和异步实例