C++从零开始实现LSM-Tree-KV存储-10-Redis兼容1

这一小节我们基于已有的LSM Tree接口, 来设计一个兼容Redis协议的服务层, 使其能替代redis-server处理redis-cli的请求.

代码仓库:https://github.com/ToniXWD/toni-lsm

欢迎点个Star

1 Resp协议简介

这里就不再对Redis本身的基础概念进行介绍了, 毕竟Redis是校招八股必背知识点, 大家想必都非常熟悉了. 但大多数朋友可能对Redis通信的Resp协议完全不熟悉, 这里简单介绍一下Resp协议.

Redis的RESP(REdis Serialization Protocol)是Redis客户端与服务器之间通信的协议, 也就是redis-cliredis-server之间进行通信的协议,它简单、高效,支持多种数据类型。因此, 其只需要描述Redis中的数据类型和请求类型就可以了。

需要说明的是, Resp协议应该属于TCP这一层的协议, 其没有HTTP等协议的头, 实现时我们也不需要http层的框架

1.1 一个简单的案例

假设你在 redis-cli 中输入了以下命令:

1
SET k1 v1

redis-cli客户端会将该命令转换为 RESP 协议格式并发送给 Redis 服务器。具体表示如下:

1
2
3
4
5
6
7
*3
$3
SET
$2
k1
$2
v1

这里其实显式地表达了换行符\r\n, 真实的内容是: *3\r\n$3\r\nSET\r\n$2\r\nk1\r\n$2\r\nv1\r\n

解释:

  • *3:表示这是一个包含 3 个元素的数组。
  • $3:表示第一个元素是一个长度为 3 的批量字符串(Bulk String),内容为 SET
  • $2:表示第二个元素是一个长度为 2 的批量字符串,内容为 k1
  • $2:表示第三个元素是一个长度为 2 的批量字符串,内容为 v1

\r\n为不同字段之间的分隔符, 且不计入长度

Redis 服务器接收到上述请求后,执行 SET k1 v1 操作,并返回响应。例如:

1
+OK

解释:

  • +OK:表示这是一个简单字符串(Simple String),值为 OK,表示操作成功。

如果客户端发送的命令或参数有误,Redis 服务器可能会返回错误信息。例如:

1
-ERR syntax error

解释:

  • -ERR:表示这是一个错误消息(Error),内容为 syntax error

1.2 数据类型语法

通过之前的案例我们可以看到, RESP使用一些符号来对数据类型进行标记, 这里简单总结如下:

  • 简单字符串(Simple Strings):以”+”开头,如+OK\r\n
  • 错误(Errors):以”-“开头,如-ERR unknown command\r\n
  • 整数(Integers):以”:”开头,如:1000\r\n
  • 批量字符串(Bulk Strings):以”$”开头,如$6\r\nfoobar\r\n
  • 数组(Arrays):以”*”开头,如*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n

2 Redis实现思路

这里我们将利用自身的LSM Tree接口来设计一个兼容Redis协议的服务层, 使其能替代redis-server处理redis-cli的请求.

首先我们回顾一下我们的LSM Tree接口支持什么api:

  1. Put(key, value): 将键值对插入到数据库中。
  2. Get(key): 根据键获取对应的值。
  3. Delete(key): 根据键删除对应的值。
  4. Scan(start_key, end_key): 根据起始键和结束键范围获取键值对。(就是上一章实现的谓词查询, 这里的Scan是一个虚拟的接口)

然后我们想一下Redsis的不同数据结构的接口

  1. 字符串(String):和LSM Tree一样, 我们也只需要实现Put(key, value)Get(key)即可
  2. 列表(List):相当于不同字符串之间有连接
  3. 哈希(Hash):一个哈希的key由很多个filedvalue组成
  4. 集合(Set): 一个集合的key由很多个member, 但不需要排序
  5. 有序集合(Sorted Set): 集合的key由很多个memberscore组成, 并且需要按照score排序

同时很多key还有一些基础属性, 最常用的就是TTL(过期时间), 当TTL过期时, 该key将不再存在, 我们也可以通过TTL来判断一个key是否过期, 也可以使用EXPIRE来设置一个key的过期时间

