首页 > JDK 源码剖析 —— ThreadPoolExecutor
头像
何人听我楚狂声
编辑于 2021-02-24 14:01
+ 关注

JDK 源码剖析 —— ThreadPoolExecutor

Java 中的线程池,一般都是围绕 ThreadPoolExecutor 展开的,其他的实现要么是基于它,要么是模仿它的思想。所以只要理解 ThreadPoolExecutor,就相当于完全理解了 Java 线程池的精髓。

我们可以提前给线程池下一个定义:提供预定义好的线程,供调用者直接执行任务的工具。

本章中的源码基于 JDK 1.8。

线程池优点

也可以说是池化的优点,可类推到各种如连接池、内存池等各种 “池” 的优点。

  1. 降低资源消耗:通过池化技术重复利用已创建的线程,降低线程创建和销毁造成的损耗。
  2. 提高响应速度:任务到达时,无需等待线程创建即可立即执行。
  3. 提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性。使用线程池可以进行统一的分配、调优和监控。
  4. 提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。比如延时定时线程池ScheduledThreadPoolExecutor,就允许任务延期执行或定期执行。

类签名及继承关系

ThreadPoolExecutor 类签名如下:

public class ThreadPoolExecutor extends AbstractExecutorService

ThreadPoolExecutor 类继承了 AbstractExecutorService 类,再向上,一个整体的继承关系如下 UML 类图所示:

ThreadPoolExecutor 实现的顶层接口是 Executor,顶层接口 Executor 提供了一种思想:将任务提交和任务执行进行解耦。用户无需关注如何创建线程,如何调度线程来执行任务,用户只需提供 Runnable 对象,将任务的运行逻辑提交到执行器(Executor)中,由 Executor 框架完成线程的调配和任务的执行部分。ExecutorService 接口增加了一些能力:

  • 扩充执行任务的能力,补充可以为一个或一批异步任务生成Future的方法
  • 提供了管控线程池的方法,比如停止线程池的运行。

AbstractExecutorService 则是上层的抽象类,将执行任务的流程串联了起来,保证下层的实现只需关注一个执行任务的方法即可。最下层的实现类 ThreadPoolExecutor 实现最复杂的运行部分,ThreadPoolExecutor 将会一方面维护自身的生命周期,另一方面同时管理线程和任务,使两者良好的结合从而执行并行任务。

我们可以在这里就先看一下 ThreadPoolExecutor 的运行模型,如下图(图源美团技术团队):

ThreadPoolExecutor 在内部实际上构建了一个生产者消费者模型,将任务看作产品,将任务提交和线程执行解耦。ThreadPoolExecutor 可以在逻辑上分成两个部分:任务管理和线程管理。任务管理充当生产者的角色,当有任务提交后,由线程池决定后续流转:

  1. 直接申请线程执行任务
  2. 缓冲到阻塞队列等待
  3. 拒绝任务

线程管理部分承担消费者的角色,它们被统一维护在线程池内,根据任务请求进行线程的分配,当线程执行完任务后则会继续获取新的任务去执行,最终当线程获取不到任务的时候,线程就会被回收。

生命周期管理

ThreadPoolExecutor 内部会随着线程池运行自行维护线程池状态。ThreadPoolExecutor 内部同时将线程数量(workerCount)和运行状态(runState)封装在一个变量中统一进行维护:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

ctlAtomicInteger 类型,其高 3 位保存 runState,低 29 位保存 workerCount。在源码中大部分情况都要同时获取这两个变量来判断状态,使用一个变量存储可以避免在改变状态时,不必去为了维护两者一致而占用锁。源码中也提供了一些方法供用户获得线程池当前的运行状态、线程个数。这些方法一般都是通过位运算:

// mask
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池将其运行状态分成五种:

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

这里的左移实际上是因为 ctl 中只有高三位是表示运行状态的。每个状态具体如下:

运行状态 描述
RUNNING 接受新提交的任务,也能处理阻塞队列中的任务
SHUTDOWN 不再接受新提交任务,但是仍然能继续处理阻塞队列中的任务
STOP 不再接受新任务,也不再处理阻塞队列中的任务,同时中断正在处理任务的线程
TIDYING 所有任务都被终止,workerCount 为 0
TERMINATED TIDYING 状态时会自动调用 terminated 方法,方法调用完成后进入本状态

声明周期转换如下图所示:

任务调度

任务调度是线程池的入口,当用户提交了一个任务,接下来这个任务的全部过程(执行或拒绝)都由这个阶段负责。

任务调度依赖于几个很重要的参数,这些参数在线程池构造时就会被设置,ThreadPoolExecutor 最长的构造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler)

几个参数解释如下:

