LevelDB之Version

在本文中介绍Version和VersionEdit概念,它们有助于理解LevelDB对MVCC的实现。
本文拆解自《LevelDB之Compaction》。

目录:

  1. LevelDB之Memtable实现
  2. LevelDB之SSTable实现
  3. LevelDB之Version
  4. LevelDB之Compaction
  5. LevelDB之流程概览

常见文件

需要注意的是,LevelDB是一个单机的数据库,所以实际承载的SSTable文件都位于一台机器上。

文件类型

1
2
3
4
5
6
7
8
9
enum FileType {
kLogFile,
kDBLockFile,
kTableFile,
kDescriptorFile,
kCurrentFile,
kTempFile,
kInfoLogFile // Either the current one, or an old one
};
  1. kLogFile:WAL日志文件,文件名数字.log
  2. kDBLockFile:db锁文件,文件名LOCK
  3. kTableFile:SSTable文件,文件名数字.sst
  4. kDescriptorFile:Manifest文件,存储VersionEdit信息,文件名为MANIFEST-数字
    对应descriptor_file_这个字段。
    Manifest文件中维护了所有的SSTable的key范围,层级,以及其他的元信息。
  5. kCurrentFile:记录当前的Manifest文件,文件名为CURRENT
  6. kTempFile:临时文件,db在修复【?】过程中会产生临时文件,文件名为数字.dbtmp
  7. kInfoLogFile:日志文件,文件名为LOG

WAL

Memtable 和 log 一一对应。

Version 机制和 Manifest 文件

Manifest

LevelDB 每次新生成 SSTable 文件,或者删除 SSTable 文件,都会从一个版本升级成另外一个版本。

LevelDB 采用了增量式的存储方式,用 VersionEdit 记录每一个 Version 对上一个 Version 的变化情况。

Manifest 文件专用于记录版本信息。一个 Manifest 文件中,包含了多条Session Record。一个 Session Record 记录了从上一个版本至该版本的变化情况。每一个 Session Record 对应一个 VersionEdit。其中第一条 Session Record 包含当时LevelDB的全量版本信息,这个应该是通过 WriteSnapshot 来实现的,可以看下面的介绍。

如下展示了 Manifest 文件的结构。

借助 Manifest 文件,LevelDB 启动时,可以根据一个初始的版本状态,不断地应用后面的 VersionEdit,使得系统的版本信息恢复到最近一次使用的状态。

如下所示,Manifest 中的每个 Entry,即 Session Record 包含:

  1. 增加的 SSTable
    kNewFile。比如如果是因为 minor compaction 或者 replay 生成了一个新的 sst file,则记录该信息。
  2. 删除的 SSTable
    kDeletedFile
  3. 当前 Compaction 的下标
    kCompactPointer
  4. 日志文件编号
    kLogNumber
  5. 数据库已经持久化数据项中最大的 Sequence Number
    kLastSequence

对应的代码如下

1
2
3
4
5
6
7
8
9
10
11
enum Tag {
kComparator = 1,
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
};

写 Manifest 的代码应该是 Writer::AddRecord。这是因为一个 Manifest 就是一个 WAL 文件。

读 Manifest 的代码,例如 VersionSet::Recover

Current

记录当前的 Manifest 文件名。
随着 LevelDB 运行时间的增长,一个 Manifest 中包含的 Session Record 会越来越多,故 LevelDB 在每次启动时都会重新创建一个 Manifest 文件,并将第一条 Session Record 中记录当前 Version 的快照状态。
其他过期的 Manifest 文件会在下次启动的 recover 流程中进行删除。
LevelDB 通过这种方式,来控制 Manifest 文件的大小,但是数据库本身没有重启,Manifest 还是会一直增长。

Version 机制

大前提,Compaction 过程是通过独立线程异步并发执行的。因此可能出现压缩前后的新老 SSTable 并存的情况。我们不能立即删除老的 SSTable文件,这可能是因为这个 SSTable 还在被读取,所以要等到老 SSTable 的引用计数为0才行。因此 Version 机制可以用来辨别这些 SSTable 的版本。借助于 Version 机制,也能实现 MVCC。

Version 变更,即升级发生在:

  1. 完成一次 major compaction 整理内部数据
  2. 通过 minor compaction 或者重启阶段的日志重放新生成一个0层文件

如下所示,新版本 New-Version 由 Version 类和 VersionEdit 类来描述。即 VersionEdit 是 New-Version 相对于 Version 的改动。

1
New-Version = Version + VersionEdit

LevelDB 将所有的 Version 置于一个双向链表之中,因此所有的 Version 组成一个名为 VersionSet 的集合。这个集合也代表了当前 DB 的状态,包含了最新的 Version,以及其他正在服务的 Version。

容易概括出下面的对应关系:

  1. VersionSet - Manifest
  2. VersionEdit - Session Record

Version、VersionEdit 和 VersionSet

相关类型

FileMetaData

1
2
3
4
5
6
7
8
9
10
struct FileMetaData {
int refs;
int allowed_seeks; // Seeks allowed until compaction
uint64_t number;
uint64_t file_size; // File size in bytes
InternalKey smallest; // Smallest internal key served by table
InternalKey largest; // Largest internal key served by table

FileMetaData() : refs(0), allowed_seeks(1 << 30), file_size(0) { }
};

VersionEdit

