ForkJoinPool的内部成员
ForkJoinPool的成员变量大多数都是二进制的数,很多重要的成员变量下面都会维护几个小弟(二进制的数),目的是二进制位移运算得出这个成员变量的状态,不同的二进制数表示了不同的状态,看源码时尽量不要把数字类型的变量看成数字,而是二进制
runState字段
//线程池的运行状态状态
volatile int runState;
以下变量的作用用于计算runstate,是通过二进制位移运算得出当前线程的状态,线程池关闭小于0,其他都是2的幂次方。
通过这几个特殊的变量与runState做位运算,得出runState不同的位是否为1来得出当前的运行状态。
private static final int RSLOCK = 1; //线程池被锁定。二进制表示:00000000 00000000 00000000 00000001
private static final int RSIGNAL = 1 << 1; //线程池有线程需要唤醒 00000000 00000000 00000000 00000010
private static final int STARTED = 1 << 2; //线程池已经初始化 00000000 00000000 00000000 00000100
private static final int STOP = 1 << 29; //线程池停止 00100000 00000000 00000000 00000000
private static final int TERMINATED = 1 << 30; //线程池终止 01000000 00000000 00000000 00000000
private static final int SHUTDOWN = 1 << 31; //线程池关闭 10000000 00000000 00000000 00000000
总结来说就是:runState是一个int类型变量,即32位的二进制数,上面6个变量分别表示的是二进制第一位为1其他位为0的RSLOCK,第二位为1其他位为0的RSIGNAL,依次类推第三位为1的STARTED,第三十位为1的STOP,三十一位为1的TERMINATED,三十二位为1的SHUTDOWN即负数。
ctl字段
//实例字段。存储线程池的控制信息
volatile long ctl;
//以下变量目的是通过与ctl做位运算可以得出四个段的信息
// 高低位
private static final long SP_MASK = 0xffffffffL;//long型低32位.
private static final long UC_MASK = ~SP_MASK;//long型高32位.
// 活跃数.
private static final int AC_SHIFT = 48;//移位偏移量,如左移到49位开始.
private static final long AC_UNIT = 0x0001L << AC_SHIFT;//1<<48代表一个活跃数单位.
private static final long AC_MASK = 0xffffL << AC_SHIFT;//long型高16位(49-64)
// 总数量
private static final int TC_SHIFT = 32;//移位偏移量,33位开始.
private static final long TC_UNIT = 0x0001L << TC_SHIFT;//1<<32代表一个总数量
private static final long TC_MASK = 0xffffL << TC_SHIFT;//33-48位
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位
线程池中较为核心的变量,long类型的变量。他将一个64位分为了四段。每段16位。由高到低分别为AC,TC,SS,ID
AC: 正在运行工作线程数减去目标并行度,高16位
TC: 总工作线程数减去目标并行度,中高16位
SS: 栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程),第一位表示状态 1:不活动(inactive); 0:活动(active),后15表示版本号,防止 ABA 问题
ID: 栈顶工作线程所在工作队列的索引
AC为负说明,活跃的worker没有到达预期数量,需要激活或者创建。 1.没有空闲的worker并且worker太少的话会创建,具体是调用tryAddWorker方法。 2.如果有空闲worker,获取到空闲worker,解除worker阻塞。并且更新sp。
通过SS我们可以进行判断是否存在空闲的worker。
ID用来定位该worker对应队列的下标。
如图所示:
ForkJoinPool和 WorkQueue之间的共享掩码
// 跟边界相关,如队列容器(数组)的最大长度
static final int SMASK = 0xffff; // short bits == max index 参与位移计算,计算后16位的数
static final int MAX_CAP = 0x7fff; // 表示最大的队列数(线程数) ,二进制:0111 1111 1111 1111,参与计算并行度,15位正好对应ctl中计算线程数
static final int EVENMASK = 0xfffe; // 16位偶数位 1111 1111 1111 1110 even short bits
static final int SQMASK = 0x007e; // 126,二进制表示为0111 1110 。用于计算队列数组workQueues的偶数索引下标,第一位为0表示偶数,一共六个1,最多可计算出小于126的64个偶数索引下标
// Masks and units for WorkQueue.scanState and ctl sp subfield
static final int SCANNING = 1; // false when running tasks
static final int INACTIVE = 1 << 31; // must be negative
static final int SS_SEQ = 1 << 16; // version count
// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK = 0xffff << 16; // top half of int
static final int LIFO_QUEUE = 0;
static final int FIFO_QUEUE = 1 << 16;
static final int SHARED_QUEUE = 1 << 31; // must be negative
内部类:工作队列WorkQueue的成员变量
//初始的队列大小,队列用下面的ForkJoinTask<?>[] array存储数据
static final int INITIAL_QUEUE_CAPACITY = 1 << 13;
//最大队列大小
static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M
// Instance fields
volatile int scanState; // 扫描状态,小于0:表示不活跃的(等待状态),偶数:表示忙碌状态(第一次运行任务时置偶),奇数:在队列数组的索引位置
int stackPred; // 等待栈中前一个元素的指针。源码中备注的是栈中前一个元素,栈在哪里?当一个线程没有任务可窃取时,线程就会进入等待状态,可能会有多个线程等待,多个等待的线程会按照先后顺序用stackPred标记前一个等待线程的队列的索引(队列存放在线程池的workQueues数组中),然后把最后一个等待的线程队列(即栈顶)放到ctl的后32位中,由此形成一个等待线程的栈数据结构。
int nsteals; // number of steals
int hint; // randomization and stealer index hint
int config; // pool index and mode
volatile int qlock; // 用于检测队列是否被锁。1: 锁定, < 0: 终止;正常状态:0,
volatile int base; // index of next slot for poll
int top; // index of next slot for push
ForkJoinTask<?>[] array; // 队列真正的容器,在第一次执行任务时初始化(runWorker()中)
final ForkJoinPool pool; // the containing pool (may be null)
final ForkJoinWorkerThread owner; // owning thread or null if shared
volatile Thread parker; // == owner during call to park; else null
volatile ForkJoinTask<?> currentJoin; // task being joined in awaitJoin
volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer
构造方法
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
boolean asyncMode) {
this(checkParallelism(parallelism), //检查并行度是否超过最大限制。为什么是叫并行度不是线程数?该线程池的线程数量一般为parallelism,任务执行过程中,可能比这大
checkFactory(factory),
handler,
asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //设置队列,还是栈模式
"ForkJoinPool-" + nextPoolId() + "-worker-");
checkPermission();
}
private ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
UncaughtExceptionHandler handler,
int mode,
String workerNamePrefix) {
this.workerNamePrefix = workerNamePrefix;
this.factory = factory;
this.ueh = handler;
this.config = (parallelism & SMASK) | mode; //高16位表示模式,低16位表示并行度
long np = (long)(-parallelism); // offset ctl counts
//将负数的parallelism分别放入ctl的高16位,和中高16位。为什么要改成负数再放入?因为这两个16位的最高位为1(即负数)表示活跃线程和总线程不够,后面创建线程时会计算线程数是否达标,初始化还未创建线程,肯定不够。
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
}
代码内部如何巧妙的运用二进制运算
举例1:U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK))
解释:RSLOCK为1,转换成二进制就是 00000000 00000000 00000000 00000001,那么与rs做 | 运算的话,rs的最低位必然为1,所以这段代码的意思是将RSLOCK的bit位放入RUNSTATE,方便后面做计算。
相关方法
lockRunState方法
//尝试给线程池加锁,加锁成功直接返回,如果加锁失败则当前线程自旋+等待,需要其他线程唤醒
private int lockRunState() {
int rs;
//先看看当前是否为锁定状态
return ((((rs = runState) & RSLOCK) != 0 ||
//没有锁定的话,那么就尝试加锁
!U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
//如果加锁还是没有成功那么,等待锁释放。
//如果加锁成功那么直接返回
awaitRunStateLock() : rs);
}
awaitRunStateLock方法
//这个方法的目的就是得到可用的锁。先是自旋,否则等待
private int awaitRunStateLock() {
Object lock;
boolean wasInterrupted = false;
for (int spins = SPINS, r = 0, rs, ns;;) {
//锁是否可用(是否其他线程占用)
if (((rs = runState) & RSLOCK) == 0) {
//如果可用,则尝试加锁
if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
if (wasInterrupted) {
try {//如果进来了,说明等待被打断,直接中断线程
Thread.currentThread().interrupt();
} catch (SecurityException ignore) {
}
}
//返回锁状态
return ns;
}
}
//走到这里表示锁状态不可用或者最少自旋一次,那么重新获取r的值
else if (r == 0)
r = ThreadLocalRandom.nextSecondarySeed();
//spins>0? 这个条件似乎不会成立,源码里没有找到在条件之外对SPINS赋值操作,好像永远都是0
else if (spins > 0) {
r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
if (r >= 0)
--spins;
}
//到这说明线程池还未初始化,那么当前线程让出cpu执行权等待初始化。继续自旋
else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
Thread.yield(); // initialization race
//已经做了最大的努力还是没有拿到锁,那么改变线程池的运行状态为RSIGNAL(表示有线程需要唤醒),等待锁释放
//改变运行状态不够严谨,本质是将RSIGNAL的二进制bit位放入RUNSTATE
else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
synchronized (lock) { //这里为什么需要加锁?为了下面调用lock.wait()
//再次检查线程池的状态是否为RSIGNAL(目的是为了检查是否存在有其他线程也在改变这个状态)
if ((runState & RSIGNAL) != 0) {
try {
lock.wait();//没有则等待被唤醒
} catch (InterruptedException ie) {
if (!(Thread.currentThread() instanceof
ForkJoinWorkerThread))
wasInterrupted = true; //如果等待被打断,那么回到上面的中断线程
}
}
else //如果再次检查发现RSIGNAL又被其他线程修改了,那么唤醒等待,继续自旋获取锁状态
lock.notifyAll();
}
}
}
}
unlockRunState方法
//解锁线程池的运行状态,解锁必须成功
private void unlockRunState(int oldRunState, int newRunState) {
//cas尝试解锁
if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
//解锁失败
//自己上的锁,自己为什么会解锁失败?
//加锁只是修改了RUNSTATE的锁标识位(二进制最低的第一位RSLOCK),如果修改失败说明RUNSTATE被修改,而自己上锁的,但RUNSTATE已经改变了,说明有其他线程修改了这个字段,而只有awaitRunStateLock()方法才可能在其他线程上锁的情况下修改RSIGNAL位(表示该线程需要唤醒),如果是这样,那么就需要唤醒其他线程。
Object lock = stealCounter;
//不管是否有线程等待或者其他原因解锁失败,都会强制解锁
runState = newRunState; // clears RSIGNAL bit
if (lock != null)
//唤醒其他所有等待锁的线程
synchronized (lock) { lock.notifyAll(); }
}
}
externalSubmit方法
//用于外部提交任务的方法。
//何为内部外部?内部:线程池开的线程是内部线程。外部:线程池之外的线程提交的任务,主线程或其他
private void externalSubmit(ForkJoinTask<?> task) {
int r; // initialize caller's probe
//如果进入这里那么表示是第一次提交任务,需要初始化
//线程第一次调用ThreadLocalRandom.getProbe()结果为0
if ((r = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit();
r = ThreadLocalRandom.getProbe();
}
//自旋提交任务,可能会出现多线程同时提交
for (;;) {
WorkQueue[] ws; WorkQueue q; int rs, m, k;
boolean move = false; //是否发生多线程竞争
//小于0表示线程池已经停了,如果还在提交任务,那么直接抛出异常
if ((rs = runState) < 0) {
tryTerminate(false, false); // help terminate
throw new RejectedExecutionException();
}
//如果rs的第四个bit位不是1(不是开始状态),表示还未初始化,那么初始化
else if ((rs & STARTED) == 0 || // initialize
//如果是rs的状态已经是STARTED的,还需要判断workQueues是否初始化,因为可能其他的线程正在初始化,已经修改了rs的状态,但还未来得及给workQueues赋值。
//如果workQueues已经赋值了为什么还要判断数组长度是否为1呢?workQueues的长度貌似初始化之后就之后增大不会减小
((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
int ns = 0;
// 如果上述条件都满足,那么就准备初始化,先加锁。
rs = lockRunState();
try {
//再次检查,防止加锁期间被初始化
if ((rs & STARTED) == 0) {
//先初始化STEALCOUNTER 偷锁
U.compareAndSwapObject(this, STEALCOUNTER, null,
new AtomicLong());
// create workQueues array with size a power of two
//取config的低16位(确切说是低15位)。在new线程池时,需要传入并行度,这个并行度就放在config的低16位
int p = config & SMASK; // ensure at least 2 slots
//获取2的次幂数
int n = (p > 1) ? p - 1 : 1;
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4;
n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
//创建工作队列的容器,并标记线程池已经开始工作
workQueues = new WorkQueue[n];
ns = STARTED;
}
} finally {
//释放锁。将rs的锁标识位(最低位)置为0,并将STARTED的二进制位为1的放入rs
unlockRunState(rs, (rs & ~RSLOCK) | ns);
}
}
//这一步是为了计算出队列数组的偶数下标,并且如果这个索引处有一个队列,那么就将提交的任务放入这个队列,尝试开启新线程。如果走到这一步,!= null可以看出下面的else if一定走了
//ws[k = r & m & SQMASK]这一步是计算偶数下标:
//这个计算的目的是基于当前线程的探针 r 和 workQueues 数组的长度 m,计算得到 workQueues 数组中的索引 k,然后通过 ws[k] 获取相应位置的 WorkQueue 对象。
//SQMASK 的作用是确保通过 r & m 计算得到的索引 k 在 workQueues 数组长度范围内,且为偶数。这样确保了分配到的 WorkQueue 对象的索引是偶数,因为在后续的代码中,会通过 (s & 1) == 0 判断当前队列是否是共享队列,而共享队列的索引需要是偶数
else if ((q = ws[k = r & m & SQMASK]) != null) {
//如果当前队列没有上锁,那么尝试cas上锁
if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
//上锁成功,那么就把任务提交给当前队列
ForkJoinTask<?>[] a = q.array; //这里没有竞争的话为null
int s = q.top;
boolean submitted = false; // initial submission or resizing
try { // locked version of push
//如果队列已经初始化了,并且有足够的空间则放入队列
if ((a != null && a.length > s + 1 - q.base) ||
//或者队列没有初始化,那么就去初始化队列,成功则放入队列
(a = q.growArray()) != null) {
//因为需要cas将任务放入数组,所以需要计算索引top的在内存中的偏移量,需要将任务放在此处
int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
U.putOrderedObject(a, j, task);//放入任务
U.putOrderedInt(q, QTOP, s + 1); //top加1
submitted = true; //标记为已经提交
}
} finally { //解锁
U.compareAndSwapInt(q, QLOCK, 1, 0);
}
if (submitted) { //如果提交成功,那么尝试创建线程或者唤醒等待线程处理任务
signalWork(ws, q);
return;
}
}
//加锁失败,表示有其他线程也在处理这个索引下的队列,那么标记move,进入下次自旋
move = true; // move on failure
}
//走到这说明计算的出来的索引位置没有队列,且线程池状态可用,那么创建新的队列放入索引位置
else if (((rs = runState) & RSLOCK) == 0) { // create new queue
//创建队列
q = new WorkQueue(this, null);
q.hint = r;
//config的第32位置1表示共享队列
q.config = k | SHARED_QUEUE;
//共享队列的扫描状态直接标记为不活跃
//它没有工作线程,也不会参与活化和scan阻塞的过程,也不会将自己的scanState压入ctl后32位做栈元素
q.scanState = INACTIVE;
//上锁,要开始给队列数组放入队列了
rs = lockRunState(); // publish index
//上锁之后再次检查,类似双重检查锁。为什么需要再次检查?上锁过程中依然有其他的线程可能修改runState,修改了就影响rs的结果。同样的也有可能因为线程竞争导致当前线程没有初始化队列数组workQueues,几个判断都是再次检查可能因为线程竞争关系导致的不合理的进入
if (rs > 0 && (ws = workQueues) != null &&
k < ws.length && ws[k] == null)
ws[k] = q; // else terminated
unlockRunState(rs, rs & ~RSLOCK);
}
else //走到这里说明存在多线程竞争,标记繁忙
move = true; // move if busy
if (move) //如果是线程池繁忙,那么重置新的r供下一轮使用,继续自旋
r = ThreadLocalRandom.advanceProbe(r);
}
}
signalWork方法
//如果活动的工作线程太少,则尝试创建或激活工作线程。
final void signalWork(WorkQueue[] ws, WorkQueue q) {
long c; int sp, i; WorkQueue v; Thread p;
//ctl初始化的时候最高位为1即负数,每次添加线程时AC(高16位,活跃线程数)和TC(中高16位,总线程数)位都会加1,那么何时为正数?初始化时AC和TC为负的parallelism,即增加了parallelism个线程不小于0
//ctl<0表示ctl最高位为1,即可以判断AC为负数,即活跃线程数没有达到parallelism(并行度)
while ((c = ctl) < 0L) {
//ctl的低32位初始化时默认都为0。这个32位用低16位表示空闲线程的id(可以计算索引下标),如果有多个空闲线程那么保存的就是栈顶的id,高16位表示是否存在空闲线程,有的话最高位为1,其余15位表示版本数(修改次数,防止BA问题)
//即 (int)ctl==0 那么说明现在没有空闲线程
if ((sp = (int)c) == 0) { // no idle workers
//ADD_WORKER为long类型的第48位为1,即用于判断ctl的TC位的最高位是否为1
//ctl & ADD_WORKER !=0 说明TC位为负数,即总线程数还未达标
if ((c & ADD_WORKER) != 0L) // too few workers
//既然总线程数和活跃线程数都不够,且又没有空闲线程,那么就尝试开启新的线程
tryAddWorker(c);
break;
}
//正常执行情况下ws一定有值,如果为null,就表示线程池要么没有初始化,要么就终止了
if (ws == null) // unstarted/terminated
break;
//sp & SMASK计算低16位,即索引值。索引会超过ws长度?ws好像只会扩容
//还是工作线程数组长度不足?
if (ws.length <= (i = sp & SMASK)) // terminated
break;
//活跃线程未满,且又找不到空闲线程,线程池可能正在关闭?
if ((v = ws[i]) == null) // terminating
break;
//sp即ctl的低32位,SS_SEQ表示第17位为1, ~INACTIVE表示第32位为0,其他位为1
//下面的表达式的意思:ctl的中低16位加1,最高位取0。即把当前队列的线程设置为活跃线程,并且ABA位版本号加1,
int vs = (sp + SS_SEQ) & ~INACTIVE; // next scanState
//用相减的方式判断是否还是栈顶元素
int d = sp - v.scanState; // screen CAS
//这段代码的计算方式:ctl的AC位加1,再把上一个等待栈的队列的stackPred放入ctl的低32位,然后合并。
//此处并没有增加总线程数,因为是唤醒的等待线程
long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
//d==0表示当前队列就是等待栈的栈顶元素,也可以表示没有发生线程竞争。如果满足那么尝试cas替换ctl的值
if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
v.scanState = vs; //更改scanState,表示队列现在处于激活状态 // activate v
//唤醒等待线程。这里v,parker正常情况下有值,
if ((p = v.parker) != null)
U.unpark(p);
break; //目的完成,跳出
}
//走到这里说明,没有足够活跃的线程,且又没有空闲线程,那么要不要继续自旋?
//q不为null表示想处理任务,但是队列里又没有任务那么不需要自旋,其他继续自旋
if (q != null && q.base == q.top) // no more work
break;
}
}
tryAddWorker方法
//尝试添加一个新工作线程(工作者)
private void tryAddWorker(long c) {
boolean add = false;
do {
//即将添加新的线程了,ctl的AC和TC位表示的活跃线程数和线程总数需要加1
//表示高32位中两个16位,分别加1
long nc = ((AC_MASK & (c + AC_UNIT)) |
(TC_MASK & (c + TC_UNIT)));
if (ctl == c) { //判断相等的目的:是否存在多线程竞争。不相等就自旋
int rs, stop; // check if terminating
//先加锁,再判断线程池是否终止
if ((stop = (rs = lockRunState()) & STOP) == 0)
//没有终止,则cas给ctl赋值
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK); //解锁
if (stop != 0) //停止了就跳出
break;
if (add) { /如果cas成功,那么创建线程,
createWorker();
break; //目的达成跳出
}
}
//走到这里,说明上面的走过一轮了,并且失败了,那么是否要继续自旋?
//不管怎样重新获取ctl的值,下次自旋使用
//判断总线程数是否已经饱和,并且是没有空闲线程,继续自旋
//(int)c == 0表示ctl的后32位,也是等待栈的scanState灭活的值
} while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
}
createWorker方法
//这个方法很简单,就是创建一个工作线程ForkJoinWorkerThread,创建失败就销毁
private boolean createWorker() {
//获取默认的创建线程工厂。工厂模式创建
ForkJoinWorkerThreadFactory fac = factory;
Throwable ex = null; //记录异常
ForkJoinWorkerThread wt = null;
try {
//这里就是创建一个线程,创建线程里有一个重要的方法registerWorker(this);大概意思就是给工作线程分配一个队列,并把这个队列放到线程池的数组
if (fac != null && (wt = fac.newThread(this)) != null) {
//创建成功直接开启新线程,这里线程的实现类是ForkJoinWorkerThread,自然会去调用它的run方法
wt.start();
return true;
}
} catch (Throwable rex) {
ex = rex;
}
//失败则调用注销方法
deregisterWorker(wt, ex);
return false;
}
registerWorker方法
protected ForkJoinWorkerThread(ForkJoinPool pool) {
super("aForkJoinWorkerThread");
//到这里可以看到,每次创建线程一个线程,每个线程都会记录线程池的实例,这个实例就是我们主线程创建的实例,相当于每个线程共享的
this.pool = pool;
//创建队列,队列会记录当前的线程,当前线程也会记录各自的队列
this.workQueue = pool.registerWorker(this);
}
//这个方法就是创建一个记录当前线程的队列,并把队列放入线程池的队列数组
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
UncaughtExceptionHandler handler;
//守护线程
wt.setDaemon(true); // configure thread
//设置异常处理器
if ((handler = ueh) != null)
wt.setUncaughtExceptionHandler(handler);
//创建一个队列的实例,this表示当前线程池,一般是当前线程池实例,wt是当前线程
//所以队列记录了当前线程和共享的线程池实例
WorkQueue w = new WorkQueue(this, wt);
//当前队列放到线程池队列数组的哪个位置,下面会做计算
int i = 0; // assign a pool index
//取队列模式,即取config的高16位,0表示:队列,1表示栈
int mode = config & MODE_MASK;
//现在把队列放入线程池的队列数组,因为队列数组是多个线程共享的,所以这里操作需要加锁
int rs = lockRunState();
try {
WorkQueue[] ws; int n; // skip if no array
//添加时队列数组必须已经初始化了
if ((ws = workQueues) != null && (n = ws.length) > 0) {
//下面这段算法的意义,找到一个奇数索引的位置是否有队列,有的话找下一个,直到找到没有的
//每次找下一个都会以step为步长,转了n圈还是没有位置,那么就扩容
int s = indexSeed += SEED_INCREMENT; // unlikely to collide
int m = n - 1;
//i表示奇数索引
i = ((s << 1) | 1) & m; // odd-numbered indices
if (ws[i] != null) { // collision
int probes = 0; // step by approx half n
int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
while (ws[i = (i + step) & m] != null) {
if (++probes >= n) {
//扩容两倍,保留原数组位置
workQueues = ws = Arrays.copyOf(ws, n <<= 1);
m = n - 1;
probes = 0;
}
}
}
//配置队列的hint,每个队列的hint都不同
w.hint = s; // use as random seed
//config同样的,高16为记录队列模式,低16位记录索引
//这里的意思是把当前队列的索引放入config的低16位
w.config = i | mode;
//扫描状态。初始值是队列在数组中的索引
w.scanState = i; // publication fence
ws[i] = w; //放入数组
}
} finally {
unlockRunState(rs, rs & ~RSLOCK); //解锁
}
wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
return w;
}
ForkJoinWorkerThread类
这个类是专门负责处理forkjoin框架的线程类,继承了Thread,在线程池开启新的线程时自然而然就会去执行它的run方法。
成员变量
//每次创建当前线程实例时,必定会给这两个字段赋值
final ForkJoinPool pool; //当前线程池
final ForkJoinPool.WorkQueue workQueue; //工作队列 // work-stealing mechanics
run方法
public void run() {
//刚初始化的队列的array一般为null
if (workQueue.array == null) { // only run once
Throwable exception = null;
try {
onStart(); //空实现,线程开启时回调,给开发者使用
//开始运行工作,这里会调用线程池的runWorker方法,到池里面扫描任务,扫描到入伍
pool.runWorker(workQueue);
} catch (Throwable ex) {
exception = ex;
} finally {
try {
onTermination(exception); //线程结束时的回调,如果有异常,也可以获取
} catch (Throwable ex) {
if (exception == null)
exception = ex;
} finally {
pool.deregisterWorker(this, exception); //线程结束,注销线程,如果有异常也会携带相关异常。
}
}
}
}
在线程开启时,或者这发生异常时,实现这个类,就可以获取它的回调方法
pol.runWorker方法
final void runWorker(WorkQueue w) {
w.growArray(); // 初始化或扩容
int seed = w.hint; // initially holds randomization hint
int r = (seed == 0) ? 1 : seed; // avoid 0 for xorShift
for (ForkJoinTask<?> t;;) {
if ((t = scan(w, r)) != null) //在队列数组中扫描(偷窃)任务,如果偷窃到任务
w.runTask(t); // 执行当前任务
else if (!awaitWork(w, r)) //如果找不到任务,就等待任务。如果等待也没有,那么退出循环
break;
//每次循环中更新随机数种子r,用于产生下次任务扫描的随机性
r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
}
}
扫描和处理任务
scan扫描或偷取任务
这个方法“偷窃”线程的核心实现,与其说是“偷窃”,倒不如说是扫描,这个方法的主要作用是扫描线程池的队列数组,遍历队列数组,从随机索引开始找一个队列,然后从这个队列的底部偷一个任务,偷到就返回这个任务,执行掉。如果在当前队列找不到任务,那么就下一个,直到找不到为止。
如果一直找不到任务,那么就要在这个方法里进行灭活操作,灭活之后,进入其他的等待方法,当前线程就可能被置为等待状态
于此同时,找到任务时,检查线程池的线程是否已经足够,不够开启新线程。新线程开启,重复上面动作,扫描队列,执行任务,或者灭活队列。
//扫描任务,偷窃任务。
private ForkJoinTask<?> scan(WorkQueue w, int r) {
WorkQueue[] ws; int m;
//这里判断队列数组是否为空。为什么要校验m = ws.length - 1 > 0?队列数组可能为0?
if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
int ss = w.scanState; // initially non-negative
//origin表示开始索引, r & m表示从随机索引开始
for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
int b, n; long c;
//取出元素
if ((q = ws[k]) != null) {
//队列里有元素时的处理逻辑
//这里的base和top分别表示队列尾部和顶部的元素索引位置
if ((n = (b = q.base) - q.top) < 0 &&
(a = q.array) != null) { // non-empty
//获取计算队列尾部元素的索引位置
long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
//获取上一步索引位置的元素,如果这个元素不为null
if ((t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i))) != null &&
//为什么还要再次判断q.base == b?在获取索引元素时,可能其他线程偷取了这个元素
q.base == b) {
//判断当前队列是否为活跃的。
//队列在没有元素处理的时候,线程需要灭活(等待状态),即scanState<0
if (ss >= 0) {
//cas操作将队列尾部元素的索引位置置为null(这时当前索引位置元素已经取出来了,t就是)
if (U.compareAndSwapObject(a, i, t, null)) {
//既然取出来了那么base要加1
q.base = b + 1;
//n<-1表示队列里最少有两个元素(任务),那么尝试开启或者唤醒新的线程来协同处理
//队列新增一个任务时top会加1,base不变,这样的话n=base-top,有一个人元素时n就为-1
if (n < -1) // signal others
//开启或者唤醒新的线程来协同处理
signalWork(ws, q);
//找到了一个任务,返回
return t;
}
}
//如果是不活跃的状态
else if (oldSum == 0 && // try to activate
w.scanState < 0)
//那么尝试唤醒栈顶的队列
//为什么这里是唤醒栈顶队列,而不是当前队列?
//因为ctl的低32位存的是等待栈的栈顶’队列‘,这个栈通过stackPred字段记录上一个元素的位置来形成等待栈,如果唤醒的是当前队列,那么可能斩断了这个栈
tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
}
//如果队列的不活跃,有可能在上面就可能重新被激活了,又或者被其他线程修改了,因为能进来这个方法就说明这个队列是活跃的,这里需要重置扫描状态
if (ss < 0) // refresh
ss = w.scanState;
//如果能进到这个大的if语句,但又走到这里,那么说明一定发生了多线程竞争,那么一切归零,换个索引位置,重新扫描,去找新任务
//为什么到这里就一定发生了多线程竞争?因为进入这个大if条件,说明是有找到任务的,正常走,找到任务就return了,不会到这。
r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
origin = k = r & m; // move and rescan
oldSum = checkSum = 0;
continue;
}
//用来记录,找到了队列,但又没有找到任务
checkSum += b;
}
//这个if的作用是如果找了一圈都没有找到任务,说明暂时没有任务,那么就准备灭活这个队列了
//origin表示初始索引,如果这个条件相等,那么表示最少转完一圈了,或者几圈
if ((k = (k + 1) & m) == origin) { // continue until stable
//之前一直没想通,为什么在灭活之前要用这三个条件?
//oldSum == checkSum 这个条件一定要满足,这个表示检查了一圈或n圈,所有队列是空的,但凡有’多余‘任务都不需要灭活,因为从上面的可以看到,checkSum和oldSum不相等只有checkSum += b处代码,如果有任务或者发生线程竞争,也走不到checkSum += b
//其次是ss>=0 这个是必须的,需要检查队列是否已经被灭活了,如果被灭活了,那么还需要检查ss = w.scanState(是否发生线程竞争),因为有可能不是自己灭活的(其他线程竞争灭活的),如果发生线程竞争,那么checkSum归零,继续自旋
if ((ss >= 0 || (ss == (ss = w.scanState))) &&
oldSum == (oldSum = checkSum)) {
//准备灭活,再次检查是否已经被灭活了,或者是否已经终止了
//正常的话,第一次进来这里会跳过,只有下面执行了灭活操作,第二次自旋时就会进入这个break
if (ss < 0 || w.qlock < 0) // already inactive
break; 灭活或者终止则跳出
//灭活操作,这个是将扫描状态置为负数
int ns = ss | INACTIVE; // try to inactivate
//重新计算线程池新的ctl,这个是将队列的scanStatus放入ctl的低32位,然后将ctl的AC位(活跃线程数)减1
//这个之前说过了ctl低32位放的就是等待栈(被灭活的)的栈顶队列的scanStatus
long nc = ((SP_MASK & ns) |
(UC_MASK & ((c = ctl) - AC_UNIT)));
//获取上一个等待栈的栈顶队列,把它挂在当前灭活队列的stackPred
//在这里就可以体现等待栈的形成主要靠stackPred字段连接两个被灭活的队列
w.stackPred = (int)c; // hold prev stack top
U.putInt(w, QSCANSTATE, ns); //cas置换当前队列的扫描状态。
//更换新的ctl的值
if (U.compareAndSwapLong(this, CTL, c, nc))
ss = ns; //cas成功,直接更改ss,再转一圈就可以跳出自旋了,因为跳出自旋在上面
else
w.scanState = ss; //cas失败,就还原队列的扫描状态,继续自旋 // back out
}
//转完一圈,检查位复位
checkSum = 0;
}
}
}
return null; //找不到任务
}
总结来说,scan()方法主要做三件事:
- 在共享队列数组中找任务
- 有多的任务开启或唤醒线程
- 没有任务就灭活队列
runTask方法
用于执行任务的方法
// WorkQueue队列运行任务的方法
final void runTask(ForkJoinTask<?> task) {
if (task != null) {
//目的是将scanState转为偶数,即最低位置为0
//scanState为偶数,说明表示队列处于工作状态
scanState &= ~SCANNING; // mark as busy
//这个方法会执行 抽象方法exec(), exec()方法由自己的子类去实现
//子类有RecursiveAction,RecursiveTask。在这里会执行线程的run方法
(currentSteal = task).doExec();
//currentSteal表示偷窃的任务,完了这个任务,那么将这个实例字段置null
U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
//执行当前队列自己的任务,这个会从队列顶部取任务。而偷窃则是从底部
execLocalTasks();
ForkJoinWorkerThread thread = owner;
//nsteals初始值就是0,会小于0?
if (++nsteals < 0) // collect on overflow
transferStealCount(pool); //忙不过来了转到其他线程
scanState |= SCANNING;//取消工作状态
if (thread != null) //每次执行完任务时的回调方法,子类实现
thread.afterTopLevelExec();
}
}
doExec方法
最后会调用各自的实现,用于执行的代码逻辑编写,也就是我们常用的RecursiveTask的completed方法
final int doExec() {
int s; boolean completed;
//线程池还未关闭
if ((s = status) >= 0) {
try {
子类实现,执行任务
completed = exec();
} catch (Throwable rex) {
//异常时,记录异常
return setExceptionalCompletion(rex);
}
//标记完成并唤醒等待加入此任务的线程
if (completed)
s = setCompletion(NORMAL);
}
return s;
}
任务拆分示例代码
这块业务逻辑就是用分叉线程池ForkJoinPool计算1加到100的结果
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
public static final int threshold = 2;
private int start;
private int end;
public ForkJoinTaskExample(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
//如果任务足够小就计算任务
boolean canCompute = (end - start) <= threshold;
if (canCompute) {
for (int i = start; i <= start; i++) {
Thread thread = Thread.currentThread();
System.out.println("线程名:"+thread.getName()+"完成了任务");
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待任务执行结束合并其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkjoinPool = new ForkJoinPool();
//生成一个计算任务,计算1+2+3+4+...100
ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);
ForkJoinTask<Integer> submit = forkjoinPool.submit(task);
try {
Integer integer = submit.get();
System.out.println(integer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
在上述代码中,先是查看任务是否是最小的任务,如果是的就是直接执行,如果不是,就继续分割,执行分割完的子任务,然后等待任务执行完成。在这compute方法其实是分叉递归调用,从最开始的大任务一直分叉,分叉到最小单元,直到执行完所有的分叉单元再逐层向上返回结果。
这里的分叉和等待任务分别用的fork和join,来看看这两个方法的实现。
fork方法
//这个方法的主要目的就是向线程池里提交任务
//在这里区分内部提交和外部提交,因为线程池里的线程都是ForkJoinWorkerThread类型,即ForkJoinWorkerThread为内部提交,主(其他)线程就是外部提交
//在源码中的注释大概意思为:在线程之外的任何线程中,任务完成并重新初始化之前多次fork一个任务,存在线程安全,因为后续操作未必一致可观察,如果想要解决这个问题,那么可以先调用join或相关方法,或调用isDone返回true
public final ForkJoinTask<V> fork() {
Thread t;
//如果是内部提交,那么直接往数组里push
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
//这里调用的push是t.workQueue ,即获取线程里的队列推送方法,所以这里的push方法不用考虑线程安全问题(单线程)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
//外部提交则调用外部提交的方法。
ForkJoinPool.common.externalPush(this);
return this;
}
这个方法没有太多的逻辑,说白了就是向当前线程的队列中添加任务,提交之后就不管了。
join方法
//等待返回执行结果,工作的是doJoin方法
//异常完成时,调用线程会中断并抛出异常,而get()方法不会
public final V join() {
int s;
//如果不是正常完成的,那么需要上报异常,实际上是抛出对应记录的异常
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);//抛出记录的异常
//返回结果回调方法,子类实现
return getRawResult();
}
doJoin方法
//该方法在等待之前会帮助线程做任务,如果没有得到任务,那么就等待。
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
//status表示当前任务的状态,小于0表示结束(包含异常或正常结束),结束则返回
return (s = status) < 0 ? s :
//1.如果当前线程是线程池的线程,那么就尝试从队列中拿出(取消推送)这个任务,如果拿出成功,就继续做任务,如果做任务也成功了就返回。如果拿失败或者做失败了,就说明有线程竞争在处理这个任务,那么就调用等待方法。
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
//tryUnpush尝试取消推送,这个方法通过cas操作拿任务,判断提交的任务是否还在队列的顶部,如果在那么就拿出来,不在就拿失败
//doExec正常执行任务的方法。
tryUnpush(this) && (s = doExec()) < 0 ? s :
//等待
wt.pool.awaitJoin(w, this, 0L) :
//2.如果不是是其他线程,那么调用外部等待方法。
externalAwaitDone();
}
awaitJoin方法
//等待任务执行完毕
final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
//记录任务状态,返回结果
int s = 0;
if (task != null && w != null) {
//暂时取出上一个等待的任务
ForkJoinTask<?> prevJoin = w.currentJoin;
//将当前任务放入currentJoin
U.putOrderedObject(w, QCURRENTJOIN, task);
//如果任务是CountedCompleter类型,下面会帮助执行
CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
(CountedCompleter<?>)task : null;
for (;;) {
//任务结束了,跳出
if ((s = task.status) < 0)
break;
//CountedCompleter任务,帮助执行
if (cc != null)
helpComplete(w, cc, 0);
//如果当前队列没有任务了,那么帮助偷窃线程执行任务。
//如果存在任务,那么尝试移除并执行这个任务。
为什么帮助执行成功了,还要再次帮助偷窃者执行任务呢?因为 task.doExec();不等待执行结果?根据源码来看是等待的呀
else if (w.base == w.top || w.tryRemoveAndExec(task))
helpStealer(w, task); //满足条件帮助执行
//任务结束了,跳出
if ((s = task.status) < 0)
break;
long ms, ns; //毫秒,纳秒
//如果上面的都不满足了,那么就要等待了。下面计算等待时间并执行等待
if (deadline == 0L)
ms = 0L;//永久等待,dealine为0
//如果设置了等待时间,计算等待时间,已经过了等待时间,直接跳出
else if ((ns = deadline - System.nanoTime()) <= 0L)
break;
//没有超过等待时间,那么将纳秒转毫秒,如果设置了负数,那么校正为等待1豪秒
//这里还需要判断==0? 如果等于0,不是进到上面的if了?
else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
ms = 1L;
//等待之前,再尝试一次有没有其他的等待线程,有就唤醒,
//或者队列里是否有任务,没有就活跃线程数-1
//还有任务,再看看是否还能开启新线程(这里只是开启新线程,不做任务)。
if (tryCompensate(w)) {
//满足以上一个条件就执行等待操作,不满足就继续自旋
task.internalWait(ms);
//代码能走到说明,已经过了等待时间或者被唤醒了,那么这个线程是就变成活跃线程了,需要活跃线程数+1,然后继续自旋
U.getAndAddLong(this, CTL, AC_UNIT);
}
}
//走到这说明跳出自旋了,跳出自旋:任务结束了或者过了指定的等待时间
//将之前 准备等待的任务复位
U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
}
return s; //返回任务状态
}
tryRemoveAndExec方法
这个方法只在awaitJoin方法中执行
//队列为空且任务未完成时,返回true
//这个方法的目的就是在为了‘是否帮助偷取线程’做最后的检验,
//说白了就是这个任务在当前线程发出了等待请求,但是当前线程不想等待,但又任务被人偷走了,我还是帮他吧,或者我自己把这个任务做掉
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; int m, s, b, n;
//非空检验
if ((a = array) != null && (m = a.length - 1) >= 0 &&
task != null) {
//每次重新获取base和top的值,并且检查队列是否为空
while ((n = (s = top) - (b = base)) > 0) {
//内层循环每次从top到base遍历队列
for (ForkJoinTask<?> t;;) { // traverse from s to b
//获取队列顶部的任务的索引,每次都会向下移动
long j = ((--s & m) << ASHIFT) + ABASE;
//每次进来都需要获取队列的顶部元素是否为null,如果满足说明队列为空,那么需要跳出循环
if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
//这里为了检查多线程竞争,如果s+1==top表示是队列顶部,表示没有发生线程竞争
//为什么相等就没有发生线程竞争?因为进来的时候取过了base和top的值,说明进来的时候这个区间是有任务,按照正常逻辑队列顶部任务为null的话,是自己执行掉的,此时s+1==top,如果中间元素为null说明有其他线程把任务偷走了。
return s + 1 == top; // shorter than expected
//找到了目标任务
else if (t == task) {
boolean removed = false;
//如果还是队列顶部,那么直接从顶部移除任务,打移除标记
if (s + 1 == top) { // pop
if (U.compareAndSwapObject(a, j, task, null)) {
U.putOrderedInt(this, QTOP, s);
removed = true;
}
}
//如果不是顶部,那么要检查是否发生多线程竞争,其他线程可能会从底部偷取任务,所以判断base是否发生变化
else if (base == b) // replace with proxy
//如果没变,那么需要把这个任务拿出来,但是又因为此时不是队列顶部,队列正常获取元素需要从顶部弹出,此时在队列中间,那么拿出这个元素,用个空类型的任务代替,防止线程问题。
removed = U.compareAndSwapObject(
a, j, task, new EmptyTask());
//只有找到目标元素了,都需要判断下是否移除成功,移除了那么就需要执行掉,然后继续自旋
//继续自旋的目的是为了检查队列是否为空?
if (removed)
task.doExec();
break; //跳出去,检查下任务是否被完成
}
//循环一轮还没有找到目标任务,那么判断当前任务是否在队列顶部,并是否完成
//为什么要这样判断?因为这个方位的源码注释就是检查队列是否为空,所以在检查的同时,也帮忙清空已经完成的任务,方便自己更好得检查。如果完成了还必须在顶部,因为设计队列需要从顶部出列,中间出列会有问题
else if (t.status < 0 && s + 1 == top) {
if (U.compareAndSwapObject(a, j, t, null))
U.putOrderedInt(this, QTOP, s);
break; //跳出去,检查下任务是否被完成 // was cancelled
}
//一圈或者n圈转完了,还是没有找到目标任务,且队列不为空,那么返回false
if (--n == 0)
return false;
}
//兜底,每一轮内层循环跳出来或者遍历结束时,都需要检查下任务是否完成了
//因为任务完成了,我就不要帮助偷取线程执行任务了
if (task.status < 0)
return false;
}
}
return true;
}
helpStealer方法
这个方法大体逻辑如下:
- 寻找窃取者线程: 当一个线程等待另一个任务完成时,它会调用这个方法来寻找可以窃取任务的线程。这个方法会遍历所有工作队列,寻找正在执行与等待任务相关的子任务的线程。
- 帮助窃取者线程: 如果找到正在执行相关子任务的线程,这个方法会尝试帮助它窃取任务。它会从窃取者线程的队列中获取任务,然后交给等待的线程执行。
- 稳定性检查: 为了确保代码的稳定性,这个方法会进行一些检查,例如比较工作队列的长度和任务的状态。
说白了就是尽量不能让等待线程闲下来
//找到偷窃队列,如果偷窃队列中也有任务,那么尝试从偷窃队列中偷任务,让等待线程执行
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
WorkQueue[] ws = workQueues;
int oldSum = 0, checkSum, m;
//非空校验
//变量解释:
//ws: 工作队列数组
//w: 调用者工作队列
//task: 等待的任务
//subtask: 子任务
//v: 窃取者工作队列
//j: 等待线程的工作队列
//hint: 窃取队列的索引
if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
task != null) {
//最外层循环:任务完成检查,并且稳定检查
do { // restart point
checkSum = 0; // for stability check
ForkJoinTask<?> subtask;
WorkQueue j = w, v; // v is subtask stealer
descent: for (subtask = task; subtask.status >= 0; ) {
//h计算为奇数,每次从奇数索引其队列,步长为2,每次都取奇数队列
//为什么每次取奇数队列?
for (int h = j.hint | 1, k = 0, i; ; k += 2) {
if (k > m) //找不到偷窃队列,跳到最外层循环 // can't find stealer
break descent;
//循环找偷窃队列,找到就返回
if ((v = ws[i = (h + k) & m]) != null) {
if (v.currentSteal == subtask) {
j.hint = i;
break;
}
checkSum += v.base;
}
}
//走到这说明一定找到了偷窃队列,开始帮助偷窃者执行任务。
for (;;) { // help v or descend
ForkJoinTask<?>[] a; int b;
checkSum += (b = v.base);
//拿到偷窃队列的等待任务
ForkJoinTask<?> next = v.currentJoin;
//如果subtask已是完成态,或发现竞态等情况造成数据已脏,如发现本轮循环中j的当前join已不是当前subtask,
//或v的当前steal不是subtask,说明出现了脏数据,直接终止循环二,重新进入while循环重初始化jv.
if (subtask.status < 0 || j.currentJoin != subtask ||
v.currentSteal != subtask) // stale
break descent;
//如果偷窃队列已空
if (b - v.top >= 0 || (a = v.array) == null) {
//并且偷窃队列的等待任务也是空的,重新while判定循环条件,重初始化jv
if ((subtask = next) == null)
break descent;
//如果偷窃队列还有处于等待中的任务,那么将偷窃队列改为等待线程的工作队列,重新开始寻找等待队列的偷窃者,按照此逻辑直到清空任务为止
j = v;
break;
}
//等待队列反偷‘偷窃队列’的任务,从底部开始
int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
ForkJoinTask<?> t = ((ForkJoinTask<?>)
U.getObjectVolatile(a, i));
//是否发生线程竞争,如果发生了,那就回头再来,就重新帮助偷窃队列执行任务
if (v.base == b) {
//任务被处理了,回到外层,为什么直接回到最外层?
if (t == null) // stale
break descent;
//cas反偷任务,从底部开始
if (U.compareAndSwapObject(a, i, t, null)) {
v.base = b + 1;
//现在要帮助偷窃队列执行任务,等待队列要偷取偷窃队列的任务,先把当前偷窃的任务暂时拿出去 ,要放现在偷取的任务了。
ForkJoinTask<?> ps = w.currentSteal;
int top = w.top;
do {
//放入偷窃容器,执行掉
U.putOrderedObject(w, QCURRENTSTEAL, t);
t.doExec(); // clear local tasks too
//只要参数task还未完成,w新压入了任务,则依次尝试从w中pop元素,和前面的t一样按序执行(此处顺带执行自己的任务)
} while (task.status >= 0 &&
w.top != top &&
(t = w.pop()) != null);
U.putOrderedObject(w, QCURRENTSTEAL, ps);
if (w.base != w.top)
return; // can't further help
}
}
}
}
//每一轮中检查 checkSum 变量。如果 checkSum 变量与 oldSum 变量不同,则表明代码不稳定
//因为窃取者线程的工作队列已被其他线程修改,或者窃取者线程的工作队列已被清空。
} while (task.status >= 0 && oldSum != (oldSum = checkSum));
}
}
tryCompensate方法
tryCompensate 方法主要用于防止工作线程池过载。
尝试通过释放空闲工作线程、解除补偿活动计数和创建新工作线程来实现这一目标。
- 有助于确保 ForkJoinPool 的性能和稳定性
//尝试减少活动计数(有时是隐式的),并可能释放或创建新工作线程,用于执行接下来要等待的任务
//发生争用、检测到陈旧性、不稳定性或终止时,返回 false
private boolean tryCompensate(WorkQueue w) {
//记录返回状态
boolean canBlock;
//ws线程池队列数组的副本,c保留的线程池的ctl信号量,m队列数组长度,pc当前并行度,sp为ctl信号量低32位
WorkQueue[] ws; long c; int m, pc, sp;
//当前队列是否为空,或者被其他线程占用,队列数组为空,并行度为0
if (w == null || w.qlock < 0 || // caller terminating
(ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
(pc = config & SMASK) == 0) // parallelism disabled
canBlock = false;
//如果一切正常,那么检查是否存在空闲线程,如果有空闲线程,那么ctl低32保留了空闲线程的信息
else if ((sp = (int)(c = ctl)) != 0) // release idle worker
//存在空闲线程,尝试激活
canBlock = tryRelease(c, ws[sp & m], 0L);
//如果所有线程都在工作,那么准备减少线程,因为执行当前任务的线程就要等待了
//在减少之前需要做些检查。具体检查如下
else {
//取出当前的活跃线程数加上设置的并行度,以及当前线程总数加并行度。为了检查做准备
int ac = (int)(c >> AC_SHIFT) + pc;
int tc = (short)(c >> TC_SHIFT) + pc;
//遍历队列数组,能到这个分支,说明所有队列都忙碌,那么再次检查是不是真的所有队列都在忙碌
//为什么只遍历奇数队列?回头再看
int nbusy = 0; // validate saturation
for (int i = 0; i <= m; ++i) { // two passes of odd indices
WorkQueue v;
if ((v = ws[((i << 1) | 1) & m]) != null) {
if ((v.scanState & SCANNING) != 0)
break; //扫描状态的队列表示忙碌的,即scanState为奇数,如果有不忙碌的,跳出
++nbusy;//正常情况下,遍历完,nbusy和总线程数相等的,不相等说明有队列不忙碌
}
}
//检查工作线程数是否不稳定或过时
//nbusy不是i应该等于pc吗?nbusy != (tc << 1) 这是啥意思
if (nbusy != (tc << 1) || ctl != c)
canBlock = false; // unstable or stale
//如果队列是空的,就减少活跃线程数,在此之前,检查线程总数,以及至少有一个活跃线程数
//在这里可以看出,我们传入的并行度并不是线程池的线程数
else if (tc >= pc && ac > 1 && w.isEmpty()) {
long nc = ((AC_MASK & (c - AC_UNIT)) | //活跃线程数减1
(~AC_MASK & c)); // uncompensated
canBlock = U.compareAndSwapLong(this, CTL, c, nc);
}
//线程数是否超标
else if (tc >= MAX_CAP ||
(this == common && tc >= pc + commonMaxSpares))
throw new RejectedExecutionException(
"Thread limit exceeded replacing blocked worker");
//如果线程数没了,或者队列不为空,那么就要尝试创建新的线程了
//因为马上有线程要等待了,需要有线程唤醒
else { // similar to tryAddWorker
boolean add = false; int rs; // CAS within lock
long nc = ((AC_MASK & c) |
(TC_MASK & (c + TC_UNIT))); //线程总数加1
if (((rs = lockRunState()) & STOP) == 0)
add = U.compareAndSwapLong(this, CTL, c, nc);
unlockRunState(rs, rs & ~RSLOCK);
canBlock = add && createWorker(); //开启新的线程 // throws on exception
}
}
return canBlock;
}