读写锁ReentrantReadWriteLock的实现原理

ReentrantReadWriteLock

第一节 简介

1.1 什么是读写锁

synchronizedReentrantLock 等基本都是排他锁,这些锁在同一时刻只允许一个线程进行访问,而读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。

1.2 读写锁的优势

  • 保证了读操作间的并发。
  • 保证写操作对读操作的可见性。
  • 简化读写交互场景的编程方式。

1.3 适用场景

假设在程序中定义一个共享的用作缓存的数据结构,它大部分时间提供读服务(例如查询和搜索),而写操作占有的时间很少,但是写操作完成之后的更新需要对后续的读服务可见

在没有读写锁支持的(Java 5之前)时候,如果需要完成上述工作就要使用Java的等待通知机制,就是当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行(写操作之间依靠 synchronized 关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读

改用读写锁实现上述功能,只需要在读操作时获取读锁,写操作时获取写锁即可。当写锁被获取到时,后续(非当前写操作线程)的读写操作都会被阻塞,写锁释放之后,所有操作继续执行,编程方式相对于使用等待通知机制的实现方式而言,变得更简单明了。

一般情况下,读写锁的性能都会比排它锁好,因为大多数场景读是多于写的。在读多于写的情况下,读写锁能够提供比排它锁更好的并发性和吞吐量。Java并发包提供读写锁的实现是 ReentrantReadWriteLock ,它提供的特性如下表所示。

ReentrantReadWriteLock的特性


第二节 接口与示例

接口 ReadWriteLock 仅定义了获取读锁写锁的两个方法,即 readLock() 方法和 writeLock() 方法,而其实现—— ReentrantReadWriteLock ,除了接口方法之外,还提供了一些便于外界监控其内部工作状态的方法,这些方法以及描述如下表所示。

ReentrantReadWriteLock展示内部工作状态的方法

接下来,通过一个缓存示例说明读写锁的使用方式,示例代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public class Cache {
static Map<String, Object> map = new HashMap<String, Object>();//缓存的存储结构
static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();//读写锁
static Lock r = rwl.readLock();//读锁
static Lock w = rwl.writeLock();//写锁

// 获取一个key对应的value
public static final Object get(String key) {
r.lock();
try {
return map.get(key);
} finally {
r.unlock();
}
}
// 设置key对应的value,并返回旧的value
public static final Object put(String key, Object value) {
w.lock();
try {
return map.put(key, value);
} finally {
w.unlock();
}
}
// 清空所有的内容
public static final void clear() {
w.lock();
try {
map.clear();
} finally {
w.unlock();
}
}
}

上述示例中,Cache组合一个非线程安全的HashMap作为缓存的实现,同时使用读写锁的读锁和写锁来保证Cache是线程安全的。在读操作 get(String key) 方法中,需要获取读锁,这使得并发访问该方法时不会被阻塞。写操作 put(String key,Object value) 方法和 clear() 方法,在更新HashMap时必须提前获取写锁,当获取写锁后,其他线程对于读锁和写锁的获取均被阻塞,而只有写锁被释放之后,其他读写操作才能继续。


第三节 实现分析

接下来分析 ReentrantReadWriteLock 的实现,主要包括:读写状态的设计写锁的获取与释放读锁的获取与释放以及锁降级(以下没有特别说明读写锁均可认为是 ReentrantReadWriteLock )。

3.1 读写状态的设计

读写锁同样依赖自定义同步器 AQS 来实现同步功能,而读写状态就是其同步器的同步状态。回想 ReentrantLock 中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。

如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写,划分方式如下图所示。

读写锁状态的划分方式

当前同步状态表示一个线程已经获取了写锁,且重进入了两次,同时也连续获取了两次读锁。读写锁是如何迅速确定读和写各自的状态呢?答案是通过位运算。假设当前同步状态值为S,**写状态等于 S&0x0000FFFF(将高16位全部抹去),读状态等于 S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于 S+(1<<16) ,也就是 S+0x00010000 **。

根据状态的划分能得出一个推论:S不等于0时,当写状态( S&0x0000FFFF )等于0时,则读状态( S>>>16 )大于0,即读锁已被获取

3.2 写锁的获取与释放

写锁是一个支持重进入的排它锁。如果当前线程已经获取了写锁则增加写状态。如果当前线程在获取写锁时,读锁已经被获取(读状态不为0)或者该线程不是已经获取写锁的线程,则当前线程进入等待状态,获取写锁的代码如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
protected final boolean tryAcquire(int acquires) {//获取写锁
Thread current = Thread.currentThread();//获取当前线程
int c = getState();//获取同步状态
int w = exclusiveCount(c);//获取写状态
if (c != 0) {
// 存在读锁或者当前获取线程不是已经获取写锁的线程
if (w == 0 || current != getExclusiveOwnerThread())
return false;//写状态为0,当前线程未持有锁时,返回false
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");//加锁次数超过限制,抛出异常
setState(c + acquires);//更新同步状态
return true;
}
if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {//同步状态为0时,判断写操作是否需要封锁,再尝试进行同步状态修改操作
return false;
}
setExclusiveOwnerThread(current);//更新持有线程
return true;
}

该方法除了重入条件(当前线程为获取了写锁的线程)之外,增加了一个读锁是否存在的判断。如果存在读锁,则写锁不能被获取,原因在于:读写锁要确保写锁的操作对读锁可见,如果允许读锁在已被获取的情况下对写锁的获取,那么正在运行的其他读线程就无法感知到当前写线程的操作。因此,只有等待其他读线程都释放了读锁,写锁才能被当前线程获取,而写锁一旦被获取,则其他读写线程的后续访问均被阻塞。

写锁的释放与 ReentrantLock 的释放过程基本类似,每次释放均减少写状态,当写状态为0时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。

3.3 读锁的获取与释放

读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问(或者写状态为0)时,读锁总会被成功地获取,而所做的也只是(线程安全的)增加读状态。如果当前线程已经获取了读锁,则增加读状态。如果当前线程在获取读锁时,写锁已被其他线程获取,则进入等待状态。获取读锁的实现从 Java 5Java 6 变得复杂许多,主要原因是新增了一些功能,例如 getReadHoldCount() 方法,作用是返回当前线程获取读锁的次数。读状态是所有线程获取读锁次数的总和,而每个线程各自获取读锁的次数只能选择保存在 ThreadLocal 中,由线程自身维护,这使获取读锁的实现变得复杂。因此,这里将获取读锁的代码做了删减,保留必要的部分,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
protected final int tryAcquireShared(int unused) {//获取读锁
for (;;) {//死循环
int c = getState();//获取同步状态
int nextc = c + (1 << 16);//读状态的高16位+1
if (nextc < c)
throw new Error("Maximum lock count exceeded");
if (exclusiveCount(c) != 0 && owner != Thread.currentThread())//写状态不为0,且当前线程未持有锁,则返回false
return -1;
if (compareAndSetState(c, nextc))//更新同步状态
return 1;
}
}

tryAcquireShared(int unused) 方法中,如果其他线程已经获取了写锁,则当前线程获取读锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态,成功获取读锁。

读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的值是 1<<16

3.4 锁降级

锁降级指的是写锁降级成为读锁。如果当前线程拥有写锁,然后将其释放,最后再获取读锁,这种分段完成的过程不能称之为锁降级。锁降级是指把持住(当前拥有的)写锁,再获取到读锁,随后释放(先前拥有的)写锁的过程

接下来看一个锁降级的示例。因为数据不常变化,所以多个线程可以并发地进行数据处理,当数据变更后,如果当前线程感知到数据变化,则进行数据的准备工作,同时其他处理线程被阻塞,直到当前线程完成数据的准备工作,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public void processData() {
readLock.lock();//获取读锁
if (!update) {//update变量为false,表示数据发生变更
// 必须先释放读锁
readLock.unlock();
// 锁降级从写锁获取到开始
writeLock.lock();
try {
if (!update) {
// 准备数据的流程(略)
update = true;
}
readLock.lock();//完成数据流程后再获取读锁
} finally {
writeLock.unlock();//释放写锁
}
// 锁降级完成,写锁降级为读锁
}
try {
// 使用数据的流程(略)
} finally {
readLock.unlock();
}
}

上述示例中,当数据发生变更后,update变量(布尔类型且volatile修饰)被设置为false,此时所有访问 processData() 方法的线程都能够感知到变化,但只有一个线程能够获取到写锁,其他线程会被阻塞在读锁和写锁的 lock() 方法上。当前线程获取写锁完成数据准备之后,再获取读锁,随后释放写锁,完成锁降级。

锁降级中读锁的获取是否必要呢?答案是必要的。主要是为了保证数据的可见性,如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程(记作线程T)获取了写锁并修改了数据,那么当前线程无法感知线程T的数据更新。如果当前线程获取读锁,即遵循锁降级的步骤,则线程T将会被阻塞,直到当前线程使用数据并释放读锁之后,线程T才能获取写锁进行数据更新。

RentrantReadWriteLock 不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的


第四节 相关补充

4.1 共享锁

我们知道读锁不是排他锁,它允许多线程同时持有读锁,是共享锁。共享锁和排他锁是通过 Node 类里面的 nextWaiter 字段区分的。

1
2
3
4
5
6
7
8
class AQS {
static final Node SHARED = new Node();
static final Node EXCLUSIVE = null;

boolean isShared() {
return this.nextWaiter == SHARED;
}
}

4.2 ReentrantReadWriteLock与ReentrantLock的比较

ReentrantReadWriteLockReentrantLock 实现非常类似,它具有以下属性。

4.2.1 锁获取顺序

ReentrantLock 不会将读取者优先或写入者优先强加给锁访问的排序。但是,它确实支持可选的公平策略。

4.2.2 默认模式,非公平模式

当非公平策略(默认)构造时,未指定进入读写锁的顺序,受到 reentrancy 约束的限制。连续竞争的非公平锁可能无限期地推迟一个或多个 readerwriter 线程,但吞吐量通常要高于公平锁。

4.2.3 公平模式

当使用公平策略时,线程使用一种近似同步到达的顺序争夺资源。当线程释放当前持有锁时,等待时间最长的 write 线程获取到写入锁,如果有一组等待时间大于所有正在等待的 writer 线程的 reader 线程,将为该组分配写入锁。

当一个线程尝试获取公平策略的 read-lock 时,如果 write-lock 被持有或者有等待的 write 线程,则当前线程会阻塞。直到当前最旧的等待 writer 线程已获得并释放了写入锁之后,该线程才会获得读取锁。当然,如果等待 writer 放弃其等待,而保留一个或更多 reader 线程为队列中带有写入锁自由的时间最长的 waiter ,则将为那些 reader 分配读取锁。

4.2.4 锁降级

重入还允许从写入锁降级为读取锁,其实现方式是:先获取写入锁,然后获取读取锁,最后释放写入锁。但是,从读取锁升级到写入锁是不可能的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class CachedData {
Object data;
volatile boolean cacheValid;
ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();

void processCachedData() {
rwl.readLock().lock();
if (!cacheValid) {
// Must release read lock before acquiring write lock
rwl.readLock().unlock();
rwl.writeLock().lock();
// Recheck state because another thread might have acquired
// write lock and changed state before we did.
if (!cacheValid) {
data = ...
cacheValid = true;
}
// Downgrade by acquiring read lock before releasing write lock
rwl.readLock().lock();
rwl.writeLock().unlock(); // Unlock write, still hold read
}

use(data);
rwl.readLock().unlock();
}
}

4.2.5 锁获取的中断

读取锁和写入锁都支持锁获取期间的中断。

4.2.6 Condition支持

写入锁提供了一个 Condition 实现,对于写入锁来说,该实现的行为与 ReentrantLock.newCondition() 提供的 Condition 实现对 ReentrantLock 所做的行为相同。当然,此 Condition 只能用于写入锁。

读取锁不支持 ConditionreadLock().newCondition() 会抛出 UnsupportedOperationException

在使用某些种类的 Collection 时,可以使用 ReentrantReadWriteLock 来提高并发性。通常,在预期 collection 很大,读取者线程访问它的次数多于写入者线程,并且 entail 操作的开销高于同步开销时,这很值得一试。例如,以下是一个使用 TreeMap 的类,预期它很大,并且能被同时访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class RWDictionary {
private final Map<String, Data> m = new TreeMap<String, Data>();
private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
private final Lock r = rwl.readLock();
private final Lock w = rwl.writeLock();

public Data get(String key) {
r.lock();
try { return m.get(key); }
finally { r.unlock(); }
}
public String[] allKeys() {
r.lock();
try { return m.keySet().toArray(); }
finally { r.unlock(); }
}
public Data put(String key, Data value) {
w.lock();
try { return m.put(key, value); }
finally { w.unlock(); }
}
public void clear() {
w.lock();
try { m.clear(); }
finally { w.unlock(); }
}
}

第五节 源码解析

5.1 ReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public interface ReadWriteLock {//读写锁接口
/**
* Returns the lock used for reading.
*
* @return the lock used for reading
*/
Lock readLock();

/**
* Returns the lock used for writing.
*
* @return the lock used for writing
*/
Lock writeLock();
}

5.2 ReentrantReadWriteLock

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
public class ReentrantReadWriteLock
implements ReadWriteLock, java.io.Serializable {
private final ReentrantReadWriteLock.ReadLock readerLock;
private final ReentrantReadWriteLock.WriteLock writerLock;
final Sync sync;

public ReentrantReadWriteLock() {
this(false);
}

public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}


abstract static class Sync extends AbstractQueuedSynchronizer {//AQS自定义实现队列同步器
...
}


static final class NonfairSync extends Sync {//非公平策略
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

static final class FairSync extends Sync {//公平策略
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

public static class ReadLock implements Lock, java.io.Serializable {//内部类ReadLock
...
}


public static class WriteLock implements Lock, java.io.Serializable {//内部类WriteLock
...
}

}

5.3 Sync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
/**
* ReentrantReadWriteLock内部的实现机制
* @author Iflytek_dsw
*
*/
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 6317671515068378041L;

/**
* 这些量用来计算Read和Write锁的数量,ReentrantReadWriterLock使用一个32位的int类型来表示锁被占用的线程数
* (ReentrantLock中的state)采取的办法是:
* 高16位用来表示读锁(共享锁)占有的线程数量,用低16位表示写锁(独占锁)被占用的数量
*/
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

/**
* 通过进行右移16位计算出共享锁(读取锁)的占用个数
*/
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
/**
* 通过进行&运算计算出低16位的写锁(独占锁)个数
*/
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

/**
* 定义一个容器来计算保存每个线程read锁的个数
*/
static final class HoldCounter {
int count = 0;
// Use id, not reference, to avoid garbage retention
final long tid = Thread.currentThread().getId();
}

/**
* ThreadLocal的使用来针对每个线程进行存储HoldCounter
*/
static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

/**
* 一个read线程的HoldCounter,当为0的时候进行删除。
*/
private transient ThreadLocalHoldCounter readHolds;

/**
* cachedHoldCounter 缓存的是最后一个获取线程的HolderCount信息,
* 该变量主要是在如果当前线程多次获取读锁时,减少从readHolds中获取HoldCounter的次数
*/
private transient HoldCounter cachedHoldCounter;

/**
* firstReader is the first thread to have acquired the read lock.
* firstReaderHoldCount is firstReader's hold count.
*/
private transient Thread firstReader = null;
private transient int firstReaderHoldCount;

Sync() {
readHolds = new ThreadLocalHoldCounter();
setState(getState()); // ensures visibility of readHolds
}

/**
* Returns true if the current thread, when trying to acquire
* the read lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean readerShouldBlock();

/**
* Returns true if the current thread, when trying to acquire
* the write lock, and otherwise eligible to do so, should block
* because of policy for overtaking other waiting threads.
*/
abstract boolean writerShouldBlock();

protected final boolean tryRelease(int releases) {
/**当前线程不支持锁的时候,抛IllegalMonitor异常*/
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
/**当前锁线程的个数是否为0个*/
boolean free = exclusiveCount(nextc) == 0;
if (free)
setExclusiveOwnerThread(null);
setState(nextc);
return free;
}

protected final boolean tryAcquire(int acquires) {
//省略
}

protected final boolean tryReleaseShared(int unused) {
//省略
}

private IllegalMonitorStateException unmatchedUnlockException() {
return new IllegalMonitorStateException(
"attempt to unlock read lock, not locked by current thread");
}

protected final int tryAcquireShared(int unused) {
//省略
}

/**
* Full version of acquire for reads, that handles CAS misses
* and reentrant reads not dealt with in tryAcquireShared.
*/
final int fullTryAcquireShared(Thread current) {
//省略
}

/**
* Performs tryLock for write, enabling barging in both modes.
* This is identical in effect to tryAcquire except for lack
* of calls to writerShouldBlock.
*/
final boolean tryWriteLock() {
//省略
}

/**
* Performs tryLock for read, enabling barging in both modes.
* This is identical in effect to tryAcquireShared except for
* lack of calls to readerShouldBlock.
*/
final boolean tryReadLock()
//省略
}

protected final boolean isHeldExclusively() {
// While we must in general read state before owner,
// we don't need to do so to check if current thread is owner
return getExclusiveOwnerThread() == Thread.currentThread();
}

// Methods relayed to outer class

final ConditionObject newCondition() {
return new ConditionObject();
}

final Thread getOwner() {
// Must read state before owner to ensure memory consistency
return ((exclusiveCount(getState()) == 0) ?
null :
getExclusiveOwnerThread());
}

final int getReadLockCount() {
return sharedCount(getState());
}

final boolean isWriteLocked() {
return exclusiveCount(getState()) != 0;
}

final int getWriteHoldCount() {
return isHeldExclusively() ? exclusiveCount(getState()) : 0;
}

final int getReadHoldCount() {
//省略
}

/**
* Reconstitute this lock instance from a stream
* @param s the stream
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
readHolds = new ThreadLocalHoldCounter();
setState(0); // reset to unlocked state
}

final int getCount() { return getState(); }
}

5.4 FairSync

ReentrantReadWriteLock 同样支持公平锁和非公平锁。FairSync的实现如下:

1
2
3
4
5
6
7
8
9
static final class FairSync extends Sync {
private static final long serialVersionUID = -2274990926593161451L;
final boolean writerShouldBlock() {
return hasQueuedPredecessors();
}
final boolean readerShouldBlock() {
return hasQueuedPredecessors();
}
}

在Sync类中预留了 writerShouldBlock()readerShouldBlock() 两个抽象方法供子类重写。在FairSync中返回的是 hasQueuedPredecessors() 方法的返回值。

1
2
3
4
5
6
7
8
9
10
11
/**
* 当前线程前面是否有处理的线程,如果有返回true,反之返回false。
* 队列为空同样返回false
*/
public final boolean hasQueuedPredecessors() {
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}

5.5 NonfairSync

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -8159625535654395037L;
final boolean writerShouldBlock() {
return false; // writers can always barge
}
final boolean readerShouldBlock() {
/* As a heuristic to avoid indefinite writer starvation,
* block if the thread that momentarily appears to be head
* of queue, if one exists, is a waiting writer. This is
* only a probabilistic effect since a new reader will not
* block if there is a waiting writer behind other enabled
* readers that have not yet drained from the queue.
*/
return apparentlyFirstQueuedIsExclusive();
}
}

