C++从零开始实现LSM-Tree-KV存储-11-Redis兼容2

这一小节继续基于我们目前已有的LSM Tree存储引擎, 实现Redis相关api的封装与兼容

代码仓库:https://github.com/ToniXWD/toni-lsm

欢迎点个Star

1 Redis List 实现

1.1 实现思路

这里也不介绍Redis list的语法了, 既然看这篇文章, 相比大家对Redis非常熟悉了, 这里我选择实现如下常见的api:

  • lpush
  • rpush
  • lpop
  • rpop
  • llen
  • lrange

这里其实有2个方案:

  1. lsit所有元素编码到一个value中, 然后通过lpush/rpush/lpop/rpop/llen/lrange来对这一个value进行操作
  2. list不同元素存储在不同的键值对中, 其存储时需要满足排序性, 这样在lrange时就可以利用我们的存储引擎提供的前缀查询或范围查询

虽然方案2看似很好, 但其实我们后续的zset会采取方案2, 这里的链表如果采取相同的方案的话, 显得有些多余, 所以这里我们选择方案1

1.2 TTL

首先, 我们需要为这个lsit也实现TTL机制. 首先需要明白, RedisTTL 只能对整个键(key)设置过期时间,而不能针对列表(list)、集合(set)、哈希(hash)等数据结构中的单个成员单独设置过期时间。

但这里引入了与之前的简单kv不同的实现难点, 就是我们的set, list这些复杂数据结构的所有成员的TTL都被统一的上层key管理, 我们每次查询成员或写入成员(例如lpush)时, 都需要判断TTL是否到期, 如果到期需要进行清理工作.

因此这里我们单独设置一个清理判断的函数, 其会用于后续所有与list相关的操作中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
bool RedisWrapper::expire_list_clean(
const std::string &key, std::shared_lock<std::shared_mutex> &rlock) {
std::string expire_key = get_explire_key(key);
auto expire_query = this->lsm->get(expire_key);
if (is_expired(expire_query, nullptr)) {
// 链表都过期了, 需要删除链表
// 先升级锁
rlock.unlock(); // 解锁读锁
std::unique_lock<std::shared_mutex> wlock(redis_mtx); // 写锁
lsm->remove(key);
lsm->remove(expire_key);
return true;
}
return false;
}

这里的get_explire_key就是将key加上一个固定的前缀, 该key存储其过期时间的时间戳
需要注意的是, 这里首先是读锁, 如果需要进行清理工作, 会升级为写锁
返回值为true表示链表已经过期, 这里主要是表面这里获取的写锁, 不需要再次进行锁升级(参考下一小节)

1.3 lpush/rpush

这里的lpush/rpush很简单, 就是在get到的keyvalue的左侧或右侧添加新成员, 但需要注意的是需要再执行查询前判断TTL是否到期并执行清理工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 链表操作
std::string RedisWrapper::redis_lpush(const std::string &key,
const std::string &value) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expire = expire_list_clean(key, rlock);

if (!is_expire) {
// 如果过期了, 会执行清理操作, expire_list_clean 会升级读锁
// 没有过期的话, 读锁仍然存在, 需要手动释放
rlock.unlock();
}
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁

auto list_opt = lsm->get(key);
std::string list_value = list_opt.value_or("");
if (!list_value.empty()) {
list_value = value + REDIS_LIST_SEPARATOR + list_value;
} else {
list_value = value;
}

lsm->put(key, list_value);
return ":" + std::to_string(split(list_value, REDIS_LIST_SEPARATOR).size()) +
"\r\n";
}

但这里有2个很容易疏忽的点就是:

  1. 如果没有过期或者根本没有设置TTL的话, expire_list_clean不会对读锁进行升级, 因此需要手动释放读锁, 否则后续的写锁会陷入死锁
  2. 这里如果过期且进行了清理操作, 也不会影响这一次lpush, 因为这就是一个重建list的操作

rpushlpush的实现如出一辙, 这里直接略过了

1.4 lpop/rpop

这里的整体逻辑和lpush/rpush差不多, 只需要删除value的左侧或右侧的值, 同时需要判断TTL是否到期并执行清理工作

但这里如果TTL到期了或者整个list就不存在吗, 直接返回就是了, 而不需要像lpush/rpush那样进行重建操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
std::string RedisWrapper::redis_lpop(const std::string &key) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expire = expire_list_clean(key, rlock);

if (is_expire) {
return "$-1\r\n";
}

// 没过期的情况, 需要手动释放读锁
rlock.unlock(); // 升级锁
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁

auto list_opt = lsm->get(key);
if (!list_opt.has_value()) {
return "$-1\r\n"; // 表示链表不存在
}

