精华内容
下载资源
问答
  • 同步原语
    2021-07-25 10:08:28

    本文主要讲述C++11线程同步原语,以及使用场景。

    C++11 线程同步原语有三个:mutex(互斥锁),condition variable(条件变量)和semaphore(信号量)。


    一 mutex

    mutex是最简单的同步原语,用来防止多个线程并发的访问临界区资源。如果应用中有些资源在同一时间最多只能被一个线程访问,那么就使用mutex来保护这些资源。

    使用方法也非常简单,总共3步,

    1. 加锁
    2. 执行临界区代码
    3. 释放锁

    下面是简单的范例代码,一般配合std::lock_guard或者std::unique_lock来使用

    #include <iostream>
    #include <map>
    #include <string>
    #include <chrono>
    #include <thread>
    #include <mutex>
     
    std::map<std::string, std::string> g_pages;
    std::mutex g_pages_mutex;
     
    void save_page(const std::string &url)
    {
        // simulate a long page fetch
        std::this_thread::sleep_for(std::chrono::seconds(2));
        std::string result = "fake content";
     
        std::lock_guard<std::mutex> guard(g_pages_mutex);
        g_pages[url] = result;
    }
     
    int main() 
    {
        std::thread t1(save_page, "http://foo");
        std::thread t2(save_page, "http://bar");
        t1.join();
        t2.join();
     
        // safe to access g_pages without lock now, as the threads are joined
        for (const auto &pair : g_pages) {
            std::cout << pair.first << " => " << pair.second << '\n';
        }
        return 0;
    }
    
    

    加锁由guard的构造函数来完成,释放锁由guard的析构函数来完成,只要出了guard的作用域就会自动释放。

    PS: std::mutex提供的互斥锁是non-recursive的


    二 condition variable

    条件变量用于实现多个线程间的notification/synchronization机制,使用场景如下,

    Condition variable allows a thread T to wait for completion of a given event on a particular object (some shared state, data structure, anything). The event over here is really the change in state of some condition that thread is interested in. Until that is satisfied, the thread waits to be awakened later by a signalling thread (that actually changes the condition).

    这个notification/synchronization机制是mutex无法做到的。

    条件变量的使用需要3个元素,

    1. 条件变量 (同步原语)
    2. 互斥锁
    3. 条件 (一个共享变量或其它)

    下面是个简单范例,代码中cv是条件变量,m是互斥锁,ready和processed是条件

    #include <iostream>
    #include <string>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
     
    std::mutex m;
    std::condition_variable cv;
    std::string data;
    bool ready = false;
    bool processed = false;
     
    void worker_thread()
    {
        // Wait until main() sends data
        std::unique_lock<std::mutex> lk(m);
        cv.wait(lk, []{return ready;});
     
        // after the wait, we own the lock.
        std::cout << "Worker thread is processing data\n";
        data += " after processing";
     
        // Send data back to main()
        processed = true;
        std::cout << "Worker thread signals data processing completed\n";
     
        // Manual unlocking is done before notifying, to avoid waking up
        // the waiting thread only to block again (see notify_one for details)
        lk.unlock();
        cv.notify_one();
    }
     
    int main()
    {
        std::thread worker(worker_thread);
     
        data = "Example data";
        // send data to the worker thread
        {
            std::lock_guard<std::mutex> lk(m);
            ready = true;
            std::cout << "main() signals data ready for processing\n";
        }
        cv.notify_one();
     	
        // wait for the worker
        {
            std::unique_lock<std::mutex> lk(m);
            cv.wait(lk, []{return processed;});
        }
        std::cout << "Back in main(), data = " << data << '\n';
     
        worker.join();
        return 0;
    }
    

    运行逻辑如下,

    1. worker_thread调用cv.wait()时,条件ready是false,就会释放互斥锁,然后在那里等待通知
    2. 主线程先去拿锁,然后把条件ready修改为true,接着释放锁,最后通知worker_thread
    3. worker_thread接到通知后发现条件为真,又会获取锁,接着处理data,并把条件processed置位true
    4. 同理可以分析条件processed

    三 semaphore

    信号量提供了以下2个特性:

    1. 允许N个线程并发访问临界区,N与应用相关
    2. 提供了类似条件变量的notification/synchronization机制

    本人没有使用过信号量…

    更多相关内容
  • 同步原语

    2021-04-28 14:57:34
    由于并发访问共享资源时,对共享资源的竞争(程序的正确性依赖于特定的执行顺序)导致错误,操作系统提供同步原语供开发者使用。 互斥锁 01 临界区问题 临界区:保证互斥访问的共享资源的代码区域。 互斥访问:...

    由于并发访问共享资源时,对共享资源的竞争(程序的正确性依赖于特定的执行顺序)导致错误,操作系统提供同步原语供开发者使用。

    互斥锁

    01 临界区问题

    • 临界区:保证互斥访问的共享资源的代码区域。
    1. 互斥访问:同一时刻,至多一个线程可以进入临界区。
    2. 有限等待:当一个线程申请进入临界区后,必须在有限时间内获得许可并进入临界区,不能无限等待。
    3. 空闲让进:当没有线程执行临界区代码时,必须在申请进入临界区的线程中选择一个,允许其执行代码,保证程序执行的进展。

    02 互斥锁的硬件实现:关闭中断

    03 互斥锁的软件实现:皮特森算法,解决只有两个线程的情况。

    • boolean全局数组flag[],flag[0],flag[1]分别代表线程0与线程1是否尝试进入临界区。初始值都为false。
    • 全局变量turn,决定最终进入临界区的线程编号。无需初始化,会在线程申请进入临界区时初始化。
    • 线程0进入临界区前必须满足:flag[1]==false || turn == 0,否则循环等待。

    04 软硬件协同:使用原子操作实现互斥锁。

    • 原子操作
      • Compare-And-Swap (CAS):比较地址addr上的值与期望值expected是否相等,若相等则将地址上的值换为新的new_value,否则不进行置换。cpu对任意地址的修改都会锁总线,以实现原子操作。
      • Fetch-And-Add (FAA):读取地址addr上的旧值,将其加上add_value后重新存回改地址,最后返回addr上的旧值。
      • Load Link/Store-Conditional(LL/SC):LL时cpu使用一个专门的监视器记录当前访问的地址,而在SC时,仅当监视的地址没有被其他核修改,才执行存储,否则返回存储失败。
    • 互斥锁抽象:
      • CAS自旋锁:利用一个lock变量表示锁状态,0:锁空闲,1:锁占用。加锁时,会确认是否锁空闲,空闲的话加锁,否则循环确认。
        • 缺点:不保证有限等待,不公平。
        • 优点:在竞争程度低时非常高效。
      • FAA牌号自旋锁:FIFO

    条件变量

    通过使用条件变量接口,一个线程可以停止使用cpu并将自己挂起,当等待条件满足后,其他线程会唤醒使它继续执行。场景:避免线程的循环忙等。必须搭配互斥锁一起使用,用于保护对条件的判断与修改。

    信号量

    根据剩余资源数量控制不同线程的执行。

    读写锁

    在多个读操作同时进行时不需要互斥锁。针对读者与写者分别提供不同的加锁解锁操作。

    • 读临界区:只能保证读者与写者互斥,允许多个读者同时。
    • 写临界区:不允许其他读者或写者进入。
    • 读写锁的偏向性:
      • 偏向读者:读者并行度提高,写者延迟。
      • 偏向写者:避免写延迟,读者并行度下降。

    RCU Read Copy Update

    多读者并行,写者不会阻塞读者。通过订阅/发布机制(保证顺序和原子性,修改链表节点),让写者原子的更新任意大小的数据;

    • 宽限期:为了确定何时回收旧资源,宽限期用于描述从写者更新指针到最后一个可能观察到旧数据的读者离开的时间段。读者需要标记自己读临界区的开始与结束点。
    • 优势:读者开销更小,不会被阻塞且无需耗时的同步操作。
    • 弊端:对于双向链表的订阅/发布机制较难实现,需要等待到所有读者离开临界区后才回收旧数据。

    同步带来的问题

    01 死锁

    • 死锁出现的必要条件:
      1. 互斥访问:保证一个共享资源在同一时间至多允许一个线程访问。
      2. 持有并等待:线程持有一些资源并等待另一些资源。
      3. 资源非抢占:一旦一个资源被持有,除非持有者主动放弃,否则其他竞争者都得不到。
      4. 循环等待:存在一系列线程都得不到资源,形成等待循环。
    • 死锁的检测与恢复:
      1. 被动策略:定时检测死锁存在、超时等待检测。
      2. 死锁预防:
        1. 避免互斥访问:如设计代理线程,专门用于管理对共享数据的访问和修改。(不易实现,代理线程多余)
        2. 不允许持有并等待:线程一次性申请所有资源,一旦任意资源不可用则释放所有资源。(造成线程饥饿,资源利用率低)
        3. 允许资源被抢占:允许线程抢占其他线程已占有的资源。(需保证被抢占的线程正确回滚)
        4. 避免循环等待:要求线程必须按照一定顺序获取资源,顺序靠前的线程能获取所有所需资源,避免循环等待。
    • 死锁避免算法:让系统在每次分配资源后处于安全状态,否则不分配资源给线程。(银行家算法)

    02 活锁

    • 锁的竞争者长时间无法获取锁进入临界区。线程没有发生阻塞,而是一直不断尝试获取锁、失败。

    03 优先级反转:由于同步导致线程执行顺序违反预设优先级问题。

    • 缺点:实时操作系统中,优先级反转会导致高优先级任务阻塞。
    • 避免优先级反转
      1. 不可抢占临界区协议:仍然阻塞优先级更高的线程。
      2. 优先级继承协议:当高优先级线程等待锁时,会使锁的持有者继承其优先级,从而避免该锁的临界区被低优先级任务打断。
      3. 优先级置顶协议:将获取锁的线程优先级置为可能竞争该锁线程中的最高优先级。
    展开全文
  • 它还公开了用于创建您自己的有效同步原语的低级API。 在x86_64 Linux上进行测试时,发现parking_lot::Mutex速度比std::sync::Mutex速度快1.5倍,而在多线程竞争时,速度最高可快5倍。 RwLock的数字取决于读取器和...
  • 操作系统—同步原语

    千次阅读 2021-01-07 20:49:35
    同步原语 原来我们都用的是单核的CPU,但是单核的性能现在已经很难有突破了,所以开始在一个CPU中添加多个物理核。 但是原来的应用程序都是为单核设计的,在多核运行无法体现多核的性能,为了更充分的使用多核,应用...

    同步原语

    原来我们都用的是单核的CPU,但是单核的性能现在已经很难有突破了,所以开始在一个CPU中添加多个物理核。
    但是原来的应用程序都是为单核设计的,在多核运行无法体现多核的性能,为了更充分的使用多核,应用程序需要将待处理的数据进行划分,从而能够在同一时间分配任务到多个核心上并行处理,利用更短的时间完成计算。
    然而并行处理同一任务意味着对共享资源的并发访问,为了保证共享资源状态的正确性,需要正确地在这些子任务之间进行同步。看下面这个例子
    在这里插入图片描述
    生产者不断生成数据放到共享缓冲区中,消费者从缓冲区拿数据,但是他需要满足两个前提条件:
    1.当缓冲区满时,生产者应停止向缓冲区写入数据
    2.当缓冲区为空时,消费者应停止从缓冲区拿数据

    下面这个代码段展示了一个生产者和一个消费者时的方案

    volatile int empty_slot = 5;
    volatile int filled_slot = 0;
    
    void producer(void)
    {
    	int new_msg;
    	while(TRUE)
    	{
    	new_msg = produce_new();
    	while(empty_slot == 0)
    	;//没有空位可使用
    	empty_slot--;
    	buffer_add(new_msg);
    	filled_slot ++;
    	}
    }
    
    void consumer(void)
    {
    	int cur_msg;
    	while(TRUE){
    	while(filled_slot == 0)
    	;//没有对象可消耗
    	filled_slot --;
    	cur_msg = buffer_remove();
    	empty_slot ++;
    	consume_msg(cur_msg);
    }
    }
    

    这里设置了两个全局计数器:filled_slot和empty_slot,其中filled_slot用于记录缓冲区中可以被消耗的对象数量,empty_slot用于记录缓冲区中剩余的空位数量。

    先看生产者,当要写入数据时,要看是否有空位,如果没有空位需要一直等待,直到有空位,放入数据后,要将空位数量减1,将可消耗数量加1.
    消费者同理。
    生产者消费者模型在并行编程中很常见,为了正确、高效地解决这些同步问题,前人抽象出了一系列同步原语供开发者使用。

    互斥锁

    临界区问题

    在这里插入图片描述
    像上图这样有两个生产者的情况,他们可能同时向3号缓存块进行写入,这样就产生了对该缓存块的竞争,这就叫做竞争冒险
    解决这个竞争冒险的方法就是在同一时刻,只允许一个生产者可以向3号缓冲区写入,这就叫做互斥访问
    保证互斥访问共享资源的代码区域被称为临界区

    在这里插入图片描述
    图中这个循环就是用来解决临界区问题的,那么为了程序的正确运行,必须满足以下条件的算法:
    (1)互斥访问:在同一时刻,最多只有一个线程可以执行临界区
    (2)有限等待:当一个线程申请进入临界区之后,必须在有限的时间内获得许可并进入临界区,不能无限等待。
    (3)空闲让进:当没有线程在执行临界区代码时,必须在申请进入临界区的线程中选择一个线程,允许其执行临界区代码,保证程序执行的进展。

    硬件实现:关闭中断

    我们可以通过关闭中断来解决单核的临界区问题,关中断意味着当前执行的线程不会被其他线程抢占,因此线程在进入临界区之前关闭中断,在离开临界区后开启中断,从而保证任意时刻只有一个线程执行临界区。关中断在执行在单核中满足了,互斥访问,有限等待,空闲让进。
    但是在多核中,关闭中断无法阻止多个同时运行的线程需要执行临界区

    软件实现:皮特森算法

    皮特森算法分可以用于多核的临界区问题在这里插入图片描述
    flag数组,0对应0号线程,1对应1号线程,TRUE代表申请进入临界区,turn是表示最终决定谁可以进临界区。如果0线程想进入临界区,必须满足下述两个条件之一:
    1.flag[1] == FALSE
    2.turn == 0

    在检查线程0是否可以进入线程时,线程1可能处于以下三种情况:
    1.线程1在准备进入临界区(空闲让进)
    也就是执行完第二行代码,这里解释一下turn为什么取的是对方的值,因为如果取自己的话,他就可以直接进入临界区了,那么线程0和线程1就可以同时进入临界区了,而取对方的,可以保证有一个可以进入临界区。
    2.线程1在临界区内部(互斥访问)
    由于线程1在临界区内,他就不会更新flag和turn的值,而线程0还置turn为1,因此线程0就需要循环等待。
    3.线程1在执行其他代码(有限等待)
    线程0这会在进行循环等待,而线程1执行完了第六行代码,flag[1]为false,turn=0,并且turn不会被线程0和1继续更新了,满足了线程0进入临界区的条件。

    软硬件协同:使用原子操作实现互斥锁

    我们还可以利用硬件提供的原子操作,设计新的软件算法来解决临界区问题。

    原子操作:不可被打断的一个或一系列操作,即要么这一系列指令都执行完成,要么这一系列指令一条都没有执行,不会出现执行到一半的状态。

    互斥锁抽象

    一把锁在一同一时刻只能被一个线程所拥有,一旦一个线程获取了一个锁,其他的线程均不能同时拥有该锁,只能等待该锁被释放。
    只有拥有锁的线程才能允许执行临界区代码,在退出临界区后要释放锁,从而允许其他线程拥有锁并进入临界区。

    代码展示了如何使用互斥锁保护生产者消费者的共享缓冲区。

    volatile int buffer_write_cnt = 0;
    volatile int buffer_read_cnt = 0;
    lock_t buffer_lock;
    int buffer[5];
    
    void buffer_init(void)
    {
    	lock_init(&buffer_lock);
    }
    
    void buffer_add_safe(int msg)
    {
    	lock(&buffer_lock);
    	buffer[buffer_write_cnt] = msg;
    	buffer_write_cnt = (buffer_write_cnt + 1) % 5;
    	unlock(&buffer_lock);
    }
    
    int buffer_remove_safe(void)
    {
    	lock(&buffer_lock);
    	int ret = buffer[buffer_read_cnt];
    	buffer_read_cnt = (buffer_read_cnt + 1) % 5;
    	unlock(&buffer_lock);
    	return ret;
    }
    

    互斥锁的种类繁多,不同的互斥锁被用于不同的场景,以达到最好的性能表现。
    本节介绍分别为利用原子CAS实现的自旋锁,和利用原子FAA实现的排号自旋锁。

    自旋锁

    自旋锁利用一变量lock来表示锁的状态,lock为1表示已经有人拿锁,而为0表示该锁空闲。

    void lock_init(int *lock)
    {
    	//初始化自旋锁
    	*lock = 0;
    }
    
    void lock(int *lock)
    {
    	while(atomic_CAS(lock, 0, 1) != 0)
    	;	//循环忙等
    }
    
    void unlock(int *lock)
    {
    	*lock = 0;
    }
    

    自旋锁满足了互斥访问和空闲让进,不满足有限等待,但是这不要紧,他还是很高效的,因此依然广泛的应用在各种软件之中。

    排号自旋锁

    顾名思义,排号锁就是要给用锁的线程进行排号,然后锁沿着这个号进行传递,因此可以说锁的竞争就变成了一个先进先出的等待队列。

    struct lock
    {
    	volatile int owner;
    	volatile int next;
    };
    
    void lock_init(struct lock *lock)
    {
    	//初始化排号锁
    	lock->owner = 0;
    	lock->next = 0;
    }
    
    void lock(struct lock*lock)
    {
    	//拿取自己的序号
    	volatile int my_ticket = atomic_FAA(&lock->next,1);
    	while(lock->owner != my_ticket)
    	; //循环忙等
    }
    
    void unlock(struct lock*lock)
    {
    	//传递给下一位竞争者
    	lock->owner++;
    }
    

    owner表示当前的锁持有序号,next表示下一个需要分发的序号
    第17行是拿取自己的序号,并累加,这样就不会拿取相同的序号
    第18行是看当前锁的持有者是不是自己,不是就循环等待
    第25行是释放锁,然后锁持有者向后传递。

    条件变量

    在生产者和消费者模型中,无剩余空位时,生产者会陷入循环等待,他可以不用循环等待的,这会浪费cpu资源,因此需要一种挂起/唤醒机制,条件变量就是为这个机制而设计的。
    通过条件变量的接口,一个线程可以停止使用CPU并将自己挂起,当等待的条件满足时,其他线程会唤醒该挂起的线程让其继续执行。

    int empty_slot = 5;
    int filled_slot = 0;
    struct cond empty_cond;
    struct lock empty_cnt_lock;
    struct cond filled_cond;
    struct lock filled_cnt_lock;
    
    void producer(void)
    {
    	int new_msg;
    	while(TRUE)
    	{
    		new_msg = produce_new();
    		lock(&empty_cnt_lock);
    		while(empty_slot == 0)
    		{
    			cond_wait(&empty_cond, &empty_cnt_lock);
    		}
    		empty_slot --;
    		unlock(&empty_cnt_lock);
    		
    		buffer_add_safe(new_msg);
    		
    		lock(&filled_cnt_lock);
    		filled_slot ++;
    		cond_signal(&filled_cond);
    		unlock(&filled_cnt_lock);
    	}
    }
    

    empty_cnt_lock和filled_cnt_lock是来保护对共享计数器empty_slot与filled_slot的修改的锁,这个锁设计的目的是在使用条件变量时,必须要搭配互斥锁一起使用。

    这里设置了两个条件,empty_cond 缓冲区无空位和filled_cond 缓冲区无数据。

    当生产者要写数据时发现没有空位,则通过cond_wait函数挂起,条件是empty_cond无空位,搭配的互斥锁是empty_cnt_lock,后边那个cond_signal是唤醒线程,由于写入数据则缓冲区存在数据了,可以唤醒由于缓冲区无数据而挂起的消费者线程。

    struct cond{
    	struct thread *wait_list;
    };
    
    void cond_wait(struct cond *cond, struct lock *mutex)
    {
    	list_append(cond->wait_list, thread_self());//将线程加入等待队列
    	atomic_block_unlock(mutex);	//原子挂起并放锁
    	lock(mutex);	//重新获得互斥锁(被唤醒后)
    }
    
    void cond_signal(struct cond *cond)
    {
    	if(!list_empty(cond->wait_list))//看是否有线程等待在条件变量上
    		wakeup(list_remove(cond->wait_list));	//操作系统提供的唤醒
    }
    
    void cond_broadcast(struct cond *cond)	//广播操作,用于唤醒所有等待在条件变量上的线程
    {
    	while(!list_empty(cond->wait_list))
    		wakeup(list_remove(cond->wait_list));
    }
    

    信号量

    上述解决生产者和消费者问题的两个计时器,就是信号量,除了初始化之外,信号量只能通过两个操作来更新,P操作即检索,V操作即自增,因此信号量又称为PV原语,为了便于理解,一般会使用wait和signal来表示信号量的P和V操作。

    wait操作用于用于等待,当信号量的值小于或等于0时进入循环等待。
    signal操作则用于通知,其会增加信号量的值供wait的线程使用。

    void wait (int *S)
    {
    	while(*S <= 0)
    		;	//循环忙等
    	*S = *S - 1;
    }
    
    void signal(int *S)
    {
    	*S = *S + 1;
    }
    

    信号量的实现

    struct sem{
    	int value;	//可正可负。正代表可用资源,负代表等待获取资源的线程数量
    	int wakeup;	//有线程等待时的可用资源数量
    	struct lock sem_lock;
    	struct cond sem_cond;
    }
    
    void wait(struct sem *S)
    {
    	lock(&S->sem_lock);
    	S->value --;
    	if(S->value < 0){
    	do{
    		cond_wait(&S->sem_cond,&S->sem_lock);
    	  }
    	 while(S->wakeup == 0);
    	 S->wakeup --;
    	}
    	unlock(&S->sem_lock);
    }
    
    void signal(struct sem *S)
    {
    	lock(&S->sem_lock);
    	S->value ++;
    	if(S->value <= 0)
    	{
    	S->wakeup ++;
    	cond_signal(&S->sem_cond);
    	}
    	unlock(&S->sem_cond);
    }
    

    在这里插入图片描述

    管程

    线程安全是指某个函数,函数库在多线程环境中被调用时,能够正确地使用同步原语保护多个线程对共享变量的访问与修改。

    管程:共享的数据,操作共享数据的函数。
    管程保证在同一时刻最多只有一个操作者能够进入管程的保护区域访问共享数据。

    死锁

    必要条件:

    • 互斥访问
    • 持有并等待
    • 资源非抢占
    • 循环等待

    死锁预防:

    • 避免互斥访问
    • 不允许持有并等待
    • 允许资源被抢占
    • 避免循环等待

    死锁避免:
    系统存在两种状态,安全状态和非安全状态,在安全状态下,一定存在一个安全序列,如果系统按照这个序列调度线程执行,即可避免资源不足的情况发生。

    活锁:两个线程,1号线程获取锁a,2号线程获取锁b,然后1号获取锁b,2号获取锁a,发现锁被占有,然后1号放弃锁a,2号放弃锁b,但是两个线程执行速度相似,1号又获取了锁a,2号又获取了锁b,导致循环往复,这是活锁现象。

    优先级反转

    优先级反转的意思就是,当一个低优先级线程获取了一个锁并进入临界区执行,那么当一个高优先级线程也要获取该锁并执行时,需要等待低优先级执行完并放锁后才能执行,因此出现了低优先级先于高优先级执行的情况,这既是优先级反转。

    展开全文
  • 本框架实现了一套协程同步原语来解决原生同步原语带来的阻塞问题,在协程同步原语之上实现更高层次的抽象——Channel用于协程之间的便捷通信,本文简单介绍一下如何设计。 我们都知道,一旦协程阻塞后整个协程所在的...

    本框架实现了一套协程同步原语来解决原生同步原语带来的阻塞问题,在协程同步原语之上实现更高层次的抽象——Channel用于协程之间的便捷通信,本文简单介绍一下如何设计。

    我们都知道,一旦协程阻塞后整个协程所在的线程都将阻塞,这也就失去了协程的优势。编写协程程序时难免会对一些数据进行同步,而Linux下常见的同步原语互斥量、条件变量、信号量等基本都会堵塞整个线程,使用原生同步原语协程性能将大幅下降,甚至发生死锁的概率大大增加!

    只有重新实现一套用户态协程同步原语才能解决这个问题。

    在开始实现之前我们先简单介绍一下原理。原生同步对象由内核维护,当互斥量获取锁失败,条件变量wait,信号量wait获取失败时,内核将条件不满足的线程加入一个由内核维护的等待队列,然后阻塞线程,等待条件满足时将线程重新加入调度。

    如同协程之于线程,我们很容易得到一个启示,既然内核维护等待队列会阻塞线程,那可不可以由用户态来维护等待队列呢。当获取协程同步对象失败时,用户将条件不满足的协程加入一个由用户维护的协程等待队列,然后让出协程,等待条件满足时将协程重新加入协程调度器调度。看,我们解决了线程同步问题,而且没有阻塞线程!

    介绍完了原理,我们来看看实现,框架实现了一下以下几种协程同步原语

    • CoMutex 协程锁
    • CoCondvar 协程条件变量
    • CoSemaphore 协程信号量
    • Channel 消息通道

    依赖关系如下:

        
    CoMutex    CoCondVar    CoMutex    CoCondVar
        |         |             |         |
        -----------             -----------
             |                       |
             V                       V
        CoSemaphore				  Channel
    

    为了保持通用性,我在部分代码采用了伪代码,你可以很容易地移植到你的协程框架上,当然如果你想看一下具体实现可翻到文章的最后找一下框架链接。

    SpinLock 自旋锁

    在此之前不得不提一下自旋锁。不管你是用TAS实现还是直接封装posix spin lock他们都有一个共同特点,就是不阻塞线程。我们的同步原语可以说都是基于自旋锁来实现,这里简单封装了一下posix自旋锁。

    
    /**
     * @brief 自旋锁
     */
    class SpinLock : Noncopyable {
    public:
        using Lock = ScopedLock<SpinLock>;
        SpinLock(){
            pthread_spin_init(&m_mutex,0);
        }
        ~SpinLock(){
            pthread_spin_destroy(&m_mutex);
        }
        void lock(){
            pthread_spin_lock(&m_mutex);
        }
        bool tryLock() {
            return !pthread_spin_trylock(&m_mutex);
        }
        void unlock(){
            pthread_spin_unlock(&m_mutex);
        }
    private:
        pthread_spinlock_t m_mutex;
    };
    

    CoMutex 协程锁

    CoMutex的定义如下

    
    /**
     * @brief 协程锁
     */
    class CoMutex : Noncopyable {
    public:
        using Lock = ScopedLock<CoMutex>;
        bool tryLock();
        void lock();
        void unlock();
    private:
        // 协程所持有的锁
        SpinLock m_mutex;
        // 保护等待队列的锁
        SpinLock m_gaurd;
        // 持有锁的协程id
        uint64_t m_fiberId = 0;
        // 协程等待队列
        std::queue<std::shared_ptr<Fiber>> m_waitQueue;
    };
    

    成员m_waitQueue就是用户态维护的等待队列,维护等待这个锁的协程。

    成员函数lock的主要代码如下

    void lock() {
        ...
         // 第一次尝试获取锁
        while (!tryLock()) {
            // 由于进入等待队列和出队的代价比较大,所以再次尝试获取锁,
            // 成功获取锁就返回
            if(tryLock()){
                ...
                return;
            }
            // 获取所在的协程
            auto self = GetTHisFiber();
            // 将自己加入协程等待队列
            m_waitQueue.push(self);
            // 让出协程
            Yield;
        }
        ...
    }
    

    我们尝试获取锁,如果获取失败就把自己放入等待队列并让出协程。

    成员函数unlock的主要代码如下

    void unlock() {
        ...
        auto Fiber = m_waitQueue.front();
        ...
        // 释放协程锁    
        m_mutex.unlock();
        ...
        // 将等待的协程重新加入调度
        Schedule(fiber);
        ...
    }
    

    我们取出等待这个锁的协程,释放锁后将协程重新加入调度器。

    通过一个很简单方式,我们在用户空间实现了互斥量。

    使用样例

    CoMutex mutex;
    void a() {
        for (int i = 0; i < 100000; ++i) {
            CoMutex::Lock lock(mutex);
            ++n;
        }
    }
    
    void b() {
        for (int i = 0; i < 100000; ++i) {
            CoMutex::Lock lock(mutex);
            ++n;
        }
    }
    

    CoCondVar 协程条件变量

    CoCondVar的定义如下

    /**
     * @brief 协程条件变量
     */
    class CoCondVar : Noncopyable {
    public:
        using MutexType = SpinLock;
        /**
         * @brief 唤醒一个等待的协程
         */
        void notify();
        /**
         * @brief 唤醒全部等待的协程
         */
        void notifyAll();
    	...
        /**
         * @brief 等待唤醒
         */
        void wait(CoMutex::Lock& lock);
    
    private:
        // 协程等待队列
        std::queue<std::shared_ptr<Fiber>> m_waitQueue;
        // 保护协程等待队列
        MutexType m_mutex;
        ...
    };
    

    CoMutex一样,协程条件变量也维护了一个等待队列。

    成员函数notify的主要代码如下

    void notify() {
        ...
        Fiber::ptr fiber;
    
        // 减小锁的粒度
        {
            // 获取一个等待的协程
            MutexType::Lock lock(m_mutex);
            fiber = m_waitQueue.front();
            m_waitQueue.pop();
        }
        // 将等待的协程重新加入调度
        Schedule(fiber);
    }
    

    与协程锁的解锁类似,获取一个在等待队列里的协程重新加入调度器。

    成员函数notifyAll则是将全部等待的协程加入调度器。

    成员函数notify的主要代码如下

    void wait(CoMutex::Lock& lock) {
        // 获取本协程对象
        auto self = GetThisFiber();
        {
            MutexType::Lock lock1(m_mutex);
            // 将自己加入等待队列
            m_waitQueue.push(self);
    		...
        }
        // 先解锁
        lock.unlock();
        // 让出协程
        Yield;
        // 重新获取锁
        lock.lock();
    }
    

    注意,只有先将协程锁解锁了才能加入到等待队列,否则别的协程无法获取锁,被唤醒后要重新获取锁。

    至此我们已经实现了两个重要的同步原语。

    使用样例

    CoMutex mutex;
    CoCondVar condVar;
    void cond_a() {
        CoMutex::Lock lock(mutex);
        LOG_INFO() << "cond a wait";
        condVar.wait(lock);
        LOG_INFO() << "cond a notify";
    }
    void cond_b() {
        CoMutex::Lock lock(mutex);
        LOG_INFO() << "cond b wait";
        condVar.wait(lock);
        LOG_INFO() << "cond b notify";
    }
    void cond_c() {
        sleep(2);
        LOG_INFO() << "notify cone";
        condVar.notify();
        sleep(2);
        LOG_INFO() << "notify cone";
        condVar.notify();
    }
    

    CoSemaphore 协程信号量

    CoSemaphore的定义如下

    /**
     * @brief 协程信号量
     */
    class CoSemaphore : Noncopyable {
    public:
        CoSemaphore(uint32_t num) {
            m_num = num;
            m_used = 0;
        }
    
        void wait();
        void notify();
    
    private:
        // 信号量的数量
        uint32_t m_num;
        // 已经获取的信号量的数量
        uint32_t m_used;
        // 协程条件变量
        CoCondVar m_condvar;
        // 协程锁
        CoMutex m_mutex;
    };
    

    协程信号量是基于协程锁和协程条件变量的。

    成员函数wait的主要代码如下

    void wait() {
        CoMutex::Lock lock(m_mutex);
        // 如果已经获取的信号量大于等于信号量数量则等待
        while (m_used >= m_num) {
            m_condvar.wait(lock);
        }
        ++m_used;
    }
    

    在条件变量的wait里让出协程等待。

    成员函数notify的主要代码如下

    void notify() {
        CoMutex::Lock lock(m_mutex);
        if (m_used > 0) {
            --m_used;
        }
        // 通知一个等待的协程
        m_condvar.notify();
    }
    

    有了协程锁和协程条件变量,协程信号量实现起来十分简单。

    使用样例

    CoSemaphore sem(5);
    void sem_a() {
        for (int i = 0; i < 5; ++i) {
            sem.wait();
        }
        sleep(2);
        for (int i = 0; i < 5; ++i) {
            sem.notify();
        }
    }
    void sem_b() {
        sleep(1);
        for (int i = 0; i < 5; ++i) {
            sem.wait();
        }
        for (int i = 0; i < 5; ++i) {
            sem.notify();
        }
    }
    

    Channel 消息通道

    Channel主要是用于协程之间的通信,属于更高级层次的抽象。

    在类的实现上采用了 PIMPL 设计模式,将具体操作转发给实现类

    Channel对象可随意复制,通过智能指针指向同一个 ChannelImpl

    Channel的定义如下

    template<typename T>
    class Channel {
    public:
        Channel(size_t capacity) {
            m_channel = std::make_shared<ChannelImpl<T>>(capacity);
        }
        Channel(const Channel& chan) {
            m_channel = chan.m_channel;
        }
        void close() {
            m_channel->close();
        }
        operator bool() const {
            return *m_channel;
        }
    
        bool push(const T& t) {
            return m_channel->push(t);
        }
    
        bool pop(T& t) {
            return m_channel->pop(t);
        }
    
        Channel& operator>>(T& t) {
            (*m_channel) >> t;
            return *this;
        }
    
        Channel& operator<<(const T& t) {
            (*m_channel) << t;
            return *this;
        }
    
        size_t capacity() const {
            return m_channel->capacity();
        }
    
        size_t size() {
            return m_channel->size();
        }
    
        bool empty() {
            return m_channel->empty();
        }
    
        bool unique() const {
            return m_channel.unique();
        }
    private:
        std::shared_ptr<ChannelImpl<T>> m_channel;
    };
    

    ChannelImpl的定义如下

    /**
     * @brief Channel 的具体实现
     */
    template<typename T>
    class ChannelImpl : Noncopyable {
    public:
        ChannelImpl(size_t capacity)
                : m_isClose(false)
                , m_capacity(capacity){
        }
    
        /**
         * @brief 发送数据到 Channel
         * @param[in] t 发送的数据
         * @return 返回调用结果
         */
        bool push(const T& t);
        /**
         * @brief 从 Channel 读取数据
         * @param[in] t 读取到 t
         * @return 返回调用结果
         */
        bool pop(T& t);
    
        ChannelImpl& operator>>(T& t) {
            pop(t);
            return *this;
        }
    
        ChannelImpl& operator<<(const T& t) {
            push(t);
            return *this;
        }
        /**
         * @brief 关闭 Channel
         */
        void close();
    
        operator bool() {
            return !m_isClose;
        }
    
        size_t capacity() const {
            return m_capacity;
        }
    
        size_t size() {
            CoMutex::Lock lock(m_mutex);
            return m_queue.size();
        }
    
        bool empty() {
            return !size();
        }
    private:
        bool m_isClose;
        // Channel 缓冲区大小
        size_t m_capacity;
        // 协程锁和协程条件变量配合使用保护消息队列
        CoMutex m_mutex;
        // 入队条件变量
        CoCondVar m_pushCv;
        // 出队条件变量
        CoCondVar m_popCv;
        // 消息队列
        std::queue<T> m_queue;
    };
    

    成员函数push的主要代码如下

    bool push(const T& t) {
        CoMutex::Lock lock(m_mutex);
        if (m_isClose) {
            return false;
        }
        // 如果缓冲区已满,等待m_pushCv唤醒
        while (m_queue.size() >= m_capacity) {
            m_pushCv.wait(lock);
            if (m_isClose) {
                return false;
            }
        }
        m_queue.push(t);
        // 唤醒m_popCv
        m_popCv.notify();
        return true;
    }
    

    成员函数pop的主要代码如下

    bool pop(T& t) {
        CoMutex::Lock lock(m_mutex);
        if (m_isClose) {
            return false;
        }
        // 如果缓冲区为空,等待m_pushCv唤醒
        while (m_queue.empty()) {
            m_popCv.wait(lock);
            if (m_isClose) {
                return false;
            }
        }
        t = m_queue.front();
        m_queue.pop();
        // 唤醒 m_pushCv
        m_pushCv.notify();
        return true;
    }
    

    成员函数close的主要代码如下

    void close() {
        CoMutex::Lock lock(m_mutex);
        if (m_isClose) {
            return;
        }
        m_isClose = true;
        // 唤醒等待的协程
        m_pushCv.notify();
        m_popCv.notify();
        std::queue<T> q;
        std::swap(m_queue, q);
    }
    

    通过Channel我们很容易实现一个生产者消费者的样例

    
    void chan_a(Channel<int> chan) {
        for (int i = 0; i < 10; ++i) {
            chan << i;
            ACID_LOG_INFO(g_logger) << "provider " << i;
        }
        ACID_LOG_INFO(g_logger) << "close";
        chan.close();
    }
    
    void chan_b(Channel<int> chan) {
        int i;
        while (chan >> i) {
            ACID_LOG_INFO(g_logger) << "consumer " << i;
        }
        ACID_LOG_INFO(g_logger) << "close";
    }
    void test_channel() {
        IOManager loop{};
        Channel<int> chan(5);
        loop.submit(std::bind(chan_a, chan));
        loop.submit(std::bind(chan_b, chan));
    }
    

    最后

    整套协程同步原语的核心其实就是协程队列,我们通过在用户态模拟了等待队列达到了原生同步原语的效果。并对之进行更高层次的抽象,得到了Channel,它使代码变得简洁优雅,不用考虑协程间的同步问题。

    项目地址:zavier-wong/acid: A high performance fiber RPC network framework. 高性能协程RPC网络框架 (github.com)

    展开全文
  • 异步同步原语。 此板条箱提供以下原语: Barrier -使任务可以同时同步所有任务。 Mutex -互斥锁。 RwLock读写器锁,允许任意数量的读取器或单个写入器。 Semaphore -限制并发操作的数量。 执照 根据以下任一...
  • 自旋 基于自旋的同步原语。 此板条箱在std::sync提供了版本。 由于同步是通过旋转完成的,因此这些原语适合在no_std环境中使用。 在决定使用spin之前,我们建议阅读,其中讨论了spinlock的优缺点。 如果您可以访问...
  • 这个C90库提供了可移植的同步原语集合,用于多线程编程。 提供了以下原语: cds_sync_futex_t一个 (快速用户空间),除非必须将线程置于睡眠或唤醒状态,否则保证保留在用户空间代码中。 cds_sync_fusem_t快速的...
  • 更高级的同步原语。 实现了一些 Go 同步原语。 令牌 提供令牌实现。 只有拥有Token才能做事,然后才能将令牌移交给其他人。 批 提供批量实现。 类似于errgroup ,可以返回每个任务的所有错误结果。 任何 提供部分...
  • zksync zksync提供了一个go实现,您可以在找到同步原语:锁和屏障。 这些可用于协调多个过程的计算。RWMutex RWMutex提供读写锁定。 读者可以共享并发访问权限,只要作家没有领取该锁即可。 作家必须是排他性的,...
  • 同步原语 安装 npm install sync-primitives 原料药 信号 构造函数 number [value = 1] 获得 Object [选择] boolean [blocking = true] boolean [超时= -1] return : Promise. 释放 锁 function fn Object ...
  • 使用AmpPHP应用程序和库的同步原语。安装该软件包可以作为依赖项安装。 composer require amphp/sync文献资料可以在以及目录中找到文档。版本控制像所有其他amphp软件包一样, amphp/sync遵循语义版本控制规范。安全...
  • 1. 介绍 2. 初始代码 3. 任务 4. 测试
  • CPU多核同步原语

    2020-12-16 16:50:35
    程序执行顺序 在单个core上,program order是必须遵从;但在多个core上, 原子操作 保证操作的原子性,要么操作了,要么没有操作。...内存屏障同步原语 smp_mb()、 smp_wmb()、 smp_rmb()、 引入Store Buffer 为了防止
  • 本篇博客将谈谈java的三个同步原语。 volatile volatile变量自身具有下列特性 可见性:对一个volatile变量的读,总是能看到任意线程对这个变量最后的写入。 原子性:对任意单个volatile变量的读、写具有原子性,...
  • 硬件同步原语 硬件同步原语(Atomic Hardware Primitives)是由计算机硬件提供的一组原子操作,我们比较常用的原语主要是 CAS 和 FAA 这两种。 原语:原子操作,只要当前线程执行完毕之后,才会切换下一个线程执行。...
  • 征服一次延迟和一次性初始化的同步原语使用低级阻塞机制,阻塞和非阻塞方法之间有明显区别,并在使用自旋锁时额外支持#[no_std]环境。用法要使用此板条箱,请将以下内容添加到Cargo.toml [dependencies]conquer-once...
  • Golang 并发编程之同步原语

    千次阅读 2020-04-30 14:45:42
    在这一节中我们就会介绍 Go 语言中常见的同步原语 Mutex、RWMutex、WaitGroup、Once 和 Cond 以及扩展原语 ErrGroup、Semaphore和 SingleFlight 的实现原理,同时也会涉及互斥锁、信号量等并发编程中的常见概念。...
  • asyncio同步原语与线程(threading)模块同步原语基本类似,但有两点重要区别: asyncio同步原语非线程安全,因此不应被用作系统线程同步(可以使用threading代替); asyncio同步原语不允许使用timeout参数;可以...
  • ecos系统同步原语

    2018-06-06 10:14:38
    ecos系统同步原语, 包含互斥,信号量, 信箱, 事件, Spinlock ,条件变量。
  • 2017/3/12 18.5.7 同步原语 Python 3.6.1rc1文档 18.5.7同步原语 锁 Lock Event Condition 信号量 Semaphore BoundedSemaphore ASYNCIO锁定API被设计成接近类threading 模块 Lock Event Condition Semaphore ...
  • 非阻塞轻量级同步结构 提供基于板箱的轻量级同步结构。 嵌入式系统上Rust的意思。 这是一个no_std板条箱,不需要堆分配。执照您可以根据自己的判断,根据以下其中一项许可使用此板条箱: 阿帕奇2.0 麻省理工学院
  • 期货介入式此板条箱基于侵入式集合的概念,提供了各种基于Futures和async/await兼容类型: 多种口味的渠道: 单发多生产者多消费者(MPMC) 国家广播同步基元: 手动重置事件互斥体信号计时器有关详细信息,请参阅。...
  • 操作系统同步原语

    2018-09-16 15:51:59
    内核提供给核外调用的过程或者函数成为原语(primitive),原语在执行过程中不允许中断。 sleep 是一个将调用进程阻塞的系统调用,直到另外一个进程调用 wakeup 方法,将被阻塞的进程作为参数,将其唤醒。阻塞与忙...
  • 高可移植的C系统库:线程和同步原语,套接字(TCP,UDP,SCTP),IPv4和IPv6,IPC,散列函数(MD5,SHA-1,SHA-2,SHA-3,GOST),二进制树 ,AVL)等等。 本地代码性能。
  • 内核同步原语

    2017-08-25 22:08:30
    最好的内核同步手段是不需要同步。 对于SMP系统,如果数据结构可以设计为每cpu变量,将最有效的避免cpu间的同步。 原子锁要求一些寄存器读写动作采用automic_t型变量,并在单条指令内完成,并对该指令加automic锁,...
  • 英文版ARM公司技术资料,讲述ARMv6的同步原语以及如何在ARMv5之前的CPU上通过 SWP 和 SWPB 指令实现同步。
  • cpp11-on-multicore, 在C 11中,多线程应用程序的各种同步原语 C 11中多线程应用程序的各种同步原语。在博客文章中,信号量是令人惊讶的。代码是在许可协议下发布的。 查看 LICENSE 文件。如何构建测试首先,你必须...
  • fifolock 一个灵活的低级工具,用于在asyncio Python中创建同步原语
  • Python中的同步原语--锁 from atexit import register from random import randrange from threading import Thread, Lock, current_thread from time import ctime, sleep class CleanOutputSet(set): def __st....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 39,553
精华内容 15,821
关键字:

同步原语

友情链接: SINE256demos.rar