export react 多个
2011-11-23 21:54:56 chennut0802 阅读数 291

摘自 http://hi.baidu.com/pass86/blog/item/1d908b16f21a0e53f2de320a.html

ACE Reactor框架设计的目标是,实现一种灵活的事件处理机制,使应用无需为了满足事件处理的需要而编写平台相关的中心代码。使用Reactor框架,应用要实现其事件处理只需要做三件事情。


ONE:从 ACE_Event_Handler 派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为。
#include <ace/Event_Handler.h>
//...
class Service : public ACE_Event_Handler
{
/// Called when input events occur (e.g., connection or data).
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/// Called when output events are possible (e.g., when flow control
/// abates or non-blocking connection completes).
virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/// Called when an exceptional events occur (e.g., SIGURG).
virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/**
   * Called when timer expires. <current_time> represents the current
   * time that the <Event_Handler> was selected for timeout
   * dispatching and <act> is the asynchronous completion token that
   * was passed in when <schedule_timer> was invoked.
   */
virtual int handle_timeout (const ACE_Time_Value &current_time,
                              const void *act = 0);


/// Called when a process exits.
virtual int handle_exit (ACE_Process *);

/// Called when a <handle_*()> method returns -1 or when the
/// <remove_handler> method is called on an ACE_Reactor. The
/// <close_mask> indicates which event has triggered the
/// <handle_close> method callback on a particular @a handle.
virtual int handle_close (ACE_HANDLE handle,
                            ACE_Reactor_Mask close_mask);


/// Called when object is signaled by OS (either via UNIX signals or
/// when a Win32 object becomes signaled).
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);

virtual int handle_qos (ACE_HANDLE = ACE_INVALID_HANDLE);
virtual int handle_group_qos (ACE_HANDLE = ACE_INVALID_HANDLE);

/// Get the I/O handle.
virtual ACE_HANDLE get_handle (void) const;

//根据应用选择性的重载

}


TWO:向 ACE_Reactor 类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来。
先看ACE_Reactor(<ace/Reactor.h>)成员register_handler一个声明(还有其它几个传不同参数):
/**
   * Register handler for I/O events.
   *
   * A handler can be associated with multiple handles. A handle
   * cannot be associated with multiple handlers.
   *
   * The handle will come from ACE_Event_Handler::get_handle().
   *
   * Reactor will call ACE_Event_Handler::add_reference() for a new
   * handler/handle pair.
   *
   * If this handler/handle pair has already been registered, any new
   * masks specified will be added. In this case,
   * ACE_Event_Handler::add_reference() will not be called.
   *
   * If the registered handler is currently suspended, it will remain
   * suspended. When the handler is resumed, it will have the
   * existing masks plus any masks added through this call. Handlers
   * do not have partial suspensions.
   */
virtual int register_handler (ACE_Event_Handler *event_handler,
                                ACE_Reactor_Mask mask);

ACE_Reactor_Mask声明于 <ace/Event_Handler.h> 中:
enum
{
    LO_PRIORITY = 0,
    HI_PRIORITY = 10,
    NULL_MASK = 0,
#if defined (ACE_USE_POLL)
    READ_MASK = POLLIN,
    WRITE_MASK = POLLOUT,
    EXCEPT_MASK = POLLPRI,
#else /* USE SELECT */
    READ_MASK = (1 << 0),
    WRITE_MASK = (1 << 1),
    EXCEPT_MASK = (1 << 2),
#endif /* ACE_USE_POLL */
    ACCEPT_MASK = (1 << 3),
    CONNECT_MASK = (1 << 4),
    TIMER_MASK = (1 << 5),
    QOS_MASK = (1 << 6),
    GROUP_QOS_MASK = (1 << 7),
    SIGNAL_MASK = (1 << 8),
    ALL_EVENTS_MASK = READ_MASK |
                      WRITE_MASK |
                      EXCEPT_MASK |
                      ACCEPT_MASK |
                      CONNECT_MASK |
                      TIMER_MASK |
                      QOS_MASK |
                      GROUP_QOS_MASK |
                      SIGNAL_MASK,
    RWE_MASK = READ_MASK |
               WRITE_MASK |
               EXCEPT_MASK,
    DONT_CALL = (1 << 9)
};

你可在 Service 类中,把每个事件处理对象与它感兴趣的事件关联起来:
    this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);//关联virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);回调函数。

向 ACE_Reactor 类登记应用的事件处理对象:
    Service service;

    service.reactor(ACE_Reactor::instance());


