2018-10-24 14:11:18 LOVE_HopeOne 阅读数 434
  • Mina网络应用程序NIO框架实战详解

    网络编程的基本模型是C/S模型,即两个进程间的通信。 课程通过循序渐进的方式从相关的名词概率介绍,到基础的模型BIO、NIO、AIO实现和练习,重点讲解NIO相关的知识,然后延伸到NIO框架上去练习。 课程还将在工作中应用较多的几种编解码器进行详细分析和练习。 Mina是基于java NIO类库开发; 采用非阻塞方式的异步传输; 事件驱动;支持批量数据传输; 支持TCP、UDP协议;支持Spring; 采用优雅的松耦合架构; 可灵活的加载过滤器机制;

    288 人正在学习 去看看 安燚

 

什么导致了断包、粘包:

        mina是基于TCP/IP、UDP/IP协议栈的通信框架。Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。

        断包、粘包的问题,是Mina基于TCP协议栈通信的问题。TCP是面向流的,而面向流传输的数据是无保护边界的,无保护边界代表着如果发送端连续传输数据接收端有可能在一次接收动作中,会接收两个或者更多的数据包。

什么是消息保护边界和无消息保护边界:

消息保护边界:就是传输协议把数据当做独立的一条数据在网上进行传输,而且接收端也只能接收独立的消息,也就是因为存在着消息保护边界,接收端一次只能接收发送端传来的一个数据包,这一点有一点像UDP协议。

无消息保护边界:面向流传输数据的是无消息保护边界的,也就是在发送端连续发送数据的情况下接受端可能会在一次中接收两个或多个数据包。

断包、粘包的体现实例:

① 先接收到数据包A,然后接收到数据包B;

② 先接收到数据包A的部分数据,然后接收到数据包A的剩余数据和数据包的全部数据。

③ 先接受到数据包A的全部数据和数据包B的部分数据,然后接收到数据包B的剩余全部数据。

④ 一次性接收完数据包A和数据包B的全部数据。

 

① 正常的情况

② 断包+粘包

③ 粘包+断包

④ 粘包

数据包(消息)的格式:

 

包头 + 消息长度(int)+消息内容(json字符串、普通字符串)+ 包尾

 

Mina处理 断包、粘包问题

       在Mina框架中有个——CumulativeProtocolDecoder (累积性的协议解码器),专门用来处理粘包和断包问题。doDecode()的返回值有重要作用。

        @ doDecode() 方法 ——》 返回 true ,CumulativeProtocolDecoder 的 decode() 方法会首先判断你是否在 doDecode() 方法中从内部的 IoBuffer 缓冲区中读取了数据,如果没有则会抛出非法状态的异常, 也就是因为 你的 doDecode()返回 true 的时候就表示你已经消费了本次的数据,(一个完整的消息已经被读取完毕)。

进一步说:也就是必须你已经消费过 内部 IoBuffer 缓冲区的数据(哪怕数据只有一个字节的大小)。如果 内部 验证通过,确实已经消费了数据,那么CumulativeProtocolDecoder(累积性的协议解码器)会检查缓冲区是否还有数据没有被读取,如果有那么 就继续调用 doDecode(),如果没有就停止对 doDecode() 方法的调用,直到有新的数据被缓冲。

       @ doDecode() 方法 ——》 返回 false , CumulativeProtocolDecoder 会停止对 doDecode() 方法的调用,但此时如果本次数据还有未读取完的,就将含有数据的 IoBuffer 缓冲区保存到 IoSession 中,以便下一次数据到来时可以从 IoSession 中提取合并。如果发现本次数据全部读取完毕,则清空 IoBuffer 缓冲区 (开始进行接收下一个包)。

        简单来说:当你认为读取的数据已经够解码了,那么就返回 true,否则就返回false。 CumulativeProtocolDecoder 就是帮你完成数据的累积,但是这个过程是很繁琐的。

        也就是说:当返回 true 时,CumulativeProtocolDecoder 会调用 deDecoder() 把剩余的数据发下来(剩余数据就是在 remaining()中的数据),返回false 就是不处理剩余的数据(剩余数据不交给 doDecoder()处理),当有新的数据包传过来的时候再把剩余的数据和新的数据拼接在一起,然后调用 decoder。

需要的jar包

Mina 实例 Java代码

service类:

acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(new ByteArrayCodecFactory(Charset.forName("UTF-8")))) ; // 自定义解编码器

编码器类:

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolEncoderAdapter;

import org.apache.mina.filter.codec.ProtocolEncoderOutput;

import java.nio.charset.Charset;



public class ByteArrayEncoder extends ProtocolEncoderAdapter {

private final Charset charset;

public ByteArrayEncoder(Charset charset) {

this.charset = charset;

}

/**

* 直接将数据发出去,数据格式,包头+消息长度(int)+消息内容(json字符串)+包尾 包头包尾是十六进 制字符串00 aa bb cc,转化成字节数组0, * -86, -69, -52四个字节

*

* @param session

* @param message

* @param out

* @throws Exception

*/

@Override

public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {

// 仿项目,解决断包,粘包问题

String value = (message == null ? "" : message.toString()); // 消息值

byte[] content = value.getBytes(charset);// 消息内容,字节数组

IoBuffer buf = IoBuffer.allocate(38 + content.length).setAutoExpand(true);

// 缓冲区容量大小38字节加上字符长度

buf.put(new byte[] { 0, -86, -69, -52 });

// 输入包开头固定值十六进制00 aa bb cc,转化成字节数组

buf.putUnsignedInt(content.length);

// int为4字节,一个字节等于2个16进制字符,所以有八位 00 00 00 0c,内容长度。 buf.put(content);// 消息内容

buf.put(new byte[] { 0, -86, -69, -52 });// 包尾

buf.flip();

out.write(buf); // 写入

}

}

解码器类 :解决Mina断包,丢包问题 (重点)

import org.apache.mina.core.buffer.IoBuffer;

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.CumulativeProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;

import java.nio.charset.Charset;

/**

* 自定义解码器,确保能读到完整的包

*/

