C++从零开始实现LSM-Tree-KV存储-15-事务与MVCC设计1

目前我们已经得到了一个能跑的引擎, 但目前只有最基本的API, 现在我们来实现事务特性。

代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP

欢迎点个Star

1 KV存储中的事务

1.1 事务介绍

在数据库系统中,事务(Transaction)是指一组原子性操作的执行单元,需要满足ACID特性。这里简单介绍下ACID的基本性质:

特性 定义 技术实现 KV存储中的特殊表现
原子性
(Atomicity)
事务内的操作要么全部成功,要么全部失败,不存在中间状态。 - WAL(Write-Ahead Logging)
- 两阶段提交(2PC)
- 操作回滚日志
- 单键操作天然原子
- 跨键原子性需显式事务(如批量提交)
- LSM-Tree依赖WAL保证崩溃恢复
一致性
(Consistency)
事务执行后数据库必须保持预设的业务规则(如唯一性、完整性约束)。 - 关系型:外键、触发器
- 声明式约束(DDL)
- 事务回滚机制
- 业务逻辑一致性需应用层实现
隔离性
(Isolation)
并发事务相互隔离,避免脏读、不可重复读、幻读。 - 锁机制(悲观锁)
- MVCC(多版本并发控制)
- 快照隔离(Snapshot Isolation)
- 通常采用乐观锁(版本号校验)
- 全局时间戳实现快照隔离
- 弱隔离级别常见(如Read Committed)
持久性
(Durability)
事务提交后,数据永久存储,即使系统崩溃也不丢失。 - 同步刷盘(fsync
- 副本复制(Replication)
- 冗余存储(如RAID)
- LSM-Tree依赖SSTable落盘
- 内存数据需通过WAL持久化
- 异步刷盘可能牺牲部分持久性(如Redis AOF)

在这四个基本性质中, KV存储由于数据较为简单, 不存在类似关系型数据库中的外键、触发器、声明式约束等复杂业务规则, 因此一致性是比较容易实现的,基本上你实现了一致性、隔离性,一致性就自然满足了,这里我们探讨下其他几个形状在KV存储中的实现逻辑:

  1. 原子性:通过批量化操作即可, 可以将一个事务的操作先暂存起来, 在提交时统一应用到存储引擎的状态机中。
  2. 隔离性:这里涉及到隔离级别, 会在后面统一介绍。
  3. 持久性LSM-Tree的持久化依赖SSTable落盘,但我们插入一些数据后, 这些数据肯定优先存在于内存中, 在内存容量达到阈值后才会刷盘, 为保证持久性, 可以用以下两个方案
    1. 事务提交后强制刷盘到SST, 但这样可能导致刷入的L0 SST大小过小, 加大了L0 SST合并时的计算量(后续章节介绍compact)
    2. 事务提交时先将操作写入WAL(Write-Ahead Logging), 同时存储引擎维护刷入SST的最大事务序号, 在重启或崩溃恢复时根据WAL重放操作(后续会介绍)

1.2 隔离级别

1.2.1 隔离级别的定义与分类

在并发事务场景下,隔离性通过不同的隔离级别实现不同程度的可见性控制。我们先回顾下关系型数据库中的隔离级别:

这里如果对这些隔离级别不熟悉, 建议先学下数据库的课程

隔离级别 脏读(Dirty Read) 不可重复读(Non-Repeatable Read) 幻读(Phantom Read) 典型实现方案
读未提交 允许 允许 允许 - 无版本控制
- 直接读取内存最新值
读已提交 禁止 允许 允许 - 单版本快照
- 每次读获取最新提交版本
可重复读 禁止 禁止 允许 - 多版本快照
- 事务级版本锚定(如MySQL InnoDB的MVCC)
可串行化 禁止 禁止 禁止 - 严格锁机制
- 冲突范围检测(如FoundationDB)

注:快照隔离下幻读仍可能发生,但可通过追加范围锁避免

1.2.2 KV存储中事务冲突和隔离级别的案例

在LSM-Tree结构的KV存储中,我们举一个例子说明隔离级别和实物冲突的联系:

在下面的场景时序图, 事务按照创建的时间从小大大分配ID, 在这个

Timeline 事务A (ID=100) 事务B (ID=101) Key1状态
T0 Begin Version=0 (初始值)
T1 Read Key1 (Version=0) Begin
T2 Modify Key1→ValueA Read Key1 (Version=0)
T3 Modify Key1→ValueB
T4 Commit ✅ (Version=1) Version=1 (ValueB)
T5 Attempt Commit ❌ 检测到Key1已更新

在这个例子中, 更早创建的事务A和和更晚创建的事务B在读写了同一个Key1, 事务B先提交, 并且更新了Key1的值为ValueB, 事务A在提交后, 数据库中面临这样的问题: 以哪一个版本的修改为准?

以上问题就是一种事务冲突,在KV存储中,事务冲突的解决方式直接取决于隔离级别的设计。以下是不同隔离级别的处理差异:

隔离级别 事务A提交结果 原因 最终Key1值
读未提交 ✅ 成功 允许覆盖未提交数据,但导致事务B的修改丢失(违反原子性) ValueA
读已提交 ❌ 中止 检测到Key1已提交新版本(Version=1 > 事务A的起始版本) ValueB
可重复读 ❌ 中止 事务A的读快照锁定Version=0,但写冲突仍存在 ValueB
可串行化 ❌ 中止 通过范围锁阻止事务B写入,或强制事务串行执行 ValueB

以上是put时是事务冲突, 基于这个案例, 同时有2个事务再去读取key1:

假设初始时刻 Key1 = Value0

Timeline 事务A (ID=100) 事务B (ID=101) 事务C (ID=102) 事务D (ID=103)
T0 Begin Key1 = Value0 (初始值) Key1 = Value0 (初始值)
T1 Read Key1 Begin
T2 Modify Key1→ValueA Begin
T3 Read Key1 (Version=❓)
T4 Read Key1
T5 Modify Key1→ValueB
T6 Attempt Commit ❓
T7 Begin
T8 Read Key1 (Key1=❓) Read Key1 (Key1=❓)
T9 Attempt Commit ❓
T10 Read Key1 (Key1=❓) Read Key1 (Key1=❓)

这个场景下, 在事务A和B操作时, 另外还存在2个事务C和D在读取key1的值, 事务C和D的读取结果是仍然取决于隔离级别:

1. 读未提交(Read Uncommitted)

允许读取未提交的中间值(包括可能回滚的脏数据)。

事务 T3(C读取) T8(C/D读取) T10(C/D读取) 事务提交结果 结果解释
A - - - ✅ 成功 不检测写冲突,直接覆盖事务B的提交(Key1=ValueA)
B - - - ✅ 成功 事务B先提交(Key1=ValueB),但被事务A覆盖
C ValueA ValueB ValueA - T3读取事务A未提交的ValueA;T8时事务B已经提交,读ValueB;T10事务A提交后读ValueA
D - ValueB ValueA - T8时事务A未提交,读ValueB;T10事务A提交后读ValueA

最终Key1值:ValueA(事务A覆盖事务B)
风险:事务B的合法提交被覆盖,数据一致性被破坏。


2. 读已提交(Read Committed)

仅读取已提交的数据,但同一事务内多次读取结果可能不同。

事务 T3(C读取) T8(C/D读取) T10(C/D读取) 事务提交结果 结果解释
A - - - ✅ 成功
or
❌中止
只确保读取时的数据是提交的, 但不确保提交时没有冲突, 取决于具体的实现
B - - - ✅ 成功 事务B提交成功(Key1=ValueB)
C Value0 ValueB ValueA - T3时事务A/B均未提交,读Value0;T8时事务B已提交,读ValueB, 最后T10时事务A提交,读ValueA
D - ValueB ValueA - T8时事务B已提交,读ValueB, 最后T10时事务A提交,读ValueA

最终Key1值:ValueA(事务A覆盖事务B)
风险:事务B的合法提交被覆盖,数据一致性被破坏。
相较于前者的优化: 读取的数据一定是已经提交的数据


3. 可重复读(Repeatable Read)

基于首次读取时的值锚定,保证多次读取结果一致。

事务 T3(C读取) T8(C/D读取) T10(C/D读取) 事务提交结果 结果解释
A - - - ❌ 中止 提交时检测到Key1已被事务B修改
B - - - ✅ 成功 事务B提交成功(Key1=ValueB)
C Value0 Value0 Value0 - 事务C首次读取锚定Value0,后续读取强制复用
D - ValueB ValueB - 事务D首次读取时事务B已提交,锚定ValueB

最终Key1值:ValueB
实现难点:需在内存中维护事务首次读取的键值锚定表,防止Compaction清理旧版本。


5. 可串行化(Serializable)

通过锁机制强制事务串行执行,完全禁止并发冲突。

事务 T3(C读取) T8(C/D读取) T10(C/D读取) 事务提交结果 结果解释
A - - - ❌ 中止 事务C持有Key1的共享锁,事务A尝试获取排他锁时被阻塞,最终超时中止
B - - - ✅ 成功 事务B在事务C释放锁后获取排他锁并提交
C Value0 Value0 Value0 - 事务C持有共享锁,保证读取一致性
D - ValueB ValueB - 事务D在事务B提交后读取ValueB

最终Key1值:ValueB
锁竞争时序:事务C的共享锁阻塞事务A/B,事务B在事务C释放锁后提交。


通过这个案例, 我们可以复习下MVCC在读已提交和可重复读的区别:

  1. 读已提交: 读取时,快照基于当前这一次操作的时间
  2. 可重复读: 读取时,快照基于事务创建的时间

3 本KV引擎的MVCC和事务设计

3.1 添加的元数据

3.1.2 架构设计

通过事前的MVCC运行机制的分析可知, 在可重复读级别下, 我们需要记录这个事务开始的时间, 其实只需要记录这个事务的id就可以了, 因为我们可以设计一个事务管理器, 使得事务开始的时间严格按照事务id排序.

另一方面, 如果是读已提交, 我们需要按理说需要记录每个读操作的时间, 但是实际上, 我们的KV存储引擎是追加写入的, 我们本身读取的相同key就是按照最近时间读取的, 不需要额外记录本次操作的时间戳。换句话说,假设你遍历整个存储引擎的键值对, 其都是按照插入时间从近到久排序的。

故我们首先需要再键值对中记录事务id:

trac_id

同时, 我们需要在每个SST中记录事务id的范围:

MemTable-SST

3.1.2 代码修改

编码代码修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// src/block/block.cpp
bool Block::add_entry(const std::string &key, const std::string &value,
uint64_t tranc_id, bool force_write) {
if (!force_write &&
(cur_size() + key.size() + value.size() + 3 * sizeof(uint16_t) +
sizeof(uint64_t) >
capacity) &&
!offsets.empty()) {
return false;
}
// 计算entry大小:key长度(2B) + key + value长度(2B) + value
size_t entry_size = sizeof(uint16_t) + key.size() + sizeof(uint16_t) +
value.size() + sizeof(uint64_t);
size_t old_size = data.size();
data.resize(old_size + entry_size);

// 写入key长度
uint16_t key_len = key.size();
memcpy(data.data() + old_size, &key_len, sizeof(uint16_t));

// 写入key
memcpy(data.data() + old_size + sizeof(uint16_t), key.data(), key_len);

// 写入value长度
uint16_t value_len = value.size();
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len, &value_len,
sizeof(uint16_t));

// 写入value
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len + sizeof(uint16_t),
value.data(), value_len);

