ConcurrentHashMap源码解读-1.8版

ConcurrentHashMap源码解读

第一节 结构

1.1 基本结构

继承抽象类AbstractMap,实现ConcurrentMap和Serializable接口。关键的成员属性都由 volatile 修饰,通过结合 CAS 来保证线程安全,synchronized 锁单个Node链表来实现类似分段锁的功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {

/**
* 散列表即桶数组,首次put时延迟初始化,大小总是2的幂,由迭代器直接访问。
*/
transient volatile Node<K,V>[] table;

/**
* 下个要用的表,仅在resize时不为空,也就是用于扩容,扩容后更新为null
*/
private transient volatile Node<K,V>[] nextTable;

/**
* 基础计数器,主要在没有线程竞争时使用,但也在表初始化竞争中当作备用方案,通过CAS更新
*/
private transient volatile long baseCount;

/**
* 表的初始化和resize控制器数,也用来作扩容阈值。
* 如果为负数,表示散列表正在初始化或扩容(-1:表示初始化,-n:表示负的活跃的resize线程数+1)。
* 当table为空时将保留创建时要使用的初始表大小,默认为0,会大概设置为0.75s左右,通过sc = n - (n >>> 2)
*/
private transient volatile int sizeCtl;

/**
* 扩容时用来转移的索引
*/
private transient volatile int transferIndex;

/**
* 创建或resize CounterCells时要用的自旋锁(通过CAS锁定)
*/
private transient volatile int cellsBusy;

/**
* 计数单元的数组,非空时大小是2的幂
*/
private transient volatile CounterCell[] counterCells;

// views
private transient KeySetView<K,V> keySet;
private transient ValuesView<K,V> values;
private transient EntrySetView<K,V> entrySet;
}

1.2 Node单向链表节点

通过Node链表来应对哈希冲突,在结构上和1.8版本的HashMap的Node类似,但val和next是volatile修饰的。

1
2
3
4
5
6
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
}

第二节 主要方法

2.1 get

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash

//高位异或到低位,更多哈希冲突的场景用红黑树来优化,所以采用这种即方便又能解决问题的方案。
static final int spread(int h) {
return (h ^ (h >>> 16)) & HASH_BITS;
}

//获取指定下标在哈希表中的节点
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}

public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
//把键的hashcode再散列得到h,合并高位低位特征,使散列更均匀
int h = spread(key.hashCode());
//散列表不为空,获取节点e,通过(tab.length - 1) & h与运算定位下标,判断对应下标是否已有元素
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
//e不为空,且hash值等于h,再判断key是否相等,是则表示命中,直接返回对应的值
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
//e的hash值不等于h,且为负数,表示此时结构为红黑树
else if (eh < 0)
//尝试遍历树查询key并返回
return (p = e.find(h, key)) != null ? p.val : null;

//e未命中,若还有后续节点,则遍历链表查询key并返回
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
//未能找到
return null;
}

2.2 put

put() 只是调用了 putVal() ,若链表为空时,会尝试CAS新增链表节点;若哈希表正在扩容,当前线程会尝试帮助扩容;链表非空,且未在扩容,则 synchronized 锁住此Node头节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
/*
* Encodings for Node hash fields. 特殊的哈希值
*/
static final int MOVED = -1; // hash for forwarding nodes 转移节点的hash
static final int TREEBIN = -2; // hash for roots of trees 树根的hash
static final int RESERVED = -3; // hash for transient reservations

public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
//首先不允许新增空键或空值
if (key == null || value == null) throw new NullPointerException();
//把键的hashcode再散列得到h,合并高位低位特征,使散列更均匀
int hash = spread(key.hashCode());
//初始化binCount,用来记录修改的长度
int binCount = 0;

