精华内容
下载资源
问答
  • Mono里是不支持注解事务的。 比如 @Transactional public Mono<CommonOutput> save(RecordFileSaveReq req) { return Mono.just(true) .filter(b -> saveLog(req)) .fi...

    Mono里是不支持注解事务的。
    比如

        @Transactional
        public Mono<CommonOutput> save(RecordFileSaveReq req) {
            return Mono.just(true)
                    .filter(b -> saveLog(req))
                    .filter(b -> copyFile(req))
                    .filter(b -> removeTempFile(req))
                    .map(b -> success());
        }
    
    

    这样是不行的。

    只能回到老办法:

        @Transactional
        public CommonOutput logProcess(RecordFileSaveReq req) {
            //保存日志
            repository.save(req);
            String name = storeService.generateName();
            //转储
            storeService.copyFile(req.getVoiceAddress(), "");
            //删除临时文件
            removeTempFile(req);
    
            return success();
        }
    

    其中,repository.save方法和storeService.copyFile方法都会抛运行时异常。
    Controller里的方法就会很罗嗦,要截获异常:

        @PostMapping(name = "/recordFile/save")
        @ResponseBody
        public Mono<CommonOutput> save(@Valid @RequestBody Mono<RecordFileSaveReq> req) {
            return Mono.create(sink ->
                    req.doOnError(WebExchangeBindException.class, throwable ->
                            sink.success(fail(throwable))
                    ).doOnNext(r -> Mono.just(true)
                            .map(b -> service.logProcess(r))
                            .onErrorReturn(RecordObjectException.class, recordObjError())
                            .onErrorReturn(SessionIDTypeException.class, sessionAndTypeError())
                            .onErrorReturn(DataAccessException.class, otherError())
                            .onErrorReturn(IvcFileException.class, fileError())
                            .subscribe(sink::success)
                    ).subscribe());
        }
    

    分析源码可以发现,类TransactionAspectSupport中,prepareTransactionInfo方法要准备一个TransactionInfo:

    		// We always bind the TransactionInfo to the thread, even if we didn't create
    		// a new transaction here. This guarantees that the TransactionInfo stack
    		// will be managed correctly even if no transaction was created by this aspect.
    		txInfo.bindToThread();
    

    其中,TransactionInfo绑定到了当前线程中。
    事务的执行见invokeWithinTransaction方法:

    		if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
    			// Standard transaction demarcation with getTransaction and commit/rollback calls.
    			TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
    			Object retVal = null;
    			try {
    				// This is an around advice: Invoke the next interceptor in the chain.
    				// This will normally result in a target object being invoked.
    				retVal = invocation.proceedWithInvocation();
    			}
    			catch (Throwable ex) {
    				// target invocation exception
    				completeTransactionAfterThrowing(txInfo, ex);
    				throw ex;
    			}
    			finally {
    				cleanupTransactionInfo(txInfo);
    			}
    

    如果捕获到异常,就执行completeTransactionAfterThrowing方法:

    			if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
    				try {
    					txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
    				}
    				catch (TransactionSystemException ex2) {
    					logger.error("Application exception overridden by rollback exception", ex);
    					ex2.initApplicationException(ex);
    					throw ex2;
    				}
    				catch (RuntimeException | Error ex2) {
    					logger.error("Application exception overridden by rollback exception", ex);
    					throw ex2;
    				}
    			}
    			else {
    				// We don't roll back on this exception.
    				// Will still roll back if TransactionStatus.isRollbackOnly() is true.
    				try {
    					txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    				}
    				catch (TransactionSystemException ex2) {
    					logger.error("Application exception overridden by commit exception", ex);
    					ex2.initApplicationException(ex);
    					throw ex2;
    				}
    				catch (RuntimeException | Error ex2) {
    					logger.error("Application exception overridden by commit exception", ex);
    					throw ex2;
    				}
    			}
    

    如果判断需要回滚(rollbackOn方法),就执行DataSourceTransactionManager类的回滚动作:

    	protected void doRollback(DefaultTransactionStatus status) {
    		DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    		Connection con = txObject.getConnectionHolder().getConnection();
    		if (status.isDebug()) {
    			logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    		}
    		try {
    			con.rollback();
    		}
    		catch (SQLException ex) {
    			throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    		}
    	}
    

    WebFlux配合同步操作,性能太差了。
    做优化,继续修改事务部分
    目前是这样写的,简单测试一下,资源占用减少,性能大幅提升:

        public Mono<CommonOutput> logProcess(RecordFileSaveReq req) {
            return transactionTemplate.execute(transactionStatus ->
                    //客户是否正确
                    Mono.just(customerRepository.get(req.getChargeNbr()))
                            //session是否正确
                            .zipWith(Mono.just(callRepository.get(req.getSessionid())))
                            //保存日志
                            .doOnNext(z -> repository.save(req))
                            //获取存储信息
                            .zipWhen(z -> Mono.just(storageRepository.getStorageOfEnt(z.getT1().getId())))
                            .flatMap(z -> fileEventProcess(req, z))
                            .doOnNext(z -> LOG.debug("file process {} end.", req.getVoiceAddress()))
                            .thenReturn(success())
            );
        }
    

    其中,TransactionTemplate使用构造器注入:

        private final ReactiveTransactionTemplete transactionTemplate;
    
        public RecordFileSaveService(PlatformTransactionManager transactionTemplate) {
            this.transactionTemplate = new ReactiveTransactionTemplete(transactionTemplate);
        }
    

    ReactiveTransactionTemplete是参考Spring的TransactionTemplete自己实现的。

    首先,定义接口ReactiveTransactionCallback:

    public interface ReactiveTransactionCallback<T> {
        @NonNull
        Mono<T> doInTransaction(TransactionStatus status);
    }
    

    定义接口ReactiveTransactionOperations:

    public interface ReactiveTransactionOperations {
        @NonNull
        <T> Mono<T> execute(ReactiveTransactionCallback<T> action);
    }
    

    ReactiveTransactionTemplete的实现如下:

    public class ReactiveTransactionTemplete extends DefaultTransactionDefinition
            implements ReactiveTransactionOperations {
        private Logger LOG = LoggerFactory.getLogger(ReactiveTransactionTemplete.class);
    
        @NonNull
        private final PlatformTransactionManager transactionManager;
    
        public ReactiveTransactionTemplete(@NonNull PlatformTransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }
    
        @Override
        @NonNull
        public <T> Mono<T> execute(@NonNull ReactiveTransactionCallback<T> action) {
            Scheduler scheduler = Schedulers.newSingle(new DefaultThreadFactory("transaction"));
            return Mono.just(true)
                    .publishOn(scheduler)
                    .map(b -> transactionManager.getTransaction(this))
                    .zipWhen(status -> doAndError(action, status, scheduler))
                    .publishOn(scheduler)
                    .doOnNext(z -> transactionManager.commit(z.getT1()))
                    .map(Tuple2::getT2)
                    .doFinally(t -> scheduler.dispose());
        }
    
        private <T> Mono<T> doAndError(@NonNull ReactiveTransactionCallback<T> action,
                                       @NonNull TransactionStatus status,
                                       @NonNull Scheduler scheduler) {
            return action.doInTransaction(status)
                    .publishOn(scheduler)
                    .doOnError(e -> {
                        status.setRollbackOnly();
                        transactionManager.rollback(status);
                    });
        }
    }
    

    每次调用execute方法,都会增加新的线程,在执行结束的时候,调用scheduler.dispose()释放线程资源。
    execute方法的实现关键是,要确保getTransaction方法、commit方法和rollback方法被同一个线程调用
    这是因为,事务提交后,要清理资源,见TransactionSynchronizationManager类:

    	@Nullable
    	private static Object doUnbindResource(Object actualKey) {
    		Map<Object, Object> map = resources.get();
    		if (map == null) {
    			return null;
    		}
    		Object value = map.remove(actualKey);
    		// Remove entire ThreadLocal if empty...
    		if (map.isEmpty()) {
    			resources.remove();
    		}
    		// Transparently suppress a ResourceHolder that was marked as void...
    		if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
    			value = null;
    		}
    		if (value != null && logger.isTraceEnabled()) {
    			logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
    					Thread.currentThread().getName() + "]");
    		}
    		return value;
    	}
    
    展开全文
  • spring webflux

    2020-03-25 14:35:11
    spring webflux(一) 所有示例代码:https://github.com/cumtbzy2011/webfluxdemo 功能与api 背景 Netty作为java领域首屈一指的nio框架,其以优越的性能被众多中间件所使用。但到了java的web开发领域,却很难享受到...

    spring webflux(一)

    所有示例代码:https://github.com/cumtbzy2011/webfluxdemo

    功能与api
    背景
    Netty作为java领域首屈一指的nio框架,其以优越的性能被众多中间件所使用。但到了java的web开发领域,却很难享受到Netty的性能优势。其原因在于传统web开发基于servlet容器,许多依赖和开发框架都是基于servlet实现的,比如spring。而netty为了保持代码的简单和高效,并没有实现servlet标准,这就导致将web容器迁移到netty后许多框架和第三方库不能使用,迁移的成本过大。但spring webflux出现改变了这一现状。她在兼容原有mvc开发方式的同时,重写和实现了大量第三方库,在提升性能的同时,降低了迁移的成本。同时spring webflux适配多种web容器,即使仍然使用tomcat也是可以的。

    接口声明
    接口声明除了保留原有注解式声明的方式,为了满足reactor的编程风格,额外支持了函数式声明的方式。通工具类RouterFunctions过构造RounterFunction对象,并向Spring注入实现函数式接口声明。

    @Bean
    public TestHandler testHandler() {
        return new TestHandler();
    }
    
    @Bean
    public RouterFunction<ServerResponse> routes(TestHandler testHandler) {
        return RouterFunctions.route(RequestPredicates.POST("/route"),
            testHandler::echoName);
    }
    
    @GetMapping("anno")
    public String sayHello(String name) {
        return "hello world! " + name;
    }
    
    class TestHandler {
        public Mono<ServerResponse> echoName(ServerRequest request) {
            return request.bodyToMono(Post.class)
              .map(Post::getName)
              .flatMap(name -> ServerResponse.ok()
                .contentType(MediaType.TEXT_PLAIN)
                .body(BodyInserters.fromObject("hello world!" + name)));
        }
    }
    

    在WebFlux中,request和respose不再是原来的ServletRequest和ServletRequest,取而代之的是ServerRequest和ServerResponse。这两个对象是webflux新出现的。首先webflux底层如果使用了reactor-netty,那么自然就没有所谓的servlet一说,另外ServerRequest和ServerResponse提供了对non-blocking和backpressure特性的支持,提供了将Http消息内容转换成Mono和Flux的方法,使响应式编程成为了可能。

    过滤器Filter
    过滤器的使用方法和spring mvc类似,不过与ServerRequest和ServerResponse相同的是,webflux提供了一个新的过滤器接口WebFilter以提供对Mono和Flux的支持。代码如下:

    ```bash
    @Component
    public class DemoWebFilter implements WebFilter{
    
        @Override
        public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
            if (!serverWebExchange.getRequest().getHeaders().containsKey("token")) {
                serverWebExchange.getResponse().setStatusCode(HttpStatus.UNAUTHORIZED);
                return Mono.empty();
            }
            return webFilterChain.filter(serverWebExchange);
        }
    }
    ``
    

    `

    值得注意的是Mono这个返回值,在框架的很多地方都会用到。他意味着一个空的Mono,对于任何读取,他会立刻发出一个complete信号。相比与直接返回void,Mono作为方法的返回值时,可以对该方法进行链式调用。另外虽然Mono虽然没有返回值,但是其本身的complete或者error状态,也可以注册回调进行异步处理。

    异常处理
    在Spring Webflux中,异常分两种。一是controller中方法抛出的异常,这在webflux中同样可以像在mvc中用@ExceptionHandler声明异常处理方法。二是在WebHandler API这种比较偏底层的api,典型的是WebFilter,异常处理使用了支持Mono的新接口:WebExceptionHandler,可用于处理来自WebFilter链和WebHandler的异常。使用WebExceptionHandler时,只要将其声明为Spring bean即可自动注入并使用,并可选择通过bean声明上的@Order或通过实现Ordered来表示优先级。需要注意的是webflux有默认的WebExceptionHandler-DefaultErrorWebExceptionHandler,其order为默认的-1。如果我们想自定义WebExceptionHandler,那么必须将order声明为-2以上,否则异常将不会传递到我们自定义的WebExceptionHandler中。

    
    ```bash
    @Component
    //要比DefaultErrorWebExceptionHandler优先级-1高
    //比较底层,如果异常被@ExceptionHandler处理了,那么将不会由此处理
    //可以处理filter和webHandler中的异常
    @Order(-2)
    public class ErrorLogHandler implements WebExceptionHandler {
        @Override
        public Mono<Void> handle(ServerWebExchange exchange, Throwable ex) {
            exchange.getResponse().setStatusCode(HttpStatus.OK);
            byte[] bytes = ("ErrorLogHandler: " + ex.getMessage()).getBytes(StandardCharsets.UTF_8);
            DataBuffer wrap = exchange.getResponse().bufferFactory().wrap(bytes);
            return exchange.getResponse().writeWith(Flux.just(wrap));
        }
    }
    @ExceptionHandler(Exception.class)
        public String test(Exception e) {
            return "@ExceptionHandler: " + e.getMessage();
    }
    
    
    Multipart和Stream
    在基础框架reactor中Mono代表一个单次发送的数据源,而Flux代表一个可多次发送的数据源。在spring webflux的controller中,Mono很好理解,代表前端的一次传参或接口的一次返回。那么Flux该如何使用呢?简单来说Flux在这两个场景下使用:接受Multipart参数、返回Stream类型数据或者用于分批返回。代码如下:
    
    
    ```bash
    @PostMapping(value = "", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
        Mono<String> requestBodyFlux(@RequestBody Flux<Part> parts) {
            return parts.map(part -> part instanceof FilePart
                  ? part.name() + ":" + ((FilePart) part).filename()
                  : part.name())
              .collect(Collectors.joining(",", "[", "]"));
        }
    
        //如果不是application/stream json則呼叫端無法滾動得到結果,將一直阻塞等待資料流結束或超時。
        @GetMapping(value = "stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
        public Flux<Post> getBeanStream() {
            return Flux.interval(Duration.ofMillis(500))
              .map(l -> new Post("bian", LocalDateTime.now()))
              .log();
        }
    

    Multipart是Htp请求的一种常见的数据结构,常用于表单提交。在spring mvc中,表单中的每个键值对会映射成一个个part。到了webflux,自然而然地转换成代表多个表单字段Flux。而返回值Flux,则对应了一种新的MediaType:APPLICATION_STREAM_JSON_VALUE。他的使用需要浏览器或者客户端的支持。从使用中来看,浏览器会对每一次返回的数据分批处理。如果简单的get调用,会在页面滚动打印返回值,直到Flux发射完成:

    image.png

    而如果接口并没有声明produces = MediaType.APPLICATION_STREAM_JSON_VALUE的媒体类型,浏览器将会在Flux所有数据发射完毕后一次性打印。

    WebSocket
    在webflux中使用WebSocket功能很简单,只要注册WebSocketHandlerAdapter用于websocket协议的握手,再定义对应路径的websocket消息处理器即可:

    @Configuration
    @ComponentScan
    @EnableWebFlux
    class WebConfig {

    @Bean
    public HandlerMapping handlerMapping() {
        Map<String, WebSocketHandler> map = new HashMap<>();
        map.put("/echo", new EchoWebSocketHandler());
        SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
        mapping.setUrlMap(map);
        return mapping;
    }
    
    @Bean
    WebSocketHandlerAdapter webSocketHandlerAdapter(){
        return new WebSocketHandlerAdapter();
    }
    

    }
    public class EchoWebSocketHandler implements WebSocketHandler {

    public EchoWebSocketHandler() {
    }
    
    @Override
    public Mono<Void> handle(WebSocketSession session) {
        return session.send(    //1. 向一个websocket连接发送一段消息
          session.receive()     //2. 获得入站消息流
            .doOnNext(          //3. 对每一个websocket消息进行处理,相当于stream的map,返回的仍是一个流
              WebSocketMessage::retain  //4. 保留消息(主要针对池化内存(内部使用了netty的ByteBuf),使之引用计数+1,避免过早回收)
            )
        );
    }
    

    }
    需要注意的是,通过webSocketSession.receive() 获得的Flux,其每一次发射的数据WebSocketMessage如果是再Netty容器中,是一个对Netty中ByteBuf的保证,而ByteBuf在使用中有一点要注意,就是谁使用谁释放、retain()和release()成对出现。所以当把Flux发射的WebSocketMessage传递给其他方法使用时,注意要retain()增加一次计数,避免上一级方法release()使ByteBuf引用计数归零,导致过早回收。关于Netty的内存使用,下面会写一篇简要的介绍文章。

    Mongo
    MongoDB由于支持异步客户端,所以很适合在webflux项目中使用,spring-data-reactor也在第一时间做了支持。配合springboot的@EnableMongoAuditing注解,可以很快搭建异步mongo客户端。相关代码如下:

    @SpringBootApplication
    @EnableMongoAuditing
    public class Application {
    
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    
    }
    
    
    @Component
    @Slf4j
    class DataInitializer implements CommandLineRunner {
    
        private final PostRepository posts;
    
        public DataInitializer(PostRepository posts) {
            this.posts = posts;
        }
    
        @Override
        public void run(String[] args) {
            log.info("start data initialization  ...");
            this.posts
              .deleteAll()
              .thenMany(
                Flux
                  .just("bianzhaoyu", "xinan")
                  .flatMap(
                    name -> this.posts.save(Post.builder().name(name).age(25).build())
                  )
              )
              .log()
              .subscribe(
                null,
                null,
                () -> log.info("done initialization...")
              );
    
        }
    
    }
    
    @RestController()
    @RequestMapping(value = "/posts")
    class PostController {
    
        private final PostRepository posts;
    
        public PostController(PostRepository posts) {
            this.posts = posts;
        }
    
        @GetMapping("")
        public Flux<Post> all() {
            return this.posts.findAll();
        }
    
        @PostMapping("")
        public Mono<Post> create(@RequestBody Post post) {
            return this.posts.save(post);
        }
    
        @GetMapping("/{id}")
        public Mono<Post> get(@PathVariable("id") String id) {
            return this.posts.findById(id);
        }
    
        @PutMapping("/{id}")
        public Mono<Post> update(@PathVariable("id") String id, @RequestBody Post post) {
            return this.posts.findById(id)
              .map(p -> {
                  p.setName(post.getName());
                  p.setAge(post.getAge());
    
                  return p;
              })
              .flatMap(p -> this.posts.save(p));
        }
    
        @DeleteMapping("/{id}")
        public Mono<Void> delete(@PathVariable("id") String id) {
            return this.posts.deleteById(id);
        }
    
    }
    
    interface PostRepository extends ReactiveMongoRepository<Post, String> {
    }
    
    @Data
    @ToString
    @Builder
    @NoArgsConstructor
    @AllArgsConstructor
    class Post {
    
        @Id
        private String id;
        private String name;
        private Integer age;
    
        @CreatedDate
        private LocalDateTime createdDate;
    }
    

    配置如下:

    spring:
      data:
        mongodb:
          uri: mongodb://localhost:27017/blog
          grid-fs-database: images
    
    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
            </dependency>
    </dependencies>
    

    Redis
    异步Redis客户端的使用和普通Redis客户端类似,只是RedisTemplate的方法都原生支持了异步调用。使用时只要引入spring-boot-starter-data-redis-reactive依赖,并注册ReactiveRedisTemplate即可:

    @Bean
        public ReactiveRedisTemplate<String, Post> reactiveJsonPostRedisTemplate(
          ReactiveRedisConnectionFactory connectionFactory) {
            
            RedisSerializationContext<String, Post> serializationContext = RedisSerializationContext
              .<String, Post>newSerializationContext(new StringRedisSerializer())
              .hashKey(new StringRedisSerializer())
              .hashValue(new Jackson2JsonRedisSerializer<>(Post.class))
              .build();
            return new ReactiveRedisTemplate<>(connectionFactory, serializationContext);
        }
    
    @Component
    class PostRepository {
    
        ReactiveRedisOperations<String, Post> template;
    
        public PostRepository(ReactiveRedisOperations<String, Post> template) {
            this.template = template;
        }
    
        Flux<Post> findAll() {
            return template.<String, Post>opsForHash().values("posts");
        }
    
        Mono<Post> findById(String id) {
            return template.<String, Post>opsForHash().get("posts", id);
        }
    
        Mono<Post> save(Post post) {
            if (post.getId() != null) {
                String id = UUID.randomUUID().toString();
                post.setId(id);
            }
            return template.<String, Post>opsForHash().put("posts", post.getId(), post)
              .log()
              .map(p -> post);
    
        }
    
        Mono<Void> deleteById(String id) {
            return template.<String, Post>opsForHash().remove("posts", id)
              .flatMap(p -> Mono.<Void>empty());
        }
    
        Mono<Boolean> deleteAll() {
            return template.<String, Post>opsForHash().delete("posts");
        }
    
    }
    

    MySQL
    mysql作为现在使用最广的数据存储工具,可以说是选择任何框架时必须考虑到兼容性的一点。但是遗憾的是,由于JDBC协议只支持同步访问,spring目前并没有直接对jdbc的reactor客户端的支持。虽然可以通过引入第三方异步数据库连接池,或者将普通jpa方法用Mono,Flux指定调用线程池的方式进行包装,但是作为关系型数据库最重要的一点:事务,却无法用@Transactional实现。虽然可以将一个事务的代码写在一个异步函数中,但却无法做到像同步方法那样,使用@Transactional各个业务方法,导致可复用性和实用性极低。这里使用一个异步jdbc线程池rxjava2-jdbc,相比与Mono/Flux包装的方式,rxjava2-jdbc在返回一个connection时是异步的,虽然由于jdbc协议的线程,执行sql语句的时候仍然是同步阻塞的。rxjava-jdbc内部维护了一个线程池用于执行阻塞代码,这也避免了我们自定义线程池的麻烦。
    pom依赖:

    <dependency>
        <groupId>com.github.davidmoten</groupId>
        <artifactId>rxjava2-jdbc</artifactId>
        <version>0.1-RC23</version>
    </dependency>
    

    代码如下:

    /**
     * spring-data-jpa是同步的,repository返回的结果并不是Mono或者Flux形式。
     *     可以使用第三方异步jdbc连接池rxjava2-jdbc,但是由于每个方法是异步的,
     * 当数个异步方法组合起来时,并不能保证每个方法都是由一个线程按顺序调用的,
     * 这就使基于ThreadLocal的@Transactional无法使用
     *     当然,可以手动在一个异步方法中开启并提交事务,但是这还是失去了@Transactional组合
     * 不同方法到一个事物的便利性和可扩展性
     * @author xinan
     */
    @Component
    public class RxJava2PostRepository {
        private Database db;
    
        RxJava2PostRepository(Database db) {
            this.db = db;
        }
    
        public Observable<Post> findAll() {
            return this.db.select("select * from posts")
                .get(
                    rs -> new Post(rs.getLong("id"),
                        rs.getString("name"),
                        rs.getInt("age")
                    )
                )
                .toObservable();
        }
    
        public Single<Post> findById(Long id) {
            return this.db.select("select * from posts where id=?")
                .parameter(id)
                .get(
                    rs -> new Post(rs.getLong("id"),
                        rs.getString("name"),
                        rs.getInt("age")
                    )
                )
                .firstElement()
                .toSingle();
        }
    
        public Single<Integer> save(Post post) {
            return this.db.update("insert into posts(name, age) values(?, ?)")
                .parameters(post.getName(), post.getAge())
                .returnGeneratedKeys()
                .getAs(Integer.class)
                .firstElement()
                .toSingle();
        }
    
        String sql = "insert into posts(title, content) values(?, ?)";
    
        //使用事务
        public Single<Integer> saveTx(Post post) {
            return db.connection()
              .map(connection -> {
                  connection.setAutoCommit(false);
                  PreparedStatement pstmt = connection.prepareStatement(sql);
                  pstmt.setInt(1, post.getAge());
                  pstmt.setInt(2, post.getAge());
                  int i = pstmt.executeUpdate();
                  pstmt.close();
                  connection.commit();
                  return i;
              });
        }
    }
    

    13人点赞
    java

    作者:云海_54d4
    链接:https://www.jianshu.com/p/d648af830183
    来源:简书
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

    展开全文
  • AOP切面编程 什么是AOP AOP是面向切面编程。全称:Aspect Oriented Programming 面向切面编程指的是:程序是运行期间,动态地将某段代码插入到原来方法代码的某些位置中。这就叫面向切面编程。 ...

    AOP切面编程

    什么是AOP

      AOP是面向切面编程。全称:Aspect Oriented Programming

      面向切面编程指的是:程序是运行期间,动态地将某段代码插入到原来方法代码的某些位置中。这就叫面向切面编程。

    一个简单计算数功能加日记

      准备计算器相关类
    计算接口

    public interface Calculate {
    	public int add(int num1, int num2);
    
    	public int mul(int num1, int num2);
    
    	public int div(int num1, int num2);
    
    	public int sub(int num1, int num2);
    }
    

      计算机类

    public class Calculator implements Calculate {
    	public int add(int num1, int num2) {
    		System.out.println("日记 :【add】 方法调用前 。参数1是:" + num1 + " , 参数2是:" + num2);
    		return num1 + num2;
    	}
    
    	public int mul(int num1, int num2) {
    		System.out.println("日记 :【mul】 方法调用前 。参数1是:" + num1 + " , 参数2是:" + num2);
    		return num1 * num2;
    	}
    
    	public int div(int num1, int num2) {
    		System.out.println("日记 :【div】 方法调用前 。参数1是:" + num1 + " , 参数2是:" + num2);
    		return num1 / num2;
    	}
    
    	public int sub(int num1, int num2) {
    		System.out.println("日记 :【sub】 方法调用前 。参数1是:" + num1 + " , 参数2是:" + num2);
    		return num1 - num2;
    	}
    }
    

      测试的代码

    public class CalculatorTest {
    	public static void main(String[] args) {
    		Calculate calculate = new Calculator();
    		int result = calculate.add(12, 12);
    		System.out.println("相加的结果:" + result);
    		
    		result = calculate.mul(12, 12);
    		System.out.println("相乘的结果:" + result);
    	}
    }
    

    在这里插入图片描述

      上面这种方法加日记处理操作。日记的代码就会耦合到业务代码中。而且后期如果需要修改日记就需要去指的修改所有方法中的日记操作。这个维护操作非常不方便。
    可以说是一个很失败的例子。

    原始方法统一日记处理。

      把日记的内容封装到一个类去中集中处理。

    编写一个日记处理工具类

    public class LogUtil {
    	 /**
         * 记录前置的日志操作
         * @param method 当前运算操作
         * @param args 当前运算参数
         */
        public static void logBefore(String method, Object ... args){
            System.out.println("操作运算是 : " + method + " 参数是 : " + Arrays.asList(args));
        }
    
        /**
         * 返回日志操作
         * @param method 当前方法
         * @param result 当前操作返回值
         */
        public static void logAfterReturning(String method, Object result){
            System.out.println("当前操作运算时 : " + method + " 返回值是 : " + result);
        }
    
        /**
         * 当前操作产生的异常
         * @param method 当前操作
         * @param e 发生的异常
         */
        public static void logAfterThrowing(String method, Exception e){
            System.out.println("当前运算时 : " + method + " 发生的异常是 : " + e);
        }
    }
    
    

      修改原来Calculator中的日记代码

    @Override
    	public int add(int num1, int num2) {
    		LogUtil.log("add", num1, num2);
    		return num1 + num2;
    	}
    
    	@Override
    	public int mul(int num1, int num2) {
    		LogUtil.log("mul", num1, num2);
    		return num1 * num2;
    	}
    

      但是这种方式的不足之处是,每有一个需要加日记的类,都需要到类的代码中去添加日记功能代码。
    无法做到所有对象都统一处理。

    使用代理实现日记

    使用jdk动态代理实现日记

    创建一个计算器代理工具类

    public class CalculateProxyFactory {
    
        public static Object getProxy(final Calculate target) {
    
            /**
             * Proxy 是Jdk中自带的一个工具类(反射包下,属于反射的功能).
             * Proxy类的作用: 它可以帮我们创建代理类或实例
             * 方法newProxyInstance()说明: 创建代理对象实例
             * 第一个参数是: 目标对象的类加载器
             * 第二个参数是: 目标对象实现的所有接口
             * 第三个参数是: InvocationHandler 接口的实例
             * InvocationHandler 接口的实现类可以对代理的目标对象方法进行增强操作.
             * 代理的目标对象 ===>>> 需要额外增加功能的类(对象实例)
             * 增强操作 ===>>> 给原来功能添加的额外功能叫增强操作 ( 日记就是增强操作 )
             */
            return Proxy.newProxyInstance(
                    target.getClass().getClassLoader(),
                    target.getClass().getInterfaces(),
                    new InvocationHandler() { // 匿名内部类
                        /**
                         * invoke 方法是 InvocationHandler 接口中唯一的方法
                         * 代理对象每次调用方法时,都会执行 invoke() 方法 , 所有的增强操作都需要在invoke()方法中完成
                         * @param proxy  代理对象实例
                         * @param method 代理调用的方法的反射 Method 对象实例
                         * @param args  调用代理方法时传递进来的参数
                         * @return
                         * @throws Throwable
                         */
                        @Override
                        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                            System.out.println("代理调用了 invoke 方法 ");
                            System.out.println(method);  //打印方法信息
                            System.out.println(Arrays.asList(args)); //打印参数信息
                            // invoke() 方法执行代理对象的(加法 / 除法 / 增强日志)操作
                            Object result = null;
                            LogUtil.logBefore(method.getName(), args);
                            try {
                                // 1. 返回值是 method 方法调用时的返回值
                                result = method.invoke(target, args);
                                // 2. 增强操作
                                LogUtil.logAfterReturning(method.getName(), result);
                            }catch (Exception e){
                                LogUtil.logAfterThrowing(method.getName(), e);
                            }
                            // invoke() 返回代理方法的返回值
                            return result;
                        }
                    });
        }
    }
    
    

    测试代码:

    // 测试代码
     public static void main(String[] args) {
      // 目标对象
      Calculate target = new CalculateImpl();
      // 创建 Calculate 的代理对象实例
      Calculate calculateProxy = (Calculate) createJDKProxy(target );
      // jdk动态代理对象实例和目标对象实例 同宗同族 ( 他们都实现了相同的接口 )
      System.out.println(calculateProxy instanceof Calculate);
      System.out.println(target instanceof Calculate);
     
      System.out.println( "代理方法的结果是 : " + calculateProxy.div(100,20) );
     
      // jdk动态代理创建出来的代理对象实例 是 目标对象 接口的一个实现类
      // 这个代理对象 和 目标对象类没有父子关系 ( 只能用接口接收代理对象 )
     }
    

      优点:这种方式已经解决我们前面所有日记需要的问题。非常的灵活。而且可以方便的在后期进行维护和升级。
      缺点:当然使用jdk动态代理,需要有接口。如果没有接口。就无法使用jdk动态代理。

    使用cglib代理

    public class CGLibProxyFactory implements MethodInterceptor {
    
    	public static Object getCGLibProxy(Object target, Callback callback) {
    		// 创建一个CGLig生成器
    		Enhancer enhancer = new Enhancer();
    		// 设置父类。因为cglib是通过类,进行代码,不是通过接口
    		enhancer.setSuperclass(target.getClass());
    		// 设置拦截的代理方法
    		enhancer.setCallback(callback);
    		// create 方法创建一个代理对象并返回
    		return enhancer.create();
    	}
    
    	@Override
    	public Object intercept(Object proxy, Method method, Object[] params, MethodProxy methodProxy)
    			throws Throwable {
    		LogUtil.log(method.getName(), (int) params[0], (int) params[1]);
    		// 调用实际的对象的方法
    		// 一定要使用methodProxy对象
    		// 第一个参数是proxy代码对象的父类方法
    		Object result = methodProxy.invokeSuper(proxy, params);
    		System.out.println("这是后置代码");
    
    		return result;
    	}
    
    	public static void main(String[] args) {
    		Calculator calculator = (Calculator) CGLibProxyFactory.getCGLibProxy(new Calculator(),
    				new CGLibProxyFactory());
    		calculator.add(12, 13);
    	}
    }
    
    

    优点:在没有接口的情况下,同样可以实现代理的效果。
    缺点:同样需要自己编码实现代理全部过程。
    但是为了更好的整合Spring框架使用。所以我们需要学习一下Spring 的AOP 功能。

    AOP编程的专业术语

    通知(Advice)
      通知就是增强的代码。比如前置增强的代码。后置增强的代码。异常增强代码。这些就叫通知
    切面(Aspect)
    切面就是包含有通知代码的类叫切面。

    横切关注点
      横切关注点,就是我们可以添加增强代码的位置。比如前置位置,后置位置,异常位置。和返回值位置。这些都叫横切关注点。
    目标(Target)
      目标对象就是被关注的对象。或者被代理的对象。

    代理(Proxy)
      为了拦截目标对象方法,而被创建出来的那个对象,就叫做代理对象。

    连接点(Joinpoint)
      连接点指的是横切关注点和程序代码的连接,叫连接点。

    切入点(pointcut)
      切入点指的是用户真正处理的连接点,叫切入点。

      在Spring中切入点通过org.springframework.aop.Pointcut 接口进行描述,它使用类和方法作为连接点的查询条件。
    在这里插入图片描述

    使用Spring实现AOP简单切面编程

    使用ProxyFactoryBean进行AOP

    目标类:

    package com.zy.entity;
    
    
    import org.springframework.stereotype.Component;
    
    import java.io.Serializable;
    
    /**
     * @ClassName: Student
     * @Author: Tiger
     * @Title:
     * @Datetime: 2020/8/19   14:24
     * @Package: com.zy.entity
     */
    @Component
    public class Student implements Serializable ,StudentInterface{
        private Integer sid;
        private String name;
    //getset、tostring略
        public void study(){
            System.out.println("我在认真学习");
        }
        public void smoke(){
            System.out.println("吸烟有害健康");
        }
        public void show(){
            System.out.println("介绍一下我自己!");
        }
    }
    
    

    创建通知类:(封装辅助功能的类)

      BeforeAdvice:目标类方法执行前要执行的代码 实现接口MethodBeforeAdivce

    public class MyBeforeAdvice implements MethodBeforeAdvice {
        @Override
        public void before(Method method, Object[] objects, Object o) throws Throwable {
            System.out.println("目标方法"+method.getName()+"("+ Arrays.toString(objects) +")执行前要执行的代码");
        }
    }
    

      AfterAdivce:目标类方法执行后要执行的代码: 实现接口AfterReturningAdvice

    public class MyAfterAdvice implements AfterReturningAdvice {
        @Override
        public void afterReturning(Object o, Method method, Object[] objects, Object o1) throws Throwable {
            System.out.println("目标方法"+method.getName()+"("+ Arrays.toString(objects) +")执行之后要执行的代码");
            System.out.println(o+":::"+o1);
        }
    }
    

      AroundAdivce:目标类方法执行前后要执行的代码: 实现接口MethodIntercoptor

    public class MyInterceptor implements MethodInterceptor {
        @Override
        public Object invoke(MethodInvocation arg0) throws Throwable {
            Method method = arg0.getMethod();
            System.out.println(method+"方法的around通知1111");
            Object proceed = arg0.proceed();
            System.out.println(method+"方法的around通知2222");
            return proceed;
        }
    }
    

      ThrowAdivce:目标类方法异常时要执行的代码:实现接口ThrowsAdivce
    在这里插入图片描述

    public class MyThrowAdvice implements ThrowsAdvice {
        public void afterThrowing(Method method, Object[] objects, Object o1,Exception e){
            System.out.println(method.getName()+"出现异常,异常是:"+e.getMessage());
        }
    }
    
    

      配置问价中创建目标bean,通知bean创建bean

    <bean id="s1" class="com.zy.entity.Student">
            <property name="sid" value="112"></property>
            <property name="name" value="小脑斧"></property>
        </bean>
        <bean id="before" class="com.zy.aop.MyBeforeAdvice"/>
        <bean id="after" class="com.zy.aop.MyAfterAdvice"/>
        <bean id="around" class="com.zy.aop.MyInterceptor"/>
        <bean id="throw" class="com.zy.aop.MyThrowAdvice"/>
    <!--    使用proxyfactorybean吧bean组合成一个代理对象-->
        <bean id="studentProxy" class="org.springframework.aop.framework.ProxyFactoryBean">
    <property name="target" ref="s1"/>
            <property name="interceptorNames">
                <list>
                    <value>before</value>
                    <value>after</value>
                    <value>around</value>
                    <value>throw</value>
                </list>
            </property>
        </bean>
    

      为目标类创建接口

    public interface StudentInterface {
        void show();
        void smoke();
        void study();
    }
    

        测试;

     @Test
        public void test1(){
            ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
            StudentInterface student = (StudentInterface) context.getBean("studentProxy");
            student.show();
            student.study();
            student.smoke();
        }
    

    在这里插入图片描述
    在这里插入图片描述

    Spring中基于AOP的XML架构

    1.1 ioc和aop jar包
    1.2 创建目标bean对应的类 并提供接口
    在这里插入图片描述1.3 对应核心配置文件:引入context和aop标签
    在这里插入图片描述1.4 创建通知bean 其包含所有通知类型对应的方法

    @Component
    public class MyAdvice {
        public void afterMethod1() {
            System.out.println("public void afterMethod1()111");
        }
    
        public void afterMethod2() {
            System.out.println("public void afterMethods2()222");
        }
    
        public void beforeMethod() {
            System.out.println("public void beforeMethod()000");
        }
    
        public Object aroundMethod(ProceedingJoinPoint pjp) {
            System.out.println("方法运行前");
            Object result = null;
            try {
                result = pjp.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
            System.out.println("public void aroundMethod()后");
            return result;
        }
        public void throwsMethod(Exception e){
            System.out.println("public void throwsMethod()!!!!!!!  异常原因:"+e.getMessage());
        }
    

    1.5 在核心装配文件中为目标类和通知类创建bean
    和通过aop标签 设置切面(通知+切入点)

      <!-- 创建目标bean -->
        <bean id = "s2" class="com.zy.entity.Student">
            <property name="name" value="小脑斧"></property>
            <property name="sid" value="34"></property>
        </bean>
        <bean id = "s3" class="com.zy.entity.Student">
            <property name="name" value="大狮子"></property>
            <property name="sid" value="24"></property>
        </bean>
        <!--======================================================-->
    <!--    为通知类创建bean-->
        <bean id="myAdvice" class="com.zy.advice.MyAdvice"/>
    <!--    通过aop标签,声明要在哪些对象的哪些方法上添加哪些通知-->
        <aop:config>
            <aop:aspect ref="myAdvice"> <!-- 声明通知额bean-->
    <!--            声明从切入点-->
                <aop:pointcut id="myPointCut" expression="execution(* com.zy.entity.*.*(..))"/>
                <aop:before  pointcut-ref="myPointCut" method="beforeMethod"/>
                <aop:after  pointcut-ref="myPointCut" method="afterMethod1"/><!-- 指定每种通知对应的方法 -->
                <aop:after-returning pointcut-ref="myPointCut"  method="afterMethod2"/>
                <aop:after-throwing  pointcut-ref="myPointCut" method="throwsMethod"  throwing="e"/>
                <aop:around  pointcut-ref="myPointCut" method="aroundMethod"/>
            </aop:aspect>
        </aop:config>
    

    测试:
    在这里插入图片描述
    在这里插入图片描述

    Spring中基于AOP的@AspectJ

      1.1 在核心配置文件中设置自动扫描包+自动代理
    在这里插入图片描述

       1.2 给目标类添加compontent注解
    在这里插入图片描述

       1.3 给通知类添加compontent注解和aspect注解(指定此类为通知类)
    在这里插入图片描述

      1.4 给通知类中的方法添加注解 指定对应的通知类型和切入点

    @Component("myAdvice2")
    @Aspect//声明此类是通知类
    public class MyAdvice2 {
        @After("execution(* com.zy.entity.*.*(..))")
        public void afterMethod1() {
            System.out.println("public void afterMethod1()1111");
        }
    
        @AfterReturning("execution(* com.zy.entity.*.*(..))")
        public void afterMethod2() {
            System.out.println("public void afterMethod2()22222222");
        }
    
        @Before("execution(* com.zy.entity.*.*(..))")
        public void beforeMethod() {
            System.out.println("public void beforeMethod()0000");
        }
    
        @Around("execution(* com.zy.entity.*.*(..))")
        public Object aroundMethod(ProceedingJoinPoint pjp) {
            System.out.println("public void aroundMethod()前");
            Object result = null;
            try {
                result = pjp.proceed();
            } catch (Throwable e) {
                throw new RuntimeException(e);
            }
            System.out.println("public void aroundMethod()后");
            return result;
        }
    
        //throwing  ::指定参数列表的变量名
        @AfterThrowing(value = "execution(* com.zy.entity.Student.show(..))", throwing = "e")
        public void throwsMethod(Exception e) {
            System.out.println("public void throwsMethod()!!!!!!!  异常原因:" + e.getMessage());
        }
    }
    

      1.5 测试:
    在这里插入图片描述

    spring事务操作

    什么是事务

    1)事务是数据库操作最基本单元,逻辑上一组操作,要么都成功,如果有一个失败所有操作都失败
    (2)典型场景:银行转账

    • lucy 转账 100 元 给 mary
    • lucy 少 100,mary 多 100
      2 、事务四个特性(ACID )
      (1)原子性
      (2)一致性
      (3)隔离性
      (4)持久性

    事务操作( (Spring 事务管理介绍)

    1 、事务添加到 JavaEE 三层结构里面 Service 层(业务逻辑层)
    2 、在 Spring 进行事务管理操作
    ( (1)有两种方式: )有两种方式:编程式事务管理和声明式事务管理(使用) 和声明式事务管理(使用)
    3 、声明式事务管理
    ( (1)基于注解方式 )基于注解方式(使用) (使用)
    (2)基于 xml 配置文件方式
    4 、在 Spring 进行声明式事务管理,底层使用 AOP 原理
    5 、Spring 事务管理 API
    (1)提供一个接口,代表事务管理器,这个接口针对不同的框架提供不同的实现类
    在这里插入图片描述

    注解声明式事务操作

    1、在spring配置文件中配置事务管理器

    <!--    创建事务管理器-->
        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <!--        注入数据源-->
            <property name="dataSource" ref="dataSource"/>
        </bean>
    

    2、在spring配置文件中,开启事务注解
    (1)在spring配置文件中引入名称空间tx
    (2)开启事务注解

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:P="http://www.springframework.org/schema/p"
           xmlns:util="http://www.springframework.org/schema/util"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:tx="http://www.springframework.org/schema/cache"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/util https://www.springframework.org/schema/util/spring-util.xsd http://www.springframework.org/schema/context https://www.springframework.org/schema/context/spring-context.xsd http://www.springframework.org/schema/cache http://www.springframework.org/schema/cache/spring-cache.xsd">
    
    

    (2)开启事务注解

    <!--开启事务注解-->
    < tx:annotation- driventransaction-manager= "transactionManager"></ tx:annotation-driven>
    

    3 、在 service 类上面( 类上面(或者 或者 service 类里面方法上面)添加事务注解
    (1)@Transactional,这个注解添加到类上面,也可以添加方法上面
    (2)如果把这个注解添加类上面,这个类里面所有的方法都添加事务
    (3)如果把这个注解添加方法上面,为这个方法添加事务

    @Service
    @Transactional
    public class UserService {
    

    xml声明事务管理

    1 、在 spring 配置文件中进行配置
    第一步 配置事务管理器
    第二步 配置通知
    第三步 配置切入点和切面

      <context:component-scan base-package="com.zy"/>
        <!--    创建事务管理器-->
        <bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
            <!--        注入数据源-->
            <property name="dataSource" ref="dataSource"/>
        </bean>
        <!--2 配置通知-->
        <tx:advice id="txAdvice">
            <!--配置事务参数-->
            <tx:attributes><!--指定哪种规则的方法上面添加事务-->
                <tx:method name="accountMoney" propagation="REQUIRED"/>
                <!--<tx:method name="account*"/>-->
            </tx:attributes>
        </ tx:advice>
    
        <!--3 配置切入点和切面-->
        <aop:config>
            <!--配置切入点-->
            <aop:pointcut id="pt" expression="execution(*
    com.atguigu.spring5.service.UserService.*(..))"/>
            <!--配置切面-->
            <aop:advisor advice- ref="txadvice" pointcut- ref="pt"/>
        </aop:config>
    

    事务操作(完全注解声明式事务管理)

    @Configuration
    @EnableTransactionManagement//开启事务
    @ComponentScan(basePackages = "com.zy")
    public class TXConfig {
    
        //创建数据库连接池
        @Bean
        public DruidDataSource getDruidDataSource() {
            DruidDataSource dataSource = new DruidDataSource();
            dataSource.setDriverClassName("com.mysql.jdbc.Driver");
            dataSource.setUrl("jdbc:mysql:///user_db");
            dataSource.setUsername("root");
            dataSource.setPassword("root");
            return dataSource;
        }
    
        @Bean
        public JdbcTemplate getJdbcTemplate(DataSource dataSource) {
            JdbcTemplate jdbcTemplate = new JdbcTemplate();
            jdbcTemplate.setDataSource(dataSource);
            return new JdbcTemplate();
        }
    
        //创建事务管理器
        @Bean
        public DataSourceTransactionManager
        getDataSourceTransactionManager(DataSource dataSource) {
            DataSourceTransactionManager transactionManager = new
                    DataSourceTransactionManager();
            transactionManager.setDataSource(dataSource);
            return transactionManager;
        }
    }
    
    

    Webflux

    1 、SpringWebflux 介绍

    (1)是 Spring5 添加新的模块,用于 web 开发的,功能和 SpringMVC 类似的,Webflux 使用
    当前一种比较流程响应式编程出现的框架。
    (2)使用传统 web 框架,比如 SpringMVC,这些基于 Servlet 容器,Webflux 是一种异步非阻
    塞的框架,异步非阻塞的框架在 Servlet3.1 以后才支持,核心是基于 Reactor 的相关 API 实现的。

    解释什么是异步非阻塞

    • 异步和同步
    • 非阻塞和阻塞
    • 上面都是针对对象不一样
    • 异步和同步针对调用者 异步和同步针对调用者,调用者发送请求,如果等着对方回应之后才去做其他事情就是同
      步,如果发送请求之后不等着对方回应就去做其他事情就是异步
    • 阻塞和非阻塞针对被调用者 阻塞和非阻塞针对被调用者,被调用者受到请求之后,做完请求任务之后才给出反馈就是阻
      塞,受到请求之后马上给出反馈然后再去做事情就是非阻塞

    Webflux 特点:

    第一 非阻塞式:在有限资源下,提高系统吞吐量和伸缩性,以 Reactor 为基础实现响应式编程
    第二 函数式编程:Spring5 框架基于 java8,Webflux 使用 Java8 函数式编程方式实现路由请求
    (5)比较 SpringMVC
    第一 两个框架都可以使用注解方式,都运行在 Tomet 等容器中
    第二 SpringMVC 采用命令式编程,Webflux 采用异步响应式编程

    2、响应式编程 、响应式编程( (Java 实现)

    (1)什么是响应式编程

    响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便
    地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
    电子表格程序就是响应式编程的一个例子。单元格可以包含字面值或类似"=B1+C1"的公
    式,而包含公式的单元格的值会依据其他单元格的值的变化而变化。

    (2)Java8 及其之前版本

    • 提供的 观察者模式两个类 Observer 和 Observable、
    public class ObserverDemo extends Observable {
        public static void main(String[] args) {
            ObserverDemo observer =new ObserverDemo();
            //添加观察者
            observer.addObserver((o,arg)->{
                System.out.println("发生了变化");
            });
            observer.addObserver((o,arg)->{
                System.out.println("手动被观察者的通知,准备发生变化");
            });
    
            observer.setChanged();//监控数据变化
            observer.notifyObservers();//通知
        }
    }
    
    

    3、响应式编程 、响应式编程( (Reactor 实现)

    ( (1 )响应式编程操作中,Reactor 是满足 Reactive 规范框架
    ( (2 )Reactor 有两个核心类,Mono 和 和 Flux ,这两个类实现接口 Publisher,提供丰富操作 ,提供丰富操作
    符。 符。Flux 对象实现发布者,返回 N 个元素;Mono 实现发布者,返回 0 或者 1 个元素
    (3)Flux 和 Mono 都是数据流的发布者,使用 Flux 和 Mono 都可以发出三种数据信号:
    元素值,错误信号,完成信号,错误信号和完成信号都代表终止信号,终止信号用于告诉
    订阅者数据流结束了,错误信号终止数据流同时把错误信息传递给订阅者
    在这里插入图片描述

    (4 )代码演示 Flux 和 和 Mono
    第一步 引入依赖

    < dependency>
    < groupId>io.projectreactor</ groupId>
    < artifactId>reactor-core</ artifactId>
    < version>3.1.5.RELEASE</ version>
    </ dependency>
    

    第二步 编程代码

    public class ReactorTest {
        //just声明元素
        public static void main(String[] args) {
            Flux.just(1, 2, 3, 4, 5, 6);
            Mono.just(1);
            //其他方法
            Integer[] a = {1, 2, 3, 4, 5, 6};
            Flux.fromArray(a);
            List<Integer> list = Arrays.asList(a);
            Flux.fromIterable(list);
            Stream<Integer> stream = list.stream();
            Flux.fromStream(stream);
    
        }
    }
    

    (5)三种信号特点

    • 错误信号和完成信号都是终止信号,不能共存的
    • 如果没有发送任何元素值,而是直接发送错误或者完成信号,表示是空数据流
    • 如果没有错误信号,没有完成信号,表示是无限数据流
      (6)调用 just 或者其他方法只是声明数据流,数据流并没有发出,只有进行订阅之后才会触发数据流,不订阅什么都不会发生的
      在这里插入图片描述

    (7 )操作符

    • 对数据流进行一道道操作,成为操作符,比如工厂流水线
      第一 map 元素映射为新元素
      在这里插入图片描述第二 flatMap 元素映射为流
      ⚫ 把每个元素转换流,把转换之后多个流合并大的流
      在这里插入图片描述

    4 、SpringWebflux 执行流程和核心 API

    SpringWebflux 基于 Reactor,默认使用容器是 Netty,Netty 是高性能的 NIO 框架,异步非阻塞的框架。
    (1)Netty

    • BIO(Blocking IO阻塞io)
    • 在这里插入图片描述

    NIO(non-blocking IO非阻塞io)
    在这里插入图片描述

    (2)SpringWebflux 执行过程和 SpringMVC 相似的

    • SpringWebflux 核心控制器 DispatchHandler,实现接口 WebHandler
    • 接口 WebHandler 有一个方法
      在这里插入图片描述在这里插入图片描述(3)SpringWebflux 里面 DispatcherHandler,负责请求的处理
    • HandlerMapping:请求查询到处理的方法
    • HandlerAdapter:真正负责请求处理
    • HandlerResultHandler:响应结果处理
      (4)SpringWebflux 实现函数式编程,两个接口:RouterFunction(路由处理)
      和 HandlerFunction(处理函数)

    5 、SpringWebflux (基于注解编程模型)

    SpringWebflux 实现方式有两种:注解编程模型和函数式编程模型
    使用注解编程模型方式,和之前 SpringMVC 使用相似的,只需要把相关依赖配置到项目中,
    SpringBoot 自动配置相关运行容器,默认情况下使用 Netty 服务器
    第一步 创建 SpringBoot 工程,引入 Webflux 依赖
    在这里插入图片描述在这里插入图片描述第二步配置端口号在这里插入图片描述第三步 创建包和相关类
    ⚫ 实体类
    ⚫ 创建接口定义操作的方
    在这里插入图片描述⚫ 创建接口定义操作的方法

    public interface UserService {
        //根据id查用户
        Mono<User> getUserById(Integer id);
    
        //查询所有
        Flux<User> getAllUser();
    
        //添加用户
        Mono<Void> insertUser(Mono<User> user);
    }
    

    ⚫ 接口实现类

    @Repository
    public class UserServiceIml implements UserService {
    
        private final Map<Integer, User> users = new HashMap<>();
    
        public UserServiceIml() {
            this.users.put(1, new User("小脑斧", 21, "nan"));
            this.users.put(2, new User("大脑斧", 21, "nan"));
            this.users.put(3, new User("武松打虎", 21, "nan"));
        }
    
        @Override
        public Mono<User> getUserById(Integer id) {
    
            return Mono.just(this.users.get(id));
        }
    
        @Override
        public Flux<User> getAllUser() {
            return Flux.fromIterable(this.users.values());
        }
    
        @Override
        public Mono<Void> insertUser(Mono<User> userMono) {
    
            return userMono.doOnNext(person -> {
    //向 map 集合里面放值
                int id = users.size() + 1;
                users.put(id, person);
            }).thenEmpty(Mono.empty());
        }
    }
    

    ⚫ 创建 controller

    @RestController
    public class UserController {
    
        @Autowired
        private UserService userService;
    
        @GetMapping("/user/{id}")
        public Mono<User> getUser(@PathVariable Integer id) {
            return userService.getUserById(id);
        }
    
        @GetMapping("/user/all")
        public Flux<User> findAll() {
            return userService.getAllUser();
        }
    
        @PostMapping("/saveUser")
        public Mono<Void> save(@RequestBody User userMono) {
            Mono<User> mono = Mono.just(userMono);
            return userService.insertUser(mono);
    
        }
    }
    

    在这里插入图片描述⚫ 说明
    SpringMVC 方式实现,同步阻塞的方式,基于 SpringMVC+Servlet+Tomcat
    SpringWebflux 方式实现,异步非阻塞 方式,基于 SpringWebflux+Reactor+Netty

    6 、SpringWebflux (基于函数式编程模型)

    (1)在使用函数式编程模型操作时候,需要自己初始化服务器
    (2)基于函数式编程模型时候,有两个核心接口:RouterFunction(实现路由功能,请求转发
    给对应的 handler)和 HandlerFunction(处理请求生成响应的函数)。核心任务定义两个函数
    式接口的实现并且启动需要的服务器。
    ( 3 ) SpringWebflux 请 求 和 响 应 不 再 是 ServletRequest 和 ServletResponse , 而 是
    ServerRequest 和 ServerResponse
    第一步 把注解编程模型工程复制一份 ,保留 entity 和 service 内容
    第二步 创建 Handler(具体实现方法)

    import org.springframework.http.MediaType;
    import org.springframework.web.reactive.function.server.ServerRequest;
    import org.springframework.web.reactive.function.server.ServerResponse;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import static org.springframework.web.reactive.function.BodyInserters.fromObject;
    public class UserHandler {
    
        private final UserService userService;
    
        public UserHandler(UserService userService) {
            this.userService = userService;
        }
    
        //根据id查询
        public Mono<ServerResponse> getUserById(ServerRequest request) {
            //获取id
            Integer id = Integer.valueOf(request.pathVariable("id"));
            //空值处理
            Mono<ServerResponse> notFound = ServerResponse.notFound().build();
            //查询数据
            Mono<User> user = this.userService.getUserById(id);
    
            return
                    user
                            .flatMap(person -> ServerResponse.ok()
                                    .contentType(MediaType.APPLICATION_JSON).
                                            body(fromObject(person))
                                    .switchIfEmpty(notFound));
    
    
        }
    
    
        //添加操作
        public Mono<ServerResponse> saveUser(ServerRequest request) {
            //得到user对象
            Mono<User> userMono = request.bodyToMono(User.class);
            return ServerResponse.ok().build(this.userService.insertUser(userMono));
        }
    
        //查询所有
        public Mono<ServerResponse> getAllUsers(ServerRequest request) {
            Flux<User> user = this.userService.getAllUser();
            return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON).body(user, User.class);
        }
    }
    
    

    第三步 初始化服务器,编写 Router
    ⚫ 创建路由的方法

    public class Server {
        //创建Router路由
        public RouterFunction<ServerResponse> routingFunction(){
            UserService userService = new UserServiceIml();
            UserHandler handler = new UserHandler(userService);
    
            return RouterFunctions. route (
                    GET ( "/users/{id}").and( accept ( APPLICATION_JSON )),handler::getUserById)
                    .andRoute( GET ( "/users").and( accept ( APPLICATION_JSON )),handler::getAllUsers);
        }
    }
    
    

    ⚫ 创建服务器完成适配

    //创建服务器完成适配
        public void creatReactorServer(){
            //路由和适配器
            RouterFunction<ServerResponse> route = routingFunction();
            HttpHandler httpHandler = toHttpHandler (route);
            ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(httpHandler);
    
            //创建服务器
            HttpServer httpServer = HttpServer. create ();
            httpServer.handle(adapter).bindNow();
    
        }
    

    ⚫ 最终调用

    public static void main(String[] args) throws IOException {
            Server server = new Server();
            server.creatReactorServer();
            System.out.println("enter to continue");
            System.in.read();
        }
    
    

    在这里插入图片描述
    在这里插入图片描述使用webclient调用

    public class MyClient {
        public static void main(String[] args) {
            //调用服务器地址
            WebClient webClient = WebClient.create("https//127.0.0.1:64803");
            //根据id查询
            String id = "1";
            User user = webClient.get().uri("/users/{id}", id).accept(MediaType.APPLICATION_JSON).retrieve().bodyToMono(User.class).block();
            System.out.println(user);
    
    
            //查询所有
            Flux<User> userFlux = webClient.get().uri("/users").accept(MediaType.APPLICATION_JSON).retrieve().bodyToFlux(User.class);
    
            userFlux.map(stu -> stu.getName()).buffer().doOnNext(System.out::println).blockFirst();
        }
    }
    
    展开全文
  • WebFlux 描述 Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版本中添加的。它是完全非阻塞的,支持 Reactive...

    WebFlux

    描述

    Spring Framework 中包含的原始 Web 框架 Spring Web MVC 是专门为 Servlet API 和 Servlet 容器构建的。反应式堆栈 Web 框架 Spring WebFlux 是在 5.0 版本中添加的。它是完全非阻塞的,支持 Reactive Streams背压,并在 Netty、Undertow 和 Servlet 3.1+ 容器等服务器上运行。

    官方文档: SpringWebFlux

    概述

    为什么创建 Spring WebFlux?

    部分答案是需要一个非阻塞的 Web 堆栈来处理具有少量线程的并发并使用较少的硬件资源进行扩展。Servlet 3.1 确实为非阻塞 I/O 提供了 API。但是,使用它会远离 Servlet API 的其余部分,其中契约是同步 ( Filter, Servlet) 或阻塞 ( getParameter, getPart)。这是一个新的通用 API 作为任何非阻塞运行时的基础的动机。这很重要,因为服务器(例如 Netty)在异步、非阻塞空间中建立良好。

    答案的另一部分是函数式编程。就像 Java 5 中添加注释创造了机会(例如带注释的 REST 控制器或单元测试)一样,Java 8 中添加的 lambda 表达式为 Java 中的函数式 API 创造了机会。这对于允许异步逻辑的声明式组合的非阻塞应用程序和延续式 API(由CompletableFutureReactiveX推广)是一个福音。在编程模型级别,Java 8 使 Spring WebFlux 能够提供功能性 Web 端点以及带注释的控制器。

    并发模型

    Spring MVC 和 Spring WebFlux 都支持带注释的控制器,但在并发模型和阻塞和线程的默认假设上有一个关键的区别。

    在 Spring MVC(以及一般的 servlet 应用程序)中,假设应用程序可以阻塞当前线程(例如,用于远程调用)。出于这个原因,servlet 容器在请求处理期间使用一个大型线程池来吸收潜在的阻塞。

    在 Spring WebFlux(以及一般的非阻塞服务器)中,假设应用程序不会阻塞。因此,非阻塞服务器使用一个小的、固定大小的线程池(事件循环工作者)来处理请求。

    整合

    pom.xml

    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-webflux</artifactId>
    		</dependency>
    

    yml配置: 加入 mybatis 操作数据库:

    spring:
      datasource:
        username: root
        password: root
        url: jdbc:mysql://localhost/user?serverTimezone=UTC&useSSL=false&allowMultiQueries=true
        driver-class-name: com.mysql.cj.jdbc.Driver
    ## 服务端口
    server:
      port: 8080
    
    ##扫描的xml 路径和 实体类
    mybatis:
      mapper-locations: classpath:mapper/*Mapper.xml,
      type-aliases-package: com.example.springbootwebfluxintegrat.bean
    
    

    UserController 控制层:

        @Autowired
        private IUserService iUserService;
    
        /**
         * 获取用户信息:
         * @param userId
         * @return
         */
        @GetMapping("/getUser")
        public Mono<User> getUser(Integer userId) {
            return Mono.just(iUserService.getById(userId));
        }
    
        /**
         * 删除用户信息:
         * @param userId
         */
        @DeleteMapping("/deleteUser")
        public void deleteUser(Long userId) {
            iUserService.removeById(userId);
        }
    
    
        @GetMapping("list")
        public Flux<List<User>> list(){
            return Flux.just(iUserService.list());
        }
    
    
        /**
         * 分页列表查询:
         * @param page
         * @param rows
         * @return
         */
        @GetMapping("getPageUser")
        public Flux<IPage<User>> getPageUser(Integer page, Integer rows){
            Page<User> userPage = new Page<User>();
            userPage.setCurrent(page);
            userPage.setSize(rows);
            IPage<User> userIPage = iUserService.selectUserPage(userPage);
            return Flux.just(userIPage);
        }
    
    

    实例代码在github上面需要的请自行拉取:spring-boot-integrate 然后后续会集成更多的模块进去,需要请点个star。后续会集成更多的接口实现,有需要的请保存。

    如果这篇文章,有帮助到大家的,请给作者一个一键三连,谢谢

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,572
精华内容 1,828
关键字:

webflux事务