2017-09-10 15:51:43 G290095142 阅读数 717
  • Hello React

    React主要用于构建UI 你可以在React里传递多种类型的参数,如声明代码,帮助你渲染出UI、也可以是静态的HTML DOM元素、也可以传递动态变量、甚至是可交互的应用组件。

    7495 人正在学习 去看看 姜威
    -

参考博客

Reactor模式,或者叫反应器模式
Reactor模型 三种模型 演变
Netty Reactor模型
如何深刻理解reactor和proactor?
actor、reactor与proactor模型
高性能IO模型浅析
高性能Server—Reactor模型—–Socket
Reactor模式详解

什么是Reactor模型

这里写图片描述

我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。

传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。

反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。
Reactor单线程模型
这里写图片描述
这个模型和上面的NIO流程很类似,只是将消息相关处理独立到了Handler中去了!
虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的!我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型!
这里写图片描述
主从Reactor模型
这里写图片描述

2017-08-14 16:59:03 u011389977 阅读数 289
  • Hello React

    React主要用于构建UI 你可以在React里传递多种类型的参数,如声明代码,帮助你渲染出UI、也可以是静态的HTML DOM元素、也可以传递动态变量、甚至是可交互的应用组件。

    7495 人正在学习 去看看 姜威

Reactor 模式

参考:https://segmentfault.com/a/1190000002715832

这里写图片描述

Reactor 模式包含几个主要的模块:

  1. Reactor:反应器,定义一个接口,实现以下功能:
    • 供应用程序注册和删除关注的事件句柄。
    • 运行事件循环
    • 有就绪事件到来时,分发事件到之前注册的回调函数上处理
  2. Handle (文件描述符)
  3. Synchronous Event Demultiplexer(多路复用,如 epoll,poll,select)
  4. Event Handler(暴露出来供程序使用的事件处理接口,可以在这些接口中实现具体的处理函数)
  5. Concrete Event Handler(实现 4 中的接口,实现应用程序所提供的特定事件处理逻辑)

反应器的工作过程:

    具体时间处理程序不调用反应器,而是由反应器分配一个具体事件来处理程序,具体事件处理程序对某个指定的事件发生做出反应,而做出这个反应动作,是应用程序之前注册到 Reactor 上的 Concrete Event Handler。

这里写图片描述

    Reactor 被动的等待指示事件的到来并做出反应,它有一个等待的过程,做什么都要先放入到监听事件集合中等待 handler 可用时再进行操作。

这里写图片描述

2018-08-20 22:15:58 u013354486 阅读数 1085
  • Hello React

    React主要用于构建UI 你可以在React里传递多种类型的参数,如声明代码,帮助你渲染出UI、也可以是静态的HTML DOM元素、也可以传递动态变量、甚至是可交互的应用组件。

    7495 人正在学习 去看看 姜威

  本文翻译至某大神的论文,论文名字叫就叫reactor。

  reactor设计模式用于需要并发处理多个客户端的服务器。

1. 举个例子吧

  假设我们需要建立一个提供分布式日志服务的事件驱动服务器。客户们使用客户端会向该服务器发送请求记录自己的状态信息,信息包括错误通知、debug信息、表现诊断等。该日志服务器对于收到的信息进行分类并分发操作,包括:显示屏显示、打印机打印、数据库存储等,如下图所示。

这里写图片描述

  客户端和服务器之间通信协议通常选用TCP等面向连接的协议(为了保证可靠性),通过IP和端口的四元组来认定客户端和服务器。日志服务器被多个客户端同时使用,为此该服务器需要保证多用户连接请求和日志记录的并发性。为了保证这个基本要求,不难想到使用多线程去实现该服务器,每一个线程专门针对一个连接。

  然而,该设计存在以下的问题:

(1) 效率问题。

  多线程导致的上下文切换、同步、数据移动等可能带来效率的下降。

(2) 编程的简单性。

  多线程需要考虑复杂的并发设计,包括线程安全等。

(3)可移植性

  多线程在不同OS下的不同(或者没有)影响了可移植性。

  由于以上问题,多线程设计往往既不是最高效也不是最易实现的方案。由此我们需要别的方案来实现可以处理并发请求的服务器(推销自己的reactor)。

