上一章我们对事物和MVCC
进行了简单的介绍,并简单介绍了接口的变化, 这一章我们逐层进行修改,以实现事务的语义。
这里部分内容和上一章有重复, 这里是安排有些冗余, 大家觉得重复略过就好
1 Block中的事务id 1.1 数据写入 这里首先回顾上一章中对编码数据格式的修改:
因此,我们第一步更改的是Block
的编码格式的代码,在Block
中添加一个trac_id
,这样当读取到Block
的某个键值对时,我们就可以知道这个键值对是哪个事务新增或修改的,能更好地帮我们实现不同事务、隔离级别的并发控制。
这里我们修改的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 bool Block::add_entry (const std::string &key, const std::string &value, uint64_t tranc_id, bool force_write) { if (!force_write && (cur_size () + key.size () + value.size () + 3 * sizeof (uint16_t ) + sizeof (uint64_t ) > capacity) && !offsets.empty ()) { return false ; } size_t entry_size = sizeof (uint16_t ) + key.size () + sizeof (uint16_t ) + value.size () + sizeof (uint64_t ); size_t old_size = data.size (); data.resize (old_size + entry_size); uint16_t key_len = key.size (); memcpy (data.data () + old_size, &key_len, sizeof (uint16_t )); memcpy (data.data () + old_size + sizeof (uint16_t ), key.data (), key_len); uint16_t value_len = value.size (); memcpy (data.data () + old_size + sizeof (uint16_t ) + key_len, &value_len, sizeof (uint16_t )); memcpy (data.data () + old_size + sizeof (uint16_t ) + key_len + sizeof (uint16_t ), value.data (), value_len); memcpy (data.data () + old_size + sizeof (uint16_t ) + key_len + sizeof (uint16_t ) + value_len, &tranc_id, sizeof (uint64_t )); offsets.push_back (old_size); return true ; }
这里做出的修改就是在写入value
后再写入了tranc_id
,且tranc_id
恒定为8个字节的整型数。在文件存储方面,也就只需要修改Block
的add_entry
函数,上层的sst
仅仅是调用Block
的add_entry
函数,而没有直接控制底层文件的写入,因此我们的修改方案也非常简单, 这得益于我们之前的设计模块较为分离.
1.2 数据读取 1.2.1 基础的事务id查询 同时, 在编码数据中添加了tranc_id
后, 我们也需要添加对应的读取函数, 这里的逻辑就是在指定偏移位置, 跳过key
和value
的长度, 然后读取tranc_id
即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 uint16_t Block::get_tranc_id_at (size_t offset) const { uint16_t key_len; memcpy (&key_len, data.data () + offset, sizeof (uint16_t )); size_t value_len_pos = offset + sizeof (uint16_t ) + key_len; uint16_t value_len; memcpy (&value_len, data.data () + value_len_pos, sizeof (uint16_t )); size_t tranc_id_pos = value_len_pos + sizeof (uint16_t ) + value_len; uint64_t tranc_id; memcpy (&tranc_id, data.data () + tranc_id_pos, sizeof (uint64_t )); return tranc_id; }
1.3.2 id的查询语义 当然, get_tranc_id_at
仅仅是查询指定位置的tranc_id
,但我们更多的时候是通过key
查询value
和tranc_id
. 这里二分查询的时候, 我们需要再参数中指定一个tranc_id
,然后根据这个tranc_id
来判断事务的可见性。这里的逻辑是:
每个事务都会被分配一个tranc_id
,这个tranc_id
是递增的,且从1开始。
事务查询时, 会结合具体的隔离级别, 根据tranc_id
来判断事务的可见性。我们如下设定:
如果事务查询时某个键值对的tranc_id
小于等于当前事务的tranc_id
, 表示这个键值对是更早创建的事务(或当前事务自己)修改的,因此这个键值对是可见的。
如果事务查询时某个键值对的tranc_id
大于当前事务的tranc_id
, 表示这个键值对是当前事务修改的,因此这个键值对是不可见的。
事务查询时也可以直接让存储引擎返回最新的键值对记录,这时将tranc_id
设置为0即可,表示不开启事务功能。
1.3.3 考虑事务语义的二分查询 在设计了事务id
的查询语义后,我们必须修改之前实现的二分查询的代码,以考虑事务的语义。因为在之前的设计中, 一个block
中相同的key
只会存在一次, 因此代码逻辑也很简单,但引入了事务id
后, 一个key
可能存在多个value
,这是因为多个事务可能同时修改同一个key
的值,而为了MVCC
的实现, 多个不同事务对相同key
的修改都需要进行记录, 因此我们需要再二分查询中考虑到重复的key
。
首先给出修改后的二分查询代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 std::optional<size_t > Block::get_idx_binary (const std::string &key, uint64_t tranc_id) { if (offsets.empty ()) { return std::nullopt ; } int left = 0 ; int right = offsets.size () - 1 ; while (left <= right) { int mid = left + (right - left) / 2 ; size_t mid_offset = offsets[mid]; int cmp = compare_key_at (mid_offset, key); if (cmp == 0 ) { auto new_mid = adjust_idx_by_tranc_id (mid, tranc_id); if (new_mid == -1 ) { return std::nullopt ; } return new_mid; } else if (cmp < 0 ) { left = mid + 1 ; } else { right = mid - 1 ; } } return std::nullopt ; }
这里首先通过二分查询找到指定key
的位置, 但需要注意的是, 这里找到的key
的位置并不一定是唯一的, 因为同一个key
可能存在多个value
, 因此我们需要在找到的位置上再进行判断, 判断事务的可见性并执行偏移, 这部分工作由adjust_idx_by_tranc_id
函数完成:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 int Block::adjust_idx_by_tranc_id (size_t idx, uint64_t tranc_id) { if (idx >= offsets.size ()) { return -1 ; } auto target_key = get_key_at (offsets[idx]); if (tranc_id != 0 ) { auto cur_tranc_id = get_tranc_id_at (offsets[idx]); if (cur_tranc_id <= tranc_id) { size_t prev_idx = idx; while (prev_idx > 0 && is_same_key (prev_idx - 1 , target_key)) { prev_idx--; auto new_tranc_id = get_tranc_id_at (offsets[prev_idx]); if (new_tranc_id > tranc_id) { return prev_idx + 1 ; } } return prev_idx; } else { size_t next_idx = idx + 1 ; while (next_idx < offsets.size () && is_same_key (next_idx, target_key)) { auto new_tranc_id = get_tranc_id_at (offsets[next_idx]); if (new_tranc_id <= tranc_id) { return next_idx; } next_idx++; } return -1 ; } } else { size_t prev_idx = idx; while (prev_idx > 0 && is_same_key (prev_idx - 1 , target_key)) { prev_idx--; } return prev_idx; } } bool Block::is_same_key (size_t idx, const std::string &target_key) const { if (idx >= offsets.size ()) { return false ; } return get_key_at (offsets[idx]) == target_key; }
这里需要注意的是, 这里的tranc_id
是按照从大到小的顺序排列的, 也就是说更大的tranc_id
对应的事务优先级更高, adjust_idx_by_tranc_id
的执行逻辑为:
如果开启了事务功能
使用当前位置的tranc_id
和目标tranc_id
进行比较, 如果当前位置的tranc_id
小于等于目标tranc_id
, 表示当前位置的记录是可见的, 因此需要向前查找更接近目标tranc_id
的记录
如果当前位置的tranc_id
大于目标tranc_id
, 表示当前位置的记录是不可见的, 因此需要向后查找
如果没有开启事务功能, 直接选择最大的事务id的记录返回
2 BlockIterator中的事务id 同样的, 引入事务id
后, 我们也需要修改BlockIterator
中的代码, 在使用迭代器查询或者遍历的时候, 需要跳过不可见的事务, 并且对多个key
(多个事务创建的, 有不同的tranc_id
) 只保留最接近当前tranc_id
的记录。
首先是迭代器构造函数需要跳过不可见或重复的记录:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 BlockIterator::BlockIterator (std::shared_ptr<Block> b, size_t index, uint64_t tranc_id) : block (b), current_index (index), tranc_id_ (tranc_id), cached_value (std::nullopt ) { skip_by_tranc_id (); } void BlockIterator::skip_by_tranc_id () { if (tranc_id_ == 0 ) { cached_value = std::nullopt ; return ; } while (current_index < block->offsets.size ()) { size_t offset = block->get_offset_at (current_index); auto tranc_id = block->get_tranc_id_at (offset); if (tranc_id <= tranc_id_) { break ; } ++current_index; } cached_value = std::nullopt ; }
自增运算符重载也需要进行类似的判断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 BlockIterator &BlockIterator::operator ++() { if (block && current_index < block->size ()) { auto prev_idx = current_index; auto prev_offset = block->get_offset_at (prev_idx); auto prev_entry = block->get_entry_at (prev_offset); ++current_index; while (block && current_index < block->size ()) { auto cur_offset = block->get_offset_at (current_index); auto cur_entry = block->get_entry_at (cur_offset); if (cur_entry.key != prev_entry.key) { break ; } ++current_index; } skip_by_tranc_id (); } return *this ; }
3 内存部分中的事务id 之前的内容知识对SST
中的Block
部分和BlockIterator
部分进行了修改, 使其包含了事务id
的信息。对于内存部分中的MemTable
和MemTableIterator
部分, 我们也需要做类似的修改, 否则在开启事务功能后, 会出现一些问题。
3.1 统一查询接口 我们之前的实现中,Skiplist::get
的定义是:
1 std::optional<std::string> get (const std::string &key) ;
这里返回的直接是value
的值, 但在引入了事务和MVCC后, 我们需要判断查询结果的tranc_id
是否小于等于当前tranc_id
, 如果小于等于, 表示该记录是可见的, 否则表示该记录是不可见的。而之前的get
返回值显然不能提供这么复杂的信息, 因此我们需要统一查询接口, 所有的查询组件都返回一个迭代器, 这个迭代器需要继承一个虚基类, 其纯虚函数能够支持所有的组件查询key
, value
, tranc_id
等关键数据。
因此我们实现让虚基类包含相应的纯虚函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 class BaseIterator {public : using value_type = std::pair<std::string, std::string>; using pointer = value_type *; using reference = value_type &; virtual BaseIterator &operator ++() = 0 ; virtual bool operator ==(const BaseIterator &other) const = 0 ; virtual bool operator !=(const BaseIterator &other) const = 0 ; virtual value_type operator *() const = 0 ; virtual IteratorType get_type () const = 0 ; virtual uint64_t get_tranc_id () const = 0 ; virtual bool is_end () const = 0 ; virtual bool is_valid () const = 0 ; }; class SstIterator ;struct SearchItem { std::string key_; std::string value_; uint64_t tranc_id_; int idx_; int level_; SearchItem () = default ; SearchItem (std::string k, std::string v, int i, int l, uint64_t tranc_id) : key_ (std::move (k)), value_ (std::move (v)), idx_ (i), level_ (l), tranc_id_ (tranc_id) {} }; bool operator <(const SearchItem &a, const SearchItem &b);bool operator >(const SearchItem &a, const SearchItem &b);bool operator ==(const SearchItem &a, const SearchItem &b);
这里所有的迭代器都需要继承这个基类, 并且需要实现相应的纯虚函数。同时, 这里的SearchItem
用于三元组key
, value
, tranc_id
的比较, 并且需要实现相应的比较运算符:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 bool operator <(const SearchItem &a, const SearchItem &b) { if (a.key_ != b.key_) { return a.key_ < b.key_; } if (a.tranc_id_ > b.tranc_id_) { return true ; } if (a.level_ < b.level_) { return true ; } return a.idx_ < b.idx_; } bool operator >(const SearchItem &a, const SearchItem &b) { if (a.key_ != b.key_) { return a.key_ > b.key_; } if (a.tranc_id_ < b.tranc_id_) { return true ; } if (a.level_ < b.level_) { return true ; } return a.idx_ > b.idx_; } bool operator ==(const SearchItem &a, const SearchItem &b) { return a.idx_ == b.idx_ && a.key_ == b.key_; }
这里的比较逻辑也是优先比较key
, key
相同时比较tranc_id
, tranc_id
相同时比较level
, level
相同时比较idx
。不过需要注意的是, tranc_id
更大表示优先级更高。同时这里的level
表示SST
的level
, 从0
开始, 0
表示level0
, 1
表示level1
, 2
表示level2
, 以此类推。idx
则表示内存中的MemTable
的比较依据(这里不熟悉可以复习下之前的文档)
然后, 所有的迭代器都继承这个基类BaseIterator
, 并且需要实现相应的纯虚函数, 这里的代码就不具体贴出来了, 都比较简单, 可以参考完整的代码仓库
3.2 Memtable接口的变化 这里将MemTable
的所有读写接口都进行修改, 一方面加入trabnc_id
这个参数, 另一方面统一迭代器作为返回的接口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 class MemTable { friend class TranContext ; friend class HeapIterator ; private : void put_ (const std::string &key, const std::string &value, uint64_t tranc_id) ; SkipListIterator get_ (const std::string &key, uint64_t tranc_id) ; SkipListIterator cur_get_ (const std::string &key, uint64_t tranc_id) ; SkipListIterator frozen_get_ (const std::string &key, uint64_t tranc_id) ; void remove_ (const std::string &key, uint64_t tranc_id) ; void frozen_cur_table_ () ; public : MemTable (); ~MemTable (); void put (const std::string &key, const std::string &value, uint64_t tranc_id) ; void put_batch (const std::vector<std::pair<std::string, std::string>> &kvs, uint64_t tranc_id) ; SkipListIterator get (const std::string &key, uint64_t tranc_id) ; void remove (const std::string &key, uint64_t tranc_id) ; void remove_batch (const std::vector<std::string> &keys, uint64_t tranc_id) ; void clear () ; std::shared_ptr<SST> flush_last (SSTBuilder &builder, std::string &sst_path, size_t sst_id, std::shared_ptr<BlockCache> block_cache) ; void frozen_cur_table () ; size_t get_cur_size () ; size_t get_frozen_size () ; size_t get_total_size () ; HeapIterator begin (uint64_t tranc_id) ; HeapIterator iters_preffix (const std::string &preffix, uint64_t tranc_id) ; std::optional<std::pair<HeapIterator, HeapIterator>> iters_monotony_predicate (uint64_t tranc_id, std::function<int (const std::string &)> predicate); HeapIterator end () ; private : std::shared_ptr<SkipList> current_table; std::list<std::shared_ptr<SkipList>> frozen_tables; size_t frozen_bytes; std::shared_mutex frozen_mtx; std::shared_mutex cur_mtx; };
4 SST中的事务id 之前我们已经在block
中添加了tranc_id
这个数据, 并(key
, value
)的二元组转化为了(key
, value
, tranc_id
)的三元组存储, 现在我们需要进一步修改整合block
的sst
4.1 新增元数据 为了方便在事务查询时, 更直接地判断当前sst
是否包含指定的事务id, 我们需要新增如下元数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 class SST : public std::enable_shared_from_this<SST> { friend class SSTBuilder ; friend std::optional<std::pair<SstIterator, SstIterator>> sst_iters_monotony_predicate ( std::shared_ptr<SST> sst, uint64_t tranc_id, std::function<int (const std::string &)> predicate); private : FileObj file; std::vector<BlockMeta> meta_entries; uint32_t bloom_offset; uint32_t meta_block_offset; size_t sst_id; std::string first_key; std::string last_key; std::shared_ptr<BloomFilter> bloom_filter; std::shared_ptr<BlockCache> block_cache; uint64_t min_tranc_id_ = UINT64_MAX; uint64_t max_tranc_id_ = 0 ; ... };
4.2 SST的构建 构建SST
的SSTBuilder
中, 需要记录min_tranc_id_
和max_tranc_id_
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void SSTBuilder::add (const std::string &key, const std::string &value, uint64_t tranc_id) { if (first_key.empty ()) { first_key = key; } if (bloom_filter != nullptr ) { bloom_filter->add (key); } max_tranc_id_ = std::max (max_tranc_id_, tranc_id); min_tranc_id_ = std::min (min_tranc_id_, tranc_id); bool force_write = key == last_key; if (block.add_entry (key, value, tranc_id, force_write)) { last_key = key; return ; } finish_block (); block.add_entry (key, value, tranc_id, false ); first_key = key; last_key = key; }
同时这里涉及到一个非常重要的易错点(我debug了好久), 当目前add
的键值对的key
相同时, 一定不能使其跨block
分布, 这是因为如果相同的key
跨block
分布, 那么在block
中二分查找时, 会出现定位错误的情况, 具体距离就是:
sst
构建时, (key=1, tranc_id=2)
被添加到了block5
, (key=1, tranc_id=1)
被添加到了block6
现在tranc_id=3
的事务查询key=1
, 在二分查询时定位到了block6
, 返回结果是(key=1, tranc_id=1)
的数据
实际的数据应该是更新的(key=1, tranc_id=2)
因此, 让相同的key
出现在同一个block
中, 才满足二分查询的流程
这里的做法是, 每次插入时, 与上一次操作的key
进行比较, 如果相同, 则将force_write
参数设置为true
, 确保key
相同时, 都会插入到同一个block
中
4.3 SSTIterator的修改 我们之前的实现中, sst
中数据的具体查询是交由SSTIterator
完成的, 因此我们需要修改SSTIterator
的实现, 确保在sst
中查询时, tranc_id
的判断正确:
首先是在头文件中添加数据成员:
1 2 3 4 5 class SstIterator : public BaseIterator { ... uint64_t max_tranc_id_; ... };
然后在具体的定位函数seek
中, 添加对事物可见性的判断:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 void SstIterator::seek (const std::string &key) { if (!m_sst) { m_block_it = nullptr ; return ; } try { m_block_idx = m_sst->find_block_idx (key); if (m_block_idx == -1 || m_block_idx >= m_sst->num_blocks ()) { m_block_it = nullptr ; m_block_idx = m_sst->num_blocks (); return ; } auto block = m_sst->read_block (m_block_idx); if (!block) { m_block_it = nullptr ; return ; } m_block_it = std::make_shared <BlockIterator>(block, key, max_tranc_id_); if (m_block_it->is_end ()) { m_block_idx = m_sst->num_blocks (); m_block_it = nullptr ; return ; } } catch (const std::exception &) { m_block_it = nullptr ; return ; } }
这里的关键是:
1 m_block_it = std::make_shared <BlockIterator>(block, key, max_tranc_id_);
通过max_tranc_id_
将事务可见性的相关操作转交给BlockIterator
实现
5 TwoMergeIterator 这里我们要实现一个新的迭代器TwoMergeIterator
, 它是两个迭代器(记为a和b)的合并。这两个迭代器都是BaseIterator
的子类, 具有优先级先后顺序。例如,a
的优先级更高, key
相同时需要先遍历a
,再遍历b
。
这里TwoMergeIterator
的核心设计思路就是对已有的迭代器进行封装, 按照一定优先级规则对2个迭代器进行自增操作和数据滤除操作。因为其需要复用已有的迭代器,所以我将这部分内容放到实现了BaseIterator
和基础的tranc_id
之后.
4.1 TwoMergeIterator的定义 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 class TwoMergeIterator : public BaseIterator {private : std::shared_ptr<BaseIterator> it_a; std::shared_ptr<BaseIterator> it_b; bool choose_a = false ; mutable std::shared_ptr<value_type> current; uint64_t max_tranc_id_ = 0 ; void update_current () const ; public : TwoMergeIterator (); TwoMergeIterator (std::shared_ptr<BaseIterator> it_a, std::shared_ptr<BaseIterator> it_b, uint64_t max_tranc_id); bool choose_it_a () ; void skip_by_tranc_id () ; void skip_it_b () ; virtual BaseIterator &operator ++() override ; virtual bool operator ==(const BaseIterator &other) const override ; virtual bool operator !=(const BaseIterator &other) const override ; virtual value_type operator *() const override ; virtual IteratorType get_type () const override ; virtual uint64_t get_tranc_id () const override ; virtual bool is_end () const override ; virtual bool is_valid () const override ; pointer operator ->() const ; };
这里的TwoMergeIterator
继承自BaseIterator
, 并且需要传入两个迭代器it_a
和it_b
, 且前者的优先级更改, 包装规则为:
二者key
不同时, 按照key
的比较规则进行遍历
二者key
相同时, it_a
优先级高, 先选择it_a
的键值对为当前迭代器的键值对
当任意一个迭代器遍历完成时, 剩余迭代器则直接遍历
相同key
只会遍历一次, 也就是说, 如果it_a
和it_b
都遍历到同一个key
, 那么it_a
的键值对为当前迭代器的键值对, it_b
则直接跳过该键值对。
4.2 TwoMergeIterator的实现 代码实现也很简单:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 TwoMergeIterator::TwoMergeIterator () {} TwoMergeIterator::TwoMergeIterator (std::shared_ptr<BaseIterator> it_a, std::shared_ptr<BaseIterator> it_b, uint64_t max_tranc_id) : it_a (std::move (it_a)), it_b (std::move (it_b)), max_tranc_id_ (max_tranc_id) { skip_by_tranc_id (); skip_it_b (); choose_a = choose_it_a (); } bool TwoMergeIterator::choose_it_a () { if (it_a->is_end ()) { return false ; } if (it_b->is_end ()) { return true ; } return (**it_a).first < (**it_b).first; } void TwoMergeIterator::skip_it_b () { if (!it_a->is_end () && !it_b->is_end () && (**it_a).first == (**it_b).first) { ++(*it_b); } } void TwoMergeIterator::skip_by_tranc_id () { if (max_tranc_id_ == 0 ) { return ; } while (it_a->get_tranc_id () > max_tranc_id_) { ++(*it_a); } while (it_b->get_tranc_id () > max_tranc_id_) { ++(*it_b); } } BaseIterator &TwoMergeIterator::operator ++() { if (choose_a) { ++(*it_a); } else { ++(*it_b); } skip_by_tranc_id (); skip_it_b (); choose_a = choose_it_a (); return *this ; }
这里贴出几个关键的函数, 其余函数实现都很简单直接, 就略了, 可以自行查看代码。
4.3 TwoMergeIterator的使用场景 这里首先就是顶层engine
的谓词查询:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 std::optional<std::pair<TwoMergeIterator, TwoMergeIterator>> LSMEngine::lsm_iters_monotony_predicate ( uint64_t tranc_id, std::function<int (const std::string &)> predicate) { auto mem_result = memtable.iters_monotony_predicate (tranc_id, predicate); std::vector<SearchItem> item_vec; for (auto &[sst_level, sst_ids] : level_sst_ids) { for (auto &sst_id : sst_ids) { auto sst = ssts[sst_id]; auto result = sst_iters_monotony_predicate (sst, tranc_id, predicate); if (!result.has_value ()) { continue ; } auto [it_begin, it_end] = result.value (); for (; it_begin != it_end && it_begin.is_valid (); ++it_begin) { if (tranc_id != 0 && it_begin.get_tranc_id () > tranc_id) { continue ; } if (!item_vec.empty () && item_vec.back ().key_ == it_begin.key ()) { continue ; } item_vec.emplace_back (it_begin.key (), it_begin.value (), -sst_id, sst_level, it_begin.get_tranc_id ()); } } } std::shared_ptr<HeapIterator> l0_iter_ptr = std::make_shared <HeapIterator>(item_vec, tranc_id); if (!mem_result.has_value () && item_vec.empty ()) { return std::nullopt ; } if (mem_result.has_value ()) { auto [mem_start, mem_end] = mem_result.value (); std::shared_ptr<HeapIterator> mem_start_ptr = std::make_shared <HeapIterator>(); *mem_start_ptr = mem_start; auto start = TwoMergeIterator (mem_start_ptr, l0_iter_ptr, tranc_id); auto end = TwoMergeIterator{}; return std::make_optional<std::pair<TwoMergeIterator, TwoMergeIterator>>( start, end); } else { auto start = TwoMergeIterator (std::make_shared <HeapIterator>(), l0_iter_ptr, tranc_id); auto end = TwoMergeIterator{}; return std::make_optional<std::pair<TwoMergeIterator, TwoMergeIterator>>( start, end); } }
谓词查询时, 查询的内容包括:
memtable
中的内容
sst
中的内容 因此, LSMEngine::lsm_iters_monotony_predicate
的执行逻辑是:
调用memtable
的iters_monotony_predicate
函数, 获取memtable
中满足谓词的迭代器
调用sst
的iters_monotony_predicate
函数, 获取sst
中满足谓词的迭代器
将memtable
和sst
中的迭代器合并成一个TwoMergeIterator
迭代器, 并返回
这里就可以看出, TwoMergeIterator
的实现非常简单但功能又非常强大, 后续凡是涉及不同优先级迭代器的整合, 都可以复用这个TwoMergeIterator
迭代器。
5 小节 本章将LSM Tree
的各个组件补全了涉及事务id
的操作, 下一章我们将继续完成engine
部分对事物的id
分配和管理工作