精华内容
下载资源
问答
  • 楔子:翻了帖子两三天,硬是没有找到哪个帖子能证明生产端的消息重试是确实重试了的。大多要么是对概念、源码说明了一下,或者把实现示例贴贴,但基本并没有有效测试证明。想了想,还是自己来捋一捋这 RocketMQ 的...

    楔子:翻了帖子两三天,硬是没有找到哪个帖子能证明生产端的消息重试是确实重试了的。大多要么是对概念、源码说明了一下,或者把实现示例贴贴,但基本并没有有效测试证明。想了想,还是自己来捋一捋这 RocketMQ 的消息重试机制。

    由于 MQ 经常处于庞大的分布式系统中,考虑到网络波动、服务宕机、程序异常等因素,很可能会出现消息发送或者消费失败的问题。因此,如果没有消息重试,就有可能造成消息丢失,最终影响到系统某些业务或流程。所以,大部分消息中间件都对消息重试提供了很好的支持。RocketMQ 消息重试分为两种:Producer 发送重试 和 Consumer 消费重试

    1. 生产端重试

    也叫消息重投。一般由于网络抖动等原因,Producer 向 Broker 发送消息时没有成功,导致最终 Consumer 无法消费消息,此时 RocketMQ 会自动进行消息重试/重投。我们可以手动设置发送失败时的重试次数。默认为 2 次,但加上程序本身的 1 次发送,如果失败,总共会发送 3 次,也就是 N + 1 次。N 为 retryTimesWhenSendFailed。

    1.2. 源码分析

    验证前,我们先来撸一下源码:

    private SendResult sendDefaultImpl(Message msg, CommunicationMode communicationMode, SendCallback sendCallback, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // ...源码省略...
        // 获取当前时间
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        // 去服务器看下有没有主题消息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if(topicPublishInfo != null && topicPublishInfo.ok()) {
            // ...源码省略...
            // 通过这里可以很明显看出,如果是同步消息,则重试 变量值+1次;如果不是同步发送消息,那么消息重试只有1次
            int timesTotal = communicationMode == CommunicationMode.SYNC?1 + this.defaultMQProducer.getRetryTimesWhenSendFailed():1;
            // 重试累计次数
            int times = 0;
            String[] brokersSent = new String[timesTotal];
    
            while(true) {
                label129: {
                    String info;
                    // 如果重试累计次数 小于 总重试次数阀值,则轮询获取服务器主题消息
                    if(times < timesTotal) {
                        info = null == mq?null:mq.getBrokerName();
                        MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, info);
                        if(mqSelected != null) {
                            mq = mqSelected;
                            brokersSent[times] = mqSelected.getBrokerName();
    
                            long endTimestamp;
                            try {
                                beginTimestampPrev = System.currentTimeMillis();
                                if(times > 0) {
                                    msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                                }
                                // 消息投送耗时
                                long costTime = beginTimestampPrev - beginTimestampFirst;
                                // 如果 消息投送耗时 小于等于 超时时间,则向 Broker 进行消息重投;否则,超时
                                if(timeout >= costTime) {
                                    // 调用sendKernelImpl开始发送消息
                                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                                    // ...源码省略...
                                    default:
                                        break label129;
                                    }
                                }
    
                                // 设置超时
                                callTimeout = true;
                            } catch (RemotingException var26) {
                                // ...源码省略...
                                // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                                break label129;
                            } catch (MQClientException var27) {
                                // ...源码省略...
                                // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                                break label129;
                            } catch (MQBrokerException var28) {
                                // ...源码省略...
                                // 当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                                switch(var28.getResponseCode()) {
                                case 1:
                                case 14:
                                case 16:
                                case 17:
                                case 204:
                                case 205:
                                    break label129;
                                default:
                                    if(sendResult != null) {
                                        return sendResult;
                                    } else {
                                        throw var28;
                                    }
                                }
                            } catch (InterruptedException var29) {
                                // 源码省略......
                                throw var29;
                            }
                        }
                    }
    
                    if(sendResult != null) {
                        return sendResult;
                    }
    
                    // 重试日志
                    info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", new Object[]{Integer.valueOf(times), Long.valueOf(System.currentTimeMillis() - beginTimestampFirst), msg.getTopic(), Arrays.toString(brokersSent)});
                    info = info + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/");
                    MQClientException mqClientException = new MQClientException(info, (Throwable)exception);
                    // 如果是消息发送/重试/重投超时,则抛出异常。如果还有重试次数,该异常不会再对该消息进行重试
                    if(callTimeout) {
                        throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");
                    }
    
                    // ...源码省略...
                    // 默认走 MQClientException 异常
                    throw mqClientException;
                }
                // 重试次数累加
                ++times;
            }
        } else {
            // 如果没有可用 Topic,且 NamesrvAddr 地址列表不为空,则构建 MQClientException,没有重试
            List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
            if(null != nsList && !nsList.isEmpty()) {
                throw (new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10005);
            } else {
                // 如果没有可用 Topic,且 NamesrvAddr 地址列表为空,则构建 MQClientException,没有重试
                throw (new MQClientException("No name server address, please set it." + FAQUrl.suggestTodo("http://rocketmq.apache.org/docs/faq/"), (Throwable)null)).setResponseCode(10004);
            }
        }
    }

    从 DefaultMQProducer 源码分析可以看出:

    生产者重试几次?

    • 同步发送:默认 retryTimesWhenSendFailed 是 2次重试,所以除了正常调用 1 次外,发送消息如果失败了会重试 2 次;超时异常不会重试
    • 异步发送:不会重试(调用总次数等于1)
    • 单向发送:oneway 没有任何保证

    什么时候重试?

    • 当使用 RocketMQ 的 send() 方法发送消息时,出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投(见上面源码分析)。需要注意的是消息重试/重投是发生在 RocketMQ 内部,我们所能干预的是重试次数等。
    • 在多条消息发送的 for 循环下的 try catch 可以实现服务降级,防止前一条消息的发送失败阻断后面的消息发送,但是起不到消息重试的作用,原因如上,消息重试/重投是发生在 RocketMQ 内部。

    怎么重试?
    每次重试都会重新进行负载均衡(会考虑发送失败的因素),使用 selectOneMessageQueue 重新选择 MessageQueue,这样增大发送消息成功的可能性。

    隔多久重试?
    立即重试,中间没有单独的间隔时间。见源码 真死循环 while(true) 的 sendDefaultImpl 方法,里面有个 label129 标记,只要上一次发送消息后被标记为 label129,就会立马进行下一次消息重投,没有时间间隔。

     

    1.3. 代码示例

    配置一个不存在的 nameServer 地址,实现一个设置重试次数 retryTimesWhenSendFailed 为 2,但总共会重试/重投 3 次(因为 N + 1)的消息生产者:

    public class RetryMultiMqProducer {
    
        // Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
        private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
        // Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
        private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_PRODUCER";
    
        public static void main(String[] args) throws Exception {
            // 创建一个 producer 生产者
            DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
            // 指定 NameServer 地址列表,多个 nameServer 地址用半角分号隔开。此处应改为实际 NameServer 地址
            // NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
            producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.1:9877");
            // 设置重试次数,默认情况下是2次重试
            // 虽然默认2次,但算上程序本身的1次,其实有3次机会,即如果本身的1次发送成功,2次重试机制就不重试了;如果本身的1次发送失败,则再执行这2次重试机会
            producer.setRetryTimesWhenSendFailed(2);
            // 设置超时时长,默认情况下是3000毫秒,即3秒
            producer.setSendMsgTimeout(1000);
            // 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
            producer.start();
    
            // 循环发送MQ测试消息
            String content = "";
            for (int i = 0; i < 5; i++) {
                try {
                    content = "【MQ测试消息】测试消息 " + i;
                    // 构建一条消息
                    Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
                    // 发送消息,发送消息到一个 Broker。默认以同步方式发送
                    SendResult sendResult = producer.send(message);
    
                    // 消息发送成功
                    System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
                            message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
                } catch (Exception e) {
                    // 只有当出现 RemotingException、MQClientException 和部分 MQBrokerException 时会重投
                    System.out.printf(new Date() + ", 异常信息:%s %n", e);
                    Thread.sleep(1000);
                }
            }
    
            // 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
            producer.shutdown();
        }
    }

    1.4. 验证结果

    正常开启 RocketMQ 服务,启动生产者:

    从生产者输出的日志可以看到,后面的 4 条消息各重试了 3 次:

    org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [1]ms, Topic: TOPIC_MEIWEI_SMS_NOTICE_TEST, BrokersSent: [YYW-SH-PC-1454, YYW-SH-PC-1454, YYW-SH-PC-1454]

    再看看 RocketMQ 日志。如果是 Windows 安装的 RocketMQ,且使用的是默认日志配置,则可以在路径 C:\Users\yourname\logs\rocketmqlogs 下查看 rocketmq_client.log

    日志文件 rocketmq_client.log 输出了源码中的日志:sendKernelImpl exception, resend at once, InvokeID

     

    2. 消费端重试

    Consumer 消费消息失败后,要提供一种重试机制,令消息再消费一次。Consumer 消费消息失败通常可以认为有以下几种情况:

    1. 由于消息本身的原因,例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。这种错误通常需要跳过这条消息,再消费其它消息,而这条失败的消息即使立刻重试消费,99%也不成功,所以最好提供一种定时重试机制,即过10秒后再重试。
    2. 由于依赖的下游应用服务不可用,例如db连接不可用,外系统网络不可达等。遇到这种错误,即使跳过当前失败的消息,消费其他消息同样也会报错。这种情况建议应用 sleep 30s,再消费下一条消息,这样可以减轻 Broker 重试消息的压力。

    只有在消息模式为 MessageModel.CLUSTERING 集群模式时,Broker 才会自动进行重试,广播消息模式下不会自动进行重试。消费者消费消息后,需要给 Broker 返回消费状态。以 MessageListenerConcurrently 监听器为例,Consumer 消费完成后需要返回 ConsumeConcurrentlyStatus 消费状态。

    RocketMQ 会为每个消费组都设置一个 Topic 名称为 “%RETRY%+consumerGroup” 的重试队列(这里需要注意的是,这个 Topic 的重试队列是针对消费组,而不是针对每个 Topic 设置的),用于暂时保存因为各种异常而导致 Consumer 端无法消费的消息。考虑到异常恢复起来需要一些时间,会为重试队列设置多个重试级别,每个重试级别都有与之对应的重新投递延时,重试次数越多投递延时就越大。RocketMQ 对于重试消息的处理是先保存至 Topic 名称为 “SCHEDULE_TOPIC_XXXX” 的延迟队列中,后台定时任务按照对应的时间进行 Delay 后重新保存至 “%RETRY%+consumerGroup” 的重试队列中。

    2.1. 源码分析

    ConsumeConcurrentlyStatus 有 消费成功 和 消费失败 两种状态:

    public enum ConsumeConcurrentlyStatus {
        // 消费成功
        CONSUME_SUCCESS,
        // 消费失败,需要稍后重新消费
        RECONSUME_LATER;
    
        private ConsumeConcurrentlyStatus() {
        }
    }

    Consumer 端的重试包括两种情况:

    1. 异常重试:由于 Consumer 端逻辑出现了异常,导致返回了 RECONSUME_LATER 状态,那么 Broker 就会在一段时间后尝试重试。
    2. 超时重试:如果 Consumer 端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker 就会认为 Consumer 消费超时,此时会发起超时重试。

    因此,如果 Consumer 端正常消费成功,一定要返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 状态。

    2.2. 生产者

    public class RetryExceptionMqProducer {
    
        private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
        private static final String MQ_CONFIG_TAG_RETRY = "PID_MEIWEI_SMS_RETRY_EXCEPTION";
    
        public static void main(String[] args) throws Exception {
            DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-retry");
            producer.setNamesrvAddr("127.0.0.1:9876");
            producer.start();
    
            String content = "【MQ测试消息】测试消息 ";
            Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_RETRY, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
    
            // 发送消息,发送消息到一个 Broker。默认以同步方式发送
            SendResult sendResult = producer.send(message);
            System.out.printf("Send MQ message success! Topic: %s, Tag: %s, MsgId: %s, Message: %s %n",
                    message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
    
            producer.shutdown();
        }
    }

    2.3. 异常重试

    实现一个设置了最大重试次数 maxReconsumeTimes 为 4,但业务异常中有重试阀值 3,满足阀值条件,则返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 不再重试的消费者:

    public class Retry4ExceptionMqConsumer {
        // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
        private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
        private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_EXCEPTION";
    
        public static void main(String[] args) throws Exception {
            // 声明并初始化一个 consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-exception");
            // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
            consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
            // 设置最大重试数次
            consumer.setMaxReconsumeTimes(4);
    
            // 注册一个监听器,主要进行消息消费的逻辑处理
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    // 获取消息
                    MessageExt msg = list.get(0);
    
                    try {
                        // 获取重试次数
                        int reconsumeTimes = msg.getReconsumeTimes() + 1;
                        System.out.printf(new Date() + ",第 %s 次轮询消费 %n", reconsumeTimes);
    
                        // 模拟业务逻辑。此处为超过最大重试次数,自动标记消息消费成功
                        if (reconsumeTimes >= 3) {
                            System.out.printf(new Date() + ",超过最大重试次数,自动标记消息消费成功 Topic: %s, Tags: %s, Message: %s %n",
                                    msg.getTopic(), msg.getTags(), new String(msg.getBody()));
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
    
                        // 模拟异常发生
                        int num = 1 / 0;
                        System.out.printf(new Date() + ",第 %s 次正常消费 %n", reconsumeTimes);
    
                        // 返回消费状态
                        // CONSUME_SUCCESS 消费成功
                        // RECONSUME_LATER 消费失败,需要稍后重新消费
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        // 获取重试次数
                        int reconsumeTimes = msg.getReconsumeTimes() + 1;
                        System.out.printf(new Date() + ",第 %s 次重试消费,异常信息:%s %n", reconsumeTimes, e);
                        // 每次重试时间间隔遵循延时等级递增:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            // 调用 start() 方法启动 consumer
            consumer.start();
            System.out.println("Retry Consumer Started.");
        }
    }

    2.3.1 测试结果

    2.3.2 实验总结

    1. 如果 maxReconsumeTimes 的指定重试次数 大于 业务重试最大重试阀值 reconsumeTimes,则完成业务逻辑处理,返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 后,不再重试;若此处没有返回 CONSUME_SUCCESS,则还会继续重试,到指定重试次数截止,即便没有返回 CONSUME_SUCCESS
    2. 如果 maxReconsumeTimes 的指定重试次数 小于 业务重试最大重试阀值 reconsumeTimes,则重试完指定重试次数后不再重试,即便没有返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS;若没有设定业务重试最大重试阀值校验也同理
    3. 如果没有设置 maxReconsumeTimes 的指定重试次数,也没有设定业务重试最大重试阀值处返回 CONSUME_SUCCESS,则会一直发起重试;如果重试 16 次还是没有返回 CONSUME_SUCCESS 成功状态,就会认为消息消费不了,丢进死信队列
    4. 以上重试时间间隔遵循延时等级逐次递增:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h;这种“时间衰减策略”进行消息的重复投递,即重试次数越多,消息消费成功的可能性越小
    5. 重试期间,即便关闭然后再次找开当前消费者,也能继续收到重试消息/进度状态
    6. 默认重试次数:Producer 生产端重试默认是 2 次,而 Consumer 消费端重试默认是 16 次
    7. 失效情况:Producer 生产端在异步发送情况下重试失效;而 Consumer 消费端在广播消费模式下重试失效

     

    2.4. 超时重试

    这里的超时重试并非真正意义上的超时,它是说获取消息后,因为某种原因没有给 RocketMQ 返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS 或 return ConsumeConcurrentlyStatus.RECONSUME_LATER。这种情况 MQ 会无限制的发送消息给消费端,因为 RocketMQ 会认为该消息没有发送,所以会一直发送。

    2.4.1 代码示例

    public class Retry4TimeoutMqConsumer {
        // Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
        private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
        private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_RETRY_TIMEOUT";
    
        public static void main(String[] args) throws Exception {
            // 创建一个 consumer 消费者
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-retry-timeout");
            // 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
            consumer.setNamesrvAddr("127.0.0.1:9876");
            // 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
            consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
            // 设置消费超时时间(默认值15L,为15分钟)
            consumer.setConsumeTimeout(1L);
            // 设置最大重试数次
            consumer.setMaxReconsumeTimes(2);
    
            // 注册一个监听器,主要进行消息消费的逻辑处理
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    // 获取消息
                    MessageExt msg = list.get(0);
                    try {
                        // 获取重试次数
                        int reconsumeTimes = msg.getReconsumeTimes() + 1;
                        if (reconsumeTimes == 1) {
                            // 模拟操作:设置一个大于上面已经设置的消费超时时间 来验证超时重试场景(setConsumeTimeout(1L))
                            System.out.println("---------- 服务暂停 ---------- " + new Date());
                            Thread.sleep(1000 * 60 * 2);
                        } else {
                            System.out.println("---------- 重试消费 ---------- " + new Date());
                        }
    
                        System.out.printf(new Date() + " 第 %s 次重试消费:Topic: %s, Tags: %s, MsgId: %s, Message: %s %n",
                                reconsumeTimes, msg.getTopic(), msg.getTags(), msg.getMsgId(), new String(msg.getBody()));
    
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } catch (Exception e) {
                        System.out.printf(new Date() + ",异常信息:%s %n", e);
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
    
            // 调用 start() 方法启动 consumer
            consumer.start();
            System.out.println("Retry Timeout Consumer Started.");
        }
    }

    2.4.2 测试结果

    测试期间,当控制台输出日志 “服务暂停” 后,关闭当前消费者:

    再次开启该消费者:

    2.4.3 实验总结

    1. 重试期间,关闭当前消费者,再开启该消费者,合理区间内该消费者也能再次收到重试消息或者消费的进度状态
    2. 如果设置了指定最大重试次数,但有业务重试次数阀值校验中返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS 后,便不再重试

    RocketMQ进击物语:
    RocketMQ进击(零)RocketMQ这个大水池子
    RocketMQ进击(一)Windows环境下安装部署Apache RocketMQ
    RocketMQ进击(二)一个默认生产者,两种消费方式,三类普通消息详解分析
    RocketMQ进击(三)顺序消息与高速公路收费站
    RocketMQ进击(四)定时消息(延时队列)
    RocketMQ进击(五)集群消费模式与广播消费模式
    RocketMQ进击(六)磕一磕RocketMQ的事务消息和事务性消息的生产与消费
    RocketMQ进击(七)盘一盘RocketMQ的重试机制
    RocketMQ进击(八)RocketMQ的日志收集Logappender
    RocketMQ异常:RocketMQ顺序消息收不到或者只能收到一部分消息
    RocketMQ异常:Unrecognized VM option 'MetaspaceSize=128m'

    展开全文
  • 一、 Producer端重试 二、 Consumer端重试 1. Exception 2. Timeout 总结   对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:...

     

    对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试。

    一、 Producer端重试

    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。 
    这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

    /**
     * Producer,发送消息
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("group_name");
            producer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
            producer.setRetryTimesWhenSendFailed(3);
            producer.start();
    
            for (int i = 0; i < 100; i++) {
                try {
                    Message msg = new Message("TopicTest",              // topic
                            "TagA",                                     // tag
                            ("HelloWorld - RocketMQ" + i).getBytes()    // body
                    );
                    SendResult sendResult = producer.send(msg, 1000);
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }

                                                                     生产者端失败重试

    上图代码示例的处理手段是:如果该条消息在1S内没有发送成功,那么重试3次。

    producer.setRetryTimesWhenSendFailed(3); //失败的情况重发3次 
    producer.send(msg, 1000); //消息在1S内没有发送成功,就会重试

     

    二、 Consumer端重试

    消费者端的失败,分为2种情况,一个是exception,一个是timeout。

    1. Exception

    消息正常的到了消费者,结果消费者发生异常,处理失败了。例如反序列化失败,消息数据本身无法处理(例如话费充值,当前消息的手机号被注销,无法充值)等。 
    这里涉及到一些问题,需要我们思考下,比如,消费者消费消息的状态有哪些定义?如果失败,MQ将采取什么策略进行重试?假设一次性批量PUSH了10条,其中某条数据消费异常,那么消息重试是10条呢,还是1条呢?而且在重试的过程中,需要保证不重复消费吗?

    public enum ConsumeConcurrentlyStatus {
        /**
         * Success consumption
         */
        CONSUME_SUCCESS,
        /**
         * Failure consumption,later try to consume
         */
        RECONSUME_LATER;
    }

                                                                           ConsumeConcurrentlyStatus枚举的源码

    通过查看源码,消息消费的状态,有2种,一个是成功(CONSUME_SUCCESS),一个是失败&稍后重试(RECONSUME_LATER)

    RECONSUME_LATER的策略 
                                                                RECONSUME_LATER的策略 

    在启动broker的过程中,可以观察到上图日志,你会发现RECONSUME_LATER的策略:如果消费失败,那么1S后再次消费,如果失败,那么5S后,再次消费,……直至2H后如果消费还失败,那么该条消息就会终止发送给消费者了! 
    RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢? 
    看一段代码:

    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
            consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("TopicTest", "*");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {
                        MessageExt msg = msgs.get(0);
                        String msgbody = new String(msg.getBody(), "utf-8");
                        System.out.println(msgbody + " Receive New Messages: " + msgs);
                        if (msgbody.equals("HelloWorld - RocketMQ4")) {
                            System.out.println("======错误=======");
                            int a = 1 / 0;
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        if (msgs.get(0).getReconsumeTimes() == 3) {
                            // 该条消息可以存储到DB或者LOG日志中,或其他处理方式
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功
                        } else {
                            return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

     生产端发送了10条消息,看一下消费端的运行效果:

    RECONSUME_LATER的重试效果
    RECONSUME_LATER的重试效果

     

    观察上图发现,HelloWorld - RocketMQ4的消息的reconsumeTimes属性值发生了变化,其实这个属性就代表了消息重试的次数!因此我们可以通过reconsumeTimes属性,让MQ超过了多少次之后让他不再重试,而是记录日志等处理,也就是上面代码catch中的内容

    2. Timeout

    比如由于网络原因导致消息压根就没有从MQ到消费者上,那么在RocketMQ内部会不断的尝试发送这条消息,直至发送成功为止!(比如集群中一个broker失败,就尝试另一个broker) 
    延续Exception的思路,也就是消费端没有给RocketMQ返回消费的状态,即没有return ConsumeConcurrentlyStatus.CONSUME_SUCCESS或return ConsumeConcurrentlyStatus.RECONSUME_LATER,这样的就认为没有到达Consumer端。 
    下面进行模拟:

    1)消费端有consumer1和consumer2这样一个集群。 
    2)consumer1端的业务代码中暂停1分钟并且不发送接收状态给RocketMQ。

    public class Consumer1 {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
            consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.subscribe("Topic1", "Tag1 || Tag2 || Tag3");
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    try {                   
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(),"utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到消息:" + " topic:" + topic + " ,tags:" + tags + " ,msg:" + msgBody);
    
                            // 表示业务处理时间
                            System.out.println("=========开始暂停==========");
                            Thread.sleep(60000);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }

     

    Consumer1端Timeout异常测试代码

     

    3)启动consumer1和consumer2。 
    4)启动Producer,只发送一条数据。 
    看一下此时consumer1和consumer2的运行结果: 

    consumer1 
    Consumer1

    Consumer2-未接收到消息 
    Consumer2-未接收到消息

    发现consumer1接收到消息并且暂停,consumer2未接收到消息。

    5)关闭consumer1。 
    观察consumer2的运行结果: 

    Consumer2-接收到消息 
    Consumer2-接收到消息

    总结

    Producer端没什么好说的,Consumer端值得注意。对于消费消息而言,存在2种指定的状态(成功 OR 失败重试),如果一条消息在消费端处理没有返回这2个状态,那么相当于这条消息没有达到消费者,势必会再次发送给消费者!也即是消息的处理必须有返回值,否则就进行重发。

    转载于:https://blog.csdn.net/zhanglianhai555/article/details/77162208?ref=myrecommend

     

    展开全文
  • 在应用程序中,由于一些网络等不可预知的问题,我们的程序或者接口会失败,比如调用一个第三方的接口获取数据失败了,这时就需要重试机制,比如延时3S后重试、间隔不断增加重试等,而这些机制完全不需要你自己去实现...

    楔子

    在应用程序中,由于一些网络等不可预知的问题,我们的程序或者接口会失败,比如调用一个第三方的接口获取数据失败了,这时就需要重试机制,比如延时3S后重试、间隔不断增加重试等,而这些机制完全不需要你自己去实现,全部交给Spring Retry吧。

    使用

    1.在pom文件中添加相应的依赖
    <dependency>
        <groupId>org.springframework.retry</groupId>
        <artifactId>spring-retry</artifactId>
        <version>1.2.2.RELEASE</version>
    </dependency>
    

    最新版本可以在Maven Central找到
    Spring Retry使用AOP实现,所以必须要有spring-aspects依赖,

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aspects</artifactId>
    </dependency>
    
    2.在启动类或者配置类上添加注解@EnableRetry

    配置类

    @Configuration
    @EnableRetry
    public class AppConfig { ... }
    

    启动类

    @SpringBootApplication
    @EnableRetry
    public class Application
    
    3. 在需要重试的方法上添加注解@Retryable
    @Service
    public class DemoService {
    @Retryable(value= {Exception.class},maxAttempts = 3)
        public void call() throws Exception {
                System.out.println("do something...");
                throw new Exception("RPC调用异常");
        }
        @Recover
        public void recover(RemoteAccessException e) {
                System.out.println(e.getMessage());
        }
    }
    
    @Retryable(maxAttempts = 3, backoff = @Backoff(value = 3000, multiplier = 1.5))
    public Customer getCustomer(String customerId) {
    		if (true) {
    			JSONArray data = retObj.getJSONArray("data");
    			if (data != null && !data.isEmpty()) {
    				return data.toJavaList(Customer.class).get(0);
    			}
    		} else {
    			log.error("异常,{}", customerId);
    			throw new RuntimeException("获数据失败");
    		}
    		return null;
    }
    

    @Retryable被注解的方法发生异常时会重试

    @Retryable注解中的参数说明:
    maxAttempts :最大重试次数,默认为3,如果要设置的重试次数为3,可以不写;
    value:抛出指定异常才会重试
    include:和value一样,默认为空,当exclude也为空时,所有异常都重试
    exclude:指定不处理的异常,默认空,当include也为空时,所有异常都重试
    backoff:重试等待策略,默认使用@Backoff@Backoff的value默认为1000L,我们设置为2000L。

    @Backoff重试补偿机制,默认没有

    @Backoff注解中的参数说明:
    value:隔多少毫秒后重试,默认为1000L,我们设置为3000L;
    delay:和value一样,但是默认为0;
    multiplier(指定延迟倍数)默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。

    4. 可以在指定方法上标记@Recover来开启重试失败后调用的方法(注意,需跟重处理方法在同一个类中)

    @Recover
    当重试到达指定次数时,被注解的方法将被回调,可以在该方法中进行日志处理。需要注意的是发生的异常和入参类型一致时才会回调。

    5. 采坑提示
    • 1、由于retry用到了aspect增强,所有会有aspect的坑,就是方法内部调用,会使aspect增强失效,那么retry当然也会失效。参考改链接
    public class demo {
        public void A() {
            B();
        }
    
        //这里B不会执行
        @Retryable(Exception.class)
        public void B() {
            throw new RuntimeException("retry...");
        }
    }
    
    • 2、重试机制,不能在接口实现类里面写。所以要做重试,必须单独写个service。
    • 3、maxAttemps参数解释的是说重试次数,但是我再打断点的时候发现这个=1时,方法一共只执行了一次。

    参考文献

    展开全文
  • 微服务调用总会通过Ribbon,同时里面会实现一些重试的机制,相关配置是:#最多重试多少台服务器 ribbon.MaxAutoRetriesNextServer=2 #每台服务器最多重试次数,但是首次调用不包括在内 ribbon.MaxAutoRetries=1在...

    背景

    本文基于Spring-Cloud, Daltson SR4

    微服务一般多实例部署,在发布的时候,我们要做到无感知发布;微服务调用总会通过Ribbon,同时里面会实现一些重试的机制,相关配置是:

    #最多重试多少台服务器
    ribbon.MaxAutoRetriesNextServer=2
    #每台服务器最多重试次数,但是首次调用不包括在内
    ribbon.MaxAutoRetries=1

    在发布时,为了适应Eureka注册中心的注册信息变换(参考Eureka上线下线解析),我们挨个重启实例,并且在每个实例启动后等待一段时间((Eureka客户端注册信息刷新时间+Eureka客户端Ribbon刷新事件)*3)再重启另外一个实例,来避免注册信息变化带来的影响,这样这个被重启的实例的微服务的调用方总能负载均衡重试调用到可用的实例。

    但是,实际生产中,我们发现,某个实例重启其他实例正常工作时,会有一小段时间,调用方调用到被重启的实例,直接失败,没有触发重试。

    代码分析

    无论上层是Feign调用还是Zuul调用,到了Ribbon这一层都是创建一个LoadBalancerCommand,调用其中的submit方法执行http请求,这里利用了RxJava机制:

    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();
    
        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }
    
        //这里就是读取上面说的配置最多重试多少台服务器以及每台服务器最多重试次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    
        // 利用RxJava生成一个Observable用于后面的回调
        Observable<T> o = 
                //选择一个server进行调用
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    // Called for each server being selected
                    public Observable<T> call(Server server) {
                        context.setServer(server);
                        //获取这个server调用监控记录,用于各种统计和LoadBalanceRule的筛选server处理
                        final ServerStats stats = loadBalancerContext.getServerStats(server);
    
                        //获取本次server调用的回调入口,用于重试同一实例的重试回调
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        loadBalancerContext.noteOpenConnection(stats);
    
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }
    
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
    
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                recordStats(tracer, stats, entity, null);
                                                // TODO: What to do if onNext or onError are never called?
                                            }
    
                                            @Override
                                            public void onError(Throwable e) {
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }
    
                                            @Override
                                            public void onNext(T entity) {
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            
    
                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });
                        //设置针对同一实例的重试回调
                        if (maxRetrysSame > 0) 
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });
        //设置重试下一个实例的回调    
        if (maxRetrysNext > 0 && server == null) 
            o = o.retry(retryPolicy(maxRetrysNext, false));
        //设置重试超过次数则终止调用并设置对应异常的回调
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

    我们重点看一下设置重试的回调的详细回调代码:

    private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
        return new Func2<Integer, Throwable, Boolean>() {
            //只有返回为true的时候才会retry
            @Override
            public Boolean call(Integer tryCount, Throwable e) {
                //抛出的异常是AbortExecutionException则不重试
                if (e instanceof AbortExecutionException) {
                    return false;
                }
    
                //超过最大重试次数则不重试
                if (tryCount > maxRetrys) {
                    return false;
                }
    
                if (e.getCause() != null && e instanceof RuntimeException) {
                    e = e.getCause();
                }
                //判断是否是可以重试的exception
                return retryHandler.isRetriableException(e, same);
            }
        };
    }

    这个判断是否是可以重试的exception里面的逻辑是:

    public boolean isRetriableException(Throwable e, boolean sameServer)
    {
        //如果已经配置了ribbon.okToRetryOnAllErrors为true,则不论什么异常都会重试,我们没有这么配置,一般也不会这么配置
        if (okToRetryOnAllErrors)
        {
            return true;
        }
        else if (e instanceof ClientException)
        {
            ClientException ce = (ClientException) e;
            if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED)
            {
                return !sameServer;
            }
            else
            {
                return false;
            }
        }
        else
        {
    
            if (e instanceof RetryableHttpCodeAndMethodException)
            {
             //如果是有response返回的异常就会到这里
                if (((RetryableHttpCodeAndMethodException) e).getMethod().equals("GET") || okToRetryOnAllOperations)
                    return true;
                return false;
            }
            //其他情况,就是连接失败的判断。首先需要配置ribbon.okToRetryOnConnectErrors为true,这个默认就是true;然后通过isConnectionException判断
            return okToRetryOnConnectErrors && isConnectionException(e);
        }
    }

    最后,我们来看看如何判断一个Exception为ConnectionException:

    protected List<Class<? extends Throwable>> connectionRelated = Lists
                .<Class<? extends Throwable>> newArrayList(SocketException.class);
    public boolean isConnectionException(Throwable e)
    {
        return Utils.isPresentAsCause(e, connectionRelated);
    }

    这个方法其实就看这个异常的异常以及Cause中是否有SocketException,如果有则返回true。

    问题定位

    在Windows环境下调试,我们发现一个有意思的现象,当我们设置ribbon连接超时
    ribbon.ConnectTimeout=500时(这个和我们线上配置一样),重试失败,捕获到“java.net.SocketTimeoutException: connect timed out”这个Exception;当设置连接超时为1000ms以上时(不包括1000),抛出的异常就是“java.net.ConnectException: Connection refused: connect”

    我们写一段测试代码看一下:

     public static void main(String[] args) throws IOException {
        Socket socket = new Socket();
        try {
            socket.connect(new InetSocketAddress("127.0.0.1", 8080), 500);
        } catch (Exception e) {
            e.printStackTrace();
        }
        socket = new Socket();
        socket.connect(new InetSocketAddress("127.0.0.1", 8080), 1100);
    
    }

    这个端口没有启用,输出为:

    java.net.SocketTimeoutException: connect timed out
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.hash.test.TestRxJava.main(TestRxJava.java:14)
    Exception in thread "main" java.net.ConnectException: Connection refused: connect
        at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
        at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
        at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
        at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
        at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
        at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
        at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
        at java.net.Socket.connect(Socket.java:589)
        at com.hash.test.TestRxJava.main(TestRxJava.java:19)

    就是不一样的Exception

    而SocketTimeoutException不是一种SocketException,所以,原有的重试逻辑不能重试。

    对于这个问题,我在Feign的github源代码库提了个issue

    所以,我们要改造isConnectionException这个方法;对于SocketTimeoutException,不是全都重试,只重试msg为connect timed out的Exception。同时,SocketTimeoutException可能会被封装,我们为了简单,只通过msg进行判断:

    public boolean isConnectionException(Throwable e)
    {
        return Utils.isPresentAsCause(e, connectionRelated)
                || e.getMessage().contains("connect timed out");
    }

    这段代码,也提了Pull Request

    修改替换源代码后,线上问题解决

    展开全文
  • retryIfException,抛出runtime异常、checked异常时都会重试,但是抛出error不会重试。retryIfRuntimeException只会在抛runtime异常的时候才重试,checked异常和error都不重试。retryIfExceptionOfType允许我们只在...
  • 今天来介绍下Python中从requests请求重试到万能重试装饰器。 从requests请求重试到万能重试装饰器 重试,在编写代码的过程中,是一个很常见的需求。 比如: 请求重试(例如:超时) 文件占用 IO阻塞等待 那么,...
  • Azkaban任务失败重试重试间隔命令

    千次阅读 2018-09-14 11:43:01
    Azkaban任务失败重试重试间隔命令 在.job文件中,添加如下命令: retries=12 retry.backoff=300000
  • noWait失败后立刻重试,fixedWait间隔固定时间之后重试,WaitStrategies.randomWait间隔随机时间后重试,incrementingWait增量重试,fibonacciWait按照斐波那契数列重试,exponentialWait按照指数递增(2的n次方)来...
  • 消息重试

    千次阅读 2017-05-07 13:46:20
    MQ 消息重试 本文档主要介绍 MQ 消费者的消费逻辑失败时,通过设置返回状态可达到消息重试的结果。 MQ 消息重试只针对集群消费方式生效;广播方式不提供失败重试特性,即消费失败后,失败消息不再重试,继续...
  • 层层递进打造你的重试机制

    万次阅读 2019-08-25 15:49:32
    重试机制在分布式系统中,或者调用外部接口中,都是十分重要的。 重试机制可以保护系统减少因网络波动、依赖服务短暂性不可用带来的影响,让系统能更稳定的运行的一种保护机制。 为了方便说明,先假设我们想要进行...
  • guzzle请求重试

    千次阅读 2020-04-25 14:28:59
    在网络不稳定或者其他一些原因导致会偶现请求失败的情况,所以就需要根据一定规则进行请求重试 而本文中请求重试就是利用中间件系统实现的。 使用php的trait特性 use GuzzleHttp\Client; use Guzzle...
  • 昨天晚上,我收到了来自表弟发来的一条信息,这吊毛已经很久没联系我了,突然间...“错误,呃,出错了,请稍后重试”,这个之前我遇到过,加速器没有打开,就会提示这个,把加速器打开就可以继续操作了! 我叫表弟把他.
  • Springboot 整合Retry 实现重试机制

    千次阅读 2019-08-16 13:51:01
    重试,在项目需求中是非常常见的,例如遇到网络波动等,要求某个接口或者是方法可以最多/最少调用几次; 实现重试机制,非得用Retry这个重试框架吗?那肯定不是,相信很多伙伴手写一下控制流程的逻辑也可以达到重试...
  • RocketMQ(四)——消息重试

    万次阅读 2017-08-17 16:11:39
    对于MQ,可能存在各种异常情况,导致消息无法最终被Consumer消费掉,因此就有了消息失败重试机制。很显示,消息重试分为2种:Producer端重试和Consumer端重试
  • 友情提示 : 超时重试的解决方案在最下面,可直接查看 首先介绍一下我的版本是httpclient 4.3.4,采用的是 PoolingHttpClientConnectionManager 连接池的方式构造CloseableHttpClient,代码如下: 接下来,执行如...
  • 如何优雅地重试

    万次阅读 多人点赞 2021-01-05 10:00:00
    背景在微服务架构中,一个大系统被拆分成多个小服务,小服务之间大量 RPC 调用,经常可能因为网络抖动等原因导致 RPC 调用失败,这时候使用重试机制可以提高请求的最终成功率,减少故障影响...
  • 接口调用失败重试方案

    万次阅读 2018-12-20 10:59:45
    在项目开发中,有时候会出现接口调用失败,本身调用又是异步的,如果是因为一些网络问题请求超时,总想可以重试几次把任务处理掉。 一些RPC框架,比如dubbo都是有重试机制的,但是并不是每一个项目多会使用dubbo...
  • SpringCloud重试机制配置 首先声明一点,这里的重试并不是报错以后的重试,而是负载均衡客户端发现远程请求实例不可到达后,去重试其他实例。 Ribbon+RestTemplate的重试 对于整合了Ribbon的RestTemplate,例如一...
  • Java 实现重试机制

    千次阅读 2019-11-06 18:29:56
    2)循环重试(包括:重试次数,重试间隔) 3、代码 package com.retry; /** * @author liuxd * @version 1.0 * @date 2019-11-06 17:36 */ import java.util.Random; import java.util.conc...
  • 自动化测试框架[Cypress重试机制]

    千次阅读 2020-12-10 17:50:35
    重试机制是Cypress的核心概念,由于如今的web应用几乎都是异步加载的,那么如果断言发生的时候程序尚未更新DOM该如何处理?如果断言发生的时候应用程序正在等待其后端响应而导致页面暂时没有结果该如何处理?如果...
  • 解决:该公众号提供的服务出现故障,请稍后重试

    万次阅读 多人点赞 2018-07-01 22:14:31
    最近在进行微信公众号接口开发,做到自动回复的接口开发时,我按照接入指南接口文档开发好功能后,推上线了测试(你也可以使用ngrok内网穿透)调试发现:总是出现该公众号提供的服务出现故障,请稍后重试的提示。...
  • 对于开发过网络应用程序的程序员来说,重试并不陌生,由于网络的拥堵和波动,此刻不能访问服务的请求,也许过一小段时间就可以正常访问了。随着重试逻辑的不断变化,上面代码会越来越复杂。而且重试逻辑,其实是各个...
  • 项目中有个接口必须要上报成功,否则要进行重试重试次数为9,重试时间为1,2,4,8,16,24,32,64,128,256,512,1024
  • 此时需要增加重试机制。刚出来的时候掉接口需要三次重试,由于对httpclient不是很了解。只能在for循环里面对异常经常处理并重新调接口。后来做http服务端的时候,有次debug偶然发现客户端调一次请求,服务端会跳多次...
  • kafka消息重试

    千次阅读 2019-02-02 15:07:09
    kafka没有重试机制不支持消息重试,也没有死信队列,因此使用kafka做消息队列时,如果遇到了消息在业务处理时出现异常,就会很难进行下一步处理。应对这种场景,需要自己实现消息重试的功能。 如果不想自己实现消息...
  • Retry重试机制

    万次阅读 2016-12-25 18:53:36
    业务场景  应用中需要实现一个功能: 需要将数据上传到远程存储服务,同时在返回处理成功情况下做其他操作。这个功能不复杂,分为两个步骤:第一... 这个问题的技术点在于能够触发重试,以及重试情况下逻辑有效执行
  • 分布式环境下,重试是高可用技术中的一个部分,大家在调用RPC接口或者发送MQ时,针对可能会出现网络抖动请求超时情况采取一下重试操作,自己简单的编写重试大多不够优雅,而重试目前已有很多技术实现和框架支持,但...
  • 实现HttpClient重试

    千次阅读 2017-09-30 16:46:06
    场景应用 目前程序中涉及到需要callback操作,product需要被动的接收consume的处理状态,为了最大程度的能够callback成功因此consume在...1、针对异常的重试 例如:connect timed out/read timed out HttpClientBuilder
  • grpc 开发进阶 - 失败重试

    万次阅读 2020-05-14 18:41:54
    RPC调用失败情况分析 RPC 调用失败可以分为三种情况: RPC 请求还没有离开客户端 ...因为这两种情况,服务端的逻辑并没有开始处理请求,所以始终可以重试,也被称为透明重试(transparent retries) 对于第一
  • Python-重试机制

    千次阅读 2019-10-10 14:48:23
    retrying是Python编写的通用重试库,用于简化向任何东西添加到重试行为的任务,最低支持Python3.5!下面来看一下retring库的使用: 安装 pip install retrying 亦或者 easy_install retrying 举个栗子 下面我们来...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 823,172
精华内容 329,268
关键字:

重试