2. 需要解决的问题

  我们需要一个可以处理多个客户端请求的分布式服务器系统,每一个请求和某种特定的操作系统事件对应。比如在上文中提到的日志服务器,客户端请求处理日志记录就代表着读事件。在激活特定的服务前,服务器必须把收到的事件分发给各个事件所属的服务提供者。为了有效实现这一点,我们需要以下条件:

(1)服务器必须在等待事件发生的同事处理到达的事件,即不可由于单个事件导致服务器的完全阻塞,否则将导致其他事件严重延迟。

(2)服务器需要最小化延迟,最大化吞吐量,同时避免CPU的不必要开销。

(3)服务器的设计必须尽量简化并发策略的使用。

(4)集成新的或者改进的服务,例如改变信息格式或增加服务端缓存,应该尽量减少修改和维护已有代码的代价。例如对服务器客户端的改进不应该调整通用的事件分发机制。

(5)方便移植到不同的操作系统。

3. 解决方案

  我们将事件的同步分发器和提供事件服务的分配器整合在一起。另外,将针对特定应用、事件的分发和服务分配器和通用概念上的分发、分配模型解耦。对应用提供的每一项服务,我们使用一个单独的event handler处理该类事件。event handler向初始分配器(initiation dispatcher)注册,该分配器使用同步分发器(synchronous event demultiplexer)等待事件的发生。当时间发生时,同步分发器通知初始分配器回调event handler处理该事件。

4. 结构

  reactor模式主要组成部分包括操作系统管理的各种资源,如网络连接,文件读写,计时器,同步对象等。在日志服务器的例子中,我们使用句柄去识别终端套接字,从而使同步事件分发器可以等待多个事件的发生。日志服务器关注的时间类型主要包括连接事件和读事件,分别处理客户端的接入连接和日志信息录入。日志服务器为每个客户端维持了独立的连接,在服务端通过套接字句柄实现。

  同步事件分发器设置了一系列的句柄等待事件的发生,在没有事件发生的时候是阻塞的,当有事件到来可以初始化时即返回。IO事件常用的事件分发器是select。select是一个在UNIX和WIN32下均可使用的事件分发系统。

  初始分配器定义了注册、删除、分配事件句柄(event handler)对象的接口。当同步事件分发器检测到事件发生时,将通知初始分配器回调特定的事件句柄。常见的事件包括连接接受事件,数据输入输出事件,超时事件等。

这里写图片描述

  一个事件句柄通过钩子方法(hook method)实例化了为特定服务事件抽象出的分配操作。实例化的事件句柄(concrete event handler)继承于抽象事件句柄(abstract event handler)。另外,实例化事件句柄实现了继承的事件分配钩子方法。服务端通过初始分配器注册实例化句柄处理某类事件。当事件到达时,初始分配器回调合适的实例化句柄的钩子方法。

这里写图片描述

  在日志服务器中,如上文所述有两种实例化事件句柄:日志读写句柄(logging handler)和连接接受句柄(logging acceptor)。日志读写句柄用于接收和处理日志记录,连接接受句柄使用连接接受模式(Acceptor-Connecter pattern)创建、连接日志读写句柄以处理客户端传来的日志信息。下图中给出了reactor模式中各类结构的关系图。

这里写图片描述

5. 流程图

  reactor模式的运行流程图如下所示。

这里写图片描述

  (1)服务端使用初始分配器注册实例化事件句柄,即告诉初始分配器,当某种特定类型事件发生时则回调该实例化句柄。

  (2)初始分配器请求所有的事件句柄返回其句柄进行记录以便于辨识。

  (3)当所有事件均注册之后,主程序调用handle_event()函数开始初始分配器的事件循环。初始分配器此时掌握了事件句柄以便随时激活,同时联合同步事件分发器等待事件的到来。例如,我们可以使用select等待TCP的接入。

  (4)当事件发生的时候,同步事件分发器通知初始分配器。

  (5)初始分配器触发事件句柄钩子方法以处理事件。当事件发生时,初始分发器使用句柄激活事件资源并分配合适的事件句柄的钩子方法,并使用方法内在的函数进行处理。

