当前位置: 代码迷 >> 综合 >> 线程池(ThreadPoolExecutor)详解
  详细解决方案

线程池(ThreadPoolExecutor)详解

热度:40   发布时间:2023-11-15 08:54:24.0

线程池

  • 线程池
    • 线程池种类:
    • ThreadPoolExecuter
      • 类关系
      • 结构
      • 自定义线程池参数:
      • 线程池的大小(maximumPoolSize):
    • JDK提供的线程池
      • singleThreadPool
      • CachedThreadPool
      • fixedThreadPool
      • ScheduledPool
      • 线程池使用示例:
    • ThreadPoolExecuter源码解读
    • ForkJoinPool
      • WorkStealingPool:
      • WorkStealingPool:
      • ParallelStream:

线程池

线程池种类:

ThreadPoolExecutor

ForkJoinPool-分解汇总任务,用于CPU密集型

ThreadPoolExecuter

类关系

ThreadPoolExecutor他的父类是从AbstractExecutorService,而AbstractExecutorService的父类是ExecutorService,再ExecutorService的父类是Executor,所以ThreadPoolExecutor就相当于线程池的执行器,可以向这个池子里面扔任务,让这个线程池去运行。

1.Executor(Interface,只有execute方法,使线程的定义和执行分离) -extends->
2.?ExecutorService(Interface,定义了线程生命周期方法) -implements->
3.AbstractExecutorService(abstract,class,实现了部分线程生命周期方法(因为是抽象类所以可以不全部实现),作为线程池的父类) -extends->
4.?ThreadPoolExecutor -构造方法应用->
5.?singleThreadPool、CachedThreadPool,自定义线程池等

结构

HashSet(存储worker)+BlockingQueue(存储任务)

自定义线程池参数:

  • 第一个参数**corePoolSoze核心线程数;
  • 第二个叫**maximumPoolSize最大线程数,线程数不够了,能扩展到最大线程是多少,包含核心线程;
  • 第三个**keepAliveTime线程生存时间,线程有很长时间没干活了把它归还给操作系统;
  • 第四个TimeUnit.SECONDS生存时间的单位到底是毫秒纳秒还是秒自己去定义;
  • 第五个是**任务队列,各种各样的BlockingQueue都可以往里面扔,不同的queue有不同的队列接收模式;
  • 第六个是线程工厂defaultThreadFactory,他返回的是一个new DefaultThreadFactory,要实现ThreadFactory的接口,这个接口只有一个方法叫newThread,所以就是产生线程的,可以通过这种方式产生自定义的线程,默认产生的是defaultThreadFactory,而defaultThreadFactory产生线程的时候有几个特点:new出来的时候指定了group制定了线程名字,然后指定的这个线程绝对不是守护线程,设定好你线程的优先级。自己可以定义产生的到底是什么样的线程,指定线程名叫什么(为什么要指定线程名称,有什么意义,就是可以方便出错是回溯);
  • 第七个叫拒绝策略,指的是线程池忙,而且任务队列满这种情况下我们就要执行各种各样的拒绝策略,jdk默认提供了四种拒绝策略,也是可以自定义的。
    1:Abort:抛异常
    2:Discard:扔掉,不抛异常
    3:DiscardOldest:扔掉排队时间最久的
    4:CallerRuns:调用者处理服务
    一般情况这四种我们会自定义策略,去实现这个拒绝策略的接口,处理的方式是一般我们的消息需要保存下来,要是订单的话那就更需要保存了,保存到kafka,保存到redis或者是存到数据库随便你然后做好日志。

线程池的大小(maximumPoolSize):

N_threads=N_cpu * UseCap_cpu * (1 + Wait/Calculation)

UseCap_cpu :期望的CPU的利用率

Wait、Calculation:线程的等待、计算时间(集散密集型,IO密集型)

实际使用时一定要压测

JDK提供的线程池

JDK提供了4种默认线程池

Executors:线程池的方法类

singleThreadPool

只有一个线程的线程池,可以保证里面的任务顺序执行

ExecutorService service = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));   
}

既然只有一个线程,为什么还要用线程池呢?

1.线程池可以自己维护等待队列,显示创建线程需要自己维护

2.线程池可以对线程进行生命周期管理,而不用开发者自己去调用方法

CachedThreadPool

适用于波动型

ExecutorService service = Executors.newCachedThreadPool(); 
public static ExecutorService newCachedThreadPool() {
           return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());    
}

fixedThreadPool

适用于平稳型

ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,                                      new LinkedBlockingQueue<Runnable>());    
}

ScheduledPool

定时任务线程池

用的比较少,简单的用定时器,复杂的用定时框架(比如quartz,cron)

ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
public ScheduledThreadPoolExecutor(int corePoolSize) {
     super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());    
}

