当前位置: 代码迷 >> 综合 >> Android 线程切换(4): ThreadPoolExecutor 的原理和实现
  详细解决方案

Android 线程切换(4): ThreadPoolExecutor 的原理和实现

热度:97   发布时间:2023-12-12 20:16:30.0

文章目录

    • 参考
    • 线程池的运行原理
      • 控制状态ctl
    • 线程池的实现
      • execute
      • shutdown

参考

Java多线程进阶(十三)—— J.U.C之atomic框架:AtomicInteger
Java多线程线程池(4)–线程池的五种状态
Java中interrupt的使用

线程池的运行原理

从上一篇我们已经了解了线程池的使用方法,现在我们来了解线程池是如何实现的。线程池的执行任务的流程如下:
在这里插入图片描述
前面也提到过,线程池最重要的两个方法是 executeshutdown ,execute方法基本上就是上面这张图所描述的内容,而shutdown方法主要跟ThreadPoolExecutor的控制状态ctl有关,这个值和线程池的运行息息相关。所以在研究线程池的实现代码之前,我们要先了解一下它的工作原理。

控制状态ctl

ctl存储了线程池的当前的运行状态和线程数量。其类型为AtomicInteger(AtomicInteger是Integer类型的线程安全原子类,用来在多线程环境以原子的方式安全的更新int值)。ctl的值分为两个部分:前3位是线程池的当前状态,后29位是线程池里的线程数量(就跟View的MeasureSpec类类似),线程池定义了5个状态:

 private static final int COUNT_BITS = Integer.SIZE - 3;private static final int CAPACITY   = (1 << COUNT_BITS) - 1;private static final int RUNNING    = -1 << COUNT_BITS;private static final int SHUTDOWN   =  0 << COUNT_BITS;private static final int STOP       =  1 << COUNT_BITS;private static final int TIDYING    =  2 << COUNT_BITS;private static final int TERMINATED =  3 << COUNT_BITS;

这五种状态的意义分别是:

  • RUNNING:正常运行状态。也是线程池初始化后的状态。
  • SHUTDOWN:不再接收新任务,但会继续处理任务队列中和正在处理的任务。
  • STOP:不再接收新任务,不处理任务队列中的任务,并中断正在处理的任务。
  • TIDYING:所有任务线程已经停止。
  • TERMINATED: 线程池彻底终止。

它们之间的关系如图(来自一只逗比的程序猿):
在这里插入图片描述
可以看到,没有调用shutdown或者shutdownNow的时候,线程池的状态一直是RUNNING;调用了这两个方法会到不同的状态(区别在于会不会执行任务队列中的任务),最终都会到TIDYING直至TERMINATED彻底终止;TIDYING和TERMINATED之间有一个terminated()方法,这个方法的默认实现是一个空方法。
我们由上一篇知道,Executors .newCachedThreadPool()等提供的线程数量的参数是Integer.MAX_VALUE,即2^32 - 1,但现在看代码知道,我们保持线程数量的ctl虽然是一个int,但是这个int的前三位用来保持数量,所以其实可以创建的理论最大线程数量只有2^29 -1个。所以这个参数道理上来讲是不合理的,当然,事实上我们创建这么多的线程早就会内存溢出了(我的实验结果是只创建了3000个就报错了)。
此外我这里有个困惑就是,线程池的状态值是从-1开始的,不知道这是否有什么特殊意义。

线程池的实现

了解了上述内容后,我们可以来分析ThreadPoolExecutor的具体代码了,从shutdown或者shutdownNow这两个关键方法开始,遍历ThreadPoolExecutor的整个流程。

execute

execute的代码如下:

