精华内容
下载资源
问答
  • 同步原语

    千次阅读 2012-07-27 15:59:48
    第十一讲 同步原语 * *********************  2007/03/09 asdjf@163.com www.armecos.com    ecos是多线程系统,并发执行造成了一些新问题的产生:多线程协同工作、对临界资源的竞争、线程间通信、线程间...
    第十一讲 同步原语 *
    *********************
        2007/03/09  asdjf@163.com  www.armecos.com
        
        ecos是多线程系统,并发执行造成了一些新问题的产生:多线程协同工作、对临界资源的竞争、线程间通信、线程间同步等等。其实,所有的多任务系统都会遇到类似问题,计算机专家们总结了很多抽象模型来应对,方法手段很多,各有特色,每种操作系统可能只实现了某个子集。ecos内核的同步机制提供了许多同步原语,包括:互斥、条件变量、信号量、信箱、事件标志和Spinlock等。
        
        抽象出来的同步原语操作主要包括:创建、删除、等待(阻塞/超时阻塞/非阻塞)、释放、设置、广播、查询数据、查询状态。
        
        虽然操作大同小异,但每种同步原语的含义和适用情况不同,下面详细介绍各种原语的使用方法和注意事项。
        
        ==========
        * 互斥体 *
        ==========
        互斥体用于实现线程对资源的安全共享。适用于线程间或线程与滞后中断服务程序DSR访问同一临界资源时的安全保护。
        考虑下面的例子:
        static volatile int counter = 0;
        void test(void)
        {
            ......
            counter++;
        }
        假设在某个时候counter的值为16,此时有两个同一优先级的线程A和B,它们都调用上面的test函数。线程A读取counter的值,并将其值加1,此时,counter为17。线程B也做同样的操作,counter的值为18。但是如果线程A在读取counter的值为16后,在将其值加1之前调度器调度运行线程B,此时,线程B读取的仍然是counter原来的值16,操作完成后counter变为17。这样,counter的值只增加了1,而不是2,因此最后counter的值只是17,而不是18。这足以说明该应用程序的运行是不可靠的。
        使用互斥体就可以安全地操作counter全局变量:
        static volatile int counter = 0;
        static cyg_mutex_t lock;
        void test(void)
        {
            ......
            cyg_mutex_lock(&lock);
            counter++;
            cyg_mutex_unlock(&lock);
        }
        
        互斥体的使用可能会引起优先级倒置问题的出现。假设有三个不同优先级的线程A、B、C,优先级A>B>C。A和B由于等待事件而处于阻塞状态,C得以运行。线程C在进入临界区时锁定了一个互斥体。当线程A、B被唤醒时,线程A要等待同一个互斥体,但它在线程C离开临界区并释放互斥体之前不得不等待。与此同时,线程B却可以毫无问题地正常运行。由于线程C比线程B优先级低,它在B被阻塞前将没有机会运行。这样线程A就不能运行。其结果就是高优先级的线程A由于优先级比它低的线程B的原因而无法运行,这就发生了优先级倒置。
        
        解决优先级倒置问题普遍使用的技术是:优先级置顶协议(Priority Ceiling Protocol)和优先级继承协议(Priority Inheritance Protocol)。
        优先级置顶意味着占有互斥体的线程在运行时的优先级比任何其他可以获取该互斥体的线程的优先级都要高。ecos组件包通常无法知道系统中各种线程的详细信息,因此无法对组件包内部使用的互斥体设置合适的置顶优先级。设置高了会影响调度操作。
        优先级继承将占有互斥体的线程优先级提升到所有正在等待该互斥体的线程优先级的最高值。当一个线程等待正被另一优先级较低的线程占有的互斥体时,拥有该互斥体的线程优先级被提升到正在等待该互斥体的线程优先级,优先级继承比优先级置顶效率高,不过增加了同步调用开销,而且实现起来比优先级置顶复杂。
        
        初始化          cyg_mutex_init
        删除            cyg_mutex_destroy
        锁定            cyg_mutex_lock
        尝试锁定        cyg_mutex_trylock
        解锁            cyg_mutex_unlock
        释放            cyg_mutex_release
        设置置顶优先级  cyg_mutex_set_ceiling
        设置协议        cyg_mutex_set_protocol
        
        ============
        * 条件变量 *
        ============
        条件变量是允许线程同时给多个线程发信号的一个同步机制。当线程等待一个条件变量时,它在进入等待状态之前将释放互斥体,在被唤醒后又重新拥有互斥体。这种操作是原子操作。
        
        举例说明见例1。
        
        初始化          cyg_cond_init
        删除            cyg_cond_destroy
        等待            cyg_cond_wait
        唤醒            cyg_cond_signal
        广播            cyg_cond_broadcast
        带超时等待      cyg_cond_timed_wait
        
        ==========
        * 信号量 *
        ==========
        信号量是一个允许线程等待直到事件发生的同步原语。每个信号量都有一个整数计数器,如果计数器为0,那么等待该信号量的线程将被阻塞。如果计数器大于0,那么等待的线程将消耗一个事件,即计数器减1。唤醒信号量将对计数器加1。即使事件连续快速发生多次,信号量也不会丢失信息。
        信号量的另一个用途是对资源的管理。计数器的值与当前可用资源的数目相对应。实际上,条件变量更适合于这种操作。
        
        初始化          cyg_semaphore_init
        删除            cyg_semaphore_destroy
        等待            cyg_semaphore_wait
        带超时等待      cyg_semaphore_timed_wait
        非阻塞等待      cyg_semaphore_trywait
        唤醒            cyg_semaphore_post
        获取信息        cyg_semaphore_peek
        
        ========
        * 信箱 *
        ========
        信箱是一个类似于信号量的同步原语,还可以在事件发生时传递一些数据。有些系统称之为消息队列。被称为消息的数据通常是数据结构的指针。信箱只具有有限的容量,缺省配置为10个槽位,有可能溢出。因此,信箱通常不能被DSR用来唤醒线程。
        
        创建            cyg_mbox_create
        删除            cyg_mbox_delete
        获得消息        cyg_mbox_get
        带超时获得消息  cyg_mbox_timed_get
        非阻塞获得消息  cyg_mbox_tryget
        非删除获得消息  cyg_mbox_peek_item
        发送消息        cyg_mbox_put
        带超时发送消息  cyg_mbox_timed_put
        非阻塞发送消息  cyg_mbox_tryput
        读取消息数      cyg_mbox_peek
        判断是否有消息  cyg_mbox_waiting_to_get
        发新消息前判断  cyg_mbox_waiting_to_put
        
        ============
        * 事件标志 *
        ============
        事件标志允许线程等待一个或几个不同类型的事件发生。还可以用于等待某些事件组合的发生。事件标志不存在溢出问题。
        事件标志可以指定函数调用者阻塞(1)直到所有指定事件发生为止;(2)直到至少一个指定事件发生为止;(3)直到所有指定事件发生为止并清除事件标志;(2)直到至少一个指定事件发生为止并清除事件标志。
        
        初始化              cyg_flag_init
        删除                cyg_flag_destroy
        设置标志位          cyg_flag_setbits
        清除标志位          cyg_flag_maskbits
        等待事件发生        cyg_flag_wait
        超时等待事件        cyg_flag_timed_wait
        探查事件是否发生    cyg_flag_poll
        返回事件标志当前值  cyg_flag_peek
        报告是否有线程等待  cyg_flag_waiting
        
        ============
        * Spinlock *
        ============
        Spinlock是为SMP系统中的应用线程提供的一个同步原语。Spinlock运行级别要低于其他同步原语(如互斥体)。特别在对中断进行处理以及线程需要共享硬件资源的情况下需要使用Spinlock。在SMP系统中,内核自身的实现也需要使用Spinlock。
        必须强调的是,对Spinlock的拥有时间必须很短,一般为几十条指令。在单处理器系统中,不应该使用Spinlock。
        
        初始化          cyg_spinlock_init
        删除            cyg_spinlock_destroy
        声称            cyg_spinlock_spin
        释放            cyg_spinlock_clear
        非阻塞声称      cyg_spinlock_try
        检查是否有等待  cyg_spinlock_test
        安全声称        cyg_spinlock_spin_intsave
        安全释放        cyg_spinlock_clear_intsave
    展开全文
  • ACE框架 同步原语设计

    2017-04-14 20:43:00
    同步原语使用系统平台(操作系统,多线程库)提供的同步原语,并为系统平台不提供的同步原语提供模拟实现。ACE框架使用了外观模式和适配器分两层,将同步原语统一接口。 在外观包装层,ACE框架为每种同步原语将系统...

    ACE框架常用的同步机制设计成统一的原语接口。同步原语使用系统平台(操作系统,多线程库)提供的同步原语,并为系统平台不提供的同步原语提供模拟实现。ACE框架使用了外观模式和适配器分两层,将同步原语统一接口。

    在外观包装层,ACE框架为每种同步原语将系统平台不同的同步原语函数统一成一致的函数接口集,并提供系统平台不支持的同步原语的模拟的实现。这一层位于ACE_OS命名空间层。

    ACE框架定义了7种系统平台需要提供的同步原语,包括有条件变量,事件,互斥体(锁),线程锁,可递归锁,读写锁以及信号量。

    并没有一个系统平台支持所有的同步原语,并且不同系统平台的原语函数各不相同。在这层ACE框架为这些同步原语函数定义了统一的外观。

    Windows平台支持事件,互斥体(锁)Mutext,线程锁 CriticalSection,可递归锁CriticalSection,信号量Semaphore,但不支持条件变量,读写锁。

    Solaris平台支持互斥体(锁)mutex_t,信号量sema_t,条件变量cond_t,读写锁rwlock_t,但并不支持事件,可递归锁。

    Linux平台,操作系统只提供信号量sem_t同步原语的系统调用,其它同步原语必须依赖线程库Pthread。

    线程库Pthread,实现了互斥体(锁)pthread_mutex_t,条件变量pthread_cond_t,读写锁pthread_rwlock_t,并提供一个信号量的模拟实现,但不支持事件,可递归锁。

    ACE为不支持的同步原语提供了实现模拟,分别有事件ACE_event_t,条件变更ACE_cond_t,事件ACE_event_t,以及信号量ACE_sema_t。

    Windows平台的同步原语外观实现:(红色为ACE定义的同步对象外观,黄色为实现)

    Linux平台的外观实现:

     

    虽然为7种同步原语各自统一了函数外观,但使用起来还很繁杂,不同的同步原语之间的函数不兼容,并且不利于开发中同步原语的替换。在适配层,ACE使用相同的接口包装了每种同步原语的使用,并且使同步原语轻易地使用在模板泛型开发。

     

    ACE_Barrier是更高一层的同步原语。

    转载于:https://www.cnblogs.com/bbqzsl/p/6710799.html

    展开全文
  • Go-同步原语

    2020-03-30 09:08:00
    一、同步原语 - Sync 这些基本原语提高了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级的更高的 Channel 实现同步。 1-1 并发状态下的资源冲突 由于引用传递,在...

    一、同步原语 - Sync

    这些基本原语提高了较为基础的同步功能,但是它们是一种相对原始的同步机制,在多数情况下,我们都应该使用抽象层级的更高的 Channel 实现同步。

    1-1 并发状态下的资源冲突

    由于引用传递,在并发状态下的数据资源获取无序,导致最终结果重复或者错误。

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        var a = 0
    
        for i := 0; i < 10; i++ {
            go func(idx int) {
                a += 1
                fmt.Printf("goroutine %d, a=%d\n", idx, a)
            }(i)
        }
    
        // 等待 1s 结束主程序 确保所有协程执行完
        time.Sleep(time.Second)
    }
    /*
    goroutine 2, a=4
    goroutine 0, a=1
    goroutine 4, a=2
    goroutine 3, a=10
    goroutine 5, a=8
    goroutine 8, a=5
    goroutine 6, a=9
    goroutine 1, a=3
    goroutine 7, a=7
    goroutine 9, a=7
    */
    

    二、 sync.Mutex - 互斥锁

    一份互斥锁对一个资源加锁,只能同时被一个 goroutine 锁定,其他 goroutine 阻塞等待资源释放。

    注意:对一个未锁定的互斥锁解锁,会抛错;首次使用后不能复制复制该互斥锁

    2-1 锁结构

    type Mutex struct {
    	state int32
    	sema  uint32
    }
    
    • state:互斥锁的当前状态
      • mutexLocked — 表示互斥锁的锁定状态;
      • mutexWoken — 表示从正常模式被从唤醒;
      • mutexStarving — 当前的互斥锁进入饥饿状态;
      • waitersCount — 当前互斥锁上等待的 Goroutine 个数;
    • sema:控制锁状态
      • 正常模式 - 锁的等待者会按照先进先出的顺序获取资源。
        • 刚被唤起的 goroutine 与新建的 goroutine 竞争资源的时,大概率竞争失败而无法获取锁资源,所以若 goroutine 超过 1ms 没有获取到锁,互斥锁会自动被切换成饥饿模式,防止 goroutine 没有资源。
      • 饥饿模式 - 互斥锁会直接将资源交给等待队列最前的 goroutine,新创建的 goroutine 会被至于等待最尾端。目的是为了确保互斥锁的公平性。
        • 若一个 goroutine 获得了互斥锁,并且它在队列尾端,或者等待的时间少于 1ms ,则互斥锁会自动切换成正常模式
        • 饥饿模式可以有效的避免 goroutine 陷入由于等待无法获取锁的高危延时。

    2-2 应用举例

    2-2-1 资源的有序化

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    func main() {
    	var a = 0
    
    	var lock sync.Mutex
    	for i := 0; i < 10; i++ {
    		go func(idx int) {
    			lock.Lock()
    			defer lock.Unlock()
    			a += 1
    			fmt.Printf("goroutine %d, a=%d\n", idx, a)
    		}(i)
    	}
    
    	// 等待 1s 结束主程序
    	// 确保所有协程执行完
    	time.Sleep(time.Second)
    }
    /*
    goroutine 0, a=1
    goroutine 7, a=2
    goroutine 5, a=3
    goroutine 1, a=4
    goroutine 2, a=5
    goroutine 3, a=6
    goroutine 4, a=7
    goroutine 8, a=8
    goroutine 9, a=9
    goroutine 6, a=10
    */
    

    2-2-2 资源的占有

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    func main() {
    	ch := make(chan struct{}, 2)
    
    	var l sync.Mutex
    	go func() {
    		l.Lock()
    		defer l.Unlock()
    		fmt.Println("goroutine1: 锁定 2s")
    		time.Sleep(time.Second * 2)
    		fmt.Println("goroutine1: 解锁资源")
    		ch <- struct{}{}
    	}()
    
    	go func() {
    		fmt.Println("goroutine2: 等待解锁")
    		l.Lock()
    		defer l.Unlock()
    		fmt.Println("goroutine2: 获取资源,锁定资源")
    		ch <- struct{}{}
    	}()
    
    	// 等待 goroutine 执行结束
    	for i := 0; i < 2; i++ {
    		<-ch
    	}
    }
    /*
    goroutine2: 等待解锁
    goroutine1: 锁定 2s
    goroutine1: 解锁资源
    goroutine2: 获取资源,锁定资源
    */
    

    2-3 互斥锁总结

    加锁过程

    • 如果互斥锁处于初始化状态,就会直接通过置位 mutexLocked 加锁;
    • 如果互斥锁处于 mutexLocked 并且在普通模式下工作,就会进入自旋,执行 30 次 PAUSE 指令消耗 CPU 时间等待锁的释放;
    • 如果当前 Goroutine 等待锁的时间超过了 1ms,互斥锁就会切换到饥饿模式
    • 互斥锁在正常情况下会通过 sync.runtime_SemacquireMutex 函数将尝试获取锁的 Goroutine 切换至休眠状态,等待锁的持有者唤醒当前 Goroutine;
    • 如果当前 Goroutine 是互斥锁上的最后一个等待的协程或者等待的时间小于 1ms,当前 Goroutine 会将互斥锁切换回正常模式

    解锁过程

    • 当互斥锁已经被解锁时,那么调用 sync.Mutex.Unlock 会直接抛出异常;
    • 当互斥锁处于饥饿模式时,会直接将锁的所有权交给队列中的下一个等待者,等待者会负责设置 mutexLocked 标志位;
    • 当互斥锁处于普通模式时,如果没有 Goroutine 等待锁的释放或者已经有被唤醒的 Goroutine 获得了锁,就会直接返回;在其他情况下会通过 sync.runtime_Semrelease 唤醒对应的 Goroutine;

    三、sync.RWMutex - 读写互斥锁

    读写互斥锁是细粒度的互斥锁,不限制资源的并发读,但是可以对 读、写 操作进行锁定。

    通常在大量读操作,少量写操作的业务场景下提高服务的性能。进行读写资源的操作分离,提高服务的性能。

    3-1 锁结构

    type RWMutex struct {
    	w           Mutex
    	writerSem   uint32
    	readerSem   uint32
    	readerCount int32
    	readerWait  int32
    }
    
    • w :复用互斥锁提供的能力
    • writerSem:写等待读
    • readerSem:读等待写
    • readerCount:当前正在执行读操作的数量
    • readerWait:当写操作被堵塞时,等待的读操作个数

    3-2 锁应用

    注意

    • 同时只能有一个 goroutine 能够获得写死锁
    • 同时可以用有任意多个 goroutine 获得读锁定
    • 同时只能存在写锁定或读锁定(读写互斥)
    • 一个 goroutine 获得写锁定,其他协程无论读或者写,都将阻塞到写解锁
    • 一个 goroutine 获得读锁定,其他读锁定可以继续获取读资源
    • 当有一个或多个读锁定,写锁定将等待所有读锁定解锁后才能进行写锁定
    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    )
    
    var count int
    var rw sync.RWMutex
    
    func main() {
    	ch := make(chan struct{}, 10)
    	for i := 0; i < 5; i++ {
    		go read(i, ch)
    	}
    	for i := 0; i < 5; i++ {
    		go write(i, ch)
    	}
    
    	for i := 0; i < 10; i++ {
    		<-ch
    	}
    }
    
    func read(n int, ch chan struct{}) {
    	rw.RLock()
    	fmt.Printf("goroutine %d 进入读操作...\n", n)
    	v := count
    	fmt.Printf("goroutine %d 读取结束,值为:%d\n", n, v)
    	rw.RUnlock()
    	ch <- struct{}{}
    }
    
    func write(n int, ch chan struct{}) {
    	rw.Lock()
    	fmt.Printf("goroutine %d 进入写操作...\n", n)
    	v := rand.Intn(1000)
    	count = v
    	fmt.Printf("goroutine %d 写入结束,新值为:%d\n", n, v)
    	rw.Unlock()
    	ch <- struct{}{}
    }
    /*
    goroutine 4 进入写操作...
    goroutine 4 写入结束,新值为:81
    goroutine 4 进入读操作...
    goroutine 2 进入读操作...
    goroutine 0 进入读操作...
    goroutine 0 读取结束,值为:81
    goroutine 1 进入读操作...
    goroutine 4 读取结束,值为:81
    goroutine 3 进入读操作...
    goroutine 2 读取结束,值为:81
    goroutine 3 读取结束,值为:81
    goroutine 1 读取结束,值为:81
    goroutine 1 进入写操作...
    goroutine 1 写入结束,新值为:887
    goroutine 0 进入写操作...
    goroutine 0 写入结束,新值为:847
    goroutine 2 进入写操作...
    goroutine 2 写入结束,新值为:59
    goroutine 3 进入写操作...
    goroutine 3 写入结束,新值为:81
    */
    

    四、sync.WaitGroup

    sync.WaitGroup 可以等待一组 Goroutine 的返回

    WaitGroup 对象内部有一个计数器,最初从0开始,它有三个方法:Add(), Done(), Wait() 用来控制计数器的数量。Add(n) 把计数器设置为nDone() 每次把计数器-1wait() 会阻塞代码的运行,直到计数器地值减为0。

    它将会让主 goroutine 等待所有的 worker goroutine 完成。
    如果你的应用有长时运行的消息处理循环的 worker,你也将需要一个方法向这些 goroutine 发送信号,让它们退出。
    你可以给各个worker 发送一个“kill”消息。
    另一个选项是关闭一个所有 worker 都接收的 channel。这是一次向所有 goroutine 发送信号的简单方式。

    注意点总结

    • sync.WaitGroup 必须在 sync.WaitGroup.Wait 方法返回之后才能被重新使用;
    • sync.WaitGroup.Done 只是对 sync.WaitGroup.Add 方法的简单封装,我们可以向 sync.WaitGroup.Add 方法传入任意负数(需要保证计数器非负)快速将计数器归零以唤醒其他等待的 Goroutine;
    • 可以同时有多个 Goroutine 等待当前 sync.WaitGroup 计数器的归零,这些 Goroutine 会被同时唤醒;
    • sync.WaitGroup.Add 必须在 goroutine 开始前执行

    4-1 结构

    type WaitGroup struct {
    	noCopy noCopy
    	state1 [3]uint32
    }
    
    • noCopy — 保证 sync.WaitGroup 不会被开发者通过再赋值的方式拷贝;
    • state1 — 存储着状态和信号量;
    func (wg *WaitGroup) Add(delta int) // 添加 goroutine 个数
    func (wg *WaitGroup) Done() // goroutine 完成,个数 -1 
    func (wg *WaitGroup) Wait() // 等待结束
    

    4-2 应用举例

    4-2-1 简单等待

    package main
    
    import (
        "fmt"
        "sync"
        "time"
    )
    
    func main() {
        var wg sync.WaitGroup
    
        for i := 0; i < 5; i++ {
            // 计数加 1
            wg.Add(1)
            go func(i int) {
                // 计数减 1
                defer wg.Done()
                time.Sleep(time.Second * time.Duration(i))
                fmt.Printf("goroutine%d 结束\n", i)
            }(i)
        }
    
        // 等待执行结束
        wg.Wait()
        fmt.Println("所有 goroutine 执行结束")
    }
    

    4-2-2 解决 进程不会等待所有的 goroutines 完成而引发的问题

    go 内使用 goroutines 所谓的轻量线程,实现并发。并且在同一个程序中,所有的 goroutines 共享同一个地址空间。

    但是由于 goroutines 的效率高,所以程序未等到所有的 goroutines 完成就会结束。

    package main
    import (  
        "fmt"
        "time"
    )
    func main() {  
        workerCount := 2
        for i := 0; i < workerCount; i++ {
          	// 使用轻量线程 执行函数
            go doit(i)
        }
        time.Sleep(1 * time.Second)
        fmt.Println("all done!")
    }
    func doit(workerId int) {  
        fmt.Printf("[%v] is running\n",workerId)
        time.Sleep(3 * time.Second)
        fmt.Printf("[%v] is done\n",workerId)
    }
    
    /*
    [0] is running 
    [1] is running 
    all done!
    */
    

    一个最常见的解决方法是使用 WaitGroup 变量,sync.WaitGroup 能有效的阻塞代码

    package main
    import (  
        "fmt"
        "sync"
    )
    func main() {  
        var wg sync.WaitGroup
        done := make(chan struct{}) // 创建结构体通道
        workerCount := 2
        for i := 0; i < workerCount; i++ {
            wg.Add(1)
            go doit(i,done,wg)
        }
        close(done)
        wg.Wait()
        fmt.Println("all done!")
    }
    func doit(workerId int,done <-chan struct{},wg sync.WaitGroup) {  
        fmt.Printf("[%v] is running\n",workerId)
      	defer wg.Done() 
    		// defer 确保 wg.Done() 在该函数执行完毕之后执行,用于解锁资源
        <- done
        fmt.Printf("[%v] is done\n",workerId)
    }
    
    /*
    [0] is running 
    [0] is done 
    [1] is running 
    [1] is done
    */
    

    但通常会出现死锁问题。
    死锁:两个或两个以上的线程,因抢夺资源而造成相互等待的无解结果。

    各个 worker 都得到了原始的 WaitGroup 变量的一个拷贝。所有 worker 都在等着对 wg 内存空间资源的修改权,所以当 worker 执行wg.Done()时,并没有在主 goroutine 上的 WaitGroup 变量上生效。

    fatal error: all goroutines are asleep - deadlock!
    
    package main
    import (  
        "fmt"
        "sync"
    )
    func main() {  
        var wg sync.WaitGroup
      	// 注意 已经初始,非nil
        done := make(chan struct{})
        wq := make(chan interface{})
      	// 非缓冲信道,发送方会阻塞直到接收方从信道中接受了值
        workerCount := 2
        for i := 0; i < workerCount; i++ {
            wg.Add(1)
            go doit(i,wq,done,&wg)
        }
        for i := 0; i < workerCount; i++ {
            wq <- i
        }
        close(done) // 关闭信道,禁止数据流入,但可读。
        wg.Wait()
        fmt.Println("all done!")
    }
    func doit(workerId int, wq <-chan interface{},done <-chan struct{},wg *sync.WaitGroup) {  
        fmt.Printf("[%v] is running\n",workerId)
        defer wg.Done()
        for {
          	// select 是 Go 中的一个控制结构,类似于用于通信的 switch 语句。每个 case 必须是一个通信操作,要么是发送要么是接收。
          	// select 能让 goroutine 同时等待多个 channel 的可读或可写
          	// 当满足多个 case 时,随机执行一个可运行的case 若没有 case 可执行将阻塞等待,直到存在可执行的case
            select {
            case m := <- wq:
                fmt.Printf("[%v] m => %v\n",workerId,m)
            case <- done:
              	/*
              	a,ok:= <- done
              	fmt.Printf("[%v] is done %v,%v\n",workerId,a,ok)
              	[1] is done {},false
              	*/
                // 从一个nil channel中接收数据会一直被block
              	// 从关闭的 channel 中不但可以读取已发送的数据,还可以不断读取初始值
                fmt.Printf("[%v] is done\n",workerId)
                return
            }
        }
    }
    
    // 注意:执行结果不唯一
    /*
    [1] is running
    [1] m => 0
    [0] is running
    [0] is done
    [1] m => 1
    [1] is done
    all done!
    */
    /*
    [1] is running
    [1] m => 0
    [0] is running
    [0] m => 1
    [0] is done {}
    [1] is done {}
    all done!
    */
    // 主线程执行程序,0 1 随机执行case,走到 done 即结束循环,走 wq 即读取
    // 主线程同时执行 wq 的写入,并关闭信道,等待所有线程完成
    
    

    五、sync.Once - 只允许函数被调一次

    5-1 结构体

    type Once struct {
    	done uint32
    	m    Mutex
    }
    
    • done :记录执行次数
    • m:确保仅被执行一次
    func (o *Once) Do(f func()) // 执行调用
    

    5-2 应用

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	var once sync.Once
    	onceBody := func() {
    		fmt.Println("Only once")
    	}
    	done := make(chan bool)
    	for i := 0; i < 10; i++ {
    		go func() {
    			once.Do(onceBody)
    			done <- true
    		}()
    	}
    	for i := 0; i < 10; i++ {
    		<-done
    	}
    }
    // Only once
    

    六、sync.Cond - 条件变量

    Cond 实现一个条件变量,可以让一系列 goroutine 都满足特定条件时被唤醒,保存一个通知列表,符合某种状态,goroutine 进行等待(Wait)被通知(Broadcast,Signal),若符合条件唤醒的 goroutine 将继续执行,其他的继续在等待序列中等待。

    注意总结

    • sync.Cond.Wait 方法在调用之前一定要使用获取互斥锁,否则会触发程序崩溃;
    • sync.Cond.Signal 方法唤醒的 Goroutine 都是队列最前面、等待最久的 Goroutine;
    • sync.Cond.Broadcast会按照一定顺序广播通知等待的全部 Goroutine;

    6-1 结构体

    type Cond struct {
    	noCopy  noCopy
    	L       Locker
    	notify  notifyList
    	checker copyChecker
    }
    
    • noCopy — 用于保证结构体不会在编译期间拷贝;
    • copyChecker — 用于禁止运行期间发生的拷贝;
    • L — 用于保护内部的 notify 字段,Locker 接口类型的变量;
    • notify — 一个 Goroutine 的链表,它是实现同步机制的核心结构(通知列表)
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast() // 广播通知,唤醒队列中全部的 goroutine
    func (c *Cond) Signal() // 单发通知,唤醒队列最前的 goroutine
    func (c *Cond) Wait() // 等待通知
    

    6-2 应用

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    func main() {
    	var count int = 4
    	ch := make(chan struct{}, 5)
    
    	// 新建 cond 传入互斥锁为参
    	var l sync.Mutex
    	cond := sync.NewCond(&l)
    
    	for i := 0; i < 5; i++ {
    		go func(i int) {
    			// 争抢互斥锁的锁定
    			cond.L.Lock()
    			defer func() {
    				cond.L.Unlock()
    				ch <- struct{}{}
    			}()
    
    			// 条件是否达成
    			for count > i {
    				cond.Wait()
    				fmt.Printf("收到一个通知 goroutine%d\n", i)
    			}
    
    			fmt.Printf("goroutine%d 执行结束\n", i)
    		}(i)
    	}
    
    	// 确保所有 goroutine 启动完成
    	time.Sleep(time.Millisecond * 20)
    
    	fmt.Println("broadcast...")
    	cond.L.Lock()
    	count -= 1
    	cond.Broadcast()
    	cond.L.Unlock()
    
    	time.Sleep(time.Second)
    	fmt.Println("signal...")
    	cond.L.Lock()
    	count -= 2
    	cond.Signal()
    	cond.L.Unlock()
    
    	time.Sleep(time.Second)
    	fmt.Println("broadcast...")
    	cond.L.Lock()
    	count -= 1
    	cond.Broadcast()
    	cond.L.Unlock()
    
    	for i := 0; i < 5; i++ {
    		<-ch
    	}
    }
    
    /*
    goroutine4 执行结束
    broadcast...
    收到一个通知 goroutine2
    收到一个通知 goroutine1
    收到一个通知 goroutine0
    收到一个通知 goroutine3
    goroutine3 执行结束
    signal...
    收到一个通知 goroutine2
    goroutine2 执行结束
    broadcast...
    收到一个通知 goroutine0
    goroutine0 执行结束
    收到一个通知 goroutine1
    goroutine1 执行结束
    */
    

    七、sync.Pool - 临时对象池

    sync.Pool 可以作为临时对象的保存和复用的集合

    通常适用与无状态的对象的复用,不适用在连接池的业务。

    若需要操池操作,可使用 fmt 包替代,fmt 内维护了一个动态大小的临时缓冲区。

    注意总结

    • Pool 中的对象在仅有 Pool 有着唯一索引的情况下可能会被自动删除,取决于下一次 GC 执行的时间
    • goroutines 协程安全,可以同时被多个协程使用。

    7-1 结构体

    type Pool struct {
        noCopy noCopy
        local     unsafe.Pointer 
        localSize uintptr       
        New func() interface{}
    }
    
    • New:需要提供一个 New 方法,保证获取不到临时对象的时候自动创一个,该对象不会自动被加入 Pool 中。
    • local:一个池子的固定大小,类型为 [P]pollLocal
    • local:本地数组的长度
    func (p *Pool) Get() interface{} // 获取临时对象,不存在则创建
    func (p *Pool) Put(x interface{}) // 将临时对象放回 Pool 中
    

    7-2 应用举例

    package main
    
    import (
        "bytes"
        "io"
        "os"
        "sync"
        "time"
    )
    
    var bufPool = sync.Pool{
        New: func() interface{} {
            return new(bytes.Buffer)
        },
    }
    
    func timeNow() time.Time {
        return time.Unix(1136214245, 0)
    }
    
    func Log(w io.Writer, key, val string) {
        // 获取临时对象,没有的话会自动创建
        b := bufPool.Get().(*bytes.Buffer)
      	
      	// 临时对象的存储方法
        b.Reset()
        b.WriteString(timeNow().UTC().Format(time.RFC3339))
        b.WriteByte(' ')
        b.WriteString(key)
        b.WriteByte('=')
        b.WriteString(val)
        w.Write(b.Bytes())
      
        // 将临时对象放回到 Pool 中
        bufPool.Put(b)
    }
    
    func main() {
        Log(os.Stdout, "path", "/search?q=flowers")
    }
    
    /*
    打印结果:
    2006-01-02T15:04:05Z path=/search?q=flowers
    */
    

    参考阅读

    Golang -sync doc

    同步原语与锁

    浅谈 Golang sync 包的相关使用方法

    展开全文
  • 2017/3/12 18.5.7 同步原语 Python 3.6.1rc1文档 18.5.7同步原语 锁 Lock Event Condition 信号量 Semaphore BoundedSemaphore ASYNCIO锁定API被设计成接近类threading 模块 Lock Event Condition Semaphore ...
  • 使用AmpPHP应用程序和库的同步原语。 安装 该软件包可以作为依赖项安装。 composer require amphp/sync 文献资料 可以在以及目录中找到文档。 版本控制 像所有其他amphp软件包一样, amphp/sync遵循语义版本控制...
  • asyncio同步原语与线程(threading)模块同步原语基本类似,但有两点重要区别: asyncio同步原语非线程安全,因此不应被用作系统线程同步(可以使用threading代替); asyncio同步原语不允许使用timeout参数;可以...

    asyncio同步原语与线程(threading)模块同步原语基本类似,但有两点重要区别:

    • asyncio同步原语非线程安全,因此不应被用作系统线程同步(可以使用threading代替);
    • asyncio同步原语不允许使用timeout参数;可以使用asyncio.wait_for()方法执行有超时设置的操作。

    asyncio有以下5个基本的同步原语:

    • Lock
    • Event
    • Condition
    • Semaphore
    • BoundedSemaphore

    Lock

    • class asyncio.**Lock(*,loop=None)**
      • 为asyncio任务提供一个互斥锁。非线程安全。
      • asyncio锁可以用来保证对共享资源的独占访问。
      • asyncio锁的首选用法是同async with语句一起使用:

        lock = asyncio.Lock()
        # ... later
        async with lock:
           # 访问共享资源

        此代码段和以下代码是等价的:

        lock = asyncio.Lock()
        
        # ... later
        await lock.acquire()
        try:
            # 访问共享资源
        finally:
            lock.release()
      • coroutine acquire()
        • 获取asyncio同步锁。
        • 该方法等待的状态变为unlocked,之后设置其为locked,并返回True
      • release()
        • 释放asyncio同步锁。
        • 如果的状态是locked,则将其重置为unlocked并返回。
        • 如果的状态是unlocked,会引发RuntimeError异常。
      • locked()
        • 如果的状态是locked,则返回True

    Event

    • class asyncio.**Event(*,loop=None)**
      • 事件对象,非线程安全。
      • 用于向asyncio任务通知某些事件已发生。
      • 事件对象用于管理内部标志。此标志可以通过set()方法设置为True,或通过clear()方法复位为Falsewait()方法在该标志设置为True前一直保持阻塞。初始状态下,该标志为False
      • 例如:

        async def waiter(event):
             print('waiting for it ...')
             await event.wait()
             print('... got it!')
        
        async def main():
             # Create an Event object.
             event = asyncio.Event()
        
             # Spawn a Task to wait until 'event' is set.
             waiter_task = asyncio.create_task(waiter(event))
        
             # Sleep for 1 second and set the event.
             await asyncio.sleep(1)
             event.set()
        
             # Wait until the waiter task is finished.
             await waiter_task
        
        asyncio.run(main())
      • coroutine wait()
        • 等待事件内部标志被设置为True
        • 如果事件的内部内部标志已设置,则立即返回True。否则,一直阻塞,直到另外的任务调用set()
      • set()
        • 设置事件内部标志True
        • 所有等待事件的任务将会立即被触发。
      • clear()
        • 清除事件内部标志(即设置为False)。
        • 等待事件的任务将会阻塞,直到set()方法被再次调用。
      • is_set()
        • 如果事件内部标志被设置为True,则返回True

    Condition

    • class asyncio.**Condition(lock=None,*,loop=None)**
      • 条件对象,非线程安全。
      • 异步条件原语用于在某些事件发生后,获得共享资源的独占访问权限。
      • 本质上,条件对象结合了事件和锁的功能。可以让多个Condition对象共享一个Lock,这允许在对共享资源的特定状态感兴趣的不同任务之间协调对共享资源的独占访问。
      • 可选参数lock必须为Lock对象或None。如果为None,会自动创建一个Lock对象。
      • 使用条件对象的首选方法是async with方式:

        cond = asyncio.Condition()
        
        # ... later
        async with cond:
            await cond.wait()

        等价于:

        cond = asyncio.Condition()
        
        # ... later
        await lock.acquire()
        try:
            await cond.wait()
        finally:
            lock.release()
      • coroutine acquire()
        • 获取底层锁。
        • 该方法一直等待,直到底层锁处于未锁定状态,然后设置其为锁定状态,并且返回True
      • notify(n=1)
        • 唤醒至多n个等待条件的任务。如果没有正在等待的任务,则该方法无操作。
        • 在调用该方法之前,必须先调用acquire()获取锁,并在调用该方法之后释放锁。如果在锁为锁定的情况下调用此方法,会引发RuntimeError异常。
      • locked()
        • 如果底层锁已获取,则返回True
      • notify_all()
        • 唤醒所有正在等待该条件的任务。
        • 该方法与notify()类似,区别只在它会唤醒所有正在等待的任务。
        • 在调用该方法前,必须首先获取底层锁,并在执行完该方法后释放锁。如果在底层锁未锁定的情况下执行该方法,会引发RuntimeError异常。
      • release()
        • 释放底层锁。
        • 在未锁定的锁上调用时,会引发RuntimeError异常。
      • coroutine wait()
        • 等待通知。
        • 如果调用此方法的任务没有获取到锁,则引发RuntimeError异常。
        • 此方法释放底层锁,然后保持阻塞,直至被notify()notify_all()唤醒。被唤醒之后,条件对象重新申请锁,该方法返回True
      • coroutine wait_for(predicate)
        • 等待predicate变为True
        • predicate必须可调用,它的执行结果会被解释为布尔值,并作为最终结果返回。

    Semaphore

    • class asyncio.**Semaphore(value=1,*,loop=None)**
      • 信号量(Semaphore)对象。非线程安全。
      • 信号量用于管理一个内部计数器,此计数器逢acquire()递减,逢release()递增。计数器的值不能小于0,如果acquire()被调用时计数器为0,则阻塞,直到某一任务调用release()
      • value为可选参数,用于设定内部计数器的初始值。如果给定的值小于0,则引发ValueError异常。
      • 使用信号量的最佳方法是async with声明:

        sem = asyncio.Semaphore(10)
        
        # ... later
        async with sem:
            # work with shared resource

        等价于:

        sem = asyncio.Semaphore(10)
        
        # ... later
        await sem.acquire()
        try:
            # work with shared resource
        finally:
            sem.release()
    • coroutine acquire()
      • 申请一个信号量
      • 如果内部计数器的值大于0,则减1并立即返回True。如果内部计数器的值为0,则等待release()被调用,然后返回True
    • locked()
      • 如果信号量不能被立即申请,则返回True
    • release()
      • 释放信号量,内部计数器加1。
      • BoundedSemaphore不同,Semaphore允许release()的调用次数大于acquire()的调用次数。

    BoundedSemaphore

    • class asyncio.**BoundedSemaphore(value=1,*,loop=None)**
      • 有界信号量,非线程安全。
      • 有界信号量是一种特殊的信号量——如果release()后内部计数器的值大于初始值,则引发ValueError异常。

    从python3.7开始:通过await lockyield from lock或通过声明with await lockwith(yield from lock)获取锁的用法被废弃。可使用async with lock代替。

    转载于:https://www.cnblogs.com/mamingqian/p/10075444.html

    展开全文
  • CPU多核同步原语

    2020-12-16 16:50:35
    程序执行顺序 在单个core上,program order是必须遵从;但在多个core上, 原子操作 保证操作的原子性,要么操作了,要么没有操作。...内存屏障同步原语 smp_mb()、 smp_wmb()、 smp_rmb()、 引入Store Buffer 为了防止
  • linux kernle 同步原语

    千次阅读 2019-04-18 10:41:45
    转载:同步原语 如何避免由于对共享数据的不安全访问导致的数据崩溃? 内核使用的各种同步技术: 技术 说明 适用范围 每CPU变量 在CPU之间复制数据结构 所有CPU 原子操作 对一个计数器原子地...
  • 操作系统—同步原语

    2021-01-07 20:49:35
    同步原语 原来我们都用的是单核的CPU,但是单核的性能现在已经很难有突破了,所以开始在一个CPU中添加多个物理核。 但是原来的应用程序都是为单核设计的,在多核运行无法体现多核的性能,为了更充分的使用多核,应用...
  • 当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的...在这一节中我们就会介绍 Go 语言中常见的同步原语...
  • 前面两篇文章,写了python线程同步原语的基本应用。下面这篇文章主要是通过阅读源码来了解这几个类的内部原理和是怎么协同一起工作来实现python多线程的。 相关文章链接:python同步原语--线程锁 python--线程...
  • fifolock 一个灵活的低级工具,用于在asyncio Python中创建同步原语
  • Python中的同步原语--锁 from atexit import register from random import randrange from threading import Thread, Lock, current_thread from time import ctime, sleep class CleanOutputSet(set): def __st....
  • 同步原语:当一个进程调用一个send原语时,在消息开始发送后,发送进程便处于阻塞状态,直至消息完全发送完毕,send原语的后继语句才能继续执行。当一个进程调用一个receive原语时,并不立即返回控制,而是等到把...
  • python--线程同步原语

    2018-11-20 00:56:00
    Threading模块是python3里面的多线程模块,模块内集成了许多的类,其中包括Thread,Condition,Event,Lock,Rlock,Semaphore,Timer...关于Lock的使用可以移步到我之前写的文章python同步原语--线程锁。 Event ...
  • 多线程锁是python多种同步原语中的其中一种。首先解析一下什么是同步原语,python因为GIL(全局解析锁)的缘故,并没有真正的多线性。另外python的多线程存在一个问题,在多线程编程时,会出现线程同时调用共同的...
  • eCos中的同步原语简要

    2014-02-19 09:54:30
    eCos提供的同步原语包括信号量、互斥量、条件变量、事件标志、邮箱以及消息队列。每种同步原语都有特别之处,不同的同步原语满足了应用对不同的线程间同步和消息传递需求。 eCos官网http://ecos.sourceware.org,...
  • 当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁...在这一节中我们就会介绍 Go 语言中常见的同步原语 Mute...
  • 当提到并发编程、多线程编程时,我们往往都离不开『锁』这一概念,Go 语言作为一个原生支持用户态进程 Goroutine 的语言,也一定会为开发者提供这一功能,锁的...在这一节中我们就会介绍 Go 语言中常见的同步原语...
  • Barrier(屏障)是一种自定义的同步原语(synchronization primitive),它解决了多个线程(参与者)在多个阶段之间的并发和协调问题。 1)多个参与者执行相同的几个阶段的操作 2)在每一个阶段内,多个参与者并发...
  • cpp11-on-multicore, 在C 11中,多线程应用程序的各种同步原语 C 11中多线程应用程序的各种同步原语。在博客文章中,信号量是令人惊讶的。代码是在许可协议下发布的。 查看 LICENSE 文件。如何构建测试首先,你必须...
  • 将 Synchronization Primitives 暂且翻译为同步原语,设计目的是为了和线程模块相似。 一、Lock classasyncio.Lock(*,loop=None) 为了实现任务之间的互斥,不是线程安全的。 用于保证独自使用资源。...
  • Windows Vista 新增的同步原语 Robert Saccone and Alexander Taskov 代码下载位置: VistaSynchronization2007_06.exe (174 KB) Browse the Code Online 本文讨论: 条件变量Slim 读取器锁/...
  • C#并行编程-线程同步原语 原文:C#并行编程-线程同步原语菜鸟学习并行编程,参考《C#并行编程高级教程.PDF》,如有错误,欢迎指正。 背景 有时候必须访问变量、实例、方法、属性或者结构体,而这些并...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,842
精华内容 1,136
关键字:

同步原语