LevelDB之SSTable实现

本文介绍 LevelDB的 SSTable 相关功能。
SSTable 是 LevelDB 的内存数据结构。当一个 Memtable 满之后,会被变成 Immutable Memtable,并写入 SSTable Level0。Level0 的 SSTable 是没有经过归并的,各个 Key 可能互相重叠。经过 Compaction 达到 Level1 之后,就是有序的了。

目录:

  1. LevelDB之Memtable实现
  2. LevelDB之SSTable实现
  3. LevelDB之Version
  4. LevelDB之Compaction
  5. LevelDB之流程概览

SSTable格式

SSTable 是后缀为 .sst 或 .ldb 的文件。
官方文档中,对 SSTable 的格式已经有了介绍。

1
2
3
4
5
6
7
8
9
10
11
12
<beginning_of_file>
[data block 1]
[data block 2]
...
[data block N]
[meta block 1]
...
[meta block K]
[metaindex block]
[index block]
[Footer] (fixed size; starts at file_size - sizeof(Footer))
<end_of_file>
  1. data block
    放有序的 KV 对。在查询SSTable文件的时候,也可以二分。
    将专门讨论 Block 的组织。

  2. meta block
    用来快速定位 key 是否在 data block 中。容易想到,里面可以是一些类似于 bloom filter 的实现。

  3. metaindex block
    每个 metaindex block 一条记录。其中 K 是 meta block 的名字,V 是指向这个 meta block 的 BlockHandle。
    BlockHandle 类似于指针,具有下面的结构。这里面也用了 VarInt 结构。

    1
    2
    offset:   varint64
    size: varint64

    有两种 meta block 类型,filter 和 stats:

    1. 如果在数据库启动时指定了某个 FilterPolicy ,就会创建一个filter block。
    2. 统计信息
  4. index block
    每个 data block 对应 index block 中的一条 entry。这个 index block entry 的 .key() 大于等于指向的 data block 最后一个 K【性质1】,但是严格小于下一个 data block 的第一个 K【性质2】。
    因此可以通过和 index block 比较来快速定位 data block。

  5. footer
    包含:

    1. metaindex handle
    2. index handle
    3. padding
    4. magic number

Block实现

SST 的构建主要集中在 table_builder.h/ccblock_builder.h/cc 中。SST 的读取主要集中在 table.h/ccblock.h/cc 中。
从前面可以看到,SSTable 主要有两层结构,Table(SSTable) 和 Block(data/index 等)。
Table 由多个 Block 构成,所以从 Block 开始分析。

BlockBuilder/Block原理

BlockBuilder 负责生成诸如 data block、index block 等所有 block。
Block 对象负责读取这些 block。

共享前缀

因为 BlockBuilder 是有序的,所以使用如下的共享前缀来节约空间:

  1. 普通

    1
    2
    Hello World
    Hello William
  2. 共享

    1
    2
    Hello World
    illiam

Block 的构造如下图所示

其中:

  1. shared_bytes
    Key 和上一个 entry 的共享前缀的长度。
  2. key_delta/unshared_bytes
    Key 除了共享前缀之外的剩余串,以及其长度。
  3. value/value_length
    值的串和其长度。

那么如何读用共享前缀表示的 K 呢?

  1. 最坏状况,比如需要读到第一个 Entry
    如下所示,第三个 K 是 abc,但是我们得回溯到第一个 K 即 a 才能确认。

    1
    2
    3
    a
    b
    c
  2. 最好状况,读当前的就行
    比如下面的 a 和 b 根本没有任何共享串。

    1
    2
    a
    b

restart

可见共享前缀会给读带来困难,因此又引入 restart 机制,即每隔 block_restart_interval 之后会去存储一次完整的 key,对应的 entry 的位置称为 restart point。在 block 中,会存储下所有的 restart point。
因为 Block 内部有序,所以能通过二分 restart point 来加速读取,具体代码在 Block::Iter 中。读取需求一般是给定 target,要求找到第一个 K 大于等于 target 的 entry。可以通过二分,即 Block::Iter 的 Seek 方法来做。

