当前位置: 代码迷 >> 综合 >> CONCURRENT—JDK工具篇下
  详细解决方案

CONCURRENT—JDK工具篇下

热度:97   发布时间:2023-10-09 23:35:18.0

CONCURRENT—JDK工具篇下

第十七章 通信工具类

类(java.util.concurrent) 作用
Semaphore 限制线程的数量
Exchanger 两个线程交换数据
CountDownLatch 线程等待直到计数器减为0时开始工作
CyclicBarrier 作用跟CountDownLatch类似,但是可以重复使用
Phaser 增强的CyclicBarrier

Semaphore

Semaphore这个工具类提供的功能就是多个线程彼此“打信号”。而这个“信号”是一个int类型的数据,也可以看成是一种“资源”。可以在构造函数中传入初始资源总数,以及是否使用“公平”的同步器。默认情况下,是非公平的。

public Semaphore(int permits) {
      // 默认情况下使用非公平sync = new NonfairSync(permits);
}public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

最主要的方法是acquire方法和release方法。acquire()方法会申请一个permit,而release方法会释放一个permit。当然,你也可以申请多个acquire(int permits)或者释放多个release(int permits)。

每次acquire,permits就会减少一个或者多个。如果减少到了0,再有其他线程来acquire,那就要阻塞这个线程直到有其它线程release permit为止。

Semaphore案例

Semaphore往往用于资源有限的场景中,去限制线程的数量

public class SemaphoreDemo {
    static class MyThread implements Runnable {
    private int value;private Semaphore semaphore;public MyThread(int value, Semaphore semaphore) {
    this.value = value;this.semaphore = semaphore;}@Overridepublic void run() {
    try {
    semaphore.acquire(); // 获取permitSystem.out.println(String.format("当前线程是%d, 还剩%d个资源,还有%d个线程在等待",value, semaphore.availablePermits(), semaphore.getQueueLength()));// 睡眠随机时间,打乱释放顺序Random random =new Random();Thread.sleep(random.nextInt(1000));System.out.println(String.format("线程%d释放了资源", value));} catch (InterruptedException e) {
    e.printStackTrace();} finally{
    semaphore.release(); // 释放permit}}}public static void main(String[] args) {
    Semaphore semaphore = new Semaphore(3);for (int i = 0; i < 10; i++) {
    new Thread(new MyThread(i, semaphore)).start();}}
}

Semaphore默认的acquire方法是会让线程进入等待队列,且会抛出中断异常。但它还有一些方法可以忽略中断或不进入阻塞队列:

// 忽略中断
public void acquireUninterruptibly()
public void acquireUninterruptibly(int permits)// 不进入等待队列,底层使用CAS
public boolean tryAcquire
public boolean tryAcquire(int permits)
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)throws InterruptedException
public boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore原理

Semaphore内部有一个继承了AQS的同步器Sync,重写了tryAcquireShared方法。在这个方法里,会去尝试获取资源。如果获取失败(想要的资源数量小于目前已有的资源数量),就会返回一个负数(代表尝试获取资源失败)。然后当前线程就会进入AQS的等待队列。

Exchanger

Exchanger类用于两个线程交换数据。它支持泛型,也就是说你可以在两个线程之间不断传送任何数据。

  • 此类提供对外的操作是同步的;
  • 用于成对出现的线程之间交换数据;
  • 可以视作双向的同步队列;
  • 可应用于基因算法、流水线设计等场景。
public class ExchangerDemo {
    public static void main(String[] args) throws InterruptedException {
    Exchanger<String> exchanger = new Exchanger<>();new Thread(() -> {
    try {
    System.out.println("这是线程A,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程A的数据"));} catch (InterruptedException e) {
    e.printStackTrace();}}).start();System.out.println("这个时候线程A是阻塞的,在等待线程B的数据");Thread.sleep(1000);new Thread(() -> {
    try {
    System.out.println("这是线程B,得到了另一个线程的数据:"+ exchanger.exchange("这是来自线程B的数据"));} catch (InterruptedException e) {
    e.printStackTrace();}}).start();}
}

输出:

这个时候线程A是阻塞的,在等待线程B的数据
这是线程B,得到了另一个线程的数据:这是来自线程A的数据
这是线程A,得到了另一个线程的数据:这是来自线程B的数据

源码中使用park/unpark来实现等待状态的切换,在使用park/unpark方法之前,进行CAS检查,提高性能。Exchanger类还有一个有超时参数的方法,如果在指定时间内没有另一个线程调用exchange,就会抛出一个超时异常。

