#[日志与比较器]

这一节我们分析loggingcomparator.

util中的logging是用于为leveldb操作commitlog提供一些基础方法。commitlog是用于保证消息在刷盘前宕机的情形下,对未持久化的消息进行恢复的模块。而include/leveldb/env.hutil/posix_logger.h中定义和实现的Logger则是打印可读信息的日志类. util/logging.h中为logging声明了几种方法:

1
2
3
4
5
6
7
8
9
10
11
12
/* 在str后面添加数字 */
extern void AppendNumberTo(std::string* str, uint64_t num);
/* 在str后面添加可见字符串value */
extern void AppendEscapedStringTo(std::string* str, const Slice& value);
/* 数字转字符串 */
extern std::string NumberToString(uint64_t num);
/* 清除value中的不可见字符 */
extern std::string EscapeString(const Slice& value);
/* 如果in的第一个字符是第二个参数c,则in指针前进一步并返回true */
extern bool ConsumeChar(Slice* in, char c);
/* 消费in的前缀数字 */
extern bool ConsumeDecimalNumber(Slice* in, uint64_t* val);

这几个函数的意思都比较直观,具体的实现在util/logger.cc中,这里贴一下AppendEscapedStringTo函数的实现,注意可见字符的范围是:‘ ’‘~’

1
2
3
4
5
6
7
8
9
10
11
12
13
void AppendEscapedStringTo(std::string* str, const Slice& value) {
  for (size_t i = 0; i < value.size(); i++) {
    char c = value[i];
    if (c >= ' ' && c <= '~') {
      str->push_back(c);
    } else {
      char buf[10];
      snprintf(buf, sizeof(buf), "\\x%02x",
               static_cast<unsigned int>(c) & 0xff);
      str->append(buf);
    }
  }
}

关于commitlog,会在后面分析db目录的篇幅中专门分析。接下来看util/posix_logger.c,可读日志Logger在include/leveldb/env.h中有定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// An interface for writing log messages.
class Logger {
    public:
        Logger() { }
        virtual ~Logger();

        // Write an entry to the log file with the specified format.
        virtual void Logv(const char* format, va_list ap) = 0;

    private:
        // No copying allowed
        Logger(const Logger&);
        void operator=(const Logger&);
};

这个Logger类提供了一个接口Logv供实现,我们来看看util/posix_logger.h中对该接口的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
virtual void Logv(const char* format, va_list ap) {
    const uint64_t thread_id = (*gettid_)();

    // We try twice: the first time with a fixed-size stack allocated buffer,
    // and the second time with a much larger dynamically allocated buffer.
    char buffer[500];
    for (int iter = 0; iter < 2; iter++) {
        char* base;
        int bufsize;
        if (iter == 0) {
            bufsize = sizeof(buffer);
            base = buffer;
        } else {
            bufsize = 30000;
            base = new char[bufsize];
        }
        char* p = base;
        char* limit = base + bufsize;

        struct timeval now_tv;
        gettimeofday(&now_tv, NULL);
        const time_t seconds = now_tv.tv_sec;
        struct tm t;
        localtime_r(&seconds, &t);
        p += snprintf(p, limit - p,
                "%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
                t.tm_year + 1900,
                t.tm_mon + 1,
                t.tm_mday,
                t.tm_hour,
                t.tm_min,
                t.tm_sec,
                static_cast<int>(now_tv.tv_usec),
                static_cast<long long unsigned int>(thread_id));

        // Print the message
        if (p < limit) {
            va_list backup_ap;
            va_copy(backup_ap, ap);
            p += vsnprintf(p, limit - p, format, backup_ap);
            va_end(backup_ap);
        }

        // Truncate to available space if necessary
        if (p >= limit) {
            if (iter == 0) {
                continue;       // Try again with larger buffer
            } else {
                p = limit - 1;
            }
        }

        // Add newline if necessary
        if (p == base || p[-1] != '\n') {
            *p++ = '\n';
        }

        assert(p <= limit);
        fwrite(base, 1, p - base, file_);
        fflush(file_);
        if (base != buffer) {
            delete[] base;
        }
        break;
    }
}

个人觉得这段代码非常值得学习。首先是考虑了内存分配的情况。通常看到的日志实现中,要么是在栈中设定一个定长缓冲区,然后设定一个日志最大长度,以此来避免内存分配;要么是直接分配一大块内存来放日志字符串。这里作者使用了两级“内存分配”,首先在栈中分配500个byte的缓冲区,这样长度小于500字节的日志就可以避免掉new的开销。如果发现放不下,再去new一个大块内存。当然如果单行日志太大,超过了30000字节,那么就直接做截断了。另外请注意这里对snprintf函数的使用,作者使用了一个哨兵,来避免越界(关于snprintf,请参考我的博客误用snprintf导致的bug).

接下来我们来讨论Comparator,相关代码在include/leveldb/comparator.h和util/comparator.cc:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
namespace leveldb {
    class Slice;
    class Comparator {
        public:
            virtual ~Comparator();
            virtual int Compare(const Slice& a, const Slice& b) const = 0;
            virtual const char* Name() const = 0;
            virtual void FindShortestSeparator(
                    std::string* start,
                    const Slice& limit) const = 0;
            virtual void FindShortSuccessor(std::string* key) const = 0;
    };
    extern const Comparator* BytewiseComparator();
}

Comparator当然要有Compare接口啦,leveldb的comparator另外还有两个比较奇怪的接口: FindShortestSeparatorFindShortSuccessor.这两个是高级函数,用于节省空间。其中FindShortestSeparator(std::string *start, const Slice& limit)的作用是,如果start < limit,就把start修改为*start和limit的共同前缀后面多一个字符加1。这句话不容易说清楚,我举个例子:

 *start:    helloWorld
 limit:     helloZookeeper
 由于 *start < limit, 所以调用 FindShortSuccessor(start, limit)之后,start变成:
 helloX (保留前缀,第一个不相同的字符+1)

FindShortSuccessor(std::string *key),则是把key修改成一个比key大一点短字符串。在BytewiseComparator中的实现非常简单,就是顺着key的第一个字符找,直到找到一个不是0xFF的字符,然后把该字符加1,并截断。从这两个函数我们可以看出,leveldb在存储中,对前缀相同的字符串做了一些工作,这样就可以存储多个串的公共前缀,节省存储空间。接下来把Comparator的一个具体实现:BytewiseComparator中的这两个高级函数的代码贴出来作为本篇的完结。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
virtual void FindShortestSeparator(std::string* start, const Slice& limit) const {
    // Find length of common prefix
    size_t min_length = std::min(start->size(), limit.size());
    size_t diff_index = 0;
    while ((diff_index < min_length) &&
            ((*start)[diff_index] == limit[diff_index])) {
        diff_index++;
    }

    if (diff_index >= min_length) {
        // Do not shorten if one string is a prefix of the other
    } else {
        uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
        if (diff_byte < static_cast<uint8_t>(0xff) &&
                diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
            (*start)[diff_index]++;
            start->resize(diff_index + 1);
            assert(Compare(*start, limit) < 0);
        }
    }
}

virtual void FindShortSuccessor(std::string* key) const {
    // Find first character that can be incremented
    size_t n = key->size();
    for (size_t i = 0; i < n; i++) {
        const uint8_t byte = (*key)[i];
        if (byte != static_cast<uint8_t>(0xff)) {
            (*key)[i] = byte + 1;
            key->resize(i+1);
            return;
        }
    }
    // *key is a run of 0xffs.  Leave it alone.
}