ace react
2011-06-25 16:48:00 fsmiy 阅读数 467

ACE为了能够在各种平台都能运行,针对windows、linux等定义了几种不同的Reactor,windows下的默认Reactor为WFMO_Reactor,其精度取决了Waitformultipleobjects的精度,单位为毫秒;Linux/Unix下默认的为select,可以精确到微妙。

由于受到对事件检测机制的影响,对于ACE_Select_Reactor来说其性能取决于select的性能,当select的句柄数量越来越大,其性能会越来越差;而ACE_WFMO_Reactor则由WaitForMultipleObjects决定,但是WaitForMultipleObjects最多只能检测64个句柄,如果应用程序要监听的句柄个数超过64个,则只能分组或者使用其他的检测机制来代替。

个人觉得,在windows下,对socket io有6中模型可根据选择,可以需要自行实现不同的reactor.

Linux/Unix也可以定义基于Poll或者epoll的reactor.其实现见以后章节。

ace react 相关内容

2015-11-10 15:00:15 hk52222 阅读数 303

整体的主要是服务端的,唯一比较大的区别来说应该是比example来的更好理解(个人感受)

下面上代码

//acceptor.h

#include "ace/Event_Handler.h"
#include "ace/INET_Addr.h"
#include "ace/Reactor.h"
#include "ace/SOCK_Stream.h"
#include "ace/SOCK_Acceptor.h"

class Handle_data;

class Acceptor: public ACE_Event_Handler
{
    public:
	    Acceptor( ACE_Reactor *r = ACE_Reactor::instance() ):ACE_Event_Handler(r){}
	    	
	    ACE_INT32 open( const ACE_UINT16 port );	
		ACE_INT32 handle_input( ACE_HANDLE = ACE_INVALID_HANDLE );
		ACE_INT32 handle_close( ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask = 0 );
		
		ACE_HANDLE get_handle() const
		{
			return acceptor_.get_handle();
		}
		ACE_SOCK_Acceptor &get_acceptor(){ return acceptor_; }
	private:
	     ~Acceptor(){};
    private:
        ACE_SOCK_Acceptor  acceptor_;	
};


acceptor.cpp

#include "ace/Log_Msg.h"
#include "acceptor.h"
#include "handle_data.h"

ACE_INT32 Acceptor::open( ACE_UINT16 port )
{
	
	ACE_INET_Addr  addr;
	addr.set( port, (ACE_UINT32)INADDR_ANY );
	if (acceptor_.open( addr ) == -1)
	{
		ACE_DEBUG( (LM_DEBUG, "accept open error\n") );
		return -1;
	}
	return reactor()->register_handler( this, ACE_Event_Handler::ACCEPT_MASK );	
}


ACE_INT32 Acceptor::handle_input( ACE_HANDLE handle )
{
	Handle_data  *handle_data = 0;
	ACE_NEW_RETURN( handle_data, Handle_data( reactor() ), -1 );
	//等待链接
	if (acceptor_.accept( handle_data->get_peer() ) == -1)
	{
		ACE_DEBUG( (LM_DEBUG, "accept handle input accept error!\n") );
		handle_data->handle_close();
		return -1;
	}
	//获取远程链接地址
	else if (handle_data->open() == -1)
	{
		ACE_DEBUG( (LM_DEBUG, "accept handle input open error!\n") );
		handle_data->handle_close();
		return -1;
	}
	else
	{
		ACE_DEBUG( (LM_DEBUG, "accept handle input ok!\n") );
		return 0;
	}	
}


ACE_INT32 Acceptor::handle_close( ACE_HANDLE handle, ACE_Reactor_Mask mask )
{
	acceptor_.close();
	delete this;
	ACE_DEBUG( (LM_DEBUG, "accept handle close ok!\n") );
	return 0;	
}


handle_data.h

#include "ace/Event_Handler.h"
#include "ace/SOCK_Stream.h"
#include "ace/INET_Addr.h"
#include "ace/Reactor.h"
#include "ace/Time_Value.h"

