精华内容
下载资源
问答
  • goroutine
    千次阅读
    2018-08-21 16:51:50

    并发概念

      回到在 Windows 和 Linux 出现之前的古老年代,程序员在开发程序时并没有并发的概念,因为命令式程序设计语言是以串行为基础的,程序会顺序执行每一条指令,整个程序只有一个执行上下文,即一个调用栈,一个堆。并发则意味着程序在运行时有多个执行上下文,对应着多个调用栈。我们知道每一个进程在运行时,都有自己的调用栈和堆,有一个完整的上下文,而操作系统在调度进程的时候,会保存被调度进程的上下文环境,等该进程获得时间片后,再恢复该进程的上下文到系统中。

      从整个操作系统层面来说,多个进程是可以并发的,那么并发的价值何在?下面我们先看以下几种场景。

    • 一方面我们需要灵敏响应的图形用户界面,一方面程序还需要执行大量的运算或者 IO 密集操作,而我们需要让界面响应与运算同时执行。
    • 当我们的 Web 服务器面对大量用户请求时,需要有更多的Web 服务器工作单元来分别响应用户。
    • 我们的事务处于分布式环境上,相同的工作单元在不同的计算机上处理着被分片的数据。
    • 计算机的 CPU 从单内核(core)向多内核发展,而我们的程序都是串行的,计算机硬件的能力没有得到发挥。
    • 我们的程序因为 IO 操作被阻塞,整个程序处于停滞状态,其他 IO 无关的任务无法执行。

      从以上几个例子可以看到,串行程序在很多场景下无法满足我们的要求。下面归纳了并发程序的几条优点,可以发现并发的势在必行:

    • 并发能更客观地表现问题模型;
    • 并发可以充分利用 CPU 核心的优势,提高程序的执行效率;
    • 并发能充分利用 CPU 与其他硬件设备固有的异步性。

      现在我们已经意识到并发的好处了,那么到底有哪些方式可以实现并发执行呢?就目前而言,并发包含以下几种主流的实现模型。

    • 多进程。多进程是在操作系统层面进行并发的基本模式。同时也是开销最大的模式。在 Linux 平台上,很多工具链正是采用这种模式在工作。比如某个 Web 服务器,它会有专门的进程负责网络端口的监听和链接管理,还会有专门的进程负责事务和运算。这种方法的好处在于简单、进程间互不影响,坏处在于系统开销大,因为所有的进程都是由内核管理的。
    • 多线程。多线程在大部分操作系统上都属于系统层面的并发模式,也是我们使用最多的最有效的一种模式。目前,我们所见的几乎所有工具链都会使用这种模式。它比多进程的开销小很多,但是其开销依旧比较大,且在高并发模式下,效率会有影响。
    • 基于回调的非阻塞/异步 IO。这种架构的诞生实际上来源于多线程模式的危机。在很多高并发服务器开发实践中,使用多线程模式会很快耗尽服务器的内存和 CPU 资源。而这种模式通过事件驱动的方式使用异步 IO,使服务器持续运转,且尽可能地少用线程,降低开销,它目前在 Node.js 中得到了很好的实践。但是使用这种模式,编程比多线程要复杂,因为它把流程做了分割,对于问题本身的反应不够自然。
    • 协程。协程(Coroutine)本质上是一种用户态线程,不需要操作系统来进行抢占式调度,且在真正的实现中寄存于线程中,因此,系统开销极小,可以有效提高线程的任务并发性,而避免多线程的缺点。使用协程的优点是编程简单,结构清晰;缺点是需要语言的支持,如果不支持,则需要用户在程序中自行实现调度器。目前,原生支持协程的语言还很少。

      接下来先诠释一下传统并发模型的缺陷,之后再所说 goroutine 并发模型是如何逐一解决这些缺陷的。

      人的思维模式可以认为是串行的,而且串行的事务具有确定性。线程类并发模式在原先的确定性中引入了不确定性,这种不确定性给程序的行为带来了意外和危害,也让程序变得不可控。线程之间通信只能采用共享内存的方式。为了保证共享内存的有效性,我们采取了很多措施,比如加锁等,来避免死锁或资源竞争。实践证明,我们很难面面俱到,往往会在工程中遇到各种奇怪的故障和问题。

      我们可以将之前的线程加共享内存的方式归纳为共享内存系统,虽然共享内存系统是一种有效的并发模式,但它也暴露了众多使用上的问题。计算机科学家们在近40年的研究中又产生了一种新的系统模型,称为消息传递系统

      对线程间共享状态的各种操作都被封装在线程之间传递的消息中,这通常要求:发送消息时对状态进行复制,并且在消息传递的边界上交出这个状态的所有权。从逻辑上来看,这个操作与共享内存系统中执行的原子更新操作相同,但从物理上来看则非常不同。由于需要执行复制操作,所以大多数消息传递的实现在性能上并不优越,但线程中的状态管理工作通常会变得更为简单。

      最早被广泛应用的消息传递系统是由 C. A. R. Hoare 在他的 Communicating Sequential Processes 中提出的。在 CSP 系统中,所有的并发操作都是通过独立线程以异步运行的方式来实现的。这些线程必须通过在彼此之间发送消息,从而向另一个线程请求信息或者将信息提供给另一个线程。使用类似 CSP 的系统将提高编程的抽象级别。

      随着时间的推移,一些语言开始完善消息传递系统,并以此为核心支持并发,比如 Erlang。

    协程

      执行体是个抽象的概念,在操作系统层面有多个概念与之对应,比如操作系统自己掌管的进程(process)、进程内的线程(thread)以及进程内的协程(coroutine,也叫轻量级线程)。与传统的系统级线程和进程相比,协程的最大优势在于其轻量级,可以轻松创建上百万个而不会导致系统资源衰竭,而线程和进程通常最多也不能超过1万个。这也是协程也叫轻量级线程的原因。

      多数语言在语法层面并不直接支持协程,而是通过库的方式支持,但用库的方式支持的功能也并不完整,比如仅仅提供轻量级线程的创建、销毁与切换等能力。如果在这样的轻量级线程中调用一个同步 IO 操作,比如网络通信、本地文件读写,都会阻塞其他的并发执行轻量级线程,从而无法真正达到轻量级线程本身期望达到的目标。

      Go 语言在语言级别支持轻量级线程,叫 goroutine。Go 语言标准库提供的所有系统调用操作(当然也包括所有同步 IO 操作),都会出让 CPU 给其他 goroutine。这让事情变得非常简单,让轻量级线程的切换管理不依赖于系统的线程和进程,也不依赖于 CPU 的核心数量。

    goroutine

      goroutine是 Go 语言中的轻量级线程实现,由 Go 运行时(runtime)管理。你将会发现,它的使用出人意料得简单。

      假设我们需要实现一个函数 Add(),它把两个参数相加,并将结果打印到屏幕上,具体代码如下:

    func Add(x, y int) {
        z := x + y
        fmt.Println(z)
    }

      那么,如何让这个函数并发执行呢?具体代码如下:

    go Add(1, 1)

      是不是很简单?

      你应该已经猜到,go 这个单词是关键。与普通的函数调用相比,这也是唯一的区别。的确,go 是 Go 语言中最重要的关键字,这一点从 Go 语言本身的命名即可看出。

      在一个函数调用前加上 go 关键字,这次调用就会在一个新的 goroutine 中并发执行。当被调用的函数返回时,这个 goroutine 也自动结束了。需要注意的是,如果这个函数有返回值,那么这个返回值会被丢弃

    package main
    
    import "fmt"
    
    func Add(x, y int) {
        z := x + y
        fmt.Println(z)
    }
    
    func main() {
        for i := 0; i < 10; i++ {
            go Add(i, i)
        }
    }

      在上面的代码里,在一个 for 循环中调用了10次 Add() 函数,它们是并发执行的。可是当你编译执行了上面的代码,就会发现一些奇怪的现象:

      “什么?!屏幕上什么都没有,程序没有正常工作!”

      是什么原因呢?明明调用了10次 Add(),应该有10次屏幕输出才对。要解释这个现象,就涉及 Go 语言的程序执行机制了。

      Go 程序从初始化 main package 并执行 main() 函数开始,当 main() 函数返回时,程序退出,且程序并不等待其他 goroutine(非主 goroutine)结束。

      对于上面的例子,主函数启动了10个 goroutine,然后返回,这时程序就退出了,而被启动的执行 Add(i, i) 的 goroutine 没有来得及执行,所以程序没有任何输出。

      OK,问题找到了,怎么解决呢?提到这一点,估计写过多线程程序的读者就已经恍然大悟,并且摩拳擦掌地准备使用类似 WaitForSingleObject、sleep 之类的调用,或者写个自己很拿手的忙等待或者稍微先进一些的 sleep 循环等待来等待所有线程执行完毕。

      在 Go 语言中有自己推荐的方式,它要比这些方法都优雅得多。

      要让主函数等待所有 goroutine 退出后再返回,如何知道 goroutine 都退出了呢?这就引出了多个 goroutine 之间通信的问题。

    并发通信

      从上面的例子中可以看到,关键字 go 的引入使得在 Go 语言中并发编程变得简单而优雅,但我们同时也应该意识到并发编程的原生复杂性,并时刻对并发中容易出现的问题保持警惕。别忘了,我们的例子还不能正常工作呢。

      事实上,不管是什么平台,什么编程语言,不管在哪,并发都是一个大话题。话题大小通常也直接对应于问题的大小。并发编程的难度在于协调,而协调就要通过交流。从这个角度看来,并发单元间的通信是最大问题。

      在工程上,有两种最常见的并发通信模型:共享数据消息

      共享数据是指多个并发单元分别保持对同一个数据的引用,实现对该数据的共享。被共享的数据可能有多种形式,比如内存数据块、磁盘文件、网络数据等。在实际工程应用中最常见的无疑是内存了,也就是常说的共享内存

      先看看在 C 语言中通常是怎么处理线程间数据共享的,如下代码清单所示。

    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    
    void *count();
    
    pthread_mutex_t mutex1 = PTHREAD_MUTEX_INITIALIZER;
    
    int counter = 0;
    
    main()
    {
        int rc1, rc2;
        pthread_t thread1, thread2;
    
        /* 创建线程,每个线程独立执行函数functionC */
        if((rc1 = pthread_create(&thread1, NULL, &add, NULL)))
        {
            printf("Thread creation failed: %d\n", rc1);
        }
        if((rc2 = pthread_create(&thread2, NULL, &add, NULL)))
        {
            printf("Thread creation failed: %d\n", rc2);
        }
    
        /* 等待所有线程执行完毕 */
        pthread_join( thread1, NULL);
        pthread_join( thread2, NULL);
        exit(0);
    }
    
    void *count()
    {
        pthread_mutex_lock( &mutex1 );
        counter++;
        printf("Counter value: %d\n",counter);
        pthread_mutex_unlock( &mutex1 );
    }

      现在尝试将这段 C 语言代码直接翻译为 Go 语言代码,如下代码清单所示。

    package main
    
    import "fmt"
    import "sync"
    import "runtime"
    
    var counter int = 0
    
    func Count(lock *sync.Mutex) {
        lock.Lock()
        counter++
        fmt.Println(z)
        lock.Unlock()
    }
    
    func main() {
        lock := &sync.Mutex{}
    
        for i := 0; i < 10; i++ {
            go Count(lock)
        }
    
        for {
            lock.Lock()
            c := counter
            lock.Unlock()
            runtime.Gosched()
            if c >= 10 {
                break
            }
        }
    }

      此时这个例子终于可以正常工作了。在上面的例子中,我们在10个 goroutine 中共享了变量 counter。每个 goroutine 执行完成后,将 counter 的值加1。因为10个 goroutine 是并发执行的,所以我们还引入了锁,也就是代码中的 lock 变量。每次对 n 的操作,都要先将锁锁住,操作完成后,再将锁打开。在主函数中,使用 for 循环来不断检查 counter 的值(同样需要加锁)。当其值达到10时,说明所有 goroutine 都执行完毕了,这时主函数返回,程序退出。

      事情好像开始变得糟糕了。实现一个如此简单的功能,却写出如此臃肿而且难以理解的代码。想象一下,在一个大的系统中具有无数的锁、无数的共享变量、无数的业务逻辑与错误处理分支,那将是一场噩梦。这噩梦就是众多 C/C++ 开发者正在经历的,其实 Java 和 C# 开发者也好不到哪里去。

      Go 语言既然以并发编程作为语言的最核心优势,当然不至于将这样的问题用这么无奈的方式来解决。Go 语言提供的是另一种通信模型,即以消息机制而非共享内存作为通信方式。

      消息机制认为每个并发单元是自包含的、独立的个体,并且都有自己的变量,但在不同并发单元间这些变量不共享。每个并发单元的输入和输出只有一种,那就是消息。这有点类似于进程的概念,每个进程不会被其他进程打扰,它只做好自己的工作就可以了。不同进程间靠消息来通信,它们不会共享内存。

      Go 语言提供的消息通信机制被称为 channel接下来我们将详细记录 channel。现在,用 Go 语言社区的那句著名的口号来结束这一小节:

      不要通过共享内存来通信,而应该通过通信来共享内存。

    更多相关内容
  • 工作包使用一个无缓冲的通道来创建一个可以执行工作的 goroutine 池。 维护池中挂起和活动请求数量的统计信息。 在池操作期间,可以在池中添加和删除 Goroutine。 这允许实施控制端口来调整池的大小以获得最大性能...
  • wasps是用于golang的轻量级goroutine池,使用有限的goroutine来实现多任务并发执行。 WaSP中文| 中文简介wasps是一个轻量级的goroutine池,它实现了多个goroutine的调度管理。 特点:自动调度goroutine。 提供常用...
  • 向通道发送一次消息只有一个goroutine能收到数据,goroutine向一个通道取数据类似于银行里一个柜台排队取钱,goroutine是那排在长长的队伍,一个通道(channel)就是一个柜台,只有等前一个goroutine取完数据之后,...
  • 一般一个业务很少不用到goroutine的,因为很多方法是需要等待的,例如http.Server.ListenAndServe这个就是等待的,除非关闭了Server或Listener,否则是不会返回的。除非是一个API服务器,否则肯定需要另外起...
  • #使用goroutines 此存储库包含可在找到的博客文章的一些示例代码 #启动并运行 git clone git@github.... 运行./goroutine-example 您可以通过注释/取消注释主函数中的函数调用在示例之间切换。
  • 今天来学习Go语言的Goroutine机制,这也可能是Go语言最为吸引人的特性了,理解它对于掌握Go语言大有裨益,话不多说开始吧! 通过本文你将了解到以下内容: 什么是协程以及横向对比优势 Go语言的Goroutine机制底层...
  • gevent-goroutine 两种轻量级处理技术对比:gevent vs goroutine 安装 安装 apachebench 按照 [apachebench-standalone] ( ) 安装 [apr 和 apr-util] ( ) 编译 ab 跑步 去 go run go_server.go w3m ...
  • go-floc Floc:轻松编排goroutine。 该项目的目标是使并行运行goroutine并使它们同步的过程变得容易。 公告万岁! 新版本v2是relea go-floc Floc:轻松编排goroutine。 该项目的目标是使并行运行goroutine并使它们...
  • 这是一个goroutine池,可以避免在高并发情况下大量的创建和销毁性能消耗,确保模块的稳定调度,并自动扩展协同程序池的大小以适合当前的业务调度。 workerPool这是一个goroutine池,它可以避免在高并发情况下创建和...
  • 基于golang的动态协程池实现 功能 控制程序的协程数 动态修改程序的协程数 。。。 实例代码见example / main.go
  • EVA Package EVA实现了一个固定的goroutine池,用于管理和回收无限制任务队列的大量goroutine,从而允许开发人员限制由您的并发pro EVA创建的goroutine池的数量。EVA Package EVA实现了一个用于管理和回收大量...
  • Limiter是一个Golang库,用于限制来自任意数量goroutine的work。 当您需要限制特定操作最大并发调用数时,这非常有用。
  • ants是一个高性能的协程池,实现了对大规模goroutine的调度管理、goroutine复用,允许使用者在开发并发程序的时候限制协程数量,复用资源,达到更高效执行任务的效果。
  • 下面小编就为大家分享一篇Golang 探索对Goroutine的控制方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • 主要给大家介绍了关于Golang中for-loop与goroutine问题的相关资料,文中通过示例代码介绍的非常详细,对大家学习或者使用golang具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧。
  • pool - 一个有限的goroutine消费者或无限goroutine池用于轻松实现goroutine 处理和取消
  • 主要介绍了go语言执行等待直到后台goroutine执行完成的方法,实例分析了Go语言中WaitGroup的使用技巧,需要的朋友可以参考下
  • 今天写代码的时候突发奇想goroutine中是否可以再运行一个goroutine,初步的想法是可以的,因为main函数运行的时候其实是一个主goroutine,在主goroutine里面再运行一个goroutine是没问题的,那么我们一个普通的...

    前言

    今天写代码的时候突发奇想goroutine中是否可以再运行一个goroutine,初步的想法是可以的,因为main函数运行的时候其实是一个主goroutine,在主goroutine里面再运行一个goroutine是没问题的,那么我们一个普通的goroutine运行一个goroutine不也是同理?

    代码

    我们直接上代码

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go func() {
    		go func() {
    			time.Sleep(time.Duration(3) * time.Second)
    			fmt.Println("hey")
    		}()
    		time.Sleep(time.Duration(7) * time.Second)
    	}()
    	//time.Sleep(time.Duration(8) * time.Second)
    
    }
    

    发现上面代码运行什么也没打印直接退出,想了半天是主goroutine创建完goroutine就退出,导致goroutine还没执行子goroutine程序就结束了,所以把上述代码的注释去掉

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go func() {
    		go func() {
    			time.Sleep(time.Duration(3) * time.Second)
    			fmt.Println("hey")
    		}()
    		time.Sleep(time.Duration(7) * time.Second)
    	}()
    	time.Sleep(time.Duration(8) * time.Second)
    
    }
    

    下面会提及第一层goroutine(主goroutine),第二层goroutine(第一个go func(){}()),第三层goroutine(最里面的go func (){}())

    再运行果真打印了hey,这个时候换个玩法,讲中间的goroutine sleep时间改为3,第三层goroutine的sleep时间改为7,按照我们上面的推测应该不会打印,因为第二层go func在创建第三层goroutine后sleep 3秒就退出,第三层goroutine需要sleep7秒才能打印,所以按照常理第三层goroutine应该还在sleep的时候创建它的goroutine(第二层goroutine)就已经凉了,所以他也不会打印,看下面代码

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go func() {
    		go func() {
    			time.Sleep(time.Duration(7) * time.Second)
    			fmt.Println("hey")
    		}()
    		time.Sleep(time.Duration(3) * time.Second)
    	}()
    	time.Sleep(time.Duration(8) * time.Second)
    
    }
    

    运行完后发现还是打印了hey…这是啥情况

    goroutine原理

    为了解决这个问题我们有必要看一下goroutine的原理
    先对比一下goroutine和线程

    goroutine                                   | thread
    ___________________________________________________________________________________________
    Goroutines are managed by the go runtime.   | Operating system threads are managed by kernal.
    ___________________________________________________________________________________________
    Goroutine are not hardware dependent.       | Threads are hardware dependent.
    ___________________________________________________________________________________________
    Goroutines have easy communication medium   | Thread does not have easy communication 
     known as channel.                          | medium.
    ___________________________________________________________________________________________
     Due to the presence of channel one         | Due to lack of easy communication medium 
     goroutine can communicate with other       | inter-threads communicate takes place with
     goroutine with low latency.                | high latency.
    ___________________________________________________________________________________________
      Goroutine does not have ID because        | Threads have their own unique ID because 
      go does not have Thread Local Storage     | they have Thread Local Storage.
    ___________________________________________________________________________________________
      Goroutines are cheaper than threads.      | 	The cost of threads are higher than goroutine.
    ___________________________________________________________________________________________
      They are cooperatively scheduled.         | They are preemptively scheduled.
    ___________________________________________________________________________________________
    They have fasted startup time than threads. | They have slow startup time than goroutines.
    ___________________________________________________________________________________________
    Goroutine has growable segmented stacks.    | Threads does not have growable segmented stacks.
        
    

    在此之前可以先了解一下user-level thread vs kernel level thread 1

    顺带说一句linux中pthreads库关于线程的实现是用的clone,也就是说pthread_create(...)创建的是一个kernel线程

    以下参考Detailed Go routine scheduling2
    根据上述的链接所解释,我们的goroutine是Green thread,由语言实现,我们把操作系统看成一个软件,我们可以发现这个软件有非常多的线程来干一些非常底层的事情,比如直接操作硬件等等,我们把goroutine看成user space的线程,把操作系统自带的线程看成kernel space的线程,一个goroutine最终可能在一个或者多个kernel thread上运行,一个kernel thread也可能对应多个 green thread,因为一个kernel thread都是对应一个功能

    go使用者不能创建kernel thread!!!但是go的runtime确可以 linux下c可以通过pthreads库创建

    go在开始程序的时候会干什么
    先说几个关于goroutine的概念
    processor:这个processor不是cpu里面的核,而是go里面的概念,它用于执行goroutine,维护一个goroutine的列表,processor可以从属一个kernel thread,假设一个processor从属了一个kernel thread,那么其上面的goroutine都在运行,假设processor不从属一个kernel thread,那么其上面的goroutine都需要等待调度,GOMAXPROCS就是设置processor的变量,假如设置大了就有可能发生其上面的goroutine需要等待调度的情况

    首先go会初始化Scheduler,
    其次我们的主goroutine创建一个kernel thread(主goroutine和其他的goroutine不一样),这个主goroutine有一个作用就是监控其他goroutine,这个主goroutine在main函数执行的时候生成,

    创建一个普通goroutine会发生什么
    首先当我们创建一个普通的goroutine我们先指定一个代码片段给goroutine,可以是函数,可以是匿名函数,然后这个普通的goroutine会被添加到processor上,然后这个新的goroutine会包含stack address,PC(program counter)

    goroutine的状态
    goroutine的状态和关系如下图

    exit
    entersyscall
    yield preempt
    park
    newproc
    exitsyscall
    exitsyscall
    execute
    ready
    Gdead
    Grunning
    Gsyscall
    Grunable
    Gidle
    Gwaiting

    解释一下各个状态
    Gilde:字面意思,创建的goroutine什么资源也没有分配

    Grunnable:在分配并且初始化资源后新的gouroutine进入这个状态,且这个goroutine已经关联Processor,并且processor在某个kernel thread之上

    Grunning:running是个什么状态呢,当这个goroutine等待到了cpu的空闲,他将进入Grunning状态,

    Gwait:和传统的wait一样goroutine执行了某个阻塞操作就wait,比如channel操作,IO操作等等,当ready后就便会grunable状态了,等到cpu有空闲再变成grunning状态

    Gsyscall:顾名思义,当goroutine进行syscall的时候变成gsyscall状态,等结束返回成grunable 状态,等待cpu有空闲变回Grunning状态

    回到问题
    有了以上的基础我们回到之前的问题上来,为什么第三层goroutine在第二层goroutine结束后还会继续运行?

    TODO


    1. Difference between user-level and kernel-supported threads? ↩︎

    2. Detailed Go routine scheduling ↩︎

    展开全文
  • 一文掌握 goroutine channel select 的使用

    本文为极客时间 Go 语言第一课 相关章节学习笔记及思考。

    Go 语言之父 Rob Pike 的经典名言:“不要通过共享内存来通信,应该通过通信来共享内存(Don’t communicate by sharing memory, share memory by communicating)

    C/C++ 线程的复杂性:

    线程退出时要考虑新创建的线程是否要与主线程分离(detach)
    还是需要主线程等待子线程终止(join)并获取其终止状态?
    又或者是否需要在新线程中设置取消点(cancel point)来保证被主线程取消(cancel)的时候能顺利退出

    goroutine 是由 Go 运行时(runtime)负责调度的、轻量的用户级线程。

    优势:

    1. 占用内存小,goroutine 初始栈只有 2k,比 Linux 线程小多了
    2. 用户态调度,不需要内核介入,代价更小
    3. 一退出就会被回收
    4. 提供 channel 通信

    无论是 Go 自身运行时代码还是用户层 Go 代码,都无一例外地运行在 goroutine 中。

    goroutine

    调用函数、方法(具名、匿名、闭包都可以)时,前面加上 go 关键字,就会创建一个 goroutine。

    goroutine 调度原理

    Goroutine 调度器的任务:将 Goroutine 按照一定算法放到不同的操作系统线程中去执行。

    演进:

    • G-M 模型(废弃):将 G(Goroutine) 调度到 M(Machine) 上运行
    • G-P-M 模型(使用中):增加中间层 P(Processor),提供队列管理多个 G,然后在合适的时候绑定 M。先后使用协作式、抢占式调度。
    • NUMA 调度模型(尚未实现)

    【G-P-M调度图】

    图片来自:https://time.geekbang.org/column/article/476643

    • G:存储 Goroutine 的执行信息,包括:栈、状态
    • P:逻辑处理器,有一个待调度的 G 队列
    • M:真正的计算资源,Go 代码运行的真实载体(用户态线程),要执行 G 需要绑定 P,绑定后会从 P 的本地队列和全局队列获取 G 然后执行
    
    //src/runtime/runtime2.go
    type g struct {
        stack      stack   // offset known to runtime/cgo
        sched      gobuf
        goid       int64
        gopc       uintptr // pc of go statement that created this goroutine
        startpc    uintptr // pc of goroutine function
        ... ...
    }
    
    type p struct {
        lock mutex
    
        id          int32
        status      uint32 // one of pidle/prunning/...
      
        mcache      *mcache
        racectx     uintptr
    
        // Queue of runnable goroutines. Accessed without lock.
        runqhead uint32
        runqtail uint32
        runq     [256]guintptr
    
        runnext guintptr
    
        // Available G's (status == Gdead)
        gfree    *g
        gfreecnt int32
    
        ... ...
    }
    
    
    
    type m struct {
        g0            *g     // goroutine with scheduling stack
        mstartfn      func()
        curg          *g     // current running goroutine
        ... ...
    }
    

    从 Go 1.2 以后,Go 调度器使用 G-P-M 模型,调度目标:公平地将 G 调度到 P 上运行。

    调度策略:

    1. 常规执行,G 运行超出时间片后抢占调度
    2. G 阻塞在 channel 或者 I/O 上时,会被放置到等待队列,M 会尝试运行 P 的下一个可运行 G;当 G 可运行时,会被唤醒并修改状态,然后放到某个 P 的队列中,等待被绑定 M、执行
    3. G 阻塞在 syscall 上时,执行 G 的 M 也会受影响,会解绑 P、进入挂起状态;syscall 返回后,G 会尝试获取可用的 P,没获取到的话,修改状态,等待被运行

    如果一个 G 任务运行 10ms,sysmon 就会认为它的运行时间太久而发出抢占式调度的请求。
    一旦 G 的抢占标志位被设为 true,那么等到这个 G 下一次调用函数或方法时,运行时就可以将 G 抢占并移出运行状态,放入队列中,等待下一次被调度。

    // $GOROOT/src/runtime/proc.go
    
    
    // forcePreemptNS is the time slice given to a G before it is
    // preempted.
    const forcePreemptNS = 10 * 1000 * 1000 // 10ms
    

    channel

    和线程一样,一个应用内部启动的所有 goroutine 共享进程空间的资源,如果多个 goroutine 访问同一块内存数据,将会存在竞争。

    Go 提供了 channel 作为 goroutine 之间的通信方式,goroutine 可以从 channel 读取数据,处理后再把数据写到 channel 中。

    channel 是和 切片、map 类似的复合类型,使用时需要指定具体的类型:

    c := make(chan int) //c 是一个 int 类型的 channel
    

    和函数一样,channel 也是“第一公民” 身份,可以做变量、参数、返回值等。

    
    func spawn(f func() error) <-chan error {
        c := make(chan error)
    
        go func() {
            c <- f()
        }()
    
        return c
    }
    
    func main() {
        c := spawn(func() error {
            time.Sleep(2 * time.Second)
            return errors.New("timeout")
        })
        fmt.Println(<-c)
    }
    

    main goroutine 与子 goroutine 之间建立了一个元素类型为 error 的 channel,子 goroutine 退出时,会将它执行的函数的错误返回值写入这个 channel,main goroutine 可以通过读取 channel 的值来获取子 goroutine 的退出状态。

    channel 的不同类型

    通过 make 可以创建 2 种类型的 channel:

    1. 无缓冲:读写是同步进行,没有对接人的话会一直阻塞着
    2. 有缓冲:有数据时读不会阻塞;未满时写数据不会阻塞

    下面是无 buffer channel 的测试例子:

    func testNoBufferChannel() {
        var c chan int = make(chan int) //无缓冲,同步进行,没有对接人,就会阻塞住
        //var c chan int = make(chan int, 5)    //有缓冲,容量为 5
    
        //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
        go func() {
            fmt.Println("goroutine run")
            b := <-c //读取 channel
            fmt.Println("read from channel: ", b)
        }()
    
        fmt.Println("main goroutine before write")
        c <- 1  //没有 buffer,写入 channel 时会阻塞,直到有读取
        fmt.Println("main goroutine finish")
    }
    

    运行结果:

    main goroutine before write
    goroutine run
    read from channel:  1
    main goroutine finish
    

    和预期一致,主 goroutine 在写入无 buffer 的 channel 时会阻塞,直到 子 goroutine 读取。

    下面是有 buffer channel 的测试例子:

    func testBufferChannel() {
        c := make(chan int, 1)  //有缓冲,容量为 5
    
        //大多数时候,读写要在不同 goroutine,尤其是无缓冲 channel
        go func() {
            fmt.Println("child_goroutine run")
            b := <-c //读取 channel,有数据时不会阻塞
            fmt.Println("child_goroutine read from channel: ", b)
        }()
    
        fmt.Println("main goroutine before write first")
        c <- 1  //有 buffer,写入 channel 时不会阻塞,除非满了
        fmt.Println("main_goroutine first write finish, len:", len(c))
    
        fmt.Println("main_goroutine write second:")
        c <-2
        fmt.Println("main_goroutine finish, len:", len(c))
    
        time.Sleep( 3 * time.Second)    //不加这个子 goroutine 没执行就退出了
    }
    

    运行结果:

    main goroutine before write first
    main_goroutine first write finish, len: 1
    main_goroutine write second:
    child_goroutine run
    child_goroutine read from channel:  1
    main_goroutine finish, len: 1
    

    可以看到

    1. 第一次写完立刻就返回;第二次写时,因为这个 goroutine 已经满了,所以阻塞在写上
    2. 子 goroutine 读取了一次,主 goroutine 才从写上返回

    作为参数的单向类型

    1. 只发送, chan<-
    2. 只接收, <-chan
    func testSingleDirectionChannel() {
    
        f := func(a chan<- int, b <- chan int) {    //a 是只能写入,b 是只能读取
            x := <- a   //编译报错:Invalid operation: <- a (receive from send-only type chan<- int)
            b <- 2      //编译报错:nvalid operation: b <- 2 (send to receive-only type <-chan int)
        }
    }
    

    通常只发送 channel 类型和只接收 channel 类型,会被用作函数的参数类型或返回值,用于限制对 channel 内的操作,或者是明确可对 channel 进行的操作的类型

    普通channel,可以传入函数作为只发送或只接收类型

    关闭 channel

    close(channel) 后,不同语句的结果:

    func testCloseChannel() {
        a := make(chan int)
        close(a)    //先关闭,然后看下几种读取关闭 channel 的结果
        b := <- a
        fmt.Println("关闭后直接读取:", b)  //0
        c, ok := <-a
        fmt.Println("关闭后通过逗号 ok 读取:", c, ok)    //0 false
    
        for v := range a{   //关闭的话直接跳过
            fmt.Println("关闭后通过 for-range 读取", v)
        }
    }
    

    通过“comma, ok” 我们可以知道 channel 是否被关闭。

    一般由发送端负责关闭 channel,原因:

    1. 向一个关闭的 channel 中发送数据,会 panic (⚠️注意了!!!)
    2. 发送端没有办法判断 channel 是否已经关闭。

    len(channel)

    当 ch 为无缓冲 channel 时,len(ch) 总是返回 0;当 ch 为带缓冲 channel 时,len(ch) 返回当前 channel ch 中尚未被读取的元素个数。

    如果只是想知道 channel 中是否有数据、不想阻塞,可以使用 len(channel) 先做检查:

    【len(channel) 的图】

    nil channel

    默认读取一个关闭的 channel,会返回零值。但是读取一个 nil channel,操作将阻塞。

    所以在有些场景下,可能需要手动修改 channel 为 nil,以实现阻塞的效果,比如在 select 语句中。

    无缓冲 channel 的常见用途 🔥

    Go 语言倡导:

    Do not communicate by sharing memory; instead, share memory by communicating.
    不要通过共享内存来通信,而是通过通信来共享内存

    多 goroutine 通信:信号

    基于无 buffer channel,可以实现一对一和一对多的信号传递。

    1.一对一

    type signal struct{}
    
    //接收一个函数,在子 routine 里执行,然后返回一个 channel,用于主 routine 等待
    func spawn(f func()) <-chan signal {
        c := make(chan signal)
        go func() {
            fmt.Println("exec f in child_routine");
            f();
            fmt.Println("f exec finished, write to channel")
            c<- signal{}
        }()
        return c
    }
    
    //测试使用无 buffer channel 实现信号
    func testUseNonBufferChannelImplSignal() {
        //模拟主 routine 等待子 routine
    
        worker := func() {
            fmt.Println("do some work")
            time.Sleep(3 * time.Second)
        }
    
        fmt.Println("start a worker...")
        c := spawn(worker)
    
        fmt.Println("spawn finished, read channel...")
        <-c //读取,阻塞等待
    
        fmt.Println("worker finished")
    }
    

    上面的代码中,主 routine 创建了一个函数,然后在子 routine 中执行,主 routine 阻塞在一个 channel 上,等待子 routine 完成后继续执行。

    执行结果:

    start a worker...
    spawn finished, read channel...
    exec f in child_routine
    do some work
    f exec finished, write to channel
    worker finished
    

    可以看到,这样的确实现了类似“信号”的机制:在一个 routine 中通知另一个 routine。
    如果 channel 的类型复杂些,就可以传递任意数据了!

    struct{} 大小是0,不占内存

    2.一对多

    关闭一个无 buffer channel 会让所有阻塞在这个 channel 上的 read 操作返回,基于此我们可以实现 1 对 n 的“广播”机制。

    var waitGroup sync.WaitGroup
    
    func spawnGroup(f func(ind int), count int, groupSignal chan struct{}) <-chan signal {
    	c := make(chan signal)	//用于让主 routine 阻塞的 channel
    	waitGroup.Add(count)	//等待总数
    
    	//创建 n 个 goroutine
    	for i := 0; i < count; i++ {
    		go func(index int) {
    			<- groupSignal	//读取阻塞,等待通知执行
    
    			//fmt.Println("exec f in child_routine, index: ", i);
    			//⚠️注意上面注释的代码,这里不能直接访问 for 循环的 i,因为这个是复用的,会导致访问的值不是目标值
    
    			fmt.Println("exec f in child_routine, index: ", index);
    			f(index);
    			fmt.Println(index , " exec finished, write to channel")
    
    			waitGroup.Done()
    		}(i + 1)
    	}
    
    	//创建通知主 routine 结束的 routine,不能阻塞当前函数
    	go func() {
    		//需要同步等待所有子 routine 执行完
    		waitGroup.Wait()
    		c <- signal{}	//写入数据
    	}()
    	return c
    }
    
    
    func testUseNonBufferChannelImplGroupSignal() {
    	worker := func(i int) {
    		fmt.Println("do some work, index ", i)
    		time.Sleep(3 * time.Second)
    	}
    
    	groupSignal := make(chan struct{})
    	c := spawnGroup(worker, 5, groupSignal)
    
    	fmt.Println("main routine: close channel")
    	close(groupSignal)	//通知刚创建的所有 routine
    
    
    	fmt.Println("main routine: read channel...")
    	<- c	//阻塞在这里
    
    	fmt.Println("main routine: all worker finished")
    }
    
    

    上面的代码做了这些事:

    1. 创建 channelA,传递给多个 goroutine
    2. 子 routine 中读取等待这个 channelA
    3. 主 routine 关闭 channel,然后阻塞在 channelB 上,此时所有子 routine 开始执行
    4. 所有子 routine 执行完后,通过 channelB 唤醒主 routine

    运行结果:

    main routine: close channel
    main routine: read channel
    exec f in child_routine, index:  2
    do some work, index  2
    exec f in child_routine, index:  1
    do some work, index  1
    exec f in child_routine, index:  3
    do some work, index  3
    exec f in child_routine, index:  4
    do some work, index  4
    exec f in child_routine, index:  5
    do some work, index  5
    4  exec finished, write to channel
    5  exec finished, write to channel
    3  exec finished, write to channel
    1  exec finished, write to channel
    2  exec finished, write to channel
    main routine: all worker finished
    

    一句话总结:
    用 2 个 channel 实现了 【主 routine 通知所有子 routine 开始】 和【子 routine 通知主 routine 任务结束】。

    多 goroutine 同步:通过阻塞,替代锁

    type NewCounter struct {
    	c chan int
    	i int
    }
    
    func CreateNewCounter() *NewCounter {
    	counter := &NewCounter{
    		c: make(chan int),
    		i: 0,
    	}
    
    	go func() {
    		for {
    			counter.i ++
    			counter.c <- counter.i		//每次加一,阻塞在这里
    		}
    	}()
    
    	return counter
    }
    
    func (c *NewCounter)Increase() int {
    	return <- c.c		//读取到的值,是上一次加一
    }
    
    //多协程并发增加计数,通过 channel 写入阻塞,读取时加一
    func testCounterWithChannel() {
    	fmt.Println("\ntestCounterWithChannel ->>>")
    
    	group := sync.WaitGroup{}
    	counter := CreateNewCounter()
    
    	for i:=0 ; i<10 ; i++ {
    		group.Add(1)
    
    		go func(i int) {
    			count := counter.Increase()
    			fmt.Printf("Goroutine-%d, count %d \n", i, count)
    		}(i)
    	}
    
    	group.Wait()
    
    }
    

    上面的代码中,我们创建了一个单独的协程,在其中循环增加计数,但每次加一后,就会尝试写入 channel(无 buffer 的),在没有人读取时,会阻塞在这个方法上。

    然后在 10 个协程里并发读取 channel,从而实现每次读取递增。

    带缓冲 channel 的常见用途 🔥

    消息队列

    channel 的特性符合对消息队列的要求:

    1. 跨 goroutine 访问安全
    2. FIFO
    3. 可设置容量
    4. 异步收发

    Go 支持 channel 的初衷是将它作为 Goroutine 间的通信手段,它并不是专门用于消息队列场景的。
    如果你的项目需要专业消息队列的功能特性,比如支持优先级、支持权重、支持离线持久化等,那么 channel 就不合适了,可以使用第三方的专业的消息队列实现。

    计数信号量

    由于带 buffer channel 的特性(容量满时写入会阻塞),可以用它的容量表示同时最大并发数量。

    下面是一个例子:

    var active = make(chan struct{}, 3)	//"信号量",最多 3 个
    var jobs = make(chan int, 10)
    
    //使用带缓存的 channel,容量就是信号量的大小
    func testSemaphoreWithBufferChannel() {
    
    	//先写入数据,用作表示任务
    	go func() {
    		for i:= 0; i < 9; i++ {
    			jobs <- i + 1
    		}
    		close(jobs)
    	}()
    
    	var wg sync.WaitGroup
    
    	for j := range jobs {
    		wg.Add(1)
    
    		//执行任务
    		go func(i int) {
    			//通知开始执行,当容量用完时,阻塞
    			active <- struct{}{}
    
    			//fmt.Println("exec job ", i)
    			log.Printf("exec job: %d, length of active: %d \n", i, len(active))
    			time.Sleep(2 * time.Second)
    
    			//执行完,通知结束
    			<- active
    			wg.Done()
    
    		}(j)
    	}
    
    	wg.Wait()
    }
    

    上面的代码中,我们用 channel jobs 表示要执行的任务(这里为 8 个),然后用 channel active 表示信号量(最多三个)。

    然后在 8 个 goroutine 里执行任务,每个任务耗时 2s。在每次执行任务前,先写入 channel 表示获取信号量;执行完后读取,表示释放信号量。

    由于信号量最多三个,所以同一时刻最多能有 3 个任务得以执行。

    运行结果如下,符合预期:

    2022/04/20 19:14:26 exec job: 1, length of active: 1 
    2022/04/20 19:14:26 exec job: 9, length of active: 2 
    2022/04/20 19:14:26 exec job: 5, length of active: 3 
    2022/04/20 19:14:28 exec job: 6, length of active: 3 
    2022/04/20 19:14:28 exec job: 7, length of active: 3 
    2022/04/20 19:14:28 exec job: 8, length of active: 3 
    2022/04/20 19:14:30 exec job: 3, length of active: 3 
    2022/04/20 19:14:30 exec job: 2, length of active: 3 
    2022/04/20 19:14:30 exec job: 4, length of active: 3 
    

    select

    当需要在一个 goroutine 同时读/写多个 channel 时,可以使用 select:

    类似 Linux 的 I/O 多路复用思路,我们可以叫它:goroutine 多路复用。

    
    func testSelect() {
        channelA := make(chan int)
        channelB := make(chan int)
    
        go func() {
            var readA bool
            var readB bool
    
            for {
                select {
                case x := <- channelA:
                    fmt.Println("child_routine: read from channelA:", x)
                    readA = true
                case y := <- channelB:
                    fmt.Println("child_routine:  read from channelB:", y)
                    readB = true
                //default:
                //  //其他 case 阻塞,就执行 default
                //  fmt.Println("default")
                }
    
                if readA && readB {
    
                    fmt.Println("child_goroutine finish")
                    return;
                } else {
                    fmt.Println("child_goroutine still loop, ", readA, readB)
                }
            }
        }()
    
        fmt.Println("main goroutine")
    
        time.Sleep(2 * time.Second)
    
        fmt.Println("main goroutine, write to channelA")
        channelA <- 111
        fmt.Println("main goroutine, write to channelA finish")
    
        time.Sleep(1 * time.Second)
    
        fmt.Println("main goroutine, write to channelB")
        channelB <- 111
        fmt.Println("main goroutine, write to channelB finish")
    
        time.Sleep( 5 * time.Second)
        fmt.Println("main goroutinefinish")
    }
    

    输出:

    main goroutine
    main goroutine, write to channelA
    main goroutine, write to channelA finish
    child_routine: read from channelA: 111
    child_goroutine still loop,  true false
    main goroutine, write to channelB
    main goroutine, write to channelB finish
    child_routine:  read from channelB: 111
    child_goroutine finish
    main goroutinefinish
    

    可以看到:

    1. 使用 select 在一个 goroutine 里读取了 2 个 channel
    2. 这 2 个 case 里的 channel 都不可读时,select 阻塞,只会执行 default,不会执行 select 代码块以外的
    3. 主 goroutine 写入数据后,select 的其中一个 case 返回,然后继续执行 select 后面的逻辑
    4. 下一轮循环后 2 个 case 都不可读,继续阻塞
    5. 然后主 goroutine 写入后,另外一个 case 也返回,循环结束

    channel 与 select 结合的常见用途 🔥

    利用 default 分支避免阻塞

    select 的 default 分支语义:当所有 case 语句里读/写 channel 阻塞时,会执行 default!

    无论 channel 是否有 buffer。

    有些时候,我们可能不希望阻塞在写入 channel 上,那可以利用 select default 的特性,这样封装一个函数,当写入阻塞时,返回一个 false,让外界可以处理阻塞的情况:

    func tryWriteChannel(c chan<- int, value int) bool {
    	select {
    	case c <- value
    		return true
    	default:	//其他没就绪时,会执行
    		return false
    	}
    }
    

    这样使用:

    			//active <- 1		//之前直接写 channel,如果满了,就会阻塞
    			writed := tryWriteChannel(active, 1)	//改成这样,可以在阻塞时,处理相关逻辑
    			if !writed {
    				log.Println("failed to write channel")
    				return
    			}
    

    实现超时

    假如我们想在一个 channel 的读/写操作上加一个超时逻辑,可以通过这样实现:
    在 select 代码块中,加一个 case,这个 case 会在超时后执行,这样会结束其他 case。

    比如这样:

    func tryGetSemaphore(c chan<- struct{}) bool {
    	select {
    	case c <- struct {}{}:
    		return true
    	case <- time.After(1 * time.Second):		//在写 channel 的基础上,额外加一个情况,超时情况
    		log.Println("timeout!!!")
    		//1s 后返回,可以在这里做超时处理
    		return true
    	}
    }
    

    及时调用 timer 的 Stop 方法回收 Timer 资源。

    心跳机制

    循环执行一个额外的 case,这个 case 会定时返回。

    
    func worker() {
      heartbeat := time.NewTicker(30 * time.Second)
      defer heartbeat.Stop()
      for {
        select {
        case <-c:
          // ... do some stuff
        case <- heartbeat.C:
          //... do heartbeat stuff
        }
      }
    }
    

    time.NewTicker 会创建一个定时执行的心跳,可以把这个 ticker channel 读取的操作放到一个 case 里,这样 select 代码块就会定时执行一次。

    ticker 也要及时 Stop。

    总结

    本文介绍了 Golang 中通过 goroutine channel 和 select 实现并发操作的一些典型场景。

    可以看到,通过 goroutine 实现并发是如此的简单;通过 channel 无 buffer 和有 buffer,实现一些 goroutine 同步机制也比较方便;结合 select,实现 goroutine 的统一管理。

    在学习一门语言时,既要结合已有的语言知识,也要吸收新语言的设计思想。

    需要记住的是,Go 提倡通过 CSP 模型(communicating sequential processes 通信顺序进程)进行通信,而不是传统语言的共享内存方式。

    CSP:两个独立的并发实体通过共享 channel(管道)进行通信的并发模型。

    我们在遇到多 goroutine 通信、同步的情况,可以尽量多使本文的内容进行处理。

    不过对于某些情况,也可以使用 go 提供的 sync 包下的内容,进行局部同步。下篇文章我们就来看看这些内容。

    对于局部情况,比如涉及性能敏感的区域或需要保护的结构体数据时,我们可以使用更为高效的低级同步原语(如 mutex),保证 goroutine 对数据的同步访问。

    展开全文
  • Goroutine原理

    2021-08-10 01:07:22
    title: Goroutine原理 category: Goroutine date: 2021-07-25 22:19:29 tags: Golang GMP Algorithm 有图文地址 Goroutine 定义 “Goroutine 是一个与其他 goroutines 并行运行在同一地址空间的 Go 函数或方法。...

    title: Goroutine原理
    category: Goroutine
    date: 2021-07-25 22:19:29
    tags:

    • Golang
    • GMP
    • Algorithm

    有图文地址

    Goroutine

    定义

    “Goroutine 是一个与其他 goroutines 并行运行在同一地址空间的 Go 函数或方法。一个运行的程序由一个或更多个 goroutine 组成。它与线程、协程、进程等不同。它是一个 goroutine” —— Rob Pike

    Goroutines 在同一个用户地址空间里并行独立执行 functions,channels 则用于 goroutines 间的通信和同步访问控制。

    goroutine 与thread有何区别

    • 内存占用,创建一个 goroutine 的栈内存消耗为 2 KB(Linux AMD64 Go v1.4后),运行过程中,如果栈空间不够用,会自动进行扩缩容
      创建一个 thread 为了尽量避免极端情况下操作系统线程栈的溢出,默认会为其分配一个较大的栈内存( 1 - 8 MB 栈内存,线程标准 POSIX Thread),而且还需要一个被称为 “guard page” 的区域用于和其他 thread 的栈空间进行隔离。而栈内存空间一旦创建和初始化完成之后其大小就不能再有变化,这决定了在某些特殊场景下系统线程栈还是有溢出的风险。

    • 创建/销毁,线程创建和销毀都会有巨大的消耗,是内核级的交互(trap)。
      POSIX 线程(定义了创建和操纵线程的一套 API)通常是在已有的进程模型中增加的逻辑扩展,所以线程控制和进程控制很相似。而进入内核调度所消耗的性能代价比较高,开销较大。goroutine 是用户态线程,是由 go runtime 管理,创建和销毁的消耗非常小。

    • 调度切换
      抛开陷入内核,线程切换会消耗 1000-1500 纳秒(上下文保存成本高,较多寄存器,公平性,复杂时间计算统计),一个纳秒平均可以执行 12-18 条指令。
      所以由于线程切换,执行指令的条数会减少 12000-18000。goroutine 的切换约为 200 ns(用户态、3个寄存器,现在甚至达到了100~120ns),相当于 2400-3600 条指令。因此,goroutines 切换成本比 threads 要小得多

    • 复杂性
      线程的创建和退出复杂,多个 thread 间通讯复杂(share memory)。
      不能大量创建线程(参考早期的 httpd),成本高,使用网络多路复用,存在大量callback(参考twemproxy、nginx 的代码)。对于应用服务线程门槛高,例如需要做第三方库隔离,需要考虑引入线程池等。

    M:N模型

    Go 创建 M 个线程(CPU 执行调度的单元,内核的 task_struct),之后创建的 N 个 goroutine 都会依附在这 M 个线程上执行,即 M:N 模型。它们能够同时运行,与线程类似,但相比之下非常轻量。因此,程序运行时,Goroutines 的个数应该是远大于线程的个数的(phread 是内核线程?)。

    同一个时刻,一个线程只能跑一个 goroutine当 goroutine 发生阻塞 (chan 阻塞、mutex、syscall 等等) 时,Go 会把当前的 goroutine 调度走,让其他 goroutine 来继续执行,而不是让线程阻塞休眠,尽可能多的分发任务出去,让 CPU 忙

    GMP调度模型

    GMP概念

    • G

    goroutine 的缩写,每次 go func() 都代表一个 G,无限制。
    使用 struct runtime.g,包含了当前 goroutine 的状态、堆栈、上下文

    • M

    工作线程(OS thread)也被称为 Machine,使用 struct runtime.m,所有 M 是有线程栈的。
    如果不对该线程栈提供内存的话,系统会给该线程栈提供内存(不同操作系统提供的线程栈大小不同)。当指定了线程栈,则 M.stack→G.stack,M 的 PC 寄存器(下一个指令执行寄存器)指向 G 提供的函数,然后去执行。

    • P

    “Processor”是一个抽象的概念,并不是真正的物理 CPU。

    Dmitry Vyukov 的方案是引入一个结构 P,它代表了 M 所需的上下文环境,也是处理用户级代码逻辑的处理器。它负责衔接 M 和 G 的调度上下文,将等待执行的 G 与 M 对接。当 P 有任务时需要创建或者唤醒一个 M 来执行它队列里的任务。所以 P/M 需要进行绑定,构成一个执行单元。P 决定了并行任务的数量,可通过 runtime.GOMAXPROCS 来设定。在 Go1.5 之后GOMAXPROCS 被默认设置可用的核数,而之前则默认为1。

    Tips: https://github.com/uber-go/automaxprocs

    Automatically set GOMAXPROCS to match Linux container CPU quota.

    mcache/stackalloc 从 M 移到了 P,而 G 队列也被分成两类,保留全局 G 队列,同时每个 P 中都会有一个本地的 G 队列。

    GM调度器

    Go 1.2前的调度器实现,限制了 Go 并发程序的伸缩性,尤其是对那些有高吞吐或并行计算需求的服务程序。

    每个 goroutine 对应于 runtime 中的一个抽象结构:G,而 thread 作为“物理 CPU”的存在而被抽象为一个结构:M(machine)。当 goroutine 调用了一个阻塞的系统调用,运行这个 goroutine 的线程就会被阻塞,这时至少应该再创建/唤醒一个线程来运行别的没有阻塞的 goroutine。线程这里可以创建不止一个,可以按需不断地创建,而活跃的线程(处于非阻塞状态的线程)的最大个数存储在变量 GOMAXPROCS中。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gXVyacxS-1628528830605)(Goroutine原理/GM-Schedule.png)]

    GM调度模型的问题

    • 单一全局互斥锁(Sched.Lock)和集中状态存储
      导致所有 goroutine 相关操作,比如:创建、结束、重新调度等都要上锁。

    • Goroutine 传递问题
      M 经常在 M 之间传递”可运行”的 goroutine,这导致调度延迟增大以及额外的性能损耗刚创建的 G 放到了全局队列,而不是本地 M 执行,不必要的开销和延迟)。

    • Per-M 持有内存缓存 (M.mcache)
      每个 M 持有 mcachestackalloc,然而只有在 M 运行 Go 代码时才需要使用内存(每个 mcache 可以高达2mb),当 M 在处于 syscall 时并不需要这个内存。运行 Go 代码和阻塞在 syscall 的 M 的比例高达1:100,造成了很大的浪费。同时内存亲缘性也较差,G 当前在 M 运行后对 M 的内存进行了预热,因为现在* G 调度到同一个 M 的概率不高,数据局部性不好*。

    • 严重的线程阻塞/解锁
      系统调用的情况下,工作线程经常被阻塞和取消阻塞,这增加了很多开销。比如 M 找不到G,此时 M 就会进入频繁阻塞/唤醒来进行检查的逻辑,以便及时发现新的 G 来执行。
      by Dmitry Vyukov “Scalable Go Scheduler Design Doc”

    GMP调度器

    引入了local queue,因为 P 的存在,runtime 并不需要做一个集中式的 goroutine 调度,每一个 M 都会在 P's local queueglobal queue 或者其他 P 队列中找 G执行,减少全局锁对性能的影响

    这也是 GMP Work-stealing 调度算法的核心。注意 P 的本地 G 队列还是可能面临一个并发访问的场景,为了避免加锁,这里 P 的本地队列是一个 LockFree的队列,窃取 G 时使用 CAS 原子操作来完成。关于LockFree 和 CAS 的知识参见 Lock-Free。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KFRpeY2L-1628528830606)(Goroutine原理/GMP-schedule模型.png)]

    Work-stealing调度算法

    Work-stealing

    当一个 P 执行完本地所有的 G 之后,并且全局队列为空的时候,会尝试挑选一个受害者 P,从它的 G 队列中窃取一半的 G。否则会从全局队列中获取(当前个数/GOMAXPROCS)个 G。

    为了保证公平性,从随机位置上的 P 开始,而且遍历的顺序也随机化了(选择一个小于 GOMAXPROCS,且和它互为质数的步长),保证遍历的顺序也随机化了。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FftVPaxU-1628528830607)(Goroutine原理/work-stealing-p0.png)]

    光窃取失败时获取是不够的,可能会导致全局队列饥饿。P 的调度算法中还会每个 N (1/61 time)轮调度之后就去全局队列拿一个 G

    谁放入的全局队列呢?

    新建 G 时 P 的本地 G 队列放不下已满并达到256个的时候会放半数 G 到全局队列去,阻塞的系统调用返回时找不到空闲 P 也会放到全局队列。

    Syscall

    调用 syscall 后M会解绑 P,然后**MG 进入阻塞**,而 P 此时的状态就是 syscall表明这个 P 的 G 正在 syscall 中,这时的 P 是不能被调度给别的 M 的。如果在短时间内阻塞的 M 就唤醒了,那么 M 会优先来重新获取这个 P,能获取到就继续绑回去,这样有利于数据的局部性

    系统监视器 (system monitor),称为 sysmon,会定时扫描。在执行 syscall 时, 如果某个 P 的 G 执行超过一个sysmon tick(10ms),就会把他设为 idle重新调度给需要的 M,强制解绑

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gPG9e1ui-1628528830608)(Goroutine原理/syscall-handoffs-p-sysmon.png)]

    P1 和 M 脱离后目前在 idle list 中等待被绑定(处于 syscall 状态)。而 syscall 结束后 M 按照如下规则执行直到满足其中一个条件:

    1. 尝试获取同一个 P(P1),恢复执行 G
    2. 尝试获取 idle list 中的其他空闲 P,恢复执行 G
    3. 找不到空闲 P,把 G 放回 global queue,M 放回到 idle list

    Spining thread

    线程自旋是相对于线程阻塞而言的,表象就是循环执行一个指定逻辑(调度逻辑,目的是不停地寻找 G)。这样做的问题显而易见,如果 G 迟迟不来,CPU 会白白浪费在这无意义的计算上。但好处也很明显,降低了 M 的上下文切换成本,提高了性能。在两个地方引入自旋:

    • 类型1:M 不带 P 的找 P 挂载(一有 P 释放就结合)
    • 类型2:M 带 P 的找 G 运行(一有 runable 的 G 就执行)

    为了避免过多浪费 CPU 资源,自旋的 M 数量最多只允许 GOMAXPROCS (Busy P)。同时当有类型1的自旋 M 存在时,类型2的自旋 M 就不阻塞,阻塞会释放 P,一释放 P 就马上被类型1的自旋 M 抢走了,没必要。

    新 G 被创建M 进入系统调用M 从空闲被激活这三种状态变化前,调度器会确保至少有一个自旋 M 存在(唤醒或者创建一个 M),除非没有空闲的 P

    • 当新 G 创建,如果有可用 P,就意味着新 G 可以被立即执行,即便不在同一个 P 也无妨,所以我们保留一个自旋的 M(这时应该不存在类型1的自旋只有类型2的自旋)就可以保证新 G 很快被运行。

    • 当 M 进入系统调用,意味着 M 不知道何时可以醒来,那么 M 对应的 P 中剩下的 G 就得有新的 M 来执行,所以我们保留一个自旋的 M 来执行剩下的 G(这时应该不存在类型2的自旋只有类型1的自旋)。

    • 如果 M 从空闲变成活跃,意味着可能一个处于自旋状态的 M 进入工作状态了,这时要检查并确保还有一个自旋 M 存在,以防还有 G 或者还有 P 空着的。

    GMP问题总结

    • 单一全局互斥锁(Sched.Lock)和集中状态存储
      G 被分成全局队列和 P 的本地队列,全局队列依旧是全局锁,但是使用场景明显很少,P 本地队列使用无锁队列,使用原子操作来面对可能的并发场景

    • Goroutine 传递问题
      G 创建时就在 P 的本地队列,可以避免在 G 之间传递(窃取除外),G 对 P 的数据局部性好; 当 G 开始执行了,系统调用返回后 M 会尝试获取可用 P,获取到了的话可以避免在 M 之间传递。而且优先获取调用阻塞前的 P,所以 G 对 M 数据局部性好,G 对 P 的数据局部性也好。

    • Per-M 持有内存缓存 (M.mcache)
      内存 mcache 只存在 P 结构中,P 最多只有 GOMAXPROCS 个,远小于 M 的个数,所以内存没有过多的消耗。

    • 严重的线程阻塞/解锁
      通过引入自旋,保证任何时候都有处于等待状态的自旋 M,避免在等待可用的 P 和 G 时频繁的阻塞和唤醒

    by Dmitry Vyukov “Scalable Go Scheduler Design Doc”

    sysmon

    sysmon 也叫监控线程,它无需 P 也可以运行,他是一个死循环,每20us~10ms循环一次,循环完一次就 sleep一会,为什么会是一个变动的周期呢,主要是避免空转,如果每次循环都没什么需要做的事,那么 sleep 的时间就会加大。

    • 释放闲置超过5分钟的 span物理内存
    • 如果超过2分钟没有垃圾回收,强制执行
    • 将长时间未处理的 netpoll添加到全局队列
    • 向长时间运行的 G 任务发出抢占调度
    • 收回syscall长时间阻塞的 P

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-8siHGXsR-1628528830609)(Goroutine原理/preempt-by-sysmon.png)]

    当 P 在 M 上执行时间超过10mssysmon调用 preemptone将 G 标记为 stackPreempt。因此需要在某个地方触发检测逻辑,Go 当前是在检查栈是否溢出的地方判定(morestack()),M 会保存当前 G 的上下文,重新进入调度逻辑。

    死循环:issues/11462

    信号抢占:go1.14基于信号的抢占式调度实现原理

    异步抢占,注册 sigurg信号,通过 sysmon检测,对 M 对应的线程发送信号,触发注册的 handler,它往当前 G 的 PC 中插入一条指令(调用某个方法),在处理完 handler,G 恢复后,自己把自己推到了 global queue 中。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-k4JNLBPb-1628528830609)(Goroutine原理/preempt-by-sysmon-sig.png)]

    Network poller

    Go 所有的 I/O 都是阻塞的。然后通过 goroutine + channel处理并发。因此所有的 IO 逻辑都是直来直去的,你不再需要回调,不再需要 future,要的仅仅是 step by step。这对于代码的可读性是很有帮助的。

    G 发起网络 I/O 操作也不会导致 M 被阻塞(仅阻塞G),从而不会导致大量 M 被创建出来将异步 I/O 转换为阻塞 I/O 的部分称为 netpoller打开或接受连接都被设置为非阻塞模式。如果你试图对其进行 I/O 操作,并且文件描述符数据还没有准备好,G 会进入 gopark 函数,将当前正在执行的 G 状态保存起来,然后切换到新的堆栈上执行新的 G。

    那什么时候 G 被调度回来呢?

    • sysmon
    • schedule():M 找 G 的调度函数
    • GC:start the world

    调用 netpoll() 在某一次调度 G 的过程中,处于就绪状态的 fd 对应的 G 就会被调度回来。

    G 的 gopark 状态:G 置为 waiting 状态,等待显示 goready 唤醒,在 poller 中用得较多,还有锁、chan 等。

    Schedule Affinity

    在 chan 来回通信的 goroutine 会导致频繁的 blocks,即频繁地在本地队列中重新排队。然而,由于本地队列是 FIFO 实现,如果另一个 goroutine 占用线程,unblock goroutine 不能保证尽快运行。同时 Go 亲缘性调度的一些限制:Work-stealing、系统调用。

    goroutine #9 在 chan 被阻塞后恢复。但是,它必须等待#2、#5和#4之后才能运行。goroutine #5将阻塞其线程,从而延迟goroutine #9,并使其面临被另一个 P 窃取的风险。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-85YHJbOc-1628528830610)(Goroutine原理/image-20210718160233704.png)]

    针对 communicate-and-wait 模式,进行了亲缘性调度的优化。Go 1.5 在 P 中引入了 runnext特殊的一个字段,可以高优先级执行 unblock G

    goroutine #9现在被标记为下一个可运行的这种新的优先级排序允许 goroutine 在再次被阻塞之前快速运行。这一变化对运行中的标准库产生了总体上的积极影响,提高了一些包的性能。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HXwFb8OS-1628528830610)(Goroutine原理/affinity-schedule.png)]

    Goroutine Lifecycle

    Go程序启动

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1WLgPfgt-1628528830610)(Goroutine原理/runtime-main.png)]

    整个程序始于一段汇编,而在随后的runtime·rt0_go(也是汇编程序)中,会执行很多初始化工作。

    • 绑定 m0 和 g0m0就是程序的主线程,程序启动必然会拥有一个主线程,这个就是 m0。g0 负责调度,即 shedule() 函数
    • 创建 P绑定 m0 和 p0,首先会创建 GOMAXPROCS 个 P ,存储在 sched空闲链表(pidle)
      新建任务 g 到 p0 本地队列m0 的 g0 会创建一个 指向 runtime.main() 的 g ,并放到 p0 的本地队列
    • runtime.main(): 启动 sysmon线程;启动 GC 协程;执行 init,即代码中的各种 init 函数;执行 main.main(用户程序main) 函数。

    OS thread 创建

    准备运行的新 goroutine 将唤醒 P 以更好地分发工作。这个 P 将创建一个与之关联的 M 绑定到一个 OS thread

    go func() 中 触发 Wakeup唤醒机制:

    有空闲的 P 而没有在 spinning状态的 M 时候, 需要去唤醒一个*空闲(睡眠)的 M *或者新建一个。当线程首次创建时,会执行一个特殊的 G,即 g0,它负责管理和调度 G

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-P5ONomPk-1628528830611)(Goroutine原理/OS-thread.png)]

    g0

    Go 基于两种断点将 G 调度到线程上:

    • G 阻塞时:系统调用、互斥锁或 chan。阻塞的 G 进入睡眠模式/进入队列,并允许 Go 安排和运行等待其他的 G。
    • 函数调用期间,如果 G 必须扩展其堆栈。这个断点允许 Go 调度另一个 G 并避免运行 G 占用 CPU。

    在这两种情况下,运行调度程序的 g0 将当前 G 替换为另一个 G,即 ready to run。然后,选择的 G 替换 g0 并在线程上运行。与常规 G 相反,g0 有一个固定和更大的栈

    • Defer 函数的分配
    • GC 收集,比如 STW、扫描 G 的堆栈和标记、清除操作
    • 栈扩容,当需要的时候,由 g0 进行扩栈操作

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gUOz2uqg-1628528830611)(Goroutine原理/G0-schedule.png)]

    Schedule

    在 Go 中,G 的切换相当轻便,其中需要保存的状态仅仅涉及以下两个

    • Goroutine 在停止运行前执行的指令,程序当前要运行的指令记录在程序计数器(PC)中的, G 稍后将在同一指令处恢复运行;
    • G 的堆栈,以便在再次运行时还原局部变量

    在切换之前,堆栈将被保存,以便在 G 再次运行时进行恢复.

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jxyCoeCw-1628528830611)(Goroutine原理/schedule-time.png)]

    从 g 到 g0 或从 g0 到 g 的切换是相当迅速的,它们只包含少量固定的指令。相反,对于调度阶段,调度程序需要检查许多资源以便确定下一个要运行的 G

    • 当前 g 阻塞在 chan 上并切换到 g0:1、PC 和堆栈指针一起保存在内部结构中;2、将 g0 设置为正在运行的 goroutine;3、g0 的堆栈替换当前堆栈;
    • g0 寻找新的 Goroutine 来运行
    • g0 使用所选的 Goroutine 进行切换: 1、PC 和堆栈指针是从其内部结构中获取的;2、程序跳转到对应的 PC 地址;

    Goroutine Recycle

    G 很容易创建,栈很小以及快速的上下文切换。基于这些原因,开发人员非常喜欢并使用它们。然而,一个产生许多 shortlive 的 G 的程序将花费相当长的时间来创建和销毁它们

    每个 P 维护一个 freelist G,保持这个列表是本地的,这样做的好处是不使用任何锁来 push/get 一个空闲的 G。当 G 退出当前工作时,它将被 push 到这个空闲列表中

    为了更好地分发空闲的 G ,调度器也有自己的列表。它实际上有两个列表:一个包含已分配栈的 G,另一个包含释放过堆栈的 G(无栈)

    锁保护 central list,因为任何 M 都可以访问它。当本地列表长度超过64时,调度程序持有的列表从 P 获取 G。然后一半的 G 将移动到中心列表。需求回收 G 是一种节省分配成本的好方法。但是,由于堆栈是动态增长的,现有的G 最终可能会有一个大栈。因此,当堆栈增长(即超过2K)时,Go 不会保留这些栈

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XDQE6LDW-1628528830612)(Goroutine原理/go-recycle.png)]

    展开全文
  • goroutine并发控制

    2021-11-03 10:36:49
    } func watch(ctx context.Context, name string) { for { select { case (): fmt.Println(name, "over") return default: fmt.Println(name, "running") time.Sleep(2 * time.Second) } } } // output: goroutine 1...
  • Go语言编程笔记7:goroutine和通道 图源:wallpapercave.com goroutine Python中并发的核心概念是协程,Go语言中类似的概念叫做goroutine。虽然两者在原理和使用方式等方面都有很大不同,但都是用于解决并发问题的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 30,207
精华内容 12,082
关键字:

goroutine

友情链接: 超快宽屏焦点图.rar