文章目录
- 1. 简介
- 2. 精讲
-
- 1. 概念
- 2. 实现
-
- 1. 构造器
- 2. await()
- 3. countDown()
- 4. 总结
- 3. 应用场景
- 4. 示例
1. 简介
-
CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要传入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行
-
更具体地说,CountDownLatch 底层是采用共享锁来实现的,在创建 CountDownLatch 实例时,需要传递一个整型的参数:count,该参数是计数器的初始值,也可以理解为该共享锁可以被获取的总次数。
当某个线程调用
await()
方法时,程序首先判断 count 的值是否为 0,如果不会 0 的话则会一直等待直到 count 为 0。当其他线程调用
countDown()
方法时,会执行释放共享锁状态,即,使 count 值 - 1。当在创建 CountDownLatch 时如果初始化的整型参数值为 a,就必须要有 a 个线程调用countDown()
方法才会使计数器的值等于 0,锁才会释放,前面等待的线程才会继续运行。另外 CountDownLatch 不能回滚重置
-
CountDownLatch 可以用于某一线程在开始运行前需要等待 n 个线程执行完毕的场景,比如启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。还可以用于实现多个线程开始执行任务的最大并行性,有点类似赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。还可以用于死锁检查,比如可以使用 n 个线程访问共享资源,然后观察每次测试阶段的线程数目
2. 精讲
1. 概念
CyclicBarrier 所描述的是 “允许一组线程互相等待,直到到达某个公共屏障点,才会进行后续任务",而 CountDownLatch 所描述的是 ”在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待“。
在 API 中是这样描述的:用给定的计数初始化 CountDownLatch。由于调用了 countDown()
方法,所以在当前计数到达零之前,await()
方法会一直受阻塞。之后,会释放所有等待的线程,await()
的所有后续调用都将立即返回。这种现象只出现一次,因为计数无法被重置。如果需要重置计数,请考虑使用 CyclicBarrier
CountDownLatch 是通过一个计数器来实现的,当我们在 new 一个 CountDownLatch 对象的时候需要传入该计数器值,该值就表示了线程的数量。每当一个线程完成自己的任务后,计数器的值就会减 1。当计数器的值变为 0 时,就表示所有的线程均已经完成了任务,然后就可以恢复等待的线程继续执行了。
虽然,CountDownlatch 与 CyclicBarrier 有那么点相似,但是他们还是存在一些区别的:
- CountDownLatch 的作用是允许 1 或 N 个线程等待其他线程完成执行;而 CyclicBarrier 则是允许 N 个线程相互等待
- CountDownLatch 的计数器无法被重置;CyclicBarrier 的计数器可以被重置后使用,因此它被称为是循环的 barrier
2. 实现
1. 构造器
通过上面的结构图我们可以看到,CountDownLatch 内部依赖 Sync 实现,而 Sync 继承 AQS。CountDownLatch 仅提供了一个构造方法:
CountDownLatch(int count)
: 构造一个用给定计数初始化的 CountDownLatch
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);
}
sync 为 CountDownLatch 的一个内部类,其定义如下:
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;Sync(int count) {
setState(count);}//获取同步状态int getCount() {
return getState();}//获取同步状态protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;}//释放同步状态protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
}
通过这个内部类 Sync 我们可以清楚地看到 CountDownLatch 是采用共享锁来实现的
2. await()
CountDownLatch 提供 await()
方法来使当前线程在锁存器倒计数至 0 之前一直等待,除非线程被中断,定义如下:
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
await()
内部调用了 AQS 的 acquireSharedInterruptibly(int arg)
:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {
if (Thread.interrupted())throw new InterruptedException();if (tryAcquireShared(arg) < 0)doAcquireSharedInterruptibly(arg);
}
在内部类 Sync 中重写了 tryAcquireShared(int arg)
方法:
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1; //如果计数器值不等于0,那么r会一直小于0,为 -1
}
getState()
可以获取同步状态,其值等于计数器的值,从这里我们可以看到如果计数器值不等于 0,则会调用 doAcquireSharedInterruptibly(int arg)
,该方法为一个自旋方法会尝试一直去获取同步状态:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {
final Node node = addWaiter(Node.SHARED);boolean failed = true;try {
for (;;) {
final Node p = node.predecessor();if (p == head) {
//对于CountDownLatch而言,如果计数器值不等于0,那么r会一直小于0int r = tryAcquireShared(arg);if (r >= 0) {
setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//等待if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {
if (failed)cancelAcquire(node);}
}
3. countDown()
CountDownLatch 提供 countDown()
方法递减锁存器的计数,如果计数到达 0,则释放所有等待的线程。
public void countDown() {
sync.releaseShared(1);
}
内部调用 AQS 的 releaseShared(int arg)
方法来释放共享锁同步状态:
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();return true;}return false;
}
tryReleaseShared(int arg)
方法被 CountDownLatch 的内部类 Sync 重写:
protected boolean tryReleaseShared(int releases) {
for (;;) {
//获取锁状态int c = getState();//c == 0 直接返回,释放锁成功if (c == 0)return false;//计算新“锁计数器”,每次都减一int nextc = c-1;//更新锁状态(计数器)if (compareAndSetState(c, nextc))return nextc == 0;}
}
4. 总结
CountDownLatch 内部通过共享锁实现。在创建 CountDownLatch 实例时,需要传递一个 int 型的参数:count,该参数是计数器的初始值,也可以理解为该共享锁可以被获取的总次数。
当某个线程调用 await()
方法时,程序首先判断 count 的值是否为 0,如果不会 0 的话则会一直等待直到为 0 为止。
当其他线程调用 countDown()
方法时,则执行释放共享锁状态,使 count 值 - 1。当在创建 CountDownLatch 时如果初始化的 int 型参数值为 count,则必须要有 count 个线程调用 countDown()
方法才会使计数器 count 等于 0,锁才会释放,前面等待的线程才会继续运行。注意 CountDownLatch 不能回滚重置
3. 应用场景
-
某一线程在开始运行前需要等待 n 个线程执行完毕,比如启动一个服务时,主线程需要等待多个组件加载完毕,之后再继续执行。
-
实现多个线程开始执行任务的最大并行性。注意是并行性,不是并发,强调的是多个线程在某一时刻同时开始执行。类似于赛跑,将多个线程放到起点,等待发令枪响,然后同时开跑。
-
死锁检测:一个非常方便的使用场景是,你可以使用 n 个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁
4. 示例
示例仍然使用开会案例。老板进入会议室等待 5 个人全部到达会议室才会开会。所以这里有两个线程:老板等待开会线程、员工到达会议室:
public class CountDownLatchTest {
private static CountDownLatch countDownLatch = new CountDownLatch(5);/*** Boss线程,等待员工到达开会*/static class BossThread extends Thread{
@Overridepublic void run() {
System.out.println("Boss在会议室等待,总共有" + countDownLatch.getCount() + "个人开会...");try {
//Boss等待countDownLatch.await();} catch (InterruptedException e) {
e.printStackTrace();}System.out.println("所有人都已经到齐了,开会吧...");}}//员工到达会议室static class EmpleoyeeThread extends Thread{
@Overridepublic void run() {
System.out.println(Thread.currentThread().getName() + ",到达会议室....");//员工到达会议室 count - 1countDownLatch.countDown();}}public static void main(String[] args){
//Boss线程启动new BossThread().start();for(int i = 0,j = countDownLatch.getCount() ; i < j ; i++){
new EmpleoyeeThread().start();}}
}
这里等待的线程只有 BossThread