当前位置: 代码迷 >> 综合 >> 并发编程系列(十六) CyclicBarrier
  详细解决方案

并发编程系列(十六) CyclicBarrier

热度:48   发布时间:2023-12-06 10:31:41.0

目录

1.1Cyclicbarrier类的属性

1.2 CyclicBarrier的内部类

1.3 Cyclicbarrier的构造器

2.1 await() / await(Long timeout,TimeUnit unit)

2.2 这两个方法最后都会调用doawait()方法;这也是CyclicBarrier的核心方法

2.2breakBarrier方法

2.3nextGeneration() 方法

2.4 reset()方法

2.5 isBroken()方法 判断栅栏是否处在中断的状态


序号 名称 链接地址
1 并发编程系列(一)创建线程的三种方式及线程如何完成上下文如何切换 https://blog.csdn.net/qq_38130094/article/details/103443997
2 并发编程系列(二)之线程的中断 https://blog.csdn.net/qq_38130094/article/details/103444171
3  并发系列(三)线程常用的方法 https://blog.csdn.net/qq_38130094/article/details/103446126
4 并发编程系列(四)之Thread类源码分析(一) https://blog.csdn.net/qq_38130094/article/details/103448160
5 并发编程系列(五)volatile关键字详解 https://blog.csdn.net/qq_38130094/article/details/103448564
6 并发编程系列(六)volatile 之 as-if-serial 指令重排 volatile内存语义 volatile原理 https://blog.csdn.net/qq_38130094/article/details/103543998
7 线程系列(七)synchronized使用方式 https://blog.csdn.net/qq_38130094/article/details/103537663
8  线程系列(八)synchronized实现原理与应用 https://blog.csdn.net/qq_38130094/article/details/103537668
9 并发编程系列(九)ThreadLocala是如何解决共享变量的并发问题及源码分析 https://blog.csdn.net/qq_38130094/article/details/103665098
10 并发编程系列(十)AQS同步器独占锁加锁与解锁-源码解读 https://blog.csdn.net/qq_38130094/article/details/103540315
11 并发编程系列(十一)AQS同步器共享锁加锁解锁源码解读 https://blog.csdn.net/qq_38130094/article/details/103646505
12 并发编程系列(十二)AQS同步器条件锁(Condition)加锁解锁源码解读 https://blog.csdn.net/qq_38130094/article/details/103679146
13 发编程系列(十三)ReentrantLock 重入锁 https://blog.csdn.net/qq_38130094/article/details/103843779
14 发编程系列(十四)Semaphore信号量 https://blog.csdn.net/qq_38130094/article/details/103846420
15 发编程系列(十五) CountDownLatch闭锁 https://blog.csdn.net/qq_38130094/article/details/103855632
16 并发编程系列(十六) CyclicBarrier https://blog.csdn.net/qq_38130094/article/details/103859704

从字面意思理解可以看出循环执行,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。循环是因为当等待线程都被释放后,CyclicBarrier对象可以被重用

栅栏是强制要求线程到达某个临界点才能执行,当都到达时,一起跳过Cyclicbarrier对象执行,

1.1Cyclicbarrier类的属性

    /** 对象入口的重如锁 */private final ReentrantLock lock = new ReentrantLock();/**用于线程间等待与唤醒操作 */private final Condition trip = lock.newCondition();/** 拦截的线程数 */private final int parties;/* 所有线程都到达barrier时执行的任务 */private final Runnable barrierCommand;/** 当前Generation ,每当屏障失效或者开闸之后都会自动替换掉从而实现重置的功能 */private Generation generation = new Generation();/*** 剩余阻塞线程数*/private int count;

1.2 CyclicBarrier的内部类

    //每次使用CyclicBarrier对象都会关联一个Generation 对象//当CyclicBarrier对象发生trip或者reset时,对应的Generation 会发生改变private static class Generation {//标示当前的CyclicBarrier对象是否已经处在中断的状态boolean broken = false;}

1.3 Cyclicbarrier的构造器

    //创建拦截指定线程数的CyclicBarrier对象//并且可以指定在所有线程都进入栅栏后的执行动作public CyclicBarrier(int parties, Runnable barrierAction) {//校验parties值的合法性if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}/*** 创建拦截指定线程数的CyclicBarrier对象*/public CyclicBarrier(int parties) {this(parties, null);}

CyclicBarrier,默认构造方法是CyclicBarrier(int parties),参数代表屏障拦截的线程数量,每个线程使用await方法告诉CyclicBarrier我已经到达了屏障,然后线程被阻塞在这;另一个构造函数,用于线程到达屏障时优先执行barrierAction操作,方便处理更复杂的场景

2.1 await() / await(Long timeout,TimeUnit unit)

    //无参,不带超时时间参数的方法public int await() throws InterruptedException, BrokenBarrierException {try {return dowait(false, 0L);} catch (TimeoutException toe) {throw new Error(toe); // cannot happen}}//有超时时间的方法public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {return dowait(true, unit.toNanos(timeout));}

