本文来讨论ReentrantReadWriteLock的实现原理。本文的内容客观上看不算太难,但是内容相对较多,需要有一定AQS的基础,如:AQS、独占锁、共享锁等。这些内容可以参考之前的一些文章,如:[[ReentrantLock详解(一)—— 加锁和解锁|ReentrantLock详解]]、[[CountDownLatch详解]]、[[CyclicBarrier详解]]等。
1. ReentrantReadWriteLock是什么? 翻译过来,名为读写锁。其中有两个锁示例,用于分别对读操作和写操作进行同步控制。其中读锁是共享锁,可以同时被多个线程同时使用,写锁是独占锁,只能由一个线程持有。且持有写锁的时候可以获取读锁,但是持有读锁的时候无法持有写锁。
OK,现在还是老规矩,直接看看JavaDoc中给出的例子:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 class CachedData { Object data; volatile boolean cacheValid; final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock (); void processCachedData () { rwl.readLock().lock(); if (!cacheValid) { rwl.readLock().unlock(); rwl.writeLock().lock(); try { if (!cacheValid) { data = ... cacheValid = true ; } rwl.readLock().lock(); } finally { rwl.writeLock().unlock(); } } try { use(data); } finally { rwl.readLock().unlock(); } } }}
以上示例代码是实现了一个缓存操作的功能,可见可以单独控制读和写的操作。
接下来对其原理进行分析。
2. 总体结构 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public class ReentrantReadWriteLock implements ReadWriteLock , java.io.Serializable { private static final long serialVersionUID = -6992448646407690164L ; private final ReentrantReadWriteLock.ReadLock readerLock; private final ReentrantReadWriteLock.WriteLock writerLock; final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer {...} public static class ReadLock implements Lock , java.io.Serializable {...} public static class WriteLock implements Lock , java.io.Serializable {...} public ReentrantReadWriteLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); readerLock = new ReadLock (this ); writerLock = new WriteLock (this ); } }
此时看整体的结构,可以验证前面所提到的,ReentrantReadWriteLock具有两个锁实例,一个是读锁,一个写锁。两个锁也都是基于AQS来实现,且也支持选择公平锁和非公平锁。
再看读写锁两个的实现:
可见,两个锁是公用了同一个同步器sync。那么,是怎么实现一个共享一个独占的呢?
在理解这个问题之前,需要先回忆一下AQS中的两个获取锁的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
可以知道,共享锁和独占锁,对于state的操作是不同的逻辑。独占锁只允许持有锁的线程对state进行操作。而共享锁可以由多个线程操作。
为了解决这个问题,在ReentrantReadWriteLock中,对state进行了分块使用。即,高16位用于共享模式的读锁,低16位用于独占模式的写锁。不同的锁操作各自的范围即可实现两种锁复用同一个Sync,也就是同一个state状态。
3. 源码分析 接下来进行源码分析。
3.1 Sync同步器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 abstract static class Sync extends AbstractQueuedSynchronizer { static final int SHARED_SHIFT = 16 ; static final int SHARED_UNIT = (1 << SHARED_SHIFT); static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1 ; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1 ; static int sharedCount (int c) { return c >>> SHARED_SHIFT; } static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } static final class HoldCounter { int count = 0 ; final long tid = getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal <HoldCounter> { public HoldCounter initialValue () { return new HoldCounter (); } } private transient ThreadLocalHoldCounter readHolds; private transient HoldCounter cachedHoldCounter; private transient Thread firstReader = null ; private transient int firstReaderHoldCount; Sync() { readHolds = new ThreadLocalHoldCounter (); setState(getState()); } }
对于以上的代码,其中有几个不好理解的变量:cachedHoldCounter、firstReader、firstReaderHoldCount。这几个其实都是用于性能上的优化。如果不理解可以继续往后看,在后面的代码分析中可以慢慢理解其原因。
3.2 读锁 首先来看读锁的获取:
1 2 3 4 5 6 7 8 9 public void lock () { sync.acquireShared(1 ); } public final void acquireShared (int arg) { if (tryAcquireShared(arg) < 0 ) doAcquireShared(arg); }
以上这几步如果之前有基础,或者看过前面几篇同步工具相关的文章的话,应该非常熟悉,就是老生常谈的获取锁的流程。其中我们主要需要看的是 tryAcquireShared(arg)方法的实现,因为这个是ReentrantReadWriteLock中Sync自己的实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 protected final int tryAcquireShared (int unused) { Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1 ; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; } return 1 ; } return fullTryAcquireShared(current); }
在以上代码中,我们主要是讨论 if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) 这个条件成立的情况下的代码。此时,我们来分析其中的几个条件。
首先来看,readerShouldBlock()。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 static final class FairSync extends Sync { final boolean readerShouldBlock () { return hasQueuedPredecessors(); } } static final class NonfairSync extends Sync { final boolean readerShouldBlock () { return apparentlyFirstQueuedIsExclusive(); } } final boolean apparentlyFirstQueuedIsExclusive () { Node h, s; return (h = head) != null && (s = h.next) != null && !s.isShared() && s.thread != null ; }
在以上代码中,公平锁的实现很简单,就是说只要有其他线程在阻塞,那么不管阻塞的是什么线程都阻塞住。而非公平锁中,就算同步队列中有线程持有读锁,那也可以争抢一下。 但是如果同步队列的第一个节点如果持有的是写锁就不能争抢了。可以看出在这里,ReentrantReadWriteLock对于写锁的线程还是有一定的偏向的。这样做的目的我觉的是因为,在实际运行环境中,大部分都是读的任务远多与写的任务,如果不对写线程有一定的偏向性,就有可能造成线程饥饿,写线程迟迟无法抢到资源。
接下来两个条件很简单,r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT), 即判断读线程数量是否溢出、以及CAS更新state状态值,不做详细解释。
那么就是说,只有这几个条件至少其中一个不满足时才会不进入这个if分支中,而进入return fullTryAcquireShared(current);这段代码。接下来我们分析这段代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 final int fullTryAcquireShared (Thread current) { HoldCounter rh = null ; for (;;) { int c = getState(); if (exclusiveCount(c) != 0 ) { if (getExclusiveOwnerThread() != current) return -1 ; } else if (readerShouldBlock()) { if (firstReader == current) { } else { if (rh == null ) { rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) { rh = readHolds.get(); if (rh.count == 0 ) readHolds.remove(); } } if (rh.count == 0 ) return -1 ; } } if (sharedCount(c) == MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0 ) { firstReader = current; firstReaderHoldCount = 1 ; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null ) rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); else if (rh.count == 0 ) readHolds.set(rh); rh.count++; cachedHoldCounter = rh; } return 1 ; } } }
接下来如果返回值为负数,则进入分支执行doAcquireShared(arg)。这个方法在之前的CountDownLatch中已经详细讨论过,此处不做过多解释。
至此,读锁的获取也就是加锁的部分已经了解完了,接下来看看读锁的释放。
1 2 3 4 5 6 7 8 9 10 11 public void unlock () { sync.releaseShared(1 ); } public final boolean releaseShared (int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true ; } return false ; }
依然是熟悉的套路,不做详细解释,只针对读写锁自己实现的tryReleaseShared(arg)方法进行分析。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 protected final boolean tryReleaseShared (int unused) { Thread current = Thread.currentThread(); if (firstReader == current) { if (firstReaderHoldCount == 1 ) firstReader = null ; else firstReaderHoldCount--; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) rh = readHolds.get(); int count = rh.count; if (count <= 1 ) { readHolds.remove(); if (count <= 0 ) throw unmatchedUnlockException(); } --rh.count; } for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) return nextc == 0 ; } }
当tryReleaseShared(arg)返回true后,则会进入共享锁的解锁阶段:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 private void doReleaseShared () { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0 )) continue ; unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0 , Node.PROPAGATE)) continue ; } if (h == head) break ; } }
此处不做详细解释。
3.3 写锁 以上讨论了共享锁读锁的加锁和解锁的流程,接下来讨论一下独占锁写锁。首先来讨论加锁流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 public void lock () { sync.acquire(1 ); } public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire (int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0 ) { if (w == 0 || current != getExclusiveOwnerThread()) return false ; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error ("Maximum lock count exceeded" ); setState(c + acquires); return true ; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false ; setExclusiveOwnerThread(current); return true ; } final boolean writerShouldBlock () { return hasQueuedPredecessors(); } final boolean writerShouldBlock () { return false ; }
以上就是写锁的整体流程了,逻辑还是比较简单。接下来讨论一下写锁的释放。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public void unlock () { sync.release(1 ); } public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { if (!isHeldExclusively()) throw new IllegalMonitorStateException (); int nextc = getState() - releases; boolean free = exclusiveCount(nextc) == 0 ; if (free) setExclusiveOwnerThread(null ); setState(nextc); return free; }
以上就是对读写锁的整体流程的分析。