精华内容
参与话题
问答
  • Kcp

    2017-11-13 10:32:22
    KCP: using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; public class KCP { #region 常量 public const int IKCP_RTO_NDL = 30; ...

    KCP:

    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;
    using System.Text;
    
    
    public class KCP
    {
        #region 常量
        public const int IKCP_RTO_NDL     = 30;        // rto: minimum retransmission timeout in no-delay mode (NoDelay assigns it to rx_minrto)
        public const int IKCP_RTO_MIN     = 100;       // rto: minimum retransmission timeout in normal mode
        public const int IKCP_RTO_DEF     = 200;       // rto: default retransmission timeout
        public const int IKCP_RTO_MAX     = 60000;     // rto: upper bound for the retransmission timeout
        public const int IKCP_CMD_PUSH    = 81;        // cmd: data segment (push data)
        public const int IKCP_CMD_ACK     = 82;        // cmd: acknowledgement
        public const int IKCP_CMD_WASK    = 83;        // cmd: window probe (ask the peer for its window size)
        public const int IKCP_CMD_WINS    = 84;        // cmd: window size announcement (tell)
        public const int IKCP_ASK_SEND    = 1;         // probe flag: an IKCP_CMD_WASK needs to be sent
        public const int IKCP_ASK_TELL    = 2;         // probe flag: an IKCP_CMD_WINS needs to be sent
        public const int IKCP_WND_SND     = 32;        // wnd: default send window size, in segments
        public const int IKCP_WND_RCV     = 32;        // wnd: default receive window size, in segments
        public const int IKCP_MTU_DEF     = 1400;      // segment: default MTU, in bytes
        public const int IKCP_ACK_FAST    = 3;         // not referenced in the visible part of this file
        public const int IKCP_INTERVAL    = 100;       // flush: default flush interval, in milliseconds
        public const int IKCP_OVERHEAD    = 24;        // segment: header size in bytes (what Segment.encode writes)
        public const int IKCP_DEADLINK    = 10;        // default for dead_link: per-segment transmission count that flush treats as a dead link
        public const int IKCP_THRESH_INIT = 2;         // ssthresh: initial slow-start threshold
        public const int IKCP_THRESH_MIN  = 2;         // ssthresh: minimum slow-start threshold
        public const int IKCP_PROBE_INIT = 7000;       // probe: initial wait before probing the remote window (7 secs)
        public const int IKCP_PROBE_LIMIT = 120000;    // probe: maximum wait between window probes (120 secs)
        #endregion
    
        #region 工具方法
        /// <summary>
        /// Stores a single unsigned byte in <paramref name="p"/> at <paramref name="offset"/>.
        /// </summary>
        /// <returns>The number of bytes written, always 1.</returns>
        public static int ikcp_encode8u(byte[] p, int offset, byte c)
        {
            const int written = 1;
            p[offset] = c;
            return written;
        }
    
        /// <summary>
        /// Reads a single unsigned byte from <paramref name="p"/> at <paramref name="offset"/>
        /// into <paramref name="c"/>.
        /// </summary>
        /// <returns>The number of bytes consumed, always 1.</returns>
        public static int ikcp_decode8u(byte[] p, int offset, ref byte c)
        {
            const int consumed = 1;
            c = p[offset];
            return consumed;
        }
    
         /* encode 16 bits unsigned int (lsb) */
        /// <summary>
        /// Writes a 16-bit unsigned integer in little-endian order (low byte first).
        /// </summary>
        /// <param name="p">Destination buffer.</param>
        /// <param name="offset">Index of the first byte to write.</param>
        /// <param name="w">Value to encode.</param>
        /// <returns>The number of bytes written, always 2.</returns>
        public static int ikcp_encode16u(byte[] p, int offset, UInt16 w) 
        {
            // Mask before narrowing: a bare (byte) cast of a value above 255 throws
            // OverflowException when the project is compiled with checked arithmetic.
            p[offset] = (byte)(w & 0xFF);
            p[offset + 1] = (byte)((w >> 8) & 0xFF);
            return 2;
        }
    
        /// <summary>
        /// Reads a little-endian 16-bit unsigned integer starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="p">Source buffer.</param>
        /// <param name="offset">Index of the low byte.</param>
        /// <param name="c">Receives the decoded value.</param>
        /// <returns>The number of bytes consumed, always 2.</returns>
        public static int ikcp_decode16u(byte[] p, int offset, ref UInt16 c)  
        {
            int low = p[offset];
            int high = p[offset + 1];
            c = (UInt16)(low | (high << 8));
            return 2;
        }
    
        /// <summary>
        /// Writes a 32-bit unsigned integer in little-endian order (low byte first).
        /// </summary>
        /// <param name="p">Destination buffer.</param>
        /// <param name="offset">Index of the first byte to write.</param>
        /// <param name="l">Value to encode.</param>
        /// <returns>The number of bytes written, always 4.</returns>
        public static int ikcp_encode32u(byte[] p, int offset, UInt32 l)
        {
            // Mask before narrowing: a bare (byte) cast of a value above 255 throws
            // OverflowException when the project is compiled with checked arithmetic.
            p[offset] = (byte)(l & 0xFF);
            p[offset + 1] = (byte)((l >> 8) & 0xFF);
            p[offset + 2] = (byte)((l >> 16) & 0xFF);
            p[offset + 3] = (byte)((l >> 24) & 0xFF);
            return 4;
        }
    
        /// <summary>
        /// Reads a little-endian 32-bit unsigned integer starting at <paramref name="offset"/>.
        /// </summary>
        /// <param name="p">Source buffer.</param>
        /// <param name="offset">Index of the low byte.</param>
        /// <param name="c">Receives the decoded value.</param>
        /// <returns>The number of bytes consumed, always 4.</returns>
        public static int ikcp_decode32u(byte[] p, int offset, ref UInt32 c) 
        {
            // Widen each byte to UInt32 BEFORE shifting. Shifting as int and casting
            // afterwards yields a negative int when the top byte is >= 0x80, and that
            // conversion throws OverflowException under checked arithmetic.
            UInt32 value = p[offset];
            value |= (UInt32)p[offset + 1] << 8;
            value |= (UInt32)p[offset + 2] << 16;
            value |= (UInt32)p[offset + 3] << 24;
            c = value;
            return 4;
        }
    
        /// <summary>
        /// Returns a new array holding the bytes of <paramref name="p"/> in the
        /// half-open range [<paramref name="start"/>, <paramref name="stop"/>).
        /// </summary>
        public static byte[] slice(byte[] p, int start, int stop) {
            int length = stop - start;
            byte[] result = new byte[length];
            Array.Copy(p, start, result, 0, length);
            return result;
        }
    
        /// <summary>
        /// Returns a new array holding the elements of <paramref name="p"/> in the
        /// half-open range [<paramref name="start"/>, <paramref name="stop"/>).
        /// </summary>
        public static T[] slice<T>(T[] p, int start, int stop) {
            T[] result = new T[stop - start];
            for (int src = start, dst = 0; src < stop; src++, dst++)
                result[dst] = p[src];
            return result;
        }
    
        /// <summary>
        /// Returns a new array made of the bytes of <paramref name="p"/> followed by
        /// <paramref name="c"/>; <paramref name="p"/> itself is not modified.
        /// </summary>
        public static byte[] append(byte[] p, byte c) {
            int oldLength = p.Length;
            byte[] result = new byte[oldLength + 1];
            Array.Copy(p, 0, result, 0, oldLength);
            result[oldLength] = c;
            return result;
        }
    
        /// <summary>
        /// Returns a new array made of the elements of <paramref name="p"/> followed
        /// by <paramref name="c"/>; <paramref name="p"/> itself is not modified.
        /// </summary>
        public static T[] append<T>(T[] p, T c) {
            int oldLength = p.Length;
            T[] result = new T[oldLength + 1];
            int i = 0;
            while (i < oldLength) {
                result[i] = p[i];
                i++;
            }
            result[oldLength] = c;
            return result;
        }
    
        /// <summary>
        /// Returns a new array made of the elements of <paramref name="p"/> followed
        /// by the elements of <paramref name="cs"/>; neither input is modified.
        /// </summary>
        public static T[] append<T>(T[] p, T[] cs)
        {
            int headLength = p.Length;
            T[] result = new T[headLength + cs.Length];
            for (int i = 0; i < result.Length; i++)
            {
                if (i < headLength)
                    result[i] = p[i];
                else
                    result[i] = cs[i - headLength];
            }
            return result;
        }
    
        /// <summary>Returns the smaller of two unsigned values.</summary>
        static UInt32 _imin_(UInt32 a, UInt32 b) 
        {
            if (b < a)
                return b;
            return a;
        }
    
        /// <summary>Returns the larger of two unsigned values.</summary>
        static UInt32 _imax_(UInt32 a, UInt32 b)
        {
            if (b > a)
                return b;
            return a;
        }
    
        /// <summary>
        /// Clamps <paramref name="middle"/>: raises it to at least <paramref name="lower"/>,
        /// then caps the result at <paramref name="upper"/>.
        /// </summary>
        static UInt32 _ibound_(UInt32 lower, UInt32 middle, UInt32 upper)
        {
            UInt32 raised = _imax_(lower, middle);
            UInt32 capped = _imin_(raised, upper);
            return capped;
        }
    
        /// <summary>
        /// Signed distance from <paramref name="earlier"/> to <paramref name="later"/>
        /// using 32-bit wrap-around arithmetic, so timestamps and sequence numbers
        /// that have rolled over past 0xFFFFFFFF still compare correctly.
        /// </summary>
        /// <returns>Positive when <paramref name="later"/> is ahead, negative when it is behind.</returns>
        static Int32 _itimediff(UInt32 later, UInt32 earlier) 
        {
            // The wrap-around is the whole point of this helper, so make it explicit:
            // without 'unchecked' this throws OverflowException in a checked build.
            return unchecked((Int32)(later - earlier));
        }
        #endregion
    
        #region 报文
        /// <summary>
        /// One KCP segment: the header fields that go on the wire, the sender-side
        /// retransmission bookkeeping, and the payload bytes.
        /// </summary>
        internal class Segment
        {
            // ---- header fields, serialized by encode() ----
            internal UInt32 conv;       // conversation id
            internal UInt32 cmd;        // command: IKCP_CMD_PUSH / ACK / WASK / WINS (encoded as one byte)
            internal UInt32 frg;        // fragment countdown (one byte); Send assigns count - i - 1, so 0 is the last fragment
            internal UInt32 wnd;        // sender's free receive-window slots (encoded as 16 bits)
            internal UInt32 ts;         // timestamp set when the segment is sent
            internal UInt32 sn;         // sequence number
            internal UInt32 una;        // sender's rcv_nxt at the time the segment was sent

            // ---- sender-side bookkeeping, never serialized ----
            internal UInt32 resendts;   // time at which the segment is due for retransmission
            internal UInt32 rto;        // per-segment retransmission timeout
            internal UInt32 fastack;    // how many times parse_ack saw a later segment acknowledged first
            internal UInt32 xmit;       // how many times the segment has been transmitted
            internal byte[] data;       // payload; only its length is written by encode()

            /// <summary>Creates a segment with a zero-filled payload of <paramref name="size"/> bytes.</summary>
            internal Segment(int size)
            {
                data = new byte[size];
            }

            /// <summary>
            /// Writes the segment header (not the payload) into <paramref name="ptr"/>
            /// starting at <paramref name="offset"/>.
            /// </summary>
            /// <returns>The number of header bytes written.</returns>
            internal int encode(byte[] ptr, int offset)
            {
                int cursor = offset;

                cursor += ikcp_encode32u(ptr, cursor, conv);
                cursor += ikcp_encode8u(ptr, cursor, (byte)cmd);
                cursor += ikcp_encode8u(ptr, cursor, (byte)frg);
                cursor += ikcp_encode16u(ptr, cursor, (UInt16)wnd);
                cursor += ikcp_encode32u(ptr, cursor, ts);
                cursor += ikcp_encode32u(ptr, cursor, sn);
                cursor += ikcp_encode32u(ptr, cursor, una);
                cursor += ikcp_encode32u(ptr, cursor, (UInt32)data.Length);

                return cursor - offset;
            }
        }
        #endregion
    
        // KCP control-block state.
        UInt32 conv;         // conversation id; Input rejects segments whose conv differs
        UInt32 mtu;          // maximum size of one output datagram, in bytes
        UInt32 mss;          // maximum payload per segment: mtu - IKCP_OVERHEAD
        UInt32 state;        // link state; written by flush when a segment's xmit count reaches dead_link
        UInt32 snd_una;      // lowest sequence number sent but not yet acknowledged
        UInt32 snd_nxt;      // sequence number the next outgoing segment will get
        UInt32 rcv_nxt;      // next sequence number the receiver expects
        UInt32 ts_recent;    // not referenced in the visible part of this file
        UInt32 ts_lastack;   // not referenced in the visible part of this file
        UInt32 ssthresh;     // slow-start threshold, initially IKCP_THRESH_INIT
        #region rto 超时重传
        UInt32 rx_rttval;    // RTT deviation estimate maintained by update_ack
        UInt32 rx_srtt;      // smoothed RTT maintained by update_ack
        UInt32 rx_rto;       // current retransmission timeout
        UInt32 rx_minrto;    // lower bound applied to rx_rto
        #endregion
        UInt32 snd_wnd;      // send window size, in segments
        UInt32 rcv_wnd;      // receive window size, in segments
        UInt32 rmt_wnd;      // peer's advertised receive window, taken from incoming headers
    
        UInt32 cwnd;         // congestion window, in segments
        UInt32 probe;        // pending probe flags: IKCP_ASK_SEND | IKCP_ASK_TELL
        UInt32 current;      // timestamp passed to the latest Update call
        UInt32 interval;     // flush interval
        UInt32 ts_flush;     // time of the next scheduled flush (Update advances it by interval)
        UInt32 xmit;         // total number of timeout retransmissions (incremented in flush)
        UInt32 nodelay;      // non-zero enables no-delay mode (set by NoDelay)
        UInt32 updated;      // 0 until Update has been called once
        UInt32 ts_probe;     // time of the next window probe
        UInt32 probe_wait;   // current wait between window probes; flush grows it up to IKCP_PROBE_LIMIT
        UInt32 dead_link;    // per-segment transmission count that flush treats as a dead link
        UInt32 incr;         // congestion-window growth accumulator (scaled by mss)
    
        Segment[] snd_queue = new Segment[0];     // segments queued by Send; flow is buffer -> snd_queue -> snd_buf
        Segment[] rcv_queue = new Segment[0];     // in-order segments ready for Recv; flow is rcv_buf -> rcv_queue -> buffer
        Segment[] snd_buf = new Segment[0];       // sent segments awaiting acknowledgement
        Segment[] rcv_buf = new Segment[0];       // received segments kept sorted by sn until they can move to rcv_queue
    
        UInt32[] acklist = new UInt32[0];   // pending acks stored as flat (sn, ts) pairs
    
        byte[] buffer;        // scratch buffer that flush encodes outgoing datagrams into
        Int32 fastresend;     // fast-retransmit threshold; values <= 0 disable it
        Int32 nocwnd;         // non-zero makes flush ignore cwnd when sizing the send window
        Int32 logmask;        // not referenced in the visible part of this file
        // called as output(buffer, size) to transmit one datagram
        Action<byte[], int> output;
    
        /// <summary>
        /// Creates a KCP control object with the default window, MTU and timer settings.
        /// </summary>
        /// <param name="conv_">Conversation id; must be equal on both endpoints of the same connection.</param>
        /// <param name="output_">Callback invoked as <c>output(buffer, size)</c> for every datagram to transmit.</param>
        public KCP(UInt32 conv_, Action<byte[], int> output_) { 
            // identity and transport
            conv = conv_;
            output = output_;

            // sizes: payload limit and scratch buffer both derive from the MTU
            mtu = IKCP_MTU_DEF;
            mss = mtu - IKCP_OVERHEAD;
            buffer = new byte[(mtu + IKCP_OVERHEAD) * 3];

            // windows
            snd_wnd = IKCP_WND_SND;
            rcv_wnd = IKCP_WND_RCV;
            rmt_wnd = IKCP_WND_RCV;
            ssthresh = IKCP_THRESH_INIT;

            // timers
            rx_rto = IKCP_RTO_DEF;
            rx_minrto = IKCP_RTO_MIN;
            interval = IKCP_INTERVAL;
            ts_flush = IKCP_INTERVAL;
            dead_link = IKCP_DEADLINK;
        }
    
        /// <summary>
        /// Reports the size of the next complete message waiting in the receive queue.
        /// </summary>
        /// <returns>The message length in bytes, or -1 when no complete message is available.</returns>
        public int PeekSize() {
            if (rcv_queue.Length == 0) return -1;

            // A head segment with frg == 0 is an unfragmented message.
            Segment head = rcv_queue[0];
            if (head.frg == 0) return head.data.Length;

            // frg counts the fragments still to come, so the message needs frg + 1 segments.
            if (rcv_queue.Length < head.frg + 1) return -1;

            // Add up fragment sizes up to and including the final one (frg == 0).
            int total = 0;
            for (int i = 0; i < rcv_queue.Length; i++) {
                Segment fragment = rcv_queue[i];
                total += fragment.data.Length;
                if (fragment.frg == 0) break;
            }

            return total;
        }
    
        /// <summary>
        /// Copies the next complete message from the receive queue into
        /// <paramref name="buffer"/>, then refills the queue from rcv_buf.
        /// </summary>
        /// <param name="buffer">Destination; must be at least <see cref="PeekSize"/> bytes long.</param>
        /// <returns>
        /// The number of bytes copied; -1 if the receive queue is empty; -2 if no complete
        /// message is available; -3 if <paramref name="buffer"/> is too small.
        /// </returns>
        public int Recv(byte[] buffer) {
            if (rcv_queue.Length == 0) return -1;

            int packetSize = PeekSize();
            if (packetSize < 0) return -2;
            if (buffer.Length < packetSize) return -3;

            // Remember whether the queue filled the whole window before we drain it.
            bool windowWasFull = rcv_queue.Length >= rcv_wnd;

            // Copy fragments out until the one marked frg == 0 has been copied.
            int written = 0;
            int taken = 0;
            while (taken < rcv_queue.Length) {
                Segment fragment = rcv_queue[taken];
                Array.Copy(fragment.data, 0, buffer, written, fragment.data.Length);
                written += fragment.data.Length;
                taken++;
                if (fragment.frg == 0) break;
            }

            if (taken > 0) {
                rcv_queue = slice<Segment>(rcv_queue, taken, rcv_queue.Length);
            }

            // Move in-order segments from rcv_buf to rcv_queue until a gap or a full window.
            int moved = 0;
            while (moved < rcv_buf.Length) {
                Segment next = rcv_buf[moved];
                if (next.sn != rcv_nxt || rcv_queue.Length >= rcv_wnd) break;
                rcv_queue = append<Segment>(rcv_queue, next);
                rcv_nxt++;
                moved++;
            }

            if (moved > 0) rcv_buf = slice<Segment>(rcv_buf, moved, rcv_buf.Length);

            // The window was full and now has room again: flag a window-size announcement.
            if (windowWasFull && rcv_queue.Length < rcv_wnd) {
                probe |= IKCP_ASK_TELL;
            }

            return written;
        }
    
        /// <summary>
        /// Splits <paramref name="buffer"/> into segments of at most mss bytes and
        /// appends them to the send queue (buffer -> snd_queue).
        /// </summary>
        /// <param name="buffer">Message to send.</param>
        /// <returns>
        /// 0 on success; -1 if <paramref name="buffer"/> is null or empty; -2 if the
        /// message would need more than 255 fragments.
        /// </returns>
        public int Send(byte[] buffer) {
            // The documented contract is "-1 for a missing buffer"; without the null
            // test this line threw NullReferenceException instead.
            if (buffer == null || buffer.Length == 0) return -1;

            // 1. Number of fragments needed (always at least 1 for a non-empty buffer).
            var count = 1;
            if (buffer.Length >= mss)
                count = (int)(buffer.Length + mss - 1) / (int)mss;

            // frg travels in a single header byte, which bounds the fragment count.
            if (255 < count) return -2;

            // 2. Grow snd_queue once instead of re-allocating it for every fragment.
            var queued = snd_queue.Length;
            var grown = new Segment[queued + count];
            Array.Copy(snd_queue, grown, queued);

            var offset = 0;
            for (var i = 0; i < count; i++) {
                var remaining = buffer.Length - offset;
                var size = remaining > mss ? (int)mss : remaining;

                var seg = new Segment(size);
                Array.Copy(buffer, offset, seg.data, 0, size);
                offset += size;
                seg.frg = (UInt32)(count - i - 1);   // counts down; 0 marks the last fragment
                grown[queued + i] = seg;
            }

            snd_queue = grown;
            return 0;
        }
    
        /// <summary>
        /// Folds a new round-trip-time sample into the smoothed RTT and its deviation,
        /// then recomputes the retransmission timeout rx_rto.
        /// </summary>
        /// <param name="rtt">Measured round-trip time for an acknowledged segment.</param>
        void update_ack(Int32 rtt)
        {
            UInt32 sample = (UInt32)rtt;

            if (rx_srtt != 0)
            {
                Int32 deviation = (Int32)(sample - rx_srtt);
                if (deviation < 0) deviation = -deviation;

                // Weighted averages: 3/4 old deviation, 7/8 old smoothed RTT.
                rx_rttval = (3 * rx_rttval + (uint)deviation) / 4;
                rx_srtt = (UInt32)((7 * rx_srtt + rtt) / 8);
                if (rx_srtt < 1) rx_srtt = 1;
            }
            else
            {
                // First sample seeds both estimates.
                rx_srtt = sample;
                rx_rttval = sample / 2;
            }

            UInt32 margin = _imax_(1, 4 * rx_rttval);
            int rto = (int)(rx_srtt + margin);
            rx_rto = _ibound_(rx_minrto, (UInt32)rto, IKCP_RTO_MAX);
        }
    
        /// <summary>
        /// Recomputes snd_una: the sequence number of the oldest segment still in
        /// snd_buf, or snd_nxt when nothing is awaiting acknowledgement.
        /// </summary>
        void shrink_buf() {
            snd_una = snd_buf.Length > 0 ? snd_buf[0].sn : snd_nxt;
        }
    
        /// <summary>
        /// Removes the segment acknowledged by <paramref name="sn"/> from snd_buf and
        /// bumps fastack on every segment that precedes it in the buffer.
        /// </summary>
        void parse_ack(UInt32 sn) {
            // Only sequence numbers in [snd_una, snd_nxt) can refer to a pending segment.
            bool inFlight = _itimediff(sn, snd_una) >= 0 && _itimediff(sn, snd_nxt) < 0;
            if (!inFlight) return;

            Segment[] pending = snd_buf;
            for (int i = 0; i < pending.Length; i++) {
                Segment candidate = pending[i];
                if (candidate.sn != sn) {
                    // Still unacknowledged while a different segment just was.
                    candidate.fastack++;
                    continue;
                }

                Segment[] before = slice<Segment>(pending, 0, i);
                Segment[] after = slice<Segment>(pending, i + 1, pending.Length);
                snd_buf = append<Segment>(before, after);
                break;
            }
        }
    
        /// <summary>
        /// Drops from the front of snd_buf every segment whose sequence number is
        /// below <paramref name="una"/>.
        /// </summary>
        void parse_una(UInt32 una) {
            int acked = 0;
            while (acked < snd_buf.Length && _itimediff(una, snd_buf[acked].sn) > 0)
                acked++;

            if (acked > 0) snd_buf = slice<Segment>(snd_buf, acked, snd_buf.Length);
        }
    
        /// <summary>Queues an acknowledgement as an (sn, ts) pair at the end of acklist.</summary>
        void ack_push(UInt32 sn, UInt32 ts) {
            UInt32[] pair = { sn, ts };
            acklist = append<UInt32>(acklist, pair);
        }
    
        /// <summary>Reads the <paramref name="p"/>-th queued (sn, ts) pair from acklist.</summary>
        void ack_get(int p, ref UInt32 sn, ref UInt32 ts) {
            int slot = p * 2;
            sn = acklist[slot];
            ts = acklist[slot + 1];
        }
    
        /// <summary>
        /// Inserts a received data segment into rcv_buf (kept sorted by sequence
        /// number, duplicates ignored) and then moves every in-order segment on to
        /// rcv_queue.
        /// </summary>
        void parse_data(Segment newseg) {
            UInt32 sn = newseg.sn;

            // Outside [rcv_nxt, rcv_nxt + rcv_wnd): past the window or already delivered.
            bool beyondWindow = _itimediff(sn, rcv_nxt + rcv_wnd) >= 0;
            if (beyondWindow || _itimediff(sn, rcv_nxt) < 0) return;

            // 1. Scan from the back for a duplicate or for the first smaller sn.
            int insertAt = 0;
            bool duplicate = false;
            for (int i = rcv_buf.Length - 1; i >= 0; i--) {
                UInt32 existing = rcv_buf[i].sn;
                if (existing == sn) {
                    duplicate = true;
                    break;
                }

                if (_itimediff(sn, existing) > 0) {
                    insertAt = i + 1;
                    break;
                }
            }

            // 2. Splice the new segment in at its sorted position.
            if (!duplicate) {
                Segment[] head = slice<Segment>(rcv_buf, 0, insertAt);
                Segment[] tail = slice<Segment>(rcv_buf, insertAt, rcv_buf.Length);
                rcv_buf = append<Segment>(head, append<Segment>(new Segment[1] { newseg }, tail));
            }

            // 3. rcv_buf -> rcv_queue until a gap appears or the window is full.
            int moved = 0;
            while (moved < rcv_buf.Length) {
                Segment next = rcv_buf[moved];
                if (next.sn != rcv_nxt || rcv_queue.Length >= rcv_wnd) break;
                rcv_queue = append<Segment>(rcv_queue, next);
                rcv_nxt++;
                moved++;
            }

            if (moved > 0) {
                rcv_buf = slice<Segment>(rcv_buf, moved, rcv_buf.Length);
            }
        }
    
        /// <summary>
        /// Processes one received datagram, which may carry several segments back to back.
        /// </summary>
        /// <param name="data">Raw datagram bytes.</param>
        /// <returns>
        /// 0 on success (also when the datagram is shorter than one header);
        /// -1 if a segment's conv differs from this object's conv;
        /// -2 if a header declares more payload bytes than remain in the datagram;
        /// -3 if a segment carries an unknown command.
        /// Segments processed before a failing one keep their effects.
        /// </returns>
        public int Input(byte[] data) {
    
            // Remember snd_una so we can tell afterwards whether any ack advanced it.
            var s_una = snd_una;
            if (data.Length < IKCP_OVERHEAD) return 0;
    
            var offset = 0;
    
            while (true)
            {
                // 1. Decode one segment header
                UInt32 ts = 0;
                UInt32 sn = 0;
                UInt32 length = 0;
                UInt32 una = 0;
                UInt32 conv_ = 0;
    
                UInt16 wnd = 0;
    
                byte cmd = 0;
                byte frg = 0;
    
                // 1.1 Stop when fewer bytes than one header remain
                if (data.Length - offset < IKCP_OVERHEAD) break;
                // 1.2 Enough bytes is not proof of a valid segment: the conv must match too
                offset += ikcp_decode32u(data, offset, ref conv_);
    
                if (conv != conv_) return -1;
    
                offset += ikcp_decode8u(data, offset, ref cmd);
                offset += ikcp_decode8u(data, offset, ref frg);
                offset += ikcp_decode16u(data, offset, ref wnd);
                offset += ikcp_decode32u(data, offset, ref ts);
                offset += ikcp_decode32u(data, offset, ref sn);
                offset += ikcp_decode32u(data, offset, ref una);
                offset += ikcp_decode32u(data, offset, ref length);
    
                // 1.3 Declared payload length exceeds the bytes that are left
                if (data.Length - offset < length) return -2;
    
                switch (cmd) { 
                    case IKCP_CMD_PUSH:
                    case IKCP_CMD_ACK:
                    case IKCP_CMD_WASK:
                    case IKCP_CMD_WINS:
                        break;
                    default:
                        return -3;  // 1.4 Unknown command
                }
    
                rmt_wnd = (UInt32)wnd;
                parse_una(una);   // drop everything the peer has cumulatively acknowledged
                shrink_buf();     // recompute snd_una
    
                // 1.5 Command-specific handling
                if (IKCP_CMD_ACK == cmd) {
                    if (_itimediff(current, ts) >= 0) {
                        update_ack(_itimediff(current, ts)); // feed the RTT sample into the rto estimate
                    }
                    parse_ack(sn); // remove the acknowledged segment from snd_buf
                    shrink_buf();  // recompute snd_una
                }
                else if (IKCP_CMD_PUSH == cmd) {
                    // 1.5.1 Data segment: ack anything below the window's upper edge,
                    // but only store it when it has not been delivered yet
                    if (_itimediff(sn, rcv_nxt + rcv_wnd) < 0) {
                        ack_push(sn, ts);
                        if (_itimediff(sn, rcv_nxt) >= 0) {
                            var seg = new Segment((int)length);
                            seg.conv = conv_;
                            seg.cmd = (UInt32)cmd;
                            seg.frg = (UInt32)frg;
                            seg.wnd = (UInt32)wnd;
                            seg.ts = ts;
                            seg.sn = sn;
                            seg.una = una;
    
                            if (length > 0) Array.Copy(data, offset, seg.data, 0, length);
    
                            parse_data(seg);
                        }
                    }
                }
                else if (IKCP_CMD_WASK == cmd) {
                    // ready to send back IKCP_CMD_WINS in Ikcp_flush
                    // tell remote my window size
                    probe |= IKCP_ASK_TELL;  // the peer asked for our window size: flag a reply
                }
                else if (IKCP_CMD_WINS == cmd)
                {
                    // do nothing
                }
                else {
                    return -3;
                }
    
                // Skip the payload to reach the next segment header
                offset += (int)length;
            }
    
            // 1.6 Grow the congestion window if this datagram advanced snd_una
            if (_itimediff(snd_una, s_una) > 0) {
                if (cwnd < rmt_wnd) {
                    var mss_ = mss;
                    if (cwnd < ssthresh)
                    {
                        // below ssthresh: one extra segment per call
                        cwnd++;
                        incr += mss_;
                    }
                    else { 
                        // at or above ssthresh: grow incr gradually and bump cwnd once it covers another mss
                        if(incr < mss_) {
                            incr = mss_;
                        }
                        incr += (mss_ * mss_) / incr + (mss_ / 16);
                        if ((cwnd + 1) * mss_ <= incr) cwnd++;
                    }
                    // never exceed the peer's advertised window
                    if (cwnd > rmt_wnd) {
                        cwnd = rmt_wnd;
                        incr = rmt_wnd * mss_;
                    }
                }
            }
    
            return 0;
        }
    
        /// <summary>
        /// Number of free slots in the receive window: rcv_wnd minus the segments
        /// already waiting in rcv_queue, or 0 when the queue fills the window.
        /// </summary>
        Int32 wnd_unused() {
            return rcv_queue.Length < rcv_wnd
                ? (Int32)rcv_wnd - rcv_queue.Length
                : 0;
        }
    
        /// <summary>
        /// Emits everything that is currently due through the output callback:
        /// queued acks, window probe / window announcement commands, newly admitted
        /// data segments and retransmissions. Afterwards adjusts the congestion
        /// window according to what had to be resent.
        /// Does nothing until <see cref="Update"/> has been called at least once.
        /// </summary>
        void flush() { 
            var current_ = current;
            var change = 0;   // number of fast retransmissions in this pass
            var lost = 0;     // set to 1 when any segment timed out

            if (0 == updated) return;

            // Template segment reused for every header-only command below.
            var seg = new Segment(0);
            seg.conv = conv;
            seg.cmd = IKCP_CMD_ACK;
            seg.wnd = (UInt32)wnd_unused();
            seg.una = rcv_nxt;

            #region ACK
            // 1. Acknowledge every segment recorded by Input.
            var count = acklist.Length / 2;
            var offset = 0;
            for (var i = 0; i < count; i++) {
                if (offset + IKCP_OVERHEAD > mtu)
                {
                    // Another header would exceed the MTU: send what we have first.
                    output(buffer, offset);
                    offset = 0;
                }
                ack_get(i, ref seg.sn, ref seg.ts);
                offset += seg.encode(buffer, offset);
            }
            acklist = new UInt32[0];
            #endregion

            #region WASK / WINS
            // 2. While the peer advertises a zero window, schedule probes with a
            //    wait that grows by half each time, capped at IKCP_PROBE_LIMIT.
            if (0 == rmt_wnd)
            {
                if (0 == probe_wait)
                {
                    probe_wait = IKCP_PROBE_INIT;
                    ts_probe = current + probe_wait;
                }
                else
                {
                    if (_itimediff(current, ts_probe) >= 0)
                    {
                        if (probe_wait < IKCP_PROBE_INIT)
                            probe_wait = IKCP_PROBE_INIT;
                        probe_wait += probe_wait / 2;
                        if (probe_wait > IKCP_PROBE_LIMIT)
                            probe_wait = IKCP_PROBE_LIMIT;
                        ts_probe = current + probe_wait;
                        probe |= IKCP_ASK_SEND;
                    }
                }
            }
            else {
                ts_probe = 0;
                probe_wait = 0;
            }

            // Ask the peer for its window size.
            if ((probe & IKCP_ASK_SEND) != 0) {
                seg.cmd = IKCP_CMD_WASK;
                if (offset + IKCP_OVERHEAD > (int)mtu) {
                    output(buffer, offset);
                    offset = 0;
                }
                offset += seg.encode(buffer, offset);
            }

            // Tell the peer our window size. Recv and Input set IKCP_ASK_TELL expecting
            // this reply; previously the flag was cleared below without ever being sent.
            if ((probe & IKCP_ASK_TELL) != 0) {
                seg.cmd = IKCP_CMD_WINS;
                if (offset + IKCP_OVERHEAD > (int)mtu) {
                    output(buffer, offset);
                    offset = 0;
                }
                offset += seg.encode(buffer, offset);
            }

            probe = 0;
            #endregion

            #region Push: snd_queue -> snd_buf
            // 3. Admit queued segments while the effective window has room.
            var cwnd_ = _imin_(snd_wnd, rmt_wnd);
            if (0 == nocwnd)
                cwnd_ = _imin_(cwnd, cwnd_);

            count = 0;
            for (var k = 0; k < snd_queue.Length; k++ )
            {
                if (_itimediff(snd_nxt, snd_una + cwnd_) >= 0) break;

                var newseg = snd_queue[k];
                newseg.conv = conv;
                newseg.cmd = IKCP_CMD_PUSH;
                newseg.wnd = seg.wnd;
                newseg.ts = current_;
                newseg.sn = snd_nxt;
                newseg.una = rcv_nxt;
                newseg.resendts = current_;
                newseg.rto = rx_rto;
                newseg.fastack = 0;
                newseg.xmit = 0;
                snd_buf = append<Segment>(snd_buf, newseg);
                snd_nxt++;
                count++;
            }

            if (0 < count) {
                snd_queue = slice<Segment>(snd_queue, count, snd_queue.Length);
            }
            #endregion

            #region Resend
            // 4. Fast-retransmit threshold; the maximum value effectively disables it.
            var resent = (UInt32)fastresend;
            if (fastresend <= 0) resent = 0xffffffff;
            var rtomin = rx_rto >> 3;
            if(nodelay != 0) rtomin = 0;

            foreach (var segment in snd_buf) {
                var needsend = false;
                if (0 == segment.xmit) { // first transmission
                    needsend = true;
                    segment.xmit++;
                    segment.rto = rx_rto;
                    segment.resendts = current_ + segment.rto + rtomin;
                }
                else if (_itimediff(current_, segment.resendts) >= 0) {  // retransmission timer expired
                    needsend = true;
                    segment.xmit++;
                    xmit++;
                    if (0 == nodelay)
                        segment.rto += rx_rto;
                    else
                        segment.rto += rx_rto / 2;
                    segment.resendts = current_ + segment.rto;
                    lost = 1;
                }
                else if (segment.fastack >= resent){  // skipped by enough later acks: fast retransmit
                    needsend = true;
                    segment.xmit++;
                    segment.fastack = 0;
                    segment.resendts = current_ + segment.rto;
                    change++;
                }

                if (needsend) {
                    // Refresh the header fields that change between transmissions.
                    segment.ts = current_;
                    segment.wnd = seg.wnd;
                    segment.una = rcv_nxt;

                    var need = IKCP_OVERHEAD + segment.data.Length;
                    if (offset + need > mtu) {
                        output(buffer, offset);
                        offset = 0;
                    }

                    offset += segment.encode(buffer, offset);
                    if (segment.data.Length > 0) {
                        Array.Copy(segment.data, 0, buffer, offset, segment.data.Length);
                        offset += segment.data.Length;
                    }

                    if (segment.xmit >= dead_link) {
                        // Mark the link as dead with a non-zero value; writing 0 here
                        // was indistinguishable from the initial state.
                        state = 0xFFFFFFFF;
                    }
                }
            }
            #endregion

            // 5. Send whatever is still sitting in the scratch buffer.
            if (offset > 0) {
                output(buffer, offset);
                offset = 0;
            }

            #region ssthresh
            // 6. Fast retransmit happened: halve the threshold based on data in flight.
            if (change != 0) {
                var inflight = snd_nxt - snd_una;
                ssthresh = inflight / 2;
                if (ssthresh < IKCP_THRESH_MIN)
                    ssthresh = IKCP_THRESH_MIN;
                cwnd = ssthresh + resent;
                incr = cwnd * mss;
            }

            // A timeout happened: halve the threshold and restart from one segment.
            if (lost != 0) {
                ssthresh = cwnd / 2;
                if (ssthresh < IKCP_THRESH_MIN)
                    ssthresh = IKCP_THRESH_MIN;
                cwnd = 1;
                incr = mss;
            }

            if (cwnd < 1) {
                cwnd = 1;
                incr = mss;
            }
            #endregion
        }
    
        /// <summary>
        /// Advances the protocol clock and flushes when the flush interval has
        /// elapsed. Call it repeatedly (every 10ms-100ms), or ask <see cref="Check"/>
        /// when the next call is due.
        /// </summary>
        /// <param name="current_">Current timestamp in milliseconds.</param>
        public void Update(UInt32 current_)
        {
            current = current_;

            // The very first call anchors the flush schedule at the current time.
            bool firstCall = updated == 0;
            if (firstCall) {
                updated = 1;
                ts_flush = current;
            }

            // More than 10 seconds off schedule in either direction: resynchronise.
            Int32 elapsed = _itimediff(current, ts_flush);
            bool offSchedule = elapsed >= 10000 || elapsed < -10000;
            if (offSchedule) {
                ts_flush = current;
                elapsed = 0;
            }

            if (elapsed < 0) return;   // next flush is not due yet

            // Schedule the following flush, skipping ahead if we already missed it.
            ts_flush += interval;
            if (_itimediff(current, ts_flush) >= 0)
                ts_flush = current + interval;
            flush();
        }
    
        /// <summary>
        /// Tells the caller when <see cref="Update"/> should next be invoked, assuming
        /// no Input/Send call happens in between. Use it to avoid unnecessary Update
        /// calls, e.g. when scheduling many connections from one timer loop.
        /// </summary>
        /// <param name="current_">Current timestamp in milliseconds.</param>
        /// <returns>
        /// <paramref name="current_"/> itself when an update is due right away, otherwise
        /// the timestamp of the earliest pending flush or retransmission, at most one
        /// interval ahead.
        /// </returns>
        public UInt32 Check(UInt32 current_)
        {
            if (updated == 0) return current_;

            // 1. A flush schedule more than 10 seconds off is treated as due now.
            UInt32 nextFlush = ts_flush;
            Int32 sinceFlush = _itimediff(current_, nextFlush);
            if (sinceFlush >= 10000 || sinceFlush < -10000)
            {
                nextFlush = current_;
            }

            // 2. Flush time reached.
            if (_itimediff(current_, nextFlush) >= 0) return current_;

            int untilFlush = _itimediff(nextFlush, current_);

            // 3. Any segment already due for retransmission means "now";
            //    otherwise track the nearest retransmission time.
            int untilResend = 0x7fffffff;
            for (int i = 0; i < snd_buf.Length; i++) {
                int wait = _itimediff(snd_buf[i].resendts, current_);
                if (wait <= 0) return current_;
                if (wait < untilResend) untilResend = wait;
            }

            // 4. Earliest of the two, capped at one interval.
            int delay = untilResend >= untilFlush ? untilFlush : untilResend;
            if (delay >= interval) delay = (int)interval;

            return current_ + (UInt32)delay;
        }
    
        // change MTU size, default is 1400
        // Sets the MTU and re-derives mss and the scratch buffer from it.
        // Returns 0 on success, -1 when mtu_ is smaller than 50 or smaller than
        // IKCP_OVERHEAD, and -2 when the required buffer size does not fit in
        // an Int32. No field is modified unless 0 is returned.
        public int SetMtu(Int32 mtu_)
        {
            if (mtu_ < 50 || mtu_ < (Int32)IKCP_OVERHEAD)
            {
                return -1;
            }

            // Compute the size in 64 bits: the former int expression could wrap
            // for very large mtu_ and yield a negative or far-too-small length.
            long bufferSize = ((long)mtu_ + IKCP_OVERHEAD) * 3;
            if (bufferSize > Int32.MaxValue)
            {
                return -2;
            }

            // `new` never yields null in C# (allocation failure throws), so the
            // former null check on the result was unreachable and is removed.
            var buffer_ = new byte[(int)bufferSize];

            mtu = (UInt32)mtu_;
            mss = mtu - IKCP_OVERHEAD;
            buffer = buffer_;
            return 0;
        }
    
        // Stores the update interval after clamping the requested value to the
        // range [10, 5000]. Always returns 0.
        public int Interval(Int32 interval_)
        {
            var clamped = Math.Max(10, Math.Min(5000, interval_));
            interval = (UInt32)clamped;
            return 0;
        }
    
        // fastest: ikcp_nodelay(kcp, 1, 20, 2, 1)
        // nodelay: 0:disable(default), 1:enable
        // interval: internal update timer interval in millisec, default is 100ms
        // resend: 0:disable fast resend(default), 1:enable fast resend
        // nc: 0:normal congestion control(default), 1:disable congestion control
        // Applies the nodelay / interval / fast-resend / congestion-control
        // settings. A negative argument leaves the corresponding setting
        // unchanged. Always returns 0.
        public int NoDelay(int nodelay_, int interval_, int resend_, int nc_)
        {
            // The guard must accept 0: with the former `> 0` test, passing 0
            // could never disable nodelay or restore the normal minimum RTO,
            // and the else branch below was unreachable.
            if (nodelay_ >= 0)
            {
                nodelay = (UInt32)nodelay_;
                if (nodelay_ != 0)
                {
                    rx_minrto = IKCP_RTO_NDL;
                }
                else
                {
                    rx_minrto = IKCP_RTO_MIN;
                }
            }

            // Clamp the requested interval to [10, 5000] before storing it.
            if (interval_ >= 0)
            {
                if (interval_ > 5000)
                {
                    interval_ = 5000;
                }
                else if (interval_ < 10)
                {
                    interval_ = 10;
                }
                interval = (UInt32)interval_;
            }

            if (resend_ >= 0)
            {
                fastresend = resend_;
            }

            if (nc_ >= 0)
            {
                nocwnd = nc_;
            }

            return 0;
        }
    
        // set maximum window size: sndwnd=32, rcvwnd=32 by default
        // Updates the send and/or receive window sizes. A value that is zero
        // or negative leaves the corresponding window untouched. Always returns 0.
        public int WndSize(int sndwnd, int rcvwnd)
        {
            // The two updates are independent of each other.
            if (rcvwnd > 0)
            {
                rcv_wnd = (UInt32)rcvwnd;
            }

            if (sndwnd > 0)
            {
                snd_wnd = (UInt32)sndwnd;
            }

            return 0;
        }
    
        // get how many packet is waiting to be sent
        // Returns the combined number of entries in snd_buf and snd_queue.
        public int WaitSnd()
        {
            var buffered = snd_buf.Length;
            var queued = snd_queue.Length;
            return buffered + queued;
        }
    }
    

     

     

    展开全文
  • kcp

    2019-01-19 14:07:29
    KCP是一个快速可靠协议。 TCP是为流量设计的(每秒内可以传输多少KB的数据),讲究的是充分利用带宽。而 KCP是为流速设计的(单个数据包从一端发送到一端需要多少时间),以10%-20%带宽浪费的代价换取了比 TCP快30%...

    源码的官方网站   https://github.com/skywind3000/kcp

    KCP是一个快速可靠协议。

    TCP是为流量设计的(每秒内可以传输多少KB的数据),讲究的是充分利用带宽。而 KCP是为流速设计的(单个数据包从一端发送到另一端需要多少时间),以10%-20%带宽浪费的代价换取了比 TCP快30%-40%的传输速度。TCP信道是一条流速很慢,但每秒流量很大的大运河,而KCP是水流湍急的小激流。
    KCP协议是一个纯粹的ARQ协议,通过重传机制实现UDP数据包的可靠传输。

    session -> kcp(ARQ) -> udp(packet) -> ip -> link -> phy

    展开全文
  • KCP 是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,且最大延迟降低三倍的传输效果。纯算法实现,并不负责底层协议(如UDP) 的收发,需要使用者自己定义下层数据包的发送方式...
  • <p>when deploying a kcp with 3 replicas, the controller do reconcile everything, generates the machines etc.. but after doing <pre><code> kubectl get kcp </code></pre> <p>the output is the following ...
  • KCP remediation proposal

    2020-11-27 20:33:32
    This PR updates the KCP document by introducing support for automatic remediation of unhealthy control-plane machines. <p>Kudos and credits to who kicked off this effort and laid the ground for this ...
  • KCP is scaling continuously

    2020-11-28 08:23:30
    <p>We have been debugging KCP problem in Metal3 last two days. We deploy KCP with one replica and after the infrastructure is ready and our baremetal node is provisioned, it stays up for a while. But ...
  • As a user I would like to mark a specific controlplane Machine to be deleted by a KCP after I scale it down. <p><strong>Detailed Description As an example, I have three controlplane Machines (a. b, c)...
  • This PR makes KCP supporting external etcd. While investigating this issue I discovered that the CAPBK support for external etcd is broken for joining control-planes ...
  • <div><p>Currently, KCP scale out during the upgrade, and no other method is available. However, this is a very restrictive implementation in environments where the amount of resources is limited and ...
  • <ol><li>relax the validating webhook to allow mutations to the CoreDNS image details in KCP spec</li><li>add code to KCP controller that syncs the CoreDNS image details from the KCP to the kubeadm-...
  • Migrate to KCP backend.

    2020-11-29 03:05:23
    <div><p>Porting from gRPC to KCP. <p><strong>TODO</strong>: ~~1. Strange bug that upon peer discovery, the node will <code>Dial()</code> itself. Look into <code>network/discovery/*.go~~ ~~2. Have the ...
  • when editing a KCP resource to rotate ssh keys, the kcp validating webhook rejected the change. This is mainly due to the fact we're not adding <code>.spec.kubeadmConfigSpec.users</code> to the ...
  • Audit KCP test coverage

    2020-11-27 20:32:59
    <p>As already done at the beginning of v0.3.x series, we should re-audit test coverage for KCP and ensure that it is adequate for the complexity of this critical component <p>If I'm not wrong you ...
  • <p>As a baremetal cluster administrator I would like to upgrade my KCP without any extra resources. <p><strong>Detailed Description</strong></p> <p>The current KCP implementation always scale-out ...
  • <p>A mostly-functional prototype for having the KCP controller identify Machines that belong to the control plane of an existing cluster and adopt them. <p>Additionally, we set set a "best guess&#...
  • For both KCP and Machine Spec.Version, the version will be defaulted to have a <code>"v</code> prefix if the user doesn't pass it in with the prefix to avoid this being a breaking change. ⚠...
  • The roles specified for KCP here might not be needed https://github.com/kubernetes-sigs/cluster-api/blob/69347982dc6983615cd439e975cccb397083859e/controlplane/kubeadm/config/rbac/role.yaml#L68</p> <p>...
  • NAME-etcd but I get the error is that etcd unhealthy.</li><li>I see kcp controller call the function updateStatus and updateStatus will call GetWorkloadCluster, GetWorkloadCluster will use etcd tls....
  • 1. Clean up code in <a href="https://github.com/kubernetes-sigs/cluster-api/blob/5fdcb9e8dcf0ce93d9eaf984a1de1b1377290f8c/controlplane/kubeadm/internal/control_plane.go#L97-L146">KCP ControlPlane ...
  • <p>As a developer, we would like to be able to change the node registration options used by the KCP. When upgrading, we frequently change these settings to add node labels containing information about...
  • kcp-server, kcp服务器一个密钥安装外壳,用于 https kcp服务器##作为kcptun的搬运工,我只是提供了一键安装脚本,至于使用的原理啊、功能啊、bug啊请各位移步到kcptun项目,我真的无能为力。电子邮件服务器安装wget...
  • <div><p>When cluster delete happens before KCP controller able to add finalizer, the KCP object is deleted without deleting the CP Machine. <p>Seen this with capz. what happened is, the first time ...
  • With the introduction of new KCP conditions (https://github.com/kubernetes-sigs/cluster-api/pull/3674), we would like to call health checks in <code>reconcileDelete()</code> too so that we can get ...
  • This PR adds to KCP support for remediating unhealthy machines according to KCP proposal changes defined by https://github.com/kubernetes-sigs/cluster-api/pull/3676</p> <p><strong>Which issue(s) this ...
  • This PR backfills unit tests for KCP/workload_cluster components <p><strong>Which issue(s) this PR fixes</strong> (optional, in <code>fixes #<issue number>(, fixes #<issue_number>, ...)...
  • <div><p>In KCP controller the <code>updateStatus</code> method is in charge of updating <code>KubeadmControlPlane.Status</code> struct after each iteration. This line ...
  • kubectl get kcp -n default test-control-plane -o json | jq '.spec.version="v1.18.2"' | kubectl apply -f- </code></pre> <p><strong>Environment:</strong></p> <ul><li>Cluster-api ...
  • KCP will now automatically regenerate the Kubeconfig secret during cluster reconciliation. It will do so when the client certificate in it reaches its half-life of 180 days. <p><strong>Which ...
  • <div><p><code>EtcdIsHealthy</code> and <code>ControlPlaneIsHealthy</code> are two functions in the critical path of all the KCP operations, however unit test coverage seems limited to the happy path ...
  • 1. Try to create a cluster with KCP.spec.kubernetesVersion set to something like <code>v1.19.1_mycompany.2 1. kubeadm bootstrapping fails with this in the output <pre><code> couldn't parse ...

空空如也

1 2 3 4 5 ... 20
收藏数 401
精华内容 160
关键字:

KCP