C++从零开始实现LSM-Tree-KV存储-06-Block缓存池

上一小节完成了文件IO的封装类, 这一小节为Block设置一个缓存池

代码仓库:https://github.com/ToniXWD/toni-lsm

1 缓存池原理与设计

1.1 缓存池的作用

在LSM Tree的实现中,数据读取是以Block为单位进行的。为了提高热点Block的读取效率,我们引入了缓存池(Cache Pool)。缓存池的主要作用主要是减少磁盘I/O, 通过将频繁访问的Block缓存到内存中,可以显著减少对磁盘的读取次数,从而提高系统的整体性能。因此这里我们的目的就是尽量让热点Block数据常驻内存。

1.2 LRU-K算法原理

这里我采用LRU-K算法来管理缓存池。其实主要原因是以前做过CMN15445, 里面实现过LRU-K算法, 稍作改进可以直接拿来用, 这里就偷个懒了。照例还是先简单介绍下LRU-K

LRU-K(Least Recently Used with K accesses)是一种基于访问频率和时间的缓存淘汰策略。相比于传统的LRU(最近最少使用),LRU-K考虑了更复杂的访问模式,能够更好地适应实际应用场景中的数据访问特性。LRU-K仍然是使用键值对来索引缓存内容。

设计思路

  • Key:表示在决定淘汰某个缓存项时,会考虑该缓存项在过去K次访问的时间间隔。当K=1时,LRU-K退化为普通的LRU。这里我的的Blcok可以由(sst_id, block_id)唯一索引, 因此缓存池的的Block索引键可以设计为std::pair<sst_id, block_id>
  • 访问链表:每个缓存项都有一个访问历史记录,记录其最近K次的访问时间。当最近访问次数大于等于K次, 则放在链表首部, 不足K次的放在后半部分, 按照访问时间排序。

淘汰规则

  • 当缓存池满时,选择那些在过去K次访问中时间间隔最长的缓存项进行淘汰。
  • 如果多个缓存项的时间间隔相同,则选择最早插入缓存池的那个项。

2 代码实现

2.1 设计思路

这里采用一个简化的LRU-K实现, 我们不需要记录访问的时间戳, 而是用链表的顺序来表达最新的访问记录, 最新的访问置于链表头部。另一方面,为保证快速查询,我们还需要一个哈希表,这个哈希表的索引键为std::pair<sst_id, block_id>, 索引值为链表的迭代器, 因此结合二者可以简单实现查询和新增均为O(1)的缓存池。那怎么表达LRU-K呢? 很简单, 采用2个链表, 一个存储访问次数少于k次的缓存项, 一个存储访问次数大于等于k次的缓存项。

首先看一下头文件的关键定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct CacheItem {
int sst_id;
int block_id;
std::shared_ptr<Block> cache_block;
uint64_t access_count; // 访问时间戳
};
// 定义缓存池
class BlockCache {
public:
...

private:
size_t capacity_; // 缓存容量
size_t k_; // LRU-K 中的 K 值

// 双向链表存储缓存项
std::list<CacheItem> cache_list_greater_k;
std::list<CacheItem> cache_list_less_k;

// 哈希表索引缓存项
std::unordered_map<std::pair<int, int>, std::list<CacheItem>::iterator,
pair_hash, pair_equal>
cache_map_;

...
};

我们结合cache_list_greater_k, cache_list_less_kcache_map_来整理一下流程:

查询

  1. 先通过cache_map_查询缓存项的迭代器, 通过迭代器的访问次数判断其属于哪个链表。
    1. 访问次数 < k -> 属于 cache_list_less_k
      1. 如果访问次数 == k-1,更新访问次数后就就等于k了,将其重新置于cache_list_greater_k的头部。
      2. 否则更新后仍然属于cache_list_greater_k,但需要将其移动到cache_list_greater_k的头部。
    2. 访问次数 = k -> 属于 cache_list_greater_k, 将其置于cache_list_greater_k的头部。但访问次数就固定在k, 不继续增长了

