可重入的独占锁ReentrantLock的实现原理

ReentrantLock


第一节 简介

重入锁 ReentrantLock ,顾名思义,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择

ReentrantLock还是一个独占/排他锁。相对于synchronized,它更加灵活,但是需要自己写出加锁和解锁的过程。它的灵活性在于它拥有很多特性,相关内容在《Lock接口》已有描述。ReentrantLock需要显示地进行释放锁,特别是在程序异常时,synchronized会自动释放锁,而ReentrantLock并不会自动释放锁,所以必须在finally中进行释放锁。

1.1 特性

  • 公平性:支持公平锁和非公平锁,默认使用了非公平锁。
  • 可重入:支持重进入,反复进行加锁。
  • 可中断:相对于synchronized,它是可中断的锁,能够对中断作出响应。
  • 超时机制:超时后不能获得锁,因此不会造成死锁。

第二节 锁的重进入和公平性

2.1 概述重进入和公平性

回忆在同步器AQS的实现原理一节中的示例(Mutex),同时考虑如下场景:

当一个线程调用Mutex的 lock() 方法获取锁之后,如果再次调用 lock() 方法,则该线程将会被自己所阻塞,原因是Mutex在实现 tryAcquire(int acquires) 方法时没有考虑占有锁的线程再次获取锁的场景,而在调用 tryAcquire(int acquires) 方法时返回了false,导致该线程被阻塞。

简单地说,Mutex是一个不支持重进入的锁。而synchronized关键字隐式的支持重进入,比如一个synchronized修饰的递归方法,在方法执行时,执行线程在获取了锁之后仍能连续多次地获得该锁,而不像Mutex由于获取了锁,而在下一次获取锁时出现阻塞自己的情况。

ReentrantLock虽然没能像synchronized关键字一样支持隐式的重进入,但是在调用 lock() 方法时,已经获取到锁的线程,能够再次调用 lock() 方法获取锁而不被阻塞。

这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。

事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以TPS作为唯一的指标,公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。

下面将着重分析ReentrantLock是如何实现重进入和公平性获取锁的特性,并通过测试来验证公平性获取锁对性能的影响。


2.2 如何实现重进入

重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题:

  1. 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是则再次成功获取。
  2. 锁的最终释放。线程重复n次获取了锁,随后在第n次释放该锁后,其他线程能够获取到该锁。锁的最终释放要求锁对于获取进行计数自增,计数表示当前锁被重复获取的次数,而锁被释放时,计数自减,当计数等于0时表示锁已经成功释放。

ReentrantLock是通过组合自定义同步器来实现锁的获取与释放,以非公平性(默认的)实现为例,获取同步状态的代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程对象
final Thread current = Thread.currentThread();
//获取同步状态
int c = getState();
if (c == 0) { //未持有锁
if (compareAndSetState(0, acquires)) { //尝试CAS获取
//获取成功,设置当前线程为专属线程,并返回结果表示获取成功
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {//当前线程为获取锁的线程
//累加同步状态
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//计数更新
return true;
}
return false;
}

该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。

成功获取锁的线程再次获取锁,只是增加了同步状态值,这也就要求 ReentrantLock 在释放同步状态时减少同步状态值,该方法的代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
protected final boolean tryRelease(int releases) {
int c = getState() - releases;//减少同步状态值
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;//是否完全释放
if (c == 0) {//完全释放
free = true;
setExclusiveOwnerThread(null);//设置占用线程为null
}
setState(c);//计数更新
return free;
}

如果该锁被获取了n次,那么前(n-1)次 tryRelease(int releases) 方法必须返回false,而只有同步状态完全释放了,才能返回true。可以看到,该方法将同步状态是否为0作为最终释放的条件,当同步状态为0时,将占有线程设置为null,并返回true,表示释放成功。


2.3 公平与非公平获取锁的区别

公平性与否是针对获取锁而言的,如果一个锁是公平的,那么锁的获取顺序就应该符合请求的绝对时间顺序,也就是FIFO。

回顾同步器AQS的实现原理中介绍的 nonfairTryAcquire(int acquires) 方法,对于非公平锁,只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不同,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//相比非公平锁增加了判断是否有前驱节点
if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) {
//没有前驱节点,表示当前线程等待时间最长
setExclusiveOwnerThread(current);
return true;
}
} else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

该方法与 nonfairTryAcquire(int acquires) 比较,唯一不同的位置为判断条件多了 hasQueuedPredecessors() 方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释放锁之后才能继续获取锁。

