C++从零开始实现LSM-Tree-KV存储-16-事务与MVCC设计2

上一章我们对事物和MVCC进行了简单的介绍,并简单介绍了接口的变化, 这一章我们逐层进行修改,以实现事务的语义。

这里部分内容和上一章有重复, 这里是安排有些冗余, 大家觉得重复略过就好

1 Block中的事务id

1.1 数据写入

这里首先回顾上一章中对编码数据格式的修改:

trac_id

因此,我们第一步更改的是Block的编码格式的代码,在Block中添加一个trac_id,这样当读取到Block的某个键值对时,我们就可以知道这个键值对是哪个事务新增或修改的,能更好地帮我们实现不同事务、隔离级别的并发控制。

这里我们修改的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// src/block/block.cpp
bool Block::add_entry(const std::string &key, const std::string &value,
uint64_t tranc_id, bool force_write) {
if (!force_write &&
(cur_size() + key.size() + value.size() + 3 * sizeof(uint16_t) +
sizeof(uint64_t) >
capacity) &&
!offsets.empty()) {
return false;
}
// 计算entry大小:key长度(2B) + key + value长度(2B) + value
size_t entry_size = sizeof(uint16_t) + key.size() + sizeof(uint16_t) +
value.size() + sizeof(uint64_t);
size_t old_size = data.size();
data.resize(old_size + entry_size);

// 写入key长度
uint16_t key_len = key.size();
memcpy(data.data() + old_size, &key_len, sizeof(uint16_t));

// 写入key
memcpy(data.data() + old_size + sizeof(uint16_t), key.data(), key_len);

// 写入value长度
uint16_t value_len = value.size();
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len, &value_len,
sizeof(uint16_t));

// 写入value
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len + sizeof(uint16_t),
value.data(), value_len);

// 写入事务id
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len +
sizeof(uint16_t) + value_len,
&tranc_id, sizeof(uint64_t));

// 记录偏移
offsets.push_back(old_size);
return true;
}

这里做出的修改就是在写入value后再写入了tranc_id,且tranc_id恒定为8个字节的整型数。在文件存储方面,也就只需要修改Blockadd_entry函数,上层的sst仅仅是调用Blockadd_entry函数,而没有直接控制底层文件的写入,因此我们的修改方案也非常简单, 这得益于我们之前的设计模块较为分离.

1.2 数据读取

1.2.1 基础的事务id查询

同时, 在编码数据中添加了tranc_id后, 我们也需要添加对应的读取函数, 这里的逻辑就是在指定偏移位置, 跳过keyvalue的长度, 然后读取tranc_id即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/block/block.cpp
uint16_t Block::get_tranc_id_at(size_t offset) const {
// 先获取key长度
uint16_t key_len;
memcpy(&key_len, data.data() + offset, sizeof(uint16_t));

// 计算value长度的位置
size_t value_len_pos = offset + sizeof(uint16_t) + key_len;
uint16_t value_len;
memcpy(&value_len, data.data() + value_len_pos, sizeof(uint16_t));

// 计算事务id的位置
size_t tranc_id_pos = value_len_pos + sizeof(uint16_t) + value_len;
uint64_t tranc_id;
memcpy(&tranc_id, data.data() + tranc_id_pos, sizeof(uint64_t));
return tranc_id;
}

1.3.2 id的查询语义

当然, get_tranc_id_at仅仅是查询指定位置的tranc_id,但我们更多的时候是通过key查询valuetranc_id. 这里二分查询的时候, 我们需要再参数中指定一个tranc_id,然后根据这个tranc_id来判断事务的可见性。这里的逻辑是:

  1. 每个事务都会被分配一个tranc_id,这个tranc_id是递增的,且从1开始。
  2. 事务查询时, 会结合具体的隔离级别, 根据tranc_id来判断事务的可见性。我们如下设定:
    1. 如果事务查询时某个键值对的tranc_id小于等于当前事务的tranc_id, 表示这个键值对是更早创建的事务(或当前事务自己)修改的,因此这个键值对是可见的。
    2. 如果事务查询时某个键值对的tranc_id大于当前事务的tranc_id, 表示这个键值对是当前事务修改的,因此这个键值对是不可见的。
  3. 事务查询时也可以直接让存储引擎返回最新的键值对记录,这时将tranc_id设置为0即可,表示不开启事务功能。

