线程池(中)

Posted 带剑书生

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程池(中)相关的知识,希望对你有一定的参考价值。

线程池(下)

线程池超负载了怎么办?都有哪些拒绝策略?

在ThreadPoolExecutor的构造方法里有一个这样的参数, RejectedExecutionHandler 通过查看Jdk我们可以知道这是一个接口,而且jdk内置实现了四种拒绝策略,它们都是ThreadPoolExecutor的public static class。

  • CallerRunsPolicy策略:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,虽然这样并不会真的丢弃任务,但是相应的被提交任务的线程性能肯定会急剧下降。

  • AbortPolicy策略:该策略会直接抛出异常,阻止系统正常工作。

  • DiscardPolicy策略:直接丢弃任务,不给予任何处理。

  • DiscardOldestPolicy策略:该策略会丢弃最古老的请求(也就是任务队列中最先进入的任务),即将被执行的策略,并尝试再次提交当前任务。

 
   
   
 
  1. import java.util.concurrent.ArrayBlockingQueue;

  2. import java.util.concurrent.ThreadPoolExecutor;

  3. import java.util.concurrent.TimeUnit;


  4. public class ExecutorServiceDemo {


  5.    static void log(String msg) {

  6.        System.out.println(System.currentTimeMillis() + " -> " + msg);

  7.    }


  8.    public static void main(String[] args) throws Exception {

  9.        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,

  10.                new ArrayBlockingQueue<Runnable>(1));

  11.        pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

  12.        for (int i = 0; i < 10; i++) {

  13.            final int index = i;

  14.            pool.submit(new Runnable() {

  15.                public void run() {

  16.                    log("run task:" + index + " -> " + Thread.currentThread().getName());

  17.                    try {

  18.                        Thread.sleep(1000L);

  19.                    } catch (Exception e) {

  20.                        e.printStackTrace();

  21.                    }

  22.                    log("run over:" + index + " -> " + Thread.currentThread().getName());

  23.                }

  24.            });

  25.        }

  26.        log("before sleep");

  27.        Thread.sleep(4000L);

  28.        log("before shutdown()");

  29.        pool.shutdown();

  30.        log("after shutdown(),pool.isTerminated=" + pool.isTerminated());

  31.        pool.awaitTermination(1, TimeUnit.SECONDS);

  32.        log("now,pool.isTerminated=" + pool.isTerminated());

  33.    }

  34. }

程序设置的线程池只有一个线程,并且允许最大的线程数量也为1,同时任务队列的最大界限为1。 当采用DisCardOldestPolicy()时:

线程池(中)可以知道在workQueue中的task1-task8一直被丢弃(因为任务队列只有一个容量,而线程池里的唯一线程处理的速度不是很快,而程序又不停的往线程池里提交任务)。直到最后一个task9任务才被执行。 当采用DisCardPolicy()时:

线程池(中)可以看见刚除开始线程执行的task0和任务队列里的task1,其余的都被默默地抛弃了。 当采用AbortPolicy()策略时:系统直接抛出异常。。。

 当采用CallerRunsPolicy()策略时就有点不同的了:

线程池(中)

可以看见在main线程一直帮忙处理不能被线程池处理同时也不能进入任务队列的任务。 若以上的策略还是无法满足实际应用的需要。我们还可以自己扩展

 
   
   
 
  1. RejectedExecutionHandler {

  2. void rejectedExecution(Runnable r,ThreadPoolExecutor executor);

  3. } 

自己定义线程创建:ThreadFactory

看到这里我们可能有疑问那就是线程池的线程是从哪里来的?答案就是:ThreadFactory 它是一个接口,只有一个方法

 
   
   
 
  1. public Thread newThread(Runnable r);

自定义线程池可以更加自由的设置线程的状态

 
   
   
 
  1. import java.util.concurrent.ExecutorService;

  2. import java.util.concurrent.SynchronousQueue;

  3. import java.util.concurrent.ThreadFactory;

  4. import java.util.concurrent.ThreadPoolExecutor;

  5. import java.util.concurrent.TimeUnit;


  6. /**

  7. * 自定义线程池 :public interface ThreadFactory

  8. * 根据需要创建新线程的对象

  9. * Thread newThread(Runnable r)  

  10. * @author Administrator

  11. *

  12. */

  13. public class MyThreadFactory {


  14.    public static class MyTask implements Runnable {


  15.        @Override

  16.        public void run() {

  17.            System.out.println( System.currentTimeMillis() + " Thread id:" + Thread.currentThread().getId());

  18.            try {

  19.                Thread.sleep(100);

  20.            } catch (InterruptedException e) {

  21.                e.printStackTrace();

  22.            }

  23.        }


  24.    }  

  25.    public static void main(String[] args) throws InterruptedException {

  26.        MyTask task = new MyTask();

  27.        ExecutorService exec = new ThreadPoolExecutor(5,5,0L,TimeUnit.MILLISECONDS,

  28.                new SynchronousQueue<Runnable>(),

  29.                new ThreadFactory(){


  30.                    @Override

  31.                    public Thread newThread(Runnable r) {

  32.                        Thread t = new Thread(r,"后台线程");

  33.                        t.setDaemon(true);

  34.                        System.out.println("create " + t.getName()+ " " + t.getId());

  35.                        return t;

  36.                    }

  37.        });

  38.        for(int i = 0; i < 5; i++) {

  39.            exec.submit(task);

  40.        }

  41.        TimeUnit.SECONDS.sleep(2);

  42.    }

  43. }

