精华内容
下载资源
问答
  • 是不是觉得很玄?...那好,我们来看点具体的,比如935行,INIT_DELAYED_WORK().这是一张新面孔.同志们大概注意到了,在hub这个故事里,我们的讲解风格略有变化,对于那些旧的东西,对于那些在usb-storage里面...

    转自:http://blog.chinaunix.net/uid-9688646-id-4052595.html

    是不是觉得很玄?像思念一样玄?那好,我们来看点具体的,比如935行,INIT_DELAYED_WORK().这是一张新面孔.同志们大概注意到了,在hub这个故事里,我们的讲解风格略有变化,对于那些旧的东西,对于那些在usb-storage里面讲过很多次的东西,我们不会再多提,但是对于新鲜的东西,我们会花大把的笔墨去描摹.这样做的原因很简单,男人嘛,有几个不是喜新厌旧呢,要不然也不会结婚前觉得适合自己的女人很少,结婚后觉得适合自己的女人很多.

    所以本节我们就用大把的笔墨来讲述老百姓自己的故事.就讲这一行,935行.INIT_DELAYED_WORK()是一个宏,我们给它传递了两个参数.&hub->leds和led_work.对设备驱动熟悉的人不会觉得INIT_DELAYED_WORK()很陌生,其实鸦片战争那会儿就有这个宏了,只不过从2.6.20的内核开始这个宏做了改变,原来这个宏是三个参数,后来改成了两个参数,所以经常在网上看见一些同志抱怨说最近某个模块编译失败了,说什么make的时候遇见这么一个错误:

    error: macro "INIT_DELAYED_WORK" passed 3 arguments, but takes just 2

    当然更为普遍的看到下面这个错误:

    error: macro "INIT_WORK" passed 3 arguments, but takes just 2

    于是就让我们来仔细看看INIT_WORK和INIT_DELAYED_WORK.其实前者是后者的一个特例,它们涉及到的就是传说中的工作队列.这两个宏都定义于include/linux/workqueue.h中:

         79 #define INIT_WORK(_work, _func)                                         /

         80         do {                                                            /

         81                 (_work)->data = (atomic_long_t) WORK_DATA_INIT();       /

         82                 INIT_LIST_HEAD(&(_work)->entry);                        /

         83                 PREPARE_WORK((_work), (_func));                         /

         84         } while (0)

         85

         86 #define INIT_DELAYED_WORK(_work, _func)                         /

         87         do {                                                    /

         88                 INIT_WORK(&(_work)->work, (_func));             /

         89                 init_timer(&(_work)->timer);                    /

         90         } while (0)

    有时候特怀念谭浩强那本书里的那些例子程序,因为那些程序都特简单,不像现在看到的这些,动不动就是些复杂的函数复杂的数据结构复杂的宏,严重挫伤了我这样的有志青年的自信心.就比如眼下这几个宏吧,宏里边还是宏,一个套一个,不是说看不懂,因为要看懂也不难,一层一层展开,只不过确实没必要非得都看懂,现在这样一种朦胧美也许更美,有那功夫把这些都展开我还不如去认认真真学习三个代表呢.总之,关于工作队列,就这么说吧,Linux内核实现了一个内核线程,直观一点,ps命令看一下您的进程,

    localhost:/usr/src/linux-2.6.22.1/drivers/usb/core # ps -el

    F S   UID   PID  PPID  C PRI  NI ADDR SZ WCHAN  TTY          TIME CMD

    4 S     0     1     0  0  76   0 -   195 -      ?        00:00:02 init

    1 S     0     2     1  0 -40   - -     0 migrat ?        00:00:00 migration/0

    1 S     0     3     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/0

    1 S     0     4     1  0 -40   - -     0 migrat ?        00:00:00 migration/1

    1 S     0     5     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/1

    1 S     0     6     1  0 -40   - -     0 migrat ?        00:00:00 migration/2

    1 S     0     7     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/2

    1 S     0     8     1  0 -40   - -     0 migrat ?        00:00:00 migration/3

    1 S     0     9     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/3

    1 S     0    10     1  0 -40   - -     0 migrat ?        00:00:00 migration/4

    1 S     0    11     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/4

    1 S     0    12     1  0 -40   - -     0 migrat ?        00:00:00 migration/5

    1 S     0    13     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/5

    1 S     0    14     1  0 -40   - -     0 migrat ?        00:00:00 migration/6

    1 S     0    15     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/6

    1 S     0    16     1  0 -40   - -     0 migrat ?        00:00:00 migration/7

    1 S     0    17     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/7

    5 S     0    18     1  0  70  -5 -     0 worker ?        00:00:00 events/0

    1 S     0    19     1  0  70  -5 -     0 worker ?        00:00:00 events/1

    5 S     0    20     1  0  70  -5 -     0 worker ?        00:00:00 events/2

    5 S     0    21     1  0  70  -5 -     0 worker ?        00:00:00 events/3

    5 S     0    22     1  0  70  -5 -     0 worker ?        00:00:00 events/4

    1 S     0    23     1  0  70  -5 -     0 worker ?        00:00:00 events/5

    5 S     0    24     1  0  70  -5 -     0 worker ?        00:00:00 events/6

    5 S     0    25     1  0  70  -5 -     0 worker ?        00:00:00 events/7

    瞅见最后这几行了吗,events/0到events/7,0啊7啊这些都是处理器的编号,每个处理器对应其中的一个线程.要是您的计算机只有一个处理器,那么您只能看到一个这样的线程,events/0,您要是双处理器那您就会看到多出一个events/1的线程.哥们儿这里Dell PowerEdge 2950的机器,8个处理器,所以就是events/0到events/7了.

    那么究竟这些events代表什么意思呢?或者说它们具体干嘛用的?这些events被叫做工作者线程,或者说worker threads,更确切的说,这些应该是缺省的工作者线程.而与工作者线程相关的一个概念就是工作队列,或者叫work queue.工作队列的作用就是把工作推后,交由一个内核线程去执行,更直接的说就是如果您写了一个函数,而您现在不想马上执行它,您想在将来某个时刻去执行它,那您用工作队列准没错.您大概会想到中断也是这样,提供一个中断服务函数,在发生中断的时候去执行,没错,和中断相比,工作队列最大的好处就是可以调度可以睡眠,灵活性更好.

    就比如这里,如果我们将来某个时刻希望能够调用led_work()这么一个我们自己写的函数,那么我们所要做的就是利用工作队列.如何利用呢?第一步就是使用INIT_WORK()或者INIT_DELAYED_WORK()来初始化这么一个工作,或者叫任务,初始化了之后,将来如果咱们希望调用这个led_work()函数,那么咱们只要用一句schedule_work()或者schedule_delayed_work()就可以了,特别的,咱们这里使用的是INIT_DELAYED_WORK(),那么之后我们就会调用schedule_delayed_work(),这俩是一对.它表示,您希望经过一段延时然后再执行某个函数,所以,咱们今后会见到schedule_delayed_work()这个函数的,而它所需要的参数,一个就是咱们这里的&hub->leds,另一个就是具体自己需要的延时.&hub->leds是什么呢?struct usb_hub中的成员,struct delayed_work leds,专门用于延时工作的,再看struct delayed_work,这个结构体定义于include/linux/workqueue.h:

         35 struct delayed_work {

         36         struct work_struct work;

         37         struct timer_list timer;

         38 };

    其实就是一个struct work_struct和一个timer_list,前者是为了往工作队列里加入自己的工作,后者是为了能够实现延时执行,咱们把话说得更明白一点,您看那些events线程,它们对应一个结构体,struct workqueue_struct,也就是说它们维护着一个队列,完了您要是想利用工作队列这么一个机制呢,您可以自己创建一个队列,也可以直接使用events对应的这个队列,对于大多数情况来说,都是选择了events对应的这个队列,也就是说大家都共用这么一个队列,怎么用呢?先初始化,比如调用INIT_DELAYED_WORK(),这么一初始化吧,实际上就是为一个struct work_struct结构体绑定一个函数,就比如咱们这里的两个参数,&hub->leds和led_work()的关系,就最终让hub_leds这个struct work_struct结构体和函数led_work()相绑定了起来,您问怎么绑定的?您瞧,struct work_struct也是定义于include/linux/workqueue.h:

         24 struct work_struct {

         25         atomic_long_t data;

         26 #define WORK_STRUCT_PENDING 0           /* T if work item pending execution */

         27 #define WORK_STRUCT_FLAG_MASK (3UL)

         28 #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)

         29         struct list_head entry;

         30         work_func_t func;

         31 };

    瞅见最后这个成员func了吗,初始化的目的就是让func指向led_work(),这就是绑定,所以以后咱们调用schedule_delayed_work()的时候,咱们只要传递struct work_struct的结构体参数即可,不用再每次都把led_work()这个函数名也给传递一次,一旦绑定,人家就知道了,对于led_work(),那她就嫁鸡随鸡,嫁狗随狗,嫁混蛋随混蛋了.您大概还有一个疑问,为什么只要这里初始化好了,到时候调用schedule_delayed_work()就可以了呢?事实上,events这么一个线程吧,它其实和hub的内核线程一样,有事情就处理,没事情就睡眠,也是一个死循环,而schedule_delayed_work()的作用就是唤醒这个线程,确切的说,是先把自己的这个struct work_struct插入workqueue_struct这个队列里,然后唤醒昏睡中的events.然后events就会去处理,您要是有延时,那么它就给您安排延时以后执行,您要是没有延时,或者您设了延时为0,那好,那就赶紧给您执行.咱这里不是讲了两个宏吗,一个INIT_WORK(),一个INIT_DELAYED_WORK(),后者就是专门用于可以有延时的,而前者就是没有延时的,这里咱们调用的是INIT_DELAYED_WORK(),不过您别美,过一会您会看见INIT_WORK()也被使用了,因为咱们hub驱动中还有另一个地方也想利用工作队列这么一个机制,而它不需要延时,所以就使用INIT_WORK()进行初始化,然后在需要调用相关函数的时候调用schedule_work()即可.此乃后话,暂且不表.

    基本上这一节咱们就是介绍了Linux内核中工作队列机制提供的接口,两对函数INIT_DELAYED_WORK()对schedule_delayed_work(),INIT_WORK()对schedule_work().

    关于工作队列机制,咱们还会用到另外两个函数,它们是cancel_delayed_work(struct delayed_work *work)和flush_scheduled_work().其中cancel_delayed_work()的意思不言自明,对一个延迟执行的工作来说,这个函数的作用是在这个工作还未执行的时候就把它给取消掉.而flush_scheduled_work()的作用,是为了防止有竞争条件的出现,虽说哥们儿也不是很清楚如何防止竞争,可是好歹大二那年学过一门专业课,数字电子线路,尽管没学到什么有用的东西,怎么说也还是记住了两个专业名词,竞争与冒险.您要是对竞争条件不是很明白,那也不要紧,反正基本上每次cancel_delayed_work之后您都得调用flush_scheduled_work()这个函数,特别是对于内核模块,如果一个模块使用了工作队列机制,并且利用了events这个缺省队列,那么在卸载这个模块之前,您必须得调用这个函数,这叫做刷新一个工作队列,也就是说,函数会一直等待,直到队列中所有对象都被执行以后才返回.当然,在等待的过程中,这个函数可以进入睡眠.反正刷新完了之后,这个函数会被唤醒,然后它就返回了.关于这里这个竞争,可以这样理解,events对应的这个队列,人家本来是按部就班的执行,一个一个来,您要是突然把您的模块给卸载了,或者说你把你的那个工作从工作队列里取出来了,那events作为队列管理者,它可能根本就不知道,比如说它先想好了,下午3点执行队列里的第N个成员,可是您突然把第N-1个成员给取走了,那您说这是不是得出错?所以,为了防止您这种唯恐天下不乱的人做出冒天下之大不韪的事情来,提供了一个函数,flush_scheduled_work(),给您调用,以消除所谓的竞争条件,其实说竞争太专业了点,说白了就是防止混乱吧.

    Ok,关于这些接口就讲到这里,日后咱们自然会在hub驱动里见到这些接口函数是如何被使用的.到那时候再来看.这就是蝴蝶效应.当我们看到INIT_WORK/INIT_DELAYED_WORK()的时候,我们是没法预测未来会发生什么的.所以我们只能拭目以待.又想起了那句老话,大学生活就像被强奸,如果不能反抗,那就只能静静的去享受它.










    本文转自张昺华-sky博客园博客,原文链接:http://www.cnblogs.com/sky-heaven/p/5037172.html,如需转载请自行联系原作者


    展开全文
  • cancel_delayed_work和flush_scheduled_work

    千次阅读 2012-12-25 20:33:47
    那好,我们来看点具体的,比如935行,INIT_DELAYED_WORK().这是一张新面孔.同志们大概注意到了,在hub这个故事里,我们的讲解风格略有变化,对于那些旧的东西,对于那些在usb-storage里面讲过很多次的东西,我们不会再多提,...

    是不是觉得很玄?像思念一样玄?那好,我们来看点具体的,比如935,INIT_DELAYED_WORK().这是一张新面孔.同志们大概注意到了,hub这个故事里,我们的讲解风格略有变化,对于那些旧的东西,对于那些在usb-storage里面讲过很多次的东西,我们不会再多提,但是对于新鲜的东西,我们会花大把的笔墨去描摹.这样做的原因很简单,男人嘛,有几个不是喜新厌旧呢,要不然也不会结婚前觉得适合自己的女人很少,结婚后觉得适合自己的女人很多.

    所以本节我们就用大把的笔墨来讲述老百姓自己的故事.就讲这一行,935.INIT_DELAYED_WORK()是一个宏,我们给它传递了两个参数.&hub->ledsled_work.对设备驱动熟悉的人不会觉得INIT_DELAYED_WORK()很陌生,其实鸦片战争那会儿就有这个宏了,只不过从2.6.20的内核开始这个宏做了改变,原来这个宏是三个参数,后来改成了两个参数,所以经常在网上看见一些同志抱怨说最近某个模块编译失败了,说什么make的时候遇见这么一个错误:

    error: macro "INIT_DELAYED_WORK" passed 3 arguments, but takes just 2

    当然更为普遍的看到下面这个错误:

    error: macro "INIT_WORK" passed 3 arguments, but takes just 2

    于是就让我们来仔细看看INIT_WORKINIT_DELAYED_WORK.其实前者是后者的一个特例,它们涉及到的就是传说中的工作队列.这两个宏都定义于include/linux/workqueue.h:

         79 #define INIT_WORK(_work, _func)                                         /

         80         do {                                                            /

         81                 (_work)->data = (atomic_long_t) WORK_DATA_INIT();       /

         82                 INIT_LIST_HEAD(&(_work)->entry);                        /

         83                 PREPARE_WORK((_work), (_func));                         /

         84         } while (0)

         85

         86 #define INIT_DELAYED_WORK(_work, _func)                         /

         87         do {                                                    /

         88                 INIT_WORK(&(_work)->work, (_func));             /

         89                 init_timer(&(_work)->timer);                    /

         90         } while (0)

    有时候特怀念谭浩强那本书里的那些例子程序,因为那些程序都特简单,不像现在看到的这些,动不动就是些复杂的函数复杂的数据结构复杂的宏,严重挫伤了我这样的有志青年的自信心.就比如眼下这几个宏吧,宏里边还是宏,一个套一个,不是说看不懂,因为要看懂也不难,一层一层展开,只不过确实没必要非得都看懂,现在这样一种朦胧美也许更美,有那功夫把这些都展开我还不如去认认真真学习三个代表呢.总之,关于工作队列,就这么说吧,Linux内核实现了一个内核线程,直观一点,ps命令看一下您的进程,

    localhost:/usr/src/linux-2.6.22.1/drivers/usb/core # ps -el

    F S   UID   PID  PPID  C PRI  NI ADDR SZ WCHAN  TTY          TIME CMD

    4 S     0     1     0  0  76   0 -   195 -      ?        00:00:02 init

    1 S     0     2     1  0 -40   - -     0 migrat ?        00:00:00 migration/0

    1 S     0     3     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/0

    1 S     0     4     1  0 -40   - -     0 migrat ?        00:00:00 migration/1

    1 S     0     5     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/1

    1 S     0     6     1  0 -40   - -     0 migrat ?        00:00:00 migration/2

    1 S     0     7     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/2

    1 S     0     8     1  0 -40   - -     0 migrat ?        00:00:00 migration/3

    1 S     0     9     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/3

    1 S     0    10     1  0 -40   - -     0 migrat ?        00:00:00 migration/4

    1 S     0    11     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/4

    1 S     0    12     1  0 -40   - -     0 migrat ?        00:00:00 migration/5

    1 S     0    13     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/5

    1 S     0    14     1  0 -40   - -     0 migrat ?        00:00:00 migration/6

    1 S     0    15     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/6

    1 S     0    16     1  0 -40   - -     0 migrat ?        00:00:00 migration/7

    1 S     0    17     1  0  94  19 -     0 ksofti ?        00:00:00 ksoftirqd/7

    5 S     0    18     1  0  70  -5 -     0 worker ?        00:00:00 events/0

    1 S     0    19     1  0  70  -5 -     0 worker ?        00:00:00 events/1

    5 S     0    20     1  0  70  -5 -     0 worker ?        00:00:00 events/2

    5 S     0    21     1  0  70  -5 -     0 worker ?        00:00:00 events/3

    5 S     0    22     1  0  70  -5 -     0 worker ?        00:00:00 events/4

    1 S     0    23     1  0  70  -5 -     0 worker ?        00:00:00 events/5

    5 S     0    24     1  0  70  -5 -     0 worker ?        00:00:00 events/6

    5 S     0    25     1  0  70  -5 -     0 worker ?        00:00:00 events/7

    瞅见最后这几行了吗,events/0events/7,07啊这些都是处理器的编号,每个处理器对应其中的一个线程.要是您的计算机只有一个处理器,那么您只能看到一个这样的线程,events/0,您要是双处理器那您就会看到多出一个events/1的线程.哥们儿这里Dell PowerEdge 2950的机器,8个处理器,所以就是events/0events/7.

    那么究竟这些events代表什么意思呢?或者说它们具体干嘛用的?这些events被叫做工作者线程,或者说worker threads,更确切的说,这些应该是缺省的工作者线程.而与工作者线程相关的一个概念就是工作队列,或者叫work queue.工作队列的作用就是把工作推后,交由一个内核线程去执行,更直接的说就是如果您写了一个函数,而您现在不想马上执行它,您想在将来某个时刻去执行它,那您用工作队列准没错.您大概会想到中断也是这样,提供一个中断服务函数,在发生中断的时候去执行,没错,和中断相比,工作队列最大的好处就是可以调度可以睡眠,灵活性更好.

    就比如这里,如果我们将来某个时刻希望能够调用led_work()这么一个我们自己写的函数,那么我们所要做的就是利用工作队列.如何利用呢?第一步就是使用INIT_WORK()或者INIT_DELAYED_WORK()来初始化这么一个工作,或者叫任务,初始化了之后,将来如果咱们希望调用这个led_work()函数,那么咱们只要用一句schedule_work()或者schedule_delayed_work()就可以了,特别的,咱们这里使用的是INIT_DELAYED_WORK(),那么之后我们就会调用schedule_delayed_work(),这俩是一对.它表示,您希望经过一段延时然后再执行某个函数,所以,咱们今后会见到schedule_delayed_work()这个函数的,而它所需要的参数,一个就是咱们这里的&hub->leds,另一个就是具体自己需要的延时.&hub->leds是什么呢?struct usb_hub中的成员,struct delayed_work leds,专门用于延时工作的,再看struct delayed_work,这个结构体定义于include/linux/workqueue.h:

         35 struct delayed_work {

         36         struct work_struct work;

         37         struct timer_list timer;

         38 };

    其实就是一个struct work_struct和一个timer_list,前者是为了往工作队列里加入自己的工作,后者是为了能够实现延时执行,咱们把话说得更明白一点,您看那些events线程,它们对应一个结构体,struct workqueue_struct,也就是说它们维护着一个队列,完了您要是想利用工作队列这么一个机制呢,您可以自己创建一个队列,也可以直接使用events对应的这个队列,对于大多数情况来说,都是选择了events对应的这个队列,也就是说大家都共用这么一个队列,怎么用呢?先初始化,比如调用INIT_DELAYED_WORK(),这么一初始化吧,实际上就是为一个struct work_struct结构体绑定一个函数,就比如咱们这里的两个参数,&hub->ledsled_work()的关系,就最终让hub_leds这个struct work_struct结构体和函数led_work()相绑定了起来,您问怎么绑定的?您瞧,struct work_struct也是定义于include/linux/workqueue.h:

         24 struct work_struct {

         25         atomic_long_t data;

         26 #define WORK_STRUCT_PENDING 0           /* T if work item pending execution */

         27 #define WORK_STRUCT_FLAG_MASK (3UL)

         28 #define WORK_STRUCT_WQ_DATA_MASK (~WORK_STRUCT_FLAG_MASK)

         29         struct list_head entry;

         30         work_func_t func;

         31 };

    瞅见最后这个成员func了吗,初始化的目的就是让func指向led_work(),这就是绑定,所以以后咱们调用schedule_delayed_work()的时候,咱们只要传递struct work_struct的结构体参数即可,不用再每次都把led_work()这个函数名也给传递一次,一旦绑定,人家就知道了,对于led_work(),那她就嫁鸡随鸡,嫁狗随狗,嫁混蛋随混蛋了.您大概还有一个疑问,为什么只要这里初始化好了,到时候调用schedule_delayed_work()就可以了呢?事实上,events这么一个线程吧,它其实和hub的内核线程一样,有事情就处理,没事情就睡眠,也是一个死循环,schedule_delayed_work()的作用就是唤醒这个线程,确切的说,是先把自己的这个struct work_struct插入workqueue_struct这个队列里,然后唤醒昏睡中的events.然后events就会去处理,您要是有延时,那么它就给您安排延时以后执行,您要是没有延时,或者您设了延时为0,那好,那就赶紧给您执行.咱这里不是讲了两个宏吗,一个INIT_WORK(),一个INIT_DELAYED_WORK(),后者就是专门用于可以有延时的,而前者就是没有延时的,这里咱们调用的是INIT_DELAYED_WORK(),不过您别美,过一会您会看见INIT_WORK()也被使用了,因为咱们hub驱动中还有另一个地方也想利用工作队列这么一个机制,而它不需要延时,所以就使用INIT_WORK()进行初始化,然后在需要调用相关函数的时候调用schedule_work()即可.此乃后话,暂且不表.

    基本上这一节咱们就是介绍了Linux内核中工作队列机制提供的接口,两对函数INIT_DELAYED_WORK()schedule_delayed_work(),INIT_WORK()schedule_work().

    关于工作队列机制,咱们还会用到另外两个函数,它们是cancel_delayed_work(struct delayed_work *work)flush_scheduled_work().其中cancel_delayed_work()的意思不言自明,对一个延迟执行的工作来说,这个函数的作用是在这个工作还未执行的时候就把它给取消掉.flush_scheduled_work()的作用,是为了防止有竞争条件的出现,虽说哥们儿也不是很清楚如何防止竞争,可是好歹大二那年学过一门专业课,数字电子线路,尽管没学到什么有用的东西,怎么说也还是记住了两个专业名词,竞争与冒险.您要是对竞争条件不是很明白,那也不要紧,反正基本上每次cancel_delayed_work之后您都得调用flush_scheduled_work()这个函数,特别是对于内核模块,如果一个模块使用了工作队列机制,并且利用了events这个缺省队列,那么在卸载这个模块之前,您必须得调用这个函数,这叫做刷新一个工作队列,也就是说,函数会一直等待,直到队列中所有对象都被执行以后才返回.当然,在等待的过程中,这个函数可以进入睡眠.反正刷新完了之后,这个函数会被唤醒,然后它就返回了.关于这里这个竞争,可以这样理解,events对应的这个队列,人家本来是按部就班的执行,一个一个来,您要是突然把您的模块给卸载了,或者说你把你的那个工作从工作队列里取出来了,events作为队列管理者,它可能根本就不知道,比如说它先想好了,下午3点执行队列里的第N个成员,可是您突然把第N-1个成员给取走了,那您说这是不是得出错?所以,为了防止您这种唯恐天下不乱的人做出冒天下之大不韪的事情来,提供了一个函数,flush_scheduled_work(),给您调用,以消除所谓的竞争条件,其实说竞争太专业了点,说白了就是防止混乱吧.

    Ok,关于这些接口就讲到这里,日后咱们自然会在hub驱动里见到这些接口函数是如何被使用的.到那时候再来看.这就是蝴蝶效应.当我们看到INIT_WORK/INIT_DELAYED_WORK()的时候,我们是没法预测未来会发生什么的.所以我们只能拭目以待.又想起了那句老话,大学生活就像被强奸,如果不能反抗,那就只能静静的去享受它.

     

    展开全文
  • 本文参考了下面两位博主的文章,采用4.18版本的内核分析。 http://www.wowotech.net/sort/irq_subsystem... ... 一、前言 ...本文主要讲述下面两部分的内容: ...1、将work挂入workqueue的处理过程 2、如何处理挂入workqu...

    本文参考了下面两位博主的文章,采用4.18版本的内核分析。

    http://www.wowotech.net/sort/irq_subsystem

    https://blog.csdn.net/chenying126/article/details/78786406

     

    一、前言

    本文主要讲述下面两部分的内容:

    1、将work挂入workqueue的处理过程

    2、如何处理挂入workqueue的work

     

    二、用户将一个work挂入workqueue

     

    这里先给出几个常用的接口:

    1.把一个work挂入未绑定cpu的workqueue中

    /**
     * queue_work - queue work on a workqueue
     * @wq: workqueue to use
     * @work: work to queue
     *
     * Returns %false if @work was already on a queue, %true otherwise.
     *
     * We queue the work to the CPU on which it was submitted, but if the CPU dies
     * it can be processed by another CPU.
     */
    static inline bool queue_work(struct workqueue_struct *wq,
    			      struct work_struct *work)
    {
    	return queue_work_on(WORK_CPU_UNBOUND, wq, work);
    }
    

    2.把一个work在一段时间后,挂入未绑定cpu的workqueue

    /**
     * queue_delayed_work - queue work on a workqueue after delay
     * @wq: workqueue to use
     * @dwork: delayable work to queue
     * @delay: number of jiffies to wait before queueing
     *
     * Equivalent to queue_delayed_work_on() but tries to use the local CPU.
     */
    static inline bool queue_delayed_work(struct workqueue_struct *wq,
    				      struct delayed_work *dwork,
    				      unsigned long delay)
    {
    	return queue_delayed_work_on(WORK_CPU_UNBOUND, wq, dwork, delay);
    }
    

    这里的一段时间,是创建一个内核定时器,让在定时器处理函数中把work挂载workqueue上,在前面文章中已经分析过了。

    下面我们以queue_work_on函数作为开始:

     

    1、queue_work_on函数

    使用workqueue机制的模块可以调用queue_work_on(有其他变种的接口,这里略过,其实思路是一致的)将一个定义好的work挂入workqueue,具体代码如下:

    /**
     * queue_work_on - queue work on specific cpu
     * @cpu: CPU number to execute work on
     * @wq: workqueue to use
     * @work: work to queue
     *
     * We queue the work to a specific CPU, the caller must ensure it
     * can't go away.
     *
     * Return: %false if @work was already on a queue, %true otherwise.
     */
    bool queue_work_on(int cpu, struct workqueue_struct *wq,
    		   struct work_struct *work)
    {
    	bool ret = false;
    	unsigned long flags;
    
    	local_irq_save(flags);    //把work加入工作队列是在关本地中断下运行的
    
    	if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
    		__queue_work(cpu, wq, work);        //挂入work list并通知worker thread pool来处理 
    		ret = true;
    	}
    
    	local_irq_restore(flags);    //开本地中断
    	return ret;
    }
    

    work_struct的data member中的WORK_STRUCT_PENDING_BIT这个bit标识了该work是处于pending状态还是正在处理中,pending状态的work只会挂入一次。大部分的逻辑都是在__queue_work函数中,下面的小节都是描述该函数的执行过程。

     

    
    static void __queue_work(int cpu, struct workqueue_struct *wq,
    			 struct work_struct *work)
    {
    	struct pool_workqueue *pwq;
    	struct worker_pool *last_pool;
    	struct list_head *worklist;
    	unsigned int work_flags;
    	unsigned int req_cpu = cpu;
    
    	/*
    	 * While a work item is PENDING && off queue, a task trying to
    	 * steal the PENDING will busy-loop waiting for it to either get
    	 * queued or lose PENDING.  Grabbing PENDING and queueing should
    	 * happen with IRQ disabled.
    	 */
    	lockdep_assert_irqs_disabled();
    
    	debug_work_activate(work);
    
    	/* if draining, only works from the same workqueue are allowed */
    	if (unlikely(wq->flags & __WQ_DRAINING) &&
    	    WARN_ON_ONCE(!is_chained_work(wq)))
    		return;
    retry:
    	if (req_cpu == WORK_CPU_UNBOUND)
    		cpu = wq_select_unbound_cpu(raw_smp_processor_id());
    
    	/* pwq which will be used unless @work is executing elsewhere */
    	if (!(wq->flags & WQ_UNBOUND))
    		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
    	else
    		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
    
    	/*
    	 * If @work was previously on a different pool, it might still be
    	 * running there, in which case the work needs to be queued on that
    	 * pool to guarantee non-reentrancy.
    	 */
    	last_pool = get_work_pool(work);
    	if (last_pool && last_pool != pwq->pool) {
    		struct worker *worker;
    
    		spin_lock(&last_pool->lock);
    
    		worker = find_worker_executing_work(last_pool, work);
    
    		if (worker && worker->current_pwq->wq == wq) {
    			pwq = worker->current_pwq;
    		} else {
    			/* meh... not running there, queue here */
    			spin_unlock(&last_pool->lock);
    			spin_lock(&pwq->pool->lock);
    		}
    	} else {
    		spin_lock(&pwq->pool->lock);
    	}
    
    	/*
    	 * pwq is determined and locked.  For unbound pools, we could have
    	 * raced with pwq release and it could already be dead.  If its
    	 * refcnt is zero, repeat pwq selection.  Note that pwqs never die
    	 * without another pwq replacing it in the numa_pwq_tbl or while
    	 * work items are executing on it, so the retrying is guaranteed to
    	 * make forward-progress.
    	 */
    	if (unlikely(!pwq->refcnt)) {
    		if (wq->flags & WQ_UNBOUND) {
    			spin_unlock(&pwq->pool->lock);
    			cpu_relax();
    			goto retry;
    		}
    		/* oops */
    		WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
    			  wq->name, cpu);
    	}
    
    	/* pwq determined, queue */
    	trace_workqueue_queue_work(req_cpu, pwq, work);
    
    	if (WARN_ON(!list_empty(&work->entry))) {
    		spin_unlock(&pwq->pool->lock);
    		return;
    	}
    
    	pwq->nr_in_flight[pwq->work_color]++;
    	work_flags = work_color_to_flags(pwq->work_color);
    
    	if (likely(pwq->nr_active < pwq->max_active)) {
    		trace_workqueue_activate_work(work);
    		pwq->nr_active++;
    		worklist = &pwq->pool->worklist;
    		if (list_empty(worklist))
    			pwq->pool->watchdog_ts = jiffies;
    	} else {
    		work_flags |= WORK_STRUCT_DELAYED;
    		worklist = &pwq->delayed_works;
    	}
    
    	insert_work(pwq, work, worklist, work_flags);
    
    	spin_unlock(&pwq->pool->lock);
    }
    

    2、__WQ_DRAINING的解释

    __queue_work函数一开始会校验__WQ_DRAINING这个flag,如下:

    	/* if draining, only works from the same workqueue are allowed */
    	if (unlikely(wq->flags & __WQ_DRAINING) &&
    	    WARN_ON_ONCE(!is_chained_work(wq)))
    		return;
    

    __WQ_DRAINING这个flag表示该workqueue正在进行draining的操作,这多半是发送在销毁workqueue的时候,既然要销毁,那么挂入该workqueue的所有的work都要处理完毕,才允许它消亡。当想要将一个workqueue中所有的work都清空的时候,如果还有work挂入怎么办?一般而言,这时候当然是不允许新的work挂入了,毕竟现在的目标是清空workqueue中的work。但是有一种特例(通过is_chained_work判定),也就是正在清空的work(隶属于该workqueue)又触发了一个queue work的操作(也就是所谓chained work),这时候该work允许挂入。

     

    3、选择pool workqueue

    retry:
    	if (req_cpu == WORK_CPU_UNBOUND)        //这个work是否要求使用哪个cpu
    		cpu = wq_select_unbound_cpu(raw_smp_processor_id());
    
    	/* pwq which will be used unless @work is executing elsewhere */
    	if (!(wq->flags & WQ_UNBOUND))
    		pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);    //对于bound型的workqueue,直接使用本地CPU对应pool_workqueue
    	else
    		pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));   //对于unbound型,调用unbound_pwq_by_node()寻找本地node节点对应的unbound类型的pool_workqueue
    

    WORK_CPU_UNBOUND表示并不指定cpu,这时候,选择当前代码运行的那个cpu了。一旦确定了cpu了,对于非unbound的workqueue,当然使用per cpu的pool workqueue。如果是unbound的workqueue,那么要根据numa node id来选择。cpu_to_node可以从cpu id获取node id。需要注意的是:这里选择的pool wq只是备选的,可能用也可能不用,它有可能会被替换掉,具体参考下一节描述。

     

    4、选择worker thread pool

    与其说挂入workqueue,不如说挂入worker thread pool,因为毕竟是线程池来处理具体的work。pool_workqueue有一个相关联的worker thread pool(struct pool_workqueue的pool成员),因此看起来选择了pool wq也就选定了worker pool了,但是,不一定当前选定的那个pool wq对应的worker pool就适合该work,因为有时候该work可能正在其他的worker thread上执行中,在这种情况下,为了确保work的callback function不会重入,该work最好还是挂在那个worker thread pool上,具体代码如下:

    
    	/*
    	 * If @work was previously on a different pool, it might still be
    	 * running there, in which case the work needs to be queued on that
    	 * pool to guarantee non-reentrancy.
    	 */
    	last_pool = get_work_pool(work);        //通过work_struct的成员data查询该work上一次是在哪个worker_pool中运行的。
    	if (last_pool && last_pool != pwq->pool) {       //如果上次运行的worker_pool和本次不一致
    		struct worker *worker;
    
    		spin_lock(&last_pool->lock);
    
    		worker = find_worker_executing_work(last_pool, work);    //判断一个work是否正在last_pool上运行,也即不在当前worker_pool运行。如果是,返回这个正在执行的工作线程worker
    
    		if (worker && worker->current_pwq->wq == wq) {
    			pwq = worker->current_pwq;                //利用当前work正在执行的pool_workqueue,利用缓存热度,不进行调度
    		} else {
    			/* meh... not running there, queue here */
    			spin_unlock(&last_pool->lock);
    			spin_lock(&pwq->pool->lock);
    		}
    	} else {
    		spin_lock(&pwq->pool->lock);
    	}

    last_pool记录了上一次该work是被哪一个worker pool处理的,如果last_pool就是pool wq对应的worker pool,那么皆大欢喜,否则只能使用last pool了。使用last pool的例子比较复杂一些,因为这时候需要根据last worker pool找到对应的pool workqueue。find_worker_executing_work函数可以找到具体哪一个worker线程正在处理该work,如果没有找到,那么还是使用第3节中选定的pool wq吧,否则,选择该worker线程当前的那个pool workqueue(其实也就是选定了线程池)。

     

    5.确定这个pwq可用

    	/*
    	 * pwq is determined and locked.  For unbound pools, we could have
    	 * raced with pwq release and it could already be dead.  If its
    	 * refcnt is zero, repeat pwq selection.  Note that pwqs never die
    	 * without another pwq replacing it in the numa_pwq_tbl or while
    	 * work items are executing on it, so the retrying is guaranteed to
    	 * make forward-progress.
    	 */
    	if (unlikely(!pwq->refcnt)) {
    		if (wq->flags & WQ_UNBOUND) {
    			spin_unlock(&pwq->pool->lock);
    			cpu_relax();    //对unbound类型pool_workqueue释放是异步的,当refcnt减少到0时,说明该pool_workqueue已经被释放,那么需要跳转到retry出重新选择pool_workqueue
    			goto retry;
    		}
    		/* oops */
            /* 对于绑定cpu的,pwq每个cpu都有两个线程池,所以pwq不会使0,如果是0,只能证明程序已经跑飞 */
    		WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
    			  wq->name, cpu);
    	}
    

     

    /* initialize newly alloced @pwq which is associated with @wq and @pool */
    static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
    		     struct worker_pool *pool)
    {
    	BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
    
    	memset(pwq, 0, sizeof(*pwq));
    
    	pwq->pool = pool;
    	pwq->wq = wq;
    	pwq->flush_color = -1;
    	pwq->refcnt = 1;        //新申请的pwq,初始化时,值为1
    	INIT_LIST_HEAD(&pwq->delayed_works);
    	INIT_LIST_HEAD(&pwq->pwqs_node);
    	INIT_LIST_HEAD(&pwq->mayday_node);
    	INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
    }
    

     

    在pwq中加入一个work时,是要引用计数加1

    
    /**
     * insert_work - insert a work into a pool
     * @pwq: pwq @work belongs to
     * @work: work to insert
     * @head: insertion point
     * @extra_flags: extra WORK_STRUCT_* flags to set
     *
     * Insert @work which belongs to @pwq after @head.  @extra_flags is or'd to
     * work_struct flags.
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock).
     */
    static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
    			struct list_head *head, unsigned int extra_flags)
    {
    	struct worker_pool *pool = pwq->pool;
    
    	/* we own @work, set data and link */
    	set_work_pwq(work, pwq, extra_flags);
    	list_add_tail(&work->entry, head);
    	get_pwq(pwq);
    
    	/*
    	 * Ensure either wq_worker_sleeping() sees the above
    	 * list_add_tail() or we see zero nr_running to avoid workers lying
    	 * around lazily while there are works to be processed.
    	 */
    	smp_mb();
    
    	if (__need_more_worker(pool))
    		wake_up_worker(pool);
    }
    
    /**
     * get_pwq - get an extra reference on the specified pool_workqueue
     * @pwq: pool_workqueue to get
     *
     * Obtain an extra reference on @pwq.  The caller should guarantee that
     * @pwq has positive refcnt and be holding the matching pool->lock.
     */
    static void get_pwq(struct pool_workqueue *pwq)
    {
    	lockdep_assert_held(&pwq->pool->lock);
    	WARN_ON_ONCE(pwq->refcnt <= 0);
    	pwq->refcnt++;
    }
    
    

     

    在一个work执行完之后,要在链表中删除这个work前,引用计数减1

    /**
     * put_pwq - put a pool_workqueue reference
     * @pwq: pool_workqueue to put
     *
     * Drop a reference of @pwq.  If its refcnt reaches zero, schedule its
     * destruction.  The caller should be holding the matching pool->lock.
     */
    static void put_pwq(struct pool_workqueue *pwq)
    {
    	lockdep_assert_held(&pwq->pool->lock);
    	if (likely(--pwq->refcnt))            // 减1操作
    		return;
    	if (WARN_ON_ONCE(!(pwq->wq->flags & WQ_UNBOUND)))
    		return;
    	/*
    	 * @pwq can't be released under pool->lock, bounce to
    	 * pwq_unbound_release_workfn().  This never recurses on the same
    	 * pool->lock as this path is taken only for unbound workqueues and
    	 * the release work item is scheduled on a per-cpu workqueue.  To
    	 * avoid lockdep warning, unbound pool->locks are given lockdep
    	 * subclass of 1 in get_unbound_pool().
    	 */
    	schedule_work(&pwq->unbound_release_work);
    }

     

    6、选择work挂入的队列

    队列有两个,一个是被推迟执行的队列(pwq->delayed_works),一个是线程池要处理的队列(pwq->pool->worklist),如果挂入线程池要处理的队列,也就意味着该work进入active状态,线程池会立刻启动处理流程,如果挂入推迟执行的队列,那么该work还是pending状态:

    	/* pwq determined, queue */
    	trace_workqueue_queue_work(req_cpu, pwq, work);
    
    	if (WARN_ON(!list_empty(&work->entry))) {
    		spin_unlock(&pwq->pool->lock);
    		return;
    	}
    
    	pwq->nr_in_flight[pwq->work_color]++;
    	work_flags = work_color_to_flags(pwq->work_color);
    
    	if (likely(pwq->nr_active < pwq->max_active)) {        //判断当前pool_workqueue的work活跃数量,如果少于最高限值,就加入链表worker_pool->worklist,否则加入delayed_works链表中
    		trace_workqueue_activate_work(work);
    		pwq->nr_active++;
    		worklist = &pwq->pool->worklist;
    		if (list_empty(worklist))
    			pwq->pool->watchdog_ts = jiffies;
    	} else {
    		work_flags |= WORK_STRUCT_DELAYED;
    		worklist = &pwq->delayed_works;
    	}
    
    

     

    7.将当前work加入到pool_workqueue->worklist尾部

    
    	insert_work(pwq, work, worklist, work_flags);
    
    	spin_unlock(&pwq->pool->lock);
    
    
    /**
     * insert_work - insert a work into a pool
     * @pwq: pwq @work belongs to
     * @work: work to insert
     * @head: insertion point
     * @extra_flags: extra WORK_STRUCT_* flags to set
     *
     * Insert @work which belongs to @pwq after @head.  @extra_flags is or'd to
     * work_struct flags.
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock).
     */
    static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
    			struct list_head *head, unsigned int extra_flags)
    {
    	struct worker_pool *pool = pwq->pool;
    
    	/* we own @work, set data and link   */
    	set_work_pwq(work, pwq, extra_flags);     //把pool_workqueue指针的值和一些flag设置到data成员中,方便下次调用
    	list_add_tail(&work->entry, head);        //加入这个线程池的工作链表尾部
    	get_pwq(pwq);                             //每次加入,pwq的引用计数都要累加
    
    	/*
    	 * Ensure either wq_worker_sleeping() sees the above
    	 * list_add_tail() or we see zero nr_running to avoid workers lying
    	 * around lazily while there are works to be processed.
    	 */
        //保证wake_up_worker()唤醒worker时,在__schedule()->wq_worker_sleeping()时,这里的list_add_tail()已经完成。同时保证下面__need_more_worker()读取nr_running时list_add_tail()链表已经完成。
    	smp_mb();      
    
    	if (__need_more_worker(pool))        //如果当前nr_running为0,表示当前worker可能并没有处于运行状态。那么需要wake_up_worker()强行唤醒一次。
    		wake_up_worker(pool);
    }
    

     

    三、系统wq的初始化的初始化流程

     

     

    四、 workqueue

    workqueue 就是存放一组 work 的集合,基本可以分为两类:一类系统创建的 workqueue,一类是用户自己创建的 workqueue。

    不论是系统还是用户的 workqueue,如果没有指定 WQ_UNBOUND,默认都是和 normal worker_pool 绑定。

    1. 系统 workqueue

    系统在初始化时创建了一批默认的 workqueue:system_wq、system_highpri_wq、system_long_wq、system_unbound_wq、system_freezable_wq、system_power_efficient_wq、system_freezable_power_efficient_wq。

    
    /**
     * workqueue_init_early - early init for workqueue subsystem
     *
     * This is the first half of two-staged workqueue subsystem initialization
     * and invoked as soon as the bare basics - memory allocation, cpumasks and
     * idr are up.  It sets up all the data structures and system workqueues
     * and allows early boot code to create workqueues and queue/cancel work
     * items.  Actual work item execution starts only after kthreads can be
     * created and scheduled right before early initcalls.
     */
    int __init workqueue_init_early(void)
    {
    	int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };    //每个cpu有高低优先级的,2个,这里HIGHPRI_NICE_LEVEL为-20,对应的prio为100,是普通进程里面的最高优先级
    	int hk_flags = HK_FLAG_DOMAIN | HK_FLAG_WQ;
    	int i, cpu;
    
    	WARN_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
    
    	BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
    	cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(hk_flags));
    
    	pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
    
    	/* initialize CPU pools 为每个cpu初始化两个线程池  */
    	for_each_possible_cpu(cpu) {        
    		struct worker_pool *pool;
    
    		i = 0;
    		for_each_cpu_worker_pool(pool, cpu) {    //每个CPU两个worker_pool,分别对应per-cpu变量cpu_worker_pool[0]和cpu_worker_pool[1]
    			BUG_ON(init_worker_pool(pool));        //初始化worker_pool(线程池)
    			pool->cpu = cpu;                       // 指定 cpu
    			cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
    			pool->attrs->nice = std_nice[i++];     //设置nice值
    			pool->node = cpu_to_node(cpu);        //pre-cpu的都是设置的near  mermory
    
    			/* alloc pool ID */
    			mutex_lock(&wq_pool_mutex);
    			BUG_ON(worker_pool_assign_id(pool));
    			mutex_unlock(&wq_pool_mutex);
    		}
    	}
    
    	/* create default unbound and ordered wq attrs */
    	for (i = 0; i < NR_STD_WORKER_POOLS; i++) {    //默认未绑定和ordered的也是各申请两个attribute(高低优先级)
    		struct workqueue_attrs *attrs;
    
    		BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
    		attrs->nice = std_nice[i];
    		unbound_std_wq_attrs[i] = attrs;        //设置Unbound类型workqueue的属性
    
    		/*
    		 * An ordered wq should have only one pwq as ordering is
    		 * guaranteed by max_active which is enforced by pwqs.
    		 * Turn off NUMA so that dfl_pwq is used for all nodes.
    		 */
    		BUG_ON(!(attrs = alloc_workqueue_attrs(GFP_KERNEL)));
    		attrs->nice = std_nice[i];
    		attrs->no_numa = true;
    		ordered_wq_attrs[i] = attrs;    //设置ordered类型workqueue的属性,ordered类型workqueue同一时刻只能有一个work item在运行。
    	}
    
    	system_wq = alloc_workqueue("events", 0, 0);            //普通优先级bound类型工作队列system_wq
    	system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);    //高优先级bound类型工作队列system_highpri_wq
    	system_long_wq = alloc_workqueue("events_long", 0, 0);            //比较耗时的任务
    	system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,    //普通优先级unbound类型工作队列
    					    WQ_UNBOUND_MAX_ACTIVE);
    	system_freezable_wq = alloc_workqueue("events_freezable",        //freezable(系统挂起后,这类内核线程可以被冻结)类型工作队列system_freezable_wq
    					      WQ_FREEZABLE, 0);
    	system_power_efficient_wq = alloc_workqueue("events_power_efficient",    //省电类型的工作队列
    					      WQ_POWER_EFFICIENT, 0);
    	system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",   //freezable并且省电类型的工作队列
    					      WQ_FREEZABLE | WQ_POWER_EFFICIENT,
    					      0);
    	BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
    	       !system_unbound_wq || !system_freezable_wq ||
    	       !system_power_efficient_wq ||
    	       !system_freezable_power_efficient_wq);
    
    	return 0;
    }
    

     

    这里我们看到,pre-cpu的线程池是直接初始化的。前面的章节如果有印象,应该知道,pre-cpu的现场池其实是静态定义的,所以它不用动态申请可以直接初始化。

    /* the per-cpu worker pools */
    static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
    

     

     

    下面的几个函数都是系统默认的system_wq

    
    /**
     * schedule_work_on - put work task on a specific cpu
     * @cpu: cpu to put the work task on
     * @work: job to be done
     *
     * This puts a job on a specific cpu
     */
    static inline bool schedule_work_on(int cpu, struct work_struct *work)
    {
    	return queue_work_on(cpu, system_wq, work);
    }
    
    /**
     * schedule_work - put work task in global workqueue
     * @work: job to be done
     *
     * Returns %false if @work was already on the kernel-global workqueue and
     * %true otherwise.
     *
     * This puts a job in the kernel-global workqueue if it was not already
     * queued and leaves it in the same position on the kernel-global
     * workqueue otherwise.
     */
    static inline bool schedule_work(struct work_struct *work)
    {
    	return queue_work(system_wq, work);
    }
    
    
    /**
     * schedule_delayed_work - put work task in global workqueue after delay
     * @dwork: job to be done
     * @delay: number of jiffies to wait or 0 for immediate execution
     *
     * After waiting for a given time this puts a job in the kernel-global
     * workqueue.
     */
    static inline bool schedule_delayed_work(struct delayed_work *dwork,
    					 unsigned long delay)
    {
    	return queue_delayed_work(system_wq, dwork, delay);
    }
    

     

    五、线程池初始化

    
    /**
     * init_worker_pool - initialize a newly zalloc'd worker_pool
     * @pool: worker_pool to initialize
     *
     * Initialize a newly zalloc'd @pool.  It also allocates @pool->attrs.
     *
     * Return: 0 on success, -errno on failure.  Even on failure, all fields
     * inside @pool proper are initialized and put_unbound_pool() can be called
     * on @pool safely to release it.
     */
    static int init_worker_pool(struct worker_pool *pool)
    {
    	spin_lock_init(&pool->lock);
    	pool->id = -1;
    	pool->cpu = -1;                    //初始值-1表示当前worker_pool是unbound型的
    	pool->node = NUMA_NO_NODE;            
    	pool->flags |= POOL_DISASSOCIATED;
    	pool->watchdog_ts = jiffies;
    	INIT_LIST_HEAD(&pool->worklist);   //worker_pool 的 work list,各个 workqueue 把 work 挂载到这个链表上, 让 worker_pool 对应的多个 worker 来执行
    	INIT_LIST_HEAD(&pool->idle_list);    //worker_pool 的 idle worker list
    	hash_init(pool->busy_hash);          //worker_pool 的 busy worker list
    
        //销毁多余worker,每IDLE_WORKER_TIMEOUT(300秒)执行一次
    	timer_setup(&pool->idle_timer, idle_worker_timeout, TIMER_DEFERRABLE);
    
        //设置mayday_timer,周期为MAYDAY_INTERVAL,即100ms
    	timer_setup(&pool->mayday_timer, pool_mayday_timeout, 0);
    
    	INIT_LIST_HEAD(&pool->workers);        //worker_pool的 worker list
    
    	ida_init(&pool->worker_ida);
    	INIT_HLIST_NODE(&pool->hash_node);
    	pool->refcnt = 1;                     //线程池,初始化时引用计数为1,创建一个worker + 1,销毁一个worker - 1
    
    	/* shouldn't fail above this point */
    	pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
    	if (!pool->attrs)
    		return -ENOMEM;
    	return 0;
    }

     

     

     

    六、线程池如何创建worker线程?

    1、per cpu worker pool什么时候创建worker线程?

    2.unbound cpu worker pool 什么时候创建worker线程?

    
    /**
     * workqueue_init - bring workqueue subsystem fully online
     *
     * This is the latter half of two-staged workqueue subsystem initialization
     * and invoked as soon as kthreads can be created and scheduled.
     * Workqueues have been created and work items queued on them, but there
     * are no kworkers executing the work items yet.  Populate the worker pools
     * with the initial workers and enable future kworker creations.
     */
    int __init workqueue_init(void)
    {
    	struct workqueue_struct *wq;
    	struct worker_pool *pool;
    	int cpu, bkt;
    
    	/*
    	 * It'd be simpler to initialize NUMA in workqueue_init_early() but
    	 * CPU to node mapping may not be available that early on some
    	 * archs such as power and arm64.  As per-cpu pools created
    	 * previously could be missing node hint and unbound pools NUMA
    	 * affinity, fix them up.
    	 *
    	 * Also, while iterating workqueues, create rescuers if requested.
    	 */
    	wq_numa_init();
    
    	mutex_lock(&wq_pool_mutex);
    
    	for_each_possible_cpu(cpu) {        //初始化线程池的内存节点编号
    		for_each_cpu_worker_pool(pool, cpu) {
    			pool->node = cpu_to_node(cpu);    
    		}
    	}
    
        //workqueues上挂了系统中所有的wq,这里是初始化这些wq
    	list_for_each_entry(wq, &workqueues, list) {
    		wq_update_unbound_numa(wq, smp_processor_id(), true);
    		WARN(init_rescuer(wq),    //创建救援线程
    		     "workqueue: failed to create early rescuer for %s",
    		     wq->name);
    	}
    
    	mutex_unlock(&wq_pool_mutex);
    
    	/* create the initial workers  给每个在线的cpu的 worker_pool 创建 worker*/
    	for_each_online_cpu(cpu) {        
    		for_each_cpu_worker_pool(pool, cpu) {
    			pool->flags &= ~POOL_DISASSOCIATED;
    			BUG_ON(!create_worker(pool));    //创建一个线程池
    		}
    	}
    
        //对未绑定cpu的线程池,也创建线程一个默认的
    	hash_for_each(unbound_pool_hash, bkt, pool, hash_node)    //创建unbound线程池
    		BUG_ON(!create_worker(pool));
    
    	wq_online = true;
    	wq_watchdog_init();        //初始化wq_watchdog,用于监视某个worker是不是卡主了
    
    	return 0;
    }
    

    因此,在系统初始化的时候,per cpu workqueue共享的那些线程池(2 x cpu nr)和unbound cpu workqueue的线程池就会通过create_worker创建一个initial worker。

    一旦initial worker启动,该线程会执行worker_thread函数来处理work,在处理过程中,如果有需要, worker会创建新的线程。

     

    2、unbound thread pool什么时候创建worker线程?

    我们先看看unbound thread pool的建立,和per-CPU不同的是unbound thread pool是全局共享的,因此,每当创建不同属性的unbound workqueue的时候,都需要创建pool_workqueue及其对应的worker pool,这时候就会调用get_unbound_pool函数在当前系统中现存的线程池中找是否有匹配的worker pool,如果没有就需要创建新的线程池。在创建新的线程池之后,会立刻调用create_worker创建一个initial worker。和per cpu worker pool一样,一旦initial worker启动,随着work不断的挂入以及worker处理work的具体情况,线程池会动态创建worker。当然unbound thread pool在系统刚开始初始化的时候是创建了一个的。

     

    
    /**
     * get_unbound_pool - get a worker_pool with the specified attributes
     * @attrs: the attributes of the worker_pool to get
     *
     * Obtain a worker_pool which has the same attributes as @attrs, bump the
     * reference count and return it.  If there already is a matching
     * worker_pool, it will be used; otherwise, this function attempts to
     * create a new one.
     *
     * Should be called with wq_pool_mutex held.
     *
     * Return: On success, a worker_pool with the same attributes as @attrs.
     * On failure, %NULL.
     */
    static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
    {
    	u32 hash = wqattrs_hash(attrs);
    	struct worker_pool *pool;
    	int node;
    	int target_node = NUMA_NO_NODE;
    
    	lockdep_assert_held(&wq_pool_mutex);
    
    	/* do we already have a matching pool? ,查找是否已经有这个属性的未绑定的cpu的线程池 */
    	hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
    		if (wqattrs_equal(pool->attrs, attrs)) {        //比较属性
    			pool->refcnt++;
    			return pool;        //有的话,直接返回
    		}
    	}
    
    	/* if cpumask is contained inside a NUMA node, we belong to that node */
    	if (wq_numa_enabled) {
    		for_each_node(node) {
    			if (cpumask_subset(attrs->cpumask,
    					   wq_numa_possible_cpumask[node])) {
    				target_node = node;
    				break;
    			}
    		}
    	}
    
    	/* nope, create a new one ,没有测创建一个新的 */
    	pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
    	if (!pool || init_worker_pool(pool) < 0)
    		goto fail;
    
    	lockdep_set_subclass(&pool->lock, 1);	/* see put_pwq() */
    	copy_workqueue_attrs(pool->attrs, attrs);
    	pool->node = target_node;
    
    	/*
    	 * no_numa isn't a worker_pool attribute, always clear it.  See
    	 * 'struct workqueue_attrs' comments for detail.
    	 */
    	pool->attrs->no_numa = false;
    
    	if (worker_pool_assign_id(pool) < 0)
    		goto fail;
    
    	/* create and start the initial worker */
    	if (wq_online && !create_worker(pool))
    		goto fail;
    
    	/* install */
    	hash_add(unbound_pool_hash, &pool->hash_node, hash);        //新的线程池,加入哈希表
    
    	return pool;
    fail:
    	if (pool)
    		put_unbound_pool(pool);
    	return NULL;
    }

     

     

    3、如何创建worker。代码如下:

    
    /**
     * create_worker - create a new workqueue worker
     * @pool: pool the new worker will belong to
     *
     * Create and start a new worker which is attached to @pool.
     *
     * CONTEXT:
     * Might sleep.  Does GFP_KERNEL allocations.
     *
     * Return:
     * Pointer to the newly created worker.
     */
    static struct worker *create_worker(struct worker_pool *pool)
    {
    	struct worker *worker = NULL;
    	int id = -1;
    	char id_buf[16];
    
    	/* ID is needed to determine kthread name */
    	id = ida_simple_get(&pool->worker_ida, 0, 0, GFP_KERNEL);    //当前worker_pool->worker_ida获取一个空闲id
    	if (id < 0)
    		goto fail;
    
    	worker = alloc_worker(pool->node);        //分配一个woker结构体
    	if (!worker)
    		goto fail;
    
    	worker->id = id;                          //递增的id
    
        //worker的名字
    	if (pool->cpu >= 0)
    		snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    			 pool->attrs->nice < 0  ? "H" : "");
    	else
    		snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
    
        //创建内核线程,线程处理函数是worker_thread
    	worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
    					      "kworker/%s", id_buf);
    	if (IS_ERR(worker->task))
    		goto fail;
    
    	set_user_nice(worker->task, pool->attrs->nice);        //设置内核工作线程的优先级相关
    	kthread_bind_mask(worker->task, pool->attrs->cpumask);    
    
    	/* successful, attach the worker to the pool */
    	worker_attach_to_pool(worker, pool);        //建立worker和线程池的关系 
    
    	/* start the newly created worker */
    	spin_lock_irq(&pool->lock);
    	worker->pool->nr_workers++;                //统计当前worker对应worker_pool中工作线程数目
    	worker_enter_idle(worker);                 //让该工作线程进入idle状态
    	wake_up_process(worker->task);             //唤醒刚创建的工作线程
    	spin_unlock_irq(&pool->lock);    
    
    	return worker;
    
    fail:
    	if (id >= 0)
    		ida_simple_remove(&pool->worker_ida, id);
    	kfree(worker);
    	return NULL;
    }
    

    代码不复杂,通过线程池(struct worker_pool)绑定的cpu信息(struct worker_pool的cpu成员)可以知道该pool是per-CPU还是unbound,对于per-CPU线程池,pool->cpu是大于等于0的。对于对于per-CPU线程池,其worker线程的名字是kworker/cpuworker id,如果是high priority的,后面还跟着一个H字符。对于unbound线程池,其worker线程的名字是kworker/u pool idworker id。

     

    	if (pool->cpu >= 0)
    		snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
    			 pool->attrs->nice < 0  ? "H" : "");
    	else
    		snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
    

    可以看到,下面这几个就是pre-cpu的worker

    root         4  0.0  0.0      0     0 ?        S     3月15   0:00 [kworker/0:0]
    root         5  0.0  0.0      0     0 ?        S<    3月15   0:00 [kworker/0:0H]
    root        25  0.0  0.0      0     0 ?        S<    3月15   0:00 [kworker/3:0H]
    root        66  0.0  0.0      0     0 ?        S     3月15   0:07 [kworker/1:1]
    root        67  0.0  0.0      0     0 ?        S     3月15   0:07 [kworker/0:1]

    下面几个是unbound的worker

    root        32  0.0  0.0      0     0 ?        S<    3月15   0:00 [kworker/u17:0]
    root      4600  0.0  0.0      0     0 ?        S    13:38   0:00 [kworker/u16:2]
    root      4605  0.0  0.0      0     0 ?        S    13:43   0:00 [kworker/u16:0]
    root      4612  1.0  0.0      0     0 ?        S    14:09   0:00 [kworker/u16:1]
    

     

    七、work的处理

    先给出函数,下面分析

    
    /**
     * worker_thread - the worker thread function
     * @__worker: self
     *
     * The worker thread function.  All workers belong to a worker_pool -
     * either a per-cpu one or dynamic unbound one.  These workers process all
     * work items regardless of their specific target workqueue.  The only
     * exception is work items which belong to workqueues with a rescuer which
     * will be explained in rescuer_thread().
     *
     * Return: 0
     */
    static int worker_thread(void *__worker)
    {
    	struct worker *worker = __worker;
    	struct worker_pool *pool = worker->pool;
    
    	/* tell the scheduler that this is a workqueue worker */
    	set_pf_worker(true);    //分析1 
    woke_up:
    	spin_lock_irq(&pool->lock);
    
    	/* am I supposed to die?  分析2  */
    	if (unlikely(worker->flags & WORKER_DIE)) {
    		spin_unlock_irq(&pool->lock);
    		WARN_ON_ONCE(!list_empty(&worker->entry));
    		set_pf_worker(false);
    
    		set_task_comm(worker->task, "kworker/dying");
    		ida_simple_remove(&pool->worker_ida, worker->id);
    		worker_detach_from_pool(worker);
    		kfree(worker);
    		return 0;
    	}
    
    	worker_leave_idle(worker);
    recheck:   
    	/* no more worker necessary? 分析3  */
    	if (!need_more_worker(pool))
    		goto sleep;
    
    	/* do we need to manage?  分析4  */
    	if (unlikely(!may_start_working(pool)) && manage_workers(worker))
    		goto recheck;
    
    	/*
    	 * ->scheduled list can only be filled while a worker is
    	 * preparing to process a work or actually processing it.
    	 * Make sure nobody diddled with it while I was sleeping.
    	 */
        //scheduled链表表示工作线程准备处理一个work或者正在执行一个work时才会有work添加到该链表中
    	WARN_ON_ONCE(!list_empty(&worker->scheduled));
    
    	/* 分析 5
    	 * Finish PREP stage.  We're guaranteed to have at least one idle
    	 * worker or that someone else has already assumed the manager
    	 * role.  This is where @worker starts participating in concurrency
    	 * management if applicable and concurrency management is restored
    	 * after being rebound.  See rebind_workers() for details.
    	 */
    	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
    
    	do {
    		struct work_struct *work =
    			list_first_entry(&pool->worklist,
    					 struct work_struct, entry);
    
    		pool->watchdog_ts = jiffies;
    
    		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
    			/* optimization path, not strictly necessary */
    			process_one_work(worker, work);
    			if (unlikely(!list_empty(&worker->scheduled)))
    				process_scheduled_works(worker);
    		} else {
    			move_linked_works(work, &worker->scheduled, NULL);
    			process_scheduled_works(worker);
    		}
    	} while (keep_working(pool));
    
    	worker_set_flags(worker, WORKER_PREP);
    sleep:
    	/*
    	 * pool->lock is held and there's no work to process and no need to
    	 * manage, sleep.  Workers are woken up only while holding
    	 * pool->lock or from local cpu, so setting the current state
    	 * before releasing pool->lock is enough to prevent losing any
    	 * event.
    	 */
    	worker_enter_idle(worker);
    	__set_current_state(TASK_IDLE);
    	spin_unlock_irq(&pool->lock);
    	schedule();
    	goto woke_up;
    }
    

     

    分析1:

    	/* tell the scheduler that this is a workqueue worker */
    	set_pf_worker(true);
    

     

    static void set_pf_worker(bool val)
    {
    	mutex_lock(&wq_pool_attach_mutex);
    	if (val)
    		current->flags |= PF_WQ_WORKER;
    	else
    		current->flags &= ~PF_WQ_WORKER;
    	mutex_unlock(&wq_pool_attach_mutex);
    }
    

    worker线程函数一开始就会通过PF_WQ_WORKER来标注自己

    有了这样一个flag,调度器在调度当前进程sleep的时候可以检查这个准备sleep的进程是否是一个worker线程,如果是的话,那么调度器不能鲁莽的调度到其他的进程,这时候,还需要找到该worker对应的线程池,唤醒一个idle的worker线程。通过workqueue模块和调度器模块的交互,当work A被阻塞后(处理该work的worker线程进入sleep),调度器会唤醒其他的worker线程来处理其他的work B,work C……

     

    分析2:

    woke_up:
    	spin_lock_irq(&pool->lock);
    
    	/* am I supposed to die? */
    	if (unlikely(worker->flags & WORKER_DIE)) {        //-WORKER_DIE表示此工作线程将要被销毁,此时即不能使用
    		spin_unlock_irq(&pool->lock);
    		WARN_ON_ONCE(!list_empty(&worker->entry));
    		set_pf_worker(false);
    
    		set_task_comm(worker->task, "kworker/dying");
    		ida_simple_remove(&pool->worker_ida, worker->id);
    		worker_detach_from_pool(worker);
    		kfree(worker);
    		return 0;
    	}
    
    	worker_leave_idle(worker);                        //清除worker(线程)的空闲状态
    
    
        ......
    
    
    	/*
    	 * pool->lock is held and there's no work to process and no need to
    	 * manage, sleep.  Workers are woken up only while holding
    	 * pool->lock or from local cpu, so setting the current state
    	 * before releasing pool->lock is enough to prevent losing any
    	 * event.
    	 */
    	worker_enter_idle(worker);
    	__set_current_state(TASK_IDLE);
    	spin_unlock_irq(&pool->lock);
    	schedule();          //没任务了,主动放弃资源,下次被唤醒时,从调度函数出来
    	goto woke_up;        //执行这里,说是被唤醒,可能是删除该线程被唤醒,也可能是有任务了被唤醒
    

    因为线程的销毁是异步的。所以标记了要销毁的线程,会不再挂接work。待已经挂在worker上的work处理完毕,就会销毁。所以上面要是标记了销毁标志且worker上面的work确实没有了,就会销毁这个worker。

    当然一般没设置销毁标志的,则会进来之后先清除空闲标志,表示这个线程没处于挂起状态。

     

    分析3:

    recheck:
    	/* no more worker necessary? */
    	if (!need_more_worker(pool))
    		goto sleep;
    
    
        ......
    
    sleep:
    	/*
    	 * pool->lock is held and there's no work to process and no need to
    	 * manage, sleep.  Workers are woken up only while holding
    	 * pool->lock or from local cpu, so setting the current state
    	 * before releasing pool->lock is enough to prevent losing any
    	 * event.
    	 */
    	worker_enter_idle(worker);           //线程进入睡眠
    	__set_current_state(TASK_IDLE);      //设置线程的睡眠标志
    	spin_unlock_irq(&pool->lock);
    	schedule();                          //调度出去,让出cpu资源
    	goto woke_up;
    }
    

    如何判断是否需要创建更多的worker线程呢?原则如下:

    (1)有事情做:挂在worker pool中的work list不能是空的,如果是空的,那么当然sleep就好了

    /*
     * Need to wake up a worker?  Called from anything but currently
     * running workers.
     *
     * Note that, because unbound workers never contribute to nr_running, this
     * function will always return %true for unbound pools as long as the
     * worklist isn't empty.
     */
    static bool need_more_worker(struct worker_pool *pool)
    {
    	return !list_empty(&pool->worklist) && __need_more_worker(pool);
    }
    
    
    

    (2)比较忙:worker pool的nr_running成员表示线程池中当前正在干活(running状态)的worker线程有多少个,当nr_running等于0表示所有的worker线程在处理work的时候阻塞了,这时候,必须要启动新的worker线程来处理worker pool上处于active状态的work链表上的work们。

    /*
     * Policy functions.  These define the policies on how the global worker
     * pools are managed.  Unless noted otherwise, these functions assume that
     * they're being called with pool->lock held.
     */
    
    static bool __need_more_worker(struct worker_pool *pool)
    {
    	return !atomic_read(&pool->nr_running);
    }

     

    分析4

    recheck:
    	/* no more worker necessary? */
    	if (!need_more_worker(pool))
    		goto sleep;
    
    	/* do we need to manage? */
    	if (unlikely(!may_start_working(pool)) && manage_workers(worker))
    		goto recheck;
    
     

    may_start_working()判断pool中是否有idle状态工作线程。如果没有,那么manage_workers()创建一些工作线程

    /* Can I start working?  Called from busy but !running workers. */
    static bool may_start_working(struct worker_pool *pool)
    {
    	return pool->nr_idle;
    }
    

    manage_workers()函数动态管理创建工作线程的函数

    /**
     * manage_workers - manage worker pool
     * @worker: self
     *
     * Assume the manager role and manage the worker pool @worker belongs
     * to.  At any given time, there can be only zero or one manager per
     * pool.  The exclusion is handled automatically by this function.
     *
     * The caller can safely start processing works on false return.  On
     * true return, it's guaranteed that need_to_create_worker() is false
     * and may_start_working() is true.
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock) which may be released and regrabbed
     * multiple times.  Does GFP_KERNEL allocations.
     *
     * Return:
     * %false if the pool doesn't need management and the caller can safely
     * start processing works, %true if management function was performed and
     * the conditions that the caller verified before calling the function may
     * no longer be true.
     */
    static bool manage_workers(struct worker *worker)
    {
    	struct worker_pool *pool = worker->pool;
    
    	if (pool->flags & POOL_MANAGER_ACTIVE)
    		return false;
    
    	pool->flags |= POOL_MANAGER_ACTIVE;        //标记线程池在管理创建线程
    	pool->manager = worker;            
    
    	maybe_create_worker(pool);                 //创建备用线程
    
    	pool->manager = NULL;
    	pool->flags &= ~POOL_MANAGER_ACTIVE;
    	wake_up(&wq_manager_wait);                 //唤醒等待队列中的事件
    	return true;
    }
    

    maybo_create_worker()函数中while首先调用create_worker()来创建新的工作线程。

    
    /**
     * maybe_create_worker - create a new worker if necessary
     * @pool: pool to create a new worker for
     *
     * Create a new worker for @pool if necessary.  @pool is guaranteed to
     * have at least one idle worker on return from this function.  If
     * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
     * sent to all rescuers with works scheduled on @pool to resolve
     * possible allocation deadlock.
     *
     * On return, need_to_create_worker() is guaranteed to be %false and
     * may_start_working() %true.
     *
     * LOCKING:
     * spin_lock_irq(pool->lock) which may be released and regrabbed
     * multiple times.  Does GFP_KERNEL allocations.  Called only from
     * manager.
     */
    static void maybe_create_worker(struct worker_pool *pool)
    __releases(&pool->lock)
    __acquires(&pool->lock)
    {
    restart:
    	spin_unlock_irq(&pool->lock);
    
    	/* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
        // 启动mayday_timer,如果创建worker时间太长就唤醒紧急worker(rescuer)处理work_struct
    	mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
    
    	while (true) {
            //create_worker()创建成功则退出while循环;或者通过need_to_create_worker()判断是否需要继续创建新线程
    		if (create_worker(pool) || !need_to_create_worker(pool))
    			break;
    
    		schedule_timeout_interruptible(CREATE_COOLDOWN);
    
            //再次判断是否需要继续创建新线程,不需要的退出,需要的话继续创建线程
    		if (!need_to_create_worker(pool))
    			break;
    	}
    
    	del_timer_sync(&pool->mayday_timer);    //一定时间内创建成功了线程,则删除掉,本函数前面加入的定时器
    	spin_lock_irq(&pool->lock);
    	/*
    	 * This is necessary even after a new worker was just successfully
    	 * created as @pool->lock was dropped and the new worker might have
    	 * already become busy.
         * 再次判断是否需要继续创建新线程
    	 */
    	if (need_to_create_worker(pool))
    		goto restart;
    }
    

     

    分析 5 worker线程开始处理work

    
    	/*
    	 * Finish PREP stage.  We're guaranteed to have at least one idle
    	 * worker or that someone else has already assumed the manager
    	 * role.  This is where @worker starts participating in concurrency
    	 * management if applicable and concurrency management is restored
    	 * after being rebound.  See rebind_workers() for details.
    	 */
    	worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
    
    	do {
    		struct work_struct *work =                    //获取该线程池上链表上的第一个work
    			list_first_entry(&pool->worklist,
    					 struct work_struct, entry);
    
    		pool->watchdog_ts = jiffies;
    
    		if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
    			/* optimization path, not strictly necessary */
    			process_one_work(worker, work);        //单独处理一个work
    			if (unlikely(!list_empty(&worker->scheduled)))
    				process_scheduled_works(worker);   //处理worker_pool->scheduled链表上的work_struct
    		} else {
                //如果当前work_struct置位WORK_STRUCT_LINKED表示work后面还串上其它work,把这些work迁移到woeker_pool->scheduled中,然后一并再用process_one_work()函数处理
    			move_linked_works(work, &worker->scheduled, NULL);
    			process_scheduled_works(worker);
    		}
    	} while (keep_working(pool));
    
    	worker_set_flags(worker, WORKER_PREP);

    按理说worker线程处理work应该比较简单,从线程池的worklist中取一个work,然后调用process_one_work处理之就OK了,不过现实稍微复杂一些,work和work之间并不是独立的,也就是说,work A和work B可能是linked work,这些linked work应该被一个worker来处理。WORK_STRUCT_LINKED标记了work是属于linked work,如果是linked work,worker并不直接处理,而是将其挂入scheduled work list,然后调用process_scheduled_works来处理。毫无疑问,process_scheduled_works也是调用process_one_work来处理一个一个scheduled work list上的work。

    scheduled work list并非仅仅应用在linked work,在worker处理work的时候,有一个原则要保证:同一个work不能被同一个cpu上的多个worker同时执行。这时候,如果worker发现自己要处理的work正在被另外一个worker线程处理,那么本worker线程将不处理该work,只需要挂入正在执行该work的worker线程的scheduled work list即可。

     

    把这些work迁移到woeker_pool->scheduled中,然后一并再用process_one_work()函数处理

    
    /**
     * move_linked_works - move linked works to a list
     * @work: start of series of works to be scheduled
     * @head: target list to append @work to
     * @nextp: out parameter for nested worklist walking
     *
     * Schedule linked works starting from @work to @head.  Work series to
     * be scheduled starts at @work and includes any consecutive work with
     * WORK_STRUCT_LINKED set in its predecessor.
     *
     * If @nextp is not NULL, it's updated to point to the next work of
     * the last scheduled work.  This allows move_linked_works() to be
     * nested inside outer list_for_each_entry_safe().
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock).
     */
    static void move_linked_works(struct work_struct *work, struct list_head *head,
    			      struct work_struct **nextp)
    {
    	struct work_struct *n;
    
    	/*
    	 * Linked worklist will always end before the end of the list,
    	 * use NULL for list head.
    	 */
    	list_for_each_entry_safe_from(work, n, NULL, entry) {
    		list_move_tail(&work->entry, head);
    		if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
    			break;
    	}
    
    	/*
    	 * If we're already inside safe list traversal and have moved
    	 * multiple works to the scheduled queue, the next position
    	 * needs to be updated.
    	 */
    	if (nextp)
    		*nextp = n;
    }
    
    
    
    
    /**
     * process_scheduled_works - process scheduled works
     * @worker: self
     *
     * Process all scheduled works.  Please note that the scheduled list
     * may change while processing a work, so this function repeatedly
     * fetches a work from the top and executes it.
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock) which may be released and regrabbed
     * multiple times.
     */
    static void process_scheduled_works(struct worker *worker)
    {
    	while (!list_empty(&worker->scheduled)) {
    		struct work_struct *work = list_first_entry(&worker->scheduled,
    						struct work_struct, entry);
    		process_one_work(worker, work);
    	}
    }
    

     

    处理一个work

    
    /**
     * process_one_work - process single work
     * @worker: self
     * @work: work to process
     *
     * Process @work.  This function contains all the logics necessary to
     * process a single work including synchronization against and
     * interaction with other workers on the same cpu, queueing and
     * flushing.  As long as context requirement is met, any worker can
     * call this function to process a work.
     *
     * CONTEXT:
     * spin_lock_irq(pool->lock) which is released and regrabbed.
     */
    static void process_one_work(struct worker *worker, struct work_struct *work)
    __releases(&pool->lock)
    __acquires(&pool->lock)
    {
    	struct pool_workqueue *pwq = get_work_pwq(work);
    	struct worker_pool *pool = worker->pool;
    	bool cpu_intensive = pwq->wq->flags & WQ_CPU_INTENSIVE;    //判断当前的workqueue是否是CPU_INTENSIVE,会对其所在工作线程进行特殊设置
    	int work_color;
    	struct worker *collision;
    #ifdef CONFIG_LOCKDEP
    	/*
    	 * It is permissible to free the struct work_struct from
    	 * inside the function that is called from it, this we need to
    	 * take into account for lockdep too.  To avoid bogus "held
    	 * lock freed" warnings as well as problems when looking into
    	 * work->lockdep_map, make a copy and use that here.
    	 */
    	struct lockdep_map lockdep_map;
    
    	lockdep_copy_map(&lockdep_map, &work->lockdep_map);
    #endif
    	/* ensure we're on the correct CPU */
    	WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
    		     raw_smp_processor_id() != pool->cpu);
    
    	/*
    	 * A single work shouldn't be executed concurrently by
    	 * multiple workers on a single cpu.  Check whether anyone is
    	 * already processing the work.  If so, defer the work to the
    	 * currently executing one.
    	 */
        //-查询当前work是否在worker_pool->busy_hash表中正在运行,如果在就移到当前work正在执行的worker->scheduled并退出当前处理
    	collision = find_worker_executing_work(pool, work);
    	if (unlikely(collision)) {
    		move_linked_works(work, &collision->scheduled, NULL);
    		return;
    	}
    
    	/* claim and dequeue, */
    	debug_work_deactivate(work);
    	hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);   //将 worker 加入 busy 队列 pool->busy_hash 
    	worker->current_work = work;            //把这个work,放到这个worker的当前运行上
    	worker->current_func = work->func;
    	worker->current_pwq = pwq;
    	work_color = get_work_color(work);
    
    	/*
    	 * Record wq name for cmdline and debug reporting, may get
    	 * overridden through set_worker_desc().
    	 */
    	strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN);
    
    	list_del_init(&work->entry);            //每次执行work,都会从所在链表上删除自己
    
    	/*
    	 * CPU intensive works don't participate in concurrency management.
    	 * They're the scheduler's responsibility.  This takes @worker out
    	 * of concurrency management and the next code block will chain
    	 * execution of the pending work items.
    	 */
    	if (unlikely(cpu_intensive))        //设置cpu密集型标志
    		worker_set_flags(worker, WORKER_CPU_INTENSIVE);
    
    	/*
    	 * Wake up another worker if necessary.  The condition is always
    	 * false for normal per-cpu workers since nr_running would always
    	 * be >= 1 at this point.  This is used to chain execution of the
    	 * pending work items for WORKER_NOT_RUNNING workers such as the
    	 * UNBOUND and CPU_INTENSIVE ones.
    	 */
            //判断是否需要唤醒更多工作线程,wake_up_worker()去唤醒worker_pool中第一个idle线程。对于bound型worker_pool此时一般nr_running>=1,所以条件不成立
    	if (need_more_worker(pool))        
    		wake_up_worker(pool);
    
    	/*
    	 * Record the last pool and clear PENDING which should be the last
    	 * update to @work.  Also, do this inside @pool->lock so that
    	 * PENDING and queued state changes happen together while IRQ is
    	 * disabled.
    	 */
        //清除struct worker中data成员pending标志位,里面使用了smp_wmb保证了pending之前的写操作完成之后才清除pending
    	set_work_pool_and_clear_pending(work, pool->id);    
    
    	spin_unlock_irq(&pool->lock);
    
    	lock_map_acquire(&pwq->wq->lockdep_map);
    	lock_map_acquire(&lockdep_map);
    	/*
    	 * Strictly speaking we should mark the invariant state without holding
    	 * any locks, that is, before these two lock_map_acquire()'s.
    	 *
    	 * However, that would result in:
    	 *
    	 *   A(W1)
    	 *   WFC(C)
    	 *		A(W1)
    	 *		C(C)
    	 *
    	 * Which would create W1->C->W1 dependencies, even though there is no
    	 * actual deadlock possible. There are two solutions, using a
    	 * read-recursive acquire on the work(queue) 'locks', but this will then
    	 * hit the lockdep limitation on recursive locks, or simply discard
    	 * these locks.
    	 *
    	 * AFAICT there is no possible deadlock scenario between the
    	 * flush_work() and complete() primitives (except for single-threaded
    	 * workqueues), so hiding them isn't a problem.
    	 */
    	lockdep_invariant_state(true);
    	trace_workqueue_execute_start(work);
    	worker->current_func(work);                //真正执行work的回调函数
    	/*
    	 * While we must be careful to not use "work" after this, the trace
    	 * point will only record its address.
    	 */
    	trace_workqueue_execute_end(work);
    	lock_map_release(&lockdep_map);
    	lock_map_release(&pwq->wq->lockdep_map);
    
    	if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
    		pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
    		       "     last function: %pf\n",
    		       current->comm, preempt_count(), task_pid_nr(current),
    		       worker->current_func);
    		debug_show_held_locks(current);
    		dump_stack();
    	}
    
    	/*
    	 * The following prevents a kworker from hogging CPU on !PREEMPT
    	 * kernels, where a requeueing work item waiting for something to
    	 * happen could deadlock with stop_machine as such work item could
    	 * indefinitely requeue itself while all other CPUs are trapped in
    	 * stop_machine. At the same time, report a quiescent RCU state so
    	 * the same condition doesn't freeze RCU.
    	 */
    	cond_resched();
    
    	spin_lock_irq(&pool->lock);
    
    	/* clear cpu intensive status */
    	if (unlikely(cpu_intensive))
    		worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
    
    	/* we're done with it, release */
    	hash_del(&worker->hentry);               //-work回调函数执行完成后的清理工作
    	worker->current_work = NULL;
    	worker->current_func = NULL;
    	worker->current_pwq = NULL;
    	pwq_dec_nr_in_flight(pwq, work_color);
    }
    

     

     

     

     

    展开全文
  • 报表订阅在一段日期以后突然不工作了。 原因是用户的账号是跟AD账号捆绑的,而公司每两个月都强制需要更新一...而cognos schedule依赖的凭证并不会一起更新。 解决方法是,到用户首选项当中renew the credentials. ...

    报表订阅在一段日期以后突然不工作了。

    原因是用户的账号是跟AD账号捆绑的,而公司每两个月都强制需要更新一次AD账户的密码。而cognos schedule依赖的凭证并不会一起更新。

    解决方法是,到用户首选项当中renew the credentials.



    一劳永逸的解决方法是,新建一个公共sender账号,专门用于订阅报表。并把它的密码设置为永不过期。


    展开全文
  • spring boot之@Scheduled原理

    千次阅读 2019-02-20 20:27:29
    spring boot之@Scheduled原理前沿源码分析 前沿 当一个方法被加上@Schedule注解,然后做一些相关配置,在Spring容器启动之后,这个方法就会按照@Schedule注解的配置周期性或者延迟执行。Spring是如何办到这个的,...
  • * @param scheduled the @Scheduled annotation * @param method the method that the annotation has been declared on * @param bean the target bean instance * @see #createRunnable(Object, Method) */ ...
  • 今天让我们来看下另一个中断下半部常用的机制----work queue 一、特性 1、先激活,在某个时间点再执行函数 2、有专用的内核线程work thread 二、与可延迟函数区别 1、可延迟函数处于中断上下文,不能调用导致...
  • @Scheduled 这个注解确实给我们带了很大的方便,我们只要加上该注解,并且根据需求设置好就可以使用定时任务了。 但是,我们需要注意的是,@Scheduled 并不一定一定会按时执行。 因为使用@Scheduled 的定时任务...
  • @RefreshScope 和 @Scheduled

    2021-06-08 10:08:45
    To make it work, spring creates * lazy proxies and push them instead of real object. The issue with * scope refresh is that right after refresh in order for such a lazy proxy * to be actually ...
  • 首先来看下EnableScheduling的javadoc: @EnableScheduling启用了Spring的任务调度功能,这跟在xml中配置task:* 是一样的,它...下面的代码可以在容器中的bean上查找到@Scheduled注解,比如: package com.myco.task
  • 参考一: ... 使用spring3.1 以注解的形式配置定时任务 ...当我想把定时任务的时间...https://stackoverflow.com/questions/18079289/workaround-for-cronsequencegenerator-last-day-of-month?noredirect=1&lq= 1
  • SpringBoot使用定时器非常简单,首先定义一个需要定时执行的类,接着定义一个需要执行的方法,并在方法上添加@Scheduled注解 @Component public class ScheduleTest { @Scheduled(cron = "*/4 * * * * *") ...
  • @[TOC]Scheduled Sampling for Transformers Abstract Scheduled sampling本是用来解决序列到序列中的exposure bias问题,即RNN模型在训练时使用的是真实值(teacher forcing),而在测试时使用的是预测值,这两者的...
  • queue_delayed_work和queue_work区别

    千次阅读 2018-05-15 17:06:26
    queue_delayed_work和queue_work一、参考文献:1)http://www.linuxidc.com/Linux/2011-08/41655.htmqueue_delayed_work的使用过程如下:--&gt; 定义workqueue: struct workqueue_struct *test_workqueue; ...
  • http://blog.csdn.net/bingqingsuimeng/article/details/7891157?readlog 这两个宏都定义于include/linux/workqueue.h中:  79 #define INIT_WORK(_work, _func) /  
  • 文章源自于:http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=3641752 int schedule_work(struct work_struct *work) {  return queue_work(keventd_wq, work); } /*  * schedule_work_on - put wo
  • work接口

    2021-04-19 00:25:56
    cancel_work_sync - cancel a work and wait for it to finish @work: the work to cancel Cancel @work and wait for its execution to finish. This function can be used even if the work re-queues itself or ...
  • 流利说 Level 4 全文

    万次阅读 多人点赞 2019-05-22 10:52:40
    –He has to work. -Why is Christina’s husband saving money? –He wants to start a new company. This is the second time Christina has been to San Francisco. The first time was when she was a high ...
  • more_work上的等待 有work需要处理 * /  finish_wait ( & cwq - > more_work ,   & wait ) ;     / * 下面空,因为没有定义电源管理 * /  try_to_freeze ( ) ;     if   ( kthread_should...
  • 项目开发中经常需要执行一些定时任务,比如需要在每天凌晨...SpringBoot中使用两个注解:@EnableScheduling、@Scheduled来简单实现定时任务。 【1】@Scheduled注解 按照惯例,先看源码: /** * ...
  • Scheduled和quartz的简单比较

    千次阅读 2017-08-10 12:02:48
    然后加上这个配置,让Spring识别@Scheduled注解(org.springframework.scheduling.annotation.Scheduled)。 如果还想扩展一下,改成下面这样: < task :executor id= "executor" pool-size= "5" /> < task :...
  •  88 INIT_WORK(&(_work)->work, (_func)); /  89 init_timer(&(_work)->timer); /  90 } while (0) 有时候特怀念谭浩强那本书里的那些例子程序 , 因为那些程序都特简单 , 不像现在看到的...
  • Spring提供的@Scheduled注解的定时任务并不支持任务的动态开启与关闭,以及排期的动态修改,本案例通过重写Spring的定时任务注解@Scheduled 来使其可以支持应用服务不停止的情况下在线动态修改任务的排期以及启停...
  •  88 INIT_WORK(&(_work)->work, (_func)); /  89 init_timer(&(_work)->timer); /  90 } while (0) 有时候特怀念谭浩强那本书里的那些例子程序 , 因为那些程序都特简单 , 不像现在看到的这些 , 动不动...
  • deferred work

    2020-02-02 15:40:27
    总共有两种类型的工作:structure work_struct和struct delayed_work struct delayed_work { struct work_struct work ; struct timer_list timer ; /* target workqueue and CPU ->timer uses to ...
  • // giving other ContextRefreshedEvent listeners a chance to perform // their work at the same time (e.g. Spring Batch's job registration). finishRegistration(); } } private void finishRegistration() ...
  • Springboot系列-定时任务@Scheduled 前言:在平常项目的开发中,很少去实现定时任务,也就是说很少接触到@Scheduled这个注解,在之前的Spring(MVC)开发中实现定时任务一般使用@Scheduled这个注解或者第三方框架 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,133
精华内容 4,453
关键字:

scheduledwork