精华内容
参与话题
问答
  • Channel

    千次阅读 2020-06-28 17:07:26
    goroutine之间的通信使用channel。数据传送是阻塞式的,发了数据之后必须有人来收数据。 func chanDemo() { //var c chan int // c == nil c := make(chan int) go func() { //这里的匿名函数相当于闭包,引用了...

    goroutine之间的通信使用channel。数据传送是阻塞式的,发了数据之后必须有人来收数据。

    func chanDemo() {
    	//var c chan int // c == nil
    	c := make(chan int)
    	go func() {  //这里的匿名函数相当于闭包,引用了外面的c变量
    		for {
    			n := <-c //开了一个goroutine去接数据
    			fmt.Println(n)
    		}
    	}()
    	c <- 1 //往channel中发生数据
    	c <- 2
    	time.Sleep(time.Millisecond)
    
    }
    
    func main() {
    	chanDemo()
    }
    
    

    channel作为参数

    func woker(id int, c chan int) {
    	for {
    		fmt.Printf("Worker %d received %c\n", id, <-c) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    	}
    }
    
    func chanDemo() {
    	var channels [10]chan int
    	for i := 0; i < 10; i++ {
    		channels[i] = make(chan int)
    		go woker(i, channels[i])
    	}
    
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'a' + i
    	}
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'A' + i
    	}
    	time.Sleep(time.Millisecond) //防止main函数先退出
    
    }
    
    func main() {
    	chanDemo()
    }
    
    

    channel作为返回值

    func createWorker(id int) chan<- int { //返回一个只能往里写数据的channel
    	c := make(chan int) //创建一个可以写数据和读数据的channnel
    	go func() {
    		for {
    			fmt.Printf("Worker %d received %c\n", id, <-c)
    		}
    	}()
    	return c
    }
    
    func chanDemo() {
    	var channels [10]chan<- int //只能往channel中写数据
    	for i := 0; i < 10; i++ {
    		//channels[i] = make(chan int)
    		//go woker(i, channels[i])
    		channels[i] = createWorker(i)
    	}
    
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'a' + i
    	}
    	for i := 0; i < 10; i++ {
    		channels[i] <- 'A' + i
    	}
    	time.Sleep(time.Millisecond)
    
    }
    

    bufferchannel

    func woker(id int, c chan int) {
    	for {
    		fmt.Printf("Worker %d received %c\n", id, <-c) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    	}
    }
    func bufferedChannel() {
    	c := make(chan int, 3) //创建缓冲区为3的channel
    
    	go woker(0, c)
    	//只要有人发数据,就必须有人来接数据。缓冲区为3说明 发送的数据小于3时,若没人来接数据,不会出现错误
    	c <- 'a'
    	c <- 'b'
    	c <- 'c'
    	c <- 'd'
    	time.Sleep(time.Millisecond)
    }
    

    channel是可以被发送方进行关闭的,接收方使用两种方法进行判断。

    func woker(id int, c chan int) {
    	for {
    	//当发送方关闭channel之后,接收方还会进行接收,值为channel对应类型的默认值。因此在这里进行判断是否channel关闭
    		if n, ok := <-c; ok {
    			fmt.Printf("Worker %d received %c\n", id, n) //因为协程是主动式非抢占,在遇到I/O操作时,会进行调度
    		} else {
    			break
    		}
    	}
    	//第二种方法
    	//for n := range c {
    	//	fmt.Printf("Worker %d received %c\n", id, n)
    	//}
    }
    func channelClosed() {
    	c := make(chan int, 3) //创建缓冲区为3的channel
    
    	go woker(0, c)
    	c <- 'a'
    	c <- 'b'
    	c <- 'c'
    	c <- 'd'
    	close(c) 
    	time.Sleep(time.Millisecond)
    }
    

    不要通过共享内存来通信,通过通信来共享内存。
    使用channel等待任务结束。两种方法:使用channel进行传递信息;使用sync.WaitGroup类型的变量。

    	data := make(chan int)
    	exit := make(chan bool)
    	go func() {
    		for d := range data {
    			fmt.Println(d)
    		}
    		fmt.Println("recv over.")
    		exit <- true
    	}()
    
    	data <- 1
    	data <- 2
    	data <- 3
    	close(data)
    	fmt.Println("send over.")
    	<-exit
    
    func doWoker(id int, c chan int, done chan bool) {
    	for n := range c {
    		fmt.Printf("Worker %d received %c\n", id, n)
    		//done <- true //往done中发送了数据,在外面必须有人来接收数据
    		go func() {
    			done <- true
    		}()
    	}
    }
    
    type worker struct {
    	in   chan int
    	done chan bool
    }
    
    func createWorker(id int) worker {
    	w := worker{
    		in:   make(chan int),
    		done: make(chan bool),
    	}
    	go doWoker(id, w.in, w.done)
    	return w
    }
    
    func chanDemo() {
    	var workers [10]worker
    	for i := 0; i < 10; i++ {
    		workers[i] = createWorker(i)
    	}
    
    	for i, worker := range workers {
    		worker.in <- 'a' + i
    		//<-workers[i].done //只有收到了数据说明打印已经完成了,才会往下执行
    	}
    	for i, worker := range workers {
    		worker.in <- 'A' + i
    		//<-workers[i].done
    	}
    
    	//将20个数据全部发出去,然后再进行等待
    	for _, worker := range workers {
    		<-worker.done
    		<-worker.done
    	}
    
    }
    
    
    func doWoker(id int, w worker) {
    	for n := range w.in {
    		fmt.Printf("Worker %d received %c\n", id, n)
    		w.done()
    	}
    }
    
    type worker struct {
    	in   chan int
    	done func()
    }
    
    func createWorker(id int, wg *sync.WaitGroup) worker {
    	w := worker{
    		in: make(chan int),
    		done: func() {
    			wg.Done() //完成一次任务调用一次Done()
    		},
    	}
    	go doWoker(id, w)
    	return w
    }
    
    func chanDemo() {
    	var workers [10]worker
    	var wq sync.WaitGroup
    
    	for i := 0; i < 10; i++ {
    		workers[i] = createWorker(i, &wq)
    	}
    
    	wq.Add(20) //添加20个任务
    	for i, worker := range workers {
    		worker.in <- 'a' + i
    
    	}
    	for i, worker := range workers {
    		worker.in <- 'A' + i
    	}
    	wq.Wait()
    }
    
    

    使用select进行调度。

    
    func generator() chan int {
    	out := make(chan int)
    	go func() {
    		i := 0
    		for {
    			time.Sleep(
    				time.Duration(rand.Intn(1500)) *
    					time.Millisecond)
    			out <- i
    			i++
    		}
    	}()
    	return out
    }
    func woker(id int, c chan int) {
    	for n := range c {
    		time.Sleep(time.Second)
    		fmt.Printf("Worker %d received %d\n", id, n)
    	}
    }
    func createWorker(id int) chan<- int {
    	c := make(chan int)
    	go woker(id, c)
    	return c
    }
    
    func main() {
    	//从c1, c2中读出数据 写入worker中
    	var c1, c2 = generator(), generator()
    	worker := createWorker(0)
    
    	var values []int //生成和消耗的速度不一样,需要存储接收到的数据
    
    	tm := time.After(10 * time.Second) //返回的是一个channel,10s后往这个channel中发生一个时间
    	tick := time.Tick(time.Second)
    	for {
    		var activeWorker chan<- int // activeWorker 是 nil,在select虽然不会运行错误,但是永远不是被select到
    		var activeValue int
    		if len(values) > 0 {
    			activeWorker = worker
    			activeValue = values[0]
    		}
    
    		select {
    		case n := <-c1:
    			values = append(values, n)
    		case n := <-c2:
    			values = append(values, n)
    		case activeWorker <- activeValue:
    			values = values[1:]
    		case <-time.After(800 * time.Millisecond): //如果两次生成数据的时间大于800ms,则会timeout
    			fmt.Println("Time out")
    		case <-tick: //每1s查看一下数据的长度
    			fmt.Println("queue lens = ", len(values))
    		case <-tm:
    			fmt.Println("Bye")
    			return
    		}
    	}
    }
    

    传统的同步机制:WaitGroup,mutex,conditional variable

    type atomicInt struct {
    	value int
    	m     sync.Mutex
    }
    
    func (a *atomicInt) increasement() {
    	fmt.Println("safe increasement")
    	func() {
    		a.m.Lock()
    		defer a.m.Unlock()
    		a.value++
    	}()
    
    }
    
    func (a *atomicInt) get() int {
    	a.m.Lock()
    	defer a.m.Unlock()
    	return a.value
    }
    

    select语句的用法


    通过select可以监听channel上的数据流动。
    每一个case语句里必须是一个I/O操作。

    elect {
    
          case <-chan1:
    
            // 如果chan1成功读到数据,则进行该case处理语句
    
          case chan2 <- 1:
    
            // 如果成功向chan2写入数据,则进行该case处理语句
    
          default:
    
            // 如果上面都没有成功,则进入default处理流程
    
        }
    

    在一个select语句中,Go语言会按顺序从头至尾评估每一个发送和接收的语句。

    如果其中的任意一语句可以继续执行(即没有被阻塞),那么就从那些可以执行的语句中任意选择一条来使用。

    如果没有任意一条语句可以执行(即所有的通道都被阻塞),那么有两种可能的情况:

    • 如果给出了default语句,那么就会执行default语句,同时程序的执行会从select语句后的语句中恢复。

    • 如果没有default语句,那么select语句将被阻塞,直到至少有一个通信可以进行下去。

    go的CSP模型


    go语言支持两种形式的并发:多线程共享内存;CSP并发模型。

    多线程共享内存模型:在访问数据的时候,通过锁来访问。

    CSP并发模型:通过channel和goroutine实现。goroutine是Go中并发的执行单位,channel是各个并发单位之前的通信机制,通俗来说就是各个goroutine之间的管道

    传数据用channel <- data,取数据用<-channel,在通信过程中,传数据channel <- data和取数据<-channel必然会成对出现,而且不管传还是取,必阻塞,直到另外的goroutine传或者取为止。

    参考一

    参考二

    展开全文
  • channel

    2018-08-23 11:00:32
    基本语法 ...channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,建议用分布式系统的方法来解决,比如...

      channel 是 Go 语言在语言级别提供的 goroutine 间的通信方式。我们可以使用 channel 在两个或多个 goroutine 之间传递消息。channel 是进程内的通信方式,因此通过 channel 传递对象的过程和调用函数时的参数传递行为比较一致,比如也可以传递指针等。如果需要跨进程通信,建议用分布式系统的方法来解决,比如使用 Socket 或者 HTTP 等通信协议。Go 语言对于网络方面也有非常完善的支持。

      channel 是类型相关的。也就是说,一个 channel 只能传递一种类型的值,这个类型需要在声明 channel 时指定。如果对 Unix 管道有所了解的话,就不难理解 channel,可以将其认为是一种类型安全的管道。

      在了解 channel 的语法前,我们先看下用 channel 的方式重写上面的例子是什么样子的,以此对 channel 先有一个直感的认识,如下代码清单所示。

    package main
    
    import "fmt"
    
    func Count(ch chan int) {
        ch <- 1
        fmt.Println("Counting")
    }
    
    func main() {
        chs := make([]chan int 10)
        for i := 0; i < 10; i++ {
            chs[i] = make(chan int)
            go Count(chs[i])
        }
    
        for _, ch := range(chs) {
            <-ch
        }
    }

      在这个例子中,定义了一个包含10个 channel 的数组(名为 chs),并把数组中的每个 channel 分配给10个不同的 goroutine。在每个 goroutine 的 Add() 函数完成后,我们通过 ch <- 1 语句向对应的 channel 中写入一个数据。在这个 channel 被读取前,这个操作是阻塞的。在所有的 goroutine 启动完成后,我们通过 <-ch 语句从10个 channel 中依次读取数据。在对应的 channel 写入数据前,这个操作也是阻塞的。这样,我们就用 channel 实现了类似锁的功能,进而保证了所有 goroutine 完成后主函数才返回。是不是比共享内存的方式更简单、优雅呢?

      我们在使用 Go 语言开发时,经常会遇到需要实现条件等待的场景,这也是 channel 可以发挥作用的地方。对 channel 的熟练使用,才能真正理解和掌握 Go 语言并发编程。

    基本语法

      一般 channel 的声明形式为:

    var chanName chan ElementType

      与一般的变量声明不同的地方仅仅是在类型之前加了 chan 关键字。ElementType 指定这个 channel 所能传递的元素类型。举个例子,声明一个传递类型为 int 的 channel:

    var ch chan int

      或者,声明一个 map,元素是 bool 型的 channel:

    var m map[string] chan bool

      上面的语句都是合法的。

      定义一个 channel 也很简单,直接使用内置的函数 make() 即可:

    ch := make(chan int)

      这就声明并初始化了一个 int 型的名为 ch 的 channel。

      在 channel 的用法中,最常见的包括写入和读出。将一个数据写入(发送)至 channel 的语法很直观,如下:

    ch <- value

      向 channel 写入数据通常会导致程序阻塞,直到有其他 goroutine 从这个 channel 中读取数据。从 channel 中读取数据的语法是:

    value := <-ch

      如果 channel 之前没有写入数据,那么从 channel 中读取数据也会导致程序阻塞,直到 channel 中被写入数据为止。之后还会提到如何控制 channel 只接受写或者只允许读取,即单向 channel

    select

      早在 Unix 时代,select 机制就已经被引入。通过调用 select() 函数来监控一系列的文件句柄,一旦其中一个文件句柄发生了 IO 动作,该 select() 调用就会被返回。后来该机制也被用于实现高并发的 Socket 服务器程序。Go 语言直接在语言级别支持 select 关键字,用于处理异步 IO 问题。

      select 的用法与 switch 语言非常类似,由 select 开始一个新的选择块,每个选择条件由 case 语句来描述。与 switch 语句可以选择任何可使用相等比较的条件相比,select 有比较多的限制,其中最大的一条限制就是每个 case 语句里必须是一个 IO 操作,大致的结构如下:

    select {
        case <-chan1:
            // 如果chan1成功读到数据,则进行该case处理语句
        case chan2 <- 1:
            // 如果成功向chan2写入数据,则进行该case处理语句
        default:
            // 如果上面都没有成功,则进入default处理流程
    }

      可以看出,select 不像 switch,后面并不带判断条件,而是直接去查看 case 语句。每个 case 语句都必须是一个面向 channel 的操作。比如上面的例子中,第一个 case 试图从 chan1 读取一个数据并直接忽略读到的数据,而第二个 case 则是试图向 chan2 中写入一个整型数 1,如果这两者都没有成功,则到达 default 语句。

      基于此功能,我们可以实现一个有趣的程序:

    ch := make(chan int, 1)
    
    for {
        select {
            case ch <- 0:
            case ch <- 1:
        }
        i := <-ch
        fmt.Println("Value received:", i)
    }

      能看明白这段代码的含义吗?其实很简单,这个程序实现了一个随机向 ch 中写入一个 0 或者 1 的过程。当然,这是个死循环。

    缓冲机制

      之前示范创建的都是不带缓冲的 channel,这种做法对于传递单个数据的场景可以接受,但对于需要持续传输大量数据的场景就有些不合适了。接下来介绍如何给 channel 带上缓冲,从而达到消息队列的效果。

      要创建一个带缓冲的 channel,其实也非常容易:

    c := make(chan int, 1024)

      在调用 make() 时将缓冲区大小作为第二个参数传入即可,比如上面这个例子就创建了一个大小为 1024 的 int 类型 channel,即使没有读取方,写入方也可以一直往 channel 里写入,在缓冲区被填完之前都不会阻塞。

      从带缓冲的 channel 中读取数据可以使用与常规非缓冲 channel 完全一致的方法,但我们也可以使用 range 关键来实现更为简便的循环读取:

    for i := range c {
        fmt.Println("Received:", i)
    }

    超时机制

      在之前对 channel 的介绍中,我们完全没有提到错误处理的问题,而这个问题显然是不能被忽略的。在并发编程的通信过程中,最需要处理的就是超时问题,即向 channel 写数据时发现 channel 已满,或者从 channel 试图读取数据时发现 channel 为空。如果不正确处理这些情况,很可能会导致整个 goroutine 锁死。

      虽然 goroutine 是 Go 语言引入的新概念,但通信锁死问题已经存在很长时间,在之前的 C/C++ 开发中也存在。操作系统在提供此类系统级通信函数时也会考虑入超时场景,因此这些方法通常都会带一个独立的超时参数。超过设定的时间时,仍然没有处理完任务,则该方法会立即终止并返回对应的超时信息。超时机制本身虽然也会带来一些问题,比如在运行比较快的机器或者高速的网络上运行正常的程序,到了慢速的机器或者网络上运行就会出问题,从而出现结果不一致的现象,但从根本上来说,解决死锁问题的价值要远大于所带来的问题。

      使用 channel 时需要小心,比如对于以下这个用法:

    i := <-ch

      不出问题的话一切都正常运行。但如果出现了一个错误情况,即永远都没有人往 ch 里写数据,那么上述这个读取动作也将永远无法从 ch 中读取到数据,导致的结果就是整个 goroutine 永远阻塞并没有挽回的机会。如果 channel 只是被同一个开发者使用,那样出问题的可能性还低一些。但如果一旦对外公开,就必须考虑到最差的情况并对程序进行保护。

      Go 语言没有提供直接的超时处理机制,但我们可以利用 select 机制。虽然 select 机制不是专为超时而设计的,却能很方便地解决超时问题。因为 select 的特点是只要其中一个 case 已经完成,程序就会继续往下执行,而不会考虑其他 case 的情况。

      基于此特性,我们来为 channel 实现超时机制:

    // 首先,我们实现并执行一个匿名的超时等待函数
    timeout := make(chan bool, 1)
    
    go func() {
        time.Sleep(1e9) // 等待1秒钟
        timeout <- true
    }()
    
    // 然后我们把timeout这个channel利用起来
    select {
        case <- ch:
        // 从ch中读取到数据
        case <- timeout:
        // 一直没有从ch中读取到数据,但从timeout中读取到了数据
    }

      这样使用 select 机制可以避免永久等待的问题,因为程序会在 timeout 中获取到一个数据后继续执行,无论对 ch 的读取是否还处于等待状态,从而达成1秒超时的效果。

      这种写法看起来是一个小技巧,但却是在 Go 语言开发中避免 channel 通信超时的最有效方法。在实际的开发过程中,这种写法也需要被合理利用起来,从而有效地提高代码质量。

    channel的传递

      需要注意的是,在 Go 语言中 channel 本身也是一个原生类型,与 map 之类的类型地位一样,因此 channel 本身在定义后也可以通过 channel 来传递。

      我们可以使用这个特性来实现 *nix 上非常常见的管道(pipe)特性。管道也是使用非常广泛的一种设计模式,比如在处理数据时,可以采用管道设计,这样可以比较容易以插件的方式增加数据的处理流程。

      下面利用 channel 可被传递的特性来实现我们的管道。为了简化表达,假设在管道中传递的数据只是一个整型数,在实际的应用场景中这通常会是一个数据块。

      首先限定基本的数据结构:

    type PipeData struct {
        value int
        handler func(int) int
        next chan int
    }

      然后写一个常规的处理函数。只要定义一系列 PipeData 的数据结构并一起传递给这个函数,就可以达到流式处理数据的目的:

    func handle(queue chan *PipeData) {
        for data := range queue {
            data.next <- data.handler(data.value)
        }
    }

      这里只有大概的样子。同理,利用 channel 的这个可传递特性,可以实现非常强大、灵活的系统架构。相比之下,在 C++、Java、C# 中,要达成这样的效果,通常就意味着要设计一系列接口。

      与 Go 语言接口的非侵入式类似,channel 的这些特性也可以大大降低开发者的心智成本,用一些比较简单却实用的方式来达成在其他语言中需要使用众多技巧才能达成的效果。

    单向channel

      顾名思义,单向 channel 只能用于发送或者接收数据。channel 本身必然是同时支持读写的,否则根本没法用。假如一个 channel 真的只能读,那么肯定只会是空的,因为你没机会往里面写数据。同理,如果一个 channel 只允许写,即使写进去了,也没有丝毫意义,因为没有机会读取里面的数据。所谓的单向 channel 概念,其实只是对 channel 的一种使用限制。

      我们在将一个 channel 变量传递到一个函数时,可以通过将其指定为单向 channel 变量,从而限制该函数中可以对此 channel 的操作,比如只能往这个 channel 写,或者只能从这个 channel 读。

      单向 channel 变量的声明非常简单,如下:

    var ch1 chan int // ch1是一个正常的channel,不是单向的
    var ch2 chan<- float64// ch2是单向channel,只用于写float64数据
    var ch3 <-chan int // ch3是单向channel,只用于读取int数据

      那么单向 channel 如何初始化呢?之前已经提到过,channel 是一个原生类型,因此不仅支持被传递,还支持类型转换。只有在了解了单向 channel 的概念后,才会明白类型转换对于 channel 的意义:就是在单向 channel 和双向 channel 之间进行转换。示例如下:

    ch4 := make(chan int)
    ch5 := <-chan int(ch4) // ch5就是一个单向的读取channel
    ch6 := chan<- int(ch4) // ch6 是一个单向的写入channel

      基于 ch4,通过类型转换初始化了两个单向 channel:单向读的 ch5 和单向写的 ch6。为什么要做这样的限制呢?从设计的角度考虑,所有的代码应该都遵循“最小权限原则”,从而避免没必要地使用泛滥问题,进而导致程序失控。写过 C++ 程序的人肯定就会联想起 const 指针的用法。非 const 指针具备 const 指针的所有功能,将一个指针设定为 const 就是明确告诉函数实现者不要试图对该指针进行修改。单向 channel 也是起到这样的一种契约作用。

      下面来看一下单向 channel 的用法:

    func Parse(ch <-chan int) {
        for value := range ch {
            fmt.Println("Parsing value", value)
        }
    }

      除非这个函数的实现者无耻地使用了类型转换,否则这个函数就不会因为各种原因而对 ch 进行写,避免在 ch 中出现非期望的数据,从而很好地实践最小权限原则。

    关闭channel

      关闭 channel 非常简单,直接使用 Go 语言内置的 close() 函数即可:

    close(ch)

      在介绍了如何关闭 channel 之后,就多了一个问题:如何判断一个 channel 是否已经被关闭?可以在读取的时候使用多重返回值的方式:

    x, ok := <-ch

      这个用法与 map 中的按键获取 value 的过程比较类似,只需要看第二个 bool 返回值即可,如果返回值是 false 则表示 ch 已经被关闭。

      参考:《Go语言编程》

    展开全文
  • 文章目录Channel shutdown: channel error; protocol method: #methodChannel shutdown: channel error; protocol method: #method<channel.close>(rep 1、启动springboot 应用报错 Channel shutdown: ...

    Channel shutdown: channel error; protocol method: #method<channel.close>(rep


    1、启动springboot 应用报错

    Channel shutdown: channel error; protocol method: #method<channel.close>(rep

    2、原因

    当应用启动时,spring 会去检查注册的队列,跟服务器上的队列配置是否一致,如果不一致,则抛出这个错误

    比如你在项目中的配置是

        @Bean(DEAD_LETTER_PROD_CLOSE_ORDER)
        Queue a() {
            Map<String, Object> args = Maps.newHashMap();
            args.put("x-dead-letter-exchange", RabbitMqExchange.ExchangeCenter.DEAD_LETTER_EXCHANGE_CONSUME);
            args.put("x-dead-letter-routing-key", DEAD_LETTER_CONSUME_CLOSE_ORDER);
            args.put("x-message-ttl", 600 * 1000);
    
            return new Queue(DEAD_LETTER_PROD_CLOSE_ORDER, true, false, false, args);
        }
    

    但是服务器上的配置是

    x-message-ttl=1000
    

    则代码配置与现有队列配置不一致,抛出该错误

    3、解决

    方式一、修改项目配置与mq 保持一致

    修改项目配置与MQ一致则可以正常运行


    方式二、删除mq上现有队列
    删除mq 上现有队列,则springboot 自动向mq 服务器注册一个新的队列,也可以解决该问题

    展开全文
  • Golang 深入源码 —— channel

    万次阅读 2020-07-21 15:58:12
    这句话是 Go 语言设计团队的首任负责人 Rob Pike 对并发编程的建议,也是 Go 的并发哲学,通道 Channel 便是基于这种哲学 我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列 数据结构 type hchan struct { ...

    Don’t communicate by sharing memory, share memory by communicating.

    不要通过共享内存来通信,而要通过通信来实现内存共享。

    数据结构

    我们可以把 Channel 看做是一个先进先出(FIFO)的数据队列,那么如何实现这种队列

    channel 的底层数据结构是一个 *hchan,在编译时期会将 make(chan int) 语句转换成 makechan 函数调用

    hchan

    // runtime/chan.go
    type hchan struct {
        lock mutex  // lock 用来保护 hchan 上所有的字段
        
        // 缓冲区实际是一个循环队列
        buf unsafe.Pointer  // 指向缓冲区的指针
        dataqsiz uint   // 缓冲区循环队列的大小
        sendx uint      // 缓冲区循环队列接收下一个元素的索引
        recvx uint      //  缓冲区循环队列中下一个会返回的元素的索引
        
        qcount uint // 当前 hchan 缓存的元素数量
        closed uint32   // hchan 是否关闭
        
        elemsize uint16 // hchan 的元素大小
        elemtype *_type // hchan 的元素类型
        
        recvq waitq     // 等待接收的 goroutine 队列
        sendq waitq     // 等待发送的 goroutine 队列
    }
    

    可以看出 channel 的底层数据结构

    • 缓冲区 buf 底层是一个循环队列,dataqsizqcount 分别记录了缓冲区的大小和当前缓冲的元素数量,sendxrecvx 用来记录位置索引
    • elemsizeelemtype 表示元素大小和类型
    • recvqsendq 来记录被发送接收阻塞的 goroutine 队列
    • closed 用来记录是否关闭
    • lock 用来保护hchan中的字段,更新其他字段的时候都需要加锁

    对于无缓冲 channel 是不需要和缓冲区相关的字段的

    channel 在实现中依然使用到了锁,Go 所说的 使用通信来实现共享内存,实际上依然在底层使用锁来保证读写的原子性,实现出了一个面向数据流式的数据结构

    待发送者和待接收者

    注意到 recvqsendq 类型 waitq 是一个双向链表,提供了等待 goroutine 的出队入队

    // runtime/chan.go
    type waitq struct {
        first *sudog
        last * sudog
    }
    
    func(q *waitq) enqueue(sgp *sudog){
    // ...
    }
    func (q *waitq) dequeue(sgp *sudog){
    // ...
    }
    

    sudog 是对被阻塞的 goroutine 的封装,简单看一下 channel 会使用到的一些字段

    // runtime/runtime2.go
    type sudog struct {
        g       *g              //阻塞的 goroutine
        elem    unsafe.Pointer
        c       *hchan          // 阻塞的 channel
    

    elem 字段是一个指针,在 channel 会被用来指向待发送者要发送的数据或者待接收者的接收位置

    // 从 ch 接收数据被阻塞,那么 sudog.elem 会指向 x
    x <- ch 
    
    // 向 ch 发送数据被阻塞,那么 sudog.elem 会指向 y
    ch <- y 
    

    makechan 创建 channel

    channel 分为无缓冲 channel 和 缓冲 channel,虽然两种 channel 的创建方式不同,但是都是调用 makechan

    ch := make(chan int)    // 无缓冲 channel
        
    ch := make(chan int, 10)// 有缓冲 channel
    

    makechan 函数会接受元素的类型和缓冲的大小,如果 size 为 0,就是无缓冲 channel 了

    // src/runtime/chan.go
    func makechan(t *chantype, size int) *hchan{
        elem := t.elem
        
        // 检查 elem size,align
        
        // 计算出缓冲区的大小,如果是非缓冲 channel 或者元素为 struct{},那么 mem 就是 0
        mem, overflow := math.MulUintptr(elem.size, uintptr(size))
        if overflow || mem > maxAlloc-hchanSize || size < 0{
            panic(plainError("makechan: size out of range"))
        }
        
        var c *hchan
        switch{
        // 非缓冲 channel 或者 缓冲区元素 为 struct{}
        case mem == 0:
            c = (*hchan)(mallocgc(hchanSize, nil, true))
            // 如果是非缓冲,则buf并没有用
            // 如果缓冲元素类型为 struct{}, 则只会用到 sendx 和 recvx, 并不会真正拷贝数据到缓冲区
            c.buf = unsafe.Pointer(&c.buf)
            
        // channel 中元素不包含指针
        case elem.ptrdata == 0:
            // 将 hchan 结构和缓冲区的内存一起分配
            c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
            // buf 指向 hchan 后边的地址
            c.buf = add(unsafe.Pointer(c), hchanSize)
            
        // 默认,分别分配 chan 和 buf 的内存    
        default:
            c = new(hchan)
            c.buf = mallocgc(mem, elem, true)
        }
        
        // 设置 hchan 的其他字段
        c.elemsize = uint16(elem.size)
        c.elemtype = elem
        // 底层循环队列长度
        c.datasiz = uint(size)
        return c
    

    通过 makechan 函数,可以总结出 hchan 结构的特点

    • 无缓冲或者缓冲的元素类型为 struct{} 时,并不会为缓冲区(hcha.buf)分配内存
    • 缓冲的元素结构中不包含指针时,会将 hchan 和 缓冲区buf 是一块连续的内存

    make 与 makechan

    make 函数在编译阶段又是如何转换成 makechan 函数调用的呢

    首先编译器会将 make 的调用转换成 OMAKE 类型的节点,然后判断 make 的对象类型,如果是 TCHAN 的话,将节点类型置为 OMAKECHAN,并且检查 make 的第二个参数,也就是缓冲区大小

    // src/cmd/compile/internal/gc/typecheck.go
    func typecheck1(n *Node, top int) (res *Node) {
        // ...
        switch n.Op{
        case OMAKE:
            switch t.Etype {
            case TCHAN:
                l = nil
                if i < len(args){
                    // ... 对缓冲区大小进行检测
                    n.Left = l  // 带缓冲区,赋值缓冲区大小
                }else{
                    n.Left = nodintconst(0) // 不带缓冲区
                }
                n.Op = OMAKECHAN
            }
        }
    }
    

    然后OMAKECHAN 节点会在 walkexpr 函数中转换成调用 makechan 或者 makechan64 函数

    // src/cmd/compile/internal/gc/walk.go
    func walkexpr(n *Node, init *Nodes) *Node {
        switch n.Op {
        case OMAKECHAN:
            size := n.Left
            fnname := "makechan64"
            argtype := types.Types[TINT64]
    
            if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
                fnname = "makechan"
                argtype = types.Types[TINT]
            }
            n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
        }
    }
    

    发送数据

    向 channel 发送数据的语句会在编译期间转换成 chansend 函数

    ch := make(chan int)
    ch <- 10
    

    发送语句非常简单,但是真正的函数执行会区分很多的情况,做一些小的优化,可以称为特性

    发送操作的特性

    • 向 nil channel 发送数据会被永久阻塞,并且不会被 select 语句选中
    • 如果 channel 未关闭,非缓冲并且没有待接收的 goroutine,或者缓冲区已满,那么不会被 select 语句选中
    • 向关闭的 channel 发送数据,会 panic ,并且可以被 select 语句选中,意味着 select 语句中可能会 panic
    • 如果有待接收者,那么会将发送的数据直接 copy 到待接收者的接收位置,然后唤醒接收者
    • 如果有缓冲区,并且缓冲区未满,那么就把发送的数据 copy 到缓冲区中
    • 如果 channel 未关闭,缓冲区为空并且没有待接收者,那么直接阻塞当前 goroutine, 等待被唤醒
    • 发送者被阻塞后,可以被关闭 channel 操作或者被接收操作唤醒,关闭 channel 导致发送者被唤醒后,会panic
    • 当 channel 中有待接收 goroutine,那么 channel 的状态必然是 非缓冲或者缓冲区为空
    发送数据,可以被 select 选中的情况
    • channel 已关闭
    • channel 未关闭,channel有待接收的 goroutine,或者缓冲区不为空并且缓冲区未满

    深入源码

    ch <- i 发送语句实际会被转换为 chansend1

    func chansend1(c *hchan, elem unsafe.Pointer) {
        chansend(c, elem, true, getcallerpc())
    }
    

    chansend1 会直接调用 chansend 来发送数据,并且 block 为 true,说明 ch <- i 语句可以被阻塞

    // src/runtime/chan.go
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool 
    

    c 表示操作的 channel
    ep 是一个指针,指向发送的数据 ch <- i
    block 表示是否是阻塞调用,在 select case 语句中才会设置为 false
    callerpc 暂时不需要关心

    返回值是个 bool 类型,表示是否发送成功,未发送成功的操作也不会被 select 语句选中

    首先看一下 channel 为 nil 的情况,这时并不需要加锁

        if c == nil{
            if !block {
                // block 为 false, 则直接返回 false, 表示发送失败
                return false
            }
            // 对于 nil channel,直接挂起当前 goroutine,并永久阻塞
            gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
            // 不会执行到这一步
            throw("unreadable")
        }
    

    如果是非阻塞调用,也就是 select case 语句中调用,那么直接返回 false,意味着向 nil channel 发送数据不会被选中
    阻塞调用就被 gopark 挂起,永久阻塞


    在 channel 加锁之前,对于非阻塞并且未关闭的情况会有一步快速检测的判断,可以快速返回

        // 快速检测,非阻塞时,有些情况不需要获取锁就可以直接返回
        // 非阻塞,未关闭,非缓冲+没有等待接收的 goroutine 或者 缓冲+缓冲区已满
        if !block && c.closed == 0 &&
            ((c.dataqsiz == 0 && c.recvq.first == nil) ||
            ((c.dataqsiz < 0 && c.qcount == c.dataqsiz)) {
            // 返回 false,表示未发送成功
            return false
        }
    

    缓冲区没有空间,并且待接收的 goroutine 时,可以直接返回未发送成功


    加锁,判断 channel 是否关闭,如果已关闭,直接 panic

    // 加锁
        lock(&c.lock)
        
        // 如果 channel 已关闭,则 panic
        if c.closed != 0{
            unlock(&c.lock)
            panic(plainError("send on closed channel"))
        }
    

    channel 待接收队列中有等待的 goroutine

        lock(&c.lock)
        
        // ...
        
        // 从待接收队列中获取等待的 goroutine
        if sg := c.recvq.dequeue(); seq != nil {
    
            // 只要可以从待接收队列中获取到 goroutine,那么发送操作都是只需要 copy 一次
            send(c, sg, ep, func() { unlock(&c.lock) },  3)
            return true
        }
    

    如果待接收队列中有等待的接收者的话,说明 channel 的缓冲区为空
    调用 send 函数,无论是否是无缓冲 channel,都直接复制给待接收者

    func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // sg.elem 是指向待接收 goroutine 中接收数据的指针 s <- ch
        // 如果待接收 goroutine 需要接收具体的数据,那么直接将数据 copy 到 sg.elem
        if sg.elem != nil{
            sendDirect(c.elemtype, sg, ep)
            sg.elem = nil
        }
        
        gp := sg.g
        unlockf()   // unlock(&c.lock)
        
        // 赋值 param,待接收者被唤醒后会根据 param 来判断是否是被发送者唤醒的
        gp.param = unsafe.Pointer(sg)
        goready(gp, skip+1) // 唤醒待接收者
    }
    

    会判断一下接收者是否需要接收数据,也就是 sudog.elem 是否为 nil
    如果不为 nil,就调用 sendDirect 把发送的数据(ep 指向的数据) 复制到 sudog.elem

    func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
        // src 是发送的数据源地址,dst 是接收数据的地址
        // src 在当前的 goroutine 栈中,而 dst 在其他栈上
        dst := sg.elem
        
        // 使用 memove 直接进行内存 copy
        // 因为 dst 指向其他 goroutine 的栈,如果它发生了栈收缩,那么就没有修改真正的 dst 位置
        // 所以会加读写前加一个屏障
        typebitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size)
        memove(dst, src, t.size)    
    

    sendDirect 在进行跨 goroutine 内存 copy 时,调用 typebitsBulkBarrier 来加上了写屏障
    因为 GC 会假设对栈的写操作只会发生在 goroutine 正在运行时,并且是由当前 goroutine 写的,
    sendDirect 跨 goroutine 的栈读写会违背这个假设,为了避免出现问题,需要加上写屏障


    缓冲区未满,直接将数据发送到缓冲区中

        lock(&c.lock)
        // ...
        
        if c.qcount < c.dataqsiz {
            //  获取缓冲发送数据的指针
            // add(c.buf, uintptr(i)*uintptr(c.elemsize))
            qp := chanbuf(c, c.sendx)
            
            // copy 数据,ep, gp 都是指针,分别指向数据源和数据目的地
            typedmemove(c.elemtype, qp, ep)
            
            // 递增存放发送数据的索引
            c.sendx++
            if c.sendx == c.dataqsiz{
                // 缓冲区是一个循环数组,调整索引
                c.sendx = 0
            }
            c.qcount++
            unlock(&c.lock)
            return true
        }
    

    chanbuf 函数通过 hchan.sendx 获取到缓冲区存放发送的数据的地址,然后调整循环数组的sendx 索引


    channel 未关闭,对于非缓冲 channel,待接收队列为空,对于缓冲 channel,缓冲区已满
    逻辑依次执行到这里:

        lock(&c.lock)
        // ...
        
        // 如果非阻塞发送,那么可以直接解锁返回,未发送成功
        if !block{
            unlock(&c.lock)
            return false
        }
        
        // 阻塞发送,那么就挂起当前 goroutine
        gp := getg()
        // 生成配置 sudo,省略部分赋值操作
        mysg := acquireSudog()
        mysg.elem = ep  // 将指向发送数据的指针保存到 elem 中
        mysg.g = gp
        mysg.c = c  // 当前阻塞的 channel
        gp.wait = mysg
        
        // param 可以用来传递数据,其他 goroutine 唤醒该 goroutine 时可以设置该字段,然后根据该字段做一些判断
        pg.param = nil  
        
        // 入队待发送队列
        c.sendq.enqueue(mysg)
    
        // 挂起goroutine,等待唤醒
        // chanparkcommit 函数会解锁 ch.lock
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    

    非阻塞的话,会直接返回为发送成功
    阻塞调用,则会构建 sudog 对象,然后添加到待发送队列,解锁,挂起当前 goroutine

    会被唤醒的情况有两种

    • 关闭 channel
    • 发生接收操作,接收者可能会唤醒该发送者
        // 被唤醒,执行检查清理操作
        // ...
        
        // param 字段为 nil 表示是由于 close channel 导致的关闭,panic
        // close channel 和接收操作都可能唤醒等待发送的 goroutine, 但是他们设置 param 不一样
        if gp.param == nil {
            if c.closed = 0 {
                throw("chansend: suprious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        // 清理,释放 sudog
        pg.param == nil
        mysq.c = nil
        releaseSudog(mysg)
        // 发送成功
        return true
    }
    

    被唤醒后会判断 g.param 是否为 nil,因为关闭 channel 时会将待发送 goroutine 的 param 字段置为 nil,会根据这个字段决定是否 panic

    select & 发送操作

    golang 会对 select 语句进行一些优化

    单个发送 case
    select {
    case ch <- i:
        // ...
    }
    
    // 会被优化为
    
    if ch == nil {
        block()
    }
    ch <- i
    

    会在编译期间转换为阻塞发送语句

    非阻塞操作,发送 + default
    select {
    case ch <- i:
        // ...
    default:
        // ...
    }
    
    // =====>
    
    if selectnbsend(ch, i) {
        // ...
    } else {
        // ...
    }
    

    非阻塞操作实际调用 selectnbsend,根据函数返回值决定是否执行 default 逻辑

    func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
        // block 参数为 false,非阻塞调用
        return chansend(c,elem, false, getcallerpc())
    }
    

    返回 false 表示未发送成功,select 便会执行 default

    思考:为什么向关闭的 channel 发送数据需要 panic

    接收数据

    如何从 channel 中接收数据

    // 接收单个值,如果 channel 被关闭后,会返回 channel 中元素的零值
    i <- ch // 调用 `chanrecv1` 函数
    
    // 如果 channel 被关闭并且缓冲区为空,那么 ok 的值就是 false
    i, ok <- ch // 调用 `chanrecv2` 函数
    

    i 是接收操作的接收值ok 表示是否从 channel 中接收到有效的数据,即使 channel 已经关闭,但是缓冲区中依然存在数据,那么 ok 也会是 true

    接收操作的特性

    • 从 nil channel 中接收数据会永久阻塞,而且不会被select 语句选中
    • 如果 channel 未关闭,没有待发送者或者缓冲 channel 的缓冲区为空的话,不会被 select 语句选中
    • 从已关闭并且缓冲区为空的 channel 中接收数据的话,会把接收值置为空值,而且可以被 select 语句选中
    • 如果待发送队列不为空,说明无缓冲或者缓冲已满,对于无缓冲直接从待发送者复制数据到接收值,如果缓冲区已满,那么先将缓冲区中数据复制给接收者,然后将待发送者的数据复制到缓冲区中并唤醒发送者
    • 只要缓冲区不为空,即使channel已关闭,依然可以从缓冲区中获取到数据
    • 如果缓冲为空并且没有待发送者,不会被 select 语句选中,如果是阻塞接收操作的话,会被阻塞直到 channel 被关闭或者被发送者唤醒
    • 接收者被关闭操作唤醒,那么接收值会被置为空值
    接收操作被 select 语句选中的情况
    • channel 已关闭
    • 缓冲区中有数据
    • 待发送队列不为空

    深入源码

    单值的接收语句实际调用 chanrecv1

    // src/runtime/chan.go
    i <- ch
    
    // ===>
    
    func chanrecv1(c *hchan, elem unsafe.Pointer){
        chanrecv(c, elem, true)
    }
    

    接收两个值实际调用 chanrecv2

    i, ok <- ch
    
    // ===>
    
    func chanrecv2(c *hchan, elem unsafe.Pointer)(received bool) {
        _, received = chanrecv(c, elem, true)
    }
    

    chanrecv1chanrecv2 实际都是调用 chanrecv ,他们两个之间的区别就是是否返回接收到有效数据


    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
    

    c 表示接收操作的 channel
    ep 是一个指针,指向接收值i <- ch语句 ep 就是 接收值 i 的地址
    block 是否是阻塞操作,chanrecv1chanrecv2 函数中block为 true,说明是阻塞操作

    返回值 selected 表示是否可以被 select 语句选中
    返回值 received 表示是否可以接收到有效数据


    **channel 在加锁前会判断一下是否为 nil **

        if c == nil {
            // 非阻塞下会直接返回
            if !block {
                return
            }
            // 永久挂起
            gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
            throw("unreachable")
        }
    

    阻塞接收会被永久阻塞,非阻塞的话就直接返回,而且不会被 select 选中


    阻塞接收时,对于未关闭 channel 满足一些条件不需要加锁就可以直接返回

        // 快速检测,在非阻塞模式下,和发送一样有些条件不需要加锁就可以直接判断返回
        // 非阻塞并且未关闭,非缓冲+没有待发送者或者有缓冲+缓冲为空
        if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
            c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
            atomic.Load(&c.closed) == 0 {
            return
        }
    
    • 非缓冲 channel 如果没有待发送者
    • 缓冲 channel 但是缓冲区为空

    加锁,首先判断 channel 是否已关闭,缓冲区中是否还有数据

        lock(&c.lock)
        
        // channel 处于关闭,并且缓冲区已空
        if c.closed != 0 && c.qcount == 0{
            unlock(&c.lock)
            if ep != nil{
                // 如果接收的值需要赋值到变量 x <- ch
                // 将接收的值置为空值
                typedmemclr(c.elemtype, ep)
            }
            // 可被 select 语句选中,但是未接收到有效数据
            return true, false
        }
    

    channel 已经关闭,而且缓冲区没有数据,如果 ep 不为nil ,也就是说存在接收值,那么就把接收值置为空值

    ep 为空的情况是 <- chan 接收操作没有接收值

    selected 返回 true,表示可以被 select 语句选中


    待发送队列不为空,存在待发送者

        lock(&c.lock)
        // ...
        
        // 待发送队列中有 goroutine,说明是非缓冲 channel 或者 缓冲已满的 channel
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func(){ unlock(&c.lock) }, 3)
            return true, true   // 可被选中,并且接收成功
        }
    

    如果待发送队列中有等待发送的 goroutine,说明 channel 是非缓冲channel,或者缓冲区已经满了

    • 非缓冲channel,会将数据从待发送者复制给接收者
    • 缓冲区已满的话,会先从缓冲区中接收数据,然后将待发送者的数据发送到缓冲区中

    这里和发送操作时,channel 的待接收队列不为空的情况不一样,因为待接收队列不为空,说明缓冲区肯定是没有数据的,可以跳过缓冲区,直接将数据发送到等待接收的 goroutine

    因为要区分 channel 的类型所以 recv 函数的逻辑就会有一点复杂
    对于非缓冲 channel,如果有接收值,直接调用 recvDirect 从待发送者复制值

    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // 无缓冲 channel
        if c.dataqsiz == 0 {
            // 如果ep 不为 nil,那么直接从发送 goroutine 中将数据 copy 到接收位置
            if ep != nil{
                recvDirect(c.elemtype, sg, ep)
            }
        } 
    

    对于缓冲区有数据的情况

    • 先从缓冲区复制数据到接收值,也就是 ep 指向的地址
    • 然后将待发送者要发送的数据复制到缓冲区中
    • 调整缓冲区循环数据的接收索引 recvx
        } else {
            // 获取缓冲区中待接收的地址
            gp := chanbuf(c, c.recvx)
            if ep != nil {
                // 将待接收数据复制到接收位置
                typedmemmove(c.elemtype, ep, qp)
            }
            // 将待发送者发送的数据复制到相应缓冲区的位置
            typedmemmove(c.elemtype, qp,sq.elem)
            // 调整 recvx
            c.recvx++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            // 由于缓冲区已满,sendx 和 recvx 必然相等
            c.sendx = c.recvx
        }
    

    无论是缓冲还是非缓冲 channel,recv 函数最后都会唤醒发送者

        // 赋值发送者的 param,发送者被唤醒后会根据 param 来判断是否是关闭唤醒的
        sg.elem = nil
        gp := sg,g
        unlockf()
        gp.param = unsafe.Pointer(sg)
        goready(gp, skip+1)
    }
    

    接收操作会赋值发送者 goroutine 的 param 字段,发送者被唤醒后,会根据 param 参数来判断是有接收操作唤醒还是被关闭 channel 操作唤醒


    缓冲区中有数据,无论 channel 被关闭,都会发送给接收者

        lock(&c.lock)
        // ...
        
        // 如果缓冲区不为空,依然有未发送的数据
        // 需要注意,这时 channel 可能已经处于关闭状态了,但是依然可以从关闭的缓冲区中接收到数据
        if c.qcount > 0{
            // 获取指向缓冲区中待接收数据的指针
            gp ;= chanbuf(c, c.recvx)
            if ep != nil{
                // 如果接收操作有接收值,那么直接 copy 到 ep
                typedmemmove(c.elemtype, ep, gp)
            }
            // 清理缓冲区中已接收到的数据内存
            typedememclr(c.elemtype, gp)
            // 调整待接收索引
            c.recv++
            if c.recvx == c.dataqsiz {
                c.recvx = 0
            }
            c.qcount--
            unlock(&c.lock)
            // 可以被选中,并且接收成功
            return true, true
        }
    

    这一部分的逻辑就比较简单

    • 获取缓冲区的待接收数据的地址 gp,如果有接收者,便将数据复制给接收者
    • 调整缓冲区循环数据的待接收索引recvx

    channel 未关闭, 缓冲区没有元素,并且没有待接收者
    非阻塞操作,可以直接解锁返回,并且不会被 select 语句选中

        lock(&c.lock)
        // ...
    
        // 缓冲区没有元素并且没有待发送者
        if !block {
            unblock(&c.block)
            // 不会被选中,并且没有接收到有效数据
            return false, false
        }
    

    阻塞操作,挂起当前 goroutine,等待被发送操作或者关闭操作唤醒

        lock(&c.lock)
        // ...
    
        gp = getg()
        mysg := acquireSudog()
        mysg.elem = ep
        mysg.g = gp
        mysg.c = c
        gp.param = nil
    
        // 入队到待发送者队列中
        c.recvq.enqueue(mysg)
        // 挂起 goroutine,等待由关闭操作或者发送操作唤醒
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
        
        // 被唤醒,做一些检测,和清理操作
        
        // 根据 param 判断是否是由关闭唤醒的
        // 有 closed 唤醒时,param 会被置为 nil
        closed := gp.param == nil   
        pg.param = nil
        mysg.c = nil
        releaseSudog(mysg)
        
        // 可以被选中,但是 closed 反应是否接受到有效数据
        return true, !closed
    }
    

    被唤醒后会根据 param 字段,判断是否是由关闭操作唤醒


    select 与 接收操作
    单个接收 case
    select {
    case i <- ch:
    }
    // ====>
    
    if ch == nil{
        block()
    }
    i <- ch
    
    非阻塞接收
    select {
    case v <- ch: // case v, received <- ch:
        // ...
    default:
        // ...
    }
    // ===>
    
    // if ch != nil && selectnbrecv2(&v, &ok, ch) {
    if selectnbrecv(&v, ch) { 
        // ...
    } else {
        // ...
    }
    

    非阻塞接收会调用 selectnbrecvselectnbrecv2

    func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected bool) {
        selected, _ = chanrecv(c, elem, false)
        return
    }
    func selectnbrecv2(elem unsafe.Pointer, received *bool, c *hchan) (selected bool) {
        elected, *received = chanrecv(c, elem, false)
        return
    }
    

    关闭 channel

    关闭 channel 直接调用 close 函数即可,但是贸然关闭 channel 会引发很多的问题

    ch := make(chan int)
    
    // 关闭 goroutine
    close(ch)
    

    关闭操作的特性

    • 关闭 nil channel 会 panic
    • 关闭已关闭的 channel 会 panic
    • 关闭操作会将待接收者的接收值置为空值,唤醒所有待发送者和待接收者

    关于如何优雅的关闭 channel,可以看一下 go101如何优雅地关闭通道

    深入源码

    关闭 nil channel 会panic

    func closechan(c *hchan) {
        // 关闭 nil channel 会 panic
        if c == nil{
            panic(plainError("close of nil channel"))
        }
    

    重复关闭 channel,也会 panic

        // 加锁
        lock(&c.lock)
        if c.closed != 0 {
            // 重复关闭会 panic
            unlock(&c.lock)
            panic(plainError("close of closed channel"))
        }
    

    需要注意关闭操作中,判断 channel 是否关闭前会加锁


    处理待接收者,如果有接收者,那么就置为空值

        c.closed = 1
    
        var glist gList
        // 处理待接收者
        for {
            sg := c.recvq.dequeue()
            if sg == nil {
                break
            }
            if sg.elem != nil {
                // 将待接收位置置为空值
                typedmemclr(c.elemtype, sg.elem)
                sg.elem = nil   //  清理 elem 指针
            }
            gp := sg.g
            // param 置为 nil,接收者被唤醒后会返回未接收到有效数据
            gp.param = nil
            glist.push(gp)
        }
    

    处理待发送者

        // 处理待发送的队列
        for {
            sg := c.sendq.dequeue()
            if sg == nil {
                // 没有待发送的goroutine了
                break
            }
            sg.elem = nil
            gp := sg.g
            // 将 param 置为 nil, 待发送者被唤醒后,会 panic
            gp.param = nil
            glist.push(gp)
        }
    

    解锁,唤醒所有待发送者和待接收者

        unlock(&c.lock)
        
        // 唤醒所有阻塞的 goroutine
        for !glist.empty(){
            gp := glist.pop()
            gpready(gp, 3)
        }
    }
    

    关闭操作唤醒 channel 中阻塞的 goroutine

    在处理待发送者和待接收者时,都会将 goroutine 的 param 字段置为 nil,然后当被唤醒后待发送者和待接收者就能区分如何被唤醒的

    发送操作
    // runtime/chan.go
    
    func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
        // ...
    
        // 阻塞,挂起 goroutine
        gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    
        if gp.param == nil {
            if c.closed = 0 {
                throw("chansend: suprious wakeup")
            }
            panic(plainError("send on closed channel"))
        }
        // ...
    

    可以看到发送操作被唤醒后会判断 param 字段
    如果是由于 channel 关闭导致被唤醒,那么直接 panic

    • 关闭操作唤醒,goroutine param 字段为 nil
    func closechan(c *hchan) {
        // ...
        for {
            sg := c.recvq.dequeue()
            // ...
            pg := sg.pg
            gp.param = nil
            // ...
        }
        // ... 唤醒 goroutine
    }
    
    • 接收操作唤醒,goroutine param 不为 nil
    func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
        // ... 数据复制
        
        pg := sg.g
        pg.param = unsafe.Pointer(sg)
        goready(gp, skip+1)
    }
        
    
    接收操作
    func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
        // ...
        
        // 阻塞,挂起当前 goroutine
        goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
        // 被唤醒
        // ...
        
        closed := gp.parma == nil
        // ...
        return true, !closed
    

    接收操作在关闭后并不会 panic,而是会作为 received 返回,表示是否接收到有效的数据

    参考资料

    深度解密Go语言之channel
    Go 语言设计与实现 —— Channel

    推荐阅读

    Go101 通道
    如何优雅地关闭通道
    图解Go的channel底层原理
    走进Golang之Channel的使用
    走进Golang之Channel的数据结构

    展开全文
  • Golang channel

    万次阅读 2020-08-04 11:17:42
    channel 是 Go 语言中的一个核心类型,可以把它看成管道。并发核心单元通过它就可以发送或者接收数据进行通讯,这在一定程度上又进一步降低了编程的难度。 channel 是一个数据类型,主要用来解决 go 程的同步问题...

空空如也

1 2 3 4 5 ... 20
收藏数 68,292
精华内容 27,316
关键字:

channel