本文来讨论ReentrantReadWriteLock的实现原理。本文的内容客观上看不算太难,但是内容相对较多,需要有一定AQS的基础,如:AQS、独占锁、共享锁等。这些内容可以参考之前的一些文章,如:[[ReentrantLock详解(一)—— 加锁和解锁|ReentrantLock详解]]、[[CountDownLatch详解]]、[[CyclicBarrier详解]]等。

1. ReentrantReadWriteLock是什么?

翻译过来,名为读写锁。其中有两个锁示例,用于分别对读操作和写操作进行同步控制。其中读锁是共享锁,可以同时被多个线程同时使用,写锁是独占锁,只能由一个线程持有。且持有写锁的时候可以获取读锁,但是持有读锁的时候无法持有写锁。

OK,现在还是老规矩,直接看看JavaDoc中给出的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
class CachedData {  
Object data;
volatile boolean cacheValid;
final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
try {
// Recheck state because another thread might have
// acquired write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
} finally {
rwl.writeLock().unlock(); // Unlock write, still hold read
}
}

try {
use(data);
} finally {
rwl.readLock().unlock();
}
}
}}

以上示例代码是实现了一个缓存操作的功能,可见可以单独控制读和写的操作。

接下来对其原理进行分析。

2. 总体结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ReentrantReadWriteLock implements ReadWriteLock, java.io.Serializable {  

private static final long serialVersionUID = -6992448646407690164L;

private final ReentrantReadWriteLock.ReadLock readerLock;

private final ReentrantReadWriteLock.WriteLock writerLock;

final Sync sync;

abstract static class Sync extends AbstractQueuedSynchronizer {...}

public static class ReadLock implements Lock, java.io.Serializable {...}

public static class WriteLock implements Lock, java.io.Serializable {...}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
// ......
}

此时看整体的结构,可以验证前面所提到的,ReentrantReadWriteLock具有两个锁实例,一个是读锁,一个写锁。两个锁也都是基于AQS来实现,且也支持选择公平锁和非公平锁。

再看读写锁两个的实现:

Pasted image 20240616155554

可见,两个锁是公用了同一个同步器sync。那么,是怎么实现一个共享一个独占的呢?

在理解这个问题之前,需要先回忆一下AQS中的两个获取锁的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public final void acquireShared(int arg) {  
// 对于共享锁来说,如果tryAcquireShared的返回值小于0,就说明没有拿到共享锁。
// 拿CountDownLatch举例,如果sate不等于0,就说明没有获取到共享锁
if (tryAcquireShared(arg) < 0)
// 阻塞
// 当达到共享锁的条件后,解锁并通过setHeadAndPropagate进行解锁后续线程节点
doAcquireShared(arg);
}