filter block

在每个 data block 内部,借助于二分 restart 可以实现 $log(n)$ 复杂度的查询,那么能在 data block 之间二分么?

可以通过 filter block 来判断某个 key 是否属于该 data block,实现是 bloom filter。

BlockBuilder实现

方法

  1. void Add(const Slice& key, const Slice& value);
    每一次 Add 的 Key,必须是有序的,从小到大的。
  2. Slice Finish();
  3. void Reset();

BlockBuilder::Add

首先是三个 assert:

  1. 第一个很好理解,如果 finished_,相当于已经调用了 Finish 或者 Abandon 等方法。
  2. block_restart_interval 表示每过多少个 key 就要设置一个 restarts。设置完之后,counter_ 会被重置为0,所以这个不等式是成立的。
  3. 最后一个是有序性检验,要不是空的,要不新来的 key 要大于老的 last_key_piece
1
2
3
4
5
6
void BlockBuilder::Add(const Slice& key, const Slice& value) {
Slice last_key_piece(last_key_);
assert(!finished_);
assert(counter_ <= options_->block_restart_interval);
assert(buffer_.empty() // No values yet?
|| options_->comparator->Compare(key, last_key_piece) > 0);

下面判断要不要 restart:

  1. 如果不要,和前一个 key 即 last_key_ 比较,算出来能 share 多少长度。
  2. 如果要,就 restart。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
size_t shared = 0;
if (counter_ < options_->block_restart_interval) {
// See how much sharing to do with previous string
const size_t min_length = std::min(last_key_piece.size(), key.size());
while ((shared < min_length) && (last_key_piece[shared] == key[shared])) {
shared++;
}
} else {
// Restart compression
restarts_.push_back(buffer_.size());
counter_ = 0;
}
const size_t non_shared = key.size() - shared;
...

下面就是往 buffer_ 里面写数据了。

1
2
3
4
5
6
7
8
9
10
...
// Add "<shared><non_shared><value_size>" to buffer_
PutVarint32(&buffer_, shared);
PutVarint32(&buffer_, non_shared);
PutVarint32(&buffer_, value.size());

// Add string delta to buffer_ followed by value
buffer_.append(key.data() + shared, non_shared);
buffer_.append(value.data(), value.size());
...

下面这个优化点也很有趣,首先 last_key_ 保存的是一个完整的 key。但可以复用之前一个 key 的 shared 部分,这个是安全的。接着把 non shared 部分 append 上去。这样就在本轮迭代最后更新了 last_key_,后面还可以和 key 进行校验。

1
2
3
4
5
6
7
...
// Update state
last_key_.resize(shared);
last_key_.append(key.data() + shared, non_shared);
assert(Slice(last_key_) == key);
counter_++;
}

BlockBuilder::Finish

为啥 restarts_ 不用 VarInt 存呢?

1
2
3
4
5
6
7
8
9
Slice BlockBuilder::Finish() {
// Append restart array
for (size_t i = 0; i < restarts_.size(); i++) {
PutFixed32(&buffer_, restarts_[i]);
}
PutFixed32(&buffer_, restarts_.size());
finished_ = true;
return Slice(buffer_);
}

Block实现

  1. uint32_t NumRestarts() const;
    之前了解过 Block 的结构,在 Block 的最后,是 restart 点的个数。也就对应了这个方法。

    1
    2
    3
    4
    inline uint32_t Block::NumRestarts() const {
    assert(size_ >= sizeof(uint32_t));
    return DecodeFixed32(data_ + size_ - sizeof(uint32_t));
    }
  2. const char* data_;/size_t size_;
    也就是这个 Block 的指针和长度。

  3. uint32_t restart_offset_;
    表示 restart 的开始位置

    1
    size_ - (1 + NumRestarts()) * sizeof(uint32_t)
  4. bool owned_;
    取决于构造函数传入的 BlockContentsowned_ 字段。

Block::Iter

Seek

这个函数是二分:

  1. mid < target
    则搜索 [mid, right]
  2. mid >= target
    搜索 [left, mid-1]

