C++从零开始实现LSM-Tree-KV存储-03-Block

本小节实现 Block 部分, BlockSSTable 的基本单位, 也是 LSM Tree 的最小读写单位。

本章完整代码参见:
https://github.com/ToniXWD/toni-lsm/tree/master/src/block
https://github.com/ToniXWD/toni-lsm/tree/master/include/block

代码不断迭代, 可能会和文章中的代码有出入, 但基本原理差不多

1 Block基础概念

1.1 Block与SST

MemTable-SST

介绍 Block 之前, 需要先说明什么是SST, SSTSorted String Table 的缩写, 也就是有序字符串表, 是 LSM Tree 的基本存储单位, 也是 Block 的集合。当 MemTable 达到一定大小后, 就会被 flushSST 中, SST 会被写入到磁盘中, 以此来保证数据的持久化。因此SSTImmutable MemTable是一一对应的关系。

SST被划分为多个 Block, BlockSST 的最小读写单位, 也是 LSM Tree 的最小读写单位。这样划分的好处是可以减少读取的数据量, 提高读取性能。因为SSt是磁盘上的数据, 因此将其划分为多个 Block 可以减少读取的数据量, 提高读取性能。

1.2 Block的编码

Block 的编码方式有很多种, 这里我就参考了Mini-LSM采用最简单的编码方式, 也没有什么压缩算法, 其编码格式如下:

1
2
3
4
5
6
7
8
9
10
11
------------------------------------------------------------------------------------------------------------------
| Data Section | Offset Section | Extra Information |
------------------------------------------------------------------------------------------------------------------
| Entry #1 | Entry #2 | ... | Entry #N | Offset #1(2B) | Offset #2(2B) |...| Offset #N(2B) | num_of_elements(2B) |
------------------------------------------------------------------------------------------------------------------

-----------------------------------------------------------------------
| Entry #1 | ... |
-----------------------------------------------------------------------
| key_len (2B) | key (keylen) | value_len (2B) | value (varlen) | ... |
-----------------------------------------------------------------------

上图的B表示一个字节

一个Block包含:Data SectionOffset SectionExtra Information三部分:

  • Data Section: 存放所有的数据, 也就是keyvalue的序列化数据
    • 每一对keyvalue都是一个Entry, 编码信息包括key_lenkeyvalue_lenvalue
      • key_lenvalue_len都是2Buint16_t类型, 用于表示keyvalue的长度
      • keyvalue就是对应长度的字节数, 因此需要用varlen来表示。
  • Offset Section: 存放所有Entry的偏移量, 用于快速定位
  • Extra Information: 存放num_of_elements, 也就是Entry的数量

这样编码满足了最基本的要求, 从最后的2个字节可以知道Block包含了多少kv对, 再从Offset Section中查询对应的kv对数据的偏移

2 Block代码实现

2.1 头文件定义

首先定义一个Block对应的类, 按照编码信息的需求, 可以如下定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
class BlockIterator; // 迭代器, 后续会介绍

// public std::enable_shared_from_this<Block> 允许获取对象的智能指针
class Block : public std::enable_shared_from_this<Block> {
friend BlockIterator;

private:
std::vector<uint8_t> data; // `Data Section` 部分
std::vector<uint16_t> offsets; // `Offset Section` 部分
size_t capacity; // block的容量, 超出这个容量内存数据就会被编码

struct Entry {
std::string key;
std::string value;
};
Entry get_entry_at(size_t offset) const;
std::string get_key_at(size_t offset) const;
std::string get_value_at(size_t offset) const;
int compare_key_at(size_t offset, const std::string &target) const;

public:
Block() = default;
Block(size_t capacity);
// ! 这里的编码函数不包括 hash
std::vector<uint8_t> encode(); // 编码为二进制数据
// ! 这里的解码函数可指定切片是否包括 hash
static std::shared_ptr<Block> decode(const std::vector<uint8_t> &encoded,
bool with_hash = false); // 从二进制序列中解码
std::string get_first_key();
size_t get_offset_at(size_t idx) const;
bool add_entry(const std::string &key, const std::string &value);
std::optional<std::string> get_value_binary(const std::string &key);
size_t cur_size() const;
bool is_empty() const;
std::optional<size_t> get_idx_binary(const std::string &key); // 二分查找

BlockIterator begin(); // 迭代器

BlockIterator end(); // 迭代器
};

