java 多线程相关

Posted 逝zxq

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 多线程相关相关的知识,希望对你有一定的参考价值。

注 :示例图片来源于网络

1. 基本概念

  • 进程

进程是程序的一次执行过程,也就是说程序运行起来了,加载到了内存中,并占用了cpu的资源。这是一个动态的过程:有自身的产生、存在和消亡的过程,这也是进程的生命周期。

进程是系统资源分配的单位,系统在运行时会为每个进程分配不同的内存区域。

  • 线程

线程可以看做一段顺序的指令序列,线程是CPU调度的基本单位,每个线程有自己独立的栈和程序计数器,从同一个进程创建出来的线程会共享进程的内存空间。

线程的切换开销比进程小的多,Java采用一个进程多个线程的方式来实现并发。

  • 多线程

并发编程,即多个线程同一时间在CPU内执行指令流程,根据运行环境CPU的不同,可以分为单核CPU并发(时间片轮转)、多核CPU并行两种。

2. 如何创建一个线程(实际上是重写Runable接口的run()方法)

  • 继承Thread类
public class MyThread extends Thread {

    public MyThread(){}

    public MyThread(ThreadGroup group, String name){
        super(group,name);
    }

    @Override
    public void run() {
        System.out.println("This is Mythread, name is "+ getName() + ",ThreadGroup is " + getThreadGroup());

        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}
  • 实现Runnable接口
public class MyRunnable implements Runnable{

    private static final AtomicInteger poolNumber = new AtomicInteger(1);

    @Override
    public void run(){
        System.out.println("This is MyRunnable name is "+Thread.currentThread().getName());

        System.out.println("poolNumber = " + poolNumber.getAndIncrement());

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

  • 实现Callable接口
public class MyCallable implements Callable<String> {

    @Override
    public String call() {
        System.out.println("This is MyCallable name is "+Thread.currentThread().getName());

        return "MyCallable";
    }
}

        MyCallable callable = new MyCallable();
        FutureTask<String> futureTask = new FutureTask<String>(callable);
        new Thread(futureTask, "futureTask").start();

        try {
            System.out.println("拿到callable返回值:" + futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
  • 三者区别

Runnable、Callable以接口形式实现,Thread以继承类形式实现,接口形式实现更适用于多个线程对共享资源的处理场景。

Java 只能继承一个父类,可以实现多个接口。

3. 线程的状态

Thread.State Thread中定义的几种状态
NEW 尚未启动的线程状态,即线程创建,还未调用start方法
RUNNABLE 就绪状态(调用start,等待调度)+正在运行
BLOCKED 等待监视器锁时,陷入阻塞状态
WAITING 等待状态的线程正在等待另一线程执行特定的操作(如notify)
TIMED_WAITING 具有指定等待时间的等待状态
TERMINATED 线程完成执行,终止状态

状态转化图:

4. Thread类源码粗看

注:无论用何种方式创建线程,最后都是要new Thread()一个实例出来执行线程的,即Thread的start()方法

4.1 Thread的一些关键属性:

  • name:线程名称,可以重复,若没有指定会自动生成。
  • id:线程ID,一个正long值,创建线程时指定,终生不变,线程终结时ID可以复用。
  • priority:线程优先级,取值为1到10,线程优先级越高,执行的可能越大,若运行环境不支持优先级分10级,如只支持5级,那么设置5和设置6有可能是一样的。
  • state:线程状态,Thread.State枚举类型,有NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING、TERMINATED 6种。
  • daemon:布尔值,区分当前线程是用户线程还是守护线程,用户线程在主线程结束后还会继续执行,守护线程在没有用户线程执行时才会关闭。
  • ThreadGroup:所属线程组,一个线程必然有所属线程组,线程组可以管理下面的线程优先级、状态和异常处理。
  • UncaughtExceptionHandler:未捕获异常时的处理器,默认没有,线程出现错误后会立即终止当前线程运行,并打印错误。

4.2 Thread(...)构造函数(实际调用init()方法,构造方法不涉及到初始化逻辑,需要的话进行统一封装)

// g 线程组,在java中以树形式存在,根线程组为system,线程组可以用来管理该组下一些线程的中断、异常等处理
// target 线程执行体(run方法)
// name 线程名称
// stackSize 线程预期堆栈大小,默认0
// acc 权限控制上下文
// inheritThreadLocals 自父线程集成而来的ThreadLocalMap 主要用于父子线程间ThreadLocal变量的传递
private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc,
                      boolean inheritThreadLocals) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }

        this.name = name;

        // 如果有安全管理器,会从安全管理器获取线程组信息,没有的话获取父线程的线程组
        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            if (security != null) {
                g = security.getThreadGroup();
            }

            if (g == null) {
                g = parent.getThreadGroup();
            }
        }
        g.checkAccess();

        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }

