当前位置: 代码迷 >> 综合 >> 高并发场景实战: 线程池,countDownLatch,CompletableFuture混合使用
  详细解决方案

高并发场景实战: 线程池,countDownLatch,CompletableFuture混合使用

热度:59   发布时间:2023-11-25 19:49:06.0

写在前面: 希望大家从一开始就跟着敲, 收获一定是满满的, 末尾会留公众号,想一起努力的小朋友可以一起加入,
由兴趣可一起维护我们共同的公众号。

1:项目搭建

需要引入的maven

//提供线程池,TransmittableThreadLocal两个工具类
<dependency><groupId>com.alibaba</groupId><artifactId>transmittable-thread-local</artifactId><version>2.11.4</version>
</dependency>

下期会详细讲解: InheritableThreadLocal, ThreadLocal, TransmittableThreadLocal的区别

ThreadLocal: 每个线程之间起到隔离作用, 当前线程set的值只能被当前线程get到

InheritableThreadLocal:没有隔离作用, 父子线程数据传递问题,在线程池中,会出现缓存的问题, 比如可能当前传的id=1, 大家get都是1, 等第二次传id=2时, 此时get的值还是id=1。

TransmittableThreadLocal:阿里提供的工具类,解决了上面缓存问题,

2:场景一:线程池的execute,countDownLatch配合使用

情景: 服务A有10000个用户id, 通过openFegn组件, 去服务B查询用户信息, 中间是十分耗时的,若调用接口超时, 会导致系统异常。

低级做法: 用单线程去访问
异步做法: 使用线程池和countDownLatch, 批量去访问接口,等响应成功, 统一通过主线程返回给前端 。

这里的countDownLatch对象的作用是用来计数的. 相当于一个打疫苗的站点, 当前站点里面只允许10个人打, 打完一个走一个, 当10个人打完之后全部离开站点, 才允许外面的人进来, 如果站点里面一直有人的话, 那么外面的人也不允许进来

思路:
此处为了清晰, 数字尽可能简易化
1:如果当前用户id数量<10, 那么直接单线程进行访问服务B接口。
2:如果用户id数量>10,就要考虑线程池做异步操作。
3:假设一个线程池最高效率是处理10个用户id, 我们进行分组, 分多批线程池去处理用户id, 100/10,我们分10次去:用线程池去请求->请求–>请求–>请求–>请求–>请求->请求–>请求–>请求–>请求–>服务B
4:这里用到countDownLatch对象, 来做一个统计, 当走到调用await()方法时, countDownLatch为0时, 才执行主线程下面的逻辑, 否则阻塞主线程。
5:假设第一个线程池异步时,只处理[0,9)的ids, 第二个线程异步,只处理[9,20)的ids, …依次处理完。注意每次当一个线程池处理完后, 要将服务B返回的结果添加到线程安全的synchronizedList中, 同时countDownLatch要减1.
6: 待所有异步完成后, countDownLatch的值为0时, 可以继续往下做逻辑操作.