插入

  1. 先通过cache_map_查询缓存项是否存在, 存在则按照之前的步骤更新缓存项即可
  2. 不存在的话, 判断缓存池是否已满
    1. 没有满
      1. 直接插入到cache_list_less_k头部,并更新cache_map_索引。
    2. 满了
      1. 如果cache_list_less_k不为空,从cache_list_less_k末尾淘汰一个缓存项,并插入新的缓存项到cache_list_less_k头部。
      2. 如果cache_list_less_k为空,从cache_list_greater_k末尾淘汰一个缓存项,并插入新的缓存项到cache_list_less_k

2.2 具体实现

接下来就是具体代码了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
#include "../../include/block/block_cache.h"
#include "../../include/block/block.h"
#include <chrono>
#include <list>
#include <memory>
#include <mutex>
#include <unordered_map>

BlockCache::BlockCache(size_t capacity, size_t k)
: capacity_(capacity), k_(k) {}

BlockCache::~BlockCache() = default;

std::shared_ptr<Block> BlockCache::get(int sst_id, int block_id) {
std::lock_guard<std::mutex> lock(mutex_);
++total_requests_; // 增加总请求数
auto key = std::make_pair(sst_id, block_id);
auto it = cache_map_.find(key);
if (it == cache_map_.end()) {
return nullptr; // 缓存未命中
}

++hit_requests_; // 增加命中请求数
// 更新访问次数
update_access_count(it->second);

return it->second->cache_block;
}

void BlockCache::put(int sst_id, int block_id, std::shared_ptr<Block> block) {
std::lock_guard<std::mutex> lock(mutex_);
auto key = std::make_pair(sst_id, block_id);
auto it = cache_map_.find(key);

if (it != cache_map_.end()) {
// 更新已有缓存项
// ! 照理说 Block 类的数据是不可变的,这里的更新分支应该不会存在,
// 只是debug用
it->second->cache_block = block;
update_access_count(it->second);
} else {
// 插入新缓存项
if (cache_map_.size() >= capacity_) {
// 移除最久未使用的缓存项
if (!cache_list_less_k.empty()) {
// 优先从 cache_list_less_k 中移除
cache_map_.erase(std::make_pair(cache_list_less_k.back().sst_id,
cache_list_less_k.back().block_id));
cache_list_less_k.pop_back();
} else {
cache_map_.erase(std::make_pair(cache_list_greater_k.back().sst_id,
cache_list_greater_k.back().block_id));
cache_list_greater_k.pop_back();
}
}

CacheItem item = {sst_id, block_id, block, 1};
cache_list_less_k.push_front(item);
cache_map_[key] = cache_list_less_k.begin();
}
}

double BlockCache::hit_rate() const {
std::lock_guard<std::mutex> lock(mutex_);
return total_requests_ == 0
? 0.0
: static_cast<double>(hit_requests_) / total_requests_;
}

size_t BlockCache::current_time() const {
return std::chrono::steady_clock::now().time_since_epoch().count();
}

void BlockCache::update_access_count(std::list<CacheItem>::iterator it) {
++it->access_count;
if (it->access_count < k_) {
// 更新后仍然位于cache_list_less_k
// 重新置于cache_list_less_k头部
cache_list_less_k.splice(cache_list_less_k.begin(), cache_list_less_k, it);
} else if (it->access_count == k_) {
// 更新后满足k次访问, 升级链表
// 从 cache_list_less_k 移动到 cache_list_greater_k 头部
auto item = *it;
cache_list_less_k.erase(it);
cache_list_greater_k.push_front(item);
cache_map_[std::make_pair(item.sst_id, item.block_id)] =
cache_list_greater_k.begin();
} else if (it->access_count > k_) {
// 本来就位于 cache_list_greater_k
// 移动到 cache_list_greater_k 头部
cache_list_greater_k.splice(cache_list_greater_k.begin(),
cache_list_greater_k, it);
}
}

3 更新block读取逻辑

