逐行解析最复杂的线程池ForkJoinPool源码


ForkJoinPool的内部成员

ForkJoinPool的成员变量大多数都是二进制的数,很多重要的成员变量下面都会维护几个小弟(二进制的数),目的是二进制位移运算得出这个成员变量的状态,不同的二进制数表示了不同的状态,看源码时尽量不要把数字类型的变量看成数字,而是二进制

runState字段

//线程池的运行状态状态
volatile int runState;

以下变量的作用用于计算runstate,是通过二进制位移运算得出当前线程的状态,线程池关闭小于0,其他都是2的幂次方。

通过这几个特殊的变量与runState做位运算,得出runState不同的位是否为1来得出当前的运行状态。

private static final int  RSLOCK     = 1;         //线程池被锁定。二进制表示:00000000 00000000 00000000 00000001
private static final int  RSIGNAL    = 1 << 1;    //线程池有线程需要唤醒  00000000 00000000 00000000 00000010
private static final int  STARTED    = 1 << 2;    //线程池已经初始化  00000000 00000000 00000000 00000100
private static final int  STOP       = 1 << 29;   //线程池停止 00100000 00000000 00000000 00000000
private static final int  TERMINATED = 1 << 30;   //线程池终止 01000000 00000000 00000000 00000000
private static final int  SHUTDOWN   = 1 << 31;   //线程池关闭 10000000 00000000 00000000 00000000

总结来说就是:runState是一个int类型变量,即32位的二进制数,上面6个变量分别表示的是二进制第一位为1其他位为0的RSLOCK,第二位为1其他位为0的RSIGNAL,依次类推第三位为1的STARTED,第三十位为1的STOP,三十一位为1的TERMINATED,三十二位为1的SHUTDOWN即负数。

ctl字段

//实例字段。存储线程池的控制信息
  volatile long ctl;

  //以下变量目的是通过与ctl做位运算可以得出四个段的信息
  // 高低位
private static final long SP_MASK    = 0xffffffffL;//long型低32位.
private static final long UC_MASK    = ~SP_MASK;//long型高32位.

// 活跃数.
private static final int  AC_SHIFT   = 48;//移位偏移量,如左移到49位开始.
private static final long AC_UNIT    = 0x0001L << AC_SHIFT;//1<<48代表一个活跃数单位.
private static final long AC_MASK    = 0xffffL << AC_SHIFT;//long型高16位(49-64)

// 总数量
private static final int  TC_SHIFT   = 32;//移位偏移量,33位开始.
private static final long TC_UNIT    = 0x0001L << TC_SHIFT;//1<<32代表一个总数量
private static final long TC_MASK    = 0xffffL << TC_SHIFT;//33-48位
private static final long ADD_WORKER = 0x0001L << (TC_SHIFT + 15); //第48位

线程池中较为核心的变量,long类型的变量。他将一个64位分为了四段。每段16位。由高到低分别为AC,TC,SS,ID

  • AC: 正在运行工作线程数减去目标并行度,高16位

  • TC: 总工作线程数减去目标并行度,中高16位

  • SS: 栈顶工作线程状态和版本数(每一个线程在挂起时都会持有前一个等待线程所在工作队列的索引,由此构成一个等待的工作线程栈,栈顶是最新等待的线程),第一位表示状态 1:不活动(inactive); 0:活动(active),后15表示版本号,防止 ABA 问题

  • ID: 栈顶工作线程所在工作队列的索引

AC为负说明,活跃的worker没有到达预期数量,需要激活或者创建。 1.没有空闲的worker并且worker太少的话会创建,具体是调用tryAddWorker方法。 2.如果有空闲worker,获取到空闲worker,解除worker阻塞。并且更新sp。

通过SS我们可以进行判断是否存在空闲的worker。

ID用来定位该worker对应队列的下标。

如图所示:

ForkJoinPool和 WorkQueue之间的共享掩码

// 跟边界相关,如队列容器(数组)的最大长度
static final int SMASK        = 0xffff;        // short bits == max index 参与位移计算,计算后16位的数
static final int MAX_CAP      = 0x7fff;        // 表示最大的队列数(线程数) ,二进制:0111 1111 1111 1111,参与计算并行度,15位正好对应ctl中计算线程数
static final int EVENMASK     = 0xfffe;        // 16位偶数位 1111 1111 1111 1110 even short bits 
static final int SQMASK       = 0x007e;        // 126,二进制表示为0111 1110 。用于计算队列数组workQueues的偶数索引下标,第一位为0表示偶数,一共六个1,最多可计算出小于126的64个偶数索引下标

// Masks and units for WorkQueue.scanState and ctl sp subfield
static final int SCANNING     = 1;             // false when running tasks
static final int INACTIVE     = 1 << 31;       // must be negative
static final int SS_SEQ       = 1 << 16;       // version count

// Mode bits for ForkJoinPool.config and WorkQueue.config
static final int MODE_MASK    = 0xffff << 16;  // top half of int
static final int LIFO_QUEUE   = 0;
static final int FIFO_QUEUE   = 1 << 16;
static final int SHARED_QUEUE = 1 << 31;       // must be negative

内部类:工作队列WorkQueue的成员变量

    //初始的队列大小,队列用下面的ForkJoinTask<?>[] array存储数据
   static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    //最大队列大小
    static final int MAXIMUM_QUEUE_CAPACITY = 1 << 26; // 64M

    // Instance fields
    volatile int scanState;    // 扫描状态,小于0:表示不活跃的(等待状态),偶数:表示忙碌状态(第一次运行任务时置偶),奇数:在队列数组的索引位置
    int stackPred;             // 等待栈中前一个元素的指针。源码中备注的是栈中前一个元素,栈在哪里?当一个线程没有任务可窃取时,线程就会进入等待状态,可能会有多个线程等待,多个等待的线程会按照先后顺序用stackPred标记前一个等待线程的队列的索引(队列存放在线程池的workQueues数组中),然后把最后一个等待的线程队列(即栈顶)放到ctl的后32位中,由此形成一个等待线程的栈数据结构。
    int nsteals;               // number of steals
    int hint;                  // randomization and stealer index hint
    int config;                // pool index and mode
    volatile int qlock;        // 用于检测队列是否被锁。1: 锁定, < 0: 终止;正常状态:0,
    volatile int base;         // index of next slot for poll
    int top;                   // index of next slot for push
    ForkJoinTask<?>[] array;   // 队列真正的容器,在第一次执行任务时初始化(runWorker()中)
    final ForkJoinPool pool;   // the containing pool (may be null)
    final ForkJoinWorkerThread owner; // owning thread or null if shared
    volatile Thread parker;    // == owner during call to park; else null
    volatile ForkJoinTask<?> currentJoin;  // task being joined in awaitJoin
    volatile ForkJoinTask<?> currentSteal; // mainly used by helpStealer

构造方法

 public ForkJoinPool(int parallelism,
                        ForkJoinWorkerThreadFactory factory,
                        UncaughtExceptionHandler handler,
                        boolean asyncMode) {
        this(checkParallelism(parallelism), //检查并行度是否超过最大限制。为什么是叫并行度不是线程数?该线程池的线程数量一般为parallelism,任务执行过程中,可能比这大
             checkFactory(factory), 
             handler,
             asyncMode ? FIFO_QUEUE : LIFO_QUEUE, //设置队列,还是栈模式
             "ForkJoinPool-" + nextPoolId() + "-worker-");
        checkPermission();
    } 


