当前位置: 代码迷 >> 综合 >> 带返回值的线程(callable)
  详细解决方案

带返回值的线程(callable)

热度:40   发布时间:2023-12-27 13:25:30.0

1.

 

 FutureTask 类 的  run() 方法 来自 Runnable 的run () 方法,所以 走 run () 方法就会执行 call() 方法

原理:

 源码如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {/*** Sets this Future to the result of its computation* unless it has been cancelled.*/void run();
}

 

public interface Future<V> {/*** Attempts to cancel execution of this task.  This attempt will* fail if the task has already completed, has already been cancelled,* or could not be cancelled for some other reason. If successful,* and this task has not started when {@code cancel} is called,* this task should never run.  If the task has already started,* then the {@code mayInterruptIfRunning} parameter determines* whether the thread executing this task should be interrupted in* an attempt to stop the task.** <p>After this method returns, subsequent calls to {@link #isDone} will* always return {@code true}.  Subsequent calls to {@link #isCancelled}* will always return {@code true} if this method returned {@code true}.** @param mayInterruptIfRunning {@code true} if the thread executing this* task should be interrupted; otherwise, in-progress tasks are allowed* to complete* @return {@code false} if the task could not be cancelled,* typically because it has already completed normally;* {@code true} otherwise*/boolean cancel(boolean mayInterruptIfRunning);/*** Returns {@code true} if this task was cancelled before it completed* normally.** @return {@code true} if this task was cancelled before it completed*/boolean isCancelled();/*** Returns {@code true} if this task completed.** Completion may be due to normal termination, an exception, or* cancellation -- in all of these cases, this method will return* {@code true}.** @return {@code true} if this task completed*/boolean isDone();/*** Waits if necessary for the computation to complete, and then* retrieves its result.** @return the computed result* @throws CancellationException if the computation was cancelled* @throws ExecutionException if the computation threw an* exception* @throws InterruptedException if the current thread was interrupted* while waiting*/V get() throws InterruptedException, ExecutionException;/*** Waits if necessary for at most the given time for the computation* to complete, and then retrieves its result, if available.** @param timeout the maximum time to wait* @param unit the time unit of the timeout argument* @return the computed result* @throws CancellationException if the computation was cancelled* @throws ExecutionException if the computation threw an* exception* @throws InterruptedException if the current thread was interrupted* while waiting* @throws TimeoutException if the wait timed out*/V get(long timeout, TimeUnit unit)throws InterruptedException, ExecutionException, TimeoutException;
}

 原理:

 

 

 

 

 返回 执行结果:

 

  public void run() {if (state != NEW ||!UNSAFE.compareAndSwapObject(this, runnerOffset,null, Thread.currentThread()))return;try {Callable<V> c = callable;if (c != null && state == NEW) {V result;boolean ran;try {// 执行任务方法,并获取返回值result = c.call();ran = true;} catch (Throwable ex) {result = null;ran = false;setException(ex);}if (ran)// 设置 线程返回值set(result);}} finally {// runner must be non-null until state is settled to// prevent concurrent calls to run()runner = null;// state must be re-read after nulling runner to prevent// leaked interruptsint s = state;if (s >= INTERRUPTING)handlePossibleCancellationInterrupt(s);}}

流程:

public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();// 调用 new FutureTask<T>(runnable, value) 构造器,生成 对象//  RunnableFuture 里 就有  了 Callable 属性RunnableFuture<Void> ftask = newTaskFor(task, null);// execute(ftask);return ftask;}

demo:

自定义任务:

package com.example.demo.thread;import lombok.extern.slf4j.Slf4j;import java.util.concurrent.Callable;/*** @program: springboot_01* @description:  程序中使用的是Callable接口,可以获取线程的返回值*                  没有返回值的  用 Runnable 接口* @author: guoyiguang* @create: 2021-07-11 11:01**/
@Slf4j
public class MyTask implements Callable {private String filename;public MyTask(String filename) {this.filename = filename;}@Overridepublic Object call() throws Exception {// 要执行的任务log.info("callable  call() method thread name-----{} ,mission value----{}",Thread.currentThread().getName(),filename);return "返回值" + filename;}
}

测试controller:

package com.example.demo.controller;import com.example.demo.thread.MyTask;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.*;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;/*** @program: springboot_01* @description:  线程 控制器* @author: guoyiguang* @create: 2021-01-16 13:16**/
@RestController
@RequestMapping("/thread")
@Slf4j
public class ThreadController {/*** 创建线程池,并发量最大为5* LinkedBlockingDeque,表示执行任务或者放入队列*/ThreadPoolExecutor excutor = new ThreadPoolExecutor(5, 10, 0,TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(),new ThreadPoolExecutor.CallerRunsPolicy());@PostMapping(value="a" )@ResponseBodypublic String a() throws ExecutionException, InterruptedException {log.info("request thread name-----{}",Thread.currentThread().getName());//存储线程的返回值List<Future<String>> results = new LinkedList<>();for (int i = 0; i < 10; i++) {MyTask task = new MyTask(String.valueOf(i));Future<String> result = excutor.submit(task);// 收集返回值results.add(result);}//输出结果StringBuilder sb = new StringBuilder();for (int i = 0; i < 10; i++) {// 获取返回值sb.append(results.get(i).get());sb.append(" ");}return  sb.toString();}}

执行结果:任务是不同的线程执行的

 springboot 异步的应用:

先测试同步:

package com.example.demo.service;/*** @program: springboot_01* @description:* @author: guoyiguang* @create: 2021-07-11 13:24**/
public interface AsyncService {long doNoReturn();
}
package com.example.demo.service.Impl;import com.example.demo.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;/*** @program: springboot_01* @description:* @author: guoyiguang* @create: 2021-07-11 13:24**/
@Service
@Slf4j
public class AsyncServiceImp implements AsyncService {@Overridepublic long doNoReturn() {try {long start = System.currentTimeMillis();log.info(" thread name ------>  {} ; 方法执行开始 ------>  {}",Thread.currentThread().getName(),LocalDateTime.now());// 这个方法执行需要三秒Thread.sleep(3000);long end = System.currentTimeMillis();return end-start;} catch (InterruptedException e) {e.printStackTrace();}return -1L;}
}

测试 controller:

  @PostMapping(value="b" )public long b() throws ExecutionException, InterruptedException {log.info("request thread name-----{}",Thread.currentThread().getName());return  asyncService.doNoReturn();}

 执行时间:

 异步测试:

启动类添加 异步注解:

package com.example.demo;import org.mybatis.spring.annotation.MapperScan;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.web.client.RestTemplate;@SpringBootApplication
//@MapperScan({"com.example.demo.mapper","com.example.demo.mapper"})
// @MapperScan  代替 @Mapper的
@MapperScan("com.example.demo.mapper")
// 开启缓存
@EnableCaching
@EnableAsync // 开启异步任务
public class DemoApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}@BeanRestTemplate restTemplate(){return new RestTemplate();}}

方法上添加异步:

package com.example.demo.service.Impl;import com.example.demo.service.AsyncService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;import java.time.LocalDateTime;/*** @program: springboot_01* @description:* @author: guoyiguang* @create: 2021-07-11 13:24**/
@Service
@Slf4j
public class AsyncServiceImp implements AsyncService {@Async@Overridepublic void doNoReturn() {try {long start = System.currentTimeMillis();log.info(" thread name ------>  {} ; 方法执行开始 ------>  {}",Thread.currentThread().getName(),LocalDateTime.now());// 这个方法执行需要三秒Thread.sleep(3000);long end = System.currentTimeMillis();log.info(" thread name ------>  {} ; 方法执行结束 ------>  {}",Thread.currentThread().getName(),LocalDateTime.now());} catch (InterruptedException e) {e.printStackTrace();}}
}

测试代码如下:

    @PostMapping(value="b" )public long b() throws ExecutionException, InterruptedException {log.info("request thread name-----{}",Thread.currentThread().getName());long start = System.currentTimeMillis();asyncService.doNoReturn();long end = System.currentTimeMillis();return end - start;}

 

 

 已经返回结果:

 

 

  相关解决方案