这里可能涉及C++的新特性:

1
public std::enable_shared_from_this<Block>

std::enable_shared_from_this 是 C++11 引入的一个标准库特性,它允许一个对象安全地创建指向自身的std::shared_ptr。这里先简单说明一下, 后续实现迭代器的时候就知道其作用了。

2.2 编解码

先看编码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
std::vector<uint8_t> Block::encode() {
// 计算总大小:数据段 + 偏移数组(每个偏移2字节) + 元素个数(2字节)
size_t total_bytes = data.size() * sizeof(uint8_t) +
offsets.size() * sizeof(uint16_t) + sizeof(uint16_t);
std::vector<uint8_t> encoded(total_bytes, 0);

// 1. 复制数据段
memcpy(encoded.data(), data.data(), data.size() * sizeof(uint8_t));

// 2. 复制偏移数组
size_t offset_pos = data.size() * sizeof(uint8_t);
memcpy(encoded.data() + offset_pos,
offsets.data(), // vector 的连续内存起始位置
offsets.size() * sizeof(uint16_t) // 总字节数
);

// 3. 写入元素个数
size_t num_pos =
data.size() * sizeof(uint8_t) + offsets.size() * sizeof(uint16_t);
uint16_t num_elements = offsets.size();
memcpy(encoded.data() + num_pos, &num_elements, sizeof(uint16_t));

return encoded;
}

很简单, 基本上就是履行了之前编码格式的过程

2.1.3 解码

解码就是编码的逆过程, 需要注意的是, 后续的SST在每个block后可能会添加一个哈希校验值, 因此这里可以通过with_hash设置是否二进制序列包括哈希值的部分。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
std::shared_ptr<Block> Block::decode(const std::vector<uint8_t> &encoded,
bool with_hash) {
// 使用 make_shared 创建对象
auto block = std::make_shared<Block>();

// 1. 安全性检查
if (encoded.size() < sizeof(uint16_t)) {
throw std::runtime_error("Encoded data too small");
}

// 2. 读取元素个数
uint16_t num_elements;
size_t num_elements_pos = encoded.size() - sizeof(uint16_t);
if (with_hash) {
num_elements_pos -= sizeof(uint32_t);
auto hash_pos = encoded.size() - sizeof(uint32_t);
uint32_t hash_value;
memcpy(&hash_value, encoded.data() + hash_pos, sizeof(uint32_t));

uint32_t compute_hash = std::hash<std::string_view>{}(
std::string_view(reinterpret_cast<const char *>(encoded.data()),
encoded.size() - sizeof(uint32_t)));
if (hash_value != compute_hash) {
throw std::runtime_error("Block hash verification failed");
}
}
memcpy(&num_elements, encoded.data() + num_elements_pos, sizeof(uint16_t));

// 3. 验证数据大小
size_t required_size = sizeof(uint16_t) + num_elements * sizeof(uint16_t);
if (encoded.size() < required_size) {
throw std::runtime_error("Invalid encoded data size");
}

// 4. 计算各段位置
size_t offsets_section_start =
num_elements_pos - num_elements * sizeof(uint16_t);

// 5. 读取偏移数组
block->offsets.resize(num_elements);
memcpy(block->offsets.data(), encoded.data() + offsets_section_start,
num_elements * sizeof(uint16_t));

// 6. 复制数据段
block->data.reserve(offsets_section_start); // 优化内存分配
block->data.assign(encoded.begin(), encoded.begin() + offsets_section_start);

return block;
}

2.3 Block构建代码

Block的构建过程是这样的:

  1. 先在内存中插入数据, 即类定义中的offsets, data
  2. 当类定义的 data数据超过capacity时, 调用encode进行编码

这里就先给出最基础的构建Block中向内存插入kv数据的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
bool Block::add_entry(const std::string &key, const std::string &value) {
if ((cur_size() + key.size() + value.size() + 3 * sizeof(uint16_t) >
capacity) &&
!offsets.empty()) {
return false;
}
// 计算entry大小:key长度(2B) + key + value长度(2B) + value
size_t entry_size =
sizeof(uint16_t) + key.size() + sizeof(uint16_t) + value.size();
size_t old_size = data.size();
data.resize(old_size + entry_size);

// 写入key长度
uint16_t key_len = key.size();
memcpy(data.data() + old_size, &key_len, sizeof(uint16_t));

// 写入key
memcpy(data.data() + old_size + sizeof(uint16_t), key.data(), key_len);

// 写入value长度
uint16_t value_len = value.size();
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len, &value_len,
sizeof(uint16_t));

