响应式编程_响应式编程优点缺点 - CSDN
精华内容
参与话题
  • 响应式编程(Reactive Programming)介绍

    千次阅读 2016-10-14 17:32:19
    响应式编程(Reactive Programming)介绍 很明显你是有兴趣学习这种被称作响应式编程的新技术才来看这篇文章的。 学习响应式编程是很困难的一个过程,特别是在缺乏优秀资料的前提下。刚开始学习时,我试过去找一些...

    响应式编程(Reactive Programming)介绍

    很明显你是有兴趣学习这种被称作响应式编程的新技术才来看这篇文章的。

    学习响应式编程是很困难的一个过程,特别是在缺乏优秀资料的前提下。刚开始学习时,我试过去找一些教程,并找到了为数不多的实用教程,但是它们都流于表面,从没有围绕响应式编程构建起一个完整的知识体系。库的文档往往也无法帮助你去了解它的函数。不信的话可以看一下这个:

    通过合并元素的指针,将每一个可观察的元素序列放射到一个新的可观察的序列中,然后将多个可观察的序列中的一个转换成一个只从最近的可观察序列中产生值得可观察的序列。

    天啊。

    我看过两本书,一本只是讲述了一些概念,而另一本则纠结于如何使用响应式编程库。我最终放弃了这种痛苦的学习方式,决定在开发中一边使用响应式编程,一边理解它。在 Wikipedia 一如既往的空泛与理论化。Reactive Manifesto 看起来是你展示给你公司的项目经理或者老板们看的东西。微软的 Observer Design Pattern

    上面的示意图也可以使用ASCII重画为下图,在下面的部分教程中我们会使用这幅图:

        --a---b-c---d---X---|->
    
        a, b, c, d are emitted values
        X is an error
        | is the 'completed' signal
        ---> is the timeline 

    既然已经开始对响应式编程感到熟悉,为了不让你觉得无聊,我们可以尝试做一些新东西:我们将会把一个 Click event stream 转为新的 Click event stream。

    首先,让我们做一个能记录一个按钮点击了多少次的计数器 Stream。在常见的响应式编程库中,每个Stream都会有多个方法,如 mapfilterscan, 等等。当你调用其中一个方法时,例如 clickStream.map(f),它就会基于原来的 Click stream 返回一个新的 Stream 。它不会对原来的 Click steam 作任何修改。这个特性称为不可变性,它对于响应式编程 Stream,就如果汁对于薄煎饼。我们也可以对方法进行链式调用,如 clickStream.map(f).scan(g)

        clickStream: ---c----c--c----c------c-->
                   vvvvv map(c becomes 1) vvvv
                   ---1----1--1----1------1-->
                   vvvvvvvvv scan(+) vvvvvvvvv
        counterStream: ---1----2--3----4------5--> 

    map(f) 会根据你提供的 f 函数把原 Stream 中的 Value 分别映射到新的 Stream 中。在我们的例子中,我们把每一次 Click 都映射为数字 1。scan(g) 会根据你提供的 g 函数把 Stream 中的所有 Value 聚合成一个 Value x = g(accumulated, current) ,这个示例中 g 只是一个简单的添加函数。然后,每 Click 一次, counterStream 就会把点击的总次数发给它的观察者。

    为了展示响应式编程真正的实力,让我们假设你想得到一个包含“双击”事件的 Stream。为了让它更加有趣,假设我们想要的这个 Stream 要同时考虑三击(Triple clicks),或者更加宽泛,连击(两次或更多)。深呼吸一下,然后想像一下在传统的命令式且带状态的方式中你会怎么实现。我敢打赌代码会像一堆乱麻,并且会使用一些变量保存状态,同时也有一些计算时间间隔的代码。

    而在响应式编程中,这个功能的实现就非常简单。事实上,这逻辑只有 RxJS 作为工具 ,因为JavaScript是现在最多人会的语言,而 .NETJavaScriptPython,Objective-C/Cocoa, Groovy等等)。所以,无论你用的是什么工具,你都能从下面这个教程中受益。

    实现"Who to follow"推荐界面

    在 Twitter 上,这个表明其他账户的 UI 元素看起来是这样的:

    响应式编程

    我们将会重点模拟它的核心功能,如下:

    • 启动时从 API 那里加载帐户数据,并显示 3 个推荐
    • 点击"Refresh"时,加载另外 3 个推荐用户到这三行中
    • 点击帐户所在行的'x'按钮时,只清除那一个推荐然后显示一个新的推荐
    • 每行都会显示帐户的头像,以及他们主页的链接

    我们可以忽略其它的特性和按钮,因为它们是次要的。同时,因为 Twitter 最近关闭了对非授权用户的 API,我们将会为 Github 实现这个推荐界面,而非 Twitter。这是subscribing 这个 Stream。

        requestStream.subscribe(function(requestUrl) {
        // execute the request
        jQuery.getJSON(requestUrl, function(responseData) {
            // ...
        });
        }

    留意一下我们使用了 jQuery 的 Ajax 函数(我们假设你已经知道 Rx.Observable.create()所做的事就是通过显式的通知每一个 Observer (或者说是“Subscriber”) Data events(onNext() )或者 Errors ( onError() )来创建你自己的 Stream。而我们所做的就只是把 jQuery Ajax Promise 包装起来而已。打扰一下,这意味者Promise本质上就是一个Observable?

    响应式编程

    是的。

    Observable 就是 Promise++。在 Rx 中,你可以用 var stream = Rx.Observable.fromPromise(promise) 轻易的把一个 Promise 转为 Observable,所以我们就这样子做吧。唯一的不同就是 Observable 并不遵循pointers:每个映射的值都是一个指向其它 Stream 的指针。在我们的例子里,每个请求 URL 都会被映射一个指向包含响应 Promise stream 的指针。

    响应式编程

    Response 的 Metastream 看起来会让人困惑,并且看起来也没有帮到我们什么。我们只想要一个简单的响应 stream,其中每个映射的值应该是 JSON 对象,而不是一个 JSON 对象的'Promise'。是时候介绍 (Mr. Flatmap)(merge() 函数。这就是它做的事的图解:

        stream A: ---a--------e-----o----->
        stream B: -----B---C-----D-------->
              vvvvvvvvv merge vvvvvvvvv
              ---a-B---C--e--D--o-----> 

    这样就简单了:

        var requestOnRefreshStream = refreshClickStream
        .map(function() {
            var randomOffset = Math.floor(Math.random()*500);
            return 'https://api.github.com/users?since=' + randomOffset;
        });
    
        var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
    
        var requestStream = Rx.Observable.merge(
        requestOnRefreshStream, startupRequestStream
        ); 

    还有一个更加简洁的可选方案,不需要使用中间变量。

        var requestStream = refreshClickStream
        .map(function() {
            var randomOffset = Math.floor(Math.random()*500);
            return 'https://api.github.com/users?since=' + randomOffset;
        })
          .merge(Rx.Observable.just('https://api.github.com/users')); 

    甚至可以更简短,更具有可读性:

        var requestStream = refreshClickStream
        .map(function() {
            var randomOffset = Math.floor(Math.random()*500);
            return 'https://api.github.com/users?since=' + randomOffset;
        })
          .startWith('https://api.github.com/users');

    <a rel="nofollow" href="https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/observable.md" "#rxobservableprototypestartwithscheduler-args="" style="box-sizing: border-box; color: rgb(45, 133, 202); text-decoration: none; background-color: transparent;">startWith() 函数做的事和你预期的完全一样。无论你输入的 Stream 是怎样,startWith(x) 输出的 Stream 一开始都是 x 。但是还不够 Separation of concerns。还记得响应式编程的咒语么?

    响应式编程

    所以让我们把显示的推荐设计成一个 stream,其中每一个映射的值都是包含了推荐内容的 JSON 对象。我们以此把三个推荐内容分开来。现在第一个推荐看起来是这样子的:

        var suggestion1Stream = responseStream
        .map(function(listUsers) {
            // get one random user from the list
            return listUsers[Math.floor(Math.random()*listUsers.length)];
          }); 

    其他的, suggestion2Stream 和 suggestion3Stream 可以简单的拷贝 suggestion1Stream 的代码来使用。这不是 DRY,它会让我们的例子变得更加简单一些,加之我觉得这是一个可以帮助考虑如何减少重复的良好实践。

    我们不在 responseStream 的 subscribe() 中处理渲染了,我们这么处理:

        suggestion1Stream.subscribe(function(suggestion) {
        // render the 1st suggestion to the DOM
        }); 

    回到"当刷新时,清理掉当前的推荐",我们可以很简单的把刷新点击映射为 null,并且在 suggestion1Stream 中包含进来,如下:

        var suggestion1Stream = responseStream
        .map(function(listUsers) {
            // get one random user from the list
            return listUsers[Math.floor(Math.random()*listUsers.length)];
        })
        .merge(
            refreshClickStream.map(function(){ return null; })
          );

    当渲染时,null 解释为"没有数据",所以把 UI 元素隐藏起来。

        suggestion1Stream.subscribe(function(suggestion) {
        if (suggestion === null) {
            // hide the first suggestion DOM element
        }
        else {
            // show the first suggestion DOM element
            // and render the data
        }
        }); 

    现在的示意图:

        refreshClickStream: ----------o--------o---->
        requestStream: -r--------r--------r---->
        responseStream: ----R---------R------R-->   
        suggestion1Stream: ----s-----N---s----N-s-->
        suggestion2Stream: ----q-----N---q----N-q-->
        suggestion3Stream: ----t-----N---t----N-t--> 

    其中,N 代表了 null

    作为一种补充,我们也可以在一开始的时候就渲染“空的”推荐内容。这通过把 startWith(null) 添加到 Suggestion stream 就完成了:

        var suggestion1Stream = responseStream
        .map(function(listUsers) {
            // get one random user from the list
            return listUsers[Math.floor(Math.random()*listUsers.length)];
        })
        .merge(
            refreshClickStream.map(function(){ return null; })
        )
          .startWith(null); 

    现在结果是:

        refreshClickStream: ----------o---------o---->
         requestStream: -r--------r---------r---->
        responseStream: ----R----------R------R-->   
        suggestion1Stream: -N--s-----N----s----N-s-->
        suggestion2Stream: -N--q-----N----q----N-q-->
        suggestion3Stream: -N--t-----N----t----N-t--> 

    关闭推荐并使用缓存的响应

    还有一个功能需要实现。每一个推荐,都该有自己的"X"按钮以关闭它,然后在该位置加载另一个推荐。最初的想法,点击任何关闭按钮时都需要发起一个新的请求:

        var close1Button = document.querySelector('.close1');
        var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
        // and the same for close2Button and close3Button
    
        var requestStream = refreshClickStream.startWith('startup click')
        .merge(close1ClickStream) // we added this
        .map(function() {
            var randomOffset = Math.floor(Math.random()*500);
            return 'https://api.github.com/users?since=' + randomOffset;
          }); 

    这个没有效果。这将会关闭并且重新加载 所有 的推荐,而不是仅仅处理我们点击的那一个。有一些不一样的方法可以解决,并且让它变得更加有趣,我们可以通过复用之前的请求来解决它。API 的响应页面有 100 个用户,而我们仅仅使用其中的三个,所以还有很多的新数据可以使用,无须重新发起请求。

    同样的,我们用Stream的方式来思考。当点击'close1'时,我们想要用 responseStream 最近的映射从响应列表中获取一个随机的用户,如:

        requestStream: --r--------------->
        responseStream: ------R----------->
        close1ClickStream: ------------c----->
        suggestion1Stream: ------s-----s-----> 

    在 Rx* 中, 叫做连接符函数的 big list of functions,它包括了如何转换、合并、以及创建 Observable。如果你想通过图表去理解这些函数,请看 Cold vs Hot Observables 中的概念。如果忽略了这些,你一不小心就会被它坑了。我提醒过你了。通过学习真正的函数式编程去提升自己的技能,并熟悉那些会影响到 Rx 的问题,比如副作用。

    但是响应式编程不仅仅是 Rx。还有相对容易理解的 Elm Language 则以它自己的方式支持 RP:它是一门会编译成 Javascript + HTML + CSS 的响应式编程语言 ,并有一个 RxJava 是实现Netflix's API服务器端并发的一个重要组件 。Rx 并不是一个只能在某种应用或者语言中使用的 Framework。它本质上是一个在开发任何 Event-driven 软件中都能使用的编程范式。

    本文来自于:http://wiki.jikexueyuan.com/project/android-weekly/issue-145/introduction-to-RP.html


    展开全文
  • Spring5.0响应式编程入门

    千次阅读 2018-06-14 09:35:29
    引言​ 响应式编程是一种面向数据流和变化传播的编程范式。使用它可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。我们可以使用声明的方式构建应用程序的...

    引言

    ​ 响应式编程是一种面向数据流和变化传播的编程范式。使用它可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。我们可以使用声明的方式构建应用程序的能力,形成更加敏感和有弹性的应用,所以Spring 5在其核心框架中增加了反应系统,已经开始向声明式编程的范式转变。

    响应式编程的优势

    • 提高了代码的可读性,因此开发人员只需要关注定义业务逻辑。

    • 在高并发环境中,可以自然的处理消息。

    • 可以控制生产者和消费者之间的流量,避免内存不足。

    • 对于一个或多个线程,IO绑定任务可以通过异步和非阻塞方式执行,而且不阻塞当前线程。

    • 可以有效的管理多个连接系统之间的通信。

    应用场景

    • 大量的交易处理服务,如银行部门。

    • 大型在线购物应用程序的通知服务,如亚马逊。

    • 股票价格同时变动的股票交易业务。

    Spring 5.0前瞻

    作为Java中的首个响应式Web框架,Spring 5.0最大的亮点莫过于提供了完整的端到端响应式编程的支持。
    如上图所示左侧是传统的基于Servlet的Spring Web MVC框架,右侧是spring 5.0新引入的基于Reactive Streams的Spring WebFlux框架,从上往下依次是:Router Functions,WebFlux,Reactive Streams三个新组件,其中:

    • Router Functions: 对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。

    • WebFlux: 核心组件,协调上下游各个组件提供响应式编程支持。

    • Reactive Streams: 一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。

    示例代码

    1.创建项目

    spring响应式开发,需要结合spring boot来完成,所以使用idea创建springboot项目,选择Spring Initializr选项,jdk选择1.8以上。 选择下一步,设置groupId和artifactId。
    选择下一步,选择web选项中的Reactive web选项,表明要创建响应式项目,注意Spring Boot的版本为2.0.2。选择下一步,设置项目的目录,点击finish创建项目。

    2.设置服务端口

    为了避免端口冲突,我们可以在项目的application.properties文件中配置服务端口。

    server.port=9002

    3.相关依赖包

    打开项目的pom.xml文件,会发现以下几个依赖包

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
    
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    其中:

    • spring-boot-starter-webflux:webflux依赖包,是响应式开发的核心依赖包,其中包含了spring-boot-starter-reactor-netty 、spring 5 webflux 包,默认是通过netty启动的。

    • spring-boot-starter-test:springboot的单元测试工具库。

    • reactor-test:Spring 5提供的官方针对RP框架测试工具库。

    小结

    ​ spring响应式开发需要结合spring boot来完成,同时需要引入spring-boot-starter-webflux和reactor-test两个依赖包来支持响应式开发。

    4.使用webflux创建web应用

    webflux的使用有两种方式,基于注解和函数式编程。这里使用函数式编程,具体操作如下:

    4.1.创建实体类

    
    public class Good {
    
        private int id;
        private String name;
        private String price;
    
        public Good(int id,String name,String price){
            this.id=id;
            this.name=name;
            this.price=price;
        }
    
        public int getId() {
            return id;
        }
    
        public void setId(int id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public String getPrice() {
            return price;
        }
    
        public void setPrice(String price) {
            this.price = price;
        }
    
        @Override
        public String toString() {
            return "Good{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", price='" + price + '\'' +
                    '}';
        }
    }

    分析: 实体类没有什么特殊的操作,原来怎么操作现在还是怎么操作。

    4.2.创建GoodGenerator

    
    @Configuration
    public class GoodGenerator {
    
        public Flux<Good> findGoods(){
            List<Good> goods = new ArrayList<>();
            goods.add(new Good(1,"小米","2000"));
            goods.add(new Good(2,"华为","4000"));
            goods.add(new Good(3,"苹果","8000"));
            return Flux.fromIterable(goods);
        }
    }

    分析: 这里的方法返回的是Flux类型的数据,Flux是RP中最基础的数据类型之一,对应的是多值数据的返回操作,在RP中还有Mono数据类型,也是最基础的数据类型之一,对应单值数据的返回操作。

    注意:这里的GoodGenerator类要加上@Configuration注解。

    4.3.创建GoodHandler

    
    @Component
    @Configuration
    public class GoodHandler {
    
        private final Flux<Good> goods;
    
        public GoodHandler(GoodGenerator goodGenerator) {
            this.goods = goodGenerator.findGoods();
        }
    
        public Mono<ServerResponse> hello(ServerRequest request) {
    
            return ok().contentType(TEXT_PLAIN)
                    .body(BodyInserters.fromObject("Hello Spring!"));
        }
    
        public Mono<ServerResponse> echo(ServerRequest request) {
            return ok().contentType(APPLICATION_STREAM_JSON)
                    .body(this.goods,Good.class);
        }
    }

    分析: Handler主要用来处理请求操作,并将Mono<ServerResponse>返回,Mono<ServerResponse>中会封装响应数据,响应数据如果是字符串可以使用:

    
    ok().contentType(TEXT_PLAIN).body(BodyInserters.fromObject("Hello Spring!"));

    操作,如果是集合数据可以使用:

    
    ok().contentType(APPLICATION_STREAM_JSON).body(this.goods,Good.class)

    操作。

    4.4.创建GoodRouter

    
    @Configuration
    public class GoodRouter {
        @Bean
        public RouterFunction<ServerResponse> route(GoodHandler goodHandler) {
    
          return RouterFunctions
                  .route(RequestPredicates.GET("/good")
                          .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)),goodHandler::hello)
                  .andRoute(RequestPredicates.GET("/goods")
                          .and(RequestPredicates.accept(MediaType.APPLICATION_STREAM_JSON)),goodHandler::echo);
        }
    
    }

    分析: GoodRouter主要用来设置请求路径和转化HTTP请求,可以使用route()方法和andRoute方法设置多个请求路径和转化操作。

    小结

    ​ HTTP请求会由GoodRouter转发给对应的Handler,Handler处理请求,并返回Mono<ServerResponse>,这里的Router类似@RequestMapping,Handler类似Controller

    4.4.运行测试

    ​ 实体类、GoodGenerator、GoodHandler、GoodRouter都已经创建完成了,我们可以运行项目打开浏览器进行测试.

    浏览器输入http://localhost:9002/good,即可获取到"Hello Spring!"文本信息

    浏览器输入http://localhost:9002/goods,即可获取到集合信息

    到目前为止,一个简单的webflux示例已经完成。

    4.5.单元测试

    在项目中我们也可以使用使用一个Spring 5新引入的测试工具类,WebTestClient,专门用于测试RP应用,具体代码如下:

    
    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    public class Spring5demoApplicationTests {
    
        @Autowired
        private WebTestClient webTestClient;
    
        @Test
        public void helloTest() {
          String s=  webTestClient
                    .get().uri("/good")
                    .accept(MediaType.TEXT_PLAIN).exchange()
                    .expectStatus().isOk().returnResult(String.class)
                    .getResponseBody().blockFirst();
    
            System.out.println(s);
        }
    
        @Test
        public void findGoodsTest(){
            webTestClient.get().uri("/goods")
                    .accept(MediaType.APPLICATION_STREAM_JSON)
                    .exchange().expectStatus().isOk()
                    .expectHeader().contentType(MediaType.APPLICATION_STREAM_JSON)
                    .returnResult(Good.class)
                    .getResponseBody().collectList();
        }
    }

    创建WebTestClient实例时可以看到,编写RP应用的单元测试,同样也是数据不落地的流式风格

    总结

    ​ 到此,spring 5.0的响应式编程就给大家介绍到这里,这里只是简单进行了一个响应式入门操作,但是也能够体现出响应式编程的特点。当然spring 5.0响应式编程也不是完美的,它在故障诊断、依赖库集成、数据存储以及Spring Security安全权限框架支持等方面还是有局限性的。

    展开全文
  • 响应式编程介绍

    万次阅读 多人点赞 2015-06-15 17:35:02
    响应式编程简介(原文) 你应该对响应式编程这个新事件有点好奇吧,尤其是与之相关的部分框架:Rx、Bacon.js、RAC等等。 在缺乏好的资源的情况下,学习响应式编程成为痛苦。我开始学的时候,做死地找各种教程。结果...

    响应式编程简介

    (原文)

      你应该对响应式编程这个新事件有点好奇吧,尤其是与之相关的部分框架:Rx、Bacon.js、RAC等等。

      在缺乏好的资源的情况下,学习响应式编程成为痛苦。我开始学的时候,做死地找各种教程。结果发现有用的只是极少部分,而且这少部分也只是表面上的东西,对于整个体系结构的理解也起不了多大的作用。直接去看那些库文档同样也理解不了。比如下面这个:

    Rx.Observable.prototype.flatMapLatest(selector, [thisArg])

    Projects each element of an observable sequence into a new sequence of observable sequences by incorporating the element’s index and then transforms an observable sequence of observable sequences into an observable sequence producing values only from the most recent observable sequence.

      我擦,这究竟是什么鬼!

      我看过两本书,一本就是在那画图,另一本则在教怎么用响应式库。

      学习最困难的地方在于响应式思维。咱们得用不同于传统的方法来思考,而且还要尽量不用传统编程中的状态变量。我在网上没有找到任何关于这方面的东西,而我认为一个实用的教程就在于教会你怎么用响应式思维来思考,这样才能引导你入门。我希望这可以帮助你。

    “什么是响应式编程?”

      在网上的解释和定义大多是很烂的。维基中的定义又太泛而且过于理论。Stackoverflow上的标准答案对于新手而言又不太适合。 Reactive Manifesto听起来像是你在秀给你产品经理看似的。微软的 Rx术语 “Rx = Observables + LINQ + Schedulers” 这种微软式的说法,咱们大部分人是理解不了的。像“反应”和“变化传播”与典型的 MV* 没啥不同,现在的语言都是这么干的。我的视图当然反应于我的模型。变化当然会传播,如果不传播的话,那界面上不是不会变化了么!

      好了,不扯蛋了。

    响应式编程就是异步数据流编程。

      在某种程度上,这并不是什么新东西。事件总线(Event buses)或咱们常见的单击事件就是一个异步事件流,你可以观察这个流,也可以基于这个流做一些自定义操作(原文:side effects,副作用,本文皆翻译为自定义操作)。响应式就是基于这种想法。你能够创建所有事物的数据流,而不仅仅只是单击和悬停事件数据流。 流廉价且无处不在,任何事物都可以当作一个流:变量、用户输入、属性、缓存、数据结构等等。比如,假设你的微博评论就是一个跟单击事件一样的数据流,你能够监听这个流,并做出响应。

      最重要的是,有一堆的函数能够创建(create)任何流,也能将任何流进行组合(combine)和过滤(filter)。 这正是“函数式”的魔力所在。一个流能作为另一个流的输入(input),甚至多个流也可以作为其它流的输入。你能合并(merge)两个流。你还能通过过滤(filter)一个流得到那些你感兴趣的事件。你能将一个流中的数据映射(map)到一个新的流中。

      如果说流是响应式的中心,那咱们就来仔细地研究一下,先从咱们熟悉的“点击按钮”事件流开始。

      单击事件流

      一个流就是一个将要发生的以时间为序的事件序列。它能发射出三种不同的东西:一个数据值(data value)(某种类型的),一个错误(error)或者一个“完成(completed)”的信号。比如说,当前按钮所在的窗口或视图关闭时,“单击”事件流也就“完成”了。

      我们只能异步地捕获这些发出的事件:定义一个针对数据值的函数,在发出一个值时,该函数就会异步地执行;针对发出错误时的函数;还有针对发出‘完成’时的函数。有时你可以省略这最后两个函数,只专注于针对数据值的函数。“监听”流的行为叫做订阅。我们定义的这些函数就是观察者。这个流就是被观察的主体(subject)(或“可观察的(observable)”)。这正是观察者设计模式

      在本教程中,会有一部分地方用ASCII来画图:

    --a---b-c---d---X---|->
    
    a, b, c, d: 发出的值(value)
    X : 是一个错误(error)
    | : '完成'信号(completed)
    ---> : 时间线

      这已经够熟悉了,再说下去你就觉得烦了,咱来整点新玩意:咱们从点击事件流创建(通过转换)出新的点击事件流。

      首先,创建一个counter stream来记录一个按钮被点击了多少次。在所有的响应式库(Reactive libraries)中,有很多关于流的函数,比如mapfilterscan等等。在你调用这些函数时,比如clickStream.map(f),它会基于clickStream返回一个全新的流,也就是说,这个新的流随便怎么玩,也不会修改原来的clickStream。这就是所谓的不可变特性,这个特性与响应式流的结合极为nice。这使得咱们可以使用链式函数,比如clickStream.map(f).scan(g)

      clickStream: ---c----c--c----c------c-->
                   vvvvv map(c 变成 1) vvvv
                   ---1----1--1----1------1-->
                   vvvvvvvvv scan(+) vvvvvvvvv
    counterStream: ---1----2--3----4------5-->

      这个map(f) 函数根据你提供的f函数,将clickStream中发出的每一个值进行替换(替换后的值放到一个新的流中)。在咱们这个例子里,咱们直接将每一次点击都映射成为数字1。这个scan(g)函数会聚集流上之前所有的值,并得到一个值x = g(accumulated, current),在这里g只是一个简单的(+)函数。此时,每当点击事件发生的注意,counterStream这个流就会发出一个点击次数总数值,如上图的1、2、3、4、5就是在点击后发出的点击总数。

      注:
         scan可以这么理解:假设一个数组(流与数组不一样)
         id 为任意类型
         id x;
         for (id current in array) {
           x = g(x, current);
           发出(x);
         }
    

      为了显示响应式的强大之处,咱们来假设你想要一个“双击”事件流。为了使事情更有趣,咱们在这个流中,将多次点击(两次或两次以上)都当作是“双击”。深呼吸,然后想想用传统的编程方式该怎么来实现这个需求。我敢打赌,你会用一些变量来保存各种状态和计算时间间隔,这想想就好复杂。

      而在响应式中,这却很简单。实际上,实现这个逻辑只需要4行代码就可以了。但咱们先忽略掉代码。图是理解和构建流的最好的方法,无论是你初学者还是专家:

      Multiple clicks stream

      灰框里是将一个流转换成另一个流的函数。首先,我们先把那些点击间隔在250毫秒内的点击累积到一个列表中(简单来说,也就是buffer(stream.throttle(250ms))做的事情。先别急着理解这些代码的细节,这里只是响应式的一个小示例而已),这就个返回了一个列表流(即a stream of lists),然后咱们再针对这个流使用map()将列表转换成为代表列表长度的整数。最后,咱们使用filter(x >= 2)函数来过滤掉那些整数。就是这样,经过3步操作,得到了咱们想要的流。咱们可以订阅(监听)这个流来做咱们想做的事情

      我希望你会喜欢这个优雅地处理方式。这个例子仅仅是冰山一角,你可以用将相同的操作应用在不同的流上。比如API response流;另一方面,还有好多其它的函数可用。

    “为什么我应该考虑采用RP?”

      响应式编程提高了代码的抽象层次,这样你就可以专注于你的业务逻辑的事件定义,而不是尝尝捣鼓那些大量的实现细节。RP的代码可能会更简洁、清晰。

      对于现代web应用和移动应用这种众多UI事件与数据事件高度互动应用程序,好处更加明显。10年前,与web页面的交互基本上就是在后台提交一个表单,然后在前端进行简单的渲染。而现在的应用则更具实时性:修改一个单一表单字段可以自动触发保存到后端;“赞”某些内容则可以实时反映到其它相关联的用户那里,等等。

      如今的应用有丰富的各式各样的实时事件,给用户一种高度互动的体验。我们需要工具来妥善处理这些事情,响应式编程是其中一个答案。

    RP思维实践

      咱来整点真的。在这个真的例子中一步一步来教你怎么用RP来思考。这不是一堆的小例子,各种概念也会解释清楚。在教程的最后,咱们将会编出真正可用的代码,而且还理解咱们所做的每一件事。

      我选择 JavaScriptRxJS 作为本次教程的工具,原因是:JavaScript 是目前最广泛熟悉的语言,而 Rx* 类库 是很多语言和平台所广泛采用的类库 (.NET, Java, Scala, Clojure, JavaScript, Ruby, Python, C++, Objective-C/Cocoa, Groovy, 等等)。所以,基本上无论你的编程语言是什么,你都可以从本教程中受益。

    实现一个关注推荐表 “Who to follow”

      在 Twitter 中的关注推荐表是这样的:

      Twitter Who to follow suggestions box

      我们只关注模仿其核心功能:

      * 启动时,从API读取帐户数据,并且显示3个推荐的帐户
      * 点击 “Refresh”,读取另外3个帐户数据并放到推荐列表中
      * 点击 ‘x’ 按钮,清除按钮所在行的帐户数据,并显示另一个帐户
      * 每一行显示帐户的头像和他们的页面链接

      那些次要的功能和按钮咱就不管了。Twitter 最近关闭了其未授权公共API,所以咱们做一个关注 Github 用户的UI得了。这里是获取 Github 用户的 API

      如果你想提前看看的话,这里有完整的代码 http://jsfiddle.net/staltz/8jFJH/48/

    请求和响应

      你怎么用 Rx 来处理这里问题? 好了,开始,(基本上) 所有东西 都能当作是 流。这是 Rx 的口头禅。咱们先从最简单的功能开始:“启动时,用 API 读取3个帐户数据”。 这没有什么特别的地方,也就是几个简单的步骤:(1)发出一个请求(request),(2)获得到一个响应(response),(3)渲染得到的响应数据。OK,我们继续,咱们把请求当作一个流。这有点小题大做了,但我们得从基础做起,对吧?

      在启动时,我们只需要发送一个请求,所以我们将其建模为数据流,这个流只会发射一个值。咱们知道,接下来还会有很多请求,但现在只有一个。

    --a------|->
    
    这里,a 是一个字符串 'https://api.github.com/users'

      这是我们想要请求的URLs流,当一个请求事件发生时,它发告诉咱们两件事情:when and what。“when” 是说,当发出一个事件时就表示应该开始执行那个请求。“what” 指的是这个请求发出的值:一个包含URL的字符串。

      在 Rx* 中创建只包含一个值的流是非常简单的。在官方术语中,流是“可观察的”,也就是说它可被观察,但如果用“observable”来命名的话,则显得有点蠢了,所以我还是把它叫做 stream

    var requestStream = Rx.Observable.just('https://api.github.com/users');

      现在,这只是一个字符串的流,还没有做其它操作,在该值被发出时,咱们需要以某种方式做点什么事情。这可以通过订阅(subscribing)这个流来完成。

    requestStream.subscribe(function(requestUrl) {
      // execute the request
      jQuery.getJSON(requestUrl, function(responseData) {
        // ...
      });
    }

      注意,在这里我们使用了 jQuery 的 Ajax 回调 来处理这个异步的请求操作。但是先等一下,Rx 就是处理 异步 数据流的。这个请求的response不是会包含一些数据么,那咱们是不是也可以将这个response包装成一个流呢?从概念上来看可行,那咱们来试试看:

    requestStream.subscribe(function(requestUrl) {
      // execute the request
      var responseStream = Rx.Observable.create(function (observer) {
        jQuery.getJSON(requestUrl)
        .done(function(response) { observer.onNext(response); })
        .fail(function(jqXHR, status, error) { observer.onError(error); })
        .always(function() { observer.onCompleted(); });
      });
    
      responseStream.subscribe(function(response) {
        // do something with the response
      });
    }

      Rx.Observable.create() 所做的就是创建一个你自己的流,在有数据事件(onNext())或错误(onError())时,这个流会通知其每一个观察者(或“订阅者”)。我们所做的只是对 jQuery Ajax Promise(注:JS Promise 模式) 的封装而已。打断一下,这也就是说 Promise 是可观察的?

     
     
     
     
     

    Amazed

      没错!

      在 Rx 中用 var stream = Rx.Observable.fromPromise(promise) 用可以将一个 Promise 转换成一个可观察的流,够简单吧。虽然 Observable 与 Promises/A+ 不兼容,但从概念上来说并没有什么冲突。简单点说,一个 Promise 就是只发射一个值的 Observable。Rx流比 promises 多的就是能够返回多个值。

      这也就是说 Observables 至少也有 Promises 这么强大,如果你相信 Promises 的能力的话,那你也应该留意一下 Rx Observables。

      现在回到刚刚那个例子,你有注意到 subscribe() 么,它就是用来回调的。responseStream的创建是依赖于requestStream的,创建这个流也还是很简单的吧。

      接下来介绍 map(f) 函数,它是针对流A中的每一个值,运用 f() 产生一个值(即映射),并将这个产生的值由流B发射出来。如果将其用在咱们的请求和响应流上的话,咱们可以将请求URLs 映射成为响应 Promises(伪装为 streams)。

    var responseMetastream = requestStream
      .map(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

      这里创建了一个名为 “metastream” 的玩意儿:流中流(a stream of streams)。 别恐慌,metastream 也就是一个流,这个流发射出的值也是一个流。你可以把它当作指针:每一个发射出的值都是一个 指针 ,它指向另一个流。在这个例子中,每个请求URL被映射为一个指针,指向一个包含有response的promise流。

    Response metastream

      response metastream除了使事情更复杂之外,看起来没什么其它用呀。咱们只是想要一个简单的response流,每次会发射出一个JSON对象的,而不是这种发射 ‘Promise’对象的。先来看看 Flatmap:这是 map()的一个变种,能够 “整合(flattens)” metastream,经过整合后,“trunk”流发射出的值都来自于“branch”流。Flatmap 不是 “修复版”,metastream也不是一个bug,在 Rx 中,它们是处理异步responses很有用的工具。

    var responseStream = requestStream
      .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });

    Response stream

      Nice。response 流是根据 request流定义的,如果我们接下来的 request流还有事件的话,咱们的 response流也将会产生相应的响应事件:

    requestStream:  --a-----b--c------------|->
    responseStream: -----A--------B-----C---|->
    
    (小写字母是request, 大写字母是相应的response)

      现在有了一个 response流,咱们可以根据咱们接受到的数据来渲染:

    responseStream.subscribe(function(response) {
      // render `response` to the DOM however you wish
    });

      到目前为止,代码如下:

    var requestStream = Rx.Observable.just('https://api.github.com/users');
    
    var responseStream = requestStream
      .flatMap(function(requestUrl) {
        return Rx.Observable.fromPromise(jQuery.getJSON(requestUrl));
      });
    
    responseStream.subscribe(function(response) {
      // render `response` to the DOM however you wish
    });

    刷新按钮

      这个 reponse 中的JSON里包含有 100 个用户数据。而这个API只能指定请求的页数(page offset),而不能指定请求每页的大小(page size),而咱们只需要3个就可以了,所以会有97个用户数据浪费掉。现在先忽略这个问题,待会再看怎么缓存reponses。

      刷新按钮每点击一次,请求流都应该发射一个新的URL,然后我们就可以得到一个新的response。这需要做两件事:1、刷新按钮的点击事件流(所有事物都可以当作一个流);2、更改请求流依赖于刷新按钮的点击事件流。RxJS有工具能根据事件监听器创建 Observables。

    var refreshButton = document.querySelector('.refresh');
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');

      刷新点击事件不会关联任何 API URL,我们需要将其映射到一个实际的URL上。现在咱们更改请求流的实现逻辑:对刷新点击流运用 map 函数,映射成为随机页面的API。

    var requestStream = refreshClickStream
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });

      
      这个请求在启动时什么也不会做,它只有在刷新按钮点击时才会被触发。请求会在这两种行为下发生:刷新按钮点击或打开网页。
      
      加上本例子最开始那个请求流,现在有两个请求流了。为了区分这两个流,分别给它们取个不同的名字:

    var requestOnRefreshStream = refreshClickStream
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });
    
    var startupRequestStream = Rx.Observable.just('https://api.github.com/users');

      怎么将这两个流“合并(merge)”为一个流呢? merge() 函数就是专为来干这事的。用文字图来解释一下它做了些什么:

    stream A: ---a--------e-----o----->
    stream B: -----B---C-----D-------->
              vvvvvvvvv merge vvvvvvvvv
              ---a-B---C--e--D--o----->

      现在合并两个流很简单了:

    var requestOnRefreshStream = refreshClickStream
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });
    
    var startupRequestStream = Rx.Observable.just('https://api.github.com/users');
    
    var requestStream = Rx.Observable.merge(
      requestOnRefreshStream, startupRequestStream
    );

      还有一种替代的简洁方式,没有临时中间流:

    var requestStream = refreshClickStream
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      })
      .merge(Rx.Observable.just('https://api.github.com/users'));

      更简单,更具有可讲性的写法:

    var requestStream = refreshClickStream
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      })
      .startWith('https://api.github.com/users');

      
      startWith() 函数的功能如其字面意思。无论你的输入流是怎么样的,startWith(x) 输出流在开始的时候都会发射出 x。我这可不是在 DRY (重复劳动),而是在对比各API(指这里的 startWith 与 merge 没什么区别)。可以将 startWith() 紧接在 refreshClickStream 后面,本质上这是在启动时 “模仿” 刷新按钮点击。

    var requestStream = refreshClickStream.startWith('startup click')
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });

      Nice。将启动请求流是刷新按钮点击请求流合并,只加了一个函数:startWith() 而已。

    推荐里的3个模型流

      直到现在,我们也只是在 response 流里的 subscribe()里的渲染步骤中接触了一下 推荐
    UI元素。此时刷新按钮带来了问题:当你点击refresh按钮后,当前这3个推荐却没有清空。新的推荐会在得到 response 显示,但为了 UI 看起来更自然,咱们需要在刷新按钮点击后清空当前的推荐。

    refreshClickStream.subscribe(function() {
      // clear the 3 suggestion DOM elements 
    });

      
      别,兄弟别这么干。这么做可不好,这会导致有 两个 订阅者操作推荐 DOM 元素(另一个是 responseStream.subscribe()),这违反了 关注点分离 原则。你可曾记得:

     
     
     
     

    Mantra

      所以咱们将一个推荐作为一个流,它发射出的值就是包含推荐数据的 JSON 对象。我们为这3个推荐分别单独创建一个流,第一个推荐流:

    var suggestion1Stream = responseStream
      .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
      });

      另外两个,suggestion2Streamsuggestion3Stream 直接从 suggestion1Stream 复制过来就可以了。

      去掉 response 流的 subscrbie() 函数调用,咱们这么来渲染:  

    suggestion1Stream.subscribe(function(suggestion) {
      // render the 1st suggestion to the DOM
    });

      回到 “点击刷新,清除推荐数据”,我们将刷新按钮点击事件映射为一个 null 推荐数据,并将其合并到 suggestion1Stream 流中:

    var suggestion1Stream = responseStream
      .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
      })
      .merge(
        refreshClickStream.map(function(){ return null; })
      );

      当渲染的时候,我们将 null 当作是 “没有数据”,隐藏相应的 UI 元素。

    suggestion1Stream.subscribe(function(suggestion) {
      if (suggestion === null) {
        // hide the first suggestion DOM element
      }
      else {
        // show the first suggestion DOM element
        // and render the data
      }
    });

      图如下:

    refreshClickStream: ----------o--------o---->
         requestStream: -r--------r--------r---->
        responseStream: ----R---------R------R-->   
     suggestion1Stream: ----s-----N---s----N-s-->
     suggestion2Stream: ----q-----N---q----N-q-->
     suggestion3Stream: ----t-----N---t----N-t-->

      这里 N 代表的是 null

      同样,我们可以在启动的时候渲染 “空” 的推荐数据。只要给推荐流加上 startWith(null) 就可以了:

    var suggestion1Stream = responseStream
      .map(function(listUsers) {
        // get one random user from the list
        return listUsers[Math.floor(Math.random()*listUsers.length)];
      })
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);

      结果:

    refreshClickStream: ----------o---------o---->
         requestStream: -r--------r---------r---->
        responseStream: ----R----------R------R-->   
     suggestion1Stream: -N--s-----N----s----N-s-->
     suggestion2Stream: -N--q-----N----q----N-q-->
     suggestion3Stream: -N--t-----N----t----N-t-->

    清除一个推荐 和 缓存responses

      还有一个功能要实现。每个推荐的后面都有一个 ‘x’ 按钮能够清除当前行推荐数据,然后再读取另一个推荐数据并显示。你首先的想法可能是任何一个清除按钮点击后,发一个新的请求:

    var close1Button = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(close1Button, 'click');
    // and the same for close2Button and close3Button
    
    var requestStream = refreshClickStream.startWith('startup click')
      .merge(close1ClickStream) // we added this
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });

      这行不通。它们清除所有的推荐数据,然后再重新载入,而不是只是当前这一个有影响。有两种不同的方式来解决这个问题,我们通过复用之前的 responses 来解决这个问题。这个 API 的 response 每页有100个用户数据,而我们只用了3个,所以还有很多数据可以用来当作刷新数据,没有必要再发请求。

      同样,咱们以流的方式来思考。当 ‘close1’ 的一次点击事件发生时,我们就从 responseStream 最后发射 的 response 数据中随机取一条用户数据:

        requestStream: --r--------------->
       responseStream: ------R----------->
    close1ClickStream: ------------c----->
    suggestion1Stream: ------s-----s----->

      在 Rx* 中有一个组合函数: combineLatest ,看上去正是我们想要的。两个流 A 和 B作为其输入,无论哪个流发射了值(两个流都至少要发射一次值才会触发),combineLatest 都会将两个流最近分别发射的值 ab 组合起来,再输出一个值 c = f(x,y), 这里的 f 是你定义的函数。如下图所示:

    stream A: --a-----------e--------i-------->
    stream B: -----b----c--------d-------q---->
              vvvvvvvv combineLatest(f) vvvvvvv
              ----AB---AC--EC---ED--ID--IQ---->
    
    这里的 f 是一个 小写转大写 函数

      我们针对 close1ClickStreamresponseStream 两个流使用 combineLatest(),每当 close1 按钮一点击,我们都能获取到最后的 response,然后产生一个新的值给 suggestion1Stream。而且,combineLatest() 是对称的:只要 responseStream 一发射新的 response,它都会与 close 1按钮最后的点击组合触发,并产生一个新的推荐数据。这样,咱们就只需对前面 suggestion1Stream 的代码简单改造一下就可以了:

    var suggestion1Stream = close1ClickStream
      .combineLatest(responseStream,             
        function(click, listUsers) {
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);

      还有一个问题没解决。combineLatest() 使用了两个资源的最后的数据,如果其中一个没有发射过的话,combineLatest() 返回的输出流就不会发射数据。如果你看了上面的文字图的话,你会发现当第一个流发射数据 a时,输出流并没有发射数据。当第二个输入流发射数据 b 时,输出流才产生一个值。

      也有两种不同的文案来解决这个问题,我们依然采用最简单的那种,在启动时模拟 ‘close 1’ 按钮被点击:

    var suggestion1Stream = close1ClickStream.startWith('startup click') // we added this
      .combineLatest(responseStream,             
        function(click, listUsers) {l
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);

    结束

      做完了。完整代码如下:

    var refreshButton = document.querySelector('.refresh');
    var refreshClickStream = Rx.Observable.fromEvent(refreshButton, 'click');
    
    var closeButton1 = document.querySelector('.close1');
    var close1ClickStream = Rx.Observable.fromEvent(closeButton1, 'click');
    // and the same logic for close2 and close3
    
    var requestStream = refreshClickStream.startWith('startup click')
      .map(function() {
        var randomOffset = Math.floor(Math.random()*500);
        return 'https://api.github.com/users?since=' + randomOffset;
      });
    
    var responseStream = requestStream
      .flatMap(function (requestUrl) {
        return Rx.Observable.fromPromise($.ajax({url: requestUrl}));
      });
    
    var suggestion1Stream = close1ClickStream.startWith('startup click')
      .combineLatest(responseStream,             
        function(click, listUsers) {
          return listUsers[Math.floor(Math.random()*listUsers.length)];
        }
      )
      .merge(
        refreshClickStream.map(function(){ return null; })
      )
      .startWith(null);
    // and the same logic for suggestion2Stream and suggestion3Stream
    
    suggestion1Stream.subscribe(function(suggestion) {
      if (suggestion === null) {
        // hide the first suggestion DOM element
      }
      else {
        // show the first suggestion DOM element
        // and render the data
      }
    });

    整个儿能跑的例子:http://jsfiddle.net/staltz/8jFJH/48/

      例子虽小,但五脏俱全:用中心分离想法管理多个事件,甚至还有缓存。函数式风格代码更像是声明式代码:我们不是指定要执行的一串指令,而是通过定义流之间的关系来 描述事情是什么。比如,我们用 Rx 告诉计算机 suggestion1Stream ‘close 1’按钮点击流与最新的 response 中的一个用户数据的组合,除了程序启动时或刷新事件发生时是 null
      
      同时,这里没有多少类似于 ifforwhile之类的控制元素,也没有多少常见的回调函数。你甚至可以通过在 subscribe() 之前使用 filter() 来摆脱 ifelse(不举例了,留给你当作练习)。在 Rx 中,有很多用来操作流的函数,如 mapfilterscanmergecombineLatest、’ startWith和更多的控制流的事件驱动函数。这个函数集能让你以少量的代码实现更强大的功能。

    展开全文
  • Java中的响应式编程浅析

    万次阅读 2018-06-24 09:50:45
    最近接触到响应式编程的概念,简单了解了一下在java中的响应式编程响应式编程是一个专注于数据流和变化传递的异步编程范式。 响应式编程是一种编程概念,在很多编程语言中都有应用。其中,在Java中,我们比较熟悉...

    最近接触到响应式编程的概念,简单了解了一下在java中的响应式编程。响应式编程是一个专注于数据流和变化传递的异步编程范式。

    响应式编程是一种编程概念,在很多编程语言中都有应用。其中,在Java中,我们比较熟悉的有RxJava,有关RxJava的介绍,已经有大神写出比较完善的介绍:
    深入浅出RxJava(一:基础篇)
    深入浅出RxJava(二:操作符)
    深入浅出RxJava三–响应式的好处

    以上三篇博客已经初步介绍RxJava的使用方式以及一些应用场景,在此不再多介绍如何使用,这里尝试简单分析一下他的设计原理以及实现方式。

    响应式编程是观察者模式的扩展,RxJava中的实现也是如此。下面是一个观察者模式demo。

    public class ReactiveDemo {
    
        public static void main(String[] args){
            //可观察对象
            MyObservable observable = new MyObservable();
            //添加观察者
            observable.addObserver((o, arg) -> {
                Util.println("观察者1处理事件:" + arg.toString());
            });
    
            observable.addObserver((o, arg) -> {
                Util.println("观察者2处理事件:" + arg.toString());
            });
    
            observable.addObserver((o, arg) -> {
                Util.println("观察者3处理事件:" + arg.toString());
            });
            //发布事件通知观察者
            observable.setChanged();
            observable.notifyObservers("事件@@");
        }
    
    
        static class MyObservable extends Observable{
            @Override
            public void setChanged(){
                super.setChanged();
            }
        }
    }
    

    执行输出

    2018-06-23 18:16:43:993[main] 观察者3处理事件:事件@@
    2018-06-23 18:16:44:015[main] 观察者2处理事件:事件@@
    2018-06-23 18:16:44:015[main] 观察者1处理事件:事件@@

    从输出可以看出,代码的执行顺序并非按照我们的代码顺序执行,而是反过来,通过debug可以进一步看到,在添加观察者回调函数时,回调函数代码没有执行(这不是废话嘛0.0),知道被观察者发布事件通知观察者处理事件时才执行回调函数,并且都是在main线程中同步执行。这种方式可以称为同步非阻塞的响应式编程。既然有同步式的非阻塞,那就有异步非阻塞的响应式编程,在Java中的Swing就是一个很好的例子。
    下面看一下我们最常见的一个swing例子。

    public class SwingFrame {
        public static void main(String[] args) {
            JFrame jFrame = new JFrame();
            jFrame.setVisible(true);
            jFrame.setBounds(200,200,400,400);
    
            jFrame.addMouseListener(new MouseAdapter() {
                @Override
                public void mouseClicked(MouseEvent e) {
                    super.mouseClicked(e);
                    CommonUtil.println("鼠标点击事件");
                }
            });
    
            jFrame.addFocusListener(new FocusAdapter() {
                @Override
                public void focusGained(FocusEvent e) {
                    super.focusGained(e);
                    CommonUtil.println("焦点事件");
                }
            });
        }
    }
    

    执行程序,看看鼠标点击事件和焦点事件。

    2018-06-24 07:53:20:753[AWT-EventQueue-0] 焦点事件
    2018-06-24 07:53:25:216[AWT-EventQueue-0] 鼠标点击事件
    2018-06-24 07:53:27:849[AWT-EventQueue-0] 焦点事件
    2018-06-24 07:53:30:416[AWT-EventQueue-0] 鼠标点击事件
    

    尝试了几次,发现在swing的事件响应处理并不是在main线程里面进行处理的,鼠标点击和焦点事件处理都是在一个叫AWT-EventQueue-0的线程中进行处理,可以看见这种异步处理的方式有别于上面的观察者模式。这里对于事件的响应更加类似于一种对事件进行拉取的方式,我们点击窗体,发现打印鼠标事件是有延迟的,原因就是这里对于事件的获取是采用另起一个线程轮询策略,监听到对应的事件之后委托给对应的事件处理器(回调函数)进行处理,这种方式叫异步非阻塞。

    基于以上概念延伸出来的Reactive web,实现思路大致类似。我们了解一下原生servlet的Reactive web是怎么实现的。下面是一段网上找的描述。

    在服务器的并发请求数量比较大的时候,会产生很多的servlet线程(这些servlet线程在servlet容器的线程池中维护),如果每个请求需要耗费的时间比较长(比如,执行了一些IO的处理等),在之前的非异步的servlet中,这些servlet线程将会阻塞,严重耗费服务器的资源.而在servlet3.0中首次出现的异步servlet,通过一个单独的新的线程来执行这些比较耗时的任务(也可以把这些任务放到一个自己维护的线程池里),servlet线程立即返回servlet容器的servlet池以便响应其他请求,这样,在降低了系统的资源消耗的同时,也会提升系统的吞吐量
    这里写图片描述

    上面这种异步处理请求的方式是我们开发中常用的思路,在netty中的selector,worker概念和这个类似,下面看一下servlet中如何实现。

    @WebServlet(urlPatterns = "/asyncServlet", asyncSupported = true)
    public class AsyncServlet extends HttpServlet{
        public void doGet(HttpServletRequest request,
                          HttpServletResponse response) throws ServletException, IOException {
            CommonUtil.println("开始执行servlet");
            //开启异步上下文
            AsyncContext asyncContext = request.startAsync();
            //异步上下文设置回调函数(监听器)
            asyncContext.addListener(new AsyncListener() {
                @Override
                public void onComplete(AsyncEvent asyncEvent) throws IOException {
                    ServletResponse response1 = asyncEvent.getSuppliedResponse();
                    response1.setContentType("text/html;charset=UTF-8");
                    response1.getWriter().println("complete回调函数返回输出");
                    CommonUtil.println("complete回调函数完成");
                }
    
                @Override
                public void onTimeout(AsyncEvent asyncEvent) throws IOException {
    
                }
    
                @Override
                public void onError(AsyncEvent asyncEvent) throws IOException {
    
                }
    
                @Override
                public void onStartAsync(AsyncEvent asyncEvent) throws IOException {
    
                }
            });
            //开启新的工作线程,释放servlet处理请求线程,工作完后回调异步上下文的监听器
            new Thread(() -> {
                try {
                    ServletResponse response1 = asyncContext.getResponse();
                    response1.setContentType("text/html;charset=UTF-8");
                    response1.getWriter().print("异步工作线程返回输出");
                    //出发回调函数onComplete()
                    CommonUtil.println("工作线程完成");
                    asyncContext.complete();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();
            CommonUtil.println("释放servlet线程");
        }
    }
    

    执行程序并且访问,输出

    2018-06-24 09:37:18:581[http-nio-8080-exec-1] 开始执行servlet
    2018-06-24 09:37:18:584[http-nio-8080-exec-1] 释放servlet线程
    2018-06-24 09:37:18:585[Thread-13] 工作线程完成
    2018-06-24 09:37:18:587[http-nio-8080-exec-2] complete回调函数完成

    明显可以看到接收请求线程,工作线程和回调函数线程都是不同的线程,这里面的回调方式,我猜测跟swing的事件处理方式可能一样,有专门线程进行轮询(没验证。。)。上面代码可以看出实现思路是开辟一个工作线程处理io等,然后触发上下文监听器回调函数,基本思路是这样实现。

    这里的处理方式的好处是, 正如上面所说的,”servlet线程立即返回servlet容器的servlet池以便响应其他请求,这样,在降低了系统的资源消耗的同时,也会提升系统的吞吐量”。

    ractive的编程方式,不一定能提升程序性能,但是它希望做到的是用少量线程和内存提升伸缩性,及时响应新请求。

    展开全文
  • 响应式编程

    2019-05-03 11:45:11
    响应式编程 本文是对Reactor官方文档Introduction to Reactive Programming部分内容的翻译,希望对理解响应式编程的概念和原理有所帮助。 响应式编程是一种异步编程范式,它关注数据流和变化的传播。这意味着可以...
  • (1)什么是响应式编程——响应式Spring的道法术器

    万次阅读 多人点赞 2018-03-10 12:05:57
    响应式编程之道 1.1 什么是响应式编程? 在开始讨论响应式编程(Reactive Programming)之前,先来看一个我们经常使用的一款堪称“响应式典范”的强大的生产力工具——电子表格。 举个简单的例子,某电商网站...
  • 响应式编程简介

    千次阅读 2017-02-17 10:58:56
     你应该对响应式编程这个新事件有点好奇吧,尤其是与之相关的部分框架:Rx、Bacon.js、RAC等等。  在缺乏好的资源的情况下,学习响应式编程成为痛苦。我开始学的时候,做死地找各种教程。结果发现有用的只是...
  • 响应式编程规范

    千次阅读 2019-02-01 16:29:00
    响应式编程的概念已经很早就提出来了,业内很多大牛共同构建了 响应式宣言(中文版)。如果您认可该宣言,可以在其中签下自己的大名。 内容不多,这里我们直接拷贝过来。 在不同领域中深耕的组织都在不约而同地尝试...
  • 函数式编程和响应式编程

    千次阅读 2019-09-17 20:33:30
    响应式编程目标就是,如果b或者c的数值发生变化,a的数值会同时发生变化。函数式编程函数式编程是一系列被不公平对待的编程思想的保护伞,它的核心思想是,它是一种将程序看成是数学方法的求值、不会改变状态、不会...
  • 1、链式编程,主要通过点‘.’来连接不同的函数调用 iOS上实现链式编程比较好的框架就是Masonry,通过查看Masonry源码,我们发现,每一个函数返回类型都是一个Block变量,然后Block变量中返回的内容就是当前对象...
  • 响应式Spring的道法术器(Spring WebFlux 教程)

    万次阅读 多人点赞 2020-01-23 11:48:16
    Spring WebFlux 2小时快速...Reactor 3 响应式编程库(60min) Spring Webflux和Spring Data Reactive开发响应式应用(45min) 通过以上内容相信可以对Spring 5.0 推出的响应式开发有了初步的体会。如果希望有更加...
  • Reactive programming包括Object-oriented reactive programming 和 Functional reactive programming ...面向对象响应编程是结合面向对象编程(OOP)和响应式编程.其可能是目前以最自然的方式体现了某个对象(obj
  • 响应式编程 RxJava系列

    万次阅读 2019-06-13 20:37:33
    一方面反映出现在的高级开发者的追求越来越高逼格,另一方面也反映从从结构化编程到面向对象编程到函数式编程的发展历程,越来越多的应用和面试中都会有ReactiveX,响应式编程中RxJava可谓如鱼得水。 最近也一直在...
  • 响应式电子商城超强实战技术

    万人学习 2019-06-24 13:08:00
    响应式的页面设计是新型互联网发展的方向.它可以根据不同的设备显示效果不同的页面.电子商城一直以来都是互联网的宠儿.电子商城能否承载足够的压力,成为越来越多的电商需要考虑的事.那么在这里你可以搞定一切。
  • 谈谈响应式编程

    千次阅读 2016-10-31 11:33:52
    随着前端框架react,angular以及vue的流行,响应式编程也开始在前端领域得以广泛应用。因此,了解并且理解响应式编程有助于更好地学习这些框架,同时利用好响应式编程的相关工具,可以让编程更加轻松。 什么是响应式...
  • 【Spring 5】响应式Web框架前瞻

    万次阅读 2017-05-30 23:43:12
    引子:被誉为“中国大数据第一人”的涂子沛先生在其成名作《数据之巅》里提到,摩尔定律、社交媒体、数据挖掘是大数据的三大成因。...其中以RxJava和Reactor为代表的响应式(Reactive)编程技术针对的就是经典的大数
  • Project Reactor 是 Spring WebFlux 的御用响应式编程库,与 Spring 是兄弟项目。 关于如何基于Spring的组件进行响应式应用的开发,欢迎阅读我的系列文章《响应式Spring的道法术器》。 官方参考文档地址:...
  • java异步响应式编程

    千次阅读 2018-03-24 22:46:50
    响应式编程 响应式编程是一种怎样编程的套路,是一种特殊场景的代码优化. Remote资源 a.查询一个去过的地点 b.查询推荐的目标地点 c.预测目标地点天气 d.计算推荐地点路线信息 a和b没有依赖关系,所以我们...
  • 前情提要:响应式编程 | 响应式流 1.5 响应式系统 1.5.1 响应式宣言 关注“响应式”的朋友不难搜索到关于“响应式宣言”的介绍,先上图: 这张图凝聚了许多大神的智慧和经验,见官网,中文版官网,如果...
  • java9 响应式编程支持

    千次阅读 2020-02-10 14:42:02
    文章目录概述响应式编程接口demo 概述 java9开始,官方支持了响应式编程规范,提供了顶级的响应式编程接口。 java11开始,官方提供了支持http2的、友好的http客户端java.net.http,该客户端就是jdk内部第一个基于...
1 2 3 4 5 ... 20
收藏数 139,259
精华内容 55,703
关键字:

响应式编程