Java Thread系列Master-Worker模式

Posted binarylei

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Thread系列Master-Worker模式相关的知识,希望对你有一定的参考价值。

Java Thread系列(九)Master-Worker模式

Master-Worker模式是常用的并行设计模式.

一、Master-Worker 模式核心思想

Master-Worker 系统由两个角色组成,Master 和 Worker,Master 负责接收和分配任务,Worker 负责处理子任务。任务处理过程中,Master 还负责监督任务进展和 Worker 的健康状态;Master 将接收 Client 提交的任务,并将任务的进展汇总反馈给 Client。各角色关系如下图:

技术分享图片

二、Master-Worker 实现

(1) Master

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Master {
    //1. 应该有一个容器存放任务列表,这个容器需要支持高并发操作
    private ConcurrentLinkedDeque<Task> taskQueue = new ConcurrentLinkedDeque<Task>();

    //2. 应该有一个容器存放worker
    private HashMap<String, Thread> workers = new HashMap<String, Thread>();

    //3. 应该有一个容器存放结果集,这个容器需要支持高并发操作
    private ConcurrentHashMap<String, Object> resultMap = new ConcurrentHashMap<String, Object>();

    //4. 构造函数
    public Master (Worker worker, int threadCount) {
        //将任务列表和结果集传递给worker
        worker.setTaskQueue(taskQueue);
        worker.setResultMap(resultMap);
        //初始化worder列表
        for (int i = 0; i < threadCount; i++) {
            workers.put("worker-" + i, new Thread(worker));
        }
    }

    public Master (Worker worker) {
        this(worker, Runtime.getRuntime().availableProcessors());
    }

    //5. 提交任务
    public void submit (Task task) {
        taskQueue.add(task);
    }

    //6. 执行方法 开启所有的线程
    public void execute () {
        for(Map.Entry<String, Thread> me : workers.entrySet()) {
            me.getValue().start();
        }
    }

    //7. 判断是否执行完毕
    public boolean isComplete () {
        for(Map.Entry<String, Thread> me : workers.entrySet()) {
            if (me.getValue().getState() != Thread.State.TERMINATED)
                return false;
        }
        return true;
    }

    //8. 处理结果集
    public int getResult () {
        int ret = 0;
        for(Map.Entry<String, Object> me : resultMap.entrySet()) {
            ret += (int)me.getValue();
        }
        return ret;
    }
}

(2) Worker

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;

public class Worker implements Runnable {

    private ConcurrentLinkedDeque<Task> taskQueue;
    private ConcurrentHashMap<String, Object> resultMap;

    @Override
    public void run() {
        while (true) {
            Task task = taskQueue.poll();
            if (task == null) break;

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //返回结果集
            resultMap.put(Integer.toString(task.getId()), handle(task));
        }
    }

    private Object handle(Task task) {
        return task.getCount();
    }

    public void setTaskQueue(ConcurrentLinkedDeque<Task> taskQueue) {
        this.taskQueue = taskQueue;
    }

    public void setResultMap(ConcurrentHashMap<String, Object> resultMap) {
        this.resultMap = resultMap;
    }
}

(3) Task

public class Task {
    private int id;
    private String name;
    private int count;

    public Task() {}    
    public Task(int id, String name, int count) {
        this.id = id;
        this.name = name;
        this.count = count;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public int getCount() {
        return count;
    }
    public void setCount(int count) {
        this.count = count;
    }
    @Override
    public String toString() {
        return "Task{" + "id=" + id + ", name=‘" + name + ‘\‘‘ +
                ", count=" + count + ‘}‘;
    }
}

(4) 测试

Master master = new Master(new Worker(), 1);

for (int i = 1; i <= 100; i++) {
    master.submit(new Task(i, "task-" + i ,i));
}

master.execute();

long t1 = System.currentTimeMillis();
while (true) {
    if (master.isComplete()) {
        long t = System.currentTimeMillis() - t1;
        System.out.printf("执行结果:%s;执行时间:%s", master.getResult(), t);
        break;
    }
}

每天用心记录一点点。内容也许不重要,但习惯很重要!

以上是关于Java Thread系列Master-Worker模式的主要内容,如果未能解决你的问题,请参考以下文章

Java Thread系列线程安全

Java Thread系列synchronized

Java Thread系列线程状态

Java多线程系列二——Thread类的方法

Java Thread系列Future 模式

Java多线程系列--“基础篇” 线程创建的方式