精华内容
下载资源
问答
  • IdleStateHandler心跳机制

    千次阅读 2018-04-18 10:52:03
    (0)基础心跳机制心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的种机制在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会...


    (0)基础

    • 心跳机制

      • 心跳是在TCP长连接中,客户端和服务端定时向对方发送数据包通知对方自己还在线,保证连接的有效性的一种机制
      • 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.
    • 心跳实现

      • 使用TCP协议层的Keeplive机制,但是该机制默认的心跳时间是2小时,依赖操作系统实现不够灵活;

      • 应用层实现自定义心跳机制,比如Netty实现心跳机制;

    (1) IdleStateHandler心跳检测实例

    服务端

    • 服务端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

    • 设定IdleStateHandler心跳检测每五秒进行一次读检测,如果五秒内ChannelRead()方法未被调用则触发一次userEventTrigger()方法

    ServerBootstrap b= new ServerBootstrap();
    b.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG,1024)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                 socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
                    socketChannel.pipeline().addLast(new StringDecoder());
                    socketChannel.pipeline().addLast(new HeartBeatServerHandler());
                }
            });
    
    • 自定义处理类Handler继承ChannlInboundHandlerAdapter,实现其userEventTriggered()方法,在出现超时事件时会被触发,包括读空闲超时或者写空闲超时;
    class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
        private int lossConnectCount = 0;
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("已经5秒未收到客户端的消息了!");
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state()== IdleState.READER_IDLE){
                    lossConnectCount++;
                    if (lossConnectCount>2){
                        System.out.println("关闭这个不活跃通道!");
                        ctx.channel().close();
                    }
                }
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            lossConnectCount = 0;
            System.out.println("client says: "+msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    客户端

    • 客户端添加IdleStateHandler心跳检测处理器,并添加自定义处理Handler类实现userEventTriggered()方法作为超时事件的逻辑处理;

    • 设定IdleStateHandler心跳检测每四秒进行一次写检测,如果四秒内write()方法未被调用则触发一次userEventTrigger()方法,实现客户端每四秒向服务端发送一次消息;

    Bootstrap b = new Bootstrap();
    b.group(group).channel(NioSocketChannel.class)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                    socketChannel.pipeline().addLast(new StringEncoder());
                    socketChannel.pipeline().addLast(new HeartBeatClientHandler());
                }
            });
    
    • 自定义处理类Handler继承ChannlInboundHandlerAdapter,实现自定义userEventTrigger()方法,如果出现超时时间就会被触发,包括读空闲超时或者写空闲超时;
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        System.out.println("客户端循环心跳监测发送: "+new Date());
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state()== IdleState.WRITER_IDLE){
                if (curTime<beatTime){
                    curTime++;
                    ctx.writeAndFlush("biubiu");
                }
            }
        }
    }
    

    (2) IdleStateHandler源码分析

    • IdleStateHandler构造器

      • readerIdleTime读空闲超时时间设定,如果channelRead()方法超过readerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;

      • writerIdleTime写空闲超时时间设定,如果write()方法超过writerIdleTime时间未被调用则会触发超时事件调用userEventTrigger()方法;

      • allIdleTime所有类型的空闲超时时间设定,包括读空闲和写空闲;

      • unit时间单位,包括时分秒等;

    public IdleStateHandler(
            long readerIdleTime, long writerIdleTime, long allIdleTime,
            TimeUnit unit) {
        this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
    }
    
    • 心跳检测也是一种Handler,在启动时添加到ChannelPipeline管道中,当有读写操作时消息在其中传递;
    socketChannel.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
    
    • IdleStateHandler的channelActive()方法在socket通道建立时被触发
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        initialize(ctx);
        super.channelActive(ctx);
    }
    
    • channelActive()方法调用Initialize()方法,根据配置的readerIdleTime,WriteIdleTIme等超时事件参数往任务队列taskQueue中添加定时任务task ;
    private void initialize(ChannelHandlerContext ctx) {
        switch (state) {
        case 1:
        case 2:
            return;
        }
    
        state = 1;
        initOutputChanged(ctx);
    
        lastReadTime = lastWriteTime = ticksInNanos();
        if (readerIdleTimeNanos > 0) {
            readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
                    readerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (writerIdleTimeNanos > 0) {
            writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
                    writerIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
        if (allIdleTimeNanos > 0) {
            allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
                    allIdleTimeNanos, TimeUnit.NANOSECONDS);
        }
    }
    
    • 定时任务添加到对应线程EventLoopExecutor对应的任务队列taskQueue中,在对应线程的run()方法中循环执行

      • 用当前时间减去最后一次channelRead方法调用的时间判断是否空闲超时;

      • 如果空闲超时则创建空闲超时事件并传递到channelPipeline中;

    protected void run(ChannelHandlerContext ctx) {
        long nextDelay = readerIdleTimeNanos;
        if (!reading) {
            nextDelay -= ticksInNanos() - lastReadTime;
        }
    
        if (nextDelay <= 0) {
            // Reader is idle - set a new timeout and notify the callback.
            readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS);
    
            boolean first = firstReaderIdleEvent;
            firstReaderIdleEvent = false;
    
            try {
                IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
                channelIdle(ctx, event);
            } catch (Throwable t) {
                ctx.fireExceptionCaught(t);
            }
        } else {
            // Read occurred before the timeout - set a new timeout with shorter delay.
            readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
        }
    }
    
    • 在管道中传递调用自定义的userEventTrigger()方法
    protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }
    

    总结

    • IdleStateHandler心跳检测主要是通过向线程任务队列中添加定时任务,判断channelRead()方法或write()方法是否调用空闲超时,如果超时则触发超时事件执行自定义userEventTrigger()方法;

    • Netty通过IdleStateHandler实现最常见的心跳机制不是一种双向心跳的PING-PONG模式,而是客户端发送心跳数据包,服务端接收心跳但不回复,因为如果服务端同时有上千个连接,心跳的回复需要消耗大量网络资源;如果服务端一段时间内内有收到客户端的心跳数据包则认为客户端已经下线,将通道关闭避免资源的浪费;在这种心跳模式下服务端可以感知客户端的存活情况,无论是宕机的正常下线还是网络问题的非正常下线,服务端都能感知到,而客户端不能感知到服务端的非正常下线;

    • 要想实现客户端感知服务端的存活情况,需要进行双向的心跳;Netty中的channelInactive()方法是通过Socket连接关闭时挥手数据包触发的,因此可以通过channelInactive()方法感知正常的下线情况,但是因为网络异常等非正常下线则无法感知;

    原文地址:Netty学习(五)—IdleStateHandler心跳机制 

    展开全文
  • 心跳

    2020-07-21 16:13:06
    客户端定时发送或者在有消息发送时会发送时间同步包,服务器收到这个包后会记录一下标记位,当服务器定时检测标记位时间到达时先检测标记位,如果标记位已经被记录,则会清空标记位和计数器,然后会回个包含...

    心跳包是指不断发送消息通知另外一方自己"还在",在游戏开发中,一般使用“时间同步包”充当心跳包,检测游戏时间的同时,检测一下对方是否存在。客户端定时发送或者在有消息发送时就会发送时间同步包,服务器收到这个包后会记录一下标记位,当服务器定时检测标记位时间到达时先检测标记位,如果标记位已经被记录,则会清空标记位和计数器,然后会回一个包含服务器时间的返回包。如果标记位没有被记录,则计数器加1,等待下次服务器定时检测标记位时间到来。当计数器到达一定数值时,标记位还没有被记录,说明客户端已经不在了,这时可以释放连接,做一下清理工作。相应的,如果客户端在一定时间内没收到服务器返回的同步时间包,说明已经断开服务器了,可以尝试重连服务器操作。

    参考资料:

    https://cloud.tencent.com/developer/article/1342680

    https://blog.csdn.net/xnn2s/article/details/6054443

    https://blog.csdn.net/qq_16209077/article/details/68941901?locationNum=5&fps=1

    https://blog.csdn.net/yangxuan0261/article/details/52064708

    https://www.cnblogs.com/CnKker/articles/5050675.html

    展开全文
  • 心跳是啥 在 TCP 长连接中, 客户端和服务器之间定期发送的种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性. ... 自然地, 当某一端收到心跳消息后, 知道了对方仍然在线, 这确保 TCP 连接的有效性

    心跳是啥

    在 TCP 长连接中, 客户端和服务器之间定期发送的一种特殊的数据包, 通知对方自己还在线, 以确保 TCP 连接的有效性.

    心跳机制的工作原理

    心跳机制的工作原理是: 在服务器和客户端之间一定时间内没有数据交互时, 即处于 idle 状态时, 客户端或服务器会发送一个特殊的数据包给对方, 当接收方收到这个数据报文后, 也立即发送一个特殊的数据报文, 回应发送方, 此即一个 PING-PONG 交互. 自然地, 当某一端收到心跳消息后, 就知道了对方仍然在线, 这就确保 TCP 连接的有效性.

    实现心跳

    在 Netty 中, 实现心跳机制的关键是 IdleStateHandler, 它可以对一个 Channel 的 读/写设置定时器, 当 Channel 在一定事件间隔内没有数据交互时(即处于 idle 状态), 就会触发指定的事件.

     

       public IdleStateHandler(
                int readerIdleTimeSeconds,
                int writerIdleTimeSeconds,
                int allIdleTimeSeconds) {
    
            this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
                 TimeUnit.SECONDS);
        }
    

    IdleStateHandler构造函数需要提供三个参数:

    • readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个 READER_IDLE 的 IdleStateEvent 事件.
    • writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个 WRITER_IDLE 的 IdleStateEvent 事件.
    • allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个 ALL_IDLE 的 IdleStateEvent 事件.

    服务端

    服务端启动初始化代码

    服务器的初始化部分为 pipeline 添加了三个 Handler,其中IdleStateHandler就是心跳处理Handler

     

    public class HeartbeatServer {
    
    
        public static void main(String[] args) throws InterruptedException {
    
    
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            try {
                ServerBootstrap b = new ServerBootstrap();
    
                b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new IdleStateHandler(5, 5, 5, TimeUnit.SECONDS));
                                socketChannel.pipeline().addLast(new StringDecoder());
                                socketChannel.pipeline().addLast(new HeartBeatServerHandler());
                            }
                        });
    
                ChannelFuture f = b.bind(8089).sync();
                f.channel().closeFuture().sync();
            }catch (Exception e){
                e.printStackTrace();
            }finally {
                workerGroup.shutdownGracefully().sync();
                bossGroup.shutdownGracefully().sync();
            }
    
        }
    }
    
    

    服务端心跳处理Handler

    IdleStateHandler 是实现心跳的关键, 它会根据不同的 IO idle 类型来产生不同的 IdleStateEvent 事件, 而这个事件的捕获, 其实就是在 userEventTriggered 方法中实现的.

     

    public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
        private int lossConnectCount = 0;
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("已经5秒未收到客户端的消息了!");
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state()== IdleState.READER_IDLE){
                    lossConnectCount++;
                    if (lossConnectCount>2){
                        System.out.println("关闭这个不活跃通道!");
                        ctx.channel().close();
                    }
                }
    
    
    
            }else {
                super.userEventTriggered(ctx,evt);
            }
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            lossConnectCount = 0;
            System.out.println("client says: "+msg.toString());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    

    客户端

    客户端启动初始化代码

    心跳的配置也是跟服务端一样,往pipeline中添加IdleStateHandler,其中的参数可以自己随意配置。

     

    public class HeartBeatClient {
    
        public static void main(String[] args) throws Exception {
    
            EventLoopGroup group = new NioEventLoopGroup();
    
            try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                            socketChannel.pipeline().addLast(new StringEncoder());
                            socketChannel.pipeline().addLast(new HeartBeatClientHandler());
                        }
                    });
    
                ChannelFuture f = b.connect(new InetSocketAddress(8089)).sync();
                f.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                group.shutdownGracefully().sync();
            }
        }
    }
    

    客户端心跳处理代码

    关键也是在userEventTriggered方法中实现的,主要的逻辑就是往服务端发送心跳,发了3次就不发了,这时候就会触发服务端的userEventTriggered中lossConnectCount 如果超过2次就把这个通道给断开。也就是把这个客户端给断开。

     

    
    public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
    
        private int curTime = 0;
        private int beatTime = 3;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("==channelRead===");
            System.out.println(msg.toString());
        }
    
    
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("客户端循环心跳监测发送: "+new Date());
    
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state()== IdleState.WRITER_IDLE){
                    if (curTime<beatTime) {
                        curTime++;
                        ctx.writeAndFlush("biubiu");
                    }
                }
            }
        }
    }
    
    

    小结

    Netty心跳的做法大致就是如此,

    1.利用 IdleStateHandler来产生对应的 idle 事件.
    2.userEventTriggered中做好心跳交互逻辑。

    至于更加复杂的逻辑,还是各位遇到的时候自己发挥。

    断线重连

    服务端代码依旧是上面的不变。

    客户端

    主要工作以及初始化代码如下:
    1.通过 channel().eventLoop().schedule来延时10s 后尝试重新连接.

     

    public class ReconnectClient {
    
    
        private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
        private Channel channel;
        private Bootstrap bootstrap;
    
        public static void main(String[] args) throws Exception {
            ReconnectClient client = new ReconnectClient();
            client.start();
            client.sendData();
        }
    
        public void sendData() throws Exception {
            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < 10000; i++) {
                if (channel != null && channel.isActive()) {
                    channel.writeAndFlush("ReconnectClient心跳來了呀.....");
                }
    
                Thread.sleep(random.nextInt(20000));
            }
        }
    
        public void start() {
            try {
                bootstrap = new Bootstrap();
                bootstrap
                        .group(workGroup)
                        .channel(NioSocketChannel.class)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new IdleStateHandler(0,4,0, TimeUnit.SECONDS));
                                ch.pipeline().addLast(new StringEncoder());
                                ch.pipeline().addLast(new ReconnectClientHandler(ReconnectClient.this));
                            }
                        });
                doConnect();
    
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    
        protected void doConnect() {
            if (channel != null && channel.isActive()) {
                return;
            }
    
            ChannelFuture future = bootstrap.connect(new InetSocketAddress(8089));
    
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture futureListener) throws Exception {
                        if (futureListener.isSuccess()) {
                            channel = futureListener.channel();
                            System.out.println("Connect to server successfully!");
                        } else {
                            System.out.println("Failed to connect to server, try connect after 10s");
    
                            futureListener.channel().eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    doConnect();
                                }
                            }, 10, TimeUnit.SECONDS);
                        }
                    }
            });
        }
    }
    

    ReconnectClientHandler 处理断线重连处理类

    2.断线重连的关键一点是检测连接是否已经断开. 因此我们重写了 channelInactive 方法. 当 TCP 连接断开时, 会回调channelInactive方法, 因此我们在这个方法中调用 client.doConnect() 来进行重连.

     

    public class ReconnectClientHandler extends ChannelInboundHandlerAdapter {
    
        private int curTime = 0;
        private int beatTime = 3;
    
        private ReconnectClient client;
        public ReconnectClientHandler(ReconnectClient client) {
            this.client = client;
        }
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            System.out.println("ReconnectClientHandler客户端循环心跳监测发送: "+new Date());
    
            if (evt instanceof IdleStateEvent){
                IdleStateEvent event = (IdleStateEvent)evt;
                if (event.state()== IdleState.WRITER_IDLE){
                    if (curTime<beatTime) {
                        curTime++;
                        ctx.writeAndFlush("ReconnectClientHandler=biubiu.....");
                    }
                }
            }
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            super.channelInactive(ctx);
            client.doConnect();
            System.out.println("重新連接了呀。。。。");
        }
    }
    

    总结

    心跳机制与断线重连的基本步骤如上所述。

    展开全文
  • 在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,需要客户端与服务端之间有种通知机制告知对方的存活状态。 如何实现 客户端发送心跳消息 在状态空闲的时候定时给服务端发送消息类型为PING消息。...

    心跳

    就是告诉其它人自己还活着。在简易RPC框架中,采用的是TCP长连接,为了确保长连接有效,就需要客户端与服务端之间有一种通知机制告知对方的存活状态。

    如何实现

    客户端发送心跳消息

    在状态空闲的时候定时给服务端发送消息类型为PING消息。

    服务端接收心跳消息

    捕获通道空闲状态事件,如果接收客户端PING消息,则发送PONG消息给服务端。如果在一定时间内没有收到客户端的PING消息,则说明客户端已经不在线,此时关闭通道。

    客户端管理可用连接

    由于服务端会因为长时间接收不到服务端的PING消息而关闭通道,这就导致缓存在客户端的连接的可用性发生变化。需要将不可用的从可用列表中转移出去,并对不可用连接进行处理,比如直接丢弃或者是重新连接。

    预备知识

    ChannelPipeline与handle的关系。netty中的这些handle和spring mvc中的filter作用是类似的,ChannelPipeline可以理解成handle的容器,里面可以被注册众多处理不同业务功能的事件处理器,比如:

    • 编码
    • 解码
    • 心跳
    • 权限
    • 加密
    • 解密
    • 业务代码执行
    • ......

    具体实现

    空闲状态处理器

    可以利用netty提供的IdleStateHandler来发送PING-PONG消息。这个处理器主要是捕获通道超时事件,主要有三类

    • 读超时,一定时间内没有从通道内读取到任何数据
    • 写超时,一定时间内没有从通道内写入任何数据
    • 读写超时,一定时间内没有从通道内读取或者是写入任何数据

    客户端加入空闲状态处理器

    客户端捕获读写超时,如果事件触发就给服务端发送PING消息。

    服务端加入空闲状态处理器

    服务端只需要捕获读超时即可,当读超时触发后就关闭通道。

    为什么在空闲状态才发送心跳消息

    在正常客户端与服务端有交互的情况下,说明双方都在正常工作不需要额外的心跳来告知对方的存活。只有双方在一定时间内没有接收到对方的消息时才开始采用心跳消息来探测对方的存活,这也是一种提升效率的做法。

    抽象心跳处理器

    创建AbstractHeartbeatHandler,并继承ChannelInboundHandlerAdapter,服务于客户端与服务端的心跳处理器。在读取方法中判断消息类型:

    • 如果是PING消息就发送PONG消息给客户端
    • 如果收到的是PONG消息,则直接打印消息说明客户端已经成功接收到服务端返回的PONG消息
    • 如果是其它类型的消息,则通知下一个处理器处理消息
    
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object msg) throws Exception {
    
            if(!(msg instanceof RpcMessage)){
                channelHandlerContext.fireChannelRead(msg);
                return;
            }
            RpcMessage message=(RpcMessage)msg;
    
            if(null==message||null==message.getMessageHeader()){
                channelHandlerContext.fireChannelRead(msg);
                return;
            }
            if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PONG){
                logger.info("ClientHeartbeatHandler.channelRead0 ,pong data is:{}",message.getMessageBody());
            }
            else if(message.getMessageHeader().getType()== Constants.MESSAGE_TYPE_HEARTBEAT_PING){
                this.sendPong(channelHandlerContext);
            }
            else {
                channelHandlerContext.fireChannelRead(msg);
            }
    
        }

    空闲状态事件,可以根据不同的状态做不同的行为处理,定义三个可重写事件供客户端与服务端处理器具体确认处理事件。

    
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent e = (IdleStateEvent) evt;
                switch (e.state()) {
                    case READER_IDLE:
                        this.handleReaderIdle(ctx);
                        break;
                    case WRITER_IDLE:
                        this.handleWriterIdle(ctx);
                        break;
                    case ALL_IDLE:
                        this.handleAllIdle(ctx);
                        break;
                    default:
                        break;
                }
            }
        }

    客户端心跳处理器

    继承抽象心跳处理器,并重写事件发送PING消息。

    
    public class ClientHeartbeatHandler extends AbstractHeartbeatHandler {
    
        @Override
        protected void handleAllIdle(ChannelHandlerContext ctx) {
            this.sendPing(ctx);
        }
    }

    服务端心跳处理器

    继承抽象心跳处理器,并重写事件关闭通道。

    
    public class ServerHeartbeatHandler extends AbstractHeartbeatHandler {
    
        @Override
        protected void handleReaderIdle(ChannelHandlerContext ctx) {
            logger.info("ServerHeartbeatHandler.handleReaderIdle reader timeout ,close channel");
            ctx.close();
        }
    
    }

    客户端ChannelPipeline中加入心跳处理器

    比如5秒内未写入或者读取通道数据就触发超时事件。

    
    .addLast(new IdleStateHandler(0, 0, Constants.ALLIDLE_TIME_SECONDS));

    服务端ChannelPipeline中加入心跳处理器

    比如10秒未接收到通道消息就触发读超时事件。

    
     .addLast(new IdleStateHandler(Constants.READER_TIME_SECONDS, 0, 0))

    客户端消息示例

    正常情况下心跳消息显示如下图所示,消息的内容可以根据自己的情况自行定义。

    客户端下线消息示例

    停止客户端程序,然后服务端读超时事件触发,并关闭通道。

    客户端可用连接管理

    由于上述的服务端心跳处理器,在触发读超时后会关闭通信管道,这导致客户端缓存的连接状态会出现不可用的情况,为了让客户端一直只能取到可用连接就需要对从缓存中获取到的连接做状态判断,如果可用直接返回,如果不可用则将连接从可用列表中删除然后取下一个可用连接。

    修改获取连接方法

    通过channel的isActive属性可以判断连接是否可用,如果不可以做删除并重新获取的操作。

    
    public RpcClientInvoker getInvoker() {
            // ...
            int index = loadbalanceService.index(size);
            RpcClientInvoker invoker= RpcClientInvokerCache.get(index);
            if(invoker.getChannel().isActive()) {
                return invoker;
            }
            else {
                RpcClientInvokerCache.removeHandler(invoker);
                logger.info("invoker is not active,so remove it and get next one");
                return this.getInvoker();
            }
        }

    后台启动任务处理不可用连接

    启动一个每隔5秒执行一次任务的线程,定时取出不可用连接,然后重连,并将不可用连接删除。

    这里我处理的重连是直接丢弃原有不可用连接,然后重新创建新连接。

    
        private static final Logger logger = LoggerFactory.getLogger(RpcClientInvokerManager.class);
    
        static {
            executorService.schedule(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        List<RpcClientInvoker> notConnectedHandlers = RpcClientInvokerCache.getNotConnectedHandlers();
                        if (!CollectionUtils.isEmpty(notConnectedHandlers)) {
                            for (RpcClientInvoker invoker : notConnectedHandlers) {
                                RpcClientInvokerManager.getInstance(referenceConfig).connect();
                            }
                            RpcClientInvokerCache.clearNotConnectedHandler();
                        }
                    }
                }
            }, Constants.RECONNECT_TIME_SECONDS,TimeUnit.SECONDS);
    
        }

    本文源码

    https://github.com/jiangmin168168/jim-framework

    文中代码是依赖上述项目的,如果有不明白的可下载源码

    引用

    本文中的图取自于网格

    转载于:https://www.cnblogs.com/ASPNET2008/p/7615973.html

    展开全文
  • 电话之于短信、微信的个很大的...而短信或者微信,只知道消息发出去了,但对方是否收到,或者是否查看不清楚了。在通过网络通信的环境下,也是很难知道消息对方是否已经处理,因为要知道对方是否处理,依赖...
  • 电话之于短信、微信的个很大的不同点...而短信或者微信,只知道消息发出去了,但对方是否收到,或者是否查看不清楚了。 在通过网络通信的环境下,也是很难知道消息对方是否已经处理,因为要知道对方是否处...
  • 余链路,它们之间相互发送报文来告诉对方自己当前的状态,如果在指定的时间内未收到对方发送的报文,那么认为对方失效,这时需启动资源接管模块来接管运 行在对方主机上的资源或者服务。 高可用集群是指组通过...
  • UDP/TCP与fork

    千次阅读 2010-06-30 22:32:00
    因此不要指望对端能收到自己closesocket的消息,即使是有人想出用带外数据传输也是徒劳的,因此只能通过超时机制或者心跳机制来保活; 2.对于tcp,其比udp开销大,大大在协议头的空间开销和重传以及ip分段...
  • 目录 1. 概述 2. Timing wheel 原理 2.1 连接超时被踢掉的过程 ...应该用心跳消息来判断对方进程是否能正常工作,“踢掉空闲连接”只是权宜之计。如果个连接在连续几秒时间内没有收到数据,
  • 保持个 TCP 心跳,如果发现对方不在了,超时重复 1 步骤,重新建立联系。 整体的步骤和上述的一样,下面用代码展开: 搭建 UDP 模块 public UDPSocket(Context context) { this.mContext = context;
  • 两台服务器互为主备,这是他们之间还会互相发送报文来告诉对方自己的当前的状态,如果在指定的时间内未收到对方发送的心跳报文,那么,一方会认为对方失效或者是已经宕机了,这时每个运行正常的主机会启动自身...
  • 应该用心跳消息来判断对方进程是否能正常工作,“踢掉空闲连接”只是时权宜之计。我这里想顺便讲讲 shared_ptr 和 weak_ptr 的用法。如果个连接连续几秒钟(后文以 8s 为例)内没有收到数据,把它断开,为此有...
  • UDP穿透内网

    2020-05-02 20:05:57
    服务器收到后记录并交换对方地址发给客户端,因为nat中分配的外网地址是暂时的可能几秒后会变化,所以这里需要和服务器建立心跳机制,之后两个客服端拿到对方地址后发送穿透信息,此时第消息会被两个客户端的...
  • 对方发送任意消息给你时[包括request,response消息],smsgate会自动调用handler里的channelRead方法,因此可在此方法内接收消息并作处理业务,但避免作非常耗时的操作,会影响netty的处理效率,甚至完全耗完...
  • 果发送成功并且对方已经收到;会触发发送成功的事件; 三:用户可以通过ITxServer.FileLog记录服务器的运行信息; 2 ) :TCPClient客户端,通过静态方法TxStart.startClient(服务器地址或网址, 服务器端口号)注册成功...
  • 比如说服务发现框架<code>consul用了gossip协议( <a href="https://www.serf.io/">Serf)来做管理主机的关系以及集群之间的消息广播,Cassandra也用到了这个协议,用来实现一些节点发现、健康检查等。 ...

空空如也

空空如也

1 2
收藏数 23
精华内容 9
关键字:

一收到对方消息就心跳