2018-12-13 12:24:01 ShrCheng 阅读数 291

案例概述

调试Reactive Streams可能是我们开始使用这些数据结构后必须面对的主要挑战之一。

考虑到Reactive Streams在过去几年中越来越受欢迎,了解我们如何有效地执行此任务是个好主意。

让我们首先使用Reactive Streams设置项目,看看为什么这通常很麻烦。

带有错误的场景

我们想要模拟一个实际情况,其中运行了几个异步进程,并且我们在代码中引入了一些最终会触发异常的缺陷。

为了理解全局,我们将提到我们的应用程序将使用和处理简单Foo对象流,这些对象只包含id、formattedName和quantity字段。

分析日志输出

现在,让我们检查一个片段以及当出现未处理的错误时它生成的输出:

public void processFoo(Flux<Foo> flux) {
    flux = FooNameHelper.concatFooName(flux);
    flux = FooNameHelper.substringFooName(flux);
    flux = FooReporter.reportResult(flux);
    flux.subscribe();
}
 
public void processFooInAnotherScenario(Flux<Foo> flux) {
    flux = FooNameHelper.substringFooName(flux);
    flux = FooQuantityHelper.divideFooQuantity(flux);
    flux.subscribe();
}

运行我们的应用程序几秒钟后,我们会看到它会不时记录异常。

仔细查看其中一个错误,我们会发现类似于此的内容:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at com.baeldung.debugging.consumer.service.FooNameHelper
      .lambda$1(FooNameHelper.java:38)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:100)
    at r.c.p.FluxMap$MapSubscriber.onNext(FluxMap.java:114)
    at r.c.p.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:275)
    at r.c.p.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:849)
    at r.c.p.Operators$MonoSubscriber.complete(Operators.java:1476)
    at r.c.p.MonoDelayUntil$DelayUntilCoordinator.signal(MonoDelayUntil.java:211)
    at r.c.p.MonoDelayUntil$DelayUntilTrigger.onComplete(MonoDelayUntil.java:290)
    at r.c.p.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:118)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:50)
    at r.c.s.SchedulerTask.call(SchedulerTask.java:27)
    at j.u.c.FutureTask.run(FutureTask.java:266)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .access$201(ScheduledThreadPoolExecutor.java:180)
    at j.u.c.ScheduledThreadPoolExecutor$ScheduledFutureTask
      .run(ScheduledThreadPoolExecutor.java:293)
    at j.u.c.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at j.u.c.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at j.l.Thread.run(Thread.java:748)

基于根本原因,并注意到堆栈跟踪中提到的FooNameHelper类,我们可以想象在某些情况下,我们的Foo对象正在使用比预期更短的formattedName 值进行处理。

当然,这只是一个简化的案例,解决方案似乎相当明显。

但是让我们假设这是一个真实案例场景,如果没有一些上下文信息,异常本身并不能帮助我们解决问题。

异常是作为processFoo或processFooInAnotherScenario方法的一部分触发的吗?

在到达此阶段之前,其他前面的步骤是否影响了formattedName字段?

日志条目无法帮助我们找出这些问题。

更糟糕的是,有时甚至不会从我们的功能中抛出异常。

例如,假设我们依赖反应式存储库来保存我们的Foo对象。如果此时错误上升,我们甚至可能不知道从哪里开始调试代码。

我们需要工具来有效地调试反应流。

使用调试会话

确定我们的应用程序正在发生什么的一个选项是使用我们喜欢的IDE启动调试会话。

我们必须设置几个条件断点,并在流中的每个步骤执行时分析数据流。

实际上,当我们有大量的被动进程在运行和共享资源时,这可能是一项繁琐的任务。

此外,在许多情况下,出于安全原因,我们无法启动调试会话。

使用doOnError方法或使用订阅参数记录信息

有时,我们可以通过提供Consumer作为subscribe方法的第二个参数来添加有用的上下文信息

public void processFoo(Flux<Foo> flux) {
 
    // ...
 
    flux.subscribe(foo -> {
        logger.debug("Finished processing Foo with Id {}", foo.getId());
    }, error -> {
        logger.error(
          "The following error happened on processFoo method!",
           error);
    });
}

注意:值得一提的是,如果我们不需要对subscribe方法进行进一步处理,我们可以在发布者上链接doOnError函数

flux.doOnError(error -> {
    logger.error("The following error happened on processFoo method!", error);
}).subscribe();

现在我们将对错误的来源提供一些指导,即使我们仍然没有太多关于生成异常的实际元素的信息。

激活Reactor的全局调试配置

