2015-07-11 09:28:03 sunning9001 阅读数 611
 在《java NIO》作者PPT《How to Build a Scalable Multiplexed Server With NIO》 和 Doug Lea 《Scalable IO in Java》PPT中
 都有java nio的实现是通过reactor pattern 来实现的有说明。java nio作为一种跨平台IO操作。在不同平台上面封装了对应平台的IO模型。
 在reactor pattern 作者中已经提及,通过reactor pattern 模式可以来实现跨平台操作。所以,java nio通过reactor pattern模式就是这样完成的。
 java nio在window 平台下面是使用Select 模型。对于java nio源代码的分析,对于如果理解reactor pattern的设计模式意义不大。因为java nio实现中

 需要对JNI的封装。如果要了解对于不同平台的封装,可以通过ZThead库来了解会有更大的意义。这样可以避免对JNI 的干扰。因为JNI涉及到脚本语言java和C/C++交互的知识。

How to Build a Scalable Multiplexed Server With NIO

Reactor Pattern Mapped to NIO
Handle
SelectionKey
Event
SelectionKey.OP_READ, etc
Demultiplexer
Selector
Dispatcher
Selector.select() + iterate Selector.selectedKeys()
Handler
An instance of Runnable or Callable

最简单例子:TestReactor.java


public class TestReactor
{
	
   public static void main(String[] args) throws Exception 
   {
	 
	  //创建serversocketchannel通道.
	  ServerSocketChannel serversocketchannel =ServerSocketChannel.open();
	  //设置非阻塞,异步模式
	  serversocketchannel.configureBlocking(false);
	  //关联的serversocket
	  ServerSocket serversocket = serversocketchannel.socket();
	  SocketAddress endpoint =new InetSocketAddress("127.0.0.1", 8888);
	  //绑定指定的端口
	  serversocket.bind(endpoint);
	  //创建Selector。在Reactor Pattern模式中,相当于Demultiplexer 作用,用来多路复用器
	  Selector sel = Selector.open();
	  //在select中注册链接事件。
	  //在reactor 模式中SelectionKey 相当于event事件。
	  //在SectionKey中存在OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT 事件类型。此时与OP_ACCEPT 关联的Channel为ServerSocketChannel
	   SelectionKey selKey = serversocketchannel.register(sel, SelectionKey.OP_ACCEPT);
        while(true)
        {      
        	    //进行阻塞操作,等待事件的到来。返回值在select 模型中表示完成操作的数目
                int selCount = sel.select();
                if(selCount>0)
                {
                	System.out.println("selCount=>>"+selCount);
                }
                //返回可以操作的键集合。在window select 模型中,返回可以操作的fd_set集合
        	    Set<SelectionKey> selKeySet = sel.selectedKeys();
        	    for(SelectionKey key:selKeySet)
        	    {
        	     
        	    	//在SelectionKey中,存在链接可以接受事件,则调用accept()函数就不会存在阻塞现象。
        	    	//select
        	    	if(key.isAcceptable())
        	    	{
        	    		 //获取与SelectionKey.OP_ACCEPT关联的通道。即ServerSocketChannel.
        	    		 ServerSocketChannel serverChannel = (ServerSocketChannel)key.channel();
        	    		 //调用ServerSocketChannel 不会发生阻塞。获取到客户链接
        	    		 SocketChannel socketchannel = serverChannel.accept();
        	    		 //设置阻塞模式
        	    		 socketchannel.configureBlocking(false);
        	    		 //关联SocketChannel的读和写事件
        	    		 socketchannel.register(sel, SelectionKey.OP_READ|SelectionKey.OP_WRITE);
        	    		 //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
        	    	}
        	    	if(key.isWritable())
        	    	{
        	    		 SocketChannel socketchannel = (SocketChannel)key.channel();
        	    		 ByteBuffer src =ByteBuffer.allocate(100);
        	    		 src.putInt(100);
        	    		 src.flip();
						 socketchannel.write(src);
						 //关联SocketChannel的读和写事件
        	    		 socketchannel.register(sel, SelectionKey.OP_READ);
        	    		 //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
        	    		
        	    	}
        	    	if(key.isReadable())
        	    	{
        	    		 SocketChannel socketchannel = (SocketChannel)key.channel();
        	    		 InetSocketAddress remote = (InetSocketAddress)socketchannel.getRemoteAddress();
        	    		 String remotestring = remote.getHostString()+remote.getPort();
        	    		 //关联SocketChannel的读和写事件
        	    		 socketchannel.register(sel, SelectionKey.OP_WRITE);
        	    		 //同时可以在SelectionKey中关联其他对象。在Select 模式中,Selectionkey 相当于Completionkey参数
        	    	}
        	    	selKeySet.remove(key);
        	    }
        	   
        }
        
	  
   }
}



