Java源码Concurrent包剩余部分类
Posted 低调的洋仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java源码Concurrent包剩余部分类相关的知识,希望对你有一定的参考价值。
FutureTask
/**
* Simple linked list nodes to record waiting threads in a Treiber
* stack. See other classes such as Phaser and SynchronousQueue
* for more detailed explanation.
*/
static final class WaitNode
volatile Thread thread;
volatile WaitNode next;
WaitNode() thread = Thread.currentThread();
本质上就是状态和线程运行的控制
运行的时候设置状态是NEW然后启动后计算出来结果然后设置其结果值并设置他的状态是COMPLETING状态,然后调用方法来通知该线程已经完成了任务了,然后直接激活该线程,然后可以返回最终的结果了。
里面的每个节点是一个单链表的形式。这个链表用于存储等待执行结果的线程的。
调用了run方法,基本上就是运行要运行的Callable,然后得到运行的结果,如果这个结果是正常的就设置其值为result调用结束运行通知线程拿到数据。
// 启动运行
public void run()
// 检查是不是NEW状态如果是NEW的话再检查runnerOffset这个地方能不能设置当前的线程
// 如果都有一个满足就直接退出了,return了
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try
// 赋值callable
Callable<V> c = callable;
// 检查状态
if (c != null && state == NEW)
V result;
boolean ran;
// 设置结果和运行状态
try
result = c.call();
ran = true;
catch (Throwable ex)
// 异常状态
result = null;
ran = false;
setException(ex);
// 如果正常运行了,得到的结果设置
if (ran)
set(result);
finally
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
调用set方法,调用的时候会调用finishCompletion方法。
/**
* Sets the result of this future to the given value unless
* this future has already been set or has been cancelled.
*
* <p>This method is invoked internally by the @link #run method
* upon successful completion of the computation.
*
* @param v the value
*/
protected void set(V v)
// 将原来设置进去的线程的状态设置为完成中状态
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
// 设置结果等
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
完成运行
unpark所有等待的节点
/**
* Removes and signals all waiting threads, invokes done(), and
* nulls out callable.
* 完成运行
* unpark所有等待节点
*/
private void finishCompletion()
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;)
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null))
for (;;)
Thread t = q.thread;
if (t != null)
q.thread = null;
LockSupport.unpark(t);
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
break;
done();
callable = null; // to reduce footprint
get的时候会发生阻塞
public V get() throws InterruptedException, ExecutionException
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
当状态的值小于COMPLETING的时候,在不断的尝试后进行park住当前线程。
唤醒时的状态是COMPLETING状态。
/**
* Awaits completion or aborts on interrupt or timeout.
*
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;)
if (Thread.interrupted())
removeWaiter(q);
throw new InterruptedException();
int s = state;
if (s > COMPLETING)
if (q != null)
q.thread = null;
return s;
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed)
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
removeWaiter(q);
return state;
LockSupport.parkNanos(this, nanos);
else
LockSupport.park(this);
唤醒后检查状态值即可退出了。然后调用report方法,得到最终的数据,因为在上面的计算结果时就已经将结果存储到outcome中了,这里只是取出来就好了。
private V report(int s) throws ExecutionException
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
状态变迁以及其属性值。
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
取消状态
public boolean cancel(boolean mayInterruptIfRunning)
if (state != NEW)
return false;
if (mayInterruptIfRunning)
if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
return false;
Thread t = runner;
if (t != null)
t.interrupt();
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
return false;
finishCompletion();
return true;
之前版本使用AQS会出现cancel(true)问题,我看的代码是1.7的,后来找了之前版本的代码,最主要的原因是因为innerCancel()方法中的一段代码:
innerCancel:
----
Thread thread = runner;
if(thread != null)
thread.interrupt();
---
运行如下代码:
ExecutorService executor = Executors.newFixedThreadPool(1);
executor.submit(new Task()).cancel(true);
executor.submit(new Task());
假如第一个submit运行,然后cancel的时候运气不好,获取到runner后发生线程切换,executor执行第二个submit,然后又切换回去了,cancel代码继续运行,就会到thread.interrupt();这一句,这样再切换第二个task的任务的时候,就会发生中断。具体可以看参考里面的那个bug解释。
Executors
这个是线程池的工厂类,这个类中可以帮助你创建不同的需要的线程池。
包括单个线程的。newSingleThreadScheduledExecutor
几个固定的线程的线程池。newFixedThreadPool(int)
newCachedThreadPool默认会自动新建新的线程,当存在空闲的时候可以利用已经存在的,只要没有空闲的,就创建。
newScheduledThreadPool(int)创建可调度的线程池
ThreadPoolExecutor
1.如果当前线程数量大于等于核心线程数量,进入第2步;如果当前线程数量小于核心线程数量,那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,提交任务流程结束;如果添加工作线程失败,那么进入第2步。
2.首先判断当前线程池状态是否为正在运行,如果正在运行,就将当前任务放入任务队列中,然后进入第4步。
3.如果当前线程池状态不是正在运行,或者第2步中将任务放入任务队列失败(任务队列饱和),那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,但不能超时最大工作线程数。如果添加成功,提交任务流程结束;如果添加失败,使用拒绝任务处理器来处理任务。
4.再次检测线程池状态,确保刚添加的任务能被处理,提交任务流程结束。
public void execute(Runnable command)
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize)
if (addWorker(command, true))
return;
c = ctl.get();
if (isRunning(c) && workQueue.offer(command))
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
else if (!addWorker(command, false))
reject(command);
ExecutionCompletionService
private class QueueingFuture extends FutureTask<Void>
QueueingFuture(RunnableFuture<V> task)
super(task, null);
this.task = task;
protected void done() completionQueue.add(task); // 任务完成后将其放入完成队列中。
private final Future<V> task;
内部搞了一个FutureTask的队列,有任务来就填进去。
内部的属性
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
提交的时候
public Future<V> submit(Callable<V> task)
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
public Future<V> submit(Runnable task, V result)
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
方法很简单,提交到ExecutorCompletionService的任务会在内部被包装成QueueingFuture,并由内部的executor来执行这个任务,当任务执行完成后,会被加入到内部的队列里面,外部程序就可以通过take或者poll方法来获取完成的任务了
ScheduledThreadPoolExecutor
首先创建实例的时候
public ScheduledThreadPoolExecutor(int corePoolSize)
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
创建好的队列是DelayWorkQueue这个自己内部实现了的队列。
static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable>
实际上实现的是和DelayQueue差不多的。也是采用的二叉堆结构来存储消息。以运行时间为标准进行调整在堆中的先后次序的。也就是说越靠近头部的优先级越高,离得执行时间越近。
public RunnableScheduledFuture poll()
final ReentrantLock lock = this.lock;
lock.lock();
try
RunnableScheduledFuture first = queue[0];
if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
return null;
else
return finishPoll(first);
finally
lock.unlock();
获取的时候如果没有到时间不会返回最终的结果的。
提交任务
public Future<?> submit(Runnable task)
return schedule(task, 0, TimeUnit.NANOSECONDS);
调用了schedule方法
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit)
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
triggerTime设置了触发时间,然后包装成一个ScheduleFutureTask类,然后
private long triggerTime(long delay, TimeUnit unit)
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay)
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
调用了delayedExecute
private void delayedExecute(RunnableScheduledFuture<?> task)
if (isShutdown())//如果已经关闭的话就调用reject
reject(task);
else
super.getQueue().add(task);// 首先加入队列
// 再次判断线程池状态和该状态下,任务是否能执行,不能就remove,然后task取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();// 如果可以运行就调用父类方法确保有一个线程运行
这个地方加入队列中,就是获取到的刚开始创建好的DelayedWorkQueue这个队列,然后将任务放进这个队列中去。
void ensurePrestart()
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
调用addWorker这个方法来添加worker
private boolean addWorker(Runnable firstTask, boolean core)
retry:
for (;;)
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;)
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null)
mainLock.lock();
try
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null))
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);// 添加了worker进去
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
finally
mainLock.unlock();
if (workerAdded)
t.start();
workerStarted = true;
finally
if (! workerStarted)
addWorkerFailed(w);
return workerStarted;
应该在这里触发了run方法,感觉这里应该是与线程池有关的额,当真正的线程池需要执行的时候会调用worker的run方法,然后调用runWorker方法
然后调用worker的run方法。
public void run()
runWorker(this);
调用了runWorker方法,
final void runWorker(Worker w)
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try
while (task != null || (task = getTask()) != null)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try
beforeExecute(wt, task);
Throwable thrown = null;
try
task.run();// 调用任务运行
catch (RuntimeException x)
thrown = x; throw x;
catch (Error x)
thrown = x; throw x;
catch (Throwable x)
thrown = x; throw new Error(x);
finally
afterExecute(task, thrown);
finally
task = null;
w.completedTasks++;
w.unlock();
completedAbruptly = false;
finally
processWorkerExit(w, completedAbruptly);
然后调用的ScheduledFutureTask中的run方法
public void run()
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset())
setNextRunTime();
reExecutePeriodic(outerTask);
包括周期性的运行和一次性的运行。
Exchanger
private static final class Node extends AtomicReference<Object>
/** The element offered by the Thread creating this node. */
public final Object item;
/** The Thread waiting to be signalled; null until waiting. */
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item)
this.item = item;
Node继承了AtomicReference,这个Node就是表示的当前等待的线程的相关信息。
在交换数据的时候,一个线程到了如果不存在不为null的Node说明当前没有线程要交换数据的,尝试将这个获取到的slot占用,如果获取的这个slot的数组的索引是0的话,就阻塞等待,不是0 的话就自旋等待。当然如果获取到的这个不为null的Node时,那么这个slot可能是被其他的Node占用了,如果确实存在这个Node的话尝试将slot置为空,然后将Node赋值给you,然后,尝试将Node的value设置为item,然后返回you的item完成数据的交换。
private Object doExchange(Object item, boolean timed, long nanos)
Node me = new Node(item); // Create in case occupying创建个当前节点
int index = hashIndex(); // Index of current slot计算当前slot的下标
int fails = 0; // Number of CAS failures
for (;;)
Object y; // Contents of current slot保存当前slot中可能存在的Node
Slot slot = arena[index]; // 按照前面计算出下标获取到slot
if (slot == null) // Lazily initialize slots,如果slot为空的话,继续创建slot,然后继续循环
createSlot(index); // Continue loop to reread
else if ((y = slot.get()) != null && // Try to fulfill, 如果slot不为空,那么slot可能被另一个Node给占了,如果确实存在这个Node,尝试将其置空。(表示当前节点要和这个Node交换数据了)
slot.compareAndSet(y, null))
Node you = (Node)y; // Transfer item // 给这个Node转型,赋给you
if (you.compareAndSet(null, item)) // 将item设置给you,注意you本身是一个AtomicReference,这里相当于把item设置到you的value字段上。
LockSupport.unpark(you.waiter); // 然后唤醒you节点上等待的线程。
return you.item; // 返回you的item。
// Else cancelled; continue
else if (y == null && // Try to occupy // 如果slot为空,那么说明没有要和当前线程交换数据的线程,
slot.compareAndSet(null, me)) //那么当前线程先尝试把这个slot给占了。
if (index == 0) // Blocking wait for slot 0 // 如果slot下标为0,那么阻塞等待。
return timed ? // 有超时的话,会阻塞给定的时间。
awaitNanos(me, slot, nanos) :
await(me, slot);
Object v = spinWait(me, slot); // Spin wait for non-0 // 如果slot下标不是0,自旋等待,等待其他线程来和当前线程交换数据,然后返回交换后的数据。
if (v != CANCEL)
return v;
me = new Node(item); // Throw away cancelled node// 如果取消的话,重试,重建一个Node,之前的Node就丢弃了。
int m = max.get(); // 获取当前slot下标的最大值。
if (m > (index >>>= 1)) // Decrease index // 如果当前允许的最大索引太大。
max.compareAndSet(m, m - 1); // Maybe shrink table // 递减最大索引
else if (++fails > 1) // Allow 2 fails on 1st slot // 如果1个slot竞争失败超过2次。
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1))//如果竞争失败超过3次,尝试递增最大索引值。
index = m + 1; // Grow on 3rd failed slot // 增加索引值。
else if (--index < 0) // 换个index。
index = m; // Circularly traverse // 绕回逻辑,防止index越界。
await里面会有线程的阻塞。
private static Object await(Node node, Slot slot)
Thread w = Thread.currentThread();
int spins = SPINS;
for (;;)
Object v = node.get();
if (v != null)
return v;
else if (spins > 0) // Spin-wait phase
--spins;
else if (node.waiter == null) // Set up to block next
node.waiter = w;
else if (w.isInterrupted()) // Abort on interrupt
tryCancel(node, slot);
else // Block
LockSupport.park(node);
这里形象的理解一下:
其实就是"我"和"你"(可能有多个"我",多个"你")在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:
1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。
2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上...),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
以上是关于Java源码Concurrent包剩余部分类的主要内容,如果未能解决你的问题,请参考以下文章
JDK源码JDK的java.util.concurrent包结构