在非公平锁中,write返回的是false。而在读锁中则返回 apparentlyFirstQueuedIsExclusive() 方法。

1
2
3
4
5
6
7
final boolean apparentlyFirstQueuedIsExclusive() {
Node h, s;
return (h = head) != null &&
(s = h.next) != null &&
!s.isShared() &&
s.thread != null;
}

该方法如果头节点不为空,并头节点的下一个节点不为空,并且不是共享模式【独占模式,写锁】、并且线程不为空,则返回true。

这个方法判断队列的 head.next 是否正在等待独占锁(写锁,因为在 ReentrantReadWriteLock 中读写锁共用一个队列)。当然这个方法执行的过程中队列的形态可能发生变化。这个方法的意思是:读锁不应该让写锁始终等待,因为在同一时刻读写锁只能有一种锁在“工作”。

5.6 ReadLock

ReadLock是读取锁的获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
public static class ReadLock implements Lock, java.io.Serializable {
private static final long serialVersionUID = -5992448646407690164L;
private final Sync sync;

/**
* Constructor for use by subclasses
*
* @param lock the outer lock object
* @throws NullPointerException if the lock is null
*/
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}

/**
* Acquires the read lock.
* 申请read锁
* 如果write锁没有被别的线程持有,则立即返回read锁。如果write锁被占用,则当前线程会被阻塞,直至获取到read锁。
*/
public void lock() {
sync.acquireShared(1);
}


