ConcurrentSkipListMap

Section 1 Overview

1.1 What is ConcurrentSkipListMap

ConcurrentSkipListMap is a thread-safe concurrent container provided by the java.util.concurrent package. Its advantage is that it achieves thread safety without locks (it is built on CAS operations), and because its underlying structure is a skip list, it also keeps its elements ordered by key.

A skip list is a linked list, but by searching in a "skipping" fashion it brings the cost of lookups and inserts down to O(log n). Note: because multiple threads may modify the map concurrently, size() has to traverse the whole map to count its elements, so it is an O(n) operation whose result may already be stale by the time it returns.
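Because the keys stay sorted, ConcurrentSkipListMap also implements ConcurrentNavigableMap and offers ordered-navigation methods. A quick sketch using the standard JDK API:

import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class NavigationDemo {
    public static void main(String[] args) {
        ConcurrentNavigableMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(10, "ten");
        map.put(20, "twenty");
        map.put(30, "thirty");

        System.out.println(map.firstKey());     // 10, the smallest key
        System.out.println(map.ceilingKey(15)); // 20, the smallest key >= 15
        System.out.println(map.headMap(30));    // {10=ten, 20=twenty}, a view of keys < 30
        System.out.println(map.size());         // 3, but computed by traversing the map: O(n)
    }
}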

1.2 Basic usage

A simple example:

public static void main(String[] args) {
    ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>(
            Comparator.comparingInt(v -> v));

    // add entries with put
    map.put(1, "a");
    map.put(2, "b");
    map.put(3, "c");
    map.put(-1, "a");

    // the threads
    List<Thread> ts = new ArrayList<Thread>(600);
    // start time
    long start = System.currentTimeMillis();
    // build the threads: every thread writes the same 100 keys into the shared map
    for (int j = 0; j < 100; j++) {
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) { // each thread puts keys 0..99
                    map.put(i, i + "");
                }
            }
        });
        ts.add(t);
    }
    // start all threads
    for (Thread t : ts) {
        t.start();
    }
    // wait for all threads to finish
    for (Thread t : ts) {
        try {
            t.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    // print the final contents
    map.forEach((k, v) -> System.out.println("k: " + k + " v: " + v));
    System.out.println(System.currentTimeMillis() - start);
}

The output looks like this:

k: -1 v: a
k: 0 v: 0
k: 1 v: 1
k: 2 v: 2
...
k: 98 v: 98
k: 99 v: 99
82

Now replace ConcurrentSkipListMap with TreeMap and run the same code:

k: -1 v: a
k: 0 v: 0
k: 1 v: 1
k: 2 v: 2
k: 3 v: 3
k: 4 v: 4
k: 5 v: 5
k: 6 v: 6
k: 7 v: 7
k: 12 v: 12
k: 8 v: 8
k: 9 v: 9
k: 10 v: 10
k: 11 v: 11
k: 12 v: 12
...
k: 50 v: 50
k: 51 v: 51
k: 52 v: 52
k: 53 v: 53
k: 54 v: 54
k: 55 v: 55
k: 57 v: 57
k: 56 v: 56
k: 57 v: 57
k: 58 v: 58
k: 59 v: 59
...
k: 99 v: 99
46

This gives a first impression: TreeMap is not thread-safe under concurrent writes (note the duplicated and out-of-order keys above), although as a single-threaded sorted map it performs well; ConcurrentSkipListMap is the replacement to reach for when the same sorted-map behaviour is needed from multiple threads.


Section 2 Skip Lists

2.1 Overview

A skip list (SkipList) is an excellent data structure: **for an ordered sequence of n elements, both lookup and insertion take O(log n) time on average, better than the O(n) needed to insert into a sorted array**.

The fast lookup is achieved by maintaining a multi-level linked list in which each level contains fewer elements than the level below it (see the figure). The search starts in the sparsest level and proceeds until the target would fall between two adjacent elements of that level; the algorithm then drops down one level and repeats the search, until the element is found at the bottom. Which elements are promoted to higher levels can be chosen randomly or deterministically; the random scheme is the more common one.

Figure: a skip list. Each box with an arrow represents a pointer, each row is the linked list of one sparse sub-sequence, and the numbered boxes at the bottom (yellow in the original figure) are the ordered data sequence. A search proceeds downwards from the sparsest sub-sequence at the top until the target element lies between two adjacent elements of the current level.

2.2 Algorithm

A skip list is built in levels. The bottom level is an ordinary ordered linked list. Each higher level acts as an "express lane" for the list below it: an element of level i appears in level i+1 with some fixed probability p (commonly 1/2 or 1/4). On average each element appears in 1/(1-p) lists, and the tallest element (usually a special head element at the front of the skip list) appears in log_{1/p} n lists.

A search for a target element starts at the head element of the top list. It moves along the current level until it meets an element that is greater than or equal to the target, or reaches the end of that level. If the element equals the target, it has been found; if it is greater than the target, or the end of the level was reached, the search steps back to the previous element of the current level and drops down one level, where it continues. The expected number of steps in each level is at most 1/p, and there are log_{1/p} n levels, so the expected total number of steps is (log_{1/p} n) / p; since p is a constant, the overall lookup cost is O(log n). Choosing different values of p trades lookup cost against storage cost.
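To make the search procedure concrete, here is a minimal, single-threaded Java sketch; the SkipNode class and the hand-built two-level list in main are purely illustrative and unrelated to the JDK implementation discussed in Section 3:

class SkipNode {
    final Integer key;   // null for the head node of each level
    SkipNode right;      // next node on the same level
    SkipNode down;       // same node one level below

    SkipNode(Integer key, SkipNode right, SkipNode down) {
        this.key = key;
        this.right = right;
        this.down = down;
    }
}

public class SkipSearchDemo {
    // Walks right while the next key is still smaller, then drops one level,
    // exactly as described in the paragraph above.
    static boolean contains(SkipNode head, int key) {
        SkipNode cur = head;                       // start at the head of the top level
        while (cur != null) {
            while (cur.right != null && cur.right.key < key) {
                cur = cur.right;                   // move right on the current level
            }
            if (cur.right != null && cur.right.key == key) {
                return true;                       // found on this level
            }
            cur = cur.down;                        // overshoot: drop down one level
        }
        return false;                              // fell off the bottom level: not present
    }

    public static void main(String[] args) {
        // bottom level: 1 -> 3 -> 5; the upper level promotes only 3
        SkipNode n5 = new SkipNode(5, null, null);
        SkipNode n3 = new SkipNode(3, n5, null);
        SkipNode n1 = new SkipNode(1, n3, null);
        SkipNode bottomHead = new SkipNode(null, n1, null);
        SkipNode top3 = new SkipNode(3, null, n3);
        SkipNode topHead = new SkipNode(null, top3, bottomHead);

        System.out.println(contains(topHead, 5));  // true
        System.out.println(contains(topHead, 4));  // false
    }
}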

Unlike balanced trees, a skip list gives no worst-case performance guarantee: because elements are promoted to higher levels at random, with small probability the structure can become unbalanced (in the worst case only one bottom-level element is promoted, and lookups degrade to those of an ordinary linked list). In practice it works well, and the randomized balancing scheme is easier to implement than the deterministic rebalancing used by balanced binary search trees. Skip lists are also useful for parallel computing: insertions in different parts of the list can proceed in parallel without a global rebalancing of the whole structure.

2.3 Implementation details

Because an element of a skip list can appear in several lists, each element may carry more than one pointer.

Insertion and deletion are implemented much like the corresponding ordinary linked-list operations, except that "tall" elements have to be inserted into, or removed from, more than one list.

The worst-case behaviour of a skip list is subject to randomness, but an O(n) traversal (for example while printing the whole list) can be used to rebuild the structure deterministically, so that the actual lookup cost stays close to the theoretical average of O(log n).

(1) Insertion

To insert an element into a skip list, as shown in the figure:

  1. Compare the new node with the index nodes level by level to locate its insertion position in the base list: O(log n).
  2. Splice the new node into the base list: O(1).
  3. Use a coin flip to decide whether the new node is promoted into the next index level: on "heads" promote it and flip again, on "tails" stop: O(log n).

Overall, insertion into a skip list takes O(log n) time, and the structure occupies roughly 2n nodes, i.e. its space complexity is O(n).
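A minimal sketch of the coin-flip promotion in step 3; the randomLevel helper, MAX_LEVEL and P are hypothetical names used for illustration, not taken from the JDK source:

import java.util.concurrent.ThreadLocalRandom;

public class LevelGenerator {
    static final int MAX_LEVEL = 32;   // cap so a pathological streak cannot grow unbounded
    static final double P = 0.5;       // promotion probability, the "coin"

    // Returns the number of levels (>= 1) a newly inserted node should occupy.
    static int randomLevel() {
        int level = 1;
        // keep flipping: each "heads" (probability P) lifts the node one level higher
        while (level < MAX_LEVEL && ThreadLocalRandom.current().nextDouble() < P) {
            level++;
        }
        return level;
    }

    public static void main(String[] args) {
        // with P = 0.5 roughly half the nodes get level 1, a quarter level 2, and so on
        for (int i = 0; i < 5; i++) {
            System.out.println(randomLevel());
        }
    }
}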

(2) Deletion

Find the node to be deleted via the index levels, then delete the matching node on every level. If an index level is left with only one node afterwards, that level can be removed.

  1. From the top down, find the first index entry for the node, then locate the corresponding node on each level below: O(log n).
  2. Delete the node found on each level; if a level (other than the base list) is left with only one node, remove that entire level: O(log n).

Overall, deletion from a skip list also takes O(log n) time.
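A rough single-threaded sketch of steps 1 and 2 above (it omits the removal of near-empty levels; the Node class and the hand-built list are illustrative only):

public class SkipDeleteDemo {
    static class Node {
        final Integer key;   // null for head nodes
        Node right, down;
        Node(Integer key, Node right, Node down) {
            this.key = key; this.right = right; this.down = down;
        }
    }

    // Removes key from every level, walking down from the topmost head.
    static void delete(Node head, int key) {
        for (Node level = head; level != null; level = level.down) {
            Node cur = level;
            while (cur.right != null && cur.right.key < key) {
                cur = cur.right;               // step 1: locate the node on this level
            }
            if (cur.right != null && cur.right.key == key) {
                cur.right = cur.right.right;   // step 2: unlink it from this level
            }
        }
    }

    public static void main(String[] args) {
        // bottom level 1 -> 2 -> 3, upper level promotes 2
        Node n3 = new Node(3, null, null);
        Node n2 = new Node(2, n3, null);
        Node n1 = new Node(1, n2, null);
        Node bottomHead = new Node(null, n1, null);
        Node top2 = new Node(2, null, n2);
        Node topHead = new Node(null, top2, bottomHead);

        delete(topHead, 2);                    // remove key 2 from both levels
        for (Node p = bottomHead.right; p != null; p = p.right) {
            System.out.println(p.key);         // prints 1 then 3
        }
    }
}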

2.4 Skip lists vs. binary trees

  • A skip list keeps itself balanced at very low cost, relying entirely on randomization; a binary tree has to rebalance itself after repeated insertions and deletions.
  • A skip list is not a tree: it distinguishes index nodes from base-level list nodes. An index node only has a "right" and a "down" pointer, and a list node only points to its successor, whereas a binary tree node has left and right children.

Side note: Redis maintains its Sorted Set with an optimized skip list, whereas relational databases typically use B+ trees.


Section 3 Source Code Analysis

3.1 Basic structure

ConcurrentSkipListMap contains two kinds of nodes, Node and Index: Node is an ordinary base-level list node, and Index is an index node.

public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V>
    implements ConcurrentNavigableMap<K,V>, Cloneable, Serializable {

    /**
     * The topmost head index of the skip list
     */
    private transient volatile HeadIndex<K,V> head;

    /**
     * The comparator used to order keys in this map; null means natural ordering
     */
    final Comparator<? super K> comparator;

    /** Lazily initialized key set */
    private transient KeySet<K> keySet;
    /** Lazily initialized entry set */
    private transient EntrySet<K,V> entrySet;
    /** Lazily initialized values collection */
    private transient Values<V> values;
    /** Lazily initialized descending map */
    private transient ConcurrentNavigableMap<K,V> descendingMap;

    // base-level list node
    static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K,V> next;

        Node(K key, Object value, Node<K,V> next) {
            this.key = key;
            this.value = value;
            this.next = next;
        }

        /**
         * Creates a new marker node. A marker is distinguished by having its value field
         * point to itself. Marker nodes also have null keys, a fact that is exploited in
         * a few places, but this does not distinguish markers from the base-level header
         * node (head.node), which also has a null key.
         */
        Node(Node<K,V> next) {
            this.key = null;
            this.value = this;
            this.next = next;
        }

        // compareAndSet the value field
        boolean casValue(Object cmp, Object val) {
            return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
        }

        // compareAndSet the next field
        boolean casNext(Node<K,V> cmp, Node<K,V> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        void helpDelete(Node<K,V> b, Node<K,V> f) {
            // Re-check the links and perform only one help-out stage per call,
            // which minimizes CAS interference between helping threads.
            // Only proceed if f is this node's successor and b is its predecessor.
            if (f == next && this == b.next) {
                // if f is null or f is not yet a marker, try to CAS a new marker Node in front of f
                if (f == null || f.value != f) // not already marked
                    casNext(f, new Node<K,V>(f));
                // otherwise let b's next skip to f's successor (unlinking this node and its marker)
                else
                    b.casNext(this, f.next);
            }
        }
    }

    static class Index<K,V> {
        final Node<K,V> node;       // the base-level list node this index refers to
        final Index<K,V> down;      // the index node one level below
        volatile Index<K,V> right;  // the index node to the right on the same level

        ...

        // compareAndSet the right field
        final boolean casRight(Index<K,V> cmp, Index<K,V> val) {
            return UNSAFE.compareAndSwapObject(this, rightOffset, cmp, val);
        }

        // has the node this index refers to already been deleted?
        final boolean indexesDeletedNode() {
            return node.value == null;
        }

        // Tries to CAS newSucc in as this index's new right neighbour (an insert),
        // with the old right neighbour succ becoming newSucc's right neighbour.
        final boolean link(Index<K,V> succ, Index<K,V> newSucc) {
            Node<K,V> n = node;
            newSucc.right = succ;
            return n.value != null && casRight(succ, newSucc);
        }

        // Tries to CAS the right field past succ (unlinking succ). Fails if this
        // index's node is known to be deleted, forcing the caller to re-traverse.
        final boolean unlink(Index<K,V> succ) {
            return node.value != null && casRight(succ, succ.right);
        }

        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE;
        private static final long rightOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> k = Index.class;
                rightOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("right"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }
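The deletion protocol that helpDelete() assists with works in stages: the value is first CASed to null (logical deletion), a marker node is then appended after the deleted node, and finally the predecessor's next pointer is CASed past both. The following standalone sketch illustrates that idea with AtomicReference instead of Unsafe; it is a simplification for intuition only, not the JDK's actual code:

import java.util.concurrent.atomic.AtomicReference;

public class MarkerDeleteDemo {
    // A stripped-down list node: AtomicReference stands in for the JDK's CAS on the next field.
    static final class N {
        final Integer key;            // null for marker nodes
        volatile Object value;        // null = logically deleted; value == this = marker
        final AtomicReference<N> next;
        N(Integer key, Object value, N next) {
            this.key = key;
            this.value = value;
            this.next = new AtomicReference<>(next);
        }
    }

    // Deletes node n (predecessor b) using the value-null + marker + unlink steps.
    static boolean delete(N b, N n) {
        N f = n.next.get();
        Object v = n.value;
        if (v == null) return false;           // someone else already deleted it
        n.value = null;                        // step 1: logical deletion (the JDK CASes this)
        N marker = new N(null, null, f);
        marker.value = marker;                 // a marker points to itself, like the JDK's marker Node
        if (n.next.compareAndSet(f, marker)) { // step 2: fence off n so no insert can follow it
            b.next.compareAndSet(n, f);        // step 3: physically unlink n and its marker
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        N c = new N(3, "c", null);
        N bNode = new N(2, "b", c);
        N a = new N(1, "a", bNode);
        delete(a, bNode);                      // remove key 2
        for (N p = a; p != null; p = p.next.get()) {
            if (p.value != null && p.value != p) System.out.println(p.key + "=" + p.value);
        }
        // prints 1=a and 3=c: key 2 is gone
    }
}

The marker prevents another thread from linking a new node directly after the deleted one in the window between the value CAS and the unlink, which is what makes the lock-free removal safe.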

3.2 get

A key/value pair is read with get().

public V get(Object key) {
    return doGet(key);
}

private V doGet(Object key) {
    // null keys are not allowed
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    // outer loop; paired with "break outer" to restart the whole search
    outer: for (;;) {
        // inner loop: find the predecessor b of the given key, then walk right from b
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            // reached the end of the list: the key is not present
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            // the list changed under us: restart the inner traversal
            if (n != b.next) // inconsistent read
                break;
            // n's value is null, so n has been deleted: help finish the deletion, then retry
            if ((v = n.value) == null) { // n is deleted
                n.helpDelete(b, f);
                break;
            }
            // b itself has been deleted (or n is a marker): retry
            if (b.value == null || v == n) // b is deleted
                break;
            // hit: the keys compare equal, return the value
            if ((c = cpr(cmp, key, n.key)) == 0) {
                @SuppressWarnings("unchecked") V vv = (V)v;
                return vv;
            }
            // the given key is smaller than n's key, so it cannot be further right: not present
            if (c < 0)
                break outer;
            // otherwise move one step to the right and keep looking
            b = n;
            n = f;
        }
    }
    return null;
}

3.3 put

A key/value pair is stored with put(). The first half of doPut() performs the same traversal as get(); the interesting part begins once a brand-new node has to be linked in, because the new node may then also receive index levels.

public V put(K key, V value) {
    if (value == null)
        throw new NullPointerException();
    return doPut(key, value, false);
}

private V doPut(K key, V value, boolean onlyIfAbsent) {
    Node<K,V> z; // added node
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            if (n != null) {
                Object v; int c;
                Node<K,V> f = n.next;
                if (n != b.next)               // inconsistent read
                    break;
                if ((v = n.value) == null) {   // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (b.value == null || v == n) // b is deleted
                    break;
                if ((c = cpr(cmp, key, n.key)) > 0) {
                    b = n;
                    n = f;
                    continue;
                }
                if (c == 0) {
                    if (onlyIfAbsent || n.casValue(v, value)) {
                        @SuppressWarnings("unchecked") V vv = (V)v;
                        return vv;
                    }
                    break; // restart if lost race to replace value
                }
                // else c < 0; fall through
            }

            z = new Node<K,V>(key, value, n);
            if (!b.casNext(n, z))
                break; // restart if lost race to append to b
            break outer;
        }
    }

    // a brand-new node was linked in (not just a value overwrite):
    // use a random number to decide whether it also gets index levels
    int rnd = ThreadLocalRandom.nextSecondarySeed();
    if ((rnd & 0x80000001) == 0) { // test highest and lowest bits
        int level = 1, max;
        while (((rnd >>>= 1) & 1) != 0)
            ++level;
        Index<K,V> idx = null;
        HeadIndex<K,V> h = head;
        if (level <= (max = h.level)) {
            for (int i = 1; i <= level; ++i)
                idx = new Index<K,V>(z, idx, null);
        }
        else { // try to grow by one level
            level = max + 1; // hold in array and later pick the one to use
            @SuppressWarnings("unchecked")Index<K,V>[] idxs =
                (Index<K,V>[])new Index<?,?>[level+1];
            for (int i = 1; i <= level; ++i)
                idxs[i] = idx = new Index<K,V>(z, idx, null);
            for (;;) {
                h = head;
                int oldLevel = h.level;
                if (level <= oldLevel) // lost race to add level
                    break;
                HeadIndex<K,V> newh = h;
                Node<K,V> oldbase = h.node;
                for (int j = oldLevel+1; j <= level; ++j)
                    newh = new HeadIndex<K,V>(oldbase, newh, idxs[j], j);
                if (casHead(h, newh)) {
                    h = newh;
                    idx = idxs[level = oldLevel];
                    break;
                }
            }
        }
        // find insertion points and splice in
        splice: for (int insertionLevel = level;;) {
            int j = h.level;
            for (Index<K,V> q = h, r = q.right, t = idx;;) {
                if (q == null || t == null)
                    break splice;
                if (r != null) {
                    Node<K,V> n = r.node;
                    // compare before deletion check avoids needing recheck
                    int c = cpr(cmp, key, n.key);
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;
                        r = q.right;
                        continue;
                    }
                    if (c > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }

                if (j == insertionLevel) {
                    if (!q.link(r, t))
                        break; // restart
                    if (t.node.value == null) {
                        findNode(key);
                        break splice;
                    }
                    if (--insertionLevel == 0)
                        break splice;
                }

                if (--j >= insertionLevel && j < level)
                    t = t.down;
                q = q.down;
                r = q.right;
            }
        }
    }
    return null;
}

3.4 remove

public V remove(Object key) {
    return doRemove(key, null);
}

final V doRemove(Object key, Object value) {
    if (key == null)
        throw new NullPointerException();
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)               // inconsistent read
                break;
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n) // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) < 0)
                break outer;
            if (c > 0) {
                b = n;
                n = f;
                continue;
            }
            if (value != null && !value.equals(v))
                break outer;
            if (!n.casValue(v, null))
                break;
            if (!n.appendMarker(f) || !b.casNext(n, f))
                findNode(key);             // retry via findNode
            else {
                findPredecessor(key, cmp); // clean index
                if (head.right == null)
                    tryReduceLevel();
            }
            @SuppressWarnings("unchecked") V vv = (V)v;
            return vv;
        }
    }
    return null;
}

3.5 findPredecessor

findPredecessor() returns, for the given key and comparator, a predecessor node: a base-level node whose key is strictly smaller than the given key, from which the base-level search can start.

/**
 * Returns a base-level node whose key is strictly less than the given key,
 * or the base-level header if there is no such node.
 * It keeps moving right and down through the index levels until it reaches the
 * base level, unlinking any deleted index nodes found along the way.
 */
private Node<K,V> findPredecessor(Object key, Comparator<? super K> cmp) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    // outer loop
    for (;;) {
        // inner loop, starting from the head index
        for (Index<K,V> q = head, r = q.right, d;;) {
            // there is an index node to the right
            if (r != null) {
                Node<K,V> n = r.node;
                K k = n.key;
                // the node that the right index refers to has been deleted: unlink it and retry
                if (n.value == null) {
                    if (!q.unlink(r)) // unlink the right index node
                        break;        // restart
                    r = q.right;      // reread the right neighbour (the old one's right)
                    continue;
                }
                // the given key is greater than the right node's key: move right
                if (cpr(cmp, key, k) > 0) {
                    q = r;
                    r = r.right;
                    continue;
                }
            }
            // the current index has no level below it: return its base-level Node
            if ((d = q.down) == null)
                return q.node;
            // otherwise move q down one level and refresh r
            q = d;
            r = d.right;
        }
    }
}

3.6 findNode

/**
* Returns node holding key or null if no such, clearing out any
* deleted nodes seen along the way. Repeatedly traverses at
* base-level looking for key starting at predecessor returned
* from findPredecessor, processing base-level deletions as
* encountered. Some callers rely on this side-effect of clearing
* deleted nodes.
*
* Restarts occur, at traversal step centered on node n, if:
*
* (1) After reading n's next field, n is no longer assumed
* predecessor b's current successor, which means that
* we don't have a consistent 3-node snapshot and so cannot
* unlink any subsequent deleted nodes encountered.
*
* (2) n's value field is null, indicating n is deleted, in
* which case we help out an ongoing structural deletion
* before retrying. Even though there are cases where such
* unlinking doesn't require restart, they aren't sorted out
* here because doing so would not usually outweigh cost of
* restarting.
*
* (3) n is a marker or n's predecessor's value field is null,
* indicating (among other possibilities) that
* findPredecessor returned a deleted node. We can't unlink
* the node because we don't know its predecessor, so rely
* on another call to findPredecessor to notice and return
* some earlier predecessor, which it will do. This check is
* only strictly needed at beginning of loop, (and the
* b.value check isn't strictly needed at all) but is done
* each iteration to help avoid contention with other
* threads by callers that will fail to be able to change
* links, and so will retry anyway.
*
* The traversal loops in doPut, doRemove, and findNear all
* include the same three kinds of checks. And specialized
* versions appear in findFirst, and findLast and their
* variants. They can't easily share code because each uses the
* reads of fields held in locals occurring in the orders they
* were performed.
*
* @param key the key
* @return node holding key, or null if no such
*/
private Node<K,V> findNode(Object key) {
    if (key == null)
        throw new NullPointerException(); // don't postpone errors
    Comparator<? super K> cmp = comparator;
    outer: for (;;) {
        for (Node<K,V> b = findPredecessor(key, cmp), n = b.next;;) {
            Object v; int c;
            if (n == null)
                break outer;
            Node<K,V> f = n.next;
            if (n != b.next)               // inconsistent read
                break;
            if ((v = n.value) == null) {   // n is deleted
                n.helpDelete(b, f);
                break;
            }
            if (b.value == null || v == n) // b is deleted
                break;
            if ((c = cpr(cmp, key, n.key)) == 0)
                return n;
            if (c < 0)
                break outer;
            b = n;
            n = f;
        }
    }
    return null;
}


References:

🔗 Skip list

🔗 Guide to the ConcurrentSkipListMap