多消费者单生产者队列

Posted

技术标签:

【中文标题】多消费者单生产者队列【英文标题】:Multiple Consumer Single Producer Queue 【发布时间】:2014-01-02 16:00:04 【问题描述】:

我正在尝试解决 Python 中的这个同步问题。我有一个生产者线程和(可选)多个消费者线程(取决于命令,即 ./script sums.txt -c 10)。 现在有 1 个生产者和 1 个消费者没有问题,因为同步是用队列处理的。

现在的问题是,如果有超过 1 个消费者线程,线程 1 可能会从队列中获取一个项目并对其进行处理。虽然线程 2 执行相同但比线程 1 更快,并在线程 1 之前打印。我尝试使用随机计时器模拟这个问题。

我的输出现在带有随机计时器:“./script sommen.txt -c 2” 正如您注意到队列中的第二个项目在第一个项目之前处理的那样,如果没有随机计时器,不会发生很多事情,因为操作非常简单,因此线程足够快。有没有办法解决这个问题?我考虑过锁,但那会使程序效率低下?

另一件事,清理线程的最佳方法是什么。我知道我的队列什么时候完成(哨兵值)但是清理线程的好方法是什么?

非常感谢!

Consumers is set to: 2
I'm thread number: 4316991488 Read  (P): 12 + 90
I'm thread number: 4316991488 Read  (P): 420 / 20
I'm thread number: 4316991488 Read  (P): 12 + 90
I'm thread number: 4316991488 Read  (P): 420 / 20
Monitor is done
I'm thread number: 4329586688 Write (C): 420 / 20 = 21.0
I'm thread number: 4324331520 Write (C): 12 + 90 = 102

--

#!/usr/bin/env python

import threading
import operator
import sys
import queue
import optparse
from time import sleep
import random

def optionsparser():
    parser = optparse.OptionParser(
        usage="usage: %prog file [Options]")
    parser.add_option("-c", "--consumer", dest="consumer", type="int",
                      help="consumer <ident> [default: %default]")

    parser.set_defaults(consumer=1)
    opts, files = parser.parse_args()

    filename = files[0]

    try:
        _f = open(filename)
        return(filename, opts.consumer)
    except IOError:
        print ('Oh dear I/O Error')

def readitems(filename):

    print("Read from file: ", filename)
    with open(filename, 'r') as f:
        mylist = [line.rstrip('\n') for line in f]
    f.close()

    try: 
        for _line in mylist:
            data = _line.split(' ')

            qprint.put(data) #write to monitor queue
            qsum.put(data) #write to consumer queue

    except ValueError as e:
        print(e)
    except RuntimeError as err:
        print(err)
    finally:
        qsum.put("Done Flag")
        qprint.put("Done Flag")
def consumer(qsum):

    while qsum:
        sleeptime = random.randint(1,10)
        sleep(sleeptime)
        try:
            if qsum.get() == "Done Flag":
                print("Monitor queue empty", threading.get_ident())
                ## Clean up
                # Put bakc for other consumers
                qsum.put("Done Flag")
                #cleanup here

            else:
                data = qsum.get()
                operator = calc(data)

        except EnvironmentError as Err:
            print(Err)

def calc(data):

    try:
        sleeptime = random.randint(1,10)
        sleep(sleeptime)
        getal1, diff, getal2 = data
        getal1 = int(getal1)
        getal2 = int(getal2)

        if diff == '+':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.add(getal1, getal2))
        elif diff == '-':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.sub(getal1, getal2))
        elif diff == '*':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.mul(getal1, getal2))
        elif diff == '/':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.truediv(getal1, getal2))
        elif diff == '%':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.mod(getal1, getal2))
        elif diff == '**':
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=",  operator.pow(getal1, getal2))
        else:
            print("I'm thread number:", threading.get_ident(), "Write (C):", str(getal1), diff, str(getal2), "=", "Unknown operator!")

    except ZeroDivisionError as Err:
        print(Err)
    except ValueError:
        print("Wrong input")

def producer(reqs):  
    try:
        readitems(reqs)
    except IndexError as e:
        print(e)


def monitor(qprint):

    while qprint:
        try:
            if qprint.get() == "Done Flag":

                print("Monitor is done")
            else:
                data = (qprint.get())
                getal1, diff, getal2 = data
                print("I'm thread number:", threading.get_ident(), "Read  (P):", str(getal1), diff, str(getal2))
        except RuntimeError as e:
            print(e)

if __name__ == '__main__':

    try:
        reqs = optionsparser() 
        #create queu's
        qprint = queue.Queue()
        qsum = queue.Queue()
        #monitor threads
        t2 = threading.Thread(target=monitor, args=(qprint,))
        t2.start()
        #create consumers threads 
        thread_count = reqs[1]
        print("Consumers is set to:", thread_count)
        for i in range(thread_count):
            t = threading.Thread(target=consumer, args=(qsum,))
            t.start()

        #start producer 
        producer(reqs[0])

    except RuntimeError as Err:
        print(Err)
    except AssertionError as e:
        print(e)

【问题讨论】:

【参考方案1】:

当任务可以被拆分和独立威胁时,使用线程是有效的。如果您想使用thead,请记住,当代码中没有或很少锁定点时,并行化代码效率更高。锁定点可以是共享资源。

在您的情况下,您只需生成/使用数据并且希望它同步。如果您按顺序运行此代码,效率会更高,否则您必须更准确地定义哪些任务可以从并行化中受益。

【讨论】:

【参考方案2】:

首先:不要使用 Python 线程来加速 CPU 密集型任务,例如计算。除了减速,你永远不会看到任何东西。 Because GIL。务必将 Python 线程用于 I/O 绑定任务,例如 URL 获取。

如果您希望结果按发布顺序到达,请为每个队列元素指定一个序列号。这样每个任务都会知道它的结果属于哪里。

使用有序集合(例如列表)来放置工作线程产生的结果,使用序列号作为索引。由于您可能会以相反的顺序接收结果,因此您需要将它们全部存储(不能流式传输)。

我不明白为什么在这里使用锁定。首先,锁通过阻塞其他独立的工作人员来破坏并行处理的目的。其次,锁很难并且容易出现细微的错误。队列更友好。

【讨论】:

以上是关于多消费者单生产者队列的主要内容,如果未能解决你的问题,请参考以下文章

并发无锁队列

lockFreeQueue 无锁队列实现与总结

用阻塞队列实现生产者消费者模式一(单线程消费)

并发无锁队列学习(单生产者单消费者模型)

用阻塞队列实现生产者消费者模式二(多线程消费)

单生产者/单消费者 的 FIFO 无锁队列