介绍作为桥梁作用的VersionEdit类,它对应了 Session Record。这个类里面的方法大部分是用来读写里面的私有成员的,所以不再介绍。

  1. std::string comparator_;

  2. uint64_t next_file_number_;/bool has_next_file_number_;,通过 SetNextFile 设置
    VersionSet 里面也有类似字段,是一个全局分配器,分配下一个可用的 file number。这个 number 是 SSTable、Manifest 等共用的。详见对应章节。
    VersionSet 在 LogAndApply 中会调用 VersionEdit::SetNextFile 将自己的 next_file_number_ 设置给 VersionEdit。

  3. uint64_t log_number_;,通过 SetLogNumber 设置
    log 文件的 file number,也就是000003.log的这个3。log_number_ 表示小于这个值的Log是可以被删除的
    【Q】这个字段的作用是什么呢?目前来看,在 Recover 的时候会用到。
    【Q】为什么 VersionSet 里面也有?其实 VersionSet 里面的才是主要的,VersionEdit 里面的这个字段,是在 LogAndApply 的时候,由 VersionSet 设置过来的。
    【Q】这个 number 和版本的关系是什么,是一一对应的么?比如一次 Compaction 之后就要换个 log?

    • 在 DBImpl::CompactMemTable 中会更新为 logfile_number_。而后者是在 MakeRoomForWrite 中更新。
    • 在 DB::Open 中会更新。
    • 在 VersionSet::LogAndApply 中会设置,这是在 Recover 场景。
  4. uint64_t prev_log_number_;/bool has_prev_log_number_;
    包括 void SetPrevLogNumber(uint64_t num) 这个函数。
    这篇文章prev_log_number_ 已经废弃了,出于兼容性才保留的。

  5. SequenceNumber last_sequence_;/bool has_last_sequence_;
    SSTable 中的最大的 Sequence Number。VersionSet 里面也有个平行的。

  6. bool has_comparator_;

  7. bool has_log_number_;
    当被调用了 SetNextFile,has_log_number_ 会为 true。
    在重启后 Recover 数据库时,在 VersionSet::Recover 中,当调用了 DecodeFrom 后 has_log_number 会为 true。

  8. std::vector<std::pair<int, InternalKey>> compact_pointers_;
    主要用于 Major Compaction 的时候选择文件。first 表示每个 level。
    【Q】在 Compaction 类和 VersionSet 类里面也有一个这个字段。它们的作用是什么呢?

  9. DeletedFileSet deleted_files_;

    1
    typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;

    pair 存储了 level 和 file。表示将第 level 层中的 file 删除。

  10. std::vector<std::pair<int, FileMetaData>> new_files_;
    FileMetaData 存储了 file number、文件大小,以及文件中最小的 Key 和最大的 Key。

Version

相关字段

  1. VersionSet相关
    指向这个 Version 所属的 VersionSet,以及双向链表和引用计数。
    所以说每个 Version 只能属于一个 VersionSet,这个也是很好理解的。

    1
    2
    3
    4
    VersionSet* vset_;  // VersionSet to which this Version belongs
    Version* next_; // Next version in linked list
    Version* prev_; // Previous version in linked list
    int refs_; // Number of live refs to this version
  2. SSTable相关
    files_ 表示 LevelDB 中每一层中所有的 SSTable 的文件信息。
    file_to_compact(_level)_ 标记下一个要 Compact 的文件以及属于的 Level。

    1
    2
    3
    4
    5
    6
    // List of files per level
    std::vector<FileMetaData*> files_[config::kNumLevels];

    // Next file to compact based on seek stats.
    FileMetaData* file_to_compact_;
    int file_to_compact_level_;

    根据SaveTo函数的论述files_[level]是有序的

  3. 其他字段
    compaction_score_用来计算最迫切需要 Compaction 的 Level,所以可以决定是否需要发起 Major Compaction。这个分数取决于某一层所有 SSTable 的大小。
    NeedsCompaction 会读取这个字段,计算是否需要根据 Version 的情况来 Compaction,并呈递给 MaybeScheduleCompaction

    1
    2
    3
    4
    // Level that should be compacted next and its compaction score.
    // Score < 1 means compaction is not strictly needed.
    double compaction_score_;
    int compaction_level_;

相关函数

  1. PickLevelForMemTableOutput

    1
    int PickLevelForMemTableOutput(const Slice& smallest_user_key, const Slice& largest_user_key);

    给定一个 Memtable 里面的 Key 的范围,返回这个 Memtable 被 Ingest 的话要放到第几层。

  2. Compaction* PickCompaction();
    用来处理 size compaction 和 seek compaction。
    这个函数,在“Compaction主函数”这个章节介绍。

  3. Compaction* CompactRange(int level, const InternalKey* begin, const InternalKey* end);

VersionSet

成员介绍

  1. Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu)
    这个函数接受一个 VersionEdit,产生一个新的 Version。
  2. std::string compact_pointer_[config::kNumLevels];
    这个字段在 Major Compaction 过程中被用到。表示每个 level 上,下一次 Compaction 需要开始的 key 的位置。它要么是一个空串,要么是一个 InternalKey。
    在什么时候被设置呢?从代码上来看,在 VersionSet::Apply 时,会用 edit 中的 compact_pointers_.second 来设置。而 edit 中的是在 SetupOtherInputs 中设置的 largest。
    根据文章,这个 compact_pointer_ 实际上表示这一层上一次 Compact 时文件的 largest。下一次 Compaction 从这个 key 开始。
  3. Status Recover(bool* save_manifest);
    关于 Recover 机制,我们不在这篇文章中介绍。详见“LevelDB之流程概览”这篇文章。

有关 Sequence:

  1. uint64_t LastSequence() const { return last_sequence_; },通过SetLastSequence设置。
    返回最近的 Sequence Number。这个是在写入记录的时候会使用并且更新。
    【Q】VersionEdit 里面也有个平行的,他们之间的关系是什么呢?
    • 首先 VersionSet 的 last_sequence_ 会随着 DBImpl::Write 操作更新。
    • 当需要进行 Compact 的时候,会在 LogAndApply 中赋给 VersionEdit 中的对应字段。而 VersionEdit 的目的,似乎只是持久化这个信息。

有关日志:

  1. prev_log_number_/log_number_
    【Q】和 VersionEdit 里面同名字段的关系是什么?在 LogAndApply 后,会用 VersionEdit 中的 log_numer_ 来更新。