THREE:运行 ACE_Reactor 事件循环。

    ACE_Reactor::instance()->run_reactor_event_loop();
    //还有其它的几个循环函数,还没有具体研究

可以看出,我们用的是一个 ACE_Reactor 单件,由 ACE_Object_Manager 管理它。

export react 多个 相关内容

2008-12-05 21:25:00 jacklam200 阅读数 2249

 ACE Reactor框架:

    只要做三件事:

        1.从ACE_Event_Handler派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为

        2.向ACE_Reactor类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来

        3.运行ACE_Reactor事件循环

一个接受连接的例子:

  1. #include <iostream>
  2. #include "ace/auto_ptr.h"
  3. #include "ace/log_msg.h"
  4. #include "ace/inet_addr.h"
  5. #include "ace/sock_acceptor.h"
  6. #include "ace/reactor.h"
  7. #include "ace/Message_Block.h"
  8. #include "ace/Message_Queue.h"
  9. #include "ace/SOCK_Stream.h"
  10. #include "ace/Null_Mutex.h"
  11. #include "ace/Null_Condition.h"
  12. using namespace std;
  13. //服务客户
  14. class ClientService:public ACE_Event_Handler
  15. {
  16. public:
  17.     ACE_SOCK_Stream &peer(void)
  18.     {
  19.         return this->sock_;
  20.     }
  21.     int open(void);
  22.     virtual ACE_HANDLE get_handle(voidconst
  23.     {
  24.         return this->sock_.get_handle();
  25.     }
  26.     virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  27.     virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  28.     virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
  29. protected:
  30.     ACE_SOCK_Stream sock_;
  31.     ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
  32. };
  33. int ClientService::open(void)
  34. {
  35.     ACE_TCHAR peer_name[512];
  36.     ACE_INET_Addr peer_addr;
  37.     if(this->sock_.get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)
  38.         cout<<" connection from "<<peer_name<<endl;
  39.     return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
  40. }
  41. int ClientService::handle_input(ACE_HANDLE)
  42. {
  43.     const size_t INPUT_SIZE=4096;
  44.     char buffer[INPUT_SIZE];
  45.     ssize_t recv_cnt,send_cnt;
  46.     if((recv_cnt=this->sock_.recv(buffer,sizeof(buffer)))<=0)
  47.     {
  48.         ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) connection closed/n")));
  49.         return -1;
  50.     }
  51.     send_cnt=this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
  52.     if(send_cnt==recv_cnt)
  53.         return 0;
  54.     if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)
  55.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t) %p/n"),ACE_TEXT("send")),0);
  56.     if(send_cnt==-1)
  57.         send_cnt=0;
  58.     ACE_Message_Block *mb;
  59.     size_t remaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));
  60.     ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
  61.     int output_off=this->output_queue_.is_empty();
  62.     ACE_Time_Value nowait(ACE_OS::gettimeofday());
  63.     if(this->output_queue_.enqueue_tail(mb,&nowait)==-1)
  64.     {   
  65.         ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;discarding data/n"),ACE_TEXT("enqueue failed ")));
  66.         mb->release();
  67.         return 0;
  68.     }
  69.     if(output_off)
  70.         return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
  71.     return 0;
  72. }
  73. int ClientService::handle_output(ACE_HANDLE)
  74. {
  75.     ACE_Message_Block *mb;
  76.     ACE_Time_Value nowait(ACE_OS::gettimeofday());
  77.     while(0==this->output_queue_.dequeue_head(mb,&nowait))
  78.     {
  79.         ssize_t send_cnt=this->sock_.send(mb->rd_ptr(),mb->length());
  80.         if(send_cnt==-1)
  81.             ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%p/n"),ACE_TEXT("send")));
  82.         else
  83.             mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
  84.         if(mb->length()>0)
  85.         {
  86.             this->output_queue_.enqueue_head(mb);
  87.             break;
  88.         }
  89.         mb->release();
  90.     }
  91.     return (this->output_queue_.is_empty())?-1:0;
  92. }
  93. int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
  94. {
  95.     if(mask==ACE_Event_Handler::WRITE_MASK)
  96.         return 0;
  97.     mask=ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;
  98.     this->reactor()->remove_handler(this,mask);
  99.     this->sock_.close();
  100.     this->output_queue_.flush();
  101.     delete this;
  102.     return 0;
  103. }
  104. //接受客户
  105. class ClientAccept:public ACE_Event_Handler
  106. {
  107. public:
  108.     virtual ~ClientAccept()
  109.     {
  110.             this->handle_close(ACE_INVALID_HANDLE,0);
  111.     }
  112.     int open(const ACE_INET_Addr &listen_addr);
  113.     virtual ACE_HANDLE get_handle(voidconst
  114.     {
  115.         return this->acceptor_.get_handle();
  116.     }
  117.     virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
  118.     virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
  119. protected:
  120.     ACE_SOCK_Acceptor acceptor_;
  121. };
  122. int ClientAccept::open(const ACE_INET_Addr &listen_addr)
  123. {
  124.     if(this->acceptor_.open(listen_addr,1)==-1)
  125.     {
  126.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
  127.     }
  128.     return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
  129. }
  130. int ClientAccept::handle_input(ACE_HANDLE)
  131. {
  132.     ClientService *client;
  133.     ACE_NEW_RETURN(client,ClientService,-1);
  134.     auto_ptr<ClientService>p(client);
  135.     if(this->acceptor_.accept(client->peer())==-1)
  136.         ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%T)%p/N"),ACE_TEXT("Failed to accept ")ACE_TEXT("client connection")),-1);
  137.     p.release();
  138.     client->reactor(this->reactor());
  139.     if(client->open()==-1)
  140.         client->handle_close(ACE_INVALID_HANDLE,0);
  141.     return 0;
  142. }
  143. int ClientAccept::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
  144. {
  145.     if(this->acceptor_.get_handle()!=ACE_INVALID_HANDLE)
  146.     {
  147.         ACE_Reactor_Mask m=ACE_Event_Handler::ACCEPT_MASK|ACE_Event_Handler::DONT_CALL;
  148.         this->reactor()->remove_handler(this,m);
  149.     }
  150.     return 0;
  151. }
  152. int main(int argc,char *argv[])
  153. {
  154.     ACE_INET_Addr port_to_listen(50000,ACE_LOCALHOST);
  155.     ClientAccept acceptor;
  156.     acceptor.reactor(ACE_Reactor::instance());
  157.     if(acceptor.open(port_to_listen)==-1)
  158.         return 1;
  159.     ACE_Reactor::instance()->run_reactor_event_loop();
  160.     return 0;
  161. }