1.3.3 考虑事务语义的二分查询

在设计了事务id的查询语义后,我们必须修改之前实现的二分查询的代码,以考虑事务的语义。因为在之前的设计中, 一个block中相同的key只会存在一次, 因此代码逻辑也很简单,但引入了事务id后, 一个key可能存在多个value,这是因为多个事务可能同时修改同一个key的值,而为了MVCC的实现, 多个不同事务对相同key的修改都需要进行记录, 因此我们需要再二分查询中考虑到重复的key

首先给出修改后的二分查询代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
std::optional<size_t> Block::get_idx_binary(const std::string &key,
uint64_t tranc_id) {
if (offsets.empty()) {
return std::nullopt;
}
// 二分查找
int left = 0;
int right = offsets.size() - 1;

while (left <= right) {
int mid = left + (right - left) / 2;
size_t mid_offset = offsets[mid];

int cmp = compare_key_at(mid_offset, key);

if (cmp == 0) {
// 找到key,返回对应的value
// 还需要判断事务id可见性
auto new_mid = adjust_idx_by_tranc_id(mid, tranc_id);
if (new_mid == -1) {
return std::nullopt;
}
return new_mid;
} else if (cmp < 0) {
// 中间的key小于目标key,查找右半部分
left = mid + 1;
} else {
// 中间的key大于目标key,查找左半部分
right = mid - 1;
}
}

return std::nullopt;
}

这里首先通过二分查询找到指定key的位置, 但需要注意的是, 这里找到的key的位置并不一定是唯一的, 因为同一个key可能存在多个value, 因此我们需要在找到的位置上再进行判断, 判断事务的可见性并执行偏移, 这部分工作由adjust_idx_by_tranc_id函数完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 相同的key连续分布, 且相同的key的事务id从大到小排布
// 这里的逻辑是找到最接近 tranc_id 的键值对的索引位置
int Block::adjust_idx_by_tranc_id(size_t idx, uint64_t tranc_id) {
if (idx >= offsets.size()) {
return -1; // 索引超出范围
}

auto target_key = get_key_at(offsets[idx]);

if (tranc_id != 0) {
auto cur_tranc_id = get_tranc_id_at(offsets[idx]);

if (cur_tranc_id <= tranc_id) {
// 当前记录可见,向前查找更接近的目标
size_t prev_idx = idx;
while (prev_idx > 0 && is_same_key(prev_idx - 1, target_key)) {
prev_idx--;
auto new_tranc_id = get_tranc_id_at(offsets[prev_idx]);
if (new_tranc_id > tranc_id) {
return prev_idx + 1; // 更新的记录不可见
}
}
return prev_idx;
} else {
// 当前记录不可见,向后查找
size_t next_idx = idx + 1;
while (next_idx < offsets.size() && is_same_key(next_idx, target_key)) {
auto new_tranc_id = get_tranc_id_at(offsets[next_idx]);
if (new_tranc_id <= tranc_id) {
return next_idx; // 找到可见记录
}
next_idx++;
}
return -1; // 没有找到满足条件的记录
}
} else {
// 没有开启事务的话, 直接选择最大的事务id的记录返回
size_t prev_idx = idx;
while (prev_idx > 0 && is_same_key(prev_idx - 1, target_key)) {
prev_idx--;
}
return prev_idx;
}
}

bool Block::is_same_key(size_t idx, const std::string &target_key) const {
if (idx >= offsets.size()) {
return false; // 索引超出范围
}
return get_key_at(offsets[idx]) == target_key;
}