class Handle_data: public ACE_Event_Handler
{
	public:		
	    Handle_data( ACE_Reactor *r = ACE_Reactor::instance() ):ACE_Event_Handler(r)  {}
	    ACE_INT32 open( );
	    ACE_INT32 handle_input( ACE_HANDLE = ACE_INVALID_HANDLE );
	    ACE_INT32 handle_close( ACE_HANDLE = ACE_INVALID_HANDLE, ACE_Reactor_Mask mask = 0 );
	    ACE_INT32 handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);
	    ACE_HANDLE get_handle( ) const
		{
			return peer_.get_handle();
		}
		ACE_SOCK_Stream &get_peer()
		{
			return peer_;
		}
	private:	
		~Handle_data(){ACE_DEBUG( (LM_DEBUG, "handle data ~dctor .\n") );};
	private:	
		ACE_SOCK_Stream  peer_;		
};


handle_data.cpp

#include "ace/SOCK_Stream.h"
#include "ace/Reactor.h"
#include "ace/Event_Handler.h"
#include "ace/Log_Msg.h"
#include "ace/Timer_Queue.h"
#include "handle_data.h"
#include <iostream>

ACE_INT32 Handle_data::open( )
{	
	ACE_INT32 ret = 0;
	ACE_INET_Addr remote_addr;	
	get_peer().get_remote_addr( remote_addr );		
	ACE_DEBUG( (LM_DEBUG, "the remote addr is %s\n", remote_addr.get_host_addr())  );
	//注册句柄
	//ACE_Event_Handler::ACCEPT_MASK
	ret = reactor()->register_handler( this, ACE_Event_Handler::READ_MASK| ACE_Event_Handler::WRITE_MASK);
	if (ret != -1)	
	{		
		ACE_DEBUG( (LM_DEBUG, "handle data register ok!\n")  );		
	}	
	return ret;
}





ACE_INT32 Handle_data::handle_close( ACE_HANDLE , ACE_Reactor_Mask )
{	
	get_peer().close();
	ACE_DEBUG( (LM_DEBUG, "handle data close.\n") );
	delete this;
	return 0;
}
//数据接收
ACE_INT32 Handle_data::handle_input( ACE_HANDLE )
{	
	ACE_INT8 buf[512] = {0};
	ACE_INT32 len;
	  
	len = get_peer().recv( buf, 500);
	if (len > 0)
	{
		ACE_DEBUG( (LM_DEBUG, "recv data %s, len:%d.\n", buf, len) );
		
	    return 0;
	}
	else if (len == 0)
	{
		ACE_DEBUG( (LM_DEBUG, "recv data len is 0, client exit.\n") );
		return -1;
	}
	else
	{
		ACE_DEBUG( (LM_DEBUG, "recv data error len < 0" ) );
		return -1;
	}	
	
}
//数据发送
ACE_INT32 Handle_data::handle_output( ACE_HANDLE fd /*= ACE_INVALID_HANDLE*/ )
{
	get_peer().send_n("hello Client\n",12);
	return 0;
}

main.cpp

#include "ace/Log_Msg.h"
#include "acceptor.h"
#include "handle_data.h"

ACE_INT32 main( ACE_INT32 argc, char **argv )
{
	ACE_UINT16  port = 0;
	/*if (argc < 2) 
	return -1;*/
	//port = ACE_OS::atoi(argv[1]);
	port = 60000;
	Acceptor    *accept;
	
	ACE_NEW_RETURN( accept, Acceptor( ACE_Reactor::instance () ),-1 );
	if (accept->open( port ) == -1)
	{
		accept->handle_close();
		ACE_DEBUG( (LM_DEBUG, "main open error!\n") );
		return -1;
	}	
	if (ACE_Reactor::run_event_loop() ==-1)
	{
		accept->handle_close();
		ACE_DEBUG( (LM_DEBUG, "main run event loop error!\n") );
		return -1;
	}
	accept->handle_close();
	return 0;
}




ace react 相关内容

2013-04-25 17:10:20 zqf_office 阅读数 508

ace react 相关内容

2008-05-06 15:15:00 superyao2008 阅读数 1646

reactor 模式是ACE当中比较简单的一个,不初学的还是有个地方容易搞错以致程序无法运行,这是我写过跳过的一个简单的用reactor模式做客户端的练习。高手看到不足之处还请赐教。

HandlerClient.h

 