public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

/**
* Acquires the read lock only if the write lock is not held by
* another thread at the time of invocation.
*
* <p>If the write lock is held by another thread then
* this method will return immediately with the value
* {@code false}.
*
* @return {@code true} if the read lock was acquired
*/
public boolean tryLock() {
return sync.tryReadLock();
}

public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}

/**
* Attempts to release this lock.
*
* <p> If the number of readers is now zero then the lock
* is made available for write lock attempts.
*/
public void unlock() {
sync.releaseShared(1);
}

/**
* Throws {@code UnsupportedOperationException} because
* {@code ReadLocks} do not support conditions.
*
* @throws UnsupportedOperationException always
*/
public Condition newCondition() {
throw new UnsupportedOperationException();
}

/**
* Returns a string identifying this lock, as well as its lock state.
* The state, in brackets, includes the String {@code "Read locks ="}
* followed by the number of held read locks.
*
* @return a string identifying this lock, as well as its lock state
*/
public String toString() {
int r = sync.getReadLockCount();
return super.toString() +
"[Read locks = " + r + "]";
}
}

ReadLock 继承 Lock 类,并实现了对应的申请所、释放锁方法。

5.7 ReadLock申请锁lock

1
2
3
public void lock() {
sync.acquireShared(1);
}

read锁是共享锁,这里调用sync对象的 acquireShared() 方法来申请锁。