如果有一个闹钟服务,十亿人订阅,如何设计?

大的角度上将流量从主服务器转移到多个边缘服务器,单台服务器里面使用线程池加队列

线程池使用示例:

public class HelloThreadPool {
    static class Task implements Runnable {
    private int i;public Task(int i) {
    this.i = i;}@Overridepublic void run() {
    System.out.println(Thread.currentThread().getName() + " Task " + i);}@Overridepublic String toString() {
    return "Task{" +"i=" + i +'}';}}public static void main(String[] args) {
    ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 7; i++) {
    tpe.execute(new Task(i));}System.out.println(tpe.getQueue());try {
    System.in.read();} catch (IOException e) {
    e.printStackTrace();}tpe.shutdown();}
}

ThreadPoolExecuter源码解读

  1. 主要结构

    ThreadPoolExecutor:

    //ctl:AtomicInteger类型,共32位,前3位表示线程池的状态,后29位表示线程数量,这样做的原因猜测为:减少cas的复杂度
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //workers:存储Workers的实例对象,可以认为是存储的线程对象
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //workQueue:存储等待执行的任务即Runnable
    private final BlockingQueue<Runnable> workQueue;
    

    Worker:

    //线程
    final Thread thread;
    //当前worker自身携带的任务
    Runnable firstTask;
    //已完成的任务数量
    volatile long completedTasks;
    
    1. 过程图解
      在这里插入图片描述

tips:

  • worker.thread在while循环执行任务时可以理解为消费者,那么main线程放入任务时可以理解为生产者

  • 核心线程与非核心线程本身并没有区别,只是用coreSize(核心线程数量)划出的一条界限

  • 线程池需要维护一定数量的线程,这些线程在执行完任务且任务队列为空时不会结束而是阻塞,等待在队列中获取任务,这些线程就是核心线程,其他线程在执行完任务且任务队列为空时会被结束掉,当要被结束掉时做过线程还会检查一下在workQueue不为空且核心线程是否有空余位置,如果有的话会再创建一个线程,这个线程可能是核心线程

  • getTask()方法在workQueue为空的情况下,通过BlockingQueue对于poll()和take()的不同处理来控制核心线程和非核心线程的返回

    Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
    

ForkJoinPool

分解汇总任务,用于CPU密集型

使用示例:

public class T12_ForkJoinPool {
    static int[] nums = new int[1000000];static final int MAX_NUM = 50000;static Random r = new Random();static {
    for(int i=0; i<nums.length; i++) {
    nums[i] = r.nextInt(100);}System.out.println("---" + Arrays.stream(nums).sum()); //stream api}//子任务无返回值static class AddTask extends RecursiveAction {
    int start, end;AddTask(int s, int e) {
    start = s;end = e;}@Overrideprotected void compute() {
    if(end-start <= MAX_NUM) {
    long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];System.out.println("from:" + start + " to:" + end + " = " + sum);} else {
    int middle = start + (end-start)/2;AddTask subTask1 = new AddTask(start, middle);AddTask subTask2 = new AddTask(middle, end);subTask1.fork();subTask2.fork();}}}//子任务有返回值static class AddTaskRet extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;int start, end;AddTaskRet(int s, int e) {
    start = s;end = e;}@Overrideprotected Long compute() {
    if(end-start <= MAX_NUM) {
    long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];return sum;} int middle = start + (end-start)/2;AddTaskRet subTask1 = new AddTaskRet(start, middle);AddTaskRet subTask2 = new AddTaskRet(middle, end);subTask1.fork();subTask2.fork();return subTask1.join() + subTask2.join();}}public static void main(String[] args) throws IOException {
    ForkJoinPool fjp = new ForkJoinPool();AddTask task = new AddTask(0, nums.length);fjp.execute(task);AddTaskRet taskRet = new AddTaskRet(0, nums.length);fjp.execute(taskRet);long result = taskRet.join();System.out.println(result);}
}

WorkStealingPool:

ForkJoinPool的一种

适用:任务规模较大,但可以拆解成子任务的情况,支持子任务带/不带返回值(类似递归)

  fjp.execute(task);AddTaskRet taskRet = new AddTaskRet(0, nums.length);fjp.execute(taskRet);long result = taskRet.join();System.out.println(result);}
}

WorkStealingPool:

ForkJoinPool的一种

适用:任务规模较大,但可以拆解成子任务的情况,支持子任务带/不带返回值(类似递归)

结构:每个线程有单独的一个队列,自己队列的任务执行完后可以去其他队列拿任务执行

ParallelStream:

ParallelStream流式处理底层也使用了ForkJoinPool线程池

List<Integer> nums = new ArrayList<>();
nums.parallelStream().forEach(ClassName::Method);