实际中使用guava的ThreadFactoryBuilder来创建一个ThreadFactory更加灵活方便。

线程池的扩展

上面仅仅只是我们自定义了创建线程时的状态,但是有时候,我们需要对线程执行的任务进行监控,比如说任务的开始时间和结束时间。幸运的是,ThreadPoolExecutor是一个可扩展的线程池。提供了三个方法:

 
   
   
 
  1. protected void beforeExecute(Thread t, Runnable r) { }

  2. protected void afterExecute(Runnable r, Throwable t) { }

  3. protected void terminated() { }    

同时再看ThreadPoolExecutor中有一个这样的类, private final class Worker extends AbstractQueuedSynchronizer implements Runnable它里面有一个方法runWorker

 
   
   
 
  1.    final void runWorker(Worker w) {

  2.        Runnable task = w.firstTask;

  3.        w.firstTask = null;

  4.        boolean completedAbruptly = true;

  5.        try {

  6.            while (task != null || (task = getTask()) != null) {

  7.                w.lock();

  8.                clearInterruptsForTaskRun();

  9.                try {

  10.                    beforeExecute(w.thread, task);//运行前

  11.                    Throwable thrown = null;

  12.                    try {

  13.                        task.run();//运行任务

  14.                    } catch (RuntimeException x) {

  15.                        thrown = x; throw x;

  16.                    } catch (Error x) {

  17.                        thrown = x; throw x;

  18.                    } catch (Throwable x) {

  19.                        thrown = x; throw new Error(x);

  20.                    } finally {

  21.                        afterExecute(task, thrown);//运行结束后

  22.                    }

  23.                } finally {

  24.                    task = null;

  25.                    w.completedTasks++;

  26.                    w.unlock();

  27.                }

  28.            }

  29.            completedAbruptly = false;

  30.        } finally {

  31.            processWorkerExit(w, completedAbruptly);

  32.        }

  33.    }

ThreadPoolExecutor中工作线程正是Worker的实例(它是把Runnable对象进行了包装),Worker.runWorker()会被线程池以多线程模式异步调用,即它会被多个线程访问,因此其beforeExecute()它们也会被多线程同时访问。 在默认的ThreadExecutor实现中,提供了beforeExecute(),afterExecute(),terminated()空的实现。在实际的应用中可以通过对其进行扩展实现多线程池运行状态的跟踪。

 
   
   
 
  1. import java.text.SimpleDateFormat;

  2. import java.util.Date;

  3. import java.util.concurrent.ExecutorService;

  4. import java.util.concurrent.LinkedBlockingQueue;

  5. import java.util.concurrent.ThreadPoolExecutor;

  6. import java.util.concurrent.TimeUnit;



  7. public class ExtThreadPool {


  8.    public static class MyTask implements Runnable {

  9.        private String name;


  10.        public MyTask(String name) {

  11.            this.name = name;

  12.        }


  13.        @Override

  14.        public void run() {

  15.            System.out.println("正在执行线程id:" + Thread.currentThread().getId() + " Task name " + name);

  16.            try {

  17.                TimeUnit.MILLISECONDS.sleep(100);

  18.            } catch (InterruptedException e) {

  19.                e.printStackTrace();

  20.            }

  21.        }

  22.    }


  23.    public static void main(String[] args) throws InterruptedException {


  24.        ExecutorService exec =

  25.                new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,

  26.                new LinkedBlockingQueue<Runnable>())

  27.        //直接获得ThreadPoolExecutor的子类,并且重写protect钩子方法

  28.        {

  29.            @Override

  30.            protected synchronized void beforeExecute(Thread t, Runnable r) {

  31.                System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date()));

  32.                System.out.println("准备执行:" + ((MyTask)r).name);

  33.            }


  34.            @Override

  35.            protected synchronized void afterExecute(Runnable r, Throwable t) {

  36.                System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date()));

  37.                System.out.println("执行完成:" + ((MyTask)r).name);

  38.            }


  39.            @Override

  40.            protected synchronized void terminated() {

  41.                System.out.print(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss:SS").format(new Date()));

  42.                System.out.println("线程池退出...");    

  43.            }


  44.        };


  45.        for(int i = 0; i < 5; i++) {

  46.            MyTask task = new MyTask("Task" + i);

  47.            exec.execute(task);

  48.            Thread.sleep(10);

  49.        }

  50.        exec.shutdown();



  51.    }

  52. }

这里需要注意打印时间这句会发生线程安全。用Synchronized监视器让线程在临界区进行互斥的执行。 运行结果:

这里可以清楚地看到线程池中执行任务的创建和结束信息。

关于我



以上是关于线程池(中)的主要内容,如果未能解决你的问题,请参考以下文章

Java线程池详解

Java线程池详解

Java 线程池详解

线程池与并行度

Motan在服务provider端用于处理request的线程池

线程池-实现一个取消选项