线程基础知识系列(四)线程的同步2 线程通信和Condition变量

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了线程基础知识系列(四)线程的同步2 线程通信和Condition变量相关的知识,希望对你有一定的参考价值。

本文是系列的第四篇。

线程基础知识系列(三)线程的同步  :同步控制,锁及synchronized

线程基础知识系列(二)线程的管理 :线程的状态,控制,休眠,Interrupt,yield等

线程基础知识系列(一)线程的创建和启动  :线程的创建和启动,join(),daemon线程,Callable任务。


第三篇文章,重点阐述了如何使用锁和同步块对线程间共享可变变量保护,保证只有一个线程可以进入临界区。其实,没有过多的涉及另一个重要的同步概念:线程协作。第三篇中涉及的线程间并没有有效的协调。本篇重点阐述线程间的协调工作,具体包含以下主题。

  1. Wait-Notify机制

  2. 认识Condition

  3. 应用Condition


1.Wait-Notify机制


相关Object API.,一般使用在线程间通信

public class Object {

public final native void wait(long timeout) throws InterruptedException;
public final void wait(long timeout, int nanos) throws InterruptedException {
    ...
    wait(timeout);
}
public final void wait() throws InterruptedException {
    wait(0);
}

public final native void notifyAll();
public final native void notify();
}

  1)wait()、notify()和notifyAll()方法是本地方法,并且为final方法,无法被重写。

  2)调用某个对象的wait()方法能让当前线程阻塞,并且当前线程必须拥有此对象的monitor(即锁)

  3)调用某个对象的notify()方法能够唤醒一个正在等待这个对象的monitor的线程,如果有多个线程都在等待这个对象的monitor,则只能唤醒其中一个线程;

  4)调用notifyAll()方法能够唤醒所有正在等待这个对象的monitor的线程;


所谓的线程通信,也就是Wait-Notify场景。一般与以下内容相关:

1、状态变量(State Variable)也就是共享可变变量。
当线程需要wait的时候,总是因为一些状态不满足导致的。如往BlockingQueue里加元素队列已满的时候。这里的状态变量就是指“队列元素数目”。

2、条件断言(Condition Predicate)
当线程确定是否进入wait或者从notify中醒来的时候是否继续往下执行,大都要测试状态条件是否满足,如往BlockingQueue里加元素队列已满,于是阻塞,后续其它线程从队列里取走了元素,就通知在等待的线程“队列不是满的了,可以往里加东西了”,这时候在等待的线程就会醒来,然后看看是不是真的队列不为满的状态,如果是,就将元素添加进去,如果不是,就继续等待。这儿的条件断言,就是指“队列中持有元素数目是否等于队列长度。

3、内置条件队列(Condition Queue)
每个对象都有一个内置的条件队列,当一个线程在该对象是调用wait的时候,就会将该线程加入该对象的条件队列。具体可以查看<深入JVM锁机制之一:synchronized>文章,了解下synchronized底层原理。

文章截图

技术分享

简单强调下:

必须在临界区中调用wait,notify,notifyAll方法。否则报java.lang.IllegalMonitorStateException异常。

如果持有其他锁,调用obj.wait,notify方法,也是不可以的。

错误情形1

package com.ticmy.concurrency;
public class TestWatiNotifyMechanism {
    private static Object obj = new Object();
    public static void main(String[] args) throws Exception {
        //错误代码,无意义,仅测试用
        obj.wait();
        //obj.notify();
    }
}

错误情形2

package com.ticmy.concurrency;
public class TestWatiNotifyMechanism {
    private static Object obj = new Object();
    public static void main(String[] args) throws Exception {
        //错误代码,无意义,仅测试用
        synchronized(TestWatiNotifyMechanism.class) {
            obj.wait();
            //obj.notify();
        }
    }
}


为了更好的说明wait-notify的含义。参考《Java并发编程实战》中的例子

1.BaseBoundedBuffer.jar 有界缓存基类

package net.jcip.examples;

import net.jcip.annotations.*;

