精华内容
下载资源
问答
  • 有一定网络知识的朋友都有自己的判断方法,但入门级的朋友怎么办?这里介绍的是一种不需要任何网络知识的判断方法:用MSN Messenger帮助CoolGate用户了解网络接入情况--MSN Messenger的“高级连接信息”(注意:...

    有一定网络知识的朋友都有自己的判断方法,但入门级的朋友怎么办?这里介绍的是一种不需要任何网络知识的判断方法:用MSN Messenger帮助CoolGate用户了解网络接入情况看--MSN Messenger的“高级连接信息”(注意:这里用的是MSN Messenger,不是Windows Messenger。Windows Messenger高级连接信息与MSN Messenger的不一样。),它位于MSN Messenger 菜单的“工具”->“选项”->“连接”中。 高级连接信息有以下几种类型组成:

    “直接连接”
    表示用户是公网,没有使用Windows XP自带的Internet连接防火墙(ICF),不需要使用UPnP。但这种情况不排除用户安装了第三方的网络防火墙软件,如诺顿网络安全特警2002/2003/2004系列、国内著名的天网防火墙等等。

    “直接连接,使用了Internet连接防火墙(ICF)”
    表示用户是公网,使用了Windows XP自带的Internet连接防火墙(ICF),不是一定需要使用UPnP。可以在ICF的高级设置中为CoolGate手动打开相应监听的端口。

    “通用即插即用(UPnP)网络地址转换(NAT)”
    表示用户是内网,经由网关或路由器上网,网关或路由器支持的NAT支持UPnP(而且打开,目前支持UPnP的网关类软件有Windows XP的ICS、KERIO的WinRoute Firewall 5系列)。

    “非通用即插即用(UPnP)网络地址转换(NAT)”、“非对称NAT”、“对称NAT”、“级联”等等
    表示用户是内网,经由网关或路由器上网,网关或路由器的NAT不支持UPnP,或虽然支持UPnP,但UPnP被ISP、网络管理员关闭了。

    “非UPnP防火墙”
    通常与NAT类型一起出现,表示用户是内网,经由网关或路由器上网,网关或由器上使用不支持UPnP的网络防火墙,如诺顿网络安全特警2002等等。

    “UPnP防火墙”
    通常与NAT类型一起出现,表示用户是内网,经由网关或路由器上网,网关或由器上使用支持UPnP的网络防火墙,如诺顿网络安全特警2003、Windows XP的ICF等等。

    展开全文
  • ​ 最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台????。 二、设计 ...

    **

    如何才能设计一个性能稳定好用的网关

    **
    一、前言

    ​ 最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台😤。

    二、设计

    2.1技术选型

    网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:

    Tomcat/Jetty+NIO+Servlet3
    Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。

    Netty+NIO
    Netty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。

    后面发现Soul网关是基于Spring WebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用Spring WebFlux。

    网关的第二个特点是具备可扩展性,比如Netflix Zuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。

    在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。

    现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。

    2.2需求清单

    首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:

    自定义路由规则

    可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。

    跨语言

    HTTP协议天生跨语言

    高性能

    Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。

    高可用

    支持集群模式防止单节点故障,无状态。

    灰度发布

    灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。

    接口鉴权

    基于责任链模式,用户开发自己的鉴权插件即可。

    负载均衡

    支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。

    2.3架构设计

    在参考了一些优秀的网关Zuul,Spring Cloud Gateway,Soul后,将项目划分为以下几个模块。

    名称 描述
    ship-admin 后台管理界面,配置路由规则等
    ship-server 网关服务端,核心功能模块
    ship-client-spring-boot-starter 网关客户端,自动注册服务信息到注册中心
    ship-common 一些公共的代码,如pojo,常量等。
    它们之间的关系如图:

    网关设计

    注意:这张图与实际实现有点出入,Nacos push到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。

    2.4表结构设计

    三、编码

    3.1 ship-client-spring-boot-starter

    首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。

    其核心类 AutoRegisterListener 就是在项目启动时做了两件事:

    1.将服务信息注册到Nacos注册中心

    2.通知ship-admin服务上线了并注册下线hook。

    代码如下:

    /**

    • Created by 2YSP on 2020/12/21
      */
      public class AutoRegisterListener implements ApplicationListener {

      private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);

      private volatile AtomicBoolean registered = new AtomicBoolean(false);

      private final ClientConfigProperties properties;

      @NacosInjected
      private NamingService namingService;

      @Autowired
      private RequestMappingHandlerMapping handlerMapping;

      private final ExecutorService pool;

      /**

      • url list to ignore
        */
        private static List ignoreUrlList = new LinkedList<>();

      static {
      ignoreUrlList.add("/error");
      }

      public AutoRegisterListener(ClientConfigProperties properties) {
      if (!check(properties)) {
      LOGGER.error(“client config port,contextPath,appName adminUrl and version can’t be empty!”);
      throw new ShipException(“client config port,contextPath,appName adminUrl and version can’t be empty!”);
      }
      this.properties = properties;
      pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
      }

      /**

      • check the ClientConfigProperties
      • @param properties
      • @return
        */
        private boolean check(ClientConfigProperties properties) {
        if (properties.getPort() == null || properties.getContextPath() == null
        || properties.getVersion() == null || properties.getAppName() == null
        || properties.getAdminUrl() == null) {
        return false;
        }
        return true;
        }

      @Override
      public void onApplicationEvent(ContextRefreshedEvent event) {
      if (!registered.compareAndSet(false, true)) {
      return;
      }
      doRegister();
      registerShutDownHook();
      }

      /**

      • send unregister request to admin when jvm shutdown
        */
        private void registerShutDownHook() {
        final String url = “http://” + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
        final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
        unregisterAppDTO.setAppName(properties.getAppName());
        unregisterAppDTO.setVersion(properties.getVersion());
        unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
        unregisterAppDTO.setPort(properties.getPort());
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
        OkhttpTool.doPost(url, unregisterAppDTO);
        LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
        }));
        }

      /**

      • register all interface info to register center
        */
        private void doRegister() {
        Instance instance = new Instance();
        instance.setIp(IpUtil.getLocalIpAddress());
        instance.setPort(properties.getPort());
        instance.setEphemeral(true);
        Map<String, String> metadataMap = new HashMap<>();
        metadataMap.put(“version”, properties.getVersion());
        metadataMap.put(“appName”, properties.getAppName());
        instance.setMetadata(metadataMap);
        try {
        namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
        } catch (NacosException e) {
        LOGGER.error(“register to nacos fail”, e);
        throw new ShipException(e.getErrCode(), e.getErrMsg());
        }
        LOGGER.info(“register interface info to nacos success!”);
        // send register request to ship-admin
        String url = “http://” + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
        RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
        OkhttpTool.doPost(url, registerAppDTO);
        LOGGER.info(“register to ship-admin success!”);
        }

      private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
      RegisterAppDTO registerAppDTO = new RegisterAppDTO();
      registerAppDTO.setAppName(properties.getAppName());
      registerAppDTO.setContextPath(properties.getContextPath());
      registerAppDTO.setIp(instance.getIp());
      registerAppDTO.setPort(instance.getPort());
      registerAppDTO.setVersion(properties.getVersion());
      return registerAppDTO;
      }
      }

    3.2 ship-server

    ship-sever项目主要包括了两个部分内容,
    1.请求动态路由的主流程
    2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。

    ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给plugin chain去链式处理。

    PluginFilter根据URL解析出appName,然后将启用的plugin组装成plugin chain。

    public class PluginFilter implements WebFilter {

    private ServerConfigProperties properties;
    
    public PluginFilter(ServerConfigProperties properties) {
        this.properties = properties;
    }
    
    @Override
    public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
        String appName = parseAppName(exchange);
        if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
            throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        PluginChain pluginChain = new PluginChain(properties, appName);
        pluginChain.addPlugin(new DynamicRoutePlugin(properties));
        pluginChain.addPlugin(new AuthPlugin(properties));
        return pluginChain.execute(exchange, pluginChain);
    }
    
    private String parseAppName(ServerWebExchange exchange) {
        RequestPath path = exchange.getRequest().getPath();
        String appName = path.value().split("/")[1];
        return appName;
    }
    

    }
    PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。

    /**

    • @Author: Ship

    • @Description:

    • @Date: Created in 2020/12/25
      /
      public class PluginChain extends AbstractShipPlugin {
      /
      *

      • the pos point to current plugin
        /
        private int pos;
        /
        *
      • the plugins of chain
        */
        private List plugins;

      private final String appName;

      public PluginChain(ServerConfigProperties properties, String appName) {
      super(properties);
      this.appName = appName;
      }

      /**

      • add enabled plugin to chain
      • @param shipPlugin
        */
        public void addPlugin(ShipPlugin shipPlugin) {
        if (plugins == null) {
        plugins = new ArrayList<>();
        }
        if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
        return;
        }
        plugins.add(shipPlugin);
        // order by the plugin’s order
        plugins.sort(Comparator.comparing(ShipPlugin::order));
        }

      @Override
      public Integer order() {
      return null;
      }

      @Override
      public String name() {
      return null;
      }

      @Override
      public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) {
      if (pos == plugins.size()) {
      return exchange.getResponse().setComplete();
      }
      return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
      }

      public String getAppName() {
      return appName;
      }

    }

    AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。

    public abstract class AbstractShipPlugin implements ShipPlugin {

    protected ServerConfigProperties properties;
    
    public AbstractShipPlugin(ServerConfigProperties properties) {
        this.properties = properties;
    }
    

    }
    ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。

    public interface ShipPlugin {
    /**
    * lower values have higher priority
    *
    * @return
    */
    Integer order();

    /**
     * return current plugin name
     *
     * @return
     */
    String name();
    
    Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
    

    }
    DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。

    /**

    • @Author: Ship

    • @Description:

    • @Date: Created in 2020/12/25
      */
      public class DynamicRoutePlugin extends AbstractShipPlugin {

      private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);

      private static WebClient webClient;

      private static final Gson gson = new GsonBuilder().create();

      static {
      HttpClient httpClient = HttpClient.create()
      .tcpConfiguration(client ->
      client.doOnConnected(conn ->
      conn.addHandlerLast(new ReadTimeoutHandler(3))
      .addHandlerLast(new WriteTimeoutHandler(3)))
      .option(ChannelOption.TCP_NODELAY, true)
      );
      webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
      .build();
      }

      public DynamicRoutePlugin(ServerConfigProperties properties) {
      super(properties);
      }

      @Override
      public Integer order() {
      return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
      }

      @Override
      public String name() {
      return ShipPluginEnum.DYNAMIC_ROUTE.getName();
      }

      @Override
      public Mono execute(ServerWebExchange exchange, PluginChain pluginChain) {
      String appName = pluginChain.getAppName();
      ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
      // LOGGER.info(“selected instance is [{}]”, gson.toJson(serviceInstance));
      // request service
      String url = buildUrl(exchange, serviceInstance);
      return forward(exchange, url);
      }

      /**

      • forward request to backend service

      • @param exchange

      • @param url

      • @return
        */
        private Mono forward(ServerWebExchange exchange, String url) {
        ServerHttpRequest request = exchange.getRequest();
        ServerHttpResponse response = exchange.getResponse();
        HttpMethod method = request.getMethod();

        WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
        headers.addAll(request.getHeaders());
        });

        WebClient.RequestHeadersSpec<?> reqHeadersSpec;
        if (requireHttpBody(method)) {
        reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
        } else {
        reqHeadersSpec = requestBodySpec;
        }
        // nio->callback->nio
        return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
        .onErrorResume(ex -> {
        return Mono.defer(() -> {
        String errorResultJson = “”;
        if (ex instanceof TimeoutException) {
        errorResultJson = “{“code”:5001,“message”:“network timeout”}”;
        } else {
        errorResultJson = “{“code”:5000,“message”:“system error”}”;
        }
        return ShipResponseUtil.doResponse(exchange, errorResultJson);
        }).then(Mono.empty());
        }).flatMap(backendResponse -> {
        response.setStatusCode(backendResponse.statusCode());
        response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
        return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
        });
        }

      /**

      • weather the http method need http body
      • @param method
      • @return
        */
        private boolean requireHttpBody(HttpMethod method) {
        if (method.equals(HttpMethod.POST) || method.equals(HttpMethod.PUT) || method.equals(HttpMethod.PATCH)) {
        return true;
        }
        return false;
        }

      private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
      ServerHttpRequest request = exchange.getRequest();
      String query = request.getURI().getQuery();
      String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), “”);
      String url = “http://” + serviceInstance.getIp() + “:” + serviceInstance.getPort() + path;
      if (!StringUtils.isEmpty(query)) {
      url = url + “?” + query;
      }
      return url;
      }

      /**

      • choose an ServiceInstance according to route rule config and load balancing algorithm
      • @param appName
      • @param request
      • @return
        */
        private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
        List serviceInstances = ServiceCache.getAllInstances(appName);
        if (CollectionUtils.isEmpty(serviceInstances)) {
        LOGGER.error(“service instance of {} not find”, appName);
        throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
        }
        String version = matchAppVersion(appName, request);
        if (StringUtils.isEmpty(version)) {
        throw new ShipException(“match app version error”);
        }
        // filter serviceInstances by version
        List instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
        //Select an instance based on the load balancing algorithm
        LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
        ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
        return serviceInstance;
        }

      private String matchAppVersion(String appName, ServerHttpRequest request) {
      List rules = RouteRuleCache.getRules(appName);
      rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
      for (AppRuleDTO rule : rules) {
      if (match(rule, request)) {
      return rule.getVersion();
      }
      }
      return null;
      }

      private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
      String matchObject = rule.getMatchObject();
      String matchKey = rule.getMatchKey();
      String matchRule = rule.getMatchRule();
      Byte matchMethod = rule.getMatchMethod();
      if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
      return true;
      } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
      String param = request.getQueryParams().getFirst(matchKey);
      if (!StringUtils.isEmpty(param)) {
      return StringTools.match(param, matchMethod, matchRule);
      }
      } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
      HttpHeaders headers = request.getHeaders();
      String headerValue = headers.getFirst(matchKey);
      if (!StringUtils.isEmpty(headerValue)) {
      return StringTools.match(headerValue, matchMethod, matchRule);
      }
      }
      return false;
      }

    }

    3.3 数据同步

    app数据同步
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    http://www.
    后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?

    一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。

    对应代码ship-admin的NacosSyncListener

    /**

    • @Author: Ship

    • @Description:

    • @Date: Created in 2020/12/30
      */
      @Configuration
      public class NacosSyncListener implements ApplicationListener {

      private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);

      private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
      new ShipThreadFactory(“nacos-sync”, true).create());

      @NacosInjected
      private NamingService namingService;

      @Value("${nacos.discovery.server-addr}")
      private String baseUrl;

      @Resource
      private AppService appService;

      @Override
      public void onApplicationEvent(ContextRefreshedEvent event) {
      if (event.getApplicationContext().getParent() != null) {
      return;
      }
      String url = “http://” + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
      scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
      }

      class NacosSyncTask implements Runnable {

       private NamingService namingService;
      
       private String url;
      
       private AppService appService;
      
       private Gson gson = new GsonBuilder().create();
      
       public NacosSyncTask(NamingService namingService, String url, AppService appService) {
           this.namingService = namingService;
           this.url = url;
           this.appService = appService;
       }
      
       /**
        * Regular update weight,enabled plugins to nacos instance
        */
       @Override
       public void run() {
           try {
               // get all app names
               ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
               if (CollectionUtils.isEmpty(services.getData())) {
                   return;
               }
               List<String> appNames = services.getData();
               List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
               for (AppInfoDTO appInfo : appInfos) {
                   if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                       continue;
                   }
                   for (ServiceInstance instance : appInfo.getInstances()) {
                       Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                       String resp = OkhttpTool.doPut(url, queryMap, "");
                       LOGGER.debug("response :{}", resp);
                   }
               }
      
           } catch (Exception e) {
               LOGGER.error("nacos sync task error", e);
           }
       }
      
       private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
           Map<String, Object> map = new HashMap<>();
           map.put("serviceName", appInfo.getAppName());
           map.put("groupName", NacosConstants.APP_GROUP_NAME);
           map.put("ip", instance.getIp());
           map.put("port", instance.getPort());
           map.put("weight", instance.getWeight().doubleValue());
           NacosMetadata metadata = new NacosMetadata();
           metadata.setAppName(appInfo.getAppName());
           metadata.setVersion(instance.getVersion());
           metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
           map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
           map.put("ephemeral", true);
           return map;
       }
      

      }
      }

    ship-server再定时从Nacos拉取app数据更新到本地Map缓存。

    /**

    • @Author: Ship

    • @Description: sync data to local cache

    • @Date: Created in 2020/12/25
      */
      @Configuration
      public class DataSyncTaskListener implements ApplicationListener {

      private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
      new ShipThreadFactory(“service-sync”, true).create());

      @NacosInjected
      private NamingService namingService;

      @Autowired
      private ServerConfigProperties properties;

      @Override
      public void onApplicationEvent(ContextRefreshedEvent event) {
      if (event.getApplicationContext().getParent() != null) {
      return;
      }
      scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
      , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
      WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
      websocketSyncCacheServer.start();
      }

      class DataSyncTask implements Runnable {

       private NamingService namingService;
      
       public DataSyncTask(NamingService namingService) {
           this.namingService = namingService;
       }
      
       @Override
       public void run() {
           try {
               // get all app names
               ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
               if (CollectionUtils.isEmpty(services.getData())) {
                   return;
               }
               List<String> appNames = services.getData();
               // get all instances
               for (String appName : appNames) {
                   List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                   if (CollectionUtils.isEmpty(instanceList)) {
                       continue;
                   }
                   ServiceCache.add(appName, buildServiceInstances(instanceList));
                   List<String> pluginNames = getEnabledPlugins(instanceList);
                   PluginCache.add(appName, pluginNames);
               }
               ServiceCache.removeExpired(appNames);
               PluginCache.removeExpired(appNames);
      
           } catch (NacosException e) {
               e.printStackTrace();
           }
       }
      
       private List<String> getEnabledPlugins(List<Instance> instanceList) {
           Instance instance = instanceList.get(0);
           Map<String, String> metadata = instance.getMetadata();
           // plugins: DynamicRoute,Auth
           String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
           return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
       }
      
       private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
           List<ServiceInstance> list = new LinkedList<>();
           instanceList.forEach(instance -> {
               Map<String, String> metadata = instance.getMetadata();
               ServiceInstance serviceInstance = new ServiceInstance();
               serviceInstance.setAppName(metadata.get("appName"));
               serviceInstance.setIp(instance.getIp());
               serviceInstance.setPort(instance.getPort());
               serviceInstance.setVersion(metadata.get("version"));
               serviceInstance.setWeight((int) instance.getWeight());
               list.add(serviceInstance);
           });
           return list;
       }
      

      }
      }

    路由规则数据同步

    同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。

    服务端WebsocketSyncCacheServer:

    /**

    • @Author: Ship

    • @Description:

    • @Date: Created in 2020/12/28
      */
      public class WebsocketSyncCacheServer extends WebSocketServer {

      private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);

      private Gson gson = new GsonBuilder().create();

      private MessageHandler messageHandler;

      public WebsocketSyncCacheServer(Integer port) {
      super(new InetSocketAddress(port));
      this.messageHandler = new MessageHandler();
      }

      @Override
      public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
      LOGGER.info(“server is open”);
      }

      @Override
      public void onClose(WebSocket webSocket, int i, String s, boolean b) {
      LOGGER.info(“websocket server close…”);
      }

      @Override
      public void onMessage(WebSocket webSocket, String message) {
      LOGGER.info(“websocket server receive message:\n[{}]”, message);
      this.messageHandler.handler(message);
      }

      @Override
      public void onError(WebSocket webSocket, Exception e) {

      }

      @Override
      public void onStart() {
      LOGGER.info(“websocket server start…”);
      }

      class MessageHandler {

       public void handler(String message) {
           RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
           if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
               return;
           }
           Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                   .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
           if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                   || OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
               RouteRuleCache.add(map);
           } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
               RouteRuleCache.remove(map);
           }
       }
      

      }
      }

    客户端WebsocketSyncCacheClient:

    /**

    • @Author: Ship

    • @Description:

    • @Date: Created in 2020/12/28
      */
      @Component
      public class WebsocketSyncCacheClient {

      private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);

      private WebSocketClient client;

      private RuleService ruleService;

      private Gson gson = new GsonBuilder().create();

      public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
      RuleService ruleService) {
      if (StringUtils.isEmpty(serverWebSocketUrl)) {
      throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
      }
      this.ruleService = ruleService;
      ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
      new ShipThreadFactory(“websocket-connect”, true).create());
      try {
      client = new WebSocketClient(new URI(serverWebSocketUrl)) {
      @Override
      public void onOpen(ServerHandshake serverHandshake) {
      LOGGER.info(“client is open”);
      List list = ruleService.getEnabledRule();
      String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
      send(msg);
      }

               @Override
               public void onMessage(String s) {
               }
      
               @Override
               public void onClose(int i, String s, boolean b) {
               }
      
               @Override
               public void onError(Exception e) {
                   LOGGER.error("websocket client error", e);
               }
           };
      
           client.connectBlocking();
           //使用调度线程池进行断线重连,30秒进行一次
           executor.scheduleAtFixedRate(() -> {
               if (client != null && client.isClosed()) {
                   try {
                       client.reconnectBlocking();
                   } catch (InterruptedException e) {
                       LOGGER.error("reconnect server fail", e);
                   }
               }
           }, 10, 30, TimeUnit.SECONDS);
      
       } catch (Exception e) {
           LOGGER.error("websocket sync cache exception", e);
           throw new ShipException(e.getMessage());
       }
      

      }

      public void send(T t) {
      while (!client.getReadyState().equals(ReadyState.OPEN)) {
      LOGGER.debug(“connecting …please wait”);
      }
      client.send(gson.toJson(t));
      }
      }

    四、测试

    4.1动态路由测试

    本地启动nacos ,sh startup.sh -m standalone

    启动ship-admin

    本地启动两个ship-example实例。

    实例1配置:

    ship:
    http:
    app-name: order
    version: gray_1.0
    context-path: /order
    port: 8081
    admin-url: 127.0.0.1:9001

    server:
    port: 8081

    nacos:
    discovery:
    server-addr: 127.0.0.1:8848
    实例2配置:

    ship:
    http:
    app-name: order
    version: prod_1.0
    context-path: /order
    port: 8082
    admin-url: 127.0.0.1:9001

    server:
    port: 8082

    nacos:
    discovery:
    server-addr: 127.0.0.1:8848
    在数据库添加路由规则配置,该规则表示当http header 中的name=ship时请求路由到gray_1.0版本的节点。

    启动ship-server,看到以下日志时则可以进行测试了。

    2021-01-02 19:57:09.159 INFO 30413 — [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer : websocket server receive message:
    [{“operationType”:“INSERT”,“ruleList”:[{“id”:1,“appId”:5,“appName”:“order”,“version”:“gray_1.0”,“matchObject”:“HEADER”,“matchKey”:“name”,“matchMethod”:1,“matchRule”:“ship”,“priority”:50}]}]

    用Postman请求http://localhost:9000/order/user/add,POST方式,header设置name=ship,可以看到只有实例1有日志显示。

    ==========add user,version:gray_1.0
    4.2性能压测

    压测环境:

    MacBook Pro 13英寸

    处理器 2.3 GHz 四核Intel Core i7

    内存 16 GB 3733 MHz LPDDR4X

    后端节点个数一个

    压测工具:wrk

    压测结果:20个线程,500个连接数,吞吐量大概每秒9400个请求。

    压测结果

    五、总结

    ​ 千里之行始于足下,开始以为写一个网关会很难,但当你实际开始行动时就会发现其实没那么难,所以迈出第一步很重要。过程中也遇到了很多问题,还在github上给soul和nacos这两个开源项目提了两个issue,后来发现是自己的问题,尴尬😅。本文代码已全部上传到github,点击这里即可,最后,希望此文对你有所帮助。

    参考资料:

    https://nacos.io/zh-cn/docs/quick-start.html

    https://dromara.org/website/zh-cn/docs/soul/soul.html

    https://docs.spring.io/spring-framework/docs/5.1.7.RELEASE/spring-framework-reference/web-reactive.html#webflux

    https://github.com/TooTallNate/Java-WebSocket

    本文作者: 烟味i
    本文链接:https://www.cnblogs.com/2YSP/p/14223892.html
    版权声明:本作品采用知识共享署名-非商业性使用-禁止演绎 2.5 中国大陆许可协议进行许可。

    展开全文
  • 如何用蓝牙网关广播蓝牙数据广播数据在厂商数据段(0xFF)里广播数据在用户自定义段里: 如果我们要广播固定内容数据,让周边其他蓝牙设备扫描读取到话,我们可以利用金桔蓝牙网关的蓝牙广播功能。 广播数据...


    如果我们要广播固定内容的数据,让周边的其他蓝牙设备扫描读取到的话,我们可以利用金桔蓝牙网关的蓝牙广播功能。

    广播数据在厂商数据段(0xFF)里

    蓝牙的厂商数据一般允许用户将自己定义的数据放到这个段下面。
    我们在金桔ACServer里这样配置蓝牙网关:
    在这里插入图片描述
    点击普通广播,这样蓝牙网关就会广播0x34,0x35,0x36,0x37,0x38(注意是十六进制)这几个数据20秒的时间,我们看下手机上nrfconnect看到的结果:
    在这里插入图片描述

    广播数据在用户自定义的段里:

    蓝牙广播的标准原始数据格式是:段长度+段类型+段数据,如上面的例子02 是长度 01是类型(广播标志) 06是数据;再往后,0A又是长度,FF是类型(厂商数据)后面是10个字节的数据。

    如果我们要自己定义段的信息,比如0xAC段里广播0x31, 0x32, 0x33,0x34,0xAD段里段里广播0x35, 0x36, 0x37,0x38,我们这样设置金桔ACserver的网关数据:
    在这里插入图片描述
    点击段广播后,网关将广播20秒该段的内容,我们通过手机的nrfconnect查看广播的内容:
    在这里插入图片描述
    这里看到我们的设备名称(N/A)没有了,通过mac地址可以看到这个是蓝牙网关,因为我们自己定义了段的信息,所以蓝牙名称段(0x09)就没有数据了。

    展开全文
  • - 背景 -最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成...

    -     背景     -

    最近在github上看了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台。

    -     设计     -

    1、技术选型

    网关是所有请求的入口,所以要求有很高的吞吐量,为了实现这点可以使用请求异步化来解决。目前一般有以下两种方案:

    • Tomcat/Jetty+NIO+Servlet3

    Servlet3已经支持异步,这种方案使用比较多,京东,有赞和Zuul,都用的是这种方案。

    • Netty+NIO

    Netty为高并发而生,目前唯品会的网关使用这个策略,在唯品会的技术文章中在相同的情况下Netty是每秒30w+的吞吐量,Tomcat是13w+,可以看出是有一定的差距的,但是Netty需要自己处理HTTP协议,这一块比较麻烦。

    后面发现Soul网关是基于Spring WebFlux(底层Netty)的,不用太关心HTTP协议的处理,于是决定也用Spring WebFlux。

    网关的第二个特点是具备可扩展性,比如Netflix Zuul有preFilters,postFilters等在不同的阶段方便处理不同的业务,基于责任链模式将请求进行链式处理即可实现。

    在微服务架构下,服务都会进行多实例部署来保证高可用,请求到达网关时,网关需要根据URL找到所有可用的实例,这时就需要服务注册和发现功能,即注册中心。

    现在流行的注册中心有Apache的Zookeeper和阿里的Nacos两种(consul有点小众),因为之前写RPC框架时已经用过了Zookeeper,所以这次就选择了Nacos。

    2、需求清单

    首先要明确目标,即开发一个具备哪些特性的网关,总结下后如下:

    自定义路由规则

    可基于version的路由规则设置,路由对象包括DEFAUL,HEADER和QUERY三种,匹配方式包括=、regex、like三种。

    跨语言

    HTTP协议天生跨语言

    高性能

    Netty本身就是一款高性能的通信框架,同时server将一些路由规则等数据缓存到JVM内存避免请求admin服务。

    高可用

    支持集群模式防止单节点故障,无状态。

    灰度发布

    灰度发布(又名金丝雀发布)是指在黑与白之间,能够平滑过渡的一种发布方式。在其上可以进行A/B testing,即让一部分用户继续用产品特性A,一部分用户开始用产品特性B,如果用户对B没有什么反对意见,那么逐步扩大范围,把所有用户都迁移到B上面来。通过特性一可以实现。

    接口鉴权

    基于责任链模式,用户开发自己的鉴权插件即可。

    负载均衡

    支持多种负载均衡算法,如随机,轮询,加权轮询等。利用SPI机制可以根据配置进行动态加载。

    3、架构设计

    在参考了一些优秀的网关Zuul,Spring Cloud Gateway,Soul后,将项目划分为以下几个模块。

    它们之间的关系如图:

    网关设计

    注意: 这张图与实际实现有点出入,Nacos push到本地缓存的那个环节没有实现,目前只有ship-sever定时轮询pull的过程。ship-admin从Nacos获取注册服务信息的过程,也改成了ServiceA启动时主动发生HTTP请求通知ship-admin。

    4、表结构设计


    -     编码     -

    1、ship-client-spring-boot-starter

    首先创建一个spring-boot-starter命名为ship-client-spring-boot-starter,不知道如何自定义starter的可以看我以前写的《开发自己的starter》。

    其核心类 AutoRegisterListener 就是在项目启动时做了两件事:

    1.将服务信息注册到Nacos注册中心

    2.通知ship-admin服务上线了并注册下线hook。

    代码如下:

    * Created by 2YSP on 2020/12/21
    */
    public class AutoRegisterListener implements ApplicationListener<ContextRefreshedEvent> {
    
       private final static Logger LOGGER = LoggerFactory.getLogger(AutoRegisterListener.class);
    
       private volatile AtomicBoolean registered = new AtomicBoolean(false);
    
       private final ClientConfigProperties properties;
    
       @NacosInjected
       private NamingService namingService;
    
       @Autowired
       private RequestMappingHandlerMapping handlerMapping;
    
       private final ExecutorService pool;
    
       /**
    * url list to ignore
    */
       private static List<String> ignoreUrlList = new LinkedList<>();
    
       static {
           ignoreUrlList.add("/error");
       }
    
       public AutoRegisterListener(ClientConfigProperties properties) {
           if (!check(properties)) {
               LOGGER.error("client config port,contextPath,appName adminUrl and version can't be empty!");
               throw new ShipException("client config port,contextPath,appName adminUrl and version can't be empty!");
           }
           this.properties = properties;
           pool = new ThreadPoolExecutor(1, 4, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
       }
    
       /**
    * check the ClientConfigProperties
    *
    * @param properties
    * @return
    */
       private boolean check(ClientConfigProperties properties) {
           if (properties.getPort() == null| properties.getContextPath() == null
                  | properties.getVersion() == null| properties.getAppName() == null
                  | properties.getAdminUrl() == null) {
               return false;
           }
           return true;
       }
    
    
       @Override
       public void onApplicationEvent(ContextRefreshedEvent event) {
           if (!registered.compareAndSet(false, true)) {
               return;
           }
           doRegister();
           registerShutDownHook();
       }
    
       /**
    * send unregister request to admin when jvm shutdown
    */
       private void registerShutDownHook() {
           final String url = "http://" + properties.getAdminUrl() + AdminConstants.UNREGISTER_PATH;
           final UnregisterAppDTO unregisterAppDTO = new UnregisterAppDTO();
           unregisterAppDTO.setAppName(properties.getAppName());
           unregisterAppDTO.setVersion(properties.getVersion());
           unregisterAppDTO.setIp(IpUtil.getLocalIpAddress());
           unregisterAppDTO.setPort(properties.getPort());
           Runtime.getRuntime().addShutdownHook(new Thread(() -> {
               OkhttpTool.doPost(url, unregisterAppDTO);
               LOGGER.info("[{}:{}] unregister from ship-admin success!", unregisterAppDTO.getAppName(), unregisterAppDTO.getVersion());
           }));
       }
    
       /**
    * register all interface info to register center
    */
       private void doRegister() {
           Instance instance = new Instance();
           instance.setIp(IpUtil.getLocalIpAddress());
           instance.setPort(properties.getPort());
           instance.setEphemeral(true);
           Map<String, String> metadataMap = new HashMap<>();
           metadataMap.put("version", properties.getVersion());
           metadataMap.put("appName", properties.getAppName());
           instance.setMetadata(metadataMap);
           try {
               namingService.registerInstance(properties.getAppName(), NacosConstants.APP_GROUP_NAME, instance);
           } catch (NacosException e) {
               LOGGER.error("register to nacos fail", e);
               throw new ShipException(e.getErrCode(), e.getErrMsg());
           }
           LOGGER.info("register interface info to nacos success!");
           // send register request to ship-admin
           String url = "http://" + properties.getAdminUrl() + AdminConstants.REGISTER_PATH;
           RegisterAppDTO registerAppDTO = buildRegisterAppDTO(instance);
           OkhttpTool.doPost(url, registerAppDTO);
           LOGGER.info("register to ship-admin success!");
       }
    
    
       private RegisterAppDTO buildRegisterAppDTO(Instance instance) {
           RegisterAppDTO registerAppDTO = new RegisterAppDTO();
           registerAppDTO.setAppName(properties.getAppName());
           registerAppDTO.setContextPath(properties.getContextPath());
           registerAppDTO.setIp(instance.getIp());
           registerAppDTO.setPort(instance.getPort());
           registerAppDTO.setVersion(properties.getVersion());
           return registerAppDTO;
       }
    }
    
    
    

    2、ship-server

    ship-sever项目主要包括了两个部分内容, 1.请求动态路由的主流程 2.本地缓存数据和ship-admin及nacos同步,这部分在后面3.3再讲。

    ship-server实现动态路由的原理是利用WebFilter拦截请求,然后将请求教给plugin chain去链式处理。

    PluginFilter根据URL解析出appName,然后将启用的plugin组装成plugin chain。

    public class PluginFilter implements WebFilter {
    
       private ServerConfigProperties properties;
    
       public PluginFilter(ServerConfigProperties properties) {
           this.properties = properties;
       }
    
       @Override
       public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
           String appName = parseAppName(exchange);
           if (CollectionUtils.isEmpty(ServiceCache.getAllInstances(appName))) {
               throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
           }
           PluginChain pluginChain = new PluginChain(properties, appName);
           pluginChain.addPlugin(new DynamicRoutePlugin(properties));
           pluginChain.addPlugin(new AuthPlugin(properties));
           return pluginChain.execute(exchange, pluginChain);
       }
    
       private String parseAppName(ServerWebExchange exchange) {
           RequestPath path = exchange.getRequest().getPath();
           String appName = path.value().split("/")[1];
           return appName;
       }
    }```
    
    PluginChain继承了AbstractShipPlugin并持有所有要执行的插件。
    
    ```java
    * @Author: Ship
    * @Description:
    * @Date: Created in 2020/12/25
    */
    public class PluginChain extends AbstractShipPlugin {
       /**
    * the pos point to current plugin
    */
       private int pos;
       /**
    * the plugins of chain
    */
       private List<ShipPlugin> plugins;
    
       private final String appName;
    
       public PluginChain(ServerConfigProperties properties, String appName) {
           super(properties);
           this.appName = appName;
       }
    
       /**
    * add enabled plugin to chain
    *
    * @param shipPlugin
    */
       public void addPlugin(ShipPlugin shipPlugin) {
           if (plugins == null) {
               plugins = new ArrayList<>();
           }
           if (!PluginCache.isEnabled(appName, shipPlugin.name())) {
               return;
           }
           plugins.add(shipPlugin);
           // order by the plugin's order
           plugins.sort(Comparator.comparing(ShipPlugin::order));
       }
    
       @Override
       public Integer order() {
           return null;
       }
    
       @Override
       public String name() {
           return null;
       }
    
       @Override
       public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
           if (pos == plugins.size()) {
               return exchange.getResponse().setComplete();
           }
           return pluginChain.plugins.get(pos++).execute(exchange, pluginChain);
       }
    
       public String getAppName() {
           return appName;
       }
    
    }
    

    AbstractShipPlugin实现了ShipPlugin接口,并持有ServerConfigProperties配置对象。

    public abstract class AbstractShipPlugin implements ShipPlugin {
    
       protected ServerConfigProperties properties;
    
       public AbstractShipPlugin(ServerConfigProperties properties) {
           this.properties = properties;
       }
    }```
    
    ShipPlugin接口定义了所有插件必须实现的三个方法order(),name()和execute()。
    
    ```java
    public interface ShipPlugin {
       /**
    * lower values have higher priority
    *
    * @return
    */
       Integer order();
    
       /**
    * return current plugin name
    *
    * @return
    */
       String name();
    
       Mono<Void> execute(ServerWebExchange exchange,PluginChain pluginChain);
    
    }```
    
    DynamicRoutePlugin继承了抽象类AbstractShipPlugin,包含了动态路由的主要业务逻辑。
    
    ```java
    * @Author: Ship
    * @Description:
    * @Date: Created in 2020/12/25
    */
    public class DynamicRoutePlugin extends AbstractShipPlugin {
    
       private final static Logger LOGGER = LoggerFactory.getLogger(DynamicRoutePlugin.class);
    
       private static WebClient webClient;
    
       private static final Gson gson = new GsonBuilder().create();
    
       static {
           HttpClient httpClient = HttpClient.create()
                   .tcpConfiguration(client ->
                           client.doOnConnected(conn ->
                                   conn.addHandlerLast(new ReadTimeoutHandler(3))
                                           .addHandlerLast(new WriteTimeoutHandler(3)))
                                   .option(ChannelOption.TCP_NODELAY, true)
                   );
           webClient = WebClient.builder().clientConnector(new ReactorClientHttpConnector(httpClient))
                   .build();
       }
    
       public DynamicRoutePlugin(ServerConfigProperties properties) {
           super(properties);
       }
    
       @Override
       public Integer order() {
           return ShipPluginEnum.DYNAMIC_ROUTE.getOrder();
       }
    
       @Override
       public String name() {
           return ShipPluginEnum.DYNAMIC_ROUTE.getName();
       }
    
       @Override
       public Mono<Void> execute(ServerWebExchange exchange, PluginChain pluginChain) {
           String appName = pluginChain.getAppName();
           ServiceInstance serviceInstance = chooseInstance(appName, exchange.getRequest());
    //        LOGGER.info("selected instance is [{}]", gson.toJson(serviceInstance));
           // request service
           String url = buildUrl(exchange, serviceInstance);
           return forward(exchange, url);
       }
    
       /**
    * forward request to backend service
    *
    * @param exchange
    * @param url
    * @return
    */
       private Mono<Void> forward(ServerWebExchange exchange, String url) {
           ServerHttpRequest request = exchange.getRequest();
           ServerHttpResponse response = exchange.getResponse();
           HttpMethod method = request.getMethod();
    
           WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(url).headers((headers) -> {
               headers.addAll(request.getHeaders());
           });
    
           WebClient.RequestHeadersSpec<?> reqHeadersSpec;
           if (requireHttpBody(method)) {
               reqHeadersSpec = requestBodySpec.body(BodyInserters.fromDataBuffers(request.getBody()));
           } else {
               reqHeadersSpec = requestBodySpec;
           }
           // nio->callback->nio
           return reqHeadersSpec.exchange().timeout(Duration.ofMillis(properties.getTimeOutMillis()))
                   .onErrorResume(ex -> {
                       return Mono.defer(() -> {
                           String errorResultJson = "";
                           if (ex instanceof TimeoutException) {
                               errorResultJson = "{\"code\":5001,\"message\":\"network timeout\"}";
                           } else {
                               errorResultJson = "{\"code\":5000,\"message\":\"system error\"}";
                           }
                           return ShipResponseUtil.doResponse(exchange, errorResultJson);
                       }).then(Mono.empty());
                   }).flatMap(backendResponse -> {
                       response.setStatusCode(backendResponse.statusCode());
                       response.getHeaders().putAll(backendResponse.headers().asHttpHeaders());
                       return response.writeWith(backendResponse.bodyToFlux(DataBuffer.class));
                   });
       }
    
       /**
    * weather the http method need http body
    *
    * @param method
    * @return
    */
       private boolean requireHttpBody(HttpMethod method) {
           if (method.equals(HttpMethod.POST)| method.equals(HttpMethod.PUT)| method.equals(HttpMethod.PATCH)) {
               return true;
           }
           return false;
       }
    
       private String buildUrl(ServerWebExchange exchange, ServiceInstance serviceInstance) {
           ServerHttpRequest request = exchange.getRequest();
           String query = request.getURI().getQuery();
           String path = request.getPath().value().replaceFirst("/" + serviceInstance.getAppName(), "");
           String url = "http://" + serviceInstance.getIp() + ":" + serviceInstance.getPort() + path;
           if (!StringUtils.isEmpty(query)) {
               url = url + "?" + query;
           }
           return url;
       }
    
    
       /**
    * choose an ServiceInstance according to route rule config and load balancing algorithm
    *
    * @param appName
    * @param request
    * @return
    */
       private ServiceInstance chooseInstance(String appName, ServerHttpRequest request) {
           List<ServiceInstance> serviceInstances = ServiceCache.getAllInstances(appName);
           if (CollectionUtils.isEmpty(serviceInstances)) {
               LOGGER.error("service instance of {} not find", appName);
               throw new ShipException(ShipExceptionEnum.SERVICE_NOT_FIND);
           }
           String version = matchAppVersion(appName, request);
           if (StringUtils.isEmpty(version)) {
               throw new ShipException("match app version error");
           }
           // filter serviceInstances by version
           List<ServiceInstance> instances = serviceInstances.stream().filter(i -> i.getVersion().equals(version)).collect(Collectors.toList());
           //Select an instance based on the load balancing algorithm
           LoadBalance loadBalance = LoadBalanceFactory.getInstance(properties.getLoadBalance(), appName, version);
           ServiceInstance serviceInstance = loadBalance.chooseOne(instances);
           return serviceInstance;
       }
    
    
       private String matchAppVersion(String appName, ServerHttpRequest request) {
           List<AppRuleDTO> rules = RouteRuleCache.getRules(appName);
           rules.sort(Comparator.comparing(AppRuleDTO::getPriority).reversed());
           for (AppRuleDTO rule : rules) {
               if (match(rule, request)) {
                   return rule.getVersion();
               }
           }
           return null;
       }
    
    
       private boolean match(AppRuleDTO rule, ServerHttpRequest request) {
           String matchObject = rule.getMatchObject();
           String matchKey = rule.getMatchKey();
           String matchRule = rule.getMatchRule();
           Byte matchMethod = rule.getMatchMethod();
           if (MatchObjectEnum.DEFAULT.getCode().equals(matchObject)) {
               return true;
           } else if (MatchObjectEnum.QUERY.getCode().equals(matchObject)) {
               String param = request.getQueryParams().getFirst(matchKey);
               if (!StringUtils.isEmpty(param)) {
                   return StringTools.match(param, matchMethod, matchRule);
               }
           } else if (MatchObjectEnum.HEADER.getCode().equals(matchObject)) {
               HttpHeaders headers = request.getHeaders();
               String headerValue = headers.getFirst(matchKey);
               if (!StringUtils.isEmpty(headerValue)) {
                   return StringTools.match(headerValue, matchMethod, matchRule);
               }
           }
           return false;
       }
    
    }
    

    3、数据同步

    app数据同步

    后台服务(如订单服务)启动时,只将服务名,版本,ip地址和端口号注册到了Nacos,并没有实例的权重和启用的插件信息怎么办?

    一般在线的实例权重和插件列表都是在管理界面配置,然后动态生效的,所以需要ship-admin定时更新实例的权重和插件信息到注册中心。

    对应代码ship-admin的NacosSyncListener:

    * @Author: Ship
    * @Description:
    * @Date: Created in 2020/12/30
    */
    @Configuration
    public class NacosSyncListener implements ApplicationListener<ContextRefreshedEvent> {
    
       private static final Logger LOGGER = LoggerFactory.getLogger(NacosSyncListener.class);
    
       private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
               new ShipThreadFactory("nacos-sync", true).create());
    
       @NacosInjected
       private NamingService namingService;
    
       @Value("${nacos.discovery.server-addr}")
       private String baseUrl;
    
       @Resource
       private AppService appService;
    
       @Override
       public void onApplicationEvent(ContextRefreshedEvent event) {
           if (event.getApplicationContext().getParent() != null) {
               return;
           }
           String url = "http://" + baseUrl + NacosConstants.INSTANCE_UPDATE_PATH;
           scheduledPool.scheduleWithFixedDelay(new NacosSyncTask(namingService, url, appService), 0, 30L, TimeUnit.SECONDS);
       }
    
       class NacosSyncTask implements Runnable {
    
           private NamingService namingService;
    
           private String url;
    
           private AppService appService;
    
           private Gson gson = new GsonBuilder().create();
    
           public NacosSyncTask(NamingService namingService, String url, AppService appService) {
               this.namingService = namingService;
               this.url = url;
               this.appService = appService;
           }
    
           /**
    * Regular update weight,enabled plugins to nacos instance
    */
           @Override
           public void run() {
               try {
                   // get all app names
                   ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                   if (CollectionUtils.isEmpty(services.getData())) {
                       return;
                   }
                   List<String> appNames = services.getData();
                   List<AppInfoDTO> appInfos = appService.getAppInfos(appNames);
                   for (AppInfoDTO appInfo : appInfos) {
                       if (CollectionUtils.isEmpty(appInfo.getInstances())) {
                           continue;
                       }
                       for (ServiceInstance instance : appInfo.getInstances()) {
                           Map<String, Object> queryMap = buildQueryMap(appInfo, instance);
                           String resp = OkhttpTool.doPut(url, queryMap, "");
                           LOGGER.debug("response :{}", resp);
                       }
                   }
    
               } catch (Exception e) {
                   LOGGER.error("nacos sync task error", e);
               }
           }
    
           private Map<String, Object> buildQueryMap(AppInfoDTO appInfo, ServiceInstance instance) {
               Map<String, Object> map = new HashMap<>();
               map.put("serviceName", appInfo.getAppName());
               map.put("groupName", NacosConstants.APP_GROUP_NAME);
               map.put("ip", instance.getIp());
               map.put("port", instance.getPort());
               map.put("weight", instance.getWeight().doubleValue());
               NacosMetadata metadata = new NacosMetadata();
               metadata.setAppName(appInfo.getAppName());
               metadata.setVersion(instance.getVersion());
               metadata.setPlugins(String.join(",", appInfo.getEnabledPlugins()));
               map.put("metadata", StringTools.urlEncode(gson.toJson(metadata)));
               map.put("ephemeral", true);
               return map;
           }
       }
    }
    

    ship-server再定时从Nacos拉取app数据更新到本地Map缓存。

    * @Author: Ship
    * @Description: sync data to local cache
    * @Date: Created in 2020/12/25
    */
    @Configuration
    public class DataSyncTaskListener implements ApplicationListener<ContextRefreshedEvent> {
    
       private static ScheduledThreadPoolExecutor scheduledPool = new ScheduledThreadPoolExecutor(1,
               new ShipThreadFactory("service-sync", true).create());
    
       @NacosInjected
       private NamingService namingService;
    
       @Autowired
       private ServerConfigProperties properties;
    
       @Override
       public void onApplicationEvent(ContextRefreshedEvent event) {
           if (event.getApplicationContext().getParent() != null) {
               return;
           }
           scheduledPool.scheduleWithFixedDelay(new DataSyncTask(namingService)
                   , 0L, properties.getCacheRefreshInterval(), TimeUnit.SECONDS);
           WebsocketSyncCacheServer websocketSyncCacheServer = new WebsocketSyncCacheServer(properties.getWebSocketPort());
           websocketSyncCacheServer.start();
       }
    
    
       class DataSyncTask implements Runnable {
    
           private NamingService namingService;
    
           public DataSyncTask(NamingService namingService) {
               this.namingService = namingService;
           }
    
           @Override
           public void run() {
               try {
                   // get all app names
                   ListView<String> services = namingService.getServicesOfServer(1, Integer.MAX_VALUE, NacosConstants.APP_GROUP_NAME);
                   if (CollectionUtils.isEmpty(services.getData())) {
                       return;
                   }
                   List<String> appNames = services.getData();
                   // get all instances
                   for (String appName : appNames) {
                       List<Instance> instanceList = namingService.getAllInstances(appName, NacosConstants.APP_GROUP_NAME);
                       if (CollectionUtils.isEmpty(instanceList)) {
                           continue;
                       }
                       ServiceCache.add(appName, buildServiceInstances(instanceList));
                       List<String> pluginNames = getEnabledPlugins(instanceList);
                       PluginCache.add(appName, pluginNames);
                   }
                   ServiceCache.removeExpired(appNames);
                   PluginCache.removeExpired(appNames);
    
               } catch (NacosException e) {
                   e.printStackTrace();
               }
           }
    
           private List<String> getEnabledPlugins(List<Instance> instanceList) {
               Instance instance = instanceList.get(0);
               Map<String, String> metadata = instance.getMetadata();
               // plugins: DynamicRoute,Auth
               String plugins = metadata.getOrDefault("plugins", ShipPluginEnum.DYNAMIC_ROUTE.getName());
               return Arrays.stream(plugins.split(",")).collect(Collectors.toList());
           }
    
           private List<ServiceInstance> buildServiceInstances(List<Instance> instanceList) {
               List<ServiceInstance> list = new LinkedList<>();
               instanceList.forEach(instance -> {
                   Map<String, String> metadata = instance.getMetadata();
                   ServiceInstance serviceInstance = new ServiceInstance();
                   serviceInstance.setAppName(metadata.get("appName"));
                   serviceInstance.setIp(instance.getIp());
                   serviceInstance.setPort(instance.getPort());
                   serviceInstance.setVersion(metadata.get("version"));
                   serviceInstance.setWeight((int) instance.getWeight());
                   list.add(serviceInstance);
               });
               return list;
           }
       }
    }
    

    路由规则数据同步

    同时,如果用户在管理后台更新了路由规则,ship-admin需要推送规则数据到ship-server,这里参考了soul网关的做法利用websocket在第一次建立连接后进行全量同步,此后路由规则发生变更就只作增量同步。

    服务端WebsocketSyncCacheServer:

    * @Author: Ship
    * @Description:
    * @Date: Created in 2020/12/28
    */
    public class WebsocketSyncCacheServer extends WebSocketServer {
    
       private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheServer.class);
    
       private Gson gson = new GsonBuilder().create();
    
       private MessageHandler messageHandler;
    
       public WebsocketSyncCacheServer(Integer port) {
           super(new InetSocketAddress(port));
           this.messageHandler = new MessageHandler();
       }
    
    
       @Override
       public void onOpen(WebSocket webSocket, ClientHandshake clientHandshake) {
           LOGGER.info("server is open");
       }
    
       @Override
       public void onClose(WebSocket webSocket, int i, String s, boolean b) {
           LOGGER.info("websocket server close...");
       }
    
       @Override
       public void onMessage(WebSocket webSocket, String message) {
           LOGGER.info("websocket server receive message:\n[{}]", message);
           this.messageHandler.handler(message);
       }
    
       @Override
       public void onError(WebSocket webSocket, Exception e) {
    
       }
    
       @Override
       public void onStart() {
           LOGGER.info("websocket server start...");
       }
    
    
       class MessageHandler {
    
           public void handler(String message) {
               RouteRuleOperationDTO operationDTO = gson.fromJson(message, RouteRuleOperationDTO.class);
               if (CollectionUtils.isEmpty(operationDTO.getRuleList())) {
                   return;
               }
               Map<String, List<AppRuleDTO>> map = operationDTO.getRuleList()
                       .stream().collect(Collectors.groupingBy(AppRuleDTO::getAppName));
               if (OperationTypeEnum.INSERT.getCode().equals(operationDTO.getOperationType())
                      | OperationTypeEnum.UPDATE.getCode().equals(operationDTO.getOperationType())) {
                   RouteRuleCache.add(map);
               } else if (OperationTypeEnum.DELETE.getCode().equals(operationDTO.getOperationType())) {
                   RouteRuleCache.remove(map);
               }
           }
       }
    }
    

    客户端WebsocketSyncCacheClient:

    * @Author: Ship
    * @Description:
    * @Date: Created in 2020/12/28
    */
    @Component
    public class WebsocketSyncCacheClient {
    
       private final static Logger LOGGER = LoggerFactory.getLogger(WebsocketSyncCacheClient.class);
    
       private WebSocketClient client;
    
       private RuleService ruleService;
    
       private Gson gson = new GsonBuilder().create();
    
       public WebsocketSyncCacheClient(@Value("${ship.server-web-socket-url}") String serverWebSocketUrl,
    RuleService ruleService) {
           if (StringUtils.isEmpty(serverWebSocketUrl)) {
               throw new ShipException(ShipExceptionEnum.CONFIG_ERROR);
           }
           this.ruleService = ruleService;
           ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1,
                   new ShipThreadFactory("websocket-connect", true).create());
           try {
               client = new WebSocketClient(new URI(serverWebSocketUrl)) {
                   @Override
                   public void onOpen(ServerHandshake serverHandshake) {
                       LOGGER.info("client is open");
                       List<AppRuleDTO> list = ruleService.getEnabledRule();
                       String msg = gson.toJson(new RouteRuleOperationDTO(OperationTypeEnum.INSERT, list));
                       send(msg);
                   }
    
                   @Override
                   public void onMessage(String s) {
                   }
    
                   @Override
                   public void onClose(int i, String s, boolean b) {
                   }
    
                   @Override
                   public void onError(Exception e) {
                       LOGGER.error("websocket client error", e);
                   }
               };
    
               client.connectBlocking();
               //使用调度线程池进行断线重连,30秒进行一次
               executor.scheduleAtFixedRate(() -> {
                   if (client != null && client.isClosed()) {
                       try {
                           client.reconnectBlocking();
                       } catch (InterruptedException e) {
                           LOGGER.error("reconnect server fail", e);
                       }
                   }
               }, 10, 30, TimeUnit.SECONDS);
    
           } catch (Exception e) {
               LOGGER.error("websocket sync cache exception", e);
               throw new ShipException(e.getMessage());
           }
       }
    
       public <T> void send(T t) {
           while (!client.getReadyState().equals(ReadyState.OPEN)) {
               LOGGER.debug("connecting ...please wait");
           }
           client.send(gson.toJson(t));
       }
    }


    -     测试     -


    1、动态路由测试

    • 本地启动nacos ,sh startup.sh -m standalone;

    • 启动ship-admin;

    • 本地启动两个ship-example实例。

    实例1配置:

    ship:
     http:
       app-name: order
       version: gray_1.0
       context-path: /order
       port: 8081
       admin-url: 127.0.0.1:9001
    
     server:
     port: 8081
    
     nacos:
     discovery:
       server-addr: 127.0.0.1:8848
    

    实例2配置:

    ship:
     http:
       app-name: order
       version: prod_1.0
       context-path: /order
       port: 8082
       admin-url: 127.0.0.1:9001
    
     server:
     port: 8082
    
     nacos:
     discovery:
       server-addr: 127.0.0.1:8848
    

    在数据库添加路由规则配置,该规则表示当http header 中的name=ship时请求路由到gray_1.0版本的节点。

    启动ship-server,看到以下日志时则可以进行测试了。

    2021-01-02 19:57:09.159  INFO 30413 --- [SocketWorker-29] cn.sp.sync.WebsocketSyncCacheServer      : websocket server receive message:
     [{"operationType":"INSERT","ruleList":[{"id":1,"appId":5,"appName":"order","version":"gray_1.0","matchObject":"HEADER","matchKey":"name","matchMethod":1,"matchRule":"ship","priority":50}]}]
    

    用Postman请求http://localhost:9000/order/user/add,POST方式,header设置name=ship,可以看到只有实例1有日志显示。

    ==========add user,version:gray_1.0
    

    2、性能压测

    压测环境:

    • MacBook Pro 13英寸

    • 处理器 2.3 GHz 四核Intel Core i7

    • 内存 16 GB 3733 MHz LPDDR4X

    • 后端节点个数一个

    • 压测工具:wrk

    • 压测结果:20个线程,500个连接数,吞吐量大概每秒9400个请求。

    压测结果


    -     总结    -


    千里之行始于足下,开始以为写一个网关会很难,但当你实际开始行动时就会发现其实没那么难,所以迈出第一步很重要。过程中也遇到了很多问题,还在github上给soul和nacos这两个开源项目提了两个issue,后来发现是自己的问题,尴尬。

    本文代码已全部上传到:https://github.com/2YSP/ship-gate 。

    内容来源于网络,版权归原作者所有。

    
    最近面试BAT,整理一份面试资料《Java面试BAT通关手册》,覆盖了Java核心技术、JVM、Java并发、SSM、微服务、数据库、数据结构等等。获取方式:关注公众号并回复 java 领取,更多内容陆续奉上。
    明天见(。・ω・。)ノ♡
    
    展开全文
  • 最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。 经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台????。 二、设计 ...
  • 如何通过CMPP短信网关下发WAP PUSH

    千次阅读 2012-06-14 13:24:38
    折腾了几天wap push,终于有个结论。通过CMPP短信网关下发已测试出技术实现没问题, ...来联通也明白自己的平台管理不善,容易让不法分子钻空子,干脆把门都堵上了。 个人认为,wap push不是正常
  • 最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。折腾了大概三周时间,我的网关ship-gate核心功能基本都已完成,写这篇文章是记录我是如何从零开始手写一个网关的。 二、...
  • (转)如何通过CMPP短信网关下发wap push

    千次阅读 2009-05-30 17:08:00
    如何通过CMPP短信网关下发wap push 折腾了几天wap push,终于有个结论。通过CMPP短信网关下发已测试出技术实现没问题, 而用同样的方式给联通的SGIP短信... 来联通也明白自己的平台管理不善,容易让不法分子钻空子
  • - 背景 -最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成...
  • 如何简单地设置一个LoRa网关

    千次阅读 2019-08-05 01:24:45
    我会先讲述一下原理,然后给大家演示一下如何完成一个LoRa网关的设置。 在本文中需要准备软硬件: 我最近经常在玩LoRa模块,甚至自己制作了一块LoRa开发板。通过LoRa技术可以实现低功率远程无线电通信,这是一种...
  • <div><p>社区demo-edge 项目下载下来,...望哪位大神给一个最简单edge网关程序,包含最基础功能就可以啦</p><p>该提问来源于开源项目:apache/servicecomb-java-chassis</p></div>
  • 过很多老师文章,大多都介绍是内网发布TS网关的方法.所以自己做了个外网发布实验.把步骤简单写下来.之所以做这个实验目的,是为了公网访问TS服务时,对内网机器FQDN解析问题.例如,当发布remoteapp时候,...
  • 最近在github上了soul网关的设计,突然就来了兴趣准备自己从零开始写一个高性能的网关。经过两周时间的开发,我的网关ship-gate核心功能基本都已完成,最大的缺陷就是前端功底太差没有管理后台..
  • 但是发现自己复制2.1.2版本与下载源码soul-admin和soul-boostrap版本不对。项目无法被注册到网关上,这个是个问题。后续希望可以通过源码能了解甚至解决这个问题 dubbo版本配置无法读取到it's not a ...
  • 朋友自己开了个公司,接到一个升级项目,客户要求用Aps.Net Core做数据网关服务且基于JWT认证实现对前后端分离数据服务支持,于是想到我一直做.Net开发,问我是否对.Net Core有所了解?能不能做个简单Demo出来...
  • 企业如何考虑自己的网络防护设备 在IT投入还比较舍得的公司里面,除去功能强大(思科/华为/3COM一类)的路由器之外,大多都有UTM或 IPS或Web安全网关之类的安全设备。当然,我们不能忘记一直辛苦工作的普通防火墙...
  • 最近自己写了一个关于网关限流插件,然后想着肯定会有很多兄弟也需要使用到,所以就想着把jar包上传到Maven中央仓库上让大家可以更方便使用 现在咱们来一下这个流程是什么样呢。 首先呢,你得去这个...
  • 我们发现近期使用EasyNTS上云网关的用户不少,都向我们咨询过配置问题,EasyNTS配置起来繁琐,实际操作却很简单,我们也有相关配置文档,大家可以参照配置。 对于一些已经配置完毕,正在使用用户,偶尔也...
  • 我们发现近期使用EasyNTS上云网关的用户不少,都向我们咨询过配置问题,EasyNTS配置起来繁琐,实际操作却很简单,我们也有相关配置文档,大家可以参照配置。 对于一些已经配置完毕,正在使用用户,偶尔也...
  • 本项目是从网上找到乐优商城项目,本文以这个项目为例做分析,有兴趣可以看看。 首先是搭建项目 这里为什么选择maven而不选择Spring Initializr呢,因为选择Spring Initializr话,它会生成一些其他配置,...
  • 阿里云网关API接口调用是接口数据请求时...查看配置的网关API接口的后台超时时间是否太短,一般1000-2000ms,一定注意单位是ms,不是m; 修改完成后一定记得发布,不然你的修改是不起作用; 如何自己写得接口返回数据...
  • 在决定以一组微服务来构建自己的应用时,你需要确定应用客户端如何与微服务交互。 在单体式程序中,通常只有一组冗余的或者负载均衡的服务提供点。在微服务架构中,每一个微服务暴露一组细粒度的服务提供点。在本...

空空如也

空空如也

1 2 3 4 5 ... 8
收藏数 159
精华内容 63
关键字:

如何看自己的网关