阻塞队列

阻塞队列

第一节 什么是阻塞队列

阻塞队列(BlockingQueue)是一个支持两个附加操作的队列。这两个附加的操作支持阻塞的插入和移除方法。

  1. 支持阻塞的插入方法:意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满。

  2. 支持阻塞的移除方法:意思是在队列为空时,获取元素的线程会等待队列变为非空。

阻塞队列常用于生产者和消费者的场景,生产者是向队列里添加元素的线程,消费者是从队列里取元素的线程。阻塞队列就是生产者用来存放元素、消费者用来获取元素的容器。

在阻塞队列不可用时,这两个附加操作提供了4种处理方式,如表6-1所示。

插入和移除操作的4中处理方式

  • 抛出异常:当队列满时,如果再往队列里插入元素,会抛出 IllegalStateException(”Queue full”)异常。当队列空时,从队列里获取元素会抛出 NoSuchElementException 异常。
  • 返回特殊值:当往队列插入元素时,会返回元素是否插入成功,成功返回true。如果是移除方法,则是从队列里取出一个元素,如果没有则返回null。
  • 一直阻塞:当阻塞队列满时,如果生产者线程往队列里put元素,队列会一直阻塞生产者线程,直到队列可用或者响应中断退出。当队列空时,如果消费者线程从队列里take元素,队列会阻塞住消费者线程,直到队列不为空。
  • 超时退出:当阻塞队列满时,如果生产者线程往队列里插入元素,队列会阻塞生产者线程一段时间,如果超过了指定的时间,生产者线程就会退出。

这两个附加操作的4种处理方式不方便记忆,所以我找了一下这几个方法的规律。put和 take分别尾首含有字母t,offer和poll都含有字母o。

注意: 如果是无界阻塞队列,队列不可能会出现满的情况,所以使用put或offer方法永 远不会被阻塞,而且使用offer方法时,该方法永远返回true。


第二节 Java里的阻塞队列

JDK 7提供了7个阻塞队列,如下。

  • ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
  • LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
  • PriorityBlockingQueue :一个支持优先级排序无界阻塞队列。
  • DelayQueue :一个使用优先级队列实现的无界阻塞队列。
  • SynchronousQueue :一个不存储元素的阻塞队列。
  • LinkedTransferQueue :一个由链表结构组成的无界阻塞队列。
  • LinkedBlockingDeque :一个由链表结构组成的双向阻塞队列。

2.1 ArrayBlockingQueue

ArrayBlockingQueue 是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。

默认情况下不保证线程公平的访问队列,所谓公平访问队列是指阻塞的线程,可以按照阻塞的先后顺序访问队列,即先阻塞线程先访问队列。非公平性是对先等待的线程是非公平的,当队列可用时,阻塞的线程都可以争夺访问队列的资格,有可能先阻塞的线程最后才访问队列。为了保证公平性,通常会降低吞吐量。我们可以使用以下代码创建一个公平的阻塞队列。

1
ArrayBlockingQueue fairQueue = new ArrayBlockingQueue(1000,true);

访问者的公平性是使用可重入锁实现的,代码如下。

1
2
3
4
5
6
7
8
public ArrayBlockingQueue(int capacity, boolean fair) {        
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

2.2 LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE 。此队列按照先进先出的原则对元素进行排序。

ArrayBlockingQueue和LinkedBlockingQueue的区别?

  1. 队列中锁的实现不同
    • ArrayBlockingQueue 队列中的锁是没有分离的,即生产和消费用的是同一个锁;
    • LinkedBlockingQueue 队列中的锁是分离的,即生产用的是 putLock ,消费是 takeLock
  2. 在生产或消费时操作不同
    • ArrayBlockingQueue 队列中在生产和消费的时候,是直接将枚举对象插入或移除的;
    • LinkedBlockingQueue 队列中在生产和消费的时候,需要把枚举对象转换为 Node<E> 进行插入或移除,会影响性能。
  3. 队列大小初始化方式不同
    • ArrayBlockingQueue 实现的队列中必须指定队列的大小;
    • LinkedBlockingQueue 实现的队列中可以不指定队列的大小,但是默认是 Integer.MAX_VALUE

2.3 PriorityBlockingQueue

PriorityBlockingQueue 是一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数Comparator来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。

2.4 DelayQueue

DelayQueue 是一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。

只有在延迟期满时才能从队列中提取元素。

DelayQueue 非常有用,可以将 DelayQueue 运用在以下应用场景。

  • 缓存系统的设计:可以用 DelayQueue 保存缓存元素的有效期,使用一个线程循环查询 DelayQueue ,一旦能从 DelayQueue 中获取元素时,表示缓存有效期到了。
  • 定时任务调度:使用 DelayQueue 保存当天将会执行的任务和执行时间,一旦从 DelayQueue 中获取到任务就开始执行,比如 TimerQueue 就是使用 DelayQueue 实现的。

(1)如何实现Delayed接口

DelayQueue 队列的元素必须实现Delayed接口。我们可以参考 ScheduledThreadPoolExecutorScheduledFutureTask 类的实现,一共有三步。

第一步:在对象创建的时候,初始化基本数据。使用time记录当前对象延迟到什么时候可 以使用,使用 sequenceNumber 来标识元素在队列中的先后顺序。代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {

...

/**
* Sequence number to break scheduling ties, and in turn to
* guarantee FIFO order among tied entries.
*/
private static final AtomicLong sequencer = new AtomicLong(0);

private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {

/** Sequence number to break ties FIFO */
private final long sequenceNumber;

/** The time the task is enabled to execute in nanoTime units */
private long time;

/**
* Period in nanoseconds for repeating tasks. A positive
* value indicates fixed-rate execution. A negative value
* indicates fixed-delay execution. A value of 0 indicates a
* non-repeating task.
*/
private final long period;

/** The actual task to be re-enqueued by reExecutePeriodic */
RunnableScheduledFuture<V> outerTask = this;

/**
* Index into delay queue, to support faster cancellation.
*/
int heapIndex;

ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
}
}

第二步:实现getDelay方法,该方法返回当前元素还需要延时多长时间,单位是纳秒,代码如下。

1
2
3
public long getDelay(TimeUnit unit) {            
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}

通过构造函数可以看出延迟时间参数ns的单位是纳秒,自己设计的时候最好使用纳秒,因为实现 getDelay() 方法时可以指定任意单位,一旦以秒或分作为单位,而延时时间又精确不到纳秒就麻烦了。使用时请注意当time小于当前时间时,getDelay 会返回负数。

第三步:实现 compareTo 方法来指定元素的顺序。例如,让延时时间最长的放在队列的末尾。实现代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public int compareTo(Delayed other) {            
if (other == this)  // compare zero ONLY if same object
return 0;

if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<> x = (ScheduledFutureTask<>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}

long d = (getDelay(TimeUnit.NANOSECONDS) other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) 0 : ((d < 0) -1 : 1);
}

(2)如何实现延时阻塞队列

延时阻塞队列的实现很简单,当消费者从队列里获取元素时,如果元素没有达到延时时间,就阻塞当前线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
long delay = first.getDelay(TimeUnit.NANOSECONDS); 
if (delay <= 0)
return q.poll();
else if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}