这是一个 TTT…F/T 的二分,所以每次要令 mid = (left + right + 1)/2
但这里二分的难点是,Block 中为了压缩空间,使用的是共享前缀的办法进行存储的。因此很难取得二分的 key。因此我们实际上是二分的 restart points。

有一个 GetRestartPoint 函数来计算当前 restart point 位置对应的 entry 的位置。

1
2
3
4
5
6
7
8
class Block::Iter : public Iterator {
...
uint32_t GetRestartPoint(uint32_t index) {
assert(index < num_restarts_);
return DecodeFixed32(data_ + restarts_ + index * sizeof(uint32_t));
}
...
}

下面是函数主体。介绍几个成员变量:

  1. restarts_
    存储传入的 restart,表示在 restarts 数组里面的位置。
  2. current_
    初始值为 restarts。

Valid() 主要是判断 current_ 和 num_restarts_ 的关系。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
void Seek(const Slice& target) override {
// Binary search in restart array to find the last restart point
// with a key < target
uint32_t left = 0;
uint32_t right = num_restarts_ - 1;
int current_key_compare = 0;

if (Valid()) {
// If we're already scanning, use the current position as a starting
// point. This is beneficial if the key we're seeking to is ahead of the
// current position.
current_key_compare = Compare(key_, target);
if (current_key_compare < 0) {
// key_ is smaller than target
left = restart_index_;
} else if (current_key_compare > 0) {
right = restart_index_;
} else {
// We're seeking to the key we're already at.
return;
}
}

while (left < right) {
uint32_t mid = (left + right + 1) / 2;
uint32_t region_offset = GetRestartPoint(mid);
uint32_t shared, non_shared, value_length;
const char* key_ptr =
DecodeEntry(data_ + region_offset, data_ + restarts_, &shared,
&non_shared, &value_length);
if (key_ptr == nullptr || (shared != 0)) {
CorruptionError();
return;
}
Slice mid_key(key_ptr, non_shared);
if (Compare(mid_key, target) < 0) {
// Key at "mid" is smaller than "target". Therefore all
// blocks before "mid" are uninteresting.
left = mid;
} else {
// Key at "mid" is >= "target". Therefore all blocks at or
// after "mid" are uninteresting.
right = mid - 1;
}
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
...
// We might be able to use our current position within the restart block.
// This is true if we determined the key we desire is in the current block
// and is after than the current key.
assert(current_key_compare == 0 || Valid());
bool skip_seek = left == restart_index_ && current_key_compare < 0;
if (!skip_seek) {
SeekToRestartPoint(left);
}
// Linear search (within restart block) for first key >= target
while (true) {
if (!ParseNextKey()) {
return;
}
if (Compare(key_, target) >= 0) {
return;
}
}
}

一些问题

Block 的大小如何选择?

Block size 变大,则:

  1. 索引变少,节约空间
  2. 写入的 IO 次数变少,提高性能
  3. Block Cache 只能缓存更少的 Block,可能造成较多的读盘,读放大加重

Table实现

TableBuilder

  1. Status ChangeOptions(const Options& options);
    修改 option。但如果 Table 已经被创建,那么有些 Option 就不能被修改了。此时会报错。
  2. void Add(const Slice& key, const Slice& value);
    增加一个 KV 对。
    key is after any previously added key according to comparator,往TableBuilder里面加KV,必须是有序的?
  3. void Flush();
    将内存中缓存的数据写入磁盘。因为 SSTable 是一个文件,所以相当于是写一点,Flush 一点,从而控制 Block 的大小。
    注释上说可以被用来保证两个相邻的 Entry 不会在同一个 data block 中,不是很明白什么意思。
  4. Status status() const;
  5. Status Finish();
    结束当前表的构建。
  6. void Abandon();
    表示需要丢弃当前缓存内容,并且结束表的构建。
  7. uint64_t NumEntries() const;
    Add 了多少次,实际上返回的是 TableBuilder::Rep 里面的 num_entries
  8. uint64_t FileSize() const;
  9. void WriteBlock(BlockBuilder* block, BlockHandle* handle);
  10. void WriteRawBlock(const Slice& data, CompressionType, BlockHandle* handle);

此外,TableBuilder 持有一个 Rep 类型的对象指针,用来隐藏相关实现。
在 Add/Flush/Finish/Abandon 中会检查此时 Adandon 和 Finish 不能已经被调用。

TableBuilder::Rep

具有下面的:

  1. Options options;
  2. Options index_block_options;
  3. WritableFile* file;
    WritableFile是一个接口,具体实现可以分为随机读写文件,顺序读写文件等。
  4. uint64_t offset;
  5. Status status;
    ok()的返回值,表示是否发生了错误。一般,错误会在 file 里面的 Append 和 Flush 方法中出现。
  6. BlockBuilder data_block;
  7. BlockBuilder index_block;
  8. std::string last_key;
    每一次 Add 会更新这个字段。因为 Add 是有序的,所以实际上就表示了当前最大的 key。
  9. int64_t num_entries;
    NumEntries
  10. bool closed;
    Finish 和 Abandon 会设置为 true。
  11. FilterBlockBuilder* filter_block;
  12. bool pending_index_entry;
    当写完一个 data block 之后,设置 pending_index_entry,表示需要更新 index block。
    这里有个不变量,当且仅当 data_block 为空的时候 pending_index_entry 才是 true。一个 Table 中有多个 data block,当写完一个 data block 后,设置 pending_index_entry 为 true,之后更新 index block。
  13. BlockHandle pending_handle;
    Handle to add to index block。不是说用这个 handle 来写 index block,而是会把这个 handle 里面的值写到 index block 里面作为 index。
  14. std::string compressed_output;

写 index block 的顺序是:

  1. 写一个 data block
  2. 接到下一个 Add 请求
  3. 根据 last_key 和当前传入的 Key,写 index
  4. 正常处理该 Add 请求

这里会有疑问,为什么不在写完 data block 的时候就直接写 index block,而是要等到读到下一个 data block 的第一个 key 的时候才写呢?其目的是这样可以通过之前提到的 FindShortestSeparator,在写 index 的时候用尽可能短的 key。例如,已经知道第一个 data block 中最大的是 “the quick brown foxggggggggggg”,而第二个 data block 中最小的是 “the who”。如果我们刚写完第一个 data block,就只能用 “the quick brown foxggggggggggg” 这个很长的字符串来写 index 了。而如果读到下一个 key 实际是 “the who” 的话,就很从容地可以用 “the r” 作为 index block 中的 entry。因为 “the r” 既大于等于第一个 block 中的所有 key,又小于第二个 block 中的所有 key。

TableBuilder::Add

1
2
3
4
5
6
7
8
void TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
}
...

