精华内容
下载资源
问答
  • 配套代码讲解:https://blog.csdn.net/songchuwang1868/article/details/90200251 ...同步队列-无锁队列-循环数组无锁队列 同步队列-无锁队列-循环数组无锁队列 同步队列-无锁队列-循环数组无锁队列
  • 无锁队列

    2019-05-13 16:56:26
    本文总体是转载的,但是更正了部分错误、增添了批注 ...5、无锁队列的实现 6、CAS的ABA问题 7、用数组实现无锁队列 1、前言  队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征...

    https://www.cnblogs.com/alantu2018/p/8469168.html

    本文总体是转载的,但是更正了部分错误、增添了批注

    目录

    1、前言

    2、队列操作模型

    3、队列数据定长与变长

    4、并发无锁处理

    5、无锁队列的实现

    6、CAS的ABA问题

    7、用数组实现无锁队列


    1、前言

      队列在计算机中非常重要的一种数据结构,尤其在操作系统中。队列典型的特征是先进先出(FIFO),符合流水线业务流程。在进程间通信、网络通信之间经常采用队列做缓存,缓解数据处理压力。结合自己在工作中遇到的队列问题,总结一下对不同场景下的队列实现。根据操作队列的场景分为:单生产者——单消费者、多生产者——单消费者、单生产者——多消费者、多生产者——多消费者四大模型。其实后面三种的队列,可以归纳为一种多对多。根据队列中数据分为:队列中的数据是定长的、队列中的数据是变长的。

    2、队列操作模型

    (1)单生产者——单消费者

    (2)多生产者——单消费者

    (3)单生产者——多消费者

    (4)多生产者——多消费者

    3、队列数据定长与变长

    (1)队列数据定长

    (2)队列数据变长

    4、并发无锁处理

    (1)单生产者——单消费者模型

      此种场景不需要加锁,定长的可以通过读指针和写指针进行控制队列操作,变长的通过读指针、写指针、结束指针控制操作。具体实现可以参考linux内核提供的kfifo的实现。可以参考:

    http://blog.csdn.net/linyt/article/details/5764312

    (2)(一)多对多(一)模型

      正常逻辑操作是要对队列操作进行加锁处理。加锁的性能开销较大,一般采用无锁实现。无锁实现原理是CAS、FAA等机制。定长的可以参考:

    http://coolshell.cn/articles/8239.html

    变长的可以参考intel dpdk提供的rte_ring的实现。

    http://blog.csdn.net/linzhaolover/article/details/9771329

    5、无锁队列的实现

    关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术。下面开始正文。

    关于CAS等原子操作

    在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

    这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。

    int compare_and_swap (int* reg, int oldval, int newval)
    {
      int old_reg_val = *reg;
      if (old_reg_val == oldval)
         *reg = newval;
      return old_reg_val;
    }

    这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):

    bool compare_and_swap (int *accum, int *dest, int newval)
    {
      if ( *accum == *dest ) {
          *dest = newval;
          return true;
      }
      return false;
    }

    与CAS相似的还有下面的原子操作:(这些东西大家自己看Wikipedia吧)

    注:在实际的C/C++程序中,CAS的各种实现版本如下:

    1)GCC的CAS

    GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins

    bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

    2)Windows的CAS

    在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions

    InterlockedCompareExchange ( __inout LONG volatile *Target,
                    __in LONG Exchange,
                    __in LONG Comperand);

    3) C++11中的CAS

    C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library

    template< class T >
    bool atomic_compare_exchange_weak( std::atomic* obj,
                      T* expected, T desired );
    
    template< class T >
    bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                      T* expected, T desired );

    无锁队列的链表实现

    下面的东西主要来自John D. Valois 1994年10月在拉斯维加斯的并行和分布系统系统国际大会上的一篇论文——《Implementing Lock-Free Queues》。

    我们先来看一下进队列用CAS实现的方式:

    EnQueue(x) //进队列
    {
        //准备新加入的结点数据
        q = new record();
        q->value = x;
        q->next = NULL;
    
        do {
            p = tail; //取链表尾指针的快照
        } while( CAS(p->next, NULL, q) != TRUE); //如果没有把结点链在尾指针上,再试
    
        CAS(tail, p, q); //置尾结点
    }

    (注:理解上述代码的一个关键是tail是一个单独的变量,指向链表队列的尾部,生产者们每次放数据成功都应该修改tail,所以tail也是互斥资源,也应使用CAS机制来无锁修改,这就是最后一行代码的含义)

    我们可以看到,程序中的那个 do- while 的 Re-Try-Loop。就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止(此时生产者放入队列成功)。这个很像我们的抢电话热线的不停重播的情况。

    你会看到,为什么我们的“置尾结点”的操作(第12行)不判断是否成功,因为:

    1. 如果有一个线程T1,它的while中的CAS如果成功的话(T1生产者放入数据成功,但此时还没有修改tail),那么其它所有的 随后线程(其他生产者,因为tail还没有改变,必然CAS失败,出不了循环)的CAS都会失败,然后就会再循环,
    2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了(tail->next是T1刚放入的数据)
    3. 直到T1线程更新完tail指针(也就是12行,使用CAS更新tail),于是其它的线程中的某个线程就可以得到新的tail指针,继续往下走了。

    (??既然其他生产者线程都出不了while循环,更不会去修改tail,直接修改tail是安全的,所以最后直接tail=q即可)

    这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()

    EnQueue(x) //进队列改良版
    {
        q = new record();
        q->value = x;
        q->next = NULL;
    
        p = tail;
        oldp = p
    
        do {
            while (p->next != NULL)//改进部分,自己找到尾部
                p = p->next;
    
        } while( CAS(p.next, NULL, q) != TRUE); //如果没有把结点链在尾上,再试
    
        CAS(tail, oldp, q); //置尾结点
    }

    我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而通实际情况看下来,99.9%的情况不会有线程停转的情况,所以,更好的做法是,你可以接合上述的这两个版本,如果retry的次数超了一个值的话(比如说3次)(??感觉这是一种想当然的优化,实际上在绝大部分情况下,上述代码只是多了一次p->next是否为0的判断而已。采用retry依然需要判断,并无优势),那么,就自己fetch指针。

    好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

    DeQueue() //出队列 这里的head没有存数据
    {
        do{
            p = head;
            if (p->next == NULL)
                return ERR_EMPTY_QUEUE;
    
            }while( CAS(head, p, p->next) != TRUE );
    
        return p->next->value;
    
    }

    (这里的head和EnQueue中的tail一样,是单独维护的一个变量,指向队列头部,这意味着head也是一个互斥资源,需要使用CAS修改)

    我们可以看到,DeQueue的代码操作的是 head->next,而不是head本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head和tail都指向同一个结点的问题,这样EnQueue和DeQueue要互相排斥了

    注:上图的tail正处于更新之前的装态。

    6、CAS的ABA问题

    所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

    1. 进程P1在共享变量中读到值为A
    2. P1被抢占了,进程P2执行
    3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
    4. P1回来看到共享变量里的值没有被改变,于是继续执行。

    虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

    比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了(被其他的消费者线程CAS获取),而重用的内存又被EnQueue()进来了,这会有很大的问题。(一定注意:ABA问题并不是存在于CAS操作内部,CAS可直接视为一条指令)内存管理中重用内存基本上是一种很常见的行为

    这个例子你可能没有看懂,维基百科上给了一个活生生的例子——

    你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

    这就是ABA的问题。

    解决ABA问题三种方法:

    • CAS双保险机制
    • 针对特定的业务使用引用计数
    • 使用循环数组实现

    维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

    1)一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器

    2)只有这两个都一样,才算通过检查,才能赋新的值。并把计数器累加1

    这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

    当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt

    SafeRead(q)
    {
        loop:
            p = q->next;
            if (p == NULL){
                return p;
            }
    
            Fetch&Add(p->refcnt, 1);
    
            if (p == q->next){
                return p;
            }else{
                Release(p);
            }
    
        goto loop;
    
    }

    其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。(在链表Node节点的析构函数中,仅仅当Node的引用计数,也就是上述代码的refcnt为0,才会被析构。自然不会出现前面的内存地址ABA问题)

    7、用数组实现无锁队列

    本实现来自论文《Implementing Lock-Free Queues

    使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

    1)数组队列应该是一个ring buffer形式的数组(环形数组)

    2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)

    3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。

    4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。

    5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

    算法的一个关键是——如何定位HEAD或TAIL?

    1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。

    2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。

    3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。

    如下图所示:

     小结

    以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

    1)无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。

    2)对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

    还有一些和Lock Free的文章你可以去看看:

    注:我配了一张look-free的自行车,寓意为——如果不用专门的车锁,那么自行得自己锁自己!

    展开全文
  • 无锁队列详细分解.pdf

    2020-06-24 16:44:28
    无锁队列
  • 基于cas的无锁队列C++实现,基于cas的无锁队列C++实现,基于cas的无锁队列C++实现,基于cas的无锁队列C++实现,基于cas的无锁队列C++实现,
  • 消息队列与无锁队列实现

    1 解决问题

    消息队列在服务器中的位置,一般用于生产消费者模式的,分类两种情况一种非线程安全的,一种在多线程下使用的。例如
    线程池中处理任务队列,任务队列就是消息队列。如下图
    在这里插入图片描述
    当多个线程同时操作一个任务队列或者消息队列或同一临界资源的时候,就可能会遇到线程切换的问题。而对于线程切换是一个开销很大的操作,故就产生了无锁队列的需求。

    2 消息队列分类

    非线程安全:例如stl中的list
    线程安全:可以采用锁和无锁两种方式实现。

    线程安全的消息队列:
    锁实现方式: 互斥锁,互斥尝试锁,自旋锁,读写锁
    无锁方式: 原子操作(__asm__汇编自己实现,gcc提供api实现
    开源组件: zeromq(单读单写),ArrayLockFreeQueue(基于循环数组实现的),SimpleLockFreeQueue(基于链表实现的)

    3 原子性

    1 i++: 对于i++而言,不是原子性cpu不保证;对于它编译器会转化成汇编三条指令
    在这里插入图片描述
    具体流程:
    (1)把变量i从内存(RAM)加载到寄存器;(2)把寄存器的值加1;(3)把寄存器的值写回内存(RAM)。
    三条指令在多线程环境下会被打乱执行,得到的值与预期的值不一致。

    2 计算器体系结构::
    在这里插入图片描述
    会产生cache实现等问题。
    3 gcc原子api接口
    c++编译器提供了一组原子api.

    type __sync_fetch_and_add (type *ptr, type value, ...) 
    type __sync_fetch_and_sub (type *ptr, type value, ...) 
    type __sync_fetch_and_or (type *ptr, type value, ...) 
    type __sync_fetch_and_and (type *ptr, type value, ...) 
    type __sync_fetch_and_xor (type *ptr, type value, ...) 
    type __sync_fetch_and_nand (type *ptr, type value, ...) 
    8bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...) 
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
    type __sync_lock_test_and_set (type *ptr, type value, ...)
    void __sync_lock_release (type *ptr, ...)
    

    对于c++11也提供了一组atomic接口

    4 无锁队列

    对于消息队列可以采用有锁来实现,多线程操作有锁队列也会引起的问题:
    1 同一个线程在不同cpu运行会发生切换 cache损坏(cache trashing)
    2 在同步机制上争抢队列block_queue(mutex+condition)
    3 动态分配内存(对于高性能队列,开辟释放内存,不是直接调用系统调用来实现,而是采用内存池来实现)

    5 开源无锁队列组件

    对于无锁队列可以自己采用原子操作来实现一个(汇编实现,gcc提供的api接口实现),但是大多数情况下都是使用开源组件,
    对于业界比较有名的实现方式有三种: zeromq组件,ArrayLockFreeQueue,SimpleLockFreeQueue
    1 zeromq
    原理:主要采用yqueue_t和ypipe_t两个数据结构,还有chunke的设计,在一个结点上一次性开辟N个大小的值,以及spare_chunk局部性原理。
    比较难理解的ypipe_t r指针的预读机制,r可以理解为read_end,r并不是读数据的位置索引,⽽是我们可以最多读到哪个位置的索引。读数据的索引位置还是begin_pos。
    a 原子指针类

     template <typename T> class atomic_ptr_t 
     {  
    public:  
    	 inline void set (T *ptr_); //⾮原⼦操作  
    	  inline T *xchg (T *val_); //原⼦操作,设置⼀个新的值,然后返回旧的值 
    	  inline T *cas (T *cmp_, T *val_);//原⼦操作 
      private: 
    	   volatile T *ptr;
     }
    

    b queue_t类
    此结点类型就是一个可以装载N个T类型的元素
    构造,析构,front,pop,back,push,unpush(回滚)
    成员变量:比较有特色的是spare_chunk

    template <typename T, int N> class yqueue_t
    {
    public:
    	inline yueue_t(); // 构造函数
    	inline ~yueue_t();// 析构函数
    	inline T &front(); // 获取队头
    	inline T &back();  // 获取队尾
    	inline void push(); // 入队,要先back后再写入data,push的时候更新位置索引
    	inline void pop(); // 出队,要先通过front取出来,pop更新位置索引
    	inline void unpush(); // 回滚
    private:
    	struct chunk_t
    	{
    		T values[N}chunk_t *prev;
    		chunk_t *next;
    	};
    
    	chunk_t *begin_chunk;  // 开始块
    	int begin_pos;
    	chunk_t *back_chunk;   // 当前块
    	int back_pos;
    	chunk_t *end_chunk;   // 结束块
    	int end_pos;
    
    	atomic_ptr_t<chunk_t> spare_chunk; // 空闲块,所有已经出队的块称空闲块, 读写线程的共享变量	
    };
    

    c 数据结构逻辑
    1 基本介绍:yqueue_t:每次批量分配一批元素,减少内存的分配和释放,解决不断动态内存分配问题。
    内部由一个一个chunk组成,每个chunk保持N个元素。

    	struct chunk_t
    	{
    		T values[N}chunk_t *prev;
    		chunk_t *next;
    	};
    

    在这里插入图片描述
    当队列空间不足时,每次分配一个chunk_t,每个能存储N个元素;
    数据出队列后,队列有多余空间时候,回收chunk不是妈说释放,而是根据局部性原理先回收到spare_chunk里面,当再次需要分配chunk_t时候,从spare_chunk中获取。

    私有变量:
    begin_chunk/begin_pos:队头的chunk;队列第一个元素的在当前chunk的位置
    back_chunk/back_pos:队尾的chunk;队列最后一个元素在当前chunk的位置(队友元素存储位置)
    end_chunk/end_pos:分配的队列最后一个chunk(决定是否分配chunk和回收chunk)
    在这里插入图片描述
    还有一个spare_chunk指针,用于保存释放的chunk指针,当需要分配内存chunk时候,先查看这里的chunk复用。
    2 基本接口操作:
    构造函数

    inline yquue_t()
    {
    	begin_chunk = (chunk_t*)malloc(sizeof(chunk_t)); // 预先分配chunk
    	alloc_assert(begin_chunk);
    	begin_pos = 0;
    	back_chunk = NULL; // back_chunk总是指向队列中最后一个元素所在的chunk,现在还没有元素,所以初始化为空
    	back_pos = 0;
    	end_chunk = begin_chunk; // end_chunk总是指向链表的最后一个chunk
    	end_pos = 0;
    }
    

    end_chunk总是指向最后分配的chunk,刚分配出来的chunk,end_pos都是0.
    back_chunk是需要chunk有元素插入的时候才指向对应的chunk。
    在这里插入图片描述
    front,back函数
    返回值是左值引用,我们可以通过修改来修改容器类的值;调用这两个函数对应的位置没有变化,只有调用push,pop后位置才变更。

    inline T& front()
    {
    	return begin_chunk->values[begin_pos]// 队列首个chunk对应的begin_pos
    }
    
    inline T& back()
    {
    	return back_chunk->values[back_pos];
    }
    

    push函数
    插入元素:

    inline void push()
    {
    	back_chunk = end_chunk;
    	back_pos = end_pos;	// 更新可写的位置,根据end_pos取更新
    
    	if(++end_pos != N) // end_pos==N表明这个链表结点已经满了
    		return;
    	
    	chunk_t *sc = spare_chunk.xchg(NULL); // 把之前值取出来了,则没有spare_chunk,所以设置NULL
    	if(sc){
    		end_chunk->next = sc;
    		sc->prev = end_chunk;
    	}else{ // 没有则重新分配
    		end_chunk->next = (chunk_t*)malloc(sizeof(chunk_t));//分配一个chunk
    		alloc_assert(end_chunk->next);
    		end_chunk->next->prev = end_chunk;
    	}
    
    	end_chunk = end_chunk->next;
    	end_pos = 0;
    }
    
    

    第一种情况++end_pos != N
    在这里插入图片描述
    第一种情况++end_pos == N
    在这里插入图片描述
    pop函数
    整个chunk元素被取出队列才去回收chunk,而且是把最后回收的chunk保存到spare_chunk,释放之前保存的chunk。

    inline void pop(){
    	if (++begin_pos == N) // 删除一个chunk才回收chunk
    	{
    		chunk_t *o = begin_chunk;
    		begin_chunk = begin_chunk->next;   // 更新begin_chunk位置
    		begin_chunk->prev = NULL;
    		begin_pos = 0;
    		
    		chunk_t *cs = spare_chunk.xchg(o);// 由于局部性原理,总是保存最新的空闲块而释放先前的空闲块
    		free(cs);
    	}
    
    }
    
    

    注意:pop掉的元素,其销毁工作交给调用者完成,即时pop前调用者需要通过front()接口读取并进行销毁。
    空闲块的保存,要求是原子操作。

    3 ypipe_t基于yqueue_t构建的单写单读的无锁队列

    template <typename T, int N>
    class ypipe_t
    {
    public:
    	inline ypipe_t();
    	inline virtual ~ypipe_t();
    
    	// 写入数据,incomplete参数表示写入是否还没完成,在没有完成的时候不会修改flush指针,即这部分数据不会让线程看到
    	inline void write(const T &value_, bool incomplete_);
    	inline bool unwrite(T *value_);
    	// 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒线程
    	inline bool flush();
    
    	// 一个检查是否有数据可读,一个是预取
    	inline bool check_read();
    	inline bool read(T *value_);
    	inline bool probe(bool (*fn)(T&));
    
    proteced:
    	yqueue_t<T, N> queue;
    	T *w;//指向第一个未刷新的元素,只被写线程使用
    	T *r;// 指向第一个还没有预提取的元素,只被读线程使用
    	T *f;// 指向下一轮要被刷新的一批元素中的第一个
    
    	atomic_ptr_t<T> c;// 读写线程共享指针,指向每一轮刷新的点。当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
    
    	ypipe_t(const ypipe_t&);
    	const ypipe_t &operator=(const ypipe_t&);
    };
    

    T *w: 第一个未刷新的元素,只被写线程使用
    T *r;第一个还没预提取的元素,只被读线程使用
    T *f;下一轮要被刷新的一批元素中的第一个
    atomic_ptr_t c;读写线程共享的指针,指向每一轮刷新的起点,当c为空时,表示读线程睡眠(只会在读线程中被设置为空)
    接口: write(),flush(),read()
    在这里插入图片描述
    初始化

    inline ypipe_t()
    {
    	queue.push();  // yqueue_t的尾指针加1,开始back_chunk为空,现在back_chunk指向第一个chunk_t块的第一个位置
    	r = w = f = &queue.back(); // r,w,f,c四个指针都指向end迭代器
    	c.set(&queue.back());
    }
    

    在这里插入图片描述
    write函数

    inline void write(const T &value_, bool incomplete)
    {
    	queue.back() = value_;
    	queue.push(); // 更新下一次写的位置
    
    	if (!incomplete_)  // 如果f不更新,flush的时候,read也是没有数据
    		f = &queue.back(); // 记录要刷新的位置
    }
    

    在这里插入图片描述
    flush函数
    将w更新到f的位置,说明已经写到的位置。

    // 刷新所有已经完成的数据到管道,返回false意味着读线程在休眠,在这种情况下调用者需要唤醒读线程
    inline bool flush()
    {
    	if (w == f) // 不需要刷新,即时还灭有新元素加入
    		return true;
    
    	// read导致数据没有可读取后,c会被设置为NULL
    	if (c.cas(w, f) != w){  // 尝试将c设置为f,即准备更新w的位置
    		c.set(f);
    		w = f;  
    		return false;   // 线程看到flush返回false会会发送一个消息给读线程,这个需要写业务去处理
    	}
    	
    	w = f;
    	return true;
    }
    

    在这里插入图片描述
    read函数
    两个点:一个检查是否有数据可读,一个是预取

    inline bool check_read()
    {
    	if (&queue.front() != r && r)  // 判断是否在前几次调用read函数时已经预取数据了return true
    		return true;
    
    	r = c.cas(&queue.front(), NULL);// 尝试预取数据
    	if (&queue.front() == r || !r)// 判断是否成功预取数据
    		return false;
    
    	return true;
    }
    
    inline bool read()
    {
    	if (!check_read()) return false;
    
    	*value = queue.front();
    	queue.pop();
    	return true;
    }
    

    在这里插入图片描述
    总结: 1 我们正常情况下的链表都是一个结点一个元素,而zeromq是在一个结点上N个元素,这样只有开辟元素内存时候,不必每次都要开辟一个结点,而是开辟一个结点预先创建N个元素的内存,后面需要就直接在单个结点上拿去,这样可以提升性能;
    2 而对于zeromq队列还有一个比较有意思的点是有一个spare_chunk空闲块,这是利用局部性原理,只会最多有一个chunk的,而当一个结点中的N个元素的读取完后,则不立马释放内存,而是保持最新的一个到spare_chunk中,这样下次需要开辟新结点的时候直接复用spare_chunk空间。
    3 对于zeromq支持单读单写的线程安全,具体实现是封装了一个原子指针类,一个yqueue_t类,在加上一个ypipe_t类管道类。
    在这里插入图片描述

    2 基于循环数组实现的无锁队列
    支持多读多写的
    比较重要的点:
    1 ArrayLockFreeQueue数据结构,可以理解为⼀个环形数组;
    2 多线程写⼊时候,m_maximumReadIndex、m_writeIndex索引如何更新
    3 在更新m_maximumReadIndex的时候为什么要让出cpu;
    4 多线程读取的时候,m_readIndex如何更新。
    5 可读位置是由m_maximumReadIndex控制,⽽不是m_writeIndex去控制的。 m_maximumReadIndex的更新由m_writeIndex。

    待有空了再详细学习多读线程多写线程的无锁队列。

    展开全文
  • 无锁队列 一个简单的通用C11并发无锁队列。 基于原子变量。
  • 无锁队列的实现

    2020-10-25 17:49:15
    《DPDK无锁队列rte_ring相关代码及示例程序(rte_ring.h,rte_ring.c,main.c,makefile)》 《DPDK ring库:环形缓冲区的解剖》 《无锁队列的实现》 Table of Contents 关于CAS等原子操作 无锁队列的链表实现 ...

    【共享内存】基于共享内存的无锁消息队列设计

    DPDK无锁队列rte_ring相关代码及示例程序(rte_ring.h,rte_ring.c,main.c,makefile)

    DPDK ring库:环形缓冲区的解剖

    无锁队列的实现

    Table of Contents

    关于CAS等原子操作

    无锁队列的链表实现

    CAS的ABA问题

    用数组实现无锁队列

    小结


     

    关于无锁队列的实现,网上有很多文章,虽然本文可能和那些文章有所重复,但是我还是想以我自己的方式把这些文章中的重要的知识点串起来和大家讲一讲这个技术。下面开始正文。

     

    关于CAS等原子操作


    在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

    这个操作用C语言来描述就是下面这个样子:(代码来自Wikipedia的Compare And Swap词条)意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval

    int compare_and_swap (int* reg, int oldval, int newval)
    {
      int old_reg_val = *reg;
      if (old_reg_val == oldval) {
         *reg = newval;
      }
      return old_reg_val;
    }

    我们可以看到,old_reg_val 总是返回,于是,我们可以在 compare_and_swap 操作之后对其进行测试,以查看它是否与 oldval相匹配,因为它可能有所不同,这意味着另一个并发线程已成功地竞争到 compare_and_swap 并成功将 reg 值从 oldval 更改为别的值了。

    这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):

    bool compare_and_swap (int *addr, int oldval, int newval)
    {
      if ( *addr != oldval ) {
          return false;
      }
      *addr = newval;
      return true;
    }

    与CAS相似的还有下面的原子操作:(这些东西大家自己看Wikipedia,也没什么复杂的)

    注:在实际的C/C++程序中,CAS的各种实现版本如下:

     

    1)GCC的CAS


    GCC4.1+版本中支持CAS的原子操作(完整的原子操作可参看 GCC Atomic Builtins

    bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

    2)Windows的CAS


    在Windows下,你可以使用下面的Windows API来完成CAS:(完整的Windows原子操作可参看MSDN的InterLocked Functions

     InterlockedCompareExchange ( __inout LONG volatile *Target,
                                     __in LONG Exchange,
                                     __in LONG Comperand);

    3) C++11中的CAS


    C++11中的STL中的atomic类的函数可以让你跨平台。(完整的C++11的原子操作可参看 Atomic Operation Library

    template< class T >
    bool atomic_compare_exchange_weak( std::atomic* obj,
                                       T* expected, T desired );
    template< class T >
    bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                                       T* expected, T desired );

     

    无锁队列的链表实现


    下面的代码主要参考于两篇论文:

    (注:下面的代码并不完全与这篇论文相同)

    初始化一个队列的代码很简,初始化一个dummy结点(注:在链表操作中,使用一个dummy结点,可以少掉很多边界条件的判断),如下所示:

    InitQueue(Q)
    {
        node = new node()
        node->next = NULL;
        Q->head = Q->tail = node;
    }

    我们先来看一下进队列用CAS实现的方式,基本上来说就是链表的两步操作:

    1. 第一步,把tail指针的next指向要加入的结点。 tail->next = p;
    2. 第二步,把tail指针移到队尾。 tail = p;
    EnQueue(Q, data) //进队列
    {
        //准备新加入的结点数据
        n = new node();
        n->value = data;
        n->next = NULL;
    
        do {
            p = Q->tail; //取链表尾指针的快照
        } while( CAS(p->next, NULL, n) != TRUE); 
        //while条件注释:如果没有把结点链在尾指针上,再试
    
        CAS(Q->tail, p, n); //置尾结点 tail = n;
    }

    我们可以看到,程序中的那个 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新结点 n 加到队尾。如果不成功,则重新再来一次!

    就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

    但是你会看到,为什么我们的“置尾结点”的操作(第13行)不判断是否成功,因为:

    1. 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
    2. 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
    3. 直到T1线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。
    4. 所以,只要线程能从 while 循环中退出来,意味着,它已经“独占”了,tail 指针必然可以被更新。

    这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()

    EnQueue(Q, data) //进队列改良版 v1
    {
        n = new node();
        n->value = data;
        n->next = NULL;
    
        p = Q->tail;
        oldp = p
        do {
            while (p->next != NULL)
                p = p->next;
        } while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试
    
        CAS(Q->tail, oldp, n); //置尾结点
    }

    我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p 指针到队尾,能不能不要所有的线程都干同一个事?这样可以节省整体的时间?

    比如:直接 fetch Q->tail 到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:

    EnQueue(Q, data) //进队列改良版 v2 
    {
        n = new node();
        n->value = data;
        n->next = NULL;
    
        while(TRUE) {
            //先取一下尾指针和尾指针的next
            tail = Q->tail;
            next = tail->next;
    
            //如果尾指针已经被移动了,则重新开始
            if ( tail != Q->tail ) continue;
    
            //如果尾指针的 next 不为NULL,则 fetch 全局尾指针到next
            if ( next != NULL ) {
                CAS(Q->tail, tail, next);
                continue;
            }
    
            //如果加入结点成功,则退出
            if ( CAS(tail->next, next, n) == TRUE ) break;
        }
        CAS(Q->tail, tail, n); //置尾结点
    }

    上述的代码还是很清楚的,相信你一定能看懂,而且,这也是 Java 中的 ConcurrentLinkedQueue 的实现逻辑,当然,我上面的这个版本比 Java 的好一点,因为没有 if 嵌套,嘿嘿。

    好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

    DeQueue(Q) //出队列
    {
        do{
            p = Q->head;
            if (p->next == NULL){
                return ERR_EMPTY_QUEUE;
            }
        while( CAS(Q->head, p, p->next) != TRUE );
        return p->next->value;
    }

    我们可以看到,DeQueue的代码操作的是 head->next,而不是 head 本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head 和 tail 都指向同一个结点的问题,这样 EnQueue 和 DeQueue 要互相排斥了

    但是,如果 head 和 tail 都指向同一个结点,这意味着队列为空,应该返回 ERR_EMPTY_QUEUE,但是,在判断 p->next == NULL 时,另外一个EnQueue操作做了一半,此时的 p->next 不为 NULL了,但是 tail 指针还差最后一步,没有更新到新加的结点,这个时候就会出现,在 EnQueue 并没有完成的时候, DeQueue 已经把新增加的结点给取走了,此时,队列为空,但是,head 与 tail 并没有指向同一个结点。如下所示:

    虽然,EnQueue的函数会把 tail 指针置对,但是,这种情况可能还是会导致一些并发问题,所以,严谨来说,我们需要避免这种情况。于是,我们需要加入更多的判断条件,还确保这个问题。下面是相关的改进代码:

    DeQueue(Q) //出队列,改进版
    {
        while(TRUE) {
            //取出头指针,尾指针,和第一个元素的指针
            head = Q->head;
            tail = Q->tail;
            next = head->next;
    
            // Q->head 指针已移动,重新取 head指针
            if ( head != Q->head ) continue;
            
            // 如果是空队列
            if ( head == tail && next == NULL ) {
                return ERR_EMPTY_QUEUE;
            }
            
            //如果 tail 指针落后了
            if ( head == tail && next == NULL ) {
                CAS(Q->tail, tail, next);
                continue;
            }
    
            //移动 head 指针成功后,取出数据
            if ( CAS( Q->head, head, next) == TRUE){
                value = next->value;
                break;
            }
        }
        free(head); //释放老的dummy结点
        return value;
    }

    上面这段代码的逻辑和 Java 的 ConcurrentLinkedQueue 的 poll 方法很一致了。也是《Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms》这篇论文中的实现。

     

    CAS的ABA问题


    所谓ABA(见维基百科的ABA词条),问题基本是这个样子:

    1. 进程P1在共享变量中读到值为A
    2. P1被抢占了,进程P2执行
    3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
    4. P1回来看到共享变量里的值没有被改变,于是继续执行。

    虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的值。很明显,值是很容易又变成原样的。

    比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为

    这个例子你可能没有看懂,维基百科上给了一个活生生的例子

    你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

    这就是ABA的问题。

    解决ABA的问题

    维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

    • 1)一次用CAS检查双倍长度的值,前半部是值,后半部分是一个计数器。
    • 2)只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

    这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

    当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决,论文《Implementing Lock-Free Queues》给出一这么一个方法——使用结点内存引用计数refcnt!(论文《Simple, Fast, and Practical Non-Blocking and Blocking ConcurrentQueue Algorithms》中的实现方法也基本上是一样的,用到的是增加一个计数,可以理解为版本号)

    SafeRead(q)
    {
        loop:
            p = q->next;
            if (p == NULL){
                return p;
            }
    
            Fetch&Add(p->refcnt, 1);
    
            if (p == q->next){
                return p;
            }else{
                Release(p);
            }
        goto loop;
    }

    其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

     

    用数组实现无锁队列


    本实现来自论文《Implementing Lock-Free Queues

    http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.53.8674&rep=rep1&type=pdf

    使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

    • 1)数组队列应该是一个ring buffer形式的数组(环形数组)
    • 2)数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)
    • 3)数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。
    • 4)EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。
    • 5)DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

    算法的一个关键是——如何定位HEAD或TAIL?

    • 1)我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。
    • 2)这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。
    • 3)累加后求个模什么的就可以知道TAIL和HEAD的位置了。

    如下图所示:

    小结


    以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

    • 1)无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。
    • 2)对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

    还有一些和Lock Free的文章你可以去看看:

     

    展开全文
  • 无锁队列

    2021-09-27 21:08:09
    无锁队列的实现 | 酷 壳 - CoolShell
    展开全文
  • 一直想写一个无锁队列,为了提高项目的背景效率。有机会看到linux核心kfifo.h 原则。所以这个实现自己仿照,眼下linux我们应该能够提供外部接口。#ifndef _NO_LOCK_QUEUE_H_#define _NO_LOCK_QUEUE_H_#include #...
  • CAS 无锁队列

    2021-08-03 17:20:37
    cas实现 https://www.cnblogs.com/shines77/p/4209871.html 无锁队列 https://blog.csdn.net/yand789/article/details/27545135
  • 本篇文章是对C++无锁队列的实现进行了详细的分析介绍,需要的朋友参考下
  • 无锁队列的C++实现

    2020-05-26 22:01:39
    一个无锁队列的C++实现。Miscellaneous scripts and things that dont merit their own repo. All under MIT License unless otherwise specified
  • 无锁队列理解

    2021-04-05 19:35:53
    由于普通锁的粒度比较大,以至于在并发量高的环境下,锁对于并发性能影响很大,本文章对无锁队列做探索,该无锁队列目前只支持单读单写,上干货 该队列由链表组成,每个节点有N个泛型T组成,该队列实现对T类型...
  • Linux内核中的无锁队列 - kfifo
  • 无锁队列 先大致介绍一下无锁队列无锁队列的根本是CAS函数——CompareAndSwap,即比较并交换,函数功能可以用C++函数来说明: int compare_and_swap (int* reg, int oldval, int newval) { int old_reg_val = *...
  • 关于无锁队列的概念与实现,可以参考博文《无锁队列的实现》,主要涉及到的知识点包括CAS原子操作、无锁队列的链表实现、无锁队列的数组实现以及ABA问题。  下面借鉴了《多线程的那点儿事(之无锁队列)》的代码,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 21,249
精华内容 8,499
关键字:

无锁队列