Java并发编程-Semaphore

Posted ling

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java并发编程-Semaphore相关的知识,希望对你有一定的参考价值。

  基于AQS的前世今生,来学习并发工具类Semaphore。本文将从Semaphore的应用场景、源码原理解析来学习这个并发工具类。

1、 应用场景

  Semaphore用来控制同时访问某个特定资源的操作数量,或者同时执行某个指定操作的数量。还可以用来实现某种资源池限制,或者对容器施加边界。

1.1   当成锁使用

  控制同时访问某个特定资源的操作数量,代码如下:

public class SemaphoreLock {
    public static void main(String[] args) {
        //1、信号量为1时 相当于普通的锁  信号量大于1时 共享锁
        Output o = new Output();
        for (int i = 0; i < 5; i++) {
            new Thread(() -> o.output()).start();
        }
    }
}
class Output {
    Semaphore semaphore = new Semaphore(1);

    public void output() {
        try {
            semaphore.acquire();
            System.out.println(Thread.currentThread().getName() + " start at " + System.currentTimeMillis());
            Thread.sleep(1000);
            System.out.println(Thread.currentThread().getName() + " stop at " + System.currentTimeMillis());
        }catch(Exception e) {
            e.printStackTrace();
        }finally {
            semaphore.release();
        }
    }
}

 

1.2   线程通信信号

  线程间通信,代码如下:

public class SemaphoreCommunication {
    public static void main(String[] args) {
        //2、线程间进行通信
        Semaphore semaphore = new Semaphore(1);
        new SendingThread(semaphore,"SendingThread");
        new ReceivingThread(semaphore,"ReceivingThread");
    }
}
class SendingThread extends Thread {
    Semaphore semaphore;
    String name;

    public SendingThread(Semaphore semaphore,String name) {
        this.semaphore = semaphore;
        this.name = name;
        new Thread(this).start();
    }

    public void run() {
        try {
            semaphore.acquire();
            for (int i = 0; i < 5; i++) {
                System.out.println(name + ":" + i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        semaphore.release();
    }
}

class ReceivingThread extends Thread {
    Semaphore semaphore;
    String name;

    public ReceivingThread(Semaphore semaphore,String name) {
        this.semaphore = semaphore;
        this.name = name;
        new Thread(this).start();
    }

    public void run() {
        try {
            semaphore.acquire();
            for (int i = 0; i < 5; i++) {
                System.out.println(name + ":" + i);
                Thread.sleep(1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        semaphore.release();
    }
}

 

1.3   资源池限制

  对资源池进行资源限制,代码如下:

public class SemaphoreConnect {
    public static void main(String[] args) throws Exception {
        //3、模拟连接池数量限制
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 200; i++) {
            executorService.submit(new Runnable() {
                @Override
                public void run() {
                    Connection.getInstance().connect();
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.DAYS);
    }
}
class Connection {
    private static Connection instance = new Connection();
    private Semaphore semaphores = new Semaphore(10,true);
    private int connections = 0;

    private Connection() {
    }

    public static Connection getInstance() {
        return instance;
    }

    public void connect() {
        try {
            semaphores.acquire();
            doConnect();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            semaphores.release();
        }
    }

    private void doConnect() {
        synchronized (this) {
            connections ++;
            System.out.println("current get connections is : " + connections);
        }

        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        synchronized (this) {
            connections --;
            System.out.println("after release current  connections is : " + connections);
        }
    }
}

 

1.4  容器边界限制

  对容器进行边界限制,代码如下:

public class SemaphoreBoundedList {
    public static void main(String[] args) {
        //4、容器边界限制
        final BoundedList ba = new BoundedList(5);
        Runnable runnable1 = new Runnable() {
                public void run() {
                    try {
                        ba.add("John");
                        ba.add("Martin");
                        ba.add("Adam");
                        ba.add("Prince");
                        ba.add("Tod");
                        System.out.println("Available Permits : " + ba.getSemaphore().availablePermits());
                        ba.add("Tony");
                        System.out.println("Final list: " + ba.getArrayList());
                    }catch (InterruptedException ie) {
                        Thread.interrupted();
                    }
                }
        };
        Runnable runnable2 = new Runnable() {
            public void run() {
                try {
                    System.out.println("Before removing elements: "+ ba.getArrayList());
                    Thread.sleep(5000);
                    ba.remove("Martin");
                    ba.remove("Adam");
                }catch (InterruptedException ie) {
                    Thread.interrupted();
                }
            }
        };
        Thread thread1 = new Thread(runnable1);
        Thread thread2 = new Thread(runnable2);
        thread1.start();
        thread2.start();
    }
}
class BoundedList<T> {
    private final Semaphore semaphore;
    private List arrayList;

    BoundedList(int limit) {
        this.arrayList = Collections.synchronizedList(new ArrayList());
        this.semaphore = new Semaphore(limit);
    }


    public boolean add(T t) throws InterruptedException {
        boolean added = false;
        semaphore.acquire();
        try {
            added = arrayList.add(t);
            return added;
        } finally {
            if (!added)
                semaphore.release();
        }

    }


    public boolean remove(T t) {
        boolean wasRemoved = arrayList.remove(t);
        if (wasRemoved)
            semaphore.release();
        return wasRemoved;
    }

    public void remove(int index) {
        arrayList.remove(index);
        semaphore.release();
    }

    public List getArrayList() {
        return arrayList;
    }


    public Semaphore getSemaphore() {
        return semaphore;
    }
}

 

2、 源码原理解析

2.1 获取信号

  获取信号的方法如下:

public void acquire() throws InterruptedException {
   sync.acquireSharedInterruptibly(1);//共享式获取AQS的同步状态
}

  调用的是AQS的acquireSharedInterruptibly方法:

public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//线程中断 说明信号量对线程中断敏感
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0) //获取信号量失败 线程进入同步队列自旋等待
            doAcquireSharedInterruptibly(arg);
    }

  其中tryAcquireShared依赖的是Sync的实现,Sync提供了公平和非公平式的方式,先看非公平式。

protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);
        }
final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();//同步状态 当前的信号量许可数
                int remaining = available - acquires;//减去释放的信号量 剩余信号量许可数
                if (remaining < 0 ||//剩余信号量小于0 直接返回remaining 不做CAS
                    compareAndSetState(available, remaining))//CAS更新
                    return remaining;
            }
        }

  再看下公平式的。

protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())//判断同步队列如果存在前置节点 获取信号量失败  其他和非公平式是一致的
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

