一、概念
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架
它采用了一种分而治之的思想
规模为N的问题,N<阈值,直接解决,N>阈值,将N分解为K个小规模子问题,子问题互相对立,与原问题形式相同,将子问题的解合并得到原问题的解
最简单的例子就是数组求和:一个很大的数组,如果采用单线程进行求和的话,速度会很慢,这时就可以使用Fork/Join框架
二、标准范式
三、核心API
Fork/Join框架在java.util.concurrent
包下实现。它的核心有4个类:
- ForkJoinTask<V>: 这是一个抽象任务类,并且运行在
ForkJoinPool
中。- RecursiveAction:
ForkJoinTask
的子类,这个类没有返回值。 - RecursiveTask<V>:
ForkJoinTask
的子类,有返回值。
- RecursiveAction:
- ForkJoinPool:这是一个线程池,管理并运行众多
ForkJoinTask
任务。- invoke() 同步调用。 调用一旦开始,调用者必须等到方法调用返回后,才能继续后续的行为。
- execute() 异步调用。调用更像一个消息传递,一旦开始,方法调用就会立即返回,调用者就可以继续后续的操作
如果想深入了解同步和异步,可以参考https://www.cnblogs.com/IT-CPC/p/10898871.html
这里只是简单了解一下,当我们使用该框架时,根据自己业务需求是否需要返回值,选择继承RecursiveAction或RecursiveTask<V>即可
如果想深入了解API,可以参考https://www.jianshu.com/p/42e9cd16f705
四、demo
4.1 同步用法并返回结果值
统计整形数组中所有元素的和
这里要求返回值,所以我们需要继承RecursiveTask<V>
运行过程在代码中加了注释
public class TestSumArray {//业务线程private static class SumArrayTask extends RecursiveTask<Integer>{private final static int thresholdValue = 100;//阈值,小于该值就不再拆分private int[] src; //待求和的数组private int startIndex;//开始下标private int endIndex;//结束下标public SumArrayTask(int[] src, int startIndex, int endIndex) {this.src = src;this.startIndex = startIndex;this.endIndex = endIndex;}@Overrideprotected Integer compute() {//判断是否满足条件if(endIndex - startIndex < thresholdValue) {//5、满足条件,进行求和int count = 0;try {//由于速度太快,这里休眠100毫秒Thread.sleep(100);} catch (InterruptedException e) {e.printStackTrace();}for(int i = startIndex; i <= endIndex; i++) {count = count + src[i];}return count;}else {//6、不满足条件,继续拆分int mid = (startIndex + endIndex)/2;SumArrayTask left = new SumArrayTask(src,startIndex,mid);SumArrayTask right = new SumArrayTask(src,mid+1,endIndex);//7、提交子任务invokeAll(left,right);//8、返回结果return left.join()+right.join();}}}public static void main(String[] args) {//1、先实例化ForkJoinPool线程池ForkJoinPool pool = new ForkJoinPool();//2、随机生成一个数组Random r = new Random();int[] src = new int[10000];for(int i = 0;i < 10000;i++){//用随机数填充数组src[i] = r.nextInt(100);}//3、把数组丢到业务线程中SumArrayTask innerFind = new SumArrayTask(src,0,src.length-1);long start = System.currentTimeMillis();//4、让线程池开始处理业务线程,这里采用同步调用pool.invoke(innerFind);//9、让业务线程先执行完innerFind.join();System.out.println("耗时:" + (System.currentTimeMillis()-start) + "毫秒");}
}
4.2 异步调用,无需返回值
遍历一个文件夹下,所以txt文件的文件路径
这里不需要返回值,所以我们需要继承RecursiveAction
public class TestDirsFiles extends RecursiveAction{private File path;//当前任务需要搜寻的目录public TestDirsFiles(File path) {this.path = path;}@Overrideprotected void compute() {//4、用于保存子任务的集合List<TestDirsFiles> subTasks = new ArrayList<>();File[] files = path.listFiles();//5、开始判断当前目录下,是目录还是文件if(files != null) {for(File file:files) {if(file.isDirectory()) {//6、如果是目录,加入子任务集合中,继续遍历subTasks.add(new TestDirsFiles(file));}else {//7、遇到文件,判断是否是txt文件,是的话打印路径if(file.getAbsolutePath().endsWith("txt")) {System.out.println("文件:"+file.getAbsolutePath());}}}//8、子任务集合不为空的话,提交所有的子任务if(!subTasks.isEmpty()) {for(TestDirsFiles subTask:invokeAll(subTasks)) {//等待子任务执行完成subTask.join();}}}}public static void main(String [] args){try {//1、 创建ForkJoinPoolForkJoinPool pool = new ForkJoinPool();//2、创建我们自己定义的任务线程,这里不需要返回结果,所以extends RecursiveActionTestDirsFiles task = new TestDirsFiles(new File("D:/"));//3、使用ForkJoinPool来异步调用我们的任务线程pool.execute(task);//异步调用//等待任务执行完成task.join();} catch (Exception e) {e.printStackTrace();}}
}