前言
CountDownLatch的主要用途是同步一个或多个任务,使得这些任务在继续执行前必须等待其他任务完成。从类图上可以看到它的内部很简单,只有一个Sync的内部类。其中Sync是继承了AQS的,这意味着CountDownLatch底层还是基于AQS实现的。注意的是CountDownLatch只能等待countDown()前面的逻辑,countDown()后面的逻辑可能会在await()方法后执行,要看系统的调度。
data:image/s3,"s3://crabby-images/cec70/cec709495eee189c385acfd281efb0e659435505" alt="image-20240326140811843"
小demo
在这个例子中,我们创建了一个初始计数值为2的CountDownLatch。然后,我们创建了两个线程,每个线程在完成其任务后都会调用countDown()方法。主线程调用await()方法,等待所有其他线程完成任务。当所有其他线程都调用了countDown()方法,计数器的值变为0,主线程才会继续执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class CountDownLatchTest { public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(2); new Thread(()->{ selfSleep(200); System.out.println("Thread 1 started"); System.out.println("Thread 1 finished"); latch.countDown(); }).start(); new Thread(()->{ selfSleep(300); System.out.println("Thread 2 started"); System.out.println("Thread 2 finished"); latch.countDown(); }).start(); latch.await(); System.out.println("All threads finished"); } private static void selfSleep(int i) { try { Thread.sleep(i); } catch (InterruptedException e) { throw new RuntimeException(e); } } }
|
执行结果:
Thread 1 started
Thread 1 finished
Thread 2 started
Thread 2 finished
All threads finished
关键属性
只有一个Sync属性,但是这个Sync是CountDownLatch的核心,我们看下它的内部实现
1
| private final Sync sync;
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private static final class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) { setState(count); }
int getCount() { return getState(); } protected int tryAcquireShared(int acquires) { return (getState() == 0) ? 1 : -1; } protected boolean tryReleaseShared(int releases) { for (;;) { int c = getState(); if (c == 0) return false; int nextc = c - 1; if (compareAndSetState(c, nextc)) return nextc == 0; } } }
|
构造器
构造一个指定倒计数值的CountDownLatch
1 2 3 4
| public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
|
关键方法
countDown()
使倒计数减1,tryReleaseShared(arg)方法会将倒计数减1,如果减成0则返回true,同时唤醒主线程,否则返回false,流程结束。
1 2 3 4 5 6 7 8 9 10 11
| public void countDown() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { signalNext(head); return true; } return false; }
|
await()
如果倒计数值不为0,释放掉当前线程持有的锁,并将自己加入到等待队列中去,否则说明其它线程已处理完毕,直接执行后续逻辑即可。
1 2 3 4 5 6 7 8 9 10 11
| public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); }
|