队列同步器AQS的实现原理

队列同步器

第一节 简介

队列同步器,是用来构建锁或者其他同步组件的基本框架。并发包作者Doug Lea期望同步器可以成为实现大部分同步需求的基础。

同步器的来源?当有多个线程争用同一把锁时,必须有类似排队的机制去将那些没能拿到锁的线程串联在一起。当锁释放时,锁管理器就会挑选一个合适的线程来占有这个刚刚释放的锁。每一把锁内部都会有这样一个队列管理器,管理器里面会维护一个等待的线程队列

同步器和锁的关系?同步器是实现锁的关键,在锁的实现中聚合同步器,从而利用同步器来实现锁的语义。锁是面向使用者,其定义了使用者和锁交互的接口,隐藏了实现细节同步器则面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁的同步器这一设计很好的隔离了使用者和实现者所关注的领域,是我们设计时要学习的思路。


第二节 如何使用同步器

同步器的主要使用方式是继承,子类通过继承同步器并实现其抽象方法来管理同步状态,在实现过程难免要对同步状态进行修改,需要调用同步器实现的三个方法来进行操作:

  • getState() :获取同步状态。
  • setState(int newState) :设置同步状态。
  • compareAndSetState(int expect,int update) :CAS设置同步状态

这三个方法能够保证状态的改变是安全的。

子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干状态获取和释放的方法来供自定义同步组件使用。同步器既可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,可以方便实现不同类型的同步组件:ReentrantLock,ReentrantReadWriteLock,CountDownLatch等。


第三节 同步器的结构和方法

同步器使用了一个 int 成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

AQS基于模板方法模式设计,使用者需要继承并重写指定方法,将同步器组合在自定义组件的实现中,并调用同步器提供的模板方法,这些模板方法会调用使用者重的方法。同步器通过一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

3.1 同步器提供的和可重写的方法

同步器提供的方法:

  1. getState() 获取当前同步状态
  2. setState(int newState) 设置当前同步状态
  3. compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性

同步器可重写的方法如下图所示。

同步器可重写的方法

3.2 同步器提供的模板方法

实现自定义同步组件时,将会调用同步器提供的模板方法,如下图所示。

同步器提供的模板方法

同步器提供的模板方法基本上分为3类:

  1. 独占式获取与释放同步状态。
  2. 共享式获取与释放同步状态。
  3. 查询同步队列中的等待线程情况。

自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。只有掌握了同步器的工作原理才能更加深入地理解并发包中其他的并发组件,所以下面通过一个独占锁的示例来深入了解一下同步器的工作原理。

3.3 AbstractQueuedSynchronizer

ReentrantLock 的队列管理器是 AbstractQueuedSynchronizer ,它内部的等待队列是一个双向链表结构,列表中的每个节点的结构如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class AbstractQueuedSynchronizer {
private transient volatile Node head; // 队头线程将优先获得锁
private transient volatile Node tail; // 抢锁失败的线程追加到队尾
private volatile int state; // 锁计数

/**
* 获取当前的同步状态
*/
protected final int getState() {
return state;
}

/**
* 设置当前的同步状态
*/
protected final void setState(int newState) {
state = newState;
}

/**
* 使用CAS设置当前状态,该方法保证状态设置的原子性
*/
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

static final class Node {
Node prev;
Node next;
Thread thread; // 每个节点一个线程

// 下面这两个特殊字段可以先不去理解
Node nextWaiter; // 请求的是共享锁还是独占锁
int waitStatus; // 精细状态描述字
}
}

第四节 实例

