精华内容
下载资源
问答
  • 死信队列

    2020-11-18 10:12:47
    死信队列 什么是死信队列 一般来说,producer将消息投递到queue中,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信...

    死信队列

    什么是死信队列

    一般来说,producer将消息投递到queue中,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信(Dead Letter),所有的死信都会放到死信队列中。

    “死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

    消费者消费消息

    ​ 1)正常消费–>手动ack–>MQ从队列中删除消息

     2)消费者报错-->没有ack-->消息是待应答状态-->channel断开后-->消费恢复为待分配状态
    
    3)消费者报错-->手动nack-->如果配置了死信队列消息会被发送到死信队列中,如果没有配置消息会被丢弃。
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-q11bPizC-1605665422041)(images/1600357615151.png)]

    死信队列的来源

    • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    • 消息TTL过期
    • 队列达到最大长度(队列满了,无法再添加数据到mq中)

    配置死信队列

    
    @Configuration
    public class RabbitMQConfig {
    
        // 声明业务Exchange
        @Bean
        public TopicExchange businessExchange(){
            return new TopicExchange("businessExchange");
        }
    
        // 声明业务队列A
        @Bean
        public Queue businessQueue(){
            Map<String, Object> args = new HashMap<>();
    //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", "deadLetterExchange");
    //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key", "dle.err");
    
            return new Queue("businessQueue",true,false,false,args);
        }
    
        // 声明业务队列A绑定关系
        @Bean
        public Binding businessBinding(Queue businessQueue, TopicExchange businessExchange){
            return BindingBuilder.bind(businessQueue).to(businessExchange).with("emp.*");
        }
    
    
        //声明死信Exchange
       @Bean
        public TopicExchange deadLetterExchange(){
            return new TopicExchange("deadLetterExchange");
        }
    
        // 声明死信队列A
        @Bean
        public Queue deadLetterQueue(){
            return new Queue("dle-queue");
        }
    
       @Bean
        public Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){
            return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");
        }
    
    }
    

    YML配置

    spring:
      rabbitmq:
        host: 192.168.193.88
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual # 设置手动ack
    

    设置消费者

    @Component
    public class DedaLetterListener {
    
        // 监听业务队列
        @RabbitListener(queues = "businessQueue")
        public void businessQueue(String msg, Channel channel, Message message) throws IOException {
            if ("error".equals(msg)) {
                System.out.println("业务消费者出现问题:" + msg);
                try {
                    throw new RuntimeException();
                }catch (Exception e){
                    // 无法消费消息,nack
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
                }
            } else {
                System.out.println("正常消费消息:" + msg);
                // 正常消费了消息,手动ack
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    
        // 监听死信队列
        @RabbitListener(queues = "dle-queue")
        public void deadLetterQueue(String msg, Channel channel, Message message) throws IOException {
            System.out.println("死信队列消费消息:" + msg);
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }
    
    参数说明
    // deliveryTag:该消息的index
    // multiple:是否批量.true:将一次性ack所有小于deliveryTag的消息。
    public void basicAck(long deliveryTag, boolean multiple)
        
    //deliveryTag:该消息的index
    //multiple:是否批量.true:将一次性拒绝所有小于deliveryTag的消息。
    //requeue:是否重新入队列   
    public void basicNack(long deliveryTag, boolean multiple, boolean requeue) 
    

    设置提供者

        @Autowired
        RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/send")
        public void send(String msg){
            System.out.println("msg = [" + msg + "]");
            rabbitTemplate.convertAndSend("businessExchange","emp.add",msg);
        }
    

    死信消息的变化

    ​ 如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。

    比如:

    ​ 如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列QueueA中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。

    死信队列的应用场景

    一般用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,一般发生消费异常可能原因主要有由于消息信息本身存在错误导致处理异常,处理过程中参数校验异常,或者因网络波动导致的查询异常等等,当发生异常时,当然不能每次通过日志来获取原消息,然后让运维帮忙重新投递消息。通过配置死信队列,可以让未正确处理的消息暂存到另一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了 。

    延时队列

    什么是延时队列

    ​ 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费。

    普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理 。

    延时队列的设置

    RbbitMQ中存在TTL机制,一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

    给消息设置TTL时间
    Map<String, Object> args = new HashMap<String, Object>();
    args.put("x-message-ttl", 6000);// 但是毫秒
    channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
    

    每条消息的超时时间是6s,如果6s内没有被消费者消费,该消息就会变成死信。

    给队列设置超时时间
        @Bean
        public Queue businessQueue1(){
            Map<String, Object> args = new HashMap<>();
            args.put("x-message-ttl", 5000);  // 这个队列中的所有的消息最多能活6s
            return new Queue("5-queue",true,false,false,args);
        }
    

    但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x21RHP7K-1605665422044)(images/1600502409459.png)]

    配置延时队列

    
    @Configuration
    public class RabbitMQConfigTTL {
    
        // 声明业务Exchange
        @Bean
        public TopicExchange businessExchange(){
            return new TopicExchange("ttl-Exchange");
        }
    
        // 创建延时队列1
        @Bean
        public Queue businessQueue1(){
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", "deadLetterExchange");
            args.put("x-dead-letter-routing-key", "dle.err");
            args.put("x-message-ttl", 5000);   // 超时时间是5s
            return new Queue("5-queue",true,false,false,args);
        }
    
        // 创建延时队列2
        @Bean
        public Queue businessQueue2(){
            Map<String, Object> args = new HashMap<>();
            args.put("x-dead-letter-exchange", "deadLetterExchange");
            args.put("x-dead-letter-routing-key", "dle.err");
            args.put("x-message-ttl", 20000); //  // 超时时间是20s
            return new Queue("20-queue",true,false,false,args);
        }
    
        // 延时队列绑定关系
        @Bean
        public Binding businessBinding1(Queue businessQueue1, TopicExchange businessExchange){
            return BindingBuilder.bind(businessQueue1).to(businessExchange).with("emp.*");
        }
    
         // 延时队列绑定
        @Bean
        public Binding businessBinding2(Queue businessQueue2, TopicExchange businessExchange){
            return BindingBuilder.bind(businessQueue2).to(businessExchange).with("user.*");
        }
    
    
        //声明死信Exchange
       @Bean
        public TopicExchange deadLetterExchange(){
            return new TopicExchange("deadLetterExchange");
        }
    
        // 声明死信队列
        @Bean
        public Queue deadLetterQueue(){
            return new Queue("dle-queue",true,false,false,null);
        }
    
        // 死信队列绑定交换机
       @Bean
        public Binding deadLetterQueueBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange){
            return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with("dle.*");
        }
    }
    

    YAML配置

    spring:
      rabbitmq:
        host: 192.168.193.88
        port: 5672
        username: guest
        password: guest
        virtual-host: /
        listener:
          simple:
            acknowledge-mode: manual # 设置手动ack
    

    设置提供者

        @RequestMapping("/ttl")
        public void test1(String msg) {
            System.out.println("p:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            if ("5".equals(msg)) { // 添加到5s队列
                rabbitTemplate.convertAndSend("ttl-Exchange", "emp.add", msg);
            } else if ("20".equals(msg)) { // 添加到20s队列中
                rabbitTemplate.convertAndSend("ttl-Exchange", "user.add", msg);
            }
        }
    
    

    设置消费者

        @RabbitListener(queues = "dle-queue")
        public void dleQueue(String msg, Channel channel, Message message) throws IOException {
            System.out.println("dleQueue1:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
    展开全文
  • 总的来说,为了让消息队列消息更加健壮,于是配置了超时时间和死信队列。但是出现的问题是,配置队列的TTL,总有一些消息在超过TTL时间后,进入不了死信队列,影响及时的业务通知系统。 问题在什么地方呢? ...

    原因是线上一场时间不精准问题导致的。

    总的来说,为了让消息队列消息更加健壮,于是配置了超时时间和死信队列。但是出现的问题是,配置队列的TTL,总有一些消息在超过TTL时间后,进入不了死信队列,影响及时的业务通知系统。

    问题在什么地方呢?

    prefetch: 1

    属性配置上。

    以下是问题重现,与解决过程

    1.环境搭建

    1.1rabbit服务器略

    1.2springboot工程略

    1.3代码

    消息队列配置类:

    @Configuration
    public class RabbitMQConfig {
    
        public static final String BUSINESS_EXCHANGE_NAME = "letter.demo.simple.business.exchange";
        public static final String BUSINESS_QUEUEA_NAME = "letter.demo.simple.business.queuea";
        public static final String BUSINESS_QUEUEB_NAME = "letter.demo.simple.business.queueb";
        public static final String DEAD_LETTER_EXCHANGE = "letter.demo.simple.deadletter.exchange";
        public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "letter.demo.simple.deadletter.queuea.routingkey";
        public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "letter.demo.simple.deadletter.queueb.routingkey";
        public static final String DEAD_LETTER_QUEUEA_NAME = "letter.demo.simple.deadletter.queuea";
        public static final String DEAD_LETTER_QUEUEB_NAME = "letter.demo.simple.deadletter.queueb";
    
        // 声明业务Exchange
        @Bean("businessExchange")
        public FanoutExchange businessExchange(){
            return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
        }
    
        // 声明死信Exchange
        @Bean("deadLetterExchange")
        public DirectExchange deadLetterExchange(){
            return new DirectExchange(DEAD_LETTER_EXCHANGE);
        }
    
        // 声明业务队列A
        @Bean("businessQueueA")
        public Queue businessQueueA(){
            Map<String, Object> args = new HashMap<>(2);
    //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
    //      设置队列最大存活时间
            args.put("x-message-ttl", 6000);
            return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
        }
    
        // 声明业务队列B
        @Bean("businessQueueB")
        public Queue businessQueueB(){
            Map<String, Object> args = new HashMap<>(2);
    //       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
            args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
    //       x-dead-letter-routing-key  这里声明当前队列的死信路由key
            args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
    
            args.put("x-message-ttl", 6000);
            return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
        }
    
        // 声明死信队列A
        @Bean("deadLetterQueueA")
        public Queue deadLetterQueueA(){
            return new Queue(DEAD_LETTER_QUEUEA_NAME);
        }
    
        // 声明死信队列B
        @Bean("deadLetterQueueB")
        public Queue deadLetterQueueB(){
            return new Queue(DEAD_LETTER_QUEUEB_NAME);
        }
    
        // 声明业务队列A绑定关系
        @Bean
        public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                        @Qualifier("businessExchange") FanoutExchange exchange){
            return BindingBuilder.bind(queue).to(exchange);
        }
    
        // 声明业务队列B绑定关系
        @Bean
        public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                        @Qualifier("businessExchange") FanoutExchange exchange){
            return BindingBuilder.bind(queue).to(exchange);
        }
    
        // 声明死信队列A绑定关系
        @Bean
        public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                        @Qualifier("deadLetterExchange") DirectExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
        }
    
        // 声明死信队列B绑定关系
        @Bean
        public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                          @Qualifier("deadLetterExchange") DirectExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
        }
    }

    业务监听类:

    @Slf4j
    @Component
    public class BusinessMessageReceiver {
    
        @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
        public void receiveA(Message message, Channel channel) throws IOException {
    
    
            /**
             * 模拟休息
             */
            try {
                Thread.sleep(1000*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            String msg = new String(message.getBody());
            log.info("收到业务消息A:{}", msg);
            boolean ack = true;
            Exception exception = null;
            try {
                if (msg.contains("deadletter")){
                    throw new RuntimeException("dead letter exception");
                }
            } catch (Exception e){
                ack = false;
                exception = e;
            }
            if (!ack){
                log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
            } else {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            }
        }
    
        @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
        public void receiveB(Message message, Channel channel) throws IOException {
    
            /**
             * 模拟休息
             */
            try {
                Thread.sleep(1000*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("收到业务消息B:" + new String(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    死信队列监听类:

    @Component
    public class DeadLetterMessageReceiver {
    
    
        @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
        public void receiveA(Message message, Channel channel) throws IOException {
            System.out.println("收到死信消息A:" + new String(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    
        @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
        public void receiveB(Message message, Channel channel) throws IOException {
            System.out.println("收到死信消息B:" + new String(message.getBody()));
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    消息发送类:

    @Component
    public class BusinessMessageSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendMsg(String msg){
            rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
        }
    }

    消息发送接口:

    @RequestMapping("rabbitmq")
    @RestController
    public class RabbitMQMsgController {
    
        @Autowired
        private BusinessMessageSender sender;
    
        @RequestMapping("sendmsg")
        public void sendMsg(String msg){
            sender.sendMsg(msg);
        }
    }

    application.yml:

    server:
      port: 8088
    spring:
      rabbitmq:
        host: 192.168.83.11
        username: nc
        password: nc
        virtual-host: /nc
        listener:
          type: simple
          simple:
            default-requeue-rejected: false
            acknowledge-mode: manual
    
    
      application:
        name: miaosha-service
      datasource:
        url: jdbc:mysql://localhost:3306/ttt?useUnicode=true&characterEncoding=UTF-8
        username: root
        password: root
        hikari:
          max-lifetime: 28830000 # 一个连接的生命时长(毫秒),超时而且没被使用则被释放(retired),缺省:30分钟,建议设置比数据库超时时长少30秒,参考MySQL wait_timeout参数(show variables like '%timeout%';)
          maximum-pool-size: 9 # 连接池中允许的最大连接数。缺省值:10;推荐的公式:((core_count * 2) + effective_spindle_count)
      redis:
        host: 192.168.83.11

    pom:

        <parent>
            <artifactId>nc-item</artifactId>
            <groupId>com.nc.item</groupId>
            <version>1.0.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>nc-test-sha</artifactId>
    
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis</artifactId>
            </dependency>
            <!-- web启动器 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- eureka客户端 -->
            <!--        <dependency>
                        <groupId>org.springframework.cloud</groupId>
                        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
                    </dependency>-->
            <!-- mybatis的启动器 -->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
            </dependency>
            <!-- 通用mapper启动器 -->
            <dependency>
                <groupId>tk.mybatis</groupId>
                <artifactId>mapper-spring-boot-starter</artifactId>
            </dependency>
            <!-- 分页助手启动器 -->
            <dependency>
                <groupId>com.github.pagehelper</groupId>
                <artifactId>pagehelper-spring-boot-starter</artifactId>
            </dependency>
            <!-- jdbc启动器 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
            <!-- mysql驱动 -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
            <dependency>
                <groupId>com.nc.item</groupId>
                <artifactId>nc-item-interface</artifactId>
                <version>1.0.0-SNAPSHOT</version>
            </dependency>
            <!-- springboot检测服务启动器 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.nc.common</groupId>
                <artifactId>nc-common</artifactId>
                <version>1.0.0-SNAPSHOT</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.54</version>
            </dependency>
    
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <scope>provided</scope>
                <version>1.16.22</version>
            </dependency>
        </dependencies>
    
    </project>

    说明,其实这里配置了TTL过期时间是6s。客户端连接后,进行线程暂停,时间超过TTL时间。于是会发现,应该转移到死信队列的消息,一直停留在原队列中。

    点击查看消费者消息:

     会发现 我队列参数设置是什么没问的,但是就是成为不了死信。

    也就是说,你认为消息应该死了,但是实际上它还驻留在原队列中没有被消费。

    2原因分析

    当监听者进行消费时,如果出现了系统资源紧张,程序出错,长期hang住,那么死信队列的消息就不能及时让死信队列监听者消费,影响后续业务逻辑。

    可问题是,为什么这些TTL的消息不进入配置的死信队列呢?

    2.1消息队列消费机制

    这个你自己复习一下好了,我不再赘述

    2.2消息队列优化参数中,有一个prefetch参数

    rabbitmq默认的参数是250,也就是说监听者会一次性抓取250条消息进行批量消费,这样效率更高

    2.3和咱们有什么关系

    我们实验场景消息队列基本没有积压,就是1,2条消息。客户端再消费第一条消息的时候,进行了休眠,导致后面的消息在同一个package中,说消费完了,也没有,一直就是在消费中的状态。。。但是也一直没有ack掉。所以,你的消息一直进入不了死信队列。

    3解决

    适当的调整prefetch参数

    本例中,将prefetch=1设置上,基本不会出现问题了

    那是不是,业务中要设置成为1呢?很显然,不是的。因为你要兼顾效率呀。

    4感悟 

    千万不要人云亦云,一定要上手实践,多看官网。

    展开全文
  • rabbitmq死信队列详解与使用

    万次阅读 多人点赞 2019-08-28 22:26:13
    什么是死信队列 先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些...

    什么是死信队列

    先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列;

    以上是个人的通俗解释,专业术语解释的比较正规点大家可以参考,主要想搞清楚这个概念,不同的消息中间件大概都有自身对于死信或者死信队列的处理方式,下面重点要说说rabbitmq的死信队列

    RabbitMQ的死信队列

    对rabbitmq来说,产生死信的来源大致有如下几种:

    • 消息被拒绝(basic.reject或basic.nack)并且requeue=false.
    • 消息TTL过期
    • 队列达到最大长度(队列满了,无法再添加数据到mq中)

    死信的处理方式

    死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种,

    1. 丢弃,如果不是很重要,可以选择丢弃
    2. 记录死信入库,然后做后续的业务分析或处理
    3. 通过死信队列,由负责监听死信的应用程序进行处理

    综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理,关于这一点,也是本篇要重点讲述的,下面将用代码演示一下死信的产生及路由,即上面提到的三种方式,网上可供参考的资料比较多,但大多不全面,下面提供比较完整的demo,将各种场景的产生和过程进行列举,

    在这里插入图片描述

    方式1:消息超时进入死信队列

    这是一种在实际生产中应用场景比较多的一种方式,比如我们熟知的订单业务场景,当用户购买商品产生了一个订单的时候,可以设置过期时间,如果在这段时间内,消息还没有被消费,将会被路由到死信队列,专业术语来讲,即消息的TTL,TTL过期了消息将进入死信队列,下面是一段演示代码,这里包括两部分,生产者和消费者,

    rabbitmq的死信队列设置主要在参数argument中做配置,这里需要设置的有 x-dead-letter-exchange 和 x-message-ttl

    producer代码,
    此处模拟生产者产生订单,推送到队列中,消息有效时间是10S,过了10S如果没有被消费将会被路由到死信队列,

     public static void main(String[] args) throws Exception{
    
            final Channel channel = RabbitUtil.getChannel();
    
            String orderExchangeName = "order_exchange";
            String orderQueueName = "order_queue";
            String orderRoutingKey = "order.#";
            Map<String, Object> arguments = new HashMap<String, Object>(16);
    
            //死信队列配置  ----------------
            String dlxExchangeName = "dlx.exchange";
            String dlxQueueName = "dlx.queue";
            String dlxRoutingKey = "#";
    
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange",dlxExchangeName);
            // 设置队列中的消息 10s 钟后过期
            arguments.put("x-message-ttl", 10000);
    
            //正常的队列绑定
            channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
            channel.queueDeclare(orderQueueName, true, false, false, arguments);
            channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);
    
            channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
    
            System.err.println("消息发送完成......");
        }
    

    consumer代码,消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了

    public class Consumer {
    
        //消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了
        private static final String QUEUE_NAME = "dlx.queue";
    
        public static void main(String[] args) throws Exception{
            // 创建信道
            final Channel channel = RabbitUtil.getChannel();
    
            System.out.println("消费者启动 ..........");
    
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
    
            channel.basicConsume(QUEUE_NAME, consumer);
            TimeUnit.SECONDS.sleep(10000000L);
        }
    
    }
    

    然后我们分别运行两端的代码,这里提示一下,我们并没有提前在控制台去创建queue 和 exchange,这个在producer启动或者consumer启动的时候,如果没有创建过会自动创建以及建立queue和exchange的绑定关系,

    启动producer,消息发送成功,同时可以通过控制台看到,exhange和相关的队列也帮我们创建了,要注意的是在dlx.queue中,有一个消息就绪,很明显,消息过了10S中没有任何消费者消费,就被路由到了死信队列dlx.queue中,

    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述

    启动consumer,通过控制台打印结果,可以看到,由于消费端监听的是死信队列,已经从dlx.queue中成功获取到了这条信息,
    在这里插入图片描述

    2、消息被拒绝,且requeue=false,
    没有细致研究过这个问题的可能会有点儿懵,其实就是在consumer端,当消费者要过滤某些消息的时候,那部分被过滤掉的消息如果不设置退回,即上一篇所讲的消息重回队列的话,这些消息就变成了死信,即在下面的代码中第三个参数设置成false即可,下面来看具体的代码,

    channel.basicNack(envelope.getDeliveryTag(),false,false);
    

    有这样一个场景,一批消息中,当消费端从header中收到了num=0的消息将会被过滤掉,并且设置如上requeue=false,下面看具体的代码,

    peoducer端代码,

    /**
     * 生产者
     * 死信队列使用
     */
    public class Producer {
    
        public static void main(String[] args) throws Exception{
    
            Channel channel = RabbitUtil.getChannel();
            String exchangeName = "test_ack_exchange";
            String routingKey = "ack.save";
    
            //通过在properties设置来标识消息的相关属性
            for(int i=0;i<5;i++){
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("num",i);
                AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                        .deliveryMode(2)                    // 传送方式 2:持久化投递
                        .contentEncoding("UTF-8")           // 编码方式
                        //.expiration("10000")              // 过期时间
                        .headers(headers)                  //自定义属性
                        .build();
                String message = "hello this is ack message ....."  + i;
                System.out.println(message);
                channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());
            }
    
        }
    
    
    }
    

    consumer端代码,

    public class Consumer {
    
        public static void main(String[] args) throws Exception{
    
            final Channel channel = RabbitUtil.getChannel();
            String exchangeName = "test_ack_exchange";
            String exchangeType="topic";
            final String queueName = "test_ack_queue";
            String routingKey = "ack.#";
    
            //死信队列配置  ----------------
            String deadExchangeName = "dead_exchange";
            String deadQueueName = "dead_queue";
            String deadRoutingKey = "#";
            //死信队列配置  ----------------
    
            //如果需要将死信消息路由
            Map<String,Object> arguments = new HashMap<String, Object>();
            arguments.put("x-dead-letter-exchange",deadExchangeName);
    
            channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
            channel.queueDeclare(queueName,false,false,false,arguments);
            channel.queueBind(queueName,exchangeName,routingKey);
    
            //死信队列绑定配置  ----------------
            channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);
            channel.queueDeclare(deadQueueName,true,false,false,null);
            channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);
            //死信队列配置  ----------------
    
            System.out.println("consumer启动 .....");
    
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    try{
                        Thread.sleep(2000);
                    }catch (Exception e){
    
                    }
                    Integer num = (Integer)properties.getHeaders().get("num");
                    if(num==0){
                        //未被ack的消息,并且requeue=false。即nack的 消息不再被退回队列而成为死信队列
                        channel.basicNack(envelope.getDeliveryTag(),false,false);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer端的Nack消息是: " + message);
                    }else {
                        channel.basicAck(envelope.getDeliveryTag(),false);
                        String message = new String(body, "UTF-8");
                        System.out.println("consumer端的ack消息是: " + message);
                    }
                }
            };
            //消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收
            channel.basicConsume(queueName,false,consumer);
        }
    }
    

    要关注的即下面的这处代码和第三个参数,
    在这里插入图片描述

    启动生产者和消费者,
    在这里插入图片描述

    启动生产者,生产者成功发送5条消息,
    在这里插入图片描述

    再看消费端的控制台,这里num=0的这条消息由于设置了死信队列而不会重回原来的队列,在上一篇中,当参数设置成了true的时候,看到控制台一直会打印一条消息,

    在这里插入图片描述
    同时,通过控制台也可以发现,在dead_queue中,有一条消息为就绪状态了,即死信消息,但这里并没有对这条消息做处理,目前一直存在队列里面,可以根据实际应用做后续的处理,
    在这里插入图片描述

    3、队列达到最大长度,
    这个很好理解,比如我们设置某个队列的最大可承载消息的数量是100个,超出第100个的消息将会被路由到死信队列中,设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段,具有实际的业务意义,下面是代码演示,基本设置和上述的TTL类似,只是在参数中将TTL更换为如下配置,

    arguments.put("x-max-length",3);
    

    生产者代码,这里我们设定order_queue这个队列的容量是5个,但是我们在程序中设置的x-max-length=3,那么按照这个猜想,将会有两个消息被路由到死信队列,

    public class Producer {
    
        public static void main(String[] args) throws Exception{
    
            final Channel channel = RabbitUtil.getChannel();
            String orderExchangeName = "order_exchange";
            String orderQueueName = "order_queue";
            String orderRoutingKey = "order.#";
            Map<String, Object> arguments = new HashMap<String, Object>(16);
    
            //死信队列配置  ----------------
            String dlxExchangeName = "dlx.exchange";
            String dlxQueueName = "dlx.queue";
            String dlxRoutingKey = "#";
    
            // 为队列设置队列交换器
            arguments.put("x-dead-letter-exchange",dlxExchangeName);
            // 设置队列中的消息 10s 钟后过期
            //arguments.put("x-message-ttl", 10000);
            arguments.put("x-max-length",3);
    
            //正常的队列绑定
            channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
            channel.queueDeclare(orderQueueName, true, false, false, arguments);
            channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);
    
            String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 创建订单.";
    
            // 创建死信交换器和队列
            channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
            channel.queueDeclare(dlxQueueName, true, false, false, null);
            channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);
    
            for(int i=0;i<5;i++){
                message = message + "========> " + i ;
                System.out.println("发送的消息是:" + message);
                channel.basicPublish(orderExchangeName, "order.save",null, message.getBytes());
            }
    
            System.err.println("消息发送完成......");
        }
    
    }
    

    消费者代码,

    public class Consumer {
    
        private static final String QUEUE_NAME = "order_queue";
    
        public static void main(String[] args) throws Exception{
            // 创建信道
            final Channel channel = RabbitUtil.getChannel();
            // 消费端消息限流。
            // 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。
            //channel.basicQos(1);
    
            System.out.println("消费者启动 ..........");
    
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.err.println("死信队列接收到消息:" + new String(body));
                    System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
    
            channel.basicConsume(QUEUE_NAME,false, consumer);
            //TimeUnit.SECONDS.sleep(10000000L);
        }
    
    }
    

    启动生产者,5条消息发送完毕,

    在这里插入图片描述

    再启动消费端,通过控制台可以看到,消费端只从order_queue中消费了3条消息,还剩2条消息去哪里了呢?
    在这里插入图片描述

    我们再回到控制台观察一下,发现在dlx.queue这个死信队列中有两条就绪的消息,即剩下的2条消息被路由到了死信队列了
    在这里插入图片描述

    以上便是关于死信队列常见的3种方式的处理程序和逻辑,这里只是一个简单的demo,实际应用中可以作为参考进行整合,希望对看到的同学有用!最后,感谢观看!

    需要源码的同学可前往下载https://download.csdn.net/download/zhangcongyi420/15403561

    展开全文
  • 死信队列死信队列:没有被及时消费的消息存放的队列。消息没有被及时消费的原因:a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=falseb.TTL(time-to-live) 消息超时未消费c.达到最大队列长度实现...

    死信队列

    死信队列:没有被及时消费的消息存放的队列。

    消息没有被及时消费的原因:

    a.消息被拒绝(basic.reject/ basic.nack)并且不再重新投递 requeue=false

    b.TTL(time-to-live) 消息超时未消费

    c.达到最大队列长度

    实现死信队列步骤

    首先需要设置死信队列的 exchange 和 queue,然后进行绑定:

    Exchange: dlx.exchange

    Queue: dlx.queue

    RoutingKey: # 代表接收所有路由 key

    然后我们进行正常声明交换机、队列、绑定,只不过我们需要在普通队列加上一个参数即可: arguments.put("x-dead-letter-exchange",' dlx.exchange' )

    这样消息在过期、requeue失败、 队列在达到最大长度时,消息就可以直接路由到死信队列!

    import com.rabbitmq.client.AMQP;

    import com.rabbitmq.client.Channel;

    import com.rabbitmq.client.Connection;

    import com.rabbitmq.client.ConnectionFactory;

    public class DlxProducer {

    public static void main(String[] args) throws Exception {

    //设置连接以及创建 channel 湖绿

    String exchangeName = "test_dlx_exchange";

    String routingKey = "item.update";

    String msg = "this is dlx msg";

    //我们设置消息过期时间,10秒后再消费 让消息进入死信队列

    AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()

    .deliveryMode(2)

    .expiration("10000")

    .build();

    channel.basicPublish(exchangeName, routingKey, true, properties, msg.getBytes());

    System.out.println("Send message : " + msg);

    channel.close();

    connection.close();

    }

    }

    import com.rabbitmq.client.*;

    import java.io.IOException;

    import java.util.HashMap;

    import java.util.Map;

    public class DlxConsumer {

    public static void main(String[] args) throws Exception {

    //创建连接、创建channel忽略 内容可以在上面代码中获取

    String exchangeName = "test_dlx_exchange";

    String queueName = "test_dlx_queue";

    String routingKey = "item.#";

    //必须设置参数到 arguments 中

    Map arguments = new HashMap();

    arguments.put("x-dead-letter-exchange", "dlx.exchange");

    channel.exchangeDeclare(exchangeName, "topic", true, false, null);

    //将 arguments 放入队列的声明中

    channel.queueDeclare(queueName, true, false, false, arguments);

    //一般不用代码绑定,在管理界面手动绑定

    channel.queueBind(queueName, exchangeName, routingKey);

    //声明死信队列

    channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);

    channel.queueDeclare("dlx.queue", true, false, false, null);

    //路由键为 # 代表可以路由到所有消息

    channel.queueBind("dlx.queue", "dlx.exchange", "#");

    Consumer consumer = new DefaultConsumer(channel) {

    @Override

    public void handleDelivery(String consumerTag, Envelope envelope,

    AMQP.BasicProperties properties, byte[] body)

    throws IOException {

    String message = new String(body, "UTF-8");

    System.out.println(" [x] Received '" + message + "'");

    }

    };

    //6. 设置 Channel 消费者绑定队列

    channel.basicConsume(queueName, true, consumer);

    }

    }

    总结

    DLX也是一个正常的 Exchange,和一般的 Exchange 没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。

    当这个队列中有死信时,RabbitMQ 就会自动的将这个消息重新发布到设置的 Exchange 上去,进而被路由到另一个队列。可以监听这个队列中消息做相应的处理。

    近期热文推荐:

    觉得不错,别忘了随手点赞+转发哦!

    展开全文
  • 死信队列介绍死信队列:DLX,dead-letter-exchange利用DLX,当消息在一个队列中变成死信 (dead message) 之后,它能被重新publish到另一个Exchange,这个Exchange就是DLX消息变成死信有以下几种情况消息被拒绝(basic...
  • 队列管理器的死信队列Exchanging messages from a microservice to another has never been easier. With the presence of modern web frameworks we are now able to bootstrap microservices from ground up with ...
  • 什么是死信队列2. 代码示例3. 使用死信队列实现延时队列 1. 什么是死信队列         就是在队列中的消息如果没有消费者消费,那么该消息就成为一个死信。如果这个消息被...
  • 前言在说死信队列之前,我们先介绍下为什么需要用死信队列。如果想直接了解死信对接,直接跳入下文的"死信队列"部分即可。ack机制和requeue-rejected属性在项目springboot-demo我们看到application.yaml文件部分配置...
  • Spring Cloud Stream RabbitMQ 配置死信队列,消费死信队列 Application.java package com.buxiaoxia; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import...
  • rabbitmq的死信队列

    千次阅读 2020-12-11 11:28:45
    如何配置死信队列 配置业务队列,绑定到业务交换机上 为业务队列配置死信交换机和路由key 为死信交换机配置死信队列 死信队列的应用场景 通过上面的信息,我们已经知道如何使用死信队列了,那么死信队列一般在什么...
  • 文章目录死信队列延迟队列 死信队列 死信队列,英文缩写:DLX (Dead Letter Exchange(死信交换机)),当消息成为 Dead message 后,可以被重新发送到另一个交换机,这个交换机就是DLX 在RabbitMQ中死信队列...
  • RabbitMQ死信队列

    2021-04-28 16:39:11
    为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但由于对死信队列的概念及配置不熟悉,导致曾一度陷入百度的汪洋大海,无法自拔,很多文章都看...
  • RabbitMQ 死信/死信队列

    2019-08-13 06:45:00
    一、RabbitMQ 死信/死信队列1、DLXDead Letter Exchange 的缩写DLX(Dead Letter Exchanges)死信交换,死信队列本身也...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,342
精华内容 936
关键字:

死信队列