当前位置: 代码迷 >> 综合 >> CompletableFuture 基本使用(java8 的特性;有返回值的线程)
  详细解决方案

CompletableFuture 基本使用(java8 的特性;有返回值的线程)

热度:46   发布时间:2023-12-27 13:23:42.0

1. 不错的文章

https://www.jianshu.com/p/6bac52527ca4

@Testpublic void supplyAsyncTest() throws IOException, ExecutionException, InterruptedException {System.out.println("main  thread ......"+Thread.currentThread().getName());// 带返回值的CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("run start ...");System.out.println("task  thread ......"+Thread.currentThread().getName());System.out.println("run end ...");// 返回执行结果return 1;});Integer result = future.get();System.out.println("result = "+result);}/*** @Description:  计算结果完成时的回调方法* @Param:* @return:* @Author: guoyiguang* @Date:*/@Testpublic void whenCompleteTest() throws IOException, ExecutionException, InterruptedException {System.out.println("main  thread ......"+Thread.currentThread().getName());CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("run start ...");System.out.println("task  thread ......"+Thread.currentThread().getName());System.out.println("run end ...");// 返回执行结果return 1;});Integer result = future.get();System.out.println("result = "+result);// whenCompleteAsyncfuture.whenComplete(new BiConsumer<Integer, Throwable>() {@Overridepublic void accept(Integer t, Throwable action) {System.out.println("执行完成! thread name  为  "+Thread.currentThread().getName());}});future.exceptionally(new Function<Throwable, Integer>() {@Overridepublic Integer apply(Throwable t) {System.out.println("执行失败!"+t.getMessage());return null;}});}/*** @Description:  当一个线程依赖另一个线程时,可以使用 thenApply 方法来把这两个线程串行化。* @Param: Function<? super T, ? 
extends U>* T:上一个任务返回结果的类型* U:当前任务的返回值类型* 适用场景 :第二个任务依赖第一个任务的结果* @Author: guoyiguang* @Date:*/@Testpublic void thenApplyTest() throws IOException, ExecutionException, InterruptedException {System.out.println("main  thread ......"+Thread.currentThread().getName());CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("run1 start ...");System.out.println("task  thread ......"+Thread.currentThread().getName());System.out.println("run1 end ...");// 返回执行结果return 1;// thenApply 同步}).thenApplyAsync(new Function<Integer, Integer>() {@Overridepublic Integer apply(Integer t) {System.out.println("run2 start ...");System.out.println("task  thread ......"+Thread.currentThread().getName());// t 是 上述线程的执行结果Integer result = t*5;System.out.println("result2="+result);System.out.println("run2 end ...");return result;}});Integer result = future.get();System.out.println("result = "+result);}

原理:

 源码解析:

https://www.jianshu.com/p/abfa29c01e1d

代码如下:

 @Testpublic void whenCompleteTest() throws IOException, ExecutionException, InterruptedException {System.out.println("main  thread ......"+Thread.currentThread().getName());// 往 ForkJoin 线程池 扔任务 CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {System.out.println("run start ...");Long i = 0L;while(i<500000L){System.out.println("task  thread ......"+Thread.currentThread().getName());}System.out.println("run end ...");// 返回执行结果i++;return 1;});//  主线程陷入阻塞Integer result = future.get();System.out.println("result = "+result);}
 /**
  * Quoted from the JDK's {@code CompletableFuture}: starts {@code supplier}
  * asynchronously by delegating to {@code asyncSupplyStage} with the default
  * executor {@code asyncPool}.
  *
  * @param supplier the function producing the value for the returned future
  * @return the future returned by {@code asyncSupplyStage}
  */
 public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
     return asyncSupplyStage(asyncPool, supplier);
 }

 // asyncPool is declared as follows: the common ForkJoinPool when useCommonPool is
 // true, otherwise a ThreadPerTaskExecutor.
 private static final Executor asyncPool = useCommonPool ?
     ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();

往线程池扔任务的方法:

 /**
  * Quoted from the JDK's CompletableFuture: submits a supplier to an executor.
  *
  * Creates a new, not-yet-completed CompletableFuture {@code d}, wraps it together
  * with the supplier {@code f} in an AsyncSupply task, passes that task to
  * {@code e.execute(...)}, and returns {@code d} to the caller.
  *
  * @param e the executor that receives the AsyncSupply task
  * @param f the supplier to run
  * @return the new future {@code d} that was handed to the AsyncSupply task
  * @throws NullPointerException if {@code f} is null
  */
 static <U> CompletableFuture<U> asyncSupplyStage(Executor e,Supplier<U> f) {if (f == null) throw new NullPointerException();CompletableFuture<U> d = new CompletableFuture<U>();e.execute(new AsyncSupply<U>(d, f));return d;}

任务类:

 static final class AsyncSupply<T> extends ForkJoinTask<Void>implements Runnable, AsynchronousCompletionTask {CompletableFuture<T> dep; // fn 内部的get 是要获取 返回值的(为 null 说明 任务还没有执行完)Supplier<T> fn;AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {this.dep = dep; this.fn = fn;}public final Void getRawResult() { return null; }public final void setRawResult(Void v) {}// 执行结果,任务是否已经执行完public final boolean exec() { run(); return true; }public void run() {CompletableFuture<T> d; Supplier<T> f;if ((d = dep) != null && (f = fn) != null) {dep = null; fn = null;if (d.result == null) {try {// 执行任务 ,并且设置返回值 给 CompletableFuture// 底层原理:cas// RESULT  是 Com骗了他变了Future的一个属性// UNSAFE.compareAndSwapObject(this, RESULT, null,(t == null) ? NIL : t)d.completeValue(f.get());} catch (Throwable ex) {d.completeThrowable(ex);}}d.postComplete();}}}

说明:f.get() 会调用目标方法(即传入的 Supplier),并获取它的返回值。demo 如下:

@Testpublic void returnTest()  {System.out.println(" returnTest main  thread ......"+Thread.currentThread().getName());Integer result = getReturn(() -> {System.out.println("aaaaa");System.out.println("hhhhhh");return 1;});System.out.println(result);System.out.println(result);}public <U> U getReturn(Supplier<U> supplier){return supplier.get();}@Testpublic void returnTest2()  {System.out.println(" returnTest2 main  thread ......"+Thread.currentThread().getName());// Consumer  的 accept 消费一个对象(业务逻辑)Consumer<Integer> consumer = new Consumer<Integer>() {// 不带有返回值@Overridepublic void accept(Integer integer) {System.out.println(integer);System.out.println("aaaaa");System.out.println("hhhhhh");}};// 开始调用 目标方法consumer.accept(1);}

get()

// Call site from the demos above: ask the future for its result.
Integer result = future.get();
/**
 * Quoted from the JDK's CompletableFuture.
 *
 * Reads the result field once: if it is already non-null that value is used
 * directly, otherwise waitingGet(true) is called to obtain it. Either way the
 * value is passed through reportGet before being returned.
 *
 * @return the value produced by reportGet
 * @throws InterruptedException declared by the signature
 * @throws ExecutionException declared by the signature
 */
public T get() throws InterruptedException, ExecutionException {Object r;return reportGet((r = result) == null ? waitingGet(true) : r);}

只看  waitingGet() 方法:

 private Object waitingGet(boolean interruptible) {Signaller q = null;boolean queued = false;int spins = -1;Object r;// 在这里阻塞,直到  CompletableFuture 的  result 属性 在 run() 方法中 设置成了 非 null while ((r = result) == null) {if (spins < 0)spins = SPINS;else if (spins > 0) {if (ThreadLocalRandom.nextSecondarySeed() >= 0)--spins;}else if (q == null)q = new Signaller(interruptible, 0L, 0L);else if (!queued)queued = tryPushStack(q);else if (interruptible && q.interruptControl < 0) {q.thread = null;cleanStack();return null;}else if (q.thread != null && result == null) {try {ForkJoinPool.managedBlock(q);} catch (InterruptedException ie) {q.interruptControl = -1;}}}if (q != null) {q.thread = null;if (q.interruptControl < 0) {if (interruptible)r = null; // report interruptionelseThread.currentThread().interrupt();}}postComplete();return r;}

  相关解决方案