C++从零开始实现LSM-Tree-KV存储-18-WAL与崩溃恢复

之前我们已经完成了数据库内存中和编码文件中事务信息的融入, 现在我们还需要加上WAL与崩溃恢复的内容。

代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP 感谢您的 Star!

欢迎支持本项目同步更新的从零开始实现的视频教程:https://avo6166ew2u.feishu.cn/docx/LXmVdezdsoTBRaxC97WcHwGunOc

欢迎加入讨论群:Toni-LSM项目交流群

1 WAL编码设计

1.1 WAL简介

这里首先简单介绍下WAL(预写式日志) , WAL(预写式日志) 是数据库系统中保障数据一致性和事务可靠性的核心技术,核心思想是“日志先行”——任何数据修改必须先记录到日志中,确保日志持久化后,才允许实际数据写入磁盘。这种机制解决了两个关键问题:一是保证已提交的事务不会因系统崩溃而丢失(持久性),二是确保事务要么完全生效、要么完全回滚(原子性)。

具体来说,当一个事务提交时,数据库会先将事务的修改操作(例如数据修改前后的值、事务状态等)按顺序写入日志文件,并强制将日志刷到磁盘存储。由于日志是顺序写入,相比随机修改数据页的I/O操作,这种设计大幅提升了性能。之后,数据库可以灵活地将内存中的脏页批量刷新到磁盘,减少磁盘操作次数。

如果系统崩溃,重启后可通过日志恢复数据。恢复分为两个阶段:Redo 阶段会重放所有已提交但未落盘的日志,确保事务修改生效;Undo 阶段则回滚未提交事务的部分修改,消除中间状态。为了加速恢复,数据库会定期创建检查点(Checkpoint),将当前内存中的脏页刷盘,并记录日志位置,这样恢复时只需处理检查点之后的日志。

WAL的优势不仅在于数据安全,还在于其高性能和可扩展性。例如,PostgreSQL、MySQL InnoDB等数据库依赖WAL实现事务和崩溃恢复;分布式系统(如Raft算法)也借鉴类似思想,通过日志复制保证一致性。不过,WAL的日志文件可能快速增长,需要定期清理或归档,且频繁刷盘可能带来性能损耗,因此在实际应用中需权衡同步/异步提交等策略。

1.2 WAL编码设计

根据之前的介绍, 我们的WAL文件中需要满足如下功能:

  1. 描述事务的开始和结束
  2. 描述每次事务的操作内容

KV存储引擎这个领域, WAL的设计已经比关系型数据库简单很多了, 因为其操作类型就只有简单的基于键值对的put/get/remove. 与之相反, 关系型数据库的WAL就复杂很多了, 还涉及到物理日志和逻辑日志的区别

在KV存储引擎中,WAL的设计需要满足简洁性和高效性。由于操作类型仅限于putremoveget通常不涉及数据修改,因此无需记录),日志结构可以大幅简化。以下是具体的设计要点:

1.2.1 日志条目设计

每个日志条目需包含以下核心信息:

  1. 事务标识(Transaction ID):唯一标识事务的ID,用于关联多个操作。
  2. 操作类型(Operation Type):如PUTREMOVEGETBEGINCOMMITABORT等。
  3. 键(Key):操作的键值。
  4. 值(Value):对于PUT操作记录具体值;
  5. **校验和(Checksum)(可选)**:用于验证日志条目的完整性(如CRC32)。
  6. 时间戳(可选):记录操作时间,用于多版本控制或冲突解决。

这样,我们可以通过每一个日志条目判断这个操作类型和数据、是哪一个事务进行操作。同时, 由于我们在上一章中将数据库的事务完成状态也进行了持久化, 因此在崩溃恢复时, 我们可以通过检查当前WAL条目的事务id和以刷盘的事务的状态进行对比, 来判断是否需要重放操作。

因此我们不难进行如下的头文件定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#pragma once

#include <cstdint>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

enum class OperationType {
CREATE,
COMMIT,
ROLLBACK,
PUT,
DELETE,
};

