Java多线程Master-Worker模式
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java多线程Master-Worker模式相关的知识,希望对你有一定的参考价值。
Java多线程Master-Worker模式,多适用于需要大量重复工作的场景中。
例如:使用Master-Worker计算0到100所有数字的立方的和
1.Master接收到100个任务,每个任务需要0到100中每个数字的立方,这里为了效果,每个任务再sleep一秒,
Master需要将这些任务放到一个支持高并发的非阻塞队列queue中如:ConcurrentLinkedQueue<E>。
2.Master创建10个worker去执行这100个任务,并准备一个支持高并发且线程安全的hashMap作为结果集的容器如:ConcurrentHashMap。
3.每个worker需要循环的从queue中获取任务然后执行,执行完毕后把结果放到hashMap中,直到queue为空,所有任务执行完毕后退出。
4.Master循环判断结果集hashMap中是否有已经执行完毕的结果,如果有就使用,使用完毕就立即移除该结果,直到所有的线程都退出。
5.所有任务执行完毕,Master也处理完所有任务的结果,程序结束
Master不需要等待所有的任务执行完毕就可以处理已完成的任务结果,Master和worker可以同时进行工作,这样节省了大量等待worker执行结束的时间
Master源码
package masterWorker; import java.lang.Thread.State; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.Queue; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import org.junit.Test; public class Master { public static void main(String[] args) { long befor = System.currentTimeMillis(); //任务队列 Queue<Integer> taskQueue = new ConcurrentLinkedQueue<>(); //工人map Map<String,Thread> workers = new HashMap<>(); //结果集 Map<String,Long> resultMap = new ConcurrentHashMap<>(); //最终结果 long result = 0; //提交100个任务 for (int i = 0; i < 100; i++) { taskQueue.add(i); } //添加10个工人 for (int i = 0; i < 10; i++) { workers.put(i+"", new Thread(new Worker(taskQueue,resultMap),"工人"+i)); } //启动所有工人线程 Collection<Thread> threads = workers.values(); for (Thread thread : threads) { thread.start(); } while(resultMap.size() > 0 || !isComplete(workers)){ Set<String> keySet = resultMap.keySet(); //每次从resultMap中取一个结果出来进行使用 String key = null; for (String string : keySet) { if(string != null){ key = string; break; } } Long value = null; if(key != null){ value = resultMap.get(key); } //能取到结果就使用,没有结果继续循环 if(value != null){ //获取到一个运算结果就使用 result = result+value; //使用后从结果集中移除 resultMap.remove(key); } } long after = System.currentTimeMillis(); System.out.println("结果耗时:"+(after - befor)); System.out.println(result); } /** * 判断所有的工人是否已经完成工作 * @param workers * @return */ private static boolean isComplete(Map<String,Thread> workers){ for (Entry<String, Thread> entry : workers.entrySet()) { if(entry.getValue().getState() != State.TERMINATED){ return false; } } return true; } @Test public void test() throws InterruptedException{ long befor = System.currentTimeMillis(); long result = 0; for (int i = 0; i < 100; i++) { long cube = i*i*i; result = result+cube; Thread.sleep(100); } long after = System.currentTimeMillis(); System.out.println("结果耗时:"+(after - befor)); System.out.println(result); } }
Worker源码
package masterWorker; import java.util.Map; import java.util.Queue; public class Worker implements Runnable{ private Queue<Integer> queue; private Map<String,Long> resultMap; public Worker(Queue<Integer> queue,Map<String,Long> resultMap) { this.queue = queue; this.resultMap = resultMap; } @Override public void run() { //不断循环从队列中取出任务进行运算,直到队列为空 while(true){ if(queue.peek() != null){ String name = Thread.currentThread().getName(); int poll =(int) queue.poll(); long result = poll*poll*poll; try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } resultMap.put(poll+"", result); System.out.println(name+"完成"+poll+"的运算,结果为:"+result); }else{ break; } } } }
经典的Master-Worker源码实例
package masterWorker.classics; import java.util.Map; import java.util.Set; public class Main { /** * @param args */ public static void main(String[] args) { //固定使用5个Worker,并指定Worker Master m = new Master(new PlusWorker(), Runtime.getRuntime().availableProcessors()); //提交100个子任务 for(int i=0;i<100;i++){ m.submit(i); } //开始计算 m.execute(); int re= 0; //保存最终结算结果 Map<String ,Object> resultMap =m.getResultMap(); //不需要等待所有Worker都执行完成,即可开始计算最终结果 while(resultMap.size()>0 || !m.isComplete()){ Set<String> keys = resultMap.keySet(); String key =null; for(String k:keys){ key=k; break; } Integer i =null; if(key!=null){ i=(Integer)resultMap.get(key); } if(i!=null){ //最终结果 re+=i; } if(key!=null){ //移除已经被计算过的项 resultMap.remove(key); } } System.out.println(re); } } package masterWorker.classics; import java.util.Map; import java.util.Queue; public class Worker implements Runnable{ //任务队列,用于取得子任务 protected Queue<Object> workQueue; //子任务处理结果集 protected Map<String ,Object> resultMap; public void setWorkQueue(Queue<Object> workQueue){ this.workQueue= workQueue; } public void setResultMap(Map<String ,Object> resultMap){ this.resultMap=resultMap; } //子任务处理的逻辑,在子类中实现具体逻辑 public Object handle(Object input){ return input; } @Override public void run() { while(true){ //获取子任务 Object input= workQueue.poll(); if(input==null){ break; } //处理子任务 Object re = handle(input); resultMap.put(Integer.toString(input.hashCode()), re); } } } package masterWorker.classics; public class PlusWorker extends Worker { @Override public Object handle(Object input) { Integer i =(Integer)input; return i*i*i; } } package masterWorker.classics; import java.util.HashMap; import java.util.Map; import java.util.Queue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; public class Master { // 任务队列 protected Queue<Object> workQueue = new ConcurrentLinkedQueue<Object>(); // Worker进程队列 protected Map<String, Thread> threadMap = new HashMap<String, Thread>(); // 子任务处理结果集 protected Map<String, Object> resultMap = new ConcurrentHashMap<String, Object>(); // 是否所有的子任务都结束了 public boolean isComplete() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { if (entry.getValue().getState() != Thread.State.TERMINATED) { return false; } } return true; } // Master的构造,需要一个Worker进程逻辑,和需要Worker进程数量 public Master(Worker worker, int countWorker) { worker.setWorkQueue(workQueue); worker.setResultMap(resultMap); for (int i = 0; i < countWorker; i++) { threadMap.put(Integer.toString(i), new Thread(worker, Integer.toString(i))); } } // 提交一个任务 public void submit(Object job) { workQueue.add(job); } // 返回子任务结果集 public Map<String, Object> getResultMap() { return resultMap; } // 开始运行所有的Worker进程,进行处理 public void execute() { for (Map.Entry<String, Thread> entry : threadMap.entrySet()) { entry.getValue().start(); } } }
以上是关于Java多线程Master-Worker模式的主要内容,如果未能解决你的问题,请参考以下文章