当前位置: 代码迷 >> 综合 >> Java 并发 (10) -- CountDownLatch 类
  详细解决方案

Java 并发 (10) -- CountDownLatch 类

热度:72   发布时间:2023-12-16 13:15:08.0

文章目录

  • 1. 简介
  • 2. 精讲
    • 1. 概念
    • 2. 实现
      • 1. 构造器
      • 2. await()
      • 3. countDown()
      • 4. 总结
    • 3. 应用场景
    • 4. 示例

1. 简介

  1. CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要传入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行

  2. 更具体地说,CountDownLatch 底层是采用共享锁来实现的,在创建 CountDownLatch 实例时,需要传递一个整型的参数:count,该参数是计数器的初始值,也可以理解为该共享锁可以被获取的总次数。

    当某个线程调用 await() 方法时,程序首先判断 count 的值是否为 0,如果不会 0 的话则会一直等待直到 count 为 0。

    当其他线程调用 countDown() 方法时,会执行释放共享锁状态,即,使 count 值 - 1。当在创建 CountDownLatch 时如果初始化的整型参数值为 a,就必须要有 a 个线程调用 countDown() 方法才会使计数器的值等于 0,锁才会释放,前面等待的线程才会继续运行。

    另外 CountDownLatch 不能回滚重置

  3. CountDownLatch 可以用于某一线程在开始运行前需要等待 n 个线程执行完毕的场景,比如启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。还可以用于实现多个线程开始执行任务的最大并行性,有点类似赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。还可以用于死锁检查,比如可以使用 n 个线程访问共享资源,然后观察每次测试阶段的线程数目

2. 精讲

1. 概念

CyclicBarrier 所描述的是 “允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务",而 CountDownLatch 所描述的是 ”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。

在 API 中是这样描述的:用给定的计数初始化 CountDownLatch。由于调用了 countDown() 方法,所以在当前计数到达零之前,await() 方法会一直受阻塞。之后,会释放所有等待的线程,await() 的所有后续调用都将立即返回。这种现象只出现一次,因为计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier
在这里插入图片描述
CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要传入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。

虽然,CountDownlatch 与 CyclicBarrier 有那么点相似,但是他们还是存在一些区别的:

  1. CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待
  2. CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier

2. 实现

1. 构造器

在这里插入图片描述
通过上面的结构图我们可以看到,CountDownLatch 内部依赖 Sync 实现,而 Sync 继承 AQS。CountDownLatch 仅提供了一个构造方法:

CountDownLatch(int count): 构造一个用给定计数初始化的 CountDownLatch

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}

sync 为 CountDownLatch 的一个内部类,其定义如下:

private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {
    setState(count);}//获取同步状态int getCount() {
    return getState();}//获取同步状态protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;}//释放同步状态protected boolean tryReleaseShared(int releases) {
    for (;;) {
    int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
}

通过这个内部类 Sync 我们可以清楚地看到 CountDownLatch 是采用共享锁来实现的

2. await()

CountDownLatch 提供 await() 方法来使当前线程在锁存器倒计数至 0 之前一直等待,除非线程被中断,定义如下:

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}

await() 内部调用了 AQS 的 acquireSharedInterruptibly(int arg)

public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}

在内部类 Sync 中重写了 tryAcquireShared(int arg) 方法:

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1; //如果计数器值不等于0,那么r会一直小于0,为 -1
}

getState() 可以获取同步状态,其值等于计数器的值,从这里我们可以看到如果计数器值不等于 0,则会调用 doAcquireSharedInterruptibly(int arg),该方法为一个自旋方法会尝试一直去获取同步状态:

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
    for (;;) {
    final Node p = node.predecessor();if (p == head) {
    //对于CountDownLatch而言,如果计数器值不等于0,那么r会一直小于0int r = tryAcquireShared(arg);if (r >= 0) {
    setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}
}

3. countDown()

CountDownLatch 提供 countDown() 方法递减锁存器的计数,如果计数到达 0,则释放所有等待的线程。

public void countDown() {
    sync.releaseShared(1);
}

内部调用 AQS 的 releaseShared(int arg) 方法来释放共享锁同步状态:

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();return true;}return false;
}

tryReleaseShared(int arg) 方法被 CountDownLatch 的内部类 Sync 重写:

protected boolean tryReleaseShared(int releases) {
    for (;;) {
    //获取锁状态int c = getState();//c == 0 直接返回,释放锁成功if (c == 0)return false;//计算新“锁计数器”,每次都减一int nextc = c-1;//更新锁状态(计数器)if (compareAndSetState(c, nextc))return nextc == 0;}
}

4. 总结

CountDownLatch 内部通过共享锁实现。在创建 CountDownLatch 实例时,需要传递一个 int 型的参数:count,该参数是计数器的初始值,也可以理解为该共享锁可以被获取的总次数。

当某个线程调用 await() 方法时,程序首先判断 count 的值是否为 0,如果不会 0 的话则会一直等待直到为 0 为止。

当其他线程调用 countDown() 方法时,则执行释放共享锁状态,使 count 值 - 1。当在创建 CountDownLatch 时如果初始化的 int 型参数值为 count,则必须要有 count 个线程调用 countDown() 方法才会使计数器 count 等于 0,锁才会释放,前面等待的线程才会继续运行。注意 CountDownLatch 不能回滚重置

3. 应用场景

  1. 某一线程在开始运行前需要等待 n 个线程执行完毕,比如启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。

  2. 实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。

  3. 死锁检测:一个非常方便的使用场景是,你可以使用 n 个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁

4. 示例

示例仍然使用开会案例。老板进入会议室等待 5 个人全部到达会议室才会开会。所以这里有两个线程:老板等待开会线程、员工到达会议室:

public class CountDownLatchTest {
    private static CountDownLatch countDownLatch = new CountDownLatch(5);/*** Boss线程,等待员工到达开会*/static class BossThread extends Thread{
    @Overridepublic void run() {
    System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");try {
    //Boss等待countDownLatch.await();} catch (InterruptedException e) {
    e.printStackTrace();}System.out.println("所有人都已经到齐了,开会吧...");}}//员工到达会议室static class EmpleoyeeThread  extends Thread{
    @Overridepublic void run() {
    System.out.println(Thread.currentThread().getName() + ",到达会议室....");//员工到达会议室 count - 1countDownLatch.countDown();}}public static void main(String[] args){
    //Boss线程启动new BossThread().start();for(int i = 0,j = countDownLatch.getCount() ; i < j ; i++){
    new EmpleoyeeThread().start();}}
}

在这里插入图片描述
这里等待的线程只有 BossThread

  相关解决方案