        g.addUnstarted();

        this.group = g;
        // 继承父线程的用户还是守护,和优先级
        this.daemon = parent.isDaemon(); //用户线程和守护线程,守护线程为后台特殊线程,后台结束,守护线程也结束。
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        this.stackSize = stackSize;

        tid = nextThreadID();
    }

5. 多线程间的同步与锁

5.1 原子性、可见性、有序性

  • 原子性:即一个操作或者多个操作,要么全部执行,要么全部不执行,具有原子性的变量,同一时间只能有一个线程对它进行操作。
  • 可见性:指的是当多个线程访问同一个变量的时候,一个线程修改了变量的值,其他线程能看到这个线程的修改。
  • 有序性:指的是程序执行的指令按代码顺序执行,java 中编译器和处理器都会对指令进行重排序,单线程不影响结果,多线程会有影响。

5.2 同步与锁

同步:多线程访问共享数据的时候,保证同一时刻共享数据只能被一个线程使用。

锁:线程在操作一个共享代码块的时候,加锁lock(),当线程操作完之后,在释放锁unlock(),锁使得拥有同步块的线程执行时互斥。

5.3 synchronized

  • synchronized定义

java 语言的关键字,当它用来修饰一个方法或代码块的时候,能保证该代码块的同步执行。

  • synchronized作用域

synchronized的作用域分为两种,一种是当前对象synchronized(this),一种是当前类synchronized(Object.class)。

synchronized(this):只作用于当前锁的this对象
synchronized(Object.class):作用于当前类下的所有实例对象。

  • synchronized的注意点

当一个线程访问synchronized(this)修饰的方法时,该线程获取this对象的锁,其他线程不能访问this的synchronized代码块,但其他线程可以访问非synchronized修饰的代码块。
synchronized作用于类和类的实例这两种锁,对synchronized代码块的调用是互不干扰的。

  • synchronized的原理

示例方法:

    public void setName(String name) throws InterruptedException {
        synchronized (this){
            this.name = name;

            Thread.sleep(2000);
        }
    }

反编译后:

synchronized的语义底层是通过一个monitor的监视器对象来完成。

经过反编译后可以发现synchronized修饰后的代码块前后加上了monitorenter和monitorexit这两条指令。

monitorenter:

每个对象有一个监视器锁(monitor)。当monitor被占用时就会处于锁定状态,线程执行monitorenter指令时尝试获取monitor的所有权,过程如下:

1、如果monitor的进入数为0,则该线程进入monitor,然后将进入数设置为1,该线程即为monitor的所有者。

2、如果线程已经占有该monitor,只是重新进入,则进入monitor的进入数加1.

3、如果其他线程已经占用了monitor,则该线程进入阻塞状态,直到monitor的进入数为0,再重新尝试获取monitor的所有权

monitorexit:

指令执行时,monitor的进入数减1,如果减1后进入数为0,那线程退出monitor,不再是这个monitor的所有者。其他被这个monitor阻塞的线程可以尝试去获取这个 monitor 的所有权。

从这里我们可以看到synchronized锁是排他的,是可重入的。

5.4 volatile

  • JMM模型 线程和主内存之间的抽象关系