#ifndef HANDLER_CLIENT_H
#define HANDLER_CLIENT_H
#include 
<string>
#include 
"ace/Reactor.h"
#include 
"ace/SOCK_Connector.h"
#define REV_BUFSIZE 1024

class HandlerClient: public ACE_Event_Handler{
public:
    
int open();

    ACE_HANDLE get_handle() 
const;

    
int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);

    
int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);

    
int handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask);

public:
    std::
string _addr;
    unsigned 
short _port;

private:
    ACE_SOCK_Stream peer;
    
char revbuf[REV_BUFSIZE];

};
#endif

 

 

HandlerClient.cpp

 

#include "HandlerClient.h"

int HandlerClient::open(){
    ACE_SOCK_Connector connector;
    ACE_INET_Addr addr(_port,_addr.c_str());
    ACE_Time_Value timeout(
5,0);
    
if(connector.connect(peer,addr,&timeout) != 0)
        {
            ACE_ERROR_RETURN( (LM_ERROR,ACE_LIB_TEXT(
"error connect server %s:%d "),_addr.c_str(),_port),-1 );
        }
    
else{
        ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"connect server %s:%d done "),_addr.c_str(),_port) );
    }
    
return reactor()->register_handler(this,ACE_Event_Handler::READ_MASK|ACE_Event_Handler::WRITE_MASK);
}

ACE_HANDLE HandlerClient::get_handle() 
const{
    
return peer.get_handle();
}

int HandlerClient::handle_output(ACE_HANDLE fd ){
    size_t r
=peer.send("hello from client.",64);
    ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"send %d bytes to server "),r) );
    
return 0;
}

int HandlerClient::handle_input(ACE_HANDLE fd ){
    size_t r
=peer.recv(revbuf,REV_BUFSIZE);
    ACE_DEBUG( (LM_DEBUG,ACE_LIB_TEXT(
"receive from server: %s "),revbuf) );
    
return 0;

}

int HandlerClient::handle_close(ACE_HANDLE handle, ACE_Reactor_Mask close_mask){
    
if (peer.get_handle()!=INVALID_HANDLE_VALUE)
    {
        reactor()
->remove_handler(this,ACE_Event_Handler::ALL_EVENTS_MASK);
        peer.close();
    }
    
return 0;
}

 

 

main.cpp

 

#include "HandlerClient.h"

int ACE_TMAIN(int arg_num, ACE_TCHAR *[]){
    HandlerClient c;
    c._addr
="localhost";
    c._port
=7144;
    c.reactor(ACE_Reactor::instance());
    c.open();
    ACE_Reactor::instance()
->run_reactor_event_loop();
    
return 0;
}

 

 

ace react 相关内容

2012-02-16 23:59:02 weiqubo 阅读数 1628

1ACE反应器框架简介

反应器(Reactor):用于事件多路分离和分派的体系结构模式

通常的,对一个文件描述符指定的文件或设备, 有两种工作方式: 阻塞非阻塞。所谓阻塞方式的意思是指, 当试图对该文件描述符进行读写时, 如果当时没有东西可读,或者暂时不可写, 程序就进入等待状态, 直到有东西可读或者可写为止。而对于非阻塞状态, 如果没有东西可读, 或者不可写, 读写函数马上返回, 而不会等待

在前面的章节中提到的Tcp通信的例子中,就是采用的阻塞式的工作方式:当接收tcp数据时,如果远端没有数据可以读,则会一直阻塞到读到需要的数据为止。这种方式的传输和传统的被动方法的调用类似,非常直观,并且简单有效,但是同样也存在一个效率问题,如果你是开发一个面对着数千个连接的服务器程序,对每一个客户端都采用阻塞的方式通信,如果存在某个非常耗时的读写操作时,其它的客户端通信将无法响应,效率非常低下。

一种常用做法是:每建立一个Socket连接时,同时创建一个新线程对该Socket进行单独通信(采用阻塞的方式通信)。这种方式具有很高的响应速度,并且控制起来也很简单,在连接数较少的时候非常有效,但是如果对每一个连接都产生一个线程的无疑是对系统资源的一种浪费,如果连接数较多将会出现资源不足的情况。

