C++从零开始实现LSM-Tree-KV存储-17-事务与MVCC设计3

之前我们已经对各个组件的接口进行了统一, 添加了tranc_id这个事务id参数, 接下来这个章节, 我们将介绍顶层的事务设计, 即事务id是如何生成的, 实现相关的事务管理器.

代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP 感谢您的 Star!

欢迎支持本项目同步更新的从零开始实现的视频教程:https://avo6166ew2u.feishu.cn/docx/LXmVdezdsoTBRaxC97WcHwGunOc

欢迎加入讨论群:Toni-LSM项目交流群

1 事务的设计思想

我们先给出一个完成后的demo, 演示我们的事务设计是如何工作的, 代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
#include "../include/lsm/engine.h"
#include <iostream>
#include <string>

int main() {
// create lsm instance, data_dir is the directory to store data
LSM lsm("example_data");

// put data
lsm.put("key1", "value1");

// Query data
auto value1 = lsm.get("key1");
std::cout << "key1: " << value1.value() << std::endl;


// transaction
auto tranc_hanlder = lsm.begin_tran();
tranc_hanlder->put("xxx", "yyy");
tranc_hanlder->put("yyy", "xxx");
tranc_hanlder->commit();

auto res = lsm.get("xxx");
std::cout << "xxx: " << res.value() << std::endl;

lsm.clear();

return 0;
}

这里我们可以通过一个begin_tran函数获取一个事务的处理句柄, 然后通过这个句柄进行增删改查操作, 最后通过commitabort函数完成提交事务或终结事务的流程.

现在我们要完成的就是接受begin_tran的事务管理器. 这里有一些设计问题我们需要提前明确:

  1. begin_tran获取的事务句柄肯定会分配一个事务id, 那么没有开启事务的put/get/remove操作的事务id是什么呢?
  2. begin_tran进行增删改查的操作如何保证不同事务的隔离性?

首先回答第一个问题, 我们可以使用一个全局的atomic变量来作为事务id, 这个变量在每次调用begin_tranput/get/remove时自增, 这样就可以保证每个事务或单次操作都有一个唯一的tranc_id(这里的tranc_id和事务id是同义词). 换句话说, 普通的put/get/remove就是操作数量为1的简单事务。

然后是第二个问题,这实际上取决于我们的事务隔离级别:

  1. Read Uncommitted: 允许读取未提交的数据, 也就是脏读. 这种情况下, 我们可以将事务的句柄(案例代码中的tranc_hanlder)进行增删改查的数据直接写入到memtable中, 这样就可以让其他的事务可以从memtable中读取到未提交的数据, 速度肯定很快. 但是这里有一个场景需要尤其注意, 就是我们事务rokkback(或者是abort)时, 必须撤销已经写入到memtable的数据, 因此这里需要我们记录事务的操作记录和以前的历史记录, 然后在abort时, 将memtable中的数据进行回滚.
  2. Read Committed: 允许读取已提交的数据, 也就是不可脏读. 这种情况下, 我们可以将事务的句柄(案例代码中的tranc_hanlder)进行增删改查的数据暂存到句柄的上下文, 因此其他事务从memtable中是查不到这个事务未提交的数据的, 但这个事务自身查询时可以从自己的上下文中读取到未提交的数据.
  3. Repeatable Read: 在Read Committed的基础上解决了不可重复读的问题, 也就是在同一个事务中, 多次读取同一数据的结果是一样的. 这种情况下, 我们可以将每次get的数据同样暂存到句柄的上下文, 后续查询相同的key时, 从上下文中读取到相同的数据.
  4. Serializable: 这个这个事务隔离级别我们在关系型数据库中是进一步解决幻读现象的, 例如: 在Repeatable Read隔离级别下,事务A读取了年龄>30的员工,得到10条记录。此时事务B插入了一个年龄31的新员工并提交。事务A再次读取同样的条件,可能会看到11条记录(幻读)。但在Serializable隔离级别下,事务B的插入会被阻塞或者事务A的两次读取结果保持一致,避免幻读。在我们的KV数据库中, 我们只需要保证事务提交时进程冲突检查、且按照事务id的顺序依次提交即可(虽然这样性能很低)