代码中的变量leader是一个等待获取队列头部元素的线程。如果leader不等于空,表示已经有线程在等待获取队列的头元素。所以,使用 await() 方法让当前线程等待信号。如果leader等于空,则把当前线程设置成leader,并使用 awaitNanos() 方法让当前线程等待接收信号或等待delay时间。

2.5 SynchronousQueue

SynchronousQueue 是一个不存储元素的阻塞队列。每一个put操作必须等待一个take操作,否则不能继续添加元素。

它支持公平访问队列。默认情况下线程采用非公平性策略访问队列。使用以下构造方法可以创建公平性访问的SynchronousQueue ,如果设置为true,则等待的线程会采用先进先出的顺序访问队列。

1
2
3
public SynchronousQueue(boolean fair) {        
transferer = fair new TransferQueue() : new TransferStack();
}

SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。

SynchronousQueue 的吞吐量高于 LinkedBlockingQueueArrayBlockingQueue

2.6 LinkedTransferQueue

LinkedTransferQueue 是一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列, LinkedTransferQueue 多了 tryTransfertransfer 方法。

(1)transfer方法

如果当前有消费者正在等待接收元素(消费者使用 take() 方法或带时间限制的 poll() 方法时),transfer 方法可以把生产者传入的元素立刻 transfer(传输)给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的tail节点,并等到该元素被消费者消费了才返回。transfer 方法的关键代码如下。

1
2
Node pred = tryAppend(s, haveData); 
return awaitMatch(s, pred, e, (how == TIMED), nanos);

第一行代码是试图把存放当前元素的s节点作为tail节点。第二行代码是让CPU自旋等待消费者消费元素。因为自旋会消耗CPU,所以自旋一定的次数后使用 Thread.yield() 方法来暂停当前正在执行的线程,并执行其他线程。

(2)tryTransfer方法

tryTransfer 方法是用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回false。和 transfer 方法的区别是 tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法是必须等到消费者消费了才返回。

对于带有时间限制的 tryTransfer(E e,long timeout,TimeUnit unit) 方法,试图把生产者传入的元素直接传给消费者,但是如果没有消费者消费该元素则等待指定的时间再返回,如果超时还没消费元素,则返回false,如果在超时时间内消费了元素,则返回true。

2.7 LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirstaddLastofferFirstofferLastpeekFirstpeekLast 等方法,以First单词结尾的方法,表示插入、 获取(peek)或移除双端队列的第一个元素。以Last单词结尾的方法,表示插入、获取或移除双端队列的最后一个元素。另外,插入方法 add 等同于 addLast ,移除方法 remove 等效于 removeFirst 。但是 take 方法却等同于 takeFirst ,不知道是不是JDK的bug,使用时还是用带有First和Last后缀的方法更清楚。

在初始化 LinkedBlockingDeque 时可以设置容量防止其过度膨胀。另外,双向阻塞队列可以运用在“工作窃取”模式中。


第三节 阻塞队列的实现原理

如果队列是空的,消费者会一直等待,当生产者添加元素时,消费者是如何知道当前队列有元素的呢?如果让你来设计阻塞队列你会如何设计,如何让生产者和消费者进行高效率的通信呢?让我们先来看看JDK是如何实现的。

使用通知模式实现。所谓通知模式,就是当生产者往满的队列里添加元素时会阻塞住生产者,当消费者消费了一个队列中的元素后,会通知生产者当前队列可用。通过查看JDK源码发现 ArrayBlockingQueue 使用了 Condition 来实现,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private final Condition notFull; 
private final Condition notEmpty;
public ArrayBlockingQueue(int capacity, boolean fair) {
// 省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}

private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}

当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过 LockSupport.park(this)来实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final void await() throws InterruptedException {            
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;

while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;

if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();

if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

继续进入源码,发现调用 setBlocker 先保存一下将要阻塞的线程,然后调用 unsafe.park 阻塞当前线程。

1
2
3
4
5
6
public static void park(Object blocker) {        
Thread t = Thread.currentThread();
setBlocker(t, blocker);
unsafe.park(false, 0L);
setBlocker(t, null);
}

unsafe.park 是个native方法,代码如下。

1
public native void park(boolean isAbsolute, long time);

park 这个方法会阻塞当前线程,只有以下4种情况中的一种发生时,该方法才会返回。

  • 与park对应的 unpark 执行或已经执行时。“已经执行”是指 unpark 先执行,然后再执行park的情况。
  • 线程被中断时。
  • 等待完time参数指定的毫秒数时。
  • 异常现象发生时,这个异常现象没有任何原因。

继续看一下JVM是如何实现 park 方法:park在不同的操作系统中使用不同的方式实现,在 Linux下使用的是系统方法 pthread_cond_wait 实现。实现代码在JVM源码路径 src/os/linux/vm/os_linux.cpp 里的 os::PlatformEvent::park 方法,代码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void os::PlatformEvent::park() {    
int v ;
for (;;) {
v = _Event ;
if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
}

guarantee (v >= 0, "invariant") ;
if (v == 0) {
// Do this the hard way by blocking ...
int status = pthread_mutex_lock(_mutex);
assert_status(status == 0, status, "mutex_lock");
guarantee (_nParked == 0, "invariant") ;
++ _nParked ;
while (_Event < 0) {
status = pthread_cond_wait(_cond, _mutex);
// for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
// Treat this the same as if the wait was interrupted
if (status == ETIME) { status = EINTR; }
assert_status(status == 0 || status == EINTR, status, "cond_wait");
}
-- _nParked ;
// In theory we could move the ST of 0 into _Event past the unlock(),
// but then we'd need a MEMBAR after the ST.
_Event = 0 ;
status = pthread_mutex_unlock(_mutex);
assert_status(status == 0, status, "mutex_unlock");
}
guarantee (_Event >= 0, "invariant") ;
}
}

