当前位置: 代码迷 >> 综合 >> Java线程池:ThreadPoolExecutor
  详细解决方案

Java线程池:ThreadPoolExecutor

热度:21   发布时间:2024-03-06 03:37:07.0

以下基于JDK 1.8 进行分析

文章目录

  • 1 简介
  • 2. 创建线程池
  • 3. 线程池运行
    • 3.1 预启动核心线程
      • 3.1.1 prestartCoreThread
      • 3.1.2 prestartAllCoreThreads
    • 3.2 提交任务的方式
      • 3.2.1 execute
      • 3.2.2 submit 系列方法
      • 3.2.3 其他说明
    • 3.3 工作线程
      • 3.3.1 工作线程任务执行过程
      • 3.3.2 工作线程什么时候退出、运行结束
  • 4 结束线程池
    • 4.1 shutdown()
    • 4.2 shutdownNow()
  • 5. 问题
    • 5.1 工作线程
    • 5.2 在停止线程池时,怎么区分那些工作线程是空闲的?

1 简介

实现了AbstractExecutorService 抽象类,提供了创建线程池、设置线程池属性、提交任务、停止线程池等基本操作接口。下面操作类型对 ThreadPoolExecutor 进行分析。

2. 创建线程池

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

创建线程池对象,可根据实际需要,自定义线程池特性。初始化过程,没有任何特殊化操作,只是一些基础变量的初始化。

创建完成后,也没有什么特殊的效果,刚初始化的线程池对象中不包含任何线程。

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;}

备注:如果觉得使用该自定义方式创建线程池比较复杂繁琐(需要传入众多的参数),则可以使用 Executors 工具类来创建固定属性的线程池,包括:

  • newSingleThreadExecutor :创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

  • newFixedThreadPool :创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。

  • newScheduledThreadPool :创建一个可定期或者延时执行任务的定长线程池,支持定时及周期性任务执行。

  • newCachedThreadPool :创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。

3. 线程池运行

在创建线程池后,线程池就启动了(没有类似的start 函数,创建即启动)。但是里面没有任何线程。启动线程运行任务有两种方式:

3.1 预启动核心线程

3.1.1 prestartCoreThread

public boolean prestartCoreThread()public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);}

如果当前线程池中的数量 小于 核心线程数,则创建一个线程,加入到线程池中,然后调用线程的start 方法,启动线程。

3.1.2 prestartAllCoreThreads

启动所有的核心线程。循环调用prestartCoreThread,直到
prestartCoreThread 调用不成功(已经达到核心线程数 或者 线程池已经关闭了)。

3.2 提交任务的方式

提交任务 和 预启动的区别就是:预启动 核心线程的时候,由于没有任务,则核心线程是处于空转的状态。并且,预启动只能启动核心线程数,当到达 核心线程数,则是没有作用的。

3.2.1 execute

    public void execute(Runnable command) {if (command == null)throw new NullPointerException();/** Proceed in 3 steps:** 1. If fewer than corePoolSize threads are running, try to* start a new thread with the given command as its first* task.  The call to addWorker atomically checks runState and* workerCount, and so prevents false alarms that would add* threads when it shouldn't, by returning false.** 2. If a task can be successfully queued, then we still need* to double-check whether we should have added a thread* (because existing ones died since last checking) or that* the pool shut down since entry into this method. So we* recheck state and if necessary roll back the enqueuing if* stopped, or start a new thread if there are none.** 3. If we cannot queue task, then we try to add a new* thread.  If it fails, we know we are shut down or saturated* and so reject the task.*/int c = ctl.get();// 如果当前线程数 小于 核心线程数,则启动一个新的线程,运行该任务if (workerCountOf(c) < corePoolSize) {if (addWorker(command, true))return;c = ctl.get();}if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();//如果线程池在关闭,则将任务从任务队列中删除,并执行拒绝任务流程;if (! isRunning(recheck) && remove(command))reject(command);//再次判定,在这个时候,是否之前的线程死了,则新启动一个线程(避免有任务,无工作线程的情况)else if (workerCountOf(recheck) == 0)addWorker(null, false);//其他case: 线程池处于运行状态,且当前线程数大于核心线程数,且任务队列没有满,//则将任务加入到任务队列中}//线程池处于运行状态,且当前线程数大于核心线程数,且任务队列满了,则尝试新启动一个线程//如果启动失败,则拒绝任务else if (!addWorker(command, false))reject(command);}

3.2.2 submit 系列方法

一般形式的submit 方法如下:

    public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}

