线程设计模式之Master-worker

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程设计模式之Master-worker相关的知识,希望对你有一定的参考价值。

、线程设计模式之Master-Worker模式

技术分享图片

1.装载任务支持多个worker同事读取N个任务,支持并发。ConcurentLinkedList.

2.装载Woker的容器,不需要考虑并发,使用HashMap<String,Thread>

3.装载Worker返回结果集容器。concurrentHashMap<String, Object>盛装每一个任务返回的结果集。

                                                                                                     Master

 

package com.cmos.ngkm.web.controller.basic;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Master {
    //需要一个盛装任务的集合
    private ConcurrentLinkedQueue<Task> workQueue = new ConcurrentLinkedQueue<>();
    //使用HashMap去盛装所有Worker对象
    private HashMap<String,Thread> workers = new HashMap<String,Thread>();
    //使用ConcurrentHashMap盛装每一个worker的结果集
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    public ConcurrentHashMap<String, Object> getResultMap() {
        return resultMap;
    }

    public Master(Worker worker, int workCount)
    {
        worker.setTaskQueue(workQueue);
        worker.setResultMap(resultMap);
        for(int i = 0;i<workCount;i++)
        {
            workers.put("子节点"+Integer.toString(i),new Thread(worker) );
        }
    }
  //提交方法
    public void submit(Task task)
    {
        workQueue.offer(task);
    }
 //需要一个执行方法,让所有worker工作
    public void execute()
    {
        for(Map.Entry<String, Thread> me:workers.entrySet())
        {
             me.getValue().start();
        }
    }
    //判断线程是否结束
    public boolean isComplete()
    {
        for(Map.Entry<String, Thread> me:workers.entrySet())
        {
           if(me.getValue().getState()!=Thread.State.TERMINATED)
           {
               return false;
           }
        }
        return true;
    }
}

 

                                                                                            Worker

package com.cmos.ngkm.web.controller.basic;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class Worker implements Runnable {
    //返回结果集
    private ConcurrentHashMap<String, Object> resultMap;
    //传递任务
    private ConcurrentLinkedQueue<Task> taskQueue;

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }

    public void setTaskQueue(ConcurrentLinkedQueue<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    @Override
    public void run() {
        while(true) {
         Task task = taskQueue.poll();
         if(task == null)
         {
             break;
         }
         //真正做业务处理
         Object obj = handel(task);
         this.resultMap.put(task.getTaskId(),obj);
        }
    }
    public Object handel(Task task)
    {
        try{
          Thread.sleep(300);
          System.out.println(task.getTaskId()+"正在执行");
          return task.getTaskName();
        }catch(Exception e)
        {
          e.printStackTrace();
        }
        return null;
    }

}

                                                                                                           

package com.cmos.ngkm.web.controller.basic;

public class Task {
    private String taskName;

    private String taskId;

    public String getTaskId() {
        return taskId;
    }

    public String getTaskName() {
        return taskName;
    }

    public void setTaskName(String taskName) {
        this.taskName = taskName;
    }

    public void setTaskId(String taskId) {
        this.taskId = taskId;
    }
}

                                                                                     MainTest

package com.cmos.ngkm.web.controller.basic;

import java.util.concurrent.ConcurrentHashMap;

public class MainTest {

    public static void main(String[] args) {
        Master master = new Master(new Worker(),10);
        for(int i=0;i<100;i++)
        {
            Task task = new Task();
            task.setTaskId("taskid"+Integer.toString(i));
            task.setTaskName("taskName"+"i");
            master.submit(task);
        }
        master.execute();
        while(true)
        {
            if(master.isComplete())
            {
                ConcurrentHashMap<String, Object> resultMap = master.getResultMap();
                System.out.println(resultMap.size());
                System.out.println("任务已经完成");
                break;
            }
        }
    }
}

 

 

 

 

package com.cmos.ngkm.web.controller.basic;

import java.util.concurrent.ConcurrentHashMap;

public class MainTest {

public static void main(String[] args) {
Master master = new Master(new Worker(),10);
for(int i=0;i<100;i++)
{
Task task = new Task();
task.setTaskId("taskid"+Integer.toString(i));
task.setTaskName("taskName"+"i");
master.submit(task);
}
master.execute();
while(true)
{
if(master.isComplete())
{
ConcurrentHashMap<String, Object> resultMap = master.getResultMap();
System.out.println(resultMap.size());
System.out.println("任务已经完成");
break;
}
}
}
}



























以上是关于线程设计模式之Master-worker的主要内容,如果未能解决你的问题,请参考以下文章

多线程:多线程设计模式:Master-Worker模式

线程学习--Master-Worker模式

Java多线程Master-Worker模式

并行设计模式-- Master-Worker模式

java线程模型Master-Worker

java线程学习之Master-Worker模式