1
2
3
4
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}

这里 tryAcquireShared 方法执行就到了Sync类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. 如果write锁被其它线程占用,申请fail
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
/**获取当前的线程*/
Thread current = Thread.currentThread();
/**获取当前锁的状态值*/
int c = getState();
/**如果写锁(独占锁)占有个数不为0,并且持有线程不等于当前线程,返回-1*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
/**获取读取锁(共享锁)的个数*/
int r = sharedCount(c);
/** 如果当前read锁不被阻塞,并且个数小于最大MAX_COUNT,同时CAS操作成功*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {//如果当前共享锁个数为0,没有被持有
/**当前线程赋值给firstReader,并且firstReadHoldCount赋值为1*/
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
/**如果当前线程已经持有所,则firstReaderHoldCount自加1*/
firstReaderHoldCount++;
} else {/**其它情况则是新来的一个线程*/
/**新建一个HoldCounter对象存储cachedHoldCounter*/
HoldCounter rh = cachedHoldCounter;
/**如果cachedHoldCounter为空,或tid不等于当前线程*/
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
/**如果锁被占用等不满足上述情况,则通过fullTryAcquireShared进行申请*/
return fullTryAcquireShared(current);
}

在tryAcquireShared方法中, 如果当前read锁不被阻塞,并且个数小于最大MAX_COUNT,同时CAS操作成功则申请锁成功。反之通过fullTryAcquireShared方法进行申请。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
final int fullTryAcquireShared(Thread current) {
HoldCounter rh = null;
/**采用自旋的方式,直至申请到锁*/
for (;;) {
int c = getState();
/**如果Write锁被占用,并且持有线程不是当前线程则返回-1*/
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
} else if (readerShouldBlock()) {//Write锁被持有
// 如果最近没有申请过read锁
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {//没有申请过
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId()) {
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
/**Read锁已经到达最大值*/
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}

采用自旋的方式,其它逻辑同tryAcquireShared类似。

  • 如果Write锁被占有,并且持有写锁的线程不是当前线程直接返回-1
  • 如果Write锁当前线程持有,并且在CLH队列中下一个节点是Write锁,则返回-1

最后在所有申请失败返回-1的时候,则通过doAcquireShared()方法进行申请。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
if (interrupted)
selfInterrupt();
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

8 WriteLock

前面在ReentrantLock的笔记中,同样有tryLock方法。其实它们都差不多。

1
2
3
public  boolean tryLock() {
return sync.tryReadLock();
}

tryLock的内部调用的Sync类的tryReadLock()方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
/**Write锁被占用并且占用线程不是当前线程,返回false*/
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
/**返回read锁的个数*/
int r = sharedCount(c);
/**如果是最大值,则抛出异常*/
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {//Read的个数为0,则将当前线程赋值给firstReader,并且firstReaderHoldCount赋值为1
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {//如果firstReader==当前线程,则firstReaderHoldCount自加1
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}

9 锁的释放

1
2
3
public void unlock() {
sync.releaseShared(1);
}

unlock释放锁最终还是在Sync类的releaseShared方法中进行释放。releaseShared方法是在AQS类中。

1
2
3
4
5
6
7
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

在AQS中,我们知道releaseShared方法是AQS预留给子类进行实现的,所以最终的实现还是在Sync类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
if (firstReaderHoldCount == 1)
firstReader = null;
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != current.getId())
rh = readHolds.get();
int count = rh.count;
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
--rh.count;
}
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}