有关文件编号:

  1. next_file_number_
    被下列函数修改。

    1
    2
    3
    4
    5
    6
    uint64_t NewFileNumber() { return next_file_number_++; }
    void ReuseFileNumber(uint64_t file_number) {
    if (next_file_number_ == file_number + 1) {
    next_file_number_ = file_number;
    }
    }

    这个字段用来生成系统中下个文件的编号。
    【Q】这里的 file number 指的是 SSTable 的 file number么?不是,而是Manifest 文件、SSTable 文件啥的共用一个编号,这也是为什么一开始 Log 文件是0,Manifest 文件是1,SetNextFile 是2的原因。
    NewFileNumber调用:

    • DB::Open 中产生 log 的编号
    • WriteLevel0Table 中产生 SSTable 的编号
    • OpenCompactionOutputFile
    • MakeRoomForWrite 中产生新的 memtable 以及对应的 log 的编号
  2. manifest_file_number_;
    表示Manifest文件的编号,主要在Recover时用到。

疑问:

  1. VersionSet 和 DBImpl是一一对应的么?
    应该是的,DBImpl 持有一个VersionSet*

VersionSet::LogAndApply

在前面已经简单介绍过这个函数的功能了。这个函数主要在下面几个地方用到:

  1. DB::Open
    当 DB 启动的时候,可能需要通过 DBImpl::Recover 调用 VersionSet::Recover 从 log 中恢复一部分数据。这些数据会以 VersionEdit 的方式被 Apply。
  2. DBImpl::CompactMemTable
    Minor Compaction。在这个过程中,会将 Immutable Memtable 生成 SSTable 文件,然后会将 edit 的 log_number_ 更新为 logfile_number_,也就是从 Immutable Memtable 对应的值更新为 Memtable 对应的值。然后就会调用 LogAndApply。
    CompactMemTable 在下面的地方调用:
    • BackgroundCompaction 中调用 CompactMemTable:这是通常情况,也就是发现有 Immutable Memable 了。
    • DoCompactionWork:也就是在 Major Compaction 的过程中也要有限处理 Minor Compaction。
  3. BackgroundCompaction 的非 manual 模式,这是平凡情况
    这种情况只是将某个 SSTable 移动到别的层。
  4. BackgroundCompaction 的 manual 模式,这是非平凡情况,也是常见情况
    需要归并。

函数的流程主要是:

  1. 将 VersionEdit 应用在 current_,并借助于 VersionSet::Builder 生成一个新的 Version。Builder 类的实现是比较巧妙的,会在稍后来讲解。
  2. 调用 Finalize 函数更新 compaction_level_compaction_score_
  3. 更新 Manifest 文件。主要是把 VersionEdit 中的内容 VersionEdit::EncodeTo 到 Manifest 文件里面。
  4. 调用 AppendVersion 将新版本添加到 VersionSet 的双向链表中,并且设置新的 current_

下面这里讲解一下源码。
__attribute__((exclusive_locks_required))表示检查在调用 LogAndApply 函数之前就要持有锁 mu。因此同时只会有一个线程执行 LogAndApply

1
2
3
Status LogAndApply(VersionEdit* edit, port::Mutex* mu)
EXCLUSIVE_LOCKS_REQUIRED(mu);
...

如果 VersionEdit 有 log_number_,说明这来自于一个 Recover 过程,要和 VersionSet 的 log_number_ 进行校验:

  1. 要大于等于 VersionSet 的 log_number_
    这是因为在 VersionSet::Recover 时会用一个循环不断地 apply 所有的 VersionEdit。在 apply 之后,会将 edit 的 log_number_ 赋值给自己。而后一个 VersionEdit 的 log_number 肯定是大于等于前一个的。【Q】为什么这里能取等号?是不是和 Major Compaction 有关?
  2. 要小于 next_file_number_,因为这是个过去的 log,它的编号肯定小于 next_file_number_

否则,就用 VersionEdit 的 log_number_ 来设置自己的 log_number_。从 CompactMemTable 可以看到,edit 的 log_number_ 实际等于 logfile_number_,也就是当前使用的 Memtable 对应的 log 的 number。

