-
2019-12-13 14:25:45
软硬件协同设计是未来发展的主流,软硬件的界限越来越模糊,软硬件的设计思想是相通的,实现方法是各异的,实现的结果上当然也存在较大差别,因此,很有必要做好软硬件的协同设计。
什么是workqueue?Linux中的Workqueue机制就是为了简化内核线程的创建。通过调用workqueue的接口就能创建内核线程。并且可以根据当前系统CPU的个数创建线程的数量,使得线程处理的事务能够并行化。workqueue是内核中实现简单而有效的机制,他显然简化了内核daemon的创建,方便了用户的编程, Workqueue机制的实现Workqueue机制中定义了两个重要的数据结构,分析如下:
1、cpu_workqueue_struct结构。
该结构将CPU和内核线程进行了绑定。在创建workqueue的过程中,Linux根据当前系统CPU的个数创建cpu_workqueue_struct。在该结构主要维护了一个任务队列,以及内核线程需要睡眠的等待队列,另外还维护了一个任务上下文,即task_struct
2、work_struct结构是对任务的抽象。
在该结构中需要维护具体的任务方法,需要处理的数据,以及任务处理的时间。该结构定义如下:
struct work_struct {
unsigned long pending; struct list_head entry; void (*func)(void *); void *data; void *wq_data; strut timer_list timer; }; 当用户调用workqueue的初始化接口create_workqueue或者create_singlethread_workqueue对workqueue队列进行初始化时,内核就开始为用户分配一个workqueue对象,并且将其链到一个全局的workqueue队列中。然后Linux根据当前CPU的情况,为workqueue对象分配与CPU个数相同的cpu_workqueue_struct对象,每个cpu_workqueue_struct对象都会存在一条任务队列。紧接着,Linux为每个cpu_workqueue_struct对象分配一个内核thread,即内核daemon去处理每个队列中的任务。至此,用户调用初始化接口将workqueue初始化完毕,返回workqueue的指针。 在初始化workqueue过程中,内核需要初始化内核线程,注册的内核线程工作比较简单,就是不断的扫描对应cpu_workqueue_struct中的任务队列,从中获取一个有效任务,然后执行该任务。所以如果任务队列为空,那么内核daemon就在cpu_workqueue_struct中的等待队列上睡眠,直到有人唤醒daemon去处理任务队列。 Workqueue初始化完毕之后,将任务运行的上下文环境构建起来了,但是具体还没有可执行的任务,所以,需要定义具体的work_struct对象。然后将work_struct加入到任务队列中,Linux会唤醒daemon去处理任务。 上述描述的workqueue内核实现原理可以描述如下: 在Workqueue机制中,提供了一个系统默认的workqueue队列——keventd_wq,这个队列是Linux系统在初始化的时候就创建的。用户可以直接初始化一个work_struct对象,然后在该队列中进行调度,使用更加方便。
Workqueue编程接口序号接口函数说明:
1、 create_workqueue 用于创建一个workqueue队列,为系统中的每个CPU都创建一个内核线程。
输入参数:@name:workqueue的名称
2、 create_singlethread_workqueue 用于创建workqueue,只创建一个内核线程。输入参数:@name:workqueue名称
区别:使用create_singlethread_workqueue创建工作队列即使对于多CPU系统,内核也只负责在一个cpu上创建一个worker_thread内核线程;而使用create_workqueue创建工作队列对于多CPU系统,内核将会在每个CPU上创建一个worker_thread内核线程,使得线程处理的事务能够并行化.
3、 destroy_workqueue 释放workqueue队列。输入参数:@ workqueue_struct:需要释放的workqueue队列指针
4、 schedule_work 调度执行一个具体的任务,执行的任务将会被挂入Linux系统提供的workqueue——keventd_wq输入参数:@ work_struct:具体任务对象指针
5、 schedule_delayed_work 延迟一定时间去执行一个具体的任务,功能与schedule_work类似,多了一个延迟时间,输入参数:@work_struct:具体任务对象指针@delay:延迟时间
6、 queue_work 调度执行一个指定workqueue中的任务。输入参数:@ workqueue_struct:指定的workqueue指针@work_struct:具体任务对象指针
7、 queue_delayed_work 延迟调度执行一个指定workqueue中的任务,功能与queue_work类似,输入参数多了一个delay。
更多相关内容 -
create_singlethread_workqueue与create_workqueue的区别
2021-12-01 16:13:38使用create_singlethread_workqueue创建工作队列即使对于多CPU系统,内核也只负责在一个cpu上创建一个worker_thread内核线程;而使用create_workqueue创建工作队列对于多CPU系统,内核将会在每个CPU上创建一个worker...系统版本:linux3.4
使用create_singlethread_workqueue创建工作队列即使对于多CPU系统,内核也只负责在一个cpu上创建一个worker_thread内核线程;而使用create_workqueue创建工作队列对于多CPU系统,内核将会在每个CPU上创建一个worker_thread内核线程,使得线程处理的事务能够并行化.
用代码解释前先说明一个知识点:
printk任何时候,任何地方都能调用它,可以在中断上下文和进程上下文中被调用,可以在任何持有锁时被调用;可以在多处理器上同时被调用,而且调用者连锁都不必使用=========================================================================
创建workqueue
#define alloc_workqueue(fmt, flags, max_active, args...) \ __alloc_workqueue_key((fmt), (flags), (max_active), \ NULL, NULL, ##args) #define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \ __WQ_ORDERED_EXPLICIT | (flags), 1, ##args) / #define create_workqueue(name) \ alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name)) #define create_singlethread_workqueue(name) \ alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name) #define create_freezable_workqueue(name) \ alloc_workqueue("%s", __WQ_LEGACY | WQ_FREEZABLE | WQ_UNBOUND | \ WQ_MEM_RECLAIM, 1, (name))
本质上都是调用了 alloc_workqueue和alloc_ordered_workqueue
alloc_ordered_workqueue比前者多了 WQ_UNBOUND | __WQ_ORDERED | __WQ_ORDERED_EXPLICIT 三个 flag。
WQ_UNBOUND:不绑定任何cpu,一般工作线程并发级别波动很大或者cpu负担很重时使用
__WQ_ORDERED:工作队列顺序执行
__WQ_ORDERED_EXPLICIT:alloc_ordered_workqueue使用
其他flag:
__WQ_LEGACY:create*_workqueue()使用
WQ_MEM_RECLAIM:任何可能在内存回收路径上使用的工作队列都必须设置它,保证不管内存压力多大都能执行
WQ_FREEZABLE:系统暂停时,工作队列进入冻结状态并清除队列中的工作项
max_active:每个工作队列同时最多能有几个工作项在同一cpu上运行,对于绑定的工作队列,最大值为512,默认情况下传值0,此时为256;对于未绑定工作队列,最大值大于512
=========================================================================例程1:使用create_workqueue创建工作队列
#include <linux/module.h> #include <linux/init.h> #include <linux/string.h> #include <linux/list.h> #include <linux/sysfs.h> #include <linux/ctype.h> #include <linux/workqueue.h> #include <linux/delay.h> //工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct, static struct workqueue_struct *test_wq = NULL; //把推后执行的任务叫做工作(work),描述它的数据结构为work_struct static struct work_struct work; /* *定义工作队列调用函数 */ void work_func(struct work_struct *work){ while(1){ printk(KERN_ERR "-----%s-----\n",__func__); //printk可以在多处理器上同时被调用 } } static int __init test_init(void){ /*创建工作队列workqueue_struct,该函数会为cpu创建内核线程*/ test_wq = create_workqueue("test_wq"); /*初始化工作work_struct,指定工作函数*/ INIT_WORK(&work,work_func); /*将工作加入到工作队列中,最终唤醒内核线程*/ queue_work(test_wq, &work); while(1){ mdelay(1000); printk(KERN_ERR "-----%s-----\n",__func__); } return 0; } static void __exit test_exit(void){ } module_init(test_init); module_exit(test_exit); MODULE_LICENSE("GPL"); MODULE_AUTHOR("xxx@outlook.com");
运行结果:没有打印任何信息,系统直接卡死,卡死原因是因为所有的cpu都被printk占用,系统无法调用其他的进程
=========================================================================
例程2:使用create_singlethread_workqueue创建工作队列
#include <linux/module.h> #include <linux/init.h> #include <linux/string.h> #include <linux/list.h> #include <linux/sysfs.h> #include <linux/ctype.h> #include <linux/workqueue.h> #include <linux/delay.h> //工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct, static struct workqueue_struct *test_wq = NULL; //把推后执行的任务叫做工作(work),描述它的数据结构为work_struct static struct work_struct work; /* *定义工作队列调用函数 */ void work_func(struct work_struct *work){ while(1){ printk(KERN_ERR "-----%s-----\n",__func__); //printk可以在多处理器上同时被调用 } } static int __init test_init(void){ /*创建工作队列workqueue_struct,该函数会为cpu创建内核线程*/ test_wq = create_singlethread_workqueue("test_wq"); /*初始化工作work_struct,指定工作函数*/ INIT_WORK(&work,work_func); /*将工作加入到工作队列中,最终唤醒内核线程*/ queue_work(test_wq, &work); while(1){ mdelay(1000); printk(KERN_ERR "-----%s-----\n",__func__); } return 0; } static void __exit test_exit(void){ } module_init(test_init); module_exit(test_exit); MODULE_LICENSE("GPL"); MODULE_AUTHOR("xxx@outlook.com");
运行结果:
<3>[ 124.050211] -----work_func----- //work_func有打印
<3>[ 124.244364] -----work_func-----
<3>[ 124.341765] -----work_func-----
<3>[ 124.537000] -----work_func-----
<3>[ 124.630770] -----work_func-----
<3>[ 124.801644] -----test_init----- //test_init有打印
<3>[ 124.825084] -----work_func-----
…
…
…
一直打印log=========================================================================
总结:
使用create_workqueue创建的工作队列在工作执行函数work_func中循环调用printk会导致系统卡死,是因为create_workqueue创建工作队列时在每个cpu上都创建了worker_thread内核线程,worker_thread线程处理的事务能够并行化,导致所有的cpu都被printk函数所占用,系统无法调用其他的进程,所以系统出现卡死并且无任何log信息打印
而使用create_singlethread_workqueue创建的工作队列只在一个cpu上创建worker_thread内核线程,只会占用1个cpu,即使该cpu一直被printk占用也还有其他的cpu可以继续调用其他的进程,所以能够一直打印log -
设备树学习(二十七、番外篇-中断子系统之workqueue创建[3])
2019-03-13 23:58:22一、前言 前一节我们知道了可以用下面...#define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \ __WQ_ORDERED_EXPLICIT | (flags), 1, ##args) #de...一、前言
前一节我们知道了可以用下面几个函数来创建workqueue。
#define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \ __WQ_ORDERED_EXPLICIT | (flags), 1, ##args) #define create_workqueue(name) \ alloc_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, 1, (name)) #define create_freezable_workqueue(name) \ alloc_workqueue("%s", __WQ_LEGACY | WQ_FREEZABLE | WQ_UNBOUND | \ WQ_MEM_RECLAIM, 1, (name)) #define create_singlethread_workqueue(name) \ alloc_ordered_workqueue("%s", __WQ_LEGACY | WQ_MEM_RECLAIM, name)
见得分析上面函数,可以看到调用都是alloc_workqueue,它的代码如下:
#define alloc_workqueue(fmt, flags, max_active, args...) \ __alloc_workqueue_key((fmt), (flags), (max_active), \ NULL, NULL, ##args)
可以看到根本的调用都是__alloc_workqueue_key来实现的
本文主要以__alloc_workqueue_key函数为主线,描述CMWQ中的创建一个workqueue实例的代码过程。
二、工作、工作队列、工作线程池、工作线程数据结构
workqueue机制最小的调度单元是work_struct ,即工作任务
struct work_struct { atomic_long_t data; //低比特位部分是work的标志位,剩余比特位通常用于存放上一次运行的worker_pool ID或pool_workqueue的指针。存放的内容由WORK_STRUCT_PWQ标志位来决定 struct list_head entry; //用于把work挂到工作队列上 work_func_t func; //作任务的处理函数 #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif };
工作队列由struct workqueue_struct数据结构描述:
/* * The externally visible workqueue. It relays the issued work items to * the appropriate worker_pool through its pool_workqueues. */ struct workqueue_struct { struct list_head pwqs; /* WR: all pwqs of this wq 该workqueue所在的所有pool_workqueue链表 */ struct list_head list; /* PR: list of all workqueues 系统所有workqueue_struct的全局链表*/ struct mutex mutex; /* protects this wq */ int work_color; /* WQ: current work color */ int flush_color; /* WQ: current flush color */ atomic_t nr_pwqs_to_flush; /* flush in progress */ struct wq_flusher *first_flusher; /* WQ: first flusher */ struct list_head flusher_queue; /* WQ: flush waiters */ struct list_head flusher_overflow; /* WQ: flush overflow list */ struct list_head maydays; /* MD: pwqs requesting rescue 所有rescue状态下的pool_workqueue数据结构链表 */ struct worker *rescuer; /* I: rescue workerrescue内核线程,内存紧张时创建新的工作线程可能会失败,如果创建workqueue是设置了WQ_MEM_RECLAIM,那么rescuer线程会接管这种情况。 */ int nr_drainers; /* WQ: drain in progress */ int saved_max_active; /* WQ: saved pwq max_active */ struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs UNBOUND类型属性 */ struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs unbound类型的pool_workqueue */ #ifdef CONFIG_SYSFS struct wq_device *wq_dev; /* I: for sysfs interface */ #endif #ifdef CONFIG_LOCKDEP struct lockdep_map lockdep_map; #endif char name[WQ_NAME_LEN]; /* I: workqueue name 该workqueue的名字 */ /* * Destruction of workqueue_struct is sched-RCU protected to allow * walking the workqueues list without grabbing wq_pool_mutex. * This is used to dump all workqueues from sysrq. */ struct rcu_head rcu; /* hot fields used during command issue, aligned to cacheline */ unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags 经常被不同CUP访问,因此要和cache line对齐 */ struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs 指向per cpu的pool workqueue */ struct pool_workqueue __rcu *numa_pwq_tbl[]; /* PWR: unbound pwqs indexed by node 指向per node的pool workqueue */ };
运行work_struct的内核线程被称为worker,即工作线程。
/* * The poor guys doing the actual heavy lifting. All on-duty workers are * either serving the manager role, on idle list or on busy hash. For * details on the locking annotation (L, I, X...), refer to workqueue.c. * * Only to be used in workqueue and async. */ struct worker { /* on idle list while idle, on busy hash table while busy */ union { struct list_head entry; /* L: while idle */ struct hlist_node hentry; /* L: while busy */ }; struct work_struct *current_work; /* L: work being processed 当前正在处理的work */ work_func_t current_func; /* L: current_work's fn 当前正在执行的work回调函数 */ struct pool_workqueue *current_pwq; /* L: current_work's pwq 当前work所属的pool_workqueue */ struct list_head scheduled; /* L: scheduled works 所有被调度并正准备执行的work_struct都挂入该链表中 */ /* 64 bytes boundary on 64bit, 32 on 32bit */ struct task_struct *task; /* I: worker task 该工作线程的task_struct数据结构 */ struct worker_pool *pool; /* A: the associated pool 该工作线程所属的worker_pool */ /* L: for rescuers */ struct list_head node; /* A: anchored at pool->workers 可以把该worker挂入到worker_pool->workers链表中 */ /* A: runs through worker->node */ unsigned long last_active; /* L: last active timestamp */ unsigned int flags; /* X: flags */ int id; /* I: worker id */ /* * Opaque string set with work_set_desc(). Printed out with task * dump for debugging - WARN, BUG, panic or sysrq. */ char desc[WORKER_DESC_LEN]; /* used only by rescuers to point to the target workqueue */ struct workqueue_struct *rescue_wq; /* I: the workqueue to rescue */ };
CMWQ提出了工作线程池的概念,struct worker_pool数据结构用于描述工作线程池。
worker_pool是per-cpu变量,每个CPU都有worker_pool,而且有两个worker_pool。
一个用于普通优先级工作线程,另一个用于高优先级工作线程。
/* * Structure fields follow one of the following exclusion rules. * * I: Modifiable by initialization/destruction paths and read-only for * everyone else. * * P: Preemption protected. Disabling preemption is enough and should * only be modified and accessed from the local cpu. * * L: pool->lock protected. Access with pool->lock held. * * X: During normal operation, modification requires pool->lock and should * be done only from local cpu. Either disabling preemption on local * cpu or grabbing pool->lock is enough for read access. If * POOL_DISASSOCIATED is set, it's identical to L. * * A: wq_pool_attach_mutex protected. * * PL: wq_pool_mutex protected. * * PR: wq_pool_mutex protected for writes. Sched-RCU protected for reads. * * PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads. * * PWR: wq_pool_mutex and wq->mutex protected for writes. Either or * sched-RCU for reads. * * WQ: wq->mutex protected. * * WR: wq->mutex protected for writes. Sched-RCU protected for reads. * * MD: wq_mayday_lock protected. */ /* struct worker is defined in workqueue_internal.h */ struct worker_pool { spinlock_t lock; /* the pool lock 用于保护worker_pool的自旋锁 */ int cpu; /* I: the associated cpu对于unbound类型为-1;对于bound类型workqueue表示绑定的CPU ID */ int node; /* I: the associated node ID */ int id; /* I: pool ID 该worker_pool的ID号 */ unsigned int flags; /* X: flags */ unsigned long watchdog_ts; /* L: watchdog timestamp */ struct list_head worklist; /* L: list of pending works 挂入pending状态的work_struct */ int nr_workers; /* L: total number of workers 工作线程的数量 */ int nr_idle; /* L: currently idle workers 处于idle状态的工作线程的数量 */ struct list_head idle_list; /* X: list of idle workers 处于idle状态的工作线程链表 */ struct timer_list idle_timer; /* L: worker idle timeout */ struct timer_list mayday_timer; /* L: SOS timer for workers */ /* a workers is either on busy_hash or idle_list, or the manager */ DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER); /* L: hash of busy workers */ struct worker *manager; /* L: purely informational */ struct list_head workers; /* A: attached workers 该worker_pool管理的工作线程链表 */ struct completion *detach_completion; /* all workers detached */ struct ida worker_ida; /* worker IDs for task name */ struct workqueue_attrs *attrs; /* I: worker attributes 工作线程属性 */ struct hlist_node hash_node; /* PL: unbound_pool_hash node */ int refcnt; /* PL: refcnt for unbound pools */ /* * The current concurrency level. As it's likely to be accessed * from other CPUs during try_to_wake_up(), put it in a separate * cacheline. * 用于管理worker的创建和销毁的统计计数,表示运行中的worker数量。该变量可能被多CPU同时访问,因此独占一个缓存行 */ atomic_t nr_running ____cacheline_aligned_in_smp; /* * Destruction of pool is sched-RCU protected to allow dereferences * from get_work_pool(). */ struct rcu_head rcu; } ____cacheline_aligned_in_smp;
struct pool_workqueue用于链接workqueue和worker_pool。
/* * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS * of work_struct->data are used for flags and the remaining high bits * point to the pwq; thus, pwqs need to be aligned at two's power of the * number of flag bits. */ struct pool_workqueue { struct worker_pool *pool; /* I: the associated pool 指向worker_pool结构 */ struct workqueue_struct *wq; /* I: the owning workqueue 指向workqueue_struct结构 */ int work_color; /* L: current color */ int flush_color; /* L: flushing color */ int refcnt; /* L: reference count */ int nr_in_flight[WORK_NR_COLORS]; /* L: nr of in_flight works */ int nr_active; /* L: nr of active works 活跃的work_strcut数量 */ int max_active; /* L: max active works 最大活跃work_struct数量 */ struct list_head delayed_works; /* L: delayed works 延迟执行work_struct链表 */ struct list_head pwqs_node; /* WR: node on wq->pwqs */ struct list_head mayday_node; /* MD: node on wq->maydays */ /* * Release of unbound pwq is punted to system_wq. See put_pwq() * and pwq_unbound_release_workfn() for details. pool_workqueue * itself is also sched-RCU protected so that the first pwq can be * determined without grabbing wq->mutex. */ struct work_struct unbound_release_work; struct rcu_head rcu; } __aligned(1 << WORK_STRUCT_FLAG_BITS);
三、代码
首先列出这个函数的代码
struct workqueue_struct *__alloc_workqueue_key(const char *fmt, unsigned int flags, int max_active, struct lock_class_key *key, const char *lock_name, ...) { size_t tbl_size = 0; va_list args; struct workqueue_struct *wq; struct pool_workqueue *pwq; /* * Unbound && max_active == 1 used to imply ordered, which is no * longer the case on NUMA machines due to per-node pools. While * alloc_ordered_workqueue() is the right way to create an ordered * workqueue, keep the previous behavior to avoid subtle breakages * on NUMA. */ if ((flags & WQ_UNBOUND) && max_active == 1) //见下面分析 1 flags |= __WQ_ORDERED; /* see the comment above the definition of WQ_POWER_EFFICIENT */ if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) //见下面分析2 flags |= WQ_UNBOUND; /* allocate wq and format name */ if (flags & WQ_UNBOUND) //见下面分析3 tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]); //计算numa_pwq_tbl要占用的大小 wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL); //分配workqueue_struct if (!wq) return NULL; if (flags & WQ_UNBOUND) { wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL); //unbound类型的wq要有sttribute if (!wq->unbound_attrs) goto err_free_wq; } //分析4 va_start(args, lock_name); vsnprintf(wq->name, sizeof(wq->name), fmt, args); va_end(args); max_active = max_active ?: WQ_DFL_ACTIVE; max_active = wq_clamp_max_active(max_active, flags, wq->name); /* init wq */ wq->flags = flags; wq->saved_max_active = max_active; mutex_init(&wq->mutex); atomic_set(&wq->nr_pwqs_to_flush, 0); INIT_LIST_HEAD(&wq->pwqs); INIT_LIST_HEAD(&wq->flusher_queue); INIT_LIST_HEAD(&wq->flusher_overflow); INIT_LIST_HEAD(&wq->maydays); lockdep_init_map(&wq->lockdep_map, lock_name, key, 0); INIT_LIST_HEAD(&wq->list); //分析 5 if (alloc_and_link_pwqs(wq) < 0) goto err_free_wq; if (wq_online && init_rescuer(wq) < 0) goto err_destroy; if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq)) goto err_destroy; /* * wq_pool_mutex protects global freeze state and workqueues list. * Grab it, adjust max_active and add the new @wq to workqueues * list. */ mutex_lock(&wq_pool_mutex); mutex_lock(&wq->mutex); for_each_pwq(pwq, wq) pwq_adjust_max_active(pwq); mutex_unlock(&wq->mutex); list_add_tail_rcu(&wq->list, &workqueues); mutex_unlock(&wq_pool_mutex); return wq; err_free_wq: free_workqueue_attrs(wq->unbound_attrs); kfree(wq); return NULL; err_destroy: destroy_workqueue(wq); return NULL; }
分析1:
if ((flags & WQ_UNBOUND) && max_active == 1) flags |= __WQ_ORDERED;
对于最大worker为1且没绑定具体cpu的workqueue,系统也是默认整个workqueue是有序执行的。
虽然正确的使用有序工作队列应该使用下面的这个宏,但不能保证那个奇葩自己直接调用__alloc_workqueue_key函数,所以还是要在开始再判断一次。还有一种就是非统一内存访问的cpu也要强制加上这个标志,保证统一性。
#define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | \ __WQ_ORDERED_EXPLICIT | (flags), 1, ##args)
分析2:
/* see the comment above the definition of WQ_POWER_EFFICIENT */ if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient) flags |= WQ_UNBOUND;
在kernel中,有两种线程池,一种是线程池是per cpu的,也就是说,系统中有多少个cpu,就会创建多少个线程池,cpu x上的线程池创建的worker线程也只会运行在cpu x上。另外一种是unbound thread pool,该线程池创建的worker线程可以调度到任意的cpu上去。由于cache locality的原因,per cpu的线程池的性能会好一些,但是对power saving有一些影响。设计往往如此,workqueue需要在performance和power saving之间平衡,想要更好的性能,那么最好让一个cpu上的worker thread来处理work,这样的话,cache命中率会比较高,性能会更好。但是,从电源管理的角度来看,最好的策略是让idle状态的cpu尽可能的保持idle,而不是反复idle,working,idle again。
我们来一个例子辅助理解上面的内容。在t1时刻,work被调度到CPU A上执行,t2时刻work执行完毕,CPU A进入idle,t3时刻有一个新的work需要处理,这时候调度work到那个CPU会好些呢?是处于working状态的CPU B还是处于idle状态的CPU A呢?如果调度到CPU A上运行,那么,由于之前处理过work,其cache内容新鲜热辣,处理起work当然是得心应手,速度很快,但是,这需要将CPU A从idle状态中唤醒。选择CPU B呢就不存在将CPU 从idle状态唤醒,从而获取power saving方面的好处。
了解了上面的基础内容之后,我们再来检视per cpu thread pool和unbound thread pool。当workqueue收到一个要处理的work,如果该workqueue是unbound类型的话,那么该work由unbound thread pool处理并把调度该work去哪一个CPU执行这样的策略交给系统的调度器模块来完成,对于scheduler而言,它会考虑CPU core的idle状态,从而尽可能的让CPU保持在idle状态,从而节省了功耗。因此,如果一个workqueue有WQ_UNBOUND这样的flag,则说明该workqueue上挂入的work处理是考虑到power saving的。如果workqueue没有WQ_UNBOUND flag,则说明该workqueue是per cpu的,这时候,调度哪一个CPU core运行worker thread来处理work已经不是scheduler可以控制的了,这样,也就间接影响了功耗。
有两个参数可以控制workqueue在performance和power saving之间的平衡:
1、各个workqueue需要通过WQ_POWER_EFFICIENT来标记自己在功耗方面的属性
2、系统级别的内核参数workqueue.power_efficient。
使用workqueue的用户知道自己在电源管理方面的特点,如果该workqueue在unbound的时候会极大的降低功耗,那么就需要加上WQ_POWER_EFFICIENT的标记。这时候,如果没有标记WQ_UNBOUND,那么缺省workqueue会创建per cpu thread pool来处理work。不过,也可以通过workqueue.power_efficient这个内核参数来修改workqueue的行为:
/* see the comment above the definition of WQ_POWER_EFFICIENT */ static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT); module_param_named(power_efficient, wq_power_efficient, bool, 0444);
如果wq_power_efficient设定为true,那么WQ_POWER_EFFICIENT的标记的workqueue就会强制按照unbound workqueue来处理,即使没有标记WQ_UNBOUND。
分析3:
/* allocate wq and format name */ if (flags & WQ_UNBOUND) tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]); wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL); if (!wq) return NULL; if (flags & WQ_UNBOUND) { wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL); if (!wq->unbound_attrs) goto err_free_wq; }
代码很简单,与其要解释代码,不如来解释一些基本概念。
这里涉及2个数据结构:workqueue_struct和pool_workqueue,为何如此处理呢?我们知道,在CMWQ中,workqueue和thread pool没有严格的一一对应关系了,因此,系统中的workqueue们共享一组thread pool,因此,workqueue中的成员包括两个类别:global类型和per thread pool类型的,我们把那些per thread pool类型的数据集合起来就形成了pool_workqueue的定义。
挂入workqueue的work终究需要worker pool中的某个worker thread来处理,也就是说,workqueue要和系统中那些共享的worker thread pool进行连接,这是通过pool_workqueue(该数据结构会包含一个指向worker pool的指针)的数据结构来管理的。和这个workqueue相关的pool_workqueue被挂入一个链表,链表头就是workqueue_struct中的pwqs成员。
和旧的workqueue机制一样,系统维护了一个所有workqueue的list,list head定义如下:
static LIST_HEAD(workqueues); /* PR: list of all workqueues */
workqueue_struct中的list成员就是挂入这个链表的节点。
workqueue有两种:unbound workqueue和per cpu workqueue。对于per cpu类型,cpu_pwqs指向了一组per cpu的pool_workqueue数据结构,用来维护workqueue和per cpu thread pool之间的关系。每个cpu都有两个thread pool,normal和高优先级的线程池,到底cpu_pwqs指向哪一个pool_workqueue(worker thread)是和workqueue的flag相关,如果标有WQ_HIGHPRI,那么cpu_pwqs指向高优先级的线程池。unbound workqueue对应的pool_workqueue和workqueue属性相关,我们在下一节描述。
2、workqueue attribute
挂入workqueue的work终究是需要worker线程来处理,针对worker线程有下面几个考量点(我们称之attribute):
(1)该worker线程的优先级
(2)该worker线程运行在哪一个CPU上
(3)如果worker线程可以运行在多个CPU上,且这些CPU属于不同的NUMA(Non Uniform Memory Access Architecture 非统一内存访问) node,那么是否在所有的NUMA node中都可以获取良好的性能。
对于per-CPU的workqueue,2和3不存在问题,哪个cpu上queue的work就在哪个cpu上执行,由于只能在一个确定的cpu上执行,因此起NUMA的node也是确定的(一个CPU不可能属于两个NUMA node)。置于优先级,per-CPU的workqueue使用WQ_HIGHPRI来标记。综上所述,per-CPU的workqueue不需要单独定义一个workqueue attribute,这也是为何在workqueue_struct中只有unbound_attrs这个成员来记录unbound workqueue的属性。
unbound workqueue由于不绑定在具体的cpu上,可以运行在系统中的任何一个cpu,直觉上似乎系统中有一个unbound thread pool就OK了,不过让一个thread pool创建多种属性的worker线程是一个好的设计吗?本质上,thread pool应该创建属性一样的worker thread。因此,我们通过workqueue属性来对unbound workqueue进行分类,workqueue属性定义如下:
/** * struct workqueue_attrs - A struct for workqueue attributes. * * This can be used to change attributes of an unbound workqueue. */ struct workqueue_attrs { /** * @nice: nice level */ int nice; /** * @cpumask: allowed CPUs */ cpumask_var_t cpumask; /** * @no_numa: disable NUMA affinity * * Unlike other fields, ``no_numa`` isn't a property of a worker_pool. It * only modifies how :c:func:`apply_workqueue_attrs` select pools and thus * doesn't participate in pool hash calculations or equality comparisons. */ bool no_numa; };
- nice是一个和thread优先级相关的属性,nice越低则优先级越高。
- cpumask是该workqueue挂入的work允许在哪些cpu上运行。
- no_numa是一个和NUMA affinity相关的设定。
3、unbound workqueue和NUMA之间的联系
UMA(Uniform Memory Access Architecture 统一内存访问)系统中,所有的processor看到的内存都是一样的,访问速度也是一样,无所谓local or remote,因此,内核线程如果要分配内存,那么也是无所谓,统一安排即可。在NUMA系统中,不同的一个或者一组cpu看到的memory是不一样的,我们假设node 0中有CPU A和B,node 1中有CPU C和D,如果运行在CPU A上内核线程现在要迁移到CPU C上的时候,悲剧发生了:该线程在A CPU创建并运行的时候,分配的内存是node 0中的memory,这些memory是local的访问速度很快,当迁移到CPU C上的时候,原来local memory变成remote,性能大大降低。因此,unbound workqueue需要引入NUMA的考量点。
NUMA是内存管理的范畴,本文不会深入描述,我们暂且放开NUMA,先思考这样的一个问题:一个确定属性的unbound workqueue需要几个线程池?看起来一个就够了,毕竟workqueue的属性已经确定了,一个线程池创建相同属性的worker thread就行了。但是我们来看一个例子:假设workqueue的work是可以在node 0中的CPU A和B,以及node 1中CPU C和D上处理,如果只有一个thread pool,那么就会存在worker thread在不同node之间的迁移问题。为了解决这个问题,实际上unbound workqueue实际上是创建了per node的pool_workqueue(thread pool)
当然,是否使用per node的pool workqueue用户是可以通过下面的参数进行设定的:
(1)workqueue attribute中的no_numa成员
(2)通过workqueue.disable_numa这个参数,disable所有workqueue的numa affinity的支持。
static bool wq_disable_numa; module_param_named(disable_numa, wq_disable_numa, bool, 0444);
分析4、初始化workqueue的成员
除了max active,没有什么要说的,代码都简单而且直观。如果用户没有设定max active(或者说max active等于0),那么系统会给出一个缺省的设定。系统定义了两个最大值WQ_MAX_ACTIVE(512)和WQ_UNBOUND_MAX_ACTIVE(和cpu数目有关,最大值是cpu数目乘以4,当然也不能大于WQ_MAX_ACTIVE),分别限定per cpu workqueue和unbound workqueue的最大可以创建的worker thread的数目。wq_clamp_max_active可以将max active限制在一个确定的范围内。
分析5
static int alloc_and_link_pwqs(struct workqueue_struct *wq) { bool highpri = wq->flags & WQ_HIGHPRI; //获取normal or high priority int cpu, ret; if (!(wq->flags & WQ_UNBOUND)) { //per cpu workqueue的处理 wq->cpu_pwqs = alloc_percpu(struct pool_workqueue); //为每个cpu分配一个pool_workqueue数据结构 if (!wq->cpu_pwqs) return -ENOMEM; for_each_possible_cpu(cpu) { //逐个cpu进行设定 struct pool_workqueue *pwq = per_cpu_ptr(wq->cpu_pwqs, cpu); //获取本cpu的pool_workqueue struct worker_pool *cpu_pools = per_cpu(cpu_worker_pools, cpu); //获取本cpu的worker_pool //将动态分配的cpu_pwqs和静态定义的cpu_worker_pools关联起来 init_pwq(pwq, wq, &cpu_pools[highpri]); mutex_lock(&wq->mutex); link_pwq(pwq); //把pool_workqueue添加到workqueue_struct->pwqs链表中 mutex_unlock(&wq->mutex); } return 0; } else if (wq->flags & __WQ_ORDERED) { //wq加入到ordered_wq_attrs的处理 (有序未绑定cpu) ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]); /* there should only be single pwq for ordering guarantee */ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node || wq->pwqs.prev != &wq->dfl_pwq->pwqs_node), "ordering guarantee broken for workqueue %s\n", wq->name); return ret; } else { //wq加入到unbound_std_wq_attrs的处理 (无序未绑定cpu) return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]); } }
通过alloc_percpu可以为每一个cpu分配一个pool_workqueue的memory。每个pool_workqueue都有一个对应的worker thread pool,对于per-CPU workqueue,它是静态定义的,如下:
/* the per-cpu worker pools */ static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools); //NR_STD_WORKER_POOLS = 2, 表示每个cpu定义两个标准的worker pool //上面这个宏也就表示,每个cpu定义了两个struct worker_pool, 结构体的名字为cpu_worker_pools
对于未绑定cpu的wq,系统也定义了相关属性的指针(也是分为normal和high两种)
/* I: attributes used when instantiating standard unbound pools on demand */ static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS]; /* I: attributes used when instantiating ordered pools on demand */ static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
当然这连个真真的实例化是在系统初始化阶段,给动态申请内存的。
将动态分配的cpu_pwqs和静态定义的cpu_worker_pools关联起来/* initialize newly alloced @pwq which is associated with @wq and @pool */ static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq, struct worker_pool *pool) { BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK); memset(pwq, 0, sizeof(*pwq)); pwq->pool = pool; //连接本cpu上的wq的线程池 pwq->wq = wq; //链接工作队列 pwq->flush_color = -1; pwq->refcnt = 1; INIT_LIST_HEAD(&pwq->delayed_works); INIT_LIST_HEAD(&pwq->pwqs_node); INIT_LIST_HEAD(&pwq->mayday_node); INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn); }
pool_workqueue添加到workqueue_struct->pwqs链表中
/* sync @pwq with the current state of its associated wq and link it */ static void link_pwq(struct pool_workqueue *pwq) { struct workqueue_struct *wq = pwq->wq; lockdep_assert_held(&wq->mutex); /* may be called multiple times, ignore if already linked */ if (!list_empty(&pwq->pwqs_node)) return; /* set the matching work_color */ pwq->work_color = wq->work_color; /* sync max_active to the current setting */ pwq_adjust_max_active(pwq); /* link in @pwq */ list_add_rcu(&pwq->pwqs_node, &wq->pwqs); }
unbound workqueue有两种,一种是normal type。
另外一种是ordered type,这种workqueue上的work是严格按照顺序执行的,不存在并发问题。ordered unbound workqueue的行为类似过去的single thread workqueue。
但是,无论那种类型的unbound workqueue都使用apply_workqueue_attrs来建立workqueue、pool wq和thread pool之间的关系。
/** * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue * @wq: the target workqueue * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs() * * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA * machines, this function maps a separate pwq to each NUMA node with * possibles CPUs in @attrs->cpumask so that work items are affine to the * NUMA node it was issued on. Older pwqs are released as in-flight work * items finish. Note that a work item which repeatedly requeues itself * back-to-back will stay on its current pwq. * * Performs GFP_KERNEL allocations. * * Return: 0 on success and -errno on failure. */ int apply_workqueue_attrs(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { int ret; apply_wqattrs_lock(); ret = apply_workqueue_attrs_locked(wq, attrs); apply_wqattrs_unlock(); return ret; } static int apply_workqueue_attrs_locked(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { struct apply_wqattrs_ctx *ctx; /* only unbound workqueues can change attributes */ if (WARN_ON(!(wq->flags & WQ_UNBOUND))) //参数检查,这里是只处理未绑定cpu的 return -EINVAL; /* creating multiple pwqs breaks ordering guarantee */ if (!list_empty(&wq->pwqs)) { //参数检查,__WQ_ORDERED_EXPLICIT标志的属于他的pwq只能有一个 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT)) return -EINVAL; wq->flags &= ~__WQ_ORDERED; } ctx = apply_wqattrs_prepare(wq, attrs); //申请特定属性的wq if (!ctx) return -ENOMEM; /* the ctx has been prepared successfully, let's commit it */ apply_wqattrs_commit(ctx); //提交安装,见最后面的分析 7 apply_wqattrs_cleanup(ctx); return 0; }
/* allocate the attrs and pwqs for later installation */ static struct apply_wqattrs_ctx * apply_wqattrs_prepare(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { struct apply_wqattrs_ctx *ctx; struct workqueue_attrs *new_attrs, *tmp_attrs; int node; lockdep_assert_held(&wq_pool_mutex); ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_node_ids), GFP_KERNEL); new_attrs = alloc_workqueue_attrs(GFP_KERNEL); //申请一个workqueue_attrs tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL); //申请临时sttr if (!ctx || !new_attrs || !tmp_attrs) goto out_free; /* * Calculate the attrs of the default pwq. * If the user configured cpumask doesn't overlap with the * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask. */ copy_workqueue_attrs(new_attrs, attrs); //系统默认的拷贝的新的 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, wq_unbound_cpumask); //对新的设置属性 if (unlikely(cpumask_empty(new_attrs->cpumask))) //检视是不是对所有cpu都无效 cpumask_copy(new_attrs->cpumask, wq_unbound_cpumask); /* * We may create multiple pwqs with differing cpumasks. Make a * copy of @new_attrs which will be modified and used to obtain * pools. */ copy_workqueue_attrs(tmp_attrs, new_attrs); //拷贝一份副本 /* * If something goes wrong during CPU up/down, we'll fall back to * the default pwq covering whole @attrs->cpumask. Always create * it even if we don't use it immediately. */ ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs); //分配default pool workqueue if (!ctx->dfl_pwq) goto out_free; //遍历node for_each_node(node) { if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) { //是否使用default pool wq ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); 该node使用自己的pool wq if (!ctx->pwq_tbl[node]) goto out_free; } else { ctx->dfl_pwq->refcnt++; //默认的使用计数 ctx->pwq_tbl[node] = ctx->dfl_pwq; //该node使用default pool wq } } /* save the user configured attrs and sanitize it. */ copy_workqueue_attrs(new_attrs, attrs); cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask); ctx->attrs = new_attrs; ctx->wq = wq; free_workqueue_attrs(tmp_attrs); //释放掉临时的workqueue_attrs return ctx; out_free: free_workqueue_attrs(tmp_attrs); free_workqueue_attrs(new_attrs); apply_wqattrs_cleanup(ctx); return NULL; } /* obtain a pool matching @attr and create a pwq associating the pool and @wq */ static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq, const struct workqueue_attrs *attrs) { struct worker_pool *pool; struct pool_workqueue *pwq; lockdep_assert_held(&wq_pool_mutex); pool = get_unbound_pool(attrs); //获取一个worker_pool if (!pool) return NULL; //指定内存节点份分配pool_workqueue ,后面 分析6 pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node); if (!pwq) { put_unbound_pool(pool); return NULL; } init_pwq(pwq, wq, pool); //未绑定cpu的也要对应的worker_pool和wq绑定到pool_workqueue上 return pwq; }
pwq_tbl数组用来保存unbound workqueue各个node的pool workqueue的指针,new_attrs和tmp_attrs都是一些计算workqueue attribute的中间变量,开始的时候设定为用户传入的workqueue的attribute。
如何为unbound workqueue的pool workqueue寻找对应的线程池?
具体的代码在get_unbound_pool函数中。
/** * get_unbound_pool - get a worker_pool with the specified attributes * @attrs: the attributes of the worker_pool to get * * Obtain a worker_pool which has the same attributes as @attrs, bump the * reference count and return it. If there already is a matching * worker_pool, it will be used; otherwise, this function attempts to * create a new one. * * Should be called with wq_pool_mutex held. * * Return: On success, a worker_pool with the same attributes as @attrs. * On failure, %NULL. */ static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs) { u32 hash = wqattrs_hash(attrs); struct worker_pool *pool; int node; int target_node = NUMA_NO_NODE; lockdep_assert_held(&wq_pool_mutex); /* do we already have a matching pool? 有相同属相的,则不需要再创建了新的线程池 */ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) { if (wqattrs_equal(pool->attrs, attrs)) { pool->refcnt++; return pool; } } /* if cpumask is contained inside a NUMA node, we belong to that node */ if (wq_numa_enabled) { for_each_node(node) { if (cpumask_subset(attrs->cpumask, wq_numa_possible_cpumask[node])) { target_node = node; break; } } } /* nope, create a new one,没有相同的,创建一个这个属性的线程池 */ pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node); if (!pool || init_worker_pool(pool) < 0) goto fail; lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */ copy_workqueue_attrs(pool->attrs, attrs); pool->node = target_node; /* * no_numa isn't a worker_pool attribute, always clear it. See * 'struct workqueue_attrs' comments for detail. */ pool->attrs->no_numa = false; if (worker_pool_assign_id(pool) < 0) goto fail; /* create and start the initial worker */ if (wq_online && !create_worker(pool)) goto fail; /* install */ hash_add(unbound_pool_hash, &pool->hash_node, hash); //把新的这个属性的线程池也挂到unbound_pool_hash的hash表上 return pool; fail: if (pool) put_unbound_pool(pool); return NULL; }
per cpu的workqueue的pool workqueue对应的线程池也是per cpu的,每个cpu有两个线程池(normal和high priority),因此将pool workqueue和thread pool对应起来是非常简单的事情。对于unbound workqueue,对应关系没有那么直接,如果属性相同,多个unbound workqueue的pool workqueue可能对应一个thread pool。
系统使用哈希表来保存所有的unbound worker thread pool,定义如下:
/* PL: hash of all unbound pools keyed by pool->attrs */ static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
在创建unbound workqueue的时候,pool workqueue对应的worker thread pool需要在这个哈希表中搜索,如果有相同属性的worker thread pool的话,那么就不需要创建新的线程池,代码如下:
/* do we already have a matching pool? 有相同属相的,则不需要再创建了新的线程池 */ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) { if (wqattrs_equal(pool->attrs, attrs)) { pool->refcnt++; return pool; } }
分析6 各个node分配pool workqueue并初始化
在进入代码之前,先了解一些基础知识。缺省情况下,挂入unbound workqueue的works最好是考虑NUMA Affinity,这样可以获取更好的性能。当然,实际上用户可以通过workqueue.disable_numa这个内核参数来关闭这个特性,这时候,系统需要一个default pool workqueue(workqueue_struct的dfl_pwq成员),所有的per node的pool workqueue指针都是执行default pool workqueue。
workqueue.disable_numa是enable的情况下是否不需要default pool workqueue了呢?也不是,我们举一个简单的例子,一个系统的构成是这样的:node 0中有CPU A和B,node 1中有CPU C和D,node 2中有CPU E和F,假设workqueue的attribute规定work只能在CPU A 和C上运行,那么在node 0和node 1中创建自己的pool workqueue是ok的,毕竟node 0中有CPU A,node 1中有CPU C,该node创建的worker thread可以在A或者C上运行。但是对于node 2节点,没有任何的CPU允许处理该workqueue的work,在这种情况下,没有必要为node 2建立自己的pool workqueue,而是使用default pool workqueue。
OK,我们来看代码:
/* * If something goes wrong during CPU up/down, we'll fall back to * the default pwq covering whole @attrs->cpumask. Always create * it even if we don't use it immediately. */ ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs); //分配default pool workqueue if (!ctx->dfl_pwq) goto out_free; //遍历node for_each_node(node) { if (wq_calc_node_cpumask(new_attrs, node, -1, tmp_attrs->cpumask)) { //是否使用default pool wq ctx->pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs); 该node使用自己的pool wq if (!ctx->pwq_tbl[node]) goto out_free; } else { ctx->dfl_pwq->refcnt++; //默认的使用计数 ctx->pwq_tbl[node] = ctx->dfl_pwq; //该node使用default pool wq } }
分析7:
所有的node的pool workqueue及其worker thread pool已经ready,需要安装到workqueue中了
/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */ static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx) { int node; /* all pwqs have been created successfully, let's install'em */ mutex_lock(&ctx->wq->mutex); copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs); /* save the previous pwq and install the new one */ for_each_node(node) ctx->pwq_tbl[node] = numa_pwq_tbl_install(ctx->wq, node, ctx->pwq_tbl[node]); /* @dfl_pwq might not have been used, ensure it's linked */ link_pwq(ctx->dfl_pwq); swap(ctx->wq->dfl_pwq, ctx->dfl_pwq); mutex_unlock(&ctx->wq->mutex); }
本文参考:
-
如何理解create_singlethread_workqueue是严格按照顺序执行的
2018-10-12 11:56:14我们知道工作队列有三种,分别是PerCpu, Unbound,以及ORDERED这三种类型,正如之前的文档分析: 1.PerCpu的工作队列: API: create_workqueue(name) 这种工作队列在queue_work的时候,首先检查当前的Cpu是哪一个...我们知道工作队列有三种,分别是PerCpu, Unbound,以及ORDERED这三种类型,正如之前的文档分析:
1.PerCpu的工作队列:
API:create_workqueue(name)
这种工作队列在queue_work的时候,首先检查当前的Cpu是哪一个,然后将work调度到该cpu下面的normal级别的线程池中运行。
注:针对PerCpu类型而言,系统在开机的时候会注册2个线程池,一个低优先级的,一个高优先级的。
2.Unbound的工作队列:
API:create_freezable_workqueue(name)
这种工作队列在queue_work的时候,同样首先检查当前的Cpu是哪一个,随后需要计算当前的Cpu属于哪一个Node,因为对于Unbound的工作队列而言,线程池并不是绑定到cpu的而是绑定到Node的,随后找到该Node对应的线程池中运行。需要留意的是这种工作队列是考虑了功耗的,例如:当work调度的时候,调度器会尽量的让已经休眠的cpu保持休眠,而将当前的work调度到其他active的cpu上去执行。
注:对于NUMA没有使能的情况下,所有节点的线程池都会指向dfl的线程池。
3.Ordered的工作队列:
API:create_singlethread_workqueue(name) 或者 alloc_ordered_workqueue(fmt,
flags, args…)这种work也是Unbound中的一种,但是这种工作队列即便是在NUMA使能的情况下,所有Node的线程池都会被指向dfl的线程池,换句话说Ordered的工作队列只有一个线程池,因为只有这样才能保证Ordered的工作队列是顺序执行的,而这也是本文分析的切入点。
有关并发问题的总结性陈述:
首先对于Ordered的工作队列(create_singlethread_workqueue,其他自定义的API则不一定了)这是严格顺序执行的,绝对不可能出现并发(无论提交给wq的是否是同一个work)。
但是对于PerCpu的工作队列(create_workqueue),其中对于提交给wq的如果是同一个work,那么也不会并发,会顺序执行。但是如果提交给wq的不是同一个work,则会在不同的cpu间并发。需要特别留意的是,其并不会在同一个CPU的不同线程间并发,这是因为create_workqueue这个API定义的max_active为1,也就意味者,当前wq只能最多在每个cpu上并发1个线程。
#define alloc_ordered_workqueue(fmt, flags, args...) \ alloc_workqueue(fmt, WQ_UNBOUND | __WQ_ORDERED | (flags), 1, ##args) #define create_singlethread_workqueue(name) \ alloc_ordered_workqueue("%s", WQ_MEM_RECLAIM, name)
这里面特别要去留意的是alloc_workqueue的第二个参数是flags,第三个参数表示当前工作队列max_active的work个数,比如当前值为1,那么在当前工作队列中如果已经有work在执行中了,随后排队的work只能进入pwq->delayed_works的延迟队列中,等到当前的work执行完毕后再顺序执行。
那么这儿有个疑问就是如果将active增大,是否意味着队列中的work可以并行执行了呢,也不全是,如果当前排队的work和正在执行的work是同一个的话则需要等待当前work执行完成后顺序执行。如果当前排队的work和正在执行的work不是同一个同时alloc_workqueue函数的第三个参数(max_active)大于1的话,那么内核会为你在线程池中开启一个新的线程来执行这个work。
OK,Read The Fuck Source.
kernel\Workqueue.c
static void __queue_work(int cpu, struct workqueue_struct *wq, struct work_struct *work) { ..... /* 1. 当第一次调度的时候,由于pwq->nr_active为0,低于max_active(1),则将work加入到线程池中的worklist中,pwq->nr_active自增. 2. 当第二次调度的时候,且第一次调度的work正在执行中(进入function了),由于pwq->nr_active为1,不低于max_active(1),则将work加入到线程池的delayed_works延迟列表中,并设置当前work的flag为WORK_STRUCT_DELAYED. */ if (likely(pwq->nr_active < pwq->max_active)) { trace_workqueue_activate_work(work); pwq->nr_active++; worklist = &pwq->pool->worklist; } else { work_flags |= WORK_STRUCT_DELAYED; worklist = &pwq->delayed_works; } //如上面的描述插入到对应的链表中 insert_work(pwq, work, worklist, work_flags); .... }
static void insert_work(struct pool_workqueue *pwq, struct work_struct *work, struct list_head *head, unsigned int extra_flags) { struct worker_pool *pool = pwq->pool; //设置work的flag set_work_pwq(work, pwq, extra_flags); //将work加入到对应的线程池的worklist或者delayed_works链表中 list_add_tail(&work->entry, head); ... //从线程池中取出处于idle的线程,唤醒它 if (__need_more_worker(pool)) wake_up_worker(pool); }
我们继续看看唤醒的线程中是怎么处理的,是使用这个唤醒的idle线程呢?还是在原有的线程处理结束后再执行?
static int worker_thread(void *__worker) { ... woke_up: /* 如下所示 1.针对第一次调度的情况,pool的worklist不为NULL,且pool->nr_running为0(意味着所有的worker都进入了阻塞状态),则当前唤醒的线程将继续处理这个work。 2.针对第二次调度的情况,且第一次调度的work正在执行中(进入function了),那么由于pool的worklist为NULL(该work进入了延迟队列),那么,当前唤醒的worker会直接睡眠。 */ if (!need_more_worker(pool)) goto sleep; if (unlikely(!may_start_working(pool)) && manage_workers(worker)) goto recheck; ... do { ... process_one_work(worker, work); ... } while (keep_working(pool)); .... sleep: worker_enter_idle(worker); __set_current_state(TASK_INTERRUPTIBLE); spin_unlock_irq(&pool->lock); schedule(); goto woke_up; }
我们再看看process_one_work的执行过程
static void process_one_work(struct worker *worker, struct work_struct *work) { ... //这里的理解也是非常的重要的 /* 首先在线程池中正在运行的线程中取出正在运行的work和当前想要处理的work进行比对,如果是同一个work那么直接返回,等待原先的那个work处理结束后再紧接着处理。 */ collision = find_worker_executing_work(pool, work); if (unlikely(collision)) { move_linked_works(work, &collision->scheduled, NULL); return; } ... //真正处理这个work的地方 worker->current_func(work); ... //判断是否要处理延迟队列的work pwq_dec_nr_in_flight(pwq, work_color); ... }
看看延迟队列是怎么提取出来的
static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, int color) { ... //当目前的work处理完成后,就可以将当前工作队列(ordered类型)active的work减1到0了,也就是说当前工作队列(ordered类型)又可以接收新的work了 pwq->nr_active--; //如果之前的延迟队列有待处理的work,那么取出来加到pool->worklist,等到线程的下一次while循环的时候执行。 /* 流程如下: pwq_activate_first_delayed->pwq_activate_delayed_work->move_linked_works */ if (!list_empty(&pwq->delayed_works)) { /* one down, submit a delayed one */ if (pwq->nr_active < pwq->max_active) pwq_activate_first_delayed(pwq); } ... } static void pwq_activate_delayed_work(struct work_struct *work) { struct pool_workqueue *pwq = get_work_pwq(work); trace_workqueue_activate_work(work); move_linked_works(work, &pwq->pool->worklist, NULL); __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work)); pwq->nr_active++; }
最后说明一下2个问题:
- pool->nr_running,这个flag表示当前线程池是否是阻塞或者Active状态。
0: 阻塞状态,work的function中有可能调用了导致sleep的函数,例如msleep,wait_interrupt,mutex等。这种情况下如果再次insert_work的话,需要在当前线程池中,开启新的线程(这个线程有可能是在当前CPU的不同线程或者是不同的CPU上)去处理。
1:Active状态,work的function还在执行中,且没有导致sleep的操作。这种情况下如果再次insert_work的话,不需要再开启新的线程了,直接在原有线程中处理即可。 - 第二个问题是针对unbound的工作队列,其线程池是否需要额外创建的原则是属性是否一致,属性匹配只关注2个地方,一个是优先级,一个是cpumask(当前工作是否可以在对应的cpu上运行)。
- pool->nr_running,这个flag表示当前线程池是否是阻塞或者Active状态。
-
linux工作队列 - workqueue总览【转】
2021-05-12 15:53:30版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。本文链接:https://blog.csdn.net/l289123557/article/details/52551176workqueue归入中断子系统是由于和...workqueue... -
Linux内核工作队列workqueue分析(七)
2021-03-14 17:38:17工作队列(workqueue)是除了软中断softirq和小任务tasklet以外最常用的一种中断下半部分机制,由内核统一管理。工作队列把推迟执行的任务交给内核线程来执行,其运行在进程上下文,允许重新调度、睡眠。工作队列... -
Linux-workqueue讲解
2019-06-22 16:15:00代码:linux-3.10.65/kernel/workqueue.c =============================== 1. workqueue 是什么? workqueue是对内核线程封装的用于处理各种工作项的一种处理方法, 由于处理对象是用链表拼接一个个工作项, ... -
linux工作队列 - workqueue总览
2016-10-15 18:17:28workqueue归入中断子系统是由于和中断处理有密切关系,写博客重要在于整理自己的思绪,写的时候会把一些不懂的细节问题暴露出来,这样会把问题看的更透彻,workqueue的代码在文件kernel/workqueue.c中,大约5K+行,... -
Linux 中断子系统(四)-Workqueue
2022-01-02 22:00:152)Workqueue工作队列可以用作中断处理的Bottom-half机制,利用进程上下文来执行中断处理中耗时的任务,因此它允许睡眠,而Softirg和Tasklet在处理任务时不能睡眠; 3)在中断处理过程中,或者其他子系统中,调用... -
linux 笔记--中断子系统之workqueue
2017-03-26 16:30:23linux workqueue,cmwq -
工作队列详解
2018-10-09 20:07:191.工作队列的创建 ...workqueue_demo = create_singlethread_workqueue(&amp;amp;amp;amp;amp;amp;amp;quot;workqueue demo&amp;amp;amp;amp;amp;amp;amp;quot;); queue_work(work -
struct workqueue_struct *alloc_workqueue(const char *fmt, unsigned int flags, int max_active, ...) { size_t tbl_size = 0; va_list args; struct workqueue_struct *wq; struct pool_workqueue *pwq; ...
-
Linux下的workqueue
2019-12-07 10:23:53先看workqueue的创建过程 ... * alloc_ordered_workqueue - allocate an ordered workqueue * @fmt: printf format for the name of the workqueue * @flags: WQ_* flags (only WQ_FREEZABLE and ... -
Linux Kernel 中 Workqueue 使用系统默认队列和创建队列的方法
2021-05-19 03:56:10关于workqueue,我们还是有很多话要说。想必大家对workqueue相关的函数(schedule_work 、queue_work、INIT_WORK、create_singlethread_workqueue 等)都不陌生。但说起差异,可能还有许多话需要坐下来慢慢讲。对于... -
linux工作队列 - workqueue_struct创建
2016-10-16 19:27:201.创建workqueue代码分析1.1整体代码分析根据FLAG的不同,创建workqueue的API分好几种(见系列文章1说明),根据情况使用,但最终这些API都会调用到alloc_workqueue,这是一个宏定义,它的调用序列图如下所示:这里... -
详解 Linux Workqueue 原理
2021-06-24 13:21:08Workqueue 是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务 (work) 都不会自己起一个线程来处理,而是扔到 Workqueue 中处理。Workqueue 的主要工作就是用进程上下文来处理内核中大量的小任务。 所以 ... -
Linux内核架构: workqueue
2017-02-05 15:10:29http://lwn.net/Articles/403891/linux内核中断处理的工作队列workqueue机制工作队列(workqueue)是另外一种将工作推后执行的形式。工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在... -
workqueue机制
2017-11-14 20:18:08workqueue和其他的bottom half最大的不同是它是运行在进程上下文中的,它可以睡眠,这和其他bottom half机制有本质的不同,大大方便了驱动工程师撰写中断处理代码。当然,驱动模块也可以自己创建一个kernel thread来... -
Linux Workqueue
2021-05-10 18:43:12Workqueue 是内核里面很重要的一个机制,特别是内核驱动,一般的小型任务 (work) 都不会自己起一个线程来处理,而是扔到 Workqueue 中处理。Workqueue 的主要工作就是用进程上下文来处理内核中大量的小任务。所以 ... -
linux 工作队列
2014-07-31 18:53:28转自 http://blog.csdn.net/a254373829/article/details/8528652 书上写的工作队列的实现是创建一个单独的线程来执行相应的work.... ...alloc_workqueue(name, flags, max_active) a -
Linux中断管理 (3)workqueue工作队列
2019-06-25 12:33:39/* PL: list of all workqueues */-----------------系统所有workqueue_struct的全局链表 struct mutex mutex; /* protects this wq */ int work_color; /* WQ: current work color */ int flush_color; /* WQ:... -
linux驱动之workqueue
2021-05-10 18:41:59一、前言在内核驱动中,常常见到 工作队列(workqueue)。对于熟悉内核或者驱动的工程师来说,这个机制应该是比较熟悉的,经常出现在 中断上下文 中,用于执行中断后的操作。随着内核发展,驱动遇到越多越多的场景,而... -
workqueue之内核公共部分
2018-07-06 00:51:18struct workqueue_struct *__alloc_workqueue_key(const char *fmt, unsigned int flags, int max_active, ... -
Linux驱动开发之工作队列
2020-11-02 15:20:07一、什么是工作队列 工作队列(work queue)是Linux kernel中将工作推后执行的一种机制。这种机制和BH或Tasklets不同之处在于工作队列是把推后的工作交由一个内核线程去执行... struct workqueue_struct *changan_mf...