这里需要注意的是, 这里的tranc_id是按照从大到小的顺序排列的, 也就是说更大的tranc_id对应的事务优先级更高, adjust_idx_by_tranc_id的执行逻辑为:

  1. 如果开启了事务功能
    1. 使用当前位置的tranc_id和目标tranc_id进行比较, 如果当前位置的tranc_id小于等于目标tranc_id, 表示当前位置的记录是可见的, 因此需要向前查找更接近目标tranc_id的记录
    2. 如果当前位置的tranc_id大于目标tranc_id, 表示当前位置的记录是不可见的, 因此需要向后查找
  2. 如果没有开启事务功能, 直接选择最大的事务id的记录返回

2 BlockIterator中的事务id

同样的, 引入事务id后, 我们也需要修改BlockIterator中的代码, 在使用迭代器查询或者遍历的时候, 需要跳过不可见的事务, 并且对多个key(多个事务创建的, 有不同的tranc_id) 只保留最接近当前tranc_id的记录。

首先是迭代器构造函数需要跳过不可见或重复的记录:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
BlockIterator::BlockIterator(std::shared_ptr<Block> b, size_t index,
uint64_t tranc_id)
: block(b), current_index(index), tranc_id_(tranc_id),
cached_value(std::nullopt) {
skip_by_tranc_id();
}

void BlockIterator::skip_by_tranc_id() {
if (tranc_id_ == 0) {
// 没有开启事务功能
cached_value = std::nullopt;
return;
}

while (current_index < block->offsets.size()) {
size_t offset = block->get_offset_at(current_index);
auto tranc_id = block->get_tranc_id_at(offset);
if (tranc_id <= tranc_id_) {
// 位置合法
break;
}
// 否则跳过不可见事务的键值对
++current_index;
}
cached_value = std::nullopt;
}

自增运算符重载也需要进行类似的判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
BlockIterator &BlockIterator::operator++() {
if (block && current_index < block->size()) {
auto prev_idx = current_index;
auto prev_offset = block->get_offset_at(prev_idx);
auto prev_entry = block->get_entry_at(prev_offset);

++current_index;

// 跳过相同的key
while (block && current_index < block->size()) {
auto cur_offset = block->get_offset_at(current_index);
auto cur_entry = block->get_entry_at(cur_offset);
if (cur_entry.key != prev_entry.key) {
break;
}
// 可能会连续出现多个key, 但由不同事务创建, 同样的key直接跳过
++current_index;
}

// 出现不同的key时, 还需要跳过不可见事务的键值对
skip_by_tranc_id();
}
return *this;
}

3 内存部分中的事务id

之前的内容知识对SST中的Block部分和BlockIterator部分进行了修改, 使其包含了事务id的信息。对于内存部分中的MemTableMemTableIterator部分, 我们也需要做类似的修改, 否则在开启事务功能后, 会出现一些问题。

3.1 统一查询接口

我们之前的实现中,Skiplist::get的定义是:

1
std::optional<std::string> get(const std::string &key); // 查找键对应的值

这里返回的直接是value的值, 但在引入了事务和MVCC后, 我们需要判断查询结果的tranc_id是否小于等于当前tranc_id, 如果小于等于, 表示该记录是可见的, 否则表示该记录是不可见的。而之前的get返回值显然不能提供这么复杂的信息, 因此我们需要统一查询接口, 所有的查询组件都返回一个迭代器, 这个迭代器需要继承一个虚基类, 其纯虚函数能够支持所有的组件查询key, value, tranc_id等关键数据。

因此我们实现让虚基类包含相应的纯虚函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
class BaseIterator {
public:
using value_type = std::pair<std::string, std::string>;
using pointer = value_type *;
using reference = value_type &;

virtual BaseIterator &operator++() = 0;
virtual bool operator==(const BaseIterator &other) const = 0;
virtual bool operator!=(const BaseIterator &other) const = 0;
virtual value_type operator*() const = 0;
virtual IteratorType get_type() const = 0;
virtual uint64_t get_tranc_id() const = 0;
virtual bool is_end() const = 0;
virtual bool is_valid() const = 0;
};