/**
 * BaseBoundedBuffer
 * <p/>
 * Base class for bounded buffer implementations
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
public abstract class BaseBoundedBuffer <V> {
    @GuardedBy("this") private final V[] buf;
    @GuardedBy("this") private int tail;
    @GuardedBy("this") private int head;
    @GuardedBy("this") private int count;

    protected BaseBoundedBuffer(int capacity) {
        this.buf = (V[]) new Object[capacity];
    }

    protected synchronized final void doPut(V v) {
        buf[tail] = v;
        if (++tail == buf.length)
            tail = 0;
        ++count;
    }

    protected synchronized final V doTake() {
        V v = buf[head];
        buf[head] = null;
        if (++head == buf.length)
            head = 0;
        --count;
        return v;
    }

    public synchronized final boolean isFull() {
        return count == buf.length;
    }

    public synchronized final boolean isEmpty() {
        return count == 0;
    }
}

2.BoundedBuffer.java ,有界缓存实现

package net.jcip.examples;

import net.jcip.annotations.*;

/**
 * BoundedBuffer
 * <p/>
 * Bounded buffer using condition queues
 *
 * @author Brian Goetz and Tim Peierls
 */
@ThreadSafe
        public class BoundedBuffer <V> extends BaseBoundedBuffer<V> {
    // CONDITION PREDICATE: not-full (!isFull())
    // CONDITION PREDICATE: not-empty (!isEmpty())
    public BoundedBuffer() {
        this(100);
    }

    public BoundedBuffer(int size) {
        super(size);
    }

    // BLOCKS-UNTIL: not-full
    public synchronized void put(V v) throws InterruptedException {
        while (isFull())
            wait();
        doPut(v);
        notifyAll();
    }

    // BLOCKS-UNTIL: not-empty
    public synchronized V take() throws InterruptedException {
        while (isEmpty())
            wait();
        V v = doTake();
        notifyAll();
        return v;
    }

    // BLOCKS-UNTIL: not-full
    // Alternate form of put() using conditional notification
    public synchronized void alternatePut(V v) throws InterruptedException {
        while (isFull())
            wait();
        boolean wasEmpty = isEmpty();
        doPut(v);
        if (wasEmpty)
            notifyAll();
    }
}

由于逻辑简单,不多说了。 可以想一下为什么使用notifyAll().从中,可以对号入座,

状态变量在哪?条件断言在哪?条件队列,因为使用synchronized保证的同步,所以是JVM底层实现的。如果你有心的话,应该注意一个情况:wait操作,都出现在循环中

为什么要在循环中wait?有以下几个原因。
1、一个对象锁可能用于保护多个状态变量,当它们都需要wait-notify操作时,如果不将wait放到while中就有问题。例如,某对象锁obj保护两种状态变量a和b,当a的条件断言不成立时发生了wait操作,当b的条件断言不成立时也发生了wait操作,两个线程被加入到obj对应的条件队列中,现在若改变状态变量a的某操作发生,在obj上调用了notifyAll操作,obj对应的条件队列里的所有线程均被唤醒,之前等待a的某个或几个线程去判断a的条件断言可能成立了,但b对应的条件断言肯定仍不成立,而此时等待b的线程也被唤醒了,所以需要循环判断b的条件断言是否满足,如果不满足,继续wait。
2、多个线程wait的同一个状态的条件断言。如BlockingQueue场景下,当前队列是空的,多个线程要从里面取元素,于是都wait了。此时另一个线程往里面添加了一个元素,调用了notifyAll操作,唤醒了所有线程,但只有一个线程能拿到那个新加进来的元素,继续走下去,其它的仍需等待。
3、虚假唤醒。在没有被通知、中断或超时的情况下,线程自动苏醒了。虽然这种情况在实践中很少发生,但是必须通过循环检测条件是否满足的方式来防止其发生,如果不满足该条件,则继续等待。


2.认识Condition

