精华内容
下载资源
问答
  • 事件驱动框架
    千次阅读
    2016-11-04 10:31:42

    事件驱动框架(四)——实时框架


    说明:

    之前介绍的都是事件处理框架,是离应用最近的一层,主要思想就是分割对象,然后细化对象的状态,用状态机的方式进行处理。这里的处理方式很多,主要还是那两个接口的实现状态机。

    这章介绍核心的实时框架部分。这部分是介于底层或者OS和时间处理框架之间的一层可重用的设计。(看完别人的事件驱动的框架思路就被直接带着走了= =)负责多个状态机间发送事件的处理策略。
    下面这段是实施框架种类的介绍,我只是主要整理了一下。


    基础

    1.控制倒置

    事件驱动框架和顺序式结构程序最大的区别在于控制的倒置。事件驱动框架是框架会根据不同的策略在内部主动调用对象,框架常常扮演协调和排列程序活动的主函数角色。顺序式结构需要程序员事先规划好应用的执行顺序,按照流程进行运行。

    2.传统事件驱动系统

    传统事件驱动系统有2部分组成:事件基础设施和应用程序。事件基础设施由事件循环,事件调度器,事件队列组成。应用程序则是用来共享共同数据的时间处理函数。
    系统中从异步中断或者重时间处理函数被放入事件队列。控制权则停留在一个无限循环查询事件队列的调度器上。调度器负责事件的排序和事件的查询。这就要求时间处理函数执行时间短,并且要避免阻塞。
    传统时间驱动系统有以下缺点:
    1>.不可抢占。需要等待上一个事件处理完后才能接着处理下一个。
    2>.不支持对应用程序上下文的管理
    3>.全部的事件处理函数存取一样的全局数据,带来了并发性危机。

    3.活动对象模型

    这个模型的本质概念是在一个多任务环境里使用多个事件驱动型系统。一个应用程序包括了多个活动对象。每个对象都有它自己的控制线程:
    活动对象=控制线程+事件队列+状态机
    这种模型和状态机兼容。状态机的事件队列,事件循环和事件处理器是通用的,是实时框架的一部分。应用程序用状态机实现。框架通过调度接口调用它们。

    传统的可抢占式内核

    在活动对象的模型中,活动对象被映射为OS的一个线程。活动对象系统可以实现和传统的任务相同的任务层响应。

    合作式内核(非抢占式内核)

    一种非常简单的内核。活动对象合作分享CPU,并且在每个RTC步骤内无条件让出控制权。该种策略适合于低端的嵌入式系统。同时RTC时间需要被设计的短。
    内核策略:
    内核负责确定下一部分的运行代码。它的调度器在大循环中不断监视所有事件队列,挑选中间优先级最高的非空队列,派发事件调度。当调度时间完成后返回到主循环中,此时,运行权有交还给调度器来重新进行事件队列的监视。
    当调度器检测到所有的事件都为空时,执行空闲处理。(这部分可被用户程序定制,输出追踪信息等,在嵌入式中也进入低功耗模式,和当有中断时唤醒)

    可抢占式RTC内核

    在RTC内核,任务和中断服务ISR 都是一次性,运行到完成(从队列中获得事件到调度dispatch结束)。中断被看做作一个“超高”优先级任务,整个过程非常类似一个被基于优先级的中断控制器管理的ISR。
    只需要全部任务运行到完成,并强制固定优先级调度,一个RTC内核可以使用堆栈协议。当一个任务被一个更高优先级任务抢占, RTC内核使用一个常规的C函数调用,在被抢占任务堆栈上下文的顶部创建较高优先级任务的上下文。只要一个中断抢占了一个任务,内核再次使用一个常规C函数调用,使用已经建立的中断堆栈框架,在它的顶部建立较高优先级任务的上下文。这个简单的上下文管理的形式是适当的,因为每个任务,就像每个ISR 一样,运行到完成。因为抢占任务必须也运行到完成,较低优先级堆栈的上下文将不再需要,直到这个抢占任务完成为止—这时候,被抢占的任务将自然的在堆栈的顶部,准备被恢复。这个简单的机制使用和一个基于优先级的硬件中断控制器系统工作完全相同的原理工作。RTC内核最严重的局限性,就是任务不能被阻塞。。使用一个传统RTOS 唯一真实的原因是要和现有代码兼容。例如,传统的设备驱动程序,通讯堆栈(比如TCP/IP,USB,CAN等待)和其他legacy 子系统通常使用阻塞范型编写。传统的阻塞型RTOS 可以支持活动对象和传统的阻塞代码, RTOS 在实时框架外面执行它。(原理直接复制)

    4.事件派发机制

    事件的产生来源:系统的任何部分,活动对象等。消费事件只能是活动对象。
    实时框架支持2 类事件派发机制:
    1>.直接发送。将产生的事件直接放入事件队列。
    2>.发行-订阅。生产的事件发送给框架,框架发送事件给所有订阅了这个事件的活动对象。

    5.事件内存管理

    这部分主要是对事件的使用方式。
    事件管理的策略:
    1.复制事件
    将事件整个复制到事件队列中,使用时再将整个事件复制到内存缓冲区。这种方法在复制是消耗的时间比较多,并且复制的是整个事件,内存的开销也大。
    2.零复制的事件派发
    这种策略是由框架控制事件的产生和销毁。事件从内存池中进行分配,然后生产者将事件填充,在以指针的形式放入事件队列。活动对象读取队列中的指针,指针指向得到事件数据。然后用完的事件会被框架自动回收。

    静态事件和动态事件

    静态事件的参数不会变,而动态事件所带的参数会改变。所以动态事件需要动态分配。

    多路传输事件和引用计数器算法

    对于发送订阅这种模式下,实现对事件的回收可以通过使用标准的引用计数器算法。它的原理是,事件被创建时计数器从0开始。这个事件被插入队列,计数器加1。每次对这个事件的垃圾收集会将计数器减1。仅当事件的计数器为0 时才会被回收。
    注意引用计数器在事件被从一个队列提取时没有递减,仅在稍后的垃圾收集步骤才会被递减。

    内存池

    使用标准函数malloc,free,new,delete等来管理动态分配堆会产生以下问题:
    1>.大量的动态分配会把堆搞得支离破碎
    2>.基于堆的内存管理是浪费的。所有的堆管理算法必须为每个被分配的块维护某些形式的头信息。
    3>.动态分配不可重入
    4>.容易造成内存泄露并且不好调试

    内存池是一个块尺寸固定的堆。因为大小固定,它可以使不被碎片化,并且没有没了头信息,减小了开销
    用内存池管理事件的策略:
    由框架来管理内存池,为了满足不同大小的事件的需要,内存池分为不同的大小。并且在使用时事件需标明从哪块内存池中分配的事件。

    时钟管理

    在事件驱动的框架下,时钟管理是把时钟当做一类事件进行处理。

    更多相关内容
  • Nginx对请求的处理是通过事件触发的,模块作为事件消费者,只能被事件收集、分发器调用。在Nginx中,接收到一个请求时,不会产生一个单独的进程来处理该请求,而是由事件收集、分发器(进程)调用某个模块,由模块处理...
  • 传统嵌入式单片机开发中...将量子框架中的 QF 框架充当软件总线,利用事件分发机制和活动对象划分在异步事件处理上的优势,从而得出基于STM32 的事件驱动框架可以扩展嵌入式单片机的灵活性,丰富嵌入式系统功能开发的结论
  • 事件驱动框架 EDA

    2018-02-09 17:14:15
    事件驱动框架 EDA 框架设计。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
  • Redis中的事件驱动框架 在上篇文章,学习了 Redis 事件驱动框架的基本工作机制,其中介绍了事件驱动框架基于的 Reactor 模型,并以 IO 事件中的客户端连接事件为例,给你介绍了框架运行的基本流程:从 server 初始化...

    Redis中的事件驱动框架

    在上篇文章,学习了 Redis 事件驱动框架的基本工作机制,其中介绍了事件驱动框架基于的 Reactor 模型,并以 IO 事件中的客户端连接事件为例,给你介绍了框架运行的基本流程:从 server 初始化时调用 aeCreateFileEvent 函数注册监听事件,到 server 初始化完成后调用 aeMain 函数,而 aeMain 函数循环执行 aeProceeEvent 函数,来捕获和处理客户端请求触发的事件。

    但是在上篇文章当中,我们主要关注的是框架基本流程,所以到这里,你或许仍然存有一些疑问,比如说:

    • Redis 事件驱动框架监听的 IO 事件,除了上节课介绍的客户端连接以外,还有没有其他事件?
    • 而除了 IO 事件以外,框架还会监听其他事件么?
    • 这些事件的创建和处理又分别对应了 Redis 源码中的哪些具体操作?

    今天这节课,我就来给你介绍下 Redis 事件驱动框架中的两大类事件类型:IO 事件和时间事件以及它们相应的处理机制。

    事实上,了解和学习这部分内容,一方面可以帮助我们更加全面地掌握,Redis 事件驱动框架是如何以事件形式,处理 server 运行过程中面临的请求操作和多种任务的。比如,正常的客户端读写请求是以什么事件、由哪个函数进行处理,以及后台快照任务又是如何及时启动的。

    因为事件驱动框架是 Redis server 运行后的核心循环流程,了解它何时用什么函数处理哪种事件,对我们排查 server 运行过程中遇到的问题,是很有帮助的。

    另一方面,我们还可以学习到如何在一个框架中,同时处理 IO 事件和时间事件。我们平时开发服务器端程序,经常需要处理周期性任务,而 Redis 关于两类事件的处理实现,就给了我们一个不错的参考。

    好,为了对这两类事件有个相对全面的了解,接下来,我们先从事件驱动框架循环流程的数据结构及其初始化开始学起,因为这里面就包含了针对这两类事件的数据结构定义和初始化操作。

    aeEventLoop 结构体与初始化

    首先,我们来看下 Redis 事件驱动框架循环流程对应的数据结构 aeEventLoop。这个结构体是在事件驱动框架代码ae.h中定义的,记录了框架循环运行过程中的信息,其中,就包含了记录两类事件的变量,分别是:

    • aeFileEvent 类型的*指针 events,表示 IO 事件。之所以类型名称为 aeFileEvent,是因为所有的 IO 事件都会用文件描述符进行标识;
    • aeTimeEvent 类型的*指针 timeEventHead,表示时间事件,即按一定时间周期触发的事件。

    此外,aeEventLoop 结构体中还有一个 aeFiredEvent 类型的指针 fired,这个并不是一类专门的事件类型,它只是用来记录已触发事件对应的文件描述符信息*。

    下面的代码显示了 Redis 中事件循环的结构体定义,你可以看下。

    ae.h文件中查看

    /* State of an event based program */
    typedef struct aeEventLoop {
        int maxfd;   /* highest file descriptor currently registered */
        int setsize; /* max number of file descriptors tracked */
        long long timeEventNextId;
        // IO事件数组
        aeFileEvent *events; /* Registered events */ 
        // 已经触发事件数组
        aeFiredEvent *fired; /* Fired events */
        // 记录时间事件链表头
        aeTimeEvent *timeEventHead;
        int stop;
        // 和API调用接口相关的数据
        void *apidata; /* This is used for polling API specific data */
        // 进入事件循环流程前执行的函数
        aeBeforeSleepProc *beforesleep;
        // 退出事件循环流程后执行的函数
        aeBeforeSleepProc *aftersleep;
        int flags;
    } aeEventLoop;
    

    了解了 aeEventLoop 结构体后,我们再来看下,这个结构体是如何初始化的,这其中就包括了 IO 事件数组和时间事件链表的初始化。

    aeCreateEventLoop 函数的初始化操作

    因为 Redis server 在完成初始化后,就要开始运行事件驱动框架的循环流程,所以,aeEventLoop 结构体在server.c的 initServer 函数中,就通过调用 aeCreateEventLoop 函数进行初始化了。这个函数的参数只有一个,是 setsize。

    下面的代码展示了 initServer 函数中对 aeCreateEventLoop 函数的调用。

    server.c文件中查看

    initServer() {
    …
    //调用aeCreateEventLoop函数创建aeEventLoop结构体,并赋值给server结构的el变量
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR);
    …
    }
    

    从这里我们可以看到参数 setsize 的大小,其实是由 server 结构的 maxclients 变量和宏定义 CONFIG_FDSET_INCR 共同决定的。其中,maxclients 变量的值大小,可以在 Redis 的配置文件 redis.conf 中进行定义,默认值是 1000。而宏定义 CONFIG_FDSET_INCR 的大小,等于宏定义 CONFIG_MIN_RESERVED_FDS 的值再加上 96,如下所示,这里的两个宏定义都是在server.h文件中定义的。

    #define CONFIG_MIN_RESERVED_FDS 32
    #define CONFIG_FDSET_INCR (CONFIG_MIN_RESERVED_FDS+96)
    

    好了,到这里,你可能有疑问了:aeCreateEventLoop 函数的参数 setsize,设置为最大客户端数量加上一个宏定义值,可是这个参数有什么用呢?这就和 aeCreateEventLoop 函数具体执行的初始化操作有关了。接下来,我们就来看下 aeCreateEventLoop 函数执行的操作,大致可以分成以下三个步骤。

    • 第一步,aeCreateEventLoop 函数会创建一个 aeEventLoop 结构体类型的变量 eventLoop。然后,该函数会给 eventLoop 的成员变量分配内存空间,比如,按照传入的参数 setsize,给 IO 事件数组和已触发事件数组分配相应的内存空间。此外,该函数还会给 eventLoop 的成员变量赋初始值。
    • 第二步,aeCreateEventLoop 函数会调用 aeApiCreate 函数。aeApiCreate 函数封装了操作系统提供的 IO 多路复用函数,假设 Redis 运行在 Linux 操作系统上,并且 IO 多路复用机制是 epoll,那么此时,aeApiCreate 函数就会调用 epoll_create 创建 epoll 实例,同时会创建 epoll_event 结构的数组,数组大小等于参数 setsize。

    这里你需要注意,aeApiCreate 函数是把创建的 epoll 实例描述符和 epoll_event 数组,保存在了 aeApiState 结构体类型的变量 state,如下所示:

    在ae_epoll.c文件中查看

    // aeApiState结构体定义
    typedef struct aeApiState {
        // epoll实例的描述符
        int epfd;
        // epoll_event结构体数组,记录监听事件
        struct epoll_event *events;
    } aeApiState;
    
    static int aeApiCreate(aeEventLoop *eventLoop) {
        aeApiState *state = zmalloc(sizeof(aeApiState));
    
        if (!state) return -1;
        // 将epoll_event数组保存在aeApiState结构体变量state中
        state->events = zmalloc(sizeof(struct epoll_event)*eventLoop->setsize);
        if (!state->events) {
            zfree(state);
            return -1;
        }
        // 将epoll实例描述符保存在aeApiState结构体变量state中
        state->epfd = epoll_create(1024); /* 1024 is just a hint for the kernel */
        if (state->epfd == -1) {
            zfree(state->events);
            zfree(state);
            return -1;
        }
        anetCloexec(state->epfd);
        eventLoop->apidata = state;
        return 0;
    }
    

    紧接着,aeApiCreate 函数把 state 变量赋值给 eventLoop 中的 apidata。这样一来,eventLoop 结构体中就有了 epoll 实例和 epoll_event 数组的信息,这样就可以用来基于 epoll 创建和处理事件了。我一会儿还会给你具体介绍。

    eventLoop->apidata = state;
    
    • 第三步,aeCreateEventLoop 函数会把所有网络 IO 事件对应文件描述符的掩码,初始化为 AE_NONE,表示暂时不对任何事件进行监听。

    我把 aeCreateEventLoop 函数的主要部分代码放在这里,你可以看下。

    ae.c文件中查看

    aeEventLoop *aeCreateEventLoop(int setsize) {
        aeEventLoop *eventLoop;
        int i;
    
        monotonicInit();    /* just in case the calling app didn't initialize */
    
        // 给eventLoop变量分配内存空间
        if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err;
        // 给IO事件、已触发事件分配内存空间
        eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize);
        eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize);
        if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err;
        eventLoop->setsize = setsize;
        // 设置时间事件的链表头为NULL
        eventLoop->timeEventHead = NULL;
        eventLoop->timeEventNextId = 0;
        eventLoop->stop = 0;
        eventLoop->maxfd = -1;
    
        eventLoop->beforesleep = NULL;
        eventLoop->aftersleep = NULL;
        eventLoop->flags = 0;
        // 调用aeApiCreate函数,去实际调用操作系统提供的IO多路复用函数
        if (aeApiCreate(eventLoop) == -1) goto err;
        /* Events with mask == AE_NONE are not set. So let's initialize the
         * vector with it. */
        // 将所有网络IO事件对应文件描述符的掩码设置为AE_NONE
        for (i = 0; i < setsize; i++)
            eventLoop->events[i].mask = AE_NONE;
        return eventLoop;
    //初始化失败后的处理逻辑
    err:
        if (eventLoop) {
            zfree(eventLoop->events);
            zfree(eventLoop->fired);
            zfree(eventLoop);
        }
        return NULL;
    }
    

    好,那么从 aeCreateEventLoop 函数的执行流程中,我们其实可以看到以下两个关键点:

    • 事件驱动框架监听的 IO 事件数组大小就等于参数 setsize,这样决定了和 Redis server 连接的客户端数量。所以,当你遇到客户端连接 Redis 时报错“max number of clients reached”,你就可以去 redis.conf 文件修改 maxclients 配置项,以扩充框架能监听的客户端数量。
    • 当使用 Linux 系统的 epoll 机制时,框架循环流程初始化操作,会通过 aeApiCreate 函数创建 epoll_event 结构数组,并调用 epoll_create 函数创建 epoll 实例,这都是使用 epoll 机制的准备工作要求。

    IO 事件处理

    事实上,Redis 的 IO 事件主要包括三类,分别是可读事件、可写事件和屏障事件。

    其中,可读事件和可写事件其实比较好理解,也就是对应于 Redis 实例,我们可以从客户端读取数据或是向客户端写入数据。而屏障事件的主要作用是用来反转事件的处理顺序。比如在默认情况下,Redis 会先给客户端返回结果,但是如果面临需要把数据尽快写入磁盘的情况,Redis 就会用到屏障事件,把写数据和回复客户端的顺序做下调整,先把数据落盘,再给客户端回复。

    在 Redis 源码中,IO 事件的数据结构是 aeFileEvent 结构体,IO 事件的创建是通过 aeCreateFileEvent 函数来完成的。下面的代码展示了 aeFileEvent 结构体的定义,你可以再回顾下:

    typedef struct aeFileEvent {
        int mask; //掩码标记,包括可读事件、可写事件和屏障事件
        aeFileProc *rfileProc;   //处理可读事件的回调函数
        aeFileProc *wfileProc;   //处理可写事件的回调函数
        void *clientData;  //私有数据
    } aeFileEvent;
    

    而对于 aeCreateFileEvent 函数来说,在上节课我们已经了解了它是通过 aeApiAddEvent 函数来完成事件注册的。那么接下来,我们再从代码级别看下它是如何执行的,这可以帮助我们更加透彻地理解,事件驱动框架对 IO 事件监听是如何基于 epoll 机制对应封装的。

    IO 事件创建

    首先,我们来看 aeCreateFileEvent 函数的原型定义,如下所示:

    ae.c文件中查看

    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
    {
        if (fd >= eventLoop->setsize) {
            errno = ERANGE;
            return AE_ERR;
        }
        aeFileEvent *fe = &eventLoop->events[fd];
    
        if (aeApiAddEvent(eventLoop, fd, mask) == -1)
            return AE_ERR;
        fe->mask |= mask;
        if (mask & AE_READABLE) fe->rfileProc = proc;
        if (mask & AE_WRITABLE) fe->wfileProc = proc;
        fe->clientData = clientData;
        if (fd > eventLoop->maxfd)
            eventLoop->maxfd = fd;
        return AE_OK;
    }
    
    int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, aeFileProc *proc, void *clientData)
    

    这个函数的参数有 5 个,分别是

    • 循环流程结构体 *eventLoop
    • IO 事件对应的文件描述符 fd
    • 事件类型掩码 mask
    • 事件处理回调函数*proc
    • 事件私有数据*clientData。

    因为循环流程结构体*eventLoop中有 IO 事件数组,这个数组的元素是 aeFileEvent 类型,所以,每个数组元素都对应记录了一个文件描述符(比如一个套接字)相关联的监听事件类型和回调函数。

    aeCreateFileEvent 函数会先根据传入的文件描述符 fd,在 eventLoop 的 IO 事件数组中,获取该描述符关联的 IO 事件指针变量*fe,如下所示:

    aeFileEvent *fe = &eventLoop->events[fd];
    

    紧接着,aeCreateFileEvent 函数会调用 aeApiAddEvent 函数,添加要监听的事件:

    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
       return AE_ERR;
    

    aeApiAddEvent 函数实际上会调用操作系统提供的 IO 多路复用函数,来完成事件的添加。我们还是假设 Redis 实例运行在使用 epoll 机制的 Linux 上,那么 aeApiAddEvent 函数就会调用 epoll_ctl 函数,添加要监听的事件。我在第 9 讲中其实已经给你介绍过 epoll_ctl 函数,这个函数会接收 4 个参数,分别是:

    • epoll 实例;
    • 要执行的操作类型(是添加还是修改);
    • 要监听的文件描述符;
    • epoll_event 类型变量。

    那么,这个调用过程是如何准备 epoll_ctl 函数需要的参数,从而完成执行的呢?

    • 首先,epoll 实例是我刚才给你介绍的 aeCreateEventLoop 函数,它是通过调用 aeApiCreate 函数来创建的,保存在了 eventLoop 结构体的 apidata 变量中,类型是 aeApiState。所以,aeApiAddEvent 函数会先获取该变量,如下所示:
    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
        //从eventLoop结构体中获取aeApiState变量,里面保存了epoll实例
      aeApiState *state = eventLoop->apidata;
        ...
     }
    
    • 其次,对于要执行的操作类型的设置,aeApiAddEvent 函数会根据传入的文件描述符 fd,在 eventLoop 结构体中 IO 事件数组中查找该 fd。因为 IO 事件数组的每个元素,都对应了一个文件描述符,而该数组初始化时,每个元素的值都设置为了 AE_NONE。

    所以,如果要监听的文件描述符 fd 在数组中的类型不是 AE_NONE,则表明该描述符已做过设置,那么操作类型就是修改操作,对应 epoll 机制中的宏定义 EPOLL_CTL_MOD。否则,操作类型就是添加操作,对应 epoll 机制中的宏定义 EPOLL_CTL_ADD。这部分代码如下所示:

    //如果文件描述符fd对应的IO事件已存在,则操作类型为修改,否则为添加
     int op = eventLoop->events[fd].mask == AE_NONE ? EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    

    第三,epoll_ctl 函数需要的监听文件描述符,就是 aeApiAddEvent 函数接收到的参数 fd。

    • 最后,epoll_ctl 函数还需要一个 epoll_event 类型变量,因此 aeApiAddEvent 函数在调用 epoll_ctl 函数前,会新创建 epoll_event 类型变量 ee。然后,aeApiAddEvent 函数会设置变量 ee 中的监听事件类型和监听文件描述符。

    aeApiAddEvent 函数的参数 mask,表示的是要监听的事件类型掩码。所以,aeApiAddEvent 函数会根据掩码值是可读(AE_READABLE)或可写(AE_WRITABLE)事件,来设置 ee 监听的事件类型是 EPOLLIN 还是 EPOLLOUT。这样一来,Redis 事件驱动框架中的读写事件就能够和 epoll 机制中的读写事件对应上来。下面的代码展示了这部分逻辑,你可以看下。

    …
    struct epoll_event ee = {0}; //创建epoll_event类型变量
    …
    //将可读或可写IO事件类型转换为epoll监听的类型EPOLLIN或EPOLLOUT
    if (mask & AE_READABLE) ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;
    ee.data.fd = fd;  //将要监听的文件描述符赋值给ee
    …  
    

    好了,到这里,aeApiAddEvent 函数就准备好了 epoll 实例、操作类型、监听文件描述符以及 epoll_event 类型变量,然后,它就会调用 epoll_ctl 开始实际创建监听事件了,如下所示:

    static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    ...
    //调用epoll_ctl实际创建监听事件
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
    }
    

    了解了这些代码后,我们可以学习到事件驱动框架是如何基于 epoll,封装实现了 IO 事件的创建。那么,在 Redis server 启动运行后,最开始监听的 IO 事件是可读事件,对应于客户端的连接请求。具体是 initServer 函数调用了 aeCreateFileEvent 函数,创建可读事件,并设置回调函数为 acceptTcpHandler,用来处理客户端连接。

    接下来,我们再来看下一旦有了客户端连接请求后,IO 事件具体是如何处理的呢?

    读事件处理

    当 Redis server 接收到客户端的连接请求时,就会使用注册好的 acceptTcpHandler 函数进行处理。

    acceptTcpHandler 函数是在networking.c文件中,它会接受客户端连接,并创建已连接套接字 cfd。然后,acceptCommonHandler 函数(在 networking.c 文件中)会被调用,同时,刚刚创建的已连接套接字 cfd 会作为参数,传递给 acceptCommonHandler 函数。

    networking文件中查看

    void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
        int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
        char cip[NET_IP_STR_LEN];
        UNUSED(el);
        UNUSED(mask);
        UNUSED(privdata);
    
        while(max--) {
            cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
            if (cfd == ANET_ERR) {
                if (errno != EWOULDBLOCK)
                    serverLog(LL_WARNING,
                        "Accepting client connection: %s", server.neterr);
                return;
            }
            anetCloexec(cfd);
            serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
            acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
        }
    }
    

    此时,aeCreateFileEvent 函数会针对已连接套接字上,创建监听事件,类型为 AE_READABLE,回调函数是 readQueryFromClient(在 networking.c 文件中)。

    好了,到这里,事件驱动框架就增加了对一个客户端已连接套接字的监听。一旦客户端有请求发送到 server,框架就会回调 readQueryFromClient 函数处理请求。这样一来,客户端请求就能通过事件驱动框架进行处理了

    下面代码展示了 createClient 函数调用 aeCreateFileEvent 的过程

    client *createClient(int fd) {
    …
        if (fd != -1) {
                …
                //调用aeCreateFileEvent,监听读事件,对应客户端读写请求,使用readQueryFromclient回调函数处理
                if (aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) == AE_ERR)
                {
                    close(fd);
                    zfree(c);
                    return NULL;
                } 
        }
    …
    }
    

    为了便于你掌握从监听客户端连接请求到监听客户端常规读写请求的事件创建过程,我画了下面这张图,你可以看下。

    img

    了解了事件驱动框架中的读事件处理之后,我们再来看下写事件的处理。

    写事件处理

    Redis 实例在收到客户端请求后,会在处理客户端命令后,将要返回的数据写入客户端输出缓冲区。下图就展示了这个过程的函数调用逻辑:

    img

    而在 Redis 事件驱动框架每次循环进入事件处理函数前,也就是在框架主函数 aeMain 中调用 aeProcessEvents,来处理监听到的已触发事件或是到时的时间事件之前,都会调用 server.c 文件中的 beforeSleep 函数,进行一些任务处理,这其中就包括了调用 handleClientsWithPendingWrites 函数,它会将 Redis sever 客户端缓冲区中的数据写回客户端。

    下面给出的代码是事件驱动框架的主函数 aeMain。在该函数每次调用 aeProcessEvents 函数前,就会调用 beforeSleep 函数,你可以看下。

    ae.c文件中查看

    void aeMain(aeEventLoop *eventLoop) {
        eventLoop->stop = 0;
      while (!eventLoop->stop) {
          //如果beforeSleep函数不为空,则调用beforeSleep函数
            if (eventLoop->beforesleep != NULL)
                eventLoop->beforesleep(eventLoop);
            //调用完beforeSleep函数,再处理事件
            aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
        }
    }
    

    这里你要知道,beforeSleep 函数调用的 handleClientsWithPendingWrites 函数,会遍历每一个待写回数据的客户端,然后调用 writeToClient 函数,将客户端输出缓冲区中的数据写回。下面这张图展示了这个流程,你可以看下。

    img

    不过,如果输出缓冲区的数据还没有写完,此时,handleClientsWithPendingWrites 函数就会调用 aeCreateFileEvent 函数,创建可写事件,并设置回调函数 sendReplyToClientsendReplyToClient 函数里面会调用 writeToClient 函数写回数据。

    下面的代码展示了 handleClientsWithPendingWrite 函数的基本流程,你可以看下。

    networking.c文件中查看

    /* This function is called just before entering the event loop, in the hope
     * we can just write the replies to the client output buffer without any
     * need to use a syscall in order to install the writable event handler,
     * get it called, and so forth. */
    int handleClientsWithPendingWrites(void) {
        listIter li;
        listNode *ln;
        int processed = listLength(server.clients_pending_write);
    
        // 获取待写回的客户端列表
        listRewind(server.clients_pending_write,&li);
        // 遍历每一个待写回的客户端
        while((ln = listNext(&li))) {
            client *c = listNodeValue(ln);
            c->flags &= ~CLIENT_PENDING_WRITE;
            listDelNode(server.clients_pending_write,ln);
    
            /* If a client is protected, don't do anything,
             * that may trigger write error or recreate handler. */
            if (c->flags & CLIENT_PROTECTED) continue;
    
            /* Don't write to clients that are going to be closed anyway. */
            if (c->flags & CLIENT_CLOSE_ASAP) continue;
    
            // 调用writeToClient将当前客户端的输出缓冲区数据写回
            /* Try to write buffers to the client socket. */
            if (writeToClient(c,0) == C_ERR) continue;
    
            // 如果还有待写回数据
            /* If after the synchronous writes above we still have data to
             * output to the client, we need to install the writable handler. */
            if (clientHasPendingReplies(c)) {
                int ae_barrier = 0;
                /* For the fsync=always policy, we want that a given FD is never
                 * served for reading and writing in the same event loop iteration,
                 * so that in the middle of receiving the query, and serving it
                 * to the client, we'll call beforeSleep() that will do the
                 * actual fsync of AOF to disk. the write barrier ensures that. */
                // 创建可写事件的监听,以及设置回调函数
                if (server.aof_state == AOF_ON &&
                    server.aof_fsync == AOF_FSYNC_ALWAYS)
                {
                    ae_barrier = 1;
                }
                if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
                    freeClientAsync(c);
                }
            }
        }
        return processed;
    }
    

    好了,我们刚才了解的是读写事件对应的回调处理函数。实际上,为了能及时处理这些事件,Redis 事件驱动框架的 aeMain 函数还会循环调用 aeProcessEvents 函数,来检测已触发的事件,并调用相应的回调函数进行处理

    从 aeProcessEvents 函数的代码中,我们可以看到该函数会调用 aeApiPoll 函数,查询监听的文件描述符中,有哪些已经就绪。一旦有描述符就绪,aeProcessEvents 函数就会根据事件的可读或可写类型,调用相应的回调函数进行处理。aeProcessEvents 函数调用的基本流程如下所示:

    ae.c文件中查看

    /* Process every pending time event, then every pending file event
     * (that may be registered by time event callbacks just processed).
     * Without special flags the function sleeps until some file event
     * fires, or when the next time event occurs (if any).
     *
     * If flags is 0, the function does nothing and returns.
     * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
     * if flags has AE_FILE_EVENTS set, file events are processed.
     * if flags has AE_TIME_EVENTS set, time events are processed.
     * if flags has AE_DONT_WAIT set the function returns ASAP until all
     * the events that's possible to process without to wait are processed.
     * if flags has AE_CALL_AFTER_SLEEP set, the aftersleep callback is called.
     * if flags has AE_CALL_BEFORE_SLEEP set, the beforesleep callback is called.
     *
     * The function returns the number of events processed. */
    int aeProcessEvents(aeEventLoop *eventLoop, int flags)
    {
        int processed = 0, numevents;
    
        /* 若没有事件处理,则立刻返回*/
        /* Nothing to do? return ASAP */
        if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    
        /* Note that we want to call select() even if there are no
         * file events to process as long as we want to process time
         * events, in order to sleep until the next time event is ready
         * to fire. */
        /*如果有IO事件发生,或者紧急的时间事件发生,则开始处理*/
        // 请注意,即使没有要处理的文件事件,只要我们想要处理时间事件,我们也想调用 select(),以便在下一次事件准备好触发之前sleep
        if (eventLoop->maxfd != -1 ||
            ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
            int j;
            struct timeval tv, *tvp;
            int64_t usUntilTimer = -1;
    
            if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
                usUntilTimer = usUntilEarliestTimer(eventLoop);
    
            if (usUntilTimer >= 0) {
                tv.tv_sec = usUntilTimer / 1000000;
                tv.tv_usec = usUntilTimer % 1000000;
                tvp = &tv;
            } else {
                /* If we have to check for events but need to return
                 * ASAP because of AE_DONT_WAIT we need to set the timeout
                 * to zero */
                if (flags & AE_DONT_WAIT) {
                    tv.tv_sec = tv.tv_usec = 0;
                    tvp = &tv;
                } else {
                    /* Otherwise we can block */
                    tvp = NULL; /* wait forever */
                }
            }
    
            if (eventLoop->flags & AE_DONT_WAIT) {
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            }
    
            if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
                eventLoop->beforesleep(eventLoop);
    
            /* Call the multiplexing API, will return only on timeout or when
             * some event fires. */
            // 调用aeApiPoll获取就绪的描述符
            numevents = aeApiPoll(eventLoop, tvp);
    
            /* After sleep callback. */
            if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP)
                eventLoop->aftersleep(eventLoop);
    
            for (j = 0; j < numevents; j++) {
                aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
                int mask = eventLoop->fired[j].mask;
                int fd = eventLoop->fired[j].fd;
                int fired = 0; /* Number of events fired for current fd. */
    
                /* Normally we execute the readable event first, and the writable
                 * event later. This is useful as sometimes we may be able
                 * to serve the reply of a query immediately after processing the
                 * query.
                 *
                 * However if AE_BARRIER is set in the mask, our application is
                 * asking us to do the reverse: never fire the writable event
                 * after the readable. In such a case, we invert the calls.
                 * This is useful when, for instance, we want to do things
                 * in the beforeSleep() hook, like fsyncing a file to disk,
                 * before replying to a client. */
                int invert = fe->mask & AE_BARRIER;
    
                /* Note the "fe->mask & mask & ..." code: maybe an already
                 * processed event removed an element that fired and we still
                 * didn't processed, so we check if the event is still valid.
                 *
                 * Fire the readable event if the call sequence is not
                 * inverted. */
                // 如果触发的是可读事件,调用事件注册时设置的读事件回调处理函数
                if (!invert && fe->mask & mask & AE_READABLE) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                    fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                }
    
                // 如果触发的是可写事件,调用事件注册时设置的写事件回调处理函数
                /* Fire the writable event. */
                if (fe->mask & mask & AE_WRITABLE) {
                    if (!fired || fe->wfileProc != fe->rfileProc) {
                        fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                /* If we have to invert the call, fire the readable event now
                 * after the writable one. */
                if (invert) {
                    fe = &eventLoop->events[fd]; /* Refresh in case of resize. */
                    if ((fe->mask & mask & AE_READABLE) &&
                        (!fired || fe->wfileProc != fe->rfileProc))
                    {
                        fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                        fired++;
                    }
                }
    
                processed++;
            }
        }
        /* Check time events */
        /* 检查是否有时间事件,若有,则调用processTimeEvents函数处理 */
        if (flags & AE_TIME_EVENTS)
            processed += processTimeEvents(eventLoop);
        /* 返回已经处理的文件或时间*/
        return processed; /* return the number of processed file/time events */
    }
    

    到这里,我们就了解了 IO 事件的创建函数 aeCreateFileEvent,以及在处理客户端请求时对应的读写事件和它们的处理函数。那么接下来,我们再来看看事件驱动框架中的时间事件是怎么创建和处理的。

    时间事件处理

    其实,相比于 IO 事件有可读、可写、屏障类型,以及不同类型 IO 事件有不同回调函数来说,时间事件的处理就比较简单了。下面,我们就来分别学习下它的定义、创建、回调函数和触发处理。

    时间事件定义

    首先,我们来看下时间事件的结构体定义,代码如下所示:

    ae.h文件中查看

    /* Time event structure */
    typedef struct aeTimeEvent {
        // 时间事件ID
        long long id; /* time event identifier. */
        // 事件到达的微秒级时间戳
        monotime when;
        // 时间事件触发后的处理函数
        aeTimeProc *timeProc;
        // 事件结束后的处理函数
        aeEventFinalizerProc *finalizerProc;
        // 事件相关的私有数据
        void *clientData;
        // 时间事件链表的前向指针
        struct aeTimeEvent *prev;
        // 时间事件链表的后向指针
        struct aeTimeEvent *next;
        int refcount; /* refcount to prevent timer events from being
             * freed in recursive time event calls. */
    } aeTimeEvent;
    

    时间事件结构体中主要的变量,包括以微秒记录的时间事件触发时的时间戳 when,以及时间事件触发后的处理函数*timeProc。另外,在时间事件的结构体中,还包含了前向和后向指针*prev和*next,这表明时间事件是以链表的形式组织起来的。

    在了解了时间事件结构体的定义以后,我们接着来看下,时间事件是如何创建的。

    时间事件创建

    与 IO 事件创建使用 aeCreateFileEvent 函数类似,时间事件的创建函数是 aeCreateTimeEvent 函数。这个函数的原型定义如下所示:

    ae.c文件中查看

    long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds,
            aeTimeProc *proc, void *clientData,
            aeEventFinalizerProc *finalizerProc)
    {
        long long id = eventLoop->timeEventNextId++;
        aeTimeEvent *te;
    
        te = zmalloc(sizeof(*te));
        if (te == NULL) return AE_ERR;
        te->id = id;
        te->when = getMonotonicUs() + milliseconds * 1000;
        te->timeProc = proc;
        te->finalizerProc = finalizerProc;
        te->clientData = clientData;
        te->prev = NULL;
        te->next = eventLoop->timeEventHead;
        te->refcount = 0;
        if (te->next)
            te->next->prev = te;
        eventLoop->timeEventHead = te;
        return id;
    }
    

    在它的参数中,有两个需要我们重点了解下,以便于我们理解时间事件的处理。一个是 milliseconds,这是所创建时间事件的触发时间距离当前时间的时长,是用毫秒表示的。另一个是 *proc,这是所创建时间事件触发后的回调函数。

    aeCreateTimeEvent 函数的执行逻辑不复杂,主要就是创建一个时间事件的变量 te,对它进行初始化,并把它插入到框架循环流程结构体 eventLoop 中的时间事件链表中。在这个过程中,aeCreateTimeEvent 函数会调用 aeAddMillisecondsToNow 函数,根据传入的 milliseconds 参数,计算所创建时间事件具体的触发时间戳,并赋值给 te。

    实际上,Redis server 在初始化时,除了创建监听的 IO 事件外,也会调用 aeCreateTimeEvent 函数创建时间事件。下面代码显示了 initServer 函数对 aeCreateTimeEvent 函数的调用:

    initServer() {
        …
        //创建时间事件
        if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR){
        … //报错信息
        }
    }
    

    从代码中,我们可以看到,时间事件触发后的回调函数是 serverCron。所以接下来,我们就来了解下 serverCron 函数。

    时间事件回调函数

    serverCron 函数是在 server.c 文件中实现的。

    • 一方面,它会顺序调用一些函数,来实现时间事件被触发后,执行一些后台任务。比如,serverCron 函数会检查是否有进程结束信号,若有就执行 server 关闭操作。serverCron 会调用 databaseCron 函数,处理过期 key 或进行 rehash 等。你可以参考下面给出的代码:
    ...
    //如果收到进程结束信号,则执行server关闭操作
     if (server.shutdown_asap) {
            if (prepareForShutdown(SHUTDOWN_NOFLAGS) == C_OK) exit(0);
            ...
     }
    ...
    clientCron();  //执行客户端的异步操作
    databaseCron(); //执行数据库的后台操作
    ...
    
    • 另一方面,serverCron 函数还会以不同的频率周期性执行一些任务,这是通过执行宏 run_with_period 来实现的。

    run_with_period 宏定义如下,该宏定义会根据 Redis 实例配置文件 redis.conf 中定义的 hz 值,来判断参数 ms 表示的时间戳是否到达。一旦到达,serverCron 就可以执行相应的任务了。

    server.h文件中查看

    /* Using the following macro you can run code inside serverCron() with the
     * specified period, specified in milliseconds.
     * The actual resolution depends on server.hz. */
    #define run_with_period(_ms_) if ((_ms_ <= 1000/server.hz) || !(server.cronloops%((_ms_)/(1000/server.hz))))
    

    比如,serverCron 函数中会以 1 秒 1 次的频率,检查 AOF 文件是否有写错误。如果有的话,serverCron 就会调用 flushAppendOnlyFile 函数,再次刷回 AOF 文件的缓存数据。下面的代码展示了这一周期性任务:

    serverCron() {
       …
       //每1秒执行1次,检查AOF是否有写错误
       run_with_period(1000) {
            if (server.aof_last_write_status == C_ERR)
                flushAppendOnlyFile(0);
        }
       …
    }
    

    如果你想了解更多的周期性任务,可以再详细阅读下 serverCron 函数中,以 run_with_period 宏定义包含的代码块。

    好了,了解了时间事件触发后的回调函数 serverCron,我们最后来看下,时间事件是如何触发处理的。

    时间事件的触发处理

    其实,时间事件的检测触发比较简单,事件驱动框架的 aeMain 函数会循环调用 aeProcessEvents 函数,来处理各种事件。而 aeProcessEvents 函数在执行流程的最后,会调用 processTimeEvents 函数处理相应到时的任务。

    aeProcessEvents(){
    	…
        //检测时间事件是否触发
        if (flags & AE_TIME_EVENTS)
                processed += processTimeEvents(eventLoop);
        …
    }
    

    那么,具体到 proecessTimeEvent 函数来说,它的基本流程就是从时间事件链表上逐一取出每一个事件,然后根据当前时间判断该事件的触发时间戳是否已满足。如果已满足,那么就调用该事件对应的回调函数进行处理。这样一来,周期性任务就能在不断循环执行的 aeProcessEvents 函数中,得到执行了。

    下面的代码显示了 processTimeEvents 函数的基本流程,你可以再看下。

    ae.c文件中查看

    /* Process time events */
    static int processTimeEvents(aeEventLoop *eventLoop) {
        int processed = 0;
        aeTimeEvent *te;
        long long maxId;
    
        // 从时间事件链表中取出事件
        te = eventLoop->timeEventHead;
        maxId = eventLoop->timeEventNextId-1;
        // 获取当前时间
        monotime now = getMonotonicUs();
        while(te) {
            long long id;
    
            /* Remove events scheduled for deletion. */
            // 删除计划删除的事件
            if (te->id == AE_DELETED_EVENT_ID) {
                aeTimeEvent *next = te->next;
                /* If a reference exists for this timer event,
                 * don't free it. This is currently incremented
                 * for recursive timerProc calls */
                // 如果此计时器事件存在引用,请不要释放它。因为它会由于递归 timerProc 调用而增加
                if (te->refcount) {
                    te = next;
                    continue;
                }
                if (te->prev)
                    te->prev->next = te->next;
                else
                    eventLoop->timeEventHead = te->next;
                if (te->next)
                    te->next->prev = te->prev;
                if (te->finalizerProc) {
                    te->finalizerProc(eventLoop, te->clientData);
                    now = getMonotonicUs();
                }
                zfree(te);
                te = next;
                continue;
            }
    
            /* Make sure we don't process time events created by time events in
             * this iteration. Note that this check is currently useless: we always
             * add new timers on the head, however if we change the implementation
             * detail, this check may be useful again: we keep it here for future
             * defense. */
            if (te->id > maxId) {
                te = te->next;
                continue;
            }
    
            // 时间事件的时间小于当前时间点
            if (te->when <= now) {
                int retval;
    
                id = te->id;
                te->refcount++;
                retval = te->timeProc(eventLoop, id, te->clientData);
                te->refcount--;
                processed++;
                now = getMonotonicUs();
                if (retval != AE_NOMORE) {
                    te->when = now + retval * 1000;
                } else {
                    te->id = AE_DELETED_EVENT_ID;
                }
            }
            // 获取下一个时间事件
            te = te->next;
        }
        return processed;
    }
    

    这节课,我给你介绍了 Redis 事件驱动框架中的两类事件:IO 事件和时间事件。

    对于 IO 事件来说,它可以进一步分成可读、可写和屏障事件。因为可读、可写事件在 Redis 和客户端通信处理请求过程中使用广泛,所以今天我们重点学习了这两种 IO 事件。当 Redis server 创建 Socket 后,就会注册可读事件,并使用 acceptTCPHandler 回调函数处理客户端的连接请求。

    当 server 和客户端完成连接建立后,server 会在已连接套接字上监听可读事件,并使用 readQueryFromClient 函数处理客户端读写请求。这里,你需要再注意下,无论客户端发送的请求是读或写操作,对于 server 来说,都是要读取客户端的请求并解析处理。所以,server 在客户端的已连接套接字上注册的是可读事件。而当实例需要向客户端写回数据时,实例会在事件驱动框架中注册可写事件,并使用 sendReplyToClient 作为回调函数,将缓冲区中数据写回客户端。

    我总结了一张表格,以便你再回顾下 IO 事件和相应套接字、回调函数的对应关系。

    img

    然后,对于时间事件来说,它主要是用于在事件驱动框架中注册一些周期性执行的任务,以便 Redis server 进行后台处理。时间事件的回调函数是 serverCron 函数,你可以做进一步阅读了解其中的具体任务。

    Redis 在调用 aeApiCreate、aeApiAddEvent 这些函数时,是根据什么条件来决定,具体调用哪个文件中的 IO 多路复用函数的?

    在 ae.c 中,根据不同平台,首先定义好了要导入的封装好的 IO 多路复用函数,每个平台对应的文件中都定义了 aeApiCreate、aeApiAddEvent 这类函数,在执行时就会执行对应平台的函数逻辑。

    /* Include the best multiplexing layer supported by this system.
     * The following should be ordered by performances, descending. */
    // 以下应按性能顺序排列,降序排列
    #ifdef HAVE_EVPORT
    #include "ae_evport.c" // Solaris
    #else
        #ifdef HAVE_EPOLL
        #include "ae_epoll.c" // Linux
        #else
            #ifdef HAVE_KQUEUE
            #include "ae_kqueue.c" // MacOS
            #else
            #include "ae_select.c" // windows
            #endif
        #endif
    #endif
    
    展开全文
  • 消息系统事件驱动框架参照一些消息系统中的模式。我们将进行如下类比。 事件与消息,事件处理器与通道,事件转发器与路由。一个实例是邮递系统。邮递员有一个背包里面有若干信件,上面有要寄送的地址,邮递员必须将...

    上一篇文章中,我们介绍了事件驱动的基础组件。本文,我们将开发一个事件驱动的框架。

    消息系统

    事件驱动框架参照一些消息系统中的模式。我们将进行如下类比。 事件与消息,事件处理器与通道,事件转发器与路由。

    一个实例是邮递系统。邮递员有一个背包里面有若干信件,上面有要寄送的地址,邮递员必须将信件寄送到相应的地址。这个过程可以按如下形式描述?

    procedure deliver_letters(satchel):

    repeat

    letter := next_letter(satchel)

    for home in homes do:

    if letter.destination == home:

    deliver_letter(home)

    end if

    end for

    until satchel is empty

    end procedure

    这个例子可以使用事件驱动编程建模。接下来我们将以最抽象的形式来开发一个框架,来对系统建模。

    消息

    每个消息有一个具体的类型,通过类型将消息与处理器相关联。消息接口定义如下。

    public interface Message {

    public Class extends Message> getType();

    }

    管道(Channel)

    管道与某种类型的消息关联。我们将消息派发给每种消息各自的管道进行处理。

    public interface Channel {

    public void dispatch(E message);

    }

    动态路由

    消息系统的交互是通过路由完成。路由负责将给定的消息转发到具体的通道上。初始化阶段,路由会注册消息与其关联的通道。随后,由路由转发的消息会自动匹配消息类型及关联的管道,并将消息路由到关联的管道中。

    public interface DynamicRouter { public void registerChannel(Class extends E> contentType, Channel extends E> channel); public abstract void dispatch(E content); }

    接下来将实现这些接口以便创建一个完整的框架。

    框架实现

    事件

    我们将事件定义为消息的子类。

    import Message; public class Event implements Message { @Override public Class extends Message> getType() { return getClass(); } }

    处理器

    处理器最终接收事件,并处理,这里将实现管道接口。

    import Channel;

    public class Handler implements Channel {

    @Override

    public void dispatch(Event message) {

    System.out.println(message.getClass());

    }

    }

    事件转发器

    事件转发器用来注册消息与通道。本例中,我们注册处理器及其关联的事件类。我们使用HashMap将这件与处理器关联。

    import java.util.HashMap; import java.util.Map; import edu.giocc.util.router.Channel; import edu.giocc.util.router.DynamicRouter; public class EventDispatcher implements DynamicRouter { private Map, Handler> handlers; public EventDispatcher() { handlers = new HashMap, Handler>(); } @Override public void registerChannel(Class extends Event> contentType, Channel extends Event> channel) { handlers.put(contentType, (Handler)channel); } @Override public void dispatch(Event content) { handlers.get(content.getClass()).dispatch(content); } }

    基本框架完成,接下来进行行测。

    public class Program {

    public static void main(String[] args) {

    EventDispatcher dispatcher = new EventDispatcher();

    dispatcher.registerChannel(Event.class, new Handler());

    dispatcher.dispatch(new Event());

    }

    }

    在应用程序中进行扩展

    现在框架已经建立,通用框架一般应具备下列特征。

    控制反转 如同消息系统一样,框架来控制数据流。

    扩展性 应用程序可根据使用来进行扩展。

    框架代码不可修改 不能修改框架代码。

    上述提到的属性解释了应用如何与框架连接。框架仅仅是架构的一个抽象。

    12b59efda4bc27dab1eaad6b758ed9fb.png

    Handler和Event类是framework层的类。基于框架开发的代码应该位于应用层。框架职责是抽象并提供事件驱动架构的基础工具。

    现在我们可以继承Handler类,来创建我们的事件处理器,继承Event类来创建我们的事件。此外,我们在派发器中注册这些事件处理器与事件。

    下一篇文章中将展示一个框架的具体实现,以便模拟事件驱动系统。

    展开全文
  • Q-Controllers是一个事件驱动的应用代码框架,适用于低端单片机无法跑操作系统,但又要处理越来越复杂的代码构架的情况。 因为不依赖于操作系统,所以非常容易被移植到stm32之外的其他单片机上。即便不进行移植,...
  • 事件驱动研究的经典论文,给出设计框架,稳定性证明等
  • Linux流媒体服务器中异步事件驱动框架的研究与探讨.pdf
  • Java事件驱动模型框架实现

    千次阅读 2022-04-21 16:50:32
    一、框架实现篇 关键角色: 事件源(XXXEvent):能够接收外部事件的源体,内部包含事件的类型。 事件监听器(XXXEventListener):能够接收事件源通知的对象。 事件分发器(EventDispatcher):维护事件类型...

    一、框架实现篇 

    关键角色

    • 事件源(XXXEvent):能够接收外部事件的源体,内部包含事件的类型。
    • 事件监听器(XXXEventListener):能够接收事件源通知的对象。
    • 事件分发器(EventDispatcher):维护事件类型与事件监听器列表的映射关系,接收事件并进行事件的派发。
    • 事件监听器管理类(EventListenerManager):声明一系列的事件监听器,通过@EventAnnotation定义该事件监听器感兴趣的事件类型,最后通过反射的方式实现事件监听器自动注册到EventDispatcher。

    架构图

    ------------------------------------------------------------------------------------------------------------

    话不多说,直接上代码

    -------------------------------------------------------------------------------------------------------------

    (1)定义事件类型EventType

    package com.example.demo.eventdriven;
    
    /**
     * 
    * @ClassName: EventType
    * @Description: 事件类型
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:35:56
     */
    public enum EventType {
    	LOGIN,//登陆事件
    	EXIT,//退出事件
    	;
    }
    

    (2)定义事件基类BaseEvent

    package com.example.demo.eventdriven;
    
    /**
     * 
    * @ClassName: BaseEvent
    * @Description: 基础事件类
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:34:19
     */
    public class BaseEvent {
    	//是否在消息主线程同步执行
    	private boolean sync = true;
    	
    	//事件类型
    	private final EventType evtType;
    	
    	public BaseEvent (EventType evtType) {
    		this.evtType = evtType;
    	}
    	
    	public BaseEvent (EventType evtType,boolean sync) {
    		this.evtType = evtType;
    		this.sync = sync;
    	}
    
    	public EventType getEvtType() {
    		return evtType;
    	}
    	
    	/**
    	 * 是否在消息主线程同步执行
    	 * @return
    	 */
    	public boolean isSync() {
    		return sync;
    	}
    	
    	public void setSync (boolean sync) {
    		this.sync = sync;
    	}
    }
    

    (3)定义事件监听器接口EventListener

    package com.example.demo.eventdriven;
    
    /**
     * 
    * @ClassName: EventListener
    * @Description: 事件监听接口
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:35:21
     */
    public interface EventListener<E> {
    
    	/**
    	 * 事件触发后,处理具体逻辑
    	 * @param event
    	 */
    	public void handleEvent(E event);
    }
    

    (4)定义事件监听器管理器抽象类AbstractEventListenerManager

    package com.example.demo.eventdriven;
    
    import java.lang.reflect.Field;
    import java.lang.reflect.InvocationTargetException;
    
    
    /**
     * 
    * @ClassName: AbstractEventListenerManager
    * @Description: 事件监听器管理者基类,通过注解实现事件监听器的自动注册
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:05:43
    * 
    * 将要注册事件的监听器,在 AbstractEventListenerManager 子类属性中,通过注解 @EventAnnotation标记监听器所感兴趣的事件。
    * 详细参见EventListenerManager类
     */
    public abstract class AbstractEventListenerManager {
    	
    	/**
    	 * 初始化 注册监听器(启动程序时调用)
    	 */
    	@SuppressWarnings({ "unchecked" })
    	public void initEventListener () {
    		Field[] fields = getClass().getDeclaredFields();
    		for (Field f:fields) {
    			EventAnnotation evt = f.getAnnotation(EventAnnotation.class);
    			if (evt != null) {
    				EventType eventType = evt.eventType();
    				Class<?> listenClass = f.getType();
    				EventListener<? extends BaseEvent> newInstance;
    				try {
    					newInstance = (EventListener<? extends BaseEvent>)listenClass.getDeclaredConstructor().newInstance();
    					//注册事件
    					EventDispatcher.INSTANCE.registerEvent(eventType, newInstance);
    				} catch (InstantiationException e) {
    					e.printStackTrace();
    				} catch (IllegalAccessException e) {
    					e.printStackTrace();
    				} catch (IllegalArgumentException e) {
    					e.printStackTrace();
    				} catch (InvocationTargetException e) {
    					e.printStackTrace();
    				} catch (NoSuchMethodException e) {
    					e.printStackTrace();
    				} catch (SecurityException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    

    (5)定义一个事件监听器管理器实现EventListenerManager

    package com.example.demo.eventdriven;
    
    import org.springframework.stereotype.Component;
    
    /**
     * 
    * @ClassName: EventListenerManager
    * @Description: 事件监听器管理类
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:08:38
    * 
    * 不同的业务模块可以创建属于自己的EventListenerManager
     */
    @Component
    public class EventListenerManager extends AbstractEventListenerManager {
    
    	/**
    	 * 在构造器中调用父类的initEventListener,完成下方被注解修饰的所有事件监听器自动注册到EventDispatcher
    	 */
    	public EventListenerManager() {
    		super.initEventListener();
    	}
    	
    	/**
    	 * 通过@EventAnnotation定义该事件监听器感兴趣的事件类型
    	 */
    	@EventAnnotation(eventType=EventType.LOGIN)
    	public ExampleEventListener exampleEvent;
    	
    	//这里继续添加其他事件监听器
    	//@Evt(eventType=EventType.EXIT)
    	//public ExampleEventListener exampleEvent2;
    }
    

    这里注意:这里边的ExampleEventListener暂时还没定义,下面使用环节会定义这个类 

    (6)定义一个注解EventAnnotation,用于配合AbstractEventListenerManager实现事件监听器的自动注册

    package com.example.demo.eventdriven;
    
    import java.lang.annotation.Documented;
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    /**
     * 
    * @ClassName: EventAnnotation
    * @Description: 事件注解:指定感兴趣的事件类型
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:32:28
     */
    @Documented
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface EventAnnotation {
    
    	/**事件类型*/
    	public EventType eventType();
    }
    

    (7)再定义一个内部异常类(非必须品)EventDrivenException,用于定义处理内部出现的异常

    package com.example.demo.eventdriven;
    
    /**
     * 
    * @ClassName: EventDrivenException
    * @Description: 事件驱动内部处理异常
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:39:16
     */
    public class EventDrivenException  extends RuntimeException{
    	public EventDrivenException(String message) {
    		super(message);
    	}
    }
    

    (8)事件注册分发中心EventDispatcher

    package com.example.demo.eventdriven;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    
    /**
     * 
    * @ClassName: EventDispatcher
    * @Description: 事件分发器
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午2:57:35
     */
    public enum EventDispatcher {
    	
    	// 采用枚举实现单例模式
    	INSTANCE; 
    	
    	private ExecutorService executorService;
    	
    	// 事件类型与事件处理器列表的映射关系
    	private final Map<EventType,List<EventHandler<? extends BaseEvent>>> observers = new HashMap<>();
    	
    	// 异步执行的事件队列 
    	private LinkedBlockingQueue<BaseEvent> eventQueue = new LinkedBlockingQueue<>();
    	
    	EventDispatcher(){
    		executorService = Executors.newSingleThreadExecutor();
    		executorService.execute(new EventWorker());
    	}
    	
    	/**
    	 * 注册事件
    	 * @param evtType 事件类型
    	 * @param listener 具体监听器
    	 */
    	public void registerEvent(EventType evtType, EventHandler<? extends BaseEvent> listener) {
    		List<EventHandler<? extends BaseEvent>> listeners = observers.get(evtType);
    		if(listeners == null) {
    			listeners = new ArrayList<EventHandler<? extends BaseEvent>>();
    			observers.put(evtType, listeners);
    		}
    		listeners.add(listener);
    	}
    	
    	/**
    	 * 派发事件
    	 * @param event
    	 */
    	public void dispatchEvent(BaseEvent event) {
    		if(event == null) {
    			throw new NullPointerException("the event cannot be null");
    		}
    		
    		if (event.isSync()) {
    			//如果事件是同步的,那么就在消息主线程执行逻辑
    			handler(event);
    		} else {
    			//否则,就丢到事件线程异步执行
    			eventQueue.add(event);
    		}
    		
    	}
    	
    	/**
    	 * 同步处理器
    	 * @param event
    	 */
    	@SuppressWarnings({ "rawtypes", "unchecked" })
    	private void handler (BaseEvent event) {
    		EventType evtType = event.getEvtType();
    		List<EventHandler<? extends BaseEvent>> listeners = observers.get(evtType);
    		if(listeners != null) {
    			// 一个事件可能被多个事件处理器关注及待处理
    			for(EventHandler listener:listeners) {
    				try{
    					listener.handleEvent(event);
    				} catch(Exception e) {
    					//防止其中一个listener报异常而中断其他逻辑
    					e.printStackTrace();  
    				}
    			}
    		}else {
    			throw new EventDrivenException("can not find the event handler with the event type is "+event.getEvtType());
    		}
    	}
    	
    	/**
    	 * 
    	* @Description: 停止异步事件分发线程
    	* @Author: liulianglin
    	* @Datetime: 2022年5月19日 下午7:29:17
    	* @return void
    	* @throws
    	 */
    	public void stopSyncEventDispatchThread () {
    		eventQueue.add(new BaseEvent(EventType.EXIT, false));
    	}
    
    	/**
    	 * 
    	* @ClassName: EventWorker
    	* @Description: 异步执行线程
    	* @Author: liulianglin
    	* @DateTime 2022年4月21日 下午3:43:24
    	 */
    	private class EventWorker implements Runnable {
    
    		@Override
    		public void run() {
    			while (true) {
    				try {
    					BaseEvent event = eventQueue.take();
    					if (event.getEvtType() == EventType.EXIT) {
    						break;
    					}
    					handler(event);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		
    	}
    }
    

     

    ======================= 至此框架全部代码完毕 =============

    二、框架使用篇

    (1)声明一个业务事件

    package com.example.demo.eventdriven;
    
    
    /**
     * 
    * @ClassName: ExampleEvent
    * @Description: 示例事件(用于演示如何使用该事件驱动框架)
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午2:50:54
     */
    public class ExampleEvent extends BaseEvent {
    	private String userName;
    
    	public ExampleEvent(EventType evtType, String userName) {
    		super(evtType);
    		this.userName = userName;
    	}
    
    	public String getUserName() {
    		return userName;
    	}
    
    	public void setUserName(String userName) {
    		this.userName = userName;
    	}
    
    }
    

    (2)声明一个业务事件监听器

    package com.example.demo.eventdriven;
    
    /**
     * 
    * @ClassName: ExampleEventListener
    * @Description: 样例事件监听器(用于演示使用)
    * @Author: liulianglin
    * @DateTime 2022年4月21日 下午3:36:17
     */
    public class ExampleEventListener implements EventListener<ExampleEvent> {
    
    	@Override
    	public void handleEvent(ExampleEvent event) {
    		System.out.println("开始处理"+event.getEvtType()+"事件, 当前登陆用户名称="+event.getUserName());
    	}
    
    }
    

    这里注意,自定义的事件监听器需要声明到上面的EventListenerManager内部,如下图所示 :

    (3)编写测试类(基于Spring Boot环境)

    package com.example.demo.eventdriven;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.boot.test.context.SpringBootTest;
    
    @SpringBootTest
    public class EventDrivenTest {
    	
    	@Test
    	void test() {
    		BaseEvent event = new ExampleEvent(EventType.LOGIN, "超级管理员");
    		event.setSync(false);
    		EventDispatcher.INSTANCE.dispatchEvent(event);
    		
    		try {
    			System.out.println("模拟业务处理1秒钟....");
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		
    		System.out.println("处理完毕,关闭");
    		//EventDispatcher.INSTANCE.shutdown();
    	}
    }
    

    测试结果:

    完活儿。。。。 

    展开全文
  • 用于构建代码即数据程序的通用且与语言无关的基于插件的框架。 应该支持递归。
  • 事件驱动框架(二)——状态机

    千次阅读 2016-11-01 13:27:49
    事件驱动框架(二) 说明 本篇接上一篇事件驱动框架之后
  • 事件驱动框架
  • 基于C++11的事件驱动框架

    万次阅读 2016-04-06 21:05:20
    之前在signal-slot中提到事件循环,不过那个实在写的太挫。现在写了一个说的过去的。项目地址: https://github.com/ZhouBox/moos.git下面简单的说明下。 task完成实现对事件循环的一个任务的封装。 task_...
  • 在这里我们需要设计一个框架,实现套接字对象将自身注册到框架中,框架即可利用epoll对其套接字进行事件监测;当事件产生时通知相应的套接字对象。从而实现事件的监测与处理解耦。惯例还是献上类图。 完整源码见&...
  • Peridot - 一个事件驱动的测试框架事件驱动的BDD测试PHP框架
  • 这是一个Java事件总线框架,用于促进事件驱动的编程。 该框架的目的是提供一种生成和处理事件的简便方法。 鼓励仅将事件用作应用程序流程与第三方系统之间的通信方式。 这些文件可以通过其格式进行记录,操作,...
  • python事件驱动

    2020-12-02 14:20:36
    广告关闭提供包括云服务器,云数据库在内的50+款云计算产品。打造一站式的云产品试用服务,助力开发者和企业零门槛上云。... 当我们面对如下的环境时,事件驱动模型通常是一个好的选择: 程序中有许多任务...
  • Redis源码解析:事件驱动框架

    千次阅读 2022-02-02 21:02:26
    reactor:分配事件 acceptor:建立连接 handler:处理请求 单Reactor单线程 accept->read->处理业务逻辑->write 在一个线程 单Reactor多线程 accept,read,write复用一个线程 处理请求用一个工作线程池 ...
  • 实时性和可靠性已经通过示波器监视时序的测试(在系统负荷较高的情况下),这是可执行程序代码。
  • 原创,引入事件驱动机制的程序框架,可以移植到所有单片机上,这是51移植版本。通过压力测试。
  • 事件驱动框架(一)

    千次阅读 2016-11-01 10:49:40
    事件驱动框架中的状态机
  • 6 种事件驱动的架构模式

    千次阅读 2022-04-27 14:11:14
    2.端到端事件驱动 针对简单业务流程的状态更新 请求-应答模型在浏览器-服务器交互中特别常见。借助 Kafka 和[WebSocket]((),我们就有了一个完整的事件流驱动,包括浏览器-服务器交互。 这使得交互..
  • SOA和事件驱动的编程可以解决这一复杂的难题。SOA给出一个松散耦合的开发模型和运行时环境。它使服务提供者和服务消费者能够使用动态组件交互来构建交互模型,这些交互模型能够利用该开发模型灵活性和强大功能。事件...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 156,857
精华内容 62,742
关键字:

事件驱动框架

友情链接: test1.zip