1
2
3
4
5
6
7
8
9
10
11
12
Status VersionSet::LogAndApply(VersionEdit* edit, port::Mutex* mu) {
if (edit->has_log_number_) {
assert(edit->log_number_ >= log_number_);
assert(edit->log_number_ < next_file_number_);
} else {
edit->SetLogNumber(log_number_);
}

if (!edit->has_prev_log_number_) {
edit->SetPrevLogNumber(prev_log_number_);
}
...

把 VersionSet 的 last_sequence_ 传给 edit,在对 VersionSet::Builder 的论述中已经推断过这里的作用了。

1
2
3
4
5
6
7
8
9
10
11
12
...
edit->SetNextFile(next_file_number_);
edit->SetLastSequence(last_sequence_);

Version* v = new Version(this);
{
Builder builder(this, current_);
builder.Apply(edit);
builder.SaveTo(v);
}
Finalize(v);
...

下面的 descriptor_file_ 就是一个 Manifest 文件。
如果此时 descriptor_log_ 是 NULL,根据注释,这个对应到首次打开数据库的状态。要新建一个 Manifest 文件,此时DescriptorFileName产生一个 "/MANIFEST-%06llu" 格式的文件名字。
通过 WriteSnapshotdescriptor_log_ 写到新的 Manifest 文件里面,这个实际上就是 Current Version 的快照。WriteSnapshot 里面也会调用 EncodeToAddRecord。为什么用 WriteSnapshot?回忆之前介绍了 Manifest 文件的构造,里面提到第一条 Session Record 记录了当前数据库的全量数据,就是在这里实现的。
【Q】注意,VersionSet::ReuseManifest 也会修改这个 descriptor_log_,有什么影响呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
// Initialize new descriptor log file if necessary by creating
// a temporary file that contains a snapshot of the current version.
std::string new_manifest_file;
Status s;
if (descriptor_log_ == nullptr) {
// No reason to unlock *mu here since we only hit this path in the
// first call to LogAndApply (when opening the database).
assert(descriptor_file_ == nullptr);
new_manifest_file = DescriptorFileName(dbname_, manifest_file_number_);
edit->SetNextFile(next_file_number_);
s = env_->NewWritableFile(new_manifest_file, &descriptor_file_);
if (s.ok()) {
descriptor_log_ = new log::Writer(descriptor_file_);
s = WriteSnapshot(descriptor_log_);
}
}
...

下面,是把 VersionEdit 中的内容 EncodeTo 到 Manifest 文件里面。这里不是 WriteSnapshot 写快照了,而是写一条 Log。其实 Manifest 文件的格式就是 Log。
后面就是一系列写文件的操作,因为不需要加锁,所以看到 LevelDB 故意将它们集中在一起做,并在这之前释放了锁。

1
2
3
4
5
6
7
8
9
...
// Unlock during expensive MANIFEST log write
{
mu->Unlock();

// Write new record to MANIFEST log
if (s.ok()) {
std::string record;
...

EncodeTo 将信息按照下面的Tag分类

1
2
3
4
5
6
7
8
9
10
11
enum Tag {
kComparator = 1,
kLogNumber = 2,
kNextFileNumber = 3,
kLastSequence = 4,
kCompactPointer = 5,
kDeletedFile = 6,
kNewFile = 7,
// 8 was used for large value refs
kPrevLogNumber = 9
};

AddRecord 将信息编码到文件中,对应的读取函数是 Reader::ReadRecord

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
...
edit->EncodeTo(&record);
s = descriptor_log_->AddRecord(record);
if (s.ok()) {
s = descriptor_file_->Sync();
}
if (!s.ok()) {
Log(options_->info_log, "MANIFEST write: %s\n", s.ToString().c_str());
}
}

// If we just created a new descriptor file, install it by writing a
// new CURRENT file that points to it.
if (s.ok() && !new_manifest_file.empty()) {
s = SetCurrentFile(env_, dbname_, manifest_file_number_);
}

mu->Lock();
}
...

我们现在得到了一个新的 Version 即 v,调用 AppendVersion 将它设置为 current_。这个函数还会将 v 添加到 VersionSet 里那个双向链表里面。
文章中有疑问这里遇到多线程怎么办,但 LevelDB 中 Compact 只有一条后台线程,并且这里是持有锁的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
// Install the new version
if (s.ok()) {
AppendVersion(v);
log_number_ = edit->log_number_;
prev_log_number_ = edit->prev_log_number_;
} else {
delete v;
if (!new_manifest_file.empty()) {
delete descriptor_log_;
delete descriptor_file_;
descriptor_log_ = nullptr;
descriptor_file_ = nullptr;
env_->RemoveFile(new_manifest_file);
}
}

return s;
}

VersionSet::AppendVersion

这里 dummy_versions_ 是 VersionSet 维护的环状链表头,dummy_versions_.prev_ 就是 current_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
void VersionSet::AppendVersion(Version* v) {
// Make "v" current
assert(v->refs_ == 0);
assert(v != current_);
if (current_ != nullptr) {
current_->Unref();
}
current_ = v;
v->Ref();

// Append to linked list
v->prev_ = dummy_versions_.prev_;
v->next_ = &dummy_versions_;
v->prev_->next_ = v;
v->next_->prev_ = v;
}

可以通过下面的图清晰看出这个链表的结构。

VersionSet::Builder

这个类的作用是应用一系列 VersionEdit 得到一个新的 Version。不过感觉大头都已经被 VersionEdit 做掉了啊,这里还需要处理什么呢?主要是几点:

  1. 当前 Version 到底包含哪些文件,这总不能每次都从 VersionEdit 重新计算吧。
  2. Compact 相关。

LevelState 记录了需要被删除的文件的 number,这来自每个 VersionEdit 的 deleted_files_。还记录了需要被添加的文件的信息,这来自 new_files_。FileSet 指定根据 v->files_[file_number].smallest 排序。

1
2
3
4
5
typedef std::set<FileMetaData*, BySmallestKey> FileSet;
struct LevelState {
std::set<uint64_t> deleted_files;
FileSet* added_files;
};

看看主要的成员:

  1. VersionSet* vset_;
    在构造时传入的 VersionSet。
  2. Version* base_;
    在构造时传入的,一般为 current_
  3. LevelState levels_[config::kNumLevels];
    LevelState 里面记录了增加和删除的文件。
  4. void Apply(VersionEdit* edit)
    edit 里面的变动应用到 current_。例如要加些什么文件,写到 levels_[level].added_files 这个列表里面。但是我们不实际写,而是到 SaveTo 里面再一次性写。
    【Q】为什么要这样子呢?原因有2:
    • v->files_[level]这个是有序存储的。一起处理可以节约分配空间的开销,并且可以一次性 merge 解决问题。
    • 可以知道哪些文件最后还是被删除了,这样在 MaybeAddFile 添加的时候就可以不添加。
  5. void SaveTo(Version* v)
    注意,从VersionSet::Recover可以看出,Apply 和 SaveTo 并不是一对一的关系。例如我们从一个文件中多个记录里面恢复,那么每读取一个记录就要 Apply 一次,但最后再 SaveTo。

VersionSet::Builder::Apply

这个函数设置诸如 levels_[level].added_files 的字段,表示需要做什么改变。
首先将 VersionEdit 记录的 compact_pointers_ 应用到 VersionSet。

