和我一起学习leveldb [6 util(续)]
#[运行时环境]
这一节我们来分析leveldb对运行时环境的封装:env。 env的头文件是include/leveldb/env.h,util/env.cc是env中的一些平台无关的基本操作的实现,util/env_posix.cc则是遵循Posix接口的PosixEnv的实现。另外,在helpers/memenv目录中,还实现了一个在纯内存环境中使用的InMemoryEnv,我猜测这是用于调试或者跑benchmark的。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Env {
public:
Env() { }
virtual ~Env();
static Env* Default();
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result) = 0;
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) = 0;
virtual Status NewWritableFile(const std::string& fname,
WritableFile** result) = 0;
virtual bool FileExists(const std::string& fname) = 0;
virtual Status GetChildren(const std::string& dir, // 获取目录dir的所有子节点
std::vector<std::string>* result) = 0;
virtual Status DeleteFile(const std::string& fname) = 0;
virtual Status CreateDir(const std::string& dirname) = 0;
virtual Status DeleteDir(const std::string& dirname) = 0;
virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
virtual Status RenameFile(const std::string& src,
const std::string& target) = 0;
// 这里的LockFile接口是try-lock
virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
virtual Status UnlockFile(FileLock* lock) = 0;
// 向线程池中提交一个任务
virtual void Schedule(
void (*function)(void* arg),
void* arg) = 0;
virtual void StartThread(void (*function)(void* arg), void* arg) = 0;
virtual Status GetTestDirectory(std::string* path) = 0;
virtual Status NewLogger(const std::string& fname, Logger** result) = 0;
// 获取当前毫秒数
virtual uint64_t NowMicros() = 0;
virtual void SleepForMicroseconds(int micros) = 0;
private:
// No copying allowed
Env(const Env&);
void operator=(const Env&);
}
class Env
中的成员函数命名很清晰地描述了其对应的接口功能。其中,在Env中需要实现如下的几个类:
1
2
3
4
5
6
7
8
namespace leveldb {
class FileLock;
class Logger;
class RandomAccessFile;
class SequentialFile;
class Slice;
class WritableFile;
};
其中,class FileLock
是用于对文件加锁的类,class Logger
是用于写日志的类,需要实现virtual void Logv(const char* format, va_list ap) = 0;
接口,leveldb实现了Posix环境下的日志工具PosixLogger
,我们已经在leveldb[5]中对其进行了分析。class RandomAccessFile
是对随机只读文件的封装,需要实现Read
接口,class SequentialFile
是对顺序读文件操作的封装,其中实现了接口Read
和Skip
。WritableFile
则是对可写文件的封装,需要实现的几个接口分别是:Append
,Close
,Flush
,Sync
。我们接下来看看这几个类的具体实现。
先来看class FileLock
,这个类的声明在include/leveldb/env.h中,在util/env_posix.cc中,有在posix环境下的具体实现PosixFileLock
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// file: include/leveldb/env.h
class FileLock {
public:
FileLock() { }
virtual ~FileLock();
private:
// No copying allowed
FileLock(const FileLock&);
void operator=(const FileLock&);
};
// Posix环境下的FileLock
// file: util/posix_env.cc
class PosixFileLock : public FileLock {
public:
int fd_;
};
// class PosixEnv 成员方法
virtual Status LockFile(const std::string& fname, FileLock** lock) {
*lock = NULL;
Status result;
int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
if (fd < 0) {
result = IOError(fname, errno);
} else if (LockOrUnlock(fd, true) == -1) {
result = IOError("lock " + fname, errno);
close(fd);
} else {
PosixFileLock* my_lock = new PosixFileLock;
my_lock->fd_ = fd;
*lock = my_lock;
}
return result;
}
virtual Status UnlockFile(FileLock* lock) {
PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
Status result;
if (LockOrUnlock(my_lock->fd_, false) == -1) {
result = IOError("unlock", errno);
}
close(my_lock->fd_);
delete my_lock;
return result;
}
我们看到PosixFileLock数据结构是被PosixEnv类用来实现LockFile和UnlockFile方法。PosixFileLock其实就是一个抽象出来的锁句柄,这个类里面只有一个成员变量:文件描述符fd。而在PosixEnv的LockFile和UnlockFile方法中,实现文件锁的核心代码就在函数LockOrUnlock
中:
1
2
3
4
5
6
7
8
9
10
static int LockOrUnlock(int fd, bool lock) {
errno = 0;
struct flock f;
memset(&f, 0, sizeof(f));
f.l_type = (lock ? F_WRLCK : F_UNLCK);
f.l_whence = SEEK_SET;
f.l_start = 0;
f.l_len = 0; // Lock/unlock entire file
return fcntl(fd, F_SETLK, &f);
}
这段代码是很简单的,直接调用Unix系统调用fcntl对文件进行加锁解锁操作。关于Unix文件锁,不清楚的同学们可以自行去查看APUE第二版的14.3节。
接下来我们去看class RandomAccessFile
,其声明在include/leveldb/env.h中:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// A file abstraction for randomly reading the contents of a file.
class RandomAccessFile {
public:
RandomAccessFile() { }
virtual ~RandomAccessFile();
// Read up to "n" bytes from the file starting at "offset".
// "scratch[0..n-1]" may be written by this routine. Sets "*result"
// to the data that was read (including if fewer than "n" bytes were
// successfully read). May set "*result" to point at data in
// "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
// "*result" is used. If an error was encountered, returns a non-OK
// status.
//
// Safe for concurrent use by multiple threads.
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const = 0;
};
我们从注释中看到,这里的Read接口的实现需要是线程安全的。另外这个Read接口的参数设计得比较有意思,第三个参数是一个Slice指针,而第四个参数则是这个Slice指针指向的Slice中的字符串数组。这样设计的好处就是显式指定了存储数据的缓冲区,避免在函数内部进行内存分配,同时可以由调用者自己来管理Slice的内存,尽可能地减少不必要的数据拷贝。试想,如果直接传一个Slice指针进去,就无法自己去管理Slice的内存(因为无法知道Slice的内存是调用者分配的还是函数内部分配的),那么在很多场景下就无法避免不必要的数据拷贝,从这里也可以看出为什么作者要自己封装一个Slice数据结构,而不直接使用std::string,就是因为std::string不允许调用者直接去写它的字符串数组(注意std::string
提供的c_str()
方法的定义是const char *std::string::c_str() const
)
接下来去看看RandomAccessFile
的具体实现,在util/env_posix.cc中有一个NewRandomAccessFile
方法,我们从这里出发,去看看Posix环境下的RandomAccessFile的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
virtual Status NewRandomAccessFile(const std::string& fname,
RandomAccessFile** result) {
*result = NULL;
Status s;
int fd = open(fname.c_str(), O_RDONLY);
if (fd < 0) {
s = IOError(fname, errno);
} else if (sizeof(void*) >= 8) {
// Use mmap when virtual address-space is plentiful.
uint64_t size;
s = GetFileSize(fname, &size); // 这里的实现是直接调用stat获取文件属性,得到文件大小
if (s.ok()) {
void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
if (base != MAP_FAILED) {
*result = new PosixMmapReadableFile(fname, base, size);
} else {
s = IOError(fname, errno);
}
}
close(fd);
} else {
*result = new PosixRandomAccessFile(fname, fd);
}
return s;
}
注意这里的第8行,当sizeof(void *) >= 8
时,作者的注释是说虚拟内存充裕,于是使用PosixMmapReadableFile,PosixMmapReadableFile的实现是使用一块被mmap的内存,这样可以加快读的速度。当然,这里使用mmap的不足之处就是一下子把整个文件映射到了内存,如果实际需求只是读取文件中的一小块数据,这样做就会浪费很多内存。所以作者对sizeof(void *)做了判断。当sizeof(void *)的值是8的时候,说明运行环境是64位机。因此只在64位以上的机器环境下,认定虚拟地址充裕,才使用mmap来辅助随机读。这里插一句话,以前去面试过一家公司,面试我的人以前在阿里是P9,当我说我读过一些redis源码的时候,他问我32位redis和64位redis有什么不同。我当时懵了,不知道该怎么回答。后来请教我师父,他说这个应该跟物理内存不足的时候会使用到swap有关系。读了这里的源码,我突然间有点感悟,64位机的虚拟内存充裕,因此可能会在涉及磁盘io的情况下能够利用mmap等技术做更多的读写优化。
继续顺着代码看,看看PosixMmapReadableFile
和PosixRandomAccessFile
分别是怎么去实现的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
private:
std::string filename_;
void* mmapped_region_;
size_t length_;
public:
// base[0,length-1] contains the mmapped contents of the file.
PosixMmapReadableFile(const std::string& fname, void* base, size_t length)
: filename_(fname), mmapped_region_(base), length_(length) { }
virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
if (offset + n > length_) {
*result = Slice();
s = IOError(filename_, EINVAL);
} else {
*result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
}
return s;
}
};
// pread() based random-access
class PosixRandomAccessFile: public RandomAccessFile {
private:
std::string filename_;
int fd_;
public:
PosixRandomAccessFile(const std::string& fname, int fd)
: filename_(fname), fd_(fd) { }
virtual ~PosixRandomAccessFile() { close(fd_); }
virtual Status Read(uint64_t offset, size_t n, Slice* result,
char* scratch) const {
Status s;
ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
*result = Slice(scratch, (r < 0) ? 0 : r);
if (r < 0) {
// An error: return a non-ok status
s = IOError(filename_, errno);
}
return s;
}
};
这两个类的实现都很简单,主要是注意一些细节问题。注意在NewRandomAccessFile
调用new PosixMmapReadableFile
之前,使用了mmap把文件映射到了虚拟内存,因此在PosixMmapReadableFile
的析构函数中需要调用munmap释放内存。同时由于使用了mmap映射了内存,并且这个类是用于随机读文件的,不涉及写操作,因此调用完mmap就可以把文件关闭了。因此关闭文件的操作在NewRandomAccessFile
中进行了,PosixMmapReadableFile
的析构文件中就不需要再关闭文件了。另外由于文件的内容已经映射进内存了,就不需要再进行不必要的拷贝,于是Read接口直接把Slice的data_指针指向映射的内存区域。而PosixRandomAccessFile
使用了pread来实现随机的线程安全读写(注意不能先使用seek再read,这样不是原子性的,不满足线程安全),另外在析构函数中需要去close文件。
现在来看class SequentialFile
,依然先读include/leveldb/SequentialFile中的声明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// A file abstraction for reading sequentially through a file
class SequentialFile {
public:
SequentialFile() { }
virtual ~SequentialFile();
// Read up to "n" bytes from the file. "scratch[0..n-1]" may be
// written by this routine. Sets "*result" to the data that was
// read (including if fewer than "n" bytes were successfully read).
// May set "*result" to point at data in "scratch[0..n-1]", so
// "scratch[0..n-1]" must be live when "*result" is used.
// If an error was encountered, returns a non-OK status.
//
// REQUIRES: External synchronization
virtual Status Read(size_t n, Slice* result, char* scratch) = 0;
// Skip "n" bytes from the file. This is guaranteed to be no
// slower that reading the same data, but may be faster.
//
// If end of file is reached, skipping will stop at the end of the
// file, and Skip will return OK.
//
// REQUIRES: External synchronization
virtual Status Skip(uint64_t n) = 0;
};
从注释中看到,这是一个顺序读文件的抽象,提供了两个接口,一个是Read,从当前位置读n字节,另一个操作是跳过n字节,这个操作保证不比Read慢。另外,注意注释中的REQUIRES,这是说调用者需要保证对这两个接口的调用是同步的。然后我们去util/posix_env.cc看看PosixEnv下对这两个接口的实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
virtual Status NewSequentialFile(const std::string& fname,
SequentialFile** result) {
FILE* f = fopen(fname.c_str(), "r");
if (f == NULL) {
*result = NULL;
return IOError(fname, errno);
} else {
*result = new PosixSequentialFile(fname, f);
return Status::OK();
}
}
class PosixSequentialFile: public SequentialFile {
private:
std::string filename_;
FILE* file_;
public:
PosixSequentialFile(const std::string& fname, FILE* f)
: filename_(fname), file_(f) { }
virtual ~PosixSequentialFile() { fclose(file_); }
virtual Status Read(size_t n, Slice* result, char* scratch) {
Status s;
// 使用fread_unlocked是因为在函数之外对文件访问已经做了同步(调用者保证)
size_t r = fread_unlocked(scratch, 1, n, file_);
*result = Slice(scratch, r);
if (r < n) {
if (feof(file_)) {
// We leave status as ok if we hit the end of the file
} else {
// A partial read with an error: return a non-ok status
s = IOError(filename_, errno);
}
}
return s;
}
virtual Status Skip(uint64_t n) {
if (fseek(file_, n, SEEK_CUR)) {
return IOError(filename_, errno);
}
return Status::OK();
}
};
这段代码中唯一一个可能造成疑惑的点就是第26行,这里是因为Read接口和Skip接口需要调用者自己去保证同步(见声明中的注释),所以这里可以调用fread_unlocked
代替fread
用以提高性能。
接下来我们来看class WritableFile
在include/leveldb/env.h中的声明:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// A file abstraction for sequential writing. The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
public:
WritableFile() { }
virtual ~WritableFile();
virtual Status Append(const Slice& data) = 0;
virtual Status Close() = 0;
virtual Status Flush() = 0;
virtual Status Sync() = 0;
private:
// No copying allowed
WritableFile(const WritableFile&);
void operator=(const WritableFile&);
};
class WritableFile
需要实现接口Append, Close, Flush, Sync
,在util/env_posix.cc中,对WritableFile的实现是PosixMmapFile
:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file. This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
private:
std::string filename_;
int fd_;
size_t page_size_;
size_t map_size_; // How much extra memory to map at a time
char* base_; // The mapped region
char* limit_; // Limit of the mapped region
char* dst_; // Where to write next (in range [base_,limit_])
char* last_sync_; // Where have we synced up to
uint64_t file_offset_; // Offset of base_ in file
// Have we done an munmap of unsynced data?
bool pending_sync_;
// Roundup x to a multiple of y
static size_t Roundup(size_t x, size_t y) {
return ((x + y - 1) / y) * y;
}
size_t TruncateToPageBoundary(size_t s) {
s -= (s & (page_size_ - 1));
assert((s % page_size_) == 0);
return s;
}
bool UnmapCurrentRegion() {
bool result = true;
if (base_ != NULL) {
if (last_sync_ < limit_) {
// Defer syncing this data until next Sync() call, if any
pending_sync_ = true;
}
if (munmap(base_, limit_ - base_) != 0) {
result = false;
}
file_offset_ += limit_ - base_;
base_ = NULL;
limit_ = NULL;
last_sync_ = NULL;
dst_ = NULL;
// Increase the amount we map the next time, but capped at 1MB
if (map_size_ < (1<<20)) {
map_size_ *= 2;
}
}
return result;
}
bool MapNewRegion() {
assert(base_ == NULL);
if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
return false;
}
void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
fd_, file_offset_);
if (ptr == MAP_FAILED) {
return false;
}
base_ = reinterpret_cast<char*>(ptr);
limit_ = base_ + map_size_;
dst_ = base_;
last_sync_ = base_;
return true;
}
PosixMmapFile
的实现原理是通过mmap按pagesize映射文件,通过memcpy到映射的内存,实现文件的高效追加写。我们顺着代码来分析它的成员函数。第21行,辅助函数Roundup
是用来实现对齐,这是因为mmap映射的内存要求是按页对齐的,所以如果要映射一块大小为x的内存,就应该使用 x = Roundup(x, pagesize)
这样的代码来保证按页面对齐(假设某个操作系统的页大小为256,如果希望映射文件中一段4000字节的内容到内存,则应该调用:Roundup(4000, 256) = 4096)。
TruncateToPageBoundary(size_t s)
是获取地址s所在的页基址。注意这个函数实现的代码为s -= (s & (page_size_ - 1));
,由于page_size_
一定是2的N次幂,所以page_size_ - 1
的二进制表示正好是000…111这样的形式,因此这个算式就把s置为其所在页的页基址。而MapNewRegion
和UnmapCurrentRegion
这两个函数顾名思义,就是把顺着文件的末尾,map一块新的区域到内存和unmap当前的区域。对这两个函数,只需要搞懂ftruncate
, mmap
和munmap
这三个系统函数的作用和各个参数的含义,就能看懂其逻辑,因此这里也不赘述了。接下来分析PosixMmapFile
的public函数。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
PosixMmapFile(const std::string& fname, int fd, size_t page_size)
: filename_(fname),
fd_(fd),
page_size_(page_size),
map_size_(Roundup(65536, page_size)),
base_(NULL),
limit_(NULL),
dst_(NULL),
last_sync_(NULL),
file_offset_(0),
pending_sync_(false) {
assert((page_size & (page_size - 1)) == 0);
}
~PosixMmapFile() {
if (fd_ >= 0) {
PosixMmapFile::Close();
}
}
virtual Status Close() {
Status s;
size_t unused = limit_ - dst_;
if (!UnmapCurrentRegion()) {
s = IOError(filename_, errno);
} else if (unused > 0) {
// Trim the extra space at the end of the file
if (ftruncate(fd_, file_offset_ - unused) < 0) {
s = IOError(filename_, errno);
}
}
if (close(fd_) < 0) {
if (s.ok()) {
s = IOError(filename_, errno);
}
}
fd_ = -1;
base_ = NULL;
limit_ = NULL;
return s;
}
构造函数主要是初始化成员变量,其中map_size_初始化为Roundup(65536, page_size)
,65536是2的16次方,而通常来说,page_size是4096(2的12次方),Roundup(65536, 4096) == 65536,因此在普通的服务器上,是一次映射16个页面。回顾我们前面对TruncateToPageBoundary
的分析,其中谈到page_size_成员变量的值必须是2的整数次幂,在构造函数体中,代码assert((page_size & (page_size - 1)) == 0);
保证了page_size必须是2的整数次幂。析构函数调用了PosixMmapFile::Close()
,我们顺着调用栈来看PosixMmapFile的Close方法,Close方法首先调用UnmapCurrentRegion
,取消内存映射,归还虚拟内存。然后判断映射的内存是否还有可用的(注意mmap的内存是在文件的末尾),如果还有可用的,说明文件末尾有文件空洞,因此要使用ftruncate调整文件的大小。之后再调用系统调用close
,并把相关的成员变量赋为-1或者NULL,完成清理工作。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
virtual Status Append(const Slice& data) {
const char* src = data.data();
size_t left = data.size();
while (left > 0) {
assert(base_ <= dst_);
assert(dst_ <= limit_);
size_t avail = limit_ - dst_;
if (avail == 0) {
if (!UnmapCurrentRegion() ||
!MapNewRegion()) {
return IOError(filename_, errno);
}
}
size_t n = (left <= avail) ? left : avail;
memcpy(dst_, src, n);
dst_ += n;
src += n;
left -= n;
}
return Status::OK();
}
virtual Status Sync() {
Status s;
if (pending_sync_) {
// Some unmapped data was not synced
pending_sync_ = false;
if (fdatasync(fd_) < 0) {
s = IOError(filename_, errno);
}
}
if (dst_ > last_sync_) {
// Find the beginnings of the pages that contain the first and last
// bytes to be synced.
size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
last_sync_ = dst_;
if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
s = IOError(filename_, errno);
}
}
return s;
}
PosixMmapFile
还剩下两个成员函数,Append
和Sync
。这两个函数的逻辑都很简单。Append
是顺着映射的内存写(使用memcpy完成写操作),如果写到了映射的边界,就先UnmapCurrentRegion
,再调用MapNewRegion
函数在文件的末尾再映射一块新的内存继续写,直至写完。Sync
函数完成同步操作。首先判断是否pending_sync_
是否为true,如果为true,说明在上次UnmapRegion之时,还没有做过fdatasync操作(munmap的时候,OS会把unmap的内存中的内容刷进文件,但是OS默认是异步刷盘,因此这里调用fdatasync完成同步)。然后判断当前写入点和当前同步点的位置,如果当前写入点的位置在当前同步点之后,那么需要把同步点和写入点之间这一段未同步的数据进行同步。这里调用msync
完成同步(注意这里调用msync的最后一个参数是MS_SYNC,因此是进行同步刷盘的),而成员函数TruncateToPageBoundary
就在此发挥作用,用于计算内存地址的页边界,因为msync
是以页为单位进行同步的。这里附带说两句,被mmap的内存,在被写入或者修改数据之后,要直到munmap时才会被异步刷盘,如果需要立即同步,就需要调用msync
实现,另外在这里完成磁盘数据同步作者调用了fdatasync
,有兴趣的同学可以去man一下fsync和fdatasync,这两个函数是有区别的,fdatasync性能更高,因为它不会去同步文件的一些元数据信息。另外提一下,操作系统的写文件操作,默认都是异步刷盘,也就是写磁盘操作由进程提交给操作系统之后,就立即返回,而不是等到操作系统真正把数据写入到磁盘之后。异步刷盘的速度比同步刷盘快上千倍,当然其缺点是如果在进程把数据提交到操作系统后,系统突然宕机,那么这些数据就不会被真正写到磁盘上,而进程却以为数据已经成功写入了。当然这种情况只有在宕机才会出现,如果进程崩溃,是不会出现数据丢失的(因为这时候进程已经把写盘请求提交给OS了)。要确保宕机数据不丢,就需要使用同步刷盘(在posix系统中,同步刷盘的方式是在写操作返回之前调用fsync(...)
或 fdatasync(...)
或 msync(..., MS_SYNC)
)。在leveldb的实现中,在性能和数据安全上做出了折衷,也就是每过一段时间做一次同步刷盘操作。
至此,本篇开始提到的Env中需要实现的几个类已经全部分析完。最后还要说一下PosixEnv中实现的一个做任务的后台线程池,这里面使用std::deque
实现一个双向队列,Schedule
实现了把一个任务进队的操作,BGThread
则是不停的从队列中取任务来做,这使用的是APUE中论述的经典队列模型,我这里把具体实现的代码贴出来,就不再多费口舌分析了。看不懂的同学请参考APUE第二版11.6节程序清单11-9。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
void PthreadCall(const char* label, int result) {
if (result != 0) {
fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
exit(1);
}
}
PosixEnv::PosixEnv() : page_size_(getpagesize()),
started_bgthread_(false) {
PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
}
void PosixEnv::Schedule(void (*function)(void*), void* arg) {
PthreadCall("lock", pthread_mutex_lock(&mu_));
// Start background thread if necessary
if (!started_bgthread_) {
started_bgthread_ = true;
PthreadCall(
"create thread",
pthread_create(&bgthread_, NULL, &PosixEnv::BGThreadWrapper, this));
}
// If the queue is currently empty, the background thread may currently be
// waiting.
if (queue_.empty()) {
PthreadCall("signal", pthread_cond_signal(&bgsignal_));
}
// Add to priority queue
queue_.push_back(BGItem());
queue_.back().function = function;
queue_.back().arg = arg;
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}
void PosixEnv::BGThread() {
while (true) {
// Wait until there is an item that is ready to run
PthreadCall("lock", pthread_mutex_lock(&mu_));
while (queue_.empty()) {
PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
}
void (*function)(void*) = queue_.front().function;
void* arg = queue_.front().arg;
queue_.pop_front();
PthreadCall("unlock", pthread_mutex_unlock(&mu_));
(*function)(arg);
}
}
至此,util中与env相关的部分就分析完了,这一篇的内容很长,我自己在写这一篇的过程中,查阅了很多资料,也学习了不少系统知识,受益匪浅,同时也难免会有差错,如果有什么地方写得不对或者不够清楚,请读者们不吝赐教。下一篇,我们将分析leveldb实现的cache,同时也将是util部分的最后一篇。