private ForkJoinPool(int parallelism,
                         ForkJoinWorkerThreadFactory factory,
                         UncaughtExceptionHandler handler,
                         int mode,
                         String workerNamePrefix) {
        this.workerNamePrefix = workerNamePrefix;
        this.factory = factory;
        this.ueh = handler;
        this.config = (parallelism & SMASK) | mode; //高16位表示模式,低16位表示并行度
        long np = (long)(-parallelism); // offset ctl counts
        //将负数的parallelism分别放入ctl的高16位,和中高16位。为什么要改成负数再放入?因为这两个16位的最高位为1(即负数)表示活跃线程和总线程不够,后面创建线程时会计算线程数是否达标,初始化还未创建线程,肯定不够。
        this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
    }

代码内部如何巧妙的运用二进制运算

举例1:U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK))
解释:RSLOCK为1,转换成二进制就是 00000000 00000000 00000000 00000001,那么与rs做 | 运算的话,rs的最低位必然为1,所以这段代码的意思是将RSLOCK的bit位放入RUNSTATE,方便后面做计算。

相关方法

lockRunState方法

//尝试给线程池加锁,加锁成功直接返回,如果加锁失败则当前线程自旋+等待,需要其他线程唤醒
private int lockRunState() {
        int rs;
        //先看看当前是否为锁定状态
        return ((((rs = runState) & RSLOCK) != 0 ||
        //没有锁定的话,那么就尝试加锁
                 !U.compareAndSwapInt(this, RUNSTATE, rs, rs |= RSLOCK)) ?
                 //如果加锁还是没有成功那么,等待锁释放。
                 //如果加锁成功那么直接返回
                awaitRunStateLock() : rs);
    }

awaitRunStateLock方法

//这个方法的目的就是得到可用的锁。先是自旋,否则等待
private int awaitRunStateLock() {
        Object lock;
        boolean wasInterrupted = false;
        for (int spins = SPINS, r = 0, rs, ns;;) {
            //锁是否可用(是否其他线程占用)
            if (((rs = runState) & RSLOCK) == 0) {
                //如果可用,则尝试加锁
                if (U.compareAndSwapInt(this, RUNSTATE, rs, ns = rs | RSLOCK)) {
                    if (wasInterrupted) {
                        try {//如果进来了,说明等待被打断,直接中断线程
                            Thread.currentThread().interrupt();
                        } catch (SecurityException ignore) {
                        }
                    }
                    //返回锁状态
                    return ns;
                }
            }
             //走到这里表示锁状态不可用或者最少自旋一次,那么重新获取r的值
            else if (r == 0)
                r = ThreadLocalRandom.nextSecondarySeed();
            //spins>0? 这个条件似乎不会成立,源码里没有找到在条件之外对SPINS赋值操作,好像永远都是0
            else if (spins > 0) {
                r ^= r << 6; r ^= r >>> 21; r ^= r << 7; // xorshift
                if (r >= 0)
                    --spins;
            }
            //到这说明线程池还未初始化,那么当前线程让出cpu执行权等待初始化。继续自旋
            else if ((rs & STARTED) == 0 || (lock = stealCounter) == null)
                Thread.yield();   // initialization race
            //已经做了最大的努力还是没有拿到锁,那么改变线程池的运行状态为RSIGNAL(表示有线程需要唤醒),等待锁释放
            //改变运行状态不够严谨,本质是将RSIGNAL的二进制bit位放入RUNSTATE 
            else if (U.compareAndSwapInt(this, RUNSTATE, rs, rs | RSIGNAL)) {
                synchronized (lock) { //这里为什么需要加锁?为了下面调用lock.wait()
                    //再次检查线程池的状态是否为RSIGNAL(目的是为了检查是否存在有其他线程也在改变这个状态)
                    if ((runState & RSIGNAL) != 0) {
                        try {
                            lock.wait();//没有则等待被唤醒
                        } catch (InterruptedException ie) {
                            if (!(Thread.currentThread() instanceof
                                  ForkJoinWorkerThread))
                                wasInterrupted = true; //如果等待被打断,那么回到上面的中断线程
                        }
                    }
                    else  //如果再次检查发现RSIGNAL又被其他线程修改了,那么唤醒等待,继续自旋获取锁状态
                        lock.notifyAll();
                }
            }
        }
    }

unlockRunState方法

//解锁线程池的运行状态,解锁必须成功  
private void unlockRunState(int oldRunState, int newRunState) {
        //cas尝试解锁
        if (!U.compareAndSwapInt(this, RUNSTATE, oldRunState, newRunState)) {
        //解锁失败
        //自己上的锁,自己为什么会解锁失败?
        //加锁只是修改了RUNSTATE的锁标识位(二进制最低的第一位RSLOCK),如果修改失败说明RUNSTATE被修改,而自己上锁的,但RUNSTATE已经改变了,说明有其他线程修改了这个字段,而只有awaitRunStateLock()方法才可能在其他线程上锁的情况下修改RSIGNAL位(表示该线程需要唤醒),如果是这样,那么就需要唤醒其他线程。
            Object lock = stealCounter;
            //不管是否有线程等待或者其他原因解锁失败,都会强制解锁
            runState = newRunState;              // clears RSIGNAL bit
            if (lock != null)
                //唤醒其他所有等待锁的线程
                synchronized (lock) { lock.notifyAll(); }
        }
    }

