#[缓存]

这一篇将分析leveldb的cache,请容我慢慢道来。提醒大家一下,上一篇以前你们看到的内容是还没有写完的,今天花了点时间把上一篇抽象环境分析完了,内容很多很杂很丰富,同学们再去看两眼哦,同时别忘了给我的github的这个博客项目加颗星星。

arena是轻量级的内存池,设计很简单,其数据结构如下图所示:

           arena
+------------------------+
|       alloc_ptr_       | ---------------------------+
+------------------------+                            |
| alloc_bytes_remaining_ |                            |
+------------------------+                            .
| vector<char*> blocks_  | ---> +----------+------+----------+--------------+
+------------------------+      |  block1  |block2|  block3  |    block4    |
|     blocks_memory_     |      +----------+------+----------+--------------+
+------------------------+         1 page    x B     1 page        y B

对于一次大于1/4 page的内存分配请求,如果目前块剩下的部分不够,
就直接分配一个一块所需大小的内存(避免浪费当前块),单独作为一个block(例如x,y)

arena的逻辑很简单,用一个vector<char *>来维护所有分配的内存。alloc_ptr_指向当前内存池中可以分配内存的起始点。当一个内存分配请求x字节的内存时,先看alloc_ptr_到该块末尾还有多少内存(这个记录在alloc_bytes_remaining_中),如果该块内存够,新申请的内存首地址就是现在的alloc_ptr_,然后把alloc_ptr_朝后面移动x字节,并重新计算alloc_bytes_remaining_。如果该块剩下的内存不够了,就看x是否大于1/4 page,如果不大于1/4page,说明当前块剩下的不足1/4了,那么就直接分配一个新的block(1 page),把alloc_ptr_的置为新block的首地址,再按之前的逻辑进行分配,旧的块剩余的内存就被浪费掉了。新申请的block被pushback到blocks_中,以后在arena被析构的时候统一释放。如果内存分配请求大于1/4个pagesize,为了避免浪费当前块剩下的内存,就直接向操作系统申请x字节的内存,作为一个单独的block返回,这个block同样也会被pushback到vector之中。把机理分析完毕之后,再贴代码就很好读懂了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
class Arena {
    public:
        Arena();
        ~Arena();

        // Return a pointer to a newly allocated memory block of "bytes" bytes.
        char* Allocate(size_t bytes);

        // Allocate memory with the normal alignment guarantees provided by malloc
        char* AllocateAligned(size_t bytes);

        // Returns an estimate of the total memory usage of data allocated
        // by the arena (including space allocated but not yet used for user
        // allocations).
        size_t MemoryUsage() const {
            return blocks_memory_ + blocks_.capacity() * sizeof(char*);
        }

    private:
        char* AllocateFallback(size_t bytes);
        char* AllocateNewBlock(size_t block_bytes);

        // Allocation state
        char* alloc_ptr_;
        size_t alloc_bytes_remaining_;

        // Array of new[] allocated memory blocks
        std::vector<char*> blocks_;

        // Bytes of memory in blocks allocated so far
        size_t blocks_memory_;

        // No copying allowed
        Arena(const Arena&);
        void operator=(const Arena&);
};

inline char* Arena::Allocate(size_t bytes) {
    // The semantics of what to return are a bit messy if we allow
    // 0-byte allocations, so we disallow them here (we don't need
    // them for our internal use).
    assert(bytes > 0);
    if (bytes <= alloc_bytes_remaining_) {
        char* result = alloc_ptr_;
        alloc_ptr_ += bytes;
        alloc_bytes_remaining_ -= bytes;
        return result;
    }
    return AllocateFallback(bytes);
}