pthread_cond_wait 是一个多线程的条件变量函数,cond 是condition的缩写,字面意思可以理解为线程在等待一个条件发生,这个条件是一个全局变量。这个方法接收两个参数:一个共享变量 _cond,一个互斥量 _mutex 。而 unpark 方法在Linux下是使用 pthread_cond_signal 实现的。park 方法在Windows下则是使用 WaitForSingleObject 实现的。想知道 pthread_cond_wait 是如何实现的,可以参考 glibc-2.5nptl/sysdeps/pthread/pthread_cond_wait.c

当线程被阻塞队列阻塞时,线程会进入WAITING(parking)状态。我们可以使用 jstack dump 阻塞的生产者线程看到这点,如下。

1
2
3
4
5
6
"main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x0000000101        
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x0000000140559fe8> (a java.util.concurrent.locks AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:32
at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:

第四节 源码

4.1 BlockingQueue

BlockingQueue 接口API如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public interface BlockingQueue<E> extends Queue<E> {
// 新增元素
boolean add(E e);

// 将元素插入队列
boolean offer(E e);

// 将元素插入队列,若空间有限可以等待指定的时间
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;

// 将元素插入队列,若空间有限则阻塞等待
void put(E e) throws InterruptedException;

// 获取并移除队列的头部,在元素变得可用前一直阻塞等待
E take() throws InterruptedException;

// 获取并移除队列的头部,若元素不可用可以等待指定的时间
E poll(long timeout, TimeUnit unit)
throws InterruptedException;

// 返回队列中剩余容量
int remainingCapacity();

// 移除元素
boolean remove(Object o);

public boolean contains(Object o);

// 从队列中删除所有可用的元素,并添加到给定集合。
int drainTo(Collection<? super E> c);

int drainTo(Collection<? super E> c, int maxElements);
}

4.2 ArrayBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {

/**
* Serialization ID. This class relies on default serialization
* even for the items array, which is default-serialized, even if
* it is empty. Otherwise it could not be declared final, which is
* necessary here.
*/
private static final long serialVersionUID = -817911632652898426L;

/** The queued items */
final Object[] items;

/** items index for next take, poll, peek or remove */
int takeIndex;

/** items index for next put, offer, or add */
int putIndex;

/** Number of elements in the queue */
int count;

/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/

/** Main lock guarding all access */
final ReentrantLock lock;

/** Condition for waiting takes */
private final Condition notEmpty;

/** Condition for waiting puts */
private final Condition notFull;

/**
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
transient Itrs itrs = null;

// Internal helper methods

/**
* Circularly decrement i.
*/
final int dec(int i) {
return ((i == 0) ? items.length : i) - 1;
}

/**
* Returns item at index i.
*/
@SuppressWarnings("unchecked")
final E itemAt(int i) {
return (E) items[i];
}

/**
* Throws NullPointerException if argument is null.
*
* @param v the element
*/
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}

/**
* Inserts element at current put position, advances, and signals.
* Call only when holding lock.
*/
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}

/**
* Extracts element at current take position, advances, and signals.
* Call only when holding lock.
*/
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}

/**
* Deletes item at array index removeIndex.
* Utility for remove(Object) and iterator.remove.
* Call only when holding lock.
*/
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
} else {
// an "interior" remove

// slide over all others up through putIndex.
final int putIndex = this.putIndex;
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
itrs.removedAt(removeIndex);
}
notFull.signal();
}

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and default access policy.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator.
*
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.
* @param c the collection of elements to initially contain
* @throws IllegalArgumentException if {@code capacity} is less than
* {@code c.size()}, or less than 1.
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);

final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}

/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and throwing an
* {@code IllegalStateException} if this queue is full.
*
* @param e the element to add
* @return {@code true} (as specified by {@link Collection#add})
* @throws IllegalStateException if this queue is full
* @throws NullPointerException if the specified element is null
*/
public boolean add(E e) {
return super.add(e);
}

/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full. This method is generally preferable to method {@link #add},
* which can fail to insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}

/**
* Inserts the specified element at the tail of this queue, waiting
* for space to become available if the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}

/**
* Inserts the specified element at the tail of this queue, waiting
* up to the specified wait time for space to become available if
* the queue is full.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

checkNotNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}

public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}

public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}

// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
* Returns the number of elements in this queue.
*
* @return the number of elements in this queue
*/
public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}

// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}

/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* <p>Removal of interior elements in circular array based queues
* is an intrinsically slow and disruptive operation, so should
* be undertaken only in exceptional circumstances, ideally
* only when the queue is known not to be accessible by other
* threads.
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}

