前言

Semaphore,中文翻译为信号量,常用来限制某些资源的访问数量。就好像一个十个座位的咖啡厅最多只能接纳10个客户一样。由下面类图可以看出,Semaphore实现了Serializable接口,意味着它可以被虚拟化,同时它有三个子类:Sync,NofairSync,FairSync;其中Sync是Semaphore的核心,NofairSync,FairSync均继承自Sync,它俩的主要区别是在tryAcquireShared(int acquires)方法的实现,FairSync在尝试获取许可前会检查是否有等待的线程,以保证公平性。

image-20240326114212467

使用demo

下面例子中,我们定义了一个同时只能有1个线程进行逻辑处理的信号量,线程2我们先休眠10毫秒,这意味着线程1会先执行逻辑然后休眠1秒再释放资源,此时线程2只有等待线程1释放完后才能继续执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(1);
new Thread(() -> {
try {
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired semaphore");
Thread.sleep(10);
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released semaphore");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();

new Thread(() -> {
try {
Thread.sleep(10);
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + " acquired semaphore");
semaphore.release();
System.out.println(Thread.currentThread().getName() + " released semaphore");
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}

执行结果:

Thread-0 acquired semaphore
Thread-0 released semaphore
Thread-1 acquired semaphore
Thread-1 released semaphore

关键属性

1
private final Sync sync;

Sync类是Semaphore的核心,我们具体看下它的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

Sync(int permits) { // 构造器,初始化许可证的数量
setState(permits);
}

final int getPermits() { // 获取许可证的数量
return getState();
}

final int nonfairTryAcquireShared(int acquires) { // 尝试获取指定数量的许可,这个方法不保证公平性。
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放指定数量的许可,当一个线程完成了它的任务后,它会调用这个方法来释放它所持有的许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

final void reducePermits(int reductions) { // 减少指定数量的许可
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}

// 获取并返回所有立即可用的许可,如果没有可用的许可,则释放它们。返回的是获取或释放的许可数量。
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

构造器

1
2
3
4
5
6
7
8
// 构造一个指定许可的非公平的信号量对象
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 构造一个指定许可数量与是否公平的信号量对象,为true则表示公平锁
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

关键方法

获取许可

acquire()

以共享模式获取许可,如果线程被中断则抛出InterruptedException。可以看到是直接调用了AQS的acquireSharedInterruptibly方法实现的。

1
2
3
4
5
6
7
8
9
10
11
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 如果线程被中断就抛出异常,没中断则尝试获取锁,这里的获取锁的方法tryAcquireShared是由Semaphore实现的,没拿到锁则调用acquire加入队列
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted() ||
(tryAcquireShared(arg) < 0 &&
acquire(null, arg, true, true, false, 0L) < 0))
throw new InterruptedException();
}

acquireUninterruptibly()

以非中断的方式获取一个许可,如果没有可用的许可,它会使当前线程阻塞,直到有许可可用。如果线程在等待许可的过程中被中断,它会忽略中断并继续等待许可。

1
2
3
4
5
6
7
8
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
// 注意第四个参数为false,忽略中断
acquire(null, arg, true, false, false, 0L);
}

tryAcquire()

尝试获取一个许可,如果获取成功则返回ture,否则返回false。与acquire()方法唯一的不同就是它会有返回值。

1
2
3
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}

释放许可

release()

释放一个许可,并唤醒下一个等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
public void release() {
sync.releaseShared(1);
}
// 如果有线程正在等待许可,那么它会唤醒一个等待的线程,并将刚刚释放的许可分配给那个线程。
// releaseShared(int arg)是在AQS中实现,但是tryReleaseShared(arg)是Semaphore实现的。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
signalNext(head);// 唤醒下一个线程
return true;
}
return false;
}