精华内容
下载资源
问答
  • 最起初的话思考如果只是简单这样的话其实也好实现,重写httpClient里面就有相关的超时重试机制,但是如果要是实现了某个整体的方法来进行失败重试那不能更好。所以就根据这个想法实现了以下的几个功能: 可以更简单...

    最初的业务场景就是:

    需要需要使用restTemplate调用个接口并且调用失败后需要延时重复调用(最多3次),第一次5秒,第二次10秒,第三次15秒。

    1. 主要功能

    最起初的话思考如果只是简单这样的话其实也好实现,重写httpClient里面就有相关的超时重试机制,但是如果要是实现了某个整体的方法来进行失败重试那不能更好。所以就根据这个想法实现了以下的几个功能:

    1. 可以更简单的针对整个方法来进行延时或者有失败重试的调用执行。
    2. 可以设置首次是否延迟执行以及延迟执行的时间。
    3. 可以设置失败重试次数以及开发者并可自定义配置重试延时时间策略(默认四种:渐进步长、固定时间、固定步长、斐波那契数列)。
    4. 支持查看每次执行结果(包括失败重试的执行结果)。
    5. 执行器统一管理所有任务。
    6. 支持任务自定义顺序完成(流水线完成任务) 例如1 -> 2 -> 3,以及支持查看流水线任务中每个任务的执行结果。

    1.1 使用方式:

    最新版本:1.0.2  这个我已经打包发布至maven中央仓库中,大家可以直接在项目的pom文件中添加下面依赖即可,并且在此com.b0c0.common依赖中还正在不断完善,现在是只有这个延时队列和一个通用的方法日志拦截输出:

    <dependency>
        <groupId>com.b0c0</groupId>
        <artifactId>common</artifactId>
        <version>1.0.2</version>
    </dependency>
    

    1.2 使用示例

    package com.b0c0.common.delayedQueue;
    
    import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.logging.Logger;
    
    public class GeneralDelayedQueueExecuteTest {
    
        private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecuteTest.class.getName());
    
        public static void main(String[] args) {
            GeneralDelayedQueueExecuteTest test = new GeneralDelayedQueueExecuteTest();
    //        test.run();
    //        try {
    //            test.runAsync();
    //        } catch (ExecutionException e) {
    //            e.printStackTrace();
    //        } catch (InterruptedException e) {
    //            e.printStackTrace();
    //        }
            test.runLine();
        }
    
    
        /**
         * 同步执行示例
         */
        public void run() {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 1, 500, 50);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer1(), "2", "body", 3, 100, 100);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 150, 150);
    
            GeneralDelayedQueueExecute.run(delayedQueue1);
            GeneralDelayedQueueExecute.run(delayedQueue2);
            GeneralDelayedQueueExecute.run(delayedQueue3);
        }
    
        /**
         * 异步执行示例
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public void runAsync() throws ExecutionException, InterruptedException {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 5, 500, 5);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer1(), "2", "body", 3, 10000, 2);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 1500, 300);
            Future<GeneralResultVo<String>> future1 = GeneralDelayedQueueExecute.runAsync(delayedQueue1);
            Future<GeneralResultVo<String>> future2 = GeneralDelayedQueueExecute.runAsync(delayedQueue2);
            Future<GeneralResultVo<String>> future3 = GeneralDelayedQueueExecute.runAsync(delayedQueue3);
            System.out.println("time ->" + System.currentTimeMillis() + " future1:" + future1.get().getReslutData());
            System.out.println("time ->" + System.currentTimeMillis() + " future2:" + future2.get().getReslutData());
            System.out.println("time ->" + System.currentTimeMillis() + " future3:" + future3.get().getReslutData());
    
        }
    
        /**
         * 流水线执行示例
         */
        public void runLine() {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 2, 500, 5);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer2(), "2", "body", 3, 10, 2);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 600, 100);
            List<GeneralDelayedQueue> list = new ArrayList<>();
            list.add(delayedQueue1);
            list.add(delayedQueue2);
            list.add(delayedQueue3);
            GeneralResultVo<String> a = GeneralDelayedQueueExecute.runLine(list);
            TestVo t = (TestVo) delayedQueue3.getBodyData().getPreResult();
            System.out.println(t.getA());
            System.out.println(a.getReslutData());
    
        }
    
    
        static class TestConsumer1 implements GeneralQueueConsumerable {
    
    
            @Override
            public GeneralResultVo<String> run(GeneralDelayedQueue task) {
                GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
                String body = resultVo.getBody();
                String id = task.getId();
                int currExecuteNum = task.getCurrExecuteNum();
                logger.info("thread ->" + Thread.currentThread().getId() + " time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
                if (task.getId().equals("3")) {
                    return GeneralResultVo.fail();
                } else {
                    return GeneralResultVo.success("sss");
                }
            }
        }
    
        static class TestConsumer2 implements GeneralQueueConsumerable {
    
            @Override
            public GeneralResultVo<TestVo> run(GeneralDelayedQueue task) {
                GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
                String body = resultVo.getBody();
                String id = task.getId();
                int currExecuteNum = task.getCurrExecuteNum();
                logger.info("thread ->" + Thread.currentThread().getId() + "time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
                TestVo testVo = new TestVo();
                testVo.setA("a");
                testVo.setB("b");
                return GeneralResultVo.success(testVo);
            }
        }
    
        public static class TestVo {
            private String a;
            private String b;
    
            public String getA() {
                return a;
            }
    
            public void setA(String a) {
                this.a = a;
            }
    
            public String getB() {
                return b;
            }
    
            public void setB(String b) {
                this.b = b;
            }
        }
    
    
    }

    2. 具体实现代码

    具体实现的话是根据Delayed延时队列基础之上来实现的。关于怎么Delayed的使用在这里也就不多说了,大家自行百度学习。

    2.1.延时队列实体类:

    首先我们要是实现这个的话既然要根据Delayed延时队列机制来实现,我们就首先实现一个Delayed接口的基础延时队列实体,里面来保存我们的信息,例如延时时间、重试时间、执行次数、以及开发者根据业务需要自定义的主题内容等。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列实体
     * @author: lidongsheng
     * @createData: 2020-09-21 14:01
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 14:01
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    public class GeneralDelayedQueue<T> implements Delayed {
    
    
        public static class BodyData<T,V>{
            /**
             * 开发者自定义的需要的数据体.
             */
            private T body;
    
            /**
             * 如果为任务链任务,上一个任务的执行结果会保存到这里,
             */
            private V preResult;
    
            public T getBody() {
                return body;
            }
    
            protected void setBody(T body) {
                this.body = body;
            }
    
            public V getPreResult() {
                return preResult;
            }
    
            protected void setPreResult(V preResult) {
                this.preResult = preResult;
            }
        }
    
        //任务的唯一id
        private String id;
        /**
         * 任务的自定义数据体
         */
        private BodyData bodyData;
        /**
         * 任务当前的执行次数(可设置此值为maxExecuteNum来达到强制中断之后的重试执行)
         *
         */
        private int currExecuteNum;
        /**
         * 最大执行次数
         * 此值为1 表示只执行一次,不开启重发
         * 此值大于1 表示开启重发,并且此值为最大执行次数(包含首次执行)。
         */
        private int maxExecuteNum;
    
        /**
         * 任务的首次执行的延时时间,只有任务的首次执行时会用到此值进行延时执行
         */
        private long delayedTime;
    
        /**
         * 任务的重发延时时间,重发自定义延时策略会用到此值
         */
        private long retryTime;
        /**
         * 任务的过期时间,任务到达了过期时间就会执行
         * 检测延迟任务是否到期
         */
        private long expireTime;
        /**
         * 上次的延时时间
         */
        private long lastTime = -1;
        /**
         * 时间单位
         */
        private TimeUnit timeUnit;
    
        /**
         * 执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
         * 注意:如果设置为true了,请务必手动调用GeneralDelayedQueueExecute.clearTask 进行删除。否则任务相关信息将一直存在于内存中
         */
        private boolean keepResults;
    
        private GeneralQueueConsumerable consumerable;
    
        public String getId() {
            return id;
        }
    
        public <T,V>BodyData<T,V> getBodyData() {
            return bodyData;
        }
    
        public static<T,V> BodyData<T,V> initBodyData(T userData) {
            BodyData<T,V> bodyData= new BodyData<>();
            bodyData.setBody(userData);
            return bodyData;
        }
    
        public int getCurrExecuteNum() {
            return currExecuteNum;
        }
    
        public int getMaxExecuteNum() {
            return maxExecuteNum;
        }
    
        public long getDelayedTime() {
            return delayedTime;
        }
    
        public long getRetryTime() {
            return retryTime;
        }
    
        public long getLastTime() {
            return lastTime;
        }
    
        protected void setLastTime(long lastTime) {
            this.lastTime = lastTime;
        }
    
        protected void setCurrExecuteNum(int currExecuteNum) {
            this.currExecuteNum = currExecuteNum;
        }
    
        protected void setExpireTime(long expireTime) {
            this.expireTime = expireTime;
        }
    
        public TimeUnit getTimeUnit() {
            return timeUnit;
        }
    
        public GeneralQueueConsumerable getConsumerable() {
            return consumerable;
        }
    
        public void setConsumerable(GeneralQueueConsumerable consumerable) {
            this.consumerable = consumerable;
        }
    
        public boolean isKeepResults() {
            return keepResults;
        }
    
        /**
         * 完整参数的构造方法
         *
         * @param consumerable  具体任务方法
         * @param id            唯一标识
         * @param userData      主题内容
         * @param keepResults   true表示执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
         * @param maxExecuteNum 最大执行次数
         * @param delayedTime   首次执行延时时间
         * @param retryTime     重试延时时间
         * @param timeUnit      时间单位
         */
        public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData,boolean keepResults, int maxExecuteNum, long delayedTime, long retryTime,TimeUnit timeUnit) {
            this.consumerable = consumerable;
            this.id = id;
            this.bodyData = initBodyData(userData);
            this.keepResults = keepResults;
            this.currExecuteNum = 0;
            this.maxExecuteNum = maxExecuteNum;
            this.delayedTime = delayedTime;
            this.retryTime = retryTime;
            this.timeUnit = timeUnit;
    
        }
    
    
        /**
         * 构造方法 默认时间单位秒,自动捕获异常
         *
         * @param consumerable  具体任务方法
         * @param id            唯一标识
         * @param userData      主题内容
         * @param maxExecuteNum 最大执行次数
         * @param delayedTime   首次执行延时时间
         * @param retryTime     重试延时时间
         */
        public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData, int maxExecuteNum, long delayedTime, long retryTime) {
            this(consumerable,id, userData,false, maxExecuteNum, delayedTime, retryTime, TimeUnit.MILLISECONDS);
        }
    
    
        @Override
        public int compareTo(Delayed delayed) {
            long result = this.getDelay(TimeUnit.NANOSECONDS)
                    - delayed.getDelay(TimeUnit.NANOSECONDS);
            if (result < 0) {
                return -1;
            } else if (result > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    
    
        /**
         * 检测延迟任务是否到期
         * 如果返回的是负数则说明到期否则还没到期
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    
    }
    

    2.2  延时队列消费者

    延时队列消费者,也就是开发者要进行执行的具体业务方法,开发者实现这个GeneralQueueConsumerable接口里面的run方法即可执行自己的业务逻辑,该接口如下所示:

    package com.b0c0.common.delayedQueue.base;
    
    
    import com.b0c0.common.delayedQueue.GeneralDelayedQueue;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列消费
     * @author: lidongsheng
     * @createData: 2020-09-21 15:01
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 15:01
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    
    /**
     * 开发者实现这个GeneralQueueConsumerable接口来实现重试
     */
    public interface GeneralQueueConsumerable {
    
        /**
         * 开发者要进行执行的具体业务方法,重写run方法即可执行自己的业务逻辑。
         * @param task 具体任务
         * @return 返回false根据重发的策略进行重发执行方法,如果设置了自定义的重发,则会根据重试机制进行重试,true表示执行结束。
         */
    
        <T>GeneralResultVo<T> run(GeneralDelayedQueue task);
    }
    

    2.3  自定义重试时间

    自定义的重试时间实现RetryTimeTypeable接口里面的getTime方法即可。该接口如下所示:

    package com.b0c0.common.delayedQueue.base;
    
    import com.b0c0.common.delayedQueue.GeneralDelayedQueue;
    
    /**
     * @program: springbootdemo
     * @description: 重试时间的type接口,自定义的重试时间实现此接口即可
     * @author: lidongsheng
     * @createData: 2020-09-25 19:07
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-25 19:07
     * @updateContent: 重试时间的type接口
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    public interface RetryTimeTypeable {
        /**
         * 返回延时时间
         * 开发者实现这个此接口重写里面的getTime方法即可根据具体需要进行定义重试机制延时时间。
         * @param task
         * @return 返回此次的延时时间,单位和GeneralDelayedQueue构造方法中设置的时间单位一致(默认为秒)。
         */
        long getTime(GeneralDelayedQueue task);
    }
    

    2.4  默认的四种重试时间实现

    1. 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
    2. 固定时间
    3. 固定步长
    4. 斐波那契数列

    这四种感觉是最常用的重试时间,已经默认集成,使用时可直接调用。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @program: springbootdemo
     * @description: 默认的重试时间实现接口
     * @author: lidongsheng
     * @createData: 2020-09-25 19:41
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-25 19:41
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    public class DefaultRetryTimeTypeator {
    
    
        /**
         * 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
         * 栗子 retryTime = 5
         * 第一次重试延时 5 = 0 + 1 * 5
         * 第二次重试延时 15 = 5 + 2 * 5
         * 第三次重试延时  30 = 15 + 3 * 5
         * 第四次重试延时  50 = 30 + 4 * 5
         * ... 第10次重试延时  225
         * ... 第20次重试延时  950
         * ... 第30次重试延时  2175
         * ... 第40次重试延时  3900
         * ... 第50次重试延时  6125
         * @return
         */
        public static RetryTimeTypeable AdvanceStepTimeRetryTimeTypeator() {
            return new AdvanceStepTimeRetryTimeTypeator();
        }
    
        /**
         * 固定时间
         * 栗子 retryTime = 5
         * 第一次重试延时 5 第二次重试延时 5 第三次重试延时 5
         *
         * @return
         */
        public static RetryTimeTypeable FixDelayedRetryTimeTypeator() {
            return new FixDelayedRetryTimeTypeator();
        }
    
        /**
         * 固定步长
         * 栗子 retryTime = 5
         * 第一次重试延时 5 第二次重试延时 10 第三次重试延时 15
         *
         * @return
         */
        public static RetryTimeTypeable FixStepTimeRetryTimeTypeator() {
            return new FixStepTimeRetryTimeTypeator();
        }
    
        /**
         * 斐波那契数列 建议不要超过30
         * CurrExecuteNum = 10  return 55
         * CurrExecuteNum = 20  return 6765
         * CurrExecuteNum = 30  return 832040
         * CurrExecuteNum = 40  return 102334155
         * .....
         * @return
         */
        public static RetryTimeTypeable FibonacciSeriesRetryTimeTypeator() {
            return new FibonacciSeriesRetryTimeTypeator();
        }
    
        private static class AdvanceStepTimeRetryTimeTypeator implements RetryTimeTypeable {
    
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return (task.getCurrExecuteNum() == 1 ? 0 : task.getLastTime()) + task.getCurrExecuteNum() * task.getRetryTime();
            }
        }
    
        private static class FixDelayedRetryTimeTypeator implements RetryTimeTypeable {
    
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return task.getRetryTime();
            }
        }
    
        private static class FixStepTimeRetryTimeTypeator implements RetryTimeTypeable {
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return task.getCurrExecuteNum() * task.getRetryTime();
            }
        }
    
        private static class FibonacciSeriesRetryTimeTypeator implements RetryTimeTypeable {
            @Override
            public long getTime(GeneralDelayedQueue task) {
                int a = 0, b = 1, sum;
                int n = task.getCurrExecuteNum();
                for (int i = 0; i < n; i++) {
                    sum = a + b;
                    a = b;
                    b = sum;
                }
                return a;
            }
        }
    
    //    public static void main(String[] args) {
    //        AdvanceStepTimeRetryTimeTypeator retryTimeTypeator = new AdvanceStepTimeRetryTimeTypeator();
    //        GeneralDelayedQueue delayedQueue = new GeneralDelayedQueue(
    //                UUID.randomUUID().toString(),
    //                null,
    //                8, 0, 150,TimeUnit.MILLISECONDS);
    //        for (int i = 0; i < 8; i++) {
    //            delayedQueue.setCurrExecuteNum(i);
    //            long time = retryTimeTypeator.getTime(delayedQueue);
    //            delayedQueue.setLastTime(time);
    //            System.out.println(time);
    //        }
    //    }
    }
    

    2.5  延时队列执行器(主要)

    延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;
    import com.b0c0.common.domain.vo.GeneralResultCodeEnum;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.logging.Logger;
    
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列执行器
     * @author: lidongsheng
     * @createData: 2020-09-21 15:50
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 15:50
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    /**
     * 延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。
     * 并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。
     */
    public class GeneralDelayedQueueExecute {
    
        private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecute.class.getName());
    
        //延时队列
        private static Map<String, DelayQueue<GeneralDelayedQueue>> delayQueueMap = new ConcurrentHashMap<>();
        //延时队列主题信息
        private static Map<String, GeneralDelayedQueue> taskMap = new ConcurrentHashMap<>();
        //重试时间的具体实现
        private static Map<String, RetryTimeTypeable> retryTimeTypeableMap = new ConcurrentHashMap<>();
        //用来保证程序执行完成之后才能获取到执行结果
        private static Map<String, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
        //存储每次执行的具体结果信息
        private static Map<String, List> resultListMap = new ConcurrentHashMap<>();
        //异步执行的线程池
        private static ExecutorService executor;
    
        static {
            final ThreadFactory GENERAL_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("com.b0c0.commom.delayedQueue-pool-general-%d").build();
            //核心线程池大小
            final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() << 1 + 1;
            //最大线程池大小
            final int MAX_POOL_SIZE = CORE_POOL_SIZE << 1;
            //线程任务队列大小
            final int QUEUE_CAPACITY = 500;
            //空闲线程的存活时间.默认情况下核心线程不会退出
            final int KEEP_ALIVE_TIME = 15;
            executor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(QUEUE_CAPACITY), GENERAL_THREAD_FACTORY, new ThreadPoolExecutor.DiscardOldestPolicy());
        }
    
        /**
         * 用户可自定义异步执行时候的线程池
         *
         * @param executor
         */
        public static void setExecutor(ExecutorService executor) {
            GeneralDelayedQueueExecute.executor = executor;
        }
    
        /**
         * 执行方法
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         */
        public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
            DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
            initTask(task, retryTimeTypeator, queue);
            return execute(task);
        }
    
        public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task) {
            return run(task, DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
        }
    
        /**
         * 异步执行方法,可以单独为某任务根据传入一个线程池执行
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         * @param executor          用户自定义线程池
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, ExecutorService executor) {
            return executor.submit(() -> run(task, retryTimeTypeator));
        }
    
        /**
         * 异步执行方法 默认内置线程池
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
            return executor.submit(() -> run(task, retryTimeTypeator));
        }
    
        /**
         * 异步执行方法  默认内置线程池
         *
         * @param task 具体任务
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task) {
            return executor.submit(() -> run(task));
        }
    
        /**
         * 任务链的执行方法 自定义顺序完成(流水线完成任务) 例如A -> B -> C
         * 并且任务的执行结果会自动传递给下一任务。比如A任务的执行结果,会传递给B任务。
         * 注意:
         * 此方法返回的为流水线最后一个任务的值,若想在最后得到某个任务或者所有任务的具体执行结果,须将 GeneralDelayedQueue.keepResults,设置为true;
         *
         * @param tasks              具体任务list集合,会按照集合的添加顺序来流水线顺序执行任务
         * @param retryTimeTypeators 重试延时时间策略
         * @param <T>
         * @return 任务链执行返回值为:返回的为最后一个运行任务的返回值。
         */
        public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {
    
            if (tasks == null || tasks.isEmpty() || retryTimeTypeators == null || retryTimeTypeators.isEmpty()) {
                return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合不能为空");
            }
            int taskSize = tasks.size();
            if (taskSize != retryTimeTypeators.size()) {
                return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合大小不一致,无法相互对应");
            }
            DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
            GeneralResultVo<T> resultVo = GeneralResultVo.fail();
            for (int i = 0; i < taskSize; i++) {
                initTask(tasks.get(i), retryTimeTypeators.get(i), queue);
                resultVo = execute(tasks.get(i));
                if (resultVo.isSuccess()) {
                    if (i != 0 && i != taskSize - 1) {
                        tasks.get(i + 1).getBodyData().setPreResult(resultVo.getReslutData());
                    }
                } else {
                    break;
                }
            }
            return resultVo;
        }
    
        public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks) {
            int taskSize = tasks.size();
            List<RetryTimeTypeable> retryTimeTypeators = new ArrayList<>();
            for (int i = 0; i < taskSize; i++) {
                retryTimeTypeators.add(DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
            }
            return runLine(tasks, retryTimeTypeators);
        }
    
        /**
         * 异步执行任务链方法,可以单独为某任务根据传入一个线程池执行
         *
         * @param tasks              具体任务
         * @param retryTimeTypeators 重试延时时间策略
         * @param executor           用户自定义线程池
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators, ExecutorService executor) {
            return executor.submit(() -> runLine(tasks, retryTimeTypeators));
        }
    
        /**
         * 异步执行任务链方法 默认内置线程池
         *
         * @param tasks              具体任务
         * @param retryTimeTypeators 重试延时时间策略
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {
            return executor.submit(() -> runLine(tasks, retryTimeTypeators));
        }
    
        /**
         * 异步执行任务链方法  默认内置线程池
         *
         * @param tasks 具体任务
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks) {
            return executor.submit(() -> runLine(tasks));
        }
    
        private static <T> GeneralResultVo<T> execute(GeneralDelayedQueue task) {
    
            GeneralResultVo<T> result = GeneralResultVo.fail();
            String id = task.getId();
            RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(id);
            List<GeneralResultVo<T>> resultList = resultListMap.get(id);
            CountDownLatch countDownLatch = countDownLatchMap.get(id);
            DelayQueue<GeneralDelayedQueue> queue = delayQueueMap.get(id);
            try {
                result = task.getConsumerable().run(queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                task.setLastTime(retryTimeTypeator.getTime(task));
                //添加执行结果
                resultList.add(result);
                countDownLatch.countDown();
                //延时执行
                if (task.getCurrExecuteNum() < task.getMaxExecuteNum() - 1 && !result.isSuccess()) {
                    task.setCurrExecuteNum(task.getCurrExecuteNum() + 1);
                    setExpireTime(task);
                    queue.offer(task);
                    result = execute(task);
                } else {
                    while ((countDownLatch.getCount()) > 0) {
                        countDownLatch.countDown();
                    }
                }
                if (!task.isKeepResults()) {
                    clearTask(task.getId());
                }
            }
            return result;
        }
    
        private static void setExpireTime(GeneralDelayedQueue task) {
            long expireTime = 0;
            RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(task.getId());
            if (task.getCurrExecuteNum() == 0) {
                expireTime = TimeUnit.NANOSECONDS.convert(
                        task.getDelayedTime(), task.getTimeUnit()) + System.nanoTime();
            } else {
                expireTime = TimeUnit.NANOSECONDS.convert(
                        retryTimeTypeator.getTime(task), task.getTimeUnit()) + System.nanoTime();
            }
            task.setExpireTime(expireTime);
        }
    
        /**
         * 得到全部的执行结果
         *
         * @param taskId     任务id标识
         * @param fastReturn 立即返回 true 代表立即返回, false 代表必须等到最大执行次数后返回(list.size = maxExecuteNum)
         * @param outTime    超时时间
         * @param timeUnit   时间单位
         * @return 执行结果列表
         */
        public static <T> List<GeneralResultVo<T>> getResultList(String taskId, boolean fastReturn, long outTime, TimeUnit timeUnit) {
            try {
                if (taskMap.containsKey(taskId)) {
                    if (!fastReturn) {
                        awaitCountDown(taskId, outTime, timeUnit);
                    }
                    return resultListMap.get(taskId);
                } else {
                    GeneralResultVo generalResultVo = GeneralResultVo.fail(
                            GeneralResultCodeEnum.TASK_EXIST.getCode(), GeneralResultCodeEnum.TASK_EXIST.getDesc());
                    List<GeneralResultVo<T>> res = new ArrayList<>();
                    res.add(generalResultVo);
                    return res;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 等待任务全部完成
         *
         * @param id       任务id
         * @param timeOut  等待超时时间
         * @param timeUnit 时间单位
         * @return countDownLatch当前计数值
         * @throws InterruptedException
         */
        private static long awaitCountDown(String id, Long timeOut, TimeUnit timeUnit) throws InterruptedException {
            CountDownLatch countDownLatch = countDownLatchMap.get(id);
            if (timeOut == null) {
                countDownLatch.await();
            } else {
                countDownLatch.await(timeOut, timeUnit);
            }
            return countDownLatch.getCount();
        }
    
        /**
         * 根据任务初始化任务信息
         *
         * @param task
         * @param retryTimeTypeator
         */
        private static void initTask(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, DelayQueue<GeneralDelayedQueue> queue) {
            String id = task.getId();
            delayQueueMap.put(id, queue);
            taskMap.put(id, task);
            retryTimeTypeableMap.put(id, retryTimeTypeator);
            CountDownLatch countDownLatch = new CountDownLatch(task.getMaxExecuteNum());
            countDownLatchMap.put(id, countDownLatch);
            setExpireTime(task);
            resultListMap.put(id, new ArrayList<>());
            queue.offer(task);
        }
    
    
        /**
         * 根据任务id清除任务全部的map信息
         * GeneralDelayedQueue的keepResults如果设置为true了,请务必手动调用此方法进行删除。否则任务相关信息将一直存在于内存中
         *
         * @param taskId 任务id
         */
        public static void clearTask(String taskId) {
            CountDownLatch countDownLatch = countDownLatchMap.get(taskId);
            while (countDownLatch != null && countDownLatch.getCount() > 0) {
                countDownLatch.countDown();
            }
            delayQueueMap.remove(taskId);
            taskMap.remove(taskId);
            countDownLatchMap.remove(taskId);
            taskMap.remove(taskId);
            resultListMap.remove(taskId);
            retryTimeTypeableMap.remove(taskId);
        }
    }
    

    关于更多com.b00c.common 依赖的源码详情、使用方法、更新历史请去下面这个github仓库查看。

    GitHub - DeBug-Bug/common

    展开全文
  • Dubbo配置——重试次数

    千次阅读 2019-01-31 14:23:35
    重试次数 原因:当我们某一个服务,由于各种原因,比如:网络不佳,服务运行缓慢等,导致超时,远程方法调用失败,我们可以通过调整重试次数,让它多试上几次。 重试次数是一个整数,不包含第一次调用,0 代表不...

    重试次数

    • 原因:当我们某一个服务,由于各种原因,比如:网络不佳,服务运行缓慢等,导致超时,远程方法调用失败,我们可以通过调整重试次数,让它多试上几次。
    • 重试次数是一个整数,不包含第一次调用,0 代表不重试

     

    • 在服务方设置重试次数
    • 适用范围:
      • 幂等(设置重试次数)【无论重试多少次,产生的效果都是一样的,例如;删除、查询、修改】
      • 非幂等(不能设置查询次数)【每一次运行都会产生新的效果,例如:新增】
    展开全文
  • RocketMQ(四)——消息重试

    万次阅读 2017-08-17 16:11:39
    对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试


    对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。

    一、 Producer端重试

    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。
    这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

    /**
     * Producer,发送消息
     */
    public class Producer {
    	public static void main(String[] args) throws MQClientException, InterruptedException {
    		DefaultMQProducer producer = new DefaultMQProducer("group_name");
    		producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
    		producer.setRetryTimesWhenSendFailed(3);
    		producer.start();
    
    		for (int i = 0; i < 100; i++) {
    			try {
    				Message msg = new Message("TopicTest", 				// topic
    						"TagA", 									// tag
    						("HelloWorld - RocketMQ" + i).getBytes()	// body
    				);
    				SendResult sendResult = producer.send(msg, 1000);
    				System.out.println(sendResult);
    			} catch (Exception e) {
    				e.printStackTrace();
    				Thread.sleep(1000);
    			}
    		}
    
    		producer.shutdown();
    	}
    }
    
    *生产者端失败重试*

    上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。

    producer.setRetryTimesWhenSendFailed(3); //失败的情况重发3次
    producer.send(msg, 1000); //消息在1S内没有发送成功,就会重试


    二、 Consumer端重试

    消费者端的失败,分为2种情况,一个是exception,一个是timeout。

    1. Exception

    消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。
    这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

    public enum ConsumeConcurrentlyStatus {
        /**
         * Success consumption
         */
        CONSUME_SUCCESS,
        /**
         * Failure consumption,later try to consume
         */
        RECONSUME_LATER;
    }
    
    *ConsumeConcurrentlyStatus枚举的源码*

    通过查看源码,消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)
    RECONSUME_LATER的策略

    RECONSUME_LATER的策略

    在启动broker的过程中,可以观察到上图日志,你会发现RECONSUME_LATER的策略:如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,…直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了!
    RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢?
    看一段代码:

    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
    	public static void main(String[] args) throws InterruptedException, MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
    		consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe("TopicTest", "*");
    
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				try {
    					MessageExt msg = msgs.get(0);
    					String msgbody = new String(msg.getBody(), "utf-8");
    					System.out.println(msgbody + " Receive New Messages: " + msgs);
    					if (msgbody.equals("HelloWorld - RocketMQ4")) {
    						System.out.println("======错误=======");
    						int a = 1 / 0;
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    					if (msgs.get(0).getReconsumeTimes() == 3) {
    						// 该条消息可以存储到DB或者LOG日志中,或其他处理方式
    						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
    					} else {
    						return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
    					}
    				}
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    
    		consumer.start();
    		System.out.println("Consumer Started.");
    	}
    }
    
    RECONSUME_LATER的重试测试代码

    生产端发送了10条消息,看一下消费端的运行效果:
    RECONSUME_LATER的重试效果

    RECONSUME_LATER的重试效果

    观察上图发现,HelloWorld - RocketMQ4的消息的***reconsumeTimes***属性值发生了变化,其实这个属性就代表了消息重试的次数!因此我们可以通过reconsumeTimes属性,让MQ超过了多少次之后让他不再重试,而是记录日志等处理,也就是上面代码catch中的内容。

    2. Timeout

    比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker)
    延续Exception的思路,也就是消费端没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或return ConsumeConcurrentlyStatus.RECONSUME_LATER,这样的就认为没有到达Consumer端。
    下面进行模拟:

    1)消费端有consumer1和consumer2这样一个集群。
    2)consumer1端的业务代码中暂停1分钟并且不发送接收状态给RocketMQ。

    public class Consumer1 {
    
    	public static void main(String[] args) throws InterruptedException, MQClientException {
    		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
    		consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
    		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    		consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");
    
    		consumer.registerMessageListener(new MessageListenerConcurrently() {
    			@Override
    			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
    				try {					
    						String topic = msg.getTopic();
    						String msgBody = new String(msg.getBody(),"utf-8");
    						String tags = msg.getTags();
    						System.out.println("收到消息:" + " topic:" + topic + " ,tags:" + tags + " ,msg:" + msgBody);
    						
    						// 表示业务处理时间
    						System.out.println("=========开始暂停==========");
    						Thread.sleep(60000);
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    					return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
    				}
    				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    			}
    		});
    
    		consumer.start();
    		System.out.println("Consumer Started.");
    	}
    }
    
    Consumer1端Timeout异常测试代码

    3)启动consumer1和consumer2。
    4)启动Producer,只发送一条数据。
    看一下此时consumer1和consumer2的运行结果:
    consumer1

    Consumer1

    Consumer2-未接收到消息

    Consumer2-未接收到消息

    发现consumer1接收到消息并且暂停,consumer2未接收到消息。

    5)关闭consumer1。
    观察consumer2的运行结果:
    Consumer2-接收到消息

    Consumer2-接收到消息

    总结

    Producer端没什么好说的,Consumer端值得注意。对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

    展开全文
  • 在一个微服务系统里,服务调用(同步或异步)失败处理是一个非常重要的一个环节,而失败重试又是一个常用的处理模式;本文从消息中间件的角度来分析下 AWS 上常见的消息服务的重试和失败处理策略。...

    在一个微服务系统里,服务调用(同步或异步)失败处理是一个非常重要的一个环节,而失败重试又是一个常用的处理模式;本文从消息中间件的角度来分析下 AWS 上常见的消息服务的重试和失败处理策略。

    在这里插入图片描述

    在一个微服务系统里,服务调用(同步或异步)失败处理是一个非常重要的一个环节,而这些失败场景里,通信异常处理又是比较常见的问题;如我在微服务架构开发和平台演进篇 中所总结的通信异常处理常见手段:

    在这里插入图片描述

    微服务之间利用消息系统进行解耦和异步通信是一个常见的事件驱动模式,所以本篇我们来总结下,AWS 消息服务提供了哪些自动重试的机制和失败处理策略;本文涉及的服务包括 Amazon SNS, Amazon SQS 及 Amazon EventBridge。

    Amazon SQS

    Amazon Simple Queue Service (SQS) 是一种完全托管的轻量级消息队列服务,可让您分离和扩展微服务、分布式系统和无服务器应用程序。在 SQS 中有个特殊的消息队列死信队列(Dead Letter Queue)来保存正常消息队列中未被成功处理的消息。通常可以设定标准队列里面的消息被读取 N 次但没有被删除就认为未处理成功,这样的消息会自动被转存到死信队列。

    Amazon SNS - SQS 为订阅者模式

    在这里插入图片描述

    假定由于网络等原因后端 SQS 队列不可用,SNS 支持自动重试,比如说下图的第一个SQS Queue (Q1)突然不可用,那对于 SNS 服务而言,会尝试如下重试策略:(1)立刻重试10次(2)每隔20秒重试一次,总计不超过10,000次 (可最多持续23天)

    Amazon SNS - Lambda 函数 为订阅者模式

    在这里插入图片描述

    对于 Lambda 函数,如果订阅消息的 Lambda 函数不可用, 重试策略是(1)每隔1秒钟,重试一次,总共2次(2)接着随机在1分钟~20分钟选一个时间间隔重试不超过20次(3)每隔20分钟再重试38次;总共50次尝试,持续最长12个小时的重试尝试。

    Amazon SNS - HTTP/S 为订阅者模式

    对于 HTTP/S 订阅,首先SNS 如何定义一个失败的消息传递:(1)HTTP status codes 100 to 101 and 500 to 599 (inclusive).(2)A request timeout (15 seconds).(3)Any connection error such as connection timeout, endpoint unreachable, bad SSL certificate, etc.

    对于这样的订阅,Amazon SNS 支持自定义的重试策略(delivery policies)最多100次重试,消息在系统保留时间最多1个小时。重试策略会横跨4个阶段(1)即刻重试(无延迟阶段)(2)前退避阶段,使用最短延迟重试指定重试次数(3)退避阶段,设置最短延迟和最长延迟时间,利用重试退避功能指定从最短延迟到最长延迟之间增长有多快(4)后退避阶段,使用最长延迟时间重试操作。默认重试3次,每个20秒一次。

    在这里插入图片描述在这里插入图片描述

    重试退避功能用于计算延迟的算法,在退避阶段,从第一次到最后一次重试的延迟时间计算方法(1)线性的(2)算术的(3)几何的(4)指数的:

    在这里插入图片描述

    Amazon EventBridge

    Amazon EventBridge 是一种无服务器事件总线,支持您使用自己的应用程序、软件即服务 (SaaS) 应用程序和 AWS 服务的数据轻松将应用程序连接到一起。EventBridge 提供来自事件源(例如 Zendesk、Datadog 或 Pagerduty)的实时数据流,并将该数据路由到 AWS Lambda 之类的目标。您可以设置路由规则来确定发送数据的目的地,以便构建能够实时响应所有数据源的应用程序架构。EventBridge 让事件驱动型应用程序的构建变得简单,因为它可以为您完成事件摄取和传送、安全保障、授权以及错误处理工作。

    该服务的重试策略如下:

    支持失败自动重试,最长事件消息保留24小时左右默认重试策略,利用指数退避算法,最多重试次数在185次左右

    小结

    在这里插入图片描述

    在这里插入图片描述


    本文首发于 GitChat,未经授权不得转载,转载需与 GitChat 联系。

    阅读全文: http://gitbook.cn/gitchat/activity/5d5bfacc1d99df205cf5c0cb

    您还可以下载 CSDN 旗下精品原创内容社区 GitChat App ,阅读更多 GitChat 专享技术内容哦。

    FtooAtPSkEJwnW-9xkCLqSTRpBKX

    展开全文
  • Dubbo重试次数

    2019-02-10 12:40:00
     *我们应该在幂等方法上设置重试次数【查询、删除、修改】,在非幂等方法上禁止设置重试次数。  ★幂等:指多次运行方法所产生的最终效果是一致的   1 <!--3、声明需要调用的远程服务接口,生成远程服务...
  • RocketMq重试及消息不丢失机制

    千次阅读 2019-05-27 15:57:49
    1、消息重试机制 由于MQ经常处于复杂的分布式系统中,考虑网络波动、服务宕机、程序异常因素,很有可能出现消息发送或者消费失败的问题。因此,消息的重试就是所有MQ中间件必须考虑到的一个关键点。如果没有消息重试...
  • rabbimq消费者实现异常重试机制

    千次阅读 2018-07-05 10:37:31
    功能描述异常重试指的是当消费者处理消息异常失败时,为保证数据最终一致性,通过设置重试策略来对消息进行重复再消费。对于重试策略我们指定延迟多长时间重试一次,重试多少次,以及时间单位等。策略描述原理:利用...
  • 如果消费者处理消息失败后不重试,然后发送应答给rabbitmq,rabbitmq就会将队列中的消息删除,从而造成消息的丢失。所以我们要在消费者处理消息失败的时候,重试一定的次数。比如重试3次,如果重试3次之后还是失败...
  • 事实上,有时候某些测试需要依赖于组件,不可能 100% 的可靠,Flaky 不会删除这些测试,或者标记 @skip ,而是会自动的重试测试。 想任何的 nose 插件一样,Flaky 可以通过命令行执行: nosetests --with-flaky 把...
  • TCP重试机制与运用实例

    千次阅读 2018-02-05 21:21:31
    TCP重试机制与运用实例 second60 20180205 1. TCP重试机制简介   在TCP三次握手的过程中,当客户端发送SYN分节之后,如果没有收到服务端返回的确认ACK,那么TCP会在6s后继续发送SYN分节,一直没收到,25s继续...
  • grpc 开发进阶 - 失败重试

    万次阅读 2020-05-14 18:41:54
    RPC调用失败情况分析 RPC 调用失败可以分为三种情况: RPC 请求还没有离开客户端 ...因为这两种情况,服务端的逻辑并没有开始处理请求,所以始终可以重试,也被称为透明重试(transparent retries) 对于第一
  • RocketMQ-重试队列

    千次阅读 2019-08-21 16:31:07
    什么场景下使用重试队列 消费端一直不回传消费结果,MQ认为消息没有收到,Consumer下一次拉取,Broker依然会发送该消息,所以,任何异常都要返回ConsumeConcurrentlyStatus.RECONSUME_LATER,这样MQ会将消息放到...
  • RabbitMQ重试机制

    千次阅读 2021-01-23 13:23:31
    1、RabbitMQ重试机制的简介 RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间...
  • 首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例。 feign重试 feign的重试机制默认是关闭的,源码如下 //FeignClientsConfiguration.java @Bean...
  • Zuul ribbon 重试失效分析

    千次阅读 2019-06-05 21:12:59
    Zuul ribbon 重试机制 问题描述: Zuul转发POST请求接口异常,read timeout,没有进行重试,期望进行重试! 配置参数模拟 spring.cloud.loadbalancer.retry.enabled=true ribbon.ConnectTimeout=5000 ribbon....
  • RocketMQ系列之消息重试+重复消费

    万次阅读 2020-06-04 13:18:48
    上节我们介绍了RMQ的几大模块以及每个模块的作用,前面也介绍了RMQ的订阅消费案例,今天我们来看一个RMQ的消息重试机制和重复消费的问题,了解这2点有助于我们更好更合理的处理消息消费的异常情况。 为什么会...
  • 1,消费者代码: package com.ryfchina.ipay.regionalpay.mq; import ... import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.client...
  • 手动删除oracle数据库DBF文件时,将无法正常启动数据库, 可以尝试将删除的DBF数据文件进行离线后重试: 1.alter database open 命令可以提示数据文件序号 2.alter database datafile 10 offline drop 命令可以根据...
  • 1.重试机制 ribbon 1.1 解释:当一次服务调用失败后,不会立即抛出异常,而是再次重试另一个服务。 1.2 实现步骤: 在服务调用端配置文件中开启重试机制 spring: cloud: loadbalancer: retry: enabled: true # ...
  • 网速很慢,能上qq,不能看视频,...6.实在不行就“还原系统”或“装系统”! 7.用360卫士修复一下主页:360卫士打开后--高级--修复IE 8.打开360浏览器,“工具”,“360安全浏览器选项”,改下主页
  • 说的文件夹中的文件已经在另外一个程序中打开,请关闭该文件夹,重试。其实这样我们也不知道到底哪里出现问题了,那个文件或文件夹被什么程序占用,此时都不知道。下面给出解决方法。 2、解决方法 (1)打开任务...
  • 要在单个测试/套件上启用重试,请删除Cypress.currentTest使用, Cypress.currentTest采用测试配置替代,例如: // on a single test it ( 'test title' , { retries : 2 } , ( ) => { ... } ) // or on a suite ...
  • 它们一般的流程都是:每隔一段时间,去数据库获取有效的任务,然后执行,执行完成之后,删除任务或者将任务设置为失效。 那么这就可能存在一个潜在的风险:“雪崩效应”。 试想一下如下场景:我有个定时任务,每隔...
  • 先了解Redis 有序集合(sorted set) Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。...集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 23...
  • 订单退款与退款失败任务重试

    千次阅读 2017-05-03 20:14:18
    订单退款与退款失败任务重试
  • 重试功能 gateway 本身是支持重试的(retry ), 只有简单请求会被重试(get) 只会对连接超时进行重试,响应超时不会进行重试 比如: 前一种可能是网络不通 后一种是连接已经建立,接口已经调用到,但是业务...
  • 在Windows系统电脑下,使用移动硬盘或者U盘复制拷贝文件的时候,...请运行chkdsk并重试。”,如下图所示: 不要慌,按照以下步骤,立马解决上述问题。具体步骤如下所示: 1、看到上述的问题提示之后,在Windows...
  • Spring boot 重试机制用法与实现

    千次阅读 2019-08-22 20:13:21
    在调用第三方接口或者使用mq时,会出现网络抖动,连接超时等网络异常,所以需要重试。为了使处理更加健壮并且不太容易出现故障,后续的尝试操作,有时候会帮助失败的操作最后执行成功。例如,由于网络故障或数据库...
  • springboot Rabbit死信队列实现,rocketMq重试消息实现 基于springboot2.15版本,最新rabbit和rocktMq 中间件实例,亲测可用

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 284,729
精华内容 113,891
关键字:

删除重试