前言

AbstractQueuedSynchronizer(AQS),翻译成中文为抽象队列同步器,是JAVA中很多类处理锁机制的底层方法,比如我们后面要讲的ReentrantLock、CountDownLatch、Semaphore等都是基于AQS来实现的。

image-20240325142003554

关键属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 等待队列的头节点。当一个线程获取同步状态成功后,该线程所对应的Node会成为head。当head的waitStatus不再需要阻塞后续的线程时,可以将其waitStatus设置为SIGNAL,这样新的节点就可以安全的加入队列。
private transient volatile Node head;

// 等待队列的尾节点。新的节点通过CAS操作加入到队列的尾部。
private transient volatile Node tail;

// 同步状态,它的属性含义由子类定义,ReentrantLock中表示重入次数,Semaphore中表示可用的许可证数量,CountDownLatch中表示倒计数的数量。
private volatile int state;
// 下面三个字段用于标识线程节点的状态
// 表示线程正在等待获取锁。当一个线程尝试获取锁但是失败时,它会被放入等待队列,并且它的状态会被设置为WAITING。
static final int WAITING = 1; // must be 1
// 表示线程已经取消等待获取锁。当一个线程被中断或者超时,它的状态会被设置为CANCELLED
static final int CANCELLED = 0x80000000; // must be negative
// 当一个线程调用Condition.await方法时,它的状态会被设置为COND。这个状态表示线程正在等待一个条件变量,而不是等待获取锁
static final int COND = 2; // in a condition wait
// Node的属性。
abstract static class Node {
volatile Node prev; // initially attached via casTail
volatile Node next; // visibly nonnull when signallable
Thread waiter; // visibly nonnull when enqueued
volatile int status; // written by owner, atomic bit ops by others
}
// 独占模式下的Node
static final class ExclusiveNode extends Node { }
// 共享模式下的Node
static final class SharedNode extends Node { }

关键方法

竞争锁

下面为首个节点获取锁并入队的逻辑,大致是获取锁,如果获取锁失败就进入队列,并使当前线程进入挂起状态。我为每步都标好了注释,然后我们通过两种情况描述一下执行流程

  1. 独占模式,进入队列前获取到了锁,此时就不进入队列了

    S2 -> S2.2 -> S2.3

  2. 独占模式,未设置超时时间,成功进入队列

    第一轮:S2 -> S2.2 -> S2.3 -> S3 // 构造Node

    第二轮:S2 -> S2.2 -> S4 -> S4.1 // 初始化链表头部

    第三轮:S2 -> S2.2 -> S4 -> S4.3 // 加入队列

    第四轮:S2 -> S2.2 -> S6 // 更改Node status状态为等待中

    第五轮:S2 -> S2.2 -> S7 -> S7.1 // 挂起线程

    如果过了超时时间,或者被其它线程唤醒,假设这次获取到了锁并且是头部元素,那么会有最后一轮的遍历:

    第六轮:S2 -> S2.2 -> S2.3 -> S2.3.1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
