JAVA模拟生产者与消费者实例
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了JAVA模拟生产者与消费者实例相关的知识,希望对你有一定的参考价值。
模拟生产者与消费者实例,生产者生产一个产品,消费者就消费一个产品 ,然后生产者再生产,消费者再消费
JAVA代码是什么 希望给出代码 和解答的分析思路。。。
(1)本实验的多个缓冲区不是环形循环的,也不要求按顺序访问。生产者可以把产品放到目前某一个空缓冲区中。
(2)消费者只消费指定生产者的产品。
(3)在测试用例文件中指定了所有的生产和消费的需求,只有当共享缓冲区的数据满足了所有关于它的消费需求后,此共享缓冲区才可以作为空闲空间允许新的生产者使用。
(4)本实验在为生产者分配缓冲区时各生产者间必须互斥,此后各个生产者的具体生产活动可以并发。而消费者之间只有在对同一产品进行消费时才需要互斥,同时它们在消费过程结束时需要判断该消费对象是否已经消费完毕并清除该产品。
Windows
用来实现同步和互斥的实体。在Windows
中,常见的同步对象有:信号量(Semaphore)、
互斥量(Mutex)、临界段(CriticalSection)和事件(Event)等。本程序中用到了前三个。使用这些对象都分
为三个步骤,一是创建或者初始化:接着请求该同步对象,随即进入临界区,这一步对应于互斥量的
上锁;最后释放该同步对象,这对应于互斥量的解锁。这些同步对象在一个线程中创建,在其他线程
中都可以使用,从而实现同步互斥。当然,在进程间使用这些同步对象实现同步的方法是类似的。
1.用锁操作原语实现互斥
为解决进程互斥进人临界区的问题,可为每类临界区设置一把锁,该锁有打开和关闭两种状态,进程执行临界区程序的操作按下列步骤进行:
①关锁。先检查锁的状态,如为关闭状态,则等待其打开;如已打开了,则将其关闭,继续执行步骤②的操作。
②执行临界区程序。
③开锁。将锁打开,退出临界区。
2.信号量及WAIT,SIGNAL操作原语
信号量的初值可以由系统根据资源情况和使用需要来确定。在初始条件下信号量的指针项可以置为0,表示队列为空。信号量在使用过程中它的值是可变的,但只能由WAIT,SIGNAL操作来改变。设信号量为S,对S的WAIT操作记为WAIT(S),对它的SIGNAL操作记为SIGNAL(S)。
WAIT(S):顺序执行以下两个动作:
①信号量的值减1,即S=S-1;
②如果S≥0,则该进程继续执行;
如果
S(0,则把该进程的状态置为阻塞态,把相应的WAITCB连人该信号量队列的末尾,并放弃处理机,进行等待(直至其它进程在S上执行SIGNAL操作,把它释放出来为止)。
SIGNAL(S):顺序执行以下两个动作
①S值加
1,即
S=S+1;
②如果S)0,则该进程继续运行;
如果S(0则释放信号量队列上的第一个PCB(既信号量指针项所指向的PCB)所对应的进程(把阻塞态改为就绪态),执行SIGNAL操作的进程继续运行。
在具体实现时注意,WAIT,SIGNAL操作都应作为一个整体实施,不允许分割或相互穿插执行。也就是说,WAIT,SIGNAL操作各自都好像对应一条指令,需要不间断地做下去,否则会造成混乱。
从物理概念上讲,信号量S)时,S值表示可用资源的数量。执行一次WAIT操作意味着请求分配一个单位资源,因此S值减1;当S<0时,表示已无可用资源,请求者必须等待别的进程释放了该类资源,它才能运行下去。所以它要排队。而执行一次SIGNAL操作意味着释放一个单位资源,因此S值加1;若S(0时,表示有某些进程正在等待该资源,因而要把队列头上的进程唤醒,释放资源的进程总是可以运行下去的。
---------------
/**
*
生产者
*
*/
public
class
Producer
implements
Runnable
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Producer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf)
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
public
void
run()
while(true)
empty.p();
mutex.p();
System.out.println(name+"
inserts
a
new
product
into
"+buf.nextEmptyIndex);
buf.nextEmptyIndex
=
(buf.nextEmptyIndex+1)%buf.size;
mutex.v();
full.v();
try
Thread.sleep(1000);
catch
(InterruptedException
e)
e.printStackTrace();
---------------
/**
*
消费者
*
*/
public
class
Customer
implements
Runnable
private
Semaphore
mutex,full,empty;
private
Buffer
buf;
String
name;
public
Customer(String
name,Semaphore
mutex,Semaphore
full,Semaphore
empty,Buffer
buf)
this.mutex
=
mutex;
this.full
=
full;
this.empty
=
empty;
this.buf
=
buf;
this.name
=
name;
public
void
run()
while(true)
full.p();
mutex.p();
System.out.println(name+"
gets
a
product
from
"+buf.nextFullIndex);
buf.nextFullIndex
=
(buf.nextFullIndex+1)%buf.size;
mutex.v();
empty.v();
try
Thread.sleep(1000);
catch
(InterruptedException
e)
e.printStackTrace();
-------------------------
/**
*
缓冲区
*
*/
public
class
Buffer
public
Buffer(int
size,int
nextEmpty,int
nextFull)
this.nextEmptyIndex
=
nextEmpty;
this.nextFullIndex
=
nextFull;
this.size
=
size;
public
int
size;
public
int
nextEmptyIndex;
public
int
nextFullIndex;
-----------------
/**
*
此类用来模拟信号量
*
*/
public
class
Semaphore
private
int
semValue;
public
Semaphore(int
semValue)
this.semValue
=
semValue;
public
synchronized
void
p()
semValue--;
if(semValue<0)
try
this.wait();
catch
(InterruptedException
e)
e.printStackTrace();
public
synchronized
void
v()
semValue++;
if(semValue<=0)
this.notify();
------------------------
public
class
Test
extends
Thread
public
static
void
main(String[]
args)
Buffer
bf=new
Buffer(10,0,0);
Semaphore
mutex=new
Semaphore(1);
Semaphore
full=new
Semaphore(0);
Semaphore
empty=new
Semaphore(10);
//new
Thread(new
Producer("p001",mutex,full,empty,bf)).start();
Producer
p=new
Producer("p001",mutex,full,empty,bf);
new
Thread(new
Producer("p002",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p003",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p004",mutex,full,empty,bf)).start();
new
Thread(new
Producer("p005",mutex,full,empty,bf)).start();
try
sleep(3000);
catch(Exception
ex)
ex.printStackTrace();
new
Thread(new
Customer("c001",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c002",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c003",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c004",mutex,full,empty,bf)).start();
new
Thread(new
Customer("c005",mutex,full,empty,bf)).start();
-------------------------------------------- 参考技术A package lly;
public class Box
private int value;
private boolean available = false;
public synchronized int get()
while (available == false)
try
// 等待生产者写入数据
wait();
catch (InterruptedException e)
// TODO: handle exception
e.printStackTrace();
available = false;
// 通知生产者数据已经被取走,可以再次写入数据
notifyAll();
return value;
public synchronized void put(int value)
while (available == true)
try
// 等待消费者取走数据
wait();
catch (InterruptedException e)
// TODO: handle exception
e.printStackTrace();
this.value = value;
available = true;
// 通知消费者可以来取数据
notifyAll();
package lly;
public class Consumer extends Thread
private Box box;
private String name;
public Consumer(Box b, String n)
box = b;
name = n;
public void run()
int value = 0;
for (int i = 1; i < 6; i++)
value = box.get();
System.out.println("Consumer " + name + " consumed: " + value);
try
sleep((int) (Math.random() * 10000));
catch (InterruptedException e)
// TODO: handle exception
e.printStackTrace();
package lly;
public class Producer extends Thread
private Box box;
private String name;
public Producer(Box b, String n)
box = b;
name = n;
public void run()
for (int i = 1; i < 6; i++)
box.put(i);
System.out.println("Producer " + name + " produced: " + i);
try
sleep((int) (Math.random() * 10000));
catch (InterruptedException e)
// TODO: handle exception
e.printStackTrace();
package lly;
public class ProducerConsumer
/**
* @param args
*/
public static void main(String[] args)
// TODO 自动生成方法存根
Box box = new Box();
Producer p = new Producer(box, "p");
Consumer c = new Consumer(box, "c");
p.start();
c.start();
创建一组“生产者”线程和一组“消费者”线程,并建立一个全局数组作为共享缓冲区。“生产者”向缓冲区输入数据,“消费者”从缓冲区读出数据。当缓冲区满时,“生产者”必须阻塞,等待“消费者”取走缓冲区数据后将其唤醒。当缓冲区空时,“消费者”阻塞,等待“生产者”生产了产品后将其唤醒。 参考技术B class Producter extends Thread
Queue q;
Producter (Queue q)
this.q=q;
public void run()
for(int i=0;i<10;i++)
q.put(i);
System.out.println("producter :"+i);
class Consumer extends Thread
Queue q;
Consumer(Queue q)
this.q=q;
public void run()
while(true)
System.out.println("Consumer:"+q.get());
class Queue
//key
int value;
boolean bFull=false;
public synchronized void put(int i)
if(!bFull)
value=i;
bFull=true;
notify();//必须用在synchronized
try
wait();//必须捕获异常
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
public synchronized int get()
if(!bFull)
try
wait();
catch (InterruptedException e)
// TODO Auto-generated catch block
e.printStackTrace();
bFull=false;
notify();
return value;
public class test //测试类
public static void main(String[] args)
Queue q=new Queue();
Producter p=new Producter(q);
Consumer c=new Consumer(q);
p.start();
c.start();
参考资料:注释少点,不过不难
本回答被提问者采纳 参考技术C public class Teststatic boolean falg=false;
public static void main(String[] args)
Producer p = new Producer();
p.product();
if(p.falg)
Consumer c=new Consumer();
c.consumer();
//生产者类
class Producer
String name="生产者";
boolean falg=false;
Producer()
public void product()
System.out.println("生产一个产品!");
falg=true;
//消费者类
class Consumer
String name="消费者";
Consumer()
public void consumer()
System.out.println("消费一个产品!");
Producer p=new Producer();
p.falg=false;
还要一直产生和消费下去用循环就行了 参考技术D 用线程来做
多线程操作实例——生产者与消费者
面对多线程学习生产者与消费者是最基本的实例
对于java后端开发的人员必须要掌握,还有考研考试计算机操作系统的同鞋。
下面是三个实例对于生产者与消费者的的例子,层层递进,逐步解决问题。
问题:生产者——设置信息名字name,和内容content
消费者——负责取出设置的信息。
一、基本实现
由于线程的不确定性可能出现以下问题:
(1)消费者取出的信息不匹配,即不是由同一个生产者设置的信息
(2)生产者生产了多个信息,消费者才开始取出信息,或消费者取出的重复的信息。
上面的问题下面会逐一解决,下面先看出现问题的程序:
package li.xin.hua.ch9; /*线程生产者与向消费者最基本实现,问题有: * 1、数据不匹配 * 2、数据重复取出已经取过的数据*/ class Info{ private String name; private String content; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } }; class Producer implements Runnable{ private Info info=null; public Producer(Info info){ this.info=info; } public void run(){ boolean flag=false; for(int i=0;i<10;++i) { if(flag){ this.info.setName("胡歌"); try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.info.setContent("林殊"); flag=false; }else{ this.info.setName("刘涛"); try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.info.setContent("郡主"); flag=true; } } } }; class Consumer implements Runnable{ private Info info=null; public Consumer(Info info){ this.info=info; } public void run(){ for(int i=0;i<10;i++) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.info.getName()+"---饰演--->"+this.info.getContent()); } } }; public class Producer_Comsumer01 { public static void main(String[] args) { Info info=new Info(); Producer pro=new Producer(info); Consumer con=new Consumer(info); new Thread(pro).start(); new Thread(con).start(); } }
运行结果如下图:
发现胡歌不仅饰演林殊,还饰演郡主,哈哈哈哈哈哈!
问题是线程生产的信息取出时是不匹配的,解决方法使用同步机制——synchronized
二、加入同步机制
将设置名称与内容定义在一个同步方法中,代码如下:
package li.xin.hua.ch9; /*线程生产者与向消费者最基本实现 * 1、数据不匹配通过同步机制已经解决 * 2、重复取数据问题还是有*/ class Info02{ private String name; private String content; public synchronized void get() { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.name+"---饰演--->"+this.content); } public synchronized void set(String name,String content) { this.name=name; try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.content = content; } }; class Producer02 implements Runnable{ private Info02 info=null; public Producer02(Info02 info){ this.info=info; } public void run(){ boolean flag=false; for(int i=0;i<10;++i) { if(flag){ this.info.set("胡歌","林殊"); flag=false; }else{ this.info.set("刘涛","郡主"); flag=true; } } } }; class Consumer02 implements Runnable{ private Info02 info=null; public Consumer02(Info02 info){ this.info=info; } public void run(){ for(int i=0;i<10;i++) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.info.get(); } } }; public class Producer_Comsumer02 { public static void main(String[] args) { Info02 info=new Info02(); Producer02 pro=new Producer02(info); Consumer02 con=new Consumer02(info); new Thread(pro).start(); new Thread(con).start(); } }
运行结果如下图:
胡歌与刘涛饰演的角色没有匹配错误,但信息反复取出,需要Object类中的方法来解决。
三、加入等待唤醒机制
Object类中wait()、notify()方法,扩充点知识:wai()方法会释放线程的对象的锁,而sleep()方法不会。
设置一个标志位flag,
当flag为true时:
可以进行生产,但不能取出数据,若此时消费者线程恰巧抢到CPU资源,想要执行消费者程序,
必须将消费者线程等待wait()。生产者生产完成后要修改标示位(表示可以消费者可以取出信息了),和唤醒notify()被等待的线程。
当flag为false时:
消费者可以取出信息,但生产者不能生产信息,若此时生产者线程恰巧抢到CPU资源,想要执行生产者程序,
必须将生产者线程等待wait()。消费者完成取出信息后要修改标示位(表示可以生产者可以生产信息了),和唤醒notify()被等待的线程。
package li.xin.hua.ch9; /*线程生产者与向消费者最基本实现 * 1、数据不匹配通过同步机制已经解决 * 2、重复取数据问题通过等待唤醒机制已经解决 * 当flag为true时允许生产者生产,若此时消费者进入则要等待 * 当flag为false时允许消费者取出信息,若此时生产者进入则要等待*/ class Info03{ private String name; private String content; private boolean flag=true; /* 设置标示位:true是生产的时间,false是消费的时间。 第一次先生产*/ public synchronized void set(String name,String content) { if(!flag) /*现在不是生产的时间,线程要等待,唤醒后才能生产。*/ { try { super.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this.name=name; try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.content = content; flag=false; super.notify(); } public synchronized void get() { if(flag) /*消费者*/ { try { super.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(this.name+"---饰演--->"+this.content); flag=true; super.notify(); } }; class Producer03 implements Runnable{ private Info03 info=null; public Producer03(Info03 info){ this.info=info; } public void run(){ boolean flag=false; for(int i=0;i<10;++i) { if(flag){ this.info.set("胡歌","林殊"); flag=false; }else{ this.info.set("刘涛","郡主"); flag=true; } } } }; class Consumer03 implements Runnable{ private Info03 info=null; public Consumer03(Info03 info){ this.info=info; } public void run(){ for(int i=0;i<10;i++) { try { Thread.sleep(50); } catch (InterruptedException e) { e.printStackTrace(); } this.info.get(); } } }; public class Producer_Comsumer03 { public static void main(String[] args) { Info03 info=new Info03(); Producer03 pro=new Producer03(info); Consumer03 con=new Consumer03(info); new Thread(pro).start(); new Thread(con).start(); } }
运行结果如下图:
胡歌与刘涛交替出现,并且角色匹配正确。
以上是关于JAVA模拟生产者与消费者实例的主要内容,如果未能解决你的问题,请参考以下文章
使用Java模拟消费者是如何消费rabbitMQ消息队列中的消息的