public V exchange(V x, long timeout, TimeUnit unit)

CountDownLatch

CountDown代表计数递减,Latch是“门闩”的意思。假设某个线程在执行任务之前,需要等待其它线程完成一些前置任务,必须等所有的前置任务都完成,才能开始执行本线程的任务。

// 构造方法:
public CountDownLatch(int count)
public void await() // 等待
public boolean await(long timeout, TimeUnit unit) // 超时等待
public void countDown() // count - 1
public long getCount() // 获取当前还有多少count

CountDownLatch案例

public class CountDownLatchDemo {
    // 定义前置任务线程static class PreTaskThread implements Runnable {
    private String task;private CountDownLatch countDownLatch;public PreTaskThread(String task, CountDownLatch countDownLatch) {
    this.task = task;this.countDownLatch = countDownLatch;}@Overridepublic void run() {
    try {
    Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(task + " - 任务完成");countDownLatch.countDown();} catch (InterruptedException e) {
    e.printStackTrace();}}}public static void main(String[] args) {
    // 假设有三个模块需要加载CountDownLatch countDownLatch = new CountDownLatch(3);// 主任务new Thread(() -> {
    try {
    System.out.println("等待数据加载...");System.out.println(String.format("还有%d个前置任务", countDownLatch.getCount()));countDownLatch.await();System.out.println("数据加载完成,正式开始游戏!");} catch (InterruptedException e) {
    e.printStackTrace();}}).start();// 前置任务new Thread(new PreTaskThread("加载地图数据", countDownLatch)).start();new Thread(new PreTaskThread("加载人物模型", countDownLatch)).start();new Thread(new PreTaskThread("加载背景音乐", countDownLatch)).start();}
}

输出:

等待数据加载…
还有3个前置任务
加载人物模型 - 任务完成
加载背景音乐 - 任务完成
加载地图数据 - 任务完成
数据加载完成,正式开始游戏!

CountDownLatch原理

内部同样是一个继承了AQS的实现类Sync,需要注意的是构造器中的计数值(count)实际上就是闭锁需要等待的线程数量。这个值只能被设置一次,而且CountDownLatch没有提供任何机制去重新设置这个计数值

CyclicBarrier

CyclicBarrirer从名字上来理解是“循环的屏障”的意思。CountDownLatch一旦计数值count被降为0后,就不能再重新设置了,它只能起一次“屏障”的作用。而CyclicBarrier拥有CountDownLatch的所有功能,还可以使用reset()方法重置屏障。

CyclicBarrier Barrier被破坏

如果参与者(线程)在等待的过程中,Barrier被破坏,就会抛出BrokenBarrierException(且会传播到其他线程)。可以用isBroken()方法检测Barrier是否被破坏。

CyclicBarrier案例

如果玩一个游戏有多个“关卡”,那使用CountDownLatch显然不太合适,那需要为每个关卡都创建一个实例

public class CyclicBarrierDemo {
    static class PreTaskThread implements Runnable {
    private String task;private CyclicBarrier cyclicBarrier;public PreTaskThread(String task, CyclicBarrier cyclicBarrier) {
    this.task = task;this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {
    // 假设总共三个关卡for (int i = 1; i < 4; i++) {
    try {
    Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(String.format("关卡%d的任务%s完成", i, task));cyclicBarrier.await();} catch (InterruptedException | BrokenBarrierException e) {
    e.printStackTrace();}cyclicBarrier.reset(); // 重置屏障}}}public static void main(String[] args) {
    CyclicBarrier cyclicBarrier = new CyclicBarrier(3, () -> {
    System.out.println("本关卡所有前置任务完成,开始游戏...");});new Thread(new PreTaskThread("加载地图数据", cyclicBarrier)).start();new Thread(new PreTaskThread("加载人物模型", cyclicBarrier)).start();new Thread(new PreTaskThread("加载背景音乐", cyclicBarrier)).start();}
}

注意这里跟CountDownLatch的代码有一些不同。CyclicBarrier没有分为await()countDown(),而是只有单独的一个await()方法。一旦调用await()方法的线程数量等于构造方法中传入的任务总量(这里是3),就代表达到屏障了。CyclicBarrier允许我们在达到屏障的时候可以执行一个任务,可以在构造方法传入一个Runnable类型的对象。上述案例就是在达到屏障时,输出“本关卡所有前置任务完成,开始游戏…”。

// 构造方法
public CyclicBarrier(int parties) {
    this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
    // 具体实现
}

CyclicBarrier原理

CyclicBarrier虽说功能与CountDownLatch类似,但是实现原理却完全不同,CyclicBarrier内部使用的是Lock + Condition实现的等待/通知模式。详情可以查看这个方法的源码:

private int dowait(boolean timed, long nanos)

Phaser

这个是“移相器,相位器”的意思,CyclicBarrier可以发现它在构造方法里传入“任务总量”parties之后,便不能修改,并且每次调用await()方法也只能消耗一个parties计数。但Phaser可以动态地调整任务总量!

