当前位置: 代码迷 >> 综合 >> java并发系列八---ThreadPoolExecutor
  详细解决方案

java并发系列八---ThreadPoolExecutor

热度:50   发布时间:2023-12-22 01:59:30.0

线程池的意义
在讲解线程池之前,有些读者可能存在这样的疑惑:为什么需要线程池,线程池有什么优越性?

关于这个问题,主要从两个角度来进行解答:
?减少开销
在大部分JVM上,用户线程与操作系统内核线程是1:1的关系,也就是说每次创建回收线程都要进行内核调用,开销较大。那么有了线程池,就可以重复使用线程资源,大幅降低创建和回收的频率。此外,也能一定程度上避免有人在写BUG时,大量创建线程导致资源耗尽。

?便于管理
线程池可以帮你维护线程ID,线程状态等信息,也可以帮你统计任务执行状态等信息。

理解了线程池的意义,那么本文的主角便是JUC提供的线程池组件:ThreadPoolExecutor
请注意,有人会将JUC中的ThreadPoolExecutor与Spring Framework中的ThreadPoolTaskExexutor混淆。这是两个不同的组件,ThreadPoolTaskExecutor可以理解为对ThreadPoolExexutor做的一层封装,主要就是为了支持线程池的Bean化,将其交给Spring Context来管理,防止滥用线程池。而内部的核心逻辑还是由ThreadPoolExexutor处理。关于这一点,简单理解即可。

首先尝试用一句话对ThreadPoolExecutor进行概括://todo

从宏观上看,开发者将任务提交给ThreadPoolExecutor,ThreadPoolExecutor分配工作线程(Worker)来执行任务,任务完成后,工作线程回到ThreadPoolExecutor,等待后续任务。

根据这段描述,产生了三个比较值得探究的问题:
1.ThreadPoolExecutor自身有哪些状态,如何维护这些状态?
2.ThreadPoolExecutor如何维护内部的工作线程?
3.ThreadPoolExecutor处理任务的整体逻辑是什么样的?

继承关系
ThreadPoolExecutor继承了AbstractExecutorService,AbstractExecutor实现了ExecutorService接口。ExecutorService接口继承了Executor接口,整体呈现出了这样的关系:

ThreadPoolExecutor→AbstractExecutorService→ExecutorService→Exector

从上往下来看

 

Executor接口中只声明了一个execute方法,用于执行提交的任务

ExecutorService扩展了Executor的语义,增加了多种多样的操作。

 

而AbstractExecutorService则是对ExecutorService中声明的方法进行默认实现,方便子类进行调用。比如ThreadPoolExexutor就直接使用了AbstractExecutorService的submit方法。AbstractExecutorService也是一个比较核心的类,但它不是本文的重点,所以不会详细讲解。

静态变量


这几个变量很关键,在注释中也已经有了比较详细的解释,英文还不错的同学不妨通读一遍。我这里就以更直白的方式加以介绍,顺便帮你温习一些计算机基础知识。

 

先看下半部分的五个变量,从命名上可以判定这些值代表了ThreadPoolExecutor的状态。

这些状态值涉及了二进制移位操作,我们知道int类型在Java中的二进制表示是以补码存储的,关于原码反码补码的基础知识这里不展开解释。那么-1的二进制表示是32个1的序列,COUNT_BITS值是常数,为32-3=29。因此RUNNING的二进制表示是高三位为111,低29位都为0的序列。

我们用同样的方式表示出其余四个状态:

 

不难发现,这五个线程可以理解为目前只用到了高三位,这是因为ThreadPoolExecutor只用一个int变量来同时保存线程池状态以及工作线程数的这两个信息,线程状态使用高三位,工作线程数使用低29位。CAPACITY这个变量就表示为工作线程的最大数量。

 

这种将两种状态存储在一个二进制序列中的做法,在业务代码中相对比较少见,在底层源码中很常见。比如ReentrantReadWriteLock中,用一个int来组合表示读锁和写锁的个数,比如在ZooKeeper中,用一个long来组合表示epoch和事务个数。

这几种状态的含义是:
-RUNNING:接受新任务,也能处理阻塞队列里的任务。
-SHUTDOWN:不接受新任务,但是处理阻塞队列里的任务。
-STOP:不接受新任务,不处理阻塞队列里的任务,中断处理过程中的任务。
-TIDYING:当所有的任务都执行完了,当前线程池已经没有工作线程,这时线程池将会转换为TIDYING状态,并且将要调用terminated方法。
-TERMINATED:terminated方法调用完成。

这几个状态之间的变化如图所示:

 

 

首先着重介绍的是AtomicInteger类型的属性ctl.
ctl就是上文所说的,组合了线程池状态以及池中工作线程数两个信息的变量。它初始化时调用了ctlOf方法,可以看到ctlOf只是一个或操作。这就说明,线程池在初始化时,状态被标记为RUNNING,工作线程数为0.

读到这里,有一些读者可能会存在疑惑:为啥非要用一个int值来组合表示两种状态?用两个值表示,清清楚楚不行吗?

