精华内容
下载资源
问答
  • 如何优雅关闭 Netty服务

    万次阅读 2021-08-02 13:38:10
    Netty服务如何优雅关闭 Netty 中将写出数据分成了两个部分 第一部分先缓存起来 第二部分再通过 Java 原生的 SocketChannel 发送出去。 问题

    Netty 中将写出数据分成了两个部分
    第一部分先缓存起来
    第二部分再通过 Java 原生的 SocketChannel 发送出去。

    问题

    try {
        // 省略其他代码
        // 9. 等待服务端监听端口关闭,这里会阻塞主线程
        f.channel().closeFuture().sync();
    } finally {
        // 10. 优雅地关闭两个线程池
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
    

    通过 NioEventLoopGroup 的 shutdownGracefully () 来关闭服务器,

    那么,今天的问题来了

    1 shutdownGracefully () 内部的逻辑是怎样的?
    2 进行了哪些资源的释放?
    3 NioEventLoopGroup 如何知道所有的 EventLoop 都优雅关闭了呢?

    优雅关闭服务过程
    正常来说,服务是不会走到第 10 步的,除非出现异常,因为第 9 步的 sync () 会阻塞 main 线程。
    bossGroup.shutdownGracefully();

    // io.netty.util.concurrent.AbstractEventExecutorGroup#shutdownGracefully
    @Override
    public Future<?> shutdownGracefully() {
        // 调用重载方法
        // 第一个参数为静默周期,默认2秒
        // 第二个参数为超时时间,默认15秒
        return shutdownGracefully(DEFAULT_SHUTDOWN_QUIET_PERIOD, DEFAULT_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
    }
    
    // io.netty.util.concurrent.MultithreadEventExecutorGroup#shutdownGracefully
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        for (EventExecutor l: children) {
            // 调用孩子的shutdownGracefully()
            // 这里的EventExecutor无疑就是NioEventLoop
            l.shutdownGracefully(quietPeriod, timeout, unit);
        }
        // 返回的是NioEventLoopGroup的terminationFuture
        return terminationFuture();
    }
    
    可以看到,内部其实是调用了每个 NioEventLoop 的 shutdownGracefully () 方法,最后返回了 NioEventLoopGroup 的 terminationFuture。
    
    // io.netty.util.concurrent.SingleThreadEventExecutor#shutdownGracefully
    @Override
    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        // 参数检验
        ObjectUtil.checkPositiveOrZero(quietPeriod, "quietPeriod");
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        ObjectUtil.checkNotNull(unit, "unit");
    
        // 其它线程正在执行关闭,直接返回
        if (isShuttingDown()) {
            return terminationFuture();
        }
    
        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            // 再次检查其它线程正在执行关闭,直接返回
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            oldState = state;
            // 我们是在main线程中执行的
            // 所以不在EventLoop中
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        // 显然,我们的程序已经启动了
                        // 所以,新状态为ST_SHUTTING_DOWN
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                }
            }
            // key1,更新状态成功,退出循环
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        // key2,修改NioEventLoop的属性标识
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);
    
        if (ensureThreadStarted(oldState)) {
            return terminationFuture;
        }
    
        // key3,添加一个空任务,唤醒EventLoop
        if (wakeup) {
            taskQueue.offer(WAKEUP_TASK);
            if (!addTaskWakesUp) {
                wakeup(inEventLoop);
            }
        }
    
        // key4,返回NioEventLoop的terminationFuture
        return terminationFuture();
    }
    // io.netty.channel.nio.NioEventLoop#wakeup
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (!inEventLoop && nextWakeupNanos.getAndSet(AWAKE) != AWAKE) {
            selector.wakeup();
        }
    }
    
    

    这个方法就结束了,让我们整理下这个方法的主要逻辑:

    1 修改状态为 ST_SHUTTING_DOWN;
    2 修改两个属性,一个是静默周期,一个是超时时间;
    3 往队列中放了一个空任务,并唤醒 NioEventLoop,实际上是唤醒的 selector,也就是 selector.select () 的位置;
    4 返回 NioEventLoop 的 terminationFuture,这个 terminationFuture 怎么跟 NioEventLoopGroup 的 terminationFuture 联系起来的呢?或者说,NioEventLoopGroup 怎么知道所有的 NioEventLoop 都关闭成功了呢?

    可以看到,这个方法中并没有对 “优雅关闭” 做什么实质的处理,那么,真正关闭的处理逻辑在哪里呢?
    经过前面的学习,我们知道,NioEventLoop 相当于一个线程,它的 run () 方法中有对 selector 的轮询,而这里又唤醒了 selector,所以,我们大胆猜测,处理逻辑应该是在 NioEventLoop 的 run () 方法中,让我们再仔细研究一下这个 run () 方法:

    @Override
    protected void run() {
        int selectCnt = 0;
        // 轮询
        for (;;) {
            try {
                int strategy;
                try {
                    // 策略,这里面会检测如果有任务,调用的是selectNow(),也就是不阻塞
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
                    switch (strategy) {
                        case SelectStrategy.CONTINUE:
                            continue;
                        case SelectStrategy.BUSY_WAIT:
                        case SelectStrategy.SELECT:
                            long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
                            if (curDeadlineNanos == -1L) {
                                curDeadlineNanos = NONE; // nothing on the calendar
                            }
                            nextWakeupNanos.set(curDeadlineNanos);
                            try {
                                if (!hasTasks()) {
                                    // 如果没有任务,才会调用这里的select(),默认是阻塞的
                                    // 通过前面的唤醒,唤醒的是这里的select()
                                    strategy = select(curDeadlineNanos);
                                }
                            } finally {
                                nextWakeupNanos.lazySet(AWAKE);
                            }
                        default:
                    }
                
                    // 省略其他内容
            }
            try {
                // 判断是否处于关闭中
                if (isShuttingDown()) {
                    // key1,关闭什么?
                    closeAll();
                    // key2,确定关闭什么?
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }
    

    针对优雅关闭的时候,肯定不能让 run () 方法阻塞在 select () 上,所以前面向队列中添加了一个空任务,当有任务的时候,它调用的就是 selectNow () 方法,selectNow () 方法不管有没有读取到任务都会直接返回,不会产生任何阻塞,最后,程序逻辑会走到 isShuttingDown() 这个判断的地方,这里有两个比较重要的方法:

    1.closeAll (),关闭了什么?
    2 confirmShutdown (),确定关闭了什么?

    我们先来看看 closeAll() 这个方法,这里有个调试技巧,请看源码:

    // io.netty.channel.nio.NioEventLoop#closeAll
    private void closeAll() {
        // 再次调用selectNow()方法
        selectAgain();
        // selector中所有的SelectionKey
        Set<SelectionKey> keys = selector.keys();
        Collection<AbstractNioChannel> channels = new ArrayList<AbstractNioChannel>(keys.size());
        for (SelectionKey k: keys) {
            // 最好在这里打个断点,因为有些NioEventLoop里面是没有绑定Channel的,所以没有Channel需要关闭
            // 在这里打个断点,NioServerSocketChannel对应的NioEventLoop肯定会到这里
            // 这里取出来的附件就是NioServerSocketChannel
            Object a = k.attachment();
            if (a instanceof AbstractNioChannel) {
                // 把要关闭的Channel加到集合中
                channels.add((AbstractNioChannel) a);
            } else {
                // NioTask当前版本没有地方使用
                k.cancel();
                @SuppressWarnings("unchecked")
                NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
                invokeChannelUnregistered(task, k, null);
            }
        }
    
        // 遍历集合
        for (AbstractNioChannel ch: channels) {
            // key,调用Channel的unsafe进行关闭
            ch.unsafe().close(ch.unsafe().voidPromise());
        }
    }
    

    closeAll() 方法中主要就是对 Channel 进行关闭,这些 Channel 的关闭最后又是调用他们的 unsafe 进行关闭的,让我们跟踪到 unsafe 里面看看到底做了哪些操作:

    // io.netty.channel.AbstractChannel.AbstractUnsafe#close
    @Override
    public final void close(final ChannelPromise promise) {
        assertEventLoop();
    
        ClosedChannelException closedChannelException = new ClosedChannelException();
        // 调用重载方法
        close(promise, closedChannelException, closedChannelException, false);
    }
    // io.netty.channel.AbstractChannel.AbstractUnsafe#close
    private void close(final ChannelPromise promise, final Throwable cause,
                       final ClosedChannelException closeCause, final boolean notify) {
        if (!promise.setUncancellable()) {
            return;
        }
    
        // 使用closeInitiated防止重复关闭
        // closeInitiated初始值为false,所以这个大if可以跳过
        if (closeInitiated) {
            // 省略不相关代码
            return;
        }
    
        // 下面的逻辑只会执行一次
        closeInitiated = true;
    
        final boolean wasActive = isActive();
        // 写出数据时的缓存
        final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
        // 置为空表示不允许再写出数据了
        this.outboundBuffer = null;
        // 默认为空
        Executor closeExecutor = prepareToClose();
        if (closeExecutor != null) {
            // 对于NioServerSocketChannel,默认为空,不会走到这,省略这段代码
        } else {
            try {
                // key,一看就是干正事的方法
                doClose0(promise);
            } finally {
                if (outboundBuffer != null) {
                    // todo 未发送的数据将失败,为什么不让它们发送出去呢?
                    outboundBuffer.failFlushed(cause, notify);
                    outboundBuffer.close(closeCause);
                }
            }
            if (inFlush0) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        // 触发channelInactive()和channelDeregister()方法
                        fireChannelInactiveAndDeregister(wasActive);
                    }
                });
            } else {
                // 触发channelInactive()和channelDeregister()方法
                fireChannelInactiveAndDeregister(wasActive);
            }
        }
    }
    private void doClose0(ChannelPromise promise) {
        try {
            // 干正事的方法
            doClose();
            // 将closeFuture设置为已关闭
            closeFuture.setClosed();
            // 将promise设置为已成功
            safeSetSuccess(promise);
        } catch (Throwable t) {
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
    @Override
    protected void doClose() throws Exception {
        // 关闭Java原生的Channel,我们这里为ServerSocketChannel
        javaChannel().close();
    }
    // io.netty.channel.nio.AbstractNioChannel#doDeregister
    // 这个方法是在fireChannelInactiveAndDeregister()中调用的
    @Override
    protected void doDeregister() throws Exception {
        // 取消SelectionKey
        eventLoop().cancel(selectionKey());
    }
    

    总结一下,close() 的过程关闭了哪些资源:

    1 写出数据的缓存置空,不允许再写出数据;
    2 缓存中未发送的数据将失败;
    3 关闭 Java 原生的 Channel;
    4 closeFuture 置为关闭状态;
    5 取消 Channel 关联的 SelectionKey;
    6 调用 channelInactive () 和 channelDeregister () 方法;

    到这里,整个 closeAll() 就完了,我们再来看看 confirmShutdown():

    protected boolean confirmShutdown() {
        // 不是正在关闭,返回false
        if (!isShuttingDown()) {
            return false;
        }
    
        if (!inEventLoop()) {
            throw new IllegalStateException("must be invoked from an event loop");
        }
    
        // 取消定时任务
        cancelScheduledTasks();
    
        // 设置优雅关闭服务的开始时间
        if (gracefulShutdownStartTime == 0) {
            gracefulShutdownStartTime = ScheduledFutureTask.nanoTime();
        }
    
        // 运行所有任务和所有shudown的钩子任务
        if (runAllTasks() || runShutdownHooks()) {
            // 有任何一个任务运行,都会进入这里
            // 已经关闭了,返回true
            if (isShutdown()) {
                return true;
            }
    
            // 如果静默周期为0,返回true
            if (gracefulShutdownQuietPeriod == 0) {
                return true;
            }
            // 否则添加一个空任务,返回false
            taskQueue.offer(WAKEUP_TASK);
            return false;
        }
    
        // 没有任何任务运行
        final long nanoTime = ScheduledFutureTask.nanoTime();
    
        // 如果已经关闭,或者超时了,返回true
        if (isShutdown() || nanoTime - gracefulShutdownStartTime > gracefulShutdownTimeout) {
            return true;
        }
    
        // 如果当前时间减去上一次运行的时间在静默周期以内
        if (nanoTime - lastExecutionTime <= gracefulShutdownQuietPeriod) {
            // 添加一个空任务,并休眠100ms
            taskQueue.offer(WAKEUP_TASK);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                // Ignore
            }
            // 返回false
            return false;
        }
    
        // 超过了静默周期,返回true
        return true;
    }
    
    protected void run() {
        for(;;) {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;
                }
            }
        }
    }
    

    好了,到这里,今天所有的问题就都解决完了,让我们来总结一下 “优雅关闭服务” 的过程:

    1 优雅关闭服务分成两个部分,调用 shutdownGracefully (),只是修改一些状态和属性的值,并唤醒 NioEventLoop 中的 select () 方法;
    2 真正的处理逻辑在 NioEventLoop 的 run () 方法中;
    3 关闭的关键方法又分为 closeAll () 和 confirmShutdown () 两个方法;
    4 closeAll () 中主要是对 Channel 的关闭,跟 Channel 相关的资源释放都在这里,比如缓存消息的失败、SelectionKey 的取消、Java 原生 Channel 的关闭等;
    5 confirmShutdown () 中主要是对队列中的任务或者钩子任务进行处理,主要是通过一个叫做静默周期的参数来控制尽量执行完所有任务,但是,也不能无限期等待,所以,还有一个超时时间进行控制;
    6 接着,程序的逻辑来到了真正启动线程的地方,也就是 doStartThread () 方法,它里面有个非常重要的方法 cleanup (),并清理了所有 ThreadLocal 相关的资源,最后把 NioEventLoop 的状态设置为 ST_TERMINATED;
    7 cleanup () 方法中主要是对 Selector 进行关闭;
    8 然后,我们分析了 NioEventLoopGroup 与 NioEventLoop 在程序终止时相关的联系,包括 threadLock 和 terminationFuture 等;
    9 最后,我们验证了 Netty 中的优雅关闭服务的时候确实不会真正发送缓存中的内容。

    还是非常枯燥。抽空补充一些有意思的资料。

    不妨我们假设一下。如果我们是设计者。我们怎么去优雅的关闭呢?或许会有不同的感受! TODO 补充一些有趣的知识点,设计点。而不是为了读源码而去读 TODO 。。。。

    题外话:Tomcat 与 Netty 的区别?

    Tomcat 与 Spring 的关系 ?

    Spring 应用本身就是一个 Servlet,而 Tomcat 和 Jetty 这样的 Web 容器,负责加载和运行 Servlet。

    在这里插入图片描述

    展开全文
  • socket优雅关闭连接

    2021-08-19 17:20:02
    优雅关闭连接前言closeshutdown若被动方一直不发第三次挥手代码 前言 今天突然发现最近搞的那个HTTP服务器的一个bug。 以前有个突然服务器崩溃的问题,不过是偶然发生的,所以一直搁置没有解决。 今天调试过程中...


    前言

    今天突然发现最近搞的那个HTTP服务器的一个bug。
    以前有个突然服务器崩溃的问题,不过是偶然发生的,所以一直搁置没有解决。
    今天调试过程中突然发现一个致命问题,就是客户端发的HTTP请求在最后一次执行后直接调用了close,而服务端返回的数据导致这个客户端无法接受到,经百度发现确实存在使得进程直接退出的问题。


    close

    close函数或者shutdown函数调用后都会向对端发送FIN。
    一般是客户端作为发起断开连接的主动方,而close函数调用后会将调用者的读写端全部关闭,因此,当被动关闭方要进行读写处理就会触发异常。

    主动关闭端close以后收到被动关闭端发来的数据,会直接回复RST,内核连接断开,但此时被动关闭端的应用层并不知道已经断开。
    若被动关闭方调用的是read,则会触发RST机制,然后断开。
    若调用的是write,则会产生SIGPIPE中断,不进行捕捉则默认退出进程。


    shutdown

    函数原型

    int shutdown(int sock, int howto);
    

    其中 howto 为断开方式。有以下取值:

    SHUT_RD:关闭读。这时应用层不应该再尝试接收数据,内核协议栈中就算接收缓冲区收到数据也会被丢弃。

    SHUT_WR:关闭写。如果发送缓冲区中还有数据没发,会将将数据传递到目标主机。

    SHUT_RDWR:关闭读和写。相当于close()了。

    FIN 是指"我不再发送数据",因此shutdown() 关闭读不会给对方发FIN, 关闭写才会发FIN。


    两函数的区别

    close函数会关闭套接字,如果有其他进程共享,那么这个套接字仍然是打开的,可以读写,并不会发生四次挥手;

    shutdown则会根据how选项切断进程共享的套接字的该功能,比如所有试图读的进程都会接收到EOF标识,所有试图写的进程将会检测到SIGPIPE信号;

    注意:showdown后仍然要调用close关闭socket


    若被动方一直不发第三次挥手

    第三次挥手,是由被动方主动触发的,比如调用close()。

    如果由于代码错误或者其他一些原因,被动方就是不执行第三次挥手。

    这时候,主动方会根据自身第一次挥手的时候用的是 close() 还是 shutdown(fd, SHUT_WR) ,有不同的行为表现。

    如果是 shutdown(fd, SHUT_WR) ,说明主动方其实只关闭了写,但还可以读,此时会一直处于 FIN-WAIT-2, 死等被动方的第三次挥手。

    如果是 close(), 说明主动方读写都关闭了,这时候会处于 FIN-WAIT-2一段时间,这个时间由 net.ipv4.tcp_fin_timeout 控制,一般是 60s,这个值正好跟2MSL一样 。超过这段时间之后,状态不会变成 TIME-WAIT,而是直接变成CLOSED


    代码

    #include<pthread.h>
    #include<unistd.h>
    #include<errno.h>
    #include<sys/socket.h>
    #include<stdlib.h>
    #include<ctype.h>
    #include<arpa/inet.h>
    #include<stdio.h>
    #include<signal.h>
    #include<string.h>
    
    void sysError(const char* str){
        perror(str);
        exit(1);
    }
    
     void sign_func(){
        printf("Catch SIGPIPE\n");
        sleep(1);
     }
    
    int main(){
        struct sockaddr_in serveraddr, clientaddr;
        serveraddr.sin_family = AF_INET;
        serveraddr.sin_port = htons(6666);
        serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
        int lfd,cfd;
    
        lfd = socket(AF_INET,SOCK_STREAM,0);
        if(lfd==-1) sysError("socket error ");
        if(bind(lfd,(struct sockaddr*)&serveraddr,sizeof(serveraddr))==-1) sysError("bind error ");
        if(listen(lfd,128)==-1) sysError("listen error ");
    
        socklen_t clientaddrlen=sizeof(clientaddr);
        if((cfd = accept(lfd,(struct sockaddr*)&clientaddr,&(clientaddrlen)))==-1) sysError("accept error ");
    
        struct sigaction sa;
        memset(&sa, '\0', sizeof(sa));
        sa.sa_handler = sign_func; 
        sigfillset(&sa.sa_mask);       
        sigaction(SIGPIPE, &sa, NULL);
        
        char buffer[BUFSIZ];
        while(1){
            int n = read(cfd,buffer,sizeof(buffer));
            write(STDOUT_FILENO,buffer,n);
            for(int i=0;i<n;i++){
                buffer[i] = toupper(buffer[i]);
                sleep(1);
            } 
            write(cfd,buffer,n);
        }
        close(cfd);
        close(lfd);
        return 0;
    }
    

    结果如下:

    客户端发完数据不等服务端回复立刻关闭,强制退出效果和close一样
    在这里插入图片描述
    服务端等待数据处理完成尝试write写时就会产生SIGPIPE
    服务端任务逻辑是将客户端发来的数据小写转大写,sleep的作用表示服务端要处理很长时间。
    在这里插入图片描述


    epoll试试

    #include<pthread.h>
    #include<unistd.h>
    #include<errno.h>
    #include<sys/socket.h>
    #include<stdlib.h>
    #include<ctype.h>
    #include<arpa/inet.h>
    #include<stdio.h>
    #include<signal.h>
    #include<string.h>
    #include<sys/epoll.h>
    
    #define MAXLINE 8192
    #define SERV_PORT 6666
    #define OPEN_MAX 5000
    
    void sysError(const char* str){
        perror(str);
        exit(1);
    }
    
     void sign_func(){
        printf("Catch SIGPIPE\n");
        sleep(1);
     }
    
    int main(){
        int listenfd, connfd, sockfd;
        int n, num = 0;
        ssize_t nready, efd, res;
        char buf[MAXLINE], str[INET_ADDRSTRLEN];
        socklen_t cli_len;
    
        struct sockaddr_in cliaddr, servaddr;
        struct epoll_event tep, ep[OPEN_MAX]; 
    
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        int opt = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(SERV_PORT);
        bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
        listen(listenfd, 20);
    
        struct sigaction sa;
        memset(&sa, '\0', sizeof(sa));
        sa.sa_handler = SIG_IGN; 
        sigfillset(&sa.sa_mask);       
        sigaction(SIGPIPE, &sa, NULL);
    
        efd = epoll_create(OPEN_MAX); 
        if (efd == -1)
            sysError("epoll_create error");
    
        tep.events = EPOLLIN;
        tep.data.fd = listenfd;                              
        res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep); 
        if (res == -1)
            sysError("epoll_ctl error");
    
        while (1)
        {
            nready = epoll_wait(efd, ep, OPEN_MAX, -1); 
            if (nready == -1)
                sysError("epoll_wait error");
    
            for (int i = 0; i < nready; ++i)
            {
                if (!(ep[i].events & EPOLLIN)) 
                    continue;
    
                if (ep[i].data.fd == listenfd) 
                {
                    cli_len = sizeof(cliaddr);
                    connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cli_len);
    
                    printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));
                    printf("cfd %d---client %d\n", connfd, ++num);
    
                    tep.events = EPOLLIN;
                    tep.data.fd = connfd;
                    res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
                    if (res == -1)
                        sysError("epoll_ctl error");
                }
                else
                {
                    sockfd = ep[i].data.fd;
                    n = read(sockfd, buf, MAXLINE);
    
                    // if (n == 0)
                    // {
                    //     res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
                    //     if (res == -1)
                    //         sysError("epoll_ctl error");
    
                    //     close(sockfd);
                    //     printf("client[%d] closed connection\n", sockfd);
                    // }
                    // else if (n < 0)
                    // {
                    //     perror("read n<0 error:");
                    //     res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
                    //     close(sockfd);
                    // }
                    // else
                    // {
                        for (int i = 0; i < n; ++i){
                            buf[i] = toupper(buf[i]);
                            sleep(1);
                        } 
                        //write(STDOUT_FILENO, buf, n);
                        write(sockfd, buf, n);
                    // }
                }
            }
        }
        close(listenfd);
        return 0;
    }
    

    同时开启两个客户端进行连接,其中一个客户端发送完立刻断开
    结果如下:
    可以看到,其中一个客户端断开以后,服务端无论是通过信号捕捉还是SIG_IGN忽略信号,其他连接再次建立或者传输数据均不受影响。

    而没有这两个动作处理的话就会使得服务器直接退出,这也验证了前言中我那个服务器偶然崩溃的现象。

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    把上面例子中的read函数注释取消掉再次实验:

    #include<pthread.h>
    #include<unistd.h>
    #include<errno.h>
    #include<sys/socket.h>
    #include<stdlib.h>
    #include<ctype.h>
    #include<arpa/inet.h>
    #include<stdio.h>
    #include<signal.h>
    #include<string.h>
    #include<sys/epoll.h>
    
    #define MAXLINE 8192
    #define SERV_PORT 6666
    #define OPEN_MAX 5000
    
    void sysError(const char* str){
        perror(str);
        exit(1);
    }
    
     void sign_func(){
        printf("Catch SIGPIPE\n");
        sleep(1);
     }
    
    int main(){
        int listenfd, connfd, sockfd;
        int n, num = 0;
        ssize_t nready, efd, res;
        char buf[MAXLINE], str[INET_ADDRSTRLEN];
        socklen_t cli_len;
    
        struct sockaddr_in cliaddr, servaddr;
        struct epoll_event tep, ep[OPEN_MAX]; 
    
        listenfd = socket(AF_INET, SOCK_STREAM, 0);
        int opt = 1;
        setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 
        bzero(&servaddr, sizeof(servaddr));
        servaddr.sin_family = AF_INET;
        servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
        servaddr.sin_port = htons(SERV_PORT);
        bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr));
        listen(listenfd, 20);
    
        // struct sigaction sa;
        // memset(&sa, '\0', sizeof(sa));
        // sa.sa_handler = SIG_IGN; 
        // sigfillset(&sa.sa_mask);       
        // sigaction(SIGPIPE, &sa, NULL);
    
        efd = epoll_create(OPEN_MAX); 
        if (efd == -1)
            sysError("epoll_create error");
    
        tep.events = EPOLLIN;
        tep.data.fd = listenfd;                              
        res = epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &tep); 
        if (res == -1)
            sysError("epoll_ctl error");
    
        while (1)
        {
            nready = epoll_wait(efd, ep, OPEN_MAX, -1); 
            if (nready == -1)
                sysError("epoll_wait error");
    
            for (int i = 0; i < nready; ++i)
            {
                if (!(ep[i].events & EPOLLIN)) 
                    continue;
    
                if (ep[i].data.fd == listenfd) 
                {
                    cli_len = sizeof(cliaddr);
                    connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cli_len);
    
                    printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port));
                    printf("cfd %d---client %d\n", connfd, ++num);
    
                    tep.events = EPOLLIN;
                    tep.data.fd = connfd;
                    res = epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &tep);
                    if (res == -1)
                        sysError("epoll_ctl error");
                }
                else
                {
                    sockfd = ep[i].data.fd;
                    n = read(sockfd, buf, MAXLINE);
    
                    if (n == 0)
                    {
                        res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
                        if (res == -1)
                            sysError("epoll_ctl error");
                        sleep(6);
    
                        close(sockfd);
                        printf("client[%d] closed connection\n", sockfd);
                    }
                    else if (n < 0)
                    {
                        perror("read n<0 error:");
                        res = epoll_ctl(efd, EPOLL_CTL_DEL, sockfd, NULL);
                        close(sockfd);
                    }
                    else
                    {
                        for (int i = 0; i < n; ++i){
                            buf[i] = toupper(buf[i]);
                            sleep(1);
                        } 
                        //write(STDOUT_FILENO, buf, n);
                        write(sockfd, buf, n);
                        write(sockfd, buf, n);
                    }
                }
            }
        }
        close(listenfd);
        return 0;
    }
    

    read的注释取消掉,注意这次代码有两次write
    因为当主动方调用close后,被动方调用write时,主动方第一次会产生一个,当前接收到的数据丢弃并返回RST的动作,此时被动端收到RST直接关闭。再次调用write向主动方写,就会触发SIGPIPE导致进程退出。

    展开全文
  • 优雅关闭web服务DBHelper, err = gorm.Open("mysql", "root:root@(115.159.59.129:3306)/test?charset=utf8&parseTime=True&loc=Local")if err != nil {log.Fatal("数据库初始化错误", err) //log.Fatal输出...

    优雅关闭web服务

    DBHelper, err = gorm.Open("mysql", "root:root@(115.159.59.129:3306)/test?charset=utf8&parseTime=True&loc=Local")

    if err != nil {

    log.Fatal("数据库初始化错误", err) //log.Fatal输出日志并且退出主程序

    return

    }

    优雅的关闭server

    part 1初始化退出信号

    package src

    import (

    "context"

    "log"

    "os"

    "os/signal"

    "time"

    )

    var ServerSigChan chan os.Signal

    var ctx context.Context

    func init() {

    ServerSigChan = make(chan os.Signal)

    ctx, _ = context.WithTimeout(context.Background(), 10*time.Second)

    go ServerNotify(ctx)

    }

    func ServerNotify(ctx context.Context) {

    signal.Notify(ServerSigChan)

    select {

    case

    ServerSigChan

    case

    ServerSigChan

    }

    }

    func ShutDownServer(err error) {

    log.Println(err)

    ServerSigChan

    log.Println("数据库优雅退出")

    os.Exit(1)

    }

    part2 初始化数据库连接

    package src

    import (

    "github.com/jinzhu/gorm"

    "time"

    )

    var DBHelper *gorm.DB

    var err error

    func InitDB() {

    DBHelper, err = gorm.Open("mysql", "root:root@(115.159.59.129:3306)/test?charset=utf8&parseTime=True&loc=Local")

    if err != nil {

    ShutDownServer(err) //关闭连接

    }

    DBHelper.SingularTable(true)

    DBHelper.DB().SetMaxIdleConns(10)

    DBHelper.DB().SetMaxOpenConns(100)

    DBHelper.DB().SetConnMaxLifetime(time.Hour)

    }

    启动项目

    func main(){

    router := gin.Default()

    go InitDB() //协程初始化数据库连接

    if v, ok := binding.Validator.Engine().(*validator.Validate); ok {

    v.RegisterValidation("topicurl", TopicUrl)

    }

    v1 := router.Group("/v2/mtopics")

    {

    v2.DELETE("/mtopics", DelMTopic)

    }

    router.Run()

    }

    展开全文
  • 当把打包好的jar包发布到服务器,并通过java -jar运行,一般要把springboot项目关闭大多数都是先找到项目的pid,然后直接kill pid,不过这种方法在特殊需求场景下不太合适(不安全),同时也不优雅。下面通过actuator...

    当把打包好的jar包发布到服务器,并通过java -jar运行,一般要把springboot项目关闭大多数都是先找到项目的pid,然后直接kill pid,不过这种方法在特殊需求场景下不太合适(不安全),同时也不优雅。下面通过actuator来让springboot项目关闭优雅化。

    先导入maven依赖

    org.springframework.boot

    spring-boot-starter-actuator

    然后切换到application.yaml文件,添加management:

    server:

    # 自定义端口

    port: 12358

    # 不允许远程管理连接(不允许外部调用保证安全)

    address: 127.0.0.1

    endpoints:

    web:

    exposure:

    # 公开的端点

    include: shutdown

    # 自定义管理端点的前缀(保证安全)

    base-path: /myshutdown

    endpoint:

    shutdown:

    enabled: true

    这样配置后,目前的关闭地址是: http://127.0.0.1:12358/myshutdown/shutdown

    只是只需要在本地运行: curl -X POST   http://127.0.0.1:12358/myshutdown/shutdown

    命令就可以关闭spring boot 项目了

    展开全文
  • 一、关闭问题 在“单体应用”复杂到一定程度后,一般会进行系统拆分,也就是时下流行的微服务架构。服务拆分之后,自然就需要协同,于是 RPC 框架就出来了,它用来解决各个子系统之间的通信问题。 拆分之后可以...
  • Netty如何实现服务的优雅关闭

    千次阅读 2020-12-24 16:26:14
    1 优雅关闭的常见作用 最常见的,比如业务开发中,服务突然异常,刚进来的用户请求还在,通过优雅关闭,给他们 30s 时间继续执行,以免直接报错出去。 2 Netty 优雅关闭流程图 修改 NioEventLoop 的 State 标志位 ...
  • SpringBoot 2.3.0 优雅关闭 shutdown graceful

    千次阅读 2021-01-13 17:57:56
    缘起最近看到Springboot 新版有了 优雅关闭的新特性,于是学习验证了下学习很简单 ,在application.yml 中配置 server.shutdown=graceful即可 ,启用该选项,web容器在关闭时会有始有终的处理完已经接收到的请求,...
  • 线程池优雅停机方案 简介 在开发中使用线程池去执行异步任务是比较普遍的操作,然而虽然有些异步操作我们并不十分要求可靠性和实时性,但总归业务还是需要的。如果在每次的服务发版过程中,我们不去介入线程池的停机...
  • java优雅关闭io流

    2021-03-06 06:38:10
    教科书关闭的方法是每个都需要判断是否为空,每个都调用close。而且每个都需要在自己的try catch里面调用如下图InputStream is=null;InputStreamReader isr=null;BufferedReader read=null;try {is = new ...
  • SpringBoot自带优雅关闭

    2021-08-04 10:26:55
    使用 #开启优雅关闭 服务器将支持的关闭类型 : GRACEFUL:优雅 IMMEDIATE:即时 server.shutdown=GRACEFUL #关闭缓冲时间 任何阶段(具有相同“阶段”值的 SmartLifecycle bean 组)关闭的超时时间 spring.lifecycle...
  • 除了上面这种延时队列的方式外,Redisson还提供了另一种方式,也能优雅关闭订单,方法很简单,就是通过对将要过期的key值的监听。 创建一个类继承KeyExpirationEventMessageListener,重写其中的onMessage方法,...
  • 网络编程中,当一段发送完数据,想要关闭连接时,一般调用close().但close是不“优雅”的,首先看一下close()到底执行了什么。 一、close 以服务端调用close(sockfd)为例: 使得sockfd的引用计数减1,当前进程不能再...
  • JAVA线程池如何优雅关闭 Effective JAVA 第三版并发部分提起了线程池优雅关闭的问题,意识到之前的线程关闭知识还不完善。讨论如下: 1.shutdown() 基本意思是:启动有序关闭,其中先前提交的任务将被执行关闭,...
  • TARS 服务优雅关闭

    2021-04-01 12:49:56
    TARS 优雅关闭 TARS是腾讯开源的微服务调用框架,框架基础设施(RegistryServer,NodeServer,ConfigServer等)是由C++编写,框架上的微服务支持C++、Java、Go等。在众多介绍TARS的使用文章中都是介绍服务的开发...
  • php的普通server(非swoole)因为是解释型的,更新代码无需...而golang实现的常驻内存server就需要做到平滑关闭,否则并发量大的时候,因为重启而导致部分连接终端,从而导致数据库脏乱,也会给业务带来很大的麻烦。
  • 优雅停机 主要应用在版本更新的时候,为了等待正在工作的线程全部执行完毕,然后再停止 2.3及以上版本 1.maven引入依赖 <dependency> <groupId>org.springframework.boot</groupId> <...
  • 那也不一定,毕竟所谓的优雅关闭,其实另一面就意味这关闭得慢,因此项目的优雅关闭得看项目的核心程度,换言之其实就是看该项目处理的数据是不是核心数据,其实项目的最终本质,是对数据的处理。 如何实现优雅关闭 ...
  • 总结 这篇文章主要讲解了如何优雅关闭一个线程,首先我们应该避免使用stop()方法,这种方法简单粗暴但具有不确定性,容易造成bug,正确的做法是通过两阶段终止方案,先发出中断请求,设置线程为中断状态,当线程...
  • 优雅关闭(半关闭)

    2021-08-10 20:11:17
    } /*关闭输出流,但是还可以接收数据,输入流没有关闭,执行这个函数以后, 客户端会收到一个EOF*/ shutdown(clnt_sd, SHUT_WR); read(clnt_sd, buf, BUF_SIZE); printf("Message from client: %s \n", buf...
  • 静态代码执行 //创建钩子线程优雅关闭线程池 Runtime.getRuntime().addShutdownHook(new Thread(() -> { executorService.shutdown(); log.info("[Thread1] shutdown success"); }));
  • Tomcat优雅关闭之路

    2021-02-10 10:47:21
    本文通过阅读Tomcat启动和关闭流程的源码,深入分析不同的Tomcat关闭方式背后的原理,让开发人员能够了解在使用不同的关闭方式时需要注意的点,避免因JVM进程异常退出导致的各种非预见性错误。0. 写在前面tomcat ...
  • 简介:本文将介绍优雅关闭 gRPC 微服务。在进程收到关闭信号时,我们需要关闭后台运行的逻辑,比如,MySQL 连接等等。 介绍 本文将介绍优雅关闭 gRPC 微服务。 什么是优雅关闭? 在进程收到关闭信号时,我们需要...
  • 但是JVM对外部资源的引用却无法自动回收,例如数据库连接,网络连接以及输入输出IO流等,这些连接就需要我们手动去关闭,不然会导致外部资源泄露,连接池溢出以及文件被异常占用等。传统的手动释放外部资源一般放在...
  • Golang优雅关闭channel

    2021-11-10 14:45:39
    Golang优雅关闭channel的方法示例 https://www.jb51.net/article/128530.htm 奇怪的是WIN10跑他的代码 我没有问题!! package main import ( "fmt" "sync" "time" ) func main() { jobs := make(chan int...
  • 优雅关闭JAVA应用

    2021-10-15 18:26:24
    2 spring优雅关闭的原理就是利用JVM提供的钩子函数 Runtime.getRuntime().addShutdownHook(Thread t); 3 shutdown一般会做哪几件事 1 回收资源 (1)线程池 (2)一些注册(dubbo的接口暴露注册) 2 应用...
  • Java 程序优雅关闭

    2021-04-23 15:38:37
    JVM支持在程序kill的时候根据kill 信号进行优雅关闭。 首先我们需要在Runtime.getRuntime().addShutdownHook(thread);中注册要执行的内容。 public class GracefulTest { public static void main(String[] args) ...
  • 概述优雅关闭:在关闭前,执行正常的关闭过程,释放连接和资源,如我们操作系统执行 shutdown。目前业务系统组件众多,互相之间调用关系也比较复杂,一个组件的下线、关闭会涉及到多个组件 对...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 59,568
精华内容 23,827
关键字:

优雅关闭