有了缓存池后我们可以更新block的读取逻辑,即在读取block时先从缓存池中查找,如果命中则直接返回,否则从磁盘中读取:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 这里的 block_cache 就是我们实现的缓存池的指针
std::shared_ptr<Block> SST::read_block(size_t block_idx) {
if (block_idx >= meta_entries.size()) {
throw std::out_of_range("Block index out of range");
}

// 先从缓存中查找
if (block_cache != nullptr) {
auto cache_ptr = block_cache->get(this->sst_id, block_idx);
if (cache_ptr != nullptr) {
return cache_ptr;
}
} else {
throw std::runtime_error("Block cache not set");
}

const auto &meta = meta_entries[block_idx];
size_t block_size;

// 计算block大小
if (block_idx == meta_entries.size() - 1) {
block_size = meta_block_offset - meta.offset;
} else {
block_size = meta_entries[block_idx + 1].offset - meta.offset;
}

// 读取block数据
auto block_data = file.read_to_slice(meta.offset, block_size);
auto block_res = Block::decode(block_data, true);

// 更新缓存
if (block_cache != nullptr) {
block_cache->put(this->sst_id, block_idx, block_res);
} else {
throw std::runtime_error("Block cache not set");
}
return block_res;
}

其他地方还需要对缓存池进行初始化等操作, 但都是琐碎但简单的操作, 这里不陈述了, 建议直接看我的Github 源码

4 性能测试

照理说, block原来从硬盘读取很费时间的, 这里我们实现缓存池后, 性能有质的飞跃, 那不妨找一个测试来试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
TEST_F(LSMTest, Persistence) {
std::unordered_map<std::string, std::string> kvs;
int num = 100000;
{
LSM lsm(test_dir);
for (int i = 0; i < num; ++i) {
std::string key = "key" + std::to_string(i);
std::string value = "value" + std::to_string(i);
lsm.put(key, value);
kvs[key] = value;

// 删除之前被10整除的key
if (i % 10 == 0 && i != 0) {
std::string del_key = "key" + std::to_string(i - 10);
lsm.remove(del_key);
kvs.erase(del_key);
}
}
} // LSM destructor called here

// Create new LSM instance
LSM lsm(test_dir);
for (int i = 0; i < num; ++i) {
std::string key = "key" + std::to_string(i);
if (kvs.find(key) != kvs.end()) {
EXPECT_EQ(lsm.get(key).value(), kvs[key]);
} else {
EXPECT_FALSE(lsm.get(key).has_value());
}
}
}

这个测试量够大, 也包含从文件读取block这一逻辑, 我们看看其测试时间:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ time xmake run test_lsm
[==========] Running 5 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 5 tests from LSMTest
[ RUN ] LSMTest.BasicOperations
[ OK ] LSMTest.BasicOperations (0 ms)
[ RUN ] LSMTest.Persistence
[ OK ] LSMTest.Persistence (533 ms)
[ RUN ] LSMTest.LargeScaleOperations
[ OK ] LSMTest.LargeScaleOperations (4 ms)
[ RUN ] LSMTest.IteratorOperations
[ OK ] LSMTest.IteratorOperations (0 ms)
[ RUN ] LSMTest.MixedOperations
[ OK ] LSMTest.MixedOperations (0 ms)
[----------] 5 tests from LSMTest (539 ms total)

[----------] Global test environment tear-down
[==========] 5 tests from 1 test suite ran. (539 ms total)
[ PASSED ] 5 tests.
xmake run test_lsm 0.61s user 0.01s system 98% cpu 0.629 total

我们checkout到之前的commit比较一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
$ time xmake run test_lsm
[==========] Running 5 tests from 1 test suite.
[----------] Global test environment set-up.
[----------] 5 tests from LSMTest
[ RUN ] LSMTest.BasicOperations
[ OK ] LSMTest.BasicOperations (0 ms)
[ RUN ] LSMTest.Persistence
[ OK ] LSMTest.Persistence (1051 ms)
[ RUN ] LSMTest.LargeScaleOperations
[ OK ] LSMTest.LargeScaleOperations (4 ms)
[ RUN ] LSMTest.IteratorOperations
[ OK ] LSMTest.IteratorOperations (0 ms)
[ RUN ] LSMTest.MixedOperations
[ OK ] LSMTest.MixedOperations (0 ms)
[----------] 5 tests from LSMTest (1057 ms total)

[----------] Global test environment tear-down
[==========] 5 tests from 1 test suite ran. (1057 ms total)
[ PASSED ] 5 tests.
xmake run test_lsm 1.02s user 0.12s system 98% cpu 1.160 total

可以看到速度提升差不多2倍, 从原来的1051 ms加速到533 ms, 并且其中还有部分时间是sst的编解码, 除去后性能提升会更明显。