externalSubmit方法

    //用于外部提交任务的方法。
    //何为内部外部?内部:线程池开的线程是内部线程。外部:线程池之外的线程提交的任务,主线程或其他
    private void externalSubmit(ForkJoinTask<?> task) {
            int r;                                    // initialize caller's probe
            //如果进入这里那么表示是第一次提交任务,需要初始化
            //线程第一次调用ThreadLocalRandom.getProbe()结果为0
            if ((r = ThreadLocalRandom.getProbe()) == 0) {
                ThreadLocalRandom.localInit();
                r = ThreadLocalRandom.getProbe();
            }
            //自旋提交任务,可能会出现多线程同时提交
            for (;;) {
                WorkQueue[] ws; WorkQueue q; int rs, m, k;
                boolean move = false; //是否发生多线程竞争
                 //小于0表示线程池已经停了,如果还在提交任务,那么直接抛出异常
                if ((rs = runState) < 0) {
                    tryTerminate(false, false);     // help terminate
                    throw new RejectedExecutionException();
                }
                //如果rs的第四个bit位不是1(不是开始状态),表示还未初始化,那么初始化
                else if ((rs & STARTED) == 0 ||     // initialize
                    //如果是rs的状态已经是STARTED的,还需要判断workQueues是否初始化,因为可能其他的线程正在初始化,已经修改了rs的状态,但还未来得及给workQueues赋值。
                    //如果workQueues已经赋值了为什么还要判断数组长度是否为1呢?workQueues的长度貌似初始化之后就之后增大不会减小
                         ((ws = workQueues) == null || (m = ws.length - 1) < 0)) {
                    int ns = 0;
                    // 如果上述条件都满足,那么就准备初始化,先加锁。
                    rs = lockRunState();
                    try {
                        //再次检查,防止加锁期间被初始化
                        if ((rs & STARTED) == 0) {
                            //先初始化STEALCOUNTER 偷锁
                            U.compareAndSwapObject(this, STEALCOUNTER, null,
                                                   new AtomicLong());
                            // create workQueues array with size a power of two
                            //取config的低16位(确切说是低15位)。在new线程池时,需要传入并行度,这个并行度就放在config的低16位
                            int p = config & SMASK; // ensure at least 2 slots
                            //获取2的次幂数
                            int n = (p > 1) ? p - 1 : 1;
                            n |= n >>> 1; n |= n >>> 2;  n |= n >>> 4;
                            n |= n >>> 8; n |= n >>> 16; n = (n + 1) << 1;
                            //创建工作队列的容器,并标记线程池已经开始工作
                            workQueues = new WorkQueue[n];
                            ns = STARTED;
                        }
                    } finally {
                        //释放锁。将rs的锁标识位(最低位)置为0,并将STARTED的二进制位为1的放入rs
                        unlockRunState(rs, (rs & ~RSLOCK) | ns);
                    }
                }
                //这一步是为了计算出队列数组的偶数下标,并且如果这个索引处有一个队列,那么就将提交的任务放入这个队列,尝试开启新线程。如果走到这一步,!= null可以看出下面的else if一定走了
                //ws[k = r & m & SQMASK]这一步是计算偶数下标:
                //这个计算的目的是基于当前线程的探针 r 和 workQueues 数组的长度 m,计算得到 workQueues 数组中的索引 k,然后通过 ws[k] 获取相应位置的 WorkQueue 对象。

                //SQMASK 的作用是确保通过 r & m 计算得到的索引 k 在 workQueues 数组长度范围内,且为偶数。这样确保了分配到的 WorkQueue 对象的索引是偶数,因为在后续的代码中,会通过 (s & 1) == 0 判断当前队列是否是共享队列,而共享队列的索引需要是偶数
                else if ((q = ws[k = r & m & SQMASK]) != null) {
                    //如果当前队列没有上锁,那么尝试cas上锁
                    if (q.qlock == 0 && U.compareAndSwapInt(q, QLOCK, 0, 1)) {
                    //上锁成功,那么就把任务提交给当前队列
                        ForkJoinTask<?>[] a = q.array; //这里没有竞争的话为null
                        int s = q.top; 
                        boolean submitted = false; // initial submission or resizing
                        try {                      // locked version of push
                            //如果队列已经初始化了,并且有足够的空间则放入队列
                            if ((a != null && a.length > s + 1 - q.base) ||
                            //或者队列没有初始化,那么就去初始化队列,成功则放入队列
                                (a = q.growArray()) != null) {
                                //因为需要cas将任务放入数组,所以需要计算索引top的在内存中的偏移量,需要将任务放在此处
                                int j = (((a.length - 1) & s) << ASHIFT) + ABASE;
                                U.putOrderedObject(a, j, task);//放入任务
                                U.putOrderedInt(q, QTOP, s + 1); //top加1
                                submitted = true; //标记为已经提交
                            }
                        } finally { //解锁
                            U.compareAndSwapInt(q, QLOCK, 1, 0);
                        }
                        if (submitted) { //如果提交成功,那么尝试创建线程或者唤醒等待线程处理任务
                            signalWork(ws, q);
                            return;
                        }
                    }
                    //加锁失败,表示有其他线程也在处理这个索引下的队列,那么标记move,进入下次自旋
                    move = true;                   // move on failure
                }
            //走到这说明计算的出来的索引位置没有队列,且线程池状态可用,那么创建新的队列放入索引位置
                else if (((rs = runState) & RSLOCK) == 0) { // create new queue
                //创建队列
                    q = new WorkQueue(this, null);
                    q.hint = r;
                    //config的第32位置1表示共享队列
                    q.config = k | SHARED_QUEUE;
                    //共享队列的扫描状态直接标记为不活跃
                    //它没有工作线程,也不会参与活化和scan阻塞的过程,也不会将自己的scanState压入ctl后32位做栈元素
                    q.scanState = INACTIVE;
                    //上锁,要开始给队列数组放入队列了
                    rs = lockRunState();           // publish index
                    //上锁之后再次检查,类似双重检查锁。为什么需要再次检查?上锁过程中依然有其他的线程可能修改runState,修改了就影响rs的结果。同样的也有可能因为线程竞争导致当前线程没有初始化队列数组workQueues,几个判断都是再次检查可能因为线程竞争关系导致的不合理的进入
                    if (rs > 0 &&  (ws = workQueues) != null &&
                        k < ws.length && ws[k] == null)
                        ws[k] = q;                 // else terminated
                    unlockRunState(rs, rs & ~RSLOCK);
                }
                else  //走到这里说明存在多线程竞争,标记繁忙
                    move = true;                   // move if busy
                if (move)  //如果是线程池繁忙,那么重置新的r供下一轮使用,继续自旋
                    r = ThreadLocalRandom.advanceProbe(r);
            }
        }

signalWork方法

//如果活动的工作线程太少,则尝试创建或激活工作线程。
final void signalWork(WorkQueue[] ws, WorkQueue q) {
        long c; int sp, i; WorkQueue v; Thread p;
          //ctl初始化的时候最高位为1即负数,每次添加线程时AC(高16位,活跃线程数)和TC(中高16位,总线程数)位都会加1,那么何时为正数?初始化时AC和TC为负的parallelism,即增加了parallelism个线程不小于0
        //ctl<0表示ctl最高位为1,即可以判断AC为负数,即活跃线程数没有达到parallelism(并行度)
        while ((c = ctl) < 0L) {
            //ctl的低32位初始化时默认都为0。这个32位用低16位表示空闲线程的id(可以计算索引下标),如果有多个空闲线程那么保存的就是栈顶的id,高16位表示是否存在空闲线程,有的话最高位为1,其余15位表示版本数(修改次数,防止BA问题)
            //即 (int)ctl==0 那么说明现在没有空闲线程
            if ((sp = (int)c) == 0) {                  // no idle workers
                //ADD_WORKER为long类型的第48位为1,即用于判断ctl的TC位的最高位是否为1
                //ctl & ADD_WORKER !=0 说明TC位为负数,即总线程数还未达标
                if ((c & ADD_WORKER) != 0L)            // too few workers
                    //既然总线程数和活跃线程数都不够,且又没有空闲线程,那么就尝试开启新的线程
                    tryAddWorker(c);
                break;
            }
            //正常执行情况下ws一定有值,如果为null,就表示线程池要么没有初始化,要么就终止了
            if (ws == null)                            // unstarted/terminated
                break;
            //sp & SMASK计算低16位,即索引值。索引会超过ws长度?ws好像只会扩容
            //还是工作线程数组长度不足?
            if (ws.length <= (i = sp & SMASK))         // terminated
                break;
            //活跃线程未满,且又找不到空闲线程,线程池可能正在关闭?
            if ((v = ws[i]) == null)                   // terminating
                break;
            //sp即ctl的低32位,SS_SEQ表示第17位为1, ~INACTIVE表示第32位为0,其他位为1
            //下面的表达式的意思:ctl的中低16位加1,最高位取0。即把当前队列的线程设置为活跃线程,并且ABA位版本号加1,
            int vs = (sp + SS_SEQ) & ~INACTIVE;        // next scanState
            //用相减的方式判断是否还是栈顶元素
            int d = sp - v.scanState;                  // screen CAS
            //这段代码的计算方式:ctl的AC位加1,再把上一个等待栈的队列的stackPred放入ctl的低32位,然后合并。
            //此处并没有增加总线程数,因为是唤醒的等待线程
            long nc = (UC_MASK & (c + AC_UNIT)) | (SP_MASK & v.stackPred);
            //d==0表示当前队列就是等待栈的栈顶元素,也可以表示没有发生线程竞争。如果满足那么尝试cas替换ctl的值
            if (d == 0 && U.compareAndSwapLong(this, CTL, c, nc)) {
                v.scanState = vs;   //更改scanState,表示队列现在处于激活状态       // activate v
                //唤醒等待线程。这里v,parker正常情况下有值,
                if ((p = v.parker) != null)
                    U.unpark(p);
                break; //目的完成,跳出
            }
            //走到这里说明,没有足够活跃的线程,且又没有空闲线程,那么要不要继续自旋?
            //q不为null表示想处理任务,但是队列里又没有任务那么不需要自旋,其他继续自旋
            if (q != null && q.base == q.top)          // no more work
                break;
        }
    }