每个类要处理任何类型的Reactor事件的类,必须从ACE_Event_Handler派生,虽然可以用一个类控制接受和所有客户的连接,但还是创建“连接接受”和“连接服务”不同的类比较好!

1.这样可以更好的封装数据和行为,这个类接受来自客户的连接,而这是他所做的全部事情

2.代表客户的类将为客户连接提供服务

 

在针对一些I/O事件向反应器登记某个事件处理器时,反应器会把一个ACE_Event_Handler指针与一个句柄以及处理器感兴趣的I/O事件类型关联在一起!

 

当I/O事件触发时,会回调特定的句柄传给handle_input()方法的ACE_HANDLE参数

而在上面程序例子中,创建了一个clientservice实例,为每个连接使用单独的服务处理对象,所以每次接受新的连接都会得到一个新的CLIENTSERVICE实例

 

为了对要发送的数据进行排队,CLientService用一个ACE_Message_Queue,当要对稍后发送的数据进行排队时,分配一个ACE_Message_Block保存这些数据,并把它放入队列中,以备后用,如果我们无法把数据放入队列,我们就会放弃,抛弃那些数据。如果在我们尝试把余下的数据放入队列之前,输出队列是空的,我们就会再向反应器登记这个处理,这一次针对的是 WRITE事件

 

 

ACE_Message_Queue

通过在类声明是指定锁类型就可以很方便实现进程,线程安全的消息队列
ACE_Message_Queue<ACE_MT_SYNCH> message_queue_;如果程序是单线程的话,
可以ACE_Message_Queue<ACE_NULL_SYNCH> message_queue_。

 

 

ACE_Message_Block功能简介

ACE_Message_Block在Ace中用来表示消息的存放空间,可用做网络通信中的消息缓冲区,使用非常频繁,下面将在如下方简单的介绍一下ACE_Message_Block相关功能。

  1. 创建消息块
  2. 释放消息块
  3. 从消息块中读写数据
  4. 数据的拷贝
  5. 其它常用函数

1。创建消息块

创建消息块的方式比较灵活,常用的有以下几种方式 :

1。直接给消息块分配内存空间创建。

    ACE_Message_Block *mb = new ACE_Message_Block (30);

2。共享底层数据块创建。

    char buffer[100];
    ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