只有掌握了同步器的工作原理才能更加深入地理解并发包中其他的并发组件,所以下面通过一个独占锁的示例来深入了解一下同步器的工作原理。顾名思义,独占锁就是在同一时刻只能有一个线程获取到锁,而其他获取锁的线程只能处于同步队列中等待,只有获取锁的线程释放了锁,后继的线程才能够获取锁,如代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class Mutex implements Lock {
// 静态内部类,自定义同步器

private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为0的时候获取锁,独占式获取同步状态
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
//设置当前线程为独占线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}

// 释放锁,将状态设置为0,独占式释放同步状态
protected boolean tryRelease(int releases) {
if (getState() == 0)
throw new IllegalMonitorStateException();
//置空独占线程和同步状态
setExclusiveOwnerThread(null);
setState(0);
return true;
}

// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() { sync.acquire(1); }
public boolean tryLock() { return sync.tryAcquire(1); }
public void unlock() { sync.release(1); }
public Condition newCondition() { return sync.newCondition(); }
public boolean isLocked() { return sync.isHeldExclusively(); }
public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
}
  • 独占锁Mutex是一个自定义同步组件,它在同一时刻只允许一个线程占有锁。
  • Mutex中定义了一个静态内部类,该内部类继承了同步器并实现了独占式获取和释放同步状态。
  • tryAcquire(int acquires) 方法中,如果经过CAS设置成功(同步状态设置为1),则代表获取了同步状态。
  • tryRelease(int releases) 方法中只是将同步状态重置为0。
  • 用户使用Mutex时并不会直接和内部同步器的实现打交道,而是调用Mutex提供的方法,在Mutex的实现中,以获取锁的 lock() 方法为例,只需要在方法实现中调用同步器的模板方法 acquire(int args) 即可,当前线程调用该方法获取同步状态失败后会被加入到同步队列中等待,这样就大大降低了实现一个可靠自定义同步组件的门槛。

第五节 同步器的实现分析

接下来将从实现角度分析同步器是如何完成线程同步的,主要包括:同步队列、独占式同步状态获取与释放、共享式同步状态获取与释放以及超时获取同步状态等同步器的核心数据结构与模板方法。

5.1 同步队列

同步器依赖内部的同步队列(一个FIFO双向队列)来完成同步状态的管理,当前线程获取同步状态失败时,同步器会将当前线程以及等待状态等信息构造成为一个节点(Node)并将其加入同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点中的线程唤醒,使其再次尝试获取同步状态。

同步队列中的节点(Node)用来保存获取同步状态失败的线程引用、等待状态以及前驱和后继节点,节点的属性类型与名称以及描述如下表所示。

节点的属性类型与名称以及描述

节点是构成同步队列(等待队列)的基础,同步器拥有首节点(head)和尾节点(tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部,同步队列的基本结构如下图所示。

同步队列的基本结构

在图中,同步器包含了两个节点类型的引用,一个指向头节点,而另一个指向尾节点。试想一下,当一个线程成功地获取了同步状态(或者锁),其他线程将无法获取到同步状态,转而被构造成为节点并加入到同步队列中,而这个加入队列的过程必须要保证线程安全,因此同步器提供了一个基于CAS的设置尾节点的方法compareAndSetTail(Node expect, Node update) ,它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。

同步器将节点加入到同步队列的过程如下图所示。

节点加入到同步队列

同步队列遵循FIFO,首节点是获取同步状态成功的节点,首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点,该过程如下图所示。

首节点的设置

在图中,设置首节点是通过获取同步状态成功的线程来完成的,由于只有一个线程能够成功获取到同步状态,因此设置头节点的方法并不需要使用CAS来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的next引用即可。


5.2 独占式同步状态获取与释放

通过调用同步器的 acquire(int arg) 方法可以获取同步状态,该方法对中断不敏感,也就是说当线程获取同步状态失败后进入同步队列中,后续对线程进行中断操作时,线程不会从同步队列中移出,该方法代码如代码所示。

1
2
3
4
5
6
7
8
9
10
   public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

// 在抽象类AbstractQueuedSynchronizer中并未直接实现
protected boolean tryAcquire(int arg) {
throw new UnsupportedOperationException();
}

上述代码主要完成了同步状态获取、节点构造、加入同步队列以及在同步队列中自旋等待的相关工作,其主要逻辑是:

  1. 首先调用自定义同步器实现的 tryAcquire(int arg) 方法,该方法保证线程安全的获取同步状态。
  2. 如果同步状态获取失败,则构造同步节点(独占式 Node.EXCLUSIVE ,同一时刻只能有一个线程成功获取同步状态)并通过 addWaiter(Node node) 方法将该节点加入到同步队列的尾部。
  3. 最后调用 acquireQueued(Node node,int arg) 方法,使得该节点以“死循环”的方式获取同步状态。如果获取不到则阻塞节点中的线程,而被阻塞线程的唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。

**5.2.1 addWaiter()enq() **

下面分析一下相关工作。首先是节点的构造以及加入同步队列,如代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
// 快速尝试在尾部添加
Node pred = tail;//获取当前尾节点
if (pred != null) {//当队列已有节点存在
node.prev = pred;//新节点连上尾节点
if (compareAndSetTail(pred, node)) {//CAS线程安全的设置尾节点->同步器设置tail
pred.next = node;//尾节点连上新节点
return node;
}
}
//队列为空或设置tail失败,循环尝试新节点入列
enq(node);
return node;
}

