本小节将串联之前实现的MemTable
, SST
, Block
, 各类Iterator
, 实现一个初版的完整的单机KV
存储引擎。
代码仓库:https://github.com/ToniXWD/toni-lsm
欢迎点个Star
1 控制结构设计
这里再次回顾一下LSM Tree
的读写流程:
写入/删除
- 写入
MemTable
, 如果写入的KV
的value
为空, 表示一个删除标记
- 若当前活跃的
MemTable
大小达到阈值, 则将其冻结
- 若冻结的
MemTable
数量或大小达到阈值, 则将最早冻结的MemTable
转为SST
查询
- 查询当前活跃的
MemTable
, 如果查到有效记录或删除记录, 则返回
- 若查询当前活跃的
MemTable
未命中, 则遍历冻结的MemTable
, 由于冻结的MemTable
也存在次序, 需要先查询最近冻结的MemTable
- 若查询冻结的
MemTable
未命中, 则遍历SST
, 由于SST
也存在次序, 需要先查询最近创建的SST
因此我们只需要整合之前创建的MemTable
, SST
, Block
, Iterator
即可, 很容易写出下面的类定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class LSMEngine { public: std::string data_dir; MemTable memtable; std::list<size_t> l0_sst_ids; std::unordered_map<size_t, std::shared_ptr<SST>> ssts; std::shared_mutex ssts_mtx; std::shared_ptr<BlockCache> block_cache;
LSMEngine(std::string path); ~LSMEngine();
std::optional<std::string> get(const std::string &key);
void put(const std::string &key, const std::string &value); void put_batch(const std::vector<std::pair<std::string, std::string>> &kvs); void remove(const std::string &key); void remove_batch(const std::vector<std::string> &keys); void clear(); void flush(); void flush_all();
std::string get_sst_path(size_t sst_id);
std::optional<std::pair<MergeIterator, MergeIterator>> lsm_iters_monotony_predicate( std::function<int(const std::string &)> predicate);
MergeIterator begin(); MergeIterator end(); };
|
这里的l0_sst_ids
记录了所有sst
的id
, 其排序是从大到小, 因为sst
的id
越大表示这个sst
越新, 需要优先查询。
可以使用l0_sst_ids
获取的id
从哈希表ssts
中查询SST
的描述类(类似于文件描述符)。
2 数据库初始化
构造函数需要指定一个路径用于存放数据, 并判断这个路径下是否已经有sst
文件, 如果有, 需要解析器元数据加载到类的描述集合l0_sst_ids
和ssts
中, 代码逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| LSMEngine::LSMEngine(std::string path) : data_dir(path) { block_cache = std::make_shared<BlockCache>(LSMmm_BLOCK_CACHE_CAPACITY, LSMmm_BLOCK_CACHE_K);
if (!std::filesystem::exists(path)) { std::filesystem::create_directory(path); } else { for (const auto &entry : std::filesystem::directory_iterator(path)) { if (!entry.is_regular_file()) { continue; }
std::string filename = entry.path().filename().string(); if (filename.substr(0, 4) != "sst_") { continue; }
std::string id_str = filename.substr(4, filename.length() - 4); if (id_str.empty()) { continue; } size_t sst_id = std::stoull(id_str);
std::unique_lock<std::shared_mutex> lock(ssts_mtx); std::string sst_path = get_sst_path(sst_id); auto sst = SST::open(sst_id, FileObj::open(sst_path), block_cache); ssts[sst_id] = sst;
l0_sst_ids.push_back(sst_id); }
l0_sst_ids.sort(); l0_sst_ids.reverse(); } }
|
这里需要注意, l0_sst_ids
排序后需要reverse
, 这样是为了保证更新的sst
(id更大)出现在链表头部; 同时还需要初始化LRU-K
的缓存池block_cache
。
3 数据写入
数据写入直接调用MemTable
的put
函数即可, 但这里需要写入后判断是否需要进行刷盘:
1 2 3 4 5 6 7 8
| void LSMEngine::put(const std::string &key, const std::string &value) { memtable.put(key, value);
if (memtable.get_total_size() >= LSM_TOL_MEM_SIZE_LIMIT) { flush(); } }
|
flush()
函数马上会介绍
4 数据查询
查询就比较复杂了, 这里先给出一个流程说明, 再结合代码解释:
- 从
MemTable
(包括冻结和未冻结的)中查询, 这一部分之前的文章已经介绍过了
- 如果
MemTable
未命中, 按照链表顺序(sst_id从大到小)从sst
中查询
代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| std::optional<std::string> LSMEngine::get(const std::string &key) { auto value = memtable.get(key); if (value.has_value()) { if (value.value().size() > 0) { return value; } else { return std::nullopt; } }
std::shared_lock<std::shared_mutex> rlock(ssts_mtx); for (auto &sst_id : l0_sst_ids) { auto sst = ssts[sst_id]; auto sst_iterator = sst->get(key); if (sst_iterator != sst->end()) { if ((sst_iterator)->second.size() > 0) { return sst_iterator->second; } else { return std::nullopt; } } }
return std::nullopt; }
|
这里还有一个很重要的点, 就是我们调用memtable
的api
时并没有加锁, 但读写sst
时进行了加锁, 这是因为, 我们之前实现的MemTable
已经包含了加锁解锁操作保证线程安全, 而多个sst
本身就是我们目前这个控制结构类管理的, 所以后者需要我们自行加锁保证线程安全。
5 数据删除
这部分就是put
一个空的value
即可, 这里就不再赘述了。
6 Flush刷盘
这里先给出代码, 然后解释:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| void LSMEngine::flush() { if (memtable.get_total_size() == 0) { return; }
size_t new_sst_id = l0_sst_ids.empty() ? 0 : l0_sst_ids.front() + 1;
SSTBuilder builder(LSM_BLOCK_SIZE);
auto sst_path = get_sst_path(new_sst_id); auto new_sst = memtable.flush_last(builder, sst_path, new_sst_id, block_cache);
std::unique_lock<std::shared_mutex> lock(ssts_mtx); ssts[new_sst_id] = new_sst;
l0_sst_ids.push_front(new_sst_id); }
|
整体看流程倒没有什么问题,但存在一个前缀的线程不安全问题:
- 线程1某一时刻获取了
new_sst_id
,随后让出CPU
- 线程2锁喉被调度, 也获取了
new_sst_id
- 线程1和线程2随后各自调度
memtable.flush_last
得到一个编码的sst
, 他们数据是不同的, 因为memtable
是线程安全的, 但他们此时却有相同的new_sst_id
代码写到这里时我已经注意到了这个问题, 但还是先搁置了, 因为我们后续还要实现WAL
(如果我不鸽的话), 到时候再一起解决这个问题
7 小节
到此时, 你已经得到了一个可以玩基础put/get/del
的单机kv
数据库了, 你可以编译后自行测试一下, 但它还缺少如下功能:
- 前缀查询和范围查询, 我们目前的
get/put
都是精准的点查询, 而范围查询要求我们更改底层的memtable
和sst
, 下一小节会介绍
- 迭代器, 进行精确点查询时, 按照顺序查询
memtable
和sst
就可以了, 但如果实现整个迭代器, 需要估计先遍历的memtable
和sst
中value
为空的记录对后续遍历结果中同样key
的删除效果, 这会比这一小节实现的函数更复杂, 也会介绍
当我们完成上面这些功能后, 我们的kv
数据库就相对完备了 ,至少已经是一个比较高级的玩具了, 你可以对这个玩具进行进一步的封装, 比如实现Redis
的Resp
协议从而替代redis-server
来支持redis-cli
的连接。目前我最新的github
仓库代码已经支持了部分常用的redis
命令, 但redis
命令还是太多了, 我没有完全实现, 如果有感兴趣的朋友, 可以提一提pr
。