以下基于JDK 1.8 进行分析
文章目录
- 1 简介
- 2. 创建线程池
- 3. 线程池运行
-
- 3.1 预启动核心线程
-
- 3.1.1 prestartCoreThread
- 3.1.2 prestartAllCoreThreads
- 3.2 提交任务的方式
-
- 3.2.1 execute
- 3.2.2 submit 系列方法
- 3.2.3 其他说明
- 3.3 工作线程
-
- 3.3.1 工作线程任务执行过程
- 3.3.2 工作线程什么时候退出、运行结束
- 4 结束线程池
-
- 4.1 shutdown()
- 4.2 shutdownNow()
- 5. 问题
-
- 5.1 工作线程
- 5.2 在停止线程池时,怎么区分那些工作线程是空闲的?
1 简介
实现了AbstractExecutorService 抽象类,提供了创建线程池、设置线程池属性、提交任务、停止线程池等基本操作接口。下面操作类型对 ThreadPoolExecutor 进行分析。
2. 创建线程池
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)
创建线程池对象,可根据实际需要,自定义线程池特性。初始化过程,没有任何特殊化操作,只是一些基础变量的初始化。
创建完成后,也没有什么特殊的效果,刚初始化的线程池对象中不包含任何线程。
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}
备注:如果觉得使用该自定义方式创建线程池比较复杂繁琐(需要传入众多的参数),则可以使用 Executors 工具类来创建固定属性的线程池,包括:
-
newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
-
newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
-
newScheduledThreadPool :创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。
-
newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
3. 线程池运行
在创建线程池后,线程池就启动了(没有类似的start 函数,创建即启动)。但是里面没有任何线程。启动线程运行任务有两种方式:
3.1 预启动核心线程
3.1.1 prestartCoreThread
public boolean prestartCoreThread()public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);}
如果当前线程池中的数量 小于 核心线程数,则创建一个线程,加入到线程池中,然后调用线程的start 方法,启动线程。
3.1.2 prestartAllCoreThreads
启动所有的核心线程。循环调用prestartCoreThread,直到
prestartCoreThread 调用不成功(已经达到核心线程数 或者 线程池已经关闭了)。
3.2 提交任务的方式
提交任务 和 预启动的区别就是:预启动 核心线程的时候,由于没有任务,则核心线程是处于空转的状态。并且,预启动只能启动核心线程数,当到达 核心线程数,则是没有作用的。
3.2.1 execute
public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task. The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread. If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();// 如果当前线程数 小于 核心线程数,则启动一个新的线程,运行该任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果线程池在关闭,则将任务从任务队列中删除,并执行拒绝任务流程;if (! isRunning(recheck) && remove(command))reject(command);//再次判定,在这个时候,是否之前的线程死了,则新启动一个线程(避免有任务,无工作线程的情况)else if (workerCountOf(recheck) == 0)addWorker(null, false);//其他case: 线程池处于运行状态,且当前线程数大于核心线程数,且任务队列没有满,//则将任务加入到任务队列中}//线程池处于运行状态,且当前线程数大于核心线程数,且任务队列满了,则尝试新启动一个线程//如果启动失败,则拒绝任务else if (!addWorker(command, false))reject(command);}
3.2.2 submit 系列方法
一般形式的submit 方法如下:
public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}
实际上,submit 方法可接收 runnable, 也可接收 callable 对象。
submit 系列方法核心 要点还是调用了execute ,只是封装了一层,可以返回future 对象,可查询任务状态、结果。
3.2.3 其他说明
创建先的工作线程可能失败的case:
- 达到线程池限制(预启动时,是到达 核心线程数;其他情况,达到最大线程数限制);
- 线程池已经停止了了,或者在 缓慢 shutdown 过程中。
- 设置的线程池工厂 创建线程错误,例如系统内存溢出等情况;
3.3 工作线程
工作线程的定义:
private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;。。。。
3.3.1 工作线程任务执行过程
工作线程继承了runnable 接口,直接看接口的run 函数:
public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果初始任务不为空 或者能取到任务(任务队列不为空)while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted. This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//默认实现是空的,可自定义为记录日志等其他功能beforeExecute(wt, task);Throwable thrown = null;try {//调用任务的run函数,执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//空函数afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//退出了主循环,该线程结束。会进行任务统计,且如果线程数达不到要求,会重新启动一个工作线程。processWorkerExit(w, completedAbruptly);}}
线程池中的任务由工作线程完成。工作线程的运行逻辑:
- 运行初始任务(如果有的话。使用预启动函数创建的工作线程没有初始任务);
- 不停的从任务队列中取任务,调用任务的run函数,执行任务;
3.3.2 工作线程什么时候退出、运行结束
从上面代码中可以看到,工作线程一般情况下,是不会退出的,退出工作循环只有一个条件:执行抛出了异常。抛出异常又可分为两种情况:
- 当线程池关闭时,任务会收到中断 异常;
- 本身任务抛出异常;
4 结束线程池
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:
4.1 shutdown()
不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//设置线程池的状态为关闭,则不会接受新的任务advanceRunState(SHUTDOWN);//中断那些空闲的进程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}
4.2 shutdownNow()
立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
5. 问题
5.1 工作线程
线程池中的工作线程 被放入到 一个哈HashSet中。当创建新的线程或者线程死亡时,通过锁 来修改 workers 集合。
private final HashSet<Worker> workers = new HashSet<Worker>();private final ReentrantLock mainLock = new ReentrantLock();
那为什么不直接使用线程安全的集合?
* Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.
没有看懂文档上的说明
5.2 在停止线程池时,怎么区分那些工作线程是空闲的?
所谓忙碌的工作线程,即取到任务(从任务队列获取到任务 或者 有初始任务),准备执行的工作线程。
当工作线程有了任务后,会给自己上锁:
final void runWorker(Worker w) {.......while (task != null || (task = getTask()) != null) {// 上锁w.lock();。。。。。
}
worker 继承了AbstractQueuedSynchronizer,当执行worker.lock 后,如果不执行unlock ,则其他地方是获取不到锁的。当worker 执行完取到的单个任务后,会调用 unlock 函数。
当调用shutdown 函数停止 空闲工作线程的时候,会尝试调用worker 的 trylock 函数。如上所说,如果工作线程取到了任务,则会进行lock 操作,即此时 shutdown 中,对该工作程序执行trylock 不成功,则不会发送interrupt 消息。
for (Worker w : workers) {Thread t = w.thread;// 执在执行任务的工作线程 trylock或失败if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}