C++从零开始实现LSM-Tree-KV存储-07-单机存储引擎1

本小节将串联之前实现的MemTable, SST, Block, 各类Iterator, 实现一个初版的完整的单机KV存储引擎。

代码仓库:https://github.com/ToniXWD/toni-lsm

欢迎点个Star

1 控制结构设计

这里再次回顾一下LSM Tree的读写流程:

写入/删除

  1. 写入MemTable, 如果写入的KVvalue为空, 表示一个删除标记
  2. 若当前活跃的MemTable大小达到阈值, 则将其冻结
  3. 若冻结的MemTable数量或大小达到阈值, 则将最早冻结的MemTable转为SST

查询

  1. 查询当前活跃的MemTable, 如果查到有效记录或删除记录, 则返回
  2. 若查询当前活跃的MemTable未命中, 则遍历冻结的MemTable, 由于冻结的MemTable也存在次序, 需要先查询最近冻结的MemTable
  3. 若查询冻结的MemTable未命中, 则遍历SST, 由于SST也存在次序, 需要先查询最近创建的SST

因此我们只需要整合之前创建的MemTable, SST, Block, Iterator即可, 很容易写出下面的类定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
class LSMEngine {
public:
std::string data_dir; // 数据目录
MemTable memtable; // MemTable (包括当前活跃的MemTable和已经冻结的MemTable)
std::list<size_t> l0_sst_ids; // L0 SST ids
std::unordered_map<size_t, std::shared_ptr<SST>> ssts; // SST 的描述类 (类似文件句柄)
std::shared_mutex ssts_mtx; // SST 读写锁, 用于 SST 变化时 保证线程安全
std::shared_ptr<BlockCache> block_cache; // 之前实现的Block LRU-K缓存

LSMEngine(std::string path); // 构造函数
~LSMEngine(); // 析构函数

std::optional<std::string> get(const std::string &key); // 查询函数

void put(const std::string &key, const std::string &value); // 写入函数
void put_batch(const std::vector<std::pair<std::string, std::string>> &kvs); // 批量写入函数
void remove(const std::string &key); // 删除函数
void remove_batch(const std::vector<std::string> &keys); // 批量删除函数
void clear(); // 清空数据库函数
void flush(); // 主动持久化 MemTable 为 SST 函数
void flush_all(); // 持久化所有 MemTable 函数

std::string get_sst_path(size_t sst_id); // 获取 SST 路径

std::optional<std::pair<MergeIterator, MergeIterator>>
lsm_iters_monotony_predicate(
std::function<int(const std::string &)> predicate); // 谓词查询(范围查询、前缀匹配)

MergeIterator begin(); // 获取迭代器
MergeIterator end(); // 获取迭代器
};

这里的l0_sst_ids记录了所有sstid, 其排序是从大到小, 因为sstid越大表示这个sst越新, 需要优先查询。

可以使用l0_sst_ids获取的id从哈希表ssts中查询SST的描述类(类似于文件描述符)。

2 数据库初始化

构造函数需要指定一个路径用于存放数据, 并判断这个路径下是否已经有sst文件, 如果有, 需要解析器元数据加载到类的描述集合l0_sst_idsssts中, 代码逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
LSMEngine::LSMEngine(std::string path) : data_dir(path) {
// TODO: 现在只有 l0 sst, 之后的命名需要重新设计前缀, 统一由函数拼接返回
// 初始化 block_cahce
block_cache = std::make_shared<BlockCache>(LSMmm_BLOCK_CACHE_CAPACITY,
LSMmm_BLOCK_CACHE_K);

// 创建数据目录
if (!std::filesystem::exists(path)) {
std::filesystem::create_directory(path);
} else {
// 如果目录存在,则检查是否有 sst 文件并加载
for (const auto &entry : std::filesystem::directory_iterator(path)) {
if (!entry.is_regular_file()) {
continue;
}

std::string filename = entry.path().filename().string();
// SST文件名格式为: sst_{id}
if (filename.substr(0, 4) != "sst_") {
continue;
}

// 提取SST ID
std::string id_str =
filename.substr(4, filename.length() - 4); // 4 for "sst_"
if (id_str.empty()) {
continue;
}
size_t sst_id = std::stoull(id_str);

// 加载SST文件, 初始化时需要加写锁
std::unique_lock<std::shared_mutex> lock(ssts_mtx); // 写锁
std::string sst_path = get_sst_path(sst_id);
auto sst = SST::open(sst_id, FileObj::open(sst_path), block_cache);
ssts[sst_id] = sst;

// 所有加载的SST都属于L0层
l0_sst_ids.push_back(sst_id);
}

// 按ID排序,确保顺序一致
l0_sst_ids.sort();
// 由于 sst_id 越大, 表示是越晚刷入的, 在后续查询时需要优先考虑,
// 故需要从大到小排序
l0_sst_ids.reverse();
}
}

这里需要注意, l0_sst_ids排序后需要reverse, 这样是为了保证更新的sst(id更大)出现在链表头部; 同时还需要初始化LRU-K的缓存池block_cache