Reactor库提供了一个hook类,它允许我们配置Flux和Mono操作符的行为。

通过添加以下语句,我们的应用程序将检测对发布者方法的调用,包装运算符的构造,并捕获堆栈跟踪:

Hooks.onOperatorDebug();

这样就可以默认启用Thymeleaf - 无需额外配置。
调试模式激活后,我们的异常日志将包含一些有用的信息:

16:06:35.334 [parallel-1] ERROR c.b.d.consumer.service.FooService
  - The following error happened on processFoo method!
java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    at j.l.String.substring(String.java:1963)
    at c.d.b.c.s.FooNameHelper.lambda$1(FooNameHelper.java:38)
    ...
    at j.l.Thread.run(Thread.java:748)
    Suppressed: r.c.p.FluxOnAssembly$OnAssemblyException: 
Assembly trace from producer [reactor.core.publisher.FluxMapFuseable] :
    reactor.core.publisher.Flux.map(Flux.java:5653)
    c.d.b.c.s.FooNameHelper.substringFooName(FooNameHelper.java:32)
    c.d.b.c.s.FooService.processFoo(FooService.java:24)
    c.d.b.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable
      .run(DelegatingErrorHandlingRunnable.java:54)
    o.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    o.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.map ⇢ c.d.b.c.s.FooNameHelper
            .substringFooName(FooNameHelper.java:32)
    |_    Flux.map ⇢ c.d.b.c.s.FooReporter.reportResult(FooReporter.java:15)

我们可以看到,第一部分保持相对相同,但以下部分提供了有关以下内容的信息:

  • 发布者的程序集跟踪 - 这里我们可以确认错误最初是在processFoo方法中生成的。
  • 在第一次触发错误之后观察到错误的运算符,以及链接它们的用户类。

注意:在这个例子中,主要是为了清楚地看到这一点,我们在不同的类上添加操作。

我们可以随时打开或关闭调试模式,但它不会影响已经实例化的Flux和Mono对象。

在不同的线程上执行运算符

要记住的另一个方面是即使在流上运行不同的线程,也会正确生成程序集跟踪。

我们来看看下面的例子:

public void processFoo(Flux<Foo> flux) {
    flux = flux.publishOn(Schedulers.newSingle("foo-thread"));
    // ...
 
    flux = flux.publishOn(Schedulers.newSingle("bar-thread"));
    flux = FooReporter.reportResult(flux);
    flux.subscribeOn(Schedulers.newSingle("starter-thread"))
      .subscribe();
}

现在,如果我们检查日志,我们会理解在这种情况下,第一部分可能会稍微改变,但最后两部分保持相同。

第一部分是线程堆栈跟踪,因此它只显示特定线程执行的操作。

正如我们所看到的,当我们调试应用程序时,这不是最重要的部分,因此这种更改是可以接受的。

在单个进程上激活调试输出

在每个单一的反应过程中检测和生成堆栈跟踪都是昂贵的。

因此,我们应该只在关键情况下实施前一种方法。

无论如何**,Reactor提供了一种在单个关键进程上启用调试模式的方法,这样可以减少内存消耗。**

我们指的是检查点操作员:

public void processFoo(Flux<Foo> flux) {
     
    // ...
 
    flux = flux.checkpoint("Observed error on processFoo", true);
    flux.subscribe();
}

请注意,以这种方式,将在检查点阶段记录程序集跟踪:

Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 15
    ...
Assembly trace from producer [reactor.core.publisher.FluxMap],
  described as [Observed error on processFoo] :
    r.c.p.Flux.checkpoint(Flux.java:3096)
    c.b.d.c.s.FooService.processFoo(FooService.java:26)
    c.b.d.c.c.ChronJobs.consumeInfiniteFlux(ChronJobs.java:46)
    o.s.s.s.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
    o.s.s.s.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    j.u.c.Executors$RunnableAdapter.call(Executors.java:511)
    j.u.c.FutureTask.runAndReset(FutureTask.java:308)
Error has been observed by the following operator(s):
    |_    Flux.checkpoint ⇢ c.b.d.c.s.FooService.processFoo(FooService.java:26)

我们应该在反应链的末尾实施检查点方法

否则,操作员将无法观察下游发生的错误。

另外,请注意,库提供了重载方法。我们可以避免:

  • 如果我们使用no-args选项,则指定观察到的错误的描述
  • 通过仅提供自定义描述来生成填充堆栈跟踪(这是最昂贵的操作)

记录元素序列

最后,Reactor发布商提供了一种在某些情况下可能会派上用场的方法。