下面编写一个测试来观察公平和非公平锁在获取锁时的区别,在测试用例中定义了内部类ReentrantLock2,该类主要公开了 getQueuedThreads() 方法,该方法返回正在等待获取锁的线程列表,由于列表是逆序输出,为了方便观察结果,将其进行反转,测试用例(部分)如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public class FairAndUnfairTest {
private static Lock fairLock = new ReentrantLock2(true);
private static Lock unfairLock = new ReentrantLock2(false);
@Test
public void fair() {
testLock(fairLock);
}
@Test
public void unfair() {
testLock(unfairLock);
}
private void testLock(Lock lock) {
// 启动5个Job(略)
}
private static class Job extends Thread {
private Lock lock;
public Job(Lock lock) {
this.lock = lock;
}
public void run() {
// 连续2次打印当前的Thread和等待队列中的Thread(略)
}
}
private static class ReentrantLock2 extends ReentrantLock {
public ReentrantLock2(boolean fair) {
super(fair);
}
public Collection<Thread> getQueuedThreads() {
List<Thread> arrayList = new ArrayList<Thread>(super.
getQueuedThreads());
Collections.reverse(arrayList);
return arrayList;
}
}
}

分别运行 fair()unfair() 两个测试方法,输出结果如下表所示。

fair()和unfair()两个测试方法的输出结果

观察上表所示的结果(其中每个数字代表一个线程),公平性锁每次都是从同步队列中的第一个节点获取到锁,而非公平性锁出现了一个线程连续获取锁的情况。

为什么会出现线程连续获取锁的情况呢?回顾 nonfairTryAcquire(int acquires) 方法,当一个线程请求锁时,只要获取了同步状态即成功获取锁。在这个前提下,刚释放锁的线程再次获取同步状态的几率会非常大,使得其他线程只能在同步队列中等待。

非公平性锁可能使线程“饥饿”,为什么它又被设定成默认的实现呢?再次观察上表的结果,如果把每次不同线程获取到锁定义为1次切换,公平性锁在测试中进行了10次切换,而非公平性锁只有5次切换,这说明非公平性锁的开销更小。下面运行测试用例(测试环境:ubuntu server 14.04 i5-34708GB,测试场景:10个线程,每个线程获取100000次锁),通过vmstat统计测试运行时系统线程上下文切换的次数,运行结果如下表所示。

公平性和非公平性在系统线程上下文切换方面的对比

在测试中公平性锁与非公平性锁相比,总耗时是其94.3倍,总切换次数是其133倍。可以看出:

  • 公平性锁保证了锁的获取按照FIFO原则,而代价是进行大量的线程切换
  • 非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量

第三节 ReentrantLock源码分析

3.1 概述

ReentrantLock 是很多类的基础,例如 ConcurrentHashMap 内部使用的 Segment 就是继承 ReentrantLockCopyOnWriteArrayList 也使用了 ReentrantLock

锁无法使用 try-resource 机制来释放,因为首先释放锁不是 close() 方法,然后 try-resource 机制希望在首部里声明一个新变量,而锁则需要多个线程共享此变量。

1
2
3
4
5
6
myLock.lock();//确保任何时刻只有一个线程进入临界区
try{
...
}finally{//如果抛出异常,也要顺利释放锁,否则其他线程将一直被阻塞
myLock.unlock();
}

Lock在硬件层面依赖CPU指令,完全由Java代码完成,底层利用 LockSupport 类和 Unsafe 类进行操作;

虽然锁有很多实现,但是都依赖队列同步器 AbstractQueuedSynchronizer 类,我们用代码来讲下 ReentrantLock 的实现。


3.2 Sync

Sync 是自定义队列同步器,继承了 AbstractQueuedSynchronizer 。又分为两个子类:公平锁和非公平锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class ReentrantLock implements Lock, java.io.Serializable {
/** Synchronizer providing all implementation mechanics */
private final Sync sync;

//ReentrantLock类的API调用都委托给了内部类 Sync,Sync又分为两个子类
abstract static class Sync extends AbstractQueuedSynchronizer {
...
}

//1.非公平锁
static final class NonfairSync extends Sync {
protected final boolean tryAcquire(int acquires) {
...
}
}

//2.公平锁
static final class FairSync extends Sync {
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
...
}
}

//默认为非公平锁,因为效率相较较高
public ReentrantLock() {
sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
...
}

相关依赖关系如下图所示:

类依赖图

