multiprocessing 生产者与消费者模型JoinableQueue

Posted sunshinekimi

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocessing 生产者与消费者模型JoinableQueue相关的知识,希望对你有一定的参考价值。

from  multiprocessing  import JoinableQueue
import time
import random
import asyncio
import   logging
from  multiprocessing import cpu_count
from multiprocessing import Process
logging.basicConfig(level = logging.INFO,format = ‘%(asctime)s  - %(levelname)s -->%(funcName)s  at line %(lineno)d: 
 %(message)s‘)
log= logging.getLogger()
# set max length of  queue and limit  concurrent numbers put
q_init = JoinableQueue(maxsize=5)

async def jobs(item):
    time.sleep(random.randint(1,4))
    status = random.randint(0, 1)
    if status == 0:
        return ("success",item)
    else:
        return ("failed",item)

async def do_work(item):
    logging.info("do something %s,time start %s" % (item, time.asctime()))
    a =await jobs(item)
    return a

def async_runner(checker):
    new_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(new_loop)
    loop = asyncio.get_event_loop()
    task = asyncio.ensure_future(do_work(checker))
    loop.run_until_complete(asyncio.wait([task]))
    st = task.result()
    return st

def worker_consumer(q_init):
        while True:
            if q_init.empty():  break
            # logging.info("queue is empty , Stop Each Process  BY Break " )
            checker = q_init.get()
            st=async_runner(checker)
            if st[0] in ["success", "failed"]:
                logging.info("%s task finished status is %s" % (st[1],st[0]))
                # notify q.join() in producer,consumer has get  element  of queue 
                q_init.task_done()



def producer(q_init):
     for  i in range(10):
         q_init.put(i)
     # block produce util consumer get all queue elements
     q_init.join()


if __name__ == ‘__main__‘:
    # GET MAX CPU NUMBER OF MACHINE
    cpu_count=cpu_count()
    produce=[Process(target=producer,args=(q_init,))]
    consums=[Process(target=worker_consumer,args=(q_init,)) for i in range(cpu_count)]
    for p in produce:
        p.start()
    for c in consums:
        c.start()
    for pr in produce:
        pr.join()
    for c in consums:
        c.join()

  结果:

2019-12-21 19:50:00,754  - INFO --><module>  at line 65: 
 cpucount is 8
2019-12-21 19:50:01,036  - INFO -->do_work  at line 23: 
 do something 1,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,038  - INFO -->do_work  at line 23: 
 do something 2,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,040  - INFO -->do_work  at line 23: 
 do something 3,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,041  - INFO -->do_work  at line 23: 
 do something 4,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:01,055  - INFO -->do_work  at line 23: 
 do something 5,time start Sat Dec 21 19:50:01 2019
2019-12-21 19:50:02,042  - INFO -->worker_consumer  at line 43: 
 4 task finished status is success
2019-12-21 19:50:02,043  - INFO -->do_work  at line 23: 
 do something 6,time start Sat Dec 21 19:50:02 2019
2019-12-21 19:50:02,056  - INFO -->worker_consumer  at line 43: 
 5 task finished status is success
2019-12-21 19:50:02,056  - INFO -->do_work  at line 23: 
 do something 7,time start Sat Dec 21 19:50:02 2019
2019-12-21 19:50:03,041  - INFO -->worker_consumer  at line 43: 
 3 task finished status is success
2019-12-21 19:50:03,042  - INFO -->do_work  at line 23: 
 do something 8,time start Sat Dec 21 19:50:03 2019
2019-12-21 19:50:03,058  - INFO -->worker_consumer  at line 43: 
 7 task finished status is success
2019-12-21 19:50:03,059  - INFO -->do_work  at line 23: 
 do something 9,time start Sat Dec 21 19:50:03 2019
2019-12-21 19:50:04,036  - INFO -->worker_consumer  at line 43: 
 1 task finished status is failed
2019-12-21 19:50:04,037  - INFO -->do_work  at line 23: 
 do something 10,time start Sat Dec 21 19:50:04 2019
2019-12-21 19:50:04,043  - INFO -->worker_consumer  at line 43: 
 6 task finished status is success
2019-12-21 19:50:04,059  - INFO -->worker_consumer  at line 43: 
 9 task finished status is success
2019-12-21 19:50:05,039  - INFO -->worker_consumer  at line 43: 
 2 task finished status is failed
2019-12-21 19:50:07,042  - INFO -->worker_consumer  at line 43: 
 8 task finished status is success
2019-12-21 19:50:08,038  - INFO -->worker_consumer  at line 43: 
 10 task finished status is failed

Process finished with exit code 0

  

以上是关于multiprocessing 生产者与消费者模型JoinableQueue的主要内容,如果未能解决你的问题,请参考以下文章

Python multiprocess模块(下)

5 并发编程--队列&生产者消费者模型

并发编程锁和队列及生产者消费模型

多进程实现生产者消费者

Python之路:进程线程

线程的应用--生产者与消费者模型