只需这篇文章java线程池原理便懂了!♥♥
Posted 勇敢牛牛不怕困难@帅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了只需这篇文章java线程池原理便懂了!♥♥相关的知识,希望对你有一定的参考价值。
在了解线程池之前,我们先来谈谈线程的状态转换
一个线程到底有哪些状态呢?
1、新建状态(new):线程对象被创建后就进入了新建状态。例如 Thread t = new Thread();
2、就绪状态(Runnable):也可以称为可执行状态,线程对象被创建后,其他线程(主线程、守护线程、用户线程)调用了该对象的start()方法,从而启动该线程。如:t.start(); 处于就绪状态的线程随时可能被CPU调度执行。
3、运行状态(Running):线程获取CPU权限进行执行。需要注意的是,线程只能从就绪状态进入到运行状态。
4、阻塞状态(Blocked):阻塞状态是线程因为某种原因放弃CPU使用权限,暂时停止运行。直到线程进入就绪状态,才有机会进入运行状态。阻塞的三种情况:
1)等待阻塞:通过调用线程的wait()方法,让线程等待某工作的完成。
2)同步阻塞:线程在获取synchronized同步锁失败(因为锁被其他线程占用),它会进入同步阻塞状态。
3)其他阻塞:通过调用线程的sleep()或join()或发出了I/O请求时,线程会进入到阻塞状态。当sleep()状态超时、join()等待线程终止或超时、或者I/O处理完毕时,线程重新转入就绪状态。
5、死亡状态(Dead):线程执行完了或因异常退出了run()方法,该线程结束生命周期。
线程常用方法及种类
线程的种类:
系统级线程:
核心级线程 由操作系统内核进行管理,使用户程序可以创建,执行 撤销
用户级线程:
管理过程全部都是由用户程序完成,操作系统内核只对进程管理
多线程有哪些优势?
多线程使系统空转时间减少 ----提高CPU的利用率
进程间不能共享内存 但是线程之间共享内存非常容易
使用多线程实现多任务高并发 比多进程的效率高
线程方法:
Thread.currendThead() 获取当前线程的引用
getName() 获取线程的名字
setName() 设置线程的名字
start() 启动线程
run() 存放线程体的代码
setDaemon() 将线程设置为守护线程
线程的优先级:
查看优先级的方法 getPriority()
每个线程都具有一定的优先级 当调度线程时 会优先考虑级别比较高的线程
默认情况下:
一个线程继承父类的线程的优先级,可以使用线程中的setPriority(int newPriority) 设置线程的优先级别。
优先级主要影响的是 CPU在线程间在切换状态下
原则:
当一个线程 通过显示放弃 睡眠 或者阻塞 自愿释放控制权时 所有的线程都要接收检查,然后优先级高的会优先执行。
一个线程可以被一个高优先级的线程抢占资源。 同级别的线程之间,通过控制权的释放确保所有的线程均有机会执行。
线程池实现原理和线程池概念
线程池做的工作主要是控制运行的线程的数量,处理过程中将任务放入队列,然后在线程创建后启动这些任务,如果线程数量超过了最大数量超出数量的线程排队等候,等其它线程执行完毕,再从队列中取出任务来执行。他的主要特点为:线程复用;控制最大并发数;管理线程。在java的API中实现线程池的类超类是java.util.concurrent 包下的Executor接口。
执行已提交的 Runnable 任务的对象。此接口提供一种将任务提交与每个任务将如何运行的机制(包括线程使用的细节、调度等)分离开来的方法。通常使用 Executor 而不是显式地创建线程。例如,可能会使用以下方法,而不是为一组任务中的每个任务调用 new Thread(new(RunnableTask())).start():
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
不过,Executor 接口并没有严格地要求执行是异步的。在最简单的情况下,执行程序可以在调用方的线程中立即运行已提交的任务:
class DirectExecutor implements Executor {
public void execute(Runnable r) {
r.run();
}
}更常见的是,任务是在某个不是调用方线程的线程中执行的。以下执行程序将为每个任务生成一个新线程。
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread®.start();
}
}
许多 Executor 实现都对调度任务的方式和时间强加了某种限制。以下执行程序使任务提交与第二个执行程序保持连续,这说明了一个复合执行程序。
class SerialExecutor implements Executor {
final Queue<Runnable> tasks = new LinkedBlockingQueue<Runnable>();
final Executor executor;
Runnable active;
SerialExecutor(Executor executor) {
this.executor = executor;
}
public synchronized void execute(final Runnable r) {
tasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (active == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((active = tasks.poll()) != null) {
executor.execute(active);
}
}
}
此包中提供的 Executor 实现实现了 ExecutorService,这是一个使用更广泛的接口。ThreadPoolExecutor 类提供一个可扩展的线程池实现。Executors 类为这些 Executor 提供了便捷的工厂方法。
四种线程池
Java 里面线程池的顶级接口是 Executor,但是严格意义上讲 Executor 并不是一个线程池,而
只是一个执行线程的工具。真正的线程池接口是 ExecutorService。
newCachedThreadPool:
创建一个可根据需要创建新线程的线程池,但是在以前构造的线程可用时将重用它们。对于执行
很多短期异步任务的程序而言,这些线程池通常可提高程序性能。调用 execute 将重用以前构造
的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并
从缓存中移除那些已有 60 秒钟未被使用的线程。因此,长时间保持空闲的线程池不会使用任何资
源。
newFixedThreadPool:
创建一个可重用固定线程数的线程池,以共享的无界队列方式来运行这些线程。在任意点,在大
多数 nThreads 线程会处于处理任务的活动状态。如果在所有线程处于活动状态时提交附加任务,
则在有可用线程之前,附加任务将在队列中等待。如果在关闭前的执行期间由于失败而导致任何
线程终止,那么一个新线程将代替它执行后续的任务(如果需要)。在某个线程被显式地关闭之
前,池中的线程将一直存在。
newScheduledThreadPool:
创建一个线程池,它可安排在给定延迟后运行命令或者定期地执行。
ScheduledExecutorService scheduledThreadPool= Executors.newScheduledThreadPool(3);
scheduledThreadPool.schedule(newRunnable(){
@Override
public void run() {
System.out.println("延迟三秒");
}
}, 3, TimeUnit.SECONDS);
scheduledThreadPool.scheduleAtFixedRate(newRunnable(){
@Override
public void run() {
System.out.println("延迟 1 秒后每三秒执行一次");
}
},1,3,TimeUnit.SECONDS);
newSingleThreadExecutor:
Executors.newSingleThreadExecutor()返回一个线程池(这个线程池只有一个线程),这个线程
池可以在线程死后(或发生异常时)重新启动一个线程来替代原来的线程继续执行下去!
线程池的组成
一般分为以下4个组成部分:
- 线程池管理器:用于创建并管理线程池
- 工作线程:线程池中的线程
- 任务接口:每个任务必须实现的接口,用于工作线程调度其运行
- 任务队列:用于存放待处理的任务,提供一种缓冲机制
Java 中的线程池是通过 Executor 框架实现的,该框架中用到了 Executor,Executors,ExecutorService,ThreadPoolExecutor ,Callable 和 Future、FutureTask 这几个类。
ThreadPoolExecutor的构造方法:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
参数介绍:
- corePoolSize:指定了线程池中的线程数量。
- maximumPoolSize:指定了线程池中的最大线程数量。
- keepAliveTime:当前线程池数量超过 corePoolSize 时,多余的空闲线程的存活时间,即多
次时间内会被销毁。 - unit:keepAliveTime 的单位。
- workQueue:任务队列,被提交但尚未被执行的任务。
- threadFactory:线程工厂,用于创建线程,一般用默认的即可。
- handler:拒绝策略,当任务太多来不及处理,如何拒绝任务。
拒绝策略:
线程池中的线程已经用完了,无法继续为新任务服务,同时,等待队列也已经排满了,再也
塞不下新任务了。这时候我们就需要拒绝策略机制合理的处理这个问题。
JDK 内置的拒绝策略如下:
-
AbortPolicy : 直接抛出异常,阻止系统正常运行。
-
CallerRunsPolicy : 只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的
任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。 -
DiscardOldestPolicy : 丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再
次提交当前任务。 -
DiscardPolicy : 该策略默默地丢弃无法处理的任务,不予任何处理。如果允许任务丢
失,这是最好的一种方案。
以上内置拒绝策略均实现了 RejectedExecutionHandler 接口,若以上策略仍无法满足实际
需要,完全可以自己扩展 RejectedExecutionHandler 接口。
线程池工作过程: -
线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面
有任务,线程池也不会马上执行它们。 -
当调用 execute() 方法添加一个任务时,线程池会做如下判断:
a) 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
b) 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列;
c) 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要
创建非核心线程立刻运行这个任务;
d) 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池
会抛出异常 RejectExecutionException。 -
当一个线程完成任务时,它会从队列中取下一个任务来执行。
-
当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运
行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它
最终会收缩到 corePoolSize 的大小。
java阻塞队列原理
阻塞队列,关键字是阻塞,先理解阻塞的含义,在阻塞队列中,线程阻塞有这样的两种情况: -
当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。
-
当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
阻塞队列常用方法:
代码
execute方法在将来某个时间执行任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
purge()试图从工作队列移除所有已取消的future任务。
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
// Take slow path if we encounter interference during traversal.
// Make copy for traversal and call remove for cancelled entries.
// The slow path is more likely to be O(N*N).
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
getPoolSize()返回线程池中当前的线程
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Remove rare and surprising possibility of
// isTerminated() && getPoolSize() > 0
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
自定义线程池功能简单
package cn.thread.线程;
import java.util.LinkedList;
public class ThreePool extends ThreadGroup {
private boolean isClose = false;
private LinkedList taskQueue;
private static int threadPool_ID = 1;
//线程池的构造方法
public ThreePool(int poolSize) {
super(threadPool_ID+"");//规定线程池的名称
setDaemon(true);//守护线程
taskQueue = new LinkedList();//创建队列
for (int i=0;i<poolSize;i++){//启动线程任务
new TaskThread(i).start();
}
}
//添加新任务,并且执行
public synchronized void executeTask(Runnable task){
if(isClose){
throw new IllegalArgumentException("");
}
if(task!=null){
taskQueue.add(task);//向任务中队列中添加一个任务
notify();//唤醒
}
}
private synchronized Runnable getTask(int id) {
try{
while (taskQueue.size()==0){
if(isClose){
return null;
}else{
System.out.println("工作线程"+id+"在等待任务!");
wait();
}
}
}catch (Exception e){
System.out.println("等待任务出现错误!");
}
System.out.println("工作线程"+id+"开始执行工作");
return (Runnable) taskQueue.removeFirst();
}
private class TaskThread extends Thread{
private int id;
public TaskThread(int id){
super(ThreePool.this, String.valueOf(id));
this.id = id;
}
@Override
public void run() {
//判断当前线程是否处于中断状态
while(!isInterrupted()){
Runnable task =getTask(id); //通过id获取队列中的线程对象
if(task==null){
return;
}
try{
task.run();
}catch (Exception e){
e.printStackTrace();
}
}
}
}
//关闭线程池
public synchronized void closeThreadPool() throws InterruptedException {
if(!isClose){//判断标识
//等待线程的任务执行完毕
waitTaskFinish();
isClose = true;
taskQueue.clear();
interrupt();
}
}
public void waitTaskFinish() throws InterruptedException {
synchronized (this){
isClose = true;
notifyAll();
}
Thread[] threads = new Thread[activeCount()];//线程组中的活动数量相同
int count = enumerate(threads);
//循环等待线程结束
for(int i=0;i<count;i++){
threads[i].join();
}
}
}
以上是关于只需这篇文章java线程池原理便懂了!♥♥的主要内容,如果未能解决你的问题,请参考以下文章
Java并发系列终结篇:学校门口保安小王,这次彻底搞懂了Java线程池的工作原理