调用lock API时的流程图如下,JDK中有很多类似的层级结构设计。

调用lock API时的流程图


3.3 Lock相关

3.3.1 nofairTryAcquire

非公平获取锁,代码解析第二节已经有了,这里简单描述一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

来看这段代码,首先获取当前状态(初始化为0),当它等于0的时候,代表还没有任何线程获得该锁,然后通过CAS(底层是通过 CompareAndSwapInt 实现)改变state,并且设置当前线程为持有锁的线程;其他线程会直接返回false;当该线程重入的时候,state已经不等于0,这个时候并不需要CAS,因为该线程已经持有锁,然后会重新通过 setState 设置state的值,这里就实现了一个偏向锁的功能,即锁偏向该线程。

3.3.2 addWaiter

新增等待结点AbstractQueuedSynchronizer 的默认实现,只有当锁被一个线程持有,另外一个线程请求获得该锁的时候才会进入这个方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
//线程从同步队列加入等待队列
//首先构建一个新的结点
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//快速尝试在尾部添加,先获取当前尾结点
Node pred = tail;
if (pred != null) {//当前等待队列已有结点
node.prev = pred;//新结点连接尾结点
if (compareAndSetTail(pred, node)) {//CAS添加新尾结点
//设置成功后,尾结点连接新结点,并返回新结点
pred.next = node;
return node;
}
}
//当前队列为空,或是CAS设置尾结点失败,调用enq循环尝试新结点入列
enq(node);
return node;
}

注意等待状态有四种值:

添加等待结点的步骤:

  • 首先,new 一个节点,这个时候模式为:mode为 Node.EXCLUSIVE ,默认为 null 即排它锁;
  • 然后,如果该队列已经有node即 tail!=null ,则将新节点的前驱节点置为tail,再通过CAS将tail指向当前节点,前驱节点的后继节点指向当前节点,然后返回当前节点;
  • 如果队列为空或者CAS失败,则通过enq入队。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Inserts node into queue, initializing if necessary. See picture above.
