当前位置: 代码迷 >> 综合 >> Java高并发编程中Executor、ExecutorService、ThreadPoolExecutor的使用及详细介绍-刘宇
  详细解决方案

Java高并发编程中Executor、ExecutorService、ThreadPoolExecutor的使用及详细介绍-刘宇

热度:14   发布时间:2024-02-10 10:00:01.0

Java高并发编程中Executor、ExecutorService、ThreadPoolExecutor的使用及详细介绍-刘宇

  • 一、什么是Executor框架
  • 二、Executor框架示意图
  • 三、ExecutorService的方法详解
    • 1、submit方法
    • 2、invokeAny方法
    • 3、invokeAll方法
    • 4、awaitTermination方法
    • 5、isShutdown方法
    • 6、isShutdown方法
    • 7、shutdown方法
    • 8、shutdownNow方法
  • 四、ThreadPoolExecutor的方法详解
    • 1、4种构造方法
    • 2、构造方法中参数详解
      • 2.1、corePoolSize
      • 2.2、maximumPoolSize
      • 2.3、keepAliveTime
      • 2.4、unit
      • 2.5、workQueue
      • 2.6、threadFactory
      • 2.6、handler
    • 3、其他基本方法
      • 3.1 execute方法
      • 3.2 getCorePoolSize方法
      • 3.3 getMaximumPoolSize方法
      • 3.4 getQueue方法
      • 3.5 getPoolSize方法
      • 3.6 getActiveCount方法
  • 五、ThreadPoolExecutor练习
    • 1、简单练习
    • 2、线程池关闭练习1
    • 3、线程池关闭练习2

作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。

一、什么是Executor框架

Executor是从Java5中才开始有的,在java.util.cocurrent包下,是非常强大的异步执行框架,标准的将任务提交过程和执行过程进行解耦。内部使用了线程池机制,可以通过该框架来控制线程的启动、执行、关闭等,大大提高了并发开发的效率。

二、Executor框架示意图

在这里插入图片描述

  • Executor:是一个基本接口,提供方法execute(Runnable command),该方法接收一个Runable实例,将提交任务的过程与执行任务的过程进行了解耦。
  • ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
  • AbstractExecutorService:ExecutorService执行方法的默认实现
  • ScheduledExecutorService:一个可定时调度任务的接口
  • ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象:
  • ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池

三、ExecutorService的方法详解

1、submit方法

//提交一个Runnable的任务,该任务没有返回结果,虽然返回一个Future对象,但是Future的get是null。
Future<?> submit(Runnable task)
//提交一个Runnable的任务,该任务可以有返回值,通过传入的result对象返回结果。
<T> Future<T> submit(Runnable task, T result)
//提交一个Callable的任务,该任务可以有返回值,因为Callable与Runnable不同,Callable自身就可以返回结果
<T> Future<T> submit(Callable<T> task)

2、invokeAny方法

批量提交任务并获得一个已经成功执行的任务的结果,如果一个任务已经完成,剩余的Callable任务将被取消。

  • tasks:Callable类型的集合
  • timeout:超时时间
  • unit:时间单位

异常处理:

  • 如果Callable集合中只有部分Callable异常,则即使是将该异常抛出,在其调用的地方也是无法捕获异常的,因为该Callable异常了,则会调用Callable集合中的下一个Callable。
  • 如果Callable集合中的Callable全部异常,则可以在其调用的地方捕获异常的。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;

3、invokeAll方法

批量提交任务并获得他们的future,Task列表与Future列表一一对应

  • tasks:Callable类型的集合
  • timeout:超时时间
  • unit:时间单位

异常处理

  • 当线程池中线程发送发生异常时,直接在抛出,可以在其调用的方法捕获异常,但是只有在调用Future中的get才能捕获异常,否则则一样捕获不到。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;

4、awaitTermination方法

是阻塞方法。等待一定时间后如果有任务未结束则强行关闭

boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;

5、isShutdown方法

判断该ExcutorService是否关闭

boolean isShutdown();

6、isShutdown方法

立即关闭ExcutorService,如果有没有执行的Task则返回

boolean isShutdown();

7、shutdown方法

是非阻塞方法。不再接受新的任务,等待所有任务执行完毕后关闭ExcutorService

