精华内容
下载资源
问答
  • spinlock
    2021-03-14 23:05:58

    自旋锁(Spin lock)

    自旋锁是指当一个线程尝试获取某个锁时,如果该锁已被其他线程占用,就一直循环检测锁是否被释放,而不是进入线程挂起或睡眠状态。

    自旋锁适用于锁保护的临界区很小的情况,临界区很小的话,锁占用的时间就很短。

    简单的实现

    import java.util.concurrent.atomic.AtomicReference;

    public class SpinLock {

    private AtomicReference owner = new AtomicReference();

    public void lock() {

    Thread currentThread = Thread.currentThread();

    // 如果锁未被占用,则设置当前线程为锁的拥有者

    while (!owner.compareAndSet(null, currentThread)) {

    }

    }

    public void unlock() {

    Thread currentThread = Thread.currentThread();

    // 只有锁的拥有者才能释放锁

    owner.compareAndSet(currentThread, null);

    }

    }

    SimpleSpinLock里有一个owner属性持有锁当前拥有者的线程的引用,如果该引用为null,则表示锁未被占用,不为null则被占用。

    这里用AtomicReference是为了使用它的原子性的compareAndSet方法(CAS操作),解决了多线程并发操作导致数据不一致的问题,确保其他线程可以看到锁的真实状态。

    继续阅读 →

    更多相关内容
  • use spinlock :: SpinLock; fn main () { let spin = SpinLock :: new ( 0 ); // Write access { let mut data = spin. write (). unwrap (); * data += 1 ; } // Read access { let data = spin. read ...
  • Go和内联汇编程序中的Spinlock实现。 概述 程序包自旋锁提供了汇编中低级自旋锁的实现。 此外,它还提供了基于内置原子的实现的后备功能。 在我的私人笔记本电脑上,基准测试结果如下。 所测量的时间是墙上的时间。...
  •  一、自旋锁(spinlock)简介  自旋锁在同一时刻只能被最多一个内核任务持有,所以一个时刻只有一个线程允许存在于临界区中。这点可以应用在多处理机器、或运行在单处理器上的抢占式内核中需要的锁定服务。  二...
  • 本文主要以2.6.22.6内核分析Linux中spinlock在ARM及X86平台上的实现(不同版本的内核实现形式会有一些差异,但原理大致相同)。此处默认大家已经熟悉了spinlock的使用,重点解释容易引起迷惑的体系结构相关的实现...
  • spinlock-master.zip,spinlock-master,spinlock-xchg-hle.h,spinlock-mcs.h,spinlock-xchg-backoff.h,run-test-spinlock.sh,spinlock-ticket.h,spinlock-pthread.h,spinlock-xchg.h,Makefile,README.md,spinlock-...
  • SpinLock.cpp

    2020-05-26 16:30:15
    为了效率,不使用C++语言提供的Mutex互斥量,而使用不使用线程被阻塞的方式,即所谓的自旋锁,这是自旋锁的一种实现方式,使用C++11的原子变量,不用锁机制,实现的一种无锁的自旋锁
  • MCS spinlock的Linux内核模块实现.pdf
  • 它首先肯定要解决mcs_spinlock占用大小,实际上它结合了ticket spinlock和mcs_spinlock的优点,大小保持一致,如果只有1个或2个CPU试图获取锁,只用这个新的spinlock,当有3个以上的CPU试图获取锁,才需要mcs_...

    1. 引言

    通常我们的说的同步其实有两个层面的意思:

    1. 一个是线程间的同步,主要是为了按照编程者指定的特定顺序执行;

    2. 另外一个是数据的同步,主要是为了保存数据。

    为了高效解决同步问题,前人抽象出同步原语供开发者使用。不仅多核,单核也需要同步原语,核心的问题要保证共享资源访问的正确性。如果共享资源划分不合理,同步原语的开销会制约多核性能。

    常见的同步原语有:互斥锁、条件变量、信号量、读写锁、RCU等。

    本文主要聚焦于互斥锁,对应linux的spinlock,我们试图沿着时间的脉络去梳理spinlock的不断改进的进程,如果涉及CPU体系结构,我们主要关注ARM体系结构的实现。如果后续有时间我们会继续分析其他同步原语的演进和优化历程。

    2. 演化进程

    linux内核spinlock是互斥机制的最底层基础设施,它的性能直接关系到内核的性能,主要分为这么几个阶段:

    1. linux-2.6.25之前,我们称之为原始spinlock

      对锁的实现,是使用原子操作去无序竞争全局的锁资源。

      这个阶段对锁的争用处于无序竞争的状态。如果CPU核心数不多,资源相对充裕,好比我们去银行柜台办理业务,一共就1-2个人,无非你在前还是我在前的问题,公平性的问题并不突出,性能也没什么大的影响,但是一旦cpu核心数和锁的竞争者相对较多时,就会出现有些人因为某些优势(如CPU算力强,锁正好落在当前CPU的cacheline中等)总是能抢到锁,其他人总是抢不到的情况出现。

    2. linux-2.6.25,在x86下实现了ticket spinlock,替换原始spinlock。

      随着CPU核数的增多以及对共享资源的争用越发激烈,公平性问题就显现出来了。保证公平一个很自然的思路就是大家都来排队。如果对锁的争用比较激烈再加上如果此时核比较多,此时一旦释放锁,其实只有1个CPU能抢到锁,但是因为大家观察都是全局的一个锁,那其他CPU的cacheline会因此失效,会有相当程度的性能损耗。还是就以去银行柜台办理业务为例,它的实现相当于去银行取号、排队、等叫号这么一个过程,问题在于叫号相当是一个广播机制,所有人都要侦听,还是有点浪费时间精力。

    3. linux-3.5,ARM体系结构用ticket spinlock替换了原始spinlock

      过了几个版本,ARM才替换,原因作者没有去考证,不得而知。

    4. linux-3.15,实现了mcs_spinlock,但未替换ticket spinlock

      它把对全局锁转换成per-cpu的锁,减少争用的损耗,然后各个锁之间通过链表串起来排队,解决了公平性和性能损耗的问题。然后它却没有替代ticket spinlock成为内核默认实现,因为spinlock太底层了,已经嵌入了内核的各种关键数据结构中,它的数据结构要比spinlock大,这是内核锁不能接受的,但是最终它还是合入了内核,只是没有人去用它。但是它的存在为后一步的优化,仍然起到了非常重要的作用。它的实现思路是把ticket spinlock的广播机制转变为击鼓传花,也就是实际上可以我并需要侦听广播,主要在我前面排队的人在使用完锁以后告诉我可以了。

    5. linux-4.2,实现了queued spinlock(简称qspinlock),替换了ticket spinlock

      它首先肯定要解决mcs_spinlock占用大小,实际上它结合了ticket spinlockmcs_spinlock的优点,大小保持一致,如果只有1个或2个CPU试图获取锁,只用这个新的spinlock,当有3个以上的CPU试图获取锁,才需要mcs_spinlock。它的数据结构有表示当前锁的持有情况、是否有还有一个竞争者,已经需要快速找到对应CPU的per-cpu结构的mcs_spinlock节点,这3个大的域被塞在ticket spinlock同样大小数据结构中。这种遵守原先架构约束的情况而做出的改进,非常值得我们学习。

    6. linux-4.19,ARM体系结构将queued spinlock替换成默认实现。

      原因是什么,作者同样没去考证。

    3. 原始spinlock实现

    3.1 关键数据结构和公共代码

    typedef struct {
    	volatile unsigned int lock;
    } raw_spinlock_t;
    
    typedef struct {
    	raw_spinlock_t raw_lock;
    } spinlock_t;
    
    #define spin_lock(lock)			_spin_lock(lock)
    
    void __lockfunc _spin_lock(spinlock_t *lock)
    {
    	preempt_disable();
        _raw_spin_lock(lock);
    }
    # define _raw_spin_lock(lock)		__raw_spin_lock(&(lock)->raw_lock)

    3.2 ARM体系结构的加锁实现

    //arm32(那时候还没arm64)的实现,这个时期的内核大体对应ARMV6
    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
    	unsigned long tmp;
    
    	__asm__ __volatile__(
    "1:	ldrex	%0, [%1]\n"      //1.
    "	teq	%0, #0\n"            //2. 
    "	strexeq	%0, %2, [%1]\n"  //3.
    "	teqeq	%0, #0\n"        //4.
    #ifdef CONFIG_CPU_32v6K
    "	wfene\n"                 //5.
    #endif
    "	bne	1b"                  //6.
    	: "=&r" (tmp)
    	: "r" (&lock->lock), "r" (1) 
    	: "cc");
    
    	smp_mb();                //7.
    }

    通过数据结构,可以看出,此时的lock还是一个unsigned int类型的数据,加锁的时候,首先会关闭抢占,然后会转到各个体系结构的实现,我们关注ARM的实现,__raw_spin_lock的分析如下:

    1. 读取lock的状态值给tmp,并将&lock->lock标记为独占。

    2. 判断lock的状态是否为0。如果是0说明可以继续往下走(跳到第3步);如果不为0,说明自旋锁处于上锁状态,不能访问,跳到第5步(如果不支持WFE则直接跳到第6步)自旋,最后回到第1步自旋。teq执行会影响标志寄存器中Z标志位,后面带eq或者ne后缀的执行都受该标志位影响。

    3. 执行strex执行,只有从上一次ldrex执行到本次strex这个被标记为独占的地址(&lock->lock)没有改变,才会执行成功(lock的状态改写为1)。通过strex执和ldrex实现原子性操作。

    4. 继续判断lock的状态是否为0,为0说明获得锁;不为0说明没有获得锁,跳到第5步(如果支持WFE的话)。

    5. 执行WFE指令(如果支持的话),CPU进低功耗状态,省点功耗。

    6. 如果收到SEV指令(如果有第五步的话),继续判断lock的状态是否为0,不为0跳到第1步,继续循环;如果lock为0,继续跳到第7步

    7. 执行barrier(多核情况下为)DMB指令,保证访存顺序按我们的编程顺序执行(即后面的load/store绝不允许越过smp_mb()屏障乱序到前面执行)。

    3.3 ARM体系结构的解锁实现

    static inline void __raw_spin_unlock(raw_spinlock_t *lock)
    {
    	smp_mb();
    
    	__asm__ __volatile__(
    "	str	%1, [%0]\n"
    #ifdef CONFIG_CPU_32v6K
    "	mcr	p15, 0, %1, c7, c10, 4\n" /* DSB */
    "	sev"
    #endif
    	:
    	: "r" (&lock->lock), "r" (0)
    	: "cc");
    }

    解锁的操作相对简单,str将lock->lock赋值,然后使用DSB保序,使用sev通知持锁cpu得到锁。

    4. ticket spinlock

    4.1 关键数据结构和公共代码

    typedef struct {
    	union {
    		u32 slock;
    		struct __raw_tickets { //只考虑小端
    			u16 owner;
    			u16 next;
    		} tickets;
    	};
    } arch_spinlock_t;
    
    typedef struct raw_spinlock {
    	arch_spinlock_t raw_lock;
    } raw_spinlock_t;
    
    typedef struct spinlock {
    	union {
    		struct raw_spinlock rlock;
    	};
    } spinlock_t;
    
    static inline void spin_lock(spinlock_t *lock)
    {
    	raw_spin_lock(&lock->rlock);
    }
    #define raw_spin_lock(lock)	_raw_spin_lock(lock)
    void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
    {
    	__raw_spin_lock(lock);
    }
    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
    	preempt_disable();
    	do_raw_spin_lock();
    }
    static inline void do_raw_spin_lock(raw_spinlock_t *lock) __acquires(lock)
    {
    	arch_spin_lock(&lock->raw_lock);
    }

    4.2 AArch32的加锁实现

    static inline void arch_spin_lock(arch_spinlock_t *lock)
    {
    	unsigned long tmp;
    	u32 newval;
    	arch_spinlock_t lockval;
    
    	__asm__ __volatile__(
    "1:	ldrex	%0, [%3]\n"          //1. 
    "	add	%1, %0, %4\n"            //2.
    "	strex	%2, %1, [%3]\n"      //3.
    "	teq	%2, #0\n"                //4.
    "	bne	1b"                      //5.
    	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    	: "cc");
    
    	while (lockval.tickets.next != lockval.tickets.owner) {      //6.
    		wfe();                                                   //7.
    		lockval.tickets.owner = ACCESS_ONCE(lock->tickets.owner);//8.
    	}
    
    	smp_mb(); //9.
    }
    1. 把lock->slock值保存到lock_val

    2. newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等价于newval =lockval.tickets.next++,相当于从银行取号机取号。

    3. strex tmp, newval, [&lock->slock],将新的值newval 存在lock中,strex将是否成功结果存入在tmp中

    4. 检查是否写入成功lockval.tickets.next

    5. 成功则跳到第6步,否则返回第1步重试,同上文类似也是实现原子操作

    6. lockval.tickets.next和owner成员是否相等,相等跳到第9步,成功获得锁;没有则跳到第7步。成功的话,相当于银行柜台已经叫我的号了,我就可以去办理业务了,没有的话,我还要继续等。

    7. 执行WFE指令,CPU进低功耗状态,省点功耗。

    8. 如果收到SEV指令,从低功耗状态恢复,重新获得新的owner值,因为一般是别人释放了锁才会发送SEV指令,这时owner的值已经发生了变化,需要重新从内存中获取(ACCESS_ONCE本身的实现就是增加了volatile这个关键字,它确保编译器每次访问的变量都是从内存中获取,防止编译器优化)。

    9. 执行barrier,同上文描述,不再赘述。

    4.3 AArch32的解锁实现

    static inline void arch_spin_unlock(arch_spinlock_t *lock)
    {
    	unsigned long tmp;
    	u32 slock;
    
    	smp_mb();
    
    	__asm__ __volatile__(
    "	mov	%1, #1\n"                //1. 
    "1:	ldrex	%0, [%2]\n"          //2. 
    "	uadd16	%0, %0, %1\n"        //3.
    "	strex	%1, %0, [%2]\n"      //4.
    "	teq	%1, #0\n"                //5.
    "	bne	1b"                      //6.
    	: "=&r" (slock), "=&r" (tmp)
    	: "r" (&lock->slock)
    	: "cc");
    
    	dsb_sev();                   //7.
    }
    1. 将tmp赋值为1

    2. 将lock->slock的值赋值给slock。

    3. 将slock的低16bit,也就是owner成员的值加1。

    4. 将新的值新的ower,使用strex写入中lock->slock,将是否成功结果存入在tmp中

    5. 检查是否写入成功,成功,跳到第7步,实现了原子操作;不成功跳到第6步;

    6. tmp不等于0(不成功),继续返回label 1重试,即跳回第2步

    4.4 AArch64的加锁实现

    static inline void arch_spin_lock(arch_spinlock_t *lock)
    {
    	unsigned int tmp;
    	arch_spinlock_t lockval, newval;
    
    	asm volatile(
    "	prfm	pstl1strm, %3\n"       //1.
    "1:	ldaxr	%w0, %3\n"             //2.
    "	add	%w1, %w0, %w5\n"           //3.
    "	stxr	%w2, %w1, %3\n"        //4.
    "	cbnz	%w2, 1b\n"             //5.
    	/* Did we get the lock? */
    "	eor	%w1, %w0, %w0, ror #16\n"  //6.
    "	cbz	%w1, 3f\n"                 //7.
    "	sevl\n"                        //8.
    "2:	wfe\n"                         //9.
    "	ldaxrh	%w2, %4\n"             //10.
    "	eor	%w1, %w2, %w0, lsr #16\n"  //11.
    "	cbnz	%w1, 2b\n"             //12.
    "3:"                               //13.
    	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp), "+Q" (*lock)
    	: "Q" (lock->owner), "I" (1 << TICKET_SHIFT)
    	: "memory");
    }

    核心逻辑与AArch32类似,汇编实现会有不一样,这里不再展开。

    1. 从lock(memory)预取数据到L1cache中,加速执行。

    2. 使用ldaxr指令(Load-acquire exclusive register,带Exclusive和Acquire-Release 两种语义),将lock的值赋值给lockval。

    3. newval = lockval + (1 << TICKET_SHIFT)= lockval + (1 << 16),等价于newval =lockval.tickets.next++,相当于从银行取号机取号。

    4. 使用stxr指令,将newval赋值给lock,并将exclusive是否设置成功结果存放到tmp中。

    5. 如果tmp != 0,说明exclusive失败,需要重新跳到开始处(第2步)重试,因为这时候其他CPU核心可能有是执行流插入,抢在我们前面执行。否则继续。

    6. eor 异或运算实现lockval.tickets.next和owner成员是否相等的判断

    7. 如果相等,跳到label3(对应13步),获得锁进入临界区。否则往下,执行自旋。

    8. 使用SEVL指令(发送本地事件,Send Event Locally),唤醒本CPU核心,防止有丢失unlock的情况出现。

    9. 执行WFE指令,CPU进低功耗状态,省点功耗。

    10. 获取当前的Owner值存放在tmp中

    11. 判断lockval.tickets.next和owner成员的值是否相等

    12. 如果不相等就回跳到label2(对应第9步)。相等继续往下。

    13. 结束退出。

    4.5 AArch64的解锁实现

    static inline void arch_spin_unlock(arch_spinlock_t *lock)
    {
        asm volatile(
    "   stlrh   %w1, %0\n"
        : "=Q" (lock->owner)
        : "r" (lock->owner + 1)
        : "memory");
    }

    解锁的操作相对简单,stlrh除了将lock->owner++(相当于银行柜台叫下一个排队者的号码),将会兼有SEVDSB的功能。

    5. mcs spinlock

    5.1 关键数据结构和变量

    struct mcs_spinlock {
        struct mcs_spinlock *next; //1.
        in
    t nt locked; //2.
    };

    1. 当一个CPU试图获取一个spinlock时,它就会将自己的MCS lock加到这个spinlock的等待队列的队尾,然后next指向这个新的MCS lock。

    2. locked的值为1表示已经获得spinlock,为0则表示还没有持有该锁。

    5.2 加锁的实现

    static inline void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
    {
    	struct mcs_spinlock *prev;
     
    	node->locked = 0;               //1.
    	node->next   = NULL;
    
    	prev = xchg(lock, node);        //2
    	if (likely(prev == NULL)) {     //3
    		return;
    	}
    	ACCESS_ONCE(prev->next) = node; //4
    
    	arch_mcs_spin_lock_contended(&node->locked); //5.
    }

    先看下两个参数:

    • 第1个参数lock:是指向指针的指针(二级指针),是因为它指向的是末尾节点里的next域,而next本身是一个指向struct mcs_spinlock的指针。

    • 第2个参数node:试图加锁的CPU对应的MCS lock节点。

    接下来看代码逻辑

    1. 初始化node节点

    2. 找队列末尾的那个mcs lock。xchg完成两件事,一是给一个指针赋值,二是获取了这个指针在赋值前的值,相当于下面两句:

      prev = *lock; //队尾的lock
      *lock = node; //将lock指向新的node
    3. 如果队列为空,CPU可以立即获得锁,直接返回;否则继续往下。不需要基于"locked"的值进行spin,所以此时locked的值不需要关心。

    4. 等价于prev->next = node,把自己这个node加入等待队列的末尾。

    5. 调用arch_mcs_spin_lock_contended,等待当前锁的持有者将锁释放给我。

    #define arch_mcs_spin_lock_contended(l)	 \
    do {									 \
    	while (!(smp_load_acquire(l)))		 \ //1. 
    		arch_mutex_cpu_relax();			 \ //2.
    } while (0)
    1. 上文中的node->locked==0,说明没有获得锁,需要继续往下执行;说明已经获得锁,直接退出

    2. ARM64中arch_mutex_cpu_relax调用cpu_relax函数的,有一个内存屏障指令防止编译器优化。从4.1开始还存一个yield指令。该指令,为了提高性能,占用cpu的线程可以使用该给其他线程。

    5.3 解锁的实现

    static inline void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
    {
    	struct mcs_spinlock *next = ACCESS_ONCE(node->next); //1.
    
    	if (likely(!next)) {                                 //2.
    		if (likely(cmpxchg(lock, node, NULL) == node))   //3
    			return;
    		while (!(next = ACCESS_ONCE(node->next)))        //4.
    			arch_mutex_cpu_relax();                      //5.
    	}
    
    	arch_mcs_spin_unlock_contended(&next->locked);       //6.
    }
    1. 找到等待队列中的下一个节点

    2. 当前没有其他CPU试图获得锁

    3. 直接释放锁。如果*lock == node,将*lock = NULL,然后直接返回;反之,说明当前队列中有等待获取锁的CPU。继续往下。cmpxchg作用翻译大致如下所示:

      cmpxchg(lock, node, NULL)
      {
      	ret = *lock;
      	if (*lock == node)
      		*lock = NULL;
      	return ret;
      }
    4. 距离函数开头获得"next"指针的值已经过去一段时间了,在这个时间间隔里,可能又有CPU把自己添加到队列里来了。需要重新获得next指针的值。于是,待新的node添加成功后,才可以通过arch_mcs_spin_unlock_contended()将spinlock传给下一个CPU

    5. 如果next为空,调用arch_mutex_cpu_relax,作用同上文。

    6. arch_mcs_spin_unlock_contended(l),实际上是调用smp_store_release((l), 1),将next->locked设置为1。将spinlock传给下一个CPU

    6. queued spinlock

    6.1 概述

    它首先肯定要解决mcs_spinlock的占用空间问题,否则设计再好,也无法合入主线。它是这样的:大部分情况用的锁大小控制在跟以前ticket spinlock一样的水平,设计了两个域:分别是lockedpending,分别表示锁当前是否被持有,已经在持有时,是否又来了一个申请者竞争。争锁好比抢皇位,皇位永远只有1个(对应locked域),除此之外还有1个太子位(对应pending域),防止皇帝出现意外能随时候补上,不至于出现群龙无首的状态,他们可以住在紫禁城内(使用qspinlock)。这两个位置被占后,其他人还想来竞争皇位,只有等皇帝和太子都移交各自的位子以后才可以,在等待的时候你需要在紫禁城外待在自己的府邸里(使用mcs_spinlock),减小紫禁城的拥挤(减少系统损耗)。

    即当锁的申请者小于或等于2时,只需要1个qspinlock就可以了,其所占内存的大小和ticket spinlock一样。当锁的竞争者大于2个时候,才需要(N-2)个mcs_spinlockqspinlock还是全局的,为降低锁的竞争,使用退化到per-cpu的mcs_spinlock锁,所有的mcs_spinlock锁串行构成一个等待队列,这样cacheline invalide带来的损耗减轻了很多。这是它的基本设计思想。

    在大多数情况下,per-cpu的mcs lock只有1个,除非发生了嵌套task上下文被中断抢占,因为中断上下文只有3种类(softirq、hardirq和nmi),所有每个CPU核心至多有4个mcs_spinlock锁竞争。而且,所有mcs_spinlock会串联到一个等待队列里的。

    b2634126435969553697f10380800b17.png

    上图展示的是:qspinlocklockedpending位都被占,需要进入mcs_spinlock等待队列,而CPU(2)是第1个进入等待队列的,qspinlocktail.cpu则被赋值成2,CPU(2)的mcs_spinlock数组的第3个成员空闲,则qspinlocktail.index被赋值成3。入队的是CPU(3)、CPU(1)和CPU(0),通过mcs_spinlocknext域将大家连成队列。

    假如等到竞争qspinlock的2个锁的持有者,都释放了,则CPU(2)的第3个空闲成员则获得锁,完成它的临界区访问后,通过qspinlocktail.cpu(类似于页表基址)和tail.index(类似于页表内的偏移),快速找到下个mcs_spinlock的node节点。

    下面要进入细节分析了,本文只考虑小端模式、NR_CPUS < 16K的情况,不考虑虚拟化这块,去掉qstats统计,力图聚焦在该锁实现的核心逻辑上。

    6.2 关键数据结构和变量

    struct qspinlock

    数据简化如下:

    typedef struct qspinlock {
        union {
            atomic_t val;
            struct {
                u8	locked;
                u8	pending;
            };
            struct {
                u16	locked_pending;
                u16	tail;
            };
        };
    } arch_spinlock_t;

    struct qspinlock包含了三个主要部分:

    1. locked(0- 7bit):表示是否持有锁,只有1和0两个值,1表示某个CPU已经持有锁,0则表示没有持有锁。

    2. pending(8bit):作为竞争锁的第一候补,第1个等待自旋锁的CPU只需要简单地设置它为1,则表示它成为第一候补,后面再有CPU来竞争锁,则需要创建mcs lock节点了;为0则表示该候补位置是空闲的。

    3. tail(16-31bit): 通过这个域可以找到自旋锁队列的最后一个节点。又细分为:

    • tail cpu(18-31bit):来记录和快速索引需要访问的mcs_spinlock位于哪个CPU上,作用类似于页表基址。

    • tail index(16-17bit):用来标识当前处在哪种context中。Linux中一共有4种context,分别是task, softirq, hardirq和nmi,1个CPU在1种context下,至多试图获取一个spinlock,1个CPU至多同时试图获取4个spinlock。当然也表示嵌套的层数。对应per-cpu的mcs_spinlock的数组(对应下文的struct mcs_spinlock mcs_nodes[4])的下标,作用类似于类似于页表内的偏移。

    struct mcs_spinlock

    mcs_spinlock的数据结构如下:

    struct mcs_spinlock {
    	struct mcs_spinlock *next;
    	int locked; /* 1 if lock acquired */
    	int count;  /* nesting count, see qspinlock.c */
    };
    
    struct qnode {
    	struct mcs_spinlock mcs;
    };
    
    static DEFINE_PER_CPU_ALIGNED(struct mcs_spinlock, mcs_nodes[4]);

    使用per-cpu的struct mcs_spinlock mcs_nodes[4],可以用来减少对cacheline的竞争。数组数量为4前文已经解释过了。struct mcs_spinlock具体含义如下:

    1. locked:用来通知这个CPU你可以持锁了,通过该域完成击鼓传花,当然这个动作是上一个申请者释放的时候通知的。

    2. count:嵌套的计数。只有第0个节点这个域才有用,用来索引空闲节点的。

    3. next:指向下一个锁的申请者,构成串行的等待队列的链表。

    6.3 加锁实现

    核心逻辑概述

    我们把一个qspinlock对应的**( tail, pending, locked)称为一个三元组(x,y,z)**,以此描述锁状态及其迁移,有2中特殊状态:

    1. 初始状态(无申请者竞争):(0, 0, 0)

    2. 过渡状态(类似于皇帝正在传位给太子,处于交接期):(0, 1, 0)

    按加锁原有代码只有慢速路径和非慢速路径,我们为了行文方便,将源代码的慢速路径又分为中速路径和慢速路径,这样就有以下三组状态:

    不同路径出现情况核心功能所在的函数和使用锁的种类
    快速路径锁没有持有者,locked位(皇位)空缺queued_spin_lock,使用qspinlock
    中速路径锁已经被持有,但pending位(太子位)空缺queued_spin_lock_slowpath,使用qspinlock
    慢速路径锁已经被持有,locked位和pending位都没空缺queued_spin_lock_slowpath,使用mcs_spinlock

    核心逻辑就变成:

    1. 如果没有人持有锁,那我们进入快速路径,直接拿到锁走人,走之前把locked位(皇位)标记成已抢占状态。期间,只需要使用qspinlock

    2. 如果有人持有锁(抢到了皇位成为皇帝),但pending位(太子位)空缺,那我们先抢这个位置,进入的是中速路径,等这个人(皇帝)释放锁(传位)了,我们就可以拿到锁了。期间,只需要用到qspinlock

    3. 如果这两个位置我们都抢不到,则进入慢速路径,需要使用per-cpu的mcs_spinlock

    总体状态流程图如下:

    44c8405963323d5719a281c1fca5b078.png

    梳理一下对应伪码描述:

    static __always_inline void queued_spin_lock(struct qspinlock *lock)
    {
        if (初始状态){ //没人持锁
            //进入快速路径
      return;}
     queued_spin_lock_slowpath(lock, val); //进入中速或者慢速路径
    }
    
    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val) //lock是原始锁,val是之前拿到的lock->val的值
    {
        //代码片段1:过渡状态判断:看是否处于过渡状态,尝试等这个状态完成
        //代码片段2:慢速路径判断:看是否有其他申请者竞争,则直接进入慢速路径的核心片段
        //代码片段3:中速路径
    
    queue: 
        //代码判断4:慢速路径
        //1. 调用__this_cpu_inc(mcs_nodes[0].count)将当前cpu竞争锁的数量+1
        //2. mcs node的寻址和初始化
        //3. 调用queued_spin_trylock(),看看准备期间spinlock是不是已经被释放了
        //4. 处理等待队列已经有人的情况,重点是arch_mcs_spin_lock_contended();
        //5. 处理我们加入队列就是队首的情况
    locked:// 已经等到锁了
        //6.处理我们是队列最后一个节点的情况
        //7.处理我们前面还有节点的情况
        //8.调用arch_mcs_spin_unlock_contended()通知下一个节点
    release:
        //9.__this_cpu_dec(mcs_nodes[0].count);
    }

    下面我们开始分析

    queued_spin_lockqueued_spin_lock_slowpath函数的实现细节。

    实现细节分析

    static __always_inline void queued_spin_lock(struct qspinlock *lock)
    {
    	u32 val;
    
    	val = atomic_cmpxchg_acquire(&lock->val, 0, _Q_LOCKED_VAL); //1
    	if (likely(val == 0))                                       //2
    		return;
    	queued_spin_lock_slowpath(lock, val);                       //3
    }
    1. 通过atomic_cmpxchg_acquire,与之前的cmpxchg类似,用原子操作实现:

    • 将val赋值为lock->val。

    • lock->val值如果为0(没人拿到锁),将lock->val的值设为1(即*_Q_LOCKED_VAL*),三元组状态由**( tail, pending, locked)**= (0, 0, 0)迁移为(0, 0, 1)。

    • lock->val值如果不为0,保持lock->val的值不变。

    如果当前没有人获得锁,直接拿到锁返回。三元组的状态迁移已经在上一步完成了。否则需要继续往下,走中速/慢速路径。

    进入中速/慢速路径,调用queued_spin_lock_slowpath函数

    快速路径

    lock->val值==0(没人拿到锁),三元组状态由**( tail, pending, locked)**= (0, 0, 0)迁移为(0, 0, 1)。快速返回,不用等待。

    下面进入queued_spin_lock_slowpath函数的分析,我们先分析过渡状态判断和中速路径两个代码片段:

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
    {
    
        /* 过渡状态判断 */
    	if (val == _Q_PENDING_VAL) {                                          //0.    
    		int cnt = _Q_PENDING_LOOPS;
    		val = atomic_cond_read_relaxed(&lock->val,(VAL != _Q_PENDING_VAL) || !cnt--);
    	}
        /*其他部分代码*/
    
    }

    过渡状态判断

    1. 如果三元组( tail,pending,locked)状态如果是(0, 1, 0),则尝试等待状态变为 (0, 0, 1),但是只会循环等待1次。

      简单翻译一下atomic_cond_read_relaxed语句的意思为:如果(lock->val != _Q_PENDING_VAL) || !cnt--)则跳出循环,继续往下。

    中速路径

    31be6e0d02e2effb2fb9bebbc77268bd.png

    进入中速路径的前提是当前没有其他竞争者在等待队列中排队以及pending位空缺,之后我们先将pending位占住。如果已经这段期间已经有人释放了,那直接获取锁并将pending位重置为空闲,反正则要自旋等持锁者释放锁再做其他动作。

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
    {
    
        /* ... 过渡状态判断及处理,代码省略 */
    
        /* ... 判断是否有pending和tail是否有竞争者,有竞争者直接进入慢速路径排队,代码省略 */
        
        /* 中速路径开始 */
        val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);             //1. 
        if (!(val & ~_Q_LOCKED_MASK)) {                                        //2.
            if (val & _Q_LOCKED_MASK) {                                        //3.
                atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_MASK)); //4.
            }
    
            clear_pending_set_locked(lock);                                    //5.
            return;                                                            //6.
        }
        
        if (!(val & _Q_PENDING_MASK))                                          //7.
            clear_pending(lock);
        /* 中速路径结束 */
    
        /*其他部分代码*/
    
    }

    这部分代码的核心逻辑是在竞争qspinlocpending,即竞争太子位。进入中速路径的前提是三元组( tail,pending,locked)=(0, 0, *)

    1. 离“是否是过渡状态”和“除锁的持有者者之外是否竞争者”的判断已经过了一段时间了。需要用atomic_fetch_or_acquire函数,通过原子操作,重新看一下锁的状态,执行了两个动作:

      执行完,三元组状态会从(0, 0, *)改为(0, 1, *)。

    • val = lock->val(成为“原始的lock->val”)

    • lock->val的peding域被置位。

    如果原始的lock->valpendingtail域都为0,则表明没有pending位没有其他竞争者,即太子位空闲可以去竞争;否则,则表明有其他竞争者,不满足中速路径的条件,需要进入慢速路径。lock->val的和tail域就不用关心了。所以此时的三元状态可以使(*, 1, *)

    如果locked域为1,表明位置被占着,已经有人在持锁了,我们需要跳到第4步,等锁被释放;否则,没有人持锁,皇位是空着的,我们跳到第5步,直接去上位就好了。

    spin等待,直到lock->vallocked域变回0,也就是等皇位空出来。三元组状态会从(*, 1, 1) 转变为( *, 1, 0)

    拿到锁了,可以登基上位了,但还要做些清理工作,调用clear_pending_set_locked,将lock->valpending域清零以及将locked置1。即三元组状态由(*, 1, 0) 转变为( *, 0, 1)。

    结束,提前返回。

    如果没有进入中速路径,第1步开始时获得的锁的状态是(n, 0, *) ,需要把在第一步设置的现在锁的pending域恢复成0。

    慢速路径

    1e89f79b361da37a9ae79b1dd8c31c66.png

    上面流程图有些地方不太准确,但没有关系,可以先帮我们建立起总体流程印象,细节后续我们会展开。

    在拿到mcs lock的空闲节点之后,我们先用queued_spin_trylock函数,检查一下在我们准备的这段时间里,是不是已经有人释放了该锁了,如果有人释放了,就提前返回了。

    获得锁之前,要分两种情况分别处理:

    • 如果队列中只有我们,那只用自旋等lock的pending位空出来即可。

    • 如果队列中还有其他竞争者在排在我们前面,则需要自旋等前序节点释放该锁。

    获得锁之后,也要分两种情况处理:

    • 如果队列中只有我们,将锁的locked位置位,表明锁已经被我们持有了,不需要再做其他处理。

    • 如果队列中还有其他竞争者在排在我们后面,则需要将锁传递这位竞争者,当然需要要等它正式加入。

    慢速路径的简化代码如下所示:

    void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
    {
    	struct mcs_spinlock *prev, *next, *node;
    	u32 old, tail;
    	int idx;
    
        /* ... 过渡状态判断及处理,省略 */
    
        /* 先判断是否有pending和tail是否有竞争者,有竞争者直接进入慢速路径排队 */
        if (val & ~_Q_LOCKED_MASK)
            goto queue;
    
        /* 中速路径代码开始 */
    	val = atomic_fetch_or_acquire(_Q_PENDING_VAL, &lock->val);
        if (!(val & ~_Q_LOCKED_MASK)) {
            // ...中速路径处理,省略
            return;
        }
        /* 中速路径代码结束 */
    
        /* 之前代码的收尾清理工作 */
    	if (!(val & _Q_PENDING_MASK))
    		clear_pending(lock); //将pending状态恢复。
    
    
        //慢速路径代码开始位置
    queue://进入mcs lock队列
    	node = this_cpu_ptr(&mcs_nodes[0]);                            //1.
    	idx = node->count++;
    	tail = encode_tail(smp_processor_id(), idx);
    
    	node += idx;                                                   //2.
    	barrier();
    	node->locked = 0;
    	node->next = NULL;
    
    	if (queued_spin_trylock(lock))                                 //3.
    		goto release;
    	smp_wmb();
    
    	old = xchg_tail(lock, tail);                                   //4.
    	next = NULL;
    
    	if (old & _Q_TAIL_MASK) {                                      //5.
    		prev = decode_tail(old);                                   //6.
    		WRITE_ONCE(prev->next, node);                              //7.
    		arch_mcs_spin_lock_contended(&node->locked);               //8.
    		next = READ_ONCE(node->next);                              //9.
    		if (next)
    			prefetchw(next);
    	}
    
    	val = atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK)); //10.
    
    locked: 
    	if (((val & _Q_TAIL_MASK) == tail) &&                            //11.
            atomic_try_cmpxchg_relaxed(&lock->val, &val, _Q_LOCKED_VAL))
    		goto release; /* No contention */
    
    	set_locked(lock);                                                //12.
    
    	if (!next)                                                       //13.
    		next = smp_cond_load_relaxed(&node->next, (VAL));
    
    	arch_mcs_spin_unlock_contended(&next->locked);                   //14.
    
    release:
    	__this_cpu_dec(mcs_nodes[0].count);                              //15.
    }

    之前是进入queued_spin_lock_slowpath函数,有“是否过渡状态的判断”及处理,“是否要进入慢速路径的判断”及处理,如果进了中速路径的处理。下面正式进入慢速路径的处理。因为系统可能有多个cpu并发,即使在同一个cpu上也可能切换了4种context上下文的中一种,说不定大家竞争的锁已经都被释放了也说不准,所以此时三元组的状态是未知的。

    1. 获得local CPU上的第0个节点mcs_spinlock类型的node节点,并将节点的count值+1,count值记录了local CPU上空闲节点的起始下标,该值也只在第0个有意义。encode_tail函数,节点编号编码成tail(2bit的tail index和14bit的tail cpu),同时还完成:

    • 区分到底是tail域没有指向任何节点,还是指向了第0个CPU的第0个节点。

    • 检查当前CPU上自旋锁嵌套的层数是否超过4层。

    根据idx取到取到空闲节点,相当于node = mcs_nodes[idx]。这里有barrier保序,防止编译器优化。然后对空闲节点进行初始化。

    调用queued_spin_trylock函数,检查一下在我们准备的这段时间里,是不是已经有人释放了该锁了。如果成功获得锁,则直接跳到lable release处;否则,继续往下。queued_spin_trylock函数的作用具体来讲是这样:

    连续两次使用原子操作检查锁的状态:如果val为0(对应三元组(0, 0, 0))并且再次读取val的locked域也为0,则表示可以获得锁。上文说过,此时锁的状态是未知的。

    在调用xchg_tail函数之前,有smp_wmb内存屏障,保证tail值获得的正确性。该函数将lock->tail设置为新生成的tail值,并将旧的值存在old中。

    如果旧的tail为0,说明队列里只有我们这个新生成的节点,直接跳到第10步;否则继续往下执行

    通过旧的tail拿到等待队列的尾结点prev

    将当前节点插入等待队列,作为新的尾节点。

    自旋等待本节点的locked域变为1。在local CPU上自旋等待自己的locked成员被置位。arch_mcs_spin_lock_contended在arm64上最终会调用__cmpwait_case_##name宏,展开后同arm64的tick spinlock的arch_spin_lock的实现类似。核心功能都是在自旋时执行WFE节省部分能耗。

    后面3行代码,是看下在我们后面是否还有其他人在排队,如果有的话,使用prfm指令,提前将相关值预取到local CPU的cache中,加速后续的执行。

    使用原子操作重新获取lock->val的值,并且循环等到直到pendinglocked都为0。三元组( tail, pending, locked)的值为(*, 0, 0),也就是说需要等到皇位和太子位都是空闲的状态才是我们真正获得了锁的条件。

    如果tail还是我们设置的,说明我们同时是等待队列的最后一个节点或者说是唯一节点,后面没人在排队了。通过atomic_try_cmpxchg_relaxed原子的将locked域设置为1,至此才真正完成了锁的合法拥有,直接跳到最后1步。三元组的状态迁移是(n, 0, 0) --> (0, 0, 1)。否则,如果tail发生了改变,说明后面还有节点或者pending位被占,则继续往下处理。

    先将locked设置为1。三元组的状态迁移是(*, *, 0) --> (*, 0, 1)。

    等待的下一个节点还有正式加入进来,则需要等next变成非空(非空才真正了完成加入)。

    调用arch_mcs_spin_unlock_contended,将下一个节点的locked值设置成1 ,完成了锁的传递。也就是完成了击鼓传花。

    释放本节点。

    6.4 解锁实现

    static __always_inline void queued_spin_unlock(struct qspinlock *lock)
    {
    	smp_store_release(&lock->locked, 0);
    }

    qspinlocklocked清零即可。

    【参考资料】

    1. 【原创】linux spinlock/rwlock/seqlock原理剖析(基于ARM64)

    2.  宋宝华:几个人一起抢spinlock,到底谁先抢到?

    3. Linux内核同步机制之(四):spin lock,蜗窝科技

    4. Linux中的spinlock机制[一] - CAS和ticket spinlock,兰新宇,知乎


    推荐阅读:

    专辑|Linux文章汇总

    专辑|程序人生

    专辑|C语言

    我的知识小密圈

    关注公众号,后台回复「1024」获取学习资料网盘链接。

    欢迎点赞,关注,转发,在看,您的每一次鼓励,我都将铭记于心~

    展开全文
  • C语言Spinlock

    2021-01-26 11:13:01
    #define SPINLOCK_ATTR static __inline __attribute__((always_inline, no_instrument_function)) __attribute__((always_inline))的意思是强制内联,即加了该声明的函数在编译时不会编译成函数,而是直接扩展到...

    C语言编译与汇编

    语法

    inline函数

    #define SPINLOCK_ATTR static __inline __attribute__((always_inline, no_instrument_function))
    
    1. __attribute__((always_inline))的意思是强制内联,即加了该声明的函数在编译时不会编译成函数,而是直接扩展到调用函数体内。

    汇编语法

    asm asm-qualifiers ( AssemblerTemplate 
                     : OutputOperands 
                     [ : InputOperands
                     [ : Clobbers ] ])
    
    asm asm-qualifiers ( AssemblerTemplate 
                          : OutputOperands
                          : InputOperands
                          : Clobbers
                          : GotoLabels)
    

    关键词asm是GNU扩展的一个关键词,当代码能用-ansi-std选项编译时,建议使用__asm__替代asm。

    asm-qualifiers有三个可选项:

    1. volatile
      • 典型场景是将input value转换为output value
      • 加上该关键字,严禁将此处的汇编语句与其它的语句重组合优化。例如,该段汇编代码的output为空时,若不加上volatile,则编译器可能会优化,但是加上就一定按照原来的语句进行执行。
    2. inline
      • 使用该标记,会使得该段汇编代码在使用时size最小
    3. goto
      • 代码跳转

    汇编指令

    在实现SpinLock时,涉及到的一些汇编指令如下所示,下面依次介绍具体语句的意思。

    /* Compile read-write barrier */
    #define barrier() asm volatile("": : :"memory")
    
    /* Pause instruction to prevent excess processor bus usage */
    #define cpu_relax() asm volatile("pause\n": : :"memory")
    
    #define __HLE_ACQUIRE ".byte 0xf2 ; "
    #define __HLE_RELEASE ".byte 0xf3 ; "
    
    #define cpu_pause_v2()  __asm__ (".byte 0xf3, 0x90")
    

    Pause

    Pause指令,会减少CPU的消耗,节省电量。指令的本质功能是:让加锁失败的CPU睡眠大约30个clock,从而使得读操作的频率低很多,流水线重排的代价也会很小。

    官方解释:

    Description
    Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting the loop because it detects a possible memory order violation. The PAUSE instruction provides a hint to the processor that the code sequence is a spin-wait loop. The processor uses this hint to avoid the memory order violation in most situations, which greatly improves processor performance. For this reason, it is recommended that a PAUSE instruction be placed in all spin-wait loops.

    An additional function of the PAUSE instruction is to reduce the power consumed by a Pentium 4 processor while executing a spin loop. The Pentium 4 processor can execute a spin-wait loop extremely quickly, causing the processor to consume a lot of power while it waits for the resource it is spinning on to become available. Inserting a pause instruction in a spin-wait loop greatly reduces the processor’s power consumption.

    This instruction was introduced in the Pentium 4 processors, but is backward compatible with all IA-32 processors. In earlier IA-32 processors, the PAUSE instruction operates like a NOP instruction. The Pentium 4 and Intel Xeon processors implement the PAUSE instruction as a pre-defined delay. The delay is finite and can be zero for some processors. This instruction does not change the architectural state of the processor (that is, it performs essentially a delaying no-op operation).

    This instruction’s operation is the same in non-64-bit modes and 64-bit mode.

    翻译过来就是:

    PAUSE指令提升了自旋等待循环(spin-wait loop)的性能。当执行一个循环等待时,Intel P4或Intel Xeon处理器会因为检测到一个可能的内存顺序违规(memory order violation)而在退出循环时使性能大幅下降。PAUSE指令给处理器提了个醒:这段代码序列是个循环等待。处理器利用这个提示可以避免在大多数情况下的内存顺序违规,这将大幅提升性能。因为这个原因,所以推荐在循环等待中使用PAUSE指令。

    PAUSE的另一个功能就是降低Intel P4在执行循环等待时的耗电量。Intel P4处理器在循环等待时会执行得非常快,这将导致处理器消耗大量的电力,而在循环中插入一个PAUSE指令会大幅降低处理器的电力消耗。

    nginx代码示例

    // 以下代码引用自ngix源码的ngx_gcc_atomic_amd64.h文件
    /* old "as" does not support "pause" opcode */
    #define ngx_cpu_pause()         __asm__ (".byte 0xf3, 0x90")
    
    // 以下代码引用自ngix源码的ngx_gcc_atomic_x86.h文件
    #define ngx_cpu_pause()         __asm__ ("pause")
    

    x86系列架构中,pause指令也会被翻译成0xf3, 0x90, 因此,但是没有pause指令的机器会将其视为NOP指令。

    参考一

    memory

    The "memory" clobber tells the compiler that the assembly code performs memory reads or writes to items other than those listed in the input and output operands (for example, accessing the memory pointed to by one of the input parameters). To ensure memory contains correct values, GCC may need to flush specific register values to memory before executing the asm. Further, the compiler does not assume that any values read from memory before an asm remain unchanged after that asm; it reloads them as needed. Using the "memory" clobber effectively forms a read/write memory barrier for the compiler.

    Note that this clobber does not prevent the processor from doing speculative reads past the asm statement. To prevent that, you need processor-specific fence instructions.

    ngix代码示例

    // 以下代码引用自ngix源码的ngx_gcc_atomic_x86.h文件
    /*
     * on x86 the write operations go in a program order, so we need only
     * to disable the gcc reorder optimizations
     */
    #define ngx_memory_barrier()    __asm__ volatile ("" ::: "memory")
    
    // 以下代码引用自ngix源码的ngx_gcc_atomic_amd64.h文件
    #define ngx_memory_barrier()    __asm__ volatile ("" ::: "memory")
    

    简单来说,就是告诉编译器内存的内容可能被更改了,需要无效所有Cache,并访问实际的内容,而不是Cache。

    即memory强制gcc编译器假设RAM所有内存单元均被汇编指令修改,这样cpu中的registers和cache中已缓存的内存单元中的数据将作 废。cpu将不得不在需要的时候重新读取内存中的数据。这就阻止了cpu又将registers,cache中的数据用于去优化指令,而避免去访问内存。

    .byte 0xf2;

    • [0xF2] REPNE/REPNZ prefix or BND prefix

    .byte 0xf3;

    • [0xF3] REP or REPE/REPZ prefix

    SpinLock实现

    代码引用自github spinlock

    spinlock-cmpxchg.h

    #ifndef _SPINLOCK_CMPXCHG_H
    #define _SPINLOCK_CMPXCHG_H
    
    typedef struct {
        volatile char lock;
    } spinlock;
    
    #define SPINLOCK_ATTR static __inline __attribute__((always_inline, no_instrument_function))
    
    /* Pause instruction to prevent excess processor bus usage */
    #define cpu_relax() asm volatile("pause\n": : :"memory")
    
    SPINLOCK_ATTR char __testandset(spinlock *p)
    {
        char readval = 0;
    
        asm volatile (
                "lock; cmpxchgb %b2, %0"
                : "+m" (p->lock), "+a" (readval)
                : "r" (1)
                : "cc");
        return readval;
    }
    
    SPINLOCK_ATTR void spin_lock(spinlock *lock)
    {
        while (__testandset(lock)) {
            /* Should wait until lock is free before another try.
             * cmpxchg is write to cache, competing write for a sinlge cache line
             * would generate large amount of cache traffic. That's why this
             * implementation is not scalable compared to xchg based one. Otherwise,
             * they should have similar performance. */
            cpu_relax();
        }
    }
    
    SPINLOCK_ATTR void spin_unlock(spinlock *s)
    {
        s->lock = 0;
    }
    
    #define SPINLOCK_INITIALIZER { 0 }
    
    #endif /* _SPINLOCK_CMPXCHG_H */
    
    
    

    spinlock-xchg

    
    #ifndef _SPINLOCK_XCHG_H
    #define _SPINLOCK_XCHG_H
    
    /* Spin lock using xchg.
     * Copied from http://locklessinc.com/articles/locks/
     */
    
    /* Compile read-write barrier */
    #define barrier() asm volatile("": : :"memory")
    
    /* Pause instruction to prevent excess processor bus usage */
    #define cpu_relax() asm volatile("pause\n": : :"memory")
    
    static inline unsigned short xchg_8(void *ptr, unsigned char x)
    {
        __asm__ __volatile__("xchgb %0,%1"
                    :"=r" (x)
                    :"m" (*(volatile unsigned char *)ptr), "0" (x)
                    :"memory");
    
        return x;
    }
    
    #define BUSY 1
    typedef unsigned char spinlock;
    
    #define SPINLOCK_INITIALIZER 0
    
    static inline void spin_lock(spinlock *lock)
    {
        while (1) {
            if (!xchg_8(lock, BUSY)) return;
        
            while (*lock) cpu_relax();
        }
    }
    
    static inline void spin_unlock(spinlock *lock)
    {
        barrier();
        *lock = 0;
    }
    
    static inline int spin_trylock(spinlock *lock)
    {
        return xchg_8(lock, BUSY);
    }
    
    #endif /* _SPINLOCK_XCHG_H */
    
    

    spinlock-xchg-backoff

    
    #ifndef _SPINLOCK_XCHG_BACKOFF_H
    #define _SPINLOCK_XCHG_BACKOFF_H
    
    /* Spin lock using xchg. Added backoff wait to avoid concurrent lock/unlock
     * operation.
     * Original code copied from http://locklessinc.com/articles/locks/
     */
    
    /* Compile read-write barrier */
    #define barrier() asm volatile("": : :"memory")
    
    /* Pause instruction to prevent excess processor bus usage */
    #define cpu_relax() asm volatile("pause\n": : :"memory")
    
    static inline unsigned short xchg_8(void *ptr, unsigned char x)
    {
        __asm__ __volatile__("xchgb %0,%1"
                    :"=r" (x)
                    :"m" (*(volatile unsigned char *)ptr), "0" (x)
                    :"memory");
    
        return x;
    }
    
    #define BUSY 1
    typedef unsigned char spinlock;
    
    #define SPINLOCK_INITIALIZER 0
    
    static inline void spin_lock(spinlock *lock)
    {
        int wait = 1;
        while (1) {
            if (!xchg_8(lock, BUSY)) return;
        
            // wait here is important to performance.
            for (int i = 0; i < wait; i++) {
                cpu_relax();
            }
            while (*lock) {
                wait *= 2; // exponential backoff if can't get lock
                for (int i = 0; i < wait; i++) {
                    cpu_relax();
                }
            }
        }
    }
    
    static inline void spin_unlock(spinlock *lock)
    {
        barrier();
        *lock = 0;
    }
    
    static inline int spin_trylock(spinlock *lock)
    {
        return xchg_8(lock, BUSY);
    }
    
    #endif /* _SPINLOCK_XCHG_BACKOFF_H */
    
    

    spinlock-xchg-hle

    #ifndef _SPINLOCK_XCHG_H
    #define _SPINLOCK_XCHG_H
    
    /* Spin lock using xchg.
     * Copied from http://locklessinc.com/articles/locks/
     */
    
    /* Compile read-write barrier */
    #define barrier() asm volatile("": : :"memory")
    
    /* Pause instruction to prevent excess processor bus usage */
    #define cpu_relax() asm volatile("pause\n": : :"memory")
    
    #define __HLE_ACQUIRE ".byte 0xf2 ; "
    #define __HLE_RELEASE ".byte 0xf3 ; "
    
    static inline unsigned short xchg_8(void *ptr, unsigned char x)
    {
        __asm__ __volatile__(__HLE_ACQUIRE "xchgb %0,%1"
                    :"=r" (x)
                    :"m" (*(volatile unsigned char *)ptr), "0" (x)
                    :"memory");
    
        return x;
    }
    
    #define BUSY 1
    typedef unsigned char spinlock;
    
    #define SPINLOCK_INITIALIZER 0
    
    static inline void spin_lock(spinlock *lock)
    {
        while (1) {
            if (!xchg_8(lock, BUSY)) return;
        
            while (*lock) cpu_relax();
        }
    }
    
    static inline void spin_unlock(spinlock *lock)
    {
        __asm__ __volatile__(__HLE_RELEASE "movb $0, %0"
                :"=m" (*lock)
                :
                :"memory");
    }
    
    static inline int spin_trylock(spinlock *lock)
    {
        return xchg_8(lock, BUSY);
    }
    
    #endif /* _SPINLOCK_XCHG_H */
    
    
    

    性能对比

    cmpxchg < xchg < xchg-backoff < xchg-hle

    测试条件

    总叠加数为1000万,并发量(线程数)分别为1, 5, 10,每种场景测试三遍。

    function run_test() {
        for nthr in 1 5 10; do
            ./$1 $nthr > /dev/null
            for i in `seq 1 3`; do
                ./$1 $nthr
            done
            echo
        done
    }
    

    性能测试结果

    test spin lock using cmpxchg
    0.104997	0.108640	0.110231	
    1.502738	1.780505	1.588555	
    3.188721	3.372979	3.288685	
    test spin lock using xchg
    0.102495	0.105192	0.101462	
    1.511772	1.467292	1.315992	
    1.966999	1.767013	1.607366	
    test spin lock using xchg-backoff
    0.102869	0.102168	0.107096	
    0.210090	0.148667	0.115931	
    0.691257	0.183631	0.194179	
    test spin lock using xchg-hle
    0.248885	0.248751	0.249399	
    0.061266	0.056018	0.054837	
    0.042291	0.052939	0.036410
    
    展开全文
  • Linux内核源码中最常见的数据结构之【Spinlock】 1 定义 在软件工程中,自旋锁是一种锁,它使试图获取它的线程在循环中原地等待(“自旋”),同时反复检查锁是否可用。 由于线程保持活动状态但没有执行有用的任务,...

    Linux内核源码中最常见的数据结构之【Spinlock】

    1 定义

    在软件工程中,自旋锁是一种锁,它使试图获取它的线程在循环中原地等待(“自旋”),同时反复检查锁是否可用。

    由于线程保持活动状态但没有执行有用的任务,因此使用这种锁是一种忙等待。一旦获得,自旋锁通常会一直保持直到它们被显式释放,尽管在某些实现中,如果正在等待的线程(持有锁的线程)阻塞或“进入睡眠状态”,它们可能会自动释放。

    为什么Linux内核经常使用自旋锁?

    因为它们避免了操作系统进程重新调度或上下文切换的开销,所以如果线程可能只在短时间内被阻塞,自旋锁是有效的。出于这个原因,操作系统内核经常使用自旋锁。

    自旋锁有什么弊端?

    如果长时间持有自旋锁,则会变得浪费,因为它们可能会阻止其他线程运行并需要重新调度。线程持有锁的时间越长,线程在持有锁时被操作系统调度程序中断的风险就越大。如果发生这种情况,其他线程将处于“旋转”状态(反复尝试获取锁)。结果是无限期推迟,直到持有锁的线程可以完成并释放它。

    2 发展

    wild spinlock

    wild spinlock是早期的自旋锁实现,实现也非常简单

    struct spinlock {
    	volatile unsigned int lock;
    };
    
    //上锁
    void spin_lock(struct spinlock *lock)
    {
        while (lock->lock);
        lock->lock = 1;
        //可以优化为==>while (lock->locked || test_and_set(&lock->locked));
    }
    
    //释放锁
    void spin_unlock(struct spinlock *lock)
    {
        lock->lock = 0;
    }
    

    在这里插入图片描述

    但是考虑到多个CPU核心由于架构原因,如上图所示,cache无法在同一时间更新,现在CPU1释放锁,CPU2-CPUN哪个CPU的cache先更新就能获得锁。所以不排队的机制可能导致部分CPU饿死,始终拿不到锁。为了解决这个问题,自旋锁从wild spinlock跨度到ticket spinlock。

    ticket spinlock

    引入排队机制,以FIFO的顺序处理申请者,谁先申请,谁先获得,保证公平性。

    struct spinlock {
        // 当前持有者票号
        int owner;
        // 票号
        int next;
    };
    
    //上锁
    void spin_lock(struct spinlock *lock)
    {
            unsigned short next = xadd(&lock->next, 1);
            while (lock->owner != next);
    }
    
    //释放锁
    void spin_unlock(struct spinlock *lock)
    {
            lock->owner++;
    }
    

    spin_lock()中xadd()也是一条原子操作,原子的将变量加1,并返回变量之前的值。当owner等于next时,代表锁是释放状态,否则,说明锁是持有状态。next就像是排队拿票机制,每来一个申请者,next就加1,代表票号,owner代表当前持锁的票号。

    在这里插入图片描述

    ticket spinlock也存在问题

    随着CPU数量的增多,总线带宽压力很大。而且延迟也会随着增长,性能也会逐渐下降。而且CPU0释放锁后,CPU1-CPU7也只有一个CPU可以获得锁,理论上没有必要影响其他CPU的缓存,只需要影响接下来应该获取锁的CPU(按照FIFO的顺序)。这说明ticket spinlock不是scalable(同样最初的wild spinlock也存在此问题)。

    为了解决这个问题,自旋锁从ticket spinlock跨度到mcs pinlock。

    mcs spinlock

    先来思考下造成该问题的原因。根因就是每个CPU都spin在共享变量spinlock上。所以我们只需要保证每个CPU spin的变量是不同的就可以避免这种情况了。
    MCS 锁(由 Mellor-Crummey 和 Scott 提出)是一个简单的自旋锁,具有公平的理想属性,并且每个 cpu 都试图获取在本地变量上旋转的锁。 它避免了常见的test-and-set自旋锁实现引起的昂贵的cacheline bouncing。
    同时我们需要换一种排队的方式,例如单链表。单链表也可以做到FIFO,每次解锁时,也只需要通知链表头的CPU即可。

    struct mcs_spinlock {
            struct mcs_spinlock *next;
            int locked;
    };
    
    // 自己的排队节点node需要自己在外部初始化,它将被排队到lock指示的等锁队列中。
    void mcs_spin_lock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
    {
        struct mcs_spinlock *prev;
    
        /* Init node */
        node->locked = 0;
        node->next   = NULL;
        // 原子执行*lock = node并返回原来的*lock
        prev = xchg(lock, node);
        if (likely(prev == NULL)) {
            return;
        }
        // 原子执行 prev->next = node;
        // 这相当于一个排入队的操作。记作(*)
        WRITE_ONCE(prev->next, node);
    
        /* 自旋,直到锁的持有者放弃锁 */
    	arch_mcs_spin_lock_contended(&node->locked);
    }
    
    // 自己要传入一个排入到lock的队列中的自己的node对象node,解锁操作就是node出队并且主动将队列中下一个对象的locked帮忙设置成1.
    void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
    {
        // 原子获取node的下一个节点。
        struct mcs_spinlock *next = READ_ONCE(node->next);
    
        if (likely(!next)) {
            // 二次确认真的是NULL,则返回,说明自己是最后一个,什么都不需要做。
            if (likely(cmpxchg_release(lock, node, NULL) == node))
                return;
            // 否则说明在if判断next为MULL和cmpxchg原子操作之间有node插入,随即等待它的mcs_spin_lock调用完成,即上面mcs_spin_lock中的(*)注释那句完成以后
            while (!(next = READ_ONCE(node->next)))
                cpu_relax_lowlatency();
        }
        // 原子执行next->locked = 1;
        /*将锁传递给下一个等待者*/
        arch_mcs_spin_unlock_contended(&next->locked);
    }
    

    通过以上步骤,我们可以看到每个CPU都spin在自己的使用变量上面,因此不会存在ticket spinlock的问题。

    在这里插入图片描述

    overview

    在这里插入图片描述

    3 实现

    在Linux内核源码中的具体实现如下:

    位于:include/linux/spinlock_types.h,通用的spinlock类型定义以及初始化

    typedef struct spinlock {
    	union {
    		struct raw_spinlock rlock;
    
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    # define LOCK_PADSIZE (offsetof(struct raw_spinlock, dep_map))
    		struct {
    			u8 __padding[LOCK_PADSIZE];
    			struct lockdep_map dep_map;
    		};
    #endif
    	};
    } spinlock_t;
    
    typedef struct raw_spinlock {
    	arch_spinlock_t raw_lock;
    #ifdef CONFIG_DEBUG_SPINLOCK
    	unsigned int magic, owner_cpu;
    	void *owner;
    #endif
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	struct lockdep_map dep_map;
    #endif
    } raw_spinlock_t;
    

    可以看到,spinlock_t的实现主要靠raw_spinlock_t,而raw_spinlock_t又调用了与CPU架构相关的arch_spinlock_t

    如果你使用的是alpha架构

    typedef struct {
    	volatile unsigned int lock;
    } arch_spinlock_t;
    

    如果你使用的是arm架构

    typedef struct {
    	union {
    		u32 slock;
    		struct __raw_tickets {
    #ifdef __ARMEB__
    			u16 next;
    			u16 owner;
    #else
    			u16 owner;
    			u16 next;
    #endif
    		} tickets;
    	};
    } arch_spinlock_t;
    

    由于不同CPU架构spinlock的具体实现不同,下文主要依据ARMv6架构的实现源码

    Linux内核提供了多个个宏用于初始化、测试及设置自旋锁。所有这些宏都是基于原子操作的,这样可以保证即使有多个运行在不同CPU上的进程试图同时修改自旋锁,自旋锁也能够被正确地更新。

    1. 初始化自旋锁
    # define spin_lock_init(lock)					\
    do {								\
    	static struct lock_class_key __key;			\
    								\
    	__raw_spin_lock_init(spinlock_check(lock),		\
    			     #lock, &__key, LD_WAIT_CONFIG);	\
    } while (0)
    
    void __raw_spin_lock_init(raw_spinlock_t *lock, const char *name,
    			  struct lock_class_key *key, short inner)
    {
    #ifdef CONFIG_DEBUG_LOCK_ALLOC
    	/*
    	 * Make sure we are not reinitializing a held lock:
    	 */
    	debug_check_no_locks_freed((void *)lock, sizeof(*lock));
    	lockdep_init_map_wait(&lock->dep_map, name, key, 0, inner);
    #endif
    	lock->raw_lock = (arch_spinlock_t)__ARCH_SPIN_LOCK_UNLOCKED;
    	lock->magic = SPINLOCK_MAGIC;
    	lock->owner = SPINLOCK_OWNER_INIT;
    	lock->owner_cpu = -1;
    }
    
    
    #define SPINLOCK_MAGIC		0xdead4ead
    #define SPINLOCK_OWNER_INIT	((void *)-1L)
    #define __ARCH_SPIN_LOCK_UNLOCKED	{ 0 }
    
    static __always_inline raw_spinlock_t *spinlock_check(spinlock_t *lock)
    {
    	return &lock->rlock;
    }
    

    初始化自旋锁,把lock->raw_lock置为0,lock->magic置为0xdead4ead,lock->owner置为初始状态

    1. 上锁
    static __always_inline void spin_lock(spinlock_t *lock)
    {
    	raw_spin_lock(&lock->rlock);
    }
    
    #define raw_spin_lock(lock)	_raw_spin_lock(lock)
    
    /**单核CUP UP**/
    #define _raw_spin_lock(lock)			__LOCK(lock)
    
    /*上锁前关闭抢占*/
    #define __LOCK(lock) \
      do { preempt_disable(); ___LOCK(lock); } while (0)
    
    /*将lock+1*/
    #define ___LOCK(lock) \
      do { __acquire(lock); (void)(lock); } while (0)
    
    /**对称多核CPU,SMP**/
    void __lockfunc _raw_spin_lock(raw_spinlock_t *lock)
    {
    	__raw_spin_lock(lock);
    }
    
    static inline void __raw_spin_lock(raw_spinlock_t *lock)
    {
        /*上锁前关闭抢占,防止死锁*/
    	preempt_disable();
    	spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
    	LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
    }
    
    void do_raw_spin_lock(raw_spinlock_t *lock)
    {
    	debug_spin_lock_before(lock);
    	arch_spin_lock(&lock->raw_lock);
    	mmiowb_spin_lock();
    	debug_spin_lock_after(lock);
    }
    
    static inline void arch_spin_lock(arch_spinlock_t *lock)
    {
    	unsigned long tmp;
    	u32 newval;
    	arch_spinlock_t lockval;
    
    	prefetchw(&lock->slock);
    	__asm__ __volatile__(
    "1:	ldrex	%0, [%3]\n"						//将slock的值保存在lockval这个临时变量中
    "	add	%1, %0, %4\n"						
    "	strex	%2, %1, [%3]\n"					//将spin lock中的next加一
    "	teq	%2, #0\n"							//判断是否有其他的thread插入
    "	bne	1b"									
    	: "=&r" (lockval), "=&r" (newval), "=&r" (tmp)
    	: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    	: "cc");
    	/*判断当前spin lock的状态,如果是unlocked,那么直接获取到该锁*/
    	while (lockval.tickets.next != lockval.tickets.owner) {
            /*如果当前spin lock的状态是locked,那么调用wfe进入等待状态。*/
    		wfe(); 
            /*其他的CPU唤醒了本cpu的执行,说明owner发生了变化,该新的own赋给lockval,然后继续判断spin lock的状态*/
    		lockval.tickets.owner = READ_ONCE(lock->tickets.owner);
    	}
    
    	smp_mb();
    }
    

    为什么获取锁之前一定要关闭抢占?

    假设 process1通过系统调用进入内核态,如果其需要访问临界区,则在进入临界区前获得锁,上锁,然后进入临界区。如果process1在内核态执行临界区代码的过程中发生了一个外部中断,当中断处理函数返回时,因为内核的可抢占性,此时将会出现一个调度点,如果CPU的运行队列中出现了一个比当前被中断进程process1优先级更高的进程process2,那么被中断的进程将会被换出处理器,即便此时它正运行于内核态。进程process2就会进行空转,浪费CPU资源。

    当然这种实现方式对于内核中的spinlock比较合适,而一般用户态的spinlock实现不需要

    1. 尝试上锁
    static __always_inline int spin_trylock(spinlock_t *lock)
    {
    	return raw_spin_trylock(&lock->rlock);
    }
    
    #define raw_spin_trylock(lock)	__cond_lock(lock, _raw_spin_trylock(lock))
    
    /**单核CUP UP**/
    #define _raw_spin_trylock(lock)			({ __LOCK(lock); 1; })
    #define __LOCK(lock) \
      do { preempt_disable(); ___LOCK(lock); } while (0)
    
    #define ___LOCK(lock) \
      do { __acquire(lock); (void)(lock); } while (0)
    
    /**对称多核CPU,SMP**/
    int __lockfunc _raw_spin_trylock(raw_spinlock_t *lock)
    {
    	return __raw_spin_trylock(lock);
    }
    
    #define BUILD_LOCK_OPS(op, locktype)					\
    void __lockfunc __raw_##op##_lock(locktype##_t *lock)			\
    {									\
    	for (;;) {							\
    		preempt_disable();					\
    		if (likely(do_raw_##op##_trylock(lock)))		\
    			break;						\
    		preempt_enable();					\
    									\
    		arch_##op##_relax(&lock->raw_lock);			\
    	}								\
    }	
    
    int do_raw_spin_trylock(raw_spinlock_t *lock)
    {
    	int ret = arch_spin_trylock(&lock->raw_lock);
    
    	if (ret) {
    		mmiowb_spin_lock();
    		debug_spin_lock_after(lock);
    	}
    #ifndef CONFIG_SMP
    	/*
    	 * Must not happen on UP:
    	 */
    	SPIN_BUG_ON(!ret, lock, "trylock failure on UP");
    #endif
    	return ret;
    }
    
    static inline int arch_spin_trylock(arch_spinlock_t *lock)
    {
    	unsigned long contended, res;
    	u32 slock;
    
    	prefetchw(&lock->slock);
    	do {
    		__asm__ __volatile__(
    		"	ldrex	%0, [%3]\n"
    		"	mov	%2, #0\n"
    		"	subs	%1, %0, %0, ror #16\n"
    		"	addeq	%0, %0, %4\n"
    		"	strexeq	%2, %0, [%3]"
    		: "=&r" (slock), "=&r" (contended), "=&r" (res)
    		: "r" (&lock->slock), "I" (1 << TICKET_SHIFT)
    		: "cc");
    	} while (res);
    
    	if (!contended) {
    		smp_mb();
    		return 1;
    	} else {
    		return 0;
    	}
    }
    
    

    spin_trylock() 不会自旋,但如果它在第一次尝试时获得自旋锁,则返回非零值,否则返回 0。 此函数可用于所有上下文, 而spin_lock使用时必须禁用可能会中断获取自旋锁的上下文。

    1. 释放锁
    static __always_inline void spin_unlock(spinlock_t *lock)
    {
    	raw_spin_unlock(&lock->rlock);
    }
    
    #define raw_spin_unlock(lock)		_raw_spin_unlock(lock)
    
    /**单核CUP UP**/
    #define _raw_spin_unlock(lock)			__UNLOCK(lock)
    #define __UNLOCK(lock) \
      do { preempt_enable(); ___UNLOCK(lock); } while (0)
    
    /**对称多核CPU,SMP**/
    void __lockfunc _raw_spin_unlock(raw_spinlock_t *lock)
    {
    	__raw_spin_unlock(lock);
    }
    
    static inline void __raw_spin_unlock(raw_spinlock_t *lock)
    {
    	spin_release(&lock->dep_map, _RET_IP_);
    	do_raw_spin_unlock(lock);
    	preempt_enable();
    }
    
    static inline void do_raw_spin_unlock(raw_spinlock_t *lock) __releases(lock)
    {
    	mmiowb_spin_unlock();
    	arch_spin_unlock(&lock->raw_lock);
    	__release(lock);
    }
    
    static inline void arch_spin_unlock(arch_spinlock_t *lock)
    {
    	smp_mb();
    	lock->tickets.owner++;
    	dsb_sev();
    }
    
    1. 判断锁状态
    static __always_inline int spin_is_locked(spinlock_t *lock)
    {
    	return raw_spin_is_locked(&lock->rlock);
    }
    
    #define raw_spin_is_locked(lock)	arch_spin_is_locked(&(lock)->raw_lock)
    
    static inline int arch_spin_is_locked(arch_spinlock_t *lock)
    {
    	return !arch_spin_value_unlocked(READ_ONCE(*lock));
    }
    
    static inline int arch_spin_value_unlocked(arch_spinlock_t lock)
    {
    	return lock.tickets.owner == lock.tickets.next;
    }
    

    4 实际案例

    spinlock的使用很简单:

    1. 我们要访问临界资源需要首先申请自旋锁;
    2. 获取不到锁就自旋,如果能获得锁就进入临界区;
    3. 当自旋锁释放后,自旋在这个锁的任务即可获得锁并进入临界区,退出临界区的任务必须释放自旋锁。

    实验:

    #include <pthread.h>
    #include <stdio.h>
    
    int cnt = 0;
    
    void* task(void* args) {
        for(int i = 0; i < 100000; i++)
        {
            cnt ++;
        }
        return NULL;
    }
    
    
    int main() {
        pthread_t tid1, tid2;
    	/* create the thread */
    	pthread_create(&tid1, NULL, task, NULL);
        pthread_create(&tid2, NULL, task, NULL);
    	/* wait for thread to exit */
    	pthread_join(tid1, NULL);
        pthread_join(tid2, NULL);
    
    	printf("cnt = %d\n", cnt);
    	return 0;
    }
    

    输出:

    cnt = 131189
    

    正确结果不应该是200000吗?为什么会出错呢,我们可以从汇编角度来分析一下。

    $> g++ -E test.c -o test.i
    $> g++ -S test.i -o test.s
    $> vim test.s
        
    .file	"test.c"
    	.globl	_cnt
    	.bss
    	.align 4
    _cnt:
    	.space 4
    	.text
    	.globl	__Z5task1Pv
    	.def	__Z5task1Pv;	.scl	2;	.type	32;	.endef
    __Z5task1Pv:
    	...
    

    我们可以看到

    for(int i = 0; i < 100000; i++)
    {
        cnt ++;
    }
    

    对应的汇编代码为

    cmpl	$99999, -4(%ebp)
    jg	L2
    movl	_cnt, %eax
    addl	$1, %eax
    movl	%eax, _cnt
    addl	$1, -4(%ebp)
    jmp	L3
    

    一个简单的cnt++,对应

    movl	_cnt, %eax
    addl	$1, %eax
    movl	%eax, _cnt
    

    CPU先将cnt的值读到寄存器eax中,然后将[eax] + 1,最后将eax的值返回到cnt中,这些操作不是**原子性质(atomic)**的,这就导致cnt被多个线程操作时,+1过程会被打断。

    比如task1和 task2,假设cnt初始值为0

    Task 1                     Task 2
    load cnt       
                               load cnt
    cnt + 1      
    store cnt                  cnt + 1
                               store cnt
    

    本来cnt应该为2,现在却为1

    现在使用spinlock,修改后的代码为:

    #include <pthread.h>
    #include <stdio.h>
    
    pthread_spinlock_t lock;
    int cnt = 0;
    
    void* task(void* args) {
        for(int i = 0; i < 1000000; i++)
        {
            pthread_spin_lock(&lock);
            cnt++;
            pthread_spin_unlock(&lock);
        }
        return NULL;
    }
    
    int main() {
        pthread_spin_init(&lock, 0);
        pthread_t tid1;
    	pthread_t tid2;
    	/* create the thread */
    	pthread_create(&tid1, NULL, task, (void*)1);
        pthread_create(&tid2, NULL, task, (void*)2);
    	/* wait for thread to exit */
    	pthread_join(tid1, NULL);
        pthread_join(tid2, NULL);
    
    	printf("cnt = %d\n", cnt);
    	return 0;
    }
    
    

    输出

    cnt = 2000000
    

    保证了cnt++的原子性,最终取得正确结果

    Linux常用数据结构 第二篇,哈希表hlist
    Linux常用数据结构 第四篇,mutex

    参考文章:

    从CPU cache一致性的角度看Linux spinlock的不可伸缩性(non-scalable)

    spinlock前世今生

    Linux kernel-5.8源码

    展开全文
  • Linux内核spinlock实现

    2021-05-21 01:21:16
    Linux内核spinlock实现 文章目录 Linux内核spinlock实现 1. 自旋锁结构 2. 获取自旋锁 3. 释放自旋锁 linux内核自旋锁spinlock实现详解(基于ARM处理器) 内核当发生访问资源冲突的时候,可以有两种锁的解决方案选择:...
  • spinlock的实现

    2021-05-24 07:57:16
    /** include/linux/spinlock.h - generic spinlock/rwlock declarationshere's the role of the various spinlock/rwlock related include files:* * on SMP builds: **asm/spinlock_types.h: ...
  • Spinlock

    2017-04-09 15:15:42
    A locked spinner puzzle is a puzzle where you can only change wheels in groups. It is a common puzzle to achieve some value on the spinners by only changing them in the allowed groups. ...
  • spinlock作用

    2021-05-13 07:39:46
    spinlock只是一种选择,并不是所有同步的地方都用spinlock.通常它适用于对内核(包括模块)的一些全局数据结构的访问。spinlock中所保护的codes最好能迅速完成,同时释放该锁。在无法获得该锁的情况下,内核不会切换,...
  • SpinLock是最底层的锁,使用互斥信号量实现,与操作系统和硬件环境联系紧密。SpinLock分为与机器相关的实现方法(定义在s_lock.c中)和与机器不相关的实现方法(定义在Spin.c中)。SpinLock的特点是:封锁时间很短,...
  • Linux内核原语(四)——自旋锁(Spinlock) 小狼@http://blog.csdn.net/xiaolangyangyang 内核当发生访问资源冲突的时候,可以有两种锁的解决方案选择: 一个是原地等待 一个是挂起当前进程,调度其他进程...
  • spinlock 使用介绍

    千次阅读 2021-05-24 17:06:57
    文章目录一、spinlock 简介二、自旋锁与互斥锁的区别三、自旋锁的优缺点 一、spinlock 简介 自旋锁(spinlock):是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,不断尝试获取锁...
  • 早期spinlock的设计 早期的spinlock的设计是锁的拥有者加锁时将锁的值设置为1,释放锁时将锁的值设置为0,这样做的缺点是会出现 先来抢占锁的进程一直抢占不到锁,而后来的进程可能一来 就能获取到锁。导致这个原因...
  • SpinLock接口用于封装依赖于硬件实现的SpinLock机制和不依赖于硬件实现的SpinLock机制(使用PG信号量来仿真SpinLock)。SpinLock接口定义在spin.h中,包括SpinLockInit、SpinLockAcquire、SpinLockRelease和...
  • 在c++中实现一个good spinlock 网上有很多关于spinlock不好的实现。他们都犯了相同的错误:在RMW(read-modify-write)操作上自旋。我后面会说明为什么这些实现不好,也会介绍如何在C++中实现一个好的spinlock,并且...
  • 头文件: 初始化: 静态:spinlock_t lock; 动态:void spin_lock_init(spinlock_t *lock); 获取锁: void spin_lock(spinlock_t *lock); void spin_lock_irqsave(spinlock_t *lock, unsigned long flags); 获得...
  • 想学习SMP多核调度功能,需要了解下SpinLock自旋锁。除了多核的自旋锁机制,本文还会介绍下LiteOS 5.0引入的LockDep死锁检测特性。 本文中所涉及的LiteOS源码,均可以在LiteOS开源站点...
  • 我们知道linux内核中存在进程、中断、软中断等相互之间的同步问题,在SMP下这些使用场景会变的尤为复杂,因此linux内核提供了semaphore、spinlock、mutex,rcu、atomic等锁机制来解决这些问题。然而每种锁机制都有...
  • 1. TAS lock (test-and-set)这是最简单的spinlock,CPU会在硬件上提供一些指令来帮助OS实现spinlock,比如x86就有xchg, LOCK指令前缀等指令。。。test_and_set()可以利用这些指令对某个memory地址,来原子地完成:...
  • kprobe/kretprobe框架内部使用spinlock来进行同步,在ret_handler执行的时候,它已经持有了该spinlock,而属于kretprobe的ret_handler本身同样也需要该spinlock,因此会死锁。 啦啦啦,这就是为什么我喜欢纯手工...
  • Spinlock 是内核中提供的一种比较常见的锁机制,自旋锁是“原地等待”的方式解决资源冲突的,即,一个线程获取了一个自旋锁后,另外一个线程期望获取该自旋锁,获取不到,只能够原地“打转”(忙等待)。由于自旋锁...
  • spinlock是linux内核锁机制的一种,而linux内核锁机制是linux内核同步机制的一部分。linux内核同步机制的使用原因是为了避免共享数据之间的竞争出现,它包括per cpu变量、原子操作、内存屏障、spinlock、信号量、...
  • 转载:Linux下Spinlock编程(自旋锁)(2010-05-10 00:19:29)标签:itSpinlock编程(自旋锁)本文转载在 Linux Kernel里有著许多重要的资料结构,这些资料在操作系统的运作中扮演著举足轻重的角色。然而,Linux是个多工的...
  • 多线程读写concurrentqueue 单线程读写readerwriterqueue 测试验证基于C++ STL利用CAS原子操作封装的无锁list 摘要 实测无锁队列concurrentqueue、boost.spinlock 和 std::mutex 在多线程情况下的表现; 结论: 已经...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 35,049
精华内容 14,019
关键字:

spinlock

友情链接: mysql.rar