* @param node the node to insert
* @return node's predecessor
*/
private Node enq(final Node node) {
//死循环尝试结点入队
for (;;) {
Node t = tail;
if (t == null) { // Must initialize 此时尾节点为空,队列无结点
if (compareAndSetHead(new Node())) //CAS设置新结点,并设尾结点为头结点
tail = head;
} else { //尾结点有值
//新结点连接尾结点
node.prev = t;
//CAS设置尾节点
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
//CAS失败继续循环
}
}

进队的时候,要么是第一个入队并且设置head节点并且循环设置tail,要么是add tail,如果CAS不成功,则会无限循环,直到设置成功,即使高并发的场景,也最终能够保证设置成功,然后返回包装好的node节点。

3.3.3 acquireQueued

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
* Acquires in exclusive uninterruptible mode for thread already in
* queue. Used by condition wait methods as well as acquire.
*
* @param node the node
* @param arg the acquire argument
* @return {@code true} if interrupted while waiting
*/
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;//标识是否失败
try {
//是否已中断
boolean interrupted = false;
//死循环尝试获取同步状态
for (;;) {
//获得前驱结点
final Node p = node.predecessor();
//当前驱节点为头节点时尝试获取同步状态
if (p == head && tryAcquire(arg)) {
//得到同步状态,设置此节点为头节点
setHead(node);
p.next = null; // help GC
//标识操作成功
failed = false;
//未中断返回
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;//中断
}
} finally {
if (failed)
cancelAcquire(node);
}
}

该方法的主要作用就是将已经进入虚拟队列的节点进行阻塞,我们看到,如果当前节点的前驱节点是head并且尝试获取锁的时候成功了,则直接返回,不需要阻塞;
如果前驱节点不是头节点或者获取锁的时候失败了,则进行判定是否需要阻塞:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
/**
* Checks and updates status for a node that failed to acquire.
* Returns true if thread should block. This is the main signal
* control in all acquire loops. Requires that pred == node.prev.
*
* @param pred node's predecessor holding status
* @param node the node
* @return {@code true} if thread should block
*/
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//获取前驱结点的等待状态
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

这段代码对该节点的前驱节点的状态进行判断,如果前驱节点已经处于 signal 状态,则返回true,表明当前节点可以进入阻塞状态;

否则,将前驱节点状态CAS置为 signal 状态,然后通过上层的for循环进入 parkAndCheckInterrupt 代码块park:

1
2
3
4
5
6
7
8
9
/**
* Convenience method to park and then check if interrupted
*
* @return {@code true} if interrupted
*/
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

这个时候将该线程交给操作系统内核进行阻塞;

总体来讲,acquireQueued 就是依靠前驱节点的状态来决定当前线程是否应该处于阻塞状态,如果前驱节点处于 cancel 状态,则丢弃这些节点,重新构建队列;


3.4 Unlock相关

流程类似lock api相关类的流程,这里讲主要的代码,unlock相对的比较简单;

3.4.1 release

首先 ReentrantLock 调用 Syncrelease 接口也就是 AbstractQueuedSynchronizerrelease接口,释放同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/**
* Releases in exclusive mode. Implemented by unblocking one or
* more threads if {@link #tryRelease} returns true.
* This method can be used to implement method {@link Lock#unlock}.
*
* @param arg the release argument. This value is conveyed to
* {@link #tryRelease} but is otherwise uninterpreted and
* can represent anything you like.
* @return the value returned from {@link #tryRelease}
*/
public final boolean release(int arg) {
//尝试释放同步状态
if (tryRelease(arg)) {
Node h = head;
//若头结点不为空且等待状态不为0,则尝试唤醒一个结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

3.4.2 tryRelease

释放时会首先调用 SynctryRelease ,尝试释放同步状态,如果返回true,则释放锁成功。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 当前线程未持有锁则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
// 表示此时同步释放,返回true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
// 否则表示同步仍在继续,返回false
setState(c);
return free;
}

这个接口的作用很简单,如果不是获得锁的线程调用直接抛出异常,否则,如果当前 state-releases==0 也就是lock已经完全释放,返回true,清除资源;

3.4.3 unparkSuccessor

tryRelease 返回free之后,release拿到head节点,尝试唤醒一个结点,进入以下代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
* Wakes up node's successor, if one exists.
*
* @param node the node
*/
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);//等待状态归0

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//要取消驻留的线程保留在后续线程中,该线程通常只是下一个节点。但是,如果已取消或明显为空,请从尾部向后移动以找到实际的未取消的后继结点。
Node s = node.next;
if (s == null || s.waitStatus > 0) {//下个结点为空,或等待状态不为CANCEL
s = null;
//从尾结点往前遍历
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//找到后继结点,释放结点对应线程
if (s != null)
LockSupport.unpark(s.thread);
}

这个作用即:当头结点的状态小于0,则将头结点的状态CAS为0,然后通过链表获取下一个节点,如果下一个节点为null或者不符合要求的状态,则从队尾遍历整个链表,直到遍历到离head节点最近的一个节点并且
等待状态符合预期,则将头结点的后继节点置为该节点;

对刚刚筛出来的符合要求的节点唤醒,也就是该节点获得争夺锁的权利;

这就是非公平锁的特点:在队列一直等待的线程不一定比后来的线程先获得锁,至此,unlock已经解释完成。


第四节 ReentrantLock加锁过程

下面我们精细分析加锁过程,深入理解锁逻辑控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ReentrantLock {
...
public void lock() {
sync.acquire(1);
}
...
}

class Sync extends AQS {
...
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
...
}

acquire 的 if 判断语句要分为三个部分,tryAcquire 方法表示当前的线程尝试加锁,如果加锁不成功就需要排队,这时候调用 addWaiter 方法,将当前线程入队。然后再调用 acquireQueued 方法,开始了 park 、醒来重试加锁、加锁不成功继续 park 的循环重试加锁过程。直到加锁成功 acquire 方法才会返回。

如果在循环重试加锁过程中被其它线程打断了,acquireQueued 方法就会返回 true。这时候线程就需要调用 selfInterrupt() 方法给当前线程设置一个被打断的标识位。

1
2
3
4
// 打断当前线程,其实就是设置一个标识位
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

线程如何知道自己被其它线程打断了呢?在 park 醒来之后调用 Thread.interrupted() 就知道了,不过这个方法只能调用一次,因为它在调用之后就会立即 clear 打断标志位。这也是为什么 acquire 方法里需要调用 selfInterrupt() ,为的就是重新设置打断标志位。这样上层的逻辑才可以通过 Thread.interrupted() 知道自己有没有被打断。

acquireQueuedaddWaiter 方法由 AQS 类提供,tryAcquire 需要由子类自己实现。不同的锁会有不同的实现。下面我们来看看 ReentrantLock 的公平锁 tryAcquire 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

这里有个 if else 分支,其中 else if 部分表示锁的重入,当前尝试加锁的线程是已经持有了这把锁的线程,也就是同一个线程重复加锁,这时只需要增加计数值就行了。锁的 state 记录的就是加锁计数,重入一次就 +1AQS 对象里有一个 exclusiveOwnerThread 字段,记录了当前持有排他锁的线程。

if(c == 0) 意味着当前锁是自由态,计数值为零。这时就需要争抢锁,因为同一时间可能会有多个线程在调用 tryAcquire 。争抢的方式是用 CAS 操作 compareAndSetState ,成功将锁计数值从 0 改成 1 的线程将获得这把锁,将当前的线程记录到 exclusiveOwnerThread 中。

代码里还有一个 hasQueuedPredecessors() 判断,这个判断非常重要,它的意思是看看当前的 AQS 等待队列里有没有其它线程在排队,公平锁在加锁之前需要 check 一下,如果有排队的,自己就不能插队。而非公平锁就不需要 check ,公平锁和非公平锁的全部的实现差异就在于此,就这一个 check 决定了锁的公平与否。

下面我们再看看 addWaiter 方法的实现,参数 mode 表示是共享锁还是排他锁,它对应 Node.nextWaiter 属性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
/**
* Creates and enqueues node for current thread and given mode.
*
* @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
* @return the new node
*/
private Node addWaiter(Node mode) {
Node node = new Node(mode);

for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue();
}
}
}


