#[编码与过滤器]

这一篇我们先来讨论coding.h/coding.cc,leveldb作者在这两个文件中实现了整数编码,总体来说是非常简洁直观的。其中Fixed32/Fixed64是定长的32/64位整数编码。实现非常简单:

1
2
3
4
5
6
7
8
9
10
void EncodeFixed32(char* buf, uint32_t value) {
#if __BYTE_ORDER == __LITTLE_ENDIAN
    memcpy(buf, &value, sizeof(value));
#else
    buf[0] = value & 0xff;
    buf[1] = (value >> 8) & 0xff;
    buf[2] = (value >> 16) & 0xff;
    buf[3] = (value >> 24) & 0xff;
#endif
}

32位整数,编码成4个字节的整数,如果是小尾顺序(即低地址存低字节),那么就直接memcpy到字符数组中,如果是大尾顺序,就做一下处理。再来看Fixed32的解码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline uint32_t DecodeFixed32(const char* ptr) {
    if (port::kLittleEndian) {
        // Load the raw bytes
        uint32_t result;
        memcpy(&result, ptr, sizeof(result));  // gcc optimizes this to a plain load
        return result;
    } else {
        /* 注意这里先要强制转换成unsigned char,char是有符号的,直接转成整数可能出问题 */
        return ((static_cast<uint32_t>(static_cast<unsigned char>(ptr[0])))
                | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[1])) << 8)
                | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[2])) << 16)
                | (static_cast<uint32_t>(static_cast<unsigned char>(ptr[3])) << 24));
    }
}

请注意这里字符转换的细节问题,我已经在第8行加了注释说明。这里要说明一下,问起char的取值范围,很多同学都以为是0~255,其实char是有符号的,所以char的取值范围应该是-128~127。实践中进行类型转换(例如char转成int)时,一定要注意考虑这里面的问题。

接下来再讲一下对整数的变长编码。我们知道,如果要序列化一个字符串,最简单直观的想法就是,先在头部用几个字节存放字符串的长度,再在后面跟真正的字符串的值。如果用uint32表示长度,那么是不是一定需要用4个字节存储呢?实际上,往往我们存储的字符串的长度可能就是几十或者一百多,如果是这样,那么一两个字节就可以表示了,没必要一定要占用4个字节,基于此,就有了变长整数编码(thrift中的CompactProtocol也是用这种方式来存储变长整数的),另外,变长整数编码技术还被广泛用于搜索之中,一个很常见的场景就是搜索引擎中倒排拉链。关于更高级的话题,可以参考DocList压缩方法简介。好了,废话不多说,我们还是回过头来看看leveldb中的varint,其实很简单,就是每个Byte用最高位bit的0/1值表示该整数是否结束,用剩余的7bit存储实际的数值。因此,[0, 2^7 - 1]区间的无符号整数只需要用一个Byte来存储,[0, 2^14 - 1]区间的无符号整数用两个Byte存储。

变长整数存储示意图:
1001 0001  1000 1001 0111 1100
^          ^         ^          A: 第一字节,最高位是1,表示未结束,实际值是 001 0001
|          |         |          B: 第二字节,最高位是1,表示未结束,实际值是 000 1001
A          B         C          C: 第三字节,最高位是0,表示结束,  实际值是 111 1100
因此,三个字节拼接成: 
[001 0001][000 1001][111 1100]
    = [17] * (2^7)^2 + [9] * (2^7)^1 + [124] * (2^7)^0
    = 278528 + 1152 + 124
    = 279804

把这个结构看明白了,再来看代码就一目了然:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
char* EncodeVarint32(char* dst, uint32_t v) {
    // Operate on characters as unsigneds
    unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
    static const int B = 128;
    if (v < (1<<7)) {
        *(ptr++) = v;
    } else if (v < (1<<14)) {
        *(ptr++) = v | B;
        *(ptr++) = v>>7;
    } else if (v < (1<<21)) {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = v>>14;
    } else if (v < (1<<28)) {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = (v>>14) | B;
        *(ptr++) = v>>21;
    } else {
        *(ptr++) = v | B;
        *(ptr++) = (v>>7) | B;
        *(ptr++) = (v>>14) | B;
        *(ptr++) = (v>>21) | B;
        *(ptr++) = v>>28;
    }
    return reinterpret_cast<char*>(ptr);
}