如果这里看不懂的话, 可以继续往下看, 随着代码的实现应该能理解, 再理解不了的话, 建议先复习或学习数据库中MVCC、事务、并发控制的基础知识

2 事务管理器的实现

2.1 组件关系设计

还记得我们之前实现的LSmLSMEngine吗? 当时我们将LSMEngine包裹在LSM中, LSMEngine中封装了memtable, sst等组件, 我们却进一步将其封装在LSM中, 这样的目的就是在后续中加入其他同级别的组件, 例如本章的事务管理器, 偷我二年级的定义为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// include/lsm/engine.h
class LSM {
private:
std::shared_ptr<LSMEngine> engine;
std::shared_ptr<TranManager> tran_manager_; // new

public:
LSM(std::string path);
~LSM();

std::optional<std::string> get(const std::string &key);

void put(const std::string &key, const std::string &value);
void put_batch(const std::vector<std::pair<std::string, std::string>> &kvs);

void remove(const std::string &key);
void remove_batch(const std::vector<std::string> &keys);

using LSMIterator = TwoMergeIterator; // TODO: 实现 compact后需要修改
LSMIterator begin(uint64_t tranc_id); // TODO: 实现 compact后需要修改
LSMIterator end();
std::optional<std::pair<TwoMergeIterator, TwoMergeIterator>>
lsm_iters_monotony_predicate(
uint64_t tranc_id, std::function<int(const std::string &)> predicate);
void clear();
void flush();
void flush_all();

// 开启一个事务
std::shared_ptr<TranContext> begin_tran(); // new
};

这里的数据成员中, 我们添加了tran_manager_, 其将作为分配事务idbegin_tran申请TranContext的管理器

2.2 事务管理器的定义

2.2.1 TranContext的定义

这里首先回顾之前的定义:

1
2
std::shared_ptr<TranManager> tran_manager_; // new
std::shared_ptr<TranContext> begin_tran(); // new

这里的TranContext是向TranManager申请事务上下文后返回的类, 这个上下文中, 我们需要实现基本的事务管理功能, 包括增删改查、提交、回滚,我们初步将其头文件定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// include/lsm/transaction.h
class LSMEngine;
class TranManager;

class TranContext {
friend class TranManager;

public:
TranContext(uint64_t tranc_id, std::shared_ptr<LSMEngine> engine,
std::shared_ptr<TranManager> tranManager);
void put(const std::string &key, const std::string &value);
void remove(const std::string &key);
std::optional<std::string> get(const std::string &key);

// ! test_fail = true 是测试中手动触发的崩溃
bool commit(bool test_fail = false);
bool abort();

std::shared_ptr<LSMEngine> engine_;
std::shared_ptr<TranManager> tranManager_;
uint64_t tranc_id_;
std::vector<Record> operations;
std::unordered_map<std::string, std::string> temp_map_;
bool isCommited = false;
bool isAborted = false;

private:
std::unordered_map<std::string,
std::optional<std::pair<std::string, uint64_t>>>
read_map_;
std::unordered_map<std::string,
std::optional<std::pair<std::string, uint64_t>>>
rollback_map_;
};

需要注意的是, 这里的设计考虑了不同的事务隔离级别, 例如:

  1. 当我们使用Repeatable Read隔离级别时, 我们需要将每次get的数据暂存到句柄的上下文中, 后续查询相同的key时, 从上下文中读取到相同的数据. 这个暂存的数据就放在read_map_
  2. 当我们使用非Read Uncommitted隔离级别时, 我们需要将每次put的数据暂存到句柄的上下文中, 后续查询相同的key时, 从上下文中读取到相同的数据. 这个暂存的数据就放在temp_map_
  3. 当我们使用Read Uncommitted隔离级别时, 我们每次put的数据是直接应用到整个存储引擎中的, 因此这里我们存在一个存储引擎的指针std::shared_ptr<TranManager> tranManager_, 同时我们需要记录操作前的数据库状态以便于回滚, 因此其存放于rollback_map_

