2008-12-05 21:25:00 jacklam200 阅读数 2280

 ACE Reactor框架:

    只要做三件事:

        1.从ACE_Event_Handler派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为

        2.向ACE_Reactor类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来

        3.运行ACE_Reactor事件循环

一个接受连接的例子:

  1. #include <iostream>
  2. #include "ace/auto_ptr.h"
  3. #include "ace/log_msg.h"
  4. #include "ace/inet_addr.h"
  5. #include "ace/sock_acceptor.h"
  6. #include "ace/reactor.h"
  7. #include "ace/Message_Block.h"
  8. #include "ace/Message_Queue.h"
  9. #include "ace/SOCK_Stream.h"
  10. #include "ace/Null_Mutex.h"
  11. #include "ace/Null_Condition.h"
  12. using namespace std;
  13. //服务客户
  14. class ClientService:public ACE_Event_Handler
  15. {
  16. public:
  17.     ACE_SOCK_Stream &peer(void)
  18.     {
  19.         return this->sock_;
  20.     }
  21.     int open(void);
  22.     virtual ACE_HANDLE get_handle(voidconst
  23.     {
  24.         return this->sock_.get_handle();
  25.     }
  26.     virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  27.     virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  28.     virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
  29. protected:
  30.     ACE_SOCK_Stream sock_;
  31.     ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
  32. };
  33. int ClientService::open(void)
  34. {
  35.     ACE_TCHAR peer_name[512];
  36.     ACE_INET_Addr peer_addr;
  37.     if(this->sock_.get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)
  38.         cout<<" connection from "<<peer_name<<endl;
  39.     return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
  40. }
  41. int ClientService::handle_input(ACE_HANDLE)
  42. {
  43.     const size_t INPUT_SIZE=4096;
  44.     char buffer[INPUT_SIZE];
  45.     ssize_t recv_cnt,send_cnt;
  46.     if((recv_cnt=this->sock_.recv(buffer,sizeof(buffer)))<=0)
  47.     {
  48.         ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) connection closed/n")));
  49.         return -1;
  50.     }
  51.     send_cnt=this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
  52.     if(send_cnt==recv_cnt)
  53.         return 0;
  54.     if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)
  55.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t) %p/n"),ACE_TEXT("send")),0);
  56.     if(send_cnt==-1)
  57.         send_cnt=0;
  58.     ACE_Message_Block *mb;
  59.     size_t remaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));
  60.     ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
  61.     int output_off=this->output_queue_.is_empty();
  62.     ACE_Time_Value nowait(ACE_OS::gettimeofday());
  63.     if(this->output_queue_.enqueue_tail(mb,&nowait)==-1)
  64.     {   
  65.         ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;discarding data/n"),ACE_TEXT("enqueue failed ")));
  66.         mb->release();
  67.         return 0;
  68.     }
  69.     if(output_off)
  70.         return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
  71.     return 0;
  72. }
  73. int ClientService::handle_output(ACE_HANDLE)
  74. {
  75.     ACE_Message_Block *mb;
  76.     ACE_Time_Value nowait(ACE_OS::gettimeofday());
  77.     while(0==this->output_queue_.dequeue_head(mb,&nowait))
  78.     {
  79.         ssize_t send_cnt=this->sock_.send(mb->rd_ptr(),mb->length());
  80.         if(send_cnt==-1)
  81.             ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%p/n"),ACE_TEXT("send")));
  82.         else
  83.             mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
  84.         if(mb->length()>0)
  85.         {
  86.             this->output_queue_.enqueue_head(mb);
  87.             break;
  88.         }
  89.         mb->release();
  90.     }
  91.     return (this->output_queue_.is_empty())?-1:0;
  92. }
  93. int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
  94. {
  95.     if(mask==ACE_Event_Handler::WRITE_MASK)
  96.         return 0;
  97.     mask=ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;
  98.     this->reactor()->remove_handler(this,mask);
  99.     this->sock_.close();
  100.     this->output_queue_.flush();
  101.     delete this;
  102.     return 0;
  103. }
  104. //接受客户
  105. class ClientAccept:public ACE_Event_Handler
  106. {
  107. public:
  108.     virtual ~ClientAccept()
  109.     {
  110.             this->handle_close(ACE_INVALID_HANDLE,0);
  111.     }
  112.     int open(const ACE_INET_Addr &listen_addr);
  113.     virtual ACE_HANDLE get_handle(voidconst
  114.     {
  115.         return this->acceptor_.get_handle();
  116.     }
  117.     virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  118.     virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
  119. protected:
  120.     ACE_SOCK_Acceptor acceptor_;
  121. };
  122. int ClientAccept::open(const ACE_INET_Addr &listen_addr)
  123. {
  124.     if(this->acceptor_.open(listen_addr,1)==-1)
  125.     {
  126.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
  127.     }
  128.     return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
  129. }
  130. int ClientAccept::handle_input(ACE_HANDLE)
  131. {
  132.     ClientService *client;
  133.     ACE_NEW_RETURN(client,ClientService,-1);
  134.     auto_ptr<ClientService>p(client);
  135.     if(this->acceptor_.accept(client->peer())==-1)
  136.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%T)%p/N"),ACE_TEXT("Failed to accept ")ACE_TEXT("client connection")),-1);
  137.     p.release();
  138.     client->reactor(this->reactor());
  139.     if(client->open()==-1)
  140.         client->handle_close(ACE_INVALID_HANDLE,0);
  141.     return 0;
  142. }
  143. int ClientAccept::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
  144. {
  145.     if(this->acceptor_.get_handle()!=ACE_INVALID_HANDLE)
  146.     {
  147.         ACE_Reactor_Mask m=ACE_Event_Handler::ACCEPT_MASK|ACE_Event_Handler::DONT_CALL;
  148.         this->reactor()->remove_handler(this,m);
  149.     }
  150.     return 0;
  151. }
  152. int main(int argc,char *argv[])
  153. {
  154.     ACE_INET_Addr port_to_listen(50000,ACE_LOCALHOST);
  155.     ClientAccept acceptor;
  156.     acceptor.reactor(ACE_Reactor::instance());
  157.     if(acceptor.open(port_to_listen)==-1)
  158.         return 1;
  159.     ACE_Reactor::instance()->run_reactor_event_loop();
  160.     return 0;
  161. }