// 写入value
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len + sizeof(uint16_t),
value.data(), value_len);

// 记录偏移
offsets.push_back(old_size);
return true;
}

2.4 二分查找

由于sstblcok的构建本质上就是按照我们之前实现的MemTableSkipList迭代器插入数据, 因此其已经是排好序的, 查询时可以使用二分查找:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
std::optional<size_t> Block::get_idx_binary(const std::string &key) {
if (offsets.empty()) {
return std::nullopt;
}
// 二分查找
int left = 0;
int right = offsets.size() - 1;

while (left <= right) {
int mid = left + (right - left) / 2;
size_t mid_offset = offsets[mid];

int cmp = compare_key_at(mid_offset, key);

if (cmp == 0) {
// 找到key,返回对应的value
return mid;
} else if (cmp < 0) {
// 中间的key小于目标key,查找右半部分
left = mid + 1;
} else {
// 中间的key大于目标key,查找左半部分
right = mid - 1;
}
}

return std::nullopt;
}

这里不给出全部的代码了, 有兴趣可以看Github仓库

3 Block迭代器

3.1 处理this指针

现在实现我们的Block的迭代器。通过Block的编码格式可知, 只需要知道当前的索引, 就可以在Block中查询该索引的kv对, 因此迭代器只需要记录当前索引和原始Block的引用就可以了。

这也就是之前提到的std::enable_shared_from_this的作用,让我们的BlockIterator可以使用Block的智能指针, 为什么这样设计呢? 看下面的案例代码就知道了:

1
2
3
4
5
6
7
8
TEST_F(BlockTest, IteratorTest) {
// 使用 make_shared 创建 Block
std::shared_ptr<Block> block = std::make_shared<Block>(4096);

// 1. 测试空block的迭代器
EXPECT_EQ(block->begin(), block->end());
...
}

迭代器对象BlockIterator是通过block->begin()获取的, 因此相当于类的成员函数返回的对象中需要当前对象的this指针, 因此C++11引入了std::enable_shared_from_this来使用shared_ptr来封装this指针, 但当前类必须也是由shared_ptr管理的, 正如代码中的:

1
std::shared_ptr<Block> block = std::make_shared<Block>(4096);

3.2 实现

这个实现其实很简单, 因为这里的逻辑是, sst负责从文件中读取block, 这里的block只需要读取后的内存中的数据结构, 所以也就是维护当前索引就是了, 另外这里有个小优化, 尽管有索引就在block中进行二分查找, 但这里还是用一个值进行缓存, 因为迭代器可能被反复读取值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Block;

class BlockIterator {
public:
// 标准迭代器类型定义
using iterator_category = std::forward_iterator_tag;
using value_type = std::pair<std::string, std::string>;
using difference_type = std::ptrdiff_t;
using pointer = const value_type *;
using reference = const value_type &;

// 构造函数
BlockIterator(std::shared_ptr<Block> b, size_t index);
BlockIterator(std::shared_ptr<Block> b, const std::string &key);
BlockIterator(std::shared_ptr<Block> b);
BlockIterator() : block(nullptr), current_index(0) {} // end iterator

// 迭代器操作
BlockIterator &operator++();
BlockIterator operator++(int) = delete;
bool operator==(const BlockIterator &other) const;
bool operator!=(const BlockIterator &other) const;
value_type operator*() const;
bool is_end();

private:
std::shared_ptr<Block> block; // 指向所属的 Block
size_t current_index; // 当前位置的索引
mutable std::optional<value_type> cached_value; // 缓存当前值
};

这里的std::optional也是一个智能指针, 其可以为空, 可以看做青春版的RustOption

代码实现只要是自增运算符和解引用运算符:

自增运算符

1
2
3
4
5
6
7
BlockIterator &BlockIterator::operator++() {
if (block && current_index < block->cur_size()) {
++current_index;
cached_value = std::nullopt; // 清除缓存
}
return *this;
}