2.2.2 TranManager的定义

TranManager的作用包括:

  1. 管理事务的id的生成
  2. 管理事务上下文TranContext的申请
  3. 负责记录当前事务完成状态:
    1. max_flushed_tranc_id_: 最大的已经刷入sst中的事务
    2. max_finished_tranc_id_: 最大的已经完成的事务(但数据可能还存在于内存中)
    3. nextTransactionId_: 下一个分配事务的id
  4. 负责事务状态的持久化操作

这里特别进行说明, 为什么要负责是事务的持久化操作, 因为我们后续会实现WAL(Write Ahead Log), 这个日志会记录每次的操作, 当我们的数据库崩溃后重启时, 会根据WAL中的操作记录进行恢复, 这里的操作记录是指putremove等操作, 但是我们需要在重启时知道哪些操作是已经完成的, 哪些操作是未完成的, 因此我们需要在WAL中记录每个事务的状态, 这个状态就是max_flushed_tranc_id_, 因此在重放WAL的操作时, 我们需要根据这个状态来判断哪些操作是已经完成的, 哪些操作是未完成的.

WAL在实现MVCC后实现

讲清楚这些概念后, 我们不难写出如下的TranManager定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// include/lsm/transaction.h
class TranManager : public std::enable_shared_from_this<TranManager> {
public:
TranManager(std::string data_dir, enum IsolationLevel isolation_level =
IsolationLevel::REPEATABLE_READ);
~TranManager();
void init_new_wal();
void set_engine(std::shared_ptr<LSMEngine> engine);
std::shared_ptr<TranContext> new_tranc();

uint64_t getNextTransactionId();
uint64_t get_max_flushed_tranc_id();
uint64_t get_max_finished_tranc_id_();

void update_max_finished_tranc_id(uint64_t tranc_id);
void update_max_flushed_tranc_id(uint64_t tranc_id);

bool write_to_wal(const std::vector<Record> &records);

std::map<uint64_t, std::vector<Record>> check_recover();

std::string get_tranc_id_file_path();
void write_tranc_id_file();
void read_tranc_id_file();
enum IsolationLevel isolation_level();
// void flusher();

private:
mutable std::mutex mutex_;
std::shared_ptr<LSMEngine> engine_;
std::shared_ptr<WAL> wal;
std::string data_dir_;
enum IsolationLevel isolation_level_;
// std::atomic<bool> flush_thread_running_ = true;
std::atomic<uint64_t> nextTransactionId_ = 1;
std::atomic<uint64_t> max_flushed_tranc_id_ = 0;
std::atomic<uint64_t> max_finished_tranc_id_ = 0;
std::map<uint64_t, std::shared_ptr<TranContext>> activeTrans_;
FileObj tranc_id_file_;
};

2.3 事务管理器的实现

这里我们进入今天的主题, 如何实现不同隔离级别下的事务操作

2.3.1 put

这里的实现就是我们之前讲到的:

  1. 如果隔离级别是READ_UNCOMMITTED, 直接写入memtable
  2. 其他隔离级别需要 暂存到temp_map_中, 统一提交后才在数据库中生效

这里设计到的operations等操作可以先忽略, 其属于wal部分的内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// src/lsm/transaction.cpp
void TranContext::put(const std::string &key, const std::string &value) {
auto isolation_level = tranManager_->isolation_level();

// 所有隔离级别都需要先写入 operations 中
operations.emplace_back(Record::putRecord(this->tranc_id_, key, value)); // 可忽略, WAL部分的内容

if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// 1 如果隔离级别是 READ_UNCOMMITTED, 直接写入 memtable
// 先查询以前的记录, 因为回滚时可能需要
auto prev_record = engine_->get(key, 0);
rollback_map_[key] = prev_record;
engine_->put(key, value, tranc_id_);
return;
}

// 2 其他隔离级别需要 暂存到 temp_map_ 中, 统一提交后才在数据库中生效
temp_map_[key] = value;
}

2.3.2 remove

