精华内容
下载资源
问答
  • IO多路复用是指内核一旦发现进程指定的一个或者多个IO条件准备读取,它就通知该进程。IO多路复用适用如下场合: (1)当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用。 (2)当一个客户...
  • 下面小编就为大家带来一篇IO多路复用之epoll全面总结(必看篇)。小编觉得挺不错的。现在就分享给大家。也给大家做个参考。一起跟随小编过来看看吧
  • 下面小编就为大家带来一篇IO多路复用之poll全面总结(必看篇)。小编觉得挺不错的。现在就分享给大家。也给大家做个参考。一起跟随小编过来看看吧
  • 对python并发进行的笔记整理,个人所学习使用,主要包括多进程,多线程,协程,IO多路复用,进程线程通信等
  • 使用C语言实现的io多路复用http服务器的一个简单例子,可以显示简单的图片文字等,内含makefile,所用到的图片和html,编译运行即可
  • io多路复用帮助你更快更轻松的掌握多并发服务器的搭建
  • IO多路复用

    2020-09-05 10:34:47
    IO多路复用 IO多路复用就是服务端用来处理大量客户端同时连接的情况,select,poll,epoll都是IO多路复用的机制 文章目录IO多路复用IO多路复用操作系统知识补充用户态和内核态进程切换文件描述符FDIO模型BIONIOAIO...

    IO多路复用

    IO多路复用就是服务端用来处理大量客户端同时连接的情况,select,poll,epoll都是IO多路复用的机制

    IO多路复用

    何为IO多路复用?

    IO多路复用就是一个线程或者一个进程同时监视多个文件描述符,一旦某个或某几个文件描述符准备就绪(读写就绪),就通知程序进行相应的读写操作。所以,这里的多路是指有多个网络连接,即多个客户端同时请求的情况,复用:复用同一个线程来监视所有网络连接,一旦某个或者某几个连接读写准备就绪,则通知应用程序进行读写

    IO多路复用解决了什么问题?

    没有IO多路复用机制时,有BIO、NIO两种实现方式,但有一些问题

    • BIO(同步阻塞IO)
      • 如果服务端是单线程:当一个线程执行accept一个请求后,在recv或send调用阻塞时,将无法accept其他请求(必须等上一个请求处recv或send完),无法处理并发
      • 如果服务端是多线程:当accept一个请求后,开启线程进行recv,可以完成并发处理,但随着请求数增加需要增加系统线程,大量的线程占用很大的内存空间,并且线程切换会带来很大的开销,10000个线程真正发生读写事件的线程数不会超过20%,每次accept都开一个线程也是一种资源浪费
    • NIO(同步非阻塞IO)
      • 服务端accept后,将所有请求的客户端或者网络连接都加入到fds文件描述符集合,每次轮询这个集合,有数据则进行读写,无则立马返回错误,每次都需要轮询这个集合

    什么场景需要使用IO多路复用?

    • 当服务器需要处理多个socket连接
    • 当服务器既然处理TCP连接,又要处理UDP连接
    • 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

    与多线程多进程相比,IO多路复用的最大优势就是复用一个线程或者一个进程,系统开销小,也不必对多线程多进程进行维护。

    操作系统知识补充

    用户态和内核态

    现在操作系统都采用虚拟存储器,对于32位操作系统而言,他的寻址空间就是4G(2的32次方),操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限,为了保护应用程序不直接操作内核,保证内核安全,操作系统将虚拟地址空间划分为两部分:

    • 内核空间(内核态)

    • 用户空间(用户态)

    针对linux操作系统而言,将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用,称为内核空间,而将较低的3G字节(从虚拟地址0x00000000到0xBFFFFFFF),供各个进程使用,称为用户空间

    进程切换

    进程的上下文切换是在内核完成的,过程如下:

    • 保存处理器上下文,包括程序计数器和其他寄存器
    • 更新进程PCB信息
    • 把进程的PCB移入相应的队列,如就绪队列或者等待事件完成的阻塞队列
    • 选择另一个进程执行,并更新其PCB
    • 恢复处理机上下文

    文件描述符FD

    文件描述符就是一个用于描述指向文件的引用的抽象画概念

    文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

    IO模型

    数据到达的路径:

    • 数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

    BIO

    等待数据和数据拷贝的过程中,这两个过程用户程序都是阻塞的

    优点:

    • 能够及时返回数据,无延迟

    缺点:

    • 对用户来说处于等待就要付出性能的代价了输入图片说明

    NIO

    非阻塞的recvfrom系统调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以处理其他业务逻辑,然后以轮询的方式发起recvfrom系统调用,直到数据准备好,再拷贝数据到进程,进行数据处理,但是数据拷贝的过程用户进程还是阻塞的,所以NIO最大的特点就是不断主动轮询内核,数据准备好了吗

    优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在同时执行)。

    缺点:任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。而且轮询也消耗CPU时间

    输入图片说明

    AIO

    用户程序进行aio_read系统调用之后,无论内核数据是否准备好,都会直接返回给用户进程,然后用户态程序继续执行其他业务逻辑,等到数据准备完成且数据拷贝完成后,内核向进程发送通知,这时候应用程序才开始处理IO读写,AIO中,数据准备和数据拷贝这两个过程都不会阻塞应用程序。

    通知方式:

    • 信号
      • 在linux中是以信号通知的方式进行通知
    • 有一个专门执行回调通知的线程

    输入图片说明

    多路复用模型

    IO多路复用有两个特别的系统调用函数select,poll,epoll,这三个函数可以为我们监听是否有socket数据准备好,只有有1个以上准备好就立马返回,用户进程在等待数据拷贝完成后就可以进行读写

    当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。数据拷贝的过程,用户进程也是阻塞的,拷贝完成后,即可进行数据读写。

    输入图片说明

    **多路复用的特点:**就是通过一个线程或者一个进程同时监视/等待多个IO文件描述符,只要有一个以上数据准备完成,即可返回。监视的方式分为:

    • select
    • poll
    • epoll

    但是千万别以为IO多路复用一定比BIO或者NIO优秀:

    • IO多路复用往往需要使用两个系统调用(select , recvfrom)而BIO只需要调用recvfrom这一个系统调用,如果连接数并不是很多的话,可能延迟还会更大,如果一个连接占用的时间过长也不好,比如要读写的数据很大,就会造成后续已完成数据准备的请求无法及时读写,所以IO多路复用的优势在于处理多连接和短连接
    • IO多路复用的最大优势就是复用线程,开销小,降低了维护难度

    I/O多路复用就是通过一种机制,一个进程可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作。

    select

    select函数会监视文件描述符,调用select函数会阻塞,直到有描述符读就绪或者写就绪,或者超过指定的超时时间,函数返回,当select函数返回后,可以通过遍历fdset来找到准备就绪的描述符

    select函数的缺点:

    • 单个进程能监视的最大文件描述符的数量有上限,linux上是1024,select函数使用bitmap这个数据结构来对监视的文件描述符做标记,这个bitmap在linux中默认是1024,select函数在运行过程中将这个bitmap拷贝到内核,让内核来判断哪个文件描述符准备就绪,所以,这个数据拷贝过程也会带来的一定的系统开销
    • 当select函数返回后,应用程序还需要以O(N)的复杂度去遍历FD集合,来找到准备就绪的FD,然后进行数据处理
    poll

    poll本质上和select没有区别,与select的区别的是他没有最大连接数的限制,原因是他是基于链表来存储的

    poll还有一个特点是“水平触发”,如果报告了fd后,没有被处理,那么下次poll时会再次报告该fd。

    缺点:

    从上面看,select和poll都需要在返回后,通过遍历文件描述符来获取已经就绪的socket。事实上,同时连接的大量客户端在一时刻可能只有很少的处于就绪状态,因此随着监视的描述符数量的增长,其效率也会线性下降。

    epoll

    epoll是select和poll的增强版本

    epoll支持水平触发和边缘触发:

    • 边缘触发:epoll只告诉进程哪些FD准备好了,进程可以不用遍历整个FD集合,降低了开销
    • 水平触发:如果报告了fd后,没有被处理,那么下次epoll时会再次报告该fd。

    epoll还支持使用事件的就绪通知方式,通过epoll_ctl注册文件描述符,一旦FD准备就绪,内核就会采用类似callback的回调机制来激活该fd,epoll_wait便可以收到通知

    总结

    • BIO:数据准备和数据拷贝都是阻塞的
    • NIO:轮询数据是否准备完成,数据拷贝的过程是阻塞的
    • IO多路复用:两次系统调用,select,poll,epoll监视内核数据准备,数据拷贝的过程仍然是阻塞的,适合于多连接,短连接
    • AIO:数据准备和数据拷贝过程是非阻塞的

    输入图片说明

    redis IO模型

    redis的高性能的一个原因是redis采用网络IO多路复用的技术保证在多连接的时候,系统的高吞吐量

    多路-指的是多个socket连接,复用-指的是复用一个线程。多路复用主要有三种技术:select,poll,epoll。epoll是最新的也是目前最好的多路复用技术。

    这里“多路”指的是多个网络连接,“复用”指的是复用同一个线程。采用多路 I/O 复用技术可以让单个线程高效的处理多个连接请求(尽量减少网络IO的时间消耗),且Redis在内存中操作数据的速度非常快(内存内的操作不会成为这里的性能瓶颈),主要以上两点造就了Redis具有很高的吞吐量。

    展开全文
  • io多路复用解析

    2016-12-07 11:04:25
    io多路复用解析
  • 三种IO多路复用机制: 一:select 二:poll 三:epoll 以上三种IO多路复用的完整代码,皆可以在我的资源列表中获取下载: 资源列表:http://download.csdn.net/user/qiulanzhu
  • 什么是IO多路复用? 为什么出现IO多路复用机制? IO多路复用的三种实现方式 select函数接口 select使用示例 select缺点 poll函数接口 poll使用示例 poll缺点 epoll函数接口 epoll使用示例 epoll缺点 epoll...

    阅读本文大概需要 5 分钟。

    看完下面这些,高频面试题你都会答了吧

    目录

    1. 什么是IO多路复用?
    2. 为什么出现IO多路复用机制?
    3. IO多路复用的三种实现方式
    4. select函数接口
    5. select使用示例
    6. select缺点
    7. poll函数接口
    8. poll使用示例
    9. poll缺点
    10. epoll函数接口
    11. epoll使用示例
    12. epoll缺点
    13. epoll LT 与 ET模式的区别
    14. epoll应用
    15. select/poll/epoll之间的区别
    16. IO多路复用完整代码实现
    17. 高频面试题

    1、什么是IO多路复用

    • IO多路复用是一种同步IO模型,实现一个线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出cpu。多路是指网络连接,复用指的是同一个线程

    2、为什么有IO多路复用机制?

    没有IO多路复用机制时,有BIO、NIO两种实现方式,但有一些问题

    同步阻塞(BIO)
    服务端采用单线程,当accept一个请求后,在recv或send调用阻塞时,将无法accept其他请求(必须等上一个请求处recv或send完),无法处理并发

    // 伪代码描述
    while(1) {
      // accept阻塞
      client_fd = accept(listen_fd)
      fds.append(client_fd)
      for (fd in fds) {
        // recv阻塞(会影响上面的accept)
        if (recv(fd)) {
          // logic
        }
      }  
    }


    服务器端采用多线程,当accept一个请求后,开启线程进行recv,可以完成并发处理,但随着请求数增加需要增加系统线程,大量的线程占用很大的内存空间,并且线程切换会带来很大的开销,10000个线程真正发生读写事件的线程数不会超过20%,每次accept都开一个线程也是一种资源浪费

    // 伪代码描述
    while(1) {
      // accept阻塞
      client_fd = accept(listen_fd)
      // 开启线程read数据(fd增多导致线程数增多)
      new Thread func() {
        // recv阻塞(多线程不影响上面的accept)
        if (recv(fd)) {
          // logic
        }
      }  
    }

     同步非阻塞(NIO)
    服务器端当accept一个请求后,加入fds集合,每次轮询一遍fds集合recv(非阻塞)数据,没有数据则立即返回错误,每次轮询所有fd(包括没有发生读写事件的fd)会很浪费cpu

    setNonblocking(listen_fd)
    // 伪代码描述
    while(1) {
      // accept非阻塞(cpu一直忙轮询)
      client_fd = accept(listen_fd)
      if (client_fd != null) {
        // 有人连接
        fds.append(client_fd)
      } else {
        // 无人连接
      }  
      for (fd in fds) {
        // recv非阻塞
        setNonblocking(client_fd)
        // recv 为非阻塞命令
        if (len = recv(fd) && len > 0) {
          // 有读写数据
          // logic
        } else {
           无读写数据
        }
      }  
    }

    IO多路复用(现在的做法)
    服务器端采用单线程通过select/epoll等系统调用获取fd列表,遍历有事件的fd进行accept/recv/send,使其能支持更多的并发连接请求

    fds = [listen_fd]
    // 伪代码描述
    while(1) {
      // 通过内核获取有读写事件发生的fd,只要有一个则返回,无则阻塞
      // 整个过程只在调用select、poll、epoll这些调用的时候才会阻塞,accept/recv是不会阻塞
      for (fd in select(fds)) {
        if (fd == listen_fd) {
            client_fd = accept(listen_fd)
            fds.append(client_fd)
        } elseif (len = recv(fd) && len != -1) { 
          // logic
        }
      }  
    }


    3、IO多路复用的三种实现方式

    • select
    • poll
    • epoll

    4、select函数接口

    #include <sys/select.h>
    #include <sys/time.h>
     
    #define FD_SETSIZE 1024
    #define NFDBITS (8 * sizeof(unsigned long))
    #define __FDSET_LONGS (FD_SETSIZE/NFDBITS)
     
    // 数据结构 (bitmap)
    typedef struct {
        unsigned long fds_bits[__FDSET_LONGS];
    } fd_set;
     
    // API
    int select(
        int max_fd, 
        fd_set *readset, 
        fd_set *writeset, 
        fd_set *exceptset, 
        struct timeval *timeout
    )                              // 返回值就绪描述符的数目
     
    FD_ZERO(int fd, fd_set* fds)   // 清空集合
    FD_SET(int fd, fd_set* fds)    // 将给定的描述符加入集合
    FD_ISSET(int fd, fd_set* fds)  // 判断指定描述符是否在集合中 
    FD_CLR(int fd, fd_set* fds)    // 将给定的描述符从文件中删除  

    5、select使用示例

    int main() {
      /*
       * 这里进行一些初始化的设置,
       * 包括socket建立,地址的设置等,
       */
     
      fd_set read_fs, write_fs;
      struct timeval timeout;
      int max = 0;  // 用于记录最大的fd,在轮询中时刻更新即可
     
      // 初始化比特位
      FD_ZERO(&read_fs);
      FD_ZERO(&write_fs);
     
      int nfds = 0; // 记录就绪的事件,可以减少遍历的次数
      while (1) {
        // 阻塞获取
        // 每次需要把fd从用户态拷贝到内核态
        nfds = select(max + 1, &read_fd, &write_fd, NULL, &timeout);
        // 每次需要遍历所有fd,判断有无读写事件发生
        for (int i = 0; i <= max && nfds; ++i) {
          if (i == listenfd) {
             --nfds;
             // 这里处理accept事件
             FD_SET(i, &read_fd);//将客户端socket加入到集合中
          }
          if (FD_ISSET(i, &read_fd)) {
            --nfds;
            // 这里处理read事件
          }
          if (FD_ISSET(i, &write_fd)) {
             --nfds;
            // 这里处理write事件
          }
        }
      }

    6、select缺点

    • 单个进程所打开的FD是有限制的,通过FD_SETSIZE设置,默认1024
    • 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    • 对socket扫描时是线性扫描(对所有的fds遍历扫描),采用轮询的方法,效率较低(高并发时)

    7、poll函数接口

    poll与select相比,只是没有fd的限制,其它基本一样

    
    #include <poll.h>
    // 数据结构
    struct pollfd {
        int fd;                         // 需要监视的文件描述符
        short events;                   // 需要内核监视的事件
        short revents;                  // 实际发生的事件
    };
     
    // API
    int poll(struct pollfd fds[], nfds_t nfds, int timeout);
    

    8、poll使用示例

    #include <poll.h>
    // 数据结构
    struct pollfd {
        int fd;                         // 需要监视的文件描述符
        short events;                   // 需要内核监视的事件
        short revents;                  // 实际发生的事件
    };
     
    // API
    int poll(struct pollfd fds[], nfds_t nfds, int timeout);
    8、poll使用示例
    // 先宏定义长度
    #define MAX_POLLFD_LEN 4096  
     
    int main() {
      /*
       * 在这里进行一些初始化的操作,
       * 比如初始化数据和socket等。
       */
     
      int nfds = 0;
      pollfd fds[MAX_POLLFD_LEN];
      memset(fds, 0, sizeof(fds));
      fds[0].fd = listenfd;
      fds[0].events = POLLRDNORM;
      int max  = 0;  // 队列的实际长度,是一个随时更新的,也可以自定义其他的
      int timeout = 0;
     
      int current_size = max;
      while (1) {
        // 阻塞获取
        // 每次需要把fd从用户态拷贝到内核态
        nfds = poll(fds, max+1, timeout);
        if (fds[0].revents & POLLRDNORM) {
            // 这里处理accept事件
            connfd = accept(listenfd);
            //将新的描述符添加到读描述符集合中
        }
        // 每次需要遍历所有fd,判断有无读写事件发生
        for (int i = 1; i < max; ++i) {     
          if (fds[i].revents & POLLRDNORM) { 
             sockfd = fds[i].fd
             if ((n = read(sockfd, buf, MAXLINE)) <= 0) {
                // 这里处理read事件
                if (n == 0) {
                    close(sockfd);
                    fds[i].fd = -1;
                }
             } else {
                 // 这里处理write事件     
             }
             if (--nfds <= 0) {
                break;       
             }   
          }
        }
      }

    9、poll缺点

    • 每次调用poll,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
    • 对socket扫描时是线性扫描,采用轮询的方法,效率较低(高并发时)

    10、epoll函数接口

    #include <sys/epoll.h>
     
    // 数据结构
    // 每一个epoll对象都有一个独立的eventpoll结构体
    // 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件
    // epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可
    struct eventpoll {
        /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
        struct rb_root  rbr;
        /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
        struct list_head rdlist;
    };
     
    // API
     
    int epoll_create(int size); // 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中
    int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 负责把 socket 增加、删除到内核红黑树
    int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程

    11、epoll使用示例

    int main(int argc, char* argv[])
    {
       /*
       * 在这里进行一些初始化的操作,
       * 比如初始化数据和socket等。
       */
     
        // 内核中创建ep对象
        epfd=epoll_create(256);
        // 需要监听的socket放到ep中
        epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev);
     
        while(1) {
          // 阻塞获取
          nfds = epoll_wait(epfd,events,20,0);
          for(i=0;i<nfds;++i) {
              if(events[i].data.fd==listenfd) {
                  // 这里处理accept事件
                  connfd = accept(listenfd);
                  // 接收新连接写到内核对象中
                  epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev);
              } else if (events[i].events&EPOLLIN) {
                  // 这里处理read事件
                  read(sockfd, BUF, MAXLINE);
                  //读完后准备写
                  epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
              } else if(events[i].events&EPOLLOUT) {
                  // 这里处理write事件
                  write(sockfd, BUF, n);
                  //写完后准备读
                  epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);
              }
          }
        }
        return 0;
    }

    12、epoll缺点

    epoll只能工作在linux下

    13、epoll LT 与 ET模式的区别
    epoll有EPOLLLT和EPOLLET两种触发模式,LT是默认的模式,ET是“高速”模式。

    • LT模式下,只要这个fd还有数据可读,每次 epoll_wait都会返回它的事件,提醒用户程序去操作
    • ET模式下,它只会提示一次,直到下次再有数据流入之前都不会再提示了,无论fd中是否还有数据可读。所以在ET模式下,read一个fd的时候一定要把它的buffer读完,或者遇到EAGAIN错误

    14、epoll应用

    • redis
    • nginx

    15、select/poll/epoll之间的区别

    select    poll    epoll
    数据结构    bitmap    数组    红黑树
    最大连接数    1024    无上限    无上限
    fd拷贝    每次调用select拷贝    每次调用poll拷贝    fd首次调用epoll_ctl拷贝,每次调用epoll_wait不拷贝
    工作效率    轮询:O(n)    轮询:O(n)    回调:O(1)
    16、完整代码示例
    https://github.com/caijinlin/learning-pratice/tree/master/linux/io

    17、高频面试题

    • 什么是IO多路复用?
    • nginx/redis 所使用的IO模型是什么?
    • select、poll、epoll之间的区别
    • epoll 水平触发(LT)与 边缘触发(ET)的区别?

    原文链接:https://blog.csdn.net/caspar_notes/article/details/106991119

    展开全文
  • 目录 ...三、Tomcat之IO多路复用应用 1.模型流程 2.模型初始化源码分析 2.1 ProtocolHandler之Http11NioProtocol初始化 2.2 Endpoint和Accptor初始化 3.模型运行源码分析 3.1 Accptor接收器

    目录

    一、多路复用

    1.概念引入

    2.BIO处理并发情况

    3.多路复用IO并发情况

    4.两者的对比

    二、多路复用模型

    1.Selector和Poll模型

    2.epoll模型(仅限Linux系统)

    3.Reactor多线程模型

    三、Tomcat之IO多路复用应用

    1.模型流程

    2.模型初始化源码分析

    2.1 ProtocolHandler之Http11NioProtocol初始化

    2.2 Endpoint和Accptor初始化

    3.模型运行源码分析

    3.1 Accptor接收器

    3.2 Reactor容器之Endpoint

    3.3 处理IO多路复用的Poller和PollerEvent

    3.4 执行用户请求的线程对象SocketProcessorBase


    一、多路复用

    1.概念引入

    如果想知道Tomcat对于IO多路复用的应用,就应该先了解IO多路复用是个什么概念。假设系统现在需要处理两个互相独立的IO事件:A事件和B事件,并且这两个事件的触发时间是随机的,那么系统应该如何处理A和B呢?对于传统的BIO而言,要处理这两个事件那么只能创建两个线程来分别监听处理,但是如果事件的数量是动态变化的,那么使用常规BIO将会损耗过多的系统资源。而如果我们的入口线程只有一个,优先处理哪个事件将成为难题。IO多路复用便是为了处理这种场景而出现的。

    IO多路复用:IO就是指的我们网络IO,多路指多个TCP连接(或多个Channel),复用指复用一个或少量线程。串起来理解就是很多个网络IO复用一个或少量的线程来处理这些连接。

    2.BIO处理并发情况

    常规的BIO同时请求A和B事件的大致示意图:

    因为BIO是直接开始处理流的,当没有处理完时其它的线程请求时将会被阻塞,请求无法被程序接收,从而导致系统资源的浪费。使用BIO如果想要满足多个请求,只能创建更多的线程来处理,但如果对于服务器而言,任意时刻请求的数量大多是不可预估的,且线程之间的上下文切换开销也会是个问题,这就意味着要么浪费资源性能来空耗,要么就让服务器的并发量很低。

    3.多路复用IO并发情况

    使用多路复用IO同时请求A和B事件的大致示意图:

    和常规的BIO不同的是,多路复用使用了Selector复用器来对请求进来的事件进行统一的注册分发,随后再使用一个或者少量线程来对IO请求进行真正的处理,而不是让请求B一直阻塞在那里,直到A整个请求完成。

    4.两者的对比

    可能看到这个图只是知道BIO和多路复用IO的大致区别,对于多路复用IO和BIO都是需要阻塞的,我们就需要知道各自是在哪里阻塞的,阻塞的效果是什么。

    • BIO:阻塞的是操作系统资源,当占用时其它的线程无法使用,只能阻塞,因此多个线程进来时只能有一个请求被处理,直到这个请求被处理完才可以处理其它的请求;
    • 多路复用IO:阻塞的是用应用程序层面,不阻塞操作系统资源,因此多个请求进来时虽然被阻塞,但当请求就绪时,就会使用操作系统资源直接复制,复制后再后续处理其它就绪的请求。

    引用网上一篇文章的话:从应用进程的角度去理解始终是阻塞的,等待数据和将数据复制到用户进程这两个阶段都是阻塞的。这一点我们从应用程序是可以清楚的得知,比如我们调用一个以I/O复用为基础的NIO应用服务。调用端是一直阻塞等待返回结果的。

    从操作系统的角度等待Selector上面的网络事件就绪,是阻塞的,如果没有任何一个网络事件就绪则一直等待直到有一个或者多个网络事件就绪。但是从操作系统的角度考虑,有一点是不阻塞的,就是复制数据,因为操作系统不用等待,当有就绪条件满足的时候,它直接复制,其余时间在处理别的就绪的条件。这也是大家一直说的非阻塞I/O。实际上是就是指的这个地方的非阻塞。

    二、多路复用模型

    1.Selector和Poll模型

    学过JDK的NIO应该熟悉这种模型,这种模型一般适用于以下场景:

    1. 当客户处理多个描述字时(一般是交互式输入和网络套接口),必须使用I/O复用;
    2. 当一个客户同时处理多个套接口时,而这种情况是可能的,但很少出现;
    3. 如果一个TCP服务器既要处理监听套接口,又要处理已连接套接口,一般也要用到I/O复用;
    4. 如果一个服务器即要处理TCP,又要处理UDP,一般要使用I/O复用;
    5. 如果一个服务器要处理多个服务或多个协议,一般要使用I/O复用。

    与传统的BIO相比,这种模型的优势便在于系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。JDK对其实现的示意图如下:

    这种模式的缺陷:

    1. 单个进程能够监视的文件描述符的数量存在最大限制,通常是1024(Window和Linux默认的都是1024),当然可以更改数量,但由于select采用轮询的方式扫描文件描述符,文件描述符数量越多,性能越差;
    2. 内核 / 用户空间内存拷贝问题,select需要复制大量的句柄数据结构,产生巨大的开销;
    3. select返回的是含有整个句柄的数组,应用程序需要遍历整个数组才能发现哪些句柄发生了事件;
    4. select的触发方式是水平触发,应用程序如果没有完成对一个已经就绪的文件描述符进行IO操作,那么之后每次select调用还是会将这些文件描述符通知进程。

    Poll使用的是链表保存文件描述符,因此不存在第一个缺陷,但其它三个缺点依然存在。

    2.epoll模型(仅限Linux系统)

    epoll模型便是Linux用来解决前面提到select和poll两种模型缺陷的增强版,这种模型一般被用于高并发服务型程序,特别是在大量并发连接中只有少部分连接处于活跃下的情况。epoll采用的是事件驱动,不用像select和poll去遍历监听描述符集合中的所有文件描述符,而是遍历那些被操作系统IO事件异步唤醒后加入到就绪队列,并返回到用户空间的描述符集合。

    由于Springboot项目和Tomcat项目基本没用到epoll模型,因此只做简单的带过。

    3.Reactor多线程模型

    Reactor模型基于事件驱动,特别适合处理海量的I/O事件。Reactor模型中定义的三种角色:

    1. Acceptor:处理客户端新连接,并分派请求到处理器链中;
    2. Reactor:负责监听和分配事件,将I/O事件分派给对应的Handler。新的事件包含连接建立就绪、读就绪、写就绪等;
    3. Handler:将自身与事件绑定,执行非阻塞读/写任务,完成channel的读入,完成处理业务逻辑后,负责将结果写出channel。可用资源池来管理。

    经典的架构图如下:

    三、Tomcat之IO多路复用应用

    1.模型流程

    Tomcat9所使用的IO多路复用模型可以看成是Reactor、Select和Poll的组合型,此话怎讲?上面画了Select和Reactor的大致模型图,现在来看下Tomcat关于多路复用的模型图:

    这个模型图一看便可以知道select、poll这两个在Reactor模型中承担了Reactor角色的作用。在tomcat的这个模型中,select的流程有点稍微的区别,原来的select模型是在调用select方法时阻塞,然后处理接收数据就绪的通道。而在tomcat模型中其各自工作流程如下:

    1. 在Acceptor角色中先调用accept方法接收IO请求通道Channel;
    2. 把IO通道放入PollerEvent对象中;
    3. 随后Poller线程中将会遍历PollerEvent数组,将其中的IO通道注册到Selector中;
    4. 使用Selector那套流程再对已经进来的IO请求进行一次多路复用;
    5. 把和SelectionKey关联起来的IO通道传到线程池中的处理线程;
    6. 处理线程对不同的请求类型进行处理返回(如HTTP和WebSocket这种不同的请求类型)。

    2.模型初始化源码分析

    上面已经描述过大致的模型图和流程,接下来稍微结合Tomcat关于这方便的实现来大致说明Tomcat对于IO多路复用的实现。

    2.1 ProtocolHandler之Http11NioProtocol初始化

    如果要谈Tomcat的IO多路复用实现,那么模型架构的初始化就不得不说。在我们常用的Springboot中,代码中指定了默认的协议处理器。简易代码如下:

    public class TomcatServletWebServerFactory 
            extends AbstractServletWebServerFactory
            implements ConfigurableTomcatWebServerFactory, 
            ResourceLoaderAware {
        public static final String DEFAULT_PROTOCOL = 
                "org.apache.coyote.http11.Http11NioProtocol";
    }

    因此Http11NioProtocol实现类就是我们分析Tomcat对于IO多路复用模型使用的入口。需要注意的是除了Http11NioProtocol实现类,还有诸如Http11Nio2Protocol、Http11AprProtocol等实现类,但是一般而言我们使用的都是Http11NioProtocol,除非有特殊的需求。

    对于每个ProtocolHandler实现类而言,每个实现类都有一套对应的Endpoint、Acceptor等实现类,而对于Http11NioProtocol来说,其对应的实现类如下:

    • Endpoint为NioEndpoint类;
    • Acceptor就是Acceptor类;
    • executor为ThreadPoolExecutor类;
    • ThreadFactory为TaskThreadFactory类;
    • workQueue为TaskQueue类;
    • SocketProcessorBase为NioEndpoint内部类SocketProcessor。

    看过Tomcat框架源码的都会知道,Tomcat的关键组件都是有一套生命周期流程的,要看初始化直接从init()或者start()方法开始就行了。因此我们直接看到其抽象实现类AbstractProtocol的init()和start()方法,其关键源码如下:

    public abstract class AbstractProtocol<S> implements ProtocolHandler, MBeanRegistration {
        @Override
        public void init() throws Exception {
            if (oname == null) {
                // 创建之后的oname格式为:
                // domain:type=ProtocolHandler,port=port,address=hostAddress
                oname = createObjectName();
                if (oname != null) {
                    Registry.getRegistry(null, null).registerComponent(this, 
                            oname, null);
                }
            }
            if (this.domain != null) {
                rgOname = new ObjectName(domain +":type=GlobalRequestProcessor,
                        name=" + getName());
                Registry.getRegistry(null, null).registerComponent(
                        getHandler().getGlobal(), rgOname, null);
            }
            // 在Http11NioProtocol实例化时,就已经实例化了endpoint
            // 类型为NioEndpoint
            // 调用完getName()方法,返回的endpointName格式为:
            // SSL的类型    :https-openssl/jsse-nio[-hostAddress]-port
            // 普通HTTP类型 :http-nio[-hostAddress][-port/auto-N]
            String endpointName = getName();
            endpoint.setName(endpointName.substring(1, 
                    endpointName.length()-1));
            endpoint.setDomain(domain);
            // endpoint被初始化
            endpoint.init();
        }
        @Override
        public void start() throws Exception {
            // 调用start方法,并且启动监听
            endpoint.start();
            monitorFuture = getUtilityExecutor().scheduleWithFixedDelay(
                    new Runnable() {
                        @Override
                        public void run() {
                            if (!isPaused()) {
                                startAsyncTimeout();
                            }
                        }
                    }, 0, 60, TimeUnit.SECONDS);
        }
    }

    2.2 Endpoint和Accptor初始化

    Endpoint的操作是在刚刚的ProtocolHandler中被调用的,分别是init()和start()方法,因此初始化我们就看这两个方法就行了。部分源码如下:

    public abstract class AbstractEndpoint<S,U> {
        public final void init() throws Exception {
            // 主要初始化的便是初始化服务器socket并绑定地址
            if (bindOnInit) {
                // 绑定IP和端口信息,初始化serverSock服务端口对象以及selectorPool
                bindWithCleanup();
                bindState = BindState.BOUND_ON_INIT;
            }
            // 后面的方法可以忽略
        }
        public final void start() throws Exception {
            if (bindState == BindState.UNBOUND) {
                // 绑定IP和端口信息
                bindWithCleanup();
                bindState = BindState.BOUND_ON_START;
            }
            // 开始真正的start方法
            startInternal();
        }
        @Override
        public void startInternal() throws Exception {
            // 确保只会初始化一次
            if (!running) {
                running = true;
                paused = false;
                // processorCache用来缓存SocketProcessorBase类型的对象
                // 避免每次处理一个IO请求都重复创建新的对象,减少创建和销毁对象开销
                if (socketProperties.getProcessorCache() != 0) {
                    // SynchronizedStack.DEFAULT_SIZE默认为128
                    processorCache = new SynchronizedStack<>(
                            SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getProcessorCache());
                }
                // 每次当有新的请求进来时都会产生一个PollerEvent事件
                // 当有新请求时如果以前的PollerEvent事件有空闲的,则会被拿来重复
                // 使用,同样也是为了减少对象开销
                if (socketProperties.getEventCache() != 0) {
                    eventCache = new SynchronizedStack<>(
                            SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getEventCache());
                }
                // 同样的作用,为了减少对象开销
                if (socketProperties.getBufferPool() != 0) {
                    nioChannels = new SynchronizedStack<>(
                            SynchronizedStack.DEFAULT_SIZE,
                            socketProperties.getBufferPool());
                }
                // 如果处理线程的工作线程池为空则创建
                if (getExecutor() == null) {
                    createExecutor();
                }
                // 初始化connectionLimitLatch,用来控制最大并发连接数
                initializeConnectionLatch();
                // 开始实例化Poller,并且将其设定为保护线程,让其在后台一直运行
                poller = new Poller();
                Thread pollerThread = new Thread(poller, getName() + 
                        "-ClientPoller");
                pollerThread.setPriority(threadPriority);
                pollerThread.setDaemon(true);
                pollerThread.start();
                // 开始初始化Accptor对象
                startAcceptorThread();
            }
        }
        protected void startAcceptorThread() {
            // 同样的,实例化Acceptor对象,并设定为保护线程,让其一直在后台运行
            acceptor = new Acceptor<>(this);
            String threadName = getName() + "-Acceptor";
            acceptor.setThreadName(threadName);
            Thread t = new Thread(acceptor, threadName);
            t.setPriority(getAcceptorThreadPriority());
            t.setDaemon(getDaemon());
            t.start();
        }
    }

    就这么几段代码,将接下来要分析的Tomcat IO多路复用关键组件全都初始化了。

    3.模型运行源码分析

    模型已经初始化完成,接下来粗略的看下Tomcat的IO多路复用组件是如何搭配运行的。

    注:接下来的源码分析只会贴出一些对于流程特别重要的代码,对于和数据流向无关的代码忽略,并且只看正常流程,受制于篇幅异常流程暂不分析。

    3.1 Accptor接收器

    这个组件的作用便是用来接收新的IO请求,并将其封装成Channel,随后传给Reacto处理。由于Acceptor是一个线程实现类,因此运行时只需要从run()方法开始看起即可,其关键源码如下:

    public class Acceptor<U> implements Runnable {
        @Override
        public void run() {
            // 如果endpoint没有被关闭,则无线循环下去
            while (endpoint.isRunning()) {
                // 状态控制忽略
                ...
                try {
                    // 在这里控制最大的连接数,如果框架内的连接数已达maxConnections
                    // 则在这里暂停,等待有socket被关闭或者销毁
                    // 需要注意的是maxConnections默认值是10000,即意味着本服务器
                    // 同时最多只能有10000个套接字同时运行被Acceptor这套体系管理
                    // 如果并发量超过了10000则不会再创建新的套接字,直到有新的套接字
                    // 在这个流程中被关闭或者销毁
                    endpoint.countUpOrAwaitConnection();
                    // 如果暂停则不再接收新的套接字请求
                    if (endpoint.isPaused()) {
                        continue;
                    }
                    U socket = null;
                    try {
                        // 接收下一个从ServerSocketChannel对象中接收到的请求
                        socket = endpoint.serverSocketAccept();
                    } catch (Exception ioe) {
                        // 如果没有获取到socket则让阈值-1
                        endpoint.countDownConnection();
                        // 略
                        ...
                    }
                    ...
                    // 开始处理Socket套接字
                    if (endpoint.isRunning() && !endpoint.isPaused()) {
                        // 如果setSocketOptions()处理成功则会返回true,失败
                        // 则会关闭套接字
                        if (!endpoint.setSocketOptions(socket)) {
                            endpoint.closeSocket(socket);
                        }
                    } else {
                        // 如果没有运行或者暂停了则销毁套接字
                        endpoint.destroySocket(socket);
                    }
                } catch (Throwable t) {
                    ...
                }
            }
            state = AcceptorState.ENDED;
        }
    }

    Acceptor的作用实际上就是不断调用endpoint来获取Socket套接字,然后再调用endpoint中的方法来处理套接字,同时实现一个控制maxConnections参数的功能。满足Reactor所定义的用来处理客户端新连接,并将请求分发到处理器中。

    3.2 Reactor容器之Endpoint

    这个类的功能定位便是连接Poller和Acceptor,相当于是一个切入点或者管理类,毕竟Poller所执行的流程都在NioEndpoint中进行。其接收Socket套接字和将Socket注册到Poller的关键源码如下:

    public class NioEndpoint 
            extends AbstractJsseEndpoint<NioChannel,SocketChannel> {
        // NIO中负责接收客户端请求的服务类
        private volatile ServerSocketChannel serverSock = null;
        @Override
        protected SocketChannel serverSocketAccept() throws Exception {
            // 接收时这里面的方法会阻塞,直到有新的连接就绪并复制成功才会返回
            // 具体的细节有兴趣可以看下流程,但这里对于其它的类的流程不做过多分析
            return serverSock.accept();
        }
        @Override
        protected boolean setSocketOptions(SocketChannel socket) {
            // 开始执行连接
            try {
                // 将socket的阻塞状态关闭
                socket.configureBlocking(false);
                Socket sock = socket.socket();
                socketProperties.setProperties(sock);
                // 将Socket封装成NioChannel
                NioChannel channel = null;
                if (nioChannels != null) {
                    // 试着使用已经缓存的nioChannel通道
                    channel = nioChannels.pop();
                }
                // 如果没有缓存了,则新创建封装一个
                if (channel == null) {
                    SocketBufferHandler bufhandler = new SocketBufferHandler(
                            socketProperties.getAppReadBufSize(),
                            socketProperties.getAppWriteBufSize(),
                            socketProperties.getDirectBuffer());
                    if (isSSLEnabled()) {
                        channel = new SecureNioChannel(socket, bufhandler, 
                                selectorPool, this);
                    } else {
                        // 一般而言会创建这种类型
                        channel = new NioChannel(socket, bufhandler);
                    }
                } else {
                    // 如果使用缓存成功,则更新socket,且将对象中的动态数据重置
                    channel.setIOChannel(socket);
                    channel.reset();
                }
                // 进一步封装Channel对象,后续使用的对象类型都是这种类型的
                NioSocketWrapper socketWrapper = new NioSocketWrapper(channel, 
                        this);
                channel.setSocketWrapper(socketWrapper);
                socketWrapper.setReadTimeout(getConnectionTimeout());
                socketWrapper.setWriteTimeout(getConnectionTimeout());
                socketWrapper.setKeepAliveLeft(NioEndpoint.this
                        .getMaxKeepAliveRequests());
                socketWrapper.setSecure(isSSLEnabled());
                // 注册到Poller对象中,开始进行IO多路复用,随后的主角便是Poller
                poller.register(channel, socketWrapper);
                // 注册成功后返回true
                return true;
            } catch (Throwable t) {
                ...
            }
            return false;
        }
    }

    对于Endpoint而言,在IO多路复用中其实际上更像一个容器,里面包含了Poller和PollerEvent等对象,并且在Endpoint中实现整个Reactor的角色职能。

    3.3 处理IO多路复用的Poller和PollerEvent

    从上面的逻辑延续下来,我们暂时可以知道接下来应该需要看三个方法:一个是register,另外两个分别是Poller和PollerEvent的run()方法。先看到Poller的相关关键源码:

    public class Poller implements Runnable {
        public void register(final NioChannel socket, 
                final NioSocketWrapper socketWrapper) {
            // 这是OP_REGISTER变成的,对应着OP_REGISTER,标志进行read操作
            socketWrapper.interestOps(SelectionKey.OP_READ);
            PollerEvent r = null;
            // 试着从缓存中拿取到空闲对象
            if (eventCache != null) {
                r = eventCache.pop();
            }
            // 如果没有空闲对象则实例化一个,有的话则重置内部动态属性
            // 且将intOps设置成OP_REGISTER,后续会用到
            if (r == null) {
                r = new PollerEvent(socket, OP_REGISTER);
            } else {
                r.reset(socket, OP_REGISTER);
            }
            // 将获得的PollerEvent事件添加到events数组中
            addEvent(r);
        }
        private void addEvent(PollerEvent event) {
            // 添加数据到数组
            events.offer(event);
            // 设置信号量,说明已经有新的请求事件发生,这里后续会说明
            if (wakeupCounter.incrementAndGet() == 0) {
                selector.wakeup();
            }
        }
        @Override
        public void run() {
            // 一直轮询直到destroy()方法被调用
            while (true) {
                boolean hasEvents = false;
                try {
                    if (!close) {
                        // 没有关闭,先调用events方法,这个方法的作用便是异步唤醒
                        // 所有的PollerEvent
                        hasEvents = events();
                        // 设置wakeupCounter为-1,代表selector正在阻塞,等待
                        // 新的请求进来
                        if (wakeupCounter.getAndSet(-1) > 0) {
                            // wakeupCounter原来的值如果大于0,代表有请求事件
                            // 无需等待,直接立刻获取即可
                            keyCount = selector.selectNow();
                        } else {
                            // 如果进到这里面说明当前没有请求事件,需要阻塞等待
                            // 有新的请求事件发生
                            keyCount = selector.select(selectorTimeout);
                        }
                        // 获取完之后重新归0
                        wakeupCounter.set(0);
                    }
                    if (close) {
                        // 如果已经关闭,也要依次异步唤醒PollerEvent事件,在里面
                        // 对其进行相应的处理
                        events();
                        timeout(0, false);
                        try {
                            // 关闭slector复用器
                            selector.close();
                        } catch (IOException ioe) {
                        }
                        break;
                    }
                } catch (Throwable x) {
                    continue;
                }
                // 跑到这里keyCount为0说明selector复用器没有获取到请求
                // 不管是因为超时还是wakeup导致的,如果hasEvents为false都先
                // 唤醒一遍全部的PollerEvent事件
                if (keyCount == 0) {
                    hasEvents = (hasEvents | events());
                }
                // 如果有请求事件,则将selector的所有已就绪selectionKey遍历一遍
                Iterator<SelectionKey> iterator =
                    keyCount > 0 ? selector.selectedKeys().iterator() : null;
                while (iterator != null && iterator.hasNext()) {
                    // 获取SelectionKey和刚在外部封装的NioSocketWrapper对象
                    SelectionKey sk = iterator.next();
                    NioSocketWrapper socketWrapper = 
                            (NioSocketWrapper) sk.attachment();
                    if (socketWrapper == null) {
                        // 为空则删除
                        iterator.remove();
                    } else {
                        // 先删除,再调用processKey()方法,processKey方法如果调用
                        // 正常的话最终将会调用到processSocket()方法中
                        iterator.remove();
                        processKey(sk, socketWrapper);
                    }
                }
                // 处理超时情况,如果没有事件则将注册到selector的selectionKey取消
                // 取消后会调用socketWrapper的close方法、SelectionKey的cancel方法
                // 并将SelectionKey关联的Channel通道也关闭
                timeout(keyCount,hasEvents);
            }
            getStopLatch().countDown();
        }
        public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
            // processKey()方法是判断具体要使用什么方法来处理这次请求
            // 但如果都正常的话最终都会调用到这个方法中来,并且processKey()
            // 方法大多是当前流程的额外处理逻辑,对整个的流程来说无影响,不在
            // 本次的分析范围内,略过
            try {
                // socketWrapper不能为空
                if (socketWrapper == null) {
                    return false;
                }
                // SocketProcessorBase对象就算是Reactor模型中的请求线程了
                // 可以看成是HandlerThread,最终对请求的处理将会在这个线程中完成
                SocketProcessorBase<S> sc = null;
                if (processorCache != null) {
                    // 如果缓存中有可用对象则优先使用缓存对象
                    sc = processorCache.pop();
                }
                if (sc == null) {
                    // 没有缓存对象则创建新的SocketProcessorBase对象
                    sc = createSocketProcessor(socketWrapper, event);
                } else {
                    // 用的是缓存中的则重置动态数据
                    sc.reset(socketWrapper, event);
                }
                Executor executor = getExecutor();
                // 如果是Reactor多线程类型的,则使用多线程模型来处理请求线程
                // 如果是Reactor单线程的则使用单请求线程来处理
                if (dispatch && executor != null) {
                    // Reactor多线程
                    executor.execute(sc);
                } else {
                    // Reactor单线程
                    sc.run();
                }
            } catch (RejectedExecutionException ree) {
                ...
                return false;
            } 
            return true;
        }
    }

    对于Poller中的addEvent()方法那里为什么要设置wakeupCounter这个值来判断呢?实际上变量wakeupCounter的值相当于一个信号,初始值为0, 当wakeupCounter为0时代表selector没有调用select方法阻塞自己,当wakeupCounter为-1时代表selector正在执行select方法阻塞,等待有新的套接字事件发生,直到超时。因此这里的调用incrementAndGet方法判断是否为0则意味着有新的请求事件发生,需要唤醒selector,毕竟selector是在守护程中一直阻塞的。

    Poller中的方法有点多,但毕竟Reactor组件是负责分发派送请求的,因此处理的逻辑多也是没有办法的事。

    上面提到过PollerEvent,但方法并未贴出来,接下来看下PollerEvent的关键方法源码:

    public class Poller implements Runnable {
        public boolean events() {
            // 这个方法其实很简单,只是单纯的遍历一遍events的EventPoller事件对象
            // 对调用其run()方法,随后重置EventPoller属性,并加入到eventCache
            // 缓存中
            boolean result = false;
            PollerEvent pe = null;
            // 开始遍历,遍历完之后events对象将会为空
            for (int i = 0, size = events.size(); i < size 
                    && (pe = events.poll()) != null; i++ ) {
                result = true;
                try {
                    // 调用EventPoller的run()方法,但是直接调用线程对象的run()
                    // 方法确实少见。。
                    pe.run();
                    // 重置EventPoller数据
                    pe.reset();
                    if (running && !paused && eventCache != null) {
                        // 满足下次利用条件则放入到事件缓存对象中
                        eventCache.push(pe);
                    }
                } catch ( Throwable x ) {
                    ...
                }
            }
            return result;
        }
    }
    public static class PollerEvent implements Runnable {
        private NioChannel socket;
        private int interestOps;
        @Override
        public void run() {
            // 这个方法会在Poller调用events()方法后被调用
            if (interestOps == OP_REGISTER) {
                // 如果PollerEvent操作类型为OP_REGISTER,则说明这个PollerEvent
                // 事件是刚刚添加进来的尚未注册,这里将会把socket注册到selector
                // 中,以便后续selector调用select方法获取到
                try {
                    socket.getIOChannel().register(socket.getSocketWrapper()
                            .getPoller().getSelector(), SelectionKey.OP_READ, 
                                    socket.getSocketWrapper());
                } catch (Exception x) {
                    ...
                }
            } else {
                // 如果是其它的操作类型则先获取socket绑定的SelectionKey
                final SelectionKey key = socket.getIOChannel()
                        .keyFor(socket.getSocketWrapper().getPoller()
                                .getSelector());
                try {
                    if (key == null) {
                        try {
                            // 如果key为空,说明绑定失败,直接调用close关闭连接
                            socket.socketWrapper.close();
                        } catch (Exception ignore) {
                            ...
                        }
                    } else {
                        final NioSocketWrapper socketWrapper = 
                                (NioSocketWrapper) key.attachment();
                        if (socketWrapper != null) {
                            // 一切正常,则将socketWrapper和key都设置成
                            // interestOps的操作类型,如read或者write
                            int ops = key.interestOps() | interestOps;
                            socketWrapper.interestOps(ops);
                            key.interestOps(ops);
                        } else {
                            // 如果实际的socketWrapper对象为空,则取消当前key
                            socket.getSocketWrapper().getPoller()
                                .cancelledKey(key, socket.getSocketWrapper());
                        }
                    }
                } catch (CancelledKeyException ckx) {
                    try {
                        // 发生了异常则取消当前key
                        socket.getSocketWrapper().getPoller()
                                .cancelledKey(key, socket.getSocketWrapper());
                    } catch (Exception ignore) {}
                }
            }
        }
    }

    到这里关于Reactor便已经分析完成,想要了解Acceptor、Poller和PollerEvent这三个关键组件以及和NIO的Selector是如何搭配合作以达到高吞吐量的性能,光是看一遍文章或者笔记是远远不够的,必须还得知道NIO的运作机制以及亲身去分析一遍Tomcat的IO多路复用实现,这样才能够对Tomcat的实现有更清晰的认识。

    3.4 执行用户请求的线程对象SocketProcessorBase

    这一部分便是和Handler相关的处理线程了,在这个线程中将会使用Handler来处理具体的用户请求。实际上SocketProcessorBase实现了Runnable,如果Reactor组件没有配置线程池,那么请求只会有一个SocketProcessorBase线程来处理,即Reactor单线程模型;如果配置了线程池,才是Reactor多线程模型。接下来简略的看下其执行流程:

    public abstract class SocketProcessorBase<S> implements Runnable {
        protected SocketWrapperBase<S> socketWrapper;
        protected SocketEvent event;
        @Override
        public final void run() {
            synchronized (socketWrapper) {
                // socketWrapper可能会是read和write模式,添加synchronized锁
                // 是为了不让这两种模式同时进行
                if (socketWrapper.isClosed()) {
                    return;
                }
                doRun();
            }
        }
        protected abstract void doRun();
    }

    父类很简单,直接调用了子类的doRun()方法。因此直接看到子类doRun()方法的流程:

    protected class SocketProcessor extends SocketProcessorBase<NioChannel> {
        @Override
        protected void doRun() {
            // 获取实际的socket对象和关联的SelectionKey对象
            NioChannel socket = socketWrapper.getSocket();
            SelectionKey key = socket.getIOChannel()
                .keyFor(socket.getSocketWrapper().getPoller().getSelector());
            Poller poller = NioEndpoint.this.poller;
            if (poller == null) {
                socketWrapper.close();
                return;
            }
            try {
                int handshake = -1;
                // 这里会有一段代码,用来判断三次握手,不是本次的主流程,略过
                if (handshake == 0) {
                    SocketState state = SocketState.OPEN;
                    // 在线程中使用Handler对象来处理请求
                    if (event == null) {
                        state = getHandler().process(socketWrapper, 
                                SocketEvent.OPEN_READ);
                    } else {
                        state = getHandler().process(socketWrapper, event);
                    }
                } 
                // 握手失败的情况略过
                ...
            } // 异常情况也忽略
            ...
             finally {
                socketWrapper = null;
                event = null;
                // 如果Reactor模型正常运行,则将当前线程对象添加到processorCache
                // 缓存中,以便后续使用
                if (running && !paused && processorCache != null) {
                    processorCache.push(this);
                }
            }
        }
    }

    至此,源码层面的分析流程到此结束。

     

     

    展开全文
  • Java IO多路复用机制详解

    千次阅读 2020-04-03 18:03:42
    1、在Java中,常见的IO模型有4种, 同步阻塞IO(BlockingIO):即传统的IO模型。 同步非阻塞IO(Non-blockingIO):默认创建的socket都... IO多路复用(IOMultiplexing):经典的Reactor模式,也称为异步阻塞IO,...

    1、在Java中,常见的IO模型有4种,

    • 同步阻塞IO(Blocking IO)
    • 同步非阻塞IO(Non-blocking IO):默认创建的socket都是阻塞的,非阻塞IO要求socket被设置为NONBLOCK。注意这里所说的NIO并非Java的NIO(New IO)库。
    • IO多路复用(IO Multiplexing):也称为异步阻塞IO,Java中的Selector和Linux中的epoll都是这种模型。这里复用的是指复用一个或几个线程,用一个或一组线程处理多个IO操作,减少系统开销小,不必创建和维护过多的进程/线程;
    • 异步IO(Asynchronous IO):即经典的Proactor设计模式,也称为异步非阻塞IO。

    2、同步与阻塞概念

     1)同步和异步的概念描述的是用户线程与内核的交互方式

    • 同步是指用户线程发起IO请求后需要等待或者轮询内核IO操作完成后才能继续执行;
    • 异步是指用户线程发起IO请求后仍继续执行,当内核IO操作完成后会通知用户线程,或者调用用户线程注册的回调函数。

     2)阻塞和非阻塞的概念描述的是用户线程调用内核IO操作的方式

    • 阻塞是指IO操作需要彻底完成后才返回到用户空间;
    • 非阻塞是指IO操作被调用后立即返回给用户一个状态值,无需等到IO操作彻底完成。

    “真正”的异步IO需要操作系统更强的支持。在IO多路复用模型中,事件循环将文件句柄的状态事件通知给用户线程,由用户线程自行读取数据、处理数据。而在异步IO模型中,当用户线程收到通知时,数据已经被内核读取完毕,并放在了用户线程指定的缓冲区内,内核在IO完成后通知用户线程直接使用即可。(图片来自https://blog.csdn.net/sehanlingfeng/article/details/78920423

    相比于IO多路复用模型,异步IO并不十分常用,不少高性能并发服务程序使用IO多路复用模型+多线程任务处理的架构基本可以满足需求。Java的NIO是基于I/O复用来实现的。

    3、IO多路复用的底层原理

    IO多路复用使用两个系统调用(select/poll/epoll和recvfrom),blocking IO只调用了recvfrom;select/poll/epoll 核心是可以同时处理多个connection,而不是更快,所以连接数不高的话,性能不一定比多线程+阻塞IO好,多路复用模型中,每一个socket,设置为non-blocking,阻塞是被select这个函数block,而不是被socket阻塞的。

    1)select机制

    客户端操作服务器时就会产生这三种文件描述符(简称fd):writefds(写)、readfds(读)、和exceptfds(异常)。select会阻塞住监视3类文件描述符,等有数据、可读、可写、出异常 或超时、就会返回;返回后通过遍历fdset整个数组来找到就绪的描述符fd,然后进行对应的IO操作。

    优点:
      几乎在所有的平台上支持,跨平台支持性好
    缺点:
      由于是采用轮询方式全盘扫描,会随着文件描述符FD数量增多而性能下降。
      每次调用 select(),需要把 fd 集合从用户态拷贝到内核态,并进行遍历(消息传递都是从内核到用户空间)
      默认单个进程打开的FD有限制是1024个,可修改宏定义,但是效率仍然慢。

    2)poll机制

    基本原理与select一致,只是没有最大文件描述符限制,因为采用的是链表存储fd。

    3)epoll机制

    epoll之所以高性能是得益于它的三个函数
      1)epoll_create()系统启动时,在Linux内核里面申请一个B+树结构文件系统,返回epoll对象,也是一个fd
      2)epoll_ctl() 每新建一个连接,都通过该函数操作epoll对象,在这个对象里面修改添加删除对应的链接fd, 绑定一个callback函数。
      3)epoll_wait() 轮训所有的callback集合,并完成对应的IO操作

    优点:
      没fd这个限制,所支持的FD上限是操作系统的最大文件句柄数,1G内存大概支持10万个句柄
      效率提高,使用回调通知而不是轮询的方式,不会随着FD数目的增加效率下降
      内核和用户空间mmap同一块内存实现

    展开全文
  • IO多路复用—由Redis的IO多路复用yinch

    万次阅读 多人点赞 2018-04-23 16:56:40
    linux IO多路复用有epoll, poll, select,epoll性能比其他几者要好。 名词比较绕口,理解涵义就好。一个epoll场景:一个酒吧服务员(一个线程),前面趴了一群醉汉,突然一个吼一声“倒酒”(事件),你小跑过去...
  • 什么是IO多路复用,理解IO多路复用

    千次阅读 多人点赞 2020-04-03 12:18:00
    三、I/O多路复用 好了,我们讲了这么多,再来总结一下,到底什么是I/O多路复用。 先讲一下I/O模型: 首先,输入操作一般包含两个步骤: 等待数据准备好(waiting for data to be ready)。对于一个套接口上的操作,...
  • 一般在使用IO多路复用模型时,socket都是设置为NONBLOCK的,不过这并不会产生影响,因为用户发起IO请求时,数据已经到达了,用户线程一定不会被阻塞。 用户线程使用IO多路复用模型的伪代码描述为: void ...
  • 【面试】彻底理解 IO多路复用

    千次阅读 2021-05-01 00:11:34
    目录1、什么是IO多路复用?2、为什么出现IO多路复用机制?3、IO多路复用的三种实现方式4、select函数接口5、select使用示例6、select缺点7、poll函数接口8、pol...
  • 本示例演示了使用select函数处理多路IO复用。本示例在ubuntu + Qt下编译通过的,仅提供学习使用。
  • 什么是IO多路复用

    2020-02-14 00:03:03
    服务器端编程经常需要构造高性能的IO模型,常见的IO模型有四种: (1)同步阻塞IO(Blocking IO):即传统的IO模型。 (2)同步非阻塞IO(Non-blocking IO):默认...(3)IO多路复用(IO Multiplexing):即经...
  • 总结 IO多路复用适合用于:处理过程简单,进/线程池适合处理过程复杂 的情况 IO多路复用+单进(线)程比较省资源适合处理大量的闲置的IO IO多路复用+多单进(线)程与线程池方案相比有好处,但是并不会 有太大的...
  • 高并发服务器中的多线程多进程缺陷问题: 多进程: 进程数量有限 代价太高(销毁,上下文切换) 受限于CPU 内存隔离 进程间通信代价高 ...以上都是不可避免的,所以引出了IO多路复用(IO多路转接
  • 目录 一、基础概念 一、阻塞和非阻塞 ...IO多路复用和NIO的区别 多路复用IO的特点 五、AIO( Asynchronous I/O)异步非阻塞I/O模型 一、基础概念 一、阻塞和非阻塞 当线程访问资源时,...
  • 简单介绍IO多路复用

    2020-10-07 13:44:09
    IO多路复用 (简单介绍) 那么,什么是IO呢? 首先如果要介绍到关于IO,肯定要介绍操作系统内核↓ 关于内核(kernel)的介绍:   按下电源键后,bios第一个读到内存的程序就是内核(kernel)。在内存中属于kernel的...
  • 公众号后台回复“面试”,获取精品学习资料扫描下方二维码了解详情,试听课程本文来源:蔡蔡技术记阅读本文大概需要 5 分钟。看完下面这些,高频面试题你都会答了吧目录1、什么是IO多路复用?2...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 61,141
精华内容 24,456
关键字:

io多路复用