生产者消费者使用线程
Posted
技术标签:
【中文标题】生产者消费者使用线程【英文标题】:Producer Consumer using threads 【发布时间】:2013-11-10 21:29:05 【问题描述】:我正在编写一个使用多线程概念在 Java 中实现生产者消费者问题的程序。以下是我应该如何做的一些细节:
1) 主线程应该创建一个容量指定为命令行参数的缓冲区。生产者和消费者线程的数量也被指定为命令行参数。我应该为每个生产者和消费者线程分配一个唯一的编号。如何为生产者和消费者线程分配唯一编号?
2) 生产者线程在无限循环中运行。它产生一个具有以下格式的数据项(一个字符串):<producer number>_<data item number>
。例如,来自线程号 1 的第一个数据项将是 1_1,来自线程号 3 的第二个数据项将是 3_2。如何创建这种格式的数据项?
3) 然后生产者线程将一个条目写入生产者日志文件(“已生成”<data item>
)。在写入日志条目时,它会尝试插入缓冲区。如果插入成功,它会在日志文件中创建一个条目(<producer number> <data item>
“插入成功”)。这样的代码怎么写?
以下是我编写的 Java 代码。
import java.util.*;
import java.util.logging.*;
public class PC2
public static void main(String args[])
ArrayList<Integer> queue = new ArrayList<Integer>();
int size = Integer.parseInt(args[2]);
Thread[] prod = new Thread[Integer.parseInt(args[0])];
Thread[] cons = new Thread[Integer.parseInt(args[1])];
for(int i=0; i<prod.length; i++)
prod[i] = new Thread(new Producer(queue, size));
prod[i].start();
for(int i=0; i<cons.length; i++)
cons[i] = new Thread(new Consumer(queue, size));
cons[i].start();
class Producer extends Thread
private final ArrayList<Integer> queue;
private final int size;
public Producer(ArrayList<Integer> queue, int size)
this.queue = queue;
this.size = size;
public void run()
while(true)
for(int i=0; i<size; i++)
System.out.println("Produced: "+i+" by id " +Thread.currentThread().getId());
try
produce(i);
Thread.sleep(3000);
catch(Exception e)
Logger.getLogger(Producer.class.getName()).log(Level.SEVERE, null, e);
public void produce(int i) throws InterruptedException
while(queue.size() == size)
synchronized(queue)
System.out.println("Queue is full "+Thread.currentThread().getName() +" is waiting, size: "+queue.size());
queue.wait();
synchronized(queue)
queue.add(i);
queue.notifyAll();
class Consumer extends Thread
private final ArrayList<Integer> queue;
private final int size;
public Consumer(ArrayList<Integer> queue, int size)
this.queue = queue;
this.size = size;
public void run()
while(true)
try
System.out.println("Consumed: "+consume());
Thread.sleep(1000);
catch(Exception e)
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, e);
public int consume() throws InterruptedException
while(queue.isEmpty())
synchronized(queue)
System.out.println("Queue is empty "+Thread.currentThread().getName()+" is waiting, size: "+queue.size());
queue.wait();
synchronized (queue)
queue.notifyAll();
System.out.println("Consumed by id "+Thread.currentThread().getId());
return (Integer) queue.remove(0);
如何执行上述步骤?
【问题讨论】:
【参考方案1】:public class ProducerConsumerTest
public static void main(String[] args)
CubbyHole c = new CubbyHole();
Producer p1 = new Producer(c, 1);
Consumer c1 = new Consumer(c, 1);
p1.start();
c1.start();
class CubbyHole
private int contents;
private boolean available = false;
public synchronized int get()
while (available == false)
try
wait();
catch (InterruptedException e)
available = false;
notifyAll();
return contents;
public synchronized void put(int value)
while (available == true)
try
wait();
catch (InterruptedException e)
contents = value;
available = true;
notifyAll();
class Consumer extends Thread
private CubbyHole cubbyhole;
private int number;
public Consumer(CubbyHole c, int number)
cubbyhole = c;
this.number = number;
public void run()
int value = 0;
for (int i = 0; i < 10; i++)
value = cubbyhole.get();
System.out.println("Consumer #"
+ this.number
+ " got: " + value);
class Producer extends Thread
private CubbyHole cubbyhole;
private int number;
public Producer(CubbyHole c, int number)
cubbyhole = c;
this.number = number;
public void run()
for (int i = 0; i < 10; i++)
cubbyhole.put(i);
System.out.println("Producer #" + this.number
+ " put: " + i);
try
sleep((int) (Math.random() * 100));
catch (InterruptedException e)
【讨论】:
我看到了同样的答案tutorialspoint.com/javaexamples/thread_procon.htm【参考方案2】:请参考以下代码。您可以根据命令行参数更改常量值。我已经测试了代码,它可以按照您的要求工作。
import java.util.LinkedList;
import java.util.Queue;
public class ProducerConsumerProblem
public static int CAPACITY = 10; // At a time maximum of 10 tasks can be
// produced.
public static int PRODUCERS = 2;
public static int CONSUMERS = 4;
public static void main(String args[])
Queue<String> mTasks = new LinkedList<String>();
for (int i = 1; i <= PRODUCERS; i++)
Thread producer = new Thread(new Producer(mTasks));
producer.setName("Producer " + i);
producer.start();
for (int i = 1; i <= CONSUMERS; i++)
Thread consumer = new Thread(new Consumer(mTasks));
consumer.setName("Consumer " + i);
consumer.start();
class Producer implements Runnable
Queue<String> mSharedTasks;
int taskCount = 1;
public Producer(Queue<String> mSharedTasks)
super();
this.mSharedTasks = mSharedTasks;
@Override
public void run()
while (true)
synchronized (mSharedTasks)
try
if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY)
System.out.println("Producer Waiting!!");
mSharedTasks.wait();
catch (InterruptedException e)
e.printStackTrace();
while (mSharedTasks.size() != ProducerConsumerProblem.CAPACITY)
try
Thread.sleep(50);
catch (InterruptedException e)
String produceHere = Thread.currentThread().getName()
+ "_Item number_" + taskCount++;
synchronized (mSharedTasks)
mSharedTasks.add(produceHere);
System.out.println(produceHere);
if (mSharedTasks.size() == 1)
mSharedTasks.notifyAll(); // Informs consumer that there
// is something to consume.
class Consumer implements Runnable
Queue<String> mSharedTasks;
public Consumer(Queue<String> mSharedTasks)
super();
this.mSharedTasks = mSharedTasks;
@Override
public void run()
while (true)
synchronized (mSharedTasks)
if (mSharedTasks.isEmpty()) // Checks whether there is no task
// to consume.
try
mSharedTasks.wait(); // Waits for producer to produce!
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
while (!mSharedTasks.isEmpty()) // Consumes till task list is
// empty
try
// Consumer consumes late hence producer has to wait...!
Thread.sleep(100);
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
synchronized (mSharedTasks)
System.out.println(Thread.currentThread().getName()
+ " consumed " + mSharedTasks.poll());
if (mSharedTasks.size() == ProducerConsumerProblem.CAPACITY - 1)
mSharedTasks.notifyAll();
【讨论】:
【参考方案3】:我尝试了以下可能对您有用的方法,除了 3 的缓冲区条件,您可以自己添加部分代码。 希望这会有所帮助。
public class Message
private String msg;
public Message(String msg)
super();
this.msg = msg;
public String getMsg()
return msg;
import java.util.concurrent.BlockingQueue;
public class Producer implements Runnable
private BlockingQueue<Message> queue;
private boolean run = true;
public Producer(BlockingQueue<Message> queue)
super();
this.queue = queue;
public void setRun(boolean val)
this.run = val;
@Override
public void run()
int i = 0;
while (run)
Message msg = new Message(Thread.currentThread().getName() + "_"+ i);
try
Thread.sleep(i * 100);
queue.put(msg);
System.out.println("Producer: "+Thread.currentThread().getName()+" produced and added to the queue: "+msg.getMsg());
catch (InterruptedException e)
e.printStackTrace();
i++;
if(i==10)
setRun(false);
System.out.println(Thread.currentThread().getName()+" stopped");
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable
private BlockingQueue<Message> queue;
private boolean run = true;
public Consumer(BlockingQueue<Message> queue)
super();
this.queue = queue;
public void setRun(boolean val)
this.run = val;
@Override
public void run()
while(run)
try
Thread.sleep(100);
Message msg = queue.take();
System.out.println("Consumer: "+Thread.currentThread().getName()+" generated/consumed "+msg.getMsg());
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class ProducerConsumerMain
public static void main(String[] args)
System.out
.println("please enter the number of producer:consumer:size of the queue in order");
Scanner scan = new Scanner(System.in);
Thread[] prodThreads = new Thread[scan.nextInt()];
Thread[] consThreads = new Thread[scan.nextInt()];
BlockingQueue<Message> queue = new ArrayBlockingQueue<Message>(scan.nextInt());
for (int i = 0; i < prodThreads.length; i++)
prodThreads[i] = new Thread(new Producer(queue), "" + i);
prodThreads[i].start();
for (int i = 0; i < consThreads.length; i++)
consThreads[i] = new Thread(new Consumer(queue), "" + i);
consThreads[i].start();
【讨论】:
【参考方案4】:对于生产者消费者问题,最好的解决方案是 BlockingQueue。我正在测试一些东西,因此设计了相同类型的程序,现在根据您的需要对其进行了修改。
看看有没有帮助。
import java.util.concurrent.*;
public class ThreadingExample
public static void main(String args[])
BlockingQueue<Message> blockingQueue = new ArrayBlockingQueue<Message>(100);
ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Producer(blockingQueue));
exec.execute(new Consumer(blockingQueue));
class Message
private static int count=0;
int messageId;
Message()
this.messageId=count++;
System.out.print("message Id"+messageId+" Created ");
class Producer implements Runnable
private BlockingQueue<Message> blockingQueue;
Producer(BlockingQueue<Message> blockingQueue)
this.blockingQueue=blockingQueue;
@Override
public void run()
while(!Thread.interrupted())
System.out.print("Producer Started");
try
blockingQueue.put(new Message());
catch (InterruptedException e)
e.printStackTrace();
System.out.println("Producer Done");
class Consumer implements Runnable
private BlockingQueue<Message> blockingQueue;
Consumer(BlockingQueue<Message> blockingQueue)
this.blockingQueue=blockingQueue;
@Override
public void run()
while(!Thread.interrupted())
System.out.print("Concumer Started");
try
Message message = blockingQueue.take();
System.out.print("message Id"+message.messageId+" Consumed ");
catch(InterruptedException e)
e.printStackTrace();
System.out.println("Concumer Done");
【讨论】:
BlockingQueue 线程安全吗?因为我不应该使用任何线程安全的数据结构。 有人可以帮我吗? (对不起,如果我在推动) @user2201650 回答这个问题***.com/questions/2695426/…【参考方案5】:我应该为每个生产者和消费者分配一个唯一的编号 线。如何为生产者和消费者分配唯一编号 线程?
向生产者/消费者类添加一个实例(非静态)变量。初始化新的生产者/消费者对象时,传入唯一编号。您可以在主课中使用int counter
跟踪您的号码。
2) 生产者线程在无限循环中运行。它产生一个 数据项(字符串),格式如下:_ .例如线程号的第一个数据项 1 将是 1_1,来自线程号 3 的第二个数据项将是 3_2。 如何创建这种格式的数据项?
使用同步方法和/或原子变量。查看 Java Concurrency。
3) 然后Producer线程向Producer日志文件中写入一个条目 (“生成”)。在写日志时 entry,它会尝试插入缓冲区。如果插入是 成功,它会在日志文件中创建一个条目( “插入成功”)。这样的代码怎么写?
我的回答和上一个问题一样:阅读 Java 并发。花一个小时阅读有关同步、锁和原子变量的知识,我保证您会轻松编写程序。
【讨论】:
我不应该为这个程序使用任何线程安全的数据结构。 @user2201650 看看***.com/questions/1006655/… 。考虑是否创建一个“int index”并使用它来访问您的队列;例如“queue.get(index++)”。你认为会发生什么? 我指的是 Jon Skeet 的回答。他清楚地表示对原始类型(如 int)的操作在 Java 中是原子的。因此可以保证,如果一个线程执行 queue.get(index++),则 index 的新值将立即对所有其他线程可见。因此两个线程永远不会从队列中获得相同的对象。 我是 Java 编程的初学者。那你能帮帮我吗? 这不是一个做你的家庭作业的网站。到目前为止我给你的帮助很容易解决你的问题。对不起:(以上是关于生产者消费者使用线程的主要内容,如果未能解决你的问题,请参考以下文章