在并发编程中,AQS(AbstractQueuedSynchronizer)是一个非常重要的类,JUC中很多工具都是基于AQS进行开发。其设计和编码思路都极其精妙。本文的目的是先对AQS进行一个大致的介绍,具体的实现细节将在后续的各种同步组件的实现中进行详细解释以体现出其设计思想的巧妙之处。
1. 动手实现一个锁 为了认识AQS的功能,我们先自己动手实现一个非常简易的独占不可重入锁。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 public class MyLock1 extends AbstractMyLock { private volatile int status; private volatile static Thread t = null ; private static final Unsafe U; private static final long statusOffset; static { Field theUnsafeField = null ; try { theUnsafeField = Unsafe.class.getDeclaredField("theUnsafe" ); theUnsafeField.setAccessible(true ); U = (Unsafe) theUnsafeField.get(null ); statusOffset = U.objectFieldOffset(MyLock1.class.getDeclaredField("status" )); } catch (NoSuchFieldException | IllegalAccessException e) { throw new RuntimeException (e); } } public MyLock1 () { this .status = 0 ; } public void lock () { while (!U.compareAndSwapInt(this , statusOffset, 0 , 1 )) { } t = Thread.currentThread(); } public void unlock () { if (t == Thread.currentThread()){ this .status = 0 ; this .t = null ; } } }
以上代码实现了一个极其简易的独占不可重入锁。接下来对其进行测试:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 @Slf4j public class Demo27 { private static MyLock1 lock = new MyLock1 (); private static int cnt = 0 ; public void start () throws InterruptedException { Thread t1 = new Thread (new MyRunnable ()); Thread t2 = new Thread (new MyRunnable ()); Thread t3 = new Thread (new MyRunnable ()); t1.start(); t2.start(); t3.start(); t1.join(); t2.join(); t3.join(); log.info("The final result is: {}" , cnt); } private class MyRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 10000 ; i++) { lock.lock(); log.debug("{} lock()" , Thread.currentThread().getName()); try { cnt ++; log.info("cnt --> {}" , cnt); }finally { log.debug("{} unlock()" , Thread.currentThread().getName()); lock.unlock(); } } } } public static void main (String[] args) throws InterruptedException { new Demo27 ().start(); } }
部分果如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock() 11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29996 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock() 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock() 11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29997 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock() 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock() 11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29998 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock() 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock() 11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 29999 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock() 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 lock() 11:06:27.706 [Thread-0] INFO com.yang.juc.Demo27 - cnt --> 30000 11:06:27.706 [Thread-0] DEBUG com.yang.juc.Demo27 - Thread-0 unlock() 11:06:27.706 [main] INFO com.yang.juc.Demo27 - The final result is: 30000
根据以上测试可见,锁的功能基本已经实现。但是很容易发现有很多缺陷,比如不可重入,不可中断,高并发环境下性能低等。在本文中重点针对高并发环境下性能低的缺点展开讨论。
2. 分析问题 在MyLock1的实现中,可见为了保证加锁解锁操作的原子性,使用了Unsafe类中的CAS操作实现了乐观锁。
2.1 乐观锁和悲观锁
所谓乐观锁,根据名字可以猜出其设计思想,是假设最好的情况,认为共享资源每次被访问的时候不会出现问题,线程可以不停地执行,无需加锁也无需等待,只是在提交修改的时候去验证对应的资源(也就是数据)是否被其它线程修改了。 而悲观锁,总是假设最坏的情况,认为共享资源每次被访问的时候就会出现问题(比如共享数据被修改),所以每次在获取资源操作的时候都会上锁,这样其他线程想拿到这个资源就会阻塞直到锁被上一个持有者释放。
对于独占锁加锁问题,可以简单地类比为上厕所的场景。一个厕所只能有一个人使用,此时如果有甲、乙、丙三人想争抢厕所,甲抢到了,剩下两人就无法进入,使用这个共享资源。接下来他俩有两种选择:1、在厕所外不停转圈圈,检查厕所是否被释放(乐观锁,自旋);2、去一个区域排好队,等待甲用完厕所,叫队列中的第一个人去上(悲观锁,进入阻塞队列)。
可见两个选择都可以实现。但是,此时设想一下,如果有一万个人在外面等待呢?不忍直视。
通过此例子来类比,容易理解,乐观锁适合线程争抢不严重的情况,可以减少线程频繁的上下文切换;而悲观锁适合线程争抢激烈的情况,可以避免大量的无意义的自旋操作占用CPU。
2.2 思考解决方案 根据以上的分析,可知乐观锁和悲观锁在不同情况下各有长处,使用哪个需要根据场景而定。此时如果我们要在之前的思路上,解决掉高并发环境下造成的CPU压力过大导致的性能变差的问题,那么就需要对之前的乐观锁进行改进。
如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制。
3. 引入AQS 3.1 基于AQS实现lock AQS就实现了以上提到的一套线程阻塞等待以及被唤醒时锁分配的机制。我们可以先体验一下AQS,对MyLock1进行改造,通过AQS来实现。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @Slf4j public class MyLock2 extends AbstractMyLock { private Sync sync; public MyLock2 () { sync = new Sync (); } public void lock () { sync.acquire(1 ); } public void unLock () { sync.release(1 ); } private class Sync extends AbstractQueuedSynchronizer { @Override protected boolean tryRelease (int arg) { return compareAndSetState(1 , 0 ); } @Override protected boolean tryAcquire (int arg) { for (int i = 0 ; i < 3 ; i++) { if (compareAndSetState(0 , 1 )) return true ; } return false ; } protected Sync () { super (); } } }
现在对MyLock1和MyLock2进行测试对比其性能:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 @Slf4j public class Demo29 { private static int cnt = 0 ; private AbstractMyLock lock; private static int THREAD_COUNT = 50 ; public Demo29 (int threadCount, AbstractMyLock lock) { this .THREAD_COUNT = threadCount; this .lock = lock; } private void start () throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); long start = System.currentTimeMillis(); for (int i = 0 ; i < THREAD_COUNT; i++) { executorService.execute(new MyRunnable ()); } executorService.shutdown(); executorService.awaitTermination(60 , TimeUnit.SECONDS); log.info("lock: {}, cost time : {}" , lock.getClass().getName() , System.currentTimeMillis() - start); } public static void main (String[] args) throws InterruptedException { new Demo29 (100 , new MyLock2 ()).start(); } private class MyRunnable implements Runnable { @Override public void run () { for (int i = 0 ; i < 1000 ; i++) { lock.lock(); log.debug("{} lock()" , Thread.currentThread().getName()); try { cnt ++; log.debug("cnt --> {}" , cnt); }finally { log.debug("{} unlock()" , Thread.currentThread().getName()); lock.unlock(); } } } } }
测试结果分别如下:
1 2 18:11:25.586 [main] INFO c.y.j.Demo29 - lock: MyLock1, cost time : 15629 18:12:25.376 [main] INFO c.y.j.Demo29 - lock: MyLock2, cost time : 691
不难看出,基于AQS的锁在高并发的情况下,性能相较于于之前的单纯的CAS实现的乐观锁非常可观,然而其实现方法又非常简洁。主要归功于Doug Lea 对与AQS的设计,接下来讲对AQS进行一个整体的介绍。
3.2 AQS和Lock的关系 通过以上的铺垫,这里可以得出以下结论。
AQS面向锁的开发者,它关注的两个主要问题是同步状态管理以及维护线程的同步等待队列;
Lock面向锁的使用者,它聚焦的问题是使用者如何更好地使用锁处理并发问题,而使用者不需要知道锁的实现细节就可以实现互斥同步。
4. 介绍AQS AQS是一个用来构建锁和同步器的框架,使用AQS能简单且高效地构造出应用广泛的大量的同步器,比如我们提到的ReentrantLock,Semaphore,其他的诸如ReentrantReadWriteLock,SynchronousQueue,FutureTask等等皆是基于AQS的。当然,我们自己也能利用AQS非常轻松容易地构造出符合我们自己需求的同步器。
4.1 核心思想 AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列锁实现的,即将暂时获取不到锁的线程加入到队列中。
其中,AQS维护了一个state成员变量用于表示同步状态。具体的含义根据不同的实现可以自行赋予其不同的含义。
1 2 3 4 5 private volatile int state;
其状态信息通过以下几个方法进行操作。
1 2 3 4 5 6 7 8 9 10 11 12 protected final int getState () { return state; } protected final void setState (int newState) { state = newState; } protected final boolean compareAndSetState (int expect, int update) { return unsafe.compareAndSwapInt(this , stateOffset, expect, update); }
4.2 AQS中的模版方法设计模式 AQS是基于[[设计模式——模版方法|模版方法设计模式]]设计的(本文中不展开讨论)。Doug Lea对与同步器的使用过程进行抽象和建模,将大部分代码已经实现,留下了一部分方法给开发者用于满足自己不同场景下的需求。开发者可以通过继承指定的方法,在同步组件的实现过程中调用模版方法,模版方法则会调用用户重写的方法。一般用于重写的方法有如下几个:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 protected boolean tryAcquire (int arg) ;protected boolean tryRelease (int arg) ;protected int tryAcquireShared (int arg) ;protected boolean tryReleaseShared (int arg) ;protected boolean isHeldExclusively () ;
当开发者自定义的同步组件未重写指定方法,而又调用此方法时,默认会抛出UnsupportedOperationException异常。
这里没有像常见的模版方法中掩饰的一样,使用接口或者抽象类定义需要自定义的方法,而是使用抛出异常的方式。是因为有些方法是自定义同步组件的时候无需实现的,比如现在要定义一个独占模式的同步组件,那么就没有必要重写共享模式的方法。如果将其定义为了抽象方法的话,那么子类就必须实现其所有抽象方法。
4.3 同步队列和等待队列 继续观察AQS的成员变量, 维护了head和tail。
1 2 3 4 5 6 private transient volatile Node head;private transient volatile Node tail;
再观察内部类Node:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 static final class Node { static final Node SHARED = new Node (); static final Node EXCLUSIVE = null ; static final int CANCELLED = 1 ; static final int SIGNAL = -1 ; static final int CONDITION = -2 ; static final int PROPAGATE = -3 ; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; }
不难看出,AQS中维护了一个双向链表用于实现同步队列(也叫阻塞队列)。此外Node中通过nextWaiter还维护了一个单向队列用于实现等待队列(等待队列只有当使用Condition时,才会存在此单向链表,并可能有多个等待队列)。如下图:
本文对AQS的介绍就到这里,虽然看起来没什么东西。但是对于后续深入学习AQS以及JUC组件非常重要。在本文中,主要目的就是理解AQS是什么?在同步组件开发中是什么角色?它实现同步的核心思想是什么? 至于AQS的具体细节,将在后续的源码中进行学习。