/**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
Object[] a;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
a = new Object[count];
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
} finally {
lock.unlock();
}
return a;
}

/**
* Returns an array containing all of the elements in this queue, in
* proper sequence; the runtime type of the returned array is that of
* the specified array. If the queue fits in the specified array, it
* is returned therein. Otherwise, a new array is allocated with the
* runtime type of the specified array and the size of this queue.
*
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
* {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
* allocated array of {@code String}:
*
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this queue
* @throws NullPointerException if the specified array is null
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
final int count = this.count;
final int len = a.length;
if (len < count)
a = (T[])java.lang.reflect.Array.newInstance(
a.getClass().getComponentType(), count);
int n = items.length - takeIndex;
if (count <= n)
System.arraycopy(items, takeIndex, a, 0, count);
else {
System.arraycopy(items, takeIndex, a, 0, n);
System.arraycopy(items, 0, a, n, count - n);
}
if (len > count)
a[count] = null;
} finally {
lock.unlock();
}
return a;
}

public String toString() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k == 0)
return "[]";

final Object[] items = this.items;
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}

/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
if (k > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
do {
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
takeIndex = putIndex;
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
checkNotNull(c);
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int n = Math.min(maxElements, count);
int take = takeIndex;
int i = 0;
try {
while (i < n) {
@SuppressWarnings("unchecked")
E x = (E) items[take];
c.add(x);
items[take] = null;
if (++take == items.length)
take = 0;
i++;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
count -= i;
takeIndex = take;
if (itrs != null) {
if (count == 0)
itrs.queueIsEmpty();
else if (i > take)
itrs.takeIndexWrapped();
}
for (; i > 0 && lock.hasWaiters(notFull); i--)
notFull.signal();
}
}
} finally {
lock.unlock();
}
}

/**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}

/**
* Shared data between iterators and their queue, allowing queue
* modifications to update iterators when elements are removed.
*
* This adds a lot of complexity for the sake of correctly
* handling some uncommon operations, but the combination of
* circular-arrays and supporting interior removes (i.e., those
* not at head) would cause iterators to sometimes lose their
* places and/or (re)report elements they shouldn't. To avoid
* this, when a queue has one or more iterators, it keeps iterator
* state consistent by:
*
* (1) keeping track of the number of "cycles", that is, the
* number of times takeIndex has wrapped around to 0.
* (2) notifying all iterators via the callback removedAt whenever
* an interior element is removed (and thus other elements may
* be shifted).
*
* These suffice to eliminate iterator inconsistencies, but
* unfortunately add the secondary responsibility of maintaining
* the list of iterators. We track all active iterators in a
* simple linked list (accessed only when the queue's lock is
* held) of weak references to Itr. The list is cleaned up using
* 3 different mechanisms:
*
* (1) Whenever a new iterator is created, do some O(1) checking for
* stale list elements.
*
* (2) Whenever takeIndex wraps around to 0, check for iterators
* that have been unused for more than one wrap-around cycle.
*
* (3) Whenever the queue becomes empty, all iterators are notified
* and this entire data structure is discarded.
*
* So in addition to the removedAt callback that is necessary for
* correctness, iterators have the shutdown and takeIndexWrapped
* callbacks that help remove stale iterators from the list.
*
* Whenever a list element is examined, it is expunged if either
* the GC has determined that the iterator is discarded, or if the
* iterator reports that it is "detached" (does not need any
* further state updates). Overhead is maximal when takeIndex
* never advances, iterators are discarded before they are
* exhausted, and all removals are interior removes, in which case
* all stale iterators are discovered by the GC. But even in this
* case we don't increase the amortized complexity.
*
* Care must be taken to keep list sweeping methods from
* reentrantly invoking another such method, causing subtle
* corruption bugs.
*/
class Itrs {

/**
* Node in a linked list of weak iterator references.
*/
private class Node extends WeakReference<Itr> {
Node next;

Node(Itr iterator, Node next) {
super(iterator);
this.next = next;
}
}

/** Incremented whenever takeIndex wraps around to 0 */
int cycles = 0;

/** Linked list of weak iterator references */
private Node head;

/** Used to expunge stale iterators */
private Node sweeper = null;

private static final int SHORT_SWEEP_PROBES = 4;
private static final int LONG_SWEEP_PROBES = 16;

Itrs(Itr initial) {
register(initial);
}

/**
* Sweeps itrs, looking for and expunging stale iterators.
* If at least one was found, tries harder to find more.
* Called only from iterating thread.
*
* @param tryHarder whether to start in try-harder mode, because
* there is known to be at least one iterator to collect
*/
void doSomeSweeping(boolean tryHarder) {
// assert lock.getHoldCount() == 1;
// assert head != null;
int probes = tryHarder ? LONG_SWEEP_PROBES : SHORT_SWEEP_PROBES;
Node o, p;
final Node sweeper = this.sweeper;
boolean passedGo; // to limit search to one full sweep

if (sweeper == null) {
o = null;
p = head;
passedGo = true;
} else {
o = sweeper;
p = o.next;
passedGo = false;
}

for (; probes > 0; probes--) {
if (p == null) {
if (passedGo)
break;
o = null;
p = head;
passedGo = true;
}
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.isDetached()) {
// found a discarded/exhausted iterator
probes = LONG_SWEEP_PROBES; // "try harder"
// unlink p
p.clear();
p.next = null;
if (o == null) {
head = next;
if (next == null) {
// We've run out of iterators to track; retire
itrs = null;
return;
}
}
else
o.next = next;
} else {
o = p;
}
p = next;
}

this.sweeper = (p == null) ? null : o;
}

/**
* Adds a new iterator to the linked list of tracked iterators.
*/
void register(Itr itr) {
// assert lock.getHoldCount() == 1;
head = new Node(itr, head);
}

/**
* Called whenever takeIndex wraps around to 0.
*
* Notifies all iterators, and expunges any that are now stale.
*/
void takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
cycles++;
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.takeIndexWrapped()) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}

/**
* Called whenever an interior remove (not at takeIndex) occurred.
*
* Notifies all iterators, and expunges any that are now stale.
*/
void removedAt(int removedIndex) {
for (Node o = null, p = head; p != null;) {
final Itr it = p.get();
final Node next = p.next;
if (it == null || it.removedAt(removedIndex)) {
// unlink p
// assert it == null || it.isDetached();
p.clear();
p.next = null;
if (o == null)
head = next;
else
o.next = next;
} else {
o = p;
}
p = next;
}
if (head == null) // no more iterators to track
itrs = null;
}

/**
* Called whenever the queue becomes empty.
*
* Notifies all active iterators that the queue is empty,
* clears all weak refs, and unlinks the itrs datastructure.
*/
void queueIsEmpty() {
// assert lock.getHoldCount() == 1;
for (Node p = head; p != null; p = p.next) {
Itr it = p.get();
if (it != null) {
p.clear();
it.shutdown();
}
}
head = null;
itrs = null;
}

/**
* Called whenever an element has been dequeued (at takeIndex).
*/
void elementDequeued() {
// assert lock.getHoldCount() == 1;
if (count == 0)
queueIsEmpty();
else if (takeIndex == 0)
takeIndexWrapped();
}
}

/**
* Iterator for ArrayBlockingQueue.
*
* To maintain weak consistency with respect to puts and takes, we
* read ahead one slot, so as to not report hasNext true but then
* not have an element to return.
*
* We switch into "detached" mode (allowing prompt unlinking from
* itrs without help from the GC) when all indices are negative, or
* when hasNext returns false for the first time. This allows the
* iterator to track concurrent updates completely accurately,
* except for the corner case of the user calling Iterator.remove()
* after hasNext() returned false. Even in this case, we ensure
* that we don't remove the wrong element by keeping track of the
* expected element to remove, in lastItem. Yes, we may fail to
* remove lastItem from the queue if it moved due to an interleaved
* interior remove while in detached mode.
*/
private class Itr implements Iterator<E> {
/** Index to look for new nextItem; NONE at end */
private int cursor;

/** Element to be returned by next call to next(); null if none */
private E nextItem;

/** Index of nextItem; NONE if none, REMOVED if removed elsewhere */
private int nextIndex;

/** Last element returned; null if none or not detached. */
private E lastItem;

/** Index of lastItem, NONE if none, REMOVED if removed elsewhere */
private int lastRet;

/** Previous value of takeIndex, or DETACHED when detached */
private int prevTakeIndex;

/** Previous value of iters.cycles */
private int prevCycles;

/** Special index value indicating "not available" or "undefined" */
private static final int NONE = -1;

/**
* Special index value indicating "removed elsewhere", that is,
* removed by some operation other than a call to this.remove().
*/
private static final int REMOVED = -2;

/** Special value for prevTakeIndex indicating "detached mode" */
private static final int DETACHED = -3;

Itr() {
// assert lock.getHoldCount() == 0;
lastRet = NONE;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (count == 0) {
// assert itrs == null;
cursor = NONE;
nextIndex = NONE;
prevTakeIndex = DETACHED;
} else {
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
prevTakeIndex = takeIndex;
nextItem = itemAt(nextIndex = takeIndex);
cursor = incCursor(takeIndex);
if (itrs == null) {
itrs = new Itrs(this);
} else {
itrs.register(this); // in this order
itrs.doSomeSweeping(false);
}
prevCycles = itrs.cycles;
// assert takeIndex >= 0;
// assert prevTakeIndex == takeIndex;
// assert nextIndex >= 0;
// assert nextItem != null;
}
} finally {
lock.unlock();
}
}