  • party:对应一个线程,数量可以通过register或者构造参数传入;
  • arrive:对应一个party的状态,初始时是unarrived,当调用arriveAndAwaitAdvance()或者 arriveAndDeregister()进入arrive状态,可以通过getUnarrivedParties()获取当前未到达的数量;
  • register:注册一个party,每一阶段必须所有注册的party都到达才能进入下一阶段;
  • deRegister:减少一个party。
  • phase:阶段,当所有注册的party都arrive之后,将会调用Phaser的onAdvance()方法来判断是否要进入下一阶段。

Phaser终止的两种途径,Phaser维护的线程执行完毕或者onAdvance()返回true 此外Phaser还能维护一个树状的层级关系,构造的时候new Phaser(parentPhaser),对于Task执行时间短的场景(竞争激烈),也就是说有大量的party, 那可以把每个Phaser的任务量设置较小,多个Phaser共同继承一个父Phaser。

Phaser案例

假设游戏有三个关卡,但只有第一个关卡有新手教程,需要加载新手教程模块。但后面的第二个关卡和第三个关卡都不需要。我们可以用Phaser来做这个需求:

public class PhaserDemo {
    static class PreTaskThread implements Runnable {
    private String task;private Phaser phaser;public PreTaskThread(String task, Phaser phaser) {
    this.task = task;this.phaser = phaser;}@Overridepublic void run() {
    for (int i = 1; i < 4; i++) {
    try {
    // 第二次关卡起不加载NPC,跳过if (i >= 2 && "加载新手教程".equals(task)) {
    continue;}Random random = new Random();Thread.sleep(random.nextInt(1000));System.out.println(String.format("关卡%d,需要加载%d个模块,当前模块【%s】",i, phaser.getRegisteredParties(), task));// 从第二个关卡起,不加载NPCif (i == 1 && "加载新手教程".equals(task)) {
    System.out.println("下次关卡移除加载【新手教程】模块");phaser.arriveAndDeregister(); // 移除一个模块} else {
    phaser.arriveAndAwaitAdvance();}} catch (InterruptedException e) {
    e.printStackTrace();}}}}public static void main(String[] args) {
    Phaser phaser = new Phaser(4) {
    @Overrideprotected boolean onAdvance(int phase, int registeredParties) {
    System.out.println(String.format("第%d次关卡准备完成", phase + 1));return phase == 3 || registeredParties == 0;}};new Thread(new PreTaskThread("加载地图数据", phaser)).start();new Thread(new PreTaskThread("加载人物模型", phaser)).start();new Thread(new PreTaskThread("加载背景音乐", phaser)).start();new Thread(new PreTaskThread("加载新手教程", phaser)).start();}
}

注意关卡1的输出,在“加载新手教程”线程中调用了arriveAndDeregister()减少一个party之后,后面的线程使用getRegisteredParties()得到的是已经被修改后的parties了。但是当前这个阶段(phase),仍然是需要4个parties都arrive才触发屏障的。从下一个阶段开始,才需要3个parties都arrive就触发屏障。

Phaser原理

Phaser类的原理相比起来要复杂得多。它内部使用了两个基于Fork-Join框架的原子类辅助:

private final AtomicReference<QNode> evenQ;
private final AtomicReference<QNode> oddQ;static final class QNode implements ForkJoinPool.ManagedBlocker {
    // 实现代码
}

第十八章 Fork/Join框架

什么是Fork/Join

Fork/Join框架是一个实现了ExecutorService接口的多线程处理器,它专为那些可以通过递归分解成更细小的任务而设计,最大化的利用多核处理器来提高应用程序的性能。Fork/Join框架在执行任务时使用了工作窃取算法

