之前我们已经对各个组件的接口进行了统一, 添加了tranc_id
这个事务id参数, 接下来这个章节, 我们将介绍顶层的事务设计, 即事务id是如何生成的, 实现相关的事务管理器.
代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP 感谢您的 Star
!
欢迎支持本项目同步更新的从零开始实现的视频教程:https://avo6166ew2u.feishu.cn/docx/LXmVdezdsoTBRaxC97WcHwGunOc
欢迎加入讨论群:Toni-LSM项目交流群
1 事务的设计思想
我们先给出一个完成后的demo
, 演示我们的事务设计是如何工作的, 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| #include "../include/lsm/engine.h" #include <iostream> #include <string>
int main() { LSM lsm("example_data");
lsm.put("key1", "value1");
auto value1 = lsm.get("key1"); std::cout << "key1: " << value1.value() << std::endl;
auto tranc_hanlder = lsm.begin_tran(); tranc_hanlder->put("xxx", "yyy"); tranc_hanlder->put("yyy", "xxx"); tranc_hanlder->commit();
auto res = lsm.get("xxx"); std::cout << "xxx: " << res.value() << std::endl;
lsm.clear();
return 0; }
|
这里我们可以通过一个begin_tran
函数获取一个事务的处理句柄, 然后通过这个句柄进行增删改查操作, 最后通过commit
或abort
函数完成提交事务或终结事务的流程.
现在我们要完成的就是接受begin_tran
的事务管理器. 这里有一些设计问题我们需要提前明确:
begin_tran
获取的事务句柄肯定会分配一个事务id
, 那么没有开启事务的put/get/remove
操作的事务id
是什么呢?
begin_tran
进行增删改查的操作如何保证不同事务的隔离性?
首先回答第一个问题, 我们可以使用一个全局的atomic
变量来作为事务id, 这个变量在每次调用begin_tran
或put/get/remove
时自增, 这样就可以保证每个事务或单次操作都有一个唯一的tranc_id
(这里的tranc_id
和事务id
是同义词). 换句话说, 普通的put/get/remove
就是操作数量为1的简单事务。
然后是第二个问题,这实际上取决于我们的事务隔离级别:
Read Uncommitted
: 允许读取未提交的数据, 也就是脏读. 这种情况下, 我们可以将事务的句柄(案例代码中的tranc_hanlder
)进行增删改查的数据直接写入到memtable
中, 这样就可以让其他的事务可以从memtable
中读取到未提交的数据, 速度肯定很快. 但是这里有一个场景需要尤其注意, 就是我们事务rokkback
(或者是abort
)时, 必须撤销已经写入到memtable
的数据, 因此这里需要我们记录事务的操作记录和以前的历史记录, 然后在abort
时, 将memtable
中的数据进行回滚.
Read Committed
: 允许读取已提交的数据, 也就是不可脏读. 这种情况下, 我们可以将事务的句柄(案例代码中的tranc_hanlder
)进行增删改查的数据暂存到句柄的上下文, 因此其他事务从memtable
中是查不到这个事务未提交的数据的, 但这个事务自身查询时可以从自己的上下文中读取到未提交的数据.
Repeatable Read
: 在Read Committed
的基础上解决了不可重复读的问题, 也就是在同一个事务中, 多次读取同一数据的结果是一样的. 这种情况下, 我们可以将每次get
的数据同样暂存到句柄的上下文, 后续查询相同的key
时, 从上下文中读取到相同的数据.
Serializable
: 这个这个事务隔离级别我们在关系型数据库中是进一步解决幻读
现象的, 例如: 在Repeatable Read
隔离级别下,事务A读取了年龄>30的员工,得到10条记录。此时事务B插入了一个年龄31的新员工并提交。事务A再次读取同样的条件,可能会看到11条记录(幻读)。但在Serializable
隔离级别下,事务B的插入会被阻塞或者事务A的两次读取结果保持一致,避免幻读。在我们的KV
数据库中, 我们只需要保证事务提交时进程冲突检查、且按照事务id
的顺序依次提交即可(虽然这样性能很低)
如果这里看不懂的话, 可以继续往下看, 随着代码的实现应该能理解, 再理解不了的话, 建议先复习或学习数据库中MVCC
、事务、并发控制的基础知识
2 事务管理器的实现
2.1 组件关系设计
还记得我们之前实现的LSm
和LSMEngine
吗? 当时我们将LSMEngine
包裹在LSM
中, LSMEngine
中封装了memtable
, sst
等组件, 我们却进一步将其封装在LSM
中, 这样的目的就是在后续中加入其他同级别的组件, 例如本章的事务管理器, 偷我二年级的定义为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| class LSM { private: std::shared_ptr<LSMEngine> engine; std::shared_ptr<TranManager> tran_manager_;
public: LSM(std::string path); ~LSM();
std::optional<std::string> get(const std::string &key);
void put(const std::string &key, const std::string &value); void put_batch(const std::vector<std::pair<std::string, std::string>> &kvs);
void remove(const std::string &key); void remove_batch(const std::vector<std::string> &keys);
using LSMIterator = TwoMergeIterator; LSMIterator begin(uint64_t tranc_id); LSMIterator end(); std::optional<std::pair<TwoMergeIterator, TwoMergeIterator>> lsm_iters_monotony_predicate( uint64_t tranc_id, std::function<int(const std::string &)> predicate); void clear(); void flush(); void flush_all();
std::shared_ptr<TranContext> begin_tran(); };
|
这里的数据成员中, 我们添加了tran_manager_
, 其将作为分配事务id
和begin_tran
申请TranContext
的管理器
2.2 事务管理器的定义
2.2.1 TranContext的定义
这里首先回顾之前的定义:
1 2
| std::shared_ptr<TranManager> tran_manager_; std::shared_ptr<TranContext> begin_tran();
|
这里的TranContext
是向TranManager
申请事务上下文后返回的类, 这个上下文中, 我们需要实现基本的事务管理功能, 包括增删改查、提交、回滚,我们初步将其头文件定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| class LSMEngine; class TranManager;
class TranContext { friend class TranManager;
public: TranContext(uint64_t tranc_id, std::shared_ptr<LSMEngine> engine, std::shared_ptr<TranManager> tranManager); void put(const std::string &key, const std::string &value); void remove(const std::string &key); std::optional<std::string> get(const std::string &key);
bool commit(bool test_fail = false); bool abort();
std::shared_ptr<LSMEngine> engine_; std::shared_ptr<TranManager> tranManager_; uint64_t tranc_id_; std::vector<Record> operations; std::unordered_map<std::string, std::string> temp_map_; bool isCommited = false; bool isAborted = false;
private: std::unordered_map<std::string, std::optional<std::pair<std::string, uint64_t>>> read_map_; std::unordered_map<std::string, std::optional<std::pair<std::string, uint64_t>>> rollback_map_; };
|
需要注意的是, 这里的设计考虑了不同的事务隔离级别, 例如:
- 当我们使用
Repeatable Read
隔离级别时, 我们需要将每次get
的数据暂存到句柄的上下文中, 后续查询相同的key
时, 从上下文中读取到相同的数据. 这个暂存的数据就放在read_map_
中
- 当我们使用非
Read Uncommitted
隔离级别时, 我们需要将每次put
的数据暂存到句柄的上下文中, 后续查询相同的key
时, 从上下文中读取到相同的数据. 这个暂存的数据就放在temp_map_
中
- 当我们使用
Read Uncommitted
隔离级别时, 我们每次put
的数据是直接应用到整个存储引擎中的, 因此这里我们存在一个存储引擎的指针std::shared_ptr<TranManager> tranManager_
, 同时我们需要记录操作前的数据库状态以便于回滚, 因此其存放于rollback_map_
中
2.2.2 TranManager的定义
TranManager
的作用包括:
- 管理事务的
id
的生成
- 管理事务上下文
TranContext
的申请
- 负责记录当前事务完成状态:
max_flushed_tranc_id_
: 最大的已经刷入sst
中的事务
max_finished_tranc_id_
: 最大的已经完成的事务(但数据可能还存在于内存中)
nextTransactionId_
: 下一个分配事务的id
- 负责事务状态的持久化操作
这里特别进行说明, 为什么要负责是事务的持久化操作, 因为我们后续会实现WAL
(Write Ahead Log), 这个日志会记录每次的操作, 当我们的数据库崩溃后重启时, 会根据WAL
中的操作记录进行恢复, 这里的操作记录是指put
、remove
等操作, 但是我们需要在重启时知道哪些操作是已经完成的, 哪些操作是未完成的, 因此我们需要在WAL
中记录每个事务的状态, 这个状态就是max_flushed_tranc_id_
, 因此在重放WAL
的操作时, 我们需要根据这个状态来判断哪些操作是已经完成的, 哪些操作是未完成的.
WAL
在实现MVCC
后实现
讲清楚这些概念后, 我们不难写出如下的TranManager
定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| class TranManager : public std::enable_shared_from_this<TranManager> { public: TranManager(std::string data_dir, enum IsolationLevel isolation_level = IsolationLevel::REPEATABLE_READ); ~TranManager(); void init_new_wal(); void set_engine(std::shared_ptr<LSMEngine> engine); std::shared_ptr<TranContext> new_tranc();
uint64_t getNextTransactionId(); uint64_t get_max_flushed_tranc_id(); uint64_t get_max_finished_tranc_id_();
void update_max_finished_tranc_id(uint64_t tranc_id); void update_max_flushed_tranc_id(uint64_t tranc_id);
bool write_to_wal(const std::vector<Record> &records);
std::map<uint64_t, std::vector<Record>> check_recover();
std::string get_tranc_id_file_path(); void write_tranc_id_file(); void read_tranc_id_file(); enum IsolationLevel isolation_level();
private: mutable std::mutex mutex_; std::shared_ptr<LSMEngine> engine_; std::shared_ptr<WAL> wal; std::string data_dir_; enum IsolationLevel isolation_level_; std::atomic<uint64_t> nextTransactionId_ = 1; std::atomic<uint64_t> max_flushed_tranc_id_ = 0; std::atomic<uint64_t> max_finished_tranc_id_ = 0; std::map<uint64_t, std::shared_ptr<TranContext>> activeTrans_; FileObj tranc_id_file_; };
|
2.3 事务管理器的实现
这里我们进入今天的主题, 如何实现不同隔离级别下的事务操作
2.3.1 put
这里的实现就是我们之前讲到的:
- 如果隔离级别是
READ_UNCOMMITTED
, 直接写入memtable
- 其他隔离级别需要 暂存到
temp_map_
中, 统一提交后才在数据库中生效
这里设计到的operations
等操作可以先忽略, 其属于wal
部分的内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| void TranContext::put(const std::string &key, const std::string &value) { auto isolation_level = tranManager_->isolation_level();
operations.emplace_back(Record::putRecord(this->tranc_id_, key, value));
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { auto prev_record = engine_->get(key, 0); rollback_map_[key] = prev_record; engine_->put(key, value, tranc_id_); return; }
temp_map_[key] = value; }
|
2.3.2 remove
remove
在LSM
中本质上就是put
一个空值, 因此逻辑和put
是一样的, 这里我们只需要注意:
隔离级别为READ_UNCOMMITTED
时, 需要先查询以前的记录, 因为回滚时可能需要, 这里我们需要将以前的记录存储到rollback_map_
中, 然后再执行删除操作.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| void TranContext::remove(const std::string &key) { auto isolation_level = tranManager_->isolation_level();
operations.emplace_back(Record::deleteRecord(this->tranc_id_, key));
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { auto prev_record = engine_->get(key, 0); rollback_map_[key] = prev_record; engine_->remove(key, tranc_id_); return; }
temp_map_[key] = ""; }
|
2.3.3 get
这里get
的逻辑相对复杂一点:
- 所有隔离级别先就近在当前操作的临时缓存中查找(即使是
READ_UNCOMMITTED
隔离级别也可以直接从temp_map_
中读取, 虽然它是空的…)
- 否则使用
engine
查询
- 如果隔离级别是
READ_UNCOMMITTED
, 使用engine
查询时不需要判断tranc_id
, 直接获取最新值
- 如果隔离级别是
READ_COMMITTED
, 使用engine
查询时需要传递当前事务的tranc_id
表示最高的可见版本
- 如果隔离级别是
SERIALIZABLE
或REPEATABLE_READ
, 第一次使用engine
查询后还需要暂存, 从而避免不可重复读现象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| std::optional<std::string> TranContext::get(const std::string &key) { auto isolation_level = tranManager_->isolation_level();
if (temp_map_.find(key) != temp_map_.end()) { return temp_map_[key]; }
std::optional<std::pair<std::string, uint64_t>> query; if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { query = engine_->get(key, 0); } else if (isolation_level == IsolationLevel::READ_COMMITTED) { query = engine_->get(key, this->tranc_id_); } else { if (read_map_.find(key) != read_map_.end()) { query = read_map_[key]; } else { query = engine_->get(key, this->tranc_id_); read_map_[key] = query; } } if (query.has_value()) { return query->first; } return std::nullopt; }
|
2.3.4 commit
commit
可能是折柳最复杂的, 其逻辑是:
- 如果隔离级别是
READ_UNCOMMITTED
, 因为之前就已经将更改的数据写入了MemTable
, 现在只需要直接写入wal
一个Commit
记录(目前不涉及, 可先跳过)
- 如果隔离级别是
REPEATABLE_READ
或SERIALIZABLE
, 需要遍历所有的操作记录, 判断是否存在冲突, 如果存在冲突则终止事务, 否则将所有的操作记录写入wal
中, 然后将数据应用到数据库中
- 完成事务数据同步到
MemTable
后, 更新max_finished_tranc_id_
并持久化数据
这里需要注意的是, 我们检查冲突侵入式地获取了其他组件的锁(memtable.frozen_mtx
, memtable.cur_mtx
, engine_->ssts_mtx
), 这个写法可能有点丑陋, 但一开始没有设计好, 也只能这样了
同样地, 这里设计WAL
的部分可以先跳过
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96
| bool TranContext::commit(bool test_fail) { auto isolation_level = tranManager_->isolation_level();
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { operations.emplace_back(Record::commitRecord(this->tranc_id_));
auto wal_success = tranManager_->write_to_wal(operations);
if (!wal_success) { throw std::runtime_error("write to wal failed"); } isCommited = true; tranManager_->update_max_finished_tranc_id(tranc_id_); return true; }
MemTable &memtable = engine_->memtable; std::unique_lock<std::shared_mutex> wlock1(memtable.frozen_mtx); std::unique_lock<std::shared_mutex> wlock2(memtable.cur_mtx);
if (isolation_level == IsolationLevel::REPEATABLE_READ || isolation_level == IsolationLevel::SERIALIZABLE) {
std::shared_lock<std::shared_mutex> rlock3(engine_->ssts_mtx);
for (auto &[k, v] : temp_map_) {
auto res = memtable.get_(k, 0); if (res.is_valid() && res.get_tranc_id() > tranc_id_) { isAborted = true; return false; } else { if (tranManager_->get_max_flushed_tranc_id() <= tranc_id_) { continue; }
auto res = engine_->sst_get_(k, 0); if (res.has_value()) { auto [v, tranc_id] = res.value(); if (tranc_id > tranc_id_) { isAborted = true; return false; } } } } }
operations.emplace_back(Record::commitRecord(this->tranc_id_));
auto wal_success = tranManager_->write_to_wal(operations);
if (!wal_success) { throw std::runtime_error("write to wal failed"); }
if (!test_fail) { for (auto &[k, v] : temp_map_) { memtable.put_(k, v, tranc_id_); } }
isCommited = true; tranManager_->update_max_finished_tranc_id(tranc_id_); return true; }
|
2.3.5 abort
abort
方法用于回滚事务,其逻辑如下:
- 如果隔离级别是
READ_UNCOMMITTED
, 需要手动恢复之前的更改, 这里我们需要遍历rollback_map_
, 如果存在值, 则将其写入memtable
, 否则删除当前事务的新增操作
- 如果是其他隔离级别, 由于数据都是暂存到本地的, 因此直接丢弃即可, 同时为
WAL
添加一个Abort
标记即可(暂时先忽略, wal
的内容)1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| bool TranContext::abort() { auto isolation_level = tranManager_->isolation_level(); if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { for (auto &[k, res] : rollback_map_) { if (res.has_value()) { engine_->put(k, res.value().first, res.value().second); } else { engine_->remove(k, tranc_id_); } } isAborted = true;
return true; }
isAborted = true;
return true; }
|
2.3.6 begin_tran
begin_tran
方法用于创建一个新的事务上下文, 在实现了前述的TranContext
之后, 我们只需要在Engine
中添加一个begin_tran
方法即可:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
|
std::shared_ptr<TranContext> LSM::begin_tran() { auto tranc_context = tran_manager_->new_tranc(); return tranc_context; }
std::shared_ptr<TranContext> TranManager::new_tranc() { std::unique_lock<std::mutex> lock(mutex_);
auto tranc_id = getNextTransactionId(); activeTrans_[tranc_id] = std::make_shared<TranContext>(tranc_id, engine_, shared_from_this()); return activeTrans_[tranc_id]; }
|
2.3.7 事务信息持久化
当每次进行事务id
的更新时(主要是update_max_flushed_tranc_id
), 都需要将最新的事务信息进行持久化, 由于我们的update_max_flushed_tranc_id
是一个原子变量, 因此我们可以通过原子变量的特性无锁地完成其更新操作,:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| void TranManager::update_max_flushed_tranc_id(uint64_t tranc_id) { uint64_t expected = max_flushed_tranc_id_.load(std::memory_order_relaxed); while (tranc_id > expected) { if (max_flushed_tranc_id_.compare_exchange_weak( expected, tranc_id, std::memory_order_acq_rel, std::memory_order_relaxed)) { break; } } write_tranc_id_file(); }
void TranManager::write_tranc_id_file() {
std::vector<uint8_t> buf(3 * sizeof(uint64_t), 0); uint64_t nextTransactionId = nextTransactionId_.load(); uint64_t max_flushed_tranc_id = max_flushed_tranc_id_.load(); uint64_t max_finished_tranc_id = max_finished_tranc_id_.load();
memcpy(buf.data(), &nextTransactionId, sizeof(uint64_t)); memcpy(buf.data() + sizeof(uint64_t), &max_flushed_tranc_id, sizeof(uint64_t)); memcpy(buf.data() + 2 * sizeof(uint64_t), &max_finished_tranc_id, sizeof(uint64_t));
tranc_id_file_.write(0, buf); tranc_id_file_.sync(); }
|
这里简单介绍下原子变量和内存顺序:
- 原子变量:
max_flushed_tranc_id_
通过 compare_exchange_weak
循环更新最大值,确保多线程安全。
- 内存顺序:
acq_rel
保证更新对其他线程可见且本线程读取最新值,失败时 relaxed
降低开销。
- 持久化:更新后立即调用
write_tranc_id_file()
刷盘,确保崩溃恢复时数据可靠。
3 小节
本章完成了MVCC
和事务设计的上层设计, 现在我们的存储引擎已经支持MVCC
查询和不同隔离级别的事务了, 但现在我们的存储引擎还缺少WAL
和崩溃恢复的设计, 这是我们下一章的内容。