精华内容
参与话题
问答
  • 如果有多个任务,就必须排队,前面一个任务完成,再执行后面一个任务,以此类推。 这种模式的好处是实现起来比较简单,执行环境相对单纯;坏处是只要有一个任务耗时很长,后面的任务都必须排队等着,会拖延整个程序...

    同步与异步模式简介

    我们知道,Javascript语言的执行环境是单线程(single thread)的。

    所谓"单线程",就是指一次只能完成一件任务。如果有多个任务,就必须排队,前面一个任务完成,再执行后面一个任务,以此类推。

    这种模式的好处是实现起来比较简单,执行环境相对单纯;坏处是只要有一个任务耗时很长,后面的任务都必须排队等着,会拖延整个程序的执行。常见的浏览器无响应(假死),往往就是因为某一段Javascript代码长时间运行(比如死循环),导致整个页面卡在这个地方,其他任务无法执行。

    为了解决这个问题,Javascript语言将任务的执行模式分成两种:同步(Synchronous)和异步(Asynchronous)。

    同步模式就是后一个任务等待前一个任务结束,然后再执行,程序的执行顺序与任务的排列顺序是一致的、同步的;

    异步模式则完全不同,每一个任务有一个或多个回调函数(callback),前一个任务结束后,不是执行队列上的后一个任务,而是执行回调函数;后一个任务则是不等前一个任务的回调函数的执行而执行,所以程序的执行顺序与任务的排列顺序是不一致的、异步的。

    "异步模式"非常重要。在浏览器端,耗时很长的操作都应该异步执行,避免浏览器失去响应,最好的例子就是Ajax操作。在服务器端,"异步模式"甚至是唯一的模式,因为执行环境是单线程的,如果允许同步执行所有http请求,服务器性能会急剧下降,很快就会失去响应。

     

    异步任务队列

    可能有人告诉你,Javascript内部存在着先进先出的异步任务队列,仅仅用以存储异步任务,与同步任务分开管理。

    进程执行完全部同步代码后,每当进程空闲、触发回调或定时器到达规定的时间,Javascript会从队列中顺序取出符合条件的异步任务并执行之。

    我们简单验证一下,

    var timeout1 = setTimeout(function() {
      console.log(2);
    }, 0);
    
    console.log(1);
    
    var timeout2 =setTimeout(function() {
      console.log(3);
    }, 0);
    

    上面的代码我们都知道输出是 1 2 3,因为setTimeout是异步任务,而timeout1又比timeout2先注册,所以最终输出了这个结果。

    然而,仅仅通过以上代码我们确定不了同步任务究竟是不是会优先于异步任务执行,因为setTimeout有一个最小的时间间隔限制,在这个时间间隔里语句console.log(1)完全可以执行完毕,我们要想办法让同步代码占用更长时间。

    定时器最小时间间隔:在苹果机上的最小时间间隔是10ms,在Windows系统上的最小时间间隔大约是15ms。Firefox中定义的最小时间间隔是10ms,而HTML5规范中定义的最小时间间隔是4ms

    再阅读下面代码,

    setTimeout(function() {
      console.log(1);
    }, 0);
    
    console.log(2);
    
    let end = Date.now() + 1000*5;
    
    while (Date.now() < end) {
    }
    
    console.log(3);
    
    end = Date.now() + 1000*5;
    
    while (Date.now() < end) {
    }
    
    console.log(4);
    

    输出顺序:2 3 4 1。从上面的输出结果我们可以确定,异步代码是在所有同步代码执行完毕以后才开始执行的。

    那我们刚刚对js异步任务队列的理解方式是对的吗?底层机制会是这样的吗?

    事实上,我们上述对于异步队列的理解和解释都是非常浅层和感性的(并且是错误的),虽然跟着上述的理解方式我们可以解释很多代码行为,但实际的机制却远没有这么简单,异步模式作为Javascript的重中之重,有很多设计细节是我们未知的,我们应当更加理性和学术地去探究学习。

    再看一段比较复杂的代码,说出它的输出顺序:

    setTimeout(function(){
        console.log(2);
    },0);
    
    new Promise(function(resolve){
        console.log(3);
        resolve();
        console.log(4);
    }).then(function(){
        console.log(5);
    });
    
    console.log(6);
    
    setTimeout(function(){
        console.log(7);
    },0);
    
    console.log(8);
    

    你认为上述代码输出结果是什么呢?讲出理由。

    浏览器环境输出结果:输出顺序为,3 4 6 8 5 2 7,跟你事先认为的结果一样吗?为什么结果会这样?

    除了注册顺序以外,还有什么因素影响着每个异步任务在异步队列中的顺序呢?

    我们先一起了解下事件循环任务队列两个概念,再回来解答这个问题。

     

    线程、事件循环和任务队列

    Javascript是单线程的,但是却能执行异步任务,这主要是因为 JS 中存在事件循环(Event Loop)和任务队列(Task Queue)

    事件循环:JS 会创建一个类似于 while (true) 的循环,每执行一次循环体的过程称之为Tick。每次Tick的过程就是查看是否有待处理事件,如果有则取出相关事件及回调函数放入执行栈中由主线程执行。待处理的事件会存储在一个任务队列中,也就是每次Tick会查看任务队列中是否有需要执行的任务。

    任务队列:异步操作会将相关回调添加到任务队列中。而不同的异步操作添加到任务队列的时机也不同,如onclicksetTimeout,ajax 处理的方式都不同,这些异步操作是由浏览器内核的webcore来执行的,webcore包含下图中的3种 webAPI,分别是DOM Bindingnetworktimer模块。

    • DOM Binding 模块处理一些DOM绑定事件,如onclick事件触发时,回调函数会立即被webcore添加到任务队列中。
    • network 模块处理Ajax请求,在网络请求返回时,才会将对应的回调函数添加到任务队列中。
    • timer 模块会对setTimeout等计时器进行延时处理,当时间到达的时候,才会将回调函数添加到任务队列中。

    主线程:JS 只有一个线程,称之为主线程。而事件循环是主线程中执行栈里的代码执行完毕之后,才开始执行的。所以,主线程中要执行的代码时间过长,会阻塞事件循环的执行,也就会阻塞异步操作的执行。只有当主线程中执行栈为空的时候(即同步代码执行完后),才会进行事件循环来观察要执行的事件回调,当事件循环检测到任务队列中有事件就取出相关回调放入执行栈中由主线程执行。

     

    ES5规范中对于事件循环的定义

    翻开规范《ECMAScript® 2015 Language Specification》,找到事件循环 6.1.4 Event loops

    规范中中提到,一个浏览器环境,只能有一个事件循环,而一个事件循环可以多个任务队列,每个任务都有一个任务源(Task source)。

    相同任务源的任务,只能放到一个任务队列中。

    不同任务源的任务,可以放到不同任务队列中。

    又举了一个例子说,客户端可能实现了一个包含鼠标键盘事件的任务队列,还有其他的任务队列,而给鼠标键盘事件的任务队列更高优先级,例如75%的可能性执行它。这样就能保证流畅的交互性,而且别的任务也能执行到了。同一个任务队列中的任务必须按先进先出的顺序执行,但是不保证多个任务队列中的任务优先级,具体实现可能会交叉执行。

    结论:一个事件循环可以有多个任务队列,队列之间可有不同的优先级,同一队列中的任务按先进先出的顺序执行,但是不保证多个任务队列中的任务优先级,具体实现可能会交叉执行。

    重新看回开始的代码:

    setTimeout(function(){
        console.log(2);
    },0);
    
    new Promise(function(resolve){
        console.log(3);
        resolve();
        console.log(4);
    }).then(function(){
        console.log(5);
    });
    
    console.log(6);
    
    setTimeout(function(){
        console.log(7);
    },0);
    
    console.log(8);
    

    输出结果是,3 4 6 8 5 2 7。为什么setTimeout会后于promise.then执行呢,原因或许就是它所处的任务队列优先级较低。

     

    不同任务队列的优先级

    那么接下来,我们探究一下不同任务队列的优先级。

    实际上,对于任务队列的优先级的定义,Promise/A+ 规范中有作详细的解释。

    图灵社区 : 阅读 : 【翻译】Promises/A+规范

    我们都知道,一个Promise的当前状态必须为以下三种状态中的一种:等待态(Pending)、执行态(Fulfilled)和拒绝态(Rejected)。

    而上面的Promises规范就规定了,实践中要确保onFulfilledonRejected异步执行,且应该在then方法被调用的那一轮事件循环以后的新执行栈中执行

    意思就是,当我们调用resolve()reject()的时候,触发promise.then(...)实际上是一个异步操作,这个promise.then(...)并不是在resolve()reject()的时候就立刻执行的,而也是要重新进入任务队列排队的,不过能直接在当前的事件循环新的执行栈中被取出执行(不用等下次事件循环)。

    知道这个以后,我们再看一段代码,这个代码包含常用的大部分异步操作,我们将借此得出不同任务队列的优先顺序:

    (其中setImmediate()process.nextTick()是node的语句)

    setImmediate(function(){
        console.log(1);
    },0);
    setTimeout(function(){
        console.log(2);
    },0);
    new Promise(function(resolve){
        console.log(3);
        resolve();
        console.log(4);
    }).then(function(){
        console.log(5);
    });
    console.log(6);
    process.nextTick(function(){
        console.log(7);
    });
    console.log(8);
    

    NodeJs环境输出

    其中3 4 6 8是同步输出的。 因为注册顺序:1 > 2 > 5 > 7,而输出顺序是7 > 5 > 2 > 1

    所以可以很容易得到,优先级 :process.nextTick > promise.then > setTimeout > setImmediate

    process.nextTick()属于idle观察者,setImmediate()属于check观察者.在每一轮循环检查中,idle观察者先于I/O观察者,I/O观察者先于check观察者.

    而实际上,上述的Promises规范早已提到异步队列优先级规定的详细定义和解释了,并不需要我们一个一个去测试。

     

    小结

    在JS引擎中,我们可以按性质把任务分为两类,macrotask(宏任务)和 microtask(微任务)。

    浏览器JS引擎中:

    • macrotask(按优先级顺序排列): script(你的全部JS代码,“同步代码”), setTimeoutsetIntervalsetImmediateI/O,UI rendering
    • microtask(按优先级顺序排列):process.nextTick,Promises(这里指浏览器原生实现的 Promise), Object.observeMutationObserver
    • JS引擎首先从macrotask queue中取出第一个任务,执行完毕后,将microtask queue中的所有任务取出,按顺序全部执行;
    • 然后再从macrotask queue(宏任务队列)中取下一个,执行完毕后,再次将microtask queue(微任务队列)中的全部取出;
    • 循环往复,直到两个queue中的任务都取完。

    所以,浏览器环境中,js执行任务的流程是这样的:

    1. 第一个事件循环,先执行script中的所有同步代码(即 macrotask 中的第一项任务)
    2. 再取出 microtask 中的全部任务执行(先清空process.nextTick队列,再清空promise.then队列)
    3. 下一个事件循环,再回到 macrotask 取其中的下一项任务
    4. 再重复2
    5. 反复执行事件循环…

    NodeJS引擎中:

    1. 先执行script中的所有同步代码,过程中把所有异步任务压进它们各自的队列(假设维护有process.nextTick队列、promise.then队列、setTimeout队列、setImmediate队列等4个队列)
    2. 按照优先级(process.nextTick > promise.then > setTimeout > setImmediate),选定一个  不为空 的任务队列,按先进先出的顺序,依次执行所有任务,执行过程中新产生的异步任务继续压进各自的队列尾,直到被选定的任务队列清空。
    3. 重复2...

    也就是说,NodeJS引擎中,每清空一个任务队列后,都会重新按照优先级来选择一个任务队列来清空,直到所有任务队列被清空。

     

    现在,你可以根据这个流程再看回前面的代码,其实一切都很容易理解了…

    以上,就是Javascript任务队列的顺序机制。

    展开全文
  • java多线程任务队列模型

    千次阅读 2018-11-08 16:59:45
    此篇文章将从任务队列的设计;任务调度的方式(串行和并行)。代码很简单,主要是设计的思想。 任务队列 final class PendingPostQueue {  // 含有头、尾指针的链表结构实现队列  private PendingPost head;  ...

    此篇文章将从任务队列的设计;任务调度的方式(串行和并行)。代码很简单,主要是设计的思想。

    任务队列
    final class PendingPostQueue {
        // 含有头、尾指针的链表结构实现队列
        private PendingPost head;
        private PendingPost tail;

        // 入队列
        synchronized void enqueue(PendingPost pendingPost) {
            if (pendingPost == null) {
                throw new NullPointerException("null cannot be enqueued");
            }
            if (tail != null) {
                tail.next = pendingPost;
                tail = pendingPost;
            } else if (head == null) {
                head = tail = pendingPost;
            } else {
                throw new IllegalStateException("Head present, but no tail");
            }
            notifyAll();
        }

        // 出队列
        synchronized PendingPost poll() {
            PendingPost pendingPost = head;
            if (head != null) {
                head = head.next;
                if (head == null) {
                    tail = null;
                }
            }
            return pendingPost;
        }

        // 等待最大时长; 如果此时有入队列的操作(notifyAll),直接出队列
        synchronized PendingPost poll(int maxMillisToWait) throws InterruptedException {
            if (head == null) {
                wait(maxMillisToWait);
            }
            return poll();
        }
    }
    上面的代码很简单,基本上一看就能明白;下面主要分析,这样设计的优点:

    使用头、尾指针的链表结构实现队列;入队列通过操作尾指针,出队列通过操作头指针的方式达到时间复杂度都是O(1).
    增加出队列延迟的功能,方式在空队列的时候,持续获取或直接返回空;增加一段时间间隔等待其他线程的入队列的操作(尽可能处理尽量多的任务。)
    任务调度:串行执行
    串行的任务调度,基本上是单线程模型。因为基本上是下一个任务的执行需要等到上一个任务执行完成。 
    代码如下:

    // 当前任务调度类(串行)
    final class BackgroundPoster implements Runnable {
        // 任务队列
        private final PendingPostQueue queue;
        // 当前线程是否在正在运行
        // volatile: 保证单个变量的读写操作是线程安全(通过cpu实现CAS)
        private volatile boolean executorRunning;

        BackgroundPoster() {
            queue = new PendingPostQueue();
        }

        public void enqueue(String id, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(id, event); // 创建任务
            synchronized (this) {
                queue.enqueue(pendingPost); // 入队列
                // 如果当前没有正在运行的任务,开启任务
                if (!executorRunning) {
                    executorRunning = true;
                    ThreadUtils.getExecutorService().execute(this);
                }
            }
        }

        @Override
        public void run() {
            try {
                try {
                    while (true) {
                        // 从任务队列中获取任务;设置一分钟时间间隔,防止在1000分钟内有新任务入队列
                        PendingPost pendingPost = queue.poll(1000);
                        if (pendingPost == null) {
                            synchronized (this) {
                                // 双层检验
                                pendingPost = queue.poll();
                                if (pendingPost == null) {
                                    // 运行标志置为false
                                    executorRunning = false;
                                    return; // 如果没有任务了,将会结束此次循环,也就相当于停止了当前线程(也正因为此,上面的wait(1000)才很重要)
                                }
                            }
                        }
                        // 执行任务
                        invokePost(pendingPost);
                    }
                } catch (InterruptedException e) {
                    Log.w("Event", Thread.currentThread().getName() + " was interruppted", e);
                }
            } finally {
                executorRunning = false;
            }
        }

    }

    上面的代码也不难,对照我写的注释看起来会很简单。原理也很简单:

    任务的执行是在一个子线程(通过线程池开启的)中
    任务的调度是通过操作任务队列实现的,通过循环依次调用队列中的任务。
    wait(1000)的作用,最大化使用线程资源;防止队列中刚没有任务了就停止线程(具体分析在注释中)
    任务调度:并行执行
    并行调度任务,就需要多线程调度了。 
    具体代码实现如下:

    class AsyncPoster implements Runnable {

        private final PendingPostQueue queue;

        AsyncPoster() {
            queue = new PendingPostQueue();
        }

        public void enqueue(String id, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(id, event);
            queue.enqueue(pendingPost);
            ThreadUtils.getExecutorService().execute(this);
        }

        @Override
        public void run() {
            PendingPost pendingPost = queue.poll();
            if(pendingPost == null) {
                throw new IllegalStateException("No pending post available");
            }
            invokePost(pendingPost);
        }
    }

    上面的代码更简单,就是每一个任务开启一个线程去执行。 
    但是如果仔细查看代码会发现: 
    这里根本就没有必要使用任务队列,直接开启线程去执行任务不就行了吗?这里任务队列的作用是用来传递数据。

    任务调度:Android主线程调度
    我们经常会遇到:回调在主线程中执行。由于主线程只有一个,也就相当于上面的串行执行。而Android有自己的Handler消息机制帮我们封装好了,下面就基于这个来实现。

    final class HandlerPoster extends Handler {

        private final PendingPostQueue queue;
        // 主线程执行最大时长(防止阻塞主线程)
        private final int maxMillisInsideHandleMessage;
        // 正在运行的标志(同串行执行)
        private boolean handlerActive;
        // 参数looper决定了当前任务所运行的线程,这里传递Looper.mainLooper()就会将当前任务运行在主线程中
        HandlerPoster(Looper looper, int maxMillisInsideHandleMessage) {
            super(looper);
            this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
            queue = new PendingPostQueue();
        }

        void enqueue(String id, Object event) {
            PendingPost pendingPost = PendingPost.obtainPendingPost(id, event);
            synchronized (this) {
                queue.enqueue(pendingPost);
                if (!handlerActive) {
                    handlerActive = true;
                    // 发送消息
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                }
            }
        }

        @Override
        public void handleMessage(Message msg) {
            boolean rescheduled = false;
            try {
                long started = SystemClock.uptimeMillis();
                while (true) {
                    PendingPost pendingPost = queue.poll();
                    if (pendingPost == null) {
                        synchronized (this) {
                            // Check again, this time in synchronized
                            pendingPost = queue.poll();
                            if (pendingPost == null) {
                                handlerActive = false;
                                return;
                            }
                        }
                    }
                    invokePost(pendingPost);
                    long timeInMethod = SystemClock.uptimeMillis() - started;
                    // 如果在主线程中执行的时间超过最大时间,停止当前操作,重新发送消息;防止祖册主线程
                    if (timeInMethod >= maxMillisInsideHandleMessage) {
                        if (!sendMessage(obtainMessage())) {
                            throw new EventBusException("Could not send handler message");
                        }
                        //重置执行,也就是还处于运行状态。
                        rescheduled = true;
                        return;
                    }
                }
            } finally {
                // 运行状态由rescheduled决定
                handlerActive = rescheduled;
            }
        }
    }

    代码也不难,原理基本和串行调度相同;唯一不同,因为是在主线程中,需要对线程阻塞的问题进行考虑。
    --------------------- 
    作者:qiaoba_gogo 
    来源:CSDN 
    原文:https://blog.csdn.net/u010014658/article/details/77925567 

    展开全文
  • 解决方案之任务队列

    千次阅读 2016-08-14 20:32:13
    在一些系统中,会有对某些任务状态进行跟踪,如果任务失败需要重新执行任务。本文主要是针对这种请求提出解决方案,因为时间原因,方案还没有在代码中实现。但是经过和朋友的推演,是目前能想到的比较有效的方案了。...

    主页:www.howardliu.cn
    博文:解决方案之任务队列

    在一些系统中,会有对某些任务状态进行跟踪,如果任务失败需要重新执行任务。本文主要是针对这种请求提出解决方案,因为时间原因,方案还没有在代码中实现。但是经过和朋友的推演,是目前能想到的比较有效的方案了。鉴于本人才疏学浅,如果有某位大神有更好的解决方案,请一定不吝赐教,感谢不尽。。。

    1. 问题描述

    1.1 一个主任务,多个子任务

    在当前的系统环境中,通常一个应用会有多个实例,即水平拆分,提升并发能力。正常情况下,一个实例接收到一条请求,即开始对该请求进行处理。如果该请求是命令当前实例对某一分类下所有商品重建索引,假设该分类下有10000个商品,即该实例在接下来一段时间要有大量资源投入到重建索引中。但是其他实例都在闲着,形成一人干活,众人围观的局面。

    假如该任务正常结束,这种方式也是没什么太大的问题的。但是可能出现一种极端的情况,该实例对其中的9999件商品重建索引都成功了,恰巧重建最后一条时失败实例挂了,则当前任务即任务是失败的,那前面的9999件商品创建索引的工作就是白费的。

    1.2 任务状态跟踪

    在一个消息平台中,接收到的消息向目标地址发送失败后,在一段时间后需要再尝试发送几次,保证消息可达。如果经过几次重试之后,发送消息依然失败,那将消息状态置为失败,等待人工干预。

    假设这个消息平台很不靠谱,或者目标服务不靠谱,经过一段时间后,重试任务累计到3000。这3000条需要重试的任务不均匀的分布的各个时间段上,消息标识不是序列号,没法通过序列号段进行取数。在这种情况下,即使有个多个实例可以同时对这些消息重试,为了不遗漏、不重复,只能够简单的通过时间分组重试,这样就会有任务分配不均,无法很好发挥集群的问题处理协作能力。

    2. 解题思路

    其实上面两种情况可以认为是一种,即一堆无状态的任务需要被执行。为了资源的有效利用,不应该同时有多个应用执行任务,而且当任务成功后,也不需要再次执行。

    最直接和最简单的思路就是需要提供可存储任务的系统:

    1. 定时的或以监听的方式从该存储系统中获取任务列表
    2. 检查该任务是否被加锁,如果加锁,放弃执行该任务;如果未加锁,对该任务加锁
    3. 开始执行任务
    4. 执行结束后,将任务结果写入存储系统,并对任务解锁
    5. 重复1操作,如果发现任务成功执行,则跳过任务或归档任务

    3. 解决方案

    3.1 轮询

    根据上面的解题思路,定时轮询是最简单最直接的方案。

    轮询

    如上图所示:

    1. JOB任务定时从1中获取任务列表
    2. 循环操作任务列表中的任务
    3. 将任务结果写回数据库

    但是这种方式可优化的地方很多,比如:
    - 如果有多个实例,每个实例在任务启动的时候取任务列表中的一部分,即分页取任务列表。这就需要保证任务列表可有效分页,并且需要保证任务平均分散在任务列表每页中。比如根据时间取列表,而且任务列表在时间轴上比较均匀。
    - 同一个任务执行过程中要有锁,不需要两个实例同时执行同一个任务
    - 任务执行过程中要有状态。当该任务执行还没有成功完成时,如果持有该任务的实例死亡,能够有其他实例重新执行该任务

    这种方式是我接手代码中使用的方式,但是那个人没有对任务列表分页。正常情况下,任务列表很短,只有小于100条,而且获取任务列表周期是5分钟,运行完全没有问题。但是一旦任务集中输入的时候,每次都获取所有任务,可以想象,一个实例在某一时刻输入3000个任务,然后开始一个一个执行,任务执行时间无限延长。为了利用集群共同处理问题的能力,于是开始对代码进行改造,就是下面这种轮询+监听的方式。

    3.2 轮询+监听

    轮询+监听的方式也是有弊端的,后面慢慢说。

    轮询

    如上图,很明显的可以看出,这个能够算是3.1的升级版(虽然是升级版,效果依然不佳)。

    1. JOB任务定时从数据获取任务列表
    2. 循环操作任务列表,剔除不符合要求的任务
    3. 将符合要求的任务写入zookeeper,在taskPath下创建任务节点。
    4. Listener监听taskPath字节点事件,发现有任务节点创建事件,从zookeeper读取节点数据,开始执行任务
    5. 任务执行结束,将任务状态写回数据库

    这种方式增强了任务执行效率,只要JOB定时规则设置合理,理论上任务会随机分配到各个监听实例中,并执行任务。这个方案中的短板在定时轮询和zookeeper压力:

    • 定时轮询:因为时间紧,所以没有抛弃一开始JOB轮询任务这部分。所以只能够利用zookeeper的分布式锁,集群中某一实例读取读取任务列表,并将任务写入zookeeper。如果没有后面的问题,也是可以接受这种方式。
    • zookeeper服务压力:因为zookeeper的节点监听是要创建长连接、而且经常要向zookeeper方法状态确认请求,所以如果任务节点比较多、且驻留时间较长的时候,对zookeeper服务器压力比较大。有弊必有利,如果服务器能够撑住这种压力,这种方式能够保证,任务节点的任何变化,能够被准实时的感知到,针对任务变化,迅速做出响应。

    3.3 任务队列

    分析前面两种方案的短板,以及加上之前的经验。其实解决方案就呼之欲出了:一个很长的任务列表,最快的方法是分组批量执行,即分页获取列表中任务,然后使用多线程批量执行这些任务。(至于每次取多少,使用多少线程执行只能根据不同的任务难度、任务周期来计算了):

    • 分页获取:分页的难度就在于分页要均匀,且有明显的分页标识,以便另外一个实例不会重复获取已经分页数据。最简单的数据结构就是FIFO队列,能够顺序读取队列中的数据。因为是集群环境,只需要这个队列能够实现数据排他(删除、隐藏或通过位移控制)读取即可。
    • 批量执行:批量执行最简单的方式是通过多线程并行执行任务,这点不难。

    执行过程如下图所示:

    轮询

    1. producer将任务数据写入数据库,做备份或记录任务状态使用
    2. producer将任务数据写入任务队列中
    3. consumer从任务队列中分页获取任务列表,批量执行。根据执行情况及执行状态,判断是否重新返回任务队列等待执行
    4. 执行成功的任务,将任务状态入库
    5. 执行失败的任务重新写回任务队列,等待再次被读取执行

    这里需要考虑一种异常情况:如果某一实例的consumer读取任务列表,任务队列将已读取任务列表删除后,该实例死亡。在该方案中,将丢失该实例中的任务,下面的双任务队列的方式可以解决这个问题。

    3.4 双任务队列

    可以考虑这个一个例子,生产线上工人们在做工,从传送带上取一组零件进行检查。检查不合格重新放会生产线末尾,等待机器重新加工零件;检查合格装箱打包。传送带即任务队列;员工即consumer;员工取一组零件后传送带上就没有这些零件,即任务被排他获取;零件合格装箱,即任务成功;零件不合格重新放回传送带,即任务失败。与上面的方案很类似。

    假设,有一个员工取完零件并检查了一半了,有的装箱,有的打回,然后突然不想干了,直接走了。这个时候其工作台上就散落一堆未检查零件。如果有一个人巡逻检查各个工作台,发现无人职守且有散落零件的工作台,只要把工作台上的零件放回传送带,这些零件又能够被正常的检查。

    将上面的例子应用到我们的方案中,就是一个双任务队列的模型,如下图所示:

    轮询

    1. producer将任务数据写入数据库,做备份或记录任务状态使用
    2. producer将任务数据写入任务队列中
    3. consumer从任务队列中分页获取任务列表
    4. consumer将任务列表写入第二任务队列,防止任务丢失
    5. 执行成功的任务,将任务状态入库
    6. 执行失败的任务重新写回任务队列,等待再次被读取执行
    7. 定时任务检查任务第二任务队列,找到无主任务
    8. 定时任务将从第二任务队列中获取的无主任务写回producer

    考虑这种情况:如果任务队列排他读取方式中使用的是数据读取后删除,那么consumer在读取数据之后,写入第二任务队列之前,所在实例死亡,任务依然会丢失。所以比较稳妥的办法是,任务队列的排他方式是屏蔽或位移。

    • 屏蔽,就是如果有一个consumer读取任务数据,则将改任务数据状态修改,其他consumer不能够再看到该条数据,等待consumer确认之后,则可以将数据删除或归档。
    • 位移是通过一个位移量记录当前读取位置,并设置锁,其他consumer等待当前处理任务,处理结束后,提交位移量,其他consumer可以读取数据。

    4 任务队列的选择

    4.1 RabbitMQ

    在RabbitMQ中,可以通过监听的方式Channel.basicConsume获取队列中的任务消息,为了安全考虑,需要将第二个参数autoAck置为false。这样当前的consumer读取消息之后,消息状态是Unacked,这个时候其他consumer就不能够看到这条消息,只有主动调用Channel.basicAck确认之后,消息才会被删除。如果消息未被ack确认,当前consumer死亡,消息会被重新置为Ready状态,可以被其他consumer消费。这种即上面所说的屏蔽的方式,任务可以无序的执行。

    为了可以尽可能的榨干集群中每个实例的资源,每个实例可以启用多个线程同时监听队列,即每个实例有多个consumer,这样能够尽可能快的将消息出队。下面是简单的实例代码,先创建指向RabbitMQ集群的连接,然后通过producer向RabbitMQ服务发送数据,最后通过consumer订阅方式消费消息。

    创建连接:

    ConnectionFactory factory = new ConnectionFactory();
    factory.setVirtualHost("/");
    factory.setUsername("username");
    factory.setPassword("password");
    factory.setAutomaticRecoveryEnabled(true);
    factory.setNetworkRecoveryInterval(10000);
    factory.setConnectionTimeout(60);
    Address[] addressArray = new Address[]{new Address("127.0.0.1", 5672)};
    ExecutorService es = Executors.newFixedThreadPool(200, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("rabbitMQ-thread-" + thread.getId());
            return thread;
        }
    });
    Connection conn = factory.newConnection(es, addressArray);

    简单的producer:

    Channel channel = conn.createChannel();
    channel.basicPublish("someExChange", "someQueue", true, MessageProperties.PERSISTENT_TEXT_PLAIN, "Hello, world!".getBytes());

    每个线程中consumer可以如下面的实例代码:

    final Thread currentThread = Thread.currentThread();
    try {
        final Channel channel = conn.createChannel();
        channel.basicConsume("someQueue", false, "someConsumerTag",
                new DefaultConsumer(channel) {
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope,
                            AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String routingKey = envelope.getRoutingKey();
                        String contentType = properties.getContentType();
                        long deliveryTag = envelope.getDeliveryTag();
                        String message = new String(body, "UTF-8");
                        logger.info("threadName={}, routingKey={}, contentType={}, deliveryTag={}, message={}",
                                currentThread.getName(), routingKey, contentType, deliveryTag, message);
                        // 任务处理开始
                        // ...
                        // 任务处理结束
                        channel.basicAck(deliveryTag, false);
                    }
                });
    } catch (IOException e) {
        logger.error("发生错误", e);
    }

    4.2 Kafka

    Kafka的设计是用于顺序存储日志,通过这种设计,可以变相的用于有序队列,这种有序队列可以用于有序任务。定义一个有20各Partition的Topic,在集群中的每个实例中,启动5个线程作为consumer读取。(为了有效利用资源,Partition的数量要大于等于consumer线程数,这样不会导致有些线程空闲,白白耗费资源)。

    为了保证某一实例死亡后,其他实例可以继续上个实例未完成的任务,需要在每个任务消息处理结束后,调用ConsumerConnector.commitOffsets(true)来修改偏移量。这种即上面说的位移的方式。

    在kafka中有一种可变的使用方式,可以是任务有序或无序:

    • 有序:通过producer向kafka写数据的时候,设置一个key(kafka通过对key做hash,将数据写入对应partition中),如果设置的key固定,则partition固定,读取的consumer即相对固定(说相对是因为consumer会隔一段时间做负载均衡,所以可能会切换consumer)。在这种方式中,任务是有序执行的。缺点就是,集群中只会有一个实例能够获得读取数据的权利,其他实例都在等待。只有当这个实例死亡,才会有其他实例获得权利,继续上个实例未尽的事业。
    • 无序:在通过producer写数据的时候,可以将key中加一个变化的值,使数据均匀的分布在不同的partition中,这样不同的实例的consumer就都可以读取数据了。

    producer代码实例(示例代码为有序方式,无序方式只需要根据实际情况修改job-key即可):

    import static org.apache.kafka.clients.producer.ProducerConfig.*;
    
    Properties properties = new Properties();
    properties.put(BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.put(ACKS_CONFIG, "all");// 0, 1, all
    properties.put(BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(COMPRESSION_TYPE_CONFIG, "none");// none, gzip, snappy
    properties.put(RETRIES_CONFIG, "0");
    properties.put(BATCH_SIZE_CONFIG, "16384");
    properties.put(CLIENT_ID_CONFIG, "someClientId");
    properties.put(LINGER_MS_CONFIG, "0");
    properties.put(MAX_REQUEST_SIZE_CONFIG, "1048576");
    properties.put(RECEIVE_BUFFER_CONFIG, "32768");
    properties.put(SEND_BUFFER_CONFIG, "131072");
    properties.put(TIMEOUT_CONFIG, "30000");
    properties.put(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
    
    ProducerRecord<String, String> topic = new ProducerRecord<>("mq-job-topic", "job-key", "{id:1}");
    kafkaProducer.send(topic, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception == null) {
                logger.info("topic={}, partition={}, offset={}", metadata.topic(), metadata.partition(), metadata.offset());
            } else {
                logger.error("producer发送消息失败", exception);
            }
        }
    });
    
    kafkaProducer.close();

    consumer代码实例:

    Properties properties = new Properties();
    properties.put("zookeeper.connect", "127.0.0.1:2181/kafka");
    properties.put("fetch.message.max.bytes", "1048576");
    properties.put("group.id", "someGroupId");
    properties.put("auto.commit.enable", "false");
    properties.put("auto.offset.reset", "largest");// smallest, largest
    
    final ConsumerConnector connector = new KafkaConsumerFactory(properties).build();
    Map<String, Integer> topicCountMap = new HashMap<>();
    topicCountMap.put("mq-job-topic", 10);
    Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = connector.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> kafkaStreams = messageStreams.get("mq-job-topic");
    ExecutorService executorService = Executors.newFixedThreadPool(10, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });
    for (final KafkaStream<byte[], byte[]> kafkaStream : kafkaStreams) {
        executorService.submit(new Runnable() {
            @Override
            public void run() {
                for (MessageAndMetadata<byte[], byte[]> messageAndMetadata : kafkaStream) {
                    try {
                        String key = new String(messageAndMetadata.key(), "UTF-8");
                        String message = new String(messageAndMetadata.message(), "UTF-8");
                        logger.info("message={}, key={}", message, key);
                        // 任务处理开始
                        // ...
                        // 任务处理结束
                        connector.commitOffsets(true);
                    } catch (Exception e) {
                        logger.error("发生异常", e);
                    }
                }
            }
        }, null);
    }

    5 写在最后

    虽然没有在项目中确实的使用这种解决方案,但是已经通过demo进行了技术验证。另外,分布式队列可以根据不同的需求选择RabbitMQ(任务无序)或Kafka(任务有序、无序),当然绝不限于这两种,还可以有很多其他的选择。

    展开全文
  • 多进程实现消息队列 所谓消息队列,个人理解就是一个管道或者容器,用来存放生产者生产的消息,而消费者就从管道中取消息消费。这里提到的有三个对象:生产者、消费者、容器/管道/中间人,其中生产者和消费者是两个...

    多进程实现消息队列

    所谓消息队列,个人理解就是一个管道或者容器,用来存放生产者生产的消息,而消费者就从管道中取消息消费。这里提到的有三个对象:生产者、消费者、容器/管道/中间人,其中生产者和消费者是两个独立的进程,即它们不能相互通信(进程间不共享全局变量)。所以我们要借助一个中间人来帮我们。多进程实现消息队列原理就是创建两个子进程,让它们分别充当生产者和消费者。这里借助multiprocessing模块中的Queue和Process。

    from multiprocessing import Queue, Process
    
    def producer(q):
    	 for i in range(5):
    	 	q.put(i)
    	 	print('put {0} in queue'.format(i))
    
    def customer():
    	while True:
    		res = q.get()
    		print('get {0} from queue'.format(res))
    
    def main():
    	q = Queue()
    	sub_process1 = Process(target=producer, args=(q,))
    	sub_process2 = Process(target=customer)
    	
    	sub_process1.start() 
    	sub_process2.start() 
    	#由于进程的执行是无序的,使用join()方法,让sub_process1结束后在执行sub_process2
    	sub_process1.join()
    	#终结死循环
    	sub_process2.terminate()
    
    if __name__ = 'main':
    	main()
    

    执行结果
    put 1 in queue
    put 2 in queue
    put 3 in queue
    put 4 in queue
    get 1 from queue
    get 2 from queue
    get 3 from queue
    get 4 from queue

    基于Redis实现消息队列

    celery异步分布式
    Celery是一个python开发的异步分布式任务调度模块。
    Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持RabbitMQredis数据库等。

    Redis实现消息队列

    连接url的格式为:
    redis://password@hostname:port/db_number
    指定消息队列的位置
    broker_url = “redis://127.0.0.1/10”

    启动Celery服务
    $ cd ~/projects/md_project/md_mall
    $ celery -A celery_tasks.main worker -l info
    -A指对应的应用程序, 其参数是项目中 Celery实例的位置。
    worker指这里要启动的worker。
    -l指日志等级,比如info等级。

    补充celery worker的工作模式

    默认是进程池方式,进程数以当前机器的CPU核数为参考,每个CPU开四个进程。
    如何自己指定进程数:celery worker -A proj --concurrency=4
    如何改变进程池方式为协程方式:celery worker -A proj --concurrency=1000 -P eventlet -c 1000

    安装eventlet模块

    $ pip install eventlet

    启用 Eventlet 池

    $ celery -A celery_tasks.main worker -l info -P eventlet -c 1000

    展开全文
  • js 任务队列

    2020-09-29 09:46:20
    单线程就意味着,所有任务需要排队,前一个任务结束,才会执行后一个任务。这样所导致的问题是: 如果 JS 执行的时间过长,这样就会造成页面的渲染不连贯,导致页面渲染加载阻塞的感觉。 为了解决
  • 任务队列简单实现

    千次阅读 2019-03-27 19:18:02
    实用场景: 例子1: 例子2: 生产者-----消费者 之间 C/C++ .h #ifndef __TASK_QUEUE_H__ #define __TASK_QUEUE_H__ ... // 任务处理函数 void* param; // 附加参数 }Task_t; ty...
  • 任务队列

    2019-05-16 23:45:00
    1.linux本地执行任务队列 2.java执行pl 队列 vector list 子任务队列 转载于:https://www.cnblogs.com/hshy/p/10878833.html
  • using System; using System.Collections.Concurrent; using System.Threading; using System.Threading.Tasks;... /// 任务队列 /// </summary> public class TaskQueue { #region 构造函数 ///
  • 1. 事件轮询(Event Loop) js实现异步的具体解决方案 ...EXP4:微任务队列中创建的宏任务 总结 这篇博文仅为个人理解,文章内提供一些更加权威的参考,如有片面及错误,欢迎指正 1. 事件轮询...
  • Java/Android中的优先级任务队列的实践

    万次阅读 多人点赞 2017-05-12 22:18:32
    本篇文章适用于Java和Android开发者,会从实现一个最简单的队列过渡到实现一个带有优先级的队列,使用生活中最常见的的例子结合讲解,保准你可以掌握基本的队列原理。
  • 事件循环 JavaScript 语言的一大特点就是单线程,也就是说,同一个时间只能做一件事。为了协调事件、用户交互、脚本、UI 渲染和网络处理等行为,防止主线程的不阻塞,Event Loop 的方案应用而生。...任务队列
  • 事件循环与任务队列 是JS中比较重要的两个概念,在ES6中,清楚的区分宏观任务和微观任务队列才能解释Promise的一些表现。 js是单线程语言,对于异步操作只能先把它放在一边,按照某种规则按先后顺序放进一个容器...
  • java实现的任务队列

    千次阅读 2015-02-13 12:05:46
    最近由于工作需要,自己完成了一个任务队列。实现了基本功能,且用法也很简单。 示例程序如下 [code="java"] package com.tone.example; import org.junit.After; import org.junit.Before; ...
  • 用标准C++实现任务队列

    千次阅读 2018-11-22 13:47:59
    下面介绍一个简单的任务队列,查看完整代码。 在实现任务队列前需要定义一个接口与一个工具类 任务接口:子类实现接口的run方法来处理具体任务。 自旋锁类:用于保护任务队列的并发访问(用C++11原子操作实现)。...
  • java 任务队列

    千次阅读 2016-12-29 17:30:59
    Java任务队列在不断的学习中需要我们掌握很多相关的技术信息。 首先,下面我们就看看如何应用TaskQueue的具体问题。   Java任务队列需要我们详细的学习, 当然我们在不断的使用中还需要注意相关信息的
  • Qt C++子线程中执行任务队列

    千次阅读 2019-04-02 18:41:49
    概述 在项目中经常会遇到这种情况,当要执行一些耗时操作的时候...所以,这里来介绍一下该场景的框架设计,如何在子线程中执行耗时任务队列。 示例 上面的描述可能不是很好理解,接下来看一个实际的用例。 假如,我...
  • C++任务队列与多线程

    千次阅读 2017-07-30 23:13:21
    很多场合之所以使用C++,一方面是由于C++编译后的native code的高效性能,另一方面是由于C++优秀的并发能力。并行方式有多进程 和多线程之分,本章暂且只讨论多线程,多进程方面的知识会在其他章节具体讨论。...
  • Qt QTread 背景知识 1、moveToThread QObject worker; worker.moveToThread(_thread); 这个 worker 不能指定parent,否则 moveToThread 会失败。 ...同样,如果 worker moveToThread 后,不能设置非同一线程下的 ...
  • java多线程任务队列模型

    千次阅读 2017-09-10 21:12:35
    此篇文章将从任务队列的设计;任务调度的方式(串行和并行)。代码很简单,主要是设计的思想。任务队列final class PendingPostQueue { // 含有头、尾指针的链表结构实现队列 private PendingPost head; private ...
  • 分布式后台任务队列模拟(Golang)

    千次阅读 2015-03-24 14:12:50
    最近研究了下gowoker,这东西代码少而精,Golang真是很适合实现这类东西。 我去掉参数配置,JSON,Redis这些东西,用goworker的方式做了个最简单的实现。  实现如下功能:  1. worker向JobServer注册可执行... 2.... 3....

空空如也

1 2 3 4 5 ... 20
收藏数 388,439
精华内容 155,375
关键字:

任务队列