本文将从以下角度来介绍ReentrantLock:
ReentrantLock介绍;
ReentrantLock与synchronized的关系;
ReentrantLock的原理;
1. ReentrantLock介绍 JDK1.5新增了并发包,里面包含Lock接口,与synchronized关键字一样能实现同步功能。但相比synchronized,Lock更加灵活,可以手动获取、释放锁,而ReentrantLock就是Lock的一个实现类。接下来先介绍其基本使用。
1.1 ReentrantLock的基本使用 以下是一段是使用5个线程,并发对count做加一操作。使用ReentrantLock来保证线程安全。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 class AddRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 10000 ; i++) { lock.lock(); try { count++; }finally { lock.unlock(); } } } } @Slf4j public class Demo17 { private static final int THREAD_NUM = 5 ; public static ReentrantLock lock = new ReentrantLock (); public static int count = 0 ; public static void main (String[] args) throws InterruptedException { ArrayList<Thread> list = new ArrayList <>(THREAD_NUM); for (int i = 0 ; i < THREAD_NUM; i++) list.add(new Thread (new AddRunnable ())); for (Thread thread : list) thread.start(); for (Thread thread : list) thread.join(); log.info("count : {}" , count); } }
根据以上明显可见,ReentrantLock和synchronized关键字的最直观的区别就是ReentrantLock需要进行手动地加锁和解锁。
当使用synchronized时,由于其实现了自动加锁解锁,在出现异常时,JVM依然会释放锁持有的锁。而ReentrantLock不会,因此在编写时必须使用try-finnaly来保证其一定会释放锁。使用模板如下:
1 2 3 4 5 6 lock.lock(); try { doSomeThing(); }finally { lock.unlock(); }
1.2 ReentrantLock的常用方法
方法
说明
lock()
获取锁。如果锁已经被占用,则等待
tryLock()
尝试获取锁,拿到锁返回true,没拿到返回false,并立即返回
tryLock(long time, TimeUnit unit)
在指定时间内会等待获取锁,如果一直拿不到就返回false,并立即返回。在等待过程中可以进行中断
lockInterruptibley()
获取锁。如果线程interrupted了,则跟着抛出异常中断
unLock()
释放锁
newCondition()
创建一个与此 Lock 实例一起使用的 Condition 实例
ReentrantLock(boolean fair)
构造方法
前五个方法,很容易理解。对后两个进行简要解释。
newCondition():可以创建一个与此lock关联的Condition实例。这也是后面要提到的ReentrantLock与synchronized关键字的一个不同点。一个lock可以关联多个Condition实例,可以以更细的力度去阻塞和唤醒线程。ReentrantLock(boolean fair):此为ReentrantLock的一个有参构造方法,可以用于构造公平锁或者非公平锁。这也是一个与synchronized之间的不同点。
2. ReentrantLock与synchronized的关系 ^a04927
2.1 共同点
二者都是独占锁,保证线程互斥地访问临界区;
二者都是可重入的。
2.2 不同点 2.2.1 语法上看 ReentrantLock需要手动加锁解锁,而synchronized不需要;
2.2.2 是否公平 synchronized是非公平锁,而ReentrantLock可以指定是否使用公平锁。由于非公平锁性能相对较高,因此默认为非公平锁;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 @Slf4j class D18FairRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 4 ; i++) { fareLock.lock(); try { log.info("Running" ); } finally { fareLock.unlock(); } } } } @Slf4j class D18UnFairRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 4 ; i++) { unFareLock.lock(); try { log.info("Running" ); } finally { unFareLock.unlock(); } } } } public class Demo18 { public static final ReentrantLock fareLock = new ReentrantLock (true ); public static final ReentrantLock unFareLock = new ReentrantLock (false ); private static final Logger log = LoggerFactory.getLogger(Demo18.class); public static void main (String[] args) throws InterruptedException { ArrayList<Thread> list1 = new ArrayList <>() {{ add(new Thread (new D18FairRunnable ())); add(new Thread (new D18FairRunnable ())); add(new Thread (new D18FairRunnable ())); }}; for (Thread thread : list1) thread.start(); Thread.sleep(1000 ); System.out.println("================================" ); ArrayList<Thread> list2 = new ArrayList <>() {{ add(new Thread (new D18UnFairRunnable ())); add(new Thread (new D18UnFairRunnable ())); add(new Thread (new D18UnFairRunnable ())); }}; for (Thread thread : list2) thread.start(); } }
输出如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 19:01:02.357 [Thread-0] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.359 [Thread-1] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.359 [Thread-2] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.359 [Thread-0] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.359 [Thread-1] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-2] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-0] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-1] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-2] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-0] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-1] INFO com.yang.juc.D18FairRunnable - Running 19:01:02.360 [Thread-2] INFO com.yang.juc.D18FairRunnable - Running ================================ 19:01:03.362 [Thread-3] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-3] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-3] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-3] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-4] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-4] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-4] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-4] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-5] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-5] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-5] INFO com.yang.juc.D18UnFairRunnable - Running 19:01:03.362 [Thread-5] INFO com.yang.juc.D18UnFairRunnable - Running
可见,使用公平锁时不会出现饥饿现象。
2.2.3 是否可中断 可打断指的是处于阻塞状态等待锁的线程可以被打断等待。注意 lock.lockInterruptibly() 和 lock.trylock() ⽅法是可打断的,lock.lock()不是。可打断的意义在于避免得不到锁的线程⽆限制地等待下去,防⽌死锁的⼀种⽅式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @Slf4j public class Demo19 { private static final ReentrantLock lock = new ReentrantLock (); public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { lock.lock(); try { lock.lockInterruptibly(); log.info("获得了锁" ); } catch (InterruptedException e) { log.error("被打断" ); } finally { lock.unlock(); } }); lock.lock(); t1.start(); log.info("获得了锁" ); try { Thread.sleep(1000 ); t1.interrupt(); }finally { lock.unlock(); } } }
输出如下:
1 2 19:22:57.166 [main] INFO com.yang.juc.Demo19 - 获得了锁 19:22:58.172 [Thread-0] ERROR com.yang.juc.Demo19 - 被打断
也可以使用tryLock()方法,等待获取一定时间,超时则放弃。
2.2.4 是否能绑定条件 synchronized不能绑定条件 ,而ReentrantLock可以绑定多个条件结合await()和signal()方法实现更细粒度地等待和唤醒。
其实syncrhonized也可以理解为可以绑定条件,但只能绑定一个。就是说,当条件不满足时,会放入Monitor的WaitSet中,需要唤醒时,是直接唤醒整个WaitSet中的所有线程。
接下来假设一个场景,有两个线程t1和t2,分别需要满足A和B条件,为满足时则阻塞,等待条件满足唤醒。首先使用synchronized来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Slf4j public class Demo21 { private static Object obj = new Object (); private static boolean hasA = false ; private static boolean hasB = false ; public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { synchronized (obj){ while (!hasA) { log.info("等待A条件" ); try { obj.wait(); } catch (InterruptedException e) { throw new RuntimeException (e); } log.debug("被唤醒" ); } } }); Thread t2 = new Thread (() -> { synchronized (obj){ while (!hasB) { log.info("等待B条件" ); try { obj.wait(); } catch (InterruptedException e) { throw new RuntimeException (e); } log.debug("被唤醒" ); } } }); t1.start(); t2.start(); Thread.sleep(100 ); synchronized (obj){ hasA = !hasA; obj.notifyAll(); } Thread.sleep(100 ); synchronized (obj){ hasB = !hasB; obj.notifyAll(); } } }
输出:
1 2 3 4 5 6 20:11:01.840 [Thread-0] INFO com.yang.juc.Demo21 - 等待A条件 20:11:01.841 [Thread-1] INFO com.yang.juc.Demo21 - 等待B条件 20:11:01.941 [Thread-0] DEBUG com.yang.juc.Demo21 - 被唤醒 20:11:01.941 [Thread-1] DEBUG com.yang.juc.Demo21 - 被唤醒 20:11:01.941 [Thread-1] INFO com.yang.juc.Demo21 - 等待B条件 20:11:02.046 [Thread-1] DEBUG com.yang.juc.Demo21 - 被唤醒
可见,A条件满足后无法唤醒指定的线程,只能全部唤醒,或者随机唤醒,让线程判断是否满足条件,再决定是否需要继续阻塞。如果有大量的线程阻塞的话,会造成线程的争抢和不必要地性能浪费。接下来我们使用ReentrantLock来实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 @Slf4j public class Demo20 { private static ReentrantLock lock = new ReentrantLock (); private static Condition A = lock.newCondition(); private static Condition B = lock.newCondition(); private static boolean hasA = false ; private static boolean hasB = false ; public static void main (String[] args) throws InterruptedException { Thread t1 = new Thread (() -> { lock.lock(); try { while (!hasA){ log.info("等待A条件" ); A.await(); } log.info("A条件满足,继续执行" ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } }); Thread t2 = new Thread (() -> { lock.lock(); try { while (!hasB){ log.info("等待B条件" ); B.await(); } log.info("B条件满足,继续执行" ); } catch (InterruptedException e) { throw new RuntimeException (e); } finally { lock.unlock(); } }); t1.start(); t2.start(); Thread.sleep(100 ); lock.lock(); try { hasA = !hasA; log.debug("满足A条件" ); A.signal(); } finally { lock.unlock(); } Thread.sleep(100 ); lock.lock(); try { hasB = !hasB; log.debug("满足B条件" ); B.signal(); } finally { lock.unlock(); } } }
输出:
1 2 3 4 5 6 20:13:46.631 [Thread-0] INFO com.yang.juc.Demo20 - 等待A条件 20:13:46.633 [Thread-1] INFO com.yang.juc.Demo20 - 等待B条件 20:13:46.732 [main] DEBUG com.yang.juc.Demo20 - 满足A条件 20:13:46.733 [Thread-0] INFO com.yang.juc.Demo20 - A条件满足,继续执行 20:13:46.834 [main] DEBUG com.yang.juc.Demo20 - 满足B条件 20:13:46.834 [Thread-1] INFO com.yang.juc.Demo20 - B条件满足,继续执行
通过以上的Demo的对比可以明显看出,ReentrantLock可以绑定多个条件的特性的优越之处。
2.2.5 锁的对象 synchronized锁的是对象,锁信息是保存在对象头中,根据对象头的锁标志位来判断其锁状态;ReentrantLock锁的是线程,会根据进入的线程和int类型的state标识来判断其锁状态。
2.2.6 底层实现 synchronized是使用JVM层面的Monitor实现,而ReentrantLock基于AQS实现。
2.2.5和2.2.6会在后面小节中体现
3. ReentrantLock的原理 ReentrantLock 在内部用了内部类 Sync 来管理锁,所以真正的获取锁和释放锁是由 Sync 的实现类来控制的。
1 2 abstract static class Sync extends AbstractQueuedSynchronizer {}
Sync 有两个实现,分别为 NonfairSync(非公平锁)和 FairSync(公平锁),我们看 FairSync 部分。
1 2 3 public ReentrantLock (boolean fair) { sync = fair ? new FairSync () : new NonfairSync (); }
3.1 加锁操作 3.1.1 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 final void lock () { acquire(1 ); } @ReservedStackAccess public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } @ReservedStackAccess protected final boolean tryAcquire (int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0 ) { if (!hasQueuedPredecessors() && compareAndSetState(0 , acquires)) { setExclusiveOwnerThread(current); return true ; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0 ) throw new Error ("Maximum lock count exceeded" ); setState(nextc); return true ; } return false ; } public final boolean hasQueuedPredecessors () { Node t = tail; Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } } @ReservedStackAccess final boolean acquireQueued (final Node node, int arg) { boolean failed = true ; try { boolean interrupted = false ; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null ; failed = false ; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire (Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) return true ; if (ws > 0 ) { do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0 ); pred.next = node; } else { compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false ; } private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }
SIGNAL(-1)的含义是: 其后继节点需要被唤醒,或者说此节点有义务唤醒其后继节点。
3.1.2 addWaiter()方法中的细节 观察addWaiter()方法的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 private Node addWaiter (Node mode) { Node node = new Node (Thread.currentThread(), mode); Node pred = tail; if (pred != null ) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq (final Node node) { for (;;) { Node t = tail; if (t == null ) { if (compareAndSetHead(new Node ())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
不难发现,虚线包起来的部分源码和enq(node)中的代码逻辑重合。即使将虚线中这部分代码删掉,也几乎不影响功能。那是为什么呢?
这里就体现了Doug Lea前辈对细节的把控和性能的极致追求。因为在大多数情况下,tail 已经被初始化且有效,直接执行快速路径中的操作就可以成功,将新节点插入到队列末尾。这种情况下,避免了进入 enq 方法的循环以及一些不必要的判断语句,从而提高了效率。
3.1.3 shouldParkAfterFailedAcquire()返回值为false的情况 自己进行模拟一下,不难发现。线程第一次进入这个方法的时候,一定会返回false。为什么呢?
通过以上源码的分析中,可知,tail的ws是都是依赖后继节点来置为SIGNAL的。节点创建时默认是0。因此当后继节点进入一次后,才会将其通过CAS置为SIGNAL也就是-1。由于这个操作是在一个死循环中,因此下一次走到shouldParkAfterFailedAcquire()方法时自然会返回true了。
3.2 解锁操作 3.2.1 源码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 public void unlock () { sync.release(1 ); } @ReservedStackAccess public final boolean release (int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0 ) unparkSuccessor(h); return true ; } return false ; } protected final boolean tryRelease (int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException (); boolean free = false ; if (c == 0 ) { free = true ; setExclusiveOwnerThread(null ); } setState(c); return free; } private void unparkSuccessor (Node node) { int ws = node.waitStatus; if (ws < 0 ) compareAndSetWaitStatus(node, ws, 0 ); Node s = node.next; if (s == null || s.waitStatus > 0 ) { s = null ; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0 ) s = t; } if (s != null ) LockSupport.unpark(s.thread); }
唤醒后继节点后,被唤醒的线程将从之前的代码继续往下走,即:
1 2 3 4 private final boolean parkAndCheckInterrupt () { LockSupport.park(this ); return Thread.interrupted(); }