当前位置: 代码迷 >> 综合 >> @Async 异步任务自定义线程池的配置方法和 @Scheduled 定时任务自定义线程池的配置方式
  详细解决方案

@Async 异步任务自定义线程池的配置方法和 @Scheduled 定时任务自定义线程池的配置方式

热度:61   发布时间:2023-12-08 12:43:33.0

文章目录

        • 一、定时和异步业务场景描述
        • 二、定时调度任务的实现方式
        • 三、定时调度任务的问题描述
        • 四、定时调度多线程解决方案(方案一)
        • 五、异步多线程程序实现方式
        • 六、定时调度多线程解决方案(方案二)

一、定时和异步业务场景描述
  • 在项目的开发过程中,定时任务调度的场景是非常常见的项目功能,随着业务的不断迭代和日益复杂化,可能一个项目中还会出现定时任务调度和异步接口调度共同存在的情况。

  • 本案例主要讲述如何通过 @Scheduled 注解来完成定时任务的调度工作,以及非定时任务的接口异步任务的自定义线程池的配置工作。

二、定时调度任务的实现方式
  1. 在启动类添加 @EnableScheduling 注解,用以允许应用程序启动定时任务调度器

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* Spring Boot Scheduled Sample 演示启动类** @author Rambo* @date 2021/02/19 17:06* @since 1.0.0.1*/
    @SpringBootApplication
    @EnableScheduling
    public class SpringBootScheduleSampleApplication {
          public static void main(String[] args) {
          SpringApplication.run(SpringBootScheduleSampleApplication.class, args);}
    
  2. 编写定时任务

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* 定时任务组件类** @author Rambo* @date 2021/2/20 09:50* @since 1.0.0.1*/
    @Component
    @Slf4j
    public class ScheduledTasks {
          @Scheduled(cron = "0/1 * * * * ?")public void scheduledCron1() throws InterruptedException {
          // 模拟该任务响应时间较长,导致所有采用 @Scheduled 注解调度的定时任务都被阻塞TimeUnit.SECONDS.sleep(10);log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron1()],执行频率:1秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());}@Scheduled(cron = "0/2 * * * * ?")public void scheduledCron2() {
          log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron2()],执行频率:2秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());}@Scheduled(cron = "0/3 * * * * ?")public void scheduledCron3() {
          log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron3()],执行频率:3秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());}
    }
    

    P.S
    @Scheduled 注解所支持的参数

    参数属性 属性描述
    cron cron表达式,指定任务在特定时间执行
    fixedDelay 表示上一次任务执行完成后多久再次执行,参数类型为long,单位ms
    fixedDelayString 与fixedDelay含义一样,只是参数类型变为String
    fixedRate 表示按一定的频率执行任务,参数类型为long,单位ms
    fixedRateString 与fixedRate的含义一样,只是将参数类型变为String
    initialDelay 表示延迟多久再第一次执行任务,参数类型为long,单位ms
    initialDelayString 与initialDelay的含义一样,只是将参数类型变为String
    zone 时区,默认为当前时区,一般没有用到
  3. 查看启动效果
    1

三、定时调度任务的问题描述
  • 按照以上方式配置,不管在哪个类中,只要采用 @Scheduled 注解来调度任务方法,所有的任务方法都默认使用同一个线程池中的同一个线程来进行任务调度。

  • 所有采用 @Scheduled 注解的方法,可以交替执行(表面上),但是如果任何一个被调度的方法响应时间过长或者执行时间过长,将会导致所有被 @Scheduled 注解调度的任务呈阻塞状态。

  • 一旦出现阻塞状态,所谓的定时调度也就失去了原本预定的意义,需要特别注意。