通过在我们的反应链中调用log方法,应用程序将使用它在该阶段具有的状态记录流中的每个元素。

让我们在我们的例子中尝试一下:

public void processFoo(Flux<Foo> flux) {
    flux = FooNameHelper.concatFooName(flux);
    flux = FooNameHelper.substringFooName(flux);
    flux = flux.log();
    flux = FooReporter.reportResult(flux);
    flux = flux.doOnError(error -> {
        logger.error("The following error happened on processFoo method!", error);
    });
    flux.subscribe();
}

并检查日志:

INFO  reactor.Flux.Map.1 - onSubscribe(FluxMap.MapSubscriber)
INFO  reactor.Flux.Map.1 - request(unbounded)
INFO  reactor.Flux.Map.1 - onNext(Foo(id=0, formattedName=theFo, quantity=8))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=1, formattedName=theFo, quantity=3))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=2, formattedName=theFo, quantity=5))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=3, formattedName=theFo, quantity=6))
INFO  reactor.Flux.Map.1 - onNext(Foo(id=4, formattedName=theFo, quantity=6))
INFO  reactor.Flux.Map.1 - cancel()
ERROR c.b.d.consumer.service.FooService 
  - The following error happened on processFoo method!
...

我们可以在此阶段轻松查看每个Foo对象的状态,以及在异常发生时框架如何取消流。

当然,这种方法也很昂贵,我们必须适度使用它。

案例结论

如果我们不知道正确调试应用程序的工具和机制,我们可能会花费大量时间和精力来解决问题。

如果我们不习惯处理被动和异步数据结构,那么尤其如此,我们需要额外的帮助来弄清楚事情是如何工作的。

2017-06-14 10:19:36 hj7jay 阅读数 1969

Spring 5 - Spring webflux 是一个新的非堵塞函数式 Reactive Web 框架,可以用来建立异步的,非阻塞,事件驱动的服务,并且扩展性非常好。

把阻塞(不可避免的)风格的代码迁移到函数式的非阻塞 Reactive 风格代码,需要把商业逻辑作为异步函数来调用。这可以参考 Java 8 的方法或者 lambda 表达式。由于线程是非阻塞的,处理能力能被最大化使用。

在发布这篇文章的时候,Spring 5 还处于一个里程碑版本中(5.0.0 M5)。

创建一个Spring Boost项目

可以通过 Spring initializer 创建一个Spring Boot项目。将如下的依赖添加到 pom.xml 中

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

Spring-boot-starter-webflux 包中带了 spring-webflux, netty。其他的依赖需要自行添加。

建立一个简单的用户数据表和从 list 中获取到 user 数据的 DTO 类。这仅仅是一个虚拟的数据 bean,但是这可以实时从其它的数据源像 Rdbms,MongoDb,或者 RestClient 加载数据。由于 JDBC 天生不是响应式的,所以任何对数据库的调用都会阻塞这个线程。MongoDB 有一个响应式的客户端驱动。在测试响应式 Web 服务时的进一步渲染时,REST 风格的调用不会导致任何的阻塞。

public class User {
public User(){}

public User(Long id, String user) {
this.id = id;
this.user = user;
}

private Long id;
private String user;

public Long getId() { return id; }
public void setId(Long id) { this.id = id; }
public String getUser() { return user; }
public void setUser(String user) { this.user = user; }
}

@Repository
public class UserRepository {
private final List<User> users = Arrays.asList(new User(1L, "User1"), new User(2L, "User2"));

public Mono<User> getUserById(String id) {
return Mono.justOrEmpty(users.stream().filter(user -> {
return user.getId().equals(Long.valueOf(id));
}).findFirst().orElse(null));
}

public Flux<User> getUsers() {
return Flux.fromIterable(users);
}
}

Mono 和 Flux 是由目标反应器提供的响应类型。Springs 还提供其他的响应流的实现,例如 RXJava。

Mono 和 Flux 是 Reactive streams 的发布者实现。Mono 是 0 或者任意单个值的发布,Flux 是 0 到任意值的发布。他们和 RXJava 中的 Flowable 和 Observable 类似。他们代替流向这些订阅者发布信息。

GetUserById() 返回一个 Mono<User> 对象,这个对象不论何时都会返回 0~1 个用户对象,GetUsers() 返回一连串变动的用户对象,不论何时都包含 0~n 个用户对象。

相比命令式编程风格,我们并不返回可用前阻塞线程的 User/List<User> 对象,而只是返回一个流的引用,流可以在后面访问 User/List<User>。

