精华内容
下载资源
问答
  • 本文主要介绍了两种java实现消息队列的方式,利用Spring消息模板发送消息和Apache ActiveMQ官方实例发送消息,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 主要介绍了SpringBoot利用redis集成消息队列的方法,需要的朋友可以参考下
  • 本篇文章主要介绍了Java利用Redis实现消息队列的示例代码,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧
  • 使用QQueue, QThread,QMutex,QWaitCondition模拟消息队列的任务处理,实现任务的同步处理
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高...

    消息队列

    “消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

    为什么要用消息队列

    (1) 通过异步处理提高系统性能(削峰、减少响应所需时间)
    在这里插入图片描述
    如上图,在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。

    (2) 降低系统耦合性
    我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
    我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示:
    在这里插入图片描述
    消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

    消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

    另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

    备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。

    RocketMq

    RocketMq是一个由阿里巴巴开源的消息中间件,脱胎去阿里每部使用的MetaQ,在设计上借鉴了Kafka。2012年开源,2017年成为apache顶级项目

    RocketMQ 是什么?
    在这里插入图片描述
    上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:
    是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
    Producer、Consumer、队列都可以分布式。
    Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
    能够保证严格的消息顺序
    提供丰富的消息拉取模式
    高效的订阅者水平扩展能力
    实时的消息订阅机制
    亿级消息堆积能力
    较少的依赖

    RocketMQ 物理部署结构
    在这里插入图片描述
    如上图所示, RocketMQ的部署结构有以下特点:
    Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
    Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
    Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
    Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    RocketMQ 逻辑部署结构
    在这里插入图片描述

    如上图所示,RocketMQ的逻辑部署结构有Producer和Consumer两个特点。
    Producer Group
    用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:1.标识一类Producer
    2.可以通过运维工具查询这个发送消息应用下有多个Producer实例
    3.发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
    Consumer Group
    用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。

    RocketMQ 数据存储结构
    在这里插入图片描述
    如上图所示,RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

    JMS和AMQP的区别

    JMS
    JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

    JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

    ActiveMQ 就是基于 JMS 规范实现的。

    JMS两种消息模型

    ①点到点(P2P)模型
    在这里插入图片描述
    使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

    ② 发布/订阅(Pub/Sub)模型
    在这里插入图片描述
    发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。

    JMS 五种不同的消息正文格式
    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    • StreamMessage – Java原始值的数据流
    • MapMessage–一套名称-值对
    • TextMessage–一个字符串对象
    • ObjectMessage–一个序列化的 Java对象
    • BytesMessage–一个字节的数据流

    AMQP
    ​ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

    RabbitMQ 就是基于 AMQP 协议实现的。
    在这里插入图片描述
    总结:

    • AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在 Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
    • JMS 支持TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
    • 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

    常见的消息队列对比

    在这里插入图片描述
    总结:

    • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
    • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
    • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
    • kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

    消息队列应用场景

    1、异步处理
    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
    a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
    在这里插入图片描述
    b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
    在这里插入图片描述
    假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
    因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
    小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
    在这里插入图片描述
    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

    2、应用解耦
    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
    在这里插入图片描述
    传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

    如何解决以上问题呢?引入应用消息队列后的方案,如下图:
    在这里插入图片描述
    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
    库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    3、流量削锋
    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
    a、可以控制活动的人数
    b、可以缓解短时间内高流量压垮应用
    在这里插入图片描述
    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
    秒杀业务根据消息队列中的请求信息,再做后续处理

    2.4日志处理
    日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
    在这里插入图片描述
    日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
    Kafka消息队列,负责日志数据的接收,存储和转发
    日志处理应用:订阅并消费kafka队列中的日志数据

    5、消息通讯
    消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
    点对点通讯:
    在这里插入图片描述
    客户端A和客户端B使用同一队列,进行消息通讯。

    聊天室通讯:
    在这里插入图片描述
    客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
    以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

    消息中间件示例
    电商系统
    在这里插入图片描述
    消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
    (1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
    (2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
    (3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

    日志收集系统
    在这里插入图片描述
    分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
    Zookeeper注册中心,提出负载均衡和地址查找服务
    日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
    Kafka集群:接收,路由,存储,转发等消息处理
    Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

    参考文章:
    https://www.jianshu.com/p/36a7775b04ec
    https://blog.csdn.net/HD243608836/article/details/80217591

    展开全文
  • 作为中间件,消息队列是分布式应用间交换信息的重要组件。消息队列可驻留在内存或磁盘上, 队列可以存储消息直到它们被应用程序读走。通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理...
  • 采用客户-服务器结构,其中服务器实现各个用户的登录并存储相关信息,客户端通过服务器端获取当前登录用户信息,然后各客户进程通过消息队列实现双向通信。 Linux IPC通信利用消息队列消息机制,多线程通信,字符串...
  • C++封装实现的异步加锁消息队列,支持多线程,完美封装,可用于消息接收、处理
  • 线程间同步和通信之消息队列(动态),通过按键LED的组合学习消息队列的使用
  • 用Redis做的一个简单的消息队列
  • NULL 博文链接:https://j2ee2009.iteye.com/blog/689161
  • Linux 环境下利用消息队列消息机制,多线程通信,字符串处理,链表操作,信号简单处理等知识用C语言编写多人聊天室实现: 服务器实现各用户之间聊天的消息转发,在用户注册或者登录时对各用户进行消息提醒,客户端从...
  • 各种消息队列对比

    2015-10-04 21:32:12
    消息队列中间件调研文档。ActiveMQ、RabbitMQ、RocketMq、Joram、HornetQ、OpenMQ等的对比。
  • 在这篇文章中,我将介绍一个新的、独立的、开源的,完全基于C#和.NET Framework3.5的消息队列系统,DotNetMQ是一个消息代理,它包括确保传输,路由,负载均衡,服务器图等等多项功能。我将从解释消息的概念和消息...
  • MQ消息队列实例

    2017-05-05 16:49:11
    Java向MQ发送消息三种方式
  • redis 案例。包含, 队列操作, socket通信, 以及 socket 和 redis 配合 redis 案例。包含, 队列操作, socket通信, 以及 socket 和 redis 配合
  • C++多线程,消息队列用法,为了凑够20个字,拼了。
  • UCOSII消息队列实例,平台为MDK4.7+STM32+UCOSII
  • 什么是消息队列你了解过么?

    你知道的越多,你不知道的越多

    点赞再看,养成习惯

    GitHub上已经开源 https://github.com/JavaFamily 有一线大厂面试点脑图、个人联系方式,欢迎Star和完善

    面试开始

    一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向你走来,看着铮亮的头,心想着肯定是尼玛顶级架构师吧!但是我们看过暖男敖丙的系列,腹有诗书气自华,虚都不虚。

    小伙子之前问了你这么多Redis的知识,你不仅对答如流,你还能把各自场景的解决方案,优缺点说得这么流畅,说你是不是看过敖丙写的《吊打面试官》系列呀?

    惊!!!老师你怎么知道的,我看了他的系列根本停不下来啊。

    呵呵,Redis没难住你,但是我问个新的技术栈我还怕难不住你?我问问你你项目中用过消息队列么?你为啥用消息队列?

    噗此,这也叫问题?别人用了我能不用么?别人用了我就用了呗,我就是为了用而用。

    你心里嘀咕就好了,千万别说出来哈,说出来了没拿到Offer别到时候就在那说,敖丙那个渣男教我说的!

    面试官你好:我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件

    哦?你说到业务场景越来越复杂,你那说一下你都在什么场景用到了消息队列?

    嗯,我从三个方面去说一下我使用的场景吧。

    Tip:这三个场景也是消息队列的经典场景,大家基本上要烂熟于心那种,就是一说到消息队列你脑子就要想到异步、削峰、解耦,条件反射那种。

    异步:

    我们之前的场景里面有很多步骤都是在一个流程里面需要做完的,就比如说我的下单系统吧,本来我们业务简单,下单了付了钱就好了,流程就走完了。

    但是后面来了个产品经理,搞了个优惠券系统,OK问题不大,流程里面多100ms去扣减优惠券。

    后来产品经理灵光一闪说我们可以搞个积分系统啊,也行吧,流程里面多了200ms去增减积分。

    再后来后来隔壁的产品老王说:下单成功后我们要给用户发短信,也将就吧,100ms去发个短信。

    再后来。。。(敖丙你有完没完!!!)

    反正就流程有点像这样 ↓

    你们可以看到这才加了三个,我可以斩钉截铁的告诉你真正的下单流程涉及的系统绝对在10个以上(主流电商),越大的越多。

    这个链路这样下去,时间长得一批,用户发现我买个东西你特么要花几十秒,垃圾电商我不在你这里买了,不过要是都像并夕夕这么便宜,真香

    但是我们公司没有夕夕的那个经济实力啊,那只能优化系统了。

    Tip:我之前在的电商老东家要求所有接口的RtResponseTime响应时间)在200ms内,超出的全部优化,我现在所负责的系统QPS也是9W+就是抖动一下网络集群都可能炸锅那种,RT基本上都要求在50ms以内。

    大家感受一下这个QPS。

    嗯不错,链路长了就慢了,那你怎么解决的?

    那链路长了就慢了,但是我们发现上面的流程其实可以同时做的呀,你支付成功后,我去校验优惠券的同时我可以去增减积分啊,还可以同时发个短信啊。

    那正常的流程我们是没办法实现的呀,怎么办,异步

    你对比一下是不是发现,这样子最多只用100毫秒用户知道下单成功了,至于短信你迟几秒发给他他根本不在意是吧。

    小伙子我打断你一下,你说了异步,那我用线程,线程池去做不是一样的么?

    诶呀,面试官你不要急嘛,我后面还会说到的,骚等。

    解耦:

    既然面试官这么问了,我就说一下为啥我们不能用线程去做,因为用线程去做,你是不是要写代码?

    你一个订单流程,你扣积分,扣优惠券,发短信,扣库存。。。等等这么多业务要调用这么多的接口,每次加一个你要调用一个接口然后还要重新发布系统,写一次两次还好,写多了你就说:老子不干了!

    而且真的全部都写在一起的话,不单单是耦合这一个问题,你出问题排查也麻烦,流程里面随便一个地方出问题搞不好会影响到其他的点,小伙伴说我每个流程都try catch不就行了,相信我别这么做,这样的代码就像个定时炸弹💣,你不知道什么时候爆炸,平时不炸偏偏在你做活动的时候炸,你就领个P0故障收拾书包提前回家过年吧。

    Tip:P0—PN 是互联网大厂经常用来判定事故等级的机制,P0是最高等级了。

    但是你用了消息队列,耦合这个问题就迎刃而解了呀。

    哦,帅丙怎么说?

    且听我娓娓道来:

    你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了

    那你的流程走完了,你不用管别人是否成功么?比如你下单了积分没加,优惠券没扣怎么办?

    问题是个好问题,但是没必要考虑,业务系统本身就是自己的开发人员维护的,你积分扣失败关我下单的什么事情?你管好自己下单系统的就好了。

    Tip:话是这么说,但是这其实是用了消息队列的一个缺点,涉及到分布式事务的知识点,我下面会提到。

    削峰:

    就拿我上一期写的秒杀来说(暗示新同学看我上一期),你平时流量很低,但是你要做秒杀活动00 :00的时候流量疯狂怼进来,你的服务器,RedisMySQL各自的承受能力都不一样,你直接全部流量照单全收肯定有问题啊,直接就打挂了。

    那怎么办?

    简单,把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器处理能力,你能处理5000QPS你就消费这么多,可能会比正常的慢一点,但是不至于打挂服务器,等流量高峰下去了,你的服务也就没压力了。

    你看阿里双十一12:00的时候这么多流量瞬间涌进去,他有时候是不是会慢一点,但是人家没挂啊,或者降级给你个友好的提示页面,等高峰过去了又是一条好汉了。

    为了这个图特意打高一台服务的流量
    为了这个图特意打高一台服务的流量

    听你说了辣么多,怎么都是好处,那我问你使用了消息队列有啥问题么?

    诶,看过前面我写的文章的人才都知道,我经常说的就是,技术是把双刃剑

    没错面试官,我使用他是因为他带给我们很多好处,但是使用之后问题也是接踵而至

    同样的暖男我呀,也从三个点介绍他主要的缺点:

    系统复杂性

    本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费消息丢失消息的顺序消费等等,反正用了之后就是贼烦。

    我插一句嘴,上面的问题(重复消费、消息丢失、顺序消费)你能分别介绍一下,并且说一下分别是怎么解决的么?

    不要!我都说了敖丙下一章写啥?

    其实不是暖男我不想在这里写,这三个问题我想了下,统统都是MQ重点问题,单独拿一个出来就是一篇文章了,篇幅实在太长了,我会在下一章挨个介绍一遍的。

    数据一致性

    这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。

    就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?

    我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法,这样就像个渣男,没有格局这样呀你的路会越走越窄的

    所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?

    分布式事务:把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

    Tip:分布式事务在互联网公司里面实在常见,我也不在这里大篇幅介绍了,后面都会专门说的。

    可用性

    你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是杀一个程序员能搞定的吧,感觉得杀一片。

    至于怎么保证高可用,还是那句话也不在这里展开讨论了,我后面一样会写,像写Redis那样写出来的。

    放心敖丙我不是渣男来的,我肯定会对你们负责的。点赞!

    看不出来啊,你有点东西呀,那我问一下你,你们是怎么做技术选型的?

    目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。

    不过敖丙我想说的是,ActiveMQRabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。

    KafkaRocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源

    就像我们公司就是是基于KafkaRocketMQ两者的优点自研的消息队列中间件,吞吐量、可靠性、时效性等都很可观。

    我们回归正题,我这里用网上找的对比图让大家看看差距到底在哪里:

    大家其实一下子就能看到差距了,就拿吞吐量来说,早期比较活跃的ActiveMQRabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要

    比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?

    再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继

    部署方式而言前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

    我们再聊一下RabbitMQ这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。

    至于RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。

    Tip:Dubbo等我写到RPC我会详细介绍的。

    Kafka我放到最后说,你们也应该知道了,压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

    以上这些都只是一些我自己的个人意见,真正的选型还是要去深入研究的,不然那你公司一天UV就1000你告诉我你要去用Kafka我只能说你吃饱撑的。

    记住,没有最好的技术,只有最适合的技术,不要为了用而用

    面试结束

    嗯,小伙子不错不错,分析得很到位,那你记得下期来说一下消息队列的高可用,重复消费、消息丢失、消息顺序、分布式事务等问题?

    嗯嗯好的面试官,不过不确定能不能一口气说完,毕竟敖丙还没开始写,而且读者还有可能白嫖,动力不一定够。

    嗯嗯这倒是个问题,不过啊在看的都是人才肯定会给你点赞👍的!

    我也这么认为。

    总结

    消息队列的基础知识我就先介绍这么多,消息队列在面试里面基本上也是跟我前面写的Redis一样必问的。

    面试的思路还是一样,要知其然,也要知其所以然,就是要知道为啥用,用了有啥好处,有啥坑。

    面试官不喜欢只知道用的,你只会用那哪天线上出问题怎么办?你难道在旁边拜佛?

    Tip:本来有很多我准备的资料的,但是都是外链,或者不合适的分享方式,博客的运营小姐姐提醒了我,所以大家去公众号回复【资料】好了。

    鸣谢

    之前的文章写了很多人加我,然后有个人才说是他蚂蚁金服的Leader推荐的我,我突然意识到我文章的受众好像慢慢变广了,之后不严谨的点要杜绝掉。

    所以之后我的文章经常会有大厂的小伙伴Review,也希望帮助我更好的监督自己的文章吧。

    这次是 某阿里系电商跟我一起做过活动小组的 佩恩 帮我Review的文章,感谢!

    絮叨

    另外,敖丙把自己的面试文章整理成了一本电子书,共 1630页!目录如下

    现在免费送给大家,在我的公众号三太子敖丙回复 【888】 即可获取。

    我是敖丙,一个在互联网苟且偷生的程序员。

    你知道的越多,你不知道的越多人才们的 【三连】 就是丙丙创作的最大动力,我们下期见!

    注:如果本篇博客有任何错误和建议,欢迎人才们留言!


    文章持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub https://github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。

    展开全文
  • 搭载FreeRTOS系统,任务一向消息队列填充数字,任务二从消息队列提取数据并发送到串口1,同时有LED灯跟随数据传送亮灭。 这里我们的课程设计内容。 对于STM32和FreeRTOS初学者以及想了解RTOS的任务机制与消息队列的...
  • 一、队列简介 ...由于队列用来传递消息的,所以也称为消息队列。FreeRTOS中的信号量的也是依据队列实现的!所以有必要深入的了解FreeRTOS的队列。 1、数据存储 通常队列采用先进先出(FIFO)的存储缓冲机制,

    一、队列简介

    队列是为了任务与任务、任务与中断之间的通信而准备的,可以在任务与任务、任务与中断之间传递消息,队列中可以存储有限的、大小固定的数据项目。 任务与任务、任务与中断之间要交流的数据保存在队列中,叫做队列项目。队列所能保存的最大数据项目数量叫做队列的长度,创建队列的时候会指定数据项目的大小和队列的长度。由于队列用来传递消息的,所以也称为消息队列。FreeRTOS中的信号量的也是依据队列实现的!所以有必要深入的了解FreeRTOS的队列。

    1、数据存储

    通常队列采用先进先出(FIFO)的存储缓冲机制,也就是往队列发送数据的时候(也叫入队)永远都是发送到队列的尾部,而从队列提取数据的时候(也叫出队)是从队列的头部提取的。但是也可以使用LIFO的存储缓冲,也就是后进先出,FreeRTOS中的队列也提供了LIFO的存储缓冲机制。数据发送到队列中会导致数据拷贝,也就是将要发送的数据拷贝到队列中,这就意味着在队列中存储的是数据的原始值,而不是原数据的引用即只传递数据的指针),这个也叫做值传递。UCOS的消息队列采用的是引用传递,传递的是消息指针采用引用传递的话消息内容就必须一直保持可见性,也就是消息内容必须有效,那么局部变量这种可能会随时被删掉的东西就不能用来传递消息,但是采用引用传递会节省时间啊!因为不用进行数据拷贝。采用值传递的话虽然会导致数据拷贝,会浪费一点时间,但是一旦将消息发送到队列中原始的数据缓冲区就可以删除掉或者覆写,这样的话这些缓冲区就可以被重复的使用。(注意FreeRTOS也可以实现引用传递,只需要传递地址即可

    2、多任务访问

    队列不是属于某个特定的任务的,任何任务都可以向队列中发送消息,或者从队列中提取消息。

    3、出队阻塞

    当任务尝试从一个队列中读取消息的时候可以指定一个阻塞时间,这个阻塞时间就是当任务从队列中读取消息无效的时候任务阻塞的时间。出队就是就从队列中读取消息,出队阻塞是针对从队列中读取消息的任务而言的。比如任务A用于处理串口接收到的数据,串口接收到数
    据以后就会放到队列Q中,任务A从队列Q中读取数据。但是如果此时队列Q是空的,说明还没有数据,任务A这时候来读取的话肯定是获取不到任何东西,那该怎么办呢?任务A现在有三种选择,一:二话不说扭头就走,二:要不我在等等吧,等一会看看,说不定一会就有数据了,三:死等,死也要等到你有数据!选哪一个就是由这个阻塞时间决定的,这个阻塞时间单位是时钟节拍数。
    ①、阻塞时间为0的话就是不阻塞,没有数据的话就马上返回任务继续执行接下来的代码,对应第一种选择。
    ②、如果阻塞时间为0~portMAX_DELAY,当任务没有从队列中获取到消息的话就进入阻塞态,阻塞时间指定了任务进入阻塞态的时间,当阻塞时间到了以后还没有接收到数据的话就退出阻塞态,返回任务接着运行下面的代码。如果在阻塞时间内接收到了数据就立即返回,执行任务中下面的代码,这种情况对应第二种选择。
    ③、当阻塞时间设置为portMAX_DELAY的话,任务就会一直进入阻塞态等待,直到接收到数据为止!这个就是第三种选择。

    4、入队阻塞

    入队说的是向队列中发送消息,将消息加入到队列中。和出队阻塞一样,当一个任务向队列发送消息的话也可以设置阻塞时间。比如任务B向消息队列Q发送消息,但是此时队列Q是满的,那肯定是发送失败的。此时任务B就会遇到和上面任务A一样的问题,这两种情况的处理过程是类似的,只不过一个是向队列Q发送消息,一个是从队列Q读取消息而已。

    5、队列操作过程

    • 创建队列
      在这里插入图片描述
      任务A要向任务B发送消息,这个消息是x变量的值。首先创建一个队列,并且指定队列的长度和每条消息的长度。这里我们创建了一个长度为4的队列,因为要传递的是x值,而x是个int类型的变量,所以每条消息的长度就是int类型的长度,在STM32中就是4字节,即每条消息是4个字节的。
    • 向队列发送第一个消息
      在这里插入图片描述
      任务A的变量x值为10,将这个值发送到消息队列中。此时队列剩余长度就是3了。前面说了向队列中发送消息是采用拷贝的方式,所以一旦消息发送完成变量x就可以再次被使用,赋其他的值。
    • 向队列发送第二个消息
      在这里插入图片描述
      任务A又向队列发送了一个消息,即新的x的值,这里是20。此时队列剩余长度为2。
    • 从队列中读取消息
      在这里插入图片描述
      任务B从队列中读取消息,并将读取到的消息值赋值给y,这样y就等于10了。任务B从队列中读取消息完成以后可以选择清除掉这个消息或者不清除。当选择清除这个消息的话其他任务或中断就不能获取这个消息了,而且队列剩余大小就会加一,变成3。如果不清除的话其他任务或中断也可以获取这个消息,而队列剩余大小依旧是2。

    二、队列结构体

    有一个结构体用于描述队列,叫做Queue_t,在queue.c中定义:

    typedef struct QueueDefinition
    {
    	int8_t *pcHead;	//指向队列存储区开始地址
    	int8_t *pcTail;	//指向队列存储区最后一个字节				
    	int8_t *pcWriteTo;//指向存储区中下一个空闲区域
    
    	union							
    	{
    		int8_t *pcReadFrom;//当用作队列的时候指向最后一个出队的队列项首地址			
    		UBaseType_t uxRecursiveCallCount;//当用作递归互斥量的时候用来记录递归互斥量被调用的次数
    	} u;
    
    	List_t xTasksWaitingToSend;//等待发送任务列表,那些因为队列满导致入队失败而进入阻塞态的任务就会挂在这个列表上
    	List_t xTasksWaitingToReceive;//等待接受任务列表,那些因为队列空导致出队失败而进入阻塞态的任务就会挂到此列表上
    
    	volatile UBaseType_t uxMessagesWaiting;//队列中当前队列项数量,也就是消息数
    	UBaseType_t uxLength;//创建队列时指定的队列长度,也就是队列中最大允许的队列项数量
    	UBaseType_t uxItemSize;	//创建队列时指定的每个队列项最大长度,单位字节
    
    	volatile int8_t cRxLock;//当队列上锁以后用来统计从队列中接收到的队列项数量,也就是出队的队列项数量	
    	volatile int8_t cTxLock;//当队列上锁以后用来统计发送到队列中的队列项数量,也就是入队的队列项数量	
    
    	#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
    		uint8_t ucStaticallyAllocated;	//如果使用静态存储的话此字段设置为pdTURE
    	#endif
    
    	#if ( configUSE_QUEUE_SETS == 1 )//队列集相关宏
    		struct QueueDefinition *pxQueueSetContainer;
    	#endif
    
    	#if ( configUSE_TRACE_FACILITY == 1 )//跟踪调试相关宏
    		UBaseType_t uxQueueNumber;
    		uint8_t ucQueueType;
    	#endif
    
    } xQUEUE;
    typedef xQUEUE Queue_t;
    

    队列的基础知识就讲解到这里啦!下一讲将会讲解队列API的使用。

    展开全文
  • 可用于调试MSMQ、RabbitMQ、ActiveMQ三种消息队列 其中MSMQ支持Active、Binary、XML格式(要勾选事务) RabbitMQ支持逐条接发、批量接发、RPC回调模式、新建队列、建立持久化队列、连接测试等功能。
  • FreeRTOS消息队列

    万次阅读 多人点赞 2019-01-31 17:55:04
    任务或者中断服务程序都可以给消息队列发送消息,当发送消息时,如果队列未满或者允许覆盖入队, FreeRTOS 会将消息拷贝到消息队列队尾,否则,会根据用户指定的阻塞超时时间进行阻塞,在这段时间中,如果队列一直不...

    全文字数9920,预计阅读时长12分钟

    问题解答

    曾经有人问我,FreeRTOS那么多API,到底怎么记住呢?
    我想说,其实API不难记,就是有点难找,因为FreeRTOS的API很多都是带参宏,所以跳来跳去的比较麻烦,而且注释也很多,要找还真不是那么容易,不过也没啥,一般都会有API手册的,我就告诉大家一下:
    FreeRTOS Kernel: Reference Manual
    FreeRTOS内核:参考手册,大家可以在官网下载,也能在后台得到。
    当然书本是英文的,如果英语像我这样子不咋地的同学,可以用谷歌浏览器在官网直接看API手册,直接翻译一下就行了。传送门:https://www.freertos.org/a00018.html
    Reference Manual
    FreeRTOS官网的API

    FreeRTOS消息队列

    基于 FreeRTOS 的应用程序由一组独立的任务构成——每个任务都是具有独立权限的程序。这些独立的任务之间的通讯与同步一般都是基于操作系统提供的IPC通讯机制,而FreeRTOS 中所有的通信与同步机制都是基于队列实现的。
    消息队列是一种常用于任务间通信的数据结构,队列可以在任务与任务间、中断和任务间传送信息,实现了任务接收来自其他任务或中断的不固定长度的消息。任务能够从队列里面读取消息,当队列中的消息是空时,挂起读取任务,用户还可以指定挂起的任务时间;当队列中有新消息时,挂起的读取任务被唤醒并处理新消息,消息队列是一种异步的通信方式。

    队列特性

    1.数据存储

    队列可以保存有限个具有确定长度的数据单元。队列可以保存的最大单元数目被称为队列的“深度”。在队列创建时需要设定其深度和每个单元的大小。
    通常情况下,队列被作为 FIFO(先进先出)缓冲区使用,即数据由队列尾写入,从队列首读出。当然,由队列首写入也是可能的。
    往队列写入数据是通过字节拷贝把数据复制存储到队列中;从队列读出数据使得把队列中的数据拷贝删除。

    2.读阻塞

    当某个任务试图读一个队列时,其可以指定一个阻塞超时时间。在这段时间中,如果队列为空,该任务将保持阻塞状态以等待队列数据有效。当其它任务或中断服务例程往其等待的队列中写入了数据,该任务将自动由阻塞态转移为就绪态。当等待的时间超过了指定的阻塞时间,即使队列中尚无有效数据,任务也会自动从阻塞态转移为就绪态。
    由于队列可以被多个任务读取,所以对单个队列而言,也可能有多个任务处于阻塞状态以等待队列数据有效。这种情况下,一旦队列数据有效,只会有一个任务会被解除阻塞,这个任务就是所有等待任务中优先级最高的任务。而如果所有等待任务的优先级相同,那么被解除阻塞的任务将是等待最久的任务。

    说些题外话,ucos中是具有广播消息的,当有多个任务阻塞在队列上,当发送消息的时候可以选择广播消息,那么这些阻塞的任务都能被解除阻塞。

    3.写阻塞

    与读阻塞想反,任务也可以在写队列时指定一个阻塞超时时间。这个时间是当被写队列已满时,任务进入阻塞态以等待队列空间有效的最长时间。
    由于队列可以被多个任务写入,所以对单个队列而言,也可能有多个任务处于阻塞状态以等待队列空间有效。这种情况下,一旦队列空间有效,只会有一个任务会被解除阻塞,这个任务就是所有等待任务中优先级最高的任务。而如果所有等待任务的优先级相同,那么被解除阻塞的任务将是等待最久的任务。

    消息队列的工作流程

    1.发送消息

    任务或者中断服务程序都可以给消息队列发送消息,当发送消息时,如果队列未满或者允许覆盖入队, FreeRTOS 会将消息拷贝到消息队列队尾,否则,会根据用户指定的阻塞超时时间进行阻塞,在这段时间中,如果队列一直不允许入队,该任务将保持阻塞状态以等待队列允许入队。当其它任务从其等待的队列中读取入了数据(队列未满),该任务将自动由阻塞态转为就绪态。当任务等待的时间超过了指定的阻塞时间,即使队列中还不允许入队,任务也会自动从阻塞态转移为就绪态,此时发送消息的任务或者中断程序会收到一个错误码 errQUEUE_FULL。
    发送紧急消息的过程与发送消息几乎一样,唯一的不同是,当发送紧急消息时,发送的位置是消息队列队头而非队尾,这样,接收者就能够优先接收到紧急消息,从而及时进行消息处理。
    下面是消息队列的发送API接口,函数中有FromISR则表明在中断中使用的。
    消息队列入队(发送)的API接口

    1 /*-----------------------------------------------------------*/
     2 BaseType_t xQueueGenericSend( QueueHandle_t xQueue,		(1)	
     3                               const void * const pvItemToQueue, 	(2)
     4                               TickType_t xTicksToWait,		(3)
     5                               const BaseType_t xCopyPosition )	(4)
     6 {
     7     BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;
     8     TimeOut_t xTimeOut;
     9     Queue_t * const pxQueue = ( Queue_t * ) xQueue;
    10 
    11     /* 已删除一些断言操作 */
    12 
    13     for ( ;; ) {
    14         taskENTER_CRITICAL();					(5)
    15         {
    16             /* 队列未满 */
    17             if ( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength )
    18                  || ( xCopyPosition == queueOVERWRITE ) ) {	(6)	
    19                 traceQUEUE_SEND( pxQueue );
    20                 xYieldRequired =
    21           prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition ); (7)
    22 
    23                 /* 已删除使用队列集部分代码 */
    24                 /* 如果有任务在等待获取此消息队列 */
    25       if ( listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToReceive))==pdFALSE){ (8)
    26                     /* 将任务从阻塞中恢复 */
    27             if ( xTaskRemoveFromEventList(
    28                   &( pxQueue->xTasksWaitingToReceive ) )!=pdFALSE) { (9)
    29                         /* 如果恢复的任务优先级比当前运行任务优先级还高,
    30                         那么需要进行一次任务切换 */
    31                         queueYIELD_IF_USING_PREEMPTION();	(10)
    32                     } else {
    33                         mtCOVERAGE_TEST_MARKER();
    34                     }
    35                 } else if ( xYieldRequired != pdFALSE ) {
    36                     /* 如果没有等待的任务,拷贝成功也需要任务切换 */
    37                     queueYIELD_IF_USING_PREEMPTION();		(11)
    38                 } else {
    39                     mtCOVERAGE_TEST_MARKER();
    40                 }
    41 
    42                 taskEXIT_CRITICAL();				(12)
    43                 return pdPASS;
    44             }
    45             /* 队列已满 */
    46             else {						(13)
    47                 if ( xTicksToWait == ( TickType_t ) 0 ) {
    48                     /* 如果用户不指定阻塞超时时间,退出 */
    49                     taskEXIT_CRITICAL();			(14)
    50                     traceQUEUE_SEND_FAILED( pxQueue );
    51                     return errQUEUE_FULL;
    52                 } else if ( xEntryTimeSet == pdFALSE ) {	
    53             		/* 初始化阻塞超时结构体变量,初始化进入
    54             	阻塞的时间xTickCount和溢出次数xNumOfOverflows */
    55                     vTaskSetTimeOutState( &xTimeOut );		(15)
    56                     xEntryTimeSet = pdTRUE;
    57                 } else {
    58                     mtCOVERAGE_TEST_MARKER();
    59                 }
    60             }
    61         }
    62         taskEXIT_CRITICAL();					(16)
    63         /* 挂起调度器 */
    64         vTaskSuspendAll();
    65         /* 队列上锁 */
    66         prvLockQueue( pxQueue );
    67 
    68         /* 检查超时时间是否已经过去了 */
    69         if (xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait)==pdFALSE){ (17)
    70             /* 如果队列还是满的 */
    71             if ( prvIsQueueFull( pxQueue ) != pdFALSE ) {	(18)	
    72                 traceBLOCKING_ON_QUEUE_SEND( pxQueue );	
    73                 /* 将当前任务添加到队列的等待发送列表中
    74                    以及阻塞延时列表,延时时间为用户指定的超时时间xTicksToWait */
    75                 vTaskPlaceOnEventList(
    76                    &( pxQueue->xTasksWaitingToSend ), xTicksToWait );(19)
    77                 /* 队列解锁 */
    78                 prvUnlockQueue( pxQueue );			(20)
    79 
    80                 /* 恢复调度器 */
    81                 if ( xTaskResumeAll() == pdFALSE ) {
    82                     portYIELD_WITHIN_API();
    83                 }
    84             } else {
    85                 /* 队列有空闲消息空间,允许入队 */
    86                 prvUnlockQueue( pxQueue );			(21)
    87                 ( void ) xTaskResumeAll();
    88             }
    89         } else {
    90             /* 超时时间已过,退出 */
    91             prvUnlockQueue( pxQueue );				(22)
    92             ( void ) xTaskResumeAll();
    93 
    94             traceQUEUE_SEND_FAILED( pxQueue );
    95             return errQUEUE_FULL;
    96         }
    97     }
    98 }
    99 /*-----------------------------------------------------------*/
    

    如果阻塞时间不为 0,任务会因为等待入队而进入阻塞, 在将任务设置为阻塞的过程中, 系统不希望有其它任务和中断操作这个队列的 xTasksWaitingToReceive 列表和 xTasksWaitingToSend 列表,因为可能引起其它任务解除阻塞,这可能会发生优先级翻转。比如任务 A 的优先级低于当前任务,但是在当前任务进入阻塞的过程中,任务 A 却因为其它原因解除阻塞了,这显然是要绝对禁止的。因此FreeRTOS 使用挂起调度器禁止其它任务操作队列,因为挂起调度器意味着任务不能切换并且不准调用可能引起任务切换的 API 函数。但挂起调度器并不会禁止中断,中断服务函数仍然可以操作队列阻塞列表,可能会解除任务阻塞、可能会进行上下文切换,这也是不允许的。于是,FreeRTOS解决办法是不但挂起调度器,还要给队列上锁,禁止任何中断来操作队列。
    下面来看看流程图:
    消息队列发送流程
    相比在任务中调用的发送函数,在中断中调用的函数会更加简单一些, 没有任务阻塞操作。
    函数 xQueueGenericSend中插入数据后, 会检查等待接收链表是否有任务等待,如果有会恢复就绪。如果恢复的任务优先级比当前任务高, 则会触发任务切换;但是在中断中调用的这个函数的做法是返回一个参数标志是否需要触发任务切换,并不在中断中切换任务。
    在任务中调用的函数中有锁定和解锁队列的操作, 锁定队列的时候, 队列的事件链表不能被修改。 而在被中断中发送消息的处理是: 当遇到队列被锁定的时候, 将新数据插入到队列后, 并不会直接恢复因为等待接收的任务, 而是累加了计数, 当队列解锁的时候, 会根据这个计数, 对应恢复几个任务。
    遇到队列满的情况, 函数会直接返回, 而不是阻塞等待, 因为在中断中阻塞是不允许的!!!

     1 BaseType_t xQueueGenericSendFromISR(
     2        QueueHandle_t xQueue,
     3        const void * const pvItemToQueue,
     4        /* 不在中断函数中触发任务切换, 而是返回一个标记 */
     5        BaseType_t * const pxHigherPriorityTaskWoken,
     6        const BaseType_t xCopyPosition )
     7{
     8    BaseType_t xReturn;
     9    UBaseType_t uxSavedInterruptStatus;
    10    Queue_t * const pxQueue = ( Queue_t * ) xQueue;
    11
    12    uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
    13    {
    14        // 判断队列是否有空间插入新内容
    15        if( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength ) || ( xCopyPosition == queueOVERWRITE ) )
    16        {
    17            const int8_t cTxLock = pxQueue->cTxLock;
    18
    19            // 中断中不能使用互斥锁, 所以拷贝函数只是拷贝数据,
    20            // 没有任务优先级继承需要考虑
    21            ( void ) prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );
    22
    23            // 判断队列是否被锁定
    24            if( cTxLock == queueUNLOCKED )
    25            {
    26            #if ( configUSE_QUEUE_SETS == 1 )
    27                // 集合相关代码
    28            #else /* configUSE_QUEUE_SETS */
    29                {
    30                    // 将最高优先级的等待任务恢复到就绪链表
    31                    if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
    32                    {
    33                        if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE)
    34                        {
    35                            // 如果有高优先级的任务被恢复
    36                            // 此处不直接触发任务切换, 而是返回一个标记
    37                            if( pxHigherPriorityTaskWoken != NULL )
    38                            {
    39                                *pxHigherPriorityTaskWoken = pdTRUE;
    40                            }
    41                        }
    42                    }
    43                }
    44            #endif /* configUSE_QUEUE_SETS */
    45            }
    46            else
    47            {
    48                // 队列被锁定, 不能修改事件链表
    49                // 增加计数, 记录需要接触几个任务到就绪
    50                // 在解锁队列的时候会根据这个计数恢复任务
    51                pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
    52            }
    53            xReturn = pdPASS;
    54        }
    55        else
    56        {
    57            // 队列满 直接返回 不阻塞
    58            xReturn = errQUEUE_FULL;
    59        }
    60    }
    61
    62    // 恢复中断的优先级
    63    portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
    64
    65    return xReturn;
    66}
    

    消息队列读取

    消息读取
    任务调用接收函数收取队列消息, 函数首先判断当前队列是否有未读消息, 如果没有, 则会判断参数 xTicksToWait, 决定直接返回函数还是阻塞等待。
    如果队列中有消息未读, 首先会把待读的消息复制到传进来的指针所指内, 然后判断函数参数 xJustPeeking == pdFALSE的时候, 符合的话, 说明这个函数读取了数据, 需要把被读取的数据做出队处理, 如果不是, 则只是查看一下(peek),只是返回数据,但是不会把数据清除。
    对于正常读取数据的操作, 清除数据后队列会空出空位, 所以查看队列中的等待列表中是否有任务等发送数据而被挂起, 有的话恢复一个任务就绪, 并根据优先级判断是否需要出进行任务切换。
    对于只是查看数据的, 由于没有清除数据, 所以没有空间新空出,不需要检查发送等待链表, 但是会检查接收等待链表, 如果有任务挂起会切换其到就绪并判断是否需要切换。

    消息队列出队过程分析,其实跟入队差不多,请看注释:

     1 /*-----------------------------------------------------------*/
     2 BaseType_t xQueueGenericReceive( QueueHandle_t xQueue,		(1)	
     3                                  void * const pvBuffer,		(2)
     4                                  TickType_t xTicksToWait,	(3)	
     5                                  const BaseType_t xJustPeeking )	(4)
     6 {
     7     BaseType_t xEntryTimeSet = pdFALSE;
     8     TimeOut_t xTimeOut;
     9     int8_t *pcOriginalReadPosition;
    10     Queue_t * const pxQueue = ( Queue_t * ) xQueue;
    11 
    12     /* 已删除一些断言 */
    13     for ( ;; ) {
    14         taskENTER_CRITICAL();					(5)
    15         {
    16             const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting; 
    17 
    18             /* 看看队列中有没有消息 */
    19             if ( uxMessagesWaiting > ( UBaseType_t ) 0 ) {	(6)	
    20                 /*防止仅仅是读取消息,而不进行消息出队操作*/
    21                 pcOriginalReadPosition = pxQueue->u.pcReadFrom;	(7)
    22                 /* 拷贝消息到用户指定存放区域pvBuffer */
    23                 prvCopyDataFromQueue( pxQueue, pvBuffer );	(8)
    24 
    25                 if ( xJustPeeking == pdFALSE ) {		(9)
    26                     /* 读取消息并且消息出队 */
    27                     traceQUEUE_RECEIVE( pxQueue );	
    28 
    29                     /* 获取了消息,当前消息队列的消息个数需要减一 */
    30                     pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;  (10)
    31                     /* 判断一下消息队列中是否有等待发送消息的任务 */
    32                     if ( listLIST_IS_EMPTY(			(11)
    33                              &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) {
    34                         /* 将任务从阻塞中恢复 */
    35                         if ( xTaskRemoveFromEventList(		(12)
    36                                  &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) {
    37                             /* 如果被恢复的任务优先级比当前任务高,会进行一次任务切换 */
    38                             queueYIELD_IF_USING_PREEMPTION();	(13)
    39                         } else {
    40                             mtCOVERAGE_TEST_MARKER();
    41                         }
    42                     } else {
    43                         mtCOVERAGE_TEST_MARKER();
    44                     }
    45                 } else {					(14)
    46                     /* 任务只是看一下消息(peek),并不出队 */	
    47                     traceQUEUE_PEEK( pxQueue );
    48 
    49                     /* 因为是只读消息 所以还要还原读消息位置指针 */
    50                     pxQueue->u.pcReadFrom = pcOriginalReadPosition; (15)
    51 
    52                     /* 判断一下消息队列中是否还有等待获取消息的任务 */
    53                     if ( listLIST_IS_EMPTY(			(16)
    54                              &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) {
    55                         /* 将任务从阻塞中恢复 */
    56                         if ( xTaskRemoveFromEventList(			
    57                               &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) {
    58                             /* 如果被恢复的任务优先级比当前任务高,会进行一次任务切换 */
    59                             queueYIELD_IF_USING_PREEMPTION();	
    60                         } else {
    61                             mtCOVERAGE_TEST_MARKER();
    62                         }
    63                     } else {
    64                         mtCOVERAGE_TEST_MARKER();
    65                     }
    66                 }
    67 
    68                 taskEXIT_CRITICAL();				(17)
    69                 return pdPASS;
    70             } else {						(18)
    71                 /* 消息队列中没有消息可读 */
    72                 if ( xTicksToWait == ( TickType_t ) 0 ) {	(19)	
    73                     /* 不等待,直接返回 */
    74                     taskEXIT_CRITICAL();
    75                     traceQUEUE_RECEIVE_FAILED( pxQueue );
    76                     return errQUEUE_EMPTY;
    77                 } else if ( xEntryTimeSet == pdFALSE ) {		
    78                     /* 初始化阻塞超时结构体变量,初始化进入
    79                     阻塞的时间xTickCount和溢出次数xNumOfOverflows */
    80                     vTaskSetTimeOutState( &xTimeOut );		(20)
    81                     xEntryTimeSet = pdTRUE;
    82                 } else {
    83                     mtCOVERAGE_TEST_MARKER();
    84                 }
    85             }
    86         }
    87         taskEXIT_CRITICAL();					
    88 
    89         vTaskSuspendAll();
    90         prvLockQueue( pxQueue );				(21)
    91 
    92         /* 检查超时时间是否已经过去了*/
    93         if ( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) {(22)
    94             /* 如果队列还是空的 */
    95             if ( prvIsQueueEmpty( pxQueue ) != pdFALSE ) {
    96                 traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );	(23)	
    97                 /* 将当前任务添加到队列的等待接收列表中
    98                    以及阻塞延时列表,阻塞时间为用户指定的超时时间xTicksToWait */
    99                 vTaskPlaceOnEventList(				
    100                     &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
    101                 prvUnlockQueue( pxQueue );
    102                 if ( xTaskResumeAll() == pdFALSE ) {		
    103                     /* 如果有任务优先级比当前任务高,会进行一次任务切换 */
    104                     portYIELD_WITHIN_API();			
    105                 } else {
    106                     mtCOVERAGE_TEST_MARKER();
    107                 }
    108             } else {
    109                 /* 如果队列有消息了,就再试一次获取消息 */
    110                 prvUnlockQueue( pxQueue );			(24)
    111                 ( void ) xTaskResumeAll();
    112             }
    113         } else {
    114             /* 超时时间已过,退出 */
    115             prvUnlockQueue( pxQueue );				(25)
    116             ( void ) xTaskResumeAll();
    117 
    118             if ( prvIsQueueEmpty( pxQueue ) != pdFALSE ) {
    119                 /* 如果队列还是空的,返回错误代码errQUEUE_EMPTY */
    120                 traceQUEUE_RECEIVE_FAILED( pxQueue );
    121                 return errQUEUE_EMPTY;				(26)
    122             } else {
    123                 mtCOVERAGE_TEST_MARKER();
    124             }
    125         }
    126     }
    127 }
    128 /*-----------------------------------------------------------*/
    

    提示

    如果队列存储的数据较大时,那最好是利用队列来传递数据的指针而不是数据本身,因为传递数据的时候是需要CPU一字节一字节地将数据拷贝进队列或从队列拷贝出来。而传递指针无论是在处理速度上还是内存空间利用上都更有效。但是,当利用队列传递指针时,一定要十分小心地做到以下两点:

    1.指针指向的内存空间的所有权必须明确

    当任务间通过指针共享内存时,应该从根本上保证所不会有任意两个任务同时修改共享内存中的数据,或是以其它行为方式使得共享内存数据无效或产生一致性问题。原则上,共享内存在其指针发送到队列之前,其内容只允许被发送任务访问;共享内存指针从队列中被读出之后,其内容亦只允许被接收任务访问。

    2.指针指向的内存空间必须有效

    如果指针指向的内存空间是动态分配的,只应该有一个任务负责对其进行内存释放。当这段内存空间被释放之后,就不应该有任何一个任务再访问这段空间。
    并且最最最重要的是禁止使用指针访问任务栈上的空间,也就是局部变量。因为当栈发生改变后,栈上的数据将不再有效。

    展开全文
  • 通俗易懂讲消息队列

    万次阅读 多人点赞 2020-03-05 17:00:21
    一、什么是消息队列消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。 消息队列,一般我们会简称它为MQ(Message Queue),嗯...
  • RabbitMQ消息队列常见面试题总结

    万次阅读 2021-03-22 02:46:39
    RabbitMQ消息队列常见面试题总结; 1、什么是消息队列消息队列的优缺点? 2、Kafka、ActiveMQ、RabbitMQ、RocketMQ的区别? 3、如何保证消息不被重复消费? 4、如何保证消息不丢失,进行可靠性传输? 5、如何保证...
  • 1 为什么要使用消息队列? 回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字: (1)解耦 传统模式: 传统模式的缺点:系统间耦合性太强,如上图所示,系统A在代码中...
  • 消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。“消息队列”是在...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 529,930
精华内容 211,972
关键字:

消息对列