//死循环遍历Node数组,直到操作成功
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//判断若是首次执行putVal(),则调用initTable进行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//已有哈希表,并且hash对应下标节点还为null,此桶还未被使用,CAS新增节点
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//通过Unsafe的compareAndSwapObject尝试新增链表节点,完成任务跳出循环
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//已有哈希表,首节点也已存在,但f处于MOVED状态表示此时正在扩容,则当前线程尝试帮助扩容
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f);//帮助扩容
//已有哈希表,链表也已初始化,且节点不在移动状态,则加锁进行操作
else {
//oldVal用来记录旧值
V oldVal = null;
//代码块加锁同步f节点,根据链表或红黑树分别进行操作
//链表则覆盖或尾插法新增节点
//红黑树则调用API,返回对应节点(可能是新建节点),并覆盖
synchronized (f) {
//若f此时未被修改,则继续操作,否则跳过
if (tabAt(tab, i) == f) {

//1.首节点hash>=0,表示正常态,即链表节点
if (fh >= 0) {
//binCount记录为1,至少已有一个
binCount = 1;
//遍历此链表,直到完成操作显式终止循环,计算链表长度
for (Node<K,V> e = f;; ++binCount) {
K ek;
//命中节点,覆盖value,完成操作后跳出循环
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
//记录oldVal,此时表示map中此键已对应有值
oldVal = e.val;
//当onlyIfAbsent为false时才进行覆盖
if (!onlyIfAbsent)
e.val = value;
//跳出循环
break;
}
//当前节点还不是,e移到下个节点next
Node<K,V> pred = e;
//已遍历完链表,未找到则新增节点,尾插法,完成跳出循环
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//2.非正常态,而是TreeBin节点,则向红黑树插入节点
else if (f instanceof TreeBin) {
Node<K,V> p;
//binCount更新为2
binCount = 2;
//插入树节点,完成操作
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//同步块结束
//binCount不为0表示put已对数据产生了影响,不管是否有覆盖
if (binCount != 0) {
//若达到链表阈值,就转此节点链表为红黑树
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);//转换红黑树
//oldVal不为空,表示是一次更新操作,没有对元素个数产生影响,直接返回
if (oldVal != null)
return oldVal;
//否则元素个数已改变,跳出死循环,继续后续操作
break;
}
//binCount还为0,put操作未成功,继续死循环
}
}
//更新计数值,并继续判断是否需要扩容
addCount(1L, binCount);
return null;
}


/**
* The default initial table capacity. Must be a power of 2
* (i.e., at least 1) and at most MAXIMUM_CAPACITY.
*/
private static final int DEFAULT_CAPACITY = 16;

/**
* Initializes table, using the size recorded in sizeCtl.
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
//当哈希表为空或长度为0时循环操作,否则直接返回
while ((tab = table) == null || tab.length == 0) {
//若sizeCtl<0,表示散列表正在初始化或扩容,正在被其他线程处理
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin 自旋等待
//否则CAS更新sizeCtl为-1
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
//确认此时还未初始化
if ((tab = table) == null || tab.length == 0) {
//初始化时sizeCtl可能会自定义,若未自定义则取默认值16
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
//新建节点数组,初始化table
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
//sizeCtl取和无符号右移2位后的差,即每隔n个4,就要减去n,也就是合理的取阈值0.75
sc = n - (n >>> 2);
}
} finally {
//更新sizeCtl
sizeCtl = sc;
}
break;
}
}
return tab;
}

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}


/**
* A node inserted at head of bins during transfer operations.
*/
static final class ForwardingNode<K,V> extends Node<K,V> {
final Node<K,V>[] nextTable;

......
}

2.3 size

CounterCell 是一个计数单元,无并发时只要用 baseCount 就足以,但一旦并发时 CAS 修改 baseCount 失败,就要启用 counterCells ,计算size时会将所有计数单元数组中元素累加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//Java 8提供了mappingCount返回long,因为int可能会限制size最大值
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}

final long sumCount() {
//分别获取计数单元和baseCount
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
//若计数单元数组counterCells不为空,即已初始化,就遍历其中元素,与baseCount相加得到元素和
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}

/**
* A padded cell for distributing counts. Adapted from LongAdder and Striped64.
* @sun.misc.Contended会影响计数单元性能,用于防止伪共享,即本应独立的缓存变量却共享一个缓存行,从而影响各自的性能
*/
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}

2.4 addCount

