精华内容
下载资源
问答
  • [C++] C++11 原子变量

    2021-11-06 10:25:47
    在多线程程序中使用原子变量std::atomic可以非常方便地避免并发访问时的线程安全问题(data races),此外还可以通过指定不同的std::memory_order指定线程间数据同步的粒度。 原子变量 template <class T> ...

    在多线程程序中使用原子变量std::atomic可以非常方便地避免并发访问时的线程安全问题(data races),此外还可以通过指定不同的std::memory_order指定线程间数据同步的粒度。

    原子变量

    template <class T> struct atomic;
    

    常用方法:

    	void store (T val, memory_order sync = memory_order_seq_cst);	// 修改值
    	T load (memory_order sync = memory_order_seq_cst);				// 赋值
    	T exchange (T val, memory_order sync = memory_order_seq_cst);	// 获取值的同时修改值
    	
    	T fetch_add (T val, memory_order sync = memory_order_seq_cst);	// 加上一个值
    	T fetch_sub (T val, memory_order sync = memory_order_seq_cst);	// 减去一个值
    	
    	T operator++() / T operator++ (int);							// 自增,i++ / ++i
    	T operator--() / T operator-- (int);							// 自减,i-- / --i
    

    简单示例:

    #include <iostream>
    #include <thread>
    #include <atomic>
    using namespace std;
    
    atomic<int> foo(0); // 正确的初始化方式
    
    void put_foo(int x)
    {
        foo.store(x);
    }
    
    void get_foo()
    {
        int x;
        do
        {
            x = foo.load();
        } while (x == 0);
        cout << "foo:" << x << endl;
    }
    
    int main()
    {
        thread t1(get_foo);
        thread t2(put_foo, 1000);
        t1.join();
        t2.join();
        return 0;
    }
    

    memory_order

    编译器对代码进行优化时可能对代码重新排序,CPU也可能对指令进行重新排序、延缓执行、缓存等等,在多线程环境下这种指令的重排序有可能造成问题。

    于是引入了内存屏障(Memory Barrier),它确保 Barrier 之前的指令都执行完了,Barrier 之后的指令才执行。从而确保了线程间数据的同步方式是与我们代码中的逻辑一样的。

    并不是所有的多线程场合都需要内存屏障,比如 C++ 的 shared_ptr 的引用计数,我们只关心当前的引用数量,而不关心引用计数何时增加了何时减少了。原子变量通过指定 memory_order 参数来指定这种同步的粒度。详细内容可以参考这篇文章,写的很清楚。

    typedef enum memory_order {
        memory_order_relaxed,   // relaxed
        memory_order_consume,   // consume
        memory_order_acquire,   // acquire
        memory_order_release,   // release
        memory_order_acq_rel,   // acquire/release
        memory_order_seq_cst    // sequentially consistent
    } memory_order;
    
    展开全文
  • C语言中,为了实现高性能,多线程编程要使用无锁数据结构(lock-free),引用计数是一种经常被使用的机制。但是,引用计数单打独斗是不行的,必须配合延迟删除机制(原文中的RCU方法),才可以高性能的、线程安全的...

    【译文】使用原子变量和GCC在C/C++中实现引用计数

    译者序

    原文 C/C++ reference counting with atomic variables and gcc 作者 Alexander Sandler,发布于2009年5月27日。

    为了实现高性能,多线程编程要使用无锁数据结构(lock-free),引用计数是一种经常被使用的机制。但是,引用计数单打独斗是不行的,必须配合延迟删除机制(原文中的RCU方法),才可以高性能的、线程安全的访问对象。

    例如下面的示例中,如果线程thread_eraser直接调用item_unref(而不是放入延迟删除队列),函数判断引用计数为0后释放内存;同一时刻线程thread_manipulator调用item_ref增加引用计数后访问对象,由于该对象已经被释放,就会出非法的内存访问(野指针)。

    struct item
    {
    	int ref_cnt;
    };
    struct item* item_new(void)
    {
    	struct item* p=(struct item*)calloc(sizeof(struct item), 1);
    	p->ref_cnt=1;
    	return p;
    }
    void item_unref(struct item* p)
    {
    	if ((__sync_sub_and_fetch(&p->ref_cnt, 1) == 0))
    	{
    		free(p);
    	}
    }
    void item_ref(struct item* p)
    {
    	__sync_add_and_fetch(&(p->ref_cnt), 1);
    	return;
    }
    struct hash_table htable;
    pthread_mutex_t lock_htable;
    void thread_manipulator(void)
    {
    	struct item* p=NULL;
    	//...
    	//某些情况下,我们需要修改key1对应的对象
    	pthread_mutex_lock(&lock_htable);
    	p=hash_table_search(htable, key1);
    	pthread_mutex_unlock(&lock_htable);
    	
    	item_ref(p);
    	//some code work on p
    	item_unref(p);
    	...
    }
    void thread_eraser(void)
    {
    	struct item* p=NULL;
    	//..
    	//某些情况下,我们需要删除key1和其对象
    	pthread_mutex_lock(&lock_htable);
    	p=hash_table_remove(htable, key1);
    	pthread_mutex_unlock(&lock_htable);	
    	p->remove_time=time(NULL);
    	queue_append_on_tail(queue, p);
    	//...
    	//处理队列中的延迟删除数据
    	while(1)
    	{
    		p=queue_read_head(queue);
    		if(time(NULL)-p->remove_time>60)
    		{
    			queue_remove(queue, p);
    			item_unref(p);
    		}
    		else
    		{
    			break;
    		}
    	}
    }
    

    以下为翻译正文

    简介

    我们经常需要使用两个以上的线程访问数据结构(Data Structure,如哈希表)中的对象(Object,如哈希表中的Value)。为了达到最佳性能,我们必须区分两种保护机制:

    1. 保护数据结构自身,如哈希表。
    2. 保护数据结构中的对象,如哈希表中的Value。

    引用计数用来解决什么问题?

    为了理解这一问题,我们考虑下面的场景:

    • 线程1(操控者)需要从数据结构中找到一个特定的对象并修改。对应示例中的thread_manipulator线程,从hash table读取key1的value,然后修改。
    • 线程2(擦除者)删除数据结构中过期淘汰的对象。对应到示例中的thread_eraser线程,从hash table删除key1和其value。

    在程序运行时,线程1和线程2有可能访问相同的对象,即竞争访问,这是典型的多线程编程场景。

    很明显,两个线程必须在某种互斥机制下才能正常工作。我们可以用一个全局互斥锁(Mutex)保护整个数据结构。这种情况下,操控者线程在查找和修改对象时,必须全程持有锁。这意味着,操控者线程工作期间,擦除者线程不能访问数据结构。

    某些情况下,全局互斥锁的设计将成为系统性能的瓶颈。解决该问题的唯一方法是将保护数据结构保护对象的两种线程安全机制区分开来。这样,操控者线程只需在查询对象时持有锁,一旦查询到就可以释放,然后擦除者线程就可以淘汰过期对象了。

    但是,我们如何确保擦除者线程删除的对象,没有被操控者线程修改呢?很自然的,我们能会想用另一个互斥锁去保护对象本身。此刻,我们要问自己一个问题:需要多少个互斥锁来保护对象的内容?

    如果用一个互斥锁保护所有的对象,这和上面全局互斥锁的方案有一样的性能瓶颈。如果每个对象用一个互斥锁,我们会陷入另一个略微不同的困境中。

    假设我们为每个对象创建了一个互斥锁。我们怎么管理这些互斥锁呢?我们可以把互斥锁作为对象内的成员,但是,这又引起另一个问题:当擦除者线程决定删除一个对象时,操控者线程查询到该对象并准备获取它的锁,由于锁的持有者是擦除者线程,操控者线程会被阻塞。之后擦除者线程删除对象和其内部的锁,留着操控线程永远阻塞在一个不存在的对象。哎唷~

    引用计数就是解决上述问题的一个方案,而且幸亏有原子变量,要不还得整个互斥锁保护这个引用计数。

    用原子变量实现引用计数

    首先,我们将对象的引用计数初始化为1;操控者线程访问对象时,要将引用计数加1,访问结束后减1。
    当擦除者线程决定删除特定的对象时,它需要获得数据结构的全局锁,然后从数据结构中移除该对象,再将该对象的引用计数减1。对象的引用计数的初始值是1,对象未被使用时计数应该是0。

    如果是0,我们可以安全的删除它。此时对象已从数据结构中移除,我们可以确认操控者线程不会再使用它。

    如果引用计数大于0,我们需要等引用计数变为0后再删除。问题是怎么等?这个问题比较棘手,我通常会从两个方案中选一个:朴素方案和RCU方案。

    朴素方案

    我们创建一个待删除对象的列表,每当擦除者线程唤醒时,遍历整个列表,删除其中引用计数为0的对象。
    如果列表中的对象特别多,遍历的开销会比较大,此时可以考虑RCU方案。

    RCU方案

    RCU是另一种同步机制,Linux内核使用的比较多,你可阅读Hierarchical RCU了解更多。我们的方案和RCU比较像。
    方案的思路基于对象被操控者线程持有的时间是有上限的,过了这个时间,对象肯定不会被使用,擦除者线程就可以删掉它。
    我们假定操控者线程最多需要用1分钟来修改对象。1分钟为了便于理解问题而特意夸大的。擦除者线程试图删除对象的过程如下:

    1. 获得得数据结构的锁,然后将要删除的对象从数据结构中移除。
    2. 在对象中存储当前时间。
    3. 将对象追加到待删除队列的尾部。追加到队列尾部的好处是可以元素按时间排序,越早删除的对象越在队列前面。

    待删除队列
    之后,擦除者线程定期访问队列头部的对象,检查它是否已经数据结构中移除超过1分钟,如果超过则检查它的引用计数是否为0:

    • 如引用计数不是0,更新对象中的删除时间,追加到队列尾部(通常不会发生)。
    • 如引用计数是0,从队列中移除对象,并删除(释放)对象。

    在上图中,假定当前时间是15:35:12,队列头部对象的删除时间已超过了1分钟,因此擦除者线程可以删除该对象。接着检查队列中的下一个对象,它的删除时间还不足1分钟,所以要继续留在队列中。现在,RCU方案有一个有趣的性质:擦除者线程不需要再检查队列中的其它对象,因为我们总是在队列尾部追加对象,队列是有序的,擦除者线程可以确定其它的对象都不满足1分钟的删除时间。

    相比于之前的朴素方案的全部遍历,RCU方案仅需检查队列头部的少数对象,可以节省大量的处理时间。

    原子变量从哪里来?

    从 gcc 4.1.2起,gcc内置了原子变量的方法,主流CPU架构都已经支持,请在使用前检查你代码运行的平台是否支持。以下是一系列原子变量的操作函数:

    type __sync_fetch_and_add (type *ptr, type value);
    type __sync_fetch_and_sub (type *ptr, type value);
    type __sync_fetch_and_or (type *ptr, type value);
    type __sync_fetch_and_and (type *ptr, type value);
    type __sync_fetch_and_xor (type *ptr, type value);
    type __sync_fetch_and_nand (type *ptr, type value);
    
    type __sync_add_and_fetch (type *ptr, type value);
    type __sync_sub_and_fetch (type *ptr, type value);
    type __sync_or_and_fetch (type *ptr, type value);
    type __sync_and_and_fetch (type *ptr, type value);
    type __sync_xor_and_fetch (type *ptr, type value);
    type __sync_nand_and_fetch (type *ptr, type value);
    

    译者注: 可以参见拙作 没有atomic.h后如何在linux实现原子操作, 也可以使用C11标准库stdatomic.h

    使用这些函数不需要包含任何头文件,如果你的架构不支持这些函数,链接时会报错。这些函数可以支持char,int,long,long long或其它无符号变体。

    最后,我的另一篇文章介绍了原子变量的使用 multithreaded simple data type access and atomic variables

    结论

    希望这篇文章让你觉得有趣。如果有更多问题可以发邮件到 alex@alexonlinux.com

    展开全文
  • 从语句的意思可以看出是要使用与当前cpu相关的一个变量,不过查看这个变量的定义,总是有这样一个宏:DEFINE_PER_CPU(type, name),将这个宏展开成下面的语句:  __attribute__((__section__(".data.percpu"))) __...

    原子变量

    谢宝友: 深入理解 Linux RCU 之从硬件说起

    一、原子操作有多慢?

    这里的原子操作,是特指Linux内核中,类似于atomic_long_add_return这样的API。简单的说,就是当某个原子操作完成时,确保所有CPU核已经识别到对原子变量的修改,并且在原子操作期间,其他CPU核不会同步对该变量进行修改。这必然要求相应的电信号在所有的CPU之间广播。如下图:

    对于普通变量操作(非原子操作)来说,电信号则不必在所有CPU核之间传播并来回传递:

    不能忘记一点:Linux操作系统可以运行在超过1024个CPU的大型系统中。在这些大型系统中,在所有CPU之间广播传递电信号,需要花费“很长”的时间

    二、原子变量的必要性

    在汇编语言中,加1操作(i++)通常分为3步执行:

    1. 将计数器值从内存赋值到处理器寄存器;
    2. 将其值加1;
    3. 将寄存器数据写回内存。

    如果两个cpu同时执行加1操作,二者同时从内存读取计数器的值到cpu寄存器(例如是4),将其加1得到5,最后二者将新值写回到内存,此时内存的值是5,但正确来讲值应该是6.

    原子变量的操作atomic_inc()的作用就是将上面3步看做是1条汇编指令。那么是怎么实现的呢?

    三、原子变量原理

    原文

     staticinlinevoid atomic_sub(inti,atomic_t*v)
    
     {
    
           asmvolatile(LOCK_PREFIX“subl%1,%0″
    
                       :“+m” (v->counter)
    
                       :“ir”(i));
    
     }
      在原子加减操作中,我们可以发现实现都基于关键指令LOCK_PREFIX。该宏定义在文件arch/x86/include/asm/alternative.h中。
    
     #ifdefCONFIG_SMP
    
     #defineLOCK_PREFIX\
    
                   “.section.smp_locks,\”a\”\n”\
    
                   _ASM_ALIGN“\n”                \
    
                   _ASM_PTR “661f\n”/*address*/       \
    
                   “.previous\n”                \
    
                   “661:\n\tlock;”
    
    
    
     #else/*!CONFIG_SMP*/
    
     #defineLOCK_PREFIX“”
    
     #endif

    在X86平台上,CPU提供了在指令执行期间对总线加锁的手段。CPU上有一根引线#HLOCK pin连到北桥,如果汇编语言的程序中在一条指令前面加上前缀“LOCK”,经过汇编以后的机器代码就使CPU在执行这条指令的时候把#HLOCK pin的电位拉低,持续到这条指令结束时放开,从而把总线锁住,这样同一总线上别的CPU就暂时不能通过总线访问内存了,保证了这条指令在多处理器环境中的原子性。 LOCK_PREFIX宏扩展开后,就是在语句前加上lock前缀。

    static inline void atomic_add(int i, atomic_t *v)
    
    {
            asm volatile(LOCK_PREFIX “addl %1,%0″
    }
    相当于:
    
    c041c21b:       8b 44 24 20             mov    0×20(%esp),%eax
    
    c041c21f:       f0 01 05 1c 8f af c0    lock add %eax,0xc0af8f1c

    类似的另一种解释方式:

    在X86平台上,atomic_t定义如下:
    typedef struct {  
    
        int counter;  
    
    } atomic_t;  
    
    下面选取atomic_add来进行分析:
    
    static inline void atomic_add(int i, atomic_t *v)  
    
    {  
    
        asm volatile(LOCK_PREFIX "addl %1,%0"  
    
                 : "+m" (v->counter)  
    
                 : "ir" (i));  
    
    }  
    
    可以看到,atomic_add使用了gcc提供的内嵌汇编来实现,是用一个addl指令来实现增加操作。重点看一下LOCK_PREFIX宏,它就是上文提到的锁总线操作,也就是它保证了操作的原子性。LOCK_PREFIX定义如下:
    
    #define LOCK_PREFIX \  
    
                    ".section .smp_locks,\"a\"\n"   \  
    
                    "  .align 4\n"                  \  
    
                    "  .long 661f\n" /* address */  \  
    
                    ".previous\n"                   \  
    
                    "661:\n\tlock; "  
    
    展开后变成:
    
    .section .smp_locks,"a"  
    
      .align 4  
    
      .long 661f  
    
    .previous  
    
    661:  
    
            lock;  

    asm volatile 可以看看内联定义asm volatile,加上lock前缀就相当于这条汇编指令被锁住总线,直到指令执行完成被释放(这条内联汇编相当于3个步骤)。

    逐条解释如下:
    .section .smp_locks,"a"
    下面的代码生成到 .smp_locks 段里,属性为"a", allocatable
      .align 4
    四字节对齐
      .long 661f
    生成一个整数,值为下面的 661 标号的实际地址,f 表示向前引用,如果 661 标号出现
    在前面,要写 661b。
    .previous
    代码生成恢复到原来的段,也就是 .text
    661:
    数字标号是局部标号,5.3 Symbol Names
            lock;
    开始生成指令,lock 前缀
    这段代码汇编后,在 .text 段生成一条 lock 指令前缀 0xf0,在 .smp_locks 段生成四个字节的 lock 前缀的地址,链接的时候,所有的.smp_locks 段合并起来,形成一个所有 lock 指令地址的数组,这样统计 .smp_locks 段就能知道代码里有多少个加锁的指令被生成,猜测是为了调试目的。

    原子变量有什么弊端呢?当有32个cpu同时要对全局变量进行原子变量++操作,因为原子变量操作锁住了内存总线,也就是只有1个cpu占有内存,其他cpu必须等到该cpu释放后才能使用,原子变量的这种操作也是不能强占而且不能中断的,这就相当于一个粒度很小的bh锁,流量大的时候也很消耗cpu的资源。这个目前没有办法优化。

    至于if (atomic_inc(g_acl_counter) > 10000)这种多cpu处理结果异常,这就是上一层的处理异常了,可用atomic_add_and_return()代替。

    2021.7.22

    ------------------------------------------------------------------------------

    以上是x86架构的实现,其实对于其他架构都是相同道理,x86是锁总线,而arm是锁内存。

     LDREX和STREX:

    1)LDREX用来读取内存中的值,并标记对该段内存的独占访问:

    LDREX Rx, [Ry]
    上面的指令意味着,读取寄存器Ry指向的4字节内存值,将其保存到Rx寄存器中,同时标记对Ry指向内存区域的独占访问。

    如果执行LDREX指令的时候发现已经被标记为独占访问了,并不会对指令的执行产生影响。

    2)而STREX在更新内存数值时,会检查该段内存是否已经被标记为独占访问,并以此来决定是否更新内存中的值:

    STREX Rx, Ry, [Rz]
    如果执行这条指令的时候发现已经被标记为独占访问了,则将寄存器Ry中的值更新到寄存器Rz指向的内存,并将寄存器Rx设置成0。指令执行成功后,会将独占访问标记位清除。

    而如果执行这条指令的时候发现没有设置独占标记,则不会更新内存,且将寄存器Rx的值设置成1。

    一旦某条STREX指令执行成功后,以后再对同一段内存尝试使用STREX指令更新的时候,会发现独占标记已经被清空了,就不能再更新了,从而实现独占访问的机制。

    大致的流程就是这样,但是ARM内部为了实现这个功能,还有不少复杂的情况要处理。

    在ARM系统中,内存有两种不同且对立的属性,即共享(Shareable)和非共享(Non-shareable)。共享意味着该段内存可以被系统中不同处理器访问到,这些处理器可以是同构的也可以是异构的。而非共享,则相反,意味着该段内存只能被系统中的一个处理器所访问到,对别的处理器来说不可见。

    为了实现独占访问,ARM系统中还特别提供了所谓独占监视器(Exclusive Monitor)的东西,其结构大致如下:

    可以看出来,一共有两种类型的独占监视器。每一个处理器内部都有一个本地监视器(Local Monitor),且在整个系统范围内还有一个全局监视器(Global Monitor)。

    如果要对非共享内存区中的值进行独占访问,只需要涉及本处理器内部的本地监视器就可以了;而如果要对共享内存区中的内存进行独占访问,除了要涉及到本处理器内部的本地监视器外,由于该内存区域可以被系统中所有处理器访问到,因此还必须要由全局监视器来协调。

    对于本地监视器来说,它只标记了本处理器对某段内存的独占访问,在调用LDREX指令时设置独占访问标志,在调用STREX指令时清除独占访问标志。

    而对于全局监视器来说,它可以标记每个处理器对某段内存的独占访问。也就是说,当一个处理器调用LDREX访问某段共享内存时,全局监视器只会设置针对该处理器的独占访问标记,不会影响到其它的处理器。当在以下两种情况下,会清除某个处理器的独占访问标记:

    1)当该处理器调用LDREX指令,申请独占访问另一段内存时;

    2)当别的处理器成功更新了该段独占访问内存值时。

    对于第二种情况,也就是说,当独占内存访问内存的值在任何情况下,被任何一个处理器更改过之后,所有申请独占该段内存的处理器的独占标记都会被清空。

    另外,更新内存的操作不一定非要是STREX指令,任何其它存储指令都可以。但如果不是STREX的话,则没法保证独占访问性。

    现在的处理器基本上都是多核的,一个芯片上集成了多个处理器。而且对于一般的操作系统,系统内存基本上都被设置上了共享属性,也就是说对系统中所有处理器可见。因此,我们这里主要分析多核系统中对共享内存的独占访问的情况。

    为了更加清楚的说明,我们可以举一个例子。假设系统中有两个处理器内核,而一个程序由三个线程组成,其中两个线程被分配到了第一个处理器上,另外一个线程被分配到了第二个处理器上。且他们的执行序列如下:

    大致经历的步骤如下:

    1)CPU2上的线程3最早执行LDREX,锁定某段共享内存区域。它会相应更新本地监视器和全局监视器。

    2)然后,CPU1上的线程1执行LDREX,它也会更新本地监视器和全局监视器。这时在全局监视器上,CPU1和CPU2都对该段内存做了独占标记。

    3)接着,CPU1上的线程2执行LDREX指令,它会发现本处理器的本地监视器对该段内存有了独占标记,同时全局监视器上CPU1也对该段内存做了独占标记,但这并不会影响这条指令的操作。

    4)再下来,CPU1上的线程1最先执行了STREX指令,尝试更新该段内存的值。它会发现本地监视器对该段内存是有独占标记的,而全局监视器上CPU1也有该段内存的独占标记,则更新内存值成功。同时,清除本地监视器对该段内存的独占标记,还有全局监视器所有处理器对该段内存的独占标记。

    5)下面,CPU2上的线程3执行STREX指令,也想更新该段内存值。它会发现本地监视器拥有对该段内存的独占标记,但是在全局监视器上CPU1没有了该段内存的独占标记(前面一步清空了),则更新不成功。

    6)最后,CPU1上的线程2执行STREX指令,试着更新该段内存值。它会发现本地监视器已经没有了对该段内存的独占标记(第4步清除了),则直接更新失败,不需要再查全局监视器了。

    所以,可以看出来,这套机制的精髓就是,无论有多少个处理器,有多少个地方会申请对同一个内存段进行操作,保证只有最早的更新可以成功,这之后的更新都会失败。失败了就证明对该段内存有访问冲突了。实际的使用中,可以重新用LDREX读取该段内存中保存的最新值,再处理一次,再尝试保存,直到成功为止。

    还有一点需要说明,LDREX和STREX是对内存中的一个字(Word,32 bit)进行独占访问的指令。如果想独占访问的内存区域不是一个字,还有其它的指令:

    1)LDREXB和STREXB:对内存中的一个字节(Byte,8 bit)进行独占访问;

    2)LDREXH和STREXH:中的一个半字(Half Word,16 bit)进行独占访问;

    3)LDREXD和STREXD:中的一个双字(Double Word,64 bit)进行独占访问。

    它们必须配对使用,不能混用。

    strex 命令是有返回值的

    每CPU变量

    原文

    一、linux中的每cpu变量

      看linux内核代码的时候,会发现大量的per_cpu(name, cpu),get_cpu_var(name)等出现cpu字眼的语句。从语句的意思可以看出是要使用与当前cpu相关的一个变量,不过查看这个变量的定义,总是有这样一个宏:DEFINE_PER_CPU(type, name),将这个宏展开成下面的语句:

      __attribute__((__section__(".data.percpu"))) __typeof__(type) per_cpu__##name

    ,这个语句就是在.data.percpu段中定义type类型的per_cpu##name变量。看到这里,我就不明白了,既然只是在一个段中定义了一个变量,那为什么每个cpu都有一个这样的变量呢

    二、linux中的每cpu变量的实现

      这应该算是linux内核产生每cpu变量的一个技巧吧!首先我们来看下链接linux内核的链接脚本,这个脚本主要用来控制gcc怎样链接linux中的各个段并最终产生linux内核映像文件的,不明白链接脚本的可以到网上搜下,大把的资料。这个脚本文件叫vmlinux.lds.S,放在arch/i386/kernel目录中,在这个文件中有下面一段代码:

     __per_cpu_start = .;
    
      .data.percpu  : { *(.data.percpu) }
    
      __per_cpu_end = .;
     

      这段代码定义了两个符号,分别是__per_cpu_start和_per_cpu_end,它们标识了段data.percpu的起始和结束地址。而段.data.percpu是通过各个对象文件中的.data.percpu段合并起来的,也就是说前面我们定义的per_cpu##name变量终止都会放在.data.percpu段中,而这个段的起始地址和结束地址分别是__per_cpu_start和_per_cpu_end。到了这一步,貌似还是没有看出per_cpu##name变量怎么会对每个cpu都有一个。(该链接文件的代码简单说就是讲所有定义的每cpu变量整合到该段中

      在linux初始化的时候,会调用函数setup_per_cpu_areas来真正的把per_cpu##name变量赋值给每个cpu,具体代码如下:

    static void __init setup_per_cpu_areas(void)
    {
        unsigned long size, i;
        char *ptr;
        /* Created by linker magic */
        extern char __per_cpu_start[], __per_cpu_end[];//引用所有每cpu变量的起始地址和终止地址
    
        /* Copy section for each CPU (we discard the original) */
        size = ALIGN(__per_cpu_end - __per_cpu_start, SMP_CACHE_BYTES);//获得所有每cpu变量的总大小
    #ifdef CONFIG_MODULES
        if (size < PERCPU_ENOUGH_ROOM)
            size = PERCPU_ENOUGH_ROOM;
    #endif
    
        ptr = alloc_bootmem(size * NR_CPUS);
    
        for (i = 0; i < NR_CPUS; i++, ptr += size) {
            __per_cpu_offset[i] = ptr - __per_cpu_start;
            memcpy(ptr, __per_cpu_start, __per_cpu_end - __per_cpu_start);
        }
    }
    #endif /* !__GENERIC_PER_CPU */

     

      代码中引用了由链接器产生的变量__per_cpu_start和 __per_cpu_end,在它们之间的内存空间存放了所有的每cpu变量,总大小为size。然后内核通过alloc_bootmem给每个cpu都分配了一个这么大小的内存空间。下面的for循环把__per_cpu_start和 __per_cpu_end之间的所有cpu变量拷贝一份到每个cpu对应的内存空间中,并用__per_cpu_offset[i]来存放第i个cpu对应的每cpu变量的起始地址。

    不过为什么__per_cpu_offset[i]存放的是ptr - __per_cpu_start,而不是ptr,原因很简单,当我们用per_cpu##name来访问某个cpu上的每cpu变量时,我们应该这样访问:获取该cpu对应每cpu变量的起始地址+per_cpu##name的偏移量。我们现在展开宏per_cpu(var, cpu):

    *(&per_cpu__##var + __per_cpu_offset[cpu])=*(ptr+&per_cpu__##var- __per_cpu_start)

    这样就访问了在cpu上的var变量了。

    其实就是在内存上申请了大小为NR_CPUS * 所有每cpu变量size,只不过是访问方式改变了。

    三、每cpu变量的作用

      从上面可以看出,为了定义一个变量,绕了一个很大的弯,为什么要定义这样的每cpu变量?这其实和linux内部的同步有关,因为如果我们把变量定义成所有cpu都可以访问的,那么就必须用同步机制来保证cpu对这个变量的互斥访问,很明显这是要花费时间的,linux内核为了能够减少这种时间开销,就在每个cpu都定义了一个一模一样的变量,这样每个cpu都使用自己的变量,而不会去访问其它cpu上的变量,也就没有了同步的开销。每cpu变量保证了不同cpu的数据不存在竞争的关系,但对同一个cpu上的两个线程仍然存在竞态,比如cpu上两个线程同时访问该变量或者被同一个cpu上的中断打断。在使用每cpu变量时,必须保证禁用内核抢占并且关闭中断。

    RCU

    基于谢宝友老师rcu之二整理

    RCU是read-copy-update的简称,翻译为中文有点别扭“读-复制-更新”。RCU允许读操作可以与更新操作并发执行,这一点提升了程序的可扩展性。常规的互斥锁让并发线程互斥执行,并不关心该线程是读者还是写者,而读/写锁在没有写者时允许并发的读者,相比于这些常规锁操作,RCU在维护对象的多个版本时确保读操作保持一致,同时保证只有所有当前读端临界区都执行完毕后才释放对象。RCU定义并使用了高效并且易于扩展的机制,用来发布和读取对象的新版本,还用于延后旧版本对象的垃圾收集工作。这些机制恰当地在读端和更新端并行工作,使得读端特别快速。在某些场合下(比如非抢占式内核里),RCU读端的函数完全是零开销。

    一、rcu原语

    1. rcu_assign_pointer() 指针的发布
      //gp 全局变量
      
      p =  kmalloc(sizeof(*p),  GFP_KERNEL);
      p->a =  1;
      p->b =  2;
      p->c =  3;
      
      gp  =  p;

      这种情况由于编译器的编译优化选项,可能会导致p中个元素没赋值的情况下执行了gp=p,这样其他用到gp的线程会拿到没有初始化的gp全局指针。可以用如下代替:

      //gp 全局变量
      
      p =  kmalloc(sizeof(*p),  GFP_KERNEL);
      p->a =  1;
      p->b =  2;
      p->c =  3;
      
      rcu_assign_pointer(gp,  p);
      #define rcu_assign_pointer(p, v) \   
               __rcu_assign_pointer((p), (v), __rcu)  
        
      #define __rcu_assign_pointer(p, v, space) \   
               do { \  
                       smp_wmb(); \  
                       (p) = (typeof(*v) __force space *)(v); \  
               } while (0)  

      从上可以看出rcu_assign_pointer()仅仅是在赋值之前加入了内存屏障,保证了前后代码的执行顺序不被优化。

    2. rcu_read_lock()、rcu_read_unlock()、rcu_dereference()指针的订阅
      1 p =  gp;
      2 if (p  !=  NULL) {
      3    do_something_with(p->a,  p->b, p->c);
      4 }

      以上代码是在读全局变量gp的应用场景,这种代码会出现什么问题呢?这个会出现1和2的执行顺序不一致;还有另外一点很重要,引起问题的根源在于:在同一个CPU内部,使用了不止一个缓存来缓存CPU数据。这样可能使用p和p->a被分布不同一个CPU的不同缓存中,造成缓存一致性方面的问题,因为这个段代码没有使用锁机制访问全局变量,这可能导致在p的写线程中p->a 刚好被赋值而p->b和p->c还没有被赋值时执行了上述代码,所以我们要解决两个问题1、保证代码的执行数据 问题2、保证abc全被复制后此处代码再使用gp。所以我们在用引用全局变量gp的时候需要一个rcu操作如下:

      1  rcu_read_lock();
      2  p =  rcu_dereference(gp);
      3  if (p  !=  NULL) {
      4     do_something_with(p->a,  p->b, p->c);
      5  }
      6  rcu_read_unlock();
      

      其中rcu_read_ lock()和rcu_read_unlock()这对原语定义了RCU读端的临界区。事实上,在没有配置CONFIG_PREEMPT的内核里,这对原语就是空函数。在可抢占内核中,这这对原语就是关闭/打开抢占。

      rcu_dereference()原语用一种“订阅”的办法获取指定指针的值。保证后续的解引用操作可以看见在对应的“发布”操作(rcu_assign_pointer())前进行的初始化即:在看到p的新值之前,能够看到p->a、p->b、p->c的新值,这就解决上了上述问题2。请注意,rcu_assign_pointer()和rcu_dereference()这对原语既不会自旋或者阻塞,也不会阻止list_add_ rcu()的并发执行。看下rcu_dereference()的源码也解决了问题1如下:

      #define rcu_dereference(p)     ({ \   
                          typeof(p) _________p1 = p; \  
                          smp_read_barrier_depends(); \  
                          (_________p1); \  
                          })  

      rcu_dereference()简化看到有个读的内存屏障。

    以上介绍了2个rcu的原语,还有一个原语synchronize_rcu()稍后介绍。原语一般不单独使用,rcu原语常用的场景是rcu链表中,如下图:

    可以看到链表跟哈希链表对应的发布、删除发布、订阅的函数,以下看下链表的rcu操作分别都插入了那些原语:

    static inline void __list_add_rcu(struct list_head *new,
    struct list_head *prev, struct list_head *next)
    {
    new->next = next;
    new->prev = prev;
    rcu_assign_pointer(list_next_rcu(prev), new);
    next->prev = new;
    }
    
    //仅仅是多了一个rcu_assign_pointer操作保证代码顺序
    
    / * Note that the caller is not permitted to immediately free
     * the newly deleted entry.  Instead, either synchronize_rcu()
     * or call_rcu() must be used to defer freeing until an RCU
     * grace period has elapsed.
     */
    static inline void list_del_rcu(struct list_head *entry)
    {
    __list_del(entry->prev, entry->next);
    entry->prev = LIST_POISON2;
    }
    /*
    此函数将一个条目从链表删除。__list_del将两个条目链接起来,entry是被删除的结点,此时
    entry->prev = LIST_POISON2;   给条目的prev指针赋值
    却没给条目next赋值,就是为了保证读线程的完整性,能读到next的结点,已经获取此条目地址的遍历程序不会中断,没有获取此条目的会获得新的next地址。
    
    这里也符合rcu理念,保证链表遍历完整性。
    */

    二、宽限期

    图中每行代表一个线程,最下面的一行是删除线程,当它执行完删除操作后,线程进入了宽限期。宽限期的意义是,在一个删除动作发生后,它必须等待所有在宽限期开始前已经开始的读线程结束,才可以进行销毁操作。这样做的原因是这些线程有可能读到了要删除的元素。图中的宽限期必须等待1和2结束;而读线程5在宽限期开始前已经结束,不需要考虑;而3,4,6也不需要考虑,因为在宽限期结束后开始后的线程不可能读到已删除的元素。此处要理解什么是“执行完删除操作,线程进入了宽限期”,当链表在执行摘链操作后调用synchronize_rcu()函数,开始进入宽限期,此时查看所有读者线程有没有正在使用,如果没有正在使用的读者线程,那么开始执行销毁操作(kfree)

    从最基本的角度来说,RCU就是一种等待事物结束的方式。当然,有很多其他的方式可以用来等待事物结束,比如引用计数、读/写锁、事件等等。RCU的最伟大之处在于它可以等待(比如)20,000种不同的事物,而无需显式地去跟踪它们中的每一个,也无需去担心对性能的影响,对扩展性的限制,复杂的死锁场景,还有内存泄漏带来的危害等等使用显式跟踪手段会出现的问题。

    在RCU的例子中,被等待的事物称为“RCU读端临界区”。RCU读端临界区(读线程1)从rcu_read_lock()原语开始,到对应的rcu_read_unlock()原语结束。RCU读端临界区可以嵌套,也可以包含一大块代码,只要这其中的代码不会阻塞或者睡眠(先不考虑可睡眠RCU),因为synchronize_rcu()原语会等待所有读线程都越过宽限期的左侧才会继续执行。如果你遵守这些约定,就可以使用RCU去等待任何代码的完成。

    下列伪代码展示了写者使用RCU等待读者的基本方法。

    1.作出改变,比如替换链表中的一个元素。

    2.等待所有已有的RCU读端临界区执行完毕(比如使用synchronize_rcu()原语)。这里要注意的是后续的RCU读端临界区无法获取刚刚删除元素的引用。

    3.清理,比如释放刚才被替换的元素。

    下图所示的代码片段演示了这个过程,其中字段a是搜索关键字。
     

    1  struct  foo  {
    2    struct  list_head  *list;
    3     int  a;
    4     int  b;
    5     int  c;
    6  };
    7 LIST_HEAD(head);
    8
    9  /*  . .  .  */
    10
    11  p  = search(head,  key);
    12  if  (p ==  NULL)  {
    13     /*  Take appropriate  action,  unlock, and
      return. */
    14  }
    15  q  = kmalloc(sizeof(*p),  GFP_KERNEL);
    16  *q  =  *p;
    17 q->b  =  2;
    18 q->c  =  3;
    19 list_replace_rcu(&p->list, &q->list);
    20 synchronize_rcu();
    21  kfree(p);

    这段代码看到使用了synchronize_rcu()进行阻塞等带宽限期里没有读者,然后才会释放内存。

    以上的rcu介绍有一个疑问是rcu_read_lock的使用是基于全局的,难道所有使用了rcu机制的全局变量都用这个所?介绍了一点rcu_read_lock。其实这个rcu_read_lock()和 rcu_read_unlock()只是为了防止进程抢占(对于所有cpu仅仅加读锁相当于没加,仅使用了锁防抢占的功能),同时跟rcu_dereference配合确定了某一个全局变量的是否在占用宽限期。

    展开全文
  • //原子Integer 实现计数器 public class AtomicCounter { // 原子Integer递增对象 public static AtomicInteger atomicInteger = new AtomicInteger(0); public static void main(String[] args) {...

    package class10;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    //使用synchronized 实现计数器

    public class SyncCounter {

    private int count = 0;

    public synchronized void increase() {

    count++;

    }

    public int getCount() {

    return count;

    }

    public static void main(String[] args) {

    final SyncCounter aor = new SyncCounter();

    // 线程池

    ExecutorService exec = Executors.newCachedThreadPool();

    // 模拟20个客户端访问

    for (int index = 0; index 

    Runnable run = new Runnable() {

    public void run() {

    aor.increase();

    int result = aor.getCount();

    System.out.println(result);

    }

    };

    exec.execute(run);

    }

    // 退出线程池

    exec.shutdown();

    }

    }

    package class10;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.atomic.AtomicInteger;

    //原子Integer 实现计数器

    public class AtomicCounter {

    // 原子Integer递增对象

    public static AtomicInteger atomicInteger = new AtomicInteger(0);

    public static void main(String[] args) {

    // 线程池

    ExecutorService exec = Executors.newCachedThreadPool();

    // 模拟50个客户端访问

    for (int index = 0; index 

    Runnable run = new Runnable() {

    public void run() {

    atomicInteger.getAndIncrement();

    System.out.println(atomicInteger);

    }

    };

    exec.execute(run);

    }

    // 退出线程池

    exec.shutdown();

    System.out.println(atomicInteger);

    }

    }

    package class10;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.Executors;

    import java.util.concurrent.locks.ReentrantLock;

    //使用ReentrantLock实现计数器

    public class ReentrantLockTest {

    private ReentrantLock lock = new ReentrantLock();

    private int count;

    public void increase() {

    lock.lock();

    count++;

    try {

    try {

    Thread.sleep(10);

    } catch (InterruptedException e) {

    System.err.println(Thread.currentThread().getName() + " interrupted");

    Thread.currentThread().interrupt();

    }

    } finally {

    lock.unlock();

    }

    }

    public int getCount() {

    return count;

    }

    public static void main(String[] args) {

    final ReentrantLockTest reentrantLockTest = new ReentrantLockTest();

    // 线程池

    ExecutorService exec = Executors.newCachedThreadPool();

    // 模拟20个客户端访问

    for (int index = 0; index 

    Runnable run = new Runnable() {

    public void run() {

    reentrantLockTest.increase();

    int result = reentrantLockTest.getCount();

    System.out.println(result);

    }

    };

    exec.execute(run);

    }

    // 退出线程池

    exec.shutdown();

    System.out.println(reentrantLockTest.getCount());

    }

    }

    展开全文
  • 这里写目录标题16.1 原子变量和CAS16.1.1 AtomicInteger1.基本用法2.基本原理和思维3.实现锁16.1.2 ABA问题16.1.3 小结参考目录 16.1 原子变量和CAS         什么是原子...
  • 前面讲线程同步时,我们对多线程容易出现的问题进行了分析,在那个例子中,问题的根源在于c++和c--这... } 在Java1.8中还增加了DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder这四个类用于并发累积计数
  • JUC并发包提供了一系列的原子操作类,这些类都是使用非阻塞算法(CAS)实现的,相比于使用锁来实现,这些原子操作类在性能上更好一些。JUC并发包中包含有AtomicInteger、AtomicLong和AtomicBoolean等原子性操作类,...
  • 并发是指的是多个执行单元同时被执行,而并发的执行单元对共享资源(硬件资源和软件上的全局变量、静态变量等)的访问很容易导致竟态。主要有以下三个方面:一、对称处理器的多个CPU。二、单CPU内,进程与抢占它的进程...
  • 今天我将就C++11多线程中的atomic原子操作展开讨论;比较互斥锁,自旋锁(spinlock),无锁编程的异同,并进行性能测试;最后会讨论一下内存序的问题;为了流畅阅读你最好先熟悉一下C++11 Atomic的基本操作英文文档,...
  • Linux原子操作

    2020-12-23 17:57:38
    一、原子操作所谓原子操作,就是该操作绝不会在执行完毕前被任何其他任务或事件打断,也就说,它是最小的执行单位,不可能有比它更小的执行单位,因此这里的原子实际...原子操作主要用于实现资源计数,很多引用计数(...
  • FreeRTOS计数型信号量

    2021-05-17 15:43:40
    除初始化外,仅能通过两个标准的原子操作Wait(S)和Signal(S)来访问.其通常分别被称为P.V操作. 描述如下: P操作:S=S-1:如果S小于 ... Java 硬件同步机制 Swap 指令模拟 + 记录型信号量模拟 学校实验存档//.. 以经典的...
  • PHP引用计数

    2021-03-26 12:55:19
    1. PHP官方手册引用计数介绍引用计数每个php变量存在一个叫"zval"的变量容器中。一个zval变量容器,除了包含变量的类型和值,还包括两个字节的额外信息。第一个是"is_ref",是个bool值,用来标识这个变量是否是属于...
  • 在nginx里,我们都知道nginx是一个多进程的反向代理服务器,他的ngx_events_module里就有一些原子变量,比如ngx_connection_counter,在ngx_event_module_init函数里指向一块共享内存,是nginx的连接计数统计。...
  • 1、原子整数累加和原子累加器性能比较 import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.LongAdder; import java.util....
  • 如果是跟随正点原子学习STM32的同学会发现,原子的代码里面很喜欢设置一个状态标志变量。 例如串口实验中的USART_RX_STA变量、定时器中的TIM5CH1_CAPTURE_STA变量,都是将一些重要数据保存在自定义的变量里面,这些...
  • 引用计数-kref 1、 前言 众所周知,C/C++语言本身并不支持垃圾回收机制,虽然语言本身具有极高的灵活性,但是当遇到大型的项目时,繁琐的内存管理往往让人痛苦异常。现代的C/C++类库一般会提供智能指针来作为内存...
  • 原子类AtomicInteger的ABA问题 连环套路 从AtomicInteger引出下面的问题 CAS -> Unsafe -> CAS底层思想 -> ABA -> 原子引用更新 -> 如何规避ABA问题 ABA问题是什么 狸猫换太子 假设现在有两个线程,...
  • 在这个场合中,每次事件发生的时候就在事件处理函数中释放信号量(增加信号量的计数值),其他任务会获取信号量(信号量计数值减一,信号量值就是队列结构体成员变量uxMessagesWaiting)来处理事件。在这种场合中创建的...
  • 如何使用二进制信号量实现计数信号量
  • C++内存模型和原子类型操作 std::memory_order初探 动态内存模型可以理解为存储一致性模型,主要是从行为上来看多个线程对同一个对象读写操作时所做的约束,动态内存理解起来会有少许复杂,涉及到内存、Cache、CPU的...
  • 文章目录AtomicLong概述AtomicLong实现原理AtomicLong的缺陷LongAdderLongAdder实现原理LongAdder源码分析AtomicLong和 LongAdder对比...说到线程安全的计数统计工具类,肯定少不了Atomic下的几个原子类。AtomicLong
  • 说说我们在多线程对一些变量做自增同时要保证线程安全,我们最容易使用高性能的原子类来处理,因为他们是采用cas保证线程安全的,一般情况下比重量级加锁性能更佳,所以能用原子类的地方我们尽力不使用加锁。...
  • 芯灵思SinlinxA33开发板Linux内核原子操作(附实测代码)[复制链接]原子操作是指不会被线程调度机制打断的操作;这种操作一旦开始,就一直运行到结束,中间不会有任何线程切换。原子操作是不可分割的,在执行完毕之前...
  • 第一种情况编写一个类Counter里面有一个成员变量count,写一段简单的i++的代码完成计数的功能,为了暴露多线程下的问题,让每次自增之前睡100mspublic class Counter {private int count;private s...
  • 计数用synchronized,AtomicLong,LongAdder 在很多系统中都用到了计数的功能,那么计数我们应该用synchronized,AtomicLong,LongAdder中的哪一个呢?来跑个例子 public class CountTest { private int count = ...
  • 本篇文章给大家带来的内容是关于AtomicInteger原子类的作用介绍(代码示例),有一定的参考价值,有需要的朋友可以参考一下,希望对你有所帮助。AtomicInteger 原子类的作用多线程操作,Synchronized 性能开销太大...
  • 定义变量是寄存器保存,不写到内存上因为在寄存器上,所以不能取地址不能保存存放在data和bss段的数据寄存器相当于内存的高速缓存,增加访问速度,但具体能够保存多少个由cpu决定,多余的将被忽略。static:static...
  • Linux 原子操作

    2021-01-14 01:58:49
    所谓原子操作,就是该操作绝不会在执行完毕前被任何其他任务或事件打断,也就说,它的最小的执行单位,不可能有比它更小的执行单位,因此这里的原子实际是使用了物理学里的...原子操作主要用于实现资源计数,很多...
  • } 2 三个版本的计数函数 #include "thread_group_zjd.h" #include #include #include #include using namespace std; // 全局的结果数据 long total = 0; // 点击函数 void click_normal() { for (int i = 0; i ; +...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,463
精华内容 13,785
关键字:

原子变量计数