线程设计模式之Master-worker
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程设计模式之Master-worker相关的知识,希望对你有一定的参考价值。
一、线程设计模式之Master-Worker模式
1.装载任务的容器:支持多个Worker同时读取N个任务,需支持并发,使用ConcurrentLinkedQueue。
2.装载Worker的容器,不需要考虑并发,使用HashMap<String,Thread>
3.装载Worker返回结果集的容器,使用ConcurrentHashMap<String, Object>盛装每一个任务返回的结果集。
Master
package com.cmos.ngkm.web.controller.basic; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { //需要一个盛装任务的集合 private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>(); //使用HashMap去盛装所有Worker对象 private HashMap<String,Thread> workers = new HashMap<String,Thread>(); //使用ConcurrentHashMap盛装每一个worker的结果集 private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); public ConcurrentHashMap<String, Object> getResultMap() { return resultMap; } public Master(Worker worker, int workCount) { worker.setTaskQueue(workQueue); worker.setResultMap(resultMap); for(int i = 0;i<workCount;i++) { workers.put("子节点"+Integer.toString(i),new Thread(worker) ); } } //提交方法 public void submit(Task task) { workQueue.offer(task); } //需要一个执行方法,让所有worker工作 public void execute() { for(Map.Entry<String, Thread> me:workers.entrySet()) { me.getValue().start(); } } //判断线程是否结束 public boolean isComplete() { for(Map.Entry<String, Thread> me:workers.entrySet()) { if(me.getValue().getState()!=Thread.State.TERMINATED) { return false; } } return true; } }
Worker
package com.cmos.ngkm.web.controller.basic; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Worker implements Runnable { //返回结果集 private ConcurrentHashMap<String, Object> resultMap; //传递任务 private ConcurrentLinkedQueue<Task> taskQueue; public void setResultMap(ConcurrentHashMap<String, Object> resultMap) { this.resultMap = resultMap; } public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) { this.taskQueue = taskQueue; } @Override public void run() { while(true) { Task task = taskQueue.poll(); if(task == null) { break; } //真正做业务处理 Object obj = handel(task); this.resultMap.put(task.getTaskId(),obj); } } public Object handel(Task task) { try{ Thread.sleep(300); System.out.println(task.getTaskId()+"正在执行"); return task.getTaskName(); }catch(Exception e) { e.printStackTrace(); } return null; } }
package com.cmos.ngkm.web.controller.basic; public class Task { private String taskName; private String taskId; public String getTaskId() { return taskId; } public String getTaskName() { return taskName; } public void setTaskName(String taskName) { this.taskName = taskName; } public void setTaskId(String taskId) { this.taskId = taskId; } }
MainTest
package com.cmos.ngkm.web.controller.basic;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Demo driver: submits 100 tasks to a {@link Master} backed by 10 worker
 * threads, waits for completion and prints the number of collected results.
 */
public class MainTest {

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Master master = new Master(new Worker(), 10);
        for (int i = 0; i < 100; i++) {
            Task task = new Task();
            task.setTaskId("taskid" + i);
            // Append the loop counter, not the literal "i", so each task gets its own name.
            task.setTaskName("taskName" + i);
            master.submit(task);
        }
        master.execute();

        // Poll with a short pause instead of spinning at full CPU.
        while (!master.isComplete()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        ConcurrentHashMap<String, Object> resultMap = master.getResultMap();
        System.out.println(resultMap.size());
        System.out.println("任务已经完成");
    }
}
package com.cmos.ngkm.web.controller.basic;
import java.util.concurrent.ConcurrentHashMap;
/**
 * Demo driver: submits 100 tasks to a {@link Master} backed by 10 worker
 * threads, waits for completion and prints the number of collected results.
 */
public class MainTest {

    /**
     * Runs the demo.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Master master = new Master(new Worker(), 10);
        for (int i = 0; i < 100; i++) {
            Task task = new Task();
            task.setTaskId("taskid" + i);
            // Append the loop counter, not the literal "i", so each task gets its own name.
            task.setTaskName("taskName" + i);
            master.submit(task);
        }
        master.execute();

        // Poll with a short pause instead of spinning at full CPU.
        while (!master.isComplete()) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }

        ConcurrentHashMap<String, Object> resultMap = master.getResultMap();
        System.out.println(resultMap.size());
        System.out.println("任务已经完成");
    }
}
以上是关于线程设计模式之Master-worker的主要内容,如果未能解决你的问题,请参考以下文章