2.2 这两个方法最后都会调用doawait()方法;这也是CyclicBarrier的核心方法

    private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {final ReentrantLock lock = this.lock;//重入锁,获取后控制并发以串行lock.lock();try {//获取当前代final Generation g = generation;//如果当前代坏了,则抛出BrokenBarrierException异常if (g.broken)throw new BrokenBarrierException();//判断当前线程是否被中断if (Thread.interrupted()) {//设置当前代被损坏状态为true,并通知其他阻塞在次栅栏的线程breakBarrier();throw new InterruptedException();}// count 值自减,赋值给indexint index = --count;//如果index值为0;说明这是最后一个到栅栏的线程if (index == 0) {  // tripped//任务是否被执行的标志boolean ranAction = false;try {//获取所有线程到达栅栏后要执行的任务final Runnable command = barrierCommand;//如果command部位空,command执行run方法if (command != null)command.run();ranAction = true;//更新栅栏的状态,并唤醒所有阻塞在这个栅栏的线程nextGeneration();return 0;} finally {//如果执行栅栏时失败了if (!ranAction)//设置当前代的broken状态为true,唤醒所有线程breakBarrier();}}// loop until tripped, broken, interrupted, or timed outfor (;;) {try {//判断是否有时间限制if (!timed)//无时间限制//等待知道被唤醒trip.await();else if (nanos > 0L)//等待指定的时间nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {//捕获InterruptedException 异常//如果当前代没有被损坏if (g == generation && ! g.broken) {//设置broken状态为true//并通知阻塞在次栅栏的其他线程breakBarrier();//把捕获的异常继续抛出throw ie;} else {// 上面条件不满足,说线程不是这代的//不会影响当前这代栅栏的执行只会,标记中断Thread.currentThread().interrupt();}}//如果当前代被破坏if (g.broken)throw new BrokenBarrierException();//g != generation表示正常换代了//返回当前线程所在栅栏的下标//如果g == generation说明还没有换代,那为什么会醒呢?//因为一个线程可以使用多个栅栏//当别的栅栏唤醒这个线程,就会走到这里,所以需要判断是否是当前代//正是因为这个原因才需要generation来保证正确if (g != generation)return index;//如果有时间限制,且小于等于0;if (timed && nanos <= 0L) {//唤醒所有线程 # 并抛出异常breakBarrier();throw new TimeoutException();}}} finally {lock.unlock();}}

等待状态:

如果该线程不是最后一个调用await方法的线程,则它会一直处在等待状态,除非发生以下情况之一

  1. 最后一个线程到达,即index=0;
  2. 某个参与线程等待超时;
  3. 某个参与线程被中断
  4. 调用了Cyclicbarrierde reset方法,将屏障重置为初始状态

BrokenBarrierException 异常

如果一个线程处在等待状态时,如果其他线程调用reset()或者调用的barrier原本就是被破坏的,则抛出BrokenBarrierException异常

任何线程在等待时被中断了,则其他线程都将抛出BrokenBarrierException异常,并将barrier置于损坏状态

2.2breakBarrier方法

默认barrier是没有损坏的

当barrier损坏了或者有一个线程中断了,则通过breakBarrier来终止所有线程

当barrier损坏或者有一个线程中断了,则通过breakBarrier来终止所有线程

在breakBarrier中除了将broken设置为true,还会调用signall将在CyclicBarrier处在等待状态的线程全部唤醒

    private void breakBarrier() {//设置状态generation.broken = true;//恢复正在等待进入屏障的线程数量count = parties;//唤醒所有线程trip.signalAll();}

2.3nextGeneration() 方法

    //当barrier发送trip时,用于更新状态并唤醒每一个线程//这一个方法只有持有lock时被调用private void nextGeneration() {// signal completion of last generation//唤醒所有线程trip.signalAll();//恢复正在等待的进入屏障的线程数量count = parties;//新生一代generation = new Generation();}

2.4 reset()方法

    /***将barrier状态重置,如果此时有线程在barrier处等待,他们会抛出BrokenBarrierException并返回*注意:由于其他原因发生broken后重置可能会很复杂,线程需要一些方式来完成同步,并选择一种方式来完成reset*相对为后续的使用重建一个barrier,次重置操作更受欢迎*注意:这是一个需要加锁的操作*/public void reset() {final ReentrantLock lock = this.lock;lock.lock();try {breakBarrier();   // break the current generationnextGeneration(); // start a new generation} finally {lock.unlock();}}

2.5 isBroken()方法 判断栅栏是否处在中断的状态

    public boolean isBroken() {final ReentrantLock lock = this.lock;lock.lock();try {return generation.broken;} finally {lock.unlock();}}

 

  相关解决方案