本文我们来讨论Semaphore的用法与原理。如果有学习过之前讨论过的[[CountDownLatch详解|CountDownLatch]]、[[CyclicBarrier详解|CyclicBarrier]]的内容,那么本文的阅读难度会非常低。接下来将从用法和源码两个方面来进行讨论。
1. 用法
我们可以将Semaphore理解为一个资源池,共有n个许可证,每个线程要执行指定操作时,去获取许可证,如果获取不到许可证,则阻塞。拿到许可证执行完操作后需要及时地释放(归还)许可证。如JavaDoc中的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } }}
|
以上代码,展示了一个资源池的场景,暴露出两个方法,分别是getItem()和 putItem(Object x)。池子中一共有MAX_AVAILABLE个资源,当获取的时候,会通过Semaphore的acquire()方法申请许可证;归还资源的时候通过Semaphore的release()释放许可证。
可见其使用方法还是比较简单的。
2. 源码分析
首先讲讲它的整体思路。Semaphore的实现中,将state的状态值赋予的意义为许可证的数量,每次acquire()的时候,state的值都会减1,当release()的时候state的值会加一。当state的值等于0的时候,acquire()的线程将等待,其他线程release()时,如果发现有线程等待,则会唤醒等待的线程。
2.1 构造方法
1 2 3 4 5 6 7
| public Semaphore(int permits) { sync = new NonfairSync(permits); }
public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
|
通过构造方法可以看出,Semaphore类似于ReentrantLock中,实现了公平锁和非公平锁两种。而这两种同步器的实现,在ReentrantLock中就了解过,其实区别很小,只是在tryAcquireShared()方法的实现上有一点点区别。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
|
可见,公平锁比非公平锁仅仅是尝试获取锁的时候多了一步检查同步队列是否有线程等待的操作。因此后面的源码分析中,默认按公平锁的流程来分析。
2.2 acquire()
接下来我们来分析acquire()方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException();
if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); }
protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1;
int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
|
2.3 release()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public void release() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases; if (next < current) throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
|
以上就是Semaphore实现原理的核心流程了。其实大部分内容都是AQS的复用,在之前的讨论中都已经学习过了,仅仅是state被赋予的含义不同。
这里能更加具体地看出 Doug Lea 对于AQS这块儿的设计的精巧之处。把AQS的使用场景抽象到了极致,代码复用也用到了极致。