自定义生产者消费者

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义生产者消费者相关的知识,希望对你有一定的参考价值。

我尝试了一个Producer Consumer程序。有时输出是正确的,有时在输出中间会发生异常,尽管程序运行完成。

我得到IndexOutOfBoundsException,我相信原因如下: - 当Q为空时,所有3个消费者线程都进入等待状态;当生产者添加一个项目并通知所有等待的线程时,在消费者线程删除该项目后,另一个消费者线程醒来将尝试删除(当Q现在为空时)导致此问题..我知道它是一场比赛条件,但无法弄清楚如何避免它...欢迎任何想法/建议。

另一个问题 - 我无法找到一种方法来优雅地终止这个程序..截至目前我已经使用System.exit(0)生成最后一个项目..欢迎任何其他更好的想法。

P.S

我不想使用任何java的API同步类,我想尝试使用wait()/notify()机制..

class Producer implements Runnable
{
    private Queue q;
    Producer(Queue q)
    {
        this.q = q;
    }
    @Override
    public void run() { 
        for(int i =0;i<50;i++)
            try {
                q.add(new Integer(i));
            } catch (InterruptedException e) {  
                e.printStackTrace();
            }   
    }
}
class Consumer extends Thread
{
    private Queue q;
    Consumer(Queue q)
    {
        this.q = q;
    }   
    public void run()
    {       
        try {
            while(true)
            {
            System.out.println(Thread.currentThread().getName()+"-"+ q.get());
            }
        } catch (InterruptedException e) {      
            e.printStackTrace();
        }
    }
}
public class Main 
{
    public static void main(String args[])
    {
    Queue q = new Queue();
    Producer runObj = new Producer(q);
    Thread producerObj = new Thread(runObj,"Producer");
    producerObj.start();    
    Consumer c1 = new Consumer(q);
    Consumer c2 = new Consumer(q);
    Consumer c3 = new Consumer(q);
    c1.setName("c1");
    c2.setName("c2");
    c3.setName("c3");
    c1.start();
    c2.start();
    c3.start(); 
    }
}

队列类:

public class Queue {

    private ArrayList<Integer> itemQ;
    int qSize = 5;

    Queue()
    {
        itemQ = new ArrayList<Integer>();
    }

    public synchronized void add(Integer item) throws InterruptedException
    {

        if(itemQ.size() == qSize)
        {
            System.out.println("Q is full");
            wait();
        }
        System.out.println(item);
        if(item.equals(new Integer(49)))
        {
            System.out.println("Out Of Stock");
            System.exit(0);

        }
        itemQ.add(item);
        notifyAll();
    }

    public synchronized Integer get() throws InterruptedException
    {
        if(itemQ.isEmpty())
        {
            wait();
        }   
        Integer item = itemQ.remove(0);     
        notify();       
        return item;
    }
}
答案

您需要更改Queue.addQueue.get中的if测试以改为使用循环。例如,将Queue.get方法的代码更改为

while (itemQ.isEmpty()) {
    wait();
}
Integer item = itemQ.remove(0);
notify();
return item;

当你打电话给等你放弃锁定,一旦你重新获得它,你需要测试你测试的条件(在你放开锁之前)是不是真的。当某些东西被添加到队列中时,在获取对队列的锁定时阻塞的每个消费者都会得到通知,并且其中任何一个都可以获得它。因此,在线程从等待中唤醒的时间与重新获取锁定的时间之间的间隔中,另一个消费者可以潜入并从队列中删除某些内容(使您认为队列不能为空的假设无效)。

还有虚假的唤醒,即使没有应用程序事件导致它,线程也可以获得通知。这是检查状态在醒来时的另一个原因。

以上是关于自定义生产者消费者的主要内容,如果未能解决你的问题,请参考以下文章

Springboot 2.0.x Redis缓存Key生成器,自定义生成器

tensorflow:使用自定义生成器时您的输入用完了数据

教义 orm:generate-proxies 抛出“无法实例化自定义生成器”

无法使用自定义生成器策略为 getter 名称创建记录

SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段

生产者消费者模型-Java代码实现