每个类要处理任何类型的Reactor事件的类,必须从ACE_Event_Handler派生,虽然可以用一个类控制接受和所有客户的连接,但还是创建“连接接受”和“连接服务”不同的类比较好!

1.这样可以更好的封装数据和行为,这个类接受来自客户的连接,而这是他所做的全部事情

2.代表客户的类将为客户连接提供服务

 

在针对一些I/O事件向反应器登记某个事件处理器时,反应器会把一个ACE_Event_Handler指针与一个句柄以及处理器感兴趣的I/O事件类型关联在一起!

 

当I/O事件触发时,会回调特定的句柄传给handle_input()方法的ACE_HANDLE参数

而在上面程序例子中,创建了一个clientservice实例,为每个连接使用单独的服务处理对象,所以每次接受新的连接都会得到一个新的CLIENTSERVICE实例

 

为了对要发送的数据进行排队,CLientService用一个ACE_Message_Queue,当要对稍后发送的数据进行排队时,分配一个ACE_Message_Block保存这些数据,并把它放入队列中,以备后用,如果我们无法把数据放入队列,我们就会放弃,抛弃那些数据。如果在我们尝试把余下的数据放入队列之前,输出队列是空的,我们就会再向反应器登记这个处理,这一次针对的是 WRITE事件

 

 

ACE_Message_Queue

通过在类声明是指定锁类型就可以很方便实现进程,线程安全的消息队列
ACE_Message_Queue<ACE_MT_SYNCH> message_queue_;如果程序是单线程的话,
可以ACE_Message_Queue<ACE_NULL_SYNCH> message_queue_。

 

 

ACE_Message_Block功能简介

ACE_Message_Block在Ace中用来表示消息的存放空间,可用做网络通信中的消息缓冲区,使用非常频繁,下面将在如下方简单的介绍一下ACE_Message_Block相关功能。

  1. 创建消息块
  2. 释放消息块
  3. 从消息块中读写数据
  4. 数据的拷贝
  5. 其它常用函数

1。创建消息块

创建消息块的方式比较灵活,常用的有以下几种方式 :

1。直接给消息块分配内存空间创建。

    ACE_Message_Block *mb = new ACE_Message_Block (30);

2。共享底层数据块创建。

    char buffer[100];
    ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

这种方式共享底层的数据块,被创建的消息块并不拷贝该数据,也不假定自己拥有它的所有权。在消息块mb被销毁时,相关联的数据缓冲区data将不会被销毁。这是有意义的:消息块没有拷贝数据,因此内存也不是它分配的,这样它也不应该负责销毁它。

3。通过duplicate()函数从已有的消息块中创建副本。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->duplicate();

这种方式下,mb2和mb共享同一数据空间,使用的是ACE_Message_Block的引用计数机制。它返回指向要被复制的消息块的指针,并在内部增加内部引用计数

4。通过clone()函数从已有的消息块中复制。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->clone();

clone()方法实际地创建整个消息块的新副本,包括它的数据块和附加部分;也就是说,这是一次"深拷贝"。

2。释放消息块

一旦使用完消息块,程序员可以调用它的release()方法来释放它。

  1. 如果消息数据内存是由该消息块分配的,调用release()方法就也会释放此内存。
  2. 如果消息块是引用计数的,release()就会减少计数,直到到达0为止;之后消息块和与它相关联的数据块才从内存中被移除。
  3. 如果消息块是通过共享已分配的底层数据块创建的,底层数据块不会被释放。

无论消息块是哪种方式创建的,只要在使用完后及时调用release()函数,就能确保相应的内存能正确的释放。

3。从消息块中读写数据

ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,rd_ptr()指向可读的数据块地址,wr_ptr()指向可写的数据块地址,默认情况下都执行数据块的首地址。下面的例子简单了演示它的使用方法。

#include "ace/Message_Queue.h"
#include "ace/OS.h"

int main(int argc, char *argv[])
{
    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");
    ACE_OS::printf("%s/n",mb->rd_ptr ());
    mb->release();
    return 0;
}

注意:这两个指针所指向的位置并不会自动移动,在上面的例子中,函数执行完毕后,执行的位置仍然是最开始的0,而不是最新的可写位置5,程序员需要通过wr_ptr(5)函数手动移动写指针的位置。

4。数据的拷贝

一般的数据的拷贝可以通过函数来实现数据的拷贝,copy()还会保证wr_ptr()的更新,使其指向缓冲区的新末尾处。

下面的例子演示了copy()函数的用法。

    mb->copy("hello");
    mb->copy("123",4);

注意:由于c++是以'/0'作为字符串结束标志的,对于上面的例子,底层数据块中保存的是"hello/0123/0",而用ACE_OS::printf("%s/n",mb->rd_ptr ());打印出来的结果是"hello",使用copy函数进行字符串连接的时候需要注意。

5。其它常用函数

  1. length()    返回当前的数据长度
  2. next()    获取和设置下一个ACE_Message_Block的链接。(用来建立消息队列非常有用)
  3. space()    获取剩余可用空间大小
  4. size()    获取和设置数据存储空间大小。

注:ACE_NEW_RETURN的意思用new动态生成一个参数2类型的空间,并将空间的首地址副给第一个参数。如果有错误产生则将第一个参数的值设为空,并返回值RET_VAL。

2014-11-22 11:59:32 yangyangye 阅读数 3142

最近接触力ACE,发现了很多好文章,在此转载一下,也当作一个笔记吧。


Reactor与 Proactor

基本概念

在高性能的I/O设计中,有两个比较著名的模式ReactorProactor模式,其中Reactor模式用于同步I/O,而Proactor运用于异步I/O操作。

       在比较这两个模式之前,我们首先的搞明白几个概念,

    • 什么是阻塞和非阻塞

