Java源码Concurrent包剩余部分类

Posted 低调的洋仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java源码Concurrent包剩余部分类相关的知识,希望对你有一定的参考价值。

FutureTask

 

    /**
     * Simple linked list nodes to record waiting threads in a Treiber
     * stack.  See other classes such as Phaser and SynchronousQueue
     * for more detailed explanation.
     */
    static final class WaitNode 
        volatile Thread thread;
        volatile WaitNode next;
        WaitNode()  thread = Thread.currentThread(); 
    

 

本质上就是状态和线程运行的控制

运行的时候设置状态是NEW然后启动后计算出来结果然后设置其结果值并设置他的状态是COMPLETING状态,然后调用方法来通知该线程已经完成了任务了,然后直接激活该线程,然后可以返回最终的结果了。

 

里面的每个节点是一个单链表的形式。这个链表用于存储等待执行结果的线程的。

调用了run方法,基本上就是运行要运行的Callable,然后得到运行的结果,如果这个结果是正常的就设置其值为result调用结束运行通知线程拿到数据。

 

 // 启动运行
    public void run() 
    	// 检查是不是NEW状态如果是NEW的话再检查runnerOffset这个地方能不能设置当前的线程
    	// 如果都有一个满足就直接退出了,return了
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try 
        	// 赋值callable
            Callable<V> c = callable;
            // 检查状态
            if (c != null && state == NEW) 
                V result;
                boolean ran;
                // 设置结果和运行状态
                try 
                    result = c.call();
                    ran = true;
                 catch (Throwable ex) 
                	// 异常状态
                    result = null;
                    ran = false;
                    setException(ex);
                
                // 如果正常运行了,得到的结果设置
                if (ran)
                    set(result);
            
         finally 
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        
    

调用set方法,调用的时候会调用finishCompletion方法。

 

 

    /**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the @link #run method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) 
    	// 将原来设置进去的线程的状态设置为完成中状态
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) 
        	// 设置结果等
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        
    

完成运行

 

unpark所有等待的节点

 

    /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     * 完成运行
     * unpark所有等待节点
     */
    private void finishCompletion() 
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) 
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) 
                for (;;) 
                    Thread t = q.thread;
                    if (t != null) 
                        q.thread = null;
                        LockSupport.unpark(t);
                    
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                
                break;
            
        

        done();

        callable = null;        // to reduce footprint
    


get的时候会发生阻塞

 

 

    public V get() throws InterruptedException, ExecutionException 
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    

当状态的值小于COMPLETING的时候,在不断的尝试后进行park住当前线程。
唤醒时的状态是COMPLETING状态。

    /**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException 
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) 
            if (Thread.interrupted()) 
                removeWaiter(q);
                throw new InterruptedException();
            

            int s = state;
            if (s > COMPLETING) 
                if (q != null)
                    q.thread = null;
                return s;
            
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) 
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) 
                    removeWaiter(q);
                    return state;
                
                LockSupport.parkNanos(this, nanos);
            
            else
                LockSupport.park(this);
        
    

 

 

唤醒后检查状态值即可退出了。然后调用report方法,得到最终的数据,因为在上面的计算结果时就已经将结果存储到outcome中了,这里只是取出来就好了。

 

    private V report(int s) throws ExecutionException 
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    


状态变迁以及其属性值。

 

 

     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

    /** The underlying callable; nulled out after running */
    private Callable<V> callable;
    /** The result to return or exception to throw from get() */
    private Object outcome; // non-volatile, protected by state reads/writes
    /** The thread running the callable; CASed during run() */
    private volatile Thread runner;
    /** Treiber stack of waiting threads */
    private volatile WaitNode waiters;


取消状态

 

 

    public boolean cancel(boolean mayInterruptIfRunning) 
        if (state != NEW)
            return false;
        if (mayInterruptIfRunning) 
            if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
                return false;
            Thread t = runner;
            if (t != null)
                t.interrupt();
            UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
        
        else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            return false;
        finishCompletion();
        return true;
    

 

 

 