class SstIterator;
// *************************** SearchItem ***************************
struct SearchItem {
std::string key_;
std::string value_;
uint64_t tranc_id_;
int idx_;
int level_; // 来自sst的level

SearchItem() = default;
SearchItem(std::string k, std::string v, int i, int l, uint64_t tranc_id)
: key_(std::move(k)), value_(std::move(v)), idx_(i), level_(l),
tranc_id_(tranc_id) {}
};

bool operator<(const SearchItem &a, const SearchItem &b);
bool operator>(const SearchItem &a, const SearchItem &b);
bool operator==(const SearchItem &a, const SearchItem &b);

这里所有的迭代器都需要继承这个基类, 并且需要实现相应的纯虚函数。同时, 这里的SearchItem用于三元组key, value, tranc_id的比较, 并且需要实现相应的比较运算符:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// *************************** SearchItem ***************************
bool operator<(const SearchItem &a, const SearchItem &b) {
if (a.key_ != b.key_) {
return a.key_ < b.key_;
}
if (a.tranc_id_ > b.tranc_id_) {
return true;
}
if (a.level_ < b.level_) {
return true;
}
return a.idx_ < b.idx_;
}

bool operator>(const SearchItem &a, const SearchItem &b) {
if (a.key_ != b.key_) {
return a.key_ > b.key_;
}
if (a.tranc_id_ < b.tranc_id_) {
return true;
}
if (a.level_ < b.level_) {
return true;
}
return a.idx_ > b.idx_;
}

bool operator==(const SearchItem &a, const SearchItem &b) {
return a.idx_ == b.idx_ && a.key_ == b.key_;
}

这里的比较逻辑也是优先比较key, key相同时比较tranc_id, tranc_id相同时比较level, level相同时比较idx。不过需要注意的是, tranc_id更大表示优先级更高。同时这里的level表示SSTlevel, 从0开始, 0表示level0, 1表示level1, 2表示level2, 以此类推。idx则表示内存中的MemTable的比较依据(这里不熟悉可以复习下之前的文档)

然后, 所有的迭代器都继承这个基类BaseIterator , 并且需要实现相应的纯虚函数, 这里的代码就不具体贴出来了, 都比较简单, 可以参考完整的代码仓库

3.2 Memtable接口的变化

这里将MemTable的所有读写接口都进行修改, 一方面加入trabnc_id这个参数, 另一方面统一迭代器作为返回的接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class MemTable {
friend class TranContext;
friend class HeapIterator;

private:
void put_(const std::string &key, const std::string &value,
uint64_t tranc_id);

SkipListIterator get_(const std::string &key, uint64_t tranc_id);

SkipListIterator cur_get_(const std::string &key, uint64_t tranc_id);

SkipListIterator frozen_get_(const std::string &key, uint64_t tranc_id);

void remove_(const std::string &key, uint64_t tranc_id);
void frozen_cur_table_(); // _ 表示不需要锁的版本

public:
MemTable();
~MemTable();

void put(const std::string &key, const std::string &value, uint64_t tranc_id);
void put_batch(const std::vector<std::pair<std::string, std::string>> &kvs,
uint64_t tranc_id);

SkipListIterator get(const std::string &key, uint64_t tranc_id);
void remove(const std::string &key, uint64_t tranc_id);
void remove_batch(const std::vector<std::string> &keys, uint64_t tranc_id);

void clear();
std::shared_ptr<SST> flush_last(SSTBuilder &builder, std::string &sst_path,
size_t sst_id,
std::shared_ptr<BlockCache> block_cache);
void frozen_cur_table();
size_t get_cur_size();
size_t get_frozen_size();
size_t get_total_size();
HeapIterator begin(uint64_t tranc_id);
HeapIterator iters_preffix(const std::string &preffix, uint64_t tranc_id);

std::optional<std::pair<HeapIterator, HeapIterator>>
iters_monotony_predicate(uint64_t tranc_id,
std::function<int(const std::string &)> predicate);

HeapIterator end();

private:
std::shared_ptr<SkipList> current_table;
std::list<std::shared_ptr<SkipList>> frozen_tables;
size_t frozen_bytes;
std::shared_mutex frozen_mtx; // 冻结表的锁
std::shared_mutex cur_mtx; // 活跃表的锁
};