addCount 方法用来增加哈希表计数值,在 put() 导致元素数目变更后需要调用此方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
/**
* 增加哈希表计数值
* 若表需要扩容且未在扩容,则开始transfer,若已经在扩容,则在可以时帮助transfer
* 重新检查占用,确认会不会已要进行另一次resize,因为resize远远滞后于add
* @param x the count to add
* @param check if <0, don't check resize, if <= 1 only check if uncontended
*/
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
//若counterCells计数单元不为空,或CAS对baseCount原子操作 +x 时失败,表示进入并发状态
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
//标识没有竞争
boolean uncontended = true;
//如果counterCells为空,或其中元素为0,表示尚未出现并发,与前一个判断结果冲突
//如果本地线程随机取余对应counterCells下标为空,对应实现:UNSAFE.getInt(Thread.currentThread(), PROBE);
//如果CAS对取到的元素进行原子操作 +x 失败了,表示出现并发竞争
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
//总之就是未成功执行计数单元数组counterCells更新计数值
//执行fullAddCount,其会一直执行直到成功,并直接返回
fullAddCount(x, uncontended);
return;
}
//check对应put即binCount,取值0、1、2
//若check小于等于1,表示此时还是链表,直接返回
if (check <= 1)
return;
//若check大于1,已裂变为红黑树,更新一下s
s = sumCount();
}
//如果需要检查,即检查是否需要扩容,putVal调用时所给check必然>=0
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
//当map的size达到sizeCtl扩容阈值,且哈希表不为空,且哈希表还未到最大值,则进行扩容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
//通过哈希表长度获得一个resize标识,得到一个低16位首位为1的大数。
int rs = resizeStamp(n);
//若此时已在扩容
if (sc < 0) {
//开启扩容后会把sizeCtl替换为(rs << 16) + 2
//1.若sizeCtl右移16位,其高16位不等于rs,表示sizeCtl被改变了(2这些低位值会被右移掉)
//2.此处rs + 1应该是BUG,应该改为(rs << 16) + 1,表示所有线程都完成了扩容工作
// 默认第一个线程设置sizeCtl = (rs << 16) + 2,之后的线程会+1
// 所以没有线程时基础值就为(rs << 16) + 1
// 此处读源码时隐隐感觉有BUG,去sof搜了一下,果然有人已经发过类似的题问
// 看描述已经被JDK BUG审核通过了,省去大麻烦,十分感谢
//3.此处rs + MAX_RESIZERS应该是BUG,改为(rs << 16)+MAX_RESIZERS,表示帮助线程数已达到最大值
//4.若nextTable为空,表示扩容已结束
//5.若transferIndex <= 0,表示所有区间都已分配或完成了,不再需要线程协助
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
//以上条件任一满足,都表示扩容要结束了,即可结束跳出循环
break;//结束循环
//否则尝试CAS原子操作sizeCtl++,表示帮助线程+1,并执行扩容transfer
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//此时不在扩容,则尝试开启扩容流程
//首先尝试CAS原子操作将sizeCtl更新为resize标识左移16位并+2
//因为resize得到了一个低16位首尾为1的整型,这样就变成一个负数
//所以开始扩容后sizeCtl的初始值为(rs << 16) + 2,此时一个线程在进行扩容
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);//更新sizeCtl为负数后开始扩容
//更新size,即使已经在扩容了,也可能马上迎来下次扩容
s = sumCount();
}
}
}

//获取一个用来调整大小为n的哈希表的标记位,得到低16位,符号位为1
static final int resizeStamp(int n) {
//n自左连续0的位数x 和 后续二进制数进行或运算
// x | 1000 0000 0000 0000
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}

public static int numberOfLeadingZeros(int i) {
// HD, Figure 5-6
if (i == 0)
return 32;
int n = 1;
//高16位为0,n为17,i左移16位
if (i >>> 16 == 0) { n += 16; i <<= 16; }
//高8位为0,n为9,i左移8位
if (i >>> 24 == 0) { n += 8; i <<= 8; }
//高4位为0,n为5,i左移4位
if (i >>> 28 == 0) { n += 4; i <<= 4; }
//高2位为0,n为3,i左移2位
if (i >>> 30 == 0) { n += 2; i <<= 2; }
//此时n是左移位数+1
//此时i是左移(n-1)后的值
//将i无符号右移31位,并从n中去掉
n -= i >>> 31;
//最后得到从左到右连续0的个数
return n;
}

addCount将哈希表元素数+1,对baseCount和counterCells进行了修改,当counterCells被初始化就优先使用。并检查当前是否需要扩容,若需要扩容就开启扩容,若正在扩容就帮助进行扩容。

