精华内容
参与话题
问答
  • Netty-TCP拆包/粘包

    2020-04-17 19:35:12
    Netty-TCP拆包/粘包 TCP拆包/粘包 TCP 是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。就像水流一样,你没法知道什么时候开始,什么时候结束。所以它会根据当前的套接字缓冲区的情况进行拆包或是粘包...

    Netty-TCP拆包/粘包

    TCP拆包/粘包

    TCP 是一个面向字节流的协议,它是性质是流式的,所以它并没有分段。就像水流一样,你没法知道什么时候开始,什么时候结束。所以它会根据当前的套接字缓冲区的情况进行拆包或是粘包

    粘包问题图示:

    图1.png

    客户端发送两个数据包D1&D2给服务端,因为服务端一次读取的字节数是不确定的,所以可能出现:

    • 正常情况,服务端分两次读取到了两个独立的数据包

    • 服务端一次收到两个数据包,两个粘合在了一起,出现粘包现象

    • 服务端分两次读取到了两个数据包,第一次读取到完整的D1包&部分D2包的内容,第二次读取到了D2剩余内容,出现拆包现象

    • 服务端分两次读取到了两个数据包,第一次读取D1的部分,第二次读取了D1剩余内容以及完整D2

    TCP拆包/粘包发送原因

    图示:

    图2.png

    三个原因:

    • 应用程序write写入的字节大小大于套接口发送缓冲区的大小

    • 进行MSS大小的TCP分段

    • 以太网帧的payload大于MTU进行IP分片

    例子

    未考虑TCP粘包的情况

    
    public class Client {
    
        public void connect(String host, int port) throws InterruptedException {
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
    //                    .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
    //                            p.addLast(new LineBasedFrameDecoder(1024));
    //                            p.addLast(new StringDecoder());
    //                            p.addLast(new StringEncoder());
    
                                p.addLast(new ClientHandler());
                            }
                        });
    
                ChannelFuture future = b.connect(host, port).sync();
    
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new Client().connect("localhost",9988);
        }
    
    
    }
    
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
    //    private Logger logger = LoggerFactory.getLogger(getClass());
    
        private int count =0;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send the message to Server
            ByteBuf buf = null;
            for(int i=0; i<100; i++){
    
                String msg = "hello from client "+i + "\n";
                byte[] r = msg.getBytes();
                buf = Unpooled.buffer(r.length);
                buf.writeBytes(r);
               ctx.writeAndFlush(buf);
    //            System.out.println("client send message:{}   " + msg);
    
    
    //            ctx.writeAndFlush(msg+System.getProperty("line.separator"));
            }
    
            System.out.println("out");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String body = (String) msg;
            count++;
            System.out.println("client read msg:{}, count:{}   " + body + "    " + count);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    public class Server {
    
    //    private Logger logger = LoggerFactory.getLogger(getClass());
    
        public void bind(int port) throws Exception {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
    //                    .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
    //                            p.addLast(new LineBasedFrameDecoder(1024));
    //                            p.addLast(new StringDecoder());
    //                            p.addLast(new StringEncoder());
    
                                p.addLast(new ServerHandler());
                            }
                        });
    
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync(); // (7)
    
                System.out.println("server bind port:{}    "+ + port);
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new Server().bind(9988);
        }
    }
    
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    //    private Logger logger = LoggerFactory.getLogger(getClass());
    
        private int count = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            byte[] req = new byte[buf.readableBytes()];
    //        System.out.println("len  " + req.length);
            buf.readBytes(req);
            String body = new String(req, "UTF-8");
    
            System.out.println("============package=====================");
            System.out.println(body);
            System.out.println("============package=====================");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    
    
    
    
    
    
    

    运行输出:

    
    ============package=====================
    hello from client 0
    hello from client 1
    hello from client 2
    hello from client 3
    hello from client 4
    hello from client 5
    hello from client 6
    hello from client 7
    hello from client 8
    hello from client 9
    hello from client 10
    hello from client 11
    hello from client 12
    hello from client 13
    hello from client 14
    hello from client 15
    hello from client 16
    hello from client 17
    hello from client 18
    hello from client 19
    hello from client 20
    hello from client 21
    hello from client 22
    hello from client 23
    hello from client 24
    hello from client 25
    hello from client 26
    hello from client 27
    hello from client 28
    hello from client 29
    hello from client 30
    hello from client 31
    hello from client 32
    hello from client 33
    hello from client 34
    hello from client 35
    hello from client 36
    hello from client 37
    hello from client 38
    hello from client 39
    hello from client 40
    hello from client 41
    hello from client 42
    hello from client 43
    hello from client 44
    hello from client 45
    hello from client 46
    hello from client 47
    hello from client 48
    hello
    ============package=====================
    ============package=====================
     from client 49
    hello from client 50
    hello from client 51
    hello from client 52
    hello from client 53
    hello from client 54
    hello from client 55
    hello from client 56
    hello from client 57
    hello from client 58
    hello from client 59
    hello from client 60
    hello from client 61
    hello from client 62
    hello from client 63
    hello from client 64
    hello from client 65
    hello from client 66
    hello from client 67
    hello from client 68
    hello from client 69
    hello from client 70
    hello from client 71
    hello from client 72
    hello from client 73
    hello from client 74
    hello from client 75
    hello from client 76
    hello from client 77
    hello from client 78
    hello from client 79
    hello from client 80
    hello from client 81
    hello from client 82
    hello from client 83
    hello from client 84
    hello from client 85
    hello from client 86
    hello from client 87
    hello from client 88
    hello from client 89
    hello from client 90
    hello from client 91
    hello from client 92
    hello from client 93
    hello from client 94
    hello from client 95
    hello from client 96
    hello from client 97
    hello from client 98
    hello from client 99
    
    ============package=====================
    

    客户端发送的100条消息被当成了两个数据包进行处理,说明发送了粘包现象

    使用LineBasedFrameDecoder + StringDecoder 解决问题

    LineBasedFrameDecoder

    文档:

    public class LineBasedFrameDecoder
    extends ByteToMessageDecoder
    

    在行尾拆分接收到的ByteBuf的解码器,“ \ n”和“ \ r \ n”都被处理,字节流应采用UTF-8字符编码或ASCII。 当前实现使用直接字节进行字符转换,然后将该字符与一些低范围的ASCII字符(例如’\ n’或’\ r’)进行比较。 UTF-8没有将低范围[0…0x7F]字节值用于多字节代码点表示,因此此实现完全支持。

    LineBasedFrameDecoder 的工作原理是它依次遍历 ByteBuf 中的可读字节,判断看是否有 "\n” 或者 "\r\n”,如果有,就以此位置为结束位置,从可读索引到结束位置区间的字节就组成了一行。它是以回车换行符为结束标记的解码器,支持配置单行的最大长度,如果连续读取到最大长度后仍然没有发现换行符,会抛出异常,同时忽略掉之前读取到的异常码流。

    StringDecoder

    public class StringDecoder
    extends MessageToMessageDecoder<ByteBuf>
    

    将收到的ByteBuf解码为字符串。 请注意,如果使用的是基于流的传输方式(例如TCP / IP),则此解码器必须与适当的ByteToMessageDecoder(例如DelimiterBasedFrameDecoder或LineBasedFrameDecoder)一起使用。 TCP / IP套接字中基于文本的线路协议的典型设置为:

     ChannelPipeline pipeline = ...;
    
     // Decoders
     pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
     pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    
     // Encoder
     pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
     
    and then you can use a String instead of a ByteBuf as a message:
     void channelRead(ChannelHandlerContext ctx, String msg) {
         ch.write("Did you say '" + msg + "'?\n");
     }
    
    

    StringEncoder

    public class StringEncoder
    extends MessageToMessageEncoder<java.lang.CharSequence>
    
    

    将请求的字符串编码为ByteBuf。 TCP / IP套接字中基于文本的线路协议的典型设置为:

     ChannelPipeline pipeline = ...;
    
     // Decoders
     pipeline.addLast("frameDecoder", new LineBasedFrameDecoder(80));
     pipeline.addLast("stringDecoder", new StringDecoder(CharsetUtil.UTF_8));
    
     // Encoder
     pipeline.addLast("stringEncoder", new StringEncoder(CharsetUtil.UTF_8));
     
    and then you can use a String instead of a ByteBuf as a message:
     void channelRead(ChannelHandlerContext ctx, String msg) {
         ch.write("Did you say '" + msg + "'?\n");
     }
    
    

    应用:

    public class Server {
    
    //    private Logger logger = LoggerFactory.getLogger(getClass());
    
        public void bind(int port) throws Exception {
    
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
    //                    .option(ChannelOption.SO_BACKLOG, 1024)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new LineBasedFrameDecoder(1024));
                                p.addLast(new StringDecoder());
    //                            p.addLast(new StringEncoder());
    
                                p.addLast(new ServerHandler());
                            }
                        });
    
                // Bind and start to accept incoming connections.
                ChannelFuture f = b.bind(port).sync(); // (7)
    
                System.out.println("server bind port:{}    "+ + port);
    
                // Wait until the server socket is closed.
                f.channel().closeFuture().sync();
    
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new Server().bind(9988);
        }
    }
    
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    //    private Logger logger = LoggerFactory.getLogger(getClass());
    
        private int count = 0;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            String body = (String) msg;
            System.out.println("");
            System.out.println("============package=====================");
            System.out.println(body);
            System.out.println("============package=====================");
            System.out.println("");
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    
    }
    
    
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        private int count =0;
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // Send the message to Server
            ByteBuf buf = null;
            for(int i=0; i<100; i++){
    
                String msg = "hello from client "+i + "\n";
                byte[] r = msg.getBytes();
                buf = Unpooled.buffer(r.length);
                buf.writeBytes(r);
               ctx.writeAndFlush(buf);
            }
    
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg)
                throws Exception {
            String body = (String) msg;
            count++;
            System.out.println("client read msg:{}, count:{}   " + body + "    " + count);
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
                throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    
    
    
    public class Client {
    
        public void connect(String host, int port) throws InterruptedException {
    
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group)
                        .channel(NioSocketChannel.class)
    //                    .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
    
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new LineBasedFrameDecoder(1024));
                                p.addLast(new StringDecoder());
    //                            p.addLast(new StringEncoder());
    
                                p.addLast(new ClientHandler());
                            }
                        });
    
                ChannelFuture future = b.connect(host, port).sync();
    
                future.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) throws Exception {
    
            new Client().connect("localhost",9988);
        }
    
    
    }
    
    
    
    
    展开全文
  • TCP拆包粘包问题

    2018-12-08 11:59:00
    对于刚接触TCP网络编程的人有时候碰到一些问题,比如当客服端发送一串消息到服务端,服务端...这就是今天要讲的TCP拆包粘包现象。 拆包粘包产生的原因 我们可以通过以下图进行说明 1.图一是正常的情况下包的发送和...

    对于刚接触TCP网络编程的人有时候碰到一些问题,比如当客服端发送一串消息到服务端,服务端只收到消息的一半,或者当连续发送两个消息到服务端,服务端同时收到这两个消息但无法解析。这就是今天要讲的TCP拆包粘包现象。

    拆包粘包产生的原因

    我们可以通过以下图进行说明

    1.图一是正常的情况下包的发送和接受,客户端发送p1,p2包,服务端先后接受到p1,p2包,没有发生粘包和拆包。

    2.图二是发生了拆包的现象。客户端发送p1,p2包,客户端对p1拆包分成p1_1和p1_2,服务端先后收到p1_1,p1_2和p2包。 拆包发生原因分2种情况:

    • (1)发送的数据大于套接字缓冲区剩余大小。
    • (2)发送的数据大于MTU(最大传输单元)大小。

    在TCP通讯协议中TCP的每个包的头的长度都是固定的,总长度不能超过MTU(最大传输单元),且数据长度不能超过MSS(MSS=MTU-20bytes(IP包头)-20bytes(TCP包头))。如果超过了MTU系统会进行拆包处理。以图二举个例子:

    • (1)假设MTU设置的长度为1500bytes则MSS为1460bytes。
    • (2)客户端发送了p1包数据大小2000bytes。
    • (3)系统判断总长度超过了MTU大小,需要拆包处理。
    • (4)拆成2个包p1_1和p1_2,p1_1的总长度=1460+20+20=1500,p1_2的总长度=2000-1460+20+20=580。
    • (5)发送包p1_1和包p1_2。

    3.图三是发生了粘包的现象。客户端发送p1,p2包,p1,p2包到达接收端的缓存,服务端应用读取缓存时无法区分p1,p2各自的大小。因为在TCP通讯协议中TCP是面向流的,包和包之间没有界限。粘包可发生在发送端也可发生在接收端以图三各举例子:

    • (1)发送端原因导致的粘包,客户端在发送p1包时,先将p1包放入发送缓存,由于Nagle算法判断其发送的可用数据(去头数据)过小等待一小段时间,这时又发送了p2包,系统将p1和p2合成一个大包发送给服务端。服务端读到大包,无法区分p1和p2包。
    • (2)接收端原因导致的粘包,服务端缓存接收到客户端发送的p1包,服务端应用未能及时读取缓存,此时服务端缓存又接收到客户端发送的p2包,服务端应用读取缓存,无法区分p1和p2包。

    解决方案

    无论拆包还是粘包本质问题都是无法区分包界限,解决包界限的问题主要有以下几种方式:

    • (1)消息数据的定长,比如定长100字节,不足补空格,接收方收到后解析100字节数据即为完整数据。但这样的做的缺点是浪费了部分存储空间和带宽。
    • (2)消息数据使用特定分割符区分界限,比如使用换号符号做分割。
    • (3)把消息数据分成消息头和消息体,消息头带消息的长度,接收方收到后根据消息头中的长度解析数据。

    在实际开发中很多网络框架对TCP拆包粘包问题的解决做了很多支持,比如netty中LineBasedFrameDecoder解析器就是利用换号符号做分割。

    转载于:https://my.oschina.net/u/945573/blog/2982666

    展开全文
  • Dubbo处理TCP拆包粘包问题 在TCP网络传输工程中,由于TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么如果合理的decode接受的TCP数据很重要,需要考虑...

    Dubbo处理TCP拆包粘包问题

    在TCP网络传输工程中,由于TCP包的缓存大小限制,每次请求数据有可能不在一个TCP包里面,或者也可能多个请求的数据在一个TCP包里面。那么如果合理的decode接受的TCP数据很重要,需要考虑TCP拆包和粘包的问题。我们知道在Netty提供了各种Decoder来解决此类问题,比如LineBasedFrameDecoder,LengthFieldBasedFrameDecoder等等,但是这些都是处理一些通用简单的协议栈,并不能处理高度自定义的协议栈。由于dubbo协议是自定义协议栈,并且包含消息头和消息体两部分,而消息头中包含消息类型、协议版本、协议魔数以及payload长度等信息。所以使用Netty自带的处理方案可能无法满足Dubbo解析自身协议的需求,所以需要Dubbo自己来处理,那自己处理,就需要自己处理TCP的拆包和粘包的问题。这里就对Dubbo处理此类问题进行探讨,从而加深自己对它的理解。

    说明

    此处所描述的协议是dubbo协议,其他的协议比如http,webservice等协议不是这里讨论范围。并且这里使用的通信框架以Netty来讲解,Mina以及grizzly也不在种类讨论范围。

    NettyCodecAdapter

    NettyCodecAdapter是对dubbo协议解析的入口,里面包含decoder和encoder两部分,而TCP的拆包和粘包主要是decoder部分,所以encoder这里不进行讨论。在NettyCodecAdapter中的decoder是由InternalDecoder来实现,它的父类是Netty的SimpleChannelUpstreamHandler可以接受所有inbound消息,那么就可以对接受的消息进行decode。这里需要说明一下对于某一个Channel都有一个私有的InternalDecoder对象,并不是和其他的Channel共享,这里就避免了并发问题,所以在InternalDecoder里面可以用单线程的方式去看待,这样就比较容易理解。

    InternalDecoder

    每个channel的inbound消息都会发送到InternalDecodermessageReceived方法,而dubbo会先将接受的消息缓存到InternalDecoderbuffer属性中,这个变量很重要,后面会讨论。下面是messageReceived方法中将接受的消息负载到buffer实现。

          private class InternalDecoder extends SimpleChannelUpstreamHandler {
    
            private com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
    
            @Override
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
                Object o = event.getMessage();
                if (! (o instanceof ChannelBuffer)) {
                    ctx.sendUpstream(event);
                    return;
                }
    
                ChannelBuffer input = (ChannelBuffer) o;
                int readable = input.readableBytes();
                if (readable <= 0) {
                    return;
                }
    
                com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
                if (buffer.readable()) {
                    if (buffer instanceof DynamicChannelBuffer) {
                        buffer.writeBytes(input.toByteBuffer());
                        message = buffer;
                    } else {
                        int size = buffer.readableBytes() + input.readableBytes();
                        message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
                            size > bufferSize ? size : bufferSize);
                        message.writeBytes(buffer, buffer.readableBytes());
                        message.writeBytes(input.toByteBuffer());
                    }
                } else {
                    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
                        input.toByteBuffer());
                }
    
                NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
                Object msg;
                int saveReaderIndex;
    
                try {
                    // decode object.
                    do {
                        saveReaderIndex = message.readerIndex();
                        try {
                            msg = codec.decode(channel, message);
                        } catch (IOException e) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw e;
                        }
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            if (saveReaderIndex == message.readerIndex()) {
                                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                            }
                        }
                    } while (message.readable());
                } finally {
                    if (message.readable()) {
                        message.discardReadBytes();
                        buffer = message;
                    } else {
                        buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                    }
                    NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
                }
            }
    
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
                ctx.sendUpstream(e);
            }
        }
    

    首先是判断当前decoder对象的buffer中是否有可以读取的消息,如果有则进行合并,并且把对象引用赋予message局部变量,所以message则获取了当前channel的inbound消息。得到inbound消息之后,那么接下来就是对协议的解析了。

         do {
                        saveReaderIndex = message.readerIndex();
                        try {
                            msg = codec.decode(channel, message);
                        } catch (IOException e) {
                            buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                            throw e;
                        }
                        if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
                            message.readerIndex(saveReaderIndex);
                            break;
                        } else {
                            if (saveReaderIndex == message.readerIndex()) {
                                buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
                                throw new IOException("Decode without read data.");
                            }
                            if (msg != null) {
                                Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
                            }
                        }
                    } while (message.readable());
    

    这里首先要做的是把当前message的读索引保存到局部变量saveReaderIndex中,用于后面的消息回滚。后面紧接着是对消息的decode,这里的codecDubboCountCodec对象实体,这里需要注意一点,DubboCountCodecdecode每次只会解析出一个完整的dubbo协议栈,带着这个看看decode的实现。

        public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
            int save = buffer.readerIndex();
            MultiMessage result = MultiMessage.create();
            do {
                Object obj = codec.decode(channel, buffer);
                if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
                    buffer.readerIndex(save);
                    break;
                } else {
                    result.addMessage(obj);
                    logMessageLength(obj, buffer.readerIndex() - save);
                    save = buffer.readerIndex();
                }
            } while (true);
            if (result.isEmpty()) {
                return Codec2.DecodeResult.NEED_MORE_INPUT;
            }
            if (result.size() == 1) {
                return result.get(0);
            }
            return result;
        }
    

    这里暂存了当前buffer的读索引,同样也是为了后面的回滚。可以看到当decode返回的是NEED_MORE_INPUT则表示当前的buffer中数据不足,不能完整解析出一个dubbo协议栈,同时将buffer的读索引回滚到之前暂存的索引并且退出循环,将结果返回。那接下来看看什么时候会返回NEED_MORE_INPUT,最终会定位到在ExchangeCodecdecode方法会解析出协议栈。

        protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
            // check magic number.
            if (readable > 0 && header[0] != MAGIC_HIGH 
                    || readable > 1 && header[1] != MAGIC_LOW) {
                int length = header.length;
                if (header.length < readable) {
                    header = Bytes.copyOf(header, readable);
                    buffer.readBytes(header, length, readable - length);
                }
                for (int i = 1; i < header.length - 1; i ++) {
                    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                        buffer.readerIndex(buffer.readerIndex() - header.length + i);
                        header = Bytes.copyOf(header, i);
                        break;
                    }
                }
                return super.decode(channel, buffer, readable, header);
            }
            // check length.
            if (readable < HEADER_LENGTH) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // get data length.
            int len = Bytes.bytes2int(header, 12);
            checkPayload(channel, len);
    
            int tt = len + HEADER_LENGTH;
            if( readable < tt ) {
                return DecodeResult.NEED_MORE_INPUT;
            }
    
            // limit input stream.
            ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
    
            try {
                return decodeBody(channel, is, header);
            } finally {
                if (is.available() > 0) {
                    try {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Skip input stream " + is.available());
                        }
                        StreamUtils.skipUnusedStream(is);
                    } catch (IOException e) {
                        logger.warn(e.getMessage(), e);
                    }
                }
            }
        }
    

    这个方法开始是对telnet协议进行解析(由于dubbo支持telnet连接,所以这里提供了支持,可以忽略这一部分)。看到会有两个地方返回NEED_MORE_INPUT,一个是当前buffer的可读长度还没有消息头长,说明当前buffer连协议栈的头都不完整,所以需要继续读取inbound数据,另一个是当前buffer包含了完整的消息头,便可以得到payload的长度,发现它的可读的长度,并没有包含整个协议栈的数据,所以也需要继续读取inbound数据。如果上面两个情况都不复核,那么说明当前的buffer至少包含一个dubbo协议栈的数据,那么从当前buffer中读取一个dubbo协议栈的数据,解析出一个dubbo数据,当然这里可能读取完一个dubbo数据之后还会有剩余的数据。

    上面对dubbo解析出一个完整的dubbo协议栈过程进行了讨论,但是还没有对TCP的拆包和粘包问题做过多的讨论。下面结合上面内容做一个综合讨论。

    我这里对TCP拆包和粘包分别列举一个场景来讨论。

    当反生TCP拆包问题时候

    这里假设之前还没有发生过任何数据交互,系统刚刚初始化好,那么这个时候在InternalDecoder里面的buffer属性会是EMPTY_BUFFER。当发生第一次inbound数据的时候,第一次在InternalDecoder里面接收的肯定是dubbo消息头的部分(这个由TCP协议保证),由于发生了拆包情况,那么此时接收的inbound消息可能存在一下几种情况

    1、当前inbound消息只包含dubbo协议头的一部分

    2、当前inbound消息只包含dubbo的协议头

    3、当前inbound消息只包含dubbo消息头和部分payload消息

    通过上面的讨论,我们知道发生上面三种情况,都会触发ExchangeCodec返回NEED_MORE_INPUT,由于在DubboCountCodec对余返回NEED_MORE_INPUT会回滚读索引,所以此时的buffer里面的数据可以当作并没有发生过读取操作,并且DubboCountCodec的decode也会返回NEED_MORE_INPUT,在InternalDecoder对于当判断返回NEED_MORE_INPUT,也会进行读索引回滚,并且退出循环,最后会执行finally内容,这里会判断inbound消息是否还有可读的,由于在DubboCountCodec里面进行了读索引回滚,所以次数的buffer里面是完整的inbound消息,等待第二次的inbound消息的到来,当第二次inbound消息过来的时候,再次经过上面的判断。

    当发生TCP粘包的时候

    当发生粘包的时候是tcp将一个以上的dubbo协议栈放在一个tcp包中,那么有可能发生下面几种情况

    1、当前inbound消息只包含一个dubbo协议栈

    2、当前inbound消息包含一个dubbo协议栈,同时包含部分另一个或者多个dubbo协议栈内容

    如果发生只包含一个协议栈,那么当前buffer通过ExchangeCodec解析协议之后,当前的buffer的readeIndex位置应该是buffer尾部,那么在返回到InternalDecodermessage的方法readable返回的是false,那么就会对buffer重新赋予EMPTY_BUFFER实体,而针对包含一个以上的dubbo协议栈,当然也会解析出其中一个dubbo协议栈,但是经过ExchangeCodec解析之后,message的readIndex不在message尾部,所以messagereadable方法返回的是true。那么则会继续遍历message,读取下面的信息。最终要么message刚好整数倍包含完整的dubbo协议栈,要不ExchangeCodec返回NEED_MORE_INPUT,最后将未读完的数据缓存到buffer中,等待下次inbound事件,将buffer中的消息合并到下次的inbound消息中,种类又回到了拆包的问题上。

    总结

    dubbo在处理tcp的粘包和拆包时是借助InternalDecoderbuffer缓存对象来缓存不完整的dubbo协议栈数据,等待下次inbound事件,合并进去。所以说在dubbo中解决TCP拆包和粘包的时候是通过buffer变量来解决的

    源码来源参考:minglisoft.cn/technology欢迎参考交流

    转载于:https://my.oschina.net/u/3427216/blog/879525

    展开全文
  • # 解决tcp拆包,粘包问题 * 方案一:消息定长 * 方案二:在包尾部加上特殊字符进行分割 * 方案三:将消息分为消息头,消息体,类似与自定义协议 此处示例在尾部添加特殊字符解决拆包粘包问题 在 ServerBootstrap的...
    # 解决tcp拆包,粘包问题
    * 方案一:消息定长
    * 方案二:在包尾部加上特殊字符进行分割
    * 方案三:将消息分为消息头,消息体,类似与自定义协议

    此处示例在尾部添加特殊字符解决拆包粘包问题

    在 ServerBootstrap的childHandler方法里面(initChannel)添加 

    // 设置特殊分隔符
    ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
    // 设置字符串形式的解码
    sc.pipeline().addLast(new StringDecoder());

    算了,直接看代码吧:

    Client.java

    package zzq.neety02;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class Client {
        public static void main(String[] args) throws InterruptedException {
    
            // 创建线程组
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            // 创建辅助工具类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group) //绑定线程组
                    .channel(NioSocketChannel.class) // 指定 NIO 模式
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 设置特殊分隔符
                            ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
                            sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
                            // 设置字符串形式的解码
                            sc.pipeline().addLast(new StringDecoder());
                            sc.pipeline().addLast(new ClientHandler()); // 在这里配置数据接收处理的方法(类)
                        }
                    });
    
            // 绑定连接地址
            ChannelFuture channelFuture = bootstrap.connect("127.0.0.1",8888).sync();
    
            // 向服务端发送消息
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,一加一对于多少$end$".getBytes()));
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,1234$end$".getBytes()));
            channelFuture.channel().writeAndFlush(Unpooled.copiedBuffer("你好,5678$end$".getBytes()));
    
            channelFuture.channel().closeFuture().sync();
            group.shutdownGracefully();
        }
    }
    
    
     

    ClientHandler.java

    package zzq.neety02;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ClientHandler extends ChannelHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println( cause.toString() );
            System.out.println("出错了...");
            ctx.close();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println( "注册成功..." );
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println( "激活成功..." );
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String resp = (String) msg;
    
            System.out.println(  "收到了消息:" + msg );
        }
    
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            System.out.println("我读完了...");
        }
    }
    

     

    Server.java

    package zzq.neety02;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.DelimiterBasedFrameDecoder;
    import io.netty.handler.codec.string.StringDecoder;
    
    import java.nio.Buffer;
    
    public class Server {
        public static void main(String[] args) throws InterruptedException {
            //1 创建线两个程组
            //一个是用于处理服务器端接收客户端连接的
            //一个是进行网络通信的(网络读写的)
            NioEventLoopGroup pGroup = new NioEventLoopGroup();
            NioEventLoopGroup cGroup = new NioEventLoopGroup();
    
            //2.创建辅助工具类,用于服务器通信的一系列配置
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(pGroup,cGroup) // 绑定两个线程组
            .channel(NioServerSocketChannel.class) // 指定NIO模式
            .option(ChannelOption.SO_KEEPALIVE , true) // 设置保持连接
            .childHandler(new ChannelInitializer<SocketChannel>() {
    
                @Override
                protected void initChannel(SocketChannel sc) throws Exception {
                    // 设置特殊分隔符
                    ByteBuf buf = Unpooled.copiedBuffer("$end$".getBytes());
                    sc.pipeline().addLast(new DelimiterBasedFrameDecoder(1024,buf));
    
                    // 设置字符串形式的解码
                    sc.pipeline().addLast(new StringDecoder());
    
                    // 3. 在这里配置具体的数据处理方法
                    sc.pipeline().addLast(new ServerHandler());
                }
            });
    
            //4 进行绑定
            ChannelFuture cf1 = bootstrap.bind(8888).sync();
            //ChannelFuture cf2 = b.bind(8764).sync();
            //5 等待关闭
            cf1.channel().closeFuture().sync();
            //cf2.channel().closeFuture().sync();
            pGroup.shutdownGracefully();
            cGroup.shutdownGracefully();
        }
    }
    

    ServerHandler.java

    package zzq.neety02;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.Base64;
    import java.util.Date;
    
    public class ServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            // 出现异常
            System.out.println( cause.toString() );
            ctx.close();
        }
    
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            System.out.println("netty is register...");
        }
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("netty is active...");
        }
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            String request = (String)msg;
            System.out.println("客户端发来消息:"+ request );
            String resp = "服务端响应:收到了你的消息==>" + new Date() + "$end$";
    
            ctx.writeAndFlush( Unpooled.copiedBuffer(resp.getBytes()) );
            System.out.println("返回数据:" + resp );
         }
    
    
    }
    

     

    展开全文
  • Netty自定义TCP拆包解码器 定义开始、结束标识字节数组 这里使用###START###作为开始标识,###STOP###作为结束标识,代码如下: public class CodecConstants { public static final byte[] BEGIN_DELIMITER = ...
  • 1. Netty学习笔记之七:Netty关于TCP拆包和粘包的解决方案 文章目录1. Netty学习笔记之七:Netty关于TCP拆包和粘包的解决方案1.1. Netty编程核心之网络传输TCP粘包拆包1.1.1. 什么是TCP粘包,什么是TCP拆包1.1.2. ...
  • 所以在业务上认为,一个完整的包有可能被tcp拆分成多个包进行发送,也有可能把多个小包封装成一个大的数据包发送,这就是所谓的tcp拆包、粘包问题。 2、如何处理粘包/半包问题? 处理粘包/半包的思路就是找出数据...
  • 这就是今天要讲的TCP拆包粘包现象。 拆包粘包产生的原因 我们可以通过以下图进行说明 1.图一是正常的情况下包的发送和接受,客户端发送p1,p2包,服务端先后接受到p1,p2包,没有发生粘包和拆包。 2.图...
  • 1.TCP拆包、粘包问题 tcp编程,无论是服务器端还是客户端,当我们读取或者发送数据的时候,都需要考虑TCP底层的粘包/拆包机制。TCP是一个“流”协议,所谓流就是没有界限的遗传数据。可以想象下河里的水就好比数据...
  • 什么是TCP拆包、粘包? 在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现...
  • TCP拆包和黏包的过程和解决 粘包、拆包解决办法 通过以上分析,我们清楚了粘包或拆包发生的原因,那么如何解决这个问题呢?解决问题的关键在于如何给每个数据包添加边界信息,常用的方法有如下几个:   1、...
  • TCP 拆包分包

    千次阅读 2017-03-01 15:17:47
    对于基于TCP开发的通讯程序,有个很重要的问题需要解决,就是封包和拆包.下面就针对这个问题谈谈我的想法,抛砖引玉.若有不对,不妥之处,恳求大家指正.在此先谢过大家了.   一.为什么基于TCP的通讯程序需要进行封包和...
  • TCP 拆包 ,粘包

    2019-07-02 23:17:20
    所以他会根据当前的套接字缓冲区的情况进行拆包或是粘包。 下图展示了一个 TCP 协议传输的过程: 发送端的字节流都会先传入缓冲区,再通过网络传入到接收端的缓冲区中,最终由接收端获取。 当我们发送两个完整包...
  • TCP拆包和粘包问题

    2020-07-08 10:50:33
    我们日常的网络应用开发大都在传输层进行,由于UDP有消息保护边界,不会发生粘包拆包问题,因此粘包拆包问题只发生在TCP协议中。 1. 什么是粘包、拆包? 假设客户端向服务端连续发送了两个数据包,用packet1和packet...
  • TCP 拆包、粘包

    2018-09-28 15:10:00
    2.收到1个数据包,TCP把2个数据包合成1个发送给接收端了,这样应用层不能处理合成1个的两个数据包,应用层不知道两个数据包之间的分隔在哪,所以很难处理,这是粘包问题; 3.收到2个数据包,但1个数据包产生了粘包...
  • Netty之Tcp拆包粘包

    2018-04-19 11:07:35
    1、TCP粘包、拆包问题 1.1TCP粘包/拆包问题 TCP是一个“流”协议,所谓流,就是没有界限的一长串二进制数据。TCP作为传输层协议并不不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行数据包的划分,...
  • netty tcp拆包

    2017-08-03 08:42:00
    private List&lt;byte[]&gt; getCompletePacket(byte[] bytes, ByteBuf byteBuf) { byte[] clone = bytes.clone(); int i = 0; List&lt;byte[]&gt; ret = Lists.newArrayList();... ...
  • tcp是个流协议,所谓流,就是没有界限的一串数据。tcp底层并不了解上层...这就是所谓的tcp拆包/粘包问题。问题发生的原因有3个: 1,应用程序write写入的字节大小大于套接口发送缓冲区大小 2,进行MSS大小的tcp分段
  • tcp拆包与封包

    2019-08-01 00:01:32
    1)笔记 1)2个字节的头表示有多长,接着发送数据体 2)超过2个字节,即超过65535,则分为多个包发送 3)tcp package-->tp
  • <div><p>我仔细看了您流媒体服务器中HTTP中关于TCP合包的代码,但是无论才疏学浅,没有很明白,希望作者可以大概讲解下TCP处理拆包粘包的思路,现在基于ZLToolKit要实现一个大文件收发,出现拆包粘包问题. 另外我现在需要...
  • 3.1 TCP粘包拆包的问题TCP是一个“流”协议,所谓流就是没有界限的遗传数据。TCP底层并不知道上层的业务数据的具体含义,他会根据TCP缓冲区的实际情况进行包的划分,也就是在业务上,我们一个完整的包可能会被TCP...
  • go语言处理TCP拆包/粘包

    千次阅读 2019-02-17 09:46:59
    part 1 最近在学习go自带的rpc,看完了一遍想着自己实现一个codec,...而这个过程中就需要处理TCP拆包粘包了。 TCP拆包/粘包也算是网络编程中一个比较基础的问题了,具体的问题含义和解决方式也不再详细描述了。...
  • 5.Netty之TCP拆包粘包

    2019-05-04 23:17:09
    TCP网络传输之拆包粘包。 产生原因 熟悉tcp的都知道内部有个滑动窗口协议,分组以及限流。 数据包大于缓冲区 MSS大小的TCP分段 以太网payload大于MTU进行IP分片 解决方案 fixed length split head body ect...
  • 文章目录1. 拆包和粘包的概念2. 产生拆包和粘包的原因3. 解决方案 1. 拆包和粘包的概念 2. 产生拆包和粘包的原因 3. 解决方案
  • 官网解释 方法一 创建一个ByteBuf,等到所有的数据都接受到以后再进行业务操作。 AChannelHandler有两个生命周期侦听器方法:handlerAdded()和handlerRemoved()。您可以执行任意(反)初始化任务,只要它不会...

空空如也

1 2 3 4 5 ... 20
收藏数 1,783
精华内容 713
关键字:

tcp拆包