当前位置: 代码迷 >> 综合 >> ThreadPoolExecutor 线程池解析
  详细解决方案

ThreadPoolExecutor 线程池解析

热度:90   发布时间:2023-12-15 01:00:00.0

一。ThreadPoolExecuror类的构造方法中各个参数的含义:
public ThreadPoolExecutor (int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,BlockingQueue<Runnable> workQueue)

corePoolSize:线程池中所保存的核心线程数,包括空闲线程。

maximumPoolSize:池中允许的最大线程数。

keepAliveTime:线程池中的空闲线程所能持续的最长时间。

unit:持续时间的单位。

workQueue:任务执行前保存任务的队列,仅保存由execute方法提交的Runnable任务。


根据ThreadPoolExecutor源码前面大段的注释,我们可以看出,当试图通过excute方法将一个Runnable任务添加到线程池中时,按照如下顺序来处理:
    1、如果线程池中的线程数量少于corePoolSize,即使线程池中有空闲线程,也会创建一个新的线程来执行新添加的任务;
    2、如果线程池中的线程数量大于等于corePoolSize,但缓冲队列workQueue未满,则将新添加的任务放到workQueue中,按照FIFO的原则依次等待执行(线程池中有线程空闲出来后依次将缓冲队列中的任务交付给空闲的线程执行);

   3、如果线程池中的线程数量大于等于corePoolSize,且缓冲队列workQueue已满,但线程池中的线程数量小于maximumPoolSize,则会创建新的线程来处理被添加的任务;


  4、如果线程池中的线程数量等于了maximumPoolSize,有4种处理方式(该构造方法调用了含有5个参数的构造方法,并将最后一个构造方法为RejectedExecutionHandler类型,它在处理线程溢出时有4种方式,这里不再细说,要了解的,自己可以阅读下源码)。

    总结起来,也即是说,当有新的任务要处理时,先看线程池中的线程数量是否大于corePoolSize,再看缓冲队列workQueue是否满,最后看线程池中的线程数量是否大于maximumPoolSize。

    另外,当线程池中的线程数量大于corePoolSize时,如果里面有线程的空闲时间超过了keepAliveTime,就将其移除线程池,这样,可以动态地调整线程池中线程的数量。

 

二。多线程测试demo

下面的例子,核心线程数3个,总的线程数5个,总任务数10个,任务队列的长度5(new LinkedBlockingQueue<Runnable>(5))此时执行的是上面方式3的这种情况(红色字体)

通过下面的打印日志,可以看出workQueue已满,且线程总数小于5,所以这里又创建了2个非核心的线程,用来执行任务:

16:02:00,271  INFO TestThread:101 - 当前的线程是:pool-1-thread-4
16:02:00,271  INFO TestThread:101 - 当前的线程是:pool-1-thread-5

private final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3 ,5 ,100 ,TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(5));@Testpublic void testThreadPoolExecutor() throws ExecutionException, InterruptedException {logger.info("start testThreadPoolExecutor....");List<Future<String>> futureList = new ArrayList<>();for (Integer i = 0; i < 10; i++) {Future<String> future = poolExecutor.submit(new Callable<String>() {@Overridepublic String call() throws Exception{Thread.sleep(1000);return "当前的线程是:" + Thread.currentThread().getName();}});futureList.add(future);}logger.info("perform  testThreadPoolExecutor....");for (Future future : futureList) {try {logger.info(future.get().toString());} catch (InterruptedException | ExecutionException e) {e.printStackTrace();}}logger.info("end  testThreadPoolExecutor....");}

 

三。日志解析:

start testThreadPoolExecutor....
perform  testThreadPoolExecutor....

因为上面的这两条日志是顺序执行的,这里可以看出ThreadPoolExecutor 直接提交了多个任务(此时任务还没有开始执行),仅仅只是提交任务而已。

然后用Future来接收每个线程的返回值,此时还没有真正的返回值,这里只是标记把值赋给Future对象(这里用到了函数指针的概念,当这里有值返回的时候,就立即把值传给Future对象)。关于Future,大家可以看看我上面几篇文章。

任务真正执行是在这句代码:logger.info(future.get().toString());

这里的future.get()会阻塞式的等待之前提交任务的执行结果,没有结果返回的话,会一直等待下去,直到返回一个结果,才能执行下面的代码。