精华内容
下载资源
问答
  • 维持长连接

    千次阅读 2010-04-19 11:03:00
    现成的长连接应用--Connection:keep-alive 在HTTp协议请求和响应中加入这条就能维持长连接。再封装HTTP消息数据体的消息应用就显的非常简单易用 Keep-Alive功能使客户端到服务器端的连接持续有效,当出现对服务器的...

     

    现成的长连接应用--Connection:keep-alive
    在HTTp协议请求和响应中加入这条就能维持长连接。
    再封装HTTP消息数据体的消息应用就显的非常简单易用

    Keep-Alive功能使客户端到服务器端的连接持续有效,当出现对服务器的后继请求时,Keep-Alive功能避免了建立或者重新建立连接。市场上 的大部分Web服务器,包括iPlanet、IIS和Apache,都支持HTTP Keep-Alive。对于提供静态内容的网站来说,这个功能通常很有用。但是,对于负担较重的网站来说,这里存在另外一个问题:虽然为客户保留打开的连 接有一定的好处,但它同样影响了性能,因为在处理暂停期间,本来可以释放的资源仍旧被占用。当Web服务器和应用服务器在同一台机器上运行时,Keep- Alive功能对资源利用的影响尤其突出。 此功能为HTTP 1.1预设的功能,HTTP 1.0加上Keep-Alive header也可以提供HTTP的持续作用功能。


    Keep-Alive: timeout=5, max=100
    timeout:过期时间5秒(对应httpd.conf里的参数是:KeepAliveTimeout),max是最多一百次请求,强制断掉连接
    就是在timeout时间内又有新的连接过来,同时max会自动减1,直到为0,强制断掉。见下面的四个图,注意看Date的值(前后时间差都是在5秒之 内)!



    Tomcat中的相关设置,在server.xml 中的Connector 元素中。
    keepAliveTimeout:
    The number of milliseconds this Connector will wait for another HTTP request before closing the connection. The default value is to use the value that has been set for the connectionTimeout attribute.

    maxKeepAliveRequests:
    The maximum number of HTTP requests which can be pipelined until the connection is closed by the server. Setting this attribute to 1 will disable HTTP/1.0 keep-alive, as well as HTTP/1.1 keep-alive and pipelining. Setting this to -1 will allow an unlimited amount of pipelined or keep-alive HTTP requests. If not specified, this attribute is set to 100.

    **************************************************************************************

    在网页开发过程中,Keep-Alive是HTTP协议中非常重要的一个属性。大家知道HTTP构建在TCP之上。在HTTP早期实现中,每个 HTTP请求都要打开一个socket连接。这种做效率很低,因为一个Web 页面中的很多HTTP请求都指向同一个服务器。例如,很多为Web页面中的图片发起的请求都指向一个通用的图片服务器。持久连接的引入解决了多对已请求服务器导致的socket连接低效性的问题。它使浏览器可以再一个单独的连接上进行多个请求。浏览器和服务器使用Connection头ilai指出对 Keep-Alive的支持。

     

    笔者在去年遇到一个跟Keep-Alive的问题:

     

    问题现象: 一个JSP页面,居然要耗时40多秒。网页中有大量的图片的CSS

     

    问题解决: 原因也找了半天,原来Apache配置里面,把Keep-Alive的开关关闭了。这个是个大问题,工程师为什么要关闭它,原来他考虑的太简单了,我们知道Apache适合处于短连接的请求,处理时间越短,并发数才能上去,原来他是这么考虑,但是没有办法,只能这样了,还是打开Keep-Alive开关吧。

     

    当然,不是所有的情况都设置KeepAlive为On,下面的文字总结比较好:

     

    【在使用apache 的过程中,KeepAlive 属性我一直保持为默认值On,其实,该属性设置为On还是Off还是要具体问题具体分析的,在生产环境中的影响还是蛮大的。

    KeepAlive 选项到底有什么用处?如果你用过Mysql ,应该知道Mysql的连接属性中有一个与KeepAlive 类似的Persistent Connection,即:长连接(PConnect)。该属性打开的话,可以使一次TCP连接为同一用户的多次请求服务,提高了响应速度。

    比如很多网页中图片、CSS、JS、Html都在一台Server上,当用户访问其中的Html网页时,网页中的图片、Css、Js都构成了访问请求,打开KeepAlive 属性可以有效地降低TCP握手的次数(当然浏览器对同一域下同时请求的图片数有限制,一般是2),减少httpd进程数,从而降低内存的使用(假定 prefork模式)。MaxKeepAliveRequests 和KeepAliveTimeOut 两个属性在KeepAlive =On时起作用,可以控制持久连接的生存时间和最大服务请求数。

    不过,上面说的只是一种情形,那就是静态网页居多的情况下,并且网页中的其他请求与网页在同一台Server上。当你的应用动态程序(比如:php )居多,用户访问时由动态程序即时生成html内容,html内容中图片素材和Css、Js等比较少或者散列在其他Server上时,KeepAlive =On反而会降低Apache 的性能。为什么呢?

    前面提到过,KeepAlive =On时,每次用户访问,打开一个TCP连接,Apache 都会保持该连接一段时间,以便该连接能连续为同一client服务,在KeepAliveTimeOut还没到期并且 MaxKeepAliveRequests还没到阈值之前,Apache 必然要有一个httpd进程来维持该连接,httpd进程不是廉价的,他要消耗内存和CPU时间片的。假如当前Apache 每秒响应100个用户访问,KeepAliveTimeOut=5,此时httpd进程数就是100*5=500个(prefork 模式),一个httpd进程消耗5M内存的话,就是500*5M=2500M=2.5G,夸张吧?当然,Apache 与Client只进行了100次TCP连接。如果你的内存够大,系统负载不会太高,如果你的内存小于2.5G,就会用到Swap,频繁的Swap切换会加重CPU的Load。

    现在我们关掉KeepAlive ,Apache 仍然每秒响应100个用户访问,因为我们将图片、js、css等分离出去了,每次访问只有1个request,此时httpd的进程数是 100*1=100个,使用内存100*5M=500M,此时Apache 与Client也是进行了100次TCP连接。性能却提升了太多。

    总结:

    1、当你的Server内存充足时,KeepAlive =On还是Off对系统性能影响不大。

    2、当你的Server上静态网页(Html、图片、Css、Js)居多时,建议打开KeepAlive 。

    3、当你的Server多为动态请求(因为连接数据库,对文件系统访问较多),KeepAlive 关掉,会节省一定的内存,节省的内存正好可以作为文件系统的Cache(vmstat命令中cache一列),降低I/O压力。

    PS:当KeepAlive =On时,KeepAliveTimeOut的设置其实也是一个问题,设置的过短,会导致Apache 频繁建立连接,给Cpu造成压力,设置的过长,系统中就会堆积无用的Http连接,消耗掉大量内存,具体设置多少,可以进行不断的调节,因你的网站浏览和服务器配

    展开全文
  • linux维持长连接调优

    千次阅读 2017-02-16 21:49:41
    ip_conntrack 是Linux NAT一个跟踪连接条目的模块记录着允许的跟踪连接条目ip_conntrack 模块会记录 tcp 通讯协议的 established connection 记录, 而且预设 timeout 时间长达五天 (432,000 秒).所以局域网中当有人...
    由于linux内核的限制,files open too many是一个常见的问题
    
    修改/etc/sysctl.conf文件

    net.ipv4.tcp_tw_recycle = 1
    net.ipv4.tcp_tw_reuse = 1

    net.ipv4.tcp_timestamps = 0
    net.ipv4.ip_local_port_range = 1024 65000
    fs.file-max=655360
    net.netfilter.nf_conntrack_max = 798641


    以上参数存在/proc/sys/下的相应目录

    [quote]目前,大多的 ip_conntrack_* 已被 nf_conntrack_* 取代,很多 ip_conntrack_* 仅仅是个 alias,原先的 ip_conntrack 的 /proc/sys/net/ipv4/netfilter/ 依然存在,但是新的 nf_conntrack 在 /proc/sys/net/netfilter/ 中[/quote]
    [quote]file-max是设置 系统所有进程一共可以打开的文件数量
    /proc/sys/fs/file-max
    [/quote]

    [quote]
    ip_conntrack 是Linux NAT一个跟踪连接条目的模块记录着允许的跟踪连接条目ip_conntrack 模块会记录 tcp 通讯协议的 established connection 记录, 而且预设 timeout 时间长达五天 (432,000 秒).所以局域网中当有人使用p2p类的软件就很容易使ip_conntrack达到最大值...也由此造成。
    echo "3600" > /proc/sys/net/netfilter/nf_conntrack_tcp_timeout_established
    [/quote]


    /etc/security/limits.conf
    *    soft    nofile  1000000  
    * hard nofile 1000000

    [quote]ulimit作用:即设置当前shell以及由它启动的进程的资源限制[/quote]


    [quote]
    1.验证某个进程的限制

    # ps -ef |grep nginx

    将得出的PID XXX带入下面

    #cat /proc/XXX/limits

    查看Max open files 那一行

    2.验证系统级别的限制

    # ulimit -n

    3.验证内核级别的限制

    #cat /proc/sys/fs/file-max
    [/quote]
    展开全文
  •  多机器只要把连接map内容维护到缓存里面 集中管理 就可以了。。 Netty   准备说明:引入java-websocket,netty-all-5.0等的jar包。其中内部心跳机制使用userEventTriggered事件方式实现,客户端的心跳实现...

    ps: 以下是单机的处理方案 

         多机器只要把连接map内容维护到缓存里面 集中管理 就可以了。。

    Netty

     

    准备说明:引入java-websocket,netty-all-5.0等的jar包。其中内部心跳机制使用userEventTriggered事件方式实现,客户端的心跳实现客户端的断点重连工作,服务端的心跳实现服务端清除多余链接的功能

     

    。以下为一些实现的代码:1. 

    [java] view plain copy

    package base;  

      

    /** 

     *  

     * 请求类型的消息 

     */  

    public class AskMsg extends BaseMsg {  

        public AskMsg() {  

            super();  

            setType(MsgType.ASK);  

        }  

        private AskParams params;  

      

        public AskParams getParams() {  

            return params;  

        }  

      

        public void setParams(AskParams params) {  

            this.params = params;  

        }  

    }  

    2. 
    [java] view plain copy

    package base;  

      

    import java.io.Serializable;  

      

    /** 

     *   

     */  

    public class AskParams implements Serializable {  

        private static final long serialVersionUID = 1L;  

        private String auth;  

        private String content;  

          

        public String getContent() {  

            return content;  

        }  

      

        public void setContent(String content) {  

            this.content = content;  

        }  

      

        public String getAuth() {  

            return auth;  

        }  

      

        public void setAuth(String auth) {  

            this.auth = auth;  

        }  

    }  

    3. 
    [java] view plain copy

    package base;  

      

    import java.io.Serializable;  

      

    /** 

     *  

     * 必须实现序列,serialVersionUID 一定要有 

     */  

      

    public abstract class BaseMsg  implements Serializable {  

        private static final long serialVersionUID = 1L;  

        private MsgType type;  

        //必须唯一,否者会出现channel调用混乱  

        private String clientId;  

      

        //初始化客户端id  

        public BaseMsg() {  

            this.clientId = Constants.getClientId();  

        }  

      

        public String getClientId() {  

            return clientId;  

        }  

      

        public void setClientId(String clientId) {  

            this.clientId = clientId;  

        }  

      

        public MsgType getType() {  

            return type;  

        }  

      

        public void setType(MsgType type) {  

            this.type = type;  

        }  

    }  

    4. 
    [java] view plain copy

    package base;  

      

      

    /** 

     *   

     */  

    public class Constants {  

        private static String clientId;  

      

        public static String getClientId() {  

            return clientId;  

        }  

      

        public static void setClientId(String clientId) {  

            Constants.clientId = clientId;  

        }  

    }  

    5. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     * 登录验证类型的消息 

     */  

    public class LoginMsg extends BaseMsg {  

        private String userName;  

        private String password;  

        public LoginMsg() {  

            super();  

            setType(MsgType.LOGIN);  

        }  

      

        public String getUserName() {  

            return userName;  

        }  

      

        public void setUserName(String userName) {  

            this.userName = userName;  

        }  

      

        public String getPassword() {  

            return password;  

        }  

      

        public void setPassword(String password) {  

            this.password = password;  

        }  

    }  

    6. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     */  

    public enum  MsgType {  

        PING,ASK,REPLY,LOGIN  

    }  

    7. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     * 心跳检测的消息类型 

     */  

    public class PingMsg extends BaseMsg {  

        public PingMsg() {  

            super();  

            setType(MsgType.PING);  

        }  

    }  

    8. 
    [java] view plain copy

    package base;  

      

    import java.io.Serializable;  

      

    /** 

     *   

     */  

    public class ReplyBody implements Serializable {  

        private static final long serialVersionUID = 1L;  

    }  

    9. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     */  

    public class ReplyClientBody extends ReplyBody {  

        private String clientInfo;  

      

        public ReplyClientBody(String clientInfo) {  

            this.clientInfo = clientInfo;  

        }  

      

        public String getClientInfo() {  

            return clientInfo;  

        }  

      

        public void setClientInfo(String clientInfo) {  

            this.clientInfo = clientInfo;  

        }  

    }  

    10. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     */  

    public class ReplyMsg extends BaseMsg {  

        public ReplyMsg() {  

            super();  

            setType(MsgType.REPLY);  

        }  

        private ReplyBody body;  

      

        public ReplyBody getBody() {  

            return body;  

        }  

      

        public void setBody(ReplyBody body) {  

            this.body = body;  

        }  

    }  

    11. 
    [java] view plain copy

    package base;  

      

    /** 

     *   

     */  

    public class ReplyServerBody extends ReplyBody {  

        private String serverInfo;  

        public ReplyServerBody(String serverInfo) {  

            this.serverInfo = serverInfo;  

        }  

        public String getServerInfo() {  

            return serverInfo;  

        }  

        public void setServerInfo(String serverInfo) {  

            this.serverInfo = serverInfo;  

        }  

    }  

    12.  netty客户端启动类
    [java] view plain copy

    package client;  

      

    import io.netty.bootstrap.Bootstrap;  

    import io.netty.buffer.Unpooled;  

    import io.netty.channel.ChannelFuture;  

    import io.netty.channel.ChannelInitializer;  

    import io.netty.channel.ChannelOption;  

    import io.netty.channel.EventLoopGroup;  

    import io.netty.channel.nio.NioEventLoopGroup;  

    import io.netty.channel.socket.SocketChannel;  

    import io.netty.channel.socket.nio.NioSocketChannel;  

    import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;  

    import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;  

    import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;  

    import io.netty.handler.codec.http.websocketx.WebSocketFrame;  

    import io.netty.handler.codec.serialization.ClassResolvers;  

    import io.netty.handler.codec.serialization.ObjectDecoder;  

    import io.netty.handler.codec.serialization.ObjectEncoder;  

    import io.netty.handler.timeout.IdleStateHandler;  

    import io.netty.util.concurrent.DefaultEventExecutorGroup;  

    import io.netty.util.concurrent.EventExecutorGroup;  

      

    import java.io.BufferedReader;  

    import java.io.IOException;  

    import java.io.InputStreamReader;  

    import java.util.concurrent.TimeUnit;  

      

    import base.AskMsg;  

    import base.AskParams;  

    import base.Constants;  

    import base.LoginMsg;  

      

    /** 

     *  

     */  

    public class NettyClientBootstrap {  

        private int port;  

        private String host;  

        private SocketChannel socketChannel;  

        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(20);  

        public NettyClientBootstrap(int port, String host) throws InterruptedException {  

            this.port = port;  

            this.host = host;  

            start();  

        }  

        private void start() throws InterruptedException {  

            EventLoopGroup eventLoopGroup=new NioEventLoopGroup();  

            Bootstrap bootstrap=new Bootstrap();  

            bootstrap.channel(NioSocketChannel.class);  

            bootstrap.option(ChannelOption.SO_KEEPALIVE,true);  

            bootstrap.group(eventLoopGroup);  

            bootstrap.remoteAddress(host,port);  

            bootstrap.handler(new ChannelInitializer<SocketChannel>() {  

                @Override  

                protected void initChannel(SocketChannel socketChannel) throws Exception {  

                    socketChannel.pipeline().addLast(new IdleStateHandler(20,10,0));  

                    socketChannel.pipeline().addLast(new ObjectEncoder());  

                    socketChannel.pipeline().addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));  

                    socketChannel.pipeline().addLast(new NettyClientHandler());  

                }  

            });  

            ChannelFuture future =bootstrap.connect(host,port).sync();  

            if (future.isSuccess()) {  

                socketChannel = (SocketChannel)future.channel();  

                System.out.println("connect server  成功---------");  

            }  

        }  

        public static void main(String[]args) throws InterruptedException, IOException {  

            Constants.setClientId("002");  

            NettyClientBootstrap bootstrap=new NettyClientBootstrap(9999,"localhost");  

      

            LoginMsg loginMsg=new LoginMsg();  

            loginMsg.setPassword("yao");  

            loginMsg.setUserName("robin");  

            bootstrap.socketChannel.writeAndFlush(loginMsg);  

    //        while (true){  

    //            TimeUnit.SECONDS.sleep(3);  

    //            AskMsg askMsg=new AskMsg();  

    //            AskParams askParams=new AskParams();  

    //            askParams.setAuth("authToken");  

    //            askMsg.setParams(askParams);  

    //            bootstrap.socketChannel.writeAndFlush(askMsg);  

    //        }  

              

            BufferedReader console = new BufferedReader(new InputStreamReader(System.in));  

            while (true) {  

                String msg = console.readLine();  

                if (msg == null) {  

                    break;  

                } else if ("bye".equals(msg.toLowerCase())) {  

                    break;  

                } else if ("ping".equals(msg.toLowerCase())) {  

                } else {  

                  AskMsg askMsg=new AskMsg();  

                  AskParams askParams=new AskParams();  

                  askParams.setAuth("authToken");  

                  askParams.setContent(msg);  

                  askMsg.setParams(askParams);  

                  bootstrap.socketChannel.writeAndFlush(askMsg);  

                }  

            }  

              

              

        }  

    }  

    13. netty客户端操作实现类
    [java] view plain copy

    package client;  

      

      

      

    import io.netty.channel.ChannelHandlerContext;  

    import io.netty.channel.SimpleChannelInboundHandler;  

    import io.netty.handler.timeout.IdleStateEvent;  

    import io.netty.util.ReferenceCountUtil;  

      

    import java.util.Date;  

      

    import server.NettyChannelMap;  

    import base.AskMsg;  

    import base.BaseMsg;  

    import base.LoginMsg;  

    import base.MsgType;  

    import base.PingMsg;  

    import base.ReplyClientBody;  

    import base.ReplyMsg;  

    import base.ReplyServerBody;  

      

    /** 

     *   

     */  

    public class NettyClientHandler extends SimpleChannelInboundHandler<BaseMsg> {  

      

        private int UNCONNECT_NUM = 0;  

          

        @Override  

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  

            if (evt instanceof IdleStateEvent) {  

                if(UNCONNECT_NUM >= 4) {  

                    System.err.println("connect status is disconnect.");  

                    ctx.close();  

                    //此处当重启次数达到4次之后,关闭此链接后,并重新请求进行一次登录请求  

                    return;  

                }  

                  

                IdleStateEvent e = (IdleStateEvent) evt;  

                switch (e.state()) {  

                    case WRITER_IDLE:  

                        System.out.println("send ping to server---date=" + new Date());  

                        PingMsg pingMsg=new PingMsg();  

                        ctx.writeAndFlush(pingMsg);  

                        UNCONNECT_NUM++;  

                        System.err.println("writer_idle over. and UNCONNECT_NUM=" + UNCONNECT_NUM);  

                        break;  

                    case READER_IDLE:    

                        System.err.println("reader_idle over.");  

                        UNCONNECT_NUM++;  

                        //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道  

                    case ALL_IDLE:  

                        System.err.println("all_idle over.");  

                        UNCONNECT_NUM++;  

                        //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道  

                    default:  

                        break;  

                }  

            }  

        }  

          

        @Override  

        protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {  

            MsgType msgType=baseMsg.getType();  

            switch (msgType){  

                case LOGIN:{  

                    //向服务器发起登录  

                    LoginMsg loginMsg=new LoginMsg();  

                    loginMsg.setPassword("alan");  

                    loginMsg.setUserName("lin");  

                    channelHandlerContext.writeAndFlush(loginMsg);  

                }break;  

                case PING:{  

                    System.out.println("receive server ping ---date=" + new Date());  

                    ReplyMsg replyPing=new ReplyMsg();  

                    ReplyClientBody body = new ReplyClientBody("send client msg.");  

                    replyPing.setBody(body);  

                    channelHandlerContext.writeAndFlush(replyPing);  

                }break;  

                case ASK:{  

                    AskMsg askMsg=(AskMsg)baseMsg;  

                    ReplyClientBody replyClientBody=new ReplyClientBody("receive server askmsg:" + askMsg.getParams().getContent());  

                    ReplyMsg replyMsg=new ReplyMsg();  

                    replyMsg.setBody(replyClientBody);  

                    channelHandlerContext.writeAndFlush(replyMsg);  

                }break;  

                case REPLY:{  

                    ReplyMsg replyMsg=(ReplyMsg)baseMsg;  

                    ReplyServerBody replyServerBody=(ReplyServerBody)replyMsg.getBody();  

                    UNCONNECT_NUM = 0;  

                    System.out.println("UNCONNECT_NUM="+ UNCONNECT_NUM + ",receive server replymsg: "+replyServerBody.getServerInfo());  

                }  

                default:break;  

            }  

            ReferenceCountUtil.release(msgType);  

        }  

          

        @Override  

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  

              

            System.err.println("in client exceptionCaught.");  

            super.exceptionCaught(ctx, cause);  

            //出现异常时,可以发送或者记录相关日志信息,之后,直接断开该链接,并重新登录请求,建立通道  

      

        }  

          

    }  

    14. 
    [java] view plain copy

    package server;  

      

    import io.netty.channel.Channel;  

    import io.netty.channel.socket.SocketChannel;  

      

    import java.util.Map;  

    import java.util.concurrent.ConcurrentHashMap;  

      

    /** 

     *   

     */  

    public class NettyChannelMap {  

        private static Map<String,SocketChannel> map=new ConcurrentHashMap<String, SocketChannel>();  

        public static void add(String clientId,SocketChannel socketChannel){  

            map.put(clientId,socketChannel);  

        }  

        public static Channel get(String clientId){  

           return map.get(clientId);  

        }  

        public static void remove(SocketChannel socketChannel){  

            for (Map.Entry entry:map.entrySet()){  

                if (entry.getValue()==socketChannel){  

                    map.remove(entry.getKey());  

                }  

            }  

        }  

      

    }  

    15.  netty服务端启动类
    [java] view plain copy

    package server;  

      

    import io.netty.bootstrap.ServerBootstrap;  

    import io.netty.channel.ChannelFuture;  

    import io.netty.channel.ChannelInitializer;  

    import io.netty.channel.ChannelOption;  

    import io.netty.channel.ChannelPipeline;  

    import io.netty.channel.EventLoopGroup;  

    import io.netty.channel.nio.NioEventLoopGroup;  

    import io.netty.channel.socket.SocketChannel;  

    import io.netty.channel.socket.nio.NioServerSocketChannel;  

    import io.netty.handler.codec.serialization.ClassResolvers;  

    import io.netty.handler.codec.serialization.ObjectDecoder;  

    import io.netty.handler.codec.serialization.ObjectEncoder;  

    import io.netty.handler.timeout.IdleStateHandler;  

      

    import java.util.concurrent.TimeUnit;  

      

    import base.AskMsg;  

      

    /** 

     *   

     */  

    public class NettyServerBootstrap {  

        private int port;  

        private SocketChannel socketChannel;  

        public NettyServerBootstrap(int port) throws InterruptedException {  

            this.port = port;  

            bind();  

        }  

      

        private void bind() throws InterruptedException {  

            EventLoopGroup boss=new NioEventLoopGroup();  

            EventLoopGroup worker=new NioEventLoopGroup();  

            ServerBootstrap bootstrap=new ServerBootstrap();  

            bootstrap.group(boss,worker);  

            bootstrap.channel(NioServerSocketChannel.class);  

            bootstrap.option(ChannelOption.SO_BACKLOG, 128);  

            bootstrap.option(ChannelOption.TCP_NODELAY, true);  

            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);  

            bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {  

                @Override  

                protected void initChannel(SocketChannel socketChannel) throws Exception {  

                    ChannelPipeline p = socketChannel.pipeline();  

                  p.addLast(new IdleStateHandler(10,5,0));  

                    p.addLast(new ObjectEncoder());  

                    p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)));  

                    p.addLast(new NettyServerHandler());  

                }  

            });  

            ChannelFuture f= bootstrap.bind(port).sync();  

            if(f.isSuccess()){  

                System.out.println("server start---------------");  

            }  

        }  

        public static void main(String []args) throws InterruptedException {  

            NettyServerBootstrap bootstrap=new NettyServerBootstrap(9999);  

            while (true){  

    //            SocketChannel channel=(SocketChannel)NettyChannelMap.get("001");  

    //            if(channel!=null){  

    //                AskMsg askMsg=new AskMsg();  

    //                channel.writeAndFlush(askMsg);  

    //            }  

                TimeUnit.SECONDS.sleep(10);  

            }  

        }  

    }  

    16. netty服务端操作实现类
    [java] view plain copy

    package server;  

      

    import java.util.Date;  

      

    import io.netty.channel.ChannelHandlerContext;  

    import io.netty.channel.SimpleChannelInboundHandler;  

    import io.netty.channel.socket.SocketChannel;  

    import io.netty.handler.timeout.IdleStateEvent;  

    import io.netty.util.ReferenceCountUtil;  

    import base.AskMsg;  

    import base.BaseMsg;  

    import base.LoginMsg;  

    import base.MsgType;  

    import base.PingMsg;  

    import base.ReplyBody;  

    import base.ReplyClientBody;  

    import base.ReplyMsg;  

    import base.ReplyServerBody;  

      

    /** 

     *   

     */  

    public class NettyServerHandler extends SimpleChannelInboundHandler<BaseMsg> {  

        

         private int UNCONNECT_NUM_S = 0;  

          

         @Override  

            public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  

                if (evt instanceof IdleStateEvent) {  

                    if(UNCONNECT_NUM_S >= 4) {  

                        System.err.println("client connect status is disconnect.");  

                        ctx.close();  

                        //此处当重启次数达到4次之后,关闭此链接后,清除服务端相关的记录信息  

                        return;  

                    }  

                      

                    IdleStateEvent e = (IdleStateEvent) evt;  

                    switch (e.state()) {  

                        case WRITER_IDLE:  

                            System.out.println("send ping to client---date=" + new Date());  

                            PingMsg pingMsg=new PingMsg();  

                            ctx.writeAndFlush(pingMsg);  

                            UNCONNECT_NUM_S++;  

                            System.err.println("writer_idle over. and UNCONNECT_NUM_S=" + UNCONNECT_NUM_S);  

                            break;  

                        case READER_IDLE:    

                            System.err.println("reader_idle over.");  

                            UNCONNECT_NUM_S++;  

                            //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道  

                        case ALL_IDLE:  

                            System.err.println("all_idle over.");  

                            UNCONNECT_NUM_S++;  

                            //读取服务端消息超时时,直接断开该链接,并重新登录请求,建立通道  

                        default:  

                            break;  

                    }  

                }  

            }  

          

          

        @Override  

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {  

            System.err.println("in channelInactive.");  

            NettyChannelMap.remove((SocketChannel)ctx.channel());  

        }  

          

          

        @Override  

        protected void messageReceived(ChannelHandlerContext channelHandlerContext, BaseMsg baseMsg) throws Exception {  

      

            if(MsgType.LOGIN.equals(baseMsg.getType())){  

                LoginMsg loginMsg=(LoginMsg)baseMsg;  

                if("lin".equals(loginMsg.getUserName())&&"alan".equals(loginMsg.getPassword())){  

                    //登录成功,把channel存到服务端的map中  

                    NettyChannelMap.add(loginMsg.getClientId(),(SocketChannel)channelHandlerContext.channel());  

                    System.out.println("client"+loginMsg.getClientId()+" 登录成功");  

                }  

            }else{  

                if(NettyChannelMap.get(baseMsg.getClientId())==null){  

                        //说明未登录,或者连接断了,服务器向客户端发起登录请求,让客户端重新登录  

                        LoginMsg loginMsg=new LoginMsg();  

                        channelHandlerContext.channel().writeAndFlush(loginMsg);  

                }  

            }  

            switch (baseMsg.getType()){  

                case PING:{  

                      PingMsg pingMsg=(PingMsg)baseMsg;  

                      ReplyMsg replyPing=new ReplyMsg();  

                      ReplyServerBody body = new ReplyServerBody("send server msg.");  

                      replyPing.setBody(body);  

                      NettyChannelMap.get(pingMsg.getClientId()).writeAndFlush(replyPing);  

                      System.err.println("ping over.");  

                }break;  

                case ASK:{  

                    //收到客户端的请求  

                    AskMsg askMsg=(AskMsg)baseMsg;  

                    if("authToken".equals(askMsg.getParams().getAuth())){  

                        ReplyServerBody replyBody=new ReplyServerBody("receive client askmsg:" + askMsg.getParams().getContent());  

                        ReplyMsg replyMsg=new ReplyMsg();  

                        replyMsg.setBody(replyBody);  

                        NettyChannelMap.get(askMsg.getClientId()).writeAndFlush(replyMsg);  

                    }  

                }break;  

                case REPLY:{  

                    //收到客户端回复  

                    ReplyMsg replyMsg=(ReplyMsg)baseMsg;  

                    ReplyClientBody clientBody=(ReplyClientBody)replyMsg.getBody();  

                    UNCONNECT_NUM_S = 0;  

                    System.out.println("UNCONNECT_NUM_S=" + UNCONNECT_NUM_S +",receive client replymsg: "+clientBody.getClientInfo());  

                }break;  

                default:break;  

            }  

            ReferenceCountUtil.release(baseMsg);  

        }  

          

          

        @Override  

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {  

              

            System.err.println("in here has an error.");  

            NettyChannelMap.remove((SocketChannel)ctx.channel());  

            super.exceptionCaught(ctx, cause);  

            System.err.println("channel is exception over. (SocketChannel)ctx.channel()=" + (SocketChannel)ctx.channel());  

        }  

    展开全文
  • nginx跟grpc长连接相关配置,发现在1.15.6版本引入了"grpc_socket_keepalive"跟grpc直接相关长连接配置 Configures the “TCP keepalive” behavior for outgoing connections to a gRPC server. By default, the...

    问题描述

    公司内部容器平台,接入层用nginx做LB,用户有grpc协议需求,所以在lb层支持grcp反向代理,nginx从1.13开始支持grpc反向代理,将公司使用的nginx包从1.12升级到1.14.0后,增加grpc反向代理配置。配置完成后,打压力测试时,发现接入层机器端口占满而导致服务异常,开始追查问题。

    追查方向

    深入了解grpc协议

    • gRPC是一个高性能、通用的开源 RPC 框架,其由 Google 主要面向移动应用开发并基于HTTP/2协议标准而设计,基于ProtoBuf(Protocol Buffers) 序列化协议开发,且支持众多开发语言。gRPC 提供了一种简单的方法来精确地定义服务和为 iOS、Android 和后台支持服务自动生成可靠性很强的客户端功能库。客户端充分利用高级流和链接功能,从而有助于节省带宽、降低的 TCP 链接次数、节省 CPU 使用、和电池寿命。

    从上述描述可以看出grpc基于http2,client到server应该保持长连接,理论上不应该出现端口占满的问题

    抓包

    • client到接入层抓包看请求状态,发现client到接入层确实是长连接状态。接入层到后端server抓包发现,请求并没有保持长连接,一个请求处理完之后,链接就断开了。

    查nginx跟长连接相关配置

    • nginx长连接相关说明参考以下文档:nginx长连接
    • nginx跟grpc长连接相关配置,发现在1.15.6版本引入了"grpc_socket_keepalive"跟grpc直接相关长连接配置

      Configures the “TCP keepalive” behavior for outgoing connections to a gRPC server. By default, the operating system’s settings are in effect for the socket. If the directive is set to the value “on”, the SO_KEEPALIVE socket option is turned on for the socket.

    参考nginx长连接相关文档做了配置调整之后,发现nginx到server依然是短连接

    将nginx升级到nginx1.15.6(升级过程中由于线上的nginx用到了lua模块,碰到了lua模块不适配问题,解决方案见链接lua模块适配问题)配置grpc_socket_keepalive on,抓包发现,会有少量处理多个请求的长连接存在,但大部分依然是短连接。

    • 开启nginx debug模式

    从debug日志来看nginx确实尝试重用链接,但是从实际抓包看,nginx的链接重用的情况非常少,大部分都是请求处理完之后链接断开,怀疑nginx对grpc反向代理支持的不够理想。

    调整端口回收策略

    • 回到问题本身,要解决的问题是接入层端口占满,各种调整nginx的长连接配置并不能解决这个问题,就尝试从tcp链接端口回收方面解决,大部分的tcp链接都处于TIME_WAIT状态。

    • TIME_WAIT状态的时间是2倍的MSL(linux里一个MSL为30s,是不可配置的),在TIME_WAIT状态TCP连接实际上已经断掉,但是该端口又不能被新的连接实例使用。这种情况一般都是程序中建立了大量的短连接,而操作系统中对使用端口数量做了限制最大能支持65535个,TIME_WAIT过多非常容易出现连接数占满的异常。对TIME_WATI的优化有两个系统参数可配置:tcp_tw_reuse,tcp_tw_recycle 这两个参数在网上都有详细介绍,这是就不展开来讲

    • tcp_tw_reuse参考链接

    • tcp_tw_recycle(Enable fast recycling TIME-WAIT sockets)参考链接

    • 测试来看,开启tcp_tw_reuse并没有解决端口被占满的问题,所以开启了更激进的tcp_tw_recycle,至此端口占用显著降低,问题解决,由于我们接入层并不是用的NAT,所以这个配置不会影响服务。

    结论

    • 通过测试发现nginx对grpc的反向代理支持的不够理想,从nginx到后端server大部分请求不能保持长连接
    • 当用nginx做接入层反向代理时,对tcp参数调优可以避免端口占满等问题的发生
    展开全文
  • 今天,我将手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接 1. 长连接 介绍 http的长连接和短连接(史上最通俗!) 2. 长连接断开的原因 从上节可知,在长连接的情况下,双方的所有通信 都建立...
  • 今天,我将手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接 目录 示意图 1. 长连接 介绍 1.1 简介 示意图 1.2 作用 通过长时间保持双方连接,从而: 提高通信速度 确保实时性 避免短时间...
  • socket长连接维持

    千次阅读 2016-02-02 10:47:49
    长连接维持,是要客户端程序,定时向服务端程序,发送一个维持连接包的。 如果,长时间未发送维持连接包,服务端程序将断开连接。 客户端: 通过持有Client对象,可以随时(使用sendObject方法)发送Object给...
  • socket长连接可以维持多长时间,在客户端和服务器都不主动关闭socket的情况下,普通的中国移动和中国联通sim卡,运营商会多长时间释放掉socket,如果是运营商把socket关掉了,客户端和服务端都能收到通知吗?
  • 今天,我将 手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接 目录 1. 长连接 介绍 1.1 简介 1.2 作用 通过 长时间保持双方连接,从而: 提高通信速度 确保实时性 避免短时间内...
  • 长连接-心跳保活机制

    千次阅读 2018-03-21 17:24:55
    前言当实现具备实时性需求时,我们一般会选择长连接的通信方式而在实现长连接方式时,存在很多性能问题,如 长连接保活今天,我将 手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接目录1. 长连接 介绍...
  • python使用paramiko模块对多台服务器进行ssh连接,要用交互式的invoke_shell()执行服务器命令,现在使用的是cgi方式,每次调用都要新建...请问想要初始化python程序后一直维持ssh连接随用随取的话什么方法可以实现?
  • nginx设置响应连接是长连接或者短连接 ... nginx反向代理时保持长连接 ... ...用nginx做grpc反向代理,nginx到后端server不能维持长连接问题 https://juejin.im/post/6844903809534148622 ...
  • 长连接与短连接

    2013-03-14 16:49:58
    所谓长连接,指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接,一般需要自己做在线维持。 短连接是指通信双方有数据交互时,就建立一个TCP连接...
  • 长连接与短连接含义

    千次阅读 2020-08-27 21:44:21
    所谓长连接,指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接,一般需要自己做在线维持。 短连接是指通信双方有数据交互时,就建立一个TCP连接,...
  • 解决peewee的MySQL长连接问题

    千次阅读 2019-09-05 17:26:09
    在peewee中如何维持长连接呢? 解决方法比较晦涩,需要自定义一个支持重试的类,然后自定义一种RetryMySQLDatabase混入类 from peewee import * from peewee import __exception_wrapper__ class ...
  • 基于Proxy代理服务器的长连接方法

    千次阅读 2016-01-16 12:39:14
    基于Proxy代理服务器的长连接方法...用Proxy代替智能终端与服务器维持长连接,而智能终端可以断开网络连接,进入休眠状态,不仅能极大地改善现有的智能手机耗电量过大的现状,而且能充分利用移动运营商的分配的带宽资源
  • websocket 长连接

    千次阅读 2018-06-14 18:37:27
    web socket 长连接
  • 长连接为何要发送心跳包

    万次阅读 2015-07-23 22:42:23
    最近面试的时候,被闻到关于Android IM 的问题,因为之前做过一个类似于微信的聊天... 回来之后我查询资料后发现,Android的推送在后台维持的服务都会发送心跳包来维持长连接,当一台智能手机连上移动网络的时候,其实
  • 前言当实现具备实时性需求时,我们一般会选择长连接的通信方式而在实现长连接方式时,存在很多性能问题,如 长连接保活今天,我将 手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接目录1. 长连接 介绍...
  • mysql长连接与短连接

    千次阅读 2019-02-14 16:38:42
    什么是长连接? 其实长连接是相对于通常的短连接而说的,也就是长时间...这就要求长连接在没有数据通信时,定时发送数据包,以维持连接状态,短连接在没有数据传输时直接关闭就行了 什么时候用长连接,短连接? ...
  • 移动端长连接

    千次阅读 2017-03-18 23:41:14
    1、什么是长连接先说短连接, 短连接是通讯双方有数据交互时就建立一个TCP连接, 数据发送完成后,则断开此连接(建立连接需三次握手,断开连接需四次挥手)。长连接就是大家建立TCP连接之后, 不主动断开。 双方互相...
  • socketio长连接应用

    2018-01-09 16:16:53
    兼容所有浏览器的长连接源码,低版本浏览器是长轮询,高版本websocket
  • tcp如何保持时间连接不断开

    千次阅读 2019-07-08 17:52:09
    KeepAlive机制无法代替心跳机制,需要在应用层 自己实现心跳机制以检测长连接的有效性,从而高效维持长连接 综合主流移动IM产品,此处建议心跳检测时间 为4分钟 swoole中 如果想保持长时间连接(中间没有数据交互...
  • mysql长连接和短连接的问题

    千次阅读 2013-05-29 14:50:02
    什么是长连接? 其实长连接是相对于通常的短连接而说的,也就是长时间保持...这就要求长连接在没有数据通信时,定时发送数据包,以维持连接状态,短连接在没有数据传输时直接关闭就行了 什么时候用长连接,短连接?
  • 【PHP】 phpredis 长连接实现原理

    千次阅读 2019-10-05 22:36:21
    多年以前有个大佬问过一个问题,PHP的phpredis第三方扩展(客户端)怎么实现与redis服务端维持长连接,并且每个请求是怎么复用这些连接的,今天才突然想一探究竟,便翻了翻一下源码。PHP源码版本是php-7.2.19, ...
  • 前提: HTTP/1.0默认使用短连接,HTTP/1.1开始默认...长连接:在一个TCP连接上可以发送多个数据包,但是如果没有数据包发送时,也要双方发检测包以维持这个长连接;三次握手后连接,不断开连接,保持客户端和服务端...
  • 我们一般会选择长连接的通信方式而在实现长连接方式时,存在很多性能问题,如 长连接保活今天,我将 手把手教大家实现自适应的心跳保活机制,从而能高效维持长连接目录1. 长连接 介绍1.1 简介1.2 作用通过 长时间...
  • Socket 长连接与短连接,心跳

    千次阅读 2013-10-26 09:45:29
    所谓长连接,指在一个TCP连接上可以连续发送多个数据包,在TCP连接保持期间,如果没有数据包发送,需要双方发检测包以维持此连接,一般需要自己做在线维持。  短连接是指通信双方有数据交互时,就建立一个TCP连接...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 91,915
精华内容 36,766
关键字:

维持长连接