2017-05-11 14:28:35 u011693064 阅读数 3412
  • Hello React

    React主要用于构建UI 你可以在React里传递多种类型的参数,如声明代码,帮助你渲染出UI、也可以是静态的HTML DOM元素、也可以传递动态变量、甚至是可交互的应用组件。

    7495 人正在学习 去看看 姜威

copy from github上的一份实现。。。找不到链接了。。。

  • epoll主要负责fd到event类型的映射
  • EventDemultiplexer管理fd <-> event类型 <-> eventhandler具体怎么做event的回调方法,从而间接实现fd <–event类型–> eventhandler
    的具体回调方法方法
  • Reactor负责注册、管理、分配
    ###核心代码###
  • reactor.h
#include "event_handler.h"
#include "event.h"
#include "reactor_impl.h"

class ReactorImpl; // 为了隐藏具体实现么。。

class Reactor {
public:
    static Reactor& get_instance();
    int regist(EventHandler* handler, Event evt);
    void remove(EventHandler* handler);
    void dispatch(int timeout = 0);

private:
    Reactor();
    ~Reactor();
    Reactor(const Reactor&);
    Reactor& operator=(const Reactor&);

private:
    ReactorImpl* _reactor_impl;
    static Reactor reactor;
};
  • reactor.cpp
#include "reactor.h"
#include <assert.h>
#include <new>

Reactor Reactor::reactor;

Reactor& Reactor::get_instance() {
    return reactor;
}

Reactor::Reactor() {
    _reactor_impl = new (std::nothrow)ReactorImpl();
    assert(_reactor_impl != NULL);
}

Reactor::~Reactor() {
    if (_reactor_impl != NULL) {
        delete _reactor_impl;
        _reactor_impl = NULL;
    }
}


int Reactor::regist(EventHandler* handler, Event evt) {
    return _reactor_impl->regist(handler, evt);
}

void Reactor::remove(EventHandler* handler) {
    return _reactor_impl->remove(handler);
}

void Reactor::dispatch(int timeout) {
    return _reactor_impl->dispatch(timeout);
}
  • reactor_impl.h
#include <map>
#include "event.h"
#include "event_handler.h"
#include "event_demultiplexer.h"

class ReactorImpl {
public:
    ReactorImpl();
    ~ReactorImpl();
    int regist(EventHandler* handler, Event evt);
    void remove(EventHandler* handler);
    void dispatch(int timeout = 0);

private:
    EventDemultiplexer* _demultiplexer;
    std::map<Handle, EventHandler*> _handlers;
};
  • reactor_impl.cpp
#include "reactor_impl.h"
#include <new>
#include <assert.h>
#include "epoll_demultiplexer.h"

ReactorImpl::ReactorImpl() {
    _demultiplexer = new (std::nothrow)EpollDemultiplexer();
    assert(_demultiplexer != NULL);
}

ReactorImpl::~ReactorImpl() {
    std::map<Handle, EventHandler*>::iterator iter = _handlers.begin();
    for(; iter != _handlers.end(); ++iter) {
        delete iter->second;
    }

    if (_demultiplexer != NULL) {
        delete _demultiplexer;
    }
}

int ReactorImpl::regist(EventHandler* handler, Event evt) {
    int handle = handler->get_handle();
    if (_handlers.find(handle) == _handlers.end()) {
        _handlers.insert(std::make_pair(handle, handler));
    }
    return _demultiplexer->regist(handle, evt);
}

void ReactorImpl::remove(EventHandler* handler) {
    int handle = handler->get_handle();
    // not check?
    _demultiplexer->remove(handle);

    std::map<Handle, EventHandler*>::iterator iter = _handlers.find(handle);
    delete iter->second;
    _handlers.erase(iter);
}

void ReactorImpl::dispatch(int timeout) {
    _demultiplexer->wait_event(_handlers, timeout);
}
  • event.h
typedef unsigned int Event;
enum EventMask {
    ReadEvent  = 0x01,
    WriteEvent = 0x02,
    ErrorEvent = 0x04,
    EventMask  = 0xff
};
  • event_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"

class EventDemultiplexer {
public:
    EventDemultiplexer() {}
    virtual ~EventDemultiplexer()  {}
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0) = 0;
    virtual int regist(Handle handle, Event evt) = 0;
    virtual int remove(Handle handle) = 0;
};
  • epoll_demultiplexer.h