3 数据写入

数据写入直接调用MemTableput函数即可, 但这里需要写入后判断是否需要进行刷盘:

1
2
3
4
5
6
7
8
void LSMEngine::put(const std::string &key, const std::string &value) {
memtable.put(key, value);

// 如果 memtable 太大,需要刷新到磁盘
if (memtable.get_total_size() >= LSM_TOL_MEM_SIZE_LIMIT) {
flush();
}
}

flush()函数马上会介绍

4 数据查询

查询就比较复杂了, 这里先给出一个流程说明, 再结合代码解释:

  1. MemTable(包括冻结和未冻结的)中查询, 这一部分之前的文章已经介绍过了
  2. 如果MemTable未命中, 按照链表顺序(sst_id从大到小)从sst中查询

代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::optional<std::string> LSMEngine::get(const std::string &key) {
// 1. 先查找 memtable
auto value = memtable.get(key);
if (value.has_value()) {
if (value.value().size() > 0) {
// 值存在且不为空(没有被删除)
return value;
} else {
// memtable返回的kv的value为空值表示被删除了
return std::nullopt;
}
}

// 2. l0 sst中查询
std::shared_lock<std::shared_mutex> rlock(ssts_mtx); // 读锁
for (auto &sst_id : l0_sst_ids) {
// l0_sst_ids 中的 sst_id 是按从大到小的顺序排列,
// sst_id 越大, 表示是越晚刷入的, 优先查询
auto sst = ssts[sst_id];
auto sst_iterator = sst->get(key);
if (sst_iterator != sst->end()) {
if ((sst_iterator)->second.size() > 0) {
// 值存在且不为空(没有被删除)
return sst_iterator->second;
} else {
// 空值表示被删除了
return std::nullopt;
}
}
}

return std::nullopt;
}

这里还有一个很重要的点, 就是我们调用memtableapi时并没有加锁, 但读写sst时进行了加锁, 这是因为, 我们之前实现的MemTable已经包含了加锁解锁操作保证线程安全, 而多个sst本身就是我们目前这个控制结构类管理的, 所以后者需要我们自行加锁保证线程安全。

5 数据删除

这部分就是put一个空的value即可, 这里就不再赘述了。

6 Flush刷盘

这里先给出代码, 然后解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void LSMEngine::flush() {
if (memtable.get_total_size() == 0) {
return;
}

// 1. 创建新的 SST ID
// 链表头部存储的是最新刷入的sst, 其sst_id最大
size_t new_sst_id = l0_sst_ids.empty() ? 0 : l0_sst_ids.front() + 1;
// ! 这里有内存安全问题, 因为可能多个 flush 获取了同一个
// new_sst_id, 但后续有WAL, 故先忽略这个问题, 且这个概率很小

// 2. 准备 SSTBuilder
SSTBuilder builder(LSM_BLOCK_SIZE); // 4KB block size

// 3. 将 memtable 中最旧的表写入 SST
auto sst_path = get_sst_path(new_sst_id);
auto new_sst =
memtable.flush_last(builder, sst_path, new_sst_id, block_cache);

std::unique_lock<std::shared_mutex> lock(ssts_mtx); // 写锁
// 4. 更新内存索引
ssts[new_sst_id] = new_sst;

// 5. 更新 sst_ids
l0_sst_ids.push_front(new_sst_id);
}

整体看流程倒没有什么问题,但存在一个前缀的线程不安全问题:

  1. 线程1某一时刻获取了new_sst_id,随后让出CPU
  2. 线程2锁喉被调度, 也获取了new_sst_id
  3. 线程1和线程2随后各自调度memtable.flush_last得到一个编码的sst, 他们数据是不同的, 因为memtable是线程安全的, 但他们此时却有相同的new_sst_id

代码写到这里时我已经注意到了这个问题, 但还是先搁置了, 因为我们后续还要实现WAL(如果我不鸽的话), 到时候再一起解决这个问题

7 小节

到此时, 你已经得到了一个可以玩基础put/get/del的单机kv数据库了, 你可以编译后自行测试一下, 但它还缺少如下功能:

  1. 前缀查询和范围查询, 我们目前的get/put都是精准的点查询, 而范围查询要求我们更改底层的memtablesst, 下一小节会介绍
  2. 迭代器, 进行精确点查询时, 按照顺序查询memtablesst就可以了, 但如果实现整个迭代器, 需要估计先遍历的memtablesstvalue为空的记录对后续遍历结果中同样key的删除效果, 这会比这一小节实现的函数更复杂, 也会介绍

当我们完成上面这些功能后, 我们的kv数据库就相对完备了 ,至少已经是一个比较高级的玩具了, 你可以对这个玩具进行进一步的封装, 比如实现RedisResp协议从而替代redis-server来支持redis-cli的连接。目前我最新的github仓库代码已经支持了部分常用的redis命令, 但redis命令还是太多了, 我没有完全实现, 如果有感兴趣的朋友, 可以提一提pr