public final void acquire(int arg) {
// 返回false为争抢资源失败,否则为成功。
// 以ReentrantLock举例,state用于记录重入次数
if (!tryAcquire(arg) &&
// 入队并阻塞
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

可以知道,共享锁和独占锁,对于state的操作是不同的逻辑。独占锁只允许持有锁的线程对state进行操作。而共享锁可以由多个线程操作。

为了解决这个问题,在ReentrantReadWriteLock中,对state进行了分块使用。即,高16位用于共享模式的读锁,低16位用于独占模式的写锁。不同的锁操作各自的范围即可实现两种锁复用同一个Sync,也就是同一个state状态。

3. 源码分析

接下来进行源码分析。

3.1 Sync同步器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
abstract static class Sync extends AbstractQueuedSynchronizer {

// 几个位运算用的常量和位运算,此处不做详细分析
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

static int sharedCount(int c) { return c >>> SHARED_SHIFT; }

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

// 记录每个线程持有的读锁数量
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = getThreadId(Thread.currentThread());
}

// ThreadLocal
static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

private transient ThreadLocalHoldCounter readHolds;

// 作为一个缓存,记录"最后一个获取读锁的线程"的读锁重入次数
// 不用每次都去ThreadLocal中读
private transient HoldCounter cachedHoldCounter;

// 第一个获取读锁的线程(并且其未释放读锁)
private transient Thread firstReader = null;
// 第一个获取读锁持有的读锁数量
private transient int firstReaderHoldCount;

Sync() {
// 使用ThreadLocal记录每个线程持有读锁的数量
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}
}

对于以上的代码,其中有几个不好理解的变量:cachedHoldCounter、firstReader、firstReaderHoldCount。这几个其实都是用于性能上的优化。如果不理解可以继续往后看,在后面的代码分析中可以慢慢理解其原因。

3.2 读锁

首先来看读锁的获取:

1
2
3
4
5
6
7
8
9
public void lock() {  
sync.acquireShared(1);
}

public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

以上这几步如果之前有基础,或者看过前面几篇同步工具相关的文章的话,应该非常熟悉,就是老生常谈的获取锁的流程。其中我们主要需要看的是 tryAcquireShared(arg)方法的实现,因为这个是ReentrantReadWriteLock中Sync自己的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
protected final int tryAcquireShared(int unused) {
// 介绍:
// 1. 如果写锁被另一个线程持有,则会失败。
// 2. 否则,这个线程有资格锁定wrt状态,所以询问它是否应该因为队列策略而阻塞。如果不是,尝试通过套管状态和更新计数来授予。注意,step不会检查可重入的获取,这被推迟到完整版本,以避免在更典型的不可重入情况下必须检查持有计数。
// 3. 如果步骤2失败,要么因为线程显然不符合条件,要么CAS失败或数量饱和,用完整的重试循环链到版本。
Thread current = Thread.currentThread();
int c = getState();
// 如果有线程在持有独占锁也就是写锁
if (exclusiveCount(c) != 0 &&
// 而且不是当前线程持有的
getExclusiveOwnerThread() != current)
// 那么直接返回一个负数,表示尝试获取锁失败
return -1;

int r = sharedCount(c);

// 检查读锁是否需要阻塞
if (!readerShouldBlock() &&
// 防止溢出,但是这种可能性很小
r < MAX_COUNT &&
// 获取共享锁
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
// 如果r为0,则说明获取之前的持有共享锁的数量为0,也就是说当前的线程是第一个获取共享锁的
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
// 如果第一个获取共享锁的是自己,那么获取共享锁的次数加一
firstReaderHoldCount++;
} else {
// cachedHoldCounter是用于缓存最后一个获取共享锁的线程
HoldCounter rh = cachedHoldCounter;
// 如果缓存为空或者不是当前线程的话,直接将当前的线程赋给cachedHoldCounter
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
// 走到这里说明最后一次获取共享锁的线程是本线程,但是获取锁的次数为0
// 说明之前已经通过readHolds.get(); 初始化过ThreadLocal。那么将rh放入ThreadLocal
readHolds.set(rh);
//
rh.count++;
}

// 返回正数,表示可以获取锁
return 1;
}
return fullTryAcquireShared(current);
}

在以上代码中,我们主要是讨论 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) 这个条件成立的情况下的代码。此时,我们来分析其中的几个条件。

首先来看,readerShouldBlock()。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 公平锁
static final class FairSync extends Sync {
//......
// 对于公平锁来说,只要同步队列中有线程在等待,就应该阻塞
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}


// 非公平锁
static final class NonfairSync extends Sync {
// ......
final boolean readerShouldBlock() {
return apparentlyFirstQueuedIsExclusive();
}
}

final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
// 返回第一个节点是否为写锁,如果是写锁就应该阻塞
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

在以上代码中,公平锁的实现很简单,就是说只要有其他线程在阻塞,那么不管阻塞的是什么线程都阻塞住。而非公平锁中,就算同步队列中有线程持有读锁,那也可以争抢一下。
但是如果同步队列的第一个节点如果持有的是写锁就不能争抢了。可以看出在这里,ReentrantReadWriteLock对于写锁的线程还是有一定的偏向的。这样做的目的我觉的是因为,在实际运行环境中,大部分都是读的任务远多与写的任务,如果不对写线程有一定的偏向性,就有可能造成线程饥饿,写线程迟迟无法抢到资源。

接下来两个条件很简单,r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT), 即判断读线程数量是否溢出、以及CAS更新state状态值,不做详细解释。

那么就是说,只有这几个条件至少其中一个不满足时才会不进入这个if分支中,而进入return fullTryAcquireShared(current);这段代码。接下来我们分析这段代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
final int fullTryAcquireShared(Thread current) {  
HoldCounter rh = null;
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0) {
// 如果当前独占锁的重入次数不为0,且持有写锁的线程不是当前线程,则直接返回-1
if (getExclusiveOwnerThread() != current)
return -1;
} else if (readerShouldBlock()) {
// 走到这里说明独占锁的重入次数为0,且readerShouldBlock() 为 true
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
// 如果第一个获取读锁的线程不是当前线程
if (rh == null) {
// **************************
// 再次提醒:cachedHoldCounter表示的是最后一次获取读锁的线程和其重入次数
rh = cachedHoldCounter;

if (rh == null || rh.tid != getThreadId(current)) {
// 如果缓存为空或者记录的缓存不是当前线程(缓存失效),则从ThreadLocal中拿
rh = readHolds.get();
// 如果当前线程从来没有初始化过ThreadLocal中的值,那么会再get的时候初始化。
// 如果此时的rh.count == 0 说明,这个rh就是上一行刚刚初始化的。
if (rh.count == 0)
// 那就将ThreadLocal删掉,直接走到下面代码中,返回-1
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
// **************************
// 这部分就是用来处理重入的问题,防止一个线程前面的已经获取了读锁,后面又被塞到同步队列中了
}
}

// 判断溢出
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 通过CAS更新state状态来获取读锁
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 获取共享锁成功

// 如果获取共享锁的数量为0,则初始化一下两个缓存
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;

} else if (firstReader == current) {
// 如果当前线程就是第一个获取共享锁的线程,只需要更新冲入次数
firstReaderHoldCount++;
} else {
// 走到这里说明 共享锁次数不为0,且当前线程不是第一个获取共享锁的线程‘
// 将cachedHoldCounter设置为当前的线程
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
// 能走到这个分支中,就说明已经获取到了读锁
return 1;
}
}
}

