Redis底层数据结构

Redis底层数据结构

image

第一节 动态字符串-SDS

1.1 概述

Redis 是由 C 语言作为底层编程语言实现的,而 C 语言中的字符串是一个以空字符结尾的字符数组,这种结构对于 Redis 而言过于简单了,主要缺点如下:

  1. **C 字符串不记录自身长度信息,为了获取字符串长度必须遍历整个字符串,时间复杂度为 O(n)**。
  2. 由于 C 字符串不记录自身长度,稍有不小心就会造成缓冲区溢出
  3. 对于 Redis 这种缓存类型数据库,对于缓存的 Value 是有可能经常的更改的。但是 C 字符串每次的增长或是缩小都需要一次内存的重分配操作
  4. Redis 数据库中缓存的内容不是特定的,有可能会是图片、音频等等文件的二进制数据,但是 C 字符串中的字符必须符合某种编码,且字符串中不能包含空格,这些限制也导致了 Redis 不能使用 C 字符串来作为自身字符串的实现。

于是 Redis 实现了 SDS 这种简单动态字符串结构,它其实和 Java 中 ArrayList 的实现是很类似的。

Redis 源代码中 sds.h 文件下,有五种 sdshdr ,它们分别是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
struct __attribute__ ((__packed__)) sdshdr5 {
unsigned char flags; /* 3 lsb of type, and 5 msb of string length */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr8 {
uint8_t len; /* used */
uint8_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr16 {
uint16_t len; /* used */
uint16_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr32 {
uint32_t len; /* used */
uint32_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};
struct __attribute__ ((__packed__)) sdshdr64 {
uint64_t len; /* used */
uint64_t alloc; /* excluding the header and null terminator */
unsigned char flags; /* 3 lsb of type, 5 unused bits */
char buf[];
};

其中,sdshdr5 的注释表明,sdshdr5 is never used。sdshdr5 这种数据结构一般用于存储长度小于 32 个字符的字符串,但现在也已经不再使用这种结构了,再小长度的字符串也建议使用 sdshdr8 进行存储,因为 sdshdr5 少了两个关键字段,因此不具备动态扩容操作,一旦预分配的内存空间使用完,就需要重新分配内存并完成数据的复制迁移,在实际的生产环境中对于性能的影响还是很大的,所以进行了一个抛弃,但其实有些比较小的键依然会采用这种结构存储。

关于 sdshdr5 我们不再多说,我们看其他四种结构的各个字段,len 字段表示当前字符串总长度,也即当前字符串已使用内存大小,alloc 表示为当前字符串分配的总内存大小(不包括len以及flags字段本身分配的内存),因为每一个结构在预分配的时候都会多分配一段内存空间,主要是为了方便以后的扩容。flags 的低三位表示当前 sds 的类型,高五位无用。低三位取值如下:

1
2
3
4
5
#define SDS_TYPE_5  0
#define SDS_TYPE_8 1
#define SDS_TYPE_16 2
#define SDS_TYPE_32 3
#define SDS_TYPE_64 4

实际上,redis 对 sdshdr 内存分配是禁用内存对齐的,也就是说每个字段分配的内存地址是紧紧排列在一起的, 所以 redis 中字符串参数的传递直接使用 char* 指针。

可能有人会疑问,仅仅通过一个 char 指针如何确定当前字符串的类型,其实由于 sdshdr 内存分配禁止内存对齐,所以 sds[-1] 其实指向的就是 flags 字段的内存地址,通过 flags 字段又可以得到当前 sds 属于哪种类型,进而可以读取头部字段确定 sds 的相关属性。

接下来我们讲讲 sdshdr 相对于传统的 C 语言字符串,性能的提升在哪,以及具有哪些便捷的点。

首先,对于传统的 C 字符串,我想要获取字符串的长度,至少需要 O(n) 遍历一遍数组才行,而我们 sds 只需要 O(1) 的取 len 字段的值即可。

其次,也是非常重要的一个设计,如果我们初始分配了一个字符串对象,那么如果我要在这个字符串后面追加内容的话,限制于数组的长度一经初始化是不能修改的,我们至少需要分配一个足够大的数组,然后将原先的字符串进行一个拷贝。

sdshdr 每次为一个 sds 分配内存的时候都会额外分配一部分暂不使用的内存空间,一般额外的内存会等同于当前字符串占用的内存大小,如果超过 1MB,那么额外空间的内存大小就是 1MB。每当执行 sdscat 这种方法的时候,程序会用 alloc-len 比较下剩下的空余内存是否足够分配追加的内容,如果不够自然触发内存重分配,而如果剩余未使用内存空间足够放下,那么将直接进行分配,无需内存重分配。

通过这种预分配策略, SDS 将连续增长 N 次字符串所需的内存重分配次数从必定 N 次降低为最多 N 次

最后,对于常规的 C 语言字符串,它通过判断当前字符是否是空字符来决定字符串的结尾,所以就要求你的字符串中不能包含甚至一个空字符,否则空字符后面的字符都不能作为有效字符被读取。而对于某些具有特殊格式要求的,需要使用空字符进行分隔作用的,那么传统的 C 字符串就无法存储了,而我们的 sds 不是通过空字符判断字符串结尾,而是通过 len 字段的值判断字符串的结尾,所以说,sds 还具备二进制安全这个特性,即它可以安全的存储具备特殊格式要求的二进制数据。

关于 sds 我们就简单说到这,它是一种改良版的 C 字符串,兼容 C 语言中既有的函数 API,也通过一些手段提升了某些操作的性能,值得大家借鉴。


第二节 链表

链表这种数据结构相信大家也不陌生,有很多类型,比如单向链表,双向链表,循环链表等,链表相对于数组来说,一是不需要连续的内存块地址,二是删除和插入的时间复杂度是 O(1) 级别的,非常的高效,但比不上数组的随机访问查询方式

一样的那句话,没有最好的数据结构,只有恰到好处的数据结构,比如我们后面要介绍的更高层次的数据结构,字典,它的底层其实就依赖的链表规避哈希冲突,具体的我们后面再说。

redis 中借助 C 语言实现了一个双向链表结构:

1
2
3
4
5
typedef struct listNode {
struct listNode *prev;
struct listNode *next;
void *value;
} listNode;

pre 指针指向前一个节点,next 指针指向后一个节点,value 指向当前节点对应的数据对象。盗一张图描述整个串联起来的链表结构:

image

虽然通过链表的第一个头节点就可以遍历整个链表,但在 redis 向上封装了一层结构,专门用于表示一个链表结构

1
2
3
4
5
6
7
8
typedef struct list {
listNode *head;
listNode *tail;
void *(*dup)(void *ptr);
void (*free)(void *ptr);
int (*match)(void *ptr, void *key);
unsigned long len;
} list;
  • head 指向链表的头节点
  • tail 指向链表的尾节点
  • dup 函数用于链表转移复制时对节点 value 拷贝的一个实现,一般来说用等于号足以,但某些特殊情况下可能会用到节点转移函数,默认可以给这个函数赋值 NULL 即表示使用等于号进行节点转移。
  • free 函数用于释放一个节点所占用的内存空间,默认赋值 NULL 的话,即使用 redis 自带的 zfree 函数进行内存空间释放,我们也可以来看一下这个 zfree 函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
void zfree(void *ptr) {
#ifndef HAVE_MALLOC_SIZE
void *realptr;
size_t oldsize;
#endif

if (ptr == NULL) return;
#ifdef HAVE_MALLOC_SIZE
update_zmalloc_stat_free(zmalloc_size(ptr));
free(ptr);
#else
realptr = (char*)ptr-PREFIX_SIZE;
oldsize = *((size_t*)realptr);
update_zmalloc_stat_free(oldsize+PREFIX_SIZE);
free(realptr);
#endif
}

这里会涉及到一个内存对齐的概念,就比如一个 64 位的操作系统,一次内存 IO 会固定取出 8 个字节的内存数据出来,如果某个变量横跨了两个八字节段,那么 CPU 需要进行两次的 IO 才能完整取出该变量的数据,引入内存对齐,是为了保证任意变量的内存分配不会出现上述的横跨情况,具体的操作手法就是填充无用的内存位,当然这必然会造成内存碎片,不过这也是一种以空间换时间的策略,你也可以禁用它。

函数的上半部分是做一些判断,如果确定了该指针指向的数据结构占用的总内存,则直接调用 free 函数进行内存的释放,否则需要进行一个计算。redis 中的 zmalloc 在每一次内存数据分配的时候都会追加一个 PREFIX_SIZE 的头部数据块,它的值等于当前系统的最大寻址空间,比如 64 CPU的话,PREFIX_SIZE 就会占用到 8 个字节,并且这 8 个字节内部存储的是当前数据实际占用内存大小。

所以这里的话,ptr 指针向低位移动就是指向头部 PREFIX_SIZE 字段首地址,然后取出里面保存的值,也就是当前数据结构实际占用的内存大小,最后加上它自身传入 update_zmalloc_stat_free 函数中修改 used_memory 内存记录指针的值,并在最后调用 free 函数释放内存,包括头部的部分。

其实我们扯远了,继续看数据结构,这里如果还不是很明白的话,没关系,后面我们还会继续讲的。

match 函数依然是一个多态的实现,只给出了定义,具体实现由你来决定,你也可以选择不实现,它用于比较两个链表节点的 value 值是否相等。返回 0 表示不相等,返回 1 表示相等。

最后一个 len 字段描述的是,整个链表中所包含的节点数量。以上就是 redis 中链表的一个基本的定义,加上 list,最终链表结构在 redis 中呈现的抽象图大概是这样的,依然盗的图:

image

综上,我们介绍了 redis 中链表的一个基本实现情况,总结一下,它是一个双端链表,也就是查找某个节点的前后节点的时间复杂度都在 O(1),也是一个无环并具有首尾节点指针的链表,初次之外,还具有三个多态函数,用于节点间的复制、比较以及内存释放,需要使用者自行实现。


第三节 字典

字典相对于数组,链表来说,是一种较高层次的数据结构,像我们的汉语字典一样,可以通过拼音或偏旁唯一确定一个汉字,在程序里我们管每一个映射关系叫做一个键值对很多个键值对放在一起就构成了我们的字典结构

有很多高级的字典结构实现,例如我们 Java 中的 HashMap 底层实现,根据键的 Hash 值均匀的将键值对分散到数组中,并在遇到哈希冲突时,冲突的键值对通过单向链表串联,并在链表结构超过八个节点裂变成红黑树

那么 redis 中是怎么实现的呢?我们一起来看一看。

一、字典结构定义

Redis 中的字典相关结构都定义在 dict.h 文件中,dict 表示一个字典结构:

1
2
3
4
5
6
7
typedef struct dict {
dictType *type;
void *privdata;
dictht ht[2];
long rehashidx;
unsigned long iterators;
} dict;
  • type 字段指向 dictType 结构,这个结构中定义几个多态方法。
  • privdata 指针存储了字典结构一些附属额外信息。
  • ht 是一个 dictht 结构的数组,dictht 就是一个哈希表结构。
  • rehashidx 字段用于 rehash 过程中记录正在转移的键。
  • iterators 字段记录了当前字典正在进行中的迭代器。

dictType 具体如下:

1
2
3
4
5
6
7
8
typedef struct dictType {
uint64_t (*hashFunction)(const void *key);
void *(*keyDup)(void *privdata, const void *key);
void *(*valDup)(void *privdata, const void *obj);
int (*keyCompare)(void *privdata, const void *key1, const void *key2);
void (*keyDestructor)(void *privdata, void *key);
void (*valDestructor)(void *privdata, void *obj);
} dictType;
  • hashFunction 哈希函数指针,当我们通过 set 命令往字典中存储数据时,会先用键值对的键作为参数传入哈希函数,得到一个较为散列均匀的值,然后才会实际的进行数据的存储。这里就会用到哈希函数,如果你需要为你的字典结构提供不同的散列方式,在初始化字典的时候为 dictType 中哈希函数进行一个实现就好。
  • keyDup 是一个键的复制函数。
  • valDup是一个键值对的值的复制函数。
  • keyCompare 是一个键的比较大小的函数。
  • keyDestructor 销毁一个键。
  • valDestructor 销毁一个键值对的值。都是一个多态的呈现,具体实现需要使用者自行提供。

接着看 dict 结构,具体的再看。

dictht 就是我们的哈希表结构,

1
2
3
4
5
6
typedef struct dictht {
dictEntry **table;
unsigned long size;
unsigned long sizemask;
unsigned long used;
} dictht;

table 是一个指向 dictEntry 的二维数组,每个 dictEntry 其实就表述一个键值对,为什么是一个二维的结构呢?

其实正常情况下,我们的字典是这样保存数据的:

image

每个 dictEntry 内部会保存一个 key/value 的键值对,然后我们通过 table 指针可以遍历所有的键值对,但是如果某个键值对的键进行哈希之后并计算得到应该存储的位置被别的节点捷足先登了,也就是我们常说的哈希冲突了,怎么办?

redis 中的做法,甚至于大部分字典结构实现都是选择将冲突的节点串联成链表,于是字典结构就变成这样了。

image

同一条链表上的节点键的哈希值必定是相同的,也正是因为相同才会被串在一起,从逻辑上看,字典结构如上图所展示的那样,但抽象到我们的代码层,就是一个二维数组的结构,第一维放的就是节点指针的指针,第二维指向的就是指向我们键值对结构的指针,每一个 dictEntry 结构都会有一个 next 指针,在遇到哈希冲突的时候可以串联所有冲突节点。

除此之外,dictht 中的 size 属性用于描述整个哈希字典表最大可寻址大小,也就是二维数组中第一维度的最大长度,sizemask 属性始终等于 size-1 表述的是一种大小掩码的概念,用于确定节点最初在数组中的位置,used 记录了整张哈希表中已经存储的键值对节点数量。

其中,dict 字典结构中 ht 是一个只有两个元素的数组,正常情况下我们使用 ht[0] 字典表,ht[1] 用在我们渐进 rehash 过程中转移 ht[0] 中所有节点中

最后,我们再来看这个 dictEntry 键值对结构:

1
2
3
4
5
6
7
8
9
10
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;

key 是一个指向任意结构的指针,代表我们的 key 可以使用我们 redis 中任意对象类型,v 是一个 union 类型,它可以是一个指针,也可以是 uint64_t 或 int64_t 类型,也可以是一个 double 类型。根据实际使用中,value 的不同值,使用不同的字段属性。

next 指针指向另一个 dictEntry 结构,用于发生哈希冲突时,链接下一个键值对节点。

以上就是 redis 中字典结构主要结构类型,从里至外封装了三层,dict 描述一个字典,其中的 dictht 描述哈希表,其中的 dictEntry 描述键值对结构。迭代器回头我们单独说说。

二、渐进式 rehash 迁移数据

redis 的 rehash 和 Java 以及其他哈希的实现稍微可能有点不同,由于 redis 是单线程的,不需要写大量的并发语句来保证数据一致性,但是单线程处理也会导致一次 rehash 过程会非常缓慢,客户端阻塞太久。那么 redis 具体是怎么做的呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;

while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;

/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
while(de) {
uint64_t h;

nextde = de->next;
/* Get the index in the new hash table */
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
d->ht[0].table[d->rehashidx] = NULL;
d->rehashidx++;
}

/* Check if we already rehashed the whole table... */
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}

/* More to rehash... */
return 1;
}

rehashidx 的值默认为 -1,表示当前字典未处于 rehash 阶段,其他场合该字段的值等于当前正在转移桶的索引。

新版本的 dictRehash 需要多传一个参数 n,这个参数用于控制单次最多转移空桶数量。什么意思呢,具体我们看一张图:

image

有这么一个字典结构,其中索引值为 2 和 3 的两个桶是空的,即里面没有放我们的键值对节点。正常情况下,一次 rehash 只会转移一个桶,但如果上一次转移了索引为 1 的那个桶,下一次来会遍历后面一个桶,如果继续为空就继续向后遍历,直到找到一个存储了我们节点的非空桶,极端情况下,如果字典表中只有最后一个桶有节点,那么一次的 rehash 就要遍历所有的桶,时间复杂度 O(n),这会导致客户端等待过长时间,所以新版本中额外传一个参数 n 用于控制最多遍历的空桶数。

相关代码段如下:

1
2
3
4
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}