public class ByteArrayDecoder extends CumulativeProtocolDecoder {

private final Charset charset;

public ByteArrayDecoder(Charset charset) {

this.charset = charset;

}

@Override

protected boolean doDecode (IoSession ioSession, IoBuffer ioBuffer, ProtocolDecoderOutput protocolDecoderOutput)throws Exception {

// 丢包,断包处理

if (ioBuffer.remaining() > 4)// 有包头,包头足够

{

ioBuffer.mark();

// 标记当前position的快照标记mark,以便后继的reset操作能恢复position位置,开始是0

byte[] l = new byte[4];

ioBuffer.get(l);

// 读取包头,占4个字节

if (ioBuffer.remaining() < 4)

// 内容长度的4个字节不够,断包

{

ioBuffer.reset();

return false;

} else {

// 内容长度的4个字节数组足够

byte[] bytesLegth = new byte[4];

// 内容长度

ioBuffer.get(bytesLegth);// 读取内容长度,int类型,占四个字节

int len = MinaUtil.byteArrayToInt(bytesLegth);

// 内容长度有多少

if (ioBuffer.remaining() < len) // 内容不够,断包

{

ioBuffer.reset();

return false;

} else {

// 消息内容足够

byte[] bytes = new byte[len];

ioBuffer.get(bytes, 0, len);

protocolDecoderOutput.write(new String(bytes, charset));

// 读取内容,并且发送

if (ioBuffer.remaining() < 4) { // 包尾不够

ioBuffer.reset();

return false;

} else {

// 包尾足够

byte[] tails = new byte[4];

ioBuffer.get(tails);

// 读取包尾

if (ioBuffer.remaining() > 0){

// 最后如果粘了包,会再次调用doDeocde()方法,把剩余数据给doDeocde()方法处理

return true;

}}}}}

return false; // 断包,或者执行完,

} }

编解码工厂

import org.apache.mina.core.session.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;

import java.nio.charset.Charset;

/** * 自定义解编码器工厂 * */

public class ByteArrayCodecFactory implements ProtocolCodecFactory {

private ByteArrayDecoder decoder;

private ByteArrayEncoder encoder;

public ByteArrayCodecFactory() {

this(Charset.defaultCharset());

}

public ByteArrayCodecFactory(Charset charSet) {

encoder = new ByteArrayEncoder(charSet);

decoder = new ByteArrayDecoder(charSet);

}

@Override

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

return decoder;

}

@Override

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

return encoder;

} }

工具类方法 (二进制转整型)

public static int byte2Int(byte[] l) {        
        
        return (l[0]&0xff)<<24            
                | (l[1]&0xff)<<16            
                | (l[2]&0xff)<<8            
                | (l[3]&0xff);
    }

 强调: 这个 编解码器   封装的是  String 类型数组,在业务逻辑处理层   收发信心的时候   要用 String 格式定义!
 

2014-03-29 11:20:07 qq244272324 阅读数 2838
  • Mina网络应用程序NIO框架实战详解

    网络编程的基本模型是C/S模型,即两个进程间的通信。 课程通过循序渐进的方式从相关的名词概率介绍,到基础的模型BIO、NIO、AIO实现和练习,重点讲解NIO相关的知识,然后延伸到NIO框架上去练习。 课程还将在工作中应用较多的几种编解码器进行详细分析和练习。 Mina是基于java NIO类库开发; 采用非阻塞方式的异步传输; 事件驱动;支持批量数据传输; 支持TCP、UDP协议;支持Spring; 采用优雅的松耦合架构; 可灵活的加载过滤器机制;

    288 人正在学习 去看看 安燚

协议编解码器是在使用Mina 的时候最需要关注的对象,因为网络传输的数据都是二进制数据(byte),而在程序中面向的是JAVA 对象,这就需要在发送数据时将JAVA 对象编码二进制数据,接收数据时将二进制数据解码为JAVA 对象。

编解码器同样是以过滤器的形式安插在过滤器链上,如下所示:

// 设置过滤器(使用Mina提供的文本换行符编解码器)

acceptor.getFilterChain().addLast(

     "codec",

     newProtocolCodecFilter(new TextLineCodecFactory(Charset

           .forName("UTF-8"),

           LineDelimiter.WINDOWS.getValue(),

           LineDelimiter.WINDOWS.getValue())));

协议编解码器是通过ProtocolCodecFilter过滤器构造的,看它的构造方法,它需要一个ProtocolCodecFactory对象:

publicProtocolCodecFilter(ProtocolCodecFactory factory) {

        if (factory == null){

            throw newNullPointerException("factory");

        }

        this.factory =factory;

}

ProtocolCodecFactory接口非常直接,通过ProtocolEncoder和

ProtocolDecoder对象来构建!

public interfaceProtocolCodecFactory {

    /**

     * Returns a new (orreusable) instance of {@link ProtocolEncoder} which

     * encodes message objectsinto binary or protocol-specific data.

     */

    ProtocolEncodergetEncoder(IoSession session) throws Exception;

 

    /**

     * Returns a new (orreusable) instance of {@link ProtocolDecoder} which

     * decodes binary orprotocol-specific data into message objects.

     */

    ProtocolDecodergetDecoder(IoSession session) throws Exception;

}

ProtocolEncoder和ProtocolDecoder接口是Mina负责编码和解码的顶级接口!

编码和解码的前提就是协议的制定:比如上面我们使用的Mina自带的根据文本换行符解码的TextLineCodecFactory(),如果遇到文本换行符就开始编解码!


package lcl.mina.demo2;


import java.nio.charset.Charset;


import org.apache.mina.common.IoSession;

import org.apache.mina.filter.codec.ProtocolCodecFactory;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolEncoder;


import lcl.mina.demo2.LCLTextLineCodecDecoder;

import lcl.mina.demo2.LCLTextLineCodecEncoder;


public class LCLTextLineCodecFactory implements ProtocolCodecFactory {


private Charset charset; // 编码格式


private String delimiter; // 文本分隔符


public LCLTextLineCodecFactory(Charset charset, String delimiter) {

this.charset = charset;

this.delimiter = delimiter;

}


//读取信息执行

public ProtocolDecoder getDecoder(IoSession session) throws Exception {

return new LCLTextLineCodecDecoder(charset, delimiter);

}


//发送编写信息执行

public ProtocolEncoder getEncoder(IoSession session) throws Exception {

return new LCLTextLineCodecEncoder(charset, delimiter);

}


}


package lcl.mina.demo2;


import java.nio.charset.CharacterCodingException;

import java.nio.charset.Charset;

import java.nio.charset.CharsetDecoder;


import org.apache.mina.common.IoBuffer;

import org.apache.mina.common.IoSession;

import org.apache.mina.filter.codec.ProtocolDecoder;

import org.apache.mina.filter.codec.ProtocolDecoderOutput;


public class LCLTextLineCodecDecoder implements ProtocolDecoder {


private Charset charset; // 编码格式


private String delimiter; // 文本分隔符


private IoBuffer delimBuf; // 文本分割符匹配的变量


// 定义常量值,作为每个IoSession中保存解码任务的key值

private static String CONTEXT = LCLTextLineCodecDecoder.class.getName()

+ ".context";


// 构造函数,必须指定Charset和文本分隔符

public LCLTextLineCodecDecoder(Charset charset, String delimiter) {

this.charset = charset;

this.delimiter = delimiter;

}


public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out)