corePoolSize: 线程池核心线程数(平时保留的线程数),使用时机: 在初始时刻,每次请求进来都会创建一个线程直到达到该size
maximumPoolSize: 线程池最大线程数,使用时机: 当workQueue都放不下时,启动新线程,直到最大线程数,此时到达线程池的极限
keepAliveTime/unit: 超出corePoolSize数量的线程的保留时间,unit为时间单位
workQueue: 任务队列,当核心线程数达到或者超出后,会先尝试将任务放入该队列由各线程自行消费;  
    ArrayBlockingQueue: 构造函数一定要传大小
    LinkedBlockingQueue: 构造函数不传大小会默认为65536(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
    SynchronousQueue: 同步队列,一个没有存储空间的阻塞队列 ,将任务同步交付给工作线程。
    PriorityBlockingQueue: 优先队列
threadFactory:线程工厂,用于线程需要创建时,调用其newThread()生产新线程使用
handler: 饱和策略,当队列已放不下任务,且创建的线程已达到 maximum 后,则不能再处理任务,直接将任务交给饱和策略
    AbortPolicy: 直接抛弃(默认)
    CallerRunsPolicy: 用调用者的线程执行任务
    DiscardOldestPolicy: 抛弃队列中最久的任务
    DiscardPolicy: 抛弃当前任务

整体的一个任务流转过程可以由下图表示:

总体流程总结如下:

  1. 判断核心线程池是否已满,如果不是,则创建线程执行任务
  2. 如果核心线程池满了,判断队列是否满了,如果队列没满,将任务放在队列中
  3. 如果队列满了,则判断线程池是否已满,如果没满,创建线程执行任务
  4. 如果线程池也满了,则按照拒绝策略对任务进行处理

任务调度策略的入口是 execute() 方法,它主要的工作是,检查现在线程池的运行状态、运行线程数、运行策略,决定接下来执行的流程,是直接申请线程执行,或是缓冲到队列中执行,亦或是直接拒绝该任务。

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 当还没有达到核心线程池的数量时,直接添加 1 个新线程,然后让其执行任务即可
    if (workerCountOf(c) < corePoolSize) {
        // 添加新线程,且执行 command 任务
        // 添加成功,即不需要后续操作了,添加失败,则说明外部环境变化了
        // addWorker 第二个参数 true 表示使用核心线程
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 当核心线程达到后,则尝试添加到阻塞队列中,具体添加方法由阻塞队列实现
    // isRunning => c < SHUTDOWN;
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 添加队列成功后,还要再次检测线程池的运行状态,决定启动线程或者状态过期
        // 当线程池已关闭,则将刚刚添加的任务移除,走reject策略
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 当一个 worker 都没有时,则添加 worker
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 当队列满后,则直接再创建新的线程运行,addWorker 的 false 表示使用 maximumPoolSize
    // 如果不能再创建线程了,则 reject
    else if (!addWorker(command, false))
        reject(command);
}

整个过程没有通过锁,而是仅依靠一个 AtomicInteger ctl 就保证了线程安全。

Worker 管理

线程池为了获取线程状态,维护线程生命周期,使用了工作线程 Worker 作为线程的包装,Worker 部分代码如下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;// Worker 持有的线程
    Runnable firstTask;// 初始化的任务,可以为 null
}

Worker 工作线程,持有了一个线程 thread和一个初始化任务 firstTask,同时 Worker 自身也实现了 Runnable 接口。thread 是由线程池构造中的 threadFactory 创建的,而 firstTask 则在 Worker 创建时传入,如果 firstTask 不为 null,Worker 就会在创建完成后立刻执行该任务;如果 firstTask 是 null,说明该 Worker 是一个非核心线程,这个线程就需要去任务队列(workQueue)中获取任务执行。

Worker 执行任务的模型如下图:

对于非核心线程,在创建完成并且没有任务执行后,需要考虑回收的问题。线程池通过一个 HashSet 来持有 Worker 的引用:

private final HashSet<Worker> workers = new HashSet<Worker>();

这样可以通过添加引用、移除引用这样的操作来控制线程的生命周期,这时就需要判断线程是否正在运行。

Worker 通过继承了 AbstractQueuedSynchronizer(AQS)来实现独占,实现了一个不可重入锁来反映线程当前的状态(所以没有直接继承 ReentrantLock 可重入锁)。具体如下:

  1. lock 方法获得独占锁,表示当前线程正在执行
  2. 当线程执行任务完成后,会调用 unlock 释放锁
  3. 线程池在执行 shutdown 方法或 tryTerminate 方法时会调用 interruptIdleWorkers 方法来中断空闲的线程,interruptIdleWorkers 方***使用 tryLock 方法来尝试获得锁,以判断线程池中的线程是否是空闲状态

在线程回收的过程中就用到了上述独占锁的特性,回收过程示意如下:

增加 Worker

增加 Worker 线程主要通过 addWorker() 方法,该方法功能很单一,仅仅是增加一个 Worker,并不会判断当前的状态等,判断策略是在上个步骤(如 execute() 方法)完成的。

addWorker() 方法有两个参数:firstTask、core。firstTask 参数用于指定新增的线程执行的第一个任务,该参数可以为空;core 参数为 true 表示在新增线程时会判断当前活动线程数是否少于 corePoolSize,false 表示新增线程前需要判断当前活动线程数是否少于 maximumPoolSize。addWorker() 流程如下:

addWorker() 方法注释如下:

    private boolean addWorker(Runnable firstTask, boolean core) {
        // 为确保线程安全,进行CAS反复重试
        retry:
        for (;;) {
            int c = ctl.get();
            // 获取runState
            int rs = runStateOf(c);

            // 已经shutdown, firstTask 为空的添加并不会成功
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                // 如果超出最大允许创建的线程数,则直接失败
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // CAS 更新worker+1数,成功则说明占位成功退出retry,后续的添加操作将是安全的
                // 失败则说明已有其他线程变更该值
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                // runState 变更,则退出到 retry 重新循环 
                if (runStateOf(c) != rs)
                    continue retry;
                }
        }
        // 以下为添加 worker 过程
        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            // 使用 Worker 封闭 firstTask 任务,后续运行将由 Worker 接管
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                // 添加 worker 的过程,需要保证线程安全
                mainLock.lock();
                try {
                    int rs = runStateOf(ctl.get());
                    // SHUTDOWN 情况下还是会创建 Worker, 但是后续检测将会失败
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // 既然是新添加的线程,就不应该是 alive 状态
                        if (t.isAlive())
                            throw new IllegalThreadStateException();
                        // workers 只是一个工作线程的容器,使用 HashSet 承载,以保持其引用
                        workers.add(w);
                        int s = workers.size();
                        // 维护一个全局达到过的最大线程数计数器
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                // worker 添加成功后,进行将worker启起来,里面应该是有一个 死循环,一直在获取任务
                // 不然怎么运行添加到队列里的任务呢?
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 如果任务启动失败,则必须进行清理,返回失败
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker 执行任务

Worker 中的 run() 方***调用 runWorker() 来执行任务,方法执行过程如下:

  1. while 循环不断地通过 getTask() 方法获取任务。
  2. getTask() 方法从阻塞队列中取任务。
  3. 如果线程池正在停止,那么要保证当前线程是中断状态,否则要保证当前线程不是中断状态。
  4. 执行任务。
  5. 如果 getTask 结果为 null 则跳出循环,执行 processWorkerExit() 方法,销毁线程。

整体流程如下图所示:

runWorker() 方法注释如下:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允许打断
        boolean completedAbruptly = true;
        try {
            // 不停地从 workQueue 中获取任务,然后执行,就是这么个逻辑
            // getTask() 会阻塞式获取,所以 Worker 往往不会立即退出 
            while (task != null || (task = getTask()) != null) {
                // 执行过程中是不允许并发的,即同时只能一个 task 在运行,此时也不允许进行 interrupt
                w.lock();
                // 检测是否已被线程池是否停止 或者当前 worker 被中断
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    // 中断信息传递
                    wt.interrupt();
                try {
                    // 任务开始前 切点,默认为空执行
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        // 直接调用任务的run方法, 具体的返回结果,会被 FutureTask 封装到 某个变量中
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 任务开始后 切点,默认为空执行
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            // 正常退出,有必要的话,可能重新将 Worker 添加进来
            completedAbruptly = false;
        } finally {
            // 处理退出后下一步操作,可能重新添加 Worker
            processWorkerExit(w, completedAbruptly);
        }
    }

Worker 回收

线程池中线程的销毁依赖 JVM 自动的回收,线程池做的工作是根据当前线程池的状态维护一定数量的线程引用,防止这部分线程被 JVM 回收,当线程池决定哪些线程需要回收时,只需要将其引用消除即可。Worker 被创建出来后,就会不断地进行轮询,然后获取任务去执行,核心线程可以无限等待获取任务,非核心线程要限时获取任务。当 Worker 无法获取到任务,也就是获取的任务为空时,循环会结束,Worker 会主动消除自身在线程池内的引用。在上一节 runWorker() 源码中就可以看到。

try {
    while (task != null || (task = getTask()) != null) {
        //执行任务
    }
} finally {
    processWorkerExit(w, completedAbruptly);//获取不到任务时,主动回收自己
}

线程回收的工作是在 processWorkerExit() 方法完成的。

大致流程如下:

代码注释如下:

    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        if (completedAbruptly)
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            // 移出线程池
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            // 在 Worker 正常退出的情况下,检查是否超时导致,维持最小线程数
            if (!completedAbruptly) {
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                // 如果满足最小线程要求,则直接返回
                if (workerCountOf(c) >= min)
                    return;
            }
            // 否则再添加一个 Worker 到线程池中备用
            // 非正常退出,会直接再添加一个 Worker
            addWorker(null, false);
        }
    }

事实上,将线程引用移出线程池就已经结束了线程销毁的部分。但由于引起线程销毁的可能性有很多,线程池还要判断是什么引发了这次销毁,是否要改变线程池的现阶段状态,是否要根据新状态,重新分配线程。

结束语

线程池算是 JDK 源码中综合性很强的部分了,对于很多项目的设计都是很有启发性的。

面试中也主要是针对 ThreadPoolExecutor 的设计理念来提问,甚至可能会扩展到让面试者自行设计。

全部评论

(11) 回帖
加载中...
话题 回帖

推荐话题

相关热帖

热门推荐