精华内容
下载资源
问答
  • 加入task超时机制的思路与此有关,想法是通过task对象本身传入一个超时时限,这样使用者可以根据task的内容判断执行多久还没有返回即可认为任务已经失败,可以终止执行进程。在子进程内部可以取到这个时限的值,并...

    (本文基于Ansible 2.7)
    在使用ansible执行运维作业的过程中经常会遇到某些目标服务器由于种种五花八门的原因失去响应,尤其是在作业在远端主机执行过程中失去响应的情况,由于本地的子进程无法收到运行结果,可能卡上几天都不退出。例如我们曾经遇到过的,需要在大约15000+虚拟机上执行批量任务,其中两台虚拟机在建立ssh连接时还正常,但随后内存耗尽,远端任务失去响应,这个批量作业最后只能通过查找并强行终止子进程的方式退出。
    Ansible本身的超时机制(如PERSISTENT_COMMAND_TIMEOUT等),对这种情况并不生效,因此我萌生了加入一些超时机制的想法。
    Ansible 源码解析:forks并发机制的实现这篇文章中,我们提到StrategyBase类的_queue_task方法(lib/ansible/plugins/strategy/_init_.py,279-336行)用来启动task的执行子进程,不过并没有深入讨论子进程本身的内容。
    加入task超时机制的思路与此有关,想法是通过task对象本身传入一个超时时限,这样使用者可以根据task的内容判断执行多久还没有返回即可认为任务已经失败,可以终止执行进程。在子进程内部可以取到这个时限的值,并使用计时器触发结束当前进程。同时,我们不希望过多干涉Ansible已有的进程调度机制和结果处理机制。
    StrategyBase使用WorkerProcess类(lib/ansible/executor/process/worker.py, 继承自multiprocessing.Process)来启动工作子进程,最初我们尝试用在WorkerProcess类中加入计时器,任务执行超时时直接终止WorkerProcess进程的运行这种简单粗暴的方法来实现超时终止任务,但后来发现由于WorkerProcess与父进程间通过multiprocessing.Queue来进行通信,强行终止WorkerProcess可能导致死锁,故改而采取设法在TaskExecutor.run()方法中抛出异常来实现目的。代价则是不得不引入新的依赖psutil来清理WorkerProcess产生的子进程(有可能成为孤儿进程)

    1. 超时时限的传递: 通过task
      做法跟Ansible开发实战:在结果输出Action Plugin的名称中提到的一样,直接在Task类中加入FieldAttribute
    _task_timeout = FieldAttribute(isa='int', default=0)
    
    1. 在lib/ansible/errors/_init_.py中,新增一个错误类 AnsibleTaskTimeoutError
    class AnsibleTaskTimeoutError(AnsibleRuntimeError):
        '''an ansible task running lasts beyond the limit given'''
        pass
    
    1. 修改TaskExecutor类,新增 _task_timeout 字段,并通过__init__()传递值:
        def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q, task_start_time, task_timeout=0):
            self._host = host
            self._task = task
            self._job_vars = job_vars
            self._play_context = play_context
            self._new_stdin = new_stdin
            self._loader = loader
            self._shared_loader_obj = shared_loader_obj
            self._connection = None
            self._final_q = final_q
            self._start_time = task_start_time
            self._loop_eval_error = None
            self._task_timeout = task_timeout
    
            self._task.squash()
    
    1. 为TaskExecutor类加入一个用于结束当前task的方法exec_timeout_handler,用于处理SIGALRM信号,抛出AnsibleTaskTimeoutError
        def exec_timeout_handler(self,signal,frame):
            if signal == 14:
                display.debug('Task %s: execution timed out!' % self._task._uuid)
                raise AnsibleTaskTimeoutError 
    
    1. 修改TaskExecutor.run(),首先在方法体开头加入以下代码:
            if self._task_timeout is not None and self._task_timeout > 0:
                signal.signal(signal.SIGALRM, self.exec_timeout_handler)
                signal.alarm(self._task_timeout)
    

    这样在任务运行达到超时时间限制的时候,TaskExecutor类就会抛出异常。
    在TaskExecutor.run()中增加新的except语句来处理AnsibleTaskTimeoutError:

            except AnsibleTaskTimeoutError as e:
                return dict(failed=True, task_timeout=True, msg='Task execution timed out, limit is: %s second(s)' % self._task_timeout)  
    

    并在finally语句块中加入以下代码来清理子进程:

                    import psutil
                    for p in psutil.Process(os.getpid()).children():
                        if p.is_running():
                            p.terminate()
                            display.debug("task execution subprocess: %d terminated" % p.pid)
    

    这里如果求保险的话其实可以写成children(recursive=True),然后再通过psutil.wait_procs来赶尽杀绝。但考虑到此时整个play尚未结束,没必要进行彻底的进程清理,于是将这部分放到了TaskQueueManager._cleanup_processes里,这块有机会再写

    1. 最后WorkerProcess中构建TaskExecutor对象时传入task_timeout值:
                executor_result = TaskExecutor(
                    self._host,
                    self._task,
                    self._task_vars,
                    self._play_context,
                    self._new_stdin,
                    self._loader,
                    self._shared_loader_obj,
                    self._final_q,
                    self._task.task_timeout
                ).run()
    
    展开全文
  • future task 分析

    2020-11-05 18:45:31
    future task 分析 类 实现RunnableFuture 接口, RunnableFuture继承 Runnable 和 Future 状态 0:NEW 初始状态 1:COMPLETING 任务已经执行完(正常或者异常),准备赋值结果 2 :NORMAL 任务已经正常执行完,并已将...

    future task 分析

    实现RunnableFuture 接口, RunnableFuture继承 Runnable 和 Future

    状态

    0:NEW 初始状态
    1:COMPLETING 任务已经执行完(正常或者异常),准备赋值结果
    2 :NORMAL 任务已经正常执行完,并已将任务返回值赋值到结果
    3: EXCEPTIONAL 任务执行失败,并将异常赋值到结果
    4 :CANCELLED 取消
    5:INTERRUPTING 准备尝试中断执行任务的线程
    6:INTERRUPTED 对执行任务的线程进行中断(未必中断到)

    NEW --run-> COMPLETING --set–> normal
    NEW -run-> COMPLETING --set–> EXCEPTIONAL
    NEW --cancel(false)–> CANCELLED
    NEW --cancel(true)–> INTERRUPTING --interrupt–> INTERRUPTED

    内部变量

    1、Callable callable
    内部封装的Callable对象。如果通过构造函数传的是Runnable对象,FutureTask会通过调用Executors#callable,把Runnable对象封装成一个callable。
    2、Object outcome
    用于保存计算结果或者异常信息。
    3、volatile Thread runner
    用来运行callable的线程。
    4、volatile WaitNode waiters
    FutureTask中用了[Trieber Stack] (https://segmentfault.com/a/1190000012463330)来保存等待的线程。
    WaitNode中有两个变量 1.nextWaitNode. 2.thread 等待的线程

    run

    1、 判断状态是否为NEW,并cas设置runner(callable),如果为false则返回。
    2、执行如果有异常,则setException -> (cas NEW和 COMPLETING, 成功则设置outcome 为异常t,然后设置状态为EXCEPTIONAL , 然后 finishCompletion -> (设置waiters的栈顶为空,然后唤醒waiters所有节点))
    3、执行成功,set逻辑同上,依次设置为COMPLETING和NORMAL
    4、finally, 设置runner为空,如果状态是INTERRUPTING,yield线程,直到不为INTERRUPTING。

    finishCompletion 唤醒线程在 setResult, setException,cancel的时候会调用。

    public void run() {
        /* 
         * state为NEW且对runner变量CAS成功。
         * 对state的判断写在前面,是一种优化。
         */
        if (state != NEW ||
                !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                /*
                 * 是否成功运行。
                 * 之所以用了这样一个标志位,而不是把set方法写在try中call调用的后一句,
                 * 是为了不想捕获set方法出现的异常。
                 *
                 * 举例来说,子类覆盖了FutureTask的done方法,
                 * set -> finishCompletion -> done会抛出异常,
                 * 然而实际上提交的任务是有正常的结果的。
                 */
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            /* 
             * 
             * 要清楚,即便在runner被清为null后,仍然有可能有线程会进入到run方法的外层try块。
             * 举例:线程A和B都在执行第一行的if语句读到state == NEW,线程A成功cas了runner,并执行到此处。
             *       在此过程中线程B都没拿到CPU时间片。此时线程B一旦拿到时间片就能进到外层try块。
             *
             * 为了避免线程B重复执行任务,必须在set/setException方法被调用,才能把runner清为null。
             * 这时候其他线程即便进入到了外层try块,也一定能够读到state != NEW,从而避免任务重复执行。
             */
            runner = null;
            /*
             * 因为任务执行过程中由于cancel方法的调用,状态为INTERRUPTING,
             * 令当前线程等待INTERRUPTING状态变为INTERRUPTED。
             * 这是为了不想让中断操作逃逸出run方法以至于线程在执行后续操作时被中断。
             */
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
    
    protected void set(V v) {
        // 通过CAS状态来确认计算没有被取消,结果也没有被设置过。
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }
    
    protected void setException(Throwable t) {
        // 通过CAS状态来确认计算没有被取消,结果也没有被设置过。
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }
    
    private void finishCompletion() {
        for (WaitNode q; (q = waiters) != null;) {
            // 必须将栈顶CAS为null,否则重读栈顶并重试。
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                // 遍历并唤醒栈中节点对应的线程。
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    // 将next域置为null,这样对GC友好。
                    q.next = null; 
                    q = next;
                }
                break;
            }
        }
    
        /*
         * done方法是暴露给子类的一个钩子方法。
         *
         * 这个方法在ExecutorCompletionService.QueueingFuture中的override实现是把结果加到阻塞队列里。
         * CompletionService谁用谁知道,奥秘全在这。
         */
        done();
    
        /* 
         * callable置为null主要为了减少内存开销,
         * 更多可以了解JVM memory footprint相关资料。
         */
        callable = null;
    }
    
    private void handlePossibleCancellationInterrupt(int s) {
        /*
         * 这里的主要目的就是等调用cancel方法的线程完成中断。
         *
         * 以防止中断操作逃逸出run或者runAndReset方法,影响后续操作。
         *
         * 实际上,当前调用cancel方法的线程不一定能够中断到本线程。
         * 有可能cancel方法里读到runner是null,甚至有可能是其它并发调用run/runAndReset方法的线程。
         * 但是也没办法判断另一个线程在cancel方法中读到的runner到底是什么,所以索性自旋让出CPU时间片也没事。
         */
        if (s == INTERRUPTING)
            while (state == INTERRUPTING)
                Thread.yield();
    
        /*
         * 下面的代码在JDK8中已经被注释掉了。
         * 因为在原来的设计中,是想把cancel方法设置的中断位给清除的。
         * 但是实际上也应该允许调用FutureTask的线程使用中断作为线程间通信的机制,
         * 这里没办法区分中断位到底是不是来自cancel方法的调用。
         */
    
        // assert state == INTERRUPTED;
    
        // We want to clear any interrupt we may have received from
        // cancel(true).  However, it is permissible to use interrupts
        // as an independent mechanism for a task to communicate with
        // its caller, and there is no way to clear only the
        // cancellation interrupt.
        //
        // Thread.interrupted();
    }
    
    

    get

    1、 如果状态是NEW 和 COMPLETING则执行awaitDone -> (1, 当前线程被中断则抛出中断异常,并移除waiters。2. 不是NEW和 COMPLETING 则设置等待节点的线程为空,并返回状态。3. COMPLETING yield线程。4. 如果waitNode为null创建waitNode。5. 如果q未入队,q入队waiters。6. 如果超时,removeWaiters(设置参数node为空,移除栈中无效节点),并返回状态,如果未超时,则park线程到超时时间。7. 如果其他则park线程)
    2、report(如果status是NORMAL则返回,如果是CANCELLED,INTERRUPTING,INTERRUPTED则抛出cancel异常)

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        // NEW或者COMPLETING。
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    
    private int awaitDone(boolean timed, long nanos) throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }
    
            int s = state;
            // 完成赋值
            if (s > COMPLETING) {
                // 如果q已经被初始化了,为了GC需要清q.thread。
                if (q != null)
                    q.thread = null;
                return s;
            }
            // COMPLETING是一个很短暂的状态,调用Thread.yield期望让出时间片,之后重试循环。
            else if (s == COMPLETING)
                Thread.yield();
            // 初始化节点,重试一次循环。
            else if (q == null)
                q = new WaitNode();
            // queued记录是否已经入栈,此处准备将节点压栈。
            else if (!queued)
                /*
                 *  这是Treiber Stack算法入栈的逻辑。
                 *  Treiber Stack是一个基于CAS的无锁并发栈实现,
                 *  更多可以参考https://en.wikipedia.org/wiki/Treiber_Stack
                 */
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q.next = waiters, q);
            // 如果有时限,判断是否超时,未超时则park剩下的时间。
            else if (timed) {
                nanos = deadline - System.nanoTime();
                // 超时,移除栈中节点。
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }
    
    /**
     * 清理用于保存等待线程栈里的节点。
     * 所谓节点无效就是内部的thread为null,
     * 一般有以下几种情况:
     * 1. 节点调用get超时。
     * 2. 节点调用get中断。
     * 3. 节点调用get拿到task的状态值(> COMPLETING)。
     *
     * 此方法干了两件事情:
     * 1. 置标记参数node的thread为null。
     * 2. 清理栈中的无效节点。
     *
     * 如果在遍历过程中发现有竞争则重新遍历栈。
     */
    private void removeWaiter(WaitNode node) {
        if (node != null) {
            node.thread = null;
            retry:
            for (;;) {          // restart on removeWaiter race
                for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                    s = q.next;
                    // 如果当前节点仍有效,则置pred为当前节点,继续遍历。
                    if (q.thread != null)
                        pred = q;
                    /*
                     * 当前节点已无效且有前驱,则将前驱的后继置为当前节点的后继实现删除节点。
                     * 如果前驱节点已无效,则重新遍历waiters栈。
                     */
                    else if (pred != null) {
                        pred.next = s;
                        if (pred.thread == null)
                            continue retry;
                    }
                    /*
                     * 当前节点已无效,且当前节点没有前驱,则将栈顶置为当前节点的后继。
                     * 失败的话重新遍历waiters栈。
                     */
                    else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                q, s))
                        continue retry;
                }
                break;
            }
        }
    }
    
    /**
     * 导出结果。
     */
    private V report(int s) throws ExecutionException {
        Object x = outcome;
        // 正常执行完计算任务。
        if (s == NORMAL)
            return (V)x;
        // 取消。
        if (s >= CANCELLED)
            throw new CancellationException();
        // 执行计算任务时发生异常。
        throw new ExecutionException((Throwable)x);
    }
    

    cancel

    1、cas 比较new并设置 mayInterruptIfRunning ? INTERRUPTING : CANCELLED
    2、如果是mayInterruptIfRunning,interrupt runner线程,finally putOrderedInt(设置status 为INTERRUPTED )
    3、finally finishCompletion -> (设置waiters的栈顶为空,然后唤醒waiters所有节点)

    关于cancel
    interrupt 如果任务不处于阻塞状态不会中断线程,只会标记interrupted为true。处于阻塞则会抛出interrupt异常,

    FutureTask不为New状态,那么直接返回false。
    若status状态为New。
    2.1参数mayInterruptIfRunning=false,

    1. 执行线程:不会被中断,并且任务的状态为cancelled(FutureTask的终态)
    2. 获取线程:get立刻方法抛出异常。
      2.2 参数mayInterruptIfRunning=true
    3. 执行线程:若线程为Running状态,则会变更isInterrupted标志位。若线程为Blocking状态,则会抛出异常,终止线程
    4. 获取线程:参数mayInterruptIfRunning=true,status状态会变为interrupting,FutrueTask中的线程会调用interrupt()方法,接着会变更为interrupted(FutureTask的终态)。get立刻方法抛出异常。
    public boolean cancel(boolean mayInterruptIfRunning) {
        /*
         * 在状态还为NEW的时候,根据参数中的是否允许传递,
         * 将状态流转到INTERRUPTING或者CANCELLED。
         */
        if (!(state == NEW &&
                    UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                        mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {
            if (mayInterruptIfRunning) {
                try {
                    // 中断runner线程。
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally {
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            // 该方法上文已经分析过。
            finishCompletion();
        }
        return true;
    }
    

    FutureTask存在的问题

    至此已经将FutureTask的源码解读分析完毕,在读过源码之后,我个人认为JDK8u111的FutureTask源码存在两个问题,目前还需要进一步确认。

    3.1 cancel(true)调用interrupt的线程对象

    FutureTask的run方法的进入条件是

    state == NEW && UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
    假设有两个线程A和B调用run方法,线程C调用cancel方法。
    时刻1: 线程A和B同时读到state == NEW。
    时刻2: 线程A成功对runner变量CAS进入run方法主体。
    时刻3: 线程C调用cancel方法,成功将状态CAS为CANCELLED。
    时刻4: 线程A调用finally中的runner = null。
    时刻5: 线程B开始执行run方法第一句if的后半句,成功将runner变量CAS到线程B。
    时刻6: 线程C读到runner为线程B,准备对线程B进行interrupt()
    时刻7: 线程A调用handlePossibleCancellationInterrupt等待状态从INTERRUPTING流转至INTERRUPTED。
    时刻8: 线程B被中断。

    这里的问题是,调用cancel方法的线程C中断的是实质上没有对callable进行call调用的线程B,而线程A还试图防止中断操作逃逸出run方法。
    这个东西在Future的JavaDoc上说了很含糊,如下所示:

    • @param mayInterruptIfRunning {@code true} if the thread executing this
      • task should be interrupted; otherwise, in-progress tasks are allowed
      • to complete
        上面的情况到底线程A和B哪个算是the thread executing this task说不清。

    3.2 内存占用问题

    通过阅读源码,发现FutureTask还是存在一个隐形的内存占用问题的,或者按照《Effective Java》上说的应该叫无意识的对象保留。
    这个问题就是在FutureTask计算完成后,可能内部用于保存等待线程的栈留有一些已经无用的等待节点。

    时刻1: 某线程调用get,已经入等待栈,此时waiters为该线程对应节点。
    时刻2: 有大量线程通过调用get试图获取计算结果,get -> awaitDone方法中,经过两轮循环都读到状态是NEW的话,此时它们节点已经被初始化过了,但还没开始入队。
    时刻3: 有线程调用run方法,通过run -> set -> finishCompletion,将waiters置为null,并唤醒了已经入栈的那个线程。
    时刻4: 调用awaitDone方法的那些线程再试图入队的话,后面循环会发现状态已经是NORMAL了,但是waiters栈此时不为空,而且再也没法被清掉了。

    这样下来,该FutureTask内部可能会留有一些的无效节点。具体会留多少实际上取决于那个瞬间有多少线程准备执行以及多少能够成功CAS。

    queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q);

    https://www.cnblogs.com/micrari/p/7374513.html

    展开全文
  • 1. 在线程中执行任务 1.1 串行的执行任务 ...6.3.7 为任务设置时限 Future的get支持timeout。 6.3.8 批量提交任务 使用invokeAll方法提交 List ,返回一个 List

    1. 在线程中执行任务

    1.1 串行的执行任务

    这是最经典的一个最简单的Socket server的例子,服务器的资源利用率非常低,因为单线程在等待I/O操作完成时,CPU处于空闲状态。从而阻塞了当前请求的延迟,还彻底阻止了其他等待中的请求被处理。

    public class SingleThreadWebServer {
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                Socket connection = socket.accept();
                handleRequest(connection);
            }
        }
    
        private static void handleRequest(Socket connection) {
            // request-handling logic here
        }
    }
    

    1.2 显式地为任务创建线程

    任务处理从主线程中分离出来,主循环可以快速等待下一个连接,提高响应性。同时任务可以并行处理了,吞吐量也提高了。

    public class ThreadPerTaskWebServer {
        public static void main(String[] args) throws IOException {
            ServerSocket socket = new ServerSocket(80);
            while (true) {
                final Socket connection = socket.accept();
                Runnable task = new Runnable() {
                    public void run() {
                        handleRequest(connection);
                    }
                };
                new Thread(task).start();
            }
        }
        private static void handleRequest(Socket connection) {
            // request-handling logic here
        }
    }
    

    1.3 无限制创建线程的不足

    • 线程的生命周期开销非常高
    • 资源消耗。大量的空闲线程占用内存,给GC带来压力,同时线程数量过多,竞争CPU资源开销太大。
      稳定性。容易引起GC问题,甚至OOM

    2 Executor框架

    任务就是一组逻辑工作单元(unit of work),而线程则是使任务异步执行的机制。

    Executor接口,是代替Thread来做异步执行的入口,接口虽然简单,却为非常灵活强大的异步任务执行框架提供了基础。
    提供了一种标准的方法将任务的提交与执行过程解耦,并用Runnable(无返回时)或者Callable(有返回值)表示任务。

    Executor基于生产者-消费者模式
    提交任务/执行任务分别相当于生产者/消费者,通常是最简单的实现生产者-消费者设计的方式了

    2.1 基于Executor改造后的样例如下

    img_764e49e2bfb5ee30bdf8e6ea9ffc1a8d.png

    将请求处理任务的提交与任务的实际执行解耦,并且只需采用另一种不同的Executor实现,就可以改变服务器的行为,其影响远远小于修改任务提交方式带来的影响

    2.2 执行策略

    这一节主要介绍做一个Executor框架需要靠那些点?

    在什么线程中执行任务?
    任务按照什么顺序执行?FIFO/LIFO/优先级
    有多少个任务可以并发执行?
    队列中允许多少个任务等待?
    如果系统过载了要拒绝一个任务,那么选择拒绝哪一个?如何通知客户端任务被拒绝了?
    在执行任务过程中能不能有些别的动作before/after或者回调?
    各种执行策略都是一种资源管理工具,最佳的策略取决于可用的计算资源以及对服务质量的要求。

    因此每当看到
    new Thread(runnable).start();
    并且希望有一种灵活的执行策略的时候,请考虑使用Executor来代替

    2.3 线程池

    在线程池中执行任务比为每个任务分配一个线程优势明显:

    重用线程,减少开销。
    延迟低,线程是等待任务到达。
    最大化挖掘系统资源以及保证稳定性。CPU忙碌但是又不会出现线程竞争资源而耗尽内存或者失败的情况。
    Executors可以看做一个工厂,提供如下几种Executor的创建:

    newCachedThreadPool
    newFixedThreadPool
    newSingleThreadExecutor
    newScheduledThreadPool
    

    2.4 Executor的生命周期

    为解决执行服务的生命周期问题,Executor扩展了ExecutorService接口,添加了一些用于生命周期管理的方法


    img_b7e6f3ad08789e15eeec1426911d613c.png

    一个优雅停止的例子:


    img_00cbcda01d8b710c7267b055c78a651c.png

    增加生命周期扩展Web服务器的功能

    • 调用stop
    • 客户端请求形式

    关闭

    2.5 延迟任务与周期任务

    使用Timer的弊端在于

    • 如果某个任务执行时间过长,那么将破坏其他TimerTask的定时精确性(执行所有定时任务时只会创建一个线程),只支持基于绝对时间的调度机制,所以对系统时钟变化敏感
    • TimerTask抛出未检查异常后就会终止定时线程(不会捕获异常)

    更加合理的做法是使用ScheduledThreadPoolExecutor,只支持基于相对时间的调度
    它是DelayQueue的应用场景

    3 找出可利用的并行性

    3.1 携带结果的任务Callable和Future

    Executor框架支持Runnable,同时也支持Callable(它将返回一个值或者抛出一个异常)
    在Executor框架中,已提交但是尚未开始的任务可以取消,但是对于那些已经开始执行的任务,只有他们能响应中断时,才能取消。
    Future非常实用,他的API如下


    img_a48979fc962f8dc0a1c4935b80ab2568.png

    内部get的阻塞是靠LockSupport.park来做的,在任务完成后Executor回调finishCompletion方法会依次唤醒被阻塞的线程。

    ExecutorService的submit方法接受Runnable和Callable,返回一个Future。ThreadPoolExecutor框架留了一个口子,子类可以重写newTaskFor来决定创建什么Future的实现,默认是FutureTask类。

    3.2 示例:使用Future实现页面的渲染器

    img_3b90c94fe1e3492fa50084bc81f75ca3.png

    3.3 CompletionService: Executor与BlockingQueue

    计算完成后FutureTask会调用done方法,而CompletionService集成了FutureTask,对于计算完毕的结果直接放在自己维护的BlockingQueue里面,这样上层调用者就可以一个个take或者poll出来。


    img_0bacc8f12b604f855c2a476d3e260288.png

    3.3 示例:使用CompletionService提高渲染性能

    void renderPage(CharSequence source) {
            final List<ImageInfo> info = scanForImageInfo(source);
            CompletionService<ImageData> completionService =
                    new ExecutorCompletionService<ImageData>(executor);
            for (final ImageInfo imageInfo : info)
                completionService.submit(new Callable<ImageData>() {
                    public ImageData call() {
                        return imageInfo.downloadImage();
                    }
                });
    
            renderText(source);
    
            try {
                for (int t = 0, n = info.size(); t < n; t++) {
                    Future<ImageData> f = completionService.take();
                    ImageData imageData = f.get();
                    renderImage(imageData);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (ExecutionException e) {
                throw launderThrowable(e.getCause());
            }
        }
    
    

    6.3.7 为任务设置时限

    Future的get支持timeout。

    6.3.8 批量提交任务

    使用invokeAll方法提交List<Callable>,返回一个List<Future>

    展开全文
  • 记一次Linux主机内存脏数据引发的NameNode故障,主机提示echo 0 > /proc/sys/kernel/hung_task_timeout_secs。内存脏数据是什么,如何调优。

    全文共10789字,阅读约需3min。

    如果你运维过一个大数据集群,你就能够明白内存对于集群主节点来说是一种稀缺资源,特别是集群越大,机器内存问题给集群带来的问题就越明显且越频繁。
    在我管理的这么多大数据集群中,该问题现象就层出不穷。下面对其中一次现象进行了总结。

    问题现象

    1. 一台主机无缘无故死机;
    2. 其上namenode服务无响应其发生准备切换。

    问题排查思路是:

    1. 接收到主机网络不通告警;
    2. 接收到其上namenode服务STOP告警;
    3. 查看备节点是否成功切换为主节点;
    4. 查看zkfc日志确定failover原因;
    5. 重启(如果需要)故障主机,备份日志,启动nn服务,查看主机日志确认主机故障原因。

    其实正确的处理问题路径是:重启主机->启动nn->查看日志定位原因。这里不对问题处理思路作深究,仅重点阐述本案中主机死机的原因。

    主机死机,一般是要查看/var/log/message日志,发现提示:echo 0 > /proc/sys/kernel/hung_task_timeout_secs.

    完整的日志如下:

    Mar 28 03:12:47 namenode01 collectd[1731]: write_graphite plugin: send to epc-graphite-server:2003 (tcp) failed with status 110 (Connection timed out)
    Mar 28 03:12:47 namenode01 collectd[1731]: Filter subsystem: Built-in target `write': Dispatching value to all write plugins failed with status -1.
    Mar 28 03:12:47 namenode01 collectd[1731]: Filter subsystem: Built-in target `write': Some write plugin is back to normal operation. `write' succeeded.
    Mar 28 03:14:31 namenode01 kernel: INFO: task kworker/15:2:22246 blocked for more than 120 seconds.
    Mar 28 03:14:31 namenode01 kernel: "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
    Mar 28 03:14:31 namenode01 kernel: kworker/15:2    D ffff88208618bdd8     0 22246      2 0x00000080
    Mar 28 03:14:31 namenode01 kernel: ffff88208618bbf0 0000000000000046 ffff8828e3130000 ffff88208618bfd8
    Mar 28 03:14:31 namenode01 kernel: ffff88208618bfd8 ffff88208618bfd8 ffff8828e3130000 ffff88208618bd60
    Mar 28 03:14:31 namenode01 kernel: 7fffffffffffffff ffff88208618bd58 ffff8828e3130000 ffff88208618bdd8
    Mar 28 03:14:31 namenode01 kernel: Call Trace:
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff816a94e9>] schedule+0x29/0x70
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff816a6ff9>] schedule_timeout+0x239/0x2c0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff8105aeae>] ? physflat_send_IPI_mask+0xe/0x10
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff81050b5c>] ? native_smp_send_reschedule+0x4c/0x70
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810c0548>] ? resched_curr+0xa8/0xc0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810c12c8>] ? check_preempt_curr+0x78/0xa0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810c1309>] ? ttwu_do_wakeup+0x19/0xd0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff816a989d>] wait_for_completion+0xfd/0x140
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810c4810>] ? wake_up_state+0x20/0x20
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810b07ea>] kthread_create_on_node+0xaa/0x140
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810a93c0>] ? manage_workers.isra.24+0x2a0/0x2a0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810a8f8b>] create_worker+0xeb/0x200
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810a9216>] manage_workers.isra.24+0xf6/0x2a0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810a9743>] worker_thread+0x383/0x3c0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810a93c0>] ? manage_workers.isra.24+0x2a0/0x2a0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810b098f>] kthread+0xcf/0xe0
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810b08c0>] ? insert_kthread_work+0x40/0x40
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff816b4f58>] ret_from_fork+0x58/0x90
    Mar 28 03:14:31 namenode01 kernel: [<ffffffff810b08c0>] ? insert_kthread_work+0x40/0x40
    Mar 28 03:16:31 namenode01 kernel: INFO: task kworker/2:2:11678 blocked for more than 120 seconds.
    Mar 28 03:16:31 namenode01 kernel: "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
    Mar 28 03:16:31 namenode01 kernel: kworker/2:2     D ffff88003922bdd8     0 11678      2 0x00000080
    Mar 28 03:16:31 namenode01 kernel: ffff88003922bbf0 0000000000000046 ffff881fffd86eb0 ffff88003922bfd8
    Mar 28 03:16:31 namenode01 kernel: ffff88003922bfd8 ffff88003922bfd8 ffff881fffd86eb0 ffff88003922bd60
    Mar 28 03:16:31 namenode01 kernel: 7fffffffffffffff ffff88003922bd58 ffff881fffd86eb0 ffff88003922bdd8
    Mar 28 03:16:31 namenode01 kernel: Call Trace:
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff816a94e9>] schedule+0x29/0x70
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff816a6ff9>] schedule_timeout+0x239/0x2c0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810c8f18>] ? __enqueue_entity+0x78/0x80
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810cf90c>] ? enqueue_entity+0x26c/0xb60
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff816a989d>] wait_for_completion+0xfd/0x140
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810c4810>] ? wake_up_state+0x20/0x20
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810b07ea>] kthread_create_on_node+0xaa/0x140
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810a93c0>] ? manage_workers.isra.24+0x2a0/0x2a0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810a8f8b>] create_worker+0xeb/0x200
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810a9216>] manage_workers.isra.24+0xf6/0x2a0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810a9743>] worker_thread+0x383/0x3c0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810a93c0>] ? manage_workers.isra.24+0x2a0/0x2a0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810b098f>] kthread+0xcf/0xe0
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810b08c0>] ? insert_kthread_work+0x40/0x40
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff816b4f58>] ret_from_fork+0x58/0x90
    Mar 28 03:16:31 namenode01 kernel: [<ffffffff810b08c0>] ? insert_kthread_work+0x40/0x40
    Mar 28 03:16:31 namenode01 kernel: INFO: task kworker/15:2:22246 blocked for more than 120 seconds.
    Mar 28 03:16:31 namenode01 kernel: "echo 0 > /proc/sys/kernel/hung_task_timeout_secs" disables this message.
    Mar 28 03:16:31 namenode01 kernel: kworker/15:2    D ffff88208618bdd8     0 22246      2 0x00000080
    Mar 28 03:16:31 namenode01 kernel: ffff88208618bbf0 0000000000000046 ffff8828e3130000 ffff88208618bfd8
    Mar 28 03:16:31 namenode01 kernel: ffff88208618bfd8 ffff88208618bfd8 ffff8828e3130000 ffff88208618bd60
    Mar 28 03:16:31 namenode01 kernel: 7fffffffffffffff ffff88208618bd58 ffff8828e3130000 ffff88208618bdd8
    

    原因在于,默认情况下, Linux会最多使用40%的可用内存作为文件系统缓存。当超过这个阈值后,文件系统会把将缓存中的内存全部写入磁盘, 导致后续的IO请求都是同步的。将缓存写入磁盘时,有一个默认120秒的超时时间。

    出现上面的问题的原因是IO子系统的处理速度不够快,不能在120秒将缓存中的数据全部写入磁盘。IO系统响应缓慢,导致越来越多的请求堆积,最终系统内存全部被占用,导致系统失去响应。

    这个Linux延迟写机制带来的问题,并且在主机内存越大时,出现该问题的可能性更大。研发发现This is a know bug

    This is a know bug. By default Linux uses up to 40% of the available memory for file system caching. After this mark has been reached the file system flushes all outstanding data to disk causing all following IOs going synchronous. For flushing out this data to disk this there is a time limit of 120 seconds by default. In the case here the IO subsystem is not fast enough to flush the data withing 120 seconds. This especially happens on systems with a lof of memory.

    The problem is solved in later kernels and there is not “fix” from Oracle. I fixed this by lowering the mark for flushing the cache from 40% to 10% by setting “vm.dirty_ratio=10” in /etc/sysctl.conf. This setting does not influence overall database performance since you hopefully use Direct IO and bypass the file system cache completely.

    链接:nfo-task-blocked-for-more-than-120-seconds

    关于脏数据,有几个配置:

    vm.dirty_background_ratio是内存可以填充“脏数据”的百分比。这些“脏数据”在稍后是会写入磁盘的,pdflush/flush/kdmflush这些后台进程会稍后清理脏数据。举一个例子,我有32G内存,那么有3.2G的内存可以待着内存里,超过3.2G的话就会有后来进程来清理它。

    vm.dirty_ratio 是绝对的脏数据限制,内存里的脏数据百分比不能超过这个值,如果超过,将强制刷写到磁盘。如果脏数据超过这个数量,新的IO请求将会被阻挡,直到脏数据被写进磁盘。这是造成IO卡顿的重要原因,但这也是保证内存中不会存在过量脏数据的保护机制。

    vm.dirty_expire_centisecs 指定脏数据能存活的时间。在这里它的值是30秒。当 pdflush/flush/kdmflush 进行起来时,它会检查是否有数据超过这个时限,如果有则会把它异步地写到磁盘中。毕竟数据在内存里待太久也会有丢失风险。

    vm.dirty_writeback_centisecs 指定多长时间
    pdflush/flush/kdmflush 这些进程会起来一次。

    调优

    我们可以从以下思路进行调优:

    1. 减少脏数据的比例,避免刷写超时
    2. 减小脏数据在内存中的存放时间,避免积少成多

    修改相应参数

    临时修改

    # sysctl -w vm.dirty_ratio=10
    # sysctl -w vm.dirty_background_ratio=5
    # sysctl -p
    

    永久修改

    #vi /etc/sysctl.conf
    写入
    vm.dirty_background_ratio = 5
    vm.dirty_ratio = 10
    
    sysctl -p
    

    关于脏数据,这里简单说一下:

    脏数据 由于Linux内核实现的一种主要磁盘缓存的存在,也就是页高速缓存(cache)。页高速缓存的缓存作用,写操作实际上会被延迟。当页高速缓存中的数据比后台存储的数据更新时,那么该数据就被称做脏数据。

    参考


    在这里插入图片描述


    展开全文
  • CF558E A Simple Task

    2018-08-02 21:00:00
    题目大意: 给定一个长度不超过10^5的字符串(小写英文字母),和不超过5000个操作。 每个操作 L R K 表示给区间[L,R]的字符串排序,K=1为升序,K=0为降序。 最后输出最终的字符串 首先这么想想,对于一段区间...
  • 可以在上面do some task的地方放置你的耗时操作,例如 Activator.GetObject 或者 instance.dosomething() - 一定要用try-catch括起来哦。 需要注意ChannelServices.UnRegisterChannel 有一点需要注意 ...
  • 等待在时限内完成 超时后结束 ThreadPool.UnsafeQueueUserWorkItem() 1 非原生支持1 非原生支持 非原生支持3 不支持 ThreadPool.QueueUserWorkItem() 2.7 非原生支持1 非...
  • 单点时限:1000ms 内存限制:256MB 描述 给定 N 项任务的起至时间( S1, E1 ), ( S2, E2 ), ..., ( SN, EN ), 计算最少需要多少台机器才能按时完成所有任务。 同一时间一台机器上...
  • 硬实时系统的任务运行正确性与响应时限是紧密相关的,一旦超过时限将导致严重的后果,比如导弹控制系统、高铁自动驾驶系统等,都是需要严格的响应时限的。 软实时系统中,虽然也存在时限指标,但是如果输出响应超过...
  • public static void Invoke(Action method, int milliseconds) { Thread thdToKill = null; Action invokemethod = new Action(() => { ...
  • 时限:2000MS 内存:65536KB 64位IO格式:%lld & %llu 问题描述 Calculating the derivation of a polynomial is an easy task. Given a function f(x) , we use (f(x))' to denote its derivati
  • 多线程之任务时限

    2018-12-12 14:57:03
    有的时候我们希望在一定时间内...future.get方法提供了这样的参数,当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException。当任务超时后,future也提供了一定的方法来取消任务...
  • 我们可以通过Futrue中的get方法解决这个问题: 上面的方法的执行逻辑是,当结果可用时,它将立即返回,如果在指定时限内没有计算出结果,那么将抛出TimeoutException. 2.在使用限时任务时要注意,当这些任务超时后...
  • 软件测试面试题整理

    千次阅读 多人点赞 2013-11-28 15:53:22
    软件测试面试题整理   01. 为什么要在一个团队中开展软件测试工作? ... 因为没有经过测试的软件很难在发布之前知道该软件的质量,就好比ISO质量认证一样,测试同样也需要质量的保证,这个时候就需要在团队中...
  • Linux系统出现hung_task_timeout_secs和blocked for more than 120 seconds的解决方法 Linux系统出现系统没有响应。 在/var/log/message日志中出现大量的“echo 0 > /proc/sys/kernel/hung_task_timeout_secs" ...
  • 等待在时限内完成 超时后结束 ThreadPool.UnsafeQueueUserWorkItem() 1 非原生支持 1 非原生支持 非原生支持 3 不支持 ThreadPool.QueueUserWorkItem() ...
  • hdu 3572 Task ScheduleDescription Our geometry princess XMM has stoped her study in computational geometry to concentrate on her newly opened factory. Her factory has introduced M new machines in ...
  • obsidian-plugin-todo-源码

    2021-05-15 01:42:57
    黑曜石TODO插件 黑曜石中基于文本的GTD。 特征 汇总保管库中所有未完成的待办事项,并在单个视图中列出它们 按类型划分“待办事项”(“今天”,“计划中”,“收件箱”和“某天/也许”) 通过添加标签#YYYY-DD-MM...
  • 思路:使用ExecutorService的invokeAll(time,timeUint)方法来设置执行时限,该方法返回一个List<Future<T>>,一旦返回后,即取消尚未完成的任务,然后再从list中读取future并调用future.get()方法来获取...
  • Android下affinities和任务(task) 本文参考了官方Dev Guide文档,简单介绍Android下的affinities和任务(task)。 Activity和Task task就好像是能包含很多activity的栈。 默认情况下,一个activity启动另外一个...
  • 题意:给出n个任务+m台机器,还有一个任务处理时限+开始时间+结束时间,一个时刻里一台机器只能处理一个任务,但是一个任务可以在不同机器处理,问能否处理完所有任务? 方法:最大流。这个题的建图算是经典,因为...
  • // newTask.setIdentityLinkCount(historyService.getHistoricIdentityLinksForTask(newTask.getId()).size()); CommandContextUtil.getTaskService().insertTask(newTask, true); // 置空实例部分信息 instance....
  • 工作流activiti中任务的过期时间

    千次阅读 2021-01-26 17:28:50
    这样在工作流中ACT_RU_TASK表中DUE_DATE_字段就会有时间数据 方式二:直接写时间,如下:它不能直接写正常的时间日期,无法解析。这个格式相当于一天后延后三个小时,距离格式说明下面有介绍 格式解析 R2/2015-...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,627
精华内容 1,050
关键字:

时限task