创建带有处理 HTTP 请求函数的 Handler 类

@Service
public class UserHandler {
@Autowired
private UserRepository userRepository;

public Mono<ServerResponse> handleGetUsers(ServerRequest request) {
return ServerResponse.ok().body(userRepository.getUsers(), User.class);
}

public Mono<ServerResponse> handleGetUserById(ServerRequest request) {
return userRepository.getUserById(request.pathVariable("id"))
.flatMap(user -> ServerResponse.ok().body(Mono.just(user), User.class))
.switchIfEmpty(ServerResponse.notFound().build());
}
}

handler 类就像 Spring Web 中的 Service beans 一样,我们需要编写该服务的大部分业务功能。ServerResponse 就像 Spring Web 中的 ResponseEntity 类一样,我们可以在 ServerResponse 对象中打包 Response 的数据、状态码、头信息等。 ServerResponse 有很多有用的默认方法,如 notFound()ok()accepted()created()等,可用于创建不同类型的反馈。
UserHandler 有不同的方法,都返回 Mono<ServerResponse>; UserRepository.getUsers() 返回Flux<User>; 和 ServerResponse.ok().body(UserRepository.getUsers(), User.class) 可将此 Flux <User> 转换为 Mono<ServerResponse>,这表明只要可用时均可发起 ServerResponse 的流。UserRepository.getUserById()返回一个Mono<User>,ServerResponse.ok().body(Mono.just(user), User.class) 将此 Mono<User> 转换为Mono<ServerResponse>,这说明随时都可以发起 ServerResponse 的流。

在给定的路径变量(pathVariable)中没有找到用户时,ServerResponse.notFound().build() 返回一个 Mono<ServerResponse>,表名是一个返回 404 服务响应的流。

在命令式编程风格中,数据接收前线程会一直阻塞,这样使得其线程在数据到来前无法运行。而响应式编程中,我们定义一个获取数据的流,然后定义一个在数据到来后的回调函数操作。这样不会使线程堵塞,在数据被返回时,可用线程就用于执行。

创建一个定义应用程序路由的路由类

import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
import static org.springframework.web.reactive.function.server.RequestPredicates.accept;
import static org.springframework.web.reactive.function.server.RouterFunctions.route;
@Configuration
public class Routes {
private UserHandler userHandler;

    public Routes(UserHandler userHandler) {
this.userHandler = userHandler;
}

@Bean
public RouterFunction<?> routerFunction() {
return route(GET("/api/user").and(accept(MediaType.APPLICATION_JSON)), userHandler::handleGetUsers)
.and(route(GET("/api/user/{id}").and(accept(MediaType.APPLICATION_JSON)), userHandler::handleGetUserById));
}
}

RouterFunction就像Spring Web中的@RequestMapping类一样。 RouterFunction用于定义Spring5应用程序的路由。 RouterFunctions帮助器类有一个有用的方法,类似路由,可用于定义路由并构建RouterFunction对象。 RequestPredicates有许多有用的方法,如GET,POST,path,queryParam,accept,headers,contentType等,来定义路由并构建RouterFunction。 每个路由映射到一个处理程序方法,当接收到适当的HttpRequest时,该方法必须被调用。
Spring5还支持定义应用程序处理程序映射的@RequestMapping类型的控制器。 我们可以编写如下所示的控制器方法,以在@RequestMapping样式中创建类似的API。

@GetMapping("/user") public Mono<ServerResponse> handleGetUsers() {}

控制器方法返回Mono<ServerResponse>。

RouterFunction为应用程序提供了DSL类型的路由功能。 到目前为止,Springs不支持混合这两种类型。

创建HttpServerConfig类,用于创建HttpServer类


import org.springframework.http.server.reactive.HttpHandler;
import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
import reactor.ipc.netty.http.server.HttpServer;
@Configuration
public class HttpServerConfig {
@Autowired
private Environment environment;

@Bean
public HttpServer httpServer(RouterFunction<?> routerFunction) {
HttpHandler httpHandler = RouterFunctions.toHttpHandler(routerFunction);
ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
HttpServer server = HttpServer.create("localhost", Integer.valueOf(environment.getProperty("server.port")));
server.newHandler(adapter);
return server;
}
}

这将使用应用程序属性中定义的端口创建一个 netty HttpServer。Spring 支持的其他服务器也跟 Tomcat 和 undertow 一样。由于 netty 是异步的,而且天生基于事件驱动,因此更适合响应式的应用程序。Tomcat 使用 Java NIO 来实现 servlet 规范。Netty 是 NIO 的一个实现,它针对异步、事件驱动的非阻塞 IO 应用程序进行了优化。