4 SST中的事务id

之前我们已经在block中添加了tranc_id这个数据, 并(key, value)的二元组转化为了(key, value, tranc_id)的三元组存储, 现在我们需要进一步修改整合blocksst

4.1 新增元数据

为了方便在事务查询时, 更直接地判断当前sst是否包含指定的事务id, 我们需要新增如下元数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
class SST : public std::enable_shared_from_this<SST> {
friend class SSTBuilder;
friend std::optional<std::pair<SstIterator, SstIterator>>
sst_iters_monotony_predicate(
std::shared_ptr<SST> sst, uint64_t tranc_id,
std::function<int(const std::string &)> predicate);

private:
FileObj file;
std::vector<BlockMeta> meta_entries;
uint32_t bloom_offset;
uint32_t meta_block_offset;
size_t sst_id;
std::string first_key;
std::string last_key;
std::shared_ptr<BloomFilter> bloom_filter;
std::shared_ptr<BlockCache> block_cache;
uint64_t min_tranc_id_ = UINT64_MAX; // new
uint64_t max_tranc_id_ = 0; // new
...
};

4.2 SST的构建

构建SSTSSTBuilder中, 需要记录min_tranc_id_max_tranc_id_:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
void SSTBuilder::add(const std::string &key, const std::string &value,
uint64_t tranc_id) {
// 记录第一个key
if (first_key.empty()) {
first_key = key;
}

// 在 布隆过滤器 中添加key
if (bloom_filter != nullptr) {
bloom_filter->add(key);
}

// 记录 事务id 范围
max_tranc_id_ = std::max(max_tranc_id_, tranc_id);
min_tranc_id_ = std::min(min_tranc_id_, tranc_id);

bool force_write = key == last_key;
// 连续出现相同的 key 必须位于 同一个 block 中

if (block.add_entry(key, value, tranc_id, force_write)) {
// block 满足容量限制, 插入成功
last_key = key;
return;
}

finish_block(); // 将当前 block 写入

block.add_entry(key, value, tranc_id, false);
first_key = key;
last_key = key; // 更新最后一个key
}

同时这里涉及到一个非常重要的易错点(我debug了好久), 当目前add的键值对的key相同时, 一定不能使其跨block分布, 这是因为如果相同的keyblock分布, 那么在block中二分查找时, 会出现定位错误的情况, 具体距离就是:

  1. sst构建时, (key=1, tranc_id=2)被添加到了block5, (key=1, tranc_id=1)被添加到了block6
  2. 现在tranc_id=3的事务查询key=1, 在二分查询时定位到了block6, 返回结果是(key=1, tranc_id=1)的数据
  3. 实际的数据应该是更新的(key=1, tranc_id=2)

因此, 让相同的key出现在同一个block中, 才满足二分查询的流程

这里的做法是, 每次插入时, 与上一次操作的key进行比较, 如果相同, 则将force_write参数设置为true, 确保key相同时, 都会插入到同一个block

4.3 SSTIterator的修改

我们之前的实现中, sst中数据的具体查询是交由SSTIterator完成的, 因此我们需要修改SSTIterator的实现, 确保在sst中查询时, tranc_id的判断正确:

首先是在头文件中添加数据成员:

1
2
3
4
5
class SstIterator : public BaseIterator {
...
uint64_t max_tranc_id_; // new
...
};

然后在具体的定位函数seek中, 添加对事物可见性的判断:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void SstIterator::seek(const std::string &key) {
if (!m_sst) {
m_block_it = nullptr;
return;
}

try {
m_block_idx = m_sst->find_block_idx(key);
if (m_block_idx == -1 || m_block_idx >= m_sst->num_blocks()) {
// 置为 end
// TODO: 这个边界情况需要添加单元测试
m_block_it = nullptr;
m_block_idx = m_sst->num_blocks();
return;
}
auto block = m_sst->read_block(m_block_idx);
if (!block) {
m_block_it = nullptr;
return;
}
m_block_it = std::make_shared<BlockIterator>(block, key, max_tranc_id_);
if (m_block_it->is_end()) {
// block 中找不到
m_block_idx = m_sst->num_blocks();
m_block_it = nullptr;
return;
}
} catch (const std::exception &) {
m_block_it = nullptr;
return;
}
}

