万字长文源码解析AQS
AQS概述
AQS是指AbstractQueuedSynchronizer(抽象同步队列,简称AQS)。AQS是Java并发编程中一个核心的组件,主要用于实现个各种同步器,如ReentrantLock、Semaphore和CountDownLatch等。AQS提供了一套多线程访问共享资源的同步模板,简化了同步器的实现。
AQS的基础组成
因为AQS的本质上还是一个队列,所以在AQS的整体数据结构都是围绕着队列来展开的。
Node的结构
在AQS的队列中,队列中的每一个元素都是一个Node,这个Node通过一个内部类来进行定义。
static final class Node {
static final Node SHARED = new Node(); // 共享模式
static final Node EXCLUSIVE = null;// 独占模式
static final int CANCELLED = 1; // 节点的状态:节点已取消
static final int SIGNAL = -1; // 节点的状态:后继节点需要被唤醒
static final int CONDITION = -2; // 节点的状态:节点在条件队列中
static final int PROPAGATE = -3; // 节点的状态:共享模式下的传播释放
volatile int waitStatus; // 当前节点的去等待状态
volatile Node prev; // 当前节点的前驱节点
volatile Node next; // 当前节点的后继节点
volatile Thread thread; // 当前节点的线程对象
Node nextWaiter; // 条件队列的下一个节点(或共享模式标记)
// 是否是共享节点
final boolean isShared() {
return nextWaiter == SHARED;
}
// 获取当前节点的前驱节点
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}
// 构造方法
Node() {
}
Node(Thread thread, Node mode) {
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}Node有两种模式:共享模式和独占模式,这其实就是我们经常看到的共享锁和独占锁。例如ReentrantLock就是独占锁,ReentrantReadWriteLock中的ReadLock就是共享锁,WriteLock就是独占锁。
AQS的整体结构
Node是队列中的元素的结构,那么这些元素是如何组成这个同步队列的,可以直接查看AQS的属性字段就能很清晰了。
// 当前队列的头节点
private transient volatile Node head;
// 当前队列的尾节点
private transient volatile Node tail;
// 当前队列的状态
private volatile int state;看着这些字段其实也比较好理解,这就是标准的队列的模式的。
在AQS中采用了模板方法,其中下面的方法都是需要子类来实现的,这些具体的实现可以参考ReentrantLock中的相关实现,这里先不去详细讲述,只需要大概知道这些方法的作用就好了。
// 尝试获取独占锁
protected boolean tryAcquire(int arg)
// 尝试释放独占锁
protected boolean tryRelease(int arg)
// 尝试获取共享锁
protected int tryAcquireShared(int arg)
// 尝试释放共享锁
protected boolean tryReleaseShared(int arg)
// 判断是否被当前线程独占
protected boolean isHeldExclusively()了解完了整体的AQS的基础使用后,就可以来看AQS的核心运行逻辑了。
在讲到AQS入口之前,我们先需要了解下AQS的使用方式,所以我们可以先来看下它的实现类ReentrantLock的使用方式。
ReentrantLock的基本使用
ReentrantLock是Java并发包中可重入互斥锁,相比synchronized关键字而言,它提供了更灵活的锁操作。
创建ReentrantLock
import java.util.concurrent.locks.ReentrantLock;
public class ReentrantLockExample {
// 创建锁实例
private final ReentrantLock lock = new ReentrantLock();
// 可选择创建公平锁(默认是非公平锁)
// private final ReentrantLock fairLock = new ReentrantLock(true);
}基本的使用方式
使用lock()方法
public void criticalSection() {
lock.lock(); // 获取锁
try {
// 临界区代码
System.out.println(Thread.currentThread().getName() + " 进入临界区");
Thread.sleep(1000);
// 业务逻辑...
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock(); // 必须在finally中释放锁
System.out.println(Thread.currentThread().getName() + " 释放锁");
}
}使用tryLock()方法
public boolean tryCriticalSection() {
if (lock.tryLock()) { // 尝试获取锁,立即返回
try {
// 成功获取锁,执行临界区代码
System.out.println("成功获取锁,执行操作");
return true;
} finally {
lock.unlock();
}
} else {
// 未获取到锁,执行其他逻辑
System.out.println("未获取到锁,执行备选方案");
return false;
}
}
// 带超时的 tryLock
public boolean tryCriticalSectionWithTimeout() throws InterruptedException {
if (lock.tryLock(3, TimeUnit.SECONDS)) { // 等待最多3秒
try {
System.out.println("在超时时间内成功获取锁");
return true;
} finally {
lock.unlock();
}
} else {
System.out.println("获取锁超时");
return false;
}
}使用可中断的锁资源
public void interruptibleCriticalSection() throws InterruptedException {
lock.lockInterruptibly(); // 可响应中断的锁获取
try {
// 临界区代码
while (!Thread.currentThread().isInterrupted()) {
// 执行操作
}
} finally {
lock.unlock();
}
}ReentrantLock的特点是:可重入性、公平性选择、灵活获取、条件支持以及提供丰富的锁状态查询等。在使用ReentrantLock的时候,需要注意的点在于:
- 必须释放锁:lock()后必须再finally中调用unlock()方法
- 避免死锁:注意锁的获取顺序
- 性能考虑:再简单场景下,synchronized性能可能更好
- 可中断性:lockInterruptibly()可以响应线程中断
AQS的源码分析
在了解了ReentrantLock的基础使用方法后,我们大概就知道了ReentrantLock的入口方法。通常而言,它的入口方法都是tryLock()或者lock()方法。因为AQS是一个抽象类,所以在实际的开发过程中我们都会使用它的子类来进行开发,并且它本身是依赖于模板方法设计模式的,所以有些重要的核心逻辑是子类来实现的。ReentrantLock作为使用范围最为广泛的子类,我们就依托于ReentrantLock来分析它的源码。
创建ReentrantLock
ReentrantLock的有两种模式,就是公平锁模式和非公平锁模式,这里我们先来将下非公平锁模式(也是使用最广泛的模式)。
创建ReentrantLock的方式也比较简单,只需要直接通过new关键字就好了:
// 这样创建出来的就是非公平锁
ReentrantLock reentrantLock = new ReentrantLock();进入ReentrantLock之后,找到这个无参的构造方法:
public ReentrantLock() {
// sync是ReentrantLock的非静态属性,NonfairSync是AQS的子类
sync = new NonfairSync();
}它的逻辑也是比较简单的,就是单纯的创建一个非公平模式的同步器 —— NonfairSync。NonfairSync是Sync的子类,而Sync就是AQS的子类,其中NonfairSync和Sync都是ReentrantLock的内部类。至于具体的逻辑代码,这里就不贴出来了,可以直接看ReentrantLock的源码。
这里如果想知道公平锁模式的话,其实创建的时候携带一个boolean类型的参数就可以了:
1
// 这样创建出来的就是公平锁
2
ReentrantLock reentrantLock = new ReentrantLock(true);至于后续的代码,可以直接追下源码,也是很好理解的。
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}其实就是创建不同的Sync类的实例对象。下面会先看非公平锁,再来看公平锁的场景。
非公平锁模式
无论是公平锁还是非公平锁,它的入口方法都是lock()或者是tryLock(),我们优先从tryLock()方法进入。
tryLock()方法
public boolean tryLock() {
return sync.nonfairTryAcquire(1);
}在tryLock()方法中,它最终调用的是sync中的nonfairTryAcquire()方法,这个方法是sync自己实现的方法,不是AQS中的方法。
// acquires - 表示要获取的锁的数量
final boolean nonfairTryAcquire(int acquires) {
// 获取到当前线程
final Thread current = Thread.currentThread();
// 获取到当前的state的状态(AQS的属性)
int c = getState();
// 如果c===0,则表示当前锁是没有被其他线程抢占的
if (c == 0) {
// 通过CAS的方式去将锁的状态设置为1
if (compareAndSetState(0, acquires)) {
// 如果锁状态更新完毕后,就设置持有锁的线程为当前线程
setExclusiveOwnerThread(current);
// 返回true表示抢占锁成功了
return true;
}
}
// 如果当前线程就是正在持有锁的线程,那么就可以直接进入,只需要记录当前线程的重入次数就行了
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
// 将当前线程的重入次数更新到state属性中(重入n次,也需要解锁n次,否则state不会归零)
setState(nextc);
return true;
}
return false;
}其实对于tryLock()方法,不需要太过去纠结它的逻辑。至于重入的特点是ReentrantLock的特点,在其他的实现类中未必是支持的。所以,对于tryLock()方法而言,它的核心是:
- 将state的值更新为非0的数值 —— 对应的就是setState()方法和compareAndSetState()方法;
- 将持有锁的线程设置为当前线程 —— 对应的就是setExclusiveOwnerThread()方法;
至于是否可以重入,以及最大可重入几次,这种都是具体的AQS的实现类自己定义的。在分析AQS的源码的时候可以先抛掉这部分的逻辑,避免形成了刻板印象。
设置state的属性
设置state的属性,它是通过setState()方法和compareAndSetState()方法,这两个方法都是AQS中的方法。state属性的使用方式也是由子类实现类的,例如在ReentrantLock中state=0表示的,锁没有被抢占,反之就是被抢占了。
setState()方法比较好理解,逻辑也比较简单,它就是直接的将新的状态值设置给AQS的state属性就好了。
protected final void setState(int newState) {
state = newState;
}compareAndSetState()方法根据名字也能看出来,它是利用CAS的机制来将state属性设置为新的值。这里为什么要用CAS?这是因为需要保证state属性的线程安全,state属性作为共享变量,如果此时有多个线程来设置这个值,要保证state属性的更新是安全的,所以这里采取的是CAS的方式来更新state属性的值。
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}如果state属性的值的设置成功,就表示当前线程抢占到了锁,state的值也被它设置成了1,此时其他的线程再去执行这个CAS的时候就会返回false,它就无法抢占到这个锁。
设置当前线程为持有锁的线程
对于state属性而言,它就是只是个锁的状态的标识,在子类中可以是0代表抢占,也可以是1代表抢占都是无所谓的。因为tryLock()并不会阻塞,所以只是一次尝试获取锁,所以它并不会在AQS队列中占据一个节点。
setExclusiveOwnerThread()就是将当前线程赋值到exclusiveOwnerThread属性中,也比较好理解就是简单的赋值,它的作用就是记录当前是哪个线程持有这个锁。
// AbstractOwnableSynchronizer中的方法,它是AQS的父类,简单的属性赋值
protected final void setExclusiveOwnerThread(Thread thread) {
exclusiveOwnerThread = thread;
}lock()方法
lock()方法会阻塞式的来等待,直到它能获取到锁。
public void lock() {
sync.lock();
}lock()方法调用的sync实例的lock()方法,在Sync中也是用了模板方法,如果公平锁它是调用的FairSync中的lock()方法,反之非公平锁就是调用NonfairSync的lock()方法。这里讨论的非公平场景下所以先看NonfairSync中的lock()方法。
final void lock() {
// 通过CAS的方式将state的值由0设置为1
if (compareAndSetState(0, 1))
// 将当前线程标记为当前持有锁的线程
setExclusiveOwnerThread(Thread.currentThread());
else
// 如果设置失败了,就需要阻塞这个线程
acquire(1);
}lock()方法的前半段是比较好理解的,跟tryLock()并没有什么不同,都是CAS的方式来修改state的值,如果修改成功了就标记当前线程为持有锁的线程。
真正关键的方法是acquire()方法,当线程修改state属性失败之后,表示锁被其他的线程持有了,所以需要进入持有锁的线程释放锁,所以真正核心的关键就在于acquire()方法。
acquire()方法
public final void acquire(int arg) {
/**
* 调用链路:tryAcquire --> NonfairSync#tryAcquire --> NonfairSync#nonfairTryAcquire
* 这里的就是再次尝试获取一次加锁,如果这个成功了不用阻塞了
* 如果这里尝试还是失败的话,就执行后续的逻辑
*/
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}在acquire()方法会再次进行尝试加锁,在ReentrantLock中最终调用到的就是NonfairSync#nonfairTryAcquire()方法。如果这里,尝试加锁成功了,就直接结束了,反之,就需要将当前线程放入到AQS的等待队列中。
当tryAcquire()返回false的时候,代表加锁失败,进而执行addWriter()方法
// 创建一个Node节点,根据mode来选择是共享模式还是独占模式,这里传入的值为Node.EXCLUSIVE表示独占模式
private Node addWaiter(Node mode) {
// 创建一个Node节点:Node(nextWaiter=Thread.currentThread(),mode=Node.EXCLUSIVE)
Node node = new Node(Thread.currentThread(), mode);
// 获取当前队列的尾节点
Node pred = tail;
// 如果尾节点不为空,表示当前队列中已经有了等待的线程
if (pred != null) {
// 将当前节点的前驱节点设置为尾节点
node.prev = pred;
// 然后通过CAS的方式将当前节点设置为整个队列的尾节点
if (compareAndSetTail(pred, node)) {
// 如果设置成功后,将原本的尾节点的后继节点设置为当前节点
pred.next = node;
return node;
}
}
// 设置完上面的属性后,其实node节点就已经在队列中了,后面是后续的工作
enq(node);
return node;
}addWriter()方法的主要逻辑就是将当前线程对象封装成一个Node节点。所以此时有两种情况:
- 队列为空 (当前不存在等待锁的线程):直接执行入队的操作;此时队列中的内容为:
Node(nextWaiter=Thread.currentThread(),mode=Node.EXCLUSIVE)
tail = null
head = null- 队列不为空(当前已经有在等待锁的线程):通过尾插法将当前节点插入到队列中;
Node(nextWaiter=Thread.currentThread(),mode=Node.EXCLUSIVE)
tail = node
head = anotherNode然后再来看下enq()方法:
private Node enq(final Node node) {
// 先整个死循环
for (;;) {
// 获取当前的尾节点
Node t = tail;
// 如果尾节点为空,表示当前等待队列没有等待锁的线程
if (t == null) {
// 通过CAS的方式来创建一个创建一个空节点作为头节点
if (compareAndSetHead(new Node()))
// 并将这个头节点也设置成尾节点
tail = head;
} else {
// 将当前节点的前驱节点设置为现在的尾节点
node.prev = t;
// 然后通过CAS的方式将当前节点设置为新的尾节点
if (compareAndSetTail(t, node)) {
// 将原来尾节点的后继节点设置为当前节点
t.next = node;
// 返回尾节点
return t;
}
}
}
}enq()方法的主要作用就是保证head和tail属性的更新的一致性。这里要理解下tail在什么场景下会为null:
- AQS中的队列开始初始化的时候,此时tail和head的值都是null;
- 当有多个线程执行enq()方法并且都是对AQS的队列进行初始化的时候,因为这里头节点和尾节点的设置是两个步骤,所以在线程A通过CAS的方式设置头节点成功后,如果线程的上下文切换到线程B,此时线程B进来的时候会发现tail为null,但是它去CAS去设置头节点一定会失败的,所以它就会继续进入循环中。当线程上下文再次切换到线程A的时候,它将尾节点设置成功了。那么两个线程都是重新进入循环,然后进入else的分支中,将各自的线程对象加入到等待队列中。
当addWaiter()方法正确的执行完毕后,他就会返回当前队列的尾巴节点,也就是当前线程代表的节点。接着他会执行acquireQueued()方法。
final boolean acquireQueued(final Node node, int arg) {
// 表示操作成功还是失败
boolean failed = true;
try {
// 表示当前线程的中断标志位
boolean interrupted = false;
for (;;) {
// 获取当前节点的前驱节点
final Node p = node.predecessor();
/**
* 如果当前节点的前驱节点就是头节点
* 说明当前节点就是等待队列中的第一个线程,那么就再次尝试去获取锁
*/
if (p == head && tryAcquire(arg)) {
/*
* 尝试加锁成功了
* 表示当前线程获取到锁了,所以队列中其实是没有线程了
* 将head设置为当前线程,同时将当前节点的thread属性置为null,前驱节点prev置为null
*/
setHead(node);
// 原本的前驱节点的后继节点设置为null
p.next = null;
// 表示加锁成功了
failed = false;
// 返回中断标志位(这里直接返回了false,说明他是不响应中断的)
return interrupted;
}
// 当再次尝试加锁失败后,就会正常执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}acquireQueued()方法就是将节点合理的放入到队列中,并且处理队列中的状态问题。在acquireQueued()方法中如果发现当前等待线程就是队列的最前面的元素,就再次尝试获取锁。如果获取锁成功,就将当前节点作为新的头节点;如果获取锁失败或者不是第一个元素,就会执行shouldParkAfterFailedAcquire()。
// 再次尝试加锁失败后,就会进入该方法(pred当前节点的前驱节点,node表示当前节点)
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 获取它前驱节点的状态
int ws = pred.waitStatus;
// 如果前驱节点的状态为Node.SIGNAL(-1)
if (ws == Node.SIGNAL)
// 返回true,表示需要让该线程进入等待状态
return true;
if (ws > 0) {
// 如果前驱节点的状态大于0,也就是CANCELLED(1)
do {
/*
* 这行代码等价于下面两行代码:
* pred = pred.prev; // pred向前指定一个节点
* node.prev = pred; // 将当前节点的前驱节点向前指向一个节点
*/
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
// 遍历到第一个waitStatus小于0的节点,然后将它的后继节点更新为当前节点
pred.next = node;
} else {
// 通过CAS的方式将当前节点的waitStatus设置为 -1
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}shouldParkAfterFailedAcquire()方法主要是为了更新它的前驱节点的状态为-1。在队列中Node有一个waitStatus来表示状态,在创建Node对象的时候这个默认为0。在新的节点加入队列之前,它必须要去找到在它之前的第一个不为CANCELLED状态(只有可能是0或者PROPAGATE,CONDITION是条件队列的节点)的节点,并将他waitStatus的值修改为-1,表示在这个队列出队之前需要唤醒后面的节点。
如果shouldParkAfterFailedAcquire()方前驱节点的waitStatus不是SIGNAL,他就会返回false,然后在acquireQueued()方法中进入下一次循环。在它依然会再次尝试获取锁,然后再执行shouldParkAfterFailedAcquire()方法,并且获得返回值true。然后就会执行parkAndCheckInterrupt()方法。
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}parkAndCheckInterrupt()方法的作用就是利用LockSupport暂停这个线程。当这个线程被再次唤醒的时候,他会就会接着从这里开始执行。首先就是检查当前线程的中断标志为,并清除中断标志位,意思就是如果当前线程中断标志位true,这里就返回true,然后再将其设置为false。
当线程被唤醒且当前线程的中断标志位设置为ture,parkAndCheckInterrupt()就会返回true,它就会再次进入acquireQueued()方法的循环。直到该线程获取到了锁,就执行相关操作后返回true(中断标志位)。然后就会执行selfInterrupt()方法。
// 重新将该线程的中断标志位设置为true
static void selfInterrupt() {
Thread.currentThread().interrupt();
}此时用户的线程就可以根据这个中断标志位来进行特殊的处理。例如获取到锁执行业务的过程中,线程被中断掉了,我们可以检查中断直接返回。
总结
简单总结下lock()方法的主要逻辑:
尝试将state的值设置为占用状态(CAS机制),如果成功了,将持有锁的线程设置为当前线程,结束流程;
尝试将state的值设置为占用状态失败后,就表示锁已经被其他的线程持有了。然后就会执行acquire()方法,将当前线程封装成Node节点存储到队列中;
在acquire()方法中,当前线程会再次尝试获取锁,如果获取成功了,结束流程;
在acquire()方法中,当前线程再次尝试获取锁失败,就会执行addWaiter()方法。在addWaiter()方法方法中会将当前线程封装成Node节点,然后初始化等待队列,并且返回队列的尾节点(当前线程对应的Node节点)
在addWaiter()结束后队列被初始化成功了,此时就会有三种初始情况:
- 队列中没有元素,当前线程为队列中的第一个处于等待状态的线程

- 队列中有元素,当前线程之前还有处于等待状态的线程

在得到addWaiter()方法返回的尾节点,也就是当前节点后,就会执行acquireQueued()方法。在acquireQueued()方法中会执行当前线程真正的暂停。在进入acquireQueued()方法的时候,会根据上面的四种方法做出不同的响应:
- 当前线程为队列中等待的第一个线程:再次尝试获取锁,如果获取锁成功了,流程结束;
- 当前线程不为当队列中等待的第一个线程,且它的前驱节点为waitStatus的值为SIGNAL(-1):直接暂停当前线程;
- 当前线程不为队列中等待的第一个线程,且它的前驱节点存在waitStatus大于0(CANCELLED)节点:向前遍历找到第一个waitStat小于等于0的节点,然后修改当前节点的前驱节点为目标节点,目标节点的后继节点为当前节点(将CANCNELLED状态的节点直接从队列中去掉)。
按照上述流程,绘制对应的流程图如下所示:

写到这里的时候,其实有点疑惑了?既然我们讨论的是非公平锁,那么为啥只有队列的第一个等待线程才可以获取锁呢?理论上来讲的话,不应该是所有线程都来获取锁吗?但是p == head会导致只有一个线程来获取锁。关于这里的疑惑,我们会在了解了公平锁之后再给出答案。
unlock()方法
ReentrantLock的unlock()方法的作用就是让持有锁的线程释放掉锁。
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
/**
* 修改state的值,重入锁的话会判断解锁次数与加锁次数之间的关系
* 修改持有锁的线程为null
* 完成解锁的话返回true,反之,返回false
*/
if (tryRelease(arg)) {
// 获取队列的头节点
Node h = head;
// 如果头结点不为空,且头结点的状态为不为0
if (h != null && h.waitStatus != 0)
// 唤醒后续的线程
unparkSuccessor(h);
return true;
}
return false;
}释放锁的时候直接调用的就是Sync类的release()方法,从实现层面上来讲的话,对于释放而言它是不区分公平锁和非公平锁的。
这里需要理解下这个判断h != null && h.waitStatus !=0 条件:
- 先来看前部分
h!=nulll,如果h的值为null,那么说等待队列还没有初始化,也就表示没有需要唤醒的线程。等待队列没有被初始化,意思是锁资源并没有产生竞争,两个线程交替执行,它们之前完全遵循A获取锁 --> A释放锁 --> B获取锁 --> B释放锁,且在时间上不重叠。 - 再来看后部分
h.waitStatus !=0,当h.waitStatus的值为0,表示当前节点没有唤醒后继节点的责任。那么h.waitStatus的值在哪些场景下会等于0呢:
- 队列刚刚初始化完成,当第一个线程获取锁失败,调用addWaiter()方法和enq()方法入队时,会初始化队列。此时可能队列的头节点还是一个哑节点,那么waitStatus的值就是默认值,也就是0;这个时候获取锁的线程来释放锁的时候,就可能发现waitStatus的值为0;这里它不会去唤醒后继节点,因为它无法找到后继节点,而是直接释放,那么新的线程在初始化完成之后会再次尝试获取锁,新线程获取锁成功后就会修改head节点的值。
- 当这个线程是最后一个等待线程,此时它被唤醒后获取到锁了,它就成为了新的头节点。那么它的waitStatus的值就继承的是它作为普通节点的值0。
- 当这个线程获取到锁后,它成为新的头节点,但是它的后继节点还没来得及修改它的状态waitStatus,它就释放锁了。此时后继节点在设置完它的waitStatus之后再次尝试获取锁,获取锁就成功了。
所以unlock()在解锁的时候只有在队列中有等待的线程时才会去唤醒下个线程来获取锁,在其他的场景下都是依赖于需要获取锁的线程通过自旋来尝试获取锁。
tryRelease()方法
在进入Sync#release()方法的时候,他会的执行tryRelease()方法,这个方法是AQS的抽象方法,因此具体的实现还是在Sync类中。
protected final boolean tryRelease(int releases) {
// 获取当前锁的状态减去本次要释放的数量(重入的概念,多次加锁也需要多次释放锁)
int c = getState() - releases;
// 如果当前线程不是持有锁的线程,就直接抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
// 释放标志,默认为false
boolean free = false;
// 如果state=0表示释放完毕了
if (c == 0) {
// 释放标志设置为true,表示成功释放
free = true;
// 将持有锁的线程设置为null
setExclusiveOwnerThread(null);
}
// 重新设置state的值,这里是因为重入锁的问题,解锁次数小于加锁次数的时候,还不能完全释放锁资源
setState(c);
// 返回锁释放的结果
return free;
}当state的值等于0的时候,表示所有的加锁次数都通过解锁次数完成了,那么当前线程在释放锁资源之后就要唤醒其他的线程来获取锁。
unparkSuccessor()方法
// node传递的是队列的头节点
private void unparkSuccessor(Node node) {
// 获取节点的状态
int ws = node.waitStatus;
// 如果节点的状态小于0(SIGNAL、CONDITIAON、PROPAGATE)
if (ws < 0)
// 通过CAS将节点的waitStatus修改为0
compareAndSetWaitStatus(node, ws, 0);
// 获取头节点的下一个节点(处于对首的等待线程)
Node s = node.next;
// 如果没有等待的线程或者等待线程的状态已经是CANCELLED了
if (s == null || s.waitStatus > 0) {
// 将队首的节点置为null,表示废弃掉了
s = null;
// 如果此时队列中还有节点,就从尾部向前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
// 这里会循环不断地给s进行赋值,最后的复制就是从头部开始遍历的第一个waitStatus<=0的节点
if (t.waitStatus <= 0)
s = t;
}
// 这里判断是为了避免队列里面没有等待线程了,或者等待线程的waitStatus都大于0,最终s的值为null
if (s != null)
// 当找到下一次获取锁的节点时,唤醒这个线程
LockSupport.unpark(s.thread);
}unparkSuccessor()的方法的主题逻辑会很清晰的,它的目的就是唤醒从头部往后遍历的第一个waitStatus<=0的节点。
总结
整个unlock()方法只做两件事情:
- 修改基本的状态值:state的值和exclusiveOwnerThread的值;
- 唤醒队列中的下一个waitaStatus<=0的节点中的线程,如果队列中没有等待唤醒的节点就直接结束;

公平锁模式
前面我们了解到了非公平锁的模式,在unlock()方法它并没有去区分公平锁和非公平锁,这是因为公平锁和非公平锁之间最大的区别就是获取锁时机。对于tryLock()方法而言,它其实最终都是调用的Sync#nonfairTryAcquire()方法,所以也是没有区别的。因此公平锁和非公平锁的最大区别就体现在lock()方法上。
lock()方法
在公平锁模式下,lock()方法的具体实现在FairSync#lock()方法中,它非常简单就是直接调用acquire()方法进入等待队列中进行等待。
final void lock() {
acquire(1);
}我们再来回顾下非公平锁模式下的lock()方法,它的具体实现在NonfairSync#lock()方法中:
final void lock() {
// 尝试去修改state的值
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}所以所非公平锁在加锁的时候,它不会理会队列中是否有处于等待状态的线程,直接就尝试去加锁,如果加锁成功饿了它就获取到锁了。但是公平锁,它不会直接去加锁而是进入队列中进行等待。
其实针对这两点区别就可以回答上文中的为什么每次只有head.next可以去尝试加锁以及唤醒的时候只唤醒第一个waitStatus<=0的后继节点,最大的原因是因为:AQS的本质还是队列,既然是队列就需要维护FIFO的原则,所以当线程进入到队列的时候,其实它获取锁的顺序就是固定的。那么非公平锁的特点就是,当它还没有进入队列的时候先插队抢一下,抢成功那就直接持有锁,不成功了再进入队列等待;公平锁的特点就是,它会直接进入队列等待。
Condition模式
Condition是Java并发包中的一个接口,它必须与一个Lock对象(通常是ReentrantLock)配合使用。在AQS的实现中,Condition的具体实现类是AbstractQueuedSynchronizer.ConditionObject。
Condition是一个条件变量,它允许线程在某个特地条件不满足的时候挂起等待,并在满足条件的时候被唤醒。Condition主要解决的是线程间协作的问题,尤其是在生产者-消费者模型中非常关键。
ReentranLock中的Condition使用方式
来看一下Condition的常见例子,控制三个线程交替执行,并且将count的值交替执行到100。
@Slf4j
public class AppTest {
public static final String API_KEY = "sk-f3ba88dc77bb4e9489feb0362c4aa805";
private static int count = 0;
private static int currentThread = 1;
private static ReentrantLock reentrantLock = new ReentrantLock(true);
private static void run(Integer threadId, Integer target, Condition condition1, Condition condition2, Condition condition3) {
try {
reentrantLock.lock();
// 不该由当前线程执行,直接睡眠
while (count <= 100) {
if (currentThread != threadId) {
condition1.await();
}
// 由该线程执行(执行业务)
count++;
log.info("==========>>>>>>>>>> count:{}", count);
currentThread = target; // 设置下一个要执行的线程为2号线程
if (count < 100) {
condition2.signal();
condition3.signal();
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if (reentrantLock.isHeldByCurrentThread() && reentrantLock.isLocked()) {
reentrantLock.unlock();
}
}
}
@Test
public void test() throws InterruptedException {
Condition condition1 = reentrantLock.newCondition();
Condition condition2 = reentrantLock.newCondition();
Condition condition3 = reentrantLock.newCondition();
Thread thread1 = new Thread(() -> run(1, 2, condition1, condition2, condition3));
Thread thread2 = new Thread(() -> run(2, 3, condition2, condition3, condition1));
Thread thread3 = new Thread(() -> run(3, 1, condition3, condition1, condition2));
thread1.start();
thread2.start();
thread3.start();
thread1.join();
thread2.join();
thread3.join();
}
}对于await()方法而言它的流程为:
- 释放锁:当前线程必须持有锁。
- 加入条件队列:将当前线程节点加入到该Condition的等待队列尾部。
- 挂起线程
- 重新竞争锁:当线程被signal()方法唤醒后,它会重新尝试获取锁;
对于singal()方法而言它的流程为:
- 获取锁:调用singal()的线程必须持有锁;
- 移动节点:从条件队列的头部取出一个节点,将其从条件队列中移除;
- 加入同步队列:将该节点加入到AQS的同步队列尾部,并将其waitStatus设置为SIGNAL;
- 唤醒线程:如果该节点的waitStatus原本的值为0,说明之前没有在同步队列中等待,现在需要唤醒它;
创建Condition对象
创建Condition对象是通过ReentrantLock#newCondition()方法来创建:
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}它的调用链路是:ReentrantLock#newCondition() --> Sync#newCondition() --> AQS.ConditionObject#new完成一个ConditionObject的初始化。实际上就是实例化一个ConditionObject类的对象,那么来看下这个ConditionObject的结构:
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
// 条件队列中的头节点
private transient Node firstWaiter;
// 条件队列中的尾节点
private transient Node lastWaiter;
// 省略方法代码
}根据ConditionObject的属性可以看出这个Condition它也是一个队列(基于链表实现的)的,队列中的每一个元素的类型仍然是Node。所以当我们每次调用Reentrant#newCondition()方法的时候,它都会创建一个条件等待队列。
await()方法
当我们调用Condition#await()方法的时候,实际上调用的是具体的实现类里面的ConditionObject#await()方法。
public final void await() throws InterruptedException {
// 如果在await()之前判断当前线程的中断标志位,如果中断标志位为true,就清除并返回true
if (Thread.interrupted())
// 抛出中断异常(线程已经被中断了,没有进入条件队列的必要的,直接抛出异常让用户处理这个中断就行了)
throw new InterruptedException();
// 构建一个条件等待队列的节点node(将当前节点插入到条件队列的尾部)
Node node = addConditionWaiter();
// 释放锁并返回加锁的次数
int savedState = fullyRelease(node);
// 中断模式
int interruptMode = 0;
/**
* 这里是await()的核心逻辑
* 每次调用await()方法都是创建一个新的Node节点,且waitStatus的值为CONDITON,然后将他插入到条件队列的尾部
* 因为waitStatus的值是CONDITIOn状态,isOnSyncQueue()方法返回的是false,所以会执行LockSupport.park(this)暂停当前线程
* 当调用signal()方法之后,他会将条件队列的中的节点放入到同步队列中,并唤醒处于等待的线程
* 当前线程被唤醒后,机会继续判断,此时isOnSyncQueue()方法返回的就是true,这样子就会往后执行了
*/
while (!isOnSyncQueue(node)) {
// 暂停当前线程
LockSupport.park(this);
/**
* 在当前线程因为await()方法而处于等待状态时,检测当前线程是否被中断,并根据中断模式决定是否立即抛出中断异常或延迟处理
* 该方法会尝试将当前节点从条件队列转译到同步队列,以便线程能够重新竞争锁
* 如果转移成功,说明线程在等待期间被中断,且中断发生在signal()方法调用之前,应该抛出异常(THROW_IE)
* 如果转移失败,说明中断发生在singal()方法调用之后,应延迟恢复中断状态(REINTERRUPT)
*/
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 将当前节点被唤醒后,当前线程开始抢锁,抢锁失败又被暂停了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 如果当前线程抢到锁之后也执行完毕了相关的任务,判断当前节点是否还有后继节点
if (node.nextWaiter != null)
unlinkCancelledWaiters();
// 根据中断的模式来响应对应的中断
if (interruptMode != 0)
// 处理中断标志位
reportInterruptAfterWait(interruptMode);
}在await()方法中,主要要做以下几件事情:
- 将当前线程封装成一个Node对象,如果此时条件队列还没有初始化,就初始化条件队列,反之就将当前节点作为新的尾节点加入到等待队列中。
- 释放锁,当前线程在调用await()方法的时候必须获取到锁了,否则就会抛出InterruptException异常;在释放锁的时候,释放锁失败就会抛出异常,并且将当前节点的waitStatus的值设置为CANCELLED。
- 当锁被释放后,会通过循环来不断的判断Node节点是否已经加入到了同步队列中(由signal()方法完成)。在第一次进入await()方法的时候,因为新节点的waitStatus的值为CONDITON,所以一定会进入循环,此时当前线程被
LockSupport.park(this)暂停了。当有其他的线程执行了singal()方法后,当前线程被唤醒了,再次判断当前线程是否在同步队列中,此时节点已经在同步队列中了,所以就会跳出循环了。 - 当前线程因为signal()方法被唤醒后,就会再次进入循环中来尝试获取锁,当获取锁失败的时候会再次被LockSupport.park()方法暂停。直到当前线程获取到了锁,就跳出循环;
在await()方法中还有一个很重要的机制,就是避免线程调用await()方法进行暂停状态后,被其他的线程通过interrupt()方法唤醒了。此时就需要通过checkInterruptWhileWaiting()方法来进行判断的。
// 检查当前线程在waiting期间是否被中断了
private int checkInterruptWhileWaiting(Node node) {
// 检查当前线程的中断标志位,如果中断标志位true,就会执行transferAfterCancelledWait()来进一步判断中断的类型,反之就会返回0表示没有被中断
// transferAfterCancelledWait()返回true表示的是在await()方法期间被中断了
// transferAfterCancelledWait()返回false表示的是在signal()方法调用后被中断了
return Thread.interrupted() ? (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}
// 这里就是判断当前线程中断的类型
final boolean transferAfterCancelledWait(Node node) {
// 如果是被中断唤醒的线程,它的节点还是在等待队列中,依然是需要将等待队列中的节点移动到同步队列中
// 将当前节点的waitStatus的值修改为0,做进入同步队列之前的准备工作
if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
// 将当前节点加入到同步队列的尾部
enq(node);
// 操作成功后返回true
return true;
}
// 这里可能是其他线程调用signal()方法后当前线程被其他线程中断了,所以此时当前节点已经在同步队列了,上面的CAS操作一定会失败的
// 这里判断下当前节点是否在同步队列中,如果不在,说signal()方法调用后还没来得及将当前节点加入到同步队列,就让出CPU使用权,自旋等待
// 直到再次循环检测到当前节点已经在同步队列中了,那么就返回false,表示是在signal()方法调用之后被中断的
while (!isOnSyncQueue(node))
Thread.yield();
return false;
}在线程被唤醒后,通过checkInterruptWhileWaiting()方法可以检查到它的中断类型:
- THROW_IE:表示在await()方法期间被其它线程中断掉了;
- REINTERRUPT:表示在执行signal()方法当前线程被唤醒后被中断掉了;
根据不同的中断模式最终由reportInterruptAfterWait()方法来进行真正的处理;
private void reportInterruptAfterWait(int interruptMode)
throws InterruptedException {
// 如果当前现在在await()期间被中断了,就抛出InterruptedException异常
if (interruptMode == THROW_IE)
throw new InterruptedException();
// 如果当前线程是在signal()方法执行后被中断了,就通过恢复他的中断标志位,由用户的业务代码来捕获这个中断标志来进行处理
else if (interruptMode == REINTERRUPT)
selfInterrupt();
}😢😢😢 其实看到这里的时候还是有点懵逼的,感觉主体流程是掌握了,但是实际细节又不太清楚,其实这里是需要结合signal()方法来一起看的。因为await()方法是需要通过signal()方法来唤醒的,我们要了解singal()方法之后才能知道部分属性的变化。
addConditionWaiter()方法
将当前线程包装成一个新的Node对象,它的waitStatus的值为CONDITON。此时判断条件等待队列是否初始化了,如果初始化了就通过尾插法将当前节点插入等待队列的尾部;反之,将初始化等待队列后再将当前节点插入到等待队列的尾节点。
private Node addConditionWaiter() {
// 获取条件队列的尾
Node t = lastWaiter;
// 尾节点不为null,且尾节点的状态不是CONDITION
if (t != null && t.waitStatus != Node.CONDITION) {
// 从条件队列中去掉waitStatus的值为不是CONDITON的节点
unlinkCancelledWaiters();
// t是新的尾节点(防止尾节点被去掉了)
t = lastWaiter;
}
// 如果尾节点为null,或者CONDITION状态就构建一个新的节点
Node node = new Node(Thread.currentThread(), Node.CONDITION);
// 初始化,此时判断尾节点是否为null,如果尾节点为null的话,就说名条件队列还没有初始化
if (t == null)
// 进行初始化
firstWaiter = node;
else
// 如果不是的话,将当前节点插入到条件队列的尾部
t.nextWaiter = node;
// 将当前节点设置为新的尾节点
lastWaiter = node;
// 返回当前节点
return node;
}fullyRelease()方法
在将当前节点保存到等待队列后,就释放掉了当前线程持有的锁:
final int fullyRelease(Node node) {
// 释放失败的标识,默认的是失败的
boolean failed = true;
try {
// 获取当前锁的状态
int savedState = getState();
// 释放锁
if (release(savedState)) {
failed = false;
// 返回加锁的次数
return savedState;
} else {
// 这里释放失败有可能是当前线程还没有获取到锁,所以await()方法必须要先加锁,否则就抛出异常
throw new IllegalMonitorStateException();
}
} finally {
// 最终如果解锁失败,就将当前节点的waitStatus设置为CANCELLED
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
signal()方法
signal()方法是唤醒调用await()方法被暂停的线程。
public final void signal() {
// 当前线程是否持有锁,不持有锁的话,就抛出IllegalMonitorStateException异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
// 获取条件队里的第一个节点
Node first = firstWaiter;
// 如果条件队列的第一个节点为null,则表示条件队列中没有在等待的线程
if (first != null)
// 如果条件队列中有等待的线程,就唤醒第一个等待的线程
doSignal(first);
}signal()方法只是简单的判断了当前线程是否持有锁以及条件队列中是否存在等待的线程。真正执行唤醒逻辑的是doSignal()方法:
private void doSignal(Node first) {
// 在一个循环里面
do {
// 如果条件队列里面只有当前线程一个,那么需要将头结点置为null,并将尾节点也置为null
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
// 将当前节点的后继节点置为null,表示将当前节点从条件队列的中删除掉
first.nextWaiter = null;
}
/**
* transferForSignal()方法是整个Signal过程中的核心代码
*/
while (!transferForSignal(first) &&(first = firstWaiter) != null);
}在doSignal()方法中,通过一个循环来进行判断,其中重要的是transferForSignal()方法,它负责将一个条件队列中的Node节点转移到同步队列中。
final boolean transferForSignal(Node node) {
// 通过CAS的方式将当前节点的waitStatus的值修改为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
// 如果修改失败,返回false
return false;
// 将新的节点加入到同步队列中,插入到同步队列的尾节点,并返回当前节点的前驱节点
Node p = enq(node);
// 获取前驱节点的waitStatus
int ws = p.waitStatus;
// 如果前驱节点的状态大于0,也就是CANCELLED,就立即唤醒当前线程,因为前驱节点已经不会通知当前节点了
// 如果前驱节点的状态是小于等于0的,此时会尝试修改前驱节点的状态为-1,修改失败说明前驱节点在获取到比较期间已经被修改了,立即唤醒当前线程
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
// 唤醒当前线程
LockSupport.unpark(node.thread);
return true;
}在transferForSignal()方法中会先将当前节点的waitStatus从CONDITION修改为0,表示当前节点不再条件队列中等待了。将修改后的当前节点通过enq()方法加入到同步队列的尾部,并返回当前节点的前驱节点。最后需要根据前驱节点的状态来进行相应的操作:
- 如果前驱节点的状态是CANCELLED,那就说明前驱节点已经不具备唤醒当前节点的能力,那么立即唤醒当前节点,避免陷入无限等待(自救机制);
- 如果前驱节点的状态不是CANCELLED,然后就通过CAS去修改,如果CAS修改失败了,说明发生了并发修改。此时前驱节点的状态是不确定的,有可能因为中断或者超时被其他的线程修改为了CANCELLED,那么立即唤醒当前节点。
当节点被唤醒后就会在await()方法中进入到acquireQueued()方法循环中开始尝试加锁。按照上述的分析,最终signal()方法的具体流程就如下图所示:

总结
经过上面大部分内容的分析,最后可以得到就是AQS的整体结构:

在AQS内部存在多个队列,其中一个是同步队列(也称之为CHL队列,我习惯称之为同步队列,如上图中的AQS对应的队列)。当我们每次调用newCondition()方法的时候都会创建一个条件队列(如上图中ConditionObject对应的队列)。
AQS内部的同步队列只会有一个,但是条件队列是可以有多个的
当我们创建一个AQS的实例的时候, 它的初始状态如下图所示:

1️⃣当我们创建了一个AQS的时候,此时在AQS的实例中:tail=null,head=null,state=0,exclusiveOwnerThread=null,这是AQS的初始状态。
2️⃣此时如果有一个线程成功获取到锁了,那么state的值就会变为1,tail和head的值仍然是null。这是因为并没有产生锁竞争,也就没有必要初始化同步队列。如果当前线程释放后,其他线程再来获取锁,依然是没有产生锁竞争所以也不会有有同步队列的初始化。只有在当前线程持有锁期间,有其他的线程来获取锁才会进行同步队列的初始化。
这里需要注意的是lock()方法一定会进行同步队列的初始化的,但是tryLock()是不一定的。如果是非公平锁的,加锁失败了会直接返回false,如果公平锁的话,都会初始化同步队列。
3️⃣在当前线程持有锁的期间,有其他的线程来获取锁,当前线程就会被封装成Node节点加入到同步队列中。在加入同步队列中还有其他两个操作:
- 如果当前同步队列还没有初始化,就会初始化当前队列,此时会创建一个哑节点(waitStatus=0)作为head节点;然后再将这个Node节点作为head节点的后继节点加入到队列中,如下面的thread2所在的节点加入到同步队列时。
- 当节点加入到同步队列后,会向前寻找到第一个状态不为CANCELLED的节点,将该节点作为它的前驱节点,并且该节点的后继指针指向自己。然后再利用CAS将该节点的waitStatus的值修改为SIGNAL,意思是当该节点释放锁的时候的需要唤醒它的后继节点。如下图thread3加入队列后,将thread2所在节点的waitStatus修改为SIGNAL。

4️⃣当持有锁的线程被释放掉后的,如果是非公平锁的话不一定是同步队列的第一个不是CANCELLED状态的节点来尝试获取锁,也有可能是其他新来的线程来获取;如果公平锁,同步队列的第一个不为CANCELLED节点来获取。

当前节点(如上图thread2节点)在获取锁之后,会将自己设置同步队列头节点的指针指向自己,然后将exclusiveOwnerThread设置为自己,同时将自己的prev指针设置为null,thread属性设置为null。所以在同步队列中,head节点表示的当前获取锁的线程。
5️⃣如果当前现在获取到锁后,调用了ConditionObject#await()方法就会进入条件队列中进行等待(如图中thread2所在的节点)。

首先调用awat()方法的当前线程封装成一个新的Node节点,这个Node节点的waitStatus设置为CONDITION(-2)。然后将这个新的Node加入到条件队列中,如果条件队列没有初始化,就将条件队列的firstWaiter和lastWaiter都指向这个新的节点,反之就将当前节点插入到条件队列的尾部。
其次在节点加入到条件队列后,就会当前线程就需要释放锁。释放锁后,就会让其他的线程抢到锁,成为同步队列新的头节点。
6️⃣此时如果有其他的线程调用了signal()方法,就会唤醒条件队列的第一个节点。此时第一个节点就会离开条件队列,进入同步队列(如上图中thread6所示)。

从这看到调用了signal()并不意味着,之前调用await()的线程会马上执行,而是在抢到锁之后再回从调用await()方法之后继续执行。