本文我们来讨论Semaphore的用法与原理。如果有学习过之前讨论过的[[CountDownLatch详解|CountDownLatch]]、[[CyclicBarrier详解|CyclicBarrier]]的内容,那么本文的阅读难度会非常低。接下来将从用法和源码两个方面来进行讨论。

1. 用法

我们可以将Semaphore理解为一个资源池,共有n个许可证,每个线程要执行指定操作时,去获取许可证,如果获取不到许可证,则阻塞。拿到许可证执行完操作后需要及时地释放(归还)许可证。如JavaDoc中的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class Pool {  
private static final int MAX_AVAILABLE = 100;
private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);

public Object getItem() throws InterruptedException {
available.acquire();
return getNextAvailableItem();
}

public void putItem(Object x) {
if (markAsUnused(x))
available.release();
}

// Not a particularly efficient data structure; just for demo

protected Object[] items = ... whatever kinds of items being managed
protected boolean[] used = new boolean[MAX_AVAILABLE];

protected synchronized Object getNextAvailableItem() {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (!used[i]) {
used[i] = true;
return items[i];
}
}
return null; // not reached
}

protected synchronized boolean markAsUnused(Object item) {
for (int i = 0; i < MAX_AVAILABLE; ++i) {
if (item == items[i]) {
if (used[i]) {
used[i] = false;
return true;
} else
return false;
}
}
return false;
}
}}

以上代码,展示了一个资源池的场景,暴露出两个方法,分别是getItem() putItem(Object x)。池子中一共有MAX_AVAILABLE个资源,当获取的时候,会通过Semaphore的acquire()方法申请许可证;归还资源的时候通过Semaphore的release()释放许可证。

可见其使用方法还是比较简单的。

2. 源码分析

首先讲讲它的整体思路。Semaphore的实现中,将state的状态值赋予的意义为许可证的数量,每次acquire()的时候,state的值都会减1,当release()的时候state的值会加一。当state的值等于0的时候,acquire()的线程将等待,其他线程release()时,如果发现有线程等待,则会唤醒等待的线程。

2.1 构造方法

1
2
3
4
5
6
7
public Semaphore(int permits) {  
sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

通过构造方法可以看出,Semaphore类似于ReentrantLock中,实现了公平锁和非公平锁两种。而这两种同步器的实现,在ReentrantLock中就了解过,其实区别很小,只是在tryAcquireShared()方法的实现上有一点点区别。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
static final class FairSync extends Sync {  
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
// 和非公平锁的区别就在这里
// 在尝试获取的时候,先看看等待队列中有没有人等待。
// 如果有人等待的话,直接返回负数,即进入同步队列等待。
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}


// 位于父类 java.util.concurrent.Semaphore.Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

可见,公平锁比非公平锁仅仅是尝试获取锁的时候多了一步检查同步队列是否有线程等待的操作。因此后面的源码分析中,默认按公平锁的流程来分析。

2.2 acquire()

接下来我们来分析acquire()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
public void acquire() throws InterruptedException {  
sync.acquireSharedInterruptibly(1);
}


public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 由于是可中断的方法,因此先判断是否被中断过,如果中断过,抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();

// 尝试获取锁,如果发现state - 1后结果小于0,说明许可证不够了
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}

// 尝试获取锁
protected int tryAcquireShared(int acquires) {
for (;;) {
// 同步队列中是否有线程节点在等待, 有的话直接返回-1。进行入队,不再和已经入队的进行争抢资源
if (hasQueuedPredecessors())
return -1;

// 走到这里说明同步队列中没有线程节点在等待
int available = getState();
int remaining = available - acquires;
// 判断令牌的数量获取后的余量是多少
// 如果余量小于0则直接返回
if (remaining < 0 ||
// 如果余量大于等于0,则通过CAS尝试获取。
// 如果失败,则继续尝试。成功则返回余量
compareAndSetState(available, remaining))
return remaining;
}
}

// 进入同步队列进行等待
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 构建共享的线程节点
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
// 如果前驱节点为头节点
if (p == head) {
// 再次尝试获取锁
int r = tryAcquireShared(arg);
// 如果r>=0说明尝试获取成功,开始唤醒
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
// 走到这里说明前驱节点不是head或者许可证余量不足
// 先判断是否应该挂起
if (shouldParkAfterFailedAcquire(p, node) &&
// 挂起
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

2.3 release()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public void release() {  
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
// 尝试释放共享锁,通过自旋CAS操作,一定会释放成功(忽略几乎不可能出现的情况)。
if (tryReleaseShared(arg)) {
// 唤醒同步队列中等待的线程
doReleaseShared();
return true;
}
return false;
}

protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
// int精度溢出,但是出先的可能性极小
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}

private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
// 如果h != head则继续循环,唤醒节点。给已经唤醒的节点帮忙,提高吞吐率
if (h == head) // loop if head changed
break;
}
}

以上就是Semaphore实现原理的核心流程了。其实大部分内容都是AQS的复用,在之前的讨论中都已经学习过了,仅仅是state被赋予的含义不同。

这里能更加具体地看出 Doug Lea 对于AQS这块儿的设计的精巧之处。把AQS的使用场景抽象到了极致,代码复用也用到了极致。