// 写入事务id
memcpy(data.data() + old_size + sizeof(uint16_t) + key_len +
sizeof(uint16_t) + value_len,
&tranc_id, sizeof(uint64_t));

// 记录偏移
offsets.push_back(old_size);
return true;
}

我们每次添加一个键值对时, 都需要记录事务id

迭代器增加事务id的记录, 并且比较逻辑是, 事务id越大, 读到的值就是最新的, 排序优先级更高:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// include/skiplist/skiplist.h
struct SkipListNode {
std::string key_; // 节点存储的键
std::string value_; // 节点存储的值
uint64_t tranc_id_; // 事务 id
std::vector<std::shared_ptr<SkipListNode>>
forward_; // 指向不同层级的下一个节点的指针数组
std::vector<std::weak_ptr<SkipListNode>>
backward_; // 指向不同层级的下一个节点的指针数组
SkipListNode(const std::string &k, const std::string &v, int level,
uint64_t tranc_id)
: key_(k), value_(v), forward_(level, nullptr),
backward_(level, std::weak_ptr<SkipListNode>()), tranc_id_(tranc_id) {}
void set_backward(int level, std::shared_ptr<SkipListNode> node) {
backward_[level] = std::weak_ptr<SkipListNode>(node);
}

bool operator==(const SkipListNode &other) const {
return key_ == other.key_ && value_ == other.value_ &&
tranc_id_ == other.tranc_id_;
}

bool operator!=(const SkipListNode &other) const { return !(*this == other); }

bool operator<(const SkipListNode &other) const {
if (key_ == other.key_) {
// key 相等时,trans_id 更大的优先级更高
return tranc_id_ > other.tranc_id_;
}
return key_ < other.key_;
}
bool operator>(const SkipListNode &other) const {
if (key_ == other.key_) {
// key 相等时,trans_id 更大的优先级更高
return tranc_id_ < other.tranc_id_;
}
return key_ > other.key_;
}
};