方法的尾部会进行一个校验,如果当前桶转移结束后,当前字典的 rehash 过程完全结束,那么修改 ht[0] 指针引用,让他指向新的字典表 ht[1],并设置 rehashidx 为 -1,标记整个字典 rehash 结束。

以上就是 redis 中 rehash 的全过程,还是比较简单的,那为什么说它是渐进式的呢,我们看一下添加和查询键值对的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
int index;
dictEntry *entry;
dictht *ht;

if (dictIsRehashing(d)) _dictRehashStep(d);

if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;

/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}

这就是我们调用 set 命令,底层为我们添加键值对的方法,函数的最开头逻辑就是调用 dictIsRehashing 方法判断当前的字典表是否处于 rehash 状态,也即判断 rehashidx 是否不等于 -1 了。_dictRehashStep 方法实现:

1
2
3
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);
}

默认情况下,一次 rehash 过程,redis 允许最多 10 空桶的访问就要返回,不得逗留。值得注意的是,方法的后续逻辑会判断当前字典如果正在进行 rehash,那么新的键值对将不再向 ht[0] 中添加,而直接转而添加到 ht[1] 中

我们再看看查询键值对的 get 命令底层 API 调用,底层会调用 dictFind 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
dictEntry *dictFind(dict *d, const void *key)
{
dictEntry *he;
unsigned int h, idx, table;

if (d->ht[0].used + d->ht[1].used == 0) return NULL;
if (dictIsRehashing(d)) _dictRehashStep(d);
h = dictHashKey(d, key);
for (table = 0; table <= 1; table++) {
idx = h & d->ht[table].sizemask;
he = d->ht[table].table[idx];
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key))
return he;
he = he->next;
}
if (!dictIsRehashing(d)) return NULL;
}
return NULL;
}