throws Exception {

Context ctx = getContext(session);

if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值

delimiter = "\r\n";

}

if (charset == null) {

charset = Charset.forName("utf-8");

}

decodeNormal(ctx, in, out);

}


// 从IoSession中获取Context对象

private Context getContext(IoSession session) {

Context ctx;

ctx = (Context) session.getAttribute(CONTEXT);

if (ctx == null) {

ctx = new Context();

session.setAttribute(CONTEXT, ctx);

}

return ctx;

}


// 解码

private void decodeNormal(Context ctx, IoBuffer in,

ProtocolDecoderOutput out) throws CharacterCodingException {

// 取出未完成任务中已经匹配的文本换行符的个数

int matchCount = ctx.getMatchCount();

// 设置匹配文本换行符的IoBuffer变量

if (delimBuf == null) {

            IoBuffer tmp = IoBuffer.allocate(2).setAutoExpand(true);

            tmp.putString(delimiter, charset.newEncoder());

            tmp.flip();

            delimBuf = tmp;

        }

int oldPos = in.position(); // 解码的IoBuffer中数据的原始信息

        int oldLimit = in.limit();

        while (in.hasRemaining()) { // 变量解码的IoBuffer

            byte b = in.get();

            if (delimBuf.get(matchCount) == b) { // 匹配第matchCount位换行符成功

                matchCount++;               

                if (matchCount == delimBuf.limit()) { // 当前匹配到字节个数与文本换行符字节个数相同,匹配结束

                    int pos = in.position();   // 获得当前匹配到的position(position前所有数据有效)

                    in.limit(pos);

                    in.position(oldPos);   // position回到原始位置


                    ctx.append(in);   // 追加到Context对象未完成数据后面


                    in.limit(oldLimit); // in中匹配结束后剩余数据

                    in.position(pos);


                    IoBuffer buf = ctx.getBuf();

                    buf.flip();

                    buf.limit(buf.limit() - matchCount);// 去掉匹配数据中的文本换行符

                    try {

                        out.write(buf.getString(ctx.getDecoder())); // 输出解码内容

                    } finally {

                        buf.clear(); // 释放缓存空间

                    }


                    oldPos = pos;

                    matchCount = 0;

                }

            } else {

            // 如果matchCount==0,则继续匹配

            // 如果matchCount>0,说明没有匹配到文本换行符的中的前一个匹配成功字节的下一个字节,

            // 跳转到匹配失败字符处,并置matchCount=0,继续匹配

                in.position(in.position()-matchCount);

                matchCount = 0;  // 匹配成功后,matchCount置空

            }

        }

        

        // 把in中未解码内容放回buf

        in.position(oldPos);

        ctx.append(in);


        ctx.setMatchCount(matchCount);

}


public void dispose(IoSession session) throws Exception {


}


public void finishDecode(IoSession session, ProtocolDecoderOutput out)

throws Exception {

}


// 内部类,保存IoSession解码时未完成的任务

private class Context {

private CharsetDecoder decoder;

private IoBuffer buf; // 保存真实解码内容

private int matchCount = 0; // 匹配到的文本换行符个数


private Context() {

decoder = charset.newDecoder();

buf = IoBuffer.allocate(80).setAutoExpand(true);

}


// 重置

public void reset() {

matchCount = 0;

decoder.reset();

}


// 追加数据

public void append(IoBuffer in) {

getBuf().put(in);

}


// ======get/set方法=====================

public CharsetDecoder getDecoder() {

return decoder;

}


public IoBuffer getBuf() {

return buf;

}


public int getMatchCount() {

return matchCount;

}


public void setMatchCount(int matchCount) {

this.matchCount = matchCount;

}

} // end class Context;


}


package lcl.mina.demo2;


import java.nio.charset.Charset;


import org.apache.mina.common.IoBuffer;

import org.apache.mina.common.IoSession;

import org.apache.mina.filter.codec.ProtocolEncoder;

import org.apache.mina.filter.codec.ProtocolEncoderOutput;


public class LCLTextLineCodecEncoder implements ProtocolEncoder {


private Charset charset; // 编码格式


private String delimiter; // 文本分隔符


public LCLTextLineCodecEncoder(Charset charset, String delimiter) {

this.charset = charset;

this.delimiter = delimiter;

}


public void encode(IoSession session, Object message,

ProtocolEncoderOutput out) throws Exception {

if (delimiter == null || "".equals(delimiter)) { // 如果文本换行符未指定,使用默认值

delimiter = "\r\n";

}

if (charset == null) {

charset = Charset.forName("utf-8");

}


String value = message.toString();

IoBuffer buf = IoBuffer.allocate(value.length()).setAutoExpand(true);

buf.putString(value, charset.newEncoder()); // 真实数据

buf.putString(delimiter, charset.newEncoder()); // 文本换行符

buf.flip();

out.write(buf);

}


public void dispose(IoSession session) throws Exception {

}


}



2014-03-22 20:42:00 weixin_33911824 阅读数 11
  • Mina网络应用程序NIO框架实战详解

    网络编程的基本模型是C/S模型,即两个进程间的通信。 课程通过循序渐进的方式从相关的名词概率介绍,到基础的模型BIO、NIO、AIO实现和练习,重点讲解NIO相关的知识,然后延伸到NIO框架上去练习。 课程还将在工作中应用较多的几种编解码器进行详细分析和练习。 Mina是基于java NIO类库开发; 采用非阻塞方式的异步传输; 事件驱动;支持批量数据传输; 支持TCP、UDP协议;支持Spring; 采用优雅的松耦合架构; 可灵活的加载过滤器机制;

    288 人正在学习 去看看 安燚

一、Mina对编解码的支持

 

我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);对于所要发送的数据,我们需要转换为计算机所能处理的二进制数据,此过程称为编码(encode)。



       Mina对数据的编解码提供了良好的支持,它提供了过滤器ProtocolCodecFilter支持编码和解码过程,可以查看包org.apache.mina.filter.codec下的代码。

看下此过滤器的调用,代码很简单:

Java代码  
  1. // 加入编解码过滤器  
  2. chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));  

 实现原理

ProtocolCodecFilter包含一个编码器和解码器工厂:

Java代码  
  1. //编码器和解码器工厂  
  2. private final ProtocolCodecFactory factory;  

 此工厂可以通过构造方法传入,具体构造方法可以具体看源码,比较简单,此处不做详细介绍。

主要看下解码和编码过程,解码应该是消息接收到,我们程序对消息进行处理时进行的,此时我们想到ProtocolCodecFilter应该覆盖messageReceived方法。编码应该是发送消息时,需要将我们的业务数据结构转换为二进制数据,此时我们想到ProtocolCodecFilter应该覆盖filterWrite方法。

解码过程