/* varint64和varint32其实是同样的原理,
   不过两种实现的代码不一样而已,一个直观,一个简洁 */
char* EncodeVarint64(char* dst, uint64_t v) {
    static const int B = 128;
    unsigned char* ptr = reinterpret_cast<unsigned char*>(dst);
    while (v >= B) {
        *(ptr++) = (v & (B-1)) | B;
        v >>= 7;
    }
    *(ptr++) = static_cast<unsigned char>(v);
    return reinterpret_cast<char*>(ptr);
}

另外不知道大家看到这里有没有发现一个很有趣的现象。EncodeVarint32和EncodeVarint64这两个函数,原理相同,但是作者用了两种不同的方式来实现。而前面的EncodeFixed32和EncodeFixed64,原理也是一样,而作者的实现方式不同,一个是使用了宏来判定大小端顺序,另一个则是使用了port中一个接口来判别大小端顺序,也许作者是想通过这种方式来展示各种技巧,秀一秀技术吧。最后谈一下GetVarint32PtrFallback这个函数,仔细阅读这段代码,就能看出来,这个函数是用于解析变长编码字符串的。先解码字符串首部的长度,并且返回指向字符串真实地址的指针。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
const char* GetVarint32PtrFallback(const char* p, const char* limit, uint32_t* value) {
    uint32_t result = 0;
    for (uint32_t shift = 0; shift <= 28 && p < limit; shift += 7) {
        uint32_t byte = *(reinterpret_cast<const unsigned char*>(p));
        p++;
        if (byte & 128) {
            // More bytes are present
            result |= ((byte & 127) << shift);
        } else {
            result |= (byte << shift);
            *value = result;
            return reinterpret_cast<const char*>(p);
        }
    }
    return NULL;
}

本篇最后讨论一下过滤器,include/leveldb/filter_policy.h和util/filter_policy.cc封装了一个过滤器策略类,鉴于源码中的注释比较多,我直接把注释清理掉后的源码贴出来,再附上一点讲解:

1
2
3
4
5
6
7
8
9
10
11
namespace leveldb {
    class Slice;
    class FilterPolicy {
        public:
            virtual ~FilterPolicy();
            virtual const char* Name() const = 0;
            virtual void CreateFilter(const Slice* keys, int n, std::string* dst) const = 0;
            virtual bool KeyMayMatch(const Slice& key, const Slice& filter) const = 0;
    };
    extern const FilterPolicy* NewBloomFilterPolicy(int bits_per_key);
}

FilterPolicy提供了三个接口,Name()返回具体实现的过滤器的名字。CreateFilter这个接口有两个参数,第一个参数是个Slice数组(Slice是一个简单的string实现,其中有两个成员变量,即size_t size_const char* data_。之所以不直接用std::string,是为了在一些Slice复制的场景中,直接通过传递data_指针避免数据拷贝),第二个参数是Slice数组的size。这个函数把这个Slice数组中的所有key的信息以某种方式写入到dst中。那么写入到dst中有什么用呢?这当然就是用于给第三个接口KeyMayMatch啦。第三个接口是检验一个Slice是否在CreateFilter的第一个参数数组中的。如果key是在CreateFilter中的数组中,那么就返回true,如果不在,那么可能返回true(误判),也可能返回false。Filter的作用就是:如果返回false,那么key就一定不在CreateFilter的keys中。这个特性在查找中非常有助于提高速度。关于bloomFilter,这里就不细说了,如果想了解详细情况,请看这里util/bloom.cc是FilterPolicy接口的一个具体实现,这里面的代码是写得很漂亮的,对误判率、hash函数等的选取都有讨论,从作者的注释中也能看出是参考了一些布隆过滤器在数学上的讨论的。读这段代码时,要注意作者把hash函数(private: size_t k_;)的数量写在dst的最后一个字节中,如果过滤器涉及到了扩容,那么就重新把k_写到新的过滤器结尾,而原来存放k_的byte则用于布隆过滤器存储bit值。关于这部分代码,这里就不贴出来了,有兴趣的同学可自行阅读之。