Java JUC
Posted 木心
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java JUC相关的知识,希望对你有一定的参考价值。
1、volatile 关键字与内存可见性
2、原子变量与 CAS 算法
3、同步容器类
4、闭锁 CountDownLatch
5、使用 Callable 创建线程
6、Lock 同步锁
7、生产者消费者案例--虚假等待
8、线程按序交替执行
9、读写锁 ReadWriteLock
10、线程八锁
11、线程池
12、线程调度(定时任务)
13、ForkJoinPool 分支合并框架--工作窃取
1、volatile 关键字与内存可见性 <--返回目录
什么是内存可见性:当多个线程操作共享数据时,彼此不可见。把变量声明为 volatile,这就指示 JVM,这个变量是不稳定的,每次使用它都到主存中进行读取。
demo:测试线程数据没有及时与主内存数据进行同步
package com.oy; public class TestVolatile { public static void main(String[] args) throws InterruptedException { MyRunnable myRunnable = new MyRunnable(); new Thread(myRunnable).start(); while (true) { // 下面这个判断 myRunnable.isFlag() 一直是 false // 主线程 Thread.sleep 后,才有时间同步线程数据 // Thread.sleep(1000); if (myRunnable.isFlag()) { System.out.println("========"); break; } } } } class MyRunnable implements Runnable { private boolean flag = false; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } flag = true; System.out.println("flag=" + isFlag()); } public boolean isFlag() { return flag; } public void setFlag(boolean flag) { this.flag = flag; } }
现象:主线程没有添加 Thread.sleep(1000); 这句代码时,主线程不会打印 "========";添加后会打印。
原因:while(true) 效率很高,主线程没有及时将数据与主内存进行同步。共享变量保存在主内存中,子线程操作该变量时,复制一份进行操作,然后同步到主内存中。但是这个过程其他线程不可见。
解决方案一:使用 synchronized 会取同步数据
synchronized (myRunnable) { if (myRunnable.isFlag()) { System.out.println("========"); break; } }
解决方案二:使用 volatile 关键字修饰变量
private volatile boolean flag = false;
volatile 关键字:当多个线程操作共享数据时,可以保证内存中的数据可见。相较于 synchronized 是一种较为轻量级的同步策略。
注意:
- volatile 不具备“互斥性”;
- volatile 不能保证变量的“原子性”;
2、原子变量与 CAS 算法 <--返回目录
i++ 的原子性问题:i++ 操作时实际上分为三个步骤“读-改-写”
int i = 10; i = i++; // 上面 i++ 操作系统底层的过程分为三步 int temp = i; i = i + 1; i = temp;
2.1、demo: 多个线程操作共享数据,参数线程安全问题
package com.oy; public class TestAtomicDemo { public static void main(String[] args) { AtomicDemo ad = new AtomicDemo(); for (int i = 0; i < 10; i++) { new Thread(ad).start(); } } } class AtomicDemo implements Runnable { private int serialNumber = 0; @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber()); } public int getSerialNumber() { return serialNumber++; } }
现象:
原因:因为变量 serialNumber 的自增 (serialNumber++) 操作不是原子性操作,当多个线程操作 serialNumber 变量时就会有线程安全问题。
解决方案:
原子变量:jdk1.5 后 java.util.concurrent.atomic 包下提供了常用的原子变量。
- 原子变量封装的变量都使用 volatile 保证内存可见性;
- CAS(Compare-And-Swap)算法保证数据的原子性;
2.2、CAS 算法
CAS 算法时硬件对于并发操作共享数据的支持。CAS 保护三个操作数:内存值 V,预估值(旧值)A,更新值 B,当且仅当 V==A 时, V = B, 否则不做任何操作。(就是只有预估值与内存值相等时,才进行更新)
2.3、使用 “原子变量” 改写前面的程序
当某个线程判断预估值与内存值不等时,不进行任何处理,但是会发起重试,直至成功。
package com.oy; import java.util.concurrent.atomic.AtomicInteger; public class TestAtomicDemo { public static void main(String[] args) { AtomicDemo ad = new AtomicDemo(); for (int i = 0; i < 10; i++) { new Thread(ad).start(); } } } class AtomicDemo implements Runnable { //private int serialNumber = 0; private AtomicInteger serialNumber = new AtomicInteger(); @Override public void run() { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + ":" + getSerialNumber()); } public int getSerialNumber() { //return serialNumber++; return serialNumber.getAndIncrement(); } }
3、同步容器类 <--返回目录
Java 5 在 java.util.concurrent 包中提供了多种并发容器类来改进同步容器的性能。
ConcurrentHashMap 同步容器类时 Java 5 增加的一个线程安全的哈希表。对于多线程的操作,介于 HashMap 与 Hashtable 之间。内部采用 “锁分段” 机制替代 Hashtable 的独占锁,进而提高性能。Hashtable 是在其内部的每个方法上添加 synchronized 关键字进行同步,性能低,且在复合操作(比如不存在则添加、存在则删除)时,由于调用了 Hashtable 的多个方法,同样有线程安全问题。
当使用 List<Stirng> list = Collections.synchronizedList<new ArrayList<String>()) 包装一个 ArrayList 来得到线程安全的 list 集合时同样有 “并发修改异常” 问题。
package com.oy; import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; public class TestCopyOnWriteArrayList { public static void main(String[] args) { MyThread mt = new MyThread(); for (int i = 0; i < 5; i++) { new Thread(mt).start(); } } } class MyThread implements Runnable { //private static List<String> list = Collections.synchronizedList(new ArrayList<String>()); private static CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>(); static { list.add("aaa"); list.add("bbb"); list.add("ccc"); } @Override public void run() { Iterator<String> iterator = list.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next()); // 使用 Collections.synchronizedList 时,同样有并发修改异常 list.add("ddd"); } } }
4、闭锁 CountDownLatch <--返回目录
CountDownLatch 内部维护一个 count, 如果count 不为 0,latch.await() 处于等待,直至 count 为 0。
下面案例:统计 5 个线程执行时间。每个子线程执行完后进行 latch.countDown(), 直至 count 为 0,程序从latch.await() 等待的地方继续执行。
package com.oy; import java.util.concurrent.CountDownLatch; public class TestCountDownLatch { public static void main(String[] args) { // 创建闭锁对象 int threadNum = 5; CountDownLatch latch = new CountDownLatch(threadNum); LatchDemo ld = new LatchDemo(latch); // 计算下面多个线程的执行时间 long start = System.currentTimeMillis(); for (int i = 0; i < threadNum; i++) { new Thread(ld).start(); } // latch.await(): 等待锁 countDown try { latch.await(); } catch (Exception e) { } long end = System.currentTimeMillis(); System.out.println("执行时间:" + (end - start) + " ms"); } } class LatchDemo implements Runnable { private CountDownLatch latch; public LatchDemo(CountDownLatch latch) { this.latch = latch; } @Override public void run() { synchronized (this) { try { int sum = 0; for (int i = 1; i <= 10000000; i++) { sum += i; } System.out.println(Thread.currentThread().getName() + ", sum=" + sum); } finally { latch.countDown(); } } } }
5、使用 Callable 创建线程 <--返回目录
package com.oy; import java.util.concurrent.Callable; import java.util.concurrent.FutureTask; public class TestCallable { public static void main(String[] args) throws Exception { Demo d = new Demo(); // 执行 Callable 方式,需要 FutureTask 实现类的支持,用于接收结果 FutureTask<Integer> result = new FutureTask<>(d); new Thread(result).start(); // 接收线程运算后的结果 // 线程没有返回结果前,result.get() 处于等待 Integer sum = result.get(); // FutureTask 可用于闭锁 System.out.println(Thread.currentThread().getName() + ", sum=" + sum); } } class Demo implements Callable<Integer> { @Override public Integer call() throws Exception { int sum = 0; for (int i = 1; i <= 10000000; i++) { sum += i; } System.out.println(Thread.currentThread().getName() + ", sum=" + sum); return sum; } }
6、Lock 同步锁 <--返回目录
解决多线程安全问题的方式
- synchronized(隐式锁) 同步代码块
- synchronized(隐式锁) 同步方法
- jdk 1.5 后的同步锁 Lock(显示锁,需要通过 lock()方法上锁,通过 unlock() 方法进行释放锁)
package com.oy; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class TestLock { public static void main(String[] args) { Ticket t = new Ticket(); new Thread(t, "1 号窗口").start(); new Thread(t, "2 号窗口").start(); new Thread(t, "3 号窗口").start(); } } class Ticket implements Runnable { private int tick = 100; private Lock lock = new ReentrantLock(); @Override public void run() { while (true) { try { lock.lock(); // 加锁 if (tick > 0) { try { Thread.sleep(200); } catch (Exception e) { } System.out.println(Thread.currentThread().getName() + " 完成售票,余票为:" + (--tick)); } } finally { lock.unlock(); // 释放锁 } } } }
7、生产者消费者案例--虚假等待 <--返回目录
package com.oy; /* * 生产者和消费者案例 */ public class TestProductorAndConsumer { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor pro = new Productor(clerk); Consumer cus = new Consumer(clerk); new Thread(pro, "生产者 A").start(); new Thread(cus, "消费者 B").start(); new Thread(pro, "生产者 C").start(); new Thread(cus, "消费者 D").start(); } } //店员 class Clerk{ private int product = 0; //进货 public synchronized void get(){ while(product >= 1){//为了避免虚假唤醒问题,应该总是使用在循环中 System.out.println("产品已满!"); try { this.wait(); } catch (InterruptedException e) { } } System.out.println(Thread.currentThread().getName() + " : " + ++product); this.notifyAll(); } //卖货 public synchronized void sale(){ while(product <= 0){ System.out.println("缺货!"); try { this.wait(); } catch (InterruptedException e) { } } System.out.println(Thread.currentThread().getName() + " : " + --product); this.notifyAll(); } } //生产者 class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { try { Thread.sleep(200); } catch (InterruptedException e) { } clerk.get(); } } } //消费者 class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } @Override public void run() { for (int i = 0; i < 20; i++) { clerk.sale(); } } }
8、线程按序交替执行 <--返回目录
package com.oy; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /* * 编写一个程序,开启 3 个线程,这三个线程的 ID 分别为 A、B、C,每个线程将自己的 ID 在屏幕上打印 10 遍,要求输出的结果必须按顺序显示。 * 如:ABCABCABC…… 依次递归 */ public class TestABCAlternate { public static void main(String[] args) { AlternateDemo ad = new AlternateDemo(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 20; i++) { ad.loopA(i); } } }, "A").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 20; i++) { ad.loopB(i); } } }, "B").start(); new Thread(new Runnable() { @Override public void run() { for (int i = 1; i <= 20; i++) { ad.loopC(i); System.out.println("-----------------------------------"); } } }, "C").start(); } } class AlternateDemo{ private int number = 1; //当前正在执行线程的标记 private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); /** * @param totalLoop : 循环第几轮 */ public void loopA(int totalLoop){ lock.lock(); try { //1. 判断 if(number != 1){ condition1.await(); } //2. 打印 for (int i = 1; i <= 1; i++) { System.out.println(Thread.currentThread().getName() + "\\t" + i + "\\t" + totalLoop); } //3. 唤醒 number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void loopB(int totalLoop){ lock.lock(); try { //1. 判断 if(number != 2){ condition2.await(); } //2. 打印 for (int i = 1; i <= 1; i++) { System.out.println(Thread.currentThread().getName() + "\\t" + i + "\\t" + totalLoop); } //3. 唤醒 number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void loopC(int totalLoop){ lock.lock(); try { //1. 判断 if(number != 3){ condition3.await(); } //2. 打印 for (int i = 1; i <= 1; i++) { System.out.println(Thread.currentThread().getName() + "\\t" + i + "\\t" + totalLoop); } //3. 唤醒 number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } }
9、读写锁 ReadWriteLock <--返回目录

package com.oy; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; /* * 1. ReadWriteLock : 读写锁 * * 写写/读写 需要“互斥” * 读读 不需要互斥 * */ public class TestReadWriteLock { public static void main(String[] args) { ReadWriteLockDemo rw = new ReadWriteLockDemo(); new Thread(new Runnable() { @Override public void run() { rw.set((int)(Math.random() * 101)); } }, "Write:").start(); for (int i = 0; i < 100; i++) { new Thread(new Runnable() { @Override public void run() { rw.get(); } }).start(); } } } class ReadWriteLockDemo{ private int number = 0; private ReadWriteLock lock = new ReentrantReadWriteLock(); //读 public void get(){ lock.readLock().lock(); //上锁 try{ System.out.println(Thread.currentThread().getName() + " : " + number); }finally{ lock.readLock().unlock(); //释放锁 } } //写 public void set(int number){ lock.writeLock().lock(); try{ System.out.println(Thread.currentThread().getName()); this.number = number; }finally{ lock.writeLock().unlock(); } } }
10、线程八锁 <--返回目录
所有的非静态同步方法用的都是同一把锁,即实例对象本身,或者说this对象,如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁。如果别的对象的非静态同步方法与该实例对象的非静态同步方法获取不同的锁,则不需要等待。
所有的静态同步方法用的也是同一把锁,即类对象本身,所以静态同步方法与非静态同步方法之间是不会有竞态条件的,但是一个静态同步方法获取Class实例的锁后,其他静态同步方法都必须等待该方法释放锁才能获取锁。
所谓线程八锁实际上对应于是否加上synchronized,是否加上static等8种常见情况,代码如下:

/* * 题目:判断打印的 "one" or "two" ? * * 1. 两个普通同步方法,两个线程,标准打印, 打印? //one two * 2. 新增 Thread.sleep() 给 getOne() ,打印? //one two * 3. 新增普通方法 getThree() , 打印? //three one two * 4. 两个普通同步方法,两个 Number 对象,打印? //two one * 5. 修改 getOne() 为静态同步方法,打印? //two one * 6. 修改两个方法均为静态同步方法,一个 Number 对象? //one two * 7. 一个静态同步方法,一个非静态同步方法,两个 Number 对象? //two one * 8. 两个静态同步方法,两个 Number 对象? //one two * * 线程八锁的关键: * ①非静态方法的锁默认为 this, 静态方法的锁为 对应的 Class 实例 * ②某一个时刻内,只能有一个线程持有锁,无论几个方法。 */ public class TestThread8Monitor { public static void main(String[] args) { Number number = new Number(); Number number2 = new Number(); new Thread(new Runnable() { @Override public void run() { number.getOne(); } }).start(); new Thread(new Runnable() { @Override public void run() { // number.getTwo(); number2.getTwo(); } }).start(); /*new Thread(new Runnable() { @Override public void run() { number.getThree(); } }).start();*/ } } class Number{ public static synchronized void getOne(){//Number.class try { Thread.sleep(3000); } catch (InterruptedException e) { } System.out.println("one"); } public synchronized void getTwo(){//this System.out.println("two"); } public void getThree(){ System.out.println("three"); } }
11、线程池 <--返回目录
package com.oy; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /* * 一、线程池:提供了一个线程队列,队列中保存着所有等待状态的线程。避免了创建与销毁额外开销,提高了响应的速度。 * * 二、线程池的体系结构: * java.util.concurrent.Executor : 负责线程的使用与调度的根接口 * |--**ExecutorService 子接口: 线程池的主要接口 * |--ThreadPoolExecutor 线程池的实现类 * |--ScheduledExecutorService 子接口:负责线程的调度 * |--ScheduledThreadPoolExecutor :继承 ThreadPoolExecutor, 实现 ScheduledExecutorService * * 三、工具类 : Executors * ExecutorService newFixedThreadPool() : 创建固定大小的线程池 * ExecutorService newCachedThreadPool() : 缓存线程池,线程池的数量不固定,可以根据需求自动的更改数量。 * ExecutorService newSingleThreadExecutor() : 创建单个线程池。线程池中只有一个线程 * * ScheduledExecutorService newScheduledThreadPool() : 创建固定大小的线程,可以延迟或定时的执行任务。 */ public class TestThreadPool { public static void main(String[] args) throws Exception { //1. 创建线程池 ExecutorService pool = Executors.newFixedThreadPool(5); List<Future<Integer>> list = new ArrayList<>(); for (int i = 0; i < 10; i++) { Future<Integer> future = pool.submit(new Callable<Integer>(){ @Override public Integer call() throws Exception { int sum = 0; for (int i = 0; i <= 100; i++) { sum += i; } return sum; } }); list.add(future); } pool.shutdown(); for (Future<Integer> future : list) { System.out.println(future.get()); } /*ThreadPoolDemo以上是关于Java JUC的主要内容,如果未能解决你的问题,请参考以下文章