Java:生产者=消费者,如何知道何时停止?

Posted

技术标签:

【中文标题】Java:生产者=消费者,如何知道何时停止?【英文标题】:Java: Producer = Consumer, how to know when to stop? 【发布时间】:2012-04-02 20:59:28 【问题描述】:

我有几个工人,他们使用 ArrayBlockingQueue。

每个worker从队列中取出一个对象,处理它,结果可以得到几个对象,这些对象将被放入队列中进行进一步处理。所以,工人 = 生产者 + 消费者。

工人:

public class Worker implements Runnable

    private BlockingQueue<String> processQueue = null;

    public Worker(BlockingQueue<String> processQueue)
    
        this.processQueue = processQueue;
    

    public void run()
    
        try
        
            do
            
                String item = this.processQueue.take();
                ArrayList<String> resultItems = this.processItem(item);

                for(String resultItem : resultItems)
                
                    this.processQueue.put(resultItem);
                
            
            while(true);
        
        catch(Exception)
        
            ...
        
    

    private ArrayList<String> processItem(String item) throws Exception
    
        ...
    

主要:

public class Test

    public static void main(String[] args) throws Exception
    
        new Test().run();
    

    private void run() throws Exception
    
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalala");

        Executor service = Executors.newFixedThreadPool(100);
        for(int i=0; i<100; ++i)
        
            service.execute(new Worker(processQueue));
        
    

当没有更多工作时,停止工人的最佳方法是什么?

首先,我想到的是定期检查队列中有多少项目以及当前正在处理的项目有多少。如果两者都为零,则在 ExecutorService 上执行类似“shutdownNow()”的操作。但我不确定这是不是最好的方法。

【问题讨论】:

更有趣的问题是什么是你对循环模式的保护,显然队列在某些情况下可能会自我复制。 你能澄清一下用例吗?您是否希望应用在队列为空时自行终止,或者当请求关闭应用时,它只会在队列为空后才这样做? @Osw,这里的“循环模式”是什么意思? @Handerson,是的,我想在没有更多工作时终止应用程序本身。 @OlegGolovanov 如果您可以发布一个简单示例(与生产者、消费者、执行者一起),说明您要实现的目标,可能更容易理解您的问题。 【参考方案1】:

如果没有更多工作要做,请在队列中添加一条消息,这样说,并让工作人员在方便时自行关闭。这是防止数据损坏的好方法。

如果您需要通知另一个线程所有工人都回家了,您可以使用CountDownLatch 来执行此操作。

【讨论】:

我不能使用“毒丸”,因为我没有一个制作人,知道当他没有更多工作要制作时。关于 CountDownLatch:为了做到这一点,每个工人都必须知道什么时候没有更多的工作:)【参考方案2】:

听起来您有自己的解决方案 - 使用单独的进行中队列,其大小将是当前正在处理的项目数。如果您使用访问任一队列的约定是在synchronized(theArrayBlockingQueue) 块中,那么一切都应该很好。特别是,当将一个项目移动到处理状态时,将其从 ArrayBlockingQueue 中移除,并将其添加到 same 同步块内的 processingQueue 中。

【讨论】:

感谢您的回答 :) 我真的需要“processingQueue”还是 AtomicInteger(表示当前有多少项目正在处理中)就足够了?可能我什么都不知道。 我喜欢用数据结构而不是线程的局部变量来表示“状态”。例如,我可以作为控制线程,在 ArrayBlockingQueue 上进行同步,杀死所有工作人员,将工作中队列中的项目返回到 ArrayBlockingQueue,然后重新启动工作人员。如果您必须与线程通信以恢复它们的状态,这将更加困难。【参考方案3】:

我稍微修改了您的代码,不确定这是否是您所期望的,但至少它终止了!如果您使用shutdownNow 而不是shutdown,您的工作人员将被打断,除非您让他们重新工作,否则将退出且不保证队列为空。

public class Test 

    public static void main(String[] args) throws Exception 
        new Test().run();
    

    private void run() throws Exception 
        BlockingQueue<String> processQueue = new ArrayBlockingQueue<>(10000);
        processQueue.put("lalalalalalalalalalalalala"); //a little longer to make sure there is enough to process

        ExecutorService service = Executors.newFixedThreadPool(100);
        for (int i = 0; i < 100; ++i) 
            service.execute(new Worker(processQueue));
        
        service.shutdown(); //orderly shutdown = lets the tasks terminate what they are doing
        service.awaitTermination(1, TimeUnit.SECONDS); //blocks until all tasks have finished or throws TimeOutException if timeout is reached
    

    public static class Worker implements Runnable 

        private BlockingQueue<String> processQueue = null;
        private int count = 0;

        public Worker(BlockingQueue<String> processQueue) 
            this.processQueue = processQueue;
        

        @Override
        public void run() 
            try 
                do 
                    //tries to get something from the queue for 100ms and returns null if it could not get anything
                    String item = this.processQueue.poll(100, TimeUnit.MILLISECONDS);
                    if (item == null) break; //Ends the job because the queue was empty
                    count++;
                    List<String> resultItems = this.processItem(item);

                    for (String resultItem : resultItems) 
                        this.processQueue.put(resultItem);
                    
                 while (true);
             catch (InterruptedException e) 
                System.out.println("Interrupted");
                Thread.currentThread().interrupt();
            
            if (count != 0) System.out.println(Thread.currentThread() + ": processed " + count + " entries");
        

        private List<String> processItem(String item)  //let's put the string back less final character
            if (item.isEmpty()) 
                return Collections.<String> emptyList();
             else 
                return Arrays.asList(item.substring(0, item.length() - 1));
            
        
    

【讨论】:

以上是关于Java:生产者=消费者,如何知道何时停止?的主要内容,如果未能解决你的问题,请参考以下文章

21 | Java 消费者是如何管理TCP连接的?

Java线程同步模型-生产者与消费者

多线程生产者消费者模式中,如何停止消费者?多生产者情况下对“毒丸”策略的应用。

生产者消费者模型Java实现

Java线程通信-生产者消费者问题

JAVA SE基础篇60.线程协作