可以,当然可以。但使用一个变量的好处是:如果需要对两个状态值进行同步修改,直接通过位操作就可以了,省去了加锁操作。因为在操作系统级别,对int的修改本身就是原子的。顺便提一下,像64位的double,long。在32位操作系统上对它们的操作不是原子的,可能出现半读半写问题。

再来看看其他属性,属性往往会透露出这个类是如何组织的。


一个个属性来看。每个属性其实在源码里都有注释,有兴趣的读者不妨去通读一遍,我这边简单讲解一下。

 

workQueue
类型是BlockringQueue接口,用来存储积压任务的阻塞队列,具体实现类可以由用户自己定义。细节上有一个疑惑点:BlockingQueue的泛型是Runnable,难道Callable类型的任务就无法进入该阻塞队列吗?这是因为父类AbstractExecutorService将会把Callable对象转换为Runnable的子类FutureTask,所以阻塞队列的泛型是Runnable没问题。
mainLock
ReentrantLock类型,可以料想对线程池的一些操作需要状态同步,所以需要用到锁。具体在哪里用到下文再看。

workers
HashSet类型,泛型是内部类Worker。Worker这个内部类可以视为对工作线程以及一些状态的封装,workers是用来存储所有Worker的集合。

termination
由mainLock创建的Condition,看变量名应该是用于terminal调用时的线程同步。


largestPoolSize
线程池中最多有过多少个活跃线程。

completedTaskCount
线程池总共处理了多少任务

threadFactory
类型为ThreadFactory接口,用户可以自定义创建工作线程的工厂。

handler
拒绝策略,当workQueue满载时将会触发。

keepAliveTime
工作线程空闲时则保持存活的时间。

allowCoreThreadTimeOut
布尔类型,是否需要保持核心线程始终处于存活。

corePoolSize
核心线程数。可以看做稳定的工作线程数量,当阻塞队列还未满载时,线程池将保持核心线程数。

maximumPoolSize
最大线程数。可以看做弹性的工作线程数量,当阻塞队列满载时,线程池将会在核心线程数的基础上创建新线程来处理任务,直到最大线程数。

其中拒绝策略,核心线程数,最大线程数这三个变量比较重要,在下文中也会重点讲到。

 

 

继承关系:
继承AQSL:说明Worker内部存在同步需求
实现Runnable:Worker本身就是也跟异步的任务调度者

当某个Worker从workQueue中获取一个任务后,便持有锁,直到将任务在当前线程内执行完后,再释放锁,获取新的任务。有的读者可能会有疑惑:这里根本不用锁呀,是不是有点多此一举。事实上,这里加锁的作用是表示Worker是否处于工作中,不接受中断信号。

方法:
三个属性就不用进一步介绍了,注释写的很明确。在构造函数中,将AQS的state初始化为-1,之所以不初始化为0,是为了在初始化期间不接受中断信号,直到runWorker方法开始运行,即工作线程真的开始处理任务,state将会被修改为0,此时相当于锁被释放的状态,可以接受中断信号。这部分逻辑可以从interruptlfStarted中理解。

run
来看Worker中最主要的run方法,其实也就是runWorker方法,该方法在Worker启动时便会调用。runWorker的主要逻辑在第8行开始的while循环。

 

 

 

 

若当前线程从workQueue中获取任务,首先加锁,这里加锁不是为了防止并发,而是标记当前工作线程正在执行任务,不接受中断信号。

接下来14-18行用于判断当前线程池状态以及线程中断状态,大致意思就是若当前线程状态>=stop(非running shutdown状态),则对当前线程调用中断(当线程正在运行时,对线程调用中断只是标记将要中断的状态,线程不会立即中断)

19-37行是对任务本身的处理,有两个细节:
调用了task的run方法而不是start方法,表示依然在当前线程中处理,而非新启线程 
在task.run()方法的前后,有beforeExecute和afterExecute这两个方法,相当于线程池给每个任务都进行了切面。

33-37行,完成任务数++,并释放锁,没什么问题

若while判断条件中的getTask方法返回了null,那么将会跳出循环,调用finally块中的processWorkerExit方法来对Worker进行回收。也就是说getTask方法如果返回null,那将会触发回收当前worker的行为。

这里需要了解的一点是:当workQueue中不存在排队任务时,不一定会直接返回null,更有以下两种情况:

工作线程阻塞等待,结合之前的信息,如果工作线程被标记中断,并且进入阻塞状态的话,那么将会触发中断,代表该worker需要被回收,此时getTask将会返回null。
线程池当前状态需要回收worker,此时getTask将会返回null

getTask
带着这个预期,我们点开getTask方法来看。

第2行
timedOut这个bool值变量用于记录上一次从阻塞队里poll任务是否超时,接下来第4行开始就是一个循环。

5-12行
获取当前线程池的状态,若线程池的状态已经是STOP,TIDYING,TERMINATED或者是SHUTDOWN且工作队列为空,那么返回null,代表当前worker可以回收,符合我们的预期。