void shutdown();

8、shutdownNow方法

  • 它会对正在执行的线程进行interrupt,也有可能存在interrupt不成功的现象,如该线程中没有sleep、wait、join等即没有抛出中断异常的方法,则可能会出现中断失败
  • 将空闲线程取消
  • 会返回未执行的任务列表
List<Runnable> shutdownNow()

四、ThreadPoolExecutor的方法详解

1、4种构造方法

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

2、构造方法中参数详解

2.1、corePoolSize

线程池的基本大小,线程池最小就有这么多线程

2.2、maximumPoolSize

线程池中最大的线程数,当workQueue队列中的任务放不下的时候就会创建新线程,直至达到maximumPoolSize的值。无界的任务队列这个参数就没用了,newCachedThreadPool使用的阻塞队列就是一个无界的任务队列

2.3、keepAliveTime

线程最大空闲时间,如果在空闲时间内,任务队列没有饱和的话就会销毁除基本线程之外的线程。

2.4、unit

keepAliveTime的时间单位。可选的单位有Days、HOURS、MINUTES、MILLISECONDS、MICROSECONDS、NANOSECONDS。

2.5、workQueue

用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:

  • ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
  • LinkedBlockingQueue:一个基于链表结构的阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法Excutors.newFixedThreadPool()使用了这个队列
  • SynchronousQueue: 一个不存储元素的阻塞队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。如果线程获取的元素时该队列不存在任何元素,则该线程会被阻塞,直至有元素插入。吞吐量高于LinkedBlockingQueue,静态工厂方法Excutors.newCachedThreadPool()使用了这个队列
  • PriorityBlockingQueue:一个具有优先级的无限阻塞队列。

2.6、threadFactory

线程创建工厂,通常可以用来给线程命名、查看创建线程数、给线程设置是否是后台运行、设置线程优先级等等

2.6、handler

拒绝策略。当队列中的元素放满并且线程池中的线程达到最大数量时,此时线程池处于饱和状态。此时就需要做出相应的策略应对,有如下四个选项:
AbortPolicy:默认策略,抛出异常
CallerRunsPolicy:使用调用者所在线程来运行该任务,不抛出异常
DiscardOldestPolicy:丢弃队列里最近的一个任务,然后再添加到队列中,不抛出异常
DiscardPolicy:直接忽略提交的任务

3、其他基本方法

3.1 execute方法

非阻塞方法。提交Runnable任务,没有返回值。

void execute(Runnable command)

3.2 getCorePoolSize方法

获取当前线程池基本线程的大小

int getCorePoolSize()

3.3 getMaximumPoolSize方法

获取当前线程池允许的最大线程数

int getMaximumPoolSize()

3.4 getQueue方法

获取任务队列

BlockingQueue<Runnable> getQueue()

3.5 getPoolSize方法

获取线程池当前的线程数

int getPoolSize()

3.6 getActiveCount方法

获取线程池当前处于活跃状态的线程数

int getActiveCount()

五、ThreadPoolExecutor练习

1、简单练习

package com.brycen.concurrency03.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExecutorExample {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(1), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());threadPoolExecutor.submit(() -> sleepSeconds(10));threadPoolExecutor.submit(() -> sleepSeconds(10));threadPoolExecutor.submit(() -> sleepSeconds(10));int corePoolSize = -1;int maxPoolSize = -1;int activeCount = -1;int poolSize = -1;int queueSize = -1;while (true) {int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();int currentMaxPoolSize = threadPoolExecutor.getMaximumPoolSize();int currentActiveCount = threadPoolExecutor.getActiveCount();int currentPoolSize = threadPoolExecutor.getPoolSize();int currentQueueSize = threadPoolExecutor.getQueue().size();if (corePoolSize != currentCorePoolSize || maxPoolSize != currentMaxPoolSize|| activeCount != currentActiveCount || queueSize != currentQueueSize || poolSize != currentPoolSize) {System.out.println("CorePoolSize: " + currentCorePoolSize);System.out.println("MaximumPoolSize: " + currentMaxPoolSize);System.out.println("ActiveCount: " + currentActiveCount);System.out.println("PoolSize: " + currentPoolSize);System.out.println("BlockingQueueSize: " + currentQueueSize);corePoolSize = currentCorePoolSize;maxPoolSize = currentMaxPoolSize;activeCount = currentActiveCount;queueSize = currentQueueSize;poolSize = currentPoolSize;System.out.println("==============================================");TimeUnit.MILLISECONDS.sleep(100);}}}private static void sleepSeconds(int seconds) {try {System.out.println("** " + Thread.currentThread().getName() + " **");TimeUnit.SECONDS.sleep(seconds);} catch (InterruptedException e) {e.printStackTrace();}}}

