精华内容
下载资源
问答
  • 删除重试
    千次阅读
    2020-10-10 14:57:42

    最初的业务场景就是:

    需要需要使用restTemplate调用个接口并且调用失败后需要延时重复调用(最多3次),第一次5秒,第二次10秒,第三次15秒。

    1. 主要功能

    最起初的话思考如果只是简单这样的话其实也好实现,重写httpClient里面就有相关的超时重试机制,但是如果要是实现了某个整体的方法来进行失败重试那不能更好。所以就根据这个想法实现了以下的几个功能:

    1. 可以更简单的针对整个方法来进行延时或者有失败重试的调用执行。
    2. 可以设置首次是否延迟执行以及延迟执行的时间。
    3. 可以设置失败重试次数以及开发者并可自定义配置重试延时时间策略(默认四种:渐进步长、固定时间、固定步长、斐波那契数列)。
    4. 支持查看每次执行结果(包括失败重试的执行结果)。
    5. 执行器统一管理所有任务。
    6. 支持任务自定义顺序完成(流水线完成任务) 例如1 -> 2 -> 3,以及支持查看流水线任务中每个任务的执行结果。

    1.1 使用方式:

    最新版本:1.0.2  这个我已经打包发布至maven中央仓库中,大家可以直接在项目的pom文件中添加下面依赖即可,并且在此com.b0c0.common依赖中还正在不断完善,现在是只有这个延时队列和一个通用的方法日志拦截输出:

    <dependency>
        <groupId>com.b0c0</groupId>
        <artifactId>common</artifactId>
        <version>1.0.2</version>
    </dependency>
    

    1.2 使用示例

    package com.b0c0.common.delayedQueue;
    
    import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.logging.Logger;
    
    public class GeneralDelayedQueueExecuteTest {
    
        private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecuteTest.class.getName());
    
        public static void main(String[] args) {
            GeneralDelayedQueueExecuteTest test = new GeneralDelayedQueueExecuteTest();
    //        test.run();
    //        try {
    //            test.runAsync();
    //        } catch (ExecutionException e) {
    //            e.printStackTrace();
    //        } catch (InterruptedException e) {
    //            e.printStackTrace();
    //        }
            test.runLine();
        }
    
    
        /**
         * 同步执行示例
         */
        public void run() {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 1, 500, 50);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer1(), "2", "body", 3, 100, 100);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 150, 150);
    
            GeneralDelayedQueueExecute.run(delayedQueue1);
            GeneralDelayedQueueExecute.run(delayedQueue2);
            GeneralDelayedQueueExecute.run(delayedQueue3);
        }
    
        /**
         * 异步执行示例
         * @throws ExecutionException
         * @throws InterruptedException
         */
        public void runAsync() throws ExecutionException, InterruptedException {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 5, 500, 5);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer1(), "2", "body", 3, 10000, 2);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 1500, 300);
            Future<GeneralResultVo<String>> future1 = GeneralDelayedQueueExecute.runAsync(delayedQueue1);
            Future<GeneralResultVo<String>> future2 = GeneralDelayedQueueExecute.runAsync(delayedQueue2);
            Future<GeneralResultVo<String>> future3 = GeneralDelayedQueueExecute.runAsync(delayedQueue3);
            System.out.println("time ->" + System.currentTimeMillis() + " future1:" + future1.get().getReslutData());
            System.out.println("time ->" + System.currentTimeMillis() + " future2:" + future2.get().getReslutData());
            System.out.println("time ->" + System.currentTimeMillis() + " future3:" + future3.get().getReslutData());
    
        }
    
        /**
         * 流水线执行示例
         */
        public void runLine() {
            GeneralDelayedQueue delayedQueue1 = new GeneralDelayedQueue(
                    new TestConsumer1(), "1", "body", 2, 500, 5);
            GeneralDelayedQueue delayedQueue2 = new GeneralDelayedQueue(
                    new TestConsumer2(), "2", "body", 3, 10, 2);
            GeneralDelayedQueue delayedQueue3 = new GeneralDelayedQueue(
                    new TestConsumer1(), "3", "body", 3, 600, 100);
            List<GeneralDelayedQueue> list = new ArrayList<>();
            list.add(delayedQueue1);
            list.add(delayedQueue2);
            list.add(delayedQueue3);
            GeneralResultVo<String> a = GeneralDelayedQueueExecute.runLine(list);
            TestVo t = (TestVo) delayedQueue3.getBodyData().getPreResult();
            System.out.println(t.getA());
            System.out.println(a.getReslutData());
    
        }
    
    
        static class TestConsumer1 implements GeneralQueueConsumerable {
    
    
            @Override
            public GeneralResultVo<String> run(GeneralDelayedQueue task) {
                GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
                String body = resultVo.getBody();
                String id = task.getId();
                int currExecuteNum = task.getCurrExecuteNum();
                logger.info("thread ->" + Thread.currentThread().getId() + " time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
                if (task.getId().equals("3")) {
                    return GeneralResultVo.fail();
                } else {
                    return GeneralResultVo.success("sss");
                }
            }
        }
    
        static class TestConsumer2 implements GeneralQueueConsumerable {
    
            @Override
            public GeneralResultVo<TestVo> run(GeneralDelayedQueue task) {
                GeneralDelayedQueue.BodyData<String,String> resultVo = task.getBodyData();
                String body = resultVo.getBody();
                String id = task.getId();
                int currExecuteNum = task.getCurrExecuteNum();
                logger.info("thread ->" + Thread.currentThread().getId() + "time ->" + System.currentTimeMillis() + " 消费延时队列 id -> " + id + " ,第 -> " + (currExecuteNum + 1) + " 次,body -> " + body);
                TestVo testVo = new TestVo();
                testVo.setA("a");
                testVo.setB("b");
                return GeneralResultVo.success(testVo);
            }
        }
    
        public static class TestVo {
            private String a;
            private String b;
    
            public String getA() {
                return a;
            }
    
            public void setA(String a) {
                this.a = a;
            }
    
            public String getB() {
                return b;
            }
    
            public void setB(String b) {
                this.b = b;
            }
        }
    
    
    }

    2. 具体实现代码

    具体实现的话是根据Delayed延时队列基础之上来实现的。关于怎么Delayed的使用在这里也就不多说了,大家自行百度学习。

    2.1.延时队列实体类:

    首先我们要是实现这个的话既然要根据Delayed延时队列机制来实现,我们就首先实现一个Delayed接口的基础延时队列实体,里面来保存我们的信息,例如延时时间、重试时间、执行次数、以及开发者根据业务需要自定义的主题内容等。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.GeneralQueueConsumerable;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列实体
     * @author: lidongsheng
     * @createData: 2020-09-21 14:01
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 14:01
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    public class GeneralDelayedQueue<T> implements Delayed {
    
    
        public static class BodyData<T,V>{
            /**
             * 开发者自定义的需要的数据体.
             */
            private T body;
    
            /**
             * 如果为任务链任务,上一个任务的执行结果会保存到这里,
             */
            private V preResult;
    
            public T getBody() {
                return body;
            }
    
            protected void setBody(T body) {
                this.body = body;
            }
    
            public V getPreResult() {
                return preResult;
            }
    
            protected void setPreResult(V preResult) {
                this.preResult = preResult;
            }
        }
    
        //任务的唯一id
        private String id;
        /**
         * 任务的自定义数据体
         */
        private BodyData bodyData;
        /**
         * 任务当前的执行次数(可设置此值为maxExecuteNum来达到强制中断之后的重试执行)
         *
         */
        private int currExecuteNum;
        /**
         * 最大执行次数
         * 此值为1 表示只执行一次,不开启重发
         * 此值大于1 表示开启重发,并且此值为最大执行次数(包含首次执行)。
         */
        private int maxExecuteNum;
    
        /**
         * 任务的首次执行的延时时间,只有任务的首次执行时会用到此值进行延时执行
         */
        private long delayedTime;
    
        /**
         * 任务的重发延时时间,重发自定义延时策略会用到此值
         */
        private long retryTime;
        /**
         * 任务的过期时间,任务到达了过期时间就会执行
         * 检测延迟任务是否到期
         */
        private long expireTime;
        /**
         * 上次的延时时间
         */
        private long lastTime = -1;
        /**
         * 时间单位
         */
        private TimeUnit timeUnit;
    
        /**
         * 执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
         * 注意:如果设置为true了,请务必手动调用GeneralDelayedQueueExecute.clearTask 进行删除。否则任务相关信息将一直存在于内存中
         */
        private boolean keepResults;
    
        private GeneralQueueConsumerable consumerable;
    
        public String getId() {
            return id;
        }
    
        public <T,V>BodyData<T,V> getBodyData() {
            return bodyData;
        }
    
        public static<T,V> BodyData<T,V> initBodyData(T userData) {
            BodyData<T,V> bodyData= new BodyData<>();
            bodyData.setBody(userData);
            return bodyData;
        }
    
        public int getCurrExecuteNum() {
            return currExecuteNum;
        }
    
        public int getMaxExecuteNum() {
            return maxExecuteNum;
        }
    
        public long getDelayedTime() {
            return delayedTime;
        }
    
        public long getRetryTime() {
            return retryTime;
        }
    
        public long getLastTime() {
            return lastTime;
        }
    
        protected void setLastTime(long lastTime) {
            this.lastTime = lastTime;
        }
    
        protected void setCurrExecuteNum(int currExecuteNum) {
            this.currExecuteNum = currExecuteNum;
        }
    
        protected void setExpireTime(long expireTime) {
            this.expireTime = expireTime;
        }
    
        public TimeUnit getTimeUnit() {
            return timeUnit;
        }
    
        public GeneralQueueConsumerable getConsumerable() {
            return consumerable;
        }
    
        public void setConsumerable(GeneralQueueConsumerable consumerable) {
            this.consumerable = consumerable;
        }
    
        public boolean isKeepResults() {
            return keepResults;
        }
    
        /**
         * 完整参数的构造方法
         *
         * @param consumerable  具体任务方法
         * @param id            唯一标识
         * @param userData      主题内容
         * @param keepResults   true表示执行结果一直保存,可在执行器中随时获取,直至开发人员手动调用删除
         * @param maxExecuteNum 最大执行次数
         * @param delayedTime   首次执行延时时间
         * @param retryTime     重试延时时间
         * @param timeUnit      时间单位
         */
        public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData,boolean keepResults, int maxExecuteNum, long delayedTime, long retryTime,TimeUnit timeUnit) {
            this.consumerable = consumerable;
            this.id = id;
            this.bodyData = initBodyData(userData);
            this.keepResults = keepResults;
            this.currExecuteNum = 0;
            this.maxExecuteNum = maxExecuteNum;
            this.delayedTime = delayedTime;
            this.retryTime = retryTime;
            this.timeUnit = timeUnit;
    
        }
    
    
        /**
         * 构造方法 默认时间单位秒,自动捕获异常
         *
         * @param consumerable  具体任务方法
         * @param id            唯一标识
         * @param userData      主题内容
         * @param maxExecuteNum 最大执行次数
         * @param delayedTime   首次执行延时时间
         * @param retryTime     重试延时时间
         */
        public GeneralDelayedQueue(GeneralQueueConsumerable consumerable,String id, T userData, int maxExecuteNum, long delayedTime, long retryTime) {
            this(consumerable,id, userData,false, maxExecuteNum, delayedTime, retryTime, TimeUnit.MILLISECONDS);
        }
    
    
        @Override
        public int compareTo(Delayed delayed) {
            long result = this.getDelay(TimeUnit.NANOSECONDS)
                    - delayed.getDelay(TimeUnit.NANOSECONDS);
            if (result < 0) {
                return -1;
            } else if (result > 0) {
                return 1;
            } else {
                return 0;
            }
        }
    
    
        /**
         * 检测延迟任务是否到期
         * 如果返回的是负数则说明到期否则还没到期
         *
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expireTime - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    
    }
    

    2.2  延时队列消费者

    延时队列消费者,也就是开发者要进行执行的具体业务方法,开发者实现这个GeneralQueueConsumerable接口里面的run方法即可执行自己的业务逻辑,该接口如下所示:

    package com.b0c0.common.delayedQueue.base;
    
    
    import com.b0c0.common.delayedQueue.GeneralDelayedQueue;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列消费
     * @author: lidongsheng
     * @createData: 2020-09-21 15:01
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 15:01
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    
    /**
     * 开发者实现这个GeneralQueueConsumerable接口来实现重试
     */
    public interface GeneralQueueConsumerable {
    
        /**
         * 开发者要进行执行的具体业务方法,重写run方法即可执行自己的业务逻辑。
         * @param task 具体任务
         * @return 返回false根据重发的策略进行重发执行方法,如果设置了自定义的重发,则会根据重试机制进行重试,true表示执行结束。
         */
    
        <T>GeneralResultVo<T> run(GeneralDelayedQueue task);
    }
    

    2.3  自定义重试时间

    自定义的重试时间实现RetryTimeTypeable接口里面的getTime方法即可。该接口如下所示:

    package com.b0c0.common.delayedQueue.base;
    
    import com.b0c0.common.delayedQueue.GeneralDelayedQueue;
    
    /**
     * @program: springbootdemo
     * @description: 重试时间的type接口,自定义的重试时间实现此接口即可
     * @author: lidongsheng
     * @createData: 2020-09-25 19:07
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-25 19:07
     * @updateContent: 重试时间的type接口
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    public interface RetryTimeTypeable {
        /**
         * 返回延时时间
         * 开发者实现这个此接口重写里面的getTime方法即可根据具体需要进行定义重试机制延时时间。
         * @param task
         * @return 返回此次的延时时间,单位和GeneralDelayedQueue构造方法中设置的时间单位一致(默认为秒)。
         */
        long getTime(GeneralDelayedQueue task);
    }
    

    2.4  默认的四种重试时间实现

    1. 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
    2. 固定时间
    3. 固定步长
    4. 斐波那契数列

    这四种感觉是最常用的重试时间,已经默认集成,使用时可直接调用。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;
    
    import java.util.UUID;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @program: springbootdemo
     * @description: 默认的重试时间实现接口
     * @author: lidongsheng
     * @createData: 2020-09-25 19:41
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-25 19:41
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    public class DefaultRetryTimeTypeator {
    
    
        /**
         * 渐进步长 retryTime越大,重试延时时间的间隔时间就会越来越大
         * 栗子 retryTime = 5
         * 第一次重试延时 5 = 0 + 1 * 5
         * 第二次重试延时 15 = 5 + 2 * 5
         * 第三次重试延时  30 = 15 + 3 * 5
         * 第四次重试延时  50 = 30 + 4 * 5
         * ... 第10次重试延时  225
         * ... 第20次重试延时  950
         * ... 第30次重试延时  2175
         * ... 第40次重试延时  3900
         * ... 第50次重试延时  6125
         * @return
         */
        public static RetryTimeTypeable AdvanceStepTimeRetryTimeTypeator() {
            return new AdvanceStepTimeRetryTimeTypeator();
        }
    
        /**
         * 固定时间
         * 栗子 retryTime = 5
         * 第一次重试延时 5 第二次重试延时 5 第三次重试延时 5
         *
         * @return
         */
        public static RetryTimeTypeable FixDelayedRetryTimeTypeator() {
            return new FixDelayedRetryTimeTypeator();
        }
    
        /**
         * 固定步长
         * 栗子 retryTime = 5
         * 第一次重试延时 5 第二次重试延时 10 第三次重试延时 15
         *
         * @return
         */
        public static RetryTimeTypeable FixStepTimeRetryTimeTypeator() {
            return new FixStepTimeRetryTimeTypeator();
        }
    
        /**
         * 斐波那契数列 建议不要超过30
         * CurrExecuteNum = 10  return 55
         * CurrExecuteNum = 20  return 6765
         * CurrExecuteNum = 30  return 832040
         * CurrExecuteNum = 40  return 102334155
         * .....
         * @return
         */
        public static RetryTimeTypeable FibonacciSeriesRetryTimeTypeator() {
            return new FibonacciSeriesRetryTimeTypeator();
        }
    
        private static class AdvanceStepTimeRetryTimeTypeator implements RetryTimeTypeable {
    
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return (task.getCurrExecuteNum() == 1 ? 0 : task.getLastTime()) + task.getCurrExecuteNum() * task.getRetryTime();
            }
        }
    
        private static class FixDelayedRetryTimeTypeator implements RetryTimeTypeable {
    
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return task.getRetryTime();
            }
        }
    
        private static class FixStepTimeRetryTimeTypeator implements RetryTimeTypeable {
            @Override
            public long getTime(GeneralDelayedQueue task) {
                return task.getCurrExecuteNum() * task.getRetryTime();
            }
        }
    
        private static class FibonacciSeriesRetryTimeTypeator implements RetryTimeTypeable {
            @Override
            public long getTime(GeneralDelayedQueue task) {
                int a = 0, b = 1, sum;
                int n = task.getCurrExecuteNum();
                for (int i = 0; i < n; i++) {
                    sum = a + b;
                    a = b;
                    b = sum;
                }
                return a;
            }
        }
    
    //    public static void main(String[] args) {
    //        AdvanceStepTimeRetryTimeTypeator retryTimeTypeator = new AdvanceStepTimeRetryTimeTypeator();
    //        GeneralDelayedQueue delayedQueue = new GeneralDelayedQueue(
    //                UUID.randomUUID().toString(),
    //                null,
    //                8, 0, 150,TimeUnit.MILLISECONDS);
    //        for (int i = 0; i < 8; i++) {
    //            delayedQueue.setCurrExecuteNum(i);
    //            long time = retryTimeTypeator.getTime(delayedQueue);
    //            delayedQueue.setLastTime(time);
    //            System.out.println(time);
    //        }
    //    }
    }
    

    2.5  延时队列执行器(主要)

    延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。该类如下所示:

    package com.b0c0.common.delayedQueue;
    
    
    import com.b0c0.common.delayedQueue.base.RetryTimeTypeable;
    import com.b0c0.common.domain.vo.GeneralResultCodeEnum;
    import com.b0c0.common.domain.vo.GeneralResultVo;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    import java.util.logging.Logger;
    
    
    /**
     * @program: springbootdemo
     * @description: 通用延时队列执行器
     * @author: lidongsheng
     * @createData: 2020-09-21 15:50
     * @updateAuthor: lidongsheng
     * @updateData: 2020-09-21 15:50
     * @updateContent:
     * @Version: 0.0.8
     * @email: lidongshenglife@163.com
     * @blog: https://www.b0c0.com
     * @csdn: https://blog.csdn.net/LDSWAN0
     * ************************************************
     * Copyright @ 李东升 2020. All rights reserved
     * ************************************************
     */
    
    /**
     * 延时队列执行器就是调用的入口类,此类应该时储存一下延时队列实体以及具体的执行方法实体等,以及承担具体执行。
     * 并且能够支持多线程调用,所以此类已经实现了Runnable接口,开发者可以根据需要来多线程调用或者直接同步调用。
     */
    public class GeneralDelayedQueueExecute {
    
        private static final Logger logger = Logger.getLogger(GeneralDelayedQueueExecute.class.getName());
    
        //延时队列
        private static Map<String, DelayQueue<GeneralDelayedQueue>> delayQueueMap = new ConcurrentHashMap<>();
        //延时队列主题信息
        private static Map<String, GeneralDelayedQueue> taskMap = new ConcurrentHashMap<>();
        //重试时间的具体实现
        private static Map<String, RetryTimeTypeable> retryTimeTypeableMap = new ConcurrentHashMap<>();
        //用来保证程序执行完成之后才能获取到执行结果
        private static Map<String, CountDownLatch> countDownLatchMap = new ConcurrentHashMap<>();
        //存储每次执行的具体结果信息
        private static Map<String, List> resultListMap = new ConcurrentHashMap<>();
        //异步执行的线程池
        private static ExecutorService executor;
    
        static {
            final ThreadFactory GENERAL_THREAD_FACTORY = new ThreadFactoryBuilder().setNameFormat("com.b0c0.commom.delayedQueue-pool-general-%d").build();
            //核心线程池大小
            final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() << 1 + 1;
            //最大线程池大小
            final int MAX_POOL_SIZE = CORE_POOL_SIZE << 1;
            //线程任务队列大小
            final int QUEUE_CAPACITY = 500;
            //空闲线程的存活时间.默认情况下核心线程不会退出
            final int KEEP_ALIVE_TIME = 15;
            executor = new ThreadPoolExecutor(
                    CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(QUEUE_CAPACITY), GENERAL_THREAD_FACTORY, new ThreadPoolExecutor.DiscardOldestPolicy());
        }
    
        /**
         * 用户可自定义异步执行时候的线程池
         *
         * @param executor
         */
        public static void setExecutor(ExecutorService executor) {
            GeneralDelayedQueueExecute.executor = executor;
        }
    
        /**
         * 执行方法
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         */
        public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
            DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
            initTask(task, retryTimeTypeator, queue);
            return execute(task);
        }
    
        public static <T> GeneralResultVo<T> run(GeneralDelayedQueue task) {
            return run(task, DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
        }
    
        /**
         * 异步执行方法,可以单独为某任务根据传入一个线程池执行
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         * @param executor          用户自定义线程池
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, ExecutorService executor) {
            return executor.submit(() -> run(task, retryTimeTypeator));
        }
    
        /**
         * 异步执行方法 默认内置线程池
         *
         * @param task              具体任务
         * @param retryTimeTypeator 重试延时时间策略
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator) {
            return executor.submit(() -> run(task, retryTimeTypeator));
        }
    
        /**
         * 异步执行方法  默认内置线程池
         *
         * @param task 具体任务
         */
        public static <T> Future<GeneralResultVo<T>> runAsync(GeneralDelayedQueue task) {
            return executor.submit(() -> run(task));
        }
    
        /**
         * 任务链的执行方法 自定义顺序完成(流水线完成任务) 例如A -> B -> C
         * 并且任务的执行结果会自动传递给下一任务。比如A任务的执行结果,会传递给B任务。
         * 注意:
         * 此方法返回的为流水线最后一个任务的值,若想在最后得到某个任务或者所有任务的具体执行结果,须将 GeneralDelayedQueue.keepResults,设置为true;
         *
         * @param tasks              具体任务list集合,会按照集合的添加顺序来流水线顺序执行任务
         * @param retryTimeTypeators 重试延时时间策略
         * @param <T>
         * @return 任务链执行返回值为:返回的为最后一个运行任务的返回值。
         */
        public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {
    
            if (tasks == null || tasks.isEmpty() || retryTimeTypeators == null || retryTimeTypeators.isEmpty()) {
                return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合不能为空");
            }
            int taskSize = tasks.size();
            if (taskSize != retryTimeTypeators.size()) {
                return GeneralResultVo.fail(GeneralResultCodeEnum.PARAM_ERROR.getCode(), "任务集合和重试延时时间策略集合大小不一致,无法相互对应");
            }
            DelayQueue<GeneralDelayedQueue> queue = new DelayQueue<>();
            GeneralResultVo<T> resultVo = GeneralResultVo.fail();
            for (int i = 0; i < taskSize; i++) {
                initTask(tasks.get(i), retryTimeTypeators.get(i), queue);
                resultVo = execute(tasks.get(i));
                if (resultVo.isSuccess()) {
                    if (i != 0 && i != taskSize - 1) {
                        tasks.get(i + 1).getBodyData().setPreResult(resultVo.getReslutData());
                    }
                } else {
                    break;
                }
            }
            return resultVo;
        }
    
        public static <T> GeneralResultVo<T> runLine(List<GeneralDelayedQueue> tasks) {
            int taskSize = tasks.size();
            List<RetryTimeTypeable> retryTimeTypeators = new ArrayList<>();
            for (int i = 0; i < taskSize; i++) {
                retryTimeTypeators.add(DefaultRetryTimeTypeator.FixDelayedRetryTimeTypeator());
            }
            return runLine(tasks, retryTimeTypeators);
        }
    
        /**
         * 异步执行任务链方法,可以单独为某任务根据传入一个线程池执行
         *
         * @param tasks              具体任务
         * @param retryTimeTypeators 重试延时时间策略
         * @param executor           用户自定义线程池
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators, ExecutorService executor) {
            return executor.submit(() -> runLine(tasks, retryTimeTypeators));
        }
    
        /**
         * 异步执行任务链方法 默认内置线程池
         *
         * @param tasks              具体任务
         * @param retryTimeTypeators 重试延时时间策略
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks, List<RetryTimeTypeable> retryTimeTypeators) {
            return executor.submit(() -> runLine(tasks, retryTimeTypeators));
        }
    
        /**
         * 异步执行任务链方法  默认内置线程池
         *
         * @param tasks 具体任务
         */
        public static <T> Future<GeneralResultVo<T>> runLinesync(List<GeneralDelayedQueue> tasks) {
            return executor.submit(() -> runLine(tasks));
        }
    
        private static <T> GeneralResultVo<T> execute(GeneralDelayedQueue task) {
    
            GeneralResultVo<T> result = GeneralResultVo.fail();
            String id = task.getId();
            RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(id);
            List<GeneralResultVo<T>> resultList = resultListMap.get(id);
            CountDownLatch countDownLatch = countDownLatchMap.get(id);
            DelayQueue<GeneralDelayedQueue> queue = delayQueueMap.get(id);
            try {
                result = task.getConsumerable().run(queue.take());
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                task.setLastTime(retryTimeTypeator.getTime(task));
                //添加执行结果
                resultList.add(result);
                countDownLatch.countDown();
                //延时执行
                if (task.getCurrExecuteNum() < task.getMaxExecuteNum() - 1 && !result.isSuccess()) {
                    task.setCurrExecuteNum(task.getCurrExecuteNum() + 1);
                    setExpireTime(task);
                    queue.offer(task);
                    result = execute(task);
                } else {
                    while ((countDownLatch.getCount()) > 0) {
                        countDownLatch.countDown();
                    }
                }
                if (!task.isKeepResults()) {
                    clearTask(task.getId());
                }
            }
            return result;
        }
    
        private static void setExpireTime(GeneralDelayedQueue task) {
            long expireTime = 0;
            RetryTimeTypeable retryTimeTypeator = retryTimeTypeableMap.get(task.getId());
            if (task.getCurrExecuteNum() == 0) {
                expireTime = TimeUnit.NANOSECONDS.convert(
                        task.getDelayedTime(), task.getTimeUnit()) + System.nanoTime();
            } else {
                expireTime = TimeUnit.NANOSECONDS.convert(
                        retryTimeTypeator.getTime(task), task.getTimeUnit()) + System.nanoTime();
            }
            task.setExpireTime(expireTime);
        }
    
        /**
         * 得到全部的执行结果
         *
         * @param taskId     任务id标识
         * @param fastReturn 立即返回 true 代表立即返回, false 代表必须等到最大执行次数后返回(list.size = maxExecuteNum)
         * @param outTime    超时时间
         * @param timeUnit   时间单位
         * @return 执行结果列表
         */
        public static <T> List<GeneralResultVo<T>> getResultList(String taskId, boolean fastReturn, long outTime, TimeUnit timeUnit) {
            try {
                if (taskMap.containsKey(taskId)) {
                    if (!fastReturn) {
                        awaitCountDown(taskId, outTime, timeUnit);
                    }
                    return resultListMap.get(taskId);
                } else {
                    GeneralResultVo generalResultVo = GeneralResultVo.fail(
                            GeneralResultCodeEnum.TASK_EXIST.getCode(), GeneralResultCodeEnum.TASK_EXIST.getDesc());
                    List<GeneralResultVo<T>> res = new ArrayList<>();
                    res.add(generalResultVo);
                    return res;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return null;
        }
    
        /**
         * 等待任务全部完成
         *
         * @param id       任务id
         * @param timeOut  等待超时时间
         * @param timeUnit 时间单位
         * @return countDownLatch当前计数值
         * @throws InterruptedException
         */
        private static long awaitCountDown(String id, Long timeOut, TimeUnit timeUnit) throws InterruptedException {
            CountDownLatch countDownLatch = countDownLatchMap.get(id);
            if (timeOut == null) {
                countDownLatch.await();
            } else {
                countDownLatch.await(timeOut, timeUnit);
            }
            return countDownLatch.getCount();
        }
    
        /**
         * 根据任务初始化任务信息
         *
         * @param task
         * @param retryTimeTypeator
         */
        private static void initTask(GeneralDelayedQueue task, RetryTimeTypeable retryTimeTypeator, DelayQueue<GeneralDelayedQueue> queue) {
            String id = task.getId();
            delayQueueMap.put(id, queue);
            taskMap.put(id, task);
            retryTimeTypeableMap.put(id, retryTimeTypeator);
            CountDownLatch countDownLatch = new CountDownLatch(task.getMaxExecuteNum());
            countDownLatchMap.put(id, countDownLatch);
            setExpireTime(task);
            resultListMap.put(id, new ArrayList<>());
            queue.offer(task);
        }
    
    
        /**
         * 根据任务id清除任务全部的map信息
         * GeneralDelayedQueue的keepResults如果设置为true了,请务必手动调用此方法进行删除。否则任务相关信息将一直存在于内存中
         *
         * @param taskId 任务id
         */
        public static void clearTask(String taskId) {
            CountDownLatch countDownLatch = countDownLatchMap.get(taskId);
            while (countDownLatch != null && countDownLatch.getCount() > 0) {
                countDownLatch.countDown();
            }
            delayQueueMap.remove(taskId);
            taskMap.remove(taskId);
            countDownLatchMap.remove(taskId);
            taskMap.remove(taskId);
            resultListMap.remove(taskId);
            retryTimeTypeableMap.remove(taskId);
        }
    }
    

    关于更多com.b00c.common 依赖的源码详情、使用方法、更新历史请去下面这个github仓库查看。

    GitHub - DeBug-Bug/common

    更多相关内容
  • RocketMQ-重试队列

    千次阅读 2021-12-29 14:50:52
    对于需要重试消费的消息,并不是Consumer在等待一个指定时长后再去拉取原来的消息进行消费,而是将这些需要重试的消息放入到一个特殊的Topic队列中,而后进行再次消费的,这个特殊的队列就是重试队列。当出现需要...

    重试队列介绍

    对于需要重试消费的消息,并不是Consumer在等待一个指定时长后再去拉取原来的消息进行消费,而是将这些需要重试的消息放入到一个特殊的Topic队列中,而后进行再次消费的,这个特殊的队列就是重试队列。当出现需要进行重试消费的消息时,Broker会为每个消费组都设置Topic名称,为%RETRY%consumerGroup@consumerGroup的重试队列。

    这个重试队列是针对消息才组的,而不是针对每个Topic设置的(一个Topic的消息可以让多个消费者进行消费,所以会为这些消费者组各创建一个重试队列)
    只有当现在需要进行重试消费的消息时,才会为该消费者组创建重试队列

    在这里插入图片描述
    Broker对于重试消息的处理是通过延迟消息来实现的,先将消息保存到SCHEDULE_TOPIC_XXXX延迟队列中,延迟时间到后,会将消息投递到%consumerGroup@consumerGroup重试队列中。

    展开全文
  • 上一篇《RocketMQ:消息重试》中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被消息。这通常是由于消费...

    1. 死信队列

    上一篇【RocketMQ】消息重试中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试。重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被成功消费。这通常是由于消费者在正常情况下无法正确地消费该消息。此时,RocketMQ不会立即将消息丢弃,而是将其发送到该消费者对应的特殊队列中去。

    在RocketMQ中,这种正常情况下无法被消费的消息被称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

    1.1 死信特性

    (1)死信消息具有以下特性:

    • 不会再被消费者正常消费。
    • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

    (2)死信队列具有以下特性:

    • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
    • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
    • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

    1.2 查看死信消息

    (1)在控制条查询出现死信队列的主题信息
    在这里插入图片描述

    (2)在消费界面根据主题查询死信消息
    在这里插入图片描述
    (3)选择重新发送消息

    一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息。因此,通常需要我们对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

    2.重试次数参数

    RocketMQ的重试机制涉及发送端重试和消费端重试,消费端重试关联死信队列

    2.1 Producer端重试

    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。

    这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

    public class DefaultMQProducer  {
    	//设置消息发送失败时的最大重试次数
    	public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
    	   this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    	}
    

    在这里插入图片描述

    2.2 Consumer端重试

    注:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。

    消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:

    public enum ConsumeConcurrentlyStatus {
       //消费成功
       ConsumeConcurrentlyStatus,
    
       //消费失败,一段时间后重试
       RECONSUME_LATER;
    }
    

    Consumer端的重试包括两种情况

    • 异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
    • 超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

    因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态。

    3.1 异常重试

    RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    

    默认18时间间隔,表示重试18次,可以减少配置的数量吗?确切的说异常重试复用了延迟队列,因为如果失败了,立即重试,往往还是失败的,例如网络暂时中断,这样通过不断增加重试时间间隔,第一次失败,丢进1s的队列,第二次丢进5s的队列。利用延迟队列,保证了可以多次重试,并通过延迟时间确保业务尽量能成功。

    如果用户没有配置,在有默认值(broker侧源码,不是客户端的源码),以rocketmq-all-4.7.1-source-release.zip为例(https://gitee.com/king_beijixiong/study-rocketmq/blob/master/rocketmq-all-4.7.1-source-release.zip):

    public class MessageStoreConfig {
       private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    
    package org.apache.rocketmq.example.quickstart;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.Date;
    import java.util.List;
    
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
    
            consumer.setNamesrvAddr("10.89.184.62:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
    
            consumer.subscribe("TopicTest2", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
    
                    System.out.println("date="+new Date()+" *******");
    
                    for(MessageExt msg :msgs){
                        System.out.println("msg="+msg.getMsgId());
                        System.out.println("date="+new Date());
                        System.out.println("ReconsumeTimes="+msg.getReconsumeTimes());
                        System.out.println();
                    }
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    //  return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
    
            consumer.start();
    
            System.out.printf("Consumer Started.%n");
        }
    }
    
    

    执行结果:

    onsumer Started.
    date=Fri Aug 05 14:08:52 CST 2022 *******
    msg=0A28A4923EC018B4AAC217A272330000
    date=Fri Aug 05 14:08:52 CST 2022
    ReconsumeTimes=0                        '第一次处理'
    
    date=Fri Aug 05 14:09:02 CST 2022 *******
    msg=0A28A4923EC018B4AAC217A272330000
    date=Fri Aug 05 14:09:02 CST 2022
    ReconsumeTimes=1                       '第2次处理 与第一次间隔10s'
    
    date=Fri Aug 05 14:09:33 CST 2022 *******
    msg=0A28A4923EC018B4AAC217A272330000
    date=Fri Aug 05 14:09:33 CST 2022
    ReconsumeTimes=2						'第3次处理 与第2次间隔20s'
    
    date=Fri Aug 05 14:10:33 CST 2022 *******
    msg=0A28A4923EC018B4AAC217A272330000
    date=Fri Aug 05 14:10:33 CST 2022
    ReconsumeTimes=3                       '第4次处理 与第3次间隔1m'
    
    
    .....后面还有,省略
    

    从结果看,失败后会重试,并且每次间隔时间符合messageDelayLevel规律,当然跳过了1和5s,是从第三个开始的。

    但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。如下:

    即提前返回成功状态,是假的成功,也不会进入死信队列。反之,捕获异常,每次返回RECONSUME_LATER,到18次就会进入死信队列

    package william.rmq.consumer.quickstart;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import william.rmq.common.constant.RocketMQConstant;
    
    import javax.annotation.PostConstruct;
    import java.util.List;
    
    /**
    
    * @Description:RocketMQ消息消费者
    */
    @Slf4j
    @Service
    public class MessageConsumer implements MessageListenerConcurrently {
       @Value("${spring.rocketmq.namesrvAddr}")
       private String namesrvAddr;
    
       private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
    
    
       @PostConstruct
       public void start() {
           try {
               consumer.setNamesrvAddr(namesrvAddr);
    
               //从消息队列头部开始消费
               consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
               //设置集群消费模式
               consumer.setMessageModel(MessageModel.CLUSTERING);
    
               //订阅主题
               consumer.subscribe("DefaultCluster", "*");
    
               //注册消息监听器
               consumer.registerMessageListener(this);
    
               //启动消费端
               consumer.start();
    
               log.info("Message Consumer Start...");
               System.err.println("Message Consumer Start...");
           } catch (MQClientException e) {
               log.error("Message Consumer Start Error!!",e);
           }
    
       }
    
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
    
           MessageExt message = msgs.get(0);
           try {
               //逐条消费
               String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
               System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                       message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
    
               //模拟业务异常
               int i = 1 / 0;
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           } catch (Exception e) {
               log.error("Consume Message Error!!", e);
               //抛出异常时,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,尝试重试。当重试指定次数后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
               int reconsumeTimes = message.getReconsumeTimes();
               System.err.println("Now Retry Times: " + reconsumeTimes);
               if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
       }
    
    }
    
    

    可以看到控制台打印如下:

    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: 0A0E096CA14618B4AAC2562C6D5B0000,topic: DefaultCluster,tags: Tags,messageBody: Message-1
    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C32C0007,topic: DefaultCluster,tags: Tags,messageBody: Order-2-完成
    Now Retry Times: 3
    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C3290006,topic: DefaultCluster,tags: Tags,messageBody: Order-2-支付
    Now Retry Times: 3
    Now Retry Times: 3
    
    

    消息重试指定的次数后,就返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS不再重试了。

    3.2 超时重试

    当Consumer处理时间过长,在超时时间内没有返回给Broker消费状态,那么Broker也会自动重试

    package william.rmq.consumer.quickstart;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import william.rmq.common.constant.RocketMQConstant;
    
    import javax.annotation.PostConstruct;
    import java.util.List;
    
    /**
    * @Auther: ZhangShenao
    * @Date: 2018/9/7 11:06
    * @Description:RocketMQ消息消费者
    */
    @Slf4j
    @Service
    public class MessageConsumer implements MessageListenerConcurrently {
       @Value("${spring.rocketmq.namesrvAddr}")
       private String namesrvAddr;
    
       private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
    
    
       @PostConstruct
       public void start() {
           try {
               consumer.setNamesrvAddr(namesrvAddr);
    
               //从消息队列头部开始消费
               consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
               //设置集群消费模式
               consumer.setMessageModel(MessageModel.CLUSTERING);
    
               //设置消费超时时间(分钟)
               consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
    
               //订阅主题
               consumer.subscribe("DefaultCluster", "*");
    
               //注册消息监听器
               consumer.registerMessageListener(this);
    
               //启动消费端
               consumer.start();
    
               log.info("Message Consumer Start...");
               System.err.println("Message Consumer Start...");
           } catch (MQClientException e) {
               log.error("Message Consumer Start Error!!",e);
           }
    
       }
    
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
    
           MessageExt message = msgs.get(0);
           try {
               //逐条消费
               String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
               System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                       message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
    
               //模拟耗时操作2分钟,大于设置的消费超时时间
               Thread.sleep(1000L * 60 * 2);
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           } catch (Exception e) {
               log.error("Consume Message Error!!", e);
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
       }
    
    }
    
    

    可以看到, Thread.sleep耗时超过最大超时时间,触发失败

    参考

    RocketMQ:死信队列和消息幂等
    RocketMQ详解(12)——RocketMQ的重试机制
    源码分析RocketMQ之消息消费重试机制 broker源码,含有16次最大重试次数源码

    展开全文
  • RabbitMQ的延时重试队列

    千次阅读 2021-11-04 17:18:06
    这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。 2.原理 图是俺在网上找的,请原作者谅解。 发送到业务队里 如果正常收到 正常运行 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入...

    在这里插入图片描述

    1.背景

    通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。

    2.原理

    在这里插入图片描述

    图是俺在网上找的,请原作者谅解。

    1. 发送到业务队里 如果正常收到 正常运行
    2. 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
    3. 如果重试次数大于3 那么进入死信队列

    3.代码实现

    1.业务队列

    这里声明业务队列与绑定关系。

    @Configuration
    public class BusinessConfig {
    
        /**
         * yewu1模块direct交换机的名字
         */
        public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    
        /**
         * demo业务的队列名称
         */
        public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    
        /**
         * demo业务的routekey
         */
        public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
    
        /**
         * 业务交换机交换机(一个项目一个业务交换机即可)
         * 1.定义direct exchange,绑定queueTest
         * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
         * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
         * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
         * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
         */
        @Bean
        public DirectExchange yewu1Exchange() {
            DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
            return directExchange;
        }
    
        /**
         * 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
         * 1.队列名称
         * 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
         * 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
         * 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
         * 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
         *
         */
        @Bean
        public Queue yewu1DemoQueue() {
            return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
        }
    
        /**
         * 交换机与routekey绑定
         * 
         * @return
         */
        @Bean
        public Binding yewu1DemoBinding() {
            return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
                .with(YEWU1_DEMO_ROUTINGKEY);
        }
    }
    

    2.延时队列

    声明延时队列与绑定关系。

    @Configuration
    public class RetryConfig {
    
        /**
         * 延时队列 交换机配置标识符(固定)
         */
        public static final String RETRY_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    
        /**
         * 延时队列交换机绑定配置键标识符(固定)
         */
        public static final String RETRY_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    
        /**
         * 延时队列消息的配置超时时间枚举(固定)
         */
        public static final String RETRY_MESSAGE_TTL = "x-message-ttl";
    
        /**
         * yewu1模块延时队列交换机
         */
        public final static String YEWU1_RETRY_EXCHANGE_NAME = "yewu1_retry_exchange";
    
        /**
         * yewu1模块DEMO业务延时队列
         */
        public final static String YEWU1_DEMO_RETRY_QUEUE_NAME = "yewu1_demo_retry_queue";
    
        /**
         * yewu1模块DEMO延时队列routekey
         */
        public final static String YEWU1_DEMO_RETRY_ROUTING_KEY = "yewu1_demo_retry_key";
    
        /**
         * 延时队列交换机
         *
         * @return
         */
        @Bean
        public DirectExchange yewu1RetryExchange() {
            DirectExchange directExchange = new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME, true, false);
            return directExchange;
        }
    
        /**
         * 新建延时队列 一个业务队列需要一个延时队列
         * 
         * @return
         */
        @Bean
        public Queue yewu1DemoRetryQueue() {
            Map<String, Object> args = new ConcurrentHashMap<>(3);
            // 将消息重新投递到业务交换机Exchange中
            args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
            args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
            // 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
            args.put(RETRY_MESSAGE_TTL, 3 * 1000);
            return new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME, true, false, false, args);
        }
    
        /**
         * 绑定以上定义关系
         * 
         * @return
         */
        @Bean
        public Binding retryDirectBinding() {
            return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
                .with(YEWU1_DEMO_RETRY_ROUTING_KEY);
        }
    
    }
    

    3.死信队列

    声明私信队列与绑定关系。

    @Configuration
    public class DeadConfig {
    
        /**
         * 死信队列
         */
        public final static String FAIL_QUEUE_NAME = "fail_queue";
    
        /**
         * 死信交换机
         */
        public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    
        /**
         * 死信routing
         */
        public final static String FAIL_ROUTING_KEY = "fail_routing";
    
        /**
         * 创建配置死信队列
         *
         */
        @Bean
        public Queue deadQueue() {
            return new Queue(FAIL_QUEUE_NAME, true, false, false);
        }
    
        /**
         * 死信交换机
         *
         * @return
         */
        @Bean
        public DirectExchange deadExchange() {
            DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
            return directExchange;
        }
    
        /**
         * 绑定关系
         * 
         * @return
         */
        @Bean
        public Binding failBinding() {
            return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
        }
    
    }
    

    4.生产者

    生产者如上文,通用代码。

    @RestController
    @RequestMapping("/TestRabbit")
    public class ProducerDemo {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        //@RequestMapping("/sendDirect")
        String sendDirect(@RequestBody String message) throws Exception {
            System.out.println("开始生产");
            CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
                message, data);
            System.out.println("结束生产");
            System.out.println("发送id:" + data);
            return "OK,sendDirect:" + message;
        }
    }
    

    5.消费者

    大量的逻辑,请参考注释。

    public enum RabbitEnum {
     
        /**
         * 处理成功
         */
        ACCEPT,
     
        /**
         * 可以重试的错误
         */
        RETRY,
     
        /**
         * 无需重试的错误
         */
        REJECT
    @Component
    public class ConsumerDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        // @RabbitListener(queues = "yewu1_demo_queue")
        protected void consumer(Message message, Channel channel) throws Exception {
            RabbitEnum ackSign = RabbitEnum.RETRY;
            System.out.println(message.getMessageProperties().getCorrelationId());
            try {
                // 可以加入重复消费判断
                int i = 1 / 0;
    
            } catch (Exception e) {
                ackSign = RabbitEnum.RETRY;
                throw e;
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (ackSign == RabbitEnum.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } else if (ackSign == RabbitEnum.RETRY) {
                    String correlationData =
                        (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
                    System.out.println(message.getMessageProperties().getCorrelationId());
                    long retryCount = getRetryCount(message.getMessageProperties());
                    if (retryCount >= 3) {
                        // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
                                message, new CorrelationData(correlationData));
                            logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
                        } catch (Exception e1) {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                            logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
                        }
                    } else {
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
                            rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
                                RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
                                new CorrelationData(correlationData));
                            logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
                                + (retryCount + 1) + "次重试");
                        } catch (Exception e1) {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                            logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
                        }
                    }
                }
            }
        }
    
      
    
        /**
         * 获取消息被重试的次数
         */
        public long getRetryCount(MessageProperties messageProperties) {
            Long retryCount = 0L;
            if (null != messageProperties) {
                List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
                if (deaths != null && deaths.size() > 0) {
                    Map<String, Object> death = (Map<String, Object>)deaths.get(0);
                    retryCount = (Long)death.get("count");
                }
            }
            return retryCount;
        }
    }
    

    参考:https://www.cnblogs.com/mfrank/p/11260355.html

    展开全文
  • RabbitMQ重试机制

    千次阅读 2021-01-23 13:23:31
    1、RabbitMQ重试机制的简介 RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间...
  • 消息队列确实是ok的,毕竟题主的需求来看无碍乎就是一个循环的重试重试间隔需要指定,所以用延迟消息队列绝对能够解决问题,消息队列中间件选择很多BaLaLaLs提到的rabbitmq确实需要一个插件来做,也可以选择...
  • 消费失败重试机制

    2021-11-19 23:45:47
    我们可以利用Spring的retry机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。 修改consumer服务的application.yml文件,添加内容: spring: rabbitmq: listener: simple:
  • 今日在帮助朋友安装新版Office,由于他没有按照正常的流程卸载,导致产生了很多冗余文件。于是就百度了一下,经过不懈努力,终于找到了解决方案。...共享软件地址如下: OfficeRegClean: ...提取码:p8ei ...
  • 一、重试及重定向拦截器 第一个拦截器:RetryAndFollowUpInterceptor,主要就是完成两件事情:重试与重定向。 重试 请求阶段发生了 RouteException 或者 IOException会进行判断是否重新发起请求。 RouteException: ...
  • SpringBoot整合RabbitMQ重试机制及配置

    千次阅读 2022-03-28 16:42:00
    1.默认情况下,重试多次还是失败的话,会自动删除该消息(消息可能会丢失) 解决思路: A:如果重试多次还是失败的情况下,最终存放到死信队列. B:采用表日志记录,消费失败错误的日志记录 后期人工自动对消
  • 在进行真机调试时出现了如下问题: 针对此问题,我们使用微信开放文档给出的方法 我们可以在搜索框中输入分包,回车。 点击使用分包进行问题解决 基础能力/分包加载/使用分包 ...│ └── page...
  • 默认值为true,需要手动basicNack时这些参数谅失效了 retry: enabled: true #开启消费者 程序异常情况下会进行重试 max-attempts: 3 #重试次数 initial-interval: 2000 #消费者重试间隔次数 2s RabbitConfig @...
  • 变更日志0.0.2-快照添加了忽略某些标头的方法; Aded逻辑,用于删除标头中的双引号(添加了todo以优化此功能); 0.0.1-快照初次提交
  • 这篇博客中,我会讨论关于feign客户端的重试机制。本能的,我们会这样实现,在try catch和while循环中编写api调用语句,并为另一个api调用编写代码,直到满足条件。这也许能符合我们的目的,但是这会使得我们的代码...
  • 在一个微服务系统里,服务调用(同步或异步)失败处理是一个非常重要的一个环节,而失败重试又是一个常用的处理模式;本文从消息中间件的角度来分析下 AWS 上常见的消息服务的重试和失败处理策略。...
  • TCP重试机制与运用实例

    千次阅读 2018-02-05 21:21:31
    TCP重试机制与运用实例 second60 20180205 1. TCP重试机制简介   在TCP三次握手的过程中,当客户端发送SYN分节之后,如果没有收到服务端返回的确认ACK,那么TCP会在6s后继续发送SYN分节,一直没收到,25s继续...
  • 要在单个测试/套件上启用重试,请删除Cypress.currentTest使用, Cypress.currentTest采用测试配置替代,例如: // on a single test it ( 'test title' , { retries : 2 } , ( ) => { ... } ) // or on a suite ...
  • 先了解Redis 有序集合(sorted set) Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。...集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 23...
  • 解决办法如下: (1)新建个TXT , (2)把下边的代码粘贴过去 DEL /F /A /Q \\?\%1RD /S /Q \\?...保存之后把名字改成Delete.bat ... (3)然后把要删除的文件夹拖到这... (4)最后删除需要删除的文件夹即可 ...
  • 重试任务(二)

    千次阅读 2020-12-29 17:53:23
    5、核心调度策略类 ScheduleStrategy,主要用于解析时间间隔,计算重试次数,下次重试时间等 6、核心重试任务管理类 RetryTaskManagerImpl,管理重试任务执行器的注册,重试任务的创建,重试任务的处理等。...
  • 它们一般的流程都是:每隔一段时间,去数据库获取有效的任务,然后执行,执行完成之后,删除任务或者将任务设置为失效。 那么这就可能存在一个潜在的风险:“雪崩效应”。 试想一下如下场景:我有个定时任务,每隔...
  • 经常出现文件删掉了,文件夹无法删除,出现“该项目不在请确认该项目位置,然后重试”的场景 问题描述 提示:这里描述项目中遇到的问题: 有很多的CDSN文章都是如此解决,在此列出,可能有人能成功解决。 解决办法...
  • 是消费者抛出异常后的一种重试机制,想要触发异常需要把异常抛出来 配置(配置类代码见最后): # 开启重试 spring.rabbitmq.listener.simple.retry.enabled=true # 重试次数,默认为3次 spring.rabbitmq.listener....
  • 真机调试文件超2M(message:Error: 代码包大小为3095 kb,上限为 2048 kb,请删除文件后重试
  • 首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例。 feign重试 feign的重试机制默认是关闭的,源码如下 //FeignClientsConfiguration.java @Bean...
  • grpc 开发进阶 - 失败重试

    万次阅读 2020-05-14 18:41:54
    RPC调用失败情况分析 RPC 调用失败可以分为三种情况: RPC 请求还没有离开客户端 ...因为这两种情况,服务端的逻辑并没有开始处理请求,所以始终可以重试,也被称为透明重试(transparent retries) 对于第一
  • rabbimq消费者实现异常重试机制

    千次阅读 2018-07-05 10:37:31
    功能描述异常重试指的是当消费者处理消息异常失败时,为保证数据最终一致性,通过设置重试策略来对消息进行重复再消费。对于重试策略我们指定延迟多长时间重试一次,重试多少次,以及时间单位等。策略描述原理:利用...
  • 消息消费失败重新投递的流程 我们接着消息消费的逻辑分析,当...因为重试的消息只会消息的基本信息,没有具体的消息体,需要重新从commitLog中获取原来的消息获得消息体 // MQClientAPIImpl#consumerSendMessageBac.
  • MQ消费失败,自动重试思路 在遇到与第三方系统做对接时,MQ无疑是非常好的解决方案(解耦、异步)。但是如果引入MQ组件,随之要考虑的问题就变多了,如何保证MQ消息能够正常被业务消费。所以引入MQ消费失败情况下,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 322,919
精华内容 129,167
关键字:

删除重试

友情链接: arbprogram.rar