精华内容
下载资源
问答
  • rocketmq消息保留时间
    千次阅读
    2022-04-05 15:21:20

    RocketMQ 持久化机制

    RocketMQ 的消息持久化主要依靠以下文件完成

    1. CommitLog

      日志数据文件,存储消息内容,所有 queue 共享,不区分 topic ,顺序读写 ,1G 一个文件

    2. ConsumeQueue

      逻辑 Queue,基于 topic 的 CommitLog 的索引文件

      消息先到达 commitLog,然后异步转发到 consumeQueue,包含 queue 在 commitLog 中的物理偏移量 offset,消息实体内容大小和 Message Tag 的 hash 值,大于 600W 个字节,写满之后重新生成,顺序写

    3. IndexFile

      基于 Key 或 时间区间的 CommitLog 的索引文件,文件名以创建的时间戳命名,固定的单个 indexFile 大小为 400M,可以保存 2000W 个索引

    RocketMQ 的 queue 只存储少量数据、更加的轻量化,对于磁盘的访问是串行化避免磁盘竞争;缺点:写入是顺序写,但读是随机读,先读 ConsumeQueue,再读 CommitLog ,会降低消息读的效率

    消息发送到 broker 后,会被写入 commitLog,写之前加锁,保证顺序写入。然后转发到 consumeQueue

    • 同步刷盘:消息持久化到磁盘才会给生产者返回 ACK,保证消息可靠,但是会影响性能
    • 异步刷盘:消息写入 pageCache 就返回 ACK 给生产者,刷盘采用异步线程,降低读写延迟,提高性能和吞吐,但是消息有可能丢失
    更多相关内容
  • RocketMQ消息的存储设计

    万次阅读 2022-07-19 11:35:42
    RocketMQ因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储。所以RocketMQ采用文件进行存储。

    RocketMQ因为有高可靠性的要求(宕机不丢失数据),所以数据要进行持久化存储。所以RocketMQ采用文件进行存储。

    物理文件结构

    RocketMQ的数据默认存储在${user.home}/store目录下,可以通过修改broker.conf中的参数storePathRootDir的值进行设置。

    物理目录结构大致如下:

    ├── abort
    ├── checkpoint
    ├── commitlog
    │   ├── 00000000000000000000
    │   └── 00000000001073741824
    ├── config
    │   ├── consumerFilter.json
    │   ├── consumerFilter.json.bak
    │   ├── consumerOffset.json
    │   ├── consumerOffset.json.bak
    │   ├── delayOffset.json
    │   ├── delayOffset.json.bak
    │   ├── subscriptionGroup.json
    │   ├── topics.json
    │   └── topics.json.bak
    ├── consumequeue
    │   ├── TopicTest
    │   │   ├── 0
    │   │   │   └── 00000000000000000000
    │   │   ├── 1
    │   │   │   └── 00000000000000000000
    │   │   ├── 2
    │   │   │   └── 00000000000000000000
    │   │   └── 3
    │   │       └── 00000000000000000000
    │   └── TopicTest1
    │       ├── 0
    │       │   └── 00000000000000000000
    │       └── 1
    │           └── 00000000000000000000
    ├── index
    │   └── 20220706092335158
    └── lock
    

    目录结构说明:

    • commitLog:消息存储目录
    • config:运行期间一些配置信息
    • consumerqueue:消息消费队列存储目录
    • index:消息索引文件存储目录
    • abort:如果存在改文件则Broker非正常关闭
    • checkpoint:文件检查点,存储CommitLog文件最后一次刷盘时间戳、consumerqueue最后一次刷盘时间,index索引文件最后一次刷盘时间戳。

    逻辑结构

    1657091649468.png

    RocketMQ消息的存储是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

    • CommitLog:存储消息的元数据
    • ConsumerQueue:存储消息在CommitLog的索引
    • IndexFile:为了消息查询提供了一种通过key或时间区间来查询消息的方法,这种通过IndexFile来查找消息的方法不影响发送与消费消息的主流程

    1657091672408.png

    CommitLog

    CommitLog以物理文件的方式存放,每台Broker上的CommitLog被本机器所有ConsumeQueue共享。在CommitLog中,一个消息的存储长度是不固定的, RocketMQ采取一些机制,尽量向CommitLog中顺序写,但是随机读。commitlog文件默认大小为lG ,可通过在broker配置文件中设置mappedFileSizeCommitLog属性来改变默认大小。

    CommitLog作为消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。

    单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。

    每个Rocket实例只会往一个commitlog文件中写,写完一个接着写下一个。indexFile和ComsumerQueue中都有消息对应的物理偏移量,通过物理偏移量就可以计算出该消息位于哪个CommitLog文件上。

    CommitLog目前存储的MappedFile文件有两种内容类型:

    • MESSAGE:消息信息
    • BLANK:文件不足以存储消息时的空白占位

    MESSAGE

    第几位字段字节数数据类型说明
    1MsgLen4Int消息总长度
    2MagicCode4IntMESSAGE_MAGIC_CODE
    3BodyCRC4Int消息内容CRC
    4QueueId4Int消息队列编号
    5Flag4Intflag
    6QueueOffset8Long消息队列位置
    7PhysicalOffset8Long物理位置。在 CommitLog 的顺序存储位置。
    8SysFlag4IntMessageSysFlag
    9BornTimestamp8Long生成消息时间戳
    10BornHost8Long生效消息的地址+端口
    11StoreTimestamp8Long存储消息时间戳
    12StoreHost8Long存储消息的地址+端口
    13ReconsumeTimes4Int重新消费消息次数
    14PreparedTransationOffset8Long
    15BodyLength + Body4 + bodyLengthInt + Bytes内容长度 + 内容
    16TopicLength + Topic1 + topicLengthByte + BytesTopic长度 + Topic
    17PropertiesLength + Properties2 + PropertiesLengthShort + Bytes拓展字段长度 + 拓展字段

    BLANK

    第几位字段字节数数据类型说明
    1maxBlank4Int空白长度
    2MagicCode4IntBLANK_MAGIC_CODE

    ConsumeQueue

    ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的 ConsumeQueue文件。

    为了加速ConsumeQueue消息条目的检索速度与节省磁盘空间,每一个 Consumequeue条目不会存储消息的全量信息,存储结构如下:

    第几位字段字节数数据类型说明
    1offset8Long在commitLog中的偏移量
    2size4Int消息的大小
    3tagsCode8Longtag标签

    ConsumeQueue是消息的逻辑队列,相当于字典的目录,用来指定消息在物理文件CommitLog上的位置。

    ConsumeQueue对应一个topic的一个队列结构,由topic和queueId可以唯一创建一个ConsumeQueue结构,同样ConsumeQueue包含一个MappedFileQueue结构,而MappedFileQueue结构由多个MappedFile文件组成,每个文件的大小为30000020(300000 ConsumeQueue.CQ_STORE_UNIT_SIZE),其中20是queue中每个存储单元的大小。

    consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:${user.home}/store/consumequeue/{topic}/{queueId}/{fileName}。

    ConsumeQueue即为CommitLog文件的索引文件, 其构建机制是当消息到达Commitlog文件后 由专门的线程产生消息转发任务,从而构建消息消费队列文件(ConsumeQueue )与索引文件(Index File)。

    存储机制这样设计有以下几个好处:

    1. CommitLog顺序写 ,可以大大提高写入效率。(实际上,磁盘有时候会比你想象的快很多,有时候也比你想象的慢很多,关键在如何使用,使用得当,磁盘的速度完全可以匹配上网络的数据传输速度。目前的高性能磁盘,顺序写速度可以达到600MB/s,超过了一般网卡的传输速度,这是磁盘比想象的快的地方,但是磁盘随机写的速度只有大概lOOKB/s,和顺序写的性能相差6000倍!)
    2. 虽然是随机读,但是利用操作系统的pagecache机制,可以批量地从磁盘读取,作为cache存到内存中,加速后续的读取速度。
    3. 为了保证完全的顺序写,需要ConsumeQueue这个中间结构,因为ConsumeQueue里只存偏移量信息,所以尺寸是有限的,在实际情况中,大部分的ConsumeQueue能够被全部读入内存,所以这个中间结构的操作速度很快,可以认为是内存读取的速度。此外为了保证CommitLog和ConsumeQueue的一致性,CommitLog里存储了Consume Queues、Message Key、Tag等所有信息,即使ConsumeQueue丢失,也可以通过CommitLog完全恢复出来。

    IndexFile

    RocketMQ还支持通过MessageID或者MessageKey来查询消息;使用ID查询时,因为ID就是用broker+offset生成的(这里msgId指的是服务端的),所以很容易就找到对应的commitLog文件来读取消息。但是对于用MessageKey来查询消息,RocketMQ则通过构建一个index来提高读取速度。

    index存的是索引文件,这个文件用来加快消息查询的速度。消息消费队列RocketMQ专门为消息订阅构建的索引文件,提高根据主题与消息检索消息的速度 ,使用Hash索引机制,具体是Hash槽与Hash冲突的链表结构。1657099982839.png

    IndexFile 也是定长的,从单个文件的数据结构来说,这是实现了一种简单原生的哈希拉链机制。当一条新的消息索引进来时,首先使用 hash 算法命中黄色部分 500w 个 slot 中的一个,如果存在冲突就使用拉链解决,将最新索引数据的 next 指向上一条索引位置。同时将消息的索引数据 append 至文件尾部(绿色部分),这样便形成了一条当前 slot 按照时间存入的倒序的链表。

    Index Header

    第几位字段字节数数据类型说明
    1beginTimestamp8Long开始插入时间
    2endTimestamp8Long最后插入时间
    3beginPhyOffset8Long第一个索引对应CommitLog的偏移量
    4endPhyOffset8Long最后一个索引对应CommitLog的偏移量
    5hashSlotCount4Integer槽位使用数
    6indexCount4Integer索引总数

    Slot

    第几位字段字节数数据类型说明
    1absSlotPos4int索引在Content中的位置

    Content

    第几位字段字节数数据类型说明
    1keyHash4Intkey的hashcode
    2phyOffset8LongCommitLog中的偏移量
    3timeDiff8Long消息的延迟时间
    4indexCount8Long上一次槽内的indexCount

    Config

    config文件夹中存储着Topic和Consumer等相关信息。主题和消费者群组相关的信息就存在在此。

    • topics.json : topic 配置属性
    • subscriptionGroup.json :消息消费组配置信息。
    • delayOffset.json :延时消息队列拉取进度。
    • consumerOffset.json :集群消费模式消息消进度。
    • consumerFilter.json :主题消息过滤信息。

    过期文件删除

    由于RocketMQ操作CommitLog,ConsumeQueue文件是基于内存映射机制并在启动的时候会加载CommitLog,ConsumeQueue目录下的所有文件,为了避免内存与磁盘的浪费,不可能将消息永久存储在消息服务器上,所以需要引入一种机制来删除己过期的文件。

    删除过程分别执行清理消息存储文件(Commitlog)与消息消费队列文件(ConsumeQueue文件),消息消费队列文件与消息存储文件共用一套过期文件机制。

    RocketMQ清除过期文件的方法是:如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件,可以被删除,RocketMQ不会关注这个文件上的消息是否全部被消费。默认每个文件的过期时间为72小时(不同版本的默认值不同,这里以4.9.4为例),通过在Broker配置文件中设置 fileReservedTime来改变过期时间,单位为小时。

    触发文件清除操作的是一个定时任务,而且只有定时任务,文件过期删除定时任务的周期由该删除决定,默认每10s执行一次。

    过期判断

    文件删除主要是由这个配置属性:fileReservedTime,文件保留时间。也就是从最后一次更新时间到现在,如果超过了该时间,则认为是过期文件,可以删除。

    另外还有其他两个配置参数:

    • deletePhysicFilesInterval:删除物理文件的时间间隔(默认是100MS),在一次定时任务触发时,可能会有多个物理文件超过过期时间可被删除,因此删除一个文件后需要间隔deletePhysicFilesInterval这个时间再删除另外一个文件,由于删除文件是一个非常耗费IO的操作,会引起消息插入消费的延迟(相比于正常情况下),所以不建议直接删除所有过期文件。
    • destroyMapedFileIntervalForcibly:在删除文件时,如果该文件还被线程引用,此时会阻止此次删除操作,同时将该文件标记不可用并且纪录当前时间戳destroyMapedFileIntervalForcibly这个表示文件在第一次删除拒绝后,文件保存的最大时间,在此时间内一直会被拒绝删除,当超过这个时间时,会将引用每次减少1000,直到引用小于等于0为止,即可删除该文件。

    删除条件

    1. 指定删除文件的时间点,RocketMQ通过deleteWhen设置一天的固定时间执行一次。删除过期文件操作,默认为凌晨4点。
    2. 磁盘空间是否充足,如果磁盘空间不充足(DiskSpaceCleanForciblyRatio。磁盘空间强制删除文件水位。默认是85),会触发过期文件删除操作。

    另外还有RocketMQ的磁盘配置参数:

    1. 物理使用率大于diskSpaceWarningLevelRatio(默认90%可通过参数设置),则会阻止新消息的插入。
    2. 物理磁盘使用率小于diskMaxUsedSpaceRatio(默认75%) 表示磁盘使用正常。
    展开全文
  • 消息检索 消息检索,为了方便和省事,我直接在rocketmq-console控制台新开发一个历史消息的页面用来查询消息,reput server会以心跳的方式将自己可查询的时间段及地址注册到控制台上。 在控制台上选择topic和时间段...
    
     

    今日推荐

    
     
    
     
    这 9 个 Java 开源项目 yyds,你知道几个?
    
    阿里技术专家推荐的20本书,免费送!
    
    K8S 部署 SpringBoot 项目(一篇够用)
    
    妙用Java 8中的 Function接口 消灭if...else(非常新颖的写法)
    Nginx 入门到实战,新手必懂。

    来源:blog.csdn.net/x763795151/article/

    details/118500973

    前言

    RocketMQ作为国人开源的一款消息引擎,相对kafka也更加适合在线的业务场景,在业内使用的也是非常广泛,很多同学也是非常熟悉它及它的存储机制,所以这里不再对它的原理性东西作太多说明。

    我们也知道,RocketMQ所有的数据如消息信息都是以文件形式保存到broker节点所在主机上指定的分区目录下,比如消息的数据都是保存在commitlog中,默认保存72小时(在磁盘使用率未达到阈值的情况下)会在指定时间清理过期数据,释放磁盘空间。

    当然,如果消息量不大且所在磁盘的分区够大,我们可以增加消息的保存时间。但受限于磁盘大小,这个保存时间总归有限,如果消息比较重要,或者我们想保存的更久一些就需要一些其它方案解决。

    背景

    我们线上的几个集群目前消息保存时间在2-3天,实在是磁盘空间大小有限,消息量相对不算小。比如,有个比较核心的集群,部署方式是6个高配物理机采用DLedger模式4主8从交叉部署,发送的tps在10000多,所以每个节点的日消息量目前应该是在600G吧。老大给我说他现在设置的线上保存时间是2天,业务量一直在增加,继续增长下去,就要设置保存1天了,目前每个节点的磁盘使用率将近50%,年初我搭建监控平台的时候,注意过还没这么高。

    7d9e6f4885dc58037a7d5e8039d42e54.png

    还有其它集群上的业务,有些业务相关开发人员想要他们消息保存7天甚至更久。

    基于这些原因,所以我们也的确需要一种过期消息备份的解决方案。

    解决思路

    如果需要对过期消息进行备份,然后支持过期消息检索及重新消费的能力,我们想到的,常规的方案有如下两种:

    • 将发送到broker的消息持久化一份到第三方存储介质,如mysql

    • 备份将要过期的commitlog到其它地方,重新恢复

    业内大厂是采用哪些更好的方案,时间问题也没有具体调研过,我不得而知。关于第一种方案,老大也跟我聊过,我是不倾向的,原因如下:

    • 我们的消息代理平台还没有建设出来,业务用的基本都是原生的,如果想要在消息生命周期中镜像一份出来到其它存储系统,在不改源码的情况下,确实没有很好的切入点

    • 依赖其它存储介质,复杂性,开发成本也高,我的开发时间也不充裕,短期内实现这个,有点难

    • 全量保存的话,消息体的减少很难有质的变化,当然可以在处理的时候,去掉一些元数据信息,消息体也可以压缩减少存储空间的占用,但无论存哪,质量守恒,不会换个地方,用的硬盘资源就能等比减少很多倍

    当然,这种方案的好处也很明显,可以更精细化的控制保存时间及消息类别,设定对哪些topic或哪类消息的保存时限。另外如果我们的MQ代理层建设完,无论是RocketMQ还是kafka等都可以采用一种通用方案备份。

    我目前主要采用第2种解决方案并进行实现,备份commitlog,支持检索和重新消费。主要思路就是,开发一个应用,备份集群里将要过期的commitlog到更大的磁盘空间的主机(一台主机,备份整个集群的数据,且硬件配置不需要太高,硬盘尽量大即可),并提供接口,支持检索消息。

    解决方案

    基本实现

    我们的主要目标是让消息保存的更久一些,不是为了灾备什么的,所以不需要双活、冷备这样搭建一个同等的部署模型的集群。况且资源有限,不可能再申请同配置或者低配的主机资源解决,比如上面那个4主8从Dledger模式,如果需要同样的集群来解析commitlog检索消息,至少也需要4主4从部署8个节点才行,双活太浪费,冷备维护也不方便。主要原因是资源也不好申请。

    我用了一周的时间,紧赶赶的写了一个工具能支持备份commitlog及检索消息:rocketmq-reput。

    该工具支持3种模式:客户端服务器混合模式

    • 客户端:部署在broker节点,定时扫描上传将要过期的commitlog

    • 服务器:保存过期的commitlog并支持消息检索

    • 混合模式:同时开启客户端和服务器模式,无限期备份的关键

    主要流程如下:

    • 将reput client部署到rokcetmq集群的各个broker的从节点上,配置监听的commitlog目录,定时扫描将要过期的commitlog上传到reput server上。

    • reput server接收client传来的commit log并根据不同的broker存放在不同的目录下。

    • 重新分发commit log的消息(所以我起名reput),构建索引文件(消息检索使用)和逻辑消费队列。

    • 在reput server端可以通过restful接口查询指定topic的历史消息(根据时间范围、消息ID[客户端ID/服务端ID],消息key等)

    569e0e7e052c8dce17d5be88e2825166.png

    数据上传

    从方案到开发,因为时间上的原因,我也没太多时间花费在这上面,所以在实现上并没有太注意细节,开发上也比较粗糙。

    数据上传这里也是很简单的压缩->传输->校验->保存,基本流程如下:

    65af3dcc9c5e5fe374e2c001ec184df5.png

    如果上传到一半服务器关闭等原因导致客户端当前文件上传失败,会重置队列,重新检查上传文件,避免有commitlog遗漏。

    主机配置

    该工具在执行时,大多情况下不需要太多算力,所以CPU是双核的即可,内存4G足够,堆内存配置2G就行,需要留一些物理内存给操作系统的page cache。我目前测试的时候,堆内存只配置了512M,挺好。

    reput client尽量部署在从节点上,可以减少对master的影响。

    另外开发的时候,为了节省时间,减少开发的代码,像文件压缩和md5检查,都是直接调用的shell 命令,这也导致不支持在windows平台下使用,只能在mac 和linux上运行,mac os不检查md5,只检查文件长度是否一致。

    因为执行脚本命令的原因,会占用一些额外的性能,我观测的有以下几点:

    • 压缩的时候一个cpu的核心使用率达到100%,所以要求最低双核cpu,单核会影响broker的处理性能

    • 网络传输带宽占用在50M/s,其实压缩比挺高,一般在72%-92%吧,100M-300M之间,所以传输时间大概在2-6秒吧,如果本身带宽是瓶颈,需要注意

    • 硬盘,硬盘得够大,毕竟要保存整个集群的commitlog

    无限期备份方案

    硬盘即使再大,但空间大小也有上限,所以能保存消息量也有限,比如一个节点消息量600-700G左右,4个节点一天的量就在2.5T左右,即使申请了一个8T的硬盘,也只能保存2天(3天是不可能了)。

    reput自身也是和rockemq一样的过期删除策略(这部分代码直接copy rocketmq的实现的),所以数据在reput server上过期也要被清除释放磁盘空间。

    所以目前reput支持混合模式,可以再申请一台主机,当前reput作为客户端,新reput作为server,将快要过期的文件以同样方式传输过去保存,完整流程如下:

    b09ef85725b2898567303592c26631f4.png

    就以这种接力的方式一直保存下去,一个主机保存2天,想要保存多久,就申请多少主机吧。

    消息检索

    消息检索,为了方便和省事,我直接在rocketmq-console控制台新开发一个历史消息的页面用来查询消息,reput server会以心跳的方式将自己可查询的时间段及地址注册到控制台上。

    在控制台上选择topic和时间段,然后根据选择的时间段符合条件的一个或多个reput server上获取消息。如果是消息ID或消息key,那就只能到所有的server上一起查了,只要消息还在,总能查到返回。

    效果如下,我还可以查到4天前的消息(测试的这个集群配置的是保存2天的数据):

    80fcf90b8f9813f4326c2ae9d7c82ac0.png fca79a81a1021d35a27f79c1da5f543b.png

    重新消费

    重新消费可以将要消费的历史消息检索出来,重新发回broker。

    写在最后

    其实开发上还是遇到不少问题点,比如因为commtlog的生成方式和rocketmq自身的生成是不一样的,rocketmq是在写入消息的时候,commitlog写不下了才会创建。在重新构建索引和消息队列的时候基于原有流程有些场景走不通,无法直接滚到下个文件等。

    我是每个环节一一开发进行验证的,最终把所有环节走通,写了个完整流程的demo。

    https://github.com/xxd763795151/rocketmq-reput

    我把基本启停脚本也简单补充了下,只是上面有些bug后来就没在修改。

    整个流程走通后,我就修改包名提交到私服了,后续的开发包括和rocketmq-console的联调,支持可视化检索消息等都是在私服的代码仓库上,这部分功能及后续的bug修复,这个demo上是没有了。但是这份demo代码已支持消息检索,也提供的有接口,可以直接调用接口检索消息看结果,接口说明如下:

    /**
         * get the total of message between startTime and endTime.
         *
         * @param topic     topic name.
         * @param startTime start time.
         * @param endTime   end time.
         * @return a long value, the total of message between startTime and endTime.
         */
        @GetMapping("/total/{topic}/{startTime}/{endTime}")
        public Object getMessageTotalByTime(@PathVariable String topic, @PathVariable long startTime,
            @PathVariable long endTime) {
            return ResponseData.create().success().data(messageService.getMessageTotalByTime(topic, startTime, endTime));
        }
     
        /**
         * get the message list between startTime and endTime.
         *
         * @param topic     topic name.
         * @param startTime start time.
         * @param endTime   end time.
         * @return List(MessageExt),  he message list between startTime and endTime.
         */
        @GetMapping("/list/{topic}/{startTime}/{endTime}")
        public Object getMessageByTime(@PathVariable String topic, @PathVariable long startTime,
            @PathVariable long endTime) {
            return ResponseData.create().success().data(messageService.getMessageByTime(topic, startTime, endTime));
        }
     
        /**
         * get the message list between startTime and endTime. It differs from the above getMessageByTime is that the
         * message body is null , as a result,  the size is smaller when return the same messages.
         *
         * @param topic     topic name.
         * @param startTime start time.
         * @param endTime   end time.
         * @return List(MessageExt),  he message list between startTime and endTime.
         */
        @GetMapping("/view/{topic}/{startTime}/{endTime}")
        public Object viewMessageList(@PathVariable String topic, @PathVariable long startTime,
            @PathVariable long endTime) {
            return ResponseData.create().success().data(messageService.viewMessageList(topic, startTime, endTime));
        }
     
        /**
         * get message by message id(server id(offset id) or client id(unique key)).
         *
         * @param topic topic name
         * @param msgId msg id: server id/ client id.
         * @return {@link org.apache.rocketmq.common.message.MessageExt}
         */
        @GetMapping("/id/{topic}/{msgId}")
        public Object queryMessageByMsgId(@PathVariable final String topic, @PathVariable final String msgId) {
            return ResponseData.create().success().data(messageService.queryMessageByMsgId(topic, msgId));
        }
     
        /**
         * get message by message key.
         *
         * @param topic topic name
         * @param key   msg key: custom business key/ client id.
         * @return {@link org.apache.rocketmq.common.message.MessageExt}
         */
        @GetMapping("/key/{topic}/{key}")
        public Object queryMessageByKey(@PathVariable final String topic, @PathVariable final String key) {
            return ResponseData.create().success().data(messageService.queryMessageByKey(topic, key));
        }

    这个实现是支持Dledger模式与常规的部署模型的。最近在测试环境(2主2从非DLedger模式)运行了几天,看了下效果,结果挺预期的,可以验证该方案是完全可行的。

    
     
    推荐文章
    
     
    1、一款高颜值的 SpringBoot+JPA 博客项目
    
    2、超优 Vue+Element+Spring 中后端解决方案
    
    3、推荐几个支付项目!
    
    4、推荐一个 Java 企业信息化系统
    
    5、一款基于 Spring Boot 的现代化社区(论坛/问答/社交网络/博客)
    展开全文
  • RocketMQ中的消息存储在本地文件系统中,主要是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。...

    目录

    一、概述

    二、CommitLog文件

    三、ConsumerQueue消费逻辑队列

    四、IndexFile索引文件

    五、页缓存与内存映射


    一、概述

    RocketMQ中的消息存储在本地文件系统中,主要是由ConsumeQueue和CommitLog配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列,类似数据库的索引文件,存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ConsumeQueue文件。

    来看一张RocketMQ消息存储整体架构图:

    我们可以在broker的配置文件中配置store存储的目录。例如前面搭建RocketMQ时我们就自定义了存储目录。

    #存储路径
    storePathRootDir=/rocketmq/rocketmq-4.9.2/store
    #commitLog 存储路径
    storePathCommitLog=/rocketmq/rocketmq-4.9.2/store/commitlog
    #消费队列存储路径存储路径
    storePathConsumeQueue=/rocketmq/rocketmq-4.9.2/store/consumequeue
    #消息索引存储路径
    storePathIndex=/rocketmq/rocketmq-4.9.2/store/index
    #checkpoint 文件存储路径
    storeCheckpoint=/rocketmq/rocketmq-4.9.2/store/checkpoint
    #abort 文件存储路径
    abortFile=/rocketmq/rocketmq-4.9.2/store/abort

    分别介绍一下各个文件夹/文件的含义:

    • abort:该文件在Broker启动后会自动创建,正常关闭Broker,该文件会自动消失。若在没有启动 Broker的情况下,发现这个文件是存在的,则说明之前Broker的关闭是非正常关闭;
    • checkpoint:其中存储着commitlog、consumequeue、index文件的最后刷盘时间戳;
    • commitlog:其中存放着commitlog文件,而消息是写在commitlog文件中的;
    • config:存放着Broker运行期间的一些配置数据;
    • consumequeue:其中存放着consumequeue文件,队列就存放在这个目录中;
    • index:其中存放着消息索引文件indexFile;
      lock:运行期间使用到的全局资源锁;

    二、CommitLog文件

    CommitLog是消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G, 文件名长度为20位,左边补零,剩余为起始偏移量,比如00000000000000000000代表了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件。

    需要注意的是,一个Broker中仅包含一个commitlog目录,所有的mappedFile文件都是存放在该目录中 的。即无论当前Broker中存放着多少Topic的消息,这些消息都是被顺序写入到了mappedFile文件中。

    RocketMQ利用“零拷贝”技术,提高消息存盘和网络发送的速度。这里需要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为何RocketMQ默认设置单个CommitLog日志数据文件为1G的原因。

    三、ConsumerQueue消费逻辑队列

    ConsumerQueue是消息消费队列,引入的目的主要是提高消息消费的性能,由于RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,如果要遍历commitlog文件中根据topic检索消息是非常低效的。Consumer即可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)作为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式如下:topic/queue/file三层组织结构:

    如下图,可以看到一个consumequeue中的索引条目结构为:

    同样consumequeue文件采取定长设计,单个consumequeue文件由30W个索引条目组成。每一个索引条目包含了三个重要属性:消息在commitlog中的偏移量(8字节)、消息长度(4字节)、消息Tag的hashcode值(8字节)。可以像数组一样随机访问每一个索引条目,每个索引条目这三个属性占20个字节,所以每个ConsumeQueue文件的大小是固定的30w * 20字节,约5.72M。

    四、IndexFile索引文件

    除了通过通常的指定Topic进行消息消费外,RocketMQ还提供了根据key进行消息查询的功能。该查询是通过store目录中的index子目录中的indexFile进行索引实现的快速查询。当然,这个indexFile中的索引数据是在包含了key的消息被发送到Broker时写入的。如果消息中没有包含key,则不会写入。

    RocketMQ的索引文件逻辑结构,类似JDK中HashMap的实现。索引文件的具体结构如下:

    每个Broker中会包含一组indexFile,每个indexFile都是以一个时间戳命名的(这个indexFile被创建时的时间戳),文件大小是固定的。如果消息的properties中设置了UNIQ_KEY这个属性,就用 topic + “#” + UNIQ_KEY的value作为 key 来做写入操作。如果消息设置了KEYS属性(多个KEY以空格分隔),也会用 topic + “#” + KEY 来做索引。

    其中的索引数据包含了Key Hash、CommitLog Offset、Timestamp、NextIndex offset 这四个字段,一共20 字节。NextIndex offset 即前面读出来的slotValue,如果有 hash冲突,就可以用这个字段将所有冲突的索引用链表的方式串起来了。Timestamp记录的是消息storeTimestamp之间的差,并不是一个绝对的时间。整个Index File的结构如图,40 Byte 的Header用于保存一些总的统计信息,4*500W的 Slot Table并不保存真正的索引数据,而是保存每个槽位对应的单向链表的头。20*2000W 是真正的索引数据,即一个 Index File 可以保存 2000W个索引。

    五、页缓存与内存映射

    页缓存(PageCache)是OS对文件的缓存,用于加速对文件的读写。一般来说,程序对文件进行顺序读写的速度几乎接近于内存的读写速度,主要原因就是由于OS使用PageCache机制对读写访问操作进行了性能优化,将一部分的内存用作PageCache。对于数据的写入,OS会先写入至Cache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取

    在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,并且是顺序读取,在page cache机制的预读取作用下,Consume Queue文件的读性能几乎接近读内存,即使在有消息堆积情况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来说,读取CommitLog消息内容时候会产生较多的随机访问读取,严重影响性能。如果选择合适的系统IO调度算法,比如设置调度算法为“Deadline”(此时块存储采用SSD的话),随机读的性能也会有所提升。

    另外,RocketMQ主要通过MappedByteBuffer(零拷贝)对文件进行读写操作其中,利用了NIO中的FileChannel模型将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减少了传统IO将磁盘文件数据在操作系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操作转化为直接对内存地址进行操作,从而极大地提高了文件的读写效率(正因为需要使用内存映射机制,故RocketMQ的文件存储都使用定长结构来存储,方便一次将整个文件映射至内存)。

    展开全文
  • RocketMQ 消息发送

    2021-02-11 17:51:46
    第 3 章主要聚焦在 RocketMQ 如何发送消息,然后从消息的数据结构开始,逐步介绍生产者的启动流程和消息发送的流程,最后再详细阐述批量消息发送 。 本章重点内容如下 。 • RocketMQ 消息结构 ·消息生产者...
  • 延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码。第三步介绍延迟消息消息重试的关系...
  • RocketMQ延迟消息

    2021-07-11 19:01:24
    延迟消息介绍 延迟队列表示生产的消息发送到服务端后,并不能立刻被消费者消费,等到到达消息的延迟时间后才会被消费...RocketMQRocketMQ 开源版本延迟消息临时存储在一个内部主题中,不支持任意时间精度,支持特定的
  • 保证信息不丢失 当你系统需要保证百分百消息不丢失,你可以使用生产者每发送一个消息,Broker 同步返回一个 消息发送成功的反馈消息 即每发送一个消息,同步落盘后才返回生产者消息发送成功,这样只要生产者得到了...
  • MQ消息丢失,一致性问题在生产,存储,消费阶段如何解决 消息重发之后,如何避免重复消费 继上篇RocketMQ技术总结二,这篇主要介绍一下 消息积压问题如何处理 阅读源码的小技巧 异步方案提升系统性能 MQ的缓存策略...
  • 前面我们已经介绍过了消息存储在磁盘中的表现形式:RocketMQ消息整体存储架构(CommitLog、ConsumeQueue),现在我们来介绍一下,一个生产消息请求到Broker端后,Broker端是如何保存这条消息的。 二、存储架构 三...
  • RocketMQ延时消息是怎么实现的
  • RocketMQ消息队列使用

    千次阅读 2019-01-03 08:55:11
    RocketMQ是阿里巴巴在2012年开源的分布式消息中间件,目前已经捐赠给Apache基金会,并于2016年11月成为 Apache 孵化项目。  中间件是一类连接软件组件和应用的计算机软件,它包括一组服务。以便于运行在一台或多...
  • RocketMQ消息存储原理

    2021-10-30 14:47:59
    producer发送到mq中,然后Comsumer去消费,mq为了保证工作效率,所有的消息肯定是在内存中去中转的,那么就有个问题,一断电,内存中的消息就丢失了,肯定得有个方式需要把消息存到硬盘中. MQ收到消息之后会给producer一个...
  • RocketMQ 消息存储

    2021-02-11 20:24:06
    目前的 MQ 中间件从存储模型来看 ,分为需要持久化和不需要持久化的两种模型,现在大多数的 MQ 都是支持持久化存储的,比如 ActiveMQ 、 RabbitMQ 、 Kafka,RocketMQ ,而 ZeroMQ 却不需要支持持久化存储 。...
  • 今天我们来聊一聊 RocketMQ 怎么做能确保消息不丢失。 1 RocketMQ 简介 RocketMQ 是阿里巴巴开源的分布式消息中间件,整体架构如下图: RocketMQ 主要包括 Producer、Consumer 和 Broker,同时 Name Server ...
  • rocketMQ-消息队列

    2022-02-19 12:11:36
    RocketMQ4.X基础介绍 官网地址 http://rocketmq.apache.org/ ...Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件 特点 支持Broker和Consumer端消息过滤 支持发布订阅模型,
  • tags可由应用自行设置,只有生产者在发送消息设置了tags,消费方在订阅消息时才可以利用tags通过broker做消息过滤: message.setTags("TagA"); 2 Keys的使用 每个消息在业务层面的唯一标识码要设置到keys字段,方便...
  • rocketmq 消息机制

    2019-07-12 10:35:07
    RocketMQ消息存储是由consume queue和commit log配合完成的。 刷盘时间消息只是被写入内存 pagecache,写操作返回快,吞吐量达,当内存里的消息积累到一定程度时,统一出发写磁盘动作,快速写入。 默认...
  • RocketMQ 延时消息机制

    2022-06-18 14:15:24
    RocketMQ 延时消息机制
  • RocketMQ 延迟消息

    千次阅读 2020-01-06 20:27:31
    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码。第三步介绍延迟消息消息重试的关系...
  • RocketMQ不保证消息不重复,如果你的业务需要保证严格的不重复消息,需要你自己在业务端去重。 业务端如何去重呢?原理很简简单,步骤如下: 1、记录下每个消息的msgID 2、新消息来的时候,查看该消息的msgID是否...
  • RocketMQ源码 — 八、 RocketMQ消息重试

    千次阅读 2018-05-08 22:33:18
    RocketMQ消息重试包含了producer发送消息的重试和consumer消息消费的重试。 producer发送消息重试 producer在发送消息的时候如果发送失败了,RocketMQ会自动重试。 private SendResult sendDefaultImpl( ...
  • 本文通过分析消息流转的整个过程,从消息发送、消息存储和消息消费三个阶段介绍RocketMQ是如何保证消息的可靠性的。 分布式系统中一个重要的前提假设是所有的网络传输都是不可靠的,在网络传输不可靠的情况下,保证...
  • Broker是RocketMQ的核心,提供了消息的接收,存储,拉取等功能 我们可以先从Broker服务入手。从源码可以得知。RocketMQ启用了一个 BrokerController 的 start 函数 public static void main(String[] args) { ...
  • 消息中间件activemq/rabbitMQ/rocketMQ/kafka 从入门到精通 mom消息中间件 推模式 参考资料 分布式消息中间件实战(倪炜)沈剑 架构师之路 spring实战 文章目录消息中间件activemq/rabbitMQ/rocketMQ/kafka 从入门...
  • rocketMQ消息队列 ,docker部署rocketMQ 单机、集群
  • 目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。今天我就首先分析一下RocketMQ,目前公司用的也是这个,因此在进行一下梳理,加深一下印象。 RocketMQ概述 RocketMQ...
  • RocketMQ提供两个模式进行消费 1、拉模式 代码上使用DefaultMQPullConsumer 1)获取MessageQueues并遍历(一个Topic包括多个MessageQueue),如果是特殊情况,也可以选择指定的MessageQueue来读取消息。 2)维护...
  • RocketMQ-延迟消息

    2021-05-19 11:13:58
    延迟消息是实际开发中一个非常有用的功能,本文第一部分从整体上介绍秒级精度延迟消息的实现思路,在第二部分结合RocketMQ的延迟消息实现,进行细致的讲解,点出关键部分的源码。第三步介绍延迟消息消息重试的关系...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,089
精华内容 2,835
热门标签
关键字:

rocketmq消息保留时间