reactor_reactor-netty - CSDN
  • Reactor框架是ACE各个框架中最基础的一个框架,其他框架都或多或少地用到了Reactor框架。本文分析Reactor构架模式的基本原理。 2.1 Reactor构架模式 对每一个构架模式的分析,我们都使用参考文献的分析风格,着重...

    本文转载自:http://www.cnblogs.com/hzbook/archive/2012/07/19/2599698.html

    Reactor框架是ACE各个框架中最基础的一个框架,其他框架都或多或少地用到了Reactor框架。本文分析Reactor构架模式的基本原理。

    2.1  Reactor构架模式

    对每一个构架模式的分析,我们都使用参考文献的分析风格,着重分析意图、上下文、问题、解决方案、结构和实现 6个方面的内容。而实现就是ACE源代码。

    1. 意图

    在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。

    2. 上下文

    在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。

    3. 问题

    在分布式系统尤其是服务器这一类事件驱动应用中,虽然这些请求最终会被序列化地处理,但是必须时刻准备着处理多个同时到来的服务请求。在实际应用 中,这些请求总是通过一个事件(如CONNECTOR、READ、WRITE等)来表示的。在有序地处理这些服务请求之前,应用程序必须先分离和调度这些 同时到达的事件。为了有效地解决这个问题,我们需要做到以下4方面:

    • 为了提高系统的可测量性和反应时间,应用程序不能长时间阻塞在某个事件源上而停止对其他事件的处理,这样会严重降低对客户端的响应度。
    • 为了提高吞吐量,任何没有必要的上下文切换、同步和CPU之间的数据移动都要避免。
    • 引进新的服务或改良已有的服务都要对既有的事件分离和调度机制带来尽可能小的影响。
    • 大量的应用程序代码需要隐藏在复杂的多线程和同步机制之后。

    4. 解决方案

    在一个或多个事件源上等待事件的到来,例如,一个已经连接的Socket描述符就是一个事件源。将事件的分离和调度整合到处理它的服务中,而将分离和调度机制从应用程序对特定事件的处理中分离开,也就是说分离和调度机制与特定的应用程序无关。

    具体来说,每个应用程序提供的每个服务都有一个独立的事件处理器与之对应。由事件处理器处理来自事件源的特定类型的事件。每个事件处理器都事先注册 到Reactor管理器中。Reactor管理器使用同步事件分离器在一个或多个事件源中等待事件的发生。当事件发生后,同步事件分离器通知 Reactor管理器,最后由Reactor管理器调度和该事件相关的事件处理器来完成请求的服务。

    5. 结构

    在Reactor模式中,有5个关键的参与者。

    • 描述符(handle):由操作系统提供,用于识别每一个事件,如Socket描述符、文件描述符等。在Linux中,它用一个整数来表示。事件可以来自外部,如来自客户端的连接请求、数据等。事件也可以来自内部,如定时器事件。
    • 同步事件分离器(demultiplexer):是一个函数,用来等待一个或多个事件的发生。调用者会被阻塞,直到分离器分离的描述符集上有事件发生。Linux的select函数是一个经常被使用的分离器。
    • 事件处理器接口(event handler):是由一个或多个模板函数组成的接口。这些模板函数描述了和应用程序相关的对某个事件的操作。
    • 具体的事件处理器:是事件处理器接口的实现。它实现了应用程序提供的某个服务。每个具体的事件处理器总和一个描述符相关。它使用描述符来识别事件、识别应用程序提供的服务。
    • Reactor 管理器(reactor):定义了一些接口,用于应用程序控制事件调度,以及应用程序注册、删除事件处理器和相关的描述符。它是事件处理器的调度核心。 Reactor管理器使用同步事件分离器来等待事件的发生。一旦事件发生,Reactor管理器先是分离每个事件,然后调度事件处理器,最后调用相关的模 板函数来处理这个事件。

    通过上述分析,我们注意到,是Reactor管理器而不是应用程序负责等待事件、分离事件和调度事件。实际上,Reactor管理器并没有被具体的 事件处理器调用,而是管理器调度具体的事件处理器,由事件处理器对发生的事件做出处理。这就是类似Hollywood原则的“反向控制”。应用程序要做的 仅仅是实现一个具体的事件处理器,然后把它注册到Reactor管理器中。接下来的工作由管理器来完成。这些参与者的相互关系如图2-1所示。

    现在结合第1章分析的框架五元素来看一下Reactor构架模式的参与者与框架五元素之间的关系:Reactor构架模式的具体实现对应了元素1; 事件处理器接口对应元素2;具体的事件处理器对应元素3;Reactor管理器使用了Hollywood原则,可以认为和元素5对应;元素4的功能相对不 明显,没有明确的对应关系。

    如果还是没有理解Reactor构架模式,没有关系,源代码会说明所有问题。此时可再分析一遍Reactor构架模式,然后继续以下内容。


    2.2  Reactor框架结构


    Reactor包含如下角色:

    • Handle 句柄;用来标识socket连接或是打开文件;
    • Synchronous Event Demultiplexer:同步事件多路分解器:由操作系统内核实现的一个函数;用于阻塞等待发生在句柄集合上的一个或多个事件;(如select/epoll;)
    • Event Handler:事件处理接口
    • Concrete Event HandlerA:实现应用程序所提供的特定事件处理逻辑;
    • Reactor:反应器,定义一个接口,实现以下功能:
      1)供应用程序注册和删除关注的事件句柄;
      2)运行事件循环;
      3)有就绪事件到来时,分发事件到之前注册的回调函数上处理;

    “反应”器名字中”反应“的由来:

    “反应”即“倒置”,“控制逆转”

    具体事件处理程序不调用反应器,而是由反应器分配一个具体事件处理程序,具体事件处理程序对某个指定的事件发生做出反应;这种控制逆转又称为“好莱坞法则”(不要调用我,让我来调用你)


    3.1  Reactor框架概述

    从对Reactor构架模式的分析中我们可以看出,要设计和实现一个简单Reactor框架以支持I/O事件,需要实现两个组件:事件处理器接口和Reactor管理器。至于其他组件,如同步事件分离器可以使用操作系统提供的select、poll或其他类似的函数;而描述符可以使用文件描述符或其他可以识别事件的数据结构,一般操作系统都会提供。事件处理器接口包含一系列模板函数,可以根据实际处理的数据进行设计;Reactor管理器肩负着事件的分离和调度,是整个框架设计的核心。

    ACE的Reactor框架在Linux平台下使用文件描述符作为I/O事件的描述符,使用ACE_Event_Handler类作为各类事件的处理器接口。将同步事件分离函数放到Reactor管理器中,这样使用不同的同步事件分离函数就需要实现不同的Reactor管理器。ACE使用Bridge设计模式解决了这一问题,将与同步事件分离函数相关的操作放到Bridge设计模式的Implementor中。凡是ACE支持的同步事件分离函数都会有一个具体的Implementor与之对应。

    ACE的Reactor管理器还提供了用于实现Singleton设计模式的操作,使用这些操作时,一个进程只能有一个全局的Reactor管理器。在调用Singleton设计模式接口时,Reactor管理器会在启动时根据操作系统的配置选择一个具体的Implementor。当然,如果你不喜欢这个默认的Implementor,可以通过函数进行更换。为了提高整个系统对事件分离和调度的性能,ACE还允许应用程序创建多个Reactor管理器实例。在这种情况下,应用程序将不能调用用于Singleton设计模式的操作,只能直接使用Reactor管理器实例对象的方法实现对事件的分离和调度。同时提供这两种使用方法,可以最大程度地满足应用程序的苛刻要求。

    ACE实现的Reactor框架结构要比Reactor构架模式中分析的结构复杂得多。这是因为ACE的Reactor框架除了处理I/O事件之外,还要处理定时器、信号量等常见的事件,并且所有的这些处理都必须满足跨平台的要求。要将对这些事件的处理抽象出来,并且提供给应用程序一个统一的接口,ACE的Reactor管理器的实现还采用了Facade设计模式。实际上,Reactor框架管理的I/O事件、信号量事件、定时器事件和Notify事件在实现上都有一个小的组件与之对应,这样可以将Reactor管理器与具体的事件处理解耦。使用Facade设计模式,将这些小的组件的接口封装起来,使得应用程序无法感知它们的存在,可以减少应用程序处理对象的数目,并且使得这些小的组件使用起来更加方便。

    以上分析的3种设计模式以及Factory设计模式,在ACE的框架管理器的实现中被频繁使用。这些设计模式以及它们的使用,既为我们学习设计模式提供了非常好的场景,又为我们实现软件框架管理器提供了实用的方法。ACE的Reactor框架与框架五元素的对应关系非常密切,是一个典型的事件驱动型框架,它为我们打开了ACE的框架之门,是学习其他框架的基础。

    在深入到框架代码之前,我们先来看一个Reactor框架的使用示例,示例虽然简单,但却提供了一个实实在在的应用程序,也为我们的分析提供了一些思路。

    3.2  Reactor框架应用示例

    在本示例中我们用Reactor框架实现一个简单的服务器程序。这个服务器程序等待客户的连接请求,一旦请求到来,Socket连接建立后,服务器程序简单地打印客户端的地址信息和接收的数据,最后将新建的Socket关闭。

    示例中的服务器端程序需要处理两类事件:Socket连接事件和通常的I/O事件。Socket连接事件用于接收客户端的连接请求,I/O事件用于接收客户端发送的数据。这两类事件是完全不同的,它们的文件描述符也不一样。与这两类事件相对应,应用程序需要实现两个具体的事件处理器。一个用于处理客户端的连接,一个用于处理客户端的数据。在示例中,这两个具体的事件处理器分别为Acceptor类和Handle_data类,其中Acceptor类用于处理客户端的连接,Handle_data类用于处理客户端的数据。


    展开全文
  • 抽丝剥茧Reactor模式

    2020-07-28 14:18:11
    今天在看书的时候看到了一个新的设计模式——Reactor模式,这个模式是出现在NIO中,至于这到底是个什么模式,今天我们来细说一下。 reactor模式是javaNIO非堵塞技术的实现原理,我们不仅要知道其原理流程,还要知道...

          今天在看书的时候看到了一个新的设计模式——Reactor模式,这个模式是出现在NIO中,至于这到底是个什么模式,今天我们来细说一下。

    一、是什么

    1、概念

          reactor设计模式,是一种基于事件驱动的设计模式。Reactor框架是ACE各个框架中最基础的一个框架,其他框架都或多或少地用到了Reactor框架。
          在事件驱动的应用中,将一个或多个客户的服务请求分离(demultiplex)和调度(dispatch)给应用程序。在事件驱动的应用中,同步地、有序地处理同时接收的多个服务请求。
          reactor模式与外观模式有点像。不过,观察者模式与单个事件源关联,而反应器模式则与多个事件源关联 。当一个主体发生改变时,所有依属体都得到通知。

    2、优点

           1)响应快,不必为单个同步时间所阻塞,虽然Reactor本身依然是同步的;
           2)编程相对简单,可以最大程度的避免复杂的多线程及同步问题,并且避免了多线程/进程的切换开销;
           3)可扩展性,可以方便的通过增加Reactor实例个数来充分利用CPU资源;
           4)可复用性,reactor框架本身与具体事件处理逻辑无关,具有很高的复用性;

    3、缺点

          1)相比传统的简单模型,Reactor增加了一定的复杂性,因而有一定的门槛,并且不易于调试。
          2)Reactor模式需要底层的Synchronous Event Demultiplexer支持,比如Java中的Selector支持,操作系统的select系统调用支持,如果要自己实现Synchronous Event Demultiplexer可能不会有那么高效。
          3) Reactor模式在IO读写数据时还是在同一个线程中实现的,即使使用多个Reactor机制的情况下,那些共享一个Reactor的Channel如果出现一个长时间的数据读写,会影响这个Reactor中其他Channel的相应时间,比如在大文件传输时,IO操作就会影响其他Client的相应时间,因而对这种操作,使用传统的Thread-Per-Connection或许是一个更好的选择,或则此时使用Proactor模式。

    二、架构模式

    1、架构图

    这里写图片描述

    2、构成

    Handles :表示操作系统管理的资源,我们可以理解为fd。

    Synchronous Event Demultiplexer :同步事件分离器,阻塞等待Handles中的事件发生。

    Initiation Dispatcher :初始分派器,作用为添加Event handler(事件处理器)、删除Event handler以及分派事件给Event handler。也就是说,Synchronous Event Demultiplexer负责等待新事件发生,事件发生时通知Initiation Dispatcher,然后Initiation Dispatcher调用event handler处理事件。

    Event Handler :事件处理器的接口

    Concrete Event Handler :事件处理器的实际实现,而且绑定了一个Handle。因为在实际情况中,我们往往不止一种事件处理器,因此这里将事件处理器接口和实现分开,与C++、Java这些高级语言中的多态类似。

    3、模块交互

          1)我们注册Concrete Event Handler到Initiation Dispatcher中。
          2)Initiation Dispatcher调用每个Event Handler的get_handle接口获取其绑定的Handle。
          3)Initiation Dispatcher调用handle_events开始事件处理循环。在这里,Initiation Dispatcher会将步骤2获取的所有Handle都收集起来,使用Synchronous Event Demultiplexer来等待这些Handle的事件发生。
         4)当某个(或某几个)Handle的事件发生时,Synchronous Event Demultiplexer通知Initiation Dispatcher。
           5)Initiation Dispatcher根据发生事件的Handle找出所对应的Handler。
           6)Initiation Dispatcher调用Handler的handle_event方法处理事件。

    三、代码注释

    package com.linxcool.reactor;  
      
    import java.io.IOException;  
    import java.net.InetAddress;  
    import java.net.InetSocketAddress;  
    import java.nio.channels.SelectionKey;  
    import java.nio.channels.Selector;  
    import java.nio.channels.ServerSocketChannel;  
    import java.util.Iterator;  
    import java.util.Set;  
      
    /** 
     * 反应器模式 
     * 用于解决多用户访问并发问题 
     *  
     * 举个例子:餐厅服务问题 
     *  
     * 传统线程池做法:来一个客人(请求)去一个服务员(线程) 
     * 反应器模式做法:当客人点菜的时候,服务员就可以去招呼其他客人了,等客人点好了菜,直接招呼一声“服务员” 
     *  
     * @author linxcool 
     */  
    public class Reactor implements Runnable{  
        public final Selector selector;  
        public final ServerSocketChannel serverSocketChannel;  
      
        public Reactor(int port) throws IOException{  
            selector=Selector.open();  
            serverSocketChannel=ServerSocketChannel.open();  
            InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);  
            serverSocketChannel.socket().bind(inetSocketAddress);  
            serverSocketChannel.configureBlocking(false);  
              
            //向selector注册该channel    
            SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  
      
            //利用selectionKey的attache功能绑定Acceptor 如果有事情,触发Acceptor   
            selectionKey.attach(new Acceptor(this));  
        }  
      
        @Override  
        public void run() {  
            try {  
                while(!Thread.interrupted()){  
                    selector.select();  
                    Set<SelectionKey> selectionKeys= selector.selectedKeys();  
                    Iterator<SelectionKey> it=selectionKeys.iterator();  
                    //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。  
                    while(it.hasNext()){  
                        //来一个事件 第一次触发一个accepter线程    
                        //以后触发SocketReadHandler  
                        SelectionKey selectionKey=it.next();  
                        dispatch(selectionKey);  
                        selectionKeys.clear();  
                    }  
                }  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
          
        /** 
         * 运行Acceptor或SocketReadHandler 
         * @param key 
         */  
        void dispatch(SelectionKey key) {  
            Runnable r = (Runnable)(key.attachment());    
            if (r != null){    
                r.run();  
            }    
        }    
      
    }  
    
    package com.linxcool.reactor;  
      
    import java.io.IOException;  
    import java.nio.channels.SocketChannel;  
      
    public class Acceptor implements Runnable{  
        private Reactor reactor;  
        public Acceptor(Reactor reactor){  
            this.reactor=reactor;  
        }  
        @Override  
        public void run() {  
            try {  
                SocketChannel socketChannel=reactor.serverSocketChannel.accept();  
                if(socketChannel!=null)//调用Handler来处理channel  
                    new SocketReadHandler(reactor.selector, socketChannel);  
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    
    package com.linxcool.reactor;  
      
    import java.io.IOException;  
    import java.nio.ByteBuffer;  
    import java.nio.channels.SelectionKey;  
    import java.nio.channels.Selector;  
    import java.nio.channels.SocketChannel;  
      
    public class SocketReadHandler implements Runnable{  
        private SocketChannel socketChannel;  
        public SocketReadHandler(Selector selector,SocketChannel socketChannel) throws IOException{  
            this.socketChannel=socketChannel;  
            socketChannel.configureBlocking(false);  
              
            SelectionKey selectionKey=socketChannel.register(selector, 0);  
              
            //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。    
            //参看dispatch(SelectionKey key)    
            selectionKey.attach(this);  
              
            //同时将SelectionKey标记为可读,以便读取。    
            selectionKey.interestOps(SelectionKey.OP_READ);    
            selector.wakeup();  
        }  
          
        /** 
         * 处理读取数据 
         */  
        @Override  
        public void run() {  
            ByteBuffer inputBuffer=ByteBuffer.allocate(1024);  
            inputBuffer.clear();  
            try {  
                socketChannel.read(inputBuffer);  
                //激活线程池 处理这些request  
                //requestHandle(new Request(socket,btt));   
            } catch (IOException e) {  
                e.printStackTrace();  
            }  
        }  
    }  
    

    总结:

          reactor模式是javaNIO非堵塞技术的实现原理,我们不仅要知道其原理流程,还要知道其代码实现,当然这个reactor模式不仅仅在NIO中实现,而且在redies等其他地方也出现过,说明这个模式还是比较实用的,尤其是在多线程高并发的情况下使用。

    展开全文
  • 适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。一、前言最近几年,随着 Node.js、Golang 等新技术、新...

    适合阅读的人群:本文适合对 Spring、Netty 等框架,以及 Java 8 的 Lambda、Stream 等特性有基本认识,希望了解 Spring 5 的反应式编程特性的技术人员阅读。

    一、前言

    最近几年,随着 Node.js、Golang 等新技术、新语言的出现,Java 的服务器端开发语言老大的地位受到了不小的挑战。虽然,Java 的市场份额依旧很大,短时间内也不会改变,但 Java 社区对于挑战也并没有无动于衷。相反,Java 社区积极应对这些挑战,不断提高自身应对高并发服务器端开发场景的能力。

    为了应对高并发的服务器端开发,在2009年的时候,微软提出了一个更优雅地实现异步编程的方式 —— Reactive Programming,中文称反应式编程。随后,其它技术也迅速地跟上了脚步,像 ES6 通过 Promise 引入了类似的异步编程方式。Java 社区也没有落后很多,Netflix 和 TypeSafe 公司提供了 RxJava 和 Akka Stream 技术,让 Java 平台也有了能够实现反应式编程的框架。

    其实,在更早之前,像 Mina 和 Netty 这样的 NIO 框架其实也能搞定高并发的服务器端开发任务,但这样的技术相对来说只是少数高级开发人员手中的工具。对于更多的普通开发者来说,难度显得大了些,所以不容易普及。

    很多年过去了,到了2017年,虽然已经有不少公司在实践反应式编程。但整体来说,应用范围依旧不大。原因在于缺少简单易用的技术将反应式编程推广普及,并同诸如 MVC 框架、HTTP 客户端、数据库技术等整合。

    终于,在2017年9月28日,解决上面问题的利器浮出水面 —— Spring 5 正式发布。Spring 5 其最大的意义就是能将反应式编程技术的普及向前推进一大步。而作为在背后支持 Spring 5 反应式编程的框架 Reactor,也相应的发布了 3.1.0 版本。

    本文接下来将会向大家介绍 Reactive Programming(反应式编程)、Reactor 的入门以及实践技巧等相关的内容。文章中的实践内容来自作者使用 Spring 5 和 Reactor 等技术改造实际项目的经历。

    二、Reactor 简介

    先介绍一下 Reactor 技术。Reactor 框架是 Pivotal 公司(开发 Spring 等技术的公司)开发的,实现了 Reactive Programming 思想,符合 Reactive Streams 规范(Reactive Streams 是由 Netflix、TypeSafe、Pivotal 等公司发起的)的一项技术。其名字有反应堆之意,反映了其背后的强大的性能。

    Reactive Programming

    Reactive Programming,中文称反应式编程,是一种高性能应用的编程方式。其最早是由微软提出并引入到 .NET 平台中,随后 ES6 也引入了类似的技术。在 Java 平台上,较早采用反应式编程技术的是 Netflix 公司开源的 RxJava 框架。现在大家比较熟知的 Hystrix 就是以 RxJava 为基础开发的。

    反应式编程其实并不神秘,通过与我们熟悉的迭代器模式对比便可了解其基本思想:

    eventIterable (pull)Observable (push)
    retrieve dataT next()onNext(T)
    discover errorthrows ExceptiononError(Exception)
    complete!hasNext()onCompleted()

    上面表格的中的 Observable 那一列便代表反应式编程的 API 使用方式。可见,它就是常见的观察者模式的一种延伸。如果将迭代器看作是拉模式,那观测者模式便是推模式。被订阅者(Publisher)主动的推送数据给订阅者(Subscriber),触发 onNext 方法。异常和完成时触发另外两个方法。如果 Publisher 发布消息太快了,超过了 Subscriber 的处理速度,那怎么办。这就是 Backpressure 的由来,Reactive Programming 框架需要提供机制,使得 Subscriber 能够控制消费消息的速度。

    在 Java 平台上,Netflix(开发了 RxJava)、TypeSafe(开发了 Scala、Akka)、Pivatol(开发了 Spring、Reactor)共同制定了一个被称为 Reactive Streams 项目(规范),用于制定反应式编程相关的规范以及接口。其主要的接口有这三个:

    • Publisher
    • Subscriber
    • Subcription

    其中,Subcriber 中便包含了上面表格提到的 onNextonErroronCompleted 这三个方法。

    对于 Reactive Streams,大家只需要理解其思想就可以,包括基本思想以及 Backpressure 等思想即可。

    Imperative vs Reactive

    对于上面表格里提到的 Iterable 和 Observale 两种风格,还有另一个称呼,便是 Imperative(指令式编程)和 Reactive(反应式编程)这两种风格。其实就是拉模型和推模型的另一种表述,大家理解其中的思想即可。对于 Imperative,老外写的文章有时会用,直译就是指令式编程,其实就是我们大家平时用 Java、Python 等语言写代码的常见风格,代码执行顺序和编写顺序基本一致(这里不考虑 JVM 指令重排)

    Reactor 的主要模块

    Reactor 框架主要有两个主要的模块:reactor-core 和 reactor-ipc。前者主要负责 Reactive Programming 相关的核心 API 的实现,后者负责高性能网络通信的实现,目前是基于 Netty 实现的。

    Reactor 的主要类

    在 Reactor 中,经常使用的类并不是很多,主要有以下两个:

    • Mono 实现了 org.reactivestreams.Publisher 接口,代表0到1个元素的发布者。
    • Flux 同样实现了 org.reactivestreams.Publisher 接口,代表0到N个元素的发表者。

    可能会使用到的类

    • Scheduler 表示背后驱动反应式流的调度器,通常由各种线程池实现。

    Web Flux

    Spring 5 引入的一个基于 Netty 而不是 Servlet 的高性能的 Web 框架,但是使用方式并没有同传统的基于 Servlet 的 Spring MVC 有什么大的不同。

    ▼ Web Flux 中 MVC 接口的示例

    @RequestMapping("/demo")
    @RestController
    public class DemoController {
        @RequestMapping(value = "/foobar")
        public Mono<Foobar> foobar() {
            return Mono.just(new Foobar());
        }
    }
    

    最大的变化就是返回值从 Foobar 所表示的一个对象变为 Mono<Foobar> (或 Flux<T>

    当然,实际的程序并不会像示例那样就一行代码。关于如何开发实际的应用,这些正是后面介绍 Reactor 的部分所要详细叙述的。

    Reactive Streams、Reactor 和 Web Flux

    上面介绍了反应式编程的一些概念,以及 Reactor 和 Web Flux。可能读者看到这里有些乱。这里介绍一下三者的关系。其实很简单:

    Reactive Streams 是规范,Reactor 实现了 Reactive Streams。Web Flux 以 Reactor 为基础,实现 Web 领域的反应式编程框架。

    其实,对于大部分业务开发人员来说,当编写反应式代码时,我们通常只会接触到 Publisher 这个接口,对应到 Reactor 便是 MonoFlux。对于 SubscriberSubcription 这两个接口,Reactor 必然也有相应的实现。但是,这些都是 Web Flux 和 Spring Data Reactive 这样的框架用到的。如果不开发中间件,通常开发人员是不会接触到的。

    比如,在 Web Flux,你的方法只需返回 MonoFlux 即可。你的代码基本也只和 MonoFlux 打交道。而 Web Flux 则会实现 SubscriberonNext 时将业务开发人员编写的 MonoFlux 转换为 HTTP Response 返回给客户端。

    三、Reactor 入门

    接下来介绍一下 Reactor 中 Mono 和 Flux 这两个类中的主要方法的使用。

    如同 Java 8 所引入的 Stream 一样,Reactor 的使用方式基本上也是分三步:开始阶段的创建、中间阶段的处理和最终阶段的消费。只不过创建和消费可能是通过像 Spring 5 这样框架完成的(比如通过 Web Flux 中的 WebClient 调用 HTTP 接口,返回值便是一个 Mono)。但我们还是需要基本了解这些阶段的开发方式。

    1. 创建 Mono 和 Flux(开始阶段)

    使用 Reactor 编程的开始必然是先创建出 Mono 或 Flux。有些时候不需要我们自己创建,而是实现例如 WebFlux 中的 WebClient 或 Spring Data Reactive 得到一个 Mono 或 Flux。

    ▼ 使用 WebFlux WebClient 调用 HTTP 接口

    WebClient webClient = WebClient.create("http://localhost:8080");
    
    public Mono<User> findById(Long userId) {
        return webClient
                .get()
                .uri("/users/" + userId)
                .accept(MediaType.APPLICATION_JSON)
                .exchange()
                .flatMap(cr -> cr.bodyToMono(User.class));
    }
    

    ▼ 使用 ReactiveMongoRepository 查询 User

    public interface UserRepository extends ReactiveMongoRepository<User, Long> {
        Mono<User> findByUsername(String username);
    }
    

    但有些时候,我们也需要主动地创建一个 Mono 或 Flux。

    “普通”的创建方式

    简单的创建方式是主要是使用像 just 这样的方法创建

    Mono<String> helloWorld = Mono.just("Hello World");
    Flux<String> fewWords = Flux.just("Hello", "World");
    Flux<String> manyWords = Flux.fromIterable(words);
    

    这样的创建方式在什么时候用呢?一般是用在当你在经过一系列非 IO 型的操作之后,得到了一个对象。接下来要基于这个对象运用 Reactor 进行高性能的 IO 操作时,可以用这种方式将你之前得到的对象转换为 Mono 或 Flux。

    “文艺”的创建方式

    上述是我们通过一个同步调用得到的结果创建出 MonoFlux,但有时我们需要从一个非 Reactive 的异步调用的结果创建出 Mono 或 Flux。那如何实现呢。

    如果这个异步方法返回一个 CompletableFuture,那我们可以基于这个 CompletableFuture 创建一个 Mono:

    Mono.fromFuture(aCompletableFuture);
    

    如果这个异步调用不会返回 CompletableFuture,是有自己的回调方法,那怎么创建 Mono 呢?我们可以使用 static <T> Mono<T> create(Consumer<MonoSink<T>> callback) 方法:

    Mono.create(sink -> {
        ListenableFuture<ResponseEntity<String>> entity = asyncRestTemplate.getForEntity(url, String.class);
    
        entity.addCallback(new ListenableFutureCallback<ResponseEntity<String>>() {
            @Override
            public void onFailure(Throwable ex) {
                sink.error(ex);
            }
    
            @Override
            public void onSuccess(ResponseEntity<String> result) {
                sink.success(result.getBody());
            }
        });
    });
    

    在使用 WebFlux 之后,AsyncRestTemplate 已经不推荐使用,这里只是做演示。

    2. 处理 Mono 和 Flux(中间阶段)

    中间阶段的 Mono 和 Flux 的方法主要有 filtermapflatMapthenzipreduce 等。这些方法使用方法和 Stream 中的方法类似。对于这些方法的介绍,将会放在下一节“Reactor 进阶”中,主要介绍这些方法不容易理解和使用容易出问题的点。

    3. 消费 Mono 和 Flux(结束阶段)

    直接消费的 Mono 或 Flux 的方式就是调用 subscribe 方法。如果在 Web Flux 接口中开发,直接返回 Mono 或 Flux 即可。Web Flux 框架会为我们完成最后的 Response 输出工作。

    四、Reactor 进阶

    接下来我将介绍一下我在使用 Reactor 开发实际项目时遇到的一些稍显复杂的问题,以及解决方法。

    问题一:mapflatMapthen 分别在什么时候使用?

    本段内容将涉及到如下类和方法:

    • 方法:Mono.map
    • 方法:Mono.flatMap
    • 方法:Mono.then
    • 类:Function

    MonoFlux 中间环节处理的处理过程中,有三个有些类似的方法:mapflatMapthen。这三个方法可以说是 Reactor 中使用频率很高的方法。

    ▼ 传统的命令式编程

    Object result1 = doStep1(params);
    Object result2 = doStep2(result1);
    Object result3 = doStep3(result2);
    

    ▼ 对应的反应式编程

    Mono.just(params)
        .flatMap(v -> doStep1(v))
        .flatMap(v -> doStep2(v))
        .flatMap(v -> doStep3(v));
    

    从上面两段代码的对比就很容易看出来 flatMap 方法在其中起到的作用,mapthen 方法也有类似的作用。但这些方法之间的区别是什么呢?我们先来看看这三个方法的签名(以 Mono 为例):

    • flatMap(Function<? super T, ? extends Mono<? extends R>> transformer)
    • map(Function<? super T, ? extends R> mapper)
    • then(Mono<V> other)

    可见,最复杂的是 flatMap 方法,map 次之,then 最简单。从方法名字上看,flatMapmap 都是做映射之用。而 then 则是下一步的意思,最适合用于链式调用,但为什么上面的例子使用的是 flatMap 而不是 then

    then 表面看上去是下一步的意思,但它只表示执行顺序的下一步,不表示下一步依赖于上一步。这个语义同 ES6 Promise 中的 then 方法是不同的。从 then 方法的参数只是一个 Mono,无从接受上一步的执行结果。而 flatMapmap 的参数都是一个 Function。入参是上一步的执行结果。

    flatMapmap 的区别在于,flatMap 中的入参 Function 的返回值要求是一个 Mono(不明白的复习一下 Function 接口的定义),而 map 的入参 Function 只要求返回一个普通对象。因为我们在业务处理中常需要调用 WebClientReactiveXxxRepository 中的方法,这些方法的返回值都是 Mono(或 Flux)。所以要将这些调用串联为一个整体链式调用,就必须使用 flatMap,而不是 map

    所以,我们要正确理解 flatMapmapthen 这三个方法的用法和背后的含义,这样才能正确实践反应式编程。

    问题二:如何实现并发执行

    本段内容将涉及到如下类和方法:

    • 方法:Mono.zip
    • 类:Tuple2
    • 类:BiFunction

    并发执行是常见的一个需求。Reactive Programming 虽然是一种异步编程方式,但是异步不代表就是并发并行的。

    在传统的命令式开发方式中,并发执行是通过线程池加 Future 的方式实现的。

    Future<Result1> result1Future = doStep1(params);
    Future<Result2> result2Future = doStep2(params);
    Result1 result1 = result1Future.get();
    Result2 result2 = result2Future.get();
    // Do merge;
    return mergeResult;
    

    因为上面的代码虽然有一些异步效果在里面,但 Future.get() 方法是阻塞的。所以,当我们使用 Reactor 开发有并发执行场景的反应式代码时,肯定不能用上面的方式。这时,需要使用到 MonoFlux 中的 zip 方法。这里我们以 Mono 为例演示。代码如下:

    Mono<CustomType1> item1Mono = ...;
    Mono<CustomType2> item2Mono = ...;
    Mono.zip(items -> {
        CustomType1 item1 = CustomType1.class.cast(items[0]);
        CustomType2 item2 = CustomType2.class.cast(items[1]);
        // Do merge
        return mergeResult;
    }, item1Mono, item2Mono);
    

    上述代码中,产生 item1Monoitem2Mono 的过程是并行的。比如,调用一个 HTTP 接口的同时,执行一个数据库查询操作。这样就可以加快程序的执行。

    但上述代码存在一个问题,就是 zip 方法需要做强制类型转换。而强制类型转换是不安全的。所以我们需要更优雅的方式。

    好在 zip 方法存在多种重载形式。除了最基本的形式以外,还有多种类型安全的形式:

    static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2);
    static <T1, T2, O> Mono<O> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator); 
    static <T1, T2, T3> Mono<Tuple3<T1, T2, T3>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2, Mono<? extends T3> p3);
    

    对于不超过7个元素的合并操作,都有类型安全的 zip 方法可选。

    以两个元素的合并为例,介绍一下使用方法:

    Mono.zip(item1Mono, item2Mono).map(tuple -> {
        CustomType1 item1 = tuple.getT1();
        CustomType2 item2 = tuple.getT2();
        // Do merge
        return mergeResult;
    });
    

    上述代码中,map 方法的参数是一个 Tuple2,表示一个二元数组,相应的还有 Tuple3Tuple4 等。

    另外,对于两个元素的并发执行,也可以通过 zip(Mono<? extends T1> p1, Mono<? extends T2> p2, BiFunction<? super T1, ? super T2, ? extends O> combinator) 方法直接将结果合并。方法是传递 BiFunction 实现合并算法。

    问题三:集合循环之后的汇聚

    本段内容将涉及到如下类和方法:

    • 方法:Flux.fromIterable
    • 方法:Flux.reduce
    • 类:BiFunction

    另外一个稍微复杂的场景是对一个对象中的一个类型为集合类的(List、Set)进行处理之后,再对原本的对象进行处理。使用 Imperative 风格的代码很容易编写:

    List<SubData> subDataList = data.getSubDataList();
    for (SubData item : subDataList) {
        // Do something on data and item
    }
    // Do something on data
    

    是不是简单到无以复加的地步了。但当我们要用 Reactive 风格的代码实现上述逻辑时,就不是那么简单了。

    要在 Reactive 风格的代码中实现上述逻辑,我们主要是要用到 Fluxreduce 方法。我们先来看 reduce 方法的签名:

    <A> Mono<A> reduce(A initial, BiFunction<A, ? super T, A> accumulator);
    

    从方法签名我们可以看出 reduce 方法的功能就是讲一个 Flux 聚合成一个 Mono。参数中第一个参数是返回值 Mono 中元素的初始值。

    第二个参数是一个 BiFunction,用来实现聚合操作的逻辑。泛型参数 <A, ? super T, A> 中,第一个 A 表示每次聚合操作(因为需要对集合中每个元素进行操作)之后的结果的类型,它作为 BiFunction.apply 方法的第一个入参 ;? super T 表示集合中的每个元素,它作为 BiFunction.apply 方法的第二个入参;最后一个 A 表示聚合操作的结果,它作为 BiFunction.apply 方法的返回值。

    接下来看一下示例:

    Data initData = ...;
    List<SubData> aList = ...;
    Flux.fromIterable(aList)
        .reduce(initData, (data, itemInList) -> {
            // Do something on data and itemInList
            return data;
        });
    

    上面的示例代码中,initDatadata 的类型相同,我们,但是命名不能重复。执行完上述代码之后, reduce 方法会返回 Mono<Data>

    五、小结

    本文介绍了反应式编程的一些概念和 Spring 的 Reactor 框架的基本用法,还介绍了如何用 Reactor 解决一些稍微复杂一点的问题。文章中的这些示例都来自我使用 Reactor 改造真实项目的实践过程,因为精力和时间有限,上面的例子还有很多局限性。但是希望本文能起到抛砖引玉的作用,让我们能看到更多关于反应式编程和 Reactor 使用方面的实践分享。

    上面的示例为了简单清晰,都是直接调用 Reactor 中相关的方法。但这里给大家一个建议,就是实践 Reactive 编程,更需要不断地使用像提取方法这样的手段进行重构以提高代码可读性,否则项目代码的可读性只会比传统风格的代码低。

    除了代码可读性方面的问题,实践反应式编程还有很多很多问题需要考虑。比如引入像 Netty 这样的 NIO 框架所带来的技术复杂度、单元测试的难度提升、和其它框架技术的整合等等。所以,对于像反应式编程这样的新技术,我们一方面要积极探索,另一方面也要评估它所带来的技术风险和它所带来的价值对于你的项目是否匹配。

    当然,整体而言,因为能带来更高的吞吐量,所以反应式编程的前景是非常光明。可以说,未来高性能的 Java Web 应用基本上都是反应式编程技术的天下,也是 Java 平台对抗 Golang、Node.js 等新技术的利器。

    本文结束!

    展开全文
  • Reactor详解

    2019-05-06 18:38:00
    reactor 是什么 为何要用,能解决什么问题 如何用,更好的方式 其他事件处理模式 一、Reactor 是什么 关于reactor 是什么,我们先从wiki上看下: The reactor design pattern is an event ...
    1. reactor 是什么

    2. 为何要用,能解决什么问题

    3. 如何用,更好的方式

    4. 其他事件处理模式

    一、Reactor 是什么

    关于reactor 是什么,我们先从wiki上看下:

    The reactor design pattern is an event handling pattern for handling service requests delivered concurrently to a service handler by one or more inputs. The service handler then demultiplexes the incoming requests and dispatches them synchronously to the associated request handlers.

    从上述文字中我们可以看出以下关键点 :

    1. 事件驱动(event handling)

    2. 可以处理一个或多个输入源(one or more inputs)

    3. 通过Service Handler同步的将输入事件(Event)采用多路复用分发给相应的Request Handler(多个)处理

    自POSA2 中的关于Reactor Pattern 介绍中,我们了解了Reactor 的处理方式:

    1. 同步的等待多个事件源到达(采用select()实现)

    2. 将事件多路分解以及分配相应的事件服务进行处理,这个分派采用server集中处理(dispatch)

    3. 分解的事件以及对应的事件服务应用从分派服务中分离出去(handler)

     

    关于Reactor Pattern 的OMT 类图设计:

     

    二、为何要用Reactor

    常见的网络服务中,如果每一个客户端都维持一个与登陆服务器的连接。那么服务器将维护多个和客户端的连接以出来和客户端的contnect 、read、write ,特别是对于长链接的服务,有多少个c端,就需要在s端维护同等的IO连接。这对服务器来说是一个很大的开销。

    1、BIO

    比如我们采用BIO的方式来维护和客户端的连接:

    // 主线程维护连接
      public void run() {
          try {
              while (true) {
                  Socket socket = serverSocket.accept();
                  //提交线程池处理
                  executorService.submit(new Handler(socket));
              }
          } catch (Exception e) {
              e.printStackTrace();
          }
      }
    ​
      // 处理读写服务
      class Handler implements Runnable {
          public void run() {
              try {
                  //获取Socket的输入流,接收数据
                  BufferedReader buf = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                  String readData = buf.readLine();
                  while (readData != null) {
                      readData = buf.readLine();
                      System.out.println(readData);
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              }
          }
      }

     

    很明显,为了避免资源耗尽,我们采用线程池的方式来处理读写服务。但是这么做依然有很明显的弊端:

    1. 同步阻塞IO,读写阻塞,线程等待时间过长

    2. 在制定线程策略的时候,只能根据CPU的数目来限定可用线程资源,不能根据连接并发数目来制定,也就是连接有限制。否则很难保证对客户端请求的高效和公平。

    3. 多线程之间的上下文切换,造成线程使用效率并不高,并且不易扩展

    4. 状态数据以及其他需要保持一致的数据,需要采用并发同步控制

    2、NIO

    那么可以有其他方式来更好的处理么,我们可以采用NIO来处理,NIO中支持的基本机制:

    1. 非阻塞的IO读写

    2. 基于IO事件进行分发任务,同时支持对多个fd的监听

    我们看下NIO 中实现相关方式:

    public NIOServer(int port) throws Exception {
          selector = Selector.open();
          serverSocket = ServerSocketChannel.open();
          serverSocket.socket().bind(new InetSocketAddress(port));
          serverSocket.configureBlocking(false);
          serverSocket.register(selector, SelectionKey.OP_ACCEPT);
      }
    ​
      @Override
      public void run() {
          while (!Thread.interrupted()) {
              try {
                  //阻塞等待事件
                  selector.select();
                  // 事件列表
                  Set selected = selector.selectedKeys();
                  Iterator it = selected.iterator();
                  while (it.hasNext()) {
                      it.remove();
                      //分发事件
                      dispatch((SelectionKey) (it.next()));
                  }
              } catch (Exception e) {
    ​
              }
          }
      }
    ​
      private void dispatch(SelectionKey key) throws Exception {
          if (key.isAcceptable()) {
              register(key);//新链接建立,注册
          } else if (key.isReadable()) {
              read(key);//读事件处理
          } else if (key.isWritable()) {
              wirete(key);//写事件处理
          }
      }
    ​
      private void register(SelectionKey key) throws Exception {
          ServerSocketChannel server = (ServerSocketChannel) key
                  .channel();
          // 获得和客户端连接的通道
          SocketChannel channel = server.accept();
          channel.configureBlocking(false);
          //客户端通道注册到selector 上
          channel.register(this.selector, SelectionKey.OP_READ);
      }

     

    我们可以看到上述的NIO例子已经差不多拥有reactor的影子了

    1. 基于事件驱动-> selector(支持对多个socketChannel的监听)

    2. 统一的事件分派中心-> dispatch

    3. 事件处理服务-> read & write

     

    事实上NIO已经解决了上述BIO暴露的1&2问题了,服务器的并发客户端有了量的提升,不再受限于一个客户端一个线程来处理,而是一个线程可以维护多个客户端(selector 支持对多个socketChannel 监听)。

    但这依然不是一个完善的Reactor Pattern ,首先Reactor 是一种设计模式,好的模式应该是支持更好的扩展性,显然以上的并不支持,另外好的Reactor Pattern 必须有以下特点:

    1. 更少的资源利用,通常不需要一个客户端一个线程

    2. 更少的开销,更少的上下文切换以及locking

    3. 能够跟踪服务器状态

    4. 能够管理handler 对event的绑定

    那么好的Reactor Pattern应该是怎样的?

    三、Reactor

    在应用Java NIO构建Reactor Pattern中,大神 Doug Lea(让人无限景仰的java 大神)在“Scalable IO in Java”中给了很好的阐述。我们采用大神介绍的3种Reactor 来分别介绍。

    首先我们基于Reactor Pattern 处理模式中,定义以下三种角色:

    • Reactor 将I/O事件分派给对应的Handler

    • Acceptor 处理客户端新连接,并分派请求到处理器链中

    • Handlers 执行非阻塞读/写 任务

     

    1、单Reactor单线程模型

    我们看代码的实现方式:

      /**
        * 等待事件到来,分发事件处理
        */
      class Reactor implements Runnable {
    ​
          private Reactor() throws Exception {
    ​
              SelectionKey sk =
                      serverSocket.register(selector,
                              SelectionKey.OP_ACCEPT);
              // attach Acceptor 处理新连接
              sk.attach(new Acceptor());
          }
    ​
          public void run() {
              try {
                  while (!Thread.interrupted()) {
                      selector.select();
                      Set selected = selector.selectedKeys();
                      Iterator it = selected.iterator();
                      while (it.hasNext()) {
                          it.remove();
                          //分发事件处理
                          dispatch((SelectionKey) (it.next()));
                      }
                  }
              } catch (IOException ex) {
                  //do something
              }
          }
    ​
          void dispatch(SelectionKey k) {
              // 若是连接事件获取是acceptor
              // 若是IO读写事件获取是handler
              Runnable runnable = (Runnable) (k.attachment());
              if (runnable != null) {
                  runnable.run();
              }
          }
    ​
      }
      
      /**
        * 连接事件就绪,处理连接事件
        */
      class Acceptor implements Runnable {
          @Override
          public void run() {
              try {
                  SocketChannel c = serverSocket.accept();
                  if (c != null) {// 注册读写
                      new Handler(c, selector);
                  }
              } catch (Exception e) {
    ​
              }
          }
      }

    这是最基本的单Reactor单线程模型。其中Reactor线程,负责多路分离套接字,有新连接到来触发connect 事件之后,交由Acceptor进行处理,有IO读写事件之后交给hanlder 处理。

    Acceptor主要任务就是构建handler ,在获取到和client相关的SocketChannel之后 ,绑定到相应的hanlder上,对应的SocketChannel有读写事件之后,基于racotor 分发,hanlder就可以处理了(所有的IO事件都绑定到selector上,有Reactor分发)。

    该模型 适用于处理器链中业务处理组件能快速完成的场景。不过,这种单线程模型不能充分利用多核资源,所以实际使用的不多。

     

    2、单Reactor多线程模型

    相对于第一种单线程的模式来说,在处理业务逻辑,也就是获取到IO的读写事件之后,交由线程池来处理,这样可以减小主reactor的性能开销,从而更专注的做事件分发工作了,从而提升整个应用的吞吐。

    我们看下实现方式:

      /**
        * 多线程处理读写业务逻辑
        */
      class MultiThreadHandler implements Runnable {
          public static final int READING = 0, WRITING = 1;
          int state;
          final SocketChannel socket;
          final SelectionKey sk;
    ​
          //多线程处理业务逻辑
          ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    ​
    ​
          public MultiThreadHandler(SocketChannel socket, Selector sl) throws Exception {
              this.state = READING;
              this.socket = socket;
              sk = socket.register(selector, SelectionKey.OP_READ);
              sk.attach(this);
              socket.configureBlocking(false);
          }
    ​
          @Override
          public void run() {
              if (state == READING) {
                  read();
              } else if (state == WRITING) {
                  write();
              }
          }
    ​
          private void read() {
              //任务异步处理
              executorService.submit(() -> process());
    ​
              //下一步处理写事件
              sk.interestOps(SelectionKey.OP_WRITE);
              this.state = WRITING;
          }
    ​
          private void write() {
              //任务异步处理
              executorService.submit(() -> process());
    ​
              //下一步处理读事件
              sk.interestOps(SelectionKey.OP_READ);
              this.state = READING;
          }
    ​
          /**
            * task 业务处理
            */
          public void process() {
              //do IO ,task,queue something
          }
      }

     

    3、多Reactor多线程模型

     

    第三种模型比起第二种模型,是将Reactor分成两部分,

    1. mainReactor负责监听server socket,用来处理新连接的建立,将建立的socketChannel指定注册给subReactor。

    2. subReactor维护自己的selector, 基于mainReactor 注册的socketChannel多路分离IO读写事件,读写网 络数据,对业务处理的功能,另其扔给worker线程池来完成。

     

    我们看下实现方式:

      /**
        * 多work 连接事件Acceptor,处理连接事件
        */
      class MultiWorkThreadAcceptor implements Runnable {
    ​
          // cpu线程数相同多work线程
          int workCount =Runtime.getRuntime().availableProcessors();
          SubReactor[] workThreadHandlers = new SubReactor[workCount];
          volatile int nextHandler = 0;
    ​
          public MultiWorkThreadAcceptor() {
              this.init();
          }
    ​
          public void init() {
              nextHandler = 0;
              for (int i = 0; i < workThreadHandlers.length; i++) {
                  try {
                      workThreadHandlers[i] = new SubReactor();
                  } catch (Exception e) {
                  }
    ​
              }
          }
    ​
          @Override
          public void run() {
              try {
                  SocketChannel c = serverSocket.accept();
                  if (c != null) {// 注册读写
                      synchronized (c) {
                          // 顺序获取SubReactor,然后注册channel 
                          SubReactor work = workThreadHandlers[nextHandler];
                          work.registerChannel(c);
                          nextHandler++;
                          if (nextHandler >= workThreadHandlers.length) {
                              nextHandler = 0;
                          }
                      }
                  }
              } catch (Exception e) {
              }
          }
      }
     
      /**
        * 多work线程处理读写业务逻辑
        */
      class SubReactor implements Runnable {
          final Selector mySelector;
    ​
          //多线程处理业务逻辑
          int workCount =Runtime.getRuntime().availableProcessors();
          ExecutorService executorService = Executors.newFixedThreadPool(workCount);
    ​
    ​
          public SubReactor() throws Exception {
              // 每个SubReactor 一个selector 
              this.mySelector = SelectorProvider.provider().openSelector();
          }
    ​
          /**
            * 注册chanel
            *
            * @param sc
            * @throws Exception
            */
          public void registerChannel(SocketChannel sc) throws Exception {
              sc.register(mySelector, SelectionKey.OP_READ | SelectionKey.OP_CONNECT);
          }
    ​
          @Override
          public void run() {
              while (true) {
                  try {
                  //每个SubReactor 自己做事件分派处理读写事件
                      selector.select();
                      Set<SelectionKey> keys = selector.selectedKeys();
                      Iterator<SelectionKey> iterator = keys.iterator();
                      while (iterator.hasNext()) {
                          SelectionKey key = iterator.next();
                          iterator.remove();
                          if (key.isReadable()) {
                              read();
                          } else if (key.isWritable()) {
                              write();
                          }
                      }
    ​
                  } catch (Exception e) {
    ​
                  }
              }
          }
    ​
          private void read() {
              //任务异步处理
              executorService.submit(() -> process());
          }
    ​
          private void write() {
              //任务异步处理
              executorService.submit(() -> process());
          }
    ​
          /**
            * task 业务处理
            */
          public void process() {
              //do IO ,task,queue something
          }
      }
    ​

    第三种模型中,我们可以看到,mainReactor 主要是用来处理网络IO 连接建立操作,通常一个线程就可以处理,而subReactor主要做和建立起来的socket做数据交互和事件业务处理操作,它的个数上一般是和CPU个数等同,每个subReactor一个县城来处理。

    此种模型中,每个模块的工作更加专一,耦合度更低,性能和稳定性也大量的提升,支持的可并发客户端数量可达到上百万级别。

    关于此种模型的应用,目前有很多优秀的矿建已经在应用了,比如mina 和netty 等。上述中去掉线程池的第三种形式的变种,也 是Netty NIO的默认模式。下一节我们将着重讲解netty的架构模式。

     

    四、事件处理模式

    在 Douglas Schmidt 的大作《POSA2》中有关于事件处理模式的介绍,其中有四种事件处理模式:

    1. Reactor  

    2. Proactor  

    3. Asynchronous Completion Token  

    4. Acceptor-Connector  

    1. Proactor

    本文介绍的Reactor就是其中一种,而Proactor的整体结构和reacotor的处理方式大同小异,不同的是Proactor采用的是异步非阻塞IO的方式实现,对数据的读写由异步处理,无需用户线程来处理,服务程序更专注于业务事件的处理,而非IO阻塞。

    2. Asynchronous Completion Token

    简单来说,ACT就是应对应用程序异步调用服务操作,并处理相应的服务完成事件。从token这个字面意思,我们大概就能了解到,它是一种状态的保持和传递。

    比如,通常应用程序会有调用第三方服务的需求,一般是业务线程请求都到,需要第三方资源的时候,去同步的发起第三方请求,而为了提升应用性能,需要异步的方式发起请求,但异步请求的话,等数据到达之后,此时的我方应用程序的语境以及上下文信息已经发生了变化,你没办法去处理。

    ACT 解决的就是这个问题,采用了一个token的方式记录异步发送前的信息,发送给接受方,接受方回复的时候再带上这个token,此时就能恢复业务的调用场景。

     

    上图中我们可以看到在client processing 这个阶段,客户端是可以继续处理其他业务逻辑的,不是阻塞状态。service 返回期间会带上token信息。  

    3. Acceptor-Connector

    Acceptor-Connector是于Reactor的结合,也可以看成是一种变种,它看起来很像上面介绍的Reactor第三种实现方式,但又有本质的不同。

    Acceptor-Connector模式是将网络中对等服务的连接和初始化分开处理,使系统中的连接建立及服务一旦服务初始化后就分开解除耦合。连接器主动地建立到远地接受器组件的连接,并初始化服务处理器来处理在连接上交换的数据。同样地,接受器被动地等待来自远地连接器的连接请求,在这样的请求到达时建立连接,并初始化服务处理器来处理在连接上交换的数据。随后已初始化的服务处理器执行应用特有的处理,并通过连接器和接受器组件建立的连接来进行通信。

    这么处理的好处是:

    1. 一般而言,用于连接建立和服务初始化的策略变动的频度要远小于应用服务实现和通信协议。

    2. 容易增加新类型的服务、新的服务实现和新的通信协议,而又不影响现有的连接建立和服务初始化软件。比如采用IPX/SPX通信协议或者TCP协议。

    3. 连接角色和通信角色的去耦合,连接角色只管发起连接 vs. 接受连接。通信角色只管数据交互。

    4. 将程序员与低级网络编程API(像socket或TLI)类型安全性的缺乏屏蔽开来。业务开发关系底层通信

    转载:https://my.oschina.net/u/1859679/blog/1844109 

    展开全文
  • Reactor模式是什么

    2020-03-01 18:08:42
    一、Reactor模式是什么 反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或 者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的...
  • Reactor和preactor都是IO多路复用模式,一般地,I/O多路复用机制都依赖于一个事件多路分离器(Event Demultiplexer)。分离器对象可将来自事件源的I/O事件分离出来,并分发到对应的read/write事件处理器(Event ...
  • Reactor

    2018-09-12 17:39:03
    Project Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要artifact是reactor-core,这是一个基于Java 8的实现了响应式流规范(Reactive Streams specification)的响应式库。 1.Flux与Mono Reactor中...
  • Reactor介绍

    2019-08-12 15:29:38
    在处理web请求时,通常有两种体系结构,分别为:thread-based architecture(基于线程)、event-driven architecture(事件驱动) thread-basedarchitecture 基于线程的体系结构通常会使用多线程来处理客户端的...
  • .NET Reactor 4.9 破解版

    2020-07-30 23:32:25
    .NET Reactor是一款功能强大的代码保护以及许可授权管理系统软件,主要用于开发人员保护其.NET软件程序,.NET Reactor支持所有支持.NET编译的程序开发语言。 .NET Reactor 4.9是目前最新版本,由大神yoza破解,亲测...
  • 文章目录单 Reactor 单线程工作原理示意图方案说明方案优缺点分析优点缺点使用场景单 Reactor 多线程工作原理示意图方案说明方案优缺点分析优点缺点主从 Reactor 多线程工作原理示意图方案说明方案优缺点分析优点...
  • reactor和proactor模式

    2013-01-02 08:11:25
    首先分享一下,我在网上看到的两篇不错的文章:正是这两篇文章才理解了reactor和proactor模式;  Reactor模式,或者叫反应器模式 高性能IO设计的Reactor和Proactor模式  首先就第一篇《Reactor模式,或者叫...
  • NIO学习--Reactor模型

    2018-07-13 10:03:35
    通过之前的Unix的IO模型介绍,想必也了解到了5种IO...本文将从以下几点阐述Reactor模式:reactor 是什么为何要用,能解决什么问题如何用,更好的方式其他事件处理模式 一、Reactor 是什么关于reactor 是什么,我...
  • Reactor 介绍 Reactor模式网上有很多讲解,我这里不想过多介绍。其核心共有三个。 抽象的事件 事件多路分发器 一个Reactor用来管理整个流程 reactor by c Reactor是我们从代码中抽取出来方便我们进行管理的。但...
  • 2)安装.Net Reactor v4.0.0.0,然后安装.NET Reactor Registration v4.0.0.0。 注意: 1).NET Reactor Registration v4.0.0.0安装程序不会修改原有的程序。 2).NET Reactor Registration v4.0.0.0和下面我在2009...
  •   在讨论Netty的架构模式之前,我们先来介绍下Reactor模式,因为Netty的架构模式是在此基础上演变而来的 Reactor模式介绍 1. 线程模型基本介绍   不同的线程模式,对程序的性能有很大影响,为了搞清Netty 线程...
  • 与模糊工具(Obfuscator)相比,.NET Reactor 可以完全阻止对 .NET 程序集(由 C#, VB.NET, Delphi.NET, J#, MSIL... 等语言编写)的反编译。通俗的讲,.NET Reactor 在破解者和您的 .NET 代码之间构建了强大的防...
  • 什么是 reactor 模式

    2019-02-16 12:08:58
    在网上看了很多reactor 模式,每个都是各有千秋,这里我写一下自己对reactor 模式感悟。 1. Reactor模式是什么 反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理...
  • .NET Reactor反编译

    2020-07-25 23:33:11
    .NET Reactor 是一款强大的 .NET 代码保护和授权管理系统,安全可靠、简单易用,主要用来帮助开发人员保护他们的 .NET 软件产品。开发人员从此不必担心如何保护他们的知识产权,可以将更多精力放在产品功能的开发上...
  • 然而,Java NIO以及后面要介绍的netty网络框架都是有一套理论在背后支撑的,那就是reactor模式的应用。 二、什么是reactor模式? reactor模式翻译过来叫做反应器模式,通常我们都直接叫做reactor模式。 reactor模式...
  • 一、什么是reactor模式? reactor模式翻译过来叫做反应器模式,通常我们都直接叫做reactor模式。 reactor模式是一种事件驱动模式,用于处理一个或者多个客户端发过来的请求,服务端会有一个处理器对到来的请求进行...
1 2 3 4 5 ... 20
收藏数 32,654
精华内容 13,061
关键字:

reactor