精华内容
下载资源
问答
  • 文章目录准备redis延迟队列工具类枚举执行器开搞发送延迟队列线程池接受队列处理业务 准备 redis延迟队列工具类 import lombok.extern.slf4j.Slf4j; import org.redisson.api.RBlockingDeque; import org.redisson....

    准备

    redis延迟队列工具类

    在这里插入图片描述

    import lombok.extern.slf4j.Slf4j;
    import org.redisson.api.RBlockingDeque;
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    import java.util.concurrent.TimeUnit;
    @Slf4j
    @Component
    public class RedisDelayQueueUtil {
     
        @Autowired
        private RedissonClient redissonClient;
     
        /**
         * 添加延迟队列
         * @param value 队列值
         * @param delay 延迟时间
         * @param timeUnit 时间单位
         * @param queueCode 队列键
         * @param <T>
         */
        public <T> void addDelayQueue(T value, long delay, TimeUnit timeUnit, String queueCode){
            try {
                RBlockingDeque<Object> blockingDeque = redissonClient.getBlockingDeque(queueCode);
                RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingDeque);
                delayedQueue.offer(value, delay, timeUnit);
                log.info("(添加延时队列成功) 队列键:{},队列值:{},延迟时间:{}", queueCode, value, timeUnit.toHours(delay) + "小时");
            } catch (Exception e) {
                log.error("(添加延时队列失败) {}", e.getMessage());
                throw new RuntimeException("(添加延时队列失败)");
            }
        }
     
     /**
      * 获取延迟队列
      * @param queueCode
      * @param <T>
      * @return
      * @throws InterruptedException
      */
        public <T> T getDelayQueue(String queueCode) throws InterruptedException {
             RBlockingDeque<Map> blockingDeque = redissonClient.getBlockingDeque(queueCode);
            if (blockingDeque.size() > 0) {
                T value = (T) blockingDeque.take();
                return value;
            }
            return null;
     }
    }
    

    枚举

    @Getter
    @NoArgsConstructor
    @AllArgsConstructor
    public enum RedisDelayQueueEnum {
     
     ORDER_PAYMENT_TIMEOUT("ORDER_PAYMENT_TIMEOUT","订单支付超时,自动取消订单", "orderTimeout");
    
     /**
      * 延迟队列 Redis Key
      */
     private String code;
     
     /**
      * 中文描述
      */
     private String name;
     
     /**
      * 延迟队列具体业务实现的 Bean
      * 可通过 Spring 的上下文获取
      */
     private String beanId;
     
    }
    

    执行器

    在这里插入图片描述

    public interface RedisDelayQueueHandle<T> {
     
     void execute(T t);
     
    }
    
    • 这里是一个bean,名字默认是驼峰命名,便于线程池部分获取到此bean用于处理执行
    @Component
    @Slf4j
    public class OrderTimeout implements RedisDelayQueueHandle<String> {
    
        @Autowired
        private ActActivityTravelOrderService orderService;
        @Autowired
        private ActActivityService actActivityService;
        @Override
        public void execute(String id) {
            log.info("(收到订单超时延迟消息) {}", id);
            ActActivityTravelOrder order = orderService.getById(id);
            //超时未确认,出行机构未提交方案
            if (Objects.isNull(order.getSchemeId())){
                String activityId = order.getActivityId();
                ActActivity activity = actActivityService.getById(activityId);
                LambdaUpdateWrapper<ActActivity> lambdaUpdate = Wrappers.lambdaUpdate();
                lambdaUpdate.set(ActActivity::getTravelAgencyId, null)
                            .set(ActActivity::getTravelStatus, ActivityTravelStatus.NO_SELECT)
                            .eq(ActActivity::getId, activity.getId());
                actActivityService.update(lambdaUpdate);
            }
        }
    }
    

    开搞

    发送延迟队列

    • 用于实际业务场景中
      redisDelayQueueUtil.addDelayQueue(travelOrder.getId(), 4, TimeUnit.HOURS, RedisDelayQueueEnum.ORDER_PAYMENT_TIMEOUT.getCode());
    

    线程池接受队列处理业务

    import com.xk.practice.act.emuns.RedisDelayQueueEnum;
    import com.xk.practice.act.handler.RedisDelayQueueHandle;
    import com.xk.practice.act.utils.RedisDelayQueueUtil;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.CommandLineRunner;
    import org.springframework.stereotype.Component;
    import com.xk.base.core.util.SpringContextHolder;
    
    import java.util.concurrent.Executors;
    @Slf4j
    @Component
    public class RedisDelayQueueRunner implements CommandLineRunner {
    
        @Autowired
        private RedisDelayQueueUtil redisDelayQueueUtil;
    
        @Override
        public void run(String... args) {
            Executors.newFixedThreadPool(1).execute(() -> {
                while (true) {
                    try {
                        RedisDelayQueueEnum[] queueEnums = RedisDelayQueueEnum.values();
                        for (RedisDelayQueueEnum queueEnum : queueEnums) {
                            Object value = redisDelayQueueUtil.getDelayQueue(queueEnum.getCode());
                            if (value != null) {
                                RedisDelayQueueHandle redisDelayQueueHandle = SpringContextHolder.getBean(queueEnum.getBeanId());
                                redisDelayQueueHandle.execute(value);
                            }
                        }
                    } catch (Exception e) {
                        log.error("(Redis延迟队列异常中断) {}", e.getMessage());
                    }
                }
            });
            log.info("(Redis延迟队列启动成功)");
        }
    }
    
    展开全文
  • 线程池和读/写延迟统计信息 Cassandra为不同的执行阶段维护不同的线程池。每个线程池提供活动,挂起和完成的任务数量的统计信息。未决任务列增加的这些池的趋势表明何时增加额外的容量。建立基准后,在待处理任务列...

    线程池和读/写延迟统计信息

    Cassandra为不同的执行阶段维护不同的线程池。每个线程池提供活动,挂起和完成的任务数量的统计信息。未决任务列增加的这些池的趋势表明何时增加额外的容量。建立基准后,在待处理任务列中配置超出正常范围的报警。在命令行上使用nodetool tpstats来查看下表中显示的线程池详细信息。

    由nodetool tpstats报告的线程池统计信息

    线程池描述
    AntiEntropyStage与维修有关的任务
    CacheCleanupExecutor与缓存维护相关的任务(计数器缓存,行缓存)
    CompactionExecutor与压实有关的任务
    CounterMutationStage与领先的计数器写入有关的任务
    GossipStage有关八卦协议的任务
    HintsDispatcher与发送提示相关的任务
    InternalResponseStage与其他内部任务响应相关的任务
    MemtableFlushWriter与清除memtables相关的任务
    MemtablePostFlushmemtable刷新完成后与维护相关的任务
    MemtableReclaimMemory与回收memtable内存有关的任务
    MigrationStage与模式维护相关的任务
    MiscStage与各种任务相关的任务,包括快照和删除主机
    MutationStage与写入相关的任务
    Native-Transport-Requests与来自CQL的客户端请求相关的任务
    PendingRangeCalculator任务涉及在引导/解除后重新计算范围所有权
    PerDiskMemtableFlushWriter_ *有关将memtables刷新到给定磁盘的任务
    ReadRepairStage与执行读取修复相关的任务
    ReadStage与读取有关的任务
    RequestResponseStage来自节点内请求的回调任务
    Sampler与抽样统计有关的任务
    SecondaryIndexManagement与二级索引维护有关的任务
    ValidationExecutor与验证压缩相关的任务
    ViewMutationStage与维护物化视图有关的任务
    展开全文
  • 线程池

    2018-10-11 18:53:07
    线程池的分类 ThreadPoolExecutor Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋...

    在这里插入图片描述

    线程池的分类

    ThreadPoolExecutor
    Java是天生就支持并发的语言,支持并发意味着多线程,线程的频繁创建在高并发及大数据量是非常消耗资源的,因为java提供了线程池。在jdk1.5以前的版本中,线程池的使用是及其简陋的,但是在JDK1.5后,有了很大的改善。JDK1.5之后加入了java.util.concurrent包,java.util.concurrent包的加入给予开发人员开发并发程序以及解决并发问题很大的帮助。这篇文章主要介绍下并发包下的Executor接口,Executor接口虽然作为一个非常旧的接口(JDK1.5 2004年发布),但是很多程序员对于其中的一些原理还是不熟悉,因此写这篇文章来介绍下Executor接口,同时巩固下自己的知识。如果文章中有出现错误,欢迎大家指出。
    Executor框架的最顶层实现是ThreadPoolExecutor类,Executors工厂类中提供的newScheduledThreadPool、newFixedThreadPool、newCachedThreadPool方法其实也只是ThreadPoolExecutor的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor线程池的运行过程。

    corePoolSize: 核心池的大小。 当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中
    maximumPoolSize: 线程池最大线程数,它表示在线程池中最多能创建多少个线程;
    keepAliveTime: 表示线程没有任务执行时最多保持多久时间会终止。
    unit: 参数keepAliveTime的时间单位,有7种取值,在TimeUnit类中有7种静态属性:

    在这里插入图片描述

    1.newCachedThreadPool

    创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。示例代码如下:

    		// 无限大小线程池 jvm自动回收
    		ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
    		for (int i = 0; i < 10; i++) {
    			final int temp = i;
    			newCachedThreadPool.execute(new Runnable() {
    				@Override
    				public void run() {
    					try {
    						Thread.sleep(100);
    					} catch (Exception e) {
    						// TODO: handle exception
    					}
    					System.out.println(Thread.currentThread().getName() + ",i:" + temp);
    
    				}
    			});
    		}
    

    总结: 线程池为无限大,当执行第二个任务时第一个任务已经完成,会复用执行第一个任务的线程,而不用每次新建线程。

    2.newFixedThreadPool

    创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。示例代码如下:

    ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(5);
    		for (int i = 0; i < 10; i++) {
    			final int temp = i;
    			newFixedThreadPool.execute(new Runnable() {
    
    				@Override
    				public void run() {
    					System.out.println(Thread.currentThread().getId() + ",i:" + temp);
    
    				}
    			});
    		}
    

    3.newScheduledThreadPool

    创建一个定长线程池,支持定时及周期性任务执行。延迟执行示例代码如下:

    ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(5);
    		for (int i = 0; i < 10; i++) {
    			final int temp = i;
    			newScheduledThreadPool.schedule(new Runnable() {
    				public void run() {
    					System.out.println("i:" + temp);
    				}
    			}, 3, TimeUnit.SECONDS);
    }
    

    表示延迟3秒执行。

    4.newSingleThreadExecutor

    创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。示例代码如下:

    	ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
    		for (int i = 0; i < 10; i++) {
    			final int index = i;
    			newSingleThreadExecutor.execute(new Runnable() {
    
    				@Override
    				public void run() {
    					System.out.println("index:" + index);
    					try {
    						Thread.sleep(200);
    					} catch (Exception e) {
    						// TODO: handle exception
    					}
    				}
    			});
    		}
    

    线程池原理剖析

    提交一个任务到线程池中,线程池的处理流程如下:
    1、判断线程池里的核心线程是否都在执行任务,如果不是(核心线程空闲或者还有核心线程没有被创建)则创建一个新的工作线程来执行任务。如果核心线程都在执行任务,则进入下个流程。
    2、线程池判断工作队列是否已满,如果工作队列没有满,则将新提交的任务存储在这个工作队列里。如果工作队列满了,则进入下个流程。
    3、判断线程池里的线程是否都处于工作状态,如果没有,则创建一个新的工作线程来执行任务。如果已经满了,则交给饱和策略来处理这个任务。()
    在这里插入图片描述

    自定义线程线程池

    (解释:corePoolSize是指最大运行线程数,maximumPoolSize是指最大创建线程数)
    如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
    如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
    如果队列已经满了,则在总线程数不大于maximumPoolSize的前提下,则创建新的线程
    如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
    如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。

    合理配置线程池

    1. CPU密集

    (解释:当cpu密集时,线程池配置的最大线程数maximumPoolSize=cpu核数)
    CPU密集的意思是该任务需要大量的运算,而没有阻塞,CPU一直全速运行。
    CPU密集任务只有在真正的多核CPU上才可能得到加速(通过多线程),而在单核CPU上,无论你开几个模拟的多线程,该任务都不可能得到加速,因为CPU总的运算能力就那些。

    2.IO密集

    (解释:当io密集时,线程池的最大线程数maximumPoolSize=cpu核数*2)

    IO密集型,即该任务需要大量的IO,即大量的阻塞。在单线程上运行IO密集型的任务会导致浪费大量的CPU运算能力浪费在等待。所以在IO密集型任务中使用多线程可以大大的加速程序运行,即时在单核CPU上,这种加速主要就是利用了被浪费掉的阻塞时间。

    接着上一篇探讨线程池留下的尾巴,如何合理的设置线程池大小。
    要想合理的配置线程池的大小,首先得分析任务的特性,可以从以下几个角度分析:

    1. 任务的性质:CPU密集型任务、IO密集型任务、混合型任务。
    2. 任务的优先级:高、中、低。
    3. 任务的执行时间:长、中、短。
    4. 任务的依赖性:是否依赖其他系统资源,如数据库连接等。
      性质不同的任务可以交给不同规模的线程池执行。
      对于不同性质的任务来说,CPU密集型任务应配置尽可能小的线程,如配置CPU个数+1的线程数,IO密集型任务应配置尽可能多的线程,因为IO操作不占用CPU,不要让CPU闲下来,应加大线程数量,如配置两倍CPU个数+1,而对于混合型的任务,如果可以拆分,拆分成IO密集型和CPU密集型分别处理,前提是两者运行的时间是差不多的,如果处理时间相差很大,则没必要拆分了。
      若任务对其他系统资源有依赖,如某个任务依赖数据库的连接返回的结果,这时候等待的时间越长,则CPU空闲的时间越长,那么线程数量应设置得越大,才能更好的利用CPU。
      当然具体合理线程池值大小,需要结合系统实际情况,在大量的尝试下比较才能得出,以上只是前人总结的规律。

    最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
    比如平均每个线程CPU运行时间为0.5s,而线程等待时间(非CPU运行时间,比如IO)为1.5s,CPU核心数为8,那么根据上面这个公式估算得到:((0.5+1.5)/0.5)8=32。这个公式进一步转化为:
    最佳线程数目 = (线程等待时间与线程CPU时间之比 + 1)
    CPU数目
    可以得出一个结论:
    线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。
    以上公式与之前的CPU和IO密集型任务设置线程数基本吻合。

    CPU密集型时,任务可以少配置线程数,大概和机器的cpu核数相当,这样可以使得每个线程都在执行任务
    IO密集型时,大部分线程都阻塞,故需要多配置线程数,2*cpu核数
    操作系统之名称解释:
    某些进程花费了绝大多数时间在计算上,而其他则在等待I/O上花费了大多是时间,
    前者称为计算密集型(CPU密集型)computer-bound,后者称为I/O密集型,I/O-bound。

    Callable

    在Java中,创建线程一般有两种方式,一种是继承Thread类,一种是实现Runnable接口。然而,这两种方式的缺点是在线程任务执行结束后,无法获取执行结果。我们一般只能采用共享变量或共享存储区以及线程通信的方式实现获得任务结果的目的。
    不过,Java中,也提供了使用Callable和Future来实现获取任务结果的操作。Callable用来执行任务,产生结果,而Future用来获得结果。
    Callable接口与Runnable接口是否相似,查看源码,可知Callable接口的定义如下:

    @FunctionalInterface
    public interface Callable<V> {
        /**
         * Computes a result, or throws an exception if unable to do so.
         *
         * @return computed result
         * @throws Exception if unable to compute a result
         */
        V call() throws Exception;
    }
    

    可以看到,与Runnable接口不同之处在于,call方法带有泛型返回值V。

    Future模式

    Future模式的核心在于:去除了主函数的等待时间,并使得原本需要等待的时间段可以用于处理其他业务逻辑
    Futrure模式:对于多线程,如果线程A要等待线程B的结果,那么线程A没必要等待B,直到B有结果,可以先拿到一个未来的Future,等B有结果是再取真实的结果。
     在多线程中经常举的一个例子就是:网络图片的下载,刚开始是通过模糊的图片来代替最后的图片,等下载图片的线程下载完图片后在替换。而在这个过程中可以做一些其他的事情。
     在这里插入图片描述

    首先客户端向服务器请求RealSubject,但是这个资源的创建是非常耗时的,怎么办呢?这种情况下,首先返回Client一个FutureSubject,以满足客户端的需求,于此同时呢,Future会通过另外一个Thread 去构造一个真正的资源,资源准备完毕之后,在给future一个通知。如果客户端急于获取这个真正的资源,那么就会阻塞客户端的其他所有线程,等待资源准备完毕。

    公共数据接口,FutureData和RealData都要实现。

    public interface Data {
    	public abstract String getRequest();
    }
    

    FutureData,当有线程想要获取RealData的时候,程序会被阻塞。等到RealData被注入才会使用getReal()方法。

    public class FurureData implements Data {
    
    	public volatile static boolean ISFLAG = false;
    	private RealData realData;
    
    	public synchronized void setRealData(RealData realData) {
    		// 如果已经获取到结果,直接返回
    		if (ISFLAG) {
    			return;
    		}
    		// 如果没有获取到数据,传递真是对象
    		this.realData = realData;
    		ISFLAG = true;
    		// 进行通知
    		notify();
    	}
    
    	@Override
    	public synchronized String getRequest() {
    		while (!ISFLAG) {
    			try {
    				wait();
    			} catch (Exception e) {
    
    			}
    		}
    		// 获取到数据,直接返回
    		return realData.getRequest();
    	}
    
    }
    

    真实数据RealData

    public class RealData implements Data {
    	private String result;
    
    	public RealData(String data) {
    		System.out.println("正在使用data:" + data + "网络请求数据,耗时操作需要等待.");
    		try {
    			Thread.sleep(3000);
    		} catch (Exception e) {
    
    		}
    		System.out.println("操作完毕,获取结果...");
    		result = "余胜军";
    	}
    
    	@Override
    	public String getRequest() {
    		return result;
    	}
    
    

    调用者:

    public class Main {
    
    	public static void main(String[] args) {
    		FutureClient futureClient = new FutureClient();
    		Data request = futureClient.request("请求参数.");
    		System.out.println("请求发送成功!");
    		System.out.println("执行其他任务...");
    		String result = request.getRequest();
    		System.out.println("获取到结果..." + result);
    	}
    
    }
    

    调用者请求资源,client.request(“name”); 完成对数据的准备
    当要获取资源的时候,data.getResult() ,如果资源没有准备好isReady = false;那么就会阻塞该线程。直到资源获取然后该线程被唤醒。

    展开全文
  • 基于 Executor自定义线程池内的延迟和周期性任务 基于 Executor自定义线程池内的延迟和周期性任务,它通过 ScheduledThreadPoolExecutor 类来实现,并允许运行以下这两种任务: Delayed 任务:延迟运行一个任务 ...

    基于 Executor自定义线程池内的延迟和周期性任务

    基于 Executor自定义线程池内的延迟和周期性任务,它通过 ScheduledThreadPoolExecutor 类来实现,并允许运行以下这两种任务:

    • Delayed 任务:延迟运行一个任务
      Executor框架提供ThreadPoolExecutor类,使用池中的线程来执行Callable和Runnable任务,这样可以避免所有线程的创建操作。当你提交一个任务给执行者,会根据执行者的配置尽快执行它。在有些使用情况下,当你对尽快执行任务不感觉兴趣。你可能想要在一段时间之后执行任务或周期性地执行任务。基于这些目的,Executor框架提供 ScheduledThreadPoolExecutor类。
    • Periodic 任务:这种任务在延迟后执行,然后通常周期性运行
    • Executor框架提供ThreadPoolExecutor类,使用池中的线程执行并发任务,从而避免所有线程的创建操作。当你提交任务给执行者,根据它的配置,它尽快地执行任务。当它结束,任务将被执行者删除,如果你想再次运行任务,你必须再次提交任务给执行者。

    Delayed 任务可以执行 Callable 和 Runnable 对象,但是 periodic任务只能执行 Runnable 对象。全部任务通过计划池执行的都必须实现 RunnableScheduledFuture 接口。在这个例子中实现 RunnableScheduledFuture 接口来执行延迟和周期性任务。

    1、 创建类MyScheduledTask,扩展 FutureTask 类并实现 RunnableScheduledFuture 接口:

    //1.  创建一个类,名为 MyScheduledTask,使名为 V 的泛型类型参数化。它扩展 FutureTask 类并实现 RunnableScheduledFuture 接口。
    public class MyScheduledTask<V> extends FutureTask<V> implements
            RunnableScheduledFuture<V> {
    
        //2.   声明一个私有 RunnableScheduledFuture 属性,名为 task.
        private RunnableScheduledFuture<V> task;
    
        //3.   声明一个私有 ScheduledThreadPoolExecutor,名为 executor.
        private ScheduledThreadPoolExecutor executor;
    
        //4.   声明一个私有long属性,名为 period。
        private long period;
    
        //5.   声明一个私有long属性,名为 startDate。
        private long startDate;
    
        //6.   实现类的构造函数。它接收任务:将要运行的 Runnable 对象,任务要返回的 result,
        // 将被用来创建 MyScheduledTask 对象的 RunnableScheduledFuture 任务,
        // 和要执行这个任务的 ScheduledThreadPoolExecutor 对象。 调用它的父类的构造函数并储存任务和执行者属性。
        public MyScheduledTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, ScheduledThreadPoolExecutor executor) {
            super(runnable, result);
            this.task = task;
            this.executor = executor;
        }
    
        //7.	实现 getDelay() 方法。如果是周期性任务且 startDate 形象的值非0,计算并返回 startDate 属性与当前日期的相差值。
        // 否则,返回储存在 task 属性的原先任务的延迟值。不要忘记你要返回结果时,要传递 time unit 作为参数哦。
        @Override
        public long getDelay(TimeUnit unit) {
            if (!isPeriodic()) {
                return task.getDelay(unit);
            } else {
                if (startDate == 0) {
                    return task.getDelay(unit);
                } else {
                    Date now = new Date();
                    long delay = startDate - now.getTime();
                    return unit.convert(delay, TimeUnit.MILLISECONDS);
                }
            }
        }
    
        //8.  实现 compareTo() 方法。调用原先任务的 compareTo() 方法。
        @Override
        public int compareTo(Delayed o) {
            return task.compareTo(o);
        }
    
        //9.  实现 isPeriodic() 方法。调用原来任务的 isPeriodic() 方法。
        @Override
        public boolean isPeriodic() {
            return task.isPeriodic();
        }
    
        //10. 实现方法 run()。如果这是一个周期性任务,你要用下一个执行任务的开始日期更新它的 startDate 属性。
        // 用当前日期和时间间隔的和 计算它。 然后再次把任务添加到 ScheduledThreadPoolExecutor 对象的 queue中。
        @Override
        public void run() {
            if (isPeriodic() && (!executor.isShutdown())) {
                Date now = new Date();
                startDate = now.getTime() + period;
                executor.getQueue().add(this);
            }
    
            //11.打印当前日期的信息到操控台,调用 runAndReset() 方法运行任务,然后再打印另一条关于当前日期的信息。
            System.out.printf("Pre-MyScheduledTask: %s\n", new Date());
            System.out.printf("MyScheduledTask: Is Periodic:%s\n", isPeriodic());
            super.runAndReset();
            System.out.printf("Post-MyScheduledTask: %s\n", new Date());
        }
    
        //12. 实现 setPeriod() 方法,来确立任务的周期时间。
        public void setPeriod(long period) {
            this.period = period;
        }
    }
    

    MyScheduledTask 类可以执行延迟和周期性任务。你已经实现了有全部必须的算法可以执行这2种任务的方法。他们是 getDelay() 和 run() 方法:

    1. getDelay() 方法被计划的执行者调用来确认它是否需要运行任务。此方法对延迟任务和周期任务的响应是不同的。在之前提到的, MyScheduledClass 类的构造函数接收 原先的将要执行 Runnable 对象的 ScheduledRunnableFuture 对象, 并储存它作为类的属性来获取它的方法和它的数据。当我们要运行延迟任务时,getDelay() 方法返回原先任务的延迟,但是在周期任务的例子中,getDelay() 方法返回 startDate 属性值与当前时间的相差值。
    2. run() 方法是用来执行任务的。周期性任务的一个特别之处是你必须把下一次任务的执行作为一个新的任务放入到执行者的queue中,如果你要再次运行任务的话。所以,如果你执行周期性任务,你确定 startDate 属性值通过把当前时间和任务的执行周期相加,然后把任务储存在执行者的queue中。startDate 属性储存下一次任务将开始运行的时间。然后,使用 FutureTask 类提供的 runAndReset() 方法来运行任务。
    3. 必须要注意如果执行者已经关闭,就不需要再次把周期性任务储存进执行者的queue。

    2、创建类MyScheduledThreadPoolExecutor 来实现一个运行 MyScheduledTask 任务的 ScheduledThreadPoolExecutor 对象:

    //1. 创建一个类,名为 MyScheduledThreadPoolExecutor 来实现一个运行 MyScheduledTask 任务的 ScheduledThreadPoolExecutor 对象。
    // 特别扩展 ScheduledThreadPoolExecutor 类。
    public class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor {
    
        //2. 实现类的构造函数,只要调用它的父类的构造函数。
        public MyScheduledThreadPoolExecutor(int corePoolSize) {
            super(corePoolSize);
        }
    
        //3. 实现方法 decorateTask()。它接收将要被运行的 Runnable 对象和将运行 Runnable 对象的 RunnableScheduledFuture 任务作为参数。
        // 使用这些对象来构造来创建并返回 MyScheduledTask 任务。
        @Override
        protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
            MyScheduledTask<V> myTask = new MyScheduledTask<V>(runnable, null, task, this);
            return myTask;
        }
    
        //4. 覆盖方法 scheduledAtFixedRate()。调用它的父类的方法
        // method:Call the method of its parent class, convert the returned object into a MyScheduledTask object,
        // and establish the period of that task using the setPeriod() method.
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
            ScheduledFuture<?> task = super.scheduleAtFixedRate(command, initialDelay, period, unit);
            MyScheduledTask<?> myTask = (MyScheduledTask<?>) task;
            myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period, unit));
            return task;
        }
    
        //5.  创建一个类,名为 Task,实现 Runnable 接口。
        public static class Task implements Runnable {
    
            //6. 实现方法 run() 。在任务开始时打印一条信息,再让当前线程进入休眠2秒。最后在任务结束时,再打印另一条信息。
            @Override
            public void run() {
                System.out.printf("Task: Begin.\n");
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.printf("Task: End.\n");
            }
    
            //7. 创建例子的主类通过创建一个类,名为 Main 并添加 main()方法。
            public static class Main {
    
                public static void main(String[] args) throws Exception {
    
                    //8. 创建一个 MyScheduledThreadPoolExecutor 对象,名为 executor。使用2作为参数来在池中获得2个线程。
                    MyScheduledThreadPoolExecutor executor = new MyScheduledThreadPoolExecutor(2);
    
                    //9. 创建 Task 对象,名为 task。把当前日期写入操控台。
                    Task task = new Task();
                    System.out.printf("Main: %s\n", new Date());
    
                    //10. 使用 schedule() 方法发送一个延迟任务给执行者。此任务在延迟一秒后运行。
                    executor.schedule(task, 1, TimeUnit.SECONDS);
    
                    //11. 让主线程休眠3秒。
                    TimeUnit.SECONDS.sleep(3);
    
                    //12. 创建另一个 Task 对象。再次在操控台打印当前日期。
                    task = new Task();
                    System.out.printf("Main: %s\n", new Date());
    
                    //13. 使用方法 scheduleAtFixedRate()发送一个周期性任务给执行者。此任务在延迟一秒后被运行,然后每3秒执行。
                    executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
    
                    //14. 让主线程休眠10秒。
                    TimeUnit.SECONDS.sleep(10);
    
                    //15. 使用 shutdown() 方法关闭执行者。使用 awaitTermination() 方法等待执行者的完结。
                    executor.shutdown();
                    executor.awaitTermination(1, TimeUnit.DAYS);
    
                    //16. 写信息到操控台表明任务结束。
                    System.out.printf("Main: End of the program.\n");
                }
            }
        }
    }
    

    运行结果:
    在这里插入图片描述

    3、结论

    这个例子实现了 MyScheduledTask 类实现在 ScheduledThreadPoolExecutor 执行者中执行的自定义任务。它实现 RunnableScheduledFuture 接口, 因为在计划的执行者中执行的全部任务都一定要实现 这个接口,并扩展了 FutureTask 类,因为这个类提供了能有效的实现在 RunnableScheduledFuture 接口声明的方法。 之前提到的全部接口和类都被参数化成任务要返回的数据类型。

    为了在计划的执行者中使用 MyScheduledTask 任务,要重写在 MyScheduledThreadPoolExecutor 类的 decorateTask() 方法。这个类扩展 ScheduledThreadPoolExecutor 执行者和它的方法提供一个把 ScheduledThreadPoolExecutor 执行者默认的计划任务转换成 MyScheduledTask 任务来实现的机制。所以,当你实现你的版本的计划任务时,你必须实现你的版本的计划的执行者。

    decorateTask() 方法只是简单的创建了新的带有参数的 MyScheduledTask 对象:将要在任务中执行的 Runnable 对象; 将被任务返回结果对象,在这个例子,任务将不会返回结果,所以你要使用null值;原来执行 Runnable 对象的任务,新的对象将在池中代替这个任务;和
    将执行任务的执行者,在这个例子,你使用 this 关键词指向创建这个任务的执行者。

    最后重写了在 MyScheduledThreadPoolExecutor 类的 scheduleAtFixedRate() 方法。我们之前提到的,对于周期任务,你要使用任务的周期来确定 startDate 属性值,但是你还没有初始这个周期呢。你必须重写此方法接收周期作为参数,然后传递给 MyScheduledTask 类这样它才能使用。

    Task 类实现 Runnable 接口,也是执行者中运行的任务。这个例子的主类创建了 MyScheduledThreadPoolExecutor 执行者,然后给他们发送了以下2个任务:

    1. 一个延迟任务,在当前时间过一秒后运行
    2. 一个周期任务,在当前时间过一秒后运行,接着每隔3秒运行
    展开全文
  • 线程池

    2011-09-02 10:08:59
    为什么要用线程池? 诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方 式可能是通过网络...
  • 线程池原理

    2021-03-23 15:26:46
    目录任务提交与任务执行策略的隐性耦合线程饥饿死锁运行时间较长的任务设置线程池的大小配置ThreadPoolExecutor线程的创建与销毁管理队列任务饱和策略线程工厂ThreadPoolExecutor定制化ThreadPoolExecutor配置总结...
  • java线程池

    2020-07-18 18:18:44
    目录线程池1、为什么要用线程池2、线程池的内部原理3、案例一:Callable+ThreadPoolExecutor示例代码4、案例二:Runnable+ThreadPoolExecutor5、线程池的5种状态6、线程池的使用场景7、创建线程池8、监测线程池运行...
  • 线程池详解

    2020-11-10 08:35:41
    线程池内部结构​ 使用线程池比手动创建线程好在哪里? 线程池相关类与接口 线程池的各个参数及创建过程? 线程池的四种拒绝策略? 常见的六种线程池? ForkJoinPool 线程池常用的阻塞队列有哪些? 合适的...
  • 线程池基础

    2017-04-23 22:24:35
    一.Executor线程池三种线程执行策略 JDK中的Executor框架是基于生产者-消费者模式的线程池,提交任务的线程是生产者,执行任务的线程是消费者。 Executor线程池可以用于异步任务执行,而且支持很多不同类型任务...
  • 线程池总结

    2019-02-21 15:40:44
    什么是线程池?  诸如web服务器、数据库服务器、文件服务器和邮件服务器等许多服务器应用都面向处理来自某些远程来源的大量短小的任务。构建服务器应用程序的一个过于简单的模型是:每当一个请求到达就创建一个新...
  • JDK线程池

    2019-09-18 23:48:47
    线程池 类继承关系,方法太多就不列出来了。 Executors Executors扮演线程池工厂的角色,ThreadPoolExecutor就代表一个线程池。Executors提供了各种类型的线程池,主要有以下这些方法: public static ...
  • java 线程池

    2018-06-26 23:17:49
    1. 为什么使用线程池诸如 Web 服务器、数据库服务器、文件服务器或邮件服务器之类的许多服务器应用程序都面向处理来自某些远程来源的大量短小的任务。请求以某种方式到达服务器,这种方式可能是通过网络协议...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 26,634
精华内容 10,653
关键字:

线程池延迟发送