boolean isDetached() {
// assert lock.getHoldCount() == 1;
return prevTakeIndex < 0;
}

private int incCursor(int index) {
// assert lock.getHoldCount() == 1;
if (++index == items.length)
index = 0;
if (index == putIndex)
index = NONE;
return index;
}

/**
* Returns true if index is invalidated by the given number of
* dequeues, starting from prevTakeIndex.
*/
private boolean invalidated(int index, int prevTakeIndex,
long dequeues, int length) {
if (index < 0)
return false;
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return dequeues > distance;
}

/**
* Adjusts indices to incorporate all dequeues since the last
* operation on this iterator. Call only from iterating thread.
*/
private void incorporateDequeues() {
// assert lock.getHoldCount() == 1;
// assert itrs != null;
// assert !isDetached();
// assert count > 0;

final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;

if (cycles != prevCycles || takeIndex != prevTakeIndex) {
final int len = items.length;
// how far takeIndex has advanced since the previous
// operation of this iterator
long dequeues = (cycles - prevCycles) * len
+ (takeIndex - prevTakeIndex);

// Check indices for invalidation
if (invalidated(lastRet, prevTakeIndex, dequeues, len))
lastRet = REMOVED;
if (invalidated(nextIndex, prevTakeIndex, dequeues, len))
nextIndex = REMOVED;
if (invalidated(cursor, prevTakeIndex, dequeues, len))
cursor = takeIndex;

if (cursor < 0 && nextIndex < 0 && lastRet < 0)
detach();
else {
this.prevCycles = cycles;
this.prevTakeIndex = takeIndex;
}
}
}

/**
* Called when itrs should stop tracking this iterator, either
* because there are no more indices to update (cursor < 0 &&
* nextIndex < 0 && lastRet < 0) or as a special exception, when
* lastRet >= 0, because hasNext() is about to return false for the
* first time. Call only from iterating thread.
*/
private void detach() {
// Switch to detached mode
// assert lock.getHoldCount() == 1;
// assert cursor == NONE;
// assert nextIndex < 0;
// assert lastRet < 0 || nextItem == null;
// assert lastRet < 0 ^ lastItem != null;
if (prevTakeIndex >= 0) {
// assert itrs != null;
prevTakeIndex = DETACHED;
// try to unlink from itrs (but not too hard)
itrs.doSomeSweeping(true);
}
}

/**
* For performance reasons, we would like not to acquire a lock in
* hasNext in the common case. To allow for this, we only access
* fields (i.e. nextItem) that are not modified by update operations
* triggered by queue modifications.
*/
public boolean hasNext() {
// assert lock.getHoldCount() == 0;
if (nextItem != null)
return true;
noNext();
return false;
}

private void noNext() {
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
// assert cursor == NONE;
// assert nextIndex == NONE;
if (!isDetached()) {
// assert lastRet >= 0;
incorporateDequeues(); // might update lastRet
if (lastRet >= 0) {
lastItem = itemAt(lastRet);
// assert lastItem != null;
detach();
}
}
// assert isDetached();
// assert lastRet < 0 ^ lastItem != null;
} finally {
lock.unlock();
}
}

public E next() {
// assert lock.getHoldCount() == 0;
final E x = nextItem;
if (x == null)
throw new NoSuchElementException();
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
incorporateDequeues();
// assert nextIndex != NONE;
// assert lastItem == null;
lastRet = nextIndex;
final int cursor = this.cursor;
if (cursor >= 0) {
nextItem = itemAt(nextIndex = cursor);
// assert nextItem != null;
this.cursor = incCursor(cursor);
} else {
nextIndex = NONE;
nextItem = null;
}
} finally {
lock.unlock();
}
return x;
}

public void remove() {
// assert lock.getHoldCount() == 0;
final ReentrantLock lock = ArrayBlockingQueue.this.lock;
lock.lock();
try {
if (!isDetached())
incorporateDequeues(); // might update lastRet or detach
final int lastRet = this.lastRet;
this.lastRet = NONE;
if (lastRet >= 0) {
if (!isDetached())
removeAt(lastRet);
else {
final E lastItem = this.lastItem;
// assert lastItem != null;
this.lastItem = null;
if (itemAt(lastRet) == lastItem)
removeAt(lastRet);
}
} else if (lastRet == NONE)
throw new IllegalStateException();
// else lastRet == REMOVED and the last returned element was
// previously asynchronously removed via an operation other
// than this.remove(), so nothing to do.

if (cursor < 0 && nextIndex < 0)
detach();
} finally {
lock.unlock();
// assert lastRet == NONE;
// assert lastItem == null;
}
}

/**
* Called to notify the iterator that the queue is empty, or that it
* has fallen hopelessly behind, so that it should abandon any
* further iteration, except possibly to return one more element
* from next(), as promised by returning true from hasNext().
*/
void shutdown() {
// assert lock.getHoldCount() == 1;
cursor = NONE;
if (nextIndex >= 0)
nextIndex = REMOVED;
if (lastRet >= 0) {
lastRet = REMOVED;
lastItem = null;
}
prevTakeIndex = DETACHED;
// Don't set nextItem to null because we must continue to be
// able to return it on next().
//
// Caller will unlink from itrs when convenient.
}

private int distance(int index, int prevTakeIndex, int length) {
int distance = index - prevTakeIndex;
if (distance < 0)
distance += length;
return distance;
}

