ConcurrentSkipListMap

ConcurrentSkipListMap

第一节 简介

ConcurrentSkipListMap可以使我们以无锁的方式来实现线程安全,如果我们需要对数据创建一个不可变的快照,而其它线程正在将数据插入映射中,ConcurrentSkipListMap 会是一个理想的选择。

ConcurrentSkipListMap 按其键的自然顺序(或自定义键顺序)存储映射。

1.1 使用场景

当需要对键进行排序,以及需要一个可导航的映射 NavigableMap 时。

1.2 ConcurrentHashMap和ConcurrentSkipListMap的区别

1
2
3
4
5
6
7
8
ConcurrentHashMap
Sorted
❌ Navigable
✅ Concurrent
ConcurrentSkipListMap
Sorted
✅ Navigable
✅ Concurrent

ConcurrentHashMap 不能保证其操作的运行时间(散列结构因为重新散列的情况),允许对某些负载因子进行调整(大致是同时修改它的线程数)。

ConcurrentSkipListMap 保证了各种操作的平均 O(log n) 性能,其本身是并发的,不支持为了并发性而进行的调优,而 ConcurrentHashMap 则可以进行并发调优。

ConcurrentSkipListMap 还有许多 ConcurrentHashMap 没有的操作:ceilingEntry/KeyfloorEntry/Key 等。

因为要维护一个排序顺序,相比 ConcurrentHashMap ,它的 get/put/contains 操作要慢,但它支持 SortedMapNavigableMapConcurrentNavigableMap 等接口。如果使用 ConcurrentHashMap 得到一个顺序映射,则必须计算排序顺序(费用很高)。

通常情况下,如果需要快速添加单个键/值对和快速查找单个键,最好使用 HashMap 。如果需要更快的顺序遍历,并且能够负担额外的插入成本,可以使用 SkipListMap


第二节 案例

利用 ConcurrentSkipListMap 的高性能实现一个非阻塞算法,假设我们有一个连续且来自多个线程的事件流。我们需要能够从过去60秒开始记录事件,也可以记录超过60秒的事件。

1
2
3
4
5
6
public class Event {
private ZonedDateTime eventTime;
private String content;

// standard constructors/getters
}

我们希望使用eventTime字段对事件进行排序。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class EventWindowSort {
//为了使用ConcurrentSkipListMap实现这一点,我们需要在创建比较器的实例时向其构造函数传递比较器:
ConcurrentSkipListMap<ZonedDateTime, String> events
= new ConcurrentSkipListMap<>(
Comparator.comparingLong(v -> v.toInstant().toEpochMilli()));
//我们将使用时间戳比较所有到达的事件。我们正在使用comparingLong()方法并传递提取函数,该函数可以从ZonedDateTime获取长时间戳。

//当我们的事件到达时,我们只需要使用put()方法将它们添加到映射中。注意,此方法不需要任何显式同步:
public void acceptEvent(Event event) {
events.put(event.getEventTime(), event.getContent());
}

//要获取在过去一分钟内到达的所有事件,我们可以使用tailMap()方法并传递获取元素的时间:它将返回过去一分钟的所有事件。这将是一个不可变的快照,最重要的是,其他编写线程可以向ConcurrentSkipListMap添加新事件,而无需执行显式锁定。
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsFromLastMinute() {
return events.tailMap(ZonedDateTime.now().minusMinutes(1));
}

//现在,我们可以使用headMap()方法获取一分钟后到达的所有事件:这将返回超过一分钟的所有事件的不可变快照。
public ConcurrentNavigableMap<ZonedDateTime, String> getEventsOlderThatOneMinute() {
return events.headMap(ZonedDateTime.now().minusMinutes(1));
}
}

ConcurrentSkipListMap 将使用在构造函数中传递给它的比较器来处理这些事件的排序。ConcurrentSkipListMap 最显著的优点是可以以无锁方式生成其数据的不可变快照的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@Test
public void givenThreadsProducingEvents_whenGetForEventsFromLastMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {
//一旦我们使用ConcurrentSkipListMap实现了排序逻辑,我们现在可以通过创建两个writer线程来测试它,每个线程将发送100个事件:
ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

//每个线程都在调用acceptEvent()方法,发送随机时间从现在开始到“现在减100秒”的事件。
Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(
new Event(ZonedDateTime.now().minusSeconds(index), UUID.randomUUID().toString()))
);

for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}

//同时,我们可以调用getEventsFromLastMinute()方法,该方法将返回窗口一分钟内事件的快照:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsFromLastMinute();

//eventsFromLastMinute中的事件数在每次测试运行中都会有所不同,具体取决于生产者线程将事件发送到EventWindowsPort的速度。
//我们可以断言,返回的快照中没有一个事件的时间超过一分钟:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();
assertEquals(eventsOlderThanOneMinute, 0);

//快照中一分钟窗口内的事件大于零:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();
assertTrue(eventYoungerThanOneMinute > 0);
}

