和我一起学习leveldb [7 util(续)]
#[缓存]
这一篇将分析leveldb的cache,请容我慢慢道来。提醒大家一下,上一篇以前你们看到的内容是还没有写完的,今天花了点时间把上一篇抽象环境分析完了,内容很多很杂很丰富,同学们再去看两眼哦,同时别忘了给我的github的这个博客项目加颗星星。
arena是轻量级的内存池,设计很简单,其数据结构如下图所示:
arena
+------------------------+
| alloc_ptr_ | ---------------------------+
+------------------------+ |
| alloc_bytes_remaining_ | |
+------------------------+ .
| vector<char*> blocks_ | ---> +----------+------+----------+--------------+
+------------------------+ | block1 |block2| block3 | block4 |
| blocks_memory_ | +----------+------+----------+--------------+
+------------------------+ 1 page x B 1 page y B
对于一次大于1/4 page的内存分配请求,如果目前块剩下的部分不够,
就直接分配一个一块所需大小的内存(避免浪费当前块),单独作为一个block(例如x,y)
arena的逻辑很简单,用一个vector<char *>
来维护所有分配的内存。alloc_ptr_指向当前内存池中可以分配内存的起始点。当一个内存分配请求x字节的内存时,先看alloc_ptr_到该块末尾还有多少内存(这个记录在alloc_bytes_remaining_中),如果该块内存够,新申请的内存首地址就是现在的alloc_ptr_,然后把alloc_ptr_朝后面移动x字节,并重新计算alloc_bytes_remaining_。如果该块剩下的内存不够了,就看x是否大于1/4 page,如果不大于1/4page,说明当前块剩下的不足1/4了,那么就直接分配一个新的block(1 page),把alloc_ptr_的置为新block的首地址,再按之前的逻辑进行分配,旧的块剩余的内存就被浪费掉了。新申请的block被pushback到blocks_中,以后在arena被析构的时候统一释放。如果内存分配请求大于1/4个pagesize,为了避免浪费当前块剩下的内存,就直接向操作系统申请x字节的内存,作为一个单独的block返回,这个block同样也会被pushback到vector之中。把机理分析完毕之后,再贴代码就很好读懂了:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Arena {
public:
Arena();
~Arena();
// Return a pointer to a newly allocated memory block of "bytes" bytes.
char* Allocate(size_t bytes);
// Allocate memory with the normal alignment guarantees provided by malloc
char* AllocateAligned(size_t bytes);
// Returns an estimate of the total memory usage of data allocated
// by the arena (including space allocated but not yet used for user
// allocations).
size_t MemoryUsage() const {
return blocks_memory_ + blocks_.capacity() * sizeof(char*);
}
private:
char* AllocateFallback(size_t bytes);
char* AllocateNewBlock(size_t block_bytes);
// Allocation state
char* alloc_ptr_;
size_t alloc_bytes_remaining_;
// Array of new[] allocated memory blocks
std::vector<char*> blocks_;
// Bytes of memory in blocks allocated so far
size_t blocks_memory_;
// No copying allowed
Arena(const Arena&);
void operator=(const Arena&);
};
inline char* Arena::Allocate(size_t bytes) {
// The semantics of what to return are a bit messy if we allow
// 0-byte allocations, so we disallow them here (we don't need
// them for our internal use).
assert(bytes > 0);
if (bytes <= alloc_bytes_remaining_) {
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
return AllocateFallback(bytes);
}
在`Arena::Allocate函数的实现中,我们看到,如果当前块放不下了,就调用AllocateFallback。AllocateFallback会根据当前申请的内存大小,去单独向操作系统申请一个块给这次请求,或者另分配一个内存块,把当前的块剩下的内存浪费掉:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static const int kBlockSize = 4096;
Arena::Arena() {
blocks_memory_ = 0;
alloc_ptr_ = NULL; // First allocation will allocate a block
alloc_bytes_remaining_ = 0;
}
Arena::~Arena() {
for (size_t i = 0; i < blocks_.size(); i++) {
delete[] blocks_[i];
}
}
char* Arena::AllocateFallback(size_t bytes) {
if (bytes > kBlockSize / 4) {
// Object is more than a quarter of our block size. Allocate it separately
// to avoid wasting too much space in leftover bytes.
char* result = AllocateNewBlock(bytes);
return result;
}
// We waste the remaining space in the current block.
alloc_ptr_ = AllocateNewBlock(kBlockSize);
alloc_bytes_remaining_ = kBlockSize;
char* result = alloc_ptr_;
alloc_ptr_ += bytes;
alloc_bytes_remaining_ -= bytes;
return result;
}
// 分配一个大小为block_bytes的内存块,并挂在vector后面
char* Arena::AllocateNewBlock(size_t block_bytes) {
char* result = new char[block_bytes];
blocks_memory_ += block_bytes;
blocks_.push_back(result);
return result;
}
关于Arena,最后再提一下Arena里面提供的内存对齐分配接口。读了Arena::Allocate
的代码,我们知道Arena的Allocate返回的内存地址是不保证任何对齐的。然而如果我们使用malloc簇函数,返回的内存默认是sizeof(void *)对齐的(关于这一点,很多同学是不清楚的,我觉得如果要写好C/C++程序,必须对malloc的机制有一定的了解,例如malloc是如何对齐的,malloc分配的内存块被溢出会出现什么情况,如果要分配一段内存并且自定义字节对齐怎么办等等)。因此在Arena中也提供了返回地址内存对齐的分配函数Arena::AllocateAligned
。其实实现内存对齐相当简单,只需计算当前的alloc_ptr_的地址,多分配几个字节,返回内存对齐的地址就可以了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
char* Arena::AllocateAligned(size_t bytes) {
const int align = sizeof(void*); // We'll align to pointer size
assert((align & (align-1)) == 0); // Pointer size should be a power of 2
size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
size_t slop = (current_mod == 0 ? 0 : align - current_mod);
size_t needed = bytes + slop;
char* result;
if (needed <= alloc_bytes_remaining_) {
result = alloc_ptr_ + slop;
alloc_ptr_ += needed;
alloc_bytes_remaining_ -= needed;
} else {
// AllocateFallback always returned aligned memory
result = AllocateFallback(bytes);
}
assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
return result;
}
值得一提的是第13行的注释说,AllocateFallback
函数返回的内存总是内存对齐的。这是因为AllocateFallback
中分配内存调用了AllocateNewBlock
,而AllocateNewBlock
里面是直接调用new
算子进行内存分配的。标准库的new
实现是对于小于128字节的内存分配,自己维护了一个内存池,内存池中是按8、16、…、128字节维护free list
的,而对于超过128字节的内存分配请求,直接调用calloc进行内存分配,因此new
算子返回的地址一定是8字节对齐的。关于这部分,有兴趣的同学请参考由华中科技大学出版社出版的《STL源码剖析》的第二章(请原谅我在这里为母校加了一个链接 :) )。
至此,Arena的实现就全部分析完毕了,接下来我们来看看如何利用Arena实现一个Cache。
在理解leveldb的cache实现中,要理解三个数据结构,一是HandleTable,这是一个经典的HashTable的实现,HashTable里面的每个元素除了有一个hash链的指针next_hash外,另外再附带prev和next指针,将所有的Handle链接起来成一个双向链表,用于实现第二个数据结构LRUCache。第三个是ShardedLRUCache,它是用来管理多个LRUCache的。我们先来看一下HashTable:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// An entry is a variable length heap-allocated structure. Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
void* value;
void (*deleter)(const Slice&, void* value);
LRUHandle* next_hash;
LRUHandle* next;
LRUHandle* prev;
size_t charge; // TODO(opt): Only allow uint32_t?
size_t key_length;
uint32_t refs;
uint32_t hash; // Hash of key(); used for fast sharding and comparisons
char key_data[1]; // Beginning of key
Slice key() const {
// For cheaper lookups, we allow a temporary Handle object
// to store a pointer to a key in "value".
if (next == this) {
return *(reinterpret_cast<Slice*>(value));
} else {
return Slice(key_data, key_length);
}
}
};
每一个LRUHandle对象代表缓存中的一个key-value键值对,其中next_hash域是用于hash链的(解决hash碰撞问题),而LRU的实现就是依赖于prev和next指针,如果HashTable中的某个元素被访问,就会把该元素移到双链表的表头,如果朝双链表中插入一个元素,而双链表已达到最大程度,就直接移除掉双链表尾部的元素,因为该元素的访问日期一定比前面的所有元素早(思考一下问什么?)。
另外我再啰嗦一句,上面第13行,LRUHandle结构体的最后一个字段是char key_data[1]。C基础不太牢固的同学可能会疑问,为什么只用了1个字节。其实这里仅仅一个字节的占位符而已,由于这是结构体的最后一个元素,所以真正的数据可以接在后面,而不会影响其他字段。类似的还有redis中的sds数据结构,其声明为struct sdshdr { szie_t len; char data[]; }
,这种形式也是很少见的,有兴趣的同学可以去google一下这到底是什么意思。把sdshdr理解了,你就能理解leveldb这里的HandleTable中的char key_data[1]
的含义啦。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class HandleTable {
public:
HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
~HandleTable() { delete[] list_; }
LRUHandle* Lookup(const Slice& key, uint32_t hash) {
return *FindPointer(key, hash);
}
LRUHandle* Insert(LRUHandle* h) {
LRUHandle** ptr = FindPointer(h->key(), h->hash);
LRUHandle* old = *ptr;
h->next_hash = (old == NULL ? NULL : old->next_hash);
*ptr = h;
if (old == NULL) {
++elems_;
if (elems_ > length_) {
// Since each cache entry is fairly large, we aim for a small
// average linked list length (<= 1).
Resize();
}
}
return old;
}
LRUHandle* Remove(const Slice& key, uint32_t hash) {
LRUHandle** ptr = FindPointer(key, hash);
LRUHandle* result = *ptr;
if (result != NULL) {
*ptr = result->next_hash;
--elems_;
}
return result;
}
private:
// The table consists of an array of buckets where each bucket is
// a linked list of cache entries that hash into the bucket.
uint32_t length_;
uint32_t elems_;
LRUHandle** list_;
// Return a pointer to slot that points to a cache entry that
// matches key/hash. If there is no such cache entry, return a
// pointer to the trailing slot in the corresponding linked list.
LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
LRUHandle** ptr = &list_[hash & (length_ - 1)];
while (*ptr != NULL &&
((*ptr)->hash != hash || key != (*ptr)->key())) {
ptr = &(*ptr)->next_hash;
}
return ptr;
}
void Resize() {
uint32_t new_length = 4;
while (new_length < elems_) {
new_length *= 2;
}
LRUHandle** new_list = new LRUHandle*[new_length];
memset(new_list, 0, sizeof(new_list[0]) * new_length);
uint32_t count = 0;
for (uint32_t i = 0; i < length_; i++) {
LRUHandle* h = list_[i];
while (h != NULL) {
LRUHandle* next = h->next_hash;
Slice key = h->key();
uint32_t hash = h->hash;
LRUHandle** ptr = &new_list[hash & (new_length - 1)];
h->next_hash = *ptr;
*ptr = h;
h = next;
count++;
}
}
assert(elems_ == count);
delete[] list_;
list_ = new_list;
length_ = new_length;
}
};
HandleTable的实现是很经典的代码,通常的hash表都是这样实现的,比较有参考价值的代码是Resize函数。附带说一句,面试很容易遇到hashTable,hashMap之类的问题,要是把这里的哈希表看懂了,基本上这类问题你都可以回答得很圆满(特别要注意hash扩容的问题)。当然,类似的hash经典实现还可以去参考memcached和redis的相关代码,当然memcache里面的LRUCache还有一个slab的概念,是用于最大限度防止内存碎片并节约内存,而redis的扩容做得很好,不像memcache那样会把整个表锁住,这些都是经典代码,非常值得学习。
LRUCache是使用HandleTable和双向链表实现的LRU(Least Recently Used)缓存,所有进入LRUCache的元素都被挂到HandleTable的哈希链中,并由HandleTable的扩容机制确保了查找的平均复杂度为O(1)。LRUCache中每个被访问的元素,都会被挂到lru_链的链首,这样就保证了链尾的元素一定是最远被访问的元素。因此,如果cache中的元素已满,在插入新元素之时,只需要直接替换掉lru_链尾的元素就可以了。
LRU Cache 示意图
+---+---+---+---+---+---+
HandleTable | 0 | 1 | 2 | ... | N |
+---+---+---+---+---+---+
| | |
+ + +
+---+ +---+ +---+
lru_ <====>| A |<=>| B |<=====>| C |
+---+ +---+ +---+
| / |
+ /============/ +
+---+ +---+
| D |<=============>| E |<==|
+---+ +---+ |
| |
+ |
+---+ |
| F |<==|
+---+
上图已经很清晰地说明了LRUCache的实现原理,我再从工程的角度上补充一点,C/C++和其他语言相比,最大的优势和劣势就是内存分配与释放了。动态分配的内存需要手动释放,一方面带来的是那些使用GC的语言不可比拟的性能,另一方面也为开发产品带来了额外的负担和风险。实现LRUCache,不可避免地需要考虑缓存失效之后的内存释放问题。如果一块内存被失效替换掉了,它是否应该立刻被释放掉呢?是否还有别的数据结构在使用这块内存呢?在leveldb的LRUCache实现中,使用了引用计数来解决这个问题,每次删除一块内存(一个Handle),需要对其调用Unref函数:
1
2
3
4
5
6
7
8
9
void LRUCache::Unref(LRUHandle* e) {
assert(e->refs > 0);
e->refs--;
if (e->refs <= 0) {
usage_ -= e->charge;
(*e->deleter)(e->key(), e->value);
free(e);
}
}
最后是ShardedLRUCache,这个类的实现非常简单,就是把多个LRUCache放在一个数组(大小为2的4次方)中进行管理,插入或查找一个Handle,首先根据这个Handle的hash值的高4bit,决定这个Handle的位置应该在哪一个LRUCache中,然后再在那个LRUCache中进行操作。看到这里也许就会有点纳闷了,有一个LRUCache就够用了,为什么还需要在其上层实现一个ShardedLRUCache呢?这是因为leveldb是多线程工作的,对每一个LRUCache进行操作,都会把整个LRUCache数据结构锁住。因此,ShardedLRUCache就是用来分担锁开销的。
实现ShardedLRUCache的代码不长,直接贴在下面,作为本章的结束。至此,leveldb源码中的util文件夹已经全部分析完毕,从下一章开始,我将开始对table文件夹进行分析。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;
class ShardedLRUCache : public Cache {
private:
LRUCache shard_[kNumShards];
port::Mutex id_mutex_;
uint64_t last_id_;
static inline uint32_t HashSlice(const Slice& s) {
return Hash(s.data(), s.size(), 0);
}
static uint32_t Shard(uint32_t hash) {
return hash >> (32 - kNumShardBits);
}
public:
explicit ShardedLRUCache(size_t capacity)
: last_id_(0) {
const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards; // 向上取整
for (int s = 0; s < kNumShards; s++) {
shard_[s].SetCapacity(per_shard);
}
}
virtual ~ShardedLRUCache() { }
virtual Handle* Insert(const Slice& key, void* value, size_t charge,
void (*deleter)(const Slice& key, void* value)) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
}
virtual Handle* Lookup(const Slice& key) {
const uint32_t hash = HashSlice(key);
return shard_[Shard(hash)].Lookup(key, hash);
}
virtual void Release(Handle* handle) {
LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
shard_[Shard(h->hash)].Release(handle);
}
virtual void Erase(const Slice& key) {
const uint32_t hash = HashSlice(key);
shard_[Shard(hash)].Erase(key, hash);
}
virtual void* Value(Handle* handle) {
return reinterpret_cast<LRUHandle*>(handle)->value;
}
virtual uint64_t NewId() {
MutexLock l(&id_mutex_);
return ++(last_id_);
}
};