精华内容
下载资源
问答
  • Reactor模式

    2020-06-26 08:20:07
    什么是Reactor模式 Reactor模式是一种设计模式,它是基于事件驱动的,可以并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。 Reactor模式角色构成 在早先的论文An...
        

    什么是Reactor模式

    Reactor模式是一种设计模式,它是基于事件驱动的,可以并发的处理多个服务请求,当请求抵达后,依据多路复用策略,同步的派发这些请求至相关的请求处理程序。

    Reactor模式角色构成

    在早先的论文An Object Behavioral Pattern for
    Demultiplexing and Dispatching Handles for Synchronous Events
    中Reactor模式主要有五大角色组成,分别如下:

    Handle:操作系统提供的一种资源,用于表示一个个的事件,在网络编程中可以是一个连接事件,一个读取事件,一个写入事件,Handle是事件产生的发源地
    Synchronous Event Demultiplexer:本质上是一个系统调用,用于等待事件的发生,调用方在调用它的时候会被阻塞,一直阻塞到同步事件分离器上有事件产生为止
    Initiation Dispatcher:定义了一些用于控制事件的调度方式的规范,提供对事件管理。它本身是整个事件处理器的核心所在,Initiation Dispatcher会通过Synchronous Event Demultiplexer来等待事件的发生。一旦事件发生,Initiation Dispatcher首先会分离出每一个事件,然后调用事件处理器,最后调用相关的回调方法来处理这些事件
    Event Handler:定义事件处理方法以供InitiationDispatcher回调使用
    Concrete Event Handler:是事件处理器的实现。它本身实现了事件处理器所提供的各种回调方法,从而实现了特定于业务的逻辑。它本质上就是我们所编写的一个个的处理器实现。

    img

    Reactor模式实现流程

    1. 初始化 Initiation Dispatcher,然后将若干个Concrete Event Handler注册到 Initiation Dispatcher中,应用会标识出该事件处理器希望Initiation Dispatcher在某些事件发生时向其发出通知
    2. Initiation Dispatcher 会要求每个事件处理器向其传递内部的Handle,该Handle向操作系统标识了事件处理器
    3. 当所有的Concrete Event Handler都注册完毕后,就会启动 Initiation Dispatcher的事件循环,使用Synchronous Event Demultiplexer同步阻塞的等待事件的发生
    4. 当与某个事件源对应的Handle变为ready状态时,Synchronous Event Demultiplexer就会通知 Initiation Dispatcher
    5. Initiation Dispatcher会触发事件处理器的回调方法响应这个事件

    img

    Java NIO对Reactor的实现

    在Java的NIO中,对Reactor模式有无缝的支持,即使用Selector类封装了操作系统提供的Synchronous Event Demultiplexer功能。Doug Lea(Java concurrent包的作者)在Scalable IO in Java中对此有非常详细的描述。概况来说其主要流程如下:

    1. 服务器端的Reactor线程对象会启动事件循环,并使用Selector来实现IO的多路复用
    2. 注册Acceptor事件处理器到Reactor中,Acceptor事件处理器所关注的事件是ACCEPT事件,这样Reactor会监听客户端向服务器端发起的连接请求事件
    3. 客户端向服务器端发起连接请求,Reactor监听到了该ACCEPT事件的发生并将该ACCEPT事件派发给相应的Acceptor处理器来进行处理。Acceptor处理器通过accept()方法得到与这个客户端对应的连接(SocketChannel),然后将该连接所关注的READ/WRITE事件以及对应的READ/WRITE事件处理器注册到Reactor中,这样一来Reactor就会监听该连接的READ/WRITE事件了。
    4. 当Reactor监听到有读或者写事件发生时,将相关的事件派发给对应的处理器进行处理
    5. 每当处理完所有就绪的感兴趣的I/O事件后,Reactor线程会再次执行select()阻塞等待新的事件就绪并将其分派给对应处理器进行处理

    Doug Lea 在Scalable IO in Java中分别描述了单线程的Reactor,多线程模式的Reactor以及多Reactor线程模式。

    单线程的Reactor,主要依赖Java NIO中的Channel,Buffer,Selector,SelectionKey。在单线程Reactor模式中,不仅I/O操作在该Reactor线程上,连非I/O的业务操作也在该线程上进行处理了,这可能会大大延迟I/O请求的响应

    img

    在多线程Reactor中添加了一个工作线程池,将非I/O操作从Reactor线程中移出转交给工作者线程池来执行。这样能够提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理,但是所有的I/O操作依旧由一个Reactor来完成,包括I/O的accept()、read()、write()以及connect()操作

    img

    多Reactor线程模式将“接受客户端的连接请求”和“与该客户端的通信”分在了两个Reactor线程来完成。mainReactor完成接收客户端连接请求的操作,它不负责与客户端的通信,而是将建立好的连接转交给subReactor线程来完成与客户端的通信,这样一来就不会因为read()数据量太大而导致后面的客户端连接请求得不到即时处理的情况。并且多Reactor线程模式在海量的客户端并发请求的情况下,还可以通过实现subReactor线程池来将海量的连接分发给多个subReactor线程,在多核的操作系统中这能大大提升应用的负载和吞吐量

    img

    代码示例:

    // NIO selector 多路复用reactor线程模型
    public class NIOReactor {
    
      // 处理业务操作的线程池
      private static ExecutorService workPool = Executors.newCachedThreadPool();
    
      // 封装了selector.select()等事件轮询的代码
      abstract class ReactorThread extends Thread {
    
        Selector selector;
        LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    
        volatile boolean running = false;
    
        private ReactorThread() throws IOException {
          selector = Selector.open();
        }
    
        // Selector监听到有事件后,调用这个方法
        public abstract void handler(SelectableChannel channel) throws Exception;
    
        @Override
        public void run() {
          // 轮询Selector事件
          while (running) {
            try {
              // 执行队列中的任务
              Runnable task;
              while ((task = taskQueue.poll()) != null) {
                task.run();
              }
              selector.select(1000);
              // 获取查询结果
              Set<SelectionKey> selectionKeys = selector.selectedKeys();
              // 遍历查询结果
              Iterator<SelectionKey> keyIterator = selectionKeys.iterator();
              while (keyIterator.hasNext()) {
                // 被封装的查询结果
                SelectionKey selectionKey = keyIterator.next();
                keyIterator.remove();
                int readyOps = selectionKey.readyOps();
                // 关注 Read 和 Accept两个事件
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0
                    || readyOps == 0) {
                  try {
                    SelectableChannel channel = (SelectableChannel) selectionKey.attachment();
                    channel.configureBlocking(false);
                    handler(channel);
                    // 如果关闭了,就取消这个KEY的订阅
                    if (!channel.isOpen()) {
                      selectionKey.cancel();
                    }
    
                  } catch (Exception e) {
                    // 如果有异常,就取消这个KEY的订阅
                    selectionKey.cancel();
                    e.printStackTrace();
                  }
                }
              }
    
            } catch (Exception e) {
              e.printStackTrace();
            }
          }
        }
    
        private SelectionKey register(SelectableChannel channel) throws Exception {
          // 为什么register要以任务提交的形式,让reactor线程去处理?
          // 因为线程在执行channel注册到selector的过程中,会和调用selector.select()方法的线程争用同一把锁
          // 而select()方法实在eventLoop中通过while循环调用的,争抢的可能性很高,
          // 为了让register能更快的执行,就放到同一个线程来处理
          FutureTask<SelectionKey> futureTask =
              new FutureTask<>(() -> channel.register(selector, 0, channel));
          taskQueue.add(futureTask);
          return futureTask.get();
        }
    
        private void doStart() {
          if (!running) {
            running = true;
            start();
          }
        }
      }
    
      private ServerSocketChannel serverSocketChannel;
    
      // 1、创建多个线程 - accept处理reactor线程 (accept线程)
      private ReactorThread[] mainReactorThreads = new ReactorThread[1];
    
      // 2、创建多个线程 - io处理reactor线程  (I/O线程)
      private ReactorThread[] subReactorThreads = new ReactorThread[8];
    
      // 初始化线程组
      private void newGroup() throws IOException {
        // 创建mainReactor线程, 只负责处理serverSocketChannel
        for (int i = 0; i < mainReactorThreads.length; i++) {
          mainReactorThreads[i] =
              new ReactorThread() {
                AtomicInteger incr = new AtomicInteger(0);
    
                @Override
                public void handler(SelectableChannel channel) throws Exception {
                  // 只做请求分发,不做具体的数据读取
                  ServerSocketChannel ch = (ServerSocketChannel) channel;
                  SocketChannel socketChannel = ch.accept();
                  socketChannel.configureBlocking(false);
                  // 收到连接建立的通知之后,分发给I/O线程继续去读取数据
                  int index = incr.getAndIncrement() % subReactorThreads.length;
                  ReactorThread workEventLoop = subReactorThreads[index];
                  workEventLoop.doStart();
                  SelectionKey selectionKey = workEventLoop.register(socketChannel);
                  selectionKey.interestOps(SelectionKey.OP_READ);
                  System.out.println(
                      Thread.currentThread().getName() + "收到新连接 : " + socketChannel.getRemoteAddress());
                }
              };
        }
    
        // 创建IO线程,负责处理客户端连接以后socketChannel的IO读写
        for (int i = 0; i < subReactorThreads.length; i++) {
          subReactorThreads[i] =
              new ReactorThread() {
    
                @Override
                public void handler(SelectableChannel channel) throws Exception {
                  // work线程只负责处理IO处理,不处理accept事件
                  SocketChannel ch = (SocketChannel) channel;
                  ByteBuffer requestBuffer = ByteBuffer.allocate(1024);
                  while (ch.isOpen() && ch.read(requestBuffer) != -1) {
                    // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了)
                    if (requestBuffer.position() > 0) break;
                  }
                  if (requestBuffer.position() == 0) return; // 如果没数据了, 则不继续后面的处理
                  requestBuffer.flip();
                  byte[] content = new byte[requestBuffer.limit()];
                  requestBuffer.get(content);
                  System.out.println(new String(content));
                  System.out.println(
                      Thread.currentThread().getName() + "收到数据,来自:" + ch.getRemoteAddress());
    
                  // TODO 业务操作 数据库、接口...
                  workPool.submit(() -> {});
    
                  // 响应结果 200
                  String response =
                      "HTTP/1.1 200 OK\r\n" + "Content-Length: 11\r\n\r\n" + "Hello World";
                  ByteBuffer buffer = ByteBuffer.wrap(response.getBytes());
                  while (buffer.hasRemaining()) {
                    ch.write(buffer);
                  }
                }
              };
        }
      }
    
      // 始化channel,并且绑定一个eventLoop线程
      private void initAndRegister() throws Exception {
        // 1、 创建ServerSocketChannel
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        // 2、 将serverSocketChannel注册到selector
        int index = new Random().nextInt(mainReactorThreads.length);
        mainReactorThreads[index].doStart();
        SelectionKey selectionKey = mainReactorThreads[index].register(serverSocketChannel);
        selectionKey.interestOps(SelectionKey.OP_ACCEPT);
      }
    
      // 绑定端口
      private void bind() throws IOException {
        //  1、 正式绑定端口,对外服务
        serverSocketChannel.bind(new InetSocketAddress(8080));
        System.out.println("启动完成,端口8080");
      }
    
      public static void main(String[] args) throws Exception {
        NIOReactor nioReactor = new NIOReactor();
        // 1、 创建main和sub两组线程
        nioReactor.newGroup();
        // 2、 创建serverSocketChannel,注册到mainReactor线程上的selector上
        nioReactor.initAndRegister();
        // 3、 为serverSocketChannel绑定端口
        nioReactor.bind();
      }
    }
    

    展开全文
  • reactor模式

    2018-12-11 10:50:39
    一. 概念 reactor是一种设计模式,常常可以用来设计网络服务器。reactor是一种基于事件触发的模式,类似观察者模式,与观察者模式不同的是,reactor关联监听多个...reactor模式是基于IO多路复用的,充分利用系统...

    一. 概念

    reactor是一种设计模式,常常可以用来设计网络服务器。reactor是一种基于事件触发的模式,类似观察者模式,与观察者模式不同的是,reactor关联监听多个主体,当某个主体有事件发生时,分发事件给相应的handler处理。

    我们以网络服务器为例,来说明reactor模式在网络服务器中应用。

    二. reactor模式解析

    reactor模式是基于IO多路复用的,充分利用系统的IO多路复用的机制,能够高效的同时处理多个socket fd。

    reactor的模式的基本结构如下所示(盗图):

    è¿éåå¾çæè¿°

    1. Initaition Dispacher(简称Dispacher,下同)

    (1) Dispacher作为一个分发系统,负责Event Handler的注册和删除。

    (2) 使用Synchronous Event Demultiplexer,监听Event Handler注册的所有的Handle。

    (3) 当监听的Handle(socket fd)中有事件发生时,分发事件给相应的Event Handler处理。

    2. Synchronous Event Demultiplexer

    可以为IO多路复用的机制,如select/epoll等,监听所有的Handle事件。

    3. Event Handler

    (1) 拥有相应的Handle,如socket fd。

    (2) Handle对应的不同的事件的处理单元。

    4. Concrete Event Handler

    特指Event Handler的具体实现。

    5. Handle

    可以为socket fd,包括listen的socket fd和accept后创建的与client连接的socket fd。

    三. 一个应用reactor的linux网络服务器模型

    1. 定义一个Dispatcher,定义process, register_handler以及remove_handler三个接口;process中通过epoll来监听网络IO事件。

    2. 通过epoll_create创建一个用于epoll监听的epoll fd: efd。

    3. 通过epoll wait来监听efd中有事件发生的socket fd。

    class Dispatcher
    {
    public:
    	void process();
    	void register_handler(int handle_type, int fd, auto *event_handler);
    	void remove_handler(int fd);
    
    private:
    	int _efd;
    };
    
    Dispacher::Dispacher():
    	_efd(epoll_create(EXCEPTED_SIZE))
    {}
    
    void Dispacheer::process()
    {
    	struct epoll_event events[MAX_EVENTS]
    	int nfds = epoll_wait(epfd_, events, MAX_EVENTS, maxWaitInMilliseconds);
    	for (int i = 0; i < nfds; ++i)
    	{
    		// find event handler by fd and epoll event(EPOLLIN and EPOLLOUT)
    		...
    		evnet_handler->handle_event(events[i].data.fd);
    	}
    }

    4. 创建一个server端的socket fd,将此fd通过epoll_ctrl添加到efd的监听队列中。

    5. 为该socket fd注册EventHandler:AcceptHandler来处理客户端的连接,创建新的socket fd。

    void create_listen_socket()
    {
    	listenfd = socket(AF_INET, SOCK_STREAM, 0); 
    	...
    	listen(listenfd, LISTENQ); 
    	struct epoll_event ev
    	ev.data.fd=listenfd;   
    	ev.events=EPOLLIN|EPOLLET;  
    	epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd,&ev);
    	...
    	_dispacher->register_accept(listenfd, new AcceptHandler(listenfd, _dispacher));
    }

    6. AcceptHandler的定义如下;在handle_event中,将新创建的socket fd通过epoll_ctrl加入到efd的监听队列中,Dispatcher通过epoll监听多个socket fd中的事件。

    7. 为新创建的socket fd注册读写事件,处理读写请求。

    class AcceptHandler()
    {
    public:
    	bool handle_event(int fd);
    
    private:
    	int _handle_fd;
    	Dispacher *_dispacher;
    };
    
    AcceptHandler::AcceptHandler(int handle_fd, Dispacher *dispacher):
    	_handle_fd(handle_fd),
    	_dispacher(dispacher)
    {}
    
    
    
    bool AcceptHandler::handle_event(int fd)
    {	
    	...
    	connfd = accept(_handle_fd,(sockaddr *)&clientaddr, &clilen)
    	...
    	epoll_ctl(epfd, EPOLL_CTL_ADD, listenfd, &ev);
    	...
    	# register handler for new connfd
    	_dispacher->regiter_read(connfd, new ReadHandler(connfd, _dispacher));
    	_dispacher->register_write(connfd, new WriteHandler(connfd, _dispacher));
    }
    
    
    class ReadHandler() {}
    class WriteHandler() {}

    四. 扩展

    1. kbengie的reactor网络模型(待补充)。

    2. boost的proactor网络模型(待补充)。

     

    参考

    https://blog.csdn.net/u010168160/article/details/53019039

     

     

    展开全文
  • Reactor 模式

    2018-10-10 06:29:40
    什么是 Reactor 模式 wiki:“The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes ....

    Reactor

    什么是 Reactor 模式

    wiki:“The reactor design pattern is an event handling pattern for handling service requests delivered concurrently by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to associated request handlers.”

    为什么会有 Reactor 呢

    对于应用程序而言,CPU 的处理速度是远远快于 IO 的速度的。如果CPU为了IO操作(例如从Socket读取一段数据)而阻塞显然是不划算的。好一点的方法是分为多进程或者线程去进行处理,但是这样会带来一些进程切换的开销,试想一个进程一个数据读了500ms,期间进程切换到它3次,但是CPU却什么都不能干,就这么切换走了,是不是也不划算?

    这时先驱们找到了事件驱动,或者叫回调的方式,来完成这件事情。这种方式就是,应用业务向一个中间人注册一个回调(event handler),当IO就绪后,就这个中间人产生一个事件,并通知此handler进行处理。这种回调的方式,也体现了“好莱坞原则”(Hollywood principle)-“Don’t call us, we’ll call you”,在我们熟悉的IoC中也有用到。看来软件开发真是互通的!

    Reactor 应用场景

    Reactor 核心是解决多请求问题。一般来说,Thread-Per-Connection 的应用场景并发量不是特别大,如果并发量过大,会导致线程资源瞬间耗尽,导致服务陷入阻塞,这个时候就需要 Reactor 模式来解决这个问题。Reactor 通过多路复用的思想大大减少线程资源的使用。

    Reactor 结构

    上图是 Reactor 模型,主要涉及的类:

    • Initiation Dispatcher:EventHandler 的容器,用来注册、移除 EventHandler 等;另外,它作为 Reactor 模式的入口调用 Synchronous Event Demultiplexer 的 select 方法以阻塞等待事件的返回,当阻塞事件返回时,将事件发生的 Handle 分发到相应的 EvenHandler 处理。
    • Even Handler:定义了事件处理的方法。
    • Handle:即操作系统中的句柄,是对资源在操作系统层面上的一种抽象,它可以是打开的文件、一个连接(Socket)、Timer等。
    • Synchronous Event Demultiplexer:使用一个事件循环 ,以阻止所有的资源。当可以启动一个同步操作上的资源不会阻塞,多路分解器发送资源到分发器。

    Reactor 时序图

    1. 初始化 InitationDispatcher,并初始化一个Handle到EventHandler的Map。
    2. 注册 EvenHandler 到 InitationDispatcher,每个 EventHandler 包含对相应 Handle 的引用,从而建立Handle到EventHandler的映射(Map)。
    3. 调用 InitiationDispatcher 的 handle_events() 方法以启动 Event Loop。在 Event Loop 中,调用 select()方法(Synchronous Event Demultiplexer)阻塞等待Event发生。
    4. 当某个或某些 Handle 的 Event 发生后,select() 方法返回,InitiationDispatcher 根据返回的Handle找到注册的 EventHandler ,并回调该 EventHandler 的 handle_events() 方法。
    5. 在 EventHandler 的 handle_events() 方法中还可以向 InitiationDispatcher 中注册新的 Eventhandler,比如对 AcceptorEventHandler 来说,当有新的 client 连接时,它会产生新的 EventHandler 以处理新的连接,并注册到 InitiationDispatcher 中。

    Reactor 模式

    单线程 Reactor 模式

    简单来说,接收请求和处理请求是同一线程中处理。

    对于一些小容量应用场景,可以使用单线程模型。但是对于高负载、大并发或大数据量的应用场景却不合适,主要原因如下: ① 一个NIO线程同时处理成百上千的链路,性能上无法支撑,即便NIO线程的CPU负荷达到100%,也无法满足海量消息的读取和发送; ② 当NIO线程负载过重之后,处理速度将变慢,这会导致大量客户端连接超时,超时之后往往会进行重发,这更加重了NIO线程的负载,最终会导致大量消息积压和处理超时,成为系统的性能瓶颈;

    多线程 Reactor 模式

    简单来说,接收请求和处理请求是不同线程中处理。

    mainReactor 一般只有一个,主要负责接收客户端的连接并将其传递给 subReactor。subReactor 一般会有多个,主要负责处理与客户端的通信。

    注意:上图使用了Thread Pool来处理耗时的业务逻辑,提高Reactor线程的I/O响应,不至于因为一些耗时的业务逻辑而延迟对后面I/O请求的处理。

    Reactor 的优缺点

    优点:

    1. 大多数设计模式的共性:解耦、提升复用性、模块化、可移植性、事件驱动、细力度的并发控制等。
    2. 更为显著的是对性能的提升,即不需要每个 Client 对应一个线程,减少线程的使用。

    缺点:

    1. 相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
    2. Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
    3. Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。

    Reactor 代码实现

    预告

    介绍与 Reactor 相关的 NIO 以及 Netty。

    参考文献

    转载于:https://juejin.im/post/5bbd9b546fb9a05d2068651c

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,492
精华内容 1,396
关键字:

reactor模式