这里的关键是:

1
m_block_it = std::make_shared<BlockIterator>(block, key, max_tranc_id_);

通过max_tranc_id_将事务可见性的相关操作转交给BlockIterator实现

5 TwoMergeIterator

这里我们要实现一个新的迭代器TwoMergeIterator, 它是两个迭代器(记为a和b)的合并。这两个迭代器都是BaseIterator的子类, 具有优先级先后顺序。例如,a的优先级更高, key相同时需要先遍历a,再遍历b

这里TwoMergeIterator的核心设计思路就是对已有的迭代器进行封装, 按照一定优先级规则对2个迭代器进行自增操作和数据滤除操作。因为其需要复用已有的迭代器,所以我将这部分内容放到实现了BaseIterator和基础的tranc_id之后.

4.1 TwoMergeIterator的定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// include/lsm/two_merge_iterator.h
class TwoMergeIterator : public BaseIterator {
private:
std::shared_ptr<BaseIterator> it_a;
std::shared_ptr<BaseIterator> it_b;
bool choose_a = false;
mutable std::shared_ptr<value_type> current; // 用于存储当前元素
uint64_t max_tranc_id_ = 0;

void update_current() const;

public:
TwoMergeIterator();
TwoMergeIterator(std::shared_ptr<BaseIterator> it_a,
std::shared_ptr<BaseIterator> it_b, uint64_t max_tranc_id);
bool choose_it_a();
// 跳过当前不可见事务的id (如果开启了事务功能)
void skip_by_tranc_id();
void skip_it_b();

virtual BaseIterator &operator++() override;
virtual bool operator==(const BaseIterator &other) const override;
virtual bool operator!=(const BaseIterator &other) const override;
virtual value_type operator*() const override;
virtual IteratorType get_type() const override;
virtual uint64_t get_tranc_id() const override;
virtual bool is_end() const override;
virtual bool is_valid() const override;

pointer operator->() const;
};

这里的TwoMergeIterator继承自BaseIterator, 并且需要传入两个迭代器it_ait_b, 且前者的优先级更改, 包装规则为:

  1. 二者key不同时, 按照key的比较规则进行遍历
  2. 二者key相同时, it_a优先级高, 先选择it_a的键值对为当前迭代器的键值对
  3. 当任意一个迭代器遍历完成时, 剩余迭代器则直接遍历
  4. 相同key只会遍历一次, 也就是说, 如果it_ait_b都遍历到同一个key, 那么it_a的键值对为当前迭代器的键值对, it_b则直接跳过该键值对。

4.2 TwoMergeIterator的实现

代码实现也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
TwoMergeIterator::TwoMergeIterator() {}

TwoMergeIterator::TwoMergeIterator(std::shared_ptr<BaseIterator> it_a,
std::shared_ptr<BaseIterator> it_b,
uint64_t max_tranc_id)
: it_a(std::move(it_a)), it_b(std::move(it_b)),
max_tranc_id_(max_tranc_id) {
// 先跳过不可见的事务
skip_by_tranc_id();
skip_it_b(); // 跳过与 it_a 重复的 key
choose_a = choose_it_a(); // 决定使用哪个迭代器
}

bool TwoMergeIterator::choose_it_a() {
if (it_a->is_end()) {
return false;
}
if (it_b->is_end()) {
return true;
}
return (**it_a).first < (**it_b).first; // 比较 key
}

void TwoMergeIterator::skip_it_b() {
if (!it_a->is_end() && !it_b->is_end() && (**it_a).first == (**it_b).first) {
++(*it_b);
}
}

void TwoMergeIterator::skip_by_tranc_id() {
if (max_tranc_id_ == 0) {
return;
}
while (it_a->get_tranc_id() > max_tranc_id_) {
++(*it_a);
}
while (it_b->get_tranc_id() > max_tranc_id_) {
++(*it_b);
}
}