在`Arena::Allocate函数的实现中,我们看到,如果当前块放不下了,就调用AllocateFallback。AllocateFallback会根据当前申请的内存大小,去单独向操作系统申请一个块给这次请求,或者另分配一个内存块,把当前的块剩下的内存浪费掉:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
static const int kBlockSize = 4096;

Arena::Arena() {
    blocks_memory_ = 0;
    alloc_ptr_ = NULL;  // First allocation will allocate a block
    alloc_bytes_remaining_ = 0;
}

Arena::~Arena() {
    for (size_t i = 0; i < blocks_.size(); i++) {
        delete[] blocks_[i];
    }
}

char* Arena::AllocateFallback(size_t bytes) {
    if (bytes > kBlockSize / 4) {
        // Object is more than a quarter of our block size.  Allocate it separately
        // to avoid wasting too much space in leftover bytes.
        char* result = AllocateNewBlock(bytes);
        return result;
    }

    // We waste the remaining space in the current block.
    alloc_ptr_ = AllocateNewBlock(kBlockSize);
    alloc_bytes_remaining_ = kBlockSize;

    char* result = alloc_ptr_;
    alloc_ptr_ += bytes;
    alloc_bytes_remaining_ -= bytes;
    return result;
}

// 分配一个大小为block_bytes的内存块,并挂在vector后面
char* Arena::AllocateNewBlock(size_t block_bytes) {
  char* result = new char[block_bytes];
  blocks_memory_ += block_bytes;
  blocks_.push_back(result);
  return result;
}

关于Arena,最后再提一下Arena里面提供的内存对齐分配接口。读了Arena::Allocate的代码,我们知道Arena的Allocate返回的内存地址是不保证任何对齐的。然而如果我们使用malloc簇函数,返回的内存默认是sizeof(void *)对齐的(关于这一点,很多同学是不清楚的,我觉得如果要写好C/C++程序,必须对malloc的机制有一定的了解,例如malloc是如何对齐的,malloc分配的内存块被溢出会出现什么情况,如果要分配一段内存并且自定义字节对齐怎么办等等)。因此在Arena中也提供了返回地址内存对齐的分配函数Arena::AllocateAligned。其实实现内存对齐相当简单,只需计算当前的alloc_ptr_的地址,多分配几个字节,返回内存对齐的地址就可以了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
char* Arena::AllocateAligned(size_t bytes) {
    const int align = sizeof(void*);    // We'll align to pointer size
    assert((align & (align-1)) == 0);   // Pointer size should be a power of 2
    size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align-1);
    size_t slop = (current_mod == 0 ? 0 : align - current_mod);
    size_t needed = bytes + slop;
    char* result;
    if (needed <= alloc_bytes_remaining_) {
        result = alloc_ptr_ + slop;
        alloc_ptr_ += needed;
        alloc_bytes_remaining_ -= needed;
    } else {
        // AllocateFallback always returned aligned memory
        result = AllocateFallback(bytes);
    }
    assert((reinterpret_cast<uintptr_t>(result) & (align-1)) == 0);
    return result;
}

值得一提的是第13行的注释说,AllocateFallback函数返回的内存总是内存对齐的。这是因为AllocateFallback中分配内存调用了AllocateNewBlock,而AllocateNewBlock里面是直接调用new算子进行内存分配的。标准库的new实现是对于小于128字节的内存分配,自己维护了一个内存池,内存池中是按8、16、…、128字节维护free list的,而对于超过128字节的内存分配请求,直接调用calloc进行内存分配,因此new算子返回的地址一定是8字节对齐的。关于这部分,有兴趣的同学请参考由华中科技大学出版社出版的《STL源码剖析》的第二章(请原谅我在这里为母校加了一个链接 :) )。

至此,Arena的实现就全部分析完毕了,接下来我们来看看如何利用Arena实现一个Cache。

在理解leveldb的cache实现中,要理解三个数据结构,一是HandleTable,这是一个经典的HashTable的实现,HashTable里面的每个元素除了有一个hash链的指针next_hash外,另外再附带prev和next指针,将所有的Handle链接起来成一个双向链表,用于实现第二个数据结构LRUCache。第三个是ShardedLRUCache,它是用来管理多个LRUCache的。我们先来看一下HashTable:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
// An entry is a variable length heap-allocated structure.  Entries
// are kept in a circular doubly linked list ordered by access time.
struct LRUHandle {
  void* value;
  void (*deleter)(const Slice&, void* value);
  LRUHandle* next_hash;
  LRUHandle* next;
  LRUHandle* prev;
  size_t charge;      // TODO(opt): Only allow uint32_t?
  size_t key_length;
  uint32_t refs;
  uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
  char key_data[1];   // Beginning of key

  Slice key() const {
    // For cheaper lookups, we allow a temporary Handle object
    // to store a pointer to a key in "value".
    if (next == this) {
      return *(reinterpret_cast<Slice*>(value));
    } else {
      return Slice(key_data, key_length);
    }
  }
};

每一个LRUHandle对象代表缓存中的一个key-value键值对,其中next_hash域是用于hash链的(解决hash碰撞问题),而LRU的实现就是依赖于prev和next指针,如果HashTable中的某个元素被访问,就会把该元素移到双链表的表头,如果朝双链表中插入一个元素,而双链表已达到最大程度,就直接移除掉双链表尾部的元素,因为该元素的访问日期一定比前面的所有元素早(思考一下问什么?)。

另外我再啰嗦一句,上面第13行,LRUHandle结构体的最后一个字段是char key_data[1]。C基础不太牢固的同学可能会疑问,为什么只用了1个字节。其实这里仅仅一个字节的占位符而已,由于这是结构体的最后一个元素,所以真正的数据可以接在后面,而不会影响其他字段。类似的还有redis中的sds数据结构,其声明为struct sdshdr { szie_t len; char data[]; },这种形式也是很少见的,有兴趣的同学可以去google一下这到底是什么意思。把sdshdr理解了,你就能理解leveldb这里的HandleTable中的char key_data[1]的含义啦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
class HandleTable {
 public:
  HandleTable() : length_(0), elems_(0), list_(NULL) { Resize(); }
  ~HandleTable() { delete[] list_; }

  LRUHandle* Lookup(const Slice& key, uint32_t hash) {
    return *FindPointer(key, hash);
  }

  LRUHandle* Insert(LRUHandle* h) {
    LRUHandle** ptr = FindPointer(h->key(), h->hash);
    LRUHandle* old = *ptr;
    h->next_hash = (old == NULL ? NULL : old->next_hash);
    *ptr = h;
    if (old == NULL) {
      ++elems_;
      if (elems_ > length_) {
        // Since each cache entry is fairly large, we aim for a small
        // average linked list length (<= 1).
        Resize();
      }
    }
    return old;
  }

  LRUHandle* Remove(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = FindPointer(key, hash);
    LRUHandle* result = *ptr;
    if (result != NULL) {
      *ptr = result->next_hash;
      --elems_;
    }
    return result;
  }

 private:
  // The table consists of an array of buckets where each bucket is
  // a linked list of cache entries that hash into the bucket.
  uint32_t length_;
  uint32_t elems_;
  LRUHandle** list_;

  // Return a pointer to slot that points to a cache entry that
  // matches key/hash.  If there is no such cache entry, return a
  // pointer to the trailing slot in the corresponding linked list.
  LRUHandle** FindPointer(const Slice& key, uint32_t hash) {
    LRUHandle** ptr = &list_[hash & (length_ - 1)];
    while (*ptr != NULL &&
           ((*ptr)->hash != hash || key != (*ptr)->key())) {
      ptr = &(*ptr)->next_hash;
    }
    return ptr;
  }

  void Resize() {
    uint32_t new_length = 4;
    while (new_length < elems_) {
      new_length *= 2;
    }
    LRUHandle** new_list = new LRUHandle*[new_length];
    memset(new_list, 0, sizeof(new_list[0]) * new_length);
    uint32_t count = 0;
    for (uint32_t i = 0; i < length_; i++) {
      LRUHandle* h = list_[i];
      while (h != NULL) {
        LRUHandle* next = h->next_hash;
        Slice key = h->key();
        uint32_t hash = h->hash;
        LRUHandle** ptr = &new_list[hash & (new_length - 1)];
        h->next_hash = *ptr;
        *ptr = h;
        h = next;
        count++;
      }
    }
    assert(elems_ == count);
    delete[] list_;
    list_ = new_list;
    length_ = new_length;
  }
};

HandleTable的实现是很经典的代码,通常的hash表都是这样实现的,比较有参考价值的代码是Resize函数。附带说一句,面试很容易遇到hashTable,hashMap之类的问题,要是把这里的哈希表看懂了,基本上这类问题你都可以回答得很圆满(特别要注意hash扩容的问题)。当然,类似的hash经典实现还可以去参考memcachedredis的相关代码,当然memcache里面的LRUCache还有一个slab的概念,是用于最大限度防止内存碎片并节约内存,而redis的扩容做得很好,不像memcache那样会把整个表锁住,这些都是经典代码,非常值得学习。

LRUCache是使用HandleTable和双向链表实现的LRU(Least Recently Used)缓存,所有进入LRUCache的元素都被挂到HandleTable的哈希链中,并由HandleTable的扩容机制确保了查找的平均复杂度为O(1)。LRUCache中每个被访问的元素,都会被挂到lru_链的链首,这样就保证了链尾的元素一定是最远被访问的元素。因此,如果cache中的元素已满,在插入新元素之时,只需要直接替换掉lru_链尾的元素就可以了。

             LRU Cache 示意图
             +---+---+---+---+---+---+
HandleTable  | 0 | 1 | 2 |  ...  | N |
             +---+---+---+---+---+---+
               |       |           |
               +       +           +
             +---+   +---+       +---+
  lru_ <====>| A |<=>| B |<=====>| C |
             +---+   +---+       +---+
               |                /  |
               +  /============/   +
             +---+               +---+
             | D |<=============>| E |<==|
             +---+               +---+   |
                                   |     |
                                   +     |
                                 +---+   |
                                 | F |<==|
                                 +---+

上图已经很清晰地说明了LRUCache的实现原理,我再从工程的角度上补充一点,C/C++和其他语言相比,最大的优势和劣势就是内存分配与释放了。动态分配的内存需要手动释放,一方面带来的是那些使用GC的语言不可比拟的性能,另一方面也为开发产品带来了额外的负担和风险。实现LRUCache,不可避免地需要考虑缓存失效之后的内存释放问题。如果一块内存被失效替换掉了,它是否应该立刻被释放掉呢?是否还有别的数据结构在使用这块内存呢?在leveldb的LRUCache实现中,使用了引用计数来解决这个问题,每次删除一块内存(一个Handle),需要对其调用Unref函数:

1
2
3
4
5
6
7
8
9
void LRUCache::Unref(LRUHandle* e) {
  assert(e->refs > 0);
  e->refs--;
  if (e->refs <= 0) {
    usage_ -= e->charge;
    (*e->deleter)(e->key(), e->value);
    free(e);
  }
}

最后是ShardedLRUCache,这个类的实现非常简单,就是把多个LRUCache放在一个数组(大小为2的4次方)中进行管理,插入或查找一个Handle,首先根据这个Handle的hash值的高4bit,决定这个Handle的位置应该在哪一个LRUCache中,然后再在那个LRUCache中进行操作。看到这里也许就会有点纳闷了,有一个LRUCache就够用了,为什么还需要在其上层实现一个ShardedLRUCache呢?这是因为leveldb是多线程工作的,对每一个LRUCache进行操作,都会把整个LRUCache数据结构锁住。因此,ShardedLRUCache就是用来分担锁开销的。

实现ShardedLRUCache的代码不长,直接贴在下面,作为本章的结束。至此,leveldb源码中的util文件夹已经全部分析完毕,从下一章开始,我将开始对table文件夹进行分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static const int kNumShardBits = 4;
static const int kNumShards = 1 << kNumShardBits;

class ShardedLRUCache : public Cache {
 private:
  LRUCache shard_[kNumShards];
  port::Mutex id_mutex_;
  uint64_t last_id_;

  static inline uint32_t HashSlice(const Slice& s) {
    return Hash(s.data(), s.size(), 0);
  }

  static uint32_t Shard(uint32_t hash) {
    return hash >> (32 - kNumShardBits);
  }

 public:
  explicit ShardedLRUCache(size_t capacity)
      : last_id_(0) {
    const size_t per_shard = (capacity + (kNumShards - 1)) / kNumShards; // 向上取整
    for (int s = 0; s < kNumShards; s++) {
      shard_[s].SetCapacity(per_shard);
    }
  }
  virtual ~ShardedLRUCache() { }
  virtual Handle* Insert(const Slice& key, void* value, size_t charge,
                         void (*deleter)(const Slice& key, void* value)) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Insert(key, hash, value, charge, deleter);
  }
  virtual Handle* Lookup(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    return shard_[Shard(hash)].Lookup(key, hash);
  }
  virtual void Release(Handle* handle) {
    LRUHandle* h = reinterpret_cast<LRUHandle*>(handle);
    shard_[Shard(h->hash)].Release(handle);
  }
  virtual void Erase(const Slice& key) {
    const uint32_t hash = HashSlice(key);
    shard_[Shard(hash)].Erase(key, hash);
  }
  virtual void* Value(Handle* handle) {
    return reinterpret_cast<LRUHandle*>(handle)->value;
  }
  virtual uint64_t NewId() {
    MutexLock l(&id_mutex_);
    return ++(last_id_);
  }
};