可以看到,同样也是有 dictIsRehashing 方法的判断,如果字典处于 rehash 状态,即需要去完成一个桶的转移,然后才能返回。值得注意的是,方法的中间逻辑是嵌套在一个 for 循环中的,供两次循环,第一次从 ht[0] 中搜索我们给定 key 的键值对,如果没有找到,第二次循环将从 ht[1] 中搜索我们要查询的键值对。

之所以说 redis 的 rehash 是渐进式的,就是因为即便它处于 rehash 状态下,所有节点的插入、查询甚至于删除都是不受影响的,直至整个 rehash 结束,redis 释放原先 ht[0] 占用无用内存。

ps:redis 中的字典实现相对于 Java 中的实现要简单不少,主要还是因为 redis 是单线程调用的,不需要使用额外的并发语句控制。

三、字典迭代器

迭代器是用于迭代遍历字典中所有的节点的一个工具,有两种,一种是安全迭代器,一种是不安全迭代器。安全迭代器就是指,你在迭代的过程中,允许你对字典结构进行修改,也即允许你添加、删除、修改字典中的键值对节点。不安全迭代器即不允许对字典中任何节点进行修改。

dictIterator 结构的定义如下:

1
2
3
4
5
6
7
8
typedef struct dictIterator {
dict *d;
long index;
int table, safe;
dictEntry *entry, *nextEntry;
/* unsafe iterator fingerprint for misuse detection. */
long long fingerprint;
} dictIterator;

字段 d 指向一个即将被迭代的字典结构,index 记录了当前迭代到字典中的桶索引,table 取值为 0 或 1,表示当前迭代的是字典中哪个哈希表,safe 标记当前迭代器是安全的或是不安全的。 entry 记录的是当前迭代的节点,nextEntry 的值等于 entry 的 next 指针,用于防止当前节点接受删除操作后续节点丢失情况。fingerprint 保存了 dictFingerprint 函数根据当前字典的基本信息计算的一个指纹信息,稍有一丁点变动,指纹信息就会发生变化,用于不安全迭代器检验。

安全迭代器获取方式:

1
2
3
4
5
6
7
8
9
10
11
12
dictIterator *dictGetIterator(dict *d)
{
dictIterator *iter = zmalloc(sizeof(*iter));

iter->d = d;
iter->table = 0;
iter->index = -1;
iter->safe = 0;
iter->entry = NULL;
iter->nextEntry = NULL;
return iter;
}

不安全迭代器获取方式:

1
2
3
4
5
6
dictIterator *dictGetSafeIterator(dict *d) {
dictIterator *i = dictGetIterator(d);

i->safe = 1;
return i;
}