/**
* Called whenever an interior remove (not at takeIndex) occurred.
*
* @return true if this iterator should be unlinked from itrs
*/
boolean removedAt(int removedIndex) {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;

final int cycles = itrs.cycles;
final int takeIndex = ArrayBlockingQueue.this.takeIndex;
final int prevCycles = this.prevCycles;
final int prevTakeIndex = this.prevTakeIndex;
final int len = items.length;
int cycleDiff = cycles - prevCycles;
if (removedIndex < takeIndex)
cycleDiff++;
final int removedDistance =
(cycleDiff * len) + (removedIndex - prevTakeIndex);
// assert removedDistance >= 0;
int cursor = this.cursor;
if (cursor >= 0) {
int x = distance(cursor, prevTakeIndex, len);
if (x == removedDistance) {
if (cursor == putIndex)
this.cursor = cursor = NONE;
}
else if (x > removedDistance) {
// assert cursor != prevTakeIndex;
this.cursor = cursor = dec(cursor);
}
}
int lastRet = this.lastRet;
if (lastRet >= 0) {
int x = distance(lastRet, prevTakeIndex, len);
if (x == removedDistance)
this.lastRet = lastRet = REMOVED;
else if (x > removedDistance)
this.lastRet = lastRet = dec(lastRet);
}
int nextIndex = this.nextIndex;
if (nextIndex >= 0) {
int x = distance(nextIndex, prevTakeIndex, len);
if (x == removedDistance)
this.nextIndex = nextIndex = REMOVED;
else if (x > removedDistance)
this.nextIndex = nextIndex = dec(nextIndex);
}
else if (cursor < 0 && nextIndex < 0 && lastRet < 0) {
this.prevTakeIndex = DETACHED;
return true;
}
return false;
}

/**
* Called whenever takeIndex wraps around to zero.
*
* @return true if this iterator should be unlinked from itrs
*/
boolean takeIndexWrapped() {
// assert lock.getHoldCount() == 1;
if (isDetached())
return true;
if (itrs.cycles - prevCycles > 1) {
// All the elements that existed at the time of the last
// operation are gone, so abandon further iteration.
shutdown();
return true;
}
return false;
}

// /** Uncomment for debugging. */
// public String toString() {
// return ("cursor=" + cursor + " " +
// "nextIndex=" + nextIndex + " " +
// "lastRet=" + lastRet + " " +
// "nextItem=" + nextItem + " " +
// "lastItem=" + lastItem + " " +
// "prevCycles=" + prevCycles + " " +
// "prevTakeIndex=" + prevTakeIndex + " " +
// "size()=" + size() + " " +
// "remainingCapacity()=" + remainingCapacity());
// }
}

/**
* Returns a {@link Spliterator} over the elements in this queue.
*
* <p>The returned spliterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
* {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
*
* @implNote
* The {@code Spliterator} implements {@code trySplit} to permit limited
* parallelism.
*
* @return a {@code Spliterator} over the elements in this queue
* @since 1.8
*/
public Spliterator<E> spliterator() {
return Spliterators.spliterator
(this, Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT);
}

/**
* Deserializes this queue and then checks some invariants.
*
* @param s the input stream
* @throws ClassNotFoundException if the class of a serialized object
* could not be found
* @throws java.io.InvalidObjectException if invariants are violated
* @throws java.io.IOException if an I/O error occurs
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {

// Read in items array and various fields
s.defaultReadObject();

// Check invariants over count and index fields. Note that
// if putIndex==takeIndex, count can be either 0 or items.length.
if (items.length == 0 ||
takeIndex < 0 || takeIndex >= items.length ||
putIndex < 0 || putIndex >= items.length ||
count < 0 || count > items.length ||
Math.floorMod(putIndex - takeIndex, items.length) !=
Math.floorMod(count, items.length)) {
throw new java.io.InvalidObjectException("invariants violated");
}
}
}

4.3 LinkedBlockingQueue

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = -6903933977591709194L;

/*
* A variant of the "two lock queue" algorithm. The putLock gates
* entry to put (and offer), and has an associated condition for
* waiting puts. Similarly for the takeLock. The "count" field
* that they both rely on is maintained as an atomic to avoid
* needing to get both locks in most cases. Also, to minimize need
* for puts to get takeLock and vice-versa, cascading notifies are
* used. When a put notices that it has enabled at least one take,
* it signals taker. That taker in turn signals others if more
* items have been entered since the signal. And symmetrically for
* takes signalling puts. Operations such as remove(Object) and
* iterators acquire both locks.
*
* Visibility between writers and readers is provided as follows:
*
* Whenever an element is enqueued, the putLock is acquired and
* count updated. A subsequent reader guarantees visibility to the
* enqueued Node by either acquiring the putLock (via fullyLock)
* or by acquiring the takeLock, and then reading n = count.get();
* this gives visibility to the first n items.
*
* To implement weakly consistent iterators, it appears we need to
* keep all Nodes GC-reachable from a predecessor dequeued Node.
* That would cause two problems:
* - allow a rogue Iterator to cause unbounded memory retention
* - cause cross-generational linking of old Nodes to new Nodes if
* a Node was tenured while live, which generational GCs have a
* hard time dealing with, causing repeated major collections.
* However, only non-deleted Nodes need to be reachable from
* dequeued Nodes, and reachability does not necessarily have to
* be of the kind understood by the GC. We use the trick of
* linking a Node that has just been dequeued to itself. Such a
* self-link implicitly means to advance to head.next.
*/

/**
* Linked list node class
*/
static class Node<E> {
E item;

/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;

Node(E x) { item = x; }
}

/** The capacity bound, or Integer.MAX_VALUE if none */
private final int capacity;

/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();

/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;

/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;

/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();

/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();

/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();

/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();

/**
* Signals a waiting take. Called only from put/offer (which do not
* otherwise ordinarily lock takeLock.)
*/
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}

/**
* Signals a waiting put. Called only from take/poll.
*/
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}

/**
* Links node at end of queue.
*
* @param node the node
*/
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}

/**
* Removes a node from head of queue.
*
* @return the node
*/
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}

/**
* Locks to prevent both puts and takes.
*/
void fullyLock() {
putLock.lock();
takeLock.lock();
}

/**
* Unlocks to allow both puts and takes.
*/
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}

// /**
// * Tells whether both locks are held by current thread.
// */
// boolean isFullyLocked() {
// return (putLock.isHeldByCurrentThread() &&
// takeLock.isHeldByCurrentThread());
// }

/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}.
*/
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}

/**
* Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
*
* @param capacity the capacity of this queue
* @throws IllegalArgumentException if {@code capacity} is not greater
* than zero
*/
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}

/**
* Creates a {@code LinkedBlockingQueue} with a capacity of
* {@link Integer#MAX_VALUE}, initially containing the elements of the
* given collection,
* added in traversal order of the collection's iterator.
*
* @param c the collection of elements to initially contain
* @throws NullPointerException if the specified collection or any
* of its elements are null
*/
public LinkedBlockingQueue(Collection<? extends E> c) {
this(Integer.MAX_VALUE);
final ReentrantLock putLock = this.putLock;
putLock.lock(); // Never contended, but necessary for visibility
try {
int n = 0;
for (E e : c) {
if (e == null)
throw new NullPointerException();
if (n == capacity)
throw new IllegalStateException("Queue full");
enqueue(new Node<E>(e));
++n;
}
count.set(n);
} finally {
putLock.unlock();
}
}

