mina 编解码_mina字符串编解码 - CSDN
精华内容
参与话题
  • mina编解码(摘录)

    2019-06-24 08:32:13
    一、Mina编解码的支持 我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);...

    一、Mina对编解码的支持

     

    我们知道网络通讯过程实际是对二进制数据进行处理的过程,二进制数据是计算机认识的数据。对于接收到的二进制数据我们需要将其转换成我们所熟悉的数据格式,此过程称为解码(decode);对于所要发送的数据,我们需要转换为计算机所能处理的二进制数据,此过程称为编码(encode)。



           Mina对数据的编解码提供了良好的支持,它提供了过滤器ProtocolCodecFilter支持编码和解码过程,可以查看包org.apache.mina.filter.codec下的代码。

    看下此过滤器的调用,代码很简单:

    Java代码  
    1. // 加入编解码过滤器  
    2. chain.addLast("codec", new ProtocolCodecFilter(new TextLineCodecFactory()));  

     实现原理

    ProtocolCodecFilter包含一个编码器和解码器工厂:

    Java代码  
    1. //编码器和解码器工厂  
    2. private final ProtocolCodecFactory factory;  

     此工厂可以通过构造方法传入,具体构造方法可以具体看源码,比较简单,此处不做详细介绍。

    主要看下解码和编码过程,解码应该是消息接收到,我们程序对消息进行处理时进行的,此时我们想到ProtocolCodecFilter应该覆盖messageReceived方法。编码应该是发送消息时,需要将我们的业务数据结构转换为二进制数据,此时我们想到ProtocolCodecFilter应该覆盖filterWrite方法。

    解码过程

    前面已经说了,解码过程就是将二进制数据转换为我们可以识别的数据结构,所以messageReceived方法一开始就有个判断:

    Java代码  
    1. //对于解码,消息类型必须是IoBuffer类型的,如果不是,转向下个filter  
    2. if (!(message instanceof IoBuffer)) {  
    3.     nextFilter.messageReceived(session, message);  
    4.     return;  
    5. }  

     解码的核心操作:

    Java代码  
    1. //处理消息,如果buffer中还有数据,就处理数据  
    2. while (in.hasRemaining()) {  
    3.     int oldPos = in.position();  
    4.         try {  
    5.         synchronized (decoderOut) {  
    6.             //进行解码操作。  
    7.             decoder.decode(session, in, decoderOut);  
    8.         }  
    9. …………  
    10. }  

     我们需要实现解码器ProtocolDecoder接口,主要实现解码方法decode。可以参考TextLineDecoder类的实现,下面代码是本人实际项目中的实现:

    Java代码  
    1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
    2.     final int packHeadLength = 2;  
    3.     // 先获取上次的处理上下文,其中可能有未处理完的数据  
    4.     Context ctx = getContext(session);  
    5.     // 先把当前buffer中的数据追加到Context的buffer当中  
    6.     ctx.append(in);  
    7.     // 把position指向0位置,把limit指向原来的position位置  
    8.     IoBuffer buf = ctx.getBuffer();  
    9.     buf.flip();  
    10.     // 当前剩余长度大于2  
    11.     while (buf.remaining() >= packHeadLength) {  
    12.         buf.mark();  
    13.         if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("eb")) {  
    14.             if (ByteConvertUtil.toHex(buf.get()).equalsIgnoreCase("93")) {  
    15.                 buf.reset();  
    16.                 if(buf.remaining()<11){  
    17.                     break;  
    18.                 }  
    19.                 byte[] dataArray = new byte[11];  
    20.                 buf.get(dataArray, 0, 11);  
    21.   
    22.                 if (SensorData.checkData(dataArray)) {  
    23.                     SensorData data = new SensorData(dataArray);  
    24.                     out.write(data);  
    25.                     // 回应客户端  
    26.                     byte[] b = new byte[2];  
    27.                     b[0] = ByteConvertUtil.uniteBytes("eb");  
    28.                     b[1] = ByteConvertUtil.uniteBytes("93");  
    29.                     session.write(IoBuffer.wrap(b));  
    30.                 }  
    31.             } else {  
    32.                 continue;  
    33.             }  
    34.         } else {  
    35.             continue;  
    36.         }  
    37.     }  
    38.     //断包处理,将剩余数据放入CONTEXT中  
    39.     if (buf.hasRemaining()) {  
    40.         IoBuffer temp = IoBuffer.allocate(maxPackLength).setAutoExpand(true);  
    41.         temp.put(buf);  
    42.         temp.flip();  
    43.         buf.clear();  
    44.         buf.put(temp);  
    45.     } else {  
    46.         buf.clear();  
    47.     }  
    48. }  

     顺便说下,我们最好要把我们的的数据包的格式提前定义好,了解了数据包的格式我们才能更好的进行数据的编解码。定义好数据包格式一方面方便编解码,另一方面可以解决下面要说的粘包和断包的问题。

    数据包的定义有很多种方式,这里说下我所用过的两种方式:

    1.固定消息长度,消息头+消息体+校验码。此方式相对简单,表示的内容也比较少。

    2.不定消息长度,消息头+消息长度+消息体。此方式可以无限消息长度,比较灵活。

    解码出一个消息体后,需要将数据通过ProtocolDecoderOutput的write方法写入到队列(queue)里面去:

    Java代码  
    1. public void write(Object message) {  
    2.     if (message == null) {  
    3.         throw new IllegalArgumentException("message");  
    4.     }  
    5.     //将消息写入队列  
    6.     messageQueue.add(message);  
    7. }  

     真正执行消息向下传递是通过flush方法:

    Java代码  
    1. public void flush(NextFilter nextFilter, IoSession session) {  
    2.     Queue<Object> messageQueue = getMessageQueue();  
    3.     // 取出队列里面的消息向下传递  
    4.     while (!messageQueue.isEmpty()) {  
    5.         nextFilter.messageReceived(session, messageQueue.poll());  
    6.     }  
    7. }  

     编码过程

    看了上面的解码过程,编码过程就不难理解了,编码过程只不过是解码过程的逆向过程,同样在filterWrite方法里有消息类型的判断:

    Java代码  
    1. //消息如果已经是IoBuffer,就不需要再进行编码  
    2. if ((message instanceof IoBuffer) || (message instanceof FileRegion)) {  
    3.     nextFilter.filterWrite(session, writeRequest);  
    4.     return;  
    5. }  

     编码:

    Java代码  
    1. // 进行数据编码  
    2. encoder.encode(session, message, encoderOut);  

     此处编码实现可以参考TextLineEncoder的编码实现,比较简单,此处就不多做解释了。

    同样编码也是通过write到一个队列中,然后通过flush写入到后面的过滤器中的。

     

    二、Mina对粘包和断包的处理

     

    上面说了mina对编解码的支持,在解码过程中,不得不面对的一个问题就是TCP的粘包和断包,先说下什么是粘包和断包。

    TCP通讯是面向数据流的通讯,我们将数据流理解为一支竹竿,数据包就相当于竹竿中的每一节,那么我们的解码过程就相当于对竹竿进行分解的过程。竹竿就是多个数据包的“粘包”,断包就是指竹节中间断开,我们需要将它拼接成为一个完整的竹节,如果不能拼接起来就要废弃这部分。

    粘包:

     

     

    断包:

     

     

     

     

    对粘包的处理相对比较简单,只需要依据数据包的格式进行数据流的分割即可;对于断包的处理我们需要将断包的数据保存起来,等待接收下次的数据进行拼接。

    通常情况下我们要考虑粘包和断包同时出现的情况下的解码代码编写。有两种实现方式:

    1.继承CumulativeProtocolDecoder类,实现doDecode方法。

    2.实现ProtocolDecoder接口,自己解决粘包和断包的问题。

    先看下CumulativeProtocolDecoder的实现。

    它有一个成员变量BUFFER:

    Java代码  
    1. //存放断包数据  
    2. private final AttributeKey BUFFER = new AttributeKey(getClass(), "buffer");  

     doDecode方法一方面判断数据包是否符合解码要求(数据包可能过短,数据包格式不合要求都可能不能通过解码要求),不符合刚返回false;另一方面对于符合解码要求的数据进行数据解码,并返回true。可以参考ImageRequestDecoder类的实现。

     

    看下它的decode方法实现:

    Java代码  
    1. public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {  
    2.     if (!session.getTransportMetadata().hasFragmentation()) {  
    3.         while (in.hasRemaining()) {  
    4.             // 判断是否符合解码要求,不符合则中断并返回  
    5.             if (!doDecode(session, in, out)) {  
    6.                 break;  
    7.             }  
    8.         }  
    9.         return;  
    10.     }  
    11.   
    12.     boolean usingSessionBuffer = true;  
    13.     // 取得上次断包数据  
    14.     IoBuffer buf = (IoBuffer) session.getAttribute(BUFFER);  
    15.     // If we have a session buffer, append data to that; otherwise  
    16.     // use the buffer read from the network directly.  
    17.     if (buf != null) { // 如果有断包数据  
    18.         boolean appended = false;  
    19.         // Make sure that the buffer is auto-expanded.  
    20.         if (buf.isAutoExpand()) {  
    21.             try {  
    22.                 // 将断包数据和当前传入的数据进行拼接  
    23.                 buf.put(in);  
    24.                 appended = true;  
    25.             } catch (IllegalStateException e) {  
    26.                 // A user called derivation method (e.g. slice()),  
    27.                 // which disables auto-expansion of the parent buffer.  
    28.             } catch (IndexOutOfBoundsException e) {  
    29.                 // A user disabled auto-expansion.  
    30.             }  
    31.         }  
    32.   
    33.         if (appended) {  
    34.             buf.flip();// 如果是拼接的数据,将buf置为读模式  
    35.         } else {  
    36.             // Reallocate the buffer if append operation failed due to  
    37.             // derivation or disabled auto-expansion.  
    38.             //如果buf不是可自动扩展的buffer,刚通过数据拷贝的方式将断包数据和当前数据进行拼接  
    39.             buf.flip();  
    40.             IoBuffer newBuf = IoBuffer.allocate(buf.remaining() + in.remaining()).setAutoExpand(true);  
    41.             newBuf.order(buf.order());  
    42.             newBuf.put(buf);  
    43.             newBuf.put(in);  
    44.             newBuf.flip();  
    45.             buf = newBuf;  
    46.   
    47.             // Update the session attribute.  
    48.             session.setAttribute(BUFFER, buf);  
    49.         }  
    50.     } else {  
    51.         buf = in;  
    52.         usingSessionBuffer = false;  
    53.     }  
    54.   
    55.     for (;;) {  
    56.         int oldPos = buf.position();  
    57.         boolean decoded = doDecode(session, buf, out);// 进行数据的解码操作  
    58.         if (decoded) {  
    59.             // 如果符合解码要求并进行了解码操作,  
    60.             // 则当前position和解码前的position不可能一样  
    61.             if (buf.position() == oldPos) {  
    62.                 throw new IllegalStateException("doDecode() can't return true when buffer is not consumed.");  
    63.             }  
    64.             // 如果已经没有数据,则退出循环  
    65.             if (!buf.hasRemaining()) {  
    66.                 break;  
    67.             }  
    68.         } else {// 如果不符合解码要求,则退出循环  
    69.             break;  
    70.         }  
    71.     }  
    72.     // if there is any data left that cannot be decoded, we store  
    73.     // it in a buffer in the session and next time this decoder is  
    74.     // invoked the session buffer gets appended to  
    75.     if (buf.hasRemaining()) {  
    76.         if (usingSessionBuffer && buf.isAutoExpand()) {  
    77.             buf.compact();  
    78.         } else {  
    79.             //如果还有没处理完的数据(一般为断包),刚将此数据存入session中,以便和下次数据进行拼接。
    80.             storeRemainingInSession(buf, session);  
    81.         }  
    82.     } else {  
    83.         if (usingSessionBuffer) {  
    84.             removeSessionBuffer(session);  
    85.         }  
    86.     }  
    87. }  

     上面的处理过程可以这样理解:

     

    1.取得断包数据,如果有断包数据,就和当前数据拼接。

    2.进行数据解码操作。

    3.将可以进行解码操作的数据解码完成后,如果还有数据,则将剩余数据存入session中,等待下次数据到来,从步骤1开始再次执行。

    通过继承ProtocolDecoder,实现decode方法,自己处理粘包和断包的方式其实和CumulativeProtocolDecoder类的实现原理是类似的,此处实现可以参考类TextLineDecoder,内部类Context保存了上下文信息,同样是保存在了sesion中的,具体实现方式大家可以仔细阅读代码。

     

     

    三、总结

     

     

    基于TCP的通讯协议才有可能产生粘包和断包的情况,粘包和断包的产生有多种原因,处理好粘包和断包的问题是网络编程必然面对的情况,对于这块的处理,大家如果有什么好的想法可以一起讨论。

    每天进步一点点,不做无为的码农。。。。。

    转载于:https://www.cnblogs.com/burgeen/p/3618052.html

    展开全文
  • TCP是一个流协议,所谓流协议就是没有界限的一串数据,数据之间并没有分界线,它会根据TCP缓冲区的实际大小进行包的划分,那么业务上的一个完整包可能会被TCP拆成多个包进行发送,也有可能把多个小的包封装成一个...

    TCP是一个流协议,所谓流协议就是没有界限的一串数据,数据之间并没有分界线,它会根据TCP缓冲区的实际大小进行包的划分,那么业务上的一个完整包可能会被TCP拆成多个包进行发送,也有可能把多个小的包封装成一个大包进行发送,这时候就可能发生拆包与粘包。对于接收端来说,由于数据并没有界限,接收端不知道到底什么时候数据算是读取完,也只能根据缓冲区的大小来进行包的划分,此时接收端也可能会发生拆包与粘包。
    针对TCP流协议的特性,有三种常用的方法可解决拆包与粘包的问题。
    1、因为包与包之间没有界限才会导致无法判断是否读取到了一个完整的包,因此可在发送端和接收端定义一个相同的分隔符,发送端在发送数据的时候,包与包之间用该分隔符隔开,这样接收端读取到对应的分隔符就认为读取到了一个完整的包,就可以继续下一步操作了。
    2、消息定长,比如每个报文的大小固定为200字节,如果不够空位补齐。
    3、在数据中的某个位置使用一个字段,表示数据的长度,类似Http协议中的Content-Length表示消息正文的长度。
    发送端使用以上三种方法中的一种来使原本没界限的数据变的有规律可循,这个过程就是我们常说的编码。对应的在接收端,根据发送端数据的规律解析数据的过程叫解码。
    mina底层使用的协议就是TCP,TCP传输过程中的数据都是二进制数据,服务端和客户端程序中使用的都是Java对象,那么必然有一个将Java对象变成二进制数据和将二进制数据解析成Java对象的过程,这个过程中很可能就会出现粘包与拆包的问题。那么为了解决TCP通信过程中产生的粘包、拆包问题,需要在发送方就要让Java对象按照某种规律变成二进制数据,然后接收方根据同样的规律将二进制数据解析成Java对象,这两个过程分别对应编码和解码过程。
    为了实现在发送数据之前对数据按照某种规律进行编码,我们需要实现自己的编码器。在mina中实现编码器是一件简单且愉快的事情,我们只需实现ProtocolEncoder接口,但是有更简便的方法,直接继承ProtocolEncoderAdapter类,实现encode方法即可。
    同理,为了对接收到的数据按照约定的规律进行解码,我们需要实现自己的解码器。在mina中实现解码器只需实现ProtocolDecoder接口,更简洁的方法就是继承ProtocolDecoderAdapter类,实现decode方法即可。

    展开全文
  • (五)Mina源码解析之编解码

    千次阅读 2016-06-13 21:22:28
    Mina中的编解码器通过过滤器ProtocolCodecFilter构造,这个过滤器有3个构造器,其中可以分为两类,一类需要一个ProtoCodecFactory,这个接口有两个方法,分别是getDecoder()和getEncoder(),我们需要实现这两个方法...

    Mina中的编解码器通过过滤器ProtocolCodecFilter构造,这个过滤器有3个构造器,其中可以分为两类,一类需要一个ProtoCodecFactory,这个接口有两个方法,分别是getDecoder()和getEncoder(),我们需要实现这两个方法,实现这两个方法就需要一个自定义的Decoder和一个自定义的Encoder

    public class CmccSipcCodecFactory implements ProtocolCodecFactory{
        private final ProtocolEncoder encoder;
        private final ProtocolDecoder decoder;
        
        public CmccSipcCodecFactory(Charset charset) {
            this.decoder = new CmccSipcDecoder(charset);
            this.encoder = new CmccSipcEncoder(charset);
        }
        @Override
        public ProtocolDecoder getDecoder(IoSession session) throws Exception {
            return decoder;
        }
    
        @Override
        public ProtocolEncoder getEncoder(IoSession session) throws Exception {
            return encoder;
        }
    }
    
    其中CmccSipcDecoder和CmccSipcEncoder是我们自定义的编辑码器,编辑码器需要跟据我们的具体业务来实现,下面是一个编辑码器的例子,发送端和接收端按照约定好的格式将数据组装起来,比如下面的例子中我们是以@作为数据之间的分割符,解码的时候我们也要根据@来进行解码

    public class CmccSipcEncoder extends ProtocolEncoderAdapter{
    	
    	private final Charset charset;
    
    	public CmccSipcEncoder(Charset charset) {
    		super();
    		this.charset = charset;
    	}
    	public void encode(IoSession session, Object message,
    			ProtocolEncoderOutput out) throws Exception {
    		SmsObject sms =(SmsObject) message;
    		CharsetEncoder ce=charset.newEncoder();
    		IoBuffer buffer =IoBuffer.allocate(100).setAutoExpand(true);
    		String statusLine="M sip:wap.fetion.com.cn SIP-C/2.0";
    		String sender = sms.getSender();
    		String receiver = sms.getReceiver();
    		String smsContent = sms.getMessage();
    		buffer.putString(statusLine+"@",ce);
    		buffer.putString("S:"+sender+'@',ce);
    		buffer.putString("R:"+receiver+"@", ce);
    		buffer.putString("L:"+(smsContent.getBytes(charset).length)+"@", ce);
    		buffer.putString(smsContent, ce);
    		buffer.flip();
    		out.write(buffer);
    	}
    }
    我们继承了ProtocolEncoderAdapter,这样只实现我们需要的方法就可以了,解码器也是一样,关于解码器主要有两点需要注意

    第一点:我们不知道数据发送过来的规模是多大,所以我们也不知道应该接收多少次才可以将数据接收完整,Mina提供了一个CumulativeProtocolDecoder类,这个类的作用是只要有数据发送过来,它就会读取数据,然后累积到内部的IoBuffer缓冲区,这样我们只要负责拆包就可以了。下面是一个解码器的例子

    public class CmccSipcDecoder extends CumulativeProtocolDecoder{
    	
    	private final Charset charset;
    	
    	private AttributeKey CONTEXT = new AttributeKey(this.getClass(), "context");
    	
    	private class Context{
    		int i=1;
    		int matchCount=0;
    		String statusLine = "";
    		String sender ="";
    		String receiver = "";
    		String length ="";
    		String sms = "";
    		IoBuffer buffer = IoBuffer.allocate(100).setAutoExpand(true);
    		public int getI() {
    			return i;
    		}
    		public void setI(int i) {
    			this.i = i;
    		}
    		public int getMatchCount() {
    			return matchCount;
    		}
    		public void setMatchCount(int matchCount) {
    			this.matchCount = matchCount;
    		}
    		public IoBuffer getBuffer() {
    			return buffer;
    		}
    		public String getStatusLine() {
    			return statusLine;
    		}
    		public void setStatusLine(String statusLine) {
    			this.statusLine = statusLine;
    		}
    		public String getSender() {
    			return sender;
    		}
    		public void setSender(String sender) {
    			this.sender = sender;
    		}
    		public String getReceiver() {
    			return receiver;
    		}
    		public void setReceiver(String receiver) {
    			this.receiver = receiver;
    		}
    		public String getLength() {
    			return length;
    		}
    		public void setLength(String length) {
    			this.length = length;
    		}
    		public String getSms() {
    			return sms;
    		}
    		public void setSms(String sms) {
    			this.sms = sms;
    		}
    		
    		
    	}
    	public CmccSipcDecoder(Charset charset){
    		this.charset=charset;
    	}
    	@Override
    	protected boolean doDecode(IoSession session, IoBuffer in,
    			ProtocolDecoderOutput out) throws Exception {
    		System.err.println("********************************");
    		CharsetDecoder cd = charset.newDecoder();
    		if(session.getAttribute(CONTEXT)==null){
    			session.setAttribute(CONTEXT, new Context());
    		}
    		Context context = (Context)session.getAttribute(CONTEXT);
    		IoBuffer buffer = context.getBuffer();
    		while(in.hasRemaining()){
    			byte b = in.get();
    			buffer.put(b);
    			if(b==64 && context.getI()<5){
    				context.setMatchCount(context.getMatchCount()+1);
    				if(context.getI()==1){
    					buffer.flip();
    					context.setStatusLine(buffer.getString(context.getMatchCount(),cd));
    					context.setStatusLine(context.getStatusLine().substring(0, context.getStatusLine().length()-1));
    					context.setMatchCount(0);
    					buffer.clear();
    				}
    				if(context.getI()==2){
    					buffer.flip();
    					context.setSender(buffer.getString(context.getMatchCount(),cd));
    					context.setSender(context.getSender().substring(0,context.getSender().length()-1));
    					context.setMatchCount(0);
    					buffer.clear();
    				}
    				if(context.getI()==3){
    					buffer.flip();
    					context.setReceiver(buffer.getString(context.getMatchCount(),cd));
    					context.setReceiver(context.getReceiver().substring(0,context.getReceiver().length()));
    					context.setMatchCount(0);
    					buffer.clear();
    				}
    				if(context.getI()==4){
    					buffer.flip();
    					context.setLength(buffer.getString(context.getMatchCount(),cd));
    					context.setLength(context.getLength().substring(0, context.getLength().length()-1));
    					context.setMatchCount(0);
    				}
    				context.setI(context.getI()+1);
    			}else if(context.getI()==5){
    				context.setMatchCount(context.getMatchCount()+1);
    				if(context.getMatchCount() == Long.parseLong(context.getLength().split(":")[1])){
    					buffer.flip();
    					buffer.position(context.getLength().indexOf(":")+4);
    					context.setSms(buffer.getString(context.getMatchCount(),cd));
    					context.setI(context.getI()+1);
    					session.removeAttribute(CONTEXT);
    					break;
    				}
    			}else{
    				context.setMatchCount(context.getMatchCount()+1);
    			}
    		}
    		if(session.getAttribute(CONTEXT)!=null){
    			return true;
    		}
    		SmsObject smsObject=new SmsObject();
    		smsObject.setSender(context.getSender().split(":")[1]);
    		smsObject.setReceiver(context.getReceiver().split(":")[1]);
    		smsObject.setMessage(context.getSms());
    		out.write(smsObject);
    		return false;
    	}
    
    }
    那么doDecode方法和CumulativeProtocolDecoder类之间到底是怎么配合的呢?在创建编解码过滤器ProtocolCodecFilter的时候我们创建了一个ProtocolCodecFactory工厂类,该类中包含了我们自定义的编解码器,在创建ProtocolCodecFilter的时候ProtocolCodecFactory是作为构造参数传进去的,在收到客户端的消息时,会依次调用所有过滤器的messageReceived()方法,当然也会调用编解码过滤器ProtocolCodecFilter的messageReceived()方法,在messageReceived()方法中,会调用ProtoCodecFactory的getDecoder()方法获得解码器,也就是我们上面的解码器CmccSipcDecoder,拿到解码器后会调用该解码器的decode()方法,我们注意到在CmccSipcDecoder中并没有decode方法,decode是CmccSipcDecoder的父类中的一个方法,让我们看看这个方法是如何实现的

        public void decode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
            if (!session.getTransportMetadata().hasFragmentation()) {
                while (in.hasRemaining()) {
                    if (!doDecode(session, in, out)) {
                        break;
                    }
                }
    
                return;
            }
    一旦IoBuffer中有数据decode会不断的调用子类(也就是我们自定义的解码器)的doDecode方法,如果doDecode()方法返回true会不断的进行调用,知道IoBuffer中没有数据时或当子类doDecode()方法返回false时会跳出while循环,所以我们在实现doDecode方法时,如果我们判断已经解析了一条完整的数据但是在buffer中还有多余的数据就要返回true这样就可以继续解析下一条数据,如果我们判断接收的数据包不是一个完整的包就要返回false,这样在下次数据进来时会继续解析直到接收一个完整的包为止

    第二点:在解码的时候,假设一条完整数据的大小是100K,但是我们的IoBuffer的大小为10K,这个时候需要执行4次(IoBuffer会以2被的速度自动扩容)doDecode才能完整的接收一条数据,但是这样存在一个问题第1,2,3次接收的数据要存放到什么地方呢,存到方法里肯定是不行的,因为每次调用方法时都会新创建一个方法栈,用完即被销毁。最好的办法就是讲数据存放到Session中,上面的例子程序演示了数据是如何存放到session中的。

    搞清楚了这两个问题,编写编解码器就轻而易举了。

    下面的链接中有完整的例子程序

    http://download.csdn.net/detail/u010031673/9499864



    展开全文
  • Apache Mina自定义编解码案例

    万次阅读 2011-08-09 00:21:58
    Mina中已经自带的编解码类: TextLineCodecFactory:基于文本的,根据回车换行来断点传输数据 ProtocolCodecFactory:自定义协议的编解码数据传输 ObjectSerializationCodecFactory:对象序列化传输

    Mina中已经自带的编解码类:

    TextLineCodecFactory:基于文本的,根据回车换行来断点传输数据

    ProtocolCodecFactory:自定义协议的编解码数据传输

    ObjectSerializationCodecFactory:对象序列化传输

    DemuxingProtocolCodecFactory:复用传输


    自定义通信协议:

    FlightSearch 1.0 \n
    startcity:BJS \n
    endcity:PEK \n
    flightway:1 \n
    date:2011-08-10 \n


    Domain对象

    package domain;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class Flight {
    	public String startCity;
    	public String endCity;
    	public String flightway;
    	public String date;
    	public String fromDate;
    	public String subclass1;
    	public String flight1;
    	
    	/**
    	 * 返回出发城市
    	 * @return
    	 */
    	public String getStartCity() {
    		return startCity;
    	}
    	public void setStartCity(String startCity) {
    		this.startCity = startCity;
    	}
    	/**
    	 * 返回到达城市
    	 * @return
    	 */
    	public String getEndCity() {
    		return endCity;
    	}
    	public void setEndCity(String endCity) {
    		this.endCity = endCity;
    	}
    	/**
    	 * 返回行程类型
    	 * @return
    	 */
    	public String getFlightway() {
    		return flightway;
    	}
    	public void setFlightway(String flightway) {
    		this.flightway = flightway;
    	}
    	/**
    	 * 返回出发日期
    	 * @return
    	 */
    	public String getDate() {
    		return date;
    	}
    	public void setDate(String date) {
    		this.date = date;
    	}
    	@Override
    	public String toString() {
    		return "Flight [startCity=" + startCity + ", endCity=" + endCity + ", flightway=" + flightway + ", date="
    				+ date + "]";
    	}
    	/**
    	 * 返回往返日期
    	 * @return
    	 */
    	public String getFromDate() {
    		return fromDate;
    	}
    	public void setFromDate(String fromDate) {
    		this.fromDate = fromDate;
    	}
    	public String getFlight1() {
    		return flight1;
    	}
    	public void setFlight1(String flight1) {
    		this.flight1 = flight1;
    	}
    	public String getSubclass1() {
    		return subclass1;
    	}
    	public void setSubclass1(String subclass1) {
    		this.subclass1 = subclass1;
    	}
    }
    

    服务器端编码

    package server;
    
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetEncoder;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightEncoder extends ProtocolEncoderAdapter {
    	private final Charset charset = Charset.forName("UTF-8");
    	/* 
    	 * 服务器端编码无需处理,直接将接收到的数据向下传递
    	 */
    	@Override
    	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
    		IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
    		
    		CharsetEncoder ce = charset.newEncoder();
    		
    		buf.putString((String)message, ce);
    		
    		buf.flip();
    		
    		out.write(buf);
    	}
    
    }
    

    重点是服务器端解码

    package server;
    
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolDecoderAdapter;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    
    import domain.Flight;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightDecoder extends CumulativeProtocolDecoder {
    
    	@Override
    	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    		IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
    		CharsetDecoder cd = Charset.forName("UTF-8").newDecoder();
    		
    		int ColumnNumber = 0;
    		String status="",startCity="",endCity="",flightway="",date="";
    		
    		int TextLineNumber = 1;
    		
    		Flight flight = new Flight();
    
    		/**
    		 * FlightSearch 1.0 \n
    		 * startcity:BJS \n
    		 * endcity:PEK \n
    		 * flightway:1 \n
    		 * date:2011-08-10 \n
    		 */
    		while(in.hasRemaining()){
    			byte b = in.get();
    			buf.put(b);
    			if(b == 10 && TextLineNumber <= 5){
    				ColumnNumber++;
    				if(TextLineNumber == 1){
    					buf.flip();
    					status = buf.getString(ColumnNumber, cd);
    				}
    				
    				if(TextLineNumber == 2){
    					buf.flip();
    					startCity = buf.getString(ColumnNumber, cd).split(":")[1];
    					startCity = startCity.substring(0, startCity.length()-1);
    					flight.setStartCity(startCity);
    				}
    				
    				if(TextLineNumber == 3){
    					buf.flip();
    					endCity = buf.getString(ColumnNumber, cd).split(":")[1];
    					endCity = endCity.substring(0, endCity.length()-1);
    					flight.setEndCity(endCity);
    				}
    				
    				if(TextLineNumber == 4){
    					buf.flip();
    					flightway = buf.getString(ColumnNumber, cd).split(":")[1];
    					flightway = flightway.substring(0, flightway.length()-1);
    					flight.setFlightway(flightway);
    				}
    				
    				if(TextLineNumber == 5){
    					buf.flip();
    					date = buf.getString(ColumnNumber, cd).split(":")[1];
    					date = date.substring(0, date.length()-1);
    					flight.setDate(date);
    					break;
    				}
    				
    				ColumnNumber = 0;
    				buf.clear();
    				TextLineNumber++;
    			}else{
    				ColumnNumber++;
    			}
    		}
    		out.write(flight);
    		return false;
    	}
    
    }
    

    服务器端编解码工厂

    package server;
    
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFactory;
    import org.apache.mina.filter.codec.ProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolEncoder;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightCodecFactory implements ProtocolCodecFactory {
    	private final ProtocolEncoder encoder = new FlightEncoder();
    	private final ProtocolDecoder decoder = new FlightDecoder();
    
    	@Override
    	public ProtocolDecoder getDecoder(IoSession session) throws Exception {
    		return decoder;
    	}
    
    	@Override
    	public ProtocolEncoder getEncoder(IoSession session) throws Exception {
    		return encoder;
    	}
    
    }
    



    下面是客户端的编解码

    重点是编码,需要将数据组装成协议格式,发送给服务器

    package client;
    
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetEncoder;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolEncoderAdapter;
    import org.apache.mina.filter.codec.ProtocolEncoderOutput;
    
    import domain.Flight;
    
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightClientEncoder extends ProtocolEncoderAdapter {
    	private final Charset charset;
    	
    	public FlightClientEncoder(){
    		this.charset = Charset.forName("UTF-8");
    	}
    
    	@Override
    	public void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception {
    		Flight flight = (Flight)message;
    		
    		IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
    		
    		CharsetEncoder ce = charset.newEncoder();
    		
    		buf.putString("Flight Search 1.0" + '\n', ce);
    		
    		buf.putString("startcty:" + flight.getStartCity() + '\n', ce);
    		
    		buf.putString("endcity:" + flight.getEndCity() + '\n', ce);
    		
    		buf.putString("flightway:" + flight.getFlightway() + '\n', ce);
    		
    		buf.putString("date:" + flight.getDate() + '\n', ce);
    		
    		buf.flip();
    		
    		out.write(buf);
    	}
    
    }
    


    解码无需特殊处理,接收完数据直接向下传递

    package client;
    
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    
    import org.apache.mina.core.buffer.IoBuffer;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolDecoderOutput;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightClientDecoder extends CumulativeProtocolDecoder {
    
    	/* (non-Javadoc)
    	 * @see org.apache.mina.filter.codec.ProtocolDecoder#decode(org.apache.mina.core.session.IoSession, org.apache.mina.core.buffer.IoBuffer, org.apache.mina.filter.codec.ProtocolDecoderOutput)
    	 */
    	@Override
    	protected boolean doDecode(IoSession session, IoBuffer in, ProtocolDecoderOutput out) throws Exception {
    		CharsetDecoder cd = Charset.forName("UTF-8").newDecoder();
    		IoBuffer buf = IoBuffer.allocate(100).setAutoExpand(true);
    		
    		while(in.hasRemaining()){
    			buf.put(in.get());
    		}
    		buf.flip();
    		out.write(buf.getString(cd));
    		return false;
    	}
    
    }
    

    客户端编解码工厂

    package client;
    
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.filter.codec.ProtocolCodecFactory;
    import org.apache.mina.filter.codec.ProtocolDecoder;
    import org.apache.mina.filter.codec.ProtocolEncoder;
    
    /**
     * @function : 
     * @author   :jy
     * @company  :万里网
     * @date     :2011-8-7
     */
    public class FlightClientCodecFactory implements ProtocolCodecFactory {
    	private final ProtocolEncoder encoder = new FlightClientEncoder();
    	private final ProtocolDecoder decoder = new FlightClientDecoder();
    	
    	/* (non-Javadoc)
    	 * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getDecoder(org.apache.mina.core.session.IoSession)
    	 */
    	@Override
    	public ProtocolDecoder getDecoder(IoSession arg0) throws Exception {
    		return decoder;
    	}
    
    	/* (non-Javadoc)
    	 * @see org.apache.mina.filter.codec.ProtocolCodecFactory#getEncoder(org.apache.mina.core.session.IoSession)
    	 */
    	@Override
    	public ProtocolEncoder getEncoder(IoSession arg0) throws Exception {
    		return encoder;
    	}
    
    }
    


    展开全文
  • 一、编码器:将要发送的数据转化成byte[] 进行传输 ...import org.apache.mina.core.buffer.IoBuffer; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolEncoderAdap
  • 1、深入解析Apache Mina源码(1)——Mina的过滤器机制...4、深入解析Apache Mina源码(4)——Mina编解码以及对粘包和断包的处理   一、Mina对编解码的支持   我们知道网络通讯过程实际是对二进制数据进行处理的过程
  • 自定义Mina编解码

    千次阅读 2014-03-29 11:20:08
    协议编解码器是在使用Mina 的时候最需要关注的对象,因为网络传输的数据都是二进制数据(byte),而在程序中面向的是JAVA 对象,这就需要在发送数据时将JAVA 对象编码二进制数据,接收数据时将二进制数据解码为JAVA ...
  • Mina 可以帮助我们快速开发高性能、高扩展性的网络通信应用,Mina 提供了事件驱动、异步(Mina 的异步IO 默认使用的是JAVA NIO 作为底层支持)操作的编程模型。  断包、粘包的问题,是Mina基于TCP协议栈通信的问题...
  • MINA 中的 协议编解码

    千次阅读 2014-10-22 21:41:10
    前面说过,协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据...
  • Mina自定义编解码

    2019-12-02 16:27:16
    本文摘自《Mina用户指南》Chapter 9 - Codec Filter,但是用户指南中客户端没有完整版代码,在这里我简单的补上了。 实现内容: client端向server端发送请求(请求图片),server端向client端传输请求的图片。 第一...
  • MINA 编解码器实例

    2017-05-21 12:27:33
  • Mina实现传输对象的编解码

    千次阅读 2013-04-25 00:59:15
    前言:协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将...
  • mina自定义编解码

    2017-07-24 09:48:47
    Mina自定义编解码 协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制...
  • Mina编解码工厂

    2019-06-13 06:18:21
    为什么80%的码农都做不了架构师?>>> ...
  • 本章对 MINA 的协议编解码过滤器进行了深入讨论,先后介绍了协议编解码过滤器的使用原因和如何使用。最后用一个 demo 演示了协议编解码过滤器的具体使用。
  • 本文解释一下为什么以及如何使用一个 ProtocolCodecFilter。  为什么要使用一个 ProtocolCodecFilter?...TCP 担保以正确的顺序交付所有数据包。但是没有担保对于在发送端写操作时影响到接收端的读事件。...
  • Mina自定义编解码

    2019-08-02 12:18:56
    协议编解码器是在使用Mina 的时候你最需要关注的对象,因为在网络传输的数据都是二进制数据(byte),而你在程序中面向的是JAVA 对象,这就需要你实现在发送数据时将JAVA 对象编码二进制数据,而接收数据时将二进制...
  • 我们看了协议编解码过滤器,其中涉及到协议编解码工厂、协议编码器、和协议解码器, 由于篇幅原因,我们只看了协议编解码工厂、协议编码器,先来回顾一下: 协议编解码过滤器ProtocolCodecFilter关联一个协议...
  • package lhy.charest; import java.nio.ByteOrder; import java.nio.charset.Charset; import lhy.client_domain.MsgPack;...import org.apache.mina.core.buffer.IoBuffer; import org.apache.m
  • Mina快速编码测试验证示例

    千次阅读 2016-06-01 12:12:51
    此文借鉴于Apache Mina 官网,Apache Mina快速入门指南(Quick start guide) NIO服务端接收器 package com.boonya.mina.quickstart; import java.io.IOException; import java.net.InetSocketAddress; import java....
1 2 3 4 5 ... 20
收藏数 1,996
精华内容 798
关键字:

mina 编解码