下面我们看看迭代器的核心方法,dictNext 用于获取字典中下一个节点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
dictEntry *dictNext(dictIterator *iter)
{
while (1) {
//如果迭代器初次工作,entry 必定为 null
if (iter->entry == NULL) {
//拿到迭代器 d 字段保存的字典
dictht *ht = &iter->d->ht[iter->table];
if (iter->index == -1 && iter->table == 0) {
if (iter->safe)
//给字典的 iterators 字段自增,禁止 rehash操作
iter->d->iterators++;
else
//计算并保存指纹信息
iter->fingerprint = dictFingerprint(iter->d);
}
//迭代器开始工作,指向 0 号桶
iter->index++;
//如果 index 大于等于 size,即最后一个桶迭代结束
if (iter->index >= (long) ht->size) {
if (dictIsRehashing(iter->d) && iter->table == 0) {
//当前字典结构正在 rehash 且 ht[0] 已经遍历结束
//继续遍历 ht[1]
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
//否则表示迭代工作确实全部结束
break;
}
}
//根据 index 取出节点
iter->entry = ht->table[iter->index];
} else {
//如果 entry 不等于 null,尝试遍历它的后续节点
iter->entry = iter->nextEntry;
}
//到这里,迭代器已经拿到下一个节点了
if (iter->entry) {
//记录 nextEntry 节点的值
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}

大部分逻辑都已经注释上了,整个方法是一个死循环,如果 entry 等于 null,要么是迭代器初次工作,要么是迭代到一个桶的最后节点处了。如果是后者,会进入 if 逻辑中,判断是否整个字典全部迭代结束,如果不是取下一个桶。

如果字典未处于 rehash 状态,自增 iterators 属性的操作会禁止后续节点操作触发 rehash,如果已经处于 rehash 过程了,也不慌,当前 ht[0] 迭代结束后,再去迭代早于迭代器工作前已经被转移到 ht[1] 的那些节点。因为如果你是安全迭代器的话,iterators 一自增之后,后续节点就不会触发 rehash 迁移节点,所以不会重复迭代数据。

迭代器迭代结束之后需要释放关闭释放迭代器,redis 中对应方法:

1
2
3
4
5
6
7
8
9
10
void dictReleaseIterator(dictIterator *iter)
{
if (!(iter->index == -1 && iter->table == 0)) {
if (iter->safe)
iter->d->iterators--;
else
assert(iter->fingerprint == dictFingerprint(iter->d));
}
zfree(iter);
}

如果是安全的迭代器,自减 iterators,不安全迭代器会重新计算指纹并与迭代器最开始工作时计算的指纹比较,并通过 assert 断言判断指纹是否一致,如果不一致则说明你在不安全的迭代器中执行了修改字典结构的方法,程序报错并退出。

以上就是 redis 字典中基础的两个安全与非安全迭代器用法及其原理,终究是不允许边 rehash 边遍历的,其实 redis 中还有一种高级遍历方式,大家叫它 scan 遍历,它允许边 rehash 边迭代,比较高级,我们后续会分析它的源码,敬请期待!


第四节 跳跃表

我们都知道单链表有一个致命的弱点,查找任一节点都至少 O(n) 的时间复杂度,它需要遍历一遍整个链表,那么有没有办法提升链表的搜索效率?

跳跃表(SkipList)这种数据结构使用空间换时间的策略,通过给链表建立多层索引来加快搜索效率,我们先介绍跳跃表的基本理论,再来看看 redis 中的实现情况。

一、跳跃表(SkipList)

image

这是一条带哨兵的双端链表,大部分场景下的链表都是这种结构,它的好处是,无论是头插法还是尾插法,插入操作都是常量级别的时间复杂度,删除也是一样。但缺点就是,如果想要查询某个节点,则需要 O(n)。

那如果我们给链表加一层索引呢?当然前提是最底层的链表是有序的,不然索引也没有意义了。

image

让 HEAD 头指针指向最高索引,我抽出来一层索引,这样即便你查找节点 2222 三次比较。

第一次:与 2019 节点比较,发现大于 2019,往后继续

第二次:与 2100 节点比较,发现依然大于,往后继续

第三次:本层索引到头了,指向低层索引的下一个节点,继续比较,找到节点

而无索引的链表需要四次,效率看起来不是很明显,但是随着链表节点数量增多,索引层级增多,效率差距会很明显。图就不自己画了,取自极客时间王争老师的一张图。

![image](data:image/svg+xml;utf8,)

你看,原本需要 62 次比较操作,通过五层索引,只需要 4 次比较,跳跃表的效率可见一瞥。

想要知道具体跳跃表与链表差距多少,我们接下来进行它们各个操作的时间复杂度分析对比。

1、插入节点操作

双端链表(以下我们简称链表)的原本插入操作是 O(1) 的时间复杂度,但是这里我们讨论的是有序链表,所以插入一个节点至少还要找到它该插入的位置,然后才能执行插入操作,所以链表的插入效率是 O(n)。

跳跃表(以下我们简称跳表)也依然是需要两个步骤才能完成插入操作,先找到该插入的位置,再进行插入操作。我们设定一个具有 N 个节点的链表,它建有 K 层索引并假设每两个节点间隔就向上分裂一层索引。

k 层两个节点,k-1 层 4 个节点,k-2 层 8 个节点 … 第一层 n 个节点,

1
2
3
4
5
6
1n
21/2 * n
31/2^2 * n
.....
.....
k:1/2^(k-1) * n

1/2^(k-1) * n 表示第 k 层节点数,1/2^(k-1) * n=2 可以得到,k 等于 logn,也就是说 ,N 个节点构建跳表将需要 logn 层索引,包括自身那层链表层。

而当我们要搜索某个节点时,需要从最高层索引开始,按照我们的构建方式,某个节点必然位于两个索引节点之间,所以每一层都最多访问三个节点。这一点你可能需要理解理解,因为每一层索引的搜索都是基于上一层索引的,从上一层索引下来,要么是大于(小于)当前的索引节点,但不会大于(小于)其往后两个位置的节点,也就是当前索引节点的上一层后一索引节点,所以它最多访问三个节点。

有了这一结论,我们向跳表中插入一个元素的时间复杂度就为:O(logn)。这个时间复杂度等于二分查找的时间复杂度,所有有时我们又称跳表是实现了二分查找的链表。

很明显,插入操作,跳表完胜链表。

2、修改删除查询

这三个节点操作其实没什么可比性,修改删除操作,链表等效于跳表。而查询,我们上面也说了,链表至少 O(n),跳表在 O(logn)。

除此之外,我们都知道红黑树在每次插入节点后会自旋来进行树的平衡,那么跳表其实也会有这么一个问题,就是不断的插入,会导致底层链表节点疯狂增长,而索引层依然那么多,极端情况所有节点都新增到最后一级索引节点的右边,进而使跳表退化成链表。

简单一句话来说,就是大量的节点插入之后,而不更新索引的话,跳表将无法一如既往的保证效率。解决办法也很简单,就是每一次节点的插入,触发索引节点的更新,我们具体来看一下更新策略。

一般跳表会使用一个随机函数,这个随机函数会在跳表新增了一个节点后,根据跳表的目前结构生成一个随机数,这个数值当然要小于最大的索引层值,假定这个值等于 m,那么跳表会生成从 1 到 m 层的索引。所以这个随机函数的选择或者说实现就显得很重要了,关于它我们这里不做讨论,大家可以看看各种跳表的实现中是如何实现这个随机函数的,典型的就是 Java 中 ConcurrentSkipListMap 内部实现的 SkipList 结构,当然还有我们马上要介绍的 redis 中的实现。

以上就是跳表这种数据结构的基本理论内容,接下来我们看 redis 中的实现情况。

二、Redis 中的跳跃表

说在前面的是,redis 自己实现了跳表,但目的是为它的有序集合等高层抽象数据结构提供服务,所以等下我们分析源代码的时候其中必然会涉及到一些看似无用的结构和代码逻辑,但那些也是非常重要的,我们也会提及有序集合相关的内容,但不会拆分细致,重点还是看跳表的实现。

跳表的数据结构定义如下:

1
2
3
4
5
typedef struct zskiplist {
struct zskiplistNode *header, *tail;
unsigned long length;
int level;
} zskiplist;

跳表中的每个节点用数据结构 zskiplistNode 表示,head 和 tail 分别指向最底层链表的头尾节点。length 表示当前跳表最底层链表有多少个节点,level 记录当前跳表最高索引层数。

zskiplistNode 结构如下:

1
2
3
4
5
6
7
8
9
typedef struct zskiplistNode {
sds ele;
double score;
struct zskiplistNode *backward;
struct zskiplistLevel {
struct zskiplistNode *forward;
unsigned int span;
} level[];
} zskiplistNode;

我这里摘取的 redis 源码是 4.0 版本的,以前版本 ele 属性是一个 RedisObject 类型,现在是一个字符串类型,也即表示跳表现在只用于存储字符串数据。

score 记录当前节点的一个分值,最底层的链表就是按照分值大小有序的串联的,并且我们查询一个节点,一般也会传入该节点的 score 值,毕竟数值类型比较起来方便。

backward 指针指向前一个节点,为什么是倒着往前,我们待会会说。

level 是比较关键的一个点,这里面是一个 level 数组,而每个元素又都是一个 zskiplistLevel 类型的结构,zskiplistLevel 类型包括一个 forward 前向指针,一个 span 跨度值,具体是什么意思,我们一点点说。

跳表理论上在最底层是一条双端链表,然后基于此建立了多层索引节点以实现的,但在实际的代码实现上,这种结构是不好表述的,所以你要打破既有的惯性思维,然后才能好理解 redis 中的实现。实际上正如我们上述介绍的 zskiplistNode 结构一样,每个节点除了存储节点自身的数据外,还通过 level 数组保存了该节点在整个跳表各个索引层的节点引用,具体结构就是这样的:

![image](data:image/svg+xml;utf8,)

而整张跳表基本就是这样的结构:

![image](data:image/svg+xml;utf8,)

每一个节点的 backward 指针指向自己前面的一个节点,而每个节点中的 level 数组记录的就是当前节点在跳表的哪些索引层出现,并通过其 forward 指针顺序串联这一层索引的各个节点,0 表示第一层,1 表示第二层,等等以此类推。span 表示的是当前节点与后面一个节点的跨度,我们等下还会在代码里说到,暂时不理解也没关系。

基本上跳表就是这样一个结构,上面那张图还是很重要的,包括我们等下介绍源码实现,也对你理解有很大帮助的。(毕竟我画了半天。。)

这里多插一句,与跳表相关结构定义在一起的还有一个有序集合结构,很多人会说 redis 中的有序集合是跳表实现的,这句话不错,但有失偏驳。

1
2
3
4
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;

准确来说,redis 中的有序集合是由我们之前介绍过的字典加上跳表实现的,字典中保存的数据和分数 score 的映射关系,每次插入数据会从字典中查询,如果已经存在了,就不再插入,有序集合中是不允许重复数据。

下面我们看看 redis 中跳表的相关代码的实现情况。

1、跳表初始化

redis 中初始化一个跳表的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
zskiplistNode *zslCreateNode(int level, double score, sds ele) {
zskiplistNode *zn =
zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));
zn->score = score;
zn->ele = ele;
return zn;
}

