java并发:多生产者一消费者

Posted

技术标签:

【中文标题】java并发:多生产者一消费者【英文标题】:java concurrency: multi-producer one-consumer 【发布时间】:2012-04-17 21:31:48 【问题描述】:

我有一种情况,不同的线程填充一个队列(生产者),一个消费者从这个队列中检索元素。我的问题是,当从队列中检索这些元素之一时,会丢失一些元素(丢失信号?)。生产者代码是:

class Producer implements Runnable 

    private Consumer consumer;

    Producer(Consumer consumer)  this.consumer = consumer; 

    @Override
public void run() 
    consumer.send("message");
  

它们是通过以下方式创建和运行的:

ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++) 
  executor.execute(new Producer(consumer));

消费代码是:

class Consumer implements Runnable 

private Queue<String> queue = new ConcurrentLinkedQueue<String>();

void send(String message) 
    synchronized (queue) 
        queue.add(message);
        System.out.println("SIZE: " + queue.size());
        queue.notify();
    


@Override
public void run() 
    int counter = 0;
    synchronized (queue) 
    while(true) 
        try 
            System.out.println("SLEEP");
                queue.wait(10);
         catch (InterruptedException e) 
                Thread.interrupted();
        
        System.out.println(counter);
        if (!queue.isEmpty())              
            queue.poll();
            counter++;
        
    
    



运行代码时,有时会添加 20 个元素并检索 20 个元素,但在其他情况下检索的元素少于 20 个。知道如何解决这个问题吗?

【问题讨论】:

您正在使用低级同步结构(waitnotify)和高级同步结构(ConcurrentLinkedQueueExecutorService)的奇怪组合。使用其中一个! 我做到了,但在这两种情况下我都有同样的问题 看不到实际运行 Consumer 的代码。 只是一个普通的新线程(consumer).start() 【参考方案1】:

我建议您使用 BlockingQueue 而不是 Queue。 LinkedBlockingDeque 可能是您的理想选择。

您的代码如下所示:

void send(String message) 
    synchronized (queue) 
        queue.put(message);
        System.out.println("SIZE: " + queue.size());
    

然后你只需要

queue.take()

在您的消费者线程上

这个想法是 .take() 将阻塞,直到队列中有一个可用的项目,然后完全返回 one (我认为您的实现会受到影响:轮询时缺少通知)。 .put() 负责为您处理所有通知。无需等待/通知。

【讨论】:

试过 LinkedBlockingDeque 但我仍然遇到同样的问题 @Randomize 您能否发布一个使用 BlockingQueue 的有问题代码的示例?消费者代码应该足够了。 我正在重用上面完全相同的代码,我刚刚用 LinkedBlockingDeque 替换了 ConcurrentLinkedQueue。 如上所述,使用BlockingQueue,您应该a) 摆脱等待/通知呼叫,b) 使用.put().take() 而不是.add().poll() . 为什么要围绕queue.put同步?【参考方案2】:

您的代码中的问题可能是因为您使用的是notify 而不是notifyAll。如果有一个等待锁的线程,前者只会唤醒一个线程。这允许没有线程在等待并且信号丢失的竞争条件。 notifyAll 将要求所有线程唤醒以检查它们是否可以获得锁,从而以较小的性能成本强制正确性。

这在Effective Java 1st ed 中得到了最好的解释(参见第 150 页)。第 2 版删除了这个提示,因为程序员应该使用 java.util.concurrent 来提供更强的正确性保证。

【讨论】:

只有一个消费者,所以 notify / notifyAll 没有区别【参考方案3】:

同时使用 ConcurrentLinkedQueue 和同步似乎是个坏主意。它首先违背了并发数据结构的目的。

ConcurrentLinkedQueue 数据结构没有问题,用 BlockingQueue 替换它可以解决问题,但这不是根本原因。

问题在于 queue.wait(10)。这是定时等待方法。 10ms 后会再次获取锁。

    通知 (queue.notify() ) 将丢失,因为如果 10 毫秒已过,则没有消费者线程在等待它。

    生产者将无法添加到队列中,因为他们无法获取锁,因为消费者再次申请了锁。

迁移到 BlockingQueue 解决了您的问题,因为您删除了 wait(10) 代码,BlockingQueue 数据结构处理了等待和通知。

【讨论】:

以上是关于java并发:多生产者一消费者的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程实战之基于生产者消费者模式的日志服务读书笔记

并发无锁队列

Java并发程序设计设计模式与并发之生产者-消费者模式

Java多线程之并发协作生产者消费者设计模式

Java并发之:生产者消费者问题

Day855.生产者-消费者模式 -Java 并发编程实战