2.5 helpTransfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
//当扩容正在进行时,帮忙进行transfer
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
Node<K,V>[] nextTab; int sc;
//若节点f为转发节点,且新表不为空,则尝试帮助扩容(转移节点相当于路标,告知其他线程此桶在扩容转移)
if (tab != null && (f instanceof ForwardingNode) &&
(nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
//通过表长获得一个resize标识,低16位首位为1,以便左移后得到一个负数
int rs = resizeStamp(tab.length);
//当nextTable和table还未被修改,且sizeCtl<0表示正在扩容,进行循环
while (nextTab == nextTable && table == tab &&
(sc = sizeCtl) < 0) {
//开启扩容后会把sizeCtl替换为(rs << 16) + 2
//此部分与addCount一样:五种特征表示扩容已完成
//1.若sizeCtl右移16位,其高16位不等于rs,表示sizeCtl被改变了(2这些低位值会被右移掉)
//2.此处rs + 1应该是BUG,应该改为(rs << 16) + 1,表示所有线程都完成了扩容工作
// 默认第一个线程设置sizeCtl = (rs << 16) + 2,之后的线程会+1
// 所以没有线程时基础值就为(rs << 16) + 1
//3.此处rs + MAX_RESIZERS应该是BUG,改为(rs << 16)+MAX_RESIZERS,表示帮助线程数已达到最大值
//4.若nextTable为空,表示扩容已结束
//5.若transferIndex <= 0,表示所有区间都已分配或完成了,不再需要线程协助
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || transferIndex <= 0)
break;//这些情况不需要再协助了,直接跳出循环
//否则尝试CAS原子操作使sizeCtl++,表示增加了一个帮助线程
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
//进行转移
transfer(tab, nextTab);
//并跳出循环
break;
}
}
return nextTab;//返回新表
}
//无需协助
return table;//返回旧表
}

2.6 transfer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
//CPU核心数大于1时,除以8得到结果如果小于MTS,就直接使用MTS
//MTS是每次transfer最小绑定数为16,同来限制哈希表被分的太小,所以默认情况下一个线程至少要处理16块桶
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range 细分范围
//若所给的参数新表还未初始化,则先创建新表进行初始化
if (nextTab == null) { // initiating
try {
//扩容2倍,创建哈希表
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
//赋值给nextTab
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
//扩容失败,sizeCtl直接采用int最大值
sizeCtl = Integer.MAX_VALUE;
//结束
return;
}
//扩容初始化成功,更新nextTable
nextTable = nextTab;
//更新转移下标,指向除了旧表长度后的首位
transferIndex = n;
}