这种方式共享底层的数据块,被创建的消息块并不拷贝该数据,也不假定自己拥有它的所有权。在消息块mb被销毁时,相关联的数据缓冲区data将不会被销毁。这是有意义的:消息块没有拷贝数据,因此内存也不是它分配的,这样它也不应该负责销毁它。

3。通过duplicate()函数从已有的消息块中创建副本。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->duplicate();

这种方式下,mb2和mb共享同一数据空间,使用的是ACE_Message_Block的引用计数机制。它返回指向要被复制的消息块的指针,并在内部增加内部引用计数

4。通过clone()函数从已有的消息块中复制。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->clone();

clone()方法实际地创建整个消息块的新副本,包括它的数据块和附加部分;也就是说,这是一次"深拷贝"。

2。释放消息块

一旦使用完消息块,程序员可以调用它的release()方法来释放它。

  1. 如果消息数据内存是由该消息块分配的,调用release()方法就也会释放此内存。
  2. 如果消息块是引用计数的,release()就会减少计数,直到到达0为止;之后消息块和与它相关联的数据块才从内存中被移除。
  3. 如果消息块是通过共享已分配的底层数据块创建的,底层数据块不会被释放。

无论消息块是哪种方式创建的,只要在使用完后及时调用release()函数,就能确保相应的内存能正确的释放。

3。从消息块中读写数据

ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,rd_ptr()指向可读的数据块地址,wr_ptr()指向可写的数据块地址,默认情况下都执行数据块的首地址。下面的例子简单了演示它的使用方法。

#include "ace/Message_Queue.h"
#include "ace/OS.h"

int main(int argc, char *argv[])
{
    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");
    ACE_OS::printf("%s/n",mb->rd_ptr ());
    mb->release();
    return 0;
}

注意:这两个指针所指向的位置并不会自动移动,在上面的例子中,函数执行完毕后,执行的位置仍然是最开始的0,而不是最新的可写位置5,程序员需要通过wr_ptr(5)函数手动移动写指针的位置。

4。数据的拷贝

一般的数据的拷贝可以通过函数来实现数据的拷贝,copy()还会保证wr_ptr()的更新,使其指向缓冲区的新末尾处。

下面的例子演示了copy()函数的用法。

    mb->copy("hello");
    mb->copy("123",4);

注意:由于c++是以'/0'作为字符串结束标志的,对于上面的例子,底层数据块中保存的是"hello/0123/0",而用ACE_OS::printf("%s/n",mb->rd_ptr ());打印出来的结果是"hello",使用copy函数进行字符串连接的时候需要注意。

5。其它常用函数

  1. length()    返回当前的数据长度
  2. next()    获取和设置下一个ACE_Message_Block的链接。(用来建立消息队列非常有用)
  3. space()    获取剩余可用空间大小
  4. size()    获取和设置数据存储空间大小。

注:ACE_NEW_RETURN的意思用new动态生成一个参数2类型的空间,并将空间的首地址副给第一个参数。如果有错误产生则将第一个参数的值设为空,并返回值RET_VAL。

export react 多个 相关内容

2011-03-16 21:35:00 jintao0704 阅读数 2080

 ACE Reactor框架:

    只要做三件事:

        1.从ACE_Event_Handler派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为

        2.向ACE_Reactor类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来

        3.运行ACE_Reactor事件循环

一个接受连接的例子:

1.#include <iostream> 2.#include "ace/auto_ptr.h" 3.#include "ace/log_msg.h" 4.#include "ace/inet_addr.h" 5.#include "ace/sock_acceptor.h" 6.#include "ace/reactor.h" 7.8.#include "ace/Message_Block.h" 9.#include "ace/Message_Queue.h" 10.#include "ace/SOCK_Stream.h" 11.12.#include "ace/Null_Mutex.h" 13.#include "ace/Null_Condition.h" 14.15.16.using namespace std;
17.//服务客户 18.class ClientService:public ACE_Event_Handler
19.{
20.public:
21.    ACE_SOCK_Stream &peer(void)
22.    {
23.        return this->sock_;
24.    }
25.    int open(void);
26.    virtual ACE_HANDLE get_handle(void) const27.    {
28.        return this->sock_.get_handle();
29.    }
30.    virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
31.    virtual int handle_output(ACE_HANDLE fd=ACE_INVALID_HANDLE);
32.    virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
33.34.protected:
35.    ACE_SOCK_Stream sock_;
36.    ACE_Message_Queue<ACE_NULL_SYNCH> output_queue_;
37.};
38.int ClientService::open(void)
39.{
40.    ACE_TCHAR peer_name[512];
41.    ACE_INET_Addr peer_addr;
42.    if(this->sock_.get_remote_addr(peer_addr)==0&&peer_addr.addr_to_string(peer_name,512)==0)
43.        cout<<" connection from "<<peer_name<<endl;
44.    return this->reactor()->register_handler(this,ACE_Event_Handler::READ_MASK);
45.}
46.int ClientService::handle_input(ACE_HANDLE)
47.{
48.    const size_t INPUT_SIZE=4096;
49.    char buffer[INPUT_SIZE];
50.    ssize_t recv_cnt,send_cnt;
51.    if((recv_cnt=this->sock_.recv(buffer,sizeof(buffer)))<=0)
52.    {
53.        ACE_DEBUG((LM_DEBUG,ACE_TEXT("(%P|%t) connection closed/n")));
54.        return -1;
55.    }
56.    send_cnt=this->sock_.send(buffer,ACE_static_cast(size_t,recv_cnt));
57.    if(send_cnt==recv_cnt)
58.        return 0;
59.    if(send_cnt==-1&&ACE_OS::last_error()!=EWOULDBLOCK)
60.        ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%t) %p/n"),ACE_TEXT("send")),0);
61.    if(send_cnt==-1)
62.        send_cnt=0;
63.    ACE_Message_Block *mb;
64.    size_t remaining=ACE_static_cast(size_t,(recv_cnt-send_cnt));
65.    ACE_NEW_RETURN(mb,ACE_Message_Block(&buffer[send_cnt],remaining),-1);
66.    int output_off=this->output_queue_.is_empty();
67.    ACE_Time_Value nowait(ACE_OS::gettimeofday());
68.    if(this->output_queue_.enqueue_tail(mb,&nowait)==-1)
69.    {  
70.        ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%P;discarding data/n"),ACE_TEXT("enqueue failed ")));
71.        mb->release();
72.        return 0;
73.    }
74.    if(output_off)
75.        return this->reactor()->register_handler(this,ACE_Event_Handler::WRITE_MASK);
76.    return 0;
77.78.79.}
80.int ClientService::handle_output(ACE_HANDLE)
81.{
82.    ACE_Message_Block *mb;
83.    ACE_Time_Value nowait(ACE_OS::gettimeofday());
84.    while(0==this->output_queue_.dequeue_head(mb,&nowait))
85.    {
86.        ssize_t send_cnt=this->sock_.send(mb->rd_ptr(),mb->length());
87.        if(send_cnt==-1)
88.            ACE_ERROR((LM_ERROR,ACE_TEXT("(%P|%t)%p/n"),ACE_TEXT("send")));
89.        else90.            mb->rd_ptr(ACE_static_cast(size_t,send_cnt));
91.        if(mb->length()>0)
92.        {
93.            this->output_queue_.enqueue_head(mb);
94.            break;
95.        }
96.        mb->release();
97.    }
98.    return (this->output_queue_.is_empty())?-1:0;
99.}
100.int ClientService::handle_close(ACE_HANDLE,ACE_Reactor_Mask mask)
101.{
102.    if(mask==ACE_Event_Handler::WRITE_MASK)
103.        return 0;
104.    mask=ACE_Event_Handler::ALL_EVENTS_MASK|ACE_Event_Handler::DONT_CALL;
105.    this->reactor()->remove_handler(this,mask);
106.    this->sock_.close();
107.    this->output_queue_.flush();
108.    delete this;
109.    return 0;
110.}
111.//接受客户 112.class ClientAccept:public ACE_Event_Handler
113.{
114.public:
115.    virtual ~ClientAccept()
116.    {
117.            this->handle_close(ACE_INVALID_HANDLE,0);
118.    }
119.    int open(const ACE_INET_Addr &listen_addr);
120.    virtual ACE_HANDLE get_handle(void) const121.    {
122.        return this->acceptor_.get_handle();
123.    }
124.    virtual int handle_input(ACE_HANDLE fd=ACE_INVALID_HANDLE);
125.    virtual int handle_close(ACE_HANDLE handle,ACE_Reactor_Mask close_mask);
126.protected:
127.    ACE_SOCK_Acceptor acceptor_;
128.};
129.int ClientAccept::open(const ACE_INET_Addr &listen_addr)
130.{
131.    if(this->acceptor_.open(listen_addr,1)==-1)
132.    {
133.        ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%p/n"),ACE_TEXT("acceptor.open")),-1);
134.    }
135.    return this->reactor()->register_handler(this,ACE_Event_Handler::ACCEPT_MASK);
136.}
137.int ClientAccept::handle_input(ACE_HANDLE)
138.{
139.    ClientService *client;
140.    ACE_NEW_RETURN(client,ClientService,-1);
141.    auto_ptr<ClientService>p(client);
142.    if(this->acceptor_.accept(client->peer())==-1)
143.        ACE_ERROR_RETURN((LM_ERROR,ACE_TEXT("%P|%T)%p/N"),ACE_TEXT("Failed to accept ")ACE_TEXT("client connection")),-1);
144.    p.release();
145.    client->reactor(this->reactor());
146.    if(client->open()==-1)
147.        client->handle_close(ACE_INVALID_HANDLE,0);
148.    return 0;
149.150.}
151.int ClientAccept::handle_close(ACE_HANDLE,ACE_Reactor_Mask)
152.{
153.    if(this->acceptor_.get_handle()!=ACE_INVALID_HANDLE)
154.    {
155.        ACE_Reactor_Mask m=ACE_Event_Handler::ACCEPT_MASK|ACE_Event_Handler::DONT_CALL;
156.        this->reactor()->remove_handler(this,m);
157.158.    }
159.    return 0;
160.}
161.int main(int argc,char *argv[])
162.{
163.    ACE_INET_Addr port_to_listen(50000,ACE_LOCALHOST);
164.    ClientAccept acceptor;
165.    acceptor.reactor(ACE_Reactor::instance());
166.    if(acceptor.open(port_to_listen)==-1)
167.        return 1;
168.    ACE_Reactor::instance()->run_reactor_event_loop();
169.    return 0;
170.}
每个类要处理任何类型的Reactor事件的类,必须从ACE_Event_Handler派生,虽然可以用一个类控制接受和所有客户的连接,但还是创建“连接接受”和“连接服务”不同的类比较好!

