18.并发类容器MQ
Posted 永恒之心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了18.并发类容器MQ相关的知识,希望对你有一定的参考价值。
package demo7.MQ;
public class QueueData {
private int id;
private String name;
private String taskCode;
public QueueData() {
}
public QueueData(int id, String name, String taskCode) {
this.id = id;
this.name = name;
this.taskCode = taskCode;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getTaskCode() {
return taskCode;
}
public void setTaskCode(String taskCode) {
this.taskCode = taskCode;
}
}
package demo7.MQ;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
public class Provider implements Runnable {
//共享缓存区
private BlockingQueue<QueueData> queue;
//多线程释放启动?
private volatile boolean isRunning = true;
//ID生成器
private static AtomicInteger count = new AtomicInteger();
//生产随机对象
private static Random random = new Random();
public Provider(BlockingQueue<QueueData> queue) {
this.queue = queue;
}
@Override
public void run() {
while (isRunning){
try {
//随机休眠 - 1000 表示读取数据、生产数据的耗时
Thread.sleep(random.nextInt(1000));
//incrementAndGet 进行累加
int id = count.incrementAndGet();
QueueData queueData = new QueueData(id,"任务"+String.valueOf(id),String.valueOf(id).hashCode()+"");
System.err.println("线程:"+Thread.currentThread().getName()+"\t生产task:"+queueData.getName()+"\t"+queueData.getId());
if (!queue.offer(queueData,2, TimeUnit.SECONDS)){
System.err.println("!!!!!!!!!生产数据失败 error");
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning=false;
}
}
package demo7.MQ;
import java.util.Random;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
public class Consumer implements Runnable{
private BlockingQueue<QueueData> queue;
public Consumer(BlockingQueue<QueueData> queue) {
this.queue = queue;
}
private static Random random = new Random();
@Override
public void run() {
while (true){
try {
//take:无阻塞
QueueData queueData = this.queue.take();
Thread.sleep(random.nextInt(1000));
System.err.println("线程:"+Thread.currentThread().getName()+"\t消费task->:"+queueData.getName()+"\t"+queueData.getId());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
package demo7.MQ;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MainMQ {
/**
* 生产者、消费者(多线程模式)
* 1.生产、消费:通常由2类线程,即若干了生产者的线程、若干个消费者的线程、
* 2.生产者线程负责提交用户请求、消费者线程负责处理生产者提交的任务请求
* 3.生产者、消费者之间通过共享内存缓存进行通信
*/
public static void main(String[] args) {
//1.内存缓存区
BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();
//2.生产者
Provider p1 = new Provider(queueData);
Provider p2 = new Provider(queueData);
Provider p3 = new Provider(queueData);
//3.消费者
Consumer c1 = new Consumer(queueData);
Consumer c2 = new Consumer(queueData);
Consumer c3 = new Consumer(queueData);
//创建【线程池】运行,可以创建n个线程,没有任务的时候不创建线程,空闲线程存活时间为60s(默认)
ExecutorService executorService = Executors.newCachedThreadPool();
executorService.execute(p1);
executorService.execute(p2);
executorService.execute(p3);
executorService.execute(c1);
executorService.execute(c2);
executorService.execute(c3);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
p1.stop();
p2.stop();
p3.stop();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
executorService.shutdown();
}
}
以上是关于18.并发类容器MQ的主要内容,如果未能解决你的问题,请参考以下文章
[Java 并发编程实战] 同步容器类潜在的问题(含实例代码)