一、导言
公交车来了,小明、小红要等小军来才上公交车,但是小军迟迟不来,于是小明和小红都迟到了。
CountDownLatch
直译为闭锁,CountDown就是表示倒计时的意思,Latch表示门栓的意思,CountDownLatch
和门栓的含义有点像,可以看做代码执行时的一个门栓,条件不满足就进不了“门”(代码无法继续执行),此处的条件就是“latch”数字为0。
常见于以下几种使用场景(如果还有其他的使用场景,麻烦在评论区补充出来,欢迎交流沟通)。
- 实现最大并行性:当所有线程都满足与某个条件(latch为0)时才能继续执行,在这之前都在等待。
- 有先后顺序的分工合作:例如应用程序启动类要确保在处理用户请求前,所有N个外部系统已经启动和运行了。
- 死锁检测:一个非常方便的使用场景是,你可以使用n个线程访问共享资源,在每次测试阶段的线程数目是不同的,并尝试产生死锁。
二、基本使用
2.1 基本api
一般来说,使用CountDownLatch
只会用到await
的普通或超时版本、countDown
方法、以及一个构造方法。
@Test
public void latch() throws InterruptedException {
CountDownLatch latch = new CountDownLatch(4/*需要countDown4次*/);long maxLatch = latch.getCount();for (int i = 0; i < /*只countDown了3次*/maxLatch - 1; i++) {
new Thread(() -> {
// 对于每个线程都输出线程名System.out.println(Thread.currentThread().getName());sleep(1);//沉睡一秒latch.countDown();}).start();}System.out.println("超时了吗:" + !latch.await(2, TimeUnit.SECONDS));System.out.println("end");/*输出:Thread-0_0Thread-1_1Thread-2_2超时了吗:trueend* */
}
private void sleep(int i) {
try {
Thread.sleep(i * 1_000);} catch (InterruptedException e) {
e.printStackTrace();}
}
上文中的代码示例特地设计成await
等到不了的情况,最后的结果是输出超时了吗:true
。CountDownLatch#await(long, TimeUnit)
的返回值若为true则表示在指定时间内等到了结果,否则表示调用超时。
2.2 自定义工具类: ValueLatch
class ValueLatch<T> {
// 读写都在this上private T value;private final CountDownLatch done = new CountDownLatch(1);public boolean isSet() {
return done.getCount() == 0;}public synchronized void setValue(T value) {
if (!isSet()) /*如果没有这个会多次写,synchronize不保证重复写*/ {
this.value = value;done.countDown();}}public T getValue() {
try {
/*要有超时时间避免堵塞*/done.await(1, TimeUnit.SECONDS);synchronized (this) {
return value;}} catch (InterruptedException e) {
return null;}}
}
该ValueLatch
工具类实现了一个“值门栓”的功能,如果值没有设置则等待,直到值被设置了。具体的使用实例如下代码所示:
@Test
public void valueLatchTest() {
ValueLatch latch = new ValueLatch();new Thread(() -> {
sleep(1);latch.setValue("hello");}).start();// 要等待1秒后才能输出这句话System.out.println("latch.getValue() = " + latch.getValue());
}
三、源码分析
CountDownLatch
的核心逻辑都在其内部的一个静态final类Sync
上,代码如下所示,继承了抽象类AbstractQueuedSynchronizer
,即大名鼎鼎的AQS框架,以下将AbstractQueuedSynchronizer
简称为AQS。
Sync
实现父类AQS的tryAcquireShared
和tryReleaseShared
方法。其中tryAcquireShared
方法会被AQS中的doAcquireShared
、acquireShared
、doAcquireSharedNanos
、doAcquireSharedInterruptibly
调用 ,而tryReleaseShared
方法会被AQS中的releaseShared
方法调用。
简而言之就是Sync实现了AQS框架中共享锁的上锁和解锁的逻辑。
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;// 构造函数Sync(int count) {
setState(count);}int getCount() {
//java.util.concurrent.locks.AbstractQueuedSynchronizer#getStatereturn getState();}// 实现共享锁核心逻辑:tryAcquireSharedprotected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;}protected boolean tryReleaseShared(int releases) {
// 自旋锁for (;;) {
// 获取当前AQS.state: state定义在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node的静态字段中,其中state为0表示已完成或已中断int c = getState();if (c == 0)return false;// 如果状态还没有释放则通过CAS将state字段值-1int nextc = c-1;if (compareAndSetState(c, nextc))return nextc == 0;}}
}
四、参考文章
- 从ReentrantLock的实现看AQS的原理及应用
- Java并发之AQS详解
- CountDownLatch详解
- CountDownLatch使用场景