其实本质上, 由于我们的存储引擎是KV存储, 我们的哈希的所有数据都将作为基础的keyvalue进行存储, 这与Redis中的字符串是一致的。而List, Set, Sorted Set等数据结构就需要将多对key进行组合, 并且需要根据一定的规则进行排序, 而且回想我的LSM Tree接口支持的api, 核心思路就只有2类:

  1. List, Set, Sorted Set等数据结构的keyvalue中要记录所管理成员的元信息
  2. 归属于某个大数据结构(List, Set, Sorted Set等)的成员的key需要包含统一的前缀, 这样才能通过我的的存储引擎进行前缀查询

接下来, 本小节将基于上述的思路首先介绍Redis基础字符串(这一部分大概分2-3章吧, 内容有点多)

3 代码实现

3.1 基础封装

我们新建一个include/redis_wrapper/redis_wrapper.hsrc/redis_wrapper/redis_wrapper.cpp文件来对我们的存储引擎进行封装:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// include/redis_wrapper/redis_wrapper.h

class RedisWrapper {
private:
std::unique_ptr<LSM> lsm;
std::shared_mutex redis_mtx;

// 参数校验部分
std::string set(std::vector<std::string> &args);
std::string get(std::vector<std::string> &args);
std::string incr(std::vector<std::string> &args);
std::string decr(std::vector<std::string> &args);
std::string expire(std::vector<std::string> &args);
std::string del(std::vector<std::string> &args);
std::string ttl(std::vector<std::string> &args);

// 实际执行部分
std::string redis_incr(const std::string &key);
std::string redis_decr(const std::string &key);
std::string redis_expire(const std::string &key, std::string seconds_count);
std::string redis_set(std::string &key, std::string &value);
std::string redis_get(std::string &key);
std::string redis_del(std::vector<std::string> &args);
std::string redis_ttl(std::string &key);

...
}

这里以redis_set为例, 我们很容易写出下面的代码:

1
2
3
4
5
std::string RedisWrapper::redis_set(std::string &key, std::string &value) {
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁
this->lsm->put(key, value);
return "+OK\r\n";
}

但这样会有一个问题, 如果这个key原来就存在, 且是有ttl的呢? Redis默认的 SET key value 命令对已存在的 key 进行设置,Redis 会覆盖该 key 的值,并移除原有的 TTL,使该 key 成为一个持久化的 key(永不过期)。但上述代码则缺乏对TTL的考虑, 因此我们需要引入TTL

3.2 TTL设计

首先我们的每个key都可能伴随着一个过期时间, 这个过期时间我们只需要为其加上一个固定的前缀, 就可以与真实的key关联起来, 称为expire_key, 同时其value直接为时间戳即可,如果这个key对应的expire_key不存在, 则表示该key没有过期时间.

另一方面, 原本的Redis支持后台线程定期清理查询时延迟检查清理 2种检查TTL的方式, 这里为了实现简单, 我们选择查询时延迟检查清理的方式, 在后续查询或写入时检查expire_keyvalue, 如果发现value小于当前时间戳, 则进行相应的操作.

3.3 考虑了TTL的set/get

所以这里我们首选需要对set进行如下修改:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
inline std::string get_explire_key(const std::string &key) {
return REDIS_EXPIRE_HEADER + key;
}

std::string RedisWrapper::redis_set(std::string &key, std::string &value) {
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁
this->lsm->put(key, value);
// 同时如果设置有过期时间, 需要删除过期时间
std::string expire_key = get_explire_key(key);
if (this->lsm->get(expire_key).has_value()) {
this->lsm->remove(expire_key);
}
return "+OK\r\n";
}