skiplistput时, 需要先定位到插入位置, 由于此时skiplist可能存储多个相同的key(事务id不同), 故查询的逻辑发生了变化, 需要同时整个keytranc_id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// src/skiplist/skipList.cpp
void SkipList::put(const std::string &key, const std::string &value,
uint64_t tranc_id) {
...

// 从最高层开始查找插入位置
// 这里需要解引用, 用之前重载的`SkiplistNode`的比较函数进行排序, 而不是仅仅比较`key`
for (int i = current_level - 1; i >= 0; --i) {
while (current->forward_[i] && *current->forward_[i] < *new_node) {
current = current->forward_[i];
}
update[i] = current;
}

...
}

查询时的逻辑也会发生变化, 和put是类似的, 这里不进一步展示了

3.2 接口的变化

现在我们存储的数据从二元组(key, value)变成了(key, value, tranc_id), 我们需要修改接口, 使得putget等接口的数据类型也发生变化:

迭代器定义更新, 使其包含事务id, 这里我们定义一个虚基类, 包含get_tranc_id()返回事务id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
class BaseIterator {
public:
using value_type = std::pair<std::string, std::string>;
using pointer = value_type *;
using reference = value_type &;

virtual BaseIterator &operator++() = 0;
virtual bool operator==(const BaseIterator &other) const = 0;
virtual bool operator!=(const BaseIterator &other) const = 0;
virtual value_type operator*() const = 0;
virtual IteratorType get_type() const = 0;
virtual uint64_t get_tranc_id() const = 0;
virtual bool is_end() const = 0;
virtual bool is_valid() const = 0;
};