/* Create a new skiplist. */
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
//分配内存空间
zsl = zmalloc(sizeof(*zsl));
//默认只有一层索引
zsl->level = 1;
//0 个节点
zsl->length = 0;
//1、创建一个 node 节点,这是个哨兵节点
//2、为 level 数组分配 ZSKIPLIST_MAXLEVEL=32 内存大小
//3、也即 redis 中支持索引最大 32 层
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);
//为哨兵节点的 level 初始化
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}

zslCreate 用于初始化一个跳表,比较简单,我也给出了基本的注释,这里不再赘述了,强调一点的是,redis 中实现的跳表最高允许 32 层索引,这么做也是一种性能与内存之间的衡量,过多的索引层必然占用更多的内存空间,32 是一个比较合适值。

2、插入一个节点

插入一个节点的代码比较多,也稍微有点复杂,希望你也有耐心和我一起来分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
//update数组将用于记录新节点在每一层索引的目标插入位置
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
//rank数组记录目标节点每一层的排名
unsigned int rank[ZSKIPLIST_MAXLEVEL];
int i, level;

serverAssert(!isnan(score));
//指向哨兵节点
x = zsl->header;
//这一段就是遍历每一层索引,找到最后一个小于当前给定score值的节点
//从高层索引向底层索引遍历
for (i = zsl->level-1; i >= 0; i--) {
//rank记录的是节点的排名,正常情况下给它初始值等于上一层目标节点的排名
//如果当前正在遍历最高层索引,那么这个初始值暂时给0
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) < 0)))
{
//我们说过level结构中,span表示的是与后面一个节点的跨度
//rank[i]最终会得到我们要找的目标节点的排名,也就是它前面有多少个节点
rank[i] += x->level[i].span;
//挪动指针
x = x->level[i].forward;
}
update[i] = x;
}
//至此,update数组中已经记录好,每一层最后一个小于给定score值的节点
//我们的新节点只需要插在他们后即可

//random算法获取一个平衡跳表的level值,标志着我们的新节点将要在哪些索引出现
//具体算法这里不做分析,你也可以私下找我讨论
level = zslRandomLevel();
//如果产生值大于当前跳表最高索引
if (level > zsl->level) {
//为高出来的索引层赋初始值,update[i]指向哨兵节点
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;
}
zsl->level = level;
}
//根据score和ele创建节点
x = zslCreateNode(level,score,ele);
//每一索引层得进行新节点插入,建议对照我之前给出的跳表示意图
for (i = 0; i < level; i++) {
//断开指针,插入新节点
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;

//rank[0]等于新节点再最底层链表的排名,就是它前面有多少个节点
//update[i]->level[i].span记录的是目标节点与后一个索引节点之间的跨度,即跨越了多少个节点
//得到新插入节点与后一个索引节点之间的跨度
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
//修改目标节点的span值
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}

//如果上面产生的平衡level大于跳表最高使用索引,我们上面说会为高出部分做初始化
//这里是自增他们的span值,因为新插入了一个节点,跨度自然要增加
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}

//修改 backward 指针与 tail 指针
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;
zsl->length++;
return x;
}

