精华内容
下载资源
问答
  • 基于netty即时通信im-任性聊

    千次阅读 2018-11-27 14:07:06
    基于springboot+netty+redis+mysql的移动端im即时通信聊天,样式参考layerIm, 源码地址:源码地址

    基于springboot+netty+redis+mysql的移动端im即时通信聊天,样式参考layerIm,
    在这里插入图片描述

    源码地址:源码地址
    演示地址:演示地址

    展开全文
  • Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端...

    Netty实战 IM即时通讯系统(八)服务端和客户端通信协议编解码

    零、 目录

    1. IM系统简介
    • Netty 简介
    • Netty 环境配置
    • 服务端启动流程
    • 客户端启动流程
    • 实战: 客户端和服务端双向通信
    • 数据传输载体ByteBuf介绍
    • 客户端与服务端通信协议编解码
    • 实现客户端登录
    • 实现客户端与服务端收发消息
    • pipeline与channelHandler
    • 构建客户端与服务端pipeline
    • 拆包粘包理论与解决方案
    • channelHandler的生命周期
    • 使用channelHandler的热插拔实现客户端身份校验
    • 客户端互聊原理与实现
    • 群聊的发起与通知
    • 群聊的成员管理(加入与退出,获取成员列表)
    • 群聊消息的收发及Netty性能优化
    • 心跳与空闲检测
    • 总结
    • 扩展

    八、 服务端和客户端通信协议编解码

    1. 上一小节我们学习了ByteBuf 的API , 这一小节我们拉学习如何设计并实现客户端与服务端的通信协议
    2. 什么是服务端与客户端的通信协议?
      1. 无论是Netty 还是原始的Socket编程 , 基于TCP通信的数据包格式均为二进制 , 协议指的就是客户端和服务端事先商量好的 , 每一个二进制数据包中每一段字节分别表示什么含义的规则 , 如下图的一个简单的登录指令
        1. 这个数据包中 , 第一个字节 为1 表示这是一个登录指令 , 接下来是用户名和密码 , 这两个值以\0 分割 , 客户端发送这段二进制数据包到服务端 , 服务端就能根据这个协议取出来用户名密码 , 进行登录逻辑 , 实际的通信协议设计中 , 我们会考虑更多细节 , 比这个稍微复杂一点 。
      2. 那么协议设计好之后 , 客户端和服务端之间的通信过程又是怎样的呢?
        1. 如上图所示 , 客户端和服务端通信:
          1. 首先 , 客户端把一个java对象按照通信协议转换成二进制数据包
          2. 然后通过网络 , 把这段二进制数据包发送到服务端 , 数据的传输过程由TCP/IP协议负责数据的传输 , 和我们应用层无关
          3. 服务端接收到数据之后 , 按照协议取出二进制数据包中的相应的字段 , 包装成java对象 , 交给应用逻辑处理
          4. 服务端处理完之后, 如果需要突出相应给客户端 , 那么按照相同的逻辑进行。
      3. 在本系列的第一小节中我们一进列出了实现一个支持单聊和群聊的IM指令集合 , 我们设计协议的目的就是为了客户端与服务端能够识别出这些具体的指令。
    3. 通信协议的设计
        1. 首先第一个数是 魔数 , 通常情况下为固定的几个字节 (我们这里规定4个字节) , 为什么需要这个字段 , 而且还是一个固定的数? 假设我们在服务器上开了一个端口 , 比如 80 端口 , 如果没有这个魔数 , 任何数据包传递到服务器 , 服务器都会根据自定义的协议进行处理 , 包括不符合自定义协议规范的数据包 。 例如: 我们直接通过http://IP:port来访问服务器 , 服务器收到的是一个标准的Http协议数据包 , 但是他仍然会按照事先约定好的协议来处理HTTP协议 , 显然这时会解析出错的 , 而有了这个魔数之后 , 服务器首先取出前四个字节进行比对 , 能够在第一时间识别出这个数据包并非是遵循自定义协议的 , 也就是说无效的数据包 , 为了安全考虑 , 可以直接关闭连接以节省资源 。 在java的二进制文件中 , 开头的4个字节为0xcafebabe 用来表示这是一个字节码文件 , 也是异曲同工之妙 。
        2. 接下来一个字节是版本号 , 通常情况下是预留字段 , 用于协议升级的时候用到 , 有点类似TCP/IP 协议中的一个字段表示是IPV4还是IPV6 , 大多数情况下 , 这个字段是用不到的 , 不过为了协议能够支持升级 , 我们留着 。
        3. 第三部分 ,序列化算法表示如何把java对象转换为二进制数据以及二进制数据转换会java对象 , 比如java自带的序列化 , json 、 hessian 等序列化方式。
        4. 第四部分 表示 指令 , 关于指令的介绍 , 我们在前面已经讨论过 , 服务端或者客户端每收到一种指令都会有相应的处理逻辑 , 这里我们用一个字节来表示 , 最高支持256中指令 , 对于我们的IM 系统来说 完全够用了
        5. 接下来 的字段为数据部分的长度 , 占四个字节
        6. 最后一个部分为数据部分 , 每一种指令对应的数据是不一样的 , 比如登录的时候需要用户名密码 , 收消息的时候需要用户标识和具体的消息内容
        7. 通常情况下 ,这样一套标准的协议能够适配大多数情况下的服务端与客户端的通信场景 , 接下来我们就来看一下 如何使用Netty 来实现这套协议
    4. 通信协议的实现
      1. 我们把java对象根据协议封装成二进制数据包的过程称为编码 , 而把从二进制数据包中解析出就java对象的过程称为解码 。 在学习如何使用Netty 进行通信协议编解码之前 , 我们先来定义一下客户端和服务端通信的java 对象 。

        1. java 对象

          /**
           *  数据包对象
           *  @author outman
           * */
          @Data
          abstract class Packet{
          	/**
          	 * 协议版本
          	 * */
          	private Byte version = 1;
          	
          	/**
          	 * 获取指令
          	 * */
          	public abstract Byte getCommand();
          	
          	/**
          	 * 指令集合内部接口
          	 * */
          	interface Command{
          		public static final Byte LOGIN_REQUEST = 1;
          	}
          }
          
          1. 以上是通信过程中 java对象的抽象类 , 可以看到我们定义了一个版本号(默认值为1) 以及一个获取指令的抽象方法 , 所有的指令数据包都必须实现这个方法 , 这样我们就可以知道某种指令的含义

          2. @Data 注解由lombok 提供 , 他会自动帮我们产生getter/setter 方法 , 减少大量的重复代码 , 需要添加依赖

             <dependency>
             	<groupId>org.projectlombok</groupId>
             	<artifactId>lombok</artifactId>
             	<version>1.16.18</version>
             	<scope>provided</scope>
             </dependency>
            
          3. 接下来 , 我们一客户登录请求为例 , 定义登录请求数据包 :

             @Data
             class LoginRequestPacket extends Packet{
             	
             	private Integer uerId ;
             	
             	private String userName;
             	
             	private String password;
             	
             	@Override
             	public Byte getCommand() {
             		return Command.LOGIN_REQUEST;
             	}
             	
             }
            
            1. 登录请求数据包 继承自Packet 然后定义了三个字段 , 分别是用户ID , 用户名 、密码 , 这里最为重要的是覆盖了父类的getCommand() 方法 值为常量Command.LOGIN_REQUEST
          4. java对象定义完成之后 , 接下来我们就要定义一种规则 , 如何把一个java对象转换成二进制数据 , 这个规则叫做java对象的序列化

        2. 序列化

          1. 我们如下定义序列化接口

             /**
              * 序列化接口
              * @author outman
              * */
             interface Serializer{
             	/**
             	 * 序列化算法
             	 * */
             	byte getSerializerAlgorithm();
             	
             	/**
             	 * java 对象转换成二进制   (序列化) 
             	 * */
             	byte[] serialize(Object obj);
             	
             	/**
             	 * 二进制转换为java对象 (反序列化)
             	 * */
             	<T> T deSerialize(Class<T> clazz , byte[] bytes);
             	
             	/**
             	 * 序列化算法标识集合接口
             	 * */
             	interface SerializerAlgorithm{
             		public static final byte JSON = 1;
             	}
             }
            
            1. 序列化接口有三个方法: getSerializerAlgorithm() 获取具体的序列化算法标识 , serialize() 将java对象转换为字节数组 , deSerialize()将字节数组转转为对应类型的java对象 , 在本小节中 , 我们使用最简单的json序列化方式 , 使用阿里巴巴的fastJson作为序列化框架; 接口中还有一个内部接口 , 用于让我们定义序列化算法标识的集合
              1. fastjson 依赖

                 <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
                 <dependency>
                 	<groupId>com.alibaba</groupId>
                 	<artifactId>fastjson</artifactId>
                 	<version>1.2.54</version>
                 </dependency>
                
              2. Json序列化实现类

                 /**
                  * JSON 序列化实现类
                  * @author outman
                  * */
                 class JSONSerializer implements Serializer{
                 
                 	@Override
                 	public byte getSerializerAlgorithm() {
                 		return SerializerAlgorithm.JSON;
                 	}
                 
                 	@Override
                 	public byte[] serialize(Object obj) {
                 		return JSONObject.toJSONBytes(obj);
                 	}
                 
                 	@Override
                 	public <T> T deSerialize(Class<T> clazz, byte[] bytes) {
                 		return JSONObject.parseObject(bytes, clazz);
                 	}
                 	
                 }
                
              3. 我们设置 Srializer 的默认序列化方式为JSONSerializer

                 /**
                  * 序列化接口
                  * @author outman
                  * */
                 interface Serializer{
                 	
                 	/**
                 	 * 默认的序列化对象
                 	 * */
                 	Serializer DEFAULT = new JSONSerializer();
                 	
                 	/**
                 	 * 序列化算法
                 	 * */
                 	byte getSerializerAlgorithm();
                 	
                 	/**
                 	 * java 对象转换成二进制   (序列化) 
                 	 * */
                 	byte[] serialize(Object obj);
                 	
                 	/**
                 	 * 二进制转换为java对象 (反序列化)
                 	 * */
                 	<T> T deSerialize(Class<T> clazz , byte[] bytes);
                 	
                 	/**
                 	 * 序列化算法标识集合接口
                 	 * */
                 	interface SerializerAlgorithm{
                 		public static final byte JSON = 1;
                 	}
                 }
                
          2. 这样我们就是实现了序列化相关的逻辑 , 如果想要实现其他的序列化算法的话 , 只需要实现一下Serializer接口 , 然后定义一下 序列化算法标识 就好啦。

        3. 编码: 封装成二进制数据的过程

           /**
            * 数据包编解码类
            * @author outman
            * */
           class PacketCodec{
           	// 魔数
           	private static final int MAGIC_NUMBER = 0x12345678;
           	
           	public ByteBuf enCode(Packet packet) {
           		// 1. 创建ByteBuf 对象
           		ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
           		// 2. 序列化java对象
           		byte[] bs = Serializer.DEFAULT.serialize(packet);
           		// 3. 实际编码过程
           		byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数
           		byteBuf.writeInt(packet.getVersion());  // 写入协议版本号
           		byteBuf.writeInt(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法
           		byteBuf.writeByte(packet.getCommand()); // 写入指令
           		byteBuf.writeInt(bs.length); // 写入数据长度
           		byteBuf.writeBytes(bs); // 写入数据
           		return byteBuf;
           	}
           }
          
          1. 编码过程分为三个过程:
            1. 首先我们创建一个ByteBuf , 这里我们调用Netty 的ByteBuf分配器来创建 , ioBuffrer() 方法会返回适配io读写相关的内存 , 他尽可能创建一个直接内存 ,直接内存可以理解为不受jvm 对内存法管理 , 写到Io 缓冲区的效果更高
            2. 接下来我们把java 对象序列化成二进制数据包
            3. 最后我们对照本小节开头的协议的设计以及上一小节ByeBuf 的API , 逐个往bytebuf 中写入字段 , 及实现了编码过程
          2. 一端实现了编码 。 Netty 会将次ByteBuf 写到另外一端 , 另外一端拿到的也是一个ByteBuf 对象, 基于这个ByteBuf 对象 , 就可以反解出对端创建的java对象 , 这个过程我们称之为解码
        4. 解码: 解析java对象的过程

          1. 还是刚刚的PacketCodec.java 中添加deCode(buf) 、 getRequestPacket(byte command) 、 getSerializer(byte serializeAlgorithm)方法

             /**
              * 数据包编解码类
              * 
              * @author outman
              */
             class PacketCodec {
             	// 魔数
             	private static final int MAGIC_NUMBER = 0x12345678;
             	// 指令 与 数据包 映射
             	private final Map<Byte, Class<? super Packet>> packetTypeMap;
             	// 序列化算法 与 序列化类 映射
             	private final Map<Byte, Class<? super Serializer>> serializerMap;
             	// 单例
             	public static final PacketCodec INSTANCE = new PacketCodec();
             	// 注册Packet集合
             	List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });
             	// 注册序列化算法集合
             	List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });
             
             	/**
             	 * 初始化 指令 与 数据包 映射 序列化算法 与 序列化类 映射
             	 */
             	private PacketCodec() {
             		// 初始化 指令 与 数据包 映射
             		packetTypeMap = new HashMap<Byte, Class<? super Packet>>();
             		packetList.forEach(clazz -> {
             			try {
             
             				Method method = clazz.getMethod("getCommand");
             				Byte command = (Byte) method.invoke(clazz);
             				packetTypeMap.put(command, clazz);
             
             			} catch (Exception e) {
             				// TODO Auto-generated catch block
             				e.printStackTrace();
             			}
             		});
             
             		// 初始化序列化算法 与 序列化类 映射
             		serializerMap = new HashMap<Byte, Class<? super Serializer>>();
             		serializerList.forEach(clazz -> {
             			try {
             
             				Method method = clazz.getMethod("getSerializerAlgorithm");
             				Byte serializerAlgorithm = (Byte) method.invoke(clazz);
             				serializerMap.put(serializerAlgorithm, clazz);
             			} catch (Exception e) {
             				// TODO Auto-generated catch block
             				e.printStackTrace();
             			}
             		});
             	}
             
             	// 编码
             	public ByteBuf enCode(Packet packet) {
             		// 1. 创建ByteBuf 对象
             		ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
             		// 2. 序列化java对象
             		byte[] bs = Serializer.DEFAULT.serialize(packet);
             		// 3. 实际编码过程
             		byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数
             		byteBuf.writeByte(packet.getVersion()); // 写入协议版本号
             		byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法
             		byteBuf.writeByte(packet.getCommand()); // 写入指令
             		byteBuf.writeInt(bs.length); // 写入数据长度
             		byteBuf.writeBytes(bs); // 写入数据
             		return byteBuf;
             	}
             
             	// 解码
             	public Packet deCode(ByteBuf byteBuf) throws Exception {
             		// 跳过 魔数 校验
             		byteBuf.skipBytes(4);
             
             		// 跳过版本号
             		byteBuf.skipBytes(1);
             
             		// 序列化算法标识
             		byte serializeAlgorithm = byteBuf.readByte();
             
             		// 指令标识
             		byte command = byteBuf.readByte();
             
             		// 数据包长度
             		int length = byteBuf.readInt();
             
             		// 数据
             		byte[] bs = new byte[length];
             		byteBuf.readBytes(bs);
             
             		// 通过序列化算法标识获取对应的 序列化对象
             		Serializer serializer = getSerializer(serializeAlgorithm);
             
             		// 通过指令标识 获取对应的 数据包类
             		Packet packet = getRequestPacket(command);
             
             		// 执行解码
             		if (serializer != null && packet != null) {
             			return serializer.deSerialize(packet.getClass(), bs);
             		} else {
             			System.out.println("没有找到对应的序列化对象或数据包对象");
             			return null;
             		}
             
             	}
             
             	// 通过指令获取对应的 数据包类  
             	private Packet getRequestPacket(byte command) throws Exception {
             
             		return (Packet) packetTypeMap.get(command).newInstance();
             	}
             
             	// 通过序列化算法标识 获取对应的序列化类
             	private Serializer getSerializer(byte serializeAlgorithm) throws Exception {
             
             		return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();
             	}
             
             }
            
            1. 解码的流程如下
              1. 我们假定deCode方法传递进来的ByteBuf已经合法(后面的小节我们会实现校验) 即首部4个字节是我们前面定义的魔数 , 这里我们跳过
              2. 这里我们暂时不关注协议版本 , 通常我们没有遇到协议升级的时候 , 这个字段暂不处理 , 因为你会发现 , 在大多数情况下 , 这个字段几乎用不着 , 但是我们仍然保留
              3. 接下来我们拿到 序列化算法标识 、 指令标识 、 数据长度 、 数据
              4. 最后我们根据拿到的数据长度取出数据 , 通过指令标识拿到对应的java对象 , 根据序列化算法标识 拿到序列化对象 , 将字节码转换为java对象
      2. 完整代码

        package com.tj.NIO_test_maven;
        
        import java.lang.reflect.Method;
        
        import java.util.Arrays;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
        
        import com.alibaba.fastjson.JSONObject;
        
        import io.netty.buffer.ByteBuf;
        import io.netty.buffer.ByteBufAllocator;
        import lombok.Data;
        
        /**
         * @author outman
         */
        public class Test_09_客戶端与服务端通信协议编解码 {
        	public static void main(String[] args) {
        
        	}
        
        }
        
        /**
         * 数据包对象
         * 
         * @author outman
         */
        @Data
        abstract class Packet {
        	/**
        	 * 协议版本
        	 */
        	private Byte version = 1;
        
        	/**
        	 * 获取指令
        	 */
        	public abstract Byte getCommand();
        
        	/**
        	 * 指令集合内部接口
        	 */
        	interface Command {
        		public static final Byte LOGIN_REQUEST = 1;
        	}
        }
        
        /**
         * 登录请求数据包
         * 
         * @author outman
         */
        @Data
        class LoginRequestPacket extends Packet {
        
        	private Integer uerId;
        
        	private String userName;
        
        	private String password;
        
        	@Override
        	public Byte getCommand() {
        		return Command.LOGIN_REQUEST;
        	}
        
        }
        
        /**
         * 序列化接口
         * 
         * @author outman
         */
        interface Serializer {
        
        	/**
        	 * 默认的序列化对对象
        	 */
        	Serializer DEFAULT = new JSONSerializer();
        
        	/**
        	 * 序列化算法
        	 */
        	byte getSerializerAlgorithm();
        
        	/**
        	 * java 对象转换成二进制 (序列化)
        	 */
        	byte[] serialize(Object obj);
        
        	/**
        	 * 二进制转换为java对象 (反序列化)
        	 */
        	<T> T deSerialize(Class<T> clazz, byte[] bytes);
        
        	/**
        	 * 序列化算法标识集合接口
        	 */
        	interface SerializerAlgorithm {
        		public static final byte JSON = 1;
        	}
        }
        
        /**
         * JSON 序列化实现类
         * 
         * @author outman
         */
        class JSONSerializer implements Serializer {
        
        	@Override
        	public byte getSerializerAlgorithm() {
        		return SerializerAlgorithm.JSON;
        	}
        
        	@Override
        	public byte[] serialize(Object obj) {
        		return JSONObject.toJSONBytes(obj);
        	}
        
        	@Override
        	public <T> T deSerialize(Class<T> clazz, byte[] bytes) {
        		return JSONObject.parseObject(bytes, clazz);
        	}
        
        }
        
        /**
         * 数据包编解码类
         * 
         * @author outman
         */
        class PacketCodec {
        	// 魔数
        	private static final int MAGIC_NUMBER = 0x12345678;
        	// 指令 与 数据包 映射
        	private final Map<Byte, Class<? super Packet>> packetTypeMap;
        	// 序列化算法 与 序列化类 映射
        	private final Map<Byte, Class<? super Serializer>> serializerMap;
        	// 单例
        	public static final PacketCodec INSTANCE = new PacketCodec();
        	// 注册Packet集合
        	List<Class> packetList = Arrays.asList(new Class[] { LoginRequestPacket.class });
        	// 注册序列化算法集合
        	List<Class> serializerList = Arrays.asList(new Class[] { JSONSerializer.class });
        
        	/**
        	 * 初始化 指令 与 数据包 映射 序列化算法 与 序列化类 映射
        	 */
        	private PacketCodec() {
        		// 初始化 指令 与 数据包 映射
        		packetTypeMap = new HashMap<Byte, Class<? super Packet>>();
        		packetList.forEach(clazz -> {
        			try {
        
        				Method method = clazz.getMethod("getCommand");
        				Byte command = (Byte) method.invoke(clazz);
        				packetTypeMap.put(command, clazz);
        
        			} catch (Exception e) {
        				// TODO Auto-generated catch block
        				e.printStackTrace();
        			}
        		});
        
        		// 初始化序列化算法 与 序列化类 映射
        		serializerMap = new HashMap<Byte, Class<? super Serializer>>();
        		serializerList.forEach(clazz -> {
        			try {
        
        				Method method = clazz.getMethod("getSerializerAlgorithm");
        				Byte serializerAlgorithm = (Byte) method.invoke(clazz);
        				serializerMap.put(serializerAlgorithm, clazz);
        			} catch (Exception e) {
        				// TODO Auto-generated catch block
        				e.printStackTrace();
        			}
        		});
        	}
        
        	// 编码
        	public ByteBuf enCode(Packet packet) {
        		// 1. 创建ByteBuf 对象
        		ByteBuf byteBuf = ByteBufAllocator.DEFAULT.ioBuffer();
        		// 2. 序列化java对象
        		byte[] bs = Serializer.DEFAULT.serialize(packet);
        		// 3. 实际编码过程
        		byteBuf.writeInt(MAGIC_NUMBER); // 写入魔数
        		byteBuf.writeByte(packet.getVersion()); // 写入协议版本号
        		byteBuf.writeByte(Serializer.DEFAULT.getSerializerAlgorithm()); // 写入序列化算法
        		byteBuf.writeByte(packet.getCommand()); // 写入指令
        		byteBuf.writeInt(bs.length); // 写入数据长度
        		byteBuf.writeBytes(bs); // 写入数据
        		return byteBuf;
        	}
        
        	// 解码
        	public Packet deCode(ByteBuf byteBuf) throws Exception {
        		// 跳过 魔数 校验
        		byteBuf.skipBytes(4);
        
        		// 跳过版本号
        		byteBuf.skipBytes(1);
        
        		// 序列化算法标识
        		byte serializeAlgorithm = byteBuf.readByte();
        
        		// 指令标识
        		byte command = byteBuf.readByte();
        
        		// 数据包长度
        		int length = byteBuf.readInt();
        
        		// 数据
        		byte[] bs = new byte[length];
        		byteBuf.readBytes(bs);
        
        		// 通过序列化算法标识获取对应的 序列化对象
        		Serializer serializer = getSerializer(serializeAlgorithm);
        
        		// 通过指令标识 获取对应的 数据包类
        		Packet packet = getRequestPacket(command);
        
        		// 执行解码
        		if (serializer != null && packet != null) {
        			return serializer.deSerialize(packet.getClass(), bs);
        		} else {
        			System.out.println("没有找到对应的序列化对象或数据包对象");
        			return null;
        		}
        
        	}
        
        	// 通过指令获取对应的 数据包类  
        	private Packet getRequestPacket(byte command) throws Exception {
        
        		return (Packet) packetTypeMap.get(command).newInstance();
        	}
        
        	// 通过序列化算法标识 获取对应的序列化类
        	private Serializer getSerializer(byte serializeAlgorithm) throws Exception {
        
        		return (Serializer) serializerMap.get(serializeAlgorithm).newInstance();
        	}
        
        }
        
    5. 总结:
      1. 通信协议书为了服务端和客户端交互 , 双方协商出来的满足一定规则的二进制数据格式
      2. 介绍了一种通用的通信协议的设计 , 包括魔数 、 版本号 、 序列化算法标识 、 指令标识 、数据长度 、 数据几个字段 , 该协议能够满足大多数通信的场景
      3. java对象以及序列化 , 目的就是实现java对象与二进制数据的互换
      4. 最后我们依照我们设计的协议以及ByteBuf 的API 实现了通信协议 , 这个过程成为编码过程
    6. 思考:
      1. 序列化和编码都是 把java对象封装成二进制数据的过程 , 这两者有什么区别?
        1. 序列化是把内容变成计算机可传输的资源,而编码则是让程序认识这份资源。
      2. 指令标识 、 序列化算法标识为什么不用枚举?
      3. 请问这节课自己设计的通信协议是属于应用层协议,和http协议是同一级别是吧?
        1. 对的,这样理解起来完全正确,只不过自定义协议属于私有协议,http属于共有协议
      4. 使用protobuf 生成的对象的二进制要小很多,使用protobuf 可以减小数据包的大小。一个数据包无法装下一个对象的这种情况怎么处理呢(就是一个ByteBuf 被很多个物理层数据包传输的情况)?
        1. 所以设计协议的时候,长度字段的长度需要考量,需要支持最大的数据包大小,这里是4个字节,最大值为 2147483647,已经完全足够了,然后一个物理层数据包如果塞不下,会被拆成多个数据包,另外一端接受的时候把这些数据包粘合起来,可参考13小结
    展开全文
  • ##Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信...

    ##

    Netty实战 IM即时通讯系统(六)实战: 客户端和服务端双向通信

    零、 目录

    1. IM系统简介
    • Netty 简介
    • Netty 环境配置
    • 服务端启动流程
    • 实战: 客户端和服务端双向通信
    • 数据传输载体ByteBuf介绍
    • 客户端与服务端通信协议编解码
    • 实现客户端登录
    • 实现客户端与服务端收发消息
    • pipeline与channelHandler
    • 构建客户端与服务端pipeline
    • 拆包粘包理论与解决方案
    • channelHandler的生命周期
    • 使用channelHandler的热插拔实现客户端身份校验
    • 客户端互聊原理与实现
    • 群聊的发起与通知
    • 群聊的成员管理(加入与退出,获取成员列表)
    • 群聊消息的收发及Netty性能优化
    • 心跳与空闲检测
    • 总结
    • 扩展

    ###六、 实战: 客户端和服务端双向通信

    1. 本节我们要实现的功能是客户端连接成功后,向服务端写出一段数据 , 服务端收到数据后打印 , 并向客户端回复一段数据 。

    2. 我们先做一个代码框架 , 然后在框架上面做修改

       public class Test_07_客户端和服务端双向通信 {
       	public static void main(String[] args) {
       		Test_07_Server.start(8000);
       		Test_07_Client.start("127.0.0.1" , 8000  ,5);
       	}
       }
       
       class Test_07_Client{
       	public static void start(String IP , int port ,int maxRetry){
       		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
       		
       		Bootstrap bootstrap = new Bootstrap();
       		
       		bootstrap.group(workerGroup)
       			.channel(NioSocketChannel.class)
       			.handler(new ChannelInitializer<NioSocketChannel>() {
       
       				@Override
       				protected void initChannel(NioSocketChannel ch) throws Exception {
       
       				}
       			});
       		connect(bootstrap , IP , port , maxRetry);
       	}
       	
       	private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry , int... retryIndex) {
       		bootstrap.connect(IP , port).addListener(future ->{
       			int[] finalRetryIndex;
       			if(future.isSuccess()) {
       				System.out.println("连接成功");
       			}else if(maxRetry ==0) {
       				System.out.println("达到最大重试此时,放弃重试");
       			}else {
       				// 初始化 重试计数
       				if(retryIndex.length == 0) {
       					finalRetryIndex = new int[]{0};
       				}else {
       					finalRetryIndex = retryIndex;
       				}
       				// 计算时间间隔
       				int delay = 1 << finalRetryIndex[0];
       				// 执行重试
       				System.out.println(new Date() +" 连接失败,剩余重试次数:"+ maxRetry + ","+delay+"秒后执行重试");
       				bootstrap.config().group().schedule(()->{
       					connect(bootstrap , IP, port , maxRetry -1 , finalRetryIndex[0]+1);
       				}, delay, TimeUnit.SECONDS);
       			}
       		});
       	}
       }
       
       
       
       class Test_07_Server{
       	public static void start(int port){
       		NioEventLoopGroup bossGroup = new NioEventLoopGroup();
       		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
       		
       		ServerBootstrap serverBootstrap = new ServerBootstrap();
       		
       		serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
       		.childHandler(new ChannelInitializer<NioSocketChannel>() {
      
       			@Override
       			protected void initChannel(NioSocketChannel ch) throws Exception {
       				ch.pipeline().addLast(new Test_07_ServerHandler());
       			}
       		});
       		bind(serverBootstrap, port);
       
       	}
       	private static void bind(ServerBootstrap serverBootstrap, int port) {
       		serverBootstrap
       			.bind(port)
       			.addListener(future -> {
       				if(future.isSuccess()) {
       					System.out.println("服务端:端口【"+port+"】绑定成功!");
       				}else {
       					System.out.println("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
       					bind(serverBootstrap, port+1);
       				}
       			});
       	}
      
       	
       }
      
    3. 客户端发送数据到服务端

      1. 在《客户端启动流程》这一小节 , 我们提到 客户端相关的数据读写逻辑是通过BootStrap的handler()方法指定

         bootstrap.group(workerGroup).channel(NioSocketChannel.class)
         	.handler(new ChannelInitializer<NioSocketChannel>() {
        
         		@Override
         		protected void initChannel(NioSocketChannel ch) throws Exception {
         			
         		}
         	});		
        
      2. 现在我们在initChannel()中给客户端添加一个逻辑处理器 , 这个处理器的作用就是负责向服务端写数据

         bootstrap.group(workerGroup).channel(NioSocketChannel.class)
         	.handler(new ChannelInitializer<NioSocketChannel>() {
        
         		@Override
         		protected void initChannel(NioSocketChannel ch) throws Exception {
         			// 添加业务处理逻辑  可以添加自定义的业务处理逻辑也可以添加    Netty自带的简单通用的处理逻辑
         			ch.pipeline().addLast(new Test_07_ClientHandler());
         		}
         	});
        
        1. ch.pipeline()方法返回的是和这条连接相关的逻辑处理链 , 采用了责任链处理模式 , 这里不理解没关系 , 后面会讲到。

        2. 然后再调用addLast()方法添加一个逻辑处理器 , 这个逻辑处理器为的就是在客户端建立连接成功之后向服务端写数据 , 下面是这个逻辑处理器的代码:

           class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
           
           	@Override
           	public void channelActive(ChannelHandlerContext ctx) throws Exception {
           		System.out.println(new Date() + " 客户端写出数据...");
           		
           		// 1. 获取数据
           		ByteBuf buffer = getByteBuf(ctx);
           		// 2. 写数据
           		ctx.channel().writeAndFlush(buffer);
           	}
           	
           	private ByteBuf getByteBuf(ChannelHandlerContext ctx) {
           		// 获取二进制抽象 ByteBuffer
           		ByteBuf buf = ctx.alloc().buffer();
           		
           		// 准备数据
           		byte[] bs = "你好,奥特曼!".getBytes(Charset.forName("UTF-8"));
           		
           		// 把数据填充到 buf
           		buf.writeBytes(bs);
           		return buf;
           	}
           
           }
          
          1. 这个逻辑处理器继承自ChannelInboundHandlerAdapter ,然后覆盖了channelActive()方法 , 这个方法会在客户端连接建立成功之后被调用
          2. 客户端连接建立成功之后 , 调用channelActive() , 在这个方法里面 , 我们编写向服务端写数据的逻辑
          3. 向服务端写数据分为两步 , 首先我们要获取一个netty对二进制数据抽象的二进制ByteBuf , 上面代码中ctx.alloc() 获取一个ByteBuf的内存管理器 , 这个内存管理器的作用就是分配一个ByteBuf , 然后我们把字符串的二进制数据填充到ByteBuf , 这样我们就获取到了Netty需要的一个数据格式, 最后我们调用ctx.channel().writeAndFlush()把数据写到服务端。
          4. 以上就是 向服务端写数据的逻辑 , 和传统的socket 编程不同的是 , Netty 里面的数据是以ByteBuf为单位的 , 所有需要写出的数据必须塞到一个ByteBuf里 , 需要读取的数据也是如此。
    4. 服务端读取客户端数据

      1. 服务端的数据处理逻辑 是通过ServerBootStrap 的childHandler()方法指定

         serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {
        
         	@Override
         	protected void initChannel(NioSocketChannel ch) throws Exception {
         		// TODO Auto-generated method stub
        
         	}
         })
        
      2. 现在 , 我们在initChannel() 中 给服务端添加一个逻辑处理器 , 这个处理器 的作用就是负责客户端读数据

         serverBootStrtap.childHandler(new ChannelInitializer<NioSocketChannel>() {
        
         	@Override
         	protected void initChannel(NioSocketChannel ch) throws Exception {
         		ch.pipeline().addLast(new Test_07_ServerHandler());
         	}
         })
        
        1. 这个方法里的逻辑和客户端类似 , 获取服务端关于这条连接的逻辑处理链pipeline , 然后添加一个逻辑处理器 , 负责读取客户端发来的数据

           class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
           
           	@Override
           	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           		ByteBuf buf = (ByteBuf) msg;
           		System.out.println(new Date() + ": 服务端读到数据->"+ buf.toString(Charset.forName("UTF-8")));
           	}
           }
          
          1. 服务端的逻辑处理器同样是继承自ChannelInboundHandlerAdapter , 与客户端不同的是 , 这里覆盖的方法是 channelRead() ,这个方法在接收到数据之后会被回调
          2. 这里的msg 值的是Netty里面数据读写的载体 , 为什么不直接是ByteBuf , 而需要我们强转一下 , 我们后面会分析道 , 这里我们强转之后 , 然后调用buteBuf.toString() 就能够拿到我们客户端发过来的字符串数据。
    5. 运行测试

      1. 完整代码

         import java.nio.charset.Charset;
         import java.util.Date;
         import java.util.concurrent.TimeUnit;
         import io.netty.bootstrap.Bootstrap;
         import io.netty.bootstrap.ServerBootstrap;
         import io.netty.buffer.ByteBuf;
         import io.netty.channel.ChannelHandlerContext;
         import io.netty.channel.ChannelInboundHandlerAdapter;
         import io.netty.channel.ChannelInitializer;
         import io.netty.channel.nio.NioEventLoopGroup;
         import io.netty.channel.socket.nio.NioServerSocketChannel;
         import io.netty.channel.socket.nio.NioSocketChannel;
         
         public class Test_07_客户端和服务端双向通信 {
         	public static void main(String[] args) throws Exception {
         		Test_07_Server.start(8000);
         		Test_07_Client.start("127.0.0.1", 8000, 5);
         	}
         }
         
         class Test_07_Client {
         	public static void start(String IP, int port, int maxRetry) {
         		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
         
         		Bootstrap bootstrap = new Bootstrap();
         
         		bootstrap.group(workerGroup).channel(NioSocketChannel.class)
         				.handler(new ChannelInitializer<NioSocketChannel>() {
         
         					@Override
         					protected void initChannel(NioSocketChannel ch) throws Exception {
         						// 添加业务处理逻辑  可以添加自定义的业务处理逻辑也可以添加    Netty自带的简单通用的处理逻辑
         						ch.pipeline().addLast(new Test_07_ClientHandler());
         					}
         				});
         		connect(bootstrap, IP, port, maxRetry);
         	}
         
         	private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {
         		bootstrap.connect(IP, port).addListener(future -> {
         			int[] finalRetryIndex;
         			if (future.isSuccess()) {
         				System.out.println("客户端连接【"+IP+":"+port+"】成功");
         			} else if (maxRetry == 0) {
         				System.out.println("达到最大重试此时,放弃重试");
         			} else {
         				// 初始化 重试计数
         				if (retryIndex.length == 0) {
         					finalRetryIndex = new int[] { 0 };
         				} else {
         					finalRetryIndex = retryIndex;
         				}
         				// 计算时间间隔
         				int delay = 1 << finalRetryIndex[0];
         				// 执行重试
         				System.out.println(new Date() + " 连接失败,剩余重试次数:" + maxRetry + "," + delay + "秒后执行重试");
         				bootstrap.config().group().schedule(() -> {
         					connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);
         				}, delay, TimeUnit.SECONDS);
         			}
         		});
         	}
         }
         
         class Test_07_Server {
         	public static void start(int port) {
         		NioEventLoopGroup bossGroup = new NioEventLoopGroup();
         		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
         
         		ServerBootstrap serverBootstrap = new ServerBootstrap();
         
         		serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
         				.childHandler(new ChannelInitializer<NioSocketChannel>() {
         
         					@Override
         					protected void initChannel(NioSocketChannel ch) throws Exception {
         						ch.pipeline().addLast(new Test_07_ServerHandler());
         					}
         				});
         		bind(serverBootstrap, port);
         		
         	}
         	private static void bind(ServerBootstrap serverBootstrap, int port) {
         		serverBootstrap
         			.bind(port)
         			.addListener(future -> {
         				if(future.isSuccess()) {
         					System.out.println("服务端:端口【"+port+"】绑定成功!");
         				}else {
         					System.out.println("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
         					bind(serverBootstrap, port+1);
         				}
         			});
         	}
         
         }
         
         class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
         
         	@Override
         	public void channelActive(ChannelHandlerContext ctx) throws Exception {
         		
         		String content = "你好,奥特曼!";
         		System.out.println(new Date() + " 客户端写出数据:"+content);
         		
         		// 1. 获取数据
         		ByteBuf buffer = getByteBuf(ctx , content);
         		// 2. 写数据
         		ctx.channel().writeAndFlush(buffer);
         	}
         	
         	private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
         		// 获取二进制抽象 ByteBuffer
         		ByteBuf buf = ctx.alloc().buffer();
         		
         		// 准备数据
         		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
         		
         		// 把数据填充到 buf
         		buf.writeBytes(bs);
         		return buf;
         	}
         
         }
         
         class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
         
         	@Override
         	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         		ByteBuf buf = (ByteBuf) msg;
         		System.out.println(new Date() + ": 服务端读到数据->"+ buf.toString(Charset.forName("UTF-8")));
         	}
         }
        
      2. 运行结果:

    6. 服务端回复数据给客户端

      1. 服务端向客户端写数据的逻辑与客户端向服务端写数据的逻辑一样 , 先创建一个ByteBuf , 然后填充二进制数据 , 最后调用writeAndFlush()方法写出去

         class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
         
         	@Override
         	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         		ByteBuf buf = (ByteBuf) msg;
         		System.out.println(new Date() + ": 服务端读到数据->"+ buf.toString(Charset.forName("UTF-8")));
         		
         		// 向客户端回复数据
         		String content = "你好,田先森!";
         		System.out.println(new Date() +":服务端写出数据-> "+content);
         		ByteBuf byteBuf = getByteBuf(ctx , content);
         		ctx.channel().writeAndFlush(byteBuf);
         	}
         
         	private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
         		
         		// 获取 二进制抽象 ByteBuf
         		ByteBuf byteBuf = cxt.alloc().buffer();
         		
         		// 准备数据
         		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
         		
         		// 把数据填充到buf中
         		byteBuf.writeBytes(bs);
         		return byteBuf;
         	}
         }
        
      2. 现在轮到客户端了 , 客户端读取数据的逻辑和服务端读数据的逻辑一样 , 同样是覆盖channelRead() 方法

         class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
         
         	@Override
         	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         		ByteBuf byteBuf = (ByteBuf) msg;
         		System.out.println(new Date()+": 客户端读到数据 ->"+ byteBuf.toString(Charset.forName("UTF-8")));
         	}
         
         	@Override
         	public void channelActive(ChannelHandlerContext ctx) throws Exception {
         		
         		String content = "你好,奥特曼!";
         		System.out.println(new Date() + " 客户端写出数据:"+content);
         		
         		// 1. 获取数据
         		ByteBuf buffer = getByteBuf(ctx , content);
         		// 2. 写数据
         		ctx.channel().writeAndFlush(buffer);
         		
         	}
         	
         	private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
         		// 获取二进制抽象 ByteBuffer
         		ByteBuf buf = ctx.alloc().buffer();
         		
         		// 准备数据
         		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
         		
         		// 把数据填充到 buf
         		buf.writeBytes(bs);
         		return buf;
         	}
         
         }
        
      3. 现在 客户端和服务端就实现了双向通信

        1. 完整代码:

           import java.nio.charset.Charset;
           import java.util.Date;
           import java.util.concurrent.TimeUnit;
           import io.netty.bootstrap.Bootstrap;
           import io.netty.bootstrap.ServerBootstrap;
           import io.netty.buffer.ByteBuf;
           import io.netty.channel.ChannelHandlerContext;
           import io.netty.channel.ChannelInboundHandlerAdapter;
           import io.netty.channel.ChannelInitializer;
           import io.netty.channel.nio.NioEventLoopGroup;
           import io.netty.channel.socket.nio.NioServerSocketChannel;
           import io.netty.channel.socket.nio.NioSocketChannel;
           
           public class Test_07_客户端和服务端双向通信 {
           	public static void main(String[] args) throws Exception {
           		Test_07_Server.start(8000);
           		Test_07_Client.start("127.0.0.1", 8000, 5);
           	}
           }
           
           class Test_07_Client {
           	public static void start(String IP, int port, int maxRetry) {
           		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
           
           		Bootstrap bootstrap = new Bootstrap();
           
           		bootstrap.group(workerGroup).channel(NioSocketChannel.class)
           				.handler(new ChannelInitializer<NioSocketChannel>() {
           
           					@Override
           					protected void initChannel(NioSocketChannel ch) throws Exception {
           						// 添加业务处理逻辑  可以添加自定义的业务处理逻辑也可以添加    Netty自带的简单通用的处理逻辑
           						ch.pipeline().addLast(new Test_07_ClientHandler());
           					}
           				});
           		connect(bootstrap, IP, port, maxRetry);
           	}
           
           	private static void connect(Bootstrap bootstrap, String IP, int port, int maxRetry, int... retryIndex) {
           		bootstrap.connect(IP, port).addListener(future -> {
           			int[] finalRetryIndex;
           			if (future.isSuccess()) {
           				System.out.println("客户端连接【"+IP+":"+port+"】成功");
           			} else if (maxRetry == 0) {
           				System.out.println("达到最大重试此时,放弃重试");
           			} else {
           				// 初始化 重试计数
           				if (retryIndex.length == 0) {
           					finalRetryIndex = new int[] { 0 };
           				} else {
           					finalRetryIndex = retryIndex;
           				}
           				// 计算时间间隔
           				int delay = 1 << finalRetryIndex[0];
           				// 执行重试
           				System.out.println(new Date() + " 连接失败,剩余重试次数:" + maxRetry + "," + delay + "秒后执行重试");
           				bootstrap.config().group().schedule(() -> {
           					connect(bootstrap, IP, port, maxRetry - 1, finalRetryIndex[0] + 1);
           				}, delay, TimeUnit.SECONDS);
           			}
           		});
           	}
           }
           
           class Test_07_Server {
           	public static void start(int port) {
           		NioEventLoopGroup bossGroup = new NioEventLoopGroup();
           		NioEventLoopGroup workerGroup = new NioEventLoopGroup();
           
           		ServerBootstrap serverBootstrap = new ServerBootstrap();
           
           		serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
           				.childHandler(new ChannelInitializer<NioSocketChannel>() {
           
           					@Override
           					protected void initChannel(NioSocketChannel ch) throws Exception {
           						ch.pipeline().addLast(new Test_07_ServerHandler());
           					}
           				});
           		bind(serverBootstrap, port);
           		
           	}
           	private static void bind(ServerBootstrap serverBootstrap, int port) {
           		serverBootstrap
           			.bind(port)
           			.addListener(future -> {
           				if(future.isSuccess()) {
           					System.out.println("服务端:端口【"+port+"】绑定成功!");
           				}else {
           					System.out.println("服务端:端口【"+port+"】绑定失败,尝试绑定【"+(port+1)+"】!");
           					bind(serverBootstrap, port+1);
           				}
           			});
           	}
           
           }
           
           class Test_07_ClientHandler extends ChannelInboundHandlerAdapter {
           
           	@Override
           	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           		ByteBuf byteBuf = (ByteBuf) msg;
           		System.out.println(new Date()+": 客户端读到数据 ->"+ byteBuf.toString(Charset.forName("UTF-8")));
           	}
           
           	@Override
           	public void channelActive(ChannelHandlerContext ctx) throws Exception {
           		
           		String content = "你好,奥特曼!";
           		System.out.println(new Date() + " 客户端写出数据:"+content);
           		
           		// 1. 获取数据
           		ByteBuf buffer = getByteBuf(ctx , content);
           		// 2. 写数据
           		ctx.channel().writeAndFlush(buffer);
           		
           	}
           	
           	private ByteBuf getByteBuf(ChannelHandlerContext ctx , String content ) {
           		// 获取二进制抽象 ByteBuffer
           		ByteBuf buf = ctx.alloc().buffer();
           		
           		// 准备数据
           		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
           		
           		// 把数据填充到 buf
           		buf.writeBytes(bs);
           		return buf;
           	}
           
           }
           
           class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
           
           	@Override
           	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           		ByteBuf buf = (ByteBuf) msg;
           		System.out.println(new Date() + ": 服务端读到数据->"+ buf.toString(Charset.forName("UTF-8")));
           		
           		// 向客户端回复数据
           		String content = "你好,田先森!";
           		System.out.println(new Date() +":服务端写出数据-> "+content);
           		ByteBuf byteBuf = getByteBuf(ctx , content);
           		ctx.channel().writeAndFlush(byteBuf);
           	}
           
           	private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
           		
           		// 获取 二进制抽象 ByteBuf
           		ByteBuf byteBuf = cxt.alloc().buffer();
           		
           		// 准备数据
           		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
           		
           		// 把数据填充到buf中
           		byteBuf.writeBytes(bs);
           		return byteBuf;
           	}
           	
           }
          
        2. 执行结果

    7. 总结

      1. 本小节中 , 我们了解到客户端和服务端的逻辑处理均是在启动的时候 , 通过给逻辑处理链pipeline添加逻辑处理器 , 来编写数据的处理逻辑 , pipeline的逻辑我们会在后面分析
      2. 接下来我们学到了 在客户端连接成功之后会回调逻辑处理器的channelActive()方法 , 而不管是服务端还是客户端 , 收到数据之后都会调用channelRead方法
      3. 写数据用writeAndFlush() 方法 客户端与服务端交互的二进制数据载体为ByteBuf , ByteBuf 通过连接的内存管理器创建 , 字节数据填充到ByteBuf 之后才能写到对端 , 接下来一小节 , 我们就重点来分析ByteBuf
    8. 思考: 如何实现在新连接介入的时候 , 服务端主动向客户端推送消息 , 客户端回复服务端消息?

      1. 解答: 在服务器端的逻辑处理其中也实现 channelActive() 在有新的连接接入时 会回调此方法

         class Test_07_ServerHandler extends ChannelInboundHandlerAdapter{
         
         	@Override
         	public void channelActive(ChannelHandlerContext ctx) throws Exception {
         		String content = "是不是你连我了?";
         		System.out.println(new Date() +":服务端写出数据-> "+content);
         		ByteBuf byteBuf = getByteBuf(ctx , content);
         		ctx.channel().writeAndFlush(byteBuf);
         	}
         
         	@Override
         	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
         		ByteBuf buf = (ByteBuf) msg;
         		System.out.println(new Date() + ": 服务端读到数据->"+ buf.toString(Charset.forName("UTF-8")));
         		
         		// 向客户端回复数据
         		String content = "你好,田先森!";
         		System.out.println(new Date() +":服务端写出数据-> "+content);
         		ByteBuf byteBuf = getByteBuf(ctx , content);
         		ctx.channel().writeAndFlush(byteBuf);
         	}
         
         	private static ByteBuf getByteBuf(ChannelHandlerContext cxt , String content) {
         		
         		// 获取 二进制抽象 ByteBuf
         		ByteBuf byteBuf = cxt.alloc().buffer();
         		
         		// 准备数据
         		byte[] bs = content.getBytes(Charset.forName("UTF-8"));
         		
         		// 把数据填充到buf中
         		byteBuf.writeBytes(bs);
         		return byteBuf;
         	}
         	
         }
        
    展开全文
  • ##Netty实战 IM即时通讯系统(二)Netty简介 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信协议编解码 实现客户端...

    ##

    Netty实战 IM即时通讯系统(二)Netty简介

    零、 目录

    1. IM系统简介
    • Netty 简介
    • Netty 环境配置
    • 服务端启动流程
    • 实战: 客户端和服务端双向通信
    • 数据传输载体ByteBuf介绍
    • 客户端与服务端通信协议编解码
    • 实现客户端登录
    • 实现客户端与服务端收发消息
    • pipeline与channelHandler
    • 构建客户端与服务端pipeline
    • 拆包粘包理论与解决方案
    • channelHandler的生命周期
    • 使用channelHandler的热插拔实现客户端身份校验
    • 客户端互聊原理与实现
    • 群聊的发起与通知
    • 群聊的成员管理(加入与退出,获取成员列表)
    • 群聊消息的收发及Netty性能优化
    • 心跳与空闲检测
    • 总结
    • 扩展

    二、 Netty简介

    1. 回顾IO编程
      1. 场景: 客户端每隔两秒发送一个带有时间戳的“hello world”给服务端 , 服务端收到之后打印。

      2. 代码:

         IOServer.java
         /**
          * @author 闪电侠
          */
         public class IOServer {
             public static void main(String[] args) throws Exception {
         
                 ServerSocket serverSocket = new ServerSocket(8000);
         
                 // (1) 接收新连接线程
                 new Thread(() -> {
                     while (true) {
                         try {
                             // (1) 阻塞方法获取新的连接
                             Socket socket = serverSocket.accept();
         
                             // (2) 每一个新的连接都创建一个线程,负责读取数据
                             new Thread(() -> {
                                 try {
                                     int len;
                                     byte[] data = new byte[1024];
                                     InputStream inputStream = socket.getInputStream();
                                     // (3) 按字节流方式读取数据
                                     while ((len = inputStream.read(data)) != -1) {
                                         System.out.println(new String(data, 0, len));
                                     }
                                 } catch (IOException e) {
                                 }
                             }).start();
         
                         } catch (IOException e) {
                         }
         
                     }
                 }).start();
             }
         }
        
        
        
        
         IOClient.java
         /**
          * @author 闪电侠
          */
         public class IOClient {
         
             public static void main(String[] args) {
                 new Thread(() -> {
                     try {
                         Socket socket = new Socket("127.0.0.1", 8000);
                         while (true) {
                             try {
                                 socket.getOutputStream().write((new Date() + ": hello world").getBytes());
                                 Thread.sleep(2000);
                             } catch (Exception e) {
                             }
                         }
                     } catch (IOException e) {
                     }
                 }).start();
             }
         }
        
      3. IO编程,模型在客户端较少的场景下运行良好 , 但是客户端比较多的业务来说 , 单机服务端可能需要支撑成千上万的连接, IO模型可能就不太合适了 , 原因:

        1. 在传统的IO模型中 , 每一个连接创建成功之后都需要一个线程来维护 , 每个线程包含一个while死循环, 那么1W个连接就对应1W个线程 , 继而1W个死循环。
        2. 线程资源受限: 线程是操作系统中非常宝贵的资源 , 同一时刻有大量的线程处于阻塞状态是非常严重的资源浪费,操作系统开销太大。
        3. 线程切换效率低下: 单机CPU核数固定 , 线程爆炸之后操作系统频繁的进行线程切换 , 应用性能几句下降
        4. IO编程中 , 数据读写是以字节流为单位。
      4. 为了解决这些问题 , JDK1.4之后提出了NIO

    2. NIO 编程
      1. NIO 是如何解决一下三个问题。

        1. 线程资源受限
          1. NIO编程模型中 , 新来一个连接不再创建一个新的线程, 而是可以把这条连接直接绑定在某个固定的线程 , 然后这条连接所有的读写都由这个线程来负责 , 那么他是怎么做到的? IO  与 NIO 对比
            1. 如上图所示,IO 模型中,一个连接来了,会创建一个线程,对应一个 while 死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w 个连接里面同一时刻只有少量的连接有数据可读,因此,很多个 while 死循环都白白浪费掉了,因为读不出啥数据。
            2. 而在 NIO 模型中,他把这么多 while 死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个 while 死循环就能监测1w个连接是否有数据可读的呢? 这就是 NIO 模型中 selector 的作用,一条连接来了之后,现在不创建一个 while 死循环去监听是否有数据可读了,而是直接把这条连接注册到 selector 上,然后,通过检查这个 selector,就可以批量监测出有数据可读的连接,进而读取数据,下面我再举个非常简单的生活中的例子说明 IO 与 NIO 的区别。
            3. 在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有 100 个小朋友,有两种方案可以解决小朋友上厕所的问题:
              1. 每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100 个小朋友就需要 100 个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。
              2. 所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所的小朋友批量领到厕所,这就是 NIO 模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然后批量轮询。
            4. 这就是 NIO 模型解决线程资源受限的方案,实际开发过程中,我们会开多个线程,每个线程都管理着一批连接,相对于 IO 模型中一个线程管理一条连接,消耗的线程资源大幅减少
        2. 线程切换效率低下
          1. 由于NIO模型中线程数量大大降低 , 线程切换的效率也因此大幅度提高
        3. IO读写面向流
          1. IO读写是面向流的 , 一次性只能从流中读取一个或多个字节 , 并且读完之后无法再次读取 , 你需要自己缓存数据 , 而NIO的读写是面向Buffer的 , 你可以随意读取里面的任何一个字节数据 , 不需要你自己缓存数据 , 这一切只需要移动读写指针即可。
      2. 原生NIO 实现

        /**
         * 服务端
         * */
        class NIO_server_test_01{
        	
        	public static void start () throws IOException {
        		Selector serverSelect = Selector.open();
        		Selector clientSelect = Selector.open();
        		
        		new Thread(() -> {
        			try {
        				ServerSocketChannel socketChannel = ServerSocketChannel.open();
        				socketChannel.socket().bind(new InetSocketAddress(8000)); // 监听端口
        				socketChannel.configureBlocking(false); // 是否阻塞
        				socketChannel.register(serverSelect, SelectionKey.OP_ACCEPT);
        				
        				while ( true ) {
        					// 检测是否有新的连接
        					if(serverSelect.select(1) > 0) {  // 1 是超时时间     select 方法返回当前连接数量
        						Set<SelectionKey> set = serverSelect.selectedKeys();
        						
        						set.stream()
        							.filter(key -> key.isAcceptable())
        							.collect(Collectors.toList())
        							.forEach(key ->{
        								try {
        									// 每次来一个新的连接, 不需要创建新的线程 , 而是注册到clientSelector
        									SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
        									clientChannel.configureBlocking(false);
        									clientChannel.register(serverSelect, SelectionKey.OP_ACCEPT);
        								}catch(Exception e) {
        									e.printStackTrace();
        								}finally {
        									set.iterator().remove();
        								}
        							});
        					}
        				}
        			}catch (Exception e) {
        				e.printStackTrace();
        			}
        		}).start();
        		
        		
        		new Thread(() -> {
        			try {
        				// 批量轮询  有哪些连接有数据可读
        				while ( true ) {
        					if(clientSelect.select(1) > 0) {
        						clientSelect.selectedKeys().stream()
        							.filter(key -> key.isReadable())
        							.collect(Collectors.toList())
        							.forEach(key -> {
        								try {
        									SocketChannel clientChannl = (SocketChannel) key.channel();
        									ByteBuffer bf = ByteBuffer.allocate(1024);
        									// 面向byteBuffer
        									clientChannl.read(bf);
        									bf.flip();
        									System.out.println(Charset.defaultCharset().newDecoder().decode(bf).toString());
        								}catch ( Exception e) {
        									e.printStackTrace();
        								}finally {
        									clientSelect.selectedKeys().iterator().remove();
        									key.interestOps(SelectionKey.OP_READ);
        								}
        								
        							});
        					}
        				}
        			}catch (Exception e) {
        				e.printStackTrace();
        			}
        		}).start();
        	}
        	
        }   
        
        1. 通常NIO 模型中会有两个线程每个线程中绑定一个轮询器selector , 在我们的例子中serverSelector负责轮询是否有新的连接 , clientSelector 负责轮询连接中是否有数据可读。
        2. 服务端检测到新的连接之后 , 不在创建一个新的线程 , 而是直接将连接注册到clientSelector中
        3. clientorSelector 被一个while死循环抱着 , 如果在某一时刻有多个连接数据可读 ,数据将会被clientSelector.select() 方法轮询出来。 进而批量处理 。
        4. 数据的读写面向buffer 而不是面向流。
      3. 原生NIO 进行网络开发的缺点:

        1. JDK 的NIO 编程需要了解很多概念, 编程复杂 , 对NIO 入门很不友好 , 编程模型不友好 , ByteBuffer的API简直反人类 (这是书里这么说的 , 不要喷我)。
        2. 对NIO 编程来说 , 一个比较适合的线程模型能充分发挥它的优势 , 而JDK没有给你实现 , 你需要自己实现 , 就连简单的协议拆包都要自己实现 (我感觉这样才根据创造力呀 )
        3. JDK NIO 底层由epoll 实现 , 该实现饱受诟病的空轮训bug会导致cpu 飙升100%
        4. 项目庞大之后 , 自己实现的NIO 很容易出现各类BUG , 维护成本高 (作者怎么把自己的过推向JDK haha~)
        5. 正因为如此 , 我连客户端的代码都懒得给你写了 (这作者可真够懒的) , 你可以直接使用IOClient 和NIO_Server 通信
      4. JDK 的NIO 犹如带刺的玫瑰 , 虽然美好 , 让人向往 , 但是使用不当会让你抓耳挠腮 , 痛不欲生 , 正因为如此 , Netty横空出世!(作者这才华 啧啧啧~)

    3. Netty 编程
      1. Netty到底是何方神圣(被作者吹上天了都) , 用依据简单的话来说就是: Netty 封装了JDK 的NIO , 让你使用更加干爽 (干爽???) , 你不用在写一大堆复杂的代码了 , 用官方的话来说就是: Netty是一个异步事件驱动的网络应用框架 , 用于快速开发可维护的高性能服务器和客户端。
      2. Netty 相比 JDK 原生NIO 的优点 :
        1. 使用NIO 需要了解太多概念, 编程复杂 , 一不小心 BUG 横飞
        2. Netty 底层IO模型随意切换 , 而这一切只需要小小的改动 , 改改参数 , Netty乐意直接从NIO模型转换为IO 模型 。
        3. Netty 自带的拆包解包 , 异常检测可以让你从NIO 的繁重细节中脱离出来 , 让你只关心业务逻辑 。
        4. Netty 解决了JDK 的很多包括空轮训在内的BUG
        5. Netty社区活跃 , 遇到问题可以轻松解决
        6. Netty 已经经历各大RPC 框架 , 消息中间价 , 分布式通信中间件线上的广泛验证 , 健壮性无比强大
      3. 代码实例
        1. maven 依赖

          <dependency>
              <groupId>io.netty</groupId>
              <artifactId>netty-all</artifactId>
              <version>4.1.6.Final</version>
          </dependency>
          
        2. NettyServer

           /**
            * @author outman
            * */
           class Netty_server_02 {
           	public void start () {
           		ServerBootstrap serverBootstrap = new ServerBootstrap();
           		
           		NioEventLoopGroup boss = new NioEventLoopGroup();
           		NioEventLoopGroup woker = new NioEventLoopGroup();
           		
           		serverBootstrap.group(boss ,woker)
           			.channel(NioServerSocketChannel.class)
           			.childHandler(new ChannelInitializer<NioSocketChannel>() {
           
           				@Override
           				protected void initChannel(NioSocketChannel ch) throws Exception {
           					ch.pipeline().addLast(new StringDecoder());
           					ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
           
           						@Override
           						protected void channelRead0(ChannelHandlerContext cxt, String msg) throws Exception {
           							System.out.println(msg);
           							
           						}
           					});
           					
           				}
           				
           			}).bind(8000);
           	}
           }
          
          1. 这么一小段代码就实现了我们前面NIO 编程中所有的功能 , 包括服务端启动 , 接收新连接 , 打印客户端传来的数据。
          2. 将NIO 中的概念与IO模型结合起来理解:
            1. boss 对应 IOServer 中接收新连接创建线程 , 主要负责创建新连接
            2. worker 对应 IOServer 中负责读取数据的线程 , 主要用于数据读取语句业务逻辑处理 。
            3. 详细逻辑会在后续深入讨论
        3. NettyClient

          /**
          * @author outman
          * */
          class Netty_client_02 {

             public static void main(String[] args) throws InterruptedException {
             	Bootstrap bootstrap = new Bootstrap();
             	NioEventLoopGroup group = new NioEventLoopGroup();
          
             	bootstrap.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<Channel>() {
             		@Override
             		protected void initChannel(Channel ch) {
             			ch.pipeline().addLast(new StringEncoder());
             		}
             	});
          
             	Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
          
             	while (true) {
             		channel.writeAndFlush(new Date() + ": hello world!");
             		Thread.sleep(2000);
             	}
             }
          

          }

        4. 在客户端程序中 , group 对应了我们IOClient 中 新起的线程。

        5. 剩下的逻辑 我们在后文中详细分析 , 现在你可以把 Netty_server_02 和Netty_client_02 复制到 你的IDE 中 运行起来 感受世界 的美好 (注意 先启动 服务端 再启动客户端 )

        6. 使用Netty 之后 整个世界都美好了, 一方面 Netty 对NIO 封装的如此完美 , 另一方面 , 使用Netty 之后 , 网络通信这块的性能问题几乎不用操心 , 尽情的让Netty 榨干你的CPU 吧~~

    展开全文
  • ##Netty实战 IM即时通讯系统(三)Netty环境配置 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信协议编解码 实现...
  • ##Netty实战 IM即时通讯系统(一)IM系统简介 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信协议编解码 实现...
  • IM即时通讯项目上,用到的消息处理机制NettyNetty框架,TCP长连接,心跳,阻塞消息队列,线程池处理消息发送, 基于Google ProtoBuf自定义的消息协议。
  • 上篇 “SpringBoot+Netty开发IM即时通讯系列(一)”介绍了Netty与NIO等基础知识点,感兴趣的可以去看下: https://blog.csdn.net/qq_26975307/article/details/85004424  本篇使用Netty+WebSocket构建一个最简单...
  • 之前工作接触了几个开源的IM产品,再加上曾经用Netty实现过几个服务,于是就有了用Netty实现一个IM的想法,于是用业余时间写了一个IM,和喜欢Netty的程序员们分享。考虑到方便扩展,在服务端采用了Http+Socket结合的...
  • 转载自:Netty入门与实战:仿写微信IM即时通讯系统 Netty是互联网中间件领域使用最广泛最核心的网络通信框架,几乎所有互联网中间件或者大数据领域均离不开Netty,掌握Netty是作为初中级工程师迈向高级工程师最重要的...
  • Netty实战 IM即时通讯系统(十一)pipeline与channelHandler 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与...
  • ##Netty实战 IM即时通讯系统(九)实现客户端登录 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信...
  • ##Netty实战 IM即时通讯系统(四)服务端启动流程 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信协议编解码 实现...
  • ##Netty实战 IM即时通讯系统(五)客户端启动流程 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与服务端通信协议编解码 实现...
  • Netty实战 IM即时通讯系统(十二)构建客户端与服务端pipeline 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端...
  • ##Netty实战 IM即时通讯系统(七)数据传输载体ByteBuf介绍 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与...
  • SpringBoot+Netty开发IM即时通讯系列(一)

    万次阅读 多人点赞 2018-12-14 16:02:32
    最近项目的需求有IM通讯这个模块,经过与老大商量决定使用SpringBoot+Netty的方式构建。于是,在这个系列中记录下过程中的学习历程以及撸码上线,以供日后参考。 如果文中有不当或错误请指出,虚心接受批评。 ...
  • Netty实战 IM即时通讯系统(十)实现客户端和服务端收发消息 零、 目录 IM系统简介 Netty 简介 Netty 环境配置 服务端启动流程 客户端启动流程 实战: 客户端和服务端双向通信 数据传输载体ByteBuf介绍 客户端与...
  • Netty 入门与实战:仿写微信 IM 即时通讯系统,掘金小册子,netty教程。章节齐全无缺失,排版非常不错。 1.仿微信IM系统简介 1 2.Netty是什么? 2 3.服务端启动流程 8 4.客户端启动流程 11 5.实战:客户端与服务端双向...
  • ​ 使用netty框架,管理在线用户,当有该用户的消息,就通过该用户对应的channalPipeLine将未读消息提示发过去。 4.消息如何保证不丢失? ​ 消息先做持久化,保证持久化完成后,消息怎么样都不会丢失。因为消息...
  • Spring boot + netty开发即时通讯 IM目的github地址项目目录结构主要核心类介绍 目的 学习以及开源思想,写一个可使用的IM通信程序 github地址 链接: link. 我们对Markdown编辑器进行了一些功能拓展与语法支持,除了...
  • 2Netty 环境配置3服务端启动流程4客户端启动流程5实战:客户端与服务端双向通信6数据传输载体 ByteBuf 介绍7客户端与服务端通信协议编解码8实战:实现客户端登录9实战:实现客户端与服务端收发消息10pipeline 与 ...
  • Netty+入门与实战:仿写微信+IM+即时通讯系统 Netty是互联网中间件领域使用最广泛最核心的网络通信框架,几乎所有互联网中间件或者大数据领域均离不开Netty,掌握Netty是作为初中级工程师迈向高级工程师最重要的技能之...
  • 从小我就喜欢动手,就以一个即时通信的项目为例,已经基于不同技术方案实现了5、6次,仅为了实践技术,截图如下: 有些是刚学完Socket和Swing的时候,想动手试试这些技术能不能写个QQ出来。 也有的是因为实习培训...
  • 根据Netty框架实现消息推送(即时聊天)功能. Netty框架,TCP长连接,心跳,阻塞消息队列,线程池处理消息发送, 基于Google ProtoBuf自定义的消息协议, TCP粘包/拆包.... 客户端通过TCP连接到服务器,并建立TCP长...
  • Netty 即时通信 前端 (六)

    千次阅读 2018-12-21 17:30:00
    本编接着上篇后端基于Netty服务器的websocket服务 ,做一个前端的简单展示 顺便学习一下前端的知识点,关于js的websocket通信方式和http请求也差不多,看下面: var socket = new WebSocket("ws://[ip地址]:[端口]...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 755
精华内容 302
关键字:

im即时通信netty