10 总结

上面我们针对ReadLock的锁的申请、释放进行了分析,WriteLock由于是个独占锁,大致跟ReentrantLock流程差不多,不做过多分析。总体说来,还是需要多读几遍源码才能理解透彻。
最后给一个使用的示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public class MyTest {

static StudentList studentList = new StudentList();
public static void main(String[]args){
for(int i=0;i<3;i++){
new Thread(new Runnable() {

@Override
public void run() {
studentList.addItem();
}
}).start();
}

for(int i=0;i<3;i++){
new Thread(new Runnable() {

@Override
public void run() {
studentList.printList();
}
}).start();
}
}
}

class StudentList{
private List<Integer> listNumber = new ArrayList<Integer>();

private ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
private ReadLock readLock;
private WriteLock writeLock;
public StudentList(){
readLock = reentrantReadWriteLock.readLock();
writeLock = reentrantReadWriteLock.writeLock();
}

public void printList(){
readLock.lock();
for(int i=0;i<listNumber.size();i++){
System.out.println(Thread.currentThread().getName() + "--printList--current:" + i);
}
readLock.unlock();
}

public void addItem(){
writeLock.lock();
for(int i=listNumber.size();i<5;i++){
System.out.println(Thread.currentThread().getName() + "--addItem--current:" + i);
listNumber.add(i);
}
writeLock.unlock();
}
}

