C++从零开始实现LSM-Tree-KV存储-09-范围查询

这一部分是对之前的存储引擎进行优化,为其实现范围查询的功能。实现的范围查询将使用于后续的Redis兼容层的内容中。

代码仓库:https://github.com/ToniXWD/toni-lsm

欢迎点个Star

1 设计思路

这里的范围查询其实可以进行拓展,由于我们的内存的MemTableSST生成的迭代器都是有序的, 只要目标区间严格符合排序的一段区间, 都可以归类为范围查询。主要包括的类型有:

  1. 真实的范围查询: 查询key[start, end)/[start, end] 范围内的键值对
  2. 前缀匹配: 查询keyxxx为前缀的键值对, 因为根据字符串的排序规则, 相同前缀的部分肯定只会出现在一个前缀区间

因此, 我们可以设计这样一个查询接口, 其接收一个谓词, 这个谓词即说明此次查询是普通的范围查询还是前缀匹配, 但要求结果一定在全局只位于一个连续区间中就可以。同时该谓词不能返回bool值, 而是类似字符串比较那样返回一个int值, 0 表示不匹配, 1 表示大于, -1 表示小于。这样我们才可以根据返回值确定下一步二分查找的方向。

2 代码实现

2.1 SkipList 谓词查询

我们需要逐层次实现这个支持谓词的查询接口,其返回一组迭代器表示startend, 这里我们还是要利用SkipList的有序性多层不同步长的链表来实现快速的匹配查询。

首先, 为保证代码查询中节点移动的灵活性, 我们需要将之前实现的单向链表转化为双向链表, 双向链表的前向指针和后向指针存储于等长的vector, 但是使用weak_ptr, 用于表示当前节点的前向节点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// include/skiplist/skiplist.h

struct SkipListNode {
std::string key; // 节点存储的键
std::string value; // 节点存储的值
std::vector<std::shared_ptr<SkipListNode>>
forward; // 指向不同层级的下一个节点的指针数组
std::vector<std::weak_ptr<SkipListNode>>
backward; // 指向不同层级的下一个节点的指针数组
SkipListNode(const std::string &k, const std::string &v, int level)
: key(k), value(v), forward(level, nullptr),
backward(level, std::weak_ptr<SkipListNode>()) {}
void set_backward(int level, std::shared_ptr<SkipListNode> node) {
backward[level] = std::weak_ptr<SkipListNode>(node);
}
};

这里也补充说明一下weak_ptr, 它的作用是避免shared_ptr循环引用, 即一个节点的shared_ptr指针指向另一个节点, 另一个节点的shared_ptr指针指向前者, 这样就会造成两个节点的析构都无法进行, 因为在析构时互相持有对方的引用计数, 类似死锁。但weak_ptr不参与类似shared_ptr的引用计数, 保证了析构的正确进行。但也正因为如此,weak_ptr不保证指针的有效性, 需要想使用.lock()判断该指针是否有效。

至于实现, 这里先给出代码, 然后进行解释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
// src/skiplist/skipList.cpp

