之前我们已经完成了数据库内存中和编码文件中事务信息的融入, 现在我们还需要加上WAL与崩溃恢复的内容。
代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP 感谢您的 Star
!
欢迎支持本项目同步更新的从零开始实现的视频教程:https://avo6166ew2u.feishu.cn/docx/LXmVdezdsoTBRaxC97WcHwGunOc
欢迎加入讨论群:Toni-LSM项目交流群
1 WAL编码设计
1.1 WAL简介
这里首先简单介绍下WAL(预写式日志) , WAL(预写式日志) 是数据库系统中保障数据一致性和事务可靠性的核心技术,核心思想是“日志先行”——任何数据修改必须先记录到日志中,确保日志持久化后,才允许实际数据写入磁盘。这种机制解决了两个关键问题:一是保证已提交的事务不会因系统崩溃而丢失(持久性),二是确保事务要么完全生效、要么完全回滚(原子性)。
具体来说,当一个事务提交时,数据库会先将事务的修改操作(例如数据修改前后的值、事务状态等)按顺序写入日志文件,并强制将日志刷到磁盘存储。由于日志是顺序写入,相比随机修改数据页的I/O操作,这种设计大幅提升了性能。之后,数据库可以灵活地将内存中的脏页批量刷新到磁盘,减少磁盘操作次数。
如果系统崩溃,重启后可通过日志恢复数据。恢复分为两个阶段:Redo 阶段会重放所有已提交但未落盘的日志,确保事务修改生效;Undo 阶段则回滚未提交事务的部分修改,消除中间状态。为了加速恢复,数据库会定期创建检查点(Checkpoint),将当前内存中的脏页刷盘,并记录日志位置,这样恢复时只需处理检查点之后的日志。
WAL的优势不仅在于数据安全,还在于其高性能和可扩展性。例如,PostgreSQL、MySQL InnoDB等数据库依赖WAL实现事务和崩溃恢复;分布式系统(如Raft算法)也借鉴类似思想,通过日志复制保证一致性。不过,WAL的日志文件可能快速增长,需要定期清理或归档,且频繁刷盘可能带来性能损耗,因此在实际应用中需权衡同步/异步提交等策略。
1.2 WAL编码设计
根据之前的介绍, 我们的WAL
文件中需要满足如下功能:
- 描述事务的开始和结束
- 描述每次事务的操作内容
在KV
存储引擎这个领域, WAL
的设计已经比关系型数据库简单很多了, 因为其操作类型就只有简单的基于键值对的put/get/remove
. 与之相反, 关系型数据库的WAL
就复杂很多了, 还涉及到物理日志和逻辑日志的区别
在KV存储引擎中,WAL
的设计需要满足简洁性和高效性。由于操作类型仅限于put
、remove
(get
通常不涉及数据修改,因此无需记录),日志结构可以大幅简化。以下是具体的设计要点:
1.2.1 日志条目设计
每个日志条目需包含以下核心信息:
- 事务标识(Transaction ID):唯一标识事务的ID,用于关联多个操作。
- 操作类型(Operation Type):如
PUT
、REMOVE
、GET
、BEGIN
、COMMIT
、ABORT
等。
- 键(Key):操作的键值。
- 值(Value):对于
PUT
操作记录具体值;
- **校验和(Checksum)(可选)**:用于验证日志条目的完整性(如CRC32)。
- 时间戳(可选):记录操作时间,用于多版本控制或冲突解决。
这样,我们可以通过每一个日志条目判断这个操作类型和数据、是哪一个事务进行操作。同时, 由于我们在上一章中将数据库的事务完成状态也进行了持久化, 因此在崩溃恢复时, 我们可以通过检查当前WAL
条目的事务id
和以刷盘的事务的状态进行对比, 来判断是否需要重放操作。
因此我们不难进行如下的头文件定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56
| #pragma once
#include <cstdint> #include <iostream> #include <stdexcept> #include <string> #include <vector>
enum class OperationType { CREATE, COMMIT, ROLLBACK, PUT, DELETE, };
class Record { private: Record() = default;
public:
static Record createRecord(uint64_t tranc_id); static Record commitRecord(uint64_t tranc_id); static Record rollbackRecord(uint64_t tranc_id); static Record putRecord(uint64_t tranc_id, const std::string &key, const std::string &value); static Record deleteRecord(uint64_t tranc_id, const std::string &key);
std::vector<uint8_t> encode() const;
static std::vector<Record> decode(const std::vector<uint8_t> &data);
uint64_t getTrancId() const { return tranc_id_; } OperationType getOperationType() const { return operation_type_; } std::string getKey() const { return key_; } std::string getValue() const { return value_; }
void print() const;
bool operator==(const Record &other) const; bool operator!=(const Record &other) const;
private: uint64_t tranc_id_; OperationType operation_type_; std::string key_; std::string value_; uint16_t record_len_; };
|
这里的Record
表示的就是WAL
中的单个日志条目, 操作类型为OperationType
, 通过OperationType
可以判断其是否有key
, value
等附加数据信息。同时,我们通过静态成员函数createRecord
, putRecord
等构造类的实例。最后,encode
和decode
函数用于将记录转换为字节流和从字节流恢复记录。
1.2.2 日志条目编解码
这里的编解码函数实现和之前block
, sst
文件的编解码方式非常类似, 每个条目的格式为:
1
| | record_len | tranc_id | operation_type | key_len(optional) | key(optional) | value_len(optional) | value(optional) |
|
这里, 当operation_type
是CREATE
, ROLLBACK
, COMMIT
时, 只需要记录tranc_id
和operation_type
即可, 其余的optional
部分不存在, 当operation_type
是PUT
时, 需要记录tranc_id
, operation_type
, key
, value
; 当operation_type
是DELETE
时, 需要记录tranc_id
, operation_type
, key
。’
每个条目的第一部分是record_len
, 其记录了整个日志条目的长度(16位)。这里的编解码需要注意一下,encode
函数是以单个Record
为单位, 将其编码为字节流, 而decode
函数是以字节流为单位, 将其解码为Record
数组。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| std::vector<uint8_t> Record::encode() const { std::vector<uint8_t> record;
size_t key_offset = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint8_t);
record.resize(record_len_, 0);
std::memcpy(record.data(), &record_len_, sizeof(uint16_t));
std::memcpy(record.data() + sizeof(uint16_t), &tranc_id_, sizeof(uint64_t));
auto type_byte = static_cast<uint8_t>(operation_type_); std::memcpy(record.data() + sizeof(uint16_t) + sizeof(uint64_t), &type_byte, sizeof(uint8_t));
if (this->operation_type_ == OperationType::PUT) { uint16_t key_len = key_.size(); std::memcpy(record.data() + key_offset, &key_len, sizeof(uint16_t)); std::memcpy(record.data() + key_offset + sizeof(uint16_t), key_.data(), key_.size());
size_t value_offset = key_offset + sizeof(uint16_t) + key_.size();
uint16_t value_len = value_.size(); std::memcpy(record.data() + value_offset, &value_len, sizeof(uint16_t)); std::memcpy(record.data() + value_offset + sizeof(uint16_t), value_.data(), value_.size()); } else if (this->operation_type_ == OperationType::DELETE) { uint16_t key_len = key_.size(); std::memcpy(record.data() + key_offset, &key_len, sizeof(uint16_t)); std::memcpy(record.data() + key_offset + sizeof(uint16_t), key_.data(), key_.size()); }
return record; }
std::vector<Record> Record::decode(const std::vector<uint8_t> &data) { if (data.size() < sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint8_t)) { return {}; }
std::vector<Record> records; size_t pos = 0;
while (pos < data.size()) { uint16_t record_len; std::memcpy(&record_len, data.data() + pos, sizeof(uint16_t)); pos += sizeof(uint16_t);
if (data.size() < record_len) { throw std::runtime_error("Data length does not match record length"); }
uint64_t tranc_id; std::memcpy(&tranc_id, data.data() + pos, sizeof(uint64_t)); pos += sizeof(uint64_t);
uint8_t op_type = data[pos++]; OperationType operation_type = static_cast<OperationType>(op_type);
Record record; record.tranc_id_ = tranc_id; record.operation_type_ = operation_type; record.record_len_ = record_len;
if (operation_type == OperationType::PUT) { uint16_t key_len; std::memcpy(&key_len, data.data() + pos, sizeof(uint16_t)); pos += sizeof(uint16_t);
record.key_ = std::string( reinterpret_cast<const char *>(data.data() + pos), key_len); pos += key_len;
uint16_t value_len; std::memcpy(&value_len, data.data() + pos, sizeof(uint16_t)); pos += sizeof(uint16_t);
record.value_ = std::string( reinterpret_cast<const char *>(data.data() + pos), value_len); pos += value_len; } else if (operation_type == OperationType::DELETE) { uint16_t key_len; std::memcpy(&key_len, data.data() + pos, sizeof(uint16_t)); pos += sizeof(uint16_t);
record.key_ = std::string( reinterpret_cast<const char *>(data.data() + pos), key_len); pos += key_len; }
records.push_back(record); } return records; }
|
2 崩溃恢复机制
2.1 崩溃恢复运行机制
当某一时刻存储引擎发送崩溃时, 需要进行崩溃恢复,以恢复数据状态。崩溃恢复的关键是回放日志,以确定已提交的事务,并执行其操作。在这个描述中我们不难得到一个信息, 即成功执行的事务一定要先将操作记录持久化到日志中, 然后再在内存中进行操作, 最后返回成功信息给客户端或者调用者。这也是为什么这个机制称为预写式日志。其崩溃恢复的工作流程包括:
- 日志回放:从最后一个检查点开始扫描日志,按顺序处理所有已提交事务(
COMMIT
标记后的操作)。
- Redo阶段:重新执行所有已提交事务的
PUT/REMOVE
操作,覆盖当前数据状态。
- Undo阶段(可选):若事务未提交(无
COMMIT
标记),则直接丢弃其操作记录。
示例场景
假设事务TX100
依次执行PUT key1=value1
和REMOVE key2
,其日志内容如下:
1 2 3 4
| BEGIN TX100 PUT key1 5 value1 DELETE key2 COMMIT TX100
|
事务TX100
在调用commit
函数后, 需要将上述日志刷入wal
文件完成预写这一步骤后, 才会返回client
事务提交成功。一开始提交的事务,其数据一定是只存在于MemTable
中的, 此时如果存储引擎崩溃, 刚刚完成的事务是没法刷入到sst
文件的, 但我们重启时可以检查wal
文件的内容, 将其与数据库持久化的状态进行比对(持久化的状态包括最大已完成的事务id、最大已经刷盘的事务id, 见上一章的内容),如果其事务id
比目前已经持久化到sst
的最大事务id
大,则说明该事务需要进行重放, 另一方面, 如果事务记录的最后一个条目是Rollback
,则说明该事务需要被回滚, 则不需要在崩溃恢复时进行重放.
将上述流程总结如下:
情形1: 正常提交事务、SST刷盘正常
- 事务开始, 写入
BEGIN
标记到WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 执行若干
PUT/DELETE
操作, 将操作记录写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 如果隔离级别是
Read Uncommitted
, 可以将PUT/DELETE
操作直接应用到数据库
- 其他隔离级别则将
PUT/DELETE
操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
- 提交事务:
- 将
COMMIT
标记写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 将
WAL
日志刷入磁盘。(此时WAL
日志已经刷入磁盘)
- 如果隔离级别不是
Read Committed
, 则将暂存的PUT/DELETE
操作应用到数据库
- 返回给
client
成功或失败
- 之前事务的
PUT/DELETE
操作的变化应用到数据库仍然是位于MemTable
中的, 其会稍后输入SST
情形2: 正常提交事务、SST刷盘崩溃
- 事务开始, 写入
BEGIN
标记到WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 执行若干
PUT/DELETE
操作, 将操作记录写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 如果隔离级别是
Read Uncommitted
, 可以将PUT/DELETE
操作直接应用到数据库
- 其他隔离级别则将
PUT/DELETE
操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
- 提交事务:
- 将
COMMIT
标记写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 将
WAL
日志刷入磁盘。(此时WAL
日志已经刷入磁盘)
- 如果隔离级别不是
Read Committed
, 则将暂存的PUT/DELETE
操作应用到数据库
- 返回给
client
成功或失败
- 之前事务的
PUT/DELETE
操作的变化应用到数据库仍然是位于MemTable
中的, 其稍后输入SST
奔溃
- 数据库重启后执行崩溃回复
- 检查
WAL
文件的记录
- 整合事务
id
每条记录, 忽略以Rollback
结尾的事务
- 若事务以
Commit
结尾, 则将事务id
与已经刷盘的SST
中的最大事务id
进行比对
- 若事务
id
大于SST
的最大事务id
, 执行重放操作
- 若事务
id
小于SST
的最大事务id
, 则忽略该事务, 因为其已经被数据库执行过了
情形3: 事务回滚
- 事务开始, 写入
BEGIN
标记到WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 执行若干
PUT/DELETE
操作, 将操作记录写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 如果隔离级别是
Read Uncommitted
, 可以将PUT/DELETE
操作直接应用到数据库
- 其他隔离级别则将
PUT/DELETE
操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
- 回滚事务:
- 将
Rollback
标记写入WAL
日志中。(此时的WAL
日志可能存在于缓冲区, 没有刷入文件)
- 将
WAL
日志刷入磁盘。(此时WAL
日志已经刷入磁盘)
- 如果隔离级别不是
Read Committed
, 则将暂存的PUT/DELETE
操作简单丢弃即可
- 如果隔离级别是
Read Committed
, 则将操作前的数据库状态进行还原(上一章介绍的rollback_map_
)
- 返回给
client
成功或失败
2.2 考虑WAL后的事务操作
上一章中, 我们的事务设计没有考虑到或忽略了WAL
的操作, 这里我们将对WAL
的操作进行实现和讲解:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| void TranContext::put(const std::string &key, const std::string &value) { auto isolation_level = tranManager_->isolation_level();
operations.emplace_back(Record::putRecord(this->tranc_id_, key, value));
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { auto prev_record = engine_->get(key, 0); rollback_map_[key] = prev_record; engine_->put(key, value, tranc_id_); return; }
temp_map_[key] = value; }
void TranContext::remove(const std::string &key) { auto isolation_level = tranManager_->isolation_level();
operations.emplace_back(Record::deleteRecord(this->tranc_id_, key));
}
bool TranContext::commit(bool test_fail) { auto isolation_level = tranManager_->isolation_level();
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { operations.emplace_back(Record::commitRecord(this->tranc_id_));
auto wal_success = tranManager_->write_to_wal(operations);
if (!wal_success) { throw std::runtime_error("write to wal failed"); } isCommited = true; tranManager_->update_max_finished_tranc_id(tranc_id_); return true; }
operations.emplace_back(Record::commitRecord(this->tranc_id_));
auto wal_success = tranManager_->write_to_wal(operations);
if (!wal_success) { throw std::runtime_error("write to wal failed"); }
if (!test_fail) { for (auto &[k, v] : temp_map_) { memtable.put_(k, v, tranc_id_); } }
}
bool TranContext::abort() { auto isolation_level = tranManager_->isolation_level(); if (isolation_level == IsolationLevel::READ_UNCOMMITTED) { for (auto &[k, res] : rollback_map_) { if (res.has_value()) { engine_->put(k, res.value().first, res.value().second); } else { engine_->remove(k, tranc_id_); } } isAborted = true;
return true; }
isAborted = true;
return true; }
|
需要注意的是, 目前的实现中, 所有的操作记录是随put/remove
添加到TranContext
的成员变量operations
中, 然后在commit
时统一写入WAL
中, 因此长事务在commit
时的IO
耗时较大, 后续应该改为分批次进行WAL
刷入, 这里我留了个issues
: https://github.com/ToniXWD/toni-lsm/issues/6 ,感兴趣的朋友可以自己实现一下
2.3 WAL刷盘函数
commit
函数中的write_to_wal
函数实现了WAL
文件的刷盘操作, 其具体实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| bool TranManager::write_to_wal(const std::vector<Record> &records) { try { wal->log(records, true); } catch (const std::exception &e) { return false; }
return true; }
void WAL::log(const std::vector<Record> &records, bool force_flush) { std::unique_lock<std::mutex> lock(mutex_);
for (const auto &record : records) { log_buffer_.push_back(record); }
if (log_buffer_.size() < buffer_size_ && !force_flush) { return; }
auto pre_buffer = std::move(log_buffer_); for (const auto &record : pre_buffer) { std::vector<uint8_t> encoded_record = record.encode(); log_file_.append(encoded_record); } if (!log_file_.sync()) { throw std::runtime_error("Failed to sync WAL file"); }
auto cur_file_size = log_file_.size(); if (cur_file_size > file_size_limit_) { reset_file(); } }
|
这里的核心函数是WAL::log
, 其会将一个数组中的内存刷入到内存缓存数组log_buffer_
, 当其数组大小超过指定阈值时, 就会进行刷盘操作, 同时会重置log_buffer_
。同时,类似commit
, abort
这样的事务操作需要立即刷盘以完成持久化, 这是可以手动指定force_flush
为true
来强行刷入。这里的log_file_
就是我们使用之前章节实现的FileObj
包装的wal
文件操作句柄。
2.4 WAL重放检查函数
这是这章最核心的部分, 数据库启动时, 我们需要检查WAL
文件是否需要被重放, 即检查WAL
文件中的Record
是否需要被应用到数据库中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| LSM::LSM(std::string path) : engine(std::make_shared<LSMEngine>(path)), tran_manager_(std::make_shared<TranManager>(path)) { tran_manager_->set_engine(engine); auto check_recover_res = tran_manager_->check_recover(); for (auto &[tranc_id, records] : check_recover_res) { tran_manager_->update_max_finished_tranc_id(tranc_id); for (auto &record : records) { if (record.getOperationType() == OperationType::PUT) { engine->put(record.getKey(), record.getValue(), tranc_id); } else if (record.getOperationType() == OperationType::DELETE) { engine->remove(record.getKey(), tranc_id); } } } tran_manager_->init_new_wal(); }
|
数据库初始化时, 构造函数会调用check_recover
判断是否需要进行WAL
重放, 其执行的逻辑如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| std::map<uint64_t, std::vector<Record>> TranManager::check_recover() { std::map<uint64_t, std::vector<Record>> wal_records = WAL::recover(data_dir_, max_flushed_tranc_id_); return wal_records; }
std::map<uint64_t, std::vector<Record>> WAL::recover(const std::string &log_dir, uint64_t max_flushed_tranc_id) { std::map<uint64_t, std::vector<Record>> tranc_records{};
if (!std::filesystem::exists(log_dir)) { return tranc_records; }
std::vector<std::string> wal_paths; for (const auto &entry : std::filesystem::directory_iterator(log_dir)) { if (entry.is_regular_file()) { std::string filename = entry.path().filename().string(); if (filename.substr(0, 4) != "wal.") { continue; }
wal_paths.push_back(entry.path().string()); } }
std::sort(wal_paths.begin(), wal_paths.end(), [](const std::string &a, const std::string &b) { auto a_seq_str = a.substr(a.find_last_of(".") + 1); auto b_seq_str = b.substr(b.find_last_of(".") + 1); return std::stoi(a_seq_str) < std::stoi(b_seq_str); });
for (const auto &wal_path : wal_paths) { auto wal_file = FileObj::open(wal_path, false); auto wal_records_slice = wal_file.read_to_slice(0, wal_file.size()); auto records = Record::decode(wal_records_slice); for (const auto &record : records) { if (record.getTrancId() > max_flushed_tranc_id) { tranc_records[record.getTrancId()].push_back(record); } } }
return tranc_records; }
|
WAL::recover
是核心的WAL
检测函数, 其会将WAL
中不同事务的记录进行整理, 放到一个map
中, map
的每个key
即为事务的id
, value
即为事务中包含的所有Record
。WAL::recover
将key
与当前数据持久化信息中的max_flushed_tranc_id
进行比较, 只保留记录的 tranc_id
大于 max_finished_tranc_id
的记录, 否则其已经持久化到sst
中, 因此不需要再进行恢复。
WAL::recover
检查只保留要重放的记录, 具体的重放操作由LSM::LSM
中完成, 即构造函数中的for
循环部分:
1 2 3 4 5 6 7 8 9 10
| for (auto &[tranc_id, records] : check_recover_res) { tran_manager_->update_max_finished_tranc_id(tranc_id); for (auto &record : records) { if (record.getOperationType() == OperationType::PUT) { engine->put(record.getKey(), record.getValue(), tranc_id); } else if (record.getOperationType() == OperationType::DELETE) { engine->remove(record.getKey(), tranc_id); } } }
|
2.5 WAL清理函数
WAL
中的文件内容, 在数据运用到数据库并刷入了SST
后就没有实际意义了, 这时我们需要清理WAL
文件, 删除多余的WAL
文件, 并且将WAL
文件指针指向最新的WAL
文件。这里我们如下设计:
wal
文件格式为wal.0
, wal.1
, wal.2
, …, wal.n
, 其中n
为文件序号, wal.0
为最旧文件, wal.n
为最新文件
- 单个
wal
文件有容量限制, 超出容量就需要创建新的wal
文件, 并且将wal
文件指针指向最新的wal
文件
- 任意时刻只有1个
wal
文件处于允许写入状态, 其余的wal
文件处于关闭状态
- 清理函数根据目前已经被持久化的事务的
id
, 周期性地从小到大遍历wal
文件, 删除所有记录已经被持久化的wal
文件,
其清理函数为:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| void WAL::cleanWALFile() { std::string dir_path;
std::unique_lock<std::mutex> lock(mutex_); if (active_log_path_.find("/") != std::string::npos) { dir_path = active_log_path_.substr(0, active_log_path_.find_last_of("/")) + "/"; } else { dir_path = "./"; } lock.unlock();
std::vector<std::pair<size_t, std::string>> wal_paths;
for (const auto &entry : std::filesystem::directory_iterator(dir_path)) { if (entry.is_regular_file() && entry.path().filename().string().substr(0, 4) == "wal.") { std::string filename = entry.path().filename().string(); size_t dot_pos = filename.find_last_of("."); std::string seq_str = filename.substr(dot_pos + 1); uint64_t seq = std::stoull(seq_str); wal_paths.push_back({seq, entry.path().string()}); } }
std::sort(wal_paths.begin(), wal_paths.end(), [](const std::pair<size_t, std::string> &a, const std::pair<size_t, std::string> &b) { return a.first < b.first; });
std::vector<FileObj> del_paths; for (int idx = 0; idx < wal_paths.size() - 1; idx++) { auto cur_path = wal_paths[idx].second; auto cur_file = FileObj::open(cur_path, false); size_t offset = 0; bool has_unfinished = false; while (offset + sizeof(uint16_t) < cur_file.size()) { uint16_t record_size = cur_file.read_uint16(offset); uint64_t tranc_id = cur_file.read_uint64(offset + sizeof(uint16_t)); if (tranc_id > max_finished_tranc_id_) { has_unfinished = true; break; } } if (!has_unfinished) { del_paths.push_back(std::move(cur_file)); } }
for (auto &del_file : del_paths) { del_file.del_file(); } }
|
其在构造函数初始化时, 会启动一个函数周期性地运行cleanWALFile
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
WAL::WAL(const std::string &log_dir, size_t buffer_size, uint64_t max_finished_tranc_id, uint64_t clean_interval, uint64_t file_size_limit) : buffer_size_(buffer_size), max_finished_tranc_id_(max_finished_tranc_id), stop_cleaner_(false), clean_interval_(clean_interval), file_size_limit_(file_size_limit) { active_log_path_ = log_dir + "/wal.0"; log_file_ = FileObj::open(active_log_path_, true);
cleaner_thread_ = std::thread(&WAL::cleaner, this); }
void WAL::cleaner() { while (true) { { std::this_thread::sleep_for(std::chrono::seconds(clean_interval_)); if (stop_cleaner_) { break; } cleanWALFile(); } } }
|
3 总结
WAL
是LSM
树中一个重要的组件, 它的作用是保证数据的持久化, 并且WAL
中的数据可以保证数据的一致性。至此,我们的LSm Tree
的大部分基础功能都已经实现完毕, 包括基础的api
, 事务, MVCC
, WAL
和崩溃恢复。下一章, 我们完成SST
的compact
后, LSM Tree
这个项目就接近完结啦