python—多进程的消息队列

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python—多进程的消息队列相关的知识,希望对你有一定的参考价值。

消息队列

消息队列是在消息的传输过程中保存消息的容器

消息队列最经典的用法就是消费者 和生产者之间通过消息管道传递消息,消费者和生成者是不同的进程。生产者往管道写消息,消费者从管道中读消息

操作系统提供了很多机制来实现进程间的通信,multiprocessing模块提供了Queue和Pipe两种方法来实现


一、使用multiprocessing里面的Queue来实现消息队列

q = Queue

q.put(data)

data = q.get()


例子:

from multiprocessing import Queue, Process
def write(q):
    for i in ["a","b","c","d"]:
        q.put(i)
        print("put {0} to queue".format(i))
def read(q):
    while 1:
        result = q.get()
        print("get {0} from queue".format(result))
def main():
    q = Queue()
    pw = Process(target=write,args=(q,))
    pr = Process(target=read,args=(q,))
    pw.start()
    pr.start()
    pw.join()
    pr.terminate()
if __name__ == "__main__":
    main()

运行结果:

put a to queue

put b to queueget a from queue

get b from queue

put c to queue

put d to queue

get c from queue

get d from queue



二、通过Multiprocessing里面的Pipe来实现消息队列

1)Pipe方法返回(conn1,conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplux参数为True(默认值),那么这个管道是全双工模式,即conn1和conn2均可收发。duplex为False,conn1负责接收消息,conn2负责发行消息

2)send和recv方法分别是发送和接收消息的方法。close方法表示关闭管道,当消息接收结束以后,关闭管道。


例子:

from multiprocessing import Process,Pipe
import time
def proc1(pipe):
    for i in xrange(1,10):
        pipe.send(i)
        time.sleep(3)
        print("send {0} to pipe".format(i))
def proc2(pipe):
    n = 9
    while n>0:
        result = pipe.recv()
        time.sleep(3)
        print("recv {0} from pipe".format(result))
        n -= 1
if __name__ == "__main__":
    pipe = Pipe(duplex=False)
    print(type(pipe))
    p1 = Process(target=proc1,args=(pipe[1],))
    p2 = Process(target=proc2,args=(pipe[0],))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    pipe[0].close()
    pipe[1].close()


运行结果:

<type 'tuple'>

send 1 to pipe

recv 1 from pipe

send 2 to pipe

recv 2 from pipe

recv 3 from pipe

send 3 to pipe

send 4 to piperecv 4 from pipe

send 5 to pipe

recv 5 from pipe

recv 6 from pipe

send 6 to pipe

send 7 to pipe

recv 7 from pipe

send 8 to pipe

recv 8 from pipe

recv 9 from pipesend 9 to pipe


三、Queue模块

python提供了Queue模块来专门实现消息队列:

Queue对象实现一个fifo队列(其他的还有lifo、priority队列)。queue只有gsize一个构造函数,用来指定队列容量,指定为0的时候代表容量无限。只要有以下成员函数:

Queue.gsize():返回消息队列的当前空间。返回的值不一定可靠。

Queue.empty():判断消息队列是否为空,返回True或者False。同样不可靠

Queue.full():判断消息是否满

Queue.put(item,block=True,timeout=None):往消息队列中存放数据。block可以控制是否阻塞,timeout控制阻塞时候的等待时间。如果不阻塞或者超时,会引起一个full exception。

Queue.put_nowait(item):相当于put(item,False)

Queue.get(block=True,timeout=None):获取一个消息,其他等同put


以两个函数用来判断消息对应的任务是否完成:

Queue.task_done():接收消息的线程通过调用这个函来说明消息对应的任务已完成

Queue.join():实际上意味着等到队列为空,再执行别的操作


以上是关于python—多进程的消息队列的主要内容,如果未能解决你的问题,请参考以下文章

python多进程

Python进阶第二篇多线程消息队列queue

python并发编程:多进程-队列

Python 3 并发编程多进程之队列(推荐使用)

RabbitMQ 消息队列

从 Python 多处理中的排队进程获取错误标志/消息