精华内容
下载资源
问答
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要:响应式流 | lambda与函数式 | Reactor快速上手 1.3.3 Spring WebFlux Spring WebFlux是随Spring 5推出的响应式Web框架。 1)服务端技术栈 ...

    本系列其他文章见:《响应式Spring的道法术器》
    前情提要:响应式流 | lambda与函数式 | Reactor快速上手

    1.3.3 Spring WebFlux

    Spring WebFlux是随Spring 5推出的响应式Web框架。

    1)服务端技术栈

    Spring提供了完整的支持响应式的服务端技术栈。

    如上图所示,左侧为基于spring-webmvc的技术栈,右侧为基于spring-webflux的技术栈,

    • Spring WebFlux是基于响应式流的,因此可以用来建立异步的、非阻塞的、事件驱动的服务。它采用Reactor作为首选的响应式流的实现库,不过也提供了对RxJava的支持。
    • 由于响应式编程的特性,Spring WebFlux和Reactor底层需要支持异步的运行环境,比如Netty和Undertow;也可以运行在支持异步I/O的Servlet 3.1的容器之上,比如Tomcat(8.0.23及以上)和Jetty(9.0.4及以上)。
    • 从图的纵向上看,spring-webflux上层支持两种开发模式:
      • 类似于Spring WebMVC的基于注解(@Controller@RequestMapping)的开发模式;
      • Java 8 lambda 风格的函数式开发模式。
    • Spring WebFlux也支持响应式的Websocket服务端开发。

    由此看来,Spring WebFlux与Vert.x有一些相通之处,都是建立在非阻塞的异步I/O和事件驱动的基础之上的。

    2)响应式Http客户端

    此外,Spring WebFlux也提供了一个响应式的Http客户端API WebClient。它可以用函数式的方式异步非阻塞地发起Http请求并处理响应。其底层也是由Netty提供的异步支持。

    我们可以把WebClient看做是响应式的RestTemplate,与后者相比,前者:

    • 是非阻塞的,可以基于少量的线程处理更高的并发;
    • 可以使用Java 8 lambda表达式;
    • 支持异步的同时也可以支持同步的使用方式;
    • 可以通过数据流的方式与服务端进行双向通信。

    当然,与服务端对应的,Spring WebFlux也提供了响应式的Websocket客户端API。

    简单介绍这些,让我们来Coding吧(本文源码)~

    本节,我们仍然是本着“Hello,world!”的精神来上手熟悉WebFlux,因此暂时不会像手册一样面面俱到地谈到WebFlux的各个细节,我们通过以下几个例子来了解它:

    1. 先介绍一下使用Spring WebMVC风格的基于注解的方式如何编写响应式的Web服务,这几乎没有学习成本,非常赞。虽然这种方式在开发上与Spring WebMVC变化不大,但是框架底层已经是完全的响应式技术栈了;
    2. 再进一步介绍函数式的开发模式;
    3. 简单几行代码实现服务端推送(Server Send Event,SSE);
    4. 然后我们再加入响应式数据库的支持(使用Reactive Spring Data for MongoDB);
    5. 使用WebClient与前几步做好的服务端进行通信;
    6. 最后我们看一下如何通过“流”的方式在Http上进行通信。

    Spring Boot 2是基于Spring 5的,其中一个比较大的更新就在于支持包括spring-webflux和响应式的spring-data在内的响应式模块。Spring Boot 2即将发布正式版,不过目前的版本从功能上已经完备,下边的例子我们就用Spring Boot 2在进行搭建。

    1.3.3.1 基于WebMVC注解的方式

    我们首先用Spring WebMVC开发一个只有Controller层的简单的Web服务,然后仅仅做一点点调整就可切换为基于Spring WebFlux的具有同样功能的Web服务。

    我们使用Spring Boot 2搭建项目框架。

    以下截图来自IntelliJ IDEA,不过其他IDE也都是类似的。

    1)基于Spring Initializr创建项目

    本节的例子很简单,不涉及Service层和Dao层,因此只选择spring-webmvc即可,也就是“Web”的starter。

    也可以使用网页版的https://start.spring.io来创建项目:

    创建后的项目POM中,包含下边的依赖,即表示基于Spring WebMVC:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    

    2)创建Controller和Endpoint

    创建Controller类HelloController,仅提供一个Endpoint:/hello

        @RestController
        public class HelloController {
        
            @GetMapping("/hello")
            public String hello() {
                return "Welcome to reactive world ~";
            }
        }
    

    3)启动应用

    OK了,一个简单的基于Spring WebMVC的Web服务。我们新增了HelloController.java,修改了application.properties

    使用IDE启动应用,或使用maven命令:

    mvn spring-boot:run
    

    通过打印的log可以看到,服务运行于Tomcat的8080端口:

    测试Endpoint。在浏览器中访问http://localhost:8080/hello,或运行命令:

    curl http://localhost:8080/hello
    

    返回Welcome to reactive world ~

    基于Spring WebFlux的项目与上边的步骤一致,仅有两点不同。我们这次偷个懒,就不从新建项目了,修改一下上边的项目:

    4)依赖“Reactive Web”的starter而不是“Web”

    修改项目POM,调整依赖使其基于Spring WebFlux:

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>    <!--【改】增加“flux”四个字符-->
        </dependency>
    

    5)Controller中处理请求的返回类型采用响应式类型

        @RestController
        public class HelloController {
        
            @GetMapping("/hello")
            public Mono<String> hello() {   // 【改】返回类型为Mono<String>
                return Mono.just("Welcome to reactive world ~");     // 【改】使用Mono.just生成响应式数据
            }
        }
    

    6)启动应用

    仅需要上边两步就改完了,是不是很简单,同样的方法启动应用。启动后发现应用运行于Netty上:

    访问http://localhost:8080/hello,结果与Spring WebMVC的相同。

    7)总结

    从上边这个非常非常简单的例子中可以看出,Spring真是用心良苦,WebFlux提供了与之前WebMVC相同的一套注解来定义请求的处理,使得Spring使用者迁移到响应式开发方式的过程变得异常轻松。

    虽然我们只修改了少量的代码,但是其实这个简单的项目已经脱胎换骨了。整个技术栈从命令式的、同步阻塞的【spring-webmvc + servlet + Tomcat】变成了响应式的、异步非阻塞的【spring-webflux + Reactor + Netty】。

    Netty是一套异步的、事件驱动的网络应用程序框架和工具,能够开发高性能、高可靠性的网络服务器和客户端程序,因此与同样是异步的、事件驱动的响应式编程范式一拍即合。

    下边的内容了解即可,就不实战了。
    在Java 7推出异步I/O库,以及Servlet3.1增加了对异步I/O的支持之后,Tomcat等Servlet容器也随后开始支持异步I/O,然后Spring WebMVC也增加了对Reactor库的支持,所以上边第4)步如果不是将spring-boot-starter-web替换为spring-boot-starter-WebFlux,而是增加reactor-core的依赖的话,仍然可以用注解的方式开发基于Tomcat的响应式应用。

    1.3.3.2 WebFlux的函数式开发模式

    既然是响应式编程了,有些朋友可能会想统一用函数式的编程风格,WebFlux满足你。WebFlux提供了一套函数式接口,可以用来实现类似MVC的效果。我们先接触两个常用的。

    再回头瞧一眼上边例子中我们用Controller定义定义对Request的处理逻辑的方式,主要有两个点:

    1. 方法定义处理逻辑;
    2. 然后用@RequestMapping注解定义好这个方法对什么样url进行响应。

    在WebFlux的函数式开发模式中,我们用HandlerFunctionRouterFunction来实现上边这两点。

    • HandlerFunction相当于Controller中的具体处理方法,输入为请求,输出为装在Mono中的响应:
        Mono<T extends ServerResponse> handle(ServerRequest request);
    
    • RouterFunction,顾名思义,路由,相当于@RequestMapping,用来判断什么样的url映射到那个具体的HandlerFunction,输入为请求,输出为装在Mono里边的Handlerfunction
        Mono<HandlerFunction<T>> route(ServerRequest request);
    

    我们看到,在WebFlux中,请求和响应不再是WebMVC中的ServletRequestServletResponse,而是ServerRequestServerResponse。后者是在响应式编程中使用的接口,它们提供了对非阻塞和回压特性的支持,以及Http消息体与响应式类型Mono和Flux的转换方法。

    下面我们用函数式的方式开发两个Endpoint:

    1. /time返回当前的时间;
    2. /date返回当前的日期。

    对于这两个需求,HandlerFunction很容易写:

        // 返回包含时间字符串的ServerResponse
        HandlerFunction<ServerResponse> timeFunction = 
            request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
                Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);
        
        // 返回包含日期字符串的ServerResponse
        HandlerFunction<ServerResponse> dateFunction = 
            request -> ServerResponse.ok().contentType(MediaType.TEXT_PLAIN).body(
                Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
    

    那么RouterFunction为:

        RouterFunction<ServerResponse> router = 
            RouterFunctions.route(GET("/time"), timeFunction)
                .andRoute(GET("/date"), dateFunction);
    

    按照常见的套路,RouterFunctions是工具类。

    不过这么写在业务逻辑复杂的时候不太好组织,我们通常采用跟MVC类似的代码组织方式,将同类业务的HandlerFunction放在一个类中,然后在Java Config中将RouterFunction配置为Spring容器的Bean。我们继续在第一个例子的代码上开发:

    1)创建统一存放处理时间的Handler类

    创建TimeHandler.java

        import static org.springframework.web.reactive.function.server.ServerResponse.ok;
    
        @Component
        public class TimeHandler {
            public Mono<ServerResponse> getTime(ServerRequest serverRequest) {
                return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Now is " + new SimpleDateFormat("HH:mm:ss").format(new Date())), String.class);
            }
            public Mono<ServerResponse> getDate(ServerRequest serverRequest) {
                return ok().contentType(MediaType.TEXT_PLAIN).body(Mono.just("Today is " + new SimpleDateFormat("yyyy-MM-dd").format(new Date())), String.class);
            }
        }
    

    由于出现次数通常比较多,这里静态引入ServerResponse.ok()方法。

    2)在Spring容器配置RouterFunction

    我们采用Spring现在比较推荐的Java Config的配置Bean的方式,创建用于存放Router的配置类RouterConfig.java

        import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
        import static org.springframework.web.reactive.function.server.RouterFunctions.route;
        
        @Configuration
        public class RouterConfig {
            @Autowired
            private TimeHandler timeHandler;
        
            @Bean
            public RouterFunction<ServerResponse> timerRouter() {
                return route(GET("/time"), req -> timeHandler.getTime(req))
                        .andRoute(GET("/date"), timeHandler::getDate);  // 这种方式相对于上一行更加简洁
            }
        }
    

    3)重启服务试一试

    重启服务测试一下吧:

    $ curl http://localhost:8080/date
    Today is 2018-02-26
    
    $ curl http://localhost:8080/time
    Now is 21:12:53
    

    1.3.3.3 服务器推送

    我们可能会遇到一些需要网页与服务器端保持连接(起码看上去是保持连接)的需求,比如类似微信网页版的聊天类应用,比如需要频繁更新页面数据的监控系统页面或股票看盘页面。我们通常采用如下几种技术:

    • 短轮询:利用ajax定期向服务器请求,无论数据是否更新立马返回数据,高并发情况下可能会对服务器和带宽造成压力;
    • 长轮询:利用comet不断向服务器发起请求,服务器将请求暂时挂起,直到有新的数据的时候才返回,相对短轮询减少了请求次数;
    • SSE:服务端推送(Server Send Event),在客户端发起一次请求后会保持该连接,服务器端基于该连接持续向客户端发送数据,从HTML5开始加入。
    • Websocket:这是也是一种保持连接的技术,并且是双向的,从HTML5开始加入,并非完全基于HTTP,适合于频繁和较大流量的双向通讯场景。

    既然响应式编程是一种基于数据流的编程范式,自然在服务器推送方面得心应手,我们基于函数式方式再增加一个Endpoint /times,可以每秒推送一次时间。

    1)增加Handler方法

    TimeHandler.java

        public Mono<ServerResponse> sendTimePerSec(ServerRequest serverRequest) {
            return ok().contentType(MediaType.TEXT_EVENT_STREAM).body(  // 1
                    Flux.interval(Duration.ofSeconds(1)).   // 2
                            map(l -> new SimpleDateFormat("HH:mm:ss").format(new Date())), 
                    String.class);
        }
    
    1. MediaType.TEXT_EVENT_STREAM表示Content-Typetext/event-stream,即SSE;
    2. 利用interval生成每秒一个数据的流。

    2)配置router

    RouterConfig.java

            @Bean
            public RouterFunction<ServerResponse> timerRouter() {
                return route(GET("/time"), timeHandler::getTime)
                        .andRoute(GET("/date"), timeHandler::getDate)
                        .andRoute(GET("/times"), timeHandler::sendTimePerSec);  // 增加这一行
            }
    

    3)重启服务试一下

    重启服务后,测试一下:

    curl http://localhost:8080/times
    data:21:32:22
    data:21:32:23
    data:21:32:24
    data:21:32:25
    data:21:32:26
    <Ctrl+C>
    

    就酱,访问这个url会收到持续不断的报时数据(时间数据是在data中的)。

    那么用注解的方式如何进行服务端推送呢,这个演示就融到下一个例子中吧~

    1.3.3.3 响应式Spring Data

    开发基于响应式流的应用,就像是在搭建数据流流动的管道,从而异步的数据能够顺畅流过每个环节。前边的例子主要聚焦于应用层,然而绝大多数系统免不了要与数据库进行交互,所以我们也需要响应式的持久层API和支持异步的数据库驱动。就像从自来水厂到家里水龙头这个管道中,如果任何一个环节发生了阻塞,那就可能造成整体吞吐量的下降。

    各个数据库都开始陆续推出异步驱动,目前Spring Data支持的可以进行响应式数据访问的数据库有MongoDB、Redis、Apache Cassandra和CouchDB。今天我们用MongoDB来写一个响应式demo。

    我们这个例子很简单,就是关于User的增删改查,以及基于注解的服务端推送。

    1)编写User

    既然是举例,我们随便定义几个属性吧~

        public class User {
            private String id;
            private String username;
            private String phone;
            private String email;
            private String name;
            private Date birthday;
        }
    

    然后为了方便开发,我们引入lombok库,它能够通过注解的方式为我们添加必要的Getter/Setter/hashCode()/equals()/toString()/构造方法等,添加依赖(版本可自行到http://search.maven.org搜索最新):

    	<dependency>
    		<groupId>org.projectlombok</groupId>
    		<artifactId>lombok</artifactId>
    		<version>1.16.20</version>
    	</dependency>
    

    然后为User添加注解:

        @Data   // 生成无参构造方法/getter/setter/hashCode/equals/toString
        @AllArgsConstructor // 生成所有参数构造方法
        @NoArgsConstructor  // @AllArgsConstructor会导致@Data不生成无参构造方法,需要手动添加@NoArgsConstructor,如果没有无参构造方法,可能会导致比如com.fasterxml.jackson在序列化处理时报错
        public class User {
            ...
    

    我们可以利用IDE看一下生成的方法(如下图黄框所示):

    可能需要先在IDE中进行少量配置以便支持lombok的注解,比如IntelliJ IDEA:

    1. 安装“lombok plugin”:
    1. 开启对注解编译的支持:

    lombok对于Java开发者来说绝对算是个福音了,希望使用Kotlin的朋友不要笑话我们土哦~

    2)增加Spring Data的依赖

    在POM中增加Spring Data Reactive Mongo的依赖:

    	<dependency>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
    	</dependency>
    

    MongoDB是文档型的NoSQL数据库,因此,我们使用@Document注解User类:

        @Data
        @AllArgsConstructor
        @Document
        public class User {
            @Id
            private String id;      // 注解属性id为ID
            @Indexed(unique = true) // 注解属性username为索引,并且不能重复
            private String username;
            private String name;
            private String phone;
            private Date birthday;
        }
    

    OK,这样我们的模型就准备好了。MongoDB会自动创建collection,默认为类名首字母小写,也就是user

    3)配置数据源

    Spring Boot为我们搞定了几乎所有的配置,太赞了,下边是MongoDB的默认配置:

    # MONGODB (MongoProperties)
    spring.data.mongodb.authentication-database= # Authentication database name.
    spring.data.mongodb.database=test # Database name.
    spring.data.mongodb.field-naming-strategy= # Fully qualified name of the FieldNamingStrategy to use.
    spring.data.mongodb.grid-fs-database= # GridFS database name.
    spring.data.mongodb.host=localhost # Mongo server host. Cannot be set with uri.
    spring.data.mongodb.password= # Login password of the mongo server. Cannot be set with uri.
    spring.data.mongodb.port=27017 # Mongo server port. Cannot be set with uri.
    spring.data.mongodb.repositories.enabled=true # Enable Mongo repositories.
    spring.data.mongodb.uri=mongodb://localhost/test # Mongo database URI. Cannot be set with host, port and credentials.
    spring.data.mongodb.username= # Login user of the mongo server. Cannot be set with uri.
    

    请根据需要添加自定义的配置,比如我的MongoDB是跑在IP为192.168.0.101的虚拟机的Docker中的,就可在application.properties中增加一条:

    spring.data.mongodb.host=192.168.0.101
    

    4)增加DAO层repository

    与非响应式Spring Data的CrudReposity对应的,响应式的Spring Data也提供了相应的Repository库:ReactiveCrudReposity,当然,我们也可以使用它的子接口ReactiveMongoRepository

    我们增加UserRepository

        public interface UserRepository extends ReactiveCrudRepository<User, String> {  // 1
            Mono<User> findByUsername(String username);     // 2
            Mono<Long> deleteByUsername(String username);
        }
    
    1. 同样的,ReactiveCrudRepository的泛型分别是UserID的类型;
    2. ReactiveCrudRepository已经提供了基本的增删改查的方法,根据业务需要,我们增加四个方法(在此膜拜一下Spring团队的牛人们,使得我们仅需按照规则定义接口方法名即可完成DAO层逻辑的开发,牛~)

    5)Service层

    由于业务逻辑几乎为零,只是简单调用了DAO层,直接贴代码:

        @Service
        public class UserService {
            @Autowired
            private UserRepository userRepository;
        
            /**
             * 保存或更新。
             * 如果传入的user没有id属性,由于username是unique的,在重复的情况下有可能报错,
             * 这时找到以保存的user记录用传入的user更新它。
             */
            public Mono<User> save(User user) {
                return userRepository.save(user)
                        .onErrorResume(e ->     // 1
                                userRepository.findByUsername(user.getUsername())   // 2
                                        .flatMap(originalUser -> {      // 4
                                            user.setId(originalUser.getId());
                                            return userRepository.save(user);   // 3
                                        }));
            }
        
            public Mono<Long> deleteByUsername(String username) {
                return userRepository.deleteByUsername(username);
            }
        
            public Mono<User> findByUsername(String username) {
                return userRepository.findByUsername(username);
            }
            
            public Flux<User> findAll() {
                return userRepository.findAll();
            }
        }
    
    1. onErrorResume进行错误处理;
    2. 找到username重复的记录;
    3. 拿到ID从而进行更新而不是创建;
    4. 由于函数式为User -> Publisher,所以用flatMap

    6)Controller层

    直接贴代码:

        @RestController
        @RequestMapping("/user")
        public class UserController {
            @Autowired
            private UserService userService;
        
            @PostMapping("")
            public Mono<User> save(User user) {
                return this.userService.save(user);
            }
        
            @DeleteMapping("/{username}")
            public Mono<Long> deleteByUsername(@PathVariable String username) {
                return this.userService.deleteByUsername(username);
            }
        
            @GetMapping("/{username}")
            public Mono<User> findByUsername(@PathVariable String username) {
                return this.userService.findByUsername(username);
            }
        
            @GetMapping("")
            public Flux<User> findAll() {
                return this.userService.findAll();
            }
        }
    

    7)启动应用测试一下

    由于涉及到POST和DELETE方法的请求,建议用支持RESTful的client来测试,比如“Restlet client”:

    如图,增加操作是成功的,只要username不变,再次发送请求会更新该记录。

    图中birthday的时间差8小时,不去管它。

    用同样的方法增加一个李四,之后我们再来测试一下查询。

    1. 根据用户名查询(METHOD:GET URL:http://localhost:8080/user/zhangsan),下边输出是格式化的JSON:

      {
      “id”: “5a9504a167646d057051e229”,
      “username”: “zhangsan”,
      “name”: “张三”,
      “phone”: “18610861861”,
      “birthday”: “1989-12-31T16:00:00.000+0000”
      }

    2. 查询全部(METHOD:GET URL:http://localhost:8080/user)

      [{“id”:“5a9504a167646d057051e229”,“username”:“zhangsan”,“name”:“张三”,“phone”:“18610861861”,“birthday”:“1989-12-31T16:00:00.000+0000”},{“id”:“5a9511db67646d3c782f2e7f”,“username”:“lisi”,“name”:“李四”,“phone”:“18610861862”,“birthday”:“1992-02-01T16:00:00.000+0000”}]

    测试一下删除(METHOD:DELETE URL:http://localhost:8080/user/zhangsan),返回值为1,再查询全部,发现张三已经被删除了,OK。

    8)stream+json

    看到这里细心的朋友可能会有点嘀咕,怎么看是不是异步的呢?毕竟查询全部的时候,结果都用中括号括起来了,这和原来返回List<User>的效果似乎没多大区别。假设一下查询100个数据,如果是异步的话,以我们对“异步响应式流”的印象似乎应该是一个一个至少是一批一批的到达客户端的嘛。我们加个延迟验证一下:

    	@GetMapping("")
    	public Flux<User> findAll() {
    	    return this.userService.findAll().delayElements(Duration.ofSeconds(1));
    	}
    

    每个元素都延迟1秒,现在我们在数据库里弄三条记录,然后请求查询全部的那个URL,发现并不是像/times一样一秒一个地出来,而是3秒之后一块儿出来的。果然如此,这一点都不响应式啊!

    /times类似,我们也加一个MediaType,不过由于这里返回的是JSON,因此不能使用TEXT_EVENT_STREAM,而是使用APPLICATION_STREAM_JSON,即application/stream+json格式。

    @GetMapping(value = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<User> findAll() {
        return this.userService.findAll().delayElements(Duration.ofSeconds(2));
    }
    
    1. produces后边的值应该是application/stream+json字符串,因此用APPLICATION_STREAM_JSON_VALUE

    重启服务再次请求,发现三个user是一秒一个的速度出来的,中括号也没有了,而是一个一个独立的JSON值构成的json stream:

    {"id":"5a9504a167646d057051e229","username":"zhangsan","name":"张三","phone":"18610861861","birthday":"1989-12-31T16:00:00.000+0000"}
    {"id":"5a9511db67646d3c782f2e7f","username":"lisi","name":"李四","phone":"18610861862","birthday":"1992-02-01T16:00:00.000+0000"}
    {"id":"5a955f08fa10b93ec48df37f","username":"wangwu","name":"王五","phone":"18610861865","birthday":"1995-05-04T16:00:00.000+0000"}
    

    9)总结

    如果有Spring Data开发经验的话,切换到Spring Data Reactive的难度并不高。跟Spring WebFlux类似:原来返回User的话,那现在就返回Mono<User>;原来返回List<User>的话,那现在就返回Flux<User>

    对于稍微复杂的业务逻辑或一些必要的异常处理,比如上边的save方法,请一定采用响应式的编程方式来定义,从而一切都是异步非阻塞的。如下图所示,从HttpServer(如Netty或Servlet3.1以上的Servlet容器)到ServerAdapter(Spring WebFlux框架提供的针对不同server的适配器),到我们编写的Controller和DAO,以及异步数据库驱动,构成了一个完整的异步非阻塞的管道,里边流动的就是响应式流。

    1.3.3.4 使用WebClient开发响应式Http客户端

    下面,我们用WebClient测试一下前边几个例子的成果。

    1) /hello,返回Mono

        @Test
        public void webClientTest1() throws InterruptedException {
            WebClient webClient = WebClient.create("http://localhost:8080");   // 1
            Mono<String> resp = webClient
                    .get().uri("/hello") // 2
                    .retrieve() // 3
                    .bodyToMono(String.class);  // 4
            resp.subscribe(System.out::println);    // 5
            TimeUnit.SECONDS.sleep(1);  // 6
        }
    
    1. 创建WebClient对象并指定baseUrl;
    2. HTTP GET;
    3. 异步地获取response信息;
    4. 将response body解析为字符串;
    5. 打印出来;
    6. 由于是异步的,我们将测试线程sleep 1秒确保拿到response,也可以像前边的例子一样用CountDownLatch

    运行效果如下:

    2) /user,返回Flux

    为了多演示一些不同的实现方式,下边的例子我们调整几个地方,但是效果跟上边是一样的:

        @Test
        public void webClientTest2() throws InterruptedException {
            WebClient webClient = WebClient.builder().baseUrl("http://localhost:8080").build(); // 1
            webClient
                    .get().uri("/user")
                    .accept(MediaType.APPLICATION_STREAM_JSON) // 2
                    .exchange() // 3
                    .flatMapMany(response -> response.bodyToFlux(User.class))   // 4
                    .doOnNext(System.out::println)  // 5
                    .blockLast();   // 6
        }
    
    1. 这次我们使用WebClientBuilder来构建WebClient对象;
    2. 配置请求Header:Content-Type: application/stream+json
    3. 获取response信息,返回值为ClientResponseretrive()可以看做是exchange()方法的“快捷版”;
    4. 使用flatMap来将ClientResponse映射为Flux;
    5. 只读地peek每个元素,然后打印出来,它并不是subscribe,所以不会触发流;
    6. 上个例子中sleep的方式有点low,blockLast方法,顾名思义,在收到最后一个元素前会阻塞,响应式业务场景中慎用。

    运行效果如下:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dnZqusDX-1615630172707)(https://leanote.com/api/file/getImage?fileId=5a9570d6ab64415d3400b4f0)]

    3) /times,服务端推送

        @Test
        public void webClientTest3() throws InterruptedException {
            WebClient webClient = WebClient.create("http://localhost:8080");
            webClient
                    .get().uri("/times")
                    .accept(MediaType.TEXT_EVENT_STREAM)    // 1
                    .retrieve()
                    .bodyToFlux(String.class)
                    .log()  // 2
                    .take(10)   // 3
                    .blockLast();
        }
    
    1. 配置请求Header:Content-Type: text/event-stream,即SSE;
    2. 这次用log()代替doOnNext(System.out::println)来查看每个元素;
    3. 由于/times是一个无限流,这里取前10个,会导致流被取消

    运行效果如下:

    1.3.3.5 让数据在Http上双向无限流动起来

    许多朋友看到这个题目会想到Websocket,的确,Websocket确实可以实现全双工通信,但它的数据传输并非是完全基于HTTP协议的,关于Websocket我们后边再聊。

    下面我们实现一个这样两个Endpoint:

    • POST方法的/events,“源源不断”地收集数据,并存入数据库;
    • GET方法的/events,“源源不断”将数据库中的记录发出来。

    0)准备

    一、数据模型MyEvent

        @Data
        @AllArgsConstructor
        @NoArgsConstructor
        @Document(collection = "event") // 1
        public class MyEvent {
            @Id
            private Long id;    // 2
            private String message;
        }
    
    1. 指定collection名为event
    2. 这次我们使用表示时间的long型数据作为ID。

    二、DAO层:

        public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> { // 1
        }
    
    1. 下边用到了可以保存Flux的insert(Flux)方法,这个方法是在ReactiveMongoRepository中定义的。

    三、简单起见就不要Service层了,直接Controller:

        @RestController
        @RequestMapping("/events")
        public class MyEventController {
            @Autowired
            private MyEventRepository myEventRepository;
        
            @PostMapping(path = "")
            public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) {   // 1
                // TODO
                return null;
            }
        
            @GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
            public Flux<MyEvent> getEvents() {  // 2
                // TODO
                return null;
            }
        }
    
    1. POST方法的接收数据流的Endpoint,所以传入的参数是一个Flux,返回结果其实就看需要了,我们用一个Mono<Void>作为方法返回值,表示如果传输完的话只给一个“完成信号”就OK了;
    2. GET方法的无限发出数据流的Endpoint,所以返回结果是一个Flux<MyEvent>,不要忘了注解上produces = MediaType.APPLICATION_STREAM_JSON_VALUE

    准备到此为止,类如下。我们来完成上边的两个TODO吧。

    1)接收数据流的Endpoint

    在客户端,WebClient可以接收text/event-streamapplication/stream+json格式的数据流,也可以在请求的时候上传一个数据流到服务器;
    在服务端,WebFlux也支持接收一个数据流作为请求参数,从而实现一个接收数据流的Endpoint。

    我们先看服务端。Controller中的loadEvents方法:

        @PostMapping(path = "", consumes = MediaType.APPLICATION_STREAM_JSON_VALUE) // 1
        public Mono<Void> loadEvents(@RequestBody Flux<MyEvent> events) {
            return this.myEventRepository.insert(events).then();    // 2
        }
    
    1. 指定传入的数据是application/stream+json,与getEvents方法的区别在于这个方法是consume这个数据流;
    2. insert返回的是保存成功的记录的Flux,但我们不需要,使用then方法表示“忽略数据元素,只返回一个完成信号”。

    服务端写好后,启动之,再看一下客户端怎么写(还是放在src/test下):

        @Test
        public void webClientTest4() {
            Flux<MyEvent> eventFlux = Flux.interval(Duration.ofSeconds(1))
                    .map(l -> new MyEvent(System.currentTimeMillis(), "message-" + l)).take(5); // 1
            WebClient webClient = WebClient.create("http://localhost:8080");
            webClient
                    .post().uri("/events")
                    .contentType(MediaType.APPLICATION_STREAM_JSON) // 2
                    .body(eventFlux, MyEvent.class) // 3
                    .retrieve()
                    .bodyToMono(Void.class)
                    .block();
        }
    
    1. 声明速度为每秒一个MyEvent元素的数据流,不加take的话表示无限个元素的数据流;
    2. 声明请求体的数据格式为application/stream+json
    3. body方法设置请求体的数据。

    运行一下这个测试,根据控制台数据可以看到是一条一条将数据发到/events的,看一下MongoDB中的数据:

    2)发出无限流的Endpoint

    回想一下前边/user的例子,当数据库中所有的内容都查询出来之后,这个流就结束了,因为其后跟了一个“完成信号”,我们可以通过在UserServicefindAll()方法的流上增加log()操作符来观察更详细的日志:

    我们可以看到在三个onNext信号后是一个onComplete信号。

    这样的流是有限流,这个时候如果在数据库中再新增一个User的话,已经结束的请求也不会再有新的内容出现了。

    反观/times请求,它会无限地发出SSE,而不会有“完成信号”出现,这是无限流。

    我们希望的情况是无论是请求GET的/events之后,当所有数据都发完之后,不要结束,而是挂起等待新的数据。如果我们用上边的POST的/events传入新的数据到数据库后,新的数据会自动地流到客户端。

    这可以在DAO层配置实现:

        public interface MyEventRepository extends ReactiveMongoRepository<MyEvent, Long> {
            @Tailable   // 1
            Flux<MyEvent> findBy(); // 2
        }
    
    1. @Tailable注解的作用类似于linux的tail命令,被注解的方法将发送无限流,需要注解在返回值为Flux这样的多个元素的Publisher的方法上;
    2. findAll()是想要的方法,但是在ReactiveMongoRepository中我们够不着,所以使用findBy()代替。

    然后完成Controller中的方法:

        @GetMapping(path = "", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
        public Flux<MyEvent> getEvents() {
            return this.myEventRepository.findBy();
        }
    

    不过,这还不够,@Tailable仅支持有大小限制的(“capped”)collection,而自动创建的collection是不限制大小的,因此我们需要先手动创建。Spring Boot提供的CommandLineRunner可以帮助我们实现这一点。

    Spring Boot应用程序在启动后,会遍历CommandLineRunner接口的实例并运行它们的run方法。

        @Bean   // 1
        public CommandLineRunner initData(MongoOperations mongo) {  // 2
            return (String... args) -> {    // 3
                mongo.dropCollection(MyEvent.class);    // 4
                mongo.createCollection(MyEvent.class, CollectionOptions.empty().size(200).capped()); // 5
            };
        }
    
    1. 对于复杂的Bean只能通过Java Config的方式配置,这也是为什么Spring3之后官方推荐这种配置方式的原因,这段代码可以放到配置类中,本例我们就直接放到启动类WebFluxDemoApplication了;
    2. MongoOperations提供对MongoDB的操作方法,由Spring注入的mongo实例已经配置好,直接使用即可;
    3. CommandLineRunner也是一个函数式接口,其实例可以用lambda表达;
    4. 如果有,先删除collection,生产环境慎用这种操作;
    5. 创建一个记录个数为10的capped的collection,容量满了之后,新增的记录会覆盖最旧的。

    启动应用,我们检查一下event collection:

    OK,这个时候我们请求一下http://localhost:8080/events,发现立马返回了,并没有挂起。原因在于collection中一条记录都没有,而@Tailable起作用的前提是至少有一条记录。

    跑一下WebClient测试程序插入5条数据,然后再次请求:

    请求是挂起的,这没错,但是只有两条数据,看WebClient测试程序的控制台明明发出了5个请求啊。

    原因定义的CollectionOptions.empty().size(200).capped()中,size指的是以字节为单位的大小,并且会向上取到256的整倍数,所以我们刚才定义的是256byte大小的collection,所以最多容纳两条记录。我们可以这样改一下:

    CollectionOptions.empty().maxDocuments(200).size(100000).capped()
    

    maxDocuments限制了记录条数,size限制容量且是必须定义的,因为MongoDB不像关系型数据库有严格的列和字段大小定义,鬼知道会存多大的数据进来,所以容量限制是必要的。

    好了,再次启动应用,先插入5条数据,然后请求/events,收到5条记录后请求仍然挂起,在插入5条数据,curl客户端又会陆续收到新的数据。

    我们用代码搭建了图中箭头所表示的“管道”,看效果还是很畅通的嘛。现在再回想我们最初的那个Excel的例子,是不是感觉这个demo很有响应式的“范儿”了呢?

    1.3.3.6 总结

    这一节,我们对WebFlux做了一个简单的基于实例的介绍,相信你对响应式编程及其在WEB应用中如何发挥作用有了更多的体会,本章的实战是比较基础的,初衷是希望能够通过上手编写代码体会响应式编程的感觉,因为切换到响应式思维方式并非易事。

    这一章的核心关键词其实翻来覆去就是:“异步非阻塞的响应式流”。我们了解了异步非阻塞的好处,也知道如何让数据流动起来,下面我们就通过对实例的性能测试,借助实实在在的数据,真切感受一下异步非阻塞的“丝滑”。

    展开全文
  • 产品经理的势道法术器.pdf
  • 今天我给大家分享的话题是DevOps的道法术器,这是由高效运维社区和DevOps时代社区联合发布的DevOps体系化实施框架,我们希望通过这个框架的发布,能够帮助业界朋友们更好实施和践行DevOps。近几年比较流行的一个概念...
  • (4)Reactor 3快速上手——响应式Spring的道法术器

    万次阅读 多人点赞 2018-03-08 09:54:18
    本系列其他文章见:《响应式Spring的道法术器》。 前情提要:响应式流 | lambda与函数式 1.3.2 Reactor Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个...

    本系列其他文章见:《响应式Spring的道法术器》
    前情提要:响应式流 | lambda与函数式

    1.3.2 Reactor

    Reactor与Spring是兄弟项目,侧重于Server端的响应式编程,主要 artifact 是 reactor-core,这是一个基于 Java 8 的实现了响应式流规范 (Reactive Streams specification)的响应式库。

    本文对Reactor的介绍以基本的概念和简单的使用为主,深度以能够满足基本的Spring WebFlux使用为准。在下一章,我会结合Reactor的设计模式、并发调度模型等原理层面的内容系统介绍Reactor的使用。

    本文源码

    光说不练假把式,我们先把练习用的项目搭起来。先创建一个maven项目,然后添加依赖:

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-core</artifactId>
            <version>3.1.4.RELEASE</version>
        </dependency>
    

    最新版本可到http://search.maven.org查询,复制过来即可。另外出于测试的需要,添加如下依赖:

        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <version>3.1.4.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    

    好了,我们开始Coding吧。

    1.3.2.1 Flux与Mono

    Reactor中的发布者(Publisher)由FluxMono两个类定义,它们都提供了丰富的操作符(operator)。一个Flux对象代表一个包含0…N个元素的响应式序列,而一个Mono对象代表一个包含零/一个(0…1)元素的结果。

    既然是“数据流”的发布者,Flux和Mono都可以发出三种“数据信号”:元素值、错误信号、完成信号,错误信号和完成信号都是终止信号,完成信号用于告知下游订阅者该数据流正常结束,错误信号终止数据流的同时将错误传递给下游订阅者。

    下图所示就是一个Flux类型的数据流,黑色箭头是时间轴。它连续发出“1” - “6”共6个元素值,以及一个完成信号(图中⑥后边的加粗竖线来表示),完成信号告知订阅者数据流已经结束。


    下图所示是一个Mono类型的数据流,它发出一个元素值后,又发出一个完成信号。

    既然Flux具有发布一个数据元素的能力,为什么还要专门定义一个Mono类呢?举个例子,一个HTTP请求产生一个响应,所以对其进行“count”操作是没有多大意义的。表示这样一个结果的话,应该用Mono<HttpResponse>而不是 Flux<HttpResponse>,对于的操作通常只用于处理 0/1 个元素。它们从语义上就原生包含着元素个数的信息,从而避免了对Mono对象进行多元素场景下的处理。

    有些操作可以改变基数,从而需要切换类型。比如,count操作用于Flux,但是操作返回的结果是Mono<Long>

    我们可以用如下代码声明上边两幅图所示的Flux和Mono:

    Flux.just(1, 2, 3, 4, 5, 6);
    Mono.just(1);
    

    Flux和Mono提供了多种创建数据流的方法,just就是一种比较直接的声明数据流的方式,其参数就是数据元素。

    对于图中的Flux,还可以通过如下方式声明(分别基于数组、集合和Stream生成):

    Integer[] array = new Integer[]{1,2,3,4,5,6};
    Flux.fromArray(array);
    List<Integer> list = Arrays.asList(array);
    Flux.fromIterable(list);
    Stream<Integer> stream = list.stream();
    Flux.fromStream(stream);
    

    不过,这三种信号都不是一定要具备的:

    • 首先,错误信号和完成信号都是终止信号,二者不可能同时共存;
    • 如果没有发出任何一个元素值,而是直接发出完成/错误信号,表示这是一个空数据流;
    • 如果没有错误信号和完成信号,那么就是一个无限数据流。

    比如,对于只有完成/错误信号的数据流:

    // 只有完成信号的空数据流
    Flux.just();
    Flux.empty();
    Mono.empty();
    Mono.justOrEmpty(Optional.empty());
    // 只有错误信号的数据流
    Flux.error(new Exception("some error"));
    Mono.error(new Exception("some error"));
    

    你可能会纳闷,空的数据流有什么用?举个例子,当我们从响应式的DB中获取结果的时候(假设DAO层是ReactiveRepository<User>),就有可能为空:

     Mono<User> findById(long id);
     Flux<User> findAll();
    

    无论是空还是发生异常,都需要通过完成/错误信号告知订阅者,已经查询完毕,但是抱歉没有得到值,礼貌问题嘛~

    1.3.2.2 订阅前什么都不会发生

    数据流有了,假设我们想把每个数据元素原封不动地打印出来:

    Flux.just(1, 2, 3, 4, 5, 6).subscribe(System.out::print);
    System.out.println();
    Mono.just(1).subscribe(System.out::println);
    

    输出如下:

    123456
    1
    

    可见,subscribe方法中的lambda表达式作用在了每一个数据元素上。此外,Flux和Mono还提供了多个subscribe方法的变体:

    // 订阅并触发数据流
    subscribe(); 
    // 订阅并指定对正常数据元素如何处理
    subscribe(Consumer<? super T> consumer); 
    // 订阅并定义对正常数据元素和错误信号的处理
    subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer); 
    // 订阅并定义对正常数据元素、错误信号和完成信号的处理
    subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer,
              Runnable completeConsumer); 
    // 订阅并定义对正常数据元素、错误信号和完成信号的处理,以及订阅发生时的处理逻辑
    subscribe(Consumer<? super T> consumer,
              Consumer<? super Throwable> errorConsumer,
              Runnable completeConsumer,
              Consumer<? super Subscription> subscriptionConsumer); 
    

    1)如果是订阅上边声明的Flux:

    Flux.just(1, 2, 3, 4, 5, 6).subscribe(
        System.out::println,
        System.err::println,
        () -> System.out.println("Completed!"));
    

    输出如下:

    1
    2
    3
    4
    5
    6
    Completed!
    

    2)再举一个有错误信号的例子:

    Mono.error(new Exception("some error")).subscribe(
            System.out::println,
            System.err::println,
            () -> System.out.println("Completed!")
    );
    

    输出如下:

    java.lang.Exception: some error
    

    打印出了错误信号,没有输出Completed!表明没有发出完成信号。

    这里需要注意的一点是,Flux.just(1, 2, 3, 4, 5, 6)仅仅声明了这个数据流,此时数据元素并未发出,只有subscribe()方法调用的时候才会触发数据流。所以,订阅前什么都不会发生

    1.3.2.3 测试与调试

    从命令式和同步式编程切换到响应式和异步式编程有时候是令人生畏的。学习曲线中最陡峭的地方就是出错时如何分析和调试。

    在命令式世界,调试通常都是非常直观的:直接看 stack trace 就可以找到问题出现的位置, 以及其他信息:是否问题责任全部出在你自己的代码?问题是不是发生在某些库代码?如果是, 那你的哪部分代码调用了库,是不是传参不合适导致的问题?等等。

    当你切换到响应式的异步代码,事情就变得复杂的多了。不过我们先不接触过于复杂的内容,先了解一个基本的单元测试工具——StepVerifier

    最常见的测试 Reactor 序列的场景就是定义一个 Flux 或 Mono,然后在订阅它的时候测试它的行为。

    当你的测试关注于每一个数据元素的时候,就非常贴近使用 StepVerifier 的测试场景: 下一个期望的数据或信号是什么?你是否期望使用 Flux 来发出某一个特别的值?或者是否接下来 300ms 什么都不做?——所有这些都可以使用 StepVerifier API 来表示。

    还是以那个1-6的Flux以及会发出错误信号的Mono为例:

    private Flux<Integer> generateFluxFrom1To6() {
        return Flux.just(1, 2, 3, 4, 5, 6);
    }
    private Mono<Integer> generateMonoWithError() {
        return Mono.error(new Exception("some error"));
    }
    @Test
    public void testViaStepVerifier() {
        StepVerifier.create(generateFluxFrom1To6())
                .expectNext(1, 2, 3, 4, 5, 6)
                .expectComplete()
                .verify();
        StepVerifier.create(generateMonoWithError())
                .expectErrorMessage("some error")
                .verify();
    }
    

    其中,expectNext用于测试下一个期望的数据元素,expectErrorMessage用于校验下一个元素是否为错误信号,expectComplete用于测试下一个元素是否为完成信号。

    StepVerifier还提供了其他丰富的测试方法,我们会在后续的介绍中陆续接触到。

    1.3.2.4 操作符(Operator)

    通常情况下,我们需要对源发布者发出的原始数据流进行多个阶段的处理,并最终得到我们需要的数据。这种感觉就像是一条流水线,从流水线的源头进入传送带的是原料,经过流水线上各个工位的处理,逐渐由原料变成半成品、零件、组件、成品,最终成为消费者需要的包装品。这其中,流水线源头的下料机就相当于源发布者,消费者就相当于订阅者,流水线上的一道道工序就相当于一个一个的操作符(Operator)。

    下面介绍一些我们常用的操作符。

    1)map - 元素映射为新元素

    map操作可以将数据元素进行转换/映射,得到一个新元素。

    public final <V> Flux<V> map(Function<? super T,? extends V> mapper)
    public final <R> Mono<R> map(Function<? super T, ? extends R> mapper) 
    

    上图是Flux的map操作示意图,上方的箭头是原始序列的时间轴,下方的箭头是经过map处理后的数据序列时间轴。

    map接受一个Function的函数式接口为参数,这个函数式的作用是定义转换操作的策略。举例说明:

    StepVerifier.create(Flux.range(1, 6)    // 1
                .map(i -> i * i))   // 2
                .expectNext(1, 4, 9, 16, 25, 36)    //3
                .expectComplete();  // 4
    
    1. Flux.range(1, 6)用于生成从“1”开始的,自增为1的“6”个整型数据;
    2. map接受lambdai -> i * i为参数,表示对每个数据进行平方;
    3. 验证新的序列的数据;
    4. verifyComplete()相当于expectComplete().verify()

    2)flatMap - 元素映射为流

    flatMap操作可以将每个数据元素转换/映射为一个流,然后将这些流合并为一个大的数据流。

    注意到,流的合并是异步的,先来先到,并非是严格按照原始序列的顺序(如图蓝色和红色方块是交叉的)。

    public final <R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
    public final <R> Mono<R> flatMap(Function<? super T, ? extends Mono<? extends R>> transformer) 
    

    flatMap也是接收一个Function的函数式接口为参数,这个函数式的输入为一个T类型数据值,对于Flux来说输出可以是Flux和Mono,对于Mono来说输出只能是Mono。举例说明:

        StepVerifier.create(
            Flux.just("flux", "mono")
                    .flatMap(s -> Flux.fromArray(s.split("\\s*"))   // 1
                            .delayElements(Duration.ofMillis(100))) // 2
                    .doOnNext(System.out::print)) // 3
            .expectNextCount(8) // 4
            .verifyComplete();
    
    1. 对于每一个字符串s,将其拆分为包含一个字符的字符串流;
    2. 对每个元素延迟100ms;
    3. 对每个元素进行打印(注doOnNext方法是“偷窥式”的方法,不会消费数据流);
    4. 验证是否发出了8个元素。

    打印结果为mfolnuox,原因在于各个拆分后的小字符串都是间隔100ms发出的,因此会交叉。

    flatMap通常用于每个元素又会引入数据流的情况,比如我们有一串url数据流,需要请求每个url并收集response数据。假设响应式的请求方法如下:

    Mono<HttpResponse> requestUrl(String url) {...}
    

    而url数据流为一个Flux<String> urlFlux,那么为了得到所有的HttpResponse,就需要用到flatMap:

    urlFlux.flatMap(url -> requestUrl(url));
    

    其返回内容为Flux<HttpResponse>类型的HttpResponse流。

    3)filter - 过滤

    filter操作可以对数据元素进行筛选。

    public final Flux<T> filter(Predicate<? super T> tester)
    public final Mono<T> filter(Predicate<? super T> tester) 
    

    filter接受一个Predicate的函数式接口为参数,这个函数式的作用是进行判断并返回boolean。举例说明:

    StepVerifier.create(Flux.range(1, 6)
                .filter(i -> i % 2 == 1)    // 1
                .map(i -> i * i))
                .expectNext(1, 9, 25)   // 2
                .verifyComplete();
    
    1. filter的lambda参数表示过滤操作将保留奇数;
    2. 验证仅得到奇数的平方。

    4)zip - 一对一合并

    看到zip这个词可能会联想到拉链,它能够将多个流一对一的合并起来。zip有多个方法变体,我们介绍一个最常见的二合一的。

    它对两个Flux/Mono流每次各取一个元素,合并为一个二元组(Tuple2):

    public static <T1,T2> Flux<Tuple2<T1,T2>> zip(Publisher<? extends T1> source1,
                                              Publisher<? extends T2> source2)
    public static <T1, T2> Mono<Tuple2<T1, T2>> zip(Mono<? extends T1> p1, Mono<? extends T2> p2) 
    

    Fluxzip方法接受Flux或Mono为参数,Monozip方法只能接受Mono类型的参数。

    举个例子,假设我们有一个关于zip方法的说明:“Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.”,我们希望将这句话拆分为一个一个的单词并以每200ms一个的速度发出,除了前面flatMap的例子中用到的delayElements,可以如下操作:

    private Flux<String> getZipDescFlux() {
        String desc = "Zip two sources together, that is to say wait for all the sources to emit one element and combine these elements once into a Tuple2.";
        return Flux.fromArray(desc.split("\\s+"));  // 1
    }
    
    @Test
    public void testSimpleOperators() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);  // 2
        Flux.zip(
                getZipDescFlux(),
                Flux.interval(Duration.ofMillis(200)))  // 3
                .subscribe(t -> System.out.println(t.getT1()), null, countDownLatch::countDown);    // 4
        countDownLatch.await(10, TimeUnit.SECONDS);     // 5
    }
    
    1. 将英文说明用空格拆分为字符串流;
    2. 定义一个CountDownLatch,初始为1,则会等待执行1次countDown方法后结束,不使用它的话,测试方法所在的线程会直接返回而不会等待数据流发出完毕;
    3. 使用Flux.interval声明一个每200ms发出一个元素的long数据流;因为zip操作是一对一的,故而将其与字符串流zip之后,字符串流也将具有同样的速度;
    4. zip之后的流中元素类型为Tuple2,使用getT1方法拿到字符串流的元素;定义完成信号的处理为countDown;
    5. countDownLatch.await(10, TimeUnit.SECONDS)会等待countDown倒数至0,最多等待10秒钟。

    除了zip静态方法之外,还有zipWith等非静态方法,效果与之类似:

    getZipDescFlux().zipWith(Flux.interval(Duration.ofMillis(200)))
    

    在异步条件下,数据流的流速不同,使用zip能够一对一地将两个或多个数据流的元素对齐发出。

    5)更多

    Reactor中提供了非常丰富的操作符,除了以上几个常见的,还有:

    • 用于编程方式自定义生成数据流的creategenerate等及其变体方法;
    • 用于“无副作用的peek”场景的doOnNextdoOnErrordoOncompletedoOnSubscribedoOnCancel等及其变体方法;
    • 用于数据流转换的whenand/ormergeconcatcollectcountrepeat等及其变体方法;
    • 用于过滤/拣选的takefirstlastsampleskiplimitRequest等及其变体方法;
    • 用于错误处理的timeoutonErrorReturnonErrorResumedoFinallyretryWhen等及其变体方法;
    • 用于分批的windowbuffergroup等及其变体方法;
    • 用于线程调度的publishOnsubscribeOn方法。

    使用这些操作符,你几乎可以搭建出能够进行任何业务需求的数据处理管道/流水线。

    抱歉以上这些暂时不能一一介绍,更多详情请参考JavaDoc,在下一章我们还会回头对Reactor从更深层次进行系统的分析。

    此外,也可阅读我翻译的Reactor参考文档,我会尽量及时更新翻译的内容。文档源码位于github,如有翻译不当,欢迎提交Pull-Request。

    1.3.2.5 调度器与线程模型

    在Reactor中,对于多线程并发调度的处理变得异常简单。

    在以往的多线程开发场景中,我们通常使用Executors工具类来创建线程池,通常有如下四种类型:

    • newCachedThreadPool创建一个弹性大小缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程;
    • newFixedThreadPool创建一个大小固定的线程池,可控制线程最大并发数,超出的线程会在队列中等待;
    • newScheduledThreadPool创建一个大小固定的线程池,支持定时及周期性的任务执行;
    • newSingleThreadExecutor创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。

    此外,newWorkStealingPool还可以创建支持work-stealing的线程池。

    说良心话,Java提供的Executors工具类使得我们对ExecutorService使用已经非常得心应手了。BUT~ Reactor让线程管理和任务调度更加“傻瓜”——调度器(Scheduler)帮助我们搞定这件事。Scheduler是一个拥有多个实现类的抽象接口。Schedulers类(按照通常的套路,最后为s的就是工具类咯)提供的静态方法可搭建以下几种线程执行环境:

    • 当前线程(Schedulers.immediate());
    • 可重用的单线程(Schedulers.single())。注意,这个方法对所有调用者都提供同一个线程来使用, 直到该调度器被废弃。如果你想使用独占的线程,请使用Schedulers.newSingle()
    • 弹性线程池(Schedulers.elastic())。它根据需要创建一个线程池,重用空闲线程。线程池如果空闲时间过长 (默认为 60s)就会被废弃。对于 I/O 阻塞的场景比较适用。Schedulers.elastic()能够方便地给一个阻塞 的任务分配它自己的线程,从而不会妨碍其他任务和资源;
    • 固定大小线程池(Schedulers.parallel()),所创建线程池的大小与CPU个数等同;
    • 自定义线程池(Schedulers.fromExecutorService(ExecutorService))基于自定义的ExecutorService创建 Scheduler(虽然不太建议,不过你也可以使用Executor来创建)。

    Schedulers类已经预先创建了几种常用的线程池:使用single()elastic()parallel()方法可以分别使用内置的单线程、弹性线程池和固定大小线程池。如果想创建新的线程池,可以使用newSingle()newElastic()newParallel()方法。

    Executors提供的几种线程池在Reactor中都支持:

    • Schedulers.single()Schedulers.newSingle()对应Executors.newSingleThreadExecutor()
    • Schedulers.elastic()Schedulers.newElastic()对应Executors.newCachedThreadPool()
    • Schedulers.parallel()Schedulers.newParallel()对应Executors.newFixedThreadPool()
    • 下一章会介绍到,Schedulers提供的以上三种调度器底层都是基于ScheduledExecutorService的,因此都是支持任务定时和周期性执行的;
    • FluxMono的调度操作符subscribeOnpublishOn支持work-stealing。

    举例:将同步的阻塞调用变为异步的

    前面介绍到Schedulers.elastic()能够方便地给一个阻塞的任务分配专门的线程,从而不会妨碍其他任务和资源。我们就可以利用这一点将一个同步阻塞的调用调度到一个自己的线程中,并利用订阅机制,待调用结束后异步返回。

    假设我们有一个同步阻塞的调用方法:

    private String getStringSync() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "Hello, Reactor!";
    }
    

    正常情况下,调用这个方法会被阻塞2秒钟,然后同步地返回结果。我们借助elastic调度器将其变为异步,由于是异步的,为了保证测试方法所在的线程能够等待结果的返回,我们使用CountDownLatch

    @Test
    public void testSyncToAsync() throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        Mono.fromCallable(() -> getStringSync())    // 1
                .subscribeOn(Schedulers.elastic())  // 2
                .subscribe(System.out::println, null, countDownLatch::countDown);
        countDownLatch.await(10, TimeUnit.SECONDS);
    }
    
    1. 使用fromCallable声明一个基于Callable的Mono;
    2. 使用subscribeOn将任务调度到Schedulers内置的弹性线程池执行,弹性线程池会为Callable的执行任务分配一个单独的线程。

    切换调度器的操作符

    Reactor 提供了两种在响应式链中调整调度器 Scheduler的方法:publishOnsubscribeOn。它们都接受一个 Scheduler作为参数,从而可以改变调度器。但是publishOn在链中出现的位置是有讲究的,而subscribeOn 则无所谓。

    假设与上图对应的代码是:
    Flux.range(1, 1000)
    .map(…)
    .publishOn(Schedulers.elastic()).filter(…)
    .publishOn(Schedulers.parallel()).flatMap(…)
    .subscribeOn(Schedulers.single())

    • 如图所示,publishOn会影响链中其后的操作符,比如第一个publishOn调整调度器为elastic,则filter的处理操作是在弹性线程池中执行的;同理,flatMap是执行在固定大小的parallel线程池中的;
    • subscribeOn无论出现在什么位置,都只影响源头的执行环境,也就是range方法是执行在单线程中的,直至被第一个publishOn切换调度器之前,所以range后的map也在单线程中执行。

    关于publishOnsubscribeOn为什么会出现如此的调度策略,需要深入讨论Reactor的实现原理,我们将在下一章展开。

    1.3.2.6 错误处理

    在响应式流中,错误(error)是终止信号。当有错误发生时,它会导致流序列停止,并且错误信号会沿着操作链条向下传递,直至遇到subscribe中的错误处理方法。这样的错误还是应该在应用层面解决的。否则,你可能会将错误信息显示在用户界面,或者通过某个REST endpoint发出。所以还是建议在subscribe时通过错误处理方法妥善解决错误。

    @Test
    public void testErrorHandling() {
        Flux.range(1, 6)
                .map(i -> 10/(i-3)) // 1
                .map(i -> i*i)
                .subscribe(System.out::println, System.err::println);
    }
    
    1. 当i为3时会导致异常。

    输出为:

    25
    100
    java.lang.ArithmeticException: / by zero    //注:这一行是红色,表示标准错误输出
    

    subscribe方法的第二个参数定义了对错误信号的处理,从而测试方法exit为0(即正常退出),可见错误没有蔓延出去。不过这还不够~

    此外,Reactor还提供了其他的用于在链中处理错误的操作符(error-handling operators),使得对于错误信号的处理更加及时,处理方式更加多样化。

    在讨论错误处理操作符的时候,我们借助命令式编程风格的 try 代码块来作比较。我们都很熟悉在 try-catch 代码块中处理异常的几种方法。常见的包括如下几种:

    1. 捕获并返回一个静态的缺省值。
    2. 捕获并执行一个异常处理方法或动态计算一个候补值来顶替。
    3. 捕获,并再包装为某一个 业务相关的异常,然后再抛出业务异常。
    4. 捕获,记录错误日志,然后继续抛出。
    5. 使用 finally 来清理资源,或使用 Java 7 引入的 “try-with-resource”。

    以上所有这些在 Reactor 都有相应的基于 error-handling 操作符处理方式。

    1. 捕获并返回一个静态的缺省值

    onErrorReturn方法能够在收到错误信号的时候提供一个缺省值:

    Flux.range(1, 6)
        .map(i -> 10/(i-3))
        .onErrorReturn(0)   // 1
        .map(i -> i*i)
        .subscribe(System.out::println, System.err::println);
    
    1. 当发生异常时提供一个缺省值0

    输出如下:

    25
    100
    0
    

    2. 捕获并执行一个异常处理方法或计算一个候补值来顶替

    onErrorResume方法能够在收到错误信号的时候提供一个新的数据流:

    Flux.range(1, 6)
        .map(i -> 10/(i-3))
        .onErrorResume(e -> Mono.just(new Random().nextInt(6))) // 提供新的数据流
        .map(i -> i*i)
        .subscribe(System.out::println, System.err::println);
    

    输出如下:

    25
    100
    16
    

    举一个更有业务含义的例子:

    Flux.just(endpoint1, endpoint2)
        .flatMap(k -> callExternalService(k))   // 1
        .onErrorResume(e -> getFromCache(k));   // 2
    
    1. 调用外部服务;
    2. 如果外部服务异常,则从缓存中取值代替。

    3. 捕获,并再包装为某一个业务相关的异常,然后再抛出业务异常

    有时候,我们收到异常后并不想立即处理,而是会包装成一个业务相关的异常交给后续的逻辑处理,可以使用onErrorMap方法:

    Flux.just("timeout1")
        .flatMap(k -> callExternalService(k))   // 1
        .onErrorMap(original -> new BusinessException("SLA exceeded", original)); // 2
    
    1. 调用外部服务;
    2. 如果外部服务异常,将其包装为业务相关的异常后再次抛出。

    这一功能其实也可以用onErrorResume实现,略麻烦一点:

    Flux.just("timeout1")
        .flatMap(k -> callExternalService(k))
        .onErrorResume(original -> Flux.error(
            new BusinessException("SLA exceeded", original)
        );
    

    4. 捕获,记录错误日志,然后继续抛出

    如果对于错误你只是想在不改变它的情况下做出响应(如记录日志),并让错误继续传递下去, 那么可以用doOnError 方法。前面提到,形如doOnXxx是只读的,对数据流不会造成影响:

    Flux.just(endpoint1, endpoint2)
        .flatMap(k -> callExternalService(k)) 
        .doOnError(e -> {   // 1
            log("uh oh, falling back, service failed for key " + k);    // 2
        })
        .onErrorResume(e -> getFromCache(k)); 
    
    1. 只读地拿到错误信息,错误信号会继续向下游传递;
    2. 记录日志。

    5. 使用 finally 来清理资源,或使用 Java 7 引入的 "try-with-resource"

    Flux.using(
            () -> getResource(),    // 1
            resource -> Flux.just(resource.getAll()),   // 2
            MyResource::clean   // 3
    );
    
    1. 第一个参数获取资源;
    2. 第二个参数利用资源生成数据流;
    3. 第三个参数最终清理资源。

    另一方面, doFinally在序列终止(无论是 onComplete、onError还是取消)的时候被执行, 并且能够判断是什么类型的终止事件(完成、错误还是取消),以便进行针对性的清理。如:

    LongAdder statsCancel = new LongAdder();    // 1
    
    Flux<String> flux =
    Flux.just("foo", "bar")
        .doFinally(type -> {
            if (type == SignalType.CANCEL)  // 2
              statsCancel.increment();  // 3
        })
        .take(1);   // 4
    
    1. LongAdder进行统计;
    2. doFinallySignalType检查了终止信号的类型;
    3. 如果是取消,那么统计数据自增;
    4. take(1)能够在发出1个元素后取消流。

    重试

    还有一个用于错误处理的操作符你可能会用到,就是retry,见文知意,用它可以对出现错误的序列进行重试。

    请注意:**retry对于上游Flux是采取的重订阅(re-subscribing)的方式,因此重试之后实际上已经一个不同的序列了, 发出错误信号的序列仍然是终止了的。举例如下:

    Flux.range(1, 6)
        .map(i -> 10 / (3 - i))
        .retry(1)
        .subscribe(System.out::println, System.err::println);
    Thread.sleep(100);  // 确保序列执行完
    

    输出如下:

    5
    10
    5
    10
    java.lang.ArithmeticException: / by zero
    

    可见,retry不过是再一次从新订阅了原始的数据流,从1开始。第二次,由于异常再次出现,便将异常传递到下游了。

    1.3.2.7 回压

    前边的例子并没有进行流量控制,也就是,当执行.subscribe(System.out::println)这样的订阅的时候,直接发起了一个无限的请求(unbounded request),就是对于数据流中的元素无论快慢都“照单全收”。

    subscribe方法还有一个变体:

    // 接收一个Subscriber为参数,该Subscriber可以进行更加灵活的定义
    subscribe(Subscriber subscriber)
    

    注:其实这才是subscribe方法本尊,前边介绍到的可以接收0~4个函数式接口为参数的subscribe最终都是拼装为这个方法,所以按理说前边的subscribe方法才是“变体”。

    我们可以通过自定义具有流量控制能力的Subscriber进行订阅。Reactor提供了一个BaseSubscriber,我们可以通过扩展它来定义自己的Subscriber。

    假设,我们现在有一个非常快的Publisher——Flux.range(1, 6),然后自定义一个每秒处理一个数据元素的慢的Subscriber,Subscriber就需要通过request(n)的方法来告知上游它的需求速度。代码如下:

    @Test
    public void testBackpressure() {
        Flux.range(1, 6)    // 1
                .doOnRequest(n -> System.out.println("Request " + n + " values..."))    // 2
                .subscribe(new BaseSubscriber<Integer>() {  // 3
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) { // 4
                        System.out.println("Subscribed and make a request...");
                        request(1); // 5
                    }
    
                    @Override
                    protected void hookOnNext(Integer value) {  // 6
                        try {
                            TimeUnit.SECONDS.sleep(1);  // 7
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("Get value [" + value + "]");    // 8
                        request(1); // 9
                    }
                });
    }
    
    1. Flux.range是一个快的Publisher;
    2. 在每次request的时候打印request个数;
    3. 通过重写BaseSubscriber的方法来自定义Subscriber;
    4. hookOnSubscribe定义在订阅的时候执行的操作;
    5. 订阅时首先向上游请求1个元素;
    6. hookOnNext定义每次在收到一个元素的时候的操作;
    7. sleep 1秒钟来模拟慢的Subscriber;
    8. 打印收到的元素;
    9. 每次处理完1个元素后再请求1个。

    输出如下(我们也可以使用log()来打印类似下边的输出,以代替上边代码中的System.out.println):

    Subscribed and make a request...
    Request 1 values...
    Get value [1]
    Request 1 values...
    Get value [2]
    Request 1 values...
    Get value [3]
    Request 1 values...
    Get value [4]
    Request 1 values...
    Get value [5]
    Request 1 values...
    Get value [6]
    Request 1 values...
    

    这6个元素是以每秒1个的速度被处理的。由此可见range方法生成的Flux采用的是缓存的回压策略,能够缓存下游暂时来不及处理的元素。

    1.3.2.8 总结

    以上关于Reactor的介绍主要是概念层面和使用层面的介绍,不过应该也足以应对常见的业务环境了。

    从命令式编程到响应式编程的切换并不是一件容易的事,需要一个适应的过程。不过相信你通过本节的了解和实操,已经可以体会到使用Reactor编程的一些特点:

    • 相对于传统的基于回调和Future的异步开发方式,响应式编程更加具有可编排性和可读性,配合lambda表达式,代码更加简洁,处理逻辑的表达就像装配“流水线”,适用于对数据流的处理;
    • 订阅(subscribe)时才触发数据流,这种数据流叫做“冷”数据流,就像插座插上电器才会有电流一样,还有一种数据流不管是否有订阅者订阅它都会一直发出数据,称之为“热”数据流,Reactor中几乎都是“冷”数据流;
    • 调度器对线程管理进行更高层次的抽象,使得我们可以非常容易地切换线程执行环境;
    • 灵活的错误处理机制有利于编写健壮的程序;
    • “回压”机制使得订阅者可以无限接受数据并让它的源头“满负荷”推送所有的数据,也可以通过使用request方法来告知源头它一次最多能够处理 n 个元素,从而将“推送”模式转换为“推送+拉取”混合的模式。

    后续随着对Reactor的了解我们还会逐渐了解它更多的好玩又好用的特性。

    Reactor的开发者中也有来自RxJava的大牛,因此Reactor中甚至许多方法名都是来自RxJava的API的,学习了Reactor之后,很轻松就可以上手Rx家族的库了。

    展开全文
  • 道法术器

    千次阅读 2017-11-23 14:34:11
    道法术器势道:目标法:系统规划术:学习手段器:学习工具和手段势:学习心态和状态

    道法术器势

    道:目标

    法:系统规划

    术:学习手段

    器:学习工具和手段

    势:学习心态和状态

    展开全文
  • 2.敏捷全景图之道法术器.pdf
  • (1)什么是响应式编程——响应式Spring的道法术器

    万次阅读 多人点赞 2018-03-06 10:54:22
    本系列其他文章见:《响应式Spring的道法术器》。 响应式编程之道 1.1 什么是响应式编程? 在开始讨论响应式编程(Reactive Programming)之前,先来看一个我们经常使用的一款堪称“响应式典范”的强大的生产...

    本系列其他文章见:《响应式Spring的道法术器》

    响应式编程之道

    1.1 什么是响应式编程?

    在开始讨论响应式编程(Reactive Programming)之前,先来看一个我们经常使用的一款堪称“响应式典范”的强大的生产力工具——电子表格。

    举个简单的例子,某电商网站正在搞促销活动,任何单品都可以参加“满199减40”的活动,而且“满500包邮”。吃货小明有选择障碍(当然主要原因还是一个字:穷),他有个习惯,就是先在Excel上根据预算算好自己要买的东西:

    相信大家都用过Excel中的公式,这是一个统计购物车商品和订单应付金额的表格,其中涉及到一些公式:

    上图中蓝色的线是公式的引用关系,从中可以看出,“商品金额”是通过“单价x数量”得到的,“满199减40”会判断该商品金额是否满199并根据情况减掉40,右侧“订单总金额”是“满199减40”这一列的和,“邮费”会根据订单总金额计算,“最终应付款”就是订单总金额加上邮费。

    1.1.1 变化传递(propagation of change)

    为什么说电子表格软件是“响应式典范”呢,因为“单价”和“数量”的任何变动,都会被引用(“监听”)它的单元格实时更新计算结果,如果还有图表或数据透视图引用了这块数据,那么也会相应变化,做到了实时响应。变化的时候甚至还有动画效果,用户体验一级棒!

    这是响应式的核心特点之一:变化传递(propagation of change)。一个单元格变化之后,会像多米诺骨牌一样,导致直接和间接引用它的其他单元格均发生相应变化。

    看到这里,你可能会说,“切~ 不就是算付款金额吗,购物网站上都有这个最基础不过的功能啊”,这就“响应式”啦?但凡一个与用户交互的系统都得“响应”用户交互啊

    但是在响应式编程中,基于“变化传递”的特点,触发响应的主体发生了变化。假设购物车管理和订单付款是两个不同的模块,或者至少是两个不同的类——CartInvoice。也许我们的代码是这样的:

    Product.java(假设商品有两个属性nameprice,简单起见,price就不用BigDecimal类型了)

    public class Product {
        private String name;
        private double price;
        // 构造方法、getters、setters
    }
    

    Cart模块中:

    import com.example.Invoice; // 2
    
    public class Cart {
        ...
        public boolean addProduct(Product product, int quantity) {
            ...
            double figure = product.getPrice() * quantity;
            invoice.update(figure); // 1
            ...
        }
        ...
    }
    
    1. 是由Cart的对象去调用Invoice对象的更新订单金额的方法;
    2. Cart的代码中需要import Invoice

    而我们再观察这个Excel,发现“订单总金额”的计算公式不仅位于自己的单元格中,而且这个公式能主动监听和响应购物车数据的变化事件。对于购物车来说,它没有对订单付款方面的任何公式引用。感觉就像这样:

    假设数据流有操作的商品product和变化个数quantity两个属性:

    public class CartEvent {
        private Product product;
        private int quantity;
        // 构造方法、getters、setters
    }
    

    Invoice模块中:

    import com.example.Cart // 2
    
    public class Invoice {
        ...
        public Invoice(Cart cart) {
            ...
            this.listenOn(cart);    // 1
            ...
        }
        // 回调方法
        public void onCartChange(CartEvent event) {
            ...
        }
        ...
    }
    
    1. 是由Invoice的对象在初始化的时候就声明了对Cart对象的监听,从而一旦Cart对象有响应的事件(比如添加商品)发生的时候,Invoice就会响应;
    2. Invoice的代码中import Cart

    做过Java桌面开发的朋友可能会想到Java swing中的各种监听器,比如MouseListener能够监听鼠标的操作,并实时做出响应。所以C/S的客户端总是比B/S的Web界面更具有响应性嘛。

    所以,这里我们说的是一种生产者只负责生成并发出数据/事件,消费者来监听并负责定义如何处理数据/事件的变化传递方式

    那么,Cart对象如何在发生变化的时候“发出”数据或事件呢?

    1.1.2 数据流(data stream)

    这些数据/事件在响应式编程里会以数据流的形式发出。

    我们再观察一下购物车,这里有若干商品,小明每次往购物车里添加或移除一种商品,或调整商品的购买数量,这种事件都会像过电一样流过这由公式串起来的多米诺骨牌一次。这一次一次的操作事件连起来就是一串数据流(data stream),如果我们能够及时对数据流的每一个事件做出响应,会有效提高系统的响应水平。这是响应式的另一个核心特点:基于数据流(data stream)

    如下图是小明选购商品的过程,为了既不超预算,又能省邮费,有时加有时减:

    这一次一次的操作就构成了一串数据流。Invoice模块中的代码可能是这样:

        public Invoice(Cart cart) {
            ...
            this.listenOn(cart.eventStream());  // 1
            ...
        }
    
    1. 其中,cart.eventStream()是要监听的购物车的操作事件数据流,listenOn方法能够对数据流中到来的元素依次进行处理。

    1.1.3 声明式(declarative)

    我们再到listenOn方法去看一下:

    Invoice模块中,上边的一串公式被组装成如下的伪代码:

        public void listenOn(DataStream<CartEvent> cartEventStream) {   // 1
            double sum = 0;
            double total = cartEventStream
                // 分别计算商品金额
                .map(cartEvent -> cartEvent.getProduct().getPrice() * cartEvent.getQuantity())  // 2
                // 计算满减后的商品金额
                .map(v -> (v > 199) ? (v - 40) : v)
                // 将金额的变化累加到sum
                .map(v -> {sum += v; return sum;})
                // 根据sum判断是否免邮,得到最终总付款金额
                .map(sum -> (sum > 500) ? sum : (sum + 50));
            ...
    
    1. cartEventStream是数据流,DataStream是某种数据流类型,可以暂时想象成类似在Java 8版本增加的对数据流进行处理的Stream API(下节会说到为啥不用Java Stream)。

    2. map方法用于对数据流中的元素进行映射,比如第一个将cartEvent中的商品价格和数量拿到,然后算出本次操作的金额;第二个判断是否能享受“满199减40”的活动。

    这里的伪代码用到了lambda,它非常适用于数据流的处理。没有接触过lambda的话没有关系,我们后续会再聊到它。

    这是一种**“声明式(declarative)”**的编程范式。通过四个串起来的map调用,我们先声明好了对于数据流“将会”进行什么样的处理,当有数据流过来时,就会按照声明好的处理流程逐个进行处理。

    比如对于第一个map操作:

    **声明式编程范式的威力在于以不变应万变。**无论到来的元素是什么,计算逻辑是不变的,从而形成了一种对计算逻辑的“绑定”。

    再举个简单的例子方便理解:

    a = 1;
    b = a + 1;
    a = 2;
    

    这个时候,b是多少呢?在Java以及多数语言中,b的结果是2,第二次对a的赋值并不会影响b的值。

    假设Java引入了一种新的赋值方式:=,表示一种对a的绑定关系,如

    a = 1;
    b := a + 1;
    a = 2;
    

    由于b保存的不是某次计算的值,而是针对a的一种绑定关系,所以b能够随时根据a的值的变化而变化,这时候b==3,我们就可以说:=是一种声明式赋值方式。而普通的=是一种命令式赋值方式。事实上,我们绝大多数的开发都是命令式的,如果需要用命令式编程表达类似上边的这种绑定关系,在每次a发生变化并需要拿到b的时候都得执行b = a + 1来更新b的值。

    如此想来,“绑定美元政策”不也是一种声明式的范式吗~

    总结来说,命令式是面向过程的,声明式是面向结构的

    不过命令式和声明式本身并无高低之分,只是声明式比较适合基于流的处理方式。这是响应式的第三个核心特点:声明式(declarative)。结合“变化传递”的特点,声明式能够让基于数据流的开发更加友好。

    1.1.4 总结

    总结起来,响应式编程(reactive programming)是一种基于数据流(data stream)和变化传递(propagation of change)的声明式(declarative)的编程范式。

    响应式编程的“变化传递”就相当于果汁流水线的管道;在入口放进橙子,出来的就是橙汁;放西瓜,出来的就是西瓜汁,橙子和西瓜、以及机器中的果肉果汁以及残渣等,都是流动的“数据流”;管道的图纸是用“声明式”的语言表示的。

    这种编程范式如何让Web应用更加“reactive”呢?

    我们设想这样一种场景,我们从底层数据库驱动,经过持久层、服务层、MVC层中的model,到用户的前端界面的元素,全部都采用声明式的编程范式,从而搭建一条能够传递变化的管道,这样我们只要更新一下数据库中的数据,用户的界面上就相应的发生变化,岂不美哉?尤其重要的是,一处发生变化,我们不需要各种命令式的调用来传递这种变化,而是由搭建好的“流水线”自动传递。

    这种场景用在哪呢?比如一个日志监控系统,我们的前端页面将不再需要通过“命令式”的轮询的方式不断向服务器请求数据然后进行更新,而是在建立好通道之后,数据流从系统源源不断流向页面,从而展现实时的指标变化曲线;再比如一个社交平台,朋友的动态、点赞和留言不是手动刷出来的,而是当后台数据变化的时候自动体现到界面上的。

    具体如何来实现呢,请看下一节关于响应式流的介绍。

    展开全文
  • 《响应式Spring的道法术器》专栏相关代码
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要:Reactor快速上手 | Spring WebFlux快速上手 本文源码 1.4 从负载测试看异步非阻塞的优势 前面总是“安利”异步非阻塞的好处,下面我们就实实在在...
  • (3)lambda与函数式——响应式Spring的道法术器

    万次阅读 多人点赞 2018-03-08 09:51:37
    本系列其他文章见:《响应式Spring的道法术器》。 前情提要:响应式编程 | 响应式流 1.3 Hello,reactive world 前面两篇文章介绍了响应式编程和响应式流的特性,一味讲概念终是枯燥,还是上手敲一敲代码实在...
  • SpringBoot2.1.15(26) WebFlux快速上手——响应式Spring的道法术器.pdf
  • (2)响应式流——响应式Spring的道法术器

    万次阅读 多人点赞 2018-03-07 09:21:32
    本系列其他文章见:《响应式Spring的道法术器》。 前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑——为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性...
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要:Spring WebFlux快速上手 | Spring WebFlux性能测试 本文源码 1.4.2 调用带有延迟的服务负载分析 由于微服务架构的盛行,大型系统内服务间基于...
  • 响应式Spring的道法术器(Spring WebFlux 教程)

    万次阅读 多人点赞 2018-03-07 09:38:37
    如果希望有更加深入的了解,欢迎阅读下边的系列文章—— 响应式Spring的道法术器 这个系列的文章是为了记录下自己学习Spring响应式编程的所得,也希望能够帮助到更多的朋友。 原谅我标题党了,希望能从道、法、术、...
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要:Reactor3快速上手 | 深入理解响应式流规范 | Reactor3自定义数据流 本文测试源码 2.4 调度器与线程模型 在1.3.2节简单介绍了不同类型的调度器...
  • 公有链和联盟链的道法术器

    千次阅读 2017-02-15 13:57:47
    目前已经有不少文章针对区块链中公有链和联盟链两种形态的讨论,里面跟我的了解有些差异,Elwin这里尝试从道法术器的角度对此进行分析,涵盖信念价值、产生背景、技术特点、维护治理、发展趋势等方面进行讨论,希望...
  • 响应式Spring的道法术器(Spring WebFlux 教程)
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要: 什么是响应式编程 1.2 响应式流 上一节留了一个坑——为啥不用Java Stream来进行数据流的操作? 原因在于,若将其用于响应式编程中,是有局限性的。...
  • [url=https://blog.csdn.net/get_set/article/details/79480233]Spring WebFlux快速上手——响应式Spring的道法术器[/url]
  • 内容来源:2017年8月18日,DevOps时代联合发起人张乐在“DevOpsDays 【主会场】”进行《DevOps 道法术器及全开源端到端部署流水线 2.0发布》演讲分享。IT 大咖说(ID:itdakashuo)作为独家视频合作方,经主办方和讲...
  • docker的道法术器

    2021-04-11 21:48:49
    1.2 什么是docker【时势造英雄——云原生,云部署】 docker的发展历史: 啥是docker: 开源的应用软件的【容器】,可以让开发者打包他们的应用以及依赖包到一个轻量级、可移植的容器中,然后发布到任何流行的机器上...
  • 本系列其他文章见:《响应式Spring的道法术器》。 前情提要:Reactor3快速上手 | 深入理解响应式流规范 本文测试源码 2.5 Reactor 3 Operators 虽然响应式流规范中对Operator(以下均称作”操作符“)并未...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,984
精华内容 793
关键字:

道法术器