阻塞和非阻塞是针对于进程在访问数据的时候,根据IO操作的就绪状态来采取的不同方式,说白了是一种读取或者写入操作函数的实现方式。

阻塞方式下读取或者写入函数将一直等待

非阻塞方式下,读取或者写入函数会立即返回一个状态值。

    • 什么是同步和异步

同步和异步是针对应用程序和内核的交互而言的。

同步指的是用户进程触发IO操作并等待或者轮询的去查看IO操作是否就绪。

异步是指用户进程触发IO操作以后便开始做自己的事情,而当IO操作已经完成的时候会得到IO完成的通知


一般来说I/O模型可以分为:同步阻塞,同步非阻塞,异步阻塞,异步非阻塞

让我们来看一下每种不同I/O模型的具体描述

    •    同步阻塞 IO

   在此种方式下,用户进程在发起一个IO操作以后,必须等待IO操作的完成,只有当真正完成了IO操作以后,用户进程才能运行。JAVA传统的IO模型属于此种方式!

    •    同步非阻塞IO:

在此种方式下,用户进程发起一个IO操作以后边可返回做其它事情,但是用户进程需要时不时的询问IO操作是否就绪,这就要求用户进程不停的去询问,从而引入不必要的CPU资源浪费。其中目前JAVANIO就属于同步非阻塞IO

    •    异步阻塞IO

此种方式下,应用发起一个IO操作以后,不等待内核IO操作的完成,等内核完成IO操作以后会通知应用程序,这其实就是同步和异步最关键的区别,同步必须等待或者主动的去询问IO是否完成,那么为什么说是阻塞的呢?因为此时是通过select系统调用来完成的,select函数本身的实现方式是阻塞的,而采用select函数有个好处就是它可以同时监听多个文件句柄,从而提高系统的并发性!

    •    异步非阻塞IO:

此种方式下,用户进程只需要发起一个IO操作然后立即返回,等IO操作真正的完成以后,应用程序会得到IO操作完成的通知,此时用户进程只需要对数据进行处理就好了,不需要进行实际的IO读写操作,因为真正的IO读取或者写入操作已经由内核完成了。目前Java中还没有支持此种IO模型。   

阻塞型I/O意味着控制权只有到调用操作结束后才会回到调用者手里.结果调用者被阻塞了,这段时间了做不了任何其它事情。 更郁闷的是,在等待IO结果的时间里,调用者所在线程此时无法腾出手来去响应其它的请求,这真是太浪费资源了。拿read() 操作来说吧,调用此函数的代码会一直僵在此处直至它所读的socket缓存中有数据到来。

相比之下,非阻塞同步是会立即返回控制权给调用者的。调用者不需要等等,它从调用的函数获取两种结果:要么此次调用成功进行了;要么系统返回错误标识告诉调用者当前资源不可用,你再等等或者再试一次看吧。比如read()操作,如果当前socket无数据可读,则立即返回EWOULBLOCK/EAGAIN,告诉调用read()"数据还没准备好,你稍后再试".

在非阻塞异步调用中,稍有不同。调用函数在立即返回时,还告诉调用者,这次请求已经开始了。系统会使用另外的资源或者线程来完成这次调用操作,并在完成的时候知会调用者(比如通过回调函数)。拿WindowsReadFile()或者POSIXaio_read()来说,调用它之后,函数立即返回,操作系统在后台同时开始读操作。

在以上三种IO形式中,非阻塞异步是性能最高、伸缩性最好的。搞清楚了以上概念以后,我们再回过头来看看,Reactor模式和Proactor模式。

此文详细的阐述了基于TCP高性能的GOLDEN数据服务器模块的设计以及解决方案 ,我们在文章的后面就不再提及阻塞式的方案了,因为阻塞式I/O实在是缺少可伸缩性,性能也达不到高性能服务器的要求。

两种IO多路复用方案:Reactor和 Proactor

一般情况下,I/O复用机制需要事件分离器(event demultiplexor ). 事件分离器的作用,就是将那些读写事件源分发给各读写事件的处理者,就像送快递的在楼下喊:谁的什么东西送了,快来拿吧。开发人员在开始的时候需要在事件分离器那里注册感兴趣的事件,并提供相应的事件处理器(event handlers),或者是回调函数;事件分离器在适当的时候会将请求的事件分发给这些handler或者回调函数。

涉及到事件分离器的两种模式称为:ReactorProactorReactor模式是基于同步I/O的,而Proactor模式是和异步I/O相关的

    Reactor模式中,事件分离者等待某个事件或者是应用或者是某个操作的状态发生(比如文件描述符可读写,或者是socket可读写),事件分离者就把这个事件传给事先注册的事件处理器或者事件处理函数或者回调函数,由后者来做实际的读写操作

    在Proactor模式中,事件处理器(或者由事件分离器代为)直接发起一个异步读写操作(相当于请求)而实际的工作是由操作系统来完成的。发起时,需要提供的参数包括用于存放读到数据的缓存区,读的数据大小,或者用于存放外发数据的缓存区,以及这个请求完后的回调函数等信息。事件分离器得知了这个请求,它默默等待这个请求的完成,然后转发完成事件给相应的事件处理器或者事件处理函数或者回调。举例来说,在Windows上事件处理器投递了一个异步IO操作(称有overlapped的技术),事件分离器等IOCompletion事件完成 ,这种异步模式的典型实现是基于操作系统底层异步API的,所以我们可称之为“系统级别”的或者“真正意义上”的异步,因为具体的读写是由操作系统代劳的。

举另外个例子来更好地理解ReactorProactor两种模式的区别。这里我们只关注read操作,因为write操作也是差不多的。下面是Reactor的做法:

  1. 某个事件处理器宣称它对某个socket上的读事件很感兴趣;

  2. 事件分离者等着这个事件的发生;

  3. 当事件发生了,事件分离器被唤醒,这负责通知先前那个事件处理器;

  4. 事件处理器收到消息,于是去那个socket上读数据了如果需要,它再次宣称对这个socket上的读事件感兴趣,一直重复上面的步骤;

