深入理解Linux网络学习笔记(三)


概述

说实话,这章我没太看懂,可能还是需要反复多次阅读。

内核和用户进程协作之epoll

IO多路复用机制可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。这里的复用指的是对进程的复用,在Linux上多路复用方案有select、poll、epoll。它们三个中的epoll的性能表现是最优秀的,能支持的并发量也最大,epoll的简单示例如下:

int main() {
    listen(lfd, ...);
    cfd1 = accept(...);
    cfd2 = accept(...);
    efd = epoll_create(...);

    epoll_ctl(efd, EPOLL_CTL_ADD, cfd1, ...);
    epoll_ctl(efd, EPOLL_CTL_ADD, cfd2, ...);
    epoll_wait(efd, ...);
}

其中和epoll相关的函数是如下三个:

  • epoll_create:创建一个epoll对象
  • epoll_ctl:向epoll对象添加要管理的连接
  • epoll_wait:等待其管理的连接上的IO事件

1)epoll内核对象的创建

在用户进程调用epoll_create时,内核会创建一个struct eventpoll内核对象,并把它关联到当前进程的已打开文件列表中,如下图所示:

对于struct eventpoll对象,更详细的结构如下图所示:

poll_create源码如下:

// fs/eventpoll.c
SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error, fd;
    struct eventpoll *ep = NULL;
    ...
    // 创建一个eventpoll对象
    error = ep_alloc(&ep);
    ...
}

struct eventpoll的定义如下:

// fs/eventpoll.c
struct eventpoll {
    ...
    // sys_epollo_wait用到的等待队列
    wait_queue_head_t wq;
    ...
    // 接收就绪的描述符都会放到这里
    struct list_head rdllist;

    // 每个epollo对象中都有一棵红黑树
    struct rb_root rbr;
    ...
}

eventpoll这个结构体的几个成员的含义如下:

  • wq:等待队列链表。软中断数据就绪的时候会通过wq来找到阻塞在epoll对象上的用户进程
  • rbr:一棵红黑树。为了支持对海量连接的高效查找、插入和删除,eventpoll内部使用了一棵红黑树。通过这棵树来管理用户进程添加进来的所有socket连接
  • rdllist:就绪的描述符的链表。当有连接就绪的时候,内核会把就绪的连接放到rdllist链表里。这样应用进程只需要判断链表就能找出就绪连接,而不用去遍历整棵树

eventpoll初始化工作在ep_alloc中完成:

// fs/eventpoll.c
static int ep_alloc(struct eventpoll **pep)
{
    ...
    struct eventpoll *ep;
    ...
    // 申请eventpoll内存
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);
    ...
    // 初始化等待队列头
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    // 初始化就绪队列
    INIT_LIST_HEAD(&ep->rdllist);
    // 初始化红黑树指针
    ep->rbr = RB_ROOT;
    ...
}

2)为epoll添加socket

理解这一步是理解整个epoll的关键。为了简单起见,这里只考虑使用EPOLL_CTL_ADD添加socket,先忽略删除和更新。假设现在和客户端的多个连接的socket都创建好了,也创建好了epoll内核对象。在使用epoll_ctl注册每一个socket的时候,内核会做如下三件事情:

  1. 分配一个红黑树节点对象epitem
  2. 将等待事件添加到socket的等待队列中,其回调函数是ep_poll_callback
  3. epitem插入epoll对象的红黑树

通过epoll_ctl添加两个socket以后,这些内核数据结构最终在进程中的关系大致如下图所示:

来看看socket是如何添加到epoll对象里的,找到epoll_ctl的源码

// fs/eventpoll.c
SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
    ...
    struct file *file, *tfile;
    struct eventpoll *ep;
    ...
    // 根据epfd找到eventpoll内核对象
    file = fget(epfd);
    ...
    // 根据socket句柄号,找到其file内核对象
    tfile = fget(fd);
    ...
    ep = file->private_data;
    ...
    switch (op) {
    case EPOLL_CTL_ADD:
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        clear_tfile_check_list();
        break;
    ...
    }
    ...
}