get时, 也需要检查TTL:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
std::string RedisWrapper::redis_get(std::string &key) {
std::shared_lock<std::shared_mutex> rlock(redis_mtx); // 读锁

auto key_query = this->lsm->get(key);

std::string expire_key = get_explire_key(key);
auto expire_query = this->lsm->get(expire_key);

if (key_query.has_value()) {
// 还需要检测 TTL
if (expire_query.has_value()) {

// 检查是否过期
if (is_expired(expire_query, nullptr)) {
// 过期了, 先进行锁升级, 再清理数据
rlock.unlock(); // 解锁读锁
std::unique_lock<std::shared_mutex> wlock(redis_mtx); // 写锁
this->lsm->remove(key);
this->lsm->remove(expire_key);
return "$-1\r\n";
} else {
// 没有过期
return "$" + std::to_string(key_query.value().size()) + "\r\n" +
key_query.value() + "\r\n";
}
} else {
// 没有设置过期时间, 直接返回
return "$" + std::to_string(key_query.value().size()) + "\r\n" +
key_query.value() + "\r\n";
}
} else {
// key 不存在, 有必要的话清理 expire_key
if (expire_query.has_value()) {
// 先进行锁升级, 再清理数据
rlock.unlock(); // 解锁读锁
std::unique_lock<std::shared_mutex> wlock(redis_mtx); // 写锁
this->lsm->remove(expire_key);
}
}
return "$-1\r\n"; // 表示键不存在
}

这里还有一个易错点, 我们检查TTL和查询key都是读操作, 因此获取的是读锁, 但我们在由于TTL超时导致key不存在时, 需要清理expire_keykey, 这需要我们进行写锁的升级, 需要先释放读锁, 然获取写锁.

而且get操作比set要复杂, 具体的流程代码里面的注释很清楚了, 就不多说了

3.4 TTL/EXPIRE

这里的expire操作就是简单插入一个expire_keyvalue即可, value为当前时间戳加上seconds_count

1
2
3
4
5
6
7
8
9
10
11
12
std::string RedisWrapper::redis_expire(const std::string &key,
std::string seconds_count) {
std::unique_lock<std::shared_mutex> lock(redis_mtx); // 写锁
std::string expire_key = get_explire_key(key);

auto expire_time_str = get_expire_time(seconds_count);

// 存储过期时间
this->lsm->put(expire_key, expire_time_str);

return ":1\r\n";
}

TTL则是直接获取expire_keyvalue即可, 这里需要注意的是, 如果expire_key不存在, 则表示该key没有过期时间, 因此返回-1即可, 当expire_key存在, 则需要判断是否过期, 如果过期, 则返回-2表示key不存在, 如果没有过期, 则返回剩余的过期时间即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
std::string RedisWrapper::redis_ttl(std::string &key) {
std::shared_lock<std::shared_mutex> lock(redis_mtx); // 读锁

auto key_query = this->lsm->get(key);

std::string expire_key = get_explire_key(key);
auto expire_query = this->lsm->get(expire_key);

if (key_query.has_value()) {
// key 存在, 判断是否过期
if (expire_query.has_value()) {
std::time_t now_time_t;
// 检查是否过期
if (is_expired(expire_query, &now_time_t)) {
// 过期了, key不存在
// 过期了也不删除, ttl这里设计为只读, 删除在之后进行
// -2 表示 key 不存在
return ":-2\r\n";
} else {
// 没有过期
auto now = std::chrono::system_clock::now();
return ":" +
std::to_string(std::stoll(expire_query.value()) - now_time_t) +
"\r\n";
}
} else {
// 没有设置过期时间, 返回 -1
return ":-1\r\n";
}
} else {
// key 不存在
return ":-1\r\n";
}
}

其他的常规接口, 如incr, del等很简单, 逻辑也都差不多, 就不展开介绍了, 可以看源码

4 后端服务器实现

完成了基础的封装后, 我们需要设置一个网络服务器来接受redis-cli的请求, 这里我选择muduo网络库进行网络层的封装, 服务器的代码在server文件夹下

4.1 添加依赖

这里建议直接使用xmake提供的muduo库, 并定义server依赖:

1
2
3
4
5
6
7
8
9
10
11
12
13
-- 添加Muduo库
add_requires("muduo")

-- ...

-- 定义server
target("server")
set_kind("binary")
add_files("server/src/*.cpp")
add_deps("redis")
add_includedirs("include", {public = true})
add_packages("muduo") -- 添加muduo依赖
set_targetdir("$(buildir)/bin")

如果不想使用xmake提供的muduo库, 也可以手动添加依赖, 具体可以参考muduo的文档: https://github.com/chenshuo/muduo-tutorial

4.2 Muduo 网络层封装

