Java 多线程学习扩展

Posted betterLearing

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java 多线程学习扩展相关的知识,希望对你有一定的参考价值。

一、Java内存模型与线程

由于计算机的存储设配与处理器的运算速度有几个数量级的差距,所以加入高速缓存来作为内存与处理器之间的缓冲。

Java虚拟机规范定义一种Java内存模型来屏蔽各种硬件和操作系统的内存访问差异。

 1.1 Java Memery Model即JMM

① JMM规定所有变量规定了所有的变量都存在主内存中。

② 每条线程还有自己的工作内存,线程的工作内存中保存了被该线程使用到的变量的主内存副本拷贝,线程对变量的所有操作都必须在工作内存中进行,不能直接读写主内存的变量。

③ 不同线程无法访问对方的工作内存,线程间变量值的传递需要通过主内存来实现。

 

1.2 对volatile 型变量的特殊规则

  关键字volatile是Java虚拟机提供的最轻量级的同步机制。

  ①  可见性

    当一条线程修改了这个变量的值,新值对于其他线程来说可以立刻得知。

    使用场景:主线程中,某个用volatile修饰的变量-如是否配置信息加载完成,定义成boolean,初始值是false,线程A负责加载配置信息,加载完后,它会把false改成true,而线程B采用while循环,一但加载完配置信息的标记从false,改成true,线程B就立马可以感知,线程B就可以进行相应的初始化工作。

  ② 非原子性

    在多个线程同时拷贝主内存的变量到自己的工作内存中,对副本进行修改,之后各自向主内存写入操作的时候,出现写覆盖,写丢失。

  ③ 禁止指令重排

    (Double Check Lock)双端检锁-单例模式

public class Singleton {
    /**
     *  volatile:禁止指令重排
     *  
     *  new Singleton():
     *  指令①:分配内存地址
     *  指令②:初始化
     *  指令③ :指针指向初始化后的变量
     */
    private volatile static Singleton instance;
    
    private Singleton() {
        System.out.println("no args construtor...");
    }
    
    /**
     * double check lock
     */
    public static Singleton getInstance() {
        if (instance == null) {
            synchronized (Singleton.class) {
                if (instance == null) {
                    instance = new Singleton();
                }
            }
        }
        return instance;
    }
}

上述代码中,如果instance没有用volatile修饰,那么在高并发的情况下,有可能出现指令③和指令②重排,对于其他线程来说,由于被synchronized锁住代码块,但是当前拿到锁的线程已经有了指针指向变量(实际上还没有初始化),其他线程直接用这个引用操作的时候,就会发生异常。因为这个指针的引用指向的对象还没有完成初始化。

1.3 Java Memery Model 规范

  ① 可见性:当一个线程修改了共享变量的值,其他线程能够立刻得知这个变量的修改。

  volatile的特殊规则保证了新值能立即同步到主内存,以及每次使用前立即从主内存刷新。

  ② 原子性:Java给用户开发了synchronized关键字(底层实现monitorenter 与monitorexit)

  ③ 有序性

    volatile与synchroniezed(一个变量在同一时刻只允许一条线程来对其进行lock操作)

    

1.4 状态转化

  一个线程在任意时间点,只能有且只有其中一种

  ① 先建New:创建后尚未启动的线程处于这种状态

  ② 运行Runable:可能正在执行,或等待CPU为他分配执行时间。

  ③ 无限期等待Waiting:处于这种状态的线程不会被分配CPU执行时间,它需要被其他线程唤醒。

  ④ 限期等待Timed Waiting:一段时间后,自动唤醒。

  ⑤ 阻塞Blocked:线程等待进入同步代码块时,进入这种状态。

  ⑥ 结束Terminated:已终止的线程状态。

1.5 CAS(比较并交换)-常用的原子基本类型如AtomicInteger

CAS(Compare-And-Swap),它是一条CPU并发原语,原语的执行必须是连续的,在执行过程中不允许被中断,不会造成所谓的数据不一致问题。

源码:

    /**
     * Atomically sets the value to the given updated value
     * if the current value {@code ==} the expected value.
     */
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    /**
     * Atomically increments by one the current value.
     */
    public final int getAndIncrement() {
        return unsafe.getAndAddInt(this, valueOffset, 1);
    }

 unsafe类:

  是CAS的核心类,由于Java方法无法直接访问底层系统,需要通过native方法来访问,Unsafe相当于一个后门,基于该类可以直接操作特定内存的数据。Unsafe类存在于sun.misc包中,其内部可以像C的指针一样直接操作内存,Java中CAS操作的执行依赖于Unsafe类的方法。

    /**
     *  Unsafe.class:getAndAddInt
     *  如果工作内存中预期的值与主内存中一致,那么就把更新后的值(var5 +var4)写入主内存中,否则继续循环(线程自旋)
     *  compareAndSwapInt 返回 boolean    
     */
    
    public final int getAndAddInt(Object var1,long var2,int var4) {
        int var5;
        do {
            var5 = this.getIntVolatile(var1,var2);
        }while (!this.compareAndSwapInt(var1,var2,var5,var5 + var4))
        return var5;
    }