/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
for (;;) {//死循环
Node t = tail;//获取当前同步器所设尾节点
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))//未设尾节点,尝试设置头节点,成功则初始化尾节点为头节点
tail = head;
} else {
node.prev = t;//已设置尾节点,则继续尝试新节点和尾节点的连接
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}

上述代码通过使用 compareAndSetTail(Node expect,Node update) 方法来确保节点能够被线程安全添加。试想一下:如果使用一个普通的 LinkedList 来维护节点之间的关系,那么当一个线程获取了同步状态,而其他多个线程由于调用 tryAcquire(int arg) 方法获取同步状态失败而并发地被添加到 LinkedList 时,LinkedList 将难以保证Node的正确添加,最终的结果可能是节点的数量有偏差,而且顺序也是混乱的。

enq(final Node node) 方法中,同步器通过“死循环”来保证节点的正确添加,在“死循环”中只有通过 CAS 将节点设置成为尾节点之后,当前线程才能从该方法返回,否则,当前线程不断地尝试设置。可以看出,enq(final Node node) 方法将并发添加节点的请求通过 CAS 变得“串行化”了。

节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说每个线程)都在自省地观察,当条件满足,获取到了同步状态,就可以从这个自旋过程中退出,否则依旧留在这个自旋过程中(并会阻塞节点的线程),如代码所示。

