线程同步 各个关键字和方法的使用

Posted 高国藩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程同步 各个关键字和方法的使用相关的知识,希望对你有一定的参考价值。



1、volatile关键词:用来对共享变量的访问进行同步,上一次写入操作的结果对下一次读取操作是肯定可见的。(在写入volatile变量值之后,CPU缓存中的内容会被写回内存;在读取volatile变量时,CPU缓存中的对应内容会被置为失效,重新从主存中进行读取),volatile不使用锁,性能优于synchronized关键词。

用来确保对一个变量的修改被正确地传播到其他线程中。

例子:A线程是Worker,一直跑循环,B线程调用setDone(true),A线程即停止任务

[java] view plain copy print ?
  1. public class Worker  
  2.    private volatile boolean done;  
  3.    public void setDone(boolean done)  
  4.       this.done = done;  
  5.      
  6.    public void work()  
  7.       while(!done)  
  8.          //执行任务;  
  9.         
  10.      
  11.   
public class Worker
   private volatile boolean done;
   public void setDone(boolean done)
      this.done = done;
   
   public void work()
      while(!done)
         //执行任务;
      
   
例子:错误使用。因为没有锁的支持,volatile的修改不能依赖于当前值,当前值可能在其他线程中被修改。(Worker是直接赋新值与当前值无关)

[java] view plain copy print ?
  1. public class Counter   
  2.     public volatile static int count = 0;  
  3.     public static void inc()   
  4.         //这里延迟1毫秒,使得结果明显  
  5.         try   
  6.             Thread.sleep(1);  
  7.          catch (InterruptedException e)   
  8.           
  9.         count++;  
  10.       
  11.     public static void main(String[] args)   
  12.         //同时启动1000个线程,去进行i++计算,看看实际结果  
  13.         for (int i = 0; i < 1000; i++)   
  14.             new Thread(new Runnable()   
  15.                 @Override  
  16.                 public void run()   
  17.                     Counter.inc();  
  18.                   
  19.             ).start();  
  20.           
  21.         //这里每次运行的值都有可能不同,可能不为1000  
  22.         System.out.println("运行结果:Counter.count=" + Counter.count);  
  23.       
  24.   
public class Counter 
    public volatile static int count = 0;
    public static void inc() 
        //这里延迟1毫秒,使得结果明显
        try 
            Thread.sleep(1);
         catch (InterruptedException e) 
        
        count++;
    
    public static void main(String[] args) 
        //同时启动1000个线程,去进行i++计算,看看实际结果
        for (int i = 0; i < 1000; i++) 
            new Thread(new Runnable() 
                @Override
                public void run() 
                    Counter.inc();
                
            ).start();
        
        //这里每次运行的值都有可能不同,可能不为1000
        System.out.println("运行结果:Counter.count=" + Counter.count);
    
2、final关键词
final关键词声明的域的值只能被初始化一次,一般在构造方法中初始化。。(在多线程开发中,final域通常用来实现不可变对象)

当对象中的共享变量的值不可能发生变化时,在多线程中也就不需要同步机制来进行处理,故在多线程开发中应尽可能使用不可变对象

另外,在代码执行时,final域的值可以被保存在寄存器中,而不用从主存中频繁重新读取。

3、java基本类型的原子操作

1)基本类型,引用类型的复制引用是原子操作;(即一条指令完成)

2)long与double的赋值,引用是可以分割的,非原子操作;

3)要在线程间共享long或double的字段时,必须在synchronized中操作,或是声明成volatile

三、Java提供的线程同步方式

1、synchronized关键字

方法或代码块的互斥性来完成实际上的一个原子操作。(方法或代码块在被一个线程调用时,其他线程处于等待状态)

所有的Java对象都有一个与synchronzied关联的监视器对象(monitor),允许线程在该监视器对象上进行加锁和解锁操作。

a、静态方法:Java类对应的Class类的对象所关联的监视器对象。

b、实例方法:当前对象实例所关联的监视器对象。

c、代码块:代码块声明中的对象所关联的监视器对象。

注:当锁被释放,对共享变量的修改会写入主存;当活得锁,CPU缓存中的内容被置为无效。编译器在处理synchronized方法或代码块,不会把其中包含的代码移动到synchronized方法或代码块之外,从而避免了由于代码重排而造成的问题。