执行结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Thread-0--addItem--current:0
Thread-0--addItem--current:1
Thread-0--addItem--current:2
Thread-0--addItem--current:3
Thread-0--addItem--current:4
Thread-3--printList--current:0
Thread-3--printList--current:1
Thread-3--printList--current:2
Thread-3--printList--current:3
Thread-3--printList--current:4
Thread-4--printList--current:0
Thread-5--printList--current:0
Thread-4--printList--current:1
Thread-5--printList--current:1
Thread-4--printList--current:2
Thread-5--printList--current:2
Thread-4--printList--current:3
Thread-5--printList--current:3
Thread-4--printList--current:4
Thread-5--printList--current:4

通过运行结果我们可以证明一个结论:Read锁是共享锁,每个线程都能获取到。Write锁是独占锁,只能同时被一个线程持有。
当我们把Write锁释放代码注释:

1
2
3
4
5
6
7
8
public void addItem(){
writeLock.lock();
for(int i=listNumber.size();i<5;i++){
System.out.println(Thread.currentThread().getName() + "--addItem--current:" + i);
listNumber.add(i);
}
//writeLock.unlock();
}
1
2
3
4
5
Thread-0--addItem--current:0
Thread-0--addItem--current:1
Thread-0--addItem--current:2
Thread-0--addItem--current:3
Thread-0--addItem--current:4