class Record {
private:
// 构造函数
Record() = default;

public:
// 操作类型枚举

static Record createRecord(uint64_t tranc_id);
static Record commitRecord(uint64_t tranc_id);
static Record rollbackRecord(uint64_t tranc_id);
static Record putRecord(uint64_t tranc_id, const std::string &key,
const std::string &value);
static Record deleteRecord(uint64_t tranc_id, const std::string &key);

// 编码记录
std::vector<uint8_t> encode() const;

// 解码记录
static std::vector<Record> decode(const std::vector<uint8_t> &data);

// 获取记录的各个部分
uint64_t getTrancId() const { return tranc_id_; }
OperationType getOperationType() const { return operation_type_; }
std::string getKey() const { return key_; }
std::string getValue() const { return value_; }

// 打印记录(用于调试)
void print() const;

bool operator==(const Record &other) const;
bool operator!=(const Record &other) const;

private:
uint64_t tranc_id_;
OperationType operation_type_;
std::string key_;
std::string value_;
uint16_t record_len_;
};

这里的Record表示的就是WAL中的单个日志条目, 操作类型为OperationType, 通过OperationType可以判断其是否有key, value等附加数据信息。同时,我们通过静态成员函数createRecord, putRecord等构造类的实例。最后,encodedecode函数用于将记录转换为字节流和从字节流恢复记录。

1.2.2 日志条目编解码

这里的编解码函数实现和之前block, sst文件的编解码方式非常类似, 每个条目的格式为:

1
| record_len | tranc_id | operation_type | key_len(optional) | key(optional) | value_len(optional) | value(optional) |

这里, 当operation_typeCREATE, ROLLBACK, COMMIT时, 只需要记录tranc_idoperation_type即可, 其余的optional部分不存在, 当operation_typePUT时, 需要记录tranc_id, operation_type, key, value; 当operation_typeDELETE时, 需要记录tranc_id, operation_type, key。’

每个条目的第一部分是record_len, 其记录了整个日志条目的长度(16位)。这里的编解码需要注意一下,encode函数是以单个Record为单位, 将其编码为字节流, 而decode函数是以字节流为单位, 将其解码为Record数组。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
std::vector<uint8_t> Record::encode() const {
std::vector<uint8_t> record;

size_t key_offset = sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint8_t);
// 记录长度本身(16) + 事务id(64) +
// 操作类型(8), 固有的编码部分

record.resize(record_len_, 0);

// 编码 record_len
std::memcpy(record.data(), &record_len_, sizeof(uint16_t));

// 编码 tranc_id
std::memcpy(record.data() + sizeof(uint16_t), &tranc_id_, sizeof(uint64_t));

// 编码 operation_type
auto type_byte = static_cast<uint8_t>(operation_type_);
std::memcpy(record.data() + sizeof(uint16_t) + sizeof(uint64_t), &type_byte,
sizeof(uint8_t));

if (this->operation_type_ == OperationType::PUT) {
uint16_t key_len = key_.size();
std::memcpy(record.data() + key_offset, &key_len, sizeof(uint16_t));
std::memcpy(record.data() + key_offset + sizeof(uint16_t), key_.data(),
key_.size());

size_t value_offset = key_offset + sizeof(uint16_t) + key_.size();

uint16_t value_len = value_.size();
std::memcpy(record.data() + value_offset, &value_len, sizeof(uint16_t));
std::memcpy(record.data() + value_offset + sizeof(uint16_t), value_.data(),
value_.size());
} else if (this->operation_type_ == OperationType::DELETE) {
uint16_t key_len = key_.size();
std::memcpy(record.data() + key_offset, &key_len, sizeof(uint16_t));
std::memcpy(record.data() + key_offset + sizeof(uint16_t), key_.data(),
key_.size());
}

return record;
}

