前言
Semaphore,中文翻译为信号量,常用来限制某些资源的访问数量。就好像一个十个座位的咖啡厅最多只能接纳10个客户一样。由下面类图可以看出,Semaphore实现了Serializable接口,意味着它可以被虚拟化,同时它有三个子类:Sync,NofairSync,FairSync;其中Sync是Semaphore的核心,NofairSync,FairSync均继承自Sync,它俩的主要区别是在tryAcquireShared(int acquires)方法的实现,FairSync在尝试获取许可前会检查是否有等待的线程,以保证公平性。

使用demo
下面例子中,我们定义了一个同时只能有1个线程进行逻辑处理的信号量,线程2我们先休眠10毫秒,这意味着线程1会先执行逻辑然后休眠1秒再释放资源,此时线程2只有等待线程1释放完后才能继续执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " acquired semaphore"); Thread.sleep(10); semaphore.release(); System.out.println(Thread.currentThread().getName() + " released semaphore"); } catch (InterruptedException e) { e.printStackTrace(); } }).start();
new Thread(() -> { try { Thread.sleep(10); semaphore.acquire(); System.out.println(Thread.currentThread().getName() + " acquired semaphore"); semaphore.release(); System.out.println(Thread.currentThread().getName() + " released semaphore"); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); }
|
执行结果:
Thread-0 acquired semaphore
Thread-0 released semaphore
Thread-1 acquired semaphore
Thread-1 released semaphore
关键属性
1
| private final Sync sync;
|
Sync类是Semaphore的核心,我们具体看下它的定义
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) { setState(permits); }
final int getPermits() { return getState(); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } } final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
|
构造器
1 2 3 4 5 6 7 8
| public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
|
关键方法
获取许可
acquire()
以共享模式获取许可,如果线程被中断则抛出InterruptedException。可以看到是直接调用了AQS的acquireSharedInterruptibly方法实现的。
1 2 3 4 5 6 7 8 9 10 11
| public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted() || (tryAcquireShared(arg) < 0 && acquire(null, arg, true, true, false, 0L) < 0)) throw new InterruptedException(); }
|
acquireUninterruptibly()
以非中断的方式获取一个许可,如果没有可用的许可,它会使当前线程阻塞,直到有许可可用。如果线程在等待许可的过程中被中断,它会忽略中断并继续等待许可。
1 2 3 4 5 6 7 8
| public void acquireUninterruptibly() { sync.acquireShared(1); } public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) acquire(null, arg, true, false, false, 0L); }
|
tryAcquire()
尝试获取一个许可,如果获取成功则返回ture,否则返回false。与acquire()方法唯一的不同就是它会有返回值。
1 2 3
| public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; }
|
释放许可
release()
释放一个许可,并唤醒下一个等待的线程。
1 2 3 4 5 6 7 8 9 10 11 12
| public void release() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { signalNext(head); return true; } return false; }
|