Condition是在java 1.5中才出现的,它用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition1的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition,在阻塞队列那一篇博文中就讲述到了,阻塞队列实际上是使用了Condition来模拟线程间协作。
    Condition是个接口,基本的方法就是await()和signal()方法;
    Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
    调用Condition的await()和signal()方法,都必须在lock保护之内
 Conditon中的await()对应Object的wait();
 Condition中的signal()对应Object的notify();
 Condition中的signalAll()对应Object的notifyAll()。

强调:这里的Condition可以对应条件断言(Condition Predicate),一并理解。需要说到的是,它与内置锁、内置条件队列的区别

1、内置锁对象只有一个条件队列,而显式锁可以通过newCondition方法创建多个条件队列,这样就可以避免不同的条件断言关联同一个条件队列造成的问题。
2、如同Lock比内置锁更灵活一样,显式的条件队列也提供了更多的方法供调用(如等待的时候不可被中断的awaitUninterruptibly方法),更多方法参见java.util.concurrent.locks.Condition的JAVA API。
3、Condition也有wait、notify方法,它们从Object类继承而来,一般实际中不会调用这些方法(要调用这些方法必须持有Condition对象的锁,而不是Lock的锁定)以避免混淆。
4、Condition可以继承Lock的公平策略。如new ReentrantLock的时候传入的公平策略参数。当公平策略为true的时候,signal的时候,Condition中的线程唤醒顺序是FIFO的。

至于是选择显式的条件队列还是内置的,如同内置锁和Lock一样,取决于应用是否需要使用内置条件队列无法提供而显式条件队列提供了的特性。如果已使用了Lock,那么使用Condition是自然而然的事情。

官方API中提供的例子,如下

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class BoundedBuffer<T> {
     final Lock lock = new ReentrantLock();
     final Condition notFull  = lock.newCondition();
     final Condition notEmpty = lock.newCondition();
     final T[] items =(T[]) new Object[10];
     int putptr, takeptr, count;
     public void put(T x) throws InterruptedException {
       lock.lock();
       try {
         while (count == items.length) {
             notFull.await();
         }
         items[putptr] = x;
         if (++putptr == items.length) putptr = 0;
         ++count;
         notEmpty.signal();
       } finally {
         lock.unlock();
       }
     }
  
     public T take() throws InterruptedException {
       lock.lock();
       try {
         while (count == 0) {
             notEmpty.await();
         }
         T x = items[takeptr];
         if (++takeptr == items.length) takeptr = 0;
         --count;
         notFull.signal();
         return x;
       } finally {
         lock.unlock();
       }
     }
   }

可以和上文中的BoundedBuffer对比阅读。

两者的最大不同:除了同步形式不同,更主要的是方法一,涉及到竞态的地方都用了synchronized,这样做,虽然保证了线程安全,但降低了并发。


3.应用Condition

Condition到底有啥用,相信有人和我一样,感到很困惑。不妨做个例子,使用Lock,但不借助Condition的情况,是什么样的。为了更好说明Condition的使用场景,例子最好要有多个竞态条件。

下面就使用Condition来完成一个线程面试题:有三个线程分别打印A、B、C,请用多线程编程实现,在屏幕上循环打印10次ABCABC…

3.1不使用Condition版本

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ConditionExample {
    private static   Object lock = new Object();
    /** 当前线程的名字 */
    private static volatile char currentThreadName = ‘A‘;
    private static class ThreadA implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘A‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                    /*
                     * 打印出第几遍以及A信息
                     */
                        System.out.println(String.format("第%d遍", i + 1));
                        System.out.println("A");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘B‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    private static class ThreadB implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘B‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                        System.out.println("B");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘C‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    private static class ThreadC implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                synchronized (lock){
                    {
                        while (currentThreadName != ‘C‘) {
                            try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                                lock.wait();
                            } catch (InterruptedException e) {

                            }
                        }
                        System.out.println("C");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                        currentThreadName = ‘A‘;
                        lock.notifyAll();
                    }
                }
            }
        }

    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(3);
        service.execute(new ThreadA());
        service.execute(new ThreadB());
        service.execute(new ThreadC());
