当前位置: 代码迷 >> 综合 >> CompletableFuture 使用及应用场景
  详细解决方案

CompletableFuture 使用及应用场景

热度:8   发布时间:2023-12-11 22:21:15.0

JDK1.8 之前,我们会通过 Future 和 Callable 采用轮询来实现异步获取结果

//定义一个异步任务
Future<String> future = executor.submit(()->{Thread.sleep(2000);return "hello world";
});
//轮询获取结果
while (true){if(future.isDone()) {System.out.println(future.get());break;}}

JDK1.8 中提供的 CompletableFuture 提供了异步函数式编程。可以帮助我们简化异步编程的复杂性,通过回调的方式处理计算结果,并且提供了转换和组合的方法。

1 CompletableFuture 的使用

1.1 创建 CompletableFuture 对象

        提供了四个静态方法来创建

public static CompletableFuture<Void>   runAsync(Runnable runnable)
public static CompletableFuture<Void>   runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>  supplyAsync(Supplier<U> supplier, Executor executor)

async 代表异步。

runAsync 和 supplyAsync 方法的区别在于,前者没有结果返回,后者会有结果返回

默认用的线程池为 ForkJoinPool.commonPool()

private static void completableFuture() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});// 阻塞的获取结果while (true) {if (completableFuture.isDone()) {System.out.println(completableFuture.get());break;}}
}

1.2 阻塞获取

        以下四个方法用于获取结果

public T    get()
public T    get(long timeout, TimeUnit unit)
public T    getNow(T valueIfAbsent)
public T    join()

getNow() 代表计算完,如果返回结果或抛出异常就正常get,否则就返回给定的 valueIfAbsent 值

join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException)

        主动触发计算

public boolean complete(T  value)
public boolean completeExceptionally(Throwable ex)
    private static void completableFuture3() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});// 设置 completableFuture.get() 获取到的值completableFuture.complete("aaa");System.out.println(completableFuture.get());// 以异常的形式触发计算// completableFuture.completeExceptionally(new Exception());// System.out.println(completableFuture.get());}

1.3 计算完成时处理

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面四个方法是计算阶段结束的时候触发

BiConsumer 有两个入参,分别代表计算的返回值,以及异常

private static void completableFuture4() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.whenCompleteAsync((v, e) -> {System.out.println("return value:" + v + "  exception:" + e);});System.out.println(completableFuture.get());
}
public <U> CompletableFuture<U>     handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U>     handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

返回的不是原始返回的值,而是经过 BiFunction 处理返回的值

private static void completableFuture5() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.handle((v, e) -> {System.out.println("return value:" + v + "  exception:" + e);return "handled " + v;});System.out.println(completableFuture.get());
}

1.4 thenApply

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
private static void completableFuture6() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.thenApply((ele) -> {return "handled1 " + ele;}).thenApply((ele) -> {return "handled2 " + ele;});System.out.println(completableFuture.get());
}

        与 handle 方法的区别在于,handle 方法会处理正常值和异常。因此它可以屏蔽异常,避免异常继续抛出。

        而 thenApply 方法只是用来处理正常值,因此一旦有异常就会抛出。

1.5 thenAccept

public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)

        只对结果进行消费,没有返回值

1.6 thenAcceptBoth

public CompletableFuture<Void>  thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void>  thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)

        用来组合两个 CompletableFuture,其中一个 CompletableFuture 等待另一个 CompletableFuture 的结果。

private static void completableFuture8() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Void> compose = completableFuture1.thenAcceptBoth(completableFuture2, (x, y) -> {System.out.println(x + " " + y);});System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(compose.get());
}

1.7 组合处理

public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
  • applyToEither
    • 一个完成就触发 Function
  • acceptEither
    • 两个都完成才触发 Consumer
  • allOf
    • 所有都执行完成才执行计算
  • anyOf
    • 任意一个执行完成就执行计算

2 CompletableFuture 的应用场景

2.1 创建异步任务

        最初设计出来就是为了完成异步任务的功能。

2.2 简单任务异步回调

先说一下 async 和没有 async 的区别

  • 有 async 的方法,前后任务共用一个线程池
  • 没有 async 的方法,第二个任务使用的是 ForkJoin 线程池

这部分很多都在前文中解释过了,就不再补充代码测试了。

1 thenRun/thenRunAsync

  • 执行完第一个任务后,执行第二个任务。
  • 也就是第二个任务是第一个任务的回调。
  • 但是任务前后没有参数传递,第二个任务也没有返回值

2 thenAccept/thenAcceptAsync

  • 执行完第一个任务后,执行第二个任务。
  • 会将第一个任务的结果当做入参传入第二个任务。
  • 第二个任务没有返回值,是 Consumer

3 thenApply/thenApplyAsync

  • 执行完第一个任务后,执行第二个任务
  • 会将第一个任务的结果当做入参传入第二个任务。
  • 第二个任务也有返回值,是 Function

4 exceptionally

  • 执行任务发生异常的回调。
  • 发生的异常作为参数,传递到回调方法中

5 whenComplete

  • 类似于 thenAccept ,只是回调中会返回 exception
  • 而 thenAccept 方法有异常会直接对外抛出

6 handle

  • 类似于 thenApply ,只是回调中会返回 exception
  • 而 thenApply 方法有异常会直接对外抛出

2.3 多个任务组合处理

1 and 组合关系

表示将两个任务组合起来,只有两个都正常执行完了,才会执行某个任务

  • thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
  • thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
  • runAfterBoth 不会把执行结果当做方法入参,且没有返回值。

2 or 组合关系

表示将两个任务组合起来,只要其中有一个任务执行完了,就会执行某个任务

  • applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
  • acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
  • runAfterEither: 不会把执行结果当做方法入参,且没有返回值。

3 allOf

        所有任务都执行完成后,才执行返回的CompletableFuture

        这边发现取不到 v 和 e 的值,因为可能存在多个任务,不确定最终执行哪个任务。

        所以这个方法适用于各个任务的返回值没有关联关系

        等待多个任务确认都完成后,再执行后续回调

        有点类似 jemeter 中的设置集合点 的概念

private static void completableFuture10() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture1, completableFuture2).whenComplete((v, e) -> System.out.println("value:" + v + "ex:" + e));System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(allOf.get());
}

4 anyOf

        任意一个任务执行完,才执行返回的CompletableFuture

        可以取到 v 和 e 的值,返回的是首先完成的任务的返回值

private static void completableFuture11() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Object> allOf = CompletableFuture.anyOf(completableFuture1, completableFuture2).whenComplete((v, e) -> {System.out.println("value:" + v + "ex:" + e);});System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(allOf.get());
}

5 thenCompose

        thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例

  • 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
  • 如果该CompletableFuture实例为null,然后就执行这个新任务