epoll_ctl中首先根据传入fd找到eventpollsocket相关的内核对象。对于EPOLL_CTL_ADD操作来说,会执行到ep_insert函数。所有的注册都是在这个函数中完成的:

// fs/eventpoll.c
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd)
{
    ...
    struct epitem *epi;
    struct ep_pqueue epq;
    ...
    // 1.分配并初始化epitem
    // 分配一个epi对象
    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
        return -ENOMEM;
    ...
    // 对分配的epi对象进行初始化
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    // epi->ffd中存了句柄号和struct file对象地址
    ep_set_ffd(&epi->ffd, tfile, fd);
    ...
    // 2.设置socket等待队列
    // 定义并初始化ep_pqueue对象
    epq.epi = epi;
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);

    // 调用ep_ptable_queue_proc注册回调函数
    // 实际注入的函数为ep_poll_callback
    revents = ep_item_poll(epi, &epq.pt);
    ...
    // 3.将epi插入eventpoll对象的红黑树中
    ep_rbtree_insert(ep, epi);
    ...
}

分配并初始化epitem

对于每一个socket,调用epoll_ctl的时候,都会为之分配一个epitem。该结构的主要数据结构如下:

// fs/eventpoll.c
struct epitem {
    // 红黑树节点
    struct rb_node rbn;
    ...
    // socket文件描述信息
    struct epoll_filefd ffd;
    ...
    // 等待队列
    struct list_head pwqlist;

    // 所归属的eventpoll对象
    struct eventpoll *ep;
    ...
}

epitem进行一些初始化,首先在epi->ep = ep;这行代码中将其ep指针指向eventpoll对象。另外用要添加的socket的file、fd来填充epi->ffdepitem初始化后的关联关系如下图所示:

其中使用到的ep_set_ffd函数如下:

// fs/eventpoll.c
static inline void ep_set_ffd(struct epoll_filefd *ffd,
                  struct file *file, int fd)
{
    ffd->file = file;
    ffd->fd = fd;
}

设置socket等待队列

在创建epitem并初始化之后,ep_insert中第二件事情就是设置socket对象上的等待任务队列,并把函数fs/eventpoll.c文件下的ep_poll_callback设置为数据就绪时候的回调函数,如下图所示:

先来看ep_item_poll

// fs/eventpoll.c
static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt)
{
    pt->_key = epi->event.events;

    return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;
}

这里调用了socket下的file->f_op->poll,这个函数实际上是sock_poll

// net/socket.c
static unsigned int sock_poll(struct file *file, poll_table *wait)
{
    struct socket *sock;
    sock = file->private_data;
    return sock->ops->poll(file, sock, wait);
}

sock->ops->poll指向的是tcp_poll

// net/ipv4/tcp.c
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
    ...
    struct sock *sk = sock->sk;
    ...
    sock_poll_wait(file, sk_sleep(sk), wait);
    ...
}

在sock_poll_wait的第二个参数传参前,先调用了sk_sleep函数。在这个函数里它获取了sock对象下的等待队列列表头wait_queue_head_t,稍后等待队列项就插到这里。这里稍微注意下,是socket的等待队列,不是epoll对象的。下面来看sk_sleep源码

// include/net/sock.h
static inline wait_queue_head_t *sk_sleep(struct sock *sk)
{
    BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);
    return &rcu_dereference_raw(sk->sk_wq)->wait;
}

接着真正进入sock_poll_wait:

// include/net/sock.h
static inline void sock_poll_wait(struct file *filp,
        wait_queue_head_t *wait_address, poll_table *p)
{
    ...
        poll_wait(filp, wait_address, p);
    ...
}
// include/linux/poll.h
static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p)
{
    if (p && p->_qproc && wait_address)
        p->_qproc(filp, wait_address, p);
}

这里的qproc是个函数指针,它在前面的init_poll_funcptr调用时设置成了ep_ptable_queue_proc函数,ep_ptable_queue_proc源码如下:

// fs/eventpoll.c
static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;

    if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {
        // 初始化回调方法
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        // 将ep_poll_callback放入socket等待队列whead(注意不是epollo等待队列)
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
        epi->nwait = -1;
    }
}

在ep_ptable_queue_proc函数中,新建了一个等待队列项,并注册其回调函数为ep_poll_callback函数,然后再将这个等待项添加到socket的等待队列中

在前面介绍阻塞式的系统调用recvfrom时,由于需要在数据就绪的时候唤醒用户进程,所以等待对象项的private会设置成当前用户进程描述符current。而这里的socket是交给epoll来管理的,不需要在一个socket就绪的时候就唤醒进程,所以这里的q->private没有什么用就设置成了NULL

// include/linux/wait.h
static inline void init_waitqueue_func_entry(wait_queue_t *q,
                    wait_queue_func_t func)
{
    q->flags = 0;
    q->private = NULL;
    q->func = func;
}

如上,等待队列项中仅将回调函数q->func设置为ep_poll_callback。后面讲到“数据来了”时,软中断将数据收到socket的接收队列后,会通过注册的这个ep_poll_callback函数来回调,进而通知epoll对象

插入红黑树

分配完epitem对象后,紧接着把它插入红黑树。一个插入了一些socket描述符的epoll里的红黑树示意图如下图所示:

这里使用红黑树是为了让epoll在查找效率、插入效率、内存开销等多个方法比较均衡。

3)epoll_wait之等待接收

epoll_wait做的事情不复杂,当它被调用时,它观察eventpoll->rdllist链表里有没有数据。有数据就返回,没有数据就创建一个等待队列项,将其添加到eventpoll的等待队列上,然后把自己阻塞掉:

其源代码如下:

// fs/eventpoll.c
SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
    ...
    error = ep_poll(ep, events, maxevents, timeout);
    ...
}

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    ...
    wait_queue_t wait;
    ...
fetch_events:
    spin_lock_irqsave(&ep->lock, flags);
    // 1.判断就绪队列上有没有事件就绪
    if (!ep_events_available(ep)) {
        // 2.定义等待事件关联当前进程
        init_waitqueue_entry(&wait, current);
        // 3.把新waitqueue添加到epoll->wq链表
        __add_wait_queue_exclusive(&ep->wq, &wait);

        for (;;) {
            // 4.让出CPU,主动进入睡眠状态
            set_current_state(TASK_INTERRUPTIBLE);
            if (ep_events_available(ep) || timed_out)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            spin_unlock_irqrestore(&ep->lock, flags);
            if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
                timed_out = 1;

            spin_lock_irqsave(&ep->lock, flags);
        }
        __remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }
...
}

判断就绪队列上有没有事件就绪

首先调用ep_events_available来判断就绪链表中是否有可处理的事件

// fs/eventpoll.c
static inline int ep_events_available(struct eventpoll *ep)
{
    return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
}

定义等待事件关联当前进程

假设确实没有就绪的连接,那接着会进入init_waitqueue_entry中定义等待任务,并把current(当前进程)添加到waitqueue上

当没有IO事件的时候,epollo也会阻塞调当前进程,因为没有事情可做了占着CPU也没什么意义。epoll本身是阻塞的,但一般会把socket设置成非阻塞

// include/linux/wait.h
static inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p)
{
    q->flags = 0;
    q->private = p;
    q->func = default_wake_function;
}

注意这里的回调函数名称是default_wake_function。后面讲到“数据来了”时将会调用该函数

添加到等待队列

// include/linux/wait.h
static inline void __add_wait_queue_exclusive(wait_queue_head_t *q,
                          wait_queue_t *wait)
{
    wait->flags |= WQ_FLAG_EXCLUSIVE;
    __add_wait_queue(q, wait);
}

在这里把定义的等待事件添加到了epoll对象的等待队列中

让出CPU主动进入睡眠状态

通过set_current_state把当前进程设置为可打断。调用schedule_hrtimeout_range让出CPU,主动进入睡眠状态

