生产者和消费者模式-代码

Posted 安迪9468

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了生产者和消费者模式-代码相关的知识,希望对你有一定的参考价值。

函数:生产者和消费者

import random
from queue import Queue

from threading import Thread, current_thread

import time

# 实例化一个队列
myq = Queue()

# 定义生产者
def producer():
    while True:
        tmp = random.randint(1,100)
        myq.put(tmp)
        print("%s生产了%s,生产后,现在产品总量:%s" % (current_thread().name, tmp, myq.qsize()))
        time.sleep(0.5)


# 定义消费者
def consumer():
    while True:
        print("%s消费了%s,剩余产品%s" % (current_thread().name, myq.get(), myq.qsize()))
        time.sleep(1.1)


# 启动生产者和消费者
# 启动生产者
tp = Thread(target=producer)
tp.start()

# 启动消费者
for i in range(2):
    tc = Thread(target=consumer)
    tc.start()

函数2:

# coding:utf-8

from queue import Queue

from threading import Thread,current_thread

import random
import time



# 实例化一个队列,线程安全
myq = Queue()

# 定义生产者
def produce():
    while True:
        tmp = random.randint(1,100)
        myq.put(tmp)
        print(‘%s生产了%s‘ % (current_thread().name,tmp))
        time.sleep(0.5)


# 消费者
def consumer():
    while True:
        print(‘%s消费了%s‘ % (current_thread().name, myq.get()))
        time.sleep(1)


# 启动生产者和消费者
t_p = Thread(target=produce)
t_p.start()

# 启动消费者
for i in range(2):
    t_cs = Thread(target=consumer)
    t_cs.start()

  

函数3:

# 编写一个基于tcp的echo服务器(回响服务器,即将客户端发送的信息返回给客户端),
# 要求使用线程和生产者消费者模型(提示:一个线程accept--生产者;两个线程用于接收和发送--消费者)。
import socket
from threading import Thread, current_thread
from queue import Queue


# 生产者
def accept_t(queue):
    print("当前线程",current_thread().name)
    # client_info = server.accept()
    # queue.put(client_info)



# 消费者recv
def recv_t(queue, queue_data):
    client_info = queue.get()
    client_sock = client_info[0]
    data = client_sock.recv(1024)
    queue_data.put(data)
    pass
    try:
        print(data.decode())
    except:
        print(data.decode(‘gbk‘))


# 消费者send
def send_t(queue_data):
    data = queue_data.get()
    client_sock = client_info[0]
    client_sock.send(data)
    client_sock.close()
    pass


if __name__ == "__main__":
    client_info = None
    server = None

    # 创建服务器的套接字(监听套接字)
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    # 设置地址复用属性
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # 绑定IP和端口
    server_address = ("", 7972)
    server.bind(server_address)

    # 监听
    server.listen(128)

    queue = Queue()
    queue_data = Queue()

    t1 = Thread(target=accept_t, args=(queue))
    t1.start()

    t2 = Thread(target=recv_t, args=(queue, queue_data))
    t2.start()

    t3 = Thread(target=send_t, args=(queue_data,))
    t3.start()

    t1.join()
    t2.join()
    t3.join()

  

  

 

类:生产者和消费者

import socket
from queue import Queue
from threading import Thread

import time

import chardet

client_queue = Queue()


# 生产者
class Producer(Thread):
    def __init__(self, tcp_server):
        super().__init__()
        self.tcp_server = tcp_server


    def run(self):
        client_info = self.tcp_server.accept()
        client_queue.put(client_info)



# 消费者
class Consumer(Thread):
    def __init__(self):
        super().__init__()


    def run(self):
        client_info = client_queue.get()
        client_sock = client_info[0]
        client_addr = client_info[1]
        msg = client_sock.recv(1024)
        print("原始字节流:",msg)

        a = ‘abcd‘.encode("UTF-8")
        print(‘a:‘, a)

        # a = msg.decode()
        code = chardet.detect(a)
        print(‘获取到a的编码是‘,code[‘encoding‘])



        print("%s说:%s" % (client_addr, msg.decode()))
        client_sock.send(msg.decode().encode(‘gbk‘))

        client_sock.close()
        print(‘consumer is over‘)


# 主函数
def main():
    tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    tcp_server.bind(("", 7892))
    tcp_server.listen(128)

    p = Producer(tcp_server)
    c1 = Consumer()
    # c2 = Consumer()

    p.start()
    c1.start()
    # c2.start()

    # time.sleep(2)

    p.join()
    c1.join()
    # c2.join()

    tcp_server.close()


if __name__ == ‘__main__‘:
    main()

  

以上是关于生产者和消费者模式-代码的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ工作模式

生产者消费者模型-Java代码实现

Java多线程:生产者消费者模型

阻塞队列和生产者-消费者模式

并行模式之生产者-消费者模式

用ReentrantLock和Condition实现生产者和消费者模式