14-24行
接下来,wc就是当前worker数量,allowCoreThreadTimeOut这个变量我们最开始说过,代表是否需要保持核心线程空闲时始终存活,默认是false。那么这个逻辑就是,如果当前工作线程数超过了最大线程数,或达到了核心线程数的回收条件,且池中还有其他工作线程在工作或workQueue时,则开始尝试回收当前worker.符合我们的预期。

26-35行
如果不满足上述两种回收条件,那么就开始从阻塞队列里获取任务,不同的是,poll操作在队列为空的时候,将直接返回null,而take操作将会等待,直到队列中有任务可以被取出。


这里有个细节,poll的超时时间keepAliveTime,即为我们最开始介绍的空闲线程的回收时间。也就是说,既然队列里无任务需要处理,那么也就代表该线程空闲,可以尝试进行新一轮的回收判断。

getTask这个方法我认为是线程池动态维护工作线程的核心。设计比较巧,当我们在业务中自己处理一些复杂的生产-消费问题时,可以借鉴这种思路。

方法
execute
看完了内部类,接下来便是ThreadPoolExecutor最重要的execute方法。

 

我们来看execute方法。注释告诉我们当你向线程池提交一个任务,线程池可能会创建一个线程来处理,也可能使用已有的线程处理。当然,不一定会立即处理,因为可能会被队列缓存起来。如果队列也已经满了,那么将会触发拒绝机制。

execute这段逻辑呢,我猜很多同学都听说过,事实上我们一开始介绍核心线程数和最大线程数的时候也简单提到了。我将这段逻辑画成了一张流程图,可以辅助理解。

 

说当向线程池提交一个任务,如果当前线程数小于核心线程数,那么就新增worker。由于上层可能存在并发提交任务的情况,那么这里很可能会由于核心线程数的限制而导致新增失败。当新增工作线程失败,则进入下面的流程。向阻塞队列offer一个任务,如果阻塞队里已满,那么继续尝试创建worker,不过此时创建的不是核心线程,而是奔着最大线程数去的,如果已经达到了最大线程数,那么触发拒绝策略。

而如果成功提交到了阻塞队列,这时再判断线程池的状态,如果处于非RUNNING状态,那么尝试移除任务,如果成功移除该任务,就触发拒绝策略。如果移除失败了,那就给这个任务一个被执行的机会,尝试新增worker去消费阻塞队列里的任务。

reject方法会调用具体的拒绝策略,ThreadPoolExecutor提供了默认的四种拒绝策略,当然也可以自定义拒绝策略,这些都是可以在ThreadPoolExecutor的构造方法中指定的。

这段逻辑我相信很多博客都已经讲过了,很多人面试时也被问到过。我们今天呢要关注更深入一点的东西。可以猜想,在addWorker的时候,一定会有并发问题需要处理,我们来看看是如何处理的。

 

 

一开始一个大的循环我们暂时先不看,这段逻辑主要就是对当前线程池的一系列状态进行判断,判断当前时刻下是否还需要创建worker。当这个循环执行完并且没有退出,准确来说也就是这里CAS操作成功,说明才真的有机会创建新的worker.

首先new一个Worker,Worker的构造器我们之前讲过了其中一个细节。下面例行公事判断亦喜爱,worker内的Thread对象是否为null,我觉得这里可能仅仅是谨慎而已,因为worker是刚构造出来的,其中的属性thred应该是不可能为null。

下面就要获取独占锁,之前也说到了,这里很可能出现多线程创建worker的情况,为了防止突破核心线程数和最大线程数的限制,这里必然要加锁。下面是对线程池状态的判断,若处于这两种情况:
如果线程池处于非RUNNING的状态
线程池状态为SHUTDOWN且当前worker是为了用于消费阻塞队列里缓存的任务。

便向worker set中新增该worker,此时释放锁。然后工作开始执行,如果t.start出现异常,将会进入finally块进行清理操作。

这里细心的同学会发现一个严重的问题,那就是若当前线程释放锁,其他线程获取锁进行worker的插入时,居然没有对当前线程数的判断,那岂不是很可能会导致实际worker数超过预设的最大线程数?我们再来重新翻看上面的逻辑。

事实上,上面两段嵌套的循环已经进行了限制,当且仅当只有CAS成功的线程才能跳出这个循环,也就是无论什么时刻,都只能有一个线程执行下面创建worker的逻辑,而其他线程都将自旋等待,这就已经保证了线程安全。

既然创建worker的操作已经是线程安全了,为什么这里还要获取mainLock呢?这是为了同步对workers这个hash set的操作。因为hash set本身不是线程安全的,当这里正在向workers中add worker,其他地方很可能正在对workers进行remove操作。事实上workers这个属性上方的注释也已经说明了,当操作该属性时,需要获取mainLock。

因此既保证了新建worker操作是同步的,也保证了对workers执行add操作的线程安全。

  相关解决方案