//新表的长度
int nextn = nextTab.length;
//创建一个转移节点,用于占位,其他线程看到转移节点会知道对应区域正在进行扩容转移
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
//推进标识,
boolean advance = true;
//结束标识,指扩容过程
boolean finishing = false; // to ensure sweep before committing nextTab
//最外层死循环,直到执行成功
//i是指向桶数组的指针,bound则是指向当前线程可以处理的桶区间最小下标
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
//循环推进,直到成功,此循环主要负责在分区结束或整个流程结束时处理i和bound指针的值
while (advance) {
int nextIndex, nextBound;
//i减1如果大于等于bound,说明指针还未移出划分区域,表示之前分配的区间还未结束,跳出循环,继续工作
//i在分区内倒序遍历,到达bound分区开始时再进入下个分区
//若结束标识为真,则表示表示扩容结束,跳出循环,在后面进行收尾后返回
if (--i >= bound || finishing)
advance = false;//防止在没有成功处理一个桶的情况下却进行了推进
//若转移索引小于0,表示没有区间了,跳出循环,扩容结束
else if ((nextIndex = transferIndex) <= 0) {
//更新i为-1,表示扩容结束
i = -1;
//防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
}
//CAS原子操作更新transferIndex为nextIndex和stride的差
//比如旧表64扩容,区间16,第一次分配区间为48-63,下一次为32-47
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
//值为当前线程可以处理的当前区间的最小下标 |nextBound ... nextIndex-1|
bound = nextBound;
//初次对i赋值,当前线程可以处理的当前区间的最大下标
i = nextIndex - 1;
//防止在没有成功处理一个桶的情况下却进行了推进
advance = false;
}
}
//三种情况:
//i < 0,表示已经没有区间了,扩容任务都已完成或分配出去了
//i >= n,当前下标超过旧表容量,表示i异常
//i + n >= nextn,当前下标+旧表长超过新表长度,表示i异常
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
//若已完成了扩容,则进行收尾工作
if (finishing) {
//重置新表为空
nextTable = null;
//将当前表指向新表
table = nextTab;
//更新sizeCtl为2n-n/2
sizeCtl = (n << 1) - (n >>> 1);
//直接结束
return;
}
//若还未未完成扩容任务
//CAS原子操作更新sizeCtl--,表示当前线程要结束转移了
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
//若sc != (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2,直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
//若相等,表示已无帮助线程,说明此次扩容已结束
//更新结束标识为true,表示已完成这次扩容任务
finishing = advance = true;
//重置i,还不急return,再检查一下再返回
i = n; // recheck before commit
}
}
//如果旧表i对应节点为空,不用操作了,就用转移节点来占位
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);//占位并恢复推进标识
//如果旧表i对应节点不为空,且已被标注为转移节点
else if ((fh = f.hash) == MOVED)
advance = true; // already processed 说明其他线程已经处理过了,不用管,恢复推进标识
//以上都不满足,表示下标对应节点有值,且不是占位符,则开始操作
else {
//先对节点加锁
synchronized (f) {
//拿到锁时f未被修改
if (tabAt(tab, i) == f) {
//声明低位节点和高位节点
Node<K,V> ln, hn;
//节点f的hash>=0表示这是正常链表节点
if (fh >= 0) {
//首节点hash值和旧表长进行与运算,因为表长都是只有一位是1其他位是0的二进制
//所以与运算结果只有1或0,分别对应高位和低位
//目的是将链表重新散列,放到对应的位置上,让新的取余算法能够击中他。
int runBit = fh & n;
//lastRun用来标记最后一次变化的节点
Node<K,V> lastRun = f;
//遍历当前桶,即节点对应链表
for (Node<K,V> p = f.next; p != null; p = p.next) {
//和旧表长与运算,得到此节点的“命运”
int b = p.hash & n;
//不等于runBit则更新
if (b != runBit) {
//更新runBit,用于判断赋值低位还是高位
runBit = b;
//更新lastRun,纪录变化的节点,后续节点自然和此节点相同
lastRun = p;
}
}
//如果最后更新的runBit为0,放置到低位节点
if (runBit == 0) {
ln = lastRun;
hn = null;
}
//如果最后更新的runBit为1,放置到高位节点
else {
hn = lastRun;
ln = null;
}
//再次遍历链表,生成高位和低位链表,lastRun纪录的最后一次变化节点作为结束标记
//未被遍历到的节点保持原样,避免不必要的循环操作
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
//类似于HashMap,低位链表放置于i,高位链表放置于i+n
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//旧链表设为转移节点占位,提醒其他线程
setTabAt(tab, i, fwd);
//这样就完成了这个桶/链表的转移工作,恢复推进标识,继续推进
advance = true;
}
//红黑树优化
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
//lo和hi分别对应不需要再散列和需要再散列的节点
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
//循环遍历树
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
//新建树节点
TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
//与HashMap实现类似
//原数组中计算key在数组中的位置:hash & (oldCap - 1)
//新数组为:hash & (2 * oldCap - 1) = hash & (oldCap << 1 - 1)
//所以如果hash对应在oldCap中的最高位为0,则 hash & (oldCap - 1) == hash & (2 * oldCap - 1),即在新数组中位置不变
if ((h & n) == 0) {//位置不变的节点
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {//位置改变的节点
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
//通过三目运算符用对非法树处理
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
//分别把不需要动的和需要转移的树节点放置正确
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
//旧表对应下标放置转移节点提醒
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}

2.7 fullAddCount

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
/**
* 返回当前线程的探测值,注意调用ThreadLocalRandom.current()会在返回0时强制进行初始化
*/
static final int getProbe() {
return U.getInt(Thread.currentThread(), PROBE);
}

// See LongAdder version for explanation
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
//从本地获取探测值,若为0,则强制进行初始化并再获取,更新无竞争标识符为true
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true;
}
boolean collide = false; // True if last slot nonempty 若最后一个插槽为空则为true
//死循环
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
//若counterCells不为空,且有元素,则不断尝试新增或更新对应计数单元
if ((as = counterCells) != null && (n = as.length) > 0) {
//若探测值对应计数单元为空,(length-1) & probe定位元素,则尝试创建新的计数单元
if ((a = as[(n - 1) & h]) == null) {
//自旋锁cellsBusy=0
if (cellsBusy == 0) { // Try to attach new Cell
//乐观的创建一个新计数单元,计数值即为参数x
CounterCell r = new CounterCell(x); // Optimistic create
//若自旋锁cellsBusy依旧=0
//且尝试CAS更新自旋锁为1
if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
//更新成功,锁死后复查
try { // Recheck under lock
CounterCell[] rs; int m, j;
//counterCells已创建,且已有元素,且探测值对应计数单元为空,则将创建的计数单元赋予
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;//恢复自旋锁
}
//若成功添加就跳出死循环
if (created)
break;
//添加失败,说明对应计数单元不为空
continue; // Slot is now non-empty
}
}
collide = false;
}
//计数单元不为空,且若有竞争,表示CAS操作失败,重置标识,等待再散列后重试
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//计数单元不为空,重新尝试CAS更新计数值,若成功则跳出循环
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
//计数单元不为空,重新尝试CAS失败,且counterCells已被修改或元素数超过CPU核数,更新collide继续循环尝试
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale 表示此时已达到最大值或计数单元是旧数据
//以上都不符合,且collide依然为false,更新collide继续循环尝试
else if (!collide)
collide = true;
//collide为true,可能另外的线程完成了对这个计数单元的操作,尝试获得自旋锁,并扩大计数单元数组容量
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale 只要表还未被修改就把它展开
//创建一个新计数单元数组,扩大一倍容量
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)//拷贝旧值
rs[i] = as[i];
counterCells = rs;//更新
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table 用新的数组重试
}
//更新探测值
h = ThreadLocalRandom.advanceProbe(h);
}
//计数单元数组为空或还未有元素,获取自旋锁,并占用成功,则尝试初始化计数单元数组
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean init = false;
try { // Initialize table
//初始化计数单元数组,创建首个计数单元
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2];
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//自旋锁正在被占用,重新尝试更新baseCount,若更新成功就跳出循环
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}