例:以下方法getNext()和getNextV2() 都获得了当前实例所关联的监视器对象

[java] view plain copy print ?
  1. public class SynchronizedIdGenerator  
  2.    private int value = 0;  
  3.    public synchronized int getNext()  
  4.       return value++;  
  5.      
  6.    public int getNextV2()  
  7.       synchronized(this)  
  8.          return value++;  
  9.         
  10.      
  11.   
public class SynchronizedIdGenerator
   private int value = 0;
   public synchronized int getNext()
      return value++;
   
   public int getNextV2()
      synchronized(this)
         return value++;
      
   

2、Object类的wait、notify和notifyAll方法

生产者和消费者模式,判断缓冲区是否满来消费,缓冲区是否空来生产的逻辑。如果用while 和 volatile也可以做,不过本质上会让线程处于忙等待,占用CPU时间,对性能造成影响。

Obj.wait(),与Obj.notify()必须要与synchronized(Obj)一起使用,也就是wait,与notify是针对已经获取了Obj锁进行操作,从语法角度来说就是Obj.wait(),Obj.notify必须在synchronized(Obj)...语句块内。从功能上来说wait就是说线程在获取对象锁后,主动释放对象锁,同时本线程休眠。直到有其它线程调用对象的notify()唤醒该线程,才能继续获取对象锁,并继续执行。相应的notify()就是对对象锁的唤醒操作。但有一点需要注意的是notify()调用后,并不是马上就释放对象锁的,而是在相应的synchronized()语句块执行结束,自动释放锁后,JVM会在wait()对象锁的线程中随机选取一线程,赋予其对象锁,唤醒线程,继续执行。这样就提供了在线程间同步、唤醒的操作。Thread.sleep()与Object.wait()二者都可以暂停当前线程,释放CPU控制权,主要的区别在于Object.wait()在释放CPU同时,释放了对象锁的控制。

工作流程:

a、Consumer线程A 来 看产品,发现产品为空,调用产品对象的wait(),线程A进入产品对象的等待池并释放产品的锁。

b、Producer线程B获得产品的锁,执行产品的notifyAll(),Consumer线程A从产品的等待池进入锁池,Producer线程B生产产品,然后退出释放锁。

c、Consumer线程A获得产品锁,进入执行,发现有产品,消费产品,然后退出。

例子:

package com.page.bjsxt.cude;

public class Test 

	public static Object object = new Object();

	public static void main(String[] args) 
		Thread1 thread1 = new Thread1();
		Thread2 thread2 = new Thread2();
		thread1.start();
		thread2.start();
	

	static class Thread1 extends Thread 
		@Override
		public void run() 
			synchronized (object) 
				try 
					object.wait();//释放锁,该线程等待,直到被唤醒
					System.out.println("synchronized (object) 语句块执行结束...");
					
				 catch (InterruptedException e) 
				
				System.out.println("线程" + Thread.currentThread().getName()
						+ "获取到了锁");
			
		
	

	static class Thread2 extends Thread 
		@Override
		public void run() 
			synchronized (object) 
				object.notify();//唤醒object.wait() 并将此synchronized语句块执行结束;
				System.out.println("线程" + Thread.currentThread().getName()
						+ "调用了object.notify()");
			
			System.out.println("线程" + Thread.currentThread().getName() + "释放了锁");
		
	

public synchronized String pop()
  this.notifyAll();// 唤醒对象等待池中的所有线程,可能唤醒的就是 生产者(当生产者发现产品满,就会进入对象的等待池,这里代码省略,基本略同)
   while(index == -1)//如果发现没产品,就释放锁,进入对象等待池
      this.wait();
   //当生产者生产完后,消费者从this.wait()方法再开始执行,第一次还会执行循环,万一产品还是为空,则再等待,所以这里必须用while循环,不能用if
   String good = buffer[index];
   buffer[index] = null;
   index--;
   return good;// 消费完产品,退出。

注:wait()方法有超时和不超时之分,超时的在经过一段时间,线程还在对象的等待池中,那么线程也会推出等待状态。

3、线程状态转换:

已经废弃的方法:stop、suspend、resume、destroy,这些方法在实现上时不安全的。

线程的状态:NEW、RUNNABLE、BLOCKED、WAITING、TIMED_WAITING(有超时的等待)、TERMINATED。