public void execute(Runnable command) {
    if (command == null)throw new NullPointerException();int c = ctl.get();if (workerCountOf(c) < corePoolSize) {
      //1if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {
     //2int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}else if (!addWorker(command, false)) //3reject(command);}

以下是execute方法源码中的注释:

Proceed in 3 steps:
1. If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task. The call to addWorker atomically checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn’t, by returning false.
2. If a task can be successfully queued, then we still need to double-check whether we should have added a thread because existing ones died since last checking) or that the pool shut down since entry into this method. So we recheck state and if necessary roll back the enqueuing if stopped, or start a new thread if there are none.
3. If we cannot queue task, then we try to add a new thread. If it fails, we know we are shut down or saturated and so reject the task.

这个方法就是添加任务的主要逻辑,简单来说,所有这个方法分为3步:

  • 当前线程数小于核心线程数corePoolSize时,创建线程执行任务。
  • 否则 ,如果线程池正在运行,且任务队列workQueue未满,将任务放入任务队列,二次检查,防止此时有核心线程结束。
  • 否则,如果线程数小于最大允许线程数maximumPoolSize,创建线程执行任务。
  • 否则,触发拒绝策略。

这个方法涉及到的isRunningworkerCountOf都是读取ctl的辅助方法:

private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
private static int workerCountOf(int c)  {
     return c & CAPACITY; }

顾名思义,isRunning是判断线程池是否正在运行,workerCountOf获取当前活跃的线程数量。而reject方法就是执行拒绝策略。最重要的是addWorker方法:

boolean addWorker(Runnable firstTask, boolean core)

这个方法用于在线程池中添加工作线程,它有两个参数和一个返回值:firstTask是工作线程(如果成功添加)运行的第一个线程,可以为null,core区分新线程是作为核心线程还是非核心线程添加的;返回值表示是否添加了一个新线程。addWorker的实现如下:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:for (;;) {
    int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN &&     //1! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {
    int wc = workerCountOf(c);if (wc >= CAPACITY || //2wc >= (core ? corePoolSize : maximumPoolSize))return false;if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();  // Re-read ctlif (runStateOf(c) != rs)continue retry;// else CAS failed due to workerCount change; retry inner loop}}boolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {
    w = new Worker(firstTask);//3final Thread t = w.thread;if (t != null) {
    final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {
    // Recheck while holding lock.// Back out on ThreadFactory failure or if// shut down before lock acquired.int rs = runStateOf(ctl.get());if (rs < SHUTDOWN ||(rs == `SHUTDOWN` && firstTask == null)) {
       //4if (t.isAlive()) // precheck that t is startablethrow new IllegalThreadStateException();workers.add(w);int s = workers.size();if (s > largestPoolSize)largestPoolSize = s;workerAdded = true;}} finally {
    mainLock.unlock();}if (workerAdded) {
    t.start();//5workerStarted = true;}}} finally {
    if (! workerStarted)addWorkerFailed(w);//6}return workerStarted;}

这个方法有点长,但核心逻辑是十分简单的,按我在代码中标记的顺序

  1. 如果线程池已经是STOP及以上状态,或者是SHUTDOWN状态且所有任务已执行,直接返回false
  2. 如果当前线程数已达到线程池理论最大值(其实不可能)或者创建线程种类(核心线程或非核心线程)所允许最大值。直接返回false
  3. 将firstTask作为参数创建工作线程的封装类Worker
  4. 如果线程池状态是Running或者说是SHUTDOWN但是任务队列中仍有任务,将新线程加入线程集中
  5. 启动线程
  6. 如果中间出了什么问题导致线程激动失败,就进入处理错误的方法。

Worker是ThreadPoolExecutor的内部类,它继承了AbstractQueuedSynchronizer类(AbstractQueuedSynchronizer用于构建线程间同步,不熟悉不敢聊),实现了Runnable接口,用来封装执行的任务和工作线程。Worker的主要实现如下:

//...final Thread thread;Runnable firstTask;volatile long completedTasks;Worker(Runnable firstTask) {
    setState(-1);this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this); }public void run() {
    runWorker(this);}//...

可以看到,thread初始化的时候传入的Runnable就是这个实现了Runnable接口的Worker本身,所以addWorker调用了线程的start方法之后,会调用Worker的run方法,最后调用runWorker方法,这个方法的实现如下:

 final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {
    while (task != null || (task = getTask()) != null) {
     //1w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())     //2wt.interrupt();try {
           //3beforeExecute(wt, task);Throwable thrown = null;try {
    task.run();} catch (RuntimeException x) {
    thrown = x; throw x;} catch (Error x) {
    thrown = x; throw x;} catch (Throwable x) {
    thrown = x; throw new Error(x);} finally {
    afterExecute(task, thrown);}} finally {
    task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {
    processWorkerExit(w, completedAbruptly);//4}}

可以看到,这个方法通过while循环,不断调用getTask获取任务,当getTask返回null时,退出循环,即意味着线程结束运行。只有当线程池至少是STOP的时候,才会中断线程执行,而执行任务之前之后分别会调用beforeExecute和afterExecute,这两个方法默认是空方法。如果过程中出现异常,就会用processWorkerExit方法处理异常,这个方法会创建一个新线程取代这个线程。
接下来看getTask的实现:

 private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?for (;;) {
    int c = ctl.get();int rs = runStateOf(c);// Check if queue empty only if necessary.if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
     //1decrementWorkerCount();return null;}int wc = workerCountOf(c);// Are workers subject to culling?boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {
       //2if (compareAndDecrementWorkerCount(c))return null;continue;}try {
    Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();    //3if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {
    timedOut = false;}}}

getTask方法也分为3步:

  1. 如果线程池状态为STOP或者为SHUTDOWN但是任务队列已空,那么就减少线程数量,返回null
  2. 如果当前线程数大于最大允许线程数或者已经超时了,同时保证任务队列已空或者还有其他线程存活以执行剩余任务的情况下,返回null
  3. 读取任务队列中的任务,因为是阻塞队列的原因,所以对于非核心线程或者设置了核心线程超时时间的情况,设置了读取超时。如果超时了就设置timedOuttrue,这个值在2中用到。

由上面runWorker方法可以得知,当getTask返回null的时候,线程结束。而当这个方法因为阻塞队列的原因阻塞时,线程也被阻塞了,所以这就是核心线程没有任务时仍能保持存活以及非核心线程可以设置存活时间的原因。

到此,以execute为引的ThreadPoolExecutor执行任务的全部流程就分析完了。

shutdown

shutdownshutdownNow的实现如下:

public void shutdown() {
    final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {
    checkShutdownAccess();  //Android中没有作用advanceRunState(SHUTDOWN);interruptIdleWorkers();onShutdown(); } finally {
    mainLock.unlock();}tryTerminate();}public List<Runnable> shutdownNow() {
    List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {
    checkShutdownAccess(); //Android中没有作用advanceRunState(STOP);interruptWorkers();tasks = drainQueue();} finally {
    mainLock.unlock();}tryTerminate();return tasks;}

可以看到,这两个方法的逻辑都差不多,就如状态的关系图所描述的:shutdown将线程池变为SHUTDOWN状态,中断空闲线程(阻塞在获取任务的时候的线程),调用onShutdown方法(默认实现是空方法)。调用tryTerminate尝试结束线程池;
shutdownNow将线程池变为STOP状态,中断所有工作线程,调用tryTerminate尝试结束线程池,返回任务队列中没有执行的任务。tryTerminate的作用是在情况允许的时候使线程池终止,即状态变为TERMINATED,它的代码如下:

final void tryTerminate() {
    for (;;) {
    int c = ctl.get();if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;if (workerCountOf(c) != 0) {
     // Eligible to terminateinterruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {
    if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
    try {
    terminated();} finally {
    ctl.set(ctlOf(TERMINATED, 0));termination.signalAll();}return;}} finally {
    mainLock.unlock();}// else retry on failed CAS}}

tryTerminate方法就是在RunningTIDYING(正在终止时)、SHUTDOWN且任务队列为空时,会直接退出,否则(SHUTDOWN 、任务队列已空或者STOP),如果还有工作线程在运行的话,就会中断空闲线程再退出,这种情况下,工作线程结束时会调用processWorkerExit方法,这个方法中会重新调用tryTerminate;如果以上情况都不满足,tryTerminate就会尝试将线程池状态设为TIDYING,调用terminated方法(默认实现为空),接着设置状态为TERMINATED,线程池完全停止。

到此为止,线程池的两条主要逻辑就分析完了。ThreadPoolExecutor的整个运行流程也有了个比较完整的描述。有个遗憾就是Worker类跟JUC关系很大,但是我对这一块没有了解过,只能跳过,等以后有时间深入学习了JUC之后再来补足这里了。

  相关解决方案