精华内容
参与话题
问答
  • 条件变量

    2019-06-01 14:26:46
    条件变量使线程同步中一个很重要的概念,在之前的文章中我们也多次提及过。 条件变量 条件变量(cond)使在多线程程序中用来实现“等待--->唤醒”逻辑常用的方法,是进程间同步的一种机制。条件变量用来阻塞一...

    条件变量使线程同步中一个很重要的概念,在之前的文章中我们也多次提及过。

    条件变量

    条件变量(cond)使在多线程程序中用来实现“等待--->唤醒”逻辑常用的方法,是进程间同步的一种机制。条件变量用来阻塞一个线程,直到条件满足被触发为止,通常情况下条件变量和互斥量同时使用。一般条件变量有两个状态:(1)一个/多个线程为等待“条件变量的条件成立“而挂起;(2)另一个线程在“条件变量条件成立时”通知其他线程。

    为什么条件变量总是和互斥锁结合使用?

    这其实有两方面的原因:

    (1)互斥锁可以表示的状态的太少了,可以利用条件变量来增加有限的状态。

    (2)条件变量虽然是线程同步的重要方法,但仅仅依靠条件变量是没有办法完成完成线程同步的工作的。

    现在提出一个问题:

    有两个线程,贡献一个全局变量count,count的初始值为0。这两个线程的任务是:线程1负责将count的的数值加到10,而线程而负责在线程1将count加到10之后将count输出后清零,这交替循环。

    #include <stdio.h>
    #include <pthread.h>
    #include <sys/types.h>
    
    int count=0;
    pthread_mutex_t myMutex=PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t myCond=PTHREAD_COND_INITIALIZER;
    
    void* threadHandle1(void* argv)
    {
        while(1)
        {
            pthread_mutex_lock(&myMutex);
            ++count;
            pthread_mutex_unlock(&myMutex);
            //留给其他线程足够的时间争用锁
            sleep(1);
        }
    }
    
    void* threadHandle2(void* argv)
    {
        while(1)
        {
            //为了保证在线程进入临界区是,count的数值不会被修变。
            if(count==10)
            {
                pthread_mutex_lock(&myMutex);
                if(count==10)
                {
                    printf("%d\n",count);
                    count=0;
                }
                pthread_mutex_unlock(&myMutex);
            }
            printf("%d\n",count);
            sleep(1);
        }
    }
    
    int main()
    {
        pthread_t pid[2];
        pthread_create(&pid[0],NULL,threadHandle1,NULL);
        pthread_create(&pid[1],NULL,threadHandle2,NULL);
        pthread_join(pid[0],NULL);
        pthread_join(pid[1],NULL);
        return 0;
    }

    虽然只是简单的两个线程对加法的运算,但线程1和线程2需要不停的交换锁的控制权,这样无疑就会给系统带来一些不必要的压力,原因是互斥锁只有两个状态(锁和不锁),而通过条件变量就会可以改进互斥锁在这一面的不足。

    #include <stdio.h>
    #include <pthread.h>
    #include <sys/types.h>
    
    int count=0;
    pthread_mutex_t mutex=PTHREAD_MUTEX_INITIALIZER;
    pthread_cond_t cond=PTHREAD_COND_INITIALIZER;
    
    void* threadHandle1(void* argv)
    {
        while(1)
        {
            pthread_mutex_lock(&mutex);
            ++count;
            printf("thread1(mutex):count=%d\n",count);
            pthread_mutex_unlock(&mutex);
            
            pthread_mutex_lock(&mutex);
            if(count==5)
            {
                if(pthread_cond_signal(&cond)==0)
                {
                    printf("thread1:(count=5)signal\n");
                }
            }
            if(count>=10)
            {
                if(pthread_cond_signal(&cond)==0)
                {
                    printf("thread1:(count=10)signal\n");
                }
            }
            pthread_mutex_unlock(&mutex);
            printf("thread1:(cond)unlock\n");
            sleep(1);
        }
    }
    
    void* threadHandle2(void* argv)
    {
        while(1)
        {
            pthread_mutex_lock(&mutex);
            while(count<10)
            {
                //为什么使用while?
                //防止signal唤醒的时机不对。
                printf("thread2(while):count=%d\n",count);
                //在函数返回之前将锁打开,在函数返回之后将锁关闭。
                pthread_cond_wait(&cond,&mutex);
                printf("condWait\n");
            }
            if(count>=10)
            {
                printf("thread2(if):count=%d\n",count);
                count=0;
            }
            pthread_mutex_unlock(&mutex);
            printf("mutexUnlock\n");
        }
    }
    
    int main()
    {
        pthread_t pid[2];
        pthread_create(&pid[0],NULL,threadHandle1,NULL);
        sleep(1);
        pthread_create(&pid[1],NULL,threadHandle2,NULL);
        pthread_join(pid[0],NULL);
        pthread_join(pid[1],NULL);
        return 0;
    }

     

    代码解析:

    pthread_cond_wait(&cond,&mutex);

    该函数有三个作用:

    (1)阻塞线程。

    (2)将互斥锁解锁,并等待其他线程将其唤醒。(1)(2)为原子操作。

    (3)在其他线程将其唤醒之后,将解锁的互斥锁重新加锁。

    这里有两个问题:

    (1)为什么要对线程2中的条件变量的部分加锁?

    (2)在条件变量判断的时候为什么不用if而要使用while?

    为什么要对线程2中的条件变量的部分加锁?

    如果不加锁,在线程判断时假设这样一种情况:当线程1将count的数值加到9的时候,线程2去判断count的值,此时count的值还为9,那么线程2就会进入while循环中,等待线程1的条件成立,将自己唤醒。但就这这个时候,线程1还没有执行pthread_cond_wait时,线程1将count的值修改为10,并发送了signal信号,试图唤醒线程2。而线程2还没有执行wait所以并不会接收到这个信号,之后执行wait,而继续等待线程1的信号,但线程1会任务,自己已经将唤醒的信号发送了,这样就存在问题。

    所以,需要在条件变量进行判断时,将变量锁住,让其他线程不能修改此变量,这样就可以保证在判断的时候条件的变量的值是正确的。即互斥锁的作用不是为了保护条件变量,而是为了保护条件判断时共享变量的值不会被修改

    在条件变量判断的时候为什么不用if而要使用while?

    这个主要是为了防止其他线程在条件变量的条件还不成立的情况下,将睡眠中的线程错误的唤醒。

    就像刚才的程序中的情况:我们的想法是在线程1将count的结果加到10时,将线程2唤醒,但线程1却在count等于5时将线程2唤醒,如果这里使用if就会出现问题。即程序不能保证signal线程将wait线程唤醒的时机时正确的,所以需要多重判断,就需要使用while,而不是使用if。

    signal唤醒线程的时机

    pthread_cond_signal(&cond);

    通过上面的代码的结果分析,可以看出pthread_cond_signal的功能只是唤醒一个被条件变量阻塞的线程,但该函数不会修改锁的状态。而pthread_cond_wait会修改互斥锁的状态。

    这里存在这样一个问题:(1)先解锁,再唤醒;(2)先唤醒,再解锁。因为wait再被唤醒会会有加锁操作。

    (1)先解锁互斥锁,再唤醒睡眠的线程。

    优点:减少了线程再内核态了用户态切换的次数,减少了资源的消耗。因为唤醒线程和解锁,都是需要再内核态完成的,而先解锁,再唤醒,内核会一次将这两个操作完成,这样就减少了用户态和内核态切换的次数,从而节省了资源。

    缺点:如果此时存在一个低优先级的线程在等待锁,那么一旦锁被释放,那么这个锁就会被低优先级的线程争抢去,而不会被wait的线程得到,导致wait线程阻塞,无法返回。

    (2)先唤醒睡眠的线程,再解锁互斥锁。

    优点:唤醒后的线程在等待为该互斥锁加锁,一旦锁被释放,wait线程就会立即加锁,而不会发生上述,锁被抢占额度情况。

    缺点:会增加用户态到内核态切换的次数,增加资源的消耗。

    虽然在语法这两个都可以,但一般在程序使用先唤醒,再解锁的方式。

    展开全文
  • Golang 锁和条件变量

    万次阅读 2020-08-07 12:10:00
    前言 前面我们为了解决go程同步的问题我们使用了channel, 但是go也提供了传统的同步工具. 它们都在go的标准库代码包 sync 和 sync/atomic 中. 下面我们来看一下锁的应用. 什么是锁呢? 就是某个协程(线程)在访问某个...

    前言

    前面我们为了解决go程同步的问题我们使用了channel, 但是go也提供了传统的同步工具.

    它们都在go的标准库代码包 syncsync/atomic 中.

    下面我们来看一下锁的应用.

    什么是锁呢? 就是某个协程(线程)在访问某个资源时先锁住, 防止其他协程的访问, 等访问完毕解锁后其他协程再来加锁进行访问.

    这和我们生活中加锁使用公共资源相似, 例如: 公共卫生间.

    死锁

    死锁是指两个或者两个以上的进程在执行过程中, 由于竞争资源或者由于彼此通信而造成的一种阻塞的现象, 若无外力作用, 它们都将无法推进下去. 此时称系统处于死锁状态系统产生了死锁.

    死锁不是锁的一种! 它是一种错误使用锁导致的现象.

    产生死锁的几种情况

    • 单go程自己死锁
    • go程间channel访问顺序导致死锁
    • 多go程, 多channel交叉死锁
    • 将 互斥锁、读写锁与channel混用 – 隐性死锁(在 读写锁 讲到)

    单go程自己死锁 示例代码:

    package main
    
    import "fmt"
    
    // 单go程自己死锁
    func main() {
    	ch := make(chan int)
    	ch <- 789
    	num := <- ch
    	fmt.Println(num)
    }
    

    上面这段乍一看有可能会觉得没有什么问题, 可是仔细一看就会发现这个 ch 是一个无缓冲的channel, 当789写入缓冲区时, 这时读端还没有准备好. 所以, 写端 会发生阻塞, 后面的代码不再运行.

    所以可以得出一个结论: channel应该在至少2个及以上的go程进行通信, 否则会造成死锁.

    我们继续看 go程间channel访问顺序导致死锁 的例子:

    package main
    
    import "fmt"
    
    // go程间channel访问顺序导致死锁
    func main(){
    	ch := make(chan int)
    	num := <- ch
    	fmt.Println("num = ", num)
    	go func() {
    		ch <- 789
    	}()
    }
    

    在代码运行到 num := <- ch 时, 发生阻塞, 并且下面的代码不会执行, 所以发生死锁.

    正确应该这样写:

    package main
    
    import "fmt"
    
    func main(){
    	ch := make(chan int)
    	go func() {
    		ch <- 789
    	}()
    	num := <- ch
    	fmt.Println("num = ", num)
    }
    

    所以, 在使用channel一端读(写)时, 要保证另一端写(读)操作有机会执行.

    我们再来看下 多go程, 多channel交叉死锁 的示例代码:

    package main
    
    import "fmt"
    
    // 多go程, 多channel交叉死锁
    func main(){
    	ch1 := make(chan int)
    	ch2 := make(chan int)
    
    	go func() {
    		for {
    			select {
    			case num := <- ch1:
    				ch2 <- num
    			}
    		}
    	}()
    
    	for {
    		select {
    		case num := <- ch2:
    			ch1 <- num
    		}
    	}
    }
    

    互斥锁(互斥量)

    每个资源都对应于一个可称为"互斥锁"的标记, 这个标记用来保证在任意时刻, 只能有一个协程(线程)访问该资源, 其它的协程只能等待.

    互斥锁是传统并发编程对共享资源进行访问控制的主要手段, 它由标准库 sync 中的 Mutex 结构体类型表示.

    sync.Mutex 类型只有两个公开的指针方法, LockUnlock.

    Lock锁定当前的共享资源, Unlock进行解锁.

    在使用互斥锁时, 一定要注意, 对资源操作完成后, 一定要解锁, 否则会出现流程执行异常, 死锁等问题, 通常借助defer. 锁定后, 立即使用 defer 语句保证互斥锁及时解锁. 如下所示:

    var mutex sync.Mutex  // 定义互斥锁变量: mutex
    
    func write() {
        mutex.Lock()
        defer mutex.Unlock()
    }
    

    我们先来回顾一下channel是怎么样完成数据同步的.

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    var ch = make(chan int)
    
    func printer(str string) {
    	for _, s := range str {
    		fmt.Printf("%c ", s)
    		time.Sleep(time.Millisecond * 300)
    	}
    }
    
    func person1() {        // 先
    	printer("hello")
    	ch <- 666
    }
    
    func person2() {        // 后
    	<-ch
    	printer("world")
    }
    
    func main() {
    	go person1()
    	go person2()
    	time.Sleep(5 * time.Second)
    }
    

    同样可以使用互斥锁来解决, 如下所示:

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    // 使用传统的 "锁" 完成同步  -- 互斥锁
    var mutex sync.Mutex  // 创建一个互斥锁(互斥量), 新建的互斥锁状态为0 -> 未加锁状态. 锁只有一把.
    func printer(str string) {
    	mutex.Lock()        // 访问共享数据之前, 加锁
    	for _, s := range str {
    		fmt.Printf("%c ", s)
    		time.Sleep(time.Millisecond * 300)
    	}
    	mutex.Unlock()  // 共享数据访问结束, 解锁
    }
    
    func person1() {
    	printer("hello")
    }
    
    func person2() {
    	printer("world")
    }
    
    func main() {
    	go person1()
    	go person2()
    	time.Sleep(5 * time.Second)
    }
    

    这种锁为建议锁: 操作系统提供, 建议你在编程时使用.

    强制锁只会在底层操作系统自己用到, 我们在写代码时用不到.

    person1与person2两个go程共同访问共享数据, 由于CPU调度随机, 需要对 共享数据访问顺序加以限定(同步).

    创建mutex(互斥锁), 访问共享数据之前, 加锁; 访问结束, 解锁.

    在person1的go程加锁期间, person2的go程加锁会失败 --> 阻塞.

    直至person1的go程解锁mutext, person2从阻塞处, 恢复执行.

    读写锁

    互斥锁的本质是当一个goroutine访问的时候, 其它goroutine都不能访问. 这样在资源同步, 避免竞争的同时, 也降低了程序的并发性能, 程序由原来的并行执行变成了串行执行.

    其实, 当我们对一个不会变化的数据只做操作的话, 是不存在资源竞争的问题的. 因为数据是不变的, 不管怎么读取, 多少goroutine同时读取, 都是可以的.

    所以问题不是出在上, 主要是修改, 也就是. 修改的数据要同步, 这样其它goroutine才可以感知到. 所以真正的互斥应该是读取和修改、修改和修改之间, 读和读是没有互斥操作的必要的.

    因此, 衍生出另外一种锁, 叫做读写锁.

    读写锁可以让多个读操作并发, 同时读取, 但是对于写操作是完全互斥的. 也就是说, 当一个goroutine进行写操作的时候, 其它goroutine既不能进行读操作, 也不能进行写操作.

    Go中的读写锁由结构体类型 sync.RWMutex 表示. 此类型的方法集合中包含两对方法:

    一组是对写操作的锁定和解锁, 简称为: 写锁定写解锁.

    func (*RWMutex) Lock()
    func (*RWMutex) Unlock()
    

    另一组表示对读操作的锁定和解锁, 简称为: 读锁定读解锁.

    func (*RWMutex) RLock()
    func (*RWMutex) RUnlock()
    

    我们先来看一下没有使用读写锁的情况下会发生什么:

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"time"
    )
    
    func readGo(in <-chan int, idx int){
    	for {
    		num := <- in
    		fmt.Printf("----第%d个读go程, 读入: %d\n", idx, num)
    	}
    
    }
    
    func writeGo(out chan<- int, idx int){
    	for {
    		// 生成随机数
    		num := rand.Intn(1000)
    		out <- num
    		fmt.Printf("第%d个写go程, 写入: %d\n", idx, num)
    		time.Sleep(time.Millisecond * 300)
    	}
    
    }
    
    func main() {
    	// 随机数种子
    	rand.Seed(time.Now().UnixNano())
    
    	ch := make(chan int)
    
    	for i:=0; i<5; i++ {
    		go readGo(ch, i+1)
    	}
    
    	for i:=0; i<5; i++ {
    		go writeGo(ch, i+1)
    	}
    	time.Sleep(time.Second * 3)
    }
    

    结果(截取部分):

    ......4个写go, 写入: 763
    ----1个读go, 读入: 9981个写go, 写入: 2383个写go, 写入: 998
    ......5个写go, 写入: 6074个写go, 写入: 151
    ----1个读go, 读入: 992
    ----2个读go, 读入: 151
    ......
    

    通过结果我们可以知道, 当写入 763 时, 由于创建的是无缓冲的channel, 应该先把这个数读出来, 然后才可以继续写数据, 但是结果显示, 读到的是 998, 998 在下面才显示写入啊, 怎么会先读出来呢? 出现这个情况的问题在于, 当运行到 num := <- in 时, 已经把 998 写进去了, 但是这个时候还没有来得及打印, 就失去了CPU, 失去CPU之后, 缓冲区中的数据就会被覆盖掉, 这时被 763 所覆盖.

    这是第一个错误现象, 我们再来看一下第二个错误现象.

    既然都是对数据进行读操作, 相邻的读入应该都是相同的数, 比如说----第1个读go程, 读入: 992 ----第2个读go程, 读入: 151, 这两个应该读到的数都是一样的, 但是结果显示却是不同的.

    那么加了读写锁之后, 先来看一下错误代码, 大家可以想一下为什么会出现这种错误.

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    var rwMutex sync.RWMutex
    
    func readGo(in <-chan int, idx int){
    	for {
    		rwMutex.RLock()    // 以读模式加锁
    		num := <- in
    		fmt.Printf("----第%d个读go程, 读入: %d\n", idx, num)
    		rwMutex.RUnlock()    // 以读模式解锁
    	}
    }
    
    func writeGo(out chan<- int, idx int){
    
    	for {
    		// 生成随机数
    		num := rand.Intn(1000)
    		rwMutex.Lock()    // 以写模式加锁
    		out <- num
    		fmt.Printf("第%d个写go程, 写入: %d\n", idx, num)
    		time.Sleep(time.Millisecond * 300)
    		rwMutex.Unlock()    // 以写模式解锁
    	}
    }
    
    func main() {
    	// 随机数种子
    	rand.Seed(time.Now().UnixNano())
    
    	ch := make(chan int)
    
    	for i:=0; i<5; i++ {
    		go readGo(ch, i+1)
    	}
    
    	for i:=0; i<5; i++ {
    		go writeGo(ch, i+1)
    	}
    	time.Sleep(time.Second * 3)
    }
    

    上面代码的结果会一直阻塞, 没有输出, 大家可以简单想一下出现这种情况的原因是什么?

    代码看得仔细的应该都可以看出来, 这上面的代码中, 比如说读操作先抢到了CPU, 运行代码 rwMutex.RLock() 读加锁, 然后运行到 num := <- in 时, 会要求写端同时在线, 否则就会发生阻塞, 但是这时写端不可能在线, 因为读加锁了. 所以就会一直在这发生阻塞.

    这也就是我们之前在死锁部分中提到的 隐性死锁 (不报错).

    那么解决办法有两种: 一种是不混用, 另一种是使用条件变量(之后会讲到)

    我们先看一下不混用读写锁与channel的解决办法(只使用读写锁, 如果只使用channel达不到想要的效果):

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    var rwMutex2 sync.RWMutex    // 锁只有一把, 两个属性: r w
    
    var value int    // 定义全局变量, 模拟共享数据
    
    func readGo2(in <-chan int, idx int){
    	for {
    		rwMutex2.RLock()    // 以读模式加锁
    		num := value
    		fmt.Printf("----第%d个读go程, 读入: %d\n", idx, num)
    		rwMutex2.RUnlock()    // 以读模式解锁
    	}
    }
    
    func writeGo2(out chan<- int, idx int){
    	for {
    		// 生成随机数
    		num := rand.Intn(1000)
    		rwMutex2.Lock()    // 以写模式加锁
    		value = num
    		fmt.Printf("第%d个写go程, 写入: %d\n", idx, num)
    		time.Sleep(time.Millisecond * 300)
    		rwMutex2.Unlock()    // 以写模式解锁
    	}
    }
    
    func main() {
    	// 随机数种子
    	rand.Seed(time.Now().UnixNano())
    
    	ch := make(chan int)
    
    	for i:=0; i<5; i++ {
    		go readGo2(ch, i+1)
    	}
    
    	for i:=0; i<5; i++ {
    		go writeGo2(ch, i+1)
    	}
    	time.Sleep(time.Second * 3)
    }
    

    结果:

    ......5个写go, 写入: 363
    ----4个读go, 读入: 363
    ----4个读go, 读入: 363
    ----4个读go, 读入: 363
    ----4个读go, 读入: 363
    ----2个读go, 读入: 3635个写go, 写入: 726
    ----5个读go, 读入: 726
    ----4个读go, 读入: 726
    ----2个读go, 读入: 726
    ----1个读go, 读入: 726
    ----3个读go, 读入: 7261个写go, 写入: 764
    ----5个读go, 读入: 764
    ----2个读go, 读入: 764
    ----5个读go, 读入: 764
    ----1个读go, 读入: 764
    ----3个读go, 读入: 764
    ......
    

    处于读锁定状态, 那么针对它的写锁定操作将永远不会成功, 且相应的goroutine也会被一直阻塞, 因为它们是互斥的.

    总结: 读写锁控制下的多个写操作之间都是互斥的, 并且写操作与读操作之间也都是互斥的. 但是多个读操作之间不存在互斥关系.

    从互斥锁和读写锁的源码可以看出, 它们是同源的. 读写锁的内部用互斥锁来实现写锁定操作之间的互斥. 可以把读写锁看作是互斥锁的一种扩展.

    条件变量

    在讲条件变量之前, 我们先来回顾一下之前的生产者消费者模型:

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func producer(out chan <- int) {
    	for i:=0; i<5; i++ {
    		fmt.Println("生产者, 生产: ", i)
    		out <- i
    	}
    	close(out)
    }
    
    func consumer(in <- chan int) {
    	for num := range in {
    		fmt.Println("---消费者, 消费: ", num)
    	}
    }
    
    func main() {
    	ch := make(chan int)
    	go producer(ch)
    	go consumer(ch)
    	time.Sleep(5 * time.Second)
    }
    

    之前都是一个生产者与一个消费者, 那么如果是多个生产者与多个消费者的情况呢?

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"time"
    )
    
    func producer(out chan <- int, idx int) {
    	for i:=0; i<10; i++ {
    		num := rand.Intn(800)
    		fmt.Printf("第%d个生产者, 生产: %d\n", idx, num)
    		out <- num
    	}
    }
    
    func consumer(in <- chan int, idx int) {
    	for num := range in {
    		fmt.Printf("---第%d个消费者, 消费: %d\n", idx, num)
    	}
    }
    
    func main() {
    	ch := make(chan int)
    	rand.Seed(time.Now().UnixNano())
    	for i := 0; i < 5; i++ {
    		go producer(ch, i + 1)
    	}
    	for i := 0; i < 5; i++ {
    		go consumer(ch, i + 1)
    	}
    	time.Sleep(5 * time.Second)
    }
    

    如果是按照上面的代码写的话, 就又会出现之前的错误.

    上面已经说过了, 解决这种错误有两种方法: 用锁或者用条件变量.

    这次就用条件变量来解决一下.

    首先, 强调一下. 条件变量本身不是锁!! 但是经常与锁结合使用!!

    还有另外一个问题, 如果消费者比生产者多, 仓库中就会出现没有数据的情况. 我们需要不断的通过循环来判断仓库队列中是否有数据, 这样会造成cpu的浪费. 反之, 如果生产者比较多, 仓库很容易满, 满了就不能继续添加数据, 也需要循环判断仓库满这一事件, 同样也会造成cpu的浪费.

    我们希望当仓库满时, 生产者停止生产, 等待消费者消费; 同理, 如果仓库空了, 我们希望消费者停下来等待生产者生产. 为了达到这个目的, 这里就引入了条件变量. (需要注意, 如果仓库队列用channel, 是不存在以上情况的, 因为channel被填满后就阻塞了, 或者channel中没有数据也会阻塞).

    条件变量: 条件变量的作用并不保证在同一时刻仅有一个协程(线程)访问某个共享的数据资源, 而是在对应的共享数据的状态发生变化时, 通知阻塞在某个条件上的协程(线程). 条件变量不是锁, 在并发中不能达到同步的目的, 因此条件变量总是与锁一块使用.

    例如, 我们上面说的, 如果仓库队列满了, 我们可以使用条件变量让生产者对应的goroutine暂停(阻塞), 但是当消费者消费了某个产品后, 仓库就不再满了, 应该唤醒(发送通知给)阻塞的生产者goroutine继续生产产品.

    Go标准库中的 sync.Cond 类型代表了条件变量. 条件变量要与锁(互斥锁或者读写锁)一起使用. 成员变量L代表与条件变量搭配使用的锁.

    type Cond struct {
        noCopy noCopy
        L Locker
        notify notifyList
        checker copyChecker
    }
    

    对应的有3个常用的方法, Wait, Signal, Broadcast

    1. func (c *Cond) Wait()

    该函数的作用可归纳为如下三点:

    • 阻塞等待条件变量满足
    • 释放已掌握的互斥锁相当于cond.L.Unlock()。注意: 两步为一个原子操作(第一步与第二步操作不可再分).
    • 当被唤醒时, Wait() 函数返回时, 解除阻塞并重新获取互斥锁. 相当于cond.L.Lock()
    1. func (c *Cond) Signal()

    单发通知, 给一个正等待(阻塞)在该条件变量上的goroutine(线程)发送通知.

    1. func (c *Cond) Broadcast()

    广播通知, 给正在等待(阻塞)在该条件变量上的所有goroutine(线程)发送通知

    下面, 我们就用条件变量来写一个生产者消费者模型.

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    var cond sync.Cond  // 定义全局变量
    
    func producer2(out chan<- int, idx int) {
    	for {
    		// 先加锁
    		cond.L.Lock()
    		// 判断缓冲区是否满
    		for len(out) == 3 {
    			cond.Wait()
    		}
    		num := rand.Intn(800)
    		out <- num
    		fmt.Printf("第%d个生产者, 生产: %d\n", idx, num)
    		// 访问公共区结束, 并且打印结束, 解锁
    		cond.L.Unlock()
    		// 唤醒阻塞在条件变量上的 消费者
    		cond.Signal()
    	}
    }
    
    func consumer2(in <- chan int, idx int) {
    	for {
    		// 先加锁
    		cond.L.Lock()
    		// 判断缓冲区是否为 空
    		for len(in) == 0 {
    			cond.Wait()
    		}
    		num := <- in
    		fmt.Printf("---第%d个消费者, 消费: %d\n", idx, num)
    		// 访问公共区结束后, 解锁
    		cond.L.Unlock()
    		// 唤醒阻塞在条件变量上的生产者
    		cond.Signal()
    	}
    }
    
    func main() {
    	// 设置随机种子数
    	rand.Seed(time.Now().UnixNano())
    
    	ch := make(chan int, 3)
    
    	cond.L = new(sync.Mutex)
    
    	for i := 0; i < 5; i++ {
    		go producer2(ch, i + 1)
    	}
    	for i := 0; i < 5; i++ {
    		go consumer2(ch, i + 1)
    	}
    	time.Sleep(time.Second * 1)
    }
    

    1)定义 ch 作为队列, 生产者产生数据保存至队列中, 最多存储3个数据, 消费者从中取出数据模拟消费

    2)条件变量要与一起使用, 这里定义全局条件变量 cond, 它有一个属性: L Locker, 是一个互斥锁.

    3)开启5个消费者go程, 开启5个生产者go程.

    4)producer2 生产者, 在该方法中开启互斥锁, 保证数据完整性. 并且判断队列是否满, 如果已满, 调用 cond.Wait() 让该goroutine阻塞. 当消费者取出数据后执行 cond.Signal(), 会唤醒该goroutine, 继续产生数据.

    5)consumer2 消费者, 同样开启互斥锁, 保证数据完整性. 判断队列是否为空, 如果为空, 调用 cond.Wait() 使得当前goroutine阻塞. 当生产者产生数据并添加到队列, 执行 cond.Signal() 唤醒该goroutine.

    条件变量使用流程:

    1. 创建条件变量: var cond sync.Cond
    2. 指定条件变量用的: cond.L = new(sync.Mutex)
    3. 给公共区加锁(互斥锁): cond.L.Lock()
    4. 判断是否到达阻塞条件(缓冲区满/空) --> for循环判断
      for len(ch) == cap(ch) { cond.Wait() }
      或者 for len(ch) == 0 { cond.Wait() }
      1) 阻塞 2)解锁 3)加锁
      
    5. 访问公共区 --> 读、写数据、打印
    6. 解锁条件变量用的: cond.L.Unlock()
    7. 唤醒阻塞在条件变量上的对端: cond.Signal() cond.Broadcast()

    李培冠博客

    欢迎访问我的个人网站:

    李培冠博客:lpgit.com

    展开全文

空空如也

1 2 3 4 5 ... 20
收藏数 41,611
精华内容 16,644
关键字:

条件变量