// 返回第一个满足谓词的位置和最后一个满足谓词的迭代器
// 如果不存在, 范围nullptr
// 谓词作用于key, 且保证满足谓词的结果只在一段连续的区间内, 例如前缀匹配的谓词
// predicate返回值:
// 0: 谓词
// >0: 不满足谓词, 需要向右移动
// <0: 不满足谓词, 需要向左移动
std::optional<std::pair<SkipListIterator, SkipListIterator>>
SkipList::iters_monotony_predicate(
std::function<int(const std::string &)> predicate) {
auto current = head;
SkipListIterator begin_iter = SkipListIterator(nullptr);
SkipListIterator end_iter = SkipListIterator(nullptr);

// 从最高层开始查找
// 一开始 current == head, 所以 current_level - 1 处肯定有合法的指针
bool find1 = false;
for (int i = current_level - 1; i >= 0; --i) {
while (!find1) {
auto forward_i = current->forward[i];
if (forward_i == nullptr) {
break;
}
auto direction = predicate(forward_i->key);
if (direction == 0) {
// current 已经满足谓词了
find1 = true;
current = forward_i;
break;
} else if (direction < 0) {
// 下一个位置不满足谓词, 且方向错误(位于目标区间右侧)
// 需要尝试更小的步长(层级)
break;
} else {
// 下一个位置不满足谓词, 但方向正确(位于目标区间左侧)
current = forward_i;
}
}
}

if (!find1) {
// 无法找到第一个满足谓词的迭代器, 直接返回
return std::nullopt;
}

// 记住当前 current 的位置
auto current2 = current;

// current 已经满足谓词, 但有可能中途跳过了节点, 需要前向检查
// 注意此时 不能直接从 current_level - 1 层开始,
// 因为当前节点的层数不一定等于最大层数
for (int i = current->backward.size() - 1; i >= 0; --i) {
while (true) {
if (current->backward[i].lock() == nullptr ||
current->backward[i].lock() == head) {
// 当前层没有前向节点, 或前向节点指向头结点
break;
}
auto direction = predicate(current->backward[i].lock()->key);
if (direction == 0) {
// 前一个位置满足谓词, 继续判断
current = current->backward[i].lock();
continue;
} else if (direction > 0) {
// 前一个位置不满足谓词
// 需要尝试更小的步长(层级)
break;
} else {
// 因为当前位置满足了谓词, 前一个位置不可能返回-1
// 这种情况属于跳表实现错误, 需要排查
throw std::runtime_error("iters_predicate: invalid direction");
}
}
}

// 找到第一个满足谓词的节点
begin_iter = SkipListIterator(current);

// 找到最后一个满足谓词的节点
for (int i = current2->forward.size() - 1; i >= 0; --i) {
while (true) {
if (current2->forward[i] == nullptr) {
// 当前层没有后向节点
break;
}
auto direction = predicate(current2->forward[i]->key);
if (direction == 0) {
// 后一个位置满足谓词, 继续判断
current2 = current2->forward[i];
continue;
} else if (direction < 0) {
// 后一个位置不满足谓词
// 需要尝试更小的步长(层级)
break;
} else {
// 因为当前位置满足了谓词, 后一个位置不可能返回1
// 这种情况属于跳表实现错误, 需要排查
throw std::runtime_error("iters_predicate: invalid direction");
}
}
}

end_iter = SkipListIterator(current2);
// 转化为开区间
++end_iter;

return std::make_optional<std::pair<SkipListIterator, SkipListIterator>>(
begin_iter, end_iter);
}

这里的步骤是:

  1. 先使用步长最长的一层链表前向查询来查找第一个满足谓词的节点n_temp, 如果找不到, 直接返回。
  2. 如果找到了这个第一个满足谓词的节点n_temp, 那么就继续向前遍历。因为我们可能是通过大步长的链表找到的节点, 中间可能跳过了满足步长的节点, 所以还需要后向查询, 直到找到第一个不满足谓词的节点, 这个节点的后一个节点就是第一个满足谓词的节点n_start
  3. n_temp开始, 使用步长最长的一层链表后向查询来查找最后一个满足谓词的节点n_end并记录。如果找不到, 先降低跳表的层级(降低步长), 如果还是找不到, 直接返回。

这里的前向查询就正向查询, 后向查询就反向查询, 因为我们使用的是双向链表

这里也看出了我们为什么需要一个返回int类型的谓词, 因为当我们发现谓词不匹配时(返回值 == 0), 需要知道下一步的方向是什么。

2.2 MemTable 谓词查询

由于MemTable就是多份SkipList的集合, 因此我们只需要对每个SkipList进行范围查询, 最后封装为一个HeapIterator来去除旧数据的干扰即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::optional<std::pair<HeapIterator, HeapIterator>>
MemTable::iters_monotony_predicate(
std::function<int(const std::string &)> predicate) {
std::shared_lock<std::shared_mutex> slock(rx_mtx);
std::vector<SearchItem> item_vec;

auto cur_result = current_table->iters_monotony_predicate(predicate);
if (cur_result.has_value()) {
auto [begin, end] = cur_result.value();
for (auto iter = begin; iter != end; ++iter) {
item_vec.emplace_back(iter.get_key(), iter.get_value(), 0);
}
}

int table_idx = 1;
for (auto ft = frozen_tables.begin(); ft != frozen_tables.end(); ft++) {
auto table = *ft;
auto result = table->iters_monotony_predicate(predicate);
if (result.has_value()) {
auto [begin, end] = result.value();
for (auto iter = begin; iter != end; ++iter) {
item_vec.emplace_back(iter.get_key(), iter.get_value(), table_idx);
}
}
table_idx++;
}

if (item_vec.empty()) {
return std::nullopt;
}
return std::make_pair(HeapIterator(item_vec), HeapIterator{});
}

