精华内容
下载资源
问答
  • ForkJoinPool的使用 微服务并行调用多个接口

    1、概念

    1.1、用来做什么

    在这里插入图片描述

    1.2、意图梳理

    在这里插入图片描述

    1.3、使用场景

    经典网关场景,查询多个系统数据,由于是接口调用,存在阻塞性,所以并不适合下边这种情况,但本教程会涉及到这个场景的实现来达到了解ForkJoinPool的使用,通过这个场景可以更清楚了解ForkJoinPool的使用:
    在这里插入图片描述
    通常的使用场景下边1.5会列出

    1.4、实现思路

    在这里插入图片描述

    1.5、适用

    在这里插入图片描述

    2、代码

    2.1、应用场景

    一个方法中调用多个微服务获取数据:
    在这里插入图片描述
    上边这样写的问题是很大的,接口响应总时间大概为调用的各个微服务接口时间之和(还不包括本接口的其他逻辑处理),所以这样效率是非常低的。我们可以采用多线程应用来做这个事情。

    先来了解forkjoinpool,它本质上还是一个线程池,默认的线程数量为cpu的核数,可以通过调用Executeors来获取,也可以直接用return 后边的代码进行创建:
    在这里插入图片描述
    forkjoinpool的使用方法与我们平时的线程池类似,也是用submit方法,不过除了可以穿runable和callable参数,多了可以传forkjointask参数的方法,由于这个类是一个抽象类,我们经常继承下边两个子抽象类做具体实现:
    在这里插入图片描述

    2.2、场景解决方法

    前两个顺便提供了这个场景的常用方法,最后一个是为了介绍ForkJoinPool的使用才写的,实际中这样用到的并不多

    2.2.1、利用FutureTask解决

    这里顺便为了了解FutureTask的大致原理,自制了一个简单的futuretask,如不需要了解FutureTask原理可直接跳过:

    import java.util.concurrent.*;
    import java.util.concurrent.locks.LockSupport;
    
    // 我们想一想,这个功能怎么实现
    // (jdk本质,就是利用一些底层API,为开发人员提供便利)
    public class NeteaseFutureTask<T> implements Runnable, Future { // 获取 线程异步执行结果 的方式
        Callable<T> callable; //  业务逻辑在callable里面
        T result = null;
        volatile String state = "NEW";  // task执行状态
        LinkedBlockingQueue<Thread> waiters = new LinkedBlockingQueue<>();// 定义一个存储等待者的集合
    
        public NeteaseFutureTask(Callable<T> callable) {
            this.callable = callable;
        }
    
        @Override
        public void run() {
            try {
                result = callable.call();
            } catch (Exception e) {
                e.printStackTrace();
                // result = exception
            } finally {
                state = "END";
            }
    
            // 唤醒等待者
            Thread waiter = waiters.poll();
            while (waiter != null) {
                LockSupport.unpark(waiter);
    
                waiter = waiters.poll(); // 继续取出队列中的等待者
            }
        }
    
        // 返回结果,
        @Override
        public T get() {
            if ("END".equals(state)) {
                return result;
            }
    
            waiters.offer(Thread.currentThread()); // 加入到等待队列,线程不继续往下执行
    
            while (!"END".equals(state)) {
                LockSupport.park(); // 线程通信的知识点
            }
            // 如果没有结束,那么调用get方法的线程,就应该进入等待
            return result;
        }
    
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }
    
        @Override
        public boolean isCancelled() {
            return false;
        }
    
        @Override
        public boolean isDone() {
            return false;
        }
    
        @Override
        public Object get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return null;
        }
    }
    
    

    然后我们利用上边这个自制的futuretask进行解决问题,当然这个自制的futuretask也可以换成jdk中的FutureTask,只不过上边做了简单的实现仅仅来满足我们的需求:

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.concurrent.*;
    
    /**
     * 串行调用http接口
     */
    @Service
    public class UserServiceFutureTask {
        ExecutorService executorService = Executors.newCachedThreadPool();
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 查询多个系统的数据,合并返回
         */
        public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
            // 其他例子, 查数据库的多个表数据,分多次查询
    
            // 原味爱好
            // Future < >  Callable
            // 1 和runnable一样的业务定义.  但是本质上是有区别的:  返回值 异常 call run.
            Callable<JSONObject> callable = new Callable<JSONObject>() {
                @Override
                public JSONObject call() throws Exception {
                    // 1. 先从调用获取用户基础信息的http接口
                    long userinfoTime = System.currentTimeMillis();
                    String value = restTemplate.getForObject("http://www.tony.com/userinfo-api/get?userId=" + userId, String.class);
                    JSONObject userInfo = JSONObject.parseObject(value);
                    System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
                    return userInfo;
                }
            };
    
            // 通过多线程运行callable
            NeteaseFutureTask<JSONObject> userInfoFutureTask = new NeteaseFutureTask<>(callable);
            new Thread(userInfoFutureTask).start();
    
            NeteaseFutureTask<JSONObject> intergralInfoTask = new NeteaseFutureTask(() -> {
                // 2. 再调用获取用户积分信息的接口
                long integralApiTime = System.currentTimeMillis();
                String intergral = restTemplate.getForObject("http://www.tony.com/integral-api/get?userId=" + userId,
                        String.class);
                JSONObject intergralInfo = JSONObject.parseObject(intergral);
                System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
                return intergralInfo;
            });
            new Thread(intergralInfoTask).start();
    
            // 3. 合并为一个json对象
            JSONObject result = new JSONObject();
            result.putAll(userInfoFutureTask.get()); // 会等待任务执行结束
            result.putAll(intergralInfoTask.get());
    
            return result;
        }
    
    }
    
    

    FutureTask的应用,核心是不要有依赖关系:
    在这里插入图片描述

    2.2.2、利用CountDownLatch解决

    当然为了代码的复用性,下边的接口调用可以封装成一个工具类传入url就行了。

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.ArrayList;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 调用http接口
     */
    @Service
    public class UserServiceCountLatch {
        ExecutorService executorService = Executors.newCachedThreadPool();
    
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 查询多个系统的数据,合并返回
         */
        public Object getUserInfo(String userId) throws InterruptedException {
            CountDownLatch count = new CountDownLatch(2);
            ArrayList<JSONObject> values = new ArrayList<>();
            // 你可以封装成一个 提交URL 就能自动多线程调用的 工具
                executorService.submit(() -> {
                    // 1. 先从调用获取用户基础信息的http接口
                    long userinfoTime = System.currentTimeMillis();
                    String value = restTemplate.getForObject("http://www.tony.com/userinfo-api/get?userId=" + userId, String.class);
                    JSONObject userInfo = JSONObject.parseObject(value);
                    System.out.println("userinfo-api用户基本信息接口调用时间为" + (System.currentTimeMillis() - userinfoTime));
                    values.add(userInfo);
                    count.countDown();
                });
                executorService.submit(() -> {
                    // 2. 再调用获取用户积分信息的接口
                    long integralApiTime = System.currentTimeMillis();
                    String intergral = restTemplate.getForObject("http://www.tony.com/integral-api/get?userId=" + userId,
                            String.class);
                    JSONObject intergralInfo = JSONObject.parseObject(intergral);
                    System.out.println("integral-api积分接口调用时间为" + (System.currentTimeMillis() - integralApiTime));
                    values.add(intergralInfo);
                    count.countDown();
            });
    
            count.await();// 等待计数器归零
    
            // 3. 合并为一个json对象
            JSONObject result = new JSONObject();
            for (JSONObject value : values) {
                result.putAll(value);
            }
            return result;
        }
    }
    

    2.2.3、利用ForkJoinPool来解决(不建议用这个解决接口调用)

    为什么说不是最优解,因为forkjoinpool实现复杂,并且接口调用是阻塞的任务,所以根据1.5的概念,最好不要用这个解决。
    下边这个代码是可以复用的,因为上边代码需要根据具体调用的接口数量来改变代码,而下边直接将所有接口都放到了urls中,由RecursiveTask的实现类中重写compute方法来进行了递归操作,然后将所有结果合并,提高代码复用。

    import com.alibaba.fastjson.JSONObject;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import org.springframework.web.client.RestTemplate;
    
    import java.util.ArrayList;
    import java.util.concurrent.*;
    
    /**
     * 并行调用http接口
     */
    @Service
    public class UserServiceForkJoin {
        // 本质是一个线程池,默认的线程数量:CPU的核数
        ForkJoinPool forkJoinPool = new ForkJoinPool(10, ForkJoinPool.defaultForkJoinWorkerThreadFactory,
                null, true);
        @Autowired
        private RestTemplate restTemplate;
    
        /**
         * 查询多个系统的数据,合并返回
         */
        public Object getUserInfo(String userId) throws ExecutionException, InterruptedException {
            // 其他例子, 查数据库的多个表数据,分多次查询
            // fork/join
            // forkJoinPool.submit()
            ArrayList<String> urls = new ArrayList<>();
            urls.add("http://www.tony.com/userinfo-api/get?userId=" + userId);
            urls.add("http://www.tony.com/integral-api/get?userId=" + userId);
    
            HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, 0, urls.size() - 1);
            ForkJoinTask<JSONObject> forkJoinTask = forkJoinPool.submit(httpJsonRequest);
    
            JSONObject result = forkJoinTask.get();//获取结果,这里的get也是阻塞的
            return result;
        }
    }
    
    // 任务
    class HttpJsonRequest extends RecursiveTask<JSONObject> {
    
        RestTemplate restTemplate;
        ArrayList<String> urls;
        int start;
        int end;
    
        HttpJsonRequest(RestTemplate restTemplate, ArrayList<String> urls, int start, int end) {
            this.restTemplate = restTemplate;
            this.urls = urls;
            this.start = start;
            this.end = end;
        }
    
        // 就是实际去执行的一个方法入口(任务拆分)
        @Override
        protected JSONObject compute() {
            int count = end - start; // 代表当前这个task需要处理多少数据
            // 自行根据业务场景去判断是否是大任务,是否需要拆分
            if (count == 0) {
                String url = urls.get(start);
                // TODO 如果只有一个接口调用,立刻调用
                long userinfoTime = System.currentTimeMillis();
                String response = restTemplate.getForObject(url, String.class);
                JSONObject value = JSONObject.parseObject(response);
                System.out.println(Thread.currentThread() + " 接口调用完毕" + (System.currentTimeMillis() - userinfoTime) + " #" + url);
                return value;
            } else { // 如果是多个接口调用,拆分成子任务  7,8,   9,10
                System.out.println(Thread.currentThread() + "任务拆分一次");
                int x = (start + end) / 2;
                HttpJsonRequest httpJsonRequest = new HttpJsonRequest(restTemplate, urls, start, x);// 负责处理哪一部分?
                httpJsonRequest.fork();//调用的这个fork方法就是把当前任务再提交到线程池处理队列中,再开线程进行处理提高效率
    
                HttpJsonRequest httpJsonRequest1 = new HttpJsonRequest(restTemplate, urls, x + 1, end);// 负责处理哪一部分?
                httpJsonRequest1.fork();
    
                // join获取处理结果
                JSONObject result = new JSONObject();
                result.putAll(httpJsonRequest.join());
                result.putAll(httpJsonRequest1.join());
                return result;
            }
        }
    }
    
    

    运行结果,因为调用了两个接口,所以只需要拆分一次:
    在这里插入图片描述

    3、ForkJoinPool与普通线程池的区别

    普通线程池:
    在这里插入图片描述
    ForkJoinPool:
    区别就是每个线程都有自己的队列,原理就是下边的图,每个线程执行大task和由大task拆分成的小task,但是问题来了,这样大task和小task不都由同一个线程来串行执行了吗,这里还有一个forkjoinpool的特点,就是工作窃取,如果其他线程的队列里边没有任务的话,会分担其他线程的任务队列,这就是工作窃取,所以这个forkjoinpool实现的源代码也是相当麻烦的,看源码的话会发现jdk中用了五百行去描述它是怎么实现的,目的是提高线程效率。
    在这里插入图片描述
    但也不是说forkjoinpool就是万能的,从上边1.5可以知道,最好不要声明太多的forkjoinpool,适合非阻塞的和内存性的操作,因为一旦阻塞就意味着线程的浪费,所以网络操作,数据库操作,文件操作就最好不要用他,如果要用最好只声明一个线程池,线程数为cpu核数。具体什么情况下适合呢,比如说我们传了1000w个字符串,我们需要知道不同字符串出现的次数,这种纯数据计算类的可以用这个。

    展开全文
  • 微服务并行调用多个外部接口

    千次阅读 2019-10-17 10:55:37
    // 降低了系统资源分配和避免OOM异常,并行调用外部多个接口, 降低了调用时间 ThreadPoolExecutor executor = new ThreadPoolExecutor(16, 32, 60, TimeUnit.SECONDS, new LinkedBlockingQueue(64), new ...

    线程池: 如何解决线程池中的线程被堵住的问题?
    如果线程池被堵住, 将没有可用线程来执行新提交任务, 新提交任务将被存到任务队列等待执行。
    例如, 在一个线程中并发请求多个其他外部接口,

    一个折中的方案

    既用到了当前线程, 又用到了线程池, 给当前线程加上了翅膀/辅助。

    // coreSize 16 任务队列最长64, 当超出64, 创建新的线程,非核心线程最长存活60s, 决绝策略设置为 用main线程执行被拒绝的任务
            // 降低了系统资源分配和避免OOM异常,并行调用外部多个接口, 降低了调用时间
    
            ThreadPoolExecutor executor =
                    new ThreadPoolExecutor(16, 32, 60,
                            TimeUnit.SECONDS, new LinkedBlockingQueue<>(64), new ThreadPoolExecutor.CallerRunsPolicy());
    
            LinkedList<Future<String>> futures = new LinkedList<>();
    
            for (int i = 0; i < 300; i++) {
                int finalI = i;
                Future<String> future = executor.submit(() -> {
                    Thread.sleep(200);
                    return Thread.currentThread().getName();
                });
                futures.add(future);
            }
    
            futures.forEach(f -> {
                try {
                    if ("main".equalsIgnoreCase(f.get())) {
                        System.out.println(f.get());
                    }
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });
    
            executor.shutdown();
    
    
    
    展开全文
  • 当然实际情况肯定复杂的多,如果一个接口内部存在不相互依赖的耗时调用的话,那么我们可以做这样的合并,响应时间上的减少还是非常明显的。整个接口的响应时间取决于最长的那个内部接口。 那么我们来看看在 Java ...
    img_3d09cbce5e9a8232923e5c82f157b3db.jpe

    我们先来设想一个场景。

    有一个 http 的接口 A,该接口内部实际上是由另外三个接口 B、C、D 返回结果的组合,这三个接口不存在相互依赖。我们一般的写法就是 B、C、D 同步顺序执行,依次拿到结果后组装在一起。那么假如这三个接口分别耗时 2 秒,那么 A 接口就要耗时 6 秒。如果可以让 B、C、D 同时执行的话,那么 A 接口理论上只要耗时 2 秒。

    当然实际情况肯定复杂的多,如果一个接口内部存在不相互依赖的耗时调用的话,那么我们可以做这样的合并,响应时间上的减少还是非常明显的。整个接口的响应时间取决于最长的那个内部接口。

    那么我们来看看在 Java 中有哪些方法可以达到这样的目的。认真思考下你会发现,如果要并行处理的话,在 Java 中只能用多线程来做。实际情况中每个线程处理完的时间肯定不一样,那么如何让线程先处理完的停下来等最后那个处理完的呢。如果经常用多线程的小伙伴肯定能想到 CountDownLatch 工具类。当然也有直接简单暴力的方法,在空循环里轮询每个线程是否执行完,但是这样做肯定不优雅。

    那下面就直接上代码了: 假设有个学生服务提供查询学生名字,年龄和家庭信息,每个服务之间没有相互依赖。 我们就简单模拟下来获取学生信息的一个接口。

    常规方法

    @RequestMapping("/getStudentInfo")

    public Object getStudentInfo() {

    long start = System.currentTimeMillis();

    Map resultMap = new HashMap<>(10);

    try {

    resultMap.put("studentName", studentService.getStudentName());

    resultMap.put("studentAge", studentService.getSutdentAge());

    resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());

    } catch (Exception e) {

    resultMap.put("errMsg", e.getMessage());

    }

    resultMap.put("total cost", System.currentTimeMillis() - start);

    return resultMap;

    }

    顺序同步执行,耗时 6 秒。

    1. Future

    @RequestMapping("/getStudentInfoWithFuture")

    public Object testWhitCallable() {

    long start = System.currentTimeMillis();

    Map resultMap = new HashMap<>(10);

    try {

    CountDownLatch countDownLatch = new CountDownLatch(3);

    Future futureStudentName = es.submit(() -> {

    Object studentName = studentService.getStudentName();

    countDownLatch.countDown();

    return studentName;

    });

    Future futureStudentAge = es.submit(() -> {

    Object studentAge = studentService.getSutdentAge();

    countDownLatch.countDown();

    return studentAge;

    });

    Future futureStudentFamilyInfo = es.submit(() -> {

    Object studentFamilyInfo = studentService.getSutdentFamilyInfo();

    countDownLatch.countDown();

    return studentFamilyInfo;

    });

    //同步等待所有线程执行完之后再继续

    countDownLatch.await();

    resultMap.put("studentName", futureStudentName.get());

    resultMap.put("studentAge", futureStudentAge.get());

    resultMap.put("studentFamilyInfo", futureStudentFamilyInfo.get());

    } catch (Exception e) {

    resultMap.put("errMsg", e.getMessage());

    }

    resultMap.put("total cost", System.currentTimeMillis() - start);

    return resultMap;

    }

    2.RxJava

    @RequestMapping("/getStudentInfoWithRxJava")

    public Object testWithRxJava() {

    long start = System.currentTimeMillis();

    Map resultMap = new HashMap<>(10);

    try {

    CountDownLatch countDownLatch = new CountDownLatch(1);

    Observable studentNameObservable = Observable.create(observableEmitter -> {

    resultMap.put("studentName", studentService.getStudentName());

    observableEmitter.onComplete();

    }).subscribeOn(Schedulers.io());

    Observable studentAgeObservable = Observable.create(observableEmitter -> {

    resultMap.put("studentAge", studentService.getSutdentAge());

    observableEmitter.onComplete();

    }).subscribeOn(Schedulers.io());

    Observable familyInfoObservable = Observable.create(observableEmitter -> {

    resultMap.put("studentFamilyInfo", studentService.getSutdentFamilyInfo());

    observableEmitter.onComplete();

    }).subscribeOn(Schedulers.io());

    //创建一个下游 Observer

    Observer observer = new Observer() {

    @Override

    public void onSubscribe(Disposable d) {

    }

    @Override

    public void onNext(Object o) {

    }

    @Override

    public void onError(Throwable e) {

    }

    @Override

    public void onComplete() {

    //因为后面用了 merge 操作符,所以会合并后发射,那么只要 countdown 一次就行了。

    countDownLatch.countDown();

    }

    };

    //建立连接,

    Observable.merge(studentNameObservable, studentAgeObservable, familyInfoObservable).subscribe(observer);

    //等待异步线程完成

    countDownLatch.await();

    } catch (Exception e) {

    resultMap.put("errMsg", e.getMessage());

    }

    resultMap.put("total cost", System.currentTimeMillis() - start);

    return resultMap;

    }

    对于 RxJava 我不熟,我也是临时学习的,不知道这种写法是不是最佳的。

    3.CompletableFutures

    @RequestMapping("/getStudentInfoWithCompletableFuture")

    public Object getStudentInfoWithCompletableFuture() {

    long start = System.currentTimeMillis();

    Map resultMap = new HashMap<>(10);

    try {

    CompletableFuture completableFutureStudentName = CompletableFuture.supplyAsync(() -> {

    try {

    return studentService.getStudentName();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return null;

    });

    CompletableFuture completableFutureSutdentAge = CompletableFuture.supplyAsync(() -> {

    try {

    return studentService.getSutdentAge();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return null;

    });

    CompletableFuture completableFutureFamilyInfo = CompletableFuture.supplyAsync(() -> {

    try {

    return studentService.getSutdentFamilyInfo();

    } catch (InterruptedException e) {

    e.printStackTrace();

    }

    return null;

    });

    CompletableFuture.allOf(completableFutureStudentName, completableFutureSutdentAge, completableFutureFamilyInfo).join();

    resultMap.put("studentName", completableFutureStudentName.get());

    resultMap.put("studentAge", completableFutureSutdentAge.get());

    resultMap.put("studentFamilyInfo", completableFutureFamilyInfo.get());

    } catch (Exception e) {

    resultMap.put("errMsg", e.getMessage());

    }

    resultMap.put("total cost", System.currentTimeMillis() - start);

    return resultMap;

    }

    自带最后的同步等待,不需要 CountDownLatch。CompletableFuture 还有很多其他好用的方法。

    有兴趣的可以自己来实验下。 github 项目地址 reactive-programming-sample。

    Java程序员如何学习才能快速入门并精通呢?

    当真正开始学习的时候难免不知道从哪入手,导致效率低下影响继续学习的信心。

    但最重要的是不知道哪些技术需要重点掌握,学习时频繁踩坑,最终浪费大量时间,所以有一套实用的视频课程用来跟着学习是非常有必要的。

    为了让学习变得轻松、高效,今天给大家免费分享一套阿里架构师传授的一套教学资源。帮助大家在成为架构师的道路上披荆斩棘。这套视频课程详细讲解了(Spring,MyBatis,Netty源码分析,高并发、高性能、分布式、微服务架构的原理,JVM性能优化、分布式架构)等这些成为架构师必备的内容!而且还把框架需要用到的各种程序进行了打包,根据基础视频可以让你轻松搭建分布式框架环境,像在企业生产环境一样进行学习和实践。

    img_f793477d300677b850142a345b7f86da.png

    如果想提升自己的,看看上图大纲能知道你现在还处于什么阶段要向那些方面发展?

    同时小编已将上图知识大纲里面的内容打包好了......

    想要资料的朋友,可以直接加群960439918获取免费架构资料(包括高可用,高并发,spring源码,mybatis源码,JVM,大数据,Netty等多个技术知识的架构视频资料和各种电子书籍阅读)

    加入群聊【java高级架构交流群】

    img_d8dffde0a77193324beaa168bc56d709.png
    展开全文
  • 微服务优化之并行调用

    千次阅读 2018-05-10 22:54:34
    微服务优化之并行调用 互联网产品随着用户的增加,系统对服务的高性能、高可用、可伸缩、可扩展的支持,大都采用分布式RPC框架。然而随着业务的增加,系统越来越多,系统之间的调用也越来越复杂,原本一个系统中一...

    微服务优化之并行调用

    互联网产品随着用户的增加,系统对服务的高性能、高可用、可伸缩、可扩展的支持,大都采用分布式RPC框架。然而随着业务的增加,系统越来越多,系统之间的调用也越来越复杂,原本一个系统中一次请求就可以完成的工作,现在可能被分散在多个系统中,一次请求需要多个系统响应。这样就会放大RPC调用延迟带来的副作用,影响系统的高性能需求。

    例如:一个RPC接口中需要依赖另外三个系统的RPC服务,各RPC服务的响应时间分别是20ms、10ms、10ms,那么这个接口的对外系统依赖的耗时40ms。如果接口依赖越多,响应时间就会越长。

    对此,需要在业务范围内进行性能优化,优化思路总的来说有两种:

    第一:如果对RPC接口调用,不需要关心接口的返回值,那么可以采用异步RPC调用。

    第二:如果依赖RPC接口返回值,并且连续调用的多个RPC之间没有依赖关系,可以采用并行化处理。

    本文主要分享一下通过并行化处理,来优化RPC接口响应时间,如上例子中的RPC采用并行调用,对外系统接口的依赖耗时会降低到20ms。

    第一:因为Java对线程的使用非常方便,所以完成并行调用对于Java语言来说是相对简单,根据依赖外部接口分别创建一个线程来调用就可以完成。

    第二:那么问题来:如果我的接口在完成其他接口调用后,还需要完成额外的功能而且需要依赖其他接口调用结果,该怎么处理呢?Thread类通过join调用,可以让主线程等待子线程处理结果。

    第三:那么问题又来了:子线程内部的异常无法在外部获取,而需要依赖外部接口的调用结果的情况下,如果RPC接口抛出异常,必须在主线程中获取并作出相应处理,这个工作可以通过FutureTask来完成。

    第四:那么问题又来了:如果一个接口依赖十个外部系统,那么每次请求就需要创建十个线程,随着接口TPS增加,系统创建线程和销毁的线程耗费的资源越来越高,这个时候需要考虑采用线程池方案了。

    第五:那么问题又来了:以上实例只是一个单应用的测试Demo,真实应用情况下如上这样在代码中创建线程池并没太大意义,应该创建全局的线程池,所有请求共用线程池才能达到线程资源共用。但是Executors中线程池都默认采用AbortPolicy 的拒绝策略,在高并发情况下,就会频繁出现的线程池拒绝服务异常。此时可以考虑自定义线程池,采用CallerRunsPolicy拒绝策略,在高并发量,当线程池无法提供服务的情况下,采用主线程自己创建线程,达到并发量和计算资源的最优协调。

    第六:完成以上操作就可以完美了吗?然而情况并非如此,如果细心测试发现,如果其中一个接口抛出异常时,主线程就结束了,而其他还没有执行结束的子线程将继续执行,一开始我们通过Thread.join()来协调主子线程的先后顺序,而现在采用线程池,无法在获取线程并且调用join方法,而是采用FutureTask.get()来协调先后顺序,那么还可以采用哪些方式保证主线程最后结束呢?此时可以采用一些特有的并发工具,如:闭锁,栅栏,信号量。如下为网络摘抄的三个工具对比:

    闭锁(CountDownLatch)

    类似于门。门初始是关闭的,试图进门的线程挂起等待开门。当负责开门进程将门打开后,所有等待线程被唤醒。

    门一旦打开就不能再关闭了。

    CountDownLatch(int n):指定闭锁计数器

    await() :挂起等待闭锁计数器为0

    countDown():闭锁计数器减1

    栅栏(CyclicBarrier)

    和闭锁有类似之处。闭锁是等待“开门”事件;栅栏是等待其他线程。例如有N个线程视图通过栅栏,此时先到的要等待,直到所有线程到到达后,栅栏开启,所有等待线程被唤醒通过栅栏。

    CyclicBarrier(int n):需要等待的线程数量

    await():挂起等待达到线程数量

    信号量(Semaphore)

    和锁的作用类似。区别是锁只允许被一个线程获取,但是信号量可以设置资源数量。当没有可用资源时,才被挂起等待。

    Semaphore(int n):指定初始的资源数量

    acquire():试图获取资源。当没有可用资源时挂起

    release():释放一个资源

           本文采用栅栏完成实例代码如下:

    package com.halfworlders.test.domo;

    import java.util.concurrent.Callable;

    import java.util.concurrent.CyclicBarrier;

    import java.util.concurrent.ExecutionException;

    import java.util.concurrent.ExecutorService;

    import java.util.concurrent.FutureTask;

    import java.util.concurrent.LinkedBlockingQueue;

    import java.util.concurrent.ThreadPoolExecutor;

    import java.util.concurrent.TimeUnit;

     

    import com.halfworlders.test.exp.AppException;

    import com.halfworlders.test.impl.ServiceImpl;

    import com.halfworlders.test.intf.ServiceInterface;

     

    publicclass App {

        /**

         * 外接口总数

         */

        privatestaticfinalintINTERFACE_COUNT = 10;

        ExecutorService executorService = new ThreadPoolExecutor(INTERFACE_COUNT, INTERFACE_COUNT*3, 10L,

               TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), newThreadPoolExecutor.CallerRunsPolicy());

     

        publicstaticvoid main(String[] args) {

           longstart = System.currentTimeMillis();

           App test = new App();

           test.test();

           longend = System.currentTimeMillis();

           System.out.println("总耗时:"+(end-start)+"ms");

        }

     

        @SuppressWarnings("unchecked")

        publicvoid test() {

           final CyclicBarrier cb = new CyclicBarrier(INTERFACE_COUNT + 1);

           final ServiceInterface[] services = assembles();

           final FutureTask<Integer>[] futureTasks = new FutureTask[INTERFACE_COUNT];

           for (inti = 0; i < INTERFACE_COUNT; i++) {

               final Integer fi = i;

               futureTasks[i] = new FutureTask<Integer>(new Callable<Integer>() {

                  @Override

                  public Integer call() throws Exception {

                      try {

                         returnservices[fi].service();

                      } finally {

                         cb.await();

                      }

                  }

               });

               executorService.submit(futureTasks[i]);

           }

           String serviceName = null;

           try {

               // 打开栅栏

               cb.await();

               // 如果有其他系统调用异常,则将该异常向外层抛出

               for (inti = 0; i < INTERFACE_COUNT; i++) {

                  serviceName = services[i].getName();

                  futureTasks[i].get();

               }

           } catch (Exception e) {

               if ((einstanceof ExecutionException) && (e.getCause() instanceof AppException)) {

                  throw (AppException) e.getCause();

               } else {

                  thrownew RuntimeException(serviceName+"系统异常", e);

               }

           }

        }

       

        private ServiceInterface[] assembles(){

           ServiceInterface[] service = new ServiceInterface[INTERFACE_COUNT];

           for (inti = 0; i < INTERFACE_COUNT; i++) {

               service[i] = new ServiceImpl("接口"+i);

           }

            returnservice;

        }

    }

     

     

    展开全文
  • 经常会有这样的调用场景:app(或web前端)调用后台的一个接口,该接口接到该请求后,需要调用其他多个微服务来获取数据,最终汇总一个最终结果返回给用户。 譬如用户请求“我的订单”,后台在收到请求后,就需要去...
  • 微服务优化之异步调用

    千次阅读 2018-05-18 23:01:48
    前一节《微服务优化之并行》,主要从并行的角度来提高微服务的响应时间,本节讲一下微服务优化之异步调用。异步的前提是对依赖的RPC接口调用,不需要关心其执行结果,对数据没有强一致性要求,只要能够达到最终一致...
  • 前言:微服务架构上通过业务来划分服务的,通过REST调用,对外暴露的一个接口,可能需要很多个服务协同才能完成这个接口功能,如果链路上任何一个服务出现问题或者网络超时,都会形成导致接口调用失败。随着业务的...
  • 让API并行调用变得如丝般顺滑的绝招

    万次阅读 多人点赞 2020-12-29 12:17:18
    目前常见的做法将数据同步到ES这类搜索框架中进行查询,然后通过搜出来的结果,一般是主键ID, 再去具体的数据表中查询完整的数据,组装返回给调用方。 比如下面这段代码,首先查询出文章信息,然后根据文章中的用户...
  • 微服务升级导致调用超时异常的分析和方案 前几天,在做线上异常日志巡检时,发现通过公司自研的RPC框架调用订单服务出现connect timed out异常,在此记录对该问题的排查和相应的解决思路,作为备忘。 前情提要: 1...
  • 框架采用消息机制调用服务,速度快,灵活,通过使用缓存,解决服务调用的冥想性和消息的冥想性,在事务处理时,采用并行并行调用对应的服务,提高了性能。MOONWATER是一个非常优秀的框架,优势在于提高了应用的成功...
  • 服务之间如何互相调用就变成微服务架构中的一个关键问题。 服务调用有两种方式,一种是RPC方式,另一种是事件驱动(Event-driven)方式,也就是发消息方式。 消息方式是松耦合方式,比紧耦合的RPC方式要优越,但RPC...
  • 问题:ssm框架,spring中controller和service都是单例的,那是怎么保证线程安全呢? 通过threadLocal保证, ...微服务优化之并行调用 https://blog.csdn.net/tidu2chengfo/article/details/80275064
  • 微服务

    2018-08-12 10:35:16
    一、微服务介绍 1. 什么是微服务  在介绍微服务时,首先得先理解什么是微服务,顾名思义,微服务得从两个方面去理解,什么是"微"、什么是"服务", 微 狭义来讲就是体积小、著名的"2 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,751
精华内容 4,700
关键字:

微服务并行调用