当前位置: 代码迷 >> 综合 >> 线程池(ThreadPoolExecutor)分析
  详细解决方案

线程池(ThreadPoolExecutor)分析

热度:49   发布时间:2023-11-25 13:40:29.0

说明:本文是我对自己学习知识的一个简单总结,可能存在许多不足,我希望通过此方式来回顾知识,加强理解,也希望大家能指出文中的错误与不足,互相学习,谢谢。

1.什么是线程池

线程池顾名思义是一个线程缓存的‘池子’。线程是稀缺资源,线程如果创建的太多,会消耗系统的资源,还会降低系统的稳定性,所以java中通过线程池来统一管理分配线程这个稀缺的资源,达到资源重复利用。

2.线程池的出现

在web系统中,服务器需要接受大量的并发请求,一个请求就会对应一个线程,如果并发的请求很多,但每个线程执行的时间很短,这样系统就会频繁的创建和销毁线程,系统的性能就会受到影响。那么是否存在一种方式,线程执行任务后并不销毁,而是重复利用?这就是线程池出现的目的。

3.Executor框架

Executor框架是线程池的基础接口,里面定义了一个execute()方法,如下:

void execute(Runnable command);

从方法中看出,execute方法参数是Runnable接口的实现,而Runnable接口是线程实现的接口,所有execute执行的是一个线程任务。

4.ThreadPoolExecutor

ThreadPoolExecutor是线程池实现的关键类,接下来我们分析下线程池的具体实现。

//ctl存储的是线程运行的状态和线程池内有效的线程数量。前3位代表运行状态,后29位
//代表线程数量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// 这里是线程池中的5种状态
// 运行状态
private static final int RUNNING    = -1 << COUNT_BITS;//111
// 线程池处于该状态时不再接受新任务,但可以继续处理已添加的任务
private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
// 线程池处于该状态时不再接受新任务,也不继续执行已添加任务
private static final int STOP       =  1 << COUNT_BITS;//001
// 线程池处于该状态时,所有任务已终止,ctl任务数量为0
private static final int TIDYING    =  2 << COUNT_BITS;//010
// 线程池彻底终止
private static final int TERMINATED =  3 << COUNT_BITS;//011// 得到运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 得到线程工作数量
private static int workerCountOf(int c)  { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

4.1线程池创建

  • 线程池ThreadPoolExecutor构造方法
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler)

接下来对这些参数做简单说明:

  • corePoolSize: 线程池中的核心线程数
  • maximumPoolSize: 线程池中允许的最大线程数
  • keepAliveTime: 当线程池中的线程数量大于corePoolSize的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了keepAliveTime
  • unit: keepAliveTime的单位
  • workQueue: 用来保存等待被执行的任务的阻塞队列,且任务必须实现Runable接口,在JDK中提供了如下阻塞队列:
    1、ArrayBlockingQueue:基于数组结构的有界阻塞队列,按FIFO排序任务;
    2、LinkedBlockingQuene:基于链表结构的阻塞队列,按FIFO排序任务,吞
    吐量通常要高于ArrayBlockingQuene;
    3、SynchronousQuene:一个不存储元素的阻塞队列,每个插入操作必须等到
    另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于
    LinkedBlockingQuene;
    4、priorityBlockingQuene:具有优先级的无界阻塞队列;

吞吐量:是指对网络、设备、端口、虚电路或其他设施,单位时间内成功地传送数据的数量(以比特、字节、分组等测量)

  • threadFactory: 创建线程的工程
  • handler: 线程池的饱和策略,当阻塞队列满了,且没有空闲的工作线程,如果继续提交任务,必须采取一种策略处理该任务,线程池提供了4种策略:
    1、AbortPolicy:直接抛出异常,默认策略;
    2、CallerRunsPolicy:用调用者所在的线程来执行任务;
    3、DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
    4、DiscardPolicy:直接丢弃任务;

4.2 线程池执行

线程池中通过两种方式提交线程任务

  1. public void execute() //提交任务无返回值
  2. public Future<?> submit() //任务执行完成后有返回值

4.3 execute()方法分析

线程池通过execute()方法提交任务,分为四大步:

  1. 如果线程数量小于核心线程数,那么创建一个线程并执行任务。
  2. 如果线程池数量大于核心线程数量,小于最大线程数量,那么将新提交的任务加入到工作队列中。
  3. 当工作队列中任务已经满了,线程数量小于最大线程数时,创建新的线程,去执行任务。
  4. 当线程数大于线程池中最大线程数,并且队列已满,则根据拒绝策略来处理任务,默认直接抛出异常。
public void execute(Runnable command) {if (command == null)throw new NullPointerException();int c = ctl.get();// 1.判断线程数量是否小于核心线程if (workerCountOf(c) < corePoolSize) {// 创建一个线程去执行任务if (addWorker(command, true))return;c = ctl.get();}// 2. 判断线程池是否运行,任务是否添加到队列中if (isRunning(c) && workQueue.offer(command)) {int recheck = ctl.get();if (! isRunning(recheck) && remove(command))reject(command);else if (workerCountOf(recheck) == 0)addWorker(null, false);}// 3. 创建新线程去执行任务else if (!addWorker(command, false))// 4. 拒绝策略reject(command);
}
  • 由上面分析可知,线程通过addWorker()方法去添加任务并执行,接下来分析该方法