fork在英文里有分叉的意思,join在英文里连接、结合的意思。顾名思义,fork就是要使一个大任务分解成若干个小任务,而join就是最后将各个小任务的结果结合起来得到大任务的结果。

solve(任务):if(任务已经划分到足够小):顺序执行任务else:for(划分任务得到子任务)solve(子任务)结合所有子任务的结果到上一层循环return 最终结合的结果

递归嵌套的计算得到最终结果,这里有体现分而治之(divide and conquer) 的算法思想。

工作窃取算法

工作窃取算法指的是在多线程执行不同任务队列的过程中,某个线程执行完自己队列的任务后从其他线程的任务队列里窃取任务来执行。当一个线程窃取另一个线程的时候,为了减少两个任务线程之间的竞争,我们通常使用双端队列来存储任务。被窃取的任务线程都从双端队列的头部拿任务执行,而窃取其他任务的线程从双端队列的尾部执行任务。另外,当一个线程在窃取任务时要是没有其他可用的任务了,这个线程会进入阻塞状态以等待再次“工作”。

Fork/Join的具体实现

前面我们说Fork/Join框架简单来讲就是对任务的分割与子任务的合并,所以要实现这个框架,先得有任务。在Fork/Join框架里提供了抽象类ForkJoinTask来实现任务。

ForkJoinTask

ForkJoinTask是一个类似普通线程的实体,但是比普通线程轻量得多。

fork()方法:使用线程池中的空闲线程异步提交任务

// 本文所有代码都引自Java 8
public final ForkJoinTask<V> fork() {
    Thread t;// ForkJoinWorkerThread是执行ForkJoinTask的专有线程,由ForkJoinPool管理// 先判断当前线程是否是ForkJoin专有线程,如果是,则将任务push到当前线程所负责的队列里去if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)((ForkJoinWorkerThread)t).workQueue.push(this);else// 如果不是则将线程加入队列// 没有显式创建ForkJoinPool的时候走这里,提交任务到默认的common线程池中ForkJoinPool.common.externalPush(this);return this;
}

其实fork()只做了一件事,那就是把任务推入当前工作线程的工作队列里

join()方法:等待处理任务的线程处理完毕,获得返回值。

来看下join()的源码:

public final V join() {
    int s;// doJoin()方法来获取当前任务的执行状态if ((s = doJoin() & DONE_MASK) != NORMAL)// 任务异常,抛出异常reportException(s);// 任务正常完成,获取返回值return getRawResult();
}/*** doJoin()方法用来返回当前任务的执行状态**/
private int doJoin() {
    int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;// 先判断任务是否执行完毕,执行完毕直接返回结果(执行状态)return (s = status) < 0 ? s :// 如果没有执行完毕,先判断是否是ForkJoinWorkThread线程((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?// 如果是,先判断任务是否处于工作队列顶端(意味着下一个就执行它)// tryUnpush()方法判断任务是否处于当前工作队列顶端,是返回true// doExec()方法执行任务(w = (wt = (ForkJoinWorkerThread)t).workQueue).// 如果是处于顶端并且任务执行完毕,返回结果tryUnpush(this) && (s = doExec()) < 0 ? s :// 如果不在顶端或者在顶端却没未执行完毕,那就调用awitJoin()执行任务// awaitJoin():使用自旋使任务执行完成,返回结果wt.pool.awaitJoin(w, this, 0L) :// 如果不是ForkJoinWorkThread线程,执行externalAwaitDone()返回任务结果externalAwaitDone();
}

我们在之前介绍过说Thread.join()会使线程阻塞,而ForkJoinPool.join()会使线程免于阻塞

RecursiveAction和RecursiveTask

通常情况下,在创建任务的时候我们一般不直接继承ForkJoinTask,而是继承它的子类RecursiveActionRecursiveTask。两个都是ForkJoinTask的子类,RecursiveAction可以看做是无返回值的ForkJoinTask,RecursiveTask是有返回值的ForkJoinTask。此外,两个子类都有执行主要计算的方法compute(),当然,RecursiveAction的compute()返回void,RecursiveTask的compute()有具体的返回值。

ForkJoinPool

ForkJoinPool是用于执行ForkJoinTask任务的执行(线程)池。ForkJoinPool管理着执行池中的线程和任务队列,此外,执行池是否还接受任务,显示线程的运行状态也是在这里处理。