其余getapi返回的都是这个基类迭代器的继承类, 例如Skiplistget定义变成了:

1
SkipListIterator SkipList::get(const std::string &key, uint64_t tranc_id);

而不是之前的返回一个std::pair<std::string, std::string>

3.3 查询的变化

查询时, 我们需要指定一个事务id, 通过id判断如何启用mvcc机制, 目前我们实现的隔离级别只有(读未提交, 读已提交, 可重复读)

  1. 若为0表示我们的事务隔离级别是读未提交读已提交(因为直接查询最新的记录即可, 不需要判断失误id是否合法)
  2. 若指定的事务id, 并指定了事务的隔离级别为可重复读, 则需要判断id是否合法, 若不合法, 则返回nullptr

本项目只在commit后才将事务更改的键值对加入数据库, 否则只会暂存, 因此读已提交不需要判断事务id

skiplist::get
这里我们先展示体层组件Skiplist::get的变化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
SkipListIterator SkipList::get(const std::string &key, uint64_t tranc_id) {
// std::shared_lock<std::shared_mutex> slock(rw_mutex);

auto current = head;
// 从最高层开始查找
for (int i = current_level - 1; i >= 0; --i) {
while (current->forward_[i] && current->forward_[i]->key_ < key) {
current = current->forward_[i];
}
}
// 移动到最底层
current = current->forward_[0];
if (tranc_id == 0) {
// 如果没有开启可重复读,直接比较key即可
// 如果找到匹配的key,返回value
if (current && current->key_ == key) {
return SkipListIterator{current};
}
} else {
while (current && current->key_ == key) {
// 如果开启了可重复读事务,只返回小于等于事务id的值
if (tranc_id != 0) {
if (current->tranc_id_ <= tranc_id) {
// 满足事务可见性
return SkipListIterator{current};

} else {
// 否则跳过
current = current->forward_[0];
}
} else {
// 没有开启事务
return SkipListIterator{current};
}
}
}
// 未找到返回空
return SkipListIterator{};
}

变化只有一个, 如果开启了可重复读, 则需要判断tranc_id是否小于等于当前节点的tranc_id, 若是, 则返回当前节点, 否则跳过, 直到找到满足条件的节点或遍历完链表节点

memtable组件的get是对skiplistget的封装, 改动类似, 这里就不展示了, 有兴趣去看仓库源码

