18.并发类容器MQ

Posted 永恒之心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了18.并发类容器MQ相关的知识,希望对你有一定的参考价值。


  1. package demo7.MQ;
  2. public class QueueData {
  3. private int id;
  4. private String name;
  5. private String taskCode;
  6. public QueueData() {
  7. }
  8. public QueueData(int id, String name, String taskCode) {
  9. this.id = id;
  10. this.name = name;
  11. this.taskCode = taskCode;
  12. }
  13. public int getId() {
  14. return id;
  15. }
  16. public void setId(int id) {
  17. this.id = id;
  18. }
  19. public String getName() {
  20. return name;
  21. }
  22. public void setName(String name) {
  23. this.name = name;
  24. }
  25. public String getTaskCode() {
  26. return taskCode;
  27. }
  28. public void setTaskCode(String taskCode) {
  29. this.taskCode = taskCode;
  30. }
  31. }
  1. package demo7.MQ;
  2. import java.util.Random;
  3. import java.util.concurrent.BlockingQueue;
  4. import java.util.concurrent.TimeUnit;
  5. import java.util.concurrent.atomic.AtomicInteger;
  6. public class Provider implements Runnable {
  7. //共享缓存区
  8. private BlockingQueue<QueueData> queue;
  9. //多线程释放启动?
  10. private volatile boolean isRunning = true;
  11. //ID生成器
  12. private static AtomicInteger count = new AtomicInteger();
  13. //生产随机对象
  14. private static Random random = new Random();
  15. public Provider(BlockingQueue<QueueData> queue) {
  16. this.queue = queue;
  17. }
  18. @Override
  19. public void run() {
  20. while (isRunning){
  21. try {
  22. //随机休眠 - 1000 表示读取数据、生产数据的耗时
  23. Thread.sleep(random.nextInt(1000));
  24. //incrementAndGet 进行累加
  25. int id = count.incrementAndGet();
  26. QueueData queueData = new QueueData(id,"任务"+String.valueOf(id),String.valueOf(id).hashCode()+"");
  27. System.err.println("线程:"+Thread.currentThread().getName()+"\t生产task:"+queueData.getName()+"\t"+queueData.getId());
  28. if (!queue.offer(queueData,2, TimeUnit.SECONDS)){
  29. System.err.println("!!!!!!!!!生产数据失败 error");
  30. }
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. }
  35. }
  36. public void stop(){
  37. this.isRunning=false;
  38. }
  39. }
  1. package demo7.MQ;
  2. import java.util.Random;
  3. import java.util.WeakHashMap;
  4. import java.util.concurrent.BlockingQueue;
  5. public class Consumer implements Runnable{
  6. private BlockingQueue<QueueData> queue;
  7. public Consumer(BlockingQueue<QueueData> queue) {
  8. this.queue = queue;
  9. }
  10. private static Random random = new Random();
  11. @Override
  12. public void run() {
  13. while (true){
  14. try {
  15. //take:无阻塞
  16. QueueData queueData = this.queue.take();
  17. Thread.sleep(random.nextInt(1000));
  18. System.err.println("线程:"+Thread.currentThread().getName()+"\t消费task->:"+queueData.getName()+"\t"+queueData.getId());
  19. } catch (InterruptedException e) {
  20. e.printStackTrace();
  21. }
  22. }
  23. }
  24. }
  1. package demo7.MQ;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.ExecutorService;
  4. import java.util.concurrent.Executors;
  5. import java.util.concurrent.LinkedBlockingQueue;
  6. public class MainMQ {
  7. /**
  8. * 生产者、消费者(多线程模式)
  9. * 1.生产、消费:通常由2类线程,即若干了生产者的线程、若干个消费者的线程、
  10. * 2.生产者线程负责提交用户请求、消费者线程负责处理生产者提交的任务请求
  11. * 3.生产者、消费者之间通过共享内存缓存进行通信
  12. */
  13. public static void main(String[] args) {
  14. //1.内存缓存区
  15. BlockingQueue<QueueData> queueData = new LinkedBlockingQueue<QueueData>();
  16. //2.生产者
  17. Provider p1 = new Provider(queueData);
  18. Provider p2 = new Provider(queueData);
  19. Provider p3 = new Provider(queueData);
  20. //3.消费者
  21. Consumer c1 = new Consumer(queueData);
  22. Consumer c2 = new Consumer(queueData);
  23. Consumer c3 = new Consumer(queueData);
  24. //创建【线程池】运行,可以创建n个线程,没有任务的时候不创建线程,空闲线程存活时间为60s(默认)
  25. ExecutorService executorService = Executors.newCachedThreadPool();
  26. executorService.execute(p1);
  27. executorService.execute(p2);
  28. executorService.execute(p3);
  29. executorService.execute(c1);
  30. executorService.execute(c2);
  31. executorService.execute(c3);
  32. try {
  33. Thread.sleep(2000);
  34. } catch (InterruptedException e) {
  35. e.printStackTrace();
  36. }
  37. p1.stop();
  38. p2.stop();
  39. p3.stop();
  40. try {
  41. Thread.sleep(1000);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace();
  44. }
  45. executorService.shutdown();
  46. }
  47. }





以上是关于18.并发类容器MQ的主要内容,如果未能解决你的问题,请参考以下文章

[Java 并发编程实战] 同步容器类潜在的问题(含实例代码)

[Java 并发编程实战] 同步容器类潜在的问题(含实例代码)

并发类容器-第一讲

并发编程:同步类容器与并发类容器

消息队列(MQ)

同步类容器和并发类容器