std::vector<Record> Record::decode(const std::vector<uint8_t> &data) {
if (data.size() < sizeof(uint16_t) + sizeof(uint64_t) + sizeof(uint8_t)) {
return {};
}

std::vector<Record> records;
size_t pos = 0;

while (pos < data.size()) {
// 读取 record_len
uint16_t record_len;
std::memcpy(&record_len, data.data() + pos, sizeof(uint16_t));
pos += sizeof(uint16_t);

// 检查数据长度是否足够
if (data.size() < record_len) {
throw std::runtime_error("Data length does not match record length");
}

// 读取 tranc_id
uint64_t tranc_id;
std::memcpy(&tranc_id, data.data() + pos, sizeof(uint64_t));
pos += sizeof(uint64_t);

// 读取 operation_type
uint8_t op_type = data[pos++];
OperationType operation_type = static_cast<OperationType>(op_type);

Record record;
record.tranc_id_ = tranc_id;
record.operation_type_ = operation_type;
record.record_len_ = record_len;

if (operation_type == OperationType::PUT) {
// 读取 key_len
uint16_t key_len;
std::memcpy(&key_len, data.data() + pos, sizeof(uint16_t));
pos += sizeof(uint16_t);

// 读取 key
record.key_ = std::string(
reinterpret_cast<const char *>(data.data() + pos), key_len);
pos += key_len;

// 读取 value_len
uint16_t value_len;
std::memcpy(&value_len, data.data() + pos, sizeof(uint16_t));
pos += sizeof(uint16_t);

// 读取 value
record.value_ = std::string(
reinterpret_cast<const char *>(data.data() + pos), value_len);
pos += value_len;
} else if (operation_type == OperationType::DELETE) {
// 读取 key_len
uint16_t key_len;
std::memcpy(&key_len, data.data() + pos, sizeof(uint16_t));
pos += sizeof(uint16_t);

// 读取 key
record.key_ = std::string(
reinterpret_cast<const char *>(data.data() + pos), key_len);
pos += key_len;
}

records.push_back(record);
}
return records;
}

2 崩溃恢复机制

2.1 崩溃恢复运行机制

当某一时刻存储引擎发送崩溃时, 需要进行崩溃恢复,以恢复数据状态。崩溃恢复的关键是回放日志,以确定已提交的事务,并执行其操作。在这个描述中我们不难得到一个信息, 即成功执行的事务一定要先将操作记录持久化到日志中, 然后再在内存中进行操作, 最后返回成功信息给客户端或者调用者。这也是为什么这个机制称为预写式日志。其崩溃恢复的工作流程包括:

  1. 日志回放:从最后一个检查点开始扫描日志,按顺序处理所有已提交事务(COMMIT标记后的操作)。
  2. Redo阶段:重新执行所有已提交事务的PUT/REMOVE操作,覆盖当前数据状态。
  3. Undo阶段(可选):若事务未提交(无COMMIT标记),则直接丢弃其操作记录。

示例场景
假设事务TX100依次执行PUT key1=value1REMOVE key2,其日志内容如下:

1
2
3
4
BEGIN TX100
PUT key1 5 value1
DELETE key2
COMMIT TX100

事务TX100在调用commit函数后, 需要将上述日志刷入wal文件完成预写这一步骤后, 才会返回client事务提交成功。一开始提交的事务,其数据一定是只存在于MemTable中的, 此时如果存储引擎崩溃, 刚刚完成的事务是没法刷入到sst文件的, 但我们重启时可以检查wal文件的内容, 将其与数据库持久化的状态进行比对(持久化的状态包括最大已完成的事务id、最大已经刷盘的事务id, 见上一章的内容),如果其事务id比目前已经持久化到sst的最大事务id大,则说明该事务需要进行重放, 另一方面, 如果事务记录的最后一个条目是Rollback,则说明该事务需要被回滚, 则不需要在崩溃恢复时进行重放.

将上述流程总结如下:

情形1: 正常提交事务、SST刷盘正常

  1. 事务开始, 写入BEGIN标记到WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
  2. 执行若干PUT/DELETE操作, 将操作记录写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    1. 如果隔离级别是Read Uncommitted, 可以将PUT/DELETE操作直接应用到数据库
    2. 其他隔离级别则将PUT/DELETE操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
  3. 提交事务:
    1. COMMIT标记写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    2. WAL日志刷入磁盘。(此时WAL日志已经刷入磁盘)
    3. 如果隔离级别不是Read Committed, 则将暂存的PUT/DELETE操作应用到数据库
    4. 返回给client成功或失败
  4. 之前事务的PUT/DELETE操作的变化应用到数据库仍然是位于MemTable中的, 其会稍后输入SST

