JDK1.8 之前,我们会通过 Future 和 Callable 采用轮询来实现异步获取结果
//定义一个异步任务
Future<String> future = executor.submit(()->{Thread.sleep(2000);return "hello world";
});
//轮询获取结果
while (true){if(future.isDone()) {System.out.println(future.get());break;}}
JDK1.8 中提供的 CompletableFuture 提供了异步函数式编程。可以帮助我们简化异步编程的复杂性,通过回调的方式处理计算结果,并且提供了转换和组合的方法。
1 CompletableFuture 的使用
1.1 创建 CompletableFuture 对象
提供了四个静态方法来创建
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
async 代表异步。
runAsync 和 supplyAsync 方法的区别在于,前者没有结果返回,后者会有结果返回
默认用的线程池为 ForkJoinPool.commonPool()
private static void completableFuture() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});// 阻塞的获取结果while (true) {if (completableFuture.isDone()) {System.out.println(completableFuture.get());break;}}
}
1.2 阻塞获取
以下四个方法用于获取结果
public T get()
public T get(long timeout, TimeUnit unit)
public T getNow(T valueIfAbsent)
public T join()
getNow() 代表计算完,如果返回结果或抛出异常就正常get,否则就返回给定的 valueIfAbsent 值
join() 返回计算的结果或者抛出一个 unchecked 异常(CompletionException)
主动触发计算
public boolean complete(T value)
public boolean completeExceptionally(Throwable ex)
private static void completableFuture3() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});// 设置 completableFuture.get() 获取到的值completableFuture.complete("aaa");System.out.println(completableFuture.get());// 以异常的形式触发计算// completableFuture.completeExceptionally(new Exception());// System.out.println(completableFuture.get());}
1.3 计算完成时处理
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面四个方法是计算阶段结束的时候触发
BiConsumer 有两个入参,分别代表计算的返回值,以及异常
private static void completableFuture4() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.whenCompleteAsync((v, e) -> {System.out.println("return value:" + v + " exception:" + e);});System.out.println(completableFuture.get());
}
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
返回的不是原始返回的值,而是经过 BiFunction 处理返回的值
private static void completableFuture5() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.handle((v, e) -> {System.out.println("return value:" + v + " exception:" + e);return "handled " + v;});System.out.println(completableFuture.get());
}
1.4 thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
private static void completableFuture6() throws Exception {CompletableFuture<String> completableFuture = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(2000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world";});completableFuture = completableFuture.thenApply((ele) -> {return "handled1 " + ele;}).thenApply((ele) -> {return "handled2 " + ele;});System.out.println(completableFuture.get());
}
与 handle 方法的区别在于,handle 方法会处理正常值和异常。因此它可以屏蔽异常,避免异常继续抛出。
而 thenApply 方法只是用来处理正常值,因此一旦有异常就会抛出。
1.5 thenAccept
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
只对结果进行消费,没有返回值
1.6 thenAcceptBoth
public CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action)
public CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T, ? super U> action, Executor executor)
用来组合两个 CompletableFuture,其中一个 CompletableFuture 等待另一个 CompletableFuture 的结果。
private static void completableFuture8() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(10000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Void> compose = completableFuture1.thenAcceptBoth(completableFuture2, (x, y) -> {System.out.println(x + " " + y);});System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(compose.get());
}
1.7 组合处理
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other, Function<? super T, U> fn);
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action);
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
- applyToEither
- 一个完成就触发 Function
- acceptEither
- 两个都完成才触发 Consumer
- allOf
- 所有都执行完成才执行计算
- anyOf
- 任意一个执行完成就执行计算
2 CompletableFuture 的应用场景
2.1 创建异步任务
最初设计出来就是为了完成异步任务的功能。
2.2 简单任务异步回调
先说一下 async 和没有 async 的区别
- 有 async 的方法,前后任务共用一个线程池
- 没有 async 的方法,第二个任务使用的是 ForkJoin 线程池
这部分很多都在前文中解释过了,就不再补充代码测试了。
1 thenRun/thenRunAsync
- 执行完第一个任务后,执行第二个任务。
- 也就是第二个任务是第一个任务的回调。
- 但是任务前后没有参数传递,第二个任务也没有返回值
2 thenAccept/thenAcceptAsync
- 执行完第一个任务后,执行第二个任务。
- 会将第一个任务的结果当做入参传入第二个任务。
- 第二个任务没有返回值,是 Consumer
3 thenApply/thenApplyAsync
- 执行完第一个任务后,执行第二个任务
- 会将第一个任务的结果当做入参传入第二个任务。
- 第二个任务也有返回值,是 Function
4 exceptionally
- 执行任务发生异常的回调。
- 发生的异常作为参数,传递到回调方法中
5 whenComplete
- 类似于 thenAccept ,只是回调中会返回 exception
- 而 thenAccept 方法有异常会直接对外抛出
6 handle
- 类似于 thenApply ,只是回调中会返回 exception
- 而 thenApply 方法有异常会直接对外抛出
2.3 多个任务组合处理
1 and 组合关系
表示将两个任务组合起来,只有两个都正常执行完了,才会执行某个任务
- thenCombine:会将两个任务的执行结果作为方法入参,传递到指定方法中,且有返回值
- thenAcceptBoth: 会将两个任务的执行结果作为方法入参,传递到指定方法中,且无返回值
- runAfterBoth 不会把执行结果当做方法入参,且没有返回值。
2 or 组合关系
表示将两个任务组合起来,只要其中有一个任务执行完了,就会执行某个任务
- applyToEither:会将已经执行完成的任务,作为方法入参,传递到指定方法中,且有返回值
- acceptEither: 会将已经执行完成的任务,作为方法入参,传递到指定方法中,且无返回值
- runAfterEither: 不会把执行结果当做方法入参,且没有返回值。
3 allOf
所有任务都执行完成后,才执行返回的CompletableFuture
这边发现取不到 v 和 e 的值,因为可能存在多个任务,不确定最终执行哪个任务。
所以这个方法适用于各个任务的返回值没有关联关系
等待多个任务确认都完成后,再执行后续回调
有点类似 jemeter 中的设置集合点 的概念
private static void completableFuture10() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Void> allOf = CompletableFuture.allOf(completableFuture1, completableFuture2).whenComplete((v, e) -> System.out.println("value:" + v + "ex:" + e));System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(allOf.get());
}
4 anyOf
任意一个任务执行完,才执行返回的CompletableFuture
可以取到 v 和 e 的值,返回的是首先完成的任务的返回值
private static void completableFuture11() throws Exception {CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(5000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world1";});CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {try {Thread.sleep(1000);} catch (InterruptedException e) {e.printStackTrace();}return "hello world2";});CompletableFuture<Object> allOf = CompletableFuture.anyOf(completableFuture1, completableFuture2).whenComplete((v, e) -> {System.out.println("value:" + v + "ex:" + e);});System.out.println(completableFuture1.get());System.out.println(completableFuture2.get());System.out.println(allOf.get());
}
5 thenCompose
thenCompose方法会在某个任务执行完成后,将该任务的执行结果,作为方法入参,去执行指定的方法。该方法会返回一个新的CompletableFuture实例
- 如果该CompletableFuture实例的result不为null,则返回一个基于该result新的CompletableFuture实例;
- 如果该CompletableFuture实例为null,然后就执行这个新任务