当前位置: 代码迷 >> 综合 >> 异步-CompletableFuture
  详细解决方案

异步-CompletableFuture

热度:19   发布时间:2024-03-06 20:16:38.0

1.0 线程回顾

1.1 初始化线程的4中方式

  1. 继承Thread
  2. 实现Runnable接口
  3. 实现Callable 接口+FutureTask (可以拿到返回结果,可以处理异常)
  4. 线程池

通过线程池性能稳定,也可以获取执行结果,并捕获异常。但是,在业务复杂情况下,一个异步调用可能会依赖于另一个异步调用的执行结果。

import java.util.concurrent.*;public class ThreadTest {
    public static ExecutorService service = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main......start...");
// Thread01 thread01 = new Thread01();
 thread01.start(); //启动线程// Runnable01 runnable01 = new Runnable01();
// new Thread(runnable01).start();// FutureTask<Integer> futureTask = new FutureTask<>(new Callable01());
// new Thread(futureTask).start();
// //阻塞等待 整个线程执行完成,获取返回结果
// Integer integer = futureTask.get();
// System.out.println("main......end...."+integer ); //这个最后执行//将所有的多线程异步任务都交给线程池执行,这样不用一个任务申请一个线程了
// new Thread(()-> System.out.println("hello")).start();//当前系统中只有一二个,每个异步任务直接提交给线程池service.execute(new Runnable01());System.out.println("main......end....");}public static class Thread01 extends Thread{
    @Overridepublic void run() {
    System.out.println("dangqiang线程" +Thread.currentThread().getId());int i =10 /2;System.out.println("运行结果"+i);}}public static class Runnable01 implements Runnable{
    @Overridepublic void run() {
    System.out.println("dangqiang线程" +Thread.currentThread().getId());int i =10 /2;System.out.println("运行结果"+i);}}public static class Callable01 implements Callable<Integer>{
    @Overridepublic Integer call() throws Exception {
    System.out.println("dangqiang线程" +Thread.currentThread().getId());int i =10 /2;System.out.println("运行结果"+i);return i;}}
}main......start...
main......end....
dangqiang线程11
运行结果5
方式1和2 : 主线程无法获取线程的运算结果,不适合当前场景
方式3: 主进程可以获取线程的运算结果,但是不利于控制服务器中的线程资源.可以导致服务器的资源耗尽
方式4: 通过以下二种方式初始化线程池Executors.newFiexedThreadPool(3);//或者new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,TimeUnit unit,workQueue,threadFactory,handler)

1.2 线程池

在这里插入图片描述

2.CompletableFuture 异步编排

在这里插入图片描述

2.1 启动异步任务

public static ExecutorService service = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main....start.....");
// CompletableFuture.runAsync(()->{
    
// System.out.println("当前线程: "+Thread.currentThread().getId());
// int i=10/2;
// System.out.println("运行结果:" +i);
// },service);CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, service);Integer integer = future.get();System.out.println("main........end......."+integer);}main....start.....
当前线程: 11
运行结果:5
main........end.......5

.2.2 CompletableFuture-完成回调与异常感知

在这里插入图片描述

    public static ExecutorService service = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main....start.....");CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, service).whenComplete((res,excption)->{
    // 虽然得到异常,但是无法修改返回数据System.out.println("异常任务成功...结果是:"+res +";异常是: "+excption);}).exceptionally(throwable -> {
    //可以感知异常,同时返回默认值return 10;});Integer integer = future.get();System.out.println("main........end......."+integer);}
}main....start.....
当前线程: 11
运行结果:5
异常任务成功...结果是:5;异常是: null
main........end.......5如过业务有异常(int i =10/0),
main....start.....
当前线程: 11
异常任务成功...结果是:null;异常是: java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
main........end.......10

2.3 handle 方法完成后的处理