removeLSM中本质上就是put一个空值, 因此逻辑和put是一样的, 这里我们只需要注意:
隔离级别为READ_UNCOMMITTED时, 需要先查询以前的记录, 因为回滚时可能需要, 这里我们需要将以前的记录存储到rollback_map_中, 然后再执行删除操作.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
void TranContext::remove(const std::string &key) {
auto isolation_level = tranManager_->isolation_level();

// 所有隔离级别都需要先写入 operations 中
operations.emplace_back(Record::deleteRecord(this->tranc_id_, key));

if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// 1 如果隔离级别是 READ_UNCOMMITTED, 直接写入 memtable
// 先查询以前的记录, 因为回滚时可能需要
auto prev_record = engine_->get(key, 0);
rollback_map_[key] = prev_record;
engine_->remove(key, tranc_id_);
return;
}

// 2 其他隔离级别需要 暂存到 temp_map_ 中, 统一提交后才在数据库中生效
temp_map_[key] = "";
}

2.3.3 get

这里get的逻辑相对复杂一点:

  1. 所有隔离级别先就近在当前操作的临时缓存中查找(即使是READ_UNCOMMITTED隔离级别也可以直接从temp_map_中读取, 虽然它是空的…)
  2. 否则使用engine查询
    1. 如果隔离级别是READ_UNCOMMITTED, 使用engine查询时不需要判断tranc_id, 直接获取最新值
    2. 如果隔离级别是READ_COMMITTED, 使用engine查询时需要传递当前事务的tranc_id表示最高的可见版本
    3. 如果隔离级别是SERIALIZABLEREPEATABLE_READ, 第一次使用engine查询后还需要暂存, 从而避免不可重复读现象
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
std::optional<std::string> TranContext::get(const std::string &key) {
auto isolation_level = tranManager_->isolation_level();

// 1 所有隔离级别先就近在当前操作的临时缓存中查找
if (temp_map_.find(key) != temp_map_.end()) {
// READ_UNCOMMITTED 随单次操作更新数据库, 不需要最后的统一更新
// 这一步骤肯定会自然跳过的
return temp_map_[key];
}

// 2 否则使用 engine 查询
std::optional<std::pair<std::string, uint64_t>> query;
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// 2.1 如果隔离级别是 READ_UNCOMMITTED, 使用 engine
// 查询时不需要判断 tranc_id, 直接获取最新值
query = engine_->get(key, 0);
} else if (isolation_level == IsolationLevel::READ_COMMITTED) {
// 2.2 如果隔离级别是 READ_COMMITTED, 使用 engine
// 查询时判断 tranc_id
query = engine_->get(key, this->tranc_id_);
} else {
// 2.2 如果隔离级别是 SERIALIZABLE 或 REPEATABLE_READ, 第一次使用 engine
// 查询后还需要暂存
if (read_map_.find(key) != read_map_.end()) {
query = read_map_[key];
} else {
query = engine_->get(key, this->tranc_id_);
read_map_[key] = query;
}
}
if (query.has_value()) {
return query->first;
}
return std::nullopt;
}

2.3.4 commit

commit可能是折柳最复杂的, 其逻辑是:

  1. 如果隔离级别是READ_UNCOMMITTED, 因为之前就已经将更改的数据写入了MemTable, 现在只需要直接写入wal一个Commit记录(目前不涉及, 可先跳过)
  2. 如果隔离级别是REPEATABLE_READSERIALIZABLE, 需要遍历所有的操作记录, 判断是否存在冲突, 如果存在冲突则终止事务, 否则将所有的操作记录写入wal中, 然后将数据应用到数据库中
  3. 完成事务数据同步到MemTable后, 更新max_finished_tranc_id_并持久化数据

这里需要注意的是, 我们检查冲突侵入式地获取了其他组件的锁(memtable.frozen_mtx, memtable.cur_mtx, engine_->ssts_mtx), 这个写法可能有点丑陋, 但一开始没有设计好, 也只能这样了
同样地, 这里设计WAL的部分可以先跳过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
bool TranContext::commit(bool test_fail) {
auto isolation_level = tranManager_->isolation_level();

if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// READ_UNCOMMITTED 随单次操作更新数据库, 不需要最后的统一更新
// 因此也不需要使用到后面的锁保证正确性
operations.emplace_back(Record::commitRecord(this->tranc_id_));

// 先刷入wal
auto wal_success = tranManager_->write_to_wal(operations); // WAL的部分, 现在可以忽略

if (!wal_success) {
throw std::runtime_error("write to wal failed");
}
isCommited = true;
tranManager_->update_max_finished_tranc_id(tranc_id_);
return true;
}

