#[运行时环境]

这一节我们来分析leveldb对运行时环境的封装:envenv的头文件是include/leveldb/env.hutil/env.cc是env中的一些平台无关的基本操作的实现,util/env_posix.cc则是遵循Posix接口的PosixEnv的实现。另外,在helpers/memenv目录中,还实现了一个在纯内存环境中使用的InMemoryEnv,我猜测这是用于调试或者跑benchmark的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
class Env {
    public:
        Env() { }
        virtual ~Env();
        static Env* Default();

        virtual Status NewSequentialFile(const std::string& fname,
                SequentialFile** result) = 0;
        virtual Status NewRandomAccessFile(const std::string& fname,
                RandomAccessFile** result) = 0;
        virtual Status NewWritableFile(const std::string& fname,
                WritableFile** result) = 0;

        virtual bool FileExists(const std::string& fname) = 0;
        virtual Status GetChildren(const std::string& dir,    // 获取目录dir的所有子节点
                std::vector<std::string>* result) = 0;
        virtual Status DeleteFile(const std::string& fname) = 0;
        virtual Status CreateDir(const std::string& dirname) = 0;
        virtual Status DeleteDir(const std::string& dirname) = 0;
        virtual Status GetFileSize(const std::string& fname, uint64_t* file_size) = 0;
        virtual Status RenameFile(const std::string& src,
                const std::string& target) = 0;

        // 这里的LockFile接口是try-lock
        virtual Status LockFile(const std::string& fname, FileLock** lock) = 0;
        virtual Status UnlockFile(FileLock* lock) = 0;

        // 向线程池中提交一个任务
        virtual void Schedule(
                void (*function)(void* arg),
                void* arg) = 0;
        virtual void StartThread(void (*function)(void* arg), void* arg) = 0;

        virtual Status GetTestDirectory(std::string* path) = 0;

        virtual Status NewLogger(const std::string& fname, Logger** result) = 0;

        // 获取当前毫秒数
        virtual uint64_t NowMicros() = 0;
        virtual void SleepForMicroseconds(int micros) = 0;

    private:
        // No copying allowed
        Env(const Env&);
        void operator=(const Env&);
}

class Env中的成员函数命名很清晰地描述了其对应的接口功能。其中,在Env中需要实现如下的几个类:

1
2
3
4
5
6
7
8
namespace leveldb {
    class FileLock;
    class Logger;
    class RandomAccessFile;
    class SequentialFile;
    class Slice;
    class WritableFile;
};

其中,class FileLock是用于对文件加锁的类,class Logger是用于写日志的类,需要实现virtual void Logv(const char* format, va_list ap) = 0;接口,leveldb实现了Posix环境下的日志工具PosixLogger,我们已经在leveldb[5]中对其进行了分析。class RandomAccessFile是对随机只读文件的封装,需要实现Read接口,class SequentialFile是对顺序读文件操作的封装,其中实现了接口ReadSkipWritableFile 则是对可写文件的封装,需要实现的几个接口分别是:Append,Close,Flush,Sync。我们接下来看看这几个类的具体实现。

先来看class FileLock,这个类的声明在include/leveldb/env.h中,在util/env_posix.cc中,有在posix环境下的具体实现PosixFileLock:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// file: include/leveldb/env.h
class FileLock {
 public:
  FileLock() { }
  virtual ~FileLock();
 private:
  // No copying allowed
  FileLock(const FileLock&);
  void operator=(const FileLock&);
};

// Posix环境下的FileLock
// file: util/posix_env.cc
class PosixFileLock : public FileLock {
 public:
  int fd_;
};

// class PosixEnv 成员方法
virtual Status LockFile(const std::string& fname, FileLock** lock) {
  *lock = NULL;
  Status result;
  int fd = open(fname.c_str(), O_RDWR | O_CREAT, 0644);
  if (fd < 0) {
    result = IOError(fname, errno);
  } else if (LockOrUnlock(fd, true) == -1) {
    result = IOError("lock " + fname, errno);
    close(fd);
  } else {
    PosixFileLock* my_lock = new PosixFileLock;
    my_lock->fd_ = fd;
    *lock = my_lock;
  }
  return result;
}

