Future接口,常见的线程池中的FutureTask实现

FutureTask

1. Callable

Callable是一个函数式接口,声明了 call() 函数,和 run() 的区别就是会返回具体的结果,且可以抛出异常

1
2
3
4
5
6
7
@FunctionalInterface
public interface Callable<V> {
/**
* 计算结果并返回
*/
V call() throws Exception;
}

2. Future

Callable的返回值由Future接口获取。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public interface Future<V> {

/**
* 尝试取消执行此任务,若任务已完成、已取消或其他原因无法取消则此尝试失败。若执行成功,且执行时任务尚未启动,则此任务不会再运行。若任务已启动,则mayInterruptIfRunning来决定是否中断其执行
*/
boolean cancel(boolean mayInterruptIfRunning);

/**
* 若任务在正常完成前被取消则返回true
*/
boolean isCancelled();

/**
* 若任务执行完毕则返回true
*/
boolean isDone();

/**
* 等待计算完成,并返回结果
*/
V get() throws InterruptedException, ExecutionException;

/**
* 等待最多指定的时间来完成计算,返回结果
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}

Future的 get() 方法是阻塞性的,当计算尚未结束时 get() 方法会一直阻塞,所以就导致了排队等待,严重的影响到运行效率。

1
2
3
4
5
6
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* 设置此Future为计算结果,除非被取消
*/
void run();
}

3. FutureTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
/**
* 可被取消的异步运算
* 此类为接口Future的实现类,实现了启动和取消计算,查询计算是否完成以及查询计算结果等方法。
* 只有在计算完成后才能检索结果,在此之前get()会一直阻塞。计算完成后无法重新启动或者取消计算。
*/
public class FutureTask<V> implements RunnableFuture<V> {
/*
* 修订说明:当前与该类以前依赖AbstractQueuedSynchronizer的版本不同,主要是为了避免在取消竞争期间保留中断状态使用户觉得诧异。当前设计中的同步控制依赖于通过CAS更新的“状态”字段来跟踪完成情况,以及一个简单的treiber堆栈来保存等待的线程。
*
* 样式说明:和往常一样,我们绕过使用atomicxfieldupdater的开销,而是直接使用不安全的内部函数。
*/

/**
* 当前任务的执行状态,初始为NEW. 只会在set,setException,cancel中转为终止状态,
* 在过程中可能会有临时的状态:
* COMPLETING (在设置结果时) 和 INTERRUPTING (仅在中断程序以便cancel(true)时)
* 这些状态转变采用比较廉价的ordered/lazy写入,因为这些值都是唯一的,无法被修改。
* 可能发生的状态转换:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;

/** 底层可调用,运行后为null */
private Callable<V> callable;
/** 返回的结果或get()抛出的异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 运行callable的thread,在run()进行了CAS */
private volatile Thread runner;
/** 等待线程的treiber堆栈 */
private volatile WaitNode waiters;

/**
* 返回已完成任务的结果
*/
@SuppressWarnings("unchecked")
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

/**
* 创建FutureTask}在运行时执行给定的Callable
*/
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // 确保callable的可见性
}

/**
* 创建FutureTask}在运行时执行给定的Callable,并在get()成功结束后返回给定的结果
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

public boolean isCancelled() {
return state >= CANCELLED;
}

public boolean isDone() {
return state != NEW;
}

public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}

public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}

public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

/**
* 当任务转换为状态isDone(正常流程或cancel()取消)时调用此方法,默认实现什么也不做,子类可以重写此函数来进行回调或执行记录。
* 可以在函数内查询当前状态,来确定是否已取消此任务。
*/
protected void done() { }

/**
* 设置future的结果为给定值,除非此future已被set或已被cancel
* 此方法会在完成计算后被run()内部调用
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

/**
* 使此future声明一个ExecutionException,并将给定的Throwable作为原因,除非此future已被set或已被cancel
* 此方法会在完成计算失败时被run()内部调用
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

/**
* 在不设置结果的情况下进行计算,然后将此future设置为初始状态。若计算遇到异常或被cancel,则无法执行此操作。
* 为了同本质上会执行多次的任务一起执行而设计,成功运行并重置则返回true
*/
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

/**
* 确保来自cancel(true)的任意中断只有在运行或运行和重置时才能传递给任务
*/
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt

// assert state == INTERRUPTED;

// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}

/**
* 简单的链表节点,用于在Treiber堆栈中记录等待的线程。
* 请参阅其他类,如Phaser和SynchronousQueue,以获得更详细的解释。
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}

/**
* 移除所有等待线程并发出信号,调用done(),置callable为null
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}

done();

callable = null; // to reduce footprint
}

/**
* 在中断或超时时等待完成或中止
*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}

/**
* 尝试取消链接超时或中断的等待节点,以避免积累垃圾。
内部节点在没有CAS的情况下是不分割的,因为无论如何释放器都可以遍历到,相对来讲是无害的。
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
}