适合30岁短期培训班深圳网站制作十年乐云seo品牌
关于 AQS,网上已经有无数的文章阐述 AQS 的使用及其源码,所以多这么一篇文章也没啥所谓,还能总结一下研究过的源码。源码解析和某某的使用,大概是互联网上 Java 文章中写得最多的主题了。
AQS
AQS 是 AbstractQueuedSynchronizer 的缩写,中文翻译过来就是抽象队列同步器。ReentrantLock、ReentrantReadWriteLock、Semaphore、CountDownLatch 都是基于 AQS。AQS 的核心思想是,当线程请求获取资源时,如果资源空闲,则会将当前线程设置为资源的独占线程,成功获得锁;否则将获取锁失败的线程加入到排队队列中(CLH),并提供线程阻塞和线程唤醒机制。CLH 是一个虚拟的双向队列。
首先看一下 AQS 的关键属性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
private transient volatile Node head; //队列的头节点
private transient volatile Node tail; //队列的尾节点
private volatile int state; //同步状态
 
state 用于实现锁的可重入性。
- 0 为表示没有线程持有该锁
 - 当存在线程获取锁成功,则 state 变为 1。如果是同一线程重复获得,则 state++
 - 如果存在线程释放锁,则 state–
 
上面的三个属性都存在对应的以 CAS 方式进行修改的方法,state 对应的是 compareAndSetState() 方法, head 对应的是 compareAndSetHead() 方法,tail 对应的是 compareAndSetTail()。以 CAS 的方式修改值能避免锁的竞争。
因为请求获取锁的线程会以 Node 节点的方式在 CLH 队列中排队,在分析 AQS 机制时也会大量涉及到 Node 节点,所以很有必要对 Node 节点进行分析。
CLH 队列中 Node 节点。
// java.util.concurrent.locks.AbstractQueuedSynchronizer.Node
volatile int waitStatus;  //当前节点在队列中的状态
volatile Node prev;  //前驱节点
volatile Node next;  //后继节点
volatile Thread thread;  //在该节点中排队的线程
Node nextWaiter;  //下一个处于condition或共享状态的节点//等待锁的两种状态
static final Node SHARED = new Node();  //共享
static final Node EXCLUSIVE = null; //独占//waitStatus的几个值 
static final int CANCELLED =  1;  //已取消
static final int SIGNAL    = -1;  //后继节点的线程需要唤醒
static final int CONDITION = -2;  //节点处于等待队列中
static final int PROPAGATE = -3; //线程处在SHARED情况下使用该字段
 
