这一小节我们基于已有的LSM Tree
接口, 来设计一个兼容Redis
协议的服务层, 使其能替代redis-server
处理redis-cli
的请求.
代码仓库:https://github.com/ToniXWD/toni-lsm
欢迎点个Star
1 Resp协议简介
这里就不再对Redis
本身的基础概念进行介绍了, 毕竟Redis
是校招八股必背知识点, 大家想必都非常熟悉了. 但大多数朋友可能对Redis
通信的Resp
协议完全不熟悉, 这里简单介绍一下Resp
协议.
Redis的RESP(REdis Serialization Protocol)是Redis客户端与服务器之间通信的协议, 也就是redis-cli
和redis-server
之间进行通信的协议,它简单、高效,支持多种数据类型。因此, 其只需要描述Redis
中的数据类型和请求类型就可以了。
需要说明的是, Resp
协议应该属于TCP
这一层的协议, 其没有HTTP
等协议的头, 实现时我们也不需要http
层的框架
1.1 一个简单的案例
假设你在 redis-cli
中输入了以下命令:
redis-cli
客户端会将该命令转换为 RESP
协议格式并发送给 Redis
服务器。具体表示如下:
这里其实显式地表达了换行符\r\n
, 真实的内容是: *3\r\n$3\r\nSET\r\n$2\r\nk1\r\n$2\r\nv1\r\n
解释:
*3
:表示这是一个包含 3 个元素的数组。
$3
:表示第一个元素是一个长度为 3 的批量字符串(Bulk String),内容为 SET
。
$2
:表示第二个元素是一个长度为 2 的批量字符串,内容为 k1
。
$2
:表示第三个元素是一个长度为 2 的批量字符串,内容为 v1
。
\r\n
为不同字段之间的分隔符, 且不计入长度
Redis
服务器接收到上述请求后,执行 SET k1 v1
操作,并返回响应。例如:
解释:
+OK
:表示这是一个简单字符串(Simple String),值为 OK
,表示操作成功。
如果客户端发送的命令或参数有误,Redis
服务器可能会返回错误信息。例如:
解释:
-ERR
:表示这是一个错误消息(Error),内容为 syntax error
。
1.2 数据类型语法
通过之前的案例我们可以看到, RESP
使用一些符号来对数据类型进行标记, 这里简单总结如下:
- 简单字符串(Simple Strings):以”+”开头,如
+OK\r\n
。
- 错误(Errors):以”-“开头,如
-ERR unknown command\r\n
。
- 整数(Integers):以”:”开头,如
:1000\r\n
。
- 批量字符串(Bulk Strings):以”$”开头,如
$6\r\nfoobar\r\n
。
- 数组(Arrays):以”*”开头,如
*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
。
2 Redis实现思路
这里我们将利用自身的LSM Tree
接口来设计一个兼容Redis
协议的服务层, 使其能替代redis-server
处理redis-cli
的请求.
首先我们回顾一下我们的LSM Tree
接口支持什么api:
Put(key, value)
: 将键值对插入到数据库中。
Get(key)
: 根据键获取对应的值。
Delete(key)
: 根据键删除对应的值。
Scan(start_key, end_key)
: 根据起始键和结束键范围获取键值对。(就是上一章实现的谓词查询, 这里的Scan
是一个虚拟的接口)
然后我们想一下Redsis
的不同数据结构的接口
- 字符串(String):和
LSM Tree
一样, 我们也只需要实现Put(key, value)
和Get(key)
即可
- 列表(List):相当于不同字符串之间有连接
- 哈希(Hash):一个哈希的
key
由很多个filed
即value
组成
- 集合(Set): 一个集合的
key
由很多个member
, 但不需要排序
- 有序集合(Sorted Set): 集合的
key
由很多个member
和score
组成, 并且需要按照score
排序
同时很多key
还有一些基础属性, 最常用的就是TTL
(过期时间), 当TTL
过期时, 该key
将不再存在, 我们也可以通过TTL
来判断一个key
是否过期, 也可以使用EXPIRE
来设置一个key
的过期时间
其实本质上, 由于我们的存储引擎是KV
存储, 我们的哈希的所有数据都将作为基础的key
和value
进行存储, 这与Redis
中的字符串
是一致的。而List
, Set
, Sorted Set
等数据结构就需要将多对key
进行组合, 并且需要根据一定的规则进行排序, 而且回想我的LSM Tree
接口支持的api, 核心思路就只有2类:
List
, Set
, Sorted Set
等数据结构的key
的value
中要记录所管理成员的元信息
- 归属于某个大数据结构(
List
, Set
, Sorted Set
等)的成员的key
需要包含统一的前缀, 这样才能通过我的的存储引擎进行前缀查询
接下来, 本小节将基于上述的思路首先介绍Redis
基础字符串(这一部分大概分2-3章吧, 内容有点多)
3 代码实现
3.1 基础封装
我们新建一个include/redis_wrapper/redis_wrapper.h
和src/redis_wrapper/redis_wrapper.cpp
文件来对我们的存储引擎进行封装:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
|
class RedisWrapper { private: std::unique_ptr<LSM> lsm; std::shared_mutex redis_mtx;
std::string set(std::vector<std::string> &args); std::string get(std::vector<std::string> &args); std::string incr(std::vector<std::string> &args); std::string decr(std::vector<std::string> &args); std::string expire(std::vector<std::string> &args); std::string del(std::vector<std::string> &args); std::string ttl(std::vector<std::string> &args);
std::string redis_incr(const std::string &key); std::string redis_decr(const std::string &key); std::string redis_expire(const std::string &key, std::string seconds_count); std::string redis_set(std::string &key, std::string &value); std::string redis_get(std::string &key); std::string redis_del(std::vector<std::string> &args); std::string redis_ttl(std::string &key);
... }
|
这里以redis_set
为例, 我们很容易写出下面的代码:
1 2 3 4 5
| std::string RedisWrapper::redis_set(std::string &key, std::string &value) { std::unique_lock<std::shared_mutex> lock(redis_mtx); this->lsm->put(key, value); return "+OK\r\n"; }
|
但这样会有一个问题, 如果这个key
原来就存在, 且是有ttl
的呢? Redis
默认的 SET key value
命令对已存在的 key
进行设置,Redis
会覆盖该 key
的值,并移除原有的 TTL
,使该 key
成为一个持久化的 key
(永不过期)。但上述代码则缺乏对TTL
的考虑, 因此我们需要引入TTL
。
3.2 TTL设计
首先我们的每个key
都可能伴随着一个过期时间, 这个过期时间我们只需要为其加上一个固定的前缀, 就可以与真实的key
关联起来, 称为expire_key
, 同时其value
直接为时间戳即可,如果这个key
对应的expire_key
不存在, 则表示该key
没有过期时间.
另一方面, 原本的Redis
支持后台线程定期清理
和 查询时延迟检查清理
2种检查TTL
的方式, 这里为了实现简单, 我们选择查询时延迟检查清理
的方式, 在后续查询或写入时检查expire_key
的value
, 如果发现value
小于当前时间戳, 则进行相应的操作.
3.3 考虑了TTL的set/get
所以这里我们首选需要对set
进行如下修改:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| inline std::string get_explire_key(const std::string &key) { return REDIS_EXPIRE_HEADER + key; }
std::string RedisWrapper::redis_set(std::string &key, std::string &value) { std::unique_lock<std::shared_mutex> lock(redis_mtx); this->lsm->put(key, value); std::string expire_key = get_explire_key(key); if (this->lsm->get(expire_key).has_value()) { this->lsm->remove(expire_key); } return "+OK\r\n"; }
|
在get
时, 也需要检查TTL
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| std::string RedisWrapper::redis_get(std::string &key) { std::shared_lock<std::shared_mutex> rlock(redis_mtx);
auto key_query = this->lsm->get(key);
std::string expire_key = get_explire_key(key); auto expire_query = this->lsm->get(expire_key);
if (key_query.has_value()) { if (expire_query.has_value()) {
if (is_expired(expire_query, nullptr)) { rlock.unlock(); std::unique_lock<std::shared_mutex> wlock(redis_mtx); this->lsm->remove(key); this->lsm->remove(expire_key); return "$-1\r\n"; } else { return "$" + std::to_string(key_query.value().size()) + "\r\n" + key_query.value() + "\r\n"; } } else { return "$" + std::to_string(key_query.value().size()) + "\r\n" + key_query.value() + "\r\n"; } } else { if (expire_query.has_value()) { rlock.unlock(); std::unique_lock<std::shared_mutex> wlock(redis_mtx); this->lsm->remove(expire_key); } } return "$-1\r\n"; }
|
这里还有一个易错点, 我们检查TTL
和查询key
都是读操作, 因此获取的是读锁, 但我们在由于TTL
超时导致key
不存在时, 需要清理expire_key
和key
, 这需要我们进行写锁的升级, 需要先释放读锁, 然获取写锁.
而且get
操作比set
要复杂, 具体的流程代码里面的注释很清楚了, 就不多说了
3.4 TTL/EXPIRE
这里的expire
操作就是简单插入一个expire_key
和value
即可, value
为当前时间戳加上seconds_count
1 2 3 4 5 6 7 8 9 10 11 12
| std::string RedisWrapper::redis_expire(const std::string &key, std::string seconds_count) { std::unique_lock<std::shared_mutex> lock(redis_mtx); std::string expire_key = get_explire_key(key);
auto expire_time_str = get_expire_time(seconds_count);
this->lsm->put(expire_key, expire_time_str);
return ":1\r\n"; }
|
TTL
则是直接获取expire_key
的value
即可, 这里需要注意的是, 如果expire_key
不存在, 则表示该key
没有过期时间, 因此返回-1
即可, 当expire_key
存在, 则需要判断是否过期, 如果过期, 则返回-2
表示key
不存在, 如果没有过期, 则返回剩余的过期时间即可
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| std::string RedisWrapper::redis_ttl(std::string &key) { std::shared_lock<std::shared_mutex> lock(redis_mtx);
auto key_query = this->lsm->get(key);
std::string expire_key = get_explire_key(key); auto expire_query = this->lsm->get(expire_key);
if (key_query.has_value()) { if (expire_query.has_value()) { std::time_t now_time_t; if (is_expired(expire_query, &now_time_t)) { return ":-2\r\n"; } else { auto now = std::chrono::system_clock::now(); return ":" + std::to_string(std::stoll(expire_query.value()) - now_time_t) + "\r\n"; } } else { return ":-1\r\n"; } } else { return ":-1\r\n"; } }
|
其他的常规接口, 如incr
, del
等很简单, 逻辑也都差不多, 就不展开介绍了, 可以看源码
4 后端服务器实现
完成了基础的封装后, 我们需要设置一个网络服务器来接受redis-cli
的请求, 这里我选择muduo
网络库进行网络层的封装, 服务器的代码在server
文件夹下
4.1 添加依赖
这里建议直接使用xmake
提供的muduo
库, 并定义server
依赖:
1 2 3 4 5 6 7 8 9 10 11 12 13
| add_requires("muduo")
target("server") set_kind("binary") add_files("server/src/*.cpp") add_deps("redis") add_includedirs("include", {public = true}) add_packages("muduo") set_targetdir("$(buildir)/bin")
|
如果不想使用xmake
提供的muduo
库, 也可以手动添加依赖, 具体可以参考muduo
的文档: https://github.com/chenshuo/muduo-tutorial
4.2 Muduo 网络层封装
这里直接用代码简单介绍下muduo
的使用:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138
| #include "../../include/redis_wrapper/redis_wrapper.h" #include "../include/handler.h" #include <cstddef> #include <iostream> #include <muduo/base/Logging.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h> #include <muduo/net/TcpConnection.h> #include <muduo/net/TcpServer.h> #include <string> #include <unordered_map> #include <vector>
using namespace muduo; using namespace muduo::net;
class RedisServer { public: RedisServer(EventLoop *loop, const InetAddress &listenAddr) : server_(loop, listenAddr, "RedisServer"), redis("example_db") { server_.setConnectionCallback( std::bind(&RedisServer::onConnection, this, std::placeholders::_1)); server_.setMessageCallback( std::bind(&RedisServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)); }
void start() { server_.start(); }
void onConnection(const TcpConnectionPtr &conn) { if (conn->connected()) { LOG_INFO << "Connection from " << conn->peerAddress().toIpPort(); } else { LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort(); } }
void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg(buf->retrieveAllAsString()); LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
std::string response = handleRequest(msg); conn->send(response); }
std::string handleRequest(const std::string &request) { size_t pos = 0;
if (request.empty() || request[pos] != '*') { return "-ERR Protocol error: expected '*'\r\n"; }
int numElements = 0; try { numElements = std::stoi(request.substr(pos + 1)); } catch (const std::exception &) { return "-ERR Protocol error: invalid number of elements\r\n"; } pos = request.find('\n', pos) + 1;
LOG_INFO << "request: " << request << '\n'; LOG_INFO << "Number of elements: " << numElements << '\n';
std::vector<std::string> args;
for (int i = 0; i < numElements; ++i) { if (pos >= request.size() || request[pos] != '$') { LOG_INFO << "pos = " << pos << ", i = " << i << ", last args = " << args.back() << '\n'; LOG_INFO << "-ERR Protocol error: expected '$'\r\n"; return "-ERR Protocol error: expected '$'\r\n"; }
int len = 0; std::string value_len; int next_n_pos; try { next_n_pos = request.find('\n', pos); len = std::stoi(request.substr(pos + 1)); } catch (const std::exception &) { LOG_INFO << "-ERR Protocol error: invalid bulk string length\r\n"; return "-ERR Protocol error: invalid bulk string length\r\n"; } pos = next_n_pos + 1; if (pos + len > request.size()) { LOG_INFO << "-ERR Protocol error: bulk string length exceeds request " "size\r\n"; return "-ERR Protocol error: bulk string length exceeds request " "size\r\n"; } args.push_back(request.substr(pos, len)); next_n_pos = request.find('\n', pos); pos = next_n_pos + 1; }
LOG_INFO << "Request: "; for (const auto &arg : args) { LOG_INFO << arg << " "; } LOG_INFO << '\n';
switch (string2Ops(args[0])) { return save_handler(redis); case OPS::SET: return set_handler(args, redis); case OPS::GET: return get_handler(args, redis); case OPS::DEL: return del_handler(args, redis); case OPS::INCR: return incr_handler(args, redis); case OPS::DECR: return decr_handler(args, redis); case OPS::EXPIRE: return expire_handler(args, redis); case OPS::TTL: return ttl_handler(args, redis); case OPS::HSET: } }
TcpServer server_; RedisWrapper redis; };
int main() { EventLoop loop; InetAddress listenAddr(6379); RedisServer server(&loop, listenAddr);
server.start(); loop.loop(); }
|
这里的handleRequest
就是处理Resp
协议的函数, 其目前只支持一些基础指令的解析, 这里的解析也是比较繁琐的, 不好说清楚, 有兴趣直接单步调试看看吧, 这里的RedisWrapper
就是我们之前实现的Redis
的封装, 这里的RedisWrapper
就是我们之前实现的LSM Tree Engine
的封装。这里重点介绍muduo
的使用:
muduo
是一个基于 Reactor 模式的 C++ 网络库,由陈硕开发,广泛应用于高性能网络编程。它提供了简单易用的接口,能够帮助开发者快速构建高性能的 TCP 服务器。以下是如何使用 muduo
的基本步骤,基于你提供的代码进行说明。
引入 muduo 头文件
1 2 3 4 5
| #include <muduo/base/Logging.h> #include <muduo/net/EventLoop.h> #include <muduo/net/InetAddress.h> #include <muduo/net/TcpConnection.h> #include <muduo/net/TcpServer.h>
|
这些头文件分别提供了日志、事件循环、网络地址、TCP 连接和 TCP 服务器的功能。
创建 EventLoop
EventLoop
是 muduo
的核心组件,负责事件循环和事件分发。每个 muduo
服务器都需要一个 EventLoop
对象。
loop.loop()
会启动事件循环,等待事件的发生。
定义服务器地址
使用 InetAddress
来指定服务器的监听地址和端口。例如,监听本地 6379 端口:
1
| InetAddress listenAddr(6379);
|
创建 TcpServer
TcpServer
是 muduo
提供的 TCP 服务器类。需要传入 EventLoop
和监听地址来初始化它:
1
| TcpServer server(&loop, listenAddr, "RedisServer");
|
- 第一个参数是
EventLoop
对象。
- 第二个参数是
InetAddress
,表示服务器的监听地址。
- 第三个参数是服务器的名称(可选)。
这里的TcpServer
放入了RedisServer
类中,并在类的构造函数中初始化。
设置回调函数
muduo
是一个事件驱动的网络库,通过回调函数处理连接和消息事件。你需要为 TcpServer
设置以下回调函数:
连接回调
当有客户端连接或断开时,会触发连接回调函数:
1 2
| server.setConnectionCallback( std::bind(&RedisServer::onConnection, this, std::placeholders::_1));
|
onConnection
是一个成员函数,用于处理连接事件。
std::placeholders::_1
表示回调函数的第一个参数(TcpConnectionPtr
)。
消息回调
当接收到客户端发送的数据时,会触发消息回调函数:
1 2 3
| server.setMessageCallback( std::bind(&RedisServer::onMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));
|
onMessage
是一个成员函数,用于处理接收到的消息。
std::placeholders::_1
是 TcpConnectionPtr
,表示连接对象。
std::placeholders::_2
是 Buffer*
,表示接收到的数据缓冲区。
std::placeholders::_3
是 Timestamp
,表示消息到达的时间。
实现回调函数
需要实现 onConnection
和 onMessage
回调函数。
(1) onConnection处理连接事件:
1 2 3 4 5 6 7
| void onConnection(const TcpConnectionPtr &conn) { if (conn->connected()) { LOG_INFO << "Connection from " << conn->peerAddress().toIpPort(); } else { LOG_INFO << "Connection closed from " << conn->peerAddress().toIpPort(); } }
|
conn->connected()
判断连接是建立还是断开。
conn->peerAddress().toIpPort()
获取客户端的 IP 和端口。
(2) onMessage处理接收到的消息:
1 2 3 4 5 6 7 8
| void onMessage(const TcpConnectionPtr &conn, Buffer *buf, Timestamp time) { std::string msg(buf->retrieveAllAsString()); LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
std::string response = handleRequest(msg); conn->send(response); }
|
buf->retrieveAllAsString()
从缓冲区中提取所有数据并转换为字符串。
conn->send(response)
将响应发送给客户端。
启动服务器
调用 TcpServer::start()
启动服务器,然后进入事件循环:
1 2
| server.start(); loop.loop();
|
日志功能
muduo
提供了内置的日志功能,可以通过 LOG_INFO
、LOG_ERROR
等宏记录日志:
1
| LOG_INFO << "Received message at " << time.toString() << ":\n" << msg;
|
5 小节
这章引入了一个简单的Redis
服务器和基于Redis
对LSM Tree
的封装,Redis
服务器和使用muduo
库来处理TCP连接和消息; 封装的Redis_wrapper
实现了一些基本的Redis
命令,如GET、SET、DEL
等。由于这一章还介绍了Resp
和muduo
, Redis
命令的实现只包括了简单字符串和TTL
, 后续章节将视线更复杂的set
, hash
, list
, zset
等命令。