**5.2.2 acquireQueued() **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标识是否失败
try {
boolean interrupted = false;
for (;;) {//循环获取同步状态
final Node p = node.predecessor();//获取前驱节点
if (p == head && tryAcquire(arg)) {//当前驱节点为头节点时尝试获取同步状态
//得到同步状态
setHead(node);//设置此节点为头节点
p.next = null; // help GC
failed = false;//标识操作成功
return interrupted;//未中断返回
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireQueued(final Node node,int arg) 方法中,当前线程在“死循环”中尝试获取同步状态,而只有前驱节点是头节点才能够尝试获取同步状态,这是为什么?原因有两个:

  1. 头节点是成功获取到同步状态的节点,而头节点的线程释放了同步状态之后,将会唤醒其后继节点,后继节点的线程被唤醒后需要检查自己的前驱节点是否是头节点。
  2. 维护同步队列的FIFO原则。该方法中,节点自旋获取同步状态的行为如下图所示。

节点自旋获取同步状态

在图中,由于非首节点线程前驱节点出队或者被中断而从等待状态返回,随后检查自己的前驱是否是头节点,如果是则尝试获取同步状态。可以看到节点和节点之间在循环检查的过程中基本不相互通信,而是简单地判断自己的前驱是否为头节点,这样就使得节点的释放规则符合FIFO,并且也便于对过早通知的处理(过早通知是指前驱节点不是头节点的线程由于中断而被唤醒)。

独占式同步状态获取流程,也就是 acquire(int arg) 方法调用流程,如下图所示。

独占式同步状态获取流程

在图中,前驱节点为头节点且能够获取同步状态的判断条件和线程进入等待状态是获取同步状态的自旋过程。当同步状态获取成功之后,当前线程从 acquire(int arg) 方法返回,如果对于锁这种并发组件而言,代表着当前线程获取了锁。

当前线程获取同步状态并执行了相应逻辑之后,就需要释放同步状态,使得后续节点能够继续获取同步状态。通过调用同步器的 release(int arg) 方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后继节点(进而使后继节点重新尝试获取同步状态)。该方法代码如代码所示。

**5.2.3 release() **

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
   /**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
if (tryRelease(arg)) {
// 当前头结点线程
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待状态的头结点线程
return true;
}
return false;
}

// 在抽象类AbstractQueuedSynchronizer中并未直接实现
protected boolean tryRelease(int arg) {
throw new UnsupportedOperationException();
}

// ReentrantLock自定义Sync中实现如下
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程未持有锁则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 表示此时同步释放,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 否则表示同步仍在继续,返回false
setState(c);
return free;
}

该方法执行时,会唤醒头节点的后继节点线程,unparkSuccessor(Node node) 方法使用 LockSupport(在后面的章节会专门介绍)来唤醒处于等待状态的线程。

分析了独占式同步状态获取和释放过程后,适当做个总结:在获取同步状态时,同步器维护一个同步队列,获取状态失败的线程都会被加入到队列中并在队列中进行自旋;移出队列(或停止自旋)的条件是前驱节点为头节点且成功获取了同步状态。在释放同步状态时,同步器调用 tryRelease(int arg) 方法释放同步状态,然后唤醒头节点的后继节点。


5.3 共享式同步状态获取与释放

5.3.1 共享式和独占式的区别

共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态。以文件的读写为例,如果一个程序在对文件进行读操作,那么这一时刻对于该文件的写操作均被阻塞,而读操作能够同时进行。写操作要求对资源的独占式访问,而读操作可以是共享式访问,两种不同的访问模式在同一时刻对文件或资源的访问情况,如下图所示。

共享式与独占式访问资源的对比

在图中,左半部分,共享式访问资源时,其他共享式的访问均被允许,而独占式访问被阻塞,右半部分是独占式访问资源时,同一时刻其他访问均被阻塞。

**5.3.2 acquireShared()doAcquireShared() **

通过调用同步器的 acquireShared(int arg) 方法可以共享式地获取同步状态,该方法代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
/**
* Acquires in shared mode, ignoring interrupts. Implemented by
* first invoking at least once {@link #tryAcquireShared},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquireShared} until success.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquireShared} but is otherwise uninterpreted
* and can represent anything you like.
*/
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)//尝试共享式获取同步状态
doAcquireShared(arg);//未获取到则执行doAcquireShared,死循环获取同步状态
}

/**
* Acquires in shared uninterruptible mode.
* @param arg the acquire argument
*/
private void doAcquireShared(int arg) {
// 构建一个共享的尾结点,对比独占式的
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 响应中断
boolean interrupted = false;
// 死循环尝试获取同步状态
for (;;) {
final Node p = node.predecessor();
if (p == head) {//当前驱节点为头节点
int r = tryAcquireShared(arg);//获取同步状态
if (r >= 0) {//获取成功,从自旋过程退出
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
// 判断是否需要进入等待状态
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

acquireShared(int arg) 方法中,同步器调用 tryAcquireShared(int arg) 方法尝试获取同步状态,tryAcquireShared(int arg) 方法返回值为int类型,当返回值大于等于0时,表示能够获取到同步状态。因此,在共享式获取的自旋过程中,成功获取到同步状态并退出自旋的条件就是 tryAcquireShared(int arg) 方法返回值大于等于0。可以看到,在 doAcquireShared(int arg) 方法的自旋过程中,如果当前节点的前驱为头节点时,尝试获取同步状态,如果返回值大于等于0,表示该次获取同步状态成功并从自旋过程中退出。

与独占式一样,共享式获取也需要释放同步状态,通过调用 releaseShared(int arg) 方法可以释放同步状态,该方法代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/**
* Releases in shared mode. Implemented by unblocking one or more
* threads if {@link #tryReleaseShared} returns true.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryReleaseShared} but is otherwise uninterpreted
* and can represent anything you like.
* @return the value returned from {@link #tryReleaseShared}
*/
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

该方法在释放同步状态之后,将会唤醒后续处于等待状态的节点。对于能够支持多个线程同时访问的并发组件(比如Semaphore),它和独占式主要区别在于 tryReleaseShared(int arg) 方法必须确保同步状态(或者资源数)线程安全释放,一般是通过循环和CAS来保证的,因为释放同步状态的操作会同时来自多个线程


5.4 独占式超时获取同步状态

通过调用同步器的 doAcquireNanos(int arg, long nanosTimeout) 方法可以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回true,否则返回false。该方法提供了传统Java同步操作(比如 synchronized 关键字)所不具备的特性。

在分析该方法的实现前,先介绍一下响应中断的同步状态获取过程。在Java 5之前,当一个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在 synchronized 上,等待着获取锁。在Java 5中,同步器提供了 acquireInterruptibly(int arg) 方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException

超时获取同步状态过程可以被视作响应中断获取同步状态过程的“增强版”,doAcquireNanos(int arg,long nanosTimeout) 方法在支持响应中断的基础上,增加了超时获取的特性。针对超时获取,主要需要计算出需要睡眠的时间间隔 nanosTimeout ,为了防止过早通知,nanosTimeout 计算公式为:nanosTimeout -= now-lastTime ,其中 now 为当前唤醒时间,lastTime 为上次唤醒时间,如果 nanosTimeout 大于0则表示超时时间未到,需要继续睡眠 nanosTimeout 纳秒,反之,表示已经超时,该方法代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Acquires in exclusive timed mode.
*
* @param arg the acquire argument
* @param nanosTimeout max wait time
* @return {@code true} if acquired
*/
private boolean doAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;//超时间隔小于0直接返回false
final long deadline = System.nanoTime() + nanosTimeout;//记录死亡时间线=当前时间+间隔时间
final Node node = addWaiter(Node.EXCLUSIVE);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {//当节点的前驱节点为头节点,尝试获取同步状态
setHead(node);//获取成功,重置头节点
p.next = null; // help GC
failed = false;
return true;//获取成功,返回true
}
nanosTimeout = deadline - System.nanoTime();//计算当前还剩余时间间隔,重置时间间隔
if (nanosTimeout <= 0L)//判断超时
return false;//剩余时间间隔小于0,表示已到指定时间,返回false
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanosTimeout);//当剩余间隔还大于1000纳秒时进入超时等待,否则快速的自旋
if (Thread.interrupted())
throw new InterruptedException();//线程被中断,抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}

该方法在自旋过程中,当节点的前驱节点为头节点时尝试获取同步状态,如果获取成功则从该方法返回,这个过程和独占式同步获取的过程类似,但是在同步状态获取失败的处理上有所不同。如果当前线程获取同步状态失败,则判断是否超时( nanosTimeout 小于等于0表示已经超时),如果没有超时,重新计算超时间隔 nanosTimeout ,然后使当前线程等待 nanosTimeout 纳秒(当已到设置的超时时间,该线程会从 LockSupport.parkNanos(Object blocker,long nanos) 方法返回)。

如果 nanosTimeout 小于等于 spinForTimeoutThreshold(1000纳秒)时,将不会使该线程进行超时等待,而是进入快速的自旋过程。原因在于,非常短的超时等待无法做到十分精确,如果这时再进行超时等待,相反会让 nanosTimeout 的超时从整体上表现得反而不精确。因此,在超时非常短的场景下,同步器会进入无条件的快速自旋。

独占式超时获取同步态的流程如下图所示。

独占式超时获取同步状态的流程

从图中可以看出,独占式超时获取同步状态 doAcquireNanos(int arg,long nanosTimeout) 和独占式获取同步状态 acquire(int args) 在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑。acquire(int args) 在未获取到同步状态时,将会使当前线程一直处于等待状态,而 doAcquireNanos(int arg,long nanosTimeout) 会使当前线程等待 nanosTimeout 纳秒,如果当前线程在 nanosTimeout 纳秒内没有获取到同步状态,将会从等待逻辑中自动返回。


5.5 自定义同步组件——TwinsLock

在前面对同步器 AbstractQueuedSynchronizer 进行了实现层面的分析,接下来通过编写一个自定义同步组件来加深对同步器的理解。

设计一个同步工具:该工具在同一时刻,只允许至多两个线程同时访问,超过两个线程的访问将被阻塞,我们将这个同步工具命名为 TwinsLock

首先,确定访问模式。TwinsLock能够在同一时刻支持多个线程的访问,这显然是共享式访问,因此,需要使用同步器提供的 acquireShared(int args) 方法等和Shared相关的方法,这就要求TwinsLock必须重写 tryAcquireShared(int args) 方法和 tryReleaseShared(int args) 方法,这样才能保证同步器的共享式同步状态的获取与释放方法得以执行。

其次,定义资源数。TwinsLock在同一时刻允许至多两个线程的同时访问,表明同步资源数为2,这样可以设置初始状态status为2,当一个线程进行获取,status减1,该线程释放,则status加1,状态的合法范围为0、1和2,其中0表示当前已经有两个线程获取了同步资源,此时再有其他线程对同步状态进行获取,该线程只能被阻塞。在同步状态变更时,需要使用 compareAndSet(int expect,int update) 方法做原子性保障。

最后,组合自定义同步器。前面的章节提到,自定义同步组件通过组合自定义同步器来完成同步功能,一般情况下自定义同步器会被定义为自定义同步组件的内部类。

TwinsLock(部分)代码如下列代码所示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer {
Sync(int count) {
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount) {
for (;;) {
int current = getState();
int newCount = current - reduceCount;
if (newCount < 0 || compareAndSetState(current,
newCount)) {
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount) {
for (;;) {
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current, newCount)) {
return true;
}
}
}
}
public void lock() {
sync.acquireShared(1);
}
public void unlock() {
sync.releaseShared(1);
}

// 其他接口方法略
}

在上述示例中,TwinsLock实现了 Lock 接口,提供了面向使用者的接口,使用者调用 lock() 方法获取锁,随后调用 unlock() 方法释放锁,而同一时刻只能有两个线程同时获取到锁。TwinsLock同时包含了一个自定义同步器 Sync ,而该同步器面向线程访问和同步状态控制。以共享式获取同步状态为例:同步器会先计算出获取后的同步状态,然后通过CAS确保状态的正确设置,当 tryAcquireShared(int reduceCount) 方法返回值大于等于0时,当前线程才获取同步状态,对于上层的TwinsLock而言,则表示当前线程获得了锁。

同步器作为一个桥梁,连接线程访问以及同步状态控制等底层技术与不同并发组件(比如 LockCountDownLatch 等)的接口语义。

下面编写一个测试来验证TwinsLock是否能按照预期工作。在测试用例中,定义了工作者线程Worker,该线程在执行过程中获取锁,当获取锁之后使当前线程睡眠1秒(并不释放锁),随后打印当前线程名称,最后再次睡眠1秒并释放锁,测试用例如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
public class TwinsLockTest {
@Test
public void test() {
final Lock lock = new TwinsLock();
class Worker extends Thread {
public void run() {
while (true) {
lock.lock();
try {
SleepUtils.second(1);
System.out.println(Thread.currentThread().getName());
SleepUtils.second(1);
} finally {
lock.unlock();
}
}
}
}

// 启动10个线程
for (int i = 0; i < 10; i++) {
Worker w = new Worker();
w.setDaemon(true);
w.start();
}

// 每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepUtils.second(1);
System.out.println();
}
}
}

运行该测试用例,可以看到线程名称成对输出,也就是在同一时刻只有两个线程能够获取到锁,这表明TwinsLock可以按照预期正确工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread-3

Thread-0

Thread-4
Thread-3



Thread-4
Thread-3

Thread-3

Thread-4


Thread-3
Thread-4


第六节 AbstractQueuedSynchronizer源码

AbstractQueuedSynchronizer部分代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
public abstract class AbstractQueuedSynchronizer 
extends AbstractOwnableSynchronizer
implements java.io.Serializable {//AQS中共享锁相关代码

private volatile int state;//对于共享锁,这个state的作用类似计数器
/**
* 请求共享锁
*/
public final void acquireShared(int arg) {
//state != 0时,tryAcquireShared(arg) < 0,才会真正操作锁
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

/**
* 跟独占锁很像,只不过共享锁初始化时有传入一个count,count为
*/
private void doAcquireShared(int arg) {
//把当前线程封装到一个SHARE类型Node中,添加到SyncQueue尾巴上
final Node node = addWaiter(Node.SHARED);
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {//前继节点是head节点,下一个就到自己了
int r = tryAcquireShared(arg);//非公平锁实现,再尝试获取锁
//state==0时tryAcquireShared会返回>=0(CountDownLatch中返回的是1)。state为0说明共享次数已经到了,可以获取锁了
//注意上面说的, 等于0表示不用唤醒后继节点,大于0需要
if (r >= 0) {//r>0表示state==0,前继节点已经释放锁,锁的状态为可被获取
setHeadAndPropagate(node, r);//这一步设置node为head节点设置node.waitStatus->Node.PROPAGATE,然后唤醒node.thread
//唤醒head节点线程后,从这里开始继续往下走
p.next = null; //head已经指向node节点,oldHead.next索引置空,方便p节点对象回收
if (interrupted)
selfInterrupt();
return;
}
}
//前继节点非head节点,将前继节点状态设置为SIGNAL,通过park挂起node节点的线程
if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}

/**
* 把node节点设置成head节点,且node.waitStatus->Node.PROPAGATE
*/
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head;//h用来保存旧的head节点
setHead(node);//head引用指向node节点
/* 这里意思有两种情况是需要执行唤醒操作
* 1.propagate > 0 表示调用方指明了后继节点需要被唤醒
* 2.头节点后面的节点需要被唤醒(waitStatus<0),不论是老的头结点还是新的头结点*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())//node是最后一个节点或者 node的后继节点是共享节点
/* 如果head节点状态为SIGNAL,唤醒head节点线程,重置head.waitStatus->0
* head节点状态为0(第一次添加时是0),设置head.waitStatus->Node.PROPAGATE表示状态需要向后继节点传播
*/
doReleaseShared();//对于这个方法,其实就是把node节点设置成Node.PROPAGATE状态
}
}

public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {//state为0时,返回true(针对CountDownLatch)
doReleaseShared();
return true;
}
return false;
}
/**
* 把当前结点设置为SIGNAL或者PROPAGATE
* 唤醒head.next(B节点),B节点唤醒后可以竞争锁,成功后head->B,然后又会唤醒B.next,一直重复直到共享节点都唤醒
* head节点状态为SIGNAL,重置head.waitStatus->0,唤醒head节点线程,唤醒后线程去竞争共享锁
* head节点状态为0,将head.waitStatus->Node.PROPAGATE传播状态,表示需要将状态向后继节点传播
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {//head是SIGNAL状态
/* head状态是SIGNAL,重置head节点waitStatus为0,这里不直接设为Node.PROPAGATE,
* 是因为unparkSuccessor(h)中,如果ws < 0会设置为0,所以ws先设置为0,再设置为PROPAGATE
* 这里需要控制并发,因为入口有setHeadAndPropagate跟release两个,避免两次unpark
*/
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue;//设置失败,重新循环
/* head状态为SIGNAL,且成功设置为0之后,唤醒head.next节点线程
* 此时head、head.next的线程都唤醒了,head.next会去竞争锁,成功后head会指向获取锁的节点,
* 也就是head发生了变化。看最底下一行代码可知,head发生变化后会重新循环,继续唤醒head的下一个节点
*/
unparkSuccessor(h);
/*
* 如果本身头节点的waitStatus是出于重置状态(waitStatus==0)的,将其设置为“传播”状态。
* 意味着需要将状态向后一个节点传播
*/
} else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head)//如果head变了,重新循环
break;
}
}