四、定时调度多线程解决方案(方案一)
  1. 配置文件配置任务调度线程池的配置信息

    # 自定义调度任务线程池
    schedule-pool:# 核心线程池大小core-pool-size: 20# 自定义线程名称thread-name-prefix: SCHEDULE-BIZ-# 设置终止等待时间:秒await-termination-time: 60# 线程结束前,是否等待线程队列中的任务执行完成wait-tasks-complete: true# 线程拒绝策略(ABORT_POLICY、CALLER_RUNS_POLICY、DISCARD_OLDEST_POLICY、DISCARD_POLICY)rejected-policy: CALLER_RUNS_POLICY
    
  2. 创建配置类读取配置文件的配置信息

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* "@Scheduled" 注解定时调度器线程池配置类** @author Rambo* @date 2021/2/20 16:56* @since 1.0.0.1*/
    @Component
    @ConfigurationProperties(prefix = "schedule-pool")
    @Data
    public class SchedulePoolConfig {
          /** 核心线程池大小*/private int corePoolSize = 1;/** 自定义线程名称*/private String threadNamePrefix = "SCHEDULE";/** 设置终止等待时间:秒*/private int awaitTerminationTime = 30;/** 线程结束前,是否等待线程队列中的任务执行完成*/private boolean waitTasksComplete = true;/** 线程拒绝策略 RejectedPolicy.java*/private String rejectedPolicy = "ABORT_POLICY";
    }
    
  3. 根据自定义线程配置信息创建 ThreadPoolTaskScheduler 线程池

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* 线程拒绝策略常量类** @author Rambo* @date 2021/2/22 14:14* @since 1.0.0.1*/
    public interface RejectedPolicy {
          /** 默认的拒绝策略,会 throw RejectedExecutionException 拒绝*/String ABORT_POLICY = "ABORT_POLICY";/** 提交任务的主线程自己去执行该任务*/String CALLER_RUNS_POLICY = "CALLER_RUNS_POLICY";/** 丢弃最老的任务,然后把新任务加入到工作队列*/String DISCARD_OLDEST_POLICY = "DISCARD_OLDEST_POLICY";/** 直接丢弃任务,没有任何异常抛出*/String DISCARD_POLICY = "DISCARD_POLICY";
    }    
    
    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* 多线程并发调度任务配置类** 此配置弊端:线程池大小无界限、默认线程队列长度 16** @author Rambo* @date 2021/2/20 10:36* @since 1.0.0.1*/
    @Configuration
    @Slf4j
    public class ScheduleThreadPoolExecutor implements SchedulingConfigurer {
          @Resourceprivate SchedulePoolConfig poolConfig;@Overridepublic void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
          // 未自定义线程池,线程池没有界限,有出现 OOM 的风险// taskRegistrar.setScheduler(Executors.newScheduledThreadPool(5));// 自定义线程池taskRegistrar.setScheduler(taskScheduler());}/*** destroyMethod = shutdown 进程结束前,执行完成等待队列中的所有任务后退出应用程序** @author Rambo* @date 2021/2/22 14:26* @return org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler*/@Bean(destroyMethod = "shutdown")public ThreadPoolTaskScheduler taskScheduler() {
          // 1. 实例化任务调度线程ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();// 2. 设置线程池大小scheduler.setPoolSize(poolConfig.getCorePoolSize());// 3. 设置线程名称scheduler.setThreadNamePrefix(poolConfig.getThreadNamePrefix());// 4. 设置等待终止时间:秒scheduler.setAwaitTerminationSeconds(poolConfig.getAwaitTerminationTime());// 5. 进程结束前,等待线程队列中的任务执行完成scheduler.setWaitForTasksToCompleteOnShutdown(poolConfig.isWaitTasksComplete());// 6. 设置拒绝策略// setRejectedExecutionHandler:当线程池已经达到 max size 的时候,如何处理新任务// AbortPolicy 默认的拒绝策略,会 throw RejectedExecutionException 拒绝// CallerRunsPolicy 提交任务的主线程自己去执行该任务// DiscardOldestPolicy 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列// DiscardPolicy 相当大胆的策略,直接丢弃任务,没有任何异常抛出scheduler.setRejectedExecutionHandler(chooseRejectedPolicy(poolConfig.getRejectedPolicy()));// 7. 设置异常输出格式scheduler.setErrorHandler(throwable -> log.error("调度任务发生异常", throwable));log.info("------>== @Schedule 业务处理线程配置成功,核心线程池:[{}],程名称前缀:[{}] ==<------",poolConfig.getCorePoolSize(), poolConfig.getThreadNamePrefix());return scheduler;}/*** 实例化线程拒绝策略** @author Rambo* @date 2021/2/22 14:23* @param rejectedPolicy 拒绝策略枚举* @return java.util.concurrent.RejectedExecutionHandler*/private RejectedExecutionHandler chooseRejectedPolicy(String rejectedPolicy) {
          RejectedExecutionHandler handler;switch (rejectedPolicy) {
          case RejectedPolicy.CALLER_RUNS_POLICY:handler = new ThreadPoolExecutor.CallerRunsPolicy();break;case RejectedPolicy.DISCARD_OLDEST_POLICY:handler = new ThreadPoolExecutor.DiscardOldestPolicy();break;case RejectedPolicy.DISCARD_POLICY:handler = new ThreadPoolExecutor.DiscardPolicy();break;default:handler = new ThreadPoolExecutor.AbortPolicy();break;}return handler;}
    }    
    

    2

