Condition接口及实现原理

Condition接口

第一节 简介

任意一个Java对象,都拥有一组监视器方法(定义在 java.lang.Object 上),主要包括 wait()wait(long timeout)notify() 以及 notifyAll() 方法,这些方法与 synchronized 同步关键字配合,可以实现等待/通知模式

Condition 接口也提供了类似 Object 的监视器方法,与Lock配合可以实现等待/通知模式,但是这两者在使用方式以及功能特性上还是有差别的。

通过对比 Object 的监视器方法和 Condition 接口,可以更详细地了解 Condition 的特性,对比项与结果如下表所示。

Object的监视器方法与Condition接口的对比


第二节 接口与示例

Condition定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到Condition对象关联的锁。Condition对象是由Lock对象(调用Lock对象的 newCondition() 方法)创建出来的,换句话说,Condition是依赖Lock对象的

Condition的使用方式比较简单,需要注意在调用方法前获取锁,使用方式如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 1.获取锁对象
Lock lock = new ReentrantLock();
// 2.获得condition对象
Condition condition = lock.newCondition();

public void conditionWait() throws InterruptedException {
//拿到锁
lock.lock();
try {
//调用condition,使当前线程进入等待状态
condition.await();
} finally {
lock.unlock();
}
}

public void conditionSignal() throws InterruptedException {
lock.lock();
try {
//调用condition,唤醒一个等待状态的线程
condition.signal();
} finally {
lock.unlock();
}
}

如示例所示,一般都会将Condition对象作为成员变量。当调用 await() 方法后,当前线程会释放锁并在此等待,而其他线程调用Condition对象的 signal() 方法,通知当前线程后,当前线程才从 await() 方法返回,并且在返回前已经获取了锁。

Condition定义的(部分)方法以及描述如下表所示。

Condition的(部分)方法以及描述

获取一个Condition必须通过Lock的 newCondition() 方法。下面通过一个有界队列的示例来深入了解Condition的使用方式。有界队列是一种特殊的队列,当队列为空时,队列的获取操作将会阻塞获取线程,直到队列中有新增元素,当队列已满时,队列的插入操作将会阻塞插入线程,直到队列出现“空位”,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class BoundedQueue<T> {
private Object[] items;
// 添加的下标,删除的下标和数组当前数量
private int addIndex, removeIndex, count;
private Lock lock = new ReentrantLock();
private Condition notEmpty = lock.newCondition();
private Condition notFull = lock.newCondition();
public BoundedQueue(int size) {
items = new Object[size];
}
// 添加一个元素,如果数组满,则添加线程进入等待状态,直到有"空位"
public void add(T t) throws InterruptedException {
lock.lock();
try {
while (count == items.length)
notFull.await(); //数组已满,调用notFull使当前线程等待
items[addIndex] = t;
if (++addIndex == items.length)
addIndex = 0;
++count;
notEmpty.signal();//已有新元素入列,唤醒等待读取的线程
} finally {
lock.unlock();
}
}
// 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加元素
@SuppressWarnings("unchecked")
public T remove() throws InterruptedException {
lock.lock();
try {
while (count == 0)
notEmpty.await();//队列为空,当前读取线程等待
Object x = items[removeIndex];
if (++removeIndex == items.length)
removeIndex = 0;
--count;
notFull.signal();//队列多出一个空位,唤醒等待的添加线程
return (T) x;
} finally {
lock.unlock();
}
}
}

上述示例中,BoundedQueue通过 add(T t) 方法添加一个元素,通过 remove() 方法移出一个元素。以添加方法为例。

首先需要获得锁,目的是确保数组修改的可见性和排他性。当数组数量等于数组长度时,表示数组已满,则调用 notFull.await() ,当前线程随之释放锁并进入等待状态。如果数组数量不等于数组长度,表示数组未满,则添加元素到数组中,同时通知等待在 notEmpty 上的线程,数组中已经有新元素可以获取。