2.3 Block 谓词查询

同理我们的Block也需要支持谓词查询。但Block逻辑上就是排序的数组, 因此不需要像SkipList一样使用多层链表按照步长不同来先后遍历, 我们只需要使用常规的二分查询就可以了:

这里的逻辑是:

  1. 先使用二分查找来查找第一个满足谓词的位置, 如果匹配:
    1. 如果是第一次匹配, 记录当前位置first_first, 更新first的位置, 继续向左查询
    2. 如果不是第一次匹配, 只需要更新first的位置, 继续向左查询
  2. 之前的二分查找结束后, 区间的start就是first的位置, 如果这个位置为预设的-1, 表示无法匹配, 直接返回空
  3. 使用二分查找来查找最后一个满足谓词的位置, 二分的左侧从first_first开始向右二分查询, 找到last为区间的end位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// src/block/block.cpp

// 返回第一个满足谓词的位置和最后一个满足谓词的位置
// 如果不存在, 范围nullptr
// 谓词作用于key, 且保证满足谓词的结果只在一段连续的区间内, 例如前缀匹配的谓词
// 返回的区间是闭区间, 开区间需要手动对返回值自增
// predicate返回值:
// 0: 满足谓词
// >0: 不满足谓词, 需要向右移动
// <0: 不满足谓词, 需要向左移动
std::optional<
std::pair<std::shared_ptr<BlockIterator>, std::shared_ptr<BlockIterator>>>
Block::get_monotony_predicate_iters(
std::function<int(const std::string &)> predicate) {
if (offsets.empty()) {
return std::nullopt;
}

// 第一次二分查找,找到第一个满足谓词的位置
int left = 0;
int right = offsets.size() - 1;
int first = -1;
int first_first = -1; // 第一次找到的位置, 不一定是区间的首尾

while (left <= right) {
int mid = left + (right - left) / 2;
size_t mid_offset = offsets[mid];

auto mid_key = get_key_at(mid_offset);
int direction = predicate(mid_key);
if (direction < 0) {
// 目标在 mid 左侧
right = mid - 1;
} else if (direction > 0) {
// 目标在 mid 右侧
left = mid + 1;
} else {
first = mid;
if (first_first == -1) {
first_first = mid;
}
// 继续判断左边是否符合
right = mid - 1;
}
}

if (first == -1) {
return std::nullopt; // 没有找到满足谓词的元素
}

// 第二次二分查找,找到最后一个满足谓词的位置
left = first_first;
right = offsets.size() - 1;
int last = -1;

while (left <= right) {
int mid = left + (right - left) / 2;
size_t mid_offset = offsets[mid];

auto mid_key = get_key_at(mid_offset);
int direction = predicate(mid_key);

if (direction < 0) {
// 目标在 mid 左侧
right = mid - 1;
} else if (direction > 0) {
// 目标在 mid 右侧
throw std::runtime_error("block is not sorted");
} else {
last = mid;
// 继续判断右边是否符合
left = mid + 1;
}
}

auto it_begin = std::make_shared<BlockIterator>(shared_from_this(), first);
auto it_end = std::make_shared<BlockIterator>(shared_from_this(), last + 1);

return std::make_optional<std::pair<std::shared_ptr<BlockIterator>,
std::shared_ptr<BlockIterator>>>(it_begin,
it_end);
}

2.4 SST 谓词查询

SST中的所有Block都是排序的, 因此也只需要对每个Block进行二分查找, 然后将第一次找到的开始位置和最后一次找到的结束位置拼接在一起就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// src/sst/sst_iterator.cpp

