C++从零开始实现LSM-Tree-KV存储-02-memtable

这一小节实现 MemTable 部分, 其实内容很少, 就是对 之前实现的 SkipList 的简单封装。

代码仓库: https://github.com/ToniXWD/toni-lsm

1 MemTable实现

1.1 原理剖析

首先复习下MemTable和之前的SkipList的关系:

MemTable

内存中的整个MemTable包含多份SkipList, 其中只有一份是可以写入的SkipList, 当SkipList大小达到指定阈值后被冻结成只读状态, 冻结后的SkipList按照冻结时间进行排序, 先冻结的在后, 后冻结的在前(其实只要保证查询是先冻结的SkipList优先即可)。

另一方面,由于LSM Tree只能追加写入, 因此实际上用插入一个空值表示删除操作。这也是我们为什么在之前的SkipList::get中使用了std::optional, 因为这样同时能表示key在物理上不存在和key在逻辑上被不存在(被删除): None表示这个key从来没有存在过, 而std::optional中值为空表示之前的删除记录。

1.2 代码实现

因此对之前的SkipList简单包装即可完成基础的 MemTable

  • 首先定义如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
class MemTableIterator;

class MemTable {
friend class MemTableIterator; // 后续会讲解

public:
MemTable();
~MemTable();

void put(const std::string &key, const std::string &value);
std::optional<std::string> get(const std::string &key);
void remove(const std::string &key);
void clear();
void flush(); // 需要实现 sst
void frozen_cur_table();
size_t get_cur_size();
size_t get_frozen_size();
size_t get_total_size();
MemTableIterator begin();
MemTableIterator end();

private:
std::shared_ptr<SkipList> current_table; // 活跃 SkipList
std::list<std::shared_ptr<SkipList>> frozen_tables; // 冻结 SkipList
size_t frozen_bytes; // 冻结的跳表的大小
std::shared_mutex rx_mtx; // 以整个 SkipList 为单位的锁
};

这里先用读写锁试试, 如果性能不好后续再优化。

  • put就是简单放入空值:
1
2
3
4
void MemTable::put(const std::string &key, const std::string &value) {
std::unique_lock<std::shared_mutex> lock(rx_mtx);
current_table->put(key, value);
}
  • get操作会更复杂一些, 在活跃的SkipList中查询失败的话, 需要按顺序查询冻结的SkipList(就近原则):
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::optional<std::string> MemTable::get(const std::string &key) {
std::shared_lock<std::shared_mutex> slock(rx_mtx);
// 首先检查当前活跃的memtable
auto result = current_table->get(key);
if (result.has_value()) {
auto data = result.value();
if (!data.empty()) {
return data;
} else {
// 空值表示被删除了
return std::nullopt;
}
}

// 如果当前memtable中没有找到,检查frozen memtable
for (auto &tabe : frozen_tables) {
auto result = tabe->get(key);
if (result.has_value()) {
auto data = result.value();
if (!data.empty()) {
return data;
} else {
// 空值表示被删除了
return std::nullopt;
}
}
}

// 都没有找到,返回空
// !!! 目前只实现了内存中的 memtable, 还没有实现 SST
return std::nullopt;
}
  • remove就是插入一个空值:
1
2
3
4
5
void MemTable::remove(const std::string &key) {
std::unique_lock<std::shared_mutex> lock(rx_mtx);
// 删除的方式是写入空值
current_table->put(key, "");
}

2 迭代器实现

2.1 Merge原理

迭代器算是这里的一个难点, 因为新的SkipList中的元素会导致旧的SkipList的部分元素失效, 因此不能简单地将不同SkipList的遍历结果拼接起来就完事儿。

试想下面这个场景:

1
2
SkipList0: ("k1", "v1") -> ("k4", "") -> ("k5", "v3")
SkipList1: ("k2", "v2") -> ("k3", "v3") -> ("k4", "v4")