a、方法sleep()进入的阻塞状态,不会释放对象的锁(即大家一起睡,谁也别想执行代码),所以不要让sleep方法处在synchronized方法或代码块中,否则造成其他等待获取锁的线程长时间处于等待。

b、方法join()则是主线程等待子线程完成,再往下执行。例如main方法新建两个线程A和B

[java] view plain copy print ?
  1. public static void main(String[] args) throws InterruptedException     
  2. Thread t1 = new Thread(new ThreadTesterA());    
  3. Thread t2 = new Thread(new ThreadTesterB());    
  4. t1.start();    
  5. t1.join(); // 等t1执行完再往下执行  
  6. t2.start();    
  7. t2.join(); // 在虚拟机执行中,这句可能被忽略  
  8.   
public static void main(String[] args) throws InterruptedException   
Thread t1 = new Thread(new ThreadTesterA());  
Thread t2 = new Thread(new ThreadTesterB());  
t1.start();  
t1.join(); // 等t1执行完再往下执行
t2.start();  
t2.join(); // 在虚拟机执行中,这句可能被忽略

c、方法interrupt(),向被调用的对象线程发起中断请求。如线程A通过调用线程B的d的interrupt方法来发出中断请求,线程B来处理这个请求,当然也可以忽略,这不是必须的。Object类的wait()、Thread类的join()和sleep方法都会抛出受检异常java.lang.InterruptedException,通过interrupt方法中断该线程会导致线程离开等待状态。对于wait()调用来说,线程需要重新获取监视器对象上的锁之后才能抛出InterruptedException异常,并致以异常的处理逻辑。

可以通过Thread类的isInterrupted方法来判断是否有中断请求发生,通常可以利用这个方法来判断是否退出线程(类似上面的volatitle修饰符的例子);

Thread类还有个方法Interrupted(),该方法不但可以判断当前线程是否被中断,还会清楚线程内部的中断标记,如果返回true,即曾被请求中断,同时调用完后,清除中断标记。

如果一个线程在某个对象的等待池,那么notify和interrupt 都可以使该线程从等待池中被移除。如果同时发生,那么看实际发生顺序。如果是notify先,那照常唤醒,没影响。如果是interrupt先,并且虚拟机选择让该线程中断,那么即使nofity,也会忽略该线程,而唤醒等待池中的另一个线程。

e、yield(),尝试让出所占有的CPU资源,让其他线程获取运行机会,对操作系统上的调度器来说是一个信号,不一定立即切换线程。(在实际开发中,测试阶段频繁调用yeid方法使线程切换更频繁,从而让一些多线程相关的错误更容易暴露出来)。


四、非阻塞方式

线程之间同步机制的核心是监视对象上的锁,竞争锁来获得执行代码的机会。当一个对象获取对象的锁,然后其他尝试获取锁的对象会处于等待状态,这种锁机制的实现方式很大程度限制了多线程程序的吞吐量和性能(线程阻塞),且会带来死锁(线程A有a对象锁,等着获取b对象锁,线程B有b对象锁,等待获取a对象锁)和优先级倒置(优先级低的线程获得锁,优先级高的只能等待对方释放锁)等问题。

如果能不阻塞线程,又能保证多线程程序的正确性,就能有更好的性能。

在程序中,对共享变量的使用一般遵循一定的模式,即读取、修改和写入三步组成。之前碰到的问题是,这三步执行中可能线程执行切换,造成非原子操作。锁机制是把这三步变成一个原子操作。

目前CPU本身实现 将这三步 合起来 形成一个原子操作,无需线程锁机制干预,常见的指令是“比较和替换”(compare and swap,CAS),这个指令会先比较某个内存地址的当前值是不是指定的旧指,如果是,就用新值替换,否则什么也不做,指令返回的结果是内存地址的当前值。通过CAS指令可以实现不依赖锁机制的非阻塞算法。一般做法是把CAS指令的调用放在一个无限循环中,不断尝试,知道CAS指令成功完成修改。

java.util.concurrent.atomic包中提供了CAS指令。(不是所有CPU都支持CAS,在某些平台,java.util.concurrent.atomic的实现仍然是锁机制)

atomic包中提供的Java类分成三类:

1、支持以原子操作来进行更新的数据类型的Java类(AtomicBoolean、AtomicInteger、AtomicReference),在内存模型相关的语义上,这四个类的对象类似于volatile变量。

