netty 订阅
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。 [1] 展开全文
Netty是由JBOSS提供的一个java开源框架,现为 Github上的独立项目。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。也就是说,Netty 是一个基于NIO的客户、服务器端的编程框架,使用Netty 可以确保你快速和简单的开发出一个网络应用,例如实现了某种协议的客户、服务端应用。Netty相当于简化和流线化了网络应用的编程开发过程,例如:基于TCP和UDP的socket服务开发。“快速”和“简单”并不用产生维护性或性能上的问题。Netty 是一个吸收了多种协议(包括FTP、SMTP、HTTP等各种二进制文本协议)的实现经验,并经过相当精心设计的项目。最终,Netty 成功的找到了一种方式,在保证易于开发的同时还保证了其应用的性能,稳定性和伸缩性。 [1]
信息
依赖平台
JBOSS
新的传输方式
处理大容量数据流更简单
特    点
快速开发高性能、高可靠性
性    质
java开源框架
外文名
Netty
Netty特点
针对多种传输类型的统一接口 - 阻塞和非阻塞简单但更强大的线程模型真正的无连接的数据报套接字支持链接逻辑支持复用大量的 Javadoc 和 代码实例除了在 JDK 1.6 + 额外的限制。(一些特征是只支持在Java 1.7 +。可选的功能可能有额外的限制。)比核心 Java API 更好的吞吐量,较低的延时资源消耗更少,这个得益于共享池和重用减少内存拷贝消除由于慢,快,或重载连接产生的 OutOfMemoryError消除经常发现在 NIO 在高速网络中的应用中的不公平的读/写比完整的 SSL / TLS 和 StartTLS 的支持运行在受限的环境例如 Applet 或 OSGI发布的更早和更频繁社区驱动 [2] 
收起全文
精华内容
下载资源
问答
  • Netty

    万次阅读 2018-03-02 14:55:08
    Netty概述:1、netty是基于Java NIO的网络应用框架,client-server框架2、Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞...
    Netty概述:
    1、netty是基于Java NIO的网络应用框架,client-server框架
    2、Netty是一个高性能、异步事件驱动的NIO框架,它提供了对TCP、UDP和文件传输的支持,
    作为一个异步NIO框架,Netty的所有IO操作都是异步非阻塞的,
    通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果
    3、作为当前最流行的NIO框架,Netty在互联网领域、大数据分布式计算领域、游戏行业、通信行业等获得了广泛的应用,
    一些业界著名的开源组件也基于Netty的NIO框架构建。

    Netty创建步骤:

    NIO通讯服务端步骤:
    1、创建ServerSocketChannel,为它配置非阻塞模式
    2、绑定监听,配置TCP参数,录入backlog大小等
    3、创建一个独立的IO线程,用于轮询多路复用器Selector
    4、创建Selector,将之前的ServerSocketChannel注册到Selector上,并设置监听标识位SelectionKey.ACCEPT
    5、启动IO线程,在循环体中执行Selector.select()方法,轮询就绪的通道
    6、当轮询到处于就绪的通道时,需要进行判断操作位,如果是ACCEPT状态,说明是新的客户端介入,则调用accept方法接受新的客户端。
    7、设置新接入客户端的一些参数,并将其通道继续注册到Selector之中。设置监听标识等
    8、如果轮询的通道操作位是READ,则进行读取,构造Buffer对象等
    9、更细节的还有数据没发送完成继续发送的问题


    Netty实现通讯的步骤:
    1、创建两个NIO线程组,一个专门用来网络事件处理(接受客户端连接),另一个则进行网络通讯读写
    2、创建一个ServerBootstrap对象,配置Netty的一系列参数,例如接受传入数据的缓存大小等。
    3、创建一个实际处理数据的类ChannelInitializer,进行初始化的准备工作,比如设置传入数据的字符集,格式,实现实际处理数据的接口。
    4、绑定端口,执行同步阻塞方法等待服务器启动即可。

    当对于NIO模型,netty简单、健壮、性能稳定,而且这几步都是模板式开发,以后可以直接用,开发只需专注实际处理数据类的实现。


    Netty最佳实践(数据通讯、心跳检测)

    netty服务最好可以单独作为一个项目,当然也可以与web项目集成在一起发布到tomcat,
    这样好处是可以用到web项目中的service方法,但是web项目8080关闭,netty监听的端口号也关闭了
    所以netty可以打成jar包运行,当然如果要用到service层的代码,也可以将service层的代码打成jar包
    给netty业务类使用。

    netty通讯的方式:
    ①使用长连接通道不断开的形式进行通信,也就是服务器和客户端的通道一直处于开启状态,如果服务器的
    性能比较好,而且客户端的数量也不多的情况下,可以考虑这种方式
    ②一次性批量提交数据,采用短连接的方式,也就是我们把数据保存在本地临时缓冲区或者临时表中,
    当达到临界值时进行一次性批量提交,又或者根据定时任务轮询提交,这种情况下弊端是做不到
    实时性传输,在实时性要求不高的程序中可以采用
    ③采用一种特殊的长连接,在指定某一段时间之内,服务端和某台客户端没有任何通讯,则断开连接,

    下次如果客户端要向服务端发送数据时,再次建立连接。

    但有两个因素要考虑:

    1、如何在超时(即服务端和客户端没有任何通信)后关闭通道?关闭后如何再次连接?

    2、客户端宕机,无需考虑,下次客户端重启后可以与服务端建立连接,但是服务器宕机怎么办?

    服务端代码Server:

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    
    public class Server {
    
    	public static void main(String[] args) throws Exception{
    		
    		EventLoopGroup pGroup = new NioEventLoopGroup(); //线程组:用来处理网络事件处理(接受客户端连接)
    		EventLoopGroup cGroup = new NioEventLoopGroup(); //线程组:用来进行网络通讯读写
    		
    		//Bootstrap用来配置参数
    		ServerBootstrap b = new ServerBootstrap();
    		b.group(pGroup, cGroup)
    		 .channel(NioServerSocketChannel.class) //注册服务端channel
    		 /**
    		  * BACKLOG用于构造服务端套接字ServerSocket对象,标识当服务器请求处理线程全满时,
    		  * 用于临时存放已完成三次握手的请求的队列的最大长度。如果未设置或所设置的值小于1,将使用默认值50。
    		  * 服务端处理客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接,多个客户端来的时候,
    		  * 服务端将不能处理的客户端连接请求放在队列中等待处理,backlog参数指定了队列的大小
    		  */
    		 .option(ChannelOption.SO_BACKLOG, 1024)
    		 //设置日志
    		 .handler(new LoggingHandler(LogLevel.INFO))
    		 .childHandler(new ChannelInitializer<SocketChannel>() {
    			protected void initChannel(SocketChannel sc) throws Exception {
    				//marshaliing的编解码操作,要传输对象,必须编解码
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    				sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    				//5s没有交互,就会关闭channel
    				sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
    				sc.pipeline().addLast(new ServerHandler());   //服务端业务处理类
    			}
    		});
    		ChannelFuture cf = b.bind(8765).sync();
    		
    		cf.channel().closeFuture().sync();
    		pGroup.shutdownGracefully();
    		cGroup.shutdownGracefully();
    	}
    }


    客户端代码:

    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import io.netty.handler.timeout.ReadTimeoutHandler;
    import java.util.concurrent.TimeUnit;
    
    public class Client {
    	
    	private static class SingletonHolder {
    		static final Client instance = new Client();
    	}
    	
    	public static Client getInstance(){
    		return SingletonHolder.instance;
    	}
    	
    	private EventLoopGroup group;
    	private Bootstrap b;
    	private ChannelFuture cf ;
    	
    	private Client(){
    			group = new NioEventLoopGroup();
    			b = new Bootstrap();
    			b.group(group)
    			 .channel(NioSocketChannel.class)
    			 .handler(new LoggingHandler(LogLevel.INFO))
    			 .handler(new ChannelInitializer<SocketChannel>() {
    					@Override
    					protected void initChannel(SocketChannel sc) throws Exception {
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
    						sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
    						//超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
    						sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
    						sc.pipeline().addLast(new ClientHandler());  //客户端业务处理类
    					}
    		    });
    	}
    	
    	public void connect(){
    		try {
    			this.cf = b.connect("127.0.0.1", 8765).sync();
    			System.out.println("远程服务器已经连接, 可以进行数据交换..");				
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	}
    	
    	public ChannelFuture getChannelFuture(){
    		//如果管道没有被开启或者被关闭了,那么重连
    		if(this.cf == null){
    			this.connect();
    		}
    		if(!this.cf.channel().isActive()){
    			this.connect();
    		}
    		return this.cf;
    	}
    	
    	public static void main(String[] args) throws Exception{
    		final Client c = Client.getInstance();
    		
    		ChannelFuture cf = c.getChannelFuture();
    		for(int i = 1; i <= 3; i++ ){
    			//客户端发送的数据
    			UserParam request = new UserParam();
    			request.setId("" + i);
    			request.setName("pro" + i);
    			request.setRequestMessage("数据信息" + i);
    			
    			cf.channel().writeAndFlush(request);
    			TimeUnit.SECONDS.sleep(4);
    		}
    		//当5s没有交互,就会异步关闭channel
    		cf.channel().closeFuture().sync();
    		
    		//再模拟一次传输
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    					ChannelFuture cf = c.getChannelFuture();
    					//System.out.println(cf.channel().isActive());
    					//System.out.println(cf.channel().isOpen());
    					
    					//再次发送数据
    					UserParam request = new UserParam();
    					request.setId("" + 4);
    					request.setName("pro" + 4);
    					request.setRequestMessage("数据信息" + 4);
    					
    					cf.channel().writeAndFlush(request);					
    					cf.channel().closeFuture().sync();
    					System.out.println("子线程结束.");
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		}).start();
    		
    		System.out.println("断开连接,主线程结束..");
    	}
    }
    

    服务端处理类:
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    public class ServerHandler extends ChannelHandlerAdapter{
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		//接受客户端对象
    		UserParam user = (UserParam)msg;
    		System.out.println("客户端发来的消息 : " + user.getId() + ", " + user.getName() + ", " + user.getRequestMessage());
    		//给客户端返回对象
    		UserData response = new UserData();
    		response.setId(user.getId());
    		response.setName("response" + user.getId());
    		response.setResponseMessage("响应内容" + user.getId());
    		ctx.writeAndFlush(response);
    		//处理完毕,关闭服务端
    		//ctx.addListener(ChannelFutureListener.CLOSE);
    	}
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    		
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		ctx.close();
    	}
    }

    客户端处理类:

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    
    public class ClientHandler extends ChannelHandlerAdapter{
    	
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	}
    
    	@Override
    	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		try {
    			UserData user = (UserData)msg;
    			System.out.println("服务器返回的消息  : " + user.getId() + ", " + user.getName() + ", " + user.getResponseMessage());			
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
    	}
    
    	@Override
    	public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		ctx.close();
    	}
    }
    

    客户端传输的参数对象UserParam -- > id  name  requestMessage

    服务端传输的参数对象UserData   -- >  id  name responseMessage


    心跳检测:

    Server代码,Client代码是模板代码,基本都一样,不同是业务处理的方法。

    Server业务处理类ServerHeartBeatHandler:

    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    
    import java.util.HashMap;
    
    public class ServerHeartBeatHandler extends ChannelHandlerAdapter {
        
    	/**
    	 * key:ip value:auth **
    	 * 拥有的客户端列表,从数据库中或者配置文件中读取
    	 */
    	private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
    	//模拟授权的key
    	private static final String SUCCESS_KEY = "auth_success_key";
    	
    	static {
    		AUTH_IP_MAP.put("192.168.1.200", "1234");
    	}
    	
    	private boolean auth(ChannelHandlerContext ctx, Object msg){
    			//System.out.println(msg);
    			String [] ret = ((String) msg).split(",");
    			String auth = AUTH_IP_MAP.get(ret[0]);
    			if(auth != null && auth.equals(ret[1])){
    				ctx.writeAndFlush(SUCCESS_KEY);
    				return true;
    			} else {
    				ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
    				return false;
    			}
    	}
    	
    	@Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    		if(msg instanceof String){
    			auth(ctx, msg);
    		} else if (msg instanceof RequestInfo) {
    			//接受客户端发来的他机器的性能参数
    			RequestInfo info = (RequestInfo) msg;
    			System.out.println("--------------------------------------------");
    			System.out.println("当前主机ip为: " + info.getIp());
    			System.out.println("当前主机cpu情况: ");
    			HashMap<String, Object> cpu = info.getCpuPercMap();
    			System.out.println("总使用率: " + cpu.get("combined"));
    			System.out.println("用户使用率: " + cpu.get("user"));
    			System.out.println("系统使用率: " + cpu.get("sys"));
    			System.out.println("等待率: " + cpu.get("wait"));
    			System.out.println("空闲率: " + cpu.get("idle"));
    			
    			System.out.println("当前主机memory情况: ");
    			HashMap<String, Object> memory = info.getMemoryMap();
    			System.out.println("内存总量: " + memory.get("total"));
    			System.out.println("当前内存使用量: " + memory.get("used"));
    			System.out.println("当前内存剩余量: " + memory.get("free"));
    			System.out.println("--------------------------------------------");
    			
    			//通知客户端消息已收到
    			ctx.writeAndFlush("info received!");
    		}else {
    			ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
    		}
        }
    
    
    }

    Client业务处理类ClienHeartBeattHandler:

    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.ReferenceCountUtil;
    import java.net.InetAddress;
    import java.util.HashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import org.hyperic.sigar.CpuPerc;
    import org.hyperic.sigar.Mem;
    import org.hyperic.sigar.Sigar;
    
    public class ClienHeartBeattHandler extends ChannelHandlerAdapter {
    
        private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        
        private ScheduledFuture<?> heartBeat;
    	//主动向服务器发送认证信息
        private InetAddress addr ;
        
        private static final String SUCCESS_KEY = "auth_success_key";
    
    	@Override
    	public void channelActive(ChannelHandlerContext ctx) throws Exception {
    		addr = InetAddress.getLocalHost();
            String ip = addr.getHostAddress();
    		String key = "1234";
    		//证书
    		String auth = ip + "," + key;
    		ctx.writeAndFlush(auth);
    	}
    	
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        	try {
            	if(msg instanceof String){
            		String ret = (String)msg;
            		if(SUCCESS_KEY.equals(ret)){
            	    	// 握手成功,主动发送心跳消息
            	    	this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2, TimeUnit.SECONDS);
            		    System.out.println(msg);    			
            		}
            		else {
            			System.out.println(msg);
            		}
            	}
    		} finally {
    			ReferenceCountUtil.release(msg);
    		}
        }
    
        private class HeartBeatTask implements Runnable {
        	private final ChannelHandlerContext ctx;
    
    		public HeartBeatTask(final ChannelHandlerContext ctx) {
    		    this.ctx = ctx;
    		}
    	
    		@Override
    		public void run() {
    			try {
    			    RequestInfo info = new RequestInfo();
    			    //ip
    			    info.setIp(addr.getHostAddress());
    		        Sigar sigar = new Sigar();
    		        //cpu prec
    		        CpuPerc cpuPerc = sigar.getCpuPerc();
    		        HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
    		        cpuPercMap.put("combined", cpuPerc.getCombined());
    		        cpuPercMap.put("user", cpuPerc.getUser());
    		        cpuPercMap.put("sys", cpuPerc.getSys());
    		        cpuPercMap.put("wait", cpuPerc.getWait());
    		        cpuPercMap.put("idle", cpuPerc.getIdle());
    		        // memory
    		        Mem mem = sigar.getMem();
    				HashMap<String, Object> memoryMap = new HashMap<String, Object>();
    				memoryMap.put("total", mem.getTotal() / 1024L);
    				memoryMap.put("used", mem.getUsed() / 1024L);
    				memoryMap.put("free", mem.getFree() / 1024L);
    				info.setCpuPercMap(cpuPercMap);
    			    info.setMemoryMap(memoryMap);
    			    ctx.writeAndFlush(info);
    			    
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    
    	    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	    	cause.printStackTrace();
    			if (heartBeat != null) {
    			    heartBeat.cancel(true);
    			    heartBeat = null;
    			}
    			ctx.fireExceptionCaught(cause);
    	    }
    	    
    	}
    }

    netty编解码技术:
    java序列化技术,序列化目的:
    ①网络传输(网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式)
    ②对象持久化(对象必须在JVM中存活,不可能超过JVM的生命周期)


    虽然我们可以使用java进行对象序列化,netty去传输,但是java序列化的硬伤太多:
    1.无法跨语言。这应该是java序列化最致命的问题了。
    由于java序列化是java内部私有的协议,其他语言不支持,导致别的语言无法反序列化,这严重阻碍了它的应用。
    关于跨语言问题,也就是对象传输,一般都采用json字符串。
    2.序列后的码流太大。java序列化的大小是二进制编码的5倍多!
    3.序列化性能太低。java序列化的性能只有二进制编码的6.17倍,可见java序列化性能实在太差了。

    我们判断一个编码框架的优劣主要从以下几个方面:
    1.是否支持跨语言,支持语种是否丰富
    2.编码后的码流
    3.编解码的性能
    4.类库是否小巧,API使用是否方便

    5.使用者开发的工作量和难度。

    java序列化前3条变现太差,导致在远程服务调用中很少用它

    主流的编解码框架:
    JBoss的Marshalling包
    对jdk默认的序列化进行了优化,又保持跟java.io.Serializable接口的兼容,同时增加了一些可调的参数和附加特性,
    并且这些参数和特性可通过工厂类的配置
    1.可拔插的类解析器,提供更加便捷的类加载定制策略,通过一个接口即可实现定制。
    2.可拔插的对象替换技术,不需要通过继承的方式。
    3.可拔插的预定义类缓存表,可以减少序列化的字节数组长度,提升常用类型的对象序列化性能。
    4.无须实现java.io.Serializable接口
    5.通过缓存技术提升对象的序列化性能。
    6.使用非常简单
    ②google的Protobuf
    ③基于Protobuf的Kyro
    ④MessagePack框架

    Marshalling工具类:

    import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
    import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallerProvider;
    import io.netty.handler.codec.marshalling.MarshallingDecoder;
    import io.netty.handler.codec.marshalling.MarshallingEncoder;
    import io.netty.handler.codec.marshalling.UnmarshallerProvider;
    import org.jboss.marshalling.MarshallerFactory;
    import org.jboss.marshalling.Marshalling;
    import org.jboss.marshalling.MarshallingConfiguration;
    
    public final class MarshallingCodeCFactory {
        /**
         * 创建Jboss Marshalling解码器MarshallingDecoder
         * @return MarshallingDecoder
         */
        public static MarshallingDecoder buildMarshallingDecoder() {
        	//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		//创建了MarshallingConfiguration对象,配置了版本号为5 
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		//根据marshallerFactory和configuration创建provider
    		UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
    		MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
    		return decoder;
        }
    
        /**
         * 创建Jboss Marshalling编码器MarshallingEncoder
         * @return MarshallingEncoder
         */
        public static MarshallingEncoder buildMarshallingEncoder() {
    		final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
    		final MarshallingConfiguration configuration = new MarshallingConfiguration();
    		configuration.setVersion(5);
    		MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
    		//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
    		MarshallingEncoder encoder = new MarshallingEncoder(provider);
    		return encoder;
        }
    }

    RPC(Remote Procedure Call):
    RPC是指远程过程调用,也就是说两台服务器A,B,一个应用部署在A服务器上,想要调用B服务器上应用提供的函数/方法,
    由于不在一个内存空间,不能直接调用,需要通过网络来表达调用的语义和传达调用的数据。

    比如远程调用方法:Employee getEmployeeByName(String fullName)
    1、要解决通讯的问题,主要是通过在客户端和服务器之间建立TCP连接,远程过程调用的所有交换的数据都在这个连接里传输。
    连接可以是按需连接,调用结束后就断掉,也可以是长连接,多个远程过程调用共享同一个连接。

    2、要解决寻址的问题,也就是说,A服务器上的应用怎么告诉底层的RPC框架,
    如何连接到B服务器(如主机或IP地址)以及特定的端口,方法的名称名称是什么,这样才能完成调用。
    比如基于Web服务协议栈的RPC,就要提供一个endpoint URI,或者是从UDDI服务上查找。
    如果是RMI调用的话,还需要一个RMI Registry来注册服务的地址。

    3、要解决编码的问题,当A服务器上的应用发起远程过程调用时,方法的参数需要通过底层的网络协议如TCP传递到B服务器,
    由于网络协议是基于二进制的,内存中的参数的值要序列化成二进制的形式,也就是序列化(Serialize)或编组(marshal),
    通过寻址和传输将序列化的二进制发送给B服务器。

    4、要解决解码的问题,B服务器收到请求后,需要对参数进行反序列化(序列化的逆操作),恢复为内存中的表达方式,
    然后找到对应的方法(寻址的一部分)进行本地调用,然后得到返回值。

    5、返回值还要发送回服务器A上的应用,也要经过序列化的方式发送,服务器A接到后,再反序列化,
    恢复为内存中的表达方式,交给A服务器上的应用

    为什么RPC呢?
    就是无法在一个进程内,甚至一个计算机内通过本地调用的方式完成的需求,
    比如不同的系统间的通讯,甚至不同的组织间的通讯。由于计算能力需要横向扩展,需要在多台机器组成的集群上部署应用。


    而Netty框架不局限于RPC,更多的是作为一种网络协议的实现框架,
    由于RPC需要高效的网络通信,就可能选择以Netty作为基础

    展开全文
  • netty

    千次阅读 2020-02-17 13:34:45
    什么是NettyNetty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice ...

    什么是Netty?

    Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。
    Netty 是一个广泛使用的 Java 网络编程框架(Netty 在 2011 年获得了Duke's Choice Award,见https://www.java.net/dukeschoice/2011)。它活跃和成长于用户社区,像大型公司 Facebook 和 Instagram 以及流行 开源项目如 Infinispan, HornetQ, Vert.x, Apache Cassandra 和 Elasticsearch 等,都利用其强大的对于网络抽象的核心代码。

    以上是摘自《Essential Netty In Action》这本书,本文的内容也是本人读了这本书之后的一些整理心得,如有不当之处欢迎大虾们指正

    Netty和Tomcat有什么区别?

    Netty和Tomcat最大的区别就在于通信协议,Tomcat是基于Http协议的,他的实质是一个基于http协议的web容器,但是Netty不一样,他能通过编程自定义各种协议,因为netty能够通过codec自己来编码/解码字节流,完成类似redis访问的功能,这就是netty和tomcat最大的不同。

    有人说netty的性能就一定比tomcat性能高,其实不然,tomcat从6.x开始就支持了nio模式,并且后续还有APR模式——一种通过jni调用apache网络库的模式,相比于旧的bio模式,并发性能得到了很大提高,特别是APR模式,而netty是否比tomcat性能更高,则要取决于netty程序作者的技术实力了。

    为什么Netty受欢迎?

    如第一部分所述,netty是一款收到大公司青睐的框架,在我看来,netty能够受到青睐的原因有三:

    1. 并发高
    2. 传输快
    3. 封装好

    Netty为什么并发高

    Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,对比于BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高,两张图让你了解BIO和NIO的区别:

    阻塞IO的通信方式

     

    非阻塞IO的通信方式


    从这两图可以看出,NIO的单线程能处理连接的数量比BIO要高出很多,而为什么单线程能处理更多的连接呢?原因就是图二中出现的Selector
    当一个连接建立之后,他有两个步骤要做,第一步是接收完客户端发过来的全部数据,第二步是服务端处理完请求业务之后返回response给客户端。NIO和BIO的区别主要是在第一步。
    在BIO中,等待客户端发数据这个过程是阻塞的,这样就造成了一个线程只能处理一个请求的情况,而机器能支持的最大线程数是有限的,这就是为什么BIO不能支持高并发的原因。
    而NIO中,当一个Socket建立好之后,Thread并不会阻塞去接受这个Socket,而是将这个请求交给Selector,Selector会不断的去遍历所有的Socket,一旦有一个Socket建立完成,他会通知Thread,然后Thread处理完数据再返回给客户端——这个过程是不阻塞的,这样就能让一个Thread处理更多的请求了。
    下面两张图是基于BIO的处理流程和netty的处理流程,辅助你理解两种方式的差别:

    BIO的处理流程

     

    NIO的处理流程

     

    除了BIO和NIO之外,还有一些其他的IO模型,下面这张图就表示了五种IO模型的处理流程:

     

    五种常见的IO模型

    • BIO,同步阻塞IO,阻塞整个步骤,如果连接少,他的延迟是最低的,因为一个线程只处理一个连接,适用于少连接且延迟低的场景,比如说数据库连接。
    • NIO,同步非阻塞IO,阻塞业务处理但不阻塞数据接收,适用于高并发且处理简单的场景,比如聊天软件。
    • 多路复用IO,他的两个步骤处理是分开的,也就是说,一个连接可能他的数据接收是线程a完成的,数据处理是线程b完成的,他比BIO能处理更多请求。
    • 信号驱动IO,这种IO模型主要用在嵌入式开发,不参与讨论。
    • 异步IO,他的数据请求和数据处理都是异步的,数据请求一次返回一次,适用于长连接的业务场景。

    以上摘自Linux IO模式及 select、poll、epoll详解

    Netty为什么传输快

    Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
    Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。
    下两图就介绍了两种拷贝方式的区别,摘自Linux 中的零拷贝技术,第 1 部分

    传统数据拷贝

     

    零拷贝

    上文介绍的ByteBuf是Netty的一个重要概念,他是netty数据处理的容器,也是Netty封装好的一个重要体现,将在下一部分做详细介绍。

    为什么说Netty封装好?

    要说Netty为什么封装好,这种用文字是说不清的,直接上代码:

    • 阻塞I/O

     

    public class PlainOioServer {
    
        public void serve(int port) throws IOException {
            final ServerSocket socket = new ServerSocket(port);     //1
            try {
                for (;;) {
                    final Socket clientSocket = socket.accept();    //2
                    System.out.println("Accepted connection from " + clientSocket);
    
                    new Thread(new Runnable() {                        //3
                        @Override
                        public void run() {
                            OutputStream out;
                            try {
                                out = clientSocket.getOutputStream();
                                out.write("Hi!\r\n".getBytes(Charset.forName("UTF-8")));                            //4
                                out.flush();
                                clientSocket.close();                //5
    
                            } catch (IOException e) {
                                e.printStackTrace();
                                try {
                                    clientSocket.close();
                                } catch (IOException ex) {
                                    // ignore on close
                                }
                            }
                        }
                    }).start();                                        //6
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    
    • 非阻塞IO

     

    public class PlainNioServer {
        public void serve(int port) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket ss = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            ss.bind(address);                                            //1
            Selector selector = Selector.open();                        //2
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);    //3
            final ByteBuffer msg = ByteBuffer.wrap("Hi!\r\n".getBytes());
            for (;;) {
                try {
                    selector.select();                                    //4
                } catch (IOException ex) {
                    ex.printStackTrace();
                    // handle exception
                    break;
                }
                Set<SelectionKey> readyKeys = selector.selectedKeys();    //5
                Iterator<SelectionKey> iterator = readyKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        if (key.isAcceptable()) {                //6
                            ServerSocketChannel server =
                                    (ServerSocketChannel)key.channel();
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            client.register(selector, SelectionKey.OP_WRITE |
                                    SelectionKey.OP_READ, msg.duplicate());    //7
                            System.out.println(
                                    "Accepted connection from " + client);
                        }
                        if (key.isWritable()) {                //8
                            SocketChannel client =
                                    (SocketChannel)key.channel();
                            ByteBuffer buffer =
                                    (ByteBuffer)key.attachment();
                            while (buffer.hasRemaining()) {
                                if (client.write(buffer) == 0) {        //9
                                    break;
                                }
                            }
                            client.close();                    //10
                        }
                    } catch (IOException ex) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException cex) {
                            // 在关闭时忽略
                        }
                    }
                }
            }
        }
    }
    
    • Netty

     

    public class NettyOioServer {
    
        public void server(int port) throws Exception {
            final ByteBuf buf = Unpooled.unreleasableBuffer(
                    Unpooled.copiedBuffer("Hi!\r\n", Charset.forName("UTF-8")));
            EventLoopGroup group = new OioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();        //1
    
                b.group(group)                                    //2
                 .channel(OioServerSocketChannel.class)
                 .localAddress(new InetSocketAddress(port))
                 .childHandler(new ChannelInitializer<SocketChannel>() {//3
                     @Override
                     public void initChannel(SocketChannel ch) 
                         throws Exception {
                         ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {            //4
                             @Override
                             public void channelActive(ChannelHandlerContext ctx) throws Exception {
                                 ctx.writeAndFlush(buf.duplicate()).addListener(ChannelFutureListener.CLOSE);//5
                             }
                         });
                     }
                 });
                ChannelFuture f = b.bind().sync();  //6
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();        //7
            }
        }
    }
    

    从代码量上来看,Netty就已经秒杀传统Socket编程了,但是这一部分博大精深,仅仅贴几个代码岂能说明问题,在这里给大家介绍一下Netty的一些重要概念,让大家更理解Netty。

    • Channel
      数据传输流,与channel相关的概念有以下四个,上一张图让你了解netty里面的Channel。

       

      Channel一览

      • Channel,表示一个连接,可以理解为每一个请求,就是一个Channel。
      • ChannelHandler,核心处理业务就在这里,用于处理业务请求。
      • ChannelHandlerContext,用于传输业务数据。
      • ChannelPipeline,用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
    • ByteBuf
      ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:

    ByteBuf数据结构

     

    他有三种使用模式:

    1. Heap Buffer 堆缓冲区
      堆缓冲区是ByteBuf最常用的模式,他将数据存储在堆空间。
    2. Direct Buffer 直接缓冲区
      直接缓冲区是ByteBuf的另外一种常用模式,他的内存分配都不发生在堆,jdk1.4引入的nio的ByteBuffer类允许jvm通过本地方法调用分配内存,这样做有两个好处
      • 通过免去中间交换的内存拷贝, 提升IO处理速度; 直接缓冲区的内容可以驻留在垃圾回收扫描的堆区以外。
      • DirectBuffer 在 -XX:MaxDirectMemorySize=xxM大小限制下, 使用 Heap 之外的内存, GC对此”无能为力”,也就意味着规避了在高负载下频繁的GC过程对应用线程的中断影响.
    3. Composite Buffer 复合缓冲区
      复合缓冲区相当于多个不同ByteBuf的视图,这是netty提供的,jdk不提供这样的功能。

    除此之外,他还提供一大堆api方便你使用,在这里我就不一一列出了,具体参见ByteBuf字节缓存

    • Codec
      Netty中的编码/解码器,通过他你能完成字节与pojo、pojo与pojo的相互转换,从而达到自定义协议的目的。
      在Netty里面最有名的就是HttpRequestDecoder和HttpResponseEncoder了。



    原文链接:https://www.jianshu.com/p/b9f3f6a16911
     

    展开全文
  • Netty面试题(2020最新版)

    万次阅读 多人点赞 2020-02-19 12:45:30
    文章目录1.Netty 是什么?2.Netty 的特点是什么?3.Netty 的优势有哪些?4.Netty 的应用场景有哪些?5.Netty 高性能表现在哪些方面?6.BIO、NIO和AIO的区别?7.NIO的组成?8.Netty的线程模型?9.TCP 粘包/拆包的原因...

    大家好,我是CSDN的博主ThinkWon,“2020博客之星年度总评选"开始啦,希望大家帮我投票,每天都可以投多票哦,点击下方链接,然后点击"最大”,再点击"投TA一票"就可以啦!
    投票链接:https://bss.csdn.net/m/topic/blog_star2020/detail?username=thinkwon
    在技术的世界里,ThinkWon将一路与你相伴!创作出更多更高质量的文章!2020为努力奋斗的你点赞👍,️新的一年,祝各位大牛牛气冲天,牛年大吉!😊😊

    Java面试总结汇总,整理了包括Java基础知识,集合容器,并发编程,JVM,常用开源框架Spring,MyBatis,数据库,中间件等,包含了作为一个Java工程师在面试中需要用到或者可能用到的绝大部分知识。欢迎大家阅读,本人见识有限,写的博客难免有错误或者疏忽的地方,还望各位大佬指点,在此表示感激不尽。文章持续更新中…

    序号 内容 链接地址
    1 Java基础知识面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390612
    2 Java集合容器面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588551
    3 Java异常面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390689
    4 并发编程面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104863992
    5 JVM面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390752
    6 Spring面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397516
    7 Spring MVC面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397427
    8 Spring Boot面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397299
    9 Spring Cloud面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397367
    10 MyBatis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/101292950
    11 Redis面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/103522351
    12 MySQL数据库面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104778621
    13 消息中间件MQ与RabbitMQ面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588612
    14 Dubbo面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104390006
    15 Linux面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104588679
    16 Tomcat面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397665
    17 ZooKeeper面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104397719
    18 Netty面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/104391081
    19 架构设计&分布式&数据结构与算法面试题(2020最新版) https://thinkwon.blog.csdn.net/article/details/105870730

    1.Netty 是什么?

    Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。Netty是基于nio的,它封装了jdk的nio,让我们使用起来更加方法灵活。

    2.Netty 的特点是什么?

    • 高并发:Netty 是一款基于 NIO(Nonblocking IO,非阻塞IO)开发的网络通信框架,对比于 BIO(Blocking I/O,阻塞IO),他的并发性能得到了很大提高。
    • 传输快:Netty 的传输依赖于零拷贝特性,尽量减少不必要的内存拷贝,实现了更高效率的传输。
    • 封装好:Netty 封装了 NIO 操作的很多细节,提供了易于使用调用接口。

    3.Netty 的优势有哪些?

    • 使用简单:封装了 NIO 的很多细节,使用更简单。
    • 功能强大:预置了多种编解码功能,支持多种主流协议。
    • 定制能力强:可以通过 ChannelHandler 对通信框架进行灵活地扩展。
    • 性能高:通过与其他业界主流的 NIO 框架对比,Netty 的综合性能最优。
    • 稳定:Netty 修复了已经发现的所有 NIO 的 bug,让开发人员可以专注于业务本身。
    • 社区活跃:Netty 是活跃的开源项目,版本迭代周期短,bug 修复速度快。

    4.Netty 的应用场景有哪些?

    典型的应用有:阿里分布式服务框架 Dubbo,默认使用 Netty 作为基础通信组件,还有 RocketMQ 也是使用 Netty 作为通讯的基础。

    5.Netty 高性能表现在哪些方面?

    • IO 线程模型:同步非阻塞,用最少的资源做更多的事。
    • 内存零拷贝:尽量减少不必要的内存拷贝,实现了更高效率的传输。
    • 内存池设计:申请的内存可以重用,主要指直接内存。内部实现是用一颗二叉查找树管理内存分配情况。
    • 串形化处理读写:避免使用锁带来的性能开销。
    • 高性能序列化协议:支持 protobuf 等高性能序列化协议。

    6.BIO、NIO和AIO的区别?

    BIO:一个连接一个线程,客户端有连接请求时服务器端就需要启动一个线程进行处理。线程开销大。
    伪异步IO:将请求连接放入线程池,一对多,但线程还是很宝贵的资源。

    NIO:一个请求一个线程,但客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有I/O请求时才启动一个线程进行处理。

    AIO:一个有效请求一个线程,客户端的I/O请求都是由OS先完成了再通知服务器应用去启动线程进行处理,

    BIO是面向流的,NIO是面向缓冲区的;BIO的各种流是阻塞的。而NIO是非阻塞的;BIO的Stream是单向的,而NIO的channel是双向的。

    NIO的特点:事件驱动模型、单线程处理多任务、非阻塞I/O,I/O读写不再阻塞,而是返回0、基于block的传输比基于流的传输更高效、更高级的IO函数zero-copy、IO多路复用大大提高了Java网络应用的可伸缩性和实用性。基于Reactor线程模型。

    在Reactor模式中,事件分发器等待某个事件或者可应用或个操作的状态发生,事件分发器就把这个事件传给事先注册的事件处理函数或者回调函数,由后者来做实际的读写操作。如在Reactor中实现读:注册读就绪事件和相应的事件处理器、事件分发器等待事件、事件到来,激活分发器,分发器调用事件对应的处理器、事件处理器完成实际的读操作,处理读到的数据,注册新的事件,然后返还控制权。

    7.NIO的组成?

    Buffer:与Channel进行交互,数据是从Channel读入缓冲区,从缓冲区写入Channel中的

    flip方法 : 反转此缓冲区,将position给limit,然后将position置为0,其实就是切换读写模式

    clear方法 :清除此缓冲区,将position置为0,把capacity的值给limit。

    rewind方法 : 重绕此缓冲区,将position置为0

    DirectByteBuffer可减少一次系统空间到用户空间的拷贝。但Buffer创建和销毁的成本更高,不可控,通常会用内存池来提高性能。直接缓冲区主要分配给那些易受基础系统的本机I/O 操作影响的大型、持久的缓冲区。如果数据量比较小的中小应用情况下,可以考虑使用heapBuffer,由JVM进行管理。

    Channel:表示 IO 源与目标打开的连接,是双向的,但不能直接访问数据,只能与Buffer 进行交互。通过源码可知,FileChannel的read方法和write方法都导致数据复制了两次!

    Selector可使一个单独的线程管理多个Channel,open方法可创建Selector,register方法向多路复用器器注册通道,可以监听的事件类型:读、写、连接、accept。注册事件后会产生一个SelectionKey:它表示SelectableChannel 和Selector 之间的注册关系,wakeup方法:使尚未返回的第一个选择操作立即返回,唤醒的

    原因是:注册了新的channel或者事件;channel关闭,取消注册;优先级更高的事件触发(如定时器事件),希望及时处理。

    Selector在Linux的实现类是EPollSelectorImpl,委托给EPollArrayWrapper实现,其中三个native方法是对epoll的封装,而EPollSelectorImpl. implRegister方法,通过调用epoll_ctl向epoll实例中注册事件,还将注册的文件描述符(fd)与SelectionKey的对应关系添加到fdToKey中,这个map维护了文件描述符与SelectionKey的映射。

    fdToKey有时会变得非常大,因为注册到Selector上的Channel非常多(百万连接);过期或失效的Channel没有及时关闭。fdToKey总是串行读取的,而读取是在select方法中进行的,该方法是非线程安全的。

    Pipe:两个线程之间的单向数据连接,数据会被写到sink通道,从source通道读取

    NIO的服务端建立过程:Selector.open():打开一个Selector;ServerSocketChannel.open():创建服务端的Channel;bind():绑定到某个端口上。并配置非阻塞模式;register():注册Channel和关注的事件到Selector上;select()轮询拿到已经就绪的事件

    8.Netty的线程模型?

    Netty通过Reactor模型基于多路复用器接收并处理用户请求,内部实现了两个线程池,boss线程池和work线程池,其中boss线程池的线程负责处理请求的accept事件,当接收到accept事件的请求时,把对应的socket封装到一个NioSocketChannel中,并交给work线程池,其中work线程池负责请求的read和write事件,由对应的Handler处理。

    单线程模型:所有I/O操作都由一个线程完成,即多路复用、事件分发和处理都是在一个Reactor线程上完成的。既要接收客户端的连接请求,向服务端发起连接,又要发送/读取请求或应答/响应消息。一个NIO 线程同时处理成百上千的链路,性能上无法支撑,速度慢,若线程进入死循环,整个程序不可用,对于高负载、大并发的应用场景不合适。

    多线程模型:有一个NIO 线程(Acceptor) 只负责监听服务端,接收客户端的TCP 连接请求;NIO 线程池负责网络IO 的操作,即消息的读取、解码、编码和发送;1 个NIO 线程可以同时处理N 条链路,但是1 个链路只对应1 个NIO 线程,这是为了防止发生并发操作问题。但在并发百万客户端连接或需要安全认证时,一个Acceptor 线程可能会存在性能不足问题。

    主从多线程模型:Acceptor 线程用于绑定监听端口,接收客户端连接,将SocketChannel 从主线程池的Reactor 线程的多路复用器上移除,重新注册到Sub 线程池的线程上,用于处理I/O 的读写等操作,从而保证mainReactor只负责接入认证、握手等操作;

    9.TCP 粘包/拆包的原因及解决方法?

    TCP是以流的方式来处理数据,一个完整的包可能会被TCP拆分成多个包进行发送,也可能把小的封装成一个大的数据包发送。

    TCP粘包/分包的原因:

    应用程序写入的字节大小大于套接字发送缓冲区的大小,会发生拆包现象,而应用程序写入数据小于套接字缓冲区大小,网卡将应用多次写入的数据发送到网络上,这将会发生粘包现象;

    进行MSS大小的TCP分段,当TCP报文长度-TCP头部长度>MSS的时候将发生拆包
    以太网帧的payload(净荷)大于MTU(1500字节)进行ip分片。

    解决方法

    消息定长:FixedLengthFrameDecoder类

    包尾增加特殊字符分割:

    • 行分隔符类:LineBasedFrameDecoder
    • 或自定义分隔符类 :DelimiterBasedFrameDecoder

    将消息分为消息头和消息体:LengthFieldBasedFrameDecoder类。分为有头部的拆包与粘包、长度字段在前且有头部的拆包与粘包、多扩展头部的拆包与粘包。

    10.什么是 Netty 的零拷贝?

    Netty 的零拷贝主要包含三个方面:

    • Netty 的接收和发送 ByteBuffer 采用 DIRECT BUFFERS,使用堆外直接内存进行 Socket 读写,不需要进行字节缓冲区的二次拷贝。如果使用传统的堆内存(HEAP BUFFERS)进行 Socket 读写,JVM 会将堆内存 Buffer 拷贝一份到直接内存中,然后才写入 Socket 中。相比于堆外直接内存,消息在发送过程中多了一次缓冲区的内存拷贝。
    • Netty 提供了组合 Buffer 对象,可以聚合多个 ByteBuffer 对象,用户可以像操作一个 Buffer 那样方便的对组合 Buffer 进行操作,避免了传统通过内存拷贝的方式将几个小 Buffer 合并成一个大的 Buffer。
    • Netty 的文件传输采用了 transferTo 方法,它可以直接将文件缓冲区的数据发送到目标 Channel,避免了传统通过循环 write 方式导致的内存拷贝问题。

    11.Netty 中有哪种重要组件?

    • Channel:Netty 网络操作抽象类,它除了包括基本的 I/O 操作,如 bind、connect、read、write 等。
    • EventLoop:主要是配合 Channel 处理 I/O 操作,用来处理连接的生命周期中所发生的事情。
    • ChannelFuture:Netty 框架中所有的 I/O 操作都为异步的,因此我们需要 ChannelFuture 的 addListener()注册一个 ChannelFutureListener 监听事件,当操作执行成功或者失败时,监听就会自动触发返回结果。
    • ChannelHandler:充当了所有处理入站和出站数据的逻辑容器。ChannelHandler 主要用来处理各种事件,这里的事件很广泛,比如可以是连接、数据接收、异常、数据转换等。
    • ChannelPipeline:为 ChannelHandler 链提供了容器,当 channel 创建时,就会被自动分配到它专属的 ChannelPipeline,这个关联是永久性的。

    12.Netty 发送消息有几种方式?

    Netty 有两种发送消息的方式:

    • 直接写入 Channel 中,消息从 ChannelPipeline 当中尾部开始移动;
    • 写入和 ChannelHandler 绑定的 ChannelHandlerContext 中,消息从 ChannelPipeline 中的下一个 ChannelHandler 中移动。

    13.默认情况 Netty 起多少线程?何时启动?

    Netty 默认是 CPU 处理器数的两倍,bind 完之后启动。

    14.了解哪几种序列化协议?

    序列化(编码)是将对象序列化为二进制形式(字节数组),主要用于网络传输、数据持久化等;而反序列化(解码)则是将从网络、磁盘等读取的字节数组还原成原始对象,主要用于网络传输对象的解码,以便完成远程调用。

    影响序列化性能的关键因素:序列化后的码流大小(网络带宽的占用)、序列化的性能(CPU资源占用);是否支持跨语言(异构系统的对接和开发语言切换)。

    Java默认提供的序列化:无法跨语言、序列化后的码流太大、序列化的性能差

    XML,优点:人机可读性好,可指定元素或特性的名称。缺点:序列化数据只包含数据本身以及类的结构,不包括类型标识和程序集信息;只能序列化公共属性和字段;不能序列化方法;文件庞大,文件格式复杂,传输占带宽。适用场景:当做配置文件存储数据,实时数据转换。

    JSON,是一种轻量级的数据交换格式,优点:兼容性高、数据格式比较简单,易于读写、序列化后数据较小,可扩展性好,兼容性好、与XML相比,其协议比较简单,解析速度比较快。缺点:数据的描述性比XML差、不适合性能要求为ms级别的情况、额外空间开销比较大。适用场景(可替代XML):跨防火墙访问、可调式性要求高、基于Web browser的Ajax请求、传输数据量相对小,实时性要求相对低(例如秒级别)的服务。

    Fastjson,采用一种“假定有序快速匹配”的算法。优点:接口简单易用、目前java语言中最快的json库。缺点:过于注重快,而偏离了“标准”及功能性、代码质量不高,文档不全。适用场景:协议交互、Web输出、Android客户端

    Thrift,不仅是序列化协议,还是一个RPC框架。优点:序列化后的体积小, 速度快、支持多种语言和丰富的数据类型、对于数据字段的增删具有较强的兼容性、支持二进制压缩编码。缺点:使用者较少、跨防火墙访问时,不安全、不具有可读性,调试代码时相对困难、不能与其他传输层协议共同使用(例如HTTP)、无法支持向持久层直接读写数据,即不适合做数据持久化序列化协议。适用场景:分布式系统的RPC解决方案

    Avro,Hadoop的一个子项目,解决了JSON的冗长和没有IDL的问题。优点:支持丰富的数据类型、简单的动态语言结合功能、具有自我描述属性、提高了数据解析速度、快速可压缩的二进制数据形式、可以实现远程过程调用RPC、支持跨编程语言实现。缺点:对于习惯于静态类型语言的用户不直观。适用场景:在Hadoop中做Hive、Pig和MapReduce的持久化数据格式。

    Protobuf,将数据结构以.proto文件进行描述,通过代码生成工具可以生成对应数据结构的POJO对象和Protobuf相关的方法和属性。优点:序列化后码流小,性能高、结构化数据存储格式(XML JSON等)、通过标识字段的顺序,可以实现协议的前向兼容、结构化的文档更容易管理和维护。缺点:需要依赖于工具生成代码、支持的语言相对较少,官方只支持Java 、C++ 、python。适用场景:对性能要求高的RPC调用、具有良好的跨防火墙的访问属性、适合应用层对象的持久化

    其它

    protostuff 基于protobuf协议,但不需要配置proto文件,直接导包即可
    Jboss marshaling 可以直接序列化java类, 无须实java.io.Serializable接口
    Message pack 一个高效的二进制序列化格式
    Hessian 采用二进制协议的轻量级remoting onhttp工具
    kryo 基于protobuf协议,只支持java语言,需要注册(Registration),然后序列化(Output),反序列化(Input)

    15.如何选择序列化协议?

    具体场景

    对于公司间的系统调用,如果性能要求在100ms以上的服务,基于XML的SOAP协议是一个值得考虑的方案。
    基于Web browser的Ajax,以及Mobile app与服务端之间的通讯,JSON协议是首选。对于性能要求不太高,或者以动态类型语言为主,或者传输数据载荷很小的的运用场景,JSON也是非常不错的选择。
    对于调试环境比较恶劣的场景,采用JSON或XML能够极大的提高调试效率,降低系统开发成本。
    当对性能和简洁性有极高要求的场景,Protobuf,Thrift,Avro之间具有一定的竞争关系。
    对于T级别的数据的持久化应用场景,Protobuf和Avro是首要选择。如果持久化后的数据存储在hadoop子项目里,Avro会是更好的选择。

    对于持久层非Hadoop项目,以静态类型语言为主的应用场景,Protobuf会更符合静态类型语言工程师的开发习惯。由于Avro的设计理念偏向于动态类型语言,对于动态语言为主的应用场景,Avro是更好的选择。
    如果需要提供一个完整的RPC解决方案,Thrift是一个好的选择。
    如果序列化之后需要支持不同的传输层协议,或者需要跨防火墙访问的高性能场景,Protobuf可以优先考虑。
    protobuf的数据类型有多种:bool、double、float、int32、int64、string、bytes、enum、message。protobuf的限定符:required: 必须赋值,不能为空、optional:字段可以赋值,也可以不赋值、repeated: 该字段可以重复任意次数(包括0次)、枚举;只能用指定的常量集中的一个值作为其值;

    protobuf的基本规则:每个消息中必须至少留有一个required类型的字段、包含0个或多个optional类型的字段;repeated表示的字段可以包含0个或多个数据;[1,15]之内的标识号在编码的时候会占用一个字节(常用),[16,2047]之内的标识号则占用2个字节,标识号一定不能重复、使用消息类型,也可以将消息嵌套任意多层,可用嵌套消息类型来代替组。

    protobuf的消息升级原则:不要更改任何已有的字段的数值标识;不能移除已经存在的required字段,optional和repeated类型的字段可以被移除,但要保留标号不能被重用。新添加的字段必须是optional或repeated。因为旧版本程序无法读取或写入新增的required限定符的字段。

    编译器为每一个消息类型生成了一个.java文件,以及一个特殊的Builder类(该类是用来创建消息类接口的)。如:UserProto.User.Builder builder = UserProto.User.newBuilder();builder.build();

    Netty中的使用:ProtobufVarint32FrameDecoder 是用于处理半包消息的解码类;ProtobufDecoder(UserProto.User.getDefaultInstance())这是创建的UserProto.java文件中的解码类;ProtobufVarint32LengthFieldPrepender 对protobuf协议的消息头上加上一个长度为32的整形字段,用于标志这个消息的长度的类;ProtobufEncoder 是编码类

    将StringBuilder转换为ByteBuf类型:copiedBuffer()方法

    16.Netty 支持哪些心跳类型设置?

    • readerIdleTime:为读超时时间(即测试端一定时间内未接受到被测试端消息)。
    • writerIdleTime:为写超时时间(即测试端一定时间内向被测试端发送消息)。
    • allIdleTime:所有类型的超时时间。

    17.Netty 和 Tomcat 的区别?

    • 作用不同:Tomcat 是 Servlet 容器,可以视为 Web 服务器,而 Netty 是异步事件驱动的网络应用程序框架和工具用于简化网络编程,例如TCP和UDP套接字服务器。
    • 协议不同:Tomcat 是基于 http 协议的 Web 服务器,而 Netty 能通过编程自定义各种协议,因为 Netty 本身自己能编码/解码字节流,所有 Netty 可以实现,HTTP 服务器、FTP 服务器、UDP 服务器、RPC 服务器、WebSocket 服务器、Redis 的 Proxy 服务器、MySQL 的 Proxy 服务器等等。

    18.NIOEventLoopGroup源码?

    NioEventLoopGroup(其实是MultithreadEventExecutorGroup) 内部维护一个类型为 EventExecutor children [], 默认大小是处理器核数 * 2, 这样就构成了一个线程池,初始化EventExecutor时NioEventLoopGroup重载newChild方法,所以children元素的实际类型为NioEventLoop。

    线程启动时调用SingleThreadEventExecutor的构造方法,执行NioEventLoop类的run方法,首先会调用hasTasks()方法判断当前taskQueue是否有元素。如果taskQueue中有元素,执行 selectNow() 方法,最终执行selector.selectNow(),该方法会立即返回。如果taskQueue没有元素,执行 select(oldWakenUp) 方法

    select ( oldWakenUp) 方法解决了 Nio 中的 bug,selectCnt 用来记录selector.select方法的执行次数和标识是否执行过selector.selectNow(),若触发了epoll的空轮询bug,则会反复执行selector.select(timeoutMillis),变量selectCnt 会逐渐变大,当selectCnt 达到阈值(默认512),则执行rebuildSelector方法,进行selector重建,解决cpu占用100%的bug。

    rebuildSelector方法先通过openSelector方法创建一个新的selector。然后将old selector的selectionKey执行cancel。最后将old selector的channel重新注册到新的selector中。rebuild后,需要重新执行方法selectNow,检查是否有已ready的selectionKey。

    接下来调用processSelectedKeys 方法(处理I/O任务),当selectedKeys != null时,调用processSelectedKeysOptimized方法,迭代 selectedKeys 获取就绪的 IO 事件的selectkey存放在数组selectedKeys中, 然后为每个事件都调用 processSelectedKey 来处理它,processSelectedKey 中分别处理OP_READ;OP_WRITE;OP_CONNECT事件。

    最后调用runAllTasks方法(非IO任务),该方法首先会调用fetchFromScheduledTaskQueue方法,把scheduledTaskQueue中已经超过延迟执行时间的任务移到taskQueue中等待被执行,然后依次从taskQueue中取任务执行,每执行64个任务,进行耗时检查,如果已执行时间超过预先设定的执行时间,则停止执行非IO任务,避免非IO任务太多,影响IO任务的执行。

    每个NioEventLoop对应一个线程和一个Selector,NioServerSocketChannel会主动注册到某一个NioEventLoop的Selector上,NioEventLoop负责事件轮询。

    Outbound 事件都是请求事件, 发起者是 Channel,处理者是 unsafe,通过 Outbound 事件进行通知,传播方向是 tail到head。Inbound 事件发起者是 unsafe,事件的处理者是 Channel, 是通知事件,传播方向是从头到尾。

    内存管理机制,首先会预申请一大块内存Arena,Arena由许多Chunk组成,而每个Chunk默认由2048个page组成。Chunk通过AVL树的形式组织Page,每个叶子节点表示一个Page,而中间节点表示内存区域,节点自己记录它在整个Arena中的偏移地址。当区域被分配出去后,中间节点上的标记位会被标记,这样就表示这个中间节点以下的所有节点都已被分配了。大于8k的内存分配在poolChunkList中,而PoolSubpage用于分配小于8k的内存,它会把一个page分割成多段,进行内存分配。

    ByteBuf的特点:支持自动扩容(4M),保证put方法不会抛出异常、通过内置的复合缓冲类型,实现零拷贝(zero-copy);不需要调用flip()来切换读/写模式,读取和写入索引分开;方法链;引用计数基于AtomicIntegerFieldUpdater用于内存回收;PooledByteBuf采用二叉树来实现一个内存池,集中管理内存的分配和释放,不用每次使用都新建一个缓冲区对象。UnpooledHeapByteBuf每次都会新建一个缓冲区对象。

    Netty简介

    img

    Netty是 一个异步事件驱动的网络应用程序框架,用于快速开发可维护的高性能协议服务器和客户端。

    JDK原生NIO程序的问题

    JDK原生也有一套网络应用程序API,但是存在一系列问题,主要如下:

    • NIO的类库和API繁杂,使用麻烦,你需要熟练掌握Selector、ServerSocketChannel、SocketChannel、ByteBuffer等
    • 需要具备其它的额外技能做铺垫,例如熟悉Java多线程编程,因为NIO编程涉及到Reactor模式,你必须对多线程和网路编程非常熟悉,才能编写出高质量的NIO程序
    • 可靠性能力补齐,开发工作量和难度都非常大。例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码流的处理等等,NIO编程的特点是功能开发相对容易,但是可靠性能力补齐工作量和难度都非常大
    • JDK NIO的BUG,例如臭名昭著的epoll bug,它会导致Selector空轮询,最终导致CPU 100%。官方声称在JDK1.6版本的update18修复了该问题,但是直到JDK1.7版本该问题仍旧存在,只不过该bug发生概率降低了一些而已,它并没有被根本解决

    Netty的特点

    Netty的对JDK自带的NIO的API进行封装,解决上述问题,主要特点有:

    • 设计优雅 适用于各种传输类型的统一API - 阻塞和非阻塞Socket 基于灵活且可扩展的事件模型,可以清晰地分离关注点 高度可定制的线程模型 - 单线程,一个或多个线程池 真正的无连接数据报套接字支持(自3.1起)
    • 使用方便 详细记录的Javadoc,用户指南和示例 没有其他依赖项,JDK 5(Netty 3.x)或6(Netty 4.x)就足够了
    • 高性能 吞吐量更高,延迟更低 减少资源消耗 最小化不必要的内存复制
    • 安全 完整的SSL / TLS和StartTLS支持
    • 社区活跃,不断更新 社区活跃,版本迭代周期短,发现的BUG可以被及时修复,同时,更多的新功能会被加入

    Netty常见使用场景

    Netty常见的使用场景如下:

    • 互联网行业 在分布式系统中,各个节点之间需要远程服务调用,高性能的RPC框架必不可少,Netty作为异步高新能的通信框架,往往作为基础通信组件被这些RPC框架使用。 典型的应用有:阿里分布式服务框架Dubbo的RPC框架使用Dubbo协议进行节点间通信,Dubbo协议默认使用Netty作为基础通信组件,用于实现各进程节点之间的内部通信。
    • 游戏行业 无论是手游服务端还是大型的网络游戏,Java语言得到了越来越广泛的应用。Netty作为高性能的基础通信组件,它本身提供了TCP/UDP和HTTP协议栈。 非常方便定制和开发私有协议栈,账号登录服务器,地图服务器之间可以方便的通过Netty进行高性能的通信
    • 大数据领域 经典的Hadoop的高性能通信和序列化组件Avro的RPC框架,默认采用Netty进行跨界点通信,它的Netty Service基于Netty框架二次封装实现

    有兴趣的读者可以了解一下目前有哪些开源项目使用了 Netty:Related projects

    Netty高性能设计

    Netty作为异步事件驱动的网络,高性能之处主要来自于其I/O模型和线程处理模型,前者决定如何收发数据,后者决定如何处理数据

    I/O模型

    用什么样的通道将数据发送给对方,BIO、NIO或者AIO,I/O模型在很大程度上决定了框架的性能

    阻塞I/O

    传统阻塞型I/O(BIO)可以用下图表示:

    Blocking I/O

    特点

    • 每个请求都需要独立的线程完成数据read,业务处理,数据write的完整操作

    问题

    • 当并发数较大时,需要创建大量线程来处理连接,系统资源占用较大
    • 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在read操作上,造成线程资源浪费

    I/O复用模型

    img

    在I/O复用模型中,会用到select,这个函数也会使进程阻塞,但是和阻塞I/O所不同的的,这两个函数可以同时阻塞多个I/O操作,而且可以同时对多个读操作,多个写操作的I/O函数进行检测,直到有数据可读或可写时,才真正调用I/O操作函数

    Netty的非阻塞I/O的实现关键是基于I/O复用模型,这里用Selector对象表示:

    Nonblocking I/O

    Netty的IO线程NioEventLoop由于聚合了多路复用器Selector,可以同时并发处理成百上千个客户端连接。当线程从某客户端Socket通道进行读写数据时,若没有数据可用时,该线程可以进行其他任务。线程通常将非阻塞 IO 的空闲时间用于在其他通道上执行 IO 操作,所以单独的线程可以管理多个输入和输出通道。

    由于读写操作都是非阻塞的,这就可以充分提升IO线程的运行效率,避免由于频繁I/O阻塞导致的线程挂起,一个I/O线程可以并发处理N个客户端连接和读写操作,这从根本上解决了传统同步阻塞I/O一连接一线程模型,架构的性能、弹性伸缩能力和可靠性都得到了极大的提升。

    基于buffer

    传统的I/O是面向字节流或字符流的,以流式的方式顺序地从一个Stream 中读取一个或多个字节, 因此也就不能随意改变读取指针的位置。

    在NIO中, 抛弃了传统的 I/O流, 而是引入了Channel和Buffer的概念. 在NIO中, 只能从Channel中读取数据到Buffer中或将数据 Buffer 中写入到 Channel。

    基于buffer操作不像传统IO的顺序操作, NIO 中可以随意地读取任意位置的数据

    线程模型

    数据报如何读取?读取之后的编解码在哪个线程进行,编解码后的消息如何派发,线程模型的不同,对性能的影响也非常大。

    事件驱动模型

    通常,我们设计一个事件处理模型的程序有两种思路

    • 轮询方式 线程不断轮询访问相关事件发生源有没有发生事件,有发生事件就调用事件处理逻辑。
    • 事件驱动方式 发生事件,主线程把事件放入事件队列,在另外线程不断循环消费事件列表中的事件,调用事件对应的处理逻辑处理事件。事件驱动方式也被称为消息通知方式,其实是设计模式中观察者模式的思路。

    以GUI的逻辑处理为例,说明两种逻辑的不同:

    • 轮询方式 线程不断轮询是否发生按钮点击事件,如果发生,调用处理逻辑
    • 事件驱动方式 发生点击事件把事件放入事件队列,在另外线程消费的事件列表中的事件,根据事件类型调用相关事件处理逻辑

    这里借用O’Reilly 大神关于事件驱动模型解释图

    事件驱动模型

    主要包括4个基本组件:

    • 事件队列(event queue):接收事件的入口,存储待处理事件
    • 分发器(event mediator):将不同的事件分发到不同的业务逻辑单元
    • 事件通道(event channel):分发器与处理器之间的联系渠道
    • 事件处理器(event processor):实现业务逻辑,处理完成后会发出事件,触发下一步操作

    可以看出,相对传统轮询模式,事件驱动有如下优点:

    • 可扩展性好,分布式的异步架构,事件处理器之间高度解耦,可以方便扩展事件处理逻辑
    • 高性能,基于队列暂存事件,能方便并行异步处理事件

    Reactor线程模型

    Reactor是反应堆的意思,Reactor模型,是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。 服务端程序处理传入多路请求,并将它们同步分派给请求对应的处理线程,Reactor模式也叫Dispatcher模式,即I/O多了复用统一监听事件,收到事件后分发(Dispatch给某进程),是编写高性能网络服务器的必备技术之一。

    Reactor模型中有2个关键组成:

    • Reactor Reactor在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对IO事件做出反应。 它就像公司的电话接线员,它接听来自客户的电话并将线路转移到适当的联系人
    • Handlers 处理程序执行I/O事件要完成的实际事件,类似于客户想要与之交谈的公司中的实际官员。Reactor通过调度适当的处理程序来响应I/O事件,处理程序执行非阻塞操作

    Reactor模型

    取决于Reactor的数量和Hanndler线程数量的不同,Reactor模型有3个变种

    • 单Reactor单线程
    • 单Reactor多线程
    • 主从Reactor多线程

    可以这样理解,Reactor就是一个执行while (true) { selector.select(); …}循环的线程,会源源不断的产生新的事件,称作反应堆很贴切。

    篇幅关系,这里不再具体展开Reactor特性、优缺点比较,有兴趣的读者可以参考我之前另外一篇文章:《理解高性能网络模型》

    Netty线程模型

    Netty主要基于主从Reactors多线程模型(如下图)做了一定的修改,其中主从Reactor多线程模型有多个Reactor:MainReactor和SubReactor:

    • MainReactor负责客户端的连接请求,并将请求转交给SubReactor
    • SubReactor负责相应通道的IO读写请求
    • 非IO请求(具体逻辑处理)的任务则会直接写入队列,等待worker threads进行处理

    这里引用Doug Lee大神的Reactor介绍:Scalable IO in Java里面关于主从Reactor多线程模型的图

    主从Rreactor多线程模型

    特别说明的是: 虽然Netty的线程模型基于主从Reactor多线程,借用了MainReactor和SubReactor的结构,但是实际实现上,SubReactor和Worker线程在同一个线程池中:

    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    ServerBootstrap server = new ServerBootstrap();
    server.group(bossGroup, workerGroup)
     .channel(NioServerSocketChannel.class)
    
    

    上面代码中的bossGroup 和workerGroup是Bootstrap构造方法中传入的两个对象,这两个group均是线程池

    • bossGroup线程池则只是在bind某个端口后,获得其中一个线程作为MainReactor,专门处理端口的accept事件,每个端口对应一个boss线程
    • workerGroup线程池会被各个SubReactor和worker线程充分利用

    异步处理

    异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。

    Netty中的I/O操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFuture,调用者并不能立刻获得结果,通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果。

    当future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操,常见有如下操作:

    • 通过isDone方法来判断当前操作是否完成
    • 通过isSuccess方法来判断已完成的当前操作是否成功
    • 通过getCause方法来获取已完成的当前操作失败的原因
    • 通过isCancelled方法来判断已完成的当前操作是否被取消
    • 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果future对象已完成,则理解通知指定的监听器

    例如下面的的代码中绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑

        serverBootstrap.bind(port).addListener(future -> {
            if (future.isSuccess()) {
                System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
            } else {
                System.err.println("端口[" + port + "]绑定失败!");
            }
        });
    
    

    相比传统阻塞I/O,执行I/O操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在I/O操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。

    Netty架构设计

    前面介绍完Netty相关一些理论介绍,下面从功能特性、模块组件、运作过程来介绍Netty的架构设计

    功能特性

    Netty功能特性图

    • 传输服务 支持BIO和NIO
    • 容器集成 支持OSGI、JBossMC、Spring、Guice容器
    • 协议支持 HTTP、Protobuf、二进制、文本、WebSocket等一系列常见协议都支持。 还支持通过实行编码解码逻辑来实现自定义协议
    • Core核心 可扩展事件模型、通用通信API、支持零拷贝的ByteBuf缓冲对象

    模块组件

    Bootstrap、ServerBootstrap

    Bootstrap意思是引导,一个Netty应用通常由一个Bootstrap开始,主要作用是配置整个Netty程序,串联各个组件,Netty中Bootstrap类是客户端程序的启动引导类,ServerBootstrap是服务端启动引导类。

    Future、ChannelFuture

    正如前面介绍,在Netty中所有的IO操作都是异步的,不能立刻得知消息是否被正确处理,但是可以过一会等它执行完成或者直接注册一个监听,具体的实现就是通过Future和ChannelFutures,他们可以注册一个监听,当操作执行成功或失败时监听会自动触发注册的监听事件。

    Channel

    Netty网络通信的组件,能够用于执行网络I/O操作。 Channel为用户提供:

    • 当前网络连接的通道的状态(例如是否打开?是否已连接?)
    • 网络连接的配置参数 (例如接收缓冲区大小)
    • 提供异步的网络I/O操作(如建立连接,读写,绑定端口),异步调用意味着任何I / O调用都将立即返回,并且不保证在调用结束时所请求的I / O操作已完成。调用立即返回一个ChannelFuture实例,通过注册监听器到ChannelFuture上,可以I / O操作成功、失败或取消时回调通知调用方。
    • 支持关联I/O操作与对应的处理程序

    不同协议、不同的阻塞类型的连接都有不同的 Channel 类型与之对应,下面是一些常用的 Channel 类型

    • NioSocketChannel,异步的客户端 TCP Socket 连接
    • NioServerSocketChannel,异步的服务器端 TCP Socket 连接
    • NioDatagramChannel,异步的 UDP 连接
    • NioSctpChannel,异步的客户端 Sctp 连接
    • NioSctpServerChannel,异步的 Sctp 服务器端连接 这些通道涵盖了 UDP 和 TCP网络 IO以及文件 IO.

    Selector

    Netty基于Selector对象实现I/O多路复用,通过 Selector, 一个线程可以监听多个连接的Channel事件, 当向一个Selector中注册Channel 后,Selector 内部的机制就可以自动不断地查询(select) 这些注册的Channel是否有已就绪的I/O事件(例如可读, 可写, 网络连接完成等),这样程序就可以很简单地使用一个线程高效地管理多个 Channel 。

    NioEventLoop

    NioEventLoop中维护了一个线程和任务队列,支持异步提交执行任务,线程启动时会调用NioEventLoop的run方法,执行I/O任务和非I/O任务:

    • I/O任务 即selectionKey中ready的事件,如accept、connect、read、write等,由processSelectedKeys方法触发。
    • 非IO任务 添加到taskQueue中的任务,如register0、bind0等任务,由runAllTasks方法触发。

    两种任务的执行时间比由变量ioRatio控制,默认为50,则表示允许非IO任务执行的时间与IO任务的执行时间相等。

    NioEventLoopGroup

    NioEventLoopGroup,主要管理eventLoop的生命周期,可以理解为一个线程池,内部维护了一组线程,每个线程(NioEventLoop)负责处理多个Channel上的事件,而一个Channel只对应于一个线程。

    ChannelHandler

    ChannelHandler是一个接口,处理I / O事件或拦截I / O操作,并将其转发到其ChannelPipeline(业务处理链)中的下一个处理程序。

    ChannelHandler本身并没有提供很多方法,因为这个接口有许多的方法需要实现,方便使用期间,可以继承它的子类:

    • ChannelInboundHandler用于处理入站I / O事件
    • ChannelOutboundHandler用于处理出站I / O操作

    或者使用以下适配器类:

    • ChannelInboundHandlerAdapter用于处理入站I / O事件
    • ChannelOutboundHandlerAdapter用于处理出站I / O操作
    • ChannelDuplexHandler用于处理入站和出站事件

    ChannelHandlerContext

    保存Channel相关的所有上下文信息,同时关联一个ChannelHandler对象

    ChannelPipline

    保存ChannelHandler的List,用于处理或拦截Channel的入站事件和出站操作。 ChannelPipeline实现了一种高级形式的拦截过滤器模式,使用户可以完全控制事件的处理方式,以及Channel中各个的ChannelHandler如何相互交互。

    下图引用Netty的Javadoc4.1中ChannelPipline的说明,描述了ChannelPipeline中ChannelHandler通常如何处理I/O事件。 I/O事件由ChannelInboundHandler或ChannelOutboundHandler处理,并通过调用ChannelHandlerContext中定义的事件传播方法(例如ChannelHandlerContext.fireChannelRead(Object)和ChannelOutboundInvoker.write(Object))转发到其最近的处理程序。

                                                     I/O Request
                                                via Channel or
                                            ChannelHandlerContext
                                                          |
      +---------------------------------------------------+---------------+
      |                           ChannelPipeline         |               |
      |                                                  \|/              |
      |    +---------------------+            +-----------+----------+    |
      |    | Inbound Handler  N  |            | Outbound Handler  1  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  .               |
      |               .                                   .               |
      | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|
      |        [ method call]                       [method call]         |
      |               .                                   .               |
      |               .                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      |               |                                  \|/              |
      |    +----------+----------+            +-----------+----------+    |
      |    | Inbound Handler  1  |            | Outbound Handler  M  |    |
      |    +----------+----------+            +-----------+----------+    |
      |              /|\                                  |               |
      +---------------+-----------------------------------+---------------+
                      |                                  \|/
      +---------------+-----------------------------------+---------------+
      |               |                                   |               |
      |       [ Socket.read() ]                    [ Socket.write() ]     |
      |                                                                   |
      |  Netty Internal I/O Threads (Transport Implementation)            |
      +-------------------------------------------------------------------+
    
    

    入站事件由自下而上方向的入站处理程序处理,如图左侧所示。 入站Handler处理程序通常处理由图底部的I / O线程生成的入站数据。 通常通过实际输入操作(例如SocketChannel.read(ByteBuffer))从远程读取入站数据。

    出站事件由上下方向处理,如图右侧所示。 出站Handler处理程序通常会生成或转换出站传输,例如write请求。 I/O线程通常执行实际的输出操作,例如SocketChannel.write(ByteBuffer)。

    在 Netty 中每个 Channel 都有且仅有一个 ChannelPipeline 与之对应, 它们的组成关系如下:

    img

    一个 Channel 包含了一个 ChannelPipeline, 而 ChannelPipeline 中又维护了一个由 ChannelHandlerContext 组成的双向链表, 并且每个 ChannelHandlerContext 中又关联着一个 ChannelHandler。入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的handler,出站事件会从链表tail往前传递到最前一个出站的handler,两种类型的handler互不干扰。

    工作原理架构

    初始化并启动Netty服务端过程如下:

        public static void main(String[] args) {
            // 创建mainReactor
            NioEventLoopGroup boosGroup = new NioEventLoopGroup();
            // 创建工作线程组
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
    
            final ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap 
                     // 组装NioEventLoopGroup 
                    .group(boosGroup, workerGroup)
                     // 设置channel类型为NIO类型
                    .channel(NioServerSocketChannel.class)
                    // 设置连接配置参数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    // 配置入站、出站事件handler
                    .childHandler(new ChannelInitializer<NioSocketChannel>() {
                        @Override
                        protected void initChannel(NioSocketChannel ch) {
                            // 配置入站、出站事件channel
                            ch.pipeline().addLast(...);
                            ch.pipeline().addLast(...);
                        }
        });
    
            // 绑定端口
            int port = 8080;
            serverBootstrap.bind(port).addListener(future -> {
                if (future.isSuccess()) {
                    System.out.println(new Date() + ": 端口[" + port + "]绑定成功!");
                } else {
                    System.err.println("端口[" + port + "]绑定失败!");
                }
            });
    }
    
    
    • 基本过程如下:
    • 1 初始化创建2个NioEventLoopGroup,其中boosGroup用于Accetpt连接建立事件并分发请求, workerGroup用于处理I/O读写事件和业务逻辑
    • 2 基于ServerBootstrap(服务端启动引导类),配置EventLoopGroup、Channel类型,连接参数、配置入站、出站事件handler
    • 3 绑定端口,开始工作

    结合上面的介绍的Netty Reactor模型,介绍服务端Netty的工作架构图:

    服务端Netty Reactor工作架构图

    server端包含1个Boss NioEventLoopGroup和1个Worker NioEventLoopGroup,NioEventLoopGroup相当于1个事件循环组,这个组里包含多个事件循环NioEventLoop,每个NioEventLoop包含1个selector和1个事件循环线程。

    每个Boss NioEventLoop循环执行的任务包含3步:

    • 1 轮询accept事件
    • 2 处理accept I/O事件,与Client建立连接,生成NioSocketChannel,并将NioSocketChannel注册到某个Worker NioEventLoop的Selector上 *3 处理任务队列中的任务,runAllTasks。任务队列中的任务包括用户调用eventloop.execute或schedule执行的任务,或者其它线程提交到该eventloop的任务。

    每个Worker NioEventLoop循环执行的任务包含3步:

    • 1 轮询read、write事件;
    • 2 处I/O事件,即read、write事件,在NioSocketChannel可读、可写事件发生时进行处理
    • 3 处理任务队列中的任务,runAllTasks。

    其中任务队列中的task有3种典型使用场景

    • 1 用户程序自定义的普通任务
    ctx.channel().eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            //...
        }
    });
    
    
    • 2 非当前reactor线程调用channel的各种方法 例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel引用,然后调用write类方法向该用户推送消息,就会进入到这种场景。最终的write会提交到任务队列中后被异步消费。
    • 3 用户自定义定时任务
    ctx.channel().eventLoop().schedule(new Runnable() {
        @Override
        public void run() {
    
        }
    }, 60, TimeUnit.SECONDS);
    
    

    总结

    现在稳定推荐使用的主流版本还是Netty4,Netty5 中使用了 ForkJoinPool,增加了代码的复杂度,但是对性能的改善却不明显,所以这个版本不推荐使用,官网也没有提供下载链接。

    Netty 入门门槛相对较高,其实是因为这方面的资料较少,并不是因为他有多难,大家其实都可以像搞透 Spring 一样搞透 Netty。在学习之前,建议先理解透整个框架原理结构,运行过程,可以少走很多弯路。

    展开全文
  • ES 9300 Netty jar冲突

    万次阅读 2020-01-13 18:16:04
    [2020-01-13T16:12:24,691][WARN ][o.e.t.n.Netty4Transport ] [yq1] send message failed [channel: NettyTcpChannel{localAddress=0.0.0.0/0.0.0.0:9300, remoteAddress=/192.168.251.205:50072}] java.io.IOExc.....
    [2020-01-13T16:12:24,691][WARN ][o.e.t.n.Netty4Transport  ] [yq1] send message failed [channel: NettyTcpChannel{localAddress=0.0.0.0/0.0.0.0:9300, remoteAddress=/192.168.251.205:50072}]
    java.io.IOException: Connection reset by peer
        at sun.nio.ch.FileDispatcherImpl.write0(Native Method) ~[?:?]
        at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) ~[?:?]
        at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) ~[?:?]
        at sun.nio.ch.IOUtil.write(IOUtil.java:51) ~[?:?]
        at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) ~[?:?]
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:388) ~[netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:934) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.forceFlush(AbstractNioChannel.java:368) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:639) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysPlain(NioEventLoop.java:545) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:499) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459) [netty-transport-4.1.16.Final.jar:4.1.16.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858) [netty-common-4.1.16.Final.jar:4.1.16.Final]
        at java.lang.Thread.run(Thread.java:745) [?:1.8.0_112]

     

    发现是netty相关问题,查看程序pom文件中的netty依赖版本如下

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.38.Final</version>
            </dependency>
        </dependencies>
    </dependencyManagement>


    错误中提示netty版本为4.1.16.Final,与依赖中版本不一致,

    解决办法 :把pom文件中依赖注释掉
     

    展开全文
  • Netty部分API说明

    万次阅读 2019-08-04 22:44:15
    介绍了Netty的基本结构和一些API的用法,想学习Netty基础的可以看一下。 我的网易云笔记: Netty 结构和API分析 网址:https://note.youdao.com/ynoteshare1/index.html?id=e3f45354f557cbb8e6395021a5f06642&...
  • 详尽Netty(一):初探netty

    万次阅读 2020-06-15 00:29:26
    如果大家对java架构相关感兴趣,可以关注下面公众号,会持续更新java基础面试题, netty, spring boot,spring cloud等系列文章,一系列干货随时送达, 超神之路从此展开, BTAJ不再是梦想! 概念 ​ Netty 是一个高...
  • netty源码构建

    万次阅读 2020-11-30 17:17:37
    netty源码构建 构建步骤 github源代码地址:https://github.com/netty/netty。 使用git clone:git clone git@github.com:netty/netty.git。 $ git clone git@github.com:netty/netty.git Cloning into 'netty'... ...
  • Netty框架学习之(一):Netty框架简介

    万次阅读 多人点赞 2018-05-23 18:43:19
    官方定义为:”Netty 是一款异步的事件驱动的网络应用程序框架,支持快速地开发可维护的高性能的面向协议的服务器 和客户端”,按照惯例贴上一张High Level的架构图: 纵观Java系的多种服务器/大数据框架,都离不...
  • SpringBoot2+Netty+WebSocket(netty实现websocket,支持URL参数)

    万次阅读 多人点赞 2019-06-12 16:20:11
    关于Netty Netty 是一个利用 Java 的高级网络的能力,隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。 MAVEN依赖 <dependencies> <!-- ...
  • Netty —— Netty 模型

    千次阅读 2020-05-17 12:00:54
    文章目录Netty 模型工作原理示意图 - 简单版工作原理示意图 - 进阶版工作原理示意图 - 详细版 Netty 模型 工作原理示意图 - 简单版 Netty 主要基于主从 Reactor 多线程模型(如图)做了一定的改进,其中主从 Reactor...
  • netty实战:SpringBoot集成netty实现客户端服务端交互

    万次阅读 多人点赞 2019-04-30 10:44:59
    SpringBoot集成netty实现客户端服务端交互在springboot中集成netty。我在代码里用到了lombok的@data和@Slf4j注解,主要用来简化get set方法和输出日志。我们先加入netty的依赖下面是我的netty代码结构server端server...
  • Netty 介绍

    万次阅读 2020-04-19 17:06:29
    1.1. Netty 原理 Netty 是一个高性能、异步事件驱动的 NIO 框架,基于 JAVA NIO 提供的 API 实现。它提供了对 TCP、UDP 和文件传输的支持,作为一个异步 NIO 框架,Netty 的所有 IO 操作都是异步非阻塞 的,通过 ...
  • Netty 入门示例详解

    万次阅读 多人点赞 2018-10-13 10:40:40
    在已经了解完《Netty 理论详解》之后,想必已经开始跃跃欲试了,毕竟这么好的东西呀! 本文将详细讲解 Netty 入门案例。 Netty 官网地址:http://netty.io/ GitHub 托管地址:https://github.com/n...
  • NettyNetty实例开源项目

    万次阅读 2018-08-23 23:07:15
    Netty 前言 netty-introduction-demo:Netty的入门实例 简介 netty-serialization-demo:Netty传输序列化对象 简介 netty-heartbeat-detection-demo:使用Netty进行服务器和客户端的心跳检测 简介 netty-not-...
  • 在Java界,Netty无疑是开发网络应用的拿手菜。你不需要太多关注复杂的NIO模型和底层网络的细节,使用其丰富的接口,可以很容易的实现复杂的通讯功能。 本课程准备的十二个实例,按照由简单到复杂的学习路线,...
  • 今天,是年后第一次发表博文,今天就给大家带来一篇有关netty的文章吧,好了,我们直接进入主题。 一、前言介绍 本案例主要介绍如何使用Netty开发websocket。 二、环境需求 1、jdk1.7 2、Eclipse 3、Netty...
  • Netty研究

    千次阅读 2020-11-27 16:30:46
    文章目录1 netty的版本2 netty的核心类3 零拷贝(zero-copy)4 nio的epoll空轮询bug5 要读Netty源码嘛? 这篇文档是本人在学习netty,使用netty和阅读netty源码的过程中的总结,涉及到的学习知识点和学习重点会一一...
  • 高性能IO框架Netty一-第一个Netty程序

    万次阅读 2019-09-07 11:50:55
    一、Netty简介 1、Netty是什么? 2、为什么要用Netty? 3、为什么Netty使用NIO而不是AIO? 4、为什么不用Netty5 二、HelloNetty! 1、NettyServer 2、NettyServerHandler 3、NettyClient 4、...
  • Netty-认识netty

    2018-01-14 17:21:23
    好久没有在博客上写作笔记了,最近公司项目有使用到netty,作为一个之前对netty没有使用过的小白来说,后面我会系统的学习相关的知识,并将项目中用到的技术记录下来,以供学习,后面争取每周发一篇内容。netty是...
  • 一、Netty应用场景 二、Netty实现文件的上传和下载 三、程序演示 1、下载演示 2、上传演示 一、Netty应用场景 讲了一些Netty的组件,来聊一聊大家最关心的事情吧,他能够做什么?毕竟,我们学习就是拿来用的嘛...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 62,104
精华内容 24,841
关键字:

netty