java并发:多生产者一消费者
Posted
技术标签:
【中文标题】java并发:多生产者一消费者【英文标题】:java concurrency: multi-producer one-consumer 【发布时间】:2012-04-17 21:31:48 【问题描述】:我有一种情况,不同的线程填充一个队列(生产者),一个消费者从这个队列中检索元素。我的问题是,当从队列中检索这些元素之一时,会丢失一些元素(丢失信号?)。生产者代码是:
class Producer implements Runnable
private Consumer consumer;
Producer(Consumer consumer) this.consumer = consumer;
@Override
public void run()
consumer.send("message");
它们是通过以下方式创建和运行的:
ExecutorService executor = Executors.newSingleThreadExecutor();
for (int i = 0; i < 20; i++)
executor.execute(new Producer(consumer));
消费代码是:
class Consumer implements Runnable
private Queue<String> queue = new ConcurrentLinkedQueue<String>();
void send(String message)
synchronized (queue)
queue.add(message);
System.out.println("SIZE: " + queue.size());
queue.notify();
@Override
public void run()
int counter = 0;
synchronized (queue)
while(true)
try
System.out.println("SLEEP");
queue.wait(10);
catch (InterruptedException e)
Thread.interrupted();
System.out.println(counter);
if (!queue.isEmpty())
queue.poll();
counter++;
运行代码时,有时会添加 20 个元素并检索 20 个元素,但在其他情况下检索的元素少于 20 个。知道如何解决这个问题吗?
【问题讨论】:
您正在使用低级同步结构(wait
、notify
)和高级同步结构(ConcurrentLinkedQueue
、ExecutorService
)的奇怪组合。使用其中一个!
我做到了,但在这两种情况下我都有同样的问题
看不到实际运行 Consumer 的代码。
只是一个普通的新线程(consumer).start()
【参考方案1】:
我建议您使用 BlockingQueue 而不是 Queue。 LinkedBlockingDeque 可能是您的理想选择。
您的代码如下所示:
void send(String message)
synchronized (queue)
queue.put(message);
System.out.println("SIZE: " + queue.size());
然后你只需要
queue.take()
在您的消费者线程上
这个想法是 .take() 将阻塞,直到队列中有一个可用的项目,然后完全返回 one (我认为您的实现会受到影响:轮询时缺少通知)。 .put() 负责为您处理所有通知。无需等待/通知。
【讨论】:
试过 LinkedBlockingDeque 但我仍然遇到同样的问题 @Randomize 您能否发布一个使用 BlockingQueue 的有问题代码的示例?消费者代码应该足够了。 我正在重用上面完全相同的代码,我刚刚用 LinkedBlockingDeque 替换了 ConcurrentLinkedQueue。 如上所述,使用BlockingQueue
,您应该a) 摆脱等待/通知呼叫,b) 使用.put()
和.take()
而不是.add()
和.poll()
.
为什么要围绕queue.put
同步?【参考方案2】:
您的代码中的问题可能是因为您使用的是notify
而不是notifyAll
。如果有一个等待锁的线程,前者只会唤醒一个线程。这允许没有线程在等待并且信号丢失的竞争条件。 notifyAll 将要求所有线程唤醒以检查它们是否可以获得锁,从而以较小的性能成本强制正确性。
这在Effective Java 1st ed 中得到了最好的解释(参见第 150 页)。第 2 版删除了这个提示,因为程序员应该使用 java.util.concurrent 来提供更强的正确性保证。
【讨论】:
只有一个消费者,所以 notify / notifyAll 没有区别【参考方案3】:同时使用 ConcurrentLinkedQueue 和同步似乎是个坏主意。它首先违背了并发数据结构的目的。
ConcurrentLinkedQueue 数据结构没有问题,用 BlockingQueue 替换它可以解决问题,但这不是根本原因。
问题在于 queue.wait(10)。这是定时等待方法。 10ms 后会再次获取锁。
通知 (queue.notify() ) 将丢失,因为如果 10 毫秒已过,则没有消费者线程在等待它。
生产者将无法添加到队列中,因为他们无法获取锁,因为消费者再次申请了锁。
迁移到 BlockingQueue 解决了您的问题,因为您删除了 wait(10) 代码,BlockingQueue 数据结构处理了等待和通知。
【讨论】:
以上是关于java并发:多生产者一消费者的主要内容,如果未能解决你的问题,请参考以下文章