private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
node.compareAndSetWaitStatus(ws, 0);
Node s = node.next;//node.next
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread);//唤醒的是下一个可唤醒的线程
}

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {//去除CANCELLED节点
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
}
return false;
}
}

CountDownLatch部分代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public class CountDownLatch {//CountDownLatch共享锁源码
//继承AQS,核心实现都在AQS里
private static final class Sync extends AbstractQueuedSynchronizer {

Sync(int count) {
//共享锁state的值可以自己设定,用作计算共享次数,这点跟排它锁(只能0/1)不同
setState(count);
}

int getCount() {
return getState();
}


/* tryAcquireShared返回值:
* < 0:表示获取锁失败,需要进入等待队列
* = 0:表示当前线程获取共享锁成功,但不需要把它后面等待的节点唤醒
* > 0:表示当前线程获取共享锁成功,且此时需要把后续节点唤醒让它们去尝试获取共享锁
*/
protected int tryAcquireShared(int acquires) {
/* getState()是初始化时传入的count值,getState>0,return -1,在AQS中会往下执行
* getState == 0时,return 1,在AQS中不往下走
*/
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
//state == 0 表示锁已经释放了
if (c == 0)
return false;
//每次调用tryReleaseShared,state值减1
int nextc = c - 1;
//state为0了,返回true,这时才真正去释放锁
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}
private final Sync sync;
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public void countDown() {
sync.releaseShared(1);
}
}

AQS基本结构如下图所示。

加锁不成功时,当前的线程就会把自己纳入到等待链表的尾部,然后调用 LockSupport.park 将自己休眠。其它线程解锁时,会从链表的表头取一个节点,调用 LockSupport.unpark 唤醒它。

AbstractQueuedSynchronizer 类是一个抽象类,它是所有的锁队列管理器的父类,JDK中的各种形式的锁其内部的队列管理器都继承了这个类,它是Java并发世界的核心基石。比如 ReentrantLockReadWriteLockCountDownLatchSemaphoneThreadPoolExecutor 内部的队列管理器都是它的子类。

这个抽象类暴露了一些抽象方法,每一种锁都需要对这个管理器进行定制。而JDK内置的所有并发数据结构都是在这些锁的保护下完成的,它是JDK多线程高楼大厦的地基。

锁管理器维护的只是一个普通的双向列表形式的队列,这个数据结构很简单,但是仔细维护起来却相当复杂,因为它需要精细考虑多线程并发问题,每一行代码都写的无比小心。


参考博客和文章书籍等:

《Java核心技术 卷Ⅰ》

《Java并发编程的艺术》

并发数据结构的基石

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容