之前版本使用AQS会出现cancel(true)问题,我看的代码是1.7的,后来找了之前版本的代码,最主要的原因是因为innerCancel()方法中的一段代码:

 

innerCancel:

----

Thread thread = runner;

if(thread != null)

    thread.interrupt();

---

运行如下代码:

 

        ExecutorService executor = Executors.newFixedThreadPool(1);

        executor.submit(new Task()).cancel(true);

        executor.submit(new Task());

 

假如第一个submit运行,然后cancel的时候运气不好,获取到runner后发生线程切换,executor执行第二个submit,然后又切换回去了,cancel代码继续运行,就会到thread.interrupt();这一句,这样再切换第二个task的任务的时候,就会发生中断。具体可以看参考里面的那个bug解释。


 

Executors

这个是线程池的工厂类,这个类中可以帮助你创建不同的需要的线程池。

包括单个线程的。newSingleThreadScheduledExecutor

几个固定的线程的线程池。newFixedThreadPool(int)

newCachedThreadPool默认会自动新建新的线程,当存在空闲的时候可以利用已经存在的,只要没有空闲的,就创建。

newScheduledThreadPool(int)创建可调度的线程池

 

ThreadPoolExecutor

1.如果当前线程数量大于等于核心线程数量,进入第2步;如果当前线程数量小于核心线程数量,那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,提交任务流程结束;如果添加工作线程失败,那么进入第2步。

2.首先判断当前线程池状态是否为正在运行,如果正在运行,就将当前任务放入任务队列中,然后进入第4步。

3.如果当前线程池状态不是正在运行,或者第2步中将任务放入任务队列失败(任务队列饱和),那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,但不能超时最大工作线程数。如果添加成功,提交任务流程结束;如果添加失败,使用拒绝任务处理器来处理任务。

4.再次检测线程池状态,确保刚添加的任务能被处理,提交任务流程结束。

 

public void execute(Runnable command) 
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) 
            if (addWorker(command, true))
                return;
            c = ctl.get();
        
        if (isRunning(c) && workQueue.offer(command)) 
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        
        else if (!addWorker(command, false))
            reject(command);
    

 

 

 

 

 

ExecutionCompletionService

 

    private class QueueingFuture extends FutureTask<Void> 
        QueueingFuture(RunnableFuture<V> task) 
            super(task, null);
            this.task = task;
        
        protected void done()  completionQueue.add(task); // 任务完成后将其放入完成队列中。
        private final Future<V> task;
    

内部搞了一个FutureTask的队列,有任务来就填进去。

 

内部的属性

 

    private final Executor executor;
    private final AbstractExecutorService aes;
    private final BlockingQueue<Future<V>> completionQueue;


提交的时候

 

 

    public Future<V> submit(Callable<V> task) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    

    public Future<V> submit(Runnable task, V result) 
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    

方法很简单,提交到ExecutorCompletionService的任务会在内部被包装成QueueingFuture,并由内部的executor来执行这个任务,当任务执行完成后,会被加入到内部的队列里面,外部程序就可以通过take或者poll方法来获取完成的任务了

 

 

ScheduledThreadPoolExecutor

首先创建实例的时候

 

    public ScheduledThreadPoolExecutor(int corePoolSize) 
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    

创建好的队列是DelayWorkQueue这个自己内部实现了的队列。

 

 

    static class DelayedWorkQueue extends AbstractQueue<Runnable>
        implements BlockingQueue<Runnable> 

实际上实现的是和DelayQueue差不多的。也是采用的二叉堆结构来存储消息。以运行时间为标准进行调整在堆中的先后次序的。也就是说越靠近头部的优先级越高,离得执行时间越近。

 

 

        public RunnableScheduledFuture poll() 
            final ReentrantLock lock = this.lock;
            lock.lock();
            try 
                RunnableScheduledFuture first = queue[0];
                if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                    return null;
                else
                    return finishPoll(first);
             finally 
                lock.unlock();
            
        

