精华内容
下载资源
问答
  • Linux软中断处理实现 (2012-02-10 22:32) 标签: Linux 分类: Linux内核初始化 一、概念   首先我们要知道为什么中断需要下半部 。我们可以想象一下,如果没有下半部的概念,一个网卡中断过来了...
    Linux的软中断处理实现 (2012-02-10 22:32)


    一、概念
     
    首先我们要知道为什么中断需要下半部 。我们可以想象一下,如果没有下半部的概念,一个网卡中断过来了以后会是什么样的情况。首先,我们会从网卡硬件buffer中把网卡收到的packet拷贝到系统内存中,然后对这个packet进行TCP/IP协议栈的处理。我们知道TCP/IP协议栈是一个比较复杂的软件模块,里面对packet的处理会经过非常多的步骤,首先是链路层,然后是IP层(这里又包括分片,奇偶校验之类的),然后是TCP层(TCP层的实现相当复杂,会花费比较长的时间对packet进行一些状态或者内容的分析处理),最后通过socket把packet传入用户空间。在传入用户空间之间的这些动作,都必须在中断处理中完成,因为这些操作都是在kernel中的,并且这些操作会花费比较长的时间。在这段时间里,cpu由于进入了中断门,会自动关中断,也就是说cpu不会去响应在这段时间里网卡另外发过来的中断,这样的话很有可能网卡硬件buffer会由于网卡自身的缓存不足而导致丢包    。所以linux为了解决这样的问题,把copy packet这样比较紧急的动作放在了上半部去处理(上半部默认情况下是在关中断中完成的),把协议栈这些不是特别紧急的任务放到了下半部去处理(下半部是在开中断中进行的,有就是说,处理下半部的过程中,允许cpu被其他中断打断)。

    二、软件构架和实现

    1. 一些基础数据结构

    文件softirq.c

    /*PER-CPU变量,每个cpu对应一个,描述当前cpu中关于softirq的一些状态,比如是否有softirq挂起需要执行等等*/

    irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

    typedef struct {

        unsigned int __softirq_pending; /*32位,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)*/

        unsigned long idle_timestamp;

        unsigned int __nmi_count;   /* arch dependent */

        unsigned int apic_timer_irqs;   /* arch dependent */

    } ____cacheline_aligned irq_cpustat_t;

     

    /*表示softirq最多有32种类型,实际上Linux只用了6种,见文件interrupt.h*/

    static struct softirq_action softirq_vec[32] __cacheline_aligned_in_smp;

     

    /* PLEASE, avoid to allocate new softirqs, if you need not _really_ high

       frequency threaded job scheduling. For almost all the purposes

       tasklets are more than enough. F.e. all serial device BHs et

       al. should be converted to tasklets, not to softirqs.

     */

     

    enum

    {

        HI_SOFTIRQ=0,   /*用于高优先级的tasklet*/

        TIMER_SOFTIRQ, /*用于定时器的下半部*/

        NET_TX_SOFTIRQ,/*用于网络层发包*/

        NET_RX_SOFTIRQ, /*用于网络层收包*/

        SCSI_SOFTIRQ,   /*用于SCSI设备*/

        TASKLET_SOFTIRQ /*用于低优先级的tasklet*/

    }; 

    struct softirq_action

    {

        void    (*action)(struct softirq_action *);  /*softirq的回调函数*/

        void    *data; /*传入action的参数*/

    };

    Struct softirq_action是每个softirq的配置结构,一般在系统启动的时候,6个不同的softirq,会通过函数open_softirq()来注册自己的softirq_action,实现很简单:

    void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)

    {

        softirq_vec[nr].data = data;

        softirq_vec[nr].action = action;

    }

    关键是传入的函数指针,具体指明了该softirq要实现的功能或要做的动作。

    这里分开看下这六个注册点:

    Net/core/dev.c中的net_dev_init()里面注册了网络层需要用到的收包和发包的两个softirq:

        open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);

        open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

    对应的函数指针为net_tx_action和net_rx_action,具体功能就不在本文的范围之内的。

     

    Driver/scsi/scsi.c中init_scsi()注册了SCSI_SOFTIRQ

    open_softirq(SCSI_SOFTIRQ, scsi_softirq, NULL);

     

    start_kernel() àsiftirq_init() 中注册了两种tasklet,一种是高优先级的tasklet,一直是低优先级的tasklet:

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);

           open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);

     

    start_kernel() àinit_timers() 中注册了TIMER_SOFTIRQ

           open_softirq(TIMER_SOFTIRQ, run_timer_softirq, NULL);

     

    2. softirq运行时机:

    系统在运行过程中,会在合适的地方使用函数local_softirq_pending()检查系统是否有softirq需要处理;需要时会调用函数do_softirq()进行处理。这些检查点主要包括以下几个地方:

    (1)中断过程退出函数irq_exit();

    (2)内核线程ksoftirqd;

    (3)内核网络子系统中显示调用;

    (4)函数local_bh_enable().

     

    先前我们分析irq_cpustat_t结构的时候,看到__softirq_pending字段。这是一个32位无符号的变量,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)。那么在softirq运行之前肯定就有地方设置了这个变量的各个位,才会触发到softirq运行。这个触发动作一般是在上半部中进行的,即上半部通过系统,还有下半部需要运行。触发函数如下:

    #define __raise_softirq_irqoff(nr) do { or_softirq_pending(1UL << (nr)); } while (0)

    #define or_softirq_pending(x)  (local_softirq_pending() |= (x))

    #define local_softirq_pending() \

           __IRQ_STAT(smp_processor_id(), __softirq_pending)

    #define __IRQ_STAT(cpu, member)     (irq_stat[cpu].member)

    由此可以看出,  __raise_softirq_irqoff(nr)实际上是把当前cpu的per-cpu变量irq_stat的__softirq_pending 的从右往左数的第nr位置1,这样系统就知道某个softirq需要在某个时刻运行了。

     

    当然,__raise_sofritq_irqoff()被很多地方封装过,不同子系统用自己封装的函数,比如网络子系统就用netif_rx_reschedule和net_rx_action来激活softirq。

     3.softirq的执行分析:

    我们先提到了softirq在四个地方有可能运行,最常见的就是中断过程退出函数irq_exit(),我们下面分析之,其他的触发点请大家对照代码自行分析:

     

    void irq_exit(void)

    {

           account_system_vtime(current);

           sub_preempt_count(IRQ_EXIT_OFFSET);

           /*如果在上半部中设置了per-cpu变量irq_stat的__softirq_pending字段,则运行下半部的处理函数, 从这里也可以看出,上半部和下半部一定是运行在同一个cpu上,因为上半部中是设置了per-cpu变量irq_stat本地cpu副本中的__softirq_pending中的位,而下半部也只是判断本地cpu的__sofrirq_pending中的位。这样有效的利用了cpu cache的特性*/

           if (!in_interrupt() && local_softirq_pending())

                  invoke_softirq();

           preempt_enable_no_resched();

    }

     

     Invoke_sofirq()àdo_softirq()

     

    asmlinkage void do_softirq(void)

    {

           __u32 pending;

           unsigned long flags;

            /*本地cpu中,softirq不能在中断环境中运行,这个中断环境包括了上半部和下半部,所以这里保证了同一个cpu上,下半部是不会被重入的,但不能保证其他cpu上的同时运行同一个softirq处理。所以编程人员必须让自己编写的softirq处理函数可重入,以防SMP系统中同时运行这些softirq导致数据出现不一致性*/

           if (in_interrupt())

                  return;

     

           local_irq_save(flags);  /*关中断*/

     

           pending = local_softirq_pending(); /*取得per-cpu变量irq_stat本地cpu副本的__softirq_pending的值*/

     

           if (pending) /*如果有本地cpu有softirq挂起需要处理,则通过__do_softirq()运行之,否则恢复中断并退出*/

                  __do_softirq();

     

           local_irq_restore(flags);  /*恢复开中断*/

    }

     

    关键是__do_softirq()

    asmlinkage void __do_softirq(void)

    {

           struct softirq_action *h;

           __u32 pending;

           int max_restart = MAX_SOFTIRQ_RESTART;

           int cpu;

     

           pending = local_softirq_pending();

     /*这个地方加local_bh_disable的原因是softirq的处理必须是串行的,又因为softirq的执行期间中断是打开的,所以当另一个中断被执行的话在in_interrupt函数上面就会发现这里已经有在执行了,就会自动退出*/

           local_bh_disable();


           cpu = smp_processor_id();

    restart:

           /* Reset the pending bitmask before enabling irqs */

            /*把per-cpu变量 irq_stat的本地cpu副本的__softirq_pengding字段清0,

            表示代码有信心在这一次处理中把本地cpu上挂起的所有softirq都处理掉(有信心只是开个玩笑;)*/

           set_softirq_pending(0);

     

           local_irq_enable(); /*保证下半部要在中断打开的情况下进行,否则下半部就失去意义了*/

     

           h = softirq_vec; /*softirq_vec是一个全局数组(有32个元素),存放了32种softirq的处理函数*/

     

           /*遍历unsigned int pengding的每一位,如果有被置为1,则运行对应的下半部处理函数action*/

           do {

                  if (pending & 1) {

                           /*action是一个处理队列,对于非tasklet的softirq来说只有一个元素,

                           但对于tasklet来说,就有N个函数需要处理*/

                         h->action(h);

                         rcu_bh_qsctr_inc(cpu);

                  }

                  h++;

                  pending >>= 1;

           } while (pending);

     

           local_irq_disable();  /*关闭中断*/

          

           /*因为下半部是在开中断的环境中运行的,

           所以有可能在运行了softirq A以后,

           然后在运行其他的softirq B,

           这时又产生A的硬件中断(A和B在同一个cpu中产生),

           而在A的上半部中又设置了per-cpu变量irq_stat的本地

           cpu副本的irq_stat的__softirq_pending的对应的bit,

           所以代码运行到这里又发现__softirq_pending不0,

           所以要重做处理。这样的情况最多执行max_restart次,因为

           如果不限次数的运行下去,中断就一直不返回,那么进程

           就得不到调度,系统性能会大大受影响。所以运行max_restart次以后,

           如果这样的情况还在一直发生,那么就唤醒per-cpu thread来专门执行

           这些下半部,注意,在per-cpu thread中处理的中断下半部,是可以睡眠

           的,但是编程人员无法掌握他编写的softirq处理程序是在irq_exit()中处理还是

           在per-cpu thread中处理,所以一般都不会有睡眠的可能(编程人员需要保证

           这一点)*/

           pending = local_softirq_pending();

           if (pending && --max_restart)

                  goto restart;

     

           if (pending)

                  wakeup_softirqd();

     

           __local_bh_enable(); /*enable下半部运行*/

    }

     

    这里就不画流程图了,关键是要仔细的分析几个中断关闭和中断使能的时机,以及softirq是否可以重入的问题。

     

    三总结

    本文分析了softirq运行的时间点,以及softirq是怎样被cpu调度的。后面还要继续分析tasklet的实现,tasklet实际上就是凌驾在softirq机制上的,它占用了Linux现有6种softirq的2种(优先级最高的和优先级最低的)。

    要特别注意的是:softirq处理函数也不能睡眠,因为它也是运行在中断上下文环境中的(不考虑ksoftirqd线程)。

    一、为什么要进入tasklet

    我们在softirq的文章中分析过,在SMP系统中,任何一个处理器在响应外设中断请求,完成中断上半部处理后,都可以调用函数do_softirq()来处理构建在softirq机制上的下半部。也就是说,softirq处理函数在SMP系统中是可以并行执行的,这要求使用softirq机制的下半部必须是多处理器可重入的。这对于一般的驱动程序开发者而言, 事情会变得复杂化、难度增大。为了降低驱动开发难度必须提供一套有效的机制,tasklet就是为了解决这一问题而出现的。

     

    二、tasklet实现分析

    1. 一个实例

    #include <linux/module.h>

    #include <linux/init.h>

    #include <linux/fs.h>

    #include <linux/kdev_t.h>

    #include <linux/cdev.h>

    #include <linux/kernel.h>

    #include <linux/interrupt.h>

     

    static struct tasklet_struct my_tasklet;  /*定义自己的tasklet_struct变量*/

     

    static void tasklet_handler (unsigned long data)

    {

            printk(KERN_ALERT “tasklet_handler is running.\n”);

    }

     

    static int __init test_init(void)

    {

            tasklet_init(&my_tasklet, tasklet_handler, 0); /*挂入钩子函数tasklet_handler*/

            tasklet_schedule(&my_tasklet); /* 触发softirq的TASKLET_SOFTIRQ,在下一次运行softirq时运行这个tasklet*/ 

            return 0;

    }

     

    static void __exit test_exit(void)

    {

            tasklet_kill(&my_tasklet); /*禁止该tasklet的运行*/

            printk(KERN_ALERT “test_exit running.\n”);

    }

    MODULE_LICENSE(“GPL”);

     

    module_init(test_init);

    module_exit(test_exit);

     

    运行结果如图:

     

    <IMG border=0 src="http://blogimg.chinaunix.net/blog/upfile2/090420111812.jpg" width=500 οnlοad="javascript:if(this.width>500)this.width=500;">

     

    2.       实现分析

    我们就从上面这个实例入手来分析tasklet的实现,

    在init中,通过函数tasklet_init()来初始化自己需要注册到系统中的tasklet结构:

    void tasklet_init(struct tasklet_struct *t,

                    void (*func)(unsigned long), unsigned long data)

    {

           t->next = NULL;

           t->state = 0;

           atomic_set(&t->count, 0);

           t->func = func;

           t->data = data;

    }

    很简单,只是初始化tasklet_struct的各个字段,挂上钩子函数。

     

    然后,通过函数tasklet_schedule()来触发该tasklet

    static inline void tasklet_schedule(struct tasklet_struct *t)

    {

            /*如果需要调度的tasklet的state不为TASKLET_STATE_SCHED,则触发之。这样,就保证了多个cpu不可能同时运行同一个tasklet,因为如果一个tasklet被调度过一次,那么它的state字段就会被设置TASKLET_STATE_SCHED标记,然后插入per-cpu变量的链表中。如果这时另外一个cpu也去调度该tasklet,那么就会在下面的if语句中被挡掉,不会运行到__tasklet_schedule(),从而不会插入到另外这个cpu的per-cpu变量的链表中,就不会被运行到。所以这里是保证了tasklet编写的函数不用是可重入的,这样就方便了编程人员。(注意,softirq机制需要编写可重入的函数)*/

           if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))

                  __tasklet_schedule(t);

    }

     

    我们来看__tasklet_schedule()的实现:

    void fastcall __tasklet_schedule(struct tasklet_struct *t)

    {

           unsigned long flags;

     

           local_irq_save(flags);

            /*把需要添加进系统的自己编写的struct tasklet_struc加入

            到per-cpu变量tasklet_vec的本地副本的链表的表头中*/

           t->next = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = t;

           raise_softirq_irqoff(TASKLET_SOFTIRQ); /*触发softirq的TASKLET_SOFTIRQ*/ 

           local_irq_restore(flags);

    }

    这段代码也非常简单,只是把自己要注册到系统中的tasklet_struct挂入到per-cpu变量tasklet_vec的list中而已,这里是挂到链表首部。因为需要修改per-cpu变量tasklet_vec的list的值,为了防止中断处理程序也去修改这个值,所以要加自旋锁,为了保持数据的一致性。

    然后通过raise_softirq_irqoff()设置低优先级的tasklet对应的softirq标记,以便cpu在运行softirq的时候运行到tasklet,因为tasklet是凌驾在softirq机制之上的。

     

    OK,这里就完成了我们自己的my_tasklet的注册和触发对应的softirq,那我们现在就应该分析tasklet的运行了。

    我们前面提到,tasklet是凌驾在softirq机制之上的。还记得前面说到了Linux中有六种softirq,优先级最高的是HI_SOFTIRQ,优先级最低的是TASKLET_SOFTIRQ,一般情况下我们是利用TASKLET_SOFTIRQ来实现tasklet的功能。

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);中定义了处理tasklet的处理函数tasklet_action.所以我们要分析这个函数的实现:

     

    static void tasklet_action(struct softirq_action *a)

    {

           struct tasklet_struct *list;

     

           /*把per-cpu变量tasklet_vec的本地副本上的list设置为NULL,

           由于这里要修改per-cpu变量,为了防止中断处理程序

           或者内核抢占造成该数据的不一致性,所以这里禁止中断再修改数据

           ,然后再开启中断.(注意,关闭本地中断的副作用就是禁止内核抢占,

           因为内核抢占只有两个时间点: 1.中断返回到内核态;2.手动使能内核抢占。

           明显程序员不会在临界区内手动使能内核抢占,所以关闭本地中断的

           副作用就是禁止内核抢占)*/

           local_irq_disable();

           list = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = NULL;

           local_irq_enable();

     

           /*遍历tasklet链表,让链表上挂入的函数全部执行完成*/

           while (list) {

                  struct tasklet_struct *t = list;

     

                  list = list->next;

     

                  if (tasklet_trylock(t)) {

                         if (!atomic_read(&t->count)) {

                                if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))

                                       BUG();

                                t->func(t->data); /*真正运行user注册的tasklet函数的地方*/

                                tasklet_unlock(t);

                                continue;

                         }

                         tasklet_unlock(t);

                  }

     

                  /*这里相当于把tasklet的list指针从链表中后移了(可以自行画图分析),

                  所以刚才运行过的tasklet回调函数以后不会再次运行,除非用于再次

                  通过tasklet_schedule()注册之*/

                  local_irq_disable();

                  t->next = __get_cpu_var(tasklet_vec).list;

                  __get_cpu_var(tasklet_vec).list = t;

                  __raise_softirq_irqoff(TASKLET_SOFTIRQ);  /*再一次触发tasklet对应的softirq,使下次系统运行softirq时能运行到tasklet*/

                  local_irq_enable();

           }

    }

    运行流程是不是很简单呢?呵呵。只要注意到加锁的时机就OK了!

     

     

    三、总结

    Tasklet与一般的softirq的比较重要的一个区别在于: softirq处理函数需要被编写成可重入的,因为多个cpu可能同时执行同一个softirq处理函数,为了防止数据出现不一致性,所以softirq的处理函数必须被编写成可重入。最典型的就是要在softirq处理函数中用spinlock保护一些共享资源。而tasklet机制本身就保证了tasklet处理函数不会同时被多个cpu调度到。因为在tasklet_schedule()中,就保证了多个cpu不可能同时调度到同一个tasklet处理函数,这样tasklet就不用编写成可重入的处理函数,这样就大大减轻了kernel编程人员的负担。

    (2012-02-10 22:32)


    一、概念
     
    首先我们要知道为什么中断需要下半部 。我们可以想象一下,如果没有下半部的概念,一个网卡中断过来了以后会是什么样的情况。首先,我们会从网卡硬件buffer中把网卡收到的packet拷贝到系统内存中,然后对这个packet进行TCP/IP协议栈的处理。我们知道TCP/IP协议栈是一个比较复杂的软件模块,里面对packet的处理会经过非常多的步骤,首先是链路层,然后是IP层(这里又包括分片,奇偶校验之类的),然后是TCP层(TCP层的实现相当复杂,会花费比较长的时间对packet进行一些状态或者内容的分析处理),最后通过socket把packet传入用户空间。在传入用户空间之间的这些动作,都必须在中断处理中完成,因为这些操作都是在kernel中的,并且这些操作会花费比较长的时间。在这段时间里,cpu由于进入了中断门,会自动关中断,也就是说cpu不会去响应在这段时间里网卡另外发过来的中断,这样的话很有可能网卡硬件buffer会由于网卡自身的缓存不足而导致丢包    。所以linux为了解决这样的问题,把copy packet这样比较紧急的动作放在了上半部去处理(上半部默认情况下是在关中断中完成的),把协议栈这些不是特别紧急的任务放到了下半部去处理(下半部是在开中断中进行的,有就是说,处理下半部的过程中,允许cpu被其他中断打断)。

    二、软件构架和实现

    1. 一些基础数据结构

    文件softirq.c

    /*PER-CPU变量,每个cpu对应一个,描述当前cpu中关于softirq的一些状态,比如是否有softirq挂起需要执行等等*/

    irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

    typedef struct {

        unsigned int __softirq_pending; /*32位,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)*/

        unsigned long idle_timestamp;

        unsigned int __nmi_count;   /* arch dependent */

        unsigned int apic_timer_irqs;   /* arch dependent */

    } ____cacheline_aligned irq_cpustat_t;

     

    /*表示softirq最多有32种类型,实际上Linux只用了6种,见文件interrupt.h*/

    static struct softirq_action softirq_vec[32] __cacheline_aligned_in_smp;

     

    /* PLEASE, avoid to allocate new softirqs, if you need not _really_ high

       frequency threaded job scheduling. For almost all the purposes

       tasklets are more than enough. F.e. all serial device BHs et

       al. should be converted to tasklets, not to softirqs.

     */

     

    enum

    {

        HI_SOFTIRQ=0,   /*用于高优先级的tasklet*/

        TIMER_SOFTIRQ, /*用于定时器的下半部*/

        NET_TX_SOFTIRQ,/*用于网络层发包*/

        NET_RX_SOFTIRQ, /*用于网络层收包*/

        SCSI_SOFTIRQ,   /*用于SCSI设备*/

        TASKLET_SOFTIRQ /*用于低优先级的tasklet*/

    }; 

    struct softirq_action

    {

        void    (*action)(struct softirq_action *);  /*softirq的回调函数*/

        void    *data; /*传入action的参数*/

    };

    Struct softirq_action是每个softirq的配置结构,一般在系统启动的时候,6个不同的softirq,会通过函数open_softirq()来注册自己的softirq_action,实现很简单:

    void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)

    {

        softirq_vec[nr].data = data;

        softirq_vec[nr].action = action;

    }

    关键是传入的函数指针,具体指明了该softirq要实现的功能或要做的动作。

    这里分开看下这六个注册点:

    Net/core/dev.c中的net_dev_init()里面注册了网络层需要用到的收包和发包的两个softirq:

        open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);

        open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

    对应的函数指针为net_tx_action和net_rx_action,具体功能就不在本文的范围之内的。

     

    Driver/scsi/scsi.c中init_scsi()注册了SCSI_SOFTIRQ

    open_softirq(SCSI_SOFTIRQ, scsi_softirq, NULL);

     

    start_kernel() àsiftirq_init() 中注册了两种tasklet,一种是高优先级的tasklet,一直是低优先级的tasklet:

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);

           open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);

     

    start_kernel() àinit_timers() 中注册了TIMER_SOFTIRQ

           open_softirq(TIMER_SOFTIRQ, run_timer_softirq, NULL);

     

    2. softirq运行时机:

    系统在运行过程中,会在合适的地方使用函数local_softirq_pending()检查系统是否有softirq需要处理;需要时会调用函数do_softirq()进行处理。这些检查点主要包括以下几个地方:

    (1)中断过程退出函数irq_exit();

    (2)内核线程ksoftirqd;

    (3)内核网络子系统中显示调用;

    (4)函数local_bh_enable().

     

    先前我们分析irq_cpustat_t结构的时候,看到__softirq_pending字段。这是一个32位无符号的变量,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)。那么在softirq运行之前肯定就有地方设置了这个变量的各个位,才会触发到softirq运行。这个触发动作一般是在上半部中进行的,即上半部通过系统,还有下半部需要运行。触发函数如下:

    #define __raise_softirq_irqoff(nr) do { or_softirq_pending(1UL << (nr)); } while (0)

    #define or_softirq_pending(x)  (local_softirq_pending() |= (x))

    #define local_softirq_pending() \

           __IRQ_STAT(smp_processor_id(), __softirq_pending)

    #define __IRQ_STAT(cpu, member)     (irq_stat[cpu].member)

    由此可以看出,  __raise_softirq_irqoff(nr)实际上是把当前cpu的per-cpu变量irq_stat的__softirq_pending 的从右往左数的第nr位置1,这样系统就知道某个softirq需要在某个时刻运行了。

     

    当然,__raise_sofritq_irqoff()被很多地方封装过,不同子系统用自己封装的函数,比如网络子系统就用netif_rx_reschedule和net_rx_action来激活softirq。

     3.softirq的执行分析:

    我们先提到了softirq在四个地方有可能运行,最常见的就是中断过程退出函数irq_exit(),我们下面分析之,其他的触发点请大家对照代码自行分析:

     

    void irq_exit(void)

    {

           account_system_vtime(current);

           sub_preempt_count(IRQ_EXIT_OFFSET);

           /*如果在上半部中设置了per-cpu变量irq_stat的__softirq_pending字段,则运行下半部的处理函数, 从这里也可以看出,上半部和下半部一定是运行在同一个cpu上,因为上半部中是设置了per-cpu变量irq_stat本地cpu副本中的__softirq_pending中的位,而下半部也只是判断本地cpu的__sofrirq_pending中的位。这样有效的利用了cpu cache的特性*/

           if (!in_interrupt() && local_softirq_pending())

                  invoke_softirq();

           preempt_enable_no_resched();

    }

     

     Invoke_sofirq()àdo_softirq()

     

    asmlinkage void do_softirq(void)

    {

           __u32 pending;

           unsigned long flags;

            /*本地cpu中,softirq不能在中断环境中运行,这个中断环境包括了上半部和下半部,所以这里保证了同一个cpu上,下半部是不会被重入的,但不能保证其他cpu上的同时运行同一个softirq处理。所以编程人员必须让自己编写的softirq处理函数可重入,以防SMP系统中同时运行这些softirq导致数据出现不一致性*/

           if (in_interrupt())

                  return;

     

           local_irq_save(flags);  /*关中断*/

     

           pending = local_softirq_pending(); /*取得per-cpu变量irq_stat本地cpu副本的__softirq_pending的值*/

     

           if (pending) /*如果有本地cpu有softirq挂起需要处理,则通过__do_softirq()运行之,否则恢复中断并退出*/

                  __do_softirq();

     

           local_irq_restore(flags);  /*恢复开中断*/

    }

     

    关键是__do_softirq()

    asmlinkage void __do_softirq(void)

    {

           struct softirq_action *h;

           __u32 pending;

           int max_restart = MAX_SOFTIRQ_RESTART;

           int cpu;

     

           pending = local_softirq_pending();

     /*这个地方加local_bh_disable的原因是softirq的处理必须是串行的,又因为softirq的执行期间中断是打开的,所以当另一个中断被执行的话在in_interrupt函数上面就会发现这里已经有在执行了,就会自动退出*/

           local_bh_disable();


           cpu = smp_processor_id();

    restart:

           /* Reset the pending bitmask before enabling irqs */

            /*把per-cpu变量 irq_stat的本地cpu副本的__softirq_pengding字段清0,

            表示代码有信心在这一次处理中把本地cpu上挂起的所有softirq都处理掉(有信心只是开个玩笑;)*/

           set_softirq_pending(0);

     

           local_irq_enable(); /*保证下半部要在中断打开的情况下进行,否则下半部就失去意义了*/

     

           h = softirq_vec; /*softirq_vec是一个全局数组(有32个元素),存放了32种softirq的处理函数*/

     

           /*遍历unsigned int pengding的每一位,如果有被置为1,则运行对应的下半部处理函数action*/

           do {

                  if (pending & 1) {

                           /*action是一个处理队列,对于非tasklet的softirq来说只有一个元素,

                           但对于tasklet来说,就有N个函数需要处理*/

                         h->action(h);

                         rcu_bh_qsctr_inc(cpu);

                  }

                  h++;

                  pending >>= 1;

           } while (pending);

     

           local_irq_disable();  /*关闭中断*/

          

           /*因为下半部是在开中断的环境中运行的,

           所以有可能在运行了softirq A以后,

           然后在运行其他的softirq B,

           这时又产生A的硬件中断(A和B在同一个cpu中产生),

           而在A的上半部中又设置了per-cpu变量irq_stat的本地

           cpu副本的irq_stat的__softirq_pending的对应的bit,

           所以代码运行到这里又发现__softirq_pending不0,

           所以要重做处理。这样的情况最多执行max_restart次,因为

           如果不限次数的运行下去,中断就一直不返回,那么进程

           就得不到调度,系统性能会大大受影响。所以运行max_restart次以后,

           如果这样的情况还在一直发生,那么就唤醒per-cpu thread来专门执行

           这些下半部,注意,在per-cpu thread中处理的中断下半部,是可以睡眠

           的,但是编程人员无法掌握他编写的softirq处理程序是在irq_exit()中处理还是

           在per-cpu thread中处理,所以一般都不会有睡眠的可能(编程人员需要保证

           这一点)*/

           pending = local_softirq_pending();

           if (pending && --max_restart)

                  goto restart;

     

           if (pending)

                  wakeup_softirqd();

     

           __local_bh_enable(); /*enable下半部运行*/

    }

     

    这里就不画流程图了,关键是要仔细的分析几个中断关闭和中断使能的时机,以及softirq是否可以重入的问题。

     

    三总结

    本文分析了softirq运行的时间点,以及softirq是怎样被cpu调度的。后面还要继续分析tasklet的实现,tasklet实际上就是凌驾在softirq机制上的,它占用了Linux现有6种softirq的2种(优先级最高的和优先级最低的)。

    要特别注意的是:softirq处理函数也不能睡眠,因为它也是运行在中断上下文环境中的(不考虑ksoftirqd线程)。

    一、为什么要进入tasklet

    我们在softirq的文章中分析过,在SMP系统中,任何一个处理器在响应外设中断请求,完成中断上半部处理后,都可以调用函数do_softirq()来处理构建在softirq机制上的下半部。也就是说,softirq处理函数在SMP系统中是可以并行执行的,这要求使用softirq机制的下半部必须是多处理器可重入的。这对于一般的驱动程序开发者而言, 事情会变得复杂化、难度增大。为了降低驱动开发难度必须提供一套有效的机制,tasklet就是为了解决这一问题而出现的。

     

    二、tasklet实现分析

    1. 一个实例

    #include <linux/module.h>

    #include <linux/init.h>

    #include <linux/fs.h>

    #include <linux/kdev_t.h>

    #include <linux/cdev.h>

    #include <linux/kernel.h>

    #include <linux/interrupt.h>

     

    static struct tasklet_struct my_tasklet;  /*定义自己的tasklet_struct变量*/

     

    static void tasklet_handler (unsigned long data)

    {

            printk(KERN_ALERT “tasklet_handler is running.\n”);

    }

     

    static int __init test_init(void)

    {

            tasklet_init(&my_tasklet, tasklet_handler, 0); /*挂入钩子函数tasklet_handler*/

            tasklet_schedule(&my_tasklet); /* 触发softirq的TASKLET_SOFTIRQ,在下一次运行softirq时运行这个tasklet*/ 

            return 0;

    }

     

    static void __exit test_exit(void)

    {

            tasklet_kill(&my_tasklet); /*禁止该tasklet的运行*/

            printk(KERN_ALERT “test_exit running.\n”);

    }

    MODULE_LICENSE(“GPL”);

     

    module_init(test_init);

    module_exit(test_exit);

     

    运行结果如图:

     

    <IMG border=0 src="http://blogimg.chinaunix.net/blog/upfile2/090420111812.jpg" width=500 οnlοad="javascript:if(this.width>500)this.width=500;">

     

    2.       实现分析

    我们就从上面这个实例入手来分析tasklet的实现,

    在init中,通过函数tasklet_init()来初始化自己需要注册到系统中的tasklet结构:

    void tasklet_init(struct tasklet_struct *t,

                    void (*func)(unsigned long), unsigned long data)

    {

           t->next = NULL;

           t->state = 0;

           atomic_set(&t->count, 0);

           t->func = func;

           t->data = data;

    }

    很简单,只是初始化tasklet_struct的各个字段,挂上钩子函数。

     

    然后,通过函数tasklet_schedule()来触发该tasklet

    static inline void tasklet_schedule(struct tasklet_struct *t)

    {

            /*如果需要调度的tasklet的state不为TASKLET_STATE_SCHED,则触发之。这样,就保证了多个cpu不可能同时运行同一个tasklet,因为如果一个tasklet被调度过一次,那么它的state字段就会被设置TASKLET_STATE_SCHED标记,然后插入per-cpu变量的链表中。如果这时另外一个cpu也去调度该tasklet,那么就会在下面的if语句中被挡掉,不会运行到__tasklet_schedule(),从而不会插入到另外这个cpu的per-cpu变量的链表中,就不会被运行到。所以这里是保证了tasklet编写的函数不用是可重入的,这样就方便了编程人员。(注意,softirq机制需要编写可重入的函数)*/

           if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))

                  __tasklet_schedule(t);

    }

     

    我们来看__tasklet_schedule()的实现:

    void fastcall __tasklet_schedule(struct tasklet_struct *t)

    {

           unsigned long flags;

     

           local_irq_save(flags);

            /*把需要添加进系统的自己编写的struct tasklet_struc加入

            到per-cpu变量tasklet_vec的本地副本的链表的表头中*/

           t->next = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = t;

           raise_softirq_irqoff(TASKLET_SOFTIRQ); /*触发softirq的TASKLET_SOFTIRQ*/ 

           local_irq_restore(flags);

    }

    这段代码也非常简单,只是把自己要注册到系统中的tasklet_struct挂入到per-cpu变量tasklet_vec的list中而已,这里是挂到链表首部。因为需要修改per-cpu变量tasklet_vec的list的值,为了防止中断处理程序也去修改这个值,所以要加自旋锁,为了保持数据的一致性。

    然后通过raise_softirq_irqoff()设置低优先级的tasklet对应的softirq标记,以便cpu在运行softirq的时候运行到tasklet,因为tasklet是凌驾在softirq机制之上的。

     

    OK,这里就完成了我们自己的my_tasklet的注册和触发对应的softirq,那我们现在就应该分析tasklet的运行了。

    我们前面提到,tasklet是凌驾在softirq机制之上的。还记得前面说到了Linux中有六种softirq,优先级最高的是HI_SOFTIRQ,优先级最低的是TASKLET_SOFTIRQ,一般情况下我们是利用TASKLET_SOFTIRQ来实现tasklet的功能。

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);中定义了处理tasklet的处理函数tasklet_action.所以我们要分析这个函数的实现:

     

    static void tasklet_action(struct softirq_action *a)

    {

           struct tasklet_struct *list;

     

           /*把per-cpu变量tasklet_vec的本地副本上的list设置为NULL,

           由于这里要修改per-cpu变量,为了防止中断处理程序

           或者内核抢占造成该数据的不一致性,所以这里禁止中断再修改数据

           ,然后再开启中断.(注意,关闭本地中断的副作用就是禁止内核抢占,

           因为内核抢占只有两个时间点: 1.中断返回到内核态;2.手动使能内核抢占。

           明显程序员不会在临界区内手动使能内核抢占,所以关闭本地中断的

           副作用就是禁止内核抢占)*/

           local_irq_disable();

           list = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = NULL;

           local_irq_enable();

     

           /*遍历tasklet链表,让链表上挂入的函数全部执行完成*/

           while (list) {

                  struct tasklet_struct *t = list;

     

                  list = list->next;

     

                  if (tasklet_trylock(t)) {

                         if (!atomic_read(&t->count)) {

                                if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))

                                       BUG();

                                t->func(t->data); /*真正运行user注册的tasklet函数的地方*/

                                tasklet_unlock(t);

                                continue;

                         }

                         tasklet_unlock(t);

                  }

     

                  /*这里相当于把tasklet的list指针从链表中后移了(可以自行画图分析),

                  所以刚才运行过的tasklet回调函数以后不会再次运行,除非用于再次

                  通过tasklet_schedule()注册之*/

                  local_irq_disable();

                  t->next = __get_cpu_var(tasklet_vec).list;

                  __get_cpu_var(tasklet_vec).list = t;

                  __raise_softirq_irqoff(TASKLET_SOFTIRQ);  /*再一次触发tasklet对应的softirq,使下次系统运行softirq时能运行到tasklet*/

                  local_irq_enable();

           }

    }

    运行流程是不是很简单呢?呵呵。只要注意到加锁的时机就OK了!

     

     

    三、总结

    Tasklet与一般的softirq的比较重要的一个区别在于: softirq处理函数需要被编写成可重入的,因为多个cpu可能同时执行同一个softirq处理函数,为了防止数据出现不一致性,所以softirq的处理函数必须被编写成可重入。最典型的就是要在softirq处理函数中用spinlock保护一些共享资源。而tasklet机制本身就保证了tasklet处理函数不会同时被多个cpu调度到。因为在tasklet_schedule()中,就保证了多个cpu不可能同时调度到同一个tasklet处理函数,这样tasklet就不用编写成可重入的处理函数,这样就大大减轻了kernel编程人员的负担。


    展开全文
  • 1、在linux中,对中断的处理有原则 对中断的处理原则有两个: 1)、中断不能嵌套中断(即使发生了更高优先级的中断)–》防止空间不够用。 因为每个中断被执行,都要先保存现场。如果中断1在执行的过程中,嵌套了...

    1、在linux中,对中断的处理有原则

    对中断的处理原则有两个:
    1)、中断不能嵌套中断(即使发生了更高优先级的中断)–》防止栈空间不够用。
    因为每个中断被执行,都要先保存现场。如果中断1在执行的过程中,嵌套了中断2;那么在中断2在被执行之前,中断1的现场应该先被保存(保存在栈中)。同样,如果还嵌套了其他中断,还要将现场保存在栈中。这样,就会发生一个问题,如果嵌套的中断很多,那么可能会导致栈空间不够用。为了避免这个问题,不允许中断嵌套。
    2)、对中断的处理应当越快越好。
    假设系统只有一个CPU,一个进程在执行过程中,发生了一个中断,这个中断将要执行1分钟。那么,这个进程在1分钟之内,将不能执行其他任何操作。这中情况,是不能被允许的。

    2、耗时中断的处理方式

    前言

    在上节分析中,我们知道,中断的处理应该越快越好。但是,在实际的应用中,总会有一些耗时的中断。为了处理耗时的中断,为了实现中断的快进快出,我们将中断分为上半部/下半部 。中断下半部的实现,主要有软件中断,tasklet和任务队列。其中,tasklet是基于软中断来实现的,实际应用中,多用tasklet,而不用软件中断。

    2.1软中断

    在这里插入图片描述
    那么,下半部是如何实现的呢?
    答案是软件中断

    2.1.1为了更好的理解软件中断,我们将软件中断和硬件中断一起来分析:

    在这里插入图片描述
    在这里插入图片描述

    2.1.2软件中断的实现流程:

    在这里插入图片描述

    2.2 tasklet

    软件中断有很多,那么我们的中断下半部属于哪种软件中断呢?

    enum
    {  HI_SOFTIRQ=0,
        TIMER_SOFTIRQ,
        NET_TX_SOFTIRQ,
        NET_RX_SOFTIRQ,
        BLOCK_SOFTIRQ,
        TASKLET_SOFTIRQ,
        SCHED_SOFTIRQ,
        HRTIMER_SOFTIRQ,};
    再这份枚举表,记录了所有软件
    

    中断的类型。其中, TASKLET_SOFTIRQ,也叫做tasklet软件中断,是用来处理中断下半部。

    2.2.1 tasklet的实现原理图

    在这里插入图片描述

    2.2.2 tasklet的使用

    中断下半部使用结构体 tasklet_struct 来表示,以下简称tasklet。
    先定义/初始化 tasklet(一般在probe()函数里面),可以使用函数tasklet_init(struct tasklet_struct *t, void (*func)(unsigned long), unsigned long data);也可以直接定义tasklet结构体:
    struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data };
    需要使用时调用 tasklet_schedule,驱动卸载前调用 tasklet_kill。
    tasklet_schedule 只是把 tasklet 放入内核队列,它的 func 函数会在软件中断的执行过程中被调用。
    注:
    tasklet具有如下特性:
    1)、 tasklet可以被多次调度,但是调度不会累积。即实际只会运行一次。
    2)、 tasklet运行在中断上下文,tasklet处理函数中不能睡眠。

    展开全文
  • 一、概念 首先我们要知道为什么中断需要下半部。我们可以想象一下,如果没有下半...首先,我们会从网卡硬件buffer中把网卡收到的packet拷贝到系统内存中,然后对这个packet进行TCP/IP协议的处理。我们知道TCP/...
    一、概念
     
    首先我们要知道为什么中断需要下半部 。我们可以想象一下,如果没有下半部的概念,一个网卡中断过来了以后会是什么样的情况。首先,我们会从网卡硬件buffer中把网卡收到的packet拷贝到系统内存中,然后对这个packet进行TCP/IP协议栈的处理。我们知道TCP/IP协议栈是一个比较复杂的软件模块,里面对packet的处理会经过非常多的步骤,首先是链路层,然后是IP层(这里又包括分片,奇偶校验之类的),然后是TCP层(TCP层的实现相当复杂,会花费比较长的时间对packet进行一些状态或者内容的分析处理),最后通过socket把packet传入用户空间。在传入用户空间之间的这些动作,都必须在中断处理中完成,因为这些操作都是在kernel中的,并且这些操作会花费比较长的时间。在这段时间里,cpu由于进入了中断门,会自动关中断,也就是说cpu不会去响应在这段时间里网卡另外发过来的中断,这样的话很有可能网卡硬件buffer会由于网卡自身的缓存不足而导致丢包   。所以linux为了解决这样的问题,把copy packet这样比较紧急的动作放在了上半部去处理(上半部默认情况下是在关中断中完成的),把协议栈这些不是特别紧急的任务放到了下半部去处理(下半部是在开中断中进行的,有就是说,处理下半部的过程中,允许cpu被其他中断打断)。

     

    二、软件构架和实现

    1. 一些基础数据结构

    文件softirq.c

    /*PER-CPU变量,每个cpu对应一个,描述当前cpu中关于softirq的一些状态,比如是否有softirq挂起需要执行等等*/

    irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;

    typedef struct {

        unsigned int __softirq_pending; /*32位,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)*/

        unsigned long idle_timestamp;

        unsigned int __nmi_count;   /* arch dependent */

        unsigned int apic_timer_irqs;   /* arch dependent */

    } ____cacheline_aligned irq_cpustat_t;

     

    /*表示softirq最多有32种类型,实际上Linux只用了6种,见文件interrupt.h*/

    static struct softirq_action softirq_vec[32] __cacheline_aligned_in_smp;

     

    /* PLEASE, avoid to allocate new softirqs, if you need not _really_ high

       frequency threaded job scheduling. For almost all the purposes

       tasklets are more than enough. F.e. all serial device BHs et

       al. should be converted to tasklets, not to softirqs.

     */

     

    enum

    {

        HI_SOFTIRQ=0,   /*用于高优先级的tasklet*/

        TIMER_SOFTIRQ, /*用于定时器的下半部*/

        NET_TX_SOFTIRQ,/*用于网络层发包*/

        NET_RX_SOFTIRQ, /*用于网络层收包*/

        SCSI_SOFTIRQ,   /*用于SCSI设备*/

        TASKLET_SOFTIRQ /*用于低优先级的tasklet*/

    }; 

    struct softirq_action

    {

        void    (*action)(struct softirq_action *);  /*softirq的回调函数*/

        void    *data; /*传入action的参数*/

    };

     

    Struct softirq_action是每个softirq的配置结构,一般在系统启动的时候,6个不同的softirq,会通过函数open_softirq()来注册自己的softirq_action,实现很简单:

    void open_softirq(int nr, void (*action)(struct softirq_action*), void *data)

    {

        softirq_vec[nr].data = data;

        softirq_vec[nr].action = action;

    }

    关键是传入的函数指针,具体指明了该softirq要实现的功能或要做的动作。

    这里分开看下这六个注册点:

    Net/core/dev.c中的net_dev_init()里面注册了网络层需要用到的收包和发包的两个softirq:

        open_softirq(NET_TX_SOFTIRQ, net_tx_action, NULL);

        open_softirq(NET_RX_SOFTIRQ, net_rx_action, NULL);

    对应的函数指针为net_tx_action和net_rx_action,具体功能就不在本文的范围之内的。

     

    Driver/scsi/scsi.c中init_scsi()注册了SCSI_SOFTIRQ

    open_softirq(SCSI_SOFTIRQ, scsi_softirq, NULL);

     

    start_kernel() àsiftirq_init() 中注册了两种tasklet,一种是高优先级的tasklet,一直是低优先级的tasklet:

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);

           open_softirq(HI_SOFTIRQ, tasklet_hi_action, NULL);

     

    start_kernel() àinit_timers() 中注册了TIMER_SOFTIRQ

           open_softirq(TIMER_SOFTIRQ, run_timer_softirq, NULL);

     

    2. softirq运行时机:

    系统在运行过程中,会在合适的地方使用函数local_softirq_pending()检查系统是否有softirq需要处理;需要时会调用函数do_softirq()进行处理。这些检查点主要包括以下几个地方:

    (1)中断过程退出函数irq_exit();

    (2)内核线程ksoftirqd;

    (3)内核网络子系统中显示调用;

    (4)函数local_bh_enable().

     

    先前我们分析irq_cpustat_t结构的时候,看到__softirq_pending字段。这是一个32位无符号的变量,对应Linux中32种softirq是否被上半部触发了(为1表示被触发,为0表示未被触发)。那么在softirq运行之前肯定就有地方设置了这个变量的各个位,才会触发到softirq运行。这个触发动作一般是在上半部中进行的,即上半部通过系统,还有下半部需要运行。触发函数如下:

    #define __raise_softirq_irqoff(nr) do { or_softirq_pending(1UL << (nr)); } while (0)

    #define or_softirq_pending(x)  (local_softirq_pending() |= (x))

    #define local_softirq_pending() \

           __IRQ_STAT(smp_processor_id(), __softirq_pending)

    #define __IRQ_STAT(cpu, member)     (irq_stat[cpu].member)

    由此可以看出,  __raise_softirq_irqoff(nr)实际上是把当前cpu的per-cpu变量irq_stat的__softirq_pending 的从右往左数的第nr位置1,这样系统就知道某个softirq需要在某个时刻运行了。

     

    当然,__raise_sofritq_irqoff()被很多地方封装过,不同子系统用自己封装的函数,比如网络子系统就用netif_rx_reschedule和net_rx_action来激活softirq。

     3.softirq的执行分析:

    我们先提到了softirq在四个地方有可能运行,最常见的就是中断过程退出函数irq_exit(),我们下面分析之,其他的触发点请大家对照代码自行分析:

     

    void irq_exit(void)

    {

           account_system_vtime(current);

           sub_preempt_count(IRQ_EXIT_OFFSET);

           /*如果在上半部中设置了per-cpu变量irq_stat的__softirq_pending字段,则运行下半部的处理函数, 从这里也可以看出,上半部和下半部一定是运行在同一个cpu上,因为上半部中是设置了per-cpu变量irq_stat本地cpu副本中的__softirq_pending中的位,而下半部也只是判断本地cpu的__sofrirq_pending中的位。这样有效的利用了cpu cache的特性*/

           if (!in_interrupt() && local_softirq_pending())

                  invoke_softirq();

           preempt_enable_no_resched();

    }

     

     Invoke_sofirq()àdo_softirq()

     

    asmlinkage void do_softirq(void)

    {

           __u32 pending;

           unsigned long flags;

            /*本地cpu中,softirq不能在中断环境中运行,这个中断环境包括了上半部和下半部,所以这里保证了同一个cpu上,下半部是不会被重入的,但不能保证其他cpu上的同时运行同一个softirq处理。所以编程人员必须让自己编写的softirq处理函数可重入,以防SMP系统中同时运行这些softirq导致数据出现不一致性*/

           if (in_interrupt())

                  return;

     

           local_irq_save(flags);  /*关中断*/

     

           pending = local_softirq_pending(); /*取得per-cpu变量irq_stat本地cpu副本的__softirq_pending的值*/

     

           if (pending) /*如果有本地cpu有softirq挂起需要处理,则通过__do_softirq()运行之,否则恢复中断并退出*/

                  __do_softirq();

     

           local_irq_restore(flags);  /*恢复开中断*/

    }

     

    关键是__do_softirq()

    asmlinkage void __do_softirq(void)

    {

           struct softirq_action *h;

           __u32 pending;

           int max_restart = MAX_SOFTIRQ_RESTART;

           int cpu;

     

           pending = local_softirq_pending();

     /*这个地方加local_bh_disable的原因是softirq的处理必须是串行的,又因为softirq的执行期间中断是打开的,所以当另一个中断被执行的话在in_interrupt函数上面就会发现这里已经有在执行了,就会自动退出*/

           local_bh_disable();

     

           cpu = smp_processor_id();

    restart:

           /* Reset the pending bitmask before enabling irqs */

            /*把per-cpu变量 irq_stat的本地cpu副本的__softirq_pengding字段清0,

            表示代码有信心在这一次处理中把本地cpu上挂起的所有softirq都处理掉(有信心只是开个玩笑;)*/

           set_softirq_pending(0);

     

           local_irq_enable(); /*保证下半部要在中断打开的情况下进行,否则下半部就失去意义了*/

     

           h = softirq_vec; /*softirq_vec是一个全局数组(有32个元素),存放了32种softirq的处理函数*/

     

           /*遍历unsigned int pengding的每一位,如果有被置为1,则运行对应的下半部处理函数action*/

           do {

                  if (pending & 1) {

                           /*action是一个处理队列,对于非tasklet的softirq来说只有一个元素,

                           但对于tasklet来说,就有N个函数需要处理*/

                         h->action(h);

                         rcu_bh_qsctr_inc(cpu);

                  }

                  h++;

                  pending >>= 1;

           } while (pending);

     

           local_irq_disable();  /*关闭中断*/

          

           /*因为下半部是在开中断的环境中运行的,

           所以有可能在运行了softirq A以后,

           然后在运行其他的softirq B,

           这时又产生A的硬件中断(A和B在同一个cpu中产生),

           而在A的上半部中又设置了per-cpu变量irq_stat的本地

           cpu副本的irq_stat的__softirq_pending的对应的bit,

           所以代码运行到这里又发现__softirq_pending不0,

           所以要重做处理。这样的情况最多执行max_restart次,因为

           如果不限次数的运行下去,中断就一直不返回,那么进程

           就得不到调度,系统性能会大大受影响。所以运行max_restart次以后,

           如果这样的情况还在一直发生,那么就唤醒per-cpu thread来专门执行

           这些下半部,注意,在per-cpu thread中处理的中断下半部,是可以睡眠

           的,但是编程人员无法掌握他编写的softirq处理程序是在irq_exit()中处理还是

           在per-cpu thread中处理,所以一般都不会有睡眠的可能(编程人员需要保证

           这一点)*/

           pending = local_softirq_pending();

           if (pending && --max_restart)

                  goto restart;

     

           if (pending)

                  wakeup_softirqd();

     

           __local_bh_enable(); /*enable下半部运行*/

    }

     

    这里就不画流程图了,关键是要仔细的分析几个中断关闭和中断使能的时机,以及softirq是否可以重入的问题。

     

    三总结

    本文分析了softirq运行的时间点,以及softirq是怎样被cpu调度的。后面还要继续分析tasklet的实现,tasklet实际上就是凌驾在softirq机制上的,它占用了Linux现有6种softirq的2种(优先级最高的和优先级最低的)。

    要特别注意的是:softirq处理函数也不能睡眠,因为它也是运行在中断上下文环境中的(不考虑ksoftirqd线程)。

    一、为什么要进入tasklet

    我们在softirq的文章中分析过,在SMP系统中,任何一个处理器在响应外设中断请求,完成中断上半部处理后,都可以调用函数do_softirq()来处理构建在softirq机制上的下半部。也就是说,softirq处理函数在SMP系统中是可以并行执行的,这要求使用softirq机制的下半部必须是多处理器可重入的。这对于一般的驱动程序开发者而言, 事情会变得复杂化、难度增大。为了降低驱动开发难度必须提供一套有效的机制,tasklet就是为了解决这一问题而出现的。

     

    二、tasklet实现分析

    1. 一个实例

    #include

    #include

    #include

    #include

    #include

    #include

    #include

     

    static struct tasklet_struct my_tasklet;  /*定义自己的tasklet_struct变量*/

     

    static void tasklet_handler (unsigned long data)

    {

            printk(KERN_ALERT “tasklet_handler is running.\n”);

    }

     

    static int __init test_init(void)

    {

            tasklet_init(&my_tasklet, tasklet_handler, 0); /*挂入钩子函数tasklet_handler*/

            tasklet_schedule(&my_tasklet); /* 触发softirq的TASKLET_SOFTIRQ,在下一次运行softirq时运行这个tasklet*/ 

            return 0;

    }

     

    static void __exit test_exit(void)

    {

            tasklet_kill(&my_tasklet); /*禁止该tasklet的运行*/

            printk(KERN_ALERT “test_exit running.\n”);

    }

    MODULE_LICENSE(“GPL”);

     

    module_init(test_init);

    module_exit(test_exit);

     

    运行结果如图:

     

     

     

    2.       实现分析

    我们就从上面这个实例入手来分析tasklet的实现,

    在init中,通过函数tasklet_init()来初始化自己需要注册到系统中的tasklet结构:

    void tasklet_init(struct tasklet_struct *t,

                    void (*func)(unsigned long), unsigned long data)

    {

           t->next = NULL;

           t->state = 0;

           atomic_set(&t->count, 0);

           t->func = func;

           t->data = data;

    }

    很简单,只是初始化tasklet_struct的各个字段,挂上钩子函数。

     

    然后,通过函数tasklet_schedule()来触发该tasklet

    static inline void tasklet_schedule(struct tasklet_struct *t)

    {

            /*如果需要调度的tasklet的state不为TASKLET_STATE_SCHED,则触发之。这样,就保证了多个cpu不可能同时运行同一个tasklet,因为如果一个tasklet被调度过一次,那么它的state字段就会被设置TASKLET_STATE_SCHED标记,然后插入per-cpu变量的链表中。如果这时另外一个cpu也去调度该tasklet,那么就会在下面的if语句中被挡掉,不会运行到__tasklet_schedule(),从而不会插入到另外这个cpu的per-cpu变量的链表中,就不会被运行到。所以这里是保证了tasklet编写的函数不用是可重入的,这样就方便了编程人员。(注意,softirq机制需要编写可重入的函数)*/

           if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))

                  __tasklet_schedule(t);

    }

     

    我们来看__tasklet_schedule()的实现:

    void fastcall __tasklet_schedule(struct tasklet_struct *t)

    {

           unsigned long flags;

     

           local_irq_save(flags);

            /*把需要添加进系统的自己编写的struct tasklet_struc加入

            到per-cpu变量tasklet_vec的本地副本的链表的表头中*/

           t->next = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = t;

           raise_softirq_irqoff(TASKLET_SOFTIRQ); /*触发softirq的TASKLET_SOFTIRQ*/ 

           local_irq_restore(flags);

    }

    这段代码也非常简单,只是把自己要注册到系统中的tasklet_struct挂入到per-cpu变量tasklet_vec的list中而已,这里是挂到链表首部。因为需要修改per-cpu变量tasklet_vec的list的值,为了防止中断处理程序也去修改这个值,所以要加自旋锁,为了保持数据的一致性。

    然后通过raise_softirq_irqoff()设置低优先级的tasklet对应的softirq标记,以便cpu在运行softirq的时候运行到tasklet,因为tasklet是凌驾在softirq机制之上的。

     

    OK,这里就完成了我们自己的my_tasklet的注册和触发对应的softirq,那我们现在就应该分析tasklet的运行了。

    我们前面提到,tasklet是凌驾在softirq机制之上的。还记得前面说到了Linux中有六种softirq,优先级最高的是HI_SOFTIRQ,优先级最低的是TASKLET_SOFTIRQ,一般情况下我们是利用TASKLET_SOFTIRQ来实现tasklet的功能。

           open_softirq(TASKLET_SOFTIRQ, tasklet_action, NULL);中定义了处理tasklet的处理函数tasklet_action.所以我们要分析这个函数的实现:

     

    static void tasklet_action(struct softirq_action *a)

    {

           struct tasklet_struct *list;

     

           /*把per-cpu变量tasklet_vec的本地副本上的list设置为NULL,

           由于这里要修改per-cpu变量,为了防止中断处理程序

           或者内核抢占造成该数据的不一致性,所以这里禁止中断再修改数据

           ,然后再开启中断.(注意,关闭本地中断的副作用就是禁止内核抢占,

           因为内核抢占只有两个时间点: 1.中断返回到内核态;2.手动使能内核抢占。

           明显程序员不会在临界区内手动使能内核抢占,所以关闭本地中断的

           副作用就是禁止内核抢占)*/

           local_irq_disable();

           list = __get_cpu_var(tasklet_vec).list;

           __get_cpu_var(tasklet_vec).list = NULL;

           local_irq_enable();

     

           /*遍历tasklet链表,让链表上挂入的函数全部执行完成*/

           while (list) {

                  struct tasklet_struct *t = list;

     

                  list = list->next;

     

                  if (tasklet_trylock(t)) {

                         if (!atomic_read(&t->count)) {

                                if (!test_and_clear_bit(TASKLET_STATE_SCHED, &t->state))

                                       BUG();

                                t->func(t->data); /*真正运行user注册的tasklet函数的地方*/

                                tasklet_unlock(t);

                                continue;

                         }

                         tasklet_unlock(t);

                  }

     

                  /*这里相当于把tasklet的list指针从链表中后移了(可以自行画图分析),

                  所以刚才运行过的tasklet回调函数以后不会再次运行,除非用于再次

                  通过tasklet_schedule()注册之*/

                  local_irq_disable();

                  t->next = __get_cpu_var(tasklet_vec).list;

                  __get_cpu_var(tasklet_vec).list = t;

                  __raise_softirq_irqoff(TASKLET_SOFTIRQ);  /*再一次触发tasklet对应的softirq,使下次系统运行softirq时能运行到tasklet*/

                  local_irq_enable();

           }

    }

    运行流程是不是很简单呢?呵呵。只要注意到加锁的时机就OK了!

     

     

    三、总结

    Tasklet与一般的softirq的比较重要的一个区别在于: softirq处理函数需要被编写成可重入的,因为多个cpu可能同时执行同一个softirq处理函数,为了防止数据出现不一致性,所以softirq的处理函数必须被编写成可重入。最典型的就是要在softirq处理函数中用spinlock保护一些共享资源。而tasklet机制本身就保证了tasklet处理函数不会同时被多个cpu调度到。因为在tasklet_schedule()中,就保证了多个cpu不可能同时调度到同一个tasklet处理函数,这样tasklet就不用编写成可重入的处理函数,这样就大大减轻了kernel编程人员的负担。

    转载于:https://www.cnblogs.com/sky-heaven/p/4846816.html

    展开全文
  • 注: 内核代码是 4.9 版本 协议从报文接收说起,报文接收从网卡驱动说起。 两种方式,NAPI 和 非NAPI。 NAPI(New API) 是Linux内核针对网络数据传输做出的一个...中断处理函数只是触发软中断准备接收报文; 2.

    注: 内核代码是 4.9 版本

    协议栈从报文接收说起,报文接收从网卡驱动说起。

    两种方式,NAPI 和 非NAPI。

    NAPI(New API) 是Linux内核针对网络数据传输做出的一个优化措施。
    其目的是在大量数据传输时, 在收到硬件中断后,通过poll方式将传输过来的数据包统一处理, 通过禁止网络设备中断以减少硬件中断数量((Interrupt Mitigation),从而实现更高的数据传输。
    
     

    其中要点:

    1、硬件中断后开始处理报文。中断处理函数只是触发软中断准备接收报文;

    2、软中断中通过pool方式处理报文。通过轮训的方式一次性处理多个报文;

    3、禁止网络设备中断以减少硬件中断数量。同上,在软中断处理函数中,将禁止中断,处理完成后,开启中断,这样一次中断处理多个报文。

     

    先从NAPI方式说起

    以 Inter 的 e1000 的驱动为例,e1000在加载驱动并做设备初始化时会调用 e1000_probe 函数,完成设备的部分初始化工作,重要的是设备的napi结构 和 poll函数是在这个函数中设置的:

    static int e1000_probe(struct pci_dev *pdev, const struct pci_device_id *ent)
    {
        struct net_device *netdev;
        struct e1000_adapter *adapter;
        struct e1000_hw *hw;
    ......
    
        // 为网卡创建网络设备对象 net_device 结构,并完成组册
        netdev = alloc_etherdev(sizeof(struct e1000_adapter));
        if (!netdev)
            goto err_alloc_etherdev;
    
        SET_NETDEV_DEV(netdev, &pdev->dev);
            // 设置网卡私有数据
        pci_set_drvdata(pdev, netdev);
        adapter = netdev_priv(netdev);
        adapter->netdev = netdev;
        adapter->pdev = pdev;
        adapter->msg_enable = netif_msg_init(debug, DEFAULT_MSG_ENABLE);
        adapter->bars = bars;
        adapter->need_ioport = need_ioport;
    
        hw = &adapter->hw;
        hw->back = adapter;
    
        err = -EIO;
            // 映射寄存器IO区域(后面拷贝报文)
        hw->hw_addr = pci_ioremap_bar(pdev, BAR_0);
        if (!hw->hw_addr)
            goto err_ioremap;
    
        ......
            // 挂载网络设备操作接口
        netdev->netdev_ops = &e1000_netdev_ops;
        e1000_set_ethtool_ops(netdev);
        netdev->watchdog_timeo = 5 * HZ;
            // 初始化并挂载设备的NAPI接口,e1000_clean 是其poll函数,软中断中调用处理报文
        netif_napi_add(netdev, &adapter->napi, e1000_clean, 64);
    
        strncpy(netdev->name, pci_name(pdev), sizeof(netdev->name) - 1);
    
        ......
    }
    
    
     

    还有个重要的函数是e1000的网卡中断处理函数,其是在上面 e1000_netdev_ops 中的ndo_open函数(网卡UP后会调用)中,调用的 e1000_request_irq注册的 e1000_intr 函数。

    然后是NAPI的接收流程。 在网卡中断之前,数据包到达网卡之后,就通过DMA直接将数据从网卡拷贝到内存的环形缓冲区了,成为 ring buffer,和非NAPI不同。

    中断处理函数,如果没有在运行的NAPI任务,调度一个新的NAPI任务,会调用通用的NAPI处理函数,__napi_schedule,将设备的napi 挂载到当前CPU的 softnet_data 的待轮训设备列表poll_list中,并触发软中断。 napi_schedule_prep 中会检查并设置 napi_struct的 NAPI_STATE_SCHED位,并在流程结束后(软中断处理完报文)调用 __napi_complete,清楚状态位。

    static irqreturn_t e1000_intr(int irq, void *data)
    {
        struct net_device *netdev = data;
        struct e1000_adapter *adapter = netdev_priv(netdev);
        struct e1000_hw *hw = &adapter->hw;
    ......
            // 如果没有在运行的NAPI任务,调度一个新的NAPI任务
        if (likely(napi_schedule_prep(&adapter->napi))) {
            adapter->total_tx_bytes = 0;
            adapter->total_tx_packets = 0;
            adapter->total_rx_bytes = 0;
            adapter->total_rx_packets = 0;
                    // 调用通用的NAPI处理函数
            __napi_schedule(&adapter->napi);
        } else {
            /* this really should not happen! if it does it is basically a
             * bug, but not a hard error, so enable ints and continue
             */
            if (!test_bit(__E1000_DOWN, &adapter->flags))
                e1000_irq_enable(adapter);
        }
    
        return IRQ_HANDLED;
    }
    
    void __napi_schedule(struct napi_struct *n)
    {
        unsigned long flags;
    
        local_irq_save(flags);
        ____napi_schedule(this_cpu_ptr(&softnet_data), n);
        local_irq_restore(flags);
    }
    
    static inline void ____napi_schedule(struct softnet_data *sd,
                         struct napi_struct *napi)
    {
            //  将设备的napi 挂载带了 sd 的poll_list中。
        list_add_tail(&napi->poll_list, &sd->poll_list);
           //  触发软中断
        __raise_softirq_irqoff(NET_RX_SOFTIRQ);
    }
    
     

    软中断接收处理函数是net_rx_action,在网络设备模块初始化时注册。

    net_dev_init:
    
        open_softirq(NET_TX_SOFTIRQ, net_tx_action);
        open_softirq(NET_RX_SOFTIRQ, net_rx_action);
    
     

    Linuxc/c++服务器开发高阶视频学习资料加群720209036获取

    内容包括C/C++,Linux,Nginx,ZeroMQ,MySQL,Redis,MongoDB,ZK,流媒体,P2P,K8S,Docker,TCP/IP,协程,DPDK多个高级知识点。

    完整视频学习链接:https://ke.qq.com/course/417774?flowToken=1013189

    关注VX公众号:Linux C后台服务器开发

     

     

    内核ksoftirqd%d(%d对应CPU的ID)内核线程用于处理CPU上的软中断。内核在初始化的时候,会在每个CPU上启动一个这样的内核线程用来处理每个CPU上的软中断。 最终会调用到上面注册的收发报的软中断处理函数。

    // 注册
    static struct smp_hotplug_thread softirq_threads = {
        .store          = &ksoftirqd,
        .thread_should_run  = ksoftirqd_should_run,
        .thread_fn      = run_ksoftirqd,
        .thread_comm        = "ksoftirqd/%u",
    };
    
    static __init int spawn_ksoftirqd(void)
    {
        register_cpu_notifier(&cpu_nfb);
    
        BUG_ON(smpboot_register_percpu_thread(&softirq_threads));
    
        return 0;
    }
    early_initcall(spawn_ksoftirqd);
    // ksoftirqd 处理函数
    static void run_ksoftirqd(unsigned int cpu)
    {
        local_irq_disable();
        if (local_softirq_pending()) {
            /*
             * We can safely run softirq on inline stack, as we are not deep
             * in the task stack here.
             */
            __do_softirq();
            local_irq_enable();
            cond_resched_rcu_qs();
            return;
        }
        local_irq_enable();
    }
    
    asmlinkage __visible void __softirq_entry __do_softirq(void)
    {
        ...
        while ((softirq_bit = ffs(pending))) {
            unsigned int vec_nr;
            int prev_count;
    
            h += softirq_bit - 1;
    
            vec_nr = h - softirq_vec;
            ...
            h->action(h);
            ...
            h++;
            pending >>= softirq_bit;
        }
        ...
    }
    

     

    收包软中断处理函数,最终调用设备poll 函数处理报文。

    static __latent_entropy void net_rx_action(struct softirq_action *h)
    {
        struct softnet_data *sd = this_cpu_ptr(&softnet_data);
        unsigned long time_limit = jiffies + 2;
        int budget = netdev_budget;
        LIST_HEAD(list);
        LIST_HEAD(repoll);
    
        local_irq_disable();
        list_splice_init(&sd->poll_list, &list);
        local_irq_enable();
    
        for (;;) {
            struct napi_struct *n;
    
            if (list_empty(&list)) {
                if (!sd_has_rps_ipi_waiting(sd) && list_empty(&repoll))
                    return;
                break;
            }
                    // 这里始终取第一个,处理完成摘除节点处理是在napi_poll中,着实让我找了一会,代码不对称影响阅读。
            n = list_first_entry(&list, struct napi_struct, poll_list);
                    // napi_poll 主要是为了调用设备注册的poll 函数,如果报文未处理完(通过poll 函数的配额判断)会通过repoll记录这个设备的napi结构,后面再挂到sd中。
            budget -= napi_poll(n, &repoll);
    
            /* If softirq window is exhausted then punt.
             * Allow this to run for 2 jiffies since which will allow
             * an average latency of 1.5/HZ.
             */
                    // budget 是每次软中断执行的配额,配额用尽或者poll函数执行的时间超过2个tick,结束处理
            if (unlikely(budget <= 0 ||
                     time_after_eq(jiffies, time_limit))) {
                sd->time_squeeze++;
                break;
            }
        }
    
        __kfree_skb_flush();
        local_irq_disable();
            //  把这个napi重新加到sd->poll_list头部,等待下次软中断再次poll
        list_splice_tail_init(&sd->poll_list, &list);
        list_splice_tail(&repoll, &list);
        list_splice(&list, &sd->poll_list);
        if (!list_empty(&sd->poll_list))
                   // 存在未处理完的情况,再次触发软中断,等待下次处理
            __raise_softirq_irqoff(NET_RX_SOFTIRQ);
        net_rps_action_and_irq_enable(sd);
    }
    
    
    static int napi_poll(struct napi_struct *n, struct list_head *repoll)
    {
        void *have;
        int work, weight;
            // 摘节点
        list_del_init(&n->poll_list);
    
        have = netpoll_poll_lock(n);
            // poll 函数的配额
        weight = n->weight;
    
        /* This NAPI_STATE_SCHED test is for avoiding a race
         * with netpoll's poll_napi().  Only the entity which
         * obtains the lock and sees NAPI_STATE_SCHED set will
         * actually make the ->poll() call.  Therefore we avoid
         * accidentally calling ->poll() when NAPI is not scheduled.
         */
        work = 0;
        if (test_bit(NAPI_STATE_SCHED, &n->state)) {
                    // 调用设备的poll 函数处理报文
            work = n->poll(n, weight);
            trace_napi_poll(n, work, weight);
        }
    
        WARN_ON_ONCE(work > weight);
            // 配额未用尽,结束处理
        if (likely(work < weight))
            goto out_unlock;
    
        /* Drivers must not modify the NAPI state if they
         * consume the entire weight.  In such cases this code
         * still "owns" the NAPI instance and therefore can
         * move the instance around on the list at-will.
         */
        if (unlikely(napi_disable_pending(n))) {
            napi_complete(n);
            goto out_unlock;
        }
    
        if (n->gro_list) {
            /* flush too old packets
             * If HZ < 1000, flush all packets.
             */
            napi_gro_flush(n, HZ >= 1000);
        }
    
        /* Some drivers may have called napi_schedule
         * prior to exhausting their budget.
         */
        if (unlikely(!list_empty(&n->poll_list))) {
            pr_warn_once("%s: Budget exhausted after napi rescheduled\n",
                     n->dev ? n->dev->name : "backlog");
            goto out_unlock;
        }
            // 配额用尽,说明存在报文未处理完,记录,等待下次软中断处理。
        list_add_tail(&n->poll_list, repoll);
    
    out_unlock:
        netpoll_poll_unlock(have);
    
        return work;
    }
    
     

    设备的poll 函数,e1000_clean。adapter->clean_rx 是在e1000_open中挂载的e1000_clean_rx_irq,它循环处理设备ring buf中的报文,并调用 e1000_receive_skb处理。

    static int e1000_clean(struct napi_struct *napi, int budget)
    {
        struct e1000_adapter *adapter = container_of(napi, struct e1000_adapter,
                                 napi);
        int tx_clean_complete = 0, work_done = 0;
    
        tx_clean_complete = e1000_clean_tx_irq(adapter, &adapter->tx_ring[0]);
    
        adapter->clean_rx(adapter, &adapter->rx_ring[0], &work_done, budget);
    
        if (!tx_clean_complete)
            work_done = budget;
    
        /* If budget not fully consumed, exit the polling mode */
        if (work_done < budget) {
            if (likely(adapter->itr_setting & 3))
                e1000_set_itr(adapter);
            napi_complete_done(napi, work_done);
            if (!test_bit(__E1000_DOWN, &adapter->flags))
                e1000_irq_enable(adapter);
        }
    
        return work_done;
    }
    
     

    e1000_receive_skb 解析eth头,获取上次协议类型,以及设置 skb->pkt_type,然后调用napi_gro_receive ,在开启GRO的情况下尝试走GRO接收,否则将数据上送协议栈。 GRO(generic receive offload)主要思想就是,组合一些类似的数据包(基于一些数据域)为一个大的数据包(一个skb),然后feed给协议栈,这里主要是利用Scatter-gather IO,也就是skb的struct skb_shared_info域来合并数据包。

    static void e1000_receive_skb(struct e1000_adapter *adapter, u8 status,
                      __le16 vlan, struct sk_buff *skb)
    {
            // 解析eth头,获取上次协议类型,以及设置 skb->pkt_type
        skb->protocol = eth_type_trans(skb, adapter->netdev);
    
        if (status & E1000_RXD_STAT_VP) {
            u16 vid = le16_to_cpu(vlan) & E1000_RXD_SPC_VLAN_MASK;
    
            __vlan_hwaccel_put_tag(skb, htons(ETH_P_8021Q), vid);
        }
        napi_gro_receive(&adapter->napi, skb);
    }
    
    gro_result_t napi_gro_receive(struct napi_struct *napi, struct sk_buff *skb)
    {
        skb_mark_napi_id(skb, napi);
        trace_napi_gro_receive_entry(skb);
    
        skb_gro_reset_offset(skb);
            
        return napi_skb_finish(dev_gro_receive(napi, skb), skb);
    }
    EXPORT_SYMBOL(napi_gro_receive);
    
    // 根据 dev_gro_receive 的返回结果处理报文
    static gro_result_t napi_skb_finish(gro_result_t ret, struct sk_buff *skb)
    {
        switch (ret) {
            // 不支持GRO,送入协议栈
        case GRO_NORMAL:
            if (netif_receive_skb_internal(skb))
                ret = GRO_DROP;
            break;
             // skb被合并(数据区),skb可以释放
        case GRO_DROP:
            kfree_skb(skb);
            break;
        case GRO_MERGED_FREE:
            /*skb数据被合并入其它skb(数据区),或合并后发送,skb可以释放。 */ 
            if (NAPI_GRO_CB(skb)->free == NAPI_GRO_FREE_STOLEN_HEAD)
                napi_skb_free_stolen_head(skb);
            else
                __kfree_skb(skb);
            break;
            // 报文已经被保存,但没做合并,skb被接管不需要释放
        case GRO_HELD:
        case GRO_MERGED:
            break;
        }
    
        return ret;
    }
    
     

    最后送入协议栈的处理都是调用的 netif_receive_skb_internal,如果配置了RPS,会走RPS接收流程,选中CPU后,走一遍非NAPI收包流程,不做详细说。否则,调用__netif_receive_skb 上送上次协议栈。

    RPS全称Receive packet Steering,用于在软件层面实现报文在多个CPU之间的负载均衡以及提高报文处理的缓存命中率,和它类似的还有一个RFS(Receive Flow Steering)rps和rfs出现的原因主要有以下两个:

    1、 对于多队列网卡,网卡硬件接收队列与cpu核数在数量上不匹配导致报文在cpu之间分配不均。

    2、 对于单队列网卡,rps和rfs可以在软件层面将报文平均分配到多个cpu上。

    static int netif_receive_skb_internal(struct sk_buff *skb)
    {
        int ret;
    
        net_timestamp_check(netdev_tstamp_prequeue, skb);
    
        if (skb_defer_rx_timestamp(skb))
            return NET_RX_SUCCESS;
    
        rcu_read_lock();
    
    #ifdef CONFIG_RPS
        if (static_key_false(&rps_needed)) {
            struct rps_dev_flow voidflow, *rflow = &voidflow;
            // 根据报文以及入接口信息获取CPU以及rps_dev_flow
            int cpu = get_rps_cpu(skb->dev, skb, &rflow);
    
            if (cpu >= 0) {
                /* 这个是非NAPI中断收包的流程,这里选中了CPU之后,将报文放入softnet_data的input_pkt_queue队列中,
                  softnet_data是每CPU的私有数据对象,它自带一个napi_struct成员backlog,函数enqueue_to_backlog
                  就是将backlog加入到softnet_data的待轮训设备列表中,并触发软中断,在软中断处理函数net_rx_action()
                  中,同样会调用backlog代表的CPU共用轮训设备的poll函数,为process_backlog(net_dev_init中初始化),
                  该函数会调用__netif_receive_skb()函数将报文送到上层协议栈处理。
                */
                ret = enqueue_to_backlog(skb, cpu, &rflow->last_qtail);
                rcu_read_unlock();
                return ret;
            }
        }
    #endif
        ret = __netif_receive_skb(skb);
        rcu_read_unlock();
        return ret;
    }
    
    static int enqueue_to_backlog(struct sk_buff *skb, int cpu,
                      unsigned int *qtail)
    {
        struct softnet_data *sd;
        unsigned long flags;
        unsigned int qlen;
    
        sd = &per_cpu(softnet_data, cpu);
    
        local_irq_save(flags);
    
        rps_lock(sd);
        if (!netif_running(skb->dev))
            goto drop;
        qlen = skb_queue_len(&sd->input_pkt_queue);
        if (qlen <= netdev_max_backlog && !skb_flow_limit(skb, qlen)) {
            if (qlen) {
    enqueue:
                // 报文挂载到input_pkt_queue中
                __skb_queue_tail(&sd->input_pkt_queue, skb);
                input_queue_tail_incr_save(sd, qtail);
                rps_unlock(sd);
                local_irq_restore(flags);
                return NET_RX_SUCCESS;
            }
    
            /* Schedule NAPI for backlog device
             * We can use non atomic operation since we own the queue lock
             */
            // 测试释放已经有调度实例
            if (!__test_and_set_bit(NAPI_STATE_SCHED, &sd->backlog.state)) {
                if (!rps_ipi_queued(sd))
                    // 这里将sd 自己的napi_struct 挂到了待轮训列表,设备无关的,共用的。
                    ____napi_schedule(sd, &sd->backlog);
            }
            goto enqueue;
        }
    
    drop:
        sd->dropped++;
        rps_unlock(sd);
    
        local_irq_restore(flags);
    
        atomic_long_inc(&skb->dev->rx_dropped);
        kfree_skb(skb);
        return NET_RX_DROP;
    }
    
    
     

    后面就是根据上层协议类型,调用对应的协议接收处理函数处理报文,算是正式进入上层协议栈了。无论是否是能了RPS,最终都会调用__netif_receive_skb 调用__netif_receive_skb_core,主要做:

    1、ptype_all处理,例如抓包程序、raw socket等;

    2、vlan报文的处理,主要是循环把vlan头剥掉,如果qinq场景,两个vlan都会被剥掉;

    3、特殊设备接口处理,例如OVS、linux bridge等;

    4、ptype_base处理,交给协议栈处理,例如ip、arp等;

    static int __netif_receive_skb_core(struct sk_buff *skb, bool pfmemalloc)
    {
        struct packet_type *ptype, *pt_prev;
        rx_handler_func_t *rx_handler;
        struct net_device *orig_dev;
        bool deliver_exact = false;
        int ret = NET_RX_DROP;
        __be16 type;
    
        net_timestamp_check(!netdev_tstamp_prequeue, skb);
    
        trace_netif_receive_skb(skb);
    
        orig_dev = skb->dev;
    
        skb_reset_network_header(skb);
        if (!skb_transport_header_was_set(skb))
            skb_reset_transport_header(skb);
        skb_reset_mac_len(skb);
    
        pt_prev = NULL;
    
    another_round:
        // 送上层协议栈前设置iif
        skb->skb_iif = skb->dev->ifindex;
    
        __this_cpu_inc(softnet_data.processed);
    
        if (skb->protocol == cpu_to_be16(ETH_P_8021Q) ||
            skb->protocol == cpu_to_be16(ETH_P_8021AD)) {
            // 这里解析了vlan头,再skb中记录vlan id,上层协议类型,并下移skb->data
            skb = skb_vlan_untag(skb);
            if (unlikely(!skb))
                goto out;
        }
    
    #ifdef CONFIG_NET_CLS_ACT
        if (skb->tc_verd & TC_NCLS) {
            skb->tc_verd = CLR_TC_NCLS(skb->tc_verd);
            goto ncls;
        }
    #endif
    
        if (pfmemalloc)
            goto skip_taps;
        // ptype_all,如tcpdump,所有包都会调用注册的handle处理
        list_for_each_entry_rcu(ptype, &ptype_all, list) {
            if (pt_prev)
                ret = deliver_skb(skb, pt_prev, orig_dev);
            pt_prev = ptype;
        }
            // 协议区分全局和设备指定的
        list_for_each_entry_rcu(ptype, &skb->dev->ptype_all, list) {
            if (pt_prev)
                ret = deliver_skb(skb, pt_prev, orig_dev);
            pt_prev = ptype;
        }
    
    skip_taps:
    #ifdef CONFIG_NET_INGRESS
        if (static_key_false(&ingress_needed)) {
            skb = sch_handle_ingress(skb, &pt_prev, &ret, orig_dev);
            if (!skb)
                goto out;
    
            if (nf_ingress(skb, &pt_prev, &ret, orig_dev) < 0)
                goto out;
        }
    #endif
    #ifdef CONFIG_NET_CLS_ACT
        skb->tc_verd = 0;
    ncls:
    #endif
        if (pfmemalloc && !skb_pfmemalloc_protocol(skb))
            goto drop;
    
        if (skb_vlan_tag_present(skb)) {
            if (pt_prev) {
                ret = deliver_skb(skb, pt_prev, orig_dev);
                pt_prev = NULL;
            }
            // 这里主要做的事情是更新skb->dev为vlan id对应的设备(vlan子接口),然后再走一遍接收处理。
            if (vlan_do_receive(&skb))
                goto another_round;
            else if (unlikely(!skb))
                goto out;
        }
        /*
        bridge、ovs的接口,都会走到。
        如果一个dev被添加到一个bridge(做为bridge的一个接口),这个接口设备的rx_handler将被设置为,
        br_handle_frame函数这是在br_add_if函数中设置的,而br_add_if (net/bridge/br_if.c)是在向
        网桥设备上添加接口时设置的。进入br_handle_frame也就进入了bridge的逻辑代码。*/
        rx_handler = rcu_dereference(skb->dev->rx_handler);
        if (rx_handler) {
            if (pt_prev) {
                ret = deliver_skb(skb, pt_prev, orig_dev);
                pt_prev = NULL;
            }
            switch (rx_handler(&skb)) {
            case RX_HANDLER_CONSUMED:  // 报文已经被消费,结束处理
                ret = NET_RX_SUCCESS;
                goto out;
            case RX_HANDLER_ANOTHER:  // skb->dev 被修改,重新走一次
                goto another_round;
            case RX_HANDLER_EXACT: /* 精确传递到ptype->dev == skb->dev */
                deliver_exact = true;
            case RX_HANDLER_PASS:
                break;
            default:
                BUG();
            }
        }
    
        if (unlikely(skb_vlan_tag_present(skb))) {
            // 还有vlan标记,说明找不到vlanid对应的设备,存在vlanid,则判定是到其他设备的包
            if (skb_vlan_tag_get_id(skb))
                skb->pkt_type = PACKET_OTHERHOST;
            /* Note: we might in the future use prio bits
             * and set skb->priority like in vlan_do_receive()
             * For the time being, just ignore Priority Code Point
             */
            skb->vlan_tci = 0;
        }
    
        type = skb->protocol;
    
        /* deliver only exact match when indicated */
        /* 设置三层协议,下面提交都是按照解析三层协议提交的,调用最终的三层协议注册的handler,如ip_rcv */
        if (likely(!deliver_exact)) {
            deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
                           &ptype_base[ntohs(type) &
                               PTYPE_HASH_MASK]);
        }
    
        deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
                       &orig_dev->ptype_specific);
    
        if (unlikely(skb->dev != orig_dev)) {
            deliver_ptype_list_skb(skb, &pt_prev, orig_dev, type,
                           &skb->dev->ptype_specific);
        }
    
        if (pt_prev) {
            if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
                goto drop;
            else
                ret = pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
        } else {
    drop:
            if (!deliver_exact)
                atomic_long_inc(&skb->dev->rx_dropped);
            else
                atomic_long_inc(&skb->dev->rx_nohandler);
            kfree_skb(skb);
            /* Jamal, now you will not able to escape explaining
             * me how you were going to use this. :-)
             */
            ret = NET_RX_DROP;
        }
    
    out:
        return ret;
    }
    
    static inline int deliver_skb(struct sk_buff *skb,
                      struct packet_type *pt_prev,
                      struct net_device *orig_dev)
    {
        if (unlikely(skb_orphan_frags(skb, GFP_ATOMIC)))
            return -ENOMEM;
        atomic_inc(&skb->users);
        return pt_prev->func(skb, skb->dev, pt_prev, orig_dev);
    }
    
    
     

    各个协议的处理函数,是在各个协议模块初始化的时候注册的,如下面ipv4协议,注册的ip_rcv处理函数。

    static struct packet_type ip_packet_type __read_mostly = {
    
            .type = cpu_to_be16(ETH_P_IP),
    
            .func = ip_rcv,
    
    };
    
    static int __init inet_init(void)
    
    {
    
             ...
    
             dev_add_pack(&ip_packet_type);
    
             ...
    
    }
    
     

    非NAPI方式

    非NAPI方式,一般流程是(eth口):

    1、设备驱动程序调用netdev_alloc_skb 分配sk_buf,并完成数据拷贝;

    2、调用eth_type_trans 解析eth头,设置上层协议类型和 pkt_type

    3、调用netif_rx --> netif_rx_internal -->enqueue_to_backlog -->___napi_schedule流程,将报文挂载到softnet_data的input_pkt_queue,将 softnet_data的 napi_struct结构的backlog 挂载到 softnet_data 的待轮训设备列表中,触发软中断;

    4、软中断中调用 softnet_data的backlog poll函数,处理input_pkt_queue中的报文,即process_backlog.

    可以看到,这里的软中断之前,报文已经拷贝input_pkt_queue中,而当软中断开始运行时,input_pkt_queue中可能已经有不同网卡的报文了,process_backlog作为一个CPU 通用的poll函数处理不同网卡的报文。

    static int process_backlog(struct napi_struct *napi, int quota)
    {
       struct softnet_data *sd = container_of(napi, struct softnet_data, backlog);
       bool again = true;
       int work = 0;
    
       /* Check if we have pending ipi, its better to send them now,
        * not waiting net_rx_action() end.
        */
       if (sd_has_rps_ipi_waiting(sd)) {
           local_irq_disable();
           net_rps_action_and_irq_enable(sd);
       }
    
       napi->weight = weight_p;
       while (again) {
           struct sk_buff *skb;
           // 最重要的,出队,调用__netif_receive_skb 送协议栈,同上面NAPI方式。
           while ((skb = __skb_dequeue(&sd->process_queue))) {
               rcu_read_lock();
               __netif_receive_skb(skb);
               rcu_read_unlock();
               input_queue_head_incr(sd);
               if (++work >= quota)
                   return work;
    
           }
    
           local_irq_disable();
           rps_lock(sd);
           if (skb_queue_empty(&sd->input_pkt_queue)) {
               /*
                * Inline a custom version of __napi_complete().
                * only current cpu owns and manipulates this napi,
                * and NAPI_STATE_SCHED is the only possible flag set
                * on backlog.
                * We can use a plain write instead of clear_bit(),
                * and we dont need an smp_mb() memory barrier.
                */
               napi->state = 0;
               again = false;
           } else {
               skb_queue_splice_tail_init(&sd->input_pkt_queue,
                              &sd->process_queue);
           }
           rps_unlock(sd);
           local_irq_enable();
       }
    
       return work;
    }
    
     
    展开全文
  • 中断栈与内核的话题更多地属于内核的范畴,所以在《深入Linux设备驱动程序内核机制》第5章“中断处理”当中,基本上没怎么涉及到上述内容,只是在5.4节有些许的文字讨论中断栈中断嵌套情形下可能的溢出问题。...
  • 2.2 软中断 2.3 tasklet 2.4 工作队列 2.5 内核定时器 2.6 底半部机制的选择 3 中断亲和性 4 IPI(Interrupt-Procecesorr Interrupt):处理器中间的中断 5 /proc/ 目录下中断相关的文件 5.1 /proc/inter
  • 整个linux协议是运行在软中断环境中,所以学习协议首先要了解软中断。第一节就总结一下linux内核中软中断的具体实现。 中断的作用: 当一个中断信号到达时,CPU必须停止它当前正做的工作,转而去做中断...
  • 16.2.1 Linux内核网络协议层的层间传递手段——软中断网络协议是分层实现的,如何实现高效的网络数据是协议设计的核心问题之一。1.Linux内核中软中断的机制在Linux内核中是采用软中断的方式实现的,软中断机制...
  • 中断及软中断模型我们在此不会对中断及异常的原理和机制做深入的介绍。但必须要作出一些说明,因为这是理解 Linux 内核与其它嵌入式/实时操作系统的不同,以及理解网络协议收报文的基础。 Linux 支持 CPU 的外部...
  • linux io

    2021-06-08 16:21:23
    2 Linux内核收到系统调用的软中断,通过参数检查后,会调用虚拟文件系统(Virtual File System,VFS),虚拟文件系统会根据信息把相应的处理交给具体的文件系统,如ext2/3/4等文件系统,接着相应的文件I/O命令会转化...
  • 接收(&硬中断处理) * 硬中断处理 ixgbe_msix_clean_rings -> napi_schedule(&q_vector->napi) -> napi_schedule_prep -> __napi_schedule -> ____napi_schedule(this_cpu_ptr(&...
  • 中断处理 - 下半部(软中断) softirq机制 注册softirq处理函数 处理softirq tasklet机制 调度tasklet 中断处理 - 上半部(硬中断) 由于APIC中断控制器有点小复杂,所以本文主要通过8259A中断控制器来介绍...
  • 目录 1 协议入口__netif_receive_skb_core() ...网络收包流程从网卡驱动开始,一直往上,涉及NAPI、GRO、RPS等特性,通常是经过硬件中断后在经由软中断处理,在内核软中断的最后一步就是调用...
  • Linux4.1.12源码分析】virtio_net之NAPI机制 ...【Linux4.1.12源码分析...【Linux4.1.12源码分析】收包软中断和NAPI 【Linux4.1.12源码分析】VXLAN之remcsum实现分析 【Linux4.1
  • Linux内核网络协议笔记

    千次阅读 2018-04-13 17:14:42
    Linux内核网络协议笔记0:...最近看完《深入理解Linux内核》前几章之后(特别是与网络子系统密切相关的软中断),觉得可以而且应该看一下网络协议了。这部分网上的文章大部分都没有什么结构和思路,很少有能够条...
  • LINUX协议详解 数据包发送

    千次阅读 2011-07-12 19:16:59
    数据包发送和接收有点类似,我们一般都用int dev_queue_xmit(struct sk_buff *skb)来发送数据包,例如我们自己构造完整的数据包,最后调用真正的...但是我们也知道发送也是有软中断处理的,open_softirq(NET_TX_SOFTIRQ
  • 内核在编译的时候设置了THREAD_SIZE的值为8K... 但是如果设置了THREAD_SIZE的大小为4K的话, 内核就会使用3种类型的内核, 异常, 硬件中断请求以及软中断请求( When using 4K stacks, interrupts get their own
  • 研究linux系统,不管是做驱动、协议还是进程调度等等,都离不开中断。 这里我们就一同讨论下linux中断处理。 这里先列几个新手最容易迷糊,也最想了解的关于中断的几个问题: 本期讨论话题: 1.什么是硬中断,...
  • 从 NIC 收数据开始,到触发软中断,交付数据包到 IP 层再经由路由机制到 TCP 层,最终交付用户进程。会尽力介绍收消息过程中的各种配置信息,以及各种监控数据。知道了收消息的完整过程,了解了各种配置,明白了各种...
  • Table of Contents NAPI机制 NAPI缺陷 使用 NAPI 先决条件 ...struct napi_struct结构 -内核处理软中断的入口 netif_napi_add函数 -驱动初始时向内核注册软软中断处理回调poll函数 __napi_schedul...
  • 前面说到数据是交给netif_receive_skb来做进一步的处理,而netif_receive_skb基本没干什么事情,主要事情都在netif_receive_skb_internal中完成...此时数据处理都还在软中断的 Handler 中,top的si能反应出 CPU 在这...
  • 版本说明 Linux版本: 3.10.103 网卡驱动: ixgbev 报文收发简单流程 ...内核启动时会调用do_initcalls,从而调用注册的初始化接口net_dev_init,net_dev_init注册软中断的回调函数,分别为接收和发送的:NET_R
  • 网卡在接受数据包时会产生中断,即当 有一个以太网帧到来时,网卡向内核产生一次中断...当CPU 空闲的时候,软中断被调用,操作系统从sk_buffer 队列中取出数据包,将它传送至用户态缓冲区,提供给应用程 序使用。 ...

空空如也

空空如也

1 2 3 4
收藏数 80
精华内容 32
关键字:

linux栈软中断

linux 订阅