-
2017-09-10 15:51:43 G290095142 阅读数 700
-
-
-
参考博客
Reactor模式,或者叫反应器模式
Reactor模型 三种模型 演变
Netty Reactor模型
如何深刻理解reactor和proactor?
actor、reactor与proactor模型
高性能IO模型浅析
高性能Server—Reactor模型—–Socket
Reactor模式详解什么是Reactor模型
我们想象以下情形:长途客车在路途上,有人上车有人下车,但是乘客总是希望能够在客车上得到休息。
传统的做法是:每隔一段时间(或每一个站),司机或售票员对每一个乘客询问是否下车。
反应器模式做法是:汽车是乘客访问的主体(Reactor),乘客上车后,到售票员(acceptor)处登记,之后乘客便可以休息睡觉去了,当到达乘客所要到达的目的地后,售票员将其唤醒即可。
Reactor单线程模型
这个模型和上面的NIO流程很类似,只是将消息相关处理独立到了Handler中去了!
虽然上面说到NIO一个线程就可以支持所有的IO处理。但是瓶颈也是显而易见的!我们看一个客户端的情况,如果这个客户端多次进行请求,如果在Handler中的处理速度较慢,那么后续的客户端请求都会被积压,导致响应变慢!所以引入了Reactor多线程模型!
主从Reactor模型
-
2018-08-20 22:15:58 u013354486 阅读数 1057
-
本文翻译至某大神的论文,论文名字叫就叫reactor。
reactor设计模式用于需要并发处理多个客户端的服务器。
1. 举个例子吧
假设我们需要建立一个提供分布式日志服务的事件驱动服务器。客户们使用客户端会向该服务器发送请求记录自己的状态信息,信息包括错误通知、debug信息、表现诊断等。该日志服务器对于收到的信息进行分类并分发操作,包括:显示屏显示、打印机打印、数据库存储等,如下图所示。
客户端和服务器之间通信协议通常选用TCP等面向连接的协议(为了保证可靠性),通过IP和端口的四元组来认定客户端和服务器。日志服务器被多个客户端同时使用,为此该服务器需要保证多用户连接请求和日志记录的并发性。为了保证这个基本要求,不难想到使用多线程去实现该服务器,每一个线程专门针对一个连接。
然而,该设计存在以下的问题:
(1) 效率问题。
多线程导致的上下文切换、同步、数据移动等可能带来效率的下降。
(2) 编程的简单性。
多线程需要考虑复杂的并发设计,包括线程安全等。
(3)可移植性
多线程在不同OS下的不同(或者没有)影响了可移植性。
由于以上问题,多线程设计往往既不是最高效也不是最易实现的方案。由此我们需要别的方案来实现可以处理并发请求的服务器(推销自己的reactor)。
2. 需要解决的问题
我们需要一个可以处理多个客户端请求的分布式服务器系统,每一个请求和某种特定的操作系统事件对应。比如在上文中提到的日志服务器,客户端请求处理日志记录就代表着读事件。在激活特定的服务前,服务器必须把收到的事件分发给各个事件所属的服务提供者。为了有效实现这一点,我们需要以下条件:
(1)服务器必须在等待事件发生的同事处理到达的事件,即不可由于单个事件导致服务器的完全阻塞,否则将导致其他事件严重延迟。
(2)服务器需要最小化延迟,最大化吞吐量,同时避免CPU的不必要开销。
(3)服务器的设计必须尽量简化并发策略的使用。
(4)集成新的或者改进的服务,例如改变信息格式或增加服务端缓存,应该尽量减少修改和维护已有代码的代价。例如对服务器客户端的改进不应该调整通用的事件分发机制。
(5)方便移植到不同的操作系统。
3. 解决方案
我们将事件的同步分发器和提供事件服务的分配器整合在一起。另外,将针对特定应用、事件的分发和服务分配器和通用概念上的分发、分配模型解耦。对应用提供的每一项服务,我们使用一个单独的event handler处理该类事件。event handler向初始分配器(initiation dispatcher)注册,该分配器使用同步分发器(synchronous event demultiplexer)等待事件的发生。当时间发生时,同步分发器通知初始分配器回调event handler处理该事件。
4. 结构
reactor模式主要组成部分包括操作系统管理的各种资源,如网络连接,文件读写,计时器,同步对象等。在日志服务器的例子中,我们使用句柄去识别终端套接字,从而使同步事件分发器可以等待多个事件的发生。日志服务器关注的时间类型主要包括连接事件和读事件,分别处理客户端的接入连接和日志信息录入。日志服务器为每个客户端维持了独立的连接,在服务端通过套接字句柄实现。
同步事件分发器设置了一系列的句柄等待事件的发生,在没有事件发生的时候是阻塞的,当有事件到来可以初始化时即返回。IO事件常用的事件分发器是select。select是一个在UNIX和WIN32下均可使用的事件分发系统。
初始分配器定义了注册、删除、分配事件句柄(event handler)对象的接口。当同步事件分发器检测到事件发生时,将通知初始分配器回调特定的事件句柄。常见的事件包括连接接受事件,数据输入输出事件,超时事件等。
一个事件句柄通过钩子方法(hook method)实例化了为特定服务事件抽象出的分配操作。实例化的事件句柄(concrete event handler)继承于抽象事件句柄(abstract event handler)。另外,实例化事件句柄实现了继承的事件分配钩子方法。服务端通过初始分配器注册实例化句柄处理某类事件。当事件到达时,初始分配器回调合适的实例化句柄的钩子方法。
在日志服务器中,如上文所述有两种实例化事件句柄:日志读写句柄(logging handler)和连接接受句柄(logging acceptor)。日志读写句柄用于接收和处理日志记录,连接接受句柄使用连接接受模式(Acceptor-Connecter pattern)创建、连接日志读写句柄以处理客户端传来的日志信息。下图中给出了reactor模式中各类结构的关系图。
5. 流程图
reactor模式的运行流程图如下所示。
(1)服务端使用初始分配器注册实例化事件句柄,即告诉初始分配器,当某种特定类型事件发生时则回调该实例化句柄。
(2)初始分配器请求所有的事件句柄返回其句柄进行记录以便于辨识。
(3)当所有事件均注册之后,主程序调用handle_event()函数开始初始分配器的事件循环。初始分配器此时掌握了事件句柄以便随时激活,同时联合同步事件分发器等待事件的到来。例如,我们可以使用select等待TCP的接入。
(4)当事件发生的时候,同步事件分发器通知初始分配器。
(5)初始分配器触发事件句柄钩子方法以处理事件。当事件发生时,初始分发器使用句柄激活事件资源并分配合适的事件句柄的钩子方法,并使用方法内在的函数进行处理。
-
2017-05-11 14:28:35 u011693064 阅读数 3387
-
copy from github上的一份实现。。。找不到链接了。。。
- epoll主要负责fd到event类型的映射
- EventDemultiplexer管理fd <-> event类型 <-> eventhandler具体怎么做event的回调方法,从而间接实现fd <–event类型–> eventhandler
的具体回调方法方法 - Reactor负责注册、管理、分配
###核心代码### - reactor.h
#include "event_handler.h" #include "event.h" #include "reactor_impl.h" class ReactorImpl; // 为了隐藏具体实现么。。 class Reactor { public: static Reactor& get_instance(); int regist(EventHandler* handler, Event evt); void remove(EventHandler* handler); void dispatch(int timeout = 0); private: Reactor(); ~Reactor(); Reactor(const Reactor&); Reactor& operator=(const Reactor&); private: ReactorImpl* _reactor_impl; static Reactor reactor; };
- reactor.cpp
#include "reactor.h" #include <assert.h> #include <new> Reactor Reactor::reactor; Reactor& Reactor::get_instance() { return reactor; } Reactor::Reactor() { _reactor_impl = new (std::nothrow)ReactorImpl(); assert(_reactor_impl != NULL); } Reactor::~Reactor() { if (_reactor_impl != NULL) { delete _reactor_impl; _reactor_impl = NULL; } } int Reactor::regist(EventHandler* handler, Event evt) { return _reactor_impl->regist(handler, evt); } void Reactor::remove(EventHandler* handler) { return _reactor_impl->remove(handler); } void Reactor::dispatch(int timeout) { return _reactor_impl->dispatch(timeout); }
- reactor_impl.h
#include <map> #include "event.h" #include "event_handler.h" #include "event_demultiplexer.h" class ReactorImpl { public: ReactorImpl(); ~ReactorImpl(); int regist(EventHandler* handler, Event evt); void remove(EventHandler* handler); void dispatch(int timeout = 0); private: EventDemultiplexer* _demultiplexer; std::map<Handle, EventHandler*> _handlers; };
- reactor_impl.cpp
#include "reactor_impl.h" #include <new> #include <assert.h> #include "epoll_demultiplexer.h" ReactorImpl::ReactorImpl() { _demultiplexer = new (std::nothrow)EpollDemultiplexer(); assert(_demultiplexer != NULL); } ReactorImpl::~ReactorImpl() { std::map<Handle, EventHandler*>::iterator iter = _handlers.begin(); for(; iter != _handlers.end(); ++iter) { delete iter->second; } if (_demultiplexer != NULL) { delete _demultiplexer; } } int ReactorImpl::regist(EventHandler* handler, Event evt) { int handle = handler->get_handle(); if (_handlers.find(handle) == _handlers.end()) { _handlers.insert(std::make_pair(handle, handler)); } return _demultiplexer->regist(handle, evt); } void ReactorImpl::remove(EventHandler* handler) { int handle = handler->get_handle(); // not check? _demultiplexer->remove(handle); std::map<Handle, EventHandler*>::iterator iter = _handlers.find(handle); delete iter->second; _handlers.erase(iter); } void ReactorImpl::dispatch(int timeout) { _demultiplexer->wait_event(_handlers, timeout); }
- event.h
typedef unsigned int Event; enum EventMask { ReadEvent = 0x01, WriteEvent = 0x02, ErrorEvent = 0x04, EventMask = 0xff };
- event_demultiplexer.h
#include <map> #include "event_handler.h" #include "event.h" class EventDemultiplexer { public: EventDemultiplexer() {} virtual ~EventDemultiplexer() {} virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0) = 0; virtual int regist(Handle handle, Event evt) = 0; virtual int remove(Handle handle) = 0; };
- epoll_demultiplexer.h
#include <map> #include "event_handler.h" #include "event.h" #include "event_demultiplexer.h" class EpollDemultiplexer : public EventDemultiplexer { public: EpollDemultiplexer(); virtual ~EpollDemultiplexer(); virtual int wait_event(std::map<Handle, EventHandler*>& handlers, int timeout = 0); virtual int regist(Handle handle, Event evt); virtual int remove(Handle handle); private: int _max_fd; int _epoll_fd; };
- epoll_demultiplexer.cpp
#include <vector> #include <sys/epoll.h> #include <iostream> #include <errno.h> #include <unistd.h> #include "epoll_demultiplexer.h" EpollDemultiplexer::EpollDemultiplexer() : _max_fd(0) { _epoll_fd = epoll_create(1024); } EpollDemultiplexer::~EpollDemultiplexer() { close(_epoll_fd); } int EpollDemultiplexer::wait_event(std::map<Handle, EventHandler*>& handlers, int timeout) { std::vector<struct epoll_event> events(_max_fd); int num = epoll_wait(_epoll_fd, &events[0], _max_fd, timeout); if (num < 0) { //std::cerr << "WARNING: epoll_wait error " << errno << std::endl; return num; } for (int i = 0; i < num; ++i) { Handle handle = events[i].data.fd; if ((EPOLLHUP|EPOLLERR) & events[i].events) { assert(handlers[handle] != NULL); (handlers[handle])->handle_error(); } else { if ((EPOLLIN) & events[i].events) { assert(handlers[handle] != NULL); (handlers[handle])->handle_read(); } if (EPOLLOUT & events[i].events) { (handlers[handle])->handle_write(); } } } return num; } int EpollDemultiplexer::regist(Handle handle, Event evt) { struct epoll_event ev; ev.data.fd = handle; if (evt & ReadEvent) { ev.events |= EPOLLIN; } if (evt & WriteEvent) { ev.events |= EPOLLOUT; } ev.events |= EPOLLET; if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) { if (errno == ENOENT) { if (epoll_ctl(_epoll_fd, EPOLL_CTL_ADD, handle, &ev) != 0) { std::cerr << "epoll_ctl add error " << errno << std::endl; return -errno; } ++_max_fd; } else { ++_max_fd; } } return 0; } int EpollDemultiplexer::remove(Handle handle) { struct epoll_event ev; if (epoll_ctl(_epoll_fd, EPOLL_CTL_DEL, handle, &ev) != 0) { std::cerr << "epoll_ctl del error" << errno << std::endl; return -errno; } --_max_fd; return 0; }
- event_handler.h
typedef int Handle; class EventHandler { public: EventHandler() {} virtual ~EventHandler() {} virtual Handle get_handle() const = 0; virtual void handle_read() = 0; virtual void handle_write() = 0; virtual void handle_error() = 0; };
- listen_handler.h
#include "event_handler.h" #include "event.h" class ListenHandler : public EventHandler { public: ListenHandler(Handle fd); virtual ~ListenHandler(); virtual Handle get_handle() const { return _listen_fd; } virtual void handle_read(); virtual void handle_write(); virtual void handle_error(); private: Handle _listen_fd; };
- listen_handler.cpp
#include "listen_handler.h" #include <unistd.h> #include <sys/socket.h> #include <stdio.h> #include <new> #include <assert.h> #include "event_handler.h" #include "reactor.h" #include "socket_handler.h" ListenHandler::ListenHandler(Handle fd) : _listen_fd(fd) { // do nothing } ListenHandler::~ListenHandler() { close(_listen_fd); } void ListenHandler::handle_read() { int fd = accept(_listen_fd, NULL, NULL); EventHandler* h = new (std::nothrow)SocketHandler(fd); assert(h != NULL); Reactor& r = Reactor::get_instance(); r.regist(h, ReadEvent); } void ListenHandler::handle_write() { // do nothing } void ListenHandler::handle_error() { // do nothing }
- socket_handler.h
#include "event_handler.h" #include "event.h" class SocketHandler : public EventHandler { public: SocketHandler(Handle fd); virtual ~SocketHandler(); virtual Handle get_handle() const { return _socket_fd; } virtual void handle_read(); virtual void handle_write(); virtual void handle_error(); private: Handle _socket_fd; char* _buf; static const int MAX_SIZE = 1024; };
- socket_handler.cpp
#include "socket_handler.h" #include <string.h> #include <stdio.h> #include <assert.h> #include <unistd.h> #include <sys/socket.h> #include <stdio.h> #include <new> #include "reactor.h" SocketHandler::SocketHandler(Handle fd) : _socket_fd(fd) { _buf = new (std::nothrow)char[MAX_SIZE]; assert(_buf != NULL); memset(_buf, 0, MAX_SIZE); } SocketHandler::~SocketHandler() { close(_socket_fd); delete[] _buf; } void SocketHandler::handle_read() { if (read(_socket_fd, _buf, MAX_SIZE) > 0) { write(_socket_fd, _buf, strlen(_buf)); } handle_error(); } void SocketHandler::handle_write() { // do nothing } void SocketHandler::handle_error() { Reactor& r = Reactor::get_instance(); r.remove(this); }
demo###
- client.cpp
#include "sys/socket.h" #include <arpa/inet.h> #include <sys/socket.h> #include <iostream> #include <errno.h> #include "reactor.h" #include "event_handler.h" #include "listen_handler.h" #include "event.h" int main() { int socketfd = -1; if ( (socketfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { std::cerr << "socket error " << errno << std::endl; exit(-1); } struct sockaddr_in seraddr; seraddr.sin_family = AF_INET; seraddr.sin_port = htons(53031); seraddr.sin_addr.s_addr = inet_addr("127.0.0.1"); // TODO if (connect(socketfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) { std::cerr << "connect error " << errno << std::endl; exit(-2); } char wbuf[64] = {0}; strcpy(wbuf, "hello world"); int n = write(socketfd, wbuf, strlen(wbuf)); char rbuf[64] = {0}; memset(rbuf, 0, sizeof(rbuf)); n = read(socketfd, rbuf, sizeof(rbuf)); std::cout << "send [" << wbuf << "] reply [" << rbuf << "]" << std::endl; if (n < 0) { std::cerr << "read error " << errno << std::endl; exit(-3); } close(socketfd); return 0; Reactor& actor = Reactor::get_instance(); EventHandler* handler = new ListenHandler(socketfd); actor.regist(handler, ReadEvent); while(true) { actor.dispatch(-1); std::cout << "client one loop" << std::endl; } return 0; }
- server.cpp
#include <sys/socket.h> #include <arpa/inet.h> #include <iostream> #include <errno.h> #include "reactor.h" #include "event_handler.h" #include "listen_handler.h" #include "event.h" int main() { int listenfd = -1; if ((listenfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) < 0) { std::cerr << "socket error " << errno << std::endl; exit(-1); } struct sockaddr_in seraddr; seraddr.sin_family = AF_INET; seraddr.sin_port = htons(53031); seraddr.sin_addr.s_addr = htonl(INADDR_ANY); if (bind(listenfd, (struct sockaddr*)&seraddr, sizeof(seraddr)) < 0) { std::cerr << "bind error " << errno << std::endl; exit(-2); } if (listen(listenfd, 5) < 0) { std::cerr << "listen error " << errno << std::endl; exit(-3); } Reactor& actor = Reactor::get_instance(); EventHandler* handler = new ListenHandler(listenfd); actor.regist(handler, ReadEvent); while(true) { actor.dispatch(100); //std::cout << "one loop" << std::endl; } return 0; }
-
2016-09-27 15:58:28 aiwo429001 阅读数 755
-
依赖包
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bus</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-core</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>
<dependency>
<groupId>io.projectreactor.spring</groupId>
<artifactId>reactor-spring-context</artifactId>
<version>2.0.7.RELEASE</version>
</dependency>类BaseEvent
@Component
public class BaseEvent implements ApplicationContextAware{
private static ApplicationContext context;
@Override
public void setApplicationContext(ApplicationContext applicationContext)
throws BeansException {
context = applicationContext;
}
public static EventBus getEventBus(){
return context.getBean(EventBus.class);
}
}TestSerivce
@Service
@Consumer
public class TestSerivce extends BaseEvent{
private static final Log log = LogFactory.getLog("test");
@Autowired
private CommonDao dao;
@Autowired
private TestBorDao;
@Autowired
private ThreadPoolTaskExecutor executor;
final static String EVT_CLEAR_COMPLETED = "EVT_CLEAR_COMPLETED";
final static String EVT_UPDATE_COMPLETED = "EVT_UPDATE_COMPLETED";
final static String EVT_START_CLEAN_UP = "EVT_START_CLEAN_UP";
public void checkMain(){
//当前系统存在问题, 有2层事务sprint和hibernate, 导致无法手动控制事务, 必须异步跳出, 使内外执行处于不同事物中
log.info("开始贷后扫描");
Future<Boolean> result = executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
try {
//检查已监控但未启用的数据, 将列表中旧检查数据置为已删除, 并物理删除半年以前的数据
checkNotEnabled();
} catch (Exception e) {
log.error("检查已监控但未启用的数据出错", e);
}
try {
//检查未监控但已启用的数据, 将其加入贷后监控主表
checkEnabled();
} catch (Exception e) {
log.error("检查未监控但已启用的数据出错", e);
}
return true;
}
});
try{
result.get();
getEventBus().notify(EVT_CLEAR_COMPLETED);
}catch(Exception e){
}
}
@Selector(EVT_CLEAR_COMPLETED)
public void update(){
final StringBuffer errorMsg=new StringBuffer();
for (final TestMain mo : dao.find(TestMain.class, "isDeleted = 1")){
Future<Boolean> result = executor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
try {
updateData(mo);
return true;
} catch (Exception e) {
log.error("数据处理失败,ID"+mo.getId(), e);
errorMsg.append("数据处理失败,ID"+mo.getId());
return false;
}
}
});
try{
if (!result.get()) mo.setIsDeleted(0);
getEventBus().notify(EVT_UPDATE_COMPLETED, Event.wrap(mo));
}catch(Exception e){
getEventBus().notify(EVT_UPDATE_COMPLETED, Event.wrap(mo));
}
}
getEventBus().notify(EVT_START_CLEAN_UP, Event.wrap(errorMsg.toString()));
}
@Selector(EVT_UPDATE_COMPLETED)
public void update(Event<RkMonMain> mo){
try{
dao.saveOrUpdate(mo.getData());
}catch(Exception e){
log.error(e);
}
}
@Selector(EVT_START_CLEAN_UP)
public void cleanup(Event<String> result){
String errors = result.getData();
if(!CommValidation.isEmpty(errors)){
List<SysAutowarnMember> amList=dao.find(SysAutowarnMember.class,"isDeleted = 1 ");
String toMail="";
for(SysAutowarnMember member:amList){
toMail+=member.getEmail()+",";
}
log.warn("跑批没有全部成功:"+errors);
messageService.sendMessage("数据处理失败,ID", errors, toMail, null, null);
}
}
private void updateData(TestMain mo){
}
}
通过通知的方式实现异步执行

在react中的路由 相关内容

在react中的路由 相关内容

在react中的路由 相关内容

在react中的路由 相关内容
-
阅读数 935
-
阅读数 210
-
阅读数 286
-
阅读数 38
-
阅读数 177