-
2019-01-25 18:18:15
转自:https://blog.csdn.net/zhanghan18333611647/article/details/79519085
二、用代码进行相关测试:
1、生产者进行添加,20s后不进行消费,发现消息从正常的队列进入了死信队列;
2、生产者进行添加,消费者进行消费;
3、设置死信队列要根据具体的业务场景去应用,一般应用在当正常业务处理时出现异常时,将消息拒绝则会进入到死信队列中,有助于统计异常数据并做后续处理;
三、利用jemeter进行压力测试:
1、向正常队列中压入1000条消息,生产者不消费,过20s后发现进入死信队列;
2、向正常队列中压入1000条消息,消费者一直在消费,看20s后消费者能消费多少条消息;
【总结】
1、对程序进行异步处理后可以缓解一方产生块另一方消费慢的问题;
2、可以实现延迟处理的功效:数据上来后先进入正常的queue中(根据业务设置相应的失效时间【延迟时间】)进入死信队列,消费者消费死信队列中的数据;注:正常queue没有消费者,死信queue有消费者;3、给客户的体验感好了,异步处理后,不需要等待同步的结果;
更多相关内容 -
RabbitMQ死信队列处理超时未支付的订单
2020-12-19 15:11:31“死信队列”,顾明思议,是可以延时、延迟一定的时间再处理消息的一种特殊队列,它相对于“普通的队列”而言,可以实现“进入死信队列的消息不立即处理,而是可以等待一定的时间再进行处理”的功能!而普通的队列则...前言:
我们在抢购商品的时候总有这样的一种场景,就是我们已经抢购到我们的商品,但是由于我们某种原因没有及时的支付导致订单失效的情况。
那么我们今天就用rabbitmq来实现这么的一个场景。
“死信队列”,顾明思议,是可以延时、延迟一定的时间再处理消息的一种特殊队列,它相对于“普通的队列”而言,可以实现“进入死信队列的消息不立即处理,而是可以等待一定的时间再进行处理”的功能!而普通的队列则不行,即进入队列后的消息会立即被对应的消费者监听消费,如下图所示为普通队列的基本消息模型:
而对于“死信队列”,它的构成以及使用相对而言比较复杂一点,在正常情况,死信队列由三大核心组件组成:死信交换机+死信路由+TTL(消息存活时间~非必需的),而死信队列又可以由“面向生产者的基本交换机+基本路由”绑定而成,故而生产者首先是将消息发送至“基本交换机+基本路由”所绑定而成的消息模型中,即间接性地进入到死信队列中,当过了TTL,消息将“挂掉”,从而进入下一个中转站,即“面下那个消费者的死信交换机+死信路由”所绑定而成的消息模型中。如下图所示:
引入maven依赖:
<!--rabbitmq--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.1.5.RELEASE</version> </dependency>
填写yml文件的配置信息:application.yml
# Spring配置 spring: # rabbitmq rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / listener: simple: concurrency: 20 max-concurrency: 30 prefetch: 15
RabbitmqConfig.class 自定义配置信息
import com.ruoyi.common.constant.RabbitMQConstants; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitAdmin; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.core.env.Environment; import java.util.HashMap; import java.util.Map; /** * @Author Lxq * @Date 2020/12/17 9:52 * @Version 1.0 */ @Slf4j @Configuration public class RabbitmqConfig { @Autowired private Environment env; @Autowired private CachingConnectionFactory connectionFactory; @Autowired private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer; @Bean public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } /** * 单一消费 * * @return */ @Bean(name = "singleListenerContainer") public SimpleRabbitListenerContainerFactory listenerContainerFactory() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factory.setConnectionFactory(connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); factory.setConcurrentConsumers(1); factory.setMaxConcurrentConsumers(1); factory.setPrefetchCount(1); factory.setTxSize(1); return factory; } /** * 为了保证数据不被丢失,RabbitMQ支持消息确认机制,为了保证数据能被正确处理而不仅仅是被Consumer收到,那么我们不能采用no-ack,而应该是在处理完数据之后发送ack. * 在处理完数据之后发送ack,就是告诉RabbitMQ数据已经被接收,处理完成,RabbitMQ可以安全的删除它了. * 如果Consumer退出了但是没有发送ack,那么RabbitMQ就会把这个Message发送到下一个Consumer,这样就保证在Consumer异常退出情况下数据也不会丢失. * RabbitMQ它没有用到超时机制.RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有正确处理,也就是说RabbitMQ给了Consumer足够长的时间做数据处理。 * 如果忘记ack,那么当Consumer退出时,Mesage会重新分发,然后RabbitMQ会占用越来越多的内存. * <p> * 无ack模式(AcknowledgeMode.NONE) * 有ack模式(AcknowledgeMode.AUTO,AcknowledgeMode.MANUAL) * <p> * AcknowledgeMode.MANUAL模式需要人为地获取到channel之后调用方法向server发送ack(或消费失败时的nack)信息。 * <p> * AcknowledgeMode.AUTO模式下,由spring-rabbit依据消息处理逻辑是否抛出异常自动发送ack(无异常)或nack(异常)到server端。 */ /** * 消费者的数量 默认 20 */ private static final Integer spring_rabbitmq_listener_simple_concurrency = 20; /** * 最大的消费者数量 */ private static final Integer spring_rabbitmq_listener_simple_max_concurrency = 30; /** * 每个消费者获取最大投递数量 */ private static final Integer spring_rabbitmq_listener_simple_prefetch = 15; /** * 多个消费值 * * @return */ @Bean(name = "multiListenerContainer") public SimpleRabbitListenerContainerFactory multiListenerContainer() { SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); factoryConfigurer.configure(factory, connectionFactory); factory.setMessageConverter(new Jackson2JsonMessageConverter()); // 确认消费模式(无ack) factory.setAcknowledgeMode(AcknowledgeMode.NONE); factory.setConcurrentConsumers(spring_rabbitmq_listener_simple_concurrency); factory.setMaxConcurrentConsumers(spring_rabbitmq_listener_simple_max_concurrency); factory.setPrefetchCount(spring_rabbitmq_listener_simple_prefetch); return factory; } @Bean public RabbitTemplate rabbitTemplate() { // 发布者确认 connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); /** * mandatory:交换器无法根据自身类型和路由键找到一个符合条件的队列时的处理方式 * true:RabbitMQ会调用Basic.Return命令将消息返回给生产者 * false:RabbitMQ会把消息直接丢弃 */ rabbitTemplate.setMandatory(true); /** * ConfirmCallback:每一条发到rabbitmq server的消息都会调一次confirm方法。 * 如果消息成功到达exchange,则ack参数为true,反之为false; * cause参数是错误信息; * CorrelationData可以理解为context,在发送消息时传入的这个参数,此时会拿到。 */ rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause); } }); /** * ReturnCallback:成功到达exchange,但routing不到任何queue时会调用。 * 可以看到这里能直接拿到message,exchange,routingKey信息。 */ rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.warn("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message); } }); return rabbitTemplate; } //构建异步发送邮箱通知的消息模型 @Bean public Queue successEmailQueue() { return new Queue(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_EMAIL_QUEUE, true); } @Bean public TopicExchange successEmailExchange() { return new TopicExchange(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_EMAIL_EXCHANGE, true, false); } @Bean public Binding successEmailBinding() { return BindingBuilder.bind(successEmailQueue()).to(successEmailExchange()).with(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_EMAIL_ROUTING_KEY); } /** * 死信队列 */ @Bean public Queue successKillDeadQueue() { Map<String, Object> argsMap = new HashMap(2); // 死信交换机 argsMap.put("x-dead-letter-exchange", RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE); // 死信路由 argsMap.put("x-dead-letter-routing-key", RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY); // 死信队列 return new Queue(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_QUEUE, true, false, false, argsMap); } /** * 基本交换机 */ @Bean public TopicExchange successKillDeadProdExchange() { return new TopicExchange(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE, true, false); } /** * 创建基本交换机+基本路由 -> 死信队列 的绑定 */ @Bean public Binding successKillDeadProdBinding() { return BindingBuilder.bind(successKillDeadQueue()).to(successKillDeadProdExchange()).with(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY); } /** * 真正的队列 */ @Bean public Queue successKillRealQueue() { return new Queue(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE, true); } /** * 死信交换机 */ @Bean public TopicExchange successKillDeadExchange() { return new TopicExchange(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE, true, false); } /** * 死信交换机+死信路由->真正队列 的绑定 */ @Bean public Binding successKillDeadBinding() { return BindingBuilder.bind(successKillRealQueue()).to(successKillDeadExchange()).with(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY); } }
从上面的配置信息可以看出,首先是定义个死信队列 = 死信交换机 + 死信路由 ,然后用 基本交换机 + 基本路由 - > 绑定到我们的死信队列、 接着按照上面的模型图,我们还需要的是将 死信交换机 + 死信路由 -> 绑定到基本的队列
附上常量类:RabbitMQConstants
/** * @Author Lxq * @Date 2020/12/17 14:53 * @Version 1_0 */ public class RabbitMQConstants { /** * 秒杀成功异步发送邮件的消息模型 */ public static final String MQ_KILL_ITEM_SUCCESS_EMAIL_QUEUE = "kill_item_success_email_queue"; public static final String MQ_KILL_ITEM_SUCCESS_EMAIL_EXCHANGE = "kill_item_success_email_exchange"; public static final String MQ_KILL_ITEM_SUCCESS_EMAIL_ROUTING_KEY = "kill_item_success_email_routing_key"; /** * 死信队列 */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_QUEUE = "kill_item_success_kill_dead_queue"; /** * 死信交换机 */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_EXCHANGE = "kill_item_success_kill_dead_exchange"; /** * 死信路由key */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_ROUTING_KEY = "kill_item_success_kill_dead_routing_key"; /** * 正常队列 */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE = "kill_item_success_kill_dead_real_queue"; /** * 正常交换机 */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE = "kill_item_success_kill_dead_prod_exchange"; /** * 正常路由key */ public static final String MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY = "kill_item_success_kill_dead_prod_routing_key"; }
接下来就是如何去发送消息到MQ中,那么我定义一个通用的service
RabbitSenderService
/** * @Author Lxq * @Date 2020/12/17 9:50 * @Version 1.0 * RabbitMQ通用的消息发送服务 */ @Slf4j @Service public class RabbitSenderService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ItemKillSuccessMapper itemKillSuccessMapper; /** * 秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单 * * @param orderCode */ public void sendKillSuccessOrderExpireMsg(String orderCode) { try { if (StringUtils.isNotEmpty(orderCode)) { KillSuccessUserInfo info = itemKillSuccessMapper.selectByCode(orderCode); if (info != null) { rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); rabbitTemplate.setExchange(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_EXCHANGE); rabbitTemplate.setRoutingKey(RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_PROD_ROUTING_KEY); rabbitTemplate.convertAndSend(info, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { MessageProperties mp = message.getMessageProperties(); mp.setDeliveryMode(MessageDeliveryMode.PERSISTENT); mp.setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, KillSuccessUserInfo.class); //TODO:动态设置TTL(为了测试方便,暂且设置20s) // 消息的失效时间(下单并且没有支付限定的时间) mp.setExpiration("20000"); return message; } }); } } } catch (Exception e) { log.error("秒杀成功后生成抢购订单-发送信息入死信队列,等待着一定时间失效超时未支付的订单-发生异常,消息为:{}", orderCode, e.fillInStackTrace()); } } }
这里其实就是模拟秒杀 成功之后,我就会调用这个方法将订单的编号传来获取订单信息,然后将订单信息通过MQ TTL 发送
这样子设置就是默认订单下单之后10秒钟没有支付,待会下面写的监听器就会监听这个队列,然后处理这里超时没有支付的信息,将状态进行修改。
这里要注意的是发送消息设置的交换机和路由key(基本交换机 + 基本的路由key)
订单的监听service
RabbitReceiverService
/** * @Author Lxq * @Date 2020/12/17 15:29 * @Version 1.0 * RabbitMQ通用的消息接收服务 */ @Slf4j @Service public class RabbitReceiverService { @Autowired private ItemKillSuccessMapper itemKillSuccessMapper; @RabbitListener(queues = {RabbitMQConstants.MQ_KILL_ITEM_SUCCESS_KILL_DEAD_REAL_QUEUE}, containerFactory = "singleListenerContainer") public void consumeExpireOrder(KillSuccessUserInfo info) { try { log.info("用户秒杀成功后超时未支付-监听者-接收消息:{}", info); if (info != null) { ItemKillSuccess entity = itemKillSuccessMapper.selectByPrimaryKey(info.getCode()); if (entity != null && entity.getStatus().intValue() == 0) { itemKillSuccessMapper.expireOrder(info.getCode()); } } } catch (Exception e) { log.error("用户秒杀成功后超时未支付-监听者-发生异常:", e.fillInStackTrace()); } } }
这里我们监听基本的队列。
然后附上测试图:
-
RabbitMQ 死信队列详解
2022-05-28 23:07:04Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。...一、死信的概念
死信,顾名思义就是无法被消费的消息。一般来说,Producer 将消息投递到 Broker 或者直接到 Queue 里了,Consumer 从 Queue 取出消息进行消费,但某些时候由于特定的原因导致 Queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,档消息消费发生异常时,将消息投入到死信队列中。还有比如说:用户在商城下单成功并点击支付后再指定时间未支付时自动失效。
二、死信的来源
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false(不再重新入队)
三、死信实战
3.1 代码架构图
3.2 消息 TTL 过期
生产者
public class DeadLetterProducer { private static String EXCHANGE_NAME = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 设置消息 TTL 过期时间 AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build(); String message = "info"; channel.basicPublish(EXCHANGE_NAME, "zhangsan", properties, message.getBytes()); System.out.println("消息发送完成:" + message); } }
消费者1
public class DeadLetterConsumer1 { private static String NORMAL_EXCHANGE_NAME = "normal_exchange"; private static String NORMAL_QUEUE_NAME = "normal-queue"; private static String DEAD_EXCHANGE_NAME = "dead_exchange"; private static String DEAD_QUEUE_NAME = "dead-queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); // 声明一个死信队列 channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null); // 声明一个死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 死信队列与死信交换机绑定 channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi"); // 正常队列与死信交换机的绑定关系 Map<String, Object> deadLetterParams = new HashMap<>(2); deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); deadLetterParams.put("x-dead-letter-routing-key","lisi"); // 声明一个正常队列 channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams); // 声明一个正常交换机 channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 把队列和交换机进行绑定 channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan"); System.out.println("C1消费者启动等待消费消息:"); channel.basicConsume(NORMAL_QUEUE_NAME, true, (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("消费者接收到消息:" + receivedMessage); },(consumerTag) -> { System.out.println(consumerTag + "消费者取消消费消息"); }); } }
消费者2
public class DeadLetterConsumer2 { private static String NORMAL_EXCHANGE_NAME = "normal_exchange"; private static String DEAD_QUEUE_NAME = "dead-queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); System.out.println("C2消费者启动等待消费消息:"); channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("消费者接收到死信:" + receivedMessage); },(consumerTag) -> { System.out.println(consumerTag + "消费者取消消费消息"); }); } }
先启动消费者1,将正常交换机、死信交换机、正常队列、死信队列创建出来,否则会报错。接着启动消费者2,然后在启动生产者,观察控制台。
消费者1启动后进入RabbitMQ系统后台,可以看到队列 normal-queue 的 features 一列多了两个信息。其中 DLX 表示死信交换机,DLK 表示死信交换机的路由键(RoutingKey)。
此时由于消费者1可以正常消费消息,所以在消费者2中,死信队列是接收不到消息的。控制台情况如下:
将消费者1和消费者2的服务停止,重新运行生产者,10s 后消息会被进入到死信队列
再来看下后台系统:
生产者未发送消息
生产者发送了 1 条消息,此时正常队列中有 1 条未消费消息
时间过去 10 秒,正常队列里面的消息由于没有被消费,消息进入死信队列。
3.3 队列达到最大长度
生产者
public class DeadLetterLengthProducer { private static String EXCHANGE_NAME = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); // 声明一个交换机 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 设置消息 TTL 过期时间 for (int i = 0; i < 10; i++) { String message = "info" + i; channel.basicPublish(EXCHANGE_NAME, "zhangsan", null, message.getBytes()); } System.out.println("消息发送完成"); } }
消费者1
public class DeadLetterLengthConsumer1 { private static String NORMAL_EXCHANGE_NAME = "normal_exchange"; private static String NORMAL_QUEUE_NAME = "normal-queue"; private static String DEAD_EXCHANGE_NAME = "dead_exchange"; private static String DEAD_QUEUE_NAME = "dead-queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); // 声明一个死信队列 channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null); // 声明一个死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 死信队列与死信交换机绑定 channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi"); // 正常队列与死信交换机的绑定关系 Map<String, Object> deadLetterParams = new HashMap<>(2); deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); deadLetterParams.put("x-dead-letter-routing-key","lisi"); deadLetterParams.put("x-max-length", 6); // 声明一个正常队列 channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams); // 声明一个正常交换机 channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 把队列和交换机进行绑定 channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan"); System.out.println("C1消费者启动等待消费消息:"); channel.basicConsume(NORMAL_QUEUE_NAME, true, (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("消费者接收到消息:" + receivedMessage); },(consumerTag) -> { System.out.println(consumerTag + "消费者取消消费消息"); }); } }
消费者2
public class DeadLetterLengthConsumer2 { private static String NORMAL_EXCHANGE_NAME = "normal_exchange"; private static String DEAD_QUEUE_NAME = "dead-queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); System.out.println("C2消费者启动等待消费消息:"); channel.basicConsume(DEAD_QUEUE_NAME, true, (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); System.out.println("消费者接收到死信:" + receivedMessage); },(consumerTag) -> { System.out.println(consumerTag + "消费者取消消费消息"); }); } }
由于消费者1中修改了队列参数,所以启动前需要先将原先的队列删除,然后再启动消费者1,创建相关的队列及交换机。接着关闭消费者 1,启动生产者。打开后台系统:
普通队列中有 6 条消息未消费,超出队列长度的 4 条消息进入到了死信队列。然后启动消费者1 和消费者2
3.4 消息被拒
生产者和消费者2 的代码不需要修改,修改消费者1 的代码,修改后的代码如下:
消费者2
public class DeadLetterRejectConsumer1 { private static String NORMAL_EXCHANGE_NAME = "normal_exchange"; private static String NORMAL_QUEUE_NAME = "normal-queue"; private static String DEAD_EXCHANGE_NAME = "dead_exchange"; private static String DEAD_QUEUE_NAME = "dead-queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtil.getChannel(); // 声明一个死信队列 channel.queueDeclare(DEAD_QUEUE_NAME, false, false, false, null); // 声明一个死信交换机 channel.exchangeDeclare(DEAD_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 死信队列与死信交换机绑定 channel.queueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, "lisi"); // 正常队列与死信交换机的绑定关系 Map<String, Object> deadLetterParams = new HashMap<>(2); deadLetterParams.put("x-dead-letter-exchange", DEAD_EXCHANGE_NAME); deadLetterParams.put("x-dead-letter-routing-key","lisi"); // 声明一个正常队列 channel.queueDeclare(NORMAL_QUEUE_NAME, false, false, false, deadLetterParams); // 声明一个正常交换机 channel.exchangeDeclare(NORMAL_EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 把队列和交换机进行绑定 channel.queueBind(NORMAL_QUEUE_NAME, NORMAL_EXCHANGE_NAME, "zhangsan"); System.out.println("C1消费者启动等待消费消息:"); channel.basicConsume(NORMAL_QUEUE_NAME, false, (consumerTag, delivery) -> { String receivedMessage = new String(delivery.getBody()); if ("info5".equals(receivedMessage)) { System.out.println("C1接收到消息:" + receivedMessage+"并且拒绝签收了"); // 禁止重新入队 channel.basicReject(delivery.getEnvelope().getDeliveryTag(), false); } else { System.out.println("消费者接收到消息:" + receivedMessage); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } },(consumerTag) -> { System.out.println(consumerTag + "消费者取消消费消息"); }); } }
将原先的队列删除,重新启动消费者2,接着启动生产者
最后启动消费者2
-
RabbitMQ中的死信及死信队列详解
2022-03-06 15:59:101、什么是死信 死信产生主要来自于两个过程角色 来自于消费者端 来自于queue 产生死信的三种情况: 如果queue中的消息被消费者接收, 但是消费者拒绝消费(消费者执行了reject 或nack 并将 requee 参数设置为 ...1、什么是死信
死信产生主要来自于两个过程角色
- 来自于消费者端
- 来自于queue
产生死信的三种情况:
- 如果queue中的消息被消费者接收, 但是消费者拒绝消费(消费者执行了reject 或nack 并将 requee 参数设置为 false )的时候,这个消息就会变成死信。
- 消息本身设置了过期时间(TTL), 并且消息过期时间已经生效, 还未被消费的消息就会变成死信【特点是每个消息的过期时间都不同】
- 可以设置队列中所有消息的过期时间,如果消息过期时间已经生效,消息还未被消费
- 队列设置了最大长度限制, 当队列已满, 之后从交换机路由到该队列的消息会自动变成死信。
2、什么是死信交换机
死信交换机是专门处理死信的交换机
3、什么是死信队列
跟死信交换机绑定的队列就是死信队列,死信队列中存储的消息都是死信消息。
4、死信队列的应用
- 保证消息不会丢失,保证数据的完整性;
- 可以借助延时消费的特性完成特定的功能(比如订单生成但是未支付,超过30分钟自动取消的业务场景)
5、原理图介绍
6、三种场景代码演示
6.1 消费者拒绝消费消息, 消息进入死信队列
首先需要在application.yml文件中开启手动ack
spring: rabbitmq: # 开启消费者手动ack listener: direct: acknowledge-mode: manual
创建配置类
往容器中注入普通交换机、普通队列、路由规则和死信交换机和死信队列、死信路由规则package com.kkarma.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class DeadLettingConfig { public static final String GENERIC_EXCHANGE = "generic-exchange"; public static final String GENERIC_QUEUE = "generic-queue"; public static final String GENERIC_ROUTING_KEY = "generic.#"; public static final String DEAD_EXCHANGE = "dead-exchange"; public static final String DEAD_QUEUE = "dead-queue"; public static final String DEAD_ROUTING_KEY = "dead.#"; @Bean public Exchange genericExchange(){ return ExchangeBuilder.topicExchange(GENERIC_EXCHANGE).build(); } @Bean public Queue genericQueue(){ return QueueBuilder.durable(GENERIC_QUEUE).deadLetterExchange(DEAD_EXCHANGE).deadLetterRoutingKey("dead.demo").build(); } @Bean public Binding genericBinding(Queue genericQueue, Exchange genericExchange){ return BindingBuilder.bind(genericQueue).to(genericExchange).with(GENERIC_ROUTING_KEY).noargs(); } @Bean public Exchange deadExchange(){ return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).build(); } @Bean public Queue deadQueue(){ return QueueBuilder.durable(DEAD_QUEUE).build(); } @Bean public Binding deadBinding(Queue deadQueue, Exchange deadExchange){ return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs(); } }
生产者端
package com.kkarma.deadletter; import com.kkarma.config.DeadLettingConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.io.IOException; @SpringBootTest public class Publisher { private final RabbitTemplate rabbitTemplate; @Autowired public Publisher(RabbitTemplate rabbitTemplate){ this.rabbitTemplate = rabbitTemplate; } /** * 消费者拒绝消费,消息进入死信队列 */ @Test public void push() throws IOException { rabbitTemplate.convertAndSend(DeadLettingConfig.GENERIC_EXCHANGE, "generic.demo", "正常消息被拒绝消息会变成死信..."); System.out.println("消息发送成功..."); System.in.read(); } }
消费者端
package com.kkarma.deadletter; import com.kkarma.config.DeadLettingConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class Consumer { @RabbitListener(queues = DeadLettingConfig.GENERIC_QUEUE) public void pullAndNack(String msg, Channel channel, Message message) throws Exception { System.out.println("接收generic-queue队列中的消息内容: " + msg); String correlationId = message.getMessageProperties().getCorrelationId(); System.out.println("唯一标识: " + correlationId); // 拒绝消费消息 channel.basicReject(message.getMessageProperties().getDeliveryTag(),false); // 确认未成功消费 // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } }
测试结果
6.2 消息指定过期时间
主要是生产者端在发送消息是指定消息的过期时间
/** * 消息指定过期时间 */ @Test public void publishMsgExpire(){ String msg = "generic letter expire become dear letter"; rabbitTemplate.convertAndSend(DeadLettingConfig.GENERIC_EXCHANGE, "generic.demo", msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("10000"); return message; } }); }
6.3 给队列设置消息过期时间
这一项设置主要是通过DeadLettingConfig配置类中的正常队列来进行设置
@Bean public Queue genericQueue(){ // 这里需要指定当消息变成死信时绑定的死信交换机,并且需要重新指定指定死信交换机和死信队列之间的路由 return QueueBuilder .durable(GENERIC_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.demo") // 设置队列中消息的过期时间 .ttl(10000) .build(); }
6.4 设置队列的最大长度
@Bean public Queue genericQueue(){ // 这里需要指定当消息变成死信时绑定的死信交换机,并且需要重新指定指定死信交换机和死信队列之间的路由 return QueueBuilder .durable(GENERIC_QUEUE) .deadLetterExchange(DEAD_EXCHANGE) .deadLetterRoutingKey("dead.demo") // 设置队列最大长度,当队列已满,再路由过来的消息就被当成死信处理 .maxlength(10) .build(); }
-
websphere mq 8死信队列测试--队列满了场景
2022-05-10 11:46:25websphere mq 死信队列测试 -
mq监听死信队列后如何处理
2021-06-10 07:27:34昨天试了半天为啥监听不到死信队列的消息,原因是打开方式不对,还有死信队列就一条消息,没意思。什么事务啊?我都没启用事务,他怎么就进去了呢?你不说重试是默认6次吗?我都没改配置,怎么就进了?1.如何让消息... -
RabbitMQ的死信队列
2021-11-13 14:12:57什么是死信 在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。 死信就是消息在特定场景下的一种表现形式,这些场景包括: ...死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑 -
关于websphereMQ 对死信队列的处理
2017-08-18 02:22:51现状是目标队列满了以后,消息会存入队列管理器的死信队列,我试着取了死信队列是一串数字看不懂,我知道死信队列是加了一个DLQ的修饰头标记,但是对于死信队列中的数据应该处理呢?求哪位大神给我支一招.............. -
RabbitMQ:死信队列+延迟队列
2022-04-28 21:15:27文章目录1、死信队列1.1、概念1.2、死信的来源1.3、死信实战1.3.1、消息TTL过期1.3.2、队列达到最大长度1.3.3、消息被拒2、延迟队列2.1、概念2.2、延迟队列使用场景2.3、整合SpringBoot2.3.1、添加依赖2.3.2、修改... -
rabbitmq死信队列详解与使用
2019-08-28 22:26:13consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信,自然就有了死信队列; 以上是个人的通俗解释,专业术语解释的... -
activemq--死信队列
2021-08-26 08:51:07死信队列 死信队列:Dead Letter Queue ...2.死信队列:处理异常信息 配置 SharedDeadLetterStrategy:共享死信队列策略 将所有的DeadLetter保存在一个共享的队列中,这是activemq的默认策略。共享队列. -
Java秒杀系统实战系列~RabbitMQ死信队列处理超时未支付的订单
2019-08-08 12:00:40本篇博文是“Java秒杀系统实战系列文章”的第十篇,本篇博文我们将采用RabbitMQ的死信队列的方式处理“用户秒杀成功生成订单后,却迟迟没有支付”的情况,一起来见识一下RabbitMQ死信队列在实际业务环境下的强大之处... -
Spring boot 整合 rabbit MQ 死信队列的应用-订单过期自动取消
2019-10-06 11:45:15如果实现过期自动取消,下面有几种解决方案 ...其实方案二和死信队列的原理差不多,但是MQ已经拥有类似的机制,所以我们直接沿用即可。 死信队列介绍 死信队列:DLX,dead-letter-exchange 利... -
RabbitMQ中消费者对死信队列的处理
2020-08-01 08:43:59我们希望的是:可以指定重试的次数,重试完了之后进入死信队列,然后就可以人为的对死信队列进行处理 代码实现 设置重试次数 关联死信队列 需要将一个Queue关联死信队列的Exchange和RoutingKey package ... -
【RabbitMQ 实战指南】一 死信队列
2021-04-24 00:02:491、死信队列DLX,全称为 Dead-Letter-Exchange,可以称之为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列。DLX 也是... -
RabbitMQ死信队列实现定时任务
2020-11-29 18:31:08RabbitMQ本身不支持延时队列,但是可以利用以下两大特性曲线实现延时队列: 1.1 Time To Live ( TTL ) RabbitMQ可以针对Queue设置x-expires 或者针对消息设置x-message-ttl来控制消息的生存时间(都设置则以较短的为... -
RocketMQ-什么是死信队列?怎么解决
2022-01-14 09:43:14目录 什么是死信队列 死信队列的特征 死信消息的处理 什么是死信队列 当一条消息初次消费失败,消息队列会自动进行消费重试;达到最大重试次数后,若消费...死信队列是用于处理无法被正常消费的消息的。 ... -
rabbitmq死信队列以及延迟
2022-05-01 16:17:41“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。 RabbitMQ 中有一种交换器叫 DLX,全称为 Dead-Letter-Exchange,可以称之为... -
RabbitMQ之死信队列
2021-12-21 23:04:00死信队列1、死信队列的概念2、死信的来源3、死信实战...queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。 应用场景:为了保证订单业务的消息数据不丢失,需要使 -
RabbitMQ-Java-死信队列
2022-05-12 17:13:30来自队列的消息可以是“死信”;也就是说,当发生以下任何事件时,重新发布到交易所: 消费者使用basic.reject或 basic.nack否定确认消息,并将requeue参数设置为false。 消息由于每条消息的 TTL而过期 消息被丢弃... -
RabbitMQ死信队列-案例及代码梳理
2022-01-12 21:27:33RabbitMQ死信队列-案例及代码梳理 -
RabbitMQ(一)使用死信队列实现延迟队列消费
2020-12-07 16:11:02在使用rabbitmq实现延迟消费时,需要先明白什么是死信队列,什么是延迟队列。 rabbitmq并没有直接支持延迟队列,延迟队列是通过死信队列来实现的。 死信队列 DLX(Dead Letter Exchange),死信交换器。当队列中的... -
RocketMQ死信队列
2022-02-01 16:37:56RocketMQ的死信队列介绍就没有RabbitMQ那么复杂。在这篇文章会简单的介绍一下RocketMQ死信队列的特点。 死信消息特性 不会再被消费者正常消费 有效期与正常消息相同,均为 3 天,3 天后会被自动删除。故死信消息应... -
RabbitMQ死信队列和延时队列
2021-07-07 22:45:54consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,接收死信消息的队列就叫死信队列 2.成为死信的条件 一个消息如果满足... -
RocketMQ重试队列与死信队列简介
2021-11-29 00:22:011、消费者异常了怎么办?假设我们使用RocketMQ作为消息中间件,传输订单相关的数据,消费者拿到数据后,执行一些后续处理,比如调用物流系统,准备发货。如果这时候,物流系统的数据库宕机了,... -
RabbitMQ死信队列
2022-04-12 07:30:23消费者从队列取出消息进行消费,但某些时候由于特定的原因导致队列中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有死信队列。 2.应用场景 (1)为了保证订单业务的数据不... -
RabbitMQ(五)死信队列、延迟队列
2022-06-11 00:10:58死信队列、延迟队列 -
RabbitMQ高级:死信队列详解
2022-03-15 16:32:26应用场景:下单20分钟还未完成支付,订单自动取消(转移到死信队列,并更新订单状态) 什么是死信队列? DLX (全称:Dead-Letter-Exchange 可称为死信交换机),如果一条消息成为死信 dead message,它不是...