BaseIterator &TwoMergeIterator::operator++() {
if (choose_a) {
++(*it_a);
} else {
++(*it_b);
}
// 先跳过不可见的事务
skip_by_tranc_id();
skip_it_b(); // 跳过重复的 key
choose_a = choose_it_a(); // 重新决定使用哪个迭代器
return *this;
}

这里贴出几个关键的函数, 其余函数实现都很简单直接, 就略了, 可以自行查看代码。

4.3 TwoMergeIterator的使用场景

这里首先就是顶层engine的谓词查询:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
// src/lsm/engine.cpp
std::optional<std::pair<TwoMergeIterator, TwoMergeIterator>>
LSMEngine::lsm_iters_monotony_predicate(
uint64_t tranc_id, std::function<int(const std::string &)> predicate) {

// 先从 memtable 中查询
auto mem_result = memtable.iters_monotony_predicate(tranc_id, predicate);

// 再从 sst 中查询
std::vector<SearchItem> item_vec;
for (auto &[sst_level, sst_ids] : level_sst_ids) {
for (auto &sst_id : sst_ids) {
auto sst = ssts[sst_id];
auto result = sst_iters_monotony_predicate(sst, tranc_id, predicate);
if (!result.has_value()) {
continue;
}
auto [it_begin, it_end] = result.value();
for (; it_begin != it_end && it_begin.is_valid(); ++it_begin) {
// l0中, 这里越古老的sst的idx越小, 我们需要让新的sst优先在堆顶
// 让新的sst(拥有更大的idx)排序在前面, 反转符号就行了
if (tranc_id != 0 && it_begin.get_tranc_id() > tranc_id) {
// 如果开启了事务, 比当前事务 id 更大的记录是不可见的
continue;
}
if (!item_vec.empty() && item_vec.back().key_ == it_begin.key()) {
// 如果key相同,则只保留最新的事务修改的记录即可
// 且这个记录既然已经存在于item_vec中,则其肯定满足了事务的可见性判断
continue;
}
item_vec.emplace_back(it_begin.key(), it_begin.value(), -sst_id,
sst_level, it_begin.get_tranc_id());
}
}
}

std::shared_ptr<HeapIterator> l0_iter_ptr =
std::make_shared<HeapIterator>(item_vec, tranc_id);

if (!mem_result.has_value() && item_vec.empty()) {
return std::nullopt;
}
if (mem_result.has_value()) {
auto [mem_start, mem_end] = mem_result.value();
std::shared_ptr<HeapIterator> mem_start_ptr =
std::make_shared<HeapIterator>();
*mem_start_ptr = mem_start;
auto start = TwoMergeIterator(mem_start_ptr, l0_iter_ptr, tranc_id);
auto end = TwoMergeIterator{};
return std::make_optional<std::pair<TwoMergeIterator, TwoMergeIterator>>(
start, end);
} else {
auto start = TwoMergeIterator(std::make_shared<HeapIterator>(), l0_iter_ptr,
tranc_id);
auto end = TwoMergeIterator{};
return std::make_optional<std::pair<TwoMergeIterator, TwoMergeIterator>>(
start, end);
}
}

谓词查询时, 查询的内容包括:

  1. memtable 中的内容
  2. sst 中的内容
    因此, LSMEngine::lsm_iters_monotony_predicate的执行逻辑是:
  3. 调用memtableiters_monotony_predicate函数, 获取memtable中满足谓词的迭代器
  4. 调用sstiters_monotony_predicate函数, 获取sst中满足谓词的迭代器
  5. memtablesst中的迭代器合并成一个TwoMergeIterator迭代器, 并返回

这里就可以看出, TwoMergeIterator的实现非常简单但功能又非常强大, 后续凡是涉及不同优先级迭代器的整合, 都可以复用这个TwoMergeIterator迭代器。

5 小节

本章将LSM Tree的各个组件补全了涉及事务id的操作, 下一章我们将继续完成engine部分对事物的id分配和管理工作