Tomcat 服务器也可以按照如下代码所示的用法使用:

Tomcat tomcatServer = new Tomcat();
    tomcatServer.setHostname("localhost");
    tomcatServer.setPort(Integer.valueOf(environment.getProperty("server.port")));
    Context rootContext = tomcatServer.addContext("", System.getProperty("java.io.tmpdir"));
    ServletHttpHandlerAdapter servlet = new ServletHttpHandlerAdapter(httpHandler);
    Tomcat.addServlet(rootContext, "httpHandlerServlet", servlet);
    rootContext.addServletMapping("/", "httpHandlerServlet");
    tomcatServer.start();

创建用于启动应用的Spring启动主类

@SpringBootApplication
public class Spring5ReactiveApplication {
public static void main(String[] args) throws IOException {
SpringApplication.run(Spring5ReactiveApplication.class, args);
}
}

测试应用

你可以使用任意诸如Postman、CURL等的HTTP测试工具测试该应用。

Spring测试也支持为响应式服务编写集成测试的功能。

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
public class UserTest {
@Autowired
private WebTestClient webTestClient;

@Test
public void test() throws IOException {
FluxExchangeResult<User> result = webTestClient.get().uri("/api/user").accept(MediaType.APPLICATION_JSON)
.exchange().returnResult(User.class);
assert result.getStatus().value() == 200;
List<User> users = result.getResponseBody().collectList().block();
assert users.size() == 2;
assert users.iterator().next().getUser().equals("User1");
}

@Test
public void test1() throws IOException {
User user = webTestClient.get().uri("/api/user/1")
.accept(MediaType.APPLICATION_JSON).exchange().returnResult(User.class).getResponseBody().blockFirst();
assert user.getId() == 1;
assert user.getUser().equals("User1");
}

@Test
public void test2() throws IOException {
webTestClient.get().uri("/api/user/10").accept(MediaType.APPLICATION_JSON).exchange().expectStatus()
.isNotFound();
}
}

WebTestClient 和 TestRestTemplate 类似, 他们都有调用 Spring 启动应用的 rest 方法,并能够验证响应结果。在 test 的配置中,Spring 测试创建了一个 TestRestTemplate 的 bean。这里面有一个 WebClient,就跟 Spring Web 中的 RestTemplate 类似。这可用于处理响应式和非阻塞的 rest 调用。

WebClient.create("http://localhost:9000").get().uri("/api/user/1")
        .accept(MediaType.APPLICATION_JSON).exchange().flatMap(resp -> resp.bodyToMono(User.class)).block();


exchange()返回Mono<ClientResponse>,它在Emits clientResponse可用时表示一个流。

    block()阻塞线程执行,直到Mono返回User/List<User>,因为这是我们需要数据来验证响应的测试用例。

Spring Web 因其易于开发/调试而是必要的。使用Spring5响应式或Spring Web命令式服务的决定必须根据用例明智地做出。在许多情况下,只有命令式的可能会很好,但是在高可扩展性是关键因素时,响应式非阻塞将更适合。



2018-04-17 17:13:00 universsky2015 阅读数 76

Spring Boot 集成 WebFlux 开发 Reactive Web 应用

《Spring Boot 实战开发》—— 基于 Gradle + Kotlin的企业级应用开发最佳实践

IBM的研究称,整个人类文明所获得的全部数据中,有90%是过去两年内产生的。在此背景下,包括NoSQL,Hadoop, Spark, Storm, Kylin在内的大批新技术应运而生。其中以RxJava和Reactor为代表的响应式(Reactive)编程技术针对的就是经典的大数据4V( Volume,Variety,Velocity,Value)中的Velocity,即高并发问题,而在Spring 5中,引入了响应式编程的支持。
本章介绍 Spring Boot 如何集成Spring 5 中的WebFlux 开发响应式 Web 应用。
1.1 响应式宣言

响应式宣言和敏捷宣言一样,说起响应式编程,必先提到响应式宣言——
We want systems that are Responsive, Resilient, Elastic and Message Driven. We call these Reactive Systems. - The Reactive Manifesto
响应式宣言中包含了4组关键词:

 Responsive: 可响应的。要求系统尽可能做到在任何时候都能及时响应。
 Resilient: 可恢复的。要求系统即使出错了,也能保持可响应性。
 Elastic: 可伸缩的。要求系统在各种负载下都能保持可响应性。
 Message Driven: 消息驱动的。要求系统通过异步消息连接各个组件。