情形2: 正常提交事务、SST刷盘崩溃

  1. 事务开始, 写入BEGIN标记到WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
  2. 执行若干PUT/DELETE操作, 将操作记录写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    1. 如果隔离级别是Read Uncommitted, 可以将PUT/DELETE操作直接应用到数据库
    2. 其他隔离级别则将PUT/DELETE操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
  3. 提交事务:
    1. COMMIT标记写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    2. WAL日志刷入磁盘。(此时WAL日志已经刷入磁盘)
    3. 如果隔离级别不是Read Committed, 则将暂存的PUT/DELETE操作应用到数据库
    4. 返回给client成功或失败
  4. 之前事务的PUT/DELETE操作的变化应用到数据库仍然是位于MemTable中的, 其稍后输入SST奔溃
  5. 数据库重启后执行崩溃回复
    1. 检查WAL文件的记录
    2. 整合事务id每条记录, 忽略以Rollback结尾的事务
    3. 若事务以Commit结尾, 则将事务id与已经刷盘的SST中的最大事务id进行比对
      1. 若事务id大于SST的最大事务id, 执行重放操作
      2. 若事务id小于SST的最大事务id, 则忽略该事务, 因为其已经被数据库执行过了

情形3: 事务回滚

  1. 事务开始, 写入BEGIN标记到WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
  2. 执行若干PUT/DELETE操作, 将操作记录写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    1. 如果隔离级别是Read Uncommitted, 可以将PUT/DELETE操作直接应用到数据库
    2. 其他隔离级别则将PUT/DELETE操作暂存到事务管理的上下文内存中, 等待事务提交时再应用到数据库
  3. 回滚事务:
    1. Rollback标记写入WAL日志中。(此时的WAL日志可能存在于缓冲区, 没有刷入文件)
    2. WAL日志刷入磁盘。(此时WAL日志已经刷入磁盘)
    3. 如果隔离级别不是Read Committed, 则将暂存的PUT/DELETE操作简单丢弃即可
    4. 如果隔离级别是Read Committed, 则将操作前的数据库状态进行还原(上一章介绍的rollback_map_)
    5. 返回给client成功或失败

2.2 考虑WAL后的事务操作

上一章中, 我们的事务设计没有考虑到或忽略了WAL的操作, 这里我们将对WAL的操作进行实现和讲解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
void TranContext::put(const std::string &key, const std::string &value) {
auto isolation_level = tranManager_->isolation_level();

// 所有隔离级别都需要先写入 operations 中
operations.emplace_back(Record::putRecord(this->tranc_id_, key, value));

if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// 1 如果隔离级别是 READ_UNCOMMITTED, 直接写入 memtable
// 先查询以前的记录, 因为回滚时可能需要
auto prev_record = engine_->get(key, 0);
rollback_map_[key] = prev_record;
engine_->put(key, value, tranc_id_);
return;
}

// 2 其他隔离级别需要 暂存到 temp_map_ 中, 统一提交后才在数据库中生效
temp_map_[key] = value;
}

void TranContext::remove(const std::string &key) {
auto isolation_level = tranManager_->isolation_level();

// 所有隔离级别都需要先写入 operations 中
operations.emplace_back(Record::deleteRecord(this->tranc_id_, key));

// ... 具体的内存操作
}

bool TranContext::commit(bool test_fail) {
auto isolation_level = tranManager_->isolation_level();

if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// READ_UNCOMMITTED 随单次操作更新数据库, 不需要最后的统一更新
// 因此也不需要使用到后面的锁保证正确性
operations.emplace_back(Record::commitRecord(this->tranc_id_));

// 先刷入wal
auto wal_success = tranManager_->write_to_wal(operations);

if (!wal_success) {
throw std::runtime_error("write to wal failed");
}
isCommited = true;
tranManager_->update_max_finished_tranc_id(tranc_id_);
return true;
}