如果 pending_index_entry 是 true,说明之前已经写入了一个 data block。因此要插入 index,然后清理pending_index_entry标志。

1
2
3
4
5
6
7
8
9
10
...
if (r->pending_index_entry) {
assert(r->data_block.empty());
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
...

下面是 Add 的主要逻辑,先处理 filter block。

1
2
3
4
5
...
if (r->filter_block != nullptr) {
r->filter_block->AddKey(key);
}
...

然后设置 last_key,并向当前的 data block 中添加 KV。
估算当前 data block 的大小,如果超过配置的阈值 options.block_size就进行调用 Flush 生成一个 Block。
这个估算实际上就是统计所有 entry 以及 restart 的总大小。相比Spark 里面的大小估计,感觉 LevelDB/Redis 里面的大小估计要简单很多,感觉得益于 C/C++ 能自己管理内存。

1
2
3
4
5
6
7
8
9
10
...
r->last_key.assign(key.data(), key.size());
r->num_entries++;
r->data_block.Add(key, value);

const size_t estimated_block_size = r->data_block.CurrentSizeEstimate();
if (estimated_block_size >= r->options.block_size) {
Flush();
}
}

TableBuilder::WriteBlock

WriteBlock

Table 在 WriteBlock 要先调用 BlockBuilder::Finish 处理一些元信息,也就是说把所有 restart 都写到文件里面。

