java 多线程相关
Posted 逝zxq
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 多线程相关相关的知识,希望对你有一定的参考价值。
注 :示例图片来源于网络
1. 基本概念
- 进程
进程是程序的一次执行过程,也就是说程序运行起来了,加载到了内存中,并占用了cpu的资源。这是一个动态的过程:有自身的产生、存在和消亡的过程,这也是进程的生命周期。
进程是系统资源分配的单位,系统在运行时会为每个进程分配不同的内存区域。
- 线程
线程可以看做一段顺序的指令序列,线程是CPU调度的基本单位,每个线程有自己独立的栈和程序计数器,从同一个进程创建出来的线程会共享进程的内存空间。
线程的切换开销比进程小的多,Java采用一个进程多个线程的方式来实现并发。
- 多线程
并发编程,即多个线程同一时间在CPU内执行指令流程,根据运行环境CPU的不同,可以分为单核CPU并发(时间片轮转)、多核CPU并行两种。
2. 如何创建一个线程(实际上是重写Runable接口的run()方法)
- 继承Thread类
public class MyThread extends Thread {
public MyThread(){}
public MyThread(ThreadGroup group, String name){
super(group,name);
}
@Override
public void run() {
System.out.println("This is Mythread, name is "+ getName() + ",ThreadGroup is " + getThreadGroup());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 实现Runnable接口
public class MyRunnable implements Runnable{
private static final AtomicInteger poolNumber = new AtomicInteger(1);
@Override
public void run(){
System.out.println("This is MyRunnable name is "+Thread.currentThread().getName());
System.out.println("poolNumber = " + poolNumber.getAndIncrement());
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 实现Callable接口
public class MyCallable implements Callable<String> {
@Override
public String call() {
System.out.println("This is MyCallable name is "+Thread.currentThread().getName());
return "MyCallable";
}
}
MyCallable callable = new MyCallable();
FutureTask<String> futureTask = new FutureTask<String>(callable);
new Thread(futureTask, "futureTask").start();
try {
System.out.println("拿到callable返回值:" + futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
- 三者区别
Runnable、Callable以接口形式实现,Thread以继承类形式实现,接口形式实现更适用于多个线程对共享资源的处理场景。
Java 只能继承一个父类,可以实现多个接口。
3. 线程的状态
Thread.State | Thread中定义的几种状态 |
---|---|
NEW | 尚未启动的线程状态,即线程创建,还未调用start方法 |
RUNNABLE | 就绪状态(调用start,等待调度)+正在运行 |
BLOCKED | 等待监视器锁时,陷入阻塞状态 |
WAITING | 等待状态的线程正在等待另一线程执行特定的操作(如notify) |
TIMED_WAITING | 具有指定等待时间的等待状态 |
TERMINATED | 线程完成执行,终止状态 |
状态转化图:
4. Thread类源码粗看
注:无论用何种方式创建线程,最后都是要new Thread()一个实例出来执行线程的,即Thread的start()方法
4.1 Thread的一些关键属性:
- name:线程名称,可以重复,若没有指定会自动生成。
- id:线程ID,一个正long值,创建线程时指定,终生不变,线程终结时ID可以复用。
- priority:线程优先级,取值为1到10,线程优先级越高,执行的可能越大,若运行环境不支持优先级分10级,如只支持5级,那么设置5和设置6有可能是一样的。
- state:线程状态,Thread.State枚举类型,有NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED 6种。
- daemon:布尔值,区分当前线程是用户线程还是守护线程,用户线程在主线程结束后还会继续执行,守护线程在没有用户线程执行时才会关闭。
- ThreadGroup:所属线程组,一个线程必然有所属线程组,线程组可以管理下面的线程优先级、状态和异常处理。
- UncaughtExceptionHandler:未捕获异常时的处理器,默认没有,线程出现错误后会立即终止当前线程运行,并打印错误。
4.2 Thread(...)构造函数(实际调用init()方法,构造方法不涉及到初始化逻辑,需要的话进行统一封装)
// g 线程组,在java中以树形式存在,根线程组为system,线程组可以用来管理该组下一些线程的中断、异常等处理
// target 线程执行体(run方法)
// name 线程名称
// stackSize 线程预期堆栈大小,默认0
// acc 权限控制上下文
// inheritThreadLocals 自父线程集成而来的ThreadLocalMap 主要用于父子线程间ThreadLocal变量的传递
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name;
// 如果有安全管理器,会从安全管理器获取线程组信息,没有的话获取父线程的线程组
Thread parent = currentThread();
SecurityManager security = System.getSecurityManager();
if (g == null) {
if (security != null) {
g = security.getThreadGroup();
}
if (g == null) {
g = parent.getThreadGroup();
}
}
g.checkAccess();
if (security != null) {
if (isCCLOverridden(getClass())) {
security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
}
}
g.addUnstarted();
this.group = g;
// 继承父线程的用户还是守护,和优先级
this.daemon = parent.isDaemon(); //用户线程和守护线程,守护线程为后台特殊线程,后台结束,守护线程也结束。
this.priority = parent.getPriority();
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize;
tid = nextThreadID();
}
5. 多线程间的同步与锁
5.1 原子性、可见性、有序性
- 原子性:即一个操作或者多个操作,要么全部执行,要么全部不执行,具有原子性的变量,同一时间只能有一个线程对它进行操作。
- 可见性:指的是当多个线程访问同一个变量的时候,一个线程修改了变量的值,其他线程能看到这个线程的修改。
- 有序性:指的是程序执行的指令按代码顺序执行,java 中编译器和处理器都会对指令进行重排序,单线程不影响结果,多线程会有影响。
5.2 同步与锁
同步:多线程访问共享数据的时候,保证同一时刻共享数据只能被一个线程使用。
锁:线程在操作一个共享代码块的时候,加锁lock(),当线程操作完之后,在释放锁unlock(),锁使得拥有同步块的线程执行时互斥。
5.3 synchronized
- synchronized定义
java 语言的关键字,当它用来修饰一个方法或代码块的时候,能保证该代码块的同步执行。
- synchronized作用域
synchronized的作用域分为两种,一种是当前对象synchronized(this),一种是当前类synchronized(Object.class)。
synchronized(this):只作用于当前锁的this对象
synchronized(Object.class):作用于当前类下的所有实例对象。
- synchronized的注意点
当一个线程访问synchronized(this)修饰的方法时,该线程获取this对象的锁,其他线程不能访问this的synchronized代码块,但其他线程可以访问非synchronized修饰的代码块。
synchronized作用于类和类的实例这两种锁,对synchronized代码块的调用是互不干扰的。
- synchronized的原理
示例方法:
public void setName(String name) throws InterruptedException {
synchronized (this){
this.name = name;
Thread.sleep(2000);
}
}
反编译后:
synchronized的语义底层是通过一个monitor的监视器对象来完成。
经过反编译后可以发现synchronized修饰后的代码块前后加上了monitorenter和monitorexit这两条指令。
monitorenter:
每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:
1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。
2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.
3、如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权
monitorexit:
指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。
从这里我们可以看到synchronized锁是排他的,是可重入的。
5.4 volatile
- JMM模型 线程和主内存之间的抽象关系
共享变量存储在主内存中,每个线程有一个私有的内存空间保存着共享变量的副本,线程对共享变量操作是直接对本地内存进行操作,这会造成共享变量的不一致性。
- volatile 保证共享变量的可见性
volatile修饰的共享变量,当一个线程对这个变量进行修改操作后,jvm会把线程本地内存中变量的新值刷新到主内存中,持有这个变量的其他线程,在私有内存中变量会失效,从主内存重新获取。
- volatile 保证操作的有序性
jvm会在操作volatile变量的指令前后插入内存屏障,保证这个操作指令前后不会发生指令的重排序。
- volatile 原理
在操作修饰变量指令的前后插入内存屏障。
注:volatile修饰的变量不保证操作的原子性。
5.5 synchronized+volatile实现懒汉式单例的双重检查
public class SingletonThread2 {
// 禁止指令重排序
private volatile static SingletonThread2 instance;
public static SingletonThread2 getInstance() {
if (instance == null) {
// 加同步锁
synchronized (SingletonThread2.class){
if(instance == null){
//防止多个线程在外边等待进入
instance = new SingletonThread2();
// 指令重排序可能会变成 1 -> 3 ->2
// 1、memory = allocate(); //分配对象的内存空间
// 2、ctorInstance(memory); //初始化对象,赋初值
// 3、instance = memory; //设置instance指向刚分配的内存地址
}
}
}
return instance;
}
}
6. 线程池 ThreadPoolExecutor
- 线程池概念
java池化管理线程资源的实现,线程池通过一些核心线程(空闲时不会被销毁),可以减少线程创建、销毁的资源损耗。
通过线程池还可以明确线程资源的分配,并且提高任务的响应速度(不必重新创建线程)。
- ThreadPoolExecutor的构造函数
// corePoolSize 核心线程数,即空闲时保留的线程数
// maximumPoolSize 最大线程数,代表该线程池能同时执行任务的最大线程数
// keepAliveTime 线程空闲时间,一般超过corePoolSize数的线程最久能保留的时间
// unit 时间单位
// workQueue 线程的等待队列
// threadFactory 线程的制造工厂
// handler 拒绝策略,即线程池最大线程数满及等待队列满情况下,新增任务的处理策略
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
-
线程池提交任务的流程图
-
线程池任务提交流程对应execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
7. Future
- Future的例子
Future实现了线程的异步调用,Java利用Callable接口定义一个有返回值的call(),FutureTask类来封装返回值并控制任务执行
public static void main(String[] args) throws ExecutionException, InterruptedException {
Long startTime = System.currentTimeMillis();
FutureTask<String> futureTask1 = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "future1";
}
});
new Thread(futureTask1).start();
FutureTask<String> futureTask2 = new FutureTask<>(new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(3000);
return "future2";
}
});
new Thread(futureTask2).start();
//等待future1和future2执行完 ,get()阻塞
System.out.println(futureTask1.get() + "执行完!");
System.out.println(futureTask2.get() + "执行完!");
Long endTime = System.currentTimeMillis();
// 5004 可以看出future1和future2是异步执行的
System.out.println("共用时: " + (endTime - startTime));
}
- FutureTask继承图
- Future接口
1、boolean cancel(boolean mayInterruptIfRunning)
尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。
如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true
2、boolean isCancelled()
如果任务在正常结束之前被被取消返回true
3、boolean isDone()
正常结束、异常或者被取消导致任务完成,将返回true
4、V get()
等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。
5、V get(long timeout, TimeUnit unit)
任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。
- FutureTask:实现了RunnableFuture接口(即实现了Future接口和Runnable接口)
注:FutureTask的finishCompletion方法作用 -> 调用finishCompletion唤醒所有等待结果的线程
线程的执行逻辑及赋值 FutureTask.run()方法
public void run() {
// java不能直接访问操作系统底层,而是通过本地方法来访问,Unsafe类提供了硬件级别的原子操作。
// UNSAFE.compareAndSwapObject在obj的offset位置比较object field和期望的值,如果相同则更新。field值修改会返回true
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran; // 判断任务是否执行成功
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 设置异常
}
if (ran)
set(result); // 设置任务执行结果
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
利用FutureTask获取线程的返回值 FutureTask.get()方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 等待任务完成
s = awaitDone(false, 0L);
return report(s); // 对返回值的封装
}
FutureTask的awaitDone(): 等待任务完成,将等待线程放进waiters
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 判断线程是否已经中断
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 线程已完成
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
//如果其他线程也来执行get方法将waitNode的next指向新的
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//唤醒park池中等待结果的线程
LockSupport.parkNanos(this, nanos);
}
else
//唤醒park池中等待结果的线程
LockSupport.park(this);
}
}
8. CompletableFuture
- 定义
java8新增对Future的补充,CompletableFuture支持流式计算、函数式编程等新特性,通过CompletableFuture,我们可以实现非阻塞的Future结果调用。
CompletableFuture实现了Future和CompletionStage两个接口,其中CompletionStage抽象了一些异步编程的补充方法。
- 使用(可以分为以下几种类别)
创建一个CompletableFuture:
supplyAsync(有返回参数)/runAsync(无返回参数)
手动改变CompletableFuture的返回值或异常
completeExceptionally(throwable) 异常
completedFuture(U value) 返回值
创建一个回调函数: 将异步的结果同步或异步的处理
thenApply/thenApplyAsync 接收上一个任务返回值做参数,有返回值
thenAccept/thenAcceptAsync 接收上一个任务返回值做参数,但没有返回值
thenRun/thenRunAsync 没有入参,也没有返回值
异常处理方法:
whenComplete 当某个任务执行完成后,传入执行结果(无论是否有异常)的回调方法,参数为(result,throwable)
exceptionally 出现异常,会调用这个回调方法,参数为 (throwable)
handle 异常捕捉方法,无论发生异常都会执行,参数为(result,throwable)
组合两个CompletableFuture:
两个有依赖关系的CompletableFuture,第一个执行后才执行第二个 thenCompose
两个任务组合,两个任务正常执行后执行thenCombine(有传参、有返回值) / thenAcceptBoth (有传参、无返回值) / runAfterBoth(无传参、无返回值)
两个任务组合,其中一个执行完成后执行applyToEither(有传参、有返回值) / acceptEither(有传参、无返回值) / runAfterEither(无传参、无返回值)
组合多个CompletableFuture:
allOf 等待一系列的future任务执行完成,全部正常执行完成返回null
anyOf 多个future任务有一个执行完成就返回
- 原理
以上是关于java 多线程相关的主要内容,如果未能解决你的问题,请参考以下文章