tryAddWorker方法

//尝试添加一个新工作线程(工作者)
private void tryAddWorker(long c) {
        boolean add = false;
        do {
            //即将添加新的线程了,ctl的AC和TC位表示的活跃线程数和线程总数需要加1
            //表示高32位中两个16位,分别加1
            long nc = ((AC_MASK & (c + AC_UNIT)) |
                       (TC_MASK & (c + TC_UNIT)));
            if (ctl == c) { //判断相等的目的:是否存在多线程竞争。不相等就自旋
                int rs, stop;                 // check if terminating
                //先加锁,再判断线程池是否终止
                if ((stop = (rs = lockRunState()) & STOP) == 0)
                    //没有终止,则cas给ctl赋值
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK); //解锁
                if (stop != 0) //停止了就跳出
                    break;
                if (add) {  /如果cas成功,那么创建线程,
                    createWorker();
                    break; //目的达成跳出
                }
            }
        //走到这里,说明上面的走过一轮了,并且失败了,那么是否要继续自旋?
        //不管怎样重新获取ctl的值,下次自旋使用
        //判断总线程数是否已经饱和,并且是没有空闲线程,继续自旋
        //(int)c == 0表示ctl的后32位,也是等待栈的scanState灭活的值
        } while (((c = ctl) & ADD_WORKER) != 0L && (int)c == 0);
    }

createWorker方法

//这个方法很简单,就是创建一个工作线程ForkJoinWorkerThread,创建失败就销毁
private boolean createWorker() {
        //获取默认的创建线程工厂。工厂模式创建
        ForkJoinWorkerThreadFactory fac = factory;
        Throwable ex = null; //记录异常
        ForkJoinWorkerThread wt = null;
        try {
        //这里就是创建一个线程,创建线程里有一个重要的方法registerWorker(this);大概意思就是给工作线程分配一个队列,并把这个队列放到线程池的数组

            if (fac != null && (wt = fac.newThread(this)) != null) {
                //创建成功直接开启新线程,这里线程的实现类是ForkJoinWorkerThread,自然会去调用它的run方法
                wt.start();
                return true;
            }
        } catch (Throwable rex) {
            ex = rex;
        }
        //失败则调用注销方法
        deregisterWorker(wt, ex);
        return false;
    }

registerWorker方法

protected ForkJoinWorkerThread(ForkJoinPool pool) {
        super("aForkJoinWorkerThread");
        //到这里可以看到,每次创建线程一个线程,每个线程都会记录线程池的实例,这个实例就是我们主线程创建的实例,相当于每个线程共享的
        this.pool = pool;
        //创建队列,队列会记录当前的线程,当前线程也会记录各自的队列
        this.workQueue = pool.registerWorker(this);
    }

//这个方法就是创建一个记录当前线程的队列,并把队列放入线程池的队列数组
final WorkQueue registerWorker(ForkJoinWorkerThread wt) {
        UncaughtExceptionHandler handler;
        //守护线程
        wt.setDaemon(true);                           // configure thread
        //设置异常处理器
        if ((handler = ueh) != null)
            wt.setUncaughtExceptionHandler(handler);
        //创建一个队列的实例,this表示当前线程池,一般是当前线程池实例,wt是当前线程
        //所以队列记录了当前线程和共享的线程池实例
        WorkQueue w = new WorkQueue(this, wt);
        //当前队列放到线程池队列数组的哪个位置,下面会做计算
        int i = 0;                                    // assign a pool index
        //取队列模式,即取config的高16位,0表示:队列,1表示栈
        int mode = config & MODE_MASK;
        //现在把队列放入线程池的队列数组,因为队列数组是多个线程共享的,所以这里操作需要加锁
        int rs = lockRunState();
        try {
            WorkQueue[] ws; int n;                    // skip if no array
            //添加时队列数组必须已经初始化了
            if ((ws = workQueues) != null && (n = ws.length) > 0) {
                //下面这段算法的意义,找到一个奇数索引的位置是否有队列,有的话找下一个,直到找到没有的
                //每次找下一个都会以step为步长,转了n圈还是没有位置,那么就扩容
                int s = indexSeed += SEED_INCREMENT;  // unlikely to collide
                int m = n - 1;
                //i表示奇数索引
                i = ((s << 1) | 1) & m;               // odd-numbered indices
                if (ws[i] != null) {                  // collision
                    int probes = 0;                   // step by approx half n
                    int step = (n <= 4) ? 2 : ((n >>> 1) & EVENMASK) + 2;
                    while (ws[i = (i + step) & m] != null) {
                        if (++probes >= n) {
                            //扩容两倍,保留原数组位置
                            workQueues = ws = Arrays.copyOf(ws, n <<= 1);
                            m = n - 1;
                            probes = 0;
                        }
                    }
                }
                //配置队列的hint,每个队列的hint都不同
                w.hint = s;                           // use as random seed
                //config同样的,高16为记录队列模式,低16位记录索引
                //这里的意思是把当前队列的索引放入config的低16位
                w.config = i | mode;
                //扫描状态。初始值是队列在数组中的索引
                w.scanState = i;                      // publication fence
                ws[i] = w; //放入数组
            }
        } finally {
            unlockRunState(rs, rs & ~RSLOCK);  //解锁
        }
        wt.setName(workerNamePrefix.concat(Integer.toString(i >>> 1)));
        return w;
    }

ForkJoinWorkerThread类

这个类是专门负责处理forkjoin框架的线程类,继承了Thread,在线程池开启新的线程时自然而然就会去执行它的run方法。

成员变量

//每次创建当前线程实例时,必定会给这两个字段赋值
final ForkJoinPool pool;                //当前线程池
final ForkJoinPool.WorkQueue workQueue;  //工作队列 // work-stealing mechanics

run方法

public void run() {
        //刚初始化的队列的array一般为null
        if (workQueue.array == null) { // only run once
            Throwable exception = null;
            try {
                onStart(); //空实现,线程开启时回调,给开发者使用

                //开始运行工作,这里会调用线程池的runWorker方法,到池里面扫描任务,扫描到入伍
                pool.runWorker(workQueue); 
            } catch (Throwable ex) {
                exception = ex;
            } finally {
                try {
                    onTermination(exception);  //线程结束时的回调,如果有异常,也可以获取
                } catch (Throwable ex) {
                    if (exception == null)
                        exception = ex;
                } finally {
                    pool.deregisterWorker(this, exception); //线程结束,注销线程,如果有异常也会携带相关异常。
                }
            }
        }
    }

在线程开启时,或者这发生异常时,实现这个类,就可以获取它的回调方法

pol.runWorker方法

