精华内容
下载资源
问答
  • kafka-整理-重试机制
    千次阅读
    2021-05-24 09:22:00

    首先:
    1:kafka是拉取模式的消息队列,是消费者控制什么时候拉取消息的;
    2:每条消息都有一个偏移量,每个消费者都会跟踪最近消费消息的偏移量;

    当消费者消费某条消息失败时?

    有下面几种处理方式:

    1:重试,不停的重试,直到成功;
    可能导致的问题:
    问题是若是这条消息(通过目前的代码)可能永远不能消费成功,
    导致消费者不会继续处理后续的任何问题,导致消费者阻塞;

    2:跳过,跳过这条没有消费的消息;
    这个其实是不合理的,可能会造成数据不一致性;

    解决方案?

    1:方案一【重试】

    这里需要建立一个专门用于重试的topic(retry topic);
    当消费者没有正确消费这条消息的时候,
    则将这条消息转发(发布)到重试主题(retry topic)上,然后提交消息的偏移量,以便继续处理下一个消息;
    注意:
    这个时候,这个没有正确消费的消息,其实对于这个消费者来说,也算是消费完成了,因为也正常提交了偏移量,只不过是业务没有正确处理,而且这个消息被发布到另一个topic中了(retry topic);
    ;;
    之后再创建一个重试消费者,用于订阅这个重试主题,
    只不过这个重试消费者,跟之前那个消费者处理相同的业务,两个逻辑是一样的;
    ;;
    如果这个重试消费者,也无法小得这条消息,那就把这个消息发布到另一个重试主题上,并提交该消息的偏移量;
    ;;
    循环,递归
    ;;
    最后,当创建了很多重试消费者的时候,在最终重试消费者无法处理某条消息后,把该消息发布到一个死信队列(DLQ);

    这种方式优缺点:
    优点:可以重试,可能在重试中,就正常消费了;
    缺点:可能一直重试,都不会正常消费,一直到死信队列中;
    这个问题就比较大了,可能我们就是消费的是一个错误的消息,
    比如说缺字段,或者数据库中根本就没有这个业务的id;
    或者消息中包含特殊字段,导致无法消费;

    这种是消息本身的错误,导致无法消费,除非你去解决消息,不然是永远不会成功的;

    建议:
    对于第一种:
    消息正确,是因为业务或者其他问题导致第一次消费失败,其实可以让消费者自己去重试,知道消费成功;
    对于第二种:
    当然可以用上面那个重试主题,不然你这个消息阻塞了我后面的后续的消费,所以需要把这个消息分流,分流该消息会为我们的消费清除障碍;

    重试主题的缺点:

    1:它会忽略排序
    当事件发布到同一分区时,可以保证各个事件按照它们发生的顺序进行处理。如果对同一聚合进行连续更改,并且所产生的事件发布到不同的分区,就可能发生争用状况,也就是消费者在消费第一个更改之前就消费了第二个更改。这会导致数据不一致。

    重试主题真正出问题的地方。它让我们的消费者容易打乱处理事件的顺序。

    重试主题什么时候可用:

    需要明确的是,重试主题并非一直都是错误的模式。当然,它也存在一些合适的用例。具体来说,当消费者的工作是收集不可修改的记录时,这种模式就很不错。这样的例子可能包括:

    统计日志啊
    记录用户操作记录啊

    这类消费者可能会从重试主题模式中受益,同时没有数据损坏的风险。

    重试主题怎么优化:

    1:消除错误类型;
    如果我们能在生产者发送该消息的时候,就确定这个消息是正常的,可以被消费的,那么就没必要用重试主题了,直接让消费者重试就好了;
    解决方案:
    在消息的body中增加一个字段:
    isRetry:是否重试
    那么我们消费者拿到这个消息后,可以根据这个字段来判断生产者是否需要我们重试,
    当然这个是最简单的一种;
    还有根据错误类型来判断的:

    void processMessage(KafkaMessage km) {
      try {
        Message m = km.getMessage();
        transformAndSave(m);
      } catch (Throwable t) {
        if (isRecoverable(t)) {
          // ...
        } else {
          // ...
        }
      }
    }
    

    在上面的 Java 伪代码示例中,isRecoverable()将采用一种白名单方法来确定 t 是否表示可恢复错误。换句话说,它检查 t 以确定它是否与任何已知的可恢复错误(例如 SQL 连接错误或 ReST 客户端超时)相匹配,如果匹配则返回 true,否则返回 false。这样就能防止我们的消费者被不可恢复错误一直阻塞下去。

    当然这种其实也不太准,
    例如,一个 SQLException 可能指的是一次数据库故障(可恢复)或一次约束违反状况(不可恢复)。

    2:在消费者内重试可恢复错误

    可恢复错误:就是在重试一段时间后,可能成功的;
    不可恢复错误:你直接就是消息体错误,缺字段,结构错误,有特殊字符等;

    存在可恢复错误时,将消息发布到重试主题毫无意义。我们只会为下一条消息的失败扫清道路。相反,消费者可以简单地重试,直到条件恢复。

    当然,出现可恢复错误意味着外部资源存在问题。我们不断对这块资源发送请求是无济于事的。因此,我们希望对重试应用一个退避策略。我们的伪 Java 代码现在可能看起来像这样:

    void processMessage(KafkaMessage km) {
      try {
        Message m = km.getMessage();
        transformAndSave(m);
      } catch (Throwable t) {
        if (isRecoverable(t)) {
          doWithRetry(m, Backoff.EXPONENTIAL, this::transformAndSave);
        } else {
          // ...
        }
      }
    }
    

    (注意:我们使用的任何退避机制都应配置为在达到某个阈值时向我们发出警报,并通知我们潜在的严重错误)

    2:遇到不可恢复错误时,将消息直接发送到最后一个主题

    另一方面,当我们的消费者遇到不可恢复错误时,我们可能希望立即隐藏(stash)该消息,以释放后续消息。但在这里使用多个重试主题会有用吗?答案是否定的。在转到 DLQ 之前,我们的消息只会经历 n 次消费失败而已。那么,为什么不从一开始就将消息粘贴在那里呢?

    与重试主题一样,这个主题(在这里,我们将其称为隐藏主题)将拥有自己的消费者,其与主消费者保持一致。但就像 DLQ 一样,这个消费者并不总是在消费消息;它只有在我们明确需要时才会这么做。

    总结:

    不论我们是直接用消费者的重试,还是发送到重试主题,没有最好的解决方案,只有最适合自己的,需要根据自己的实际业务来选择最合适的处理方式,
    最好能加人工告警机制,【代码不够,人工来凑】,这个也是我开发中经常用来背锅的一种思路;

    更多相关内容
  • 当某些接口超时、返回的数据有问题时需要对接口进行重试,但是有的接口需要重试三次,有的需要重试两次,有的不需要重试;有的返回连接超时才重试,有的读取超时才重试,有的404才重试;有的返回-1才重试,有的返回...
  • 主要介绍了Java编程Retry重试机制实例详解,分享了相关代码示例,小编觉得还是挺不错的,具有一定借鉴价值,需要的朋友可以参考下
  • 上一篇《RocketMQ:消息重试》中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被消息。这通常是由于消费...

    1. 死信队列

    上一篇《RocketMQ:消息重试》中我们提到当一条消息消费失败时,RocketMQ会进行一定次数的重试。重试的结果也很简单,无非就是在第N次重试时,被成功消费。或者就是经过M次重试后,仍然没有被成功消费。这通常是由于消费者在正常情况下无法正确地消费该消息。此时,RocketMQ不会立即将消息丢弃,而是将其发送到该消费者对应的特殊队列中去。

    在RocketMQ中,这种正常情况下无法被消费的消息被称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

    1.1 死信特性

    (1)死信消息具有以下特性:

    • 不会再被消费者正常消费。
    • 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。

    (2)死信队列具有以下特性:

    • 一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
    • 如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
    • 一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

    1.2 查看死信消息

    (1)在控制条查询出现死信队列的主题信息
    在这里插入图片描述

    (2)在消费界面根据主题查询死信消息
    在这里插入图片描述
    (3)选择重新发送消息

    一条消息进入死信队列,意味着某些因素导致消费者无法正常消费该消息。因此,通常需要我们对其进行特殊处理。排查可疑因素并解决问题后,可以在消息队列 RocketMQ 控制台重新发送该消息,让消费者重新消费一次。

    2.重试次数参数

    RocketMQ的重试机制涉及发送端重试和消费端重试,消费端重试关联死信队列

    2.1 Producer端重试

    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败。

    这种消息失败重试我们可以手动设置发送失败重试的次数,看一下代码:

    public class DefaultMQProducer  {
    	//设置消息发送失败时的最大重试次数
    	public void setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed) {
    	   this.retryTimesWhenSendFailed = retryTimesWhenSendFailed;
    	}
    

    在这里插入图片描述

    2.2 Consumer端重试

    注:只有在消息模式为MessageModel.CLUSTERING集群模式时,Broker才会自动进行重试,广播消息是不会重试的。

    消费者消费消息后,需要给Broker返回消费状态。以MessageListenerConcurrently监听器为例,Consumer消费完成后需要返回ConsumeConcurrentlyStatus并发消费状态。查看源码,ConsumeConcurrentlyStatus是一个枚举,共有两种状态:

    public enum ConsumeConcurrentlyStatus {
       //消费成功
       ConsumeConcurrentlyStatus,
    
       //消费失败,一段时间后重试
       RECONSUME_LATER;
    }
    

    Consumer端的重试包括两种情况

    • 异常重试:由于Consumer端逻辑出现了异常,导致返回了RECONSUME_LATER状态,那么Broker就会在一段时间后尝试重试。
    • 超时重试:如果Consumer端处理时间过长,或者由于某些原因线程挂起,导致迟迟没有返回消费状态,Broker就会认为Consumer消费超时,此时会发起超时重试。

    因此,如果Consumer端正常消费成功,一定要返回ConsumeConcurrentlyStatus.ConsumeConcurrentlyStatus状态。

    3.1 异常重试

    RocketMQ可在broker.conf文件中配置Consumer端的重试次数和重试时间间隔,如下:

    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    

    默认18时间间隔,表示重试18次,可以减少配置的数量吗?

    但是在大部分情况下,如果Consumer端逻辑出现异常,重试太多次也没有很大的意义,我们可以在代码中指定最大的重试次数。如下:

    即提前返回成功状态,是假的成功,也不会进入死信队列。反之,捕获异常,每次返回RECONSUME_LATER,到18次就会进入死信队列

    package william.rmq.consumer.quickstart;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import william.rmq.common.constant.RocketMQConstant;
    
    import javax.annotation.PostConstruct;
    import java.util.List;
    
    /**
    
    * @Description:RocketMQ消息消费者
    */
    @Slf4j
    @Service
    public class MessageConsumer implements MessageListenerConcurrently {
       @Value("${spring.rocketmq.namesrvAddr}")
       private String namesrvAddr;
    
       private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
    
    
       @PostConstruct
       public void start() {
           try {
               consumer.setNamesrvAddr(namesrvAddr);
    
               //从消息队列头部开始消费
               consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
               //设置集群消费模式
               consumer.setMessageModel(MessageModel.CLUSTERING);
    
               //订阅主题
               consumer.subscribe("DefaultCluster", "*");
    
               //注册消息监听器
               consumer.registerMessageListener(this);
    
               //启动消费端
               consumer.start();
    
               log.info("Message Consumer Start...");
               System.err.println("Message Consumer Start...");
           } catch (MQClientException e) {
               log.error("Message Consumer Start Error!!",e);
           }
    
       }
    
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
    
           MessageExt message = msgs.get(0);
           try {
               //逐条消费
               String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
               System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                       message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
    
               //模拟业务异常
               int i = 1 / 0;
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           } catch (Exception e) {
               log.error("Consume Message Error!!", e);
               //抛出异常时,返回ConsumeConcurrentlyStatus.RECONSUME_LATER,尝试重试。当重试指定次数后返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS
               int reconsumeTimes = message.getReconsumeTimes();
               System.err.println("Now Retry Times: " + reconsumeTimes);
               if (reconsumeTimes >= RocketMQConstant.MAX_RETRY_TIMES) {
                   return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
               }
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
       }
    
    }
    
    

    可以看到控制台打印如下:

    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: 0A0E096CA14618B4AAC2562C6D5B0000,topic: DefaultCluster,tags: Tags,messageBody: Message-1
    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C32C0007,topic: DefaultCluster,tags: Tags,messageBody: Order-2-完成
    Now Retry Times: 3
    Now Retry Times: 3
    Message Consumer: Handle New Message: messageId: C0A81FFA7FF318B4AAC24A37C3290006,topic: DefaultCluster,tags: Tags,messageBody: Order-2-支付
    Now Retry Times: 3
    Now Retry Times: 3
    
    

    消息重试指定的次数后,就返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS不再重试了。

    3.2 超时重试

    当Consumer处理时间过长,在超时时间内没有返回给Broker消费状态,那么Broker也会自动重试

    package william.rmq.consumer.quickstart;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import org.springframework.util.CollectionUtils;
    import william.rmq.common.constant.RocketMQConstant;
    
    import javax.annotation.PostConstruct;
    import java.util.List;
    
    /**
    * @Auther: ZhangShenao
    * @Date: 2018/9/7 11:06
    * @Description:RocketMQ消息消费者
    */
    @Slf4j
    @Service
    public class MessageConsumer implements MessageListenerConcurrently {
       @Value("${spring.rocketmq.namesrvAddr}")
       private String namesrvAddr;
    
       private final DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("DefaultConsumer");
    
    
       @PostConstruct
       public void start() {
           try {
               consumer.setNamesrvAddr(namesrvAddr);
    
               //从消息队列头部开始消费
               consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
               //设置集群消费模式
               consumer.setMessageModel(MessageModel.CLUSTERING);
    
               //设置消费超时时间(分钟)
               consumer.setConsumeTimeout(RocketMQConstant.CONSUMER_TIMEOUT_MINUTES);
    
               //订阅主题
               consumer.subscribe("DefaultCluster", "*");
    
               //注册消息监听器
               consumer.registerMessageListener(this);
    
               //启动消费端
               consumer.start();
    
               log.info("Message Consumer Start...");
               System.err.println("Message Consumer Start...");
           } catch (MQClientException e) {
               log.error("Message Consumer Start Error!!",e);
           }
    
       }
    
       @Override
       public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
           if (CollectionUtils.isEmpty(msgs)) {
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           }
    
           MessageExt message = msgs.get(0);
           try {
               //逐条消费
               String messageBody = new String(message.getBody(), RemotingHelper.DEFAULT_CHARSET);
               System.err.println("Message Consumer: Handle New Message: messageId: " + message.getMsgId() + ",topic: " +
                       message.getTopic() + ",tags: " + message.getTags() + ",messageBody: " + messageBody);
    
               //模拟耗时操作2分钟,大于设置的消费超时时间
               Thread.sleep(1000L * 60 * 2);
               return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
           } catch (Exception e) {
               log.error("Consume Message Error!!", e);
               return ConsumeConcurrentlyStatus.RECONSUME_LATER;
           }
       }
    
    }
    
    

    可以看到, Thread.sleep耗时超过最大超时时间,触发失败

    参考

    RocketMQ:死信队列和消息幂等
    RocketMQ详解(12)——RocketMQ的重试机制

    展开全文
  • RabbitMQ的延时重试队列

    千次阅读 2021-11-04 17:18:06
    这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。 2.原理 图是俺在网上找的,请原作者谅解。 发送到业务队里 如果正常收到 正常运行 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入...

    在这里插入图片描述

    1.背景

    通过上文学习知道了死信队列,如果只是网络抖动,出现异常那么直接进入死信队列,那么是不合理的。这就可以使用延时重试队列,本文将介绍如何实现延时重试队列。

    2.原理

    在这里插入图片描述

    图是俺在网上找的,请原作者谅解。

    1. 发送到业务队里 如果正常收到 正常运行
    2. 如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列
    3. 如果重试次数大于3 那么进入死信队列

    3.代码实现

    1.业务队列

    这里声明业务队列与绑定关系。

    @Configuration
    public class BusinessConfig {
    
        /**
         * yewu1模块direct交换机的名字
         */
        public static final String YEWU1_EXCHANGE = "yewu1_direct_exchange";
    
        /**
         * demo业务的队列名称
         */
        public static final String YEWU1_DEMO_QUEUE = "yewu1_demo_queue";
    
        /**
         * demo业务的routekey
         */
        public static final String YEWU1_DEMO_ROUTINGKEY = "yewu1_demo_key";
    
        /**
         * 业务交换机交换机(一个项目一个业务交换机即可)
         * 1.定义direct exchange,绑定queueTest
         * 2.durable="true" rabbitmq重启的时候不需要创建新的交换机
         * 3.direct交换器相对来说比较简单,匹配规则为:如果路由键匹配,消息就被投送到相关的队列
         * fanout交换器中没有路由键的概念,他会把消息发送到所有绑定在此交换器上面的队列中。
         * topic交换器你采用模糊匹配路由键的原则进行转发消息到队列中
         */
        @Bean
        public DirectExchange yewu1Exchange() {
            DirectExchange directExchange = new DirectExchange(YEWU1_EXCHANGE, true, false);
            return directExchange;
        }
    
        /**
         * 新建队列(一个业务需要一个队列一个routekey 命名格式 项目名-业务名)
         * 1.队列名称
         * 2.durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
         * 3.exclusive 表示该消息队列是否只在当前connection生效,默认是false
         * 4.auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
         * 5.对nack或者发送超时的 发送给死信队列 args是绑定死信队列
         *
         */
        @Bean
        public Queue yewu1DemoQueue() {
            return new Queue(YEWU1_DEMO_QUEUE, true, false, false);
        }
    
        /**
         * 交换机与routekey绑定
         * 
         * @return
         */
        @Bean
        public Binding yewu1DemoBinding() {
            return BindingBuilder.bind(yewu1DemoQueue()).to(yewu1Exchange())
                .with(YEWU1_DEMO_ROUTINGKEY);
        }
    }
    

    2.延时队列

    声明延时队列与绑定关系。

    @Configuration
    public class RetryConfig {
    
        /**
         * 延时队列 交换机配置标识符(固定)
         */
        public static final String RETRY_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
    
        /**
         * 延时队列交换机绑定配置键标识符(固定)
         */
        public static final String RETRY_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";
    
        /**
         * 延时队列消息的配置超时时间枚举(固定)
         */
        public static final String RETRY_MESSAGE_TTL = "x-message-ttl";
    
        /**
         * yewu1模块延时队列交换机
         */
        public final static String YEWU1_RETRY_EXCHANGE_NAME = "yewu1_retry_exchange";
    
        /**
         * yewu1模块DEMO业务延时队列
         */
        public final static String YEWU1_DEMO_RETRY_QUEUE_NAME = "yewu1_demo_retry_queue";
    
        /**
         * yewu1模块DEMO延时队列routekey
         */
        public final static String YEWU1_DEMO_RETRY_ROUTING_KEY = "yewu1_demo_retry_key";
    
        /**
         * 延时队列交换机
         *
         * @return
         */
        @Bean
        public DirectExchange yewu1RetryExchange() {
            DirectExchange directExchange = new DirectExchange(YEWU1_RETRY_EXCHANGE_NAME, true, false);
            return directExchange;
        }
    
        /**
         * 新建延时队列 一个业务队列需要一个延时队列
         * 
         * @return
         */
        @Bean
        public Queue yewu1DemoRetryQueue() {
            Map<String, Object> args = new ConcurrentHashMap<>(3);
            // 将消息重新投递到业务交换机Exchange中
            args.put(RETRY_LETTER_QUEUE_KEY, BusinessConfig.YEWU1_EXCHANGE);
            args.put(RETRY_LETTER_ROUTING_KEY, BusinessConfig.YEWU1_DEMO_ROUTINGKEY);
            // 消息在队列中延迟3s后超时,消息会重新投递到x-dead-letter-exchage对应的队列中,routingkey为自己指定
            args.put(RETRY_MESSAGE_TTL, 3 * 1000);
            return new Queue(YEWU1_DEMO_RETRY_QUEUE_NAME, true, false, false, args);
        }
    
        /**
         * 绑定以上定义关系
         * 
         * @return
         */
        @Bean
        public Binding retryDirectBinding() {
            return BindingBuilder.bind(yewu1DemoRetryQueue()).to(yewu1RetryExchange())
                .with(YEWU1_DEMO_RETRY_ROUTING_KEY);
        }
    
    }
    

    3.死信队列

    声明私信队列与绑定关系。

    @Configuration
    public class DeadConfig {
    
        /**
         * 死信队列
         */
        public final static String FAIL_QUEUE_NAME = "fail_queue";
    
        /**
         * 死信交换机
         */
        public final static String FAIL_EXCHANGE_NAME = "fail_exchange";
    
        /**
         * 死信routing
         */
        public final static String FAIL_ROUTING_KEY = "fail_routing";
    
        /**
         * 创建配置死信队列
         *
         */
        @Bean
        public Queue deadQueue() {
            return new Queue(FAIL_QUEUE_NAME, true, false, false);
        }
    
        /**
         * 死信交换机
         *
         * @return
         */
        @Bean
        public DirectExchange deadExchange() {
            DirectExchange directExchange = new DirectExchange(FAIL_EXCHANGE_NAME, true, false);
            return directExchange;
        }
    
        /**
         * 绑定关系
         * 
         * @return
         */
        @Bean
        public Binding failBinding() {
            return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(FAIL_ROUTING_KEY);
        }
    
    }
    

    4.生产者

    生产者如上文,通用代码。

    @RestController
    @RequestMapping("/TestRabbit")
    public class ProducerDemo {
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        //@RequestMapping("/sendDirect")
        String sendDirect(@RequestBody String message) throws Exception {
            System.out.println("开始生产");
            CorrelationData data = new CorrelationData(UUID.randomUUID().toString());
            rabbitTemplate.convertAndSend(BusinessConfig.YEWU1_EXCHANGE, BusinessConfig.YEWU1_DEMO_ROUTINGKEY,
                message, data);
            System.out.println("结束生产");
            System.out.println("发送id:" + data);
            return "OK,sendDirect:" + message;
        }
    }
    

    5.消费者

    大量的逻辑,请参考注释。

    public enum RabbitEnum {
     
        /**
         * 处理成功
         */
        ACCEPT,
     
        /**
         * 可以重试的错误
         */
        RETRY,
     
        /**
         * 无需重试的错误
         */
        REJECT
    @Component
    public class ConsumerDemo {
    
        private final static Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    
        @Resource
        private RabbitTemplate rabbitTemplate;
    
        // @RabbitListener(queues = "yewu1_demo_queue")
        protected void consumer(Message message, Channel channel) throws Exception {
            RabbitEnum ackSign = RabbitEnum.RETRY;
            System.out.println(message.getMessageProperties().getCorrelationId());
            try {
                // 可以加入重复消费判断
                int i = 1 / 0;
    
            } catch (Exception e) {
                ackSign = RabbitEnum.RETRY;
                throw e;
            } finally {
                // 通过finally块来保证Ack/Nack会且只会执行一次
                if (ackSign == RabbitEnum.ACCEPT) {
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } else if (ackSign == RabbitEnum.RETRY) {
                    String correlationData =
                        (String)message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
                    System.out.println(message.getMessageProperties().getCorrelationId());
                    long retryCount = getRetryCount(message.getMessageProperties());
                    if (retryCount >= 3) {
                        // 重试次数超过3次,则将消息发送到失败队列等待特定消费者处理或者人工处理
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            rabbitTemplate.convertAndSend(DeadConfig.FAIL_EXCHANGE_NAME, DeadConfig.FAIL_ROUTING_KEY,
                                message, new CorrelationData(correlationData));
                            logger.info("连续失败三次,将消息发送到死信队列,发送消息:" + new String(message.getBody()));
                        } catch (Exception e1) {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                            logger.error("发送死信队列报错:" + e1.getMessage() + ",原始消息:" + new String(message.getBody()));
                        }
                    } else {
                        try {
                            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                            // 重试次数不超过3次,则将消息发送到重试队列等待重新被消费
                            rabbitTemplate.convertAndSend(RetryConfig.YEWU1_RETRY_EXCHANGE_NAME,
                                RetryConfig.YEWU1_DEMO_RETRY_ROUTING_KEY, message,
                                new CorrelationData(correlationData));
                            logger.info("消费失败,消息发送到重试队列;" + "原始消息:" + new String(message.getBody()) + ";第"
                                + (retryCount + 1) + "次重试");
                        } catch (Exception e1) {
                            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                            logger.error("消息发送到重试队列的时候,异常了:" + e1.getMessage() + ",重新发送消息");
                        }
                    }
                }
            }
        }
    
      
    
        /**
         * 获取消息被重试的次数
         */
        public long getRetryCount(MessageProperties messageProperties) {
            Long retryCount = 0L;
            if (null != messageProperties) {
                List<Map<String, ?>> deaths = messageProperties.getXDeathHeader();
                if (deaths != null && deaths.size() > 0) {
                    Map<String, Object> death = (Map<String, Object>)deaths.get(0);
                    retryCount = (Long)death.get("count");
                }
            }
            return retryCount;
        }
    }
    

    参考:https://www.cnblogs.com/mfrank/p/11260355.html

    展开全文
  • 【SpringBoot】spring-retry(重试机制)

    千次阅读 2020-11-26 18:51:07
    在 Spring Retry需要指定触发重试的异常类型,并设置每次重试的间隔以及如果重试失败是继续重试还是熔断(停止重试)。 对于重试是有场景限制的,不是什么场景都适合重试,比如参数校验不合法、写操作等(要考虑写...

    retry: 英/ˌriːˈtraɪ//ˌriːˈtraɪ/

    概述

    github地址

    官网地址

    • 在调用第三方接口或者使用Mq时,会出现网络抖动,连接超时等网络异常,所以需要重试。

      网络抖动:标识一个网络的稳定性。抖动越小,网络越稳定。

    • Spring Retry是从Spring Batch 2.2.0版本独立出来的一个功能,主要实现了重试和熔断

    • 在 Spring Retry需要指定触发重试的异常类型,并设置每次重试的间隔以及如果重试失败是继续重试还是熔断(停止重试)。

      对于重试是有场景限制的,不是什么场景都适合重试,比如参数校验不合法、写操作等(要考虑写是否幂等)都不适合重试。远程调用超时、网络突然中断可以重试

      • 在微服务治理框架中,通常都有自己的重试与超时配置,比如dubbo可以设置retries=1,timeout=500调用失败只重试1次,超过500ms调用仍未返回则调用失败。

    一.简单实现重试

    1. 异常捕获
    2. 循环重试(包括:重试次数,重试间隔)
    
    public class TestRetry {
        //最大重试次数
        private static final Integer tryTimes = 6;
        //重试间隔时间单位秒
        private static final Integer intervalTime = 1;
     
        public static void main(String[] args) throws InterruptedException {
            boolean flag = TestRetry.retryBuss();
            System.out.println("最终执行结果:" + (flag ? "成功" : "失败"));
        }
     
        public static boolean retryBuss() throws InterruptedException {
            Integer retryNum = 1;
            boolean flag = false;
            while (retryNum <= tryTimes) {
                try {
                    flag = execute(retryNum);
                    if (flag) {
                        System.out.println("第" + retryNum + "次执行成功!!!");
                        break;
                    }
                    System.err.println("第" + retryNum + "次执行失败...");
                    retryNum++;
                } catch (Exception e) {
                    retryNum++;
                    TimeUnit.SECONDS.sleep(intervalTime);
                    continue;
                }
            }
     
            return flag;
        }
     
        /**
         * 具体业务
         * @param retryNum
         * @return
         */
        private static boolean execute(int retryNum) {
            Random random = new Random();
            int a = random.nextInt(10);
            boolean flag = true;
            try {
                if (a != 6) {
                    flag = false;
                    throw new RuntimeException();
                }
            } catch (Exception e) {//这里捕获异常只是为了能,返回flag的结果
            }
            return flag;
        }
    } 
    

    失败情况
    在这里插入图片描述

    成功情况
    在这里插入图片描述

    二.声明式使用Spring-Retry

    2.1.如何使用

    1.引入依赖

    		<dependency>
                <groupId>org.springframework.retry</groupId>
                <artifactId>spring-retry</artifactId>
            </dependency>
            <dependency><!--如果其他的依赖已经引入了,可以不加-->
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
            </dependency>
    

    2.配置类

    • 要使用@EnabelRetry开启重试才行,写在配置类或者启动类上都是可以的
    @Configuration
    @EnableRetry
    public class RetryConfiguration {
        @Bean
        public PayService payService() {
            return new PayService();
        }
    }
    

    3.实现类

    • 服务类一般写主要逻辑,在需要重试的方法上面使用@Retryable
    //@Service
    @Slf4j
    public class PayService {
        private final int totalNum = 100000;
        
        @Retryable(value = {Exception.class}, maxAttempts = 3, backoff = @Backoff(delay = 2000, multiplier = 1.5))
        public int minGoodsNum(int num) {
            log.info("减库存开始=>" + LocalTime.now());
            log.info("库存=>" + totalNum);
            if (num <= 0) {
                throw new IllegalArgumentException("数量不对");
            }
            log.info("减库存执行结束=>" + LocalTime.now());
            return totalNum - num;
        }
    
        /**
         * 使用@Recover注解,当重试次数达到设置的次数的时候,还是失败抛出异常,执行的回调函数。
         */
        @Recover
        public int recover(Exception e) {
            log.warn("减库存失败!!!" + LocalTime.now());
            //记日志到数据库
            return totalNum;
        }
    }
    

    测试代码

    @RunWith(SpringRunner.class)
    @SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
    public class RetryTest {
        @Autowired
        private PayService payService;
    
        /**
         * 应该使用 org.junit.Test 才是正确的
         */
        @Test
        public void minGoodsNum() {
            payService.minGoodsNum(-1);
        }
    }
    
    

    结果
    在这里插入图片描述

    @Retryable修饰的minGoodsNum()方法,如果调用期间报了异常,那么它将进行重试3次(默认三次),如果超过最大重试次数,则执行@Retryable修饰的recover()方法,

    @Retryable注解有各种配置,用于包含和排除异常类型、限制重试次数和回退策略。

    2.2.常用注解

    @EnableRetry:启用重试,proxyTargetClass属性为true时(默认false),使用CGLIB代理

    @Retryable:标记当前方法会使用重试机制

    • value:指定抛出那些异常才会触发重试(可以配置多个异常类型) 默认为空
    • include:就是value默认为空,当exclude也为空时,默认所有异常都可以触发重试
    • exclude:指定那些异常不触发重试(可以配置多个异常类型),默认为空
    • maxAttempts:最大重试次数,默认3次包括第一次调用)
    • backoff:重试等待策略 默认使用@Backoff注解
      在这里插入图片描述

    @Backoff:重试回退策略(立即重试还是等待一会再重试)

    • value: 重试的间隔时间默认为1000L,我们设置为2000L
    • delay:重试的间隔时间,就是value
    • maxDelay:重试次数之间的最大时间间隔,默认为0,如果小于delay的设置,则默认为30000L
    • multiplier:delay时间的间隔倍数,默认为0,表示固定暂停1秒后进行重试,如果把multiplier设置为1.5,则第一次重试为2秒,第二次为3秒,第三次为4.5秒。
      .
    1. 不设置参数时,默认使用FixedBackOffPolicy(固定时间等待策略),重试等待1000ms
    2. 只设置delay时,使用FixedBackOffPolicy,重试等待指定的毫秒数
    3. 当设置delay和maxDealy时,重试等待在这两个值之间均态分布
    4. 设置delay,maxDealy和multiplier时,使用ExponentialBackOffPolicy(倍数等待策略)
    5. 当设置multiplier不等于0时,同时也设置了random时,使用ExponentialRandomBackOffPolicy(随机倍数等待策略)

    @Recover标记方法为@Retryable失败时的“兜底”处理方法

    • 传参与@Retryable的配置的value必须一样
    • @Recover标记方法必须要与@Retryable注解的方法“形参”保持一致第一入参为要重试的异常(一定要是@Retryable方法里抛出的异常或者异常父类)其他参数与@Retryable保持一致,返回值也要一样,否则无法执行!

    @CircuitBreaker:用于标记方法,实现熔断模式

    • include 指定处理的异常类。默认为空
    • exclude指定不需要处理的异常。默认为空
    • vaue指定要重试的异常。默认为空
    • maxAttempts 最大重试次数。默认3次
    • openTimeout 配置熔断器打开的超时时间,默认5s,当超过openTimeout之后熔断器电路变成半打开状态(只要有一次重试成功,则闭合电路)
    • resetTimeout 配置熔断器重新闭合的超时时间,默认20s,超过这个时间断路器关闭

    2.3.注意事项

    • 使用了@Retryable注解的方法直接实例化调用不会触发重试,要先将实现类实例化到Spring容器中,然后通过注入等方式使用

    • Spring-Retry是通过捕获异常的方式来触发重试的,@Retryable标注方法产生的异常不能使用try-catch捕获,要在方法上抛出异常,不然不会触发重试

    重试原则

    1. 查询可以进行重试
    2. 写操作要慎重,除非业务方支持重入

    三.编程式使用Spring-Retry

    3.1.核心类

    • RetryOperations : 定义了“重试”的基本框架(模板),要求传入RetryCallback,可选传入RecoveryCallback;
    • RetryCallback: 封装你需要重试的业务逻辑;
    • RecoverCallback:封装在多次重试都失败后"兜底"的业务逻辑;
    • RetryTemplate: RetryOperations的具体实现,组合了RetryListener[],BackOffPolicy,RetryPolicy。
    • RetryContext: 重试下的上下文,可用于在多次Retry或者Retry 和Recover之间传递参数或状态;
    • RetryPolicy : 重试的策略,可以固定次数的重试,也可以是指定超时时间进行重试;
    • BackOffPolicy: 重试的等待策略,在业务逻辑执行发生异常时。如果需要重试,我们可能需要等一段时间(可能服务器过于繁忙,如果一直不间隔重试可能拖垮服务器),当然这段时间可以是 0,也可以是固定的,可以是随机的
    • RetryListener:典型的“监听者”,在重试的不同阶段通知“监听者”;

    3.2.RetryOperations

    RetryOperations是重试的顶级接口:

    • RetryOperations: 统一定义了重试的API( 定义了实现“重试”的基本框架(模板))
    • RetryTemplate: 是RetryOperations模板模式实现,实现了重试和熔断etryTemplate将重试、提供健壮和不易出错的API供大家使用。

    提供的API如下:
    在这里插入图片描述

    public interface RetryOperations {
        <T, E extends Throwable> T execute(RetryCallback<T, E> var1) throws E;
    
        <T, E extends Throwable> T execute(RetryCallback<T, E> var1, RecoveryCallback<T> var2) throws E;
    
        <T, E extends Throwable> T execute(RetryCallback<T, E> var1, RetryState var2) throws E, ExhaustedRetryException;
    
        <T, E extends Throwable> T execute(RetryCallback<T, E> var1, RecoveryCallback<T> var2, RetryState var3) throws E;
    }
    
    • RetryCallback: 编写需要执行重试的业务逻辑,定义好业务逻辑后,就是如何重试的问题了。
    • RetryTemplate: 通过设置不同的重试策略来控制如何进行重试。默认的重试策略是SimpleRetryPlicy(会重试3次)

      第1次重试如果成功后面就不会继续重试了。如果3次都重试失败了流程结束或者返回兜底结果。而返回兜底结果需要配置RecoveyCallBack

    • RecoveyCallBack:从名字可以看出这是一个兜底回调接口,也就是重试失败后执行的逻辑。

      当重试超过最大重试时间或最大重试次数后可以调用RecoveryCallback进行恢复,比如返回假数据或托底数据。

    3.3.RetryPolicy(重试策略)

    RetryPolicy: 重试策略的顶级接口
    在这里插入图片描述

    public interface RetryPolicy extends Serializable {
        boolean canRetry(RetryContext var1);
    
        RetryContext open(RetryContext var1);
    
        void close(RetryContext var1);
    
        void registerThrowable(RetryContext var1, Throwable var2);
    }
    

    方法说明:

    • canRetry:在每次重试的时候调用,是否可以继续重试的判断条件
    • open重试开始前调用,会创建一个重试上下文到RetryContext保存重试的堆栈等信息
    • registerThrowable每次重试异常时调用(有异常会继续重试)

    以 SimpleRetryPolicy为例,当重试次数达到3(默认3次)停止重试,重试次数保存在重试上下文中

    RetryPolicy提供了如下策略实现:

    • NeverRetryPolicy:只允许调用RetryCallback一次,不允许重试
    • AlwaysRetryPolicy:允许无限重试,直到成功,此方式逻辑不当会导致死循环
    • SimpleRetryPolicy固定次数重试策略,默认重试最大次数为3次RetryTemplate默认使用的策略
    • TimeoutRetryPolicy超时时间重试策略,默认超时时间为1秒,在指定的超时时间内允许重试
    • ExceptionClassifierRetryPolicy:设置不同异常的重试策略,类似组合重试策略,区别在于这里只区分不同异常的重试
    • CircuitBreakerRetryPolicy:有熔断功能的重试策略,需设置3个参数openTimeout、resetTimeout和delegate

      在这里插入图片描述

      • delegate:真正执行的重试策略,由构造方法传入,当重试失败时,则执行熔断策略,默认SimpleRetryPolicy策略
      • openTimeout:openWindow,熔断器电路打开的超时时间,当超过openTimeout之后熔断器电路变成半打开状态(主要有一次重试成功,则闭合电路),默认5000毫秒
      • resetTimeout:timeout,重置熔断器重新闭合的超时时间。默认20000毫秒
    • CompositeRetryPolicy组合重试策略,有两种组合方式

      乐观组合重试策略:指只要有一个策略允许重试即可以
      悲观组合重试策略:指只要有一个策略不允许重试即可以

      • 但不管哪种组合方式,组合中的每一个策略都会执行

    3.4.BackOffPolicy(重试回退(等待)策略)

    回退策略: 指的是每次重试是立即重试还是等待一段时间后重试默认情况下是立即重试,如果需要配置等待一段时间后重试则需要指定回退策略

    比如是网络错误,立即重试将导致立即失败,最好等待一小段时间后重试,还要防止很多服务同时重试导致DDos。

    • BackOffPolicy是回退策略的顶级接口

    BackOffPolicy 提供了如下策略实现
    在这里插入图片描述

    • NoBackOffPolicy无等待策略,每次重试时立即重试

    • FixedBackOffPolicy固定时间的等待策略

      需设置参数sleeper和backOffPeriod

      • sleeper指定等待策略,默认是Thread.sleep,即线程休眠
      • backOffPeriod指定休眠时间,默认1000毫秒
    • UniformRandomBackOffPolicy随机时间回退策略

      需设置sleeper、minBackOffPeriod和maxBackOffPeriod
      该策略在[minBackOffPeriod,maxBackOffPeriod之间取一个随机休眠时间。

      • sleeper指定等待策略,默认是Thread.sleep,即线程休眠
      • minBackOffPeriod 默认500毫秒
      • maxBackOffPeriod 默认1500毫秒
    • ExponentialBackOffPolicy倍数等待策略

      需设置参数sleeper、initialInterval、maxInterval和multiplier

      • sleeper指定等待策略,默认是Thread.sleep,即线程休眠
      • initialInterval指定初始休眠时间,默认100毫秒
      • maxInterval指定最大休眠时间,默认30秒
      • multiplier指定乘数,即下一次休眠时间为 当前休眠时间*multiplier,之前说过固定倍数可能会引起很多服务同时重试导致DDos,使用随机休眠时间来避免这种情况。
    • ExponentialRandomBackOffPolicy随机倍数等待策略,引入随机乘数可以实现随机乘数回退

    3.5.RetryTemplate主要流程实现源码

    RetryTemplate类是对 RetryOperations的具体实现,组合了RetryListener[],BackOffPolicy,RetryPolicy

    3.5.1.RetryTemplate.doExecute()方法

    protected <T, E extends Throwable> T doExecute(RetryCallback<T, E> retryCallback,
          RecoveryCallback<T> recoveryCallback, RetryState state)
          throws E, ExhaustedRetryException {
       //重试策略
       RetryPolicy retryPolicy = this.retryPolicy;
       //退避策略
       BackOffPolicy backOffPolicy = this.backOffPolicy;
       //重试上下文,当前重试次数等都记录在上下文中
       RetryContext context = open(retryPolicy, state);
       try {
          //拦截器模式,执行RetryListener#open
          boolean running = doOpenInterceptors(retryCallback, context);
          //判断是否可以重试执行
          while (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
             try {//执行RetryCallback回调
                return retryCallback.doWithRetry(context);
             } catch (Throwable e) {//异常时,要进行下一次重试准备
                //遇到异常后,注册该异常的失败次数
                registerThrowable(retryPolicy, state, context, e);
                //执行RetryListener#onError
                doOnErrorInterceptors(retryCallback, context, e);
                //如果可以重试,执行退避算法,比如休眠一小段时间后再重试
                if (canRetry(retryPolicy, context) && !context.isExhaustedOnly()) {
                   backOffPolicy.backOff(backOffContext);
                }
                //在有状态重试时,如果是需要执行回滚操作的异常,则立即抛出异常
                //shouldRethrow方法只有一行代码: state != null && state.rollbackFor(context.getLastThrowable())
                if (shouldRethrow(retryPolicy, context, state)) {
                   throw RetryTemplate.<E>wrapIfNecessary(e);
                }
             }
             //如果是有状态重试,且有GLOBAL_STATE属性,则立即跳出重试终止;当抛出的异常是非需要执行回滚操作的异常时,
             //才会执行到此处,CircuitBreakerRetryPolicy会在此跳出循环;
             if (state != null && context.hasAttribute(GLOBAL_STATE)) {
                break;
             }
          }
          //重试失败后,如果有RecoveryCallback,则执行此回调,否则抛出异常
          return handleRetryExhausted(recoveryCallback, context, state);
       } catch (Throwable e) {
          throw RetryTemplate.<E>wrapIfNecessary(e);
       } finally {
          //清理环境
          close(retryPolicy, context, state, lastException == null || exhausted);
          //执行RetryListener#close,比如统计重试信息
          doCloseInterceptors(retryCallback, context, lastException);
       }
    }
    

    3.5.2.如何使用RetryTemplate

       @Test
        public void testSimple() throws Exception {
            @Data
            class Foo {
                private String id;
            }
    
            RetryTemplate template = new RetryTemplate();
            //超时时间重试策略,默认超时时间为1秒,在指定的超时时间内允许重试
            TimeoutRetryPolicy policy = new TimeoutRetryPolicy();
            policy.setTimeout(3000L);
    
            template.setRetryPolicy(policy);
            Foo result = template.execute(
                    //可能触发重试的业务逻辑
                    new RetryCallback<Foo, Exception>() {
                        @Override
                        public Foo doWithRetry(RetryContext context) {
                            try {
                                System.out.println("调用百度接口。。。。");
                                TimeUnit.MILLISECONDS.sleep(500);
                                throw new RuntimeException("调用百度接口超时");
                            } catch (InterruptedException e) {
                                //e.printStackTrace();
                            }
                            return new Foo();
                        }
    
                    },
                    //重试耗尽后的回调,配置了RecoveryCallback,超出重试次数后不会抛出异常,而是执行回调里的代码
                    new RecoveryCallback<Foo>() {//可以没有RecoveryCallback,超出重试次数后依然会抛出异常
                        @Override
                        public Foo recover(RetryContext context) throws Exception {
                            System.out.println("调用百度接口。。。recover");
                            return new Foo();
                        }
                    });
    
            System.out.println("result=>" + result);
        }
    
    

    我们模拟调用百度并将结果返回给用户,如果该调用失败,则重试该调用,直到达到超时为止。

    如果不使用RecoveryCallback,当重试失败后,当重试失败后,抛出异常
    在这里插入图片描述
    如果使用RecoveryCallback,,当重试失败后,执行RecoveryCallback
    在这里插入图片描述

    3.6.状态重试 OR 无状态重试

    3.6.1.无状态重试

    无状态重试: 是在一个循环中执行完重试策略,即重试上下文保持在一个线程上下文中,在一次调用中进行完整的重试策略判断。

    非常简单的情况,如远程调用某个查询方法时是最常见的无状态重试。 SimpleRetryPolicy就属于无状态重试,因为重试是在一个循环中完成的。

        public static void main(String[] args) {
            RetryTemplate template = new RetryTemplate();
            //重试策略:次数重试策略
            RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
            template.setRetryPolicy(retryPolicy);
    
            //退避策略:倍数回退策略
            ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
            backOffPolicy.setInitialInterval(100);
            backOffPolicy.setMaxInterval(3000);
            backOffPolicy.setMultiplier(2);
            backOffPolicy.setSleeper(new ThreadWaitSleeper());
            template.setBackOffPolicy(backOffPolicy);
    
    
            String resul = template.execute(
                    new RetryCallback<String, RuntimeException>() {
                        @Override
                        public String doWithRetry(RetryContext context) throws RuntimeException {
                            System.out.println("retry count:" + context.getRetryCount());
                            throw new RuntimeException("timeout");
                        }
                    },
                    new RecoveryCallback<String>() {
                        @Override
                        public String recover(RetryContext context) throws Exception {
                            return "default";
                        }
                    });
            System.out.println("result=>" + result);
        }
    

    在这里插入图片描述

    3.6.2.有状态重试(回滚/熔断)

    有状态重试:就是不在一个线程上下文完成重试,有2种场景需要使用有状态重试,事务操作需要回滚或者熔断器模式

    • 在事务操作需要回滚场景时,当整个操作中抛出的是数据库异常DataAccessException,则不能进行重试需要回滚,而抛出其他异常则可以进行重试,可以通过RetryState实现:
     @Test
        public void retryState () {
            RetryTemplate template = new RetryTemplate();
            //重试策略:次数重试策略
            RetryPolicy retryPolicy = new SimpleRetryPolicy(3);
            template.setRetryPolicy(retryPolicy);
    
            //当前状态的名称,当把状态放入缓存时,通过该key查询获取
            Object key = "mykey";
            //是否每次都重新生成上下文还是从缓存中查询,即全局模式(如熔断器策略时从缓存中查询)
            boolean isForceRefresh = true;
            //对DataAccessException进行回滚
            BinaryExceptionClassifier rollbackClassifier = new BinaryExceptionClassifier(Collections.<Class<? extends Throwable>>singleton(DataAccessException.class));
    
            RetryState state = new DefaultRetryState(key, isForceRefresh, rollbackClassifier);
    
            String result = template.execute(
                    new RetryCallback<String, RuntimeException>() {
                        @Override
                        public String doWithRetry(RetryContext context) throws RuntimeException {
                            System.out.println("retry count:" + context.getRetryCount());
                            throw new TypeMismatchDataAccessException("");
                        }
                    },
                    new RecoveryCallback<String>() {
                        @Override
                        public String recover(RetryContext context) throws Exception {
                            System.out.println("recovery count:" + context.getRetryCount());
                            return "default";
                        }
                    }, state);
        }
    

    执行结果:报异常,没有进RecoveryCallback中

    在这里插入图片描述
    RetryTemplate中在有状态重试时,执行RecoveryCallback报异常

    (截图为 RetryTemplate主要流程实现doExecute方法的源码)
    在这里插入图片描述

    什么是熔断?

    • 熔断的意思是指 不在当前循环中处理重试,而是全局重试模式(不是线程上下文)。
    • 熔断会跳出循环,那么必然会丢失线程上下文的堆栈信息。那么肯定需要一种“全局模式”保存这种信息,目前的实现放在一个缓存map实现的,下次从缓存中获取就能继续重试了。

    熔断器场景

    • 在有状态重试时,且是全局模式时,跳出重试循环

    测试代码

    @Test
        public void circuitBreakerRetryPolicy () {
            RetryTemplate template = new RetryTemplate();
            //传入RetryPolicy(每个RetryPolicy实现都有自己的重试策略实现),是真正判断是否重试的策略,当重试失败时,则执行熔断策略;
            CircuitBreakerRetryPolicy retryPolicy = new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(3));
            //熔断器电路打开的超时时间
            retryPolicy.setOpenTimeout(5000);
            //重置熔断器重新闭合的超时时间
            retryPolicy.setResetTimeout(20000);
            template.setRetryPolicy(retryPolicy);
    
            for (int i = 0; i < 10; i++) {
                try {
                    Object key = "circuit";
                    boolean isForceRefresh = false;
                    RetryState state = new DefaultRetryState(key, isForceRefresh);
    
                    String result = template.execute(
                    		//重试逻辑
                            new RetryCallback<String, RuntimeException>() {
                                @Override
                                public String doWithRetry(RetryContext context) throws RuntimeException {
                                    System.out.println("retry count:" + context.getRetryCount());
                                    throw new RuntimeException("timeout");
                                }
                            },
                            //重试失败兜底
                            new RecoveryCallback<String>() {
                                @Override
                                public String recover(RetryContext context) throws Exception {
                                    return "default";
                                }
                            }, state);
                    System.out.println(result);
                } catch (Exception e) {
                    System.out.println("catch=>" + e.getMessage());
                }
            }
        }
    

    执行结果
    在这里插入图片描述
    为什么说是全局模式呢?

    • 我们配置了isForceRefresh为false,则在获取上下文时是根据key “circuit”从缓存中获取,从而拿到同一个上下文
      在这里插入图片描述
      如下RetryTemplate源码说明在有状态模式下,不会在循环中进行重试,会跳出循环
      在这里插入图片描述

    熔断器策略CircuitBreakerRetryPolicy需要配置三个参数:

    • delegate:当前重试策略,由构造方法传入,当重试失败时,则执行熔断策略
    • openTimeout:熔断器电`路打开的超时时间,当超过openTimeout之后熔断器电路变成半打开状态(主要有一次重试成功,则闭合电路)

      源码的openWindow属性

    • resetTimeout:重置熔断器重新闭合的超时时间。

      源码的timeOut属性

    CircuitBreakerRetryPolicy.isOpen()源码

    public boolean isOpen() {
       long time = System.currentTimeMillis() - this.start;
       boolean retryable = this.policy.canRetry(this.context);
       if (!retryable) {//重试失败
          //在重置熔断器超时后,熔断器器电路闭合,重置上下文
          if (time > this.timeout) {
             this.context = createDelegateContext(policy, getParent());
             this.start = System.currentTimeMillis();
             retryable = this.policy.canRetry(this.context);
          } else if (time < this.openWindow) {
             //当在熔断器打开状态时,熔断器电路打开,立即熔断
             if ((Boolean) getAttribute(CIRCUIT_OPEN) == false) {
                setAttribute(CIRCUIT_OPEN, true);
             }
             this.start = System.currentTimeMillis();
             return true;
          }
       } else {//重试成功
          //在熔断器电路半打开状态时,断路器电路闭合,重置上下文
          if (time > this.openWindow) {
             this.start = System.currentTimeMillis();
             this.context = createDelegateContext(policy, getParent());
          }
       }
       setAttribute(CIRCUIT_OPEN, !retryable);
       return !retryable;
    }
    

    从如上代码可看出spring-retry的熔断策略相对简单:

    • 当重试失败,且在熔断器打开时间窗口[0,openWindow) 内,立即熔断;
    • 当重试失败,且在指定超时时间后(>timeout),熔断器电路重新闭合;
    • 在熔断器半打开状态[openWindow, timeout] 时,只要重试成功则重置上下文,断路器闭合。

    使用CircuitBreakerRetryPolicy注意事项

    • CircuitBreakerRetryPolicy的delegate(重试策略)应该配置基于次数的SimpleRetryPolicy或者基于超时的TimeoutRetryPolicy策略,且策略都是全局模式,而非局部模式,所以要注意次数或超时的配置合理性

    比如SimpleRetryPolicy配置为3次,openWindow=5s,timeout=20s,我们来CircuitBreakerRetryPolicy的极端情况。

    3.6.3.通过RetryListener实现拦截器模式

    Spring-retry通过RetryListener实现拦截器模式,默认提供了StatisticsListener实现重试操作统计分析数据

    • 要给RetryCallback定义一个name如“method.key”,从而查询该RetryCallback的统计分析数据
        @Test
        public void analyses() {
            RetryTemplate template = new RetryTemplate();
            DefaultStatisticsRepository repository = new DefaultStatisticsRepository();
            StatisticsListener listener = new StatisticsListener(repository);
            template.setListeners(new RetryListener[]{listener});
         
            for (int i = 0; i < 10; i++) {
                String result = template.execute(new RetryCallback<String, RuntimeException>() {
                    @Override
                    public String doWithRetry(RetryContext context) throws RuntimeException {
                        context.setAttribute(RetryContext.NAME, "method.key");
                        return "ok";
                    }
                });
            }
            
            RetryStatistics statistics = repository.findOne("method.key");
            System.out.println(statistics);
        }
    

    执行结果
    在这里插入图片描述

    3.7.案例实践

    场景描述: num作为计数器,如果num小于5则抛出异常,num会进行自增一操作,直到等于5方正常返回,否则根据重试策略进行重试操作,如果直到最后一直未重试成功,则返回Integer最大值。在重试上下文中添加一个value变量,后续通过其值实现根据返回值判断重试应用。最后打印最终的返回值。
    公共方法

    3.7.0.公用代码

    下面案例中调用的run方法就是下面这个

        /**
         * 运行重试方法
         *
         * @param retryTemplate
         * @throws Exception
         */
        public void run(RetryTemplate retryTemplate) throws Exception {
            Integer result = retryTemplate.execute(
                    new RetryCallback<Integer, Exception>() {
                        int i = 0;
    
                        // 重试操作
                        @Override
                        public Integer doWithRetry(RetryContext retryContext) throws Exception {
                            retryContext.setAttribute("value", i);
                            log.info("重试 {} 次.", retryContext.getRetryCount());
                            return checkLen(i++);
                        }
                    },
                    new RecoveryCallback<Integer>() {//兜底回调
                        @Override
                        public Integer recover(RetryContext retryContext) throws Exception {
                            log.info("重试{}次后,调用兜底方法", retryContext.getRetryCount());
                            return Integer.MAX_VALUE;
                        }
    
                    });
    
            log.info("最终结果: {}", result);
        }
    
    
        /**
         * 根据i判断是否抛出异常
         *
         * @param num
         * @return
         * @throws Exception
         */
        public int checkLen(int num) throws Exception {
            //小于5抛出异常
            if (num < 5) throw new Exception(num + " le 5");
            //否则正常返回
            return num;
        }
    

    3.7.1.SimpleRetryPolicy固定次数重试策略

     @Test
        public void retryFixTimes() throws Exception {
            //重试模板
            RetryTemplate retryTemplate = new RetryTemplate();
            //简单重试策略
            SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
            //最大重试次数3次
            simpleRetryPolicy.setMaxAttempts(3);
            //模板设置重试策略
            retryTemplate.setRetryPolicy(simpleRetryPolicy);
            //开始执行- 超过3次最大重试次数,触发了recoveryCall,并返回Integer最大值。
            run(retryTemplate);
        }
    

    执行结果
    在这里插入图片描述
    超过3次最大重试次数,触发了recoveryCall,并返回Integer最大值

    3.7.2.AlwaysRetryPolicy无限重试策略

    AlwaysRetryPolicy 允许无限重试,直到成功,此方式逻辑不当会导致死循环

        @Test
        public void retryAlwaysTimes() throws Exception {
            //重试模板
            RetryTemplate retryTemplate = new RetryTemplate();
            //设置为无限重试策略
            retryTemplate.setRetryPolicy(new AlwaysRetryPolicy());
            //开始执行-直到i等于5则正常返回,之前将实现无限重试。
            run(retryTemplate);
        }
    

    执行结果
    在这里插入图片描述
    直到i等于5则正常返回,之前将实现无限重试。

    3.7.3.TimeoutRetryPolicy超时时间重试策略

    @Test
        public void retryTimeout() throws Exception {
            //重试模板
            RetryTemplate retryTemplate = new RetryTemplate();
    
            TimeoutRetryPolicy timeoutRetryPolicy = new TimeoutRetryPolicy();
            //超时时间为 1000 毫秒
            timeoutRetryPolicy.setTimeout(1000);
    
            //固定时间的回退策略,需设置参数sleeper和backOffPeriod,sleeper指定等待策略,默认是Thread.sleep,即线程休眠,backOffPeriod指定休眠时间,默认1秒,单位毫秒
            FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
            //休眠时间400毫秒
            fixedBackOffPolicy.setBackOffPeriod(400);
    
            //设置为超时时间重试策略
            retryTemplate.setRetryPolicy(timeoutRetryPolicy);
            //设置为固定时间的回退策略
            retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    
            //开始执行-设定1000ms后则认定为超时,每次重试等待时长400ms,故重试3次后即会超出超时阈值,触发RecoveryCallback回调,并返回Integer最大值。
            run(retryTemplate);
        }
    

    执行策略
    在这里插入图片描述
    设定1000ms后则认定为超时,每次重试等待时长400ms,故重试3次后即会超出超时阈值,触发RecoveryCallback回调,并返回Integer最大值。

    3.7.4.根据返回结果值实现重试

        @Test
        public void retryWithResult() throws Exception {
            //重试模板
            RetryTemplate retryTemplate = new RetryTemplate();
    
            //设置为无限重试策略
            retryTemplate.setRetryPolicy(new AlwaysRetryPolicy() {
                private static final long serialVersionUID = 1213824522266301314L;
    
                @Override
                public boolean canRetry(RetryContext context) {
                    //小于1则重试
                    return context.getAttribute("value") == null || Integer.parseInt(context.getAttribute("value").toString()) < 1;
                }
            });
    
            //开始执行-如果value值小于1或者为null则进行重试,反之不在进行重试,触发RecoveryCallback回调,并返回Integer最大值。
            run(retryTemplate);
        }
    

    在这里插入图片描述
    如果value值小于1或者为null则进行重试,反之不在进行重试,触发RecoveryCallback回调,并返回Integer最大值。

    3.7.5.启用熔断器重试策略

     @Test
        public void retryCircuitBreakerTest() {
            RetryTemplate retryTemplate = new RetryTemplate();
    
            //传入RetryPolicy(每个RetryPolicy实现都有自己的重试策略实现),是真正判断是否重试的策略,当重试失败时,则执行熔断策略;
            CircuitBreakerRetryPolicy retryPolicy = new CircuitBreakerRetryPolicy(new SimpleRetryPolicy(4));
    
            //固定时间等待策略-每次重试等待300毫秒
            FixedBackOffPolicy fixedBackOffPolicy = new FixedBackOffPolicy();
            fixedBackOffPolicy.setBackOffPeriod(300);
    
            // 熔断器电路打开的超时时间
            retryPolicy.setOpenTimeout(1500);
            //重置熔断器重新闭合的超时时间
            retryPolicy.setResetTimeout(2000);
    
            //设置重试策略
            retryTemplate.setRetryPolicy(retryPolicy);
            //设置回退策略
            retryTemplate.setBackOffPolicy(fixedBackOffPolicy);
    
            long startTime = System.currentTimeMillis();
    
            //
            IntStream.range(0, 10).forEach(index -> {
                try {
                    Thread.sleep(100);
                    RetryState state = new DefaultRetryState("circuit", false);
                    String result = retryTemplate.execute(
                            //重试业务逻辑
                            new RetryCallback<String, RuntimeException>() {
                                @Override
                                public String doWithRetry(RetryContext context) throws RuntimeException {
                                    log.info("重试 {} 次", context.getRetryCount());
                                    if (System.currentTimeMillis() - startTime > 1300 && System.currentTimeMillis() - startTime < 1500) {
                                        return "retryCallback-success";
                                    }
                                    throw new RuntimeException("timeout");
                                }
                            },
                            //重试失败回调
                            new RecoveryCallback<String>() {
                                @Override
                                public String recover(RetryContext context) throws Exception {
                                    return "recoveryCallback-default";
                                }
                            }, state);
                    log.info("result: {}", result);
                } catch (Exception e) {
                    log.error("报错了: type:{}:{}", e.getClass().getName(), e.getMessage());
                }
            });
        }
    

    执行结果
    在这里插入图片描述

    设定重试次数为4次,在执行1300ms至1500ms期间连续两次调用成功,无需重试,其后继续抛出异常重试,第4次重试时(1405)仍然在1500ms内,故打开了断路器,后续请求异常均会直接返回 RecoveryCallback中回调定义。

    3.8.spring-retry1.3 以上构建RetryTemplate

    以下代码需要引入spring-retry1.3 以上才能使用,目前仅供参考

    RetryTemplate template = RetryTemplate.builder()
                    .maxAttempts(3)//重试次数
                    .fixedBackoff(1000)//重试时间,单位ms
                    .retryOn(RemoteAccessException.class)//触发重试异常
                    .build();
    
    template.execute(ctx -> {
        // ... do something
    });
    

    1.3开始可以这样构建

    RetryTemplate.builder()
          .maxAttempts(10)
          .exponentialBackoff(100, 2, 10000)
          .retryOn(IOException.class)
          .traversingCauses()
          .build();
     
    RetryTemplate.builder()
          .fixedBackoff(10)
          .withinMillis(3000)
          .build();
     
    RetryTemplate.builder()
          .infiniteRetry()
          .retryOn(IOException.class)
          .uniformRandomBackoff(1000, 3000)
          .build();
    

    3.9.较常见的重试技术实现

    1. Spring Retry重试框架;
    2. Guava Retry重试框架;
      Guava-Retry教程
    3. Spring Cloud 重试配置;

    spring-retry重试与熔断详解—《亿级流量》内容补充

    展开全文
  • RocketMQ的消息重试(消息重投)

    千次阅读 2021-11-04 09:01:41
    详细介绍了RocketMQ的消息重试机制,RocketMQ的消息重试可以分为生产者重试和消费者重试两个部分。
  • 如何优雅地重试

    万次阅读 多人点赞 2021-01-05 10:00:00
    背景在微服务架构中,一个大系统被拆分成多个小服务,小服务之间大量 RPC 调用,经常可能因为网络抖动等原因导致 RPC 调用失败,这时候使用重试机制可以提高请求的最终成功率,减少故障影响...
  • 你可能用错了 kafka 的重试机制

    千次阅读 2021-03-01 17:14:20
    这一过程继续,并增加了一些重试主题和重试消费者,每个重试的延迟越来越多(用作退避策略)。最后,在最终重试消费者无法处理某条消息后,该消息将发布到一个死信队列(Dead Letter Queue,DLQ)中,工程团队将在...
  • 自动化测试框架[Cypress重试机制]

    万次阅读 2020-12-10 17:50:35
    重试机制是Cypress的核心概念,由于如今的web应用几乎都是异步加载的,那么如果断言发生的时候程序尚未更新DOM该如何处理?如果断言发生的时候应用程序正在等待其后端响应而导致页面暂时没有结果该如何处理?如果...
  • Java实现几种简单的重试机制

    千次阅读 2021-02-12 10:47:17
    设计我们的目标是实现一个优雅的重试机制,那么先来看下怎么样才算是优雅无侵入:这个好理解,不改动当前的业务逻辑,对于需要重试的地方,可以很简单的实现可配置:包括重试次数,重试的间隔时间,是否使用异步方式...
  • 对比Daltson和Finchley的基本组件,发现Ribbon还有Hystrix的重试逻辑基本没变,feign编程openfeign之后,增加了个重试逻辑,我们用下面这个图来展示其中的逻辑: 首先搞清楚调用链: 可以总结如下: OpenFeign有...
  • 楔子:翻了帖子两三天,硬是没有找到哪个帖子能证明生产端的消息重试是确实重试了的。大多要么是对概念、源码说明了一下,或者把实现示例贴贴,但基本并没有有效测试证明。想了想,还是自己来捋一捋这 RocketMQ 的...
  • 重试机制 Retry

    千次阅读 2021-08-26 11:24:41
    1、重试 1.1 重试作用 对于重试是有场景限制的,不是什么场景都适合重试,比如参数校验不合法、写操作等(要考虑写是否幂等)都不适合重试。 远程调用超时、网络突然中断可以重试。在微服务治理框架中,通常都有自己...
  • Nginx 失败重试机制(详细)

    千次阅读 2021-01-28 18:41:32
    然而这部分失败重试机制比较复杂且官方文档没详细介绍,本文将对其解析,并配合实际场景例子使之更容易被理解。 基础失败重试 这部分介绍最常见、最基础的失败重试场景。 为了方便理解,使用了以下配置进行分析...
  • RabbitMQ之消息重试机制

    千次阅读 2021-11-05 10:20:26
    RabbitMQ之消息重试机制
  • Istio超时与重试

    千次阅读 2021-07-24 17:07:55
    一、理解超时与重试  超时的原理是,如果程序请求长时间无法返回结果,则需要设置超时机制,超过设置的时间则返回错误的信息;这样做既可以节约等待时消耗的资源,也可以避免由于级联错误引起的一系列问题。设置...
  • 分布式环境下,重试是高可用技术中的一个部分,大家在调用RPC接口或者发送MQ时,针对可能会出现网络抖动请求超时情况采取一下重试操作,自己简单的编写重试大多不够优雅,而重试目前已有很多技术实现和框架支持,但...
  • 聊聊openfeign的超时和重试

    千次阅读 2020-11-12 09:46:00
    openfeign的超时和重试
  • Google Guava Retry 优雅的重试方案

    千次阅读 2021-03-07 13:37:20
    Google Guava Retry 优雅的重试方案前言使用场景什么场景不适合重试了解幂等性一、Guava Retry是什么?与Spring retry比较二、使用步骤1.引入库2.码代码总结导读 前言 使用场景 当需要两个或者多个组件协同工作而...
  • 今天来介绍下Python中从requests请求重试到万能重试装饰器。 从requests请求重试到万能重试装饰器 重试,在编写代码的过程中,是一个很常见的需求。 比如: 请求重试(例如:超时) 文件占用 IO阻塞等待 那么,...
  • RabbitMQ重试机制

    千次阅读 2021-01-23 13:23:31
    1、RabbitMQ重试机制的简介 RabbitMQ 不会为未确认的消息设置过期时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息连接是否已经断开,这个设置的原因是 RabbitMQ 允许消费者消费一条消息的时间...
  • Java之Retry重试机制详解

    千次阅读 2021-02-28 18:53:59
    应用中需要实现一个功能: 需要将常规解决方案try-在包装正常上传逻辑基础上,通过判断返回结果或监听异常决定是否重试,同时为了解决立即重试的无效执行(假设异常是有外部执行不稳定导致的:网络抖动),休眠一定...
  • springboot 配置RabbitMQ 重试机制

    千次阅读 2020-09-26 00:07:22
    springboot集成rabbitMQ中可以直接在配置文件中启用重试机制,被配置重试相关参数,当消费者在处理消息时如果发生异常则该条消息不会被移出消息队列。只有当正常处理后才会将消息移除队列。 生产者 导入依赖 <?...
  • 外部接口调用失败重试

    千次阅读 2021-01-20 10:58:53
    文章目录@[toc]第三方接口调用失败重试规则代码`@MyRetry``MyRetryFactory``MyRetryTemplate``ResponseResult``ThirdCallService``ThirdCallServiceImpl``Tester`结果总结 第三方接口调用失败重试 规则 第三方接口...
  • Python retrying 重试机制

    千次阅读 2021-11-22 18:48:10
    我们在程序开发中,经常会需要请求一些外部的接口资源,而且我们不能保证每次请求一定会成功,所以这些涉及到网络请求的代码片段就需要加上重试机制。下面来说一下Python中的重试方法。 循环加判断 最简单的重试...
  • springcloud-ribbon重试机制详解

    千次阅读 2019-02-21 01:56:35
    一、版本信息 springboot:2.0.5.RELEASE springcloud:Finchley.RELEASE...--重试依赖--&amp;amp;gt; &amp;amp;lt;dependency&amp;amp;gt; &amp;amp;lt;groupId&amp;amp;gt;org.springframewor
  • HttpClient连接池及重试机制

    千次阅读 2019-07-30 16:50:47
    // 设置重试次数 . setRetryHandler ( new DefaultHttpRequestRetryHandler ( ) ) . setServiceUnavailableRetryStrategy ( serviceUnavailableRetryStrategy ) . build ( ) ; return httpClient ; ...
  • 【RocketMQ】重试

    千次阅读 2021-11-29 18:05:08
    消息发送重试二. 消息消费重试三. 重试队列 一. 消息发送重试 生产者在发送消息时,同步消息失败会重投,异步消息有重试,oneway没有任何保证。消息重投保证消息尽可能发送成功、不丢失,但可能会造成消息重复发送...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 938,080
精华内容 375,232
关键字:

重试

友情链接: 抽采.zip