1
2
3
4
5
6
7
8
9
// Apply all of the edits in *edit to the current state.
void Apply(VersionEdit* edit) {
// Update compaction pointers
for (size_t i = 0; i < edit->compact_pointers_.size(); i++) {
const int level = edit->compact_pointers_[i].first;
vset_->compact_pointer_[level] =
edit->compact_pointers_[i].second.Encode().ToString();
}
...

然后把要增加和删除的文件记录到自己的 levels_ 字段里面。

1
2
3
4
5
6
7
8
...
// Delete files
for (const auto& deleted_file_set_kvp : edit->deleted_files_) {
const int level = deleted_file_set_kvp.first;
const uint64_t number = deleted_file_set_kvp.second;
levels_[level].deleted_files.insert(number);
}
...

在增加文件的时候,需要处理 allowed_seeks 字段。这里的 16384U 有点奇怪,是啥意思?根据注释,我们假设:

  1. 一次 Seek 耗时 10ms
  2. 读写 1MB 耗时 10ms,也就是 IO 速度是 100MB/s
  3. 一次 Compaction,假设是 1MB,需要消耗 25MB 的 IO
    1. 需要从这一层读取 1MB
    2. 从下一层读取 10-12MB 的数据(boundaries may be misaligned)
    3. 写 10-12MB 的数据到下一层

这说明 25 次 Seek 的开销等于 1MB 数据的 Compaction 成本,也就是一次 Seek 大概摊还下来是 40KB 数据的压缩成本。我们做一些保留,让 16KB 对应一次 Compaction,也就是允许更多的 Seek 次数。同时,我们将 f->allowed_seeks 最小值设为100,这样也不会一直 Compaction。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
// Add new files
for (size_t i = 0; i < edit->new_files_.size(); i++) {
const int level = edit->new_files_[i].first;
FileMetaData* f = new FileMetaData(edit->new_files_[i].second);
f->refs = 1;

f->allowed_seeks = static_cast<int>((f->file_size / 16384U)); // 16*1024
if (f->allowed_seeks < 100) f->allowed_seeks = 100;

levels_[level].deleted_files.erase(f->number);
levels_[level].added_files->insert(f);
}
}

【Q】还需要注意的是,在添加文件之前,需要在 deleted_files 中删除对应的 file_number,这是为何?难道同一个文件还可以先被删除,再被添加回来?

VersionSet::Builder::SaveTo

SaveTo 的最终影响是 MaybeAddFile,也就是说将文件添加到 Version 里面。添加到 v->files_ 里面。用一个简单的式子表达,也就是

1
base_->files_[level] + (levels_[level].added_files - levels_[level].deleted_files) = v->files_[level]
1
2
3
4
5
// Save the current state in *v.
void SaveTo(Version* v) {
BySmallestKey cmp;
cmp.internal_comparator = &vset_->icmp_;
...

下面的循环中依次处理每一层的合并。主要内容是:

  1. 将添加的文件合并到files_
  2. 删除文件

之前介绍过 Version* base_ 在构造时传入,一般为 CURRENT,我们就是要对 base_ 去应用这些修改,形成一个新的 Version。
base_iter 用来遍历 base_ 中原有的文件。

1
2
3
4
5
6
7
8
...
for (int level = 0; level < config::kNumLevels; level++) {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const std::vector<FileMetaData*>& base_files = base_->files_[level];
std::vector<FileMetaData*>::const_iterator base_iter = base_files.begin();
std::vector<FileMetaData*>::const_iterator base_end = base_files.end();
...

我们首先预留最大空间,避免到时候的频繁动态分配。但是实际上最终未必用满这个空间,因为 MaybeAddFile 不一定真的添加文件。
下面就是插入操作,创建 added_iter,依次遍历要添加的 levels_[level].added_files,这也是之前通过 Apply 统计的所有 VersionEdit 的累计。

下面要做的就是将 base_levels_ 里面的文件都添加到 v 中。这是一个归并的过程,分两步:

  1. 插入原有的 base_ 里面的文件,这些文件要小于等于 added_file
    我们通过 std::upper_bound 找到第一个大于 added_file 的位置 bpos。那么就先将 bpos 之前的 base_ 中的文件先插入。
    MaybeAddFile 插入,因为这些文件可能已经被标记删除。
  2. 插入 added_file
  3. 自增 added_iter,继续处理下一个文件

总而言之,我们先初始化了 bpos,但是循环中自增的却是 base_iterfor(A;B;C)里面 A 和 C 的主体不一样,挺新鲜的。

这么做的好处是什么?我认为是减少了比较的次数,从 O(n) 到了 O(logn)。因为这里是BytewiseComparator,是两个Slice之间的比较,所以开销还是比较大的,这是值得学习的一个 Best Practice。特别是我们 merge delta 层和 stable 层的时候,因为 delta 层相对较少,所以往往可以使用这种办法快速跳过大块的 stable 层的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
const FileSet* added_files = levels_[level].added_files;
v->files_[level].reserve(base_files.size() + added_files->size());
for (const auto& added_file : *added_files) {
// Add all smaller files listed in base_
for (std::vector<FileMetaData*>::const_iterator bpos =
std::upper_bound(base_iter, base_end, added_file, cmp);
base_iter != bpos; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}

MaybeAddFile(v, level, added_file);
}

// Add remaining base files
for (; base_iter != base_end; ++base_iter) {
MaybeAddFile(v, level, *base_iter);
}
...

在 Debug 的状态下,会去检查除 Level0 之外的层有没有重叠。检查方法也很简单,就是看后一个文件的 smallest 是不是一定严格大于前一个文件的 largest。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
...
#ifndef NDEBUG
// Make sure there is no overlap in levels > 0
if (level > 0) {
for (uint32_t i = 1; i < v->files_[level].size(); i++) {
const InternalKey& prev_end = v->files_[level][i - 1]->largest;
const InternalKey& this_begin = v->files_[level][i]->smallest;
if (vset_->icmp_.Compare(prev_end, this_begin) >= 0) {
std::fprintf(stderr, "overlapping ranges in same level %s vs. %s\n",
prev_end.DebugString().c_str(),
this_begin.DebugString().c_str());
std::abort();
}
}
}
#endif
}
}

VersionSet::Builder::MaybeAddFile

如果后续的 VersionEdit 中又删除了这个 file,那么在最终 SaveTo 的时候,就可以不添加了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void MaybeAddFile(Version* v, int level, FileMetaData* f) {
if (levels_[level].deleted_files.count(f->number) > 0) {
// File is deleted: do nothing
} else {
std::vector<FileMetaData*>* files = &v->files_[level];
if (level > 0 && !files->empty()) {
// Must not overlap
assert(vset_->icmp_.Compare((*files)[files->size() - 1]->largest,
f->smallest) < 0);
}
f->refs++;
files->push_back(f);
}
}

