和我一起学习leveldb [5 util(续)]
#[日志与比较器]
这一节我们分析logging和comparator.
util中的logging是用于为leveldb操作commitlog提供一些基础方法。commitlog是用于保证消息在刷盘前宕机的情形下,对未持久化的消息进行恢复的模块。而include/leveldb/env.h和util/posix_logger.h中定义和实现的Logger则是打印可读信息的日志类. util/logging.h中为logging声明了几种方法:
1
2
3
4
5
6
7
8
9
10
11
12
/* 在str后面添加数字 */
extern void AppendNumberTo(std::string* str, uint64_t num);
/* 在str后面添加可见字符串value */
extern void AppendEscapedStringTo(std::string* str, const Slice& value);
/* 数字转字符串 */
extern std::string NumberToString(uint64_t num);
/* 清除value中的不可见字符 */
extern std::string EscapeString(const Slice& value);
/* 如果in的第一个字符是第二个参数c,则in指针前进一步并返回true */
extern bool ConsumeChar(Slice* in, char c);
/* 消费in的前缀数字 */
extern bool ConsumeDecimalNumber(Slice* in, uint64_t* val);
这几个函数的意思都比较直观,具体的实现在util/logger.cc中,这里贴一下AppendEscapedStringTo函数的实现,注意可见字符的范围是:‘ ’ 到 ‘~’
1
2
3
4
5
6
7
8
9
10
11
12
13
void AppendEscapedStringTo(std::string* str, const Slice& value) {
for (size_t i = 0; i < value.size(); i++) {
char c = value[i];
if (c >= ' ' && c <= '~') {
str->push_back(c);
} else {
char buf[10];
snprintf(buf, sizeof(buf), "\\x%02x",
static_cast<unsigned int>(c) & 0xff);
str->append(buf);
}
}
}
关于commitlog,会在后面分析db目录的篇幅中专门分析。接下来看util/posix_logger.c,可读日志Logger在include/leveldb/env.h中有定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
// An interface for writing log messages.
class Logger {
public:
Logger() { }
virtual ~Logger();
// Write an entry to the log file with the specified format.
virtual void Logv(const char* format, va_list ap) = 0;
private:
// No copying allowed
Logger(const Logger&);
void operator=(const Logger&);
};
这个Logger类提供了一个接口Logv供实现,我们来看看util/posix_logger.h中对该接口的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
virtual void Logv(const char* format, va_list ap) {
const uint64_t thread_id = (*gettid_)();
// We try twice: the first time with a fixed-size stack allocated buffer,
// and the second time with a much larger dynamically allocated buffer.
char buffer[500];
for (int iter = 0; iter < 2; iter++) {
char* base;
int bufsize;
if (iter == 0) {
bufsize = sizeof(buffer);
base = buffer;
} else {
bufsize = 30000;
base = new char[bufsize];
}
char* p = base;
char* limit = base + bufsize;
struct timeval now_tv;
gettimeofday(&now_tv, NULL);
const time_t seconds = now_tv.tv_sec;
struct tm t;
localtime_r(&seconds, &t);
p += snprintf(p, limit - p,
"%04d/%02d/%02d-%02d:%02d:%02d.%06d %llx ",
t.tm_year + 1900,
t.tm_mon + 1,
t.tm_mday,
t.tm_hour,
t.tm_min,
t.tm_sec,
static_cast<int>(now_tv.tv_usec),
static_cast<long long unsigned int>(thread_id));
// Print the message
if (p < limit) {
va_list backup_ap;
va_copy(backup_ap, ap);
p += vsnprintf(p, limit - p, format, backup_ap);
va_end(backup_ap);
}
// Truncate to available space if necessary
if (p >= limit) {
if (iter == 0) {
continue; // Try again with larger buffer
} else {
p = limit - 1;
}
}
// Add newline if necessary
if (p == base || p[-1] != '\n') {
*p++ = '\n';
}
assert(p <= limit);
fwrite(base, 1, p - base, file_);
fflush(file_);
if (base != buffer) {
delete[] base;
}
break;
}
}
个人觉得这段代码非常值得学习。首先是考虑了内存分配的情况。通常看到的日志实现中,要么是在栈中设定一个定长缓冲区,然后设定一个日志最大长度,以此来避免内存分配;要么是直接分配一大块内存来放日志字符串。这里作者使用了两级“内存分配”,首先在栈中分配500个byte的缓冲区,这样长度小于500字节的日志就可以避免掉new的开销。如果发现放不下,再去new一个大块内存。当然如果单行日志太大,超过了30000字节,那么就直接做截断了。另外请注意这里对snprintf函数的使用,作者使用了一个哨兵,来避免越界(关于snprintf,请参考我的博客误用snprintf导致的bug).
接下来我们来讨论Comparator,相关代码在include/leveldb/comparator.h和util/comparator.cc:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
namespace leveldb {
class Slice;
class Comparator {
public:
virtual ~Comparator();
virtual int Compare(const Slice& a, const Slice& b) const = 0;
virtual const char* Name() const = 0;
virtual void FindShortestSeparator(
std::string* start,
const Slice& limit) const = 0;
virtual void FindShortSuccessor(std::string* key) const = 0;
};
extern const Comparator* BytewiseComparator();
}
Comparator当然要有Compare接口啦,leveldb的comparator另外还有两个比较奇怪的接口: FindShortestSeparator和FindShortSuccessor.这两个是高级函数,用于节省空间。其中FindShortestSeparator(std::string *start, const Slice& limit)
的作用是,如果start < limit,就把start修改为*start和limit的共同前缀后面多一个字符加1。这句话不容易说清楚,我举个例子:
*start: helloWorld
limit: helloZookeeper
由于 *start < limit, 所以调用 FindShortSuccessor(start, limit)之后,start变成:
helloX (保留前缀,第一个不相同的字符+1)
而FindShortSuccessor(std::string *key)
,则是把key修改成一个比key大一点短字符串。在BytewiseComparator中的实现非常简单,就是顺着key的第一个字符找,直到找到一个不是0xFF的字符,然后把该字符加1,并截断。从这两个函数我们可以看出,leveldb在存储中,对前缀相同的字符串做了一些工作,这样就可以存储多个串的公共前缀,节省存储空间。接下来把Comparator的一个具体实现:BytewiseComparator
中的这两个高级函数的代码贴出来作为本篇的完结。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
virtual void FindShortestSeparator(std::string* start, const Slice& limit) const {
// Find length of common prefix
size_t min_length = std::min(start->size(), limit.size());
size_t diff_index = 0;
while ((diff_index < min_length) &&
((*start)[diff_index] == limit[diff_index])) {
diff_index++;
}
if (diff_index >= min_length) {
// Do not shorten if one string is a prefix of the other
} else {
uint8_t diff_byte = static_cast<uint8_t>((*start)[diff_index]);
if (diff_byte < static_cast<uint8_t>(0xff) &&
diff_byte + 1 < static_cast<uint8_t>(limit[diff_index])) {
(*start)[diff_index]++;
start->resize(diff_index + 1);
assert(Compare(*start, limit) < 0);
}
}
}
virtual void FindShortSuccessor(std::string* key) const {
// Find first character that can be incremented
size_t n = key->size();
for (size_t i = 0; i < n; i++) {
const uint8_t byte = (*key)[i];
if (byte != static_cast<uint8_t>(0xff)) {
(*key)[i] = byte + 1;
key->resize(i+1);
return;
}
}
// *key is a run of 0xffs. Leave it alone.
}