通过运行结果可以有结论:当Write锁被持有的时候,Read锁是无法被其它线程申请的,会处于阻塞状态。直至Write锁释放。同时也可以验证到当同一个线程持有Write锁时是可以申请到Read锁。

11 写锁加锁过程

读写锁的写锁加锁在整体逻辑上和 ReentrantLock 是一样的,不同的是 tryAcquire() 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}

protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}

写锁也需要考虑可重入,如果当前 AQS 互斥锁的持有线程正好是当前要加锁的线程,那么就是写锁在重入,重入只需要递增锁计数值即可。当 c!=0 也就是锁计数不为零时,既可能是因为当前的 AQS 有读锁也可能是因为有写锁,判断 w == 0 就是判断当前的计数是不是读锁带来的。

如果计数值为零,那就开始争抢锁。取决于锁是否公平,在争抢之前调用 writerShouldBlock() 方法看看自己是否需要排队,如果不需要排队,就可以使用 CAS 操作来争抢,成功将计数值从 0 设置为 1 的线程将独占写锁。

12 读锁加锁过程

读锁加锁过程比写锁要复杂很多,它在整体流程上和写锁一样,但是细节差距很大。特别是它需要为每一个线程记录读锁计数,这部分逻辑占据了不少代码。

1
2
3
4
5
6
public final void acquireShared(int arg) {
// 如果尝试加锁不成功, 就去排队休眠,然后循环重试
if (tryAcquireShared(arg) < 0)
// 排队、循环重试
doAcquireShared(arg);
}