@Slf4j
public class AnimalCountThread {
    /*** 用户id*/private static List<Integer> userIds = new ArrayList<>();/*** 最大查询数*/private static Integer MAX_NUM = 10;private static final Logger logger = LogManager.getLogger(AnimalCountThread.class.getName());public static void main(String[] args) throws InterruptedException, ExecutionException {
    logger.info("开始时间:{}", LocalDateTime.now());//模拟数据,假设有for (int i = 0; i < 100; i++) {
    userIds.add(i);}if (userIds.size() > MAX_NUM) {
    Integer num = ((Double) Math.ceil(userIds.size() / MAX_NUM)).intValue();CountDownLatch countDownLatch = new CountDownLatch(num);List<User> synchronizedList = new CopyOnWriteArrayList();for (Integer i = 0; i < num; i++) {
    int finalI = i;ThreadpoolConfig.CUSTOMER_EXECUTOR.execute(() -> {
    try {
    int formIndex = finalI * MAX_NUM;int endIndex = (finalI + 1) * MAX_NUM - 1;List<Integer> idsBySub = userIds.subList(formIndex, endIndex);synchronizedList.addAll(getUserById(idsBySub));} catch (Exception e) {
    System.out.println(e.getMessage());} finally {
    //countDownLatch减1countDownLatch.countDown();}return synchronizedList;});}//当countDownLatch为0时,则往下面执行countDownLatch.await();//逻辑操作System.out.println("并发情况:" + synchronizedList);} else {
    System.out.println("普通情况:" + getUserById(userIds));}logger.info("结束时间:{}", LocalDateTime.now());}/*** 通过userId去查找User对象, 假设这个接口的api耗时5秒** @param userIds* @return*/public static List<User> getUserById(List<Integer> userIds) throws InterruptedException {
    List<User> userList = new ArrayList<>();System.out.println("线程进来了...");for (int i = 0; i < 10; i++) {
    User user = new User();user.setId(Integer.valueOf(i));user.setName(String.valueOf(i));userList.add(user);}Thread.sleep(5000);return userList;}
}

但是这里有个小细节, 既然是使用了线程池的submit()方法, 为什么没有用到返回值来接受呢,而非要用countDownLatch呢? (submit是有返回值, execute是没有返回值的), 我们再来一个案例看看.

3: 场景二:只用线程池来处理, 用submit()方法来执行任务

submit()会返回一个Future对象,当调用Future中的get()方法时, 会阻塞其他线程,只有get拿到值之后,才唤醒。一般这种适用于异步操作比较合适,但是再高并发情况下,使用场景一效果更佳。

@Slf4j
public class AnimalCountThread2 {
    /*** 用户id*/private static List<Integer> userIds = new ArrayList<>();/*** 最大查询数*/private static Integer MAX_NUM = 1;private static final Logger logger = LogManager.getLogger(AnimalCountThread2.class.getName());public static void main(String[] args) throws InterruptedException, ExecutionException {
    logger.info("开始时间:{}", LocalDateTime.now());for (int i = 0; i < 10; i++) {
    userIds.add(i);}if (userIds.size() > MAX_NUM) {
    Integer num = ((Double) Math.ceil(userIds.size() / MAX_NUM)).intValue();List<User> synchronizedList = new CopyOnWriteArrayList();for (Integer i = 0; i < num; i++) {
    int finalI = i;Future<List<User>> submit = ThreadpoolConfig.CUSTOMER_EXECUTOR.submit(() -> {
    int formIndex = finalI * MAX_NUM;int endIndex = (finalI + 1) * MAX_NUM - 1;List<Integer> idsBySub = userIds.subList(formIndex, endIndex);return getUserById(idsBySub);});//调用了get(),就会阻塞其他线程if (submit.get()!=null) {
    List<User> users = submit.get();synchronizedList.addAll(users);}}System.out.println("并发情况:" + synchronizedList);System.out.println("数据量:" + synchronizedList.size());} else {
    System.out.println("普通情况:" + getUserById(userIds));}logger.info("结束时间:{}", LocalDateTime.now());}/*** 通过userId去查找User对象, 假设这个接口的api耗时5秒** @param userIds* @return*/public static List<User> getUserById(List<Integer> userIds) throws InterruptedException {
    List<User> userList = new ArrayList<>();System.out.println("线程进来了..." + Thread.currentThread().getName());for (int i = 0; i < 10; i++) {
    User user = new User();user.setId(Integer.valueOf(i));user.setName(String.valueOf(i));userList.add(user);}Thread.sleep(5000);return userList;}
}

2和3总结

如果是高并发情况下, 建议使用2,CountDownLatch对象他像一个Future的升级版, await()方法就像get()方法一样,会阻塞其他线程,而get()在3中, 跟同步没啥区别, 用了反而还多此一举。
如果是普通异步,可以使用submit(),来接收返回值,最后再用get()获取对应的值, 但是我觉得如果是用线程池仅仅是做简单的异步操作,不如使用CompletableFuture对象。

4:CompletableFuture对象

在这里插入图片描述

一个completetableFuture就代表了一个任务。他能用Future的方法。还能做一些之前说的executorService配合futures做不了的。只要用了get()就会阻塞其他线程,执行当前线程的任务.
细节问题:
通过CompletableFuture对象执行的异步, 其中的线程都是守护线程。 线程区分:用户线程和守护线程, 当用户线程都结束后,守护线程也结束了。这是一个小细节。官方话:只有当最后一个非守护线程结束时,守护线程随着JVM一同结束工作

看了一篇博客, 介绍的很详细, 具体api可以参考

https://blog.csdn.net/finalheart/article/details/87615546

如果没有用get()阻塞线程时, 当主线程执行完, 用户线程终止了, 导致守护线程随着jvm一起终止.

whenComplete: 当主线程在处理任务时, 如果completableFuture执行其他任务已处理完, 谁先执行完, 谁先输出. 看下图一, 图二test输出的顺序, 这个api最重要的地方在于, 只要不让用户线程终止, 最后的值一定是会输出的. 只是顺序问题

whenCompleteAsync: 创建一个异步线程去执行任务, 我觉得这个才是重中之重, 主线程执行主线程的, 你执行你的
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
简单的异步Demo

public class CompletableFutureDemo3 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {
    System.out.println("this is first task");try {
    Thread.sleep(1000);} catch (InterruptedException e) {
    e.printStackTrace();}
// int i = 1 / 0;return "first";});/*** 有异常了会抛出, 不会有输出的结果, 反正t和u会有一个为null*/completableFuture.whenCompleteAsync((t, u) -> {
    try {
    Thread.sleep(1000);} catch (InterruptedException e) {
    e.printStackTrace();}System.out.println("我是:"+Thread.currentThread().getName());System.out.println("输出的结果:" + t);System.out.println(u);});System.out.println("我是:"+Thread.currentThread().getName());System.out.println("主线程");Thread.sleep(10000);}
}

在这里插入图片描述

  相关解决方案