multiprocessing: the producer-consumer model with JoinableQueue
Posted by sunshinekimi
import asyncio
import logging
import queue
import random
import time
from multiprocessing import JoinableQueue, Process, cpu_count

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s -->%(funcName)s at line %(lineno)d: %(message)s',
)
log = logging.getLogger()


async def jobs(item):
    # Simulate 1-4 seconds of work without blocking the event loop.
    await asyncio.sleep(random.randint(1, 4))
    status = random.randint(0, 1)
    if status == 0:
        return ("success", item)
    else:
        return ("failed", item)


async def do_work(item):
    log.info("do something %s,time start %s" % (item, time.asctime()))
    return await jobs(item)


def async_runner(checker):
    # asyncio.run() creates a fresh event loop, runs the coroutine and closes the loop.
    return asyncio.run(do_work(checker))


def worker_consumer(q_init):
    while True:
        try:
            # Wait briefly for an item. q_init.empty() is not reliable across
            # processes, so a timeout on get() decides when to stop instead.
            checker = q_init.get(timeout=1)
        except queue.Empty:
            # Nothing arrived within the timeout: stop this consumer.
            break
        try:
            st = async_runner(checker)
            if st[0] in ["success", "failed"]:
                log.info("%s task finished status is %s" % (st[1], st[0]))
        finally:
            # Tell q_init.join() in the producer that this item has been handled.
            q_init.task_done()


def producer(q_init):
    for i in range(1, 11):
        q_init.put(i)
    # Block the producer until every queued item has been marked done.
    q_init.join()


if __name__ == '__main__':
    # Cap the queue at 5 items: put() blocks while the queue is full.
    q_init = JoinableQueue(maxsize=5)
    # One consumer process per CPU core of the machine.
    n_workers = cpu_count()
    log.info("cpucount is %s" % n_workers)
    produce = [Process(target=producer, args=(q_init,))]
    consums = [Process(target=worker_consumer, args=(q_init,)) for i in range(n_workers)]
    for p in produce:
        p.start()
    for c in consums:
        c.start()
    for pr in produce:
        pr.join()
    for c in consums:
        c.join()
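Deciding when a consumer should stop by checking whether the queue happens to be empty depends on timing between processes. A common alternative is a sentinel value: the producer enqueues one "no more work" marker per consumer, and each consumer exits when it receives one. The following is only a sketch of that pattern, not the author's code; `SENTINEL`, `run_pipeline`, the `results` queue and the squaring step are hypothetical names and stand-ins chosen for illustration.

```python
from multiprocessing import JoinableQueue, Process, Queue

SENTINEL = None  # hypothetical marker meaning "no more work"


def consumer(tasks, results):
    """Handle items until the sentinel arrives."""
    while True:
        item = tasks.get()  # blocks until an item is available
        try:
            if item is SENTINEL:
                break
            results.put(item * item)  # stand-in for real work
        finally:
            tasks.task_done()  # exactly one task_done() per get()


def producer(tasks, items, n_consumers):
    for item in items:
        tasks.put(item)
    # One sentinel per consumer, so every consumer gets a stop signal.
    for _ in range(n_consumers):
        tasks.put(SENTINEL)
    # Returns only after every item and sentinel has been marked done.
    tasks.join()


def run_pipeline(items, n_consumers=2):
    items = list(items)
    tasks = JoinableQueue(maxsize=5)
    results = Queue()
    consumers = [Process(target=consumer, args=(tasks, results))
                 for _ in range(n_consumers)]
    prod = Process(target=producer, args=(tasks, items, n_consumers))
    for p in consumers + [prod]:
        p.start()
    # Drain the results before joining, so no child is left waiting
    # to flush data it has put on the results queue.
    out = [results.get() for _ in items]
    prod.join()
    for c in consumers:
        c.join()
    return sorted(out)


if __name__ == "__main__":
    print(run_pipeline(range(1, 11)))
```

With this layout no consumer can exit early while work is still on its way, and none can block forever on an empty queue, because each one is guaranteed to receive a sentinel.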
Output:
2019-12-21 19:50:00,754 - INFO --><module> at line 65: cpucount is 8
2019-12-21 19:50:01,036 - INFO -->do_work at line 23: do something 1,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,038 - INFO -->do_work at line 23: do something 2,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,040 - INFO -->do_work at line 23: do something 3,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,041 - INFO -->do_work at line 23: do something 4,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,055 - INFO -->do_work at line 23: do something 5,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:02,042 - INFO -->worker_consumer at line 43: 4 task finished status is success
2019-12-21 19:50:02,043 - INFO -->do_work at line 23: do something 6,time start Sat Dec 21 19:50:02 2019
2019-12-21 19:50:02,056 - INFO -->worker_consumer at line 43: 5 task finished status is success
2019-12-21 19:50:02,056 - INFO -->do_work at line 23: do something 7,time start Sat Dec 21 19:50:02 2019
2019-12-21 19:50:03,041 - INFO -->worker_consumer at line 43: 3 task finished status is success
2019-12-21 19:50:03,042 - INFO -->do_work at line 23: do something 8,time start Sat Dec 21 19:50:03 2019
2019-12-21 19:50:03,058 - INFO -->worker_consumer at line 43: 7 task finished status is success
2019-12-21 19:50:03,059 - INFO -->do_work at line 23: do something 9,time start Sat Dec 21 19:50:03 2019
2019-12-21 19:50:04,036 - INFO -->worker_consumer at line 43: 1 task finished status is failed
2019-12-21 19:50:04,037 - INFO -->do_work at line 23: do something 10,time start Sat Dec 21 19:50:04 2019
2019-12-21 19:50:04,043 - INFO -->worker_consumer at line 43: 6 task finished status is success
2019-12-21 19:50:04,059 - INFO -->worker_consumer at line 43: 9 task finished status is success
2019-12-21 19:50:05,039 - INFO -->worker_consumer at line 43: 2 task finished status is failed
2019-12-21 19:50:07,042 - INFO -->worker_consumer at line 43: 8 task finished status is success
2019-12-21 19:50:08,038 - INFO -->worker_consumer at line 43: 10 task finished status is failed

Process finished with exit code 0
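The producer's join() call depends on a simple count: every put() adds one unfinished task, every task_done() removes one, and join() returns once the count reaches zero. The sketch below checks that bookkeeping in a single process; `demo_task_accounting` is a hypothetical helper written for illustration and is not part of the script above.

```python
from multiprocessing import JoinableQueue


def demo_task_accounting():
    q = JoinableQueue()
    for item in ("a", "b"):
        q.put(item)  # each put() adds one unfinished task

    received = []
    for _ in range(2):
        received.append(q.get())  # get() alone does not mark the task finished
        q.task_done()             # this is the call that join() counts

    q.join()  # returns immediately: both tasks have been marked done

    try:
        q.task_done()  # one call more than there were items
    except ValueError:
        extra_call_rejected = True
    else:
        extra_call_rejected = False
    return received, extra_call_rejected


if __name__ == "__main__":
    print(demo_task_accounting())
```

This is why a consumer needs to call task_done() once for every item it takes, including items whose processing fails; otherwise the producer's join() never returns.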