private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();int rs = runStateOf(c);/*如果rs >= SHUTDOWN 表示不再接收新任务接下来三个条件只要有一个不满足则返回false1.线程池状态为SHUTDOWN2. 任务为空3. 任务队列不为空分析下这三种情况:3.1当rs==SHUTDOWN时,表示不再接收新任务,所以firstTask!=null时返回false3.2当firstTask==null,workQueue也为空时,返回false,因为队列中没有任务了,不需要再创建线程*/if (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// 得到工作线程数int wc = workerCountOf(c);// 判断是否大于最大容量或者核心线程数if (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// 增加线程数量 ctl.compareAndSet(expect, expect + 1)if (compareAndIncrementWorkerCount(c))break retry;// 如果增加workerCount失败,则重新获取ctl的值c = ctl.get();// 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个 for循环继续执行if (runStateOf(c) != rs)continue retry;}}// 线程启动的标记boolean workerStarted = false;// 任务添加到队列中的标记boolean workerAdded = false;Worker w = null;try {// 创建一个worker线程,每实例化一个worker都会创建一个线程w = new Worker(firstTask);final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int rs = runStateOf(ctl.get());/**  rs < SHUTDOWN代表运行状态,如果线程池是运行状态,或者线程池为SHUTDOWN*  并且firstTask == null,则添加线程到线程池中因为在SHUTDOWN时不会在添加*  新的任务,但还是会执行 workQueue中的任务*/if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 线程存活则抛出异常,因为这里线程还没有启动运行if (t.isAlive())throw new IllegalThreadStateException();// 添加线程到集合 workers是一个HashSet workers临界资源,上面加了锁workers.add(w);int s = workers.size();if (s > largestPoolSize)// 记录线程池中出现的最大线程数largestPoolSize = s;workerAdded = true;}} finally {mainLock.unlock();}//线程添加到HashSet集合后启动线程if (workerAdded) {t.start();workerStarted = true;}}} finally {if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}
  • 从上面方法知道,真正的线程创建通过Worker类来实现,接下来分析一下该类:
    Worker实现了Runnable接口,继承了AQS(有关AQS的可以参考我的这篇文章:传送门),所以一个Worker对象在启动的时候会调用Worker类的run方法。
Worker(Runnable firstTask) {// 设置状态为-1,AQS中state默认的值是0,在runWorker方法前禁止中断,因为刚创建的Worker对象// 还没有执行,就不应该被中断setState(-1);this.firstTask = firstTask;// 创建线程this.thread = getThreadFactory().newThread(this);
}
  • 分析下Worker中的run方法
public void run() {runWorker(this);
}
//该方法主要是运行任务
final void runWorker(ThreadPoolExecutor.Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock(); // 允许中断,将worker构造方法中的state状态值设置为0boolean completedAbruptly = true;try {//如果task为空则通过getTask()方法去获取任务(从队列中获取)while (task != null || (task = getTask()) != null) {w.lock();if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {//执行任务前的钩子函数beforeExecute(wt, task);Throwable thrown = null;try {// 执行任务的run方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// 任务执行后的方法afterExecute(task, thrown);}} finally {task = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}
}
  • 由上面分析知,队列中的任务通过getTask()方法去获取执行,这里再分析下该方法:
private Runnable getTask() {//表示上次从阻塞队列中取任务时是否超时boolean timedOut = false;for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果线程池停止,那么线程数量递减if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {/*根据timed来判断,如果为true,则通过阻塞队列的poll方法进行超时控制,如果在keepAliveTime时间内没有获取到任务,则返回null; 否则通过take方法,如果这时队列为空,则take方法会阻塞直到队列不为空*/Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

4.4submit()方法分析

submit()方法是AbstractExecutorService类的方法,submit()方法能得到任务执行的值,任务一般实现Callable接口,该接口和Runnable接口都有一个run()方法【线程实现的方式可以通过实现Callable接口,重写run方法】。线程池ThreadPoolExecutor继承了AbstractExecutorService这个类,所以也具有该方法,我们来看一下submit()方法在AbstractExecutorService类中的实现:

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();//newTaskFor()方法是将Runnable的任务转换成Callable的任务,得到任务的返回结果RunnableFuture<Void> ftask = newTaskFor(task, null);//这里已经介绍过execute(ftask);return ftask;
}
//这里看一下
newTaskFor方法的实现
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {return new FutureTask<T>(runnable, value);
}
//这里返回的是FutureTask,该类实现了RunnableFuture接口,如下图:
public FutureTask(Runnable runnable, V result) {this.callable = Executors.callable(runnable, result);this.state = NEW;       // ensure visibility of callable
}

这里就不对Future接口过多介绍,线程池的分析暂时分析这些,主要分析了线程池中执行的过程,其它的方法没有介绍到,希望能够一起讨论。