精华内容
下载资源
问答
  • java异步任务处理

    千次阅读 2018-04-16 15:24:17
    帮助理解异步的小程序原文地址:https://www.cnblogs.com/chenmo-xpw/p/5652029.html1、场景 最近做项目的时候遇到了一个小问题:从前台提交到服务端A,A调用服务端B处理超时,原因是前端一次请求往db插1万数据,插完...

    帮助理解异步的小程序

    原文地址:https://www.cnblogs.com/chenmo-xpw/p/5652029.html


    1、场景

     

      最近做项目的时候遇到了一个小问题:从前台提交到服务端A,A调用服务端B处理超时,原因是前端一次请求往db插1万数据,插完之后会去清理缓存、发送消息。

    服务端的有三个操作 a、插DB b、清理cache  c、发送消息。1万条数据,说多不多,说少不少.况且不是单单insert。出现超时现象,不足为奇了吧~~

     

    2、分析

     

      如何避免超时呢?一次请求处理辣么多数据,可分多次请求处理,每次请求处理100条数据,可以有效解决超时问题. 为了不影响用户的体验,请求改为ajax 异步请求。

    除此之外,仔细分析下场景. a 操作是本次处理的核心. 而b、c操作可以异步处理。换句话说,a操作处理完可以马上返回给结果, 不必等b、c处理完再返回。b、c操作可以放在一个异步任务去处理。

     

    3、实战

     

    (1)、ExecutorService : 任务提交

     

    (2)、demo

    异步任务类

    复制代码
    public class ExecutorDemo {
    
        private ExecutorService executor = Executors.newFixedThreadPool(1);
        
        public void asynTask() throws InterruptedException {
            
            
            executor.submit(new Runnable() {
                
                @Override
                public void run() {
                    
                    try {
                        Thread.sleep(10000);//方便观察结果
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    
                    int sum = 0;
                    for(int i = 0; i < 1000; i++) {
                        
                        sum += i;
                    }
                    
                    System.out.println(sum);
                }
            });
            
        }
    }
    复制代码

     客户端模拟

    复制代码
    public class Client {
    
        public static void main(String[] args) throws InterruptedException {
            
            boolean r = task2();
            
            if(r) {
                task3();
            }
            
            System.out.println("------------main end-----------");
        }
        
        static boolean task2() throws InterruptedException {
            
            ExecutorDemo e = new ExecutorDemo();
            
            e.asynTask();
            
            System.out.println("------------task2 end-----------");
            
            return true;
        }
        
        
        static void task3() throws InterruptedException {
            int j = 0;
            while(true) {
                if(j++ > 10000) {
                    break;
                }
            }
            
            System.out.println("------------task3 end-----------");
        }
    }
    复制代码

     

    结果是酱紫的

    ------------task2 end-----------
    ------------task3 end-----------
    ------------main end-----------
    499500

    展开全文
  • 畅聊Java异步编程

    2020-01-01 22:59:00
    趁着阿里的“加多”大佬推出了自己的新书《Java异步编程实战》,我也来简单聊下自己对异步编程的了解吧。 我是如何接触到异步编程的呢? 目前,我其实也是一个刚入职场不久的年轻人,对于异步编程的了解和使用有限...

    趁着阿里的“加多”大佬推出了自己的新书《Java异步编程实战》,我也来简单聊下自己对异步编程的了解吧。

    我是如何接触到异步编程的呢?

    目前,我其实是一个刚入职场不久的年轻人,对于异步编程的了解和使用有限。在实际的工作中使用最多的异步编程,就是创建一个线程池来执行一个异步任务,使得主线程可以继续往下执行,不被阻塞。那么,我是如何接触到异步编程技术的呢?

    一切还得从3年前,那个“腥风血雨”的校园招聘说起。相信大多数的小伙伴都是从接触Java并发编程开始的,当然我也不另外。在校园招聘的时候,为了应对面试官,通过面试环节,我对JUC并发包中的内容进行了比较详细的背诵,对于一些知识点都“朗朗上口”,就问你强不强?

    在校园招聘期间,我对线程池有了一定的理论了解,知道线程池可以执行一些异步任务,并且对于Future可以用来获取异步任务的执行结果也都有所耳闻。在面试期间,我也会给面试官进行解析(背诵)这些知识点,嘿嘿。

    工作之后,都在什么场景下使用异步编程呢?

    其实吧,我在实际工作中需要使用异步编程的场景不太多,一般情况下我们所执行的任务还是比较重要的,所以都要求同步来执行(不知道是不是技术不够,导致影响了效率都不清楚,尴尬.jpg)。

    工作中,使用异步编程的场景大概就是客户端请求服务端一个接口,服务端做不到在短时间内返回该结果。所以,服务端启动了一个异步任务(线程)去执行该请求的具体任务,该请求可以直接返回。然后,客户端可以在一段时间内来重新获取服务端异步线程执行的结果。这些异步编程的使用场景都感觉很简单?但是异步编程应该不止于此,否则“加多”大佬也不会写出如此一部技术书籍,感觉是自己“菜是原罪”。

    实际工作中,关于异步编程有疑惑吗?

    有,必须有。还真是巧了,这段时间关于异步编程的使用场景,我还真的遇到了一个令我困惑的地方。大概的场景描述如下:

    • 客户端A在调用服务端B的一个接口1,服务端B的接口完成了第一步处理逻辑之后,已经得到了客户端A想要的结果。按照正常来说,这个时候可以返回结果。然后客户端A会接着请求服务端C的接口2。
    • 但是服务端B在处理完逻辑之后,还需要调用一个外部接口2,并且将结果插入数据库。
    • 服务端C在处理请求的过程中,最好是让其使用到刚刚插入数据库中的数据。

    看起来,这就是一个典型的异步编程技术使用的场景吧?

    但是,服务端B在处理完逻辑之后,调用一个外部接口2如果采用了异步方式。会存在当客户端后续请求服务端C的时候,这个外部接口返回的数据还没有被写入数据库,导致预期数据丢失

    如果采用同步方式调用外部接口2呢?会存在客户端请求服务端B接口超时问题(因为额外同步调用外部接口超时导致)。

    如果外部接口的调用刚好还是有代价的,比如是收费接口,那么我们通过收费接口获取的数据在异步方式中可能就没有被使用到,亏了;如果是同步方式,接口超时又影响了用户体验,还是不好。

    希望在看了加多老师的新书《Java异步编程实战》之后,我可以从中找到一些解决思路。

    Java异步编程实战》有哪些干货?

    Java异步编程实战》一书对Java中相关的异步编程技术进行归纳总结,为读者提供一个统一文档来查阅、参考。该书也是国内首本异步编程著作,作者全面解析Java异步编程,针对各种常见异步编程场景,从编程语言、开发框架等角度深入讲解了异步编程的原理和方法。

    看了这本书的目录和介绍,应该挺适合我们想要深入研究Java异步编程相关技术的同学。希望大家可以从《Java异步编程实战》一书中得到一些Java异步编程方面的启发,对Java异步编程技术有了一个更好的理解与掌握(期待.jpg)。

    最后,我给大家一个便捷购买入口吧,新书首发,大家敬请期待吧~

    购买链接:https://item.jd.com/12778422.html 在这里插入图片描述

    展开全文
  • Java中的线程池和异步任务详解 引言 java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题,...

    Java中的线程池和异步任务详解

    转载自:https://blog.csdn.net/fanrenxiang/article/details/79855992
    引言
    java中经常需要用到多线程来处理一些业务,我们非常不建议单纯使用继承Thread或者实现Runnable接口的方式来创建线程,那样势必有创建及销毁线程耗费资源、线程上下文切换问题,这个时候引入线程池比较合理。有些时候也需要把多线程的逻辑给异步话,接口不需要等待子线程逻辑执行完就马上返回,这里就需要异步任务。java中涉及到线程池的相关类均在jdk1.5开始的java.util.concurrent包中,涉及到的几个核心类及接口包括:ExecutorService、Executors、ThreadPoolExecutor、FeatureTask、Callable、Runnable等。后面会一一描述。
    接口流程图伪代码思路如下:

     @ResponseBody
        @RequestMapping(value = "/notifyOrder", method = RequestMethod.POST)
        public CommonResult notifyOrder(@RequestParam(value = "req", required = false) String req) {
            log.info("notifyOrder method parameter:req={}", req);
            CommonResult result = new CommonResult();
            // 1、校验工作
     
            //  2、decode送过来的原始数据并序列化
            PointsMallOrderInfo orderInfo = StringToBean(req, "data", PointsMallOrderInfo.class);
     
            // 3、保存推送过来的原始数据
            orderInfoService.addOrderInfo(orderInfo);
     
            // 4、获取系统中状态为可用的虚拟码
     
            // 5、多线程异步处理 发送短信、提交虚拟码
            AsynSendAndSubmitTask.asynSendAndSubmit(orderInfo);
     
            // 6、持久化虚拟码流水号等信息
            return result;
        }
     
    public class AsynSendAndSubmitTask {
        private static ExecutorService asynSendAndSubmitThreadPool = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(40));
     
        public static void asynSendAndSubmit(final PointsMallOrderInfo orderInfo) {
            asynSendAndSubmitThreadPool.execute(new Runnable() {
                @Override
                public void run() {
                    // 短信下发虚拟码给用户
                    Long messageId = smsService.sendSms(new SendSmsInfo());
                    // 给积分商城提交虚拟码
                    String submitResult = HttpUtil.httpPost(false, params, SETVIRTUALCODE_URL);
                }
            });
        }
    }
    

    我们程序逻辑主线程中执行“保存推送过来的基础数据、查询并取出可用的充值码、持久化充值码及相关流水号信息操作,但提交虚拟码和短信发送虚拟码则以多线程的方式异步处理,加快接口响应。(当然也可以用kafka/MQ之类的消息队列替代)

    角色划分
    1、任务:指的是实现了Callable或Runnable接口的类,里面包含主要的业务逻辑,任务用于提交至线程池里的线程去执行。实现Callable接口的任务类可以有返回值,而Runnable接口则无;
    2、异步计算结果:主要是FutureTask类,里面包含了异步任务的计算结果,可以理解为执行单元,用于提交至线程池时使用;
    3、线程池:ThreadPoolExecutor和Executors类,用于构建线程池。
    Executors 和ThreadPoolExecutor
    由源码可知Executors本质上还是使用的ThreadPoolExecutor来实例化线程池的,Executors类可以创建四种类型的线程池,分别为 newFixedThreadPool、newSingleThreadExecutor、newCachedThreadPool、newScheduledThreadPool
    FixedThreadPool
    使用固定线程数,适用于为了平衡服务器资源而指定线程数的场景,一般用于负载比较高的服务器

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    

    如上是在ThreadPoolExecutor类中的源码,可以看到newFixedThreadPool线程池重用放在"共享无边界的队列LinkedBlockingQueue"中的固定线程数,处理任务时,nThread大部分都将以活动状态在处理任务,如果当所有线程都处于活动状态时又有额外的任务提交,那么新任务将在队列中等待直至线程可用。
    SingleThreadExecutor
    使用单个线程数,适用于需要保证顺序的执行各个任务;并且在任意时间点,不会有多个线程活动的场景

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    

    关于这种方式创建的线程池,源码中特别强调了:如果该单个线程在关闭之前由于执行过程中的失败而终止,那么如果需要执行后续任务,将替换一个新的线程,使用到的也是LinkedBlockingQueue队列。
    CachedThreadPool
    使用非固定线程数,适用于执行很多短期的异步任务,或者负载较轻的服务器

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
    

    根据需要创建线程,如优重用先前创建的可用线程,否则将新建线程并放到池子中,从构造函数可以看到,默认超过60秒未使用的线程将被终止并从cache中移除,使用到的同步移交SynchronousQueue队列。
    ScheduledThreadPool
    包含多个线程,适用于需要多个后台执行周期任务(或延迟任务),同时为了满足资源管理的需求而需要限制线程数量的场景

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }
    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
    

    对应的还有个SingleThreadScheduledExecutor只包含单个线程,适用于需要单个后台线程执行周期性任务,同时需要保证顺序的执行各个任务的场景

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }
    

    使用Executors创建线程池的隐患
    我这里以newFixedThreadPool构建的线程池为例,结合源码看看Executors.newFixedThreadPool(n)创建的线程池会有哪些潜在的隐患。进入newFixedThreadPool初始化的源码

    public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {……}
    

    corePoolSize:线程池核心线程数,要注意线程池初创时候并不会启动corePoolSize个线程,而是随着任务的提交逐渐达到这个值;
    maximumPoolSize:池中的最大线程数,要注意这个参数只有在任务数量大于corePoolSize时才会起作用;
    keepAliveTime:当线程数大于corePoolSize时,多余空闲线程在终止之前等待新任务的最大时间;
    unit:keepAliveTime的时间单位;
    workQueue:用于保存任务的队列,可以为无界、有界、同步移交类型的队列,这里是BlockingQueue。当池子里的工作线程数大于corePoolSize时,这时新进来的任务才会放到阻塞队列中,;
    threadFactory:创建新的线程时的工厂类,比如guava的ThreadFactoryBuilder;
    handler:队列已满且线程数达到maximunPoolSize时候的饱和策略,取值有AbortPolicy、CallerRunsPolicy、DiscardOldestPolicy、DiscardPolicy;
    上面说到了 FixedThreadPool、SingleThreadExecutor 源码中使用到的是LinkedBlockingQueue无界队列,而CacheThreadPool和ScheduledThreadPool实例化时默认最大线程数又是Integer.MAX_VALUE,这可能导致什么结果呢?《阿里巴巴Java开发手册v1.2.0》中这样说到:
    在这里插入图片描述结合源码看:FixedThreadPool、SingleThreadExecutor的LinkedBlockQueue是一个用链表实现的有界阻塞队列,容量可以选择进行设置,默认将是一个无边界的阻塞队列,最大长度为Integer.MAX_VALUE.

      /**
       * Creates a {@code LinkedBlockingQueue} with a capacity of
       * {@link Integer#MAX_VALUE}.
       */
      public LinkedBlockingQueue() {
          this(Integer.MAX_VALUE);
      }
    

    而CacheThreadPool和ScheduledThreadPool实例化时默认最大允许创建的线程数是Integer.MAX_VALUE

     public static ExecutorService newCachedThreadPool() {
         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                       60L, TimeUnit.SECONDS,
                                       new SynchronousQueue<Runnable>());
     }
     
     public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
               new DelayedWorkQueue());
     }
    

    所以上述两个"漏洞"在特定的场景下就有可能会导致OOM,故而很多人都不建议使用这颗"定时炸弹"。
    创建线程池的正确姿势?
    那么上面说了使用Executors创建的线程池有隐患,那如何使用才能避免这个隐患呢?对症下药,既然FixedThreadPool和SingleThreadPool"可能"导致的OOM是由于使用了无界队列任务堆积,CacheThreadPool和ScheduledThreadPool是由于"可能"创建Interger.MAX_VALUE,那创建线程池时我们就使用有界队列或者指定最大允许创建线程个数即可。使用下面的构造函数

    private static ExecutorService executor = new ThreadPoolExecutor(10,10,60L, TimeUnit.SECONDS,new ArrayBlockingQueue(10));
    

    这样可以指定corePoolSize、maximumPoolSize、workQueue为ArrayBlockingQueue有界队列

     public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue) {
         this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
              Executors.defaultThreadFactory(), defaultHandler);
     }
     private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
    

    默认的handler队列饱和策略为AbortPolicy(直接抛出异常),当提交任务线程数高于可用线程数,队列放满而无法处理新请求,这时候会抛出java.util.concurrent.RejectedExecutionException异常,然后手动捕获即可,总比OOM强吧。当然你也可以使用guava包中的ThreadFactoryBuilder工厂类来构造线程池:

    private static ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
    private static ExecutorService executorService = new ThreadPoolExecutor(10, 10, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());
    

    通过guava的ThreadFactory工厂类还可以指定线程的名称,这对于后期定位错误时也是很有帮助的

    ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-d%").build();
    

    Future和FutureTask(异步任务结果)
    Future接口和FutureTask类用来表示执行异步任务的结果,当向ThreadPoolExecutor或ScheduledThreadPoolExecutor提交了一个Callable或Runnable接口的实现类时,ThreadPoolExecutor或ScheduledThreadPoolExecutor就会返回FutureTask。到目前的jdk版本为止,submit返回的是都是实现了Future接口的FutureTask。

    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    

    FutureTask一般都是和线程池搭配使用,用于多线程的方式提交任务,通过futureTask.get()方法获取异步任务的计算结果即可。如下代码:

    import java.util.concurrent.*;
     
    public class FutureTaskTest {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ExecutorService executor = Executors.newCachedThreadPool();
            try {
                CustomCallable callable = new CustomCallable();
                FutureTask<Integer> callableTask = new FutureTask<Integer>(callable);
                executor.submit(callableTask);
                System.out.println("callableTask任务计算结果=" + callableTask.get());
     
                CustomRunnable runnable = new CustomRunnable();
                FutureTask<Void> runnableTask = new FutureTask<>(runnable, null);
                executor.submit(runnableTask);
     
            } finally {
                executor.shutdown();
            }
        }
    }
     
    class CustomCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            return sum;
        }
    }
     
    class CustomRunnable implements Runnable {
        @Override
        public void run() {
            int sum = 0;
            for (int i = 0; i < 10; i++) {
                sum += i;
            }
            System.out.println("RunnableTask任务计算结果=" + sum);
        }
    }
    

    从上面代码和开头讲到的demo1场景来看,使用FutureTask后,提交给线程池的就变成了futuretask而不是简单的实现了Runnable或Callable接口的普通任务了,并且获取任务的结果也是通过futuretask.get()方法而不是executorservice.submit()返回值。
    Callable和Runnable(任务类逻辑)
    Callable接口和Runnable接口的实现类(近似等价于被提交任务的逻辑)均可以被ThreadPoolExecutor或ScheduledThreadPoolExecutor所执行,区别在于执行的任务逻辑是否需要返回值,Callable接口实现类可以有返回值,而Runnable接口实现类则没有返回值;比如这里自定义一个实现了Callable接口的任务类:

    class CustomCallable implements Callable<Integer> {
        @Override
        public Integer call() throws Exception {
            System.out.println("子线程开始进行计算");
            Thread.sleep(500);
            int sum = 0;
            for (int i = 0; i < 10; i++)
                sum += i;
            return sum;
        }
    }
    

    也可以通过Executors类包装的如下两种方式创建任务

    //此方式创建Callable对象,通过futureTask.get()方法可以获取到异步计算结果
    public static <T> Callable<T> callable(Runnable task, T result) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<T>(task, result);
    }
    //不返回异步计算结果
    public static Callable<Object> callable(Runnable task) {
        if (task == null)
            throw new NullPointerException();
        return new RunnableAdapter<Object>(task, null);
    }
    

    1、通过futureTask.get()方法获取任务计算结果时,当任务还未完成,会导致线程阻塞直至任务完成,一般会配合futureTask.isDone()方法判断子线程任务是否完成来一起使用;

    2、当项目中有很多异步任务时,要着重测试下每个异步任务的执行时间,比如某个异步任务是调用其他系统的web服务,这时候就得测试调用需用的时间长短,如果过长,则建议使用生产/消费模式的消息队列去实现,不然容易使服务器的jvm进程崩溃;

    线程池实现原理?
    current包下的线程池实现原理相对简单,就是一个线程集合workers和存放任务的阻塞队列workQueue,当有新任务提交时就放到workQueue队列中(注:当池子里的核心线程数小于corePoolSize时任务会直接被执行),然后线程池从任务阻塞队列中"分配"任务并执行。

    /**
      * 设置线程池中的所有工作线程
      */
     private final HashSet<Worker> workers = new HashSet<Worker>();
     /**
      * 用于保存任务和切换到工作线程的队列
      */
     private final BlockingQueue<Runnable> workQueue;
    

    在这里插入图片描述Springboot中使用线程池
    springboot可以说是非常流行了,下面说说如何在springboot中让spring来帮我们管理线程池

    
    /**
     * @ClassName ThreadPoolConfig
     * @Description 构建spring管理的线程池实例,方便调用
     * @Author simonsfan
     * @Date 2018/12/20
     * Version  1.0
     */
    @Configuration
    public class ThreadPoolConfig {
     
        @Bean(value = "threadPoolInstance")
        public ExecutorService createThreadPoolInstance() {
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-pool-%d").build();
            ExecutorService threadPool = new ThreadPoolExecutor(10, 10, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(10), threadFactory, new ThreadPoolExecutor.AbortPolicy());
            return threadPool;
        }
     
    }
    
     @Resource(name = "threadPoolInstance")
        private ExecutorService executorService;
     
        @Override
        public void spikeConsumer() {
            //TODO
            executorService.execute(new Runnable() {
                @Override
                public void run() {
                    //TODO
                }
            });
        }
    

    总结

    这篇文章主要讲的是线程池的知识,但是还是比较的基础的,如果想继续研究,还需要看一些其它的文章。

    展开全文
  • 本文内容将基于JDK1.7的源码进行讨论,并且在文章的结尾,笔者将会给出一些经验之谈,...煮食物的时间我们假设是2个小时,那么煮食物的这个过程就是一个“异步任务”,我们把它用代码实现出来:public static cla...

    本文内容将基于JDK1.7的源码进行讨论,并且在文章的结尾,笔者将会给出一些经验之谈,希望能给学习者带来些帮助。


    举个例子

    我们以一个例子开始开始本文内容。

    有一个作家,他准备开始写作,写作时间大约1个小时,作家想“那就在写作的时候顺便煮一些食物”,写作完刚好吃一点热食物。煮食物的时间我们假设是2个小时,那么煮食物的这个过程就是一个“异步任务”,我们把它用代码实现出来:

    public static class Food implements Callable<String>{
    
        public String call() {
            System.out.println("hot food starts");
            try {
                // 煮食物ing
                Thread.sleep(20000l);
            } catch (Exception e) {
                // ignore
            }
            System.out.println("hot food ends");
            return "food is ok";
        }
    }
    
    public static void main(String[] args) {
        System.out.println("writing starts");
        FutureTask<String> futureTask = new FutureTask<String>(new Food());
        // 使用新线程
        Thread thread = new Thread(futureTask);
        thread.start();
        try {
            // 写作ing
            Thread.sleep(20000l);
        } catch (Exception e) {
            // ignore
        }
        System.out.println("writing ends");
    
        try {
            String result = futureTask.get();
            System.out.println(result);
        } catch (Exception e) {
            // ignore
        }
    }
    

    为什么要异步

    有些时候,为了快速响应,或者节省任务执行时间,有些任务是可以并行执行的。
    举个例子,我们正在执行某个计算的时候,需要通过http请求获得某个远程服务的结果,而计算过程也是一个耗时操作,可以在计算开始前先发起一个异步任务做http请求,在需要使用到远程服务结果的位置,查看当前异步任务是否已经执行完成,可以做到两件事情同步进行,缩短了任务执行时间。

    异步任务

    再举个例子,在一个客户端程序里面,包含了“提交”和“取消”两个功能,在应用点击“提交”开始执行以后,将立马发起一个异步线程,开始执行任务,但是此时客户端用户仍然可以随便操作,并不会就此卡住,在任务正常执行完可以在窗口显示执行结果。当任务执行完前,用户点击“取消”以后,异步任务将被取消,后台线程就停止了。

    FutureTask源码分析

    FutureTask实现了Future的接口,它的计算实际上是通过Callable接口来实现的,相当于一种可以生成结果的Runnable。
    那我们一起来看看在JDK 1.7里面,是怎么实现这个异步任务的。

    状态码

    FutureTask任务执行的核心在内部类Sync类中,在Sync类的内部,包含了以下几种状态码:
    READY:FutureTask任务创建成功以后,初始状态码;
    RUNNING:任务开始启动以后的状态码;
    RAN:无论是任务执行成功还是任务执行过程中抛了异常,都将走入到该状态码;
    CANCELLED:任务执行过程中被调用innerCancel取消后,进入该状态码;

    状态码时序图

    其他信息

    除此之外,Sync还包含了其他信息:
    执行结果:在Sync类中用result字段表示任务执行结果;
    异常:用该字段表示任务执行过程中抛出的异常信息;

    Sync数据结构定义大致如下:

    // 状态码定义在AbstractQueuedSynchronizer中
    private final class Sync extends AbstractQueuedSynchronizer {
            // 执行结果
            private V result; 
            // 异常信息
            private Throwable exception;
    }
    

    创建任务

    源码如下:

    public FutureTask(Callable<V> callable) {
        if (callable == null)
            throw new NullPointerException();
        sync = new Sync(callable); 
    }
    

    参数是一个Callable类型的接口,这个接口不同于Runnable,是有返回值的,定义如下:

    public interface Callable<V> {
        V call() throws Exception;
    }
    

    然后我们一起来看看整个FutureTask任务的执行过程。

    任务执行

    任务开始执行后,程序源码如下:

    void innerRun() {
        // 把任务状态从READY改成RUNNING
        if (!compareAndSetState(READY, RUNNING))
            return;
    
        runner = Thread.currentThread();
        if (getState() == RUNNING) {
            V result;
            try {
                // 异步任务开始启动
                result = callable.call();
            } catch (Throwable ex) {
                // 这里调用了innerSetException方法
                setException(ex); 
                return;
            }
            // 任务执行成功后,设置任务执行结果
            set(result);
        } else {
            releaseShared(0); // cancel
        }
    }
    

    我们再来看看,在任务执行成功以后,set方法都做了什么事情:

    protected void set(V v) {
        sync.innerSet(v);
    }
    
    class Sync {
        ...
        void innerSet(V v) {
            for (;;) {
                // 获得当前任务状态
                int s = getState();
                if (s == RAN)
                    return;
                if (s == CANCELLED) {
                    releaseShared(0);
                    return;
                }
    
                // 把任务状态设置为"RAN"(已完成)
                if (compareAndSetState(s, RAN)) {
                    result = v;
                    releaseShared(0);
                    // 这实际上是需要开发者实现的钩子方法
                    done(); 
                    return;
                }
            }
        }
    }
    

    任务取消

    任务在执行的过程中,可以选择取消执行,比如,一个查询同时从多个网址查询搜索结果,然而产品的需求是只需要返回其中一个搜索结果,因此,当有任务已经完成了搜索结果,那么其他查询线程就无需继续执行了,因此可以发起cancel的操作,减少网络消耗。

    boolean innerCancel(boolean mayInterruptIfRunning) {
        for (;;) {
            int s = getState();
            // 任务可能此时已被取消,或者已经执行完成
            if (ranOrCancelled(s))
                return false;
            if (compareAndSetState(s, CANCELLED))
                break;
        }
        if (mayInterruptIfRunning) {
            Thread r = runner;
            if (r != null)
                // 如果任务还在执行,尝试中断
                r.interrupt();
        }
        releaseShared(0);
        done();
        return true;
    }
    

    结果获取

    我们再看看获取任务执行结果的get两个方法,一个是不带阻塞时间的get()方法和另外一个带了阻塞时长的get(long timeout, TimeUnit unit)方法。这两个方法对应源码如下:

    // 不带阻塞时长
    public V get() throws InterruptedException, ExecutionException {
        return sync.innerGet();
    }
    
    // 带阻塞时长
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        return sync.innerGet(unit.toNanos(timeout));
    }
    

    再看看两个方法对应的sync.innerGet方法:

    V innerGet() throws InterruptedException, ExecutionException {
        // 阻塞式等待,但该方法可以被中断
        acquireSharedInterruptibly(0);
        // 如果此时任务已经被取消了,那么将抛一个异常出来
        if (getState() == CANCELLED)
            throw new CancellationException();
        // 任务执行过程中有异常,重新抛出异常
        // 该异常是在innerSetException方法中设置的
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }
    
    V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
        if (!tryAcquireSharedNanos(0, nanosTimeout))
            throw new TimeoutException();
        // 如果此时任务已经被取消了,那么将抛一个异常出来
        if (getState() == CANCELLED)
            throw new CancellationException();
        // 任务执行过程中有异常,重新抛出异常
        // 该异常是在innerSetException方法中设置的
        if (exception != null)
            throw new ExecutionException(exception);
        return result;
    }
    
    

    以上就是FutureTask的源码解读,不过FutureTask内容比较简单,不包含AQS源码只有大约400行。接下来我来简单讲讲在使用过程中的一些经验。

    经验之谈

    搭配线程使用

    FutureTask仅仅是一个任务执行框架,在执行过程中并没有创建一个新的线程,在本文最初的实例中,我仅仅是创建了一个新的Thread类,并启动该Thread类,当然你们也可以搭配ThreadPoolExecutor线程池使用。说到这里,ExecutorService类正是如此使用的:

    public abstract class AbstractExecutorService implements ExecutorService {
        ...
        public <T> Future<T> submit(Runnable task, T result) {
            if (task == null) throw new NullPointerException();
            // 创建一个新的FutureTask
            RunnableFuture<T> ftask = newTaskFor(task, result);
            // 执行该FutureTask
            execute(ftask);
            return ftask;
        }
        ...
    }
    

    做好线程中断策略

    你们不要以为,FutureTask提供了cancel方法,任务就一定能被取消,而实际上,底层还是依赖Thread提供的interrupt方法,因此,为了实现cancel功能,需要线程能够主动响应中断。
    换句话说,如果任务不检查中断取消标志,可能任务永远也不会结束。
    所以对中断的一个正确理解是:它不会真正地中断一个正在运行的线程,而只是发出中断请求,然后由线程在下一个合适的时候中断自己。当然,JDK中有一些库函数可以主动响应这些中断,如Thread.sleep和BlockingQueue.put方法等。

    展开全文
  • java 异步并发

    千次阅读 2018-09-20 13:05:19
    java 异步并发 在Java中,如使用Tomcat,一个请求会分配一个线程进行请求处理,该线程负责获取数据、拼装数据或模板然后返回给前端;在同步调用获取数据接口的情况下(等待依赖系统返回数据),整个线程是一直被...
  • 1)聚合多个异步任务 需求:多个tab页包含不同的表格数据,并且带分页,页面首次加载时需要显示第一页数据,并且在tab顶上显示总量,如下图所示: 各个表格数据从es中查询,涉及到计算,此处可以让前端调用多个...
  • Android中异步任务机制AsyncTask的理解

    千次阅读 2015-10-16 14:20:18
    在Android中实现异步任务机制有两种方式,Handler和AsyncTask。 Handler模式需要为每一个任务创建一个新的线程,任务完成后通过Handler实例向UI线程发送消息,完成界面的更新,这种方式对于整个过程的控制比较...
  • 直接看下面的实验,注释详细: package com.tbc.java;...import java.util.ArrayList;...import java.util.Date;...import java.util.List;...import java.util.concurrent....import java.util.concurrent.ExecutionExce
  • 先来说一下对异步和同步的理解: 同步调用:调用方在调用过程中,持续等待返回结果。异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。 其实,两者的区别还是...
  • 初尝Java异步编程

    万次阅读 2017-04-13 00:01:17
    最近从头了解了一下java异步编程,通过一下例子去展现不同java版本下怎么去实现任务的并行。  (一)Java 1.0的通过Thread 类 和 Runnable接口 Thread类实现了Runnable接口。 一般用法: 1)创建一个类SendToCUPD...
  • Java 异步编程:从 Future 到 Loom

    千次阅读 2019-10-21 11:28:12
    Java 异步编程:从 Future 到 Loom     本文对我们了解异步编程有很好的指导性,稍长,希望大家耐心阅读。     众所周知,Java 开始方法执行到结束,都是由同一个线程完成的。这种方式虽易于开发调试,但...
  • 【RabbitMQ】异步任务

    千次阅读 热门讨论 2018-05-19 22:46:04
    一、前言 上一篇博客介绍了用线程池实现异步任务。这一篇博客谈一谈用MQ实现异步任务。MQ的产品有灰常多,像什么MSMQ、activeMQ、RocketMQ、RabbitMQ、kafak等。在此之前先谈一谈对消息队列的理解。二、MQ MQ是一...
  • 一文带你彻底了解 Java 异步编程

    千次阅读 2021-07-26 21:34:38
    随着RxJava、Reactor等异步框架的流行,异步编程受到了越来越多的关注,尤其是在 IO 密集型的业务场景中,相比传统的同步开发模式,异步编程的优势越来越明显。 那到底什么是异步编程?异步化真正的好处又是什么?...
  • spring执行同步任务和异步任务

    千次阅读 2017-11-14 17:20:47
    异步任务是指做当前任务的同时,后台还可以在执行其他任务,可理解为可同时执行多任务,不必一件一件接着去做,下面开始上例子了   1.同步任务 Java代码  /*   * @(#)SyncTaskExecutorTest....
  • java 并发异步实现

    千次阅读 2020-09-09 13:04:36
    java 在 1.8 版本提供了 CompletableFuture 来支持异步编程。 CompletableFuture 的核心优势 为了领略 CompletableFuture 异步编程的优势,这里我们用 CompletableFuture 重新实现前面曾提及的烧水泡茶程序。首先...
  • 介绍: AsyncTask 可以使得我们能够轻松在 UI 线程控制后台操作和后台操作所返回结果,无需使用 Thread 和 Handler 这样的组合...AsyncTask 是经过 Android 封装、简化的异步任务实现方式,源码实现也是由 Thread 和 H
  • 异步任务AsyncTask

    2014-10-28 19:40:52
    首先觉得有必要贴点最原始东西 AsyncTask extends Object java.lang.Object  ↳ android.os.AsyncTask
  • Java异步并发Callable与Runable

    千次阅读 2016-01-26 14:40:39
    很多人都对这个东西感到特别奇怪(好吧,我承认,那个很多人就只是我自己而已),就我现在的理解,因为本人在并发这方面没有多少实践经验,所以只好就着一些资料和自己的理解给它下个定义,Future就是保存我们任务的...
  • Android多线程和异步任务

    千次阅读 2019-08-27 22:04:54
    1、Android开发中使用多线程的原因 避免ANR(Application is not responding) ...2、同步和异步理解 有些事件必须使用同步比如用户的注册,需要得到结果后才能进行下面的操作,有些事件需要异步,比如微博...
  • Spring @Async注解实现异步任务

    千次阅读 2019-06-18 09:33:40
    1. Spring对任务调度和异步任务执行的支持 Spring提供了如下注解用于支持任务调用和异步方法执行: @Scheduled:任务调度 @Async:异步方法执行 启用任务调度和异步方法执行的注解: @EnableScheduling:...
  • 关于异步理解

    2017-10-11 09:20:34
    什么是异步举一个例子:比如单位中午要吃饭了,我叫其中的一个同事说“xxx,中午吃饭了。吃饭不积极,脑袋有问题”。 同步:我要等他的回复之后,再去吃饭,他可能有个bug需要改,我需要等他改完,才能去吃饭┭┮﹏...
  • Java任务调度和线程池理解

    千次阅读 2013-04-29 20:27:17
    Java任务调度实现方法: Timer,Scheduler 1、Timer Timer类是用来执行任务的类,它接受一个TimerTask做参数 建立任务:使用Timer调度的任务应该继承TimerTask抽象类,该类实现Runnable接口,因些具备多...
  • 1)聚合多个异步任务 需求:多个tab页包含不同的表格数据,并且带分页,页面首次加载时需要显示第一页数据,并且在tab顶上显示总量,如下图所示: 各个表格数据从es中查询,涉及到计算,此处可以让前端调用多个接口...
  • Sunny先来说一下对异步和同步的理解: 同步调用:调用方在调用过程中,持续等待返回结果。 异步调用:调用方在调用过程中,不直接等待返回结果,而是执行其他任务,结果返回形式通常为回调函数。 其实,两者...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 72,667
精华内容 29,066
关键字:

java的异步任务理解

java 订阅