共享变量存储在主内存中,每个线程有一个私有的内存空间保存着共享变量的副本,线程对共享变量操作是直接对本地内存进行操作,这会造成共享变量的不一致性。

  • volatile 保证共享变量的可见性

volatile修饰的共享变量,当一个线程对这个变量进行修改操作后,jvm会把线程本地内存中变量的新值刷新到主内存中,持有这个变量的其他线程,在私有内存中变量会失效,从主内存重新获取。

  • volatile 保证操作的有序性

jvm会在操作volatile变量的指令前后插入内存屏障,保证这个操作指令前后不会发生指令的重排序。

  • volatile 原理

在操作修饰变量指令的前后插入内存屏障。

注:volatile修饰的变量不保证操作的原子性。

5.5 synchronized+volatile实现懒汉式单例的双重检查

public class SingletonThread2 {

    // 禁止指令重排序
    private volatile static SingletonThread2 instance;

    public static SingletonThread2 getInstance() {
        if (instance == null) {

            // 加同步锁
            synchronized (SingletonThread2.class){
                if(instance == null){
                    //防止多个线程在外边等待进入
                    instance = new SingletonThread2();

                    // 指令重排序可能会变成 1 -> 3 ->2
//                    1、memory = allocate(); //分配对象的内存空间
//                    2、ctorInstance(memory); //初始化对象,赋初值
//                    3、instance = memory; //设置instance指向刚分配的内存地址
                }
            }
        }
        return instance;
    }
}

6. 线程池 ThreadPoolExecutor

  • 线程池概念

java池化管理线程资源的实现,线程池通过一些核心线程(空闲时不会被销毁),可以减少线程创建、销毁的资源损耗。

通过线程池还可以明确线程资源的分配,并且提高任务的响应速度(不必重新创建线程)。

  • ThreadPoolExecutor的构造函数
// corePoolSize 核心线程数,即空闲时保留的线程数
// maximumPoolSize 最大线程数,代表该线程池能同时执行任务的最大线程数
// keepAliveTime 线程空闲时间,一般超过corePoolSize数的线程最久能保留的时间
// unit 时间单位
// workQueue 线程的等待队列
// threadFactory 线程的制造工厂
// handler 拒绝策略,即线程池最大线程数满及等待队列满情况下,新增任务的处理策略
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
  • 线程池提交任务的流程图

  • 线程池任务提交流程对应execute()

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

7. Future

  • Future的例子

Future实现了线程的异步调用,Java利用Callable接口定义一个有返回值的call(),FutureTask类来封装返回值并控制任务执行

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Long startTime = System.currentTimeMillis();

        FutureTask<String> futureTask1 = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(5000);
                return "future1";
            }
        });

        new Thread(futureTask1).start();

        FutureTask<String> futureTask2 = new FutureTask<>(new Callable<String>() {
            @Override
            public String call() throws Exception {
                Thread.sleep(3000);
                return "future2";
            }
        });

        new Thread(futureTask2).start();

        //等待future1和future2执行完 ,get()阻塞
        System.out.println(futureTask1.get() + "执行完!");
        System.out.println(futureTask2.get() + "执行完!");

        Long endTime = System.currentTimeMillis();

        // 5004 可以看出future1和future2是异步执行的
        System.out.println("共用时: " + (endTime - startTime));

    }
  • FutureTask继承图

  • Future接口

1、boolean cancel(boolean mayInterruptIfRunning)
尝试取消当前任务的执行。如果任务已经取消、已经完成或者其他原因不能取消,尝试将失败。如果任务还没有启动就调用了cancel(true),任务将永远不会被执行。如果任务已经启动,参数mayInterruptIfRunning将决定任务是否应该中断执行该任务的线程,以尝试中断该任务。
如果任务不能被取消,通常是因为它已经正常完成,此时返回false,否则返回true

2、boolean isCancelled()
如果任务在正常结束之前被被取消返回true

3、boolean isDone()
正常结束、异常或者被取消导致任务完成,将返回true

