Java多线程:线程池ThreadPoolExecutor源码解析


简介

常用的线程池主要有以下几种:

  1. FixedThreadPool: 固定大小的线程池,线程数固定不变。适用于处理数量已知且固定的任务。
  2. CachedThreadPool: 缓存线程池,线程数根据任务动态调整。线程空闲一分钟后会被回收,当任务到来时重新创建线程。适用于处理大量短时间的小任务。
  3. SingleThreadExecutor: 单线程的线程池,确保任务顺序执行。适用于需要保证顺序地执行各个任务。
  4. ScheduledThreadPool: 定时调度线程池,支持定时及周期性执行任务。适用于需要定时执行的重复任务。

实际底层都是采用的ThreadPoolExecutor,使用线程池时,先创建线程池,再往线程池里提交任务,提交任务之后线程池开始执行。先看看如何创建线程池。

结构

构造函数

    /**
    *        
    *corePoolSize: 核心线程数。
    *maximumPoolSize: 最大线程数。
    *keepAliveTime: 线程池中线程的最大闲置生命周期。
    *unit: 针对keepAliveTime的时间单位。
    *workQueue: 阻塞队列。
    *threadFactory: 创建线程的线程工厂。
    *handler: 拒绝策略。
    */
  public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.acc = System.getSecurityManager() == null ?
                null :
                AccessController.getContext();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

成员变量

这里的成员变量ctl用一个原子数表示两个状态,为什么不用两个数表示?因为高并发控制保证多个数的状态是一件消耗性能事

// 状态控制属性:高3位表示线程池的运行状态,剩下的29位表示当前有效的线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));


    // runState is stored in the high-order bits
//运行态,可处理新任务并执行队列中的任务
private static final int RUNNING    = -1 << COUNT_BITS;
//关闭态,不接受新任务,但处理队列中的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;
//停止态,不接受新任务,不处理队列中任务,且打断运行中任务
private static final int STOP       =  1 << COUNT_BITS;
//整理态,所有任务已经结束,workerCount = 0 ,将执行terminated()方法
private static final int TIDYING    =  2 << COUNT_BITS;
//结束态,terminated() 方法已完成
private static final int TERMINATED =  3 << COUNT_BITS;

// 线程池的基本大小,当提交一个任务到线程池时,线程池会创建一个线程来执行任务,
// 即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于
// 线程池基本大小时就不再创建。如果调用了线程池的prestartAllCoreThreads()方法,
// 线程池会提前创建并启动所有基本线程。
private volatile int corePoolSize;

// 线程池线程最大数量,如果队列满了,并且已创建的线程数小于最大线程数,
// 则线程池会再创建新的线程执行任务。如果使用了无界的任务队列这个参数就没什么效果。
private volatile int maximumPoolSize;

// 用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设 置更有意义的名字。
private volatile ThreadFactory threadFactory;

// 饱和策略,默认情况下是AbortPolicy。
private volatile RejectedExecutionHandler handler;

// 线程池的工作线程空闲后,保持存活的时间。如果任务很多,并且每个任务执行的时间比较短,
// 可以调大时间,提高线程的利用率。
private volatile long keepAliveTime;

// 用于保存等待执行的任务的阻塞队列,具体可以参考[JAVA并发容器-阻塞队列](https://www.jianshu.com/p/5646fb5faee1)
private final BlockingQueue<Runnable> workQueue;

// 存放工作线程的容器,必须获取到锁才能访问
private final HashSet<Worker> workers = new HashSet<Worker>();

// ctl的拆包和包装
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程对象

ThreadPoolExecutor中维护了一个Worker类,该类继承了Runnable接口,该类的实例就是一个线程,假设线程池的大小是10,那么就表示开启了十个此类的实例,此实例中执行我们提交的线程任务。

private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {


        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }


    }

提交任务

提交任务一般使用submit(),execute()方法,submit实际上还是调用的execute方法。