2018-08-17 00:55:46 bumingchun 阅读数 638

Reactor模式是网络编程中常用的模式,著名的libevent网络库就是采用了Reactor模式。最近通过阅读Douglas C. Schmidt 1995年写的关于Reactor模式的论文,理解了Reactor模式,下面是我通过阅读论文的感悟和复现论文中日志服务器的例子程序源码。
该论文讲解透彻,点此下载,建议阅读
Reactor模式常用于同步IO模型,本文中以select()为例进行介绍。

一、Reactor模式构成

Reactor类图
Reactor类图如上所示,有以下几个部分构成
Handles(句柄):唯一表示系统管理资源。例如文件描述符(file descriptor, fd)。
Synchronous Event Demultiplexer(同步事件复用器):阻塞等待一组Handle上event发生,例如select(),poll()等。
Initiation Dispatcher(分发器):定义注册(registering)事件回调对象(Event Handlers),移除(removing)事件回调对象(Event Handlers)和分发(dispatch) 事件。Demultiplexer负责等待事件发生,当时间发生后,Demultiplexer通知Dispatcher回调具体的Event Handlers。
Event Handler(事件回调抽象类):事件回调的接口。
Concrete Event Handler(具体事件回调类):事件回调的具体处理逻辑。在分布式日志服务器中有两个Concrete Event Handler:分别是Logging Handler 和 Logging Acceptor,Logging Handler负责接受和处理日志,Logging Acceptor负责接受连接并创建Logging Handler。

分布式日志服务器

二、Reactor优缺点

Reactor模式的优点:
(1) 业务处理逻辑和复用器(Demultiplex)、调度器(dispatch)分离
(2) 提高了事件驱动(event-driven)应用程序的模块化,复用性。
(3) 通过了粗粒度的并发控制。Reactor模式中同一个dispatcher串行(serialize)调用事件回调对象,这样避免了复杂的同步控制。
Reactor模式的缺点:
(1) 非抢占式(Non-preemptive) Event Handlers中不能产生长时间阻塞,这样会阻塞其他Handles。

Reactor论文链接 >https://www.cse.wustl.edu/~schmidt/PDF/reactor-siemens.pdf

三、代码逻辑及源代码

代码实现了分布式日志服务器,有以下几个类构成。
Initiation_Dispatcher类是分发器。
Event_Handler类是事件回调的抽象接口。
Logging_Acceptor类负责接受连接并创建Logging Handler对象。
Logging_Handler类复杂负责接受日志并输出到标准输出。

Initiation_Dispatcher.h

#ifndef INITIATION_DIAPATCH_HH
#define INITIATION_DIAPATCH_HH
#include <iostream>
#include <vector>
#include <stdlib.h>
#include <string>
#include <time.h>
#include <assert.h>
#include <sys/socket.h>
#include <unordered_map>
#include "SingletonTemplate.h"
#include "Event_Handler.h"

using namespace std;

class Initiation_Dispatcher
{
  public:
    int register_handler(Event_Handler *eh, Event_Type et);
    int remove_handler(Event_Handler *eh, Event_Type et);
    int handler_events(time_t *timeout = 0);

  private:
    int maxfdpl;
    fd_set readset;
    unordered_map<int, Event_Handler*> ReadHandleMap;
    unordered_map<int, Event_Handler*> AcceptHandleMap;
};
#endif

Initiation_Dispatcher.cpp

#include "Initiation_Dispatcher.h"

int Initiation_Dispatcher::register_handler(Event_Handler *eh, Event_Type et)
{
    if (et == ACCEPT_EVENT)
    {
        AcceptHandleMap.insert(make_pair(eh->get_handle(), eh));
    }
    else if (et == READ_EVENT)
    {
        ReadHandleMap.insert(make_pair(eh->get_handle(), eh));
    }
}

int Initiation_Dispatcher::remove_handler(Event_Handler *eh, Event_Type et)
{
    if (et == ACCEPT_EVENT)
    {
        AcceptHandleMap.erase(eh->get_handle());
    }
    else if (et == READ_EVENT)
    {
        ReadHandleMap.erase(eh->get_handle());
    }
}

