-
2019-11-18 19:45:16
本文实现了一种基本的令牌桶算法
令牌桶算法思想:以固定速率产生令牌,放入令牌桶,每次用户请求都得申请令牌,令牌不足则拒绝请求或等待。
代码逻辑:线程池每0.5s发送随机数量的请求,每次请求计算当前的令牌数量,请求令牌数量超出当前令牌数量,则产生限流。
@Slf4j public class TokensLimiter { private ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(5); // 最后一次令牌发放时间 public long timeStamp = System.currentTimeMillis(); // 桶的容量 public int capacity = 10; // 令牌生成速度10/s public int rate = 10; // 当前令牌数量 public int tokens; public void acquire() { scheduledExecutorService.scheduleWithFixedDelay(() -> { long now = System.currentTimeMillis(); // 当前令牌数 tokens = Math.min(capacity, (int) (tokens + (now - timeStamp) * rate / 1000)); //每隔0.5秒发送随机数量的请求 int permits = (int) (Math.random() * 9) + 1; log.info("请求令牌数:" + permits + ",当前令牌数:" + tokens); timeStamp = now; if (tokens < permits) { // 若不到令牌,则拒绝 log.info("限流了"); } else { // 还有令牌,领取令牌 tokens -= permits; log.info("剩余令牌=" + tokens); } }, 1000, 500, TimeUnit.MILLISECONDS); } public static void main(String[] args) { TokensLimiter tokensLimiter = new TokensLimiter(); tokensLimiter.acquire(); } }
输出结果:
16:13:20.042 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:1,当前令牌数:10
16:13:20.045 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=9
16:13:20.549 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:7,当前令牌数:10
16:13:20.549 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=3
16:13:21.054 [pool-1-thread-2] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:5,当前令牌数:8
16:13:21.054 [pool-1-thread-2] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=3
16:13:21.559 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:1,当前令牌数:8
16:13:21.559 [pool-1-thread-1] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=7
16:13:22.063 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:7,当前令牌数:10
16:13:22.063 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=3
16:13:22.568 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:7,当前令牌数:8
16:13:22.568 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 剩余令牌=1
16:13:23.072 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 请求令牌数:7,当前令牌数:6
16:13:23.072 [pool-1-thread-3] INFO com.example.demo.limit.TokensLimiter - 限流了更多相关内容 -
java实现令牌桶限流
2020-09-11 08:59:33限流是对某一时间窗口内的请求数进行限制,保持...常用的限流算法有令牌桶和和漏桶,而Google开源项目Guava中的RateLimiter使用的就是令牌桶控制算法。 在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流 -
基于令牌桶算法的Java限流实现
2021-02-12 09:50:40项目需要使用限流措施,查阅后主要使用令牌桶算法实现,为了更灵活的实现限流,就自己实现了一个简单的基于令牌桶算法的限流实现。令牌桶算法描述令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送...项目需要使用限流措施,查阅后主要使用令牌桶算法实现,为了更灵活的实现限流,就自己实现了一个简单的基于令牌桶算法的限流实现。
令牌桶算法描述
令牌桶这种控制机制基于令牌桶中是否存在令牌来指示什么时候可以发送流量。令牌桶中的每一个令牌都代表一个字节。如果令牌桶中存在令牌,则允许发送流量;而如果令牌桶中不存在令牌,则不允许发送流量。因此,如果突发门限被合理地配置并且令牌桶中有足够的令牌,那么流量就可以以峰值速率发送。
令牌桶算法图描述
简单的说就是,一边请求时会消耗桶内的令牌,另一边会以固定速率往桶内放令牌。当消耗的请求大于放入的速率时,进行相应的措施,比如等待,或者拒绝等。
Java的简单实现
为了更灵活的定制限流措施,自己实现了限流的部分代码,如下:
/**
* @author xiezhengchao
* @since 18/1/3 上午9:45.
* 限流器
*/
public class RateLimiter{
private volatile int token;
private final int originToken;
private static Unsafe unsafe = null;
private static final long valueOffset;
private final Object lock = new Object();
static {
try {
// 应用开发中使用unsafe对象必须通过反射获取
Class> clazz = Unsafe.class;
Field f = clazz.getDeclaredField("theUnsafe");
f.setAccessible(true);
unsafe = (Unsafe) f.get(clazz);
valueOffset = unsafe.objectFieldOffset(RateLimiter.class.getDeclaredField("token"));
} catch (Exception ex) {throw new Error(ex);}
}
public RateLimiter(int token){
this.originToken = token;
this.token = token;
}
/**
* 获取一个令牌
*/
public boolean acquire(){
int current = token;
if(current<=0){
// 保证在token已经用光的情况下依然有获取竞争的能力
current = originToken;
}
long expect = 1000;// max wait 1s
long future = System.currentTimeMillis()+expect;
while(current>0){
if(compareAndSet(current, current-1)){
return true;
}
current = token;
if(current<=0 && expect>0){
// 在有效期内等待通知
synchronized (lock){
try {
lock.wait(expect);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
current = token;
if(current<=0){
current = originToken;
}
expect = future - System.currentTimeMillis();
}
}
return false;
}
private boolean compareAndSet(int expect, int update) {
return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
}
/**
* 补充令牌
*/
public void supplement(final ExecutorService executorService){
this.token = originToken;
executorService.execute(() -> {
synchronized (lock){
lock.notifyAll();
}
});
}
}
核心代码在acquire方法部分主要思路就是,优先采用CAS进行自旋操作获取令牌,一直尝试令牌消耗光,那么会进入等待,在定时线程调用supplement方法时,会唤醒所有等待线程,此时进入CAS进行尝试消耗令牌,以此循环一直到设置的最大等待时间(代码中的expect)消耗光,如果还没获得令牌,那么会返回false
这段代码如果自己起一个线程进行限流,然后自己开个定时线程进行补充也可以,但是实际运用中往往需要一个限流管理器来分配限流器,然后通过限流管理器统一的进行定时触发,这样可以不用开很多的定时线程,同时通过线程池也避免了在定时线程竞争锁时引发的过长等待造成定时线程不准的情况。
下面贴出限流管理器部分代码
/**
* @author xiezhengchao
* @since 18/1/3 上午9:43.
* 限流管理器
*/
@Component
public class ConfineManager{
// 定时线程
private final ScheduledThreadPoolExecutor scheduledCheck = new ScheduledThreadPoolExecutor(2);
// 执行补充线程池
private final ExecutorService executorService = new ThreadPoolExecutor(5, 200,
60L, TimeUnit.SECONDS, new SynchronousQueue<>(),
new NamedThreadFactory("supplement",true,false));
// 限流器容器
private Map rateLimiterMap = new ConcurrentHashMap<>();
@PostConstruct
public void init(){
scheduledCheck.scheduleAtFixedRate(new SupplementRateLimiter(), 1, 1, TimeUnit.SECONDS);
}
@PreDestroy
public void destroy(){
scheduledCheck.shutdown();
}
/**
* 通过key获取相应的限流器
*/
public void acquire(String key,int tokenCount){
RateLimiter rateLimiter = rateLimiterMap.get(key);
// 双检锁确保安全创建
if(rateLimiter==null){
synchronized (this){
// init RateLimiter
rateLimiter = rateLimiterMap.computeIfAbsent(key, k -> new RateLimiter(tokenCount));
}
}
// 尝试获取令牌
if(!rateLimiter.acquire()){
// 获取失败,根据实际情况进行处理,这里直接抛异常了
Assert.throwBizException(ErrorCode.API_CONFINE_RATE_LIMITER);
}
}
/**
* 补充相应的令牌数
*/
private class SupplementRateLimiter implements Runnable{
@Override
public void run(){
rateLimiterMap.values().forEach(rateLimiter -> rateLimiter.supplement(executorService));
}
}
}
代码中主要是创建了定时线程,补充令牌线程池。
部分代码不是开源的,需要调整下,不影响主流程,代码使用了一些spring的注解。
其中的不足
在集群的环境下,没有考虑分布式的情况,也就是如果一个应用部署的限流是1s产生10个令牌,假设部署了5个应用,那么实际1s可以产生50个令牌。如果需要考虑这部分,那么在CAS操作可以替换为通过redis的setnx来进行获取锁操作然后更新redis存储对应的令牌,补充则直接设置更新redis对应的令牌数即可,这样效率肯定比现在基于CAS操作低。
总结
实际上实现这个限流器更多的考虑是可以自行定义等待的最大时间,超时措施,定时补充令牌时间间隔等,同时也温习了一下之前的并发知识。
-
限流算法-令牌桶、漏桶算法之java实现
2022-02-28 17:02:46限流算法-令牌桶、漏桶算法之java实现介绍
令牌桶算法:
一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌;如下:
a.假设是2r/s,则每500毫秒添加n个令牌;
b.桶中最多存放x个令牌,桶满时,再添加会拒绝;
c.请求去获取令牌,如果令牌足够,允许通过;如果令牌不足,拒绝请求或放入缓冲区等待;漏桶算法:
漏桶容量固定,按照固定速率流出水滴;如果桶是空的,不用流,如果桶满了,再流入水滴,则拒绝服务。
区别
令牌桶控制的是平均流入速率,速率可高可低,允许有突发请求;漏桶控制的是恒定的流出速率,从而平滑流入速率。
java实现
令牌桶算法:
package com.cvnavi.oa.report; import org.apache.kafka.common.protocol.types.Field; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; public class TokenLimiter { private ArrayBlockingQueue<String> blockingQueue; //容量大小 private int limit; //令牌的产生间隔 private int period; //令牌每次产生的个数 private int amount; public TokenLimiter(int limit, int period, int amount) { this.limit = limit; this.period = period; this.amount = amount; blockingQueue = new ArrayBlockingQueue<>(limit); } private void init() { for(int i = 0; i < limit; i++) { blockingQueue.add("lp"); } } private void addToken(int amount) { for(int i = 0; i < amount; i++) { //溢出返回false blockingQueue.offer("lp"); } } /** * 获取令牌 * @return */ public boolean tryAcquire() { //队首元素出队 return blockingQueue.poll() != null ? true : false; } /** * 生产令牌 */ private void start(Object lock) { Executors.newScheduledThreadPool(1).scheduleAtFixedRate(() -> { synchronized (lock) { addToken(this.amount); lock.notify(); } }, 500, this.period, TimeUnit.MILLISECONDS); } /** * 先生产2个令牌,减少4个令牌;再每500ms生产2个令牌,减少4个令牌 */ public static void main(String[] args) throws InterruptedException { int period = 500; TokenLimiter limiter = new TokenLimiter(8, period, 2); limiter.init(); Object lock = new Object(); limiter.start(lock); //让线程先产生2个令牌(溢出) synchronized (lock) { lock.wait(); } for(int i = 0; i < 8; i++) { for(int j = 0; j < 4; j++) { String s = i + "," + j + ":"; if(limiter.tryAcquire()) { System.out.println(s + "拿到令牌"); } else{ System.out.println(s + "拒绝"); } } Thread.sleep(period); } } }
漏桶算法:package com.cvnavi.report; public class Funnel { //漏斗容量 int capacity; //漏水速度 double leakingRate; //漏斗剩余空间(不是水的大小) int leftQuota; //上次漏斗漏水的时间 long leakingTs; public Funnel(int capacity, double leakingRate, int leftQuota, long leakingTs) { this.capacity = capacity; this.leakingRate = leakingRate; this.leftQuota = leftQuota; this.leakingTs = leakingTs; } private void makeSpace() { long nowTs = System.currentTimeMillis(); //这个是间隔的时间 long deltaTs = nowTs - this.leakingTs; //漏掉的水 int deltaQuota = (int)(deltaTs * leakingRate); //间隔时间太长,溢出 if (deltaQuota < 0){ this.leftQuota = capacity; this.leakingTs = nowTs; return; } //说明漏的时间不够 if (deltaQuota < 1) { return; } this.leakingTs = nowTs; this.leftQuota = this.leftQuota + deltaQuota; if (this.leftQuota > this.capacity) { this.leftQuota = this.capacity; } } /** * 流入 * @param quota * @return */ public Boolean water(int quota) { makeSpace(); //表示量充足 if (leftQuota >= quota){ leftQuota = leftQuota - quota; return true; } //剩余量不够 return false; } } package com.cvnavi.report; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; public class FunnelLimitRate { private static Map<String,Funnel> funnels = new HashMap<>(); public static void main(String[] args) throws InterruptedException { String userId= "2019"; String actionKey = "rgister"; int capacity = 8; double leakingRate = 0.001; int leftQuota = 5; Funnel funnel = funnels.get(userId); if (funnel == null) { funnel = new Funnel(capacity, leakingRate, leftQuota, System.currentTimeMillis()); } //每1000ms(1s),流入1的水 //每1000ms,流出1000*leakingRate(而且要>1)的水 for (int i = 0; i < 8; i++) { Boolean isBoolean = funnel.water(1); TimeUnit.MILLISECONDS.sleep(1000); System.out.println(isBoolean + " " + funnel.leftQuota); } } }
-
Java 实现令牌桶限流算法 原生极简实现 包括单机和多线程版本
2021-10-24 11:37:53文章目录漏桶(令牌桶)算法简介令牌桶算法限流范围:单机版实现多线程版实现 漏桶(令牌桶)算法简介 令牌桶是指一个限流容器,容器有最大容量,每秒或每100ms产生一个令牌(具体取决于机器每秒处理的请求数),当容量...令牌桶算法简介
令牌桶是指一个限流容器,容器有最大容量,每秒或每100ms产生一个令牌(具体取决于机器每秒处理的请求数),当容量中令牌数量达到最大容量时,令牌数量也不会改变了,只有当有请求过来时,使得令牌数量减少(只有获取到令牌的请求才会执行业务逻辑),才会不断生成令牌,所以令牌桶算法是一种弹性的限流算法
限流完下一步我们可以做什么呢,限流完后所有请求都是获取令牌的,都是我们要执行的,但是如果还有很多请求,这时我们要有一个方式决定怎样执行这些请求,这个方式称为调度,可以看我这篇文章 调度算法
令牌桶算法限流范围:
假设令牌桶最大容量为n,每秒产生r个令牌
- 平均速率:则随着时间推延,处理请求的平均速率越来越趋近于每秒处理r个请求,说明令牌桶算法可以控制平均速率
- 瞬时速率:如果在一瞬间有很多请求进来,此时来不及产生令牌,则在一瞬间最多只有n个请求能获取到令牌执行业务逻辑,所以令牌桶算法也可以控制瞬时速率
在这里提下漏桶,漏桶由于出水量固定,所以无法应对突然的流量爆发访问,也就是没有保证瞬时速率的功能,但是可以保证平均速率
单机版实现
- capacity为当前令牌桶的中令牌数量,timeStamp为上一次请求获取令牌的时间,我们没必要真的实现计时器每秒产生多少令牌放入容器中,只要记住上一次请求到来的时间,和这次请求的差值就知道在这段时间内产生了多少令牌
- 下面假设100ms产生1个令牌,且令牌桶最大容量为10
- Thread.sleep是模拟不同请求到来的间隔,改变间隔可以看到不同现象
public class LeakyBucketAlgorithm { private int capacity = 10; private long timeStamp = System.currentTimeMillis(); public boolean getToken() { if (capacity > 0) { capacity--; return true; } long current = System.currentTimeMillis(); if (current - timeStamp >= 100) { if ((current - timeStamp) / 100 >= 2) { // 假设100ms产生一个令牌 capacity += (int)((current - timeStamp) / 100) - 1; } timeStamp = current; if (capacity > 10) capacity = 10; return true; } return false; } public static void main(String[] args) throws InterruptedException { LeakyBucketAlgorithm leakyBucketAlgorithm = new LeakyBucketAlgorithm(); while (true) { // Thread.sleep(10); Thread.sleep(100); if (leakyBucketAlgorithm.getToken()) { System.out.println("获取令牌成功,可以执行业务逻辑了"); } else { System.out.println("获取令牌失败,请稍后重试"); } } } }
多线程版实现
- 多线程实现也非常简单,只要在方法加一个同步锁,保证同时只有一个请求能尝试获取令牌就可以了
- Thread.sleep(1000)是因为有10个线程在抢令牌,而每100ms才产生一个令牌,所以每隔1秒抢一次,可以改变这个值看现象
public class LeakyBucketAlgoritmThread { private int capacity = 10; private long timeStamp = System.currentTimeMillis(); public synchronized boolean getToken() { if (capacity > 0) { capacity--; return true; } long current = System.currentTimeMillis(); if (current - timeStamp >= 100) { if ((current - timeStamp) / 100 >= 2) { capacity += (int)((current - timeStamp) / 100) - 1; } timeStamp = current; if (capacity > 10) capacity = 10; return true; } return false; } public static void main(String[] args) throws InterruptedException { LeakyBucketAlgoritmThread leakyBucketAlgorithmThread = new LeakyBucketAlgoritmThread(); for (int i = 0; i < 10; i++) { new Thread(new Runnable() { @Override public void run() { while (true) { try { // Thread.sleep(900); Thread.sleep(1000); if (leakyBucketAlgorithmThread.getToken()) { System.out.println(Thread.currentThread().getName()+" 获取令牌成功,可以执行业务逻辑了"); } else { System.out.println(Thread.currentThread().getName()+" 获取令牌失败,请稍后重试"); } } catch (Exception e) { e.printStackTrace(); } } } }).start(); } } }
-
令牌桶算法及实现(三)
2021-03-06 17:39:59在第一篇、 第二篇文章中分别介绍了Guava令牌桶算法原理,固定速率生产token的SmothBursty限流器。但在实际环境中,如果想在初始阶段或隔一段时间系统再次被调用时,有一个预热的过程,即启动时生产令牌的速率慢一些... -
令牌桶算法
2022-06-09 10:07:10令牌桶算法就实现了这个功能,可控制发送到网络上数据的数目,并允许突发数据的发送。 CAR(Committed Access Rate)三色模式 为控制网络速率,首先我们需要对进入网络的用户流量的速率限制在约定的范围之内,从而避免... -
(附Java代码)基于令牌桶算法的分布式限流工具
2021-02-12 17:13:50一、开场白最近在使用SpringCloudGateway构建项目网关,处理限流的过程中发现gateway提供了一种基于令牌桶的分布式限流实现,非常感兴趣,于是在经过一番处理,从gateway的源码中提取出一个轻量的基于令牌桶算法的... -
基于令牌桶算法实现的SpringBoot无锁限流插件
2019-08-07 18:39:19基于令牌桶算法实现的SpringBoot无锁限流插件,支持方法级别、系统级别的限流,提供快速失败与CAS阻塞两种方案,开箱即用! -
bucket4j-基于令牌桶算法的Java速率限制库
2019-08-07 14:29:55bucket4j类库是一款优秀的java限流类库,可以用来限流,也可以用作简单调度 -
令牌桶算法原理
2021-10-27 16:33:40token_bucket令牌桶算法原理 讲在原理之前 我们今天主要是分析令牌桶算法,分析之前我们先介绍一下使用该算法的背景。 限流 每个API接口都是有访问上限的,当访问频率或者并发量超过其承受范围时候,我们就必须考虑限... -
基于RateLimiter的令牌桶算法实现限速控制和计算法实现限流控制
2020-08-01 10:05:31本demo适用于分布式环境的基于RateLimiter令牌桶算法的限速控制与基于计数器算法的限量控制,可应用于中小型项目中有相关需求的场景(注:本实现未做压力测试,如果用户并发量较大需验证效果)。 -
限流之令牌桶和漏桶算法(java)
2022-07-01 09:47:10限流算法 -
bucket4j, 基于令牌桶算法的Java速率限制库.zip
2019-10-10 03:52:04bucket4j, 基于令牌桶算法的Java速率限制库 Bucket4j - 基于令牌桶算法的Java速率限制库。 的优点在已经知算法的基础上实现,它是in行业的速率限制的实际标准。有效的锁自由实现,Bucket4j可以用于多线程处理。绝对... -
简单分析Guava中RateLimiter中的令牌桶算法的实现
2021-02-12 17:28:24令牌桶算法是网络流量整形(Traffic Shaping)和速率限制(Rate Limiting)中最常使用的一种算法。典型情况下,令牌桶算法用来控制发送到网络上的数据的数目,并允许突发数据的发送。大小固定的令牌桶可自行以恒定的速率... -
漏桶算法和令牌桶算法
2021-05-20 16:22:22二、Redis+lua脚本 + 令牌桶算法 实现限流控制 1、自定义一个注解,用来给限流的方法标注 2、编写lua脚本 3、读取lua脚本 4、创建拦截器拦截带有该注解的方法 5、在WebConfig中注册这个这个拦截器 6、注解使用... -
redis实现的简单令牌桶
2021-03-13 07:02:39这里给出的令牌桶是以redis单节点或者集群为中间件. 不过, 这里的实现比较简单, 主要提供两个函数, 一个用于消费令牌, 一个用于添加令牌. 这里, 消费令牌和添加令牌都是通过lua来保证原子性.消费令牌的代码如下 :// ... -
Java 限流算法之令牌桶实现
2021-08-15 10:57:44采用计数法实现,通过子任务定期向桶中添加令牌。 -
java面试必备--JAVA算法(四) 之 高并发之限流令牌桶和漏桶算法
2021-07-26 09:19:31不论是干得不开心还是想跳槽涨薪,在如此内卷的行业,我们都面临着“面试造火箭,上班拧螺丝”的局面,鉴于当前形势博主呕心沥血整理的干货满满的造火箭的技巧来了,本博主花费2个月时间,整理归纳java全生态知识... -
高并发系统限流-漏桶算法和令牌桶算法
2021-05-17 23:30:22一、问题描述 某天A君突然发现自己的接口请求量突然涨到之前的10倍,没多久该接口几乎不可使用,并引发连锁反应导致整个系统崩溃。... 常用的限流算法有两种:漏桶算法和令牌桶算法。 漏桶算... -
服务限流之令牌桶算法
2022-04-16 00:11:49令牌桶算法是一个存放固定容量令牌的桶,按照固定速率往桶里添加令牌。 令牌桶算法的描述如下: 假设限制2r/s,则按照500毫秒的固定速率往桶中添加令牌; 桶中最多存放b个令牌,当桶满时,新添加的令牌被丢弃或拒绝... -
漏桶算法与令牌桶算法
2021-06-21 16:07:00漏桶算法(有点像Kafka,大批量的数据吞吐) 漏桶算法的主要思路为: 在nginx层与controller层加一层(即漏桶层), 用于接收nginx收到的大批量的请求,接收的请求的速度是没有控制的,但是如果超过了漏桶层的最大容量则... -
令牌桶简单实现(Java)
2020-07-15 14:08:30令牌桶简单实现(Java) 文章目录令牌桶简单实现(Java)简介实现思路codemain输出结果 简介 百度可得,令牌桶是一个桶,定时往里面放令牌,然后请求来了从令牌桶里取令牌,取到了继续后续逻辑,没取到就拦截不让... -
限流——漏桶算法和令牌桶算法的区别
2020-09-02 18:08:20限流 在开发高并发系统时有三把利器用来保护系统:缓存、降级和限流 缓存:缓存的目的是提升系统访问速度和增大系统处理能力 ...常用的限流算法有令牌桶和,漏桶,滑动窗口 漏桶算法 漏桶(Leaky Bucket) 算