精华内容
下载资源
问答
  • springboot 集成 vertx-kafka-client

    千次阅读 2021-01-10 13:55:18
    springboot 集成 vertx-kafka-client 为什么尝试做这个集成 vertx是一套封装了netty的异步事件驱动的框架,netty采用的线程模型可以高效处理某些情况下的网络通讯,然而这套框架需要程序员使用函数编程的方式,不是...

    为什么尝试做这个集成

    vertx是一套封装了netty的异步事件驱动的框架,netty采用的线程模型可以高效处理某些情况下的网络通讯,然而这套框架需要程序员使用函数编程的方式,不是传统的方式。本项目主要是为了构建一个框架。熟悉springboot编程的程序员只需要通过注解或者接口编程的式就可以使用到 vertx-kafka-client。

    项目依赖

    集成demo采用的依赖如下,主要是spring-boot-starter-web和vertx-kafka-client。

    <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <version>2.2.2.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.73</version>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/io.vertx/vertx-kafka-client -->
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-kafka-client</artifactId>
                <version>4.0.0</version>
            </dependency>
    

    初始化生产者和消费者Bean

    初始化生产者

    初始化生产者的逻辑很简单,通过ApplicationContext取出所有的Consumer类,再通过反射取到消费者上的注解 MessageHandler所标注的信息(包括topic,msgType等)。在config类中,注册了消费者的事件,并进行topic监听。代码如下:

    @Bean
        public List<KafkaConsumer> kafkaConsumers(){
    // use consumer for interacting with Apache Kafka
            List<KafkaConsumer> kafkaConsumers = new ArrayList<>();
            Map<String, IKafkaHandler> consumerHandlers = this.context.getBeansOfType(IKafkaHandler.class);
            for(String kafkaHandlerBean : consumerHandlers.keySet()){
    
    
                //通过反射获取MessageHandler里的元信息
    
                try {
                    IKafkaHandler handler = consumerHandlers.get(kafkaHandlerBean);
                    Class clazz = handler.getClass();
                    Method handleMethod = clazz.getDeclaredMethod("handle",Object.class);
                    MessageHandler anno = handleMethod.getAnnotation(MessageHandler.class);
                    String consumerGroup = anno.consumerGroup();
                    Class msgType = anno.msgType();
                    Map<String, String> config = new HashMap<>();
                    config.put("bootstrap.servers", "localhost:9092");
                    config.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    config.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    if(Strings.isNotBlank(consumerGroup)) {
                        config.put("group.id", consumerGroup);
                    }
                    config.put("auto.offset.reset", "earliest");
                    config.put("enable.auto.commit", "false");
                    KafkaConsumer<String,String> consumer = KafkaConsumer.create(vertx, config);
    
                    String topic = anno.topic();
                    consumer.handler(message->{
                        String value = message.record().value();
                        String key = message.record().key();
                        try {
                            if (msgType.getSimpleName().equals("String")) {
                                handler.handle(value);
                            } else {
                                Object var1 = JSON.parseObject(value, msgType);
                                handler.handle(var1);
                            }
                        } catch (Exception e){
                            logger.info("consume error,msg = {}",value);
                        }
                    });
                    consumer.exceptionHandler(error->logger.info("consumer出错{}",error.toString()));
                    consumer.subscribe(topic);
                    kafkaConsumers.add(consumer);
                } catch (Exception e) {
                    logger.error("error",e);
                }
    
            }
            return kafkaConsumers;
        }
    
    初始化生产者

    生产者的初始化很简单,直接初始化一个Bean即可。代码如下:

        @Bean
        public KafkaProducer kafkaProducer(){
            Map<String, String> config = new HashMap<>();
            config.put("bootstrap.servers", "localhost:9092");
            config.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            config.put("acks", "1");
    
    // use producer for interacting with Apache Kafka
            KafkaProducer<String, String> producer = KafkaProducer.create(vertx, config);
            return producer;
        }
    

    如何使用消费者

    用户可以通过接口编程的方式的实现来进行 consumer 的使用。继承如下接口即可,使用 MessageHanlder 表明其 topic, 示例说明:

    public class Topic1Handler implements IKafkaHandler<Message> {
    
        private static final Logger logger = LoggerFactory.getLogger(Topic1Handler.class);
    
        /**
         * msgType 需要和 handle里的参数类型相同
         * @param message
         */
        @MessageHandler(topic = "topic1",msgType = Message.class)
        @Override
        public void handle(Message message) {
            logger.info("topic1 收到消息:{}",message);
        }
    }
    

    如何使用生产者

    通过注入producer即可实现一个默认配置的KafkaProducer,进行消息生产的代码。

    @Autowired
    KafkaProducer producer;
    

    以上便是大概的代码,在我的git上有完整的demo,大家可以看看。
    https://github.com/chifanchen/intergrate-springboot-with-vertx-kafka-client

    展开全文
  • 这两个密码需要设置的一致 项目介绍 springboot-vertx 是基于swak 的一个演示项目,主要演示了如下部分: 基本的启动框架 api的配置方式,例如AdminApi、AnnoApi、ParamApi、UserApi。 api权限的校验,简单的权限...

    项目第一步,配置jwt

    有三类算法:HS,RS,EC.

    两种场景:

    场景一: 如果创建token和验证token都在同一个项目中,则可以使用keystore的方式。

    见 swak-test项目下的 KeyStoreTes

    创建keystore 的方式:

    keystore文件可以存放在classpath目录或者文件系统,配置时指定存储的路径。

    keystore 加载规则:

    classpath 的配置方式:直接放在src/main/resources 目录下,打包时会打进包中。

    然后在代码中设置keystore 的名称即可。

    文件系统:file:path 这样的配置方式。

    场景二: 如果创建token和验证token需要分开使用不同的项目,则可以使用RS和EC的算法。

    见 swak-test项目下的RsaTest.java、EcTest.java、RsaReadOnlyTest.java

    支持公匙和私匙的配置方式。

    创建token则需要配置公匙和私匙。

    解密token则只需要配置公匙

    keytool -genseckey -keystore D:\keystore.jceks -storetype jceks -storepass secret -keyalg HMacSHA256 -keysize 2048 -alias HS256 -keypass secret

    请修改-storepass -keypass 设定的密码,这两个密码需要设置的一致

    项目介绍

    springboot-vertx 是基于swak 的一个演示项目,主要演示了如下部分:

    基本的启动框架

    api的配置方式,例如AdminApi、AnnoApi、ParamApi、UserApi。

    api权限的校验,简单的权限校验方式,配置方式AppConfiguration 第 48 行代码到第 54 行代码。

    参数的传递方式,详细见测试代码 ParamTest.java

    返回值的使用方式,详细见测试代码 ParamTest.java

    项目如何测试继承AppRunnerTest,详细见测试代码 ParamTest.java

    service 的配置 UserServiceImpl

    自定义的项目启动

    大家都很熟悉springboot的四大特性项目依赖,自动化配置。本人也非常喜欢,但不太喜欢引入过多的start.jar来引入组件。

    所以本人对spring做了一些简单的定制。将他的启动属性文件spring.factories 改为 swak.factories.

    改变的方法也很简单中有详细的源码:swak-starter。所有我们熟悉的组件都放在 com.swak.config 中来自启动。

    AppConfiguration.java -- 项目级别的启动配置

    AppRunner.java -- 启动入口,执行main方法

    执行注解com.swak.ApplicationBoot所引入的com.swak.selector.AutoConfigurationImportSelector,此类将会执行swak.factories中引入的启动配置。

    如果是web环境(依赖的jar有vertx的相关类)则会实例化 ReactiveServerApplicationContext,否则会实例化 AnnotationConfigApplicationContext。

    ReactiveServerApplicationContext 实例化之后会启动 ReactiveServer,vertx 开始初始化。

    vertx 的启动依赖如下:(swak-vertx)

    ReactiveServer -> MainVerticle -> ServiceVerticle(多个) -> HttpVerticle(一个多实例)

    如上是简单的解释,后续在完善。

    API的配置方式、参数处理、权限校验、返回值处理

    通过注解的方式来配置,类似springmvc 的方式.注解只有:PageController、RestController、PostMapping、GetMapping简单明了。

    PageController 和 RestController 没有过多的差别,只是说明此Controller是来做展示页面的。

    API 中最重要的类是 HandlerAdapter, 将api的path绑定到vertx中的 Router,调用api的method,处理参数,处理返回值等。

    响应式HTTP服务器的简单介绍

    何为响应式,说白了就是回调,IO处理好之后回调注册的方法。这样线程就不需要等待IO完成才继续执行后续代码.

    vertx 被称为java中的node.js,多线程版node.js。

    swak是基于vertx做了简单的封装,让我们能像使用springmvc一样使用vertx。

    响应式开发并不难,难的是需要弄清楚回调之后的代码在哪个线程中执行。以前我们开发ssm项目时,一个请求对应一个线程,从头走到尾,我们基本上不用理会当前线程是谁,执行是否需要花费很长时间。

    我觉得响应式最大的特点是在请求响应这条事务中可以任意(只是比如而已)的切换线程。

    例如,本例子中,Controller的执行在netty的eventloop线程中,UserServiceImpl 的执行在 vertx的work线程中。

    下面简单解释下这个执行过程。vertx 的关键点其内部的 Eventbus。可以简单的把他想象成为一个map。map 的key是接口类型,value是实现类对象的封装。

    Controller 调用 service 时,实际上是发送一个消息到 Eventbus,消息中包含service的接口类型,Eventbus获取相应的实现类对象并在相应线程中执行,将返回值通过Eventbus通过消息的形式返回。

    具体参见 InvokerHandler 第 73 行 发送消息代码。

    service 对象执行 参见 ServiceVerticle handle 部分

    项目中为啥会出现这两类接口

    UserService、UserServiceAsync

    UserService -- 和我们做ssm时后的service接口一样,用于同步返回,一帮用于jdbc 的操作接口。

    UserServiceAsync -- 异步返回版本,这个可以自动生成,但自动生成的代码返回值中的范型无法动态设置,所以现在都手动生成此异步接口。

    关于开发响应式系统的一些经验

    以前我们开发ssm时喜欢将系统分为三层的开发模式,Controller、service、dao。现在也一样也是三层。不同点在于Controller在eventloop线程中执行,service、dao和之前一样在work线程中执行,service、dao以前怎么开发现在也怎么开发,没变。

    Controller 调用 service 通过异步发消息的方式来调用。

    Controller 的 代码

    @RestController(path = "/api/user", value = "userApi")

    public class UserApi {

    @VertxReferer

    private UserServiceAsync userService;

    /**

    * 获取用户

    *

    * @param subject

    * @return

    */

    @GetMapping("/get")

    public CompletableFuture get(Subject subject) {

    return userService.get(subject.getIdAsLong()).thenApply(res -> Result.success(res));

    }

    }

    service 的代码,和之前一样使用spring声明式事务等。

    @VertxService

    public class UserServiceImpl implements UserService {

    @Override

    public User get(Long id) {

    return new User().setId(id);

    }

    }

    java的jdbc是同步执行的,spring的事务业务依赖当前线程的,所以不要在service中切换线程。

    其他一些io,例如redis,http 客户端,我都是用的基于netty的异步客户端。

    相关说明后续补充...

    SWAK 项目

    展开全文
  • vertx for springboot

    2018-08-29 14:58:08
    vertx for springboot 是基于vert.x集成spring boot框架,只需要启动自己的eureka service 就能启动
  • <groupId>io.vertx</groupId> <artifactId>vertx-codegen</artifactId> ${vertex.version}</version> </dependency> <dependency> <groupId>io.vertx</groupId> <artifactId>vertx-web-client</artifactId> ${vertex...

    项目结构:
    在这里插入图片描述

    依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.4.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.xxxx</groupId>
        <artifactId>vertxspringboot</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>vertxspringboot</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
            <vertex.version>3.9.1</vertex.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-jpa</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-tomcat</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <scope>runtime</scope>
            </dependency>
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-core</artifactId>
                <version>${vertex.version}</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-web</artifactId>
                <version>${vertex.version}</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-codegen</artifactId>
                <version>${vertex.version}</version>
            </dependency>
            <dependency>
                <groupId>io.vertx</groupId>
                <artifactId>vertx-web-client</artifactId>
                <version>${vertex.version}</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    
    

    配置:

    spring.datasource.url=jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    spring.datasource.username=root
    spring.datasource.password=123456
    spring.datasource.driver-class-name=com.mysql.jdbc.Driver
    spring.jpa.properties.hibernate.hbm2ddl.auto=update
    

    启动:

    package com.xxxx.vertxspringboot;
    
    
    import com.xxxx.vertxspringboot.verticle.JpaProductVerticle;
    import com.xxxx.vertxspringboot.verticle.ProductServerVerticle;
    import io.vertx.core.Vertx;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import javax.annotation.PostConstruct;
    
    @Slf4j
    @SpringBootApplication
    public class VertxspringbootApplication {
    
        @Autowired
        private ProductServerVerticle productServerVerticle;
    
        @Autowired
        private JpaProductVerticle jpaProductVerticle;
    
        public static void main(String[] args) {
            SpringApplication.run(VertxspringbootApplication.class, args);
        }
    
        @PostConstruct
        public void deployVerticle() {
            Vertx vertx = Vertx.vertx();
            vertx.deployVerticle(productServerVerticle);
            vertx.deployVerticle(jpaProductVerticle);
            vertx.exceptionHandler(throwable -> log.error("exception happened: {}", throwable.toString()));
            log.info("verticle deployed!!");
        }
    
    }
    
    
    package com.xxxx.vertxspringboot.verticle;
    
    import com.xxxx.vertxspringboot.utils.Constants;
    import io.vertx.core.AbstractVerticle;
    import io.vertx.core.eventbus.EventBus;
    import io.vertx.core.http.HttpServerResponse;
    import io.vertx.core.json.JsonObject;
    import io.vertx.ext.web.Router;
    import io.vertx.ext.web.RoutingContext;
    import io.vertx.ext.web.handler.BodyHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Component;
    
    
    /**
     * @author nick
     */
    @Slf4j
    @Component
    public class ProductServerVerticle extends AbstractVerticle {
    
        @Override
        public void start() throws Exception {
            super.start();
            Router router = Router.router(vertx);
            router.route().handler(BodyHandler.create());
            router.get("/products/:productID").handler(this::handleGetProduct);
            router.put("/products/:productID").handler(this::handleAddProduct);
            router.get("/products/").handler(this::handleListProducts);
            router.delete("/products/:productID").handler(this::handleDeleteProduct);
            router.patch("/products/:productID").handler(this::handlePatchProduct);
            vertx.createHttpServer().requestHandler(router).listen(8080);
    //                .exceptionHandler(
    //                throwable -> log.error("HTTPServer error happened: {}", throwable.toString())
    //        );
        }
    
        private void getProductFailureHandler(RoutingContext failureRoutingContext){
            HttpServerResponse response = failureRoutingContext.response();
            response.end("something bad happened!");
        }
    
        private void handlePatchProduct(RoutingContext routingContext){
            String productId = routingContext.request().getParam("productID");
            log.info("productId used to patch product from request param: {}", productId);
            if (productId == null) {
                log.error("GET: the required productId is null!");
                sendError(400, routingContext.response());
                return;
            }
            JsonObject product = routingContext.getBodyAsJson();
            vertx.eventBus().<JsonObject>send(Constants.PATCH_PRODUCT_ADDRESS, product, asyncResult -> {
                log.info("result is: {}", asyncResult.result().body());
                if (asyncResult.succeeded()) {
                    log.info("handle PATCH_PRODUCT_ADDRESS success!!");
                    routingContext.response()
                            .putHeader("content-type", "application/json")
                            .setStatusCode(200)
                            .end(String.valueOf(asyncResult.result().body()));
                } else {
                    log.info("handle PATCH_PRODUCT_ADDRESS failed!!");
                    routingContext.response().setStatusCode(500).end();
                }
            });
        }
    
        private void handleDeleteProduct(RoutingContext routingContext){
            Integer productId = Integer.valueOf(routingContext.request().getParam("productID"));
            log.info("productId from request param: {}", productId);
            vertx.eventBus()
                    .<Integer>send(Constants.DELETE_PRODUCT_ADDRESS, productId, result -> {
                        log.info("result is: {}", result.result().body());
                        if (result.succeeded()) {
                            log.info("handle DELETE_PRODUCT_ADDRESS success!!");
                            routingContext.response()
                                    .putHeader("content-type", "application/json")
                                    .setStatusCode(200)
                                    .end(String.valueOf(result.result().body()));
                        } else {
                            log.info("handle DELETE_PRODUCT_ADDRESS failed!!");
                            routingContext.response().setStatusCode(500).end();
                        }
                    });
        }
    
        private void handleGetProduct(RoutingContext routingContext) {
            String productId = routingContext.request().getParam("productID");
            if (productId == null) {
                log.error("GET: the required productId is null!");
                sendError(400, routingContext.response());
                return;
            }
            vertx.eventBus().<String>send(Constants.GET_ONE_PRODUCT_ADDRESS, productId, asyncResult -> {
                log.info("Got one product by productId: {}", asyncResult);
                if (asyncResult.succeeded()) {
                    String body = asyncResult.result().body();
                    if ("null".equals(body)){
                        log.info("No product found by the given productId: {}", productId);
                        routingContext.response().setStatusCode(404).end();
                    }else {
                        log.info("handle GET_ONE_PRODUCT_ADDRESS success!!");
                        routingContext.response()
                                .putHeader("content-type", "application/json")
                                .setStatusCode(200)
                                .end(body);
                    }
    
                } else {
                    log.info("handle GET_ONE_PRODUCT_ADDRESS failed!!");
                    routingContext.response().setStatusCode(500).end();
                }
            });
        }
    
        private void handleAddProduct(RoutingContext routingContext) {
            String productID = routingContext.request().getParam("productID");
            if (productID == null) {
                log.error("PUT: the productId is null!");
                sendError(400, routingContext.response());
                return;
            }
            JsonObject product = routingContext.getBodyAsJson();
            vertx.eventBus().<JsonObject> send(Constants.ADD_PRODUCT_ADDRESS, product, asyncResult -> {
                log.info("added one product: {}", asyncResult);
                if (asyncResult.succeeded()) {
                    log.info("handle ADD_PRODUCT_ADDRESS success!!");
                    routingContext.response()
                            .putHeader("content-type", "application/json")
                            .setStatusCode(200)
                            .end(asyncResult.result().body().toString());
                } else {
                    log.info("handle ADD_PRODUCT_ADDRESS failed!!");
                    routingContext.response().setStatusCode(500).end();
                }
            });
        }
    
        private void handleListProducts(RoutingContext routingContext) {
            EventBus eventBus = vertx.eventBus();
            eventBus.<String>send(Constants.ALL_PRODUCTS_ADDRESS, "", asyncResult -> {
                log.info("result is: {}", asyncResult.result().body());
                if (asyncResult.succeeded()) {
                    log.info("handle  ALL_PRODUCTS_ADDRESS success!!");
                    routingContext.response()
                            .putHeader("content-type", "application/json")
                            .setStatusCode(200)
                            .end(asyncResult.result().body());
                } else {
                    log.info("handle ALL_PRODUCTS_ADDRESS failed!!");
                    routingContext.response().setStatusCode(500).end();
                }
            });
            log.info("ALL_PRODUCTS_ADDRESS Event already sent!");
        }
    
        private void sendError(int statusCode, HttpServerResponse response) {
            response.setStatusCode(statusCode).end();
        }
    }
    
    
    package com.xxxx.vertxspringboot.verticle;
    
    import com.xxxx.vertxspringboot.dto.ProductResult;
    import com.xxxx.vertxspringboot.service.ProductService;
    import com.xxxx.vertxspringboot.utils.Constants;
    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import io.vertx.core.AbstractVerticle;
    import io.vertx.core.Handler;
    import io.vertx.core.eventbus.EventBus;
    import io.vertx.core.eventbus.Message;
    import io.vertx.core.json.Json;
    import io.vertx.core.json.JsonObject;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * @author nick
     */
    @Slf4j
    @Component
    public class JpaProductVerticle extends AbstractVerticle {
    
        private final static ObjectMapper objectMapper = Json.mapper;
    
        @Autowired
        private ProductService productService;
    
        private Handler<Message<String>> allProductsHandler(ProductService productService) {
            return msg -> vertx.<String>executeBlocking(future -> {
                        try {
                            future.complete(objectMapper.writeValueAsString(productService.getAllProduct()));
                            log.info("got all products from blocking...");
                        } catch (JsonProcessingException e) {
                            log.error("Failed to serialize result");
                            future.fail(e);
                        }
                    },
                    result -> {
                        if (result.succeeded()) {
                            msg.reply(result.result());
                        } else {
                            msg.reply(result.cause().toString());
                        }
                    });
        }
    
        private Handler<Message<String>> getOneProductHandler(ProductService productService) {
            return msg -> vertx.<String>executeBlocking(future -> {
                        try {
                            Integer productId = Integer.valueOf(msg.body());
                            log.info("productId from sender: {}", productId);
                            val productDTO = productService.findProductById(productId);
                            future.complete(objectMapper.writeValueAsString(productDTO));
                            log.info("got one product from blocking...");
                        } catch (Exception e) {
                            log.error("Failed to serialize result");
                            future.fail(e);
                        }
                    },
                    result -> {
                        if (result.succeeded()) {
                            msg.reply(result.result());
                        } else {
                            msg.reply(result.cause().toString());
                        }
                    });
        }
    
        @Override
        public void start() throws Exception {
            super.start();
            EventBus eventBus = vertx.eventBus();
            eventBus.<String>consumer(Constants.ALL_PRODUCTS_ADDRESS).handler(allProductsHandler(productService));
            eventBus.<String>consumer(Constants.GET_ONE_PRODUCT_ADDRESS).handler(getOneProductHandler(productService));
            eventBus.<JsonObject>consumer(Constants.ADD_PRODUCT_ADDRESS).handler(addProductHandler(productService));
            eventBus.<Integer>consumer(Constants.DELETE_PRODUCT_ADDRESS).handler(deleteProductHandler(productService));
            eventBus.<JsonObject>consumer(Constants.PATCH_PRODUCT_ADDRESS).handler(patchProductHandler(productService));
        }
    
        private Handler<Message<Integer>> deleteProductHandler(ProductService productService) {
    
            return message -> vertx.<Integer>executeBlocking(
                    future -> {
                        try {
                            Integer productId = message.body();
                            log.info("productId from sender: {}", productId);
                            future.complete(productService.deleteProduct(productId));
                            log.info("deleted one product from blocking...");
                        } catch (Exception e) {
                            log.error("Failed to serialize result");
                            future.fail(e);
                        }
                    },
                    result -> {
                        if (result.succeeded()) {
                            message.reply(result.result());
                        } else {
                            message.reply(result.cause().toString());
                        }
                    }
            );
        }
    
        private Handler<Message<JsonObject>> patchProductHandler(ProductService productService) {
    
            return message -> vertx.<JsonObject>executeBlocking(
                    future -> {
                        try {
                            JsonObject product = message.body();
                            log.info("product to be patched from sender: {}", product);
                            future.complete(JsonObject.mapFrom(productService.patchProduct(product.mapTo(ProductResult.class))));
                        } catch (Exception e) {
                            log.error("Failed to serialize result");
                            future.fail(e);
                        }
                    },
                    result -> {
                        if (result.succeeded()) {
                            message.reply(result.result());
                        } else {
                            message.reply(result.cause().toString());
                        }
                    }
            );
        }
    
        private Handler<Message<JsonObject>> addProductHandler(ProductService productService) {
    
            return message -> vertx.<JsonObject>executeBlocking(
                    future -> {
                        try {
                            JsonObject product = message.body();
                            log.info("product from sender: {}", product);
                            future.complete(JsonObject.mapFrom(productService.addProduct(product.mapTo(ProductResult.class))));
                            log.info("got one product from blocking...");
                        } catch (Exception e) {
                            log.error("Failed to serialize result");
                            future.fail(e);
                        }
                    },
                    result -> {
                        if (result.succeeded()) {
                            message.reply(result.result());
                        } else {
                            message.reply(result.cause().toString());
                        }
                    }
            );
        }
    
        private Handler<Message<String>> getAllService() {
            return msg -> vertx.<String>executeBlocking(future -> {
                log.info("try to get json.....");
                try {
                    log.info("get json success..");
                    future.complete(new JsonObject().put("name", "wade").toString());
                } catch (Exception e) {
                    log.info("Failed to serialize result");
                    future.fail(e);
                }
            }, result -> {
                if (result.succeeded()) {
                    msg.reply(result.result());
                } else {
                    msg.reply(result.cause()
                            .toString());
                }
            });
        }
    }
    
    
    package com.xxxx.vertxspringboot.utils;
    
    public class Constants {
    
        public static final String ALL_PRODUCTS_ADDRESS = "all-products-address";
    
        public static final String GET_ONE_PRODUCT_ADDRESS = "get-one-product-address";
    
        public static final String ADD_PRODUCT_ADDRESS = "add-product-address";
    
        public static final String DELETE_PRODUCT_ADDRESS = "delete-product-address";
    
        public static final String PATCH_PRODUCT_ADDRESS = "patch-product-address";
    }
    
    
    package com.xxxx.vertxspringboot.service;
    
    import com.xxxx.vertxspringboot.dto.ProductResult;
    import com.xxxx.vertxspringboot.entity.Product;
    import com.xxxx.vertxspringboot.repository.ProductRepository;
    import lombok.extern.slf4j.Slf4j;
    import lombok.val;
    import org.springframework.beans.BeanUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import org.springframework.util.StringUtils;
    
    import javax.transaction.Transactional;
    import java.util.Collections;
    import java.util.List;
    import java.util.Objects;
    import java.util.stream.Collectors;
    
    /**
     * @author nick
     */
    @Slf4j
    @Service
    @Transactional
    public class ProductService {
    
        @Autowired
        private ProductRepository productRepository;
    
        /**
         * 获取全部数据
         * @return
         */
        public List<ProductResult> getAllProduct() {
            val entityList = productRepository.findAll();
            if (CollectionUtils.isEmpty(entityList)){
                return Collections.EMPTY_LIST;
            }
            return entityList.stream().map(Product::getProductResult).collect(Collectors.toList());
        }
    
        /**
         * 查询单个数据Constants
         * @param productId
         * @return
         */
        public ProductResult findProductById(Integer productId){
            if (StringUtils.isEmpty(productId)){
                return null;
            }
            val entity = productRepository.findById(productId);
            return Objects.isNull(entity) ? null :  entity.map(Product::getProductResult).get();
        }
    
        /**
         * 添加
         * @param productResult
         * @return
         */
        public ProductResult addProduct(ProductResult productResult){
            Product toBeSavedEntity = new Product();
            BeanUtils.copyProperties(productResult, toBeSavedEntity);
            val savedEntity = productRepository.save(toBeSavedEntity);
            return savedEntity.getProductResult();
        }
    
        /**
         * 删除数据
         * @param productId
         * @return
         */
        public Integer deleteProduct(Integer productId){
            productRepository.deleteById(productId);
            return productId;
        }
    
    
        public ProductResult patchProduct(ProductResult productResult) {
            val entity = productRepository.findById(productResult.getProductId()).get();
            entity.setProductName(productResult.getProductName());
            entity.setDescription(productResult.getDescription());
            val saved = productRepository.save(entity);
            return saved.getProductResult();
        }
    
    }
    
    
    package com.xxxx.vertxspringboot.repository;
    
    import com.xxxx.vertxspringboot.entity.Product;
    import org.springframework.data.jpa.repository.JpaRepository;
    import org.springframework.stereotype.Repository;
    
    @Repository
    public interface ProductRepository extends JpaRepository<Product, Integer> {
    
    }
    
    
    package com.xxxx.vertxspringboot.entity;
    
    
    import com.xxxx.vertxspringboot.dto.ProductResult;
    import lombok.Data;
    import lombok.val;
    import org.springframework.beans.BeanUtils;
    
    import javax.persistence.Column;
    import javax.persistence.Entity;
    import javax.persistence.Id;
    import javax.persistence.Table;
    
    
    /**
     * @author nick
     */
    @Data
    @Entity
    @Table(name = "product")
    public class Product {
    
        @Id
        @Column(name = "product_id")
        private Integer productId;
    
        @Column(name = "product_name")
        private String productName;
    
        @Column(name = "description")
        private String description;
    
        /**
         * 将数据对象转成返给前端对象
         * @return
         */
        public ProductResult getProductResult(){
            val productDTO = new ProductResult();
            BeanUtils.copyProperties(this, productDTO);
            return productDTO;
        }
    
    }
    
    
    package com.xxxx.vertxspringboot.dto;
    
    import lombok.Data;
    
    @Data
    public class ProductResult {
    
        private Integer productId;
    
        private String description;
    
        private String productName;
    }
    
    

    sql:

    -- ----------------------------
    -- Table structure for product
    -- ----------------------------
    DROP TABLE IF EXISTS `product`;
    CREATE TABLE `product` (
      `product_id` int(11) NOT NULL,
      `product_name` varchar(45) NOT NULL,
      `description` varchar(45) DEFAULT NULL,
      PRIMARY KEY (`product_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    -- ----------------------------
    -- Records of product
    -- ----------------------------
    INSERT INTO `product` VALUES ('1', '张三', '测试中');
    
    展开全文
  • vertx_example+springboot_example+maven 开发模板
  • 一、创建SpringBoot工程 二、 Vertx 实现http服务器、路由器的使用 1. 用vertx实现http服务器 添加依赖: 工程的创建可以创建两种 : 第一种:Gradle方式 第二种: Maven方式 以上两种vertx都支持: ...

    个人博客网:https://wushaopei.github.io/    (你想要这里多有)

    一、创建SpringBoot工程

    二、 Vertx 实现http服务器、路由器的使用

    1.  用vertx实现http服务器

    添加依赖:

    工程的创建可以创建两种 :

    第一种:Gradle方式

    第二种: Maven方式

    以上两种vertx都支持:

    java代码:

    
    public class Demo1 {
    
        //用vertx实现http服务器
        public static void main(String[] args){
    
            //vertx核心
            Vertx vertx = Vertx.vertx();
    
            //创建http的server
            HttpServer httpServer = vertx.createHttpServer();
            //服务器响应
            httpServer.requestHandler(request -> {
                request.response().end("Hello Itheima!");
            });
    
            httpServer.listen(8080);
        }
    }

    这里使用IDE本身自带的测试工具进行测试:

    测试结果:

    2. 自定义请求头数据

    添加响应依赖:

    java代码:

    public class Demo2 {
    
        public static void main(String [] args){
    
    
            Vertx vertx = Vertx.vertx();
    
            //web路由器
            Router router = Router.router(vertx);
    
            //接口地址为“/”
            router.route("/").handler(context -> {
                context.request();
                context.response().end("hello root!");
            });
    
            //接口地址为 “/abc”
            router.route("/abc").handler((RoutingContext context) ->{
                HttpServerRequest request = context.request();
                String name = request.getParam("name");
                String itheima = request.getHeader("itheima");
    
                context.response().end("hello abc!"+ name+ " " + itheima);
            });
    
    
            HttpServer httpServer = vertx.createHttpServer();
            //处理request(监听请求的接口地址)
            httpServer.requestHandler(router::accept);
            //监听请求的端口
            httpServer.listen(8080);
        }
    }

    httpServer 指定请求的handler,由路由器进行监听请求,在httpServer声明监听的端口号。当“/”时,路由到第一个handler请求,响应 hello root ; 当“/abc”时,路由到第二个router,响应itheima。

    测试中 指定响应头参数:

    当请求为 itheima 时,自动根据已设定好的headers的Name/value,进行value的响应。

    3. Vertx应用的 get 、post请求

    public class Demo3 {
    
        public static void main(String [] args){
    
            Vertx vertx = Vertx.vertx();
    
            //Web路由器
            Router router = Router.router(vertx);
            //get请求
            router.get("/get").handler(context -> {
               context.response().end("hello get!");
            });
            //post请求
            router.post("/post").handler(context -> {
               context.response().end("hello post!");
            });
    
            HttpServer httpServer = vertx.createHttpServer();
            //处理request
            httpServer.requestHandler(router::accept);
            httpServer.listen(8080);
        }
    }
    

    4 、 请求接口的同步、 异步处理请求

    
    public class Demo4 {
    
        public static void main(String [] args){
    
            Vertx vertx = Vertx.vertx();
    
            //Web路由器
            Router router = Router.router(vertx);
            router.get("/sync").handler(context -> {
               //同步处理请求
               context.response().end("hello get!");
                System.err.println("sync: " + Thread.currentThread());
            });
            router.get("/async").blockingHandler(context -> {
               //异步处理请求
                //执行耗时操作
                //数据库访问
                //服务访问
               context.response().end("hello *!");
                System.err.println("async: " + Thread.currentThread());
            });
    
            HttpServer httpServer = vertx.createHttpServer();
            //处理request
            httpServer.requestHandler(router::accept);
            httpServer.listen(8080);
    
        }
    
    }
    

    5.对用户的请求解析时间(测试给定的请求量时vertx处理所消耗的时间)

    
    public class Demo5 {
    
        public static void main(String[] args) {
    
            Vertx vertx = Vertx.vertx();
    
            //web路由器
            Router router = Router.router(vertx);
            router.get("/vertx").blockingHandler(context -> {
    
                long start = System.currentTimeMillis();
                Pi.computePi(20000);
                long end = System.currentTimeMillis();
    
                //同步处理请求
                context.response().end("hello" + (end - start));
    
            });
    
            HttpServer httpServer = vertx.createHttpServer();
            httpServer.requestHandler(router::accept);
            httpServer.listen(8080);
        }
    }

     

    对应的PI:

    
    public class Pi {
    
        private static final BigDecimal FOUR = BigDecimal.valueOf(4);
        private static final int roundingMode = BigDecimal.ROUND_HALF_EVEN;
    
        public static BigDecimal computePi(int digits){
            int scale = digits + 5;
            BigDecimal arctan1_5 = arctan(5,scale);
            BigDecimal arctan1_239 = arctan(239,scale);
            BigDecimal pi = arctan1_5.multiply(FOUR).subtract(arctan1_239).multiply(FOUR);
            return pi.setScale(digits,BigDecimal.ROUND_HALF_UP);
    
        }
    
        private static BigDecimal arctan(int inverseX, int scale) {
    
            BigDecimal result, numer, term;
            BigDecimal invX = BigDecimal.valueOf(inverseX);
            BigDecimal invx2 = BigDecimal.valueOf(inverseX * inverseX);
            numer = BigDecimal.ONE.divide(invX,scale,roundingMode);
            result = numer;
            int i =1;
            do{
                numer = numer.divide(invx2,scale,roundingMode);
                int denom = 2 * i + 1;
                term = numer.divide(BigDecimal.valueOf(denom),scale,roundingMode);
                if((i % 2) != 0){
                    result = result.subtract(term);
                }else {
                    result = result.add(term);
                }
                i++;
            }while (term.compareTo(BigDecimal.ZERO) != 0);
            return result;
        }
    }
    

    代码地址:

    链接:https://pan.baidu.com/s/1SWkSqCxLrg-FD4UZqlhwSg
    提取码:3vgj
     

    展开全文
  • <p>I am preparing a demo to show how to embed Vertx and Apex in a springboot application. The small demo code can be found here: https://github.com/ufoscout/demo-springboot-vertx3</p> <p>The ...
  • 1、在eclipse中启动调试背景:不想用springboot,想尽量的简单,高效。调试时没有main启动函数。写一个就是了,怎么写?样例如下:public class UserApp {public static void main(final String... args) {Vertx ...
  • Vertx getting start

    2021-01-07 21:21:38
    t want to compare it with springboot but they have a great consistencies between examples, like if you explore JPA example and then explore rest repositories, you would find absolute same packaging ...
  • Vert.x-Web与SpringBoot整合

    万次阅读 2018-08-17 14:33:12
    本文主要介绍博主整理开发的VertxSpringBoot整合的脚手架项目。项目旨在便于现有spring项目与Vert.x整合,既能体验Vert.x的高效又兼顾spring的泛用性,降低Vert.x的学习和使用成本,可以做到spring到Vert.x...
  • SpringBoot负责脚手架 Vert.x负责Web服务 Github地址:https://github.com/cklogic/blog/tree/master/vertx-spingboot-demo 运行打开浏览器输入 http://localhost:8888/ 会显示ok,这里使用了...
  • SpringBoot是最近几年比较流行的web应用开发框架,它是微服务的一个开发框架。它的Web服务器内核为Tomcat或Jetty,它们作为Servlet容量来对客户端的http/https请求进行解析。最近,spring.io又出推出一套新的服务器...
  • 整体目录结构 SQL脚本 /* Navicat MySQL Data Transfer Source Server : localhost_3306 Source Server Version : 50723 Source Host : localhost:3306 Source Database : vertx-test Tar...
  • Tomcat 6/7/8/9, Jetty 8/9, JBoss EAP 6/7, Resin 4, Websphere 6/7/8, Vertx 3.3/3.4/3.5, Weblogic 10/11g/12c, Undertow Spring, Spring Boot (Embedded Tomcat, Jetty, Undertow), Spring asynchronous ...
  • <p>Also adding at the same time vertx-jvm , similar to springboot-jvm (micrometer as well) <p>There's some documentation update related to this PR: ...
  • 1、Lambda表达式 Vertx全异步框架 springboot2.5全异步 Lambda表达式是一个匿名函数,没有名字的函数,作用是简化了代码的实现过程,说白了就是简约。 Lambda表达式的形式 语法: ()->{表达式} ()代表参数 ->...

空空如也

空空如也

1 2 3
收藏数 45
精华内容 18
关键字:

springbootvertx

spring 订阅