下面再来看看真正意义的异步模式Proactor是如何做的:

  1. 事件处理器直接投递发一个读操作(当然,操作系统必须支持这个异步操作)。这个时候,事件处理器根本不关心读事件,它只管发这么个请求,它魂牵梦萦的是这个读操作的完成事件。这个事件处理器很拽,发个命令就不管具体的事情了,只等着别人系统)帮他搞定的时候给他回个话。

  2. 事件分离器等着这个读事件的完成(比较下与Reactor的不同);

  3. 当事件分离器默默等待完成事情到来的同时,操作系统已经在一边开始干活了,它从目标读取数据,放入用户提供的缓存区中,最后通知事件分离器,这个事情我搞完了;

  4. 事件分离器通知之前的事件处理器你吩咐的事情搞定了;

  5. 事件处理器这时会发现想要读的数据已经乖乖地放在他提供的缓存区中,想怎么处理都行了。如果有需要,事件处理器还像之前一样发起另外一个读操作,和上面的几个步骤一样。

现行做法

开源C++开发框架ACEAdaptive Communication Enviromen 提供了大量平台独立的底层并发支持类(线程、互斥量等).同时在更高一层它也提供了独立的几组C++类,用于实现ReactorProactor模式。 尽管它们都是平台独立的单元,但他们都提供了不同的接口.

ACE ProactorMS-Windows上无论是性能还在健壮性都更胜一筹,这主要是由于Windows提供了一系列高效的底层异步API

不幸的是,并不是所有操作系统都为底层异步提供健壮的支持。举例来说,许多Unix系统就有麻烦.ACE中的ProactorUnix上是使用Posix标准实现的异步操作,Posix中有一个AIOProactor使用AIO实现异步传输。但Linux2.6以前版本中不支持AIO,而在2.6版本以后,部分支持AIO。就因为这个部分支持,所以,Posix的子类不能正常工作。因此,ACEReactor可能是Unix系统上更合适的解决方案.正因为系统底层的支持力度不一,为了在各系统上有更好的性能,开发者不得不维护独立的好几份代码:Windows准备的ACE Proactor以及为Unix系列提供的ACE Reactor

就像我们提到过的,真正的异步模式需要操作系统级别的支持。由于事件处理器及操作系统交互的差异,为ReactorProactor设计一种通用统一的外部接口是非常困难的。这也是设计通行开发框架的难点所在。

ACE Proactor 框架

怎样发送和接收数据

ACEProactor框架包含了一组高度相关的类,其数量相对较多,我在进行以下描述的时候不可能按照顺序讨论它们,而又不进行提前引用。到最后我会描述完所有这些类。下面这些类给出了ACE Proactor框架的各个类以及它们之间的关系。可以把这个图1-1当作描述ACE Proactor框架实际应用的范本。注意:类名中以ACE_开始的类名称是ACE Procator框架中包含的类,而以golden_开始的类名称是实际应用范本提供的类。

下面的代码声明了一个类,它所完成的基本工作是处理接收和发送数据。



1.1 ACE Proactor框架中的类

#include "ace/Asynch_IO.h"

class golden_aio_handler : public ACE_Service_Handler

{

public :

golden_aio_handler (golden_aio_acceptor *acc = 0) ;

virtual void open ( ACE_HANDLE new_handle,

ACE_Message_Block &message_block ) ;

virtual void handle_read_stream(

const ACE_Asynch_Read_Stream::Result &result);

virtual void handle_write_stream(

const ACE_Asynch_Write_Stream::Result &result);

private:

ACE_Asynch_Read_Stream reader_;

ACE_Asynch_Write_Stream writer_;

} ;

这段代码首先包含了一些必需的头文件,以引入这个例子使用的ACE Proactor框架类:

  1. ACE_Service_Handler Proactor框架中创建事件处理器所用的目标类 。

  2. ACE_Handler ACE_Service_Handler的父类,定义了通过ACE_Proactor框架处理异步I/O完成事件所需要的接口

  3. ACE_Asynch_Read_Stream 用于在已经连接的TCP/IP socket上发起读操作的I/0工厂类。

  4. ACE_Asynch_Write_Stream 用于在已经连接的TCP/IP socket上发起写操作的I/0工厂类。

  5. Result 每个I/O工厂类都把Result定义为嵌在自己内部的类,用以保存该工厂发起的每个操作的结果。所有的Result类都从ACE_Asynch_Result派生,并且增加了专用于它们所针对的I/O类型的数据和方法。因为每个异步I/O操作的发起和完成都是分离的、不同的事情,需要有一种机制来“记住”操作的参数,并且连同结果一起吧这些参数转交给完成处理器。

设置事件处理器并发起I/O

TCP连接打开时,我们应该把新socket的句柄传给事件处理器对象,在这个例子中是golden_aio_handler。把句柄放在事件处理器里是有益的,原因如下:

    1. 它是socket的生命期一个方便的控制点,因为它是连接工厂的目标。

    2. I/O操作最有可能从这个类发起。

在使用ACE_Proactor框架的异步连接建立类时golden_aio_handler::open()挂钩方法会在新连接建立时被调用。下面是我们程序中的open()挂钩:

void

golden_aio_handler::open(ACE_HANDLE new_handle, ACE_Message_Block &)

{

this->handle(new_handle);

//打开异步读写

reader_.open (*this, new_handle, 0, proactor ());

writer_.open (*this, new_handle, 0, proactor ());

//准备读的缓冲区

ACE_NEW_NORETURN(mblk_, ACE_Message_Block (SIZEOF_HEADER_WITH_CRC));

if (reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) <0)

{

delete this ;

}

}

在一开始,我们使用继承而得到的ACE_Handler::handle()方法保存新socket的句柄。该方法把句柄存储在一个方便的地方,以便在析构函数~golden_aio_handler()访问或者用于其他用途。这是在这个类中实现的socket句柄生命期管理的一部分。