类中的常用方法:

a、compareAndSet:接受两个参数,一个是期望的旧值,一个是替换的新值。

b、weakCompareAndSet:效果同compareAndSet(JSR中表示weak原子方式读取和有条件地写入变量但创建任何 happen-before 排序,但在源代码中和compareAndSet完全一样,所以并没有按JSR实现)

c、get和set:分别用来直接获取和设置变量的值。

d、lazySet:与set类似,但允许编译器把lazySet方法的调用与后面的指令进行重排,因此对值得设置操作有可能被推迟。

例:

[java] view plain copy print ?
  1. public class AtomicIdGenerator  
  2.    private final AtomicInter counter = new AtomicInteger(0);  
  3.    public int getNext()  
  4.       return counter.getAndIncrement();  
  5.      
  6.   
  7. // getAndIncrement方法的内部实现方式,这也是CAS方法的一般模式,CAS方法不一定成功,所以包装在一个无限循环中,直到成功  
  8. public final int getAndIncrement()  
  9.    for(;;)  
  10.       int current = get();  
  11.       int next = current +1;  
  12.       if(compareAndSet(current,next))  
  13.          return current;  
  14.      
  15.   
public class AtomicIdGenerator
   private final AtomicInter counter = new AtomicInteger(0);
   public int getNext()
      return counter.getAndIncrement();
   

// getAndIncrement方法的内部实现方式,这也是CAS方法的一般模式,CAS方法不一定成功,所以包装在一个无限循环中,直到成功
public final int getAndIncrement()
   for(;;)
      int current = get();
      int next = current +1;
      if(compareAndSet(current,next))
         return current;
   
2、提供对数组类型的变量进行处理的Java类,AtomicIntegerArray、AtomicLongArray和AtomicReferenceArray类。(同上,只是放在类数组里,调用时也只是多了一个操作元素索引的参数)

3、通过反射的方式对任何对象中包含的volatitle变量使用CAS方法,AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater。他们提供了一种方式把CAS的功能扩展到了任何Java类中声明为volatitle的域上。(灵活,但语义较弱,因为对象的volatitle可能被非atomic的其他方式被修改)

[java] view plain copy print ?
  1. public class TreeNode  
  2.    private volatile TreeNode parent;  
  3. // 静态工厂方法  
  4.    private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");  
  5. public boolean compareAndSetParent(TreeNode expect, TreeNode update)  
  6.       return parentUpdater.compareAndSet(this, expect, update);  
  7.   
  8.   
public class TreeNode
   private volatile TreeNode parent;
// 静态工厂方法
   private static final AtomicReferenceFieldUpdater<TreeNode, TreeNode> parentUpdater = AtomicReferenceFieldUpdater.newUpdater(TreeNode.class,TreeNode.class,"parent");
public boolean compareAndSetParent(TreeNode expect, TreeNode update)
      return parentUpdater.compareAndSet(this, expect, update);

注:java.util.concurrent.atomic包中的Java类属于比较底层的实现,一般作为java.util.concurrent包中很多非阻塞的数据结构的实现基础。

比较多的用AtomicBoolean、AtomicInteger、AtomicLong和AtomicReference。在实现线程安全的计数器时,AtomicInteger和AtomicLong类时最佳的选择。

五、高级同步机制(比synchronized更灵活的加锁机制)

synchronized和volatile,以及wait、notify等方法抽象层次低,在程序开发中使用比较繁琐,易出错。

而多线程之间的交互来说,存在某些固定的模式,如生产者-消费者和读者-写者模式,把这些模式抽象成高层API,使用起来会非常方便。

java.util.concurrent包为多线程提供了高层的API,满足日常开发中的常见需求。

常用接口

1、Lock接口,表示一个锁方法:

a、lock(),获取所,如果无法获取所锁,会处于等待状态

b、unlock(),释放锁。(一般放在finally代码块中)

c、lockInterruptibly(),与lock()类似,但允许当前线程在等待获取锁的过程中被中断。(所以要处理InterruptedException)

d、tryLock(),以非阻塞方式获取锁,如果无法获取锁,则返回false。(tryLock()的另一个重载可以指定超时,如果指定超时,当无法获取锁,会等待而阻塞,同时线程可以被中断)