4、V get()
等待任务结束,然后获取结果,如果任务在等待过程中被终端将抛出InterruptedException,如果任务被取消将抛出CancellationException,如果任务中执行过程中发生异常将抛出ExecutionException。

5、V get(long timeout, TimeUnit unit)
任务最多在给定时间内完成并返回结果,如果没有在给定时间内完成任务将抛出TimeoutException。

  • FutureTask:实现了RunnableFuture接口(即实现了Future接口和Runnable接口)

注:FutureTask的finishCompletion方法作用 -> 调用finishCompletion唤醒所有等待结果的线程

线程的执行逻辑及赋值 FutureTask.run()方法

    public void run() {
        // java不能直接访问操作系统底层,而是通过本地方法来访问,Unsafe类提供了硬件级别的原子操作。
        // UNSAFE.compareAndSwapObject在obj的offset位置比较object field和期望的值,如果相同则更新。field值修改会返回true
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran; // 判断任务是否执行成功
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex); // 设置异常
                }
                if (ran)
                    set(result); // 设置任务执行结果
            }
        } finally {
            runner = null;
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

利用FutureTask获取线程的返回值 FutureTask.get()方法

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            // 等待任务完成
            s = awaitDone(false, 0L);
        return report(s); // 对返回值的封装
    }

FutureTask的awaitDone(): 等待任务完成,将等待线程放进waiters

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            // 判断线程是否已经中断
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            // 线程已完成
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                //如果其他线程也来执行get方法将waitNode的next指向新的
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //唤醒park池中等待结果的线程
                LockSupport.parkNanos(this, nanos);
            }
            else
                //唤醒park池中等待结果的线程
                LockSupport.park(this);
        }
    }

8. CompletableFuture

  • 定义

java8新增对Future的补充,CompletableFuture支持流式计算、函数式编程等新特性,通过CompletableFuture,我们可以实现非阻塞的Future结果调用。

CompletableFuture实现了Future和CompletionStage两个接口,其中CompletionStage抽象了一些异步编程的补充方法。

  • 使用(可以分为以下几种类别)

       创建一个CompletableFuture:
       supplyAsync(有返回参数)/runAsync(无返回参数)
    
       手动改变CompletableFuture的返回值或异常
       completeExceptionally(throwable) 异常
       completedFuture(U value) 返回值
       
       创建一个回调函数: 将异步的结果同步或异步的处理
       thenApply/thenApplyAsync 接收上一个任务返回值做参数,有返回值
       thenAccept/thenAcceptAsync 接收上一个任务返回值做参数,但没有返回值
       thenRun/thenRunAsync 没有入参,也没有返回值
       
       异常处理方法:
       whenComplete 当某个任务执行完成后,传入执行结果(无论是否有异常)的回调方法,参数为(result,throwable)
       exceptionally 出现异常,会调用这个回调方法,参数为 (throwable)
       handle 异常捕捉方法,无论发生异常都会执行,参数为(result,throwable)
       
       组合两个CompletableFuture:
       两个有依赖关系的CompletableFuture,第一个执行后才执行第二个 thenCompose
       两个任务组合,两个任务正常执行后执行thenCombine(有传参、有返回值) / thenAcceptBoth (有传参、无返回值) / runAfterBoth(无传参、无返回值)
       两个任务组合,其中一个执行完成后执行applyToEither(有传参、有返回值) / acceptEither(有传参、无返回值) / runAfterEither(无传参、无返回值)
       
       组合多个CompletableFuture:
       allOf 等待一系列的future任务执行完成,全部正常执行完成返回null
       anyOf 多个future任务有一个执行完成就返回
       
  • 原理

以上是关于java 多线程相关的主要内容,如果未能解决你的问题,请参考以下文章

什么是Java多线程编程?

Java多线程——Lock&Condition

Java多线程与并发库高级应用-工具类介绍

Java多线程与并发库高级应用-工具类介绍

多线程 Thread 线程同步 synchronized

java网络多线程专题