钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>
1. 简介
AbstractQueuedSynchronizer (抽象队列同步器,以下简称 AQS)出现在 JDK 1.5 中,由大师 Doug Lea 所创作。AQS 是很多同步器的基础框架。
ReentrantLock、CountDownLatch 和 Semaphore 等都是基于 AQS 实现的。除此之外,我们还可以基于 AQS,定制出我们所需要的同步器。
AQS 的使用方式通常都是通过内部类继承 AQS 实现同步功能,通过继承 AQS,可以简化同步器的实现。如前面所说,AQS 是很多同步器实现的基础框架。弄懂 AQS 对理解 Java 并发包里的组件大有裨益,这也是我学习 AQS 并写出这篇文章的缘由。另外,需要说明的是,AQS 本身并不是很好理解,细节很多。在看的过程中要有一定的耐心,做好看多遍的准备。
2.原理概述
在 AQS 内部,通过维护一个FIFO 队列
来管理多线程的排队工作。在公平竞争的情况下,无法获取同步状态的线程将会被封装成一个节点,置于队列尾部。入队的线程将会通过自旋的方式获取同步状态,若在有限次的尝试后,仍未获取成功,线程则会被阻塞住。
大致示意图如下:
当头结点释放同步状态后,此时头结点线程将会去唤醒后继节点阻塞的线程。后继节点线程恢复运行并获取同步状态后,会将旧的头结点从队列中移除,并将自己设为头结点。
实现思路
AQS内部维护一个CLH队列来管理锁。
线程会首先尝试获取锁,如果失败,则将当前线程以及等待状态等信息包成一个Node节点加到同步队列里。
接着会不断循环尝试获取锁(条件是当前节点为head的直接后继才会尝试),如果失败则会阻塞自己,直至被唤醒;
而当持有锁的线程释放锁时,会唤醒队列中的后继线程。
下面列举JDK中几种常见使用了AQS的同步组件:
- ReentrantLock: 使用了AQS的独占获取和释放,用state变量记录某个线程获取独占锁的次数,获取锁时+1,释放锁时-1,在获取时会校验线程是否可以获取锁。
- Semaphore: 使用了AQS的共享获取和释放,用state变量作为计数器,只有在大于0时允许线程进入。获取锁时-1,释放锁时+1。
- CountDownLatch: 使用了AQS的共享获取和释放,用state变量作为计数器,在初始化时指定。只要state还大于0,获取共享锁会因为失败而阻塞,直到计数器的值为0时,共享锁才允许获取,所有等待线程会被逐一唤醒。
3.源码介绍
3.1 方法简介
第一组方法是用于访问/设置同步状态的方法。
int getState(): 获取同步状态
void setState(): 设置同步状态
boolean compareAndSetState(int expect, int update):基于CAS,原子设置当前状态
第二组方需要由同步组件覆写方法,来实现同步状态的管理。
方法 | 说明 |
---|---|
boolean tryAcquire(int arg) | 独占式获取同步状态 |
boolean tryRelease(int arg) | 独占式释放同步状态 |
int tryAcquireShared(int arg) | 共享式获取同步状态 |
boolean tryReleaseShared(int arg) | 共享式获取同步状态 |
boolean isHeldExclusively() | 检测当前线程是否获取独占锁 |
以上的几个试获取/释放锁的方法的具体实现应当是无阻塞的。
第三组方法是一组模板方法,同步组件可直接调用。
方法 | 描述 |
---|---|
void acquire(int arg) | 获取独占锁。会调用tryAcquire 方法,如果未获取成功,则会进入同步队列等待 |
void acquireInterruptibly(int arg) | 响应中断版本的acquire |
boolean tryAcquireNanos(int arg,long nanos) | 响应中断+带超时版本的acquire |
void acquireShared(int arg) | 获取共享锁。会调用tryAcquireShared 方法 |
void acquireSharedInterruptibly(int arg) | 响应中断版本的acquireShared |
boolean tryAcquireSharedNanos(int arg,long nanos) | 响应中断+带超时版本的acquireShared |
boolean release(int arg) | 释放独占锁 |
boolean releaseShared(int arg) | 释放共享锁 |
Collection getQueuedThreads() | 获取同步队列上的线程集合 |
上面看上去很多方法,其实从语义上来区分就是获取和释放,从模式上区分就是独占式和共享式,从中断相应上来看就是支持和不支持。
3.2 节点结构
在并发的情况下,AQS 会将未获取同步状态的线程将会封装成节点,并将其放入同步队列尾部。同步队列中的节点除了要保存线程,还要保存等待状态。不管是独占式还是共享式,在获取状态失败时都会用到节点类。
static final class Node {/** 共享类型节点,标记节点在共享模式下等待 */static final Node SHARED = new Node();/** 独占类型节点,标记节点在独占模式下等待 */static final Node EXCLUSIVE = null;/** 等待状态 - 取消 */static final int CANCELLED = 1;//等待状态 - 通知。某个节点是处于该状态,当该节点释放同步状态后,会通知后继节点线程,使之可以恢复运行static final int SIGNAL = -1;/** 等待状态 - 条件等待。表明节点等待在 Condition 上 */static final int CONDITION = -2;// 等待状态 - 传播。表示无条件向后传播唤醒动作static final int PROPAGATE = -3;volatile int waitStatus;// 前驱节点volatile Node prev;//后继节点volatile Node next;//对应的线程volatile Thread thread;//下一个等待节点,用在 ConditionObject 中Node nextWaiter;//判断节点是否是共享节点final boolean isShared() {return nextWaiter == SHARED;}// 获取前驱节点final Node predecessor() throws NullPointerException {Node p = prev;if (p == null)throw new NullPointerException();elsereturn p;}Node() { // 用于建立初始标头或共享标头}/** addWaiter 方法会调用该构造方法 */Node(Thread thread, Node mode) {this.nextWaiter = mode;this.thread = thread;}/** Condition 中会用到此构造方法 */Node(Thread thread, int waitStatus) { // Used by Conditionthis.waitStatus = waitStatus;this.thread = thread;}}
这里有必要专门梳理一下节点等待状态的定义,因为AQS源码中有大量的状态判断与跃迁。
int waitStatus等待状态如下:
① CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待,节点进入该状态将不会变化
② SIGNAL,值为 -1,后继节点的线程处于等待状态,而当前节点的线程如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行
③ CONDITION,值为 -2,节点在等待队列中,节点线程等待在Condition上,当其他线程对Condition调用了signal() 方法后,该节点将会从等待队列中转移到同步队列中,加入到对同步状态的获取中
④ PROPAGATE,值为 -3,表示下一次共享式同步状态获取将会无条件地被传播下去
⑤ INITIAL,值为,初始状态
对于分析AQS中不涉及ConditionObject
部分的代码,可以认为队列中的节点状态只会是CANCELLED, SIGNAL, PROPAGATE, 0这几种情况。
上图为自制的AQS状态的流转图,AQS中0状态和CONDITION状态为始态,CANCELLED状态为终态。0状态同时也可以是节点生命周期的终态。
注意,上图仅表示状态之间流转的可达性,并不代表一定能够从一个状态沿着线随意跃迁。
在AQS中包含了head和tail两个Node引用,其中head在逻辑上的含义是当前持有锁的线程,head节点实际上是一个虚节点,本身并不会存储线程信息。
当一个线程无法获取锁而被加入到同步队列时,会用CAS来设置尾节点tail为当前线程对应的Node节点。
head和tail在AQS中是延迟初始化的,也就是在需要的时候才会被初始化,也就意味着在所有线程都能获取到锁的情况下,队列中的head和tail都会是null。
3.3独占锁的实现
非公平锁的加锁解锁流程
ReentrantLock类使用了AQS的独占锁的获取和释放,所以下面有些源码的展示,是从这个类进入的。
3.3.1 获取同步状态
(1)假设这个时候在初始情况下,还没有多任务来请求竞争这个state。
这时候如果第一个线程thread1调用了lock方法请求获得锁,首先会通过CAS的方式将state更新为1,表示自己thread1获得了锁,并将独占锁的线程持有者设置为thread1。
//ReentrantLock -> NofairSync -> lock(非公平锁)
final void lock() {if (compareAndSetState(0, 1))//setExclusiveOwnerThread是AbstractOwnableSynchronizer的方法,AQS继承了AbstractOwnableSynchronizersetExclusiveOwnerThread(Thread.currentThread());elseacquire(1);}
(2)这个时候有另一个线程thread2来尝试或者锁,同样也调用lock方法,尝试通过CAS的方式将state更新为1,但是由于之前已经有线程持有了state,所以thread2这一步CAS失败(前面的thread1已经获取state并且没有释放),
就会调用acquire(1)方法(该方法是AQS提供的模板方法,它会调用子类的tryAcquire方法)。
非公平锁的实现中,AQS的模板方法acquire(1)就会调用NofairSync的tryAcquire方法,而tryAcquire方法又调用的Sync的nonfairTryAcquire方法,所以我们看看nonfairTryAcquire的流程。
//NofairSyncprotected final boolean tryAcquire(int acquires) {return nonfairTryAcquire(acquires);}final boolean nonfairTryAcquire(int acquires) {//(1)获取当前线程final Thread current = Thread.currentThread();//(2)获得当前同步状态stateint c = getState();//(3)如果state==0,表示没有线程获取if (c == 0) {//(3-1)那么就尝试以CAS的方式更新state的值if (compareAndSetState(0, acquires)) {//(3-2)如果更新成功,就设置当前独占模式下同步状态的持有者为当前线程setExclusiveOwnerThread(current);//(3-3)获得成功之后,返回truereturn true;}}//(4)这里是重入锁的逻辑else if (current == getExclusiveOwnerThread()) {//(4-1)判断当前占有state的线程就是当前来再次获取state的线程之后,就计算重入后的stateint nextc = c + acquires;//(4-2)这里是风险处理if (nextc < 0) // overflowthrow new Error(\"Maximum lock count exceeded\");//(4-3)通过setState无条件的设置state的值,(因为这里也只有一个线程操作state的值,即//已经获取到的线程,所以没有进行CAS操作)setState(nextc);return true;}//(5)没有获得state,也不是重入,就返回falsereturn false;}
总结来说就是:
1、线程thread2想要去获取锁。
2、获取当前AQS的state的值。如果此时state的值是0,那么我们就通过CAS操作获取锁,然后设置AQS的线程占有者为thread2。
很明显,在当前的这个执行情况下,state的值是1不是0,因为我们的thread1还没有释放锁。所以CAS失败,后面第3步的重入逻辑也不会进行
3、如果当前将要获取锁的线程等于此时AQS的exclusiveOwnerThread的线程,等于还是原来的线程thread1,则此时将state的值加1,这是重入锁的实现方式。
4、最终thread2执行到这里会返回false。
(3)上面的thread2加锁失败,返回false。那么根据开始我们讲到的AQS概述就应该将thread2构造为一个Node结点加入同步队列中。
因为NofairSync的tryAcquire方法是由AQS的模板方法acquire()来调用的,那么我们看看该方法的源码以及执行流程。
//(1)tryAcquire,这里thread2执行返回了false,那么就会执行addWaiter将当前线程构造为一个结点加入同步队列中public final void acquire(int arg) {if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))selfInterrupt();}
那么我们就看一下addWaiter方法的执行流程。
private Node addWaiter(Node mode) {//(1)将当前线程以及阻塞原因(是因为SHARED模式获取state失败还是EXCLUSIVE获取失败)构造为Node结点Node node = new Node(Thread.currentThread(), mode);//(2)这一步是快速将当前线程插入队列尾部Node pred = tail;if (pred != null) {//(2-1)将构造后的node结点的前驱结点设置为tailnode.prev = pred;//(2-2)以CAS的方式设置当前的node结点为tail结点if (compareAndSetTail(pred, node)) {//(2-3)CAS设置成功,就将原来的tail的next结点设置为当前的node结点。这样这个双向队//列就更新完成了pred.next = node;return node;}}//(3)执行到这里,说明要么当前队列为null,要么存在多个线程竞争失败都去将自己设置为tail结点,//那么就会有线程在上面(2-2)的CAS设置中失败,就会到这里调用enq方法enq(node);return node;}
那么总结一下addWaiter方法
1、将当前将要去获取锁的线程也就是thread2和独占模式封装为一个node对象。
2、尝试快速的将当前线程构造的node结点添加作为tail结点(这里就是直接获取当前tail,然后将node的前驱结点设置为tail),
并且以CAS的方式将node设置为tail结点(CAS成功后将原tail的next设置为node,然后这个队列更新成功)。
3、如果2设置失败,就进入enq方法。
在刚刚的thread1和thread2的环境下,开始时候线程阻塞队列是空的(因为thread1获取了锁,thread2也是刚刚来请求锁,所以线程阻塞队列里面是空的)。
很明显,这个时候队列的尾部tail节点也是null,那么将直接进入到enq方法。所以我们看看enq方法的实现
//通过CAS + 自旋的方式插入节点到队尾
private
Node enq(final Node node) {for (;;) {//(4)还是先获取当前队列的tail结点Node t = tail;//(5)如果tail为null,表示当前同步队列为null,就必须初始化这个同步队列的head和tail(建//立一个哨兵结点)if (t == null) {//(5-1)初始情况下,多个线程竞争失败,在检查的时候都发现没有哨兵结点,所以需要CAS的//设置哨兵结点if (compareAndSetHead(new Node()))tail = head;}//(6)tail不为nullelse {//(6-1)直接将当前结点的前驱结点设置为tail结点node.prev = t;//(6-2)前驱结点设置完毕之后,还需要以CAS的方式将自己设置为tail结点,如果设置失败,//就会重新进入循环判断一遍if (compareAndSetTail(t, node)) {t.next = node;return t;}}}}
enq方法内部是一个自旋循环,第一次循环默认情况如下图所示
1、首先代码块(4)处将t指向了tail,判断得到t==null,如图(1)所示;
2、于是需要新建一个哨兵结点作为整个同步队列的头节点(代码块5-1处执行)
3、完了之后如图(2)所示。这样第一次循环执行完毕。
第二次循环整体执行如下图所示。
1、还是先获取当前tail结点然后将t指向tail结点。如下图的(3)
2、然后判断得到当前t!=null,所以enq方法中进入代码块(6).
3、在(6-1)代码块中将node的前驱结点设置为原来队列的tail结点,如下图的(4)所示。
4、设置完前驱结点之后,代码块(6-2)会以CAS的方式将当前的node结点设置为tail结点,如果设置成功,就会是下图(5)所示。
更新完tail结点之后,需要保证双向队列的,所以将原来的指向哨兵结点的t的next结点指向node结点,如下图(6)所示。最后返回。
总结来说,即使在多线程情况下,enq方法还是能够保证每个线程结点会被安全的添加到同步队列中,因为enq通过CAS方式将结点添加到同步队列之后才会返回,
否则就会不断尝试添加(这样实际上就是在并发情况下,把向同步队列添加Node变得串行化了)
(4)在上面AQS的模板方法中,acquire()方法还有一步acquireQueued,这个方法的主要作用就是在同步队列中嗅探到自己的前驱结点,如果前驱结点是头节点的话就会尝试取获取同步状态,
否则会先设置自己的waitStatus为-1,然后调用LockSupport的方法park自己。具体的实现如下面代码所示
final boolean acquireQueued(final Node node, int arg) {boolean failed = true;try {boolean interrupted = false;//在这样一个循环中尝试tryAcquire同步状态for (;;) {//获取前驱结点final Node p = node.predecessor();//(1)如果前驱结点是头节点,就尝试取获取同步状态,这里的tryAcquire方法相当于还是调//用NofairSync的tryAcquire方法,在上面已经说过if (p == head && tryAcquire(arg)) {//如果前驱结点是头节点并且tryAcquire返回true,那么就重新设置头节点为nodesetHead(node);p.next = null; //将原来的头节点的next设置为null,交由GC去回收它failed = false;return interrupted;}//(2)如果不是头节点,或者虽然前驱结点是头节点但是尝试获取同步状态失败就会将node结点//的waitStatus设置为-1(SIGNAL),并且park自己,等待前驱结点的唤醒。至于唤醒的细节//下面会说到 p节点是node节点的前驱节点,node节点是添加到队列的末尾。if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);//(3)如果没能成功重新获取锁,则取消获取}}
在上面的代码中我们可以看出,这个方法也是一个自旋循环,继续按照刚刚的thread1和thread2这个情况分析。在enq方法执行完之后,同步队列的情况大概如下所示。
当前的node结点的前驱结点为head,所以会调用tryAcquire()方法去获得同步状态。但是由于state被thread1占有,所以tryAcquire失败。这里就是执行acquireQueued方法的代码块(2)了。
代码块(2)中首先调用了shouldParkAfterFailedAcquire方法,该方法会将同步队列中node结点的前驱结点的waitStatus为CANCELLED的线程移除,并将当前调用该方法的线程所属结点自己和他的前驱结点的waitStatus设置为-1(SIGNAL),然后返回。具体方法实现如下所示。
//方法名称的解释为:停止失败的获取
private
static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {//(1)获取前驱结点的waitStatusint ws = pred.waitStatus;//(2)如果前驱结点的waitStatus为SINGNAL,就直接返回trueif (ws == Node.SIGNAL)//前驱结点的状态为SIGNAL,那么该结点就能够安全的调用park方法阻塞自己了。return true;if (ws > 0) {//(3)这里就是将所有的前驱结点状态为CANCELLED的都移除do {node.prev = pred = pred.prev; //将node的前节点pred的前节点赋值给node的前节点,pred从中删除,因为pred.waitStatus > 0} while (pred.waitStatus > 0);pred.next = node;} else {//CAS操作将这个前驱节点设置成SIGHNAL。compareAndSetWaitStatus(pred, ws, Node.SIGNAL);}return false;}
所以shouldParkAfterFailedAcquire方法执行完毕,现在的同步队列情况大概就是这样子,即哨兵结点的waitStatus值变为-1。
上面的执行完毕返回到acquireQueued方法的时候,在acquireQueued方法中就会进行第二次循环了,但是还是获取state失败,而当再次进入shouldParkAfterFailedAcquire方法的时候,当前结点node的前驱结点head的waitStatus已经为-1(SIGNAL)了,就会返回true,然后acquireQueued方法中就会接着执行parkAndCheckInterrupt将自己park阻塞挂起。
private final boolean parkAndCheckInterrupt() {LockSupport.park(this);return Thread.interrupted();}
代码3实现某个node取消获取锁。
private void cancelAcquire(Node node) {if (node == null)return;node.thread = null;// 遍历并更新节点前驱,把node的prev指向前部第一个非取消节点。Node pred = node.prev;while (pred.waitStatus > 0)node.prev = pred = pred.prev;// 记录pred节点的后继为predNext,后续CAS会用到。Node predNext = pred.next;// 直接把当前节点的等待状态置为取消,后继节点即便也在cancel可以跨越node节点。node.waitStatus = Node.CANCELLED;/** 如果CAS将tail从node置为pred节点了* 则剩下要做的事情就是尝试用CAS将pred节点的next更新为null以彻底切断pred和node的联系。* 这样一来就断开了pred与pred的所有后继节点,这些节点由于变得不可达,最终会被回收掉。* 由于node没有后继节点,所以这种情况到这里整个cancel就算是处理完毕了。** 这里的CAS更新pred的next即使失败了也没关系,说明有其它新入队线程或者其它取消线程更新掉了。*/if (node == tail && compareAndSetTail(node, pred)) {compareAndSetNext(pred, predNext, null);} else {// 如果node还有后继节点,这种情况要做的事情是把pred和后继非取消节点拼起来。int ws;if (pred != head &&((ws = pred.waitStatus) == Node.SIGNAL ||(ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&pred.thread != null) {Node next = node.next;/** 如果node的后继节点next非取消状态的话,则用CAS尝试把pred的后继置为node的后继节点* 这里if条件为false或者CAS失败都没关系,这说明可能有多个线程在取消,总归会有一个能成功的。*/if (next != null && next.waitStatus <= 0)compareAndSetNext(pred, predNext, next);} else {/** 这时说明pred == head或者pred状态取消或者pred.thread == null* 在这些情况下为了保证队列的活跃性,需要去唤醒一次后继线程。* 举例来说pred == head完全有可能实际上目前已经没有线程持有锁了,* 自然就不会有释放锁唤醒后继的动作。如果不唤醒后继,队列就挂掉了。** 这种情况下看似由于没有更新pred的next的操作,队列中可能会留有一大把的取消节点。* 实际上不要紧,因为后继线程唤醒之后会走一次试获取锁的过程,* 失败的话会走到shouldParkAfterFailedAcquire的逻辑。* 那里面的if中有处理前驱节点如果为取消则维护pred/next,踢掉这些取消节点的逻辑。*/unparkSuccessor(node);}/** 取消节点的next之所以设置为自己本身而不是null,* 是为了方便AQS中Condition部分的isOnSyncQueue方法,* 判断一个原先属于条件队列的节点是否转移到了同步队列。** 因为同步队列中会用到节点的next域,取消节点的next也有值的话,* 可以断言next域有值的节点一定在同步队列上。** 在GC层面,和设置为null具有相同的效果。*/node.next = node;}}/*** 唤醒后继线程。*/private void unparkSuccessor(Node node) {int ws = node.waitStatus;// 尝试将node的等待状态置为0,这样的话,后继争用线程可以有机会再尝试获取一次锁。if (ws < 0)compareAndSetWaitStatus(node, ws, 0);Node s = node.next;/** 这里的逻辑就是如果node.next存在并且状态不为取消(就是waitStatus!=1),则直接唤醒s即可* 否则需要从tail开始向前找到node之后最近的非取消节点。** 这里为什么要从tail开始向前查找也是值得琢磨的:* 如果读到s == null,不代表node就为tail,参考addWaiter以及enq函数中的我的注释。* 不妨考虑到如下场景:* 1. node某时刻为tail* 2. 有新线程通过addWaiter中的if分支或者enq方法添加自己* 3. compareAndSetTail成功* 4. 此时这里的Node s = node.next读出来s == null,但事实上node已经不是tail,它有后继了!*/if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)LockSupport.unpark(s.thread);}
(5)我们梳理一下整个方法调用的流程,假设现在又有一个thread3线程竞争这个state,那么这个方法调用的流程是什么样的呢。
①首先肯定是调用ReentrantLock.lock()方法去尝试加锁;
②因为是非公平锁,所以就会转到调用NoFairSync.lock()方法;
③在NoFairSync.lock()方法中,会首先尝试设置state的值,因为已经被占有那么肯定就是失败的。这时候就会调用AQS的模板方法AQS.acquire(1)。
④在AQS的模板方法acquire(1)中,实际首先会调用的是子类的tryAcquire()方法,而在非公平锁的实现中即Sync.nofairTryAcquire()方法。
⑤显然tryAcquire()会返回false,所以acquire()继续执行,即调用AQS.addWaiter(),就会将当前线程构造称为一个Node结点,初始状况下waitStatus为0。
⑥在addWaiter方法中,会首先尝试直接将构建的node结点以CAS的方式(存在多个线程尝试将自己设置为tail)设置为tail结点,如果设置成功就直接返回,失败的话就会进入一个自旋循环的过程。即调用enq()方法。最终保证自己成功被添加到同步队列中。
⑦加入同步队列之后,就需要将自己挂起或者嗅探自己的前驱结点是否为头结点以便尝试获取同步状态。即调用acquireQueued()方法。
⑧在这里thread3的前驱结点不是head结点,所以就直接调用shouldParkAfterFailedAcquire()方法,该方法首先会将刚刚的thread2线程结点中的waitStatue的值改变为-1(初始的时候是没有改变这个waitStatus的,每个新节点的添加就会改变前驱结点的waitStatus值)。
⑨thread2所在结点的waitStatus改变后,shouldParkAfterFailedAcquire方法会返回false。所以之后还会在acquireQueued中进行第二次循环。并再次调用shouldParkAfterFailedAcquire方法,然后返回true。最终调用parkAndCheckInterrupt()将自己挂起。
每个线程去竞争这个同步状态失败的话大概就会经历上面的这些过程。假设现在thread3经历上面这些过程之后也进入同步队列,那么整个同步队列大概就是下面这样了.
将上面的流程整理一下大概就是下面这个图
3.3.2 释放同步状态
上面说一ReentrantLock为例到了怎样去获得非公平锁,那么thread1获取锁,执行完释放锁的流程是怎样的呢。首先肯定是在finally中调用ReentrantLock.unlock()方法,所以我们就从这个方法开始看起。
(1)从下面的unlock方法中我们可以看出,实际上是调用AQS的release()方法,其中传递的参数为1,表示每一次调用unlock方法都是释放所获得的一次state。重入的情况下会多次调用unlock方法,也保证了lock和unlock是成对的。
public void unlock() {sync.release(1); //这里ReentrantLock的unlock方法调用了AQS的release方法}public final boolean release(int arg) {//这里调用了子类的tryRelease方法,即ReentrantLock的内部类Sync的tryRelease方法if (tryRelease(arg)) {Node h = head;if (h != null && h.waitStatus != 0)unparkSuccessor(h);return true;}return false;}
(2)上面看到release方法首先会调用ReentrantLock的内部类Sync的tryRelease方法。而通过下面代码的分析,大概知道tryRelease做了这些事情。
① 获取当前AQS的state,并减去1;
② 判断当前线程是否等于AQS的exclusiveOwnerThread,如果不是,就抛IllegalMonitorStateException异常,这就保证了加锁和释放锁必须是同一个线程;
③ 如果(state-1)的结果不为0,说明锁被重入了,需要多次unlock,这也是lock和unlock成对的原因;
④ 如果(state-1)等于0,我们就将AQS的ExclusiveOwnerThread设置为null;
⑤ 如果上述操作成功了,也就是tryRelase方法返回了true;返回false表示需要多次unlock。
protected final boolean tryRelease(int releases) {//(1)获取当前的state,然后减1,得到要更新的stateint c = getState() - releases;//(2)判断当前调用的线程是不是持有锁的线程,如果不是抛出IllegalMonitorStateExceptionif (Thread.currentThread() != getExclusiveOwnerThread())throw new IllegalMonitorStateException();boolean free = false;//(3)判断更新后的state是不是0if (c == 0) {free = true;//(3-1)将当前锁持者设为nullsetExclusiveOwnerThread(null);}//(4)设置当前state=c=getState()-releasessetState(c);//(5)只有state==0,才会返回truereturn free;}
(3)那么当tryRelease返回true之后,就会执行release方法中if语句块中的内容。从上面我们看到
if (tryRelease(arg)) {//(1)获取当前队列的头节点headNode h = head;//(2)判断头节点不为null,并且头结点的waitStatus不为0(CACCELLED)if (h != null && h.waitStatus != 0)//(3-1)调用下面的方法唤醒同步队列head结点的后继结点中的线程unparkSuccessor(h);return true;}
(4)在获取锁的流程分析中,我们知道当前同步队列如下所示,所以判断得到head!=null并且head的waitStatus=-1。
所以会执行unparkSuccessor方法,传递的参数为指向head的一个引用h.那下面我们就看看unparkSuccessor方法中处理了什么事情。
private void unparkSuccessor(Node node) {//(1)获得node的waitStatusint ws = node.waitStatus;//(2)判断waitStatus是否小于0if (ws < 0)//(2-1)如果waitStatus小于0需要将其以CAS的方式设置为0compareAndSetWaitStatus(node, ws, 0);//(2)获得s的后继结点,这里即head的后继结点Node s = node.next;//(3)判断后继结点是否已经被移除,或者其waitStatus==CANCELLEDif (s == null || s.waitStatus > 0) {//(3-1)如果s!=null,但是其waitStatus=CANCELLED需要将其设置为nulls = null;//(3-2)会从尾部结点开始寻找,找到离head最近的不为null并且node.waitStatus的结点小于0的for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}//(4)node.next!=null或者找到的一个离head最近的结点不为nullif (s != null)//(4-1)唤醒这个结点中的线程LockSupport.unpark(s.thread);}
从上面的代码实现中可以总结,unparkSuccessor主要做了两件事情:
① 获取head节点的waitStatus,如果小于0,就通过CAS操作将head节点的waitStatus修改为0
② 寻找head节点的下一个节点,如果这个节点的waitStatus小于0,就唤醒这个节点,否则遍历下去,找到第一个waitStatus<=0的节点,并唤醒。
(5)下面我们应该分析的是释放掉state之后,唤醒同步队列中的结点之后程序又是是怎样执行的。按照上面的同步队列示意图,那么下面会执行这些
①thread1(获取到锁的线程)调用unlock方法之后,最终执行到unparkSuccessor方法会唤醒thread2结点。所以thread2被unpark。
②再回想一下,当时thread2是在调用acquireQueued方法之后的parkAndCheckInterrupt里面被park阻塞挂起了,所以thread2被唤醒之后继续执行acquireQueued方法中的for循环(到这里可以往前回忆看一下acquireQueued方法中的for循环做了哪些事情);
③for循环中做的第一件事情就是查看自己的前驱结点是不是头结点(按照上面的同步队列情况是满足的);
④前驱结点是head结点,就会调用tryAcquire方法尝试获取state,因为thread1已经释放了state,即state=0,所以thread2调用tryAcquire方法时候,以CAS的方式去将state从0更新为1是成功的,所以这个时候thread2就获取到了锁
⑤thread2获取state成功,就会从acquireQueued方法中退出。注意这时候的acquireQueued返回值为false,所以在AQS的模板方法的acquire中会直接从if条件退出,最后执行自己锁住的代码块中的程序。
相关图谱,方便理解
其他竞争情况
-
当同步队列中头节点唤醒后继节点时,此时可能有其他线程尝试获取同步状态。
-
假设获取成功,将会被设置为头节点。
-
头节点后续节点获取同步状态失败。
3.4共享锁的实现
与独占模式不同,共享模式下,同一时刻会有多个线程获取共享同步状态。
如果需要使用AQS中共享锁,在实现tryAcquireShared方法时需要注意,返回负数表示获取失败;返回0表示成功,但是后继争用线程不会成功;返回正数表示
获取成功,并且后继争用线程也可能成功。
3.4.1获取共享锁的实现
public final void acquireShared(int arg) {if (tryAcquireShared(arg) < 0)doAcquireShared(arg);}private void doAcquireShared(int arg) {final Node node = addWaiter(Node.SHARED);boolean failed = true;try {boolean interrupted = false;for (;;) {final Node p = node.predecessor();if (p == head) {int r = tryAcquireShared(arg);// 一旦共享获取成功,设置新的头结点,并且唤醒后继线程if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCif (interrupted)selfInterrupt();failed = false;return;}}if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())interrupted = true;}} finally {if (failed)cancelAcquire(node);}}/*** 这个函数做的事情有两件:* 1. 在获取共享锁成功后,设置head节点* 2. 根据调用tryAcquireShared返回的状态以及节点本身的等待状态来判断是否要需要唤醒后继线程。*/private void setHeadAndPropagate(Node node, int propagate) {// 把当前的head封闭在方法栈上,用以下面的条件检查。Node h = head;setHead(node);/** propagate是tryAcquireShared的返回值,这是决定是否传播唤醒的依据之一。* h.waitStatus为SIGNAL或者PROPAGATE时也根据node的下一个节点共享来决定是否传播唤醒,* 这里为什么不能只用propagate > 0来决定是否可以传播在本文下面的思考问题中有相关讲述。*/if (propagate > 0 || h == null || h.waitStatus < 0 ||(h = head) == null || h.waitStatus < 0) {Node s = node.next;if (s == null || s.isShared())doReleaseShared();}}/*** 这是共享锁中的核心唤醒函数,主要做的事情就是唤醒下一个线程或者设置传播状态。* 后继线程被唤醒后,会尝试获取共享锁,如果成功之后,则又会调用setHeadAndPropagate,将唤醒传播下去。* 这个函数的作用是保障在acquire和release存在竞争的情况下,保证队列中处于等待状态的节点能够有办法被唤醒。*/private void doReleaseShared() {/** 以下的循环做的事情就是,在队列存在后继线程的情况下,唤醒后继线程;* 或者由于多线程同时释放共享锁由于处在中间过程,读到head节点等待状态为0的情况下,* 虽然不能unparkSuccessor,但为了保证唤醒能够正确稳固传递下去,设置节点状态为PROPAGATE。* 这样的话获取锁的线程在执行setHeadAndPropagate时可以读到PROPAGATE,从而由获取锁的线程去释放后继等待线程。*/for (;;) {Node h = head;// 如果队列中存在后继线程。if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;unparkSuccessor(h);}// 如果h节点的状态为0,需要设置为PROPAGATE用以保证唤醒的传播。else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;}// 检查h是否仍然是head,如果不是的话需要再进行循环。if (h == head)break;}}
-
当队列中的等待线程被唤醒以后就重新尝试获取锁资源,如果成功则唤醒后面还在等待的共享节点并把该唤醒事件传递下去,即会依次唤醒在该节点后面的所有共享节点,否则继续挂起等待。
当一个同享节点获取到同步状态,并唤醒后面等待的共享状态的结果如下图所示:
最后,获取到同步状态的线程执行完毕,同步队列中只有一个独占节点:
3.4.2 释放共享锁的实现
释放共享锁与获取共享锁的代码共享了doReleaseShared,用于实现唤醒的传播。
public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {// doReleaseShared的实现上面获取共享锁已经介绍doReleaseShared();return true;}return false;}
3.5 AQS中的条件队列
ConditionObject是AQS中定义的内部类,实现了Condition接口,ConditionObject是基于Lock实现的,在其内部通过链表来维护等待队列(条件队列)。
Contidion必须在lock的同步控制块中使用,调用Condition的signal方法并不代表线程可以马上执行,signal方法的作用是将线程所在的节点从等待队列中移除,然后加入到同步队列中,线程的执行始终都需要根据同步状态(即线程是否占有锁)。
与Lock之间的实现关系
Condition是一个接口,其实现类只有两个:AQS和AQLS,都以内部类的形式存在,内部类叫做ConditionObject,这里有点纳闷,既然这个类是为Lock专属定制的,为什么不在ReentrantLock里面来实现呢?放在AQS不会太臃肿吗?不知道Doug Lea上神当时是怎么考虑的。
由于在AQS中已经实现,因此在ReentrantLock里面对其操作也是很简单的,创建一个条件队列:
public Condition newCondition() {return sync.newCondition();}
sync中的实现:
abstract static class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = -5179523762034025860L;final ConditionObject newCondition() {return new ConditionObject();}}
ConditionObject内部类相关源码
两个成员变量
//条件(等待)队列的第一个节点private transient Node firstWaiter;//条件(等待)队列的最后一个节点private transient Node lastWaiter;
1. 核心方法:await() 不可中断的条件等待实现
public final void await() throws InterruptedException {//当前线程被中断则抛异常if (Thread.interrupted()) throw new InterruptedException();//添加进条件队列Node node = addConditionWaiter();//释放当前线程持有的锁int savedState = fullyRelease(node);int interruptMode = 0;//在这里一直查找创建的Node节点在不在Sync队列,不在就一直禁用当前线程while (!isOnSyncQueue(node)) {//park当前线程,直到唤醒LockSupport.park(this);//如被中断也跳出循环if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)break;}//此时已被唤醒,获取之前被释放的锁,if (acquireQueued(node, savedState) && interruptMode != THROW_IE)interruptMode = REINTERRUPT;//移除节点,释放内存if (node.nextWaiter != null) // clean up if cancelledunlinkCancelledWaiters();//被中断后的操作if (interruptMode != 0)reportInterruptAfterWait(interruptMode);}
简而言之,await()方法其实就是为当前线程创建一个Node节点,加入到Condition队列并释放锁,之后就一直查看这个节点是否在Sync队列中了(signal()方法将它移到Sync队列),如果在的话就唤醒此线程,重新获取锁
/*** 添加一个Node节点到Condition队列中*/private Node addConditionWaiter() {Node t = lastWaiter;//如果尾节点被取消,就清理掉if (t != null && t.waitStatus != Node.CONDITION) {unlinkCancelledWaiters();t = lastWaiter;}//新建状态为的CONDITION节点,并添加在尾部Node node = new Node(Thread.currentThread(), Node.CONDITION);if (t == null)firstWaiter = node;elset.nextWaiter = node;lastWaiter = node;return node;}/*** 释放当前线程的state,实际还是调用tryRelease方法*/final int fullyRelease(Node node) {boolean failed = true;try {int savedState = getState();if (release(savedState)) {failed = false;return savedState;} else {throw new IllegalMonitorStateException();}} finally {if (failed)node.waitStatus = Node.CANCELLED;}}
/*** 检查当前节点在不在Sync队列*/final boolean isOnSyncQueue(Node node) {//如果当前节点状态为CONDITION,一定还在Condition队列//如果Sync队列的前置节点为null,则表明当前节点一定还在Condition队列if (node.waitStatus == Node.CONDITION || node.prev == null)return false;//有后继节点,也有前置节点,那么一定在Sync队列if (node.next != null) // If has successor, it must be on queuereturn true;//倒查Node节点,前置节点不能为null,第一个if已经做了判断,其前置节点为non-null,但是当前节点也不在Sync也是可能的,因为CAS操作将其加入队列也可能失败,所以我们需要从尾部开始遍历确保其在队列return findNodeFromTail(node);}
2. 核心方法:signal()将等待时间最长的线程(如果存在)从Condition队列中移动到拥有锁的Sync队列
public final void signal() {//当前线程非独占线程,报非法监视器状态异常if (!isHeldExclusively())throw new IllegalMonitorStateException();//头节点是等待时间最长的节点Node first = firstWaiter;if (first != null)doSignal(first);}private void doSignal(Node first) {do {//如果头节点的下一节点为null,则将Condition的lastWaiter置为nullif ( (firstWaiter = first.nextWaiter) == null)lastWaiter = null;//将头结点的下一个节点设为nullfirst.nextWaiter = null;//被唤醒并且头节点不为null则结束循环} while (!transferForSignal(first) &&(first = firstWaiter) != null);}final boolean transferForSignal(Node node) {//如果无法改变节点状态,说明节点已经被唤醒if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))return false;//将当前节点添加到Sync队列尾部,并设置前置节点的waitStatus为SIGNAL,表明后继有节点(可能)将被唤醒,如果取消或者设置waitStatus失败,会唤醒重新同步操作,这时候waitStatus是瞬时的,出现错误也是无妨的Node p = enq(node);int ws = p.waitStatus;if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))LockSupport.unpark(node.thread);return true;}
3、核心方法:signalAll() 将所有线程从Condition队列移动到拥有锁的Sync队列中。
public final void signalAll() {if (!isHeldExclusively())throw new IllegalMonitorStateException();Node first = firstWaiter;if (first != null)doSignalAll(first);}/*** 遍历所有节点,加入到拥有锁的Sync队列*/private void doSignalAll(Node first) {lastWaiter = firstWaiter = null;do {Node next = first.nextWaiter;first.nextWaiter = null;transferForSignal(first);first = next;} while (first != null);}
分析上面的源码,我们可以看出,其实signal方法的核心其实是doSignal和transferForSignal方法,doSignal的主要作用就是将条件(等待)队列中的头节点firstWaiter从队列中移除,transferForSignal方法的主要作用就是将doSignal方法中移除的firstWaiter节点通过enq方法重新添加到同步队列中,从这里也可以看出为什么会在await方法中调用isOnSyncQueue方法判断节点是否处于同步队列中了。
下面我们用图来表示await方法节点的存放过程:
由于await方法必须在占有锁后才能调用,因此,最开始线程所在的节点应该位于同步队列中的:
当成功调用了Condition的await方法,此时线程拿到了锁,接下来需要将当前线程的节点从同步队列中移除,在await方法内部的addConditionWaiter执行后,节点加入到了等待队列中,即节点变化过程如下
之后await方法会通过isOnSyncQueue方法判断,节点是否在同步队列中,至于为什么要判断,上面我已经做了解释,不再重复。signal和signalAll方法就是将条件队列中的节点按顺序移除,并重新添加到同步队列,下面我们就看看signal方法中,节点的变化过程:
节点变化总结,经过上面节点的变化过程,我们发现一个线程所在的节点从同步队列中移除加入到等待队列过后,可能会存在一种特殊情况就是其他线程通过signal方法又将该节点从等待队列中移除然后加入到同步队列中。经过上面的分析我们也可以看出await方法实际上就是在while循环中阻塞线程的。
总结
AQS毫无疑问是Doug Lea大师令人叹为观止的作品,它实现精巧、鲁棒、优雅,很好地封装了同步状态的管理、线程的等待与唤醒,足以满足大多数同步工具的需求。
阅读AQS的源码不是一蹴而就就能完全读懂的,阅读源码大致分为三步:
1 读懂大概思路以及一些重要方法之间的调用关系
2 逐行看代码的具体实现,知道每一段代码是干什么的
3 琢磨参悟某一段代码为什么是这么写的,能否换一种写法,能否前后几行代码调换顺序,作者是怎么想的。
自己能力有限,参考一些相关文章总结至此,如有不足 请君指点。