1
2
3
4
5
void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
assert(ok());
Rep* r = rep_;
Slice raw = block->Finish();
...

接着,是写 data block 。实际写入的 block_contents 可能是被压缩了的,也可能是没有被压缩的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
...
Slice block_contents;
CompressionType type = r->options.compression;
// TODO(postrelease): Support more compression options: zlib?
switch (type) {
case kNoCompression:
block_contents = raw;
break;

case kSnappyCompression: {
std::string* compressed = &r->compressed_output;
if (port::Snappy_Compress(raw.data(), raw.size(), compressed) &&
compressed->size() < raw.size() - (raw.size() / 8u)) {
block_contents = *compressed;
} else {
// Snappy not supported, or compressed less than 12.5%, so just
// store uncompressed form
block_contents = raw;
type = kNoCompression;
}
break;
}
}
WriteRawBlock(block_contents, type, handle);
r->compressed_output.clear();
...

清空这个 Block 里面 buffer_restarts_ 等状态。

1
2
3
...
block->Reset();
}

TableBuilder::WriteRawBlock

每一个block包含:

  1. data
  2. type 表示有没有压缩
  3. crc32
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TableBuilder::WriteRawBlock(const Slice& block_contents,
CompressionType type, BlockHandle* handle) {
Rep* r = rep_;
handle->set_offset(r->offset);
handle->set_size(block_contents.size());
r->status = r->file->Append(block_contents);
if (r->status.ok()) {
char trailer[kBlockTrailerSize];
trailer[0] = type;
uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size());
crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type
EncodeFixed32(trailer + 1, crc32c::Mask(crc));
r->status = r->file->Append(Slice(trailer, kBlockTrailerSize));
if (r->status.ok()) {
r->offset += block_contents.size() + kBlockTrailerSize;
}
}
}

TableBuilder::Flush

当目前内存中的数据达到一个 Block 的大小时,就调用 Flush。

Flush 一开始是判断一些条件:

  1. 如果说 data block 是空的,那么就直接返回。
  2. 断言 pending_index_entry 这个是 true。因为如果没写 index,肯定不能 flush。
1
2
3
4
5
6
7
void TableBuilder::Flush() {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
if (r->data_block.empty()) return;
assert(!r->pending_index_entry);
...

根据 pending_handle 的说明,它的值会被写到 index block 里面。

1
2
3
4
5
6
7
8
9
10
...
WriteBlock(&r->data_block, &r->pending_handle);
if (ok()) {
r->pending_index_entry = true;
r->status = r->file->Flush();
}
if (r->filter_block != nullptr) {
r->filter_block->StartBlock(r->offset);
}
}

TableBuilder::Finish

Finish 操作用来生成一个 SSTable。需要区分 BlockBuilder::Finish。
首先先 Flush,也就是把 data_block 写盘。