execute方法

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    // 获取控制的值
    int c = ctl.get();
    // 判断工作线程数是否小于corePoolSize
    if (workerCountOf(c) < corePoolSize) {
        // 新创建核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 工作线程数大于或等于corePoolSize
    // 判断线程池是否处于运行状态,如果是将任务command入队
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查线程池的运行状态,如果不在运行中,那么将任务从队列里面删除,并尝试结束线程池
        if (! isRunning(recheck) && remove(command))
            // 调用驱逐策略
            reject(command);
        // 检查活跃线程总数是否为0
        else if (workerCountOf(recheck) == 0)
            // 新创建非核心线程
            addWorker(null, false);
    }
    // 队列满了,新创建非核心线程
    else if (!addWorker(command, false))
        // 调用驱逐策略
        reject(command);
}

addWorker方法

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 仅在必要的时候检查队列是否为NULL
        // 检查队列是否处于非运行状态
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            // 获取活跃线程数
            int wc = workerCountOf(c);
            // 判断线程是否超过最大值,当队列满了则验证线程数是否大于maximumPoolSize,
            // 没有满则验证corePoolSize
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 增加活跃线程总数,否则重试
            if (compareAndIncrementWorkerCount(c))
                // 如果成功跳出外层循环
                break retry;
            c = ctl.get();  // Re-read ctl
            // 再次校验一下线程池运行状态
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    // 工作线程是否启动
    boolean workerStarted = false;
    // 工作线程是否创建
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 新创建线程
        w = new Worker(firstTask);
        // 获取新创建的线程
        final Thread t = w.thread;
        if (t != null) {
            // 创建线程要获得全局锁
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
                // 检查线程池的运行状态
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    // 检查线程的状态
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 将新建工作线程存放到容器
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize) {
                        // 跟踪线程池最大的工作线程总数
                        largestPoolSize = s;
                    }
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            // 启动工作线程
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 启动新的工作线程失败,
            // 1. 将工作线程移除workers容器
            // 2. 还原工作线程总数(workerCount)
            // 3. 尝试结束线程
            addWorkerFailed(w);
    }
    return workerStarted;
}

runWorker方法

final void runWorker(Worker w) {
    // 当前Work中的工作线程
    Thread wt = Thread.currentThread();
    // 获取初始任务
    Runnable task = w.firstTask;
    // 初始任务置NULL(表示不是建线程)
    w.firstTask = null;
    // 修改锁的状态,使需发起中断的线程可以获取到锁(使工作线程可以响应中断)
    w.unlock(); // allow interrupts
    // 工作线程是否是异常结束
    boolean completedAbruptly = true;
    try {
        // 循环的从队列里面获取任务
        while (task != null || (task = getTask()) != null) {
            // 每次执行任务时需要获取到内置的互斥锁
            w.lock();
            // 1. 当前工作线程不是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,我们需要中断当前工作线程
            // 2. 当前工作线程是中断状态,且线程池是STOP,TIDYING,TERMINATED状态,我们需要中断当前工作线程
            if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP)))
                    && !wt.isInterrupted())
                // 中断线程,中断标志位设置成true
                wt.interrupt();
            try {
                // 执行任务前置方法,扩展用
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 执行任务
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // 执行任务后置方法,扩展用
                    afterExecute(task, thrown);
                }
            } finally {
                // 任务NULL表示已经处理了
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 将工作线程从容器中剔除
        processWorkerExit(w, completedAbruptly);
    }
}

正在执行线程的方法,执行流程:

  1. 获取到当前的工作线程
  2. 获取初始化的线程任务
  3. 修改锁的状态,使工作线程可以响应中断
  4. 获取工作线程的锁(保证在任务执行过程中工作线程不被外部线程中断),如果获取到的任务是NULL,则结束当前工作线程
  5. 判断先测试状态,看是否需要中断当前工作线程
  6. 执行任务前置方法​​beforeExecute(wt, task);​​
  7. 执行任务(执行提交到线程池的线程)​​task.run();​​
  8. 执行任务后置方法​​afterExecute(task, thrown);​​,处理异常信息
  9. 修改完成任务的总数
  10. 解除当前工作线程的锁
  11. 获取队列里面的任务,循环第4步
  12. 将工作线程从容器中剔除

整体流程如下:


文章作者: Needle
转载声明:本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Needle !
  目录
  评论