精华内容
下载资源
问答
  • 制造一个轮子线程池

    2019-12-12 10:24:20
    这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造过程。 虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有: - 新建线程池,有核心线程数和最大线程数,线程存活...

    很早之前就看过线程池源码(知道大概的运行原理),但是只是知道怎么用,并没有深究。这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造轮过程。

    虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有:

    - 新建线程池,有核心线程数和最大线程数,线程存活时间,队列

    - 在线程池加入线程,当前线程数不超过核心线程数就新建线程,超过核心放队列,队列满了再新建线程,达到最大线程

    - 全部线程运行完成后会保留核心线程数,支持线程存活时间

    - 立即关闭线程池

    - 优雅的关闭线程池

    1.新建一个轮子线程池类,就一个构造函数,把需要的参数都传进来

    2.用ThreadPoolExecutor的时候,新建了线程池就会往里面提交线程了,我们写的也一样,而且往线程池里面加线程的时候就会判断:当前运行线程数是否大于核心线程数,是否大于最大线程数等,这里需要一个当前运行线程数的变量。

    所以这里增加一个成员变量activeCount,初始值为0,运行一个线程就加1,线程运行结束就减1,这里面减的时候是在不同线程里面,所以为了线程安全用AtomicInteger类型

        /** 当前活动线程数 */
        private AtomicInteger activeCount = new AtomicInteger(0);

    提交线程的方法,看过ThreadPoolExecutor源码的应该知道,里面每个线程都是包装了一个Worker类,新增线程的时候就会新建一个worker,为什么要这么做呢?我第一次看的时候也不理解,想着如果直接把传进来的线程start一下怎么样,如果直接新建线程会立马就发现问题,怎么知道线程什么运行完,怎么把activeCount减1?所以这里不能直接start,必须新建一个线程(异步执行,废话),这个线程必须要运行参数的线程run方法(废话,不然参数还有啥用,我们的业务逻辑咋执行),线程运行完成后activeCount减1。

    所以这里有了worker类,worker本身也是一个线程。顺便把线程名字也解决了,新建一个threadNum从0开始,新建一个worker就 1

    提交线程

    3.现在已经有点样子了,但是问题还很多,主要的一个问题:比如我建一个core=2,max=5,queue=5的线程池,假设往里面放了8个线程,会出现只运行了3个线程,跑完也就结束了,剩下的5个会在队列里面没处理,而且也不会保留2个核心线程

    WheelThreadPool2的运行结果

    现在想想怎么把队列的线程拿出来运行,没看ThreadPoolExecutor源码前我第一次想着是不是在创建线程池的时候默认启动一个线程去队列里面获取并执行,然而一想立马否定了,因为线程池是多线程运行的,队列里面的线程需要max(参数最大线程数)个线程同时执行。所以在我们新建的worker里面要能不断的循环获取队列的线程去执行,如果队列为空了,则退出循环,让线程结束

    改造一下worker的run方法,在execute方法创建的worker线程执行完通过参数传进来的runnable之后,循环获取队列并执行队列线程的run方法

    这样还有点问题,如果try里面出现异常,比如runnable.run异常或者r.run异常,这个线程就退出了,不能保持max个线程并行执行

    所以如果异常了需要重新创建一个线程继续跑循环,改造后

    这样改造后如果队列空了会把所有线程都结束掉,所以现在要解决执行完队列后保留core个线程的问题,怎么保留线程其实是通过阻塞队列实现的,

    当队列为空时,通过queue.take()方法阻塞住当前线程,直到又有线程提交。如果当前活动线程超出core,结束当前线程

    这样改造后大概轮廓出来了,因为queue是阻塞队列,而且各个方法都加了线程锁,所以本身也是线程安全的,这部分代码貌似不需要加锁,跑个测试用例试下,貌似很合理

    WheelThreadPoolTest3结果

    先放着,再看看下个功能,要支持线程存活时间,这个存活时间的意思是:比如上面WheelThreadPoolTest3里面的线程池运行了10个线程,跑完之后剩余2个线程,3个消亡了(完成任务了)。后面再提交10个线程,又新建3个线程(从控制台的线程名字可以看出),如果我们设置一个存活时间,让第一批的10个跑完后的那3个线程不消亡,比如存活5秒,第二批的10个跑的时候就可以复用,不需要重新创建线程。因为线程是稀缺资源,能复用就复用,新建线程也影响效率

    目前的代码线程消亡的标记是因为queue.poll获取到了null,导致循环退出,线程完成。而阻塞队列的poll方法还有一个多态方法E poll(long timeout, TimeUnit unit),可以在一定时间内poll,在时间内获取到了就会返回,这个时间刚好用于是线程的存活时间(死亡倒计时??)。

    构造方法已经传了存活时间和单位,直接加上这两个参数

    再来测试下,存活时间设置为5秒,那样第二批只能提交5个线程,否则会导致线程池慢

    结果

    发生一个大问题,最后线程都没了,而且主线程也退出了

    原因:第一批的10个线程执行完后因为线程存活5秒,所以都保留了

    堆栈打印出来也证实了,都在poll里面阻塞,然后第二批5个线程已提交,这存活的5个线程就会立马开始执行,执行完后再次阻塞再poll,等过了存活时间,线程全部结束!

    在第二批执行结束后再次打印堆栈,结果果然是这样

    问题知道了,解决方法

    这里的lock是一个线程锁,防止多个线程同时判断(同时判断了写这个还有意义么。。)

        /** 线程锁 */
        private Lock lock = new ReentrantLock();

    至此,写好了主要部分,测试下

    不过有个地方我自己代码走读,感觉是有问题

    感觉这里会存在线程安全问题,假设线程池队列为空,当前activeCount大于core,并发情况下,多个线程同时满足activeCount.get()>coreCount,之后所有线程都会走queue.poll分支,因为队列为空,所有线程queue.poll返回为null,所有线程全部结束掉,这样和保留core个线程冲突了。纠结许久后来发现这个假设不成立,要在没有进入while之前就出现队列为空,且activeCount>core,这种情况不会出现,因为在提交线程的方法(execute)已经限制了这种情况,但是这个代码看起来会有歧义,还是决定改造改造

    这回走读一次,感觉好多了,终于把主要功能写完了,也能正常跑了

    4.立即关闭线程池

    线程池里面的线程跑完了,但是还有core个线程阻塞着,这么一直阻塞着也不是办法,所以要有个关闭的方法,先写暴力关闭,当前运行的线程中断,队列抛弃

    思考时间:中断线程肯定是调用Thread.interrupt方法,这样我得拿到正在运行得线程才行,所以在新增线程的时候得保存在一个集合里,而且线程执行得时候异常了,也会新增线程,所以这个保存集合要线程安全,而且存取速度要快

    这里需要一个线程安全得set集合,ConcurrentHashMap里面有个newKeySet方法,看了下源码是通过ConcurrentHashMap的key来的,是线程安全的,直接用

    `

    /** 保存正在运行的线程 */

    private Set workers = ConcurrentHashMap.newKeySet();

    `

    还需要一个状态标识当前线程池是否关闭,这个状态要在线程并发(获取队列线程)情况下可以判断,所以用volatile修饰,默认正在运行

    `

    /** 线程池状态,-1:正在运行,0:暴力关闭,1:优雅关闭 */

    private volatile int status = -1;

    `

    新建暴力关闭方法stopNow

    在新建worker的时候都加到workers里面去,这时候一想,activeCount和workers.size是不是重复了,顺带把activeCount删掉,用workers.size代替,线程执行完成workers.remove掉

    新增方法addWorker

    提交线程方法改造,删掉了activeCount,用workers.size代替

    worker的run方法改造,删掉了activeCount,用workers.size代替,activeCount-1用workers.remove(this)代替

    改造完了,查看一下,暴力关闭需要立即中断线程,抛弃队列,所以在while获取队列那里要增加判断

    提交线程方法增加状态判断

    改造完成,测试下暴力关闭

    结果,整个程序也退出了,线程池结束了,队列只运行了一个线程

    5.优雅关闭线程池

    优雅的关闭线程池,是要让所有的线程和队列都运行完毕再关闭所有线程,这样就不能直接interrupt线程了,先设置status=1未优雅关闭

    新增优雅关闭方法

    提交线程方法增加状态,限制提交

    这里调用stop方法时分情况:

    - 1.存在线程还在执行(队列或者当前线程),执行完成后调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    - 2.不存在线程还在执行(队列或者当前线程),调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    所以这里需要一个标记,是否还存在线程在执行,我们可以用一个数字标识当前还需要执行的线程数量,执行完一个线程就-1

    增加成员变量remainingCount,标识剩余线程数

        /** 剩余线程数 */
        private AtomicInteger remainingCount = new AtomicInteger(0); 

    每次提交一个线程 1

    每次执行完一个线程-1

    第一种情况,存在线程还在执行,在执行完成后判断remainingCount是否为0

    第二种情况,不存在线程还在执行,在stop的时候增加判断

    抽象封装下方法

    顺便把worker的run方法也优化下,一个屏幕都截不下了

    跑个第一种情况的测试用例

    结果出乎意料,居然没有结束所有线程

    加日志调试

    出现了更夸张的错误

    根据控制台信息可以想象,原本的5个线程全部被interrupt,又不断地创建线程,又不断的被interrupt。这里会创建线程的地方只有在worker的run方法异常,finally代码段里面,而且没加日志的时候没有出现这种情况,加了日志就出现了。多次代码走读后,发现一种可能,在最初5个线程同时将队列消耗完后,2个线程进入take阻塞,3个线程开始进入interruptWorkers方法,导致那2个线程出现异常,异常后会退出线程,再次创建新线程,并且interrupt新线程,由此陷入死循环

    改造getQueueTask方法,不抛出异常,出现异常返回null。顺便走读一下status=0的情况,发现不影响

    再次运行测试用例,结果符合预期了,这里的异常堆栈可以忽略

    再测试优雅关闭的第二种情况

    结果正常

    去掉调试日志,至此,这个轮子线程池完成,具备线程池基础功能

    总结

    写这个线程池过程曲折,各种问题不断出现,特别时两种关闭方法,判断比较烦,代码走读和调试良久,才堪堪解决,由此联想ThreadPoolExecutor是多么强大,多么不简单

    图片较多,代码可以在Github上找到

    参考资料:ThreadPoolExecutor源码

    感谢crossoverjie的文章:https://crossoverjie.top/2019/05/20/concurrent/threadpool-01/

    本文来自chentiefeng的博客

    展开全文
  • 线程池

    2013-10-28 17:31:59
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架...

    线程池

    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以轮循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架为每个进程提供了一个线程池,一个线程池有若干个等待操作状态,当一个等待操作完成时,线程池中的辅助线程会执行回调函数。线程池中的线程由系统管理,程序员不需要费力于线程管理,可以集中精力处理应用程序任务。

    线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。线程池线程都是后台线程.每个线程都使用默认的堆栈大小,以默认的优先级运行,并处于多线程单元中.如果某个线程在托管代码中空闲(如正在等待某个事件),则线程池将插入另一个辅助线程来使所有处理器保持繁忙.如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动。

    不使用线程池情况

    • 如果需要使一个任务具有特定优先级

    • 如果具有可能会长时间运行(并因此阻塞其他任务)的任务

    • 如果需要将线程放置到单线程单元中(线程池中的线程均处于多线程单元中)

    • 如果需要永久标识来标识和控制线程,比如想使用专用线程来终止该线程,将其挂起或按名称发现它

    条件变量

    与互斥锁不同,条件变量是用来等待而不是用来上锁的。条件变量用来自动阻塞一个线程,直到某特殊情况发生为止。通常条件变量和互斥锁同时使用。

    条件变量使我们可以睡眠等待某种条件出现。条件变量是利用线程间共享的全局变量进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立"(给出条件成立信号)。

    条件的检测是在互斥锁的保护下进行的。如果一个条件为假,一个线程自动阻塞,并释放等待状态改变的互斥锁。如果另一个线程改变了条件,它发信号给关联的条件变量,唤醒一个或多个等待它的线程,重新获得互斥锁,重新评价条件。如果两进程共享可读写的内存,条件变量可以被用来实现这两进程间的线程同步。

    使用条件变量之前要先进行初始化。可以在单个语句中生成和初始化一个条件变量如:

    pthread_cond_tmy_condition=PTHREAD_COND_INITIALIZER;(用于进程间线程的通信)。

    也可以利用函数pthread_cond_init动态初始化。

    条件变量函数

    • 条件变量初始化

    #include< pthread.h>

    int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t*attr);

    参数cond条件变量,attr条件变量属性

    返回值:成功返回0,出错返回错误编号。

    说明:pthread_cond_init函数可以用来初始化一个条件变量。它使用变量attr所指定的属性来初始化一个条件变量,如果参数attr为空,那么它将使用缺省的属性来设置所指定的条件变量。

    • 条件变量摧毁

    int pthread_cond_destroy(pthread_cond_t *cond);

    参数:cond条件变量

    返回值:成功返回0,出错返回错误编号。

    说明:pthread_cond_destroy函数可以用来摧毁所指定的条件变量,同时将会释放所给它分配的资源。调用该函数的进程也并不要求等待在参数所指定的条件变量上

    • 条件变量等待

    int pthread_cond_wait(pthread_cond_t *cond,pthread_mutex_t *mutex);

    int pthread_cond_timedwait(pthread_cond_t *cond,pthread_mutex_tmytex,const struct timespec *abstime);

    参数:cond条件变量,mutex互斥锁

    返回值:成功返回0,出错返回错误编号。

    说明:第一个参数*cond是指向一个条件变量的指针,第二个参数*mutex则是对相关的互斥锁的指针。函数pthread_cond_timedwait函数类型与函数pthread_cond_wait区别在于,如果达到或是超过所引用的参数*abstime,它将结束并返回错误ETIME。pthread_cond_timedwait函数的参数*abstime指向一个timespec结构。该结构如下:

    typedefstruct timespec{

    time_ttv_sec;

    longtv_nsex;

    }timespec_t;

    • 条件变量通知

    intpthread_cond_signal(pthread_cond_t *cond);

    intpthread_cond_broadcast(pthread_cond_t *cond);

    参数:cond条件变量

    成功返回0,出错返回错误编号。

    说明:参数*cond是对类型为pthread_cond_t的一个条件变量的指针。当调用pthread_cond_signal时一个在相同条件变量上阻塞的线程将被解锁。如果同时有多个线程阻塞,则由调度策略确定接收通知的线程。如果调用pthread_cond_broadcast,则将通知阻塞在这个条件变量上的所有线程。一旦被唤醒,线程仍然会要求互斥锁。如果当前没有线程等待通知,则上面两种调用实际上成为一个空操作。如果参数*cond指向非法地址,则返回值EINVAL

    线程池示例

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <pthread.h>
    #include <assert.h>
    
    /*
    *线程池里所有运行和等待的任务都是一个CThread_worker
    *由于所有任务都在链表里,所以是一个链表结构
    */
    typedef struct worker
    {
        /*回调函数,任务运行时会调用此函数,注意也可声明成其它形式*/
        void *(*process) (void *arg);
        void *arg;/*回调函数的参数*/
        struct worker *next;
    } CThread_worker;
    
    /*线程池结构*/
    typedef struct
    {
        pthread_mutex_t queue_lock;
        pthread_cond_t queue_ready;
    
        /*链表结构,线程池中所有等待任务*/
        CThread_worker *queue_head;
    
        /*是否销毁线程池*/
        int shutdown;
        pthread_t *threadid;
        /*线程池中允许的活动线程数目*/
        int max_thread_num;
        /*当前等待队列的任务数目*/
        int cur_queue_size;
    } CThread_pool;
    
    /*线程池初始化*/
    void pool_init (int max_thread_num);
    
    /*向线程池中加入任务*/
    int pool_add_worker (void *(*) (void *), void *);
    
    /*销毁线程池,等待队列中的任务不会再被执行;但是正在运行的线程会一直把任务运行完后再退出*/
    int pool_destroy ();
    
    /*线程正常执行*/
    void *thread_routine (void *);
    
    /*执行的回调函数*/
    void *myprocess (void *);
    
    static CThread_pool *pool = NULL;
    void pool_init (int max_thread_num)
    {
        pool = (CThread_pool *) malloc (sizeof (CThread_pool));
    
        pthread_mutex_init (&(pool->queue_lock), NULL);
        /*初始化条件变量*/
        pthread_cond_init (&(pool->queue_ready), NULL);
    
        pool->queue_head = NULL;
    
        pool->max_thread_num = max_thread_num;
        pool->cur_queue_size = 0;
    
        pool->shutdown = 0;
    
        pool->threadid = (pthread_t *) malloc (max_thread_num * sizeof (pthread_t));
        int i = 0;
        for (i = 0; i < max_thread_num; i++)
        {
            pthread_create (&(pool->threadid[i]), NULL, thread_routine, NULL);
        }
    }
    
    /*向线程池中加入任务*/
    int pool_add_worker (void *(*process) (void *arg), void *arg)
    {
        /*构造一个新任务*/
        CThread_worker *newworker = (CThread_worker *) malloc (sizeof (CThread_worker));
        newworker->process = process;
        newworker->arg = arg;
        newworker->next = NULL;/*别忘置空*/
    
        pthread_mutex_lock (&(pool->queue_lock));
        /*将任务加入到等待队列中*/
        CThread_worker *member = pool->queue_head;
        if (member != NULL)
        {
            while (member->next != NULL)
                member = member->next;
            member->next = newworker;
        }
        else
        {
            pool->queue_head = newworker;
        }
    
        assert (pool->queue_head != NULL);
    
        pool->cur_queue_size++;
        pthread_mutex_unlock (&(pool->queue_lock));
        /*等待队列中有任务了,唤醒一个等待线程;
        注意如果所有线程都在忙碌,这句没有任何作用*/
        pthread_cond_signal (&(pool->queue_ready));
        return 0;
    }
    
    /*销毁线程池,等待队列中的任务不会再被执行;但是正在运行的线程会一直把任务运行完后再退出*/
    int pool_destroy ()
    {
        if (pool->shutdown)
            return -1;/*防止两次调用*/
        pool->shutdown = 1;
    
        /*唤醒所有等待线程,线程池要销毁了*/
        pthread_cond_broadcast (&(pool->queue_ready));
    
        /*阻塞等待线程退出,否则就成僵尸了*/
        int i;
        for (i = 0; i < pool->max_thread_num; i++)
            pthread_join (pool->threadid[i], NULL);
        free (pool->threadid);
    
        /*销毁等待队列*/
        CThread_worker *head = NULL;
        while (pool->queue_head != NULL)
        {
            head = pool->queue_head;
            pool->queue_head = pool->queue_head->next;
            free (head);
        }
        /*条件变量和互斥量也别忘了销毁*/
        pthread_mutex_destroy(&(pool->queue_lock));
        pthread_cond_destroy(&(pool->queue_ready));
    
        free (pool);
        /*销毁后指针置空是个好习惯*/
        pool=NULL;
        return 0;
    }
    
    /*线程正常执行*/
    void *thread_routine (void *arg)
    {
        printf ("starting thread 0x%x\n", pthread_self ());
        while (1)
        {
            pthread_mutex_lock (&(pool->queue_lock));
            /*如果等待队列为0并且不销毁线程池,则处于阻塞状态; 注意
            pthread_cond_wait是一个原子操作,等待前会解锁,唤醒后会加锁*/
            while (pool->cur_queue_size == 0 && !pool->shutdown)
            {
                printf ("thread 0x%x is waiting\n", pthread_self ());
                pthread_cond_wait (&(pool->queue_ready), &(pool->queue_lock));
            }
    
            /*线程池要销毁了*/
            if (pool->shutdown)
            {
                /*遇到break,continue,return等跳转语句,千万不要忘记先解锁*/
                pthread_mutex_unlock (&(pool->queue_lock));
                printf ("thread 0x%x will exit\n", pthread_self ());
                pthread_exit (NULL);
            }
    
            printf ("thread 0x%x is starting to work\n", pthread_self ());
    
            /*assert是调试的好帮手*/
            assert (pool->cur_queue_size != 0);
            assert (pool->queue_head != NULL);
    
            /*等待队列长度减去1,并取出链表中的头元素*/
            pool->cur_queue_size--;
            CThread_worker *worker = pool->queue_head;
            pool->queue_head = worker->next;
            pthread_mutex_unlock (&(pool->queue_lock));
    
            /*调用回调函数,执行任务*/
            (*(worker->process)) (worker->arg);
            free (worker);
            worker = NULL;
        }
        /*这一句应该是不可达的*/
        pthread_exit (NULL);
    }
    
    /*执行的回调函数*/
    void *myprocess (void *arg)
    {
        printf ("threadid is 0x%x, working on task %d\n", pthread_self (),*(int *) arg);
        sleep (1);/*休息一秒,延长任务的执行时间*/
        return NULL;
    }
    
    int main (int argc, char **argv)
    {
        pool_init (3);/*线程池中最多三个活动线程*/
    
        /*连续向池中投入10个任务*/
        int *workingnum = (int *) malloc (sizeof (int) * 10);
        int i;
        for (i = 0; i < 10; i++)
        {
            workingnum[i] = i;
            pool_add_worker (myprocess, &workingnum[i]);
        }
        /*等待所有任务完成*/
        sleep (5);
        /*销毁线程池*/
        pool_destroy ();
    
        free (workingnum);
        return 0;
    }
    

    在linux下编译运行:

    #gcc -lpthread pthread.c -o pthread

    #./pthread


    展开全文
  • 这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造过程。虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有:新建线程池,有核心线程数和最大线程数,线程存活时间,...

    3dc274ad625eaca1d060a469da549602.png

    很早之前就看过线程池源码(知道大概的运行原理),但是只是知道怎么用,并没有深究。这次为了帮助自己深入理解线程池,决定手动写一个极简(陋)的线程池,顺便记录思考和造轮过程。

    虽然不太可能和jdk自带的那么完美,但是该有的功能还是要有:

    新建线程池,有核心线程数和最大线程数,线程存活时间,队列

    在线程池加入线程,当前线程数不超过核心线程数就新建线程,超过核心放队列,队列满了再新建线程,达到最大线程

    全部线程运行完成后会保留核心线程数,支持线程存活时间

    立即关闭线程池

    优雅的关闭线程池

    1.新建一个轮子线程池类,就一个构造函数,把需要的参数都传进来
    89944ebb8c00b88066d9e1d5e7a391a4.png
    2.用ThreadPoolExecutor的时候,新建了线程池就会往里面提交线程了,我们写的也一样,而且往线程池里面加线程的时候就会判断:当前运行线程数是否大于核心线程数,是否大于最大线程数等,这里需要一个当前运行线程数的变量。

    所以这里增加一个成员变量activeCount,初始值为0,运行一个线程就加1,线程运行结束就减1,这里面减的时候是在不同线程里面,所以为了线程安全用AtomicInteger类型

        /** 当前活动线程数 */    private AtomicInteger activeCount = new AtomicInteger(0);

    提交线程的方法,看过ThreadPoolExecutor源码的应该知道,里面每个线程都是包装了一个Worker类,新增线程的时候就会新建一个worker,为什么要这么做呢? 我第一次看的时候也不理解,想着如果直接把传进来的线程start一下怎么样,如果直接新建线程会立马就发现问题,怎么知道线程什么运行完,怎么把activeCount减1? 所以这里不能直接start,必须新建一个线程(异步执行,废话),这个线程必须要运行参数的线程run方法(废话,不然参数还有啥用,我们的业务逻辑咋执行),线程运行完成后activeCount减1。

    所以这里有了worker类,worker本身也是一个线程。顺便把线程名字也解决了,新建一个threadNum从0开始,新建一个worker就+1

    b270d8321ac1e5bff1e693c307cfb37f.png

    提交线程

    d83dcce6cb6223437efb7936cd4d98c0.png
    3.现在已经有点样子了,但是问题还很多,主要的一个问题:比如我建一个core=2,max=5,queue=5的线程池,假设往里面放了8个线程,会出现只运行了3个线程,跑完也就结束了,剩下的5个会在队列里面没处理,而且也不会保留2个核心线程
    b2eb41fc2dd2553a9f024166234f17c6.png

    WheelThreadPool2的运行结果

    7f417856c6762b086e9c1c86204f6358.png

    现在想想怎么把队列的线程拿出来运行,没看ThreadPoolExecutor源码前我第一次想着是不是在创建线程池的时候默认启动一个线程去队列里面获取并执行,然而一想立马否定了,因为线程池是多线程运行的,队列里面的线程需要max(参数最大线程数)个线程同时执行。 所以在我们新建的worker里面要能不断的循环获取队列的线程去执行,如果队列为空了,则退出循环,让线程结束

    改造一下worker的run方法,在execute方法创建的worker线程执行完通过参数传进来的runnable之后,循环获取队列并执行队列线程的run方法

    0a709dddfe2d714456874b00a0633903.png

    这样还有点问题,如果try里面出现异常,比如runnable.run异常或者r.run异常,这个线程就退出了,不能保持max个线程并行执行

    所以如果异常了需要重新创建一个线程继续跑循环,改造后

    d876b3e4008e876daf86fdcd566229da.png

    这样改造后如果队列空了会把所有线程都结束掉,所以现在要解决执行完队列后保留core个线程的问题,怎么保留线程其实是通过阻塞队列实现的,

    当队列为空时,通过queue.take()方法阻塞住当前线程,直到又有线程提交。如果当前活动线程超出core,结束当前线程

    1eebcd363142467ac19885bf6803cc77.png

    这样改造后大概轮廓出来了,因为queue是阻塞队列,而且各个方法都加了线程锁,所以本身也是线程安全的,这部分代码貌似不需要加锁,跑个测试用例试下,貌似很合理

    859dd87a26963a3040355fa19f1da5e6.png

    WheelThreadPoolTest3结果

    01e1ff8ac9f8b0824b179fd0b8de6912.png

    先放着,再看看下个功能,要支持线程存活时间,这个存活时间的意思是:比如上面WheelThreadPoolTest3里面的线程池运行了10个线程,跑完之后剩余2个线程,3个消亡了(完成任务了)。 后面再提交10个线程,又新建3个线程(从控制台的线程名字可以看出),如果我们设置一个存活时间,让第一批的10个跑完后的那3个线程不消亡,比如存活5秒,第二批的10个跑的时候就可以复用,不需要重新创建线程。 因为线程是稀缺资源,能复用就复用,新建线程也影响效率

    目前的代码线程消亡的标记是因为queue.poll获取到了null,导致循环退出,线程完成。而阻塞队列的poll方法还有一个多态方法E poll(long timeout, TimeUnit unit),可以在一定时间内poll,在时间内获取到了就会返回,这个时间刚好用于是线程的存活时间(死亡倒计时??)。

    构造方法已经传了存活时间和单位,直接加上这两个参数

    65cd705f9b33123cf4f152f70dff210c.png

    再来测试下,存活时间设置为5秒,那样第二批只能提交5个线程,否则会导致线程池慢

    fa64104f2484c672ccb7a15430385f08.png

    结果

    f7eac567aca825bb689ca6e2c93f9bc0.png

    发生一个大问题,最后线程都没了,而且主线程也退出了

    原因:第一批的10个线程执行完后因为线程存活5秒,所以都保留了

    eac7a6c3d1a8bf7ce71278c6b8c8e116.png
    ef3f9ea85d39801ffba270645608485f.png
    20f4a9d4707f916eeccc23e8c493453b.png

    堆栈打印出来也证实了,都在poll里面阻塞,然后第二批5个线程已提交,这存活的5个线程就会立马开始执行,执行完后再次阻塞再poll,等过了存活时间,线程全部结束!

    在第二批执行结束后再次打印堆栈,结果果然是这样

    c8625fb9b9f6b35d01ca3690d024caec.png
    8d4de95381f8ec6536fa3fad57c2a016.png

    问题知道了,解决方法

    be47e9f9b735545450a74c83e874c051.png
    beb35663408da02a42ff56824e2c5d27.png

    这里的lock是一个线程锁,防止多个线程同时判断(同时判断了写这个还有意义么。。)

        /** 线程锁 */    private Lock lock = new ReentrantLock();

    至此,写好了主要部分,测试下

    beb35663408da02a42ff56824e2c5d27.png

    不过有个地方我自己代码走读,感觉是有问题

    4bace5ceb9a47330ead4b509d780e0ed.png

    感觉这里会存在线程安全问题,假设线程池队列为空,当前activeCount大于core,并发情况下,多个线程同时满足activeCount.get()>coreCount,之后所有线程都会走queue.poll分支,因为队列为空,所有线程queue.poll返回为null,所有线程全部结束掉,这样和保留core个线程冲突了。 纠结许久后来发现这个假设不成立,要在没有进入while之前就出现队列为空,且activeCount>core,这种情况不会出现,因为在提交线程的方法(execute)已经限制了这种情况,但是这个代码看起来会有歧义,还是决定改造改造

    fc99ebf0103d43db719b1801e9f64cd0.png

    这回走读一次,感觉好多了,终于把主要功能写完了,也能正常跑了

    4.立即关闭线程池

    线程池里面的线程跑完了,但是还有core个线程阻塞着,这么一直阻塞着也不是办法,所以要有个关闭的方法,先写暴力关闭,当前运行的线程中断,队列抛弃

    思考时间:中断线程肯定是调用Thread.interrupt方法,这样我得拿到正在运行得线程才行,所以在新增线程的时候得保存在一个集合里,而且线程执行得时候异常了,也会新增线程,所以这个保存集合要线程安全,而且存取速度要快 这里需要一个线程安全得set集合,ConcurrentHashMap里面有个newKeySet方法,看了下源码是通过ConcurrentHashMap的key来的,是线程安全的,直接用

    /** 保存正在运行的线程 */ private Set workers = ConcurrentHashMap.newKeySet();

    还需要一个状态标识当前线程池是否关闭,这个状态要在线程并发(获取队列线程)情况下可以判断,所以用volatile修饰,默认正在运行

    /** 线程池状态,-1:正在运行,0:暴力关闭,1:优雅关闭 */ private volatile int status = -1;

    新建暴力关闭方法stopNow

    8d476a9e8408010ce671a7b25eebb218.png

    在新建worker的时候都加到workers里面去,这时候一想,activeCount和workers.size是不是重复了,顺带把activeCount删掉,用workers.size代替,线程执行完成workers.remove掉

    新增方法addWorker

    c868425b747a65e56bddd4642e88be72.png

    提交线程方法改造,删掉了activeCount,用workers.size代替

    184af86cd05d6fe9eb9ae7bf1ae8820f.png

    worker的run方法改造,删掉了activeCount,用workers.size代替,activeCount-1用workers.remove(this)代替

    36bb1b4a729ec3c7fdcbf10bbf26598b.png

    改造完了,查看一下,暴力关闭需要立即中断线程,抛弃队列,所以在while获取队列那里要增加判断

    585cf36e1d2805f65bc8032eac6c24cd.png

    提交线程方法增加状态判断

    f95d41063e3af9fca1eb3ef799b3f96c.png

    改造完成,测试下暴力关闭

    63d0201573f876d855210ff7c5cd6ae0.png

    结果,整个程序也退出了,线程池结束了,队列只运行了一个线程

    8d2b0ff5fc28bed0a4572886bedd4ac7.png
    5.优雅关闭线程池

    优雅的关闭线程池,是要让所有的线程和队列都运行完毕再关闭所有线程,这样就不能直接interrupt线程了,先设置status=1未优雅关闭

    新增优雅关闭方法

    1bd89059cab160d5ac3e7ef0298e6a4e.png

    提交线程方法增加状态,限制提交

    e28cf9700d3dd9f5fffbab0e1a8a1213.png

    这里调用stop方法时分情况:

    1.存在线程还在执行(队列或者当前线程),执行完成后调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    2.不存在线程还在执行(队列或者当前线程),调用workers所有线程的interrupt,防止存在线程在queue.take处阻塞

    所以这里需要一个标记,是否还存在线程在执行,我们可以用一个数字标识当前还需要执行的线程数量,执行完一个线程就-1

    增加成员变量remainingCount,标识剩余线程数

        /** 剩余线程数 */    private AtomicInteger remainingCount = new AtomicInteger(0); 

    每次提交一个线程+1

    5159871f1bbcd919a923b2b04f62fc5c.png

    每次执行完一个线程-1

    5b7d9b117520e1f8376ecfe763afae65.png

    第一种情况,存在线程还在执行,在执行完成后判断remainingCount是否为0

    83b06d15bcd04fd76be960322fc3a551.png

    第二种情况,不存在线程还在执行,在stop的时候增加判断

    c6c2df0c127943622e014d44c755e218.png

    抽象封装下方法

    3d5ea420585ff030f2898f8302322660.png
    7bf4bb1ad85ec6c056448aaebf95f8a9.png

    顺便把worker的run方法也优化下,一个屏幕都截不下了

    12c9830d0c89316b8d8e25117bfa4e76.png
    5426319560621fd24e9de1b2174deac9.png

    跑个第一种情况的测试用例

    c4bbc49d4dd78a22e446aec1b0b9b44b.png

    结果出乎意料,居然没有结束所有线程

    22e60eed9def4b317c74f9d30d6bac60.png

    加日志调试

    b7e0892e5ca7ccfbfee2db19cb842d31.png

    出现了更夸张的错误

    ae122514eec2165974ed1e8be49a5759.png

    根据控制台信息可以想象,原本的5个线程全部被interrupt,又不断地创建线程,又不断的被interrupt。 这里会创建线程的地方只有在worker的run方法异常,finally代码段里面,而且没加日志的时候没有出现这种情况,加了日志就出现了。 多次代码走读后,发现一种可能,在最初5个线程同时将队列消耗完后,2个线程进入take阻塞,3个线程开始进入interruptWorkers方法,导致那2个线程出现异常,异常后会退出线程,再次创建新线程,并且interrupt新线程,由此陷入死循环

    改造getQueueTask方法,不抛出异常,出现异常返回null。顺便走读一下status=0的情况,发现不影响

    764e90d3ae71f7c1e75d0a734192fa2a.png

    再次运行测试用例,结果符合预期了,这里的异常堆栈可以忽略

    572702321974ae10746efe361228e53c.png

    再测试优雅关闭的第二种情况

    94042a427700d722551f97310f4d5513.png

    结果正常

    6764d1e1f6c1b39bcb4d4e4762cefc48.png

    去掉调试日志,至此,这个轮子线程池完成,具备线程池基础功能

    总结

    写这个线程池过程曲折,各种问题不断出现,特别时两种关闭方法,判断比较烦,代码走读和调试良久,才堪堪解决,由此联想ThreadPoolExecutor是多么强大,多么不简单

    图片较多,代码可以在Github上找到

    参考资料:ThreadPoolExecutor源码

    展开全文
  • 最近看 Kafka 看到了时间轮算法,记得以前看 Netty 也看到过这玩意,没太过关注。今天就来看看时间轮到底是什么东西。 为什么要用时间轮算法来实现延迟操作? 延时操作 Java 不是提供了 Timer 么? 还有 DelayQueue ...

    大家好,我是yes。

    最近看 Kafka 看到了时间轮算法,记得以前看 Netty 也看到过这玩意,没太过关注。今天就来看看时间轮到底是什么东西。

    为什么要用时间轮算法来实现延迟操作?

    延时操作 Java 不是提供了 Timer 么?

    还有 DelayQueue 配合线程池或者 ScheduledThreadPool 不香吗?

    我们先来简单看看 Timer、DelayQueue 和 ScheduledThreadPool 的相关实现,看看它们是如何实现延时任务的,源码之下无秘密。再来剖析下为何 Netty 和 Kafka 特意实现了时间轮来处理延迟任务。

    如果在手机上阅读其实纯看字也行,不用看代码,我都会先用文字描述清楚。不过电脑上看效果更佳。

    Timer

    Timer 可以实现延时任务,也可以实现周期性任务。我们先来看看 Timer 核心属性和构造器。

    核心就是一个优先队列和封装的执行任务的线程,从这我们也可以看到一个 Timer 只有一个线程执行任务。

    再来看看如何实现延时和周期性任务的。我先简单的概括一下,首先维持一个小顶堆,即最快需要执行的任务排在优先队列的第一个,根据堆的特性我们知道插入和删除的时间复杂度都是 O(logn)。

    然后 TimerThread 不断地拿排着的第一个任务的执行时间和当前时间做对比。如果时间到了先看看这个任务是不是周期性执行的任务,如果是则修改当前任务时间为下次执行的时间,如果不是周期性任务则将任务从优先队列中移除。最后执行任务。如果时间还未到则调用 wait() 等待。

    再看下图,整理下流程。

    流程知道了再对着看下代码,这块就差不多了。看代码不爽的可以跳过代码部分,影响不大。

    先来看下 TaskQueue,就简单看下插入任务的过程,就是个普通的堆插入操作。

    再来看看 TimerThread 的 run 操作。

    小结一下

    可以看出 Timer 实际就是根据任务的执行时间维护了一个优先队列,并且起了一个线程不断地拉取任务执行。

    有什么弊端呢?

    首先优先队列的插入和删除的时间复杂度是O(logn),当数据量大的时候,频繁的入堆出堆性能有待考虑。

    并且是单线程执行,那么如果一个任务执行的时间过久则会影响下一个任务的执行时间(当然你任务的run要是异步执行也行)。

    并且从代码可以看到对异常没有做什么处理,那么一个任务出错的时候会导致之后的任务都无法执行。

    ScheduledThreadPoolExecutor

    在说 ScheduledThreadPoolExecutor 之前我们再看下 Timer 的注释,注释可都是干货千万不要错过。我做了点修改,突出了下重点。

    Java 5.0 introduced ScheduledThreadPoolExecutor, It is effectively a more versatile replacement for the Timer, it allows multiple service threads. Configuring with one thread makes it equivalent to Timer。

    简单翻译下:1.5 引入了 ScheduledThreadPoolExecutor,它是一个具有更多功能的 Timer 的替代品,允许多个服务线程。如果设置一个服务线程和 Timer 没啥差别。

    从注释看出相对于 Timer ,可能就是单线程跑任务和多线程跑任务的区别。我们来看下。

    继承了 ThreadPoolExecutor,实现了 ScheduledExecutorService。可以定性操作就是正常线程池差不多了。区别就在于两点,一个是 ScheduledFutureTask ,一个是 DelayedWorkQueue。

    其实 DelayedWorkQueue 就是优先队列,也是利用数组实现的小顶堆。而 ScheduledFutureTask 继承自 FutureTask 重写了 run 方法,实现了周期性任务的需求。

    小结一下

    ScheduledThreadPoolExecutor 大致的流程和 Timer 差不多,也是维护一个优先队列,然后通过重写 task 的 run 方法来实现周期性任务,主要差别在于能多线程运行任务,不会单线程阻塞

    并且 Java 线程池的设定是 task 出错会把错误吃了,无声无息的。因此一个任务出错也不会影响之后的任务

    DelayQueue

    Java 中还有个延迟队列 DelayQueue,加入延迟队列的元素都必须实现 Delayed 接口。延迟队列内部是利用 PriorityQueue 实现的,所以还是利用优先队列!Delayed 接口继承了Comparable 因此优先队列是通过 delay 来排序的。

    然后我们再来看下延迟队列是如何获取元素的。

    小结一下

    也是利用优先队列实现的,元素通过实现 Delayed 接口来返回延迟的时间。不过延迟队列就是个容器,需要其他线程来获取和执行任务。

    这下是搞明白了 Timer 、ScheduledThreadPool 和 DelayQueue,总结的说下它们都是通过优先队列来获取最早需要执行的任务,因此插入和删除任务的时间复杂度都为O(logn),并且 Timer 、ScheduledThreadPool 的周期性任务是通过重置任务的下一次执行时间来完成的。

    问题就出在时间复杂度上,插入删除时间复杂度是O(logn),那么假设频繁插入删除次数为 m,总的时间复杂度就是O(mlogn),这种时间复杂度满足不了 Kafka 这类中间件对性能的要求,而时间轮算法的插入删除时间复杂度是O(1)。我们来看看时间轮算法是如何实现的。

    时间轮算法

    俗话说艺术源于生活,技术也能从日常生活中找到灵感。咱们先来看块表,嗯金色的表。

    都看清楚了吧,时间轮就是和手表时钟很相似的存在。时间轮用环形数组实现,数组的每个元素可以称为槽,和 HashMap一样称呼。

    槽的内部用双向链表存着待执行的任务,添加和删除的链表操作时间复杂度都是 O(1),槽位本身也指代时间精度,比如一秒扫一个槽,那么这个时间轮的最高精度就是 1 秒。

    也就是说延迟 1.2 秒的任务和 1.5 秒的任务会被加入到同一个槽中,然后在 1 秒的时候遍历这个槽中的链表执行任务。

    从图中可以看到此时指针指向的是第一个槽,一共有八个槽0~7,假设槽的时间单位为 1 秒,现在要加入一个延时 5 秒的任务,计算方式就是 5 % 8 + 1 = 6,即放在槽位为 6,下标为 5 的那个槽中。更具体的就是拼到槽的双向链表的尾部。

    然后每秒指针顺时针移动一格,这样就扫到了下一格,遍历这格中的双向链表执行任务。然后再循环继续。

    可以看到插入任务从计算槽位到插入链表,时间复杂度都是O(1)。那假设现在要加入一个50秒后执行的任务怎么办?这槽好像不够啊?难道要加槽嘛?和HashMap一样扩容?

    不是的,常见有两种方式,一种是通过增加轮次的概念。50 % 8 + 1 = 3,即应该放在槽位是 3,下标是 2 的位置。然后 (50 - 1) / 8 = 6,即轮数记为 6。也就是说当循环 6 轮之后扫到下标的 2 的这个槽位会触发这个任务。Netty 中的 HashedWheelTimer 使用的就是这种方式。

    还有一种是通过多层次的时间轮,这个和我们的手表就更像了,像我们秒针走一圈,分针走一格,分针走一圈,时针走一格。

    多层次时间轮就是这样实现的。假设上图就是第一层,那么第一层走了一圈,第二层就走一格。

    可以得知第二层的一格就是8秒,假设第二层也是 8 个槽,那么第二层走一圈,第三层走一格,可以得知第三层一格就是 64 秒。

    那么一格三层,每层8个槽,一共 24 个槽时间轮就可以处理最多延迟 512 秒的任务。

    而多层次时间轮还会有降级的操作,假设一个任务延迟 500 秒执行,那么刚开始加进来肯定是放在第三层的,当时间过了 436 秒后,此时还需要 64 秒就会触发任务的执行,而此时相对而言它就是个延迟 64 秒后的任务,因此它会被降低放在第二层中,第一层还放不下它。

    再过个 56 秒,相对而言它就是个延迟 8 秒后执行的任务,因此它会再被降级放在第一层中,等待执行。

    降级是为了保证时间精度一致性。Kafka内部用的就是多层次的时间轮算法。

    Netty中的时间轮

    在 Netty 中时间轮的实现类是 HashedWheelTimer,代码中的 wheel 就是上图画的循环数组,mask 的设计和HashMap一样,通过限制数组的大小为2的次方,利用位运算来替代取模运算,提高性能。tickDuration 就是每格的时间即精度。可以看到配备了一个工作线程来处理任务的执行。

    接下来我们再来看看任务是如何添加的。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TVwWiaZo-1596457022929)(https://upload-images.jianshu.io/upload_images/16034279-630f2a852196b586.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    可以看到任务并没有直接添加到时间轮中,而是先入了一个 mpsc 队列,我简单说下 mpsc 是 JCTools 中的并发队列,用在多个生产者可同时访问队列,但只有一个消费者会访问队列的情况。篇幅有限,有兴趣的朋友自行了解实现。

    然后我们再来看看工作线程是如何运作的。

    很直观没什么花头,我们先来看看 waitForNextTick,是如何得到下一次执行时间的。

    简单的说就是通过 tickDuration 和此时已经滴答的次数算出下一次需要检查的时间,时候未到就sleep等着。

    再来看下任务如何入槽的。

    注释的很清楚了,实现也和上述分析的一致。

    最后再来看下如何执行的。

    就是通过轮数和时间双重判断,执行完了移除任务。

    小结一下

    总体上看 Netty 的实现就是上文说的时间轮通过轮数的实现,完全一致。可以看出时间精度由 TickDuration 把控,并且工作线程的除了处理执行到时的任务还做了其他操作,因此任务不一定会被精准的执行。

    而且任务的执行如果不是新起一个线程,或者将任务扔到线程池执行,那么耗时的任务会阻塞下个任务的执行。

    并且会有很多无用的 tick 推进,例如 TickDuration 为1秒,此时就一个延迟350秒的任务,那就是有349次无用的操作。

    但是从另一面来看,如果任务都执行很快(当然你也可以异步执行),并且任务数很多,通过分批执行,并且增删任务的时间复杂度都是O(1)来说。时间轮还是比通过优先队列实现的延时任务来的合适些。

    Kafka 中的时间轮

    上面我们说到 Kafka 中的时间轮是多层次时间轮实现,总的而言实现和上述说的思路一致。不过细节有些不同,并且做了点优化。

    先看看添加任务的方法。在添加的时候就设置任务执行的绝对时间。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-01XhBQ65-1596457022933)(https://upload-images.jianshu.io/upload_images/16034279-fd01ec1c1cac3a62.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    那么时间轮是如何推动的呢?Netty 中是通过固定的时间间隔扫描,时候未到就等待来进行时间轮的推动。上面我们分析到这样会有空推进的情况。

    而 Kafka 就利用了空间换时间的思想,通过 DelayQueue,来保存每个槽,通过每个槽的过期时间排序。这样拥有最早需要执行任务的槽会有优先获取。如果时候未到,那么 delayQueue.poll 就会阻塞着,这样就不会有空推进的情况发送。

    我们来看下推进的方法。

    从上面的 add 方法我们知道每次对比都是根据expiration < currentTime + interval 来进行对比的,而advanceClock 就是用来推进更新 currentTime 的。

    小结一下

    Kafka 用了多层次时间轮来实现,并且是按需创建时间轮,采用任务的绝对时间来判断延期,并且对于每个槽(槽内存放的也是任务的双向链表)都会维护一个过期时间,利用 DelayQueue 来对每个槽的过期时间排序,来进行时间的推进,防止空推进的存在。

    每次推进都会更新 currentTime 为当前时间戳,当然做了点微调使得 currentTime 是 tickMs 的整数倍。并且每次推进都会把能降级的任务重新插入降级。

    可以看到这里的 DelayQueue 的元素是每个槽,而不是任务,因此数量就少很多了,这应该是权衡了对于槽操作的延时队列的时间复杂度与空推进的影响。

    总结

    首先介绍了 Timer、DelayQueue 和 ScheduledThreadPool,它们都是基于优先队列实现的,O(logn) 的时间复杂度在任务数多的情况下频繁的入队出队对性能来说有损耗。因此适合于任务数不多的情况

    Timer 是单线程的会有阻塞的风险,并且对异常没有做处理,一个任务出错 Timer 就挂了。而 ScheduledThreadPool 相比于 Timer 首先可以多线程来执行任务,并且线程池对异常做了处理,使得任务之间不会有影响。

    并且 Timer 和 ScheduledThreadPool 可以周期性执行任务。 而 DelayQueue 就是个具有优先级的阻塞队列。

    对比而言时间轮更适合任务数很大的延时场景,它的任务插入和删除时间复杂度都为O(1)。对于延迟超过时间轮所能表示的范围有两种处理方式,一是通过增加一个字段-轮数,Netty 就是这样实现的。二是多层次时间轮,Kakfa 是这样实现的。

    相比而言 Netty 的实现会有空推进的问题,而 Kafka 采用 DelayQueue 以槽为单位,利用空间换时间的思想解决了空推进的问题。

    可以看出延迟任务的实现都不是很精确的,并且或多或少都会有阻塞的情况,即使你异步执行,线程不够的情况下还是会阻塞。

    巨人的肩膀

    《深入理解Kafka:核心设计与实践原理》
    https://www.cnblogs.com/luozhiyun/p/12075326.html


    我是 yes,从一点点到亿点点,我们下篇见

    往期推荐:

    Kafka和RocketMQ底层存储之那些你不知道的事

    消息队列面试热点一锅端

    图解+代码|常见限流算法以及限流在单机分布式场景下的思考

    表弟告状,被逼无奈我都招了- 缓存高可用

    表弟面试被虐我教他缓存连招

    面试官:说说Kafka处理请求的全流程

    展开全文
  • Java 线程池

    2011-04-27 17:43:00
    线程池线程池功能应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程...
  • 线程池功能

    2011-11-02 16:01:43
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架...
  • 线程池功能

    2011-04-27 23:11:00
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,...
  • 趣谈Java线程池

    2021-04-11 18:48:37
    就是同一个时间有多个任务要执行,比如,我一下想发上千封邮件,我们不可能一个一个发嘛,一个一个发就像是中午下课了,学生都排队去打饭,但是只有一个阿姨在打饭,这样就只能等前面的学生打完了,才能到你咯。...
  • 简单的线程池实现

    2016-03-14 16:51:07
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架...
  • linux线程池的实现

    2014-12-02 14:49:59
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架...
  • Java配置线程池

    2012-08-25 18:20:00
    前言  在之前写程序的时候并没有考虑到关于程序的性能方面的问题,随后到了公司工作之后老板叫我优化一个程序的链接...其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状...
  • 摘要:源码这东西看着能似懂非懂,有些地方你不知道人家为什么...所以这次我从一个最简单的线程池开始,带着每一版遇到的问题,将线程池的各种核心功能逐一给造出来,最后再结合java线程池源码一起分析。所以共分三篇讲
  • 浅谈线程池的使用

    千次阅读 2013-06-20 20:01:09
    1、什么是线程池线程池是一种多线程处理形式,处理过程中将任务添加到队列,然后在创建线程后自动启动这些任务。...其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。
  • 如果所有线程池线程都始终保持繁忙,但队列中包含挂起的工作,则线程池将在一段时间后创建另一个辅助线程但线程的数目永远不会超过最大值.超过最大值的线程可以排队,但他们要等到其他线程完成后才启动
  • linux线程池的实现——bobo

    千次阅读 2013-12-30 21:57:52
    应用程序可以有多个线程,这些线程在休眠状态中需要耗费大量时间来等待事件发生。其他线程可能进入睡眠状态,并且仅定期被唤醒以循更改或更新状态信息,然后再次进入休眠状态。为了简化对这些线程的管理,.NET框架...
  • linux C写一个项目级别的线程池

    千次阅读 2020-07-26 11:15:28
    微信公众号:二进制人生 专注于嵌入式linux开发。...线程池是一种高并发下常用的任务后台处理模型,主要是避免为短时间处理的任务进行频繁的线程创建和销毁,以及系统中过多的线程导致过度调度,而这些都.
  • 前段时间有幸接到腾讯上海分公司的 android开发面试,虽然最后一被毙了。但还是得总结一下自己在android开发中的一些盲点,最让我尴尬的是面试官问我几个android中线程池的使用与理解。。哎。。平时就知道new 线程...
  • 最近看 Kafka 看到了时间轮算法,记得以前看 Netty 也看到过这玩意,没太过关注。今天就来看看时间轮到底是什么东西。为什么要用时间轮算法来实现延迟操作?延时操作 Java 不是提供了 Timer 么?还有 DelayQueue 配合...
  • 最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目 即:最佳线程数目 = (线程等待时间/线程CPU时间之比 + 1)* CPU数目 ### PoolSizeCalculator 抽象类:网上有得抄,就不造轮子了。 ...
  • 时间轮HashedWheelTimer原理

    千次阅读 2019-04-08 15:53:42
    How to Use? 和计划任务线程池...前者基于延时队列,而后者基于时间轮实现。先看它的使用方式。 HashedWheelTimer hashedWheelTimer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS); has...
  • 前一段时间工作终于忙完了,可以短暂休息几周,因为老大们决定放弃一些平台了, 所以,这次技术上,可以一次性升级到位,使用C++ 11 ,15,17了, 真TMD和谐。 下面转载一篇关于C++ 11版本线程池的文章,个人觉得...

空空如也

空空如也

1 2 3 4
收藏数 75
精华内容 30
关键字:

时间轮线程池