// commit 需要检查所有的操作是否合法

// 遍历所有的记录, 判断是否合法

// TODO: 目前为检查冲突, 全局获取了读锁, 后续考虑性能优化方案

MemTable &memtable = engine_->memtable;
std::unique_lock<std::shared_mutex> wlock1(memtable.frozen_mtx);
std::unique_lock<std::shared_mutex> wlock2(memtable.cur_mtx);

if (isolation_level == IsolationLevel::REPEATABLE_READ ||
isolation_level == IsolationLevel::SERIALIZABLE) {
// REPEATABLE_READ 需要校验冲突
// TODO: 目前 SERIALIZABLE 还没有实现, 逻辑和 REPEATABLE_READ 相同

// 只要需要校验的 隔离级别 需要加sst的锁
std::shared_lock<std::shared_mutex> rlock3(engine_->ssts_mtx);

for (auto &[k, v] : temp_map_) {
// 步骤1: 先在内存表中判断该 key 是否冲突

// ! 注意第二个参数设置为0, 表示忽略事务可见性的查询
auto res = memtable.get_(k, 0);
if (res.is_valid() && res.get_tranc_id() > tranc_id_) {
// 数据库中存在相同的 key , 且其 tranc_id 大于当前 tranc_id
// 表示更晚创建的事务修改了相同的key, 并先提交, 发生了冲突
// 需要终止事务
isAborted = true;
return false;
} else {
// 步骤2: 判断sst中是否是否存在冲突
if (tranManager_->get_max_flushed_tranc_id() <= tranc_id_) {
// sst 中最大的 tranc_id 小于当前 tranc_id, 没有冲突
continue;
}

// 否则要查询具体的key是否冲突
// ! 注意第二个参数设置为0, 表示忽略事务可见性的查询
auto res = engine_->sst_get_(k, 0);
if (res.has_value()) {
auto [v, tranc_id] = res.value();
if (tranc_id > tranc_id_) {
// 数据库中存在相同的 key , 且其 tranc_id 大于当前 tranc_id
// 表示更晚创建的事务修改了相同的key, 并先提交, 发生了冲突
// 需要终止事务
isAborted = true;
return false;
}
}
}
}
}

// 其他隔离级别不检查, 直接运行到这里

// 校验全部通过, 可以刷入
operations.emplace_back(Record::commitRecord(this->tranc_id_));

// 先刷入wal
auto wal_success = tranManager_->write_to_wal(operations);

if (!wal_success) {
throw std::runtime_error("write to wal failed");
}

// 将暂存数据应用到数据库
if (!test_fail) {
// 这里是手动调用 memtable 的无锁版本的 put_, 因为之前手动加了写锁
for (auto &[k, v] : temp_map_) {
memtable.put_(k, v, tranc_id_);
}
}

isCommited = true;
tranManager_->update_max_finished_tranc_id(tranc_id_);
return true;
}

2.3.5 abort