第三节 总结

几个关键点:

  • 在键的哈希码基础上 key.hashcode() ,进行再散列,通过把高位移位异或,合并高低位的特征,使散列更均匀。
  • 根据 (tab.length - 1) & hash 来将哈希码转换为数组下标。

3.1 怎么获取元素?

  1. 先对哈希码进行再散列(高位右移异或)。
  2. 新哈希码定位到散列表坐标 (n - 1) & h ,找到桶对应的首节点。
  3. 依次判断哈希码、键是否相等,若是则命中并返回。
  4. 不是则先判断是否已裂变红黑树(哈希码为负),若是则调用树查询。
  5. 非树,则遍历链表查询后续节点。

3.2 怎么放置元素?

  1. 首先校验不允许空值或空键,先对哈希码进行再散列(高位右移异或),并进入死循环。

  2. 哈希表还未初始化,则会先进行初始化,然后继续循环尝试put。

  3. 若表下标 (n - 1) & h 对应节点还为null,则尝试CAS新增链表节点,跳出死循环->7。

  4. 已有哈希表,桶也存在,但对应桶正在扩容,则先协助扩容,然后继续循环。

  5. 已有哈希表,桶也存在,桶可正常使用,则 synchronized 锁住此Node头节点。然后根据链表和红黑树来完成put操作。若是链表则覆盖或尾插法新增节点;若是红黑树则调用API,返回对应节点(可能是新建节点),并覆盖。

  6. put操作若未成功就继续循环;若成功则先判断是否需要把链表转为红黑树,然后判断若只是更新操作就直接返回,若导致计数变更,则跳出死循环->7。

  7. 死循环结束,调用addCount更新计数器值,并在这个过程中继续判断是否需要扩容。

