使用线程池优化Echo模型

Posted 大佛拈花-GoSaint

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用线程池优化Echo模型相关的知识,希望对你有一定的参考价值。

  在上一篇文章中 http://www.cnblogs.com/gosaint/p/8492356.html         我阐述了使用线程为每一个客户端创建一个工作线程来负责任务的执行。但是会存在如下的问题

  1.   服务器创建线程开销很大,有可能服务器创建线程消耗资源要比单独的和客户端通信所消耗的资源要大
  2.   如果存在大量的线程要同时创建,那么存在的内存开销很大。可能导致服务器系统内存不够
  3.        线程之间的切换消耗了大量的资源

  创建线程池可以较少线程的创建和销毁的开销,并且根据实际情况动态的调整线程池的大小。

  (1) 自定以线程池,让它继承ThreadGroup类

 

如下代码是自定义线程池代码的实现

package com.asiaInfo.caozg.ch_03.threadPool;

import java.util.LinkedList;

/**
 * @Authgor: gosaint
 * @Description:
 * @Date Created in 15:39 2018/3/2
 * @Modified By:自定义线程池的实现
 */
public class ThreadPool extends ThreadGroup {

    private boolean isClosed = false;//线程池是否关闭
    private LinkedList<Runnable> workQuee;//表示工作队列
    private static int threadPoolID;//表示线程池ID
    private int workThreadID;//表示工作线程池ID

    public ThreadPool(int poolSize) {
        //poolSize表示线程池中工作线程的数目
        //每一次的初始化,线程池ID都会自动增加
        super("ThreadPool-" + (threadPoolID++));
        //设置为守护线程
        setDaemon(true);
        workQuee = new LinkedList<>();//创建工作队列
        for (int i = 0; i < poolSize; i++) {
            new WorkThread().start();
        }
    }

    /**
     * 向工作队列中添加一个新的任务,让工作线程去执行
     *
     * @param task
     */
    public synchronized void execute(Runnable task) {
        if (isClosed) {
            //线程池关闭,则抛出如下的异常
            throw new IllegalStateException();
        }
        if (task != null) {
            workQuee.add(task);
            notify();//唤醒正在等待获取任务的工作线程getTask();
        }
    }

    /**
     * 从工作队列中取出一个线程,让线程执行任务
     *
     * @return
     * @throws InterruptedException
     */
    public synchronized Runnable getTask() throws InterruptedException {
        while (workQuee.size() == 0) {
            //当工作队列中的线程为0时,如果线程池关闭,则返回null.负责,等待任务
            if (isClosed) {
                return null;
            } else {
                wait();
            }
        }
        return workQuee.removeFirst();//从工作队列中弹出第一个元素
    }

    /**
     * 关闭线程池
     */
    public synchronized void closed() {
        //线程池没有关闭
        if (!isClosed) {
            isClosed = true;
            workQuee.clear();//清除工作队列
            interrupt();//中断所有的线程
        }
    }

    /**
     * 等在工作线程将若有的任务执行完毕
     */
    public void join() {
        synchronized (this) {
            isClosed = true;
            notifyAll();//唤醒所有的等待任务的工作线程
        }
        //activeCount() 返回此线程组中活动线程的估计数。 来自ThreadGroup
        Thread[] threads = new Thread[activeCount()];
        // enumerate[list] 把此线程组及其子组中的所有活动线程复制到指定数组中。
        int count = enumerate(threads);//返回所有的活着的线程数量
        for (int i = 0; i < count; i++) {
            try {
                threads[i].join();//等待所有的活动的工作宣称的结束
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 内部类,创建的工作线程对象
     */
    private class WorkThread extends Thread {
        public WorkThread() {
            super(ThreadPool.this, "WorkThread-" + workThreadID++);
        }

        @Override public void run() {
            //线程没有中断
            while (!isInterrupted()) {
                Runnable task = null;
                try {
                    task = getTask();//获取任务
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if (task == null) {
                    return;
                }
                try {
                    task.run();//运行任务
                } catch (Throwable t) {
                    t.printStackTrace();
                }

            }
        }
    }
}

1 流程执行的分析:

 

关注两个成员变量:

  1> isClosed---->表示线程池是否关闭;workQuee:表示的工作队列,使用的类型是LinkedList.

  2> 在构造器中初始化工作队列,然后启动每一个线程;

  3>execute()方法的执行如下所示:线程池对象的核心就是将线程添加到工作队列中

  4>getTask()方法 从工作队列中获取线程

  5> closed()方法:关闭线程池

  6>join()方法;等待线程执行完所有的任务

 

 2 测试自定义线程池:

package com.asiaInfo.caozg.ch_03.threadPool;

import static java.lang.Integer.parseInt;

/**
 * @Authgor: gosaint
 * @Description:
 * @Date Created in 16:25 2018/3/2
 * @Modified By:线程池测试
 */
public class ThreadPoolTester {

    public static void main(String[] args) {
        if (args.length != 2) {
            System.out.println(
                    "用法: java ThreadPoolTest numTasks poolSize");
            System.out.println(
                    "  numTasks - integer: 任务的数目");
            System.out.println(
                    "  numThreads - integer: 线程池中的线程数目");
            return;
        }
        int numTasks = Integer.parseInt(args[0]);
        int poolSize = Integer.parseInt(args[1]);

        ThreadPool threadPool = new ThreadPool(poolSize);  //创建线程池

        // 运行任务
        for (int i = 0; i < numTasks; i++)
            threadPool.execute(createTask(i));

        threadPool.join();  //等待工作线程完成所有的任务
        // threadPool.close(); //关闭线程池
    }//#main()

    /**
     * 定义了一个简单的任务(打印ID)
     */
    private static Runnable createTask(final int taskID) {
        return new Runnable() {
            public void run() {
                System.out.println("Task " + taskID + ": start");
                try {
                    Thread.sleep(500);  //增加执行一个任务的时间
                } catch (InterruptedException ex) {
                }
                System.out.println("Task " + taskID + ": end");
            }
        };
    }
}

设置参数5,3.表示5个任务3个线程去执行:

 

以上是关于使用线程池优化Echo模型的主要内容,如果未能解决你的问题,请参考以下文章

图解为什么要使用线程池?

Motan在服务provider端用于处理request的线程池

稳定性 耗时 监控原因分析-- dubbo rpc 框架 的线程池,io 连接模型. 客户端,服务端

WINSOCK.07.完成端口模型

使用自定义线程池优化EchoServer

Java——线程池