精华内容
下载资源
问答
  • Proactor

    2011-07-10 21:31:23
    This article is extracted from POSA2 as my reading note. The Proactor architectural pattern allows event-driven applications to efficiently
    This article is extracted from POSA2 as my reading note. 

    The Proactor architectural pattern allows event-driven applications to efficiently demultiplex and dispatch service requests triggered by the completion of asynchronous operations, to achieve the performance benefits of concurrency without incurring certain of its liabilities.

    The Proactor pattern can be considered an asynchronous variant of the synchronous Reactor pattern. The Reactor pattern is responsible for demultiplexing and dispatching multiple event handlers that are triggered when it is possible to invoke an operation synchronous without blocking. In contrast, the Proactor pattern supports the demultiplexing and dispatching of multiple completion handlers that are triggered by the completion of operations that execute asynchronously.

    The Proactor pattern includes nine participants:

    Handles: Handles are provided by operating systems to identify entities, such as network connections or open files, that can generate completion events. Completion events are generated either in response to external service requests, such as connection or data requests arriving from remote applications, or in response to operations an application generates internally, such as time-outs or asynchronous I/O system calls.

    Asynchronous operations: Asynchronous operations represent potentially long-duration operations that are used in the implementation of services, such as reading and writing data asynchronously via a socket handle. After an asynchronous operation  is invoked, it executes without blocking its caller's thread of control. Thus, the caller can perform other operations. If an operation must wait for application, its execution will be deferred until the event arrives.

    Completion handler: A completion handler specifies an interface that consists of one or more hook methods. These methods represent the set of operations available for processing information returned in the application-specific completion events that are generated when asynchronous operations finish executing.

    Concrete completion handlers: Concrete completion handlers specialize the completion handler to define a particular application service by implementing the inherited hook methods. These hook methods process the results contained in the completion events they receive when the asynchronous operations associated with the completion handler finish executing. A concrete completion handler is associated with a handle that it can use to invoke asynchronous operations itself.

    Asynchronous operation processor: Asynchronous operations are invoked on a particular handle and run to completion by an aynchronous operation processor, which is often implemented by an operating system kernel.

    Completion event queue:
    When an asynchronous operation finishs executing the asynchronous operation processor generates the corresponding completion event. It inserts this event into the completion event queue associated with the handle upon which the operation was invoked. This queue buffers completion events while they wait to be demultiplexed to their associated completion handler.

    Asynchronous event demultiplexer: An asynchronous event demultiplexer is a function that waits for completion events to be inserted into a completion event queue when an asynchronous operation has finished executing. The asynchronous event demultiplexer function then removes one or more completion event results from the queue and returns to its caller.

    Proactor: A proactor provides an event loop for an application process or thread. In this event loop, a proactor calls an asynchronous event demultiplexer to wait for completion events to occur. When an event arrives the asynchronous event demultiplexer returns. The proactor then demultiplexes the event to its associated completion handler and dispatches the appropriate hook method on the handler to process the results of the completion event.

    Initiator: An initiator is an entity local to an application that invokes asynchronous operations on an asynchronous operation processor. The initiator often processes the results of the asynchronous operations it invokes, in which case it also plays the role of a concrete completion handler.


    展开全文
  • 模拟Proactor

    2017-02-07 10:33:10
    前面在【Reactor and Proactor】中讨论了那两种模式,因为Proactor模式需要操作系统级别的支持。所以这里看看融合Reactor和Proactor的解决方案:[color=blue]使用Reactor模拟Proactor[/color]。 使用Reactor模拟...
    前面在【Reactor and Proactor】中讨论了那两种模式,因为Proactor模式需要操作系统级别的支持。所以这里看看融合Reactor和Proactor的解决方案:[color=blue]使用Reactor模拟Proactor[/color]。

    使用Reactor模拟Proactor的思路其实很简单,就是让Reactor模式中的事件分享者在得知事件发生的时候,让事件分享者来读写数据,读写完成后把数据放到事件处理者提供的缓冲区里面。再通知事件处理者:你所关心的事件已经完成[color=red](注意这里不是发生而是完成)[/color]。然后事件处理者就直接在自己所提供的缓冲区里面获取数据。事件处理者需要提供:用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,请求完后的回调函数。
    展开全文
  • Reactor与Proactor

    2020-09-20 16:58:58
    reactor,proactor

    https://blog.csdn.net/wanbf123/article/details/78062802
    https://juejin.im/post/6844903636422623240
    https://segmentfault.com/a/1190000002715832
    https://www.zhihu.com/question/26943938/answer/68773398

    0. 准备知识

    一般地,IO多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)以及事件处理器(Event Handler):

    • 多路分离器:分离器用于将来自事件源的I/O事件分离出来,并分发到对应的事件处理器中;
    • 事件处理器:包括read/write事件处理器,也就是负责处理IO的线程或者是回调函数(在异步中)。

    1. Reactor模式

    • 事件分离器负责等待文件描述符或socket为读写操作准备就绪,然后将就绪事件传递给对应的处理器,最后由处理器负责完成实际的读写工作,并处理对应的逻辑。
    • 事件分离器负责监听socket,并分发就绪的socket给对应的处理器;
    • 处理器负责读写以及逻辑处理。

    1.Reactor模式结构
    在这里插入图片描述
    可以看到,Reactor模式包含以下角色:

    • Reactor:反应器,充当事件分离器的角色,实现以下功能:
      1)注册和删除关注的事件句柄;
      2)运行事件循环;
      3)有就绪事件到来时,分发事件到之前注册的事件处理器上处理。

    • Synchronous Event Demultiplexer:同步事件多路分离器:由操作系统内核实现的一个函数(select/epoll),用于阻塞等待发生在句柄集合上的一个或多个事件;

    • Handler:关注的事件句柄

    • Event Handler:事件处理接口

    • Concrete Event HandlerA:实现应用程序所提供的特定事件处理逻辑(继承自Event Handler),例如read。

    2.处理逻辑
    在这里插入图片描述

    • 将关注的事件handle注册到Reactor中
    • 调用Reactor,进入无限事件循环,等待注册的事件到来
    • 事件到来,select返回,Reactor将事件(连接/读/写)分发到之前注册的回调函数中处理

    3.1 单Reactor单线程模型

    1. Reactor 负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor线程进行处理,有IO读写事件之后交给hanlder 处理(也就是说,Reactor负责监听事件的到来,再将事件分发到对应的Acceptor或者Handler中);
    2. Acceptor负责处理connect 事件;
    3. Handler负责处理IO读写事件和业务逻辑,其实是Reactor线程的一部分;
      在这里插入图片描述

    3.2 单Reactor多线程模型

    1. Reactor 负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor线程进行处理,有IO读写事件之后交给hanlder 处理(也就是说,Reactor负责监听事件的到来,再将事件分发到对应的Acceptor或者Handler中);
    2. Acceptor负责处理connect 事件;
    3. 相对于单Reactor单线程而言,这里的Handler采用线程池方式,来负责处理IO读写 + 业务逻辑,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。
      在这里插入图片描述

    3. 多Reactor多线程模型
    在这里插入图片描述

    3. Proactor模式

    • 事件分离器:负责发起异步读写操作,并将读写完成事件传递给对应处理器;
    • IO操作本身由操作系统来完成。传递给操作系统的参数需要包括用户定义的数据缓冲区地址和数据大小,操作系统才能从中得到写出操作所需数据,或写入从socket读到的数据;
    • 事件处理器不用再负责IO操作,而是专注于事件的逻辑处理。

    1. Proactor模式结构
    在这里插入图片描述
    Proactor模式包含如下角色:

    • Async Operation Processor:异步操作处理器;负责执行异步操作,一般由操作系统内核实现;
    • Completion Handler:完成事件接口;一般是由回调函数组成的接口;
    • Completion Handler A(B):完成事件处理逻辑;实现接口定义特定的应用处理逻辑。
    • Completion Event Queue:完成事件队列;异步操作完成的结果放到队列中等待后续使用;
    • Proactor主动器:
      1)为应用程序进程提供事件循环;
      2)从完成事件队列中取出异步操作的结果;
      3)分发调用相应的后续处理逻辑。

    2. Proactor处理逻辑

    • 发起I/O异步操作,注册I/O完成事件处理器;
    • Proactor主动器(事件分离器),启动事件循环,等待I/O操作完成事件;
    • 内核并行执行实际的I/O操作,并将结果数据存入用户自定义缓冲区;
    • 内核完成I/O操作,通知事件分离器,事件分离器调度对应的事件处理器;
    • 事件处理器处理用户自定义缓冲区中的数据。

    一个疑问:Proactor主动器是不是也负责注册关心的socket?《Linux高性能服务器编程》中的例子:

    在这里插入图片描述

    展开全文
  • 异步I/O模型则用于实现Proactor模式 最后我们会使用同步I/O方式模拟出Proactor模式 一、Reactor模式 Reactor模式特点 它要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将时间...

    前言

    • 同步I/O模型通常用于实现Reactor模式
    • 异步I/O模型则用于实现Proactor模式
    • 最后我们会使用同步I/O方式模拟出Proactor模式

    一、Reactor模式

    • Reactor 释义“反应堆”,是一种事件驱动机制
    • Reactor的回调函数:和普通函数调用的不同之处在于,应用程序不是主动的调用某个 API 完成处理,而是恰恰 相反,Reactor 逆置了事件处理流程,应用程序需要提供相应的接口并注册到 Reactor 上, 如果相应的时间发生,Reactor 将主动调用应用程序注册的接口,这些接口又称为“回调函数”

    • Reactor 模式是处理并发I/O比较常见的一种模式,用于同步 I/O,中心思想是将所有要处理的I/O 事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上; 一旦有 I/O 事件到来或是准备就绪(文件描述符或 socket 可读、写),多路复用器返回并将事先注册的相应 I/O 事件分发到对应的处理器中。
    • Reactor 模型有三个重要的组件:
      • 多路复用器:由操作系统提供,在 linux 上一般是 select, poll, epoll 等系统调用。
      • 事件分发器:将多路复用器中返回的就绪事件分到对应的处理函数中
      • 事件处理器:负责处理特定事件的处理函数
    • 具体流程如下:
      • 注册读就绪事件和相应的事件处理器
      • 事件分离器等待事件
      • 事件到来,激活分离器,分离器调用事件对应的处理器
      • 事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制 权

    多线程Reactor模式

    • 多线程Reactor模式特点:
      • 它要求主线程(I/O处理单元)只负责监听文件描述符上是否有事件发生,有的话就立即将时间通知工作线程(逻辑单元)。除此之外,主线程不做任何其他实质性的工作
      • 读写数据,接受新的连接,以及处理客户请求均在工作线程中完成
    • 工作流程:
      • ①主线程往epoll内核事件表中注册socket上有数据可读
      • ②主线程调用epoll_wait等待socket上有数据可读
      • ③当socket上有数据可读时,epoll_wait通知主线程。主线程则将socket可读事件放入请求队列
      • ④睡眠在请求请求队列上的某个工作线程被唤醒,它从socket读取数据,并处理客户请求,然后往epoll内核事件表中注册该socket上的写就绪时间
      • ⑤主线程调用epoll_wait等到socket可写
      • ⑥当socket可写时,epoll_wait通知主线程。主线程将socket可写事件放入请求队列
      • ⑦睡眠在请求队列上的某个工作线程被唤醒,它向socket上写入服务器处理客户请求的结果

    单线程Reactor模式

    • 单线程Reactor模式与多线程Reactor模式原理相同。但是工作都是在同一个线程中完成的
    • 单线程优缺点:
      • 优点:Reactor模型开发效率上比起直接使用IO复用要高,它通常是单线程的,设计目标是希望单线程使用一颗 CPU 的全部资源。优点为每个事件处理中很多时候可以 不考虑共享资源的互斥访问
      • 缺点:可是缺点也是明显的,现在的硬件发展,已经不再遵循摩尔定 律,CPU 的频率受制于材料的限制不再有大的提升,而改为是从核数的增加上提升能力
    • 单线程Reactor使用多核:
      • 如果程序业务很简单,例如只是简单的访问一些提供了并发访问的服务,就可以直接开启多个反应堆(Reactor),每个反应堆对应一颗CPU核心
      • 这些反应堆上跑的请求互不相关,这是完全可以利用多核的。例如Nginx这样的http静态服务器
    • 下面是单线程Reactor模式的实现代码,下载下来之后可以直接编译运行:
    // reactor.c
    // 源码链接: https://github.com/dongyusheng/csdn-code/blob/master/server-client/reactor.c
    // gcc -o reactor reactor.c
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <arpa/inet.h>
    #include <netinet/in.h>
    #include <sys/socket.h>
    #include <sys/types.h>
    #include <sys/epoll.h>
    #include <errno.h>
    #include <time.h>
    #include <libgen.h>
    #include <fcntl.h>
    
    #define MAX_EPOLL_EVENTS    1024
    #define MAX_BUFFER_SIZE     4096
    
    typedef int NCALLBACK(int, int, void*);
    
    // 事件结构体, 每个套接字都会被封装为一个事件
    struct  ntyevent {
        int fd;           // 事件对应的fd
        int events;       // 事件类型(  本代码中我们只处理EPOLL_IN和EPOLL_OUT)
    
        void *arg;        // 事件回调函数的参数3, 实际传入的是一个struct ntyreactor结构体指针
        int (*callback)(int fd, int events, void *arg); //事件回调函数
    
        int status;       // 当前事件是否位于epoll集合中: 1表示在, 0表示不在
    
        char buffer[MAX_BUFFER_SIZE]; // 读写缓冲区
        int length;       //缓冲区数据的长度
        
        long last_active; // 最后一次活跃的时间
    };
    
    
    // Reactor主体
    struct ntyreactor {
        int epoll_fd;             // epoll套接字
        struct ntyevent *events; // reactor当前处理的事件集
    };
    
    // 创建一个Tcp Server
    int init_server(char *ip, short port);
    // 向reactor中添加一个服务器监听事件
    int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback);
    
    
    /***下面这3个函数是用来对reactor操作的***/
    // 初始化reactor
    struct ntyreactor *ntyreactor_init();
    // 销毁reactor
    int ntyreactor_destroy(struct ntyreactor *reactor);
    // reactor运行函数
    int ntyreactor_run(struct ntyreactor *reactor);
    
    
    
    /***下面这3个函数是用来对ntyevent事件结构操作的***/
    // 将一个fd封装到事件结构中
    int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg);
    // 将一个事件添加/更新到epoll的事件表中
    int nty_event_add(int epoll_fd, struct ntyevent* ev);
    // 将一个事件移出epoll事件表
    int nty_event_del(int epoll_fd, struct ntyevent* event);
    
    
    /***下面这3个函数是ntyevent事件可以使用的回调函数***/
    int accept_callback(int fd, int events, void *arg);
    int recv_callback(int fd, int events, void *arg);
    int send_callback(int fd, int events, void *arg);
    
    
    
    int main(int argc, char *argv[])
    {
        if(argc != 3)
        {
            printf("usage: ./%s [ip] [port]\n", basename(argv[0]));
            exit(EXIT_FAILURE);
        }
    
        char *ip = argv[1];
        short port = atoi(argv[2]);
        
        int sock_fd;
    
        // 1.初始化一个Tcp Server
        sock_fd = init_server(ip, port);
    
        // 2.初始化reactor
        struct ntyreactor *reactor = ntyreactor_init();
        if( reactor == NULL)
        {
            printf("Error in %s(), ntyreactor_init: create reactor error\n", __func__);
            exit(EXIT_FAILURE);
        }
    
        // 3.将Tcp Server添加到reactor事件集中
        ntyreactor_addlistener(reactor, sock_fd, accept_callback);
    
        // 4.运行reactor
        ntyreactor_run(reactor);
    
        // 5.销毁
        ntyreactor_destroy(reactor);
    
        close(sock_fd);
        
        return 0;
    }
    
    int init_server(char *ip, short port)
    {
        // 1.创建套接字
        int sock_fd = socket(AF_INET, SOCK_STREAM, 0);
        if(sock_fd == -1)
        {
            printf("Error in %s(), socket: %s\n", __func__, strerror(errno));
            return -1;
        }
    
        // 2.初始化服务器地址
        struct sockaddr_in server_addr;
        memset(&server_addr, 0, sizeof(server_addr));
        server_addr.sin_family = AF_INET;
        if(inet_pton(AF_INET, ip, (void*)&server_addr.sin_addr.s_addr) == -1)
        {
            printf("Error in %s(), inet_pton: %s\n", __func__, strerror(errno));
            return -1;
        }
        server_addr.sin_port = htons(port);
    
        // 3.绑定服务器地址
        if(bind(sock_fd, (const struct sockaddr*)&server_addr, sizeof(server_addr)) == -1)
        {
            printf("Error in %s(), bind: %s\n", __func__, strerror(errno));
            return -1;
        }
    
        // 3.监听
        if(listen(sock_fd, 20) == -1)
        {
            printf("Error in %s(), listen: %s\n", __func__, strerror(errno));
            return -1;
        }
    
        printf("Listen start [%s:%d]...\n", inet_ntoa(server_addr.sin_addr), ntohs(server_addr.sin_port));
        
        return sock_fd;
    }
    
    struct ntyreactor *ntyreactor_init()
    {
        // 1.创建一个reactor
        struct ntyreactor *reactor = (struct ntyreactor*)malloc(sizeof(struct ntyreactor));
        if(reactor == NULL)
            return NULL;
        memset(reactor, 0, sizeof(struct ntyreactor));
    
        // 2.创建reacotr的epoll_fd
        reactor->epoll_fd = epoll_create(1);
        if(reactor->epoll_fd == -1)
        {
            printf("Error in %s(), epoll_create: %s\n", __func__, strerror(errno));
            free(reactor);
            return NULL;
        }
    
        // 3.创建reactor的事件集
        reactor->events = (struct ntyevent*)malloc(sizeof(struct ntyevent) * MAX_EPOLL_EVENTS);
        if(reactor->events == NULL)
        {
            printf("Error in %s(), malloc: %s\n", __func__, strerror(errno));
            close(reactor->epoll_fd);
            free(reactor);
            return NULL;
        }
    
        return reactor;
    }
    
    int ntyreactor_destroy(struct ntyreactor *reactor)
    {
        if(reactor == NULL)
        {
            printf("Error in %s(): %s\n", __func__, "reactor arg is NULL");
            return -1;
        }
    
        // 关闭epoll_fd、销毁事件集、释放结构
        close(reactor->epoll_fd);
        free(reactor->events);
    
        free(reactor);
        
        return 0;
    }
    
    int ntyreactor_run(struct ntyreactor *reactor)
    {
        // 1.判断参数
        if(reactor == NULL || reactor->epoll_fd < 0 || reactor->events == NULL)
        {
            printf("Error in %s(): %s\n", __func__, "reactor arg is error");
            return -1;
        }
    
    
        struct epoll_event ep_events[MAX_EPOLL_EVENTS + 1];
    
        // 2.进行epoll_wait()
        int nready;
        while(1)
        {
            // 超时检测
            /*
            int checkpos = 0, i;
            long now = time(NULL);
    		for (i = 0; i < MAX_EPOLL_EVENTS; i++, checkpos ++) {
    			if (checkpos == MAX_EPOLL_EVENTS) {
    				checkpos = 0;
    			}
                // 如果当前索引处的事件status为0, 则不检测, 进行下一个
    			if (reactor->events[checkpos].status != 1) {
    				continue;
    			}
    
                // 如果超过60秒, 那么就认定为超时, 超时后关闭移除
    			long duration = now - reactor->events[checkpos].last_active;
    			if (duration >= 60) {
    				close(reactor->events[checkpos].fd);
    				printf("[fd=%d] timeout\n", reactor->events[checkpos].fd);
    				nty_event_del(reactor->epfd, &reactor->events[checkpos]);
    			}
    		}*/
            
            nready = epoll_wait(reactor->epoll_fd, ep_events, MAX_EPOLL_EVENTS, 1000);
            // 3.函数出错
            if(nready == -1)
            {
                // 如果函数在阻塞过程中接收到信号, 那么继续进行epoll_wait()
                if(errno == EAGAIN || errno == EWOULDBLOCK)
                    continue;
                printf("Error in %s(), epoll_wait: %s\n", __func__, strerror(errno));
                return -1;
            }
            // 4.函数超时
            else if(nready == 0)
                continue;
            // 5.有事件准备好
            else
            {
                // 遍历处理已就绪的事件
                int i;
                for(i = 0; i < nready; ++i)
                {
                    // 获取事件结构体, 保存在struct epoll_event结构的data.ptr中
                    struct ntyevent* ev = (struct ntyevent*)ep_events[i].data.ptr;
    
                    // 如果事件可读
                    if((ep_events[i].events & EPOLLIN) && (ev->events & EPOLLIN))
                        ev->callback(ev->fd, ev->events, ev->arg);
    
                    // 如果事件可写
                    if((ep_events[i].events & EPOLLOUT) && (ev->events & EPOLLOUT))
                        ev->callback(ev->fd, ev->events, ev->arg);
                }
            }
        }
    
        return 0;
    }
    
    int ntyreactor_addlistener(struct ntyreactor *reactor, int fd, NCALLBACK callback)
    {
        if(reactor == NULL || fd <0 || callback == NULL)
        {
            printf("Error in %s(): %s\n", __func__, "arg error");
            return -1;
        }
    
        // 初始化ntyevent事件结构, 然后添加到reactor的epoll事件表中即可
        nty_event_set(&reactor->events[fd], fd, EPOLLIN, 0, 0, callback, reactor);
        nty_event_add(reactor->epoll_fd, &reactor->events[fd]);
    
        return 0;
    }
    
    int nty_event_set(struct ntyevent *ev, int fd, int event, int length, int status, NCALLBACK callback, void *arg)
    {
        if(ev == NULL || fd <0 || event <0 || length < 0 || callback == NULL || arg == NULL || status < 0)
        {
            printf("Error in %s(): %s\n", __func__, "arg error");
            return -1;
        }
    
        // 初始化ntyevent结构的相关内容即可
        ev->fd = fd;
        ev->events = event;
        ev->arg = arg;
        ev->callback = callback;
        ev->status = status;
        ev->length = length;
        ev->last_active = time(NULL);
    
        return 0;
    }
    
    int nty_event_add(int epoll_fd, struct ntyevent* ev)
    {
        if(epoll_fd <0 || ev == NULL)
        {
            printf("Error in %s(): %s\n", __func__, "arg error");
            return -1;
        }
        
        // 1.创建一个epoll事件结构
        struct epoll_event ep_event;
        memset(&ep_event, 0, sizeof(ep_event));
        ep_event.events = ev->events;
        ep_event.data.ptr = ev;
        //ep_event.data.fd = ev->fd; data成员是一个联合体, 不能同时使用fd和ptr成员
    
        // 2.如果当前ev已经在epoll事件表中, 那么就修改; 否则就把ev加入到epoll事件表中
        int op;
        if(ev->status == 0)
        {
            op = EPOLL_CTL_ADD;
            ev->status = 1;
        } 
        else
            op = EPOLL_CTL_MOD;
    
        // 3.添加/更新 
        if(epoll_ctl(epoll_fd, op, ev->fd, &ep_event) == -1)
        {
            printf("Error in %s(), epoll_ctl: %s\n", __func__, strerror(errno));
            return -1;
        }
    
        return 0;
    }
    
    int nty_event_del(int epoll_fd, struct ntyevent* ev)
    {
        if(epoll_fd < 0 || ev == NULL || ev->status != 1)
        {
            printf("Error in %s(): %s\n", __func__, "ev arg is error");
            return -1;
        }
    
        // 初始要删除的epoll事件结构
        struct epoll_event ep_event;
        memset(&ep_event, 0, sizeof(ep_event));
        ep_event.data.ptr = ev;
        //ep_event.data.fd = ev->fd; data成员是一个枚举, 不能同时使用ptr和fd成员
        ev->status = 0;
    
        // 从epoll事件表中删除epoll事件
        if(epoll_ctl(epoll_fd, EPOLL_CTL_DEL, ev->fd, &ep_event) == -1)
        {
            printf("Error in %s(), epoll_ctl: %s\n", __func__, strerror(errno));
            return -1;
        }
        
        return 0;
    }
    
    int accept_callback(int fd, int events, void *arg)
    {
        // 1.获得reactor结构
        struct ntyreactor *reactor = (struct ntyreactor*)arg;
        // 2.获取该fd对应的事件结构
        struct ntyevent *ev = reactor->events + fd;
    
        // 3.初始化客户端地址结构
        struct sockaddr_in cli_addr;
        memset(&cli_addr, 0 , sizeof(cli_addr));
        socklen_t len = sizeof(cli_addr);
    
        // 4.接收客户端
        int cli_fd;
        cli_fd = accept(ev->fd, (struct sockaddr*)&cli_addr, &len);
        if(cli_fd == -1)
        {
            printf("Error in %s(), accept: %s\n", __func__, strerror(errno));
            return -1;
        }
    
        int i;
        do {
            // 5.在reactor事件表中找到第一个空位置, 用i表示新事件存放的位置, 也是其套接字的值
            // reactor->events的0、1、2、3、4都被占用了, 客户端第一个可以使用的套接字为5, 因此此处从5开始遍历
            for(i = 5; i< MAX_EPOLL_EVENTS; ++i)
            {
                if(reactor->events[i].status == 0)
                    break;
            }
    
            // 6.如果满了, 就退出
            if(i == MAX_EPOLL_EVENTS)
            {
                printf("Error in %s(): max connect limit[%d]\n", __func__, MAX_EPOLL_EVENTS);
                return -1;
            }
    
            // 7.将套接字设置为非阻塞
            int flag = 0;
    		if ((flag = fcntl(cli_fd, F_SETFL, O_NONBLOCK)) < 0) {
                printf("Error in %s(), fcntl: %s\n", __func__, strerror(errno));
    			return -1;
    		}
            
            // 8.将新事件添加到reactor事件表中
            // 此处我们将新客户端的回调函数首先设置为recv_callback, 事件类型为EPOLLIN, 因为一般都是客户端向服务器发送数据的
            nty_event_set(&reactor->events[cli_fd], cli_fd, EPOLLIN, 0, 0, recv_callback, reactor);
            nty_event_add(reactor->epoll_fd, &reactor->events[cli_fd]);
        } while(0);
    
        printf("New connect: [%s:%d], [time:%ld], pos[%d]\n", \
            inet_ntoa(cli_addr.sin_addr), ntohs(cli_addr.sin_port), reactor->events[cli_fd].last_active, i);
    
        return 0;
    }
    
    int recv_callback(int fd, int events, void *arg)
    {
        // 1.获得reactor结构
        struct ntyreactor *reactor =(struct ntyreactor*)arg;
        // 2.获取该fd对应的事件结构
        struct ntyevent *ev = reactor->events + fd;
    
        // 3.先将事件从epoll事件集移除
        nty_event_del(reactor->epoll_fd, ev);
        
        // 3.接收数据
        int rc = recv(ev->fd, ev->buffer, MAX_BUFFER_SIZE, 0);
        if(rc < 0)        //recv出错
        {
            //if(errno == EAGAIN || errno == EWOULDBLOCK)
            //    return rc;
            
            printf("Error in %s(), recv: %s\n", __func__, strerror(errno));
    
            // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了
            close(ev->fd);
        }
        else if(rc == 0)  //对方关闭了
        {
            printf("Client closed the connection, fd = %d\n", ev->fd);
    
            // 此处我们也当做错误处理
            // 此处我们不再需要将该nty_event从epoll事件集移除, 因为上面我们已经移除了
            close(ev->fd);
        } 
        else              //接收到数据
        {
            ev->buffer[rc] = '\0';
            printf("Recv[fd = %d]: %s\n", ev->fd, ev->buffer);
    
            // 将事件变为可读, 然后加入到epoll事件表中
            nty_event_set(ev, ev->fd, EPOLLOUT, rc, 0, send_callback, reactor);
            nty_event_add(reactor->epoll_fd, ev);
        }
    
        return rc;
    }
    
    int send_callback(int fd, int events, void *arg)
    {
        // 1.获得reactor结构
        struct ntyreactor *reactor =(struct ntyreactor*)arg;
        // 2.获取该fd对应的事件结构
        struct ntyevent *ev = reactor->events + fd;
    
        // 3.此处我们把接收的内容再回送给对象, 因此使用的是ev->buffer
        int rc = send(ev->fd, ev->buffer, ev->length, 0);
        if(rc > 0) //send成功
        {
            printf("Send[fd = %d]: %s\n", ev->fd, ev->buffer);
    
            // 移除、添加: 将其变为可读
            nty_event_del(reactor->epoll_fd, ev);
            nty_event_set(ev, ev->fd, EPOLLIN, 0, 0, recv_callback, reactor);
            nty_event_add(reactor->epoll_fd, ev);
        }
        else //send失败
        {
            printf("Error in %s(), send: %s\n", __func__, strerror(errno));
    
            // 关闭、移除
            close(ev->fd);
            nty_event_del(reactor->epoll_fd, ev);
        }
    
        return rc;
    }

     

    二、Proactor模式

    Proactor模式特点

    • 与Reactor不同,Proactor模式将所有的I/O操作都交给主线程和内核来处理,工作线程仅仅负责业务逻辑

    Proactor模式的工作流程

    • ①主线程调用aio_read函数向内核注册socket上读完成事件,并告诉内核用户读缓冲区的位置,以及读操作完成时如何通知应用程序(这里以信号为例)
    • ②主线程继续处理其他逻辑
    • ③当socket上的数据被读入用户缓冲区后,内核将向应用程序发送一个信号,以通知应用程序数据已经可用
    • ④应用程序预先定义好的信号处理函数选择一个工作线程来处理客户请求。工作线程处理完客户请求之后,调用aio_write函数向内核注册socket上的写完成事件,并告诉内核用户写缓冲区的位置,以及写操作完成时如何通知应用程序(这里以信号为例)
    • ⑤主线程继续处理其他逻辑
    • ⑥当用户缓冲区的数据被写入socket之后,内核将向应用程序发送一个信号,以通知应用程序数据已经发送完毕
    • ⑦应用程序预先定义好的信号处理函数选择一个工作线程来做善后处理,比如决定是否关闭socket

    • 在上图中,连接socket上的读写事件是通过aio_read/aio_write向内核注册的,因此内核将通过信号来向应用程序报告连接socket上的读写事件。所以,主线程的epoll_wait调用仅能用来检测监听socket上的连接请求事件,而不能用来检测连接socket的读写事件

    三、使用同步I/O模拟Proactor模式

    原理:

    • 主线程执行数据读写操作,读写完成之后,主线程向工作线程通知这一“完成事件”。那么从工作线程的角度来看,它们就直接获得了数据读写的结果,接下来要做的只是对读写的结果进行逻辑处理

    工作流程:

    • ①主线程往epoll内核事件表中注册socket上的读就绪事件
    • ②主线程调用epoll_wait等待socket上有数据可读
    • ③当socket上有数据可读时,epoll_wait通知主线程。主线程从socket循环读取数据,直到没有更多数据可读,然后将读取到的数据封装成一个请求对象并插入请求队列
    • ④睡眠在请求队列上的某个工作线程被唤醒,它获得请求对象并处理客户请求,然后往epoll内核事件表中注册socket上的写就绪事件
    • ⑤主线程调用epoll_wait等到socket可写
    • ⑥当socket可写时,epoll_wait通知主线程。主线程往socket上写入服务器处理客户请求的结果

    四、几种开源库

    • 下面是几种使用到上面技术的开源库:
      • libevent:名气最大,应用最广泛,历史悠久的跨平台事件库
      • libev:较 libevent 而言,设计更简练,性能更好,但对 Windows 支持不够好;
      • libuv:开发 node 的过程中需要一个跨平台的事件库,他们首选了 libev,但又要支持 Windows,故重新封装了一套,linux 下用 libev 实现,Windows 下用 IOCP 实现

    优先级

    • libevent:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,可以通过设置 事件的优先级使其优先被处理
    • libev:也是通过优先级队列来管理激活的时间,也可以设置事件优先级
    • libuv:也是通过优先级队列来管理激活的时间,也可以设置事件优先级

    事件循环

    • libevent:event_base 用于管理事件
    • libev:激活的事件组织在优先级队列中,各类事件默认的优先级是相同的,
    • libuv:可以通 过设置事件的优先级 使其优先被处理

    线程安全

    • event_base 和 loop 都不是线程安全的,一个 event_base 或 loop 实例只能在用户的一个线程 内访问(一般是主线程),注册到 event_base 或者 loop 的 event 都是串行访问的,即每个执 行过程中,会按照优先级顺序访问已经激活的事件,执行其回调函数。所以在仅使用一个 event_base 或 loop 的情况下,回调函数的执行不存在并行关系
    展开全文
  • Proactor VS Reactor

    2019-10-30 16:02:41
    proactor vs reactor 先发几本proactor 与 reactor 相关的电子书: http://files.cnblogs.com/files/f1194361820/react...
  • Proactor模式详解

    千次阅读 2018-08-27 23:07:31
      proactor结构模式在异步操作完成后触发服务请求的分配和分发 。 1. 举个例子吧   考虑一个需要同时处理多个请求的网络服务程序,比如,一个高效的WEB服务器需要并发的处理来自于不同客户端浏览器的HTTP...
  • Proactor 与 Reactor

    2019-02-27 07:58:39
    Reactor和Proactor对比以及优缺点 两种高效的事件处理模型:Reactor模式和Proactor模式 在高性能的I/O设计中,有两个比较著名的模式Reactor和Proactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O...
  • Proactor设计模式

    2019-03-06 20:33:30
    主动器Proactor Proactor模式结构 Proactor主动器模式包含如下角色 Handle 句柄;用来标识socket连接或是打开文件; Asynchronous Operation Processor:异步操作处理器;负责执行异步操作,一般由操作系统内核...
  • ACE proactor

    2010-07-08 19:11:00
    ACE proactor
  • Proactor模式

    2014-11-04 22:06:00
    第8章前摄器(Proactor):用于为异步事件多路分离和分派处理器的对象行为模式 Irfan Pyarali Tim Harrison Douglas C. Schmidt Thomas D. Jordan 摘要   现代操作系统为开发并发应用提供了多种机制。同步多...
  • Proactor架构模式

    2018-04-28 23:08:12
    http://www.laputan.org/pub/sag/proactor.pdf
  • Proactor和Reactor

    2019-08-09 17:50:00
    Proactor: 事件句柄初始化一个异步读操作,此时该句柄并不在意异步操作结果,而是要获得完成事件而注册 事件多路器等待直到io事件完成 当事件多路器等待io事件时,操作...
  • Proactor模式 论文

    2019-01-30 22:23:30
    Proactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handlers for Asynchronous Events 论文Irfan Pyarali, Tim Harrison, Douglas C. Schmidt, Thomas D. Jordan 2012-12-10 23:58  T...
  • ACE proactor 与 Reactor 模式的详解
  • Reactor和Proactor模式

    2020-09-27 08:11:57
    1.Reactor和Proactor模式的比较 两种I/O多路复用模式:Reactor和Proactor。一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器可将来自事件源的I/O事件分离出来,并分发到对应的read/...
  • Reactor和Proactor

    2019-05-28 22:05:35
    两种I/O多路复用模式:Reactor和Proactor 一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器对象可将来自事件源的I/O事件分离出来,并分发到对应的read/write事件处理器(Eve...
  • Reactor 与 Proactor

    2019-06-07 10:01:19
    Reactor 与 Proactor 比较 一.概述 系统I/O可以被阻断,或非阻塞同步,或异步非阻塞。阻塞I/O意味着在操作完成之前,系统不会将控制权返回给调用者,导致调用者被阻塞,并且在此期间无法执行任何其他任务。最重要的...
  • Boost ASIO proactor 浅析

    2019-10-06 01:50:28
    Boost ASIO proactor 浅析 http://www.cnblogs.com/zhiranok/archive/2011/10/07/boost-asio-proactor.html ...
  • reactor, proactor

    2017-03-17 17:30:36
    首先分享一下,我在网上看到的两篇不错的文章:正是这两篇文章才理解了reactor和proactor模式;  Reactor模式,或者叫反应器模式 高性能IO设计的Reactor和Proactor模式  首先就第一篇《Reactor模式,或者叫反应...
  • Reactor和Proactor对比

    2020-07-17 22:14:42
    常见的IO事件处理模型有两种:Reactor和Proactor。Redis中的ae就是采用的Reactor事件处理模型,Proactor需要操作系统的支持,目前暂时还没接触到相关的使用场景,主要是学习模型结构。 Reactor模型 Handler:...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,314
精华内容 525
关键字:

proactor