可以看到,对于任何一个响应式系统,首先要保证的就是可响应性,否则就称不上是响应式系统。

1.2 Spring 5 响应式Web框架架构图
引用一张来自 Spring 5框架官方文档中的图:

图13-1 Spring 5框架

左侧是传统的基于Servlet的Spring Web MVC框架。右侧是Spring 5.0新引入的基于Reactive Streams的Spring WebFlux框架。从上到下依次是如下三个新组件:

 Router Functions
 WebFlux
 Reactive Streams

下面分别作简要介绍。

Router Functions
对标@Controller,@RequestMapping等标准的Spring MVC注解,提供一套函数式风格的API,用于创建Router,Handler和Filter。
WebFlux: 核心组件
协调上下游各个组件提供响应式编程支持。
Reactive Streams
一种支持背压(Backpressure)的异步数据流处理标准,主流实现有RxJava和Reactor,Spring WebFlux默认集成的是Reactor。
在Web容器的选择上,Spring WebFlux既支持像Tomcat,Jetty这样的的传统容器(前提是支持Servlet 3.1 Non-Blocking IO API),又支持像Netty,Undertow那样的异步容器。不管是何种容器,Spring WebFlux都会将其输入输出流适配成Flux<DataBuffer>格式,以便进行统一处理。
值得一提的是,除了新的Router Functions接口,Spring WebFlux同时支持使用老的Spring MVC注解声明Reactive Controller。和传统的MVC Controller不同,Reactive Controller操作的是非阻塞的ServerHttpRequest和ServerHttpResponse,而不再是Spring MVC里的HttpServletRequest和HttpServletResponse。
1.3 项目实战
本节通过实例工程具体介绍开发一个Reactive Web 应用程序的过程。
1.3.1 创建项目
使用http://start.spring.io/ 创建项目,选择 Reactive Web起步依赖。如下图:

图13-2 选择 Reactive Web起步依赖
生成好项目 zip 包后,解压导入 IDEA 中, 选择 Gradle 构建项目。如下图:

图13-3 选择 Gradle 构建
配置 Gradle 本地环境,如下图:

图13-4 配置 Gradle 本地环境
完成导入 IDEA,等待项目构建初始化完毕,可以看到项目依赖树如下图:

图13-5 项目依赖树
可以看到,在 webflux的 starter 中依赖了 reactor、reactive-streams、netty 等。
Spring Initializr 将会帮我们自动生成一个样板工程。下面我们分别来加入 model 层 、dao层、 service层、 handler层等模块的代码。完整的项目的代码目录结构设计如下:

├── src
│   ├── main
│   │   ├── java
│   │   ├── kotlin
│   │   │   └── com
│   │   │       └── easy
│   │   │           └── kotlin
│   │   │               └── webflux
│   │   │                   ├── WebfluxApplication.kt
│   │   │                   ├── dao
│   │   │                   │   └── PersonRepository.kt
│   │   │                   ├── handler
│   │   │                   │   └── PersonHandler.kt
│   │   │                   ├── model
│   │   │                   │   └── Person.kt
│   │   │                   ├── router
│   │   │                   │   └── RouterConfig.kt
│   │   │                   ├── server
│   │   │                   │   └── HttpServerConfig.kt
│   │   │                   └── service
│   │   │                       └── PersonService.kt
│   │   └── resources
│   │       └── application.properties

1.3.2 项目代码
本节具体介绍具体的代码实现。
模型层
Person 对象模型代码是

class Person(@JsonProperty("name") val name: String, @JsonProperty("age") val age: Int) {

    override fun toString(): String {
        return "Person{" +
                "name='" + name + '\'' +
                ", age=" + age +
                '}'
    }
}

服务层
接口PersonRepository.kt 的代码如下:

interface PersonRepository {

    fun getPerson(id: Int): Mono<Person>

    fun allPeople(): Flux<Person>

    fun savePerson(person: Mono<Person>): Mono<Void>
}

服务层的实现类PersonService.kt 代码如下

@Service
class PersonService : PersonRepository {
    var persons: MutableMap<Int, Person> = hashMapOf()

    constructor() {
        this.persons[1] = Person("Jack", 20)
        this.persons[2] = Person("Rose", 16)
    }

    // 根据 id 获取 Mono 对象包装的 Person数据
    override fun getPerson(id: Int): Mono<Person> {
        return Mono.justOrEmpty(this.persons[id])
    }
    // 返回所有 Person数据,包装在 Flux 对象中
    override fun allPeople(): Flux<Person> {
        return Flux.fromIterable(this.persons.values)
    }