//        new Thread(new Task("C")).start();
    }
}

3.2 使用Condition的例子(代码来自http://mouselearnjava.iteye.com/blog/1948437)

package my.thread.test;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;

/**
 * 题目:有三个线程分别打印A、B、C,请用多线程编程实现,在屏幕上循环打印10次ABCABC…
 * 
 * 本程序采用Lock和Condition来实现。
 * 
 * @author Eric
 * 
 */
public class ConditionExample {

    private Lock lock = new ReentrantLock();

    private Condition conditionA = lock.newCondition();
    private Condition conditionB = lock.newCondition();
    private Condition conditionC = lock.newCondition();

    /** 当前线程的名字 */
    private char currentThreadName = ‘A‘;

    private static final Logger logger = Logger
            .getLogger("my.thread.test.OrderPrintTest");

    public static void main(String[] args) {

        ConditionExample ce = new ConditionExample();

        ExecutorService service = Executors.newFixedThreadPool(3);
        service.execute(ce.new ThreadA());
        service.execute(ce.new ThreadB());
        service.execute(ce.new ThreadC());

        service.shutdown();
    }

    private class ThreadA implements Runnable {
        public void run() {

            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘A‘) {
                        try {
                            /*
                             * 如果当前线程名字不是A,那么ThreadA就处理等待状态
                             */
                            conditionA.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印出第几遍以及A信息
                     */
                    System.out.println(String.format("第%d遍", i + 1));
                    System.out.println("A");

                    /*
                     * 将当前线程名置为B, 然后通知ThreadB执行
                     */
                    currentThreadName = ‘B‘;
                    conditionB.signal();

                } finally {
                    lock.unlock();
                }
            }
        }

    }

    private class ThreadB implements Runnable {
        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘B‘) {
                        try {
                            /*
                             * 如果当前线程名字不是B,那么ThreadB就处理等待状态
                             */
                            conditionB.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印信息B
                     */
                    System.out.println("B");

                    /*
                     * 将当前线程值置为C 并通过ThreadC来执行
                     */
                    currentThreadName = ‘C‘;
                    conditionC.signal();

                } finally {
                    lock.unlock();
                }
            }

        }

    }

    private class ThreadC implements Runnable {

        public void run() {
            for (int i = 0; i < 10; i++) {
                lock.lock();
                try {
                    while (currentThreadName != ‘C‘) {
                        try {
                            /*
                             * 如果当前线程名字不是C,那么ThreadC就处理等待状态
                             */
                            conditionC.await();
                        } catch (InterruptedException e) {
                            logger.severe(e.getLocalizedMessage());
                        }
                    }

                    /*
                     * 打印信息C
                     */
                    System.out.println("C");
                    System.out.println();

                    /*
                     * 将当前线程值置为A 并通过ThreadA来执行
                     */
                    currentThreadName = ‘A‘;
                    conditionA.signal();

                } finally {
                    lock.unlock();
                }

            }
        }
    }
}


个人认为:使用Condition:一个Condition,对应一个竞态条件,实现起来很简单,可以实现针对性通知。额外Condition提供了一些额外功能,如awaitUninterruptibly

中文API:http://www.cjsdn.net/Doc/JDK60/java/util/concurrent/locks/Condition.html

资源

http://www.ticmy.com/?p=219

http://www.cnblogs.com/dolphin0520/p/3920385.html

本文出自 “简单” 博客,请务必保留此出处http://dba10g.blog.51cto.com/764602/1794785

以上是关于线程基础知识系列(四)线程的同步2 线程通信和Condition变量的主要内容,如果未能解决你的问题,请参考以下文章

线程基础知识系列(三)线程的同步

JavaSE基础(十 一 )--<线程>线程同步,死锁,Lock锁,线程通信,生产消费问题,新增的线程创建方式

JAVA开发知识之Java的线程

Java 多线程基础

线程间的通信同步方式与进程间通信方式

java多线程系列:CountDownLatch