VersionSet::Builder 析构函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
~Builder() {
for (int level = 0; level < config::kNumLevels; level++) {
const FileSet* added = levels_[level].added_files;
std::vector<FileMetaData*> to_unref;
to_unref.reserve(added->size());
for (FileSet::const_iterator it = added->begin();
it != added->end(); ++it) {
to_unref.push_back(*it);
}
delete added;
for (uint32_t i = 0; i < to_unref.size(); i++) {
FileMetaData* f = to_unref[i];
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
base_->Unref();
}

VersionSet::Finalize

这个函数在新 Version 产生后,处理一些额外的工作,主要是计算 Compaction 相关的东西。

1
2
3
4
5
6
7
8
void VersionSet::Finalize(Version* v) {
// Precomputed best level for next compaction
int best_level = -1;
double best_score = -1;

for (int level = 0; level < config::kNumLevels - 1; level++) {
double score;
...

如果 LevelDB 的第0层超过 kL0_CompactionTrigger 个文件就会触发 Compaction,如果超过 kL0_SlowdownWritesTrigger 就会 slow down write。针对第0层的特殊情况,这里使用文件数量而不是大小来计算 Compaction Score,注释里面列了两个原因:

  1. 允许更大的写 buffer,从而减少 Level0 Compaction 的数量。
    这里的写 buffer 应该是 options_.write_buffer_size 这个东西。这个阈值控制 Memtable 何时转换成 Immutable Memtable,以及在 Recover 的时候何时直接 dump 成 SSTable。
    佶屈聱牙,实际上的意思是,如果写 buffer 太大,又用固定的 size 限制死了的话,可能 Level0 的文件数量会很少,比如就1个,这样会导致频繁的 Level0 Compaction。
  2. Level0 的文件每次读取都会被 Merge。我们不希望有很多个小文件(perhaps because of a small write-buffer setting, or very high compression ratios, or lots of overwrites/deletions)。
    如果写 buffer 很小,这样会导致更多的 Level0 文件。因为 Level0 的文件是 overlap 的,所以如果数量过多,每次查询需要 Seek 的文件数量就越多。这样造成了较大的读放大。

总结一下,L0 不希望有一个单独的大文件,也不希望有很多个小文件。所以它选择直接控制文件数量。另外,还要考虑读。我们不希望 Compaction 挤占读带宽,所以当 Compaction 很大的时候,会 slow down write 或者 write stall。

1
2
3
4
5
6
...
if (level == 0) {
score = v->files_[level].size() /
static_cast<double>(config::kL0_CompactionTrigger); // ==4
} else {
...

对于第1层以下的层,计算文件总大小,而不是文件数量了。MaxBytesForLevel 的大概意思就是 Level1 总大小是 10M,下面每一层翻10倍。如果现在这一层的文件总大小大于 MaxBytesForLevel,则它就具有一个大于 1 的值,从而导致可能的 Compaction。我们会计算整个 Version 中 score 最大的 Level,将它计入 compaction_score_ 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
...
// Compute the ratio of current size to size limit.
const uint64_t level_bytes = TotalFileSize(v->files_[level]);
score =
static_cast<double>(level_bytes) / MaxBytesForLevel(options_, level);
}

if (score > best_score) {
best_level = level;
best_score = score;
}
}

v->compaction_level_ = best_level;
v->compaction_score_ = best_score;
}

MaxBytesForLevel

这个函数计算每一层的最大大小。

1
2
3
4
5
6
7
8
9
10
11
12
static double MaxBytesForLevel(const Options* options, int level) {
// Note: the result for level zero is not really used since we set
// the level-0 compaction threshold based on number of files.

// Result for both level-0 and level-1
double result = 10. * 1048576.0;
while (level > 1) {
result *= 10;
level--;
}
return result;
}

析构函数

将自己从链表中移除。
对于自己管理的所有文件,引用计数减一。【Q】这边不搞个原子操作么?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Version::~Version() {
assert(refs_ == 0);

// Remove from linked list
prev_->next_ = next_;
next_->prev_ = prev_;

// Drop references to files
for (int level = 0; level < config::kNumLevels; level++) {
for (size_t i = 0; i < files_[level].size(); i++) {
FileMetaData* f = files_[level][i];
assert(f->refs > 0);
f->refs--;
if (f->refs <= 0) {
delete f;
}
}
}
}

LevelDB对MVCC的实现总结

版本升级

文章中论述了一次版本升级的过程,但我会批注一下具体实现的函数和逻辑

  1. 新建一个 Session Record,记录状态变更信息
  2. 讨论版本升级原因
    1. Minor Compaction 或者日志 replay
      在 Session Record 中记录新增的文件信息、最新的 journal 编号、数据库 Sequence Number 以及下一个可用的文件编号。
    2. Major Compaction
      在 Session Record 中记录新增、删除的文件信息、下一个可用的文件编号即可。
  3. 通过 VersionEdit 生成新版本
    相较于旧的版本信息,新的版本信息更改的内容为:
    1. 每一层的文件信息:在 VersionSet::Builder::Apply 中。
    2. 每一层的计分信息:在 VersionSet::Finalize 中。
  4. 将 Session Record 持久化
    VersionSet::Builder::SaveTo 中。
  5. 讨论是否是第一条 Session Record
    在 LogAndApply 的 Finalize 调用之后的部分

    1. 新建一个 Manifest 文件,并将完整的版本信息全部记录进 Session Record 作为该 Manifest 的基础状态写入,同时更改 Current 文件,将其指向新建的 Manifest。
    2. 不是
      将该条 Session Record 进行序列化后直接作为一条记录写入即可。
  6. 将当前的 Version 设置为刚创建的 Version
    这个会修改 current_ 的指向。这个操作应该是原子的(不然最新版本岂不是会不一致么)实际上也在 mutex_ 的保护下。
    在 LogAndApply 对 AppendVersion 的调用中。

一些功能性函数

插入 SST 文件

主要在 Version::PickLevelForMemTableOutput 逻辑中。

OverlapInLevel

先介绍辅助函数 OverlapInLevel,作用是判断范围 [smallest_user_key,largest_user_key] 和 level 中的文件有没有 Overlap。

1
2
3
4
5
bool Version::OverlapInLevel(int level, const Slice* smallest_user_key,
const Slice* largest_user_key) {
return SomeFileOverlapsRange(vset_->icmp_, (level > 0), files_[level],
smallest_user_key, largest_user_key);
}

SomeFileOverlapsRange

SomeFileOverlapsRange 是 OverlapInLevel 的辅助函数。返回 files 中有没有在范围 [smallest_user_key,largest_user_key] 中的 key。
disjoint_sorted_files 表示传入的 files 里面的 key 是不是不 Overlap 的。Level0 会 Overlap,其他不会。
AfterFileBeforeFile 都比较 FileMetaData 里面的 largest/smallestuser_key() 字段。复习一下,largest 的类型是 InternalKey,由 User Key,Sequence Number 和 Value Type 组成。AfterFile 表示传入的 key 比传入的 f 中最大的 key 还大,disjoint_sorted_files 表示传入的 key 比传入的 f 中最小的 key 还小。
普通情况,对于一个文件 f,如果 smallest_user_key 大于该文件中的最大值,或者 largest_user_key 小于最小值,那么认为是不 Overlap 的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp,
bool disjoint_sorted_files,
const std::vector<FileMetaData*>& files,
const Slice* smallest_user_key,
const Slice* largest_user_key) {
const Comparator* ucmp = icmp.user_comparator();
if (!disjoint_sorted_files) {
// Need to check against all files
for (size_t i = 0; i < files.size(); i++) {
const FileMetaData* f = files[i];
if (AfterFile(ucmp, smallest_user_key, f) ||
BeforeFile(ucmp, largest_user_key, f)) {
// No overlap
} else {
return true; // Overlap
}
}
return false;
}
...

如果是不相交的文件,就可以基于 FindFilefiles 集合二分查找。所以我们看到,在某一个 Level 找 SSTable 的时候是可以二分的
可以思考下用什么做二分的 key 呢?答案是每个 file 的 largest。我们要找到第一个 largest 大于等于 smallest_user_key 的文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
// Binary search over file list
uint32_t index = 0;
if (smallest_user_key != nullptr) {
// Find the earliest possible internal key for smallest_user_key
InternalKey small_key(*smallest_user_key, kMaxSequenceNumber,
kValueTypeForSeek);
index = FindFile(icmp, files, small_key.Encode());
}

if (index >= files.size()) {
// beginning of range is after all files, so no overlap.
return false;
}
...

二分法找到可能存在的文件 files[index] 后,不要忘了在判断下这个文件实际有没有 overlap。这是二分法的基本规则。

1
2
3
...
return !BeforeFile(ucmp, largest_user_key, files[index]);
}

FindFile

用来二分某个有序的 files 集合,从中找到可能存在 key 的文件。
它在写入、读取的过程中都会被用到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
int FindFile(const InternalKeyComparator& icmp,
const std::vector<FileMetaData*>& files, const Slice& key) {
uint32_t left = 0;
uint32_t right = files.size();
while (left < right) {
uint32_t mid = (left + right) / 2;
const FileMetaData* f = files[mid];
if (icmp.InternalKeyComparator::Compare(f->largest.Encode(), key) < 0) {
// Key at "mid.largest" is < "target". Therefore all
// files at or before "mid" are uninteresting.
left = mid + 1;
} else {
// Key at "mid.largest" is >= "target". Therefore all files
// after "mid" are uninteresting.
right = mid;
}
}
return right;
}

主流程

1
2
3
4
int Version::PickLevelForMemTableOutput(const Slice& smallest_user_key,
const Slice& largest_user_key) {
int level = 0;
...

首先判断要加入的文件的 [smallest_user_key,largest_user_key] 和 Level0 有没有交叠。如果有交叠,就进不了这个 if,直接放到第一层,等后面 Major Compaction 了。
如果没有交叠,尝试能否将它下放到 config::kMaxMemCompactLevel 之前的层。【Q】为什么要设置上限 kMaxMemCompactLevel 呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
...
if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) {
// Push to next level if there is no overlap in next level,
// and the #bytes overlapping in the level after that are limited.
InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek);
InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0));
std::vector<FileMetaData*> overlaps;
while (level < config::kMaxMemCompactLevel) {
if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) {
break;
}
// 为什么会有这个?下面讲。
if (level + 2 < config::kNumLevels) {
// Check that file does not overlap too many grandparent bytes.
...
}
level++;
}
}
return level;
}

