java线程池原理便懂了!♥♥

Posted 牛牛最爱喝兽奶

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java线程池原理便懂了!♥♥相关的知识,希望对你有一定的参考价值。

模拟Java线程池运行原理

在了解线程池之前,我们先来谈谈线程的状态转换

一个线程到底有哪些状态呢?
1、新建状态(new):线程对象被创建后就进入了新建状态。例如 Thread t = new Thread();
2、就绪状态(Runnable):也可以称为可执行状态,线程对象被创建后,其他线程(主线程、守护线程、用户线程)调用了该对象的start()方法,从而启动该线程。如:t.start(); 处于就绪状态的线程随时可能被CPU调度执行。
3、运行状态(Running):线程获取CPU权限进行执行。需要注意的是,线程只能从就绪状态进入到运行状态
4、阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权限,暂时停止运行。直到线程进入就绪状态,才有机会进入运行状态。阻塞的三种情况:
1)等待阻塞:通过调用线程的wait()方法,让线程等待某工作的完成。
2)同步阻塞:线程在获取synchronized同步锁失败(因为锁被其他线程占用),它会进入同步阻塞状态。
3)其他阻塞:通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或超时、或者I/O处理完毕时,线程重新转入就绪状态。
5、死亡状态(Dead):线程执行完了或因异常退出了run()方法,该线程结束生命周期。

线程常用方法及种类

线程的种类:
系统级线程:
核心级线程 由操作系统内核进行管理,使用户程序可以创建,执行 撤销
用户级线程:
管理过程全部都是由用户程序完成,操作系统内核只对进程管理
多线程有哪些优势?
多线程使系统空转时间减少 ----提高CPU的利用率
进程间不能共享内存 但是线程之间共享内存非常容易
使用多线程实现多任务高并发 比多进程的效率高
线程方法:
Thread.currendThead() 获取当前线程的引用
getName() 获取线程的名字
setName() 设置线程的名字
start() 启动线程
run() 存放线程体的代码
setDaemon() 将线程设置为守护线程
线程的优先级:
查看优先级的方法 getPriority()
每个线程都具有一定的优先级 当调度线程时 会优先考虑级别比较高的线程
默认情况下:
一个线程继承父类的线程的优先级,可以使用线程中的setPriority(int newPriority) 设置线程的优先级别。
优先级主要影响的是 CPU在线程间在切换状态下
原则:
当一个线程 通过显示放弃 睡眠 或者阻塞 自愿释放控制权时 所有的线程都要接收检查,然后优先级高的会优先执行。
一个线程可以被一个高优先级的线程抢占资源。 同级别的线程之间,通过控制权的释放确保所有的线程均有机会执行。

线程池实现原理和线程池概念


线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。在java的API中实现线程池的类超类是java.util.concurrent 包下的Executor接口。

执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor
public void execute(Runnable r)
r.run();

更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor
public void execute(Runnable r)
new Thread®.start();

许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。

class SerialExecutor implements Executor 
     final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
     final Executor executor;
     Runnable active;

     SerialExecutor(Executor executor) 
         this.executor = executor;
     

     public synchronized void execute(final Runnable r) 
         tasks.offer(new Runnable() 
             public void run() 
                 try 
                     r.run();
                  finally 
                     scheduleNext();
                 
             
         );
         if (active == null) 
             scheduleNext();
         
     

     protected synchronized void scheduleNext() 
         if ((active = tasks.poll()) != null) 
             executor.execute(active);
         
     
 

此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。

四种线程池

Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而
只是一个执行线程的工具。真正的线程池接口是 ExecutorService。