int Initiation_Dispatcher::handler_events(time_t *timeout)
{
    // 清除标志位
    FD_ZERO(&readset);
    maxfdpl = -1;
    unordered_map<int, Event_Handler *>::iterator iter;
    for (iter = AcceptHandleMap.begin(); iter != AcceptHandleMap.end(); ++iter)
    {
        FD_SET(iter->first, &readset);
        if(maxfdpl < iter->first)
        {
            maxfdpl = iter->first;
        }
    }
    for (iter = ReadHandleMap.begin(); iter != ReadHandleMap.end(); ++iter)
    {
        FD_SET(iter->first, &readset);
        if(maxfdpl < iter->first)
        {
            maxfdpl = iter->first;
        }
    }

    maxfdpl += 1;

    select(maxfdpl, &readset, NULL, NULL, NULL);

    for (iter = AcceptHandleMap.begin(); iter != AcceptHandleMap.end(); ++iter)
    {
        if(FD_ISSET(iter->first, &readset))
        {
            iter->second->handle_event(ACCEPT_EVENT);
        }
    }
    for (iter = ReadHandleMap.begin(); iter != ReadHandleMap.end(); ++iter)
    {
        if(FD_ISSET(iter->first, &readset))
        {
            iter->second->handle_event(READ_EVENT);
        }
    }
}

Event_Handler.h

#ifndef EVENT_HANDLE__HH
#define EVENT_HANDLE__HH
#include <sys/select.h>
#include <sys/time.h>
#include <string>
#include <iostream>
#include <vector>
#include <forward_list>
#include <algorithm>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <errno.h>
#include <malloc.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/ioctl.h>
#include <stdarg.h>
#include <fcntl.h>
using namespace std;

enum Event_Type
{
    ACCEPT_EVENT = 01,
    READ_EVENT = 02
};

class SOCK_Stream
{
  public:
    int& get_handle()
    {
        return fd;
    }
    int recv(char * buff, int length)
    {
        int bytes_read = read(fd, buff, length);
        return bytes_read;
    }
    void Close()
    {
        close(fd);
    }
  private:
    int fd;    
};

class Event_Handler
{
  public:
    virtual int handle_event(Event_Type et) = 0;
    virtual int get_handle(void) = 0;
};


class SOCK_Acceptor
{
  public:
    SOCK_Acceptor(short &port)
    {
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(port);
        bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
        listen(listenfd, 10);
    }
    int Accept(SOCK_Stream &new_connection)
    {
        //int accept(int socket, struct sockaddr *restrict address, socklen_t *restrict address_len);

        // 有新的连接,处理
        socklen_t len = sizeof(cliaddr);
        connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &len);
        new_connection.get_handle() =  connfd; 

        cout << "Accept Client success" << endl;
        return new_connection.get_handle();
    }

    int get_handle()
    {
        return listenfd;
    }

  private:
    int listenfd, connfd;
    struct sockaddr_in servaddr, cliaddr;
};
#endif

Logging_Acceptor.h

#ifndef LOGGING_ACCEPTOR_HH
#define LOGGING_ACCEPTOR_HH

#include "Event_Handler.h"
#include "SingletonTemplate.h"

class Logging_Acceptor : public Event_Handler
{
  public:
    Logging_Acceptor(short &port);
    virtual int handle_event(Event_Type et);
    virtual int get_handle(void)
    {
        return acceptorObj.get_handle();
    }

  private:
    SOCK_Acceptor acceptorObj;
};
#endif

Logging_Acceptor.cpp

#include "Logging_Acceptor.h"
#include "Initiation_Dispatcher.h"
#include "Logging_Handler.h"
Logging_Acceptor::Logging_Acceptor(short &port) : acceptorObj(port)
{
    singleton<Initiation_Dispatcher>::instance().register_handler(this, ACCEPT_EVENT); // 向 Initiation Dispatcher 注册
}

int Logging_Acceptor::handle_event(Event_Type et)
{
    // Can only be called for an ACCEPT event.
    assert(et == ACCEPT_EVENT);
    SOCK_Stream new_connection;
    acceptorObj.Accept(new_connection);
    Logging_Handler *handler = new Logging_Handler(new_connection);
}

Logging_Handler.h

#ifndef LOGGING_HANDLER_HH
#define LOGGING_HANDLER_HH

#include "Event_Handler.h"
#include "Initiation_Dispatcher.h"

#define MAXLINE 1000
// the logging handler class
class Logging_Handler : public Event_Handler
{
  public:
    Logging_Handler(SOCK_Stream &cs);
    virtual int handle_event(Event_Type et);