接下来如果返回值为负数,则进入分支执行doAcquireShared(arg)。这个方法在之前的CountDownLatch中已经详细讨论过,此处不做过多解释。

至此,读锁的获取也就是加锁的部分已经了解完了,接下来看看读锁的释放。

1
2
3
4
5
6
7
8
9
10
11
public void unlock() {  
sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

依然是熟悉的套路,不做详细解释,只针对读写锁自己实现的tryReleaseShared(arg)方法进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
protected final boolean tryReleaseShared(int unused) {  
Thread current = Thread.currentThread();
if (firstReader == current) {
// 如果当前线程就是第一个获取读锁的线程
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
// 如果当前线程的持有锁的数量为1,那么释放一次之后就是0了,此线程就结束了。
// 锁已将firstReader置为null,至于firstReaderHoldCount为什么不置为0呢?
// 因为没有必要,后续线程使用这个缓存的时候,会自己设置。
firstReader = null;
else
// 如果不是1,则减一
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
// 查看缓存的最后一个获取读锁的线程
if (rh == null || rh.tid != getThreadId(current))
// 若缓存未命中,则从ThreadLocal中拿
rh = readHolds.get();
// 看看当前线程重入了多少次
int count = rh.count;
// 如果小于等于1,说明这一次释放后就此线程对读锁就用完了
if (count <= 1) {
// 那么将ThreadLocal中存的删掉
readHolds.remove();
// 这里是针对同一个线程 解锁次数比加锁次数多的情况,跑出异常
if (count <= 0)
throw unmatchedUnlockException();
}

// 当前线程的重如次数减一
--rh.count;
}

// 通过自旋CAS保证对state中读锁部分更改成功
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
// 当更新后的state读锁部分(高16位)的状态未0的话,说明已经没有线程持有读锁了,可以释放读锁了。然后将会唤醒同步队列中的写线程
return nextc == 0;
}
}

tryReleaseShared(arg)返回true后,则会进入共享锁的解锁阶段:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doReleaseShared() {  
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}

此处不做详细解释。

3.3 写锁

以上讨论了共享锁读锁的加锁和解锁的流程,接下来讨论一下独占锁写锁。首先来讨论加锁流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
public void lock() {  
sync.acquire(1);
}

public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
/*
* Walkthrough:
* 1. If read count nonzero or write count nonzero
* and owner is a different thread, fail.
* 2. If count would saturate, fail. (This can only
* happen if count is already nonzero.)
* 3. Otherwise, this thread is eligible for lock if
* it is either a reentrant acquire or
* queue policy allows it. If so, update state
* and set owner.
*/
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
// (Note: if c != 0 and w == 0 then shared count != 0)
// state状态不为0,独占锁的重入次数(低16位)为0,那么说明共享锁不为0,肯定不能直接获取到锁
// 或者 当前线程不是持有写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;

// 防止溢出
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");

// 走到这里的话,就是锁重入。无需CAS,直接更新state即可
setState(c + acquires);
return true;
}

// 走到这里说明,state为0
// 判断写线程是否应该被阻塞
if (writerShouldBlock() ||
// 如果不用阻塞,则尝试CAS争抢锁
!compareAndSetState(c, c + acquires))
// 争抢失败,返回false
return false;

// 争抢成功,将独占锁持有线程设置为当前线程
setExclusiveOwnerThread(current);
return true;
}

// 公平锁
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}

// 非公平锁
final boolean writerShouldBlock() {
return false; // writers can always barge
}

以上就是写锁的整体流程了,逻辑还是比较简单。接下来讨论一下写锁的释放。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public void unlock() {  
sync.release(1);
}

public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
// 先看看解锁的是不是当前的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();

int nextc = getState() - releases;
// 重入次数减一,看是否为0
boolean free = exclusiveCount(nextc) == 0;
// 如果为0,则这个线程解锁成功。将写锁的持有者置为空
if (free)
setExclusiveOwnerThread(null);
// 更新state状态
setState(nextc);
// 返回解锁结果
return free;
}

以上就是对读写锁的整体流程的分析。