精华内容
下载资源
问答
  • 为什么不用redis做消息队列
    千次阅读
    2022-01-13 20:01:15

    消息队列

    消息队列在存取消息时,必须要满足三个需求,分别是消息保序、处理重复的消息和保证消息可靠性。

    消息保序

    虽然消费者是异步处理消息,但是,消费者仍然需要按照生产者发送消息的顺序来处理消息,避免后发送的消息被先处理了。对于要求消息保序的场景来说,一旦出现这种消息被乱序处理的情况,就可能会导致业务逻辑被错误执行,从而给业务方造成损失。

    重复消息处理

    消费者从消息队列读取消息时,有时会因为网络堵塞而出现消息重传的情况。此时,消费者可能会收到多条重复的消息。对于重复的消息,消费者如果多次处理的话,就可能造成一个业务逻辑被多次执行,如果业务逻辑正好是要修改数据,那就会出现数据被多次修改的问题了。

    消息可靠性保证

    另外,消费者在处理消息的时候,还可能出现因为故障或宕机导致消息没有处理完成的情况。此时,消息队列需要能提供消息可靠性的保证,也就是说,当消费者重启后,可以重新读取消息再次进行处理,否则,就会出现消息漏处理的问题了。

    List 作为消息队列

    List 本身就是按先进先出的顺序对数据进行存取的,所以,如果使用 List 作为消息队列保存消息的话,就已经能满足消息保序的需求了。
    产者可以使用 LPUSH 命令把要发送的消息依次写入 List,而消费者则可以使用 RPOP 命令,从 List 的另一端按照消息的写入顺序,依次读取消息并进行处理。

    生产者往 List 中写入数据时,List 并不会主动地通知消费者有新消息写入,如果消费者想要及时处理消息,就需要在程序中不停地调用 RPOP 命令(比如使用一个 while(1) 循环)。如果有新消息写入,RPOP 命令就会返回结果,否则,RPOP 命令返回空值,再继续循环。所以,即使没有新消息写入 List,消费者也要不停地调用 RPOP 命令,这就会导致消费者程序的 CPU 一直消耗在执行 RPOP 命令上,带来不必要的性能损失。

    Redis 提供了 BRPOP 命令。BRPOP 命令也称为阻塞式读取,客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。和消费者程序自己不停地调用 RPOP 命令相比,这种方式能节省 CPU 开销。

    List 本身是不会为每个消息生成 ID 号的,所以,消息的全局唯一 ID 号就需要生产者程序在发送消息前自行生成。生成之后,我们在用 LPUSH 命令把消息插入 List 时,需要在消息中包含这个全局唯一 ID。

    缺陷:生产者消息发送很快,而消费者处理消息的速度比较慢,这就导致 List 中的消息越积越多,给 Redis 的内存带来很大压力。

    Streams 的消息队列、

    Streams 是 Redis 专门为消息队列设计的数据类型,它提供了丰富的消息队列操作命令。

    • XADD:插入消息,保证有序,可以自动生成全局唯一 ID;
    • XREAD:用于读取消息,可以按 ID 读取数据;
    • XREADGROUP:按消费组形式读取消息;
    • XPENDING 和 XACK:XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成。
      XADD 命令可以往消息队列中插入新消息,消息的格式是键 - 值对形式。对于插入的每一条消息,Streams 可以自动为其生成一个全局唯一的 ID。
      当消费者需要读取消息时,可以直接使用 XREAD 命令从消息队列中读取。
    XADD mqstream * repo 5
    "1599203861727-0"
    
    XREAD BLOCK 100 STREAMS mqstream 1599203861727-0
    1) 1) "mqstream" 
       2) 1) 1) "1599274912765-0" 
          2) 1) "repo" 
             2) "3" 
       	   2) 1) "1599274925823-0" 
       	      2) 1) "repo" 
       	         2) "2" 
       	      3) 1) "1599274927910-0" 
       	         2) 1) "repo" 
       	            2) "1"
    

    消费者也可以在调用 XRAED 时设定 block 配置项,实现类似于 BRPOP 的阻塞读取操作。当消息队列中没有消息时,一旦设置了 block 配置项,XREAD 就会阻塞,阻塞的时长可以在 block 配置项进行设置。

    Streams 本身可以使用 XGROUP 创建消费组,创建消费组之后,Streams 可以使用 XREADGROUP 命令让消费组内的消费者读取消息,

    GROUP create mqstream group1 0
    OK
    

    然后,我们再执行一段命令,让 group1 消费组里的消费者 consumer1 从 mqstream 中读取所有消息,其中,命令最后的参数“>”,表示从第一条尚未被消费的消息开始读取。因为在 consumer1 读取消息前,group1 中没有其他消费者读取过消息,所以,consumer1 就得到 mqstream 消息队列中的所有消息了(一共 4 条)。

    
    XREADGROUP group group1 consumer1 streams mqstream >
    1) 1) "mqstream"
       2) 1) 1) "1599203861727-0"
             2) 1) "repo"
                2) "5"
          2) 1) "1599274912765-0"
             2) 1) "repo"
                2) "3"
          3) 1) "1599274925823-0"
             2) 1) "repo"
                2) "2"
          4) 1) "1599274927910-0"
             2) 1) "repo"
                2) "1"
    

    消息队列中的消息一旦被消费组里的一个消费者读取了,就不能再被该消费组内的其他消费者读取了
    使用消费组的目的是让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的。

    更多相关内容
  • 消息队列为什么不用redis作为队列

    千次阅读 2021-04-29 08:45:02
    文章目录1 引言1.1 Redis中List队列1.1.1 简单使用1.1.2 解决cpu空转问题1.1.3 Redis阻塞式拉取1.2 Redis发布订阅1.2.1 简单使用1.2.2 发布订阅的缺点1.3 Redis中的Stream1.3.1 简单使用1.3.2 stream阻塞拉取1.3.3 ...


    本文转载于: https://mp.weixin.qq.com/s/uhMrqR__6qgpl7vrE_otTQ

    1 引言

    我经常听到很多人讨论,关于把 Redis 当作队列来用是否合适的问题。
    有些人表示赞成,他们认为 Redis 很轻量,用作队列很方便。
    也些人则反对,认为 Redis数据,最好还是用专业的队列中间件更稳妥

    究竟哪种方案更好呢?
    这篇文章,就聊一聊把 Redis 当作队列,究竟是否合适这个问题。
    从简单到复杂,一步步梳理其中的细节,把这个问题真正的讲清楚。
    看完这篇文章后对这个问题你会有全新的认识。
    在文章的最后,还会告诉你关于技术选型的思路,文章有点长,希望你可以耐心读完

    1.1 Redis中List队列

    1.1.1 简单使用

    从最简单的开始:List 队列
    首先,我们先从最简单的场景开始讲起
    如果你的业务需求足够简单,想把 Redis 当作队列来使用,肯定最先想到的就是使用 List 这个数据类型
    因为List底层的实现就是一个链表,在头部和尾部操作元素,时间复杂度都是 O(1),这意味着它非常符合消息队列的模型。

    如果把 List 当作队列,你可以这么来用。

    生产者使用 LPUSH 发布消息:

    127.0.0.1:6379> LPUSH queue msg1
    (integer) 1
    127.0.0.1:6379> LPUSH queue msg2
    (integer) 2
    

    消费者这一侧,使用 RPOP 拉取消息:

    127.0.0.1:6379> RPOP queue
    "msg1"
    127.0.0.1:6379> RPOP queue
    "msg2"
    

    这个模型非常简单,也很容易理解。
    在这里插入图片描述
    但这里有个小问题,当队列中已经没有消息了,消费者在执行 RPOP 时,会返回 NULL

    127.0.0.1:6379> RPOP queue
    (nil)   // 没消息了
    

    而我们在编写消费者逻辑时,一般是一个死循环,这个逻辑需要不断地从队列中拉取消息进行处理,伪代码一般会这么写:

    while true:
        msg = redis.rpop("queue")
        // 没有消息,继续循环
        if msg == null:
            continue
        // 处理消息
        handle(msg)
    

    如果此时队列为空,那消费者依旧会频繁拉取消息,这会造成CPU 空转,不仅浪费 CPU 资源,还会对 Redis 造成压力。

    1.1.2 解决cpu空转问题

    怎么解决这个问题呢?
    也很简单,当队列为空时,我们可以休眠一会,再去尝试拉取消息。代码可以修改成这样:

    while true:
        msg = redis.rpop("queue")
        // 没有消息,休眠2s
        if msg == null:
            sleep(2)
            continue
        // 处理消息        
        handle(msg)
    

    这就解决了 CPU 空转问题
    这个问题虽然解决了,但又带来另外一个问题:当消费者在休眠等待时,有新消息来了,那消费者处理新消息就会存在延迟
    假设设置的休眠时间是 2s,那新消息最多存在 2s 的延迟。

    1.1.3 Redis阻塞式拉取

    要想缩短这个延迟,只能减小休眠的时间。但休眠时间越小,又有可能引发 CPU 空转问题。
    鱼和熊掌不可兼得
    那如何做,既能及时处理新消息,还能避免 CPU 空转呢?
    Redis 是否存在这样一种机制:如果队列为空,消费者在拉取消息时就阻塞等待,一旦有新消息过来,就通知我的消费者立即处理新消息呢?
    幸运的是,Redis 确实提供了「阻塞式」拉取消息的命令:BRPOP / BLPOP,这里的 B 指的是阻塞Block
    在这里插入图片描述

    现在,你可以这样来拉取消息了:

    while true:
        // 没消息阻塞等待,0表示不设置超时时间
        msg = redis.brpop("queue", 0)
        if msg == null:
            continue
        // 处理消息
        handle(msg)
    

    使用 BRPOP 这种阻塞式方式拉取消息时,还支持传入一个超时时间,如果设置为 0,则表示不设置超时,直到有新消息才返回,否则会在指定的超时时间后返回 NULL
    这个方案不错,既兼顾了效率,还避免了 CPU 空转问题,一举两得

    注意:如果设置的超时时间太长,这个连接太久没有活跃过,可能会被 Redis Server 判定为无效连接,之后 Redis Server 会强制把这个客户端踢下线。所以,采用这种方案,客户端要有重连机制。

    解决了消息处理不及时的问题,你可以再思考一下,这种队列模型,有什么缺点?

    • 不支持重复消费:消费者拉取消息后,这条消息就从 List 中删除了,无法被其它消费者再次消费,即不支持多个消费者消费同一批数据
    • 消息丢失:消费者拉取到消息后,如果发生异常宕机,那这条消息就丢失了

    第一个问题是功能上的,使用 List 做消息队列,它仅仅支持最简单的,一组生产者对应一组消费者,不能满足多组生产者和消费者的业务场景
    第二个问题就比较棘手了,因为从 ListPOP 一条消息出来后,这条消息就会立即从链表中删除了。也就是说,无论消费者是否处理成功,这条消息都没办法再次消费了。这也意味着,如果消费者在处理消息时异常宕机,那这条消息就相当于丢失了。

    1.2 Redis发布订阅

    1.2.1 简单使用

    发布/订阅模型:Pub/Sub
    从名字就能看出来,这个模块是 Redis 专门是针对发布/订阅这种队列模型设计的
    它正好可以解决前面提到的第一个问题:重复消费
    即多组生产者、消费者的场景,我们来看它是如何做的。
    Redis 提供了 PUBLISH / SUBSCRIBE 命令,来完成发布、订阅的操作。
    在这里插入图片描述
    假设你想开启 2 个消费者,同时消费同一批数据,就可以按照以下方式来实现。
    首先,使用 SUBSCRIBE 命令,启动 2 个消费者,并订阅同一个队列。

    // 2个消费者 都订阅一个队列
    127.0.0.1:6379> SUBSCRIBE queue
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "queue"
    3) (integer) 1
    

    此时,2 个消费者都会被阻塞住,等待新消息的到来。
    之后,再启动一个生产者,发布一条消息。

    127.0.0.1:6379> PUBLISH queue msg1
    (integer) 1
    

    这时,2 个消费者就会解除阻塞,收到生产者发来的新消息。

    127.0.0.1:6379> SUBSCRIBE queue
    // 收到新消息
    1) "message"
    2) "queue"
    3) "msg1"
    

    看到了么,使用 Pub/Sub 这种方案,既支持阻塞式拉取消息,还很好地满足了多组消费者,消费同一批数据的业务需求。
    除此之外,Pub/Sub 还提供了匹配订阅模式,允许消费者根据一定规则,订阅多个自己感兴趣的队列

    // 订阅符合规则的队列
    127.0.0.1:6379> PSUBSCRIBE queue.*
    Reading messages... (press Ctrl-C to quit)
    1) "psubscribe"
    2) "queue.*"
    3) (integer) 1
    

    这里的消费者,订阅了 queue.*相关的队列消息。
    之后,生产者分别向 queue.p1queue.p2 发布消息。

    127.0.0.1:6379> PUBLISH queue.p1 msg1
    (integer) 1
    127.0.0.1:6379> PUBLISH queue.p2 msg2
    (integer) 1
    

    这时再看消费者,它就可以接收到这 2 个生产者的消息了。

    127.0.0.1:6379> PSUBSCRIBE queue.*
    Reading messages... (press Ctrl-C to quit)
    ...
    // 来自queue.p1的消息
    1) "pmessage"
    2) "queue.*"
    3) "queue.p1"
    4) "msg1"
    
    // 来自queue.p2的消息
    1) "pmessage"
    2) "queue.*"
    3) "queue.p2"
    4) "msg2"
    

    在这里插入图片描述
    我们可以看到,Pub/Sub 最大的优势就是,支持多组生产者、消费者处理消息。

    1.2.2 发布订阅的缺点

    讲完了它的优点,那它有什么缺点呢?
    其实,Pub/Sub 最大问题是:丢数据

    如果发生以下场景,就有可能导致数据丢失:

    • 消费者下线
    • Redis 宕机
    • 消息堆积

    究竟是怎么回事?
    这其实与 Pub/Sub 的实现方式有很大关系。
    Pub/Sub 在实现时非常简单,它没有基于任何数据类型,也没有做任何的数据存储,它只是单纯地为生产者、消费者建立数据转发通道,把符合规则的数据,从一端转发到另一端。

    一个完整的发布、订阅消息处理流程是这样的:

    • 消费者订阅指定队列,Redis 就会记录一个映射关系:队列->消费者
    • 生产者向这个队列发布消息,那 Redis 就从映射关系中找出对应的消费者,把消息转发给它

    在这里插入图片描述
    看到了么,整个过程中,没有任何的数据存储,一切都是实时转发的。
    这种设计方案,就导致了上面提到的那些问题。
    例如,如果一个消费者异常挂掉了,它再重新上线后,只能接收新的消息,在下线期间生产者发布的消息,因为找不到消费者,都会被丢弃掉。
    如果所有消费者都下线了,那生产者发布的消息,因为找不到任何一个消费者,也会全部丢弃
    所以,当你在使用 Pub/Sub 时,一定要注意:消费者必须先订阅队列,生产者才能发布消息,否则消息会丢失
    这也是前面讲例子时,我们让消费者先订阅队列,之后才让生产者发布消息的原因。
    另外,因为 Pub/Sub 没有基于任何数据类型实现,所以它也不具备数据持久化的能力。
    也就是说,Pub/Sub 的相关操作,不会写入到 RDBAOF 中,当 Redis 宕机重启,Pub/Sub 的数据也会全部丢失。

    最后,我们来看 Pub/Sub 在处理消息积压时,为什么也会丢数据?
    当消费者的速度,跟不上生产者时,就会导致数据积压的情况发生。
    如果采用 List 当作队列,消息积压时,会导致这个链表很长,最直接的影响就是,Redis 内存会持续增长,直到消费者把所有数据都从链表中取出。
    Pub/Sub 的处理方式却不一样,当消息积压时,有可能会导致消费失败和消息丢失!

    这是怎么回事?
    还是回到 Pub/Sub 的实现细节上来说。
    每个消费者订阅一个队列时,Redis 都会在 Server 上给这个消费者在分配一个缓冲区,这个缓冲区其实就是一块内存。
    当生产者发布消息时,Redis 先把消息写到对应消费者的缓冲区中。
    之后,消费者不断地从缓冲区读取消息,处理消息。

    在这里插入图片描述
    但是,问题就出在这个缓冲区
    因为这个缓冲区其实是有上限的(可配置),如果消费者拉取消息很慢,就会造成生产者发布到缓冲区的消息开始积压,缓冲区内存持续增长。
    如果超过了缓冲区配置的上限,此时,Redis 就会强制把这个消费者踢下线
    这时消费者就会消费失败,也会丢失数据。

    如果你有看过 Redis 的配置文件,可以看到这个缓冲区的默认配置:client-output-buffer-limit pubsub 32mb 8mb 60
    它的参数含义如下:

    • 32mb:缓冲区一旦超过 32MB,Redis 直接强制把消费者踢下线
    • 8mb + 60:缓冲区超过 8MB,并且持续 60 秒,Redis 也会把消费者踢下线

    Pub/Sub 的这一点特点,是与 List 作队列差异比较大的
    从这里你应该可以看出,List 其实是属于模型,而 Pub/Sub 其实属于模型。
    List 中的数据可以一直积压在内存中,消费者什么时候来都可以。
    Pub/Sub 是把消息先到消费者在 Redis Server 上的缓冲区中,然后等消费者再来取。
    当生产、消费速度不匹配时,就会导致缓冲区的内存开始膨胀,Redis 为了控制缓冲区的上限,所以就有了上面讲到的,强制把消费者踢下线的机制。

    好了,现在我们总结一下 Pub/Sub 的优缺点:

    • 支持发布 / 订阅,支持多组生产者、消费者处理消息
    • 消费者下线,数据会丢失
    • 不支持数据持久化,Redis 宕机,数据也会丢失
    • 消息堆积,缓冲区溢出,消费者会被强制踢下线,数据也会丢失

    有没有发现,除了第一个是优点之外,剩下的都是缺点。
    所以,很多人看到 Pub/Sub 的特点后,觉得这个功能很鸡肋
    也正是以上原因,Pub/Sub 在实际的应用场景中用得并不多
    目前只有哨兵集群和Redis 实例通信时,采用了 Pub/Sub 的方案,因为哨兵正好符合即时通讯的业务场景。
    我们再来看一下,Pub/Sub 有没有解决,消息处理时异常宕机,无法再次消费的问题呢?
    其实也不行,Pub/Sub 从缓冲区取走数据之后,数据就从 Redis 缓冲区删除了,消费者发生异常,自然也无法再次重新消费。

    好,现在我们重新梳理一下,我们在使用消息队列时的需求。

    当我们在使用一个消息队列时,希望它的功能如下:

    • 支持阻塞等待拉取消息
    • 支持发布 / 订阅模式
    • 消费失败,可重新消费,消息不丢失
    • 实例宕机,消息不丢失,数据可持久化
    • 消息可堆积

    Redis 除了 ListPub/Sub 之外,还有符合这些要求的数据类型吗?
    其实,Redis 的作者也看到了以上这些问题,也一直在朝着这些方向努力着。
    Redis 作者在开发 Redis 期间,还另外开发了一个开源项目 disque
    这个项目的定位,就是一个基于内存的分布式消息队列中间件。
    但由于种种原因,这个项目一直不温不火。
    终于,在 Redis 5.0 版本,作者把 disque 功能移植到了 Redis 中,并给它定义了一个新的数据类型:Stream
    下面我们就来看看,它能符合上面提到的这些要求吗?

    1.3 Redis中的Stream

    1.3.1 简单使用

    趋于成熟的队列:Stream
    我们来看 Stream 是如何解决上面这些问题的
    我们依旧从简单到复杂,依次来看 Stream 在做消息队列时,是如何处理的?

    首先,Stream 通过 XADDXREAD 完成最简单的生产、消费模型:

    • XADD:发布消息
    • XREAD:读取消息

    生产者发布 2 条消息:

    // *表示让Redis自动生成消息ID
    127.0.0.1:6379> XADD queue * name zhangsan
    "1618469123380-0"
    127.0.0.1:6379> XADD queue * name lisi
    "1618469127777-0"
    

    使用 XADD 命令发布消息,其中的*表示让 Redis 自动生成唯一的消息 ID
    这个消息 ID 的格式是时间戳-自增序号

    消费者拉取消息:

    // 从开头读取5条消息,0-0表示从开头读取
    127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 0-0
    1) 1) "queue"
       2) 1) 1) "1618469123380-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618469127777-0"
             2) 1) "name"
                2) "lisi"
    

    如果想继续拉取消息,需要传入上一条消息的 ID:

    127.0.0.1:6379> XREAD COUNT 5 STREAMS queue 1618469127777-0
    (nil)
    

    没有消息,Redis 会返回 NULL

    图片

    以上就是 Stream 最简单的生产、消费
    这里不再重点介绍 Stream 命令的各种参数,我在例子中演示时,凡是大写的单词都是固定参数,凡是小写的单词,都是可以自己定义的,例如队列名、消息长度等等,下面的例子规则也是一样,为了方便你理解,这里有必要提醒一下。

    下面我们来看,针对前面提到的消息队列要求,Stream 都是如何解决的?

    1.3.2 stream阻塞拉取

    Stream 是否支持阻塞式拉取消息?
    可以的,在读取消息时,只需要增加 BLOCK 参数即可

    // BLOCK 0 表示阻塞等待,不设置超时时间
    127.0.0.1:6379> XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0
    

    这时,消费者就会阻塞等待,直到生产者发布新的消息才会返回。

    1.3.3 Stream支持发布 / 订阅模式

    也没问题,Stream 通过以下命令完成发布订阅:

    • XGROUP:创建消费者组
    • XREADGROUP:在指定消费组下,开启消费者拉取消息

    下面我们来看具体如何做?
    首先,生产者依旧发布 2 条消息:

    127.0.0.1:6379> XADD queue * name zhangsan
    "1618470740565-0"
    127.0.0.1:6379> XADD queue * name lisi
    "1618470743793-0"
    

    之后,我们想要开启 2 组消费者处理同一批数据,就需要创建 2 个消费者组:

    // 创建消费者组1,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group1 0-0
    OK
    // 创建消费者组2,0-0表示从头拉取消息
    127.0.0.1:6379> XGROUP CREATE queue group2 0-0
    OK
    

    消费者组创建好之后,我们可以给每个消费者组下面挂一个消费者,让它们分别处理同一批数据。

    第一个消费组开始消费:

    // group1的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group1 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    同样地,第二个消费组开始消费:

    // group2的consumer开始消费,>表示拉取最新数据
    127.0.0.1:6379> XREADGROUP GROUP group2 consumer COUNT 5 STREAMS queue >
    1) 1) "queue"
       2) 1) 1) "1618470740565-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618470743793-0"
             2) 1) "name"
                2) "lisi"
    

    我们可以看到,这 2 组消费者,都可以获取同一批数据进行处理了。

    这样一来,就达到了多组消费者「订阅」消费的目的。

    在这里插入图片描述

    1.3.4 stream不丢消息

    消息处理时异常,Stream 能否保证消息不丢失,重新消费?
    除了上面拉取消息时用到了消息 ID,这里为了保证重新消费,也要用到这个消息 ID。
    当一组消费者处理完消息后,需要执行 XACK 命令告知 Redis,这时 Redis 就会把这条消息标记为处理完成

    // group1下的 1618472043089-0 消息已处理完成
    127.0.0.1:6379> XACK queue group1 1618472043089-0
    

    图片

    如果消费者异常宕机,肯定不会发送 XACK,那么 Redis 就会依旧保留这条消息。
    待这组消费者重新上线后,Redis 就会把之前没有处理成功的数据,重新发给这个消费者。这样一来,即使消费者异常,也不会丢失数据了。

    // 消费者重新上线,0-0表示重新拉取未ACK的消息
    127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS queue 0-0
    // 之前没消费成功的数据,依旧可以重新消费
    1) 1) "queue"
       2) 1) 1) "1618472043089-0"
             2) 1) "name"
                2) "zhangsan"
          2) 1) "1618472045158-0"
             2) 1) "name"
                2) "lisi"
    

    1.3.5 stream持久化处理

    Stream 是新增加的数据类型,它与其它数据类型一样,每个写操作,也都会写入到 RDBAOF
    我们只需要配置好持久化策略,这样的话,就算 Redis 宕机重启,Stream 中的数据也可以从 RDBAOF 中恢复回来。

    1.3.6 stream消息堆积

    消息堆积时,Stream 是怎么处理的?
    其实,当消息队列发生消息堆积时,一般只有 2 个解决方案:

    • 生产者限流:避免消费者处理不及时,导致持续积压
    • 丢弃消息:中间件丢弃旧消息,只保留固定长度的新消息

    Redis 在实现 Stream 时,采用了第 2 个方案。

    在发布消息时,你可以指定队列的最大长度,防止队列积压导致内存爆炸。

    // 队列长度最大10000
    127.0.0.1:6379> XADD queue MAXLEN 10000 * name zhangsan
    "1618473015018-0"
    

    当队列长度超过上限后,旧消息会被删除,只保留固定长度的新消息。
    这么来看,Stream 在消息积压时,如果指定了最大长度,还是有可能丢失消息的。
    除了以上介绍到的命令,Stream 还支持查看消息长度XLEN、查看消费者状态XINFO等命令,使用也比较简单,你可以查询官方文档了解一下,这里就不过多介绍了。

    好了,通过以上介绍,我们可以看到,RedisStream 几乎覆盖到了消息队列的各种场景,是不是觉得很完美?
    既然它的功能这么强大,这是不是意味着,Redis 真的可以作为专业的消息队列中间件来使用呢?
    但是还差一点,就算 Redis 能做到以上这些,也只是趋近于专业的消息队列。
    原因在于 Redis 本身的一些问题,如果把其定位成消息队列,还是有些欠缺的。
    到这里,就不得不把 Redis 与专业的队列中间件做对比了。

    1.4 与专业消息对比

    与专业的消息队列对比
    其实,一个专业的消息队列,必须要做到两大块:

    • 消息不丢
    • 消息可堆积

    前面我们讨论的重点,很大篇幅围绕的是第一点展开的。
    这里我们换个角度,从一个消息队列的「使用模型」来分析一下,怎么做,才能保证数据不丢?

    使用一个消息队列,其实就分为三大块:生产者、队列中间件、消费者
    在这里插入图片描述

    消息是否会发生丢失,其重点也就在于以下 3 个环节:

    • 生产者会不会丢消息?
    • 消费者会不会丢消息?
    • 队列中间件会不会丢消息?

    1.4.1 生产者会不会丢消息

    当生产者在发布消息时,可能发生以下异常情况:

    • 消息没发出去:网络故障或其它问题导致发布失败,中间件直接返回失败
    • 不确定是否发布成功:网络问题导致发布超时,可能数据已发送成功,但读取响应结果超时了

    如果是情况 1,消息根本没发出去,那么重新发一次就好了。
    如果是情况 2,生产者没办法知道消息到底有没有发成功?所以,为了避免消息丢失,它也只能继续重试,直到发布成功为止。
    生产者一般会设定一个最大重试次数,超过上限依旧失败,需要记录日志报警处理。
    也就是说,生产者为了避免消息丢失,只能采用失败重试的方式来处理。
    但发现没有?这也意味着消息可能会重复发送。
    是的,在使用消息队列时,要保证消息不丢,宁可重发,也不能丢弃。
    那消费者这边,就需要多做一些逻辑了。
    对于敏感业务,当消费者收到重复数据数据时,要设计幂等逻辑,保证业务的正确性。
    从这个角度来看,生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。
    所以,无论是 Redis 还是专业的队列中间件,生产者在这一点上都是可以保证消息不丢的。

    1.4.2 消费者会不会丢消息

    这种情况就是我们前面提到的,消费者拿到消息后,还没处理完成,就异常宕机了,那消费者还能否重新消费失败的消息?
    要解决这个问题,消费者在处理完消息后,必须「告知」队列中间件,队列中间件才会把标记已处理,否则仍旧把这些数据发给消费者。
    这种方案需要消费者和中间件互相配合,才能保证消费者这一侧的消息不丢。
    无论是 RedisStream,还是专业的队列中间件,例如 RabbitMQ、Kafka,其实都是这么做的。
    所以,从这个角度来看,Redis 也是合格的。

    1.4.3 队列中间件会不会丢消息

    前面 2 个问题都比较好处理,只要客户端和服务端配合好,就能保证生产端、消费端都不丢消息。
    但是,如果队列中间件本身就不可靠呢?

    毕竟生产者和消费这都依赖它,如果它不可靠,那么生产者和消费者无论怎么做,都无法保证数据不丢。
    在这个方面,Redis 其实没有达到要求。
    Redis 在以下 2 个场景下,都会导致数据丢失。
    AOF 持久化配置为每秒写盘,但这个写盘过程是异步的,Redis 宕机时会存在数据丢失的可能
    主从复制也是异步的,主从切换时,也存在丢失数据的可能(从库还未同步完成主库发来的数据,就被提成主库)
    基于以上原因我们可以看到,Redis 本身的无法保证严格的数据完整性。
    所以,如果把 Redis 当做消息队列,在这方面是有可能导致数据丢失的。

    再来看那些专业的消息队列中间件是如何解决这个问题的?
    RabbitMQKafka 这类专业的队列中间件,在使用时,一般是部署一个集群,生产者在发布消息时,队列中间件通常会写多个节点,以此保证消息的完整性。这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
    也正因为如此,RabbitMQ、Kafka在设计时也更复杂。毕竟,它们是专门针对队列场景设计的。
    Redis 的定位则不同,它的定位更多是当作缓存来用,它们两者在这个方面肯定是存在差异的。

    1.4.4 消息积压怎么办

    因为 Redis 的数据都存储在内存中,这就意味着一旦发生消息积压,则会导致 Redis 的内存持续增长,如果超过机器内存上限,就会面临被 OOM 的风险。
    所以,RedisStream 提供了可以指定队列最大长度的功能,就是为了避免这种情况发生。
    Kafka、RabbitMQ 这类消息队列就不一样了,它们的数据都会存储在磁盘上,磁盘的成本要比内存小得多,当消息积压时,无非就是多占用一些磁盘空间,相比于内存,在面对积压时也会更加坦然

    综上,我们可以看到,把 Redis 当作队列来使用时,始终面临的 2 个问题:

    • Redis 本身可能会丢数据
    • 面对消息积压,Redis 内存资源紧张

    到这里,Redis 是否可以用作队列,我想这个答案你应该会比较清晰了。
    如果你的业务场景足够简单,对于数据丢失不敏感,而且消息积压概率比较小的情况下,把 Redis 当作队列是完全可以的。
    而且,Redis 相比于 Kafka、RabbitMQ,部署和运维也更加轻量。
    如果你的业务场景对于数据丢失非常敏感,而且写入量非常大,消息积压时会占用很多的机器资源,那么我建议你使用专业的消息队列中间件。

    在这里插入图片描述

    展开全文
  • 主要介绍了SpringBoot利用redis集成消息队列的方法,需要的朋友可以参考下
  • 本篇文章主要介绍了Java利用Redis实现消息队列的示例代码,小编觉得挺不错的,现在分享给大家,也给大家个参考。一起跟随小编过来看看吧
  • 主要介绍了基于python操作redis消息队列,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 主要用于探究使用Redis的使用场景,以及Redis作为消息队列,与专业的MQ的区别,各自的使用场景。

    1.应用场景

    主要用于探究使用Redis的使用场景,以及Redis作为消息队列,与专业的MQ的区别,各自的使用场景

    2.学习/操作

    1.文档阅读

    把Redis当作队列来用,真的合适吗?

    15 | 消息队列的考验:Redis有哪些解决方案?-极客时间

    2.整理输出

    临时插入:

    其实,关于 Redis 是否适合做消息队列,业界一直是有争论的。

    很多人认为,要使用消息队列,就应该采用 Kafka、RabbitMQ 这些专门面向消息队列场景的软件,而 Redis 更加适合做缓存。

    根据这些年做 Redis 研发工作的经验,我的看法是:Redis 是一个非常轻量级的键值数据库,部署一个 Redis 实例就是启动一个进程,部署 Redis 集群,也就是部署多个 Redis 实例。

    而 Kafka、RabbitMQ 部署时,涉及额外的组件,例如 Kafka 的运行就需要再部署 ZooKeeper。

    相比 Redis 来说,Kafka 和 RabbitMQ 一般被认为是重量级的消息队列。

    所以,关于是否用 Redis 做消息队列的问题,不能一概而论,我们需要考虑业务层面的数据体量,以及对性能、可靠性、可扩展性的需求。如果分布式系统中的组件消息通信量不大,那么,Redis 只需要使用有限的内存空间就能满足消息存储的需求,而且,Redis 的高性能特性能支持快速的消息读写,不失为消息队列的一个好的解决方案。

    临时截图

     

    后续补充

    ...

    3.问题/补充

    TBD

    4.参考

    把Redis当作队列来用,真的合适吗?

    15 | 消息队列的考验:Redis有哪些解决方案?-极客时间

    后续补充

    ...

    展开全文
  • redis做消息队列

    2020-11-25 16:52:03
    redis做消息队列,实现了kafka队列的调度(顺序消费,避免高并发内存溢出),大家参考,提高开发速度
  • 本文实例讲述了php+redis实现消息队列功能。...文件:demo.php插入数据到redis队列 <?php $redis = new Redis(); $redis->connect('127.0.0.1',6379); $password = '123456'; $redis->auth($passwor
  • 2. DelayBucket是一组以时间维度的有序队列,用来存放所有需要延迟的Job(这里只存放Job Id)。 3. Timer负责实时扫描各个Bucket,并将delay时间大于等于当前时间的Job放入到对应的Ready Queue。 4. ReadyQueue...
  • 本文实例大家分享了php+redis消息队列实现抢购的具体代码,供大家参考,具体内容如下 实现功能: 1. 基于redis队列,防止高并发的超卖 2. 基于mysql的事务加排它锁,防止高并发的超卖 基于redis队列工作流程:...
  • Redis实现消息队列

    千次阅读 2022-05-04 09:38:54
    Redis中提供了三种实现消息队列的方式: List结构:基于List结构来模拟消息队列 PubSub:基本的点对点消息模型 Stream:较完善的消息队列模型 1. List实现消息队列 Redis的List数据结构类型是一个双向链表,而...

    在Redis中提供了三种实现消息队列的方式:

    1. List结构:基于List结构来模拟消息队列
    2. PubSub:基本的点对点消息模型
    3. Stream:较完善的消息队列模型

    1. List实现消息队列

    Redis的List数据结构类型是一个双向链表,而队列要求进,出口不能在同一个位置,所以可以利用List的添加取出命令来实现模拟消息队列。

    1. LPUSH,RPOP
    2. RPUSH,LPOP

    但是java在消费消息的时候,如果没有消息了,消费者应该是阻塞等待,等到有消息投递了,再继续消费信息,而上述命令不是阻塞式的,如果没有消息了还在获取的话会获取到Null。所以应该实现阻塞的效果用下列命令

    1. BRPOP
    2. BLPOP

    上述两个命令的取出效果是阻塞式的。

    List实现消息队列的缺点:

    1. 无法避免消息丢失:例如消费者拿到消息还没有消费就宕机了
    2. 只能支持单个消费

    2. 基于PubSub的消息队列

    PubSub(发布订阅)是Redis2.0版本引入的消息传递模型。顾名思义,消费者可以订阅一个或多个channel,生产者向对应channel发送消息后,所有订阅者都能收到相关消息。

    1. SUBSCRIBE channel [channel] :订阅一个或多个频道
    2. PUBLISH channel msg :向一个频道发送消息
    3. PSUBSCRIBE pattern[pattern] :订阅与pattern格式匹配的所有频道

    这里的PSUBSCRIBE与RabbitMQ的匹配相似。

    基于PubSub的消息队列的缺点:

    1. List支持数据持久化,但是PubSub不支持数据持久化

    3. 基于Stream的消息队列

    Stream是Redis5.0引入的新的数据类型,可以实现一个功能较为完善的消息队列

    添加命令
    在这里插入图片描述

    例如

    XADD users * name jack age 21
    

    users是队列,*表示消息id ,后面的部分表示消息体

    消费命令

    在这里插入图片描述
    当ID为$时代表读取最新的消息。

    例如

    XREAD COUNT 1 STREAMS users 0
    

    COUNT 1 代表每次只读取一条,STREAMS users 表示从users这个队列里读取

    注意:Stream的消息队列消费消息后是不会剔除该消息的

    缺点:当指定ID为$,代表读取最新的消息,如果在处理一条新消息的时候,突然来了5条消息,当再次读取最新消息时,只能读取到5条消息的最后一条,造成消息漏读的现象

    Stream消息队列的优点:

    1. 消息可回溯(消费后不会被剔除)
    2. 消息可以被多个消费者读取
    3. 可以阻塞读取

    3.1 消费者组

    消费者组(Consumer Group):将多个消费者划分到一个组中,监听同一个队列。具备下列特点:

    1. 消息分流:队列中的消息会分流给组内的不同消费者,而不是重复消费,从而加快消息处理的速度,同一个消费者组里的消费者之间处于一种竞争的关系,消息是不会出现消费重复的,同时一定程度上也可以避免消息漏读的现象
    2. 消息标识:消费者组会维护一个标示,记录最后一个被处理的消息,哪怕消费者宕机重启,还会从标示之后读取消息。确保每一个消息都会被消费
    3. 消息确认:消费者获取消息后,消息处于pending状态,并存入一个pending-list。当处理完成后需要通过XACK来确认消息,标记消息为已处理,才会从pending-list移

    如何创建消费者组?

    XGROUP CREATE key groupName ID [MKSTREAM]
    
    1. key:队列名称
    2. groupName:消费者组名称
    3. ID:起始ID标识,$代表队列中最后一个消息,0代表队列中第一个消息
    4. MKSTREAM:队列不存在时自动创建

    在这里插入图片描述

    如何从消费者组读取消息?

    XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key..] ID [ID..]
    
    1. group:消费者组名称
    2. consumer:消费者名称,如果消费者不存在,会自动创建一个消费者
    3. count:本次查询最大数量
    4. BLOCK milliseconds:是否阻塞?阻塞的时间
    5. NOACK:消费消息后不响应
    6. STREAMS key:指定队列名称
    7. ID:获取消息的起始ID >表示从下一个未消费的消息开始 。其它:根据指定id从pending-list中获取已消费但未确认的消息,例如0,是从pending-list中的第一个消息开始

    那么消费者消费完消息后如何确认消息呢?

    XACK key group ID [ID..]
    
    1. key:队列名称
    2. group:消费者组名称
    3. ID:消息的ID

    java手动模拟消费者监听消息的代码

    while(true){
                Object message = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
                if (message == null){
                    continue;
                }
                try{
                    // 处理消息的逻辑 处理完毕后要ACK
                    handleMessage(message);
                }catch (Exception e){
                    while (true){
                        // 从等待响应的队列里拿消息
                        Object unAckMessage = redis.call("XREADGROUP GROUP 你的消费组名称 消费者名称 COUNT 每次读取消息的数量 BLOCK 阻塞时间毫秒 STREAMS 队列名称 >");
                        if (unAckMessage == null){
                            continue;
                        }
                        try {
                            handleMessage(unAckMessage);
                        }catch (Exception e1){
                            continue;
                        }
                    }
                }
            }
    
    展开全文
  • 数据库存贮都用list形式 要存2个队列 1个用作消息队列保存到数据 还有个 就是用来实时读取数据在redis $redis->lpush($queenkey, json_encode($array)); $redis->lpush($listkey, json_encode($array)); /*消息...
  • 为什么学习Redis作为消息队列服务器

    千次阅读 2015-01-06 17:27:04
    使用Redis作为消息队列服务场景  “ 消息 ”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中,“ 消息队列 ”是在消息的传输...
  • 本篇文章探讨了一下如何使用redis实现消息队列。使用redis无需额外的部署,如果原先就有使用redis的话。此外redis更为轻量也更容易维护。但是redis实现消息队列有多种方案,这些方案有其优点也有其缺点,适用于不同...
  • Redis消息队列

    千次阅读 2021-09-02 17:04:29
    Redis 一、Redis数据类型、Redis数据结构、Redis使用场景 Redis数据类型 键的类型只能是字符串 值支持5种数据类型: 字符串String,可以存储字符串、整数、浮点数 列表list 集合set 散列表hash,包含键值...
  • Redis简单消息队列Node.js的轻量级消息队列,不需要专用的队列服务器。 只是一个Redis服务器。 tl; dr:如果您运行Redis服务器并且当前使用Amazon SQS或类似的消息队列,则最好使用这种快速的替代方法。 使用共享的...
  • 博文a 中的老师,提供了Redis 实现消息队列的整体思路,言简意赅,但部分类库a 老师并未提供,因此我参照了博文b 中老师的RedisHelper 类,主要借鉴的方法ListLeftPop及ListRightPush,及实现消息队列的核心思想,...
  • Qt 使用 Redis实现 消息队列,点对点 生产者-消费者 模式
  • Redis异步消息队列

    千次阅读 2022-03-27 15:57:15
    Redis 的 list(列表) 数据结构常用来作为异步消息队列使用,使用rpush和lpush操作入队列,使用lpop 和 rpop来出队列。客户端是通过队列的 pop 操作来获取消息,然后进行处理。处理完了再接着获取消息,再进行处理。...
  • redis消息队列

    2018-09-05 15:13:41
    redis 消息队列源码示例 redis 消息队列源码示例 redis 消息队列源码示例
  • 使用redis做任务队列分发子任务,用于分布式拆分子任务提高系统运行效率
  • 啥造轮子?redis消息队列
  • Redis 实现消息队列

    千次阅读 2022-01-24 15:47:49
    面试的时候你提到了,Redis 和 MQ,面试官可能会让你用 Redis 实现消息队列,一方面考察你 Redis 的掌握,又考察了你对 MQ 的理解。可谓 一箭双雕。 消息队列 消息队列在分布式系统中用途非常广泛。 它具有 低耦合、...
  • 队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。 只有一个消费者将获得消息 生产者不需要在接收者...
  • Java redis实现消息队列

    千次阅读 2021-07-01 11:16:00
    文章目录一、单元测试Java多线程二、redis实现消息队列三、java多线程模拟生产者消费者 一、单元测试Java多线程 使用junit测试多线程代码,但是等到程序结束,输出结果不完整,或者是完全没结果,因此,可能是其他...
  • 原生的redis中通过L/R PUSH/POP方式来实现队列的功能,这个当然是没办法满足需求的(没有ack功能),所以需要自己对redis的list(队列个小小的调整。 大体思路在POP时将pop出的数据放到备份的地方,当有ACK...
  • http://127.0.0.1:8000/getList queueName">一个NodeJS和redis做的基于http协议使用的队列 了点小修改 支持多个队列和post提交 原github地址:https://github.com/lnmp/nodemq 使用方法: 在安装好redis和nodejs...
  • 最近博主在看redis的时候发现了两种redis使用方式,与之前redis作为缓存不同,利用的是redis可设置key的有效时间和redis的BRPOP命令。 分布式锁 由于目前一些编程语言,如PHP等,不能在内存中使用锁,或者如Java这样...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 171,071
精华内容 68,428
关键字:

为什么不用redis做消息队列