    virtual int get_handle(void)
    {
        return peer_stream_.get_handle();
    }

  private:
    SOCK_Stream peer_stream_;

    char buff[MAXLINE];
};

#endif

Logging_Handler.cpp

#include "Logging_Handler.h"
#include "SingletonTemplate.h"

Logging_Handler::Logging_Handler(SOCK_Stream &cs)
    : peer_stream_(cs)
{
    // Register with the dispatcher for
    // READ events.
    singleton<Initiation_Dispatcher>::instance().register_handler(this, READ_EVENT);
}

int Logging_Handler::handle_event(Event_Type et)
{
    if (et == READ_EVENT)
    {
        int bytes_recv = peer_stream_.recv(buff, sizeof(buff));

        if (bytes_recv == 0)
        {
            // 表示socket关闭
            cout << "fd = " << peer_stream_.get_handle() << "Closed." << endl;
            peer_stream_.Close();
            singleton<Initiation_Dispatcher>::instance().remove_handler(this, READ_EVENT);
            delete this;
        }
        else if(bytes_recv > 0)
        {
            // 将接受到的日志标准输出
            cout << peer_stream_.get_handle() << "Log:" << buff << endl;
            // 清空 buff
            bzero(buff, sizeof(buff));
        }
    }
}

SingletonTemplate.h

#pragma once

template <typename T>
struct singleton {
private:
  struct object_creator {
    object_creator() {
      singleton<T>::instance();
    } // 初始化create_object时,初始化singleton<T>
    inline void do_nothing() const {
    }
  };
  static object_creator create_object;
  singleton();

public:
  typedef T object_type;
  static object_type &instance() {
    static object_type obj;
    create_object.do_nothing(); // 使用create_object,使得编译器一定会初始化create_object
    return obj;
  }
};

template <typename T>
typename singleton<T>::object_creator singleton<T>::create_object;

main.cpp


#include "Initiation_Dispatcher.h"
#include "Logging_Acceptor.h"
int main()
{
    short port = 8888;
    Logging_Acceptor AcceptorObj(port);
    for(;;)
    {
        singleton<Initiation_Dispatcher>::instance().handler_events();
    }
    return 0;
}
2018-04-02 14:38:06 u013828625 阅读数 315

An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events

一个对于分离和分发同步事件句柄的对象行为模式

1 Intent/意图

The Reactor design pattern handles service requests that are delivered concurrently to an application by one or moreclients. Each service in an application may consist ofserveral methods and is represented by a separate event handlerthat is responsible for dispatching service-specific requests.Dispatching of event handlers is performed by an initiationdispatcher, which manages the registered event handlers.Demultiplexing of service requests is performed by asynchronous event demultiplexer.

Reactor设计模式处理来个一个或多个客户端的向一个应用同时发起的服务请求。应用中的每个服务可能由少数几个方法组成,通过一个单独的事件处理器表现出来,这个事件处理器负责分发特定的服务请求。事件处理器的分发是通过一个初始化的分发器(initiation dispatcher)进行,负责管理注册其上的事件处理器。分离服务请求是由异步事件分离器进行的。

2 Also Known As

Dispatcher, Notifier

分发器,通知器

3 Example

To illustrate the Reactor pattern, consider the event-driven server for a distributed logging service shown in Figure 1.Client applications use the logging service to record information about their status in a distributed environment. This status information commonly includes error notifications, debugging traces, and performance reports. Logging records are sent to a central logging server, which can write there cords to various output devices, such as a console, a printer,a file, or a network management database.

为了阐述reactor模式,考虑使用分布式的日志服务(在下图1)。客户端应用程序使用这个日志服务来记录他们在分布式环境中的状态信息。这些状态信息通常包括错误通知,调试栈以及性能报告。这些日志记录会被发送到一个中央的日志服务器上,它可以将这些记录写到多个输出设备上,像控制台,打印机,文件,网络管理数据库。

The logging server shown in Figure 1 handles logging records and connection requests sent by clients. Logging records and connection requests can arrive concurrently on multiple handles. A handle identifies network communication resources managed within an OS.

图一中的日志服务器处理通过客户端发送日志记录以及连接请求。日志记录和连接请求会同时到达多个handle上,一个handle定义了在操作系统内部管理的网络通信资源。