// this doc comment is overridden to remove the reference to collections
// greater in size than Integer.MAX_VALUE
/**
* Returns the number of elements in this queue.
*
* @return the number of elements in this queue
*/
public int size() {
return count.get();
}

// this doc comment is a modified copy of the inherited doc comment,
// without the reference to unlimited queues.
/**
* Returns the number of additional elements that this queue can ideally
* (in the absence of memory or resource constraints) accept without
* blocking. This is always equal to the initial capacity of this queue
* less the current {@code size} of this queue.
*
* <p>Note that you <em>cannot</em> always tell if an attempt to insert
* an element will succeed by inspecting {@code remainingCapacity}
* because it may be the case that another thread is about to
* insert or remove an element.
*/
public int remainingCapacity() {
return capacity - count.get();
}

/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary for space to become available.
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
while (count.get() == capacity) {
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}

/**
* Inserts the specified element at the tail of this queue, waiting if
* necessary up to the specified wait time for space to become available.
*
* @return {@code true} if successful, or {@code false} if
* the specified waiting time elapses before space is available
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException {

if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
enqueue(new Node<E>(e));
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return true;
}

/**
* Inserts the specified element at the tail of this queue if it is
* possible to do so immediately without exceeding the queue's capacity,
* returning {@code true} upon success and {@code false} if this queue
* is full.
* When using a capacity-restricted queue, this method is generally
* preferable to method {@link BlockingQueue#add add}, which can fail to
* insert an element only by throwing an exception.
*
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}

public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}

public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
Node<E> first = head.next;
if (first == null)
return null;
else
return first.item;
} finally {
takeLock.unlock();
}
}

/**
* Unlinks interior Node p with predecessor trail.
*/
void unlink(Node<E> p, Node<E> trail) {
// assert isFullyLocked();
// p.next is not changed, to allow iterators that are
// traversing p to maintain their weak-consistency guarantee.
p.item = null;
trail.next = p.next;
if (last == p)
last = trail;
if (count.getAndDecrement() == capacity)
notFull.signal();
}

/**
* Removes a single instance of the specified element from this queue,
* if it is present. More formally, removes an element {@code e} such
* that {@code o.equals(e)}, if this queue contains one or more such
* elements.
* Returns {@code true} if this queue contained the specified element
* (or equivalently, if this queue changed as a result of the call).
*
* @param o element to be removed from this queue, if present
* @return {@code true} if this queue changed as a result of the call
*/
public boolean remove(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (o.equals(p.item)) {
unlink(p, trail);
return true;
}
}
return false;
} finally {
fullyUnlock();
}
}

/**
* Returns {@code true} if this queue contains the specified element.
* More formally, returns {@code true} if and only if this queue contains
* at least one element {@code e} such that {@code o.equals(e)}.
*
* @param o object to be checked for containment in this queue
* @return {@code true} if this queue contains the specified element
*/
public boolean contains(Object o) {
if (o == null) return false;
fullyLock();
try {
for (Node<E> p = head.next; p != null; p = p.next)
if (o.equals(p.item))
return true;
return false;
} finally {
fullyUnlock();
}
}

/**
* Returns an array containing all of the elements in this queue, in
* proper sequence.
*
* <p>The returned array will be "safe" in that no references to it are
* maintained by this queue. (In other words, this method must allocate
* a new array). The caller is thus free to modify the returned array.
*
* <p>This method acts as bridge between array-based and collection-based
* APIs.
*
* @return an array containing all of the elements in this queue
*/
public Object[] toArray() {
fullyLock();
try {
int size = count.get();
Object[] a = new Object[size];
int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = p.item;
return a;
} finally {
fullyUnlock();
}
}

/**
* Returns an array containing all of the elements in this queue, in
* proper sequence; the runtime type of the returned array is that of
* the specified array. If the queue fits in the specified array, it
* is returned therein. Otherwise, a new array is allocated with the
* runtime type of the specified array and the size of this queue.
*
* <p>If this queue fits in the specified array with room to spare
* (i.e., the array has more elements than this queue), the element in
* the array immediately following the end of the queue is set to
* {@code null}.
*
* <p>Like the {@link #toArray()} method, this method acts as bridge between
* array-based and collection-based APIs. Further, this method allows
* precise control over the runtime type of the output array, and may,
* under certain circumstances, be used to save allocation costs.
*
* <p>Suppose {@code x} is a queue known to contain only strings.
* The following code can be used to dump the queue into a newly
* allocated array of {@code String}:
*
* <pre> {@code String[] y = x.toArray(new String[0]);}</pre>
*
* Note that {@code toArray(new Object[0])} is identical in function to
* {@code toArray()}.
*
* @param a the array into which the elements of the queue are to
* be stored, if it is big enough; otherwise, a new array of the
* same runtime type is allocated for this purpose
* @return an array containing all of the elements in this queue
* @throws ArrayStoreException if the runtime type of the specified array
* is not a supertype of the runtime type of every element in
* this queue
* @throws NullPointerException if the specified array is null
*/
@SuppressWarnings("unchecked")
public <T> T[] toArray(T[] a) {
fullyLock();
try {
int size = count.get();
if (a.length < size)
a = (T[])java.lang.reflect.Array.newInstance
(a.getClass().getComponentType(), size);

int k = 0;
for (Node<E> p = head.next; p != null; p = p.next)
a[k++] = (T)p.item;
if (a.length > k)
a[k] = null;
return a;
} finally {
fullyUnlock();
}
}

public String toString() {
fullyLock();
try {
Node<E> p = head.next;
if (p == null)
return "[]";

StringBuilder sb = new StringBuilder();
sb.append('[');
for (;;) {
E e = p.item;
sb.append(e == this ? "(this Collection)" : e);
p = p.next;
if (p == null)
return sb.append(']').toString();
sb.append(',').append(' ');
}
} finally {
fullyUnlock();
}
}

/**
* Atomically removes all of the elements from this queue.
* The queue will be empty after this call returns.
*/
public void clear() {
fullyLock();
try {
for (Node<E> p, h = head; (p = h.next) != null; h = p) {
h.next = h;
p.item = null;
}
head = last;
// assert head.item == null && head.next == null;
if (count.getAndSet(0) == capacity)
notFull.signal();
} finally {
fullyUnlock();
}
}

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c) {
return drainTo(c, Integer.MAX_VALUE);
}