final void runWorker(WorkQueue w) {
        w.growArray();                   // 初始化或扩容
        int seed = w.hint;               // initially holds randomization hint
        int r = (seed == 0) ? 1 : seed;  // avoid 0 for xorShift
        for (ForkJoinTask<?> t;;) {
            if ((t = scan(w, r)) != null) //在队列数组中扫描(偷窃)任务,如果偷窃到任务
                w.runTask(t); // 执行当前任务
            else if (!awaitWork(w, r)) //如果找不到任务,就等待任务。如果等待也没有,那么退出循环
                break;

            //每次循环中更新随机数种子r,用于产生下次任务扫描的随机性
            r ^= r << 13; r ^= r >>> 17; r ^= r << 5; // xorshift
        }
    }

扫描和处理任务

scan扫描或偷取任务

这个方法“偷窃”线程的核心实现,与其说是“偷窃”,倒不如说是扫描,这个方法的主要作用是扫描线程池的队列数组,遍历队列数组,从随机索引开始找一个队列,然后从这个队列的底部偷一个任务,偷到就返回这个任务,执行掉。如果在当前队列找不到任务,那么就下一个,直到找不到为止。

如果一直找不到任务,那么就要在这个方法里进行灭活操作,灭活之后,进入其他的等待方法,当前线程就可能被置为等待状态

于此同时,找到任务时,检查线程池的线程是否已经足够,不够开启新线程。新线程开启,重复上面动作,扫描队列,执行任务,或者灭活队列。

//扫描任务,偷窃任务。
private ForkJoinTask<?> scan(WorkQueue w, int r) {
        WorkQueue[] ws; int m; 
        //这里判断队列数组是否为空。为什么要校验m = ws.length - 1 > 0?队列数组可能为0?
        if ((ws = workQueues) != null && (m = ws.length - 1) > 0 && w != null) {
            int ss = w.scanState;                     // initially non-negative
            //origin表示开始索引, r & m表示从随机索引开始
            for (int origin = r & m, k = origin, oldSum = 0, checkSum = 0;;) {
                WorkQueue q; ForkJoinTask<?>[] a; ForkJoinTask<?> t;
                int b, n; long c;
                //取出元素
                if ((q = ws[k]) != null) {
                    //队列里有元素时的处理逻辑
                    //这里的base和top分别表示队列尾部和顶部的元素索引位置
                    if ((n = (b = q.base) - q.top) < 0 &&
                        (a = q.array) != null) {      // non-empty
                        //获取计算队列尾部元素的索引位置
                        long i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        //获取上一步索引位置的元素,如果这个元素不为null
                        if ((t = ((ForkJoinTask<?>)
                                  U.getObjectVolatile(a, i))) != null &&
                            //为什么还要再次判断q.base == b?在获取索引元素时,可能其他线程偷取了这个元素
                            q.base == b) {
                            //判断当前队列是否为活跃的。
                            //队列在没有元素处理的时候,线程需要灭活(等待状态),即scanState<0
                            if (ss >= 0) {
                                //cas操作将队列尾部元素的索引位置置为null(这时当前索引位置元素已经取出来了,t就是)
                                if (U.compareAndSwapObject(a, i, t, null)) {
                                    //既然取出来了那么base要加1
                                    q.base = b + 1;
                                    //n<-1表示队列里最少有两个元素(任务),那么尝试开启或者唤醒新的线程来协同处理
                                    //队列新增一个任务时top会加1,base不变,这样的话n=base-top,有一个人元素时n就为-1
                                    if (n < -1)       // signal others
                                        //开启或者唤醒新的线程来协同处理
                                        signalWork(ws, q);
                                    //找到了一个任务,返回
                                    return t;
                                }
                            }
                            //如果是不活跃的状态
                            else if (oldSum == 0 &&   // try to activate
                                     w.scanState < 0)
                                //那么尝试唤醒栈顶的队列
                                //为什么这里是唤醒栈顶队列,而不是当前队列?
                                //因为ctl的低32位存的是等待栈的栈顶’队列‘,这个栈通过stackPred字段记录上一个元素的位置来形成等待栈,如果唤醒的是当前队列,那么可能斩断了这个栈
                                tryRelease(c = ctl, ws[m & (int)c], AC_UNIT);
                        }
                        //如果队列的不活跃,有可能在上面就可能重新被激活了,又或者被其他线程修改了,因为能进来这个方法就说明这个队列是活跃的,这里需要重置扫描状态
                        if (ss < 0)                   // refresh
                            ss = w.scanState;
                        //如果能进到这个大的if语句,但又走到这里,那么说明一定发生了多线程竞争,那么一切归零,换个索引位置,重新扫描,去找新任务
                        //为什么到这里就一定发生了多线程竞争?因为进入这个大if条件,说明是有找到任务的,正常走,找到任务就return了,不会到这。
                        r ^= r << 1; r ^= r >>> 3; r ^= r << 10;
                        origin = k = r & m;           // move and rescan
                        oldSum = checkSum = 0;
                        continue;
                    }
                    //用来记录,找到了队列,但又没有找到任务
                    checkSum += b;
                }
                //这个if的作用是如果找了一圈都没有找到任务,说明暂时没有任务,那么就准备灭活这个队列了
                //origin表示初始索引,如果这个条件相等,那么表示最少转完一圈了,或者几圈
                if ((k = (k + 1) & m) == origin) {    // continue until stable
                //之前一直没想通,为什么在灭活之前要用这三个条件?
                //oldSum == checkSum 这个条件一定要满足,这个表示检查了一圈或n圈,所有队列是空的,但凡有’多余‘任务都不需要灭活,因为从上面的可以看到,checkSum和oldSum不相等只有checkSum += b处代码,如果有任务或者发生线程竞争,也走不到checkSum += b
                //其次是ss>=0 这个是必须的,需要检查队列是否已经被灭活了,如果被灭活了,那么还需要检查ss = w.scanState(是否发生线程竞争),因为有可能不是自己灭活的(其他线程竞争灭活的),如果发生线程竞争,那么checkSum归零,继续自旋
                    if ((ss >= 0 || (ss == (ss = w.scanState))) &&
                        oldSum == (oldSum = checkSum)) {
                        //准备灭活,再次检查是否已经被灭活了,或者是否已经终止了
                        //正常的话,第一次进来这里会跳过,只有下面执行了灭活操作,第二次自旋时就会进入这个break
                        if (ss < 0 || w.qlock < 0)    // already inactive
                            break; 灭活或者终止则跳出
                        //灭活操作,这个是将扫描状态置为负数
                        int ns = ss | INACTIVE;       // try to inactivate
                        //重新计算线程池新的ctl,这个是将队列的scanStatus放入ctl的低32位,然后将ctl的AC位(活跃线程数)减1
                        //这个之前说过了ctl低32位放的就是等待栈(被灭活的)的栈顶队列的scanStatus
                        long nc = ((SP_MASK & ns) |
                                   (UC_MASK & ((c = ctl) - AC_UNIT)));
                        //获取上一个等待栈的栈顶队列,把它挂在当前灭活队列的stackPred
                        //在这里就可以体现等待栈的形成主要靠stackPred字段连接两个被灭活的队列
                        w.stackPred = (int)c;         // hold prev stack top
                        U.putInt(w, QSCANSTATE, ns); //cas置换当前队列的扫描状态。
                        //更换新的ctl的值
                        if (U.compareAndSwapLong(this, CTL, c, nc))
                            ss = ns; //cas成功,直接更改ss,再转一圈就可以跳出自旋了,因为跳出自旋在上面
                        else
                            w.scanState = ss;   //cas失败,就还原队列的扫描状态,继续自旋      // back out
                    }
                    //转完一圈,检查位复位
                    checkSum = 0;
                }
            }
        }
        return null; //找不到任务
    }