The logging server communicates with clients using a connection-oriented protocol, such as TCP [1]. Clients that want to log data must first send a connection request to the server. The server waits for these connection requests using a handle factory that listens on an address known to clients.When a connection request arrives, the handle factory establishes a connection between the client and the server by creating a new handle that represents an endpoint of the connection.This handle is returned to the server, which then waits for client service requests to arrive on the handle. Once clients are connected, they can send logging records concurrently to the server. The server receives these records via the connected socket handles.

日志服务器和客户端使用一种面向连接的协议进行通信,例如Tcp。客户端想要发送日志数据,首先必须发送一个连接请求给服务器。服务器使用一个handle工厂来监听已经的客户端地址,然后等待连接请求的到来。当一个连接请求到达时,这个handle工厂就会通过一种创建新的handle来建立一个客户端与服务器之间的链接,这个handle表示了一个连接到端点。这个handle是被服务器返回的,然后会等待客户端服务请求到达这个handle。一旦客户端连接上了,他们就能够同时的发送日志记录给服务器,服务器通过已经连接的socket handle收到这些记录。

Perhaps the most intuitive way to develop a concurrent logging server is to use multiple threads that can process multiple clients concurrently, as shown in Figure 2. This approach synchronously accepts network connections and spawns a “thread-per-connection” to handle client logging records.

大概开发一个并发的日志服务器最直接的方式就是使用多线程的方式,它能够并发地处理多个客户端(图二),这种方法同步接收网络连接,然后产生(一个线程对应一个连接)来处理客户端的日志记录。

However, using multi-threading to implement the processing of logging records in the server fails to resolve the following forces:

  • Efficiency: Threading may lead to poor performance due to context switching, synchronization, and data movement[2];
  • Programming simplicity: Threading may require complex concurrency control schemes;
  • Portability: Threading is not available on all OS platforms.

As a result of these drawbacks, multi-threading is often not the most efficient nor the least complex solution to develop a concurrent logging server.

但是,在服务器上使用多线程来实现日志记录的处理不能解决下面的问题:

  • 性能:线程可能会导致极低的性能,因为上下文切换,同步以及数据迁移;
  • 编程简单:线程可能会要求复杂的并发控制方案;
  • 可移植性:线程可能并不适用所有的操作系统;

因为这些缺陷,多线程通常不是最有效率,也不是最小的复杂度解决方案,来开发一个并发的日志服务器。

4 Context

A server application in a distributed system that receives events from one or more clients concurrently.

在分布式系统中的一个服务应用程序会同时接收一个或者多个客户端的事件。

5 Problem

Server applications in a distributed system must handle multiple clients that send them service requests. Before invokinga specific service, however, the server application must demultiplex and dispatch each incoming request to its corresponding service provider. Developing an effective server mechanisms for demultiplexing and dispatching client requests requires the resolution of the following forces:

在分布式系统中的服务应用程序必须处理多个发送服务请求的客户端。但是在调用特定服务之前,服务应用程序必须分离和分发每个进来的请求到它的对应的服务提供者。开发一个有效率的服务器机制来分离和分发客户端请求要求下面的问题的解决方案

  • Availability: The server must be available to handle incoming requests even if it is waiting for other requests to arrive.In particular, a server must not block indefinitely handling any single source of events at the exclusion of other event sources since this may significantly delay the responseness to other clients.
  • Efficiency: A server must minimize latency, maximize throughput, and avoid utilizing the CPU(s) unnecessarily.
  • Programming simplicity: The design of a server should simplify the use of suitable concurrency strategies.
  • Adaptability: Integrating new or improved services,such as changing message formats or adding server-side caching, should incur minimal modifications and maintenance costs for existing code. For instance, implementing new application services should not require modifications to the generic event demultiplexing and dispatching mechanisms.
  • Portability: Porting a server to a new OS platform should not require significant effort.
  • 可用性:服务器必须是可用的,即使在它等待其他请求的到来的时候,也能够处理进来的请求。另外,服务器在处理任何单个的事件源时,一定不能够无限期的阻塞,把其他事件源排除在外。这样会显著的延迟发给其他客户端的额响应
  • 性能:一个服务器必须是尽可能最低的延迟,尽可能最大的吞吐量,避免不必要的CPU利用。
  • 编程简单:这个服务器的设计应该简化合适的并发策略的使用。
  • 可扩展性:集成一个新的或者改进的服务,例如改变消息格式,添加服务器端缓存,应该针对现有的代码引发最小的改动和维护成本。举个例子,实现一个新的应用服务应该不需要修改通用的时间分离和分发机制。
  • 可移植性:移植一个服务器到一个新的操作系统上应该不需要做出重大的改动。

6 Solution