要发起I/O,必须初始化所需的I/O工厂对象。在存储了socket句柄之后,open()方法会初始化reader_writer_ I/O工厂对象,为发起I/O操作做准备。两个类的open()方法都是一样的:

int open (ACE_Handler &handler,

ACE_HANDLE handle = ACE_INVALID_HANDLE,

const void *completion_key = 0,

ACE_Proactor *proactor = 0);

第一个参数表示工厂对象所发起的操作的完成事件处理器里。当通过工厂对象发起的I/O操作完成时,ACE_Proactor框架会回调这个对象。这也是为什么该处理器对象叫做完成事件处理器的原因。在我们的程序中,golden_aio_handler对象是ACE_Handler的后代,即是读操作也是写操作的完成事件处理器,所以*this被用作处理器参数。handle是新传入的socket句柄,completion_key参数只适用于windows默认传入0即可,proactor参数会传入一个在进程范围的ACE_Procator单体对象。

程序中的open()挂钩方法所做的最后一件事情,是调用ACE_Asynch_Read_Stream::read()方法,从而在新的socket上发起一个读操作。ACE_Asynch_Read_Stream::read()函数如下:

int read (ACE_Message_Block &message_block,

size_t num_bytes_to_read,

const void *act = 0,

int priority = 0,

int signal_number = ACE_SIGRTMIN);

为传输指定一个ACE_Message_Block ,使得缓冲区管理变得更为容易,因为可以利用ACE_Message_Block的各种能力,以及它与ACE的其他部分的集成。在发起读操作时,数据会被读入开始于数据块的写指针处在的块中,因为要被读取的数据将被写入块中。


完成I/O操作

ACE_Proactor框架是基于事件的框架。I/O工厂登记“每个操作”与“该操作完成时应回调的完成事件处理器”之间建立关联。当读取完成时,ACE_Proactor框架会调用ACE_Handler::handle_read_stream()挂钩方法:

void golden_aio_handler::handle_read_stream(

const ACE_Asynch_Read_Stream::Result &result)

{ ACE_Asynch_Read_Stream::Result &result

if (!result.success () || result.bytes_transferred () == 0)

delete this;

else if (result.bytes_transferred () < result.bytes_to_read ())

{

if (reader_.read (*mblk_, result.bytes_to_read () - result.bytes_transferred ()) < 0)

delete this ;

}

else if (mblk_->length () == SIZEOF_HEADER_WITH_CRC)

handle_msg_header();

else

{

if (handle_msg_pack()<0)

delete this ;

}

}

传入的ACE_Asynch_Read_Stream::Result指向的是用于保存读取操作结果的对象。每个I/O工厂类都会定义自己的Result类,即用于保存每个操作发起时所用的参数,又用于保存操作的结果。

如果读操作读取了任何数据,处理接收read到的报文数据包用handle_msg_pack函数,然后发起一个写操作,把数据处理结果返回给对端。当写操作完成时,ACE_Proactor框架调用下面的handle_write_stream方法:

void golden_aio_handler::handle_write_stream(const ACE_Asynch_Write_Stream::Result &result)

{

if(reader_.read (*mblk_,SIZEOF_HEADER_WITH_CRC) < 0)

delete this;

}

不管写操作是否成功完成,在该操作中使用的消息块都会释放。如果socket出了问题,先前发起的读操作也会完成并出错,而handle_read_stream()会清理对象和socket句柄。


1-2给出了本程序事件序列。

、建立连接

ACE提供里两个工厂类,用于通过ACE_Proactor框架前摄式地建立TCP/IP连接:

ACE_Asynch_Acceptor , 用于发起被动的连接建立

ACE_Asynch_Connector , 用于发起主动的连接建立

1.2 ACE Proactor异步回调序列图

当使用其中一个类建立TCP/IP连接时,ACE_Proactor框架会创建一个从ACE_Service_Handler派生的事件服务处理器,比如golden_aio_handler,用以处理新连接。ACE_Service_Handler类是

ACE_Proactor框架中所有用异步方式连接的服务的基类,从ACE_Handler派生,所以服务类也可以处理在服务中发起的I/O操作的完成.

ACE_Asynch_Acceptor是一个相当容易使用的类,它的一个挂钩方法是一个protected虚方法:make_handler()Proactor框架调用这个方法获取一个ACE_Service_Handler对象,用以为新连接提供服务。下面的代码说明了这种情况:

golden_aio_handler * golden_aio_acceptor::make_handler (void)

{

///来一个连接,就新增一个句柄。在线程池中处理

golden_aio_handler *ih;

ACE_NEW_RETURN (ih, golden_aio_handler (this), 0);

if (clients_.insert (ih) == -1)

{

delete ih ;

return NULL ;

}

return ih;

}

return 0 ;

}



ACE_Proactor完成多路分离器

ACE_Proactor类负责驱动ACE_Proactor框架的完成处理,这个类等待完成事件的发生、把这些事件多路分离给相关联的完成事件处理器,并分派每个完成处理器上适当的挂钩方法。因此,要让异步I/O完成事件处理器得以发生----无论是I/O还是连接建立----Golden Server中都必须运行前摄器的时间循环。这通常很简单,只需要把下面的代码插入到应用中就可以了:

int golden_aio::svc()

{

ACE_Proactor::instance()->proactor_run_event_loop ();

return 1 ;

}

可以通过两种方式来使用ACE_Proactor,如上所示的程序代码instance(),作为单体来使用。也可以通过实例化一个或多个实例来使用。这个能力被用于在一个进程中支持多个前摄器。如下代码所示,这是应用于镜像发送和镜像接收的前摄器

int golden_mirror_sender::svc()

{

proactor_sender_->proactor_run_event_loop ();

return 1 ;

}

int golden_mirror_receiver::svc()

{

proactor_recviver_->proactor_run_event_loop ();

return 1 ;

}

各种操作系统上的异步I/O设施会有很大的不同,为了在所有这些系统上维持统一的接口和编程方法,ACE_Proactor类使用了Bridge模式来维持灵活性和可扩展性,同时还使得ACE_Proactor框架能够使用不同的异步I/O实现。