解引用运算符

1
2
3
4
5
6
7
8
9
10
11
12
13
BlockIterator::value_type BlockIterator::operator*() const {
if (!block || current_index >= block->cur_size()) {
throw std::out_of_range("Iterator out of range");
}

// 使用缓存避免重复解析
if (!cached_value.has_value()) {
size_t offset = block->get_offset_at(current_index);
cached_value =
std::make_pair(block->get_key_at(offset), block->get_value_at(offset));
}
return *cached_value;
}

4 单元测试

同样进行完整的单元测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
#include "../include/block/block.h"
#include "../include/block/block_iterator.h"
#include <gtest/gtest.h>

class BlockTest : public ::testing::Test {
protected:
// 预定义的编码数据
std::vector<uint8_t> getEncodedBlock() {
/*
Block layout (3 entries):
Entry1: key="apple", value="red"
Entry2: key="banana", value="yellow"
Entry3: key="orange", value="orange"
*/
std::vector<uint8_t> encoded = {
// Data Section
// Entry 1: "apple" -> "red"
5, 0, // key_len = 5
'a', 'p', 'p', 'l', 'e', // key
3, 0, // value_len = 3
'r', 'e', 'd', // value

// Entry 2: "banana" -> "yellow"
6, 0, // key_len = 6
'b', 'a', 'n', 'a', 'n', 'a', // key
6, 0, // value_len = 6
'y', 'e', 'l', 'l', 'o', 'w', // value

// Entry 3: "orange" -> "orange"
6, 0, // key_len = 6
'o', 'r', 'a', 'n', 'g', 'e', // key
6, 0, // value_len = 6
'o', 'r', 'a', 'n', 'g', 'e', // value

// Offset Section (每个entry的起始位置)
0, 0, // offset[0] = 0
12, 0, // offset[1] = 12 (第二个entry的起始位置)
28, 0, // offset[2] = 24 (第三个entry的起始位置)

// Num of elements
3, 0 // num_elements = 3
};
return encoded;
}
};

// 测试解码
TEST_F(BlockTest, DecodeTest) {
auto encoded = getEncodedBlock();
auto block = Block::decode(encoded);

// 验证第一个key
EXPECT_EQ(block->get_first_key(), "apple");

// 验证所有key-value对
EXPECT_EQ(block->get_value_binary("apple").value(), "red");
EXPECT_EQ(block->get_value_binary("banana").value(), "yellow");
EXPECT_EQ(block->get_value_binary("orange").value(), "orange");
}

// 测试编码
TEST_F(BlockTest, EncodeTest) {
Block block(1024);
block.add_entry("apple", "red");
block.add_entry("banana", "yellow");
block.add_entry("orange", "orange");

auto encoded = block.encode();

// 解码并验证
auto decoded = Block::decode(encoded);
EXPECT_EQ(decoded->get_value_binary("apple").value(), "red");
EXPECT_EQ(decoded->get_value_binary("banana").value(), "yellow");
EXPECT_EQ(decoded->get_value_binary("orange").value(), "orange");
}

// 测试二分查找
TEST_F(BlockTest, BinarySearchTest) {
Block block(1024);
block.add_entry("apple", "red");
block.add_entry("banana", "yellow");
block.add_entry("orange", "orange");

// 测试存在的key
EXPECT_EQ(block.get_value_binary("apple").value(), "red");
EXPECT_EQ(block.get_value_binary("banana").value(), "yellow");
EXPECT_EQ(block.get_value_binary("orange").value(), "orange");

// 测试不存在的key
EXPECT_FALSE(block.get_value_binary("grape").has_value());
EXPECT_FALSE(block.get_value_binary("").has_value());
}

// 测试边界情况
TEST_F(BlockTest, EdgeCasesTest) {
Block block(1024);

// 空block
EXPECT_EQ(block.get_first_key(), "");
EXPECT_FALSE(block.get_value_binary("any").has_value());

// 添加空key和value
block.add_entry("", "");
EXPECT_EQ(block.get_first_key(), "");
EXPECT_EQ(block.get_value_binary("").value(), "");

// 添加包含特殊字符的key和value
block.add_entry("key\0with\tnull", "value\rwith\nnull");
std::string special_key("key\0with\tnull");
std::string special_value("value\rwith\nnull");
EXPECT_EQ(block.get_value_binary(special_key).value(), special_value);
}

