为您推荐:
精华内容
最热下载
问答
  • 5星
    2KB lifexx 2016-08-15 17:28:01
  • 由以下博客的分析可以知道,内核的kfifo使用了很多技巧以实现...还用设置buffer缓冲区的大小为2的幂次方,以简化求模运算,这样求模运算就演变为(fifo->in & (fifo->size - 1))。通过使用unsigned int为kf...

    由以下博客的分析可以知道,内核的kfifo使用了很多技巧以实现其高效性。比如,通过限定写入的数据不能溢出和内存屏障实现在单线程写单线程读的情况下不使用锁。因为锁是使用在共享资源可能存在冲突的情况下。还用设置buffer缓冲区的大小为2的幂次方,以简化求模运算,这样求模运算就演变为 (fifo->in & (fifo->size - 1))。通过使用unsigned int为kfifo的下标,可以不用考虑每次下标超过size时对下表进行取模运算赋值,这里使用到了无符号整数的溢出回零的特性。由于指示读写指针的下标一直在增加,没有进行取模运算,知道其溢出,在这种情况下写满和读完就是不一样的标志,写满是两者指针之差为fifo->size,读完的标志是两者指针相等。后面有一篇博客还介绍了VxWorks下的环形缓冲区的实现机制点击打开链接,从而可以看出linux下的fifo的灵巧性和高效性。

     

    通过这篇文章也了解到了一些计算机体系结构的知识:多核计算机,每个核都有一个cache。

     

    眉目传情之匠心独运的kfifo

     

    Author:Echo Chen(陈斌)

    Email:chenb19870707@gmail.com

    Blog:Blog.csdn.net/chen19870707

    Date:October 8th, 2014

          学不考儒,务掇精华;文不按古,匠心独运。Linux kernal 鬼斧神工,博大精深,让人叹为观止,拍手叫绝。然匠心独运的设计并非扑朔迷离、盘根错节,真正的匠心独运乃辞简理博、化繁为简,在简洁中昭显优雅和智慧,kfifo就是这样一种数据结构,它就是这样简约高效,匠心独运,妙不可言,下面就跟大家一起探讨学习。

     

    一、kfifo概述

    本文分析的原代码版本2.6.32.63
    kfifo的头文件include/linux/kfifo.h
    kfifo的源文件kernel/kfifo.c

     

    kfifo是一种"First In First Out “数据结构,它采用了前面提到的环形缓冲区来实现,提供一个无边界的字节流服务。采用环形缓冲区的好处为,当一个数据元素被用掉后,其余数据元素不需要移动其存储位置,从而减少拷贝提高效率。更重要的是,kfifo采用了并行无锁技术,kfifo实现的单生产/单消费模式的共享队列是不需要加锁同步的。

       1: struct kfifo {
       2:     unsigned char *buffer;    /* the buffer holding the data */
       3:     unsigned int size;    /* the size of the allocated buffer */
       4:     unsigned int in;    /* data is added at offset (in % size) */
       5:     unsigned int out;    /* data is extracted from off. (out % size) */
       6:     spinlock_t *lock;    /* protects concurrent modifications */
       7: };
    buffer用于存放数据的缓存
    size缓冲区空间的大小,在初化时,将它向上圆整成2的幂
    in指向buffer中队头
    out指向buffer中的队尾
    lock如果使用不能保证任何时间最多只有一个读线程和写线程,必须使用该lock实施同步。

     

    它的结构如图:

    image

    这看起来与普通的环形缓冲区没有什么差别,但是让人叹为观止的地方就是它巧妙的用 in 和 out 的关系和特性,处理各种操作,下面我们来详细分析。

     

    二、kfifo内存分配和初始化

     

    首先,看一个很有趣的函数,判断一个数是否为2的次幂,按照一般的思路,求一个数n是否为2的次幂的方法为看 n % 2 是否等于0, 我们知道“取模运算”的效率并没有 “位运算” 的效率高,有兴趣的同学可以自己做下实验。下面再验证一下这样取2的模的正确性,若n为2的次幂,则n和n-1的二进制各个位肯定不同 (如8(1000)和7(0111)),&出来的结果肯定是0;如果n不为2的次幂,则各个位肯定有相同的 (如7(0111) 和6(0110)),&出来结果肯定为0。是不是很巧妙?

       1: bool is_power_of_2(unsigned long n)
       2: {
       3:     return (n != 0 && ((n & (n - 1)) == 0));
       4: }

    再看下kfifo内存分配和初始化的代码,前面提到kfifo总是对size进行2次幂的圆整,这样的好处不言而喻,可以将kfifo->size取模运算可以转化为与运算,如下:
               kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1)

     

    “取模运算”的效率并没有 “位运算” 的效率高还记得不,不放过任何一点可以提高效率的地方。

       1: struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
       2: {
       3:     unsigned char *buffer;
       4:     struct kfifo *ret;
       5:  
       6:     /*
       7:      * round up to the next power of 2, since our 'let the indices
       8:      * wrap' technique works only in this case.
       9:      */
      10:     if (!is_power_of_2(size)) {
      11:         BUG_ON(size > 0x80000000);
      12:         size = roundup_pow_of_two(size);
      13:     }
      14:  
      15:     buffer = kmalloc(size, gfp_mask);
      16:     if (!buffer)
      17:         return ERR_PTR(-ENOMEM);
      18:  
      19:     ret = kfifo_init(buffer, size, gfp_mask, lock);
      20:  
      21:     if (IS_ERR(ret))
      22:         kfree(buffer);
      23:  
      24:     return ret;
      25: }

     

    三、kfifo并发无锁奥秘---内存屏障

      

       为什么kfifo实现的单生产/单消费模式的共享队列是不需要加锁同步的呢?天底下没有免费的午餐的道理人人都懂,下面我们就来看看kfifo实现并发无锁的奥秘。

    我们知道 编译器编译源代码时,会将源代码进行优化,将源代码的指令进行重排序,以适合于CPU的并行执行。然而,内核同步必须避免指令重新排序,优化屏障(Optimization barrier)避免编译器的重排序优化操作,保证编译程序时在优化屏障之前的指令不会在优化屏障之后执行

    举个例子,如果多核CPU执行以下程序:

       1: a = 1;
       2: b = a + 1;
       3: assert(b == 2);

    假设初始时a和b的值都是0,a处于CPU1-cache中,b处于CPU0-cache中。如果按照下面流程执行这段代码:

    1 CPU0执行a=1; 
    2 因为a在CPU1-cache中,所以CPU0发送一个read invalidate消息来占有数据 
    3 CPU0将a存入store buffer 
    4 CPU1接收到read invalidate消息,于是它传递cache-line,并从自己的cache中移出该cache-line 
    5 CPU0开始执行b=a+1; 
    6 CPU0接收到了CPU1传递来的cache-line,即“a=0” 
    7 CPU0从cache中读取a的值,即“0” 
    8 CPU0更新cache-line,将store buffer中的数据写入,即“a=1” 
    9 CPU0使用读取到的a的值“0”,执行加1操作,并将结果“1”写入b(b在CPU0-cache中,所以直接进行) 
    10 CPU0执行assert(b == 2); 失败

    软件可通过读写屏障强制内存访问次序。读写屏障像一堵墙,所有在设置读写屏障之前发起的内存访问,必须先于在设置屏障之后发起的内存访问之前完成,确保内存访问按程序的顺序完成。Linux内核提供的内存屏障API函数说明如下表。内存屏障可用于多处理器和单处理器系统,如果仅用于多处理器系统,就使用smp_xxx函数,在单处理器系统上,它们什么都不要。

    smp_rmb
    适用于多处理器的读内存屏障。
    smp_wmb
    适用于多处理器的写内存屏障。
    smp_mb
    适用于多处理器的内存屏障。

    如果对上述代码加上内存屏障,就能保证在CPU0取a时,一定已经设置好了a = 1:

     

       1: void foo(void)
       2: {
       3:  a = 1;
       4:  smp_wmb();
       5:  b = a + 1;
       6: }

    这里只是简单介绍了内存屏障的概念,如果想对内存屏障有进一步理解,请参考我的译文《为什么需要内存屏障》。

     

     

    四、kfifo的入队__kfifo_put和出队__kfifo_get操作

     

          __kfifo_put是入队操作,它先将数据放入buffer中,然后移动in的位置,其源代码如下:

       1: unsigned int __kfifo_put(struct kfifo *fifo,
       2:             const unsigned char *buffer, unsigned int len)
       3: {
       4:     unsigned int l;
       5:  
       6:     len = min(len, fifo->size - fifo->in + fifo->out);
       7:  
       8:     /*
       9:      * Ensure that we sample the fifo->out index -before- we
      10:      * start putting bytes into the kfifo.
      11:      */
      12:  
      13:     smp_mb();
      14:  
      15:     /* first put the data starting from fifo->in to buffer end */
      16:     l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
      17:     memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
      18:  
      19:     /* then put the rest (if any) at the beginning of the buffer */
      20:     memcpy(fifo->buffer, buffer + l, len - l);
      21:  
      22:     /*
      23:      * Ensure that we add the bytes to the kfifo -before-
      24:      * we update the fifo->in index.
      25:      */
      26:  
      27:     smp_wmb();
      28:  
      29:     fifo->in += len;
      30:  
      31:     return len;
      32: }

     

    6行,环形缓冲区的剩余容量为fifo->size - fifo->in + fifo->out,让写入的长度取len和剩余容量中较小的,避免写越界;

    13行,加内存屏障,保证在开始放入数据之前,fifo->out取到正确的值(另一个CPU可能正在改写out值)

    16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1),所以fifo->size - (fifo->in & (fifo->size - 1)) 即位 fifo->in 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为需要拷贝l 字节到fifo->buffer + fifo->in的位置上。

    17行,拷贝l 字节到fifo->buffer + fifo->in的位置上,如果l = len,则已拷贝完成,第20行len – l 为0,将不执行,如果l = fifo->size - (fifo->in & (fifo->size - 1)) ,则第20行还需要把剩下的 len – l 长度拷贝到buffer的头部。

    27行,加写内存屏障,保证in 加之前,memcpy的字节已经全部写入buffer,如果不加内存屏障,可能数据还没写完,另一个CPU就来读数据,读到的缓冲区内的数据不完全,因为读数据是通过 in – out 来判断的。

    29行,注意这里 只是用了 fifo->in +=  len而未取模,这就是kfifo的设计精妙之处,这里用到了unsigned int的溢出性质,当in 持续增加到溢出时又会被置为0,这样就节省了每次in向前增加都要取模的性能,锱铢必较,精益求精,让人不得不佩服。

    __kfifo_get是出队操作,它从buffer中取出数据,然后移动out的位置,其源代码如下:

       1: unsigned int __kfifo_get(struct kfifo *fifo,
       2:              unsigned char *buffer, unsigned int len)
       3: {
       4:     unsigned int l;
       5:  
       6:     len = min(len, fifo->in - fifo->out);
       7:  
       8:     /*
       9:      * Ensure that we sample the fifo->in index -before- we
      10:      * start removing bytes from the kfifo.
      11:      */
      12:  
      13:     smp_rmb();
      14:  
      15:     /* first get the data from fifo->out until the end of the buffer */
      16:     l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
      17:     memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
      18:  
      19:     /* then get the rest (if any) from the beginning of the buffer */
      20:     memcpy(buffer + l, fifo->buffer, len - l);
      21:  
      22:     /*
      23:      * Ensure that we remove the bytes from the kfifo -before-
      24:      * we update the fifo->out index.
      25:      */
      26:  
      27:     smp_mb();
      28:  
      29:     fifo->out += len;
      30:  
      31:     return len;
      32: }

     

    6行,可去读的长度为fifo->in – fifo->out,让读的长度取len和剩余容量中较小的,避免读越界;

    13行,加读内存屏障,保证在开始取数据之前,fifo->in取到正确的值(另一个CPU可能正在改写in值)

    16行,前面讲到fifo->size已经2的次幂圆整,而且kfifo->out % kfifo->size 可以转化为 kfifo->out & (kfifo->size – 1),所以fifo->size - (fifo->out & (fifo->size - 1)) 即位 fifo->out 到 buffer末尾所剩余的长度,l取len和剩余长度的最小值,即为从fifo->buffer + fifo->in到末尾所要去读的长度。

    17行,从fifo->buffer + fifo->out的位置开始读取l长度,如果l = len,则已读取完成,第20行len – l 为0,将不执行,如果l =fifo->size - (fifo->out & (fifo->size - 1)) ,则第20行还需从buffer头部读取 len – l 长。

    27行,加内存屏障,保证在修改out前,已经从buffer中取走了数据,如果不加屏障,可能先执行了增加out的操作,数据还没取完,令一个CPU可能已经往buffer写数据,将数据破坏,因为写数据是通过fifo->size - (fifo->in & (fifo->size - 1))来判断的 。

    29行,注意这里 只是用了 fifo->out +=  len 也未取模,同样unsigned int的溢出性质,当out 持续增加到溢出时又会被置为0,如果in先溢出,出现 in  < out 的情况,那么 in – out 为负数(又将溢出),in – out 的值还是为buffer中数据的长度。

     

    这里图解一下 in 先溢出的情况,size = 64, 写入前 in = 4294967291, out = 4294967279 ,数据 in – out = 12;

    image

        写入 数据16个字节,则 in + 16 = 4294967307,溢出为 11,此时 in – out = –4294967268,溢出为28,数据长度仍然正确,由此可见,在这种特殊情况下,这种计算仍然正确,是不是让人叹为观止,妙不可言?

     

    image

    五、扩展

              kfifo设计精巧,妙不可言,但主要为内核提供服务,内存屏障函数也主要为内核提供服务,并未开放出来,但是我们学习到了这种设计巧妙之处,就可以依葫芦画瓢,写出自己的并发无锁环形缓冲区,这将在下篇文章中给出,至于内存屏障函数的问题,好在gcc 4.2以上的版本都内置提供__sync_synchronize()这类的函数,效果相差不多。《眉目传情之并发无锁环形队列的实现》给出自己的并发无锁的实现,有兴趣的朋友可以参考一下。

     

    Reference

    1.http://blog.csdn.net/xujianqun/article/details/7800813

    2.http://zh.wikipedia.org/wiki/%E7%92%B0%E5%BD%A2%E7%B7%A9%E8%A1%9D%E5%8D%80#.E7.94.A8.E6.B3.95

    3.http://blog.csdn.net/linyt/article/details/5764312

     

    -

    Echo Chen:Blog.csdn.net/chen19870707

    -

    展开全文
    zxh2075 2019-04-10 19:31:46
  • 前段时间有个项目要实现一个基于live555的rtspserver,部分功能要用到环形缓冲区,网上看了一些blog,大部分是实验性质的,不太敢用,原理比较简单,所以就自己写了一个;实现环形缓冲区的关键点:1. 一个线程读,一...

    前段时间有个项目要实现一个基于live555的rtspserver,部分功能要用到环形缓冲区,网上看了一些blog,大部分是实验性质的,不太敢用,原理比较简单,所以就自己写了一个;

    实现环形缓冲区的关键点:

    1. 一个线程读,一个线程写

    2. 读线程维护读指针,写线程维护写指针

    3. 数据一致性

    3.1 写线程写数据时,要先确定读指针;读线程读数据时,要先确定写指针;

    这里写的可能比较拗口,其实就是 写线程写数据时,需要多次用使用读指针,比如说计算ringbuf可用空间,是否达到ringbuf末尾等等;由于读指针是在读线程里实时更新的,所以写线程写数据函数多次使用读指针时,读指针的值会不一样;解决这个问题只需要在 读/写 函数 开始处 定义一个临时变量,保存 读/写 指针的值,后续计算都使用该临时变量就OK了;

    3.2 这里多说几句废话;

    ringbuf实现类中可能不仅会开放 ReadData/WriteData接口,还会有类似GetRingbufDataLen的函数,这类函数内部肯定要使用读写指针,这个时候就要注意,如果ReadData/WriteData函数调用这类函数,要保证它们和ReadData/WriteData 使用的读写指针的值是一致的。

    说完废话贴代码:

    //left 1Byte for guard

    int RingBuffer::GetFreeBufferBytes(int iRidx, int iWidx)

    {

    if (iRidx == iWidx)

    {

    return m_uBufferSize-1;

    }

    else if (iWidx > iRidx)

    {

    return (iRidx - iWidx + m_uBufferSize - 1);

    }

    else

    {

    return (iRidx - iWidx - 1);

    }

    }

    //this func can write to the addr = ridx-1 (at most)

    int RingBuffer::Write(const unsigned char* pBuf, unsigned writeLen)

    {

    //m_pBuffer may alloc memory failed

    if (!m_pBuffer)

    {

    return -1;

    }

    int iRidx = m_iRIdx;

    int iWidx = m_iWidx;

    if (!pBuf || 0 == writeLen || GetFreeBufferBytes(iRidx, iWidx) < writeLen)

    {

    return -1;

    }

    int len1 = 0;

    if (m_iWidx < iRidx)

    {

    memcpy(&m_pBuffer[m_iWidx], pBuf, writeLen);

    m_iWidx += writeLen;

    }

    else

    {

    len1 = m_uBufferSize - m_iWidx;

    if (writeLen <= len1)

    {

    memcpy(&m_pBuffer[m_iWidx], pBuf, writeLen);

    m_iWidx += writeLen;

    }

    else

    {

    memcpy(&m_pBuffer[m_iWidx], pBuf, len1);

    memcpy(m_pBuffer, pBuf + len1, writeLen - len1);

    m_iWidx = writeLen - len1;

    }

    }

    return writeLen;

    }

    //

    int RingBuffer::Read(unsigned char* pBuf, unsigned readLen)

    {

    //m_pBuffer may alloc memory failed

    if (!m_pBuffer)

    {

    return -1;

    }

    if (!pBuf)

    {

    return -1;

    }

    int iWidx = m_iWidx;

    int iRidx = m_iRIdx;

    int bufferDataLen = m_uBufferSize - GetFreeBufferBytes(iRidx, iWidx) - 1;

    if (bufferDataLen <= readLen)

    {

    //can not use readall here, because GetFreeBufferBytes func and readall func may use the different

    //ridx and widx

    return ReadToWidx(pBuf, iWidx);

    }

    else

    {

    if (m_iRIdx < iWidx)

    {

    memcpy(pBuf, &m_pBuffer[m_iRIdx], readLen);

    m_iRIdx += readLen;

    }

    else

    {

    int len1 = m_uBufferSize - m_iRIdx;

    if (len1 >= readLen)

    {

    memcpy(pBuf, &m_pBuffer[m_iRIdx], readLen);

    m_iRIdx += readLen;

    }

    else

    {

    memcpy(pBuf, &m_pBuffer[m_iRIdx], len1);

    memcpy(pBuf + len1, m_pBuffer, readLen - len1);

    m_iRIdx = readLen - len1;

    }

    } //end m_iRIdx >= m_iWidx

    return readLen;

    }//end bufferDataLen > readLen

    }

    // read to widx

    int RingBuffer::ReadToWidx(unsigned char* pBuf, int iWidx)

    {

    //m_pBuffer may alloc memory failed

    if (!m_pBuffer)

    {

    return -1;

    }

    if (!pBuf || m_iWidx == m_iRIdx)

    {

    return -1;

    }

    int curWidx = m_iWidx;

    if (m_iRIdx < curWidx)

    {

    if (iWidx < m_iRIdx || iWidx > curWidx)

    {

    return -1;

    }

    }

    else

    {

    if (iWidx > curWidx && iWidx < m_iRIdx)

    {

    return -1;

    }

    }

    //must use temp varible here

    //int iWidx = m_iWidx;

    int readLen = 0;

    if (m_iRIdx > iWidx)

    {

    memcpy(pBuf, &m_pBuffer[m_iRIdx], m_uBufferSize - m_iRIdx);

    memcpy(pBuf + m_uBufferSize - m_iRIdx, m_pBuffer, iWidx);

    readLen = m_uBufferSize - m_iRIdx + iWidx;

    }

    else

    {

    memcpy(pBuf, &m_pBuffer[m_iRIdx], iWidx - m_iRIdx);

    readLen = iWidx - m_iRIdx;

    }

    //###can not set m_iRIdx = m_iWidx!!!!!

    m_iRIdx = iWidx;

    return readLen;

    }

    int RingBuffer::ReadAll(unsigned char* pBuf)

    {

    //m_pBuffer may alloc memory failed

    if (!m_pBuffer)

    {

    return -1;

    }

    if (!pBuf || m_iWidx == m_iRIdx)

    {

    return -1;

    }

    return ReadToWidx(pBuf, m_iWidx);

    } 关键函数就这些了,有些地方写的比较繁琐,因为已经测试通过了,当做工具类,暂时就不改了...

    展开全文
    weixin_35403151 2021-05-22 06:49:30
  • DEFINE_KFIFO宏参数1是一个变量名(调用者只给一个名称,宏内部负责定义类型和变量),参数2是成员类型(自定义结构体,也可以是任意其他类型),参数3是缓冲区成员个数(必须是2的幂)。 DEFINE_KFIFO(g_canbuf, ...

    kfifo的移植

    两个月前,我花了两天时间,查找Linux内核里kfifo的相关资料,将其从内核层移植到应用层,并成功应用于多线程CAN总线采集程序(一个线程接收/一个线程输出)。kfifo.c是从Linux 5.3 stable内核代码里复制出来的,路径是lib/kfifo,对应的kfifo.h路径是include/linux/kfifo.h。由于kfifo是内核里的代码,应用层无法直接使用,我做了如下修改:

    • 注释掉无关的或不必要的代码,如对内核头文件的引用,如涉及dma、sgl的代码

    • 重新实现某些功能,如采用SO上的代码取代了roundup_pow_of_two,用GCC内置函数__sync_synchronize取代了smp_wmb,重新定义了ARRAY_SIZE

    代码仓库:https://github.com/liigo/kfifo

    kfifo的使用

    很简单就三点:用 DEFINE_KFIFO 定义变量并初始化,用 kfifo_in 向缓冲区内写入数据,用 kfifo_out 从缓冲区取出数据。DEFINE_KFIFO宏参数1是一个变量名(调用者只给一个名称,宏内部负责定义类型和变量),参数2是成员类型(自定义结构体,也可以是任意其他类型),参数3是缓冲区成员个数(必须是2的幂)。

    DEFINE_KFIFO(g_canbuf, buf_item_t, 1024);
    kfifo_in(&g_canbuf, &bufitem, 1);
    int n = kfifo_out(&g_canbuf, &bufitem, 1);
    

    kfifo代码里大量使用宏,理解起来很费劲,主要因为宏参数的类型不明确。


    20210528 Liigo 补记,关于kfifo对象的定义和初始化,大致有以下几种模式:

    • 模式1. DEFINE_KFIFO(name, int, 1024); 定义变量name并对其初始化。可用于定义全局变量和结构体变量。
    • 模式2. DECLARE_KFIFO(name, int, 1024); + INIT_KFIFO(name); 定义和初始化分开,这样一来前者可以出现在结构体里面用于定义结构体成员,比模式1更灵活。另外某些旧版gcc不支持模式1的也可使用此模式。
    • 模式3. DECLARE_KFIFO_PTR(name, int); + kfifo_init(&name, buf, size); 另一种形式的定义+初始化,缓冲区不在对象内部而是由程序员另行动态分配。相比前两模式,此模式可在运行时确定缓冲区大小(单位是字节),更加灵活;而且如此定义的name变量占用内存较小(32位系统下占24字节)。

    kfifo的设计和实现

    关于Linux内核kfifo的设计和实现的精妙之处,推荐大家阅读如下文章:

    需要特别说明的是,以上第三方分析文章所基于的kfifo内核版本都相对陈旧,而本文所采用的代码所属内核版本是当前最新的5.3。

    展开全文
    liigo 2019-09-18 18:46:45
  • 14KB weixin_38748263 2021-01-28 12:41:44
  • 3KB weixin_42111465 2021-05-18 03:11:52
  • kfifo是内核里面的一个First In First Out数据结构,它采用环形循环队列的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场...

    原文:CSDN博主-海枫

    http://blog.csdn.net/linyt/article/details/5764312

     

    Linux kernel里面从来就不缺少简洁,优雅和高效的代码,只是我们缺少发现和品味的眼光。在Linux kernel里面,简洁并不表示代码使用神出鬼没的超然技巧,相反,它使用的不过是大家非常熟悉的基础数据结构,但是kernel开发者能从基础的数据结构中,提炼出优美的特性。

    kfifo就是这样的一类优美代码,它十分简洁,绝无多余的一行代码,却非常高效。关于kfifo信息如下:

    本文分析的原代码版本:2.6.24.4

    kfifo的定义文件:kernel/kfifo.c

    kfifo的头文件:  include/linux/kfifo.h

    1. kfifo概述

    kfifo是内核里面的一个First In First Out数据结构,它采用环形循环队列的数据结构来实现;它提供一个无边界的字节流服务,最重要的一点是,它使用并行无锁编程技术,即当它用于只有一个入队线程和一个出队线程的场情时,两个线程可以并发操作,而不需要任何加锁行为,就可以保证kfifo的线程安全。

    kfifo代码既然肩负着这么多特性,那我们先一敝它的代码:

    struct kfifo {   
        unsigned char *buffer;    /* the buffer holding the data */   
        unsigned int size;    /* the size of the allocated buffer */   
        unsigned int in;    /* data is added at offset (in % size) */   
        unsigned int out;    /* data is extracted from off. (out % size) */   
        spinlock_t *lock;    /* protects concurrent modifications */   
    };  

      

    这是kfifo的数据结构,kfifo主要提供了两个操作,__kfifo_put(入队操作)和__kfifo_get(出队操作)。 它的各个数据成员如下:

    buffer, 用于存放数据的缓存

    size,      buffer空间的大小,在初化时,将它向上扩展成2的幂

    lock,      如果使用不能保证任何时间最多只有一个读线程和写线程,需要使用该lock实施同步。

    in, out,  和buffer一起构成一个循环队列。 in指向buffer中队头,而且out指向buffer中的队尾,它的结构如示图如下:

    +--------------------------------------------------------------+ 
    |           |<----------data---------->|                                    | 
    +--------------------------------------------------------------+ 
                ^                                       ^  
                 |                                        | 
                in                                       out

    当然,内核开发者使用了一种更好的技术处理了in, out和buffer的关系,我们将在下面进行详细的分析。

    2. kfifo_alloc 分配kfifo内存和初始化工作

    struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)   
    {   
        unsigned char *buffer;   
        struct kfifo *ret;   
      
        /*  
         * round up to the next power of 2, since our 'let the indices  
         * wrap' tachnique works only in this case.  
         */   
        if (size & (size - 1)) {   
            BUG_ON(size > 0x80000000);   
            size = roundup_pow_of_two(size);   
        }   
      
        buffer = kmalloc(size, gfp_mask);   
        if (!buffer)   
            return ERR_PTR(-ENOMEM);   
      
        ret = kfifo_init(buffer, size, gfp_mask, lock);   
      
        if (IS_ERR(ret))   
            kfree(buffer);   
      
        return ret;   
    }   

     

    这里值得一提的是,kfifo->size的值总是在调用者传进来的size参数的基础上向2的幂扩展,这是内核一贯的做法。这样的好处不言而喻--对kfifo->size取模运算可以转化为与运算,如下:

    kfifo->in % kfifo->size 可以转化为 kfifo->in & (kfifo->size – 1)

    在kfifo_alloc函数中,使用size & (size – 1)来判断size 是否为2幂,如果条件为真,则表示size不是2的幂,然后调用roundup_pow_of_two将之向上扩展为2的幂。 这些都是很常用的技巧,只不过大家没有将它们结合起来使用而已,下面要分析的__kfifo_put和__kfifo_get则是将kfifo->size的特点发挥到了极致。

     

    3. __kfifo_put和__kfifo_get,巧妙的入队和出队操作,无锁并发

    __kfifo_put是入队操作,它先将数据放入buffer里面,最后才修改in参数;__kfifo_get是出队操作,它先将数据从buffer中移走,最后才修改out。你会发现in和out两者各司其职。计算机科学家已经证明,当只有一个读经程和一个写线程并发操作时,不需要任何额外的锁,就可以确保是线程安全的,也即kfifo使用了无锁编程技术,以提高kernel的并发。

    下面是__kfifo_put和__kfifo_get的代码

    unsigned int __kfifo_put(struct kfifo *fifo,   
                 unsigned char *buffer, unsigned int len)   
    {   
        unsigned int l;   
      
        len = min(len, fifo->size - fifo->in + fifo->out);   
      
        /*  
         * Ensure that we sample the fifo->out index -before- we  
         * start putting bytes into the kfifo.  
         */   
      
        smp_mb();   
      
        /* first put the data starting from fifo->in to buffer end */   
        l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));   
        memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);   
      
        /* then put the rest (if any) at the beginning of the buffer */   
        memcpy(fifo->buffer, buffer + l, len - l);   
      
        /*  
         * Ensure that we add the bytes to the kfifo -before-  
         * we update the fifo->in index.  
         */   
      
        smp_wmb();   
      
        fifo->in += len;   
      
        return len;   
    }  
      
    unsigned int __kfifo_get(struct kfifo *fifo,   
                 unsigned char *buffer, unsigned int len)   
    {   
        unsigned int l;   
      
        len = min(len, fifo->in - fifo->out);   
      
        /*  
         * Ensure that we sample the fifo->in index -before- we  
         * start removing bytes from the kfifo.  
         */   
      
        smp_rmb();   
      
        /* first get the data from fifo->out until the end of the buffer */   
        l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));   
        memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);   
      
        /* then get the rest (if any) from the beginning of the buffer */   
        memcpy(buffer + l, fifo->buffer, len - l);   
      
        /*  
         * Ensure that we remove the bytes from the kfifo -before-  
         * we update the fifo->out index.  
         */   
      
        smp_mb();   
      
        fifo->out += len;   
      
        return len;   
    }   

     认真读两遍吧,我也读了多次,每次总是有新发现,因为in, out和size的关系太巧妙了,竟然能利用上unsigned int回绕的特性。

    原来,kfifo每次入队或出队,kfifo->in或kfifo->out只是简单地kfifo->in/kfifo->out += len,并没有对kfifo->size 进行取模运算。因此kfifo->in和kfifo->out总是一直增大,直到unsigned in最大值时,又会绕回到0这一起始端。但始终满足kfifo->out < kfifo->in,除非kfifo->in回绕到了0的那一端,即使如此,代码中计算长度的性质仍然是保持的。

    我们先用简单的例子来形象说明这些性质吧:

    +----------------------------------------+ 
    |                                   |<—data--->|  | 
    +----------------------------------------+ 
                                        ^                 ^ 
                                        |                   | 
                                        out               in

     

    上图的out和in为kfifo->buffer的出队数据和入队数据的一下,方框为buffer的内存区域。当有数据入队时,那么in的值可能超过kfifo->size的值,那么我们使用另一个虚拟的方框来表示in变化后,在buffer内对kfifo->size取模的值。如下图如标:

     

         真实的buffer内存                                     虚拟的buffer内存,方便查看in对kfifo->size取模后在buffer的下标 
    +----------------------------------------+ +------------------------------------+ 
    |                                   |<—data-------|  |--------->|                                    | 
    +----------------------------------------+ +------------------------------------+ 
                                        ^                                       ^ 
                                        |                                        | 
                                        out                                    in

    当用户调用__kfifo_put函数,入队的数据使kfifo的内存关系,引起上述两图的变化时,要拷贝两次内存。

    因为入队数据,一部存放在kfifo->buffer的尾部,另一部分存放在kfifo->buffer的头部,计算公式非常简单。

    l = kfifo->size – kfifo->in & (kfifo->size – 1) 表示in下标到buffer末尾,还有多少空间。

    如果len表示需要拷贝的长度的话,那么len - l则表示有多少字节需要拷贝到buffer开始之处。

    这样,我们读__kfifo_put代码就很容易了。

    len = min(len, fifo->size - fifo->in + fifo->out);

    fifo->in – fifo->out表示队列里面已使用的空间大小,fifo->size - (fifo->in – fifo->out)表示队列未使用的空间,

    因此en = min(…),取两者之小,表示实际要拷贝的字节数。

    拷贝len个字符数,fifo->in到buffer末尾所剩的空间是多少,这里面计算:

    l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); 
    memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

    /* then put the rest (if any) at the beginning of the buffer */ 
    memcpy(fifo->buffer, buffer + l, len - l);

    l表示len或fifo->in到buffer末尾所剩的空进行间大小的最小值,因为需要拷l字节到fifo->buffer + fifo->in的位置上;那么剩下要拷贝到buffer开始之处的长度为len – l,当然,此值可能会为0,为0 时,memcpy函数不进行任何拷贝。

    所有的拷贝完成后(可能是一次,也可能是两次memcpy),fifo->in 直接+= len,不需要取模运算。

    写到这里,细心的读者会发现,如果fifo->in超过了unsigned int的最大值时,而回绕到0这一端,上述的计算公式还正确吗? 答案是肯定的。

    因为fifo->size的大小是2的幂,而unsigned int空间的大小是2^32,后者刚好是前者的倍数。如果从上述两个图的方式来描述,则表示unsigned int空间的数轴上刚好可以划成一定数量个kfifo->size大小方框,没有长度多余。这样,fifo->in/fifo->out对fifo->size取模后,刚好落后对应的位置上。

    现在假设往kfifo加入数据后,使用fifo->in < fifo->out关系,如下:

    +----------------------------------------+                                   +------------------------------------+ 
    |—data—>|                                          |          ……                     |                               |<--data------| 
    +----------------------------------------+                                   +------------------------------------+ 
    |-------------------------------------------------------------------------------------------------------->| 
    0                                                                                                                                         0xffffffff 
                  ^                                                                                                                ^ 
                  |                                                                                                                  | 
                  in                                                                                                                out

     

    假设kfifo中数据的长度为ldata,那么fifo->in和fifo->out有这样的关系:fifo->in = fifo->out + ldata,并且fifo->in < fifo->out。这说明fifo->in 回绕到0这一段了,尽管如此,fifo->in和fifo->out的差距还是保持的,没有变化。即fifo->in – fifo->out仍然是ldata, 那么此时的可用空间是fifo->size – ldata = fifo->size - (fifo->in – fifo->out) = fifo->size – fifo->in + fifo->out。

    因此无论fifo->in和fifo->out谁大谁小,计算fifo剩余空间大小的公式fifo->size – fifo->in + fifo->out都正确,故可以保证__kfifo_put函数里面的长度计算均是正确的。

    __kfifo_get函数使用fifo->in – fifo->out来计算fifo内数据的空间长度,然后再后需要出队的数据,是否需要两次拷贝。其中原理和方法都和__kfifo_put是一样的。

    4. 总结

    读完kfifo代码,令我想起那首诗“众里寻他千百度,默然回首,那人正在灯火阑珊处”。不知你是否和我一样,总想追求简洁,高质量和可读性的代码,当用尽各种方法,江郞才尽之时,才发现Linux kernel里面的代码就是我们寻找和学习的对象。

     

    展开全文
    wag2765 2016-04-08 22:59:30
  • z69183787 2019-03-27 19:11:17
  • hhl3065212 2020-07-19 12:35:14
  • u013860464 2021-12-07 16:16:06
  • nicai888 2017-05-05 15:12:37
  • das59202 2019-10-02 12:32:25
  • u011740530 2017-06-26 16:33:41
  • yzf279533105 2021-11-03 19:03:58
  • Rong_Toa 2020-07-27 16:43:47
  • s2603898260 2020-11-08 14:54:06
  • s2603898260 2020-11-08 22:18:57
  • yizhiniu_xuyw 2021-02-21 15:25:48
  • wag2765 2013-07-25 15:42:18

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,378
精华内容 551
关键字:

无锁环形缓冲