整个方法我都已经给出了注释,具体的不再细说,欢迎你与我交流讨论,整体的逻辑分为三个步骤。

  1. 从最高索引层开始遍历,根据 score 找到它的前驱节点,用 update 数组进行保存
  2. 每一层得进行节点的插入,并计算更新 span 值
  3. 修改 backward 指针与 tail 指针

删除节点也是类似的,首先需要根据 score 值找到目标节点,然后断开前后节点的连接,完成节点删除。

3、特殊的查询操作

因为 redis 的跳表实现中,增设了 span 这个跨度字段,它记录了与当前节点与后一个节点之间的跨度,所以就具有以下一些查询方法。

a、zslGetRank

返回包含给定成员和分值的节点在跳跃表中的排位。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
unsigned long zslGetRank(zskiplist *zsl, double score, sds ele) {
zskiplistNode *x;
unsigned long rank = 0;
int i;

x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
sdscmp(x->level[i].forward->ele,ele) <= 0))) {
rank += x->level[i].span;
x = x->level[i].forward;
}

/* x might be equal to zsl->header, so test if obj is non-NULL */
if (x->ele && sdscmp(x->ele,ele) == 0) {
return rank;
}
}
return 0;
}

你会发现,这个方法的核心代码其实就是我们插入节点方法的一个部分,通过累计 span 得到目标节点的一个排名值。

b、zslGetElementByRank

通过给定排名查询元素。这个方法就更简单了。

c、zslIsInRange

给定一个分值范围(range), 比如 0 到 10, 如果给定的分值范围包含在跳跃表的分值范围之内, 那么返回 1 ,否则返回 0 。

d、zslFirstInRange

给定一个分值范围, 返回跳跃表中第一个符合这个范围的节点。

e、zslDeleteRangeByScore

给定一个分值范围, 删除跳跃表中所有在这个范围之内的节点。

f、zslDeleteRangeByRank

给定一个排名范围, 删除跳跃表中所有在这个范围之内的节点。

其实,后面列出来的那些根据排名,甚至一个范围查询删除节点的方法,都仰仗的是 span 这个字段,这也是为什么 insert 方法中需要通过那么复杂的计算逻辑对 span 字段进行计算的一个原因。

总结一下,跳表是为有序集合服务的,通过多层索引把链表的搜索效率提升到 O(logn)级别,但修改删除依然是 O(1),是一个较为优秀的数据结构,而 redis 中的实现把每个节点实现成类似楼房一样的结构,也即我们的索引层,非常的巧妙。


第五节 整数集合

当一个集合中只包含整数,并且元素的个数不是很多的话,redis 会用整数集合作为底层存储,它的一个优点就是可以节省很多内存,虽然字典结构的效率很高,但是它的实现结构相对复杂并且会分配较多的内存空间。

而我们的整数集合(intset)可以做到使用较少的内存空间却达到和字典一样效率的实现,但也是前提的,集合中只能包含整型数据并且数量不能太多。整数集合最多能存多少个元素在 redis 中也是有体现的。

OBJ_SET_MAX_INTSET_ENTRIES 512

也就是超过 512 个元素,或者向集合中添加了字符串或其他数据结构,redis 会将整数集合向字典结构进行转换。

一、基本的数据结构

intset 的结构定义很简单,有以下成员构成:

1
2
3
4
5
typedef struct intset {
uint32_t encoding;
uint32_t length;
int8_t contents [];
} intset;

encoding 记录当前 intset 使用编码,有三个取值:

1
2
3
#define INTSET_ENC_INT16 (sizeof(int16_t))
#define INTSET_ENC_INT32 (sizeof(int32_t))
#define INTSET_ENC_INT64 (sizeof(int64_t))

length 记录整数集合中目前存储了多少个元素,contents 记录我们实际的数据集合,虽然我们看到结构体中给数组元素的类型定死成 int8_t,但实际上这个 int8_t 定义的毫无意义,因为这里的处理方式非常规的数组操作,content 字段虽然被定义成指向一个 int8_t 类型数据的指针,但实际上 redis 无论是读取数组元素还是新增元素进去都依赖 encoding 和 length 两个字段直接操作的内存。

基本数据结构还是非常的简单的,下面我们来看看它的一些核心方法。

二、核心 API 实现

1、初始化一个 intset

1
2
3
4
5
6
intset *intsetNew(void) {
intset *is = zmalloc(sizeof(intset));
is->encoding = intrev32ifbe(INTSET_ENC_INT16);
is->length = 0;
return is;
}

可见,默认的 inset 配置是使用 INTSET_ENC_INT16 作为数据存储大小,并且不会为 content 数组初始化。常规的数组需要先预先确定数组长度,然后分配内存,继而通过 contents[x] 可以访问数组中任一元素。

但是,inset 这里是非常规式操作数组,encoding 字段定义了数组中每个元素实际类型,lenth 字段定义了数组中实际的元素个数,那么 contents[x] 是失效的,这种方式只会按照 int8_t 进行内存偏移,这种方式是拿不到正确的数据的,所以 redis 中通过 memcpy 按照 encoding 字段的值暴力直接偏移地址操作内存读取数据。

所以,这也是为什么 intset 初始化时不初始化 content 数组的原因所在,因为没有必要。而每当新增一个元素的时候都会去动态扩容原数组的长度以盛放下新插入进来的元素,扩容不会扩容很多,刚好一个新元素所占用的内存即可。具体的细节,我们接着看。