#include <map>
#include "event_handler.h"
#include "event.h"
#include "event_demultiplexer.h"

class EpollDemultiplexer : public EventDemultiplexer {
public:
    EpollDemultiplexer();
    virtual ~EpollDemultiplexer();
    virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0);
    virtual int regist(Handle handle, Event evt);
    virtual int remove(Handle handle);
private:
    int _max_fd;
    int _epoll_fd;
};
  • epoll_demultiplexer.cpp
#include <vector>
#include <sys/epoll.h>
#include <iostream>
#include <errno.h>
#include <unistd.h>

#include "epoll_demultiplexer.h"

EpollDemultiplexer::EpollDemultiplexer() : _max_fd(0) {
    _epoll_fd = epoll_create(1024);
}

EpollDemultiplexer::~EpollDemultiplexer() {
    close(_epoll_fd);
}

int EpollDemultiplexer::wait_event(std::map<Handle, EventHandler*>& handlers, int timeout) {
    std::vector<struct epoll_event> events(_max_fd);
    int num = epoll_wait(_epoll_fd, &events[0], _max_fd, timeout);
    if (num < 0) {
        //std::cerr << "WARNING: epoll_wait error " << errno << std::endl;
        return num;
    }

    for (int i = 0; i < num; ++i) {
        Handle handle = events[i].data.fd;
        if ((EPOLLHUP|EPOLLERR) & events[i].events) {
            assert(handlers[handle] != NULL);
            (handlers[handle])->handle_error();
        } else {
            if ((EPOLLIN) & events[i].events) {
                assert(handlers[handle] != NULL);
                (handlers[handle])->handle_read();
            }
            if (EPOLLOUT & events[i].events) {
                (handlers[handle])->handle_write();
            }
        }
    }
    return num;
}

int EpollDemultiplexer::regist(Handle handle, Event evt) {
    struct epoll_event ev;
    ev.data.fd = handle;
    if (evt & ReadEvent) {
        ev.events |= EPOLLIN;
    }
    if (evt & WriteEvent) {
        ev.events |= EPOLLOUT;
    }
    ev.events |= EPOLLET;

    if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) { 
        if (errno == ENOENT) {
            if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) {
                std::cerr << "epoll_ctl add error " << errno << std::endl;
                return -errno;
            }
            ++_max_fd;
        } else {
            ++_max_fd;
        }
    }
    return 0;
}

int EpollDemultiplexer::remove(Handle handle) {
    struct epoll_event ev;
    if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) {
        std::cerr << "epoll_ctl del error" << errno << std::endl;
        return -errno;
    }
    --_max_fd;
    return 0;
}
  • event_handler.h
typedef int Handle;

class EventHandler {
public:
    EventHandler() {}
    virtual ~EventHandler() {}
    virtual Handle get_handle() const = 0;
    virtual void handle_read() = 0;
    virtual void handle_write() = 0;
    virtual void handle_error() = 0;
};
  • listen_handler.h
#include "event_handler.h"
#include "event.h"