    override fun savePerson(person: Mono<Person>): Mono<Void> {
        return person.doOnNext {
            val id = this.persons.size + 1
            persons.put(id, it)
            println("Saved ${person} with ${id}")
        }.thenEmpty(Mono.empty())

    }
}

其中, Mono 和 Flux 是由 Reactor 提供的两个 Reactor的类型。Reactor有两种类型,Flux<T>和Mono<T>。
 Flux
Flux 单词的意思是“流”。Flux类似RaxJava的Observable,它可以触发零个或者多个事件,并根据实际情况结束处理或触发错误。
 Mono
Mono这个单词本身的意思是“单子”的意思。Mono最多只触发一个事件,它跟RxJava的Single和Maybe类似,所以可以把Mono<Void>用于在异步任务完成时发出通知。
Spring 同时支持其他 Reactive 流实现,如 RXJava。
控制器层PersonHandler.kt 代码如下:

@Service
class PersonHandler {

    @Autowired lateinit var repository: PersonRepository

    fun getPerson(request: ServerRequest): Mono<ServerResponse> {
        val personId = Integer.valueOf(request.pathVariable("id"))!!
        val notFound = ServerResponse.notFound().build()
        val personMono = this.repository.getPerson(personId)
        return personMono
            .flatMap { person -> ServerResponse.ok().contentType(APPLICATION_JSON).body(fromObject(person)) }
            .switchIfEmpty(notFound)
    }

    fun createPerson(request: ServerRequest): Mono<ServerResponse> {
        val person = request.bodyToMono(Person::class.java)
        return ServerResponse.ok().build(this.repository.savePerson(person))
    }

    fun listPeople(request: ServerRequest): Mono<ServerResponse> {
        val people = this.repository.allPeople()
        return ServerResponse.ok().contentType(APPLICATION_JSON).body(people, Person::class.java)
    }

}

这里我们没有真实去连接数据库进行操作,只是在内存中模拟了数据的返回。
请求路由
RouterConfig.kt 配置请求路由,把请求映射到相应的 Handler 处理方法。代码如下:

@Configuration
class RouterConfig {

    @Autowired lateinit var personHandler: PersonHandler

    @Bean
    fun routerFunction(): RouterFunction<*> {
        return route(GET("/api/person").and(accept(APPLICATION_JSON)),
                HandlerFunction { personHandler.listPeople(it) })
            .and(route(GET("/api/person/{id}").and(accept(APPLICATION_JSON)),
                    HandlerFunction { personHandler.getPerson(it) }))
    }

}

这里我们配置/api/person的 GET 请求映射到personHandler.listPeople()方法处理;/api/person/{id}的 GET 请求映射到personHandler.getPerson() 方法来处理。
Reactive Web服务器配置类HttpServerConfig.kt 配置基于 netty 的 Reactive Web Server。我们配置端口号为application.properties 文件中server.port的值。代码如下

@Configuration
class HttpServerConfig {
    @Autowired
    lateinit var environment: Environment

    @Bean
    fun httpServer(routerFunction: RouterFunction<*>): HttpServer {
        val httpHandler = RouterFunctions.toHttpHandler(routerFunction)
        val adapter = ReactorHttpHandlerAdapter(httpHandler)
        val server = HttpServer.create("localhost", environment.getProperty("server.port").toInt())
        server.newHandler(adapter)
        return server
    }

}

项目入口类
项目入口类 WebfluxApplication代码如下:

@SpringBootApplication
class WebfluxApplication

fun main(args: Array<String>) {
    runApplication<WebfluxApplication>(*args)
}

运行测试
直接在 IDEA 中启动运行应用,在控制台启动日志中,可以看到路由映射的信息:

Mapped ((GET && /api/person) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$1@46292372
((GET && /api/person/{id}) && Accept: [application/json]) -> com.easy.kotlin.webflux.router.RouterConfig$routerFunction$2@126be319
……
2017-11-04 00:39:50.459  INFO 2884 --- [ctor-http-nio-1] r.ipc.netty.tcp.BlockingNettyContext     : Started HttpServer on /0:0:0:0:0:0:0:0:9000
2017-11-04 00:39:50.459  INFO 2884 --- [           main] o.s.b.web.embedded.netty.NettyWebServer  : Netty started on port(s): 9000
2017-11-04 00:39:50.466  INFO 2884 --- [           main] c.e.kotlin.webflux.WebfluxApplicationKt  : Started WebfluxApplicationKt in 5.047 seconds (JVM running for 6.276)

直接在命令行执行curl 请求相应的 url,可以看到对应的输出:

$ curl http://127.0.0.1:9000/api/person
 [{"name":"Jack","age":20},{"name":"Rose","age":16}]

$ curl http://127.0.0.1:9000/api/person/1
{"name":"Jack","age":20}

$ curl http://127.0.0.1:9000/api/person/2
{"name":"Rose","age":16}

1.4 本章小结
Spring Web MVC是一个命令式的编程框架,可以很方便的进行开发和调试。在很多情况下,命令式的编程风格就可以满足,但当我们的应用需要高可伸缩性,那么 Reactive 非堵塞方式是最适合的。 所以,需要根据实际情况去决定采用 Spring 5 Reactive 或者是 Spring Web MVC命令式框架。
提示:本章工程源代码:https://github.com/EasyKotlin/kotlin-with-webflux

新书上架:《Spring Boot 开发实战》

— 基于 Kotlin + Gradle + Spring Boot 2.0 的企业级服务端开发实战

京东下单链接

https://item.jd.com/31178320122.html

天猫下单链接

https://detail.tmall.com/item.htm?id=574928877711

1233356-596a64de8adf2b27.jpg
2018-12-05 09:50:43 pchwenwenti 阅读数 193

反应式编程是关于异步和事件驱动的非阻塞应用程序,需要少量线程垂直扩展(即在JVM内)而不是水平扩展(即通过群集)。

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

参考:

  1. Notes on Reactive Programming Part I: The Reactive Landscape
  2. Notes on Reactive Programming Part II: Writing Some Code
  3. Notes on Reactive Programming Part III: A Simple HTTP Server Application

 

2018-08-01 12:51:43 woshiyexinjie 阅读数 192

 看到这Akka的官网,描述使用java或者scala构建响应式,并发和分布式应用更加简单,听着很高级的样子,下面的小字写着消息驱动,但是在quickstart里面又写容错事件驱动,就是这么钻牛角尖。

    Actors是的Akka的执行单位,Actor模型是一个抽象概念,它使得编写并发的、并行的和分布式的系统变得更加容易。官方给了一个 “helloworld” 示例,演示了基本知识,我们可以在如下的页面创建一个项目,点开旁边的 show all akka projects 可以看到所有的示例(https://developer.lightbend.com/start/?group=akka)。

    作为小白,就是这个最基础的这个,然后下载过来是个 zip 包,解压一下,执行以下命令:

 $ chmod u+x ./sbt
 $ chmod u+x ./sbt-dist/bin/sbt

      然后在这个目录中,我们执行 ./sbt 或者 sbt.bat(windows 环境)来下载项目依赖的包,不过真的是够慢的哈。

    然后执行 reStart 来构建项目和运行 Hello World,可以看到以下的输出,还可以看到 Actor 和我们打的招呼。

    在 akka 的 quickstart 里面还给我们画了一张图,看怎么运行的

    main 类里面创建一个 akka.actor.ActorSystem,构建 Actors 运行的容器,创建了三个Greeter Actor 和一个Printer Actor。

Messages

    该示例将消息发送到GreeterActor实例,这些实例在内部存储这些消息。最后,给Greeter Actor的指令消息会触发它们向PrinterActor发送消息,PrinterActor会将它们输出到控制台:

    Akka使用Actor和异步消息传递带来了一系列好处,Akka的以下特性允许您以直观的方式解决困难的并发性和可伸缩性挑战,举几个例子。

  • 事件驱动模型-Actors 执行响应消息的工作。Actors之间的通信是异步的,允许Actors发送消息并继续自己的工作,而不阻塞等待答复。
  • 强隔离原则-与Scala中的常规对象不同,Actor在可以调用的方法方面没有公共API。相反,它的公共API是通过参与者处理的消息定义的。这可以防止参与者之间共享状态;观察另一个参与者状态的唯一方法是向其发送请求它的消息。
  • 位置透明性-系统从工厂构造Actor并返回对实例的引用。由于位置并不重要,Actor实例可以启动、停止、移动和重新启动,以便向上和向下扩展,并从意外故障中恢复。
  • 轻量级-每个实例只消耗几百字节,这实际上允许数百万并发Actor存在于一个应用程序中。

    好戏再续:看看在HelloWorld示例的上下文中使用Actor和Message的一些最佳实践。

 

    有什么讨论的内容,可以加我公众号:

Reactive Streams简单介绍

博文 来自: netyeaxi

Reactive System

阅读数 821

没有更多推荐了,返回首页