总结来说,scan()方法主要做三件事:

  1. 在共享队列数组中找任务
  2. 有多的任务开启或唤醒线程
  3. 没有任务就灭活队列

runTask方法

用于执行任务的方法

// WorkQueue队列运行任务的方法
final void runTask(ForkJoinTask<?> task) {
            if (task != null) {
                //目的是将scanState转为偶数,即最低位置为0
                //scanState为偶数,说明表示队列处于工作状态
                scanState &= ~SCANNING; // mark as busy
                //这个方法会执行 抽象方法exec(),  exec()方法由自己的子类去实现
                //子类有RecursiveAction,RecursiveTask。在这里会执行线程的run方法
                (currentSteal = task).doExec();
                //currentSteal表示偷窃的任务,完了这个任务,那么将这个实例字段置null
                U.putOrderedObject(this, QCURRENTSTEAL, null); // release for GC
                //执行当前队列自己的任务,这个会从队列顶部取任务。而偷窃则是从底部
                execLocalTasks();
                ForkJoinWorkerThread thread = owner;
                //nsteals初始值就是0,会小于0?
                if (++nsteals < 0)      // collect on overflow
                    transferStealCount(pool); //忙不过来了转到其他线程
                scanState |= SCANNING;//取消工作状态
                if (thread != null)  //每次执行完任务时的回调方法,子类实现
                    thread.afterTopLevelExec();
            }
        }

doExec方法

最后会调用各自的实现,用于执行的代码逻辑编写,也就是我们常用的RecursiveTask的completed方法

   final int doExec() {
        int s; boolean completed;
        //线程池还未关闭
        if ((s = status) >= 0) {
            try {
                子类实现,执行任务
                completed = exec();
            } catch (Throwable rex) {
                //异常时,记录异常
                return setExceptionalCompletion(rex);
            }
            //标记完成并唤醒等待加入此任务的线程
            if (completed)
                s = setCompletion(NORMAL);
        }
        return s;
    }

任务拆分示例代码

这块业务逻辑就是用分叉线程池ForkJoinPool计算1加到100的结果

public class ForkJoinTaskExample extends RecursiveTask<Integer> {

    public static final int threshold = 2;
    private int start;
    private int end;
    public ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        //如果任务足够小就计算任务
        boolean canCompute = (end - start) <= threshold;
        if (canCompute) {
            for (int i = start; i <= start; i++) {
                Thread thread = Thread.currentThread();
                System.out.println("线程名:"+thread.getName()+"完成了任务");
            }
        } else {
            // 如果任务大于阈值,就分裂成两个子任务计算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 执行子任务
            leftTask.fork();
            rightTask.fork();

            // 等待任务执行结束合并其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一个计算任务,计算1+2+3+4+...100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

       ForkJoinTask<Integer> submit = forkjoinPool.submit(task);

        try {
            Integer integer = submit.get();
            System.out.println(integer);
        } catch (Exception e) {
           e.printStackTrace();
        }
    }
}

在上述代码中,先是查看任务是否是最小的任务,如果是的就是直接执行,如果不是,就继续分割,执行分割完的子任务,然后等待任务执行完成。在这compute方法其实是分叉递归调用,从最开始的大任务一直分叉,分叉到最小单元,直到执行完所有的分叉单元再逐层向上返回结果。

这里的分叉和等待任务分别用的fork和join,来看看这两个方法的实现。

fork方法

//这个方法的主要目的就是向线程池里提交任务
//在这里区分内部提交和外部提交,因为线程池里的线程都是ForkJoinWorkerThread类型,即ForkJoinWorkerThread为内部提交,主(其他)线程就是外部提交
//在源码中的注释大概意思为:在线程之外的任何线程中,任务完成并重新初始化之前多次fork一个任务,存在线程安全,因为后续操作未必一致可观察,如果想要解决这个问题,那么可以先调用join或相关方法,或调用isDone返回true
public final ForkJoinTask<V> fork() {
        Thread t;
        //如果是内部提交,那么直接往数组里push
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
        //这里调用的push是t.workQueue ,即获取线程里的队列推送方法,所以这里的push方法不用考虑线程安全问题(单线程)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            //外部提交则调用外部提交的方法。
            ForkJoinPool.common.externalPush(this);
        return this;
    }

这个方法没有太多的逻辑,说白了就是向当前线程的队列中添加任务,提交之后就不管了。

join方法

//等待返回执行结果,工作的是doJoin方法
//异常完成时,调用线程会中断并抛出异常,而get()方法不会
public final V join() {
        int s;
        //如果不是正常完成的,那么需要上报异常,实际上是抛出对应记录的异常
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);//抛出记录的异常
        //返回结果回调方法,子类实现
        return getRawResult();
    }

doJoin方法

    //该方法在等待之前会帮助线程做任务,如果没有得到任务,那么就等待。
    private int doJoin() {
            int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
            //status表示当前任务的状态,小于0表示结束(包含异常或正常结束),结束则返回
            return (s = status) < 0 ? s :
                //1.如果当前线程是线程池的线程,那么就尝试从队列中拿出(取消推送)这个任务,如果拿出成功,就继续做任务,如果做任务也成功了就返回。如果拿失败或者做失败了,就说明有线程竞争在处理这个任务,那么就调用等待方法。
                ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
                (w = (wt = (ForkJoinWorkerThread)t).workQueue).
                //tryUnpush尝试取消推送,这个方法通过cas操作拿任务,判断提交的任务是否还在队列的顶部,如果在那么就拿出来,不在就拿失败
                //doExec正常执行任务的方法。
                tryUnpush(this) && (s = doExec()) < 0 ? s :
                //等待
                wt.pool.awaitJoin(w, this, 0L) :
                //2.如果不是是其他线程,那么调用外部等待方法。
                externalAwaitDone();
        }