CAS缺点:

  ① ABA问题--解决(时间戳原则引用)--AtomicStampReference<>()。

  ② 循环开销时间很大

  ③ 只能保证一个共享变量的原子操作 

1.6  锁

  ① 公平锁和非公平锁

    公平锁:指多个线程按照申请锁的顺序来获取锁,并发环境,每个线程在获取锁时先查看此锁维护的等待队列,如果为空,或者当前线程是等待队列的第一个,就占有,否则就会加入到等待队列中,以后按照FIFO从规则中获取自己。

    非公平锁:获取锁的顺序不是按照申请锁的顺序,可能后申请的线程比先申请的线程优先获得锁,在高并发的情况下,可能造成优先级反转或者饥饿现象。

  ② 可重入锁(又名递归锁):线程可以进入任何一个它已经拥有的锁所同步的代码块。 

  ③ 独占锁/共享锁

  ④ 自旋锁 

1.7 阻塞队列 BlockQueue

 

 

  传统1:

// 资源类
class ShareData {
    private int number = 0;

  public void increment() throws InterruptedException { synchronized (this){ // 判断 while (number !=0){ this.wait(); } // 干活 number++; System.out.println(Thread.currentThread().getName() + "\\t " +number); // 唤醒 this.notifyAll(); } } public void decrement() throws InterruptedException { synchronized (this){ // 判断 while (number ==0){ this.wait(); } // 干活 number--; System.out.println(Thread.currentThread().getName() + "\\t " +number); // 唤醒 this.notifyAll(); } } } /** * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮. * * 1 线程 操作 资源类 * 2 判断 干活 通知 * 3 防止虚假唤醒通知 */ public class Prod_Consumer_BlockQueueDemo { public static void main(String[] args) { } }

传统2:

// 资源类
class ShareData {
    private int number = 0;
    private Lock lock = new ReentrantLock();
    Condition condition = lock.newCondition();

    public void increment() throws InterruptedException {
        lock.lock();
        try {
            // 判断
            while (number != 0) {
                //等待,不能生产
                condition.await();
            }
            // 干活
            number++;
            System.out.println(Thread.currentThread().getName() + "\\t " + number);
            // 通知
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void decrement() throws InterruptedException {
        lock.lock();
        try {
            // 判断
            while (number == 0) {
                //等待,不能消费
                condition.await();
            }
            // 干活
            number--;
            System.out.println(Thread.currentThread().getName() + "\\t " + number);
            // 通知
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
/**
 * 场景:一个初始值为零的变量,两个线程对其交替操作,一个加1,一个减1,连续5轮.
 *
 * 1 线程  操作  资源类
 * 2 判断  干活  通知
 * 3 防止虚假唤醒通知
 */
public class Prod_Consumer_BlockQueueDemo {
    public static void main(String[] args) {
        ShareData shareData = new ShareData();

        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.increment();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "AA").start();


        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    shareData.decrement();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }, "BB").start();
    }
}

 1.8 synchronized 与Lock区别

  ①. 原始构成

  synchronized是关键字属于JVM层面:monitorenter、monitorexit(底层是通过monitor对象来完成,其实wait/notify等方法也是依赖于monitor对象只有在同步块或者方法中才能调用)

   Lock是具体类(java.util.concurrent.locks.Lock)是api层面的锁

  ② 使用方法

  synchronized不需要用户去手动释放锁,当synchronized代码执行完后系统会自动让线程释放对锁的占用。(即使执行发生异常也会释放锁,毕竟有两个monitorexit)

  ReetrantLock则需要用户手动释放锁,如果没有主动释放,就可能导致死锁的存在。lock.lock  try{...}....finally {lock.unlock}

  ③ 等待是否中断

  synchronized不可中断,除非抛出异常或者正常执行完毕。

  ReentrancLock可中断。a: 设置超时方法tryLock(long timeout,TimeUnit unit)。b: lockInterruptibly()放入代码块中,调用interrupt方法可以中断。

  ④ 加锁是否公平

  synchronized非公平锁。

  ReentrancLock两者都可以,默认非公平锁,构造方法传入boolean值,true为公平锁,false为非公平锁。

  ⑤ 锁绑定多个条件Condiion

  synchronized没有

  ReentrantLock用来实现分组唤醒需要的线程,可以精确唤醒,而不是像synchronized要么随机唤醒一个线程要么唤醒全部线程。

 

class shareResource{
    private int number =1;
    private Lock lock = new ReentrantLock();
    private Condition c1 = lock.newCondition();
    private Condition c2 = lock.newCondition();
    private Condition c3 = lock.newCondition();

    public void print1(){
        // 同步-判断-执行-通知
        lock.lock();
        try {
            while (number !=1){
                c1.await();
            }
            for (int i = 1; i <=1 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\\t " + i) ;
            }
            number =2;
            c2.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void print2(){
        // 同步-判断-执行-通知
        lock.lock();
        try {
            while (number !=2){
                c2.await();
            }
            for (int i = 1; i <=2 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\\t " + i) ;
            }
            number =3;
            c3.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
    public void print3(){
        // 同步-判断-执行-通知
        lock.lock();
        try {
            while (number !=3){
                c3.await();
            }
            for (int i = 1; i <=3 ; i++) {
                System.out.println(Thread.currentThread().getName() + "\\t " + i) ;
            }
            number =1;
            c1.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}
public class LockConditions {
    public static void main(String[] args) {
        shareResource shareResource = new shareResource();

        new Thread(()->{
            // 来三轮
            for (int i = 0; i < 3; i++) {
                shareResource.print1();
            }
        },"A").start();
        new Thread(()->{
            // 来三轮
            for (int i = 0; i < 3; i++) {
                shareResource.print2();
            }
        },"B").start();
        new Thread(()->{
            // 来三轮
            for (int i = 0; i < 3; i++) {
                shareResource.print3();
            }
        },"C").start();
    }
}

1.9 Callable接口

/**
 *
 * <p>The {@code Callable} interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * {@code Runnable}, however, does not return a result and cannot
 * throw a checked exception.
 */
@FunctionalInterface
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     */
    V call() throws Exception;
}

 

 

 

 

 

 

class MyThread implements Callable<Integer>{

    @Override
    public Integer call() throws Exception {
        System.out.println("come in Callbale...");
        try {
            System.out.println(Thread.currentThread().getName()+"*****");
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return 1024;
    }
}
public class CallableDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // public FutureTask(Callable<V> callable)
        FutureTask<Integer> futureTask = new FutureTask(new MyThread());
        new Thread(futureTask,"AA").start();
        // 多个线程抢futureTask只抢一次
        new Thread(futureTask,"BB").start();

        System.out.println(Thread.currentThread().getName() + "****");
        int result2 = 100;
        // 任务执行完需要时间
        while (!futureTask.isDone()){}

        //  futureTask.get() 尽量放在最后
        int result = result2 + futureTask.get();
        System.out.println("******result: " + futureTask.get());
    }
}

 

2.0 CountDownLatch/CyclicBarrier/Semaphore

CountDownLatch:让一些线程阻塞到另一些线程完成一系列操作后才被唤醒。

CountDownLatch主要有两个方法,当一个或多个线程调用await方法时,调用线程会被阻塞。其他线程调用countDown方法会将计数器减1(调用countDown方法的线程不会阻塞),当计数器的值变为零时,因调用await方法被阻塞的线程会被唤醒,开始继续执行。

public class CountDownLatchDemo {
    private static final Integer COUNT = 6;
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        for (int i = 1; i <= COUNT; i++) {
            new Thread(()-> {
                System.out.println(Thread.currentThread().getName() + " worker getOn" );
                countDownLatch.countDown();
            },WorksEnum.getWorkName(Integer.valueOf(i))).start();;
        }
        countDownLatch.await();
        System.out.println("Driver drive");
    }
}
import lombok.Getter;

public enum WorksEnum {
    ONE(1, "张三"), TWO(2, "李四"), THREE(3, "王五"), FOUR(4, "赵六"), FIVE(5, "孙七"), SIX(6, "刘八");
    
    @Getter
    private Integer workId;
    
    @Getter
    private String workName;
    
    private WorksEnum(Integer workId, String workName) {
        this.workId = workId;
        this.workName = workName;
    }
    
    public static WorksEnum getWorksEnum(Integer workId){
        WorksEnum[] works = WorksEnum.values();
        if(workId ==null){
            return null;
        }
        for (WorksEnum worksEnum : works) {
            if(workId == worksEnum.workId){
                return worksEnum;
            }
        }
        return null;
    }
    
    public static String getWorkName(Integer workId){
        WorksEnum[] works = WorksEnum.values();
        if(workId ==null){
            return null;
        }
        for (WorksEnum worksEnum : works) {
            if(workId == worksEnum.workId){
                return worksEnum.workName;
            }
        }
        return null;
    }
}

 CycliBarrier:可以循环使用的屏障。让一组线程到达一个屏障(或同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会打开,所有被屏障拦截的线程才会继续干活,线程进入屏障通过CyclicBarrier的await()方法。

public class CyclicBarrierDemo {
    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(6,()->{System.out.println("Boss 来开会");});
        for (int i = 1; i <= 6; i++) {
            final int workNo = i;
            new Thread(() -> {
                System.out.println("员工" + workNo + "来会议间...等着");
                try {
                    cyclicBarrier.await();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            },String.valueOf(i)).start();;
        }
    }
}

Semaphore:“信号量”主要用于两个目的,一是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。

public class SemaphoreDemo {
    public static void main(String[] args)  {
        Semaphore semaphore = new Semaphore(3);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + " :抢到车位" );
                    TimeUnit.SECONDS.sleep(3);
                    System.out.println(Thread.currentThread().getName() + " :停了3秒后释放");
                    semaphore.release();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }, String.valueOf(i)).start();;
        }
    }
}

 

2.1 线程池

  线程池主要工作:控制运行的线程数量,处理过程中将任务放入队列,然后再线程创建后启动这些任务,如果线程数量超过了最大核心的数量,进入线程队列等候,等其他线程执行完毕,再从队列中取出任务来执行。

  特点:线程复用,控制最大并发数,管理线程。

  第一:降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。

  第二:提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立刻被线程池中已有的线程执行。

  第三:提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配,调优和监控。

  

 

 

 

public class ExecutorsDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(5); // 一池5个处理线程
        ExecutorService executorService1 = Executors.newSingleThreadExecutor(); // 一池1个处理线程
        ExecutorService executorService2 = Executors.newCachedThreadPool(); // 一池N个处理线程

        try {
            for (int i = 0; i < 30; i++) {
                executorService2.execute(() ->{
                    System.out.println(Thread.currentThread().getName() +"\\t 办理业务");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            executorService.shutdown();
        }
    }
}
    /**
     *
     * @param corePoolSize the number of threads to keep in the pool, even
     *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
     * @param maximumPoolSize the maximum number of threads to allow in the
     *        pool
     * @param keepAliveTime when the number of threads is greater than
     *        the core, this is the maximum time that excess idle threads
     *        will wait for new tasks before terminating.
     * @param unit the time unit for the {@code keepAliveTime} argument
     * @param workQueue the queue to use for holding tasks before they are
     *        executed.  This queue will hold only the {@code Runnable}
     *        tasks submitted by the {@code execute} method.
     * @param threadFactory the factory to use when the executor
     *        creates a new thread
     * @param handler the handler to use when execution is blocked
     *        because the thread bounds and queue capacities are reached
     */
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

 

 

 

  ① 再创建了线程池后,等待提交过来的任务请求。

  ② 当调用execute()方法添加一个请求任务时,线程池会做如下判断:

    * 如果正在运行的线程数量小于corePoolSize,那么马上创建线程运行这个任务;

    * 如果正在运行的线程数量大于或等于corePoolSize,那么将这个任务放入队列;

    * 如果这时候队列满了且正在运行的线程数量还小于maxinumPoolSize,那么还是要创建非核心线程立刻运行这个任务;

    * 如果队列满了且正在运行的线程数量大于或等于maxinumPoolSize,那么线程池会启动饱和拒绝策略来执行。

  ③ 当一个线程完成任务时,它会从队列中取下一个任务来执行。

  ④ 当一个线程无事可做超过一定的时间(keepAliveTime)时,线程池会判断;

    * 如果当前运行的线程数大于corePoolSize,那么这个线程就被停掉。

    * 所有线程池的所有任务完成后它最终会收缩到corePoolSize的大小。

 

 

 

 

 

 

  

   

http://www.imooc.com/video/5176

一、如何扩展Java并发知识

Java Memory Mode

  JMM描述了Java线程如何通过内存进行交互

  happens-before原则

  synchronized,volatile&final

Locks &Conditon(Java1.5引入,加锁-同步通信)

  Java锁机制和等待条件的高层实现

  java.util.concurrent.locks

线程安全性

  原子性与可见性

  java.util.concurrent.atomic

  synchronized&volatile

  DeadLocks

多线程编程常用的交互模型

  Producer-Consumer模型

  Read-Write Lock模型

  Future模型

  Worker Thread模型

Java 1.5 中并发编程工具

  java.util.concurrent

  如:线程池ExecutorService;Callable & Future;BlockingQueue

推荐书籍:

以上是关于Java 多线程学习扩展的主要内容,如果未能解决你的问题,请参考以下文章

线程学习知识点总结

python小白学习记录 多线程爬取ts片段

学习java第19天个人总结

Java多线程学习

Java多线程学习(吐血超详细总结)

Java多线程学习(吐血超详细总结)