如果当前线程已经持有写锁,它还可以继续加读锁,这是为了达成锁降级必须支持的逻辑。锁降级是指在持有写锁的情况下,再加读锁,再解写锁。相比于先写解锁再加读锁而言,这样可以省去加锁二次排队的过程。因为锁降级的存在,锁计数中读写计数可以同时不为零。

1
2
3
4
5
6
7
8
9
10
11
12
wlock.lock();
if(whatever) {
// 降级
rlock.lock();
wlock.unlock();
doRead();
rlock.unlock();
} else {
// 不降级
doWrite()
wlock.unlock();
}

为了给每一个读锁线程进行锁计数,它设置了一个 ThreadLocal 变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
private transient ThreadLocalHoldCounter readHolds;

static final class HoldCounter {
int count;
final long tid = LockSupport.getThreadId(Thread.currentThread());
}

static final class ThreadLocalHoldCounter
extends ThreadLocal<HoldCounter> {
public HoldCounter initialValue() {
return new HoldCounter();
}
}

但是 ThreadLocal 变量访问起来效率不够高,所以又设置了缓存。它会存储最近一次获取读锁线程的锁计数。在线程争用不是特别频繁的情况下,直接读取缓存会比较高效。

1
private transient HoldCounter cachedHoldCounter;

Dough Lea 觉得使用 cachedHoldCounter 还是不够高效,所以又加了一层缓存记录 firstReader,记录第一个将读锁计数从 0 变成 1 的线程以及锁计数。当没有线程争用时,直接读取这两个字段会更加高效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private transient Thread firstReader;
private transient int firstReaderHoldCount;

final int getReadHoldCount() {
// 先访问锁全局计数的读计数部分
if (getReadLockCount() == 0)
return 0;

// 再访问 firstReader
Thread current = Thread.currentThread();
if (firstReader == current)
return firstReaderHoldCount;

// 再访问最近的读线程锁计数
HoldCounter rh = cachedHoldCounter;
if (rh != null && rh.tid == LockSupport.getThreadId(current))
return rh.count;

// 无奈读 ThreadLocal 吧
int count = readHolds.get().count;
if (count == 0) readHolds.remove();
return count;
}

所以我们看到为了记录这个读锁计数作者煞费苦心,那这个读计数的作用是什么呢?那就是线程可以通过这个计数值知道自己有没有持有这个读写锁。

读加锁还有一个自旋的过程,所谓自旋就是第一次加锁失败,那就直接循环重试,不休眠,听起来有点像死循环重试法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final static int SHARED_UNIT = 65536
// 读计数是高16位

final int fullTryAcquireShared(Thread current) {
for(;;) {
int c = getState();
// 如果有其它线程加了写锁,还是返回睡觉去吧
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
...
// 超出计数上限
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
// 拿到读锁了
...
return 1
}
...
// 循环重试
}
}

因为读锁需要使用 CAS 操作来修改底层锁的总读计数值,成功的才可以获得读锁,获取读锁的 CAS 操作失败只是意味着读锁之间存在 CAS 操作的竞争,并不意味着此刻锁被别人占据了自己不能获得。多试几次肯定可以加锁成功,这就是自旋的原因所在。同样在释放读锁的时候也有一个 CAS 操作的循环重试过程。

1
2
3
4
5
6
7
8
9
10
11
protected final boolean tryReleaseShared(int unused) {
...
for (;;) {
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc)) {
return nextc == 0;
}
}
...
}

参考博客和文章书籍等:

《Java并发编程的艺术》

Java并发编程——ReentrantReadWriteLock

打通 Java 任督二脉 —— 并发数据结构的基石

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容