资源的调度——线程池

Posted Android架构师之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了资源的调度——线程池相关的知识,希望对你有一定的参考价值。

前言


如何使用线程决定了计算任务完成的效率,在不降低性能的前提下榨干机器的每一滴性能,让计算任务更快速高效的执行。这个问题引出了阿姆达尔定律,理论上增加CPU处理器的个数就能加快计算速度,但是有一个上限,计算速度的提升决定于与应用程序中代码可并行执行量。在单核的机器中,多个线程的执行其实并未提高太多的效率,多个线程争夺CPU执行权的时间片,一个线程获得了执行权,其他的线程就要等待。显然多核有利于提高多线程执行权的非冲突获取,提高计算速度。但并非所有的代码都会并行执行,代码串行执行情况下,多核计算效率和单核CPU情况下一样。现在的android手机基本都是多核,线程池的设计应当参考手机CPU数来进行设置达到最大的执行效率。本文从代码设计的角度分析常见的线程任务模型。


正文


在Android中,说到异步线程不得不提AsyncTask,该类具体提供了以下几个功能:

  1. 线程池管理

  2. 线程池的线程个数根据手机CPU个数自动调整

  3. UI线程自动调度

  4. 任务可取消


核心功能线程池管理用的是jdk的线程池框架ThreadPoolExecutor,线程池的常驻个数是2~4个,当常驻线程不够用的时候会创建新的线程来可满足计算需求,线程池中线程总数为两倍CPU个数加一,比如手机是4个CPU,那么线程池中最多有9个线程,其中常驻线程为3个,非常驻线程在一定时间未使用后会自动回收。UI线程的调度使用的是handler,任务可取消的特性仍然使用的jdk的特性FutureTask,综合这些特性来看,其实AsyncTask只是一层Wrapper,核心的线程功能还是用的jdk里的线程框架。


Android市面上凡是涉及到线程管理这块基本都是用的jdk的线程框架Executor,大家耳熟能详的如Rxjava,okhttp等。该线程框架实现的是对传统IO的操作,相对于传统IO还有一种新的IO操作-NIO,非阻塞式的,netty框架就是基于NIO实现了,效率比传统IO高非常多。这两个线程框架都是 Dong Lea主刀开发的,Dong Lea是并发领域的高级专家。下面来看下这个Executor框架是如何管理线程的。

Executor UML用例图设计如下:


Executor

   public interface Executor {
   //执行一个任务
   void execute(Runnable command);
 }

ExecutorService

public interface ExecutorService extends Executor {


//关闭正在执行或未执行的任务,调用后无法添加新的任务,多次调用无效
void shutdown();


//尝试停止正在执行或者等待的任务,返回未执行的任务,该方法会尽最大努力去关闭正在执行的任务,线程的取消方法不能保证绝对可靠
List<Runnable> shutdownNow();
//是否执行力shutdown命令
boolean isShutdown();

//shutdown命令发出后,所有任务是否已经完成
boolean isTerminated();

//方法将一直阻塞,直到三种情况中任何一种发生了。一,shutdown命令发出后,所有的任务执行完毕;二,阻塞等待超时;三,线程中断了。
boolean awaitTermination(long timeout, TimeUnit unit)
    throws InterruptedException
;
//提交一个计算任务,并返回这个任务的跟踪信息类Future(任务状态和结果),若要获取异步执行的结果,可以使用Future.get()方法,该方法时一个阻塞方法,知道有计算有了结果才返回
<T> Future<T> submit(Callable<T> task);

Future<?> submit(Runnable task);

 //执行所有任务,并将结果一起返回
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
    throws InterruptedException;
 //执行所有任务,并将结果一起返回,所有任务执行的时间必须在一个时间值内,否则超时
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                              long timeout, TimeUnit unit)
    throws InterruptedException;

 //执执行任意一个任务,有了结果就返回
<T> invokeAny(Collection<? extends Callable<T>> tasks)
    throws InterruptedException, ExecutionException
;

 //执执行任意一个任务,有了结果就返回,执行时间必须在一个时间值内,否则超时
<T> invokeAny(Collection<? extends Callable<T>> tasks,
                long timeout, TimeUnit unit)

    throws InterruptedException, ExecutionException, TimeoutException
;
}

ThreadPoolExecutor

