精华内容
下载资源
问答
  • 不同于用CAS旋转锁实现的无锁容器,作者实现的容器是Wait Free的,永远不会阻塞。先来看看Queue,Queue由于FIFO的特点,对Queue的操作都在Queue的两端进行,非常适合进行并行处理。网上也有不少无锁Queue的实现,但...

    这篇文章介绍一下gc_ptr中用到的一些无锁容器。不同于用CAS旋转锁实现的无锁容器,作者实现的容器是Wait Free的,永远不会阻塞。先来看看Queue,Queue由于FIFO的特点,对Queue的操作都在Queue的两端进行,非常适合进行并行处理。网上也有不少无锁Queue的实现,但不少都是基于CAS旋转锁,链表实现的,或者使用环形数组,固定长度实现。总感觉不是很完美,于是作者自己写了一个Queue,目前这个Queue是玩具级的实现,主要是为了实践一下作者的构想与思路。好的,接下来看看作者实现的Queue有哪些特点:

    • Wait Free级别的无锁
    • 泛型容器
    • 支持多生产多消费
    • 环形数组实现,支持自动扩容。

    在来看看这个Queue有哪些成员变量,函数:

    c0697d120cd0e51a4e8acfec99ee35f7.png

    现在具体解释一下,各个成员的作用:

    • m_queue:队列主体,所有数据存在这里。
    • m_states:状态数组,由一组atomic组成。负责记录m_queue在每个位置的状态,一共由三种状态:
    1. free(表示m_queue在此位置为空,没有数据)
    2. vaild(表示m_queue在此位置有数据)
    3. copying(表示m_queue在此位置正在进行enqueue或者dequeue,正在拷贝数据)
    m_enqueue_count:表示调用了多少次enqueue,用来计算入列时写入的位置和索引。m_dequeue_count:表示调用了多少次dequeue,用来计算出列时读取的位置和索引。m_size:表示m_queue中有多少数据m_capacity:表示m_queue的最大长度。m_enqueue:表示enqueue的调用情况,开始调用enqueue时会加一,调用结束时减一。m_dequeue:表示dequeue的调用情况,开始调用dequeue时会加一,调用结束时减一。m_offset:配合m_enqueue_count,m_dequeue_count,用来计算入列出列的位置。m_resize: 当Queue满时,m_resize置true,阻塞enqueue,dequeue。并开始进行resize扩容,结束后m_resize置false。

    wait_free_queue是如何做到Wait Free的?来看Enqueue函数

    6eaadf2c012c86e8e1903f1029e8653d.png

    上图的enqueue是作者为了讲解,去除了resize机制的版本,项目中的enqueue比这个还要复杂一点。注意,上面用到的CAS函数不是CAS自旋锁,是为了保证变量的正确性使用的。例如m_size,由于m_size存在上限(m_capacity),不能简单的调用fetch_add,不然会超出上限,需要使用CAS来保证正确性。

    do
    

    如上图当m_size达到最大值后便不会再增加,并且会进入无限循坏,但项目中的enqueue是有resize扩容机制的,不会卡在循环。

    当一个线程调用enqueue时,首先增加m_size,抢占位置。只要容器没满,就可以继续走下去。这时由于m_size已经增加,大于零,所以其他线程也可以正确的调用dequeue,即使这时数据还没写入到位置(后面会有同步)。完成m_size增加后,线程会去抢占具体的写入位置:

    do
    

    注意实现中,写入位置是通过m_enqueue_count计算得来的。这么做是为了得到正确的数据的索引(编号),在多线程环境下,在容器外进行编号是没有意义的,因为第一个调用enqueue的线程可能是最后一个写入的,函数的执行顺序是不确定的。上面代码中的old_count就是数据的索引(编号),en_pos 既是写入位置。抢占完位置后,线程终于有机会去写入数据了,但还有最后一关:

    while (!m_states[en_pos].compare_exchange_strong(before_state, wait_for_state::copying))
    {
        before_state = wait_for_state::free;
    }
    m_queue[en_pos] = elem;
    m_states[en_pos].compare_exchange_strong(after_state, wait_for_state::vaild);

    由于容器中有没有数据是由m_size控制的,就算还没有写入数据,dequeue就已经可以正确运行了,所以dequeue可能会走在enqueue前面,这也是无锁编程中经典的ABA问题。于是这里多了一层状态屏障。一个数据元素有3种状态:free,copying,valid。

    • 当enqueue时元素的状态变化一定是free->copying->vaild。
    • 当dequeue时元素的状态变化一定是valid->copying->free。

    通过状态屏障即保证了原子性,也解决了ABA问题。wait_free_queue在环形数组的任意位置,都可以有多组enqueue/dequeue并行。如果dequeue走在了enqueue的前面,dequeue会等到元素变成valid在运行。上面enqueue的是逻辑,dequeue的逻辑也是类似的。此外作者还实现了enqueue_range, dequeue_range来进行批量操作,并且保证批量操作时元素之间不被间断,是连续的。

    如此三步走就可以实现一个Wait Free的队列:

    1. 抢占m_size来保证有写入/读取的位置。
    2. 抢占m_enqueue_count/m_dequeue_count来确定具体的写入/读取的位置。
    3. 通过状态屏障来保证写入/读取的原子性和避免ABA问题。

    通过类似的技巧,还可以实现stack,buffer,等容器(基于哈希的set,map还有待验证)。

    展开全文
  • #include #include #include #include #include #include #include using namespace std; templatetypename T> ...class waitfree_queue { ... waitfree_queue() : datalist_(0),freelist_(0
    #include <atomic>
    #include <future>
    #include <chrono>
    #include <vector>
    #include <iostream>
    #include <algorithm>
    #include <cassert>
    using namespace std;
    template<typename T>
    class waitfree_queue {
    public:
        waitfree_queue() : datalist_(0),freelist_(0){
            return ;
            node *head = 0;
            for (int i=0;i<10000;++i){
                node * item = new node;
                item->next=head;
                head = item;
            }
            freelist_ = head;
        }
        ~waitfree_queue(){
            node *next = freelist_;
            while(next){
                node *todel = next;
                next = todel->next;
                delete todel;
            }
        }
        //链表项
        struct node {  T data;  node * next;  };
        //插入数据
        void push(const T &data)
        {
            //从内存池中分配一个
            node * n = alloc();
            //拷贝数据
            n->data = data;
            //放入队列
            node * stale_head = datalist_.load(memory_order_relaxed);
            do {
                n->next = stale_head;
            } while (!datalist_.compare_exchange_weak(stale_head, n, memory_order_release));
        }
        //一次提取一批数据,后插入的数据在链表的前面,先插入的在链表后面。
        node * pop_all(void)
        {
            return datalist_.exchange(0, memory_order_consume);
        }
    private:
      node* alloc(){
          //读取freelist_链表的头(可能陈旧)
          node *stale_head = freelist_.load(memory_order_relaxed);
          node* new_head = 0;
          do
          {
              if (stale_head==0){
                  node* n = new node;
                  n->next=0;
                  return n;
              }
              new_head = stale_head->next;
          }
          while (!freelist_.compare_exchange_weak(stale_head, new_head, memory_order_release));
      }
      void free(node* n){
          //计算n链表的尾
          node *tail=n;
          while(tail->next){
              tail = tail->next;
          }
          //读取freelist_链表的头(可能陈旧)
          node *stale_head = freelist_.load(memory_order_relaxed);
          do {
            tail->next = stale_head;
          } while (!freelist_.compare_exchange_weak(stale_head, n, memory_order_release));
      }
    private:
      atomic<node*> datalist_  ; //数据传送的队列
      atomic<node*> freelist_  ; //预分配的内存池
    };
    waitfree_queue<int> g_queue;
    atomic<int> done;
    void foo_imp(int& count){
        // pop elements
        waitfree_queue<int>::node *head = g_queue.pop_all();
        if (head==0){
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            return;
        }
        waitfree_queue<int>::node *ite = head;
        while(ite) {
            count+=1;
            ite = ite->next;
        }
        free(head);
    }
    int foo(){
        int count=0;
        do{
            foo_imp(count);
        }
        while(!done.load(memory_order_relaxed));
        foo_imp(count);
        return count;
    }
    int bar(){
        for(int j=0;j<1000;++j){
            for (int i=0;i<2000;++i){
                g_queue.push(1);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        return 0;
    }
    int main(){
        done=0;
        auto f1 = async(std::launch::async,&foo);
        auto f2 = async(std::launch::async,&foo);
        auto f3 = async(std::launch::async,&foo);
        auto b1 = async(std::launch::async,&bar);
        auto b2 = async(std::launch::async,&bar);
        b1.get();
        b2.get();
        done.store(1,memory_order_relaxed);
        int count1=f1.get();
        int count2=f2.get();
        int count3=f3.get();
        cout<<"OK "<<count1+count2+count3<<endl;
        return 0;
    }


    
    
    展开全文
  • #include #include #include #include #include #include #include using namespace std; template class waitfree_queue { ... waitfree_queue() : datalist_(0),freelist_(0){} //链表项 stru
    #include <atomic>
    #include <thread>
    #include <chrono>
    #include <vector>
    #include <iostream>
    #include <algorithm>
    #include <cassert>
    using namespace std;
    template<typename T>
    class waitfree_queue {
    public:
        waitfree_queue() : datalist_(0),freelist_(0){}
        //链表项
        struct node {  T data;  node * next;  };
        //插入数据
        void push(const T &data)
        {
            //从内存池中分配一个
            node * n = alloc();
            //拷贝数据
            n->data = data;
            //放入队列
            node * stale_head = datalist_.load(memory_order_relaxed);
            do {
                n->next = stale_head;
            } while (!datalist_.compare_exchange_weak(stale_head, n, memory_order_release));
        }
        //一次提取一批数据,后插入的数据在链表的前面,先插入的在链表后面。
        node * pop_all(void)
        {
            return datalist_.exchange(0, memory_order_consume);
        }
    private:
      node* alloc(){
          //读取freelist_链表的头(可能陈旧)
          node *stale_head = freelist_.load(memory_order_relaxed);
          node* new_head = 0;
          do
          {
              if (stale_head==0){
                  node* n = new node;
                  n->next=0;
                  return n;
              }
              new_head = stale_head->next;
          }
          while (!freelist_.compare_exchange_weak(stale_head, new_head, memory_order_release));
      }
      void free(node* n){
          //计算n链表的尾
          node *tail=n;
          while(tail->next){
              tail = tail->next;
          }
          //读取freelist_链表的头(可能陈旧)
          node *stale_head = freelist_.load(memory_order_relaxed);
          do {
            tail->next = stale_head;
          } while (!freelist_.compare_exchange_weak(stale_head, n, memory_order_release));
      }
    private:
      atomic<node*> datalist_  ; //数据传送的队列
      atomic<node*> freelist_  ; //预分配的内存池
    };
    waitfree_queue<int> g_queue;
    int done=0;
    atomic<int> g_sum;
    void foo_imp(){
        // pop elements
        waitfree_queue<int>::node *head = g_queue.pop_all();
        if (head==0){
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
            return;
        }
        waitfree_queue<int>::node *ite = head;
        while(ite) {
            g_sum+=1;
            ite = ite->next;
        }
        free(head);
    }
    void foo(){
        do{
            foo_imp();
        }
        while(!done);
        std::this_thread::sleep_for(std::chrono::seconds(1));
        foo_imp();
    }
    void bar(){
        for(int j=0;j<1000;++j){
            for (int i=0;i<1000;++i){
                g_queue.push(1);
            }
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
        done=1;
    }
    int main(){
        thread t0(&foo);
        thread t1(&foo);
        thread t2(&bar);
        thread t3(&bar);
        t3.join();
        t1.join();
        t2.join();
        t0.join();
        cout<<"OK"<<endl;
        return 0;
    }


    
    
    
    
    
    
    
    
    
    
    
    
    展开全文
  • Wait-Free 和 Lock-Free 算法意味着它们有更多的有点: 1 :线程中止免疫 - 杀掉系统中的任何线程都不会倒置其它线程被延迟。 2 :信号免疫 - 线程可以自由穿插执行,不会导致死锁。 3 :优先级倒置免疫 - 这...

    在上篇文章中,Unsafe的操作都是native操作,这个我们在上一节中已经分析过了,那么这些native操作有几个比较特殊:

     

     

    public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
    
    public final native boolean compareAndSwapInt(Object o, long offset, int expected,  int x);
    
    public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);
    

     

    这几个函数都有个compareAndSwap操作,这个是什么操作?

    这个是典型的CAS的操作,这个操作能够保证操作的原子性;这个操作在并行化编程中是个很重要的技术,链接请戳这里现在几乎所有的CPU都至此这个操作,在X86系统结构中,compare-and-exchange(CMPXCHG)指令能够实现这个原子操作,有了这个院子操作,就可以使用这个操作实现各种无锁(Lock free)的数据结构。

    想象一下这个CMPXCHG操作是什么样子的?使用wikipedia上的一个例子来表示这个过程:

     

     

    int compare_and_swap (int* reg, int oldval, int newval) 
    {
      int old_reg_val = *reg;
      if (old_reg_val == oldval) 
         *reg = newval;
      return old_reg_val;
    }
    

     

    这个操作还是比较易懂的,先看*reg的内存值是不是等于oldval,如果是的话,将其赋值newval

    这个操作其实并不能看出来赋值是否成功,只是赋值而已;因为这个操作始终会返回oldval。我们将这个返回值设定为bool型,可以确定使用者的调用是不是成功。

     

     

    bool compare_and_swap (int *accum, int *dest, int newval)
    {
      if ( *accum == *dest ) {
          *dest = newval;
          return true;
      } else {
          *accum = *dest;
          return false;
      }
    }
    
     

    好了,现在大家理解了这个操作,与CAS类似的原子操作还有:

    Fetch-And-Add

    Test and Test-And-Set

    Test-Add-Set

    CASABA问题:

     

    问题描述:

    1:进程T读取共存变量值A

    2T进程被抢占,进程K执行

    3:进程K把共享变量值从A改为B,使用完毕后,再改回A,此时进程T抢占。

    4:进程T读取变量值A,发现没有变化,继续执行后续流程。

    问题:虽然T认为变量值没有变化,继续执行;但是这回引发潜在的问题。CAS是最容易受到影响的实现,因为CAS判断的是指针的地址。如果地址被重用,那就麻烦了。

    解决方案:

    wikipedia上的Compare-and-set后面,提供了一个Double Compare-And-Set的解法,戳这里。例如在32位系统上,需要检查64位的内容。

    1:一次CAS操作检查双倍长度的值,前半部分是指针,后半部分是计数器值。

    2:必须这两个值全部通过,才能进行Double-CAS赋值操作。

    这样的话,ABA问题发生时,值同计数器值不同,就能顺利解决ABA问题。依然的问题是,DCAS并不被所有的CPU支持(up to 2006);所以基于DCAS的解决方案都是当成理想方案来考虑的。大家考虑的解决方案依然是用CAS解决。


    上面提到了Lock-FreeWait-FreeLock-Based,那么这几种算法究竟是什么意思呢?

    Lock-Free:锁无关,一个锁无关的程序能够确保它所有线程中至少有一个能够继续往下执行。这意味着有些线程可能会被任意的延迟,然而在每一个步骤中至少有一个线程能够执行下去。因此这个系统作为一个整体总是在前进的,尽管有些线程的进度可能没有其它线程走的快。

    Wait-Free:等待无关,一个等待无关的程序可以在有限步之内结束,而不管其它线程的相对执行速度如何。

    Lock-Based:基于锁,基于锁的程序无法提供上面的任何保证,任一线程持有了某互斥体并处于等待状态,那么其它想要获取同意互斥体的线程只有等待,所有基于锁的算法无法摆脱死锁的阴影。

    那么根据上述的描述,Wait-FreeLock-Free算法意味着它们有更多的有点:

    1:线程中止免疫-杀掉系统中的任何线程都不会倒置其它线程被延迟。

    2:信号免疫-线程可以自由穿插执行,不会导致死锁。

    3:优先级倒置免疫-这两种算法对优先级倒置免疫。

    可见在高并发系统中,基于lock-freewait-free的算法对性能的关键性。

    最后返回来,我们再看下AtomicLonggetAndIncrement实现:

     

     

    public final long getAndIncrement() {
            while (true) {
                long current = get();
                long next = current + 1;
                if (compareAndSet(current, next))
                    return current;
            }
    }
    

     

    有感觉到这个实现熟悉嘛?再回一下compareAndSetC++实现,你还有什么想问的呢。


    大家在学习时,要多看源码,考虑下开源代码的实现思路,认真学习,总结,再学习,形成自己的学习方法。下面提供几个算法的系统实现链接:

     

    Java:

                LockfreeStack:  https://github.com/mthssdrbrg/LockFreeStack

    Disruptor: http://www.searchtb.com/2012/10/introduction_to_disruptor.html

     

    C:

    Nobel:  http://www.cse.chalmers.se/research/group/noble/

    Lock-free-lib: http://www.cl.cam.ac.uk/research/srg/netos/lock-free/

     

    更多项目实现:

    http://www.rossbencina.com/code/lockfree

     

    展开全文
  • s fast, wait free, and doesn't require any allocations after the start. It doesn't use the epoch manager but it seems fitting. <p>The allocate/deallocate functions are kinda hacky, but I don&#...
  • lock-free&wait-free

    千次阅读 2017-06-16 10:36:09
    lock-free:需要取得锁的线程在有限步骤或时间内内就可以成功(多数线程都会成功,一些可能失败,比wait-free语义稍弱) wait-free:需要取得锁的线程在有限步骤或时间内内就可以成功(任意线程都会成功,语义更加强烈...
  • wait-free buffer

    2020-12-08 23:38:58
    <p>The proposed solution here is to use an array of buffers instead of a single buffer, with a wait-free lock on each buffer that goroutines have to acquire before writing metrics. They don't ...
  • Blocking vs Starvation-Free vs Obstruction-Free vs Lock-Free vs Wait-Free vs Wait-Free Bounded vs Wait-Free Population Oblivious   Blocking:   Starvation-Free:   Obstruction-Free:   Lock...
  • There are two types of non-blocking thread synchronization algorithms - lock-free, and wait-free. https://rethinkdb.com/blog/lock-free-vs-wait-free-concurrency ...
  • A Wait-Free MPMCQueue

    2020-12-28 08:09:17
    ve just created a wait-free MPMC based on an idea by this project, can you help take a look: <a href="https://github.com/MengRao/WFMPMC">WFMPMC</a>. <p>Thanks.</p><p>该提问来源于开源项目:...
  • Wait-free WAL

    2020-12-27 00:40:39
    <div><h3>New struct <pre><code> pub struct Wal { files: FileLocator, sequence_number: AtomicU64, total_size: AtomicU64, active_file: crossbeam_epoch::Atomic<file>, file_rollover_size: u64, ...
  • Busy wait-free TSCH

    2020-12-26 03:46:04
    * [ ] CCA check: not enabled by default, but if enabled, the CCA check is done as a busy wait * [ ] Tx and ACK Tx: until we have async RF operations ...
  • Fast Wait-free Queue

    2017-11-29 21:19:42
    本文介绍一种高效的,很有价值的无等待队列实现 fast wait-free queue。它基于FAA(fetch-and-add)+CAS(compare-and-swap)来实现。第一步的FAA避开高度竞争情况下第一个CAS造成的竞争。同时它还提供最强的wait-...
  •   原文:...Over the past two decades the research community has developed a body of knowledge concerning “Lock-Free” and “Wait-Free” algorithms and data struct...
  • 一个Wait-Free MPMC队列的实现

    千次阅读 2018-10-18 13:57:16
    昨天在wait-free是指什么的评论中,我和朱元兄对wait-free的MPMC(多生产者多消费者)队列进行了一番探讨,也激发了我对已有的wait-free思路进行更深入的挖掘,结果发现MPMC队列有可能做到完全wait-free。...
  • 所谓lock-freewait-free算法是指对于共享的数据并非对其加锁来控制访问,而是多个线程并行的访问。通过该算法可以达到对共享对象并发的读写而不会破坏对象本身。所谓lock-free是指对于线程不加锁,让系统执行所有...
  • Lock-free VS wait-free

    千次阅读 2015-01-07 11:21:52
    转载自并发编程网 有两种非阻塞线程同步算法,即无锁和无等待,这两种算法经常会产生混淆。 ...在无锁系统中,当任何特定的运算被阻塞的时候,所有CPU可以继续处理其他的运算。换种方式说,在无锁系统中,当给定...
  • <div><p>it provides a full-fat wait-free <code>relaxedPoll</code> for the mpmc xadd q. Soon I will publish some results to check how/if it improves if compared with <code>poll</code>. I'm not sure...
  • ArcCell: wait-free get()

    2021-01-08 16:28:54
    <code>set()</code> is wait-free with respect to other <code>set()</code>s. Whenever any <code>get()</code> arrives, the <code>set()</code> will block and have to wait until no concurrent <code>get()...
  • 所谓lock-freewait-free算法是指对于共享的数据并非对其加锁来控制访问,而是多个线程并行的访问。通过该算法可以达到对共享对象并发的读写而不会破坏对象本身。所谓lock-free是指对于线程不加锁,让系统执行所有...
  • ZooKeeper Wait-free coordination for Internet-scale systems
  • <div><p>JdbcConnectionPool comes with a object level lock for ...<p>A wait-free implement with atomic types and wait-free queue may help.</p><p>该提问来源于开源项目:h2database/h2database</p></div>
  • wait-free 和 lock-free 资料收集

    千次阅读 2011-12-26 15:26:16
    等待无关(Wait-Free)/锁无关(Lock-Free)与基于锁(Lock-Based)的比较 一个“等待无关”的程序可以在有限步之内结束,而不管其它线程的相对速度如何。, @% B- m6 I/ O: k0 N7 W f0 [ 一个“锁无关”的程序...
  • perf: wait-free WAL

    2020-12-30 20:46:44
    <div><p>https://github.com/influxdata/influxdb_iox/issues/437</p><p>该提问来源于开源项目:influxdata/influxdb_iox</p></div>
  • A Lock-Free Wait-Free Hash TableDr. Cliff Click Distinguished Engineer• Constant-Time Key-Value Mapping • Fast arbitrary function • Extendable, defined at runtime • Used for symbol tables, DB ...
  • lock-free/wait-free算法以及ABA问题

    千次阅读 2012-03-18 14:14:04
    Came for the beer, stayed for the Freedom ...Introduction to lock-free/wait-free and the ABA problem I recently read a paper on Hazard Pointers[2] and thought I would share what I learned and some
  • <p>This wait-free write method provides a <strong>stronger progress guarantee, theoretically. But, it doesn't as fast as the current one actually :( If not, please ignore.</p><p>该提问来源于开源...
  • 所以使用一种更快速的类似wait-free队列,从而获得更好的公平性。 几乎所有的无锁算法(先不考虑垃圾回收),对于一个线程而言,都是一个如下的过程: 要么成功(本操作make progress)、要么失败(其他线程的操作...
  • WAITEVENT latch free Reference Note 文档 ID 34576 1

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,824
精华内容 1,529
关键字:

waitfree