当前位置: 代码迷 >> 综合 >> ScheduledThreadPoolExecutor(定时任务线程池)简单理解
  详细解决方案

ScheduledThreadPoolExecutor(定时任务线程池)简单理解

热度:7   发布时间:2023-11-25 13:39:55.0

本文主要介绍了可定时线程池的核心原理,从宏观角度大概分析了线程池工作方式,如有不足,请指出,谢谢。

1.什么是线程池

线程池顾名思义是一个线程缓存的‘池子’。线程是稀缺资源,线程如果创建的太多,会消耗系统的资源,还会降低系统的稳定性,所以java中通过线程池来统一管理分配线程这个稀缺的资源,达到资源重复利用。

2.线程池的出现

在web系统中,服务器需要接受大量的并发请求,一个请求就会对应一个线程,如果并发的请求很多,但每个线程执行的时间很短,这样系统就会频繁的创建和销毁线程,系统的性能就会受到影响。那么是否存在一种方式,线程执行任务后并不销毁,而是重复利用?这就是线程池出现的目的。

3.ScheduledThreadPoolExecutor

  • 之前的一篇文章分析了线程池ThreadPoolExecutor的工作原理传送门,这篇文章主要介绍可定时任务的线程ScheduledThreadPoolExecutor,它也是线程池的一种,从下面类图可以看出,ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,ScheduledThreadPoolExecutor的大部分逻辑在ThreadPoolExecutor线程池中已经实现,如果不太清楚ThreadPoolExecutor,可以参考上一篇文章的讲解了,详细信息可查看。

在这里插入图片描述

4.主要提交任务的方法

ScheduledThreadPoolExecutor继承了ThreadPoolExecutor,所以具备线程池提交任务的一般方法[submit()与execute()],同时还具备提交定时任务的几个方法,如下所示:

//延后delay时长执行Runnable任务
public ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<?> t = decorateTask(command,new ScheduledFutureTask<Void>(command, null,triggerTime(delay, unit)));delayedExecute(t);return t;
}//延后delay时长执行Callable任务
public <V> ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit) {if (callable == null || unit == null)throw new NullPointerException();RunnableScheduledFuture<V> t = decorateTask(callable,new ScheduledFutureTask<V>(callable,triggerTime(delay, unit)));delayedExecute(t);return t;
}//延后执行Runnable 任务,以后每隔period的时长再次执行该任务
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (period <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}//延后执行Runnable 任务,以后任务执行完成后等待delay时长,再次执行任务
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit) {if (command == null || unit == null)throw new NullPointerException();if (delay <= 0)throw new IllegalArgumentException();ScheduledFutureTask<Void> sft =new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(-delay));RunnableScheduledFuture<Void> t = decorateTask(command, sft);sft.outerTask = t;delayedExecute(t);return t;
}
  • 从上面几个方法可以看出,都执行了delayedExecute(t)方法,先分析下该方法:
private void delayedExecute(RunnableScheduledFuture<?> task) {// 线程池关闭就拒绝任务if (isShutdown())reject(task);else {// 添加任务到延迟队列中,而ThreadPoolExecute线程池是线程// 数大于核心线程时才添加任务到阻塞队列,这里是第一次添加// 任务,因为是定时线程池,在延迟队列中的任务执行run方法时// 会将任务继续添加进延迟队列super.getQueue().add(task);if (isShutdown() &&!canRunInCurrentRunState(task.isPeriodic()) &&remove(task))task.cancel(false);else// 这里是增加一个worker线程,避免提交的任务没有worker去执行ensurePrestart();}
}
// 该方法是ThreadPoolExecute线程池里面的方法
void ensurePrestart() {int wc = workerCountOf(ctl.get());if (wc < corePoolSize)//这里就和ThreadPoolExecute线程池里的方法一样,创建一个没有任务的线程addWorker(null, true);else if (wc == 0)addWorker(null, false);
}

5.ScheduledFutureTask

  • ScheduledThreadPoolExecutor线程池中提交的任务都会被封装成ScheduledFutureTask类型的任务,ScheduledFutureTask是ScheduledThreadPoolExecutor中的一个内部类,实现了Runnable接口,也就是说,ScheduledFutureTask是一个线程任务,接下来我们简单分析下ScheduledFutureTask。
  • 构造方法如下
ScheduledFutureTask(Runnable r, V result, long ns) {super(r, result);this.time = ns;//任务开始的时间this.period = 0;//任务执行的时间间隔this.sequenceNumber = sequencer.getAndIncrement();//任务的序号
}
  • 线程池中,需要用阻塞队列保存等待被执行的任务,ScheduledThreadPoolExecutor线程池中通过DelayQueue阻塞队列来存储等待的任务。DelayQueue接收SchduledFutureTask类型的任务。DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体算法如下:
public int compareTo(Delayed other) {if (other == this) // compare zero if same objectreturn 0;if (other instanceof ScheduledFutureTask) {ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;long diff = time - x.time;if (diff < 0)return -1;else if (diff > 0)return 1;else if (sequenceNumber < x.sequenceNumber)return -1;elsereturn 1;}long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
  1. DelayQueue内部封装了一个PriorityQueue,它会根据time的先后时间排序,若
    time相同则根据sequenceNumber排序;
  2. DelayQueue也是一个无界队列,底层数据结构为最小堆,用数组存储,初始容量16;
  • 由上面分析可知ScheduledFutureTask任务是线程池执行的具体任务,我们来看一下它的run方法:
public void run() {// 是否周期性执行boolean periodic = isPeriodic();if (!canRunInCurrentRunState(periodic))cancel(false);else if (!periodic)//如果不需要周期性执行,则直接执行run方法然后结束ScheduledFutureTask.super.run();//ScheduledFutureTask.super.runAndReset()就是执行自己提交的任务的run方法else if (ScheduledFutureTask.super.runAndReset()) {// 计算下次执行任务的时间setNextRunTime();// 重复执行任务reExecutePeriodic(outerTask);}
}protected boolean runAndReset() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return false;boolean ran = false;int s = state;try {Callable<V> c = callable;if (c != null && s == NEW) {try {c.call(); // 这里设置执行自己任务的方法ran = true;} catch (Throwable ex) {setException(ex);}}} finally {runner = null;s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}return ran && s == NEW;
}void reExecutePeriodic(RunnableScheduledFuture<?> task) {if (canRunInCurrentRunState(true)) {// 这里是添加任务到队列super.getQueue().add(task);if (!canRunInCurrentRunState(true) && remove(task))task.cancel(false);else     // 该方法是ThreadPoolExecute线程池里面的方法ensurePrestart();}
}