/** Constructor used by addWaiter. */
Node(Node nextWaiter) {
this.nextWaiter = nextWaiter;
THREAD.set(this, Thread.currentThread());
}


/**
* Initializes head and tail fields on first contention.
*/
private final void initializeSyncQueue() {
Node h;
if (HEAD.compareAndSet(this, null, (h = new Node())))
tail = h;
}


/**
* CASes tail field.
*/
private final boolean compareAndSetTail(Node expect, Node update) {
return TAIL.compareAndSet(this, expect, update);
}

addWaiter 需要将新的节点添加到 AQS 等待队列的队尾。如果队尾 tail 是空的意味着队列还没有初始化,那就需要初始化一下。AQS 队列在初始化时需要一个冗余的头部节点,这个节点的 thread 字段是空的。

将新节点添加到队尾也是需要考虑多线程并发的,所以代码里再一次使用了 CAS 操作 compareAndSetTail 来竞争队尾指针。没有竞争到的线程就会继续下一轮竞争 for(;;) 继续使用 CAS 操作将新节点往队尾添加。

下面我们再看看 acquireQueue 方法的代码实现,它会重复 park 、尝试再次加锁、加锁失败继续 park 的循环过程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}

acquireQueue 在尝试加锁之前会先看看自己是不是 AQS 等待队列的第一个节点,如果不是它就继续去 park。这意味着不管是公平还是非公平锁,在这里它们都统一采取了公平的方案,看看队列中是不是轮到自己了。也就是说「一朝排队,永远排队」。

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

线程在 park 返回醒来之后要立即检测一下是否被其它线程中断了。不过即使发生中断了,它还会继续尝试获取锁,如果获取不到,还会继续睡眠,直到锁获取到了才将中断状态返回。这意味着打断线程并不会导致死锁状态(拿不到锁)退出。

同时我们还可以注意到锁是可以取消的 cancelAcquire() ,准确地说是取消处于等待加锁的状态,线程处于 AQS 的等待队列中等待加锁。那什么情况下才会抛出异常而导致取消加锁呢,唯一的可能就是 tryAcquire 方法,这个方法是由子类实现的,子类的行为不受 AQS 控制。当子类的 tryAcquire 方法抛出了异常,那 AQS 最好的处理方法就是取消加锁了。cancelAcquire 会将当前节点从等待队列中移除。


第五节 ReentrantLock解锁过程

解锁的过程要简单一些,将锁计数降为零后,唤醒等待队列中的第一个有效节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

protected final boolean tryRelease(int releases) {
int c = getState() - releases;
// 解铃还须系铃人
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

考虑到可重入锁,需要判断锁计数是否降为零才可以确定锁是否彻底被释放。只有锁彻底被释放了才能唤醒后继等待节点。unparkSuccessor 会跳过无效节点(已取消的节点),找到第一个有效节点调用 unpark() 唤醒相应的线程。


部分参考以下内容:

《Java并发编程的艺术》

Java之ReentrantLock实现

打通 Java 任督二脉 —— 并发数据结构的基石

若内容涉及侵权请及时告知,我会尽快修改和删除相关内容