/**
* @throws UnsupportedOperationException {@inheritDoc}
* @throws ClassCastException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public int drainTo(Collection<? super E> c, int maxElements) {
if (c == null)
throw new NullPointerException();
if (c == this)
throw new IllegalArgumentException();
if (maxElements <= 0)
return 0;
boolean signalNotFull = false;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
int n = Math.min(maxElements, count.get());
// count.get provides visibility to first n Nodes
Node<E> h = head;
int i = 0;
try {
while (i < n) {
Node<E> p = h.next;
c.add(p.item);
p.item = null;
h.next = h;
h = p;
++i;
}
return n;
} finally {
// Restore invariants even if c.add() threw
if (i > 0) {
// assert h.item == null;
head = h;
signalNotFull = (count.getAndAdd(-i) == capacity);
}
}
} finally {
takeLock.unlock();
if (signalNotFull)
signalNotFull();
}
}

/**
* Returns an iterator over the elements in this queue in proper sequence.
* The elements will be returned in order from first (head) to last (tail).
*
* <p>The returned iterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* @return an iterator over the elements in this queue in proper sequence
*/
public Iterator<E> iterator() {
return new Itr();
}

private class Itr implements Iterator<E> {
/*
* Basic weakly-consistent iterator. At all times hold the next
* item to hand out so that if hasNext() reports true, we will
* still have it to return even if lost race with a take etc.
*/

private Node<E> current;
private Node<E> lastRet;
private E currentElement;

Itr() {
fullyLock();
try {
current = head.next;
if (current != null)
currentElement = current.item;
} finally {
fullyUnlock();
}
}

public boolean hasNext() {
return current != null;
}

/**
* Returns the next live successor of p, or null if no such.
*
* Unlike other traversal methods, iterators need to handle both:
* - dequeued nodes (p.next == p)
* - (possibly multiple) interior removed nodes (p.item == null)
*/
private Node<E> nextNode(Node<E> p) {
for (;;) {
Node<E> s = p.next;
if (s == p)
return head.next;
if (s == null || s.item != null)
return s;
p = s;
}
}

public E next() {
fullyLock();
try {
if (current == null)
throw new NoSuchElementException();
E x = currentElement;
lastRet = current;
current = nextNode(current);
currentElement = (current == null) ? null : current.item;
return x;
} finally {
fullyUnlock();
}
}

public void remove() {
if (lastRet == null)
throw new IllegalStateException();
fullyLock();
try {
Node<E> node = lastRet;
lastRet = null;
for (Node<E> trail = head, p = trail.next;
p != null;
trail = p, p = p.next) {
if (p == node) {
unlink(p, trail);
break;
}
}
} finally {
fullyUnlock();
}
}
}

/** A customized variant of Spliterators.IteratorSpliterator */
static final class LBQSpliterator<E> implements Spliterator<E> {
static final int MAX_BATCH = 1 << 25; // max batch array size;
final LinkedBlockingQueue<E> queue;
Node<E> current; // current node; null until initialized
int batch; // batch size for splits
boolean exhausted; // true when no more nodes
long est; // size estimate
LBQSpliterator(LinkedBlockingQueue<E> queue) {
this.queue = queue;
this.est = queue.size();
}

public long estimateSize() { return est; }

public Spliterator<E> trySplit() {
Node<E> h;
final LinkedBlockingQueue<E> q = this.queue;
int b = batch;
int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
if (!exhausted &&
((h = current) != null || (h = q.head.next) != null) &&
h.next != null) {
Object[] a = new Object[n];
int i = 0;
Node<E> p = current;
q.fullyLock();
try {
if (p != null || (p = q.head.next) != null) {
do {
if ((a[i] = p.item) != null)
++i;
} while ((p = p.next) != null && i < n);
}
} finally {
q.fullyUnlock();
}
if ((current = p) == null) {
est = 0L;
exhausted = true;
}
else if ((est -= i) < 0L)
est = 0L;
if (i > 0) {
batch = i;
return Spliterators.spliterator
(a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT);
}
}
return null;
}

public void forEachRemaining(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
exhausted = true;
Node<E> p = current;
do {
E e = null;
q.fullyLock();
try {
if (p == null)
p = q.head.next;
while (p != null) {
e = p.item;
p = p.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (e != null)
action.accept(e);
} while (p != null);
}
}

public boolean tryAdvance(Consumer<? super E> action) {
if (action == null) throw new NullPointerException();
final LinkedBlockingQueue<E> q = this.queue;
if (!exhausted) {
E e = null;
q.fullyLock();
try {
if (current == null)
current = q.head.next;
while (current != null) {
e = current.item;
current = current.next;
if (e != null)
break;
}
} finally {
q.fullyUnlock();
}
if (current == null)
exhausted = true;
if (e != null) {
action.accept(e);
return true;
}
}
return false;
}

public int characteristics() {
return Spliterator.ORDERED | Spliterator.NONNULL |
Spliterator.CONCURRENT;
}
}

/**
* Returns a {@link Spliterator} over the elements in this queue.
*
* <p>The returned spliterator is
* <a href="package-summary.html#Weakly"><i>weakly consistent</i></a>.
*
* <p>The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
* {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
*
* @implNote
* The {@code Spliterator} implements {@code trySplit} to permit limited
* parallelism.
*
* @return a {@code Spliterator} over the elements in this queue
* @since 1.8
*/
public Spliterator<E> spliterator() {
return new LBQSpliterator<E>(this);
}

/**
* Saves this queue to a stream (that is, serializes it).
*
* @param s the stream
* @throws java.io.IOException if an I/O error occurs
* @serialData The capacity is emitted (int), followed by all of
* its elements (each an {@code Object}) in the proper order,
* followed by a null
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {

fullyLock();
try {
// Write out any hidden stuff, plus capacity
s.defaultWriteObject();

// Write out all elements in the proper order.
for (Node<E> p = head.next; p != null; p = p.next)
s.writeObject(p.item);

// Use trailing null as sentinel
s.writeObject(null);
} finally {
fullyUnlock();
}
}

/**
* Reconstitutes this queue from a stream (that is, deserializes it).
* @param s the stream
* @throws ClassNotFoundException if the class of a serialized object
* could not be found
* @throws java.io.IOException if an I/O error occurs
*/
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
// Read in capacity, and any hidden stuff
s.defaultReadObject();

count.set(0);
last = head = new Node<E>(null);

// Read in all elements and place in queue
for (;;) {
@SuppressWarnings("unchecked")
E item = (E)s.readObject();
if (item == null)
break;
add(item);
}
}
}


参考博客和文章书籍等:

《Java并发编程的艺术》

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容