.handle((res,thr)->{
    if (res!=null){
    return res*2;  //没异常}if (thr!=null){
      //有异常return 0;}return 1;});

2.4 线程串行化方法

在这里插入图片描述

    public static ExecutorService service = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {
    System.out.println("main....start.....");CompletableFuture<String> applyAsync = CompletableFuture.supplyAsync(() -> {
    System.out.println("当前线程: " + Thread.currentThread().getId());int i = 10 / 2;System.out.println("运行结果:" + i);return i;}, service).thenApplyAsync((res) -> {
    System.out.println("任务二启动了 ..." + res);return "hello" + res;}, service);String s = applyAsync.get();System.out.println("main........end......."+s);}
}
main....start.....
当前线程: 11
运行结果:5
任务二启动了 ...5
main........end.......hello5

2.5 两任务组合-都要完成

 System.out.println("main....start.....");/*** 两个都完成*/CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("任务1结束:");return i;}, service);CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());System.out.println("任务2结束:");return "Hello";}, service);future01.runAfterBothAsync(future02,()->{
      //不接受上二步的返回值,自己也没返回值System.out.println("任务3开始");}, service);future01.thenAcceptBothAsync(future02,(f1, f2)->{
    System.out.println("任务3开始。。。之前的结果:" + f1 + "==>" + f2);}, service);System.out.println("main....end....." );main....start.....
任务1线程:11
任务1结束:
任务2线程:12
任务2结束:
任务3开始
main....end.....
任务3开始。。。之前的结果:5==>HelloCompletableFuture<String> future = future01.thenCombineAsync(future02, (f1, f2) -> {
    return f1 + ":" + f2 + " -> Haha";}, service);System.out.println("main....end....." + future.get());
main....start.....
任务1线程:11
任务1结束:
任务2线程:12
任务2结束:
main....end.....5:Hello -> Haha

2.6 两任务组合-只要一个完成

当两个任务中,任意一个future任务完成的时候,执行任务。
applyIoEither:两个任务有一个执行完成,获取它的返回值,处理任务并有新的返回值。
acceptEither:两个任务有一个执行完成,获取它的返回值,处理任务,没有新的返回值。
runAfterEither:两个任务有一个执行完成,不需要获取future的结果,处理任务,也没有返回值。

 CompletableFuture<Integer> future01 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务1线程:" + Thread.currentThread().getId());int i = 10 / 2;System.out.println("任务1结束:");return i;}, service);CompletableFuture<String> future02 = CompletableFuture.supplyAsync(() -> {
    System.out.println("任务2线程:" + Thread.currentThread().getId());try {
    Thread.sleep(3000);} catch (InterruptedException e) {
    e.printStackTrace();}System.out.println("任务2结束:");return "Hello";}, service);future01.runAfterEitherAsync(future02, () -> {
    System.out.println("任务3结束:");}, service);System.out.println("main....end.....");}main....start.....
任务1线程:11
任务1结束:
任务2线程:12
main....end.....
任务3结束:
任务2结束:

2.7 allof,anyof方法

allof: 顾名思义,就是所有的任务串行处理,无返回值.
anyif: 就是只要有一个任务执行完成后就返回future并将第一个完成的参数带着一起返回

public class ThreadTest {
    public static ExecutorService service = Executors.newFixedThreadPool(10);public static void main(String[] args) throws ExecutionException, InterruptedException {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    try {
    TimeUnit.SECONDS.sleep(3);System.out.println("f1...");} catch (InterruptedException e) {
    e.printStackTrace();}return "f1";}, service);CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
    try {
    TimeUnit.SECONDS.sleep(2);System.out.println("f2...");} catch (InterruptedException e) {
    e.printStackTrace();}// throw new RuntimeException("aa");// 如果想处理异常,可以直接处理,可以通过whencomplete,handler方法…return "f2";}, service);CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> {
    try {
    TimeUnit.SECONDS.sleep(3);System.out.println("f3...");} catch (InterruptedException e) {
    e.printStackTrace();}return "f3";}, service);CompletableFuture<String> all = CompletableFuture.allOf(f1, f2, f3).thenApplyAsync(x -> {
    return "等到f1,f2和f3都结束了,才能执行 ,X="+x;}, service);System.out.println(":主线程...");//join 或者getString aVoid = all.get();System.out.println(aVoid);  // 阻塞等待System.out.println("任务均已完成。"); //这个最后显示,因为要等上一步的输出}
}
:主线程...   //第一个出来
f2...		//2秒后出来
f1...       //这个及下面的 差不多一起
f3...
等到f1,f2和f3都结束了,才能执行  ,X=null
任务均已完成。 CompletableFuture<Object> handle = CompletableFuture.anyOf(f1, f2, f3).handle((res, thr) -> {
    if (res != null) {
    System.out.println("可以直接返回一个已完成的线程: " + res);return "xiaoming";}if (thr != null) {
      //有异常return 0;}return 1;}); //不能异步处理
:主线程...
f2...
可以直接返回一个已完成的线程: f2
xiaoming
任务均已完成。
f1...
f3...