实际上,submit 方法可接收 runnable, 也可接收 callable 对象。

submit 系列方法核心 要点还是调用了execute ,只是封装了一层,可以返回future 对象,可查询任务状态、结果。

3.2.3 其他说明

创建先的工作线程可能失败的case:

  • 达到线程池限制(预启动时,是到达 核心线程数;其他情况,达到最大线程数限制);
  • 线程池已经停止了了,或者在 缓慢 shutdown 过程中。
  • 设置的线程池工厂 创建线程错误,例如系统内存溢出等情况;

3.3 工作线程

工作线程的定义:

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{final Thread thread;Runnable firstTask;。。。。

3.3.1 工作线程任务执行过程

工作线程继承了runnable 接口,直接看接口的run 函数:

 public void run() {runWorker(this);}final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // allow interruptsboolean completedAbruptly = true;try {//如果初始任务不为空 或者能取到任务(任务队列不为空)while (task != null || (task = getTask()) != null) {w.lock();// If pool is stopping, ensure thread is interrupted;// if not, ensure thread is not interrupted.  This// requires a recheck in second case to deal with// shutdownNow race while clearing interruptif ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//默认实现是空的,可自定义为记录日志等其他功能beforeExecute(wt, task);Throwable thrown = null;try {//调用任务的run函数,执行任务task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {//空函数afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {//退出了主循环,该线程结束。会进行任务统计,且如果线程数达不到要求,会重新启动一个工作线程。processWorkerExit(w, completedAbruptly);}}

线程池中的任务由工作线程完成。工作线程的运行逻辑:

  • 运行初始任务(如果有的话。使用预启动函数创建的工作线程没有初始任务);
  • 不停的从任务队列中取任务,调用任务的run函数,执行任务;

3.3.2 工作线程什么时候退出、运行结束

从上面代码中可以看到,工作线程一般情况下,是不会退出的,退出工作循环只有一个条件:执行抛出了异常。抛出异常又可分为两种情况:

  • 当线程池关闭时,任务会收到中断 异常;
  • 本身任务抛出异常;

4 结束线程池

ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:

4.1 shutdown()

不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();//设置线程池的状态为关闭,则不会接受新的任务advanceRunState(SHUTDOWN);//中断那些空闲的进程interruptIdleWorkers();onShutdown(); // hook for ScheduledThreadPoolExecutor} finally {mainLock.unlock();}tryTerminate();}

4.2 shutdownNow()

立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

5. 问题

5.1 工作线程

线程池中的工作线程 被放入到 一个哈HashSet中。当创建新的线程或者线程死亡时,通过锁 来修改 workers 集合。

private final HashSet<Worker> workers = new HashSet<Worker>();private final ReentrantLock mainLock = new ReentrantLock();

那为什么不直接使用线程安全的集合?

 * Lock held on access to workers set and related bookkeeping.* While we could use a concurrent set of some sort, it turns out* to be generally preferable to use a lock. Among the reasons is* that this serializes interruptIdleWorkers, which avoids* unnecessary interrupt storms, especially during shutdown.* Otherwise exiting threads would concurrently interrupt those* that have not yet interrupted. It also simplifies some of the* associated statistics bookkeeping of largestPoolSize etc. We* also hold mainLock on shutdown and shutdownNow, for the sake of* ensuring workers set is stable while separately checking* permission to interrupt and actually interrupting.

没有看懂文档上的说明

5.2 在停止线程池时,怎么区分那些工作线程是空闲的?

所谓忙碌的工作线程,即取到任务(从任务队列获取到任务 或者 有初始任务),准备执行的工作线程。

当工作线程有了任务后,会给自己上锁:

 final void runWorker(Worker w) {.......while (task != null || (task = getTask()) != null) {// 上锁w.lock();。。。。。
}                

worker 继承了AbstractQueuedSynchronizer,当执行worker.lock 后,如果不执行unlock ,则其他地方是获取不到锁的。当worker 执行完取到的单个任务后,会调用 unlock 函数。

当调用shutdown 函数停止 空闲工作线程的时候,会尝试调用worker 的 trylock 函数。如上所说,如果工作线程取到了任务,则会进行lock 操作,即此时 shutdown 中,对该工作程序执行trylock 不成功,则不会发送interrupt 消息。

for (Worker w : workers) {Thread t = w.thread;// 执在执行任务的工作线程 trylock或失败if (!t.isInterrupted() && w.tryLock()) {try {t.interrupt();} catch (SecurityException ignore) {} finally {w.unlock();}}if (onlyOne)break;}
  相关解决方案