消息队列 订阅
“消息队列”是在消息的传输过程中保存消息的容器。 展开全文
“消息队列”是在消息的传输过程中保存消息的容器。
信息
外文名
Message queue
对    象
两台计算机间
特    点
消息可以非常简单也可以更复杂
含    义
消息的传输过程中保存消息的容器
中文名
消息队列
消息队列消息简介
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
收起全文
精华内容
参与话题
问答
  • 什么是消息队列你了解过么?

    你知道的越多,你不知道的越多

    点赞再看,养成习惯

    GitHub上已经开源 https://github.com/JavaFamily 有一线大厂面试点脑图、个人联系方式,欢迎Star和完善

    面试开始

    一个风度翩翩,穿着格子衬衣的中年男子,拿着一个满是划痕的mac向你走来,看着铮亮的头,心想着肯定是尼玛顶级架构师吧!但是我们看过暖男敖丙的系列,腹有诗书气自华,虚都不虚。

    小伙子之前问了你这么多Redis的知识,你不仅对答如流,你还能把各自场景的解决方案,优缺点说得这么流畅,说你是不是看过敖丙写的《吊打面试官》系列呀?

    惊!!!老师你怎么知道的,我看了他的系列根本停不下来啊。

    呵呵,Redis没难住你,但是我问个新的技术栈我还怕难不住你?我问问你你项目中用过消息队列么?你为啥用消息队列?

    噗此,这也叫问题?别人用了我能不用么?别人用了我就用了呗,我就是为了用而用。

    你心里嘀咕就好了,千万别说出来哈,说出来了没拿到Offer别到时候就在那说,敖丙那个渣男教我说的!

    面试官你好:我们公司本身的业务体量很小,所以直接单机一把梭啥都能搞定了,但是后面业务体量不断扩大,采用微服务的设计思想分布式的部署方式,所以拆分了很多的服务,随着体量的增加以及业务场景越来越复杂了,很多场景单机的技术栈和中间件以及不够用了,而且对系统的友好性也下降了,最后做了很多技术选型的工作,我们决定引入消息队列中间件

    哦?你说到业务场景越来越复杂,你那说一下你都在什么场景用到了消息队列?

    嗯,我从三个方面去说一下我使用的场景吧。

    Tip:这三个场景也是消息队列的经典场景,大家基本上要烂熟于心那种,就是一说到消息队列你脑子就要想到异步、削峰、解耦,条件反射那种。

    异步:

    我们之前的场景里面有很多步骤都是在一个流程里面需要做完的,就比如说我的下单系统吧,本来我们业务简单,下单了付了钱就好了,流程就走完了。

    但是后面来了个产品经理,搞了个优惠券系统,OK问题不大,流程里面多100ms去扣减优惠券。

    后来产品经理灵光一闪说我们可以搞个积分系统啊,也行吧,流程里面多了200ms去增减积分。

    再后来后来隔壁的产品老王说:下单成功后我们要给用户发短信,也将就吧,100ms去发个短信。

    再后来。。。(敖丙你有完没完!!!)

    反正就流程有点像这样 ↓

    你们可以看到这才加了三个,我可以斩钉截铁的告诉你真正的下单流程涉及的系统绝对在10个以上(主流电商),越大的越多。

    这个链路这样下去,时间长得一批,用户发现我买个东西你特么要花几十秒,垃圾电商我不在你这里买了,不过要是都像并夕夕这么便宜,真香

    但是我们公司没有夕夕的那个经济实力啊,那只能优化系统了。

    Tip:我之前在的电商老东家要求所有接口的RtResponseTime响应时间)在200ms内,超出的全部优化,我现在所负责的系统QPS也是9W+就是抖动一下网络集群都可能炸锅那种,RT基本上都要求在50ms以内。

    大家感受一下这个QPS。

    嗯不错,链路长了就慢了,那你怎么解决的?

    那链路长了就慢了,但是我们发现上面的流程其实可以同时做的呀,你支付成功后,我去校验优惠券的同时我可以去增减积分啊,还可以同时发个短信啊。

    那正常的流程我们是没办法实现的呀,怎么办,异步

    你对比一下是不是发现,这样子最多只用100毫秒用户知道下单成功了,至于短信你迟几秒发给他他根本不在意是吧。

    小伙子我打断你一下,你说了异步,那我用线程,线程池去做不是一样的么?

    诶呀,面试官你不要急嘛,我后面还会说到的,骚等。

    解耦:

    既然面试官这么问了,我就说一下为啥我们不能用线程去做,因为用线程去做,你是不是要写代码?

    你一个订单流程,你扣积分,扣优惠券,发短信,扣库存。。。等等这么多业务要调用这么多的接口,每次加一个你要调用一个接口然后还要重新发布系统,写一次两次还好,写多了你就说:老子不干了!

    而且真的全部都写在一起的话,不单单是耦合这一个问题,你出问题排查也麻烦,流程里面随便一个地方出问题搞不好会影响到其他的点,小伙伴说我每个流程都try catch不就行了,相信我别这么做,这样的代码就像个定时炸弹💣,你不知道什么时候爆炸,平时不炸偏偏在你做活动的时候炸,你就领个P0故障收拾书包提前回家过年吧。

    Tip:P0—PN 是互联网大厂经常用来判定事故等级的机制,P0是最高等级了。

    但是你用了消息队列,耦合这个问题就迎刃而解了呀。

    哦,帅丙怎么说?

    且听我娓娓道来:

    你下单了,你就把你支付成功的消息告诉别的系统,他们收到了去处理就好了,你只用走完自己的流程,把自己的消息发出去,那后面要接入什么系统简单,直接订阅你发送的支付成功消息,你支付成功了我监听就好了

    那你的流程走完了,你不用管别人是否成功么?比如你下单了积分没加,优惠券没扣怎么办?

    问题是个好问题,但是没必要考虑,业务系统本身就是自己的开发人员维护的,你积分扣失败关我下单的什么事情?你管好自己下单系统的就好了。

    Tip:话是这么说,但是这其实是用了消息队列的一个缺点,涉及到分布式事务的知识点,我下面会提到。

    削峰:

    就拿我上一期写的秒杀来说(暗示新同学看我上一期),你平时流量很低,但是你要做秒杀活动00 :00的时候流量疯狂怼进来,你的服务器,RedisMySQL各自的承受能力都不一样,你直接全部流量照单全收肯定有问题啊,直接就打挂了。

    那怎么办?

    简单,把请求放到队列里面,然后至于每秒消费多少请求,就看自己的服务器处理能力,你能处理5000QPS你就消费这么多,可能会比正常的慢一点,但是不至于打挂服务器,等流量高峰下去了,你的服务也就没压力了。

    你看阿里双十一12:00的时候这么多流量瞬间涌进去,他有时候是不是会慢一点,但是人家没挂啊,或者降级给你个友好的提示页面,等高峰过去了又是一条好汉了。

    为了这个图特意打高一台服务的流量
    为了这个图特意打高一台服务的流量

    听你说了辣么多,怎么都是好处,那我问你使用了消息队列有啥问题么?

    诶,看过前面我写的文章的人才都知道,我经常说的就是,技术是把双刃剑

    没错面试官,我使用他是因为他带给我们很多好处,但是使用之后问题也是接踵而至

    同样的暖男我呀,也从三个点介绍他主要的缺点:

    系统复杂性

    本来蛮简单的一个系统,我代码随便写都没事,现在你凭空接入一个中间件在那,我是不是要考虑去维护他,而且使用的过程中是不是要考虑各种问题,比如消息重复消费消息丢失消息的顺序消费等等,反正用了之后就是贼烦。

    我插一句嘴,上面的问题(重复消费、消息丢失、顺序消费)你能分别介绍一下,并且说一下分别是怎么解决的么?

    不要!我都说了敖丙下一章写啥?

    其实不是暖男我不想在这里写,这三个问题我想了下,统统都是MQ重点问题,单独拿一个出来就是一篇文章了,篇幅实在太长了,我会在下一章挨个介绍一遍的。

    数据一致性

    这个其实是分布式服务本身就存在的一个问题,不仅仅是消息队列的问题,但是放在这里说是因为用了消息队列这个问题会暴露得比较严重一点。

    就像我开头说的,你下单的服务自己保证自己的逻辑成功处理了,你成功发了消息,但是优惠券系统,积分系统等等这么多系统,他们成功还是失败你就不管了?

    我说了保证自己的业务数据对的就好了,其实还是比较不负责任的一种说法,这样就像个渣男,没有格局这样呀你的路会越走越窄的

    所有的服务都成功才能算这一次下单是成功的,那怎么才能保证数据一致性呢?

    分布式事务:把下单,优惠券,积分。。。都放在一个事务里面一样,要成功一起成功,要失败一起失败。

    Tip:分布式事务在互联网公司里面实在常见,我也不在这里大篇幅介绍了,后面都会专门说的。

    可用性

    你搞个系统本身没啥问题,你现在突然接入一个中间件在那放着,万一挂了怎么办?我下个单MQ挂了,优惠券不扣了,积分不减了,这不是杀一个程序员能搞定的吧,感觉得杀一片。

    至于怎么保证高可用,还是那句话也不在这里展开讨论了,我后面一样会写,像写Redis那样写出来的。

    放心敖丙我不是渣男来的,我肯定会对你们负责的。点赞!

    看不出来啊,你有点东西呀,那我问一下你,你们是怎么做技术选型的?

    目前在市面上比较主流的消息队列中间件主要有,Kafka、ActiveMQ、RabbitMQ、RocketMQ 等这几种。

    不过敖丙我想说的是,ActiveMQRabbitMQ这两着因为吞吐量还有GitHub的社区活跃度的原因,在各大互联网公司都已经基本上绝迹了,业务体量一般的公司会是有在用的,但是越来越多的公司更青睐RocketMQ这样的消息中间件了。

    KafkaRocketMQ一直在各自擅长的领域发光发亮,不过写这篇文章的时候我问了蚂蚁金服,字节跳动和美团的朋友,好像大家用的都有点不一样,应该都是各自的中间件,可能做过修改,也可能是自研的,大多没有开源

    就像我们公司就是是基于KafkaRocketMQ两者的优点自研的消息队列中间件,吞吐量、可靠性、时效性等都很可观。

    我们回归正题,我这里用网上找的对比图让大家看看差距到底在哪里:

    大家其实一下子就能看到差距了,就拿吞吐量来说,早期比较活跃的ActiveMQRabbitMQ基本上不是后两者的对手了,在现在这样大数据的年代吞吐量是真的很重要

    比如现在突然爆发了一个超级热点新闻,你的APP注册用户高达亿数,你要想办法第一时间把突发全部推送到每个人手上,你没有大吞吐量的消息队列中间件用啥去推?

    再说这些用户大量涌进来看了你的新闻产生了一系列的附带流量,你怎么应对这些数据,很多场景离开消息队列基本上难以为继

    部署方式而言前两者也是大不如后面两个天然分布式架构的哥哥,都是高可用的分布式架构,而且数据多个副本的数据也能做到0丢失。

    我们再聊一下RabbitMQ这个中间件其实还行,但是这玩意开发语言居然是erlang,我敢说绝大部分工程师肯定不会为了一个中间件去刻意学习一门语言的,开发维护成本你想都想不到,出个问题查都查半天。

    至于RocketMQ(阿里开源的),git活跃度还可以。基本上你push了自己的bug确认了有问题都有阿里大佬跟你试试解答并修复的,我个人推荐的也是这个,他的架构设计部分跟同样是阿里开源的一个RPC框架是真的很像(Dubbo)可能是因为师出同门的原因吧。

    Tip:Dubbo等我写到RPC我会详细介绍的。

    Kafka我放到最后说,你们也应该知道了,压轴的这是个大哥,大数据领域,公司的日志采集,实时计算等场景,都离不开他的身影,他基本上算得上是世界范围级别的消息队列标杆了。

    以上这些都只是一些我自己的个人意见,真正的选型还是要去深入研究的,不然那你公司一天UV就1000你告诉我你要去用Kafka我只能说你吃饱撑的。

    记住,没有最好的技术,只有最适合的技术,不要为了用而用

    面试结束

    嗯,小伙子不错不错,分析得很到位,那你记得下期来说一下消息队列的高可用,重复消费、消息丢失、消息顺序、分布式事务等问题?

    嗯嗯好的面试官,不过不确定能不能一口气说完,毕竟敖丙还没开始写,而且读者还有可能白嫖,动力不一定够。

    嗯嗯这倒是个问题,不过啊在看的都是人才肯定会给你点赞👍的!

    我也这么认为。

    总结

    消息队列的基础知识我就先介绍这么多,消息队列在面试里面基本上也是跟我前面写的Redis一样必问的。

    面试的思路还是一样,要知其然,也要知其所以然,就是要知道为啥用,用了有啥好处,有啥坑。

    面试官不喜欢只知道用的,你只会用那哪天线上出问题怎么办?你难道在旁边拜佛?

    Tip:本来有很多我准备的资料的,但是都是外链,或者不合适的分享方式,博客的运营小姐姐提醒了我,所以大家去公众号回复【资料】好了。

    鸣谢

    之前的文章写了很多人加我,然后有个人才说是他蚂蚁金服的Leader推荐的我,我突然意识到我文章的受众好像慢慢变广了,之后不严谨的点要杜绝掉。

    所以之后我的文章经常会有大厂的小伙伴Review,也希望帮助我更好的监督自己的文章吧。

    这次是 某阿里系电商跟我一起做过活动小组的 佩恩 帮我Review的文章,感谢!

    絮叨

    另外,敖丙把自己的面试文章整理成了一本电子书,共 1630页!目录如下

    现在免费送给大家,在我的公众号三太子敖丙回复 【888】 即可获取。

    我是敖丙,一个在互联网苟且偷生的程序员。

    你知道的越多,你不知道的越多人才们的 【三连】 就是丙丙创作的最大动力,我们下期见!

    注:如果本篇博客有任何错误和建议,欢迎人才们留言!


    文章持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读,回复【资料】有我准备的一线大厂面试资料和简历模板,本文 GitHub https://github.com/JavaFamily 已经收录,有大厂面试完整考点,欢迎Star。

    展开全文
  • 什么是消息队列

    千次阅读 2018-03-01 13:58:00
    一、什么是消息队列 消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。 消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以...

    最近公司重构订单中心,用到消息中间件,闲暇时间对此进行学习了解,下面是学习内容的总结。
    一、什么是消息队列
    消息(Message)是指在应用之间传送的数据,消息可以非常简单,比如只包含文本字符串,也可以更复杂,可能包含嵌入对象。
    消息队列(Message Queue)是一种应用间的通信方式,消息发送后可以立即返回,有消息系统来确保信息的可靠专递,消息发布者只管把消息发布到MQ中而不管谁来取,消息使用者只管从MQ中取消息而不管谁发布的,这样发布者和使用者都不用知道对方的存在。
    二、为何使用消息队列
    从上面描述中可以看出消息队列是一种应用间的异步协作机制,那什么时候需要使用MQ呢?
    以常见的订单系统为例子,用户点击【下单】按钮后的业务逻辑包括:扣减库存、生成相应的单据、发红包、发短信通知‘在业务发展初期这些逻辑可能放在一起同步执行,随着业务订单量增长,需要提升系统服务的性能,这时候可以将一些不需要立即生效的操作拆分出来异步执行,,比如发红包、发短信通知等。这种场景就可以用MQ,在下单的主流程(比如扣减库存、生成相应的单据)完成之后发送一条消息到MQ让主流程快速完结,而由另外的单独线程拉取MQ的消息(或者由MQ推送消息),当发现MQ中有发红包或者发短信之类的消息,执行相应的业务逻辑。
    以上是用于业务解耦的情况,其他常见场景包括最终一致性、广播、错峰流控等等。
    三、RabbitMQ特点
    RabbitMQ是一个由Erlang语言开发的AMQP的开源实现。
    AMQP:Advanced Meassage Queue,高级消息队列协议。它是应用层协议的一个开放标准,为面向消息的中间设计,基于此协议的客户端与消息中间件可传递消息,并不受产品、开发语言等条件限制。

    RabbitMQ最初起源于金融系统,用于分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗,具体特点:
    1、可靠性(Reliablitity)
    RabbitMQ 使用一些机制来保证可靠性,如持久化、传输确认、发布确认。
    2、灵活的路由(Flexible Routing)
    在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。
    3、消息集群(Clustering)
    多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。
    4、高可用(Highly Available Queues)
    队列可以在集群中的机器上进行镜像,使得在部分节点出问题的情况下队列仍然可用。
    5、多种协议(Multi-protocol)
    RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。
    6、多语言客户端(Many Clients)
    RabbitMQ 几乎支持所有常用语言,比如 Java、.NET、Ruby 等等。
    7、管理界面(Management UI)
    RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。
    8、跟踪机制(Tracing)
    如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。
    9、插件机制(Plugin System)
    RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编写自己的插件。

    四、RabbitMQ中的概念模型
    消息模型
    所有 MQ 产品从模型抽象上来说都是一样的过程:
    消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
    这里写图片描述

    RabbitMQ 基本概念
    上面只是最简单抽象的描述,具体到 RabbitMQ 则有更详细的概念需要解释。上面介绍过 RabbitMQ 是 AMQP 协议的一个开源实现,所以其内部实际上也是 AMQP 中的基本概念:
    这里写图片描述

    1、Message
    消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
    2、Publisher
    消息的生产者,也是一个向交换器发布消息的客户端应用程序。
    3、Exchange
    交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
    4、Binding
    绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
    5、Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
    6、Connection
    网络连接,比如一个TCP连接。
    7、Channel
    信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
    8、Consumer
    消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
    9、Virtual Host
    虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
    10、Broker
    表示消息队列服务器实体。

    五、消息队列的应用及好处

    例如

    (1)服务员点菜快,厨师做菜慢,服务员只需要下单给厨师,然后就可以继续去服务顾客,不需要等待厨师把菜做完

    点菜单就相当于消息,放单子的位置就相当于队列

    (2)业务系统需要发短信,但短信发送模块速度跟不上,业务系统就可以把发送短信的相关信息封装为一个消息,放入队列,短信发送模块从队列中获取消息进行处理

    消息队列的好处
    (1)提高系统响应速度
    使用消息队列,生产者一方,把消息往消息队列里一扔,就可以立马返回响应用户,无需等待处理结果
    (2)保证消息的传递
    如果发送消息时接收者不可用,消息队列会保留消息,直到成功的传递它
    (3)解耦
    只要信息格式不变,即使接收者的接口、位置、或者配置改变,也不会给发送者带来任何改变
    消息发送者无需知道消息接收者是谁,使得系统设计更清晰
    为什么需要分布式消息队列
    (1)多系统协作需要分布式
    例如消息队列的数据需要在多个系统之间共享,所以需要提供分布式通信机制、协同机制
    (2)可靠
    消息会被持久化到分布式存储中,这样避免了单台机器存储的消息由于机器问题导致消息丢失
    (3)可扩展
    分布式消息队列,会随着访问量的增加而方便的增加处理服务器

    展开全文
  • 深入理解RabbitMQ消息队列的使用

    千人学习 2018-06-20 22:35:54
    RabbitMQ当中生产者以及消费者的具体实现、消费者如何做到消息的确认、RabbitMQ如何做到消息的公平分发、 RabbitMQ当中fanout、direct、topic交换机的特点以及转化关系、RabbitMQ基于RPC机制的具体实现等内容.
  • 消息队列使用的四种场景介绍

    千次阅读 2018-08-23 16:55:36
    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题 实现高性能,高可用,可伸缩和最终一致性架构 使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ ...

    原文地址:https://blog.csdn.net/cws1214/article/details/52922267

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题

    实现高性能,高可用,可伸缩和最终一致性架构

    使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ

    二、消息队列应用场景

    以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景

    2.1异步处理

    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式

    (1)串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端

     

    (2)并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间

     

    假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

    因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)

    小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

     

    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍

    2.2应用解耦

    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图

     

    传统模式的缺点:

    • 假如库存系统无法访问,则订单减库存将失败,从而导致订单失败

    • 订单系统与库存系统耦合

    如何解决以上问题呢?引入应用消息队列后的方案,如下图:

     

    • 订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功

    • 库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作

    • 假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    2.3流量削锋

    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛

    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。

    • 可以控制活动的人数

    • 可以缓解短时间内高流量压垮应用

     

    • 用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面

    • 秒杀业务根据消息队列中的请求信息,再做后续处理

    2.4日志处理

    日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下

     

    • 日志采集客户端,负责日志数据采集,定时写受写入Kafka队列

    • Kafka消息队列,负责日志数据的接收,存储和转发

    • 日志处理应用:订阅并消费kafka队列中的日志数据

    以下是新浪kafka日志处理应用案例:转自(http://cloud.51cto.com/art/201507/484338.htm)

     

    (1)Kafka:接收用户日志的消息队列

    (2)Logstash:做日志解析,统一成JSON输出给Elasticsearch

    (3)Elasticsearch:实时日志分析服务的核心技术,一个schemaless,实时的数据存储服务,通过index组织数据,兼具强大的搜索和统计功能

    (4)Kibana:基于Elasticsearch的数据可视化组件,超强的数据可视化能力是众多公司选择ELK stack的重要原因

    2.5消息通讯

    消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等

    点对点通讯:

     

    客户端A和客户端B使用同一队列,进行消息通讯。

    聊天室通讯:

     

    客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。

    以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

    三、消息中间件示例

    3.1电商系统

     

    消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。

    (1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)

    (2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。

    (3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

    3.2日志收集系统

     

    分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。

    • Zookeeper注册中心,提出负载均衡和地址查找服务

    • 日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列

    • Kafka集群:接收,路由,存储,转发等消息处理

    Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

    四、JMS消息服务

    讲消息队列就不得不提JMS 。JMS(JAVA Message Service,java消息服务)API是一个消息服务的标准/规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

    在EJB架构中,有消息bean可以无缝的与JM消息服务集成。在J2EE架构模式中,有消息服务者模式,用于实现消息与应用直接的解耦。

    4.1消息模型

    在JMS标准中,有两种消息模型P2P(Point to Point),Publish/Subscribe(Pub/Sub)。

    4.1.1 P2P模式

     

    P2P模式包含三个角色:消息队列(Queue),发送者(Sender),接收者(Receiver)。每个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到他们被消费或超时。

    P2P的特点

    • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)

    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列

    • 接收者在成功接收消息之后需向队列应答成功 

    如果希望发送的每个消息都会被成功处理的话,那么需要P2P模式。(架构KKQ:466097527,欢迎加入)

    4.1.2 Pub/sub模式

     

    包含三个角色主题(Topic),发布者(Publisher),订阅者(Subscriber) 多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。

    Pub/Sub的特点

    • 每个消息可以有多个消费者

    • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息

    • 为了消费消息,订阅者必须保持运行的状态

    为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

    如果希望发送的消息可以不被做任何处理、或者只被一个消息者处理、或者可以被多个消费者处理的话,那么可以采用Pub/Sub模型。

    4.2消息消费

    在JMS中,消息的产生和消费都是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。

    (1)同步

    订阅者或接收者通过receive方法来接收消息,receive方法在接收到消息之前(或超时之前)将一直阻塞;

    (2)异步

    订阅者或接收者可以注册为一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。

     

    JNDI:Java命名和目录接口,是一种标准的Java命名系统接口。可以在网络上查找和访问服务。通过指定一个资源名称,该名称对应于数据库或命名服务中的一个记录,同时返回资源连接建立所必须的信息。

    JNDI在JMS中起到查找和访问发送目标或消息来源的作用。

    4.3JMS编程模型

    (1) ConnectionFactory

    创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。可以通过JNDI来查找ConnectionFactory对象。

    (2) Destination

    Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。

    所以,Destination实际上就是两种类型的对象:Queue、Topic可以通过JNDI来查找Destination。

    (3) Connection

    Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session。跟ConnectionFactory一样,Connection也有两种类型:QueueConnection和TopicConnection。

    (4) Session

    Session是操作消息的接口。可以通过session创建生产者、消费者、消息等。Session提供了事务的功能。当需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。同样,也分QueueSession和TopicSession。

    (5) 消息的生产者

    消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    (6) 消息消费者

    消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    (7) MessageListener

    消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。

     

    深入学习JMS对掌握JAVA架构,EJB架构有很好的帮助,消息中间件也是大型分布式系统必须的组件。本次分享主要做全局性介绍,具体的深入需要大家学习,实践,总结,领会。

    五、常用消息队列

    一般商用的容器,比如WebLogic,JBoss,都支持JMS标准,开发上很方便。但免费的比如Tomcat,Jetty等则需要使用第三方的消息中间件。本部分内容介绍常用的消息中间件(Active MQ,Rabbit MQ,Zero MQ,Kafka)以及他们的特点。

    5.1 ActiveMQ

    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    ActiveMQ特性如下:

    ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

    ⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

    ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

    ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

    ⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA

    ⒍ 支持通过JDBC和journal提供高速的消息持久化

    ⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点

    ⒏ 支持Ajax

    ⒐ 支持与Axis的整合

    ⒑ 可以很容易得调用内嵌JMS provider,进行测试

    5.2 RabbitMQ

    RabbitMQ是流行的开源消息队列系统,用erlang语言开发。RabbitMQ是AMQP(高级消息队列协议)的标准实现。支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    结构图如下:(架构KKQ:466097527,欢迎加入)

    几个重要概念:

    Broker:简单来说就是消息队列服务器实体。

      Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。

      Queue:消息队列载体,每个消息都会被投入到一个或多个队列。

      Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。

      Routing Key:路由关键字,exchange根据这个关键字进行消息投递。

      vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。

      producer:消息生产者,就是投递消息的程序。

      consumer:消息消费者,就是接受消息的程序。

      channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

    消息队列的使用过程,如下:

    (1)客户端连接到消息队列服务器,打开一个channel。

    (2)客户端声明一个exchange,并设置相关属性。

    (3)客户端声明一个queue,并设置相关属性。

    (4)客户端使用routing key,在exchange和queue之间建立好绑定关系。

    (5)客户端投递消息到exchange。

    exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

    5.3 ZeroMQ

    号称史上最快的消息队列,它实际类似于Socket的一系列接口,他跟Socket的区别是:普通的socket是端到端的(1:1的关系),而ZMQ却是可以N:M 的关系,人们对BSD套接字的了解较多的是点对点的连接,点对点连接需要显式地建立连接、销毁连接、选择协议(TCP/UDP)和处理错误等,而ZMQ屏蔽了这些细节,让你的网络编程更为简单。ZMQ用于node与node间的通信,node可以是主机或者是进程。

    引用官方的说法: “ZMQ(以下ZeroMQ简称ZMQ)是一个简单好用的传输层,像框架一样的一个socket library,他使得Socket编程更加简单、简洁和性能更高。是一个消息处理队列库,可在多个线程、内核和主机盒之间弹性伸缩。ZMQ的明确目标是“成为标准网络协议栈的一部分,之后进入Linux内核”。现在还未看到它们的成功。但是,它无疑是极具前景的、并且是人们更加需要的“传统”BSD套接字之上的一 层封装。ZMQ让编写高性能网络应用程序极为简单和有趣。”

    特点是:

    • 高性能,非持久化

    • 跨平台:支持Linux、Windows、OS X等

    • 多语言支持; C、C++、Java、.NET、Python等30多种开发语言

    • 可单独部署或集成到应用中使用

    • 可作为Socket通信库使用

    与RabbitMQ相比,ZMQ并不像是一个传统意义上的消息队列服务器,事实上,它也根本不是一个服务器,更像一个底层的网络通讯库,在Socket API之上做了一层封装,将网络通讯、进程通讯和线程通讯抽象为统一的API接口。支持“Request-Reply “,”Publisher-Subscriber“,”Parallel Pipeline”三种基本模型和扩展模型。

     

    ZeroMQ高性能设计要点:

    1、无锁的队列模型

       对于跨线程间的交互(用户端和session)之间的数据交换通道pipe,采用无锁的队列算法CAS;在pipe两端注册有异步事件,在读或者写消息到pipe的时,会自动触发读写事件。

    2、批量处理的算法

       对于传统的消息处理,每个消息在发送和接收的时候,都需要系统的调用,这样对于大量的消息,系统的开销比较大,zeroMQ对于批量的消息,进行了适应性的优化,可以批量的接收和发送消息。

    3、多核下的线程绑定,无须CPU切换

       区别于传统的多线程并发模式,信号量或者临界区, zeroMQ充分利用多核的优势,每个核绑定运行一个工作者线程,避免多线程之间的CPU切换开销。

    5.4 Kafka

    Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群机来提供实时的消费。

    Kafka是一种高吞吐量的分布式发布订阅消息系统,有如下特性:

    • 通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。(文件追加的方式写入数据,过期的数据定期删除)

    • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息

    • 支持通过Kafka服务器和消费机集群来分区消息

    • 支持Hadoop并行数据加载

     

    Kafka相关概念

    • Broker

    Kafka集群包含一个或多个服务器,这种服务器被称为broker[5]

    • Topic

    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)

    • Partition

    Parition是物理上的概念,每个Topic包含一个或多个Partition.

    • Producer

    负责发布消息到Kafka broker

    • Consumer

    消息消费者,向Kafka broker读取消息的客户端。

    • Consumer Group

    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)。

     

    一般应用在大数据日志处理或对实时性(少量延迟),可靠性(少量丢数据)要求稍低的场景使用。

    六、参考资料

    (1)Jms

    http://blog.sina.com.cn/s/blog_3fba24680100r777.html

    http://blog.csdn.net/jiuqiyuliang/article/details/46701559(深入浅出JMS(一)--JMS基本概念)

    (2)RabbitMQ

    http://baike.baidu.com/link?url=s2cU-QgOsXan7j0AM5qxxlmruz6WEeBQXX-Bbk0O3F5jt9Qts2uYQARxQxl7CBT2SO2NF2VkzX_XZLqU-CTaPa

    http://blog.csdn.net/sun305355024sun/article/details/41913105

    (3)Zero MQ

    http://www.searchtb.com/2012/08/zeromq-primer.html

    http://blog.csdn.net/yangbutao/article/details/8498790

    http://wenku.baidu.com/link?url=yYoiZ_pYPCuUxEsGQvMMleY08bcptZvwF3IMHo2W1i-ti66YXXPpLLJBGXboddwgGBnOehHiUdslFhtz7RGZYkrtMQQ02DV5sv9JFF4LZnK

    (4)Kafka

    http://baike.baidu.com/link?url=qQXyqvPQ1MVrw9WkOGSGEfSX1NHy4unsgc4ezzJwU94SrPuVnrKf2tbm4SllVaN3ArGGxV_N5hw8JTT2-lw4QK

    http://www.infoq.com/cn/articles/apache-kafka/

    http://www.mincoder.com/article/3942.shtml

    展开全文
  • 4. 消息队列

    千次阅读 2020-05-31 13:47:51
    消息队列 队列又称消息队列,常用于任务间通信的数据结构,可以在任务与任务之间,中断与任务之间传递消息,实现任务接收来自其他任务或中断的不固定长度的消息 任务可从消息队列中读取消消息,当消息队列为空,读取...

    消息队列

    队列又称消息队列,常用于任务间通信的数据结构,可以在任务与任务之间,中断与任务之间传递消息,实现任务接收来自其他任务或中断的不固定长度的消息

    任务可从消息队列中读取消消息,当消息队列为空,读取消息的任务被阻塞,用户可指定任务阻塞任务的时间 xTicksToWait

    如果消息队列为满,向消息队列发送消息的任务会进入阻塞态,同样可指定阻塞时间,超时恢复为就绪态

    在指定阻塞时间内,如果队列为空,任务保持阻塞状态等待队列数据有效;如果等待超时,任务自动恢复就绪态

    不论是否写入,不论是否读取成功,任务或中断程序会收到一个错误码 errQUEUE_FULL

    通过消息队列服务,任务或中断服务例程可将一条,或者多条信息放入消息队列

    一个或多个任务可从消息队列中获取消息,当有多个消息发送到消息队列,将先进入消息队列的消息传给任务

    任务得到的是最先进入消息队列的消息,消息队列支持先进先出原则,也支持后进先出原则

    需要注意,读写速度不一致,消息队列为空,或者满载

    消息队列不仅支持任务与任务之间,也支持中断与队列之间,不过中断消息队列函数不支持超时等待

    运作机制

    创建消息队列时,RTOS会给消息队列分配一块内存空间,内存的大小 = 消息队列控制块大小 + 单个消息空间大小 x 消息队列长度

    在分配内存之后,系统会初始化消息队列,此时消息队列为空

    每个消息队列都与消息空间在同一段连续的内存空间,创建成功后,这些内存就被占用

    只有删除了消息队列后,这段内存才会被释放,创建成功,已经分配给每个消息空间与消息队列的容量无法更改

    每个消息空间可存放不大于消息大小 uxItemSize 的任意类型数据,所有消息队列中的消息空间总数就是消息队列的长度,长度可在消息队列创建时指定

    任务或中断给消息队列发送消息时,如果队列未满,或允许覆盖入队,RTOS会将消息拷贝到消息队列队尾

    否则会根据用户指定的阻塞超时时间进行阻塞,一直不允许入队,则任务保持阻塞状态直到允许队列允许入队

    发送紧急消息的过程与发送普通消息几乎一样,不同的是发送紧急消息时,消息的位置是消息队列队列头,而非是队尾(消息先出)

    这样接收者可优先接收紧急消息,可及时进行消息处理

    阻塞机制

    使用的消息队列一般不是属于某个任务的队列,创建的队列是共用的,每个任务都可对队列进行读写操作

    为了保护每个任务对其进行读写操作的过程,必须要有阻塞机制

    某个任务对消息队列读写操作时,必须保证任务能完成读写操作,不受其他任务的干扰

    保护任务对消息队列的读写操作的过程,这种机制称为阻塞机制

    任务A读队列,当消息队列没有消息,则A有三种可能

    1、任务A继续执行,不会陷入阻塞

    2、任务A阻塞,等待消息,可以指定等待时长,超时则变回就绪态,发送任务返回错误码(有消息就变回就绪态,准备继续执行)

    3、任务A一直阻塞,直到队列中有消息,完成消息队列的读取

    需要注意的是,中断中发送消息是不支持消息等待的,也是允许阻塞的,这会影响系统的实时性

    多个任务因为一个消息队列阻塞,阻塞任务会按照任务的优先级进行排序,优先级高的优先访问队列

    消息队列控制块

    typedef struct QueueDefinition 
    {
    	int8_t *pcHead;					// 消息队列存储区的起始位置,也就是第一个消息的地址
    	int8_t *pcTail;					// 指向内存中消息队列存储区最后一个字节,int8 *类型
    	int8_t *pcWriteTo;				// 指向下一个可用消息空间,可写入消息空间地址
    	union 
    	{
    		int8_t *pcReadFrom;			// 用作队列时,指向最后一个出队的队列项首地址
    		UBaseType_t uxRecursiveCallCount;	// 记录递归互斥量被调用次数
    	} u;
    
    	List_t xTasksWaitingToSend; 	// 等待发送消息列表,挂载列表项,因队列满发送失败进入阻塞态的任务会挂载在该列表,按优先级进行排序
    	List_t xTasksWaitingToReceive;	// 等待读取消息列表,挂载列表项,因队列空读取失败进入阻塞态的任务会挂载在该列表,按优先级进行排序
    	volatile UBaseType_t uxMessagesWaiting;// 记录当前消息队列已用消息个数,若用于信号量,表示已用信号量个数
    	UBaseType_t uxLength;			// 记录消息队列长度,队列深度,可存储消息个数量
    	UBaseType_t uxItemSize;			// 记录单个消息的大小
    	volatile int8_t cRxLock;		// 队列上锁后,从队列接收的消息数目,读取多少;没上锁,则设置为queueUNLOCKED
    	volatile int8_t cTxLock;		// 队列上锁后,发送到队列的消息数目,写入多少;没上锁,则设置为queueUNLOCKED
        								// 成员变量为queueLOCKED_UNMODIFIED ,表示队列上锁
    	#if( ( configSUPPORT_STATIC_ALLOCATION == 1 ) && ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) )
    		uint8_t ucStaticallyAllocated;		// 使用静态存储则设置为pdTRUE
    	#endif
    
    	#if ( configUSE_QUEUE_SETS == 1 )		// 队列结相关宏,其实就是队列控制块指针
    		struct QueueDefinition *pxQueueSetContainer;
    	#endif
    
    	#if ( configUSE_TRACE_FACILITY == 1 )	// 跟踪调试相关宏
    		UBaseType_t uxQueueNumber;
    		uint8_t ucQueueType;
    	#endif
    } xQUEUE;
    typedef xQUEUE Queue_t;
    

    API函数

    创建

    #if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
    	#define xQueueCreate( uxQueueLength, uxItemSize ) \
    			xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
    #endif
    
    QueueHandle_t xQueueCreate( UBaseType_t uxQueueLength,UBaseType_t uxItemSize );
    // uxQueueLength:消息队列的消息个数(能放几个消息)
    // uxItemSize:每个消息的大小,单位字节(8位)
    // 创建成功返回队列的句柄,创建失败返回NULL,失败原因一般是RAM无法分配成功
    

    用于创建一个新的队列,返回可用于访问这个队列的队列句柄(需要我们自行定义句柄,才可以操作消息队列)

    队列句柄就是一个 指向队列数据结构类型 的指针

    队列就是一个数据结构,用于任务间的数据传递

    每创建一个新的队列都需要分配RAM,一部分用于存储队列的状态,剩下部分作为队列消息的存储区域

    xQueueCreate()为动态创建,使用静态创建时,需要事先定义空间进行分配

    #define xQueueCreate( uxQueueLength, uxItemSize ) \
    		xQueueGenericCreate( ( uxQueueLength ), ( uxItemSize ), ( queueQUEUE_TYPE_BASE ) )
    
    /*  ucQueueType表示队列类型
    	queueQUEUE_TYPE_BASE:表示消息队列 。
    	queueQUEUE_TYPE_SET:表示消息队列集合 。
    	queueQUEUE_TYPE_MUTEX:表示互斥量。
    	queueQUEUE_TYPE_COUNTING_SEMAPHORE:表示计数信号量 。
    	queueQUEUE_TYPE_BINARY_SEMAPHORE:表示二进制信号量 。
    	queueQUEUE_TYPE_RECURSIVE_MUTEX :表示递归互斥量。*/
    
    QueueHandle_t xQueueGenericCreate( 	const UBaseType_t uxQueueLength,
                                      	const UBaseType_t uxItemSize,
                                      	const uint8_t ucQueueType )
    {
    	Queue_t *pxNewQueue;
    	size_t xQueueSizeInBytes;									// 消息队列消息长度(个数)
    	uint8_t *pucQueueStorage;									// 消息队列消息起始地址
    
    	configASSERT( uxQueueLength > ( UBaseType_t ) 0 );			// 判定uxQueueLength队列消息长度是否为0
    	if ( uxItemSize == ( UBaseType_t ) 0 )                      // 消息长度0则不需要分配空间消息空间
    	{
    	     xQueueSizeInBytes = ( size_t ) 0;						// 不必给消息分配空间,只需要给控制块分配空间
    	} 
    	else                                                                        
    	{
    	     xQueueSizeInBytes = ( size_t ) ( uxQueueLength * uxItemSize );	// 消息空间大小 = 单个消息大小 * 消息个数 
    	}
    																// 申请内存,大小 = 消息队列控制块 + 消息空间
    	pxNewQueue = (Queue_t*)pvPortMalloc(sizeof(Queue_t) + xQueueSizeInBytes);
    	if ( pxNewQueue != NULL ) 									// 内存分配成功
    	{
    		// 计算消息队列消息起始地址 = 申请的地址 + 消息队列控制块,也就是消息队列开头是  |控制块|消息空间| 
    		pucQueueStorage = ( ( uint8_t * ) pxNewQueue ) + sizeof( Queue_t );
    
    		#if( configSUPPORT_STATIC_ALLOCATION == 1 )
    			pxNewQueue->ucStaticallyAllocated = pdFALSE;		// 静态分配内存设置为pdTRUE,动态配置为pdFALSE
    		#endif
    
    		prvInitialiseNewQueue(	uxQueueLength,					// 初始化消息队列控制块函数,消息个数
    								uxItemSize,						// 单个消息大小
    								pucQueueStorage,				// 消息存储空间首地址
    								ucQueueType,					// 消息队列类型
    								pxNewQueue );					// 控制块,已经分配空间
    	}
    	return pxNewQueue;											// 最终返回的句柄,是指向消息存储空间的指针 */
    }
    #endif
    
    static void prvInitialiseNewQueue(	const UBaseType_t uxQueueLength,	// 消息队列消息个数
    									const UBaseType_t uxItemSize,		// 消息队列单个消息大小
    									uint8_t *pucQueueStorage,			// 存储消息起始地址
    									const uint8_t ucQueueType,			// 消息队列类型
    									Queue_t *pxNewQueue )				// 消息队列控制块
    {
    	( void ) ucQueueType;
    	if ( uxItemSize == ( UBaseType_t ) 0 )					// 单个消息大小为0,pcHead直接指向队列控制块
    	{
    	     pxNewQueue->pcHead = ( int8_t * ) pxNewQueue;		// pcHead指向队列控制块,仅在队列用于互斥量,才能为NULL
    	} 
    	else 
    	{
    	     pxNewQueue->pcHead = ( int8_t * ) pucQueueStorage;	// pcHead指向申请空间中,消息存储的位置 |控制器|消息
    	}
    	pxNewQueue->uxLength = uxQueueLength;   				// 在控制块中记录消息长度
    	pxNewQueue->uxItemSize = uxItemSize;    				// 在控制块中记录单个消息大小
    	( void ) xQueueGenericReset( pxNewQueue, pdTRUE );		// 重置消息队列,重新设置某些参数
    
    	#if ( configUSE_TRACE_FACILITY == 1 )
    	      pxNewQueue->ucQueueType = ucQueueType;			// 记录消息队列类型
    	#endif
    
    	#if( configUSE_QUEUE_SETS == 1 )
    	     pxNewQueue->pxQueueSetContainer = NULL;			// 消息队列集先关字段
    	#endif
    	traceQUEUE_CREATE( pxNewQueue );
    }
    
    BaseType_t xQueueGenericReset( QueueHandle_t xQueue,BaseType_t xNewQueue )
    {
    	Queue_t * const pxQueue = ( Queue_t * ) xQueue;			// 获取队列控制块指针,以便操作队列
    	configASSERT( pxQueue );								// 断言函数
    
    	taskENTER_CRITICAL();   								// 临界段屏蔽代码
    	{                                						// 尾指针 = 队列控制块地址 + 队列长度 * 单个消息大小 
    	     pxQueue->pcTail = pxQueue->pcHead + ( pxQueue->uxLength * pxQueue->uxItemSize );
    	     pxQueue->uxMessagesWaiting = ( UBaseType_t ) 0U;	// 记录当前使用消息的数量
    	     pxQueue->pcWriteTo = pxQueue->pcHead;				// 下一个可写入消息地址,写入到这个地址
    		/* 下一可读消息地址 = 队列存储空间首地址 + (队列长度 - 1)* 单个消息大小,读取最后一个消息 */
    	     pxQueue->u.pcReadFrom = pxQueue->pcHead + (( pxQueue->uxLength - ( UBaseType_t ) 1U ) * pxQueue->uxItemSize );
    	     pxQueue->cRxLock = queueUNLOCKED; 					// 消息队列未上锁
    	     pxQueue->cTxLock = queueUNLOCKED;
    	     if ( xNewQueue == pdFALSE ) 						// 不是新建的消息队列,可能消息队列阻塞了任务,需解除阻塞
    	     {													// 读取等待发送消息任务列表是否为空,判定有发送任务阻塞否
    	           if ( listLIST_IS_EMPTY ( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) 
    	           {											// 将任务从等待发送消息列表中清除
    	                 if ( xTaskRemoveFromEventList ( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) 
    	                 {
    	                      queueYIELD_IF_USING_PREEMPTION();	// 上下文切换
    	                 } 
    	                 else 
    	                 {
    	                      mtCOVERAGE_TEST_MARKER();
    	                 }
    	           } 
    	           else 										// 如果读取消息任务被阻塞,重置后消息队列为空,无需被恢复
    	           {
    	                 mtCOVERAGE_TEST_MARKER();
    	           }
    	     } 
    	     else 												// 消息队列是新建的
    	     {													// 是新建的消息队列直接初始化等待发送列表,初始化等待接收/发送列表
    	           vListInitialise( &( pxQueue->xTasksWaitingToSend ) );
    	           vListInitialise( &( pxQueue->xTasksWaitingToReceive ) );
    	     }
    	}
    	taskEXIT_CRITICAL();									// 退出临界段
    	return pdPASS;
    }
    

    创建消息队列时,需要用户自行定义消息队列的句柄

    定义了队列的句柄并不等于创建队列,创建队列必须调用消息队列创建函数进行创建,否则根据句柄使用消息队列会发生错误

    通过句柄可使用消息队列进行发送与读取消息队列操作,如果返回NULL表示创建失败

    QueueHandle_t Test_Queue =NULL;
    
    #define QUEUE_LEN 4 				// 队列的长度,最大可包含多少个消息
    #define QUEUE_SIZE 4 				// 队列中每个消息大小(字节)
    
    BaseType_t xReturn = pdPASS;		// 定义一个创建信息返回值,默认为 pdPASS
    taskENTER_CRITICAL(); 				//进入临界区
    									// 创建 Test_Queue
    Test_Queue = xQueueCreate(	(UBaseType_t ) QUEUE_LEN,	// 消息队列的长度
    							(UBaseType_t ) QUEUE_SIZE);	// 消息的大小
    if (NULL != Test_Queue)
    	printf("创建 Test_Queue 消息队列成功!\r\n");
    taskEXIT_CRITICAL(); 				//退出临界区
    

    创建成功则返回队列的句柄,通过这个句柄可以对消息队列进行读写操作

    创建不成功则一定是RAM分配不成功的问题,返回值为NULL

    #define xQueueCreateStatic( uxQueueLength, uxItemSize, pucQueueStorage, pxQueueBuffer ) \
    		xQueueGenericCreateStatic(uxQueueLength,uxItemSize,pucQueueStorage,pxQueueBuffer,queueQUEUE_TYPE_BASE)
    
    QueueHandle_t xQueueCreateStatic(	UBaseType_t uxQueueLength,		// 消息队列消息个数
    									UBaseType_t uxItemSize,			// 消息队列单个消息大小
    									uint8_t *pucQueueStorageBuffer,	// 分配给消息队列的静态内存
    									StaticQueue_t *pxQueueBuffer );	// 用于存储队列的指针
    
    QueueHandle_t xQueueGenericCreateStatic(	const UBaseType_t uxQueueLength,
                                            	const UBaseType_t uxItemSize, 
                                            	uint8_t *pucQueueStorage, 
                                            	StaticQueue_t *pxStaticQueue, 
                                            	const uint8_t ucQueueType )
    	{
    		Queue_t *pxNewQueue;
    		configASSERT( uxQueueLength > ( UBaseType_t ) 0 );			// 判定消息队列消息长度
    		configASSERT( pxStaticQueue != NULL );						// 判定静态内存是否为空
    
    		configASSERT( !( ( pucQueueStorage != NULL ) && ( uxItemSize == 0 ) ) );
    		configASSERT( !( ( pucQueueStorage == NULL ) && ( uxItemSize != 0 ) ) );
    
    		#if( configASSERT_DEFINED == 1 )
    		{
    			volatile size_t xSize = sizeof( StaticQueue_t );		// 根据静态内存分配消息队列控制块的内存
    			configASSERT( xSize == sizeof( Queue_t ) );
    		}
    		#endif
    		pxNewQueue = ( Queue_t * ) pxStaticQueue;					// 消息队列控制块
    
    		if( pxNewQueue != NULL )
    		{
    			#if( configSUPPORT_DYNAMIC_ALLOCATION == 1 )
    			{
    				pxNewQueue->ucStaticallyAllocated = pdTRUE;			// 静态分配内存
    			}
    			#endif													// 初始化队列控制块
    			prvInitialiseNewQueue( uxQueueLength, uxItemSize, pucQueueStorage, ucQueueType, pxNewQueue );
    		}
    		else
    		{
    			traceQUEUE_CREATE_FAILED( ucQueueType );
    		}
    		return pxNewQueue;
    	}
    

    创建成功,返回的是消息队列的句柄

    消息队列的句柄其实是指向队列的指针

    每创建一个队列,都要为队列分配RAM,一部分存储消息队列的队列控制块,一部分用于存储消息队列

    删除

    通过消息队列句柄,清空消息队列控制块,然后这块内存被释放,不能再被使用消息队列

    删除后消息队列的所有信息都会被系统回收清空,并且不能再次使用这个消息队列

    如果消息队列没有被创建,则无法被删除

    vQueueDelete()可用于删除消息队列,也可用于删除信号量

    删除消息队列时,如果有任务正在等待消息,则不应该进行删除操作(并不禁止,只是自己不要操作)

    #define QUEUE_LENGTH 5
    #define QUEUE_ITEM_SIZE 4
    
    int main( void )
    {
    	QueueHandle_t xQueue;
    	xQueue = xQueueCreate( QUEUE_LENGTH, QUEUE_ITEM_SIZE );	// 创建消息队列
    	if ( xQueue == NULL ) 
    	{
    									// 消息队列创建失败
    	}
    	else 
    	{
    		vQueueDelete( xQueue );		// 删除已创建的消息队列
    	}
    }
    
    void vQueueDelete( QueueHandle_t xQueue )
    {
    	Queue_t * const pxQueue = ( Queue_t * ) xQueue;		// 消息队列控制块,通过此控制消息队列,const避免修改
    
    	configASSERT( pxQueue );
    	traceQUEUE_DELETE( pxQueue );
    
    	#if ( configQUEUE_REGISTRY_SIZE > 0 )
    	{
    		vQueueUnregisterQueue( pxQueue );				// 将消息队列从注册表中删除
    	}
    	#endif
    
    	#if( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 0 ) )
    	{													// 只支持动态分配内存
    		vPortFree( pxQueue );							// 因为使用动态分配内存,需要free函数释放空间
    	}
    	#elif( ( configSUPPORT_DYNAMIC_ALLOCATION == 1 ) && ( configSUPPORT_STATIC_ALLOCATION == 1 ) )
    	{													// 静态或动态都支持
    		if( pxQueue->ucStaticallyAllocated == ( uint8_t ) pdFALSE )	// 如果是动态分配,会调用释放内存函数
    		{
    			vPortFree( pxQueue );
    		}
    		else
    		{
    			mtCOVERAGE_TEST_MARKER();
    		}
    	}
    	#else
    	{
    		( void ) pxQueue;
    	}
    	#endif
    }
    

    发送

    任务或中断都可以给消息队列发送消息

    发送消息时,队列未满或允许覆盖入队,RTOS会将消息拷贝到消息队列队尾巴,否则会进入阻塞

    如果队列一直不允许入队,任务会保持阻塞直到允许入队,或者阻塞时间到达系统恢复阻塞态,函数返回错误信息

    发送紧急消息过程与发送普通消息一致,不同的是发送紧急消息,消息的位置在消息队列队头而非队尾

    xQueueSend()用于向队列队尾发送一个队列消息

    消息以拷贝的形式入队,而不是传入地址通过指针引用

    在中断中不允许使用api函数,必须使用带中断保护的的api函数,xQueueSendFromISR()

    当队列上锁后,队列向可加入或者溢出,但是事件列表中两个列表(WwaiToSend/Rec)的任务事件列表不会变化(操作不改变)

    #define xQueueSend( xQueue, pvItemToQueue, xTicksToWait ) \
       	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_BACK )	// 普通发送,发送到队列尾
    #define xQueueSendToBack( xQueue, pvItemToQueue, xTicksToWait ) \
       	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_BACK )	// 普通发送,发送到队列尾
    #define xQueueSendToFront( xQueue, pvItemToQueue, xTicksToWait ) \
       	xQueueGenericSend( ( xQueue ),( pvItemToQueue ),( xTicksToWait ), queueSEND_TO_FRONT )	// 紧急发送,发送到队列首
    #define xQueueOverwrite( xQueue, pvItemToQueue )\
       	xQueueGenericSend( ( xQueue ), ( pvItemToQueue ), 0, queueOVERWRITE )					// 覆盖发送,替换指向位置
       // xQueue:队列句柄
       // pvItemToQueue:指向要发送到队列尾部的消息
       // xTicksToWait:队列为满时,任务等待消息队列的最大超时时间。如果队列已满,等待时间为0,函数立刻返回
       // queueSEND_TO_BACK:表示发送到队列尾
       // queueSEND_TO_FRONT:表示发送到队列队首
       // 消息发送成功则返回 pdTrue,消息发送失败且任务阻塞时间到达则返回 errQUEUE_FULL
    
    #define xQueueSendToFrontFromISR(xQueue,pvItemToQueue,pxHigherPriorityTaskWoken) \
       	xQueueGenericSendFromISR( ( xQueue ),( pvItemToQueue ),( pxHigherPriorityTaskWoken ), queueSEND_TO_FRONT )
    #define xQueueSendToBackFromISR(xQueue,pvItemToQueue,pxHigherPriorityTaskWoken) \
       	xQueueGenericSendFromISR( ( xQueue ),( pvItemToQueue ),( pxHigherPriorityTaskWoken ), queueSEND_TO_BACK )
       // pxHigherPriorityTaskWoken:
       // 如果中断中消息入队导致一个任务解锁,解锁的任务优先级高于之前被中断的任务,则需要设定为pdTRUE
       // 在中断退出前需要进行一次上下文切换,执行被唤醒的优先级更高的任务,待高优先级任务执行完毕再执行之前被中断的任务
    
    void vBufferISR( void )									// 中断处理函数
    {
    	char cIn;
    	BaseType_t xHigherPriorityTaskWoken;				// 定义变量pxHigherPriorityTaskWoken
    	xHigherPriorityTaskWoken = pdFALSE;					// 不会导致高优先任务被唤醒
    	/* 直到缓冲区为空 */
    	do 
        {
    		cIn = portINPUT_BYTE( RX_REGISTER_ADDRESS );	// 从缓冲区获取一个字节的数据
    		xQueueSendFromISR( xRxQueue, &cIn, &xHigherPriorityTaskWoken );	// 发送这个数据到消息队列中
    	}
        while ( portINPUT_BYTE( BUFFER_COUNT ) );			// 消息队列读空之后,此时会导致任务切换
    	if ( xHigherPriorityTaskWoken )						// 如果导致任务切换
        {
    		taskYIELD_FROM_ISR ();							// 上下文切换,这是一个宏,不同的处理器,具体的方法不一样
    	}
    }
    

    阻塞时间不为0,任务因等待入队进入阻塞,在将任务设置为阻塞的过程中,不允许其他任务或中断操作对应列表,这回导致其他任务解除阻塞,导致任务优先级翻转

    BaseType_t xQueueGenericSend(	QueueHandle_t xQueue,				// 消息队列句柄,通过句柄控制消息队列
    								const void * const pvItemToQueue,	// 要发送的消息
    								TickType_t xTicksToWait,			// 写入消息队列最大阻塞时间
    								const BaseType_t xCopyPosition )	// 发送到消息队列位置,queueOVERWRITE以覆盖方式写入
    {
    	BaseType_t xEntryTimeSet = pdFALSE, xYieldRequired;				// 定义变量
    	TimeOut_t xTimeOut;
    	Queue_t * const pxQueue = ( Queue_t * ) xQueue;					// 定义常量句柄防止被修改
    
    	for ( ;; )
    	{
    		taskENTER_CRITICAL();										// 进入临界区
    		{															// 该情况为可写情况
    			if ( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength )|| ( xCopyPosition == queueOVERWRITE ) )
    			{														// 已用消息数量没有达到消息队列深度;覆盖写入方式入,都允许写入
    				traceQUEUE_SEND( pxQueue );
    				xYieldRequired = prvCopyDataToQueue( pxQueue, pvItemToQueue, xCopyPosition );		// 拷贝消息到消息队列中
    				// queueSEND_TO_BACK写入 pcWriteTo , queueSEND_TO_FRONT 或 queueOVERWRITE 拷贝到 u.pcReadFrom (紧急消息)
    				if ( listLIST_IS_EMPTY(&(pxQueue->xTasksWaitingToReceive))==pdFALSE)				// 查询的等待消息列表是否为空
    				{// 现在有消息了,将任务从等待接收列表中xTasksWaitingToReceive删除,添加到就绪列表中。
    					if ( xTaskRemoveFromEventList(&( pxQueue->xTasksWaitingToReceive ) )!=pdFALSE)	// 任务从等待接收消息列表删除
    					{
    						queueYIELD_IF_USING_PREEMPTION(); 	// 进行任务切换,是否有高优先级任务切换 
    					}
    					else									// 等待接收消息列表删除失败
    					{
    						mtCOVERAGE_TEST_MARKER();
    					}
    				}
    				else if ( xYieldRequired != pdFALSE )		// 消息拷贝成功
    				{
    					queueYIELD_IF_USING_PREEMPTION();		// 任务切换
    				}
    				else										// 消息拷贝失败
    				{
    					mtCOVERAGE_TEST_MARKER();
    				}
    	
    				taskEXIT_CRITICAL();
    				return pdPASS;								// 至此消息都拷贝成功
    			}
    			else											// 消息队列已经写满并且不是覆写方式,也就是无法写入
    			{
    				if ( xTicksToWait == ( TickType_t ) 0 )		// 指定阻塞时间为0
    				{
    					taskEXIT_CRITICAL();					// 退出临界保护区
    					traceQUEUE_SEND_FAILED( pxQueue );
    					return errQUEUE_FULL;					// 直接退出,返回错误信息
    				}
    				else if ( xEntryTimeSet == pdFALSE )		// 队列已满,指定了阻塞时间
    				{
    					/* 设置事件结构体,记录进入阻塞的时间 xTickCount 和溢出次数 xNumOfOverflows */
    					vTaskSetTimeOutState( &xTimeOut );
    					xEntryTimeSet = pdTRUE;
    				}
    				else
    				{
    					mtCOVERAGE_TEST_MARKER();
    				}
    			}
    		}													// 至此消息都拷贝失败,记录了开始阻塞的时间
    		taskEXIT_CRITICAL();								// 退出临界保护区
    		vTaskSuspendAll();									// 关闭调度器,挂所有任务(任务要进入阻塞)
    		prvLockQueue( pxQueue );							// 锁住消息队列
    		if (xTaskCheckForTimeOut(&xTimeOut, &xTicksToWait)==pdFALSE)	// 根据时间结构体,检测超时时间是否溢出
    		{													// 超时时间没有溢出
    			if ( prvIsQueueFull( pxQueue ) != pdFALSE )		// 检测消息队列是否为满
    			{
    				traceBLOCKING_ON_QUEUE_SEND( pxQueue );		// 消息队列为满,根据设定的阻塞时间阻塞任务
    															// 当前任务的 EventList挂载在 xTasksWaitingToSend下
                    											// 根据延时的时间,将任务 xStateList 挂载在 xTasksWaitingToSend下
    				vTaskPlaceOnEventList(&( pxQueue->xTasksWaitingToSend ), xTicksToWait );
    				prvUnlockQueue( pxQueue );					// 消息队列解锁
    	
    				if ( xTaskResumeAll() == pdFALSE )			// 恢复调度器
    				{
    					portYIELD_WITHIN_API();					// 
    				}
    			}
    			else											// 消息队列没有满,消息队列有空闲消息
    			{
    				prvUnlockQueue( pxQueue );					// 消息队列解锁
    				( void ) xTaskResumeAll();					// 恢复调度器,所有任务恢复
    			}
    		}
    		else												// 超时时间溢出
    		{
    			prvUnlockQueue( pxQueue );						// 解锁消息队列
    			( void ) xTaskResumeAll();						// 恢复调度器,恢复所有任务
    			traceQUEUE_SEND_FAILED( pxQueue );				// 
    			return errQUEUE_FULL;							// 发送失败返回错误信息
    		}
    	}
    }
    
    BaseType_t xQueueGenericSendFromISR(	QueueHandle_t xQueue,							// 消息队列控制块
       									const void * const pvItemToQueue,				// 发送到队列的信息
       									BaseType_t * const xHigherPriorityTaskWoken,	// 
       									const BaseType_t xCopyPosition )				// 要发送到队列的位置
    {
       BaseType_t xReturn;
       UBaseType_t uxSavedInterruptStatus;
       Queue_t * const pxQueue = ( Queue_t * ) xQueue;
       uxSavedInterruptStatus = portSET_INTERRUPT_MASK_FROM_ISR();
       {
       	if ( ( pxQueue->uxMessagesWaiting < pxQueue->uxLength )|| ( xCopyPosition == queueOVERWRITE ) )
       	{// 已用消息不到消息队列深度 或 写入方式是覆写方式
       		const int8_t cTxLock = pxQueue->cTxLock;
       		traceQUEUE_SEND_FROM_ISR( pxQueue );
       		(void)prvCopyDataToQueue(pxQueue,pvItemToQueue,xCopyPosition );	// 复制消息到指定的消息队列中
    
       		if ( cTxLock == queueUNLOCKED )									// 判断消息队列是否上锁
       		{
       			/* 已删除使用队列集部分代码 */
       			{
       			/* 等待消息接收队列中有任务阻塞 */
       				if ( listLIST_IS_EMPTY(&( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE )
       				{
       					/* 将任务从等待接收队列中删除,恢复为就绪态 */
       					if ( xTaskRemoveFromEventList(&( pxQueue->xTasksWaitingToReceive )) != pdFALSE )
       					{
       						if ( pxHigherPriorityTaskWoken != NULL )
       						{
       							/* 解除阻塞任务优先级比当前任务高,记录上下文切换请求,返回中断服务程序后(退出中断前),进行上下文切换 */
       							*pxHigherPriorityTaskWoken = pdTRUE;
       						}
       						else
       						{
       							mtCOVERAGE_TEST_MARKER();
       						}
       					}
       					else
       					{
       						mtCOVERAGE_TEST_MARKER();
       					}
       				}
       				else
       				{
       					mtCOVERAGE_TEST_MARKER();
       				}
       			}
       		}
       		else
       		{/* 队列上锁,记录上锁次数,等到任务解除队列锁时,使用这个计录数就可以知道有多少数据入队 */
       			pxQueue->cTxLock = ( int8_t ) ( cTxLock + 1 );
       		}
       		xReturn = pdPASS;	// 写入成功,后续返回
       	}
       	else					// 队列无法写入
       	{
       		/* 队列是满的,因为 API 执行的上下文环境是中断,所以不能阻塞,直接返回队列已满错误代码 errQUEUE_FULL */
       		traceQUEUE_SEND_FROM_ISR_FAILED( pxQueue );
       		xReturn = errQUEUE_FULL;
       	}
       }
       portCLEAR_INTERRUPT_MASK_FROM_ISR( uxSavedInterruptStatus );
       return xReturn;
    }
    

    读消息

    任务从队列中读消息,可以指定阻塞超时时间,当消息队列中有效细,任务才会读取消息。

    当消息过于庞大的时候,可以将消息的地址作为消息进行发送,任务通过地址来进行读取。

    #define xQueueReceive( xQueue, pvBuffer, xTicksToWait) \
    		xQueueGenericReceive( xQueue, pvBuffer, xTicksToWait, pdFALSE)	// 读取后删除队列项
    #define xQueuePeek( xQueue, pvBuffer, xTicksToWait) \
    		xQueueGenericReceive( xQueue, pvBuffer, xTicksToWait, pdTRUE)	// 读取后不删除队列项
    // xQueue:队列句柄,通过句柄操作消息队列
    // pvBuffer:要发送消息的地址
    // xTicksToWait:最大阻塞时间
    // xJustPeek: 标记当读取成功以后是否删除掉队列项, pdTRUE为不删除,pdFLASE为删除
    
    BaseType_t xQueueGenericReceive( QueueHandle_t xQueue, void * const pvBuffer, TickType_t xTicksToWait, const BaseType_t xJustPeeking )
    {
    	BaseType_t xEntryTimeSet = pdFALSE;
    	TimeOut_t xTimeOut;
    	int8_t *pcOriginalReadPosition;
    	Queue_t * const pxQueue = ( Queue_t * ) xQueue;
    	for( ;; )
    	{
    		taskENTER_CRITICAL();
    		{
    			const UBaseType_t uxMessagesWaiting = pxQueue->uxMessagesWaiting;	// 已使用消息数量
    			if( uxMessagesWaiting > ( UBaseType_t ) 0 ) 						// 消息已被使用
    			{
    				pcOriginalReadPosition = pxQueue->u.pcReadFrom; 				// 获取初始读取位置并记录
    
    				prvCopyDataFromQueue( pxQueue, pvBuffer ); 						// 从队列中拷贝数据到数据区
    				if( xJustPeeking == pdFALSE ) 									// 是否删除消息
    				{
    					traceQUEUE_RECEIVE( pxQueue );
    					pxQueue->uxMessagesWaiting = uxMessagesWaiting - 1;			// pfFALSE会删除消息,消息数量减一
    					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToSend ) ) == pdFALSE ) // 有任务因队列满无法发送阻塞
    					{
    						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToSend ) ) != pdFALSE ) 
    						// 将任务从等待发送消息列表中删除,并主动进行消息调度
    						{
    							queueYIELD_IF_USING_PREEMPTION();
    						}
    						else
    						{
    						mtCOVERAGE_TEST_MARKER();
    						}
    					}
    					else
    					{
    						mtCOVERAGE_TEST_MARKER();
    					}
    				}
    				else															// 消息不需要删除
    				{
    					traceQUEUE_PEEK( pxQueue );
    					pxQueue->u.pcReadFrom = pcOriginalReadPosition; 			//拷贝数据后,读指针恢复为原始状态
    					if( listLIST_IS_EMPTY( &( pxQueue->xTasksWaitingToReceive ) ) == pdFALSE ) 
    					// 由于数据并没有被删除,所以如果有任务还在请求队列数据的,仍然可以拿数据,查看是否还有等待这个消息的任务
    					{
    						if( xTaskRemoveFromEventList( &( pxQueue->xTasksWaitingToReceive ) ) != pdFALSE ) 
    						// 从对应事件表或状态表删除并加入就绪表或挂起就绪表,并酌情调度,有tickless的系统,还要刷新最新任务解锁时间
    						{
    							/* The task waiting has a higher priority than this task. */
    							queueYIELD_IF_USING_PREEMPTION();
    						}
    						else
    						{
    							mtCOVERAGE_TEST_MARKER();
    						}
    					}
    					else
    					{
    						mtCOVERAGE_TEST_MARKER();
    					}
    				}
    				taskEXIT_CRITICAL();
    				return pdPASS;							// 至此消息读取成功,返回成功的消息
    			}
    			else 										// 没有有效消息
    			{
    				if( xTicksToWait == ( TickType_t ) 0 )	// 不设置超时,直接返回队列空错误
    				{
    					taskEXIT_CRITICAL();				// 退出临界区
    					traceQUEUE_RECEIVE_FAILED( pxQueue );
    					return errQUEUE_EMPTY;
    				}
    				else if( xEntryTimeSet == pdFALSE )		// 如果设置阻塞时间,记录阻塞时间与溢出次数
    				{
    					vTaskSetTimeOutState( &xTimeOut ); //记录当前系统节拍溢出次数和当前节拍数
    					xEntryTimeSet = pdTRUE;
    				}
    				else
    				{
    					mtCOVERAGE_TEST_MARKER();		/* Entry time was already set. */
    				}
    			}
    		}
    		taskEXIT_CRITICAL();						// 退出临界段,至此消息没有发送成功,从此开始检测阻塞
    		vTaskSuspendAll(); 							// 挂起调度
    		prvLockQueue( pxQueue ); 					// 开读写事务锁
    		/* Update the timeout state to see if it has expired yet. */
    		if( xTaskCheckForTimeOut( &xTimeOut, &xTicksToWait ) == pdFALSE ) 	// 检查是否超时
    		{
    			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )						// 没有超时且队列空
    			{
    				traceBLOCKING_ON_QUEUE_RECEIVE( pxQueue );
    				//按优先级顺序向等待接收表中插入任务控制块的事件表项,并将当前任务从就绪表移除,挂入延时表,更新最新任务解锁时间
    				vTaskPlaceOnEventList( &( pxQueue->xTasksWaitingToReceive ), xTicksToWait );
    				prvUnlockQueue( pxQueue ); 									// 解锁读写事务锁
    				if( xTaskResumeAll() == pdFALSE )
    				{
    					portYIELD_WITHIN_API();
    				}
    				else
    				{
    					mtCOVERAGE_TEST_MARKER();
    				}
    			}															// 在超时时间内检测到队列中有消息
    			else
    			{
    				prvUnlockQueue( pxQueue );
    				( void ) xTaskResumeAll();
    			}
    		}
    		else															// 阻塞时间超时,直接退出
    		{
    			prvUnlockQueue( pxQueue );
    			( void ) xTaskResumeAll();
    			if( prvIsQueueEmpty( pxQueue ) != pdFALSE )
    			{
    				traceQUEUE_RECEIVE_FAILED( pxQueue );
    				return errQUEUE_EMPTY;
    			}
    			else
    			{
    				mtCOVERAGE_TEST_MARKER();
    			}
    		}
    	}
    }
    
    展开全文
  • 消息队列

    2020-11-26 14:56:07
    消息队列 消息 Message 网络中的两台计算机或者两个通讯设备之间传递的数据。例如说:文本、音乐、视频等内容。 队列 Queue 一种特殊的线性表(数据元素首尾相接),特殊之处在于只允许在首部删除元素和在尾部...
  • 引言为什么写这篇文章?博主有两位朋友分别是小A和小B:小A,工作于传统软件行业(某社保局的软件外包公司),每天工作内容就是和产品聊聊需求,改改业务逻辑。...然而,他只会订阅/发布消息。通俗点说,就是调调AP...
  • 【转】消息队列

    2020-11-21 16:08:12
    消息队列不知道大家看到这个词的时候,会不会觉得它是一个比较高端的技术,反正我是觉得它好像是挺牛逼的。 消息队列,一般我们会简称它为MQ(Message Queue),嗯,就是很直白的简写。 我们先不管消息(Message)这个...
  • 常见消息队列对比

    千次阅读 2019-05-27 13:58:32
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...
  • Java架构之消息队列 (一):消息队列的概述

    万次阅读 多人点赞 2018-10-08 16:09:38
    消息队列系列分享大纲:  一、消息队列的概述 二、消息队列之RabbitMQ的使用 三、消息队列之Kafka的使用 四、消息队列之RabbitMQ的原理详解 五、消息队列之Kafka的原理详解 六、消息队列之面试集锦 1.消息...
  • 什么是消息队列(Message queue)

    千次阅读 2018-04-13 21:47:05
    最近接触到消息中间件,想通过本篇博文总结一些关于消息队列的知识以及 Java 语言中面向消息中间件(MOM)的API(JMS),希望能够帮助大家更好的理解消息中间件。 一、消息队列概述 消息队列(英语:Message queue)...
  • 分布式事务、重复消费、顺序消费这些大厂常见问题,我觉得你需要了解一下
  • 消息队列MQ的使用流程

    万次阅读 多人点赞 2018-09-19 19:21:21
    在大型平台的分布式项目中,消息队列MQ具有重要的作用,经常用在边缘业务功能的处理中,比如日志管理【下面将以Bug日志保存为例】,因为像日志保存、新用户注册发送邮件等操作都不是主干业务,可以放在消息队列异步...
  • 敖丙大佬面试视屏学习(一)

    万次阅读 多人点赞 2020-03-24 16:29:41
    敖丙面试一个大厂一年经验的程序员 看完之后感觉 自己很多方面的积累还不如人家一个一年经验的人 尤其是今天学习 学的好累 感觉自己要炸裂了 感觉不会的太多了 没想到一个视频给我看清醒了 还是要积累和学习 加油 ...
  • 云大使大咖秀系列已经为大家放送一个月时间的...今天为大家带来一位推广龄仅一月多,却坐收几万佣金的小白大使—敖丙 这位96年的水瓶座小哥哥在今年的双十一活动中正式加入云大使组织,小哥哥说成为云大使的时候活动...
  • 蘑菇街的年会,我没想到全是一群高颜值的小姐姐,OMG
  • 1 为什么要使用消息队列? 回答:这个问题,咱只答三个最主要的应用场景(不可否认还有其他的,但是只答三个主要的),即以下六个字: (1)解耦 传统模式: 传统模式的缺点:系统间耦合性太强,如上图所示,系统A在代码中...
  • 1、消息队列(以下简称MQ)天生就是处理高并发的有力工具,因为他可以把一个完整的流程拆为多部分,并发进行,或者不是很重要的步骤模块延迟进行。大家所熟悉的是消息队列在大基数用户项目的注册模块和电商项目的...
  • 敖丙大神的非科班Java学习路线

    千次阅读 2020-03-20 09:57:32
    你们也知道丙丙一直都是创作鬼才来的,所以我肯定不会一本正经的写,我想了好几个切入点,最后决定用一个完整的电商系统作为切入点,带着大家看看,我们需要学些啥,我甚至还收集配套视频和资料,暖男石锤啊,这期是...
  • Java笔试面试-消息队列面试题总结

    万次阅读 多人点赞 2019-09-26 15:10:12
    1.消息队列的应用场景有哪些? 答:消息队列的应用场景如下。 应用解耦,比如,用户下单后,订单系统需要通知库存系统,假如库存系统无法访问,则订单减库存将失败,从而导致订单失败。订单系统与库存系统耦合,这...
  • Java常用消息队列原理介绍及性能对比

    万次阅读 多人点赞 2017-11-27 20:28:12
    消息队列使用场景为什么会需要消息队列(MQ)? 解耦 在项目启动之初来预测将来项目会碰到什么需求,是极其困难的。消息系统在处理过程中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。...
  • FreeRTOS消息队列

    千次阅读 2019-01-31 17:55:04
    问题解答 曾经有人问我,FreeRTOS那么多API,到底怎么记住呢? 我想说,其实API不难记,就是有点难找,因为FreeRTOS的API很多都是带参宏,所以跳来跳去的比较麻烦,而且注释也很多,要找还真不是那么容易,不过也...
  • 消息队列mq的原理及实现方法

    万次阅读 多人点赞 2016-07-18 20:50:09
    消息队列技术是分布式应用间交换信息的一种技术。消息队列可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。通过消息队列,应用程序可独立地执行--它们不需要知道彼此的位置、或在继续执行前不需要等待...
  • [转]Redis作为消息队列与RabbitMQ的性能对比

    万次阅读 热门讨论 2014-06-25 21:32:24
    周末测试了一下RabbitMQ的性能,RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。个人认为,在互联网开发...
  • 消息队列及消息中间件

    千次阅读 2018-08-03 10:08:35
    消息队列 已经逐渐成为企业应用系统 内部通信 的核心手段。它具有 低耦合、可靠投递、广播、流量控制、最终一致性 等一系列功能。 当前使用较多的 消息队列 有 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ...
  • 消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。 消息被发送到队列中。“消息队列”是在...
  • 大型网站架构之分布式消息队列

    万次阅读 多人点赞 2016-01-26 08:48:40
    大型网站架构之分布式消息队列   以下是消息队列以下的大纲,本文主要介绍消息队列概述,消息队列应用场景和消息中间件示例(电商,日志系统)。 本次分享大纲 消息队列概述消息队列应用场景消息中间件示例...
  • Linux进程间通信——使用消息队列

    万次阅读 多人点赞 2013-08-25 00:09:57
    下面来说说如何用不用消息队列来进行进程间的通信,消息队列与命名管道有很多相似之处。 一、什么是消息队列 消息队列提供了一种从一个进程向另一个进程发送一个数据块的方法。 每个数据块都被认为含有一个类型,...
  • 消息队列系列之分布式消息队列Kafka

    万次阅读 2017-12-03 20:00:11
    在这方面,它类似于消​​息队列或企业消息传递系统。它允许您以容错方式存储记录流。它可以让您在发生记录时处理记录流。 什么是卡夫卡好? 它被用于两大类的应用程序: 构建可在系统或应用程序之间可靠获取数据的...
  • 关于消息队列的使用

    万次阅读 2019-03-05 13:58:31
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...

空空如也

1 2 3 4 5 ... 20
收藏数 437,700
精华内容 175,080
关键字:

消息队列