final int acquire(Node node, int arg, boolean shared,
boolean interruptible, boolean timed, long time) {
Thread current = Thread.currentThread();
byte spins = 0, postSpins = 0; // retries upon unpark of first thread
boolean interrupted = false, first = false;
Node pred = null; // predecessor of node when enqueued

// 一直尝试获取锁,直到成功或者被中断
for (;;) {
if (!first && (pred = (node == null) ? null : node.prev) != null &&
!(first = (head == pred))) { //S1 如果不是head节点并且前驱节点不为null和头部节点
if (pred.status < 0) { //S1.1 小于0表示前驱节点已经被取消,触发清理队列操作
cleanQueue();
continue;
} else if (pred.prev == null) { //S1.2 前驱节点的pred为null,确保序列化
Thread.onSpinWait();
continue;
}
}
if (first || pred == null) { //S2 如果是头部节点或者还没入队列
boolean acquired;
try {
if (shared) //S2.1 尝试以共享模式获取锁
acquired = (tryAcquireShared(arg) >= 0);
else //S2.2 尝试以独占模式获取锁
acquired = tryAcquire(arg);
} catch (Throwable ex) {
cancelAcquire(node, interrupted, false);
throw ex;
}
if (acquired) { //S2.3 获取锁成功,继续后续处理
if (first) { //S2.3.1 如果是头部元素
node.prev = null;
head = node;
pred.next = null;
node.waiter = null;
if (shared) // 共享模式下唤醒下一个节点
signalNextIfShared(node);
if (interrupted) // 如果被中断,则中断当前线程
current.interrupt();
}
return 1;
}
}
if (node == null) { //S3 Node为null,就创建一个节点
if (shared)
node = new SharedNode();
else
node = new ExclusiveNode();
} else if (pred == null) { //S4 加入队列
node.waiter = current; // 将等待指针指向自己,表示当前线程正在等待获取锁
Node t = tail;
node.setPrevRelaxed(t); // 获取队列的尾节点,并将其赋值给node.pre
if (t == null) //S4.1 t == null为true说明队列为空,此时会初始化队列的头部,即构造一个空节点并将head和tail指向此空节点
tryInitializeHead();
else if (!casTail(t, node)) //S4.2 将当前节点入队
node.setPrevRelaxed(null); // back out
else
t.next = node; //S4.3 将前驱节点的next指针指向当前Node
} else if (first && spins != 0) { //S5 进入自旋等待模式
--spins; // reduce unfairness on rewaits
Thread.onSpinWait();
} else if (node.status == 0) { //S6 将节点的状态改为等待获取锁
node.status = WAITING;
} else { // S7 没有获取到锁,使当前线程进去等待状态
long nanos;
spins = postSpins = (byte)((postSpins << 1) | 1);
if (!timed) //S7.1 如果未设置超时时间,挂起当前线程
LockSupport.park(this);
else if ((nanos = time - System.nanoTime()) > 0L) //S7.2 进入超时等待模式
LockSupport.parkNanos(this, nanos);
else
break;
node.clearStatus(); // 将status设置为0。因为线程已经在等待中类,所以要清除节点状态
if ((interrupted |= Thread.interrupted()) && interruptible)
break;
}
}
// 如果获取锁失败,取消当前线程对锁的获取请求
return cancelAcquire(node, interrupted, interruptible);
}

释放锁

释放锁并通知下个Node。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public final boolean release(int arg) {
// tryRelease()方法由各个子类去实现。
if (tryRelease(arg)) {
signalNext(head);
return true;
}
return false;
}
// 如果有下个Node等待被唤醒,通过LockSupport工具类唤醒它。
private static void signalNext(Node h) {
Node s;
if (h != null && (s = h.next) != null && s.status != 0) {
s.getAndUnsetStatus(WAITING);
LockSupport.unpark(s.waiter);
}
}

Condition原理

用来替代传统的Object的wait()、notify()实现线程间的协作,相比使用Object的wait()、notify(),使用Condition中的await()、signal()这种方式实现线程间协作更加安全和高效。因此通常来说比较推荐使用Condition。

demo

这里使用ReentrantLock来写一个例子,从结果可以看到,线程1在调用await()方法后会被挂起,直到线程2通知后才继续执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
public static void conditionTest(){
Condition condition = lock.newCondition();
new Thread(() -> {
try {
lock.lock();
System.out.println("线程一加锁成功");
System.out.println("线程一执行await被挂起");
condition.await();
System.out.println("线程一被唤醒成功");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println("线程一释放锁成功");
}
}).start();

new Thread(() -> {
try {
Thread.sleep(1000);
lock.lock();
System.out.println("线程二加锁成功");
condition.signal();
System.out.println("线程二唤醒线程一");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
System.out.println("线程二释放锁成功");
}
}).start();
}
// 执行结果:
线程一加锁成功
线程一执行await被挂起
线程二加锁成功
线程二唤醒线程一
线程二释放锁成功
线程一被唤醒成功
线程一释放锁成功

关键属性

1
2
3
4
// 第一个节点
private transient ConditionNode firstWaiter;
// 最后一个节点
private transient ConditionNode lastWaiter;