1
2
3
4
5
6
Status TableBuilder::Finish() {
Rep* r = rep_;
Flush();
assert(!r->closed);
r->closed = true;
...

什么时候不 ok 呢?也就是发生错误的情况。
下面,写入 filter block。

1
2
3
4
5
6
7
8
...
BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle;
// Write filter block
if (ok() && r->filter_block != nullptr) {
WriteRawBlock(r->filter_block->Finish(), kNoCompression,
&filter_block_handle);
}
...

如果上面成功,接着写入 meta index block。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...
// Write metaindex block
if (ok()) {
BlockBuilder meta_index_block(&r->options);
if (r->filter_block != nullptr) {
// Add mapping from "filter.Name" to location of filter data
std::string key = "filter.";
key.append(r->options.filter_policy->Name());
std::string handle_encoding;
filter_block_handle.EncodeTo(&handle_encoding);
meta_index_block.Add(key, handle_encoding);
}

// TODO(postrelease): Add stats and other meta blocks
WriteBlock(&meta_index_block, &metaindex_block_handle);
}
...

如果上面成功,接着写入 index block。

1
2
3
4
5
6
7
8
9
10
11
12
13
...
// Write index block
if (ok()) {
if (r->pending_index_entry) {
r->options.comparator->FindShortSuccessor(&r->last_key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
r->index_block.Add(r->last_key, Slice(handle_encoding));
r->pending_index_entry = false;
}
WriteBlock(&r->index_block, &index_block_handle);
}
...

如果上面成功,接着写入 footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
...
// Write footer
if (ok()) {
Footer footer;
footer.set_metaindex_handle(metaindex_block_handle);
footer.set_index_handle(index_block_handle);
std::string footer_encoding;
footer.EncodeTo(&footer_encoding);
r->status = r->file->Append(footer_encoding);
if (r->status.ok()) {
r->offset += footer_encoding.size();
}
}
return r->status;
}

Table

下面介绍 Table 类,它的作用是负责读取 SSTable。

  1. static Status Open(const Options& options, RandomAccessFile* file, uint64_t file_size, Table** table);
    解析传入的 file
    如果成功,返回 ok,并且设置 *table,这是一个指针,由调用方释放。
    如果失败,返回一个非 ok,并且设置 *table 为 nullptr。
    Does not take ownership of “*source”, but the client must ensure that “source” remains live for the duration of the returned table’s lifetime.
  2. Iterator* NewIterator(const ReadOptions&) const;
  3. uint64_t ApproximateOffsetOf(const Slice& key) const;
    传入一个 key,返回它在文件中的大概位置。对不存在的 key,返回如果存在,那么大概在的位置。
  4. static Iterator* BlockReader(void*, const ReadOptions&, const Slice&);
    TwoLevelIterator 需要这个函数,通过它来构建一个 data_iter_
  5. Status InternalGet(const ReadOptions&, const Slice& key, void* arg, void (*handle_result)(void* arg, const Slice& k, const Slice& v));
  6. void ReadMeta(const Footer& footer);
  7. void ReadFilter(const Slice& filter_handle_value);

Table::Rep

  1. Options options;
  2. Status status;
  3. RandomAccessFile* file;
  4. uint64_t cache_id;
  5. FilterBlockReader* filter;
  6. const char* filter_data;
  7. BlockHandle metaindex_handle;
  8. Block* index_block;

Table::Open

首先解析出 footer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Status Table::Open(const Options& options, RandomAccessFile* file,
uint64_t size, Table** table) {
*table = nullptr;
if (size < Footer::kEncodedLength) {
return Status::Corruption("file is too short to be an sstable");
}

char footer_space[Footer::kEncodedLength];
Slice footer_input;
Status s = file->Read(size - Footer::kEncodedLength, Footer::kEncodedLength,
&footer_input, footer_space);
if (!s.ok()) return s;

Footer footer;
s = footer.DecodeFrom(&footer_input);
if (!s.ok()) return s;

接下来,解析 index block。

1
2
3
4
5
6
7
// Read the index block
BlockContents index_block_contents;
ReadOptions opt;
if (options.paranoid_checks) {
opt.verify_checksums = true;
}
s = ReadBlock(file, opt, footer.index_handle(), &index_block_contents);

初始化 rep_ 字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
  if (s.ok()) {
// We've successfully read the footer and the index block: we're
// ready to serve requests.
Block* index_block = new Block(index_block_contents);
Rep* rep = new Table::Rep;
rep->options = options;
rep->file = file;
rep->metaindex_handle = footer.metaindex_handle();
rep->index_block = index_block;
rep->cache_id = (options.block_cache ? options.block_cache->NewId() : 0);
rep->filter_data = nullptr;
rep->filter = nullptr;
*table = new Table(rep);
(*table)->ReadMeta(footer);
}

return s;
}

Table::NewIterator

实际上调用 NewTwoLevelIterator 得到一个 TwoLevelIterator

1
2
3
4
5
6
7
8
9
Iterator* Table::NewIterator(const ReadOptions& options) const {
return NewTwoLevelIterator(
rep_->index_block->NewIterator(rep_->options.comparator),
&Table::BlockReader, const_cast<Table*>(this), options);
}

Iterator* NewTwoLevelIterator(Iterator* index_iter,
BlockFunction block_function, void* arg,
const ReadOptions& options);

方法 NewIterator 的实现如下。如果没有 restart 点,那么就创建一个空的迭代器,否则创建一个Block::Iter

1
2
3
4
5
6
7
8
9
10
11
Iterator* Block::NewIterator(const Comparator* comparator) {
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(comparator, data_, restart_offset_, num_restarts);
}
}

