精华内容
下载资源
问答
  • windows 线程池demo,示例,简单明了,适用范围广,容易上手使用,实现了最基本的线程池
  • 工作队列旨在协调生产者和工作线程池之间的工作。 当需要执行某个任务时,生产者将一个包含任务例程的对象添加到工作队列中。 如果工作队列已满,生产者将阻塞,直到工作线程从队列中删除一个对象。 最终,工作线程...
  • linux内核工作队列demo

    2016-07-01 18:08:59
    很多博客都有深入分析内核队列工作原理,但是少有能够拿来直接运行的demo,这个demo亲测可用!
  • 本文介绍了Linux操作系统内核工作队列的操作模式。
  • RabbitMQ六种队列模式-工作队列模式

    千次阅读 2019-09-02 19:58:09
    RabbitMQ六种队列模式-工作队列 [本文] RabbitMQ六种队列模式-发布订阅 RabbitMQ六种队列模式-路由模式 RabbitMQ六种队列模式-主题模式 上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,...

    前言

    RabbitMQ六种队列模式-简单队列
    RabbitMQ六种队列模式-工作队列 [本文]
    RabbitMQ六种队列模式-发布订阅
    RabbitMQ六种队列模式-路由模式
    RabbitMQ六种队列模式-主题模式

    上文我们了解了 RabbitMQ 六种队列模式中的简单队列,代码也是非常的简单,比较容易理解。

    但是简单队列有个缺点,简单队列是一一对应的关系,即点对点,一个生产者对应一个消费者,按照这个逻辑,如果我们有一些比较耗时的任务,也就意味着需要大量的时间才能处理完毕,显然简单队列模式并不能满足我们的工作需求,我们今天再来看看工作队列。

    文章目录

    1. 什么是工作队列

    工作队列:用来将耗时的任务分发给多个消费者(工作者)

    主要解决问题:处理资源密集型任务,并且还要等他完成。有了工作队列,我们就可以将具体的工作放到后面去做,将工作封装为一个消息,发送到队列中,一个工作进程就可以取出消息并完成工作。如果启动了多个工作进程,那么工作就可以在多个进程间共享。

    工作队列也称为公平性队列模式,怎么个说法呢?

    循环分发,假如我们拥有两个消费者,默认情况下,RabbitMQ 将按顺序将每条消息发送给下一个消费者,平均而言,每个消费者将获得相同数量的消息,这种分发消息的方式称为轮询。

    看代码吧。

    2. 代码部分

    2.1 生产者

    创建50个消息

    public class Producer2 {
    
        /** 队列名称 */
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            /** 1.获取连接 */
            Connection newConnection = MQConnectionUtils.newConnection();
             /** 2.创建通道 */
            Channel channel = newConnection.createChannel();
             /**3.创建队列声明 */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
             /**保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            for (int i = 1; i <= 50; i++) {
                String msg = "生产者消息_" + i;
                System.out.println("生产者发送消息:" + msg);
             /**4.发送消息 */
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            }
            channel.close();
            newConnection.close();
        }
    
    }
    

    2.2 消费者

    public class Customer2_1 {
    
        /**
         * 队列名称
         */
        private static final String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("001");
            /** 1.获取连接 */
            Connection newConnection = MQConnectionUtils.newConnection();
            /** 2.获取通道 */
            final Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
            channel.basicQos(1);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    try {
                        Thread.sleep(1000);
                    } catch (Exception e) {
    
                    } finally {
                        /** 手动回执消息 */
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };
            /** 3.监听队列 */
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
        }
    
    }
    

    3. 循环分发

    3.1 启动生产者

    3.2 启动两个消费者

    在生产者中我们发送了50条消息进入队列,而上方消费者启动图里很明显的看到轮询的效果,就是每个消费者会分到相同的队列任务。

    3.3 公平分发

    由于上方模拟的是非常简单的消息队列的消费,假如有一些非常耗时的任务,某个消费者在缓慢地进行处理,而另一个消费者则空闲,显然是非常消耗资源的。

    再举一个例子,一个1年的程序员,跟一个3年的程序员,分配相同的任务量,明显3年的程序员处理起来更加得心应手,很快就无所事事了,但是3年的程序员拿着非常高的薪资!显然3年的程序员应该承担更多的责任,那怎么办呢?

    公平分发。

    其实发生上述问题的原因是 RabbitMQ 收到消息后就立即分发出去,而没有确认各个工作者未返回确认的消息数量,类似于TCP/UDP中的UDP,面向无连接。

    因此我们可以使用 basicQos 方法,并将参数 prefetchCount 设为1,告诉 RabbitMQ 我每次值处理一条消息,你要等我处理完了再分给我下一个。这样 RabbitMQ 就不会轮流分发了,而是寻找空闲的工作者进行分发。

    关键性代码:

    /** 2.获取通道 */
    final Channel channel = newConnection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    /** 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 */
    channel.basicQos(1);
    

    4. 消息持久化

    4.1 问题背景

    上边我们提到的公平分发是由消费者收取消息时确认解决的,但是这里面又会出现被 kill 的情况。

    当有多个消费者同时收取消息,且每个消费者在接收消息的同时,还要处理其它的事情,且会消耗很长的时间。在此过程中可能会出现一些意外,比如消息接收到一半的时候,一个消费者死掉了。

    这种情况要使用消息接收确认机制,可以执行上次宕机的消费者没有完成的事情。

    但是在默认情况下,我们程序创建的消息队列以及存放在队列里面的消息,都是非持久化的。当RabbitMQ死掉了或者重启了,上次创建的队列、消息都不会保存。

    怎么办呢?

    4.2 参数配置

    参数配置一:生产者创建队列声明时,修改第二个参数为 true

    /**3.创建队列声明 */
    channel.queueDeclare(QUEUE_NAME, true, false, false, null);
    

    参数配置二:生产者发送消息时,修改第三个参数为MessageProperties.PERSISTENT_TEXT_PLAIN

    for (int i = 1; i <= 50; i++) {
    	String msg = "生产者消息_" + i;
    	System.out.println("生产者发送消息:" + msg);
    	/**4.发送消息 */
    	channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    }
    

    5. 工作队列总结

    1、循环分发:消费者端在信道上打开消息应答机制,并确保能返回接收消息的确认信息,这样可以保证消费者发生故障也不会丢失消息。

    2、消息持久化:服务器端和客户端都要指定队列的持久化和消息的持久化,这样可以保证RabbitMQ重启,队列和消息也不会。

    3、公平分发:指定消费者接收的消息个数,避免出现消息均匀推送出现的资源不合理利用的问题。

    案例代码:https://www.lanzous.com/i5ydu6d

    我创建了一个java相关的公众号,用来记录自己的学习之路,感兴趣的小伙伴可以关注一下微信公众号哈:niceyoo

    展开全文
  • Java工作队列代码详解

    2020-08-28 18:19:12
    主要介绍了Java工作队列代码详解,涉及Round-robin 转发,消息应答(messageacknowledgments),消息持久化(Messagedurability)等相关内容,具有一定参考价值,需要的朋友可以了解下。
  • linux内核工作队列

    千次阅读 2017-12-09 12:45:42
    内核工作队列概述工作队列(workqueue)是另外一种将工作推后执行的形式,工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行,最重要的就是工作队列允许被重新调度...

    内核工作队列概述

    工作队列(workqueue)是另外一种将工作推后执行的形式,工作队列可以把工作推后,交由一个内核线程去执行,也就是说,这个下半部分可以在进程上下文中执行,最重要的就是工作队列允许被重新调度甚至睡眠。


    linux workqueue工作原理

    linux系统启动期间会创建名为kworker/u:x(x0开始的整数,表示CPU编号)工作者内核线程,该线程创建之后处于sleep状态。从调度器的角度来解释,内核线程就是可以调度的进程;从代码表现形式看,本质是一个函数。

    工作队列结构原理

    work_struct,workqueue_struct,struct cpu_workqueue_struct三者之间关系如下:

    • 内核启动时会为每一个CPU创建一个cpu_workqueue_struct结构,同时还会有一个内核工作的线程,这个线程创建好后处于睡眠状态,等待用户加入工作,来唤醒线程去调度工作结构体 work_struct 中的工作函数。
    • 工作work_struct 是通过链表连接在cpu_workqueue_struct上,后面其他work_struct连接在前一个后面,组成一个队列
    • 工作者线程被唤醒后,会去自己负责的工作队列上依次执行上面struct_work结构中的工作函数,执行完成后就会把 work_struct 从链表上删除。
    • 如果想使用工作队列来延后执行一段代码,必须先创建work_struct -> cpu_workqueue_struct,然后把工作节点work_struct加入到workqueue_struct工作队列中,加入后工作者线程就会被唤醒,在适当的时机就会执行工作函数。

    工作队列数据结构

    不同版本内核中,对数据结构定义是不同的,以下是linux3.5源码版本摘出来的数据结构

    work_struct

    我们把推后执行的任务叫工作(work),描述它的数据结构为work_struct
    路径:workqueue.h linux-3.5\include\linux

    struct work_struct {
        atomic_long_t data;
        struct list_head entry;  //链表指针 把每个工作连接在一个链表上组成一个双向链表
        work_func_t func;        //函数指针  指向工作函数
    #ifdef CONFIG_LOCKDEP
        struct lockdep_map lockdep_map;
    #endif
    };

    补充work_func_t结构

    typedef void (*work_func_t)(void *work);

    补充list_head结构

    struct list_head {
        struct list_head *next, *prev;
    };

    我们编程只需要关注func成员它是工作函数指针就是用户需要延后执行的代码

    workqueue_struct

    这个结构是用来描述内核队列的数据结构。定义在workqueue.c linux-3.5\kernel中,在workqueue.h linux-3.5\include\linux中声明
    具体定义如下:

    struct workqueue_struct {
        unsigned int        flags;      /* W: WQ_* flags */
        union {
            struct cpu_workqueue_struct __percpu    *pcpu;
            struct cpu_workqueue_struct     *single;
            unsigned long               v;
        } cpu_wq;               /* I: cwq's */
        struct list_head    list;       /* W: list of all workqueues */
    
        struct mutex        flush_mutex;    /* protects wq flushing */
        int         work_color; /* F: current work color */
        int         flush_color;    /* F: current flush color */
        atomic_t        nr_cwqs_to_flush; /* flush in progress */
        struct wq_flusher   *first_flusher; /* F: first flusher */
        struct list_head    flusher_queue;  /* F: flush waiters */
        struct list_head    flusher_overflow; /* F: flush overflow list */
    
        mayday_mask_t       mayday_mask;    /* cpus requesting rescue */
        struct worker       *rescuer;   /* I: rescue worker */
    
        int         nr_drainers;    /* W: drain in progress */
        int         saved_max_active; /* W: saved cwq max_active */
    #ifdef CONFIG_LOCKDEP
        struct lockdep_map  lockdep_map;
    #endif
        char            name[];     /* I: workqueue name */
    };

    注意:这个结构表示一个工作队列,一般情况下驱动开发者不需要接触太多这个结构成员,关于队列操作,内核都提供了相应的API函数

    cpu_workqueue_struct

    struct cpu_workqueue_struct {
        struct global_cwq   *gcwq;      /* I: the associated gcwq */
        struct workqueue_struct *wq;        /* I: the owning workqueue */
        int         work_color; /* L: current color */
        int         flush_color;    /* L: flushing color */
        int         nr_in_flight[WORK_NR_COLORS];
                            /* L: nr of in_flight works */
        int         nr_active;  /* L: nr of active works */
        int         max_active; /* L: max active works */
        struct list_head    delayed_works;  /* L: delayed works */
    };

    内核通过delayed_works成员把第一个 work_struct 连接起来,后面work_struct通过本身的entry成员把自己连接在链表上。


    内核工作队列分类

    内核工作队列分成共享工作队列和自定义工作队列两种

    共享工作队列

    系统在启动时候自动创建一个工作队列驱动开发者如果想使用这个队列,则不需要自己创建工作队列,只需要把自己的work添加到这个工作队列上即可。
    使用schedule_work这个函数可以把work_struct添加到工作队列中

    自定义工作队列

    由于共享工作队列是大家共同使用的,如果上面的工作函数有存在睡眠的情况,阻塞了,则会影响到后面挂接上去的工作执行时间,当你的动作需要尽快执行,不想受其它工作函数的影响,则自己创建一个工作队列,然后把自己的工作添加到这个自定义工作队列上去。
    使用自定义工作队列分为两步:

    • 创建工作队列:使用creat_workqueue(name)创建一个名为name的工作队列
    • 把工作添加到上面创建的工作队列上:使用queue_work函数把一个工作结构work_struc添加到指定的工作队列上

    linux内核共享工作队列

    共享工作队列介绍

    内核为了方便驱动开发者使用工作队列,给我们创建好一个工作队列,只要使用schedule_work 添加的工作节点都是添加到内核共享工作队列中,使用方法只需要开发者实现一个 work_struct结构,然后把它添加到共享工作中去。

    内核共享队列API

    静态定义工作结构DECLARE_WORK

    路径workqueue.h \linux-3.5\include\linux

    #define DECLARE_WORK(n, f)                  \
        struct work_struct n = __WORK_INITIALIZER(n, f)

    功能:定义一个名字为nwork_struct结构变量,并且初始化它,工作是f
    参数:n要定义的work_struct结构变量名,f工作函数,要延后执行的代码

    动态初始化工作结构INIT_WORK

    路径workqueue.h \linux-3.5\include\linux

    #define INIT_WORK(_work, _func)                 \
        do {                            \
            __INIT_WORK((_work), (_func), 0);       \
        } while (0)

    功能:运行期间动态初始化work_struct结构
    参数:_work要定义的work_struct结构变量地址,_func工作函数,要延后执行的代码

    调度工作schedule_work

    声明路径在workqueue.h \linux-3.5\include\linux

    int queue_work(struct workqueue_struct *wq, struct work_struct *work)
    {
        int ret;
    
        ret = queue_work_on(get_cpu(), wq, work);
        put_cpu();
    
        return ret;
    }

    功能:把一个work_struct添加道共享工作队列中,成为一个工作节点。
    参数:work要定义的work_struct结构变量名地址
    返回值:0 表示已经挂接到共享工作队列上还未执行。非0 其他情况内核未做说明
    函数返回值通常不需要驱动开发者关注

    共享队列的使用步骤

    • 需要工作队列
    • 创建工作
    • 调度工作(创建工作节点)
      对于共享工作队列来讲,第一部已经有了,需要做第2/3

    编程步骤

    • 编写一个工作函数
      原型void (*work_func_t)(void *work);
    void mywork_func(struct work_struct *work)
    {
    ······//工作内容
    }

    工作函数的参数是工作结构变量的首地址
    - 定义一个工作结构
    第一种:静态定义

    DECLARE_WORK(mywork,mywork_func); //定义并且初始化

    第二种:动态定义

    struct work_struct mywork;
    • 初始化上一步定义的工作结构
      对于静态定义可以略过这一步,使用动态方式定义需要手动初始化
      INIT_WORK(&mywork,mywork_func);
    • 在适当的地方调度工作,把工作结构添加到工作队列上
      schedule_work(&mywork);

    内核共享队列示例

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    
    {
      printk("%s is call!!  work:%p\r\n",__FUNCTION__,work);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
      //一安装模块就进行调度
      schedule_work(&mywork);
      printk("%s is call!!",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
    
      printk("mywork is exit!\r\n");
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");
    

    Makefile

    KERN_DIR = /zhangchao/linux3.5/linux-3.5
    all:
        make -C $(KERN_DIR) M=`pwd` modules
    clean:
        make -C $(KERN_DIR) M=`pwd` modules clean
        rm -rf modules.order
    obj-m += workqueue.o

    开发板运行效果

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [ 3526.610000] mywork_init is call!!
    [ 3526.615000] mywork_func is call!!  work:bf0040c0

    验证工作函数的参数是工作结构变量的首地址

    验证代码

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    
    {
      printk("%s is call!!  work:%p\r\n",__FUNCTION__,work);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
      //一安装模块就进行调度
      schedule_work(&mywork);
      printk("&mywork:%p\r\n",&mywork);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
    
      printk("mywork is exit!\r\n");
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");
    

    开发板演示效果

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [ 4208.410000] &mywork:bf0080c4
    
    [ 4208.410000] mywork_init is call!!mywork_func is call!!  work:bf0080c4

    运行结果表明:工作函数的参数是工作结构变量的首地址

    为什么将工作结构体指针传入工作函数内部呢?用一个全局变量直接赋值不更方便么?原来在实际驱动开发中,经常将工作结构封装进我们自定义的数据结构中,这个数据结构还有其他成员,将数据结构地址传入,相比全局变量应用更加方便,同时减少了全局变量的使用,在一定意义上方便了驱动开发人员编写代码。

    示例代码:

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //定义一个自定义数据结构       , 将工作结构体包含其中
    struct my_data {
    struct work_struct mywork;
    int x;
    int y;
    int z;
    };
    
    struct my_data mydata;
    
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    
    {
      struct my_data *p=(struct my_data *)work;
      printk("p->mywork is %p\r\n",&p->mywork);
      printk("x:%d y:%d z:%d\r\n",mydata.x,mydata.y,mydata.z);
      printk("p->x:%d,p->y:%d,p->z:%d\r\n",p->x,p->y,p->z);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    //DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
      //初始化相关变量
      mydata.x=123;
      mydata.y=456;
      mydata.z=789;
      //工作结构初始化
      INIT_WORK(&mydata.mywork,mywork_func);
    
      //一安装模块就进行调度
      schedule_work(&mydata.mywork);
      printk("mywork:%p\r\n",&mydata.mywork);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
      printk("mywork is exit!\r\n");
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");

    开发板演示效果:

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [  615.650000] p->mywork is bf004280
    [  615.650000] x:123 y:456 z:789
    [  615.650000] p->x:123,p->y:456,p->z:789
    [  615.650000] mywork:bf004280
    [  615.650000] mywork_init is call!!

    我们能观察到,通过全局变量打印的结果与通过指针传入打印的结果是相同的,如果采用指针传入的方式可以避免全局变量的使用,方便实用。由于传入的work工作结构恰好是结构体第一个成员,传入工作结构地址也就是传入整个结构的首地址,如果工作结构不是第一个成员那要怎样处理呢?

    驱动代码:

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //定义一个自定义数据结构       , 将工作结构体包含其中 在第二位
    struct my_data {
    int x;
    struct work_struct mywork;
    int y;
    int z;
    };
    
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    {
      struct my_data *p=(struct my_data *)((unsigned int)work-4);
      //打印工作数据结构地址
      printk("p->mywork is %p\r\n",&p->mywork);
      printk("p->x:%d,p->y:%d,p->z:%d\r\n",p->x,p->y,p->z);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    //DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
      //不使用全局变量  在函数其中定义
      struct my_data mydata;
    
      //初始化相关变量
      mydata.x=123;
      mydata.y=456;
      mydata.z=789;
      //工作结构初始化
      INIT_WORK(&mydata.mywork,mywork_func);
    
      //一安装模块就进行调度
      schedule_work(&mydata.mywork);
      //打印结构体首地址
      printk("mydata.x:%p\r\n",&mydata.x);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
      printk("mywork is exit!\r\n");
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");
    

    开发板运行效果:

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [ 1437.705000] p->mywork is ed063ea0
    [ 1437.705000] p->x:123,p->y:456,p->z:789
    [ 1437.705000] mydata.x:ed063e9c
    [ 1437.705000] mywork_init is call!!

    运行结果表明虽然传入的工作结构并不是数据结构的第一个成员,但是在工作函数中还原地址时,只需要将前面成员所占用地址减去就能得到数据结构的首地址,打印的结果也不会出现错误。但是!!这是在数据成员不复杂的情况下可以计算得出数据结构的首地址,如果数据结构成员很复杂、很多,计算起来就会很麻烦而且容易出错,因此,计算的办法不太好,怎么做?内核提供了一个计算宏可以帮我们算出成员所在数据结构的首地址

    首地址计算宏

    #define container_of(ptr, type, member) ({          \
        const typeof(((type *)0)->member) * __mptr = (ptr); \
        (type *)((char *)__mptr - offsetof(type, member)); })
    #endif

    功能:算出成员所在数据结构的首地址
    参数:
    ptr:已知结构的变量成员地址(&mydata.mywork
    type:已知结构所在结构的变量类型(struct my_data)
    member:结构体成员名(mywork)

    驱动代码

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //定义一个自定义数据结构       , 将工作结构体包含其中 在第二位
    struct my_data {
    int x;
    struct work_struct mywork;
    int y;
    int z;
    };
    struct my_data mydata;
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    {
      struct my_data *p=(struct my_data *)container_of(work,struct my_data,mywork);
      //打印工作数据结构地址
      printk("p->mywork is %p\r\n",&p->mywork);
      printk("p->x:%d,p->y:%d,p->z:%d\r\n",p->x,p->y,p->z);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    //DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
    
      //初始化相关变量
      mydata.x=123;
      mydata.y=456;
      mydata.z=789;
      //工作结构初始化
      INIT_WORK(&mydata.mywork,mywork_func);
    
      //一安装模块就进行调度
      schedule_work(&mydata.mywork);
      //打印结构体首地址
      printk("mydata.x:%p\r\n",&mydata.x);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
      printk("mywork is exit!\r\n");
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");
    

    开发板运行效果:

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [ 3151.620000] mydata.x:bf010258
    [ 3151.620000] mywork_init is call!!
    [ 3151.625000] p->mywork is bf01025c
    [ 3151.625000] p->x:123,p->y:456,p->z:789
    [root@ZC/zhangchao]#

    结果表明这个宏能起到计算首地址效果


    自定义内核工作队列

    自定义工作队列介绍

    根据前面的框架图可以知道,需要我们自己先创建一个工作队列,再往工作队列中添加工作结构。

    自定义工作队列API

    • create_workqueue(name)
      会在每个CPU创建一个cpu_workqueue_struct结构
      路径:workqueue.h linux-3.5\include\linux
    #define create_workqueue(name)                  \
        alloc_workqueue((name), WQ_MEM_RECLAIM, 1)

    补充alloc_workqueue:

    #ifdef CONFIG_LOCKDEP
    #define alloc_workqueue(fmt, flags, max_active, args...)    \
    ({                              \
        static struct lock_class_key __key;         \
        const char *__lock_name;                \
                                    \
        if (__builtin_constant_p(fmt))              \
            __lock_name = (fmt);                \
        else                            \
            __lock_name = #fmt;             \
                                    \
        __alloc_workqueue_key((fmt), (flags), (max_active), \
                      &__key, __lock_name, ##args); \
    })
    #else
    #define alloc_workqueue(fmt, flags, max_active, args...)    \
        __alloc_workqueue_key((fmt), (flags), (max_active), \
                      NULL, NULL, ##args)
    #endif

    作用:创建一个名为name的工作队列
    参数:工作队列的名字是一个字符串
    返回值:
    成功:创建的工作队列指针struct workqueue_struct *
    失败:返回NULL

    • create_singlethread_workqueue(name)
      作用和上面的相同区别在于,如果只创建一个cpu_workqueue_struct结构,其余属性也相同

    • queue_work
      int queue_work(struct workqueue_struct *wq, struct work_struct *work);
      作用:wq工作队列中添加work工作节点
      参数:wq自己定义的工作队列结构变量地址,work自己要添加的工作队列节点
      返回值:
      其实系统工作队列调用的也是这个函数,只不过系统工作队列只有一个在函数内部封装起来,不用我们自己去填写

    int schedule_work(struct work_struct *work)
    {
        return queue_work(system_wq, work);
    }
    • destory_workqueue
      当用户不需要自定义工作队列时,必须使用这个函数来销毁这个工作队列,释放资源。
      void destroy_workqueue(struct workqueue_struct *wq)
      作用:销毁自定义工作队列
      参数:wq自定一工作队列的指针

    驱动代码

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    //定义一个自定义数据结构       , 将工作结构体包含其中 在第二位
    struct my_data {
    int x;
    struct work_struct mywork;
    int y;
    int z;
    };
    struct my_data mydata;
    
    //创建一个自定义工作队列指针  用来承接创建等待队列成功的地址
    struct workqueue_struct *mywq;
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    {
      struct my_data *p=(struct my_data *)container_of(work,struct my_data,mywork);
      //打印工作数据结构地址
      printk("p->mywork is %p\r\n",&p->mywork);
      printk("p->x:%d,p->y:%d,p->z:%d\r\n",p->x,p->y,p->z);
    }
    //定义一个struct work_struct结构变量,并且进行初始化
    //DECLARE_WORK(mywork,mywork_func); //定义并且初始化
    
    
    static int __init mywork_init(void)
    {
      //初始化相关变量
      mydata.x=123;
      mydata.y=456;
      mydata.z=789;
      //创建自定义工作队列
      mywq=create_workqueue("mywq");
      if( mywq ==NULL)
        {
       printk("create_workqueue error\r\n");
       return -1;
      }
      printk("create_workqueue ok\r\n");
      //工作结构初始化
      INIT_WORK(&mydata.mywork,mywork_func);
      //一安装模块就进行调度
      //schedule_work(&mydata.mywork);
      queue_work(mywq,&mydata.mywork);
      //打印结构体首地址
      printk("mydata.x:%p\r\n",&mydata.x);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
      printk("mywork is exit!\r\n");
      destroy_workqueue(mywq);
    }
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");

    开发板运行效果

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [  287.095000] create_workqueue ok
    [  287.095000] mydata.x:bf0002ac
    [  287.095000] mywork_init is call!!
    [  287.095000] p->mywork is bf0002b0
    [  287.095000] p->x:123,p->y:456,p->z:789

    延时工作队列

    与前面说的共享队列,还是自定义工作队列不同,延时队列是在调度时,需要等待指定时间才会调用工作函数。

    延时队列数据结构

    路径:workqueue.h linux-3.5\include\linux
    内核使用一个delayed_work结构来描述一个要延期执行的工作,其定义如下:

    struct delayed_work {
        struct work_struct work;
        struct timer_list timer;
    };

    从结构可以知道:延时工作队列就是把普通工作队列结构和一个内核定时器结合在一起实现延长指定时间的才调度。

    延时队列相关API

    • DECLARE_DELAYED_WORK
      路径:workqueue.h linux-3.5\include\linux
    #define DECLARE_DELAYED_WORK(n, f)              \
        struct delayed_work n = __DELAYED_WORK_INITIALIZER(n, f)

    功能:定义一个名字为ndelayed_work结构变量,并且初始化它,工作函数是f
    参数:
    n:要定义的delayed_work结构变量名
    f:工作函数,就是要延期执行的代码

    • INIT_DELAYED_WORK
    #define INIT_DELAYED_WORK(_work, _func)             \
        do {                            \
            INIT_WORK(&(_work)->work, (_func));     \
            init_timer(&(_work)->timer);            \
        } while (0)

    功能:运行期间动态初始化一个delayed_struct结构
    参数:
    _work:初始化的要定义的delayed_struct结构
    _func:工作函数,要延期执行的代码

    • int schedule_delayed_work
      int schedule_delayed_work(struct delayed_work *work, unsigned long delay);
      功能:把一个delayed_struct添加到延时共享工作队列中,成为一个工作节点。
      参数:
      work:定义的delayed_struct结构地址
      delay:延时时间,时间单位时钟节拍jiffies
      返回值:0 表示已经挂接到共享工作队列上还未执行。非0 其他情况内核未做说明
    • queue_delayed_work
    int queue_delayed_work(struct workqueue_struct *wq,
                struct delayed_work *work, unsigned long delay);

    功能:把一个delayed_struct添加到自定义延时工作队列中,成为一个工作节点。
    参数:
    wq:自定义的延时工作队列结构地址
    work:定义的delayed_struct结构地址
    delay:延时时间,时间单位时钟节拍jiffies
    返回值:0 表示已经挂接到共享工作队列上还未执行。非0


    延时工作队列示例(使用内核共享工作队列)

    软件编程思想

    • 定义工作函数
    • 定义一个延时工作结构
    • 初始化上面定义的延时工作结构
    • 在适当的地方调度已经初始化好的延时工作结构

    编写代码 最简单的演示等待队列代码

    #include<linux/module.h>
    #include<linux/init.h>
    //添加头文件
    #include<linux/workqueue.h>
    
    
    
    //实现一个work_func工作函数
    void mywork_func(struct work_struct *work)
    {
    
      printk("12345678\r\n");
    }
    //分配结构体
    //struct delayed_work mydelayed_work;  动态初始化
    DECLARE_DELAYED_WORK(mydelayed_work, mywork_func);
    
    
    
    static int __init mywork_init(void)
    {
      //一安装模块就进行调度
      schedule_delayed_work(&mydelayed_work,2*HZ);
      printk("%s is call!!\r\n",__FUNCTION__);  
      return 0;
    }
    
    static void __exit mywork_exit(void)
    {
      printk("mywork is exit!\r\n");
    
    }
    struct delayed_work;
    
    module_init(mywork_init);
    module_exit(mywork_exit);
    MODULE_LICENSE("GPL");
    

    开发板演示效果

    [root@ZC/zhangchao]#insmod workqueue.ko 
    [ 5573.260000] mywork_init is call!!
    [root@ZC/zhangchao]#[ 5575.265000] 12345678

    其余功能与上面的工作队列差不多可自行测试

    展开全文
  • 工作队列 ( workqueue )

    2020-11-18 12:01:08
    在内核代码中,经常会遇到不能或不合适去马上调用某个处理过程,此时希望将该工作推送给某个内核线程执行,这样做的原因有很多,比如: 中断触发了某个过程的执行条件,而该过程执行时间较长或者会调用导致睡眠的...

    由来

    在内核代码中,经常会遇到不能或不合适去马上调用某个处理过程,此时希望将该工作推送给某个内核线程执行,这样做的原因有很多,比如:

    • 中断触发了某个过程的执行条件,而该过程执行时间较长或者会调用导致睡眠的函数,则该过程不应该在中断上下文中立即被调用。
    • 类似于中断,一些紧急性的任务不希望执行比较耗时的非关键过程,则需要把该过程提交到低优先级线程执行。比如一个轮询的通信接收线程,它需要快速完成检测和接收数据,而对数据的解析则应该交由低优先级线程慢慢处理。
    • 有时希望将一些工作集中起来以获取批处理的性能;或则合并缩减一些执行线程,减少资源消耗。

    基于以上需求,人们开发出了工作队列这一机制。工作队列不光在操作系统内核中会用到,一些应用程序或协议栈也会实现自己的工作队列。

    概念

    工作队列 ( workqueue )是将操作(或回调)延期异步执行的一种机制。工作队列可以把工作推后,交由一个内核线程去执行,并且工作队列是执行在线程上下文中,因此工作执行过程中可以被重新调度、抢占、睡眠。

    工作项(work item)是工作队列中的元素,是一个回调函数和多个回调函数输入参数的集合,有时也会有额外的属性成员,总之通过一个结构体即可记录和描述一个工作项。

    特性

    通过工作队列来执行一个工作相比于直接执行,会有一下特性:

    • 异步,工作不是在本中断或线程中执行,而是交由其他线程执行。
    • 延期,交由低优先级线程执行,执行前可能被抢占,也可能有其他工作排在前面执行,所以从提交工作队列到工作真正被执行之间会延时不定长时间。
    • 排队,FIFO 模式先到先执行。也肯会有多个优先级的工作队列,低优先级的工作队列要等到高优先级的工作队列全部执行完成后才能执行。但同一个工作队列内部的各项都是按时间先后顺序执行的额,不会进行钱赞重排。
    • 缓存,既然是队列它就能缓存多个项,需要异步执行丢进去就行,队列会逐个执行。虽然工作队列能缓存多个项,但也是有上限的当队列已满时,新的入队项就会被丢弃,丢弃的个数会被统计下来。

    实现

    队列本身可以通过数组加索引实现也可以通过链表来实现,但鉴于工作队列不存在抢占重排序的问题,则通过数组加索引的方式会更高效简洁。

    工作项可以通过一个结构体来定义,则工作队列的主体就是一个结构体数组。同时还有一些数据用来描述该结构体,如:队列深度,入队指针,出队指针,丢弃统计,错误统计等。

    工作队列实现的关键操作:

    • 创建和删除工作队列
    • 清空队列
    • 入队,或添加工作
    • 出队,或执行工作
    • 统计显示队列状态,包括丢弃个数
    • 删除或取消已入队但还未执行的工作

    SylixOS中的工作队列

    SylixOS提供两套工作队列接口,工作队列和系统工作队列。

    • 内核工作队列(JobQueue)位于内核文件libsylixos/SylixOS/kernel/core/_JobQueue.c中,是纯粹的针对工作队列对象的一些操作集合,供内核使用,系统工作队列和网络工作队列都是基于内核工作队列实现的。 SylixOS 内核工作队列说明见博客《SylixOS 内核工作队列》

    • 系统工作队列(Work Queue)位于libsylixos\SylixOS\kernel\interface\WorkQueue.c中,供驱动和应用层使用。其中系统工作队列基于内核工作队列实现,并结合了执行线程创建、同步保护,延迟执行等相关操作。SylixOS 系统工作队列说明见博客《SylixOS 系统工作队列》

    系统工作队列相比于内核工作队列,不仅提供了执行线程,还提供了延时执行的功能,更便于被直接调用,是一种系统级调用接口。内核工作队列提供的则是更基础的调用方法,是一种半成品,有些不适合直接使用系统工作队列的地方,就需要在内核工作队列的基础上构造模块自己独享的工作队列,如网络接收处理和系统中断下半部执行等都是基于内核工作队列单独实现的。

    展开全文
  • 工作队列(work queue)是Linux内核中将操作延期执行的一种机制。因为它们是通过守护进程在用户上下文执行,函数可以睡眠的时间,与内核是无关的。在内核版本2.5开发期间,设计了工作队列,用以替换此前的keventd机制...

    在linux中,当你想延时几秒或者几毫秒再执行一个任务或者自定义的一个函数时,延时工作队列是你最好的选择。在你的任务或者函数中,加上queue_delayed_work,就可以每隔一段时间执行一次你的任务或者自定义的一个函数,具体实现如下:

    按如下步骤:

    首先还是要添加上工作队列的相关头文件,头文件一般有这些函数的声明和定义,include了相关的头文件,我们才能随心所欲的使用这些头文件里面的函数,达到我们的目的:

    #include <linux/workqueue.h>
    

    一.声明一个工作队列

    static struct workqueue_struct *test_wq;
    

    二.声明一个延期工作描述实例

    static struct delayed_work test_delay_wq;
    

    三.声明并实现一个工作队列延迟处理函数

    void test_power_delay_work_func(struct work_struct *work)
     {
            int value;
    
       value = get_adc_sample(0, CHAN_1);  //这里调用adc接口去读取数据
        pr_err("-czd-: current power adc value is %d\n", value);
    
       queue_delayed_work(test_wq, &test_delay_wq, msecs_to_jiffies(5000)); //加上这句就可以每隔5秒钟执行一次这个函数test_power_delay_work_func,不加就只执行一次,时间msecs_to_jiffies(5000)可以设置别的时间,这里是5秒,相关部分想了解更多可以百度
     }
    

    四.初始化一个工作队列(在init模块加载函数添加或者在probe函数添加)

    test_wq = create_workqueue("test_wq");
    if (!test_wq) {  
            printk(-czd-: No memory for workqueue\n");  
            return 1;     
        } 
    

    五. 任务初始化(在init模块加载函数添加或者在probe函数添加)

    INIT_DELAYED_WORK(&test_delay_wq, test_power_delay_work_func);
    

    六.向工作队列提交工作项(在init模块加载函数添加或者在probe函数添加)

    ret = queue_delayed_work(test_wq, &test_delay_wq, msecs_to_jiffies(5000));
    pr_err("-czd-: ret=%d\n", ret);
    

    七.取消工作队列中的工作项

    int cancel_delayed_work(test_wq);
    

    如果这个工作项在它开始执行前被取消,返回值是非零。内核保证给定工作项的执行不会在调用 cancel_delay_work 成功后被执行。 如果 cancel_delay_work 返回 0,则这个工作项可能已经运行在一个不同的处理器,并且仍然可能在调用 cancel_delayed_work 之后被执行。要绝对确保工作函数没有在 cancel_delayed_work 返回 0 后在任何地方运行,你必须跟随这个调用之后接着调用 flush_workqueue。在 flush_workqueue 返回后。任何在改调用之前提交的工作函数都不会在系统任何地方运行。

    八.刷新工作队列

    flush_workqueue(test_wq);
    

    九.工作队列销毁

    destroy_workqueue(test_wq);
    

    七八九可以同时添加到exit函数中,例如以下:

    static void __exit module_exit(void)  
    {  
    
       int ret;  
        ret = cancel_delayed_work(&test_dwq);  
        flush_workqueue(test_wq);
        destroy_workqueue(test_wq);
       printk("-czd-: enter %s, ret=%d\n", __func__,  ret);  
    }
    

    除了上面调用的queue_delayed_work之外,使用schedule_delayed_work也是可以的。其实schedule_delayed_work最终返回调用的还是queue_delayed_work。函数声明在include/linux/workqueue.h中。

     static inline bool schedule_delayed_work(struct delayed_work *dwork,
                                              unsigned long delay)
     {
             return queue_delayed_work(system_wq, dwork, delay);
     }
    

    下面的demo是实现3秒之后再执行work_queue:

    	#include <linux/workqueue.h>
    	static struct delayed_work send_event; //定义一个delay_work
    
    
    	static void send_event_workq(struct work_struct *work)  //定义你要延时执行的函数
    	{
        rk_send_wakeup_key();
    	   printk("***************charger mode send wakeup key\n\n");
    	   schedule_delayed_work(&send_event, msecs_to_jiffies(3000)); //添加之后每隔3秒执行一次
    	}
    
    
    	 static int  __init module_init(void)
    	 {
    
     	  INIT_DELAYED_WORK(&send_event, send_event_workq); //初始化工作队列
     	   schedule_delayed_work(&send_event, msecs_to_jiffies(3000)); //添加到延时工作队列,这里延时3秒
    	}
    
    	static void __exit module_exit(void)  
    	{  
     
        cancel_delayed_work_sync(&send_event);  //取消延时工作队列
    	} 
    

    下面是创建一个工作队列的demo 代码:

    	#include <linux/workqueue.h>
    	void my_func(void *data)
    	{
        char *name = (char *)data;
        printk(KERN_INFO “Hello world, my name is %s!\n”, name);
    	}
    
    	struct workqueue_struct *my_wq = create_workqueue(“my wq”);
    	struct work_struct my_work;
    
    	INIT_WORK(&my_work, my_func);
    	queue_work(my_wq, &my_work);
    
    	destroy_workqueue(my_wq);
    
    展开全文
  • Nuttx 工作队列 work queue

    千次阅读 2017-04-19 14:26:23
    在Linux操作系统中,工作队列(work queue)是Linux kernel中将工作推后执行的一种机制。这种机制和BH或Tasklets不同之处在于工作队列是把推后的工作交由一个内核线程去执行,因此工作队列的优势就在于它允许重新调度...
  • c#实现的工作队列,workquere,多线程管理-c# realization of the work queue, workquere, multi-threaded management
  • RabbitMQ官网介绍了,它支持六种应用场景:简单队列、工作队列、发布/订阅、路由模式、Topics主题模式、RPC,接下来分别介绍。 创建一个Maven项目命名rabbitmq,并引入rabbitmq依赖。 &lt;dependency&...
  • Linux 工作队列和等待队列

    千次阅读 2018-03-22 10:17:08
    schedule_work调度执行一个具体的任务,执行的任务...输入参数:@ workqueue_struct:指定的workqueue指针@work_struct:具体任务对象指针Linux 工作队列和等待队列的区别等待队列在内核中有很多用途,尤其适合用于...
  • 中断的上半部和下半部——工作队列中断的上半部和下半部——工作队列
  • redis-priority-queue是一个简单的工作队列,类似于具有以下新增功能: 可以添加具有优先级的项目(介于-9007199254740992和9007199254740992之间) 队列会自动进行重复数据删除(重复的项目在推送时会作废) ...
  • 工作队列(Work Queue)

    千次阅读 2019-03-17 14:25:41
    工作队列(Work Queue) 工作队列是一种企业级任务管理协同机制。在RPA领域,工作队列通常指将以业务视角出发的单一工作任务放入工作队列池,再按需执行的过程。这些单一工作任务,往往是指每一笔工单,每一笔业务...
  • 内核工作队列workqueue 简述

    千次阅读 2019-01-15 20:14:36
    一 引入工作队列(work queue) 之前聊过Linux中断机制分为上半部中断(硬中断)和下半部,顶半部中断用于完成比较紧急的功能,往往只是简单的读取寄存器中的中断状态,并在清除中断标志后,启动下半部,下半部需要...
  • 理解 linux 工作队列

    千次阅读 2017-06-05 21:29:17
    对 linux 工作队列的理解
  • C11 + Pthreads 原子有界工作队列 这是一个使用 C11 的stdatomic.h特性提供单写入器、多读取器无锁队列 (lqueue) 的小型库。 该队列使用 POSIX 线程和信号量封装在多线程工作队列 (wqueue) 中。 功能指针/参数元组...
  • 主要介绍了java线程池工作队列饱和策略代码示例,涉及线程池的简单介绍,工作队列饱和策略的分析及代码示例,具有一定参考价值,需要的朋友可以了解下。
  • Linux工作队列实现机制

    千次阅读 2016-06-30 22:18:26
     把推后执行的任务叫做工作(work),描述它的数据结构为work_struct ,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct ,而工作线程就是负责执行工作队列中的工作。系统默
  • zephyr笔记 2.1.5 工作队列线程

    千次阅读 2018-04-18 22:10:35
    我正在学习 Zephyr,一个很可能会用到很多物联网设备上的操作系统,如果你也感兴趣... ISR或高优先级线程通常使用工作队列来将非紧急处理卸载到较低优先级的线程,因此不会影响对时间敏感的处理事务。 http://docs....
  • 面试题之小任务与工作队列的区别

    千次阅读 2016-12-09 16:41:48
    如前所述,我们把推后执行的任务叫做工作(work),描述它的数据结构为work_struct,这些工作以队列结构组织成工作队列(workqueue),其数据结构为workqueue_struct,而工作线程就是负责执行工作队列中的工作。...
  • 分配了另一个工作队列,但更好的分配器是一种工作队列,其目的是:快速,可靠,功能丰富,并针对开发人员的需求进行量身定制。 主要功能不会影响稳定性或重新分配其他工作队列,但是更好的分配是一种工作队列,其...
  • Linux 工作队列和等待队列的区别

    千次阅读 2018-05-24 11:26:33
    wait queue是一种「任务队列」,可以把一些进程放在上面睡眠等待某个事件,强调静态多一些,重点在queue上,即它就是一个queue,这个queue如何调度,什么时候调度并不重要。对这2个容易混淆的队列做简单概念上的区别...
  • 学到更多。 开发更多。 连接更多。 新的developerWorks Premium会员计划可通过Safari图书在线获得对强大的开发工具和资源的无障碍访问权,其中包括500个顶级技术标题(数十个专门针对Java开发人员),主要开发人员...
  • RabbitMQ是一个消息队列服务器,在本文中我们将学习到Python+Pika+RabbitMQ环境部署及实现工作队列的实例教程,需要的朋友可以参考下
  • tasklet 和 工作队列

    千次阅读 2015-05-14 14:27:11
    该函数会在工作线程的上下文运行,因此如果有必要,它可以休眠,当然我们应该仔细考虑休眠会不会影响提交到同一工作队列的其他任务。但是函数不能访问用户空间,这是因为它运行在内核线程,而该线程没有对应的用户...
  • 队列遵循先进先出,那么其实跟链表的尾插就类似的,正好,利用这个...这个等待队列可以设计为以下数据结构:工作者结构+基本队列链式结构所以可以设计出以下结构体://工作者结构 typedef struct __work_st { //工作

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 548,297
精华内容 219,318
关键字:

工作队列