我们再看一下ConditionNode,发现它是继承自父类的Node,只是多了一个自己的属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class ConditionNode extends Node
implements ForkJoinPool.ManagedBlocker {
ConditionNode nextWaiter; // 关联下一个等待节点

public final boolean isReleasable() {
return status <= 1 || Thread.currentThread().isInterrupted();
}

public final boolean block() {
while (!isReleasable()) LockSupport.park();
return true;
}
}

await方法

将当前线程的ConditionNode放入队列并释放锁,然后将自己挂起等待其它线程唤醒。在唤醒后再次尝试获取锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
ConditionNode node = new ConditionNode();
// 入队并释放锁
int savedState = enableWait(node);
LockSupport.setCurrentBlocker(this); // 将自己阻塞掉
boolean interrupted = false, cancelled = false, rejected = false;
while (!canReacquire(node)) {
if (interrupted |= Thread.interrupted()) {
if (cancelled = (node.getAndUnsetStatus(COND) & COND) != 0)
break; // else interrupted after signal
} else if ((node.status & COND) != 0) {
try {
if (rejected)
node.block();
else
// 将当前线程挂起
ForkJoinPool.managedBlock(node);
} catch (RejectedExecutionException ex) {
rejected = true;
} catch (InterruptedException ie) {
interrupted = true;
}
} else
Thread.onSpinWait(); // awoke while enqueuing
}
LockSupport.setCurrentBlocker(null); // 清除阻塞状态
node.clearStatus(); // 清除等待状态
acquire(node, savedState, false, false, false, 0L); // 加锁
if (interrupted) {
if (cancelled) {
unlinkCancelledWaiters(node);
throw new InterruptedException();
}
Thread.currentThread().interrupt();
}
}

我们看下enableWait(node)方法,它主要负责入队列和释放锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private int enableWait(ConditionNode node) {
if (isHeldExclusively()) { // 是否独占地持有锁
node.waiter = Thread.currentThread(); // 将当前线程设置为节点的等待线程
node.setStatusRelaxed(COND | WAITING); // 设置节点状态,这里表示节点正在等待条件
ConditionNode last = lastWaiter;
if (last == null) // 如果上个节点为null,说明是第一个节点,将node赋值给firstWaiter
firstWaiter = node;
else // 否则,将node赋值给nextWaiter,表示是上个节点的下个节点。
last.nextWaiter = node;
lastWaiter = node; // 将lastWaiter指向这个节点
int savedState = getState();
if (release(savedState))// 释放锁
return savedState;
}
node.status = CANCELLED; // lock not held or inconsistent
throw new IllegalMonitorStateException();
}

signal方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 这个方法主要是将第一个节点从条件等待队列中移动到持有锁的等待队列中去
public final void signal() {
ConditionNode first = firstWaiter;
if (!isHeldExclusively()) // 如果没有持有锁,就抛出异常
throw new IllegalMonitorStateException();
if (first != null)
doSignal(first, false);
}
private void doSignal(ConditionNode first, boolean all) {
while (first != null) { // 遍历条件队列
ConditionNode next = first.nextWaiter; // 获取下一个条件节点
if ((firstWaiter = next) == null) // 如果下一个条件节点为空,说明条件队列已空,此时将lastWaiter置空
lastWaiter = null;
if ((first.getAndUnsetStatus(COND) & COND) != 0) {
// 如果节点的状态不是COND,继续找下一个节点。否则执行enqueue方法
enqueue(first);
if (!all)
break;
}
first = next;
}
}
// 处理入队逻辑,即将这个Node加入到AQS的同步队列的尾部,等待后续竞争锁。
final void enqueue(Node node) {
if (node != null) {
for (;;) {
Node t = tail;
node.setPrevRelaxed(t); // avoid unnecessary fence
if (t == null) // initialize
tryInitializeHead();
else if (casTail(t, node)) { //加入队列尾部
t.next = node;
if (t.status < 0)// 如果t的状态小于0,那么就将当前线程唤醒,唤醒后的线程还是要等待获取锁后才能处理后续逻辑
LockSupport.unpark(node.waiter);
break;
}
}
}
}