精华内容
下载资源
问答
  • redis消息队列和mq
    千次阅读
    2021-12-08 14:23:10

    简介

            为了保障基础服务的稳定,需要对MQ进行灾备,这个灾备主要是防患MQ突然不可能,基础服务依然可以调用其他队列来继续正常运行。第一想法是引入其他MQ中间件来做灾备,这样只需要实现一套生产者消费者就好。但因为公司这块中间件都使用的云产品。要过要使用其他类型的MQ中间件就需要购买资源,但程序运行中正常情况下不会用到这个备胎,因此剩下的选项就是将队列数据落到数据库(关系型/非关系型),要么就是redis,因为即便MQ出现问题,供应商也会很快的修复问题保证MQ中间件的可用,因此我们这边灾备主要的职责就是应急。考虑到落库的处理逻辑和因此选择轻量级的redis队列来对基础服务中的MQ进行灾备。

    对比

     Redis队列:Redis队列是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用

     MQ队列   :在分布式系统中存储转发消息,在易用性、扩展性、高可用等方面表现不俗,主要是为了实现系统之间的双向解耦

    区别

            1. Redis没有相应的机制保证消息的消费,当消费者消费失败的时候,消费体丢失,需要手动处理。MQ:具有消息消费确认,即使消费者消费失败,也会自动使消息体返回原队列,同时可全程持久化,保证消息体被正确消费

      2. Redis采用主从模式,读写分离,但是故障转移还没有非常完善的官方解决方案;MQ集群采用磁盘、内存节点,任意单点故障都不会影响整个队列的操作

      3. 将整个Redis实例持久化到磁盘,MQ的队列、消息,都可以选择是否持久化

      4. Redis的特点是轻量级,高并发,延迟敏感,用于即使数据分析、秒杀计数器、缓存等,MQ的特点是重量级,高并发,用于异步、批量数据异步处理、并发任务串行化,高负载任务的负载均衡等

    更多相关内容
  • 万物皆K8S: 从服务化es到kafka和redis技术实践 新东方的容器化之路 Swarm Cattle K8S 2016 2017 2018 我们怎么看待K8S Paas Caas Kubernetes Iaas 新东方容器云总体设计 XDF新东方 K8S实践 XDF新东方 镜像分发 XDF...
  • 这篇文章,我就你聊一聊把 Redis 当作队列,究竟是否合适这个问题。我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。看完这篇文章后,我希望你对这个问题你会有全新的认识。在文章的最后,...

    大家好,我是 Jack。

    我经常听到很多人讨论,关于「把 Redis 当作队列来用是否合适」的问题。

    有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。

    也些人则反对,认为 Redis 会「丢」数据,最好还是用「专业」的队列中间件更稳妥。

    究竟哪种方案更好呢?

    这篇文章,我就和你聊一聊把 Redis 当作队列,究竟是否合适这个问题。

    我会从简单到复杂,一步步带你梳理其中的细节,把这个问题真正的讲清楚。

    看完这篇文章后,我希望你对这个问题你会有全新的认识。

    在文章的最后,我还会告诉你关于「技术选型」的思路,文章有点长,希望你可以耐心读完。
    在这里插入图片描述

    从最简单的开始:List 队列

    首先,我们先从最简单的场景开始讲起。

    如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型。

    因为 List 底层的实现就是一个「链表」,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

    如果把 List 当作队列,你可以这么来用。

    生产者使用 LPUSH 发布消息:

    127.0.0.1:6379> LPUSH queue msg1
    (integer) 1
    127.0.0.1:6379> LPUSH queue msg2
    (integer) 2
    

    消费者这一侧,使用 RPOP 拉取消息:

    127.0.0.1:6379> RPOP queue
    "msg1"
    127.0.0.1:6379> RPOP queue
    "msg2"
    

    这个模型非常简单,也很容易理解。
    在这里插入图片描述
    但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL。

    127.0.0.1:6379> RPOP queue
    (nil)   // 没消息了
    

    而我们在编写消费者逻辑时,一般是一个「死循环」,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

    while true:
        msg = redis.rpop("queue")
        // 没有消息,继续循环
        if msg == null:
            continue
        // 处理消息
        handle(msg)
    

    如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成「CPU 空转」,不仅浪费 CPU 资源,还会对 Redis 造成压力。

    怎么解决这个问题呢?

    也很简单,当队列为空时,我们可以「休眠」一会,再去尝试拉取消息。代码可以修改成这样:

    while true:
        msg = redis.rpop("queue")
        // 没有消息,休眠2s
        if msg == null:
            sleep(2)
            continue
        // 处理消息        
        handle(msg)
    

    这就解决了 CPU 空转问题。

    这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在「延迟」。

    假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

    要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。

    鱼和熊掌不可兼得。

    那如何做,既能及时处理新消息,还能避免 CPU 空转呢?

    Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就「阻塞等待」,一旦有新消息过来,就通知我的消费者立即处理新消息呢?

    幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞(Block)。
    在这里插入图片描述
    现在,你可以这样来拉取消息了:

    while true:
        // 没消息阻塞等待,0表示不设置超时时间
        msg = redis.brpop("queue", 0)
        if msg == null:
            continue
        // 处理消息
        handle(msg)
    

    使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个「超时时间」,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL。

    这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得。

    注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

    解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

    我们一起来分析一下:

    不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
    消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了
    第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景。

    第二个问题就比较棘手了,因为从 List 中 POP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。

    这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

    针对这 2 个问题怎么解决呢?我们一个个来看。

    发布/订阅模型:Pub/Sub

    从名字就能看出来,这个模块是 Redis 专门是针对「发布/订阅」这种队列模型设计的。

    它正好可以解决前面提到的第一个问题:重复消费。

    即多组生产者、消费者的场景,我们来看它是如何做的。

    Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
    在这里插入图片描述
    假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。

    首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并「订阅」同一个队列。

    // 2个消费者 都订阅一个队列
    127.0.0.1:6379> SUBSCRIBE queue
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "queue"
    3) (integer) 1
    

    此时,2 个消费者都会被阻塞住,等待新消息的到来。

    之后,再启动一个生产者,发布一条消息。

    127.0.0.1:6379> PUBLISH queue msg1
    (integer) 1
    

    这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

    127.0.0.1:6379> SUBSCRIBE queue
    // 收到新消息
    1) "message"
    2) "queue"
    3) "msg1"
    

    看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。

    除此之外,Pub/Sub 还提供了「匹配订阅」模式,允许消费者根据一定规则,订阅「多个」自己感兴趣的队列。

    // 订阅符合规则的队列
    127.0.0.1:6379> PSUBSCRIBE queue.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "queue.*"
    3) (integer) 1
    

    这里的消费者,订阅了 queue.* 相关的队列消息。

    之后,生产者分别向 queue.p1 和 queue.p2 发布消息。

    127.0.0.1:6379> PUBLISH queue.p1 msg1
    (integer) 1
    127.0.0.1:6379> PUBLISH queue.p2 msg2
    (integer) 1
    

    这时再看消费者,它就可以接收到这 2 个生产者的消息了。

    127.0.0.1:6379> PSUBSCRIBE queue.*
    Reading messages... (press Ctrl-C to quit)
    ...
    // 来自queue.p1的消息
    1) "pmessage"
    2) "queue.*"
    3) "queue.p1"
    4) "msg1"
    
    // 来自queue.p2的消息
    1) "pmessage"
    2) "queue.*"
    3) "queue.p2"
    4) "msg2"
    

    在这里插入图片描述
    我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

    讲完了它的优点,那它有什么缺点呢?

    其实,Pub/Sub 最大问题是:丢数据。

    如果发生以下场景,就有可能导致数据丢失:

    1、消费者下线
    2、Redis 宕机
    3、消息堆积
    究竟是怎么回事?

    这其实与 Pub/Sub 的实现方式有很大关系。

    Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立「数据转发通道」,把符合规则的数据,从一端转发到另一端。

    一个完整的发布、订阅消息处理流程是这样的:

    1、消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
    2、生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它
    在这里插入图片描述
    看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。

    这种设计方案,就导致了上面提到的那些问题。

    例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。

    如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部「丢弃」。

    所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失。

    这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。

    另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备「数据持久化」的能力。

    也就是说,Pub/Sub 的相关操作,不会写入到 RDB 和 AOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

    最后,我们来看 Pub/Sub 在处理「消息积压」时,为什么也会丢数据?

    当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。

    如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。

    但 Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!

    这是怎么回事?

    还是回到 Pub/Sub 的实现细节上来说。

    每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个「缓冲区」,这个缓冲区其实就是一块内存。

    当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。

    之后,消费者不断地从缓冲区读取消息,处理消息。

    在这里插入图片描述
    但是,问题就出在这个缓冲区上。

    因为这个缓冲区其实是有「上限」的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。

    如果超过了缓冲区配置的上限,此时,Redis 就会「强制」把这个消费者踢下线。

    这时消费者就会消费失败,也会丢失数据。

    如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60。

    它的参数含义如下:

    • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
    • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

    Pub/Sub 的这一点特点,是与 List 作队列差异比较大的。

    从这里你应该可以看出,List 其实是属于「拉」模型,而 Pub/Sub 其实属于「推」模型。

    List 中的数据可以一直积压在内存中,消费者什么时候来「拉」都可以。

    但 Pub/Sub 是把消息先「推」到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。

    当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

    好了,现在我们总结一下 Pub/Sub 的优缺点:

    1、支持发布 / 订阅,支持多组生产者、消费者处理消息
    2、消费者下线,数据会丢失
    3、不支持数据持久化,Redis 宕机,数据也会丢失
    4、消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失
    有没有发现,除了第一个是优点之外,剩下的都是缺点。

    所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很「鸡肋」。

    也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多。

    目前只有哨兵集群和 Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。
    我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?

    其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

    好,现在我们重新梳理一下,我们在使用消息队列时的需求。

    当我们在使用一个消息队列时,希望它的功能如下:

    • 支持阻塞等待拉取消息
    • 支持发布 / 订阅模式
    • 消费失败,可重新消费,消息不丢失
    • 实例宕机,消息不丢失,数据可持久化
    • 消息可堆积

    Redis 除了 List 和 Pub/Sub 之外,还有符合这些要求的数据类型吗?

    其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。

    Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque。

    这个项目的定位,就是一个基于内存的分布式消息队列中间件。

    但由于种种原因,这个项目一直不温不火。

    终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream。

    下面我们就来看看,它能符合上面提到的这些要求吗?

    趋于成熟的队列:Stream

    我们来看 Stream 是如何解决上面这些问题的。

    我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

    首先,Stream 通过 XADD 和 XREAD 完成最简单的生产、消费模型:

    • XADD:发布消息
    • XREAD:读取消息

    生产者发布 2 条消息:

    // *表示让Redis自动生成消息ID
    127.0.0.1:6379> XADD queue * name zhangsan
    "1618469123380-0"
    127.0.0.1:6379> XADD queue * name lisi
    "1618469127777-0"
    

    使用 XADD 命令发布消息,其中的「*」表示让 Redis 自动生成唯一的消息 ID。

    这个消息 ID 的格式是「时间戳-自增序号」。

    消费者拉取消息:

    // 从开头读取5条消息,0-0表示从开头读取
    127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
    1) 1) "queue"
       2) 1) 1) "1618469123380-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618469127777-0"
             2) 1) "name"
                2) "lisi"
    

    如果想继续拉取消息,需要传入上一条消息的 ID:

    127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
    (nil)
    

    没有消息,Redis 会返回 NULL。
    在这里插入图片描述
    以上就是 Stream 最简单的生产、消费。
    这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是「固定」参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。
    下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

    1、Stream 是否支持「阻塞式」拉取消息?
    可以的,在读取消息时,只需要增加 BLOCK 参数即可。

    // BLOCK 0 表示阻塞等待,不设置超时时间
    127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
    

    这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

    2、Stream 是否支持发布 / 订阅模式?
    也没问题,Stream 通过以下命令完成发布订阅:

    • XGROUP:创建消费者组
    • XREADGROUP:在指定消费组下,开启消费者拉取消息

    下面我们来看具体如何做?

    首先,生产者依旧发布 2 条消息:

    127.0.0.1:6379> XADD queue * name zhangsan
    "1618470740565-0"
    127.0.0.1:6379> XADD queue * name lisi
    "1618470743793-0"
    

    之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

    // 创建消费者组1,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group1 0-0
    OK
    // 创建消费者组2,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group2 0-0
    OK
    

    消费者组创建好之后,我们可以给每个「消费者组」下面挂一个「消费者」,让它们分别处理同一批数据。

    第一个消费组开始消费:

    // group1的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    同样地,第二个消费组开始消费:

    // group2的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

    这样一来,就达到了多组消费者「订阅」消费的目的。
    在这里插入图片描述
    3、消息处理时异常,Stream 能否保证消息不丢失,重新消费?
    除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。

    当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为「处理完成」。

    // group1下的 1618472043089-0 消息已处理完成
    127.0.0.1:6379> XACK queue group1 1618472043089-0
    

    在这里插入图片描述
    如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。

    待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

    // 消费者重新上线,0-0表示重新拉取未ACK的消息
    127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
    // 之前没消费成功的数据,依旧可以重新消费
    1) 1) "queue"
       2) 1) 1) "1618472043089-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618472045158-0"
             2) 1) "name"
                2) "lisi"
    

    4、 Stream 数据会写入到 RDB 和 AOF 做持久化吗?
    Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

    我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDB 或 AOF 中恢复回来。

    5、消息堆积时,Stream 是怎么处理的?
    其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

    生产者限流:避免消费者处理不及时,导致持续积压
    丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息
    而 Redis 在实现 Stream 时,采用了第 2 个方案。

    在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

    // 队列长度最大10000
    127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
    "1618473015018-0"
    

    当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。

    这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
    除了以上介绍到的命令,Stream 还支持查看消息长度(XLEN)、查看消费者状态(XINFO)等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。
    好了,通过以上介绍,我们可以看到,Redis 的 Stream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?

    既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?

    但是还「差一点」,就算 Redis 能做到以上这些,也只是「趋近于」专业的消息队列。

    原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。

    到这里,就不得不把 Redis 与专业的队列中间件做对比了。

    下面我们就来看一下,Redis 在作队列时,到底还有哪些欠缺?

    与专业的消息队列对比

    其实,一个专业的消息队列,必须要做到两大块:

    消息不丢
    消息可堆积
    前面我们讨论的重点,很大篇幅围绕的是第一点展开的。

    这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

    使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者。
    在这里插入图片描述
    消息是否会发生丢失,其重点也就在于以下 3 个环节:

    1、生产者会不会丢消息?
    2、消费者会不会丢消息?
    3、队列中间件会不会丢消息?

    1、 生产者会不会丢消息?
    当生产者在发布消息时,可能发生以下异常情况:

    1、消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
    2、不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了
    如果是情况 1,消息根本没发出去,那么重新发一次就好了。

    如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。
    生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。
    也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。

    但发现没有?这也意味着消息可能会重复发送。

    是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。

    那消费者这边,就需要多做一些逻辑了。

    对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。

    从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。

    所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

    2、消费者会不会丢消息?
    这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?

    要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。

    这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。

    无论是 Redis 的 Stream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。

    所以,从这个角度来看,Redis 也是合格的。

    3、队列中间件会不会丢消息?
    前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。

    但是,如果队列中间件本身就不可靠呢?

    毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。

    在这个方面,Redis 其实没有达到要求。

    Redis 在以下 2 个场景下,都会导致数据丢失。

    1、AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
    2、主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)
    基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性。

    所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

    再来看那些专业的消息队列中间件是如何解决这个问题的?

    像 RabbitMQ 或 Kafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。

    也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。

    但 Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

    最后,我们来看消息积压怎么办?

    4、消息积压怎么办?
    因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。

    所以,Redis 的 Stream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。

    但 Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加「坦然」。

    综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

    1、Redis 本身可能会丢数据
    2、面对消息积压,Redis 内存资源紧张
    到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。

    如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。

    而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。

    如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

    总结

    好了,总结一下。这篇文章我们从「Redis 能否用作队列」这个角度出发,介绍了 List、Pub/Sub、Stream 在做队列的使用方式,以及它们各自的优劣。

    之后又把 Redis 和专业的消息队列中间件做对比,发现 Redis 的不足之处。

    最后,我们得出 Redis 做队列的合适场景。

    这里我也列了一个表格,总结了它们各自的优缺点。
    在这里插入图片描述

    后记

    最后,我想和你再聊一聊关于「技术方案选型」的问题。

    你应该也看到了,这篇文章虽然始于 Redis,但并不止于 Redis。

    我们在分析 Redis 细节时,一直在提出问题,然后寻找更好的解决方案,在文章最后,又聊到一个专业的消息队列应该怎么做。

    其实,我们在讨论技术选型时,就是一个关于如何取舍的问题。

    而这里我想传达给你的信息是,在面对技术选型时,不要不经过思考就觉得哪个方案好,哪个方案不好。

    你需要根据具体场景具体分析,这里我把这个分析过程分为 2 个层面:

    1、业务功能角度
    2、技术资源角度
    这篇文章所讲到的内容,都是以业务功能角度出发做决策的。

    但这里的第二点,从技术资源角度出发,其实也很重要。

    技术资源的角度是说,你所处的公司环境、技术资源能否匹配这些技术方案。

    这个怎么解释呢?

    简单来讲,就是你所在的公司、团队,是否有匹配的资源能 hold 住这些技术方案。

    我们都知道 Kafka、RabbitMQ 是非常专业的消息中间件,但它们的部署和运维,相比于 Redis 来说,也会更复杂一些。

    如果你在一个大公司,公司本身就有优秀的运维团队,那么使用这些中间件肯定没问题,因为有足够优秀的人能 hold 住这些中间件,公司也会投入人力和时间在这个方向上。

    但如果你是在一个初创公司,业务正处在快速发展期,暂时没有能 hold 住这些中间件的团队和人,如果贸然使用这些组件,当发生故障时,排查问题也会变得很困难,甚至会阻碍业务的发展。

    而这种情形下,如果公司的技术人员对于 Redis 都很熟,综合评估来看,Redis 也基本可以满足业务 90% 的需求,那当下选择 Redis 未必不是一个好的决策。

    所以,做技术选型不只是技术问题,还与人、团队、管理、组织结构有关。

    也正是因为这些原因,当你在和别人讨论技术选型问题时,你会发现每个公司的做法都不相同。

    毕竟每个公司所处的环境和文化不一样,做出的决策当然就会各有差异。

    如果你不了解这其中的逻辑,那在做技术选型时,只会趋于表面现象,无法深入到问题根源。

    而一旦你理解了这个逻辑,那么你在看待这个问题时,不仅对于技术会有更加深刻认识,对技术资源和人的把握,也会更加清晰。

    希望你以后在做技术选型时,能够把这些因素也考虑在内,这对你的技术成长之路也是非常有帮助的。

    展开全文
  • redis的发布/订阅和mq消息队列的区别

    好久没写笔记了,今天记录下使用消息队列的心得.

    本文以reids和rocketmq对比

    很多人一直有个疑问(包括我之前也是):redis支持已经消息队列(发布/订阅)了,为什么还需要mq呢?

    项目已经集成了redis,为什么还要多集成一个mq,那不是显得更臃肿吗?增加了维护成本

    redis和mq相同点:

    解耦

    服务与之间耦合度,比如订单服务与用户积分服务(需求:下单成功,增加积分)

    如果不用消息队列,订单服务和积分服务就要通信,下单后调用积分服务的接口通知积分服务进行处理(或者定时扫描之类的),那么调用接口失败,或者延时等等...一系列的问题要考虑处理,非常繁琐

    用户了消息队列,用户A下单成功后下单服务通过redis发布(mq的生产者)一消息,就不用管了.用户积分服务redis订阅了(mq的消费者),就会受到这用户A下单的消息,进行处理.这就降低了多个服务之间的耦合,即使积分服务发生异常,也不会影响用户正常下单.处理起来就非常的丝滑,各干各的互不影响.

    redis和mq区别

    可靠性和机制不一样

    redis客户端在订阅消息时,要求订阅在发布之前,否则无法订阅到客户端订阅前,已经发布的消息。(,需要先订阅,然后发布的信息才会被订阅者收到)

    redis的消息发布与订阅,无法实现高并发和大数据量。前者受限于redis本身的并发量限制和内存大小;后者是因为redis发布消息时,会先将数据推送到每个客户端的连接缓冲区,如果单个消息过大会撑爆缓冲区,导致redis错误,就算redis没有撑爆缓冲区,如果消费者(订阅方)没有及时取走消息,也会因为数据积累而撑爆内存。

    mq的生产者和消费者则不存在先后关系,生产者只管往队列里面加信息,消费者只管从队列里面取信息

    可以拿送快递来比喻

    redis 是兼职跑腿的

    mq 是专业的快递公司投递

    redis送的快递,你必须在指定位置等到,送来你就能立马拿到,如何送过来如何你没在,那么证明你不要了,过后你再来,你就收不到了.

    mq送的快递,你先来后来无所谓,先来,快递到了立马拿走,如何快递到了,还没来,那么放到门卫处,你到了再拿走.

    现在你该知道如何选择了吧???

    展开全文
  • 目前使用redis消息队列的的方式有3中,list, publish/subscribe, stream list做mq的总结 使用方法 1. 生产者可以 lpush 写入消息,消费者可以 rpop 读取消息,也就是pull模式 2. 消费者可以使用 brpop 阻塞...

    总结

    目前使用redis做消息队列的的方式有3中,list,      publish/subscribe,       stream

    list做mq的总结

    使用方法

    1. 生产者可以 lpush 写入消息,消费者可以 rpop 读取消息,也就是pull模式

    2. 消费者可以使用 brpop 阻塞性读取消息,约等于服务器端的实时推送

    3. 如何保证消息读取但未处理时,消费者程序异常宕机造成的消息丢失,答案:rpoplpushbrpoplpush ,即先从原队列中移除一个消息并插入到一个新队列,消费者处理完该消息后再从新队列中删除,相当于ack机制,避免消费者异常时消息丢失

    缺点

    1.一个消息只能被消费一次,缺乏广播机制

    缺点举例:游戏开发中,玩家登陆时,很多进程需要监听该登陆消息做对应的处理,但是因为list消息只能被消费一次,无法满足需求(虽然你可以为登陆事件搞多个list,每个关心登陆事件的进程监听一个list,登陆时也把一个消息压入多个list,只是生产者消费者太过耦合)

    publish/subscribe

    使用方法

    1. 生产者可以 publish 写入消息,消费者可以 subscribe 监听消息,也就是push模式

    2. 多个消费者可以 subscribe 同一个消息,消费者publist后,所有的消费者都能收到通知,以此解决了list中一个消息只能被消费一次的缺点

    3. 可以使用通配符*进行简单的正则匹配,比如多个类似channel(广东深圳频道,广东广州频道,广东东莞频道)这个时候消费者可以 subscribe 广东*频道 来监听所有这样的频道

    缺点

    1. 解决了list做mq时消息不能广播的问题

    2. 服务器是即时推送,不保存消息,所以消费者一旦断线,消息就丢失了

    3. 消息无堆积,不能削峰填谷。因为服务器不考虑消费者,只按照生产者的速度推送。若消费者速度慢,消息就会在client连接的buf中堆积,超过阀值后会断开连接,这样消息就全部丢失了

    stream

    使用方法

    具体使用过可参考笔者的这篇博客

    redis 流 stream的使用总结 - 基础命令_YZF_Kevin的博客-CSDN博客_redis 流

    优点

    1. 完全对标kafka的模型重新设计的全内存的mq,速度高于kafka

    2. pull模式,且有ack机制。且数据也会随着持久化保存在rdb和aof文件中,即使redis重启,保证数据不会丢失

    3. 有消费者组的概念,一个topic可以有多个消费者组,一个消费者组内可以有多个消费者,既支持消息广播(一条msg能被多个不同的消费者组重复消费),又可以水平扩展(一个消费者组内增加多个消费者依次增加处理速度)

    缺点

    1. 完全用内存做消息堆积,相比那些以硬盘做堆积的,成本较高

    2. redis存在数据丢失的可能性(单点redis持久化时everysec也可能会丢失一秒的数据;集群redis的主从同步也可能会丢失数据)

    展开全文
  • Redis和消息队列

    千次阅读 2021-09-02 17:04:29
    Redis 一、Redis数据类型、Redis数据结构、Redis使用场景 Redis数据类型 键的类型只能是字符串 值支持5种数据类型: 字符串String,可以存储字符串、整数、浮点数 列表list 集合set 散列表hash,包含键值...
  • Redis消息队列

    2022-07-04 17:13:54
    所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。 通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时...
  • 从零到一 用nodejs 手写一个简易的消息队列,手把手教学。emm 如果还是不会 ,去仓库拿吧
  • rabbitmq:具有消息消费确认机制,如果发布一条消息,还没有消费者消费该队列,那么这条消息将一直存放在队列中,直到有消费者消费了该条消息,以此可以保证消息的可靠消费; 实时性 redis:实时性高,redis作为高效...
  • Redis消息队列——Redis Stream

    千次阅读 2022-05-26 09:05:16
    文章目录消息队列为什么不使用Redis 发布订阅 (pub/sub) 来实现消息队列Stream消息队列相关命令:消费者组相关命令:Stream最简单的生产、消费模型Stream 优点/改进Stream 支持「阻塞式」拉取消息支持发布 / 订阅...
  • Springboot 实现Redis 消息队列 之前被面试官问到怎么实现Redis消息队列,我人麻了,当时一个劲的摇头,娘的,欺负我那时知识少,恶心啊  最近看到一个Demo,然后随笔记录了一篇,以便记录自己的成长过程。 一、...
  • 消息队列处理后台任务带来的问题 项目中经常会有后台运行任务的需求,比如发送邮件时,因为要连接邮件服务器,往往需要5-10秒甚至更长时间,如果能先给用户一个成功的提示信息,然后在后台慢慢处理发送邮件的操作,...
  • Redis消息队列的实现消息队列一直是中间件三剑客(RedisMQ、MySQL)中的重要一环,它能够实现异步、削峰、解耦等功能,特别在一些分布式系统架构中优势发挥的淋漓尽致,目前比较成熟的消息中间件种类很多如...
  • redis消息队列

    2020-11-25 16:52:03
    redis消息队列,实现了kafka队列的调度(顺序消费,避免高并发内存溢出),大家参考,提高开发速度
  • 这些消息队列需要独立安装部署,作为一个中间件来提供服务,虽然有着高性能、高可靠的优点,但是额外部署这些中间件也会增加运维成本,服务器成本。 本篇文章探讨了一下如何使用redis实现消息队列。使用redis无需...
  • 前言: 本文基于jedis 2.9.0.jar、commons-pool2-2.4.2.jar以及json-20160810.jar ...1、jedis的消息队列方法简述 1.1、发布消息方法 (其中,channel是对应消息通道,message是对应消息体) jedis.p...
  • redis实现普通消息队列与延迟消息队列1.redis实现普通消息队列1.1 实现原理1.2 pom.xml1.3 JedisUtils工具类1.4 消息类1.4 消息队列类1.5 消息入队测试1.5 消息出队测试2.redis实现延迟消息队列2.1 实现原理2.2 pom....
  • 为啥造轮子?redis消息队列
  • 主要用于探究使用Redis的使用场景,以及Redis作为消息队列,与专业的MQ的区别,各自的使用场景。
  • 消息队列1.1 基于List结构模拟消息队列1.2 基于PubSub的消息队列1.3 基于Stream的消息队列2. 基于Stream的消息队列---消费者组2.1 消费者组介绍2.2 消费者监听消息基本思路2.3 消费者组总结3. 基于Stream的消息队列...
  • Java redis实现消息队列

    千次阅读 2021-07-01 11:16:00
    文章目录一、单元测试Java多线程二、redis实现消息队列三、java多线程模拟生产者消费者 一、单元测试Java多线程 使用junit测试多线程代码,但是等到程序结束,输出结果不完整,或者是完全没结果,因此,可能是其他...
  • redis消息队列,你还不敢用?

    千次阅读 2022-01-16 18:12:39
    消息队列要能支持组件通信消息的快速读写,而 Redis 作为一款常用的缓存组件,本身支持数据的高速访问,正好可以满足消息队列的读写性能需求。不过,除了性能,消息队列还有其他的要求,所以,很多人都很关心一个...
  • 因为 List 底层的实现就是一个「链表」,在头部尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。 如果把 List 当作队列,你可以这么来用。 生产者使用 LPUSH 发布消息: 127.0.0.1:6379>...
  • 是一个key-value的NoSQL数据库,redis的设计是用来做缓存的,他是一个内存数据库,不过因为其某些特性适合用来充当队列(list),多被用于做简单的mq,有阻塞式的API可以做消息队列redis可以实现主从同步,负载
  • redis在我们的项目中一般作为缓存使用,在我们团队中偶然听到有在讨论要实现一个消息队列的功能。 领导在向一个负责实现此功能的同事下达要使用redis实现这项功能的时候,好像在沟通中出现了很多问题(注:我们的...
  • Redis 是一个Key-Value的NoSQL数据库,开发维护很活跃,虽然它是一个Key-Value数据库存储系统,但它本身支持MQ功能,所以完全可以当做一个轻量级的队列服务来使用。 对于RabbitMQ和Redis的入队出队操作,各执行100...
  • 使用Redis搭建消息队列(python版)

    千次阅读 2022-03-26 20:47:07
    这种方法存在的问题,因为是通过同步的定时任务的方式,会因为数据量太大导致定时任务执行超时,导致事务回滚,数据库中未创建对应消息的记录,所以导致消息无法确认。 在发现这个问题之后,我们的第一反应是把定时...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 27,270
精华内容 10,908
关键字:

redis消息队列和mq