专注于快乐的事情

AbstractQueuedSynchronizer学习

AQS简介

AbstractQueuedSynchronizer,抽象的队列式的同步器,简称AQS。
AQS是Java并发框架的一个基础组件,java.util.concurrent并发包下很多类都是基于它实现的,如:ReentrantLock、ReentrantReadWriterLock、信号量、线程池等。

AQS有两种工作模式:独占模式共享模式
AQS是独占锁,例如ReentrantLock中Sync的公共父类。
AQS为共享锁,例如ReentrantReadWriteLock,Semaphore的Sync公共父类。

其结构图如下:

png

它的所有子类中,要么实现并使用了它独占功能的API,要么使用了共享锁的功能,而不会同时使用两套API。

以ReentrantReadWriterLock为例:
如果一个线程(写线程)以独占模式(acquire)获取到这个锁,那么其他线程(读或写线程)不管以哪种模式尝试获取锁(共享模式是通过acquireShared获取的),都会失败;
如果一个线程(读线程)以共享模式获取到这个锁,那么其他读线程可以获取到锁,而写线程会获取失败。

api简介

不响应中断的独占锁(acquire)
响应中断的独占锁(acquireInterruptibly)
不响应中断的共享锁(acquireShared)
响应中断的共享锁(acquireSharedInterruptibly)
独占锁的释放(release)
共享锁的释放(releaseShared)

公平与非公平通过类FairSync,NonfairSync进行区分

AQS实现

同步状态

AbstractQueuedSynchronizer内置一个int32位字段state来保存同步状态,并暴露出getState、setState以及compareAndSet操作来读取和更新这个同步状态。其中属性state被声明为volatile,并且通过使用CAS指令来实现compareAndSetState。

子类通过继承同步器并需要实现它的方法来管理其状态,管理的方式就是通过类似acquire和release的方式来操纵状态。

ReentrantLock使用AQS的时候,state被用来表示锁被重入的次数;
当Semaphore使用AQS的时候,state则被用来表示当前还有多少信号量可被获取。

同步队列管理

AQS会对进行 acquire 而被阻塞的线程进行管理,其实就是内部维护了一个FIFO队列,这个队列是一个双向链表。链头可以理解为是一个空的节点,除了链头以外,每个节点内部都持有着一个线程。

acquire操作

f(尝试获取成功){
    return;
 }else{
     加入等待队列;park自己
}

releas

if(尝试释放成功){
    unpark等待队列中第一个节点
}else{
    return false
}

整体实现

为了实现上述操作,需要下面三个基本组件:

1) 同步状态的原子性管理;

2) 线程的阻塞与解除阻塞;

3) 队列的管理;

用一个列表表示

组件 实现
同步状态 volatile int state
阻塞 LockSupport类
同步队列 Node节点
条件队列 ConditionObject

代码整体结构

AQS同步队列的主要功能是将无法获得资源的线程放入同步队列中,进行等待,它是通过链表来实现的,每一个节点对应一个任务线程。在AbstractQueuedSynchronizer类中用静态内部类Node来作为链表的数据结构。

AQS参考了CLH锁的设计,但AQS没有采用CLH中的自旋来查看前驱(prev)节点的状态
AQS中,队列节点是放在一个双向链表结构中的。

静态内部类Node类的主要属性:

1
2
3
4
5
6
7
8
9
Node {
int waitStatus; //节点状态
Node prev;
Node next;
/** 因为条件只能是独占的,所以nextWaiter指向的是下一个等待该条件的Node。当然,在共享模式中,nextWaiter会被设置成一个特殊值:SHARED*/
Node nextWaiter;
/** Node上绑定的线程,由构造函数传入,用完后需要set null */
Thread thread;
}

Node的非静态属性,因为是双向链表,所以有prev和next,还有一个thread,用来记录该节点对应的线程,还有一个表示该节点状态的waitStatus,它有四种状态:

SIGNAL,值为-1,表示当前节点的next节点需要获取资源数,也就是需要unpark
CANCELLED,值为1,表示当前的线程被取消
CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中
PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够执行
0,新建的Node的状态都是0,表示初始状态。

AbstractQueuedSynchronizer类拥有三个成员变量:sync队列的头结点head、sync队列的尾节点tail和状态state。

AQS中资源数(或者锁)的个数是放在state,
private volatile int state;

对于锁的获取,请求形成节点,将其挂载在尾部,而锁资源的转移(释放再获取)是从头部开始向后进行。

