当前位置: 代码迷 >> 综合 >> Java并发基础(5)——Fork-Join框架
  详细解决方案

Java并发基础(5)——Fork-Join框架

热度:46   发布时间:2023-12-12 12:10:09.0

一、概念

Fork/Join框架是Java 7提供的一个用于并行执行任务的框架

它采用了一种分而治之的思想

规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解

最简单的例子就是数组求和:一个很大的数组,如果采用单线程进行求和的话,速度会很慢,这时就可以使用Fork/Join框架

二、标准范式

三、核心API

Fork/Join框架在java.util.concurrent包下实现。它的核心有4个类:

  • ForkJoinTask<V>: 这是一个抽象任务类,并且运行在ForkJoinPool中。
    • RecursiveAction: ForkJoinTask的子类,这个类没有返回值。
    • RecursiveTask<V>: ForkJoinTask的子类,有返回值。
  • ForkJoinPool:这是一个线程池,管理并运行众多ForkJoinTask任务。
    • invoke()  同步调用。 调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
    • execute() 异步调用。调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作

如果想深入了解同步和异步,可以参考https://www.cnblogs.com/IT-CPC/p/10898871.html

这里只是简单了解一下,当我们使用该框架时,根据自己业务需求是否需要返回值,选择继承RecursiveActionRecursiveTask<V>即可

如果想深入了解API,可以参考https://www.jianshu.com/p/42e9cd16f705 

四、demo

4.1 同步用法并返回结果值

统计整形数组中所有元素的和

这里要求返回值,所以我们需要继承RecursiveTask<V>

运行过程在代码中加了注释

public class TestSumArray {//业务线程private static class SumArrayTask extends RecursiveTask<Integer>{private final static int thresholdValue = 100;//阈值,小于该值就不再拆分private int[] src; //待求和的数组private int startIndex;//开始下标private int endIndex;//结束下标public SumArrayTask(int[] src, int startIndex, int endIndex) {this.src = src;this.startIndex = startIndex;this.endIndex = endIndex;}@Overrideprotected Integer compute() {//判断是否满足条件if(endIndex - startIndex < thresholdValue) {//5、满足条件,进行求和int count = 0;try {//由于速度太快,这里休眠100毫秒Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}for(int i = startIndex; i <= endIndex; i++) {count = count + src[i];}return count;}else {//6、不满足条件,继续拆分int mid = (startIndex + endIndex)/2;SumArrayTask left = new SumArrayTask(src,startIndex,mid);SumArrayTask right = new SumArrayTask(src,mid+1,endIndex);//7、提交子任务invokeAll(left,right);//8、返回结果return left.join()+right.join();}}}public static void main(String[] args) {//1、先实例化ForkJoinPool线程池ForkJoinPool pool = new ForkJoinPool();//2、随机生成一个数组Random r = new Random();int[] src = new int[10000];for(int i = 0;i < 10000;i++){//用随机数填充数组src[i] =  r.nextInt(100);}//3、把数组丢到业务线程中SumArrayTask innerFind = new SumArrayTask(src,0,src.length-1);long start = System.currentTimeMillis();//4、让线程池开始处理业务线程,这里采用同步调用pool.invoke(innerFind);//9、让业务线程先执行完innerFind.join();System.out.println("耗时:" + (System.currentTimeMillis()-start) + "毫秒");}
}

4.2 异步调用,无需返回值

遍历一个文件夹下,所以txt文件的文件路径

这里不需要返回值,所以我们需要继承RecursiveAction

public class TestDirsFiles extends RecursiveAction{private File path;//当前任务需要搜寻的目录public TestDirsFiles(File path) {this.path = path;}@Overrideprotected void compute() {//4、用于保存子任务的集合List<TestDirsFiles> subTasks = new ArrayList<>();File[] files = path.listFiles();//5、开始判断当前目录下,是目录还是文件if(files != null) {for(File file:files) {if(file.isDirectory()) {//6、如果是目录,加入子任务集合中,继续遍历subTasks.add(new TestDirsFiles(file));}else {//7、遇到文件,判断是否是txt文件,是的话打印路径if(file.getAbsolutePath().endsWith("txt")) {System.out.println("文件:"+file.getAbsolutePath());}}}//8、子任务集合不为空的话,提交所有的子任务if(!subTasks.isEmpty()) {for(TestDirsFiles subTask:invokeAll(subTasks)) {//等待子任务执行完成subTask.join();}}}}public static void main(String [] args){try {//1、 创建ForkJoinPoolForkJoinPool pool = new ForkJoinPool();//2、创建我们自己定义的任务线程,这里不需要返回结果,所以extends RecursiveActionTestDirsFiles task = new TestDirsFiles(new File("D:/"));//3、使用ForkJoinPool来异步调用我们的任务线程pool.execute(task);//异步调用//等待任务执行完成task.join();} catch (Exception e) {e.printStackTrace();}}
}

 

  相关解决方案