@sun.misc.Contended
public class ForkJoinPool extends AbstractExecutorService {
    // 任务队列volatile WorkQueue[] workQueues;   // 线程的运行状态volatile int runState;  // 创建ForkJoinWorkerThread的默认工厂,可以通过构造函数重写public static final ForkJoinWorkerThreadFactory defaultForkJoinWorkerThreadFactory;// 公用的线程池,其运行状态不受shutdown()和shutdownNow()的影响static final ForkJoinPool common;// 私有构造方法,没有任何安全检查和参数校验,由makeCommonPool直接调用// 其他构造方法都是源自于此方法// parallelism: 并行度,// 默认调用java.lang.Runtime.availableProcessors() 方法返回可用处理器的数量private ForkJoinPool(int parallelism,ForkJoinWorkerThreadFactory factory, // 工作线程工厂UncaughtExceptionHandler handler, // 拒绝任务的handlerint mode, // 同步模式String workerNamePrefix) {
     // 线程名prefixthis.workerNamePrefix = workerNamePrefix;this.factory = factory;this.ueh = handler;this.config = (parallelism & SMASK) | mode;long np = (long)(-parallelism); // offset ctl countsthis.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);}}

WorkQueue

双端队列,ForkJoinTask存放在这里。当工作线程在处理自己的工作队列时,会从队列首取任务来执行(FIFO);如果是窃取其他队列的任务时,窃取的任务位于所属任务队列的队尾(LIFO)。

ForkJoinPool与传统线程池最显著的区别就是它维护了一个工作队列数组(volatile WorkQueue[] workQueues,ForkJoinPool中的每个工作线程都维护着一个工作队列)。

runState

ForkJoinPool的运行状态。SHUTDOWN状态用负数表示,其他用2的幂次表示。

Fork/Join的使用

上面我们说ForkJoinPool负责管理线程和任务,ForkJoinTask实现fork和join操作,所以要使用Fork/Join框架就离不开这两个类了,只是在实际开发中我们常用ForkJoinTask的子类RecursiveTask 和RecursiveAction来替代ForkJoinTask。

下面我们用一个计算斐波那契数列第n项的例子来看一下Fork/Join的使用:

斐波那契数列数列是一个线性递推数列,从第三项开始,每一项的值都等于前两项之和:

1, 1, 2, 3, 5, 8, 13, 21, 34, 55, 89······

如果设f(n)为该数列的第n项(n∈N*),那么有:f(n) = f(n-1) + f(n-2)。

public class FibonacciTest {
    class Fibonacci extends RecursiveTask<Integer> {
    int n;public Fibonacci(int n) {
    this.n = n;}// 主要的实现逻辑都在compute()里@Overrideprotected Integer compute() {
    // 这里先假设 n >= 0if (n <= 1) {
    return n;} else {
    // f(n-1)Fibonacci f1 = new Fibonacci(n - 1);f1.fork();// f(n-2)Fibonacci f2 = new Fibonacci(n - 2);f2.fork();// f(n) = f(n-1) + f(n-2)return f1.join() + f2.join();}}}@Testpublic void testFib() throws ExecutionException, InterruptedException {
    ForkJoinPool forkJoinPool = new ForkJoinPool();System.out.println("CPU核数:" + Runtime.getRuntime().availableProcessors());long start = System.currentTimeMillis();Fibonacci fibonacci = new Fibonacci(40);Future<Integer> future = forkJoinPool.submit(fibonacci);System.out.println(future.get());long end = System.currentTimeMillis();System.out.println(String.format("耗时:%d millis", end - start));}}

Java 8 Stream的并行操作底层就是用到了Fork/Join框架

第十九章 Java 8 Stream并行计算原理

从Java 8 开始,我们可以使用Stream接口以及lambda表达式进行“流式计算”。Stream接口有非常多用于集合计算的方法,比如判空操作empty、过滤操作filter、求最max值、查找操作findFirst和findAny等等。

Stream单线程串行计算

Stream接口默认是使用串行的方式,也就是说在一个线程里(main)执行

public class StreamDemo {
    public static void main(String[] args) {
    Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).reduce((a, b) -> {
    System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}

整数1~9创建了一个Stream。这里的Stream.of(T… values)方法是Stream接口的一个静态方法,其底层调用的是Arrays.stream(T[] array)方法。然后使用了reduce方法来计算这个集合的累加和。reduce方法这里做的是:从前两个元素开始,进行某种操作(这里进行的是加法操作)后,返回一个结果,然后再拿这个结果跟第三个元素执行同样的操作,以此类推,直到最后的一个元素。

main: 1 + 2 = 3
main: 3 + 3 = 6
main: 6 + 4 = 10
main: 10 + 5 = 15
main: 15 + 6 = 21
main: 21 + 7 = 28
main: 28 + 8 = 36
main: 36 + 9 = 45
45

Stream多线程并行计算

public class StreamParallelDemo {
    public static void main(String[] args) {
    Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9).parallel().reduce((a, b) -> {
    System.out.println(String.format("%s: %d + %d = %d",Thread.currentThread().getName(), a, b, a + b));return a + b;}).ifPresent(System.out::println);}
}

一点区别,在reduce方法被调用之前,调用了parallel()方法。下面来看看这个方法的输出:

ForkJoinPool.commonPool-worker-1: 3 + 4 = 7
ForkJoinPool.commonPool-worker-4: 8 + 9 = 17
ForkJoinPool.commonPool-worker-2: 5 + 6 = 11
ForkJoinPool.commonPool-worker-3: 1 + 2 = 3
ForkJoinPool.commonPool-worker-4: 7 + 17 = 24
ForkJoinPool.commonPool-worker-4: 11 + 24 = 35
ForkJoinPool.commonPool-worker-3: 3 + 7 = 10
ForkJoinPool.commonPool-worker-3: 10 + 35 = 45
45

它使用的线程是ForkJoinPool里面的commonPool里面的worker线程。并且它们是并行计算的,并不是串行计算的。但由于Fork/Join框架的作用,它最终能很好的协调计算结果,使得计算结果完全正确。

需要注意的是,一个Java进程的Stream并行计算任务默认共享同一个线程池,如果随意的使用并行特性可能会导致方法的吞吐量下降。自定义的ForkJoin线程池:

ForkJoinPool customThreadPool = new ForkJoinPool(4);
long actualTotal = customThreadPool.submit(() -> roster.parallelStream().reduce(0, Integer::sum)).get();

第二十章 计划任务

JDK提供了ScheduledThreadPoolExecutor类用于计划任务(又称定时任务),这个类有两个用途:

  • 在给定的延迟之后运行任务
  • 周期性重复执行任务

在这之前,是使用Timer类来完成定时任务的,但是Timer有缺陷:

  • Timer是单线程模式;
  • 如果在执行任务期间某个TimerTask耗时较久,那么就会影响其它任务的调度;
  • Timer的任务调度是基于绝对时间的,对系统时间敏感;
  • Timer不会捕获执行TimerTask时所抛出的异常,由于Timer是单线程,所以一旦出现异常,则线程就会终止,其他任务也得不到执行。

使用案例

假设我有一个需求,指定时间给大家发送消息。那么我们会将消息(包含发送时间)存储在数据库中,然后想用一个定时任务,每隔1秒检查数据库在当前时间有没有需要发送的消息,那这个计划任务怎么写?下面是一个Demo:

public class ThreadPool {
    private static final ScheduledExecutorService executor = newScheduledThreadPoolExecutor(1, Executors.defaultThreadFactory());private static SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");public static void main(String[] args){
    // 新建一个固定延迟时间的计划任务executor.scheduleWithFixedDelay(new Runnable() {
    @Overridepublic void run() {
    if (haveMsgAtCurrentTime()) {
    System.out.println(df.format(new Date()));System.out.println("大家注意了,我要发消息了");}}}, 1, 1, TimeUnit.SECONDS);}public static boolean haveMsgAtCurrentTime(){
    //查询数据库,有没有当前时间需要发送的消息//这里省略实现,直接返回truereturn true;}
}

下面截取前面的输出(这个demo会一直运行下去):

2019-01-23 16:16:48
大家注意了,我要发消息了
2019-01-23 16:16:49
大家注意了,我要发消息了

类结构

public class ScheduledThreadPoolExecutor extends ThreadPoolExecutorimplements ScheduledExecutorService {
    public ScheduledThreadPoolExecutor(int corePoolSize,ThreadFactory threadFactory) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue(), threadFactory);}
}

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,实现了ScheduledExecutorService

public interface ScheduledExecutorService extends ExecutorService {
    public ScheduledFuture<?> schedule(Runnable command,long delay, TimeUnit unit);public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay, TimeUnit unit);public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
}