// kernel/hrtimer.c
int __sched schedule_hrtimeout_range(ktime_t *expires, unsigned long delta,
                     const enum hrtimer_mode mode)
{
    return schedule_hrtimeout_range_clock(expires, delta, mode,
                          CLOCK_MONOTONIC);
}

int __sched
schedule_hrtimeout_range_clock(ktime_t *expires, unsigned long delta,
                   const enum hrtimer_mode mode, int clock)
{
    ...
        schedule();
    ...
}

在schedule中选择下一个进程调度

// kernel/sched/core.c
static void __sched __schedule(void)
{
    ...
    next = pick_next_task(rq);
    ...
        context_switch(rq, prev, next);
    ...
}

4)数据来了

在前面epoll_ctl执行的时候,内核为每一个socket都添加了一个等待队列项。在epoll_wait运行完的时候,又在event poll对象上添加了等待队列元素

  • socket->sock->sk_data_ready设置的就绪处理函数是sock_def_readable
  • 在socket的等待队列中,其回调函数是ep_poll_callback,private指向的是空指针null
  • 在eventpoll的等待队列项中,其回调函数是default_wake_function,private指向的是等待该事件的用户进程

将数据接收到任务队列

从TCP协议栈的处理入口函数tcp_v4_rcv开始:

// net/ipv4/tcp_ipv4.c
int tcp_v4_rcv(struct sk_buff *skb)
{
    ...
    // 获取TCP头
    th = tcp_hdr(skb);
    // 获取IP头
    iph = ip_hdr(skb);
    ...
    // 根据数据包头中的IP、端口信息查找到对应的socket
    sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest);
    ...
    // socket未被用户锁定
    if (!sock_owned_by_user(sk)) {
        ...
        {
            if (!tcp_prequeue(sk, skb))
                ret = tcp_v4_do_rcv(sk, skb);
        }
        ...
}

在tcp_v4_rcv中首先根据收到的网络包的header里的source和dest信息在本机上查找对应的socket。找到以后,直接接入接收的主体函数tcp_v4_do_rcv:

// net/ipv4/tcp_ipv4.c
int tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb)
{
    ...
    if (sk->sk_state == TCP_ESTABLISHED) {
        ...
        // 执行连接状态下的数据处理  
        if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) {
            rsk = sk;
            goto reset;
        }
        return 0;
    }
    // 其他非ESTABLISH状态的数据包处理
    ...
}

假设处理的是ESTABLISH状态下的包,这样就又进入tcp_rcv_established函数中进行处理了

// net/ipv4/tcp_input.c
int tcp_rcv_established(struct sock *sk, struct sk_buff *skb,
            const struct tcphdr *th, unsigned int len)
{
                  ...
                // 将数据接收到队列中
                eaten = tcp_queue_rcv(sk, skb, tcp_header_len,
                              &fragstolen);
            }
            ...
            // 数据准备好,唤醒socket上阻塞掉的进程  
            sk->sk_data_ready(sk, 0);
            ...
}

tcp_rcv_established中通过调用tcp_queue_rcv函数完成了将接收数据放到socket的接收队列上,如下图所示:

// net/ipv4/tcp_input.c
static int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,
          bool *fragstolen)
{
    ...
    // 把接收到的数据放到socket的接收队列的尾部 
    if (!eaten) {
        __skb_queue_tail(&sk->sk_receive_queue, skb);
        skb_set_owner_r(skb, sk);
    }
    return eaten;
}

查找就绪回调函数

调用tcp_queue_rcv完成接收之后,接着再调用sk_data_ready来唤醒在socket上等待的用户进程。在“socket的直接创建”中讲到的sock_init_data函数,已经把sk_data_ready设置成了sock_def_readable函数了。它是默认的数据就绪处理函数。

当socket上数据就绪时,内核将以sock_def_readable这个函数为入口,找到epoll_ctl添加socket时在其上设置的回调函数ep_poll_callback,如下图所示:

// net/core/sock.c
static void sock_def_readable(struct sock *sk, int len)
{
    struct socket_wq *wq;

    rcu_read_lock();
    wq = rcu_dereference(sk->sk_wq);
    // 判断等待队列不为空
    if (wq_has_sleeper(wq))
        // 执行等待队列项上的回调函数
        wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI |
                        POLLRDNORM | POLLRDBAND);
    sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN);
    rcu_read_unlock();
}

重点看wake_up_interruptible_sync_poll,看一下内核是怎么找到等待队列项里注册的回调函数的

// include/linux/wait.h
#define wake_up_interruptible_sync_poll(x, m)                \
    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))

// kernel/sched/core.c
void __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, void *key)
{
    unsigned long flags;
    int wake_flags = WF_SYNC;

    if (unlikely(!q))
        return;

    if (unlikely(!nr_exclusive))
        wake_flags = 0;

    spin_lock_irqsave(&q->lock, flags);
    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);
    spin_unlock_irqrestore(&q->lock, flags);
}

接着进入__wake_up_common

// kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

__wake_up_common中,选出等待队列里注册的某个元素curr,回调其curr->func。之前调用ep_insert的时候,把这个func设置成ep_poll_callback

执行socket就绪回调函数

找到了socket等待队列项里注册的函数ep_poll_callback,接着软中断就会调用它

// fs/eventpoll.c
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
    ...
    // 获取wait对应的epitem
    struct epitem *epi = ep_item_from_wait(wait);
    // 获取epitem对应的eventpoll结构体
    struct eventpoll *ep = epi->ep;
    ...
    if (!ep_is_linked(&epi->rdllink)) {
        // 1.将当前epitem添加到eventpoll的就绪队列中
        list_add_tail(&epi->rdllink, &ep->rdllist);
        ep_pm_stay_awake_rcu(epi);
    }
    // 2.查看eventpoll的等待队列上是否有等待
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    ...
}

ep_poll_callback中根据等待任务队列上额外的base指针可以找到epitem,进而也可以找到eventpoll对象

它做的第一件事就是把自己的epitem添加到epoll的就绪队列中。接着它又会查看eventpoll对象上的的等待队列里是否有等待项(epoll_wait执行的时候会设置)。如果没有等待项,软中断的事情就做完了。如果有等待项,那就找到等待项里设置的回调函数,如下图所示:

依次调用wake_up_locked() => __wake_up_locked() => __wake_up_common

// kernel/sched/core.c
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,
            int nr_exclusive, int wake_flags, void *key)
{
    wait_queue_t *curr, *next;

    list_for_each_entry_safe(curr, next, &q->task_list, task_list) {
        unsigned flags = curr->flags;

        if (curr->func(curr, mode, wake_flags, key) &&
                (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive)
            break;
    }
}

__wake_up_common离,调用curr->func。这里的func是在epoll_wait时传入的default_wake_function函数

执行epoll就绪通知

default_wake_function中找到等待队列项里的进程描述符,然后唤醒它,如下图所示:

img

// kernel/sched/core.c
int default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,
              void *key)
{
    return try_to_wake_up(curr->private, mode, wake_flags);
}

等待队列项curr->private指针是在对象上等待而被阻塞掉的进程。将epoll_wait进程推入可运行队列,等待内核重新调度进程。当这个进程重新运行后,从epoll_wait阻塞时暂停的代码处继续执行。把rdlist中就绪的事件返回给用户进程

// fs/eventpoll.c
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
        ...
        __remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }
check_events:
    eavail = ep_events_available(ep);

    spin_unlock_irqrestore(&ep->lock, flags);

    // 给用户进程返回就绪事件
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && !timed_out)
        goto fetch_events;

    return res;
}

从用户角度来看,epoll_wait只是多等了一会儿而已,但执行流程还是顺序的。

5)小结

epoll的整个工作流程总结如下图所示:

其中软中断回调时的回调函数调用关系整理如下:

sock_def_readable: sock对象初始化时设置的
    => ep_poll_callback: 调用epll_ctl时添加到socket上的
        => default_wake_function: 调用epoll_wait时设置到epoll上的