Integrate the synchronous demultiplexing of events and the dispatching of their corresponding event handlers that process the events. In addition, decouple the application specific dispatching and implementation of services from the general-purpose event demultiplexing and dispatching mechanisms.

集成同步的事件分离器以及处理这些事件的事件处理器的调度。除此之外,多路事件分离和调度机制解耦了特定于应用程序的调度与服务实现。

For each service the application offers, introduce a separate Event Handler that processes certain types of events. All Event Handlers implement the same interface.Event Handlers register with an InitiationDispatcher, which uses a Synchronous EventDemultiplexer to wait for events to occur. When events occur, the Synchronous Event Demultiplexer notifies the Initiation Dispatcher, which synchronously calls back to the Event Handler associated with the event. The Event Handler then dispatches the event to the method that implements the requested service.

对于每个服务,应用程序提供,介绍了一个单独的事件处理器,用于处理某些事件类型。所有的事件处理器都实现了相同的接口。事件处理器(Event Handler)通过注册到一个初始化的分发器(InitiationDispatcher)当中,这个InitiationDispatcher使用了同步事件分离器(Synchronous Event Demultiplexer)来等待事件的发生。当事件发生时,这个同步事件分离器(Synchronous Event Demultiplexer)会通知初始化分发器(InitiationDispatcher),初始化分发器(InitiationDispatcher)会同步地回调与这个时间相关联的事件处理器(Event Handler)。这个事件处理器(Event Handler)然后会分发这个事件到实现了请求服务的方法当中。

7 Structure

The key participants in the Reactor pattern include the following:

在Reactor模式中的关键角色:

Handles

  • Identify resources that are managed by an OS.These resources commonly include network connections,open files, timers, synchronization objects, etc.Handles are used in the logging server to identify socket endpoints so that a Synchronous EventDemultiplexer can wait for events to occur on them. The two types of events the logging server is interested in are connection events and read events, which represent incoming client connections and logging data,respectively. The logging server maintains a separate connection for each client. Every connection is represented in the server by a socket handle.

句柄

  • 可以看作是操作系统管理的资源。这些资源通常包括网络连接,打开的文件,定时器,同步对象等等。句柄(handles)在这个日志服务器中用来识别socket断点,以便同步事件分离器(Synchronous EventDemultiplexer)能够在他们上面等待事件发生。这个日志服务器的两种事件类型是连接事件和读事件,直观地展示了进来的客户端连接和日志数据。日志服务器为每个客户端都保持了独立的链接,每个连接在服务器上通过一个socket handle来进行表示。

Synchronous Event Demultiplexer

  • Blocks awaiting events to occur on a set of Handles.It returns when it is possible to initiate an operation on a Handle without blocking. A common demultiplexerfor I/O events is select [1], which is an event demultiplexing system call provided by the UNIX and Win32 OS platforms. The select call indicates which Handles can have operations invoked on them synchronously without blocking the application process.

同步事件分离器

  • 在一个handle集合上阻塞地等待事件发生,当它有可能在一个handle 上开始了一个操作时,它就会返回,结束阻塞。一个常用的I/O事件分离器就是select,它是一个被UNIX和window平台提供的事件分离系统调用,这个select能够指明了在哪个handles上有操作能够被进行同步调用了,不会去阻塞应用进程。

Initiation Dispatcher

  • Defines an interface for registering, removing, and dispatching Event Handlers. Ultimately, the Synchronous Event Demultiplexer is responsible for waiting until new events occur. When it detects new events, it informs the InitiationDispatcher to call back application-specific event handlers. Common events include connection acceptance events, data input and output events, and timeout events.

初始化分发器(初始化的调度器)

  • 定义了一个注册、移除,以及调度事件处理器的接口,从根本上讲,同步事件分离器(Synchronous Event Demultiplexer)是负责等待一个新的事件发生。当它发现了一个新的事件之后,它会通知初始化的调度器(InitiationDispatcher)来回调特定于应用的事件处理器。普通事件包括连接接收事件,数据输入输出事件,超时事件。

Event Handler

  • Specifies an interface consisting of a hook method [3]that abstractly represents the dispatching operation for service-specific events. This method must be implemented by application-specific services.

事件处理器

  • 指一个由回调钩子方法组成的接口,能够抽象地表示面向服务的事件的调度操作。这个方法必须被面向应用的服务所实现。