awaitJoin方法

    //等待任务执行完毕
    final int awaitJoin(WorkQueue w, ForkJoinTask<?> task, long deadline) {
            //记录任务状态,返回结果
            int s = 0;
            if (task != null && w != null) {
                //暂时取出上一个等待的任务
                ForkJoinTask<?> prevJoin = w.currentJoin;
                //将当前任务放入currentJoin
                U.putOrderedObject(w, QCURRENTJOIN, task);
                //如果任务是CountedCompleter类型,下面会帮助执行
                CountedCompleter<?> cc = (task instanceof CountedCompleter) ?
                    (CountedCompleter<?>)task : null;
                for (;;) {
                //任务结束了,跳出
                    if ((s = task.status) < 0)
                        break;
                    //CountedCompleter任务,帮助执行
                    if (cc != null)
                        helpComplete(w, cc, 0);
                    //如果当前队列没有任务了,那么帮助偷窃线程执行任务。
                    //如果存在任务,那么尝试移除并执行这个任务。
                    为什么帮助执行成功了,还要再次帮助偷窃者执行任务呢?因为 task.doExec();不等待执行结果?根据源码来看是等待的呀
                    else if (w.base == w.top || w.tryRemoveAndExec(task))
                        helpStealer(w, task); //满足条件帮助执行
                    //任务结束了,跳出
                    if ((s = task.status) < 0)
                        break;
                    long ms, ns; //毫秒,纳秒
                    //如果上面的都不满足了,那么就要等待了。下面计算等待时间并执行等待
                    if (deadline == 0L)
                        ms = 0L;//永久等待,dealine为0
                    //如果设置了等待时间,计算等待时间,已经过了等待时间,直接跳出
                    else if ((ns = deadline - System.nanoTime()) <= 0L)
                        break;
                    //没有超过等待时间,那么将纳秒转毫秒,如果设置了负数,那么校正为等待1豪秒
                    //这里还需要判断==0? 如果等于0,不是进到上面的if了?
                    else if ((ms = TimeUnit.NANOSECONDS.toMillis(ns)) <= 0L)
                        ms = 1L;
                    //等待之前,再尝试一次有没有其他的等待线程,有就唤醒,
                    //或者队列里是否有任务,没有就活跃线程数-1
                    //还有任务,再看看是否还能开启新线程(这里只是开启新线程,不做任务)。
                    if (tryCompensate(w)) {
                        //满足以上一个条件就执行等待操作,不满足就继续自旋
                        task.internalWait(ms);
                        //代码能走到说明,已经过了等待时间或者被唤醒了,那么这个线程是就变成活跃线程了,需要活跃线程数+1,然后继续自旋
                        U.getAndAddLong(this, CTL, AC_UNIT);
                    }
                }
                //走到这说明跳出自旋了,跳出自旋:任务结束了或者过了指定的等待时间
                //将之前 准备等待的任务复位
                U.putOrderedObject(w, QCURRENTJOIN, prevJoin);
            }
            return s; //返回任务状态
        }

tryRemoveAndExec方法

这个方法只在awaitJoin方法中执行

//队列为空且任务未完成时,返回true
//这个方法的目的就是在为了‘是否帮助偷取线程’做最后的检验,
//说白了就是这个任务在当前线程发出了等待请求,但是当前线程不想等待,但又任务被人偷走了,我还是帮他吧,或者我自己把这个任务做掉
final boolean tryRemoveAndExec(ForkJoinTask<?> task) {
            ForkJoinTask<?>[] a; int m, s, b, n;
            //非空检验
            if ((a = array) != null && (m = a.length - 1) >= 0 &&
                task != null) {
                //每次重新获取base和top的值,并且检查队列是否为空
                while ((n = (s = top) - (b = base)) > 0) {
                    //内层循环每次从top到base遍历队列
                    for (ForkJoinTask<?> t;;) {      // traverse from s to b
                        //获取队列顶部的任务的索引,每次都会向下移动
                        long j = ((--s & m) << ASHIFT) + ABASE;
                        //每次进来都需要获取队列的顶部元素是否为null,如果满足说明队列为空,那么需要跳出循环
                        if ((t = (ForkJoinTask<?>)U.getObject(a, j)) == null)
                            //这里为了检查多线程竞争,如果s+1==top表示是队列顶部,表示没有发生线程竞争
                            //为什么相等就没有发生线程竞争?因为进来的时候取过了base和top的值,说明进来的时候这个区间是有任务,按照正常逻辑队列顶部任务为null的话,是自己执行掉的,此时s+1==top,如果中间元素为null说明有其他线程把任务偷走了。
                            return s + 1 == top;     // shorter than expected
                        //找到了目标任务
                        else if (t == task) {
                            boolean removed = false;
                            //如果还是队列顶部,那么直接从顶部移除任务,打移除标记
                            if (s + 1 == top) {      // pop
                                if (U.compareAndSwapObject(a, j, task, null)) {
                                    U.putOrderedInt(this, QTOP, s);
                                    removed = true;
                                }
                            }
                            //如果不是顶部,那么要检查是否发生多线程竞争,其他线程可能会从底部偷取任务,所以判断base是否发生变化
                            else if (base == b)      // replace with proxy
                            //如果没变,那么需要把这个任务拿出来,但是又因为此时不是队列顶部,队列正常获取元素需要从顶部弹出,此时在队列中间,那么拿出这个元素,用个空类型的任务代替,防止线程问题。
                                removed = U.compareAndSwapObject(
                                    a, j, task, new EmptyTask());
                            //只有找到目标元素了,都需要判断下是否移除成功,移除了那么就需要执行掉,然后继续自旋
                            //继续自旋的目的是为了检查队列是否为空?
                            if (removed)
                                task.doExec();
                            break; //跳出去,检查下任务是否被完成
                        }
                        //循环一轮还没有找到目标任务,那么判断当前任务是否在队列顶部,并是否完成
                        //为什么要这样判断?因为这个方位的源码注释就是检查队列是否为空,所以在检查的同时,也帮忙清空已经完成的任务,方便自己更好得检查。如果完成了还必须在顶部,因为设计队列需要从顶部出列,中间出列会有问题
                        else if (t.status < 0 && s + 1 == top) {
                            if (U.compareAndSwapObject(a, j, t, null))
                                U.putOrderedInt(this, QTOP, s);
                            break;    //跳出去,检查下任务是否被完成     // was cancelled
                        }
                        //一圈或者n圈转完了,还是没有找到目标任务,且队列不为空,那么返回false
                        if (--n == 0)
                            return false;
                    }
                    //兜底,每一轮内层循环跳出来或者遍历结束时,都需要检查下任务是否完成了
                    //因为任务完成了,我就不要帮助偷取线程执行任务了
                    if (task.status < 0)
                        return false;
                }
            }
            return true;
        }

helpStealer方法

这个方法大体逻辑如下:

  1. 寻找窃取者线程: 当一个线程等待另一个任务完成时,它会调用这个方法来寻找可以窃取任务的线程。这个方法会遍历所有工作队列,寻找正在执行与等待任务相关的子任务的线程。
  2. 帮助窃取者线程: 如果找到正在执行相关子任务的线程,这个方法会尝试帮助它窃取任务。它会从窃取者线程的队列中获取任务,然后交给等待的线程执行。
  3. 稳定性检查: 为了确保代码的稳定性,这个方法会进行一些检查,例如比较工作队列的长度和任务的状态。

说白了就是尽量不能让等待线程闲下来