运行结果:

** Thread-0 **
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 2
PoolSize: 2
BlockingQueueSize: 1
==============================================
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 1
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 1
BlockingQueueSize: 0
==============================================

2、线程池关闭练习1

背景介绍:当我们的任务全部并行执行完关闭线程池后,如何串行执行我们后续代码。使用shutdown显然是不行的,因为他不是阻塞的,并行任务还没执行完我们的串行代码就被执行了。这时我们可以使用awaitTermination结合shutdown的方法,因为awaitTermination是阻塞的。

package com.brycen.concurrency03.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample2 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());//提交20个任务IntStream.rangeClosed(1, 20).forEach(i->{threadPoolExecutor.submit(() -> task(5,String.valueOf(i)));});threadPoolExecutor.shutdown();//阻塞,等待线程池关闭threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);//下面可以进行串行执行代码System.out.println("========all work over========");}private static void task(int seconds,String no) {try {System.out.println(Thread.currentThread().getName()+" ["+no+"] start work");TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}

运行结果:

Thread-1 [2] start work
Thread-2 [3] start work
Thread-0 [1] start work
Thread-3 [4] start work
Thread-4 [5] start work
Thread-5 [6] start work
Thread-6 [7] start work
Thread-7 [8] start work
Thread-8 [9] start work
Thread-9 [10] start work
Thread-2 [11] start work
Thread-1 [13] start work
Thread-0 [12] start work
Thread-3 [14] start work
Thread-5 [16] start work
Thread-8 [18] start work
Thread-4 [15] start work
Thread-6 [17] start work
Thread-9 [20] start work
Thread-7 [19] start work
========all work over========

3、线程池关闭练习2

背景介绍:我们知道shutdownNow方法会立即关闭线程池,但是也会存在关闭不了的情况,因为shutdownNow中其实是对正在执行任务的线程进行interrupt打断,如果该任务中没有抛出interrupt异常的方法,则该线程就不会被打断,也就结束不了,这样的场景如:网络请求一个数据,这个数据非常庞大,耗时非常久,那么interrupt就不会使该线程中断,也就不会立即结束。

错误演示

  • 虽然最后会显示未执行的任务数,但是线程池并未关闭。
package com.brycen.concurrency03.threadpool;import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample3 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());// 提交20个任务IntStream.rangeClosed(1, 20).forEach(i -> {threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));});List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();// 下面可以进行串行执行代码System.out.println("未执行的任务数:" + noRunTask.size());}private static void task(int seconds, String no) {System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");// 模拟网络请求while (true) {}}
}

正确演示

  • 解决方案就是利用ThreadFactory,将线程池创建的线程都设置为守护线程,这样当主线程挂掉之后,这样线程也就会跟着结束了
package com.brycen.concurrency03.threadpool;import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample3 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {Thread t = new Thread(r);//设置为守护线程t.setDaemon(true);return t;}, new ThreadPoolExecutor.AbortPolicy());// 提交20个任务IntStream.rangeClosed(1, 20).forEach(i -> {threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));});List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();// 下面可以进行串行执行代码System.out.println("未执行的任务数:" + noRunTask.size());}private static void task(int seconds, String no) {System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");// 模拟网络请求while (true) {}}
}

运行结果:

  • 成功关闭线程池,结束程序
Thread-0 [1] start work
Thread-2 [3] start work
Thread-4 [5] start work
Thread-1 [2] start work
Thread-3 [4] start work
Thread-5 [6] start work
Thread-6 [7] start work
Thread-7 [8] start work
Thread-8 [9] start work
Thread-9 [10] start work
未执行的任务数:10
  相关解决方案