前面已经说了,解码过程就是将二进制数据转换为我们可以识别的数据结构,所以messageReceived方法一开始就有个判断:

Java代码  
  1. //对于解码,消息类型必须是IoBuffer类型的,如果不是,转向下个filter  
  2. if (!(message instanceof IoBuffer)) {  
  3.     nextFilter.messageReceived(session, message);  
  4.     return;  
  5. }  

 解码的核心操作:

Java代码  
  1. //处理消息,如果buffer中还有数据,就处理数据  
  2. while (in.hasRemaining()) {  
  3.     int oldPos = in.position();  
  4.         try {  
  5.         synchronized (decoderOut) {  
  6.             //进行解码操作。  
  7.             decoder.decode(session, in, decoderOut);  
  8.         }  
  9. …………  
  10. }  

 我们需要实现解码器ProtocolDecoder接口,主要实现解码方法decode。可以参考TextLineDecoder类的实现,下面代码是本人实际项目中的实现:

Java代码  
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     final int packHeadLength = 2;  
  3.     // 先获取上次的处理上下文,其中可能有未处理完的数据  
  4.     Context ctx = getContext(session);  
  5.     // 先把当前buffer中的数据追加到Context的buffer当中  
  6.     ctx.append(in);  
  7.     // 把position指向0位置,把limit指向原来的position位置  
  8.     IoBuffer buf = ctx.getBuffer();  
  9.     buf.flip();  
  10.     // 当前剩余长度大于2  
  11.     while (buf.remaining() >= packHeadLength) {  
  12.         buf.mark();  
  13.         if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("eb")) {  
  14.             if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("93")) {  
  15.                 buf.reset();  
  16.                 if(buf.remaining()<11){  
  17.                     break;  
  18.                 }  
  19.                 byte[] dataArray = new byte[11];  
  20.                 buf.get(dataArray, 0, 11);  
  21.   
  22.                 if (SensorData.checkData(dataArray)) {  
  23.                     SensorData data = new SensorData(dataArray);  
  24.                     out.write(data);  
  25.                     // 回应客户端  
  26.                     byte[] b = new byte[2];  
  27.                     b[0] = ByteConvertUtil.uniteBytes("eb");  
  28.                     b[1] = ByteConvertUtil.uniteBytes("93");  
  29.                     session.write(IoBuffer.wrap(b));  
  30.                 }  
  31.             } else {  
  32.                 continue;  
  33.             }  
  34.         } else {  
  35.             continue;  
  36.         }  
  37.     }  
  38.     //断包处理,将剩余数据放入CONTEXT中  
  39.     if (buf.hasRemaining()) {  
  40.         IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);  
  41.         temp.put(buf);  
  42.         temp.flip();  
  43.         buf.clear();  
  44.         buf.put(temp);  
  45.     } else {  
  46.         buf.clear();  
  47.     }  
  48. }  

 顺便说下,我们最好要把我们的的数据包的格式提前定义好,了解了数据包的格式我们才能更好的进行数据的编解码。定义好数据包格式一方面方便编解码,另一方面可以解决下面要说的粘包和断包的问题。

数据包的定义有很多种方式,这里说下我所用过的两种方式:

1.固定消息长度,消息头+消息体+校验码。此方式相对简单,表示的内容也比较少。

2.不定消息长度,消息头+消息长度+消息体。此方式可以无限消息长度,比较灵活。

解码出一个消息体后,需要将数据通过ProtocolDecoderOutput的write方法写入到队列(queue)里面去:

Java代码  
  1. public void write(Object message) {  
  2.     if (message == null) {  
  3.         throw new IllegalArgumentException("message");  
  4.     }  
  5.     //将消息写入队列  
  6.     messageQueue.add(message);  
  7. }  

 真正执行消息向下传递是通过flush方法:

Java代码  
  1. public void flush(NextFilter nextFilter, IoSession session) {  
  2.     Queue<Object> messageQueue = getMessageQueue();  
  3.     // 取出队列里面的消息向下传递  
  4.     while (!messageQueue.isEmpty()) {  
  5.         nextFilter.messageReceived(session, messageQueue.poll());  
  6.     }  
  7. }  

 编码过程

看了上面的解码过程,编码过程就不难理解了,编码过程只不过是解码过程的逆向过程,同样在filterWrite方法里有消息类型的判断:

Java代码  
  1. //消息如果已经是IoBuffer,就不需要再进行编码  
  2. if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {  
  3.     nextFilter.filterWrite(session, writeRequest);  
  4.     return;  
  5. }  

 编码:

Java代码  
  1. // 进行数据编码  
  2. encoder.encode(session, message, encoderOut);  

 此处编码实现可以参考TextLineEncoder的编码实现,比较简单,此处就不多做解释了。

同样编码也是通过write到一个队列中,然后通过flush写入到后面的过滤器中的。

 

二、Mina对粘包和断包的处理

 

上面说了mina对编解码的支持,在解码过程中,不得不面对的一个问题就是TCP的粘包和断包,先说下什么是粘包和断包。

TCP通讯是面向数据流的通讯,我们将数据流理解为一支竹竿,数据包就相当于竹竿中的每一节,那么我们的解码过程就相当于对竹竿进行分解的过程。竹竿就是多个数据包的“粘包”,断包就是指竹节中间断开,我们需要将它拼接成为一个完整的竹节,如果不能拼接起来就要废弃这部分。

粘包:

 

 

断包:

 

 

 

 

对粘包的处理相对比较简单,只需要依据数据包的格式进行数据流的分割即可;对于断包的处理我们需要将断包的数据保存起来,等待接收下次的数据进行拼接。

通常情况下我们要考虑粘包和断包同时出现的情况下的解码代码编写。有两种实现方式:

1.继承CumulativeProtocolDecoder类,实现doDecode方法。

2.实现ProtocolDecoder接口,自己解决粘包和断包的问题。

先看下CumulativeProtocolDecoder的实现。

它有一个成员变量BUFFER:

Java代码  
  1. //存放断包数据  
  2. private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");  

 doDecode方法一方面判断数据包是否符合解码要求(数据包可能过短,数据包格式不合要求都可能不能通过解码要求),不符合刚返回false;另一方面对于符合解码要求的数据进行数据解码,并返回true。可以参考ImageRequestDecoder类的实现。

 

看下它的decode方法实现:

Java代码  
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     if (!session.getTransportMetadata().hasFragmentation()) {  
  3.         while (in.hasRemaining()) {  
  4.             // 判断是否符合解码要求,不符合则中断并返回  
  5.             if (!doDecode(session, in, out)) {  
  6.                 break;  
  7.             }  
  8.         }  
  9.         return;  
  10.     }  
  11.   
  12.     boolean usingSessionBuffer = true;  
  13.     // 取得上次断包数据  
  14.     IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
  15.     // If we have a session buffer, append data to that; otherwise  
  16.     // use the buffer read from the network directly.  
  17.     if (buf != null) { // 如果有断包数据  
  18.         boolean appended = false;  
  19.         // Make sure that the buffer is auto-expanded.  
  20.         if (buf.isAutoExpand()) {  
  21.             try {  
  22.                 // 将断包数据和当前传入的数据进行拼接  
  23.                 buf.put(in);  
  24.                 appended = true;  
  25.             } catch (IllegalStateException e) {  
  26.                 // A user called derivation method (e.g. slice()),  
  27.                 // which disables auto-expansion of the parent buffer.  
  28.             } catch (IndexOutOfBoundsException e) {  
  29.                 // A user disabled auto-expansion.  
  30.             }  
  31.         }  
  32.   
  33.         if (appended) {  
  34.             buf.flip();// 如果是拼接的数据,将buf置为读模式  
  35.         } else {  
  36.             // Reallocate the buffer if append operation failed due to  
  37.             // derivation or disabled auto-expansion.  
  38.             //如果buf不是可自动扩展的buffer,刚通过数据拷贝的方式将断包数据和当前数据进行拼接  
  39.             buf.flip();  
  40.             IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
  41.             newBuf.order(buf.order());  
  42.             newBuf.put(buf);  
  43.             newBuf.put(in);  
  44.             newBuf.flip();  
  45.             buf = newBuf;  
  46.   
  47.             // Update the session attribute.  
  48.             session.setAttribute(BUFFER, buf);  
  49.         }  
  50.     } else {  
  51.         buf = in;  
  52.         usingSessionBuffer = false;  
  53.     }  
  54.   
  55.     for (;;) {  
  56.         int oldPos = buf.position();  
  57.         boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作  
  58.         if (decoded) {  
  59.             // 如果符合解码要求并进行了解码操作,  
  60.             // 则当前position和解码前的position不可能一样  
  61.             if (buf.position() == oldPos) {  
  62.                 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");  
  63.             }  
  64.             // 如果已经没有数据,则退出循环  
  65.             if (!buf.hasRemaining()) {  
  66.                 break;  
  67.             }  
  68.         } else {// 如果不符合解码要求,则退出循环  
  69.             break;  
  70.         }  
  71.     }  
  72.     // if there is any data left that cannot be decoded, we store  
  73.     // it in a buffer in the session and next time this decoder is  
  74.     // invoked the session buffer gets appended to  
  75.     if (buf.hasRemaining()) {  
  76.         if (usingSessionBuffer && buf.isAutoExpand()) {  
  77.             buf.compact();  
  78.         } else {  
  79.             //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。
  80.             storeRemainingInSession(buf, session);  
  81.         }  
  82.     } else {  
  83.         if (usingSessionBuffer) {  
  84.             removeSessionBuffer(session);  
  85.         }  
  86.     }  
  87. }  

 上面的处理过程可以这样理解:

 

1.取得断包数据,如果有断包数据,就和当前数据拼接。

2.进行数据解码操作。

3.将可以进行解码操作的数据解码完成后,如果还有数据,则将剩余数据存入session中,等待下次数据到来,从步骤1开始再次执行。

通过继承ProtocolDecoder,实现decode方法,自己处理粘包和断包的方式其实和CumulativeProtocolDecoder类的实现原理是类似的,此处实现可以参考类TextLineDecoder,内部类Context保存了上下文信息,同样是保存在了sesion中的,具体实现方式大家可以仔细阅读代码。

 

 

三、总结

 

 

基于TCP的通讯协议才有可能产生粘包和断包的情况,粘包和断包的产生有多种原因,处理好粘包和断包的问题是网络编程必然面对的情况,对于这块的处理,大家如果有什么好的想法可以一起讨论。

每天进步一点点,不做无为的码农。。。。。

转载于:https://www.cnblogs.com/burgeen/p/3618052.html

2014-11-18 11:47:34 cao478208248 阅读数 1059
  • Mina网络应用程序NIO框架实战详解

    网络编程的基本模型是C/S模型,即两个进程间的通信。 课程通过循序渐进的方式从相关的名词概率介绍,到基础的模型BIO、NIO、AIO实现和练习,重点讲解NIO相关的知识,然后延伸到NIO框架上去练习。 课程还将在工作中应用较多的几种编解码器进行详细分析和练习。 Mina是基于java NIO类库开发; 采用非阻塞方式的异步传输; 事件驱动;支持批量数据传输; 支持TCP、UDP协议;支持Spring; 采用优雅的松耦合架构; 可灵活的加载过滤器机制;

    288 人正在学习 去看看 安燚

1、深入解析Apache Mina源码(1)——Mina的过滤器机制实现

2、深入解析Apache Mina源码(2)——Mina的事件模型

3、深入解析Apache Mina源码(3)——Mina的线程池模型

4、深入解析Apache Mina源码(4)——Mina编解码以及对粘包和断包的处理

 http://www.iteye.com/topic/1125178

一、Mina对编解码的支持

 

我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);对于所要发送的数据,我们需要转换为计算机所能处理的二进制数据,此过程称为编码(encode)。



       Mina对数据的编解码提供了良好的支持,它提供了过滤器ProtocolCodecFilter支持编码和解码过程,可以查看包org.apache.mina.filter.codec下的代码。

看下此过滤器的调用,代码很简单:

Java代码  收藏代码
  1. // 加入编解码过滤器  
  2. chain.addLast("codec"new ProtocolCodecFilter(new TextLineCodecFactory()));  

 实现原理

ProtocolCodecFilter包含一个编码器和解码器工厂:

Java代码  收藏代码
  1. //编码器和解码器工厂  
  2. private final ProtocolCodecFactory factory;  

 此工厂可以通过构造方法传入,具体构造方法可以具体看源码,比较简单,此处不做详细介绍。

主要看下解码和编码过程,解码应该是消息接收到,我们程序对消息进行处理时进行的,此时我们想到ProtocolCodecFilter应该覆盖messageReceived方法。编码应该是发送消息时,需要将我们的业务数据结构转换为二进制数据,此时我们想到ProtocolCodecFilter应该覆盖filterWrite方法。

解码过程

前面已经说了,解码过程就是将二进制数据转换为我们可以识别的数据结构,所以messageReceived方法一开始就有个判断:

Java代码  收藏代码
  1. //对于解码,消息类型必须是IoBuffer类型的,如果不是,转向下个filter  
  2. if (!(message instanceof IoBuffer)) {  
  3.     nextFilter.messageReceived(session, message);  
  4.     return;  
  5. }  

 解码的核心操作:

Java代码  收藏代码
  1. //处理消息,如果buffer中还有数据,就处理数据  
  2. while (in.hasRemaining()) {  
  3.     int oldPos = in.position();  
  4.         try {  
  5.         synchronized (decoderOut) {  
  6.             //进行解码操作。  
  7.             decoder.decode(session, in, decoderOut);  
  8.         }  
  9. …………  
  10. }  

 我们需要实现解码器ProtocolDecoder接口,主要实现解码方法decode。可以参考TextLineDecoder类的实现,下面代码是本人实际项目中的实现:

Java代码  收藏代码
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     final int packHeadLength = 2;  
  3.     // 先获取上次的处理上下文,其中可能有未处理完的数据  
  4.     Context ctx = getContext(session);  
  5.     // 先把当前buffer中的数据追加到Context的buffer当中  
  6.     ctx.append(in);  
  7.     // 把position指向0位置,把limit指向原来的position位置  
  8.     IoBuffer buf = ctx.getBuffer();  
  9.     buf.flip();  
  10.     // 当前剩余长度大于2  
  11.     while (buf.remaining() >= packHeadLength) {  
  12.         buf.mark();  
  13.         if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("eb")) {  
  14.             if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("93")) {  
  15.                 buf.reset();  
  16.                 if(buf.remaining()<11){  
  17.                     break;  
  18.                 }  
  19.                 byte[] dataArray = new byte[11];  
  20.                 buf.get(dataArray, 011);  
  21.   
  22.                 if (SensorData.checkData(dataArray)) {  
  23.                     SensorData data = new SensorData(dataArray);  
  24.                     out.write(data);  
  25.                     // 回应客户端  
  26.                     byte[] b = new byte[2];  
  27.                     b[0] = ByteConvertUtil.uniteBytes("eb");  
  28.                     b[1] = ByteConvertUtil.uniteBytes("93");  
  29.                     session.write(IoBuffer.wrap(b));  
  30.                 }  
  31.             } else {  
  32.                 continue;  
  33.             }  
  34.         } else {  
  35.             continue;  
  36.         }  
  37.     }  
  38.     //断包处理,将剩余数据放入CONTEXT中  
  39.     if (buf.hasRemaining()) {  
  40.         IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);  
  41.         temp.put(buf);  
  42.         temp.flip();  
  43.         buf.clear();  
  44.         buf.put(temp);  
  45.     } else {  
  46.         buf.clear();  
  47.     }  
  48. }  

 顺便说下,我们最好要把我们的的数据包的格式提前定义好,了解了数据包的格式我们才能更好的进行数据的编解码。定义好数据包格式一方面方便编解码,另一方面可以解决下面要说的粘包和断包的问题。

数据包的定义有很多种方式,这里说下我所用过的两种方式:

1.固定消息长度,消息头+消息体+校验码。此方式相对简单,表示的内容也比较少。

2.不定消息长度,消息头+消息长度+消息体。此方式可以无限消息长度,比较灵活。

解码出一个消息体后,需要将数据通过ProtocolDecoderOutput的write方法写入到队列(queue)里面去:

Java代码  收藏代码
  1. public void write(Object message) {  
  2.     if (message == null) {  
  3.         throw new IllegalArgumentException("message");  
  4.     }  
  5.     //将消息写入队列  
  6.     messageQueue.add(message);  
  7. }  

 真正执行消息向下传递是通过flush方法:

Java代码  收藏代码
  1. public void flush(NextFilter nextFilter, IoSession session) {  
  2.     Queue<Object> messageQueue = getMessageQueue();  
  3.     // 取出队列里面的消息向下传递  
  4.     while (!messageQueue.isEmpty()) {  
  5.         nextFilter.messageReceived(session, messageQueue.poll());  
  6.     }  
  7. }  

 编码过程

看了上面的解码过程,编码过程就不难理解了,编码过程只不过是解码过程的逆向过程,同样在filterWrite方法里有消息类型的判断:

Java代码  收藏代码
  1. //消息如果已经是IoBuffer,就不需要再进行编码  
  2. if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {  
  3.     nextFilter.filterWrite(session, writeRequest);  
  4.     return;  
  5. }  

 编码:

Java代码  收藏代码
  1. // 进行数据编码  
  2. encoder.encode(session, message, encoderOut);  

 此处编码实现可以参考TextLineEncoder的编码实现,比较简单,此处就不多做解释了。

同样编码也是通过write到一个队列中,然后通过flush写入到后面的过滤器中的。

 

二、Mina对粘包和断包的处理

 

上面说了mina对编解码的支持,在解码过程中,不得不面对的一个问题就是TCP的粘包和断包,先说下什么是粘包和断包。

TCP通讯是面向数据流的通讯,我们将数据流理解为一支竹竿,数据包就相当于竹竿中的每一节,那么我们的解码过程就相当于对竹竿进行分解的过程。竹竿就是多个数据包的“粘包”,断包就是指竹节中间断开,我们需要将它拼接成为一个完整的竹节,如果不能拼接起来就要废弃这部分。

粘包:

 

断包:

 

 

 

 

对粘包的处理相对比较简单,只需要依据数据包的格式进行数据流的分割即可;对于断包的处理我们需要将断包的数据保存起来,等待接收下次的数据进行拼接。

通常情况下我们要考虑粘包和断包同时出现的情况下的解码代码编写。有两种实现方式:

1.继承CumulativeProtocolDecoder类,实现doDecode方法。

2.实现ProtocolDecoder接口,自己解决粘包和断包的问题。

先看下CumulativeProtocolDecoder的实现。

它有一个成员变量BUFFER:

Java代码  收藏代码
  1. //存放断包数据  
  2. private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");  

 doDecode方法一方面判断数据包是否符合解码要求(数据包可能过短,数据包格式不合要求都可能不能通过解码要求),不符合刚返回false;另一方面对于符合解码要求的数据进行数据解码,并返回true。可以参考ImageRequestDecoder类的实现。

 

看下它的decode方法实现:

Java代码  收藏代码
  1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
  2.     if (!session.getTransportMetadata().hasFragmentation()) {  
  3.         while (in.hasRemaining()) {  
  4.             // 判断是否符合解码要求,不符合则中断并返回  
  5.             if (!doDecode(session, in, out)) {  
  6.                 break;  
  7.             }  
  8.         }  
  9.         return;  
  10.     }  
  11.   
  12.     boolean usingSessionBuffer = true;  
  13.     // 取得上次断包数据  
  14.     IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
  15.     // If we have a session buffer, append data to that; otherwise  
  16.     // use the buffer read from the network directly.  
  17.     if (buf != null) { // 如果有断包数据  
  18.         boolean appended = false;  
  19.         // Make sure that the buffer is auto-expanded.  
  20.         if (buf.isAutoExpand()) {  
  21.             try {  
  22.                 // 将断包数据和当前传入的数据进行拼接  
  23.                 buf.put(in);  
  24.                 appended = true;  
  25.             } catch (IllegalStateException e) {  
  26.                 // A user called derivation method (e.g. slice()),  
  27.                 // which disables auto-expansion of the parent buffer.  
  28.             } catch (IndexOutOfBoundsException e) {  
  29.                 // A user disabled auto-expansion.  
  30.             }  
  31.         }  
  32.   
  33.         if (appended) {  
  34.             buf.flip();// 如果是拼接的数据,将buf置为读模式  
  35.         } else {  
  36.             // Reallocate the buffer if append operation failed due to  
  37.             // derivation or disabled auto-expansion.  
  38.             //如果buf不是可自动扩展的buffer,刚通过数据拷贝的方式将断包数据和当前数据进行拼接  
  39.             buf.flip();  
  40.             IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
  41.             newBuf.order(buf.order());  
  42.             newBuf.put(buf);  
  43.             newBuf.put(in);  
  44.             newBuf.flip();  
  45.             buf = newBuf;  
  46.   
  47.             // Update the session attribute.  
  48.             session.setAttribute(BUFFER, buf);  
  49.         }  
  50.     } else {  
  51.         buf = in;  
  52.         usingSessionBuffer = false;  
  53.     }  
  54.   
  55.     for (;;) {  
  56.         int oldPos = buf.position();  
  57.         boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作  
  58.         if (decoded) {  
  59.             // 如果符合解码要求并进行了解码操作,  
  60.             // 则当前position和解码前的position不可能一样  
  61.             if (buf.position() == oldPos) {  
  62.                 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");  
  63.             }  
  64.             // 如果已经没有数据,则退出循环  
  65.             if (!buf.hasRemaining()) {  
  66.                 break;  
  67.             }  
  68.         } else {// 如果不符合解码要求,则退出循环  
  69.             break;  
  70.         }  
  71.     }  
  72.     // if there is any data left that cannot be decoded, we store  
  73.     // it in a buffer in the session and next time this decoder is  
  74.     // invoked the session buffer gets appended to  
  75.     if (buf.hasRemaining()) {  
  76.         if (usingSessionBuffer && buf.isAutoExpand()) {  
  77.             buf.compact();  
  78.         } else {  
  79.             //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。  
  80.             storeRemainingInSession(buf, session);  
  81.         }  
  82.     } else {  
  83.         if (usingSessionBuffer) {  
  84.             removeSessionBuffer(session);  
  85.         }  
  86.     }  
  87. }  

 上面的处理过程可以这样理解:

 

1.取得断包数据,如果有断包数据,就和当前数据拼接。

2.进行数据解码操作。

3.将可以进行解码操作的数据解码完成后,如果还有数据,则将剩余数据存入session中,等待下次数据到来,从步骤1开始再次执行。

通过继承ProtocolDecoder,实现decode方法,自己处理粘包和断包的方式其实和CumulativeProtocolDecoder类的实现原理是类似的,此处实现可以参考类TextLineDecoder,内部类Context保存了上下文信息,同样是保存在了sesion中的,具体实现方式大家可以仔细阅读代码。

 

 

三、总结

 

 

基于TCP的通讯协议才有可能产生粘包和断包的情况,粘包和断包的产生有多种原因,处理好粘包和断包的问题是网络编程必然面对的情况,对于这块的处理,大家如果有什么好的想法可以一起讨论。

每天进步一点点,不做无为的码农。。。。。

2016-04-23 00:00:00 u010031673 阅读数 1061
  • Mina网络应用程序NIO框架实战详解

    网络编程的基本模型是C/S模型,即两个进程间的通信。 课程通过循序渐进的方式从相关的名词概率介绍,到基础的模型BIO、NIO、AIO实现和练习,重点讲解NIO相关的知识,然后延伸到NIO框架上去练习。 课程还将在工作中应用较多的几种编解码器进行详细分析和练习。 Mina是基于java NIO类库开发; 采用非阻塞方式的异步传输; 事件驱动;支持批量数据传输; 支持TCP、UDP协议;支持Spring; 采用优雅的松耦合架构; 可灵活的加载过滤器机制;

    288 人正在学习 去看看 安燚

Mina中的编解码器通过过滤器ProtocolCodecFilter构造,这个过滤器有3个构造器,其中可以分为两类,一类需要一个ProtoCodecFactory,这个接口有两个方法,分别是getDecoder()和getEncoder(),我们需要实现这两个方法,实现这两个方法就需要一个自定义的Decoder和一个自定义的Encoder

public class CmccSipcCodecFactory implements ProtocolCodecFactory{
    private final ProtocolEncoder encoder;
    private final ProtocolDecoder decoder;
    
    public CmccSipcCodecFactory(Charset charset) {
        this.decoder = new CmccSipcDecoder(charset);
        this.encoder = new CmccSipcEncoder(charset);
    }
    @Override
    public ProtocolDecoder getDecoder(IoSession session) throws Exception {
        return decoder;
    }

    @Override
    public ProtocolEncoder getEncoder(IoSession session) throws Exception {
        return encoder;
    }
}
其中CmccSipcDecoder和CmccSipcEncoder是我们自定义的编辑码器,编辑码器需要跟据我们的具体业务来实现,下面是一个编辑码器的例子,发送端和接收端按照约定好的格式将数据组装起来,比如下面的例子中我们是以@作为数据之间的分割符,解码的时候我们也要根据@来进行解码

public class CmccSipcEncoder extends ProtocolEncoderAdapter{
	
	private final Charset charset;

	public CmccSipcEncoder(Charset charset) {
		super();
		this.charset = charset;
	}
	public void encode(IoSession session, Object message,
			ProtocolEncoderOutput out) throws Exception {
		SmsObject sms =(SmsObject) message;
		CharsetEncoder ce=charset.newEncoder();
		IoBuffer buffer =IoBuffer.allocate(100).setAutoExpand(true);
		String statusLine="M sip:wap.fetion.com.cn SIP-C/2.0";
		String sender = sms.getSender();
		String receiver = sms.getReceiver();
		String smsContent = sms.getMessage();
		buffer.putString(statusLine+"@",ce);
		buffer.putString("S:"+sender+'@',ce);
		buffer.putString("R:"+receiver+"@", ce);
		buffer.putString("L:"+(smsContent.getBytes(charset).length)+"@", ce);
		buffer.putString(smsContent, ce);
		buffer.flip();
		out.write(buffer);
	}
}
我们继承了ProtocolEncoderAdapter,这样只实现我们需要的方法就可以了,解码器也是一样,关于解码器主要有两点需要注意