class ListenHandler : public EventHandler {
public:
    ListenHandler(Handle fd);
    virtual ~ListenHandler();
    virtual Handle get_handle() const {
        return _listen_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();
private:
    Handle _listen_fd;
};
  • listen_handler.cpp
#include "listen_handler.h"
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include <assert.h>
#include "event_handler.h"
#include "reactor.h"
#include "socket_handler.h"

ListenHandler::ListenHandler(Handle fd) : _listen_fd(fd) {
    // do nothing
}

ListenHandler::~ListenHandler() {
    close(_listen_fd);
}

void ListenHandler::handle_read() {
    int fd = accept(_listen_fd, NULL, NULL);
    EventHandler* h = new (std::nothrow)SocketHandler(fd);
    assert(h != NULL);
    Reactor& r = Reactor::get_instance();
    r.regist(h, ReadEvent);
}

void ListenHandler::handle_write() {
    // do nothing
}

void ListenHandler::handle_error() {
    // do nothing
}
  • socket_handler.h
#include "event_handler.h"
#include "event.h"

class SocketHandler : public EventHandler {
public:
    SocketHandler(Handle fd);
    virtual ~SocketHandler();
    virtual Handle get_handle() const {
        return _socket_fd;
    }
    virtual void handle_read();
    virtual void handle_write();
    virtual void handle_error();
private:
    Handle _socket_fd;
    char* _buf;
    static const int MAX_SIZE = 1024;
};
  • socket_handler.cpp
#include "socket_handler.h"
#include <string.h>
#include <stdio.h>
#include <assert.h>
#include <unistd.h>
#include <sys/socket.h>
#include <stdio.h>
#include <new>
#include "reactor.h"

SocketHandler::SocketHandler(Handle fd) :
        _socket_fd(fd) {
    _buf = new (std::nothrow)char[MAX_SIZE];
    assert(_buf != NULL);
    memset(_buf, 0, MAX_SIZE);
}

SocketHandler::~SocketHandler() {
    close(_socket_fd);
    delete[] _buf;
}

void SocketHandler::handle_read() {
    if (read(_socket_fd, _buf, MAX_SIZE) > 0) {
        write(_socket_fd, _buf, strlen(_buf));
    } 
    handle_error();
}

void SocketHandler::handle_write() {
    // do nothing
}

void SocketHandler::handle_error() {
    Reactor& r = Reactor::get_instance();
    r.remove(this);
}

demo###

  • client.cpp
#include "sys/socket.h"
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include <errno.h>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

int main() {
    int socketfd = -1;
    if ( (socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
        std::cerr << "socket error " << errno << std::endl;
        exit(-1);
    }

    struct sockaddr_in seraddr;
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO

    if (connect(socketfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        std::cerr << "connect error " << errno << std::endl;
        exit(-2);
    }

    char wbuf[64] = {0};
    strcpy(wbuf, "hello world");
    int n = write(socketfd, wbuf, strlen(wbuf));

    char rbuf[64] = {0};
    memset(rbuf, 0, sizeof(rbuf));
    n = read(socketfd, rbuf, sizeof(rbuf));
    std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl;

    if (n < 0) {
        std::cerr << "read error " << errno << std::endl;
        exit(-3);
    }
    close(socketfd);
    return 0;

    Reactor& actor = Reactor::get_instance();
    EventHandler* handler = new ListenHandler(socketfd);
    actor.regist(handler, ReadEvent);

    while(true) {
        actor.dispatch(-1);
        std::cout << "client one loop" << std::endl;
    }

    return 0;
}
  • server.cpp
#include <sys/socket.h>
#include <arpa/inet.h>
#include <iostream>
#include <errno.h>
#include "reactor.h"
#include "event_handler.h"
#include "listen_handler.h"
#include "event.h"

int main() {
    int listenfd = -1;
    if ((listenfd = socket(AF_INET, SOCK_STREAM  | SOCK_NONBLOCK, 0)) < 0) {
        std::cerr << "socket error " << errno << std::endl;
        exit(-1);
    }

    struct sockaddr_in seraddr;
    seraddr.sin_family = AF_INET;
    seraddr.sin_port = htons(53031);
    seraddr.sin_addr.s_addr = htonl(INADDR_ANY);

    if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) {
        std::cerr << "bind error " << errno << std::endl;
        exit(-2);
    }

    if (listen(listenfd, 5) < 0) {
        std::cerr << "listen error " << errno << std::endl;
        exit(-3);
    }

    Reactor& actor = Reactor::get_instance();
    EventHandler* handler = new ListenHandler(listenfd);
    actor.regist(handler, ReadEvent);

    while(true) {
        actor.dispatch(100);
        //std::cout << "one loop" << std::endl;
    }

    return 0;
}
2018-11-19 16:45:59 m0_37556444 阅读数 187
  • Hello React

    React主要用于构建UI 你可以在React里传递多种类型的参数,如声明代码,帮助你渲染出UI、也可以是静态的HTML DOM元素、也可以传递动态变量、甚至是可交互的应用组件。

    7495 人正在学习 去看看 姜威

reactor模式

阅读数 653

Reactor的框架有5个关键部分:1》

博文 来自: tom555cat
没有更多推荐了,返回首页