1.这样可以更好的封装数据和行为,这个类接受来自客户的连接,而这是他所做的全部事情

2.代表客户的类将为客户连接提供服务

 

在针对一些I/O事件向反应器登记某个事件处理器时,反应器会把一个ACE_Event_Handler指针与一个句柄以及处理器感兴趣的I/O事件类型关联在一起!

 

当I/O事件触发时,会回调特定的句柄传给handle_input()方法的ACE_HANDLE参数

而在上面程序例子中,创建了一个clientservice实例,为每个连接使用单独的服务处理对象,所以每次接受新的连接都会得到一个新的CLIENTSERVICE实例

 

为了对要发送的数据进行排队,CLientService用一个ACE_Message_Queue,当要对稍后发送的数据进行排队时,分配一个ACE_Message_Block保存这些数据,并把它放入队列中,以备后用,如果我们无法把数据放入队列,我们就会放弃,抛弃那些数据。如果在我们尝试把余下的数据放入队列之前,输出队列是空的,我们就会再向反应器登记这个处理,这一次针对的是 WRITE事件

 

 

ACE_Message_Queue

通过在类声明是指定锁类型就可以很方便实现进程,线程安全的消息队列
ACE_Message_Queue<ACE_MT_SYNCH> message_queue_;如果程序是单线程的话,
可以ACE_Message_Queue<ACE_NULL_SYNCH> message_queue_。

 

 

ACE_Message_Block功能简介
ACE_Message_Block在Ace中用来表示消息的存放空间,可用做网络通信中的消息缓冲区,使用非常频繁,下面将在如下方简单的介绍一下ACE_Message_Block相关功能。

1.创建消息块
2.释放消息块
3.从消息块中读写数据
4.数据的拷贝
5.其它常用函数
1。创建消息块

创建消息块的方式比较灵活,常用的有以下几种方式 :

1。直接给消息块分配内存空间创建。

    ACE_Message_Block *mb = new ACE_Message_Block (30);

2。共享底层数据块创建。

    char buffer[100];
    ACE_Message_Block *mb = new ACE_Message_Block (buffer,30);

这种方式共享底层的数据块,被创建的消息块并不拷贝该数据,也不假定自己拥有它的所有权。在消息块mb被销毁时,相关联的数据缓冲区data将不会被销毁。这是有意义的:消息块没有拷贝数据,因此内存也不是它分配的,这样它也不应该负责销毁它。

3。通过duplicate()函数从已有的消息块中创建副本。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->duplicate();

这种方式下,mb2和mb共享同一数据空间,使用的是ACE_Message_Block的引用计数机制。它返回指向要被复制的消息块的指针,并在内部增加内部引用计数。

