使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点
Posted
技术标签:
【中文标题】使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点【英文标题】:Doubts in code of multi-threading using ArrayBlockingQueue and mutex 【发布时间】:2013-04-18 10:40:32 【问题描述】:我正在尝试编写多线程代码。但说真的,我不明白我可以从哪里开始。我的头也在敲。请帮帮我。
我的任务是,
-
有一个长度为 1 的队列,称为
pending_tasks
,其中包含需要一些处理的任务。
还有另一个长度为 1 的队列,称为 completed_tasks
,其中包含完成处理并准备交付的任务。
我的实施思路,
-
首先制作两个阻塞队列,
pending_tasks
和completed_tasks
。
一个线程(生产者)总是监听来自外部的任务,如果被放入pending_tasks
。
一个线程(消费者)随时准备从pending_tasks
接收任务并开始处理,然后放入completed_tasks
。
然后再次来到pending_tasks
,每当有任何任务到来时,都开始相同的处理。
基本上,它是单一生产者-单一消费者问题。
我的困惑,
我知道它可以通过使用 ArrayBlockingQueue 和 Mutex 来编写代码。但我不明白我该如何开始。我对互斥锁有很好的了解,我从link 中了解了互斥锁,并且对阻塞队列也有很好的理解,因为我在这个网站上阅读了很多问题。
能否请您给我一些实现指导,以便我可以编写这个多线程代码。
我已经为此编写了一些代码,但这并没有达到我任务的最终目标。
提前致谢。期待您的友好回复。
编辑编号。 1
请看我下面的代码。此代码工作正常,但此代码缺少一个功能。请帮我补充一下,提供一些指导。
功能是,
-
当生产者线程将某个值放入pending_task 队列时,它会在那里等待一段时间。如果在那个时候消费者将结果提供给消费者,那么它就可以了。否则,它说超时,生产者获取另一个值并将其放入 pending_task 队列中,并且相同的进程开始。
请帮我添加上述功能。我认为我们必须在生产者线程和消费者线程之间进行通信,而线程通信是通过使用互斥体完成的(我认为)。请帮我实现同样的
我的代码,
多线程类
package multithread;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class MultiThread
public static BlockingQueue<Integer> pending_task;
public static BlockingQueue<Integer> completed_task;
public MultiThread(int length)
pending_task = new ArrayBlockingQueue<Integer>(length, true);
completed_task = new ArrayBlockingQueue<Integer>(length, true);
生产者类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Producer implements Runnable
@Override
public void run()
for (int i = 0; i < 10; i++)
try
System.out.println("PRODUCER: Try to put value " + i + " in the pending queue");
MultiThread.pending_task.put(i);
System.out.println("PRODUCER: Successfully put value " + i + " in the pending queue, now its turn to consumer");
catch (InterruptedException ex)
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, ex);
消费类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer implements Runnable
@Override
public void run()
for (int i = 0; i < 10; i++)
try
System.out.println("CONSUMER: Try to take value from the pending queue");
int val = MultiThread.pending_task.take();
System.out.println("CONSUMER: Successfully take value, and that is " + val);
System.out.println("CONSUMER: Processing starts");
Thread.sleep(1000);
System.out.println("CONSUMER: Processing ends");
System.out.println("CONSUMER: Try to put that that value in completed queue, and the value is " + val);
MultiThread.completed_task.put(val);
System.out.println("CONSUMER: Successfully put into completed queue");
//Serve this value to the corresponding user
catch (InterruptedException ex)
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
DeliveryBoy 类
package multithread;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DeliveryBoy implements Runnable
@Override
public void run()
for (int i = 0; i < 10; i++)
try
System.out.println("DELIVERYBOY: Waiting for the value near completed queue");
int val = MultiThread.completed_task.take();
System.out.println("DELIVERYBOY: Succesfully take value from completed queue and the vlue is " + val);
//Serve this value to the corresponding user
catch (InterruptedException ex)
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
测试类
package multithread;
public class Test
public static void main(String[] args)
// TODO code application logic here
MultiThread ml = new MultiThread(1);
new Thread(new Producer()).start();
new Thread(new Consumer()).start();
new Thread(new DeliveryBoy()).start();
【问题讨论】:
除非您发布当前代码并围绕为什么不起作用提出问题,否则您的问题很可能会被关闭。目前,您的问题非常模糊和开放式,只能由为您编写完整内容的人来回答。 您的互斥链接相当旧。虽然您仍然可以对消费者/生产者模式使用等待/通知,但还有更有用和更高级别的解决方案。我建议你阅读docs.oracle.com/javase/tutorial/essential/concurrency。 @DuncanJones 我添加了有问题的代码。请看 @DuncanJones 请查看我的代码,并提供一些指导。 请参阅编辑号。 1,它显示了我想要添加的代码和一些功能。请高手给点建议。我怎样才能做到这一点。如果可能,请提供一些实施指南。 【参考方案1】:来自ArrayBlockingQueue#put
public void put(E e) 抛出 InterruptedException
在这个队列的尾部插入指定元素,等待 **如果队列已满,则可用空间
来自ArrayBlockingQueue#take
public E take() 抛出 InterruptedException
从接口复制的描述:BlockingQueue 检索和删除 此队列的头部,在必要时等待,直到元素变为 可用。
所以您需要做的就是从您的线程中调用这些方法。 试试这个(研究 javadoc),当你有更具体的问题时,你可以再问。
【讨论】:
你在回答中解释的事情我已经知道了。我觉得我在理论上已经很好了。但我无法将其转换为代码。我做了一些代码(见问题的更新部分),但是有很多问题,请帮我解决这些问题。是的,我也想在其中添加互斥锁概念。 这是个问题,我该如何实现这个,thia中缺少一些东西,请指导我,我如何在这里实现线程和互斥锁。 @devsda:看看这个:docs.oracle.com/javase/tutorial/essential/concurrency/… 根据这个页面,我必须为生产者、消费者创建不同的类?但实际上我希望这一切都在一个类的同一个模块中执行。我可以这样做吗? 请参阅编辑号。 1,它显示了我想要添加的代码和一些功能。请高手给点建议。我怎样才能做到这一点。如果可能,请提供一些实施指南。以上是关于使用 ArrayBlockingQueue 和 mutex 的多线程代码中的疑点的主要内容,如果未能解决你的问题,请参考以下文章
ArrayBlockingQueue 和LinkedBlockingQueue
ArrayBlockingQueue 和LinkedBlockingQueue
线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue
线程池的三种队列区别:SynchronousQueueLinkedBlockingQueue 和ArrayBlockingQueue