2、添加新元素

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
intset *intsetAdd(intset *is, int64_t value, uint8_t *success) {
//计算得到新插入的元素的编码
uint8_t valenc = _intsetValueEncoding(value);
uint32_t pos;
if (success) *success = 1;
//如果大于 intset 目前存储元素的编码大小
if (valenc > intrev32ifbe(is->encoding)) {
//触发 intset 升级
return intsetUpgradeAndAdd(is,value);
} else {
//二分搜索当前元素,如果元素已经存在会直接返回
//如果没找到元素,pos 的值就是该元素的位置索引
if (intsetSearch(is,value,&pos)) {
if (success) *success = 0;
return is;
}
//resize 集合,扩容一个元素的内存空间
is = intsetResize(is,intrev32ifbe(is->length)+1);
//移动 pos 后面的元素,以插入我们的新元素
if (pos < intrev32ifbe(is->length)) intsetMoveTail(is,pos,pos+1);
}
//赋值
_intsetSet(is,pos,value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

由此,我们应该知道为什么 intset 内的数据是有序且无重复的了,二分查找 O(logN),但是 intset 插入一个元素却不是 O(logN),因为有些情况会触发升级操作,或者极端情况下,会移动所有元素,时间复杂度达到 O(N)。

3、升级

我们先看示意图的变化,然后再分析源码,假设原 intset 使用 16 位的编码存储数据,先来了一个 32 位的数据,触发了我们的编码升级。

原 intset 结构如下:

![image](data:image/svg+xml;utf8,)

新 intset 结构会扩容成这样:

![image](data:image/svg+xml;utf8,)

虽然数据占用的内存已经分配好了,但是还需要做的是迁移每个元素占用的比特位。 做法是这样的,假设我们的新元素是 int_32 类型的数值 65536,那么首先我们会将这个 65536 放到[128-159]比特位区间,然后将 78 放到[96-127]比特位区间,并向前以此类推,最后我们会得到升级完成之后 intset。

下面我们看 redis 中代码的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
static intset *intsetUpgradeAndAdd(intset *is, int64_t value) {
//intset目前的编码
uint8_t curenc = intrev32ifbe(is->encoding);
//intset即将扩展到的编码
uint8_t newenc = _intsetValueEncoding(value);
int length = intrev32ifbe(is->length);
int prepend = value < 0 ? 1 : 0;

//根据新的元素内存大小重新分配 intset 内存大小
is->encoding = intrev32ifbe(newenc);
is = intsetResize(is,intrev32ifbe(is->length)+1);
//这个地方我先标记一下 @1,下面详细分析
//总体上你可以理解,就是我们上图画的那样,从原集合的最后一个元素
//开始扩大它占用的比特位
while(length--)
_intsetSet(is,length+prepend,_intsetGetEncoded(is,length,curenc));

//将新元素放进 intset 中
if (prepend)
_intsetSet(is,0,value);
else
_intsetSet(is,intrev32ifbe(is->length),value);
is->length = intrev32ifbe(intrev32ifbe(is->length)+1);
return is;
}

别的不再解释,我重点解释一下我做标记的 @1,这个循环其实是这个方法的核心点,它完成了将旧元素扩充比特位这么一个操作。

首先明确的一点是,升级操作只有两种情况会触发,一种是新插入一个较大的数值,另一种是新插入一个负很大的值,这两种情况都会导致类型不够存储,需要扩大数据位。

_intsetGetEncoded 这个方法可以根据给定了 length,也就是元素在数组中的下标取出旧数组中对应的元素,很显然,这里是从后往前倒着来的。

因为我们的 intsetResize 方法已经完成了扩容内存的操作,也就是说新元素的内存已经分配完毕,那么 _intsetSet 方法就会将 _intsetGetEncoded 取出的元素重新的向数组中赋值。循环结束时,就是所有元素重新归位的时候,最后再将新元素赋值进入数组最后的位置。

但其实细心的同学会发现,_intsetSet 方法在传下标索引的时候实际传的是 length+prepend,这其实就是我们说,如果 value 是小于零的,length+prepend 最终会导致所有的旧元素往后挪了一个偏移量,然后新的元素会被赋值的索引为零的位置。也就是说,如果新插入的数值是负数,它会被头插进数组的第一个位置。

核心的几个 API 我们都已经介绍了,其他的一些 API 你可以自行参阅源码,相信对你不难。

总结一下,整数集合(intset)使用了非常简洁的数据结构,可以更少的占用内存存储一些整数,但终究是基于数组的,也就避免不了不能存储大量数据的缺点。总体来说,插入一个元素,最好情况 O(logN),最坏的情况是 O(N),摊还时间复杂度为 O(N),查找一个元素,根据索引下标时间复杂度在 O(1)。当 intset 中的元素超过 512 个,或者向其中添加了字符串,redis 会将 intset 转换成字典。


第六节 压缩列表

上一篇我们介绍了 redis 中的整数集合这种数据结构的实现,也谈到了,引入这种数据结构的一个很大的原因就是,在某些仅有少量整数元素的集合场景,通过整数集合既可以达到字典的效率,也能使用远少于字典的内存达到同样的效果。

我们本篇介绍的压缩列表,相信你从他的名字里应该也能看出来,又是一个为了节约内存而设计的数据结构,它的数据结构相对于整数集合来说会复杂了很多,但是整数集合只能允许存储少量的整型数据,而我们的压缩列表可以允许存储少量的整型数据或字符串。

这是他们之间的一个区别,下面我们来看看这种数据结构。

一、基本的结构定义

image

  • ZIPLIST_BYTES:四个字节,记录了整个压缩列表总共占用了多少字节数
  • ZIPLIST_TAIL_OFFSET:四个字节,记录了整个压缩列表第一个节点到最后一个节点跨越了多少个字节,通故这个字段可以迅速定位到列表最后一个节点位置
  • ZIPLIST_LENGTH:两个字节,记录了整个压缩列表中总共包含几个 zlentry 节点
  • zlentry:非固定字节,记录的是单个节点,这是一个复合结构,我们等下再说
  • 0xFF:一个字节,十进制的值为 255,标志压缩列表的结尾

其中,zlentry 在 redis 中确实有着这样的结构体定义,但实际上这个结构定义了一堆类似于 length 这样的字段,记录前一个节点和自身节点占用的字节数等等信息,用处不多,而我们更倾向于使用这样的逻辑结构来描述 zlentry 节点。

image

这种结构在 redis 中是没有具体结构体定义的,请知悉,网上的很多博客文章都直接描述 zlentry 节点是这样的一种结构,其实是不准确的。

简单解释一下这三个字段的含义:

  • previous_entry_length:每个节点会使用一个或者五个字节来描述前一个节点占用的总字节数,如果前一个节点占用的总字节数小于 254,那么就用一个字节存储,反之如果前一个节点占用的总字节数超过了 254,那么一个字节就不够存储了,这里会用五个字节存储并将第一个字节的值存储为固定值 254 用于区分。
  • encoding:压缩列表可以存储 16位、32位、64位的整数以及字符串,encoding 就是用来区分后面的 content 字段中存储于的到底是哪种内容,分别占多少字节,这个我们等下细说。
  • content:没什么特别的,存储的就是具体的二进制内容,整数或者字符串。

下面我们细说一个 encoding 具体是怎么存储的。

主要分为两种,一种是字符串的存储格式:

编码 编码长度 content类型
00xxxxxx 一个字节 长度小于 63 的字符串
01xxxxxx xxxxxxxx 两个字节 长度小于 16383 的字符串
10xxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx 五个字节 长度小于 4294967295 的字符串

content 的具体长度,由编码除去高两位剩余的二进制位表示。

编码 编码长度 content类型
11000000 一个字节 int16_t 类型的整数
11010000 一个字节 int32_t 类型的整数
11100000 一个字节 int64_t 类型的整数
11110000 一个字节 24 位有符号整数
11111110 一个字节 8 位有符号整数

注意,整型数据的编码是固定 11 开头的八位二进制,而字符串类型的编码都是非固定的,因为它还需要通过后面的二进制位得到字符串的长度,稍有区别。

这就是压缩列表的基本的结构定义情况,下面我们通过节点的增删改查方法源码实现来看看 redis 中具体的实现情况。

二、redis 的具体源码实现

1、ziplistNew

我们先来看看压缩列表初始化的方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
unsigned char *ziplistNew(void) {
//bytes=2*4+2
//分配压缩列表结构所需要的字节数
//ZIPLIST_BYTES + ZIPLIST_TAIL_OFFSET + ZIPLIST_LENGTH
unsigned int bytes = ZIPLIST_HEADER_SIZE+1;
unsigned char *zl = zmalloc(bytes);
//初始化 ZIPLIST_BYTES 字段
ZIPLIST_BYTES(zl) = intrev32ifbe(bytes);
//初始化 ZIPLIST_TAIL_OFFSET
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(ZIPLIST_HEADER_SIZE);
//初始化 ZIPLIST_LENGTH 字段
ZIPLIST_LENGTH(zl) = 0;
//为压缩列表最后一个字节赋值 255
zl[bytes-1] = ZIP_END;
return zl;
}

2、ziplistPush

接着我们看新增节点的源码实现:

1
2
3
4
5
6
7
unsigned char *ziplistPush(unsigned char *zl, unsigned char *s
,unsigned int slen, int where) {
unsigned char *p;
//找到待插入的位置,头部或者尾部
p = (where == ZIPLIST_HEAD) ? ZIPLIST_ENTRY_HEAD(zl) : ZIPLIST_ENTRY_END(zl);
return __ziplistInsert(zl,p,s,slen);
}

解释一下 ziplistPush 的几个入参的含义。

zl 指向一个压缩列表的首地址,s 指向一个字符串首地址),slen 指向字符串的长度(如果节点存储的值是整型,存储的就是整型值),where 指明新节点的插入方式,头插亦或尾插。

ziplistPush 方法的核心是 __ziplistInsert:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
unsigned char *__ziplistInsert(unsigned char *zl, unsigned char *p, unsigned char *s, unsigned int slen) {
size_t curlen = intrev32ifbe(ZIPLIST_BYTES(zl)), reqlen;
unsigned int prevlensize, prevlen = 0;
size_t offset;
int nextdiff = 0;
unsigned char encoding = 0;
long long value = 123456789;
zlentry tail;
//prevlensize 存储前一个节点长度,本节点使用了几个字节 1 or 5
//prelen 存储前一个节点实际占用了几个字节
if (p[0] != ZIP_END) {
ZIP_DECODE_PREVLEN(p, prevlensize, prevlen);
} else {
unsigned char *ptail = ZIPLIST_ENTRY_TAIL(zl);
if (ptail[0] != ZIP_END) {
prevlen = zipRawEntryLength(ptail);
}
}
if (zipTryEncoding(s,slen,&value,&encoding)) {
//s 指针指向一个整数,尝试进行一个转换并得到存储这个整数占用了几个字节
reqlen = zipIntSize(encoding);
} else {
//s 指针指向一个字符串(字符数组),slen 就是他占用的字节数
reqlen = slen;
}
//当前节点存储数据占用 reqlen 个字节,加上存储前一个节点长度占用的字节数
reqlen += zipStorePrevEntryLength(NULL,prevlen);
//encoding 字段存储实际占用字节数
reqlen += zipStoreEntryEncoding(NULL,encoding,slen);
//至此,reqlen 保存了存储当前节点数据占用字节数和 encoding 编码占用的字节数总和
int forcelarge = 0;
//当前节点占用的总字节减去存储前一个节点字段占用的字节
//记录的是这一个节点的插入会引起下一个节点占用字节的变化量
nextdiff = (p[0] != ZIP_END) ? zipPrevLenByteDiff(p,reqlen) : 0;
if (nextdiff == -4 && reqlen < 4) {
nextdiff = 0;
forcelarge = 1;
}
//扩容有可能导致 zl 的起始位置偏移,故记录 p 与 zl 首地址的相对偏差数,事后还原 p 指针指向
offset = p-zl;
zl = ziplistResize(zl,curlen+reqlen+nextdiff);
p = zl+offset;
if (p[0] != ZIP_END) {
memmove(p+reqlen,p-nextdiff,curlen-offset-1+nextdiff);
//把当前节点占用的字节数存储到下一个节点的头部字段
if (forcelarge)
zipStorePrevEntryLengthLarge(p+reqlen,reqlen);
else
zipStorePrevEntryLength(p+reqlen,reqlen);

//更新 tail_offset 字段,让他保存从头节点到尾节点之间的距离
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+reqlen);
zipEntry(p+reqlen, &tail);
if (p[reqlen+tail.headersize+tail.len] != ZIP_END) {
ZIPLIST_TAIL_OFFSET(zl) =
intrev32ifbe(intrev32ifbe(ZIPLIST_TAIL_OFFSET(zl))+nextdiff);
}
} else {
ZIPLIST_TAIL_OFFSET(zl) = intrev32ifbe(p-zl);
}
//是否触发连锁更新
if (nextdiff != 0) {
offset = p-zl;
zl = __ziplistCascadeUpdate(zl,p+reqlen);
p = zl+offset;
}
//将节点写入指定位置
p += zipStorePrevEntryLength(p,prevlen);
p += zipStoreEntryEncoding(p,encoding,slen);
if (ZIP_IS_STR(encoding)) {
memcpy(p,s,slen);
} else {
zipSaveInteger(p,value,encoding);
}
ZIPLIST_INCR_LENGTH(zl,1);
return zl;
}