另一种较高效的做法是:服务器端保存一个Socket连接列表,然后对这个列表进行轮询,如果发现某个Socket端口上有数据可读时(读就绪),则调用该socket连接的相应读操作;如果发现某个Socket端口上有数据可写时(写就绪),则调用该socket连接的相应写操作;如果某个端口的Socket连接已经中断,则调用相应的析构方法关闭该端口。这样能充分利用服务器资源,效率得到了很大提高。

在Socket编程中就可以通过select等相关API实现这一方式。但直接用这些API控制起来比较麻烦,并且也难以控制和移植,在ACE中可以通过Reactor模式简化这一开发过程。

反应器本质上提供一组更高级的编程抽象,简化了事件驱动的分布式应用的设计和实现。除此而外,反应器还将若干不同种类的事件的多路分离集成到易于使用的API中。特别地,反应器对基于定时器的事件、信号事件、基于I/O端口监控的事件和用户定义的通知进行统一地处理。

ACE中的反应器与若干内部和外部组件协同工作。其基本概念是反应器框架检测事件的发生(通过在OS事件多路分离接口上进行侦听),并发出对预登记事件处理器(event handler)对象中的方法的"回调"(callback)。该方法由应用开发者实现,其中含有应用处理此事件的特定代码。

使用ACE的反应器,只需如下几步:

  1. 创建事件处理器,以处理他所感兴趣的某事件。
  2. 在反应器上登记,通知说他有兴趣处理某事件,同时传递他想要用以处理此事件的事件处理器的指针给反应器。

随后反应器框架将自动地:

  1. 在内部维护一些表,将不同的事件类型与事件处理器对象关联起来。
  2. 在用户已登记的某个事件发生时,反应器发出对处理器中相应方法的回调。

反应器模式在ACE中被实现为ACE_Reactor类,它提供反应器框架的功能接口。

如上面所提到的,反应器将事件处理器对象作为服务提供者使用。反应器内部记录某个事件处理器特定事件的相关回调方法。当这些事件发生时,反应器会创建这种事件和相应的事件处理器的关联。

  1. 事件处理器
    事件处理器就是需要通过轮询发生事件改变的对象列表中的对象,如在上面的例子中就是连接的客户端,每个客户端都可以看成一个事件处理器。
  2. 回调事件
    就是反应器支持的事件,如Socket读就绪,写就绪。拿上面的例子来说,如果某个客户端(事件处理器)在反应器中注册了读就绪事件,当客户端给服务器发送一条消息的时候,就会触发这个客户端的数据可读的回调函数。

在反应器框架中,所有应用特有的事件处理器都必须由ACE_Event_Handler的抽象接口类派生。可以通过重载相应的"handle_"方法实现相关的回调方法。

使用ACE_Reactor基本上有三个步骤:

  1. 创建ACE_Event_Handler的子类,并在其中实现适当的"handle_"方法,以处理你想要此事件处理器为之服务的事件类型。
  2. 通过调用反应器对象的register_handler(),将你的事件处理器登记到反应器。
  3. 在事件发生时,反应器将自动回调相应的事件处理器对象的适当的handle_"方法

下面我就以一个Socket客户端的例子为例简单的说明反应器的基本用法。

Cpp代码  收藏代码
  1. #include <ace/OS.h>  
  2. #include <ace/Reactor.h>  
  3. #include <ace/SOCK_Connector.h>   
  4.   
  5. #include <string>  
  6. #include <iostream>  
  7. using namespace std;  
  8.   
  9. class MyClient:public ACE_Event_Handler   
  10. {  
  11. public:  
  12.     bool open()  
  13.     {  
  14.         ACE_SOCK_Connector connector;  
  15.         ACE_INET_Addr addr(3000,"127.0.0.1");  
  16.         ACE_Time_Value timeout(5,0);  
  17.         if(connector.connect(peer,addr,&timeout) != 0)  
  18.         {  
  19.             cout<<endl<<"connecetd fail";  
  20.             return false;  
  21.         }  
  22.         ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK);  
  23.         cout<<endl<<"connecetd ";  
  24.         return true;  
  25.     }  
  26.   
  27.     ACE_HANDLE get_handle(voidconst  
  28.     {  
  29.         return peer.get_handle();  
  30.     }  
  31.   
  32.     int handle_input (ACE_HANDLE fd)  
  33.     {  
  34.         int rev=0;  
  35.         ACE_Time_Value timeout(5,0);  
  36.         if((rev=peer.recv(buffer,1000,&timeout))>0)  
  37.         {  
  38.             buffer[rev]='\0';  
  39.             cout<<endl<<"rev:\t"<<buffer<<endl;  
  40.         }  
  41.         return 3;  
  42.     }  
  43.   
  44. private:  
  45.     ACE_SOCK_Stream peer;  
  46.     char buffer[1024];  
  47. };  
  48.   
  49. int main(int argc, char *argv[])   
  50. {  
  51.     MyClient client;  
  52.     client.open();  
  53.   
  54.     while(true)  
  55.     {  
  56.         ACE_Reactor::instance()->handle_events();   
  57.     }  
  58.   
  59.     return 0;   
  60. }  
 