弊端

  • 如果自定义线程池配置不合理,调度任务数量大于线程池数量,并且各个线程都处于工作状态,那么新来的任务将会被阻塞,等待前面的线程执行完成后,再被执行

  • 线程池大小无界限、默认线程队列长度 16

五、异步多线程程序实现方式
  1. 应用程序启动类或者需要异步的实现类上添加 @EnableAsync 注解

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* Spring Boot Scheduled Sample 演示启动类** @author Rambo* @date 2021/02/19 17:06* @since 1.0.0.1*/
    @SpringBootApplication
    @EnableScheduling
    @EnableAsync
    public class SpringBootScheduleSampleApplication {
          public static void main(String[] args) {
          SpringApplication.run(SpringBootScheduleSampleApplication.class, args);}
    }
    
  2. 配置文件配置异步任务线程池的配置信息

    # 自定义异步任务线程池
    async-pool:# 核心线程池大小core-pool-size: 20# 最大线程数大小maximum-pool-size: 40# 活跃时间:秒keep-alive-seconds: 300# 线程等待队列大小queue-capacity: 50# 自定义线程名称前缀thread-name-prefix: ASYNC-BIZ-# 设置终止等待时间:秒await-termination-time: 60# 线程结束前,是否等待线程队列中的任务执行完成wait-tasks-complete: true 
    
  3. 创建配置类读取配置文件异步任务线程池的配置信息

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* @Async 注解异步线程池配置类,如果需要根据不同业务指定多个不同的异步线程池,则可以通过 @Async 指定不同自定义线程池的方式实现** @author Rambo* @date 2021/2/22 11:28* @since 1.0.0.1*/
    @Configuration
    @ConfigurationProperties(prefix = "async-pool")
    @Data
    public class AsyncPoolConfig {
          /** 核心线程池大小*/private int corePoolSize = 10;/** 最大线程数大小*/private int maximumPoolSize = 20;/** 活跃时间:秒*/private int keepAliveSeconds = 60;/** 线程等待队列大小*/private int queueCapacity = 30;/** 自定义线程名称前缀*/private String threadNamePrefix = "ASYNC-";/** 设置终止等待时间:秒*/private int awaitTerminationTime = 30;/** 线程结束前,是否等待线程队列中的任务执行完成*/private boolean waitTasksComplete = true;
    }
    
  4. 根据自定义线程配置信息创建 ThreadPoolTaskExecutor 线程池

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* "@Async" 注解 异步任务自定义线程池装配类** @author Rambo* @date 2021/1/13 18:55* @since 1.0.0.1*/
    @Configuration
    @Slf4j
    public class AsyncThreadPoolExecutor {
          @Resourceprivate AsyncPoolConfig poolConfig;@Beanpublic Executor asyncExecutor() {
          // 1. 实例化异步任务线程池ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();// 2. 设置核心线程池大小executor.setCorePoolSize(poolConfig.getCorePoolSize());// 3. 设置最大线程数executor.setMaxPoolSize(poolConfig.getMaximumPoolSize());// 4. 设置线程等待队列大小executor.setQueueCapacity(poolConfig.getQueueCapacity());// 5. 设置活跃时间:秒executor.setKeepAliveSeconds(poolConfig.getKeepAliveSeconds());// 6. 设置线程名字前缀executor.setThreadNamePrefix(poolConfig.getThreadNamePrefix());// setRejectedExecutionHandler:当线程池已经达到 max size 的时候,如何处理新任务// AbortPolicy 默认的拒绝策略,会 throw RejectedExecutionException 拒绝// CallerRunsPolicy 提交任务的主线程自己去执行该任务// DiscardOldestPolicy 丢弃最老的任务,其实就是把最早进入工作队列的任务丢弃,然后把新任务加入到工作队列// DiscardPolicy 相当大胆的策略,直接丢弃任务,没有任何异常抛出executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 7. 设置等待终止时间:秒executor.setAwaitTerminationSeconds(poolConfig.getAwaitTerminationTime());// 8. 进程结束前,等待线程队列中的任务执行完成executor.setWaitForTasksToCompleteOnShutdown(poolConfig.isWaitTasksComplete());// 9. 手动初始化线程池executor.initialize();log.info("------>== @Async 业务处理线程配置成功,核心线程池:[{}],最大线程池:[{}],队列容量:[{}],线程名称前缀:[{}] ==<------",poolConfig.getCorePoolSize(), poolConfig.getMaximumPoolSize(), poolConfig.getQueueCapacity(), poolConfig.getThreadNamePrefix());return executor;}@Beanpublic AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
          return (throwable, method, objects) -> {
          log.error("@Async 业务 ----> 异常精简信息:[{}],异常Throwable:{}", throwable.getMessage(), throwable);log.error("@Async 业务 ----> 触发异常的方法名称:{}", method.getName());};}
    }
    
  5. 在需要使用异步线程的方法上添加 @Async(value = "自定义ThreadPoolTaskExecutor的 Bean 的方法名")

    /*** Copyright (C), 1998-2021, Shenzhen Rambo Technology Co., Ltd* 异步任务线程演示控制器** @author Rambo* @date 2021/2/22 16:09* @since 1.0.0.1*/
    @RestController
    @RequestMapping("/async")
    @Slf4j
    public class AsyncController {
          @GetMapping("/info")@Async(value = "asyncExecutor")public void info() throws InterruptedException {
          log.info("-------------> 调度线程名称:[{}],被调度方法名称:[AsyncController.info()],当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());// 模拟业务任务处理耗时TimeUnit.SECONDS.sleep(5);}
    }
    
  6. 频繁疯狂请求的效果

    3

六、定时调度多线程解决方案(方案二)
  1. 根据上一步骤,我们可以知道,采用 @Async 注解也可以自定义线程池来实现异步任务

  2. 那么我们是否可以将 @Async 注解 和 @Schedule 注解同时使用,来解决默认 @Schedule 注解同一线程池同一线程处理的弊端?

  3. 代码实现

    @Async(value = "asyncExecutor")
    @Scheduled(cron = "0/2 * * * * ?")
    public void scheduledCron2() throws InterruptedException {
          log.info("-------------> 调度线程名称:[{}],被调度方法名称:[ScheduledTasks1.scheduledCron2()],执行频率:2秒/次,当前时间:[{}]", Thread.currentThread().getName(), DateUtil.now());// 模拟该任务响应时间TimeUnit.SECONDS.sleep(20);
    }
    
  4. 调度效果

    4

  相关解决方案