2、ReadWriteLock接口,表示两个锁,读取的共享锁和写入的排他锁。(适合常见的读者--写者场景)

ReadWriteLock接口的readLock和writeLock方法来获取对应的锁的Lock接口的实现。

在多数线程读取,少数线程写入的情况下,可以提高多线程的性能,提高使用该数据结构的吞吐量。

如果是相反的情况,较多的线程写入,则接口会降低性能。

3、ReentrantLock类和ReentrantReadWriteLock,分别为上面两个接口的实现类。

他们具有重入性:即允许一个线程多次获取同一个锁(他们会记住上次获取锁并且未释放的线程对象,和加锁的次数,getHoldCount())

同一个线程每次获取锁,加锁数+1,每次释放锁,加锁数-1,到0,则该锁被释放,可以被其他线程获取。

[java] view plain copy print ?
  1. public class LockIdGenrator  
  2. //new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期无法获得锁  
  3.    private int ReentrantLock lock = ReentrantLock();  
  4.    privafte int value = 0;  
  5.    public int getNext()  
  6.       lock.lock();      //进来就加锁,没有锁会等待  
  7.       try  
  8.          return value++;//实际操作  
  9.       finally  
  10.          lock.unlock();//释放锁  
  11.         
  12.      
  13.   
public class LockIdGenrator
//new ReentrantLock(true)是重载,使用更加公平的加锁机制,在锁被释放后,会优先给等待时间最长的线程,避免一些线程长期无法获得锁
   private int ReentrantLock lock = ReentrantLock();
   privafte int value = 0;
   public int getNext()
      lock.lock();      //进来就加锁,没有锁会等待
      try
         return value++;//实际操作
      finally
         lock.unlock();//释放锁
      
   
注:重入性减少了锁在各个线程之间的等待,例如便利一个HashMap,每次next()之前加锁,之后释放,可以保证一个线程一口气完成便利,而不会每次next()之后释放锁,然后和其他线程竞争,降低了加锁的代价, 提供了程序整体的吞吐量。(即,让一个线程一口气完成任务,再把锁传递给其他线程)。
4、Condition接口,Lock接口代替了synchronized,Condition接口替代了object的wait、nofity。

a、await(),使当前线程进入等待状态,知道被唤醒或中断。重载形式可以指定超时时间。

b、awaitNanos(),以纳秒为单位等待。

c、awaitUntil(),指定超时发生的时间点,而不是经过的时间,参数为java.util.Date。

d、awaitUninterruptibly(),前面几种会响应其他线程发出的中断请求,他会无视,直到被唤醒。

注:与Object类的wait()相同,await()会释放其所持有的锁。

e、signal()和signalAll, 相当于 notify和notifyAll

[java] view plain copy print ?
  1. Lock lock = new ReentrantLock();  
  2. Condition condition = lock.newCondition();  
  3. lock.lock();  
  4. try  
  5.    while(/*逻辑条件不满足*/)  
  6.       condition.await();     
  7.      
  8. finally  
  9.    lock.unlock();  
  10.   
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
try
   while(/*逻辑条件不满足*/)
      condition.await();   
   
finally
   lock.unlock();

六、底层同步器

多线程程序中,线程之间存在多种不同的同步方式。除了Java标准库提供的同步方式之外,程序中特有的同步方式需要由开发人员自己来实现。

常见的一种需求是 对有限个共享资源的访问,比如多台个人电脑,2台打印机,当多个线程在等待同一个资源时,从公平角度出发,会用FIFO队列。

  如果程序中的同步方式可以抽象成对有限个资源的访问,那么可以使用java.util.concurrent.locks包中的AbstractQueuedSynchronizer类和AbstractQueuedLongSynchronizer类作为实现的基础,前者用int类型的变量来维护内部状态,而后者用long类型。(可以将这个变量理解为共享资源个数)

通过getState、setState、和compareAndSetState3个方法更新内部变量的值。

AbstractQueuedSynchronizer类是abstract的,需要覆盖其中包含的部分方法,通常做法是把其作为一个Java类的内部类,外部类提供具体的同步方式,内部类则作为实现的基础。有两种模式,排他模式和共享模式,分别对应方法 tryAcquire()、tryRelease 和 tryAcquireShared、tryReleaseShared,在这些方法中,使用getState、setState、compareAndSetState3个方法来修改内部变量的值,以此来反应资源的状态。

