精华内容
下载资源
问答
  • BIO NIO 粘包与拆包

    2020-07-08 21:06:52
    BIO NIO 粘包与拆包动机什么是粘包与拆包那为什么我们经常把粘包拆包问题与NIO一起谈在TCP相关的API中没有packet相关的概念NIO模式 与粘包拆包 动机 Netty 提供了大量的开箱即用的组件, 这些组件对使用者透明了很多...

    动机

    Netty 提供了大量的开箱即用的组件, 这些组件对使用者透明了很多技术实现细节, 粘包拆包就是其中很有趣的部分.

    本人在使用LengthFieldBasedFrameDecoder(在自定义通讯协议的场景下经常被使用: 作为TCP流 -> 业务数据包的拆包解析器)时对这个组件的参数还有用法产生了困惑,查阅了一些资料也阅读了源码…写这篇文章的目的就是为了梳理这一块的知识.

    什么是粘包与拆包

    这篇文章简明扼要解释了何为粘包,拆包TCP粘包,拆包及解决方法
    首先, 粘包与拆包这两个行为并非是NIO独有的 只要你的应用层协议是基于TCP协议拟定且进行长通讯,就一定会涉及拆包粘包开发者在写业务代码时往往感知不到的原因是底层框架已经实现了相关的细节 这一点与是否是BIO或NIO通讯无关. 个人认为粘包 拆包跟IO模式完全是两个不同维度的概念.

    为什么我们经常把粘包拆包问题与NIO一起谈

    在TCP相关的API中没有packet相关的概念

    严谨点说, TCP不存在packet这个概念(TCP中是以segment作为细粒度的单位), TCP相关的API往往提供的是读写流的获取与相关的操作, 以JAVA为例:

    • ServerSocket & Socket -> BIO模型
    • ServerSocketChannel & SocketChannel -> NIO模型

    无论是使用哪种API, 涉及都是流相关的操作, 开发者是感知不到所谓的TCP Packet对象(本就不存在这个)
    -> 从代码层面上讲一般情况下(HTTP1.0) BIO模式下一次完整的流读取 = 一次完整的HTTP请求

    为了增加说服力,本人以JLHTTP(基于BIO的一款轻量级Web容器)部分源码为例:

     //维护的基于BIO的ServerSocket对象
      ServerSocket serv = HTTPServer.this.serv; // keep local to avoid NPE when stopped
                    while (serv != null && !serv.isClosed()) {
                        final Socket sock = serv.accept();
                        executor.execute(new Runnable() {
                            public void run() {
                                try {
                                    try {
                                        sock.setSoTimeout(socketTimeout);
                                        sock.setTcpNoDelay(true); // we buffer anyway, so improve latency
                                        handleConnection(sock, sock.getInputStream(), sock.getOutputStream());
                                    } finally {
                                        try {
                                            // RFC7230#6.6 - close socket gracefully
                                            // (except SSL socket which doesn't support half-closing)
                                            if (!(sock instanceof SSLSocket)) {
                                                sock.shutdownOutput(); // half-close socket (only output)
                                                transfer(sock.getInputStream(), null, -1); // consume input
                                            }
                                        } finally {
                                            sock.close(); // and finally close socket fully
                                        }
                                    }
                                } catch (IOException ignore) {}
                            }
                        });
    
    
    

    关键代码是handleConnection(sock, sock.getInputStream(), sock.getOutputStream());这一行, 需要注意的是传递的sock对象是调用一次java.net.ServerSocket#accept获取的:

     protected void handleConnection(Socket socket, InputStream in, OutputStream out) throws IOException {
            in = new BufferedInputStream(in, 4096);
            out = new BufferedOutputStream(out, 4096);
            Request req;
            Response resp;
            do {
                // create request and response and handle transaction
                req = null;
                resp = new Response(out);
                try {
                    req = new Request(in);
                    req._remote = socket.getInetAddress();//xyj,201901,add remoteAddr
                    handleTransaction(req, resp);
                } catch (Throwable t) { 
                   //异常处理
                    ..............
                    ..............
                    ..............
                    break; // proceed to close connection
                } finally {
                    resp.close(); // close response and flush output
                }
                // consume any leftover body data so next request can be processed
                transfer(req.getBody(), null, -1);
                // RFC7230#6.6: persist connection unless client or server close explicitly (or legacy client)
            } while (!"close".equalsIgnoreCase(req.getHeaders().get("Connection"))
                && !"close".equalsIgnoreCase(resp.getHeaders().get("Connection")) && req.getVersion().endsWith("1.1"));
        }
    

    在handleConnection(…) 函数 用java.net.Socket#getInputStream(仅一次)后获取的InputStream对象中读取的数据流(read()函数读到-1为止)反序列化了一个完整的HTTP请求对象

    因此从代码层面上讲 BIO模式下的粘包拆包是伪命题 -> 代码层面上不需要考虑这个点

    NIO模式 与粘包拆包

    大家把NIO Netty 粘包与拆包放在一起聊的原因很可能是因为在一些场景下,从代码/实现层面上需要考虑部分/全部粘包/拆包细节

    首先, 思考一下为什么会有这个问题吧

    从TCP API 角度上看, 无论NIO还是BIO都是流的操作,怎么BIO下的伪命题在NIO下变成大问题了呢

    老生常谈, 浅谈一下多路复用
    以Netty这个NIO的封装框架为例, 线程模型分为 parent & child 两种, 分别对应处理连接与执行业务逻辑(此上下文中,业务还涵盖了相关的统一的处理等),需要注意的是,parent线程不再专属于某一个客户端(BIO下),而是N个客户端分享同一个parent线程(多路复用)
    即使parent线程获取了数据流并且封装成对应的流相关对象,那个对象也不再能映射成一个完整的HTTP请求/业务数据包了...极端场景下,对流的读取甚至返回null
    那么如何 粘包拆包
    简单的思维方式就是: 解析通过读取流反序列化的对象(e.g Netty中的ByteBuf), 看看这个对象中的二进制流(代码层面上的byte[])满不满足一个完整的业务数据包, 这里的业务数据包, 可以指一个完整的Http请求对象…也可以是特定协议下的对应的通讯模型

    ps. Netty自带了众多开箱即用的Decoder组件,建议允许下使用这些组件

    本文仅代表本人观点, 如果有不足地方烦请大佬们指正.

    参考:
    JLHTTP
    https://blog.csdn.net/wxy941011/article/details/80428470

    展开全文
  • Java NIOtcp粘包拆包

    2019-12-16 21:19:09
    tcp拆包,即tcp在发送数据时,可能会把一个tcp包拆成多个来发送 例如:客户端分两次给服务端发送了两个消息"ABCD" 和 “EFG” 1)服务端可能收到三个数据包,分别是"AB", “CD”, “EFG”,即第一个数据包被拆包成...

    一 ByteToMessageDecoder
    1.1 实例
    ByteToMessageDecoder,用于把一个byte流转换成一个对象,实例:

    public class StringDecoder extends ByteToMessageDecoder {
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, 
    List<Object> out) throws Exception {        byte[] bytes = new byte[in.readableBytes()];
            in.readBytes(bytes);
            out.add(new String(bytes));
    }
    }

    它有一个抽象方法decode,我们实现了这个方法,这个方法的第三个参数是一个List,所有加入这个List的对象都会被逐一的调用fireChannelRead方法映射事件。
    使用方法:ByteToMessageDecoder其实就是一个ChannelInboundHandler,直接加入到Pipeline即可:

            serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                         protected void initChannel(SocketChannel socketChannel) throws Exception {
                             socketChannel.pipeline().addLast(new StringDecoder());
                            //...
                        }
                        });

    这样,ByteBuf数据到达这个Handler之后,会被转成String,然后继续传递数据。
    1.2 实现
    ByteToMessage Decoder是个抽象类,它继承了ChannelInboundHandler,做了以下逻辑:
    1)重写父类的channelRead方法,在这个方法中,把ByteBuf数据交给子类decode方法处理,decode的方法定义如下:

    protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

    它的第二个参数为需要处理的ByteBuf,第三个参数为一个List,用于记录处理后的数据。
    2)子类decode方法返回会,它会遍历参数中的List,把里面的对象依次取出来调用fireChannelRead方法传递事件。
    3)如果decode方法没有把ByteBuf读取完,则会记录这次的ByteBuf对象,然后下一次处理消息时,会把下一次的ByteBuf和这次的ByteBuf合并,然后再交给子类decode处理。
    关于第三点,这个逻辑的目的是为了方便处理TCP的粘包和拆包
    1.3 源码
    源码从ByteToMessageDecoder的channelRead方法开始
    步骤一:把当前的ByteBuf与上次未处理的ByteBuf合并:

    ByteBuf data =(ByteBuf)msg;
    first = curdlation ==null;
    if (first){
       cumulation = data;
    }else{
        cumulation = cumulator.cumulate(ctx.alloc(),cumulation,data);
    }

    例如:上次未处理的ByteBuf是[1,2,3,4,5,0,0],这一次的ByteBuf是[6,7,8],处理完之后结果是[1,2,3,4,5,6,7,8]
    注:可以会有一个疑问是:为什么ByteBuf会扩容成了8,而不是64?因为这里没有使用ByteBuf的扩容逻辑,而是自己实现了一套。
    步骤二:处理子类decode后添加到List中的数据:

        int outSize = out.size();
                    if (outSize > 0) {
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
      }

    为啥还没调用子类的decode方法就要处理List了?因为这个逻辑是在循环里的,简化代码表示即:

    while (in.isReadable()) { 
                 fireChannelRead(ctx, out, outSize);
                 decode(ctx, in, out);
    }

    所以每次循环时处理的都是上一次循环后子类添加到List中的数据。
    步骤三:处理子类decode后的ByteBuf

     //这个if条件能进去,说明已经读完了
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        numReads = 0;
                        discardSomeReadBytes();
                    }

    这一步还有一个逻辑:如果连续16次都没处理ByteBuf,则会把ByteBuf中的数据压缩一次。
    顺便给大家推荐一个Java技术交流群:473984645里面会分享一些资深架构师录制的视频资料:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多!
    二 .tcp 拆包粘包问题
    2.1 问题描述
    tcp粘包,即tcp在发送数据时,可能会把两个tcp包合并成一个发送
    tcp拆包,即tcp在发送数据时,可能会把一个tcp包拆成多个来发送
    例如:客户端分两次给服务端发送了两个消息"ABCD" 和 “EFG”
    1)服务端可能收到三个数据包,分别是"AB", “CD”, “EFG”,即第一个数据包被拆包成了两个
    2)服务端可能只会收到一个数据包:“ABCDEFG”,即两个数据包被合并成了一个包
    3)服务端甚至可能会收到"ABC", “DEFG”,会拆包再粘包
    2.2 产生的原因
    分为以下三个原因:
    1)socket缓冲区造成的粘包:
    每个socket都有一个发送缓存区与接收缓冲区,客户端向服务端写数据时,实际上是写到了服务端socket的接收缓冲区中。
    服务端调用read方法时,其实只是把接收缓冲区的内容读取到内存中了。因此,服务端调用read方法时,可能客户端已经写了两个包到接收缓冲区中了,因此read到的数据其实是两个包粘包后的数据。
    2)MSS/MTU限制导致的拆包
    MSS是指TCP每次发送数据允许的最大长度,一般是1500字节,如果某个数据包超过了这个长度,就要分多次发送,这就是拆包。
    3)Nagle算法导致的粘包
    网络数据包都是要带有数据头部的,通常是40字节,假如我们发送一个字节的数据,也要加上这40个字节的头部再发送,显然这样是非常不划算的。
    所以tcp希望尽可能的一次发送大块的数据包,Nagle算法就是做这个事的,它会收集多个小数据包,合并为一个大数据包后再发送,这就是粘包。
    2.3 解决办法
    通常,解决tcp粘包拆包问题,是通过定义通信协议来实现的:
    定长协议
    即规定每个数据包的长度,假如我们规定每个数据包的长度为3,假如服务端收到客户端的数据为:“ABCD”, “EF”,那么也可以解析出实际的数据包为"ABC", “DEF”。
    特殊分隔符协议
    即规定每个数据包以什么样的字符结尾,如规定以 符 号 结 尾 , 假 如 服 务 端 收 到 的 数 据 包 为 : " A B C D 符号结尾,假如服务端收到的数据包为:"ABCD "ABCDEF", “G$”,那么可以解析出实际数据包为:“ABCD”, “EFG”。这种方式要确保消息体中可能会出现分隔符的情况。
    长度编码协议
    即把消息分为消息头和消息体,在消息头中包含消息的长度关于tcp粘包拆包的内容,这有篇文章讲得非常好,强推:TCP粘包、拆包与通信协议
    三 Netty中解决tcp粘包拆包问题的方法
    3.1 自定义一个tcp粘包拆包处理器
    基于ByteToMessageDecoder,我们可以很容易的实现处理tcp粘包拆包问题的Handler,以定长协议为例,我们来实现一个定长协议的tcp粘包拆包处理器:

    public class LengthDecoder extends ByteToMessageDecoder {
        private int length;
    
        public LengthDecoder(int length) {
            this.length = length;
        }
    
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            while (in.readableBytes() >= length) {
                byte[] buff = new byte[length];
                in.readBytes(buff);
                out.add(new String(buff));
                }
            }
        }
       

    在decode方法中,我们循环判断,如果ByteBuf中未读的数据量大于指定的长度length,我们就读到lenght个数据,然后转成字符串加入到List中。
    后续ByteToMessageDecoder依次把List中的数据fireChannelRead传递事件。
    如果ByteBuf中的未读数据不够length,说明发生了拆包,后续还有数据,这里直接不处理即可,ByteToMessageDecoder会帮我们记住这次的ByteBuf,下一次数据来了之后,会跟这次的数据合并后再处理。
    3.2 Netty中自带的tcp粘包拆包处理器
    Netty中实现了很多种粘包拆包处理器:
    1)FixedLengthFrameDecoder:与我们上面自定义的一样,定长协议处理器
    2)DelimiterBasedFrameDecoder:特殊分隔符协议的处理器
    3)LineBasedFrameDecoder:特殊分隔符协议处理器的一种特殊情况,行分隔符协议处理器。
    4)JsonObjectDecoder:json协议格式处理器
    5)HttpRequestDecoder:http请求体协议处理器
    6)HttpResponseDecoder:http响应体处理器,很明显这个是用于客户端的
    文章较长,感谢您的阅读。对文章如有疑问,欢迎提出。望分享的内容对大家有所帮助。搜集整理了一些Java资料,包括Java进阶学习路线以及对应学习资料,还有一些大厂面试题,需要的朋友可以自行领取:Java高级架构学习资料分享+架构师成长之路
    顺便给大家推荐一个Java技术交流群:473984645里面会分享一些资深架构师录制的视频资料:有Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构等这些成为架构师必备的知识体系。还能领取免费的学习资源,目前受益良多!

    展开全文
  • 概述 TCP传输协议是面向连接的,面向流提供高可靠的服务。收发两端(服务端和客户端)都要有一一...由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是粘包、拆包问题。 示意图: 说明: 服务端给客户端

    概述
    TCP传输协议是面向连接的,面向流提供高可靠的服务。收发两端(服务端和客户端)都要有一一成对的socket,因此,发送端为了将多个发给接收端的包,更有效地发给对方,使用了优化算法(Nagle算法),将多次间隔时间较小且数据较小的数据包,合成一个大的数据块,然后进行封包,这样做虽然提高了传输的效率,但是这样接收端就难以分辨出一个个完整的包的大小了,因为面向流的通信时无消息保护边界的。

    由于TCP无消息保护边界,需要在接收端处理消息边界问题,也就是粘包、拆包问题。

    示意图:

    说明:

    服务端给客户端发送D1、D2两个数据包。
    第一种情况是服务端两次都读取到的是两个独立的数据包,无拆包粘包问题,可以正常解析。
    第二种情况D1、D2包合并在一起一次发送,服务端一次性接收了两个数据包,分别是D1、D2,我们称之为粘包,因为不知道边界,所以服务端不知如何拆出来解析。
    第三种情况,服务端第一次接收到了完整D1包和部分D2包,第二次接收到了剩余的D2包,D2包被拆开来分多次发送了,我们称之为拆包。
    第四种情况,服务端第一次接收到了部分D1包,第二次接收到了剩余部分D1包和完整D2包,也称之为拆包。
    总的来说拆包和粘包问题,因为没有边界,最终会在正常情况下导致接收方无法分辨出一个一个包。

    粘包实例
    以下实例证实了粘包的存在:
    服务端及服务端的Handler:

    public class NettyTcpServer {


        private  int port;

        public NettyTcpServer(int port){
            this.port = port;
        }


        public void start() throws InterruptedException {

            NioEventLoopGroup bossGroup = new NioEventLoopGroup();
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();

            ServerBootstrap serverBootstrap = new ServerBootstrap();

            serverBootstrap.group(bossGroup,workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpServerChannelHandler());
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(port);
            channelFuture.channel().closeFuture().sync();
        }

        public static void main(String[] args) throws InterruptedException {
            NettyTcpServer nettyTcpServer = new NettyTcpServer(8989);
            nettyTcpServer.start();
        }
    }


    public class NettyTcpServerChannelHandler extends ChannelInboundHandlerAdapter {

        private int count;
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("数据是:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("count = " + (++count));
            super.channelRead(ctx, msg);
        }
    }

    客户端及客户端的Handler

    public class NettyTcpClient {

        private  int serverPort;


        public NettyTcpClient(int serverPort){
            this.serverPort = serverPort;
        }

        public void start() throws InterruptedException {
            NioEventLoopGroup worker = new NioEventLoopGroup();

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(worker)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new NettyTcpClientChannelHandler());
                        }
                    });

            ChannelFuture connect = bootstrap.connect("127.0.0.1", serverPort);
            connect.channel().closeFuture().sync();
        }

        public static void main(String[] args) throws InterruptedException {
            NettyTcpClient nettyTcpClient = new NettyTcpClient(8989);
            nettyTcpClient.start();
        }
    }


    public class NettyTcpClientChannelHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            //发生这个事件代表通道已经顺利连接到远端,可以收发数据。我们就在这里发送数据、
            //分十次发送
            for (int i = 0;i<10;i++){
                ctx.channel().writeAndFlush(Unpooled.copiedBuffer("Hello I am John;", CharsetUtil.UTF_8));
            }
            ctx.fireChannelActive();
        }
    }

    执行结果:

    这个是服务端的输出结果,如果是没有拆包粘包的话,客户端分十次发送了数据,那么服务端也应该分十次接收,count应该等于10,但是此时count=1,然后数据一次性显示出来了 ,说明客户端的数据时把十次合并成一个数据包发送的,数据粘包现象。
     

    展开全文
  • # 解决tcp拆包,粘包问题 * 方案一:消息定长 * 方案二:在包尾部加上特殊字符进行分割 * 方案三:将消息分为消息头,消息体,类似与自定义协议 此处示例在尾部添加特殊字符解决拆包粘包问题 在 ServerBootstrap的...
    # 解决tcp拆包,粘包问题
    * 方案一:消息定长
    * 方案二:在包尾部加上特殊字符进行分割
    * 方案三:将消息分为消息头,消息体,类似与自定义协议

    此处示例在尾部添加特殊字符解决拆包粘包问题

    在 ServerBootstrap的childHandler方法里面(initChannel)添加 

    // 设置特殊分隔符
    ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
    // 设置字符串形式的解码
    sc.pipeline().addLast(new StringDecoder());

    算了,直接看代码吧:

    Client.java

    package zzq.neety02;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
        public static void main(String[] args) throws InterruptedException {
    
            // 创建线程组
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            // 创建辅助工具类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group) //绑定线程组
                    .channel(NioSocketChannel.class) // 指定 NIO 模式
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 设置特殊分隔符
                            ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
                            sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
                            // 设置字符串形式的解码
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new ClientHandler()); // 在这里配置数据接收处理的方法(类)
                        }
                    });
    
            // 绑定连接地址
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8888).sync();
    
            // 向服务端发送消息
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,一加一对于多少$end$".getBytes()));
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,1234$end$".getBytes()));
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,5678$end$".getBytes()));
    
            channelFuture.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    
     

    ClientHandler.java

    package zzq.neety02;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ClientHandler extends ChannelHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println( cause.toString() );
            System.out.println("出错了...");
            ctx.close();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println( "注册成功..." );
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println( "激活成功..." );
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String resp = (String) msg;
    
            System.out.println(  "收到了消息:" + msg );
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("我读完了...");
        }
    }
    

     

    Server.java

    package zzq.neety02;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    import java.nio.Buffer;
    
    public class Server {
        public static void main(String[] args) throws InterruptedException {
            //1 创建线两个程组
            //一个是用于处理服务器端接收客户端连接的
            //一个是进行网络通信的(网络读写的)
            NioEventLoopGroup pGroup = new NioEventLoopGroup();
            NioEventLoopGroup cGroup = new NioEventLoopGroup();
    
            //2.创建辅助工具类,用于服务器通信的一系列配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(pGroup,cGroup) // 绑定两个线程组
            .channel(NioServerSocketChannel.class) // 指定NIO模式
            .option(ChannelOption.SO_KEEPALIVE , true) // 设置保持连接
            .childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    // 设置特殊分隔符
                    ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
                    // 设置字符串形式的解码
                    sc.pipeline().addLast(new StringDecoder());
    
                    // 3. 在这里配置具体的数据处理方法
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
    
            //4 进行绑定
            ChannelFuture cf1 = bootstrap.bind(8888).sync();
            //ChannelFuture cf2 = b.bind(8764).sync();
            //5 等待关闭
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    

    ServerHandler.java

    package zzq.neety02;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.Base64;
    import java.util.Date;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出现异常
            System.out.println( cause.toString() );
            ctx.close();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("netty is register...");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("netty is active...");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String request = (String)msg;
            System.out.println("客户端发来消息:"+ request );
            String resp = "服务端响应:收到了你的消息==>" + new Date() + "$end$";
    
            ctx.writeAndFlush( Unpooled.copiedBuffer(resp.getBytes()) );
            System.out.println("返回数据:" + resp );
         }
    
    
    }
    

     

    展开全文
  • Netty-TCP拆包/粘包

    2020-04-17 19:35:12
    Netty-TCP拆包/粘包 TCP拆包/粘包 TCP 是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。就像水流一样,你没法知道什么时候开始,什么时候结束。所以它会根据当前的套接字缓冲区的情况进行拆包或是粘包...
  • Netty自定义TCP拆包解码器 定义开始、结束标识字节数组 这里使用###START###作为开始标识,###STOP###作为结束标识,代码如下: public class CodecConstants { public static final byte[] BEGIN_DELIMITER = ...
  • Netty之Tcp拆包粘包

    2018-04-19 11:07:35
    1、TCP粘包、拆包问题 1.1TCP粘包/拆包问题 TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,...
  • 什么是TCP拆包、粘包? 在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现...
  • 1.TCP拆包、粘包问题 tcp编程,无论是服务器端还是客户端,当我们读取或者发送数据的时候,都需要考虑TCP底层的粘包/拆包机制。TCP是一个“流”协议,所谓流就是没有界限的遗传数据。可以想象下河里的水就好比数据...
  • // (1) ByteToMessageDecoder 是 ChannelInboundHandler 的一个实现,在收到请求时会得到执行 public class TimeDecoder extends ByteToMessageDecoder { // (2) decode() 方法会将所有接收到的数据放入一个内部的...
  • 三次握手、四次挥手 tcp连接要经历三次握手,断开要经历...NIOTCP点对点的通信基础上,服务端实例化一个ServerSocketChannel,创建一个多路复用Selector,客户端实例化SocketChannel,注册到Selector,Selector...
  • 然后将拆分后的分别发送,而我们接收时要获取发送时的数据报,如何再对其拆分与组装,以便于我们能知道报文的意思,这个提取报文的过程就是TCP拆包与粘包,在我们自己做底层的通信设计时,这是必须要考虑的。...
  • 文章目录异常情况模拟服务器改造客户端改造运行结果拆包粘包原因分析拆包粘包解决办法LineBasedFrameDecoder 换行符分隔消息服务端修改客户端修改运行结果原理分析DelimiterBasedFrameDecoder 固定分隔符解码器...
  • tcp是个流协议,所谓流,就是没有界限的一串数据。tcp底层并不了解上层...这就是所谓的tcp拆包/粘包问题。问题发生的原因有3个: 1,应用程序write写入的字节大小大于套接口发送缓冲区大小 2,进行MSS大小的tcp分段
  • 解决Netty中TCP拆包、粘包问题

    千次阅读 2018-08-21 12:18:13
    在基于流的传输里比如TCP/IP,接收到的数据会先被存储到一个socket接收缓冲里。不幸的是,基于流的传输并不是一个数据包队列,而是一个字节队列。即使你发送了2个独立的数据包,操作系统也不会作为2个消息处理而仅仅...
  • 最近通过学习@李林峰 大神的 《netty权威指南》 对Java io 有了进一步的了解,我对其代码加了一些注解和个人的看法。 对netty有了进一步的了解,所以后面有一些对rocketmq和dubbo中协议...书中的粘包拆包就是这
  • //我要指定使用NioServerSocketChannel这种类型的通道 .channel(NioServerSocketChannel.class) //一定要使用 childHandler 去绑定具体的 事件处理器 .childHandler(new ChannelInitializer&...
  • TCP协议是面向流的协议,是流式的,没有业务上的分段,只会根据当前套接字缓冲区的情况进行拆包或者粘包: 发送端的字节流都会先传入缓冲区,再通过网络传入到接收端的缓冲区中,最终由接收端获取。 2、TCP粘包和...
  • server端接收到13条就报错,怀疑发生了tcp粘包与拆包。 二、自定义协议 我们通过自定义协议来解决tcp粘包拆包问题 首先定义一个常量,用来标记开始位 public class TcpPackageProtocolHead { public ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 4,060
精华内容 1,624
关键字:

nio实现tcp拆包