转 Java 多线程之一
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了转 Java 多线程之一相关的知识,希望对你有一定的参考价值。
一、多线程
1、操作系统有两个容易混淆的概念,进程和线程。
进程:一个计算机程序的运行实例,包含了需要执行的指令;有自己的独立地址空间,包含程序内容和数据;不同进程的地址空间是互相隔离的;进程拥有各种资源和状态信息,包括打开的文件、子进程和信号处理。
线程:表示程序的执行流程,是CPU调度执行的基本单位;线程有自己的程序计数器、寄存器、堆栈和帧。同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源。
2、Java标准库提供了进程和线程相关的API,进程主要包括表示进程的java.lang.Process类和创建进程的java.lang.ProcessBuilder类;
表示线程的是java.lang.Thread类,在虚拟机启动之后,通常只有Java类的main方法这个普通线程运行,运行时可以创建和启动新 的线程;还有一类守护线程(damon thread),守护线程在后台运行,提供程序运行时所需的服务。当虚拟机中运行的所有线程都是守护线程时,虚拟机终止运行。
3、线程间的可见性:一个线程对进程中共享的数据的修改,是否对另一个线程可见
可见性问题:
a、CPU采用时间片轮转等不同算法来对线程进行调度
- public class IdGenerator{
- private int value = 0;
- public int getNext(){
- return value++;
- }
- }
对于IdGenerator的getNext()方法,在多线程下不能保证返回值是不重复的:各个线程之间相互竞争CPU时间来获取运行机会,CPU切换可能发生在执行间隙。
以上代码getNext()的指令序列:CPU切换可能发生在7条指令之间,多个getNext的指令交织在一起。
- aload_0
- dup
- getfield #12
- dup_x1
- iconst_1
- iadd
- putfield #12
b、CPU缓存:
目前CPU一般采用层次结构的多级缓存的架构,有的CPU提供了 L1、L2和L3三级缓存。当CPU需要读取主存中某个位置的数据时,会一次检查各级缓存中是否存在对应的数据。如果有,直接从缓存中读取,这比从主存中 读取速度快很多。当CPU需要写入时,数据先被写入缓存中,之后再某个时间点写回主存。所以某些时间点上,缓存中的数据与主存中的数据可能是不一致。
c、指令顺序重排
出行性能考虑,编译器在编译时可能会对字节代码的指令顺序进行重新排列,以优化指令的执行顺序,在单线程中不会有问题,但在多线程可能产生与可见性相关的问题。
二、Java内存模型(Java Memory Model)
屏蔽了CPU缓存等细节,只关注主存中的共享变量;关注对象的实例域、静态域和数组元素;关注线程间的动作。
1、volatile关键词:用来对共享变量的访问进行同步,上一次写入操作的结果对下一次读取操作是肯定可见 的。(在写入volatile变量值之后,CPU缓存中的内容会被写回内存;在读取volatile变量时,CPU缓存中的对应内容会被置为失效,重新从 主存中进行读取),volatile不使用锁,性能优于synchronized关键词。
用来确保对一个变量的修改被正确地传播到其他线程中。
例子:A线程是Worker,一直跑循环,B线程调用setDone(true),A线程即停止任务
- public class Worker{
- private volatile boolean done;
- public void setDone(boolean done){
- this.done = done;
- }
- public void work(){
- while(!done){
- //执行任务;
- }
- }
- }
例子:错误使用。因为没有锁的支持,volatile的修改不能依赖于当前值,当前值可能在其他线程中被修改。(Worker是直接赋新值与当前值无关)
- public class Counter {
- public volatile static int count = 0;
- public static void inc() {
- //这里延迟1毫秒,使得结果明显
- try {
- Thread.sleep(1);
- } catch (InterruptedException e) {
- }
- count++;
- }
- public static void main(String[] args) {
- //同时启动1000个线程,去进行i++计算,看看实际结果
- for (int i = 0; i < 1000; i++) {
- new Thread(new Runnable() {
- @Override
- public void run() {
- Counter.inc();
- }
- }).start();
- }
- //这里每次运行的值都有可能不同,可能不为1000
- System.out.println("运行结果:Counter.count=" + Counter.count);
- }
- }
2、final关键词
final关键词声明的域的值只能被初始化一次,一般在构造方法中初始化。。(在多线程开发中,final域通常用来实现不可变对象)
当对象中的共享变量的值不可能发生变化时,在多线程中也就不需要同步机制来进行处理,故在多线程开发中应尽可能使用不可变对象。
另外,在代码执行时,final域的值可以被保存在寄存器中,而不用从主存中频繁重新读取。
3、java基本类型的原子操作
1)基本类型,引用类型的复制引用是原子操作;(即一条指令完成)
2)long与double的赋值,引用是可以分割的,非原子操作;
3)要在线程间共享long或double的字段时,必须在synchronized中操作,或是声明成volatile
三、Java提供的线程同步方式
1、synchronized关键字
方法或代码块的互斥性来完成实际上的一个原子操作。(方法或代码块在被一个线程调用时,其他线程处于等待状态)
所有的Java对象都有一个与synchronzied关联的监视器对象(monitor),允许线程在该监视器对象上进行加锁和解锁操作。
a、静态方法:Java类对应的Class类的对象所关联的监视器对象。
b、实例方法:当前对象实例所关联的监视器对象。
c、代码块:代码块声明中的对象所关联的监视器对象。
注:当锁被释放,对共享变量的修改会写入主存;当活得锁,CPU缓存中的内容被置为无效。编译器在处理synchronized方法或代码块,不会把其中包含的代码移动到synchronized方法或代码块之外,从而避免了由于代码重排而造成的问题。
例:以下方法getNext()和getNextV2() 都获得了当前实例所关联的监视器对象
- public class SynchronizedIdGenerator{
- private int value = 0;
- public synchronized int getNext(){
- return value++;
- }
- public int getNextV2(){
- synchronized(this){
- return value++;
- }
- }
- }
2、Object类的wait、notify和notifyAll方法
生产者和消费者模式,判断缓冲区是否满来消费,缓冲区是否空来生产的逻辑。如果用while 和 volatile也可以做,不过本质上会让线程处于忙等待,占用CPU时间,对性能造成影响。
wait: 将当前线程放入,该对象的等待池中,线程A调用了B对象的wait()方法,线程A进入B对象的等待池,并且释放B的锁。(这里,线程A必须持有B的锁, 所以调用的代码必须在synchronized修饰下,否则直接抛出java.lang.IllegalMonitorStateException异 常)。
notify:将该对象中等待池中的线程,随机选取一个放入对象的锁池,当当前线程结束后释放掉锁, 锁池中的线程即可竞争对象的锁来获得执行机会。
notifyAll:将对象中等待池中的线程,全部放入锁池。
(notify锁唤醒的线程选择由虚拟机实现来决定,不能保证一个对象锁关联的等待集合中的线程按照所期望的顺序被唤醒,很可能一个线程被唤醒之 后,发现他所要求的条件并没有满足,而重新进入等待池。因为当等待池中包含多个线程时,一般使用notifyAll方法,不过该方法会导致线程在没有必要 的情况下被唤醒,之后又马上进入等待池,对性能有影响,不过能保证程序的正确性)
工作流程:
a、Consumer线程A 来 看产品,发现产品为空,调用产品对象的wait(),线程A进入产品对象的等待池并释放产品的锁。
b、Producer线程B获得产品的锁,执行产品的notifyAll(),Consumer线程A从产品的等待池进入锁池,Producer线程B生产产品,然后退出释放锁。
c、Consumer线程A获得产品锁,进入执行,发现有产品,消费产品,然后退出。
例子:
- public synchronized String pop(){
- this.notifyAll();// 唤醒对象等待池中的所有线程,可能唤醒的就是 生产者(当生产者发现产品满,就会进入对象的等待池,这里代码省略,基本略同)
- while(index == -1){//如果发现没产品,就释放锁,进入对象等待池
- this.wait();
- }//当生产者生产完后,消费者从this.wait()方法再开始执行,第一次还会执行循环,万一产品还是为空,则再等待,所以这里必须用while循环,不能用if
- String good = buffer[index];
- buffer[index] = null;
- index--;
- return good;// 消费完产品,退出。
- }
注:wait()方法有超时和不超时之分,超时的在经过一段时间,线程还在对象的等待池中,那么线程也会推出等待状态。
3、线程状态转换:
已经废弃的方法:stop、suspend、resume、destroy,这些方法在实现上时不安全的。
线程的状态:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超时的等待)、TERMINATED。
a、方法sleep()进入的阻塞状态,不会释放对象的锁(即大家一起睡,谁也别想执行代码),所以不要让sleep方法处在synchronized方法或代码块中,否则造成其他等待获取锁的线程长时间处于等待。
b、方法join()则是主线程等待子线程完成,再往下执行。例如main方法新建两个线程A和B
- public static void main(String[] args) throws InterruptedException {
- Thread t1 = new Thread(new ThreadTesterA());
- Thread t2 = new Thread(new ThreadTesterB());
- t1.start();
- t1.join(); // 等t1执行完再往下执行
- t2.start();
- t2.join(); // 在虚拟机执行中,这句可能被忽略
- }
c、方法interrupt(),向被调用的对象线程发起中断请求。如线程A通过调用线程B的d的interrupt方法来发出中断请求,线程B来 处理这个请求,当然也可以忽略,这不是必须的。Object类的wait()、Thread类的join()和sleep方法都会抛出受检异常 java.lang.InterruptedException,通过interrupt方法中断该线程会导致线程离开等待状态。对于wait()调用来 说,线程需要重新获取监视器对象上的锁之后才能抛出InterruptedException异常,并致以异常的处理逻辑。
可以通过Thread类的isInterrupted方法来判断是否有中断请求发生,通常可以利用这个方法来判断是否退出线程(类似上面的volatitle修饰符的例子);
Thread类还有个方法Interrupted(),该方法不但可以判断当前线程是否被中断,还会清楚线程内部的中断标记,如果返回true,即曾被请求中断,同时调用完后,清除中断标记。
如果一个线程在某个对象的等待池,那么notify和 interrupt 都可以使该线程从等待池中被移除。如果同时发生,那么看实际发生顺序。如果是notify先,那照常唤醒,没影响。如果是interrupt先,并且虚拟 机选择让该线程中断,那么即使nofity,也会忽略该线程,而唤醒等待池中的另一个线程。
e、yield(),尝试让出所占有的CPU资源,让其他线程获取运行机会,对操作系统上的调度器来说是一个信号,不一定立即切换线程。(在实际开发中,测试阶段频繁调用yeid方法使线程切换更频繁,从而让一些多线程相关的错误更容易暴露出来)。
四、非阻塞方式
线程之间同步机制的核心是监视对象上的锁,竞争锁来获得执行代码的机 会。当一个对象获取对象的锁,然后其他尝试获取锁的对象会处于等待状态,这种锁机制的实现方式很大程度限制了多线程程序的吞吐量和性能(线程阻塞),且会 带来死锁(线程A有a对象锁,等着获取b对象锁,线程B有b对象锁,等待获取a对象锁)和优先级倒置(优先级低的线程获得锁,优先级高的只能等待对方释放 锁)等问题。
如果能不阻塞线程,又能保证多线程程序的正确性,就能有更好的性能。
在程序中,对共享变量的使用一般遵循一定的模式,即读取、修改和写入三步组成。之前碰到的问题是,这三步执行中可能线程执行切换,造成非原子操作。锁机制是把这三步变成一个原子操作。
目前CPU本身实现 将这三步 合起来 形成一个原子操作,无需线程锁机制干预,常见的指令是“比较和替换”(compare and swap,CAS),这个指令会先比较某个内存地址的当前值是不是指定的旧指,如果是,就用新值替换,否则什么也不做,指令返回的结果是内存地址的当前 值。通过CAS指令可以实现不依赖锁机制的非阻塞算法。一般做法是把CAS指令的调用放在一个无限循环中,不断尝试,知道CAS指令成功完成修改。
java.util.concurrent.atomic包中提供了CAS指令。(不是所有CPU都支持CAS,在某些平台,java.util.concurrent.atomic的实现仍然是锁机制)
atomic包中提供的Java类分成三类:
1、支持以原子操作来进行更新的数据类型的Java类(AtomicBoolean、AtomicInteger、AtomicReference),在内存模型相关的语义上,这四个类的对象类似于volatile变量。
类中的常用方法:
a、compareAndSet:接受两个参数,一个是期望的旧值,一个是替换的新值。
b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式读取和有条件地写入变量但不创建任何 happen-before 排序,但在源代码中和compareAndSet完全一样,所以并没有按JSR实现)
c、get和set:分别用来直接获取和设置变量的值。
d、lazySet:与set类似,但允许编译器把lazySet方法的调用与后面的指令进行重排,因此对值得设置操作有可能被推迟。
例:
- public class AtomicIdGenerator{
- private final AtomicInter counter = new AtomicInteger(0);
- public int getNext(){
- return counter.getAndIncrement();
- }
- }
- // getAndIncrement方法的内部实现方式,这也是CAS方法的一般模式,CAS方法不一定成功,所以包装在一个无限循环中,直到成功
- public final int getAndIncrement(){
- for(;;){
- int current = get();
- int next = current +1;
- if(compareAndSet(current,next))
- return current;
- }
- }
2、提供对数组类型的变量进行处理的Java类,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray类。(同上,只是放在类数组里,调用时也只是多了一个操作元素索引的参数)
3、通过反射的方式对任何对象中包含的volatitle变量使用CAS方法,AtomicIntegerFieldUpdater、
AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他们提供了一种方式把CAS的功能扩展到了任
何Java类中声明为volatitle的域上。(灵活,但语义较弱,因为对象的volatitle可能被非atomic的其他方式被修改)
- public class TreeNode{
- private volatile TreeNode parent;
- // 静态工厂方法
- private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");
- public boolean compareAndSetParent(TreeNode expect, TreeNode update){
- return parentUpdater.compareAndSet(this, expect, update);
- }
- }
注:java.util.concurrent.atomic包中的Java类属于比较底层的实现,一般作为java.util.concurrent包中很多非阻塞的数据结构的实现基础。
比较多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在实现线程安全的计数器时,AtomicInteger和AtomicLong类时最佳的选择。
五、高级同步机制(比synchronized更灵活的加锁机制)
synchronized和volatile,以及wait、notify等方法抽象层次低,在程序开发中使用比较繁琐,易出错。
而多线程之间的交互来说,存在某些固定的模式,如生产者-消费者和读者-写者模式,把这些模式抽象成高层API,使用起来会非常方便。
java.util.concurrent包为多线程提供了高层的API,满足日常开发中的常见需求。
常用接口
1、Lock接口,表示一个锁方法:
a、lock(),获取所,如果无法获取所锁,会处于等待状态
b、unlock(),释放锁。(一般放在finally代码块中)
c、lockInterruptibly(),与lock()类似,但允许当前线程在等待获取锁的过程中被中断。(所以要处理InterruptedException)
d、tryLock(),以非阻塞方式获取锁,如果无法获取锁,则返回false。(tryLock()的另一个重载可以指定超时,如果指定超时,当无法获取锁,会等待而阻塞,同时线程可以被中断)
2、ReadWriteLock接口,表示两个锁,读取的共享锁和写入的排他锁。(适合常见的读者--写者场景)
ReadWriteLock接口的readLock和writeLock方法来获取对应的锁的Lock接口的实现。
在多数线程读取,少数线程写入的情况下,可以提高多线程的性能,提高使用该数据结构的吞吐量。
如果是相反的情况,较多的线程写入,则接口会降低性能。
3、ReentrantLock类和ReentrantReadWriteLock,分别为上面两个接口的实现类。
他们具有重入性:即允许一个线程多次获取同一个锁(他们会记住上次获取锁并且未释放的线程对象,和加锁的次数,getHoldCount())
同一个线程每次获取锁,加锁数+1,每次释放锁,加锁数-1,到0,则该锁被释放,可以被其他线程获取。
- public class LockIdGenrator{
- //new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期无法获得锁
- private int ReentrantLock lock = ReentrantLock();
- privafte int value = 0;
- public int getNext(){
- lock.lock(); //进来就加锁,没有锁会等待
- try{
- return value++;//实际操作
- }finally{
- lock.unlock();//释放锁
- }
- }
- }
注:
重入性减少了锁在各个线程之间的等待,例如便利一个HashMap,每次next()之前加锁,之后释放,可以保证一个线程一口气完成便利,而不会每次
next()之后释放锁,然后和其他线程竞争,降低了加锁的代价, 提供了程序整体的吞吐量。(即,让一个线程一口气完成任务,再把锁传递给其他线程)。
4、Condition接口,Lock接口代替了synchronized,Condition接口替代了object的wait、nofity。
a、await(),使当前线程进入等待状态,知道被唤醒或中断。重载形式可以指定超时时间。
b、awaitNanos(),以纳秒为单位等待。
c、awaitUntil(),指定超时发生的时间点,而不是经过的时间,参数为java.util.Date。
d、awaitUninterruptibly(),前面几种会响应其他线程发出的中断请求,他会无视,直到被唤醒。
注:与Object类的wait()相同,await()会释放其所持有的锁。
e、signal()和signalAll, 相当于 notify和notifyAll
- Lock lock = new ReentrantLock();
- Condition condition = lock.newCondition();
- lock.lock();
- try{
- while(/*逻辑条件不满足*/){
- condition.await();
- }
- }finally{
- lock.unlock();
- }
六、底层同步器
多线程程序中,线程之间存在多种不同的同步方式。除了Java标准库提供的同步方式之外,程序中特有的同步方式需要由开发人员自己来实现。
常见的一种需求是 对有限个共享资源的访问,比如多台个人电脑,2台打印机,当多个线程在等待同一个资源时,从公平角度出发,会用FIFO队列。
如果程序中的同步方式可以抽象成对有限个资源的访问,那么可以使 用java.util.concurrent.locks包中的AbstractQueuedSynchronizer类和 AbstractQueuedLongSynchronizer类作为实现的基础,前者用int类型的变量来维护内部状态,而后者用long类型。(可以 将这个变量理解为共享资源个数)
通过getState、setState、和compareAndSetState3个方法更新内部变量的值。
AbstractQueuedSynchronizer类是 abstract的,需要覆盖其中包含的部分方法,通常做法是把其作为一个Java类的内部类,外部类提供具体的同步方式,内部类则作为实现的基础。有两 种模式,排他模式和共享模式,分别对应方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在这些方法中,使用getState、setState、 compareAndSetState3个方法来修改内部变量的值,以此来反应资源的状态。
- public class SimpleResourceManager{
- private final InnerSynchronizer synchronizer;
- private static class InnerSynchronizer extends AbstractQueuedSynchronizer{
- InnerSynchronizer(int numOfResources){
- setState(numOfResources);
- }
- protected int tryAcquireShared(int acquires){
- for(;;){
- int available = getState();
- int remain = available - acquires;
- if(remain <0 || comapreAndSetState(available, remain){
- return remain;
- }
- }
- }
- protected boolean try ReleaseShared(int releases){
- for(;;){
- int available = getState();
- int next = available + releases;
- if(compareAndSetState(available,next){
- return true;
- }
- }
- }
- }
- public SimpleResourceManager(int numOfResources){
- synchronizer = new InnerSynchronizer(numOfResources);
- }
- public void acquire() throws InterruptedException{
- synchronizer.acquireSharedInterruptibly(1);
- }
- pubic void release(){
- synchronizer.releaseShared(1);
- }
- }
七、高级同步对象(提高开发效率)
atomic和locks包提供的Java类可以满足基本的互斥和同步访问的需求,但这些Java类的抽象层次较低,使用比较复杂。
更简单的做法是使用java.util.concurrent包中的高级同步对象。
1、信号量。
信号量一般用来数量有限的资源,每类资源有一个对象的信号量,信号量的值表示资源的可用数量。
在使用资源时,需要从该信号量上获取许可,成功获取许可,资源的可用 数-1;完成对资源的使用,释放许可,资源可用数+1; 当资源数为0时,需要获取资源的线程以阻塞的方式来等待资源,或过段时间之后再来检查资源是否可用。(上面的SimpleResourceManager 类实际上时信号量的一个简单实现)
java.util.concurrent.Semaphore类,在创建Semaphore类的对象时指定资源的可用数
a、acquire(),以阻塞方式获取许可
b、tryAcquire(),以非阻塞方式获取许可
c、release(),释放许可。
d、accquireUninterruptibly(),accquire()方法获取许可以的过程可以被中断,如果不希望被中断,使用此方法。
- public class PrinterManager{
- private final Semphore semaphore;
- private final List<Printer> printers = new ArrayList<>():
- public PrinterManager(Collection<? extends Printer> printers){
- this.printers.addAll(printers);
- //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿
- this.semaphore = new Semaphore(this.printers.size(),true);
- }
- public Printer acquirePrinter() throws InterruptedException{
- semaphore.acquire();
- return getAvailablePrinter();
- }
- public void releasePrinter(Printer printer){
- putBackPrinter(pinter);
- semaphore.release();
- }
- private synchronized Printer getAvailablePrinter(){
- printer result = printers.get(0);
- printers.remove(0);
- return result;
- }
- private synchronized void putBackPrinter(Printer printer){
- printers.add(printer);
- }
- }
2、倒数闸门
多线程协作时,一个线程等待另外的线程完成任务才能继续进行。
java.util.concurrent.CountDownLatch 类,创建该类时,指定等待完成的任务数;当一个任务完成,调用countDonw(),任务数-1。等待任务完成的线程通过await(),进入阻塞状 态,直到任务数量为0。CountDownLatch类为一次性,一旦任务数为0,再调用await()不再阻塞当前线程,直接返回。
例:
- public class PageSizeSorter{
- // 并发性能远远优于HashTable的 Map实现,hashTable做任何操作都需要获得锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不同线程访问不同的数据段,完全不受影响,忘记HashTable吧。
- private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();
- private static class GetSizeWorker implements Runnable{
- private final String urlString;
- public GetSizeWorker(String urlString , CountDownLatch signal){
- this.urlString = urlStirng;
- this.signal = signal;
- }
- public void run(){
- try{
- InputStream is = new URL(urlString).openStream();
- int size = IOUtils.toByteArray(is).length;
- sizeMap.put(urlString, siz
以上是关于转 Java 多线程之一的主要内容,如果未能解决你的问题,请参考以下文章