当前位置: 代码迷 >> 综合 >> Java并发6:CountDownLatch CyclicBarrier Semaphore
  详细解决方案

Java并发6:CountDownLatch CyclicBarrier Semaphore

热度:93   发布时间:2023-11-30 17:44:29.0

0、前言

这篇博客是AQS共享模式的应用。主要包括三个:

  • Semaphore
  • CountDownLatch
  • CyclicBarrier

1、CountDownLatch

CountDownLatch 非常实用,我们常常会将一个比较大的任务进行拆分,然后开启多个线程来执行,等所有线程都执行完了以后,再往下执行其他操作。
构建出这么一个场景:有 m 个线程是做任务的,有 n 个线程在某个栅栏上等待这 m 个线程做完任务,直到所有 m 个任务完成后,n 个线程同时通过栅栏。在这里插入图片描述
源码

构造方法,需要传入一个不小于 0 的整数:

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
// 老套路了,内部封装一个 Sync 类继承自 AQS
private static final class Sync extends AbstractQueuedSynchronizer {
    Sync(int count) {
    // 这样就 state == count 了setState(count);}...
}

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。
countDown() 方法每次调用都会将 state 减 1,直到 state 的值为 0;而 await 是一个阻塞方法,当 state 减为 0 的时候,await 方法才会返回。await 可以被多个线程调用,读者这个时候脑子里要有个图:所有调用了 await 方法的线程阻塞在 AQS 的阻塞队列中,等待条件满足(state == 0),将线程从队列中一个个唤醒过来。

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    // 这也是老套路了,我在第二篇的中断那一节说过了if (Thread.interrupted())throw new InterruptedException();// t3 和 t4 调用 await 的时候,state 都大于 0(state 此时为 2)。// 也就是说,这个 if 返回 true,然后往里看if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
// 只有当 state == 0 的时候,这个方法才会返回 1
protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    // 1. 入队final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
    for (;;) {
    final Node p = node.predecessor();if (p == head) {
    // 同上,只要 state 不等于 0,那么这个方法返回 -1int r = tryAcquireShared(arg);if (r >= 0) {
    setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}// 2if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}
}

我们来仔细分析这个方法,线程 t3 经过第 1 步 addWaiter 入队以后,我们应该可以得到这个:

在这里插入图片描述

由于 tryAcquireShared 这个方法会返回 -1,所以 if (r >= 0) 这个分支不会进去。到 shouldParkAfterFailedAcquire 的时候,t3 将 head 的 waitStatus 值设置为 -1,如下:
在这里插入图片描述

然后进入到 parkAndCheckInterrupt 的时候,t3 挂起。
我们再分析 t4 入队,t4 会将前驱节点 t3 所在节点的 waitStatus 设置为 -1,t4 入队后,应该是这样的:
在这里插入图片描述
然后,t4 也挂起。接下来,t3 和 t4 就等待唤醒了。
接下来,我们来看唤醒的流程。为了让下面的示意图更丰富些,我们假设用 10 初始化 CountDownLatch。
在这里插入图片描述

public void countDown() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    // 只有当 state 减为 0 的时候,tryReleaseShared 才返回 true// 否则只是简单的 state = state - 1 那么 countDown() 方法就结束了// 将 state 减到 0 的那个操作才是最复杂的,继续往下吧if (tryReleaseShared(arg)) {
    // 唤醒 await 的线程doReleaseShared();return true;}return false;
}
// 这个方法很简单,用自旋的方法实现 state 减 1
protected boolean tryReleaseShared(int releases) {
    for (;;) {
    int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}
}

countDown 方法就是每次调用都将 state 值减 1,如果 state 减到 0 了,那么就调用下面的方法进行唤醒阻塞队列中的线程:

// 调用这个方法的时候,state == 0
// 这个方法先不要看所有的代码,按照思路往下到我写注释的地方,我们先跑通一个流程,其他的之后还会仔细分析
private void doReleaseShared() {
    for (;;) {
    Node h = head;if (h != null && h != tail) {
    int ws = h.waitStatus;// t3 入队的时候,已经将头节点的 waitStatus 设置为 Node.SIGNAL(-1) 了if (ws == Node.SIGNAL) {
    // 将 head 的 waitStatue 设置为 0if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点// 在这里,也就是唤醒 t3unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) // todocontinue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}
}

一旦 t3 被唤醒后,我们继续回到 await 的这段代码,parkAndCheckInterrupt 返回,我们先不考虑中断的情况:

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
    for (;;) {
    final Node p = node.predecessor();if (p == head) {
    int r = tryAcquireShared(arg);if (r >= 0) {
    setHeadAndPropagate(node, r); // 2. 这里是下一步p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&// 1. 唤醒后这个方法返回parkAndCheckInterrupt())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}
}

接下来,t3 会进到 setHeadAndPropagate(node, r) 这个方法,先把 head 给占了,然后唤醒队列中其他的线程:

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check belowsetHead(node);// 下面说的是,唤醒当前 node 之后的节点,即 t3 已经醒了,马上唤醒 t4// 类似的,如果 t4 后面还有 t5,那么 t4 醒了以后,马上将 t5 给唤醒了if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {
    Node s = node.next;if (s == null || s.isShared())// 又是这个方法,只是现在的 head 已经不是原来的空节点了,是 t3 的节点了doReleaseShared();}
}

又回到这个方法了,那么接下来,我们好好分析 doReleaseShared 这个方法,我们根据流程,头节点 head 此时是 t3 节点了:

// 调用这个方法的时候,state == 0
private void doReleaseShared() {
    for (;;) {
    Node h = head;// 1. h == null: 说明阻塞队列为空// 2. h == tail: 说明头结点可能是刚刚初始化的头节点,// 或者是普通线程节点,但是此节点既然是头节点了,那么代表已经被唤醒了,阻塞队列没有其他节点了// 所以这两种情况不需要进行唤醒后继节点if (h != null && h != tail) {
    int ws = h.waitStatus;// t4 将头节点(此时是 t3)的 waitStatus 设置为 Node.SIGNAL(-1) 了if (ws == Node.SIGNAL) {
    // 这里 CAS 失败的场景请看下面的解读if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;            // loop to recheck cases// 就是这里,唤醒 head 的后继节点,也就是阻塞队列中的第一个节点// 在这里,也就是唤醒 t4unparkSuccessor(h);}else if (ws == 0 &&// 这个 CAS 失败的场景是:执行到这里的时候,刚好有一个节点入队,入队会将这个 ws 设置为 -1!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}// 如果到这里的时候,前面唤醒的线程已经占领了 head,那么再循环// 否则,就是 head 没变,那么退出循环,// 退出循环是不是意味着阻塞队列中的其他节点就不唤醒了?当然不是,唤醒的线程之后还是会调用这个方法的if (h == head)                   // loop if head changedbreak;}
}

我们分析下最后一个 if 语句,然后才能解释第一个 CAS 为什么可能会失败:
h == head:说明头节点还没有被刚刚用 unparkSuccessor 唤醒的线程(这里可以理解为 t4)占有,此时 break 退出循环。
h != head:头节点被刚刚唤醒的线程(这里可以理解为 t4)占有,那么这里重新进入下一轮循环,唤醒下一个节点(这里是 t4 )。我们知道,等到 t4 被唤醒后,其实是会主动唤醒 t5、t6、t7…,那为什么这里要进行下一个循环来唤醒 t5 呢?我觉得是出于吞吐量的考虑。
满足上面的 2 的场景,那么我们就能知道为什么上面的 CAS 操作 compareAndSetWaitStatus(h, Node.SIGNAL, 0) 会失败了?
因为当前进行 for 循环的线程到这里的时候,可能刚刚唤醒的线程 t4 也刚刚好到这里了,那么就有可能 CAS 失败了。
for 循环第一轮的时候会唤醒 t4,t4 醒后会将自己设置为头节点,如果在 t4 设置头节点后,for 循环才跑到 if (h == head),那么此时会返回 false,for 循环会进入下一轮。t4 唤醒后也会进入到这个方法里面,那么 for 循环第二轮和 t4 就有可能在这个 CAS 相遇,那么就只会有一个成功了。

2、CyclicBarrier

字面意思是“可重复使用的栅栏”或“周期性的栅栏”,总之不是用了一次就没用了的,CyclicBarrier 相比 CountDownLatch 来说,要简单很多,其源码没有什么高深的地方,它是 ReentrantLock 和 Condition 的组合使用。看如下示意图,CyclicBarrier 和 CountDownLatch 是不是很像,只是 CyclicBarrier 可以有不止一个栅栏,因为它的栅栏(Barrier)可以重复使用(Cyclic)。
在这里插入图片描述
首先,CyclicBarrier 的源码实现和 CountDownLatch 大相径庭,CountDownLatch 基于 AQS 的共享模式的使用,而 CyclicBarrier 基于 Condition 来实现。
因为 CyclicBarrier 的源码相对来说简单许多,读者只要熟悉了前面关于 Condition 的分析,那么这里的源码是毫无压力的,就是几个特殊概念罢了。
先用一张图来描绘下 CyclicBarrier 里面的一些概念,和它的基本使用流程:
在这里插入图片描述
源码不想写了

3、Semaphore

3.1、加锁解锁流程

  • Semaphore 有点像一个停车场,permits 就好像停车位数量,当线程获得了 permits 就像是获得了停车位,然后停车场显示空余车位减一
  1. 刚开始,permits(state)为 3,这时 5 个线程来获取资源
    在这里插入图片描述
    2、假设其中 Thread-1,Thread-2,Thread-4 cas 竞争成功,而 Thread-0 和 Thread-3 竞争失败,进入 AQS 队列park 阻塞
    在这里插入图片描述
    3、接下来 Thread-0 竞争成功,permits 再次设置为 0,设置自己为 head 节点,断开原来的 head 节点,unpark 接下来的 Thread-3 节点,但由于 permits 是 0,因此 Thread-3 在尝试不成功后再次进入 park 状态
    在这里插入图片描述

3.2、源码

static final class NonfairSync extends Sync {
    NonfairSync(int permits) {
    // permits 即 statesuper(permits);}
}
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
    if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}protected int tryAcquireShared(int acquires) {
    return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
    int available = getState();int remaining = available - acquires;if (remaining < 0 ||compareAndSetState(available, remaining))return remaining;}
}private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
    for (;;) {
    final Node p = node.predecessor();if (p == head) {
    int r = tryAcquireShared(arg);if (r >= 0) {
    setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
    if (failed)cancelAcquire(node);}
}
public void release() {
    sync.releaseShared(1);
}public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
    doReleaseShared();return true;}return false;
}protected final boolean tryReleaseShared(int releases) {
    for (;;) {
    int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}
}
  相关解决方案