精华内容
下载资源
问答
  • RocketMq支持任意延迟的延时消息方案 主要特性 支持任意延迟的延时消息,精确到秒,最长延迟时间为1年。 使用方法 配置broker.conf segmentScale=60 ##每个时间桶的时间范围,单位分钟,默认60,可选值0-60之间,...
  • RabbitMQ延时消息实现方案,主要是用于java开发中企业实际应用,有流程图,详细配置
  • 由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。 其中 rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现 缺点嘛:每次都得...

    由于日常开发中遇到几次使用延时消息的场景,而且目前业务中使用到的消息中间件有rabbitmq和kafka,对延时消息的支持都不太理想。
    其中

    • rabbitmq 延时消息是通过 设置队列ttl+死信exchange实现
      • 缺点嘛:每次都得设置两个队列,一个用来实现延时,过期后经死信exchange转到对应的业务队列提供消费。
      • 另:rabbitmq有提供延时插件,但缺点较多,如:1. 启动插件要么重启,要么引入一个新的集群;2. 不支持高可用,延时消息发送前只存储在当前broker节点的内部数据库Mnesia中,不会被镜像复制;3. 延时不可靠,存在消息数量较大或使用很久后延迟不准确;4:不支持大规模消息,同3;5:只支持ram节点(Mnesia数据库存在磁盘中)
    • kafka 延时消息通过在消费端判断消息是否达到消费时间,决定是否进行消费实现。未达到延时时间则暂停消费。
      • 缺点:针对单个topic固定的延时时间。 需要额外在消费端进行开发 (实际上这种在消费端控制延时的方式大部分消息队列都能做到)

    无论是rabbitmq的死信还是kafka消费端控制,基本上都是每个topic只能使用固定的延时时间。但现实中,也存在一些同一个业务场景使用不同延时时间的消息的场景:

    • 考试结束后强制交卷。不同的考试规定的时间是不一样的,不可能每个考试都创建一个新的队列。
    • 一些异常的重试操作。执行某个操作失败后,需要多次不同等级的延时重试。(虽说这个用一个本地线程也可以,但是在同一台机器上延时重试,仍然存在较大可能失败。所以比较关键场景可以使用延时消息,分发到其他机器上执行。)

    于是调研了一下其他的消息中间件,发现rocketmq貌似是支持延时消息的,虽然也只有固定的18个延时等级,但相比rabbitmq和kafka固定的延时消息,要好很多了。于是开始学习探究rocketmq延时消息的实现。

    正文开始

    先了解一下rocketmq

    rocketmq 的架构

    在这里插入图片描述

    整个架构如图,简单描述一下:

    • nameServer 提供注册中心的服务,负责broker的管理,以及topic路由信息的管理。
    • brokerServer 则主要负责消息的存储、投递和查询及高可用。
    • Producer 连接nameServer获取到broker信息后,发送信息到对应的broker。
    • Consumer 同样先连接 nameServer,查询topic路由信息,然后连接broker消费消息。

    消息的存储

    在这里插入图片描述
    如图,rocketmq 的所有消息都存储在 commitlog 中,然后ConsumerQueue作为逻辑消费队列,维护一个topic消息的索引,记录topic内消息在commitlog中的一些信息。其中 ConsumeQueue的存储单元为 8字节的offset+4字节的size+8字节的tags hashcode, 对于延时消息,最后8字节则用于存储消息计划投递时间。

    然后关于rocketmq的延时消息的使用

    rocketmq 只支持固定18个等级的延时消息:
    1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    发送延时消息只需要 setDelayTimeLevel 就可以,(连个延时等级相关的常量都没有。。。)

        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
         // Launch producer
         producer.start();
         int totalMessagesToSend = 100;
         for (int i = 0; i < totalMessagesToSend; i++) {
             Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
             // This message will be delivered to consumer 10 seconds later.
             message.setDelayTimeLevel(3);
             // Send the message
             producer.send(message);
         }
    
         // Shutdown producer after use.
         producer.shutdown();
    

    然后关于延时消息的实现

    先看下流程图
    在这里插入图片描述

    然后文字+代码介绍

    • org.apache.rocketmq.store.CommitLog#putMessage
      通过查看putMessage源码可以得知,rocketmq在最终将message存入commitlog时,会先判断是否延时消息,如果延时消息则替换topic为SCHEDULE_TOPIC_XXXX,并将原topic存入message.properties,然后根据延时level存入指定的queue。(18个延时等级分别对应18个queue的id)。

      ...
          // Delay Delivery
          if (msg.getDelayTimeLevel() > 0) {
              if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                  msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
              }
      
              topic = ScheduleMessageService.SCHEDULE_TOPIC;
              queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
      
              // Backup real topic, queueId
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
              MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
              msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
      
              msg.setTopic(topic);
              msg.setQueueId(queueId);
          }
      ...
      
    • org.apache.rocketmq.store.schedule.ScheduleMessageService#start
      延时发送的代码,启动1s后,对已创建的delayQueue启动一个投递延时消息的任务,然后根据offset批量拉取对应的消息,判断是否到达投递时间,未到达则使用timer延时对应时长后启动下一次投递任务;到达投递时间则恢复原始topic和queueid并调用writeMessageStore.putMessage(msgInner)将消息再次投递到commitlog中,然后投递下一个消息。

      public void start() {
          if (started.compareAndSet(false, true)) {
              this.timer = new Timer("ScheduleMessageTimerThread", true);
              for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {
                  Integer level = entry.getKey();
                  Long timeDelay = entry.getValue();
                  Long offset = this.offsetTable.get(level);
                  if (null == offset) {
                      offset = 0L;
                  }
                  // 1s 后启动一个投递延时消息的任务
                  if (timeDelay != null) {
                      this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);
                  }
              }
      
              // 默认每隔10s持久化一次 delayConsumeQueue的offset信息, 一个delayOfset.json文件
              this.timer.scheduleAtFixedRate(new TimerTask() {
                  @Override
                  public void run() {
                      try {
                          if (started.get()) ScheduleMessageService.this.persist();
                      } catch (Throwable e) {
                          log.error("scheduleAtFixedRate flush exception", e);
                      }
                  }
              }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval());
          }
      }
      
    • org.apache.rocketmq.store.schedule.ScheduleMessageService.DeliverDelayedMessageTimerTask#executeOnTimeup 延时任务处理逻辑
      先获取当前延时等级的ConsumeQueue逻辑消费队列。然后根据传入的offset获取消息offset,tagsCode(延时消息存的是计划投递时间)。然后判断消息是否到期,到期则根据offset从commitLog中取出消息内容,并将其投递到原始topic中;如果未到期则再次在timer中添加一个延时任务(延时时间为计划投递的时间)。然后继续处理下一条记录。

      public void executeOnTimeup() {
              ConsumeQueue cq = ScheduleMessageService.this.defaultMessageStore.findConsumeQueue(SCHEDULE_TOPIC, delayLevel2QueueId(delayLevel));
      
              long failScheduleOffset = offset;
      
              if (cq != null) {
                  SelectMappedBufferResult bufferCQ = cq.getIndexBuffer(this.offset);
                  if (bufferCQ != null) {
                      try {
                          long nextOffset = offset;
                          int i = 0;
                          ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
                          for (; i < bufferCQ.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {
                              long offsetPy = bufferCQ.getByteBuffer().getLong(); //消息在commitLog中的offset
                              int sizePy = bufferCQ.getByteBuffer().getInt();
                              long tagsCode = bufferCQ.getByteBuffer().getLong(); // 计划投递时间
      
                              //省略一些代码....
      
                              //判断消息是否到期
                              long countdown = deliverTimestamp - now;
                              if (countdown <= 0) {
                                  MessageExt msgExt = ScheduleMessageService.this.defaultMessageStore.lookMessageByOffset(offsetPy, sizePy);
                                  if (msgExt != null) {
                                      try {
                                          MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
                                          PutMessageResult putMessageResult =
                                              ScheduleMessageService.this.writeMessageStore
                                                  .putMessage(msgInner);
      
                                          ....
                                          
                                      } catch (Exception e) {
                                          .....
                                      }
                                  }
                              } else {
                                  ScheduleMessageService.this.timer.schedule(
                                      new DeliverDelayedMessageTimerTask(this.delayLevel, nextOffset),countdown);
                                  ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                                  return;
                              }
                          } // end of for
      
                          nextOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
                          ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(
                              this.delayLevel, nextOffset), DELAY_FOR_A_WHILE);
                          ScheduleMessageService.this.updateOffset(this.delayLevel, nextOffset);
                          return;
                      } finally {
      
                          bufferCQ.release();
                      }
                  } // end of if (bufferCQ != null)
                  else {
                      long cqMinOffset = cq.getMinOffsetInQueue();
                      if (offset < cqMinOffset) {
                          failScheduleOffset = cqMinOffset;
                          log.error("schedule CQ offset invalid. offset=" + offset + ", cqMinOffset="
                              + cqMinOffset + ", queueId=" + cq.getQueueId());
                      }
                  }
              } // end of if (cq != null)
      
              ScheduleMessageService.this.timer.schedule(new DeliverDelayedMessageTimerTask(this.delayLevel,
                  failScheduleOffset), DELAY_FOR_A_WHILE);
          }
      

    总结

    rocketmq 先将不同延时等级的消息存入内部对应延时队列中,然后不断的从延时队列中拉取消息判断是否到期,然后进行投递到对应的topic中。

    通过固定延时等级的方式,同一个队列中的消息都是相同的延时等级,不需要对消息进行排序,只需要按顺序拉取消息判断是否可以投递就行了。但也限制了延时时间。

    另外,因为只要延时消息存入延时队列中,就会写入commitlog文件中,然后rocketmq的高可用(同步复制或异步复制)就会将消息复制到slave中,从而保证延时消息的可靠性。

    虽然rocketmq不支持任意延时时间,但相比于rabbitmq的死信消息,仍然提供了18个延时等级,基本也能覆盖很多场景了。

    另外:后面又看到了去哪了开源的qmq,貌似支持任意延迟时间,感觉也可以学习一波。

    展开全文
  • Serve 基于Swoole Server 编写的消息队列消费系统
  • 延时消息1.1 使用限制1.2 示例1.2.1 延时消息的生产者1.2.2 延时消息的消费者2. 批量消息2.1 小于4MB的批量消息发送2.2 大于4MB的批量消息发送2.3 消费者代码 1. 延时消息 延时消息是指消费者延时消费,比如电商里...

    1. 延时消息

    延时消息是指消费者延时消费,比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。

    它的实现与普通消息的发送和消费没多大区别,只多了一句话:

    message.setDelayTimeLevel()
    

    1.1 使用限制

    现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18

    private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
    

    1.2 示例

    1.2.1 延时消息的生产者

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    	// 创建 producer 实例,并设置生产者组名
    	DefaultMQProducer producer = new DefaultMQProducer("delayGroup");
    	// 设置 NameServer 地址
    	producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动生产者
    	producer.start();
    
    	for (int i = 0; i < 3; i++) {
    		Message message = new Message("delayTopic", "delayTag", ("Hello Delay message-" + i).getBytes());
    		
    		// 设置延时等级2,这个消息将在5s之后发送(现在只支持固定的几个时间,详看delayTimeLevel)
    		message.setDelayTimeLevel(2);
    		// 发送消息
    		SendResult sendResult = producer.send(message);
    		// 打印结果
    		System.out.println(String.format("SendResult status:%s, queueId:%d",
    				sendResult.getSendStatus(),
    				sendResult.getMessageQueue().getQueueId()));
    	}
    	// 关闭生产者
    	producer.shutdown();
    }
    

    打印结果:

    SendResult status:SEND_OK, queueId:2
    SendResult status:SEND_OK, queueId:3
    SendResult status:SEND_OK, queueId:0
    

    1.2.2 延时消息的消费者

    public static void main(String[] args) throws MQClientException {
    	// 实例化消费者,并设置组名
    	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delayGroup");
    	// 设置 NameServer 地址
    	consumer.setNamesrvAddr("127.0.0.1:9876");
    	// 订阅 Topic
    	consumer.subscribe("delayTopic", "delayTag");
    	// 注册消息监听者
    	consumer.registerMessageListener(new MessageListenerConcurrently() {
    		@Override
    		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
    			for (MessageExt message : messages) {
    				// 打印消息内容和延时时间
    				System.out.println("Receive message[msgBody=" + new String(message.getBody()) + "]");
    			}
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		}
    	});
    	// 启动消费者
    	consumer.start();
    }
    

    在5秒钟之后就会收到延时消息,打印结果:

    Receive message[msgBody=Hello Delay message-0] 
    Receive message[msgBody=Hello Delay message-1] 
    Receive message[msgBody=Hello Delay message-2] 
    

    2. 批量消息

    批量发送消息能显著提高传递小消息的性能。限制是这些批量消息应该有相同的 topic,相同的waitStoreMsgOK,而且不能是延时消息。此外,这一批消息的总大小不应超过4MB

    2.1 小于4MB的批量消息发送

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    	// 创建 producer 实例,并设置生产者组名
    	DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
    	// 设置 NameServer 地址
    	producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动生产者
    	producer.start();
    
    	// 创建批量消息
    	List<Message> messageList = new ArrayList<>();
    	Message message1 = new Message("batchTopic", "batchTag", ("Hello Batch message-1").getBytes());
    	Message message2 = new Message("batchTopic", "batchTag", ("Hello Batch message-2").getBytes());
    	Message message3 = new Message("batchTopic", "batchTag", ("Hello Batch message-3").getBytes());
    	messageList.add(message1);
    	messageList.add(message2);
    	messageList.add(message3);
    
    	// 发送批量消息
    	SendResult sendResult = producer.send(messageList);
    	// 打印结果
    	System.out.println(String.format("SendResult status:%s, queueId:%d",
    			sendResult.getSendStatus(),
    			sendResult.getMessageQueue().getQueueId()));
    	// 关闭生产者
    	producer.shutdown();
    }
    

    2.2 大于4MB的批量消息发送

    要发送大于4MB的消息,我们只需要把大的消息分裂成若干个小的消息。首先创建一个拆分消息的工具类

    public class ListSplitter implements Iterator<List<Message>> {
        private final int SIZE_LIMIT = 1024 * 1024 * 4;
        private final List<Message> messages;
        private int currIndex;
    
        public ListSplitter(List<Message> messages) {
            this.messages = messages;
        }
    
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }
        @Override
        public List<Message> next() {
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                Message message = messages.get(nextIndex);
                int tmpSize = message.getTopic().length() + message.getBody().length;
                Map<String, String> properties = message.getProperties();
                for (Map.Entry<String, String> entry : properties.entrySet()) {
                    tmpSize += entry.getKey().length() + entry.getValue().length();
                }
                // 增加日志的开销20字节
                tmpSize = tmpSize + 20;
    
                if (tmpSize > SIZE_LIMIT) {
                    //单个消息超过了最大的限制
                    //忽略,否则会阻塞分裂的进程
                    if (nextIndex - currIndex == 0) {
                        //假如下一个子列表没有元素,则添加这个子列表然后退出循环,否则只是退出循环
                        nextIndex++;
                    }
                    break;
                }
                if (tmpSize + totalSize > SIZE_LIMIT) {
                    break;
                } else {
                    totalSize += tmpSize;
                }
    
            }
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }
    }
    

    然后在实现发送消息,和上面的区别在于在发送消息前,使用了工具类分隔消息

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
    	// 创建 producer 实例,并设置生产者组名
    	DefaultMQProducer producer = new DefaultMQProducer("batchGroup");
    	// 设置 NameServer 地址
    	producer.setNamesrvAddr("127.0.0.1:9876");
    	// 启动生产者
    	producer.start();
    
    	// 创建批量消息
    	List<Message> messageList = new ArrayList<>();
    	Message message1 = new Message("batchTopic", "batchTag", ("Hello Batch message-1").getBytes());
    	Message message2 = new Message("batchTopic", "batchTag", ("Hello Batch message-2").getBytes());
    	Message message3 = new Message("batchTopic", "batchTag", ("Hello Batch message-3").getBytes());
    	messageList.add(message1);
    	messageList.add(message2);
    	messageList.add(message3);
    
    	//发送批量消息:把大的消息分裂成若干个小的消息
    	ListSplitter splitter = new ListSplitter(messageList);
    	while (splitter.hasNext()) {
    		try {
    			List<Message> listItem = splitter.next();
    			SendResult sendResult = producer.send(listItem);
    			System.out.println(String.format("SendResult status:%s, queueId:%d",
    					sendResult.getSendStatus(),
    					sendResult.getMessageQueue().getQueueId()));
    		} catch (Exception e) {
    			e.printStackTrace();
    			//处理error
    		}
    	}
    	// 关闭生产者
    	producer.shutdown();
    }
    

    2.3 消费者代码

    消费者代码与普通消费者代码实现一样

    public static void main(String[] args) throws MQClientException {
    	// 实例化消费者,并设置组名
    	DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batchGroup");
    	// 设置 NameServer 地址
    	consumer.setNamesrvAddr("127.0.0.1:9876");
    	// 订阅 Topic
    	consumer.subscribe("batchTopic", "batchTag");
    	// 注册消息监听者
    	consumer.registerMessageListener(new MessageListenerConcurrently() {
    		@Override
    		public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
    			for (MessageExt message : messages) {
    				// 打印消息内容
    				System.out.println("Receive message[msgBody=" + new String(message.getBody()) + "] ");
    			}
    			return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    		}
    	});
    	// 启动消费者
    	consumer.start();
    }
    

    技 术 无 他, 唯 有 熟 尔。
    知 其 然, 也 知 其 所 以 然。
    踏 实 一 些, 不 要 着 急, 你 想 要 的 岁 月 都 会 给 你。


    展开全文
  • 主要介绍了golang实现redis的延时消息队列功能,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 消息队列服务相信大家一定都不陌生了,在很多应用系统中,都有一些场景会使用到消息队列服务,简单来说,我们可以把消息队列比作是一个存放消息的容器,上游发送端将消息发送到消息队列,下游消费端从消息队列里消费...

    前言

    消息队列服务相信大家一定都不陌生了,在很多应用系统中,都有一些场景会使用到消息队列服务,简单来说,我们可以把消息队列比作是一个存放消息的容器,上游发送端将消息发送到消息队列,下游消费端从消息队列里消费消息。消息队列是分布式系统中重要的组件,核心作用可以帮助我们实现异步、解耦以及削峰,从而提高系统性能和稳定性。

    在大部分场景下业务系统如果只需要实现异步解耦、削峰填谷等能力,常规的普通消息就可以满足此类需求。除此之外,在某些特殊的业务场景中,普通消息类型存在无法满足需求的情况。这就需要消息队列服务本身支持一些特殊的消息类型,或者开发者通过开发一些定制化的代码实现目的。这里我们列举在使用消息队列过程中几种特殊场景的例子:

    顺序消费场景

    生产者按照一定的先后顺序发布消息,消费者按照既定的先后顺序消费消息,即先发布的消息一定会先被客户端消费。

    分布式事务场景

    分布式架构下,随着系统的演进,数据库也进行了垂直拆分,如果选择使用消息队列进行上下游解耦的话,生产者和消费者需要保证数据一致性。

    延时消费场景

    生产者将消息发送到消息队列后,并不期望立马投递这条消息,而是推迟到某个时间点之后将消息投递给消费者进行消费。

    对于顺序消息和事务消息,这里就不进行详细介绍了,大家有兴趣可以自行研究,本文后续内容会和大家一起详细讨论下延时消息更多的细节及应用场景。

    延时消息介绍

    延时(定时)消息的特点就是发送者成功发送一条消息后,这条消息并不会马上被消费者消费,而是在某个特定的时间或者延迟一段时间后,消息才被消费者可见并进行后续的消费,延时消息整个生命周期可以用如下示意图来表示:

    1. 消息发布者将一条延时消息发送到消息队列服务端;
    2. 在预计投递时间未到之前,消息对消费者不可见,消费者此时无法立刻消费;
    3. 投递时间到达后,消息才对消费者可见,消费者此时可以消费;
    4. 消费者获取此条消息并进行消费;
    5. 消费者成功消费后,进行确认,此条消息将不再被消费。

    延时消息应用场景

    交易场景

    在生产者和消费者有时间窗口的要求下,我们可以考虑使用延时消息。如在电商交易场景下,交易中超时未支付的订单需要被关闭的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后,需要判断对应的订单是否已完成支付;如支付未完成,则关闭订单。

    游戏场景

    再比如在游戏社区里,游戏运营方经常会发起一些活动,玩家在活动期间内按照规则完成一系列任务,活动时间截止后,游戏后台根据玩家完成任务的情况进行判定,发送系统通知或者进行rank排名并派发奖励等。

    此种场景也可以采用延时消息来实现,上游系统发布活动公告后,同时发送一条延时消息,延时时间设置为活动周期的时间。当活动截止后,下游系统可以随即消费消息并进行相应的逻辑处理。

    其他场景

    同时延时消息也可以广泛应用于信息提醒等比较通用的场景。

    如何实现延时消息

    介绍完延时消息的一些概念及应用场景后,我们接下来分析一下目前比较主流的几款开源消息中间件对延时消息的支持情况以及实现方式。

    Kafka

    原生Kafka默认是不支持延时消息的,需要开发者自己实现一层代理服务,比如发送端将消息发送到延时Topic,代理服务消费延时Topic的消息然后转存起来,代理服务通过一定的算法,计算延时消息所附带的延时时间是否到达,然后将延时消息取出来并发送到实际的Topic里面,消费端从实际的Topic里面进行消费。

    RabbitMQ

    RabbitMQ实现延时消息有两种方案,第一种是采用rabbitmq-delayed-message-exchange 插件实现,第二种则是利用DLX(Dead Letter Exchanges)+ TTL(消息存活时间)来间接实现。大致的实现思路如下:

    1. 创建一个普通队列delay_queue,为此队列设置死信交换机 (通过x-dead-letter-exchange参数) 和 RoutingKey (通过x-dead-letter-routing-key参数),生产者将向delay_queue发送延时消息。
    2. 创建步骤1中设置的死信交换机,同时创建一个目标队列 target_queue,并使用步骤1中设置的RoutingKey将两者绑定起来。消费者将从target_queue里面消费延时消息。
    3. 设置消息的存活时间TTL,可以在步骤1中设置到队列级别delay_queue的消息存活时间,或者在发送消息时动态设置消息级别的存活时间。

    RocketMQ

    开源RocketMQ支持延迟消息,但是不支持秒级精度。默认支持18个level的延迟消息,这是通过broker端的messageDelayLevel配置项确定的
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    消息队列服务在启动时,会创建一个内部topic:SCHEDULE_TOPIC_XXXX,根据延迟level的个数,创建对应数量的队列。生产者发送消息时可以设置延时等级,示例代码:

    Message msg=new Message();
    msg.setTopic("TopicA");
    msg.setBody("this is a delay message".getBytes());
    //设置延迟level为5,对应延迟1分钟
    msg.setDelayTimeLevel(5);
    producer.send(msg);

    发送的消息会暂存在Broker对应的内部topic中,再通过定时任务从内部topic中拉取数据,如果延迟时间到了,就会把消息转发到目标topic下,消费者从目标topic消费消息。

    阿里云消息队列RocketMQ版

    通过上一章节的讨论,我们可以看出目前几款主流的开源消息队列服务,在支持延时消息的场景下或多或少有些不完美的地方。主要体现在以下几点:

    1. Kafka不支持延时消息,需要完全开发代理服务来实现,工作量大。
    2. RabbitMQ需要额外的插件,或者利用DLX+TTL的方式进行中转,实现不是非常直观。
    3. RocketMQ支持延时消息,但是无法支持秒级延时。

    那么有没有一款消息队列服务,能够完美的支持延时(定时)消息。本节我们将介绍阿里云消息队列RocketMQ版。

    阿里云消息队列RocketMQ版基于Apache RocketMQ构建的低延迟、高并发、高可用、高可靠的分布式消息中间件。消息队列RocketMQ版既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。同时支持丰富的消息类型包括普通消息、顺序消息、事务消息以及我们本文讨论的延时消息。接下来我们看下阿里云RocketMQ为延时消息提供的能力及优势:

    1. 支持秒级的延时(定时)消息,同时延时时间可以最大设置为40天,基本满足所有场景。
    2. 延时(定时)消息的投递精度可控制在1~2秒之内。
    3. 延时(定时)消息在某段时间内是对消费者不可见的,从另一个维度看也属于积压的消息,阿里云消息队列RocketMQ版的不同实例规格可以支持亿级的消息积压。
    4. 提供了多语言支持,包括Java、.NET、CC++、GO、Python、PHP、Node.js等

    使用阿里云消息队列RocketMQ版收发延时(定时)消息,只需要在控制台创建Topic的时候选择定时/延时消息类型,既可以使用TCP或者http协议进行消息收发。

    控制台创建定时/延时Topic

    Java语言示例代码(TCP协议)

    • 发送定时消息
    // 定时消息,单位毫秒(ms),在指定时间戳(当前时间之后)进行投递,例如2020-03-07 16:21:00投递。如果被设置成当前时间戳之前的某个时刻,消息将立刻投递给消费者。
    long timeStamp = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse("2020-03-07 16:21:00").getTime();
    msg.setStartDeliverTime(timeStamp);
    // 发送消息,只要不抛异常就是成功。
    SendResult sendResult = producer.send(msg);
    • 发送延时消息
    // 延时消息,单位毫秒(ms),在指定延迟时间(当前时间之后)进行投递,例如消息在3秒后投递。
    long delayTime = System.currentTimeMillis() + 3000;
    // 设置消息需要被投递的时间。
    msg.setStartDeliverTime(delayTime);
    SendResult sendResult = producer.send(msg);

    同时订阅延时消息的逻辑无需任何改造,完全可以按照订阅普通消息的方式,没有任何的代码侵入性。

    结束语

    到此我们讨论了延时消息的特性、应用场景,对比了各类消息队列对延时消息的支持情况,同时也向大家介绍了阿里云消息队列RocketMQ版。我们在对消息中间件进行选型时,也会考虑到多方面的因素。除了消息中间件本身所能提供的能力外,也包括服务性能、稳定性、可扩展能力,以及需要结合开发团队自身的技术栈等情况。

    作者:阿里云解决方案架构师 鹿玄

     

    原文链接

    本文为阿里云原创内容,未经允许不得转载。

    展开全文
  • 手把手教springboot整合rabbitmq发送延时消息


    gitee demo地址:

    一、Rabbit MQ 实现延时消息

    1.死信队列

    通过死信队列来间接实现延时消息,如果延时时间,不是一个固定的时间,则不建议使用这种方式。

    (1) 具体实现

    1. 引入依赖
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.1.8.RELEASE</version>
            </dependency>
    
    1. 声明相关 exchange route-key queue
    package com.lfg.message.delay.message.demo.delay.plugin;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * @author lfg
     * @version v1.0
     */
    @Configuration
    public class RabbitMqConfig {
        /**
         * 流程:
         * 1.消费者:重发队列跟 重发交换机 重发路由绑定
         * 2.死信交换机 死信路由不绑定消费者
         * 3.死信消息找不到消费者,有效时间到,自动转发到重发交换机
         * 4.重发队列消费成功
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 死信交换机
         */
        private String dlExchange = "dead-letter-exchange-name";
        /**
         * 死信 路由key
         */
        private String dlRoutingKey = "dead-letter-routing-key-name";
        /**
         * 死信队列
         */
        private String dlQueue = "dead-letter-queue-name";
        /**
         * 重发交换机
         */
        private String reExchange = "exchange-name";
        /**
         * 重发路由key
         */
        private String reRoutingKey = "routing-key-name";
        /**
         * 重发队列
         */
        private String reQueue = "queue-name";
    
    
        /**
         * 声明死信交换机
         */
        @Bean("dlExchange")
        public Exchange dlExchange() {
            return ExchangeBuilder.directExchange(dlExchange).durable(true).build();
        }
    
        /**
         * 声明重发交换机
         */
        @Bean("reExchange")
        public Exchange reExchange() {
            return ExchangeBuilder.directExchange(reExchange).durable(true).build();
        }
    
        /**
         * 声明死信队列,指定死信交换机,死信路由
         */
        @Bean("dlQueue")
        public Queue dlQueue() {
            Map<String, Object> args = new HashMap<>(4);
            //指定交换机为死信交换机
            args.put("x-dead-letter-exchange", dlExchange);
            //指定路由为死信路由,值为重发路由
            args.put("x-dead-letter-routing-key", reRoutingKey);
            return QueueBuilder.durable(dlQueue).withArguments(args).build();
        }
    
        /**
         * 声明重发队列
         */
        @Bean("reQueue")
        public Queue reQueue() {
            return QueueBuilder.durable(reQueue).build();
        }
    
        /**
         * 死信队列绑定 死信交换机 死信路由
         */
        @Bean
        public Binding deadLetterPushBinding() {
            return new Binding(dlQueue, Binding.DestinationType.QUEUE, dlExchange, dlRoutingKey, null);
        }
    
        /**
         * 重发队列 绑定死信交换机 死信路由
         */
        @Bean
        public Binding redirectPushBinding() {
            return new Binding(reQueue, Binding.DestinationType.QUEUE, dlExchange, reRoutingKey, null);
        }
    
        /**
         * 重发队列 绑定重发交换机 重发路由
         */
        @Bean
        public Binding reExchangeBinding() {
            return new Binding(reQueue, Binding.DestinationType.QUEUE, reExchange, reRoutingKey, null);
        }
    }
    
    
    
    1. 生产者
    package com.lfg.message.delay.message.demo.delay.dl;
    
    import org.springframework.amqp.core.MessageDeliveryMode;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author lfg
     * @version v1.0
     */
    @RestController
    public class Product {
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @RequestMapping("/product/message")
        public String sendMessage() {
            rabbitTemplate.convertAndSend("dead-letter-exchange-name",
                    "dead-letter-routing-key-name", "测试消息:Test message !", message -> {
                        //设定编码
                        message.getMessageProperties().setContentEncoding("utf-8");
                        //永久有效
                        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                        //延时时间10s
                        message.getMessageProperties().setExpiration("10000");
                        return message;
                    });
            return "已发送延时消息!";
        }
    }
    
    
    1. 消费者
    package com.lfg.message.delay.message.demo.delay.dl;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
    * @author lfg
    * @version v1.0
    */
    @Component
    public class Consumer {
    
       private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
       @RabbitListener(queues = "queue-name")
       public void consume(Message message) {
           try {
               log.info("正在消费消息---{}", new String(message.getBody()));
           } catch (Exception e) {
               //注意进行异常处理,默认当出现异常时,消息将会被 再次立即重新消费
           }
       }
    }
    
    

    (2) 潜在的坑

    通过死信队列来间接实现延时消息,如果延时时间,不是一个固定的时间,则不建议使用这种方式。

    例如:一个请求推送一条消息两次,第一次延时时间1小时,第二次延时时间2小时。
    这时候有两次请求过来:
    第一个请求完成后,产生两条延时消息 A 和 B ,过期时间是 1 和 2 。
    第二个请求完成后,产生两条延时消息 C 和 D ,过期时间是 1 和 2 。

    理想情况:当消息过期时,则直接消费。消费顺序为 ACBD
    实际情况:根据消息的生成顺序延时消费。消费顺序为ABCD

    结论:当消息过期时间一致时则没有问题,但是当过期时间不一致,则会有大坑。

    ps: 再消息B-C之间,B过期时间2小时,C过期时间1小时,C先过期,B后过期。
    但消费顺序像队列一样 先进先出,B先消费,C后消费,并且B消费后,立即消费C

    通过建多个死信队列可以解决这个问题,过期时间类别少还好,多的话,非常麻烦。
    例如微信JSAPI支付成功,间隔时间回调通知便不适合建多个队列。

    2.延时插件

    通过安装延时插件来实现延时消息,此插件只适用于磁盘节点,内存节点会安装失败。

    (1) 具体实现

    1. rabbitmq安装延时插件,这里不做过多赘述,百度/google相关教程。
      官方插件链接
      插件下载地址:rabbitmq_delayed_message_exchange

    (2) 潜在的坑

    此插件只能安装在硬盘存储中,无法安装在内存存储。
    rabbitmq集群,一个内存存储,一个硬盘存储。
    对于生产环境不推荐使用此插件。
    插件github地址

    引用插件官方的一句话:

    This plugin was created with disk nodes in mind. RAM nodes are currently unsupported and adding support for them is not a priority (if you aren’t sure what RAM nodes are and whether you need to use them, you almost certainly don’t).

    二、Redis 实现延时消息

    1.具体实现

    推荐使用这种方式,但是需注意redis如果挂了,要保证存储数据的完整性。

    1. 生产者
    package com.lfg.message.delay.message.demo.delay.redis;
    
    import org.redisson.api.RDelayedQueue;
    import org.redisson.api.RQueue;
    import org.redisson.api.RedissonClient;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author lfg
     * @version v1.0
     */
    @RestController
    public class Product3 {
    
        @Autowired
        private RedissonClient redissonClient;
    
        /**
         * 频率为10s/20s/30s
         */
        private static final Long[] PUSH_DELAY = {10L, 20L, 30L};
    
        @RequestMapping("/product/message3")
        public String sendMessage() {
            for (int i = 0; i < PUSH_DELAY.length; i++) {
                //推送到队列
                RQueue<String> queue = redissonClient.getBlockingQueue("redis-key-name");
                RDelayedQueue<String> delayedQueue = redissonClient.getDelayedQueue(queue);
                delayedQueue.offer("测试消息-Test Message-" + PUSH_DELAY[i], PUSH_DELAY[i], TimeUnit.SECONDS);
                delayedQueue.destroy();
            }
            return "已发送延时消息!";
        }
    
    }
    
    
    1. 消费者
    package com.lfg.message.delay.message.demo.delay.redis;
    
    import org.redisson.api.RBlockingQueue;
    import org.redisson.api.RedissonClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;
    
    /**
     * @author lfg
     * @version v1.0
     */
    @Component
    @EnableScheduling
    public class Consumer3 {
    
        private static final Logger log = LoggerFactory.getLogger(Consumer3.class);
    
        @Autowired
        private RedissonClient redissonClient;
    
        @Scheduled(fixedDelay = 1000)
        public void consume() throws InterruptedException {
            RBlockingQueue<String> queue = redissonClient.getBlockingQueue("redis-key-name");
            String message = queue.take();
            log.info("正在消费推送信息:" + message);
            //此种方式抛异常也会正常消费掉消息
        }
    }
    
    
    展开全文
  • 1. 延时消息的使用场景 比如电商里,提交了一个订单就可以发送一个延时消息,1h后去检查这个订单的状态,如果还是未付款就取消订单释放库存。 2. 延时消息的使用限制 // org/apache/rocketmq/store/config/...
  • RocketMQ 延时消息实现

    2020-12-30 13:16:10
    RocketMQ提供了延时消息实现,不过这个延时是一定级别的延迟,默认在MessageStoreConfig.messageDelayLevel定义,1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h这里来看下RocketMQ实现这一功能的相关...
  • RocketMQ-延时消息Demo及实现原理分析

    万次阅读 多人点赞 2019-05-26 19:34:18
    文章目录延时消息Producer Demo源码分析延时消息持久化内部变量含义初始化load方法start方法延时消息调度总结 假设有这么一个需求,用户下单后如果30分钟未支付,则该订单需要被关闭。你会怎么做? 最简单的做法,...
  • 前言:因为公司需要一个kafka延时消息的组件服务,看了下市面上的实现kafka延时消息的实现,感觉都比较复杂难理解,自己就去研究了下使用其他中间件进行解决,于是有了这篇分享文章 实现技术:SpringBoot+kafka+...
  • RocketMq发送延时消息

    2021-11-30 10:45:06
    rocketmq可以发送延时消息,通过设置消息的延时等级即可。rocketmq的延时消息不支持任意时间的延迟,而是根据下面的固定的时间选择一个来延迟 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" ...
  • RocketMQ〖四〗顺序消息,延时消息,批量消息一. 顺序消息二. 延时消息1. 使用限制2. 延时消费代码三. 批量消息1. 发送批量消息制作不易,转载请标注~ 一. 顺序消息 建议还没有搭建rocketmq集群的先去我的第一节博客...
  • 使用场景 当我们在电商平台购买一件物品,但是没有付款时,平台会把对应的库存减少...此时我们就可以用到延时消息,订单没有支付发一个延时时间为30m的延时消息,30m过后系统就会收到这个消息,进而关闭订单 RocketMQ.
  • kafka 延时消息处理

    千次阅读 2020-08-10 01:45:04
    你一定遇到过这种情况,接收到消息时并不符合马上处理的条件(例如频率限制),但是又不能丢掉,于是先存起来,过一阵子再来处理。系统应该怎么设计呢?可能你会想到数据库,用一个字段来标记执行的状态,或者设置一...
  • RabbitMQ实现延时消息的两种方法

    千次阅读 2020-12-17 00:01:04
    文章目录RabbitMQ实现延时消息的两种方法1、死信队列1.1消息什么时候变为死信(dead-letter)1.2死信队列的原理1.3 代码实现1.4死信队列的一个小坑2 、延时插件2.1如何实现 RabbitMQ实现延时消息的两种方法 1、死信...
  • 延时消息适用的业务场景非常的广泛,在分布式系统环境下,延时消息的功能一般会在下沉到中间件层,通常是 MQ 中内置这个功能或者内聚成一个公共基础服务。 本文旨在探讨常见延时消息的实现方案以及方案设计的优缺点...
  • rocketmq 示例 延时消息

    2019-10-17 09:49:01
    rocketmq延时消息 rocketmq支持为消息设置延时时间,预设的时间长度为 1s、5s、10s、30; 1m、2m、 3m、4m、5m、6m、7m、8m、9m、10m、20m、30m; 1h,2h 通过message类中设置消息的延时级别 public ...
  • RocketMQ进阶-延时消息

    千次阅读 2020-05-20 17:33:07
    RocketMQ 延时消息
  • 而越来越多的公司采用 RocketMQ 做为自己系统的消息队列,但是目前开源的 RocketMQ 的延时消息只支持固定的延时消息,比如:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h但...
  • 实现RabbitMQ 延时消息

    千次阅读 2019-01-23 13:58:24
    RabbitMQ 延时消息的实现(上) 我们在实际业务中有一些需要延时发送消息的场景,例如: 家里有一台智能热水器,需要在30分钟后启动 未付款的订单,15分钟后关闭 注意这里的场景是延时,不是定时。当然,解决了...
  • 深入Kafka-延时消息

    千次阅读 2020-02-28 17:48:36
    如果在使用生产者客户端发送消息的时候将acks参数设置为-1,那么就意味着需要等待ISR 集合中...在将消息写入leader副本的本地日志文件之后,Kafka会创建一个延时的生产操作(DelayedProduce),用来处理消息正常写入所...
  • 主要介绍了使用Kotlin+RocketMQ实现延时消息的示例代码,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 实现定时消息(延时消息),比较阿里云ONS、开源RocketMQ、Java异步线程、持久化定时任务等几种方式
  • Java - Redis 实现延时消息队列什么是延时任务延时任务的特点实现思路:代码实现使用事例 什么是延时任务 延时任务,顾名思义,就是延迟一段时间后才执行的任务。举个例子,假设我们有个发布资讯的功能,运营需要在...
  • 本次直播为您深入剖析延时消息的特性、应用场景,对比各类消息队列对延时消息的支持情况,并向大家介绍阿里云商业版RocketMQ消息队列服务。直播议题名称:消息队列之延时消息应用解析及实践讲师简介:鹿玄,阿里云...
  • 解决RocketMQ延时消息失效的问题

    千次阅读 2021-05-24 15:35:52
    解决RocketMQ延时消息失效的问题前言问题的排查解决方案总结 前言 先说一下使用场景,RocketMQ客户端使用的是阿里云的 <dependency> <groupId>com.aliyun.openservices</groupId> <...
  • 在实现分布式消息队列的延时消息之前,我们想想我们平时是如何在自己的应用程序上实现一些延时功能的?在Java中可以通过下面的方式来完成我们延时功能: ScheduledThreadPoolExecutor:ScheduledThreadPoolExecutor...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 94,021
精华内容 37,608
关键字:

延时消息