// 测试大数据量
TEST_F(BlockTest, LargeDataTest) {
Block block(1024 * 32);
const int n = 1000;

// 添加大量数据
for (int i = 0; i < n; i++) {
// 使用 std::format 或 sprintf 进行补零
char key_buf[16];
snprintf(key_buf, sizeof(key_buf), "key%03d", i); // 补零到3位
std::string key = key_buf;

char value_buf[16];
snprintf(value_buf, sizeof(value_buf), "value%03d", i);
std::string value = value_buf;

block.add_entry(key, value);
}

// 验证所有数据
for (int i = 0; i < n; i++) {
char key_buf[16];
snprintf(key_buf, sizeof(key_buf), "key%03d", i);
std::string key = key_buf;

char value_buf[16];
snprintf(value_buf, sizeof(value_buf), "value%03d", i);
std::string expected_value = value_buf;

EXPECT_EQ(block.get_value_binary(key).value(), expected_value);
}
}

// 测试错误处理
TEST_F(BlockTest, ErrorHandlingTest) {
// 测试解码无效数据
std::vector<uint8_t> invalid_data = {1, 2, 3}; // 太短
EXPECT_THROW(Block::decode(invalid_data), std::runtime_error);

// 测试空vector
std::vector<uint8_t> empty_data;
EXPECT_THROW(Block::decode(empty_data), std::runtime_error);
}

// 测试迭代器
TEST_F(BlockTest, IteratorTest) {
// 使用 make_shared 创建 Block
auto block = std::make_shared<Block>(4096);

// 1. 测试空block的迭代器
EXPECT_EQ(block->begin(), block->end());

// 2. 添加有序数据
const int n = 100;
std::vector<std::pair<std::string, std::string>> test_data;

for (int i = 0; i < n; i++) {
char key_buf[16], value_buf[16];
snprintf(key_buf, sizeof(key_buf), "key%03d", i);
snprintf(value_buf, sizeof(value_buf), "value%03d", i);

block->add_entry(key_buf, value_buf);
test_data.emplace_back(key_buf, value_buf);
}

// 3. 测试正向遍历和数据正确性
size_t count = 0;
for (const auto &[key, value] : *block) { // 注意这里使用 *block
EXPECT_EQ(key, test_data[count].first);
EXPECT_EQ(value, test_data[count].second);
count++;
}
EXPECT_EQ(count, test_data.size());

// 4. 测试迭代器的比较和移动
auto it = block->begin();
EXPECT_EQ((*it).first, "key000");
++it;
EXPECT_EQ((*it).first, "key001");
++it;
EXPECT_EQ((*it).first, "key002");

// 5. 测试编码后的迭代
auto encoded = block->encode();
auto decoded_block = Block::decode(encoded);
count = 0;
for (auto it = decoded_block->begin(); it != decoded_block->end(); ++it) {
EXPECT_EQ((*it).first, test_data[count].first);
EXPECT_EQ((*it).second, test_data[count].second);
count++;
}
}

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS();
}

结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
(base) ➜  toni-lsm git:(master) xmake run test_block
[==========] Running 7 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 7 tests from BlockTest
[ RUN ] BlockTest.DecodeTest
[ OK ] BlockTest.DecodeTest (0 ms)
[ RUN ] BlockTest.EncodeTest
[ OK ] BlockTest.EncodeTest (0 ms)
[ RUN ] BlockTest.BinarySearchTest
[ OK ] BlockTest.BinarySearchTest (0 ms)
[ RUN ] BlockTest.EdgeCasesTest
[ OK ] BlockTest.EdgeCasesTest (0 ms)
[ RUN ] BlockTest.LargeDataTest
[ OK ] BlockTest.LargeDataTest (0 ms)
[ RUN ] BlockTest.ErrorHandlingTest
[ OK ] BlockTest.ErrorHandlingTest (0 ms)
[ RUN ] BlockTest.IteratorTest
[ OK ] BlockTest.IteratorTest (0 ms)
[----------] 7 tests from BlockTest (0 ms total)

[----------] Global test environment tear-down
[==========] 7 tests from 1 test suite ran. (0 ms total)
[ PASSED ] 7 tests.