精华内容
下载资源
问答
  • Alibaba Sentinel 源码阅读(Part 2 LeapArray
    千次阅读
    2018-10-21 12:00:00

    前言

    这一篇是上一篇的继续,如果不了解Sentinel ,请先阅读[Alibaba Sentinel 源码阅读(Part1 执行流程)](Alibaba Sentinel 源码阅读(Part1 执行流程))

    入口

    在上一篇我们看到 我们获取的所有信息,都是从StatisticNode 的这两个数据结构中获取的

    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(1000 / SampleCountProperty.SAMPLE_COUNT,
            IntervalProperty.INTERVAL);
    
        /**
         * Holds statistics of the recent 120 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
         * meaning each bucket per second, in this way we can get accurate statistics of each second.
         */
        private transient Metric rollingCounterInMinute = new ArrayMetric(1000, 2 * 60);
    

    rollingCounterInMinute 这个两分钟之内的每一秒中数据的一个list,而每一秒中的数据是存储在 MetricBucket,

    ArrayMetric

    // ArrayMetric 实现了Metric 接口,同时包含了 MetricsLeapArray数据结构,接口的实现就是通过这个MetricsLeapArray来实现的
    // MetricsLeapArray 是从 LeapArray 继承的,所以这一篇的重点就是LeapArray了
    public class ArrayMetric implements Metric {
        private final MetricsLeapArray data;
    
        /**
         * Constructor
         *
         * @param windowLengthInMs a single window bucket's time length in milliseconds.
         * @param intervalInSec    the total time span of this {@link ArrayMetric} in seconds.
         */
        public ArrayMetric(int windowLengthInMs, int intervalInSec) {
            this.data = new MetricsLeapArray(windowLengthInMs, intervalInSec);
        }
    }
    

    LeapArray

    实际上就是一个环形数组,来给张官方的图就明白了

    看文档其实很清晰,整个是基于时间窗口滑动算法来实现的

    新增当前统计数据

    @Override
        public void addSuccess() {
            WindowWrap<MetricBucket> wrap = data.currentWindow();
            wrap.value().addSuccess();
        }
    

    获取时间窗口内统计数据

    @Override
        public long success() {
            data.currentWindow();
            long success = 0;
    
            List<MetricBucket> list = data.values();
            for (MetricBucket window : list) {
                success += window.success();
            }
            return success;
        }
    

    所以重点的方法就是 data.currentWindow()方法了

    protected final AtomicReferenceArray<WindowWrap<T>> array;
    
    public LeapArray(int windowLengthInMs, int intervalInSec) {
            this.windowLengthInMs = windowLengthInMs;
            this.intervalInMs = intervalInSec * 1000;
            this.sampleCount = intervalInMs / windowLengthInMs;
            // 初始化容量大小
            this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount);
        }
    
    
    
      /**
         * Get window at provided timestamp.
         *
         * @param time a valid timestamp
         * @return the window at provided timestamp
         */
        public WindowWrap<T> currentWindow(long time) {
            long timeId = time / windowLengthInMs;
            // Calculate current index.
            int idx = (int)(timeId % array.length());
    
            // Cut the time to current window start.
            time = time - time % windowLengthInMs;
    
            while (true) {
                WindowWrap<T> old = array.get(idx);
                if (old == null) {
                    WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
                    if (array.compareAndSet(idx, null, window)) {
                        return window;
                    } else {
                        Thread.yield();
                    }
                } else if (time == old.windowStart()) {
                    return old;
                } else if (time > old.windowStart()) {
                    if (updateLock.tryLock()) {
                        try {
                            // if (old is deprecated) then [LOCK] resetTo currentTime.
                            return resetWindowTo(old, time);
                        } finally {
                            updateLock.unlock();
                        }
                    } else {
                        Thread.yield();
                    }
    
                } else if (time < old.windowStart()) {
                    // Cannot go through here.
                    return new WindowWrap<T>(windowLengthInMs, time, newEmptyBucket());
                }
            }
        }
    
    
    

    这部分的内容会维持一个有效的环形数组以统计数据,具体要自己debug 看了。

    总结

    这里也只是把大致流程梳理了一下方便大家看源码而已,很多地方没有具体分析,这部分还是需要自己亲力亲为。

    参考

    Window Sliding Technique

    转载于:https://my.oschina.net/tigerlene/blog/2250158

    更多相关内容
  • Sentinel 底层采用高性能的滑动窗口数据结构LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。 由此可以发现Sentinel使用了滑动窗口算法来做数据统计,并且具体实现是在LeapArray类中。 ...

    最近在使用Alibaba Sentinel来做服务的限流、熔断和降级。一直有一个比较好奇的点,Sentinel是如果做到高效的数据统计的。通过官方文档介绍

    • StatisticSlot: 则用于记录、统计不同纬度的 runtime 指标监控信息;(做实时统计)
    • Sentinel 底层采用高性能的滑动窗口数据结构LeapArray来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。

    由此可以发现Sentinel使用了滑动窗口算法来做数据统计,并且具体实现是在LeapArray类中。

    Sentinel 总体的框架如下:
    image.png

    通过架构图我们可以看到StatisticSlot中的LeapArray采用了一个环性数组的数据结构,这个和一致性hash算法的图类似,如图:

    image.png

    在这个结构中,每一个下标位就代表一个滑动窗口,至于这个窗口是怎么滑动的我们可以结合原来看。

    LeapArray 源码

    源码路径

    StatisticSlot作为统计的入口,在其entry()方法中我们可以看到StatisticSlot会使用StatisticNode,然后StatisticNode回去引用ArrayMetric,最终使用LeapArray

    根据当前时间获取滑动窗口

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        // 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
        int idx = calculateTimeIdx(timeMillis);
        // 根据当前时间计算出当前滑动窗口的开始时间
        long windowStart = calculateWindowStart(timeMillis);
    
        /*
         * 根据下脚标在环形数组中获取滑动窗口(桶)
         *
         * (1) 如果桶不存在则创建新的桶,并通过CAS将新桶赋值到数组下标位。
         * (2) 如果获取到的桶不为空,并且桶的开始时间等于刚刚算出来的时间,那么返回当前获取到的桶。
         * (3) 如果获取到的桶不为空,并且桶的开始时间小于刚刚算出来的开始时间,那么说明这个桶是上一圈用过的桶,重置当前桶
         * (4) 如果获取到的桶不为空,并且桶的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    

    根据下脚标在环形数组中获取滑动窗口(桶)的规则:

    • (1) 如果桶不存在则创建新的桶,并通过CAS将新桶赋值到数组下标位。
    • (2) 如果获取到的桶不为空,并且桶的开始时间等于刚刚算出来的时间,那么返回当前获取到的桶。
    • (3) 如果获取到的桶不为空,并且桶的开始时间小于刚刚算出来的开始时间,那么说明这个桶是上一圈用过的桶,重置当前桶,并返回。
    • (4) 如果获取到的桶不为空,并且桶的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。

    这里有一个比较值得学习的地方是:

    1. 对并发的控制:当一个新桶的创建直接是使用的CAS的原子操作来保证并发;但是重置一个桶的时候因为很难保证其原子操作(1. 需要重置多个值;2. 重置方法是一个抽象方法,需要子类去做实现),所以直接使用一个ReentrantLock锁来做并发控制。
    2. Thread.yield();方法的使用,这个方法主要的作用是交出CPU的执行权,并重新竞争CPU执行权。这个方法再我们业务代码中其实很少用到。

    如何实现的滑动的

    通过上面这个方法我们可以看到我们是如果根据当前时间获取到一个桶的(滑动窗口)。但是如何实现滑动效果的呢?实现滑动效果主要看上面那个方法的如何找到桶的下标和如何更加当前时间找到当前桶的开始时间,如下:

    // 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
    int idx = calculateTimeIdx(timeMillis);
    // 根据当前时间计算出当前滑动窗口的开始时间
    long windowStart = calculateWindowStart(timeMillis);
    
    // 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        // 利用除法取整原则,保证了一秒内的所有时间搓得到的timeId是相等的
        long timeId = timeMillis / windowLengthInMs;
        // 利用求余运算原则,保证一秒内获取到的桶的下标位是一致的
        return (int) (timeId % array.length());
    }
    
    // 根据当前时间计算出当前滑动窗口的开始时间
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        // 利用求余运算原则,保证一秒内获取到的桶的开始时间是一致的
        // 100 - 100 % 10 = 100 - 0 = 100
        // 101 - 101 % 10 = 101 - 1 = 100
        // 102 - 102 % 10 = 102 - 2 = 100
        return timeMillis - timeMillis % windowLengthInMs;
    }
    
    • timeMillis:表示当前时间的时间戳
    • windowLengthInMs:表示一个滑动窗口的时间长度,根据源码来看是1000ms即一个滑动窗口统计1秒内的数据。

    这两个方法巧妙的利用了除法取整和求余原则实现了窗口的滑动。通过最上面的结构图我们可以发现滑动窗口会根据时间戳顺时针旋转。

    桶的数量就决定了滑动窗口的统计时长,根据源码来看是60个桶,即一个统计1分钟内的数据。

    内部是利用并发工具类LongAdder的特性来实现的高效的数据的统计。

    展开全文
  • Sentinel限流实现原理 要实现限流、熔断等功能,首先要解决的是如何对资源的访问信息进行收集。例如将一个接口限制调用设置为1w/tps,...LeapArray 滑动窗口顶层数据结构,包含一个一个的窗口数据。 ArrayMetric ..

    Sentinel限流实现原理

    要实现限流、熔断等功能,首先要解决的是如何对资源的访问信息进行收集。例如将一个接口限制调用设置为1w/tps,那么我们如何知道当前接口的实时tps?

    1.滑动窗口核心类图

    Sentinel是采用滑动窗口来实时收集接口的调用信息,核心的类图结构如下:

    https://note.youdao.com/yws/public/resource/d1b1e0285875883a0434ddb558408256/xmlnote/7DCCC867BA404E718BDD72FB85097B6D/9134

     

     

    • Metric

    指标收集核心接口,主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。

    • LeapArray

    滑动窗口顶层数据结构,包含一个一个的窗口数据。

    • ArrayMetric

    滑动窗口核心实现类。

    • WindowWrap

    每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。

    • MetricBucket

    指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。

    • MetricEvent

    指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。

     

    2.滑动窗口实现原理

    2.1 ArrayMetric

    是滑动窗口的入口类,他的主要核心方法有:

    private final LeapArray<MetricBucket> data;//@1
    
    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);//@2
    }
    
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {//@3
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }

    代码@1:是一个LeapArray属性值,存储滑动窗口的数据

    代码@2、@3:提供了两个构造方法,核心参数主要有

    • sampleCount

    在一个采集区间内采集的数据个数,比如在1000ms 内 sampleCount=2,那么这个区间滑动窗口个数为2个,每个滑动窗口时间范围是500ms。

    • intervalInMs

    采集数据的时间间隔,比如1秒、1分钟。

    • enableOccupy

    是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量

    ArrayMetric中唯一的属性是泛型参数为MetricBucket的LeapArray,MetricBucket是指标准的桶。

    它的主要作用是Sentinel采集各种信息放入到MetricBucket中,例如一个时间窗口内请求的:通过数量、阻塞数量、 异常数量、成功数量、响应时间

     

    2.2 MetricBucket

    它的类图:

    https://note.youdao.com/yws/public/resource/d1b1e0285875883a0434ddb558408256/xmlnote/4158A402858E4F4DA89E92DAD6A6D872/9231

    主要代码分析:

    private final LongAdder[] counters;//@1
    
    private volatile long minRt;//@2
    
    public MetricBucket() {
        MetricEvent[] events = MetricEvent.values();
        this.counters = new LongAdder[events.length];//@3
        for (MetricEvent event : events) {//@4
            counters[event.ordinal()] = new LongAdder();
        }
        initMinRt();//@5
    }

    代码@1:存放数据收集指标的数组,里边会存放MetricEvent枚举里定义的6种采集信息。

    代码@2:属性最小的rt

    代码@3:初始化容量为MetricEvent个枚举的LongAdder数组

    代码@4:实例化LongAdder。

    代码@5:初始化rt,默认为:DEFAULT_STATISTIC_MAX_RT = 5000

    MetricEvent枚举类定义了6种信息类型:

    public enum MetricEvent {
        /**
         * Normal pass.
         */
        PASS,
        /**
         * Normal block.
         */
        BLOCK,
        EXCEPTION,
        SUCCESS,
        RT,
    
        /**
         * Passed in future quota (pre-occupied, since 1.5.0).
         */
        OCCUPIED_PASS
    }

    我们来看下LongAdder的类图:

    https://note.youdao.com/yws/public/resource/d1b1e0285875883a0434ddb558408256/xmlnote/4E32705785A742F58034F1A21CE45AAA/9228

    LongAdder是一个高性能的避免缓存行填充的类,它比java中的原子类性能更高。Sentinel使用它来做数据手机记录会有很不错的性能。

    2.3 LeapArray

    接下来我们分析下LeapArray,它的类图如下:

    https://note.youdao.com/yws/public/resource/d1b1e0285875883a0434ddb558408256/xmlnote/D617B20211214564A3BFB65CE5508EC7/9239

    LeapArray主要属性和构造方法:

    protected int windowLengthInMs;
    protected int sampleCount;
    protected int intervalInMs;
    
    protected final AtomicReferenceArray<WindowWrap<T>> array;
    
    private final ReentrantLock updateLock = new ReentrantLock();

    LeapArray的核心属性如下:

    • int windowLengthInMs

    每一个窗口的时间间隔,单位为毫秒。

    • int sampleCount

    抽样个数,就一个统计时间间隔中包含的滑动窗口个数,在 intervalInMs 相同的情况下,sampleCount 越多,抽样的统计数据就越精确,相应的需要的内存也越多。

    • int intervalInMs

    一个统计的时间间隔。

    • AtomicReferenceArray> array

    一个统计时间间隔中滑动窗口的数组,从这里也可以看出,一个滑动窗口就是使用的 WindowWrap< MetricBucket > 来表示。

    LeapArray的构造函数:

    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
    
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
    
        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    构造函数可以看出windowLengthInMs是由intervalInMs / sampleCount计算得出。接下来我们继续探究LeapArray的源码,探寻滑动窗口的实现原理。

    2.3.1 currentWindow() 详解

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        
        int idx = calculateTimeIdx(timeMillis);//@1
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);//@2
    
        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            if (old == null) {//@3
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {//@4
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {//@5
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {//@6
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }
    
    private int calculateTimeIdx(long timeMillis) {
        //获取滑动窗口的个数
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        //timeId和滑动窗口数量取模得到当前时间在数组中的下标
        return (int)(timeId % array.length());
    }
    
    protected long calculateWindowStart(long timeMillis) {
        return timeMillis - timeMillis % windowLengthInMs;
    }

    代码@1:计算当前时间对应的滑动窗口数组下标,算法为:(timeMillis / windowLengthInMs)% array.length()。如timeMillis=888,windowLengthInMs=200,采集时间间隔为1000ms一次,那么根据公式计算出来的滑动窗口在array中的下标为4。

    代码@2:计算当前时间滑动窗口的windowStart,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )。

    代码@3:根据下标找到了空的WindowWrap,这个时候就初始化一个WindowWrap对象,并且采用cas方式去更新WindowWrap对象到array中。

    代码@4:根据下标找到不为空的WindowWrap且滑动窗口windowStart等于old windowStart,直接返回。

    代码@5:当前时间窗口windowStart大于old windowStart,这说明old window已经过期需要对其进行重置

    代码@6:不知道这段代码会不会被执行

    2.3.2 isWindowDeprecated() 详解

    判断滑动窗口是否过期:

    public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
        return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
    }
    
    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        return time - windowWrap.windowStart() > intervalInMs;//@1
    }

    代码@1:当前时间-滑动窗口开始时间大于采集时间间隔,说明当前滑动窗口已经过期。

    2.3.3 getPreviousWindow()详解

    获取上一个时间窗口

    public WindowWrap<T> getPreviousWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis - windowLengthInMs);//@1
        timeMillis = timeMillis - windowLengthInMs;
        WindowWrap<T> wrap = array.get(idx);//@2
    
        if (wrap == null || isWindowDeprecated(wrap)) {//@3
            return null;
        }
    
        if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {//@4
            return null;
        }
    
        return wrap;
    }

    代码@1:传入当前时间减去一个时间窗口间隔时间,计算出前一个时间窗口的数组下标位置。

    代码@2:获取前一个时间窗口

    代码@3:判断前一个时间窗口wrap是否为空或者是否已经过期,如果过期则返回null。

    代码@4:如果定位的窗口的开始时间再加上 windowLengthInMs 小于 timeMills ,说明失效,则返回 null

    3. 滑动窗口原理示意图

    https://note.youdao.com/yws/public/resource/d1b1e0285875883a0434ddb558408256/xmlnote/6D7E0D47572742A888633A55BD375937/12995

    展开全文
  • 现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标,而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。 代码@2:计算当前时间戳所在的时间窗口的开始...

    要实现限流、熔断等功能,首先要解决的问题是如何实时采集服务(资源)调用信息。例如将某一个接口设置的限流阔值 1W/tps,那首先如何判断当前的 TPS 是多少?Alibaba Sentinel 采用滑动窗口来实现实时数据的统计。

    温馨提示:如果对源码不太感兴趣,可以先跳到文末,看一下滑动窗口的设计原理图,再决定是否需要阅读源码。

    1、滑动窗口核心类图



    我们先对上述核心类做一个简单的介绍,重点关注核心类的作用与核心属性(重点需要探究其核心数据结构)。

    • Metric
      指标收集核心接口,主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等数据。

    • ArrayMetric
      滑动窗口核心实现类。

    • LeapArray
      滑动窗口顶层数据结构,包含一个一个的窗口数据。

    • WindowWrap
      每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。

    • MetricBucket
      指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。

    • MetricEvent
      指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。

    2、滑动窗口实现原理


    2.1 ArrayMetric

    滑动窗口的入口类为 ArrayMetric ,我们先来看一下其核心代码。

    private final LeapArray<MetricBucket> data;   // @1
    public ArrayMetric(int sampleCount, int intervalInMs) {    // @2
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {   // @3
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    

    代码@1:ArrayMetric 类唯一的属性,用来存储各个窗口的数据,这个是接下来我们探究的重点。

    代码@2,代码@3  该类提供了两个构造方法,其核心参数为:

    • int intervalInMs
      表示一个采集的时间间隔,例如1秒,1分钟。

    • int sampleCount
      在一个采集间隔中抽样的个数,默认为 2,例如当 intervalInMs = 1000时,抽象两次,则一个采集间隔中会包含两个相等的区间,一个区间就是滑动窗口。

    • boolean enableOccupy
      是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,否则为 BucketLeapArray。

    注意,LeapArray 的泛型类为 MetricBucket,意思就是指标桶,可以认为一个 MetricBucket 对象可以存储一个抽样时间段内所有的指标,例如一个抽象时间段中通过数量、阻塞数量、异常数量、成功数量、响应时间,其实现的奥秘在 LongAdder 中,本文先不对该类进行详细介绍,后续文章会单独来探究其实现原理。

    这次,我们先不去看子类,反其道而行,先去看看其父类。

    2.2 LongAdder

    2.2.1 类图与核心属性

     

    LeapArray 的核心属性如下:

    • int windowLengthInMs
      每一个窗口的时间间隔,单位为毫秒。

    • int sampleCount
      抽样个数,就一个统计时间间隔中包含的滑动窗口个数,在 intervalInMs 相同的情况下,sampleCount 越多,抽样的统计数据就越精确,相应的需要的内存也越多。

    • int intervalInMs
      一个统计的时间间隔。

    • AtomicReferenceArray> array
      一个统计时间间隔中滑动窗口的数组,从这里也可以看出,一个滑动窗口就是使用的 WindowWrap< MetricBucket > 来表示。

    上面的各个属性的含义是从其构造函数得出来的,请其看构造函数。

    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.sampleCount = sampleCount;
        this.array = new AtomicReferenceArray<>(sampleCount);
    }
    

    那我们继续来看 LeapArray 中的方法,深入探究滑动窗口的实现细节。

    2.2.2 currentWindow() 详解

    该方法主要是根据当前时间来确定处于哪一个滑动窗口中,即找到上图中的 WindowWrap,该方法内部就是调用其重载方法,参数为系统的当前时间,故我们重点来看一下重载方法的实现。

    public WindowWrap<T> currentWindow(long timeMillis) { 
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis);  // @1
        long windowStart = calculateWindowStart(timeMillis); // @2
        while (true) { // 死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口。
            WindowWrap<T> old = array.get(idx);  // @3
                    if (old == null) {  // @4
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                        if (array.compareAndSet(idx, null, window)) {  // @5
                    return window;
                        } else {
                    Thread.yield();
                        }
                    } else if (windowStart == old.windowStart()) { // @6
                return old;
                    } else if (windowStart > old.windowStart()) {  // @7
                if (updateLock.tryLock()) {
                                try {
                        return resetWindowTo(old, windowStart);
                            } finally {
                        updateLock.unlock();
                              }
                        } else {
                    Thread.yield();
                        }
                } else if (windowStart < old.windowStart()) { // @8
                        return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                }
            }
    }
    

    代码@1:计算当前时间会落在一个采集间隔 ( LeapArray ) 中哪一个时间窗口中,即在 LeapArray 中属性 AtomicReferenceArray > array 的下标。其实现算法如下:

    • 首先用当前时间除以一个时间窗口的时间间隔,得出当前时间是多少个时间窗口的倍数,用 n 表示。

    • 然后我们可以看出从一系列时间窗口,从 0 开始,一起向前滚动 n 隔得到当前时间戳代表的时间窗口的位置。现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标,而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。

    代码@2:计算当前时间戳所在的时间窗口的开始时间,即要计算出 WindowWrap 中 windowStart 的值,其实就是要算出小于当前时间戳,并且是 windowLengthInMs 的整数倍最大的数字,Sentinel 给出是算法为 ( timeMillis - timeMillis % windowLengthInMs )。

    代码@3:尝试从 LeapArray 中的 WindowWrap 数组查找指定下标的元素。

    代码@4:如果指定下标的元素为空,则需要创建一个 WindowWrap 。其中 WindowWrap 中的 MetricBucket 是调用其抽象方法 newEmptyBucket (timeMillis),由不同的子类去实现。

    代码@5:这里使用了 CAS 机制来更新 LeapArray 数组中的 元素,因为同一时间戳,可能有多个线程都在获取当前时间窗口对象,但该时间窗口对象还未创建,这里就是避免创建多个,导致统计数据被覆盖,如果用 CAS 更新成功的线程,则返回新建好的 WindowWrap ,CAS 设置不成功的线程继续跑这个流程,然后会进入到代码@6。

    代码@6:如果指定索引下的时间窗口对象不为空并判断起始时间相等,则返回。

    代码@7:如果原先存在的窗口开始时间小于当前时间戳计算出来的开始时间,则表示 bucket 已被弃用。则需要将开始时间重置到新时间戳对应的开始时间戳,重置的逻辑将在下文详细介绍。

    代码@8:应该不会进入到该分支,因为当前时间算出来时间窗口不会比之前的小。

    2.2.3 isWindowDeprecated() 详解

    接下来我们来看一下窗口的过期机制。

    public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
        return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
    }
    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
        return time - windowWrap.windowStart() > intervalInMs;
    }
    

    判断滑动窗口是否生效的依据是当系统时间与滑动窗口的开始时间戳的间隔大于一个采集时间,即表示过期。即从当前窗口开始,通常包含的有效窗口为 sampleCount 个有效滑动窗口。

    2.2.4 getPreviousWindow() 详解

    根据当前时间获取前一个有效滑动窗口,其代码如下:

    public WindowWrap<T> getPreviousWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
        int idx = calculateTimeIdx(timeMillis - windowLengthInMs); // @1
        timeMillis = timeMillis - windowLengthInMs;
        WindowWrap<T> wrap = array.get(idx);
        if (wrap == null || isWindowDeprecated(wrap)) {                 // @2
            return null;
        }
       if (wrap.windowStart() + windowLengthInMs < (timeMillis)) {   // @3
            return null;
        }
        return wrap;
    }
    

    其实现的关键点如下:
    代码@1:用当前时间减去一个时间窗口间隔,然后去定位所在 LeapArray 中 数组的下标。
    代码@2:如果为空或已过期,则返回 null。
    代码@3:如果定位的窗口的开始时间再加上 windowLengthInMs 小于 timeMills ,说明失效,则返回 null,通常是不会走到该分支。

    2.2.5 滑动窗口图示

    经过上面的分析,虽然还有一个核心方法 (resetWindowTo) 未进行分析,但我们应该可以画出滑动窗口的实现的实现原理图了。

     

    接下来对上面的图进行一个简单的说明:下面的示例以采集间隔为 1 s,抽样次数为 2。

    首先会创建一个 LeapArray,内部持有一个数组,元素为 2,一开始进行采集时,数组的第一个,第二个下标都会 null,例如当前时间经过 calculateTimeIdx 定位到下标为 0,此时没有滑动窗口,会创建一个滑动窗口,然后该滑动窗口会采集指标,随着进入 1s 的后500ms,后会创建第二个抽样窗口。

    然后时间前进 1s,又会定位到下标为 0 的地方,但此时不会为空,因为有上一秒的采集数据,故需要将这些采集数据丢弃 ( MetricBucket value ),然后重置该窗口的 windowStart,这就是 resetWindowTo 方法的作用。

    在 ArrayMetric 的构造函数出现了 LeapArray 的两个实现类型 BucketLeapArray 与 OccupiableBucketLeapArray。

    其中 BucketLeapArray 比较简单,在这里就不深入研究了, 我们接下来将重点探讨一下 OccupiableBucketLeapArray 的实现原理,即支持抢占未来的“令牌”。

    3、OccupiableBucketLeapArray 详解


    所谓的 OccupiableBucketLeapArray ,实现的思想是当前抽样统计中的“令牌”已耗尽,即达到用户设定的相关指标的阔值后,可以向下一个时间窗口,即借用未来一个采样区间。接下来我们详细来探讨一下它的核心实现原理。

    3.1 类图


    我们重点关注一下 OccupiableBucketLeapArray 引入了一个 FutureBucketLeapArray 的成员变量,其命名叫 borrowArray,即为借用的意思。

    3.2 构造函数

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }
    

    从构造函数可以看出,不仅创建了一个常规的 LeapArray,对应一个采集周期,还会创建一个  borrowArray ,也会包含一个采集周期。

    3.3 newEmptyBucket

    public MetricBucket newEmptyBucket(long time) {
        MetricBucket newBucket = new MetricBucket();   // @1
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);  // @2
        if (borrowBucket != null) {  
            newBucket.reset(borrowBucket);  
        }
        return newBucket;
    }
    

    我们知道 newEmptyBucket 是在获取当前窗口时,对应的数组下标为空的时会创建。
    代码@1:首先新建一个 MetricBucket。
    代码@2:在新建的时候,如果曾经有借用过未来的滑动窗口,则将未来的滑动窗口上收集的数据 copy 到新创建的采集指标上,再返回。

    3.4 resetWindowTo

    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {      
        w.resetTo(time);
        MetricBucket borrowBucket = borrowArray.getWindowValue(time);
        if (borrowBucket != null) {
            w.value().reset();
            w.value().addPass((int)borrowBucket.pass());
        } else {
            w.value().reset();
        }
        return w;
    }
    

    遇到过期的滑动窗口时,需要对滑动窗口进行重置,这里的思路和 newEmptyBucket 的核心思想是一样的,即如果存在已借用的情况,在重置后需要加上在未来已使用过的许可,就不一一展开了。

    3.5 addWaiting

    public void addWaiting(long time, int acquireCount) {
        WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
        window.value().add(MetricEvent.PASS, acquireCount);
    }
    

    经过上面的分析,先做一个大胆的猜测,该方法应该是当前滑动窗口中的“令牌”已使用完成,借用未来的令牌。将在下文给出证明。

    滑动窗口的实现原理就介绍到这里了。大家可以按照上面的代码结合下图做一个理解。

     

    思考题,大家可以画一下 OccupiableBucketLeapArray 滑动窗口的图示以及引入Occupiable 机制的目的。这部分内容也将在我的【中间件知识星球】中与各位星友一起探讨,欢迎大家的加入。


     

    欢迎加入我的知识星球,一起交流源码,探讨架构,打造高质量的技术交流圈,长按如下二维码

     

    中间件兴趣圈 知识星球 正在对如下话题展开如火如荼的讨论:

     

    1、【让天下没有难学的Netty-网络通道篇】

    1、Netty4 Channel概述(已发表)

    2、Netty4 ChannelHandler概述(已发表)

    3、Netty4事件处理传播机制(已发表)

    4、Netty4服务端启动流程

    5、Netty4 NIO 客户端启动流程

    6、Netty4 NIO线程模型分析

    7、Netty4编码器、解码器实现原理

    8、Netty4 读事件处理流程

    9、Netty4 写事件处理流程

    10、Netty4 NIO Channel其他方法详解

    2、Java 并发框架(JUC) 探讨【面试神器】
    3、源码分析Alibaba Sentienl 专栏背后的写作与学习技巧。

     

    如果您喜欢这篇文章,点【在看】与转发是一种美德,期待您的认可与鼓励,越努力越幸运。

     

    https://mp.weixin.qq.com/s/tn8rSHyv_hiJi6QDhfxuIA

    展开全文
  • sentinel默认有每秒和每分钟的滑动窗口,对应的LeapArray类型,它们的初始化逻辑是: protected int windowLengthInMs ; // 单个滑动窗口时间值 protected int sampleCount ; // 滑动窗口个数 protected ...
  • Sentinel Core流程分析

    2019-09-30 09:08:50
    LeapArray:滑动窗口模型,实现不同时间粒度的滑动窗口行为,LeapArray为抽象类 BucketLeapArray:LeapArray的实现类,主要实现方法 1) newEmptyBucket:初始化窗口的动作,这里初始化时设置了包装类...
  • sentinel限流相关指标统计源码分析

    千次阅读 2019-10-28 15:38:10
    StatisticSlot是slotChain中负责记录统计数据的slot,因此自然使用了LeapArray,下面通过分析源码说明StatisticSlot是怎么通过LeapArray记录限流信号量的。 分析slot自然首先从它的entry方法入手 ...
  • 限流-滑动窗口

    2021-01-04 17:25:52
    enableOccupy 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,否则为 BucketLeapArray。...
  • 实现思路      Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。 2.源码解析 LeapArray表示整个滑动窗口,其中有几个很重要的属性 代码...
  • Sentinel源码分析----滑动窗口

    千次阅读 2019-03-05 00:23:30
    LeapArray < MetricBucket > { public MetricsLeapArray ( int sampleCount , int intervalInMs ) { super ( sampleCount , intervalInMs ) ; } @Override public MetricBucket ...
  • Java数组(Array)

    2022-03-07 22:02:46
    Java数组(Array) 概述 ​ 数组(Array)是有序的元素序列。若将有限个类型相同的变量的集合命名,那么这个名称为数组名。组成数组的各个变量称为数组的分量,也称为数组的元素,有时也称为下标变量。...
  • 实时统计实现 滑动窗口(LeapArray) 滑动窗口(RxJava) Ring Bit Buffer 动态规则配置 支持多种数据源 支持多种数据源 有限支持 扩展性 多个扩展点 插件形式 接口形式 基于注解的支持 支持 支持 支持 限流 基于 ...
  • 系列文章 Sentinel 原理-调用链Sentinel 原理-滑动窗口Sentinel 原理-实体类Sentinel 实战-限流篇Sentinel 实战-控制台篇Sentinel 实战-规则持久化Sentinel 实战-集群限流篇 Sentinel 系列教程,现已上传到 github...
  • 滑动窗口(LeapArray) 滑动窗口(基于 RxJava) Ring Bit Buffer 动态规则配置 支持多种数据源 支持多种数据源 有限支持 扩展性 多个扩展点 插件的形式 接口的形式 基于注解的支持 支持 ...
  • 数组之滑动窗口

    2020-10-09 17:25:37
    /** * 给定一个含有 n 个正整数的数组和一个正整数 s ,找出该数组中满足其和 ≥ s 的长度最小的 连续 子数组,并返回其长度。如果不存在符合条件的子数组,返回 0。 * <p> * 示例: * 输入:s = 7, nums ...
  • 统计槽实施指标数据统计 StatisticSlot.entry() 4.1 统计“增加线程数”和“请求通过数” 4.2 数据统计的数据结构 4.2.1 ArrayMetric 指标数组 4.2.2 LeapArray 环形数组 4.2.3 WindowWrap 窗口包装类 4.2.4 ...
  • 信号量隔离实现原理 Sentinel 底层采用高性能的滑动窗口数据结构 LeapArray 来统计实时的秒级指标数据,在 信号量隔离的底层实现中, 通过根据不同的策略,如 异常数 策略,统计在 滑动窗口区间内, 异常请求量的...
  • 目录 Sentinel 介绍 Sentinel 的历史 Sentinel 基本概念 资源 规则 Sentinel 功能和设计理念 流量控制 熔断降级 系统负载保护 Sentinel 是如何工作的 快速开始 本地Demo 1. 引入 Sentinel 依赖 ...Sentin
  • 两者内部都内部维护了一个LeapArray: public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { ​ private final int strategy; private final int minRequestAmount; private final double ...
  • Sentinel进阶(二)

    2022-03-30 14:26:10
    一、Sentinel的原理(源码分析) 源码入口 Entry entry = SphU.entry(resourceName); 围绕两点展开: 寻找入口SphU.entry() 初始化流控规则的入口 自动装配: 最核心的配置类: ......
  • Hystrix Hystrix介绍 在微服务场景中,通常会有很多层的服务调用。如果一个底层服务出现问题,故障会被向上传播给用户。我们需要一种机制,当底层服务不可用时,可以阻断故障的传播。这就是断路器的作用。...
  • public class WindowLeapArray extends LeapArray<Window> {  public WindowLeapArray(int windowLengthInMs, int intervalInSec) {  super(windowLengthInMs, intervalInSec);  } } 该对象的构造方法有两...
  • 现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标,而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。 代码@2:计算当前时间戳所在的时间窗口的开始时间...
  • sentinel 史上最全

    2021-01-16 19:42:35
    滑动窗口源码实现 3.1 MetricBucket 3.2 WindowWrap 3.3 LeapArray 推荐2: 地表最强 开发环境 系列 工欲善其事 必先利其器 地表最强 开发环境: vagrant+java+springcloud+redis+zookeeper镜像下载(&制作详解) 地表...
  • 目录 一、StatisticSlot 流程 二、添加指标流程 三、指标获取流程 一、... } 3、ArrayMetric (1)使用LeapArray来进行秒级统计和分钟级统计 private final LeapArray<MetricBucket> data; public ArrayMetric...
  • 1.8 源码分析

    2020-10-08 12:23:18
    Pt2.1 MetricEvent Pt2.2 MetricBucket Pt2.3 WindowWrap Pt2.4 LeapArray Pt2.5 ArrayMetric Pt3 限流源码逻辑 Pt3.1 示例FlowQpsDemo Pt3.2 代码流程说明 Pt3.3 定义并加载规则 Pt3.4 限流规则校验Sph*#entry() Pt...
  • } } 可以看到ArrayMetric其实也是一个包装类,内部通过实例化LeapArray的对应实现类,来实现具体的统计逻辑,LeapArray是一个抽象类,OccupiableBucketLeapArray和BucketLeapArray都是其具体的实现类 ...
  • Sentinel源码分析

    2022-04-09 20:02:30
    Sentinel源码分析 1.Sentinel的基本概念 Sentinel实现限流、隔离、降级、熔断等功能,本质要做的就是两件事情: 统计数据:统计某个资源的访问数据(QPS、RT等信息) 规则判断:判断限流规则、隔离规则、降级规则、...
  • 现在我们要定位到这个时间窗口的位置是落在 LeapArray 中数组的下标,而一个 LeapArray 中包含 sampleCount 个元素,要得到其下标,则使用 n % sampleCount 即可。 代码@2:计算当前时间戳所在的时间窗口的开始时间...
  • 滑动窗口(LeapArray) 滑动窗口(基于 RxJava) Ring Bit Buffer 动态规则配置 支持多种数据源 支持多种数据源 有限支持 扩展性 多个扩展点 插件的形式 接口的形式 基于注解的支持 支持 支持 支持 限流 基于 QPS,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 451
精华内容 180
关键字:

leaparray

友情链接: 084837.zip