使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点

Posted

技术标签:

【中文标题】使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点【英文标题】:Doubts in code of multi-threading using ArrayBlockingQueue and mutex 【发布时间】:2013-04-18 10:40:32 【问题描述】:

我正在尝试编写多线程代码。但说真的,我不明白我可以从哪里开始。我的头也在敲。请帮帮我。

我的任务是,

    有一个长度为 1 的队列,称为 pending_tasks,其中包含需要一些处理的任务。 还有另一个长度为 1 的队列,称为 completed_tasks,其中包含完成处理并准备交付的任务。

我的实施思路,

    首先制作两个阻塞队列,pending_taskscompleted_tasks。 一个线程(生产者)总是监听来自外部的任务,如果被放入pending_tasks。 一个线程(消费者)随时准备从pending_tasks 接收任务并开始处理,然后放入completed_tasks。 然后再次来到pending_tasks,每当有任何任务到来时,都开始相同的处理。 基本上,它是单一生产者-单一消费者问题。

我的困惑,

我知道它可以通过使用 ArrayBlockingQueue 和 Mutex 来编写代码。但我不明白我该如何开始。我对互斥锁有很好的了解,我从link 中了解了互斥锁,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。

能否请您给我一些实现指导,以便我可以编写这个多线程代码。

我已经为此编写了一些代码,但这并没有达到我任务的最终目标。

提前致谢。期待您的友好回复。

编辑编号。 1

请看我下面的代码。此代码工作正常,但此代码缺少一个功能。请帮我补充一下,提供一些指导。

功能是,

    当生产者线程将某个值放入pending_task 队列时,它会在那里等待一段时间。如果在那个时候消费者将结果提供给消费者,那么它就可以了。否则,它说超时,生产者获取另一个值并将其放入 pending_task 队列中,并且相同的进程开始。

请帮我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,而线程通信是通过使用互斥体完成的(我认为)。请帮我实现同样的

我的代码,

多线程类

package multithread;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class MultiThread 

    public static BlockingQueue<Integer> pending_task;
    public static BlockingQueue<Integer> completed_task;

    public MultiThread(int length) 
        pending_task = new ArrayBlockingQueue<Integer>(length, true);
        completed_task = new ArrayBlockingQueue<Integer>(length, true);
    

生产者类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Producer implements Runnable 

    @Override
    public void run() 
        for (int i = 0; i < 10; i++) 
            try 
                System.out.println("PRODUCER: Try to put value  " + i + "  in the pending queue");
                MultiThread.pending_task.put(i);
                System.out.println("PRODUCER: Successfully put value  " + i + "  in the pending queue, now its turn to consumer");
             catch (InterruptedException ex) 
                Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
            
        
    

消费类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class Consumer implements Runnable 

    @Override
    public void run() 
        for (int i = 0; i < 10; i++) 
            try 
                System.out.println("CONSUMER: Try to take value from the pending queue");
                int val = MultiThread.pending_task.take();
                System.out.println("CONSUMER:  Successfully take value, and that is   " + val);
                System.out.println("CONSUMER: Processing starts");
                Thread.sleep(1000);
                System.out.println("CONSUMER: Processing ends");
                System.out.println("CONSUMER: Try to put that  that value in  completed queue, and the value is   " + val);
                MultiThread.completed_task.put(val);
                System.out.println("CONSUMER: Successfully put into completed queue");

                //Serve this value to the corresponding user
             catch (InterruptedException ex) 
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            

        
    

DeliveryBoy 类

package multithread;

import java.util.logging.Level;
import java.util.logging.Logger;

public class DeliveryBoy implements Runnable 

    @Override
    public void run() 
        for (int i = 0; i < 10; i++) 
            try 
                System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
                int val = MultiThread.completed_task.take();
                System.out.println("DELIVERYBOY:  Succesfully take value from completed queue and the vlue is  " + val);
                //Serve this value to the corresponding user
             catch (InterruptedException ex) 
                Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
            

        
    

测试类

package multithread;

public class Test 

    public static void main(String[] args) 
        // TODO code application logic here
        MultiThread ml = new MultiThread(1);
        new Thread(new Producer()).start();
        new Thread(new Consumer()).start();
        new Thread(new DeliveryBoy()).start();
    

【问题讨论】:

除非您发布当前代码并围绕为什么不起作用提出问题,否则您的问题很可能会被关闭。目前,您的问题非常模糊和开放式,只能由为您编写完整内容的人来回答。 您的互斥链接相当旧。虽然您仍然可以对消费者/生产者模式使用等待/通知,但还有更有用和更高级别的解决方案。我建议你阅读docs.oracle.com/javase/tutorial/essential/concurrency。 @DuncanJones 我添加了有问题的代码。请看 @DuncanJones 请查看我的代码,并提供一些指导。 请参阅编辑号。 1,它显示了我想要添加的代码和一些功能。请高手给点建议。我怎样才能做到这一点。如果可能,请提供一些实施指南。 【参考方案1】:

来自ArrayBlockingQueue#put

public void put(E e) 抛出 InterruptedException

在这个队列的尾部插入指定元素,等待 **如果队列已满,则可用空间

来自ArrayBlockingQueue#take

public E take() 抛出 InterruptedException

从接口复制的描述:BlockingQueue 检索和删除 此队列的头部,在必要时等待,直到元素变为 可用

所以您需要做的就是从您的线程中调用这些方法。 试试这个(研究 javadoc),当你有更具体的问题时,你可以再问。

【讨论】:

你在回答中解释的事情我已经知道了。我觉得我在理论上已经很好了。但我无法将其转换为代码。我做了一些代码(见问题的更新部分),但是有很多问题,请帮我解决这些问题。是的,我也想在其中添加互斥锁概念。 这是个问题,我该如何实现这个,thia中缺少一些东西,请指导我,我如何在这里实现线程和互斥锁。 @devsda:看看这个:docs.oracle.com/javase/tutorial/essential/concurrency/… 根据这个页面,我必须为生产者、消费者创建不同的类?但实际上我希望这一切都在一个类的同一个模块中执行。我可以这样做吗? 请参阅编辑号。 1,它显示了我想要添加的代码和一些功能。请高手给点建议。我怎样才能做到这一点。如果可能,请提供一些实施指南。

以上是关于使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点的主要内容,如果未能解决你的问题,请参考以下文章

ArrayBlockingQueue 和LinkedBlockingQueue

ArrayBlockingQueue 和LinkedBlockingQueue

线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue

线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue

使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点

20.并发容器之ArrayBlockingQueue和LinkedBlockingQueue实现原理详解