总结一下,epoll相关的函数里内核运行环境分两部分:

  • 用户进程内核态。调用epoll_wait等函数时会将进程写入内核态来执行。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出CPU
  • 硬、软中断上下文。在这些组件中,将包从网卡接收过来进行处理,然后放到socket的接收队列。对于epoll来说,再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中。这个时候再捎带检查一下epoll上是否有被阻塞的进程,如果有唤醒它

在实践中,只要活儿足够多,epoll_wait根本不会让进程阻塞。用户进程会一直干活儿,一直干活儿,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这就是epoll高效的核心原因所在。

总结

1)阻塞到底是怎么一回事?

阻塞其实说的是进程因为等待某个事件而主动让出CPU挂起的操作。在网络IO中,当进程等待socket上的数据时,如果数据还没有到来,那就把当前进程状态从TASK_RUNNING修改为TASK_INTERRUPTIBLE,然后主动让出CPU。由调度器来调度下一个就绪状态的进程来执行

所以,在分析某个技术方案是不是阻塞的时候,关键要看进程有没有放弃CPU。如果放弃了,那就是阻塞。如果没放弃,那就是非阻塞。事实上,recvfrom也可以设置成非阻塞。在这种情况下,如果socket上没有数据到达,调用直接返回空,而不是挂起等待

2)同步阻塞IO都需要哪些开销?

同步阻塞IO的开销主要有以下这些:

  • 进程通过recv系统调用接收一个socket上的数据时,如果数据没有到达,进程就被从CPU上拿下来,然后再换上另一个进程。这导致一次进程上下文切换的开销
  • 当连接上的数据就绪的时候,睡眠的进程又会被唤醒,又是一次进程切换的开销
  • 一个进程同时只能等待一条连接,如果有很多并发,则需要很多进程。每个进程都将占用大于几MB的内存

3)多路复用epoll为什么就能提高网络性能?

epoll高性能最根本的原因是极大程度地减少了无用的进程上下文切换,让进程更专注地处理网络请求

在内核的硬、软中断上下文中,包从网卡接收过来进行处理,然后放到socket的接收队列。再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中

在用户进程中,通过调用epoll_wait来查看就绪链表中是否有事件到达,如果有,直接走进行处理。处理完毕再次调用epoll_wait。在高并发的实践中,主要活儿足够多,epoll_wait根本不会让进程阻塞。用户进程会一直干活儿,一直干活儿,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这就是epoll高效的核心原因所在

至于红黑树,仅仅是提高了epoll查找、添加、删除socket时的效率而已,不算epoll在高并发场景高性能的根本原因

4)epoll也是阻塞的?

例如,一个epoll对象下添加了一万个客户端连接的socket。假设所有这些socket上都还没有数据到达,这个时候进程调用epoll_wait发现没有任何事情可干。这种情况下用户进程就会被阻塞掉,而这种情况是完全正常的,没有工作需要处理,那还占着CPU是没有道理的

阻塞不会导致低性能,过多过频繁的阻塞才会。epoll的阻塞和它的高性能并不冲突

5)为什么Redis的网络性能很突出?

Redis在网络IO性能上表现非常突出,单进程的服务器在极限情况下可以达到10万的QPS

Redis的事件循环可以简化到用如下伪代码来表示

void aeMain(aeEventLoop *eventLoop) {
    job = epollo_wait(...);
    do_job();
}

Redis的主要业务逻辑就是在本机内存上的数据结构的读写,几乎没有网络IO和磁盘IP,单个请求处理起来很快。所以它把主服务器程序干脆就做成了单进程的,这样省去了多进程之间协作的负担,也很大程序减少了进程切换。进程主要的工作过程就是调用epoll_wait等待事件,有了事件以后处理,处理完之后再调用epoll_wait。一直工作,一直工作,直到实在没有请求需要处理,或者进程时间片到的时候才让出CPU。工作效率发挥到了极致

推荐阅读:

Linux五种I/O模型:带你彻底理解Linux五种I/O模型


文章作者: JoyTsing
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 JoyTsing !
评论
  目录