4。通过clone()函数从已有的消息块中复制。

    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_Message_Block *mb2 = mb->clone();

clone()方法实际地创建整个消息块的新副本,包括它的数据块和附加部分;也就是说,这是一次"深拷贝"。

2。释放消息块

一旦使用完消息块,程序员可以调用它的release()方法来释放它。

1.如果消息数据内存是由该消息块分配的,调用release()方法就也会释放此内存。
2.如果消息块是引用计数的,release()就会减少计数,直到到达0为止;之后消息块和与它相关联的数据块才从内存中被移除。
3.如果消息块是通过共享已分配的底层数据块创建的,底层数据块不会被释放。
无论消息块是哪种方式创建的,只要在使用完后及时调用release()函数,就能确保相应的内存能正确的释放。

3。从消息块中读写数据

ACE_Message_Block提供了两个指针函数以供程序员进行读写操作,rd_ptr()指向可读的数据块地址,wr_ptr()指向可写的数据块地址,默认情况下都执行数据块的首地址。下面的例子简单了演示它的使用方法。

#include "ace/Message_Queue.h"
#include "ace/OS.h"

int main(int argc, char *argv[])
{
    ACE_Message_Block *mb = new ACE_Message_Block (30);
    ACE_OS::sprintf(mb->wr_ptr(),"%s","hello");
    ACE_OS::printf("%s/n",mb->rd_ptr ());
    mb->release();
    return 0;
}

注意:这两个指针所指向的位置并不会自动移动,在上面的例子中,函数执行完毕后,执行的位置仍然是最开始的0,而不是最新的可写位置5,程序员需要通过wr_ptr(5)函数手动移动写指针的位置。

4。数据的拷贝

一般的数据的拷贝可以通过函数来实现数据的拷贝,copy()还会保证wr_ptr()的更新,使其指向缓冲区的新末尾处。

下面的例子演示了copy()函数的用法。

    mb->copy("hello");
    mb->copy("123",4);

注意:由于c++是以'/0'作为字符串结束标志的,对于上面的例子,底层数据块中保存的是"hello/0123/0",而用ACE_OS::printf("%s/n",mb->rd_ptr ());打印出来的结果是"hello",使用copy函数进行字符串连接的时候需要注意。

5。其它常用函数

1.length()    返回当前的数据长度
2.next()    获取和设置下一个ACE_Message_Block的链接。(用来建立消息队列非常有用)
3.space()    获取剩余可用空间大小
4.size()    获取和设置数据存储空间大小。
注:ACE_NEW_RETURN的意思用new动态生成一个参数2类型的空间,并将空间的首地址副给第一个参数。如果有错误产生则将第一个参数的值设为空,并返回值RET_VAL。

 

本文来自CSDN博客,转载请标明出处:http://blog.csdn.net/jacklam200/archive/2008/12/05/3455481.aspx

export react 多个 相关内容

2011-03-03 11:23:00 jintao0704 阅读数 513

ACE Reactor框架设计的目标是,实现一种灵活的事件处理机制,使应用无需为了满足事件处理的需要而编写平台相关的中心代码。使用Reactor框架,应用要实现其事件处理只需要做三件事情。