newCachedThreadPool:
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行
很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造
的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并
从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资
源。
newFixedThreadPool:
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大
多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,
则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何
线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之
前,池中的线程将一直存在。
newScheduledThreadPool:
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。

ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3); 
 scheduledThreadPool.schedule(newRunnable() 
 @Override 
 public void run() 
 System.out.println("延迟三秒");
 
 , 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(newRunnable() 
 @Override 
 public void run() 
 System.out.println("延迟 1 秒后每三秒执行一次");
 
 ,1,3,TimeUnit.SECONDS);

newSingleThreadExecutor:
Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程
池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!

线程池的组成

一般分为以下4个组成部分:

  1. 线程池管理器:用于创建并管理线程池
  2. 工作线程:线程池中的线程
  3. 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
  4. 任务队列:用于存放待处理的任务,提供一种缓冲机制
    Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。

    ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    

参数介绍:

  1. corePoolSize:指定了线程池中的线程数量。
  2. maximumPoolSize:指定了线程池中的最大线程数量。
  3. keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多
    次时间内会被销毁。
  4. unit:keepAliveTime 的单位。
  5. workQueue:任务队列,被提交但尚未被执行的任务。
  6. threadFactory:线程工厂,用于创建线程,一般用默认的即可。
  7. handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。

拒绝策略:
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也
塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK 内置的拒绝策略如下:

  1. AbortPolicy : 直接抛出异常,阻止系统正常运行。

  2. CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的
    任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。

  3. DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再
    次提交当前任务。

  4. DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢
    失,这是最好的一种方案。
    以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际
    需要,完全可以自己扩展 RejectedExecutionHandler 接口。
    线程池工作过程:

  5. 线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面
    有任务,线程池也不会马上执行它们。

  6. 当调用 execute() 方法添加一个任务时,线程池会做如下判断:
    a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
    b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
    c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要
    创建非核心线程立刻运行这个任务;
    d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池
    会抛出异常 RejectExecutionException。

  7. 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  8. 当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运
    行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它
    最终会收缩到 corePoolSize 的大小。
    java阻塞队列原理
    阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况:

  9. 当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。

  10. 当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。

    阻塞队列常用方法:

代码

execute方法在将来某个时间执行任务

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        else if (!addWorker(command, false))
            reject(command);
    

purge()试图从工作队列移除所有已取消的future任务。

public void purge() 
        final BlockingQueue<Runnable> q = workQueue;
        try 
            Iterator<Runnable> it = q.iterator();
            while (it.hasNext()) 
                Runnable r = it.next();
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    it.remove();
            
         catch (ConcurrentModificationException fallThrough) 
            // Take slow path if we encounter interference during traversal.
            // Make copy for traversal and call remove for cancelled entries.
            // The slow path is more likely to be O(N*N).
            for (Object r : q.toArray())
                if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
                    q.remove(r);
        

        tryTerminate(); // In case SHUTDOWN and now empty
    

getPoolSize()返回线程池中当前的线程

public int getPoolSize() 
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try 
            // Remove rare and surprising possibility of
            // isTerminated() && getPoolSize() > 0
            return runStateAtLeast(ctl.get(), TIDYING) ? 0
                : workers.size();
         finally 
            mainLock.unlock();
        
    

自定义线程池功能简单

package cn.thread.线程;

import java.util.LinkedList;

public class ThreePool extends ThreadGroup 
    private boolean isClose = false;
    private LinkedList taskQueue;
    private static int threadPool_ID = 1;
    //线程池的构造方法
    public ThreePool(int poolSize) 
        super(threadPool_ID+"");//规定线程池的名称
        setDaemon(true);//守护线程
        taskQueue = new LinkedList();//创建队列
        for (int i=0;i<poolSize;i++)//启动线程任务
            new TaskThread(i).start();
        
    
    //添加新任务,并且执行
    public synchronized void executeTask(Runnable task)
        if(isClose)
           throw  new IllegalArgumentException("");
        
        if(task!=null)
            taskQueue.add(task);//向任务中队列中添加一个任务
            notify();//唤醒
        
    
    private synchronized Runnable getTask(int id)  
        try
            while (taskQueue.size()==0)
                if(isClose)
                    return null;
                else
                    System.out.println("工作线程"+id+"在等待任务!");
                    wait();
                
            
        catch (Exception e)
            System.out.println("等待任务出现错误!");
        
        System.out.println("工作线程"+id+"开始执行工作");
        return  (Runnable) taskQueue.removeFirst();
    
    private class TaskThread extends Thread
        private int id;
        public TaskThread(int id)
            super(ThreePool.this, String.valueOf(id));
            this.id = id;
        

        @Override
        public void run() 
            //判断当前线程是否处于中断状态
            while(!isInterrupted())
                Runnable task =getTask(id); //通过id获取队列中的线程对象
               if(task==null)
                   return;
               
               try
                   task.run();
               catch (Exception e)
                   e.printStackTrace();
               
            
        
    
    //关闭线程池
    public synchronized  void closeThreadPool() throws InterruptedException 
        if(!isClose)//判断标识
            //等待线程的任务执行完毕
            waitTaskFinish();
            isClose = true;
            taskQueue.clear();
            interrupt();
        
    
    public void waitTaskFinish() throws InterruptedException 
        synchronized (this)
            isClose  = true;
            notifyAll();
        
        Thread[] threads = new Thread[activeCount()];//线程组中的活动数量相同
        int count = enumerate(threads);
        //循环等待线程结束
        for(int i=0;i<count;i++)
            threads[i].join();
        

    



以上是关于java线程池原理便懂了!♥♥的主要内容,如果未能解决你的问题,请参考以下文章

Java并发系列终结篇:学校门口保安小王,这次彻底搞懂了Java线程池的工作原理

Java线程池的实现原理,你清楚么?

Java并发编程——线程池

面试阿里,字节跳动99%会被问到的java线程和线程池,看完这篇你就懂了!

java生成随机字母组合,看完这一篇你就懂了

面试必问的线程池,你懂了吗?