在添加和删除方法中使用while循环而非if判断,目的是防止过早或意外的通知,只有条件符合才能够退出循环。回想之前提到的等待/通知的经典范式,二者是非常类似的。


第三节 实现分析

ConditionObject 是队列同步器 AbstractQueuedSynchronizer 的内部类,实现了 Condition 接口,因为Condition的操作需要获取相关联的锁,所以作为同步器的内部类也较为合理。每个Condition对象都包含着一个队列(以下称为等待队列),该队列是Condition对象实现等待/通知功能的关键。

下面将分析Condition的实现,主要包括:等待队列、等待和通知,下面提到的Condition如果不加说明均指的是 ConditionObject

3.1 等待队列

等待队列是一个FIFO的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,如果一个线程调用了 Condition.await() 方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。事实上,节点的定义复用了同步器中节点的定义,也就是说,同步队列和等待队列中节点类型都是同步器的静态内部类 AbstractQueuedSynchronizer.Node

一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。当前线程调用 Condition.await() 方法,将会以当前线程构造节点,并将节点从尾部加入等待队列,等待队列的基本结构如下图所示。

等待队列的基本结构

如图所示,Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。上述节点引用更新的过程并没有使用CAS保证,原因在于调用 await() 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。

在Object的监视器模型上,一个对象拥有一个同步队列和等待队列,而并发包中的Lock(更确切地说是同步器)拥有一个同步队列和多个等待队列,其对应关系如下图所示。

同步队列与等待队列

如图所示,Condition的实现是同步器的内部类,因此每个Condition实例都能够访问同步器提供的方法,相当于每个Condition都拥有所属同步器的引用。

3.2 等待

调用 Conditionawait() 方法(或者以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。当从 await() 方法返回时,当前线程一定获取了 Condition 相关联的锁。

如果从队列(同步队列和等待队列)的角度看 await() 方法,当调用 await() 方法时,相当于同步队列的首节点(获取了锁的节点)移动到 Condition 的等待队列中

Conditionawait() 方法,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// 当前线程加入等待队列
Node node = addConditionWaiter();
// 释放同步状态,也就是释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

private Node addConditionWaiter() {
//获取等待队列的尾结点
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//构建新的等待节点,并加入等待队列
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}

调用该方法的线程成功获取了锁的线程,也就是同步队列中的首节点,该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。

当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。如果不是通过其他线程调用 Condition.signal() 方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException

如果从队列的角度去看,当前线程加入 Condition 的等待队列,该过程如下图示。

当前线程加入等待队列

如图所示,同步队列的首节点并不会直接加入等待队列,而是通过 addConditionWaiter() 方法把当前线程构造成一个新的节点并将其加入等待队列中

3.3 通知

调用 Conditionsignal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。

Conditionsignal() 方法,如下列代码所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
	public class ConditionObject implements Condition, java.io.Serializable {

public final void signal() {
//当前线程必须是获取了锁的线程
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}

private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
}

final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}

调用该方法的前置条件是当前线程必须获取了锁,可以看到 signal() 方法进行了 isHeldExclusively() 检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用 LockSupport 唤醒节点中的线程。

节点从等待队列移动到同步队列的过程如下图所示。

节点从等待队列移动到同步队列

通过调用同步器的 enq(Node node) 方法,等待队列中的头节点线程安全地移动到同步队列。当节点移动到同步队列后,当前线程再使用 LockSupport 唤醒该节点的线程。

被唤醒后的线程,将从 await() 方法中的while循环中退出( isOnSyncQueue(Node node) 方法返回true,节点已经在同步队列中),进而调用同步器的 acquireQueued() 方法加入到获取同步状态的竞争中。

成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的 await() 方法返回,此时该线程已经成功地获取了锁。

ConditionsignalAll() 方法,相当于对等待队列中的每个节点均执行一次 signal() 方法,效果就是将等待队列中所有节点全部移动到同步队列中,并唤醒每个节点的线程。


参考博客和文章书籍等:

《Java并发编程的艺术》

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容