第一点:我们不知道数据发送过来的规模是多大,所以我们也不知道应该接收多少次才可以将数据接收完整,Mina提供了一个CumulativeProtocolDecoder类,这个类的作用是只要有数据发送过来,它就会读取数据,然后累积到内部的IoBuffer缓冲区,这样我们只要负责拆包就可以了。下面是一个解码器的例子

public class CmccSipcDecoder extends CumulativeProtocolDecoder{
	
	private final Charset charset;
	
	private AttributeKey CONTEXT = new AttributeKey(this.getClass(), "context");
	
	private class Context{
		int i=1;
		int matchCount=0;
		String statusLine = "";
		String sender ="";
		String receiver = "";
		String length ="";
		String sms = "";
		IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
		public int getI() {
			return i;
		}
		public void setI(int i) {
			this.i = i;
		}
		public int getMatchCount() {
			return matchCount;
		}
		public void setMatchCount(int matchCount) {
			this.matchCount = matchCount;
		}
		public IoBuffer getBuffer() {
			return buffer;
		}
		public String getStatusLine() {
			return statusLine;
		}
		public void setStatusLine(String statusLine) {
			this.statusLine = statusLine;
		}
		public String getSender() {
			return sender;
		}
		public void setSender(String sender) {
			this.sender = sender;
		}
		public String getReceiver() {
			return receiver;
		}
		public void setReceiver(String receiver) {
			this.receiver = receiver;
		}
		public String getLength() {
			return length;
		}
		public void setLength(String length) {
			this.length = length;
		}
		public String getSms() {
			return sms;
		}
		public void setSms(String sms) {
			this.sms = sms;
		}
		
		
	}
	public CmccSipcDecoder(Charset charset){
		this.charset=charset;
	}
	@Override
	protected boolean doDecode(IoSession session, IoBuffer in,
			ProtocolDecoderOutput out) throws Exception {
		System.err.println("********************************");
		CharsetDecoder cd = charset.newDecoder();
		if(session.getAttribute(CONTEXT)==null){
			session.setAttribute(CONTEXT, new Context());
		}
		Context context = (Context)session.getAttribute(CONTEXT);
		IoBuffer buffer = context.getBuffer();
		while(in.hasRemaining()){
			byte b = in.get();
			buffer.put(b);
			if(b==64 && context.getI()<5){
				context.setMatchCount(context.getMatchCount()+1);
				if(context.getI()==1){
					buffer.flip();
					context.setStatusLine(buffer.getString(context.getMatchCount(),cd));
					context.setStatusLine(context.getStatusLine().substring(0, context.getStatusLine().length()-1));
					context.setMatchCount(0);
					buffer.clear();
				}
				if(context.getI()==2){
					buffer.flip();
					context.setSender(buffer.getString(context.getMatchCount(),cd));
					context.setSender(context.getSender().substring(0,context.getSender().length()-1));
					context.setMatchCount(0);
					buffer.clear();
				}
				if(context.getI()==3){
					buffer.flip();
					context.setReceiver(buffer.getString(context.getMatchCount(),cd));
					context.setReceiver(context.getReceiver().substring(0,context.getReceiver().length()));
					context.setMatchCount(0);
					buffer.clear();
				}
				if(context.getI()==4){
					buffer.flip();
					context.setLength(buffer.getString(context.getMatchCount(),cd));
					context.setLength(context.getLength().substring(0, context.getLength().length()-1));
					context.setMatchCount(0);
				}
				context.setI(context.getI()+1);
			}else if(context.getI()==5){
				context.setMatchCount(context.getMatchCount()+1);
				if(context.getMatchCount() == Long.parseLong(context.getLength().split(":")[1])){
					buffer.flip();
					buffer.position(context.getLength().indexOf(":")+4);
					context.setSms(buffer.getString(context.getMatchCount(),cd));
					context.setI(context.getI()+1);
					session.removeAttribute(CONTEXT);
					break;
				}
			}else{
				context.setMatchCount(context.getMatchCount()+1);
			}
		}
		if(session.getAttribute(CONTEXT)!=null){
			return true;
		}
		SmsObject smsObject=new SmsObject();
		smsObject.setSender(context.getSender().split(":")[1]);
		smsObject.setReceiver(context.getReceiver().split(":")[1]);
		smsObject.setMessage(context.getSms());
		out.write(smsObject);
		return false;
	}

}
那么doDecode方法和CumulativeProtocolDecoder类之间到底是怎么配合的呢?在创建编解码过滤器ProtocolCodecFilter的时候我们创建了一个ProtocolCodecFactory工厂类,该类中包含了我们自定义的编解码器,在创建ProtocolCodecFilter的时候ProtocolCodecFactory是作为构造参数传进去的,在收到客户端的消息时,会依次调用所有过滤器的messageReceived()方法,当然也会调用编解码过滤器ProtocolCodecFilter的messageReceived()方法,在messageReceived()方法中,会调用ProtoCodecFactory的getDecoder()方法获得解码器,也就是我们上面的解码器CmccSipcDecoder,拿到解码器后会调用该解码器的decode()方法,我们注意到在CmccSipcDecoder中并没有decode方法,decode是CmccSipcDecoder的父类中的一个方法,让我们看看这个方法是如何实现的

    public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
        if (!session.getTransportMetadata().hasFragmentation()) {
            while (in.hasRemaining()) {
                if (!doDecode(session, in, out)) {
                    break;
                }
            }

            return;
        }
一旦IoBuffer中有数据decode会不断的调用子类(也就是我们自定义的解码器)的doDecode方法,如果doDecode()方法返回true会不断的进行调用,知道IoBuffer中没有数据时或当子类doDecode()方法返回false时会跳出while循环,所以我们在实现doDecode方法时,如果我们判断已经解析了一条完整的数据但是在buffer中还有多余的数据就要返回true这样就可以继续解析下一条数据,如果我们判断接收的数据包不是一个完整的包就要返回false,这样在下次数据进来时会继续解析直到接收一个完整的包为止

第二点:在解码的时候,假设一条完整数据的大小是100K,但是我们的IoBuffer的大小为10K,这个时候需要执行4次(IoBuffer会以2被的速度自动扩容)doDecode才能完整的接收一条数据,但是这样存在一个问题第1,2,3次接收的数据要存放到什么地方呢,存到方法里肯定是不行的,因为每次调用方法时都会新创建一个方法栈,用完即被销毁。最好的办法就是讲数据存放到Session中,上面的例子程序演示了数据是如何存放到session中的。

搞清楚了这两个问题,编写编解码器就轻而易举了。

下面的链接中有完整的例子程序

http://download.csdn.net/detail/u010031673/9499864



Mina编解码工厂

阅读数 16

为什么80%的码农都做不了架构师?>>> ...

博文 来自: weixin_34270606

博文 来自: Donald_Draper
没有更多推荐了,返回首页