当前位置: 代码迷 >> 综合 >> CyclicBarrier 深入源码解析
  详细解决方案

CyclicBarrier 深入源码解析

热度:0   发布时间:2024-01-17 01:24:19.0

??CyclicBarrier意为“循环栅栏”,是一个可循环利用的屏障。CyclicBarrier可以使指定数量线程到达阻塞点后继续后续任务。

??演示示例:

??首先,新建一个线程类,用于模拟多线程环境。

package com.securitit.serialize.juc;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierThread extends Thread {
    // CyclicBarrier实例.private CyclicBarrier cyclicBarrier;// Constructor.public CyclicBarrierThread(CyclicBarrier cyclicBarrier) {
    this.cyclicBarrier = cyclicBarrier;}@Overridepublic void run() {
    try {
    Thread.sleep(1000);System.out.println(Thread.currentThread().getName() + " 到达循环栅栏-A");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + " 通过循环栅栏-A");Thread.sleep(2000);System.out.println(Thread.currentThread().getName() + " 到达循环栅栏-B");cyclicBarrier.await();System.out.println(Thread.currentThread().getName() + " 通过循环栅栏-B");} catch (Exception e) {
    e.printStackTrace();}}}

??在测试类中,启动多线程对循环栅栏CyclicBarrier进行测试。

package com.securitit.serialize.juc;import java.util.concurrent.CyclicBarrier;public class CyclicBarrierTester {
    // CyclicBarrier实例.private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);public static void main(String[] args) throws Exception {
    new CyclicBarrierThread(cyclicBarrier).start();new CyclicBarrierThread(cyclicBarrier).start();new CyclicBarrierThread(cyclicBarrier).start();}}

??输出结果:

Thread-2 到达循环栅栏-A
Thread-1 到达循环栅栏-A
Thread-0 到达循环栅栏-A
Thread-1 通过循环栅栏-A
Thread-0 通过循环栅栏-A
Thread-2 通过循环栅栏-A
Thread-2 到达循环栅栏-B
Thread-0 到达循环栅栏-B
Thread-1 到达循环栅栏-B
Thread-1 通过循环栅栏-B
Thread-0 通过循环栅栏-B
Thread-2 通过循环栅栏-B

??从输出结果可以看出,与线程启动顺序无关,所有线程都是到达栅栏位置然后继续运行,到达栅栏线程的多少取决于CyclicBarrier实例化是指定的栅栏数量。

??源码分析:

package java.util.concurrent;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;public class CyclicBarrier {
    // private static class Generation {
    boolean broken = false;}// 守卫栅栏进入的锁.private final ReentrantLock lock = new ReentrantLock();// 等待条件.private final Condition trip = lock.newCondition();// 总共需要等待的线程数.private final int parties;// 所有等待线程都到达时执行该命令.private final Runnable barrierCommand;// Generation.private Generation generation = new Generation();// 在等待栅栏的数量.private int count;// 下一栅栏.private void nextGeneration() {
    // 唤醒所有正在等待的吃的人.trip.signalAll();// 重置部分数据.count = parties;generation = new Generation();}// 打破栅栏.private void breakBarrier() {
    generation.broken = true;count = parties;trip.signalAll();}// 等待线程.private int dowait(boolean timed, long nanos)throws InterruptedException, BrokenBarrierException,TimeoutException {
    final ReentrantLock lock = this.lock;lock.lock();try {
    final Generation g = generation;// 屏障已被破坏.抛出异常.if (g.broken)throw new BrokenBarrierException();// 线程响应中断.if (Thread.interrupted()) {
    breakBarrier();throw new InterruptedException();}// 当前线程到达后剩余线程数.int index = --count;// 线程已全部到达.if (index == 0) {
      // trippedboolean ranAction = false;try {
    final Runnable command = barrierCommand;if (command != null)command.run();ranAction = true;// 生成新一代唤醒等待线程,重置count、parties.nextGeneration();return 0;} finally {
    // 执行失败,打破栅栏.if (!ranAction)breakBarrier();}}// for (;;) {
    // 设置线程阻塞.try {
    if (!timed)trip.await();else if (nanos > 0L)nanos = trip.awaitNanos(nanos);} catch (InterruptedException ie) {
    if (g == generation && ! g.broken) {
    breakBarrier();throw ie;} else {
    // We're about to finish waiting even if we had not// been interrupted, so this interrupt is deemed to// "belong" to subsequent execution.Thread.currentThread().interrupt();}}// 栅栏被打破,抛出异常.if (g.broken)throw new BrokenBarrierException();if (g != generation)return index;// 超时,打破栅栏,抛出异常.if (timed && nanos <= 0L) {
    breakBarrier();throw new TimeoutException();}}} finally {
    lock.unlock();}}// Construtor.public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();this.parties = parties;this.count = parties;this.barrierCommand = barrierAction;}// Construtor.public CyclicBarrier(int parties) {
    this(parties, null);}// 获取需要等待的线程数.public int getParties() {
    return parties;}// 阻塞线程.public int await() throws InterruptedException, BrokenBarrierException {
    try {
    return dowait(false, 0L);} catch (TimeoutException toe) {
    throw new Error(toe); // cannot happen}}// 带超时时间阻塞线程.public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException {
    return dowait(true, unit.toNanos(timeout));}// 栅栏是否已被破解.public boolean isBroken() {
    final ReentrantLock lock = this.lock;lock.lock();try {
    return generation.broken;} finally {
    lock.unlock();}}// 重置栅栏数据和状态,CyclicBarrierpublic void reset() {
    final ReentrantLock lock = this.lock;lock.lock();try {
    // 打破当前栅栏信息.breakBarrier();  // 开始处理下一栅栏信息。nextGeneration(); // start a new generation} finally {
    lock.unlock();}}// 已达到等待线程数量.public int getNumberWaiting() {
    final ReentrantLock lock = this.lock;lock.lock();try {
    return parties - count;} finally {
    lock.unlock();}}
}

??从“演示示例“到”源码分析“,CyclicBarrier的使用以及源码相对比较简单,“源码分析”中是CyclicBarrier全部API。

??注:文中源码均来自于JDK1.8版本,不同版本间可能存在差异。

??如果有哪里有不明白或不清楚的内容,欢迎留言哦!

  相关解决方案