精华内容
下载资源
问答
  • 文章目录IO复用概述epoll概述 在学到I/O复用——《UNP1》第六章的时候,我们通常要学习: 5种io模型: 阻塞I/O 非阻塞I/O I/O复用(select和poll) 信号驱动I/O 异步I/O 3个主要函数:(selectors模块) select...


    在学到I/O复用——《UNP1》第六章的时候,我们通常要学习:
    5种io模型:

    • 阻塞I/O
    • 非阻塞I/O
    • I/O复用(select和poll)
    • 信号驱动I/O
    • 异步I/O

    3个主要函数:(selectors模块)

    • select函数
    • poll函数
    • epoll系列函数

    考虑到epoll性能最高而目前服务器大部分使用该函数
    本文内容只涵盖5种I/O模型中的第3种——io复用、3个主要函数中的第三个epoll函数。
    关于5种io模型和3个主要函数的解释与对比可见
    并发编程(IO多路复用)

    IO复用概述

    在一个典型聊天程序中,一个服务器对多个客户端,有socket(IO流)大致可以分为两种方法
    第一种方法就是给一个新的IO分配一个新的资源(多进程或者多线程的线程池)
    第二种方法就是io多路复用——一个线程记录每个IO流的状态,来同时管理多个io流

    IO复用(I/O multiplexing)指代的东西,其实可以理解为就是早期电话接线员的工作——分时复用
    在这里插入图片描述
    在同一时刻,会有很多IO流(socket描述符)链接至服务器,epoll会把链接进来的IO流都监视起来,谁有数据,epoll就会像电话接线员/波动开关一样,把通道给谁,然后调用响应的代码去数量数据。
    select、poll、epoll是IO多路复用的三种具体实现,之所以有三种,是因为存在历史顺序,在不断的出现bug和完善bug中诞生的poll–>epoll。现在在大并发的场景下,我们多用的是epoll。

    epoll 可以说是I/O 多路复用最新的一个实现,epoll 修复了poll 和select绝大部分问题, 比如:
    epoll 现在是线程安全的。
    epoll 现在不仅告诉你sock组里面数据,还会告诉你具体哪个sock有数据,你不用自己去找了。

    来源:知乎IO 多路复用是什么意思?

    epoll概述

    epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

    epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

    关于epoll原理的更多理解:
    我读过的最好的epoll讲解–转自”知乎“

    epoll难在理解原理和内部代码实现,而函数接口使用较为简单,作为初学,就先来掌握函数的调用吧~
    在这里插入图片描述

    epoll_create创建一个epoll句柄(instance)

     #include <sys/epoll.h>
    int epoll_create(int size); 
    

    通知内核需要监听文件描述符
    size:用来告诉内核希望epoll instance监听的文件描述符数量有多大。但是从linux2.6.8之后,这个数字不再被使用,但必须传入一个大于0的值,这是因为内核可以动态分配大小,所以不需要人为传输size了

    Since Linux 2.6.8, the size argument is ignored, but must be greater than zero;

    返回值:成功返回epoll句柄的描述符epoll_fd,失败返回-1
    注意,epoll句柄会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的。所以在使用完之后要调用close()关闭,否则可能导致fd被耗尽

    epoll_create() returns a file descriptor referring to the new epoll instance. This file descriptor is used for all the subsequent calls to the epoll interface. When no longer required, the file descriptor returned by epoll_create() should be closed by using close(2).
    When all file descriptors referring to an epoll instance have been closed, the kernel destroys the instance and releases the associated resources for reuse.

    另外这个函数还可以吃传参flag,没啥必要就不介绍了

    epoll_ctl将要监听的事件注册(加入)epoll句柄、从句柄中删除、对监听事件进行修改

    简单来说就是对要监听的事件进行控制的函数

     #include <sys/epoll.h>
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); 
    
    • 第一个参数epfd:epoll_create返回的epoll句柄
    • 第二个参数op:要对第三个参数fd进行的操作,有三种取值:
    取值意义
    EPOLL_CTL_ADD注册新的fd到epfd中;
    EPOLL_CTL_MOD修改已经注册的fd的监听事件;
    EPOLL_CTL_DEL从epfd中删除一个fd;
    • 第三个参数是需要监听的fd
    • 第四个参数event是指向epoll_event 类型结构体的指针,用来告诉内核需要监听的是什么事件的什么动作,struct epoll_event结构体如下:
               struct epoll_event {
                   uint32_t     events;      /* Epoll events */
                   epoll_data_t data;        /* User data variable */
               };
    

    其中的data又是一个结构体:

              typedef union epoll_data {
                   void        *ptr;
                   int          fd;
                   uint32_t     u32;
                   uint64_t     u64;
               } epoll_data_t;
    
    

    对于epoll_event结构体中的events,有如下数据可设置:

    取值意义
    EPOLLIN触发该事件,表示对应的文件描述符上有可读数据。(包括对端SOCKET正常关闭);
    EPOLLOUT触发该事件,表示对应的文件描述符上可以写数据;
    EPOLLPRI表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
    EPOLLERR表示对应的文件描述符发生错误;不需设置, epoll_wait(2) will always wait for this event;
    EPOLLHUP表示对应的文件描述符被挂断;不需设置, epoll_wait(2) will always wait for this event;
    EPOLLET将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
    EPOLLONESHOT只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里。

    参考:epoll机制:epoll_create、epoll_ctl、epoll_wait、close

    而对于epoll_event的第二个参数data,它包含与要处理的事件相关的文件描述符fd,通常与epoll_ctl函数的第三个参数一致
    栗子:

    	int epoll_fd;
    	epoll_fd = epoll_create(1);//size传送一个>0参数即可
    	struct epoll_event epollEvent;
    	//下面两句:对server_socket_fd的数据输入事件感兴趣
    	epollEvent.data.fd = server_socket_fd;
    	epollEvent.events = EPOLLIN;
    	//注册要处理的事件server_socket_fd进epoll_fd
    	epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_socket_fd, &epollEvent);
    
    

    epoll_wait等待已注册事件触发

    等待事件的产生,若超过timeout还没有触发,就超时

           #include <sys/epoll.h>
    
           int epoll_wait(int epfd, struct epoll_event *events,
                          int maxevents, int timeout);
                          
           int epoll_pwait(int epfd, struct epoll_event *events,
                          int maxevents, int timeout,
                          const sigset_t *sigmask);
    
    

    我们仅说第一种函数:

    • 第一个参数epfd是由epoll_create 生成的epoll专用的文件描述符;
    • 第二个参数events和上面最后一个参数一样,也是指向epoll_event 类型结构体的指针,不过现在是作为一个容器使用,用来从内核得到发生的事件的集合
    • 第三个参数maxevents用来告知这个容器有多大(事件数组成员个数),也就是每次能处理的事件个数
    • 第四个参数timeout是等待I/O事件发生的超时值(单位我也不太清楚);一般置为-1即可,-1代表将“block indefinitely”将无限期等待永久阻塞,也有说导致不确定结果,这取决于怎么翻译了。置为0将使得函数立刻返回,无论有没有事件发生,也就是设置为非阻塞。

    返回值是存放在第二个参数events数组容器内的实际成员个数。也就是发生了的需要处理的事件个数,如果返回0表示已超时。

    epoll_wait函数的运行过程是:程序阻塞在这个函数,等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。

    ET/LT工作模式

    一、LT(LevelTriggered)水平触发模式:(默认工作方式)
    当epoll_wait检测到fd上有事件发生并将此事件通知应用程序后,程序可以选择立即或不立即处理该事件,只要事件不被处理,下一次调用epoll_wait的时候还会像应用程序通知此事件,直到事件被处理。

    二、ET(Edge Triggered)边沿触发模式:
    当epoll_wait检测到fd上有事件发生并将此事件通知应用程序后,程序必须立即处理该事件,因为epoll_wait将只通知一遍,后续的epoll_wait调用不再通知这一事件。

    三、两种模式的特性:

    • LT(LevelTriggered)水平触发模式,是一种缺省工作方式,支持blocksocket和no_blocksocket,错误率比较小。
    • ET(Edge Triggered)边沿触发模式,是高速工作方式,因为仅通知一遍,效率高,但错误率比较大,只支持no_block socket (非阻塞socket),因为如果描述符是阻塞blocksocket的,那么读或写操作将会因没有后续事件而一直处于阻塞状态 ( 饥渴状态 )。
    • 两者的差异在于LT模式下只要某个socket处于readable/writable状态,无论什么时候进行epoll_wait都会返回该socket;而edge-trigger模式下只有某个socket从unreadable变为readable或从unwritable变为writable时,epoll_wait才会返回该socket,状态不变则不返回。
    • 在epoll的ET模式下,正确的读写方式为:读:只要可读,就一直读,直到返回0,或者 errno = EAGAIN;写:只要可写,就一直写,直到数据发送完,或者 errno = EAGAIN

    关于模式选择与代码逻辑的结合,也是后台开发非常需要研究的地方,笔者目前经验不足,仅列几个参考链接:

    ET/LT模式区别
    epoll的LT和ET模式
    epoll两种类型ET和LT区别(结合实际例子)

    end

    展开全文
  • http://apps.hi.baidu.com/share/detail/31300135 NAME epoll - I/O event notification facility SYNOPSIS #include <sys/epoll.h> DEscrīptION epoll is a variant of poll(2) that c...

    http://apps.hi.baidu.com/share/detail/31300135

    NAME
           epoll - I/O event notification facility

    SYNOPSIS
           #include <sys/epoll.h>

    DEscrīptION
           epoll is a variant of poll(2) that can be used either as Edge or Level
           Triggered interface and scales well to large numbers of watched fds.
           Three system calls are provided to set up and control an epoll set:
           epoll_create(2), epoll_ctl(2), epoll_wait(2).

           An epoll set is connected to a file descrīptor created by epoll_create(2).   Interest for certain file descrīptors is then registered via
           epoll_ctl(2). Finally, the actual wait is started by epoll_wait(2).

    其实,一切的解释都是多余的,按照我目前的了解,EPOLL模型似乎只有一种格式,所以大家只要参考我下面的代码,就能够对EPOLL有所了解了,代码的解释都已经在注释中:

    while (TRUE)
    {
    int nfds = epoll_wait (m_epoll_fd, m_events, MAX_EVENTS, EPOLL_TIME_OUT);//等待EPOLL事件的发生,相当于监听,至于相关的端口,需要在初始化EPOLL的时候绑定。
    if (nfds <= 0)
       continue;
    m_bOnTimeChecking = FALSE;
    G_CurTime = time(NULL);
    for (int i=0; i<nfds; i++)
    {
       try
       {
        if (m_events[i].data.fd == m_listen_http_fd)//如果新监测到一个HTTP用户连接到绑定的HTTP端口,建立新的连接。由于我们新采用了SOCKET连接,所以基本没用。
        {
         OnAcceptHttpEpoll ();
        }
        else if (m_events[i].data.fd == m_listen_sock_fd)//如果新监测到一个SOCKET用户连接到了绑定的SOCKET端口,建立新的连接。
        {
         OnAcceptSockEpoll ();
        }
        else if (m_events[i].events & EPOLLIN)//如果是已经连接的用户,并且收到数据,那么进行读入。
        {
         OnReadEpoll (i);
        }

        OnWriteEpoll (i);//查看当前的活动连接是否有需要写出的数据。
       }
       catch (int)
       {
        PRINTF ("CATCH捕获错误\n");
        continue;
       }
    }
    m_bOnTimeChecking = TRUE;
    OnTimer ();//进行一些定时的操作,主要就是删除一些断线用户等。
    }

    其实EPOLL的精华,按照我目前的理解,也就是上述的几段短短的代码,看来时代真的不同了,以前如何接受大量用户连接的问题,现在却被如此轻松的搞定,真是让人不得不感叹。

    今天搞了一天的epoll,想做一个高并发的代理程序。刚开始真是郁闷,一直搞不通,网上也有几篇介绍epoll的文章。但都不深入,没有将一些注意的地方讲明。以至于走了很多弯路,现将自己的一些理解共享给大家,以少走弯路。

    epoll用到的所有函数都是在头文件sys/epoll.h中声明,有什么地方不明白或函数忘记了可以去看一下。
    epoll和select相比,最大不同在于:

    1epoll返回时已经明确的知道哪个sokcet fd发生了事件,不用再一个个比对。这样就提高了效率。
    2select的FD_SETSIZE是有限止的,而epoll是没有限止的只与系统资源有关。

    1、epoll_create函数
    函数声明:int epoll_create(int size)
    该 函数生成一个epoll专用的文件描述符。它其实是在内核申请一空间,用来存放你想关注的socket fd上是否发生以及发生了什么事件。size就是你在这个epoll fd上能关注的最大socket fd数。随你定好了。只要你有空间。可参见上面与select之不同2.

    22、epoll_ctl函数
    函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
    该函数用于控制某个epoll文件描述符上的事件,可以注册事件,修改事件,删除事件。
    参数:
    epfd:由 epoll_create 生成的epoll专用的文件描述符;
    op:要进行的操作例如注册事件,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修 改、EPOLL_CTL_DEL 删除

    fd:关联的文件描述符;
    event:指向epoll_event的指针;
    如果调用成功返回0,不成功返回-1

    用到的数据结构
    typedef union epoll_data {
    void *ptr;
    int fd;
    __uint32_t u32;
    __uint64_t u64;
    } epoll_data_t;

    struct epoll_event {
    __uint32_t events; /* Epoll events */
    epoll_data_t data; /* User data variable */
    };


    如:
    struct epoll_event ev;
    //设置与要处理的事件相关的文件描述符
    ev.data.fd=listenfd;
    //设置要处理的事件类型
    ev.events=EPOLLIN|EPOLLET;
    //注册epoll事件
    epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);


    常用的事件类型:
    EPOLLIN :表示对应的文件描述符可以读;
    EPOLLOUT:表示对应的文件描述符可以写;
    EPOLLPRI:表示对应的文件描述符有紧急的数据可读
    EPOLLERR:表示对应的文件描述符发生错误;
    EPOLLHUP:表示对应的文件描述符被挂断;
    EPOLLET:表示对应的文件描述符有事件发生;


    3、epoll_wait函数
    函数声明:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
    该函数用于轮询I/O事件的发生;
    参数:
    epfd:由epoll_create 生成的epoll专用的文件描述符;
    epoll_event:用于回传代处理事件的数组;
    maxevents:每次能处理的事件数;
    timeout:等待I/O事件发生的超时值(单位我也不太清楚);-1相当于阻塞,0相当于非阻塞。一般用-1即可
    返回发生事件数。


    用法如下:

    /*build the epoll enent for recall */
    struct epoll_event ev_read[20];
    int nfds = 0; //return the events count
    nfds=epoll_wait(epoll_fd,ev_read,20, -1);
    for(i=0; i
    {
    if(ev_read[i].data.fd == sock)// the listener port hava data
    ......

    epoll_wait运行的原理是
    等侍注册在epfd上的socket fd的事件的发生,如果发生则将发生的sokct fd和事件类型放入到events数组中。
    并 且将注册在epfd上的socket fd的事件类型给清空,所以如果下一个循环你还要关注这个socket fd的话,则需要用epoll_ctl(epfd,EPOLL_CTL_MOD,listenfd,&ev)来重新设置socket fd的事件类型。这时不用EPOLL_CTL_ADD,因为socket fd并未清空,只是事件类型清空。这一步非常重要。
    俺最开始就是没有加这个,白搞了一个上午。

    4单个epoll并不能解决所有问题,特别是你的每个操作都比较费时的时候,因为epoll是串行处理的。
    所以你还是有必要建立线程池来发挥更大的效能。

    //
    man中给出了epoll的用法,example程序如下:
           for(;;) {
               nfds = epoll_wait(kdpfd, events, maxevents, -1);

               for(n = 0; n < nfds; ++n) {
                   if(events[n].data.fd == listener) {
                       client = accept(listener, (struct sockaddr *) &local,
                                       &addrlen);
                       if(client < 0){
                           perror("accept");
                           continue;
                       }
                       setnonblocking(client);
                       ev.events = EPOLLIN | EPOLLET;
                       ev.data.fd = client;
                       if (epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev) < 0) {
                           fprintf(stderr, "epoll set insertion error: fd=%d\n",
                                   client);
                           return -1;
                       }
                   }
                   else
                       do_use_fd(events[n].data.fd);
               }
           }
    此时使用的是ET模式,即,边沿触发,类似于电平触发,epoll中的边沿触发的意思是只对新到的数据进行通知,而内核缓冲区中如果是旧数据则不进行通知,所以在do_use_fd函数中应该使用如下循环,才能将内核缓冲区中的数据读完。
            while (1) {
               len = recv(*******);
               if (len == -1) {
                 if(errno == EAGAIN)
                    break;
                 perror("recv");
                 break;
               }
               do something with the recved data........
            }

    在上面例子中没有说明对于listen socket fd该如何处理,有的时候会使用两个线程,一个用来监听accept另一个用来监听epoll_wait,如果是这样使用的话,则listen socket fd使用默认的阻塞方式就行了,而如果epoll_wait和accept处于一个线程中,即,全部由epoll_wait进行监听,则,需将listen socket fd也设置成非阻塞的,这样,对accept也应该使用while包起来(类似于上面的recv),因为,epoll_wait返回时只是说有连接到来了,并没有说有几个连接,而且在ET模式下epoll_wait不会再因为上一次的连接还没读完而返回,这种情况确实存在,我因为这个问题而耗费了一天多的时间,这里需要说明的是,每调用一次accept将从内核中的已连接队列中的队头读取一个连接,因为在并发访问的环境下,有可能有多个连接“同时”到达,而epoll_wait只返回了一次。

    唯一有点麻烦是epoll有2种工作方式:LT和ET。

    LT(level triggered)是缺省的工作方式,并且同时支持block和no-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.

    ET (edge-triggered)是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知,直到你做了某些操作导致那个文件描述符不再为就绪状态了(比如,你在发送,接收或者接收请求,或者发送接收的数据少于一定量时导致了一个EWOULDBLOCK 错误)。但是请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once),不过在TCP协议中,ET模式的加速效用仍需要更多的benchmark确认。

    展开全文
  • 这篇文章主要对epoll_wait进行分析,其中可以说藏着很多以前想知道而又没办法知道的东西,正如侯捷老师所言,“源码面前,了无秘密”.在这篇文章里你可以知道ET与LT究竟有什么区别,epoll如何防止惊群问题等等有意思的...

    示例代码内核版本为2.6.38

    epoll源码解析(1) epoll_create
    epoll源码解析(2) epoll_ctl
    epoll源码解析(3) epoll_wait

    引言

    这篇文章主要对epoll_wait进行分析,其中可以说藏着很多以前想知道而又没办法知道的东西,正如侯捷老师所言,“源码面前,了无秘密”.在这篇文章里你可以知道ET与LT究竟有什么区别,epoll如何防止惊群问题等等有意思的东西.

    /*
     * Implement the event wait interface for the eventpoll file. It is the kernel
     * part of the user space epoll_wait(2).
     */
    SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
    		int, maxevents, int, timeout)
    {
    	int error;
    	struct file *file;
    	struct eventpoll *ep; 
    	
    	//这个函数中基本是对用户传进来的参数进行一些正确性检验,因为内核对于用户态是不信任的,这也就是干什么都要拷贝的原因吧.
    
    	/* The maximum number of event must be greater than zero */
    	if (maxevents <= 0 || maxevents > EP_MAX_EVENTS) //判断最大事件数是否在正确范围内
    		return -EINVAL;
    
    	/* Verify that the area passed by the user is writeable */
    	//检测传用户传入的指针所指区域是否可写
    	if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
    		error = -EFAULT;
    		goto error_return;
    	}
    
    	/* Get the "struct file *" for the eventpoll file */
    	error = -EBADF;
    	file = fget(epfd); //获取epoll的file结构体 linux的文件概念真滴是强大
    	if (!file)
    		goto error_return;
    
    	/*
    	 * We have to check that the file structure underneath the fd
    	 * the user passed to us _is_ an eventpoll file.
    	 */
    	error = -EINVAL;
    	if (!is_file_epoll(file)) //判断这是不是一个epoll的文件指针
    		goto error_fput;
    
    	/*
    	 * At this point it is safe to assume that the "private_data" contains
    	 * our own data structure.
    	 */
    	ep = file->private_data; //从file的private_data中获取eventpoll结构
    
    	/* Time to fish for events ... */
    	error = ep_poll(ep, events, maxevents, timeout); //epoll_wait的主函数体
    
    error_fput:
    	fput(file);
    error_return:
    
    	return error;
    }
    
    static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
    		   int maxevents, long timeout)
    {
    	int res, eavail;
    	unsigned long flags;
    	long jtimeout;
    	wait_queue_t wait;
    
    	/*
    	 * Calculate the timeout by checking for the "infinite" value ( -1 )
    	 * and the overflow condition. The passed timeout is in milliseconds,
    	 * that why (t * HZ) / 1000.
    	 */
    	 //计算睡眠事件 要转换成毫秒 式子里那个+999)/1000的作用是为了向上取整 HZ在/drivers/md/raid6.h中定义 值为1000
    	 //所以这里面的向上取整是否真的有必要呢?
    	jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
    		MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
    
    retry: //这个标记很重要 下面会说道
    
    	//#define write_lock_irqsave(lock, flags)	flags = _write_lock_irqsave(lock)
    	//#define _write_lock_irqsave(lock, flags)	__LOCK_IRQSAVE(lock, flags)
    	/*
    	#define __LOCK_IRQSAVE(lock, flags) \
      	do { local_irq_save(flags); __LOCK(lock); } while (0)
    	*/
    	write_lock_irqsave(&ep->lock, flags); //自旋锁上锁 一系列宏上面已列出
    
    	res = 0;
    	if (list_empty(&ep->rdllist)) { //如果就绪链表为空的话就进入if 也就是睡眠了,否则的话直接跳出,
    		//相当于我们如果在epoll_ctl(ADD)后,事件已经发生了后在wait,消耗实际上就只是一个用户态到内核态的转换和拷贝而已,
    		//不涉及从等待队列中唤醒
    		/*
    		 * We don't have any available event to return to the caller.
    		 * We need to sleep here, and we will be wake up by
    		 * ep_poll_callback() when events will become available.
    		 */
    		init_waitqueue_entry(&wait, current); //用当前进程初始化一个等待队列的entry
    		add_wait_queue(&ep->wq, &wait);
    		//把刚刚初始化的这个等待队列节点加到epoll内部的等待队列中去,也就是说在epoll_wait被唤醒时唤醒本进程
    
    		for (;;) { //开始睡眠
    			/*
    			 * We don't want to sleep if the ep_poll_callback() sends us
    			 * a wakeup in between. That's why we set the task state
    			 * to TASK_INTERRUPTIBLE before doing the checks.
    			 */
    			 //这里我翻译下源码中的解释,
    			 //我们不希望ep_poll_callback()发送给我们wakeup消息时我们还在沉睡,这就是为什么我们我们要在检查前设置成TASK_INTERRUPTIBLE
    			set_current_state(TASK_INTERRUPTIBLE);
    			if (!list_empty(&ep->rdllist) || !jtimeout) //rdllist不为空或者超时
    				break;
    			//当阻塞于某个慢系统调用的一个进程捕获某个信号且相应信号处理函数返回时,该系统调用可能返回一个EINTR错误
    			if (signal_pending(current)) { //收到一个信号的时候也可能被唤醒
    				res = -EINTR;
    				break;
    			}
    
    			write_unlock_irqrestore(&ep->lock, flags); //解锁 开始睡眠
    			jtimeout = schedule_timeout(jtimeout);//schedule_timeout的功能源码中的介绍为 sleep until timeout
    			write_lock_irqsave(&ep->lock, flags);
    		}
    		remove_wait_queue(&ep->wq, &wait); //醒来啦!从等待队列中移除
    
    		set_current_state(TASK_RUNNING);
    	}
    	/* Is it worth to try to dig for events ? */
    
    	//这里要注意 当rdlist被其他进程访问的时候,ep->ovflist会被设置为NULL,那时rdlist会被txlist替换,
    	//因为在遍历rdlist的时候有可能ovlist传入数据,然后写入rdllist,所以rdllist也有可能不为空,
    	//但为空且其他进程在访问的时候就会将eavail设置为true,为后面goto到retry再次进行睡眠做准备,
    	//这样就避免了用户态的唤醒,从而避免了一定程度上的惊群.文末附上我对惊群的测试于结论的链接
        eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    
    	write_unlock_irqrestore(&ep->lock, flags);
    
    	/*
    	 * Try to transfer events to user space. In case we get 0 events and
    	 * there's still timeout left over, we go trying again in search of
    	 * more luck.
    	 */
    	if (!res && eavail &&
    	    !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
    	   //res还得为0 epoll才会继续沉睡,有可能ovflist其中有数据 ,后面赋给了rdllist,这是rdllist有数据,也就是说此时epoll_wait醒来还有数据,所以不必继续沉睡
    		goto retry;
    
    	return res;
    }
    
    //传入eventpoll结构,用户指定的内存和最大事件数 然后执行ep_send_events_proc回调
    static int ep_send_events(struct eventpoll *ep,
                  struct epoll_event __user *events, int maxevents)
    {
        struct ep_send_events_data esed;
        esed.maxevents = maxevents;
        esed.events = events; 
        return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
    }
    
    /*ep_scan_ready_list - Scans the ready list in a way that makes possible for
                           the scan code, to call f_op->poll(). Also allows for
                           O(NumReady) performance.
     */
    static int ep_scan_ready_list(struct eventpoll *ep,
    			      int (*sproc)(struct eventpoll *, //注意上面调用的回调在这个函数中名为sproc
    					   struct list_head *, void *),
    			      void *priv)
    {
    	int error, pwake = 0;
    	unsigned long flags;
    	struct epitem *epi, *nepi;
    	LIST_HEAD(txlist); //初始化一个链表 作用是把rellist中的数据换出来
    
    	/*
    	 * We need to lock this because we could be hit by
    	 * eventpoll_release_file() and epoll_ctl().
    	 */
    	mutex_lock(&ep->mtx);//操作时加锁 防止ctl中对结构进行修改
    
    	/*
    	 * Steal the ready list, and re-init the original one to the
    	 * empty list. Also, set ep->ovflist to NULL so that events
    	 * happening while looping w/out locks, are not lost. We cannot
    	 * have the poll callback to queue directly on ep->rdllist,
    	 * because we want the "sproc" callback to be able to do it
    	 * in a lockless way.
    	 */
    	spin_lock_irqsave(&ep->lock, flags); //加锁
    	list_splice_init(&ep->rdllist, &txlist);
    	ep->ovflist = NULL; //这里设置为NULL
    	spin_unlock_irqrestore(&ep->lock, flags);
    
    	/*
    	 * Now call the callback function.
    	 */
    	error = (*sproc)(ep, &txlist, priv); 
    	//对整个txlist执行回调,也就是对rdllist执行回调 
    	//遍历的时候可能所监控的fd也会执行回调,向把fd加入到rellist中,但那个时候可能这里正在遍历,为了不竞争锁,把数据放到ovflist中
    
    	spin_lock_irqsave(&ep->lock, flags); //加锁,把其中数据放入rdllist
    	/*
    	 * During the time we spent inside the "sproc" callback, some
    	 * other events might have been queued by the poll callback.
    	 * We re-insert them inside the main ready-list here.
    	 */
    	 //上面提到了 当执行sproc回调的时候可能也会有到来的数据,为了避免那时插入rdllist加锁,
    	 //把数据放到ovlist中.在执行完后加入rdllist中
    	for (nepi = ep->ovflist; (epi = nepi) != NULL;
    	     nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
    		/*
    		 * We need to check if the item is already in the list.
    		 * During the "sproc" callback execution time, items are
    		 * queued into ->ovflist but the "txlist" might already
    		 * contain them, and the list_splice() below takes care of them.
    		 */
    		if (!ep_is_linked(&epi->rdllink))
    			list_add_tail(&epi->rdllink, &ep->rdllist);
    	}
    	/*
    	 * We need to set back ep->ovflist to EP_UNACTIVE_PTR, so that after
    	 * releasing the lock, events will be queued in the normal way inside
    	 * ep->rdllist.
    	 */
    	ep->ovflist = EP_UNACTIVE_PTR; //设置为初始化时的样子
    
    	/*
    	 * Quickly re-inject items left on "txlist".
    	 */
    	 //有可能有未处理完的数据,再插入rdllist中,比如说LT
    	list_splice(&txlist, &ep->rdllist);
    
    	if (!list_empty(&ep->rdllist)) { //rellist不为空的话,进行唤醒
    		/*
    		 * Wake up (if active) both the eventpoll wait list and
    		 * the ->poll() wait list (delayed after we release the lock).
    		 */
    		if (waitqueue_active(&ep->wq))
    			wake_up_locked(&ep->wq);
    		if (waitqueue_active(&ep->poll_wait))
    			pwake++;
    	}
    	spin_unlock_irqrestore(&ep->lock, flags);
    
    	mutex_unlock(&ep->mtx);
    
    	/* We have to call this outside the lock */
    	if (pwake)
    		ep_poll_safewake(&ep->poll_wait);
    
    	return error;
    }
    

    我们再来看看具体执行的回调,也就是上面所提到的sproc.

    static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
    			       void *priv)
    {
    	struct ep_send_events_data *esed = priv;
    	int eventcnt;
    	unsigned int revents;
    	struct epitem *epi;
    	struct epoll_event __user *uevent;
    
    	/*
    	 * We can loop without lock because we are passed a task private list.
    	 * Items cannot vanish during the loop because ep_scan_ready_list() is
    	 * holding "mtx" during this call.
    	 */
    	for (eventcnt = 0, uevent = esed->events;
    	     !list_empty(head) && eventcnt < esed->maxevents;) { //可以看到,循环次数有我们设定的最大值和rdllist长度共同决定
    		epi = list_first_entry(head, struct epitem, rdllink); //取出一个值
    
    		list_del_init(&epi->rdllink); //删除它
    		/*
    		 *读取触发的事件类型,这里我其实有一点疑问,我们在ep_poll_callback中其实已经设置了events,
    		 *也就是这里的第二项,为什么还要再取一次呢,
    		 *查阅资料后给出的解释是因为events是会变的,其次不是所有的poll实现, 都通过等待队列传递了events, 
    		 *有可能某些驱动压根没传,必须我们主动去读取.
    		 *比如说LT,我们实际插入了rdllist,但是确没有触发新的事件,加入我们已经在上次读取完全部,
    		 *这次不会触发可读了,如果没有第一步的话就出现了问题
    		 */
    		revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
    			epi->event.events;
    
    		/*
    		 * If the event mask intersect the caller-requested one,
    		 * deliver the event to userspace. Again, ep_scan_ready_list()
    		 * is holding "mtx", so no operations coming from userspace
    		 * can change the item.
    		 */
    		if (revents) { 
    			if (__put_user(revents, &uevent->events) || //将事件从内核空间传递至用户空间
    			    __put_user(epi->event.data, &uevent->data)) {
    				list_add(&epi->rdllink, head);
    				return eventcnt ? eventcnt : -EFAULT;
    			}
    			eventcnt++;//事件数加1 用于判断最大事件数
    			uevent++; //用户指定的地址 指针向后移一位
    			//我们可以看到EPOLLONESHOT事件会在每次触发时移除所有的事件类型,需要我们下次再指定
    			if (epi->event.events & EPOLLONESHOT)
    				epi->event.events &= EP_PRIVATE_BITS;
    				//#define EP_PRIVATE_BITS (EPOLLONESHOT | EPOLLET)
    			else if (!(epi->event.events & EPOLLET)) { //LT会把事件再次插入rdllist.
    				//ET与LT的差别其实就是一句代码,即是否插入rdllist而已
    				//就算ET中此时是有数据的,但是没有被取出,而所对应的fd也没有新事件的话就再也不会触发了,而LT还会继续触发.
    				//但是如果上面说的ET的情况下,再次来了新数据,仍然会触发回调再次加入rdllist中
    				/*
    				 * If this file has been added with Level
    				 * Trigger mode, we need to insert back inside
    				 * the ready list, so that the next call to
    				 * epoll_wait() will check again the events
    				 * availability. At this point, noone can insert
    				 * into ep->rdllist besides us. The epoll_ctl()
    				 * callers are locked out by
    				 * ep_scan_ready_list() holding "mtx" and the
    				 * poll callback will queue them in ep->ovflist.
    				 */
    				list_add_tail(&epi->rdllink, &ep->rdllist);
    			}
    		}
    	}
    
    	return eventcnt; //返回触发的事件数
    }
    

    惊群问题详解

    展开全文
  • epoll_wait详解

    千次阅读 2021-01-11 17:32:18
    int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout); epoll_wait()系统调用等待文件描述符epfd引用的epoll实例上的事件。事件所指向的存储区域将包含可供调用者使用的事件。...

    epoll_wait,等待epoll文件描述符上的I / O事件。

    #include <sys / epoll.h>
    
    int epoll_wait(int epfd,struct epoll_event * events, int maxevents,int timeout);

           epoll_wait()系统调用等待文件描述符epfd引用的epoll实例上的事件。事件所指向的存储区域将包含可供调用者使用的事件。 epoll_wait()最多返回最大事件。 maxevents参数必须大于零。 timeout参数指定epoll_wait()将阻止的最小毫秒数。 (此间隔将四舍五入为系统时钟的粒度,并且内核调度延迟意味着阻塞间隔可能会少量溢出。)指定超时值为-1会导致epoll_wait()无限期阻塞,而指定的超时时间等于零导致epoll_wait()立即返回,即使没有可用事件。

           结构epoll_event定义为:         

     typedef union epoll_data {
                   无效* ptr;
                   int fd;
                   uint32_t u32;
                   uint64_t u64;
               } epoll_data_t;
    
               struct epoll_event {
                   uint32_t事件; / * Epoll事件* /
                   epoll_data_t数据; / *用户数据变量* /
               };

           每个返回结构的数据将包含用户使用epoll_ctl设置的相同数据(EPOLL_CTL_ADD,EPOLL_CTL_MOD),而事件成员将包含返回的事件位字段。

    返回值:成功时,epoll_wait()返回为请求的I / O准备就绪的文件描述符的数目;如果在请求的超时毫秒内没有文件描述符准备就绪,则返回零。发生错误时,epoll_wait()返回-1并正确设置errno。

    错误errno:
           EBADF:epfd不是有效的文件描述符。

           EFAULT:具有写许可权不能访问事件指向的存储区。

           EINTR:在任何请求的事件发生或超时到期之前,信号处理程序中断了该调用;参见signal(7)。

           EINVAL:epfd不是epoll文件描述符,或者maxevents小于或等于零。

    注意:当一个线程在对epoll_pwait()的调用中被阻止时,另一个线程有可能向等待的epoll实例添加文件描述符。如果新文件描述符准备就绪,它将导致epoll_wait()调用解除阻止。

    展开全文
  •  epoll - I/O event notification facility SYNOPSIS  #include &lt;sys/epoll.h&gt; DEscrīptION  epoll is a variant of poll(2) that can be used either as Edge or Level  Tri...
  • 水平/边沿触发:指的是epoll_wait()的触发行为 阻塞/非阻塞I/O:指的是文件描述符的读写(read)行为 使用水平触发非阻塞I/O:如果客户端发送10字节,服务端每次只能读5字节,水平触发就会导致缓冲区不断堆积。如果...
  • 我们在利用 gdb 调试带有 epoll_wait select sem_wat 的多线程代码的时候可能会出现非正常返回 -1 的情况,错误原因是:Interrupted system call。这是由于 gdb调试的时候会在断点处插入一条中断指令,当程序执行到...
  • redis的定时任务是死循环+epoll_wait延时来实现的。其函数调用顺序是: redis.cmain 调用ae.c aeMain (死循环) ae.c aeMain 调用 ae.c aeProcessEvents: ae.c aeProcessEvents 调用 ae.c aeSearchNearestTimer...
  • epoll_wait阻塞期间,signal信号会打断该阻塞,使其返回-1并将errno设置成EINTR,此时可以在主循环中判断并处理该情况,一般continue即可!
  • 实际调试过程中,通过epoll_wait等待事件的产生,类似于select()调用,其中有一小段伪代码如下: while(1) { .... int ret = epoll_wait(....) if (ret < 0){ goto err_exit }else{ continue; } ......
  • 问题情境 一般IO复用是使用 one loop per thread 的模型, 一般wait都是可读事件,监听可写需实时添加,如果遇到wait线程在等待可读事件,并一直阻塞下去, 却有一个...因此epoll需要多监听一个套接字作为唤醒wait的...
  • 在linux新的内核中,有了一种替换它的机制,就是epoll。相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时...
  • 要用epoll实现多路IO转接需要用的epoll_create()、epoll_ctl()、epoll_wait()三个函数 一、epoll_create(int size):创建一棵监听红黑树 本质是一棵平衡二叉树(红黑树) 参数size:创建红黑树的监听节点数量(但这...
  • epoll_wait返回事件的测试

    千次阅读 2019-05-22 11:18:15
    epoll返回的事件可以有EPOLLIN,EPOLLOUT,EPOLLRDHUP,EPOLLPRI,EPOLLERR,EPOLLHUP。 关于EPOLLIN,EPOLLOUT,EPOLLPRI相信大家都熟悉,EPOLLIN代表有数据可读,EPOLLOUT代表可写,EPOLLPRI代表有带外数据可读。 ...
  • 在linux新的内核中,有了一种替换它的机制,就是epoll。相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时...
  • 半个月前动手写了web服务器用webbench进行压测导致epoll_wait阻塞不返回,一直查bug历经很长时间才查出,总结一下踩到的坑点… web服务器框架简述 采用reactor模型,主线程调用epoll进行监听,分发给线程池进行解析...
  • libevent 用fd事件来通知epoll_wait,处理事件。epoll_wait在信号处理后,会返回-1 ,错误码为EINTR,直接return 0 nginx用全局变量来通知进程,epoll_wait在信号处理后,会返回-1 ,错误码为EINTR,在主循环里处理...
  • int ready = epoll_wait(epfd,&epv,1,-1); printf("ready--------===%d\n",ready); //--读内容,输出到屏幕 char buf[6]; memset(buf,0x00,sizeof(buf)); int ret = read(epv.data.fd,buf,sizeof(buf)); //...
  • epoll_wait监听事件

    千次阅读 2019-07-11 19:35:15
    一般来讲,epoll_wait在监听事件的时候,对于标准输入输出,套接字都能同时监听到in和out事件,一般我们先监听in然后处理结束后再将out加入,此时,ewait会立马返回out的监听事件,反应堆就是这个原理。 ...
  • 问题现象: 对于测试报告,我们一般应当包含:测试对比环境的软件架构、输入、可视化的对比结果、原因分析&总结 从go 自带的profile 图 只能看出 如题所示,对于问题排查,我们从不吝啬更多信息 用于分析: ...
  • int epoll_create1(int flags); int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
  • 我们在利用 gdb 调试带有 epoll_wait select sem_wat 的多线程代码的时候可能会出现非正常返回 -1 的情况,错误原因是:Interrupted system call。这是由于 gdb调试的时候会在断点处插入一条中断指令,当程序执行到...
  • epoll - I/O event notification facility在linux的网络编程中,很长的时间都在使用select来做事件触发。在linux新的内核中,有了一种替换它的机制,就是epoll。相比于select,epoll最大的好处在于它不会随着监听fd...
  • Linux accept()/epoll_wait()惊群问题与解决方案
  • epoll_wait惊群问题

    2017-11-06 22:42:00
    项目接入层用的模型是,主线程创建listenfd,传入6个子线程,每个子线程一个事件循环,epoll_wait这个listenfd。 如果是listenfd,则epoll_wait返回调用accept,其它fd则另外处理。 这里有个epoll_wait的惊群现象...
  • 今天,当一个程序在epoll_wait阻塞时,用strace跟踪了一下,结果epoll_wait就被EINTR唤醒了,并且返回-1; 所以,当epoll_wait返回-1时,需要判断errno是不是EINTR,如果是,继续epoll_wait就行了。 还有,当一...
  • 浅论epoll_wait

    2019-09-18 07:26:20
    应用场景 类似libaio,属于异步IO 模式,实现批量获取完成的event. 使用方法 step1: 创建epolling fd 可以把epolling fd 想象成一个容器或者代理,里面装需要侦听的文件等描述符。...EPOLL_CREATE(2) Li...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 38,788
精华内容 15,515
关键字:

epoll_wait