static {
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
            CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
            sPoolWorkQueue, sThreadFactory);
    threadPoolExecutor.allowCoreThreadTimeOut(true);
    THREAD_POOL_EXECUTOR = threadPoolExecutor;
}


ThreadPoolExecutor的构造参数有非常多的配置,其中有几个重点:


  1. 线程数量:线程池创建时会创建corePoolSize数量的线程常驻内存,当常驻线程用完,新的线程将会创建来执行任务,总的线程数不能超过maximusPoolSize,若线程数已经达最大值,新的任务将在队列中等待。

  2. 线程工厂:自定义线程工厂ThreadFactory来定义配置创建线程的属性比如归属的线程组优先级是否是守护线程等

  3. 线程生存时间:常驻线程之外的线程称为临时线程,临时线程有着生存时间,当临时线程空闲超过一定时间就会被销毁,常驻线程也可以设置是否在空闲时自动销毁。

  4. 任务队列:使用一个容量无限的队列,当所有的corePoolSize都处于忙碌状态时,新任务将在队列中等待,此时maximumPoolSize设置是无效的,线程池中最多的线程数是corePoolSize;使用一个容量有限的队列,对资源的使用有一个限制,此时maximumPoolSize有效的,但是任务队列大小与线程池maximumPoolSize大小如何彼此参照设定是一个很难控制的问题,不协调的设置方式都会对系统的资源分配不利,AsyncTask的任务队列设置大小为128,当队列中的任务数超过128是会抛出异常。

     private static final BlockingQueue<Runnable> sPoolWorkQueue =
        new LinkedBlockingQueue<Runnable>(128);
  5.  任务拒绝异常:线程池关闭或者线程池中的任务队列容量已经到达上限,添加新的任务时会发生rejected异常。


线程池工作模型图:


一个woker消费者只与一个线程和一个任务绑定,一个woker同一时间只能执行一个任务。当一个任务添加进任务队列的时候会根据线程池中消费者的数量决定是创建新的消费者还是在队列中等待可用的消费者。


public void execute(Runnable command{
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
   //若消费者数量小于corePoolSize,则添加消费者
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
   //若线程池处于运行状态,添加任务到任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
   //若添加任务不成功
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(nullfalse);
    }

    else if (!addWorker(command, false))
        reject(command);
   }


每个消费者的线程在一个循环中阻塞获取任务队列中的任务:


final void runWorker(Worker w{
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
      //getTask()方法从任务队列中获取任务,若没有任务则会阻塞等待
        while (task != null || (task = getTask()) != null) {
            w.lock();

            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
    }


任务执行完后会将结果发送到Future中


public void run() {
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
               //返回结果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
               //通知异常
                setException(ex);
            }
            //通知结果
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
   }


run方法完成后,结果将存储在future类中,通过get方法获取,完成整个异步任务的执行。

就像一座沙场不断的产生沙子,然后有多个运沙车将沙子从里面运出来,没有沙子的时候运沙车就在沙场外面等着,等待的时间超过一定值,管理人员觉得这么多运沙车空闲是一种资源浪费,于是就保留少量的运沙车在外面等待,其余的车退回市场。当任务繁忙的时候又从市场上订来更多的运沙车帮忙运沙。每个运沙车运沙子的量以及结果都有一个账本进行跟踪统计。


总结

jdk中的线程池框架是典型的生产者消费者模式,在此架构基础上提供了很多的安全、状态控制以及任务信息跟踪的功能。正如Dong Lea自己所说,该线程池框架着重要解决的是,在机器资源一定的情况下,最大性能的解决大量的并发任务问题。因此设计上线程池的配置以及任务的配置都是可控的,可以根据机器具体情况进行配置。


在实际的研发中,也有任务异步串行执行的需求,串行任务执行使用任务队列+单消费者的模式更简洁明了,Android系统的单线程模型即是基于这种模式,从执行的顺序性上规约了任务事件发生的顺序性,而事件发生的顺序性往往是很多需求所追求的。



推荐阅读:








如果你想要跟大家分享你的文章,欢迎投稿~


┏(^0^)┛下周见!




以上是关于资源的调度——线程池的主要内容,如果未能解决你的问题,请参考以下文章

基于线程池的线程调度管控系统

基于线程池的线程调度管控系统

Linux---多线程线程池

Linux---多线程线程池

Linux---多线程线程池

线程池 & 线程调度