两个 Level 的含义是:

  1. IteratorWrapper index_iter_ 负责查询 index block,找到 key 所在的 data block。
    IteratorWrapper 封装了 Iterator,可以理解为一层对 valid()key() 的 cache。整体上类似于 TiFlash 中 MultiSSTReader 的实现。
    Iterator是个接口,实际类型应该是Block::Iter【待确认】。
  2. IteratorWrapper data_iter_ 负责在这个 block 里面查找。

TwoLevelIterator

  1. BlockFunction block_function_;
    block_function_ 可以从一个 index_iter_ 创建一个 data_iter_
    在 Table 的实现中,是 Table::BlockReader 这个函数。我们将在后面详细分析这个函数。
  2. void* arg_;
    在 Table 的实现中,传入了 Table* this
  3. const ReadOptions options_;
  4. Status status_;
  5. IteratorWrapper index_iter_;
  6. IteratorWrapper data_iter_;
  7. std::string data_block_handle_;
    如果 data_iter_ 不是 null,那么 data_block_handle_ 持有传给 block_function_ 的那个 index_iter_ 的值。

TwoLevelIterator::Next

存在一个问题,如果一直 data_iter_.Next(),迟早会碰到一个 Block 的右边界,这样后面迭代器就 Invalid 了。因此需要检查如果 data_iter_ 当前已经失效了,那么就递增 index_iter_,获取下一个 data_iter_,具体实现见下面。

1
2
3
4
5
void TwoLevelIterator::Next() {
assert(Valid());
data_iter_.Next();
SkipEmptyDataBlocksForward();
}

TwoLevelIterator::SkipEmptyDataBlocksForward

【Q】在上面说过这个函数的作用了,但是为啥这里实现是 while 而不是 if 呢?

1
2
3
4
void TwoLevelIterator::SkipEmptyDataBlocksForward() {
while (data_iter_.iter() == nullptr || !data_iter_.Valid()) {
// Move to next block
...

SetDataIterator 函数接受一个迭代器作为参数,如果迭代器不是空,那么就设置为 data_iter_,并且释放掉原来的 iter_ 内存。

1
2
3
4
5
6
7
8
...
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
return;
}
index_iter_.Next();
InitDataBlock();
...

需要注意,这里需要显式将data_iter_移动到当前data block的开头。

1
2
3
4
...
if (data_iter_.iter() != nullptr) data_iter_.SeekToFirst();
}
}

TwoLevelIterator::InitDataBlock

InitDataBlock 作用是从 index_iter_ 构建或者说解析出一个 data block。
如果 index_iter_ 无效,那么设置 data_iter_ 也无效。
如果 data_iter_ 不为空,并且等于之前的 data_block_handle_,说明 data_iter_ 现在就指向的这个data block,那么就跳过。
否则,以 index_iter_ 为参数,通过 block_function_ 生成一个新的 data_iter_

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void TwoLevelIterator::InitDataBlock() {
if (!index_iter_.Valid()) {
SetDataIterator(nullptr);
} else {
Slice handle = index_iter_.value();
if (data_iter_.iter() != nullptr &&
handle.compare(data_block_handle_) == 0) {
} else {
Iterator* iter = (*block_function_)(arg_, options_, handle);
data_block_handle_.assign(handle.data(), handle.size());
SetDataIterator(iter);
}
}
}

TwoLevelIterator::Seek