SkipList0中的("k4", "")表示删除了"k4", 因此如果我们先消耗了SkipList0的迭代器, 那么SkipList1中就无法获取"k3"的不合法性, 因此需要对不同SkipList的迭代器进行merge操作来删除这些无效的元素。

这里可以维护一个堆,堆首先根据key排序, 然后根据SkipList id排序, 因此相同的key, 后插入的记录肯定更靠近堆顶(因为SkipList id越小表示其越新), 因此堆顶的某个key一定是整个MemTable中该key的最新记录, 迭代器对该key只需要堆顶的这一个元素, 其余在取出堆顶后即可全部移除(因为首先按照key排序, 所以他们一定连续出现在堆顶)

其他函数就都很简单了, 这里不列举了, 感兴趣可以看Github源码

2.2 代码实现

  • 迭代器定义:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
#pragma once

#include "../skiplist/skiplist.h"
#include "memtable.h"
#include <queue>

struct SearchItem {
std::string key;
std::string value;
int mem_idx;
}; // 用于在堆(C++中的priority_queue)中排序

bool operator<(const SearchItem &a, const SearchItem &b);
bool operator>(const SearchItem &a, const SearchItem &b);
bool operator==(const SearchItem &a, const SearchItem &b);

class MemTableIterator {
public:
MemTableIterator();
MemTableIterator(const MemTable &memtable);
std::pair<std::string, std::string> operator*() const;
MemTableIterator &operator++();
MemTableIterator operator++(int);
bool operator==(const MemTableIterator &other) const;
bool operator!=(const MemTableIterator &other) const;

private:
std::priority_queue<SearchItem, std::vector<SearchItem>,
std::greater<SearchItem>>
items;
};

MemTableIterator中维持着一个堆(priority_queue), 这个堆的堆顶就是当前迭代器的kv, 且必须保证其是有效的, 换言之, 执行next()操作时必须移除堆顶的无效元素(被之前的删除记录删除的元素 && 本身就是删除记录)

  • 接下来首先看看MemTableIterator的构造:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
MemTableIterator::MemTableIterator(const MemTable &memtable) {
auto cur_table = memtable.current_table;

for (auto iter = cur_table->begin(); iter != cur_table->end(); iter++) {
items.push(SearchItem{iter.get_key(), iter.get_value(), 0});
}

int level = 1;
for (auto ft = memtable.frozen_tables.begin();
ft != memtable.frozen_tables.end(); ft++) {
auto table = *ft;
for (auto iter = table->begin(); iter != table->end(); iter++) {
items.push(SearchItem{iter.get_key(), iter.get_value(), level});
}
level++;
}

while (!items.empty() && items.top().value.empty()) {
// 如果当前元素的value为空,则说明该元素已经被删除,需要从优先队列中删除
auto del_key = items.top().key;
while (!items.empty() && items.top().key == del_key) {
items.pop();
}
}
}

目前也没有使用什么优化手段, 直接遍历整个底层链表, 将其放入堆中。需要注意的是,因为当前堆顶可能本身就是一个删除记录(value是空值), 因此需要先从堆顶检查是否value为空, 如果是, 需要继续删除所有该key的堆元素

  • 接下来是最重要的迭代器自增操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
MemTableIterator &MemTableIterator::operator++() {
if (items.empty()) {
return *this; // 处理空队列情况
}

auto old_item = items.top();
items.pop();

// 删除与旧元素key相同的元素
while (!items.empty() && items.top().key == old_item.key) {
items.pop();
}

// 处理被删除的元素
while (!items.empty() && items.top().value.empty()) {
auto del_key = items.top().key;
while (!items.empty() && items.top().key == del_key) {
items.pop();
}
}

return *this;
}

取出一个元素后, 需要删除剩余堆中所有与该元素的key相同的元素。同时类似构造函数中最后对堆顶的检查, 需要检测堆顶元素不为空value

其他函数很简单了, 就不列举了