精华内容
下载资源
问答
  • Netty bytebuf 源码解析

    2017-09-29 10:10:15
    ByteBufHolder 封装一个ByteBuf ,添加其他的辅助方法,例如http 消息体 CompositeByteBuf 将多个ByteBuf组合在一起,相当于视图的功能。 ByteBufAllocator 字节缓冲分配器 (两种基于内存池和普通)

    java nio bytebuffer 和 bytebuf的对比
    1. 原生的类长度固定,不能动态扩容和收缩
    2. 只有一个读写标志位 position, 操作不灵活
    3. API较少,一些适用的操作不支持。

    这里写图片描述

    ByteBuf 实现类的分配
    按内存分配看,
    1. 堆内存, 内存分配和回收较快, Socket I/O 读写需要内存复制,会变慢
    2. 直接内存。对应上面则较慢, 较快
    建议: I/O 通信线程的读写缓冲区使用DirectByteBuf。 后端业务消息的编解码块使用HeapByteBuf.
    从内存回收看
    1. 基于对象池的ByteBuf. 高负载,大并发
    2. 普通ByteBuf。

    writeBytes(ByteBuf src, int srcIndex, int length)
    扩容方式: 固定阈值是4mb, 如果大于4M 就采用步长的增长方式
    小于4m的就用倍数的增长方式。步长和倍数两种方式混合使用,目的就是为了有效利用空间。

    UnpooledHeapByteBuf 基于堆,没有基于对象池技术实现,每一次I/O 读写都会创建一个新的

    PooledDirectByteBuf 对象池技术 与UnPo….不同的是内存分配策略不太一样。
    一次性申请一大块内存,这样就不需要频繁地申请和释放内存了。

    chunk 16个page, 一个page 4个字节, 二叉树。
    page
    PoolSubpage page中的内存分配,取决于第一块的内存大小。

    ByteBuf 辅助类
    ByteBufHolder 封装一个ByteBuf ,添加其他的辅助方法,例如http 消息体
    CompositeByteBuf 将多个ByteBuf组合在一起,相当于视图的功能。
    ByteBufAllocator 字节缓冲分配器 (两种基于内存池和普通)

    展开全文
  • Netty3架构解析

    2016-05-17 17:27:57
    这次趁着休假重新捡起这个硬骨头,因为Netty3现在还在被很多项目使用,因而这次决定先从Netty3入手,瞬间发现Netty3的代码比Netty4中规中矩的多,很多概念在代码本身中都有清晰的表达,所以半天就把整个框架...

    前记

    很早以前就有读Netty源码的打算了,然而第一次尝试的时候从Netty4开始,一直抓不到核心的框架流程,后来因为其他事情忙着就放下了。这次趁着休假重新捡起这个硬骨头,因为Netty3现在还在被很多项目使用,因而这次决定先从Netty3入手,瞬间发现Netty3的代码比Netty4中规中矩的多,很多概念在代码本身中都有清晰的表达,所以半天就把整个框架的骨架搞清楚了。再读 Netty4对Netty3的改进总结 ,回去读Netty4的源码,反而觉得轻松了,一种豁然开朗的感觉。

    记得去年读Jetty源码的时候,因为代码太庞大,并且自己的HTTP Server的了解太少,因而只能自底向上的一个一个模块的叠加,直到最后把所以的模块连接在一起而看清它的真正核心骨架。现在读源码,开始习惯先把骨架理清,然后延伸到不同的器官、血肉而看清整个人体。

    本文从Reactor模式在Netty3中的应用,引出Netty3的整体架构以及控制流程;然而除了Reactor模式,Netty3还在ChannelPipeline中使用了 Intercepting Filter 模式,这个模式也在Servlet的Filter中成功使用,因而本文还会从Intercepting Filter模式出发详细介绍ChannelPipeline的设计理念。本文假设读者已经对Netty有一定的了解,因而不会包含过多入门介绍,以及帮Netty做宣传的文字。

    Netty3中的Reactor模式

    Reactor模式在Netty中应用非常成功,因而它也是在Netty中受大肆宣传的模式,关于Reactor模式可以详细参考本人的另一篇文章 《Reactor模式详解》 ,对Reactor模式的实现是Netty3的基本骨架,因而本小节会详细介绍Reactor模式如何应用Netty3中。

    如果读《Reactor模式详解》,我们知道Reactor模式由Handle、Synchronous Event Demultiplexer、Initiation Dispatcher、Event Handler、Concrete Event Handler构成,在Java的实现版本中,Channel对应Handle,Selector对应Synchronous Event Demultiplexer,并且Netty3还使用了两层Reactor:Main Reactor用于处理Client的连接请求,Sub Reactor用于处理和Client连接后的读写请求(关于这个概念还可以参考Doug Lea的这篇PPT: Scalable IO In Java )。所以我们先要解决Netty3中使用什么类实现所有的上述模块并把他们联系在一起的,以NIO实现方式为例:

    模式是一种抽象,但是在实现中,经常会因为语言特性、框架和性能需要而做一些改变,因而Netty3对Reactor模式的实现有一套自己的设计:
    1. ChannelEvent: Reactor是基于事件编程的,因而在Netty3中使用ChannelEvent抽象的表达Netty3内部可以产生的各种事件,所有这些事件对象在Channels帮助类中产生,并且由它将事件推入到ChannelPipeline中,ChannelPipeline构建ChannelHandler管道,ChannelEvent流经这个管道实现所有的业务逻辑处理。ChannelEvent对应的事件有:ChannelStateEvent表示Channel状态的变化事件,而如果当前Channel存在Parent Channel,则该事件还会传递到Parent Channel的ChannelPipeline中,如OPEN、BOUND、CONNECTED、INTEREST_OPS等,该事件可以在各种不同实现的Channel、ChannelSink中产生;MessageEvent表示从Socket中读取数据完成、需要向Socket写数据或ChannelHandler对当前Message解析(如Decoder、Encoder)后触发的事件,它由NioWorker、需要对Message做进一步处理的ChannelHandler产生;WriteCompletionEvent表示写完成而触发的事件,它由NioWorker产生;ExceptionEvent表示在处理过程中出现的Exception,它可以发生在各个构件中,如Channel、ChannelSink、NioWorker、ChannelHandler中;IdleStateEvent由IdleStateHandler触发,这也是一个ChannelEvent可以无缝扩展的例子。注:在Netty4后,已经没有ChannelEvent类,所有不同事件都用对应方法表达,这也意味这ChannelEvent不可扩展,Netty4采用在ChannelInboundHandler中加入userEventTriggered()方法来实现这种扩展,具体可以参考 这里
    2. ChannelHandler: 在Netty3中,ChannelHandler用于表示Reactor模式中的EventHandler。ChannelHandler只是一个标记接口,它有两个子接口:ChannelDownstreamHandler和ChannelUpstreamHandler,其中ChannelDownstreamHandler表示从用户应用程序流向Netty3内部直到向Socket写数据的管道,在Netty4中改名为ChannelOutboundHandler;ChannelUpstreamHandler表示数据从Socket进入Netty3内部向用户应用程序做数据处理的管道,在Netty4中改名为ChannelInboundHandler。
    3. ChannelPipeline: 用于管理ChannelHandler的管道,每个Channel一个ChannelPipeline实例,可以运行过程中动态的向这个管道中添加、删除ChannelHandler(由于实现的限制,在最末端的ChannelHandler向后添加或删除ChannelHandler不一定在当前执行流程中起效,参考 这里 )。ChannelPipeline内部维护一个ChannelHandler的双向链表,它以Upstream(Inbound)方向为正向,Downstream(Outbound)方向为方向。ChannelPipeline采用Intercepting Filter模式实现,具体可以参考 这里 ,这个模式的实现在后一节中还是详细介绍。
    4. NioSelector: Netty3使用NioSelector来存放Selector(Synchronous Event Demultiplexer),每个新产生的NIO Channel都向这个Selector注册自己以让这个Selector监听这个NIO Channel中发生的事件,当事件发生时,调用帮助类Channels中的方法生成ChannelEvent实例,将该事件发送到这个Netty Channel对应的ChannelPipeline中,而交给各级ChannelHandler处理。其中在向Selector注册NIO Channel时,Netty Channel实例以Attachment的形式传入,该Netty Channel在其内部的NIO Channel事件发生时,会以Attachment的形式存在于SelectionKey中,因而每个事件可以直接从这个Attachment中获取相关链的Netty Channel,并从Netty Channel中获取与之相关联的ChannelPipeline,这个实现和Doug Lea的 Scalable IO In Java 一模一样。另外Netty3还采用了 Scalable IO In Java 中相同的Main Reactor和Sub Reactor设计,其中NioSelector的两个实现:Boss即为Main Reactor,NioWorker为Sub Reactor。Boss用来处理新连接加入的事件,NioWorker用来处理各个连接对Socket的读写事件,其中Boss通过NioWorkerPool获取NioWorker实例,Netty3模式使用RoundRobin方式放回NioWorker实例。更形象一点的,可以通过 Scalable IO In Java 的这张图表达:

    若与Ractor模式对应,NioSelector中包含了Synchronous Event Demultiplexer,而ChannelPipeline中管理着所有EventHandler,因而NioSelector和ChannelPipeline共同构成了Initiation Dispatcher。
    5. ChannelSink: 在ChannelHandler处理完成所有逻辑需要向客户端写响应数据时,一般会调用Netty Channel中的write方法,然而在这个write方法实现中,它不是直接向其内部的Socket写数据,而是交给Channels帮助类,内部创建DownstreamMessageEvent,反向从ChannelPipeline的管道中流过去,直到第一个ChannelHandler处理完毕,最后交给ChannelSink处理,以避免阻塞写而影响程序的吞吐量。ChannelSink将这个MessageEvent提交给Netty Channel中的writeBufferQueue,最后NioWorker会等到这个NIO Channel已经可以处理写事件时无阻塞的向这个NIO Channel写数据。这就是上图的send是从SubReactor直接出发的原因。
    6. Channel: Netty有自己的Channel抽象,它是一个资源的容器,包含了所有一个连接涉及到的所有资源的饮用,如封装NIO Channel、ChannelPipeline、Boss、NioWorkerPool等。另外它还提供了向内部NIO Channel写响应数据的接口write、连接/绑定到某个地址的connect/bind接口等,个人感觉虽然对Channel本身来说,因为它封装了NIO Channel,因而这些接口定义在这里是合理的,但是如果考虑到Netty的架构,它的Channel只是一个资源容器,有这个Channel实例就可以得到和它相关的基本所有资源,因而这种write、connect、bind动作不应该再由它负责,而是应该由其他类来负责,比如在Netty4中就在ChannelHandlerContext添加了write方法,虽然netty4并没有删除Channel中的write接口。

    Netty3中的Intercepting Filter模式

    如果说Reactor模式是Netty3的骨架,那么Intercepting Filter模式则是Netty的中枢。Reactor模式主要应用在Netty3的内部实现,它是Netty3具有良好性能的基础,而Intercepting Filter模式则是ChannelHandler组合实现一个应用程序逻辑的基础,只有很好的理解了这个模式才能使用好Netty,甚至能得心应手。

    关于Intercepting Filter模式的详细介绍可以参考 这里 ,本节主要介绍Netty3中对Intercepting Filter模式的实现,其实就是DefaultChannelPipeline对Intercepting Filter模式的实现。在上文有提到Netty3的ChannelPipeline是ChannelHandler的容器,用于存储与管理ChannelHandler,同时它在Netty3中也起到桥梁的作用,即它是连接Netty3内部到所有ChannelHandler的桥梁。作为ChannelPipeline的实现者DefaultChannelPipeline,它使用一个ChannelHandler的双向链表来存储,以DefaultChannelPipelineContext作为节点:
    public   interface  ChannelHandlerContext {
        Channel getChannel();

        ChannelPipeline getPipeline();

        String getName();

        ChannelHandler getHandler();

        
    boolean  canHandleUpstream();
        
    boolean  canHandleDownstream();
        
    void  sendUpstream(ChannelEvent e);
        
    void  sendDownstream(ChannelEvent e);
        Object getAttachment();

        
    void  setAttachment(Object attachment);
    }

    private   final   class  DefaultChannelHandlerContext  implements  ChannelHandlerContext {
        
    volatile  DefaultChannelHandlerContext next;
        
    volatile  DefaultChannelHandlerContext prev;
        
    private   final  String name;
        
    private   final  ChannelHandler handler;
        
    private   final   boolean  canHandleUpstream;
        
    private   final   boolean  canHandleDownstream;
        
    private   volatile  Object attachment;
    .....
    }
    在DefaultChannelPipeline中,它存储了和当前ChannelPipeline相关联的Channel、ChannelSink以及ChannelHandler链表的head、tail,所有ChannelEvent通过sendUpstream、sendDownstream为入口流经整个链表:
    public   class  DefaultChannelPipeline  implements  ChannelPipeline {
        
    private   volatile  Channel channel;
        
    private   volatile  ChannelSink sink;
        
    private   volatile  DefaultChannelHandlerContext head;
        
    private   volatile  DefaultChannelHandlerContext tail;
    ......
        
    public   void  sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext head 
    =  getActualUpstreamContext( this .head);
            
    if  (head  ==   null ) {
                
    return ;
            }
            sendUpstream(head, e);
        }

        
    void  sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            
    try  {
                ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
            } 
    catch  (Throwable t) {
                notifyHandlerException(e, t);
            }
        }

        
    public   void  sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext tail 
    =  getActualDownstreamContext( this .tail);
            
    if  (tail  ==   null ) {
                
    try  {
                    getSink().eventSunk(
    this , e);
                    
    return ;
                } 
    catch  (Throwable t) {
                    notifyHandlerException(e, t);
                    
    return ;
                }
            }
            sendDownstream(tail, e);
        }

        
    void  sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            
    if  (e  instanceof  UpstreamMessageEvent) {
                
    throw   new  IllegalArgumentException( " cannot send an upstream event to downstream " );
            }
            
    try  {
                ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
            } 
    catch  (Throwable t) {
                e.getFuture().setFailure(t);
                notifyHandlerException(e, t);
            }
        }
    对Upstream事件,向后找到所有实现了ChannelUpstreamHandler接口的ChannelHandler组成链( getActualUpstreamContext()) ,而对Downstream事件,向前找到所有实现了ChannelDownstreamHandler接口的ChannelHandler组成链( getActualDownstreamContext() ):
         private  DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
            
    if  (ctx  ==   null ) {
                
    return   null ;
            }
            DefaultChannelHandlerContext realCtx 
    =  ctx;
            
    while  ( ! realCtx.canHandleUpstream()) {
                realCtx 
    =  realCtx.next;
                
    if  (realCtx  ==   null ) {
                    
    return   null ;
                }
            }
            
    return  realCtx;
        }
        
    private  DefaultChannelHandlerContext getActualDownstreamContext(DefaultChannelHandlerContext ctx) {
            
    if  (ctx  ==   null ) {
                
    return   null ;
            }
            DefaultChannelHandlerContext realCtx 
    =  ctx;
            
    while  ( ! realCtx.canHandleDownstream()) {
                realCtx 
    =  realCtx.prev;
                
    if  (realCtx  ==   null ) {
                    
    return   null ;
                }
            }
            
    return  realCtx;
        }
    在实际实现ChannelUpstreamHandler或ChannelDownstreamHandler时,调用 ChannelHandlerContext中的sendUpstream或sendDownstream方法将控制流程交给下一个 ChannelUpstreamHandler或下一个ChannelDownstreamHandler,或调用Channel中的write方法发送 响应消息。
    public   class  MyChannelUpstreamHandler  implements  ChannelUpstreamHandler {
        
    public   void  handleUpstream(ChannelHandlerContext ctx, ChannelEvent e)  throws  Exception {
            
    //  handle current logic, use Channel to write response if needed.
            
    //  ctx.getChannel().write(message);
            ctx.sendUpstream(e);
        }
    }

    public   class  MyChannelDownstreamHandler  implements  ChannelDownstreamHandler {
        
    public   void  handleDownstream(
                ChannelHandlerContext ctx, ChannelEvent e) 
    throws  Exception {
            
    //  handle current logic
            ctx.sendDownstream(e);
        }
    }
    当ChannelHandler向ChannelPipelineContext发送事件时,其内部从当前ChannelPipelineContext节点出发找到下一个ChannelUpstreamHandler或ChannelDownstreamHandler实例,并向其发送ChannelEvent,对于Downstream链,如果到达链尾,则将ChannelEvent发送给ChannelSink:
    public   void  sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext prev 
    =  getActualDownstreamContext( this .prev);
        
    if  (prev  ==   null ) {
            
    try  {
                getSink().eventSunk(DefaultChannelPipeline.
    this , e);
            } 
    catch  (Throwable t) {
                notifyHandlerException(e, t);
            }
        } 
    else  {
            DefaultChannelPipeline.
    this .sendDownstream(prev, e);
        }
    }

    public   void  sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext next 
    =  getActualUpstreamContext( this .next);
        
    if  (next  !=   null ) {
            DefaultChannelPipeline.
    this .sendUpstream(next, e);
        }
    }
    正是因为这个实现,如果在一个末尾的ChannelUpstreamHandler中先移除自己,在向末尾添加一个新的ChannelUpstreamHandler,它是无效的,因为它的next已经在调用前就固定设置为null了。

    ChannelPipeline作为ChannelHandler的容器,它还提供了各种增、删、改ChannelHandler链表中的方法,而且如果某个ChannelHandler还实现了LifeCycleAwareChannelHandler,则该ChannelHandler在被添加进ChannelPipeline或从中删除时都会得到同志:
    public   interface  LifeCycleAwareChannelHandler  extends  ChannelHandler {
        
    void  beforeAdd(ChannelHandlerContext ctx)  throws  Exception;
        
    void  afterAdd(ChannelHandlerContext ctx)  throws  Exception;
        
    void  beforeRemove(ChannelHandlerContext ctx)  throws  Exception;
        
    void  afterRemove(ChannelHandlerContext ctx)  throws  Exception;
    }

    public   interface  ChannelPipeline {
        
    void  addFirst(String name, ChannelHandler handler);
        
    void  addLast(String name, ChannelHandler handler);
        
    void  addBefore(String baseName, String name, ChannelHandler handler);
        
    void  addAfter(String baseName, String name, ChannelHandler handler);
        
    void  remove(ChannelHandler handler);
        ChannelHandler remove(String name);

        
    < extends  ChannelHandler >  T remove(Class < T >  handlerType);
        ChannelHandler removeFirst();

        ChannelHandler removeLast();

        
    void  replace(ChannelHandler oldHandler, String newName, ChannelHandler newHandler);
        ChannelHandler replace(String oldName, String newName, ChannelHandler newHandler);

        
    < extends  ChannelHandler >  T replace(Class < T >  oldHandlerType, String newName, ChannelHandler newHandler);
        ChannelHandler getFirst();

        ChannelHandler getLast();

        ChannelHandler get(String name);

        
    < extends  ChannelHandler >  T get(Class < T >  handlerType);
        ChannelHandlerContext getContext(ChannelHandler handler);

        ChannelHandlerContext getContext(String name);

        ChannelHandlerContext getContext(Class
    <?   extends  ChannelHandler >  handlerType);
        
    void  sendUpstream(ChannelEvent e);
        
    void  sendDownstream(ChannelEvent e);
        ChannelFuture execute(Runnable task);

        Channel getChannel();

        ChannelSink getSink();

        
    void  attach(Channel channel, ChannelSink sink);
        
    boolean  isAttached();
        List
    < String >  getNames();
        Map
    < String, ChannelHandler >  toMap();
    }

    在DefaultChannelPipeline的ChannelHandler链条的处理流程为:


    展开全文
  • 就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。...例如,如果你正在构建一个基于 Netty 的邮...

    就像很多标准的架构模式都被各种专用框架所支持一样,常见的数据处理模式往往也是目标实现的很好的候选对象,它可以节省开发人员大量的时间和精力。
    当然这也适应于本文的主题:编码和解码,或者数据从一种特定协议的格式到另一种格式的转 换。这些任务将由通常称为编解码器的组件来处理
    Netty 提供了多种组件,简化了为了支持广泛 的协议而创建自定义的编解码器的过程
    例如,如果你正在构建一个基于 Netty 的邮件服务器,那 么你将会发现 Netty 对于编解码器的支持对于实现 POP3、IMAP 和 SMTP 协议来说是多么的宝贵

    0 什么是编解码器

    每个网络应用程序都必须定义

    • 如何解析在两个节点之间来回传输的原始字节
    • 如何将其和目标应用程序的数据格式做相互转换

    这种转换逻辑由编解码器处理,编解码器由编码器和解码器组成,它们每种都可以将字节流从一种格式转换为另一种格式

    那么它们的区别是什么呢?
    如果将消息看作是对于特定的应用程序具有具体含义的结构化的字节序列— 它的数据。那 么编码器是将消息转换为适合于传输的格式(最有可能的就是字节流);而对应的解码器则是将 网络字节流转换回应用程序的消息格式。因此,编码器操作出站数据,而解码器处理入站数据。
    记住这些背景信息,接下来让我们研究一下 Netty 所提供的用于实现这两种组件的类。

    1 Netty解码概述

    1.1 两个问题


    在这一节中,我们将研究 Netty 所提供的解码器类,这些类覆盖了两个不同的用例

    • 将字节解码为消息——ByteToMessageDecoder 和 ReplayingDecoder
    • 将一种消息类型解码为另一种——MessageToMessageDecoder

    因为解码器是负责将入站数据从一种格式转换到另一种格式,所以知道 Netty 的解码器实
    现了 ChannelInboundHandler 也不会让你感到意外
    什么时候会用到解码器呢?很简单:每当需要为 ChannelPipeline 中的下一个 Channel- InboundHandler 转换入站数据时会用到
    此外,得益于ChannelPipeline 的设计,可以将多个解码器连接在一起,以实现任意复杂的转换逻辑,这也是 Netty 是如何支持代码的模块化以及复用的一个很好的例子



    2 抽象解码器ByteToMessageDecoder

    2.1 示例

    将字节解码为消息(或者另一个字节序列)是一项如此常见的任务,以至于 Netty 特地为它提供了一个抽象的基类:ByteToMessageDecoder
    由于你不可能知道远程节点是否会一次性地发送一个完整的消息,所以这个类会对入站数据进行缓冲,直到它准备好处理

    ByteToMessageDecoderAPI

    假设你接收了一个包含简单 int 的字节流,每个 int 都需要被单独处理
    在这种情况下,你需要从入站 ByteBuf中读取每个 int,并将它传递给 ChannelPipeline 中的下一个 ChannelInboundHandler
    为了解码这个字节流,你要扩展 ByteToMessageDecoder类(原子类型的 int 在被添加到 List 中时,会被自动装箱为 Integer)
    ToIntegerDecoder

    每次从入站 ByteBuf 中读取 4 字节,将其解码为一个 int,然后将它添加到一个 List 中
    当没有更多的元素可以被添加到该 List 中时,它的内容将会被发送给下一个 Channel- InboundHandler
    ToIntegerDecoder类扩展了ByteToMessageDecoder

    虽然 ByteToMessageDecoder可以很简单地实现这种模式,但是你可能会发现,在调用 readInt()前不得不验证所输入的 ByteBuf 是否具有足够的数据有点繁琐
    在下一节中, 我们将讨论 ReplayingDecoder,它是一个特殊的解码器,以少量的开销消除了这个步骤

    2.2 源码解析


    解码步骤

    2.2.1 累加字节流

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            //基于 ByteBuf 进行解码的,如果不是直接将当前对象向下传播
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    //若当前累加器为空,说明是第一次从 IO 流中读取数据
                    first = cumulation == null;
                    if (first) {
                        //第一次会将累加器赋值为刚读进来的 ByteBuf 对象数据
                        cumulation = data;
                    } else {
                        //非第一次,则将当前累加器中的数据和读取进来的数据进行累加
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    //调用子类的解码方法去解析
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }
    
                    int size = out.size();
                    decodeWasNull = !out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }复制代码

    其中的cumulator


    看一下这个 MERGE_CUMULATOR

    public static final Cumulator MERGE_CUMULATOR = new Cumulator() {
            @Override
            public ByteBuf cumulate(ByteBufAllocator alloc, ByteBuf cumulation, ByteBuf in) {
                ByteBuf buffer;
                //当前的写指针后移一定字节,若超过最大容量,则进行扩容
                if (cumulation.writerIndex() > cumulation.maxCapacity() - in.readableBytes()
                        || cumulation.refCnt() > 1) {
                    // Expand cumulation (by replace it) when either there is not more room in the buffer
                    // or if the refCnt is greater then 1 which may happen when the user use slice().retain() or
                    // duplicate().retain().
                    //
                    // See:
                    // - https://github.com/netty/netty/issues/2327
                    // - https://github.com/netty/netty/issues/1764
                    buffer = expandCumulation(alloc, cumulation, in.readableBytes());
                } else {
                    buffer = cumulation;
                }
                //将当前数据写到累加器中
                buffer.writeBytes(in);
                //将读进的数据对象释放
                in.release();
                return buffer;
            }
        };复制代码

    2.2.2 调用子类的 decode方法进行解析

    进入该方法查看源码
    protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
            try {
                // 只要累加器有数据,循环就会继续执行下去
                while (in.isReadable()) {
                    int outSize = out.size();
                    // 判断当前list 里是否已经有对象(首次执行时,肯定是不会运行此段代码的)
                    if (outSize > 0) {
                        // 有,则通过事件传播机制向下传播
                        fireChannelRead(ctx, out, outSize);
                        out.clear();
    
                        // Check if this handler was removed before continuing with decoding.
                        // If it was removed, it is not safe to continue to operate on the buffer.
                        //
                        // See:
                        // - https://github.com/netty/netty/issues/4635
                        if (ctx.isRemoved()) {
                            break;
                        }
                        outSize = 0;
                    }
                    // 记录当前可读数据长度
                    int oldInputLength = in.readableBytes();
                    decode(ctx, in, out);
    
                    // Check if this handler was removed before continuing the loop.
                    // If it was removed, it is not safe to continue to operate on the buffer.
                    //
                    // See https://github.com/netty/netty/issues/1664
                    if (ctx.isRemoved()) {
                        break;
                    }
    
                    //说明什么对象都没解析出来
                    if (outSize == out.size()) {
                        if (oldInputLength == in.readableBytes()) {
                            break;
                        } else {
                            continue;
                        }
                    }
    
                    //说明没有从当前累加器中读取数据
                    if (oldInputLength == in.readableBytes()) {
                        throw new DecoderException(
                                StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
                    }
    
                    if (isSingleDecode()) {
                        break;
                    }
                }
            } catch (DecoderException e) {
                throw e;
            } catch (Throwable cause) {
                throw new DecoderException(cause);
            }
        }复制代码

    2.2.2 将解析到的 ByteBuf 向下传播

    @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            if (msg instanceof ByteBuf) {
                CodecOutputList out = CodecOutputList.newInstance();
                try {
                    ByteBuf data = (ByteBuf) msg;
                    first = cumulation == null;
                    if (first) {
                        cumulation = data;
                    } else {
                        cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
                    }
                    callDecode(ctx, cumulation, out);
                } catch (DecoderException e) {
                    throw e;
                } catch (Throwable t) {
                    throw new DecoderException(t);
                } finally {
                    if (cumulation != null && !cumulation.isReadable()) {
                        numReads = 0;
                        cumulation.release();
                        cumulation = null;
                    } else if (++ numReads >= discardAfterReads) {
                        // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                        // See https://github.com/netty/netty/issues/4275
                        numReads = 0;
                        discardSomeReadBytes();
                    }
                    // 记录当前 list 的长度
                    int size = out.size();
                    // 将解析到的一个对象向下进行传播
                    decodeWasNull = !out.insertSinceRecycled();
                    fireChannelRead(ctx, out, size);
                    out.recycle();
                }
            } else {
                ctx.fireChannelRead(msg);
            }
        }复制代码

    编解码器中的引用计数

    对于编码器和解码器来说:一旦消息被编码或者解码,它就会被 ReferenceCountUtil.release(message)调用自动释放
    如果你需要保留引用以便稍后使用,那么你可以调用 ReferenceCountUtil.retain(message)这将会增加该引用计数,从而防止该消息被释放

    3 基于固定长度解码器分析

    /**
     * A decoder that splits the received {@link ByteBuf}s by the fixed number
     * of bytes. For example, if you received the following four fragmented packets:
     * <pre>
     * +---+----+------+----+
     * | A | BC | DEFG | HI |
     * +---+----+------+----+
     * </pre>
     * A {@link FixedLengthFrameDecoder}{@code (3)} will decode them into the
     * following three packets with the fixed length:
     * <pre>
     * +-----+-----+-----+
     * | ABC | DEF | GHI |
     * +-----+-----+-----+
     * </pre>
     */
    public class FixedLengthFrameDecoder extends ByteToMessageDecoder {
    
        private final int frameLength;
    
        /**
         * Creates a new instance.
         *
         * @param frameLength the length of the frame
         */
        public FixedLengthFrameDecoder(int frameLength) {
            if (frameLength <= 0) {
                throw new IllegalArgumentException(
                        "frameLength must be a positive integer: " + frameLength);
            }
            this.frameLength = frameLength;
        }
    
        @Override
        protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            Object decoded = decode(ctx, in);
            if (decoded != null) {
                out.add(decoded);
            }
        }
    
        /**
         * Create a frame out of the {@link ByteBuf} and return it.
         *
         * @param   ctx             the {@link ChannelHandlerContext} which this {@link ByteToMessageDecoder} belongs to
         * @param   in              the {@link ByteBuf} from which to read data
         * @return  frame           the {@link ByteBuf} which represent the frame or {@code null} if no frame could
         *                          be created.
         */
        protected Object decode(
                @SuppressWarnings("UnusedParameters") ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            //判断当前累加器里的字节是否小于frameLength
            if (in.readableBytes() < frameLength) {
                return null;
            } else {
                return in.readRetainedSlice(frameLength);
            }
        }
    }复制代码

    4 行解码器分析

    非丢弃模式处理

    4.1 定位行尾



    4.2 非丢弃模式


    4.2.1 找到换行符情况下

    4.2.2 找不到换行符情况下



    解析出长度超过最大可解析长度
    直接进入丢弃模式,读指针移到写指针位(即丢弃)
    并传播异常

    4.3 丢弃模式

    找到换行符


    记录当前丢弃了多少字节(已丢弃 + 本次将丢弃的)
    锁定换行符类型
    将读指针直接移到换行符后
    丢弃字节置零
    重置为非丢弃状态
    所有字节丢弃后才触发快速失败机制

    找不到换行符


    直接记录当前丢弃字节(已丢弃 + 当前可读字节数)
    将读指针直接移到写指针

    5 基于分隔符解码器分析

    • 构造器
      传入一系列分隔符,通过解码器将二进制流分成完整数据包


    • decode 方法


    5.1 分析解码步骤

    5.1.1 行处理器

    • 行处理器决断


    • 定义位置


    • 初始化位置


    • 判断分隔符


    5.1.2 找到最小分隔符



    遍历所有分隔符,计算以每一个分隔符分割的数据包的长度

    5.1.3 解码

    5.1.3.1 找到分隔符


    非空,说明已经找到分隔符
    和之前一样,在此先判断当前是否处于丢弃模式


    非丢弃模式

    显然第一次时为 false, 因此非丢弃模式



    当前数据包大于允许解析最大数据长度时,直接将该段数据包连同最小分隔符跳过(丢弃)



    没有超过的就是正常合理逻辑的数据包的长度,判断解析出的数据包是否包含分隔符

    丢弃模式


    5.1.3.2 未找到分隔符


    5.1.3.2.1 非丢弃模式

    当前可读字节长大于允许解析最大数据长度时,记录该丢弃字节数

    5.1.3.2.2 丢弃模式

    6 基于长度域解码器参数分析

    重要参数

    • maxFrameLength (包的最大长度)



      防止太大导致内存溢出,超出包的最大长度 Netty 将会做一些特殊处理

    • lengthFieldOffset (消息体长度)


      长度域的偏移量lengthFieldOffset,0表示无偏移
      ByteBuf的什么位置开始就是length字段

    • lengthFieldLength


      长度域length字段的长度

    • lengthAdjustment



      有些情况可能会把header也包含到length长度中,或者length字段后面还有一些不包括在length长度内的,可以通过lengthAdjustment调节

    • initialBytesToStrip



      起始截掉的部分,如果传递给后面的Handler的数据不需要消息头了,可以通过这个设置
      可以通过消息中的一个表示消息长度的字段值动态分割收到的ByteBuf

    6.1 基于长度


    这类数据包协议比较常见,前几个字节表示数据包长度(不包括长度域),后面为具体数据
    拆完后数据包是一个完整的带有长度域的数据包(之后即可传递到应用层解码器进行解码),
    创建一个如下方式的 LengthFieldBasedFrameDecoder即可实现这类协议

    6.2 基于长度截断

    若应用层解码器不需用到长度字段,那么我们希望 Netty 拆包后,如此


    长度域被截掉,我们只需指定另一个参数 initialBytesToStrip 即可实现
    表 Netty 拿到一个完整数据包后向业务解码器传递之前,应该跳过多少字节

    initialBytesToStrip 为4,表获取一个完整数据包后,忽略前面4个字节,应用解码器拿到的就是 不带长度域的数据包

    6.3 基于偏移长度


    此方式二进制协议更为普遍,前几个固定字节表示协议头,通常包含一些 magicNumberprotocol version 之类的 meta信息,紧跟着后面的是一个长度域,表示包体有多少字节的数据
    只需要基于第一种情况,调整第二个参数既可以实现

    lengthFieldOffset为4,表示跳过4个字节才是长度域

    6.4 基于可调整长度的拆包

    有些时候,二进制协议可能会设计成如下方式


    长度域在前, header在后

    • 长度域在数据包最前面表示无偏移,lengthFieldOffset为 0
    • 长度域的长度为3,即lengthFieldLength为3
    • 长度域表示的包体的长度略过了header,这里有另外一个参数lengthAdjustment,包体长度调整的大小,长度域的数值表示的长度加上这个修正值表示的就是带header的包,这里是 12+2,header和包体一共占14字节

    6.5 基于偏移可调整长度的截断

    二进制协议带有两个header


    拆完后, HDR1 丢弃,长度域丢弃,只剩下第二个 header和有效包体
    这种协议中,一般 HDR1可以表示 magicNumber,表示应用只接受以该 magicNumber开头的二进制数据,RPC 里面用的较多

    参数设置

    • 长度域偏移为1,即lengthFieldOffset为1
    • 长度域长度为2,即 lengthFieldLength为2
    • 长度域表示的包体的长度略过HDR2,但拆包时HDR2也被 Netty 当作包体的一部分来拆,HDR2的长度为1,即 lengthAdjustment 为1
    • 拆完后,截掉前面三个字节,即initialBytesToStrip 为 3

    6.6 基于偏移可调整变异长度的截断

    前面所有的长度域表示的都是不带header的包体的长度
    如果让长度域表示的含义包含整个数据包的长度,如下


    长度域字段值为16, 其字段长度为2, HDR1的长度为1, HDR2的长度为1,包体的长度为12, 1+1+2+12=16

    参数设置

    除长度域表示的含义和上一种情况不一样外,其他都相同,因为 Netty 不了解业务情况,需告诉 Netty ,长度域后再跟多少字节就可形成一个完整数据包,这里显然是13字节,长度域为16,因此减掉3才是真是的拆包所需要的长度,lengthAdjustment为-3

    若你的协议基于长度,即可考虑不用字节来实现,而是直接拿来用,或者继承他,简单修改即可

    7 基于长度域解码器分析

    7.1 构造方法

    public LengthFieldBasedFrameDecoder(ByteOrder byteOrder, int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip, boolean failFast) {
        // 省略参数校验
        this.byteOrder = byteOrder;
        this.maxFrameLength = maxFrameLength;
        this.lengthFieldOffset = lengthFieldOffset;
        this.lengthFieldLength = lengthFieldLength;
        this.lengthAdjustment = lengthAdjustment;
        lengthFieldEndOffset = lengthFieldOffset + lengthFieldLength;
        this.initialBytesToStrip = initialBytesToStrip;
        this.failFast = failFast;
    }复制代码

    把传参数保存在 field即可

    • byteOrder
      字节流表示的数据是大端还是小端,用于长度域的读取
    • lengthFieldEndOffset
      紧跟长度域字段后面的第一个字节的在整个数据包中的偏移量
    • failFast
      • 为true 表读取到长度域,TA的值的超过maxFrameLength,就抛 TooLongFrameException
      • false 表只有当真正读取完长度域的值表示的字节之后,才抛 TooLongFrameException,默认设为true,建议不要修改,否则可能会造成内存溢出

    7.2 实现拆包抽象

    具体的拆包协议只需要实现

    void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)复制代码

    in 表目前为止还未拆的数据,拆完之后的包添加到 out这个list中即可实现包向下传递

    • 第一层实现


    重载的protected方法decode实现真正的拆包,以下三步走

    1 计算需要抽取的数据包的长度

    protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            // 拿到实际的未调整过的包长度
            long frameLength = getUnadjustedFrameLength(in, actualLengthFieldOffset, lengthFieldLength, byteOrder);
    
            if (frameLength < lengthFieldEndOffset) {
                failOnFrameLengthLessThanLengthFieldEndOffset(in, frameLength, lengthFieldEndOffset);
            }
    
            if (frameLength > maxFrameLength) {
                exceededFrameLength(in, frameLength);
                return null;
            }
        }复制代码
    • 拿到长度域的实际字节偏移


    • 调整包的长度


    • 如果当前可读字节还未达到长度长度域的偏移,那说明肯定是读不到长度域的,直接不读


    上面有个getUnadjustedFrameLength,若你的长度域代表的值表达的含义不是基本的int,short等基本类型,可重写该方法


    比如,有的奇葩的长度域里面虽然是4个字节,比如 0x1234,但是TA的含义是10进制,即长度就是十进制的1234,那么覆盖这个函数即可实现奇葩长度域拆包

    1. 长度校验
    • 整个数据包的长度还没有长度域长,直接抛异常



    • 数据包长度超出最大包长度,进入丢弃模式


      • 当前可读字节已达到frameLength,直接跳过frameLength个字节,丢弃之后,后面有可能就是一个合法的数据包
      • 当前可读字节未达到frameLength,说明后面未读到的字节也需丢弃,进入丢弃模式,先把当前累积的字节全部丢弃

    bytesToDiscard 表还需丢弃多少字节

    • 最后,调用failIfNecessary判断是否需要抛出异常
      • 不需要再丢弃后面的未读字节(bytesToDiscard == 0),重置丢弃状态
        • 如果没有设置快速失败(!failFast),或者设置了快速失败并且是第一次检测到大包错误(firstDetectionOfTooLongFrame),抛出异常,让handler处理
        • 如果设置了快速失败,并且是第一次检测到打包错误,抛出异常,让handler去处理


    前面我们可以知道failFast默认为true,而这里firstDetectionOfTooLongFrametrue,所以,第一次检测到大包肯定会抛出异常

    3 丢弃模式的处理

    LengthFieldBasedFrameDecoder.decoder方法入口处还有一段代码

    • 若当前处在丢弃模式,先计算需要丢弃多少字节,取当前还需可丢弃字节和可读字节的最小值,丢弃后,进入 failIfNecessary,对照着这个函数看,默认情况下是不会继续抛出异常,而如果设置了 failFast为false,那么等丢弃完之后,才会抛出异常

    2 跳过指定字节长度的逻辑处理

    在丢弃模式的处理及长度校验都通过后

    • 先验证当前是否已读到足够的字节,若读到了,在下一步抽取一个完整的数据包之前,需根据initialBytesToStrip的设置来跳过某些字节,当然,跳过的字节不能大于数据包的长度,否则抛 CorruptedFrameException 异常

    抽取frame

    • 拿到当前累积数据的读指针,然后拿到待抽取数据包的实际长度进行抽取,抽取之后,移动读指针


    • 抽取的过程即调用了一下 ByteBufretainedSlice API,该API无内存copy的开销

      从真正抽取数据包来看看,传入的参数为 int 型,所以自定义协议中,如果你的长度域是8字节,那么前4字节基本没用

    小结

    • 如果你使用了Netty,并且二进制协议基于长度,考虑使用LengthFieldBasedFrameDecoder吧,通过调整各种参数,一定会满足你
    • LengthFieldBasedFrameDecoder的拆包包括合法参数校验,异常包处理,以及最后调用 ByteBuf 的retainedSlice来实现无内存copy的拆包

    8 解码器总结

    8.1 ByteToMessageDecoder 解码步骤

    8.2 基于长度解码器步骤

    8.3 两个问题

    转载于:https://juejin.im/post/5bff46805188255b5d596e4e

    展开全文
  • Netty 解析

    2017-03-07 13:47:26
    描述符就是一个数字,指向内核中一个机构 Linux IO 复用模型: Linux 提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select,这样select/poll可以帮我们侦测许多fd是否就绪。...
    [b]Linux网络IO模型:[/b]
    

    Linux的内核将所有外部设备都可以看做一个文件来操作,那么我们对与外部设备的操作都可以看做对文件进行操作。我们对一个文件的读写,都通过调用内核提供的系统调用;内核给我们返回一个文件描述符file descriptor(fd)。

    描述符就是一个数字,指向内核中一个机构体

    Linux IO 复用模型:

    Linux 提供select/poll,进程通过将一个或多个fd传递给select或poll系统调用,阻塞在select,这样select/poll可以帮我们侦测许多fd是否就绪。但是select/poll是顺序扫描fd是否就绪,而且支持的fd数量有限,默认值2048。

    Linux 还提供了一个epoll系统调用,epoll是基于事件驱动方式,而不是顺序扫描,当有fd就绪时,立即回调函数rollback。

    IO 复用常见的应用场景:
    1)服务器需要同时处理多个处于监听状态和多个连接状态的套接字
    2)服务器需要处理多种网络协议的套接字

    epoll与select原理类似,只不过,epoll作出了一些重大改进,具体如下:

    1.支持一个进程打开的socket描述符(fd)不受限制(仅受限于操作系统的最大文件句柄数);select有个比较大的缺陷就是一个进程所打开的fd是有一定限制的,由FD_SETSIZE设置,默认值是2048(epoll在1GB内存的机器上大约是10万左右)。

    2.IO效率可能随着fd数目增加而线性下降;传统的select/poll另一个致命弱点就是当你拥有一个很大的socket集合,由于网络延时,任一时间只有部分的socket是“活跃”的,但是select/poll每次调用都会线性扫描全部的集合,导致效率呈现线性下降。但是epoll不存在这个问题,他只会对“活跃”的socket进行操作(这是因为在内核实现中epoll是根据每个fd上面的callback函数实现的)。那么,只有“活跃”的socket才会主动的去调用callback函数,其他idle状态的socket则不会。

    3.使用mmap加速内核与用户空间的消息传递;无论是select/poll还是epoll都需要内核把fd消息通知给用户空间,如何避免不必要的内存拷贝就很重要,在这点上,epoll是通过内核于用户空间mmap同一块内存实现的

    4.epoll的API更加简单:包括创建一个epoll描述符,添加监听事件,阻塞等待所监听的时间发生,关闭epoll描述符。


    [b]传统的BIO编程:[/b]

    网络编程的基本模型是Client/Server 模型,也就是两个进程之间进行相互通信,其中服务端提供位置信息(绑定的IP 地址和监听端口),客户端通过连接操作向服务端监听的地址发起连接请求,通过三次握手建立连接,如果连接建立成功,双方就可以通过网络套接字(Socket)进行通信。
    在基于传统同步阻塞模型开发中,ServerSocket 负责绑定IP 地址,启动监听端口;Socket 负责发起连接操作。连接成功之后,双方通过输入和输出流进行同步阻塞式通信。

    采用BIO 通信模型的服务端,通常由一个独立的Acceptor 线程负责监听客户端的连接,它接收到客户端连接请求之后为每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,线程销毁。这就是典型的一请求一应答通信模型。


    [img]http://dl2.iteye.com/upload/attachment/0123/1766/8aaae8ea-6cb3-371d-a032-2436fe503d34.png[/img]


    该模型最大的问题就是缺乏弹性伸缩能力,当客户端并发访问量增加后,服务端的线程个数和客户端并发访问数呈1:1 的正比关系,由于线程是Java 虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能将急剧下降,随着并发访问量的继续增大,系统会发生线程堆栈溢出、创建新线程失败等问题,并最终导致进程宕机或者僵死,不能对外提供服务。

    为了解决同步阻塞I/O 面临的一个链路需要一个线程处理的问题,后来有人对它的线程模型进行了优化,后端通过一个线程池来处理多个客户端的请求接入,形成客户端个数M:线程池最大线程数N 的比例关系,其中M 可以远远大于N,通过线程池可以灵活的调配线程资源,设置线程的最大值,防止由于海量并发接入导致线程耗尽。

    采用线程池和任务队列可以实现一种叫做伪异步的I/O 通信框架,它的模型如下所示。


    [img]http://dl2.iteye.com/upload/attachment/0123/1768/72be137e-b28a-37d9-b4b9-a74680754c84.png[/img]


    当有新的客户端接入的时候,将客户端的Socket 封装成一个Task(该任务实现java.lang.Runnable 接口)投递到后端的线程池中进行处理,由于线程池可以设置消息队列的大小和最大线程数,因此,它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。

    伪异步I/O 实际上仅仅只是对之前I/O 线程模型的一个简单优化,它无法从根本上解决同步I/O 导致的通信线程阻塞问题。下面我们就简单分析下如果通信对方返回应答时间过长,会引起的级联故障。

    1. 服务端处理缓慢,返回应答消息耗费60s,平时只需要10ms。
    2. 采用伪异步I/O的线程正在读取故障服务节点的响应,由于读取输入流是阻塞的,因此,它将会被同步阻塞60s。
    3. 假如所有的可用线程都被故障服务器阻塞,那后续所有的I/O消息都将在队列中排队。
    4. 由于线程池采用阻塞队列实现,当队列积满之后,后续入队列的操作将被阻塞。
    5. 由于前端只有一个Accptor线程接收客户端接入,它被阻塞在线程池的同步阻塞队列之后,新的客户端请求消息将被拒绝,客户端会发生大量的连接超时。
    6. 由于几乎所有的连接都超时,调用者会认为系统已经崩溃,无法接收新的请求消息。


    [b]几种I/O 模型的功能和特性对比:[/b]


    [img]http://dl2.iteye.com/upload/attachment/0123/1786/664ca772-d4f6-38a1-89e4-060e009a1574.png[/img]

    从JDK1.4开始,JDK提供了一套专门的类库支持非阻塞I/O(NIO),NIO API由三个主要部分组成:缓存区(Buffers)、通道(Channels)和Selector组成。

    NIO是基于事件驱动思想来实现的,他采用Reactor模式实现,主要用来解决BIO模型中一个服务端无法同时并发处理大量客户端连接的问题。

    NIO基于Selector进行轮询,当socket有数据可读、可写、连接完成、新的TCP请求接入事件时,操作系统内核会触发Selector返回准备就绪的SelectionKey的集合,通过SelectableChannel进行读写操作。由于JDK的Selector底层基于epoll实现,因此不受2048连接数的限制,理论上可以同时处理操作系统最大文件句柄个数的连接。

    SelectableChannel的读写操作都是异步非阻塞的,当由于数据没有就绪导致读半包时,立即返回,不会同步阻塞等待数据就绪,当TCP缓存区数据就绪之后,会触发Selector的读事件,驱动下一次读操作。因此,一个Reactor线程就可以同时处理N个客户端的连接,这就解决了之前BIO的一连接一线程的弊端,使Java服务端的并发读写能力得到极大的提升。


    [b]业界主流的NIO框架Netty:[/b]


    [img]http://dl2.iteye.com/upload/attachment/0123/1792/bdfa9630-374b-3874-a495-6f94cd86dd6a.png[/img]


    Netty是一个异步、非阻塞、事件驱动的网络应用框架。基于Netty,可以快速的开发和部署高性能、高可用的网络服务端和客户端应用。

    Netty服务端时序图:

    [img]http://dl2.iteye.com/upload/attachment/0123/5512/b48f825a-5cc4-32e7-9917-5fbe668bbd97.png[/img]

    Netty服务端创建时序图:

    [img]http://dl2.iteye.com/upload/attachment/0123/1804/6458bd24-c4bf-352c-9fc2-d3ae4926c173.png[/img]


    Netty客户端时序图:

    [img]http://dl2.iteye.com/upload/attachment/0123/5514/dbf4bc03-5066-366a-8cc1-efbdd95fa364.png[/img]


    Netty客户端创建时序图:

    [img]http://dl2.iteye.com/upload/attachment/0123/1816/49bc6a4f-063c-345d-af9e-c3e625ce5516.png[/img]


    [b]Netty架构:[/b]

    Netty采用了比较典型的三层网络架构进行设计和开发,逻辑架构如下:

    [img]http://dl2.iteye.com/upload/attachment/0123/5552/a1f7bb11-64c5-37ea-8fe1-5c8d7d5ac5eb.png[/img]

    第一层:业务逻辑编排层,业务逻辑编排层通常有两类:一类是纯粹的业务逻辑编排,还有一类是其他的应用层协议插件,用于特性协议相关的会话和链路管理。

    第二层;职责链PipeLine,他负责事件在职责链中的有序传播,同时负责动态的编排职责链,职责链可以选择监听和处理自己关心的事件,他可以拦截处理和向后/向前传播事件,不同的应用的Handler节点的功能也不同,通常情况下,往往会开发编解码Handler用于消息的编解码,他可以将外部的协议消息转换成内部的POJO对象,这样上层业务侧只需要关心处理业务逻辑即可,不需要感知底层的协议差异和线程模型差异,实现了架构层面的分离隔离。

    第三层:Reactor通信调度层,他由一系列辅助类完成,包括Reactor线程NioEventLoop以及起父类、NioSocketChannel/NioserverSocketChannel以及其父类、ByteBuffer以及由其衍生出来的各种Buffer、Unsafe以及其衍生出的各种内部类等。该层的主要职责就是监听网络的读写和连接操作,负责将网络层的数据读取到内存缓存区中,然后触发各种网络事件,例如连接创建、连接激活、读事件、写事件等等,将这些时间出发到PipeLine中,由PipeLine充当的职责链来进行后续的处理。

    在Netty里,所有事件都来自ChannelEvent 接口,这些事件涵盖监听端口、建立连接、读写数
    据等网络通讯的各个阶段。而事件的处理者就是ChannelHandler ,这样,不但是业务逻辑,连网络通讯流程中底层的处理,都可以通过实现ChannelHandler 来完成了。事实上,Netty内部的连接处理、协议编解码、超时等机制,都是通过handler完成的。

    在Netty里, Channel 是通讯的载体(连接的通道),是ChannelEvent的产生者,而ChannelHandler 负责Channel中的逻辑处理。

    ChannelPipeline 是ChannelHandler的容器:一个Channel包含一个ChannelPipeline,所有ChannelHandler都会注册到ChannelPipeline中,并按顺序组织起来。

    在Netty中, ChannelEvent 是数据或者状态的载体,例如传输的数据对应MessageEvent ,状态的改变对应ChannelStateEvent 。当对Channel进行操作时,会产生一个ChannelEvent,并
    发送到ChannelPipeline 。ChannelPipeline会选择一个ChannelHandler进行处理。这个ChannelHandler处理之后,可能会产生新的ChannelEvent,并流转到下一个ChannelHandler。


    [img]http://dl2.iteye.com/upload/attachment/0123/5749/6ce25aa4-4e4c-3beb-b19b-6c1a58feae35.png[/img]


    Netty的ChannelPipeline包含两条线路:Upstream和Downstream。Upstream对应上行,接收到的消息、被动的状态改变,都属于Upstream。Downstream则对应下行,发送的消息、主动的状态改变,都属于Downstream。ChannelPipeline 接口包含了两个重要的方法: sendUpstream(ChannelEvent e) 和sendDownstream(ChannelEvent e) ,就分别对应了
    Upstream和Downstream。

    对应的,ChannelPipeline里包含的ChannelHandler也包含两类: ChannelUpstreamHandler 和ChannelDownstreamHandler 。每条线路的Handler是互相独立的。它们都很简单的只包含一个方法: ChannelUpstreamHandler.handleUpstream 和ChannelDownstreamHandler.handleDownstream 。


    [img]http://dl2.iteye.com/upload/attachment/0123/5751/190646a3-f26d-3a7e-b580-f5b427b83a71.png[/img]


    在一条“流”里,一个ChannelEvent 并不会主动的”流”经所有的Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream 产生,并Handler,而是由上一个Handler显式的调用ChannelPipeline.sendUp(Down)stream 产生,并交给下一个Handler处理。也就是说,每个Handler接收到一个ChannelEvent,并处理结束后,如果需要继续处理,那么它需要调用sendUp(Down)stream 新发起一个事件。如果它不再发起事件,那么处理就到此结束,即使它后面仍然有Handler没有执行。这个机制可以保证最大的灵活性,当然对Handler的先后顺序也有了更严格的要求。

    DefaultChannelPipeline的内部结构ChannelPipeline 的主要的实现代码在DefaultChannelPipeline 类里。

    列一下DefaultChannelPipeline的主要字段:

    public class DefaultChannelPipeline implements ChannelPipeline {
    private volatile Channel channel;
    private volatile ChannelSink sink;
    private volatile DefaultChannelHandlerContext head;
    private volatile DefaultChannelHandlerContext tail;
    private final Map<String, DefaultChannelHandlerContext> name2ctx =
    new HashMap<String, DefaultChannelHandlerContext>(4);
    }

    ChannelHandlerContext保存了Netty与Handler相关的的上下文信息。而这里的DefaultChannelHandlerContext ,则是对ChannelHandler 的一个包装。一个DefaultChannelHandlerContext 内部,除了包含一个ChannelHandler ,还保存了”next”和”prev”两个指针,从而形成一个双向链表。

    因此,在DefaultChannelPipeline 中,我们看到的是对DefaultChannelHandlerContext 的引
    用,而不是对ChannelHandler 的直接引用。这里包含”head”和”tail”两个引用,分别指向链表的头和尾。而name2ctx则是一个按名字索引DefaultChannelHandlerContext用户的一个
    map,主要在按照名称删除或者添加ChannelHandler时使用。

    sendUpstream和sendDownstream是ChannelPipeline 接口的两个重要的方法:sendUpstream(ChannelEvent e) 和sendDownstream(ChannelEvent e) 。所有事件的发起都
    是基于这两个方法进行的。Channels 类有一系列fireChannelBound 之类的fireXXXX 方法,其实都是对这两个方法的facade包装。

    先看DefaultChannelHandlerContext.sendUpstream(对代码做了一些简化,保留主逻辑):

    public void sendUpstream(ChannelEvent e) {

    DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
    head.getHandler().handleUpstream(head, e);
    }
    private DefaultChannelHandlerContext getActualUpstreamContext(DefaultChannelHandlerContext ctx) {
    DefaultChannelHandlerContext realCtx = ctx;
    while (!realCtx.canHandleUpstream()) {
    realCtx = realCtx.next;
    if (realCtx == null) {
    return null;
    }
    }
    return realCtx;
    }

    这里最终调用了ChannelUpstreamHandler.handleUpstream 来处理这个ChannelEvent。有意思的是,这里我们看不到任何”将Handler向后移一位”的操作,但是我们总不能每次都用同一个Handler来进行处理啊?实际上,我们更为常用的是ChannelHandlerContext.handleUpstream 方法(实现是DefaultChannelHandlerContext.sendUpstream 方法):

    public void sendUpstream(ChannelEvent e) {
    DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
    DefaultChannelPipeline.this.sendUpstream(next, e);
    }

    可以看到,这里最终仍然调用了ChannelPipeline.sendUpstream 方法,但是它会将Handler指
    针后移。

    我们接下来看看DefaultChannelHandlerContext.sendDownstream :

    public void sendDownstream(ChannelEvent e) {
    DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
    if (prev == null) {
    try {
    getSink().eventSunk(DefaultChannelPipeline.this, e);
    } catch (Throwable t) {
    notifyHandlerException(e, t);
    }
    } else {
    DefaultChannelPipeline.this.sendDownstream(prev, e);
    }
    }


    与sendUpstream好像不大相同哦?这里有两点:一是到达末尾时,会调用ChannelSink进行处理;二是这里指针是往前移的,所以我们知道了:UpstreamHandler是从前往后执行的,DownstreamHandler是从后往前执行的。在ChannelPipeline里添加时需要注意顺序了!

    ChannelSink包含一个重要方法ChannelSink.eventSunk ,可以接受任意ChannelEvent,实际上,它的作用确实是这样,也可以换个说法:”处于末尾的万能Handler”。
    展开全文
  • Netty 消息接收类故障案例分析

    千次阅读 2019-02-19 14:32:22
    背景 消息接收类故障 尽管 Netty 应用广泛,非常成熟,但是由于对 Netty ...在各种故障中,Netty 服务端接收不到客户端消息是一种比较常见的异常,大部分场景下都是用户使用不当导致的,下面我们对常见的消息接收...
  • netty深度解析

    2019-01-28 00:11:04
    在分析客户端的代码时, 我们已经对 Bootstrap 启动 Netty 有了一个大致的认识, 那么接下来分析服务器端时, 就会相对简单一些了. 首先还是来看一下服务器端的启动代码: public final class EchoServer { static ...
  • Netty实战之使用Netty解析交通部JT808协议

    千次阅读 热门讨论 2019-05-17 17:44:11
    使用Netty也有一段时间了,对Netty也有个大概的了解。回想起刚刚使用Netty的时候踩了很多坑,很多Netty的组件也不会使用,或者说用得不够好,不能称之为"最佳实践"。此文的目的便是带领大家使用Netty构建出一个完整...
  • Netty解析Redis网络协议

    万次阅读 2015-06-19 21:45:40
    Netty解析Redis网络协议根据Redis官方文档的介绍,学习了一下Redis网络通信协议。然后偶然在GitHub上发现了个用Netty实现的Redis服务器,很有趣,于是就动手实现了一下!1.RESP协议Redis的客户端与服务端采用一种...
  • 解决Netty消息超长导致拆成多条消息(拆包) 遇到问题的场景 问题的起源是因为帮朋友用socket做一个消息中转,然后就发现了他每次发送的消息都特别长导致channelread会读到多次,最开始图了省事手动去处理的拆包...
  • Netty解析JT808协议

    千次阅读 2018-03-19 21:49:20
    消息体属性(2-3) 终端手机号(4-9) 消息流水号(10-11) 消息包封装项(12-15) byte [0-1] 消息ID word( 16 ) byte [2-3] 消息体属性 word( 16 ) bit [0-9] 消息体长度 bit [10-12] 数据加密方式 此三位都...
  • 使用Java Netty做Concox协议解析

    千次阅读 2020-08-08 18:37:04
    //组合完整消息体(包长度+协议号+信息内容+信息序列号) //包长度字节+协议号+消息内容+消息序列号(等同于消息长度-错误校验+包长度的字节) int msgLen = dataLen + length - 2; ByteBuf frame = ByteBufAllocator...
  • Nettynetty角度分析http协议解析(二)Post方法协议解析HttpPostRequestDecoder
  • (因为请求头,请求行,请求都是根据回车换行来分隔) 1.2 请求包 主要包含三部分:请求行(line),请求头(header),请求正文(body)  请求行(Line) :主要包含三部分:Method ,URI ,协议/版本。 各...
  • 请求报文 Http请求报文由三部分组成:请求行,请求头,请求 携带信息 请求行:请求方法、请求地址、...Content-Length可用于服务端判断消息接受完的条件 请求:GET请求与POST请求传递方式不同(Message Body) re...
  • netty角度分析http协议解析
  • import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio....
  • Netty

    2021-05-05 17:12:05
    1. Netty是什么 Netty是一个NIO网络编程框架,快速开发高性能、高可靠性的网络服务器/客户端程序。 极大地简化了TCP和UDP等网络编程。是一个异步事件驱动的网络框架。 重点是NIO、快速、高性能。 2.Netty 常见使用...
  • 优化其中的解析 2.基本demo 1.服务器端启动类 import java.util.Scanner; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import com.hy.netty.server.handler.NettySe
  • netty最核心的就是reactor线程,对应项目中使用广泛的NioEventLoop,那么NioEventLoop里面到底在干些什么事?netty是如何保证事件循环的高效轮询和任务的及时执行?又是如何来优雅地fix掉jdk的nio bug?带着这些疑问...
  • netty

    2020-07-07 12:04:36
                                     ... Netty框架介绍 ...一、netty(通讯框架)介紹 1、什么是netty  ...
  • 一,HTTP解码器可能会将一个HTTP请求解析成多个消息对象。 ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new ParseRequestHandler()); 经过HttpServerCodec解码之后,一个HTTP请求会导致...
  • Netty4 HTTP请求参数解析(GET, POST)

    万次阅读 2015-12-23 01:09:49
    我们在使用Netty编写HTTP服务器时,一个非常痛苦的地方就是解析POST请求的代码真是太丑陋了,远没有servlet中request.getParameter()优雅。我猜测这是因为Netty是一个网络通讯框架,所以设计者希望尽可能的把底层...
  • 部署方面, 可以采用以下4种方式,并会陆续公布jenkins集合以下3种部署方式的脚本和配置文件: IDEA 启动 jar部署 docker部署 k8s部署 架构图 技术栈/版本介绍 JSON序列化:Jackson 消息队列:RabbitMQ 缓存:Redis ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,789
精华内容 2,315
关键字:

netty消息体解析