virtual Status UnlockFile(FileLock* lock) {
  PosixFileLock* my_lock = reinterpret_cast<PosixFileLock*>(lock);
  Status result;
  if (LockOrUnlock(my_lock->fd_, false) == -1) {
    result = IOError("unlock", errno);
  }
  close(my_lock->fd_);
  delete my_lock;
  return result;
}

我们看到PosixFileLock数据结构是被PosixEnv类用来实现LockFile和UnlockFile方法。PosixFileLock其实就是一个抽象出来的锁句柄,这个类里面只有一个成员变量:文件描述符fd。而在PosixEnv的LockFile和UnlockFile方法中,实现文件锁的核心代码就在函数LockOrUnlock中:

1
2
3
4
5
6
7
8
9
10
static int LockOrUnlock(int fd, bool lock) {
  errno = 0;
  struct flock f;
  memset(&f, 0, sizeof(f));
  f.l_type = (lock ? F_WRLCK : F_UNLCK);
  f.l_whence = SEEK_SET;
  f.l_start = 0;
  f.l_len = 0;        // Lock/unlock entire file
  return fcntl(fd, F_SETLK, &f);
}

这段代码是很简单的,直接调用Unix系统调用fcntl对文件进行加锁解锁操作。关于Unix文件锁,不清楚的同学们可以自行去查看APUE第二版的14.3节。

接下来我们去看class RandomAccessFile,其声明在include/leveldb/env.h中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// A file abstraction for randomly reading the contents of a file.
class RandomAccessFile {
 public:
  RandomAccessFile() { }
  virtual ~RandomAccessFile();

  // Read up to "n" bytes from the file starting at "offset".
  // "scratch[0..n-1]" may be written by this routine.  Sets "*result"
  // to the data that was read (including if fewer than "n" bytes were
  // successfully read).  May set "*result" to point at data in
  // "scratch[0..n-1]", so "scratch[0..n-1]" must be live when
  // "*result" is used.  If an error was encountered, returns a non-OK
  // status.
  //
  // Safe for concurrent use by multiple threads.
  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const = 0;
};