ACE_WIN32_Proactor类是Windows上的ACE_Proactor实现。使用了I/O完成端口进行完成事件检测。在初始化异步操作工厂时,I/O句柄与前摄器的I/O完成端口被关联在一起。在这种实现中,windows下的GetQueuedCompletionStatus()函数负责执行事件循环,如下程序代码

int golden_server::create_proactor()

{

ACE_Proactor::instance()->close_singleton();

impl_ = new ACE_WIN32_Proactor(0,1);

ACE_Proactor::instance(new ACE_Proactor(impl_,1),1) ;

return 0;

}

int golden_server::create_proactor_mirror_recviver()

{

ACE_NEW_RETURN(impl_mirror_recviver_, ACE_WIN32_Proactor(0,1),-1);

ACE_NEW_RETURN(mirror_recviver_proactor_, ACE_Proactor(impl_mirror_recviver_,1),-1);

return 0;

}

int golden_server::create_proactor_mirror_sender()

{

ACE_NEW_RETURN(impl_mirror_sender_, ACE_WIN32_Proactor(0,1),-1);

ACE_NEW_RETURN(mirror_sender_proactor_, ACE_Proactor(impl_mirror_sender_,1),-1);

return 0;

}


线程池

大多数网络服务器都被设计成能同时处理多个客户请求。使用反应式事件处理、多个进程和多个线程。在构建多线程服务器时,我们拥有多种选择,包括:为每个请求派生一个新线程、为每个连接/会话派生一个新线程、预先派生一池受管线程,也就是创建一个线程池。在Golden Server设计中我们采用了了线程池的方法。

线程池模型有两种变种,每种都有不同的性能特征:

  1. 半同步/半异步模型。在这种模型中,一个侦听会异步的接收请求,并在某个队列中缓冲它们。另外一组工作者线程负责同步地处理这些请求

  2. 领导者/跟随着模型。在这种模型总,有一个线程是领导者,其余线程是在线程池中的跟随者。当请求到达时,领导者会拾取它,并从跟随者中选取一个新的领导者,然后继续处理该请求。因此,在这种模型中,接收请求的线程池就是处理它的线程。

领导者/跟随者模型中,只用了一组线程等待新请求,并处理请求。一个线程被选作领导者,阻塞在“到来的请求源”上,当请求到达时,领导者线程首先获取请求,把某个跟随者提升为领导者,然后继续处理所收到的请求。新领导者在请求源上等待新的请求,与此同时旧领导者会处理刚刚收到的请求,一旦就领导者完成处理,它就会作为跟随者线程回到线程池的末尾。

领导者/跟随者模型的一个优点是性能得到了提高,因为不用进行线程间的上下文切换。但同时这种模型也是复杂的。在程序中单个ACE_Task封装了线程池中的所有线程。

class golden_aio :public ACE_Task<ACE_SYNCH>

{

public:

golden_aio(int number_of_connection)

/// ACE_TASK的虚拟方法。用来启动svc

virtual int open (void * = 0);

///初始化

virtual int init(u_short ,ACE_Proactor* );

///结束并关闭

virtual int fini();

/// Run by a daemon thread to handle deferred processing

virtual int svc (void);

};

int golden_aio::open (void * )

{

return activate (THR_NEW_LWP | THR_JOINABLE |THR_INHERIT_SCHED ,number_of_thread_);

}

int golden_aio::svc()

{

ACE_Proactor::instance()->proactor_run_event_loop ();

return 1 ;

}

每个线程启动时,首先会通过调用activate(),将任务转化为运行在一个或多个线程中的主动对象

主动对象执行任务的svc()挂钩方法。每个线程在执行proactor_run_event_loop ()调用GetQueuedCompletionStatus ()。 如果没有消息到达线程会阻塞在GetQueuedCompletionStatus ()函数上,直到有消息到来那么有一个线程便成了领导者线程。如果领导者线程能很快的处理完所有事情,领导者线程会再次进入等待状态。如果领导者线程不能马上处理完,则从跟随者线程中指定一个新的领导者线程,自己去处理事件,不再当领导者。

单体模型

Golden Servergolden_usergolden_authorizegolden_startupgolden_protgolden_system_infogolden_dir_visitorgolden_mirror_sendergolden_mirror_receiver采用单体实例模式。相关单体模式的概念,请自行参考相关手册和书籍。

如何添加一个接口应用

Goldensdk模块

  1. 打开goldensdk.h头文件,根据接口类型找到合适的位置,添加接口的声明、接口的描述注释。

  2. 打开goldensdk.cpp文件,添加接口的空实现。

  3. 打开goldensdk.def文件,添加接口名称。

  4. 按照步骤完成Goldenserver模块

  5. goldensdk.cpp文实现已经添加的空接口。

Goldenserver模块

  1. golden_message_protocol.h中添加自定义的报文数据结构定义和报文消息ID , 命名规则如下GOLDEN_PACK_XXXGOLDEN_PACK_XXX_RESULTMESSAGE_XXXMESSAGE_XXX_RESULT

  2. golden_message_protocol.cpp中添加用来计算报文数据结构大小的函数声明和整编\解编数据流的函数声明。命名规则如下:sizeof_GOLDEN_PACK_XXX()sizeof_GOLDEN_PACK_XXX_RESULT()int operator<<(ACE_OutputCDR &cdr , const GOLDEN_PACK_XXX &spack) int operator<<(ACE_OutputCDR &cdr , const GOLDEN_PACK_XXX_RESUL &spack);

  3. golden_message_protocol.cpp中添加用来计算报文数据结构大小的函数实现和整编\解编数据流的函数实现。

  4. golden_message_protocol.cpp中添加全部变量_gstring中的内容,用来描述报文内容。

  5. goldeserver.h中的golden_aio_handler类中添加处理报文的成员函数声明。命名规则如下:int msg_xxx() int msg_xxx_result() .

  6. goldeserver.cpp中添加处理报文的成员函数的空实现golden_aio_handler:: msg_xxx(), golden_aio_handler:: msg_xxx_result() .

  7. goldeserver.cpp的 int golden_aio_handler::handle_msg_pack()函数中添加报文处理项。

  8. 实现f步骤中定义的成员函数空实现。

