概述
仅仅靠单线程就可以支撑起每秒数万 QPS 的高处理能力,今天重点来看一下Redis 核心网络模块,很多时候网络是限制性能的重要因素。
一、多路复用原理
epoll可以说是老生常谈了,这里就放一张图吧。
性能提升思路很简单,就是让很多的用户连接来复用同一个进(线)程,这就是多路复用。多路指的是许许多多个用户的网络连接。复用指的是对进(线)程的复用。换到牧羊人的例子里,就是一群羊只要一个牧羊人来处理就行了。
不过复用实现起来是需要特殊的 socket 事件管理机制的,最典型和高效的方案就是 epoll。放到牧羊人的例子来,epoll 就相当于一只牧羊犬。
在 epoll 的系列函数里, epoll_create 用于创建一个 epoll 对象,epoll_ctl 用来给 epoll 对象添加或者删除一个 socket。epoll_wait 就是查看它当前管理的这些 socket 上有没有可读可写事件发生。
当网卡上收到数据包后,Linux 内核进行一系列的处理后把数据放到 socket 的接收队列。然后会检查是否有 epoll 在管理它,如果是则在 epoll 的就绪队列中插入一个元素。epoll_wait 的操作就非常的简单了,就是到 epoll 的就绪队列上来查询有没有事件发生就行了。关于 epoll 这只“牧羊犬”的工作原理参见:深入揭秘 epoll 是如何实现 IO 多路复用的 (或者深入理解Linux网络内核这本书)。
在基于 epoll 的编程中,和传统的函数调用思路不同的是,我们并不能主动调用某个 API 来处理。因为无法知道我们想要处理的事件啥时候发生。所以只好提前把想要处理的事件的处理函数注册到一个事件分发器上去。当事件发生的时候,由这个事件分发器调用回调函数进行处理。这类基于实现注册事件分发器的开发模式也叫 Reactor 模型。
二、Redis 服务启动初始化
其中整个 Redis 服务的代码总入口在 src/server.c 文件中,如下。
//file: src/server.c
int main(int argc, char **argv) {
......
// 启动初始化
initServer();
// 运行事件处理循环,一直到服务器关闭为止
aeMain(server.el);
}
整个 Redis 的工作过程,就只需要理解清楚 main 函数中调用的 initServer 和 aeMain 这两个函数就足够了。在 initServer 这个函数内,Redis 做了这么三件重要的事情。
- 创建一个 epoll 对象
- 对配置的监听端口进行 listen
- 把 listen socket 让 epoll 给管理起来
2.1 创建 epoll 对象
Redis 在操作系统提供的 epoll 对象基础上又封装了一个 eventLoop 出来,所以创建的时候是先申请和创建 eventLoop。
//file:src/ae.c
aeEventLoop *aeCreateEventLoop(int setsize) {
aeEventLoop *eventLoop;
eventLoop = zmalloc(sizeof(*eventLoop);
//将来的各种回调事件就都会存在这里
eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
......
aeApiCreate(eventLoop);
return eventLoop;
}
2.2 绑定监听服务端口
Redis 中的 listen 过程,它在 listenToPort 函数中。虽然调用链条很长,但其实主要就是执行了个简单 listen 而已。
//file: src/redis.c
int listenToPort(int port, int *fds, int *count) {
for (j = 0; j < server.bindaddr_count || j == 0; j++) {
fds[*count] = anetTcpServer(server.neterr,port,NULL,
server.tcp_backlog);
}
}
Redis 是支持开启多个端口的,所以在 listenToPort 中我们看到是启用一个循环来调用 anetTcpServer。在 anetTcpServer 中,逐步会展开调用,直到执行到 bind 和 listen 系统调用。
2.3 注册事件回调函数
initServer里面调用 aeCreateEventLoop 创建了 epoll,调用 listenToPort 进行了服务端口的 bind 和 listen。接着就开始调用 aeCreateFileEvent 来注册一个 accept 事件处理器。
//file: src/server.c
void initServer() {
// 2.1.1 创建 epoll
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
// 2.1.2 监听服务端口
listenToPort(server.port,server.ipfd,&server.ipfd_count);
// 2.1.3 注册 accept 事件处理器
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL);
}
...
}
调用 aeCreateFileEvent 时传的重要参数是 acceptTcpHandler,它表示将来在 listen socket 上有新用户连接到达的时候,该函数将被调用执行。我们来看 aeCreateFileEvent 具体代码。
//file: src/ae.c
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
// 取出一个文件事件结构
aeFileEvent *fe = &eventLoop->events[fd];
// 监听指定 fd 的指定事件
aeApiAddEvent(eventLoop, fd, mask);
// 设置文件事件类型,以及事件的处理器
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc;
if (mask & AE_WRITABLE) fe->wfileProc = proc;
// 私有数据
fe->clientData = clientData;
}
函数 aeCreateFileEvent 一开始,从 eventLoop->events 获取了一个 aeFileEvent 对象, eventLoop->events 数组,注册的各种事件处理器会保存在这个地方。接下来调用 aeApiAddEvent。这个函数其实就是对 epoll_ctl 的一个封装。主要就是实际执行 epoll_ctl EPOLL_CTL_ADD。
//file:src/ae_epoll.c
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
// add or mod
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
......
// epoll_ctl 添加事件
epoll_ctl(state->epfd,op,fd,&ee);
return 0;
}
每一个 eventLoop->events 元素都指向一个 aeFileEvent 对象。在这个对象上,设置了三个关键东西
- rfileProc:读事件回调
- wfileProc:写事件回调
- clientData:一些额外的扩展数据
当 epoll_wait 发现某个 fd 上有事件发生的时候,这样 redis 首先根据 fd 到 eventLoop->events 中查找 aeFileEvent 对象,然后再看 rfileProc、wfileProc 就可以找到读、写回调处理函数。回头看 initServer 调用 aeCreateFileEvent 时传参来看。
//file: src/server.c
void initServer() {
......
// 2.1.3 注册 accept 事件处理器
for (j = 0; j < server.ipfd_count; j++) {
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE,
acceptTcpHandler,NULL);
}
}
listen fd 对应的读回调函数 rfileProc 事实上就被设置成了 acceptTcpHandler,写回调没有设置,私有数据 client_data 也为 null。
三、Redis 事件处理循环
接下来,Redis 就会进入 aeMain 开始进行真正的用户请求处理了。在 aeMain 函数中,是一个无休止的循环。在每一次的循环中,要做如下几件事情。
- 通过 epoll_wait 发现 listen socket 以及其它连接上的可读、可写事件
- 若发现 listen socket 上有新连接到达,则接收新连接,并追加到 epoll 中进行管理
- 若发现其它 socket 上有命令请求到达,则读取和处理命令,把命令结果写到缓存中,加入写任务队列
- 每一次进入 epoll_wait 前都调用 beforesleep 来将写任务队列中的数据实际进行发送
- 如若有首次未发送完毕的,当写事件发生时继续发送
以上就是 aeMain 函数的核心逻辑所在,接下来我们分别对如上提到的四件事情进行详细的阐述。
3.1 epoll_wait 发现事件
Redis 不管有多少个用户连接,都是通过 epoll_wait 来统一发现和管理其上的可读(包括 liisten socket 上的 accept事件)、可写事件的。甚至连 timer,也都是交给 epoll_wait 来统一管理的。
每当 epoll_wait 发现特定的事件发生的时候,就会调用相应的事先注册好的事件处理函数进行处理。aeProcessEvents 就是调用 epoll_wait 来发现事件。当发现有某个 fd 上事件发生以后,则调为其事先注册的事件处理器函数 rfileProc 和 wfileProc。
3.2 处理新连接请求
我们假设现在有新用户连接到达了。前面在我们看到 listen socket 上的 rfileProc 注册的是 acceptTcpHandler。也就是说,如果有连接到达的时候,会回调到 acceptTcpHandler。
在 acceptTcpHandler 中,主要做了几件事情
- 调用 accept 系统调用把用户连接给接收回来
- 为这个新连接创建一个唯一 redisClient 对象
- 将这个新连接添加到 epoll,并注册一个读事件处理函数
3.3 处理客户连接上的可读事件
现在假设该用户连接有命令到达了,就假设用户发送了GET XXXXXX_KEY
命令。那么在 Redis 的时间循环中调用 epoll_wait 发现该连接上有读时间后,会调用在上一节中讨论的为其注册的读处理函数 readQueryFromClient。
在读处理函数 readQueryFromClient 中主要做了这么几件事情。
- 解析并查找命令
- 调用命令处理
- 添加写任务到队列
- 将输出写到缓存等待发送
3.4 beforesleep 处理写任务队列
次在进入 aeProcessEvents 前都需要先进行 beforesleep 处理。这个函数名字起的怪怪的,但实际上大有用处。
//file:src/ae.c
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// beforesleep 处理写任务队列并实际发送之
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
该函数处理了许多工作,其中一项便是遍历发送任务队列,并将 client 发送缓存区中的处理结果通过 write 发送到客户端手中。
//file:src/server.c
void beforeSleep(struct aeEventLoop *eventLoop) {
......
handleClientsWithPendingWrites();
}
//file:src/networking.c
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
//遍历写任务队列 server.clients_pending_write
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
//实际将 client 中的结果数据发送出去
writeToClient(c->fd,c,0)
//如果一次发送不完则准备下一次发送
if (clientHasPendingReplies(c)) {
//注册一个写事件处理器,等待 epoll_wait 发现可写后再处理
aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c);
}
......
}
}
在 handleClientsWithPendingWrites
中,遍历了发送任务队列 server.clients_pending_write
,并调用 writeToClient 进行实际的发送处理。值得注意的是,发送 write 并不总是能一次性发送完的。假如要发送的结果太大,而系统为每个 socket 设置的发送缓存区又是有限的。
在这种情况下,clientHasPendingReplies
判断仍然有未发送完的数据的话,就需要注册一个写事件处理函数到 epoll 上。等待 epoll 发现该 socket 可写的时候再次调用 sendReplyToClient
进行发送。
//file:src/networking.c
int writeToClient(int fd, client *c, int handler_installed) {
while(clientHasPendingReplies(c)) {
// 先发送固定缓冲区
if (c->bufpos > 0) {
nwritten = write(fd,c->buf+c->sentlen,c->bufpos-c->sentlen);
if (nwritten <= 0) break;
......
// 再发送回复链表中数据
} else {
o = listNodeValue(listFirst(c->reply));
nwritten = write(fd, o->buf + c->sentlen, objlen - c->sentlen);
......
}
}
}
writeToClient 中的主要逻辑就是调用 write 系统调用让内核帮其把数据发送出去即可。由于每个命令的处理结果大小是不固定的。所以 Redis 采用的做法用固定的 buf + 可变链表来储存结果字符串。这里自然发送的时候就需要分别对固定缓存区和链表来进行发送了。
四、小结
到了喜闻乐见的小结时间,Redis 服务器端只需要单线程可以达到非常高的处理能力,每秒可以达到数万 QPS 的高处理能力。如此高性能的程序其实就是对 Linux 提供的多路复用机制 epoll 的一个较为完美的运用。
在 Redis 源码中,核心逻辑其实就是两个,一个是 initServer 启动服务,另外一个就是 aeMain 事件循环。在 initServer 这个函数内,Redis 做了这么三件重要的事情。
- 创建一个 epoll 对象
- 对配置的监听端口进行 listen
- 把 listen socket 让 epoll 给管理起来
在 aeMain 函数中,是一个无休止的循环,它是 Redis 中最重要的部分。在每一次的循环中,要做的事情可以总结为如下图。
- 通过 epoll_wait 发现 listen socket 以及其它连接上的可读、可写事件
- 若发现 listen socket 上有新连接到达,则接收新连接,并追加到 epoll 中进行管理
- 若发现其它 socket 上有命令请求到达,则读取和处理命令,把命令结果写到缓存中,加入写任务队列
- 每一次进入 epoll_wait 前都调用 beforesleep 来将写任务队列中的数据实际进行发送
其实事件分发器还处理了一个不明显的逻辑,那就是如果 beforesleep 在将结果写回给客户端的时候,如果由于内核 socket 发送缓存区过小而导致不能一次发送完毕的时候,也会注册一个写事件处理器。等到 epoll_wait 发现对应的 socket 可写的时候,再执行 write 写处理。