线程池
- 线程池
-
- 线程池种类:
- ThreadPoolExecuter
-
- 类关系
- 结构
- 自定义线程池参数:
- 线程池的大小(maximumPoolSize):
- JDK提供的线程池
-
- singleThreadPool
- CachedThreadPool
- fixedThreadPool
- ScheduledPool
- 线程池使用示例:
- ThreadPoolExecuter源码解读
- ForkJoinPool
-
- WorkStealingPool:
- WorkStealingPool:
- ParallelStream:
线程池
线程池种类:
ThreadPoolExecutor
ForkJoinPool-分解汇总任务,用于CPU密集型
ThreadPoolExecuter
类关系
ThreadPoolExecutor他的父类是从AbstractExecutorService,而AbstractExecutorService的父类是ExecutorService,再ExecutorService的父类是Executor,所以ThreadPoolExecutor就相当于线程池的执行器,可以向这个池子里面扔任务,让这个线程池去运行。
1.Executor(Interface,只有execute方法,使线程的定义和执行分离) -extends->
2.?ExecutorService(Interface,定义了线程生命周期方法) -implements->
3.AbstractExecutorService(abstract,class,实现了部分线程生命周期方法(因为是抽象类所以可以不全部实现),作为线程池的父类) -extends->
4.?ThreadPoolExecutor -构造方法应用->
5.?singleThreadPool、CachedThreadPool,自定义线程池等
结构
HashSet(存储worker)+BlockingQueue(存储任务)
自定义线程池参数:
- 第一个参数**corePoolSoze核心线程数;
- 第二个叫**maximumPoolSize最大线程数,线程数不够了,能扩展到最大线程是多少,包含核心线程;
- 第三个**keepAliveTime线程生存时间,线程有很长时间没干活了把它归还给操作系统;
- 第四个TimeUnit.SECONDS生存时间的单位到底是毫秒纳秒还是秒自己去定义;
- 第五个是**任务队列,各种各样的BlockingQueue都可以往里面扔,不同的queue有不同的队列接收模式;
- 第六个是线程工厂defaultThreadFactory,他返回的是一个new DefaultThreadFactory,要实现ThreadFactory的接口,这个接口只有一个方法叫newThread,所以就是产生线程的,可以通过这种方式产生自定义的线程,默认产生的是defaultThreadFactory,而defaultThreadFactory产生线程的时候有几个特点:new出来的时候指定了group制定了线程名字,然后指定的这个线程绝对不是守护线程,设定好你线程的优先级。自己可以定义产生的到底是什么样的线程,指定线程名叫什么(为什么要指定线程名称,有什么意义,就是可以方便出错是回溯);
- 第七个叫拒绝策略,指的是线程池忙,而且任务队列满这种情况下我们就要执行各种各样的拒绝策略,jdk默认提供了四种拒绝策略,也是可以自定义的。
1:Abort:抛异常
2:Discard:扔掉,不抛异常
3:DiscardOldest:扔掉排队时间最久的
4:CallerRuns:调用者处理服务
一般情况这四种我们会自定义策略,去实现这个拒绝策略的接口,处理的方式是一般我们的消息需要保存下来,要是订单的话那就更需要保存了,保存到kafka,保存到redis或者是存到数据库随便你然后做好日志。
线程池的大小(maximumPoolSize):
N_threads=N_cpu * UseCap_cpu * (1 + Wait/Calculation)
UseCap_cpu :期望的CPU的利用率
Wait、Calculation:线程的等待、计算时间(集散密集型,IO密集型)
实际使用时一定要压测
JDK提供的线程池
JDK提供了4种默认线程池
Executors:线程池的方法类
singleThreadPool
只有一个线程的线程池,可以保证里面的任务顺序执行
ExecutorService service = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
}
既然只有一个线程,为什么还要用线程池呢?
1.线程池可以自己维护等待队列,显示创建线程需要自己维护
2.线程池可以对线程进行生命周期管理,而不用开发者自己去调用方法
CachedThreadPool
适用于波动型
ExecutorService service = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
}
fixedThreadPool
适用于平稳型
ExecutorService service = Executors.newFixedThreadPool(cpuCoreNum);
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
}
ScheduledPool
定时任务线程池
用的比较少,简单的用定时器,复杂的用定时框架(比如quartz,cron)
ScheduledExecutorService service = Executors.newScheduledThreadPool(4);
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
}
如果有一个闹钟服务,十亿人订阅,如何设计?
大的角度上将流量从主服务器转移到多个边缘服务器,单台服务器里面使用线程池加队列
线程池使用示例:
public class HelloThreadPool {
static class Task implements Runnable {
private int i;public Task(int i) {
this.i = i;}@Overridepublic void run() {
System.out.println(Thread.currentThread().getName() + " Task " + i);}@Overridepublic String toString() {
return "Task{" +"i=" + i +'}';}}public static void main(String[] args) {
ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4,60, TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(4),Executors.defaultThreadFactory(),new ThreadPoolExecutor.CallerRunsPolicy());for (int i = 0; i < 7; i++) {
tpe.execute(new Task(i));}System.out.println(tpe.getQueue());try {
System.in.read();} catch (IOException e) {
e.printStackTrace();}tpe.shutdown();}
}
ThreadPoolExecuter源码解读
-
主要结构
ThreadPoolExecutor:
//ctl:AtomicInteger类型,共32位,前3位表示线程池的状态,后29位表示线程数量,这样做的原因猜测为:减少cas的复杂度 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); //workers:存储Workers的实例对象,可以认为是存储的线程对象 private final HashSet<Worker> workers = new HashSet<Worker>(); //workQueue:存储等待执行的任务即Runnable private final BlockingQueue<Runnable> workQueue;
Worker:
//线程 final Thread thread; //当前worker自身携带的任务 Runnable firstTask; //已完成的任务数量 volatile long completedTasks;
- 过程图解
- 过程图解
tips:
-
worker.thread在while循环执行任务时可以理解为消费者,那么main线程放入任务时可以理解为生产者
-
核心线程与非核心线程本身并没有区别,只是用coreSize(核心线程数量)划出的一条界限
-
线程池需要维护一定数量的线程,这些线程在执行完任务且任务队列为空时不会结束而是阻塞,等待在队列中获取任务,这些线程就是核心线程,其他线程在执行完任务且任务队列为空时会被结束掉,当要被结束掉时做过线程还会检查一下在workQueue不为空且核心线程是否有空余位置,如果有的话会再创建一个线程,这个线程可能是核心线程
-
getTask()方法在workQueue为空的情况下,通过BlockingQueue对于poll()和take()的不同处理来控制核心线程和非核心线程的返回
Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();
ForkJoinPool
分解汇总任务,用于CPU密集型
使用示例:
public class T12_ForkJoinPool {
static int[] nums = new int[1000000];static final int MAX_NUM = 50000;static Random r = new Random();static {
for(int i=0; i<nums.length; i++) {
nums[i] = r.nextInt(100);}System.out.println("---" + Arrays.stream(nums).sum()); //stream api}//子任务无返回值static class AddTask extends RecursiveAction {
int start, end;AddTask(int s, int e) {
start = s;end = e;}@Overrideprotected void compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];System.out.println("from:" + start + " to:" + end + " = " + sum);} else {
int middle = start + (end-start)/2;AddTask subTask1 = new AddTask(start, middle);AddTask subTask2 = new AddTask(middle, end);subTask1.fork();subTask2.fork();}}}//子任务有返回值static class AddTaskRet extends RecursiveTask<Long> {
private static final long serialVersionUID = 1L;int start, end;AddTaskRet(int s, int e) {
start = s;end = e;}@Overrideprotected Long compute() {
if(end-start <= MAX_NUM) {
long sum = 0L;for(int i=start; i<end; i++) sum += nums[i];return sum;} int middle = start + (end-start)/2;AddTaskRet subTask1 = new AddTaskRet(start, middle);AddTaskRet subTask2 = new AddTaskRet(middle, end);subTask1.fork();subTask2.fork();return subTask1.join() + subTask2.join();}}public static void main(String[] args) throws IOException {
ForkJoinPool fjp = new ForkJoinPool();AddTask task = new AddTask(0, nums.length);fjp.execute(task);AddTaskRet taskRet = new AddTaskRet(0, nums.length);fjp.execute(taskRet);long result = taskRet.join();System.out.println(result);}
}
WorkStealingPool:
ForkJoinPool的一种
适用:任务规模较大,但可以拆解成子任务的情况,支持子任务带/不带返回值(类似递归)
fjp.execute(task);AddTaskRet taskRet = new AddTaskRet(0, nums.length);fjp.execute(taskRet);long result = taskRet.join();System.out.println(result);}
}
WorkStealingPool:
ForkJoinPool的一种
适用:任务规模较大,但可以拆解成子任务的情况,支持子任务带/不带返回值(类似递归)
结构:每个线程有单独的一个队列,自己队列的任务执行完后可以去其他队列拿任务执行
ParallelStream:
ParallelStream流式处理底层也使用了ForkJoinPool线程池
List<Integer> nums = new ArrayList<>();
nums.parallelStream().forEach(ClassName::Method);