精华内容
下载资源
问答
  • IO多路复用O多路复用技术是使用一个可以同时监视多个IO阻塞中间人去监视这些不同IO对象,这些被监视任何一个或多个IO对象有消息返回,都将会触发这个中间人将这些有消息IO对象返回,以供获取他们消息。...

    IO多路复用

    O多路复用技术是使用一个可以同时监视多个IO阻塞的中间人去监视这些不同的IO对象,这些被监视的任何一个或多个IO对象有消息返回,都将会触发这个中间人将这些有消息IO对象返回,以供获取他们的消息。

    使用IO多路复用的优点在于进程在单线程的情况下同样可以同时处理多个IO阻塞。与传统的多线程/多进程模型比,I/O多路复用系统开销小,系统不需要创建新的进程或者线程,也不需要维护这些进程和线程的运行,降底了系统的维护工作量,节省了系统资源,

    Python提供了selector模块来实现IO多路复用。同时,不同的操作系统上,这中间人的可选则的类型是不同的,目前常见的有,epoll, kqueue, devpoll, poll,select等;kqueue(BSD,mac支持),devpoll(solaris支持)和epoll的实现基本相同,epoll在Linux2.5+内核中实现,Windows系统只实现了select。

    epoll,poll, select的比较

    select和poll使用轮询的方式去检测监视的所有IO是否有数据返回,需要不断的遍历每一个IO对象,这是一种费时的操作,效率较低。poll优于select的一点是select限制了最大监视IO数为1024,这对于需要大量网络IO连接的服务器来显然是不够的;而poll对于这个个数没有限制。但是这同样面临问题,在使用轮询的方式监视这些IO时,IO数越大,意味着每一次轮询消耗的时间越多。效率也就越低,这是轮询无法解决的问题。

    epoll就是为了解决这样的问题诞生的,首先他没有最大的监视的IO数的限制,并且没有使用轮询的方式去检测这些IO,而是采用了事件通知机制和回调来获取这些有消息返回的IO对象,只有“活跃”的IO才会主动的去调用callback函数。这个IO将会直接被处理而不需要轮询。

    selector模块的基本使用

    importselectorsimportsocket#创建一个socketIO对象,监听后将可以接受请求消息了

    sock =socket.socket()

    sock.bind(("127.0.0.1", 80))

    sock.listen()

    slt= selectors.DefaultSelector() #使用系统默认selector,Windows为select,linux为epoll

    #将这个socketIO对象加入到,select中监视

    slt.register(fileobj=sock, events=selectors.EVENT_READ, data=None)#循环处理消息

    whileTrue:#select方法:轮询这个selector,当有至少一个IO对象有消息返回时候,将会返回这个有消息的IO对象

    ready_events = slt.select(timeout=None)print(ready_events) # 准备好的IO对象们break

    运行上面的程序,打开浏览器访问127.0.0.1默认80端口,我们的服务器将会输出以下内容。

    #ready_events为一个列表,列表中的每一个元组为一个IO对 和mask值(标识该对象是被读(1)还是写(2)激活的,此处为1)

    [(SelectorKey(fileobj=,

    fd=456, events=1, data=None), 1)]

    该SelectorKey对象有三个属性

    fileobj:注册的socket对象

    fd:文件描述符

    data:注册时我们传入的参数,可以是任意值,绑定到一个属性上,方便之后使用。

    返回了一个列表,包含了注册到这个select中的所有的有数据可接收IO对象。我们通过浏览器访问该服务器后,该IO检测到请求消息,将会被返回等待处理。上面使用break是因为我们没有对这个被激活的IO做处理,select方法将会一直检测到这个激活信号,就会快速的死循环。可以去除break尝试一下。

    处理这个请求,只需要使用该socket对应方法即可,该socket用于接收请求的连接,使用accept方法就可以处理这个请求。当接受请求之后,又将会产生新的客户端,我们将其放入selector中一并监视,当有消息来时,如果是连接请求,handle_request()函数处理,如果是客户端的消息,handle_client_msg()函数处理。下面是部分代码

    #处理这连接请求

    defhandle_request(sock:socket.socket):#使用accept方法可以将这个请求处理掉,该socket只有新的连接请求才会被再次被select检测。

    conn, addr =sock.accept()#返回了一个新的socket,添加到同一个select中去监视它

    slt.register(conn, selector.EVENT_READ, data=None)#此时slt中有两个socket,一个接受新的连接,一个与已连接客户端通信,我们需要做两个处理方法

    defhandle_client_msg(sock:socket.socket)

    data= sock.recv() #处理消息,

    print(data.decode())whileTrue:

    ready_events= slt.select(timeout=None)for event inready_events:if event.fileobj is sock: #新的用户连接请求,accept接受请求

    handle_request(event.fileobj)else: #否则这是客户端的消息

    handle_client_msg(event.fileobj)

    于select中有两类socket,所以我们需要判断被激活后返回的socket是哪一种,再调用不同的函数做不同的请求。如果这个select中的socket种类有很多,将无法如此判断。解决办法就是将处理函数绑定到对应的selectkey对象中,可以使用data参数。

    importselectorsimportsocketdef handle_request(sock:socket.socket): #处理新连接

    conn, addr =sock.accept()

    slt.register(conn, selector.EVENT_READ, data=handle_client_msg)def handle_client_msg(sock:socket.socket) #处理消息

    data =sock.recv()print(data.decode())

    sock=socket.socket()

    sock.bind(("127.0.0.1", 80))

    sock.listen()

    slt=selectors.DefaultSelector()

    slt.register(fileobj=sock, events=selectors.EVENT_READ, data=handle_request)whileTrue:

    ready_events= slt.select(timeout=None)for event, _ inready_events:

    event.data(event.fileobj)#不同的socket有不同data函数,使用自己绑定的data函数调用,再将自己的socket作为参数。就可以处理不同类型的socket。

    上面使用data很好的解决了上面问题,但是需要注意,绑定到data属性上函数(或者说可调用对象)最终会使用event.data(event.fileobj)的方式调用,这些函数接受的参数应该相同。

    展开全文
  • Linux学习之 多路复用

    2016-10-29 01:53:37
    Linux学习之 多路复用   一.多路复用基本概念 ...I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。 二.多路复用的优点

    Linux学习之 多路复用

     

    一.多路复用基本概念

    IO多路复用是指内核一旦发现进程的一个或者多个IO条件准备读取,它就通知该进程。

    在I/O编程过程中,当需要同时处理多个客户端接入请求时,可以利用多线程或者I/O多路复用技术进行处理I/O多路复用技术通过把多个I/O的阻塞复用到同一个select的阻塞上,从而使得系统在单线程的情况下可以同时处理多个客户端请求。

    二.多路复用的优点:

    与传统的多线程/多进程模型比,I/O多路复用的优势有:

    系统开销小,系统不需要创建新的额外进程或者线程,也不需要维护这些进程和线程的运行,降底了系统的维护工作量,节省了系统资源

     

    .多路复用的应用场景

    1. 服务器需要同时处理多个处于监听状态或者多个连接状态的套接字。

    2. 服务器需要同时处理多种网络协议的套接字。

     

    四.多路复用的函数

    1.intselect(int  numfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds,struct timeval  *timeout);

    函数参数:

    (1)第一个参数maxfdp1指定待测试的描述字个数,它的值是待测试的最大描述字加1(因此把该参数命名为maxfdp1),描述字0、1、2...maxfdp1-1均将被测试,因为文件描述符是从0开始的。

    (2)中间的三个参数readset、writeset和exceptset指定我们要让内核测试读、写和异常条件的描述字。如果对某一个的条件不感兴趣,就可以把它设为空指针。struct fd_set可以理解为一个集合,这个集合中存放的是文件描述符,可通过以下四个宏进行设置:

    voidFD_ZERO(fd_set*fdset);           //清空集合

    voidFD_SET(int fd, fd_set *fdset);   //将一个给定的文件描述符加入集合之中

    voidFD_CLR(int fd, fd_set *fdset);   //将一个给定的文件描述符从集合中删除

    intFD_ISSET(int fd, fd_set *fdset);   // 检查集合中指定的文件描述符是否可以读写 

    (3)timeout告知内核等待所指定描述字中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。

    structtimeval{

    longtv_sec;   //seconds

             long tv_usec;  //microseconds

         };

    这个参数有三种可能:

    (1)永远等待下去:仅在有一个描述字准备好I/O时才返回。为此,把该参数设置为空指针NULL。

    (2)等待一段固定时间:在有一个描述字准备好I/O时返回,但是不超过由该参数所指向的timeval结构中指定的秒数和微秒数。

    (3)根本不等待:检查描述字后立即返回,这称为轮询。为此,该参数必须指向一个timeval结构,而且其中的定时器值必须为0。

    返回值:返回总的位数这些位对应已准备好的描述符,否则返回-1

     

    select()函数实现IO多路复用的步骤

    1)清空描述符集合

    2)建立需要监视的描述符与描述符集合的关系

    3)调用select函数

    4)检查监视的描述符判断是否已经准备好

    5)对已经准备好的描述符进程IO操作

     

    五.多路复用的应用:

    问题:采用管道函数创建有名管道,使用select函数替代使用poll函数实现多路复用: 创建两个有名管道,获取3个文件描述符(2个管道1个标准输入),然后初始化读文件描述符,select监视文件描述符的文件读写,管道1输出到屏幕上,管道2输出到屏幕上,标准输入‘Q’来进行判读是否退出。

    设计流程:


    编程代码:

    #include <fcntl.h>  
    #include <stdio.h>  
    #include <stdlib.h>  
    #include <unistd.h>  
    #include <string.h>  
    #include <time.h>  
    #include <errno.h>  
       
    #define FIFO1 "in1"  
    #define FIFO2 "in2"  
    #define MAX_BUFFER_SIZE 1024  //缓冲区大小  
    #define IN_FILES 3               //多路复用输入文件数目  
    #define TIME_DELAY 60           //超时秒数  
       
    #define MAX(a,b) ((a > b) ? (a) : (b))  
       
    int main(void)  
    {  
        int fds[IN_FILES];      //管道描述符  
        int i;  
        int res;  
        int real_read;  
        int maxfd;  
       
        char buf[MAX_BUFFER_SIZE];  
       
        struct timeval tv;  
       
        fd_set inset;  
        fd_set tmp_inset;           //文件描述符集  
       
        fds[0] = 0;          //终端的文件描述符  
       
        if(access(FIFO1,F_OK) == -1)         //创建两个有名管道  
        {  
            if((mkfifo(FIFO1,0666) < 0) && (errno != EEXIST))  
            {  
                printf("Cannot creat fifo1 file!\n");  
       
                exit(1);  
            }  
        }  
       
        if(access(FIFO2,F_OK) == -1)  
        {  
            if((mkfifo(FIFO2,0666) < 0) && (errno != EEXIST))  
            {  
                printf("Cannot creat fifo2 file\n");  
       
                exit(1);  
            }  
        }  
       
        if((fds[1] = open(FIFO1,O_RDONLY | O_NONBLOCK)) < 0)   //以只读非阻塞的方式打开两个管道文件  
        {  
            printf("open in1 error!\n");  
       
            return 1;  
        }  
       
        if((fds[2] = open(FIFO2,O_RDONLY | O_NONBLOCK)) < 0)  
        {  
            printf("open in2 error!\n");  
       
            return 1;  
        }  
       
        maxfd = MAX(MAX(fds[0],fds[1]),fds[2]);  //取出两个文件描述符中的较大者  
       
        //初始化读集inset,并在读文件描述符集中加入相应的描述集  
        FD_ZERO(&inset);     //将insert清零,使集合中不含任何fd  
        for(i = 0; i < IN_FILES; i++)  
        {  //将fds[i]加入inset集合  
            FD_SET(fds[i],&inset);  
        }  
       
        FD_SET(0,&inset);  
       
        tv.tv_sec = TIME_DELAY;   //设置超时60s  
        tv.tv_usec = 0;  
    //循环测试该文件描述符是否准备就绪,并调用selelct()函数对相关文件描述符做相应的操作  
    while(FD_ISSET(fds[0],&inset) || FD_ISSET(fds[1],&inset) || FD_ISSET(fds[2],&inset))  
        {    //文件描述符集的备份,以免每次都进行初始化  
            tmp_inset = inset;  
            res = select(maxfd+1,&tmp_inset,NULL,NULL,&tv);  
       
            switch(res)  
            {  
                case -1:  
                    {  
                        printf("Select error!\n");  
       
                        return 1;  
                    }  
                    break;  
       
                case 0:  
                    {  
                        printf("Time out!\n");  
       
                        return 1;  
                    }  
                    break;  
                default:  
                    {  
                        for(i = 0; i < IN_FILES; i++)  
                        {  
                            if(FD_ISSET(fds[i],&tmp_inset))  
                            {  
                                memset(buf,0,MAX_BUFFER_SIZE);  
       
                                real_read = read(fds[i],buf,MAX_BUFFER_SIZE);  
       
                                if(real_read < 0)  
                                {  
                                    if(errno != EAGAIN)  
                                    {  
                                        return 1;  
                                    }  
                                }  
                                else if(!real_read)  //已到达文件尾  
                                {  
                                    close(fds[i]);  
       
                                    FD_CLR(fds[i],&inset);  
                                }  
                                else  
                                {  
                                    if(i == 0)  
                                    {   //主程序终端控制  
                                        if((buf[0] == 'q') || (buf[0] == 'Q'))  
                                        {  
                                            return 1;  
                                        }  
                                    }  
                                    else  
                                    {   //显示管道输入字符串  
                                        buf[real_read] = '\0';  
       
                                        printf("%s",buf);  
                                    }  
                                }  
                            }  
                        }  
                    }  
                 break;  
            }  
        }  
       
        return 0;  


    展开全文
  • 关于I/O多路复用技术 - Epoll剖析

    千次阅读 2015-04-28 10:34:44
    什么是epoll epoll是什么?按照man手册说法:是为处理大批量... is a new API introduced in Linux kernel 2.5.44),它几乎具备了之前所说一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。   ep

    首先看一下,在知乎上流传的,被大家称为“最好的介绍Epoll的文章”。读完确实感觉不错,之所以最好,是因为作者很清晰的从原理的角度用很直白的语言给我们介绍了Epoll到底完成了什么功能。-By Lingtao
    下面是原文:

    最好的“Epoll介绍”

       
    首先我们来定义流的概念,一个流可以是文件,socket,pipe等等可以进行I/O操作的内核对象。

    不管是文件,还是套接字,还是管道,我们都可以把他们看作流。

        之后我们来讨论I/O的操作,通过read,我们可以从流中读入数据;通过write,我们可以往流写入数据。现在假定一个情形,我们需要从流中读数据,但是流中还没有数据,(典型的例子为,客户端要从socket读如数据,但是服务器还没有把数据传回来),这时候该怎么办?

    阻塞:阻塞是个什么概念呢?比如某个时候你在等快递,但是你不知道快递什么时候过来,而且你没有别的事可以干(或者说接下来的事要等快递来了才能做);那么你可以去睡觉了,因为你知道快递把货送来时一定会给你打个电话(假定一定能叫醒你)。

    非阻塞忙轮询:接着上面等快递的例子,如果用忙轮询的方法,那么你需要知道快递员的手机号,然后每分钟给他挂个电话:“你到了没?”

        很明显一般人不会用第二种做法,不仅显很无脑,浪费话费不说,还占用了快递员大量的时间。
        大部分程序也不会用第二种做法,因为第一种方法经济而简单,经济是指消耗很少的CPU时间,如果线程睡眠了,就掉出了系统的调度队列,暂时不会去瓜分CPU宝贵的时间片了。

        为了了解阻塞是如何进行的,我们来讨论缓冲区,以及内核缓冲区,最终把I/O事件解释清楚。缓冲区的引入是为了减少频繁I/O操作而引起频繁的系统调用(你知道它很慢的),当你操作一个流时,更多的是以缓冲区为单位进行操作,这是相对于用户空间而言。对于内核来说,也需要缓冲区。

    假设有一个管道,进程A为管道的写入方,B为管道的读出方。

    假设一开始内核缓冲区是空的,B作为读出方,被阻塞着。然后首先A往管道写入,这时候内核缓冲区由空的状态变到非空状态,内核就会产生一个事件告诉B该醒来了,这个事件姑且称之为“缓冲区非空”。
        但是“缓冲区非空”事件通知B后,B却还没有读出数据;且内核许诺了不能把写入管道中的数据丢掉这个时候,A写入的数据会滞留在内核缓冲区中,如果内核也缓冲区满了,B仍未开始读数据,最终内核缓冲区会被填满,这个时候会产生一个I/O事件,告诉进程A,你该等等(阻塞)了,我们把这个事件定义为“缓冲区满”。

    假设后来B终于开始读数据了,于是内核的缓冲区空了出来,这时候内核会告诉A,内核缓冲区有空位了,你可以从长眠中醒来了,继续写数据了,我们把这个事件叫做“缓冲区非满”
        也许事件Y1已经通知了A,但是A也没有数据写入了,而B继续读出数据,知道内核缓冲区空了。这个时候内核就告诉B,你需要阻塞了!,我们把这个时间定为“缓冲区空”。

    这四个情形涵盖了四个I/O事件,缓冲区满,缓冲区空,缓冲区非空,缓冲区非满(注都是说的内核缓冲区,且这四个术语都是我生造的,仅为解释其原理而造)。这四个I/O事件是进行阻塞同步的根本。(如果不能理解“同步”是什么概念,请学习操作系统的锁,信号量,条件变量等任务同步方面的相关知识)。

        然后我们来说说阻塞I/O的缺点。但是阻塞I/O模式下,一个线程只能处理一个流的I/O事件。如果想要同时处理多个流,要么多进程(fork),要么多线程(pthread_create),很不幸这两种方法效率都不高。
        于是再来考虑非阻塞忙轮询的I/O方式,我们发现我们可以同时处理多个流了(把一个流从阻塞模式切换到非阻塞模式再此不予讨论):
    while true {
    for i in stream[]; {
    if i has data
    read until unavailable
    }
    }
        我们只要不停的把所有流从头到尾问一遍,又从头开始。这样就可以处理多个流了,但这样的做法显然不好,因为如果所有的流都没有数据,那么只会白白浪费CPU。这里要补充一点,阻塞模式下,内核对于I/O事件的处理是阻塞或者唤醒,而非阻塞模式下则把I/O事件交给其他对象(后文介绍的select以及epoll)处理甚至直接忽略。

        为了避免CPU空转,可以引进了一个代理(一开始有一位叫做select的代理,后来又有一位叫做poll的代理,不过两者的本质是一样的)。这个代理比较厉害,可以同时观察许多流的I/O事件,在空闲的时候,会把当前线程阻塞掉,当有一个或多个流有I/O事件时,就从阻塞态中醒来,于是我们的程序就会轮询一遍所有的流(于是我们可以把“忙”字去掉了)。代码长这样:
    while true {
    select(streams[])
    for i in streams[] {
    if i has data
    read until unavailable
    }
    }
        于是,如果没有I/O事件产生,我们的程序就会阻塞在select处。但是依然有个问题,我们从select那里仅仅知道了,有I/O事件发生了,但却并不知道是那几个流(可能有一个,多个,甚至全部),我们只能无差别轮询所有流,找出能读出数据,或者写入数据的流,对他们进行操作。
        但是使用select,我们有O(n)的无差别轮询复杂度,同时处理的流越多,没一次无差别轮询时间就越长。再次
    说了这么多,终于能好好解释epoll了
        epoll可以理解为event poll,不同于忙轮询和无差别轮询,epoll之会把哪个流发生了怎样的I/O事件通知我们。此时我们对这些流的操作都是有意义的。(复杂度降低到了O(1))
        在讨论epoll的实现细节之前,先把epoll的相关操作列出:

    epoll_create 创建一个epoll对象,一般epollfd = epoll_create()

    epoll_ctl (epoll_add/epoll_del的合体),往epoll对象中增加/删除某一个流的某一个事件
    比如
    epoll_ctl(epollfd, EPOLL_CTL_ADD, socket, EPOLLIN);//注册缓冲区非空事件,即有数据流入
    epoll_ctl(epollfd, EPOLL_CTL_DEL, socket, EPOLLOUT);//注册缓冲区非满事件,即流可以被写入
    epoll_wait(epollfd,...)等待直到注册的事件发生
    (注:当对一个非阻塞流的读写发生缓冲区满或缓冲区空,write/read会返回-1,并设置errno=EAGAIN。而epoll只关心缓冲区非满和缓冲区非空事件)。

    一个epoll模式的代码大概的样子是:
    while true {
    active_stream[] = epoll_wait(epollfd)
    for i in active_stream[] {
    read or write till
    }
    }
        限于篇幅,我只说这么多,以揭示原理性的东西,至于epoll的使用细节,请参考man和google,实现细节,请参阅linux kernel source。下面继续开始从原理的角度去分析Epoll是如何实现的。


    什么是epoll

    epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。当然,这不是2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new API introduced in Linux kernel 2.5.44),它几乎具备了之前所说的一切优点,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。

     

    epoll的相关系统调用

    epoll只有epoll_create,epoll_ctl,epoll_wait 3个系统调用。

     

    1. int epoll_create(int size);

    创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。

     

    2. int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

    epoll事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。

    第一个参数是epoll_create()的返回值。

    第二个参数表示动作,用三个宏来表示:

    EPOLL_CTL_ADD:注册新的fdepfd中;

    EPOLL_CTL_MOD:修改已经注册的fd的监听事件;

    EPOLL_CTL_DEL:从epfd中删除一个fd

     

    第三个参数是需要监听的fd

    第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:

    1. //保存触发事件的某个文件描述符相关的数据(与具体使用方式有关)  
    2.   
    3. typedef union epoll_data {  
    4.     void *ptr;  
    5.     int fd;  
    6.     __uint32_t u32;  
    7.     __uint64_t u64;  
    8. } epoll_data_t;  
    9.  //感兴趣的事件和被触发的事件  
    10. struct epoll_event {  
    11.     __uint32_t events; /* Epoll events */  
    12.     epoll_data_t data; /* User data variable */  
    13. };  


    events可以是以下几个宏的集合:

    EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭);

    EPOLLOUT:表示对应的文件描述符可以写;

    EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);

    EPOLLERR:表示对应的文件描述符发生错误;

    EPOLLHUP:表示对应的文件描述符被挂断;

    EPOLLET EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。

    EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里



    3. int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);

    收集在epoll监控的事件中已经发送的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时。

     

    epoll工作原理

    epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。

     

    另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。

     

    Epoll2种工作方式-水平触发(LT)和边缘触发(ET

    假如有这样一个例子:

    1. 我们已经把一个用来从管道中读取数据的文件句柄(RFD)添加到epoll描述符

    2. 这个时候从管道的另一端被写入了2KB的数据

    3. 调用epoll_wait(2),并且它会返回RFD,说明它已经准备好读取操作

    4. 然后我们读取了1KB的数据

    5. 调用epoll_wait(2)......


    Edge Triggered 工作模式:

    如果我们在第1步将RFD添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait(2)之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。在上面的例子中,会有一个事件产生在RFD句柄上,因为在第2步执行了一个写操作,然后,事件将会在第3步被销毁。因为第4步的读取操作没有读空文件输入缓冲区内的数据,因此我们在第5步调用 epoll_wait(2)完成后,是否挂起是不确定的。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。

       i    基于非阻塞文件句柄

       ii   只有当read(2)或者write(2)返回EAGAIN时才需要挂起,等待。但这并不是说每次read()时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read()返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。


    Level Triggered 工作模式

    相反的,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll(2),并且无论后面的数据是否被使用,因此他们具有同样的职能。因为即使使用ET模式的epoll,在收到多个chunk的数据的时候仍然会产生多个事件。调用者可以设定EPOLLONESHOT标志,在 epoll_wait(2)收到事件后epoll会与事件关联的文件句柄从epoll描述符中禁止掉。因此当EPOLLONESHOT设定后,使用带有 EPOLL_CTL_MOD标志的epoll_ctl(2)处理文件句柄就成为调用者必须作的事情。



    LT(level triggered)是epoll缺省的工作方式,并且同时支持blockno-block socket.在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你 的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表.

     

    ET (edge-triggered)是高速工作方式,只支持no-block socket,它效率要比LT更高。ET与LT的区别在于,当一个新的事件到来时,ET模式下当然可以从epoll_wait调用中获取到这个事件,可是如果这次没有把这个事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在ET模式下是无法再次从epoll_wait调用中获取这个事件的。而LT模式正好相反,只要一个事件对应的套接字缓冲区还有数据,就总能从epoll_wait中获取这个事件。

    因此,LT模式下开发基于epoll的应用要简单些,不太容易出错。而在ET模式下事件发生时,如果没有彻底地将缓冲区数据处理完,则会导致缓冲区中的用户请求得不到响应。


    Nginx默认采用ET模式来使用epoll。

     

    epoll的优点:

    1.支持一个进程打开大数目的socket描述符(FD)

        select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

     

    2.IO效率不随FD数目增加而线性下降

        传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个""AIO因为这时候推动力在os内核。在一些 benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

     

    3.使用mmap加速内核与用户空间的消息传递

        这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。

     

    4.内核微调

    这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。

     

    linuxepoll如何实现高效处理百万句柄的

    开发高性能网络程序时,windows开发者们言必称iocplinux开发者们则言必称epoll。大家都明白epoll是一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄,比起以前的selectpoll效率高大发了。我们用起epoll来都感觉挺爽,确实快,那么,它到底为什么可以高速处理这么多并发连接呢?

     

    使用起来很清晰,首先要调用epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。

     

    epoll_ctl可以操作上面建立的epoll,例如,将刚建立的socket加入到epoll中让其监控,或者把 epoll正在监控的某个socket句柄移出epoll,不再监控它等等。

     

    epoll_wait在调用时,在给定的timeout时间内,当在监控的所有句柄中有事件发生时,就返回用户态的进程。

     

    从上面的调用方式就可以看到epollselect/poll的优越之处:因为后者每次调用时都要传递你所要监控的所有socketselect/poll系统调用,这意味着需要将用户态的socket列表copy到内核态,如果以万计的句柄会导致每次都要copy几十几百KB的内存到内核态,非常低效。而我们调用epoll_wait时就相当于以往调用select/poll,但是这时却不用传递socket句柄给内核,因为内核已经在epoll_ctl中拿到了要监控的句柄列表。

     

    所以,实际上在你调用epoll_create后,内核就已经在内核态开始准备帮你存储要监控的句柄了,每次调用epoll_ctl只是在往内核的数据结构里塞入新的socket句柄。

     当一个进程调用epoll_creaqte方法时,Linux内核会创建一个eventpoll结构体,这个结构体中有两个成员与epoll的使用方式密切相关:

    1. /* 
    2.  
    3.  171 * This structure is stored inside the "private_data" member of the file 
    4.  
    5.  172 * structure and represents the main data structure for the eventpoll 
    6.  
    7.  173 * interface. 
    8.  
    9.  174 */  
    10.   
    11.  175struct eventpoll {  
    12.   
    13.  176        /* Protect the access to this structure */  
    14.   
    15.  177        spinlock_t lock;  
    16.   
    17.  178  
    18.   
    19.  179        /* 
    20.  
    21.  180         * This mutex is used to ensure that files are not removed 
    22.  
    23.  181         * while epoll is using them. This is held during the event 
    24.  
    25.  182         * collection loop, the file cleanup path, the epoll file exit 
    26.  
    27.  183         * code and the ctl operations. 
    28.  
    29.  184         */  
    30.   
    31.  185        struct mutex mtx;  
    32.   
    33.  186  
    34.   
    35.  187        /* Wait queue used by sys_epoll_wait() */  
    36.   
    37.  188        wait_queue_head_t wq;  
    38.   
    39.  189  
    40.   
    41.  190        /* Wait queue used by file->poll() */  
    42.   
    43.  191        wait_queue_head_t poll_wait;  
    44.   
    45.  192  
    46.   
    47.  193        /* List of ready file descriptors */  
    48.   
    49.  194        struct list_head rdllist;  
    50.   
    51.  195  
    52.   
    53.  196        /* RB tree root used to store monitored fd structs */  
    54.   
    55.  197        struct rb_root rbr;//红黑树根节点,这棵树存储着所有添加到epoll中的事件,也就是这个epoll监控的事件  
    56.  198  
    57.  199        /* 
    58.  200         * This is a single linked list that chains all the "struct epitem" that 
    59.  201         * happened while transferring ready events to userspace w/out 
    60.  202         * holding ->lock. 
    61.  203         */  
    62.  204        struct epitem *ovflist;  
    63.  205  
    64.  206        /* wakeup_source used when ep_scan_ready_list is running */  
    65.  207        struct wakeup_source *ws;  
    66.  208  
    67.  209        /* The user that created the eventpoll descriptor */  
    68.  210        struct user_struct *user;  
    69.  211  
    70.  212        struct file *file;  
    71.  213  
    72.  214        /* used to optimize loop detection check */  
    73.  215        int visited;  
    74.  216        struct list_head visited_list_link;//双向链表中保存着将要通过epoll_wait返回给用户的、满足条件的事件  
    75.  217};  

    每一个epoll对象都有一个独立的eventpoll结构体,这个结构体会在内核空间中创造独立的内存,用于存储使用epoll_ctl方法向epoll对象中添加进来的事件。这样,重复的事件就可以通过红黑树而高效的识别出来。


    在epoll中,对于每一个事件都会建立一个epitem结构体:

    1. /* 
    2.  130 * Each file descriptor added to the eventpoll interface will 
    3.  131 * have an entry of this type linked to the "rbr" RB tree. 
    4.  132 * Avoid increasing the size of this struct, there can be many thousands 
    5.  133 * of these on a server and we do not want this to take another cache line. 
    6.  134 */  
    7.  135struct epitem {  
    8.  136        /* RB tree node used to link this structure to the eventpoll RB tree */  
    9.  137        struct rb_node rbn;  
    10.  138  
    11.  139        /* List header used to link this structure to the eventpoll ready list */  
    12.  140        struct list_head rdllink;  
    13.  141  
    14.  142        /* 
    15.  143         * Works together "struct eventpoll"->ovflist in keeping the 
    16.  144         * single linked chain of items. 
    17.  145         */  
    18.  146        struct epitem *next;  
    19.  147  
    20.  148        /* The file descriptor information this item refers to */  
    21.  149        struct epoll_filefd ffd;  
    22.  150  
    23.  151        /* Number of active wait queue attached to poll operations */  
    24.  152        int nwait;  
    25.  153  
    26.  154        /* List containing poll wait queues */  
    27.  155        struct list_head pwqlist;  
    28.  156  
    29.  157        /* The "container" of this item */  
    30.  158        struct eventpoll *ep;  
    31.  159  
    32.  160        /* List header used to link this item to the "struct file" items list */  
    33.  161        struct list_head fllink;  
    34.  162  
    35.  163        /* wakeup_source used when EPOLLWAKEUP is set */  
    36.  164        struct wakeup_source __rcu *ws;  
    37.  165  
    38.  166        /* The structure that describe the interested events and the source fd */  
    39.  167        struct epoll_event event;  
    40.  168};  

    此外,epoll还维护了一个双链表,用户存储发生的事件。epoll_wait调用时,仅仅观察这个list链表里有没有数据即eptime项即可。有数据就返回,没有数据就sleep,等到timeout时间到后即使链表没数据也返回。所以,epoll_wait非常高效。

     

    而且,通常情况下即使我们要监控百万计的句柄,大多一次也只返回很少量的准备就绪句柄而已,所以,epoll_wait仅需要从内核态copy少量的句柄到用户态而已,如何能不高效?!

     

    那么,这个准备就绪list链表是怎么维护的呢?当我们执行epoll_ctl时,除了把socket放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后就来把socket插入到准备就绪链表里了。

     

    如此,一颗红黑树,一张准备就绪句柄链表,少量的内核cache,就帮我们解决了大并发下的socket处理问题。执行epoll_create时,创建了红黑树和就绪链表,执行epoll_ctl时,如果增加socket句柄,则检查在红黑树中是否存在,存在立即返回,不存在则添加到树干上,然后向内核注册回调函数,用于当中断事件来临时向准备就绪链表中插入数据。执行epoll_wait时立刻返回准备就绪链表里的数据即可。

     

    epoll的使用方法

    那么究竟如何来使用epoll呢?其实非常简单。

     

    通过在包含一个头文件#include <sys/epoll.h> 以及几个简单的API将可以大大的提高你的网络服务器的支持人数。

     

    首先通过create_epoll(int maxfds)来创建一个epoll的句柄。这个函数会返回一个新的epoll句柄,之后的所有操作将通过这个句柄来进行操作。在用完之后,记得用close()来关闭这个创建出来的epoll句柄。

     

    之后在你的网络主循环里面,每一帧的调用epoll_wait(int epfd, epoll_event events, int max events, int timeout)来查询所有的网络接口,看哪一个可以读,哪一个可以写了。基本的语法为:

    nfds = epoll_wait(kdpfd, events, maxevents, -1);

     

    其中kdpfd为用epoll_create创建之后的句柄,events是一个epoll_event*的指针,当epoll_wait这个函数操作成功之后,epoll_events里面将储存所有的读写事件。max_events是当前需要监听的所有socket句柄数。最后一个timeout epoll_wait的超时,为0的时候表示马上返回,为-1的时候表示一直等下去,直到有事件返回,为任意正整数的时候表示等这么长的时间,如果一直没有事件,则返回。一般如果网络主循环是单独的线程的话,可以用-1来等,这样可以保证一些效率,如果是和主逻辑在同一个线程的话,则可以用0来保证主循环的效率。

     

    epoll_wait返回之后应该是一个循环,遍历所有的事件。

     

     

    几乎所有的epoll程序都使用下面的框架:

    1. for( ; ; )  
    2.    {  
    3.        nfds = epoll_wait(epfd,events,20,500);  
    4.        for(i=0;i<nfds;++i)  
    5.        {  
    6.            if(events[i].data.fd==listenfd) //有新的连接  
    7.            {  
    8.                connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接  
    9.                ev.data.fd=connfd;  
    10.                ev.events=EPOLLIN|EPOLLET;  
    11.                epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中  
    12.            }  
    13.   
    14.            else if( events[i].events&EPOLLIN ) //接收到数据,读socket  
    15.            {  
    16.                n = read(sockfd, line, MAXLINE)) < 0    //读  
    17.                ev.data.ptr = md;     //md为自定义类型,添加数据  
    18.                ev.events=EPOLLOUT|EPOLLET;  
    19.                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓  
    20.            }  
    21.            else if(events[i].events&EPOLLOUT) //有数据待发送,写socket  
    22.            {  
    23.                struct myepoll_data* md = (myepoll_data*)events[i].data.ptr;    //取数据  
    24.                sockfd = md->fd;  
    25.                send( sockfd, md->ptr, strlen((char*)md->ptr), 0 );        //发送数据  
    26.                ev.data.fd=sockfd;  
    27.                ev.events=EPOLLIN|EPOLLET;  
    28.                epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据  
    29.            }  
    30.            else  
    31.            {  
    32.                //其他的处理  
    33.            }  
    34.        }  
    35.    }  

    epoll的程序实例

    1.  #include <stdio.h>  
    2. #include <stdlib.h>  
    3. #include <unistd.h>  
    4. #include <errno.h>  
    5. #include <sys/socket.h>  
    6. #include <netdb.h>  
    7. #include <fcntl.h>  
    8. #include <sys/epoll.h>  
    9. #include <string.h>  
    10.   
    11. #define MAXEVENTS 64  
    12.   
    13. //函数:  
    14. //功能:创建和绑定一个TCP socket  
    15. //参数:端口  
    16. //返回值:创建的socket  
    17. static int  
    18. create_and_bind (char *port)  
    19. {  
    20.   struct addrinfo hints;  
    21.   struct addrinfo *result, *rp;  
    22.   int s, sfd;  
    23.   
    24.   memset (&hints, 0, sizeof (struct addrinfo));  
    25.   hints.ai_family = AF_UNSPEC;     /* Return IPv4 and IPv6 choices */  
    26.   hints.ai_socktype = SOCK_STREAM; /* We want a TCP socket */  
    27.   hints.ai_flags = AI_PASSIVE;     /* All interfaces */  
    28.   
    29.   s = getaddrinfo (NULL, port, &hints, &result);  
    30.   if (s != 0)  
    31.     {  
    32.       fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));  
    33.       return -1;  
    34.     }  
    35.   
    36.   for (rp = result; rp != NULL; rp = rp->ai_next)  
    37.     {  
    38.       sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);  
    39.       if (sfd == -1)  
    40.         continue;  
    41.   
    42.       s = bind (sfd, rp->ai_addr, rp->ai_addrlen);  
    43.       if (s == 0)  
    44.         {  
    45.           /* We managed to bind successfully! */  
    46.           break;  
    47.         }  
    48.   
    49.       close (sfd);  
    50.     }  
    51.   
    52.   if (rp == NULL)  
    53.     {  
    54.       fprintf (stderr, "Could not bind\n");  
    55.       return -1;  
    56.     }  
    57.   
    58.   freeaddrinfo (result);  
    59.   
    60.   return sfd;  
    61. }  
    62.   
    63.   
    64. //函数  
    65. //功能:设置socket为非阻塞的  
    66. static int  
    67. make_socket_non_blocking (int sfd)  
    68. {  
    69.   int flags, s;  
    70.   
    71.   //得到文件状态标志  
    72.   flags = fcntl (sfd, F_GETFL, 0);  
    73.   if (flags == -1)  
    74.     {  
    75.       perror ("fcntl");  
    76.       return -1;  
    77.     }  
    78.   
    79.   //设置文件状态标志  
    80.   flags |= O_NONBLOCK;  
    81.   s = fcntl (sfd, F_SETFL, flags);  
    82.   if (s == -1)  
    83.     {  
    84.       perror ("fcntl");  
    85.       return -1;  
    86.     }  
    87.   
    88.   return 0;  
    89. }  
    90.   
    91. //端口由参数argv[1]指定  
    92. int  
    93. main (int argc, char *argv[])  
    94. {  
    95.   int sfd, s;  
    96.   int efd;  
    97.   struct epoll_event event;  
    98.   struct epoll_event *events;  
    99.   
    100.   if (argc != 2)  
    101.     {  
    102.       fprintf (stderr, "Usage: %s [port]\n", argv[0]);  
    103.       exit (EXIT_FAILURE);  
    104.     }  
    105.   
    106.   sfd = create_and_bind (argv[1]);  
    107.   if (sfd == -1)  
    108.     abort ();  
    109.   
    110.   s = make_socket_non_blocking (sfd);  
    111.   if (s == -1)  
    112.     abort ();  
    113.   
    114.   s = listen (sfd, SOMAXCONN);  
    115.   if (s == -1)  
    116.     {  
    117.       perror ("listen");  
    118.       abort ();  
    119.     }  
    120.   
    121.   //除了参数size被忽略外,此函数和epoll_create完全相同  
    122.   efd = epoll_create1 (0);  
    123.   if (efd == -1)  
    124.     {  
    125.       perror ("epoll_create");  
    126.       abort ();  
    127.     }  
    128.   
    129.   event.data.fd = sfd;  
    130.   event.events = EPOLLIN | EPOLLET;//读入,边缘触发方式  
    131.   s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);  
    132.   if (s == -1)  
    133.     {  
    134.       perror ("epoll_ctl");  
    135.       abort ();  
    136.     }  
    137.   
    138.   /* Buffer where events are returned */  
    139.   events = calloc (MAXEVENTS, sizeof event);  
    140.   
    141.   /* The event loop */  
    142.   while (1)  
    143.     {  
    144.       int n, i;  
    145.   
    146.       n = epoll_wait (efd, events, MAXEVENTS, -1);  
    147.       for (i = 0; i < n; i++)  
    148.         {  
    149.           if ((events[i].events & EPOLLERR) ||  
    150.               (events[i].events & EPOLLHUP) ||  
    151.               (!(events[i].events & EPOLLIN)))  
    152.             {  
    153.               /* An error has occured on this fd, or the socket is not 
    154.                  ready for reading (why were we notified then?) */  
    155.               fprintf (stderr, "epoll error\n");  
    156.               close (events[i].data.fd);  
    157.               continue;  
    158.             }  
    159.   
    160.           else if (sfd == events[i].data.fd)  
    161.             {  
    162.               /* We have a notification on the listening socket, which 
    163.                  means one or more incoming connections. */  
    164.               while (1)  
    165.                 {  
    166.                   struct sockaddr in_addr;  
    167.                   socklen_t in_len;  
    168.                   int infd;  
    169.                   char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];  
    170.   
    171.                   in_len = sizeof in_addr;  
    172.                   infd = accept (sfd, &in_addr, &in_len);  
    173.                   if (infd == -1)  
    174.                     {  
    175.                       if ((errno == EAGAIN) ||  
    176.                           (errno == EWOULDBLOCK))  
    177.                         {  
    178.                           /* We have processed all incoming 
    179.                              connections. */  
    180.                           break;  
    181.                         }  
    182.                       else  
    183.                         {  
    184.                           perror ("accept");  
    185.                           break;  
    186.                         }  
    187.                     }  
    188.   
    189.                                   //将地址转化为主机名或者服务名  
    190.                   s = getnameinfo (&in_addr, in_len,  
    191.                                    hbuf, sizeof hbuf,  
    192.                                    sbuf, sizeof sbuf,  
    193.                                    NI_NUMERICHOST | NI_NUMERICSERV);//flag参数:以数字名返回  
    194.                                   //主机地址和服务地址  
    195.   
    196.                   if (s == 0)  
    197.                     {  
    198.                       printf("Accepted connection on descriptor %d "  
    199.                              "(host=%s, port=%s)\n", infd, hbuf, sbuf);  
    200.                     }  
    201.   
    202.                   /* Make the incoming socket non-blocking and add it to the 
    203.                      list of fds to monitor. */  
    204.                   s = make_socket_non_blocking (infd);  
    205.                   if (s == -1)  
    206.                     abort ();  
    207.   
    208.                   event.data.fd = infd;  
    209.                   event.events = EPOLLIN | EPOLLET;  
    210.                   s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);  
    211.                   if (s == -1)  
    212.                     {  
    213.                       perror ("epoll_ctl");  
    214.                       abort ();  
    215.                     }  
    216.                 }  
    217.               continue;  
    218.             }  
    219.           else  
    220.             {  
    221.               /* We have data on the fd waiting to be read. Read and 
    222.                  display it. We must read whatever data is available 
    223.                  completely, as we are running in edge-triggered mode 
    224.                  and won't get a notification again for the same 
    225.                  data. */  
    226.               int done = 0;  
    227.   
    228.               while (1)  
    229.                 {  
    230.                   ssize_t count;  
    231.                   char buf[512];  
    232.   
    233.                   count = read (events[i].data.fd, buf, sizeof(buf));  
    234.                   if (count == -1)  
    235.                     {  
    236.                       /* If errno == EAGAIN, that means we have read all 
    237.                          data. So go back to the main loop. */  
    238.                       if (errno != EAGAIN)  
    239.                         {  
    240.                           perror ("read");  
    241.                           done = 1;  
    242.                         }  
    243.                       break;  
    244.                     }  
    245.                   else if (count == 0)  
    246.                     {  
    247.                       /* End of file. The remote has closed the 
    248.                          connection. */  
    249.                       done = 1;  
    250.                       break;  
    251.                     }  
    252.   
    253.                   /* Write the buffer to standard output */  
    254.                   s = write (1, buf, count);  
    255.                   if (s == -1)  
    256.                     {  
    257.                       perror ("write");  
    258.                       abort ();  
    259.                     }  
    260.                 }  
    261.   
    262.               if (done)  
    263.                 {  
    264.                   printf ("Closed connection on descriptor %d\n",  
    265.                           events[i].data.fd);  
    266.   
    267.                   /* Closing the descriptor will make epoll remove it 
    268.                      from the set of descriptors which are monitored. */  
    269.                   close (events[i].data.fd);  
    270.                 }  
    271.             }  
    272.         }  
    273.     }  
    274.   
    275.   free (events);  
    276.   
    277.   close (sfd);  
    278.   
    279.   return EXIT_SUCCESS;  
    280. }  

    展开全文
  • epoll的优点:1.支持一个进程打开大数目的socket描述符(FD)select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这...

    epoll的优点:1.支持一个进程打开大数目的socket描述符(FD)

    select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译内核,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的 Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat

    /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。

    2.IO效率不随FD数目增加而线性下降

    传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,不过由于网络延时,任一时间只有部分的socket是"活跃"的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,它只会对"活跃"的socket进行操作---这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的。那么,只有"活跃"的socket才会主动的去调用 callback函数,其他idle状态socket则不会,在这点上,epoll实现了一个"伪"AIO,因为这时候推动力在os内核。在一些

    benchmark中,如果所有的socket基本上都是活跃的---比如一个高速LAN环境,epoll并不比select/poll有什么效率,相反,如果过多使用epoll_ctl,效率相比还有稍微的下降。但是一旦使用idle connections模拟WAN环境,epoll的效率就远在select/poll之上了。

    3.使用mmap加速内核与用户空间的消息传递。

    这点实际上涉及到epoll的具体实现了。无论是select,poll还是epoll都需要内核把FD消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的。而如果你想我一样从2.5内核就关注epoll的话,一定不会忘记手工 mmap这一步的。

    4.内核微调

    这一点其实不算epoll的优点了,而是整个linux平台的优点。也许你可以怀疑linux平台,但是你无法回避linux平台赋予你微调内核的能力。比如,内核TCP/IP协议栈使用内存池管理sk_buff结构,那么可以在运行时期动态调整这个内存pool(skb_head_pool)的大小--- 通过echo XXXX>/proc/sys/net/core/hot_list_length完成。再比如listen函数的第2个参数(TCP完成3次握手的数据包队列长度),也可以根据你平台内存大小动态调整。更甚至在一个数据包面数目巨大但同时每个数据包本身大小却很小的特殊系统上尝试最新的NAPI网卡驱动架构。

    Epoll 的高效和其数据结构的设计是密不可分的,这个下面就会提到。

    首先回忆一下 select 模型,当有 I/O 事件到来时, select 通知应用程序有事件到了快去处理,而应用程序必须轮询所有的 FD 集合,测试每个 FD 是否有事件发生,并处理事件;代码像下面这样:

    int res = select(maxfd+1, &readfds, NULL, NULL, 120);

    if (res > 0)

    {

    for (int i = 0; i < MAX_CONNECTION; i++)

    {

    if (FD_ISSET(allConnection[i], &readfds))

    {

    handleEvent(allConnection[i]);

    }

    }

    }

    // if(res == 0) handle timeout, res < 0 handle error

    Epoll 不仅会告诉应用程序有I/0 事件到来,还会告诉应用程序相关的信息,这些信息是应用程序填充的,因此根据这些信息应用程序就能直接定位到事件,而不必遍历整个FD 集合。

    int res = epoll_wait(epfd, events, 20, 120);

    for (int i = 0; i < res;i++)

    {

    handleEvent(events[n]);

    }

    Epoll 关键数据结构

    前面提到 Epoll 速度快和其数据结构密不可分,其关键数据结构就是:

    struct epoll_event {

    __uint32_t events; // Epoll events

    epoll_data_t data; // User data variable

    };

    typedef union epoll_data {

    void *ptr;

    int fd;

    __uint32_t u32;

    __uint64_t u64;

    } epoll_data_t;

    可见 epoll_data 是一个 union 结构体

    , 借助于它应用程序可以保存很多类型的信息

    :fd 、指针等等。有了它,应用程序就可以直接定位目标了。

    使用 epoll

    既然 Epoll 相比 select 这么好,那么用起来如何呢?会不会很繁琐啊 … 先看看下面的三个函数吧,就知道 Epoll 的易用了。

    int epoll_create(int size);

    生成一个 Epoll 专用的文件描述符,其实是申请一个内核空间,用来存放你想关注的 socket fd 上是否发生以及发生了什么事件。 size 就是你在这个 Epoll fd 上能关注的最大 socket fd 数,大小自定,只要内存足够。

    int epoll_ctl(int epfd, int op,

    int fd, struct epoll_event *event );

    控制某个 Epoll 文件描述符上的事件:注册、修改、删除。其中参数 epfd 是 epoll_create() 创建 Epoll 专用的文件描述符。相对于 select 模型中的 FD_SET 和 FD_CLR 宏。

    int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout);

    等待 I/O 事件的发生;参数说明:

    epfd: 由 epoll_create()生成的 Epoll 专用的文件描述符;

    epoll_event: 用于回传代处理事件的数组;

    maxevents: 每次能处理的事件数;

    timeout: 等待 I/O 事件发生的超时值;

    返回发生事件数。

    相对于 select 模型中的 select 函数。

    一个简单的epoll程序

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #include

    #define SERVPORT 9527 /*服务器监听端口号 */

    #define BACKLOG 10 /* 最大同时连接请求数 */

    void setnonblocking(int sock)

    {

    int opts;

    opts = fcntl(sock,F_GETFL);

    if(opts<0)

    {

    perror("fcntl(sock,GETFL)");

    exit(1);

    }

    opts = opts|O_NONBLOCK;

    if(fcntl(sock,F_SETFL,opts)<0)

    {

    perror("fcntl(sock,SETFL,opts)");

    exit(1);

    }

    }

    void do_use_fd(int client_fd)

    {

    const char str[] = "God bless you!\n";

    if (send(client_fd, str, sizeof(str), 0) == -1)

    perror("send");

    close(client_fd);

    }

    int main()

    {

    int sockfd;

    struct sockaddr_in my_addr;

    struct sockaddr_in remote_addr;

    if ((sockfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {

    perror("socket");

    exit(1);

    }

    my_addr.sin_family = AF_INET;

    my_addr.sin_port = htons(SERVPORT);

    my_addr.sin_addr.s_addr = INADDR_ANY;

    bzero(&(my_addr.sin_zero), 8);

    if (bind(sockfd, (struct sockaddr *)&my_addr, sizeof(struct sockaddr)) == -1) {

    perror("bind");

    exit(1);

    }

    printf("bind ok\n");

    if (listen(sockfd, BACKLOG) == -1) {

    perror("listen");

    exit(1);

    }

    printf("listen ok\b");

    size_t sin_size = sizeof(struct sockaddr_in);

    #define MAX_EVENTS 10

    struct epoll_event ev, events[MAX_EVENTS];

    int conn_sock, nfds, epollfd;

    epollfd = epoll_create(10);

    if (epollfd == -1) {

    perror("epoll_create");

    exit(EXIT_FAILURE);

    }

    printf("epoll_create\n");

    ev.events = EPOLLIN;

    ev.data.fd = sockfd;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, sockfd, &ev) == -1) {

    perror("epoll_ctl: sockfd");

    exit(EXIT_FAILURE);

    }

    printf("epoll_ctl ok\n");

    for (;;) {

    printf("start epoll_wait\n");

    nfds = epoll_wait(epollfd, events, MAX_EVENTS, -1);

    if (nfds == -1) {

    perror("epoll_pwait");

    exit(EXIT_FAILURE);

    }

    printf("epoll_wait returns, nfds = %d\n", nfds);

    for (int n = 0; n < nfds; ++n) {

    if (events[n].data.fd == sockfd) {

    conn_sock = accept(sockfd,

    (struct sockaddr *) &remote_addr, &sin_size);

    if (conn_sock == -1) {

    perror("accept");

    exit(EXIT_FAILURE);

    }

    setnonblocking(conn_sock);

    ev.events = EPOLLIN | EPOLLET;

    ev.data.fd = conn_sock;

    if (epoll_ctl(epollfd, EPOLL_CTL_ADD, conn_sock,

    &ev) == -1) {

    perror("epoll_ctl: conn_sock");

    exit(EXIT_FAILURE);

    }

    } else {

    do_use_fd(events[n].data.fd);

    }

    }

    }

    }

    展开全文
  • 1)Select是一种多路复用IO输入输出模式,在linux输入输出编程中通过select轮询机制,发现可用/可读或可写接口。 2)低级socket程序中有一个共同点:都是基于阻塞式编程方式 3)非阻塞式是函数调用时不...
  • epoll的优点: 1.支持一个进程打开大数目的socket描述符(FD) select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少...
  • 什么是I/O多路复用

    2021-01-23 17:16:10
    的优点就是:与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。 IO复用应用的场合: (1)当客户处理多个描述字...
  • 基本思想:在发送端,它将高速串行数据经过串并变换形成多路低速数据分别对多个正交子载波进行调制,迭加后构成发送信号。在接收端,用同样数量载波进行相干解调,获得低速率数据流,经过并串变换形成高速数据流...
  • Day 44 初识IO复用

    2019-09-02 23:52:32
    io的多路复用,当客户端的并发需求并不是很高时,服务端就没有必要来使用多进程或者多线程来实现高并发。可以使用io复用的技术来进行一般的并发处理, 这种技术的优点是节约资源,但当客户端都是高并发时则不能去...
  • SPDY协议用于优化HTTP协议性能,通过压缩、多路复用和优先级等技术,缩短网页加载时间并提高安全性。 SPDY协议是对HTTP协议一种增强~ HTTP1.x缺点 HTTP/1.0一次只允许在一个TCP连接上发起一个请求;...
  • IP是一种通过互联网或其他使用IP技术的网络,来实现新型的电话通讯。随着互联网日渐普及,以及跨境通讯数量大幅飙升... (1)通过统计上的多路复用而提高的效率。  (2)通过语音压缩和语音活动检测(安静抑制)等增
  • 由于数字技术的高速发展,时分多路复用得到了越来越广泛的应用,提出了各种各样的实用方法。但是,FM-FM频分遥测体制也以它独特的优点而获得广泛的应用。特别是在中容量,速变信号测试中显示了它的生命力。 FM-FM...
  • IP是一种通过互联网或其他使用IP技术的网络,来实现新型的电话通讯。随着互联网日渐普及,以及跨境通讯数量大幅飙升... (1)通过统计上的多路复用而提高的效率。  (2)通过语音压缩和语音活动检测(安静抑制)等增
  • IP电话是语音数据集成与语音/分组IP路由器技术 进展结合经济优势,从而迎来一个新网络环境,这个新环境提供了低成本、高灵活性、高生产率及... (1)通过统计上的多路复用而提高效率。  (2)通过语音压缩和...
  • 菜鸟修行之----java线程与并发:java线程池 ​ 线程池的实现过程没有用到Synchronized关键字,用的都是Volatile,Lock和同步(阻塞)队列,Atomic相关类,FutureTask等等,因为后者的性能更优 ​ 线程池的优点: ...
  • HTTP/2协议优点

    2019-04-24 18:07:29
    HTTP/2围绕着主要7项技术进行讨论 功能 ... 多路复用 SPDY 同域名下所有通信都在单个连接上完成 TLS义务化 Speed+Mobility 必须TLS 协商 Speed+Mobiliy、Friendly 应...
  • ADI故障保护开关和多路复用器 新型产品系列(ADG52xxF和ADG54xxF)就是采用这种技术。  高性能信号链模拟输入保护往往令系统设计人员很头痛。 通常,需要在模拟性能(例如漏电阻和导通电阻)和保护水 平(可由...
  • 前传由于光纤资源不足,基站数量庞大,成本敏感度高等因素,光纤不足的区域引用彩光(WDM波分复用技术,WDM的优点在于从基站到局端,只需要升级终端设备,无需挖掘沟槽来埋设更的光纤而增加不必要的成本。...
  • 24.NAT地址转换技术

    2017-03-08 09:22:45
    NAT技术的优点:节省公有合法地址,处理地址重叠,增强灵活性,安全性。NAT技术分类:v静态NAT:一对一转换,一个私网地址对应一个公网地址。v动态NAT:多对多转换,多个私网地址对应多个公网地址。vPAT:端口多路...
  • 发展中新型光纤

    2021-02-08 15:46:38
    美国米特(Mitre)公司在贝德福科学家认为,在军事指挥、控制和通信(C3)系统应用中,纤维光学是一项业已证明...由于有希望新型光纤和多路复用技术在这个实验室中出现,这一加速发展技术有可能得到新动力。
  • Java NIO核心类库多路复用器Selector就是基于epoll的多路复用技术实现  相比select、poll系统调用,epoll有如下优点: 1.支持一个进程打开socket描述符(FD)不受限制,仅受限于操作系统最大文件句柄数...
  • 一、使用广播信道数据链层 1.1 局域网数据链层 局域网最主要特点是: ①、网络为一个单位所拥有; ②、地理范围和站点数目均有限。 局域网具有如下主要优点: ①、具有广播功能,从一个站点可很方便地...
  • 与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。 select优点: 基于select的I/O多路复用模型的是单进程执行可以...
  • 学习使用epoll - The time is passing - ITeye技术网站学习使用epoll 博客分类:linuxepolllinux epoll是Linux下多路复用IO接口select/poll增强版本,它能显著减少程序在大量并发连接中只有少量活跃情况下...
  • 多路复用技术: (1)*频分多路复用,特点是把电路或空间频带资源分为多个频段,并将其分配给多个用户,每个用户终端数据通过分配给它子通路传输。主要用于电话和电缆电视系统。 (2)*时分多路复用,特点是按...

空空如也

空空如也

1 2 3 4 5
收藏数 84
精华内容 33
关键字:

多路复用技术的优点