当前位置: 代码迷 >> 综合 >> 并发编程:Phaser 运行阶段性并发任务
  详细解决方案

并发编程:Phaser 运行阶段性并发任务

热度:44   发布时间:2024-02-09 19:37:43.0

目录

Phaser

案例说明

一、主程序

二、搜索线程(核心)

三、执行结果


Phaser

当一些并发任务需要分步骤执行时,可以使用该机制。(Phaser类提供了在每一步结束时同步线程的机制,这使得只有当所有线程都完成第一步后,才会有线程开始执行第二步。)

案例说明

下面模拟文件搜索的功能,它拆分为N个阶段进行工作,每个阶段完成后会进入等待,当所有线程都完成该阶段的工作之后,开始执行下一阶段的工作。

 

一、主程序

主程序创建了3个线程,来进行文件的搜索,同时Phaser也设置了3个参与线程数。三个线程统一由phaser对象进行控制,使它们同步得执行每个阶段的工作。

package xyz.jangle.thread.test.n3_5.phaser;import java.util.concurrent.Phaser;/*** 3.5 Phaser 运行阶段性并发任务*   当一些并发任务需要分步骤执行时,可以使用该机制。(Phaser类提供了在每一步结束时同步线程的机制,* 这使得只有当所有线程都完成第一步后,才会有线程开始执行第二步。)* * @author jangle* @email jangle@jangle.xyz* @time 2020年8月11日 下午4:58:36* */
public class M {public static void main(String[] args) {Phaser phaser = new Phaser(3);FileSearch program86 = new FileSearch("C:\\Program Files (x86)", "log", phaser);FileSearch program = new FileSearch("C:\\Program Files", "log", phaser);FileSearch workspace = new FileSearch("C:\\workspace", "log", phaser);Thread thread1 = new Thread(program86, "program86");Thread thread2 = new Thread(program, "program");Thread thread3 = new Thread(workspace, "workspace");thread1.start();thread2.start();thread3.start();try {thread1.join();thread2.join();thread3.join();} catch (InterruptedException e) {e.printStackTrace();}System.out.println("程序结束,phaser is terminated:" + phaser.isTerminated());}}

二、搜索线程(核心)

这个线程实现了几个阶段性的工作,并在每个阶段进行等待。                          

  1. 遍历目录搜索所有符合扩展名的文件,并检查是否有记录。                         
  2. 对结果进行过滤,获取近24小时修改过的文件,并检查是否有记录。                    
  3. 输出这些记录。
  4. 完毕注销。
package xyz.jangle.thread.test.n3_5.phaser;import java.io.File;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;/***  这个线程实现了几个阶段性的工作,并在每个阶段进行等待。*  1、遍历目录搜索所有符合扩展名的文件,并检查是否有记录。*  2、对结果进行过滤,获取近24小时修改过的文件,并检查是否有记录。*  3、输出这些记录。*  4、完毕注销。* @author jangle* @email jangle@jangle.xyz* @time 2020年8月11日 下午5:02:51* */
public class FileSearch implements Runnable {// 需要搜索的文件夹路径private final String initPath;// 要搜索的文件扩展名private final String fileExtension;// 存储满足条件的文件全路径private List<String> results;// 对任务的不同阶段进行同步控制。 importantprivate Phaser phaser;public FileSearch(String initPath, String fileExtension, Phaser phaser) {super();this.initPath = initPath;this.fileExtension = fileExtension;this.phaser = phaser;this.results = new ArrayList<String>();}@Overridepublic void run() {File file = new File(initPath);if (file.isDirectory()) {// 递归调用,遍历所有目录与其子目录directoryProcess(file);}if (!checkResults()) {// 虽然checkResults中从phaser中注销了,但程序还会继续执行,所以这里需要手动return,结束程序。return;}// 筛选近24小时修改过的文件filterResults();if (!checkResults()) {return;}// 用于最终输出所有结果到控制台showInfo();// 注销当前任务phaser.arriveAndDeregister();System.out.println(Thread.currentThread().getName()+"程序执行完毕。");}/*** 递归调用,遍历所有目录与其子目录* * @param file*/private void directoryProcess(File file) {File[] files = file.listFiles();if (files != null) {for (int i = 0; i < files.length; i++) {if (files[i].isDirectory()) {directoryProcess(files[i]);} else {fileProcess(files[i]);}}}}/*** 记录符合扩展名的文件路径* * @param file*/private void fileProcess(File file) {if (file.getName().endsWith(fileExtension)) {results.add(file.getAbsolutePath());}}/*** 筛选近24小时修改过的文件* * @author jangle* @time 2020年8月11日 下午5:24:05*/private void filterResults() {ArrayList<String> newResult = new ArrayList<>();long currentDate = new Date().getTime();for (int i = 0; i < results.size(); i++) {File file = new File(results.get(i));long fileDate = file.lastModified();if (currentDate - fileDate < TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS)) {newResult.add(results.get(i));}}results = newResult;}/*** 用于检查每个阶段结束之后,结果是否为空。* * @author jangle* @time 2020年8月11日 下午5:25:11*/private boolean checkResults() {if (results.isEmpty()) {System.out.println(Thread.currentThread().getName() + "结果为空,,阶段phaser:" + phaser.getPhase() + "。该阶段结束,当前存在参与线程数"+phaser.getRegisteredParties());// 无结果,则完成当前阶段工作,并且不参与后续阶段的工作。phaser.arriveAndDeregister();return false;} else {System.out.println(Thread.currentThread().getName() + "存在結果" + results.size() + ",阶段phaser:" + phaser.getPhase()+",当前存在参与线程数"+phaser.getRegisteredParties());// 有结果,则完成当前阶段工作,并且等待其他线程完成工作,而后进入后续阶段的工作。phaser.arriveAndAwaitAdvance();return true;}}/*** 用于最终输出所有结果到控制台* * @author jangle* @time 2020年8月11日 下午5:41:02*/private void showInfo() {for (int i = 0; i < results.size(); i++) {File file = new File(results.get(i));System.out.println(Thread.currentThread().getName() + ":" + file.getAbsolutePath());}// 等待其他线程完成输出,再进入后续工作。phaser.arriveAndAwaitAdvance();}}

三、执行结果

前3行是搜索文件数量的结果,3-6行是log文件的数量结果,“参与线程数”是指phaser内部的计数,因program无log文件,提前结束了工作内容,从phaser中注销了,故第6行值为2。

workspace存在結果9,阶段phaser:0,当前存在参与线程数3
program86存在結果13,阶段phaser:0,当前存在参与线程数3
program存在結果10,阶段phaser:0,当前存在参与线程数3
workspace存在結果3,阶段phaser:1,当前存在参与线程数3
program结果为空,,阶段phaser:1。该阶段结束,当前存在参与线程数3
program86存在結果1,阶段phaser:1,当前存在参与线程数2
program86:C:\Program Files (x86)\DingDing\main\current_new\debug.log
workspace:C:\workspace\.metadata\.log
workspace:C:\workspace\.metadata\.plugins\org.eclipse.m2e.logback.configuration\0.log
workspace:C:\workspace\.metadata\.plugins\org.eclipse.rse.core\.log
workspace程序执行完毕。
program86程序执行完毕。
程序结束,phaser is terminated:true