这节课针对之前实现的LSM-Tree
设计布隆过滤器来减少无效的磁盘IO
代码仓库:ToniXWD/toni-lsm: A KV storage engine based on LSM Tree, supporting Redis RESP
欢迎点个Star
1 布隆过滤器简介
1.1 原理介绍
布隆过滤器是一种 probabilistic
数据结构,用于判断一个元素是否在集合中。它通过一个概率的方式来判断,即使集合中存在一个元素,但是布隆过滤器也依然可以判断出来。
布隆过滤器的核心思想是使用一个位数组和多个哈希函数来表示一个集合。它的特点是可以高效地判断某个元素是否“可能存在”或“确定不存在”,但它有一定的误判率,即可能会误判某个不存在的元素为“可能存在”。其实现方式如下:
- 初始化位数组:布隆过滤器使用一个固定大小的位数组(如长度为
m
),所有位初始化为 0
。
- 插入元素时在位数组记录:当插入一个元素时,使用
k
个独立的哈希函数对该元素进行哈希运算,得到 k
个哈希值(每个值对应位数组的索引)。将这些索引位置的位设置为 1
。
- 查询元素时在位数组校验:当查询一个元素是否存在时,同样使用
k
个哈希函数对该元素进行哈希运算,得到 k
个哈希值。如果这些哈希值对应的位数组中的所有位都为 1
,则判断该元素“可能存在”;如果有任意一位为 0
,则判断该元素“确定不存在”。
特点
- 优点:布隆过滤器的空间效率和查询效率非常高,适合处理大规模数据。
- 缺点:存在一定的误判率(即可能会误判不存在的元素为“可能存在”),并且无法删除元素(删除可能会影响其他元素的判断)。
1.2 案例介绍
下面我们通过一个案例来看看布隆过滤器的运行机制:

案例图片来源自 https://blog.csdn.net/Zyw907155124/article/details/135556557, 侵删
- 在新增一个记录时(
put/insert
等), 通过预定义数量(案例图片中是2个)的哈希函数对key
进行哈希, 将哈希值对位数组的长度取模, 将结果作为索引, 将索引位置的位设置为1
- 在查询时(
get
), 通过预定义数量(案例图片中是2个)的哈希函数对key
进行哈希, 将哈希值对位数组的长度取模, 将结果作为索引, 如果索引位置的位为1, 则认为该key
可能存在, 否则认为该key
不存在
2 LSM-Tree中的布隆过滤器
尽管我们之前为整个LSM-Tree
设计了bock cache
, 但面对大量不存在于LSM-Tree
中的key
时, bock cache
会频繁的进行磁盘IO, 这将导致LSM-Tree
的性能降低。
这里的原理类似后端八股文中Redis
常问的缓存击穿、缓存穿透、缓存雪崩,而设置布隆过滤器就是一个景点的解决方案,背过八股的同学们应该会很熟悉。
因此,我们可以在编码SST
文件时,为每个SST
文件都设计一个bloom filter
, 在其中记录整个SST
中的所有key
, 并将其持久化到文件系统中, 在读取文件时, 相应的布隆过滤器部分也需要被解码并加载到内存中。这样当查询时, 我们只需要判断key
是否存在于bloom filter
中, 如果不存在, 则认为该key
不存在于LSM-Tree
, 否则, 我们需要继续在LSM-Tree
中查找。这样以来, 我们就可以避免大量的无效访问。
3 布隆过滤器代码实现
3.1 设计思路
现在我们就可以为我们的LSM-Tree
设计一个布隆过滤器了,我们需要完成如下的功能:
- 能够在初始化时通过假阳性率和键的数量来确定哈希函数的数量和位数组的长度
- 支持编码与解码
对于通过假阳性率和键的数量来确定哈希函数的数量和位数组的长度
, 我们可以按照指定的公式求取, 但问题在于我们是不知道哈希函数的数量的, 难道我们要手动写很多个哈希函数吗?
我们可以采用这样的方案: 只写2个哈希函数, 通过对这两个哈希函数的线性组合构造新的哈希函数
3.2 代码实现
3.2.1 头文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| class BloomFilter { public: BloomFilter(); BloomFilter(size_t expected_elements, double false_positive_rate);
BloomFilter(size_t expected_elements, double false_positive_rate, size_t num_bits);
void add(const std::string &key);
bool possibly_contains(const std::string &key) const;
void clear();
std::vector<uint8_t> encode(); static BloomFilter decode(const std::vector<uint8_t> &data);
private: size_t expected_elements_; double false_positive_rate_; size_t num_bits_; size_t num_hashes_; std::vector<bool> bits_;
private: size_t hash1(const std::string &key) const;
size_t hash2(const std::string &key) const;
size_t hash(const std::string &key, size_t idx) const; };
|
3.2.2 构造函数
构造函数就是计算需要的哈希函数数量和位数组大小:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| BloomFilter::BloomFilter(){};
BloomFilter::BloomFilter(size_t expected_elements, double false_positive_rate) : expected_elements_(expected_elements), false_positive_rate_(false_positive_rate) { double m = -static_cast<double>(expected_elements) * std::log(false_positive_rate) / std::pow(std::log(2), 2);
num_bits_ = static_cast<size_t>(std::ceil(m));
num_hashes_ = static_cast<size_t>(std::ceil(m / expected_elements * std::log(2)));
bits_.resize(num_bits_, false); }
|
构造函数中, 假阳性率、键值对数量、哈希函数数量、位数组数量间的关系可以通过下面的公式计算:
1 2
| double m = -static_cast<double>(expected_elements) * std::log(false_positive_rate) / std::pow(std::log(2), 2);
|
也就是:

这里的公式推导不介绍了, 就是个数学问题
3.2.3 添加记录
添加记录时, 我们根据哈希函数的序号, 使用2个不同的哈希函数的线性组合来构造一个新的哈希函数计算位数组的索引:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| void BloomFilter::add(const std::string &key) { for (size_t i = 0; i < num_hashes_; ++i) { bits_[hash(key, i)] = true; } }
size_t BloomFilter::hash1(const std::string &key) const { std::hash<std::string> hasher; return hasher(key); }
size_t BloomFilter::hash2(const std::string &key) const { std::hash<std::string> hasher; return hasher(key + "salt"); }
size_t BloomFilter::hash(const std::string &key, size_t idx) const { auto h1 = hash1(key); auto h2 = hash2(key); return (h1 + idx * h2) % num_bits_; }
|
3.2.4 判断key否存在
布隆过滤器检查key
是否存在时, 我们只需要判断哈希函数的线性组合的计算结果对应的位数组中的位是否都为true
即可:
1 2 3 4 5 6 7 8 9 10 11
| bool BloomFilter::possibly_contains(const std::string &key) const { for (size_t i = 0; i < num_hashes_; ++i) { auto bit_idx = hash(key, i); if (!bits_[bit_idx]) { return false; } } return true; }
|
3.2.5 编解码
这里编解码在逻辑上也非常简单, 但在C++
语法上有一个坑需要注意:
CPP
的std::vector<bool>
是以bit
为基础操作的, 也就是说假设有下面的代码:
1 2
| std::vector<bool> bits(8, false); bits[0] = true;
|
这里的[0]
表示对第一个bit
进行操作, 而不是对第一个byte
进行操作!!!
因此, 这里编解码时, 需要额外确保字节的对其:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| std::vector<uint8_t> BloomFilter::encode() { std::vector<uint8_t> data;
data.insert(data.end(), reinterpret_cast<const uint8_t *>(&expected_elements_), reinterpret_cast<const uint8_t *>(&expected_elements_) + sizeof(expected_elements_));
data.insert(data.end(), reinterpret_cast<const uint8_t *>(&false_positive_rate_), reinterpret_cast<const uint8_t *>(&false_positive_rate_) + sizeof(false_positive_rate_));
data.insert(data.end(), reinterpret_cast<const uint8_t *>(&num_bits_), reinterpret_cast<const uint8_t *>(&num_bits_) + sizeof(num_bits_));
data.insert(data.end(), reinterpret_cast<const uint8_t *>(&num_hashes_), reinterpret_cast<const uint8_t *>(&num_hashes_) + sizeof(num_hashes_));
size_t num_bytes = (num_bits_ + 7) / 8; for (size_t i = 0; i < num_bytes; ++i) { uint8_t byte = 0; for (size_t j = 0; j < 8; ++j) { if (i * 8 + j < num_bits_) { byte |= (bits_[i * 8 + j] << j); } } data.push_back(byte); }
return data; }
BloomFilter BloomFilter::decode(const std::vector<uint8_t> &data) { size_t index = 0;
size_t expected_elements; std::memcpy(&expected_elements, &data[index], sizeof(expected_elements)); index += sizeof(expected_elements);
double false_positive_rate; std::memcpy(&false_positive_rate, &data[index], sizeof(false_positive_rate)); index += sizeof(false_positive_rate);
size_t num_bits; std::memcpy(&num_bits, &data[index], sizeof(num_bits)); index += sizeof(num_bits);
size_t num_hashes; std::memcpy(&num_hashes, &data[index], sizeof(num_hashes)); index += sizeof(num_hashes);
std::vector<bool> bits(num_bits, false); size_t num_bytes = (num_bits + 7) / 8; for (size_t i = 0; i < num_bytes; ++i) { uint8_t byte = data[index++]; for (size_t j = 0; j < 8; ++j) { if (i * 8 + j < num_bits) { bits[i * 8 + j] = (byte >> j) & 1; } } }
BloomFilter bf; bf.expected_elements_ = expected_elements; bf.false_positive_rate_ = false_positive_rate; bf.num_bits_ = num_bits; bf.num_hashes_ = num_hashes; bf.bits_ = bits;
return bf; }
|
4 LSM-Tree引入布隆过滤器
4.1 布隆过滤器的编码
布隆过滤器是编码到sst
文件中的, 我们需要更新sst
的编码格式如下:

我们在Block
的元数据块后面加入了布隆过滤器的编码部分, 同时将其偏移量也记录在了Extra Information
中。
编解码格式变化了, 相应的函数也需要更新, 但这里就不展开了, 毕竟流程类似的, 可以看仓库源码
我们只需要在编码sst
时, 每次add
时, 添加对这个key
的记录就可以了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| void SSTBuilder::add(const std::string &key, const std::string &value) { if (first_key.empty()) { first_key = key; }
if (bloom_filter != nullptr) { bloom_filter->add(key); }
if (block.add_entry(key, value)) { last_key = key; return; }
finish_block();
block.add_entry(key, value); first_key = key; last_key = key; }
|
4.2 布隆过滤器的解码
在open
一个sst
时, 将布隆过滤器的部分解码并初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| std::shared_ptr<SST> SST::open(size_t sst_id, FileObj file, std::shared_ptr<BlockCache> block_cache) { ...
if (sst->bloom_offset + 2 * sizeof(uint32_t) < file_size) { uint32_t bloom_size = file_size - sst->bloom_offset - sizeof(uint32_t) * 2; auto bloom_bytes = sst->file.read_to_slice(sst->bloom_offset, bloom_size);
auto bloom = BloomFilter::decode(bloom_bytes); sst->bloom_filter = std::make_shared<BloomFilter>(std::move(bloom)); }
...
return sst; }
|
4.3 布隆过滤器的使用
同时, 在查询时需要用布隆过滤器过滤无效请求:
1 2 3 4 5 6 7 8 9
| size_t SST::find_block_idx(const std::string &key) { if (bloom_filter != nullptr && !bloom_filter->possibly_contains(key)) { return -1; }
... }
|