tryAcquire通过返回一个boolean值来告诉调用者获取资源数是否成功,
tryAcquireShared则返回的是个int型,

  • 小于0,表示获取失败,
  • 等于0,表示获取成功但后续还有其他获取将失败,
  • 大于0,表示获取成功但后续的获取有可能成功。

代码细节

先从一个例子看起,类似一个互斥锁的lock方法如下:

1
2
3
public void lock() {
sync.acquire(1);
}

acquire()

acquire()在AQS中实现:

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

  1. 当前线程首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待。tryAcquire由子类实现。
  2. 尝试获取锁失败,通过addWaiter(Node.EXCLUSIVE)来将当前线程加入到CLH队列,也就是线程等待队列。
  3. 调用acquireQueued()来获取锁。
  4. 当前线程在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果当前线程在休眠等待过程中被中断过,acquireQueued会返回true,此时当前线程会调用selfInterrupt()来自己给自己产生一个中断。

addWaiter源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private Node addWaiter(Node mode) {
// 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
enq(node);
return node;
}

acquireQueued()源码

acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。

节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
 final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
// interrupted表示在CLH队列的调度中,
// “当前线程”在休眠时,有没有被中断过。
boolean interrupted = false;
for (;;) {
// 获取上一个节点。
// node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

在acquireQueued(final Node node,int arg)方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点,当前线程就调用tryAcquire尝试获取锁,如果获取成功就将头结点设置为当前结点,返回;

不是头结点的情况决定是否应该挂起,shouldParkAfterFailedAcquire返回“当前线程是否应该阻塞。

shouldParkAfterFailedAcquire()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// ”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
// 前继节点的状态
int ws = pred.waitStatus;
// 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
if (ws == Node.SIGNAL)
return true;
// 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
// 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

CANCELLED:因为超时或者中断,结点会被设置为取消状态,被取消状态的结点不应该去竞争锁,只能保持取消状态不变,不能转换为其他状态。处于这种状态的结点会被踢出队列,被GC回收;

SIGNAL:表示这个结点的继任结点被阻塞了,到时需要通知它;

CONDITION:表示这个结点在条件队列中,因为等待某个条件而被阻塞;

PROPAGATE:使用在共享模式头结点有可能牌处于这种状态,表示锁的下一次获取可以无条件传播;
0:None of the above,新结点会处于这种状态。

该方法首先检查前趋结点的waitStatus位,如果为SIGNAL,表示前趋结点会通知它,那么它可以放心大胆地挂起了;

如果前趋结点是一个被取消的结点怎么办呢?那么就向前遍历跳过被取消的结点,直到找到一个没有被取消的结点为止,将找到的这个结点作为它的前趋结点,将找到的这个结点的waitStatus位设置为SIGNAL,返回false表示线程不应该被挂起。

parkAndCheckInterrupt

上面的代码返回true,需要进行阻塞。则会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。

1
2
3
4
5
6
private final boolean parkAndCheckInterrupt() {
// 通过LockSupport的park()阻塞“当前线程”。
LockSupport.park(this);
// 返回线程的中断状态。
return Thread.interrupted();
}

线程被阻塞之后如何唤醒。一般有2种情况:
第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。

LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程。
Thread.interrupted()清除中断状态。

selfInterrupt()

如果阻塞后,中断唤醒

1
2
3
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}

“当前线程”自己产生一个中断。

在上面的acquireQueued()中,线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁采用unpark()唤醒;线程才会获取锁,然后“真正执行起来”!

在parkAndCheckInterrupt()中,线程的中断状态被清除了。

再看acquire()函数

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  1. 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
  2. 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到”CLH队列”末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

参考

https://www.cnblogs.com/noahsark/p/sbstract_queued_synchronizer.html
https://www.cnblogs.com/2015110615L/p/6754529.html
http://ifeve.com/introduce-abstractqueuedsynchronizer/
http://blog.csdn.net/sherld/article/details/42492259
AbstractQueuedSynchronizer源码解析
http://blog.csdn.net/yuenkin/article/details/50867530

AbstractQueuedSynchronizer(AQS)源码解析(一)
http://www.tuicool.com/articles/INfEj23

http://manzhizhen.iteye.com/blog/2305890

Java多线程系列目录(共43篇)
http://www.cnblogs.com/skywang12345/p/java_threads_category.html

http://blog.csdn.net/yanlinwang/article/details/41172697

评论系统未开启,无法评论!