// predicate返回值:
// 0: 谓词
// >0: 不满足谓词, 需要向右移动
// <0: 不满足谓词, 需要向左移动
std::optional<std::pair<SstIterator, SstIterator>> sst_iters_monotony_predicate(
std::shared_ptr<SST> sst,
std::function<int(const std::string &)> predicate) {
std::optional<SstIterator> final_begin = std::nullopt;
std::optional<SstIterator> final_end = std::nullopt;
for (int block_idx = 0; block_idx < sst->meta_entries.size(); block_idx++) {
auto block = sst->read_block(block_idx);

BlockMeta &meta_i = sst->meta_entries[block_idx];
if (predicate(meta_i.first_key) < 0 || predicate(meta_i.last_key) > 0) {
break;
}

auto result_i = block->get_monotony_predicate_iters(predicate);
if (result_i.has_value()) {
auto [i_begin, i_end] = result_i.value();
if (!final_begin.has_value()) {
auto tmp_it = SstIterator(sst);
tmp_it.set_block_idx(block_idx);
tmp_it.set_block_it(i_begin);
final_begin = tmp_it;
}
auto tmp_it = SstIterator(sst);
tmp_it.set_block_idx(block_idx);
tmp_it.set_block_it(i_end);
if (tmp_it.is_end() && tmp_it.m_block_idx == sst->num_blocks()) {
tmp_it.set_block_it(nullptr);
}
final_end = tmp_it;
}
}
if (!final_begin.has_value() || !final_end.has_value()) {
return std::nullopt;
}
return std::make_pair(final_begin.value(), final_end.value());
}

2.5 LSM Engine 谓词查询

LSM Engine中, SST的查询结果和memtable的查询结果需要合并, 这里也是我们之前类似的实现就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// src/lsm/engine.cpp

std::optional<std::pair<MergeIterator, MergeIterator>>
LSMEngine::lsm_iters_monotony_predicate(
std::function<int(const std::string &)> predicate) {

// 先从 memtable 中查询
auto mem_result = memtable.iters_monotony_predicate(predicate);

// 再从 sst 中查询
std::vector<SearchItem> item_vec;
for (auto &[sst_idx, sst] : ssts) {
auto result = sst_iters_monotony_predicate(sst, predicate);
if (!result.has_value()) {
continue;
}
auto [it_begin, it_end] = result.value();
for (; it_begin != it_end && it_begin.is_valid(); ++it_begin) {
// 这里越古老的sst的idx越小, 我们需要让新的sst优先在堆顶
// 让新的sst(拥有更大的idx)排序在前面, 反转符号就行了
item_vec.emplace_back(it_begin.key(), it_begin.value(), -sst_idx);
}
}

HeapIterator l0_iter(item_vec);

if (!mem_result.has_value() && item_vec.empty()) {
return std::nullopt;
}
if (mem_result.has_value()) {
auto [mem_start, mem_end] = mem_result.value();
auto start = MergeIterator(mem_start, l0_iter);
auto end = MergeIterator{};
return std::make_optional<std::pair<MergeIterator, MergeIterator>>(start,
end);
} else {
auto start = MergeIterator(HeapIterator{}, l0_iter);
auto end = MergeIterator{};
return std::make_optional<std::pair<MergeIterator, MergeIterator>>(start,
end);
}
}

3 范围查询/前缀查询

现在我们可以借助我们的谓词查询来完成前缀查询和范围查询了:

3.1 范围查询

这里返回值是目标位置相对当前位置的方向:

  • 如果当前位置更大, 则目标位于左边, 返回-1 (返回小于0的值都可以)
  • 如果当前位置更小, 则目标位于右边, 返回1 (返回大于0的值都可以)
  • 如果当前位置等于目标, 则返回0
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
LSM lsm(test_dir);

auto predicate = [](const std::string &key) -> int {
int key_num = std::stoi(key.substr(3)); // Extract the number from the key
if (key_num < 20) {
return 1;
}
if (key_num > 60) {
return -1;
}
return 0;
};

// Call the method under test
auto result = lsm.lsm_iters_monotony_predicate(predicate);

3.2 前缀查询

前缀查询只需借助我们的compare函数即可完成比较, 但需要翻转符号, 因为当前位置更大返回的值是大于0, 但目标位置在左边, 需要反转为小于0的值, 反之亦然:

1
2
3
4
5
6
LSM lsm(test_dir);

auto result_elem = lsm->lsm_iters_monotony_predicate(
[&preffix](const std::string &elem) {
return -elem.compare(0, preffix.size(), preffix);
});

4 小节

目前我们补完了范围查询和前缀查询, 一个KV数据库的基础功能都具备了。下一章我们先不急着实现Compact, WAL, MVCC等, 我们先基于这些基础功能构建一个兼容Redis的后端, 也就是你可以用我们新构建的后端替代大名鼎鼎的redis-server, 然后接受客户端redis-cli的访问。