[java] view plain copy print ?
  1. public class SimpleResourceManager  
  2.    private final InnerSynchronizer synchronizer;  
  3.    private static class InnerSynchronizer extends AbstractQueuedSynchronizer  
  4.       InnerSynchronizer(int numOfResources)  
  5.          setState(numOfResources);  
  6.         
  7.       protected int tryAcquireShared(int acquires)  
  8.          for(;;)  
  9.             int available = getState();  
  10.             int remain = available - acquires;  
  11.             if(remain <0 || comapreAndSetState(available, remain)  
  12.                return remain;  
  13.               
  14.            
  15.         
  16.       protected boolean try ReleaseShared(int releases)  
  17.          for(;;)  
  18.             int available = getState();   
  19.             int next = available + releases;   
  20.             if(compareAndSetState(available,next)  
  21.                return true;  
  22.               
  23.            
  24.         
  25.      
  26.    public SimpleResourceManager(int numOfResources)  
  27.       synchronizer = new InnerSynchronizer(numOfResources);  
  28.      
  29.    public void acquire() throws InterruptedException  
  30.       synchronizer.acquireSharedInterruptibly(1);  
  31.            
  32.    pubic void release()      
  33.       synchronizer.releaseShared(1);  
  34.       
  35.   
public class SimpleResourceManager
   private final InnerSynchronizer synchronizer;
   private static class InnerSynchronizer extends AbstractQueuedSynchronizer
      InnerSynchronizer(int numOfResources)
         setState(numOfResources);
      
      protected int tryAcquireShared(int acquires)
         for(;;)
            int available = getState();
            int remain = available - acquires;
            if(remain <0 || comapreAndSetState(available, remain)
               return remain;
            
         
      
      protected boolean try ReleaseShared(int releases)
         for(;;)
            int available = getState(); 
            int next = available + releases; 
            if(compareAndSetState(available,next)
               return true;
            
         
      
   
   public SimpleResourceManager(int numOfResources)
      synchronizer = new InnerSynchronizer(numOfResources);
   
   public void acquire() throws InterruptedException
      synchronizer.acquireSharedInterruptibly(1);
         
   pubic void release()    
      synchronizer.releaseShared(1);
    

七、高级同步对象(提高开发效率)

atomic和locks包提供的Java类可以满足基本的互斥和同步访问的需求,但这些Java类的抽象层次较低,使用比较复杂。

更简单的做法是使用java.util.concurrent包中的高级同步对象。

1、信号量。

信号量一般用来数量有限的资源,每类资源有一个对象的信号量,信号量的值表示资源的可用数量。

在使用资源时,需要从该信号量上获取许可,成功获取许可,资源的可用数-1;完成对资源的使用,释放许可,资源可用数+1; 当资源数为0时,需要获取资源的线程以阻塞的方式来等待资源,或过段时间之后再来检查资源是否可用。(上面的SimpleResourceManager类实际上时信号量的一个简单实现)

java.util.concurrent.Semaphore类,在创建Semaphore类的对象时指定资源的可用数

a、acquire(),以阻塞方式获取许可

b、tryAcquire(),以非阻塞方式获取许可

c、release(),释放许可。

d、accquireUninterruptibly(),accquire()方法获取许可以的过程可以被中断,如果不希望被中断,使用此方法。

[java] view plain copy print ?
  1. public class PrinterManager  
  2.    private final Semphore semaphore;  
  3.    private final List<Printer> printers = new ArrayList<>():  
  4.    public PrinterManager(Collection<? extends Printer> printers)  
  5.       this.printers.addAll(printers);  
  6.       //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿  
  7.       this.semaphore = new Semaphore(this.printers.size(),true);  
  8.      
  9.    public Printer acquirePrinter() throws InterruptedException  
  10.       semaphore.acquire();  
  11.       return getAvailablePrinter();  
  12.      
  13.    public void releasePrinter(Printer printer)  
  14.       putBackPrinter(pinter);  
  15.       semaphore.release();  
  16.      
  17.    private synchronized Printer getAvailablePrinter()  
  18.       printer result = printers.get(0);  
  19.       printers.remove(0);  
  20.       return result;  
  21.      
  22.    private synchronized void putBackPrinter(Printer printer)  
  23.       printers.add(printer);  
  24.      
  25.   
public class PrinterManager
   private final Semphore semaphore;
   private final List<Printer> printers = new ArrayList<>():
   public PrinterManager(Collection<? extends Printer> printers)
      this.printers.addAll(printers);
      //这里重载方法,第二个参数为true,以公平竞争模式,防止线程饥饿
      this.semaphore = new Semaphore(this.printers.size(),true);
   
   public Printer acquirePrinter() throws InterruptedException
      semaphore.acquire();
      return getAvailablePrinter();
   
   public void releasePrinter(Printer printer)
      putBackPrinter(pinter);
      semaphore.release();
   
   private synchronized Printer getAvailablePrinter()
      printer result = printers.get(0);
      printers.remove(0);
      return result;
   
   private synchronized void putBackPrinter(Printer printer)
      printers.add(printer);
   
2、倒数闸门

多线程协作时,一个线程等待另外的线程完成任务才能继续进行。

java.util.concurrent.CountDownLatch类,创建该类时,指定等待完成的任务数;当一个任务完成,调用countDonw(),任务数-1。等待任务完成的线程通过await(),进入阻塞状态,直到任务数量为0。CountDownLatch类为一次性,一旦任务数为0,再调用await()不再阻塞当前线程,直接返回。

例:

[java] view plain copy print ?
  1. public class PageSizeSorter  
  2.    // 并发性能远远优于HashTable的 Map实现,hashTable做任何操作都需要获得锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不同线程访问不同的数据段,完全不受影响,忘记HashTable吧。  
  3.    private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();  
  4.    private static class GetSizeWorker implements Runnable  
  5.       private final String urlString;  
  6.       public GetSizeWorker(String urlString , CountDownLatch signal)  
  7.          this.urlString = urlStirng;  
  8.          this.signal = signal;  
  9.         
  10.       public void run()  
  11.          try  
  12.             InputStream is = new URL(urlString).openStream();  
  13.             int size = IOUtils.toByteArray(is).length;  
  14.             sizeMap.put(urlString, size);  
  15.          catch(IOException e)  
  16.             sizeMap.put(urlString, -1);  
  17.          finally  
  18.             signal.countDown()://完成一个任务 , 任务数-1  
  19.            
  20.         
  21.      
  22.    private void sort()  
  23.       List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());  
  24.       Collections.slort(list, new Comparator<Entry<String,Integer>>()  
  25.          public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2)  
  26.             return Integer.compare(o2.getValue(),o1.getValue());  
  27.       ;  
  28.       System.out.println(Arrays.deepToString(list.toArray()));  
  29.      
  30.    public void sortPageSize(Collection<String> urls) throws InterruptedException  
  31.       CountDownLatch sortSignal = new CountDownLatch(urls.size());  
  32.       for(String url: urls)  
  33.          new Thread(new GetSizeWorker(url, sortSignal)).start();  
  34.         
  35.       sortSignal.await()://主线程在这里等待,任务数归0,则继续执行  
  36.       sort();  
  37.      
  38.   
public class PageSizeSorter
   // 并发性能远远优于HashTable的 Map实现,hashTable做任何操作都需要获得锁,同一时间只有有个线程能使用,而ConcurrentHashMap是分段加锁,不同线程访问不同的数据段,完全不受影响,忘记HashTable吧。
   private static final ConcurrentHashMap<String , Interger> sizeMap = new ConcurrentHashMap<>();
   private static class GetSizeWorker implements Runnable
      private final String urlString;
      public GetSizeWorker(String urlString , CountDownLatch signal)
         this.urlString = urlStirng;
         this.signal = signal;
      
      public void run()
         try
            InputStream is = new URL(urlString).openStream();
            int size = IOUtils.toByteArray(is).length;
            sizeMap.put(urlString, size);
         catch(IOException e)
            sizeMap.put(urlString, -1);
         finally
            signal.countDown()://完成一个任务 , 任务数-1
         
      
   
   private void sort()
      List<Entry<String, Integer> list = new ArrayList<sizeMap.entrySet());
      Collections.slort(list, new Comparator<Entry<String,Integer>>()
         public int compare (Entry<String, Integer> o1, Entry<Sting , Integer> o2)
            return Integer.compare(o2.getValue(),o1.getValue());
      ;
      System.out.println(Arrays.deepToString(list.toArray()));
   
   public void sortPageSize(Collection<String> urls) throws InterruptedException
      CountDownLatch sortSignal = new CountDownLatch(urls.size());
      for(String url: urls)
         new Thread(new GetSizeWorker(url, sortSignal)).start();
      
      sortSignal.await()://主线程在这里等待,任务数归0,则继续执行
      sort();
   

3、循环屏障

循环屏障在作用上类似倒数闸门,不过他不像倒数闸门是一次性的,可以循环使用。另外,线程之间是互相平等的,彼此都需要等待对方完成,当一个线程完成自己的任务之后,等待其他线程完成。当所有线程都完成任务之后,所有线程才可以继续运行。

当线程之间需要再次进行互相等待时,可以复用同一个循环屏障。

类java.uti.concurrent.CyclicBarrier用来表示循环屏障,创建时指定使用该对象的线程数目,还可以指定一个Runnable接口的对象作为每次循环后执行的动作。(当最后一个线程完成任务之后,所有线程继续执行之前,被执行。如果线程之间需要更新一些共享的内部状态,可以利用这个Runnalbe接口的对象来处理)。

每个线程任务完成之后,通过调用await方法进行等待,当所有线程都调用await方法之后,处于等待状态的线程都可以继续执行。在所有线程中,只要有一个在等待中被中断,超时或是其他错误,整个循环屏障会失败,所有等待中的其他线程抛出java.uti.concurrent.BrokenBarrierException。

例:每个线程负责找一个数字区间的质数,当所有线程完成后,如果质数数目不够,继续扩大范围查找

[java] view plain copy print ?
  1. public class PrimeNumber  
  2.    private static final int TOTAL_COUTN = 5000;  
  3.    private static final int RANGE_LENGTH= 200;  
  4.    private static final int WORKER_NUMBER = 5;  
  5.    private static volatitle boolean done = false;  
  6.    private static int rangeCount = 0;  
  7.    private static final List<Long> results = new ArrayList<Long>():  
  8.    private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable()  
  9.       public void run()  
  10.          if(results.size() >= TOTAL_COUNT)  
  11.             done = true;  
  12.            
  13.        
  14.    );  
  15.    private static class PrimeFinder implements Runnable  
  16.       public void run()  
  17.          while(!done)// 整个过程在一个 while循环下,await()等待,下次循环开始,会再次判断 执行条件  
  18.             int range = getNextRange();  
  19.             long start = rang * RANGE_LENGTH;  
  20.             long end = (range + 1) * RANGE_LENGTH;  
  21.             for(long i = start; i<end;i++)  
  22.                if(isPrime(i))  
  23.                   updateResult(i);  
  24.                  
  25.               
  26.             try  
  27.                barrier.await();  
  28.             catch (InterruptedException | BokenBarrierException e)  
  29.                done =  true;  
  30.               
  31.            
  32.         
  33.      
  34.    private synchronized static void updateResult(long value)  
  35.       results.add(value);  
  36.      
  37.    private synchronized static int getNextRange()  
  38.       return rangeCount++;  
  39.      
  40.    private static boolean isPrime(long number)  
  41.       //找质数的代码  
  42.      
  43.    public void calculate()  
  44.       for(int i=0;i<WORKER_NUMBER;i++)  
  45.          new Thread(new PrimeFinder()).start();  
  46.         
  47.       while(!done)  
  48.   
  49.         
  50.       //计算完成  
  51.      
  52.   
public class PrimeNumber
   private static final int TOTAL_COUTN = 5000;
   private static final int RANGE_LENGTH= 200;
   private static final int WORKER_NUMBER = 5;
   private static volatitle boolean done = false;
   private static int rangeCount = 0;
   private static final List<Long> results = new ArrayList<Long>():
   private static final CyclicBarrier barrier = new CyclicBarrier(WORKER_NUMBER, new Runnable()
      public void run()
         if(results.size() >= TOTAL_COUNT)
            done = true;
         
     
   );
   private static class Prim

以上是关于线程同步 各个关键字和方法的使用的主要内容,如果未能解决你的问题,请参考以下文章

AQS同步组件-CyclicBarrier(循环屏障)解析和用例

QNX多线程同步之Barrier(屏障)

java CyclicBarrier同步屏障

屏障同步(输出、增量和等待)

java并发之同步辅助类

Linux 线程同步都有哪些方法?