参考文献

  1. C++网络编程卷一--- 运用ACE和模式消除复杂性

  2. C++网络编程卷二--- 基于ACE和框架的系统化复用

  3. ACE程序员指南--- 网络与系统编程的实用设计模式

  4. 设计模式:可复用面向对象软件的基础




2014-07-01 19:42:45 iw1210 阅读数 4483

用ACE的Reactor模式实现网络通讯的例子,不罗嗦,直接上代码。

服务器代码:

#include <ace/Reactor.h>
#include <ace/SOCK_Connector.h> 
#include <ace/SOCK_Acceptor.h> 
#include <ace/Auto_Ptr.h>


class ClientService : public ACE_Event_Handler
{
public:
    ACE_SOCK_Stream &peer (void) { return this->sock_; }

    int open (void)
    {
        //注册读就绪回调函数
        return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK);
    }

    virtual ACE_HANDLE get_handle (void) const { return this->sock_.get_handle (); }

    virtual int handle_input (ACE_HANDLE fd )
    {
        int rev = peer().recv(buf,sizeof(buf));
        if(rev<=0)
            return -1;
		buf[rev] = '\0';
		printf("recv: %s",buf);
		strcpy(buf,"hello,Client\n");
        peer().send(buf,strlen(buf)); //向客户端发送信息。
        return 0;
    }

    // 释放相应资源
virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask)
    {
        if (mask == ACE_Event_Handler::WRITE_MASK)
            return 0;
        mask = ACE_Event_Handler::ALL_EVENTS_MASK |
            ACE_Event_Handler::DONT_CALL;
        this->reactor ()->remove_handler (this, mask);
        this->sock_.close ();
        delete this;    //socket出错时,将自动删除该客户端,释放相应资源
        return 0;
    }

protected:
    char buf[100];
    ACE_SOCK_Stream sock_;
};


class ClientAcceptor : public ACE_Event_Handler
{
public:
    virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);}

    int open (const ACE_INET_Addr &listen_addr)
    {
        if (this->acceptor_.open (listen_addr, 1) == -1)
        {
            printf("open port fail\n");
            return -1;
        }
        //注册接受连接回调事件
        return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);
    }

    virtual ACE_HANDLE get_handle (void) const
    { return this->acceptor_.get_handle (); }

    virtual int handle_input (ACE_HANDLE fd )
    {
        ClientService *client = new ClientService();
        auto_ptr<ClientService> p (client);

        if (this->acceptor_.accept (client->peer ()) == -1)
        {
            printf("accept client fail\n");
            return -1;
        }
        p.release ();
        client->reactor (this->reactor ());
        if (client->open () == -1)
            client->handle_close (ACE_INVALID_HANDLE, 0);
        return 0;
    }
    
    virtual int handle_close (ACE_HANDLE handle,
        ACE_Reactor_Mask close_mask)
    {
        if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)
        {
            ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |
                ACE_Event_Handler::DONT_CALL;
            this->reactor ()->remove_handler (this, m);
            this->acceptor_.close ();
        }
        return 0;
    }

protected:
    ACE_SOCK_Acceptor acceptor_;
};

int main(int argc, char *argv[]) 
{
    ACE_INET_Addr addr(3000,"127.0.0.1");
    ClientAcceptor server;
    server.reactor(ACE_Reactor::instance());
    server.open(addr);

    while(true)
    {
        ACE_Reactor::instance()->handle_events(); 
    }

    return 0; 
} 


客户端代码:

#include "ace/Reactor.h"
#include "ace/SOCK_Connector.h"

#include <string>
#include <iostream>
using namespace std;

class MyClient:public ACE_Event_Handler 
{
public:
    bool open()
    {
        ACE_SOCK_Connector connector;
        ACE_INET_Addr addr(3000,"127.0.0.1");
        ACE_Time_Value timeout(5,0);
        if(connector.connect(peer,addr,&timeout) != 0)
        {
            cout<<endl<<"connect fail.";
            return false;
        }
        ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);
        cout<<endl<<"connected.";
		strcpy(buf, "hello,Server\n");
		peer.send(buf,strlen(buf));
        return true;
    }

    ACE_HANDLE get_handle(void) const
    {
        return peer.get_handle();
    }

    int handle_input (ACE_HANDLE fd)
    {
        int rev=0;
        ACE_Time_Value timeout(5,0);
        if((rev=peer.recv(buf,sizeof(buf),&timeout))>0)
        {
            buf[rev]='\0';
            cout<<endl<<"recv: "<<buf<<endl;
        }
        return 3;
    }

private:
    ACE_SOCK_Stream peer;
    char buf[100];
};

int main(int argc, char *argv[]) 
{
    MyClient client;
    client.open();

    while(true)
    {
        ACE_Reactor::instance()->handle_events(); 
    }

    return 0; 
}


分别编译运行(先运行服务端,后运行客户端)下面是执行结果。

服务器:

recv: hello,Server

客户端:

connected.
recv: hello,Client

 

参考:

http://www.cnblogs.com/TianFang/archive/2006/12/13/591332.html

http://www.cnblogs.com/TianFang/archive/2006/12/18/595938.html

2015-10-19 11:20:22 chinaclock 阅读数 5280

ACE_Reactor框架的任务:

  • 检测来自于各种各样事件源的事件的发生
  • 将事件多路分离到他们的预先注册的事件处理器上
  • 将事件分派给由处理器所定义的挂钩方法,以按照应用程序所定义的方式来处理事件

ACE_Reactor类的实现。
接口声明放在Reactor.h中,
接口实现则分别放在Reactor.cpp和Reactor.ini文件中,而Reactor.inl作为内联函数直接在Reactor.h文件incliude了。
其实内部实现,主要还是封装了ACE_Reactor_Impl类中。