在这个例子中,客户端连接上服务器后,通过ACE_Reactor::instance()->register_handler(this,ACE_Event_Handler::READ_MASK)注册了一个读就绪的回调函数,当服务器端给客户端发消息的时候,会自动触发handle_input()函数,将接收到的信息打印出来。

这个例子只是为了演示反应器的基本用法,并不完善,我将在下一节中对如何在Socket通信中使用反应器做进一步的介绍。


在Socket编程中,常见的事件就是"读就绪","写就绪",通过对这两个事件的捕获分发,可以实现Socket中的异步操作。

Socket编程中的事件处理器

在前面我们已经介绍过,在ACE反应器框架中,任何都必须派生自ACE_Event_Handler类,并通过重载其相应会调事件处理函数来实现相应的回调处理的。在Socket编程中,我们通常需要重载的函数有

  1. handle_input()
    当I/O句柄(比如UNIX中的文件描述符)上的输入可用时,反应器自动回调该方法。
  2. handle_output()
    当I/O设备的输出队列有可用空间时,反应器自动回调该方法。
  3. handle_close()
    当事件处理器中的事件从Reactor中移除的时候调用。

此外,为了使Reactor能通过I/O句柄找到对应的事件处理器,还必须重载其get_handle()方法以使得Reactor建立起I/O句柄和事件处理器的关联。

使用Reactor框架。

下面我们将以一个客户端的程序为例,介绍如何在Socket编程中使用Reactor框架。

一.建立一个客户端对象(事件处理器)。

客户端对象就是一个事件处理器,其声明如下:

Cpp代码  收藏代码
  1. class Client:public ACE_Event_Handler  
  2. {  
  3. public:  
  4.     ACE_HANDLE get_handle(voidconst;  
  5.     int handle_input (ACE_HANDLE fd);  
  6.     int handle_close (ACE_HANDLE handle,  
  7. ACE_Reactor_Mask close_mask);  
  8.     ACE_SOCK_Stream& Peer();  
  9. private:  
  10.     ACE_SOCK_Stream peer;  
  11. };  
 

Client端中我只关心"读就绪"事件,故只重载了handle_input函数(大多数应用下只需要重载handle_input函数)。另外,在客户端还保存了一个ACE_SOCK_Streampeer对象用来进行Socket通信,同时封装了一个Peer()函数返回它的引用。

二.重载相应回调处理函数

Cpp代码  收藏代码
  1. ACE_SOCK_Stream& Client::Peer()  
  2. {  
  3.     return peer;  
  4. }  
  5.   
  6. ACE_HANDLE Client::get_handle(voidconst  
  7. {  
  8.     return peer.get_handle();  
  9. }  
  10.   
  11. int Client::handle_input (ACE_HANDLE fd)  
  12. {  
  13.     int rev=0;  
  14.     if((rev = peer.recv(buffer,1000))>0)  
  15.     {  
  16.         buffer[rev]='\0';  
  17.         cout<<endl<<"rev:\t"<<buffer<<endl;  
  18.         return 0;  
  19.     }  
  20.     else    //Socket连接发生错误,返回-1,在Reactor中注销事件,触发handle_close函数  
  21.     {  
  22.         return -1;  
  23.     }  
  24. }  
  25.   
  26. int Client::handle_close (ACE_HANDLE handle,  
  27.                          ACE_Reactor_Mask close_mask)  
  28. {  
  29.     cout<<endl<<"connecetd closed";  
  30.     return ACE_Event_Handler::handle_close(handle,close_mask);  
  31. }  
 

几个函数的功能都非常简单,这里就不多做介绍了。

三.在Reactor中注册事件

首先让我们来看看相应的main函数的代码:

Cpp代码  收藏代码
  1. int main(int argc, char *argv[])   
  2. {  
  3.     Client client;  
  4.     ACE_SOCK_Connector connector;  
  5.     ACE_INET_Addr addr(3000,"127.0.0.1");  
  6.     ACE_Time_Value timeout(5,0);  
  7.     if(connector.connect(client.Peer(),addr,&timeout) != 0)  
  8.     {  
  9.         cout<<endl<<"connecetd fail";  
  10.         return 0;  
  11.     }  
  12.   
  13.     ACE_Reactor::instance()->register_handler(&client,ACE_Event_Handler::READ_MASK);  
  14.   
  15. while(true)  
  16. {  
  17. ACE_Reactor::instance()->handle_events();   
  18. }  
  19.   
  20. return 0;   
  21. }  
 

在这里可以看到,使用Reactor框架后,依然首先通过ACE_SOCK_Connector的connect函数来建立连接。建立连接后,可以通过ACE_Reactor::instance()->register_handler函数来实现Reactor的注册,实现I/O事件和Client对象的handle_input方法相关联,它的第一个参数是事件处理器的地址,第二个参数是事件类型,由于这里只关心读就绪事件,故注册的事件类型是ACE_Event_Handler::READ_MASK

四.启动Reactor事件循环

通过如上设置后,我们就可以通过ACE_Reactor::instance()->handle_events()启动Reactor循环了,这样,每当服务器端有数据发送给客户端时,当客户端的数据就绪时,就回触发Client对象的handle_input函数,将接收的数据打印出来。

通常的做法是,将Reactor事件循环作为一个单独的线程来处理,这样就不会阻塞main函数。

五.注销Reactor事件

Reactor事件的注销一般有两种方式,显式和隐式,下面将分别给予介绍。

  1. 隐式注销。
    当Reactor捕获事件后,会触发相应的"handle_"处理函数,当"handle_"处理函数返回值大于或等于0时,表示处理事件成功,当返回值小于0时,表示处理事件失败,这时Reactor会自动注销该句柄所注册的所有事件,并触发handle_close函数,以执行相应的资源清理工作。
    在本例中,当handle_input函数里的recv函数接收到0个数时,说明socket发生错误(大多为Socket连接中断),此时返回-1,则系统自动注销相应事件。
  2. 显示注销。
    调用Reactor对象的remove_handler方法移除,它有两个参数,第一个是所注册的事件反应器对象,第二个是所要注销的事件。

在这个示例程序里,连接方只有一个Socket连接,Reactor的优势并没有体现出来,但在一些网络管理系统里,连接方需要对多个需要管理的设备(服务器端)进行连接,在这种情况下使用Reactor模式,只需要多开一个Reactor事件循环线程就能实现事件多路分发复用,并且不会阻塞,通过面向对象的回调方式管理,使用起来非常方便。

Reactor框架的另外一个常用的地方就是服务器端,一般是一个服务器端对应多个客户端,这样用Reactor模式能大幅提高并发能力,这方面的编程方法将在下一章给与介绍。


在服务器端使用Reactor框架

使用Reactor框架的服务器端结构如下:

服务器端注册两种事件处理器,ClientAcceptor和ClientService ,ClientService类负责和客户端的通信,每一个ClientService对象对应一个客户端的Socket连接。 ClientAcceptor专门负责被动接受客户端的连接,并创建ClientService对象。这样,在一个N个Socket连接的服务器程序中,将存在1个ClientAcceptor对象和N个ClientService对象。

整个服务器端流程如下:

  1. 首先创建一个ClientAcceptor对象,该对象在Reactor上注册ACCEPT_MASK事件,Reactor将自动在监听端口建立Socket监听。
  2. 如果有对该端口的Socket连接时,Reactor将自动回调handle_input方法,ClientAcceptor重载此方法,并创建一个ClientService对象,用于处理和Client的通信。
  3. ClientService对象根据服务器的具体功能实现,其处理过程和客户端程序类似,注册相应的回调事件并分发即可。

代码如下:

Cpp代码  收藏代码
  1. #include <ace/OS.h>  
  2. #include <ace/Reactor.h>  
  3. #include <ace/SOCK_Connector.h>   
  4. #include <ace/SOCK_Acceptor.h>   
  5. #include <ace/Auto_Ptr.h>  
  6.   
  7. class ClientService : public ACE_Event_Handler  
  8. {  
  9. public:  
  10.     ACE_SOCK_Stream &peer (void) { return this->sock_; }  
  11.   
  12.     int open (void)  
  13.     {  
  14.         //注册读就绪回调函数  
  15.         return this->reactor ()->register_handler(this, ACE_Event_Handler::READ_MASK);  
  16.     }  
  17.   
  18.     virtual ACE_HANDLE get_handle (voidconst { return this->sock_.get_handle (); }  
  19.   
  20.     virtual int handle_input (ACE_HANDLE fd )  
  21.     {  
  22.         //一个简单的EchoServer,将客户端的信息返回  
  23.         int rev = peer().recv(buf,100);  
  24.         if(rev<=0)  
  25.             return -1;  
  26.   
  27.         peer().send(buf,rev);  
  28.         return 0;  
  29.     }  
  30.   
  31.     // 释放相应资源  
  32. virtual int handle_close (ACE_HANDLE, ACE_Reactor_Mask mask)  
  33.     {  
  34.         if (mask == ACE_Event_Handler::WRITE_MASK)  
  35.             return 0;  
  36.         mask = ACE_Event_Handler::ALL_EVENTS_MASK |  
  37.             ACE_Event_Handler::DONT_CALL;  
  38.         this->reactor ()->remove_handler (this, mask);  
  39.         this->sock_.close ();  
  40.         delete this;    //socket出错时,将自动删除该客户端,释放相应资源  
  41.         return 0;  
  42.     }  
  43.   
  44. protected:  
  45.     char buf[100];  
  46.     ACE_SOCK_Stream sock_;  
  47. };  
  48.   
  49. class ClientAcceptor : public ACE_Event_Handler  
  50. {  
  51. public:  
  52.     virtual ~ClientAcceptor (){this->handle_close (ACE_INVALID_HANDLE, 0);}  
  53.   
  54.     int open (const ACE_INET_Addr &listen_addr)  
  55.     {  
  56.         if (this->acceptor_.open (listen_addr, 1) == -1)  
  57.         {  
  58.             ACE_OS::printf("open port fail");  
  59.             return -1;  
  60.         }  
  61.         //注册接受连接回调事件  
  62.         return this->reactor ()->register_handler(this, ACE_Event_Handler::ACCEPT_MASK);  
  63.     }  
  64.   
  65.     virtual ACE_HANDLE get_handle (voidconst  
  66.     { return this->acceptor_.get_handle (); }  
  67.   
  68.     virtual int handle_input (ACE_HANDLE fd )  
  69.     {  
  70.         ClientService *client = new ClientService();  
  71.         auto_ptr<ClientService> p (client);  
  72.   
  73.         if (this->acceptor_.accept (client->peer ()) == -1)  
  74.         {  
  75.             ACE_OS::printf("accept client fail");  
  76.             return -1;  
  77.         }  
  78.         p.release ();  
  79.         client->reactor (this->reactor ());  
  80.         if (client->open () == -1)  
  81.             client->handle_close (ACE_INVALID_HANDLE, 0);  
  82.         return 0;  
  83.     }  
  84.       
  85.     virtual int handle_close (ACE_HANDLE handle,  
  86.         ACE_Reactor_Mask close_mask)  
  87.     {  
  88.         if (this->acceptor_.get_handle () != ACE_INVALID_HANDLE)  
  89.         {  
  90.             ACE_Reactor_Mask m = ACE_Event_Handler::ACCEPT_MASK |  
  91.                 ACE_Event_Handler::DONT_CALL;  
  92.             this->reactor ()->remove_handler (this, m);  
  93.             this->acceptor_.close ();  
  94.         }  
  95.         return 0;  
  96.     }  
  97.   
  98. protected:  
  99.     ACE_SOCK_Acceptor acceptor_;  
  100. };  
  101.   
  102. int main(int argc, char *argv[])   
  103. {  
  104.     ACE_INET_Addr addr(3000,"192.168.1.142");  
  105.     ClientAcceptor server;  
  106.     server.reactor(ACE_Reactor::instance());  
  107.     server.open(addr);  
  108.   
  109.     while(true)  
  110.     {  
  111.         ACE_Reactor::instance()->handle_events();   
  112.     }  
  113.   
  114.     return 0;   
  115. }  
 

代码功能比较简单,需要注意以下几点:

  1. 这里注册事件的方式和前面的文章中方式不一样,是通过ACE_Event_Handler类的reactor()方法设置和获取reactor的指针,比较直观和方便。前面的文章是通过ACE_Reactor::instance()来获取的一个单体reactor的指针。
  2. 当客户端Socket连接关闭时,需要释放相应资源,需要注意一下ClientService对象的handle_close方法中释放资源的相应代码。


定时器的实现

通过Reactor机制,还可以很容易的实现定时器的功能,使用方式如下。

  1. 编写一个事件反应器,重载handle_timeout()方法,该方法是定时器的触发时间到时,会自动触发该方法。
  2. 通过Reactor的schedule_timer()方法注册定时器。
  3. 启动reacotr的handle_events()事件分发循环。
  4. 当不想使用定时器时,可以通过Reactor的cancel_timer()方法注销定时器。

下面的代码简单的实现了一个定时器,并具有基本的开启,关闭功能。

Cpp代码  收藏代码
  1. #include <ace/OS.h>  
  2. #include <ace/Reactor.h>  
  3.   
  4. class MyTimerHandler : public ACE_Event_Handler  
  5. {  
  6. private:  
  7.     int inteval;    //执行时间间隔  
  8.     int delay;        //延迟执行时间  
  9.     int timerid;  
  10.   
  11. public:  
  12.     MyTimerHandler(int delay,int inteval)  
  13.     {  
  14.         this->delay=delay;  
  15.         this->inteval=inteval;  
  16.     }  
  17.   
  18.     int open()    //注册定时器  
  19.     {  
  20.         ACE_Time_Value delaytime(inteval);  
  21.         ACE_Time_Value intevaltime(inteval);  
  22.         timerid = reactor()->schedule_timer(this,  
  23.             0,    //传递handle_timeout给的参数  
  24.             delaytime,  
  25.             intevaltime);  
  26.         return timerid;  
  27.     }  
  28.   
  29.     int close()    //取消定时器  
  30.     {  
  31.         return reactor()->cancel_timer(timerid);  
  32.     }  
  33.   
  34.     //定时器回调函数  
  35.     int handle_timeout (const ACE_Time_Value &current_time,  
  36.         const void * = 0)  
  37.     {  
  38.         time_t epoch = ((timespec_t)current_time).tv_sec;  
  39.         ACE_DEBUG ((LM_INFO,  
  40.             ACE_TEXT ("handle_timeout: %s\n"),  
  41.             ACE_OS::ctime (&epoch)));  
  42.         return 0;  
  43.     }  
  44. };  
  45.   
  46. int main(int argc, char *argv[])   
  47. {  
  48.     MyTimerHandler * timer = new MyTimerHandler (3,5);  
  49.     timer->reactor(ACE_Reactor::instance());  
  50.     timer->open();  
  51.   
  52.     for(int i=0;i<2;i++)    //触发次handle_timeout事件  
  53.     {  
  54.         ACE_OS::printf("\n%d\n",i);  
  55.         ACE_Reactor::instance()->handle_events();  
  56.     }  
  57.   
  58.     timer->close();  
  59.     ACE_OS::printf("cancel timer");  
  60.     while(true)  
  61.         ACE_Reactor::instance()->handle_events();  
  62.     return 0;   
  63. }  
 

代码功能比较简单,这里就不多做介绍了。


ace react 相关内容

ACE反应器(Reactor)2

阅读数 733

没有更多推荐了,返回首页