首先,在 index block 层 Seek。下面证明只要找这个 index_iter_ 指向的 data block 就行,也就是说,target 不会出现在 (index_iter_ - 1)(index_iter_ + 1) 指向的 data block 里面。

  1. 因为 LevelDB 中的性质,Seek 得到的是第一个大于等于 target 的指针。此时,(index_iter_ - 1) 中的 .key()严格小于 target 的。而根据 index block 的【性质1】,这个 index block entry 指向的 data block 中的所有 K 都小于等于 (index_iter_ - 1).key()。因此,(index_iter_ - 1) 指向的 data block 里面所有的 K,都小于 target。
  2. 此外,index_iter_.key() 是大于等于 target 的。
  3. 下面还要证明 (index_iter_ + 1).key() 指向的 data block 里面的所有 K 都大于 target。根据【性质2】我们知道 index_iter_.key() 会严格小于它指向的下一个 data block 中的所有 K,根据我们上一条结论可以知道 target 严格小于下一个 data block 中的所有 K,所以 target 如果存在的话,一定是当前 data block 上的。
1
2
3
void TwoLevelIterator::Seek(const Slice& target) {
index_iter_.Seek(target);
...

接着调用 InitDataBlock 初始化 data_iter_。接着在 data block 层 Seek。

1
2
3
4
5
...
InitDataBlock();
if (data_iter_.iter() != nullptr) data_iter_.Seek(target);
SkipEmptyDataBlocksForward();
}

Table::BlockReader

Table::BlockReader 创建一个 Iterator,这个 Iterator 实际上是一个 Block::Iter 对象,由 Block::NewIterator 产生。而 Block::Iter 的方法,例如 Seek,在前面已经介绍过了。

1
2
3
4
5
6
7
8
9
10
11
Iterator* Block::NewIterator(const Comparator* comparator) {
if (size_ < sizeof(uint32_t)) {
return NewErrorIterator(Status::Corruption("bad block contents"));
}
const uint32_t num_restarts = NumRestarts();
if (num_restarts == 0) {
return NewEmptyIterator();
} else {
return new Iter(comparator, data_, restart_offset_, num_restarts);
}
}

接受三个参数:

  1. arg
    这个类型设置就很奇怪,实际上是一个 Table*,表示现在读的那个 Table 的上下文。
  2. index_value
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Convert an index iterator value (i.e., an encoded BlockHandle)
// into an iterator over the contents of the corresponding block.
Iterator* Table::BlockReader(void* arg, const ReadOptions& options,
const Slice& index_value) {
Table* table = reinterpret_cast<Table*>(arg);
Cache* block_cache = table->rep_->options.block_cache;
Block* block = nullptr;
Cache::Handle* cache_handle = nullptr;

BlockHandle handle;
Slice input = index_value;
Status s = handle.DecodeFrom(&input);
// We intentionally allow extra stuff in index_value so that we
// can add more features in the future.

if (s.ok()) {
BlockContents contents;
if (block_cache != nullptr) {
char cache_key_buffer[16];
EncodeFixed64(cache_key_buffer, table->rep_->cache_id);
EncodeFixed64(cache_key_buffer + 8, handle.offset());
Slice key(cache_key_buffer, sizeof(cache_key_buffer));
cache_handle = block_cache->Lookup(key);
if (cache_handle != nullptr) {
block = reinterpret_cast<Block*>(block_cache->Value(cache_handle));
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
if (contents.cachable && options.fill_cache) {
cache_handle = block_cache->Insert(key, block, block->size(),
&DeleteCachedBlock);
}
}
}
} else {
s = ReadBlock(table->rep_->file, options, handle, &contents);
if (s.ok()) {
block = new Block(contents);
}
}
}
...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
...
Iterator* iter;
if (block != nullptr) {
iter = block->NewIterator(table->rep_->options.comparator);
if (cache_handle == nullptr) {
iter->RegisterCleanup(&DeleteBlock, block, nullptr);
} else {
iter->RegisterCleanup(&ReleaseBlock, block_cache, cache_handle);
}
} else {
iter = NewErrorIterator(s);
}
return iter;
}

Reference

  1. https://izualzhy.cn/leveldb-block
  2. https://izualzhy.cn/leveldb-sstable