精华内容
下载资源
问答
  • RabbitMQ延时队列原理讲解

    千次阅读 2020-05-10 20:27:43
    RabbitMQ延时消息队列 延时队列介绍 延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。 那么,为什么需要延迟消费呢?我们来看以下的场景 网上商城下订单后30分钟后没有完成...

    RabbitMQ延时消息队列

    延时队列介绍

    延时队列即放置在该队列里面的消息是不需要立即消费的,而是等待一段时间之后取出消费。

    那么,为什么需要延迟消费呢?我们来看以下的场景

    网上商城下订单后30分钟后没有完成支付,取消订单(如:淘宝、去哪儿网)系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会系统中的业务失败之后,需要重试这些场景都非常常见,我们可以思考,比如第二个需求,系统创建了预约之后,需要在预约时间到达前一小时提醒被预约的双方参会。那么一天之中肯定是会有很多个预约的,时间也是不一定的,假设现在有1点 2点 3点 三个预约,如何让系统知道在当前时间等于0点 1点 2点给用户发送信息呢,是不是需要一个轮询,一直去查看所有的预约,比对当前的系统时间和预约提前一小时的时间是否相等呢?这样做非常浪费资源而且轮询的时间间隔不好控制。如果我们使用延时消息队列呢,我们在创建时把需要通知的预约放入消息中间件中,并且设置该消息的过期时间,等过期时间到达时再取出消费即可。

    Rabbitmq实现延时队列一般而言有两种形式:第一种方式:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX)[A队列过期->转发给B队列]

    第二种方式:利用rabbitmq中的插件x-delay-message

    TTL DLX实现延时队列

    TTL DLX介绍

    TTL

    RabbitMQ可以针对队列设置x-expires(则队列中所有的消息都有相同的过期时间)或者针对Message设置x-message-ttl(对消息进行单独设置,每条消息TTL可以不同),来控制消息的生存时间,如果超时(两者同时设置以最先到期的时间为准),则消息变为dead letter(死信)

    Dead Letter Exchanges(DLX)RabbitMQ的Queue可以配置x-dead-letter-exchange和x-dead-letter-routing-key(可选)两个参数,如果队列内出现了dead letter,则按照这两个参数重新路由转发到指定的队列。x-dead-letter-exchange:出现dead letter之后将dead letter重新发送到指定exchange

    x-dead-letter-routing-key:出现dead letter之后将dead letter重新按照指定的routing-key发送

     

    展开全文
  • RabbitMq延时队列实现

    2019-11-06 10:07:31
    RabbitMq延时实现原理:rabbitMq没有提供延时队列功能,但是我们可以利用死信队列来实现。首先消息过期会变为死信,死信会被死信交换机,死信路由关键字发送到死信队列,之后再监听死信队列中的消息并进行业务逻辑...

    RabbitMq延时实现原理:rabbitMq没有提供延时队列功能,但是我们可以利用死信队列来实现。首先消息过期会变为死信,死信会被死信交换机,死信路由关键字发送到死信队列,之后再监听死信队列中的消息并进行业务逻辑处理。

    PS: 设置队列过期“x-message-ttl”过期时间为10秒,一旦队列中的消息在10秒内未被消费,就会变为死信。

     

    生产者端代码:

    package com.timothy.demo.controller;
    
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SendMessageController {
    
        @Autowired
        RabbitTemplate rabbitTemplate;  //使用RabbitTemplate,这提供了接收/发送等等方法
    
        @GetMapping("/sendDelayMessage")
        public String sendDelayMessage() {
            String messageData = "Hello World!";
            rabbitTemplate.convertAndSend("myDelayQueue", messageData);
            return "ok";
        }
    
    }

     生产者端基本配置:

    #spring.rabbitmq.host=localhost
    #spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest

    消费者端代码:

    package com.timothy.demo.config;
    
    import org.springframework.amqp.core.*;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * 延时队列配置
     */
    @Configuration
    public class DelayRabbitConfig {
    
        /*
           设置延时队列
         */
        @Bean
        public Queue delayQueue() {
            return QueueBuilder.durable("myDelayQueue")
                    .withArgument("x-dead-letter-exchange", "myDeadLetterExchange")
                    .withArgument("x-dead-letter-routing-key", "myDeadLetterRoutingKey")
                    .withArgument("x-message-ttl", 10000) // 设置队列中的消息过期时间, 10000, 单位毫秒
                    .build();
        }
    
        @Bean
        public Queue myDeadLetterQueue() {
            return QueueBuilder.durable("myDeadLetterQueue") 
                    .build();
        }
    
        @Bean
        public DirectExchange myDeadLetterExchange() {
            return new DirectExchange("myDeadLetterExchange");
        }
    
        @Bean
        public Binding dlxBinding(Queue myDeadLetterQueue, DirectExchange myDeadLetterExchange) {
            return BindingBuilder.bind(myDeadLetterQueue)  // 使用路由键myDeadLetterRoutingKey,绑定死信队列到死信交换机
                    .to(myDeadLetterExchange)
                    .with("myDeadLetterRoutingKey");
        }
    
    
    }
    

    消费者监听器代码:

    package com.timothy.demo.consumer;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.UUID;
    
    @Component
    @RabbitListener(queues = "myDeadLetterQueue")
    public class DelayReceiver {
    
        @RabbitHandler
        public void process(Channel channel, Message message, String content) {
            String uuid = UUID.randomUUID().toString();
            System.out.println(uuid + ": DelayReceiver消费者处理开始");
            System.out.println(uuid + ": DelayReceiver消费者收到消息  : " + content);
            System.out.println(uuid + ", thread name: " + Thread.currentThread().getName()+ ", trigger time =" + new Date());
            System.out.println(uuid + ": DelayReceiver消费者处理结束");
        }
    
    }
    

    消费者端基本配置: 

    #spring.rabbitmq.host=localhost
    #spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.listener.simple.concurrency=3
    spring.rabbitmq.listener.simple.max-concurrency=3
    spring.rabbitmq.publisher-confirms=true
    #spring.rabbitmq.listener.direct.acknowledge-mode=manual 不设置,默认为自动确认AUTO
    #spring.rabbitmq.listener.simple.acknowledge-mode=manual
    展开全文
  • 原理:我们在下单后,往MQ投递一个消息,设置其有效期为30分钟,在不设置对应队列的消费者的情况下,该消息将一直不被消费,那么30分钟后,该消息过期会被投递到死信队列,由死信消费者消费,我们就可以在死信消费者根据...
    • 场景
      订单30分钟未支付,系统自动关闭有哪些实现方案?
      1.基于任务调度实现;
      2.基于redis过期key实现;
      3.基于redis延迟队列;
      4.基于MQ的延迟队列;
    • 本次使用RabbitMQ实现一个小demo
      原理:我们在下单后,往MQ投递一个消息,设置其有效期为30分钟,在不设置对应队列的消费者的情况下,该消息将一直不被消费,那么30分钟后,该消息过期会被投递到死信队列,由死信消费者消费,我们就可以在死信消费者根据订单id执行相应的业务逻辑。
    • application.properties中MQ的配置
    #配置virtual-host虚拟主机
    spring.rabbitmq.virtual-host=test_order_close
    #ip地址
    spring.rabbitmq.host=127.0.0.1
    #用户名  密码
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    #连接端口号
    spring.rabbitmq.port=5672
    
    #死信队列
    test.dlx.exchange=test_order_dlx_exchange
    test.dlx.queue=test_order_dlx_queue
    test.dlx.routingKey=dlx
    
    ##备胎交换机
    test.order.exchange=test_order_exchange
    test.order.queue=test_order_queue
    test.order.routingKey=test.order
    
    • 死信的config类
    @Component
    public class DeadLetterMQConfig {
    
        /**
         * 订单交换机
         */
        @Value("${test.order.exchange}")
        private String orderExchange;
    
        /**
         * 订单队列
         */
        @Value("${test.order.queue}")
        private String orderQueue;
    
        /**
         * 订单路由key
         */
        @Value("${test.order.routingKey}")
        private String orderRoutingKey;
    
        /**
         * 死信交换机
         */
        @Value("${test.dlx.exchange}")
        private String dlxExchange;
    
        /**
         * 死信队列
         */
        @Value("${test.dlx.queue}")
        private String dlxQueue;
        /**
         * 死信路由
         */
        @Value("${test.dlx.routingKey}")
        private String dlxRoutingKey;
    
        /**
         * 声明死信交换机
         */
        @Bean
        public DirectExchange dlxExchange() {
            return new DirectExchange(dlxExchange);
        }
    
        /**
         * 声明死信队列
         */
        @Bean
        public Queue dlxQueue() {
            return new Queue(dlxQueue);
        }
    
        /**
         * 声明订单业务交换机
         */
        @Bean
        public DirectExchange orderExchange() {
            return new DirectExchange(orderExchange);
        }
    
        /**
         * 声明订单队列 核心操作一
         */
        @Bean
        public Queue orderQueue() {
            Map<String, Object> arguments = new HashMap<>(2);
            // 绑定我们的死信交换机
            arguments.put("x-dead-letter-exchange", dlxExchange);
            // 绑定我们的路由key
            arguments.put("x-dead-letter-routing-key", dlxRoutingKey);
            return new Queue(orderQueue, true, false, false, arguments);
        }
    
        /**
         * 绑定订单队列到订单交换机
         */
        @Bean
        public Binding orderBinding() {
            return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(orderRoutingKey);
        }
    
        /**
         * 绑定死信队列到死信交换机
         */
        @Bean
        public Binding binding() {
            return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(dlxRoutingKey);
        }
    }
    
    
    • 测试controller
        @GetMapping("/addOrder")
        public String addOrder(){
            String orderId=System.currentTimeMillis()+"";
            OrderEntity orderEntity=new OrderEntity("订单过期测试",orderId,0);
            int result= orderMapper.addOrder(orderEntity);
            if(result<=0){
                return "fail";
            }
            rabbitTemplate.convertAndSend(orderExchange,orderRoutingKey,orderId,messagePostProcessor());
            return "success";
        }
    
        //处理待发送消息
        private MessagePostProcessor messagePostProcessor(){
            return  new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    //设置有效期20秒
                    message.getMessageProperties().setExpiration("20000");
                    return message;
                }
            };
        }
    
    • 死信消费者
    @Component
    public class OrderDlxConsumer {
    
        @Autowired
        private OrderMapper orderMapper;
    
        //监听死信队列
        @RabbitListener(queues = "test_order_dlx_queue")
        public void orderConsumer(String orderId) {
            System.out.println("死信队列获取消息:" + orderId);
            if (StringUtils.isEmpty(orderId)) {
                return;
            }
            //根据id查询
            OrderEntity orderEntity = orderMapper.getOrder(orderId);
            if (null == orderEntity) {
                return;
            }
            //获取状态
            Integer orderStatus=orderEntity.getOrderStatus();
            //判断未支付 , 关闭订单
            if(0==orderStatus){
                orderMapper.updateStatus(orderId,2);
            }
        }
    }
    
    • 测试
      配置虚拟消息服务器
      在这里插入图片描述
      接口测试工具调用我们写的controller方法
      在这里插入图片描述
      新增成功后数据库表中会新增一条status为0的记录
      在这里插入图片描述
      过期时间到了之后会执行死信消费者的方法
      在这里插入图片描述
      然后执行对应的业务逻辑修改订单的status
      在这里插入图片描述
    • 文末附一个关于死信队列的博客,学习学习。
      【RabbitMQ】一文带你搞定RabbitMQ死信队列
    展开全文
  • 摘要:在阿里云栖开发者沙龙PHP技术专场上,掌阅资深后端工程师、掘金小测《Redis深度历险》作者钱文品为大家介绍了RabbitMQ延时队列和镜像队列的原理与实践,重点比较了RabbitMQ提供的消息可靠与不可靠模式,同时...

     

    摘要:在阿里云栖开发者沙龙PHP技术专场上,掌阅资深后端工程师、掘金小测《Redis深度历险》作者钱文品为大家介绍了RabbitMQ的延时队列和镜像队列的原理与实践,重点比较了RabbitMQ提供的消息可靠与不可靠模式,同时介绍了生产环境下如何使用RabbitMQ实现集群间消息传输。

    本次直播视频精彩回顾,戳这里!

    直播回顾:https://yq.aliyun.com/live/965

    PPT分享:https://yq.aliyun.com/download/3529

    本文根据演讲视频以及PPT整理而成。

    本文将主要围绕以下四个方面进行分享:

    RabbitMQ特性

    RabbitMQ中的消息不可靠问题及其解决方案

    死信队列

    生产环境下使用RabbitMQ应注意的事项

    RabbitMQ特性

     

    对于左边的Client Publisher而言,RabbitMQ Server是消息的接收者,也就是消费者;对于右边的Client Consumer而言,RabbitMQ Server是消息的发送者,也就是生产者。RabbitMQ Server将消息从Client Publisher传送给Client Consumer,扮演着消息中间商的角色。

    RabbitMQ Server负责将Client Publisher传递来的消息持久化,延后地将消息传递给Client Consumer.这样,即使消费者挂掉,RabbitMQ Server也可以存储消息,当消费者重新工作时再将存储的消息传递过去,从而保证消息不丢失。RabbitMQ Server提供了堆积消息的能力。

     

    另外,RabbitMQ Server还具有复制和广播消息的能力。具体来说,RabbitMQ Server可以将Client Publisher发布的消息分发给多个消费者,比如它能够将特定的消息按照特定的队列分发给特定的消费者。“特定”指不同消息具有不同的routing key属性,由上图实例,不同的消息生产者生产了具有不同routing key的消息,通过exchange路由器将不同的routing key消息投递到不同队列,从而分发给不同消费者。

    RabbitMQ中的消息不可靠问题及其解决方案

    消费端消息不可靠问题及其解决方案

     

    实际上,RabbitMQ Server将消息投递给消费者,具有消息不可靠的特点。具体来说,RabbitMQ Server将消息投递给消费者时会调用套接字的write操作,而write操作的过程是不可靠性的。在write操作的过程中,Server需要将消息发送到套接字的缓存中,通过网卡转发到链路上,最终到达消费者所在的机器内核的套接字缓存中,由消费者使用套接字的read操作将消息读出来。

     

    即使套接字的write操作成功也无法保证消息可靠,潜在的网络故障可能使消费者接收不到消息。机器宕机也可能使消息不可靠,即使消息字节流已经到达消费者所在机器,消费者所在机器的宕机也可能使消息无法被即时读取并处理。另外,即使消费者即时读取消息,内存消息队列中的所有消息也可能因为kill-9操作发生丢失。这些可能性都直接导致了消息不可靠。

     

    因此,需要额外的措施为消息提供可靠保障。一种消息可靠性保障方式是,Server投递消息后并不立即将消息从Server删除,而是等到消费者接收、处理消息并返回Ack包给Server后,Server才删除该消息。如果消费者没有发送Ack包,那么Server将重新投递该消息。这个过程确保消息被消费者处理,保证了消息可靠。另外,假如消费者已处理消息并发送Ack包给Server,但由于网络故障等问题导致Ack包丢失时,那么Server同样会重新投递该消息,导致消息被重复处理。消息的重复处理通常由业务层面的技术手段来避免,比如在数据库层面添加主键约束等。另一种重复消息处理的避免方式是客户端对每条消息维护ID, 将被处理消息的ID记录在列表中,同时检查新到消息是否在该列表中。

     

    RabbitMQ中的Auto Ack和Manual Ack对应着消息不可靠模式和消息可靠模式. Auto Ack即no ack,指消息投后即删除,对应消息不可靠传输。Manual Ack即手动Ack,消费者处理完消息后使用Ack包通知Server删除消息,对应消息可靠传输。

    Auto Ack是RabbitMQ中最常用的模式,性能较好,但具有以下问题。当消息通过套接字write操作投递后,RabbitMQ Server立即删除该消息,该模式在遇到网络故障时容易发生消息丢失。另外,假如消费者处理消息的速率过低,可能导致消息在消费者recv buffer中大量堆积,从而导致Server端send buffer也堆积大量消息, Server端无法继续调用套接字write操作。这样,一段时间之后,Server可能强制关闭消息传输链接,导致消息不可传输。

    尽管Auto Ack存在一定风险,目前许多公司仍在应用Auto Ack模式。使用Auto Ack模式时,开发者需要注意消费者和生产者的实例数量比例,使消息生产者产生消息的速率与消费者消费消息的速率大致持平。

     

    Manual Ack是RabbitMQ 中更加智能的一种模式。Manual Ack在工作时会考虑消息消费者的消息接收能力,根据消费者的消息接受能力和当前接收到的Ack包自动调节分发消息的速率,保证消息分发可靠、不阻塞。具体来说,客户端通过PrefetchCount告知Server自身堆积消息的能力。

    生产端消息不可靠问题及其解决方案

    消息生产端同样存在消息的可靠性问题。从Client Publisher将消息传递给Server和从Server将消息传递给Client Consumer的过程是完全对等的,Server和Client Consumer间传递消息的可靠性问题在Client Publisher和Server间同样存在。

    Client Publisher首先将消息写到套接字,再通过网络传递给Server的套接字buffer,最终由Server读取该消息。这一过程的潜在网络问题也可能使Server端接收不到消息。

     

    另外,Server端本身也可能导致消息不可靠。Server端需要持久化消息,但出于性能开销的考虑,Server端并不在每次持久化消息时都刷盘。具体来说,Server端会对文件执行write操作,将脏数据写入操作系统的缓存中,而不是立即将数据写入磁盘。一般情况下,Server可能每几百毫秒执行一次fsync操作,通过fsync操作将文件的脏数据写入磁盘。由于Server具有宕机风险,那么每次Server宕机时,还未被fsync操作处理的数据就可能丢失,此过程类似于Redis AOF。

     

    RabbitMQ通过生产者事务和生产者确认两个方法解决Server产生的数据不可靠问题。

    生产者事务的基本原理是采用select和commit指令包裹publish,在消息生产者publish数据之前执行select操作,相当于begin transaction事务开始,在执行若干个publish操作后,再执行commit操作,相当于提交事务。根据tcp包的有序性,commit包成功接收意味着commit包之前的包也成功接收。因此,收到从Client Publisher传递过来的commit包意味着该commit包之前的所有publish包都已成功接收,即所有消息都成功接收。然而,commit包只有等到Server端的fsync操作执行完毕时才返回,因此生产者事务的效率较低,通常只在有批量publish操作时才使用生产者事务模式。也就是说,客户端将消息累计起来批量发送,以降低fsync操作带来的性能损失。此外,在进程中累计消息也存在风险,累计的消息可能由于进程挂掉而丢失。总的来说,生产者事务由于性能缺点不被RabbitMQ官方推荐。

     

    另一种Server带来的数据不可靠问题的解决方案是生产者确认。生产者确认类似于消费端的Ack机制,生产者可能连续发送多条消息,Server将这些消息异步地通过fsync操作写入磁盘再异步地给生产者发送Ack包,告知生产者消息的接收成功。由于Ack包异步传输,不影响生产者端消息的正常发送。生产者确认模式下,Ack包批量发送,并且都携带有序号,以告知生产者该序号以前的所有消息都已正常落盘。尽管RabbitMQ推荐用户使用生产者确认模式,目前的RabbitMQ版本还未实现消息的重发机制,只实现了Ack包的批量发送,以通知Client Publisher哪些消息接收成功。当消息丢失时,Client Publisher端已publish的消息在进程挂掉时也可能丢失,而不是重新发送,因此生产者确认的作用也不明显。当然,生产者确认起到了降低消息发布速度的作用,减小了消息丢失的数量。

     

    生产者确认中的消息重发可以通过以下几种方法实现。第一种方式在内存中累积还未收到Ack包的消息,收到Ack包后删除该消息,对于一段时间内还停留在内存中的消息,重发该消息。这种方式将未Ack消息存入内存,一旦消息生产者宕机,这些消息也会丢失。另一种方式将未收到Ack包消息存入磁盘,当收到Ack包后删除该消息,然而,磁盘存储依赖于fsync操作,降低了系统处理消息的性能。同时,这还会提高编程的复杂度,因为这要求发布消息时维护文件队列,还要求一个异步线程将文件队列中的消息发布到Server,带来了多线程和锁问题。还有一种方式将未Ack消息存入Redis,但当出现网络故障时,Redis也是不可靠的。目前提供的生产者确认中的消息重发方案都还存在问题,具体的方案选择依赖于实际场景和个人取舍。

    死信队列

    生产者确认中的消息重发可以通过以下几种方法实现。第一种方式在内存中累积还未收到Ack包的消息,收到Ack包后删除该消息,对于一段时间内还停留在内存中的消息,重发该消息。这种方式将未Ack消息存入内存,一旦消息生产者宕机,这些消息也会丢失。另一种方式将未收到Ack包消息存入磁盘,当收到Ack包后删除该消息,然而,磁盘存储依赖于fsync操作,降低了系统处理消息的性能。同时,这还会提高编程的复杂度,因为这要求发布消息时维护文件队列,还要求一个异步线程将文件队列中的消息发布到Server,带来了多线程和锁问题。还有一种方式将未Ack消息存入Redis,但当出现网络故障时,Redis也是不可靠的。目前提供的生产者确认中的消息重发方案都还存在问题,具体的方案选择依赖于实际场景和个人取舍。

    三、死信队列

    死信队列使用了RabbitMQ中的一种特殊队列属性,即x-message-ttl属性,表示队列中消息的构建时间。假如用户在声明队列时定义队列的x-message-ttl属性,此后所有进入该队列的消息都将持有构建时间,到达构建时间的消息将被删除。如果还为队列配置了回收站属性,那么即使构建时间到达,RabbitMQ也不会立即删除这些消息,而是将这些过期消息丢入回收站,即死信队列。

     

    死信队列的工作方式如上图。Client Publisher将消息投递给路由器,也就是exchange,再由exchange将消息投递给队列,由队列生成该消息的构建时间,到达构建时间的消息将过期,同时进入死信队列。过期消息进入死信队列的方式和进入普通队列的方式基本一致,即先投递给exchange路由器,再由exchange投递消息。消费者消费死信队列,得到的消息是延后的消息,延迟的时间长度即构建时间。目前,死信队列存在的问题是,一个队列只能设置一个构建时间,消息的过期时间不够灵活,不能满足一些特殊场景的需求,比如动态的重试时间。

     

    死信队列的另一个使用场景是Retry Later,即在一段时间后才重新处理此前处理失败的消息,这时可能用到双重死信。具体来说,死信队列不仅可以接收过期消息,还可以接收被reject的消息,即消费端拒绝处理或处理过程发生异常的消息,Reject操作具有requeue参数,当requeue设为true时被reject消息会重新进入消息队列并被重新投递,当requeue设为false时被reject消息将进入死信队列。假如死信队列持有构建时间,那么到达构建消息的消息将重新投递给原有队列,实现Retry Later。双重死信在使用过程中需注意消息处理的死循环问题,因为消息可能无限循环地进入死信队列。

    生产环境下使用RabbitMQ应注意的事项

     

    生产环境下,RabbitMQ通过使用集群模式。集群模式下,只有元信息分布在所有节点中。元信息指队列信息,路由器信息等,队列中的信息只存储在一个节点中,因此,单个节点宕机会导致所有节点都不可用。另外,RabbitMQ的所有节点间存在转发机制,即允许节点转发其他目标节点的消息处理请求,这样客户端只需连接到任意一个节点就可以实现其消息转发需求。

     

    队列的高可用依赖于RabbitMQ的镜像队列,即在其他节点上备份某节点的消息内容。这样,当消息所在主节点宕机时,其他镜像节点可以替代主节点完成消息传递任务。

     

    通常情况下,镜像节点是默默无闻的,客户端无需感知镜像节点的存在。只有当主节点宕机时,镜像节点才发挥作用。镜像队列的配置如下:

    Ha-mode具有三个选项,all指将所有队列的信息存入所有节点,这种模式最安全,但也最浪费存储空间;exactly指由用户精确指定每个队列的复制数,当ha-mode设置为exactly,ha-params设置为2时表示“一主一从”,这种模式是官方推荐的;nodes指由用户指定副本所在的节点,这种模式极少被使用。

    x-queue-master-locator用于设置存储队列主节点的RabbitMQ节点。min-master指将队列主节点设置在队列数量最少的RabbitMQ节点,client-local指将队列主节点设置在当前客户端所在的RabbitMQ节点,random即随机选择节点。

    Ha-sync-mode用于镜像节点代替宕机主节点并创建新节点以弥补缺失节点时,设置新节点上数据的同步策略。automatic指自动地将新主节点上数据全部同步给新节点,manual指不同步新主节点上的老数据,只同步新产生的数据。由于节点间数据同步需要耗费时间,长时间的数据同步可能会影响服务的稳定性,但通常情况下RabbitMQ的节点堆积的数据量并不大,因此RabbitMQ官方推荐使用Automatic进行数据同步。

    Ha-sync-batch-size指节点间批量同步的数据量。

    Ha-promote-on-shutdown表示主动停止主节点的服务时,其他节点如何替代主节点。Always指其他节点总是能顺利地替代主节点,when-synced要求与原主节点数据完全一致的节点才能替代主节点。

    Ha-promote-on-failure表示异常情况下其他节点如何替代主节点,always和when-synced的含义与Ha-promote-on-shutdown中一致。

     

    许多公司为RabbitMQ集群设置了内存模式,认为内存模式无需落盘,能够提升系统性能。但实际上,RabbitMQ官方文档指出,内存模式无法提升系统性能,它只提升了产生元信息数据的速度,即Ram Node指将元信息存入内存,可以提升元信息的创建速度,而不是消息数据的性能。这是使用RabbitMQ时的一个常见误区。

    展开全文
  • 本Demo的原理就是:将死信队列倒着用,然后实现延时队列 简单的配置文件: spring: rabbitmq: virtual-host: / addresses: localhost username: guest password: guest port: 5672 application: name: ...
  • rabbitmq 延迟队列

    2018-07-12 16:11:00
    1.rabbitmq 延时原理,有2个队列,一个是发送消息,设置消息过期时间或者队列过期时间(死信队列),如果达到过期时间后 将改消息发送到指定的队列中进行处理。 链接:https://share.weiyun.com/5j1Gvc6 转载于:...
  • RabbitMQ 延迟队列

    2021-02-05 14:01:32
    RabbitMQ 延迟队列什么是延迟队列Time To Live(TTL)Dead Letter Exchanges(DLX)延迟队列实现方式一(推荐)原理代码实现延迟队列实现方式二(不推荐)原理缺点 什么是延迟队列 延迟队列存储的对象肯定是对应的延时消息...
  • 关于死信的原理rabbitMQ的特性等可以在其他文章中找到,这里就不详细介绍了。 场景: 死信的场景主要是用于来实现延迟队列,比如之前介绍的redis订阅的过期事件。都是用于在未来某个时间段需要对某些数据进行...
  • RabbitMQ本身不支持延时队列,但是可以利用以下两大特性曲线实现延时队列: 1.1 Time To Live ( TTL ) RabbitMQ可以针对Queue设置x-expires 或者针对消息设置x-message-ttl来控制消息的生存时间(都设置则以较短的为...
  • 关于死信的原理rabbitMQ的特性等可以在其他文章中找到,这里就不详细介绍了。场景:死信的场景主要是用于来实现延迟队列,比如之前介绍的redis订阅的过期事件。都是用于在未来某个时间段需要对某些数据进行操作...
  • 1. 什么是延时队列? 2. 如何实现一个高效的延时队列? 3. DelayQueue的实现原理 ...4. RabbitMQ实现延时队列的基本原理 5. Redis实现延时队列的基本原理 6. 时间轮(Time Wheel) 7. 几种方案的对比
  • Rabbitmq实现延时队列一般而言有两种形式: 方式一:利用两个特性: Time To Live(TTL)、Dead Letter Exchanges(DLX) 方式二:利用rabbitmq中的插件x-delay-message 方式一: 此方式使用的是direct模式的...
  • Redis延时队列

    2020-07-15 10:21:33
    《Redis 深度历险:核心原理和应用实践》之 缓兵之计 —— 延时队列 我们平时习惯于使用 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能。这两个中间件都是专业的消息队列中间件,...
  • 2.如果处理失败 重试 并投入延时队列 如果超过延时时间 重新投入业务队列 3.如果重试次数大于3 那么进入死信队列 1.业务队列 @Configuration public class BusinessConfig { /** * EARLYWARNING模块direct...
  • 运用RabbitMQ的DIRECT模式以及死信队列实现延时操作以及不同间隔时间后重试 一 、原理描述 图解: 一条绑定路由为【FOR_QUEUE1】的消息被发送到交换机【EXCHANGE】上 RabbitTemplate.convertSendAndReceive(...
  • RabbitMQ实现延时消息的两种方法

    千次阅读 2020-12-17 00:01:04
    文章目录RabbitMQ实现延时消息的两种方法1、死信队列1.1消息什么时候变为死信(dead-letter)1.2死信队列原理1.3 代码实现1.4死信队列的一个小坑2 、延时插件2.1如何实现 RabbitMQ实现延时消息的两种方法 1、死信...
  • RabbitMQ工作原理

    2020-08-25 22:33:37
    它扮演中间商的角色,可以用来降低web服务器因发送消息带来的负载以及延时RabbitMQ如何工作的? 我们来简单看看RabbitMQ是如何工作的。首先来看看RabbitMQ里的几个重要概念: 生产者(Producer):发送消息的...
  • 我们平时习惯于使用 Rabbitmq 和 Kafka 作为消息队列中间件,来给应用程序之间增加异步消息传递功能。这两个中间件都是专业的消息队列中间件,特性之多超出了大多数人的理解能力。 使用过 Rabbitmq 的同学知道它使用...
  • 延时消息队列(延时不是死信,延时队列不需要监听,而死信队列需要监听) 3. 路由键 都与同一个交换机进行绑定 异步下单路由键 异步支付路由键 死信消息路由键 延时消息路由键 业务流程 调用下单接口,使用...
  • 1.pom依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp<...2.配置(这里我使用了死讯队列原理其实很简单,先发...
  • RabbitMQ使用场景练习:延迟队列(八)

    千次阅读 2016-12-22 15:31:18
    延时队列  在实际业务场景中可能会用到延时消息发送,例如支付场景,准时支付、超过未支付将执行不同的方案,其中超时未支付可以看做一个延时消息。   RabbitMQ本身不具有延时消息队列的功能,但是可以通过TTL...
  • Rabbitmq特点 低延时 基于AMQP协议,支持主流开发语言接入,支持主流操作系统 开源,Erlang开发,天然支持分布式扩展,不需要像ActiveMQ、Kafka那样通过ZooKeeper分别来实现HA方案和保存集群的元数据 开箱即用,上手方便...
  • 当消费者的消息消费异常时,消息进入延迟重试队列,待超时后重新发送到重试队列指定的死信队列,死信队列重新消费信息,如果又出现死信情况,继续进入延时重试队列,依次循环,当重试超过3次后,消息进入失败队列...
  • springboot+rabbitMq整合开发实战二:模拟用户下单的过程 上一篇博客简单介绍了rabbitMQ原理以及生产消费的... 延迟队列,也叫“延时队列”,顾名思义,其实就是“生产者生产消息,消息进入队列之后,并不会立即...
  • 1 queue和exchange的绑定 当我们在SpringConfiguration中配置了queue、exchange、 binding的@Bean之后。就不用在消费端的 @RabbitListener(binding=@QueueBinding(.....延时队列原理,就是让消息一经exchange投递,就

空空如也

空空如也

1 2
收藏数 40
精华内容 16
关键字:

rabbitmq延时队列原理