3.3 怎么获取元素数目?

  1. 若计数单元数组counterCells不为空,就遍历其内元素与baseCount相加求和,得到最终计数值。
  2. 若为空则直接返回baseCount

无并发时只要用baseCount就足以,但一旦并发时CAS修改baseCount失败,就要启用counterCells,计算size时会将所有计数单元数组累加。

两种技术器都是通过 addCount() 来更新,但可能会因为并发而更新失败,会采用备用方法 fullAddCount(),死循环不断尝试。

3.4 怎么扩容?

  1. 扩容流程可以多线程协作,每个线程负责一块区域,要求至少16个桶。
  2. 扩容通过转移节点到新表来完成,新表初始化为旧表2倍大小。
  3. 因为要线程间协作,在某线程转移某下标链表后,会在旧表用一个转移节点替换表示已处理。
  4. 线程按分区一段一段倒序内循环,完成遍历后更新 sizeCtl-- ,表示参与转移线程数-1。
  5. 当线程要进行转移时,根据链表和红黑树有不同的实现,但思路是一样的,都保留哈希码不变的,转移哈希码变更的。
  6. 转移过程通过 synchronized 锁住同步资源-首节点。

3.5 怎么协助扩容?

  1. 首先判断若节点f为转发节点,且新表不为空,则尝试帮助扩容,否则直接返回旧表。
  2. 通过哈希表长度获得一个resize标识,是一个低16位首位为1的大数,从而能在左移后变成一个负数。
  3. 判断新旧表还未被修改且 sizeCtl < 0 ,表示还在扩容,然后根据五种特征判断扩容是否要结束,若满足即可跳出扩容循环。
  4. 若不满足特征,表示扩容还在继续,则尝试CAS原子操作 sizeCtl++ ,表示帮助线程+1,并执行扩容transfer,完毕跳出循环。
  5. 循环结束,直接返回新表。

根据源码可以发现协助扩容 helpTransfer 核心部分代码与更新计数器 addCount 相同。

3.6 怎么更新计数器?

更新计数器流程分两部分:并发更新扩容

  1. 首先判断计数单元数组是否已创建,或尝试CAS对baseCount计数更新是否可以成功,二者皆为并发特征。
  2. 如果不满足,表示不需要进行并发更新,然后根据check参数是否>=0判断是否需要扩容检查,如果不需要则结束。
  3. 如果满足,则表示baseCount不可靠,需要更新计数单元数组。
  4. 若当前计数单元数组为空,或选定计数单元为空,或CAS更新计数单元失败,则调用备案fullAddCount,然后结束。
  5. 若非上述情况,则表示CAS更新成功,再判断所给check,若<=1表示还是链表,则直接返回,不进行扩容检查;若check>1,表示已是红黑树,需还要再算一下此时的size。
  6. 然后进入扩容流程,首先判断只有 check >= 0 才有必要扩容。
  7. 当此时size达到sizeCtl扩容阈值等,以此作为循环条件,则进行扩容循环,否则函数执行完毕。
  8. 通过哈希表长度获得一个resize标识,是一个低16位首位为1的大数,从而能在左移后变成一个负数。
  9. 判断 sizeCtl < 0 ,表示其他线程已经在扩容,然后根据五种特征判断扩容是否要结束,若满足即可跳出扩容循环。
  10. 若不满足特征,表示扩容还在继续,则尝试CAS原子操作 sizeCtl++ ,表示帮助线程+1,并执行扩容transfer。
  11. sizeCtl >= 0 此时不在扩容,则尝试开启扩容流程,关键在于 (resize << 16) + 2 得到一个大的负数作为初始值,并CAS覆盖 sizeCtl ,后续每个线程都直接 sizeCtl++ ,成功后,执行扩容transfer。
  12. 等扩容执行完毕后,更新一下当前size,继续循环,直到扩容条件不满足。


参考博客和文章书籍等:

https://stackoverflow.com/questions/53493706/how-the-conditions-sc-rs-1-sc-rs-max-resizers-can-be-achieved-in

ConcurrentHashMap源码解读-1.8版

高并发编程系列:ConcurrentHashMap的实现原理(JDK1.7和JDK1.8)

因博客主等未标明不可引用,若部分内容涉及侵权请及时告知,我会尽快修改和删除相关内容