// ... 冲突检查

// 其他隔离级别不检查, 直接运行到这里

// 校验全部通过, 可以刷入
operations.emplace_back(Record::commitRecord(this->tranc_id_));

// 先刷入wal
auto wal_success = tranManager_->write_to_wal(operations);

if (!wal_success) {
throw std::runtime_error("write to wal failed");
}

// 将暂存数据应用到数据库
if (!test_fail) {
// 这里是手动调用 memtable 的无锁版本的 put_, 因为之前手动加了写锁
for (auto &[k, v] : temp_map_) {
memtable.put_(k, v, tranc_id_);
}
}

// ... 持久化事务状态
}

bool TranContext::abort() {
auto isolation_level = tranManager_->isolation_level();
if (isolation_level == IsolationLevel::READ_UNCOMMITTED) {
// 需要手动恢复之前的更改
// TODO: 需要使用批量化操作优化性能
for (auto &[k, res] : rollback_map_) {
if (res.has_value()) {
engine_->put(k, res.value().first, res.value().second);
} else {
// 之前本就不存在, 需要移除当前事务的新增操作
engine_->remove(k, tranc_id_);
}
}
isAborted = true;

return true;
}
// ! 由于目前 records 是在 commit 时统一刷入 wal 的, 因此这个情况下, abort
// ! 简单丢弃 operations 即可 ! 后续会将 records 分批次写入 wal
// ! 这时就需要加上 ! rollback 标记了, 执行下面注释的逻辑

// operations.emplace_back(Record::rollbackRecord(this->tranc_id_));
// // 先刷入wal
// auto wal_success = tranManager_->write_to_wal(operations);

// if (!wal_success) {
// throw std::runtime_error("write to wal failed");
// }

isAborted = true;

return true;
}

需要注意的是, 目前的实现中, 所有的操作记录是随put/remove添加到TranContext的成员变量operations中, 然后在commit时统一写入WAL中, 因此长事务在commit时的IO耗时较大, 后续应该改为分批次进行WAL刷入, 这里我留了个issues: https://github.com/ToniXWD/toni-lsm/issues/6 ,感兴趣的朋友可以自己实现一下

2.3 WAL刷盘函数

commit函数中的write_to_wal函数实现了WAL文件的刷盘操作, 其具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// src/lsm/transation.cpp
bool TranManager::write_to_wal(const std::vector<Record> &records) {
try {
wal->log(records, true);
} catch (const std::exception &e) {
return false;
}

return true;
}

// src/wal/wal.cpp
void WAL::log(const std::vector<Record> &records, bool force_flush) {
std::unique_lock<std::mutex> lock(mutex_);

// 将 records 的所有记录添加到 log_buffer_
for (const auto &record : records) {
log_buffer_.push_back(record);
}

if (log_buffer_.size() < buffer_size_ && !force_flush) {
// 如果 log_buffer_ 的大小小于 buffer_size_ 且 force_flush 为 false,
// 不进行写入
return;
}

// 否则写入 wal 文件
auto pre_buffer = std::move(log_buffer_);
for (const auto &record : pre_buffer) {
std::vector<uint8_t> encoded_record = record.encode();
log_file_.append(encoded_record);
}
if (!log_file_.sync()) {
// 确保日志立即写入磁盘
throw std::runtime_error("Failed to sync WAL file");
}

auto cur_file_size = log_file_.size();
if (cur_file_size > file_size_limit_) {
reset_file();
}
}

这里的核心函数是WAL::log, 其会将一个数组中的内存刷入到内存缓存数组log_buffer_, 当其数组大小超过指定阈值时, 就会进行刷盘操作, 同时会重置log_buffer_。同时,类似commit, abort这样的事务操作需要立即刷盘以完成持久化, 这是可以手动指定force_flushtrue来强行刷入。这里的log_file_就是我们使用之前章节实现的FileObj包装的wal文件操作句柄。

2.4 WAL重放检查函数