sst::get
sst查询时返回的是迭代器SStIterator, 其会根据key进行定位, 现在其定位的依据需要额外添加事务id:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// src/sst/sst_iterator.cpp
void SstIterator::seek(const std::string &key) {
if (!m_sst) {
m_block_it = nullptr;
return;
}

try {
m_block_idx = m_sst->find_block_idx(key);
if (m_block_idx == -1 || m_block_idx >= m_sst->num_blocks()) {
// 置为 end
// TODO: 这个边界情况需要添加单元测试
m_block_it = nullptr;
m_block_idx = m_sst->num_blocks();
return;
}
auto block = m_sst->read_block(m_block_idx);
if (!block) {
m_block_it = nullptr;
return;
}
m_block_it = std::make_shared<BlockIterator>(block, key, max_tranc_id_);
if (m_block_it->is_end()) {
// block 中找不到
m_block_idx = m_sst->num_blocks();
m_block_it = nullptr;
return;
}
} catch (const std::exception &) {
m_block_it = nullptr;
return;
}
}

其返回的迭代器是对BlockIterator的封装, 因此需要递归地修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
BlockIterator::BlockIterator(std::shared_ptr<Block> b, const std::string &key,
uint64_t tranc_id)
: block(b), tranc_id_(tranc_id), cached_value(std::nullopt) {
auto key_idx_ops = block->get_idx_binary(key, tranc_id);
if (key_idx_ops.has_value()) {
current_index = key_idx_ops.value();
} else {
current_index = block->offsets.size();
}
}

std::optional<size_t> Block::get_idx_binary(const std::string &key,
uint64_t tranc_id) {
if (offsets.empty()) {
return std::nullopt;
}
// 二分查找
int left = 0;
int right = offsets.size() - 1;

while (left <= right) {
int mid = left + (right - left) / 2;
size_t mid_offset = offsets[mid];

int cmp = compare_key_at(mid_offset, key);

if (cmp == 0) {
// 找到key,返回对应的value
// 还需要判断事务id可见性
auto new_mid = adjust_idx_by_tranc_id(mid, tranc_id);
if (new_mid == -1) {
return std::nullopt;
}
return new_mid;
} else if (cmp < 0) {
// 中间的key小于目标key,查找右半部分
left = mid + 1;
} else {
// 中间的key大于目标key,查找左半部分
right = mid - 1;
}
}

return std::nullopt;
}

这里的get_idx_binary就是核心的查询逻辑了, 这里在找到key相同时, 还通过adjust_idx_by_tranc_id调教位置, 保证key对应的事务id是满足当前查询的隔离性的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// 相同的key连续分布, 且相同的key的事务id从大到小排布
// 这里的逻辑是找到最接近 tranc_id 的键值对的索引位置
int Block::adjust_idx_by_tranc_id(size_t idx, uint64_t tranc_id) {
if (idx >= offsets.size()) {
return -1; // 索引超出范围
}

auto target_key = get_key_at(offsets[idx]);

if (tranc_id != 0) {
auto cur_tranc_id = get_tranc_id_at(offsets[idx]);

if (cur_tranc_id <= tranc_id) {
// 当前记录可见,向前查找更接近的目标
size_t prev_idx = idx;
while (prev_idx > 0 && is_same_key(prev_idx - 1, target_key)) {
prev_idx--;
auto new_tranc_id = get_tranc_id_at(offsets[prev_idx]);
if (new_tranc_id > tranc_id) {
return prev_idx + 1; // 更新的记录不可见
}
}
return prev_idx;
} else {
// 当前记录不可见,向后查找
size_t next_idx = idx + 1;
while (next_idx < offsets.size() && is_same_key(next_idx, target_key)) {
auto new_tranc_id = get_tranc_id_at(offsets[next_idx]);
if (new_tranc_id <= tranc_id) {
return next_idx; // 找到可见记录
}
next_idx++;
}
return -1; // 没有找到满足条件的记录
}
} else {
// 没有开启事务的话, 直接选择最大的事务id的记录返回
size_t prev_idx = idx;
while (prev_idx > 0 && is_same_key(prev_idx - 1, target_key)) {
prev_idx--;
}
return prev_idx;
}
}

4 小节

本章我们在数据编码时引入了事务id, 并且在skiplistmemtablesst等组件的查询中中添加了事务id的判断, 使得查询可以满足事务隔离性, 但目前我们还不知道这些事务id是从哪里生成的, 我们在下一章中会引入事务管理器, 来管理事务id的生成。同时处理单词不开启事务查询时逻辑的兼容性。

需要注意的是,在添加事务id后, 我们之前的封装迭代器, 例如HeapIterator也需要包含事务的隔离性判断, 这将在下一章中实现。