这里直接用代码简单介绍下muduo的使用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
#include "../../include/redis_wrapper/redis_wrapper.h"
#include "../include/handler.h"
#include <cstddef>
#include <iostream>
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>
#include <string>
#include <unordered_map>
#include <vector>

using namespace muduo;
using namespace muduo::net;

class RedisServer {
public:
RedisServer(EventLoop *loop, const InetAddress &listenAddr)
: server_(loop, listenAddr, "RedisServer"), redis("example_db") {
server_.setConnectionCallback(
std::bind(&RedisServer::onConnection, this, std::placeholders::_1));
server_.setMessageCallback(
std::bind(&RedisServer::onMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
}

void start() { server_.start(); }

// private:
void onConnection(const TcpConnectionPtr &conn) {
if (conn->connected()) {
LOG_INFO << "Connection from " << conn->peerAddress().toIpPort();
} else {
LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort();
}
}

void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) {
std::string msg(buf->retrieveAllAsString());
LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;

// 解析并处理请求
std::string response = handleRequest(msg);
conn->send(response);
}

std::string handleRequest(const std::string &request) {
size_t pos = 0;

if (request.empty() || request[pos] != '*') {
return "-ERR Protocol error: expected '*'\r\n";
}

int numElements = 0;
try {
numElements = std::stoi(request.substr(pos + 1)); // 跳过 '*'
} catch (const std::exception &) {
return "-ERR Protocol error: invalid number of elements\r\n";
}
pos = request.find('\n', pos) + 1; // 跳过 '\r\n'

LOG_INFO << "request: " << request << '\n';
LOG_INFO << "Number of elements: " << numElements << '\n';

std::vector<std::string> args;

for (int i = 0; i < numElements; ++i) {
if (pos >= request.size() || request[pos] != '$') {
LOG_INFO << "pos = " << pos << ", i = " << i
<< ", last args = " << args.back() << '\n';
LOG_INFO << "-ERR Protocol error: expected '$'\r\n";
return "-ERR Protocol error: expected '$'\r\n";
}

int len = 0;
std::string value_len;
int next_n_pos;
try {
next_n_pos = request.find('\n', pos);
len = std::stoi(request.substr(pos + 1)); // 跳过 '$'
} catch (const std::exception &) {
LOG_INFO << "-ERR Protocol error: invalid bulk string length\r\n";
return "-ERR Protocol error: invalid bulk string length\r\n";
}
pos = next_n_pos + 1; // 跳过 '$' 值 \r\n
if (pos + len > request.size()) {
LOG_INFO << "-ERR Protocol error: bulk string length exceeds request "
"size\r\n";
return "-ERR Protocol error: bulk string length exceeds request "
"size\r\n";
}
args.push_back(request.substr(pos, len));
next_n_pos = request.find('\n', pos);
pos = next_n_pos + 1; // 跳过数据和/r/n
}

LOG_INFO << "Request: ";
for (const auto &arg : args) {
LOG_INFO << arg << " ";
}
LOG_INFO << '\n';

// 处理命令
switch (string2Ops(args[0])) {
// ...
return save_handler(redis);
case OPS::SET:
return set_handler(args, redis);
case OPS::GET:
return get_handler(args, redis);
case OPS::DEL:
return del_handler(args, redis);
case OPS::INCR:
return incr_handler(args, redis);
case OPS::DECR:
return decr_handler(args, redis);
case OPS::EXPIRE:
return expire_handler(args, redis);
case OPS::TTL:
return ttl_handler(args, redis);
case OPS::HSET:
// ...
}
}

TcpServer server_;
RedisWrapper redis; // 简单的键值存储
};

int main() {
EventLoop loop;
InetAddress listenAddr(6379); // Redis默认端口
RedisServer server(&loop, listenAddr);

server.start();
loop.loop(); // 进入事件循环
}

这里的handleRequest就是处理Resp协议的函数, 其目前只支持一些基础指令的解析, 这里的解析也是比较繁琐的, 不好说清楚, 有兴趣直接单步调试看看吧, 这里的RedisWrapper就是我们之前实现的Redis的封装, 这里的RedisWrapper就是我们之前实现的LSM Tree Engine的封装。这里重点介绍muduo的使用:

muduo 是一个基于 Reactor 模式的 C++ 网络库,由陈硕开发,广泛应用于高性能网络编程。它提供了简单易用的接口,能够帮助开发者快速构建高性能的 TCP 服务器。以下是如何使用 muduo 的基本步骤,基于你提供的代码进行说明。

引入 muduo 头文件

1
2
3
4
5
#include <muduo/base/Logging.h>
#include <muduo/net/EventLoop.h>
#include <muduo/net/InetAddress.h>
#include <muduo/net/TcpConnection.h>
#include <muduo/net/TcpServer.h>

这些头文件分别提供了日志、事件循环、网络地址、TCP 连接和 TCP 服务器的功能。

创建 EventLoop

EventLoopmuduo 的核心组件,负责事件循环和事件分发。每个 muduo 服务器都需要一个 EventLoop 对象。

1
EventLoop loop;

loop.loop() 会启动事件循环,等待事件的发生。

定义服务器地址

使用 InetAddress 来指定服务器的监听地址和端口。例如,监听本地 6379 端口:

1
InetAddress listenAddr(6379);

创建 TcpServer

TcpServermuduo 提供的 TCP 服务器类。需要传入 EventLoop 和监听地址来初始化它:

1
TcpServer server(&loop, listenAddr, "RedisServer");
  • 第一个参数是 EventLoop 对象。
  • 第二个参数是 InetAddress,表示服务器的监听地址。
  • 第三个参数是服务器的名称(可选)。

这里的TcpServer放入了RedisServer类中,并在类的构造函数中初始化。

设置回调函数

muduo 是一个事件驱动的网络库,通过回调函数处理连接和消息事件。你需要为 TcpServer 设置以下回调函数:
连接回调

当有客户端连接或断开时,会触发连接回调函数:

1
2
server.setConnectionCallback(
std::bind(&RedisServer::onConnection, this, std::placeholders::_1));
  • onConnection 是一个成员函数,用于处理连接事件。
  • std::placeholders::_1 表示回调函数的第一个参数(TcpConnectionPtr)。

消息回调
当接收到客户端发送的数据时,会触发消息回调函数:

1
2
3
server.setMessageCallback(
std::bind(&RedisServer::onMessage, this, std::placeholders::_1,
std::placeholders::_2, std::placeholders::_3));
  • onMessage 是一个成员函数,用于处理接收到的消息。
  • std::placeholders::_1TcpConnectionPtr,表示连接对象。
  • std::placeholders::_2Buffer*,表示接收到的数据缓冲区。
  • std::placeholders::_3Timestamp,表示消息到达的时间。

实现回调函数

需要实现 onConnectiononMessage 回调函数。
(1) onConnection处理连接事件:

1
2
3
4
5
6
7
void onConnection(const TcpConnectionPtr &conn) {
if (conn->connected()) {
LOG_INFO << "Connection from " << conn->peerAddress().toIpPort();
} else {
LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort();
}
}
  • conn->connected() 判断连接是建立还是断开。
  • conn->peerAddress().toIpPort() 获取客户端的 IP 和端口。

(2) onMessage处理接收到的消息:

1
2
3
4
5
6
7
8
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) {
std::string msg(buf->retrieveAllAsString()); // 从缓冲区中提取数据
LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;

// 处理消息并发送响应
std::string response = handleRequest(msg);
conn->send(response); // 发送响应给客户端
}
  • buf->retrieveAllAsString() 从缓冲区中提取所有数据并转换为字符串。
  • conn->send(response) 将响应发送给客户端。

启动服务器

调用 TcpServer::start() 启动服务器,然后进入事件循环:

1
2
server.start(); // 启动服务器
loop.loop(); // 进入事件循环

日志功能

muduo 提供了内置的日志功能,可以通过 LOG_INFOLOG_ERROR 等宏记录日志:

1
LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;

5 小节

这章引入了一个简单的Redis服务器和基于RedisLSM Tree的封装,Redis服务器和使用muduo库来处理TCP连接和消息; 封装的Redis_wrapper实现了一些基本的Redis命令,如GET、SET、DEL等。由于这一章还介绍了Respmuduo, Redis命令的实现只包括了简单字符串和TTL, 后续章节将视线更复杂的set, hash, list, zset 等命令。