Java高并发编程中Executor、ExecutorService、ThreadPoolExecutor的使用及详细介绍-刘宇
- 一、什么是Executor框架
- 二、Executor框架示意图
- 三、ExecutorService的方法详解
- 1、submit方法
- 2、invokeAny方法
- 3、invokeAll方法
- 4、awaitTermination方法
- 5、isShutdown方法
- 6、isShutdown方法
- 7、shutdown方法
- 8、shutdownNow方法
- 四、ThreadPoolExecutor的方法详解
- 1、4种构造方法
- 2、构造方法中参数详解
- 2.1、corePoolSize
- 2.2、maximumPoolSize
- 2.3、keepAliveTime
- 2.4、unit
- 2.5、workQueue
- 2.6、threadFactory
- 2.6、handler
- 3、其他基本方法
- 3.1 execute方法
- 3.2 getCorePoolSize方法
- 3.3 getMaximumPoolSize方法
- 3.4 getQueue方法
- 3.5 getPoolSize方法
- 3.6 getActiveCount方法
- 五、ThreadPoolExecutor练习
- 1、简单练习
- 2、线程池关闭练习1
- 3、线程池关闭练习2
作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。
一、什么是Executor框架
Executor是从Java5中才开始有的,在java.util.cocurrent包下,是非常强大的异步执行框架,标准的将任务提交过程和执行过程进行解耦。内部使用了线程池机制,可以通过该框架来控制线程的启动、执行、关闭等,大大提高了并发开发的效率。
二、Executor框架示意图
- Executor:是一个基本接口,提供方法execute(Runnable command),该方法接收一个Runable实例,将提交任务的过程与执行任务的过程进行了解耦。
- ExecutorService:是一个比Executor使用更广泛的子类接口,其提供了生命周期管理的方法,以及可跟踪一个或多个异步任务执行状况返回Future的方法
- AbstractExecutorService:ExecutorService执行方法的默认实现
- ScheduledExecutorService:一个可定时调度任务的接口
- ThreadPoolExecutor:线程池,可以通过调用Executors以下静态工厂方法来创建线程池并返回一个ExecutorService对象:
- ScheduledThreadPoolExecutor:ScheduledExecutorService的实现,一个可定时调度任务的线程池
三、ExecutorService的方法详解
1、submit方法
//提交一个Runnable的任务,该任务没有返回结果,虽然返回一个Future对象,但是Future的get是null。
Future<?> submit(Runnable task)
//提交一个Runnable的任务,该任务可以有返回值,通过传入的result对象返回结果。
<T> Future<T> submit(Runnable task, T result)
//提交一个Callable的任务,该任务可以有返回值,因为Callable与Runnable不同,Callable自身就可以返回结果
<T> Future<T> submit(Callable<T> task)
2、invokeAny方法
批量提交任务并获得一个已经成功执行的任务的结果,如果一个任务已经完成,剩余的Callable任务将被取消。
- tasks:Callable类型的集合
- timeout:超时时间
- unit:时间单位
异常处理:
- 如果Callable集合中只有部分Callable异常,则即使是将该异常抛出,在其调用的地方也是无法捕获异常的,因为该Callable异常了,则会调用Callable集合中的下一个Callable。
- 如果Callable集合中的Callable全部异常,则可以在其调用的地方捕获异常的。
<T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
3、invokeAll方法
批量提交任务并获得他们的future,Task列表与Future列表一一对应
- tasks:Callable类型的集合
- timeout:超时时间
- unit:时间单位
异常处理
- 当线程池中线程发送发生异常时,直接在抛出,可以在其调用的方法捕获异常,但是只有在调用Future中的get才能捕获异常,否则则一样捕获不到。
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException;
4、awaitTermination方法
是阻塞方法。等待一定时间后如果有任务未结束则强行关闭
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
5、isShutdown方法
判断该ExcutorService是否关闭
boolean isShutdown();
6、isShutdown方法
立即关闭ExcutorService,如果有没有执行的Task则返回
boolean isShutdown();
7、shutdown方法
是非阻塞方法。不再接受新的任务,等待所有任务执行完毕后关闭ExcutorService
void shutdown();
8、shutdownNow方法
- 它会对正在执行的线程进行interrupt,也有可能存在interrupt不成功的现象,如该线程中没有sleep、wait、join等即没有抛出中断异常的方法,则可能会出现中断失败
- 将空闲线程取消
- 会返回未执行的任务列表
List<Runnable> shutdownNow()
四、ThreadPoolExecutor的方法详解
1、4种构造方法
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
2、构造方法中参数详解
2.1、corePoolSize
线程池的基本大小,线程池最小就有这么多线程
2.2、maximumPoolSize
线程池中最大的线程数,当workQueue队列中的任务放不下的时候就会创建新线程,直至达到maximumPoolSize的值。无界的任务队列这个参数就没用了,newCachedThreadPool使用的阻塞队列就是一个无界的任务队列
2.3、keepAliveTime
线程最大空闲时间,如果在空闲时间内,任务队列没有饱和的话就会销毁除基本线程之外的线程。
2.4、unit
keepAliveTime的时间单位。可选的单位有Days、HOURS、MINUTES、MILLISECONDS、MICROSECONDS、NANOSECONDS。
2.5、workQueue
用于保存等待执行的任务的阻塞队列。可以选择以下几个阻塞队列:
- ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,按FIFO原则进行排序
- LinkedBlockingQueue:一个基于链表结构的阻塞队列,吞吐量高于ArrayBlockingQueue。静态工厂方法Excutors.newFixedThreadPool()使用了这个队列
- SynchronousQueue: 一个不存储元素的阻塞队列。插入元素到队列的线程被阻塞,直到另一个线程从队列中获取了队列中存储的元素。如果线程获取的元素时该队列不存在任何元素,则该线程会被阻塞,直至有元素插入。吞吐量高于LinkedBlockingQueue,静态工厂方法Excutors.newCachedThreadPool()使用了这个队列
- PriorityBlockingQueue:一个具有优先级的无限阻塞队列。
2.6、threadFactory
线程创建工厂,通常可以用来给线程命名、查看创建线程数、给线程设置是否是后台运行、设置线程优先级等等
2.6、handler
拒绝策略。当队列中的元素放满并且线程池中的线程达到最大数量时,此时线程池处于饱和状态。此时就需要做出相应的策略应对,有如下四个选项:
AbortPolicy:默认策略,抛出异常
CallerRunsPolicy:使用调用者所在线程来运行该任务,不抛出异常
DiscardOldestPolicy:丢弃队列里最近的一个任务,然后再添加到队列中,不抛出异常
DiscardPolicy:直接忽略提交的任务
3、其他基本方法
3.1 execute方法
非阻塞方法。提交Runnable任务,没有返回值。
void execute(Runnable command)
3.2 getCorePoolSize方法
获取当前线程池基本线程的大小
int getCorePoolSize()
3.3 getMaximumPoolSize方法
获取当前线程池允许的最大线程数
int getMaximumPoolSize()
3.4 getQueue方法
获取任务队列
BlockingQueue<Runnable> getQueue()
3.5 getPoolSize方法
获取线程池当前的线程数
int getPoolSize()
3.6 getActiveCount方法
获取线程池当前处于活跃状态的线程数
int getActiveCount()
五、ThreadPoolExecutor练习
1、简单练习
package com.brycen.concurrency03.threadpool;import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class ThreadPoolExecutorExample {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1, 2, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(1), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());threadPoolExecutor.submit(() -> sleepSeconds(10));threadPoolExecutor.submit(() -> sleepSeconds(10));threadPoolExecutor.submit(() -> sleepSeconds(10));int corePoolSize = -1;int maxPoolSize = -1;int activeCount = -1;int poolSize = -1;int queueSize = -1;while (true) {int currentCorePoolSize = threadPoolExecutor.getCorePoolSize();int currentMaxPoolSize = threadPoolExecutor.getMaximumPoolSize();int currentActiveCount = threadPoolExecutor.getActiveCount();int currentPoolSize = threadPoolExecutor.getPoolSize();int currentQueueSize = threadPoolExecutor.getQueue().size();if (corePoolSize != currentCorePoolSize || maxPoolSize != currentMaxPoolSize|| activeCount != currentActiveCount || queueSize != currentQueueSize || poolSize != currentPoolSize) {System.out.println("CorePoolSize: " + currentCorePoolSize);System.out.println("MaximumPoolSize: " + currentMaxPoolSize);System.out.println("ActiveCount: " + currentActiveCount);System.out.println("PoolSize: " + currentPoolSize);System.out.println("BlockingQueueSize: " + currentQueueSize);corePoolSize = currentCorePoolSize;maxPoolSize = currentMaxPoolSize;activeCount = currentActiveCount;queueSize = currentQueueSize;poolSize = currentPoolSize;System.out.println("==============================================");TimeUnit.MILLISECONDS.sleep(100);}}}private static void sleepSeconds(int seconds) {try {System.out.println("** " + Thread.currentThread().getName() + " **");TimeUnit.SECONDS.sleep(seconds);} catch (InterruptedException e) {e.printStackTrace();}}}
运行结果:
** Thread-0 **
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 2
PoolSize: 2
BlockingQueueSize: 1
==============================================
** Thread-1 **
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 1
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 2
BlockingQueueSize: 0
==============================================
CorePoolSize: 1
MaximumPoolSize: 2
ActiveCount: 0
PoolSize: 1
BlockingQueueSize: 0
==============================================
2、线程池关闭练习1
背景介绍:当我们的任务全部并行执行完关闭线程池后,如何串行执行我们后续代码。使用shutdown显然是不行的,因为他不是阻塞的,并行任务还没执行完我们的串行代码就被执行了。这时我们可以使用awaitTermination结合shutdown的方法,因为awaitTermination是阻塞的。
package com.brycen.concurrency03.threadpool;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample2 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());//提交20个任务IntStream.rangeClosed(1, 20).forEach(i->{threadPoolExecutor.submit(() -> task(5,String.valueOf(i)));});threadPoolExecutor.shutdown();//阻塞,等待线程池关闭threadPoolExecutor.awaitTermination(1, TimeUnit.HOURS);//下面可以进行串行执行代码System.out.println("========all work over========");}private static void task(int seconds,String no) {try {System.out.println(Thread.currentThread().getName()+" ["+no+"] start work");TimeUnit.SECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}}
运行结果:
Thread-1 [2] start work
Thread-2 [3] start work
Thread-0 [1] start work
Thread-3 [4] start work
Thread-4 [5] start work
Thread-5 [6] start work
Thread-6 [7] start work
Thread-7 [8] start work
Thread-8 [9] start work
Thread-9 [10] start work
Thread-2 [11] start work
Thread-1 [13] start work
Thread-0 [12] start work
Thread-3 [14] start work
Thread-5 [16] start work
Thread-8 [18] start work
Thread-4 [15] start work
Thread-6 [17] start work
Thread-9 [20] start work
Thread-7 [19] start work
========all work over========
3、线程池关闭练习2
背景介绍:我们知道shutdownNow方法会立即关闭线程池,但是也会存在关闭不了的情况,因为shutdownNow中其实是对正在执行任务的线程进行interrupt打断,如果该任务中没有抛出interrupt异常的方法,则该线程就不会被打断,也就结束不了,这样的场景如:网络请求一个数据,这个数据非常庞大,耗时非常久,那么interrupt就不会使该线程中断,也就不会立即结束。
错误演示
- 虽然最后会显示未执行的任务数,但是线程池并未关闭。
package com.brycen.concurrency03.threadpool;import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample3 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {return new Thread(r);}, new ThreadPoolExecutor.AbortPolicy());// 提交20个任务IntStream.rangeClosed(1, 20).forEach(i -> {threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));});List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();// 下面可以进行串行执行代码System.out.println("未执行的任务数:" + noRunTask.size());}private static void task(int seconds, String no) {System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");// 模拟网络请求while (true) {}}
}
正确演示
- 解决方案就是利用ThreadFactory,将线程池创建的线程都设置为守护线程,这样当主线程挂掉之后,这样线程也就会跟着结束了
package com.brycen.concurrency03.threadpool;import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;public class ThreadPoolExecutorExample3 {public static void main(String[] args) throws InterruptedException {ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 20, 30, TimeUnit.SECONDS,new ArrayBlockingQueue(10), r -> {Thread t = new Thread(r);//设置为守护线程t.setDaemon(true);return t;}, new ThreadPoolExecutor.AbortPolicy());// 提交20个任务IntStream.rangeClosed(1, 20).forEach(i -> {threadPoolExecutor.submit(() -> task(5, String.valueOf(i)));});List<Runnable> noRunTask = threadPoolExecutor.shutdownNow();// 下面可以进行串行执行代码System.out.println("未执行的任务数:" + noRunTask.size());}private static void task(int seconds, String no) {System.out.println(Thread.currentThread().getName() + " [" + no + "] start work");// 模拟网络请求while (true) {}}
}
运行结果:
- 成功关闭线程池,结束程序
Thread-0 [1] start work
Thread-2 [3] start work
Thread-4 [5] start work
Thread-1 [2] start work
Thread-3 [4] start work
Thread-5 [6] start work
Thread-6 [7] start work
Thread-7 [8] start work
Thread-8 [9] start work
Thread-9 [10] start work
未执行的任务数:10