Concrete Event Handler

  • Implements the hook method, as well as the methods to process these events in an application-specific manner. Applications register Concrete EventHandlers with the Initiation Dispatcher to process certain types of events. When these events arrive,the Initiation Dispatcher calls back the hook method of the appropriate Concrete EventHandler.

    There are two Concrete Event Handlers in the logging server: Logging Handler and Logging Acceptor. The Logging Handler is responsiblefor receiving and processing logging records. The Logging Acceptor creates and connects LoggingHandlers that process subsequent logging records from clients.

具体的事件处理器

  • 实现了回调钩子方法,也就是以一种面向应用的方法处理这些事件的方法。应用程序注册具体的事件处理器(Concrete EventHandlers)到初始化调度器(Initiation Dispatcher)上,用来处理某一类事件。当这些事件到达时,初始化调度器(Initiation Dispatcher)就会回调合适的具体的事件处理器(Concrete EventHandler)中的钩子方法。
  • 在日志服务器上有两种具体事件处理器(Concrete Event Handlers):日志处理器(Logging Handler)和日志接收器(Logging Acceptor)。日志处理器(Logging Handler)是用来接收和处理日志记录的。日志接收器(Logging Acceptor)是用来创建和连接日志处理器(Logging Handler),处理后面来自客户端的日志记录。

The structure of the participants of the Reactor pattern is illustrated in the following OMT class diagram:

Reactor模式的各个角色的架构在下面的OMT类图中进行阐述:


8 Dynamics

8.1 General Collaborations(通常的协作)