@Test
public void givenThreadsProducingEvents_whenGetForEventsOlderThanOneMinute_thenReturnThoseEventsInTheLockFreeWay() throws InterruptedException {

ExecutorService executorService = Executors.newFixedThreadPool(3);
EventWindowSort eventWindowSort = new EventWindowSort();
int numberOfThreads = 2;

Runnable producer = () -> IntStream
.rangeClosed(0, 100)
.forEach(index -> eventWindowSort.acceptEvent(new Event(ZonedDateTime
.now()
.minusSeconds(index), UUID
.randomUUID()
.toString())));

for (int i = 0; i < numberOfThreads; i++) {
executorService.execute(producer);
}

Thread.sleep(500);

//我们的getEventsFromLastMinute()使用下面的tailMap()。
//现在让我们测试getEventsOlderThatOneMinute(),它使用来自ConcurrentSkipListMap的headMap()方法:
ConcurrentNavigableMap<ZonedDateTime, String> eventsFromLastMinute
= eventWindowSort.getEventsOlderThatOneMinute();

//这一次我们得到一个超过一分钟的事件快照。我们可以断言,此类事件的数量超过零:
long eventsOlderThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isBefore(ZonedDateTime.now().minusMinutes(1)))
.count();

assertTrue(eventsOlderThanOneMinute > 0);

//接下来,没有一个事件是在最后一分钟内发生的:
long eventYoungerThanOneMinute = eventsFromLastMinute
.entrySet()
.stream()
.filter(e -> e.getKey().isAfter(ZonedDateTime.now().minusMinutes(1)))
.count();

assertEquals(eventYoungerThanOneMinute, 0);

executorService.awaitTermination(1, TimeUnit.SECONDS);
executorService.shutdown();
}

第三节 跳跃列表

3.1 简述

跳跃列表是一种数据结构。它允许快速查询一个有序连续元素的数据链表。跳跃列表的平均查找和插入时间复杂度都是 O(log n) ,优于普通队列的 O(n)

其快速查询是通过维护一个多层次的链表,且每一层链表中的元素是前一层链表元素的子集(见下边的示意图)。一开始时,算法在最稀疏的层次进行搜索,直至需要查找的元素在该层两个相邻的元素中间。这时,算法将跳转到下一个层次,重复刚才的搜索,直到找到需要查找的元素为止。跳过的元素的方法可以是随机性选择确定性选择,其中前者更为常见。

如下为一张跳跃列表的示意图。每个带有箭头的框表示一个指针, 而每行是一个稀疏子序列的链表;底部的编号框(黄色)表示有序的数据序列。查找从顶部最稀疏的子序列向下进行, 直至需要查找的元素在该层两个相邻的元素中间。

跳跃列表的示意图

跳跃列表是按层建造的。底层是一个普通的有序链表。每个更高层都充当下面列表的“快速通道”,这里在第i层中的元素按某个固定的概率p(通常为1/2或1/4)出现在第 i+1 层中。每个元素平均出现在 1/1-p 个列表中,而最高层的元素(通常是在跳跃列表前端的一个特殊的头元素)在 log(1/P)n 个列表中出现。

在查找目标元素时,从顶层列表、头元素起步。算法沿着每层链表搜索,直至找到一个大于或等于目标的元素,或者到达当前层列表末尾。如果该元素等于目标元素,则表明该元素已被找到;如果该元素大于目标元素或已到达链表末尾,则退回到当前层的上一个元素,然后转入下一层进行搜索。每层链表中预期的查找步数最多为1/p,而层数为 log(1/P)n ,所以查找的总体步数为 -(log(p)n)/p ,由于p是常数,查找操作总体的时间复杂度为O(log n)。而通过选择不同p值,就可以在查找代价和存储代价之间获取平衡。

跳跃列表不像平衡树等数据结构那样提供对最坏情况的性能保证:由于用来建造跳跃列表采用随机选取元素进入更高层的方法,在小概率情况下会生成一个不平衡的跳跃列表(最坏情况例如最底层仅有一个元素进入了更高层,此时跳跃列表的查找与普通列表一致)。但是在实际中它通常工作良好,随机化平衡方案也比平衡二叉查找树等数据结构中使用的确定性平衡方案容易实现。跳跃列表在并行计算中也很有用:插入可以在跳跃列表不同的部分并行地进行,而不用对数据结构进行全局的重新平衡

发明者威廉·普对跳跃列表的评价是:“跳跃列表是在很多应用中有可能替代平衡树而作为实现方法的一种数据结构。跳跃列表的算法有同平衡树一样的渐进的预期时间边界,并且更简单、更快速和使用更少的空间。”

3.2 实现细节

因为跳跃列表中的元素可以在多个列表中,所以每个元素可以有多于一个指针。

跳跃列表的插入和删除的实现与普通的链表操作类似,但高层元素必须在进行多个链表中进行插入或删除。

往跳跃列表中插入一个元素

跳跃列表的最坏时间性能具有一定随机性,但是可以通过时间复杂度为O(n)的遍历操作(例如在打印列表全部内容时)以无随机的算法重整列表的结构,从而使跳跃列表的实际查找时间复杂度尽量符合理论平均值O(log n)。

除了使用无随机算法之外,我们可以以下面的准随机方式地生成每一层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
make all nodes level 1
j ← 1
while the number of nodes at level j > 1 do
for each i'th node at level j do
if i is odd
if i is not the last node at level j
randomly choose whether to promote it to level j+1
else
do not promote
end if
else if i is even and node i-1 was not promoted
promote it to level j+1
end if
repeat
j ← j + 1
repeat

类似无随机版本,准随机重整仅在需要执行一个O(n)操作(访问每个节点)的时候伴随进行。


参考:

🔗 Guide to the ConcurrentSkipListMap
🔗 Skip list
🔗 When should I use ConcurrentSkipListMap?
🔗 Java ConcurrentNavigableMap and ConcurrentSkipListMap Tutorial with all Details (java.util.concurrent.*)