前言

CountDownLatch的主要用途是同步一个或多个任务,使得这些任务在继续执行前必须等待其他任务完成。从类图上可以看到它的内部很简单,只有一个Sync的内部类。其中Sync是继承了AQS的,这意味着CountDownLatch底层还是基于AQS实现的。注意的是CountDownLatch只能等待countDown()前面的逻辑,countDown()后面的逻辑可能会在await()方法后执行,要看系统的调度。

image-20240326140811843

小demo

在这个例子中,我们创建了一个初始计数值为2的CountDownLatch。然后,我们创建了两个线程,每个线程在完成其任务后都会调用countDown()方法。主线程调用await()方法,等待所有其他线程完成任务。当所有其他线程都调用了countDown()方法,计数器的值变为0,主线程才会继续执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class CountDownLatchTest {
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(2);
new Thread(()->{
selfSleep(200);
System.out.println("Thread 1 started");
System.out.println("Thread 1 finished");
latch.countDown();
}).start();
new Thread(()->{
selfSleep(300);
System.out.println("Thread 2 started");
System.out.println("Thread 2 finished");
latch.countDown();
}).start();
latch.await();
System.out.println("All threads finished");
}
private static void selfSleep(int i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}

执行结果:

Thread 1 started
Thread 1 finished
Thread 2 started
Thread 2 finished
All threads finished

关键属性

只有一个Sync属性,但是这个Sync是CountDownLatch的核心,我们看下它的内部实现

1
private final Sync sync;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) { // 初始化倒计数的值
setState(count);
}

int getCount() { // 获取倒计数值
return getState();
}
// 尝试获取共享锁。在CountDownLatch中,如果count(即同步状态)已经为0,那么返回1表示获取成功;否则返回-1表示获取失败。
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

// 尝试释放共享锁。每次调用countDown()方法时,都会尝试释放一次共享锁。方法内部的循环会不断尝试将同步状态(即count)减1,直到count为0或者无法将count减1为止。如果count减1后为0,那么返回true表示释放成功;否则返回false表示释放失败
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

构造器

构造一个指定倒计数值的CountDownLatch

1
2
3
4
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}

关键方法

countDown()

使倒计数减1,tryReleaseShared(arg)方法会将倒计数减1,如果减成0则返回true,同时唤醒主线程,否则返回false,流程结束。

1
2
3
4
5
6
7
8
9
10
11
public void countDown() {
sync.releaseShared(1);
}
// 这里调用的AQS类中的releaseShared(int arg),但是tryReleaseShared(arg)是子类自己实现的,它的原理我们上面也有阐述
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head); // 唤醒下一个线程
return true;
}
return false;
}

await()

如果倒计数值不为0,释放掉当前线程持有的锁,并将自己加入到等待队列中去,否则说明其它线程已处理完毕,直接执行后续逻辑即可。

1
2
3
4
5
6
7
8
9
10
11
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 这个方法维护在AQS中,其实在Semaphore中我们已经看到过这个方法了
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}