精华内容
下载资源
问答
  • springboot添加task异步任务队列
    2021-11-08 09:29:54
    public interface Itask {
        void run();
    }
    
    public class MyTask implements Itask {
        public MyTask () {
        }
    
    	//要执行的代码逻辑
        @Override
        public void run() {
            System.out.println("MyTask");
        }
    }
    
    public class TaskExecutor extends Thread {
    
        private BlockingQueue taskQueue;
        private Boolean isRunning = true;
    
        public TaskExecutor(BlockingQueue taskQueue) {
            this.taskQueue = taskQueue;
        }
    
        // 退出。
        public void quit() {
            isRunning = false;
            interrupt();
        }
        @Override
        public void run() {
            while (isRunning) { // 如果是执行状态就待着。
                Itask iTask;
                try {
                    iTask = (Itask) taskQueue.take(); // 下一个任务,没有就等着。
                } catch (InterruptedException e) {
                    if (!isRunning) {
                        // 发生错误中断代码
                        interrupt();
                        break; 
                    }
                    // 发生意外了,不是退出状态,那么窗口继续等待。
                    continue;
                }
    
                // 执行任务
                iTask.run();
            }
        }
    }
    
    public class TaskQueue {
        // 任务队列
        private BlockingQueue mTaskQueue;
        // 执行器
        private TaskExecutor[] mTaskExecutors;
    
        // 创建队列时,指定执行器数量,保证你开的多个线程是否需要等待
        public TaskQueue(int size) {
            mTaskQueue = new LinkedBlockingQueue<>();
            mTaskExecutors = new TaskExecutor[size];
        }
    
        // 开启队列。
        public void start() {
            stop();
            for (int i = 0; i < mTaskExecutors.length; i++) {
                mTaskExecutors[i] = new TaskExecutor(mTaskQueue);
                mTaskExecutors[i].start();
            }
        }
    
        // 关闭队列。
        public void stop() {
            if (mTaskExecutors != null)
                for (TaskExecutor taskExecutor : mTaskExecutors) {
                    if (taskExecutor != null) taskExecutor.quit();
                }
        }
    
        //添加任务到队列。
        public  int add(Itask task) {
            if (!mTaskQueue.contains(task)) {
                mTaskQueue.add(task);
            }
            // 返回队列中的任务数
            return mTaskQueue.size();
        }
    }
    
    public class Test{
    	public static void main(String[] args) {
            TaskQueue tq = new TaskQueue(1);
            MyTask myTask= new MyTask ();
            tq.add(myTask);
            tq.start();
      
        }
    }
    
    更多相关内容
  • 机械机械是基于分布式消息传递的异步任务队列/作业队列。 V2实验的第一步配置自定义Logger服务器工作程序任务注册任务机械机械是基于分布式消息传递的异步任务队列/作业队列。 V2实验的第一步配置自定义Logger服务器...
  • 本文实例讲述了PHP扩展Swoole实现实时异步任务队列。分享给大家供大家参考,具体如下: 假如要发100封邮件,for循环100遍,用户直接揭竿而起,什么破网站!...这便是定时异步任务队列。但当前提交的任务要一分钟后才能
  • 在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友。 一、Django中的...
  • Celery是异步任务队列,可以独立于主进程运行,在主进程退出后,也不影响队列中的任务执行。 任务执行异常退出,重新启动后,会继续执行队列中的其他任务,同时可以缓存停止期间接收的工作任务,这个功能依赖于消息...
  • 将这个任务添加到队列中 立即返回「操作成功,正在后台处理」的字样 后台消费这个队列,执行这个任务 我们按照这个思路,借助 Celery 进行实现。 实现 本文所使用的环境如下: Python 3.6.7 RabbitMQ 3.8 ...
  • 在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友。 一、Django中的...

      前段时间在Django Web平台开发中,碰到一些请求执行的任务时间较长(几分钟),为了加快用户的响应时间,因此决定采用异步任务的方式在后台执行这些任务。在同事的指引下接触了Celery这个异步任务队列框架,鉴于网上关于Celery和Django结合的文档较少,大部分也只是粗粗介绍了大概的流程,在实践过程中还是遇到了不少坑,希望记录下来帮助有需要的朋友。

    一、Django中的异步请求

    Django Web中从一个http请求发起,到获得响应返回html页面的流程大致如下:http请求发起 -- http handling(request解析) -- url mapping(url正则匹配找到对应的View) -- 在View中进行逻辑的处理、数据计算(包括调用Model类进行数据库的增删改查)--将数据推送到template,返回对应的template/response。

                             图1. Django架构总览

    同步请求:所有逻辑处理、数据计算任务在View中处理完毕后返回response。在View处理任务时用户处于等待状态,直到页面返回结果。

    异步请求:View中先返回response,再在后台处理任务。用户无需等待,可以继续浏览网站。当任务处理完成时,我们可以再告知用户。

    二、关于Celery

      Celery是基于Python开发的一个分布式任务队列框架,支持使用任务队列的方式在分布的机器/进程/线程上执行任务调度。

                图2. Celery架构

      图2展示的是Celery的架构,它采用典型的生产者-消费者模式,主要由三部分组成:broker(消息队列)、workers(消费者:处理任务)、backend(存储结果)。实际应用中,用户从Web前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列broker中,由空闲的worker去处理任务即可,处理的结果会暂存在后台数据库backend中。我们可以在一台机器或多台机器上同时起多个worker进程来实现分布式地并行处理任务。

    三、Django中Celery的实现

      在实际使用过程中,发现Celery在Django里的实现与其在一般.py文件中的实现还是有很大差别,Django有其特定的使用Celery的方式。这里着重介绍Celery在Django中的实现方法,简单介绍与其在一般.py文件中实现方式的差别。

      1. 建立消息队列

      首先,我们必须拥有一个broker消息队列用于发送和接收消息。Celery官网给出了多个broker的备选方案:RabbitMQ、Redis、Database(不推荐)以及其他的消息中间件。在官网的强力推荐下,我们就使用RabbitMQ作为我们的消息中间人。在Linux上安装的方式如下:

    sudo apt-get install rabbitmq-server

      命令执行成功后,rabbitmq-server就已经安装好并运行在后台了。

      另外也可以通过命令rabbitmq-server -detached来在后台启动rabbitmq server以及命令rabbitmqctl stop来停止server。

      更多的命令可以参考rabbitmq官网的用户手册:https://www.rabbitmq.com/manpages.html

      2. 安装django-celery

    pip install celery
    pip install django-celery

      3. 配置settings.py

      首先,在Django工程的settings.py文件中加入如下配置代码:

    import djcelery
    djcelery.setup_loader()
    BROKER_URL= 'amqp://guest@localhost//'
    CELERY_RESULT_BACKEND = 'amqp://guest@localhost//'

      其中,当djcelery.setup_loader()运行时,Celery便会去查看INSTALLD_APPS下包含的所有app目录中的tasks.py文件,找到标记为task的方法,将它们注册为celery task。BROKER_URL和CELERY_RESULT_BACKEND分别指代你的Broker的代理地址以及Backend(result store)数据存储地址。在Django中如果没有设置backend,会使用其默认的后台数据库用来存储数据。注意,此处backend的设置是通过关键字CELERY_RESULT_BACKEND来配置,与一般的.py文件中实现celery的backend设置方式有所不同。一般的.py中是直接通过设置backend关键字来配置,如下所示:

    app = Celery('tasks', backend='amqp://guest@localhost//', broker='amqp://guest@localhost//')

      然后,在INSTALLED_APPS中加入djcelery:

    INSTALLED_APPS = (
        ……   
        'qv',
        'djcelery'
        ……   
    )   

      4. 在要使用该任务队列的app根目录下(比如qv),建立tasks.py,比如:

      在tasks.py中我们就可以编码实现我们需要执行的任务逻辑,在开始处import task,然后在要执行的任务方法开头用上装饰器@task。需要注意的是,与一般的.py中实现celery不同,tasks.py必须建在各app的根目录下,且不能随意命名。

      5. 生产任务

      在需要执行该任务的View中,通过build_job.delay的方式来创建任务,并送入消息队列。比如:

      6. 启动worker的命令

    #先启动服务器
    python manage.py runserver
    #再启动worker 
    python manage.py celery worker -c 4 --loglevel=info

    四、补充

      Django下要查看其他celery的命令,包括参数配置、启动多worker进程的方式都可以通过python manage.py celery --help来查看:

       另外,Celery提供了一个工具flower,将各个任务的执行情况、各个worker的健康状态进行监控并以可视化的方式展现,如下图所示:

      Django下实现的方式如下: 

      1. 安装flower:

    pip install flower

      2. 启动flower(默认会启动一个webserver,端口为5555):

    python manage.py celery flower

      3. 进入http://localhost:5555即可查看。

    展开全文
  • 主要介绍了Python Celery异步任务队列使用方法解析,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。 celery用于生产系统每天处理数以百万计的任务。 celery是用Python编写的,但该协议可以在任何语言实现。它...
  • 机械是基于分布式消息传递的异步任务队列/作业队列。 V2实验 请注意,V2正在开发中,并且可能并且会发生重大更改,直到准备就绪为止。 您可以使用当前的V2,以避免必须导入未使用的代理和后端的所有依赖项。 代替...
  • YTask -- Go 异步任务队列

    千次阅读 2019-09-06 10:26:41
    YTask 是 Go 的异步任务队列,比起其他框架更方便快捷。 架构图: 特性: 支持几乎所有类型,包括基本类型(int, floalt, string),数组切片,结构体以及复杂的结构体嵌套。 注册任务,调用任务一行代码完成...

    YTask 是 Go 的异步任务队列,比起其他框架更方便快捷。

    架构图:

    architecture_diagram

    特性:

    • 支持几乎所有类型,包括基本类型(int, floalt, string),数组切片,结构体以及复杂的结构体嵌套。
    • 注册任务,调用任务一行代码完成,不需要对参数进行而外处理。
    • 优雅的启动与结束方式,能1秒结束任务(如果你用过其他的框架(比如gocelery,machinery)会发现就算没有任务,他们也没法立即结束任务,而是需要等几秒)

    GitHub:

    https://github.com/gojuukaze/YTask

    安装

    go get github.com/gojuukaze/YTask/v2

    注册任务

    type User struct{
        ...
    }
    // 任务函数
    func DemoFunc(a int, b float64, c []string, user User) (int, []User, string) {
        ....
        return ....
    }
    
    ...
    
    ser.Add("group1", "demo_func", DemoFunc)

    调用任务

    taskId, _ = client.Send("group1", "demo_func", 11, 22.2, []string{"bb", "cc"}, User{"hh",24})
    

    获取结果

    result, _ = client.GetResult(taskId, 2*time.Second, 300*time.Millisecond)
    var a int
    var b []User
    var c string
    a, _ = result.GetInt64(0)
    // or
    result.Get(1,&b)
    // or
    result.Gets(&a, &b, &c)

     

    展开全文
  • 一、异步任务队列原理 我们可以把“处理单条数据”理解为一个异步任务,因此对这十万条数据的处理,就可以转化成有十万个异步任务等待进行。我们可以把这十万条数据塞到一个队列里面,让任务处理器自发地从队列里面...

    关于本文 作者:@jrainlau 原文:https://segmentfault.com/a/1190000037567355

    在最近的业务中,接到了一个需要处理约十万条数据的需求。这些数据都以字符串的形式给到,并且处理它们的步骤是异步且耗时的(平均处理一条数据需要 25s 的时间)。如果以串行的方式实现,其耗时是相当长的:

    总耗时时间 = 数据量 × 单条数据处理时间 T = N * t (N = 100,000; t = 25s)

    总耗时时间 = 2,500,000 秒 ≈ 695 小时 ≈ 29 天

    显然,我们不能简单地把数据一条一条地处理。那么有没有办法能够减少处理的时间呢?经过调研后发现,使用异步任务队列是个不错的办法。

    一、异步任务队列原理

    我们可以把“处理单条数据”理解为一个异步任务,因此对这十万条数据的处理,就可以转化成有十万个异步任务等待进行。我们可以把这十万条数据塞到一个队列里面,让任务处理器自发地从队列里面去取得并完成。

    任务处理器可以有多个,它们同时从队列里面把任务取走并处理。当任务队列为空,表示所有任务已经被认领完;当所有任务处理器完成任务,则表示所有任务已经被处理完。

    其基本原理如下图所示:

    首先来解决任务队列的问题。在这个需求中,任务队列里面的每一个任务,都包含了待处理的数据,数据以字符串的形式存在。为了方便起见,我们可以使用 Redis 的 List 数据格式来存放这些任务。

    由于项目是基于 NodeJS 的,我们可以利用 PM2 的 Cluster 模式来启动多个任务处理器,并行地处理任务。以一个 8 核的 CPU 为例,如果完全开启了多进程,其理论处理时间将提升 8 倍,从 29 天缩短到 3.6 天。

    接下来,我们会从实际编码的角度来讲解上述内容的实现过程。

    二、使用 NodeJS 操作 Redis

    异步任务队列使用 Redis 来实现,因此我们需要部署一个单独的 Redis 服务。在本地开发中为了快速完成 Redis 的安装,我使用了 Docker 的办法(默认机器已经安装了 Docker)。

    Docker 拉取 Redis 镜像

    docker pull redis:latest
    

    Docker 启动 Redis

    docker run -itd --name redis-local-p 6379:6379 redis
    

    此时我们已经使用 Docker 启动了一个 Redis 服务,其对外的 IP 及端口为 127.0.0.1:6379。此外,我们还可以在本地安装一个名为 Another Redis DeskTop Manager的 Redis 可视化工具,来实时查看、修改 Redis 的内容。

    在 NodeJS 中,我们可以使用 node-redis 来操作 Redis。新建一个 mqclient.ts 文件并写入如下内容:

    import* asRedisfrom'redis'
    const client = Redis.createClient({
      host: '127.0.0.1',
      port: 6379
    })
    exportdefault client
    

    Redis 本质上是一个数据库,而我们对数据库的操作无非就是增删改查。node-redis 支持 Redis 的所有交互操作方式,但是操作结果默认是以回调函数的形式返回。为了能够使用 async/await,我们可以新建一个 utils.ts 文件,把 node-redis 操作 Redis 的各种操作都封装成 Promise 的形式,方便我们后续使用。

    import client from'./mqClient'
    // 获取 Redis 中某个 key 的内容
    exportconst getRedisValue = (key: string): Promise<string| null> => newPromise(resolve => client.get(key, (err, reply) => resolve(reply)))
    // 设置 Redis 中某个 key 的内容
    exportconst setRedisValue = (key: string, value: string) => newPromise(resolve => client.set(key, value, resolve))
    // 删除 Redis 中某个 key 及其内容
    exportconst delRedisKey = (key: string) => newPromise(resolve => client.del(key, resolve))
    

    除此之外,还能在 utils.ts 中放置其他常用的工具方法,以实现代码的复用、保证代码的整洁。

    为了在 Redis 中创建任务队列,我们可以单独写一个 createTasks.ts 的脚本,用于往队列中塞入自定义的任务。

    import{ TASK_NAME, TASK_AMOUNT, setRedisValue, delRedisKey } from'./utils'
    import client from'./mqClient'
    client.on('ready', async() => {
    await delRedisKey(TASK_NAME)
    for(let i = TASK_AMOUNT; i > 0; i--) {
        client.lpush(TASK_NAME, `task-${i}`)
    }
      client.lrange(TASK_NAME, 0, TASK_AMOUNT, async(err, reply) => {
    if(err) {
          console.error(err)
    return
    }
        console.log(reply)
        process.exit()
    })
    })
    

    在这段脚本中,我们从 utils.ts 中获取了各个 Redis 操作的方法,以及任务的名称 TASKNAME (此处为 localtasks)和任务的总数 TASKAMOUNT(此处为 20 个)。通过 LPUSH 方法往 TASKNAME 的 List 当中塞入内容为 task-1 到 task-20 的任务,如图所示:

    三、异步任务处理

    首先新建一个 index.ts 文件,作为整个异步任务队列处理系统的入口文件。

    import taskHandler from'./tasksHandler'
    import client from'./mqClient'
    client.on('connect', () => {
      console.log('Redis is connected!')
    })
    client.on('ready', async() => {
      console.log('Redis is ready!')
    await taskHandler()
    })
    client.on('error', (e) => {
      console.log('Redis error! '+ e)
    })
    

    在运行该文件时,会自动连接 Redis,并且在 ready 状态时执行任务处理器 taskHandler()。

    在上一节的操作中,我们往任务队列里面添加了 20 个任务,每个任务都是形如 task-n 的字符串。为了验证异步任务的实现,我们可以在任务处理器 taskHandler.ts 中写一段 demo 函数,来模拟真正的异步任务:

    function handleTask(task: string) {
    returnnewPromise((resolve) => {
          setTimeout(async() => {
            console.log(`Handling task: ${task}...`)
            resolve()
    }, 2000)
    })
    }
    

    上面这个 handleTask() 函数,将会在执行的 2 秒后打印出当前任务的内容,并返回一个 Promise,很好地模拟了异步函数的实现方式。接下来我们将会围绕这个函数,来处理队列中的任务。

    其实到了这一步为止,整个异步任务队列处理系统已经基本完成了,只需要在 taskHandler.ts 中补充一点点代码即可:

    import{ popTask } from'./utils'
    import client from'./mqClient'
    function handleTask(task: string) { /* ... */}
    exportdefaultasyncfunction tasksHandler() {
    // 从队列中取出一个任务
    const task = await popTask()
    // 处理任务
    await handleTask(task)
    // 递归运行
    await tasksHandler()
    }
    

    最后,我们使用 PM2 启动 4 个进程,来试着跑一下整个项目:

    pm2 start ./dist/index.js -i 4&& pm2 logs
    

    可以看到,4 个任务处理器分别处理完了队列中的所有任务,相互之前互不影响。

    事到如今已经大功告成了吗?未必。为了测试我们的这套系统到底提升了多少的效率,还需要统计完成队列里面所有任务的总耗时。

    四、统计任务完成耗时

    要统计任务完成的耗时,只需要实现下列的公式即可:

    总耗时 = 最后一个任务的完成时间 - 首个任务被取得的时间

    首先来解决“获取首个任务被取得的时间”这个问题。

    由于我们是通过 PM2 的 Cluster 模式来启动应用的,且从 Redis 队列中读取任务是个异步操作,因此在多进程运行的情况下无法直接保证从队列中读取任务的先后顺序,必须通过一个额外的标记来判断。其原理如下图:

    如图所示,绿色的 worker 由于无法保证运行的先后顺序,所以编号用问号来表示。当第一个任务被取得时,把黄色的标记值从 false 设置成 true。当且仅当黄色的标记值为 false 时才会设置时间。这样一来,当其他任务被取得时,由于黄色的标记值已经是 true 了,因此无法设置时间,所以我们便能得到首个任务被取得的时间。

    在本文的例子中,黄色的标记值和首个任务被取得的时间也被存放在 Redis 中,分别被命名为 localtasksSETFIRST 和 localtasksBEGINTIME。

    原理已经弄懂,但是在实践中还有一个地方值得注意。我们知道,从 Redis 中读写数据也是一个异步操作。由于我们有多个 worker 但只有一个 Redis,那么在读取黄色标记值的时候很可能会出现“冲突”的问题。举个例子,当 worker-1 修改标记值为 true 的同时, worker-2 正好在读取标记值。由于时间的关系,可能 worker-2 读到的标记值依然是 false,那么这就冲突了。为了解决这个问题,我们可以使用 node-redlock 这个工具来实现“锁”的操作。

    顾名思义,“锁”的操作可以理解为当 worker-1 读取并修改标记值的时候,不允许其他 worker 读取该值,也就是把标记值给锁住了。当 worker-1 完成标记值的修改时会释放锁,此时才允许其他的 worker 去读取该标记值。

    node-redlock 是 Redis 分布式锁 Redlock 算法的 JavaScript 实现,关于该算法的讲解可参考 https://redis.io/topics/distlock。值得注意的是,在 node-redlock 在使用的过程中,如果要锁一个已存在的 key,就必须为该 key 添加一个前缀 locks:,否则会报错。

    回到 utils.ts,编写一个 setBeginTime() 的工具函数:

    exportconst setBeginTime = async(redlock: Redlock) => {
    // 读取标记值前先把它锁住
    constlock= await redlock.lock(`lock:${TASK_NAME}_SET_FIRST`, 1000)
    const setFirst = await getRedisValue(`${TASK_NAME}_SET_FIRST`)
    // 当且仅当标记值不等于 true 时,才设置起始时间
    if(setFirst !== 'true') {
        console.log(`${pm2tips} Get the first task!`)
    await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'true')
    await setRedisValue(`${TASK_NAME}_BEGIN_TIME`, `${new Date().getTime()}`)
    }
    // 完成标记值的读写操作后,释放锁
    awaitlock.unlock().catch(e => e)
    }
    

    然后把它添加到 taskHandler() 函数里面即可:

    exportdefaultasyncfunction tasksHandler() {
    +  // 获取第一个任务被取得的时间
    +  await setBeginTime(redlock)
    // 从队列中取出一个任务
    const task = await popTask()
    // 处理任务
    await handleTask(task)
    // 递归运行
    await tasksHandler()
    }
    

    接下来解决“最后一个任务的完成时间”这个问题。

    类似上一个问题,由于任务执行的先后顺序无法保证,异步操作的完成时间也无法保证,因此我们也需要一个额外的标识来记录任务的完成情况。在 Redis 中创建一个初始值为 0 的标识 localtasksCURINDEX,当 worker 完成一个任务就让标识加。由于任务队列的初始长度是已知的(为 TASKAMOUNT 常量,也写入了 Redis 的 localtasksTOTAL 中),因此当标识的值等于队列初始长度的值时,即可表明所有任务都已经完成。

    如图所示,被完成的任务都会让黄色的标识加一,任何时候只要判断到标识的值等于队列的初始长度值,即可表明任务已经全部完成。

    回到 taskHandler() 函数,加入下列内容:

    exportdefaultasyncfunction tasksHandler() {
    +  // 获取标识值和队列初始长度
    +  let curIndex = Number(await getRedisValue(`${TASK_NAME}_CUR_INDEX`))
    +  const taskAmount = Number(await getRedisValue(`${TASK_NAME}_TOTAL`))
    +  // 等待新任务
    +  if(taskAmount === 0) {
    +    console.log(`${pm2tips} Wating new tasks...`)
    +    await sleep(2000)
    +    await tasksHandler()
    +    return
    +  }
    +  // 判断所有任务已经完成
    +  if(curIndex === taskAmount) {
    +    const beginTime = await getRedisValue(`${TASK_NAME}_BEGIN_TIME`)
    +    // 获取总耗时
    +    const cost = newDate().getTime() - Number(beginTime)
    +    console.log(`${pm2tips} All tasks were completed! Time cost: ${cost}ms. ${beginTime}`)
    +    // 初始化 Redis 的一些标识值
    +    await setRedisValue(`${TASK_NAME}_TOTAL`, '0')
    +    await setRedisValue(`${TASK_NAME}_CUR_INDEX`, '0')
    +    await setRedisValue(`${TASK_NAME}_SET_FIRST`, 'false')
    +    await delRedisKey(`${TASK_NAME}_BEGIN_TIME`)
    +    await sleep(2000)
    +    await tasksHandler()
    }
    // 获取第一个任务被取得的时间
    await setBeginTime(redlock)
    // 从队列中取出一个任务
    const task = await popTask()
    // 处理任务
    await handleTask(task)
    + // 任务完成后需要为标识位加一
    +  try{
    +    constlock= await redlock.lock(`lock:${TASK_NAME}_CUR_INDEX`, 1000)
    +    curIndex = await getCurIndex()
    +    await setCurIndex(curIndex + 1)
    +    awaitlock.unlock().catch((e) => e)
    +  } catch(e) {
    +    console.log(e)
    +  }
    +  // recursion
    +  await tasksHandler()
    +}
    // 递归运行
    await tasksHandler()
    }
    

    到这一步为止,我们已经解决了获取“最后一个任务的完成时间”的问题,再结合前面的首个任务被取得的时间,便能得出运行的总耗时。

    最后来看一下实际的运行效果。我们循例往队列里面添加了 task-1 到 task-20 这 20 个任务,然后启动 4 个进程来跑:

    运行状况良好。从运行结果来看,4 个进程处理 20 个平均耗时 2 秒的任务,只需要 10 秒的时间,完全符合设想。

    五、小结

    当面对海量的异步任务需要处理的时候,多进程 + 任务队列的方式是一个不错的解决方式。本文通过探索 Redis + NodeJS 结合的方式,构造出了一个异步任务队列处理系统,能较好地完成最初方案的设想,但依然有很多问题需要改进。比如说当任务出错了应该怎么办,系统能否支持不同类型的任务,能否运行多个队列等等,都是值得思考的问题。

    ❤️爱心三连击

    1.看到这里了就点个在看支持下吧,你的「点赞,在看是我创作的动力。

    2.关注公众号程序员成长指北,回复「1」加入Node进阶交流群!「在这里有好多 Node 开发者,会讨论 Node 知识,互相学习」!

    3.也可添加微信【ikoala520】,一起成长。

    “在看转发”是最大的支持

    展开全文
  • 基于Java 异步任务队列。 利用代码生成减少样板类定义。 使用 Factotum 定义任务和请求 创建一个具有默认零参数构造函数的非抽象类。 在这个类中创建一个包含将异步运行的代码的方法。 package com.example; ...
  • 使用asyncio的异步任务库 一个基于Asyncio的任务队列,其设计非常易于使用! 阅读文档: 安装 pip install pytask-io docker run redis # Rabbit MQ coming soon... 用法 from pytask_io import PyTaskIO # ...
  • 客户端api用于异步任务队列,使用PouchDB进行同步 例子 taskQueue ( 'email' ) . start ( { to : 'john@example.com' , subject : 'Ohaj there' , body : 'Hey John,\n\nhow are things?\n\n– Jane' } ) . then...
  • celery 异步任务队列

    2018-03-19 19:54:22
    celery python 异步队列
  • 异步任务队列的两种处理方法

    千次阅读 2017-10-09 13:34:53
    先对这里的异步任务做下解释: 这里的意思是,该任务有几种状态,创建,等待,运行,结束;其中等待是因为,该任务要正常运行,需要其他线程(或进程)提供相应的条件(或触发事件),然后才会运行。 针对这种要异步...
  • Celery异步任务队列 本质:通过创建进程调用函数来实现任务的异步执行。 概念: 任务发出者:发出任务(要执行函数)消息 任务执行者:提前创建的进程 中间人(任务队列):存放发出任务消息 使用: 1)安装:pip ...
  • 哈喽,大家好,我是asong,这次给大家介绍一个go的异步任务框架machinery。使用过python的同学们都知道Celery框架,machinery框架就类似于Celery框架。下面我们就来学习一下machinery的基本使用。 自己翻译一个粗略...
  • JavaScript 阻塞方式实现异步任务队列

    千次阅读 2020-09-08 21:07:58
    有个需求,需要实现一个异步任务队列,并依次处理队列中的所有任务,具体如下: 随机时间增加异步任务到队列中 队列中的任务按照先进先出的规则依次执行 任务为异步请求,等一个执行完了再执行下一个 这个需求若...
  • golang中提供了2种定时器timer和ticker(如果JS很熟悉的话应该会很了解),分别是一次性定时器和重复任务定时器。 一般用法: func main() { input := make(chan interface{}) //producer - produce the messages ...
  • 思路如下:1)用户每发一条微博,就生成一个异步任务,PUSH到队列中去2)由另外一个进程或线程顺序执行队列里的任务问题是:实现消息队列可以采用MemcachedQ,starling等等,将任务以某种格式放到消息队列中去没什么...
  • 异步任务是 Web 后端开发中最常见的需求,非常适合多任务、高并发的场景。本文分享如何使用 docker-compose、FastAPI、rq 来快速创建一个包含异步任务队列集群的 RES...
  • Machinery —— Go 语言异步任务队列

    千次阅读 2017-06-05 16:05:00
    Machinery 是一个 Go 语言的异步任务队列和作业队列,基于分布式消息传递。类似 Python 的 Celery 框架。 Machinery 中的任务(或者作业)可通过多个 worker 在很多服务器上并发的执行,或者可在单个服务器上利用 Go...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 182,635
精华内容 73,054
关键字:

异步任务队列

友情链接: WindowsD-master.zip