这是这章最核心的部分, 数据库启动时, 我们需要检查WAL文件是否需要被重放, 即检查WAL文件中的Record是否需要被应用到数据库中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// src/lsm/engine.cpp
LSM::LSM(std::string path)
: engine(std::make_shared<LSMEngine>(path)),
tran_manager_(std::make_shared<TranManager>(path)) {
tran_manager_->set_engine(engine);
auto check_recover_res = tran_manager_->check_recover();
for (auto &[tranc_id, records] : check_recover_res) {
tran_manager_->update_max_finished_tranc_id(tranc_id);
for (auto &record : records) {
if (record.getOperationType() == OperationType::PUT) {
engine->put(record.getKey(), record.getValue(), tranc_id);
} else if (record.getOperationType() == OperationType::DELETE) {
engine->remove(record.getKey(), tranc_id);
}
}
}
tran_manager_->init_new_wal();
}

数据库初始化时, 构造函数会调用check_recover判断是否需要进行WAL重放, 其执行的逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// src/lsm/transation.cpp
std::map<uint64_t, std::vector<Record>> TranManager::check_recover() {
std::map<uint64_t, std::vector<Record>> wal_records =
WAL::recover(data_dir_, max_flushed_tranc_id_);
return wal_records;
}

// src/wal/wal.cpp
std::map<uint64_t, std::vector<Record>>
WAL::recover(const std::string &log_dir, uint64_t max_flushed_tranc_id) {
std::map<uint64_t, std::vector<Record>> tranc_records{};

// 引擎启动时判断
if (!std::filesystem::exists(log_dir)) {
return tranc_records;
}

// 遍历log_dir下的所有文件
std::vector<std::string> wal_paths;
for (const auto &entry : std::filesystem::directory_iterator(log_dir)) {
if (entry.is_regular_file()) {
// 获取/符号后的文件名
std::string filename = entry.path().filename().string();
if (filename.substr(0, 4) != "wal.") {
continue;
}

wal_paths.push_back(entry.path().string());
}
}

// 按照seq升序排序
std::sort(wal_paths.begin(), wal_paths.end(),
[](const std::string &a, const std::string &b) {
auto a_seq_str = a.substr(a.find_last_of(".") + 1);
auto b_seq_str = b.substr(b.find_last_of(".") + 1);
return std::stoi(a_seq_str) < std::stoi(b_seq_str);
});

// 读取所有的记录
for (const auto &wal_path : wal_paths) {
auto wal_file = FileObj::open(wal_path, false);
auto wal_records_slice = wal_file.read_to_slice(0, wal_file.size());
auto records = Record::decode(wal_records_slice);
for (const auto &record : records) {
if (record.getTrancId() > max_flushed_tranc_id) {
// 如果记录的 tranc_id 大于 max_finished_tranc_id, 才需要尝试恢复
tranc_records[record.getTrancId()].push_back(record);
}
}
}

return tranc_records;
}

WAL::recover是核心的WAL检测函数, 其会将WAL中不同事务的记录进行整理, 放到一个map中, map的每个key即为事务的id, value即为事务中包含的所有RecordWAL::recoverkey与当前数据持久化信息中的max_flushed_tranc_id进行比较, 只保留记录的 tranc_id 大于 max_finished_tranc_id的记录, 否则其已经持久化到sst中, 因此不需要再进行恢复。

WAL::recover检查只保留要重放的记录, 具体的重放操作由LSM::LSM中完成, 即构造函数中的for循环部分:

1
2
3
4
5
6
7
8
9
10
for (auto &[tranc_id, records] : check_recover_res) {
tran_manager_->update_max_finished_tranc_id(tranc_id);
for (auto &record : records) {
if (record.getOperationType() == OperationType::PUT) {
engine->put(record.getKey(), record.getValue(), tranc_id);
} else if (record.getOperationType() == OperationType::DELETE) {
engine->remove(record.getKey(), tranc_id);
}
}
}

2.5 WAL清理函数