ONE:从 ACE_Event_Handler 派生一个或多个类,并给各个虚回调方法增加应用特有的事件处理行为。
#include <ace/Event_Handler.h>
//...
class Service : public ACE_Event_Handler
{
/// Called when input events occur (e.g., connection or data).
virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/// Called when output events are possible (e.g., when flow control
/// abates or non-blocking connection completes).
virtual int handle_output (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/// Called when an exceptional events occur (e.g., SIGURG).
virtual int handle_exception (ACE_HANDLE fd = ACE_INVALID_HANDLE);

/**
   * Called when timer expires. <current_time> represents the current
   * time that the <Event_Handler> was selected for timeout
   * dispatching and <act> is the asynchronous completion token that
   * was passed in when <schedule_timer> was invoked.
   */
virtual int handle_timeout (const ACE_Time_Value &current_time,
                              const void *act = 0);


/// Called when a process exits.
virtual int handle_exit (ACE_Process *);

/// Called when a <handle_*()> method returns -1 or when the
/// <remove_handler> method is called on an ACE_Reactor. The
/// <close_mask> indicates which event has triggered the
/// <handle_close> method callback on a particular @a handle.
virtual int handle_close (ACE_HANDLE handle,
                            ACE_Reactor_Mask close_mask);


/// Called when object is signaled by OS (either via UNIX signals or
/// when a Win32 object becomes signaled).
virtual int handle_signal (int signum, siginfo_t * = 0, ucontext_t * = 0);

virtual int handle_qos (ACE_HANDLE = ACE_INVALID_HANDLE);
virtual int handle_group_qos (ACE_HANDLE = ACE_INVALID_HANDLE);

/// Get the I/O handle.
virtual ACE_HANDLE get_handle (void) const;

//根据应用选择性的重载

}


TWO:向 ACE_Reactor 类登记应用的事件处理对象,把每个事件处理对象与它感兴趣的事件关联起来。
先看ACE_Reactor(<ace/Reactor.h>)成员register_handler一个声明(还有其它几个传不同参数):
/**
   * Register handler for I/O events.
   *
   * A handler can be associated with multiple handles. A handle
   * cannot be associated with multiple handlers.
   *
   * The handle will come from ACE_Event_Handler::get_handle().
   *
   * Reactor will call ACE_Event_Handler::add_reference() for a new
   * handler/handle pair.
   *
   * If this handler/handle pair has already been registered, any new
   * masks specified will be added. In this case,
   * ACE_Event_Handler::add_reference() will not be called.
   *
   * If the registered handler is currently suspended, it will remain
   * suspended. When the handler is resumed, it will have the
   * existing masks plus any masks added through this call. Handlers
   * do not have partial suspensions.
   */
virtual int register_handler (ACE_Event_Handler *event_handler,
                                ACE_Reactor_Mask mask);

ACE_Reactor_Mask声明于 <ace/Event_Handler.h> 中:
enum
{
    LO_PRIORITY = 0,
    HI_PRIORITY = 10,
    NULL_MASK = 0,
#if defined (ACE_USE_POLL)
    READ_MASK = POLLIN,
    WRITE_MASK = POLLOUT,
    EXCEPT_MASK = POLLPRI,
#else /* USE SELECT */
    READ_MASK = (1 << 0),
    WRITE_MASK = (1 << 1),
    EXCEPT_MASK = (1 << 2),
#endif /* ACE_USE_POLL */
    ACCEPT_MASK = (1 << 3),
    CONNECT_MASK = (1 << 4),
    TIMER_MASK = (1 << 5),
    QOS_MASK = (1 << 6),
    GROUP_QOS_MASK = (1 << 7),
    SIGNAL_MASK = (1 << 8),
    ALL_EVENTS_MASK = READ_MASK |
                      WRITE_MASK |
                      EXCEPT_MASK |
                      ACCEPT_MASK |
                      CONNECT_MASK |
                      TIMER_MASK |
                      QOS_MASK |
                      GROUP_QOS_MASK |
                      SIGNAL_MASK,
    RWE_MASK = READ_MASK |
               WRITE_MASK |
               EXCEPT_MASK,
    DONT_CALL = (1 << 9)
};

你可在 Service 类中,把每个事件处理对象与它感兴趣的事件关联起来:
    this->reactor()->register_handler(this, ACE_Event_Handler::READ_MASK);//关联virtual int handle_input (ACE_HANDLE fd = ACE_INVALID_HANDLE);回调函数。

向 ACE_Reactor 类登记应用的事件处理对象:
    Service service;

    service.reactor(ACE_Reactor::instance());


THREE:运行 ACE_Reactor 事件循环。

    ACE_Reactor::instance()->run_reactor_event_loop();
    //还有其它的几个循环函数,还没有具体研究

可以看出,我们用的是一个 ACE_Reactor 单件,由 ACE_Object_Manager 管理它。

export react 多个 相关内容

2010-10-14 14:31:00 sws9999 阅读数 807

   为了处理多个I/O源,比如多个网络连接,许多传统的应用程序都要创建新进程或新线程。但我们可以通过另外的途径解决,即反应式模型(recative model),其基础是事件多路分离器,比如select()、poll()或WaitMultipleObjects()系统函数,这些优秀的函数允许 我们使用一个进程或者线程,就能处理许多时间。但是编写使用这些函数的可移植性应用相当富有挑战性,而这正是ACE Reactor框架可以帮助我们的地方。

主要的类有:

1 ACE_Reactor

2ACE_Event_Handler

3ACE_Time_Value

4ACE_Sig_Set

5ACE_Acceptor

6ACE_Connector

7ACE_Svc_Handler

Reactor框架最常见的用途是,处理来自多个来源的I/O。前些文章中简单服务器,它只能一次处理一个连接上的请求,我们将用Reactor框架,处理多个连接。

export react 多个 相关内容

没有更多推荐了,返回首页