ReentrantLock
说是研究 AQS 源码,但 AQS 毕竟是一个抽象类,只实现了部分方法,另外一些方法会在子类中实现。所以我们也同样会涉及到 ReentrantLock 源码的研究。
ReentrantLock 的通常使用方式是:
class X {    private final ReentrantLock lock = new ReentrantLock();       public void m() {      lock.lock();       try {        // ... method body      } finally {        lock.unlock()      }    }  
}
 
ReentrantLock 存在两种锁:公平锁(FairSync)和非公平锁(NonfairSync),默认为非公平锁。使用公平锁时,线程会直接进入队列中排队,只有队列中第一个线程才能获取锁;使用非公平锁时,线程会先尝试获取锁,成功则占用锁,失败则在队列排队。对于 AQS 来说,公平锁和非公平锁的绝大部分方法都是共用的。
ReentrantLock 的主要方法有两个,一是使用 lock() 方法加锁,二是使用 unlock() 方法解锁。
加锁
非公平锁
我们首先来分析非公平锁的加锁过程。
// java.util.concurrent.locks.ReentrantLock.NonfairSync
final void lock() {  if (compareAndSetState(0, 1))  //通过cas方式设置同步状态setExclusiveOwnerThread(Thread.currentThread());  //成功,设置为独占线程else        acquire(1);  //失败,获取锁
}
 
在 lock() 方法中,会先尝试通过 CAS 的方式去获取锁,成功则设置为独占线程,否则执行 acquire() 方法。
// java.util.concurrent.locks.AbstractQueuedSynchronizer
public final void acquire(int arg) {  if (!tryAcquire(arg) &&  //尝试获取锁acquireQueued(addWaiter(Node.EXCLUSIVE), arg))  // 失败,则添加到等待队列selfInterrupt();  
}
 
在 acquire() 方法中,会再次尝试去获取锁,如果失败则加入到排队队列。
// java.util.concurrent.locks.ReentrantLock.NonfairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {  return nonfairTryAcquire(acquires);  
}// java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) {  final Thread current = Thread.currentThread();  int c = getState();  if (c == 0) {  //如果当前没有线程持有锁if (compareAndSetState(0, acquires)) {  //state++setExclusiveOwnerThread(current);  //将当前线程设置为独占线程return true;        }  }  else if (current == getExclusiveOwnerThread()) {  //如果已有线程持有锁,且当前线程为独占线程int nextc = c + acquires;  if (nextc < 0) // overflow  throw new Error("Maximum lock count exceeded");  setState(nextc);   //state++,实现锁的可重入性return true;   //获得锁}  return false;  //如果已有线程持有锁,且当前线程不为独占线程,则获取锁失败
}
 
在 nonfairTryAcquire() 方法中,如果资源空闲,则再次尝试通过 CAS 的方式去获取锁。如果当前资源已被当前线程占用,则将 state++,以实现锁的可重入性。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter
// 设置队列尾节点
private Node addWaiter(Node mode) {  Node node = new Node(Thread.currentThread(), mode);   //新建排队节点 Node pred = tail;  //pred指向尾节点if (pred != null) {  //如果Pred指针是Null(说明等待队列中没有元素),或者当前Pred指针和Tail指向的位置不同(说明被别的线程已经修改)node.prev = pred;  //将新建节点的prev指向predif (compareAndSetTail(pred, node)) {  // 设置新建节点为尾节点pred.next = node;  return node;  }  }  enq(node);  return node;  
}//java.util.concurrent.locks.AbstractQueuedSynchronizer#enq
private Node enq(final Node node) {  for (;;) {  Node t = tail;  if (t == null) {   //如果尾节点为空,则说明队列还未初始化,if (compareAndSetHead(new Node()))  //初始化一个头节点tail = head;  } else {  //如果已经初始化,则设置为尾节点node.prev = t;  if (compareAndSetTail(t, node)) {  t.next = node;  return t;  }  }  }  
}
 
如果尝试多次获取锁都失败,则在 addWaiter() 方法中会将线程放到节点中,并设置为排队队列的尾节点。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued
final boolean acquireQueued(final Node node, int arg) {  //是否成功获取到资源boolean failed = true;  try {  //循环等待过程中是否中断过boolean interrupted = false;  for (;;) {  final Node p = node.predecessor();  //获取当前节点的前驱节点if (p == head && tryAcquire(arg)) {  //如果前驱节点为头节点,则尝试获取锁setHead(node);  //获取锁成功,设置当前节点为头节点p.next = null; // help GC  failed = false;  return interrupted;  }  //来到这里,说明前驱节点不是头节点或者获取锁失败。判断当前节点是否要被阻塞if (shouldParkAfterFailedAcquire(p, node) &&  parkAndCheckInterrupt())  interrupted = true;  //线程中断成功}  } finally {  if (failed)  cancelAcquire(node);  }  
}
 
加入到排队队列中后,会在 acquireQueued() 方法中循环等待资源的获取,并判断线程是否需要被阻塞,直到线程获取成功或者抛出异常。
//java.util.concurrent.locks.AbstractQueuedSynchronizer#shouldParkAfterFailedAcquire
//靠前驱节点判断当前线程是否应该被阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  //获取前驱节点的状态int ws = pred.waitStatus;  if (ws == Node.SIGNAL)  //如果前驱节点处于唤醒状态  return true;  if (ws > 0) {  //前驱节点处于取消状态      do {  //向前查找取消的节点,并将其从队列中删除node.prev = pred = pred.prev;  } while (pred.waitStatus > 0);  pred.next = node;  } else {  compareAndSetWaitStatus(pred, ws, Node.SIGNAL);  //设置前驱节点为唤醒状态}  return false;  
}
 
//java.util.concurrent.locks.AbstractQueuedSynchronizer#parkAndCheckInterrupt
//挂起当前线程,阻塞调用栈,返回当前线程的中断状态
private final boolean parkAndCheckInterrupt() {  LockSupport.park(this);  return Thread.interrupted();  
}
 
整一个非公平锁的加锁流程可以用如下流程图表示:
公平锁
公平锁和非公平锁的流程中只有 lock() 方法和 tryAcquire() 方法存在差别。
//java.util.concurrent.locks.ReentrantLock.FairSync#lock
final void lock() {  acquire(1);  //直接获取锁
}
 
公平锁中会直接调用 acquire() 方法。
// java.util.concurrent.locks.ReentrantLock.FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) {  final Thread current = Thread.currentThread();  int c = getState();  if (c == 0) {  if (!hasQueuedPredecessors() &&  compareAndSetState(0, acquires)) {  setExclusiveOwnerThread(current);  return true;            }  }  else if (current == getExclusiveOwnerThread()) {  int nextc = c + acquires;  if (nextc < 0)  throw new Error("Maximum lock count exceeded");  setState(nextc);  return true;        }  return false;  
}  
 
相对于 nonfair 的 nonfairTryAcquire 方法,在没有线程持有锁时,增加了 hasQueuedPredecessors() 方法的判断,该方法用于判断队列中是否存在线程比当前线程等待时间更长。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#hasQueuedPredecessors
public final boolean hasQueuedPredecessors() {     Node t = tail; Node h = head;  Node s;  return h != t &&  ((s = h.next) == null || s.thread != Thread.currentThread());  
}
 
分析一下这段代码:
- 如果 
h != t为 true,说明队列正在初始化,或者已经初始化完成。- 在 
h != t前提下,如果(s = h.next) == null为 true,说明队列正在初始化,因为在队列初始化过程中,是有可能存在 head 已经被初始化(不再等于 tail 了),但 head.next 还没有被设值为 node,这种情况下,因为队列中已经存在 Node,当前线程需要加到等待队列中,故返回 true。初始化过程在AbstractQueuedSynchronizer#enq()。 - 但如果 
(s = h.next) == null为 false,说明队列中已经存在 Node,则判断该 Node 的线程是否与当前线程相同。如果s.thread != Thread.currentThread()为 true,说明不相同,需要进入等待队列。如果相同,说明当前线程可以获取锁。 
 - 在 
 - 如果 
h != t为 false,说明队列为空,返回 false,说明可以去获得锁。 
另外,s = h.next 这段代码获取的是 head 的下一个节点,因为 head 是虚节点,不存储数据,真正的数据存储在 head.next。
解锁
相对于加锁过程,解锁过程比较简单,且公平锁和非公平锁共用同一个 lock() 方法。
// java.util.concurrent.locks.ReentrantLock#unlock
public void unlock() {  sync.release(1);  
}//java.util.concurrent.locks.AbstractQueuedSynchronizer#release
public final boolean release(int arg) {  if (tryRelease(arg)) {   //如果锁没有被任何线程持有Node h = head;  if (h != null && h.waitStatus != 0)  unparkSuccessor(h);  //解除后继节点的线程挂起return true;   }  return false;  
}
 
关于代码 h != null && h.waitStatus != 0,分为以下几种情况:
h==null,说明头节点还未初始化。h!=null && h.waitStatus==0,说明后继节点还在运行中。因为如果节点已被取消,waitStatus 会被设置为 1;如果后继节点需要唤醒,waitStatus 会被设置为 -1;如果节点正在排队,waitStatus 则会设置为 -2。如果 waitStatus 为 0,说明已经处于运行中。h != null && h.waitStatus != 0则会去唤醒后继节点。
在 release() 方法中,会先尝试去解锁,如果解锁成功且后继节点需要唤醒,则将后继节点取消挂起。
//java.util.concurrent.locks.ReentrantLock.Sync#tryRelease
//更新state状态,如果重入次数为0,则将锁的独占线程设置为null
protected final boolean tryRelease(int releases) {  //减少可重入次数int c = getState() - releases;  if (Thread.currentThread() != getExclusiveOwnerThread())  //如果当前线程不是该锁的独占线程,则抛出异常throw new IllegalMonitorStateException();  boolean free = false;  //该锁是否已被释放if (c == 0) {  //如果可重入次数已为0free = true;  setExclusiveOwnerThread(null);  //将锁的独占线程设置为null,表明不再有线程持有该锁}  setState(c);  //修改锁的状态return free;  
}
 
在 tryRelease() 方法中,会去减少锁的可重入次数,当可重入次数为 0 时,清空锁的独占线程。
// java.util.concurrent.locks.AbstractQueuedSynchronizer#unparkSuccessor
private void unparkSuccessor(Node node) {     int ws = node.waitStatus;  //获取头节点的waitStatusif (ws < 0)  //如果节点的waitStatus为负数,说明后继节点需要被唤醒或者正在排队compareAndSetWaitStatus(node, ws, 0);  //清除节点当前的waitStatus,设置为0  Node s = node.next;  //获取头节点的后继节点if (s == null || s.waitStatus > 0) {  //如果节点为null,或者已被取消s = null;  for (Node t = tail; t != null && t != node; t = t.prev)  //从尾节点开始向前遍历if (t.waitStatus <= 0)  //找到队列中第一个不被取消的节点s = t;  }  if (s != null)  //如果找到了,则将该节点取消挂起LockSupport.unpark(s.thread);  
}
 
在 unparkSuccessor() 方法中,会从尾节点开始从后往前遍历,找到队列中第一个没有被取消的节点,将该节点取消挂起。
整个解锁流程可以用如下流程图表示。