ScheduledExecutorService实现了ExecutorService ,并增加若干定时相关的接口。 前两个方法用于单次调度执行任务,区别是有没有返回值。重点理解一下后面两个方法:

  • scheduleAtFixedRate

    该方法在initialDelay时长后第一次执行任务,以后每隔period时长,再次执行任务。注意,period是从任务开始执行算起的。开始执行任务后,定时器每隔period时长检查该任务是否完成,如果完成则再次启动任务,否则等该任务结束后才再次启动任务。

  • scheduleWithFixDelay

    该方法在initialDelay时长后第一次执行任务,以后每当任务执行完成后,等待delay时长,再次执行任务。

主要方法介绍

schedule

// delay时长后执行任务command,该任务只执行一次
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    if (command == null || unit == null)throw new NullPointerException();// 这里的decorateTask方法仅仅返回第二个参数RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null, triggerTime(delay,unit)));// 延时或者周期执行任务的主要方法,稍后统一说明delayedExecute(t);return t;
}

scheduledAtFixedRate

// 注意,固定速率和固定时延,传入的参数都是Runnable,也就是说这种定时任务是没有返回值的
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {
    if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();// 创建一个有初始延时和固定周期的任务ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);// outerTask表示将会重新入队的任务sft.outerTask = t;// 稍后说明delayedExecute(t);return t;
}

scheduleAtFixedRate这个方法和schedule类似,不同点是scheduleAtFixedRate方法内部创建的是ScheduledFutureTask,带有初始延时和固定周期的任务 。

scheduledAtFixedDelay

FixedDelay也是通过ScheduledFutureTask体现的,唯一不同的地方在于创建的ScheduledFutureTask不同

delayedExecute

前面讲到的schedulescheduleAtFixedRatescheduleAtFixedDelay最后都调用了delayedExecute方法,该方法是定时任务执行的主要方法:

private void delayedExecute(RunnableScheduledFuture<?> task) {
    // 线程池已经关闭,调用拒绝执行处理器处理if (isShutdown())reject(task);else {
    // 将任务加入到等待队列super.getQueue().add(task);// 线程池已经关闭,且当前状态不能运行该任务,将该任务从等待队列移除并取消该任务if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 增加一个worker,就算corePoolSize=0也要增加一个workerensurePrestart();}
}

delayedExecute方法的逻辑也很简单,主要就是将任务添加到等待队列,然后调用ensurePrestart方法。

void ensurePrestart() {
    int wc = workerCountOf(ctl.get());if (wc < corePoolSize)addWorker(null, true);else if (wc == 0)addWorker(null, false);
}

ensurePrestart方法主要是调用了addWorker,线程池中的工作线程是通过该方法来启动并执行任务的。 对于ScheduledThreadPoolExecutorworker添加到线程池后会在等待队列上等待获取任务,这点是和ThreadPoolExecutor一致的

worker是怎么从等待队列取定时任务的?

ScheduledThreadPoolExecutor使用了DelayedWorkQueue保存等待的任务

DelayedWorkQueue

该等待队列队首应该保存的是最近将要执行的任务,所以worker只关心队首任务即可,如果队首任务的开始执行时间还未到,worker也应该继续等待。

DelayedWorkQueue是一个无界优先队列,使用数组存储,底层是使用堆结构来实现优先队列的功能。我们先看看DelayedWorkQueue的声明和成员变量:

static class DelayedWorkQueue extends AbstractQueue<Runnable>
implements BlockingQueue<Runnable> {
    // 队列初始容量private static final int INITIAL_CAPACITY = 16;// 数组用来存储定时任务,通过数组实现堆排序private RunnableScheduledFuture[] queue = new RunnableScheduledFuture[INITIAL_CAPACITY];// 当前在队首等待的线程private Thread leader = null;// 锁和监视器,用于leader线程private final ReentrantLock lock = new ReentrantLock();private final Condition available = lock.newCondition();// 其他代码,略
}

当一个线程成为leader,它只要等待队首任务的delay时间即可,其他线程会无条件等待。leader取到任务返回前要通知其他线程,直到有线程成为新的leader。每当队首的定时任务被其他更早需要执行的任务替换时,leader设置为null,其他等待的线程(被当前leader通知)和当前的leader重新竞争成为leader。同时,定义了锁lock和监视器available用于线程竞争成为leader。 当一个新的任务成为队首,或者需要有新的线程成为leader时,available监视器上的线程将会被通知,然后竞争成为leader线程。 有些类似于生产者-消费者模式。几个比较重要的方法有(简):take、offer