精华内容
下载资源
问答
  • linux 中断机制浅析

    2021-05-15 03:53:07
    广义上的中断可以分为外部中断和内部中断(异常)中断是由外部事件引起的,一般分为可屏蔽的中断与非可屏蔽的中断,所谓可屏蔽就是可以通过设置CPU的IF标志位进行屏蔽,而非可屏蔽的是一些非常紧急的事件,往往IF对其...

    广义上的中断可以分为外部中断和内部中断(异常)

    中断是由外部事件引起的,一般分为可屏蔽的中断与非可屏蔽的中断,所谓可屏蔽就是可以通过设置CPU的IF标志位进行屏蔽,而非可屏蔽的是一些非常紧急的事件,往往IF对其不起作用。

    异常是由于内部事件造成的,比如说缺页异常,系统调用等

    异常的产生

    1,监视IRQ线,对引发信号检查(编号小者优先)

    2,如果一个引发信号出现在IRQ线上

    a,把此信号转换成对应的中断向量

    b,把这个向量存放在中断控制器的一个I/O端口,从而允许CPU通过数据总线读这个向量

    c,把引发信号发送到处理器的INTR引脚,即产生一个中断

    d,等待,直到CPU应答这个信号;收到应答后,清INTR引脚

    3,返回到第1步

    核心

    其实中断最核心的东西在于中断描述符表(中断向量表)IDT,他里面记录了中断号与中断处理程序之间的对应关系(确切的说并不是与真正的中断处理程序对应,只是把中断向量号取反压入堆栈,然后跳转到common_interrupt,交给这个函数继续执行)。

    )。表中的每一项称之为中断描述符为64位,linux中IDT表项共计256项,0~31内部中断,128系统调用中断,其他的可以自由使用。CPU的idtr寄存器指向IDT表的物理基地址,lidt指令

    IDT初始化

    在系统进入保护模式之前,中断向量表idt_table中的中断处理函数都是ignore_int,在start_kernel函数中需要重新设置itd_table,这个工作由tarp_init和init_IRQ完成。其中trap_init主要用来完成固定的映射(异常),而Init_IRQ除了填充中断向量表(外部中断)之外还要进行中断控制器的初始化。

    Init_IRQ

    LINUX经常采用面向对象的思想,抽象出来,对于硬件存在很多种中断控制器,Linux将其抽象为irq_chip。同样由于同一个外部中断号可能被多个外部设备共享,而每个不同的外部设备都可以动态地注册和撤销自己的中断处理程序,所以定义了Irq_desc结构,每一个外部中断(因为在256个中断中,32被保留内部使用,所以共有224个需要irq_desc)需要这样一个结构。

    当发生n号中断时,中断处理函数会利用n索引到irq_desc数组的第n个irq_desc成员,然后调用irq_desc结构中的handle_irq函数,而handle_irq函数又会调用action中的每一个handler函数。

    Init_IRQ(本质上是native_init_IRQ),初始化中断控制芯片,接着循环填充中断向量表IDT,讲interrupt[i](中断处理函数)填入到中断描述符中,其中interrupt[i]的含义是把中断向量号取反压入堆栈,然后跳转到common_interrupt处继续执行。

    驱动程序编写

    对于外部中断来说,设备驱动程序可以调用request_irq把一个中断处理程序handler挂起到中断请求队列中,其作用就是构建一个irqaction,并设置其handler指向驱动程序提供的handler,挂接到对应的irq_desc上去,在挂接的过程中,需要检查以前的action是否支持中断共享,如果不支持就不能挂载,否则可以。

    普通进程可以被中断或异常处理程序打断

    异常处理程序可以被中断程序打断

    中断程序只可能被其他的中断程序打断

    中断与异常处理

    无特权转换。如果CPU的当前执行环境是在内核态,而被中断,我们称之为无特权转换;

    特权转换。 如果CPU的当前执行环境是在用户态,而被中断,我们称之为特权转换;这个需要先从用户态的堆栈转换到内核态堆栈中,然后再处理。

    中断处理

    当执行到common_interrupt的时候肯定已经是在内核态了,

    common_interrupt:

    addl $-0×80,(%esp)  /* Adjust vector into the [-256,-1] range */

    SAVE_ALL        //保存现场到数据结构pt_regs

    movl %esp,%eax  //通过eax传递参数pt_regs给函数do_IRQ用

    call do_IRQ      //关键的中断处理函数,这个并不是我们指明的ISR

    jmp ret_from_intr

    do_IRQ

    进行中断处理,现在我们需要区分两个概念,禁止中断local_irq_disable,禁止抢占内核preempt_disable。

    禁止中断,是指设置当前CPU的IF标志位,从而禁止接受中断请求,达到屏蔽中断的目的,在中断处理过程中考虑到多重中断,所以不会禁止中断的。

    禁止抢占内核,是指当前进程的运行,不允许被其他进程抢占,因为在中断处理完毕后可能会调用其他进程执行,为了防止这种情况出现,强制在中断处理完毕后必须回到初始中断附属的进程继续执行,也就是说在禁止抢占内核的情况下是允许发生中断的。

    中断处理的思想:根据传递过来的中断号,找到irq_desc,调用其中的handle_irq函数,继而调用action中的handler函数(这个才是真正的ISR),但是同一个中断处理程序之能在某时刻在一个CPU上运行,同时需要注意Linux对于同一个Irq号的中断处理。

    do_IRQ ( )

    {

    …..

    Irq_enter( );//上面的都是关闭中断的,由硬件自动关闭中断

    Handle_irq()//执行我们定义的中断处理程序ISR,开中断执行,为了中断嵌套

    Irq_exit ( );

    …..

    }

    异常处理

    因为异常不与外部中断设备打交道,所以其过程比较简单,就是在init_trap中填入到IDT中的中断处理程序,最后调用do_trap进行异常处理,关键也是注意是在用户态还是内核态的问题。异常处理程序在最后向当前进程发送一个信号,因为异常肯定是由当前进程引起的,所以同步于当前进程,由当前进程试图恢复故障(缺页)或者终止。这与中断不一样,中断不于当前进程有什么直接的关系。

    驱动下半部分

    我们知道驱动程序分为上半部分与下半部分,通常上半部分是紧急的部分,通常放在ISR中直接处理,对于非紧急的部分有两种方法:1. 软中断与tasklet 2. 工作队列

    软中断

    每一个softirq都用一个softirq_action来表示(里面包含软中断处理函数action),内核使用open_softirq()来注册一个软中断,共计32个软中断,用sotfirq_vec[32]来表示。

    请求软中断

    在我们驱动程序的ISR中,可以通过函数raise_softirq()来请求软件中断(因为其主要工作就是处理驱动程序的下半部分),其原理是将__softirq_pending中第i标志位置1

    软中断处理

    在do_IRQ()àirq_exit()àdo_softirq()来进行软中断的处理,其处理过程如下:

    (1)     根据__softirq_pending,来确定有什么类型的软中断需要处理

    (2)     当有软中断I时,调用softirq_vec[i]->action进行软中断处理,期间如果又有新的软中断到达,继续重复处理直到max_restart次,在进行软中断处理时是关中断的

    (3)     剩下的软中断交给ksoftirqd来处理,这个进程会和正常进程一样去抢占CPU执行

    注意:为了保证内核的强壮性,软中断只是由系统使用,是在启动时静态建立的,不会交给用户来处理,如果说我们的驱动程序要使用这种机制,就需要使用建立在软中断基础之上的tasklet机制。

    Tasklet

    在软中断存在两个系统的软中断HI_SOFTIRQ和TASKLET_SOFTIRQ,这就是用来专门为tasklet机制提供的,tasklet就是建立在HI_SOFTIRQ和TASKLET_SOFTIRQ的软中断之上的。每一个CPU都有独立的tasklet队列,tasklet_hi_vec和tasklet_vec分别是高优先级和普通优先级tasklet的队列头。

    内核通过tasklet向驱动程序模块提供处理下半部分的接口,在驱动程序中根据自己的tasklet的优先级,选择调用tasklet_schedule或者tasklet_hi_schedule,这两个函数的作用基本上一样。

    Tasklet_schedule( )

    {

    Raise_soft_irqoff(TASKLET_SOFTIRQ);//高优先级的就请求HI_SOFTIRQ

    }

    这样在处理软中断的时候,就会最终调用到tasklet_action(自己定义的ISR下半部分)来处理,注意同一种类型的tasklet只能串行执行。

    软中断与tasklet的区别和联系

    联系:Tasklet是在软中断基础之上实现的

    区别:软中断的分配是静态的(定义了6个中断号对应软中断,比如网卡数据包、时钟等),而tasklet的分配和初始化可以在运行时进行,软中断(即便是同一种类型的)可以同时运行在多个CPU中,但是tasklet同一类型的总是被串行执行,不同的tasklet类型可以在不同CPU上同时执行,所以对于软中断来说可以是可重入的,而tasklet不必是可重入的。

    那么什么是可重入函数呢?所谓可重入是指一个可以被多个任务调用的过程,任务在调用时不必担心数据是否会出错

    对于软中断来说,同一种类型的(处理函数是同一个函数)可以同时被多个CPU执行,所以该函数必须是可重入的不能包含任何全局数据等,

    对于tasklet来说,同一种类型的不可以被同时执行,只能串行执行,所以对于函数P来说没有什么要求

    工作队列

    工作队列(work queue)是linux2.6后另外一种将工作推后执行的形式 ,它和我们前面讨论的所有其他形式都有不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列。如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

    (1)     工作、工作队列和工作者线程

    如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。

    (2)     表示工作的数据结构

    工作用中定义的work_struct结构表示:

    struct  work_struct{

    unsigned long pending;          /* 这个工作正在等待处理吗?*/

    struct list_head entry;         /* 连接所有工作的链表 */

    void (*func) (void *);          /* 要执行的函数 */

    void *data;                     /* 传递给函数的参数 */

    void *wq_data;                  /* 内部使用 */

    struct timer_list timer;        /* 延迟的工作队列所用到的定时器 */

    };

    这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。

    总结:

    从实现方式上大体上分为两大类:

    在IDT中找到中断向量号,进而执行相应的中断处理程序

    (1)     中断:需要在特定时间内完成的,在do_IRQ中调用ISR来处理

    (2)     异常:类似于中断,但是通过do_trap来处理,系统自定义的

    (3)     软中断:系统留用的,用来处理非紧急的程序,通常处理驱动程序的下半部分

    不占用IDT中irq

    (1)     Tasklet:这是在软中断基础之上的,不会再IDT中存在

    (2)     工作队列,用内核线程的方式进行处理

    展开全文
  • JAVA中断机制详解

    2021-02-12 11:51:58
    Java提供了中断机制,可以使用它来结束一个线程。这种机制要求线程检查它是否被中断了,然后决定是不是响应这个中断请求。线程允许忽略中断请求并继续执行。Java的中断是一种协作机制。也就是说调用线程对象的...

    Java提供了中断机制,可以使用它来结束一个线程。这种机制要求线程检查它是否被中断了,然后决定是不是响应这个中断请求。线程允许忽略中断请求并继续执行。Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。Thread类有一个表明线程被中断的属性,它存放boolean值。线程的interrupted()方法被调用时,该值设为true。isInterrupted()方法返回这个属性的值。

    一、Java中断的现象

    首先,看看Thread类里的几个方法:

    public static boolean

    interrupted()

    测试当前线程是否已经中断。线程的中断状态由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第一次调用已清除了其中断状态之后,且第二次调用检验完中断状态前,当前线程再次中断的情况除外)。

    public boolean

    isInterrupted()

    测试线程是否已经中断。线程的中断状态不受该方法的影响。

    public void interrupt()

    中断线程。

    上面列出了与中断有关的几个方法及其行为,可以看到interrupt是中断线程。如果不了解Java的中断机制,这样的一种解释极容易造成误解,认为调用了线程的interrupt方法就一定会中断线程。Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。每个线程都有一个boolean的中断状态(不一定就是对象的属性,事实上,该状态也确实不是Thread的字段),interrupt方法仅仅只是将该状态置为true 。代码如下:

    public class Test {

    public static void main(String[] args) {

    Thread t = new MyThread();

    t.start();

    t.interrupt();

    System.out.println("已调用线程的interrupt方法");

    }

    static class MyThread extends Thread {

    public void run() {

    int num = longTimeRunningNonInterruptMethod(2, 0);

    System.out.println("长时间任务运行结束,num=" + num);

    System.out.println("线程的中断状态:" + Thread.interrupted());

    }

    private static int longTimeRunningNonInterruptMethod(int count,int initNum) {

    for (int i = 0; i < count; i++) {

    for (int j = 0; j < Integer.MAX_VALUE; j++) {

    initNum++;

    }

    }

    return initNum;

    }

    }

    }

    一般情况下,会打印如下内容:

    已调用线程的interrupt方法

    长时间任务运行结束,num=-2

    线程的中断状态:true

    可见,interrupt方法并不一定能中断线程。但是,如果改成下面的程序,情况会怎样呢? 代码如下:

    public class Test {

    public static void main(String[] args) {

    Thread t = new MyThread();

    t.start();

    t.interrupt();

    System.out.println("已调用线程的interrupt方法");

    }

    static class MyThread extends Thread {

    public void run() {

    int num = -1;

    try {

    num = longTimeRunningInterruptMethod(2, 0);

    } catch (InterruptedException e) {

    System.out.println("线程被中断");

    throw new RuntimeException(e);

    }

    System.out.println("长时间任务运行结束,num=" + num);

    System.out.println("线程的中断状态:" + Thread.interrupted());

    }

    private static int longTimeRunningInterruptMethod(int count, int initNum)

    throws InterruptedException {

    for (int i = 0; i < count; i++) {

    TimeUnit.SECONDS.sleep(5);

    }

    return initNum;

    }

    }

    }

    运行结果如下:

    已调用线程的interrupt方法

    线程被中断

    Exception in thread "Thread-0" java.lang.RuntimeException: java.lang.InterruptedException: sleep interrupted at Chapter1.Test$MyThread.run(Test.java:20)

    Caused by: java.lang.InterruptedException: sleep interrupted

    at java.lang.Thread.sleep(Native Method)

    at java.lang.Thread.sleep(Thread.java:340)

    at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:360)

    at Chapter1.Test$MyThread.longTimeRunningInterruptMethod(Test.java:29)

    at Chapter1.Test$MyThread.run(Test.java:17)

    经运行可以发现,程序抛出异常停止了,run方法里的后两条打印语句没有执行。那么,区别在哪里?

    般说来,如果一个方法声明抛出InterruptedException,表示该方法是可中断的(没有在方法中处理中断却也声明抛出

    InterruptedException的除外),也就是说可中断方法会对interrupt调用做出响应(例如sleep响应interrupt的操作包括清除中断状态,抛出InterruptedException),如果interrupt调用是在可中断方法之前调用,可中断方法一定会处理中断,像上面的例子,interrupt方法极可能在run未进入sleep的时候就调用了,但sleep检测到中断,就会处理该中断。如果在可中断方法正在执行中的时候调用interrupt,会怎么样呢?这就要看可中断方法处理中断的时机了,只要可中断方法能检测到中断状态为true,就应该处理中断。让我们为开头的那段代码加上中断处理。 那么自定义的可中断方法该如何处理中断呢?那就是在适合处理中断的地方检测线程中断状态并处理。代码如下:

    public class Test {

    public static void main(String[] args) throws Exception {

    Thread t = new MyThread();

    t.start();

    TimeUnit.MILLISECONDS.sleep(1);//如果不能看到处理过程中被中断的情形,可以启用这句再看看效果

    t.interrupt();

    System.out.println("已调用线程的interrupt方法");

    }

    static class MyThread extends Thread {

    public void run() {

    int num;

    try {

    num = longTimeRunningNonInterruptMethod(2, 0);

    } catch (InterruptedException e) {

    throw new RuntimeException(e);

    }

    System.out.println("长时间任务运行结束,num=" + num);

    System.out.println("线程的中断状态:" + Thread.interrupted());

    }

    private static int longTimeRunningNonInterruptMethod(int count,int initNum) throws InterruptedException {

    if (interrupted()) {

    throw new InterruptedException("正式处理前线程已经被请求中断");

    }

    for (int i = 0; i < count; i++) {

    for (int j = 0; j < Integer.MAX_VALUE; j++) {

    initNum++;

    }

    // 假如这就是一个合适的地方

    if (interrupted()) {

    // 回滚数据,清理操作等

    throw new InterruptedException("线程正在处理过程中被中断");

    }

    }

    return initNum;

    }

    }

    }

    如上面的代码,方法longTimeRunningMethod此时已是一个可中断的方法了。在进入方法的时候判断是否被请求中断,如果是,就不进行相应的处理了;处理过程中,可能也有合适的地方处理中断,例如上面最内层循环结束后。这段代码中检测中断用了Thread的静态方法interrupted,它将中断状态置为false,并将之前的状态返回,而isInterrupted只是检测中断,并不改变中断状态。一般来说,处理过了中断请求,应该将其状态置为false。但具体还要看实际情形。

    二、Java中断的本质

    在历史上,Java试图提供过抢占式限制中断,但问题多多,例如已被废弃的Thread.stop、Thread.suspend和

    Thread.resume等。另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率。如今,Java的线程调度不提供抢占式中断,而采用协作式的中断。其实,协作式的中断,原理很简单,就是轮询某个表示中断的标记,我们在任何普通代码的中都可以实现。 例如下面的代码:

    volatile bool isInterrupted;

    //…

    while(!isInterrupted) {

    compute();

    }

    但是,上述的代码问题也很明显。当compute执行时间比较长时,中断无法及时被响应。另一方面,利用轮询检查标志变量的方式,想要中断wait和sleep等线程阻塞操作也束手无策。如果仍然利用上面的思路,要想让中断及时被响应,必须在虚拟机底层进行线程调度的对标记变量进行检查。是的,JVM中确实是这样做的。下面摘自java.lang.Thread的源代码:

    public static boolean interrupted() {

    return currentThread().isInterrupted(true);

    }

    //…

    private native boolean isInterrupted(boolean ClearInterrupted);

    可以发现,isInterrupted被声明为native方法,取决于JVM底层的实现。实际上,JVM内部确实为每个线程维护了一个中断标记。但应用程序不能直接访问这个中断变量,必须通过下面几个方法进行操作:

    public class Thread {

    //设置中断标记

    public void interrupt() { ... }

    //获取中断标记的值

    public boolean isInterrupted() { ... }

    //清除中断标记,并返回上一次中断标记的值

    public static boolean interrupted() { ... }

    ...

    }

    通常情况下,调用线程的interrupt方法,并不能立即引发中断,只是设置了JVM内部的中断标记。因此,通过检查中断标记,应用程序可以做一些特殊操作,也可以完全忽略中断。你可能想,如果JVM只提供了这种简陋的中断机制,那和应用程序自己定义中断变量并轮询的方法相比,基本也没有什么优势。JVM内部中断变量的主要优势,就是对于某些情况,提供了模拟自动“中断陷入”的机制。

    在执行涉及线程调度的阻塞调用时(例如wait、sleep和join),如果发生中断,被阻塞线程会“尽可能快的”抛出InterruptedException。因此,我们就可以用下面的代码框架来处理线程阻塞中断.代码如下:

    try {

    //wait、sleep或join

    } catch(InterruptedException e) {

    //某些中断处理工作

    }

    所谓“尽可能快”,我猜测JVM就是在线程调度调度的间隙检查中断变量,速度取决于JVM的实现和硬件的性能。

    三、一些不会抛出 InterruptedException 的线程阻塞操作

    然而,对于某些线程阻塞操作,JVM并不会自动抛出InterruptedException异常。例如,某些I/O操作和内部锁操作。对于这类操作,可以用其他方式模拟中断:

    java.io中的异步socket I/O。读写socket的时候,InputStream和OutputStream的read和write方法会阻塞等待,但不会响应java中断。不过,调用Socket的close方法后,被阻塞线程会抛出SocketException异常。

    利用Selector实现的异步I/O 。如果线程被阻塞于Selector.select(在java.nio.channels中),调用wakeup方法会引起ClosedSelectorException异常。

    锁获取 。如果线程在等待获取一个内部锁,我们将无法中断它。但是,利用Lock类的lockInterruptibly方法,我们可以在等待锁的同时,提供中断能力。

    四、两条编程原则

    另外,在任务与线程分离的框架中,任务通常并不知道自身会被哪个线程调用,也就不知道调用线程处理中断的策略。所以,在任务设置了线程中断标记后,并不能确保任务会被取消。因此,有以下两条编程原则:

    除非你知道线程的中断策略,否则不应该中断它。这条原则告诉我们,不应该直接调用Executer之类框架中线程的interrupt方法,应该利用诸如Future.cancel的方法来取消任务。

    任务代码不该猜测中断对执行线程的含义。这条原则告诉我们,一般代码遇在到InterruptedException异常时,不应该将其捕获后“吞掉”,而应该继续向上层代码抛出。

    总之,Java中的非抢占式中断机制,要求我们必须改变传统的抢占式中断思路,在理解其本质的基础上,采用相应的原则和模式来编程

    展开全文
  • 在Linux中,分为中断处理采用“上半部”和“下半部”处理机制。 一、中断处理“下半部”机制 中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时...

    前言

    中断分为硬件中断,软件中断。中断的处理原则主要有两个:一个是不能嵌套,另外一个是越快越好。在Linux中,分为中断处理采用“上半部”和“下半部”处理机制。

    一、中断处理“下半部”机制

    中断服务程序一般都是在中断请求关闭的条件下执行的,以避免嵌套而使中断控制复杂化。但是,中断是一个随机事件,它随时会到来,如果关中断的时间太长,CPU就不能及时响应其他的中断请求,从而造成中断的丢失。

    因此,Linux内核的目标就是尽可能快的处理完中断请求,尽其所能把更多的处理向后推迟。例如,假设一个数据块已经达到了网线,当中断控制器接受到这个中断请求信号时,Linux内核只是简单地标志数据到来了,然后让处理器恢复到它以前运行的状态,其余的处理稍后再进行(如把数据移入一个缓冲区,接受数据的进程就可以在缓冲区找到数据)。

    因此,内核把中断处理分为两部分:上半部(top-half)和下半部(bottom-half),上半部 (就是中断服务程序)内核立即执行,而下半部(就是一些内核函数)留着稍后处理。

    首先:一个快速的“上半部”来处理硬件发出的请求,它必须在一个新的中断产生之前终止。通常,除了在设备和一些内存缓冲区(如果你的设备用到了DMA,就不止这些)之间移动或传送数据,确定硬件是否处于健全的状态之外,这一部分做的工作很少。
    第二:“下半部”运行时是允许中断请求的,而上半部运行时是关中断的,这是二者之间的主要区别。

    内核到底什么时候执行下半部,以何种方式组织下半部?

    这就是我们要讨论的下半部实现机制,这种机制在内核的演变过程中不断得到改进,在以前的内核中,这个机制叫做bottom-half(以下简称BH)。但是,Linux的这种bottom-half机制有两个缺点:

    • 在任意一时刻,系统只能有一个CPU可以执行BH代码,以防止两个或多个CPU同时来执行BH函数而相互干扰。因此BH代码的执行是严格“串行化”的。
    • BH函数不允许嵌套。

    这两个缺点在单CPU系统中是无关紧要的,但在SMP系统中却是非常致命的。因为BH机制的严格串行化执行显然没有充分利用SMP系统的多CPU特点。为此,在2.4以后的版本中有了新的发展和改进,改进的目标使下半部可以在多处理机上并行执行,并有助于驱动程序的开发者进行驱动程序的开发。下面主要介绍3种5.4内核中的“下半部”处理机制:

    • 软中断请求(softirq)机制
    • 小任务(tasklet)机制
    • 工作队列机制
    • threaded_irq机制

    以上三种机制的比较如下图所示

    img

    二、软中断请求(softirq)机制

    Linux的softirq机制是与SMP紧密不可分的。为此,整个softirq机制的设计与实现中自始自终都贯彻了一个思想:“谁触发,谁执行”(Who marks,Who runs),也即触发软中断的那个CPU负责执行它所触发的软中断,而且每个CPU都有它自己的软中断触发与控制机制。这个设计思想也使得softirq机制充分利用了SMP系统的性能和特点。

    2.1 软中断描述符

    Linux在include/linux/interrupt.h头文件中定义了数据结构softirq_action,来描述一个软中断请求,如下所示:

    /* PLEASE, avoid to allocate new softirqs, if you need not _really_ high
       frequency threaded job scheduling. For almost all the purposes
       tasklets are more than enough. F.e. all serial device BHs et
       al. should be converted to tasklets, not to softirqs.
     */
    
    enum
    {
    	HI_SOFTIRQ=0,	//用于实现高优先级的软中断
    	TIMER_SOFTIRQ,
    	NET_TX_SOFTIRQ,	//用于实现网络数据的发送
    	NET_RX_SOFTIRQ, //用于实现网络数据的接收
    	BLOCK_SOFTIRQ,
    	IRQ_POLL_SOFTIRQ,
    	TASKLET_SOFTIRQ, //用于实现tasklet软中断
    	SCHED_SOFTIRQ,
    	HRTIMER_SOFTIRQ, /* Unused, but kept as tools rely on the
    			    numbering. Sigh! */
    	RCU_SOFTIRQ,    /* Preferable RCU should always be the last softirq */
    
    	NR_SOFTIRQS		//大小等于10,最后一个
    };
    
    /* map softirq index to softirq name. update 'softirq_to_name' in
     * kernel/softirq.c when adding a new softirq.
     */
    extern const char * const softirq_to_name[NR_SOFTIRQS];
    
    /* softirq mask and active fields moved to irq_cpustat_t in
     * asm/hardirq.h to get better cache usage.  KAO
     */
    
    struct softirq_action
    {
    	void	(*action)(struct softirq_action *);	//函数指针action指向软中断请求的服务函数
    };
    
    asmlinkage void do_softirq(void);
    asmlinkage void __do_softirq(void);
    

    其中,函数指针action指向软中断请求的服务函数。基于上述软中断描述符,Linux在kernel/softirq.c文件中定义了一个全局的softirq_vec数组:

    static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
    

    在这里系统一共定义了10个软中断请求描述符。软中断向量i(0≤i≤9)所对应的软中断请求描述符就是softirq_vec[i]。这个数组是个系统全局数组,即它被所有的CPU所共享。这里需要注意的一点是:每个CPU虽然都有它自己的触发和控制机制,并且只执行他自己所触发的软中断请求,但是各个CPU所执行的软中断服务例程却是相同的,也即都是执行softirq_vec[ ]数组中定义的软中断服务函数。Linux在kernel/softirq.c中的相关代码如下:

    /*
       - No shared variables, all the data are CPU local.
       - If a softirq needs serialization, let it serialize itself
         by its own spinlocks.
       - Even if softirq is serialized, only local cpu is marked for
         execution. Hence, we get something sort of weak cpu binding.
         Though it is still not clear, will it result in better locality
         or will not.
    
       Examples:
       - NET RX softirq. It is multithreaded and does not require
         any global serialization.
       - NET TX softirq. It kicks software netdevice queues, hence
         it is logically serialized per device, but this serialization
         is invisible to common code.
       - Tasklets: serialized wrt itself.
     */
    
    #ifndef __ARCH_IRQ_STAT
    DEFINE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	//较老版本内核irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
    EXPORT_PER_CPU_SYMBOL(irq_stat);	//较老版本内核EXPORT_SYMBOL(irq_stat)
    #endif
    
    static struct softirq_action softirq_vec[NR_SOFTIRQS] __cacheline_aligned_in_smp;
    
    DEFINE_PER_CPU(struct task_struct *, ksoftirqd);
    
    const char * const softirq_to_name[NR_SOFTIRQS] = {
    	"HI", "TIMER", "NET_TX", "NET_RX", "BLOCK", "IRQ_POLL",
    	"TASKLET", "SCHED", "HRTIMER", "RCU"
    };
    

    2.2 软中断触发机制

    要实现“谁触发,谁执行”的思想,就必须为每个CPU都定义它自己的触发和控制变量。为此,Linux在\Linux-5.4\arch\arm\include\asm\hardirq.h头文件中定义了数据结构irq_cpustat_t来描述一个CPU的中断统计信息,其中就有用于触发和控制软中断的成员变量。数据结构irq_cpustat_t的定义如下:

    IPI: 处理器间的中断(Inter-Processor Interrupts)

    #define NR_IPI	7		//Inter-Processor Interrupts,IPI
    
    typedef struct {
    	unsigned int __softirq_pending;
    #ifdef CONFIG_SMP
    	unsigned int ipi_irqs[NR_IPI];
    #endif
    } ____cacheline_aligned irq_cpustat_t;
    
    #define __inc_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)++
    #define __get_irq_stat(cpu, member)	__IRQ_STAT(cpu, member)
    
    #define __IRQ_STAT(cpu, member) (irq_stat[cpu].member)
    
    //\Linux-5.4\include\linux\irq_cpustat.h文件中
    DECLARE_PER_CPU_ALIGNED(irq_cpustat_t, irq_stat);	/* defined in asm/hardirq.h */
    #define __IRQ_STAT(cpu, member)	(per_cpu(irq_stat.member, cpu))
    
    //\Linux-5.4\include\linux\percpu-defs.h文字中
    /*
     * Normal declaration and definition macros.
     */
    #define DECLARE_PER_CPU_SECTION(type, name, sec)			\
    	extern __PCPU_ATTRS(sec) __typeof__(type) name
    
    #define DECLARE_PER_CPU_ALIGNED(type, name)				\
    	DECLARE_PER_CPU_SECTION(type, name, PER_CPU_ALIGNED_SECTION)	\
    	____cacheline_aligned
    

    展开可得到以下定义:

     irq_cpustat_t irq_stat[NR_CPUS] ____cacheline_aligned;
    

    其中:

    1. NR_CPUS,为系统中CPU个数。
    2. 这样,每个CPU都只操作它自己的中断统计信息结构。假设有一个编号为id的CPU,那么它只能操作它自己的中断统计信息结构irq_stat[id](0≤id≤NR_CPUS-1),从而使各CPU之间互不影响。

    使用open_softirq()函数可以注册软中断对应的处理函数,而raise_softirq()函数可以出发一个软件中断。

    //使用softirq机制需要通过open_softirq来注册软中断处理函数,使中断索引号与中断处理函数对应。该函数定义在kernel/softirq.c文件中。
    //该函数将软中断的中断处理函数指针赋值给相应的softirq_vec。
    //nr为中断号,action为中断处理函数
    void open_softirq(int nr, void (*action)(struct softirq_action *))
    {
    	softirq_vec[nr].action = action;
    }
    
    //在中断处理函数完成了紧急的硬件操作后,就应该调用raise_softirq函数来触发软中断,让软中断来处理耗时的中断下半部操作。
    //如果没有处于中断中,则立马唤醒线程softirqd
    void raise_softirq(unsigned int nr)		//触发一个软中断,nr为中断号
    {
    	unsigned long flags;
    
    	local_irq_save(flags);
    	raise_softirq_irqoff(nr);	//raise_softirq_irqoff来触发相应的软中断,将相应的bit位置位
    	local_irq_restore(flags);
    }
    

    raise_softirq_irqoff函数:

    /*
     * This function must run with irqs disabled!
     */
    inline void raise_softirq_irqoff(unsigned int nr)
    {
    	__raise_softirq_irqoff(nr);
    
    	/*
    	 * If we're in an interrupt or softirq, we're done
    	 * (this also catches softirq-disabled code). We will
    	 * actually run the softirq once we return from
    	 * the irq or softirq.
    	 *
    	 * Otherwise we wake up ksoftirqd to make sure we
    	 * schedule the softirq soon.
    	 */
    	if (!in_interrupt())
    		wakeup_softirqd();
    }
    

    通过in_interrupt判断现在是否在中断上下文中,或者软中断是否被禁止,如果都不成立,我们必须要调用wakeup_softirqd函数用来唤醒本CPU上的softirqd这个内核线程。

    2.3 初始化软中断

    //初始化软中断(softirq_init)
    //在start_kernel()进行系统初始化中,就调用了softirq_init()函数对HI_SOFTIRQ和TASKLET_SOFTIRQ两个软中断进行了初始化
    //Linux-5.4\init\main.c -> softirq_init()
    void __init softirq_init(void)
    {
    	int cpu;
    
    	for_each_possible_cpu(cpu) {
    		per_cpu(tasklet_vec, cpu).tail =
    			&per_cpu(tasklet_vec, cpu).head;
    		per_cpu(tasklet_hi_vec, cpu).tail =
    			&per_cpu(tasklet_hi_vec, cpu).head;
    	}
    
    	open_softirq(TASKLET_SOFTIRQ, tasklet_action);	//设置软中断服务函数
    	open_softirq(HI_SOFTIRQ, tasklet_hi_action);	//设置软中断服务函数
    }
    

    2.4 软中断服务的执行及处理

    2.4.1 在中断返回现场调度__do_softirq

    在中断处理程序中触发软中断是最常见的形式,一个硬件中断处理完成之后。下面的函数在处理完硬件中断之后退出中断处理函数,在irq_exit中会触发软件中断的处理。

    中断处理模型:

    这里写图片描述

    硬中断处理过程:

    fastcall unsigned int do_IRQ(struct pt_regs *regs)
    {
            ...
            irq_enter();
    
            //handle external interrupt (ISR)
            ...
              irq_exit();
    
            return 1;
    }
    

    硬中断处理完毕后会执行irq_exit()函数

    /*
     * Exit an interrupt context. Process softirqs if needed and possible:
     */
    void irq_exit(void)
    {
    #ifndef __ARCH_IRQ_EXIT_IRQS_DISABLED
    	local_irq_disable();
    #else
    	lockdep_assert_irqs_disabled();
    #endif
    	account_irq_exit_time(current);
    	preempt_count_sub(HARDIRQ_OFFSET);
    	if (!in_interrupt() && local_softirq_pending())	//如果没有处于中断上下文中,以及没有标志位,则调用invoke_softirq函数,调用__do_softirq函数,或者唤醒处理软中断的线程。
    		invoke_softirq();
    
    	tick_irq_exit();
    	rcu_irq_exit();
    	trace_hardirq_exit(); /* must be last! */
    }
    

    在irq_exit()函数中会调用invoke_softirq函数

    static inline void invoke_softirq(void)
    {
    	if (ksoftirqd_running(local_softirq_pending()))
    		return;
    
    	if (!force_irqthreads) {
    #ifdef CONFIG_HAVE_IRQ_EXIT_ON_IRQ_STACK
    		/*
    		 * We can safely execute softirq on the current stack if
    		 * it is the irq stack, because it should be near empty
    		 * at this stage.
    		 */
    		__do_softirq();	/* 软中断处理函数 */
    #else
    		/*
    		 * Otherwise, irq_exit() is called on the task stack that can
    		 * be potentially deep already. So call softirq in its own stack
    		 * to prevent from any overrun.
    		 */
    		do_softirq_own_stack();	//__do_softirq();
    #endif
    	} else {
    		wakeup_softirqd();/* 如果强制使用软中断线程进行软中断处理,会通知调度器唤醒软中断线程ksoftirqd */
    	}
    }
    

    函数do_softirq()负责执行数组softirq_vec[i]中设置的软中断服务函数。每个CPU都是通过执行这个函数来执行软中断服务的。由于同一个CPU上的软中断服务例程不允许嵌套,因此,do_softirq()函数一开始就检查当前CPU是否已经正出在中断服务中,如果是则do_softirq()函数立即返回。举个例子,假设CPU0正在执行do_softirq()函数,执行过程产生了一个高优先级的硬件中断,于是CPU0转去执行这个高优先级中断所对应的中断服务程序。众所周知,所有的中断服务程序最后都要跳转到do_IRQ()函数并由它来依次执行中断服务队列中的ISR,这里我们假定这个高优先级中断的ISR请求触发了一次软中断,于是do_IRQ()函数在退出之前看到有软中断请求,从而调用do_softirq()函数来服务软中断请求。因此,CPU0再次进入do_softirq()函数(也即do_softirq()函数在CPU0上被重入了)。但是在这一次进入do_softirq()函数时,它马上发现CPU0此前已经处在中断服务状态中了,因此这一次do_softirq()函数立即返回。于是,CPU0回到该开始时的do_softirq()函数继续执行,并为高优先级中断的ISR所触发的软中断请求补上一次服务。从这里可以看出,do_softirq()函数在同一个CPU上的执行是串行的。

    do_softirq函数:

    asmlinkage __visible void do_softirq(void)
    {
    	__u32 pending;
    	unsigned long flags;
    	/* 判断是否在中断处理中,如果正在中断处理,就直接返回 */
    	if (in_interrupt())
    		return;
    	/* 保存当前寄存器的值 */
    	local_irq_save(flags);
    	/* 取得当前已注册软中断的位图 */
    	pending = local_softirq_pending();
    
        /* 循环处理所有已注册的软中断 */
    	if (pending && !ksoftirqd_running(pending))
    		do_softirq_own_stack();		//__do_softirq
    
    	local_irq_restore(flags);
    }
    

    __do_softirq函数:

    asmlinkage __visible void __softirq_entry __do_softirq(void)
    {
    	unsigned long end = jiffies + MAX_SOFTIRQ_TIME;	/* 为了防止软中断执行时间太长,设置了一个软中断结束时间 */
    	unsigned long old_flags = current->flags;		 /* 保存当前进程的标志 */
    	int max_restart = MAX_SOFTIRQ_RESTART;			/* 软中断循环执行次数: 10次 */
    	struct softirq_action *h;						/* 软中断的action指针 */
    	bool in_hardirq; 
    	__u32 pending;
    	int softirq_bit;
    
    	/*
    	 * Mask out PF_MEMALLOC as the current task context is borrowed for the
    	 * softirq. A softirq handled, such as network RX, might set PF_MEMALLOC
    	 * again if the socket is related to swapping.
    	 */
    	current->flags &= ~PF_MEMALLOC;
    
    	pending = local_softirq_pending();			/* 获取此CPU的__softirq_pengding变量值 */
    	account_irq_enter_time(current);			/* 用于统计进程被软中断使用时间 */
    
    	__local_bh_disable_ip(_RET_IP_, SOFTIRQ_OFFSET);	/* 增加preempt_count软中断计数器,也表明禁止了调度 */
    	in_hardirq = lockdep_softirq_start();
    
    restart:										/* 循环10次的入口,每次循环都会把所有挂起需要执行的软中断执行一遍 */
    	/* 该CPU的__softirq_pending清零,当前的__softirq_pending保存在pending变量中 */
        /* 这样做就保证了新的软中断会在下次循环中执行 */
    	
    	/* Reset the pending bitmask before enabling irqs */
    	set_softirq_pending(0);
    	
    	local_irq_enable();		/* 开中断 */
    
    	h = softirq_vec;		/* h指向软中断数组头 */
    
    	while ((softirq_bit = ffs(pending))) {/* 每次获取最高优先级的已挂起软中断 */
    		unsigned int vec_nr;
    		int prev_count;
    
    		h += softirq_bit - 1;		/* 获取此软中断描述符地址 */
    
    		vec_nr = h - softirq_vec;	/* 减去软中断描述符数组首地址,获得软中断号 */
    		prev_count = preempt_count();	/* 获取preempt_count的值 */
    
    		kstat_incr_softirqs_this_cpu(vec_nr);	/* 增加统计中该软中断发生次数 */
    
    		trace_softirq_entry(vec_nr);
    		h->action(h);							/* 执行该软中断的action操作 */
    		trace_softirq_exit(vec_nr);
    		/* 之前保存的preempt_count并不等于当前的preempt_count的情况处理,也是简单的把之前的复制到当前的preempt_count上,这样做是防止最后软中断计数不为0导致系统不能够执行调度 */
    		if (unlikely(prev_count != preempt_count())) {
    			pr_err("huh, entered softirq %u %s %p with preempt_count %08x, exited with %08x?\n",
    			       vec_nr, softirq_to_name[vec_nr], h->action,
    			       prev_count, preempt_count());
    			preempt_count_set(prev_count);
    		}
    		h++;/* h指向下一个软中断,但下个软中断并不一定需要执行,这里只是配合softirq_bit做到一个处理 */
    		pending >>= softirq_bit;
    	}
    
    	if (__this_cpu_read(ksoftirqd) == current)
    		rcu_softirq_qs();
    	local_irq_disable();		/* 关中断 */
    	/* 循环结束后再次获取CPU的__softirq_pending变量,为了检查是否还有软中断未执行 */
    	pending = local_softirq_pending();
    	/* 在还有软中断需要执行的情况下,如果时间片没有执行完,并且循环次数也没到10次,继续执行软中断 */
    	if (pending) {
    		if (time_before(jiffies, end) && !need_resched() &&
    		    --max_restart)
    			goto restart;
    		/* 这里是有软中断挂起,但是软中断时间和循环次数已经用完,通知调度器唤醒软中断线程去执行挂起的软中断,软中断线程是ksoftirqd,这里只起到一个通知作用,因为在中断上下文中是禁止调度的 */
    		wakeup_softirqd();
    	}
    
    	lockdep_softirq_end(in_hardirq);
    	account_irq_exit_time(current);/* 用于统计进程被软中断使用时间 */
    	/* 减少preempt_count中的软中断计数器 */
    	__local_bh_enable(SOFTIRQ_OFFSET);
    	WARN_ON_ONCE(in_interrupt());
    	/* 还原进程标志 */
    	current_restore_flags(old_flags, PF_MEMALLOC);
    }
    

    2.4.2 在守护线程ksoftirq中执行

    虽然大部分的softirq是在中断退出的情况下执行,但是有几种情况会在ksoftirq中执行。

    1. 从上文看出,raise_softirq主动触发,而此时正好不是在中断上下文中,ksoftirq进程将被唤醒。
    2. 在irq_exit中执行软中断,但是在经过MAX_SOFTIRQ_RESTART次循环后,软中断还未处理完,这种情况,ksoftirq进程也会被唤醒。

    所以加入守护线程这一机制,主要是担心一旦有大量的软中断等待执行,会使得内核过长地留在中断上下文中。

    static void wakeup_softirqd(void)
    {
    	/* Interrupts are disabled: no need to stop preemption */
    	struct task_struct *tsk = __this_cpu_read(ksoftirqd);//ksoftirqd这个线程
    
    	if (tsk && tsk->state != TASK_RUNNING)
    		wake_up_process(tsk);
    }
    

    softirq_threads线程函数为run_ksoftirqd:

    //\kernel\softirq.c文件中
    static struct smp_hotplug_thread softirq_threads = {
    	.store			= &ksoftirqd,
    	.thread_should_run	= ksoftirqd_should_run,
    	.thread_fn		= run_ksoftirqd,
    	.thread_comm		= "ksoftirqd/%u",
    };
    

    run_ksoftirqd函数:

    static void run_ksoftirqd(unsigned int cpu)
    {
    	local_irq_disable();
    	if (local_softirq_pending()) {
    		/*
    		 * We can safely run softirq on inline stack, as we are not deep
    		 * in the task stack here.
    		 */
    		__do_softirq();		//最终还是调用这个函数。
    		local_irq_enable();
    		cond_resched();
    		return;
    	}
    	local_irq_enable();
    }
    

    三、小任务机制

    tasklet机制是一种较为特殊的软中断。

    tasklet一词的原意是“小片任务”的意思,这里是指一小段可执行的代码,且通常以函数的形式出现。软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ均是用tasklet机制来实现的。

    从某种程度上讲,tasklet机制是Linux内核对BH机制的一种扩展。在2.4内核引入了softirq机制后,原有的BH机制正是通过tasklet机制这个桥梁来将softirq机制纳入整体框架中的。正是由于这种历史的延伸关系,使得tasklet机制与一般意义上的软中断有所不同,而呈现出以下两个显著的特点:

    1. 与一般的软中断不同,某一段tasklet代码在某个时刻只能在一个CPU上运行,而不像一般的软中断服务函数(即softirq_action结构中的action函数指针)那样——在同一时刻可以被多个CPU并发地执行。
    2. 与BH机制不同,不同的tasklet代码在同一时刻可以在多个CPU上并发地执行,而不像BH机制那样必须严格地串行化执行(也即在同一时刻系统中只能有一个CPU执行BH函数)。

    3.1 tasklet描述符

    Linux用数据结构tasklet_struct来描述一个tasklet,每个结构代表一个独立的小任务。该数据结构定义在include/linux/interrupt.h头文件中。如下所示:

    /* Tasklets --- multithreaded analogue of BHs.
    
       Main feature differing them of generic softirqs: tasklet
       is running only on one CPU simultaneously.
    
       Main feature differing them of BHs: different tasklets
       may be run simultaneously on different CPUs.
    
       Properties:
       * If tasklet_schedule() is called, then tasklet is guaranteed
         to be executed on some cpu at least once after this.
       * If the tasklet is already scheduled, but its execution is still not
         started, it will be executed only once.
       * If this tasklet is already running on another CPU (or schedule is called
         from tasklet itself), it is rescheduled for later.
       * Tasklet is strictly serialized wrt itself, but not
         wrt another tasklets. If client needs some intertask synchronization,
         he makes it with spinlocks.
     */
    
    struct tasklet_struct
    {
    	struct tasklet_struct *next;
    	unsigned long state;
    	atomic_t count;
    	void (*func)(unsigned long);
    	unsigned long data;
    };
    

    其中:

    • next: 指向下一个tasklet的指针;

    • state: 定义了这个tasklet的当前状态。这一个32位的无符号长整数,当前只使用了bit[1]和bit[0]两个状态位。其中,bit[1]=1 表示这个tasklet当前正在某个CPU上被执行,它仅对SMP系统才有意义,其作用就是为了防止多个CPU同时执行一个tasklet的情形出现;bit[0]=1表示这个tasklet已经被调度去等待执行了。

    对这两个状态位的宏定义如下所示(interrupt.h):

    enum
    {
    	TASKLET_STATE_SCHED,	/* Tasklet is scheduled for execution */
    	TASKLET_STATE_RUN	/* Tasklet is running (SMP only) */
    };
    
    • count: 子计数count,对这个tasklet的引用计数值。

    注:只有当count等于0时,tasklet代码段才能执行,也即此时tasklet是被使能的;如果count非零,则这个tasklet是被禁止的。任何想要执行一个tasklet代码段的人都首先必须先检查其count成员是否为0。

    • func:指向以函数形式表现的可执行tasklet代码段。
    • data:函数func的参数。这是一个32位的无符号整数,其具体含义可供func函数自行解释,比如将其解释成一个指向某个用户自定义数据结构的地址值。

    Linux在interrupt.h头文件中又定义了两个用来定义tasklet_struct结构变量的辅助宏:

    //定义一个tasklet_struct结构体
    #define DECLARE_TASKLET(name, func, data) \
    struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(0), func, data }
    
    #define DECLARE_TASKLET_DISABLED(name, func, data) \
    struct tasklet_struct name = { NULL, 0, ATOMIC_INIT(1), func, data }
    

    显然,从上述源代码可以看出,用DECLARE_TASKLET宏定义的tasklet在初始化时是被使能的(enabled),因为其count成员为0。而用DECLARE_TASKLET_DISABLED宏定义的tasklet在初始时是被禁止的(disabled),因为其count等于1。

    3.2 改变一个tasklet状态的操作

    在这里,tasklet状态指两个方面:

    1. state:成员所表示的运行状态;
    2. count:成员决定的使能/禁止状态。

    3.2.1 改变一个tasklet的运行状态

    state成员中的bit[0]表示一个tasklet是否已被调度去等待执行,bit[1]表示一个tasklet是否正在某个CPU上执行。对于state变量中某位的改变必须是一个原子操作,因此可以用定义在include/asm/bitops.h头文件中的位操作来进行。
      由于bit[1]这一位(即TASKLET_STATE_RUN)仅仅对于SMP系统才有意义,因此Linux在Interrupt.h头文件中显示地定义了对TASKLET_STATE_RUN位的操作。如下所示:

    #ifdef CONFIG_SMP
    static inline int tasklet_trylock(struct tasklet_struct *t)
    {
    	return !test_and_set_bit(TASKLET_STATE_RUN, &(t)->state);
    }
     
    static inline void tasklet_unlock(struct tasklet_struct *t)
    {
    	smp_mb__before_clear_bit(); 
    	clear_bit(TASKLET_STATE_RUN, &(t)->state);
    }
     
    static inline void tasklet_unlock_wait(struct tasklet_struct *t)
    {
    	while (test_bit(TASKLET_STATE_RUN, &(t)->state)) { barrier(); }
    }
    #else
    #define tasklet_trylock(t) 1
    #define tasklet_unlock_wait(t) do { } while (0)
    #define tasklet_unlock(t) do { } while (0)
    #endif
    

    显然,在SMP系统同,tasklet_trylock()宏将把一个tasklet_struct结构变量中的state成员中的bit[1]位设置成1,同时还返回bit[1]位的非。因此,如果bit[1]位原有值为1(表示另外一个CPU正在执行这个tasklet代码),那么tasklet_trylock()宏将返回值0,也就表示上锁不成功。如果bit[1]位的原有值为0,那么tasklet_trylock()宏将返回值1,表示加锁成功。而在单CPU系统中,tasklet_trylock()宏总是返回为1。
    任何想要执行某个tasklet代码的程序都必须首先调用宏tasklet_trylock()来试图对这个tasklet进行上锁(即设置TASKLET_STATE_RUN位),且只能在上锁成功的情况下才能执行这个tasklet。建议!即使你的程序只在CPU系统上运行,你也要在执行tasklet之前调用tasklet_trylock()宏,以便使你的代码获得良好可移植性。
      在SMP系统中,tasklet_unlock_wait()宏将一直不停地测试TASKLET_STATE_RUN位的值,直到该位的值变为0(即一直等待到解锁),假如:CPU0正在执行tasklet A的代码,在此期间,CPU1也想执行tasklet A的代码,但CPU1发现tasklet A的TASKLET_STATE_RUN位为1,于是它就可以通过tasklet_unlock_wait()宏等待tasklet A被解锁(也即TASKLET_STATE_RUN位被清零)。在单CPU系统中,这是一个空操作。
      宏tasklet_unlock()用来对一个tasklet进行解锁操作,也即将TASKLET_STATE_RUN位清零。在单CPU系统中,这是一个空操作。

    3.2.2 使能/禁止一个tasklet

    使能与禁止操作往往总是成对地被调用的,tasklet_disable()函数如下(interrupt.h):

    static inline void tasklet_disable(struct tasklet_struct *t)
    {
    	tasklet_disable_nosync(t);
    	tasklet_unlock_wait(t);
    	smp_mb();
    }
    

    函数tasklet_disable_nosync()也是一个静态inline函数,它简单地通过原子操作将count成员变量的值加1。如下所示(interrupt.h):

    static inline void tasklet_disable_nosync(struct tasklet_struct *t)
    {
    	atomic_inc(&t->count);
    	smp_mb__after_atomic_inc();
    }
    

    函数tasklet_enable()用于使能一个tasklet,如下所示(interrupt.h):

    static inline void tasklet_enable(struct tasklet_struct *t)
    {
    	smp_mb__before_atomic_dec();
    	atomic_dec(&t->count);
    }
    

    3.3 tasklet描述符的初始化与杀死

    函数tasklet_init()用来初始化一个指定的tasklet描述符,其源码如下所示(kernel/softirq.c):

    void tasklet_init(struct tasklet_struct *t,
    		  void (*func)(unsigned long), unsigned long data)
    {
    	t->next = NULL;
    	t->state = 0;
    	atomic_set(&t->count, 0);
    	t->func = func;
    	t->data = data;
    }
    

    函数tasklet_kill()用来将一个已经被调度了的tasklet杀死,即将其恢复到未调度的状态。其源码如下所示(kernel/softirq.c):

    void tasklet_kill(struct tasklet_struct *t)
    {
    	if (in_interrupt())
    		pr_notice("Attempt to kill tasklet from interrupt\n");
    
    	while (test_and_set_bit(TASKLET_STATE_SCHED, &t->state)) {
    		do {
    			yield();
    		} while (test_bit(TASKLET_STATE_SCHED, &t->state));
    	}
    	tasklet_unlock_wait(t);
    	clear_bit(TASKLET_STATE_SCHED, &t->state);
    }
    

    3.4 tasklet对列

    多个tasklet可以通过tasklet描述符中的next成员指针链接成一个单向对列。为此,Linux专门在头文件include/linux/interrupt.h中定义了数据结构tasklet_head来描述一个tasklet对列的头部指针。如下所示:

    //\Linux-5.4\kernel\softirq.c文件中
    /*
     * Tasklets
     */
    struct tasklet_head
    {
    	struct tasklet_struct *head;
    	struct tasklet_struct **tail;
    };
    

    尽管tasklet机制是特定于软中断向量HI_SOFTIRQ和TASKLET_SOFTIRQ的一种实现,但是tasklet机制仍然属于softirq机制的整体框架范围内的,因此,它的设计与实现仍然必须坚持“谁触发,谁执行”的思想。为此,Linux为系统中的每一个CPU都定义了一个tasklet对列头部,来表示应该有各个CPU负责执行的tasklet对列。如下所示(kernel/softirq.c):

    //\Linux-5.4\include\linux\percpu-defs.h
    /*
     * Variant on the per-CPU variable declaration/definition theme used for
     * ordinary per-CPU variables.
     */
    #define DECLARE_PER_CPU(type, name)					\
    	DECLARE_PER_CPU_SECTION(type, name, "")
    
    #define DEFINE_PER_CPU(type, name)					\
    	DEFINE_PER_CPU_SECTION(type, name, "")
    //(kernel/softirq.c)
    static DEFINE_PER_CPU(struct tasklet_head, tasklet_vec);
    static DEFINE_PER_CPU(struct tasklet_head, tasklet_hi_vec);
    

    上面展开,可得:

    struct tasklet_head tasklet_vec[NR_CPUS] __cacheline_aligned;
    struct tasklet_head tasklet_hi_vec[NR_CPUS] __cacheline_aligned;
    

    其中,tasklet_vec[]数组用于软中断向量TASKLET_SOFTIRQ,而tasklet_hi_vec[]数组则用于软中断向量HI_SOFTIRQ。也即,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量TASKLET_SOFTIRQ,那么对列tasklet_vec[i]中的每一个tasklet都将在CPUi服务于软中断向量TASKLET_SOFTIRQ时被CPUi所执行。同样地,如果CPUi(0≤i≤NR_CPUS-1)触发了软中断向量HI_SOFTIRQ,那么队列tasklet_hi_vec[i]中的每一个tasklet都将CPUi在对软中断向量HI_SOFTIRQ进行服务时被CPUi所执行。
      队列tasklet_vec[I]和tasklet_hi_vec[I]中的各个tasklet是怎样被所CPUi所执行的呢?其关键就是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务程序——tasklet_action()函数和tasklet_hi_action()函数。下面我们就来分析这两个函数。

    3.5 软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ

    Linux为软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ实现了专用的触发函数和软中断服务函数。

    • 专用的触发函数

    tasklet_schedule()函数和tasklet_hi_schedule()函数分别用来在当前CPU上触发软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ,并把指定的tasklet加入当前CPU所对应的tasklet队列中去等待执行。

    • 专用的软中断服务函数

    tasklet_action()函数和tasklet_hi_action()函数则分别是软中断向量TASKLET_SOFTIRQ和HI_SOFTIRQ的软中断服务函数。在初始化函数softirq_init()中,这两个软中断向量对应的描述符softirq_vec[0]和softirq_vec[6]中的action函数指针就被分别初始化成指向函数tasklet_hi_action()和函数tasklet_action()。

    3.5.1软中断向量TASKLET_SOFTIRQ的触发函数tasklet_schedule

    该函数实现在include/linux/interrupt.h头文件中,是一个inline函数。其源码如下所示:

    //Linux-5.4\include\linux\interrupt.h
    static inline void tasklet_schedule(struct tasklet_struct *t)
    {
    	if (!test_and_set_bit(TASKLET_STATE_SCHED, &t->state))
    		__tasklet_schedule(t);
    }
    
    //\Linux-5.4\kernel\softirq.c
    void __tasklet_schedule(struct tasklet_struct *t)
    {
    	__tasklet_schedule_common(t, &tasklet_vec,
    				  TASKLET_SOFTIRQ);
    }
    
    //\Linux-5.4\kernel\softirq.c
    static void __tasklet_schedule_common(struct tasklet_struct *t,
    				      struct tasklet_head __percpu *headp,
    				      unsigned int softirq_nr)
    {
    	struct tasklet_head *head;
    	unsigned long flags;
    
    	local_irq_save(flags);
    	head = this_cpu_ptr(headp);
    	t->next = NULL;
    	*head->tail = t;
    	head->tail = &(t->next);
    	raise_softirq_irqoff(softirq_nr);
    	local_irq_restore(flags);
    }
    
    • 调用test_and_set_bit()函数将待调度的tasklet的state成员变量的bit[0]位(也即TASKLET_STATE_SCHED位)设置为1,该函数同时还返回TASKLET_STATE_SCHED位的原有值。因此如果bit[0]为的原有值已经为1,那就说明这个tasklet已经被调度到另一个CPU上去等待执行了。由于一个tasklet在某一个时刻只能由一个CPU来执行,因此tasklet_schedule()函数什么也不做就直接返回了。否则,就继续下面的调度操作。

    • 首先,调用local_irq_save()函数来关闭当前CPU的中断,以保证下面的步骤在当前CPU上原子地被执行。

    • 然后,将待调度的tasklet添加到当前CPU对应的tasklet队列的尾部。

    • 接着,调用raise_softirq_irqoff函数在当前CPU上触发软中断请求TASKLET_SOFTIRQ。

    • 最后,调用local_irq_restore()函数来开当前CPU的中断。

    3.5.2软中断向量TASKLET_SOFTIRQ的服务程序tasklet_action

    函数tasklet_action()是tasklet机制与软中断向量TASKLET_SOFTIRQ的联系纽带。正是该函数将当前CPU的tasklet队列中的各个tasklet放到当前CPU上来执行的。该函数实现在kernel/softirq.c文件中,其源代码如下:

    static __latent_entropy void tasklet_action(struct softirq_action *a)
    {
    	tasklet_action_common(a, this_cpu_ptr(&tasklet_vec), TASKLET_SOFTIRQ);
    }
    
    static void tasklet_action_common(struct softirq_action *a,
    				  struct tasklet_head *tl_head,
    				  unsigned int softirq_nr)
    {
    	struct tasklet_struct *list;
    
    	local_irq_disable();
    	list = tl_head->head;
    	tl_head->head = NULL;
    	tl_head->tail = &tl_head->head;
    	local_irq_enable();
    
    	while (list) {
    		struct tasklet_struct *t = list;
    
    		list = list->next;
    
    		if (tasklet_trylock(t)) {
    			if (!atomic_read(&t->count)) {
    				if (!test_and_clear_bit(TASKLET_STATE_SCHED,
    							&t->state))
    					BUG();
    				t->func(t->data);
    				tasklet_unlock(t);
    				continue;
    			}
    			tasklet_unlock(t);
    		}
    
    		local_irq_disable();
    		t->next = NULL;
    		*tl_head->tail = t;
    		tl_head->tail = &t->next;
    		__raise_softirq_irqoff(softirq_nr);
    		local_irq_enable();
    	}
    }
    
    • 首先,在当前CPU关中断的情况下,“原子”地读取当前CPU的tasklet队列头部指针,将其保存到局部变量list指针中,然后将当前CPU的tasklet队列头部指针设置为NULL,以表示理论上当前CPU将不再有tasklet需要执行(但最后的实际结果却并不一定如此,下面将会看到)。
    • 然后,用一个while{}循环来遍历由list所指向的tasklet队列,队列中的各个元素就是将在当前CPU上执行的tasklet。循环体的执行步骤如下:
    • 用指针t来表示当前队列元素,即当前需要执行的tasklet。
    • 更新list指针为list->next,使它指向下一个要执行的tasklet。
    • 用tasklet_trylock()宏试图对当前要执行的tasklet(由指针t所指向)进行加锁,如果加锁成功(当前没有任何其他CPU正在执行这个tasklet),则用原子读函数atomic_read()进一步判断count成员的值。如果count为0,说明这个tasklet是允许执行的,于是:
    1. 先清除TASKLET_STATE_SCHED位;
    2. 然后,调用这个tasklet的可执行函数func;
    3. 调用宏tasklet_unlock()来清除TASKLET_STATE_RUN位 ;
    4. 最后,执行continue语句跳过下面的步骤,回到while循环继续遍历队列中的下一个元素。如果count不为0,说明这个tasklet是禁止运行的,于是调用tasklet_unlock()清除前面用tasklet_trylock()设置的TASKLET_STATE_RUN位。

    3.6 tasklet使用总结

    1、声明和使用小任务大多数情况下,为了控制一个常用的硬件设备,小任务机制是实现下半部的最佳选择。小任务可以动态创建,使用方便,执行起来也比较快。我们既可以静态地创建小任务,也可以动态地创建它。选择那种方式取决于到底是想要对小任务进行直接引用还是一个间接引用。如果准备静态地创建一个小任务(也就是对它直接引用),使用下面两个宏中的一个:
    DECLARE_TASKLET(name,func, data)
    DECLARE_TASKLET_DISABLED(name,func, data)
    这两个宏都能根据给定的名字静态地创建一个tasklet_struct结构。当该小任务被调度以后,给定的函数func会被执行,它的参数由data给出。这两个宏之间的区别在于引用计数器的初始值设置不同。第一个宏把创建的小任务的引用计数器设置为0,因此,该小任务处于激活状态。另一个把引用计数器设置为1,所以该小任务处于禁止状态。例如:
    DECLARE_TASKLET(my_tasklet,my_tasklet_handler, dev);
    这行代码其实等价于
    struct tasklet_struct my_tasklet = { NULL, 0, ATOMIC_INIT(0),tasklet_handler,dev};
    这样就创建了一个名为my_tasklet的小任务,其处理程序为tasklet_handler,并且已被激活。当处理程序被调用的时候,dev就会被传递给它。

    2、编写自己的小任务处理程序小任务处理程序必须符合如下的函数类型:

    void tasklet_handler(unsigned long data)
    由于小任务不能睡眠,因此不能在小任务中使用信号量或者其它产生阻塞的函数。但是小任务运行时可以响应中断。

    3、调度自己的小任务通过调用tasklet_schedule()函数并传递给它相应的tasklt_struct指针,该小任务就会被调度以便适当的时候执行:
    tasklet_schedule(&my_tasklet); /*把my_tasklet标记为挂起 */
    在小任务被调度以后,只要有机会它就会尽可能早的运行。在它还没有得到运行机会之前,如果一个相同的小任务又被调度了,那么它仍然只会运行一次。

    可以调用tasklet_disable()函数来禁止某个指定的小任务。如果该小任务当前正在执行,这个函数会等到它执行完毕再返回。调用tasklet_enable()函数可以激活一个小任务,如果希望把以DECLARE_TASKLET_DISABLED()创建的小任务激活,也得调用这个函数,如:

    tasklet_disable(&my_tasklet); /小任务现在被禁止,这个小任务不能运行/

    tasklet_enable(&my_tasklet); /* 小任务现在被激活*/

    也可以调用tasklet_kill()函数从挂起的队列中去掉一个小任务。该函数的参数是一个指向某个小任务的tasklet_struct的长指针。在小任务重新调度它自身的时候,从挂起的队列中移去已调度的小任务会很有用。这个函数首先等待该小任务执行完毕,然后再将它移去。

    4、tasklet的简单用法

    下面是tasklet的一个简单应用,以模块的形成加载。

    #include <linux/module.h>
    #include <linux/init.h>
    #include <linux/fs.h>
    #include <linux/kdev_t.h>
    #include <linux/cdev.h>
    #include <linux/kernel.h>
    #include <linux/interrupt.h>
     
    static struct  t asklet_struct my_tasklet;
     
    static void tasklet_handler (unsigned long d ata)
    {
            printk(KERN_ALERT,"tasklet_handler is running./n");
    }
     
    static int __init test_init(void)
    {
            tasklet_init(&my_tasklet,tasklet_handler,0);
            tasklet_schedule(&my_tasklet);
            return0;
    }
     
    static  void __exit test_exit(void)
    {
            tasklet_kill(&tasklet);
            printk(KERN_ALERT,"test_exit is running./n");
    }
    MODULE_LICENSE("GPL");
     
    module_init(test_init);
    module_exit(test_exit);
    

    从这个例子可以看出,所谓的小任务机制是为下半部函数的执行提供了一种执行机制,也就是说,推迟处理的事情是由tasklet_handler实现,何时执行,经由小任务机制封装后交给内核去处理。

    四、中断处理的工作队列机制

    工作队列(work queue)是另外一种将工作推后执行的形式,它和前面讨论的tasklet有所不同。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行。这样,通过工作队列执行的代码能占尽进程上下文的所有优势。最重要的就是工作队列允许被重新调度甚至是睡眠。
    那么,什么情况下使用工作队列,什么情况下使用tasklet。如果推后执行的任务需要睡眠,那么就选择工作队列;如果推后执行的任务不需要睡眠,那么就选择tasklet。另外,如果需要用一个可以重新调度的实体来执行你的下半部处理,也应该使用工作队列。它是唯一能在进程上下文运行的下半部实现的机制,也只有它才可以睡眠。这意味着在需要获得大量的内存时、在需要获取信号量时,在需要执行阻塞式的I/O操作时,它都会非常有用。如果不需要用一个内核线程来推后执行工作,那么就考虑使用tasklet。

    4.1 工作、工作队列

    如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。系统默认的工作者线程为events,自己也可以创建自己的工作者线程。表示工作的数据结构用<linux/workqueue.h>中定义的work_struct结构表示:

    struct work_struct {
    	atomic_long_t data;
    	struct list_head entry;	// 连接所有工作的链表
    	work_func_t func; 		// 要执行的函数
    #ifdef CONFIG_LOCKDEP
    	struct lockdep_map lockdep_map;
    #endif
    };
    typedef void (*work_func_t)(struct work_struct *work);
    

    这些结构被连接成链表。当一个工作者线程被唤醒时,它会执行它的链表上的所有工作。工作被执行完毕,它就将相应的work_struct对象从链表上移去。当链表上不再有对象的时候,它就会继续休眠。表示工作队列的数据结构用<kernel/workqueue.c>中定义的workqueue_struct:

    /*
     * The externally visible workqueue.  It relays the issued work items to
     * the appropriate worker_pool through its pool_workqueues.
     */
    struct workqueue_struct {
    	struct list_head	pwqs;		/* WR: all pwqs of this wq */
    	struct list_head	list;		/* PR: list of all workqueues */
    
    	struct mutex		mutex;		/* protects this wq */
    	int			work_color;	/* WQ: current work color */
    	int			flush_color;	/* WQ: current flush color */
    	atomic_t		nr_pwqs_to_flush; /* flush in progress */
    	struct wq_flusher	*first_flusher;	/* WQ: first flusher */
    	struct list_head	flusher_queue;	/* WQ: flush waiters */
    	struct list_head	flusher_overflow; /* WQ: flush overflow list */
    
    	struct list_head	maydays;	/* MD: pwqs requesting rescue */
    	struct worker		*rescuer;	/* I: rescue worker */
    
    	int			nr_drainers;	/* WQ: drain in progress */
    	int			saved_max_active; /* WQ: saved pwq max_active */
    
    	struct workqueue_attrs	*unbound_attrs;	/* PW: only for unbound wqs */
    	struct pool_workqueue	*dfl_pwq;	/* PW: only for unbound wqs */
    
    #ifdef CONFIG_SYSFS
    	struct wq_device	*wq_dev;	/* I: for sysfs interface */
    #endif
    #ifdef CONFIG_LOCKDEP
    	char			*lock_name;
    	struct lock_class_key	key;
    	struct lockdep_map	lockdep_map;
    #endif
    	char			name[WQ_NAME_LEN]; /* I: workqueue name */
    
    	/*
    	 * Destruction of workqueue_struct is RCU protected to allow walking
    	 * the workqueues list without grabbing wq_pool_mutex.
    	 * This is used to dump all workqueues from sysrq.
    	 */
    	struct rcu_head		rcu;
    
    	/* hot fields used during command issue, aligned to cacheline */
    	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
    	struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
    	struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node */
    };
    

    4.2 创建推后的工作

    4.2.1 静态地创建工作(work_struct)

    要使用工作队列,首先要做的是创建一些需要推后完成的工作。可以通过DECLARE_WORK在编译时静态地建该结构:

    DECLARE_WORK(name, func); 或者INIT_WORK(_work, _func)

    其定义如下:

    #define DECLARE_WORK(n, f)					\
    	struct work_struct n = __WORK_INITIALIZER(n, f)
    

    举例如下:

    static void do_poweroff(struct work_struct *dummy)
    {
    	kernel_power_off();
    }
     
    static DECLARE_WORK(poweroff_work, do_poweroff);
    

    即创建了一个全局静态变量:static work_struct poweroff_work,且被初始化了,其执行函数为do_poweroff。

    4.2.2 动态初始化工作(work_struct)

    先定义一具struct work_struct 变量,在需要使用时调用INIT_WORK进行初始化,然后便可以使用。

    #define INIT_WORK(_work, _func)					\
    	do {							\
    		__INIT_WORK((_work), (_func), 0);		\
    	} while (0)
    

    举例:

    void __cfg80211_scan_done(struct work_struct *wk)
    {
    	struct cfg80211_registered_device *rdev;
     
    	rdev = container_of(wk, struct cfg80211_registered_device,
    			    scan_done_wk);
     
    	cfg80211_lock_rdev(rdev);
    	___cfg80211_scan_done(rdev, false);
    	cfg80211_unlock_rdev(rdev);
    }
     
    struct cfg80211_registered_device {
     
    	struct work_struct scan_done_wk;
    	struct work_struct sched_scan_results_wk;
    	struct work_struct conn_work;
    	struct work_struct event_work;
    	struct cfg80211_wowlan *wowlan;
    }
    struct cfg80211_registered_device *rdev;
    rdev = kzalloc(alloc_size, GFP_KERNEL);
     
    INIT_WORK(&rdev->scan_done_wk, __cfg80211_scan_done);  // 其执行函数为: __cfg80211_scan_done
    INIT_WORK(&rdev->sched_scan_results_wk, __cfg80211_sched_scan_results);
    

    4.3 对工作进行调度

    现在工作已经被创建,我们可以调度它了。想要把给定工作的待处理函数提交给缺省的events工作线程,只需调用: int schedule_work(struct work_struct *work);
    它把work放入全局工作队列:system_wq,其定义如下:

    /**
     * schedule_work - put work task in global workqueue
     * @work: job to be done
     *
     * Returns %false if @work was already on the kernel-global workqueue and
     * %true otherwise.
     *
     * This puts a job in the kernel-global workqueue if it was not already
     * queued and leaves it in the same position on the kernel-global
     * workqueue otherwise.
     */
    static inline bool schedule_work(struct work_struct *work)
    {
    	return queue_work(system_wq, work);
    }
    /* System-wide workqueues which are always present.
     *
     * system_wq is the one used by schedule[_delayed]_work[_on]().
     * Multi-CPU multi-threaded.  There are users which expect relatively
     * short queue flush time.  Don't queue works which can run for too
     * long.
     */
    extern struct workqueue_struct *system_wq;
    

    queue_work:把一个工作放入工作队列:

    /**
     * queue_work - queue work on a workqueue
     * @wq: workqueue to use
     * @work: work to queue
     *
     * Returns %false if @work was already on a queue, %true otherwise.
     *
     * We queue the work to the CPU on which it was submitted, but if the CPU dies
     * it can be processed by another CPU.
     */
    static inline bool queue_work(struct workqueue_struct *wq,
    			      struct work_struct *work)
    {
    	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
    }
    
    /**
     * queue_work_on - queue work on specific cpu
     * @cpu: CPU number to execute work on
     * @wq: workqueue to use
     * @work: work to queue
     *
     * We queue the work to a specific CPU, the caller must ensure it
     * can't go away.
     *
     * Return: %false if @work was already on a queue, %true otherwise.
     */
    bool queue_work_on(int cpu, struct workqueue_struct *wq,
    		   struct work_struct *work)
    {
    	bool ret = false;
    	unsigned long flags;
    
    	local_irq_save(flags);
    
    	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
    		__queue_work(cpu, wq, work);
    		ret = true;
    	}
    
    	local_irq_restore(flags);
    	return ret;
    }
    

    把work放入工作队列,work马上就会被调度,一旦其所在的处理器上的工作者线程被唤醒,它就会被执行。有时候并不希望工作马上就被执行,而是希望它经过一段延迟以后再执行。在这种情况下,可以调度它在指定的时间执行:

    /**
     * schedule_delayed_work - put work task in global workqueue after delay
     * @dwork: job to be done
     * @delay: number of jiffies to wait or 0 for immediate execution
     *
     * After waiting for a given time this puts a job in the kernel-global
     * workqueue.
     */
    static inline bool schedule_delayed_work(struct delayed_work *dwork,
    					 unsigned long delay)
    {
    	return queue_delayed_work(system_wq, dwork, delay);
    }
    
    #define DECLARE_DELAYED_WORK(n, f)					\
    	struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f, 0)
    
    #define INIT_DELAYED_WORK(_work, _func)					\
    	__INIT_DELAYED_WORK(_work, _func, 0)
    

    4.4 创建工作者线程

    工作放入工作队列之后,由管理此工作队列的工作者来执行这些work,通过alloc_workqueue或create_singlethread_workqueue来创建工作者线程,它最后调用kthread_create创建线程,其线程名为alloc_workqueue中指定的name,其举例如下:

    /**
     * workqueue_init_early - early init for workqueue subsystem
     *
     * This is the first half of two-staged workqueue subsystem initialization
     * and invoked as soon as the bare basics - memory allocation, cpumasks and
     * idr are up.  It sets up all the data structures and system workqueues
     * and allows early boot code to create workqueues and queue/cancel work
     * items.  Actual work item execution starts only after kthreads can be
     * created and scheduled right before early initcalls.
     */
    int __init workqueue_init_early(void)
    {
    	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
    	int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
    	int i, cpu;
    
    	WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
    
    	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
    	cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));
    
    	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
    
    	/* initialize CPU pools */
    	for_each_possible_cpu(cpu) {
    		struct worker_pool *pool;
    
    		i = 0;
    		for_each_cpu_worker_pool(pool, cpu) {
    			BUG_ON(init_worker_pool(pool));
    			pool->cpu = cpu;
    			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
    			pool->attrs->nice = std_nice[i++];
    			pool->node = cpu_to_node(cpu);
    
    			/* alloc pool ID */
    			mutex_lock(&wq_pool_mutex);
    			BUG_ON(worker_pool_assign_id(pool));
    			mutex_unlock(&wq_pool_mutex);
    		}
    	}
    
    	/* create default unbound and ordered wq attrs */
    	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
    		struct workqueue_attrs *attrs;
    
    		BUG_ON(!(attrs = alloc_workqueue_attrs()));
    		attrs->nice = std_nice[i];
    		unbound_std_wq_attrs[i] = attrs;
    
    		/*
    		 * An ordered wq should have only one pwq as ordering is
    		 * guaranteed by max_active which is enforced by pwqs.
    		 * Turn off NUMA so that dfl_pwq is used for all nodes.
    		 */
    		BUG_ON(!(attrs = alloc_workqueue_attrs()));
    		attrs->nice = std_nice[i];
    		attrs->no_numa = true;
    		ordered_wq_attrs[i] = attrs;
    	}
    
    	system_wq = alloc_workqueue("events", 0, 0);
    	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
    	system_long_wq = alloc_workqueue("events_long", 0, 0);
    	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
    					    WQ_UNBOUND_MAX_ACTIVE);
    	system_freezable_wq = alloc_workqueue("events_freezable",
    					      WQ_FREEZABLE, 0);
    	system_power_efficient_wq = alloc_workqueue("events_power_efficient",
    					      WQ_POWER_EFFICIENT, 0);
    	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
    					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
    					      0);
    	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
    	       !system_unbound_wq || !system_freezable_wq ||
    	       !system_power_efficient_wq ||
    	       !system_freezable_power_efficient_wq);
    
    	return 0;
    }
    
    
    /**
     * workqueue_init - bring workqueue subsystem fully online
     *
     * This is the latter half of two-staged workqueue subsystem initialization
     * and invoked as soon as kthreads can be created and scheduled.
     * Workqueues have been created and work items queued on them, but there
     * are no kworkers executing the work items yet.  Populate the worker pools
     * with the initial workers and enable future kworker creations.
     */
    int __init workqueue_init(void)
    {
    	struct workqueue_struct *wq;
    	struct worker_pool *pool;
    	int cpu, bkt;
    
    	/*
    	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
    	 * CPU to node mapping may not be available that early on some
    	 * archs such as power and arm64.  As per-cpu pools created
    	 * previously could be missing node hint and unbound pools NUMA
    	 * affinity, fix them up.
    	 *
    	 * Also, while iterating workqueues, create rescuers if requested.
    	 */
    	wq_numa_init();
    
    	mutex_lock(&wq_pool_mutex);
    
    	for_each_possible_cpu(cpu) {
    		for_each_cpu_worker_pool(pool, cpu) {
    			pool->node = cpu_to_node(cpu);
    		}
    	}
    
    	list_for_each_entry(wq, &workqueues, list) {
    		wq_update_unbound_numa(wq, smp_processor_id(), true);
    		WARN(init_rescuer(wq),
    		     "workqueue: failed to create early rescuer for %s",
    		     wq->name);
    	}
    
    	mutex_unlock(&wq_pool_mutex);
    
    	/* create the initial workers */
    	for_each_online_cpu(cpu) {
    		for_each_cpu_worker_pool(pool, cpu) {
    			pool->flags &= ~POOL_DISASSOCIATED;
    			BUG_ON(!create_worker(pool));
    		}
    	}
    
    	hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
    		BUG_ON(!create_worker(pool));
    
    	wq_online = true;
    	wq_watchdog_init();
    
    	return 0;
    }
    
    

    如:cfg80211_wq = create_singlethread_workqueue(“cfg80211”);创建了一个名为cfg80211的kernel线程。

    4.5 工作队列的简单应用

    #include <linux/module.h> 
    #include <linux/init.h> 
    #include <linux/workqueue.h> 
    static struct workqueue_struct *queue = NULL; 
    static struct work_struct work; 
    static void work_handler(struct work_struct *data) 
    {
      	printk(KERN_ALERT "work handler function.\n"); 
    }
    
    static int __init test_init(void) 
    {
    	queue = create_singlethread_workqueue("helloworld"); 
    	/*创建一个单线程的工作队列*/         
    	if (!queue)                 
    		goto err;         
    	INIT_WORK(&work, work_handler);         
    	schedule_work(&work);         
    	return 0; 
    	err:         
    	return -1; 
    } 
    static void __exit test_exit(void) {
        destroy_workqueue(queue); 
    } 
        
    MODULE_LICENSE("GPL"); 
    module_init(test_init); 
    module_exit(test_exit);
    
    

    参考

    [1] https://blog.csdn.net/myarrow/article/details/9287169

    [2] https://blog.csdn.net/yhb1047818384/article/details/63687126

    展开全文
  • Python中断

    千次阅读 2021-01-13 09:52:55
    python是否有中断机制?初学Python,想编一个贪吃蛇的小程序,现在有一个问题待解决。定时移动pygame是一个解决办法。 另外你对界面交互可以了解一下。 程序能够处理多个事件,是因为它本身有多线程支持同时做多个...

    python是否有中断机制?

    初学Python,想编一个贪吃蛇的小程序,现在有一个问题待解决。定时移动pygame是一个解决办法。 另外你对界面交互可以了解一下。 程序能够处理多个事件,是因为它本身有多线程支持同时做多个事情。通常一个界面程序的结构是这样。 主界面线程,一直在循环接收窗口消息键盘消息,并绘制,或者是处理键盘。

    python里怎么终止程序的执行

    如果你是在程序中让其自动退出,则可以使用: exit()执行到此命令时,程序终止。 如果是程序陷入死循环,想强制结束,则按Ctrl + C。

    python 如何跳过异常继续执行

    我用try...except...处理异常的时候,只要有一个异常,程序就不继续执行下面有两种解决方法,第一种是类似if..else..;另外一种是使用语句来实现继续执行; 方法一:使用try...except...语句,类似于if...else...,可以跳过异常继续执行程序,这是Python的优势 用法如下: 方法二:使用语句来继续执行; 拓展资料 异

    python 脚本被意外打断之后(比如开网页但是断网了我写了个类似爬虫的脚本抓取网页,要跑大概一天,但是网不太稳,偶尔要如果你只想运行一次,那么不用循环几乎是不可能的。当然你用crontab之类的定时任务来处理也可以。我猜想你登陆之后应该是要做一些操作的,那么在做任何操作之前你都去判断一下登陆状态,如果session过期或者其他原因导致登陆状态失效。

    Python中如何在一段时间后停止程序

    python中循环语句半途中断

    bc=0s=192l=1z=1jihe=set(())while z<=400: bc=input('请输入barcode码 加一个 continue在后面

    python线程如何捕获中断信号

    import threadtext = Nonedef get_input(): global text text = raw_input()def main(): while True: print "running" if text 。= None: breakthread.start_new_thread(get_input,())main() global全局变量吧?看看上面这段代码是我常用的套路。

    Python中断多重循环的几种思路

    事实上,Python的标准语法是不支持跳出多重循环的,所以只能利用一些技巧,大概的思路有:写成函数、利用笛卡尔积、利用调试。 写成函数 在Python中,函数运行到return这一句就会停止,因此可以利用这一特性,将功能写成函数,终止多重循环。

    sublime text2 运行python程序 中途怎么停止

    关掉 sublime text2 使用快捷键或者哪些按钮,停止该程序,就像调试一样,如下图 Cancel Build 快捷键 Ctrl+Break ,试一下行不行得通。

    python如何终止os.system调用的程序

    在python中用os.system()调用的一个程序需要手动Ctrl+C来终止其运行,我建议用subprocess 因为如果你的程序不会自动停止,那么用os.system(cmd),cmd会一直持续运行知道调用的程序返回结果。 subprocess可以开启一个子线程,在子线程里面调用,在你需要的时候就可以把这个子线程关掉,这样代码也更灵活 例如: p = su

    展开全文
  • Java的中断是一种协作机制。也就是说调用线程对象的interrupt方法并不一定就中断了正在运行的线程,它只是要求线程自己在合适的时机中断自己。一、Java中断的现象首先,看看Thread类里的几个方法:public static ...
  • 我们看一下这两种方式,中断看似很高效,但是却会遗漏一些数据,避免遗漏的机制要么由硬件实现要么由上层的软件实现,而轮询就没有中断高效了,它会做很多 徒劳的操作,而且必须引入暂存机制,就是说由于 CPU 不...
  • 背景有一个项目对实时性要求比较高,于是在linux内核上打了RT_PREEMPT补丁。最终碰到的一个问题是,芯片本身性能不强,CPU资源...原来这是一个被线程化了的中断服务程序,负责处理i2c中断的。这个项目i2c总线上挂...
  • 前边几篇Blog分别介绍了JVM的类加载机制、运行时数据区域,字节码的执行,在执行完成后程序发挥完了自己的作用,线程独有的程序计数器、虚拟机栈、本地方法栈3个区域随线程而生,随线程而灭,而线程共享的堆和方法区...
  • 中断和轮询

    2021-09-07 17:39:30
    外部设备与中央处理器交互一般有两种手段:轮询和中断。 轮询(Polling) 很多I/O设备都有一个状态寄存器,用于描述设备当前的工作状态,每当设备状态发生改变时,设备将修改相应状态寄存器位。通过不断查询...
  • x86处理器如何处理MSI-X中断请求PCIe设备发出MSI-X中断请求的方法与发出MSI中断请求的方法类似,都是向Message Address所在的地址写Message Data字段包...
  • 线程的定义给我们提供了并发执行多个任务的方式,大多数情况下我们会让每个任务都自行...线程调度策略中有抢占式和协作式两个概念,与之类似的是中断机制也有协作式和抢占式。历史上Java曾经使用stop()方法终止线...
  • 一、Java中断的现象首先,看看Thread类里的几个方法:public static booleaninterrupted测试当前线程是否已经中断。线程的中断状态由该方法清除。换句话说,如果连续两次调用该方法,则第二次调用将返回 false(在第...
  • 线程的定义给我们提供了并发执行多个任务的方式,大多数情况下我们会让每个任务都自行...线程调度策略中有抢占式和协作式两个概念,与之类似的是中断机制也有协作式和抢占式。历史上Java曾经使用stop()方法终止线...
  • 目前有三种中断的三种机制: 软中断 tasklet 工作队列 软中断中断是一组静态定义的下半部接口,有 32 个,可以在所有处理器上同时执行(瞎胡扯哈!!看看gic初始化流程,所有中断只routing到boot cpu上…),...
  • 本文先对** ROS通讯机制及定时器中断(使用ROS的通讯机制、定时器和C++相关线程技术) **做个简单的介绍,具体内容后续再更,其他模块可以参考去我其他文章 提示:以下是本篇文章正文内容 一、参数加载方法...
  • 转载地址:http://blog.csdn.net/dlite/article/details/4218105在历史上,Java试图...另一方面,出于Java应用代码的健壮性的考虑,降低了编程门槛,减少不清楚底层机制的程序员无意破坏系统的概率。如今,Java的...
  • linux 中断子系统

    2021-08-15 15:14:16
    linux中断用在很多方面,如最简单的按键触发的中断事件,网卡收包后的中断等等。 文章参考了韦东山老师中断讲解内容。 1、环境 2、中断概念 2.1 异常 异常概念大于中断中断也是异常的一种。 指令未定义 ...
  • 一文看懂JUC之AQS机制

    2021-05-12 00:44:16
    作者:VectorJinjuejin.cn/post/6844904041760161806为了解决原子性的问题,Java加入了锁机制,同时保证了可见性和顺序性。JDK1.5的并发包中新...
  • 当报告车辆碰撞的传感器中断CPU后,操作系统应快速地分配展开气囊的任务,并且不允许任何其他非实时处理进行干扰,晚一秒钟展开气囊比没有气囊的情况更糟糕,这就是一个典型的必须使用硬实时的系统。而最近特斯拉...
  • 这台从库与主库的同步出现中断,报错为:Slave_IO_Running: Yes Slave_SQL_Running: No Last_Errno: 1756 Last_Error: ... The slave coordinator and worker threads are stopped, possibly leaving data in ...
  • posting 相对于sending的一个优势是,它给了Qt一个压缩(compress)事件的机会。 假如你在一个widget上连续地调用update() 十次,因update()而产生的这十个事件,将会自动地被合并为一个单独的事件,但是QPaintEvents...
  • 展开全部Java的线程机制 摘 要: 多线程机制是Java的重要技术,阐述了636f70793231313335323631343130323136353331333339656535线程和进程的差别;Java中线程4个状态之间的转换;并结合例子说明了两种创建线程的方法...
  • Android广播机制(1)

    2021-05-27 07:34:15
    } } } } 静态注册实现开机启动 动态注册广播接收器可以自由的控制注册与注销,在灵活性方面有很大的优势,但是它也存在着一个缺点即必须要在程序启动之后才能接受到广播,因为注册的逻辑写在onCreate方法中的 步骤 ...
  • 进程调度.png 默认的调度策略 实时进程 > 普通进程 实时中 FIFO > RR (Round Robin) 普通进程的策略是 CFM(完全公平调度策略) 默认调度策略.png 中断 是 硬件 与 操作系统 通信 的一种机制。 如果 CPU 正在运算,...
  • Linux中断下半部和推后执行的工作 中断处理程序只能完成整个中断处理流程的上半部分,同时中断处理程序具有相当的局限性: 中断处理程序以异步方式执行,并且有可能会打断其他重要代码的执行。因此,为了避免被打断...
  • Linux 驱动中断部分

    2021-06-30 10:12:26
    中断 1、定义 中断 : cpu在运行工作时,出现了某些突发事件,放下目前手上的工作,处理这个突发事件,处理完毕在返回处理中断前的事件。 2、Linux里中断处理框架 Linux里将中断分为上半部与下半部,原因:进行中断...
  • 此外另一个事实是,使用error-code作为错误汇报机制的代码往往并不使用RAII。 当整个函数需要保持完全的异常中立的时候,异常的优势就更显现出来了:使用error-code,你还是需要一次次的小心check每个返回的错误值,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 46,280
精华内容 18,512
关键字:

中断机制的优势