ACE_Reactor::notify方法
起到的作用是,可以打断多路复用监测事件的阻塞,让多路复用线程解除阻塞后重新检查需要监测的句柄和处理器集合,然后再去阻塞等待事件。
这一方法,可以似的其他线程能够唤醒反应器的拥有者线程,而该线程的事件多路分离器函数被阻塞并等待IO事件的发生。
为了防止一个事件处理器对应的事件在通知被分派前就失效了,引起分发无效事件,增加了purge_pending_notifications()方法,用于移除任何与来自于该队列的事件处理器相关联的通知。

而在实际的ACE_Select_Reactor_Notify 类中,有如下指针说明:
/* 指向ACE_Select_Reactor的指针,如果该指针为空,那么Reactor管理器将不支持Notify事件
*/
ACE_Select_Reactor_Impl *select_reactor_;
又有一句这样的语句:

ACE_Reactor::run_reactor_event_loop方法
这个方法是用于处理事件的loop循环,除非里面的handle_events()调用返回-1。而handle_events是谁实现的呢,是本ACE_Reactor中封装的ACE_Reactor_Impl对象指针中的成员函数,而ACE_Reactor_Impl的具体对象由多种。handle_events的作用是直到超时时间到或者事件被触发才返回,其返回值是ACE_Event_Handlers派发(Dispatching)的总数。而altertable_handle_events和handle_events不同的地方在于,在eventloop会在系统将一个I/O完成例程或一个异步过程调用排队时返回。而这里所说的eventloop即run_reactor_event_loop的while(1)的loop循环。

再来看具体的实现:
在ACE_Dev_Poll_Reactor::handle_events

int ACE_Dev_Poll_Reactor::handle_events()
{
   return handle_events_i()
   {
       event_num=work_pending_i(){
            return epoll_wait();// or poll;
        }
        Dispatch(){
            dispatch_timer_handler(guard){
                timer_queue->expire_single
                }
            dispatch_io_event(guard){
                disp_mask=WIRTE/READ/EXCPET_MASK;
                callback=handle_output/input/exception;
                this->upcall(){
                    (event_handler->*callback)(handle)
                };//dispatch the detected event;
            }    


        }

   };

}

handle_events_i在其中调work_pending_i,这个函数中,若支持epoll则用epoll_wait去等待事件,否则用poll,返回值是检测到event数量。但是这里在事件分发处理时都是只处理第一个。
不管如何,这里可以看到其事件执行线程就是监测到事件的这个线程,那个线程调用ACE_Reactor::run_reactor_event_loop方法去等待并处理事件,那就是这个线程去完成事件处理。

ACE_Select_Reactor:使用select作为多路复用分离的函数,
此时配合ACE_Pipe类实现ACE_Select_Reactor的消息通知机制,提供OS核内部传输数据的、可移植的双方向IPC机制。

ACE_TP_Reactor:在ACE_Select_Reactor基础上仍使用select作为多路复用分离的函数,扩展使用了领导者跟随者模型,将ACE_Select_Reactor事件扩展成一个线程池。

此时配合ACE_Pipe类实现ACE_Select_Reactor的消息通知机制,提供OS核内部传输数据的、可移植的双方向IPC机制。

相关参考书目:
Pattern-Oriented Software Architecture Volume 1: A System of Patterns ebook PDF for free

2016-03-01 09:39:25 ClamReason 阅读数 0

ACE_Reactor执行逻辑

(1)ACE_Reactor反应器作为全局基础服务,提供了事件回调的统一管理。
(2)一般在处理类的构造函数里注册一个事件到反应器,或者由其他的模式显示调用反应器的注册函数来注册事件。
(3)一旦事件被注册到反应器,就意味着反应器拥有指向处理器的指针
(4)当事件发生时,反应器可以知道当前事件应该回调哪一个处理器
(5)当事件发生时,不同的事件会回调处理器的不同的handle_*方法
(6)执行handle_*的时候如果返回了-1,则会自动执行处理器的handle_close;处理器的remove_handler在被调用的时候也会执行handle_close
(7)handle_close在基类ACE_Event_Handler中的实现是什么也不做,仅仅是返回-1

ACE_Reacotr对处理器的释放

能够把堆对象交给ACE_Reactor,就说明对象已经经过了正常的创建和初始化。如果在创建和初始化阶段遇到了问题需要释放,那就是其他模式的职责,不归ACE_Reactor管。参考Acceptor-Connector模式对象的释放。
对于ACE_Reacotor反应器来说,处理器对象的释放都是通过执行处理器的handle_close方法来实现的(其他的模式可能会有自己的释放方式,这里不做讨论)
由于处理器自己的handle_close方法可能使用了父类的,也可能自己重写了。所以要分类讨论:
(1)如果服务处理器直接继承自ACE_Event_Handler,这个是不会释放处理器堆对象的,因为它的实现仅仅是返回-1就完事了
int
ACE_Event_Handler::handle_input (ACE_HANDLE)
{
  ACE_TRACE ("ACE_Event_Handler::handle_input");
  return -1;
}


(2)服务处理器继承自ACE_Svc_Handler(推荐),这个类自己重写了handle_close方法,主要动作就是执行自己的destroy方法,而这个方法的动作就是delete this
template <typename PEER_STREAM, typename SYNCH_TRAITS> int
ACE_Svc_Handler<PEER_STREAM, SYNCH_TRAITS>::handle_close (ACE_HANDLE,
                                                       ACE_Reactor_Mask)
{
  ACE_TRACE ("ACE_Svc_Handler<PEER_STREAM, SYNCH_TRAITS>::handle_close");

  if (this->reference_counting_policy ().value () ==
      ACE_Event_Handler::Reference_Counting_Policy::DISABLED)
    {
      this->destroy ();
    }

  return 0;
}


handle_timeout

ACE定时器

ACE_Reactor实现计时器

handle_signal

ACE_Reactor实现运行时接收键盘输入

handle_input