获取的时候如果没有到时间不会返回最终的结果的。

 

 

提交任务

 

    public Future<?> submit(Runnable task) 
        return schedule(task, 0, TimeUnit.NANOSECONDS);
    

调用了schedule方法

 

 

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) 
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
    

triggerTime设置了触发时间,然后包装成一个ScheduleFutureTask类,然后

 

 

    private long triggerTime(long delay, TimeUnit unit) 
        return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
    

    /**
     * Returns the trigger time of a delayed action.
     */
    long triggerTime(long delay) 
        return now() +
            ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
    


调用了delayedExecute

 

 

    private void delayedExecute(RunnableScheduledFuture<?> task) 
        if (isShutdown())//如果已经关闭的话就调用reject
            reject(task);
        else 
            super.getQueue().add(task);// 首先加入队列
            // 再次判断线程池状态和该状态下,任务是否能执行,不能就remove,然后task取消
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                ensurePrestart();// 如果可以运行就调用父类方法确保有一个线程运行
        
    

这个地方加入队列中,就是获取到的刚开始创建好的DelayedWorkQueue这个队列,然后将任务放进这个队列中去。

 

 

    void ensurePrestart() 
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
    

调用addWorker这个方法来添加worker

 

 

    private boolean addWorker(Runnable firstTask, boolean core) 
        retry:
        for (;;) 
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) 
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            
        

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try 
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) 
                mainLock.lock();
                try 
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);// 添加了worker进去
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    
                 finally 
                    mainLock.unlock();
                
                if (workerAdded) 
                    t.start();
                    workerStarted = true;
                
            
         finally 
            if (! workerStarted)
                addWorkerFailed(w);
        
        return workerStarted;
    

应该在这里触发了run方法,感觉这里应该是与线程池有关的额,当真正的线程池需要执行的时候会调用worker的run方法,然后调用runWorker方法

 

然后调用worker的run方法。

        public void run()
            runWorker(this);
       

调用了runWorker方法,

 

    final void runWorker(Worker w) 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try 
            while (task != null || (task = getTask()) != null) 
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try 
                        task.run();// 调用任务运行
                     catch (RuntimeException x) 
                        thrown = x; throw x;
                     catch (Error x) 
                        thrown = x; throw x;
                     catch (Throwable x) 
                        thrown = x; throw new Error(x);
                     finally 
                        afterExecute(task, thrown);
                    
                 finally 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                
            
            completedAbruptly = false;
         finally 
            processWorkerExit(w, completedAbruptly);
        
    


然后调用的ScheduledFutureTask中的run方法

 

 

public void run() 
            boolean periodic = isPeriodic();
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            else if (!periodic)
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) 
                setNextRunTime();
                reExecutePeriodic(outerTask);
            
        

包括周期性的运行和一次性的运行。

 

 

Exchanger

 

    private static final class Node extends AtomicReference<Object> 
        /** The element offered by the Thread creating this node. */
        public final Object item;

        /** The Thread waiting to be signalled; null until waiting. */
        public volatile Thread waiter;

        /**
         * Creates node with given item and empty hole.
         * @param item the item
         */
        public Node(Object item) 
            this.item = item;
        
    

Node继承了AtomicReference,这个Node就是表示的当前等待的线程的相关信息。

 

在交换数据的时候,一个线程到了如果不存在不为null的Node说明当前没有线程要交换数据的,尝试将这个获取到的slot占用,如果获取的这个slot的数组的索引是0的话,就阻塞等待,不是0 的话就自旋等待。当然如果获取到的这个不为null的Node时,那么这个slot可能是被其他的Node占用了,如果确实存在这个Node的话尝试将slot置为空,然后将Node赋值给you,然后,尝试将Node的value设置为item,然后返回you的item完成数据的交换。

 