abort 方法用于回滚事务,其逻辑如下:

  1. 如果隔离级别是READ_UNCOMMITTED, 需要手动恢复之前的更改, 这里我们需要遍历rollback_map_, 如果存在值, 则将其写入memtable, 否则删除当前事务的新增操作
  2. 如果是其他隔离级别, 由于数据都是暂存到本地的, 因此直接丢弃即可, 同时为WAL添加一个Abort标记即可(暂时先忽略, wal的内容)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    bool TranContext::abort() {
    auto isolation_level = tranManager_->isolation_level();
    if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
    // 需要手动恢复之前的更改
    // TODO: 需要使用批量化操作优化性能
    for (auto &[k, res] : rollback_map_) {
    if (res.has_value()) {
    engine_->put(k, res.value().first, res.value().second);
    } else {
    // 之前本就不存在, 需要移除当前事务的新增操作
    engine_->remove(k, tranc_id_);
    }
    }
    isAborted = true;

    return true;
    }
    // ! 由于目前 records 是在 commit 时统一刷入 wal 的, 因此这个情况下, abort
    // ! 简单丢弃 operations 即可 ! 后续会将 records 分批次写入 wal
    // ! 这时就需要加上 ! rollback 标记了, 执行下面注释的逻辑

    // operations.emplace_back(Record::rollbackRecord(this->tranc_id_));
    // // 先刷入wal
    // auto wal_success = tranManager_->write_to_wal(operations);

    // if (!wal_success) {
    // throw std::runtime_error("write to wal failed");
    // }

    isAborted = true;

    return true;
    }

2.3.6 begin_tran

begin_tran方法用于创建一个新的事务上下文, 在实现了前述的TranContext 之后, 我们只需要在Engine中添加一个begin_tran方法即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// src/lsm/engine.cpp
// 开启一个事务
std::shared_ptr<TranContext> LSM::begin_tran() {
auto tranc_context = tran_manager_->new_tranc();
return tranc_context;
}

// src/lsm/transation.cpp
std::shared_ptr<TranContext> TranManager::new_tranc() {
// 获取锁
std::unique_lock<std::mutex> lock(mutex_);

auto tranc_id = getNextTransactionId();
activeTrans_[tranc_id] =
std::make_shared<TranContext>(tranc_id, engine_, shared_from_this());
return activeTrans_[tranc_id];
}

2.3.7 事务信息持久化

当每次进行事务id的更新时(主要是update_max_flushed_tranc_id), 都需要将最新的事务信息进行持久化, 由于我们的update_max_flushed_tranc_id是一个原子变量, 因此我们可以通过原子变量的特性无锁地完成其更新操作,:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
void TranManager::update_max_flushed_tranc_id(uint64_t tranc_id) {
// ! max_finished_tranc_id_ 对于崩溃恢复有决定性的作用
// ! 需要立即刷盘
uint64_t expected = max_flushed_tranc_id_.load(std::memory_order_relaxed);
while (tranc_id > expected) {
if (max_flushed_tranc_id_.compare_exchange_weak(
expected, tranc_id, std::memory_order_acq_rel,
std::memory_order_relaxed)) {
break;
}
}
write_tranc_id_file();
}

void TranManager::write_tranc_id_file() {
// 共4个8字节的整型id记录
// std::atomic<uint64_t> nextTransactionId_;
// std::atomic<uint64_t> max_flushed_tranc_id_;
// std::atomic<uint64_t> max_finished_tranc_id_;

std::vector<uint8_t> buf(3 * sizeof(uint64_t), 0);
uint64_t nextTransactionId = nextTransactionId_.load();
uint64_t max_flushed_tranc_id = max_flushed_tranc_id_.load();
uint64_t max_finished_tranc_id = max_finished_tranc_id_.load();

memcpy(buf.data(), &nextTransactionId, sizeof(uint64_t));
memcpy(buf.data() + sizeof(uint64_t), &max_flushed_tranc_id,
sizeof(uint64_t));
memcpy(buf.data() + 2 * sizeof(uint64_t), &max_finished_tranc_id,
sizeof(uint64_t));

tranc_id_file_.write(0, buf);
tranc_id_file_.sync();
}

这里简单介绍下原子变量和内存顺序:

  1. 原子变量max_flushed_tranc_id_ 通过 compare_exchange_weak 循环更新最大值,确保多线程安全。
  2. 内存顺序acq_rel 保证更新对其他线程可见且本线程读取最新值,失败时 relaxed 降低开销。
  3. 持久化:更新后立即调用 write_tranc_id_file() 刷盘,确保崩溃恢复时数据可靠。

3 小节

本章完成了MVCC和事务设计的上层设计, 现在我们的存储引擎已经支持MVCC查询和不同隔离级别的事务了, 但现在我们的存储引擎还缺少WAL和崩溃恢复的设计, 这是我们下一章的内容。