The following collaborations occur in the Reactor pattern:

  • When an application registers a Concrete Event Handler with the Initiation Dispatcher the application indicates the type of event(s) this Event Handler wants the Initiation Dispatcher to notify it about when the event(s) occur on the associated Handle.
  • 当一个应用程序将一个具体的事件处理器(Concrete Event Handle)注册到初始化调度器(Initiation Dispatcher)上时,这个应用会指定事件类型,当相关的句柄(handle)上发生这个类型的事件时,事件处理器( Event Handler)想要初始化调度器(Initiation Dispatcher)通知它。
  • The Initiation Dispatcher requests each Event Handler to pass back its internal Handle. This Handle identifies the Event Handler to the OS.
  • 初始化调度器( Initiation Dispatcher)要求每个事件处理器( Event Handler )回传它内部的句柄(handle)。这个句柄(handle)向操作系统标识了事件处理器(Event Handler)。
  • After all Event Handlers are registered, an application calls handle_events to start the Initiation Dispatcher's event loop. At this point, the Initiation Dispatcher combines the Handle from each registeredEvent Handler and uses the Synchronous Event Demultiplexer to wait for events to occur on these Handles. For instance, the TCP protocol layer uses the select synchronous event demultiplexing operation to wait for client logging record events to arrive on connected socket Handles.
  • 在所有的事件处理器(Event Handlers)都注册完毕之后,应用程序会调用handle_events 方法来开始这个初始化调度器( Initiation Dispatcher)的事件循环。在这个点上,初始化调度器(Initiation Dispatcher )组合了从每个已经注册的事件处理器(Event Handler)上回传的handle,并且使用同步事件分离器(Synchronous Event Demultiplexer)来等待在这些句柄(handles)上事件的发生。例如,TCP协议层使用select 同步事件分离器操作来等待客户端到达已连接的套接字(socket)句柄的日志记录事件。
  • The Synchronous Event Demultiplexer notifies the Initiation Dispatcher when a Handle corresponding to an event source becomes “ready,” e.g., that a TCP socket is “ready for reading.”
  • 当一个句柄(handle)变成准备状态,同步事件分离器(Synchronous Event Demultiplexer)通知初始化调度器( Initiation Dispatcher
  • The Initiation Dispatcher triggers Event Handler hook method in response to events on the readyHandles. When events occur, the Initiation Dispatcher uses the Handles activated by the event sources as “keys” to locate and dispatch the appropriate Event Handler's hook method.
  • 初始化调度器( Initiation Dispatcher)触发事件处理器的钩子方法来响应在句柄(handle)上准备好的事件。当事件发生时,初始化调度器( Initiation Dispatcher )使用被事件源激活的句柄(handle)作为定位和调度正确的事件处理的回调方法的key。
  • The Initiation Dispatcher calls back to the handle_event hook method of the Event Handler to perform application-specific functionality in response to an event. The type of event that occurred can be passed as a parameter to the method and used internally by this method to perform additional service specific demultiplexing and dispatching. An alternative dispatching approach is described in Section 9.4.
  • 初始化调度器(Initiation Dispatcher )回调事件处理器(Event Handler)当中的handle_event的钩子方法,来执行特定于程序的功能,以响应一个事件。发生的事件的类型能够作为参数传递到这个方法,然后在这个方法内部使用,以完成附加的业务功能

The following interaction diagram illustrates the collaboration between application code and participants in the Reactor pattern:

下面的交互类图阐述了应用代码和在Reactor模式中各个角色的协作关系:




2019-06-16 12:41:43 dam454450872 阅读数 515

Reactor是什么?

The reactor design_pattern is an event_handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

from wiki

 

通过wiki中的定义我们可以发现Reactor的重点

  1. 事件驱动

  2. 可以处理一个或多个输入源

  3. 通过多路复用将请求的事件分发给对应的处理器处理

 

根据大神Doug Lea 在 《Scalable IO in Java 》中的介绍,Reacotr模型主要分为三个角色

  1. Reactor:把IO事件分配给对应的handler处理

  2. Acceptor:处理客户端连接事件

  3. Handler:处理非阻塞的任务

 

为什么使用Reactor?

 

传统阻塞IO模型的不足

  1. 每个连接都需要独立线程处理,当并发数大时,创建线程数多,占用资源

  2. 采用阻塞IO模型,连接建立后,若当前线程没有数据可读,线程会阻塞在读操作上,造成资源浪费

 

 

针对传统阻塞IO模型的两个问题,可以采用如下的方案

  1. 基于池化思想,避免为每个连接创建线程,连接完成后将业务处理交给线程池处理

  2. 基于IO复用模型,多个连接共用同一个阻塞对象,不用等待所有的连接。遍历到有新数据可以处理时,操作系统会通知程序,线程跳出阻塞状态,进行业务逻辑处理

Reactor线程模型的思想就是基于IO复用和线程池的结合

 

Reactor线程模型分类

根据Reactor的数量和处理资源的线程数量的不同,分为三类:

  1. 单Reactor单线程模型

  2. 单Reactor多线程模型

  3. 多Reactor多线程模型

 

单Reactor单线程模型

这种模型在Reactor中处理事件,并分发事件,如果是连接事件交给acceptor处理,如果是读写事件和业务处理就交给handler处理,但始终只有一个线程执行所有的事情

 

该线程模型的不足

  1. 仅用一个线程处理请求,对于多核资源机器来说是有点浪费的

  2. 当处理读写任务的线程负载过高后,处理速度下降,事件会堆积,严重的会超时,可能导致客户端重新发送请求,性能越来越差

  3. 单线程也会有可靠性的问题

 

针对上面的种种不足,就有了下面的线程模型

 

单Reactor多线程模型

这种模型和第一种模型到的主要区别是把业务处理从之前的单一线程脱离出来,换成线程池处理,也就是Reactor线程只处理连接事件和读写事件,业务处理交给线程池处理,充分利用多核机器的资源、提高性能并且增加可靠性

 

 

该线程模型的不足

Reactor线程承担所有的事件,例如监听和响应,高并发场景下单线程存在性能问题

 

多Reactor多线程模型

这种模型下和第二种模型相比是把Reactor线程拆分了mainReactor和subReactor两个部分,mainReactor只处理连接事件,读写事件交给subReactor来处理。业务逻辑还是由线程池来处理

 

 

mainRactor只处理连接事件,用一个线程来处理就好。处理读写事件的subReactor个数一般和CPU数量相等,一个subReactor对应一个线程,业务逻辑由线程池处理

这种模型使各个模块职责单一,降低耦合度,性能和稳定性都有提高

这种模型在许多项目中广泛应用,比如Netty的主从线程模型等

 

Reactor三种模式形象比喻

餐厅一般有接待员和服务员,接待员负责在门口接待顾客,服务员负责全程服务顾客

Reactor的三种线程模型可以用接待员和服务员类比

  1. 单Reactor单线程模型:接待员和服务员是同一个人,一直为顾客服务。客流量较少适合

  2. 单Reactor多线程模型:一个接待员,多个服务员。客流量大,一个人忙不过来,由专门的接待员在门口接待顾客,然后安排好桌子后,由一个服务员一直服务,一般每个服务员负责一片中的几张桌子

  3. 多Reactor多线程模型:多个接待员,多个服务员。这种就是客流量太大了,一个接待员忙不过来了

 

 

参考资料

  1. 《Scalable IO in Java》 -Doug Lea  【关注公众号"每天晒白牙",回复“Doug Lea” 获取该pdf】

Reactor

阅读数 461

reactor模式

阅读数 8

没有更多推荐了,返回首页