private Object doExchange(Object item, boolean timed, long nanos) 
        Node me = new Node(item);                 // Create in case occupying创建个当前节点
        int index = hashIndex();                  // Index of current slot计算当前slot的下标
        int fails = 0;                            // Number of CAS failures

        for (;;) 
            Object y;                             // Contents of current slot保存当前slot中可能存在的Node
            Slot slot = arena[index];			  // 按照前面计算出下标获取到slot
            if (slot == null)                     // Lazily initialize slots,如果slot为空的话,继续创建slot,然后继续循环
                createSlot(index);                // Continue loop to reread
            else if ((y = slot.get()) != null &&  // Try to fulfill, 如果slot不为空,那么slot可能被另一个Node给占了,如果确实存在这个Node,尝试将其置空。(表示当前节点要和这个Node交换数据了)
                     slot.compareAndSet(y, null)) 
                Node you = (Node)y;               // Transfer item // 给这个Node转型,赋给you
                if (you.compareAndSet(null, item)) // 将item设置给you,注意you本身是一个AtomicReference,这里相当于把item设置到you的value字段上。  
                    LockSupport.unpark(you.waiter); // 然后唤醒you节点上等待的线程。
                    return you.item; 				// 返回you的item。  
                                                 // Else cancelled; continue
            
            else if (y == null &&                 // Try to occupy // 如果slot为空,那么说明没有要和当前线程交换数据的线程,  
                     slot.compareAndSet(null, me))  //那么当前线程先尝试把这个slot给占了。
                if (index == 0)                   // Blocking wait for slot 0 // 如果slot下标为0,那么阻塞等待。  
                    return timed ?					// 有超时的话,会阻塞给定的时间。  
                        awaitNanos(me, slot, nanos) :
                        await(me, slot);
                Object v = spinWait(me, slot);    // Spin wait for non-0   // 如果slot下标不是0,自旋等待,等待其他线程来和当前线程交换数据,然后返回交换后的数据。  
                if (v != CANCEL)
                    return v;
                me = new Node(item);              // Throw away cancelled node// 如果取消的话,重试,重建一个Node,之前的Node就丢弃了。  
                int m = max.get();					 // 获取当前slot下标的最大值。  
                if (m > (index >>>= 1))           // Decrease index // 如果当前允许的最大索引太大。 
                    max.compareAndSet(m, m - 1);  // Maybe shrink table  // 递减最大索引  
            
            else if (++fails > 1)                // Allow 2 fails on 1st slot // 如果1个slot竞争失败超过2次。  
                int m = max.get();
                if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))//如果竞争失败超过3次,尝试递增最大索引值。  
                    index = m + 1;                // Grow on 3rd failed slot // 增加索引值。 
                else if (--index < 0)				// 换个index。
                    index = m;                    // Circularly traverse // 绕回逻辑,防止index越界。 
            
        
    

await里面会有线程的阻塞。

    private static Object await(Node node, Slot slot) 
        Thread w = Thread.currentThread();
        int spins = SPINS;
        for (;;) 
            Object v = node.get();
            if (v != null)
                return v;
            else if (spins > 0)                 // Spin-wait phase
                --spins;
            else if (node.waiter == null)       // Set up to block next
                node.waiter = w;
            else if (w.isInterrupted())         // Abort on interrupt
                tryCancel(node, slot);
            else                                // Block
                LockSupport.park(node);
        
    

 

这里形象的理解一下:

          其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:

              1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。

              2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。

              3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上...),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。

              4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。

              5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!

 

 







 





 


 

以上是关于Java源码Concurrent包剩余部分类的主要内容,如果未能解决你的问题,请参考以下文章

Java多线程——AQS框架源码阅读

JDK源码JDK的java.util.concurrent包结构

java 并发(concurrent)包源码分析

java.util.concurrent.DelayQueue 源码学习

简单看看jdk7源码之java.lang包01

jdk8中java.util.concurrent包分析