//找到偷窃队列,如果偷窃队列中也有任务,那么尝试从偷窃队列中偷任务,让等待线程执行
private void helpStealer(WorkQueue w, ForkJoinTask<?> task) {
        WorkQueue[] ws = workQueues;
        int oldSum = 0, checkSum, m;
        //非空校验
        //变量解释:
        //ws: 工作队列数组
        //w: 调用者工作队列
        //task: 等待的任务
        //subtask: 子任务
        //v: 窃取者工作队列
        //j: 等待线程的工作队列
        //hint: 窃取队列的索引
        if (ws != null && (m = ws.length - 1) >= 0 && w != null &&
            task != null) {
            //最外层循环:任务完成检查,并且稳定检查
            do {                                       // restart point
                checkSum = 0;                          // for stability check
                ForkJoinTask<?> subtask;
                WorkQueue j = w, v;                    // v is subtask stealer
                descent: for (subtask = task; subtask.status >= 0; ) {
                    //h计算为奇数,每次从奇数索引其队列,步长为2,每次都取奇数队列
                    //为什么每次取奇数队列?
                    for (int h = j.hint | 1, k = 0, i; ; k += 2) {
                        if (k > m)  //找不到偷窃队列,跳到最外层循环 // can't find stealer
                            break descent;
                        //循环找偷窃队列,找到就返回
                        if ((v = ws[i = (h + k) & m]) != null) {
                            if (v.currentSteal == subtask) {
                                j.hint = i;
                                break;
                            }
                            checkSum += v.base;
                        }
                    }
                    //走到这说明一定找到了偷窃队列,开始帮助偷窃者执行任务。
                    for (;;) {                         // help v or descend
                        ForkJoinTask<?>[] a; int b;
                        checkSum += (b = v.base);
                        //拿到偷窃队列的等待任务
                        ForkJoinTask<?> next = v.currentJoin;
                        //如果subtask已是完成态,或发现竞态等情况造成数据已脏,如发现本轮循环中j的当前join已不是当前subtask,
                        //或v的当前steal不是subtask,说明出现了脏数据,直接终止循环二,重新进入while循环重初始化jv.
                        if (subtask.status < 0 || j.currentJoin != subtask ||
                            v.currentSteal != subtask) // stale
                            break descent;
                        //如果偷窃队列已空
                        if (b - v.top >= 0 || (a = v.array) == null) {
                            //并且偷窃队列的等待任务也是空的,重新while判定循环条件,重初始化jv
                            if ((subtask = next) == null)
                                break descent;
                            //如果偷窃队列还有处于等待中的任务,那么将偷窃队列改为等待线程的工作队列,重新开始寻找等待队列的偷窃者,按照此逻辑直到清空任务为止
                            j = v;
                            break;
                        }
                        //等待队列反偷‘偷窃队列’的任务,从底部开始
                        int i = (((a.length - 1) & b) << ASHIFT) + ABASE;
                        ForkJoinTask<?> t = ((ForkJoinTask<?>)
                                             U.getObjectVolatile(a, i));
                        //是否发生线程竞争,如果发生了,那就回头再来,就重新帮助偷窃队列执行任务
                        if (v.base == b) {
                            //任务被处理了,回到外层,为什么直接回到最外层?
                            if (t == null)             // stale
                                break descent;
                            //cas反偷任务,从底部开始
                            if (U.compareAndSwapObject(a, i, t, null)) {
                                v.base = b + 1;
                                //现在要帮助偷窃队列执行任务,等待队列要偷取偷窃队列的任务,先把当前偷窃的任务暂时拿出去 ,要放现在偷取的任务了。
                                ForkJoinTask<?> ps = w.currentSteal;
                                int top = w.top;
                                do {
                                    //放入偷窃容器,执行掉
                                    U.putOrderedObject(w, QCURRENTSTEAL, t);
                                    t.doExec();        // clear local tasks too
                                //只要参数task还未完成,w新压入了任务,则依次尝试从w中pop元素,和前面的t一样按序执行(此处顺带执行自己的任务)
                                } while (task.status >= 0 &&
                                         w.top != top &&
                                         (t = w.pop()) != null);
                                U.putOrderedObject(w, QCURRENTSTEAL, ps);
                                if (w.base != w.top)
                                    return;            // can't further help
                            }
                        }
                    }
                }
                //每一轮中检查 checkSum 变量。如果 checkSum 变量与 oldSum 变量不同,则表明代码不稳定
                //因为窃取者线程的工作队列已被其他线程修改,或者窃取者线程的工作队列已被清空。
            } while (task.status >= 0 && oldSum != (oldSum = checkSum));
        }
    }

tryCompensate方法

  1. tryCompensate 方法主要用于防止工作线程池过载。

  2. 尝试通过释放空闲工作线程、解除补偿活动计数和创建新工作线程来实现这一目标。

  3. 有助于确保 ForkJoinPool 的性能和稳定性
//尝试减少活动计数(有时是隐式的),并可能释放或创建新工作线程,用于执行接下来要等待的任务
//发生争用、检测到陈旧性、不稳定性或终止时,返回 false
private boolean tryCompensate(WorkQueue w) {
        //记录返回状态
        boolean canBlock;
        //ws线程池队列数组的副本,c保留的线程池的ctl信号量,m队列数组长度,pc当前并行度,sp为ctl信号量低32位
        WorkQueue[] ws; long c; int m, pc, sp;
        //当前队列是否为空,或者被其他线程占用,队列数组为空,并行度为0
        if (w == null || w.qlock < 0 ||           // caller terminating
            (ws = workQueues) == null || (m = ws.length - 1) <= 0 ||
            (pc = config & SMASK) == 0)           // parallelism disabled
            canBlock = false;
        //如果一切正常,那么检查是否存在空闲线程,如果有空闲线程,那么ctl低32保留了空闲线程的信息
        else if ((sp = (int)(c = ctl)) != 0)      // release idle worker
            //存在空闲线程,尝试激活
            canBlock = tryRelease(c, ws[sp & m], 0L);
        //如果所有线程都在工作,那么准备减少线程,因为执行当前任务的线程就要等待了
        //在减少之前需要做些检查。具体检查如下
        else {
            //取出当前的活跃线程数加上设置的并行度,以及当前线程总数加并行度。为了检查做准备
            int ac = (int)(c >> AC_SHIFT) + pc;
            int tc = (short)(c >> TC_SHIFT) + pc;
            //遍历队列数组,能到这个分支,说明所有队列都忙碌,那么再次检查是不是真的所有队列都在忙碌
            //为什么只遍历奇数队列?回头再看
            int nbusy = 0;                        // validate saturation
            for (int i = 0; i <= m; ++i) {        // two passes of odd indices
                WorkQueue v;
                if ((v = ws[((i << 1) | 1) & m]) != null) {
                    if ((v.scanState & SCANNING) != 0)
                        break; //扫描状态的队列表示忙碌的,即scanState为奇数,如果有不忙碌的,跳出
                    ++nbusy;//正常情况下,遍历完,nbusy和总线程数相等的,不相等说明有队列不忙碌
                }
            }
            //检查工作线程数是否不稳定或过时
            //nbusy不是i应该等于pc吗?nbusy != (tc << 1) 这是啥意思
            if (nbusy != (tc << 1) || ctl != c)
                canBlock = false;                 // unstable or stale
            //如果队列是空的,就减少活跃线程数,在此之前,检查线程总数,以及至少有一个活跃线程数
            //在这里可以看出,我们传入的并行度并不是线程池的线程数
            else if (tc >= pc && ac > 1 && w.isEmpty()) {
                long nc = ((AC_MASK & (c - AC_UNIT)) | //活跃线程数减1
                           (~AC_MASK & c));       // uncompensated
                canBlock = U.compareAndSwapLong(this, CTL, c, nc); 
            }
            //线程数是否超标
            else if (tc >= MAX_CAP ||
                     (this == common && tc >= pc + commonMaxSpares))
                throw new RejectedExecutionException(
                    "Thread limit exceeded replacing blocked worker");
            //如果线程数没了,或者队列不为空,那么就要尝试创建新的线程了
            //因为马上有线程要等待了,需要有线程唤醒
            else {                                // similar to tryAddWorker
                boolean add = false; int rs;      // CAS within lock
                long nc = ((AC_MASK & c) |
                           (TC_MASK & (c + TC_UNIT))); //线程总数加1
                if (((rs = lockRunState()) & STOP) == 0)
                    add = U.compareAndSwapLong(this, CTL, c, nc);
                unlockRunState(rs, rs & ~RSLOCK);
                canBlock = add && createWorker(); //开启新的线程 // throws on exception
            }
        }
        return canBlock;
    }

文章作者: Needle
转载声明:本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Needle !
  目录
  评论