我们从注释中看到,这里的Read接口的实现需要是线程安全的。另外这个Read接口的参数设计得比较有意思,第三个参数是一个Slice指针,而第四个参数则是这个Slice指针指向的Slice中的字符串数组。这样设计的好处就是显式指定了存储数据的缓冲区,避免在函数内部进行内存分配,同时可以由调用者自己来管理Slice的内存,尽可能地减少不必要的数据拷贝。试想,如果直接传一个Slice指针进去,就无法自己去管理Slice的内存(因为无法知道Slice的内存是调用者分配的还是函数内部分配的),那么在很多场景下就无法避免不必要的数据拷贝,从这里也可以看出为什么作者要自己封装一个Slice数据结构,而不直接使用std::string,就是因为std::string不允许调用者直接去写它的字符串数组(注意std::string提供的c_str()方法的定义是const char *std::string::c_str() const

接下来去看看RandomAccessFile的具体实现,在util/env_posix.cc中有一个NewRandomAccessFile方法,我们从这里出发,去看看Posix环境下的RandomAccessFile的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
virtual Status NewRandomAccessFile(const std::string& fname,
                                   RandomAccessFile** result) {
  *result = NULL;
  Status s;
  int fd = open(fname.c_str(), O_RDONLY);
  if (fd < 0) {
    s = IOError(fname, errno);
  } else if (sizeof(void*) >= 8) {
    // Use mmap when virtual address-space is plentiful.
    uint64_t size;
    s = GetFileSize(fname, &size); // 这里的实现是直接调用stat获取文件属性,得到文件大小
    if (s.ok()) {
      void* base = mmap(NULL, size, PROT_READ, MAP_SHARED, fd, 0);
      if (base != MAP_FAILED) {
        *result = new PosixMmapReadableFile(fname, base, size);
      } else {
        s = IOError(fname, errno);
      }
    }
    close(fd);
  } else {
    *result = new PosixRandomAccessFile(fname, fd);
  }
  return s;
}

注意这里的第8行,当sizeof(void *) >= 8时,作者的注释是说虚拟内存充裕,于是使用PosixMmapReadableFile,PosixMmapReadableFile的实现是使用一块被mmap的内存,这样可以加快读的速度。当然,这里使用mmap的不足之处就是一下子把整个文件映射到了内存,如果实际需求只是读取文件中的一小块数据,这样做就会浪费很多内存。所以作者对sizeof(void *)做了判断。当sizeof(void *)的值是8的时候,说明运行环境是64位机。因此只在64位以上的机器环境下,认定虚拟地址充裕,才使用mmap来辅助随机读。这里插一句话,以前去面试过一家公司,面试我的人以前在阿里是P9,当我说我读过一些redis源码的时候,他问我32位redis和64位redis有什么不同。我当时懵了,不知道该怎么回答。后来请教我师父,他说这个应该跟物理内存不足的时候会使用到swap有关系。读了这里的源码,我突然间有点感悟,64位机的虚拟内存充裕,因此可能会在涉及磁盘io的情况下能够利用mmap等技术做更多的读写优化。

继续顺着代码看,看看PosixMmapReadableFilePosixRandomAccessFile分别是怎么去实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// mmap() based random-access
class PosixMmapReadableFile: public RandomAccessFile {
 private:
  std::string filename_;
  void* mmapped_region_;
  size_t length_;

 public:
  // base[0,length-1] contains the mmapped contents of the file.
  PosixMmapReadableFile(const std::string& fname, void* base, size_t length)
      : filename_(fname), mmapped_region_(base), length_(length) { }
  virtual ~PosixMmapReadableFile() { munmap(mmapped_region_, length_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    if (offset + n > length_) {
      *result = Slice();
      s = IOError(filename_, EINVAL);
    } else {
      *result = Slice(reinterpret_cast<char*>(mmapped_region_) + offset, n);
    }
    return s;
  }
};

// pread() based random-access
class PosixRandomAccessFile: public RandomAccessFile {
 private:
  std::string filename_;
  int fd_;

 public:
  PosixRandomAccessFile(const std::string& fname, int fd)
      : filename_(fname), fd_(fd) { }
  virtual ~PosixRandomAccessFile() { close(fd_); }

  virtual Status Read(uint64_t offset, size_t n, Slice* result,
                      char* scratch) const {
    Status s;
    ssize_t r = pread(fd_, scratch, n, static_cast<off_t>(offset));
    *result = Slice(scratch, (r < 0) ? 0 : r);
    if (r < 0) {
      // An error: return a non-ok status
      s = IOError(filename_, errno);
    }
    return s;
  }
};

这两个类的实现都很简单,主要是注意一些细节问题。注意在NewRandomAccessFile调用new PosixMmapReadableFile之前,使用了mmap把文件映射到了虚拟内存,因此在PosixMmapReadableFile的析构函数中需要调用munmap释放内存。同时由于使用了mmap映射了内存,并且这个类是用于随机读文件的,不涉及写操作,因此调用完mmap就可以把文件关闭了。因此关闭文件的操作在NewRandomAccessFile中进行了,PosixMmapReadableFile的析构文件中就不需要再关闭文件了。另外由于文件的内容已经映射进内存了,就不需要再进行不必要的拷贝,于是Read接口直接把Slice的data_指针指向映射的内存区域。而PosixRandomAccessFile使用了pread来实现随机的线程安全读写(注意不能先使用seek再read,这样不是原子性的,不满足线程安全),另外在析构函数中需要去close文件。

现在来看class SequentialFile,依然先读include/leveldb/SequentialFile中的声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// A file abstraction for reading sequentially through a file
class SequentialFile {
 public:
  SequentialFile() { }
  virtual ~SequentialFile();

  // Read up to "n" bytes from the file.  "scratch[0..n-1]" may be
  // written by this routine.  Sets "*result" to the data that was
  // read (including if fewer than "n" bytes were successfully read).
  // May set "*result" to point at data in "scratch[0..n-1]", so
  // "scratch[0..n-1]" must be live when "*result" is used.
  // If an error was encountered, returns a non-OK status.
  //
  // REQUIRES: External synchronization
  virtual Status Read(size_t n, Slice* result, char* scratch) = 0;

  // Skip "n" bytes from the file. This is guaranteed to be no
  // slower that reading the same data, but may be faster.
  //
  // If end of file is reached, skipping will stop at the end of the
  // file, and Skip will return OK.
  //
  // REQUIRES: External synchronization
  virtual Status Skip(uint64_t n) = 0;
};

从注释中看到,这是一个顺序读文件的抽象,提供了两个接口,一个是Read,从当前位置读n字节,另一个操作是跳过n字节,这个操作保证不比Read慢。另外,注意注释中的REQUIRES,这是说调用者需要保证对这两个接口的调用是同步的。然后我们去util/posix_env.cc看看PosixEnv下对这两个接口的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
virtual Status NewSequentialFile(const std::string& fname,
                                 SequentialFile** result) {
  FILE* f = fopen(fname.c_str(), "r");
  if (f == NULL) {
    *result = NULL;
    return IOError(fname, errno);
  } else {
    *result = new PosixSequentialFile(fname, f);
    return Status::OK();
  }
}

class PosixSequentialFile: public SequentialFile {
 private:
  std::string filename_;
  FILE* file_;

 public:
  PosixSequentialFile(const std::string& fname, FILE* f)
      : filename_(fname), file_(f) { }
  virtual ~PosixSequentialFile() { fclose(file_); }

  virtual Status Read(size_t n, Slice* result, char* scratch) {
    Status s;
    // 使用fread_unlocked是因为在函数之外对文件访问已经做了同步(调用者保证)
    size_t r = fread_unlocked(scratch, 1, n, file_);
    *result = Slice(scratch, r);
    if (r < n) {
      if (feof(file_)) {
        // We leave status as ok if we hit the end of the file
      } else {
        // A partial read with an error: return a non-ok status
        s = IOError(filename_, errno);
      }
    }
    return s;
  }

  virtual Status Skip(uint64_t n) {
    if (fseek(file_, n, SEEK_CUR)) {
      return IOError(filename_, errno);
    }
    return Status::OK();
  }
};

这段代码中唯一一个可能造成疑惑的点就是第26行,这里是因为Read接口和Skip接口需要调用者自己去保证同步(见声明中的注释),所以这里可以调用fread_unlocked代替fread用以提高性能。

接下来我们来看class WritableFileinclude/leveldb/env.h中的声明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// A file abstraction for sequential writing.  The implementation
// must provide buffering since callers may append small fragments
// at a time to the file.
class WritableFile {
 public:
  WritableFile() { }
  virtual ~WritableFile();

  virtual Status Append(const Slice& data) = 0;
  virtual Status Close() = 0;
  virtual Status Flush() = 0;
  virtual Status Sync() = 0;

 private:
  // No copying allowed
  WritableFile(const WritableFile&);
  void operator=(const WritableFile&);
};

class WritableFile需要实现接口Append, Close, Flush, Sync,在util/env_posix.cc中,对WritableFile的实现是PosixMmapFile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// We preallocate up to an extra megabyte and use memcpy to append new
// data to the file.  This is safe since we either properly close the
// file before reading from it, or for log files, the reading code
// knows enough to skip zero suffixes.
class PosixMmapFile : public WritableFile {
 private:
  std::string filename_;
  int fd_;
  size_t page_size_;
  size_t map_size_;       // How much extra memory to map at a time
  char* base_;            // The mapped region
  char* limit_;           // Limit of the mapped region
  char* dst_;             // Where to write next  (in range [base_,limit_])
  char* last_sync_;       // Where have we synced up to
  uint64_t file_offset_;  // Offset of base_ in file

  // Have we done an munmap of unsynced data?
  bool pending_sync_;

  // Roundup x to a multiple of y
  static size_t Roundup(size_t x, size_t y) {
    return ((x + y - 1) / y) * y;
  }

  size_t TruncateToPageBoundary(size_t s) {
    s -= (s & (page_size_ - 1));
    assert((s % page_size_) == 0);
    return s;
  }

  bool UnmapCurrentRegion() {
    bool result = true;
    if (base_ != NULL) {
      if (last_sync_ < limit_) {
        // Defer syncing this data until next Sync() call, if any
        pending_sync_ = true;
      }
      if (munmap(base_, limit_ - base_) != 0) {
        result = false;
      }
      file_offset_ += limit_ - base_;
      base_ = NULL;
      limit_ = NULL;
      last_sync_ = NULL;
      dst_ = NULL;

      // Increase the amount we map the next time, but capped at 1MB
      if (map_size_ < (1<<20)) {
        map_size_ *= 2;
      }
    }
    return result;
  }

  bool MapNewRegion() {
    assert(base_ == NULL);
    if (ftruncate(fd_, file_offset_ + map_size_) < 0) {
      return false;
    }
    void* ptr = mmap(NULL, map_size_, PROT_READ | PROT_WRITE, MAP_SHARED,
                     fd_, file_offset_);
    if (ptr == MAP_FAILED) {
      return false;
    }
    base_ = reinterpret_cast<char*>(ptr);
    limit_ = base_ + map_size_;
    dst_ = base_;
    last_sync_ = base_;
    return true;
  }

PosixMmapFile的实现原理是通过mmap按pagesize映射文件,通过memcpy到映射的内存,实现文件的高效追加写。我们顺着代码来分析它的成员函数。第21行,辅助函数Roundup是用来实现对齐,这是因为mmap映射的内存要求是按页对齐的,所以如果要映射一块大小为x的内存,就应该使用 x = Roundup(x, pagesize)这样的代码来保证按页面对齐(假设某个操作系统的页大小为256,如果希望映射文件中一段4000字节的内容到内存,则应该调用:Roundup(4000, 256) = 4096)。

TruncateToPageBoundary(size_t s)是获取地址s所在的页基址。注意这个函数实现的代码为s -= (s & (page_size_ - 1));,由于page_size_一定是2的N次幂,所以page_size_ - 1的二进制表示正好是000…111这样的形式,因此这个算式就把s置为其所在页的页基址。而MapNewRegionUnmapCurrentRegion这两个函数顾名思义,就是把顺着文件的末尾,map一块新的区域到内存和unmap当前的区域。对这两个函数,只需要搞懂ftruncate, mmapmunmap这三个系统函数的作用和各个参数的含义,就能看懂其逻辑,因此这里也不赘述了。接下来分析PosixMmapFile的public函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
  PosixMmapFile(const std::string& fname, int fd, size_t page_size)
      : filename_(fname),
        fd_(fd),
        page_size_(page_size),
        map_size_(Roundup(65536, page_size)),
        base_(NULL),
        limit_(NULL),
        dst_(NULL),
        last_sync_(NULL),
        file_offset_(0),
        pending_sync_(false) {
    assert((page_size & (page_size - 1)) == 0);
  }

  ~PosixMmapFile() {
    if (fd_ >= 0) {
      PosixMmapFile::Close();
    }
  }

  virtual Status Close() {
    Status s;
    size_t unused = limit_ - dst_;
    if (!UnmapCurrentRegion()) {
      s = IOError(filename_, errno);
    } else if (unused > 0) {
      // Trim the extra space at the end of the file
      if (ftruncate(fd_, file_offset_ - unused) < 0) {
        s = IOError(filename_, errno);
      }
    }

    if (close(fd_) < 0) {
      if (s.ok()) {
        s = IOError(filename_, errno);
      }
    }

    fd_ = -1;
    base_ = NULL;
    limit_ = NULL;
    return s;
  }

构造函数主要是初始化成员变量,其中map_size_初始化为Roundup(65536, page_size),65536是2的16次方,而通常来说,page_size是4096(2的12次方),Roundup(65536, 4096) == 65536,因此在普通的服务器上,是一次映射16个页面。回顾我们前面对TruncateToPageBoundary的分析,其中谈到page_size_成员变量的值必须是2的整数次幂,在构造函数体中,代码assert((page_size & (page_size - 1)) == 0);保证了page_size必须是2的整数次幂。析构函数调用了PosixMmapFile::Close(),我们顺着调用栈来看PosixMmapFile的Close方法,Close方法首先调用UnmapCurrentRegion,取消内存映射,归还虚拟内存。然后判断映射的内存是否还有可用的(注意mmap的内存是在文件的末尾),如果还有可用的,说明文件末尾有文件空洞,因此要使用ftruncate调整文件的大小。之后再调用系统调用close,并把相关的成员变量赋为-1或者NULL,完成清理工作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
  virtual Status Append(const Slice& data) {
    const char* src = data.data();
    size_t left = data.size();
    while (left > 0) {
      assert(base_ <= dst_);
      assert(dst_ <= limit_);
      size_t avail = limit_ - dst_;
      if (avail == 0) {
        if (!UnmapCurrentRegion() ||
            !MapNewRegion()) {
          return IOError(filename_, errno);
        }
      }

      size_t n = (left <= avail) ? left : avail;
      memcpy(dst_, src, n);
      dst_ += n;
      src += n;
      left -= n;
    }
    return Status::OK();
  }

  virtual Status Sync() {
    Status s;

    if (pending_sync_) {
      // Some unmapped data was not synced
      pending_sync_ = false;
      if (fdatasync(fd_) < 0) {
        s = IOError(filename_, errno);
      }
    }

    if (dst_ > last_sync_) {
      // Find the beginnings of the pages that contain the first and last
      // bytes to be synced.
      size_t p1 = TruncateToPageBoundary(last_sync_ - base_);
      size_t p2 = TruncateToPageBoundary(dst_ - base_ - 1);
      last_sync_ = dst_;
      if (msync(base_ + p1, p2 - p1 + page_size_, MS_SYNC) < 0) {
        s = IOError(filename_, errno);
      }
    }

    return s;
  }

PosixMmapFile还剩下两个成员函数,AppendSync。这两个函数的逻辑都很简单。Append是顺着映射的内存写(使用memcpy完成写操作),如果写到了映射的边界,就先UnmapCurrentRegion,再调用MapNewRegion函数在文件的末尾再映射一块新的内存继续写,直至写完。Sync函数完成同步操作。首先判断是否pending_sync_是否为true,如果为true,说明在上次UnmapRegion之时,还没有做过fdatasync操作(munmap的时候,OS会把unmap的内存中的内容刷进文件,但是OS默认是异步刷盘,因此这里调用fdatasync完成同步)。然后判断当前写入点和当前同步点的位置,如果当前写入点的位置在当前同步点之后,那么需要把同步点和写入点之间这一段未同步的数据进行同步。这里调用msync完成同步(注意这里调用msync的最后一个参数是MS_SYNC,因此是进行同步刷盘的),而成员函数TruncateToPageBoundary就在此发挥作用,用于计算内存地址的页边界,因为msync是以页为单位进行同步的。这里附带说两句,被mmap的内存,在被写入或者修改数据之后,要直到munmap时才会被异步刷盘,如果需要立即同步,就需要调用msync实现,另外在这里完成磁盘数据同步作者调用了fdatasync,有兴趣的同学可以去man一下fsync和fdatasync,这两个函数是有区别的,fdatasync性能更高,因为它不会去同步文件的一些元数据信息。另外提一下,操作系统的写文件操作,默认都是异步刷盘,也就是写磁盘操作由进程提交给操作系统之后,就立即返回,而不是等到操作系统真正把数据写入到磁盘之后。异步刷盘的速度比同步刷盘快上千倍,当然其缺点是如果在进程把数据提交到操作系统后,系统突然宕机,那么这些数据就不会被真正写到磁盘上,而进程却以为数据已经成功写入了。当然这种情况只有在宕机才会出现,如果进程崩溃,是不会出现数据丢失的(因为这时候进程已经把写盘请求提交给OS了)。要确保宕机数据不丢,就需要使用同步刷盘(在posix系统中,同步刷盘的方式是在写操作返回之前调用fsync(...)fdatasync(...)msync(..., MS_SYNC) )。在leveldb的实现中,在性能和数据安全上做出了折衷,也就是每过一段时间做一次同步刷盘操作。

至此,本篇开始提到的Env中需要实现的几个类已经全部分析完。最后还要说一下PosixEnv中实现的一个做任务的后台线程池,这里面使用std::deque实现一个双向队列,Schedule实现了把一个任务进队的操作,BGThread则是不停的从队列中取任务来做,这使用的是APUE中论述的经典队列模型,我这里把具体实现的代码贴出来,就不再多费口舌分析了。看不懂的同学请参考APUE第二版11.6节程序清单11-9。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
  void PthreadCall(const char* label, int result) {
    if (result != 0) {
      fprintf(stderr, "pthread %s: %s\n", label, strerror(result));
      exit(1);
    }
  }

PosixEnv::PosixEnv() : page_size_(getpagesize()),
                       started_bgthread_(false) {
  PthreadCall("mutex_init", pthread_mutex_init(&mu_, NULL));
  PthreadCall("cvar_init", pthread_cond_init(&bgsignal_, NULL));
}

void PosixEnv::Schedule(void (*function)(void*), void* arg) {
  PthreadCall("lock", pthread_mutex_lock(&mu_));

  // Start background thread if necessary
  if (!started_bgthread_) {
    started_bgthread_ = true;
    PthreadCall(
        "create thread",
        pthread_create(&bgthread_, NULL,  &PosixEnv::BGThreadWrapper, this));
  }

  // If the queue is currently empty, the background thread may currently be
  // waiting.
  if (queue_.empty()) {
    PthreadCall("signal", pthread_cond_signal(&bgsignal_));
  }

  // Add to priority queue
  queue_.push_back(BGItem());
  queue_.back().function = function;
  queue_.back().arg = arg;

  PthreadCall("unlock", pthread_mutex_unlock(&mu_));
}

void PosixEnv::BGThread() {
  while (true) {
    // Wait until there is an item that is ready to run
    PthreadCall("lock", pthread_mutex_lock(&mu_));
    while (queue_.empty()) {
      PthreadCall("wait", pthread_cond_wait(&bgsignal_, &mu_));
    }

    void (*function)(void*) = queue_.front().function;
    void* arg = queue_.front().arg;
    queue_.pop_front();

    PthreadCall("unlock", pthread_mutex_unlock(&mu_));
    (*function)(arg);
  }
}

至此,util中与env相关的部分就分析完了,这一篇的内容很长,我自己在写这一篇的过程中,查阅了很多资料,也学习了不少系统知识,受益匪浅,同时也难免会有差错,如果有什么地方写得不对或者不够清楚,请读者们不吝赐教。下一篇,我们将分析leveldb实现的cache,同时也将是util部分的最后一篇。