精华内容
下载资源
问答
  • Android Socket连接,包含心跳检测,断线重连,数据发送等等 博客地址:http://blog.csdn.net/yuzhiqiang_1993/article/details/78094909
  • 本篇文章简单介绍了在业务逻辑中处理断线重连的一种方法 之前一直对如何在业务逻辑中处理断线重连没有一个清晰的认识,后来做了一些思考,这里简单记录一下~ 假设存在一段业务逻辑 AAA ,整体实现上分为两部分: ...

    本篇文章简单介绍了在业务逻辑中处理断线重连的一种方法

    之前一直对如何在业务逻辑中处理断线重连没有一个清晰的认识,后来做了一些思考,这里简单记录一下~

    假设存在一段业务逻辑 A A A ,整体实现上分为两部分:

    • 服务器逻辑部分 A S A_S AS
    • 客户端逻辑部分 A C A_C AC

    一般来讲都是 A S A_S AS 负责维护逻辑状态与事件分发, A C A_C AC 则主要负责显示,输入等表现层的处理.

    假设 A C A_C AC 不存在状态存储,仅作为纯终端显示的话,那么我们就不用处理断线重连的问题了,因为 A C A_C AC 的显示(由 A S A_S AS 驱动)总是与 A S A_S AS 同步的.

    不过在现实的开发中并没有这么理想化, A C A_C AC 或多或少总会在本地存储一些状态,于是 A C A_C AC A S A_S AS 便产生了状态同步问题,如果网络条件良好,逻辑上也没有纰漏的话, A C A_C AC A S A_S AS 间的状态同步其实也不会存在什么问题.

    只是一旦引入断线重连,状态同步问题就出现了,因为在 A C A_C AC 断线然后进行重连的这段时间中, A S A_S AS 发生的状态变化将无法同步至 A C A_C AC, 甚至 A C A_C AC 重连成功之后, A S A_S AS 本身都可能因为处理完毕而结束了自己的逻辑过程.

    那么如何正确的处理这种情况下的断线重连呢?

    以下是我的一点思考:

    • A S A_S AS A C A_C AC 都监听并处理 o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 事件
    • A C A_C AC o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 事件中将本地所有相关的逻辑状态清空
    • A S A_S AS o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 事件中将 A C A_C AC 所需要的逻辑状态做一次全量同步(需要保证 A S A_S AS o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 事件发生在 A C A_C AC o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 事件之后)

    除了逻辑状态以外, A S A_S AS A C A_C AC 之间可能还会进行事件通知,推荐规避这些事件通知,都改以状态(的变化)实现.

    采用上述方案之后, A C A_C AC 就能在重连成功之后,获得最新的 A S A_S AS 状态,于是便能与 A S A_S AS 再次形成同步;即便此时 A S A_S AS 逻辑已经退出,不再能推送当前状态信息,也因为 A C A_C AC o n _ r e l a y _ s u c c e s s on\_relay\_success on_relay_success 之后主动做了一次状态清除操作,所以状态上也是同步的( A S A_S AS 退出便意味着 A C A_C AC 状态需要清除).

    展开全文
  • 断线重连总结

    千次阅读 2017-07-11 13:26:29
    断线重连总结 gateserver负责所有与客户端的直接连接 m_conns[10000]也就是一个gateserver最多可以维持10000条socket连接,蓝月采用的是tcp 行走各种消息都是tcp,不存在丢包一说,只会延迟 每个客户端...


    断线重连总结


    gateserver负责所有与客户端的直接连接
    m_conns[10000]也就是一个gateserver最多可以维持10000条socket连接,蓝月采用的是tcp
    行走各种消息都是tcp,不存在丢包一说,只会延迟


    每个客户端点击登录时会做以下事情
    建立socket tcp连接,向gateserver 的ip port发送请求,
    gateserver收到后,建立连接,这时候客户端与gateserver之间socket连接成功.
    gateserver将这个conn(gateserver与这个客户端的连接)加入m_conns中。m_conns m_userPool,
    int index = m_conns.add(con),
    User* user = m_userPool.pop();
    user.setIndex(index);//序号表示第几个连接,比如第一个玩家连进来,第二个玩家连进来,第n个玩家连进来。。。
    conn->setPtr(user); //将user 与 conn对应


    1 gateserver与gameserver之间的通信


    当gateserver中连接进来一个用户后,
    用户登录成功,获取角色等过程暂时忽略,直接进入后面游戏协议
    客户端发送cm_game_pro 给gateserver,gateserver转发给gameserver
    这里每个user都有一个gameconn,这个连接表示此玩家gateserver与gameserver之间用的哪个连接
    因为并不是一条连接的,比如当游戏规模比较大,1个gateserver,多个gameserver,
    蓝月这里默认配置是两格gameserver,一般只用一个,当一个出现问题,比如崩了或者其他原因,可以通过更改
    配置,把玩家重新通过gateserver练到另一个gameserver,避免出现游戏无法进入的问题。


    SERVICE_SERVER.forwardToService(SN_GAME, conn, inPacket);
    gateserver中GateServer是gateserver与客户端的连接,作为tcpserver.
    gateserver与gameserver连接中,serviceserver是作为tcpserver的,gameserver作为tcpclient
    gameserver与gateserver服务器启动的时候会连上,
    玩家此时还没有user自己的gameconn,
    第一次会分配一个,假如默认gameserver1,
    那么user的gameconn就得到了,然后客户端发来的消息gateserver接收后,serviceserver通过user的gameconn发给对应的gameserver


    2 gameserver的处理
    收到消息放到一个消息队列,单独线程轮训此消息队列,处理
    发过来的消息都会有index,这个字段表示的是,此玩家客户端对应的gateserver哪个连接,
    第一次时候gameserver此时还没有此玩家的记录
    gameserver维护一个m_users[10000]
    对每一个新来的连接index,分配对应的m_users[index]


    cgindex,uid,sid都是gateser发过来的
    User *user = GAME_POOL.popUser();
    user->init(uid, sid);
    m_users[cgindex] = user;
    gameservice添加user就完成了,


    user对应一个player player就是游戏中的核心,移动,打怪,挖宝,传送,强化等所有操作都是绑定在player上面,发的消息也都是
    player处理
    gameservice收到消息,根据index,得到m_user[index]得到user,user->getPlayer得到player,然后player->addNetPakcet();
    然后player消息轮训处理这个netpacket,处理完
    下发消息通过player对应user的
    gameservice发给他的tcpserver,也就是gateserver中serviceserver,当然index也会发下去
    我们知道gameserver和gateserver之间的连接只有一个,
    gateserver的serviceserver收到后,根据index取得m_conns[index],得到这个玩家的客户端的连接
    然后下发客户端,流程结束




    3 select,epoll,socket
    int ret = select(0, rfds, wfds, NULL, &m_timeout);
    这句话的意思就是,系统监听多个socket连接,发现那个有变化就会知道,具体暂时略过
    总之ret>0说明有了变化
    dispatchEvent(rfds, wfds);tcp是双向的,所以读写变化都有
    然后遍历这些fd集合,m_conns是一个socket连接map,map<fd,tcpconn>,一个文件句柄(socket连接也是文件句柄)对应一个tcp连接
    每个连接看看 pConn->recv() pConn->onSend();
    具体就是
    recv()
    {
    int error = ::recv(m_fd, m_recvBuffer+m_recvOffset, m_recvBufferSize-m_recvOffset, 0);
    error>0说明确实这个socket tcp连接收到了消息,那么处理这些数据,数据的解析参照之前我的博文,
    //解析头解析消息体等等 tcpprocessor->postMessage,把整个netpacket交给能处理的人
    error==0说明有问题,连接断开了
    }
    onSend()
    {
    int error = ::send(m_fd, m_sendBuffer+m_sendCursor, m_sendLen-m_sendCursor, 0);
    //大于0说明有消息可发
    //等于0说明异常,连接断开了
    }
    之前的处理是,当异常,直接remove(pConn),然后处理一系列后续操作,玩家离线,登出等处理


    TcpMultiplexor --> 维护所有tcp连接集合,处理每个conn的read write读写消息
    TcpConnection --> 文件句柄(socket)->tcp连接
    TcpProcessor --> 多个connection用一个processor处理 
    TcpAcceptor --> 每个tcpserver都有一个acceptor,就是监听器,如下
    while(m_listening)
    {
    //监听线程 死循环
    socket_t fd = accept(m_listenFd, sockaddr_cast(&addr_in), &len);
    onAccept(fd, inetaddr);//有连接 建立连接 文件句柄,inetaddress地址
    pConn->bind(fd, addr, m_processors[m_nextProcessor]);//绑定端口地址 处理
    //一个connection 一个processor,processor就是把连接收到的netpacket post给对应的tcpserver(例如gateserver)
    //然后tcpserver(gateserver)自己会轮训处理这些netpacket,头部不管,已经解析完了,只用后面的readInt等对应协议即可
    m_multiplexors[m_nextMultiplexor++]->add(pConn,MASK_READ);
    //这个conn加入进了multiplexor 后续有了动作就可以处理了
    }
    TcpServer --> gateserver等


    4 断线重连
    以上流程已经搞清楚了 断线重连也不难
    两种:断开连接重新登录,断开连接不需重新登陆


    当select/epoll send recv结果为0,本来连接直接断开的,现在保留60s
    因为手机环境下,打电话,移动网络等很大概率很频繁会出现断网,也就是socket完全断开的情况
    因此,断开不能理解就把整个connection remove掉,remove掉就是跟页游端玩家退出游戏一样,
    类似妖妖平台的DOTA,断开连接 会尝试连接,1分钟连不上那么就判断完全断开,结束游戏。重新登录
    时间之内,点击断线重连,这时候客户端断开之前的socket,开启新的socket连接gateserver,
    连接成功,gateserver里面m_conn多了一个新的连接,找到该用户之前的conn,替换掉,同时通知gameserver
    m_users本来的c_gindex换位新的c_gindex,
    old odl
    client -------- gateserver   --------- gameserver
    --------      
    new
    因为在时间内,没有超过60s,我不会执行remove的操作,因此这个玩家的状态还是登录状态,别人看到的他是在线的
    一分钟后没连上就会真的离开游戏
    新旧连接替换好之后,类似于给人接骨,练好后,客户端新的socket的消息能发给gateserver,gateserver发给gameserver,
    gameserver能正确的交给正确的user,player处理,下发消息gameserver能正确的交给gateserver,gateserver能正确的返还给

    正确的客户端,那么重连的目标就达到了。

    new

    连接还是要断开的,因为epoll的边缘触发,连接断开,我没有把它从监听句柄移除的话,那么它会不停地触发事件,甚至cpu占满。应该改为,这个连接还是要从多路复用监听里面移除,但是玩家不执行logout操作,保持在线,不登出,gameserver那里逻辑层,设置玩家状态,掉线了,记录时间戳,然后当时间检测,发现掉线时间超过60秒执行logout操作,当然多线程环境下,调用函数都是使用消息,这样可以保证多线程不出问题,添加player的logout消息,。

    重连的话,就是gateserver这个保留这个user,当发现收到重连的消息,找到之前的user,把之前user的gameconn设置新连接的gameconn,就行了,然后一些细节处理。(注意这里的连接值客户端与gateserver之间的连接,gameconn指gateserver与gameserver之间的连接)

    展开全文
  • Websocket 断线重连及心跳问题解决方案预备代码解决方案断线重连心跳 预备代码 为描述方便,先将简单的 Websocket 连接函数 ws_connect() 贴出来,ws 为 Websocket 对象: var ws; /** * 连接 websocket * @param...

    预备代码

    为描述方便,先将简单的 Websocket 连接函数 ws_connect() 贴出来,ws 为 Websocket 对象:

    var ws;
    
    /**
     * 连接 websocket
     * @param func onopen要执行的函数,可以为空
     */
    function ws_connect(func) {
        ws = new WebSocket("ws://" + ws_ip);
        // 服务端主动推送消息时会触发这里的 onmessage
        ws.onmessage = function (e) {
            console.log('ws_onmessage ');
        };
        ws.onopen = function (e) {
            console.log('ws_onopen');
            // 开启心跳
            ws_heart();
            if (typeof func == 'function') {
                func();
            }
        };
        ws.onerror = function (e) {
            console.error("ws_onerror:", e);
        };
        ws.onclose = function (e) {
            console.log('ws_onclose:code:' + e.code + ';reason:' + e.reason + ';wasClean:' + e.wasClean);
        };
    }
    
    $(function () {
        var func = function () {
            var data = {type: 'login'};
            ws.send(JSON.stringify(data));
        };
    	// 页面加载时第一次连接,也可以传空
        ws_connect(func);
    });
    

    解决方案

    断线重连

    重新连接的时候 Websocket 的属性 readyState 有着至关重要的作用,先了解一下这个属性的含义(以下其中之一):

    • 0 (CONNECTING)
      正在链接中
    • 1 (OPEN)
      已经链接并且可以通讯
    • 2 (CLOSING)
      连接正在关闭
    • 3 (CLOSED)
      连接已关闭或者没有链接成功

    除了第一次连接,每一次连接时都必须考虑当前的连接状态,比如我要执行 ws.send(data); ,在四种状态下的执行时机是不同的,如下:

    • 0 (CONNECTING)
      正在链接中 - 等连接成功后,在 ws.onopen 里执行
    • 1 (OPEN)
      已经链接并且可以通讯 - 直接执行就是了
    • 2 (CLOSING)
      连接正在关闭 - 等关闭完成后,在 ws.onclose 里重新连接,在重连成功的 ws.onopen 里执行
    • 3 (CLOSED)
      连接已关闭或者没有链接成功 - 重新连接,在重连成功的 ws.onopen 里执行

    这样每次发送数据都能保证连接成功(除非网络断了或服务器挂了),写一个函数 ws_execute() 封装这些操作,这个函数如下:

    /**
     * 根据连接状态单线程连接 websocket
     * @param func onopen要执行的函数,可以为空
     */
    function ws_execute(func) {
        console.log('ws_execute:readyState:' + ws.readyState);
        if (ws.readyState == 0) {
            // 正在链接中
            var _old$open = ws.onopen;
            ws.onopen = function (e) {
            	// 原本 onopen 里的代码先执行完毕
                _old$open.apply(this, arguments);
                if (typeof func == 'function') {
                    func();
                }
            };
        } else if (ws.readyState == 1) {
            // 已经链接并且可以通讯
            if (typeof func == 'function') {
                func();
            }
        } else if (ws.readyState == 2) {
            // 连接正在关闭
            var _old$close = ws.onclose;
            ws.onclose = function (e) {
            	// 原本 onclose 里的代码先执行完毕
                _old$close.apply(this, arguments);
                ws_connect(func);
            };
        } else if (ws.readyState == 3) {
            // 连接已关闭或者没有链接成功
            ws_connect(func);
        }
    }
    

    业务逻辑里发送数据是这样的(代码片断):

    	// 发送数据时,将代码构造成函数作为参数,等 onopen 时执行
    	var func = function () {
    	    var data = {type: 'audio'};
    	    ws.send(JSON.stringify(data));
    	};
    	ws_execute(func);
    

    心跳

    有了上面的 ws_execute() 函数,心跳就简单了,比如每1分钟向服务器发送一次数据:

    var ws_heart_i = null;
    
    /**
     * websocket 每1分钟发一次心跳
     */
    function ws_heart() {
        if (ws_heart_i) clearInterval(ws_heart_i);
        ws_heart_i = setInterval(function () {
            console.log('ws_heart');
            var func = function () {
                var data = {type: 'ping'};
                ws.send(JSON.stringify(data));
            };
            ws_execute(func);
        }, 60000);
    }
    

    ws_heart() 函数放在 ws.onopen 里就可以了。

    展开全文
  • Netty的断线重连

    千次阅读 2019-05-31 17:24:20
    因为工作中经常使用到TCP,所以会频繁使用到诸如Mina或Netty之类的通信框架,为了... * 提供重连功能,需传入bootstrap,并实现handlers */ @ChannelHandler.Sharable public abstract class FunctionsChanne...

    因为工作中经常使用到TCP,所以会频繁使用到诸如Mina或Netty之类的通信框架,为了方便项目的逻辑调用,经常会在框架的基础上再一次进行封装,这样做其实有画蛇添足的嫌疑,但也是无奈之举。

    这里主要记载使用Mina和Netty,构建适合项目的一个完整的重连逻辑。
    当然,都是作为客户端,毕竟一般只有客户端才会做重连。

    在这之前,需要考虑几个问题:

    • 连接行为的结果可以较为方便地获得,成功或失败,最好直接有接口回调,可以在回调中进行后续逻辑处理
    • 当前通信连接的活跃状态需要准确实时而方便地获得,这样有利于重连时对连接的判断
    • 能够较为灵活的配置Listener或Handler或Filter
    • 支持计数,无论是首次连接失败多次后不再尝试连接,还是中途断开后断线重连多次后不再尝试连接,一般不作无休止地重连

    从代码层面看,框架中最好有一个类似Connector的类,能够暴露合适的接口或方法,提供各种状态与回调,使通信连接的动向能够实时把握,然而事情并不是那么美好。

    连接结果

    由于框架设计的一些原则,一个connector根本不足以暴露这些接口。
    对于Mina而言,作为客户端一般用于连接的连接器是NioSocketConnector
    对于Netty而言,则是Bootstrap

    下表是一些常见的定义在两个框架中的对比,不一定准确,但意义相近;

    定义MinaNetty
    连接器SocketConnectorBootstrap
    会话IoSessionChannel
    连接结果ConnectFutureChannelFuture
    逻辑处理IoHandlerChannelHandler
    过滤器IoFilterChannelHandler

    对于Mina而言,连接操作是这样的:

                ConnectFuture future = mConnector.connect();
                future.awaitUninterruptibly();
    			if (future.isConnected()) {
    				//得到会话
                    mSession = future.getSession();
    			}
    

    对于Netty来说,连接可以写成与Mina几乎相同的形式:

                ChannelFuture future = bootstrap.connect();
                future.awaitUninterruptibly();
    			if(future.isSuccess()){
                    mChannel = future.channel();
    			}
    

    也可以不阻塞等待,两种future都可以自行添加Listener监听异步任务是否完成:

    //Mina
                future.addListener(new IoFutureListener<IoFuture>() {
    				@Override
    				public void operationComplete(IoFuture ioFuture) {
    
    				}
    			});
    //Netty
                future.addListener(new GenericFutureListener<ChannelFuture>() {
    				@Override
    				public void operationComplete(ChannelFuture f) throws Exception {
    
    				}
    			});
    

    毕竟是出自一人之手,部分API真是惊人的相似。
    到这里,第一个连接返回结果问题算是有所结论,两种框架都可以正常返回连接的结果。

    会话状态

    而上述代码中,返回的mSession与mChannel就是得到的会话,这两种类各自提供了一些接口,可以用于获得通信连接的实时状态。
    Mina的IoSession这里只取部分方法:

    public interface IoSession {
        IoHandler getHandler();
    
        IoSessionConfig getConfig();
    
        IoFilterChain getFilterChain();
    
        ReadFuture read();
    
        WriteFuture write(Object var1);
    
        WriteFuture write(Object var1, SocketAddress var2);
    
        CloseFuture closeNow();
    
        boolean isConnected();
    
        boolean isActive();
    
        boolean isClosing();
    
        boolean isSecured();
    
        CloseFuture getCloseFuture();
    
        SocketAddress getRemoteAddress();
    
        SocketAddress getLocalAddress();
    
        SocketAddress getServiceAddress();
    
        boolean isIdle(IdleStatus var1);
    
        boolean isReaderIdle();
    
        boolean isWriterIdle();
    
        boolean isBothIdle();
    }
    

    再对比看下Netty提供的Channel,这里也只取部分方法展示:

    public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
        EventLoop eventLoop();
    
        Channel parent();
    
        ChannelConfig config();
    
        boolean isOpen();
    
        boolean isRegistered();
    
        boolean isActive();
    
        SocketAddress localAddress();
    
        SocketAddress remoteAddress();
    
        boolean isWritable();
    
        Channel.Unsafe unsafe();
    
        ChannelPipeline pipeline();
    
        public interface Unsafe {
            SocketAddress localAddress();
    
            SocketAddress remoteAddress();
    
            void register(EventLoop var1, ChannelPromise var2);
    
            void bind(SocketAddress var1, ChannelPromise var2);
    
            void connect(SocketAddress var1, SocketAddress var2, ChannelPromise var3);
    
            void disconnect(ChannelPromise var1);
    
            void close(ChannelPromise var1);
    
            void write(Object var1, ChannelPromise var2);
    
            void flush();
        }
    }
    

    可以看出,无论是IoSession还是Channel,都有相关的API可以知晓通信是否活跃,所以第二个问题在可以获得IoSession或Channel的情况下,是没有问题的。

    配置Handler

    那么再看配置Listener或Handler的相差操作是否灵活。
    二者在这方面的差别较为明显。

    对于Mina而言,添加Handler可以直接利用Connector,真正的逻辑Handler只能由setHandler方法添加,且只能为一个,而相关的Filter则要通过getFilterChain()拿到的过滤器集合去添加;对于Mina来说,Handler和Filter是没有交集的,他们分属不同的接口IoHandler和IoFilter

    mConnector.setHandler(handler);
    mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);
    

    Netty有所不同,netty中所有的handler、filter都是ChannelHandller,这些handler都要在连接行为发生后才能生效,也就是挂载到Channel上的,而不是Bootstrap,一般添加是这样的:

    bootstrap.handler(handler);
    channel.pipeline().addLast(someHandler);
    channel.pipeline().addLast(someFilter);
    

    但handler依旧只能添加一个,如果要添加多个handler或filter,就必须获取到channel,然后进行添加,netty本身提供了一个ChannelInitializer可以用于添加多个channelHandler,一般会这么写:

                bootstrap.handler(new ChannelInitializer<Channel>() {
    				@Override
    				protected void initChannel(Channel channel) throws Exception {
                        channel.pipeline().addLast(handler);
                        channel.pipeline().addLast(someHandler);
                        channel.pipeline().addLast(someFilter);
    				}
    			});
    

    对于Netty来说,Handler和Filter是同一个东西,都是ChannelHandler

    两者在这方面的区别比较明显:
    一是netty将handler和filter都统一为handler了,
    二是netty不能像mina一样,在未连接之前就可以配置所有的Handler或Filter,netty必须获得channel也就是连接成功后才能配置多个Filter。

    这就造成了一个问题,Mina可以提前就配置监听器监听连接的状态,可以正常监听中途断开,也就是在创建Connector后就可以挂载上监听:

            mConnector.getFilterChain().addFirst("reconnect", new IoFilterAdapter() {
    			@Override
    			public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {				
    				//监听到断开,可接入回调接口,做进一步的重连逻辑
                    mConnector.connect();
    			}
    		});
    

    而Netty不能,创建Connector也就是Bootstrap并不能实现类似的挂载,Bootstrap只能挂载一个Handler,而相关的过滤器或监听只能在Channel出现后再进行挂载,那么就会写成这样:

                bootstrap.handler(new ChannelInitializer<Channel>() {
    				@Override
    				protected void initChannel(Channel channel) throws Exception {
    					//添加其他Filter或Handler
    				}
    
    				@Override
    				public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    					super.channelInactive(ctx);
    					//监听到断开,重连
                        bootstrap.connect();
    				}
    			});
    

    这里initChannel方法永远是最先被调用的,因为在源码中是这样的:

    //ChannelInitializer.java
        protected abstract void initChannel(C var1) throws Exception;
    
        public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            if (this.initChannel(ctx)) {
                ctx.pipeline().fireChannelRegistered();
                this.removeState(ctx);
            } else {
                ctx.fireChannelRegistered();
            }
        }
    

    在这种逻辑下,Mina可以在sessionClosed回调中使用SocketConnetor进行重连,Netty可以在channelInactive回调中使用Bootstrap进行重连。
    看起来没什么毛病。

    但需要注意一点,就是Handler的复用问题,也就是对Handler或Filter的检查,Mina和Netty都有对Handler的重复添加进行过检查,不过检查逻辑有细微的差别。
    Mina中是这样检查的:

    //DefaultIoFilterChain.java
        public synchronized void addLast(String name, IoFilter filter) {
            this.checkAddable(name);
            this.register(this.tail.prevEntry, name, filter);
        }
    
        private final Map<String, Entry> name2entry = new ConcurrentHashMap();
    
        private void checkAddable(String name) {
            if (this.name2entry.containsKey(name)) {
                throw new IllegalArgumentException("Other filter is using the same name '" + name + "'");
            }
        }
    

    可以看到,Mina只会检查Filter在Map中对应的key是否被使用过,当然理论上Filter挂载在SocketConnector的FilterChain中,只要配置过一次,就无需再进行配置。

    那么Netty呢?
    Netty的Handler不是能随意复用的,要复用必须标明注解@Sharable,否则就会出现异常:

    警告: Failed to initialize a channel. Closing: [id: 0x1caafa97]
    io.netty.channel.ChannelPipelineException: io.netty.handler.timeout.IdleStateHandler is not a @Sharable handler, so can't be added or removed multiple times.
    

    这是因为在源码进行检查时,是对Handler本身进行检查的,handler会有一个added的属性,一旦被添加使用过,就会置为true,而判断逻辑会阻止为added=true的handler添加进来 。这样一来,如果强行添加已经添加过的handler就会抛出异常:

    //DefaultChannelPipeline.java
     public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
            AbstractChannelHandlerContext newCtx;
            synchronized(this) {
                checkMultiplicity(handler);
                newCtx = this.newContext(group, this.filterName(name, handler), handler);
                //省略部分代码
            }
    
            this.callHandlerAdded0(newCtx);
            return this;
        }
        
       private static void checkMultiplicity(ChannelHandler handler) {
            if (handler instanceof ChannelHandlerAdapter) {
                ChannelHandlerAdapter h = (ChannelHandlerAdapter)handler;
                if (!h.isSharable() && h.added) {
                    throw new ChannelPipelineException(h.getClass().getName() + " is not a @Sharable handler, so can't be added or removed multiple times.");
                }
                h.added = true;
            }
        }
    

    这也就说明使用channel.pipeline().addLast(handler)这种方法添加handler时,如果想不同的Channel添加同一个Handler实例,每种handler都必须注解了@Sharable,如果正好要使用IdleStateHandler这种源码内部的Handler,而IdleStateHandler是没有注解过@Sharable,那么就会出现上面的异常。
    而实际应用中,为了实现心跳,IdleStateHandler是一般都会使用到的。

    那么问题来了,Mina每次重新连接,创建新的session,但只要SocketConnector没有变,所有Handler和Filter自然就没有变,仍然可用,因为所有Handler和Filter是挂载到SocketConnector的FilterChain中,算是只和Connector相关的;
    而Netty,如果重新连接的话,会创建新的Channel,然后会重新调用initChannel,然后利用channel.pipeline().addLast添加Handler,算是挂载到Channel上的,而不是Bootstrap上。

    这样显示出两者最大的区别就是,Mina中配置一次即可,而Netty则需要每次产生新的Channel时对其进行重新配置。

    所以Netty中的handler想复用的话,就必须加注解,否则就会报异常。如果一定要用到无法注解@Sharable的Handler,比如上面的IdleStateHandler,那就要想办法每次initChannel时,也新建一个新的IdleStateHandler…
    或者,继承IdleStateHandler,然后加上注解也行,虽然也很丑就是了。

    So Bad…

    这样的情况下,可以想办法,每次都新建,类似这种:

                FunctionsChannelHandler functionsChannelHandler = new FunctionsChannelHandler(bootstrap){
    
    				@Override
    				public ChannelHandler[] handlers() {
    					return new ChannelHandler[]{
    							new NormalClientEncoder(),
    							new IdleStateHandler(20, 10, 20),
    							this,
    							new NormalClientHandler()};
    				}
    			};
    
                bootstrap.handler(new ChannelInitializer<Channel>() {
    				@Override
    				protected void initChannel(Channel channel) throws Exception {
    					//添加各种handler
                        channel.pipeline().addLast(functionsChannelHandler.handlers());
    				}
    
    				@Override
    				public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    					super.channelInactive(ctx);
    					//监听到断开
    				}
    			});
    

    因为netty把所有监听器过滤器逻辑处理都归为ChannelHandler的原因,把一个handler扩展成一个功能较为丰富的handler是一种不错的方法 。或者沿用这种思路,使其每次新加Handler时,都是new过的Handler。
    应对框架自带的一些未注解@Sharable的类,也可以继承之,自行加注解:

    @ChannelHandler.Sharable
    public class HeartHandler extends IdleStateHandler {
    	public HeartHandler(int readerIdleTimeSeconds, int writerIdleTimeSeconds, int allIdleTimeSeconds) {
    		super(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds);
    	}
    
    	public HeartHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    		super(readerIdleTime, writerIdleTime, allIdleTime, unit);
    	}
    
    	public HeartHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit) {
    		super(observeOutput, readerIdleTime, writerIdleTime, allIdleTime, unit);
    	}
    }
    

    这样一来,配置Handler也勉强可算灵活了。

    连接计数

    对连接计数一般都是开发者编写的逻辑,主要是应对无休止地连接。
    主要应用在两种场景:
    一是首次连接,如果多次连接不成功,那么停止连接,或者另有逻辑;
    二是断线重连,如果多次重连不成功,那么停止连接并销毁,或者另有逻辑。

    因为Mina和Netty都是多线程模型的缘故,计数为了求稳可以直接使用Atom类,当然觉得大材小用也可以直接使用普通int值,毕竟理论上两次连接中间应该会有一定延时才对。

    应用示例

    所以最后,都可以对各自的连接器进行二次封装,然后编写对自己有利的逻辑。
    对于Mina,大概可以写成这样:

    public class TCPConnector {
    
    	private static final int BUFFER_SIZE = 10 * 1024;
    	private static final long CONNECT_TIMEOUT_MILLIS = 10 * 1000;
    	private static final int KEEPALIVE_REQUEST_INTERVAL = 10;
    	private static final int KEEPALIVE_REQUEST_TIMEOUT = 40;
    
    	private static final String RECONNECT = "reconnect";
    	private static final String CODEC = "codec";
    	private static final String HEARTBEAT = "heartbeat";
    	private static final String EXECUTOR = "executor";
    
    
    	/**
    	 * 连接器
    	 */
    	private SocketConnector mConnector;
    	/**
    	 * 会话
    	 */
    	private IoSession mSession;
    
    	/**
    	 * 外用接口
    	 */
    	private IConnectorListener connectorListener;
    
    	/**
    	 * 连接所在线程
    	 */
    	private ExecutorService mExecutor;
    
    	/**
    	 * 重连次数
    	 */
    	private AtomicInteger recconnectCounter;
    	/**
    	 * 首次连接次数
    	 */
    	private AtomicInteger connectCounter;
    
    	private String host;
    	private int port;
    
    	public interface IConnectorListener {
    		/**
    		 * 连接建立成功
    		 */
    		void connectSuccess(IoSession session);
    
    		/**
    		 * 连接建立失败
    		 */
    		void connectFailed();
    
    		/**
    		 * 连接中途断掉时
    		 */
    		void sessionClosed(IoSession session);
    	}
    
    	public TCPConnector() {
    		mConnector = new NioSocketConnector();
    		recconnectCounter = new AtomicInteger(0);
    		connectCounter = new AtomicInteger(0);
    	}
    
    	/**
    	 * 设置目标地址与端口
    	 *
    	 * @param host 目标地址
    	 * @param port 目标端口
    	 */
    	public void setHostPort(String host, int port) {
            L.d("设置地址与端口-" + host + ":" + port);
    		this.host = host;
    		this.port = port;
    	}
    
    	public String getHost() {
    		return this.host;
    	}
    
    	/**
    	 * 在子线程中启用连接
    	 */
    	public void connectInThread() {
            mExecutor.execute(new Runnable() {
    			@Override
    			public void run() {
    				connect();
    			}
    		});
    	}
    
    	/**
    	 * 根据设置的参数连接
    	 */
    	private void connect() {
    		//如果已经连接,则直接【连接成功】
    		if (mSession == null || !mSession.isConnected()) {
    			//连接
                mConnector.setDefaultRemoteAddress(new InetSocketAddress(host, port));
                L.i("连接-->" + host + ":" + port);
                ConnectFuture future = mConnector.connect();
    			//阻塞,等待连接建立响应
                future.awaitUninterruptibly(CONNECT_TIMEOUT_MILLIS);
    			//响应连接成功或失败
    			if (future.isConnected()) {
    				//得到会话
                    mSession = future.getSession();
    				if (connectorListener != null) {
                        connectCounter.set(0);
                        connectorListener.connectSuccess(mSession);
    				}
    			} else {
    				if (connectorListener != null) {
                        connectCounter.incrementAndGet();
                        connectorListener.connectFailed();
    				}
    			}
    		} else {
    			if (connectorListener != null) {
                    connectCounter.incrementAndGet();
                    connectorListener.connectSuccess(mSession);
    			}
    		}
    	}
    
    	/**
    	 * 重连
    	 */
    	private void reconnect() {
    		if (mConnector == null)
    			throw new IllegalArgumentException("IoConnector cannot be null");
    		//如果已经连接,则直接【连接成功】
    		if (mSession == null || !mSession.isConnected()) {
    			//连接
                ConnectFuture future = mConnector.connect();
    			//阻塞,等待连接建立响应
                future.awaitUninterruptibly();
    			//响应连接成功或失败
    			if (future.isConnected()) {
    				//得到会话
                    mSession = future.getSession();
    			}
    		}
    	}
    
    	/**
    	 * 重连
    	 *
    	 * @param reconnectTimeoutMills 连接的超时时间
    	 * @param reconnectTimes        重连次数
    	 */
    	public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {
    		try {
                recconnectCounter.set(0);
    			while (mConnector != null && !(mSession != null && mSession.isConnected()) && recconnectCounter.incrementAndGet() < reconnectTimes) {
    				reconnect();
    				if (mSession != null && mSession.isConnected()) {
    					break;
    				}else{
                        TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);
    				}
    				L.w(Thread.currentThread().getName() + "," + "重连" + host + ":" + port + "(" + recconnectCounter.get() + ")次...");
    			}
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			if (mSession != null && mSession.isConnected()) {
    				if (connectorListener != null) {
                        connectorListener.connectSuccess(mSession);
    				}
    			} else {
    				if (connectorListener != null) {
                        connectorListener.connectFailed();
    				}
    			}
    		}
    	}
    
    	public IoSession getSession() {
    		return mSession;
    	}
    
    	public IoConnector getConnector() {
    		return mConnector;
    	}
    
    	public boolean isActive() {
    		return mConnector != null && mConnector.isActive() && mSession != null;
    	}
    
    	public boolean isConnected() {
    		return mSession != null && mSession.isConnected();
    	}
    
    	public IConnectorListener getConnectorListener() {
    		return connectorListener;
    	}
    
    	public int getRecconnectCounter() {
    		return recconnectCounter.get();
    	}
    
    	public int getConnectCounter() {
    		return connectCounter.get();
    	}
    
    	public void resetConnectCounter() {
            connectCounter.set(0);
    	}
    
    	/**
    	 * 断开连接,释放资源
    	 */
    	public void disconnect() {
    		if (mConnector != null) {
                connectorListener = null;
                mConnector.getFilterChain().clear();
                mConnector.dispose();
                mConnector = null;
    		}
    		if (mSession != null) {
                mSession.closeNow();
                mSession = null;
    		}
    		if (mExecutor != null) {
                mExecutor.shutdown();
                mExecutor = null;
    		}
            L.w("断开");
    	}
    
    	public static class Builder {
    		private TCPConnector newInstance = new TCPConnector();
    		private ProtocolCodecFilter protocolCodecFilter;
    		private KeepAliveFilter keepAliveFilter;
    
    		public Builder setExecutor(ExecutorService executor) {
                newInstance.mExecutor = executor;
    			return this;
    		}
    
    		public Builder setConnectListener(IConnectorListener listener) {
                newInstance.connectorListener = listener;
    			return this;
    		}
    
    		public Builder setHost(String host) {
                newInstance.host = host;
    			return this;
    		}
    
    		public Builder setPort(int port) {
                newInstance.port = port;
    			return this;
    		}
    
    		public Builder setProtocolCodecFilter(ProtocolCodecFactory protocolCodecFactory) {
                protocolCodecFilter = new ProtocolCodecFilter(protocolCodecFactory);
    			return this;
    		}
    
    		public Builder setConnectTimeoutMillis(long connectTimeoutMillis) {
                newInstance.mConnector.setConnectTimeoutMillis(connectTimeoutMillis);
    			return this;
    		}
    
    		public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, int keepAliveRequestInterval) {
                keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, KeepAliveRequestTimeoutHandler.LOG, keepAliveRequestInterval, KEEPALIVE_REQUEST_TIMEOUT);
    			return this;
    		}
    
    		public Builder setKeepAliveFilter(KeepAliveMessageFactory keepAliveMessageFactory, KeepAliveRequestTimeoutHandler keepAliveRequestTimeoutHandler, int keepAliveRequestInterval, int keepAliveRequestTimeOut) {
                keepAliveFilter = new KeepAliveFilter(keepAliveMessageFactory, IdleStatus.BOTH_IDLE, keepAliveRequestTimeoutHandler, keepAliveRequestInterval, keepAliveRequestTimeOut);
    			return this;
    		}
    
    
    		public Builder setHandlerAdapter(IoHandlerAdapter handler) {
                newInstance.mConnector.setHandler(handler);
    			return this;
    		}
    
    		public Builder setReadBuffer(int size) {
                newInstance.mConnector.getSessionConfig().setReadBufferSize(size);
    			return this;
    		}
    
    		public Builder setReceiveBuffer(int size) {
                newInstance.mConnector.getSessionConfig().setReceiveBufferSize(size);
    			return this;
    		}
    
    		public Builder setSendBuffer(int size) {
                newInstance.mConnector.getSessionConfig().setSendBufferSize(size);
    			return this;
    		}
    
    		public TCPConnector build() {
    			//添加重连监听
    			if (newInstance.connectorListener != null) {
                    newInstance.mConnector.getFilterChain().addFirst(RECONNECT, new IoFilterAdapter() {
    					@Override
    					public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception {
    						if (newInstance != null && newInstance.connectorListener != null)
                                newInstance.connectorListener.sessionClosed(session);
    					}
    				});
    			}
    
    			//设置编码解码
    			if (protocolCodecFilter != null)
                    newInstance.mConnector.getFilterChain().addLast(CODEC, protocolCodecFilter);
    
    			//设置心跳
    			if (keepAliveFilter != null)
                    newInstance.mConnector.getFilterChain().addLast(HEARTBEAT, keepAliveFilter);
    
    			//connector不允许使用OrderedThreadPoolExecutor
                  newInstance.mConnector.getFilterChain().addLast(EXECUTOR, new ExecutorFilter(Executors.newSingleThreadExecutor()));
    			return newInstance;
    		}
    	}
    
    	@Override
    	public String toString() {
    		return "MinaHelper{" +
    				"mSession=" + mSession +
    				", mConnector=" + mConnector +
    				", connectorListener=" + connectorListener +
    				", mExecutor=" + mExecutor +
    				'}';
    	}
    
    	@Override
    	public boolean equals(Object o) {
    		if (this == o) return true;
    		if (o == null || getClass() != o.getClass()) return false;
    
            TCPConnector connector = (TCPConnector) o;
    
    		return port == connector.port && (host != null ? host.equals(connector.host) : connector.host == null);
    	}
    
    	@Override
    	public int hashCode() {
    		int result = host != null ? host.hashCode() : 0;
            result = 31 * result + port;
    		return result;
    	}
    

    使用起来是这样的:

            HigherGateWayHandler higherGateWayHandler = new HigherGateWayHandler();
            TCPConnector higherGateWayClient = new TCPConnector.Builder()
    				.setExecutor(ThreadPool.singleThread("higher_gateway_client"))
    				.setHost(NC.GATEWAT_HIGHER_HOST)
    				.setPort(NC.LOWER_PORT)
    				.setConnectTimeoutMillis(10 * 1000)
    				.setReadBuffer(10 * 1024)
    				.setHandlerAdapter(higherGateWayHandler)
    				.setProtocolCodecFilter(new HigherGateWayCodecFactory())
    				.setKeepAliveFilter(new KeepAliveHigherGateWay(), higherGateWayHandler, 10, 20)
    				.setConnectListener(new TCPConnector.IConnectorListener() {
    					@Override
    					public void connectSuccess(IoSession session) {
                            //连接成功后
    					}
    
    					@Override
    					public void connectFailed() {
    						//重连失败
    						if (higherGateWayClient.getRecconnectCounter() == 3) {
    							//重连失败后
    						}
    						//非重连失败,优先级连接情况下
    						if (higherGateWayClient.getRecconnectCounter() == 0 && higherGateWayClient.getConnectCounter() > 2) {
                                higherGateWayClient.resetConnectCounter();
    						} else {
                                higherGateWayClient.connectInThread();
    						}
    					}
    
    					@Override
    					public void sessionClosed(IoSession session) {
                            executors.execute(new Runnable() {
    							@Override
    							public void run() {
    							   //重连逻辑
                                   higherGateWayClient.reconnect(10 * 1000, 3);
    							}
    						});
    					}
    				})
    				.build();
    

    而Netty,封装起来会有一点花里胡哨,目前遇到的问题是当重连以后复用IdleStateHandler这种Handler时,就会使得其中的计时机制失效,也就是说,心跳没用了,暂时不明原因,大概率是其中的线程被销毁无法再起的原因。那么当前就只能想办法每次调用initChannel时,创建新的Handler才行:

    public class NettyConnector {
    
    	/**
    	 * 连接器
    	 */
    	private Bootstrap bootstrap;
    
    	/**
    	 * 地址
    	 */
    	private String host;
    	private int port;
    
    	/**
    	 * 会话
    	 */
    	private Channel channel;
    
    	private static final long TIME_OUT = 10;
    
    
    	private long connectTimeoutMills;
    
    	/**
    	 * 重连次数
    	 */
    	private AtomicInteger recconnectCounter;
    
    	/**
    	 * 首次连接次数
    	 */
    	private AtomicInteger connectCounter;
    
    	/**
    	 * 以接口引出通信状态
    	 */
    	public interface IChannelStateListener {
    		void onConnectSuccess(Channel channel);
    
    		void onConnectFailed();
    
    		void onDisconnect();
    	}
    
    	private IChannelStateListener channelStateListener;
    
    	private NettyConnector(final Builder builder) {
            recconnectCounter = new AtomicInteger(0);
            connectCounter = new AtomicInteger(0);
            connectTimeoutMills = builder.timeoutMills;
            bootstrap = builder.bootstrap;
            bootstrap.handler(new ChannelInitializer() {
    			@Override
    			protected void initChannel(Channel channel) throws Exception {
                    channel.pipeline().addLast(new ChannelDisconnectHandler());
                    channel.pipeline().addLast(builder.handlerSet.handlers());
    			}
    		});
    	}
    
    	public void setRemoteAddress(String host, int port) {
            L.d("设置地址与端口-" + host + ":" + port);
    		this.host = host;
    		this.port = port;
    	}
    
    
    	public void setChannelStateListener(IChannelStateListener listener) {
            channelStateListener = listener;
    	}
    
    	public void connect() {
    		if (channel == null || !channel.isActive()) {
                bootstrap.remoteAddress(this.host, this.port);
                L.d("第" + (connectCounter.get() + 1) + "次连接" + host + ":" + port + "中......");
    
    			final long startMills = System.currentTimeMillis();
                ChannelFuture channelFuture = bootstrap.connect();
    
                channelFuture.addListener(new GenericFutureListener<ChannelFuture>() {
    				@Override
    				public void operationComplete(ChannelFuture f) throws Exception {
    					if (f.isSuccess()) {
                            L.d("连接(" + bootstrap.config().remoteAddress() + ")成功");
                            channel = f.channel();
    						if (channelStateListener != null) {
                                connectCounter.set(0);
                                channelStateListener.onConnectSuccess(channel);
    						}
    					} else {
    						long delay = System.currentTimeMillis() - startMills;
    						if (delay > 0) {
                                TimeUnit.MILLISECONDS.sleep(connectTimeoutMills - delay);
    						}
                            L.d("连接(" + bootstrap.config().remoteAddress() + ")失败");
    						if (channelStateListener != null) {
                                connectCounter.incrementAndGet();
                                channelStateListener.onConnectFailed();
    						}
    					}
    				}
    			});
    		}
    	}
    
    	private void reconnect() {
    		if (bootstrap == null)
    			throw new IllegalArgumentException("bootstrap cannot be null");
    		//如果已经连接,则直接【连接成功】
    		if (channel == null || !channel.isActive()) {
    			//连接
                channel = bootstrap.connect().awaitUninterruptibly().channel();
    		}
    	}
    
    	/**
    	 * 重连
    	 * @param reconnectTimeoutMills 重连超时时间
    	 * @param reconnectTimes 重连次数
    	 */
    	public void reconnect(final long reconnectTimeoutMills, final int reconnectTimes) {
    		try {
                recconnectCounter.set(0);
    			while (channel != null && !channel.isActive() && recconnectCounter.getAndIncrement() < reconnectTimes) {
                    L.d(Thread.currentThread().getName() + "," + "重连" + bootstrap.config().remoteAddress() + "(" + recconnectCounter.get() + ")次...");
    				reconnect();
    				if (channel.isActive()) {
    					break;
    				} else {
                        TimeUnit.MILLISECONDS.sleep(reconnectTimeoutMills);
    				}
                    L.d(channel.isActive() + "");
    			}
    		} catch (InterruptedException e) {
                e.printStackTrace();
    		} finally {
    			if (channel != null && channel.isActive()) {
    				if (channelStateListener != null) {
                        channelStateListener.onConnectSuccess(channel);
    				}
    			} else {
    				if (channelStateListener != null) {
                        channelStateListener.onConnectFailed();
    				}
    			}
    		}
    	}
    
    	public Channel getChannel() {
    		return channel;
    	}
    
    	public boolean isConnected() {
    		return channel != null && channel.isActive();
    	}
    
    	public String getAddress() {
    		return host + ":" + port;
    	}
    
    	public int getConnectFailedTimes() {
    		return connectCounter.get();
    	}
    
    	public int getReconnectFailedTimes() {
    		return recconnectCounter.get();
    	}
    
    	public static class Builder {
    
    		private Bootstrap bootstrap = new Bootstrap();
    		private HandlerSet handlerSet;
    		private long timeoutMills = 10 * 1000;
    
    		public Builder group(EventLoopGroup loopGroup) {
                bootstrap.group(loopGroup);
    			return this;
    		}
    
    		@Deprecated
    		public Builder remoteAddress(String inetHost, int inetPort) {
                bootstrap.remoteAddress(inetHost, inetPort);
    			return this;
    		}
    
    		public Builder setConnectTimeoutMills(long timeout) {
                timeoutMills = timeout;
    			return this;
    		}
    
    		public Builder handler(HandlerSet handlers) {
                handlerSet = handlers;
    			return this;
    		}
    
    		public NettyConnector build() {
                bootstrap.channel(NioSocketChannel.class);
    			return new NettyConnector(this);
    		}
    	}
    
    	/**
    	 * 主要用于监听断开
    	 */
    	class ChannelDisconnectHandler extends ChannelInboundHandlerAdapter {
    
    		@Override
    		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                ctx.fireChannelInactive();
    			if (channelStateListener != null) {
                    channelStateListener.onDisconnect();
    			}
    		}
    	}
    
    	/**
    	 * 主要用于创建新的handler,避免复用带来的一些问题
    	 */
    	@ChannelHandler.Sharable
    	public static abstract class HandlerSet extends ChannelInboundHandlerAdapter {
    		public abstract ChannelHandler[] handlers();
    	}
    
    }
    

    因为Netty不能像Mina直接在Connector上挂载监听sessionClosed,只能用一个ChannelDisconnectHandler这样的东西去监听是否已经断开,并通过接口引出结果;
    并且因为只能在Channel.Pipeline中才能添加多个Handler的原因,这里用一个HandlerSet强行将所有需要的Handler集合,然后在创建Bootstrap的时候一次性添加进去,想要保证每次都新建,这里就使用抽象方法,让使用的时候可以自行创建。
    注意,由于这里的抽象类HandlerSet每次其实并不是新建的,所有是需要复用的,所以需要加注解@Sharable,但也只需要加它一个就行了,其他都是新建出来的,无需理会。
    写出来就是这样:

            NettyConnector connector = new NettyConnector.Builder()
    				.group(new NioEventLoopGroup())
    				.handler(new NettyConnector.HandlerSet() {
    
    					@Override
    					public ChannelHandler[] handlers() {
    						return new ChannelHandler[]{new HeartHandler(10, 10, 10),
    								new NormalClientEncoder(),
    								new NormalClientHeartBeatHandler(),
    								new NormalClientHandler()};
    					}
    				})
    				.setConnectTimeoutMills(5 * 1000)
    				.build();
    
            connector.setRemoteAddress("192.168.0.102", 8000);
            connector.setChannelStateListener(new NettyConnector.IChannelStateListener() {
    			@Override
    			public void onConnectSuccess(Channel channel) {
                    L.d("连接" + channel.remoteAddress().toString() + "成功");
    			}
    
    			@Override
    			public void onConnectFailed() {
                    L.d("连接" + connector.getAddress() + "失败");
    				if (connector.getReconnectFailedTimes() == 0 && connector.getConnectFailedTimes() < 3) {
                        connector.connect();
    				}
    			}
    
    			@Override
    			public void onDisconnect() {
                    L.d(connector.getChannel().remoteAddress().toString() + "已断开");
                    connector.reconnect(5000, 5);
    			}
    		});
    

    其中的HeartHandler是继承自IdleStateHandler的。

    整个封装显得花里胡哨…却又很丑,不过勉强能用,水平有限。

    就这样吧。

    展开全文
  • STM32F407 LWIP掉线重连

    千次阅读 2021-03-02 14:22:28
    STM32F407 LWIP掉线重连STM32CUBE配置(简略)网卡配置(注意网卡复位引脚)LWIP配置TCP/IP 连接自动重连的实现Lwip协议栈TCP保活(KeepAlive)设定自动重连流程简介代码实现 STM32CUBE配置(简略) 请根据硬件自行百度网卡...
  • 断线重连方案

    千次阅读 2019-02-21 02:21:20
    《天天炫斗》弱网处理以及断线重连方案 当前手游现状 网络不稳定:购买道具半天没响应、切换副本版本不流畅、意外重复购买... 网络闪断:”又掉线又得重新登录“、”KAO,副本中掉了,前面的BOSS白打了。“... ...
  • 网络游戏-断线重连

    千次阅读 2019-01-31 11:24:54
    下面我主要从客户端方向就断线重连触发的条件,如何重连,以及重连后的后续处理三个方面来阐述,最后简略分析一下对我们游戏客户端容易掉线的一些思考。 2、判断重连条件 在弱网络条件下,我们根据网络状况的...
  • MQTT断线重连

    万次阅读 热门讨论 2018-02-08 15:57:19
    MQTT客户端:org.eclipse.paho.client.mqttv3 MQTT服务器:EMQ MQTT服务器官网:http://emqtt.com/ ...在之前的文章中我们简单介绍了MQTT的收发消息,并没有实现重连机制,我在实现重连时,发现...
  • 长连接、心跳和断线重连

    万次阅读 2017-06-22 18:38:53
    长在线这个功能依赖断线重连完成。  通常,网络不稳定是造成不能长时间在线的主要原因,还有比如:服务器强制注销客户端、次客户端被主客户端踢。目前的qq和飞信都有断线重连机制。有时候IM软件自动完成登录,...
  • WebSocket断线重连

    千次阅读 2018-09-10 17:18:29
    WebSocket断线重连 即时通讯 心跳重连 废话不多说,直接上代码. $scope.timeout = 10800,//3分钟发一次心跳 $scope.timeoutObj = null, $scope.serverTimeoutObj = null, $scope.init = function() { $scope....
  • Netty断线重连实现

    千次阅读 2019-03-05 14:46:06
    netty断线重连实现 学习文章 浅析 Netty 实现心跳机制与断线重连 心跳机制 心跳是TCP长连接中,c-s之间发送的一种特殊的数据包,用来通知对方还在线,以确保TCP连接的有效性。 原理是:当在一段时间Idle后,c或者s会...
  • netty客户端断线重连实现及问题思考

    千次阅读 多人点赞 2021-01-10 22:26:20
    前言 在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题: 如何监听到客户端和服务端连接断开 ? 如何实现断线后重新连接 ? netty客户端线程给多大...
  • Netty 断线重连解决方案

    万次阅读 2018-04-28 09:40:12
    本篇文章是Netty专题的第七篇,前面六篇文章如下: - 高性能NIO框架Netty入门篇 - 高性能NIO框架Netty-对象传输 - 高性能NIO框架Netty-整合kryo高性能...用Netty实现长连接服务,当发生下面的情况时,会发生断线...
  • 断线重连 粘包组包 粘包 一般所谓的TCP粘包是在一次接收数据不能完全地体现一个完整的消息数据。TCP通讯存在粘包的主要原因是TCP是以流的方式来处理数据,所以就会引发一次接收的数据无法满足消息的需要,导致粘包的...
  • } @Override public void onMessage(ByteBuffer bytes) { //该站点用的是Byte流,这里需要转换成文本 //TODO 这里可以加入消息处理逻辑 log.info("[websocket] 收到消息={}", ByteUtils.getString(bytes));...
  • } 2.NioClient 建立连接,断线重连; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io...
  • netty4.0 心跳检测与断线重连操作

    千次阅读 2018-09-25 17:03:14
    那如何实现netty长连接和断线重连呢(网络故障或者其他原因,客户端要无限取重连服务端)。接下来我们看一下如何实现这个两个功能呢。 服务端代码如下: package com.example.nettydemo.demo; import io.netty....
  • // //断线重连 f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { if (!channelFuture.isSuccess()) { final EventLoop...
  • js websocket断线重连

    2021-07-10 06:59:52
    js websocket断开重连实例代码,请根据自己需求做出相应改动Vue中使用websocket $(function() { var lockReconnect = false;//避免重复连接 var ws = null; //WebSocket的引用 var wsUrl = "xxxxxx"; //这个要与...
  • WebSocket 前端 Vue 长连接 断线重连

    千次阅读 2019-07-12 00:36:09
    //断线重连后,延迟5秒重新创建WebSocket连接 rec用来存储延迟请求的代码 let rec ; //定义重连函数 let reConnect = ( ) => { console . log ( '尝试重新连接' ) ; //如果已经上就不在重连...
  • 概述 IM软件有一个基本的功能就是长在线,即只要有网络就保持登录,然而,网络状态是无法预测的...比如网络状况可用的时候,自动断线重连。 目前的qq和飞信都有断线重连机制。 断线重连的定义: 1、用户已经成功登录IM
  • 没办法了,那就把最近做的一个简简单单的功能,拿出来说说吧,功能很简单,需要支持 k8s pod terminal 断开重连。 分析 我们知道,使用命令 kubectl exec -it pods/PODNAME -- bash [--kubeconfig /PATH/TO/KUBE...
  • WebRTC怎么断线重连

    2017-03-22 04:27:12
    用户加入房间后因为意外断网,导致该用户的信息并不能删除!再次进入会提示用户已经存在!敢问类似的聊天室是如何处理断网的?必须发送保活连接的心跳包吗?
  • 我们所需要开发的业务逻辑 Handler 就是在这里添加的。其代码如下: public class HeartBeatServerInitializer extends ChannelInitializer<Channel> { @Override protected void initChannel(Channel channel) ...
  • 下面将介绍Netty如何进行心跳检测以及处理客户端的断线重连。 为了适应恶劣的网络环境,比如网络超时、闪断,客户端进程僵死,需要机制来保证双方的通信能正常工作或者自动恢复。对于服务端来说,当客户端由于某些...
  • Netty 4.0 实现心跳检测和断线重连

    千次阅读 2016-02-17 21:42:39
    二 客户端实现断线重连 原理当客户端连接服务器时 bootstrap.connect(new InetSocketAddress( serverIP, port)); 会返回一个ChannelFuture的对象,我们对这个对象进行监听 代码如下: import android.os.Handler; ...
  • //方法实现说明 断线重连方法,如果是持久订阅,重连是不需要再次订阅,如果是非持久订阅,重连是需要重新订阅主题 取决于options.setCleanSession(true); true为非持久订阅 @Override public void ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 12,587
精华内容 5,034
关键字:

断线重连逻辑