精华内容
下载资源
问答
  • 行业资料-电子功用-改进电弧电流转移型交流故障限流器及限流方法.pdf
  • 交流饱和铁芯型故障限流器(SCFCL)的研究现状和发展历程进行了综述。首先介绍了直流偏置型、永磁体偏置型、超导型和混合偏置型这4类SCFCL的工作原理、拓扑结构、等效电路和应用实例,比较并分析了4类SCFCL的功能...
  • 有效防止高温失灵—PTC热敏电阻用作LED限流器、电子技术,开发板制作交流
  • 摘要:介绍了一种适用于固态限流器的多输出高压隔离谐振式电源,给出了主电路拓扑结构,分析了其工作原理,并用PSpice对其进行了仿真验证,最后给出了实验结果。 理论分析和实验结果证明,负载谐振模式使负载电流...
  • 针对电力系统的短路电流限制问题,研究了一种新型桥式高温超导故障限流器。该限流器主要由两个超导带材绕制的线圈反向并联而成。线路正常工作时,并联线圈的感抗很小,因此对电路无影响,短路故障发生时,其中一个...
  • 电子政务-快速限流保护交流电子调压.zip
  • 近日,STMicroelectronics为高效功率转换提供限流芯片——STIL02-P5 和STIL04-P5。在很多的交流/支流电源转换中,都采用桥式电路。但是桥式电路会产生瞬间电流,这部分电流对功率转换不起作用,同时,常常会在PCB...
  • 限流型保护电路

    2021-02-03 14:19:00
    限流型保护电路、电子技术,开发板制作交流
  • 详细分析了基于模块化多电平换流器(MMC)的统一潮流控制器(UPFC)接入线路发生故障时的UPFC本体响应特性,提出一种基于限流电抗器的UPFC故障渡越策略。通过限制经过直流母线的故障电流,可使得UPFC外部故障期间并联侧...
  • 模糊控制限流软启动设计,金立,王梅,交流异步电机直接起动有许多弊端,通过分析传统PID控制存在的不足,讲述了一种将模糊控制和PID结合的异步电机软启动控制方式,并基
  • 基于Step-Down PWM电源管理芯片的PFM限流比较电路设计、电子技术,开发板制作交流
  • 一路数字输出用于检测交流适配器是否插入。 MAX8730提供0.5%的电池电压调节精度,因此提高了电池容量并极大地缩短了充电时间。该器件可通过硬件连线或微处理控制充电电流或电压。另外,MAX8730还可通过两个p沟道...
  • 德力西RN 型高压限流熔断pdf,德力西RN型高压限流熔断:本产品使用于户内交流50Hz,额定电压6~35kV系统中作为电力设备及电力线路的过载或短路保护。
  • 德力西RN1 型高压限流熔断pdf,德力西RN1型高压限流熔断:本产品使用于户内交流50Hz,额定电压6~35kV系统中作为电力设备及电力线路的过载或短路保护。
  • 基于柔性直流电网的故障特征,结合国内外研究成果,从交流限流、换流器限流以及直流侧限流3个方面分析了柔性直流电网各类限流技术和方法的原理及性能,对相关限流技术和方法进行了仿真测试和比较。基于对比分析...
  • 德力西XRNT1型变压器保护用高压限流熔断pdf,德力西XRNT1型变压器保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压12kV的系统,可与其他开关电器如负荷开关、真空接触配合使用,作为电力变压器及其它...
  • 德力西XRNM1型电动机保护用高压限流熔断pdf,德力西XRNM1型电动机保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压3.6KV系统.可与其他保护电器(如开关,真空断路)配合使用,作为高压电动机及其它电力设备...
  • 为了降低交流系统短路故障给基于模块化多电平换流器的统一潮流控制器(MMC-UPFC)带来的大电流冲击风险,保障电力电子设备的安全运行,提出了一种基于分裂电感的限流式MMC-UPFC。通过对桥臂电感的分裂设计和串联变压器...
  • Spring Cloud Gateway 限流操作

    万次阅读 2018-07-23 16:00:42
    开发高并发系统时有三把利器用来保护系统:缓存、降级和限流,API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。 常用的限流算法比如有令牌桶算法,漏桶算法,...

    开发高并发系统时有三把利器用来保护系统:缓存、降级和限流,API网关作为所有请求的入口,请求量大,我们可以通过对并发访问的请求进行限速来保护系统的可用性。

    常用的限流算法比如有令牌桶算法,漏桶算法,计数器算法等,在Zuul中我们可以自己去实现限流的功能(Zuul中如何限流在我的书《Spring Cloud微服务-全栈技术与案例解析》中有详细讲解),Spring Cloud Gateway的出现本身就是用来替代Zuul的,要想替代那肯定得有强大的功能,除了性能上的优势之外,Spring Cloud Gateway还提供了很多新功能,比如今天我们要讲的限流操作,使用起来非常简单,今天我们就来学习在如何在Spring Cloud Gateway中进行限流操作。

    目前限流提供了基于Redis的实现,我们需要增加对应的依赖:

     <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>
    

    可以通过KeyResolver来指定限流的Key,比如我们需要根据用户来做限流,IP来做限流等等。

    IP限流

    @Bean
    public KeyResolver ipKeyResolver() {
    	return exchange -> Mono.just(exchange.getRequest().getRemoteAddress().getHostName());
    }
    

    通过exchange对象可以获取到请求信息,这边用了HostName,如果你想根据用户来做限流的话这边可以获取当前请求的用户ID或者用户名就可以了,比如:

    用户限流
    使用这种方式限流,请求路径中必须携带userId参数

    @Bean
    KeyResolver userKeyResolver() {
    	return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("userId"));
    }
    

    接口限流
    获取请求地址的uri作为限流key

    @Bean
    KeyResolver apiKeyResolver() {
    	return exchange -> Mono.just(exchange.getRequest().getPath().value());
    }
    

    然后配置限流的过滤器信息:

    server:
      port: 8084
    spring:
      redis:
        host: 127.0.0.1
        port: 6379
      cloud:
        gateway:
          routes:
          - id: fsh-house
            uri: lb://fsh-house
            predicates:
            - Path=/house/**
            filters:
            - name: RequestRateLimiter
              args:
                redis-rate-limiter.replenishRate: 10
                redis-rate-limiter.burstCapacity: 20
                key-resolver: "#{@ipKeyResolver}"
    
    • filter名称必须是RequestRateLimiter
    • redis-rate-limiter.replenishRate:允许用户每秒处理多少个请求
    • redis-rate-limiter.burstCapacity:令牌桶的容量,允许在一秒钟内完成的最大请求数
    • key-resolver:使用SpEL按名称引用bean

    可以访问接口进行测试,这时候Redis中会有对应的数据:

    127.0.0.1:6379> keys *
    1) "request_rate_limiter.{localhost}.timestamp"
    2) "request_rate_limiter.{localhost}.tokens"
    

    大括号中就是我们的限流Key,这边是IP,本地的就是localhost

    • timestamp:存储的是当前时间的秒数,也就是System.currentTimeMillis() / 1000或者Instant.now().getEpochSecond()
    • tokens:存储的是当前这秒钟的对应的可用的令牌数量

    Spring Cloud Gateway目前提供的限流还是相对比较简单的,在实际中我们的限流策略会有很多种情况,比如:

    • 每个接口的限流数量不同,可以通过配置中心动态调整
    • 超过的流量被拒绝后可以返回固定的格式给调用方
    • 对某个服务进行整体限流(这个大家可以思考下用Spring Cloud Gateway如何实现,其实很简单)

    当然我们也可以通过重新RedisRateLimiter来实现自己的限流策略,这个我们后面再进行介绍。

    限流源码

    // routeId也就是我们的fsh-house,id就是限流的key,也就是localhost。
    public Mono<Response> isAllowed(String routeId, String id) {
        // 会判断RedisRateLimiter是否初始化了
    	if (!this.initialized.get()) {
    		throw new IllegalStateException("RedisRateLimiter is not initialized");
    	}
        // 获取routeId对应的限流配置
        Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
    
        if (routeConfig == null) {
    		throw new IllegalArgumentException("No Configuration found for route " + routeId);
        }
    
        // 允许用户每秒做多少次请求
        int replenishRate = routeConfig.getReplenishRate();
    
        // 令牌桶的容量,允许在一秒钟内完成的最大请求数
        int burstCapacity = routeConfig.getBurstCapacity();
    
    	try {
            // 限流key的名称(request_rate_limiter.{localhost}.timestamp,request_rate_limiter.{localhost}.tokens)
    		List<String> keys = getKeys(id);
    
    
    		// The arguments to the LUA script. time() returns unixtime in seconds.
    		List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
    				Instant.now().getEpochSecond() + "", "1");
    		// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
            // 执行LUA脚本
    		Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
    				// .log("redisratelimiter", Level.FINER);
    		return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
    				.reduce(new ArrayList<Long>(), (longs, l) -> {
    					longs.addAll(l);
    					return longs;
    				}) .map(results -> {
    					boolean allowed = results.get(0) == 1L;
    					Long tokensLeft = results.get(1);
    
    					Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
    
    					if (log.isDebugEnabled()) {
    						log.debug("response: " + response);
    					}
    					return response;
    				});
    	}
    	catch (Exception e) {
    		log.error("Error determining if user allowed from redis", e);
    	}
    	return Mono.just(new Response(true, getHeaders(routeConfig, -1L)));
    }
    

    LUA脚本在:
    WX20180715-150447@2x.png

    local tokens_key = KEYS[1]
    local timestamp_key = KEYS[2]
    --redis.log(redis.LOG_WARNING, "tokens_key " .. tokens_key)
    
    local rate = tonumber(ARGV[1])
    local capacity = tonumber(ARGV[2])
    local now = tonumber(ARGV[3])
    local requested = tonumber(ARGV[4])
    
    local fill_time = capacity/rate
    local ttl = math.floor(fill_time*2)
    
    --redis.log(redis.LOG_WARNING, "rate " .. ARGV[1])
    --redis.log(redis.LOG_WARNING, "capacity " .. ARGV[2])
    --redis.log(redis.LOG_WARNING, "now " .. ARGV[3])
    --redis.log(redis.LOG_WARNING, "requested " .. ARGV[4])
    --redis.log(redis.LOG_WARNING, "filltime " .. fill_time)
    --redis.log(redis.LOG_WARNING, "ttl " .. ttl)
    
    local last_tokens = tonumber(redis.call("get", tokens_key))
    if last_tokens == nil then
      last_tokens = capacity
    end
    --redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)
    
    local last_refreshed = tonumber(redis.call("get", timestamp_key))
    if last_refreshed == nil then
      last_refreshed = 0
    end
    --redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)
    
    local delta = math.max(0, now-last_refreshed)
    local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
    local allowed = filled_tokens >= requested
    local new_tokens = filled_tokens
    local allowed_num = 0
    if allowed then
      new_tokens = filled_tokens - requested
      allowed_num = 1
    end
    
    --redis.log(redis.LOG_WARNING, "delta " .. delta)
    --redis.log(redis.LOG_WARNING, "filled_tokens " .. filled_tokens)
    --redis.log(redis.LOG_WARNING, "allowed_num " .. allowed_num)
    --redis.log(redis.LOG_WARNING, "new_tokens " .. new_tokens)
    
    redis.call("setex", tokens_key, ttl, new_tokens)
    redis.call("setex", timestamp_key, ttl, now)
    
    return { allowed_num, new_tokens }
    
    

    欢迎加入我的知识星球,一起交流技术,免费学习猿天地的课程(http://cxytiandi.com/course)

    PS:目前星球中正在星主的带领下组队学习Spring Cloud,等你哦!

    微信扫码加入猿天地知识星球

    猿天地

    展开全文
  • 本文主要讲了限流电阻和分压电阻的区别,下面一起来学习一下
  • 德力西XRNT1-12/200型变压器保护用高压限流熔断pdf,德力西XRNT1-12/200型变压器保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压12kV的系统,可与其他开关电器如负荷开关、真空接触配合使用,作为电力...
  • 德力西XRNT1-12/125型变压器保护用高压限流熔断pdf,德力西XRNT1-12/125型变压器保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压12kV的系统,可与其他开关电器如负荷开关、真空接触配合使用,作为电力...
  • 本文是源码分析 Sentinel 系列的第十三篇,已经非常详细的介绍了 Sentinel 的架构体系、滑动窗口、调用链上下文、限流、熔断的实现原理,相信各位读者朋友们对Sentinel有一个较为体系化的认知了,这个时候是该开始...

    本文是源码分析 Sentinel 系列的第十三篇,已经非常详细的介绍了 Sentinel 的架构体系、滑动窗口、调用链上下文、限流、熔断的实现原理,相信各位读者朋友们对Sentinel有一个较为体系化的认知了,这个时候是该开始如何在生产环境进行运用了。

    本文将以 Dubbo 服务调用为案例剖析场景,尝试对官方提供的 Dubbo 适配器做一个研究学习并对此做出自己的评价,抛出我的观点,期待与大家共同探讨,交流。

    一个 Dubbo RPC 的简易调用过程如下图所示:
    在这里插入图片描述
    消费者会维护一个服务提供者列表,然后再发一起一个服务调用的时候会首先根据负载均衡算法从中选择一个服务提供者,然后发起 RPC 调用,在请求真实发送之前会依次通过客户端设置的过滤器链(Filter),然后经过网络传输到到达服务提供者,并执行完服务提供者端的 Filter,最后进入到业务逻辑执行并返回结果。

    Sentinel 与 Dubbo 的整合就是利用了 Dubbo 的 Filter 机制,为 Dubbo 提供对应的 过滤器,无缝实现限流、熔断等功能,做到业务无感知,即业务代码无需使用 Sentinel 相关的 API。

    接下来请大家带着在 Dubbo 中如何使用限流、熔断方面来看官方给出的解决方案。

    思考题:在看接下来的内容之前,建议大家稍作停顿,思考一下,在服务调用模型中,限流、熔断通常在哪个端做比较合适。

    1、从消费端来看限流与熔断

    在这里插入图片描述从消费端的视角,虽然提供了服务端的负载均衡,但从客户端不管是向192.168.1.3还是向192.168.1.4发送RPC调用,都会经过同一个 Sentinel Dubbo Filter。这个看似简单明了,但这是我们接下来思考的最基本最核心的点。

    我们先来看看官方提供的 Dubbo 适配器的核心实现:
    SentinelDubboConsumerFilter#invoke
    在这里插入图片描述
    消费端这边使用到了两个资源名称,一个是接口级别,例如 com.demo.service.UserService,另外一是方法级别,例如 com.demo.servcie.UserServce#findUser(Ljava.lang.String)。
    定义了两个资源后,Sentinel 会使用滑动窗口机制,为上述两个资源收集实时的调用信息,为后续的限流、熔断提供数据依据。

    限流规则是依附于具体某一个项目的,例如如下图所示:
    在这里插入图片描述限流、熔断都是根据资源级别,如果需要对消费端的调用进行限流的话,就需要为这两个资源配置对应的限流规则,如果不配置则默认通过,表示不限流。

    1.1 服务调用端(消费方)是否需要配置限流规则

    在 dubbo 的服务调用场景中,在消费端设置限流的规则的话,这个调用链是针对整个集群所有服务提供者的,例如当前集群中包含3个服务提供者,每个服务提供者用于1000tps的服务能力,那消费端的限流,应该配置的限流TPS应该为3000tps,如果设置为1000tps,则无法完整的利用服务端的能力,基于这样的情况,通常消费端无需配置限流规则。

    那是不是说消费端就没必要配置限流规则呢?其实也不是,有如下这个场景,例如调用第三方外部的计费类服务接口,对方通常为特定的账户等级设置对应的TPS上限,如果超过该调用频率就会抛出错误,这种情况还是需要设置限流规则,确保消费端以不超过要求进行调用,避免业务异常。

    1.2 服务调用端(消费方)是否需要配置熔断

    引入熔断的目的是避免服务端单节点响应慢而导致这个服务不稳定,例如示例中有3个服务提供者,如果192.168.1.3的服务提供者由于触发了BUG导致响应时间大大增加,导致发往该服务提供者的请求大概率超时,在这样的情况下希望在接下来某段时间内消费方发往这这个服务提供者的请求快速熔断降级,返回错误,由客户端重试其他服务提供者。其实现效果如下:
    在这里插入图片描述
    当前的 Sentinel 默认是否能满足上述的需求呢?

    我们以 Sentinel 基于异常比例熔断策略来进行阐述,如果资源的调用异常比例超过一定值是会触发降级熔断,抛出 DegradeException 异常。

    由于总共只有三个服务提供者,其中发往192.168.1.3的请求大概率会由于超时失败,则异常比例会超过设置的熔断降级规则,会触发降级,造成的效果是整个服务调用都会发送熔断降级,即调用192.168.1.4,5两个请求都不会被熔断,造成整个服务调用不可用,与期望不一致。即还是会出现一个节点的不稳定而导致整个服务不稳定的情况。

    其造成的根本原因是因为其资源的定义并没有包含服务提供者的信息,改进的初步方案:

    1. 在过滤器中再定义一个资源,加上服务提供的IP与端口号,例如 SphU.entry(“com.d.s.UserService@ip:port”),对单个服务提供者进行单独收集调用信息,并且需要提供一可配置的项,用来标记该类型的资源在做熔断判断可使用某一个资源的配置,例如配置为 com.d.s.UserService,表示用这个配置规则来判断熔断。
    2. 在熔断规则判断的时候,该资源使用被引用资源的熔断规则进行判断。

    最后来解答一下,熔断规则通常只需要配置在调用方即可。

    2、从服务来看限流与熔断

    由于服务端看限流与熔断就比较简单,因为服务端与客户端存在一个非常大的区别是客户端存在负载均衡机制,一个消费端对于同一资源来说,会调用多个服务提供者,而服务提供者对于一个资源来就是其自身,故限流规则,熔断规则都是针对个体,其复杂度降低。

    为了知识体系的完备性,我们来看一下 Sentinel Dubbo 在服务端的适配器的实现。

    SentinelDubboProviderFilter#invoke
    在这里插入图片描述
    这里有二个关键点:

    1. 使用了 ContextUtil 的 entry 方法,定义本次调用的上下文环境名称为:resourceName,默认为接口名与方法名、参数列表,例如 com.d.s.UserServce#findUser(Ljava.lang.String),源头为消费端的应用名称。
    2. 定义两个资源,这里与消费端相同,就不做重复解读。

    关于这个 ContextUtil 的 entry 方法非常关键,因为 Sentinel 中数据节点的统计是以 ContextName 为维度的。

    例如想对一个应用所有的操作 redis 操作统一设置为一个资源名,redisOpsResource,即想控制该应用整体的 redis 操作 tps,其场景如下:
    在这里插入图片描述
    例如初衷设计为 opsReisTotal 的整个 tps 为 500,例如从UserService#findser链路的访问 redis tps 为 400,而从 Order#createOrder 链路访问 redis tps 为 400,此时 redis 的整体 tps 已经达到了 800 tps,但你会发现并不会触发限流,因为对资源 RredisOpResource 的调用信息统计是以 context name 为维度的,不同的 context name 互不影响,从而无法做到全局控制。

    3、总结

    本文结合 Sentinel 官方对于 Dubbo 提供的适配器并加以理解,提出了如下观点,欢迎大家留言探讨,相互交流,共同进步。

    1. 限流规则根据不同的使用场景可以在客户端、服务端配置。
    2. 熔断规则通常在服务调用方配置即可。
    3. Sentinel 目前的熔断还实现的比较简单,无法解决集群中因为某一个节点的访问慢而触发熔断,并使服务稳定的能力。
    4. Sentienl 的实时统计是以调用上下文(Context Name),即 ContextUtil.entry 方法定义的上下文为维度的,这点非常关键,不然容易踩坑。

    好了,本文就介绍到这里了,您的点赞是对我持续输出高质量文章最大的鼓励。

    欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:
    1、源码分析RocketMQ专栏(40篇+)
    2、源码分析Sentinel专栏(12篇+)
    3、源码分析Dubbo专栏(28篇+)
    4、源码分析Mybatis专栏
    5、源码分析Netty专栏(18篇+)
    6、源码分析JUC专栏
    7、源码分析Elasticjob专栏
    8、Elasticsearch专栏(20篇+)
    9、源码分析MyCat专栏

    展开全文
  • 德力西XRNM1-3.6/400型电动机保护用高压限流熔断pdf,德力西XRNM1-3.6/400型电动机保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压3.6KV系统.可与其他保护电器(如开关,真空断路)配合使用,作为高压电动...
  • 德力西XRNM1-7.2/224型电动机保护用高压限流熔断pdf,德力西XRNM1-7.2/224型电动机保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压7.2KV系统.可与其他保护电器(如开关,真空断路)配合使用,作为高压电动...
  • Spring Cloud Gateway 结合配置中心限流

    千次阅读 2018-08-20 09:37:52
    上篇文章我讲过复杂的限流场景可以通过扩展RedisRateLimiter来实现自己的限流策略。 假设你领导给你安排了一个任务,具体需求如下: - 针对具体的接口做限流 - 不同接口限流的力度可以不同 - 可以动态调整限流...

    前言

    上篇文章我讲过复杂的限流场景可以通过扩展RedisRateLimiter来实现自己的限流策略。

    假设你领导给你安排了一个任务,具体需求如下:

    • 针对具体的接口做限流
    • 不同接口限流的力度可以不同
    • 可以动态调整限流配置,实时生效

    如果你接到上面的任务,你会怎么去设计+实现呢?

    每个人看待问题的角度不同,自然思考出来的方案也不同,正所谓条条大路通罗马,能到达目的地的路那就是一条好路。

    如何分析需求

    下面我给出我的实现方式,仅供各位参考,大牛请忽略。

    具体问题具体分析,针对需求点,分别去做分析。

    需求一 “如何针对具体的接口做限流” 这个在上篇文章中也有讲过,只需要让KeyResolver返回的是接口的URI即可,这样限流的维度那就是对这个接口进行限流。

    需求二 “不同接口限流的力度可以不同” 这个通过配置的方式明显实现不了,配置中的replenishRate和burstCapacity都是配置死的,如果要做成动态的那么必须的自己通过扩展RedisRateLimiter来实现。

    前提是必须有一个配置列表,这个配置列表就是每个接口对应的限流数值。有了这个配置我们就可以通过请求的接口获取这个接口对应的限流值。

    需求三“可以动态调整限流配置,实时生效” 这个的话也比较容易,无论你是存文件,存数据库,存缓存只要每次都去读取,必然是实时生效的,但是性能问题我们不得不考虑啊。

    存文件,读取文件,耗IO,主要是不方便修改
    存数据库,可以通过web界面去修改,也可以直接改数据库,每次都要查询,性能不行
    存分布式缓存(redis),性能比数据库有提高

    对比下来肯定是缓存是最优的方案,还有更好的方案吗?
    有,结合配置中心来做,我这边用自己的配置中心(https://github.com/yinjihuan/smconf)来讲解,换成其他的配置中心也是一样的思路。

    配置中心的优点在于它本来就是用来存储配置的,配置在项目启动时加载完毕,当有修改时推送更新,每次读取都在本地对象中,性能好。

    具体方案有了之后我们就可以开始撸代码了,但是你有想过这么多接口的限流值怎么初始化吗?手动一个个去加?

    不同的服务维护的小组不同,当然也有可能是一个小组维护,从设计者的角度来思考,应该把设置的权利交给用户,交给我们的接口开发者,每个接口能够承受多少并发让用户来定,你的职责就是在网关进行限流。当然在公司中具体的限制量也不一定会由开发人员来定哈,这个得根据压测结果,做最好的调整。

    话不多说-开始撸码

    首先我们定义自己的RedisRateLimiter,复制源码稍微改造下即可, 这边只贴核心代码。

    public class CustomRedisRateLimiter extends AbstractRateLimiter<CustomRedisRateLimiter.Config>
    		implements ApplicationContextAware {
    
    	public static final String CONFIGURATION_PROPERTY_NAME = "custom-redis-rate-limiter";
    	public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
    	public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
    	public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
    	public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";
    
    	public CustomRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate, RedisScript<List<Long>> script,
    			Validator validator) {
    		super(Config.class, CONFIGURATION_PROPERTY_NAME, validator);
    		this.redisTemplate = redisTemplate;
    		this.script = script;
    		initialized.compareAndSet(false, true);
    	}
    
    	public CustomRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity) {
    		super(Config.class, CONFIGURATION_PROPERTY_NAME, null);
    		this.defaultConfig = new Config().setReplenishRate(defaultReplenishRate).setBurstCapacity(defaultBurstCapacity);
    	}
    
    	// 限流配置
    	private RateLimitConf rateLimitConf;
    
    	@Override
    	@SuppressWarnings("unchecked")
    	public void setApplicationContext(ApplicationContext context) throws BeansException {
    		// 加载配置
    		this.rateLimitConf = context.getBean(RateLimitConf.class);	
    	}
    
    
    	/**
    	 * This uses a basic token bucket algorithm and relies on the fact that
    	 * Redis scripts execute atomically. No other operations can run between
    	 * fetching the count and writing the new count.
    	 */
    	@Override
    	@SuppressWarnings("unchecked")
    	public Mono<Response> isAllowed(String routeId, String id) {
    		if (!this.initialized.get()) {
    			throw new IllegalStateException("RedisRateLimiter is not initialized");
    		}
    
    		//Config routeConfig = getConfig().getOrDefault(routeId, defaultConfig);
    		
    		if (rateLimitConf == null) {
    			throw new IllegalArgumentException("No Configuration found for route " + routeId);
    		}
    		Map<String,Integer> routeConfig = rateLimitConf.getLimitMap();
    
    		// Key的格式:服务名称.接口URI.类型
    		String replenishRateKey = routeId + "." + id + ".replenishRate";
    		int replenishRate = routeConfig.get(replenishRateKey) == null ? routeConfig.get("default.replenishRate") : routeConfig.get(replenishRateKey);
    		
    		String burstCapacityKey = routeId + "." + id + ".burstCapacity";
    		int burstCapacity = routeConfig.get(burstCapacityKey) == null ? routeConfig.get("default.burstCapacity") : routeConfig.get(burstCapacityKey);
    			
    		try {
    			List<String> keys = getKeys(id);
    
    			// The arguments to the LUA script. time() returns unixtime in
    			// seconds.
    			List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
    					Instant.now().getEpochSecond() + "", "1");
    			// allowed, tokens_left = redis.eval(SCRIPT, keys, args)
    			Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
    			// .log("redisratelimiter", Level.FINER);
    			return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
    					.reduce(new ArrayList<Long>(), (longs, l) -> {
    						longs.addAll(l);
    						return longs;
    					}).map(results -> {
    						boolean allowed = results.get(0) == 1L;
    						Long tokensLeft = results.get(1);
    
    						Response response = new Response(allowed, getHeaders(replenishRate, burstCapacity, tokensLeft));
    
    						if (log.isDebugEnabled()) {
    							log.debug("response: " + response);
    						}
    						return response;
    					});
    		} catch (Exception e) {
    			/*
    			 * We don't want a hard dependency on Redis to allow traffic. Make
    			 * sure to set an alert so you know if this is happening too much.
    			 * Stripe's observed failure rate is 0.01%.
    			 */
    			log.error("Error determining if user allowed from redis", e);
    		}
    		return Mono.just(new Response(true, getHeaders(replenishRate, burstCapacity, -1L)));
    	}
    
    	public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity, Long tokensLeft) {
    		HashMap<String, String> headers = new HashMap<>();
    		headers.put(this.remainingHeader, tokensLeft.toString());
    		headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
    		headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
    		return headers;
    	}
    
    }
    

    需要在setApplicationContext中加载我们的配置类,配置类的定义如下:

    @CxytianDiConf(system="fangjia-gateway")
    public class RateLimitConf {
    	// 限流配置
    	@ConfField(value = "limitMap")
    	private Map<String, Integer> limitMap = new HashMap<String, Integer>(){{
    		put("default.replenishRate", 100);
    		put("default.burstCapacity", 1000);
    	}};
    	public void setLimitMap(Map<String, Integer> limitMap) {
    		this.limitMap = limitMap;
    	}
    	public Map<String, Integer> getLimitMap() {
    		return limitMap;
    	}
    }
    
    

    所有的接口对应的限流信息都在map中,有默认值,如果没有对应的配置就用默认的值对接口进行限流。

    isAllowed方法中通过‘服务名称.接口URI.类型’组成一个Key, 通过这个Key去Map中获取对应的值。

    类型的作用主要是用来区分replenishRate和burstCapacity两个值。

    接下来就是配置CustomRedisRateLimiter:

    
    @Bean
    @Primary
    public CustomRedisRateLimiter customRedisRateLimiter(
                      ReactiveRedisTemplate<String, String> redisTemplate,	 
                      @Qualifier(CustomRedisRateLimiter.REDIS_SCRIPT_NAME)  RedisScript<List<Long>> redisScript,
    		          Validator validator) {
    	return new CustomRedisRateLimiter(redisTemplate, redisScript, validator);
    }
    

    网关这边的逻辑已经实现好了,接下来就是需要在具体的服务中自定义注解,然后将限流的参数初始化到我们的配置中心就可以了。

    定义注解

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    public @interface ApiRateLimit {
    	
    	/**
    	 * 速率
    	 * @return
    	 */
    	int replenishRate() default 100;
    	
    	/**
    	 * 容积
    	 * @return
    	 */
    	int burstCapacity() default 1000;
    	
    }
    
    

    启动监听器,读取注解,初始化配置

    /**
     * 初始化API网关需要进行并发限制的API
     * @author yinjihuan
     *
     */
    public class InitGatewayApiLimitRateListener implements ApplicationListener<ApplicationReadyEvent> {
    
    	// Controller包路径
    	private String controllerPath;
    
    	private RateLimitConf rateLimitConf;
    	
    	private ConfInit confInit;
    	
    	private String applicationName;
    	
    	public InitGatewayApiLimitRateListener(String controllerPath) {
    		this.controllerPath = controllerPath;
    	}
    
    	@Override
    	public void onApplicationEvent(ApplicationReadyEvent event) {
    		this.rateLimitConf = event.getApplicationContext().getBean(RateLimitConf.class);
    		this.confInit = event.getApplicationContext().getBean(ConfInit.class);
    		this.applicationName = event.getApplicationContext().getEnvironment().getProperty("spring.application.name");
    		try {
    			initLimitRateAPI();
    		} catch (Exception e) {
    			throw new RuntimeException("初始化需要进行并发限制的API异常", e);
    		}
    	}
    	
    	/**
    	 * 初始化需要进行并发限制的API
    	 * @throws IOException
    	 * @throws ClassNotFoundException
    	 */
    	private void initLimitRateAPI() throws IOException, ClassNotFoundException {
    		Map<String, Integer> limitMap = rateLimitConf.getLimitMap();
    		ClasspathPackageScannerUtils scan = new ClasspathPackageScannerUtils(this.controllerPath);
    		List<String> classList = scan.getFullyQualifiedClassNameList();
    		for (String clazz : classList) {
    			Class<?> clz = Class.forName(clazz);
    			if (!clz.isAnnotationPresent(RestController.class)) {
    				continue;
    			}
    			Method[] methods = clz.getDeclaredMethods();
    			for (Method method : methods) {
    				if (method.isAnnotationPresent(ApiRateLimit.class)) {
    					ApiRateLimit apiRateLimit = method.getAnnotation(ApiRateLimit.class);
    					String replenishRateKey = applicationName + "." + getApiUri(clz, method) + ".replenishRate";
    					String burstCapacityKey = applicationName + "." + getApiUri(clz, method) + ".burstCapacity";
    					limitMap.put(replenishRateKey, apiRateLimit.replenishRate());
    					limitMap.put(burstCapacityKey, apiRateLimit.burstCapacity());
    				}
    			}
    		}
    		rateLimitConf.setLimitMap(limitMap);
    		// 初始化值到配置中心
    		confInit.init(rateLimitConf);
    	}
    
    	 private String getApiUri(Class<?> clz, Method method) {
    	        StringBuilder uri = new StringBuilder();
    	        uri.append(clz.getAnnotation(RequestMapping.class).value()[0]);
    	        if (method.isAnnotationPresent(GetMapping.class)) {
    	            uri.append(method.getAnnotation(GetMapping.class).value()[0]);
    	        } else if (method.isAnnotationPresent(PostMapping.class)) {
    	            uri.append(method.getAnnotation(PostMapping.class).value()[0]);
    	        } else if (method.isAnnotationPresent(RequestMapping.class)) {
    	            uri.append(method.getAnnotation(RequestMapping.class).value()[0]);
    	        }
    	        return uri.toString();
    	 }
    }
    

    配置监听器

    SpringApplication application = new SpringApplication(FshHouseServiceApplication.class);
    application.addListeners(new InitGatewayApiLimitRateListener("com.fangjia.fsh.house.controller"));
    context = application.run(args);
    

    最后使用就很简单了,只需要增加注解就可以了

    @ApiRateLimit(replenishRate=10, burstCapacity=100)
    @GetMapping("/data")
    public HouseInfo getData(@RequestParam("name") String name) {
    	return new HouseInfo(1L, "上海", "虹口", "东体小区");
    }
    

    我这边只是给大家提供一种去实现的思路,也许大家还有更好的方案。

    我觉得只要不让每个开发都去关心这种非业务性质的功能,那就可以了,都在框架层面处理掉。当然实现原理可以跟大家分享下,会用很好,既会用又了解原理那就更好了。

    新书购买:单本75折包邮

    WechatIMG48.jpeg

    欢迎加入我的知识星球,一起交流技术,免费学习猿天地的课程(http://cxytiandi.com/course)

    PS:目前星球中正在星主的带领下组队学习Spring Cloud,等你哦!

    微信扫码加入猿天地知识星球

    猿天地

    展开全文
  • 德力西XRNT1-12M/125型变压器保护用高压限流熔断pdf,德力西XRNT1-12M/125型变压器保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压12kV的系统,可与其他开关电器如负荷开关、真空接触配合使用,作为...
  • 德力西XRNT1-12M/50 型变压器保护用高压限流熔断pdf,德力西XRNT1-12M/50型变压器保护用高压限流熔断:本产品适用于户内交流50Hz,额定电压12kV的系统,可与其他开关电器如负荷开关、真空接触配合使用,作为...
  • Sentinel 触发限流的实现类为 FlowSlot。我们再来简单思考一下,要实现触发限流,至少需要完成如下几件事情: 收集实时调用信息。 设置触发限流规则 根据限流规则与调用信息来决定是否对请求进行限流等。 如何收集...

    Sentinel 触发限流的实现类为 FlowSlot。我们再来简单思考一下,要实现触发限流,至少需要完成如下几件事情:

    • 收集实时调用信息。
    • 设置触发限流规则
    • 根据限流规则与调用信息来决定是否对请求进行限流等。

    如何收集实时调用信息在前面的文章中已详细介绍,请带着上述问题开始本节的探讨。

    1、初始 FlowSlot

    我们先从 FlotSlot 类的注释来简单认识一下流量控制相关的内容。

    • 根据已(NodeSelectorSlot、ClusterNodeBuilderSlot 和 StatisticSlot)收集的运行时统计信息,FlowSlot将使用预先设置的规则来决定是否应阻止传入请求。
    • SphU.entry(resourceName)调用时,如果有任意一条规则被触发则会抛出 FlowException 异常,应用程序可捕捉该异常对业务进行定制化处理。
    • 每一条流控规则(FlowRule)都包含三个要素:流控类别、基于调用链的流控制策略、限流后的处理行为(参考FlowRule相关的注释)。
      • grade 流量控制的阈值类型
        可选值:QPS(基于QPS限流策略)、并发线程数。
      • strategy 基于调用链的流控制策略
        可选值:STRATEGY_DIRECT(根据调用方限流策略)、STRATEGY_RELATE(关联流量限
        流策略)、STRATEGY_CHAIN(根据调用链入口限流策略)
      • controlBehavior 流量控制后的采取的行为
        CONTROL_BEHAVIOR_DEFAULT(直接拒绝)、CONTROL_BEHAVIOR_WARM_UP(预热)、CONTROL_BEHAVIOR_RATE_LIMITER(匀速排队)、
        CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER(预热与匀速排队)。

    2、FlowSlot 详解

    2.1 FlowSlot 类图

    在这里插入图片描述
    FlowSlot 的类图非常简单,内部持有一个成员变量,FlowRuleChecker,用来判断是否满足流控触发条件。

    在继续探讨 Sentinel 限流之前,我们先来了解一下 FlowRule,即认识一下 Sentienl 流控规则主要包含哪些配置项,为后续的流程做一个消息的准备。

    2.2 FlowRule 配置项

    FlowRule 的类体系如图所示:
    在这里插入图片描述
    其属性的含义如下:

    • String resource
      资源的名称。
    • String limitApp
      需要限制的调用来源,对应【新增流控规则界面】的针对来源。
    • int grade
      流量控制的阈值类型,目前支持 QPS 与 并发线程数,对应 【新增流控规则界面】的阔值类型。
    • int strategy
      基于调用链的流量控制策略,对应【新增流控规则界面】的流控模式,其可选取值在本文开头部分有详细介绍。
    • String refResource
      关联资源或入口资源,当流控模式为关联或链路时配置的关联资源或入口资源,对应【新增流控规则界面】的【入口资源】
    • int controlBehavior
      流量控制后的采取的行为,其可选取值在本文开头部分有详细介绍,对应【新增流控规则界面】的流控效果。
    • int warmUpPeriodSec
      预热时间,如果 controlBehavior 设置为预热(warm up)时,可以配置其预热时间,在【新增流控规则界面】中选择 warm up 类型后,会增加一行,供用户配置,默认值 10s。
    • int maxQueueingTimeMs
      最大超时时间,如果 controlBehavior 设置为排队等待时,等待的最大超时时间,默认为500ms。
    • boolean clusterMode
      是否是集群限流模式,对应【新增流控规则界面】的是否集群。
    • ClusterFlowConfig clusterConfig
      集群扩容相关配置,集群限流将在后续文章中重点介绍。

    在 sentinel-dashboard 的配置界面如下图所示:
    在这里插入图片描述

    2.3 FlowSlot#entry 流程详解

    FlowSlot#entry

    public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                          boolean prioritized, Object... args) throws Throwable {   // @1
        checkFlow(resourceWrapper, context, node, count, prioritized);       // @2
        fireEntry(context, resourceWrapper, node, count, prioritized, args);  // @3
    }
    

    代码@1:首先来解释一下该方法的参数:

    • Context context
      当前 Sentinel 调用的上下文。
    • ResourceWrapper resourceWrapper
      当前访问的资源。
    • DefaultNode node
      当前上下文环境对应的节点。
    • int count
      本次调用需要消耗的“令牌”个数
    • boolean prioritized
      是否是高优先级。
    • Object… args
      额外参数。

    代码@2:调用 checkFlow ,根据配置的限流规则,结合实时统计信息,判断是否满足流控条件,如果满足,则触发流控,稍后会详细探讨该方法的实现原理。

    代码@3:调用 fireEntry 继续沿着 slot 链进行传播。

    FlowSlot 的 checkFlow 方法在内部就是直接调用 FlowRuleChecker 的 checkFlow 方法,故我们将目光放到 FlowRuleChecker 中。

    2.4 FlowRuleChecker checkFlow 方法详解

    FlowRuleChecker#checkFlow

    public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                              Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
        if (ruleProvider == null || resource == null) { 
            return;
        }
        Collection<FlowRule> rules = ruleProvider.apply(resource.getName());   // @1
        if (rules != null) {
            for (FlowRule rule : rules) {
                if (!canPassCheck(rule, context, node, count, prioritized)) {            // @2
                    throw new FlowException(rule.getLimitApp(), rule);
                }
            }
        }
    }
    

    代码@1:通过限流规则提供器获取与该资源相关的流控规则列表。

    代码@2:然后遍历流控规则列表,通过调用 canPassCheck 方法来判断是否满足该规则设置的条件,如果满足流控规则,则抛出 FlowException,即只需要满足一个即结束校验。

    接下来继续查看 canPassCheck 方法。

    2.4.1 FlowRuleChecker canPassCheck 详解
    public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, 
    				int acquireCount, boolean prioritized) {
        String limitApp = rule.getLimitApp(); 
        if (limitApp == null) {    // @1
            return true;
        }
        if (rule.isClusterMode()) {  // @2
            return passClusterCheck(rule, context, node, acquireCount, prioritized);  
        }
        return passLocalCheck(rule, context, node, acquireCount, prioritized);     
    }
    

    代码@1:如果限流规则没有配置针对来源,则直接默认通过,该值在配置时,默认为 default,即对所有调用发起方都生效。

    代码@2:如果是集群限流模式,则调用 passClusterCheck,非集群限流模式则调用 passLocalCheck 方法,本文重点讲述单节点限流,集群限流模式将在后续文章中详细探讨。

    FlowRuleChecker#passLocalCheck

    private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
            boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);    // @1
        if (selectedNode == null) {
            return true;
        }
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);    // @2
    }
    

    代码@1:首先根据流控模式(strategy)选择一个合适的 Node,看到这,大家可以思考一下,这一步骤的目的,如果为空,则直接返回 true,表示放行。

    代码@2:调用 FlowRule 内部持有的流量控制器来判断是否符合流控规则,最终调用的是 TrafficShapingController canPass 方法。

    那我们接下来分别对上述两个方法进行详细展开。

    2.4.1.1 selectNodeByRequesterAndStrategy
    FlowRuleChecker#selectNodeByRequesterAndStrategy
    static Node selectNodeByRequesterAndStrategy(FlowRule rule, Context context, DefaultNode node) {
        String limitApp = rule.getLimitApp();
        int strategy = rule.getStrategy();
        String origin = context.getOrigin();   // @1
        if (limitApp.equals(origin) && filterOrigin(origin)) {    // @2
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Matches limit origin, return origin statistic node.
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {  // @3
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                // Return the cluster node.
                return node.getClusterNode();
            }
            return selectReferenceNode(rule, context, node);
        } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
                && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {    // @4
            if (strategy == RuleConstant.STRATEGY_DIRECT) {
                return context.getOriginNode();
            }
            return selectReferenceNode(rule, context, node);
        }
        return null;
    }
    

    在介绍该方法之前,先回答上文提到一个问题,我们知道,要判断是否满足了限流规则所配置的条件,一个重要的点就是要拿到当前的实时统计信息,通过上面介绍限流规则时提到 Sentinel 目前支持3种流控模式(直接、关联、链路),针对模式的不同,选择的实时统计数据的逻辑就应该不同,即该方法主要是根据流控策略找到对应的实时统计信息(Node)。

    代码@1:首先先介绍几个局部变量的含义:

    • String limitApp
      该条限流规则针对的调用方。
    • int strategy
      该条限流规则的流控策略。
    • String origin
      本次请求的调用方,从当前上下文环境中获取,例如 dubbo 服务提供者,原始调用方为 dubbo 服务提供者的 application。

    代码@2:如果限流规则配置的针对的调用方与当前请求实际调用来源匹配(并且不是 default、other)时的处理逻辑,其实现的要点:

    • 如果流控模式为 RuleConstant.STRATEGY_DIRECT(直接),则从 context 中获取源调用方所代表的 Node。
    • 如果流控模式为 RuleConstant.STRATEGY_RELATE(关联),则从集群环境中获取对应关联资源所代表的 Node,通过(ClusterBuilderSlot会收集每一个资源的实时统计信息,子集群限流时详细介绍)
    • 如果流控模式为 RuleConstant.STRATEGY_CHAIN(调用链),则判断当前调用上下文的入口资源与规则配置的是否一样,如果是,则返回入口资源对应的 Node,否则返回 null,注意:返回空则该条流控规则直接通过。【这部分代码,对应代码中的 selectReferenceNode 方法】

    代码@3:如果流控规则针对的调用方(limitApp) 配置的为 default,表示对所有的调用源都生效,其获取实时统计节点(Node)的处理逻辑为:

    • 如果流控模式为 RuleConstant.STRATEGY_DIRECT,则直接获取本次调用上下文环境对应的节点的ClusterNode。
    • 如果是其他流控模式,与代码@2的获取逻辑一样,都是调用 selectReferenceNode 进行获取。

    代码@4:如果流控规则针对的调用方为(other),此时需要判断是否有针对当前的流控规则,只要存在,则这条规则对当前资源“失效”,如果针对该资源没有配置其他额外的流控规则,则获取实时统计节点(Node)的处理逻辑为:

    • 如果流控模式为 RuleConstant.STRATEGY_DIRECT(直接),则从 context 中获取源调用方所代表的 Node。
    • 如果是其他流控模式,与代码@2的获取逻辑一样,都是调用 selectReferenceNode 进行获取。

    从这里可以看出,流控规则针对调用方如果设置为 other,表示针对没有配置流控规则的资源。

    根据流控策略选择合适的 Node 的逻辑就介绍到这里,如果没有选择到合适的 Node,则针对该流控规则,默认放行。

    2.4.1.2 TrafficShapingController canPass

    经过上一个步骤获取到对应的实时统计数据,接下来就是根据数据与流控规则,是否匹配。Sentinel 中用于实现流控规则的匹配其类体系如图所示:
    在这里插入图片描述
    由于篇幅的关系,本节只会以 DefaultController 来介绍其实现原理,对应【流控模式:快速失败】,由于篇幅的关系,其他两种流控模式将在下文详细探讨。
    DefaultController#canPass

    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);     // @1
        if (curCount + acquireCount > count) {   // @2
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {   // @3
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);   // @4
                if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {             // @5
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs);                                                                                  // @6
                    // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);   // @7
                }
            }
            return false;     // @8
        }
        return true;       // @9
    }
    

    代码@1:首先先解释一下两个局部变量的含义:

    • int curCount
      当前已消耗的令牌数量,即当前时间窗口内已创建的线程数量(FLOW_GRADE_THREAD) 或已通过的请求个数(FLOW_GRADE_QPS)。
    • double count
      流控规则中配置的阔值(即一个时间窗口中总的令牌个数)

    代码@2:如果当前请求的令牌数加上已消耗的令牌数之和小于总令牌数,则直接返回true,表示通过,见代码@9;如果当前时间窗口剩余令牌数小于需要申请的令牌数,则需要根据是否有优先级进行不同的处理。

    • 如果该请求存在优先级,即 prioritized 设置为 true,并且流控类型为基于QPS进行限流,则进入相关的处理逻辑,见代码@3~@8。
    • 否则直接返回 false,最终会直接抛出 FlowException,即快速失败,应用方可以捕捉该异常,对其业务进行容错处理。

    代码@4:尝试抢占下一个滑动窗口的令牌,并返回该时间窗口所剩余的时间,如果获取失败,则返回 OccupyTimeoutProperty.getOccupyTimeout() 值,该返回值的作用就是当前申请资源的线程将 sleep(阻塞)的时间。

    代码@5:如果 waitInMs 小于抢占的最大超时时间,则在下一个时间窗口中增加对应令牌数,并且线程将sleep,见代码@6。

    代码@7:这里不是很明白为什么等待 waitMs 之后,还需要抛出 PriorityWaitException,那这个prioritized 机制、可抢占下一个时间窗口的令牌有什么意义呢?应该是一个BUG吧。

    3、总结

    整个 FlowSlot 限流规则就介绍到这里了,为了更加直观的认识其限流的流程,下面给出一张流程图来对上面的源码分析进行一个总结。
    在这里插入图片描述

    该篇注重理论与实践相结合,在进行源码解读之前先从流控规则配置界面入手,代入感比较强,文章再提供一张流程图。

    整个限流部分目前还有所欠缺的两个部分:
    1、流程规则的存储与加载。
    2、其他几种流控后行为(预热、匀速排队等实现原理)

    该部分内容将在后续文章中详细介绍,本文疑似发现一个BUG,也请大家一起交流、探讨。
    在分析 DefaultController canPass 方法时,prioritized 为 true 时,执行 sleep 方法唤醒后不管三七二十一,直接抛出 PriorityWaitException 这是要起到一个什么作用呢?

    点赞是一种美德,麻烦帮忙点个赞,谢谢


    欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录:
    1、源码分析RocketMQ专栏(40篇+)
    2、源码分析Sentinel专栏(12篇+)
    3、源码分析Dubbo专栏(28篇+)
    4、源码分析Mybatis专栏
    5、源码分析Netty专栏(18篇+)
    6、源码分析JUC专栏
    7、源码分析Elasticjob专栏
    8、Elasticsearch专栏(20篇+)
    9、源码分析MyCat专栏

    展开全文
  • 25、zuul之多维度限流

    千次阅读 2019-03-12 09:46:10
    对请求的目标URL进行限流(例如:某个URL每分钟只允许调用多少次) 对客户端的访问IP进行限流(例如:某个IP每分钟只允许请求多少次) 对某些特定用户或者用户组进行限流(例如:非VIP用户限制每分钟只...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 43,404
精华内容 17,361
关键字:

交流限流器