具体细节我不再赘述,总结一下整个插入节点的步骤。

  1. 计算并得到前一个节点的总长度,并判断得到当前待插入节点保存前一个节点长度的 previous_entry_length 占用字节数
  2. 根据传入的 s 和 slen,计算并保存 encoding 字段内容
  3. 构建节点并将数据写入节点添加到压缩列表中

ps:重点要去理解压缩列表节点的数据结构定义,previous_entry_length、encoding、content 字段,这样才能比较容易理解节点新增操作的实现。

三、连锁更新

谈到 redis 的压缩列表,就必然会谈到他的连锁更新,我们先引一张图:

image

假设原本 entry1 节点占用字节数为 211(小于 254),那么 entry2 的 previous_entry_length 会使用一个字节存储 211,现在我们新插入一个节点 NEWEntry,这个节点比较大,占用了 512 个字节。

那么,我们知道,NEWEntry 节点插入后,entry2 的 previous_entry_length 存储不了 512,那么 redis 就会重分配内存,增加 entry2 的内存分配,并分配给 previous_entry_length 五个字节存储 NEWEntry 节点长度。

看似没什么问题,但是如果极端情况下,entry2 扩容四个字节后,导致自身占用字节数超过 254,就会又触发后一个节点的内存占用空间扩大,非常极端情况下,会导致所有的节点都扩容,这就是连锁更新,一次更新导致大量甚至全部节点都更新内存的分配。

如果连锁更新发生的概率很高的话,压缩列表无疑就会是一个低效的数据结构,但实际上连锁更新发生的条件是非常苛刻的,其一是需要大量节点长度小于 254 连续串联连接,其二是我们更新的节点位置恰好也导致后一个节点内存扩充更新。

基于这两点,且少量的连锁更新对性能是影响不大的,所以这里的连锁更新对压缩列表的性能是没有多大的影响的,可以忽略,但需要知晓。


参考:

🔗 《Redis开发与运维》

🔗 Redis 的底层数据结构(SDS和链表)

🔗 Redis 的底层数据结构(字典)

🔗 Redis 的底层数据结构(跳跃表)

🔗 Redis 的底层数据结构(整数集合)

🔗 Redis 的底层数据结构(压缩列表)