std::vector<std::string> elements =
split(list_opt.value(), REDIS_LIST_SEPARATOR);
if (elements.empty()) {
return "$-1\r\n"; // 表示链表为空
}

std::string value = elements.front();
elements.erase(elements.begin());

if (elements.empty()) {
lsm->remove(key);
} else {
lsm->put(key, join(elements, REDIS_LIST_SEPARATOR));
}
return "$" + std::to_string(value.size()) + "\r\n" + value + "\r\n";
}

同理, rpop的实现如出一辙, 这里直接略过了

1.5 llen

llen最简单了, 解析这个字符串就可以了, 判断其value中指定的分隔符REDIS_LIST_SEPARATOR分割出了多少个元素就可以了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::string RedisWrapper::redis_llen(const std::string &key) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expired = expire_list_clean(key, rlock);

if (is_expired) {
return ":0\r\n";
}

auto list_opt = lsm->get(key);
if (!list_opt.has_value()) {
return ":0\r\n"; // 表示链表不存在
}

std::vector<std::string> elements =
split(list_opt.value(), REDIS_LIST_SEPARATOR);
return ":" + std::to_string(elements.size()) + "\r\n";
}

1.6 lrange

lrange也是对value字符串进行操作, 但需要注意, 负数表示从尾部开始计算的索引, 这里我们需要想将其转化为正数的索引, 然后也是基于分隔符REDIS_LIST_SEPARATOR生成一个vector:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
std::string RedisWrapper::redis_lrange(const std::string &key, int start,
int stop) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expire = expire_list_clean(key, rlock);

if (is_expire) {
return "*0\r\n";
}

auto list_opt = lsm->get(key);
if (!list_opt.has_value()) {
return "*0\r\n"; // 表示链表不存在
}

std::vector<std::string> elements =
split(list_opt.value(), REDIS_LIST_SEPARATOR);
if (elements.empty()) {
return "*0\r\n"; // 表示链表为空
}

if (start < 0)
start += elements.size();
if (stop < 0)
stop += elements.size();
if (start < 0)
start = 0;
if (stop >= elements.size())
stop = elements.size() - 1;
if (start > stop)
return "*0\r\n";

std::ostringstream oss;
oss << "*" << (stop - start + 1) << "\r\n";
for (int i = start; i <= stop; ++i) {
oss << "$" << elements[i].size() << "\r\n" << elements[i] << "\r\n";
}
return oss.str();
}

2 Redis Hash实现

2.1 实现思路

Redis的哈希结构就是一个key中管理了一个哈希表, 这里的实现我采用如下的思路:

  1. 整个hash数据结构的key存储的value是其所有filed的集合拼接的字符串
  2. 每个filedvalue单独用另一个键值对存储, 但filed不能直接作为一个key, 而是需要加上指定的前缀以标识这是一个哈希结构的filed

那么查询的逻辑就是, 将keyfiled拼接起来形成实际存储的key, 然后查询对应的value即可

2.2 TTL

这里的TTL设计和list几乎一致, 也就是清理时需要从hashkey存储的value中解析到所有的flieldkey, 然后进行清理即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
bool RedisWrapper::expire_hash_clean(
const std::string &key, std::shared_lock<std::shared_mutex> &rlock) {
std::string expire_key = get_explire_key(key);
auto expire_query = this->lsm->get(expire_key);

if (is_expired(expire_query, nullptr)) {
// 整个哈希数据结构都过期了, 需要删除所有的字段
// 先升级锁
rlock.unlock(); // 解锁读锁
std::unique_lock<std::shared_mutex> wlock(redis_mtx); // 写锁
auto fileds = get_fileds_from_hash_value(lsm->get(key));
for (const auto &field : fileds) {
std::string field_key = get_hash_filed_key(key, field);
lsm->remove(field_key);
}
lsm->remove(key);
lsm->remove(expire_key);
return true;
}
return false;
}

2.3 hset

hset的流程和list一致, 这里不做过多解释, 只是介绍下流程:

  1. 先判断TTL是否过期, 过期则清理
  2. 判断锁升级的必要性
    1. 如果TTL过期, 则在清理函数中已经进行了锁升级
    2. 过期或不存在的hash的情况下, 该次hset为一个新建操作, 需要升级锁
  3. 控制结构键值对的操作:
    1. 如果这次是新建操作, 需要新建整个hash控制结构键值对
    2. 否则, 只需要在value中拼接新的filed的字符串即可
  4. 插入filed的键值对
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
std::string RedisWrapper::redis_hset(const std::string &key,
const std::string &field,
const std::string &value) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁线判断是否过期
bool is_expired = expire_hash_clean(key, rlock);

// 关闭读锁打开写锁
if (!is_expired) {
// 如果没有过期的话, expire_hash_clean 不会升级读锁, 这里需要手动解锁
rlock.unlock();
}
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁

// 更新字段值
std::string field_key = get_hash_filed_key(key, field);
lsm->put(field_key, value);

// 更新字段列表
auto field_list_opt = lsm->get(key);
std::vector<std::string> field_list =
get_fileds_from_hash_value(field_list_opt);

if (std::find(field_list.begin(), field_list.end(), field) ==
field_list.end()) {
// 不存在则添加
field_list.push_back(field);
auto new_value = get_hash_value_from_fields(field_list);
lsm->put(key, new_value);
}

return "+OK\r\n";
}

2.4 hget

这里逻辑就更简单了, 先判断TTL, 不存在或过期就返回, 否则拼接filedkey进行查询即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
std::string RedisWrapper::redis_hget(const std::string &key,
const std::string &field) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expired = expire_hash_clean(key, rlock);

if (is_expired) {
return "$-1\r\n";
}

std::string field_key = get_hash_filed_key(key, field);
auto value_opt = lsm->get(field_key);

if (value_opt.has_value()) {
return "$" + std::to_string(value_opt->size()) + "\r\n" +
value_opt.value() + "\r\n";
} else {
return "$-1\r\n"; // 表示字段不存在
}
}

2.5 hdel

相反, hdel应该是这几个操作中相对复杂的:

  1. 首先按照常规判断TTL, 过期或不存在就返回
  2. 判断是否需要升级锁, 这里逻辑类似之前的代码, 略过
  3. 删除字段值的kv
  4. 修改控制结构中filed结合的value, 如果字段列表为空, 则删除整个 hashkey
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    std::string RedisWrapper::redis_hdel(const std::string &key,
    const std::string &field) {
    std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁线判断是否过期
    bool is_expired = expire_hash_clean(key, rlock);

    if (is_expired) {
    return ":0\r\n";
    }

    // 没有过期的话, expire_hash_clean 不会升级读锁, 这里需要手动解锁
    rlock.unlock();
    std::unique_lock<std::shared_mutex> lock(
    redis_mtx); // 写锁, 如果过期的话, 读锁在 expire_hash_clean 中已经被释放了

    int del_count = 0;
    // 删除字段值
    std::string field_key = get_hash_filed_key(key, field);
    if (this->lsm->get(field_key).has_value()) {
    del_count++;
    this->lsm->remove(field_key);
    }

    // 更新字段列表
    auto field_list_opt = lsm->get(key);
    auto field_list = get_fileds_from_hash_value(field_list_opt);
    auto find_res = std::find(field_list.begin(), field_list.end(), field);
    if (find_res != field_list.end()) {
    // 存在则删除
    field_list.erase(find_res);
    if (field_list.empty()) {
    // 如果字段列表为空, 则删除 key
    lsm->remove(key);
    } else {
    // 否则更新字段列表
    auto new_value = get_hash_value_from_fields(field_list);
    lsm->put(key, new_value);
    }
    }

    return ":" + std::to_string(del_count) + "\r\n";
    }

2.6 hkeys

这个操作就简单了, 直接查询控制结构的key, 解析filed拼接而成的value即可:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
std::string RedisWrapper::redis_hkeys(const std::string &key) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁
bool is_expired = expire_hash_clean(key, rlock);

if (is_expired) {
return "*0\r\n";
}

auto field_list_opt = lsm->get(key);
std::vector<std::string> fields;
auto res_vec = get_fileds_from_hash_value(field_list_opt);

std::string res_str = "*";
res_str += std::to_string(res_vec.size()) + "\r\n";
for (const auto &field : res_vec) {
res_str += "$" + std::to_string(field.size()) + "\r\n" + field + "\r\n";
}

return res_str;
}

3 测试

这里我们其实还需要在muduo搭建的服务器代码中接入这些api的处理函数的分发, 以及校验参数合理性的操作, 但逻辑基本上都是简单重复且繁琐的操作, 这里拿上来也没有什么意义, 这里就直接看代码吧: https://github.com/ToniXWD/toni-lsm/tree/master/server

最后我们测试一下我们的实现:

在一个terminal中运行我们的服务器:

1
2
xmake
xmake run server

在另一个terminal中运行我们的redis-cli客户端:

1
redis-cli -h 127.0.0.1 -p 6379

然后测试下我们的listhash的接口:

example-list-hash

嗯嗯, 还挺有成就感的

4 小节

这一小节介绍了 hashlist 的实现, 后续章节将继续介绍 set, zset 的实现, 这两个会更复杂一些.
为了让大家学习中更有成就感, 我们实现支持替代redis-server接受处理redis-cli的请求, 还是很有意义的, 相信写在简历上作为春招或者秋招的项目也还是有一定竞争力的.