  最后来看下,如果未获取到信号量的处理方法doAcquireSharedInterruptibly。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//线程进入同步队列
        boolean failed = true;
        try {
            for (;;) {//自旋
                final Node p = node.predecessor();
                if (p == head) {//当前节点的前置节点是AQS的头节点 即自己是AQS同步队列的第一个节点
                    int r = tryAcquireShared(arg); //再去获取信号量
                    if (r >= 0) {//获取成功
                        setHeadAndPropagate(node, r);//退出自旋
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node); //获取失败 就取消获取
        }
    }

2.2 释放信号

  释放信号的方法如下:

public void release() {
        sync.releaseShared(1);
    }

  调用的是AQS的releaseShared方法:

public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//释放信号量
            doReleaseShared();//唤醒后续的线程节点
            return true;
        }
        return false;
}

  tryReleaseShared交由子类Sync实现,代码如下:

protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();//当前信号量许可数
                int next = current + releases; //当前信号量许可数+释放的信号量许可数
                if (next < current) // overflow 这个分支我看着永远走不进来呢
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))//CAS更新当前信号量许可数
                    return true;
            }
        }

  释放成功后,则继续调用doReleaseShared,唤醒后续线程节点可以来争取信号量了。

private void doReleaseShared() {
        for (;;) {
            Node h = head; //头节点
            if (h != null && h != tail) {//同步队列中存在线程等待
                int ws = h.waitStatus; //头节点线程状态
                if (ws == Node.SIGNAL) {//头节点线程状态为SIGNAL 唤醒后续线程节点
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h); //唤醒下个节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

  总结:Semaphore使用AQS同步状态来保存信号量的当前计数。它里面定义的acquireSharedInterruptibly方法会减少计数,当计数为非正值时阻塞线程,releaseShared方法会增加计数,在计数不超过信号量限制时要解除线程的阻塞。

 

参考资料:

https://github.com/lingjiango/ConcurrentProgramPractice

https://www.caveofprogramming.com/java-multithreading/java-multithreading-semaphores-part-12.html

https://java2blog.com/java-semaphore-example/

http://tutorials.jenkov.com/java-util-concurrent/semaphore.html

以上是关于Java并发编程-Semaphore的主要内容,如果未能解决你的问题,请参考以下文章

Java并发编程-Semaphore

并发编程Semaphore详解

Java并发多线程编程——并发工具类Semaphore(信号量)

Java并发编程:CountDownLatchCyclicBarrier和Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore

Java并发编程:CountDownLatchCyclicBarrier和 Semaphore