判断level + 2层情况的分支详解

这里需要着重讲解一下 level + 2 < config::kNumLevels 这个分支的含义。

作为普通人,我觉得判断完 OverlapInLevel(level + 1,... 就可以直接 level++ 了啊,但是大佬肯定是不平凡的。
大佬觉得在把文件放到 level + 1 层前要先打住,看看 level + 2 层是什么情况,也就对应到下面的代码。我们要计算所有重叠的文件的总大小,如果这个大小超过了阈值,那么就不把这个 SSTable 进行下放。
这是防止 level + 1 和 level + 2 的重叠范围太大,导致这两层进行 Compaction 时涉及的 SSTable 过多,耗时过长。

1
2
3
4
5
6
7
8
9
10
11
// PickLevelForMemTableOutput中的片段代码
...
if (level + 2 < config::kNumLevels) {
// Check that file does not overlap too many grandparent bytes.
GetOverlappingInputs(level + 2, &start, &limit, &overlaps);
const int64_t sum = TotalFileSize(overlaps);
if (sum > MaxGrandParentOverlapBytes(vset_->options_)) {
break;
}
}
...

于是,先要用 GetOverlappingInputs 这个函数,计算 level + 2 层中到底有哪些文件和 [smallest_user_key,largest_user_key] 有交叠,这些文件会放到 overlaps 里面。然后用 TotalFileSize 对这些重叠的文件的 FileMetaData::file_size 字段求和,得到总大小。
然后和 MaxGrandParentOverlapBytes 返回的阈值进行比较,如果大于这个阈值,就跳出循环,不再往下放了。

GetOverlappingInputs/MaxGrandParentOverlapBytes

GetOverlappingInputs 的目标是找到 level 中和 [begin,end] 重叠的所有文件,并放到 inputs 里面。这个函数对 Level0 有特殊的处理。

1
2
3
4
// Store in "*inputs" all files in "level" that overlap [begin,end]
void Version::GetOverlappingInputs(int level, const InternalKey* begin,
const InternalKey* end,
std::vector<FileMetaData*>* inputs) {

user_beginuser_end 是从 InternalKey 中提取出的 user key。如果传入 nullptr,表示在比较时 begin 永远小于任何 key。
【Q】这里为什么去找的 user key 而不是 InternalKey 呢?貌似很多地方都是找 user key。在这篇文章中指出一个很容易注意到的性质,就是除了 Level0,每一层 Level 都是有序的。进一步地,由于 LevelDB 使用 leveled 策略(LCS),即强调一个 key 在每一层至多只有1条记录,不存在冗余记录。【Q】但在LevelDB之Compaction中,AddBoundaryInputs 的出现似乎告诉我们还是会存在重复的 user key 在同一层上的。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
assert(level >= 0);
assert(level < config::kNumLevels);
inputs->clear();
Slice user_begin, user_end;
if (begin != nullptr) {
user_begin = begin->user_key();
}
if (end != nullptr) {
user_end = end->user_key();
}
const Comparator* user_cmp = vset_->icmp_.user_comparator();
...

默认遍历这一层的所有的文件。前面两个 if 分别处理文件和 range 完全不重叠的情况。

1
2
3
4
5
6
7
8
9
10
11
...
for (size_t i = 0; i < files_[level].size();) {
FileMetaData* f = files_[level][i++];
const Slice file_start = f->smallest.user_key();
const Slice file_limit = f->largest.user_key();
if (begin != nullptr && user_cmp->Compare(file_limit, user_begin) < 0) {
// "f" is completely before specified range; skip it
} else if (end != nullptr && user_cmp->Compare(file_start, user_end) > 0) {
// "f" is completely after specified range; skip it
} else {
...

else 处理有重叠的情况。我们把这个文件加入到 inputs 里面作为结果返回。对于 PickLevelForMemTableOutput 的逻辑而言,这里就到此为止了。

但是 GetOverlappingInputs 这个函数还会在 CompactRangeSetupOtherInputs 这些函数中用到。此时,需要处理 Level0 的逻辑。
且慢,我们已经逐文件遍历了啊,还会有什么问题呢?
这篇文章中,详细解释了原因。这因为认为 Level1 的文件是比 Level0 要旧的,所以如果要把 Level0 中的某个文件 f 移动到 Level1 中,则要把 Level0 中所有和 f Overlap 的文件都放到 Level1 里面。这样,实际上保证了如果我有一个 Key 在 Level0 里面,那么 inputs 包含了所有包含这个 Key 的文件。
所以这里我们要修改遍历的区间。也就是说,当检查到 user_begin 在文件 [file_start,file_limit] 中后,需要将 user_begin 调整为文件的开头 file_start,从而扫描和整个文件 f 重叠的所有文件。对 user_end也是同理的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
inputs->push_back(f);
if (level == 0) {
// Level-0 files may overlap each other. So check if the newly
// added file has expanded the range. If so, restart search.
if (begin != nullptr && user_cmp->Compare(file_start, user_begin) < 0) {
user_begin = file_start;
inputs->clear();
i = 0;
} else if (end != nullptr &&
user_cmp->Compare(file_limit, user_end) > 0) {
user_end = file_limit;
inputs->clear();
i = 0;
}
}
}
}
}

这个函数的存在,也回答了后续在 Compaction 过程中的一些疑问:

  1. 在 Level0 往 Level1 归并的时候,其实也应该看到这个过程。事实上 PickCompaction 的代码实现中也能看到在最后有个 if (level == 0) 的判断。
  2. IsTrivialMove 为什么 Level 层有两个的时候,不能简单把其中一个文件移动到下层。
    因为可能另一个文件的和下层 key 有重叠。

重启后 Recover

Snapshot机制

Snapshot 对 Compaction 的影响

这里说明 Snapshot 对 Compaction 的影响:导致同一个 user key 的不同的 Sequence Number 版本存在多个。

Snapshot 实际上就是某个特定的 Sequence Number。
【Q】Sequence Number 是全局递增的么?应该是这样的,在 Put 和 Get 的实现中,看到的都是读取的VersionSet::LastSequence()这个。

1
2
3
4
const Snapshot* DBImpl::GetSnapshot() {
MutexLock l(&mutex_);
return snapshots_.New(versions_->LastSequence());
}

Reference

  1. https://bean-li.github.io/leveldb-version/
  2. https://www.ravenxrz.ink/archives/1ba074b9.html
    介绍了Snapshot
  3. https://izualzhy.cn/leveldb-version
    解释了Version的实现
  4. http://www.petermao.com/leveldb/leveldb-8-snapshot.html
    介绍了snapshot机制
  5. https://zhuanlan.zhihu.com/p/60188395
    带Snapshot的Compaction,以及为什么会导致Issue 320的问题
  6. https://zhuanlan.zhihu.com/p/360345923
    也讲解了AddBoundaryInputs的来源,并且指出了快照会导致Issue 320的问题。
  7. https://zhuanlan.zhihu.com/p/35343043
    讲解VersionSet/VersionEdit里面出现的各种文件编号
  8. https://leveldb-handbook.readthedocs.io/zh/latest/version.html
    版本控制相关
  9. https://bean-li.github.io/leveldb-manifest/
    有关Manifest文件的深入讨论
  10. http://1feng.github.io/2016/08/24/mvcc-and-manifest/
    介绍MVCC机制,很好
  11. https://draveness.me/database-concurrency-control/
    同样介绍了MVCC,包括乐观锁和悲观锁机制