WAL中的文件内容, 在数据运用到数据库并刷入了SST后就没有实际意义了, 这时我们需要清理WAL文件, 删除多余的WAL文件, 并且将WAL文件指针指向最新的WAL文件。这里我们如下设计:

  1. wal文件格式为wal.0, wal.1, wal.2, …, wal.n, 其中n为文件序号, wal.0为最旧文件, wal.n为最新文件
  2. 单个wal文件有容量限制, 超出容量就需要创建新的wal文件, 并且将wal文件指针指向最新的wal文件
  3. 任意时刻只有1个wal文件处于允许写入状态, 其余的wal文件处于关闭状态
  4. 清理函数根据目前已经被持久化的事务的id, 周期性地从小到大遍历wal文件, 删除所有记录已经被持久化的wal文件,

其清理函数为:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// src/wal/wal.cpp
void WAL::cleanWALFile() {
// 遍历log_file_所在的文件夹
std::string dir_path;

std::unique_lock<std::mutex> lock(mutex_); // 只在获取当前文件路径时获取锁
if (active_log_path_.find("/") != std::string::npos) {
dir_path =
active_log_path_.substr(0, active_log_path_.find_last_of("/")) + "/";
} else {
dir_path = "./";
}
lock.unlock();

// wal文件格式为:
// wal.seq

std::vector<std::pair<size_t, std::string>> wal_paths;

for (const auto &entry : std::filesystem::directory_iterator(dir_path)) {
if (entry.is_regular_file() &&
entry.path().filename().string().substr(0, 4) == "wal.") {
std::string filename = entry.path().filename().string();
size_t dot_pos = filename.find_last_of(".");
std::string seq_str = filename.substr(dot_pos + 1);
uint64_t seq = std::stoull(seq_str);
wal_paths.push_back({seq, entry.path().string()});
}
}

// 按照seq升序排序
std::sort(wal_paths.begin(), wal_paths.end(),
[](const std::pair<size_t, std::string> &a,
const std::pair<size_t, std::string> &b) {
return a.first < b.first;
});

// 判断是否可以删除
std::vector<FileObj> del_paths;
for (int idx = 0; idx < wal_paths.size() - 1; idx++) {
auto cur_path = wal_paths[idx].second;
auto cur_file = FileObj::open(cur_path, false);
// 遍历文件记录, 读取所有的tranc_id,
// 判断是否都小于等于max_finished_tranc_id_
size_t offset = 0;
bool has_unfinished = false;
while (offset + sizeof(uint16_t) < cur_file.size()) {
uint16_t record_size = cur_file.read_uint16(offset);
uint64_t tranc_id = cur_file.read_uint64(offset + sizeof(uint16_t));
if (tranc_id > max_finished_tranc_id_) {
has_unfinished = true;
break;
}
}
if (!has_unfinished) {
del_paths.push_back(std::move(cur_file));
}
}

for (auto &del_file : del_paths) {
del_file.del_file();
}
}

其在构造函数初始化时, 会启动一个函数周期性地运行cleanWALFile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// src/wal/wal.cpp
// 从零开始的初始化流程
WAL::WAL(const std::string &log_dir, size_t buffer_size,
uint64_t max_finished_tranc_id, uint64_t clean_interval,
uint64_t file_size_limit)
: buffer_size_(buffer_size), max_finished_tranc_id_(max_finished_tranc_id),
stop_cleaner_(false), clean_interval_(clean_interval),
file_size_limit_(file_size_limit) {
active_log_path_ = log_dir + "/wal.0";
log_file_ = FileObj::open(active_log_path_, true);

cleaner_thread_ = std::thread(&WAL::cleaner, this);
}

void WAL::cleaner() {
while (true) {
{
// 睡眠 clean_interval_ s
std::this_thread::sleep_for(std::chrono::seconds(clean_interval_));
if (stop_cleaner_) {
break;
}
cleanWALFile();
}
}
}

3 总结

WALLSM树中一个重要的组件, 它的作用是保证数据的持久化, 并且WAL中的数据可以保证数据的一致性。至此,我们的LSm Tree的大部分基础功能都已经实现完毕, 包括基础的api, 事务, MVCC, WAL和崩溃恢复。下一章, 我们完成SSTcompact后, LSM Tree这个项目就接近完结啦