消息队列 订阅
“消息队列”是在消息的传输过程中保存消息的容器。 展开全文
“消息队列”是在消息的传输过程中保存消息的容器。
信息
外文名
Message queue
对    象
两台计算机间
特    点
消息可以非常简单也可以更复杂
含    义
消息的传输过程中保存消息的容器
中文名
消息队列
消息队列消息简介
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。消息被发送到队列中。“消息队列”是在消息的传输过程中保存消息的容器。消息队列管理器在将消息从它的源中继到它的目标时充当中间人。队列的主要目的是提供路由并保证消息的传递;如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它。
收起全文
精华内容
参与话题
问答
  • 蘑菇街的年会,我没想到全是一群高颜值的小姐姐,OMG

    22:40

    我今天跟歪哥运动完了,早早回到了家里,因为想留半小时给自己去选择明天服装搭配,回家翻箱倒柜,我的衣服大多都是运动风,难登大雅之堂。

    人生再一次到了面临选择和纠结的时候了,这个时候我看到了去年冬天我有的最正式的衣服了。

    是的就是这件原谅绿的大衣,我不想穿西装,虽然我有,但是并不高挑的我穿不出那种帅气。

    今年肯定不会内搭一样颜色的毛衣了,毕竟去年这么穿了之后没多久,我当时的女朋友就跟我分手了,我也不知道是不是上天暗示着我什么,总之我的眼角又湿了…

    那怎么搭配呢,我想有我好友的小伙伴都知道了,格子衬衣,作为一个程序员我还是保留自己最后的倔强吧。

    是的,最后就是这样穿了一下,你们是不是没想到,一个程序员选一件年会的衣服都这么难呢?

    翌日9:00

    这是我们前一日的聊天记录

    是的一年了我们都在小区门口的早餐店吃的早餐,一过去我们不开口,老板娘都知道我们要点两份混沌。

    老板跟我们熟到什么地步,三歪或者我自己一个人吃,老板就会问你那个小伙伴呢,或者谁早去了,老板就会说你小伙伴刚才已经吃了走了,刚走。

    一年了,我们准备吃顿好的,我们来到了平时想都不敢想的早餐店,环境比那好多了,点了葱油拌面和豆浆,真香。

    出小区门口就看到门口开始有大巴驶入了,最后我都跟老大说过多少次了,接我一个人不需要这么铺张浪费真的是,我自己打车也没事啊。

    10:30

    这是我们出发的时间点,因为去会场有很长的时间都不会有吃的,公司热心的准备了补给包,大概有:牛肉干、面包、小吃的、咖啡饮料(可能是怕大家困了吧)

    总之在凉凉冬日,还是比较温暖的举动。

    拿到餐包之后,大家都陆续登车,出发,公司专门为我准备了两个座位,我早就告诉过高层不需要这么铺张浪费的,最后我选择让补给包陪我度过这50分钟的车程。

    我们公司年会在奥体中心那,相当于是从西湖区->萧山区,路上跟小伙伴闲聊时间倒也过得飞快。

    11:50

    我们到达了年会现场,楼下就是G20峰会的场馆,我准备开完年会了下去顺便把首脑会议开了,我是龙族首脑,你呢?

    正门就是我们的蘑菇Logo我是从这个U下面走过去的,不要问我26个字母为啥偏偏选U。

    进去后是红毯和签名墙,我这样低调的人还是不准备留下什么了,其实就是不好意思,因为去签名的,都是身着华服,看着自己的原谅绿大衣,我的眼角,又湿了。

    这就是我们的年会场馆了,因为前半部分都是几大部门的业务总结,所以都是排排坐,开个会,总结过去,展望未来。

    到达之后每个人的位置下面,都有一份伴手礼,有红包和蘑菇的咖啡,后面我才知道,这就是阳光普照555

    开会的内容我就不过多赘述了,因为都是公司自己的东西,所以大家应该也不感兴趣。

    开完会我们现场是要上桌子的,这个时候我们就需要去两边候场了,两边那种拍照区花里花哨的,我觉得没啥东西,后面我会收回这句话的,因为有了妹妹们的点缀,一切都变得不一样了。

    img
    img

    大概就是这样。

    这个时候是不是觉得其实也就一般般,因为没人,后面就大不一样了。

    小姐姐的照片我就不过多赘述了,可以但是没必要,因为实在是太多了,我鼻血到现在还没止住,还有个原因是因为小伙伴的照片嘛,需要征求本人同意我才会放上去,昨天晚上我数了一下,大概592.5个妹妹,我不能全去私聊吧,那多不好意思啊。

    不过节目是真心精彩,我都以为我走到了巴黎时装秀现场,很震撼,这大概也是因为我们公司定位是时尚公司的原因嘛。

    随便放几张图大家感受一下嘛:(文末我会贴上公司年会相册的二维码,有小姐姐哟,也有我,不过要耐心寻找了哈哈)

    所有走秀的都是公司的同学,说真的,比肩专业模特了。

    表演中间都是穿插抽奖和恰饭的,抽奖嘛,前一天晚上我对着镜子一次次练习,自己拿了特等奖之后的获奖感言,但是到最后都没派上用场,将近30台手机一台每中,这都算了,三等奖,二等奖纷纷与我无缘,我的眼角,又湿了…

    晚上酒劲上来,就去跟所有小伙伴喝酒去了,算法工程团队的老伙伴(我现在在数据平台)直接被我全部喝趴下(一个个比我这个贵州人还能喝),我胆子大到跟CEO和COO都喝了一杯,我膨胀了。

    灯红酒绿,觥筹交错之后就是离别了,2019过去了,不开心都在酒里喝下去了,我们要做的就是让2020燥起来。

    回去后还和宗伟、居易他们组了局,看到别的团队妹妹的时候我一次次萌生了转岗的想法,我的眼角再一次…

    我其实很讨厌这样的酒局,真的是,太烦恼了,连个男生都没有,我比较害羞。

    现在是2.57,我睡醒了第一时间来到了公司写文章出来,因为晚上要去朋友的乔迁局,我估计这周末是废了,没时间怼文章了,那还是怼点东西给你们看吧,你们就说吧,是不是暖男来的?

    来到公司看到有个同事在公司,一问他是在等他的女朋友,打扰了,我身边但凡有个女人,至于周末不出去玩,写文章给你们看?

    大家可以大概看一下我们年会整个过程的视频剪辑。

    视频网页放不出,点击原文阅读

    下周有我导演的视频将在我们技术部的年会上播出,可以的话,到时候我给你们看看,还是有点东西的。

    你们要的小姐姐
    你们要的小姐姐

    叫我【丙导】。

    我去500强前端架构家吃饭了,我争取多拍点,这样又能水一篇了哈哈。

    我们下次见!拜拜👋

    点关注,不迷路

    好了各位,以上就是这篇文章的全部内容了,我是敖丙,励志做一名让大家都记得住的博主,能看到这里的人呀,都是人才

    我后面会每周都更新几篇一线互联网大厂面试和常用技术栈相关的文章,非常感谢人才们能看到这里,如果这个文章写得还不错,觉得「敖丙」我有点东西的话 求点赞👍 求关注❤️ 求分享👥 对暖男我来说真的 非常有用!!!

    白嫖不好,创作不易,各位的支持和认可,就是我创作的最大动力,我们下篇文章见!

    敖丙 | 文 【原创】

    如果本篇博客有任何错误,请批评指教,不胜感激 !


    文章每周持续更新,可以微信搜索「 三太子敖丙 」第一时间阅读和催更(比博客早一到两篇哟),本文 GitHub https://github.com/JavaFamily 已经收录,有一线大厂面试点思维导图,也整理了很多我的文档,欢迎Star和完善,大家面试可以参照考点复习,希望我们一起有点东西。

    展开全文
  • 消息队列

    千次阅读 多人点赞 2019-09-19 21:42:59
    消息队列消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高...

    消息队列

    “消息队列”是在消息的传输过程中保存消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。

    消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

    为什么要用消息队列

    (1) 通过异步处理提高系统性能(削峰、减少响应所需时间)
    在这里插入图片描述
    如上图,在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即 返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。

    (2) 降低系统耦合性
    我们知道如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展性无疑更好一些。
    我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。如下图所示:
    在这里插入图片描述
    消息队列使利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅消息。 从上图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送至分布式消息队列即结束对消息的处理,消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

    消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

    另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

    备注: 不要认为消息队列只能利用发布-订阅模式工作,只不过在解耦这个特定业务环境下是使用发布-订阅模式的。除了发布-订阅模式,还有点对点订阅模式(一个消息只有一个消费者),我们比较常用的是发布-订阅模式。 另外,这两种消息模型是 JMS 提供的,AMQP 协议还提供了 5 种消息模型。

    RocketMq

    RocketMq是一个由阿里巴巴开源的消息中间件,脱胎去阿里每部使用的MetaQ,在设计上借鉴了Kafka。2012年开源,2017年成为apache顶级项目

    RocketMQ 是什么?
    在这里插入图片描述
    上图是一个典型的消息中间件收发消息的模型,RocketMQ也是这样的设计,简单说来,RocketMQ具有以下特点:
    是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。
    Producer、Consumer、队列都可以分布式。
    Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer如果做广播消费,则一个consumer实例消费这个Topic对应的所有队列,如果做集群消费,则多个Consumer实例平均消费这个topic对应的队列集合。
    能够保证严格的消息顺序
    提供丰富的消息拉取模式
    高效的订阅者水平扩展能力
    实时的消息订阅机制
    亿级消息堆积能力
    较少的依赖

    RocketMQ 物理部署结构
    在这里插入图片描述
    如上图所示, RocketMQ的部署结构有以下特点:
    Name Server是一个几乎无状态节点,可集群部署,节点之间无任何信息同步。
    Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave的对应关系通过指定相同的BrokerName,不同的BrokerId来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与Name Server集群中的所有节点建立长连接,定时注册Topic信息到所有Name Server。
    Producer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
    Consumer与Name Server集群中的其中一个节点(随机选择)建立长连接,定期从Name Server取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订阅规则由Broker配置决定。

    RocketMQ 逻辑部署结构
    在这里插入图片描述

    如上图所示,RocketMQ的逻辑部署结构有Producer和Consumer两个特点。
    Producer Group
    用来表示一个发送消息应用,一个Producer Group下包含多个Producer实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个Producer对象。一个Producer Group可以发送多个Topic消息,Producer Group作用如下:1.标识一类Producer
    2.可以通过运维工具查询这个发送消息应用下有多个Producer实例
    3.发送分布式事务消息时,如果Producer中途意外宕机,Broker会主动回调Producer Group内的任意一台机器来确认事务状态。
    Consumer Group
    用来表示一个消费消息应用,一个Consumer Group下包含多个Consumer实例,可以是多台机器,也可以是多个进程,或者是一个进程的多个Consumer对象。一个Consumer Group下的多个Consumer以均摊方式消费消息,如果设置为广播方式,那么这个Consumer Group下的每个实例都消费全量数据。

    RocketMQ 数据存储结构
    在这里插入图片描述
    如上图所示,RocketMQ采取了一种数据与索引分离的存储方法。有效降低文件资源、IO资源,内存资源的损耗。即便是阿里这种海量数据,高并发场景也能够有效降低端到端延迟,并具备较强的横向扩展能力。

    JMS和AMQP的区别

    JMS
    JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。

    JMS(JAVA Message Service,Java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

    ActiveMQ 就是基于 JMS 规范实现的。

    JMS两种消息模型

    ①点到点(P2P)模型
    在这里插入图片描述
    使用队列(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

    ② 发布/订阅(Pub/Sub)模型
    在这里插入图片描述
    发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的。

    JMS 五种不同的消息正文格式
    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    • StreamMessage – Java原始值的数据流
    • MapMessage–一套名称-值对
    • TextMessage–一个字符串对象
    • ObjectMessage–一个序列化的 Java对象
    • BytesMessage–一个字节的数据流

    AMQP
    ​ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准 高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

    RabbitMQ 就是基于 AMQP 协议实现的。
    在这里插入图片描述
    总结:

    • AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在 Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
    • JMS 支持TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可序列化后发送)。
    • 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队列 和 主题/订阅 方式两种。

    常见的消息队列对比

    在这里插入图片描述
    总结:

    • ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使用。
    • RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
    • RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且 RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
    • kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

    消息队列应用场景

    1、异步处理
    场景说明:用户注册后,需要发注册邮件和注册短信。传统的做法有两种 1.串行的方式;2.并行方式
    a、串行方式:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。以上三个任务全部完成后,返回给客户端。
    在这里插入图片描述
    b、并行方式:将注册信息写入数据库成功后,发送注册邮件的同时,发送注册短信。以上三个任务完成后,返回给客户端。与串行的差别是,并行的方式可以提高处理的时间
    在这里插入图片描述
    假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。
    因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)
    小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:
    在这里插入图片描述
    按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此用户的响应时间可能是50毫秒。因此架构改变后,系统的吞吐量提高到每秒20 QPS。比串行提高了3倍,比并行提高了两倍。

    2、应用解耦
    场景说明:用户下单后,订单系统需要通知库存系统。传统的做法是,订单系统调用库存系统的接口。如下图:
    在这里插入图片描述
    传统模式的缺点:假如库存系统无法访问,则订单减库存将失败,从而导致订单失败,订单系统与库存系统耦合

    如何解决以上问题呢?引入应用消息队列后的方案,如下图:
    在这里插入图片描述
    订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功
    库存系统:订阅下单的消息,采用拉/推的方式,获取下单信息,库存系统根据下单信息,进行库存操作
    假如:在下单时库存系统不能正常使用。也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了。实现订单系统与库存系统的应用解耦

    3、流量削锋
    流量削锋也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
    应用场景:秒杀活动,一般会因为流量过大,导致流量暴增,应用挂掉。为解决这个问题,一般需要在应用前端加入消息队列。
    a、可以控制活动的人数
    b、可以缓解短时间内高流量压垮应用
    在这里插入图片描述
    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面。
    秒杀业务根据消息队列中的请求信息,再做后续处理

    2.4日志处理
    日志处理是指将消息队列用在日志处理中,比如Kafka的应用,解决大量日志传输的问题。架构简化如下
    在这里插入图片描述
    日志采集客户端,负责日志数据采集,定时写受写入Kafka队列
    Kafka消息队列,负责日志数据的接收,存储和转发
    日志处理应用:订阅并消费kafka队列中的日志数据

    5、消息通讯
    消息通讯是指,消息队列一般都内置了高效的通信机制,因此也可以用在纯的消息通讯。比如实现点对点消息队列,或者聊天室等
    点对点通讯:
    在这里插入图片描述
    客户端A和客户端B使用同一队列,进行消息通讯。

    聊天室通讯:
    在这里插入图片描述
    客户端A,客户端B,客户端N订阅同一主题,进行消息发布和接收。实现类似聊天室效果。
    以上实际是消息队列的两种消息模式,点对点或发布订阅模式。模型为示意图,供参考。

    消息中间件示例
    电商系统
    在这里插入图片描述
    消息队列采用高可用,可持久化的消息中间件。比如Active MQ,Rabbit MQ,Rocket Mq。
    (1)应用将主干逻辑处理完成后,写入消息队列。消息发送是否成功可以开启消息的确认模式。(消息队列返回消息接收成功状态后,应用再返回,这样保障消息的完整性)
    (2)扩展流程(发短信,配送处理)订阅队列消息。采用推或拉的方式获取消息并处理。
    (3)消息将应用解耦的同时,带来了数据一致性问题,可以采用最终一致性方式解决。比如主数据写入数据库,扩展应用根据消息队列,并结合数据库方式实现基于消息队列的后续处理。

    日志收集系统
    在这里插入图片描述
    分为Zookeeper注册中心,日志收集客户端,Kafka集群和Storm集群(OtherApp)四部分组成。
    Zookeeper注册中心,提出负载均衡和地址查找服务
    日志收集客户端,用于采集应用系统的日志,并将数据推送到kafka队列
    Kafka集群:接收,路由,存储,转发等消息处理
    Storm集群:与OtherApp处于同一级别,采用拉的方式消费队列中的数据

    参考文章:
    https://www.jianshu.com/p/36a7775b04ec
    https://blog.csdn.net/HD243608836/article/details/80217591

    展开全文
  • 云大使大咖秀系列已经为大家放送一个月时间的...今天为大家带来一位推广龄仅一月多,却坐收几万佣金的小白大使—敖丙 这位96年的水瓶座小哥哥在今年的双十一活动中正式加入云大使组织,小哥哥说成为云大使的时候活动...

    云大使大咖秀系列已经为大家放送一个月时间的时间,这期间越来越多的新大使涌入进来,也有很多大使被成功大使的经验鼓舞着激励着,小编在此感谢大家对云大使大咖秀系列的喜爱和支持,未来我们会出品更多的精品文章,也希望更多有经验有想法的大使加入我们。
    在这里插入图片描述
    今天为大家带来一位推广龄仅一月多,却坐收几万佣金的小白大使—敖丙
    这位96年的水瓶座小哥哥在今年的双十一活动中正式加入云大使组织,小哥哥说成为云大使的时候活动已经开始了,虽然失去了活动前期布道的先机,但是小哥哥依然稳准狠的抓住了每一个机会,双十一排位战中小哥哥依然取得了58名不错的好成绩。

    23岁的敖丙来自贵州遵义的小伙子,回忆当初,“在大三学分提前修满的情况下,在某为某阿里系电商任职过(现在在蘑菇街算法工程部门)之前带我的两任师傅分别是,某五百强架构师团队Leader(现在是阿里技术专家)和阿里P8,所以接触的场景的丰富性,涉足领域的专业的深度性也使我有着自己独特的获客渠道和推广方式。”

    对于之前二十几年的人生,敖丙谈及“人生是一个漫长的过程,我们怀有太多的期望,就难免会遭遇失望与挫折”。我惊讶于一个只有23岁的刚毕业的莘莘学子为何会有这么强烈的人生感悟,直到小编了解到他的世界。

    在大学期间敖丙就是那种交不起学费的仔,19年年初,也就是在毕业的半年后,他还完了自己的助学贷款。说起那是,敖丙表示“说实话当时感觉无债一身轻,开始踏上了自己的职业生涯,以为是终点,后来才知道这不过是一年的开始,还有太多太多等着我。“

    工作半年时,敖丙便收获了自己的第一个年终奖,收到年终奖的那一刻,就决定给爸妈换掉卡得不行的手机,买了比当时比自己的手机还要好的苹果8p顶配,不知道为啥给爸妈买啥都不觉得心疼钱,反而还觉的很开心。老妈经常跟他抱怨手机看电视卡,每次他说要买手机的时候都说不喜欢,就像是每次叫她旅游,她说没时间一样,其实敖丙知道是心疼钱,对于父母而言更加明白子女打拼的心酸和不易,小编想大概天下的父母都是这样的吧,子女每一次花在父母身上的开销,他们都觉得这是自己的孩子用多少的加班,奔波,出差换来的。

    程序员的生活不仅是疲惫的,更是日复一日的无休止的加班,敖丙开始在想难道自己要在这样的日子中一直忙碌下去吗,对于这个多彩斑斓,鸟语花香的世界,自己每天只有坐在写字楼的办公位上敲着代码,优化着需求,来获得每月杯水车薪的薪资。于是他开始迎来了自己事业的第二春,今年的双十一大战中,这位对生活有理想有希望的程序员小哥正式加入了云大使的队伍中,凭借着自己在各个渠道开拓的阵地,成功的晋升到了云大使的主力军团中。

    在推广的路上也并非是一一帆风顺,对于投放这个在推广初期最棘手的问题,敖丙通过问身边有经验的朋友,也学习观看了很多的课件资料,得到了很多的思路,现在对这个问题敖丙也有自己的节奏和方向,所以问题来临时首先要想的是如何靠自己的力量去解决而不是一味的逃避或者丢给别人,主动的自发性是成功的最重要因素。对于推广,敖丙谈到个人得到了不错的经济收益,尤其是对于程序员来说云大使真的就是将技术变现发挥到极致的一个项目。除此之外需要服务器的小伙伴也得到了不错的优惠,互利共赢。毕竟上云是大势所趋吧,现在自己的老东家,还有现在的公司都上云了,上云一定是中国未来大中小型公司的必走之路,所以这块的利益是可观的,能利用自己的专业性知识和业余时间去做推广,不仅赚得盆满钵满,也推动了云计算的发展,这种双赢的项目是普惠大众的。
    敖丙说从事云大使以来彻底解决了目前我经济窘迫的情况,还给家里置办了不少电器家具,再也不会像之前一样过着紧紧巴巴的生活,这些都是在推广前没有想到的,目前在进行的双十二活动排位中敖丙已经取得了第一的成绩,在采访的最后敖丙说到:“活动我都会参加的,不为了别的因为钱。希望越办越好!“

    小编想说云大使是依托于线下最有推广前景的云计算而存在的,技术变现不再是纸上谈兵,不加入你怎么直到自己不会年入百万?梦想一定要有,在这里不是可能会实现,是一定会实现!我们愿与你一起助力企业上云,驱动数字中国。在这里插入图片描述

    展开全文
  • RocketMQ

    万次阅读 多人点赞 2019-07-31 19:17:34
    RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套自定义的机制,简单来说就是使用订阅主题...

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!
    #一、RocketMQ简介
    ##1.1、介绍
    RocketMQ是一款分布式、队列模型的消息中间件,由Metaq3.X版本改名而来,RocketMQ并不遵循包括JMS规范在内的任何规范,但是参考了各种规范不同类产品的设计思想,自己有一套自定义的机制,简单来说就是使用订阅主题的方式去发送和接收任务,但是支持集群和广播两种消息模式。开源项目地址:https://github.com/apache/rocketmq
    具有以下特点:
    1、能够保证严格的消息顺序
    2、提供丰富的消息拉取模式
    3、高效的订阅者水平扩展能力
    4、实时的消息订阅机制
    5、亿级消息堆积能力
    选用理由:
    1、强调集群无单点,可扩展,任意一点高可用,水平可扩展。
    2、海量消息堆积能力,消息堆积后,写入低延迟。
    3、支持上万个队列。
    4、消息失败重试机制。
    5、消息可查询。
    6、开源社区活跃。
    7、成熟度(历经多次天猫双十一海量消息考验)
    ##1.2、专业术语
    1、Producer
    消息生产者,负责产生消息,一般由业务系统负责产生消息。
    2、Consumer
    消息消费者,负责消费消息,一般是后台系统负责异步消费。
    3、Push Consumer
    Consumer 的一种,应用通常向 Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法。
    4、Pull Consumer
    Consumer 的一种,应用通常主动调用 Consumer 的拉消息方法从 Broker 拉消息,主动权由应用控制。
    5、Producer Group
    一类 Producer 的集合名称,这类 Producer 通常发送一类消息,且发送逻辑一致。
    6、Consumer Group
    一类 Consumer 的集合名称,这类 Consumer 通常消费一类消息,且消费逻辑一致。
    7、Broker
    消息中转角色,负责存储消息,转发消息,一般也称为 Server。在 JMS 规范中称为 Provider。
    8、广播消费
    一条消息被多个 Consumer 消费,即使返些 Consumer 属于同一个 Consumer Group,消息也会被 Consumer Group 中的每个 Consumer 都消费一次,广播消费中的 Consumer Group 概念可以认为在消息划分方面无意义。
    在 CORBA Notification 规范中,消费方式都属于广播消费。
    在 JMS 规范中,相当于 JMS publish/subscribe model
    9、集群消费
    一个 Consumer Group 中的 Consumer 实例平均分摊消费消息。例如某个 Topic 有 9 条消息,其中一个Consumer Group 有 3 个实例(可能是 3 个进程,或者 3 台机器),那么每个实例只消费其中的 3 条消息。
    在 CORBA Notification 规范中,无此消费方式。
    在 JMS 规范中,JMS point-to-point model 与之类似,但是 RocketMQ 的集群消费功能大等于 PTP 模型。
    因为 RocketMQ 单个 Consumer Group 内的消费者类似于 PTP,但是一个 Topic/Queue 可以被多个 Consumer Group 消费。
    10、顺序消息
    消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要挃的是尿部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序収送,丏収送到同一个队列,返样 Consumer 就可以挄照 Producer 发送的顺序去消费消息。
    11、普通顺序消息
    顺序消息的一种,正常情冴下可以保证完全的顺序消息,但是一旦収生通信异常,Broker 重启,由亍队列总数収生发化,哈希叏模后定位的队列会发化,产生短暂的消息顺序丌一致。如果业务能容忍在集群异常情冴(如某个 Broker 宕机戒者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。
    12、严格顺序消息
    顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只要有一台机器丌可用,则整个集群都丌可用,服务可用性大大降低。
    如果服务器部署为同步双写模式,此缺陷可通过备机自劢切换为主避免,丌过仍然会存在几分钟的服务丌可用。(依赖同步双写,主备自劢切换,自劢切换功能目前迓未实现)
    目前已知的应用只有数据库 binlog 同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。
    13、Message Queue
    在 RocketMQ 中,所有消息队列都是持丽化,长度无限的数据结构,所谓长度无限是挃队列中的每个存储单元都是定长,访问其中的存储单元使用 Offset 来访问,offset 为 java long 类型,64 位,理论上在 100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。
    也可以认为 Message Queue 是一个长度无限的数组,offset 就是下标。
    ##1.3、关键概念
    ###1.3.1、主题与标签
    主题Topic:第一级消息类型,书的标题;
    标签Tags:第二级消息类型,书的目录,可以基于Tag做简单的消息过滤,通常这已经可以满足90%的需求了,如果有更复杂的过滤场景,就需要使用rocketmq-filtersrv组件了。
    例如,主题是订单交易,那么标签可以是订单交易-创建、订单交易-付款、订单交易-完成。
    通过查看源码就可以发现:一个主题在MQ上默认会有4个Queue队列来存储该主题上的消息,Queue的数量也可以在创建主题时指定。这也是为什么,当MQ采用双Master集群方式时,如果向MQ发送100条消息,其中52条在BrokerA上,48条在BrokerB上。因为4条发给A,4条发给B…依次循环下去,最后4条是发给了A,所以A比B多存储了4条消息。
    ###1.3.2、群组
    这里写图片描述
    生产组:用于消息的发送的群组,官方推荐:一个生产组理应发送的是同一主题的消息,消息子类型再使用Tags来区分;
    消费组:用于消息的订阅处理的群组,官方推荐:一个消费组理应消费的是同一主题的消息,再使用Tags在Broker做消息过滤。
    生产组和消费组极大地方便了扩缩机器、增减处理能力等,同时只有群组名相同才会被认为是一个集群组的,RocketMQ默认情况下采用集群消费模式,所以消息每次只会随机的发给每个消费群组中的一员,这也体现了RocketMQ集群无单点、水平可扩展、任意一点高可用、支持负载均衡等特点。
    ##1.4、RocketMQ核心模块
    rocketmq-broker:接受生产者发来的消息并存储(通过调用rocketmq-store),消费者从这里取得消息。
    rocketmq-client:提供发送、接受消息的客户端API。
    rocketmq-namesrv:NameServer,类似于Zookeeper,这里保存着消息的TopicName,队列等运行时的元信息。
    rocketmq-common:通用的一些类,方法,数据结构等。
    rocketmq-remoting:基于Netty4的client/server + fastjson序列化 + 自定义二进制协议。
    rocketmq-store:消息、索引存储等。
    rocketmq-filtersrv:消息过滤器Server,需要注意的是,要实现这种过滤,需要上传代码到MQ!【一般而言,我们利用Tag足以满足大部分的过滤需求,如果更灵活更复杂的过滤需求,可以考虑filtersrv组件】。
    rocketmq-tools:命令行工具。
    #二、RocketMQ示例
    ##2.1、RocketMQ部署–双master方式
    可参考我的博文:“RocketMQ部署–双master方式”。
    ##2.2、HelloWorld示例
    ###2.2.1、生产者

    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    public class Producer {
    	public static void main(String[] args) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    		//实例化生产者,实例化时需要指定生产组名
    		DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
    		//设置namesrc地址,有多个的话用";"隔开
    		producer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
    		//启动生产者
    		producer.start();
    		for(int i=1;i<=100;i++){
    			//创建一条消息,指定了消息的主题topic、标签tag、消息的内容
    			Message msg = new Message("TopicQuickStart", "TagA", ("Hello RocketMQ "+i).getBytes());
    			//发送消息
    			SendResult sendResult = producer.send(msg);
    			System.out.println(sendResult);
    		}
    		//关闭生产者,main方法主线程结束,程序终止
    		producer.shutdown();
    	}
    }
    

    ###2.2.2、消费者

    import java.util.List;
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
        public static void main(String[] args) throws InterruptedException, MQClientException {
        	//实例化消费者,实例化时需要指定消费组名
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            //设置namesrc地址,有多个的话用";"隔开
            consumer.setNamesrvAddr("192.168.246.130:9876;192.168.246.131:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            //设置每次消费的消息最大数量,默认是1,即一条条拉取
            consumer.setConsumeMessageBatchMaxSize(10);
            //设置订阅的消息主题topic和标签tags,这里订阅TopicQuickStart主题下的所有消息,所以会收到上面生产者发送的该主题下标签为TagA的消息
            consumer.subscribe("TopicQuickStart", "*");
            //注册消费监听
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
                	//如果不设置每次消费的消息最大数量,这里的msgs里只会有一条
                	System.out.println("消息条数:"+msgs.size());
                	for(MessageExt msg : msgs){
                		System.out.println(Thread.currentThread().getName()+"收到消息:topic:"+msg.getTopic()+",tags:"+msg.getTags()+",msg:"+new String(msg.getBody()));
                	}
                	//回复RocketMQ,这条消息消费成功,如果返回的是ConsumeConcurrentlyStatus.RECONSUME_LATER,即表明消息消费失败,那RocketMQ会对这条消息进行重发操作
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
     //启动消费者,main方法主线程结束后,程序不会停止,进入阻塞状态,来一条消息就触发一次监听事件
            consumer.start();
            System.out.println("Consumer Started.");
        }
    }
    

    可以执行多次上面消费者的main方法,也就是启动多个这样的消费者,因为在一个群组里,消息每次只会发送给群组里的一个成员,所以假设有100条消息,启动了两个同一群组的消费者,那么每个消费者各消费50条消息。可见,RocketMQ自动完成了相同群组下的消费者的负载均衡操作,而且如果想增减消费者,只需启动或者关闭消费者即可,无需任何配置,水平可扩展性好!
    如果要切换成广播消费模式,每个消费端都需进行下面的设置:
    consumer.setMessageModel(MessageModel.BROADCASTING);//设置为广播消费模式
    这样即使是同一个消费组的消费者,也都会收到订阅的所有消息,不会进行均衡消费。
    ##2.3、两类Consumer
    在RocketMQ里,Consumer分为两类:MQPullConsumer和MQPushConsumer。其实两种都是拉模式(pull),即Consumer轮询从broker拉取消息。
    push方式就是上面例子里的消费者,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumerMessage()来消费,对用户而言,感觉消息是被推送过来的。
    pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。
    #三、消息重试
    ##3.1、生产端消息重试
    生产者端的消息失败,也就是Producer往MQ上发消息没有发送成功,比如网络抖动导致生产者发送消息到MQ失败,这种消息失败重试我们可以手动设置发送失败重试的次数。

    producer.setRetryTimesWhenSendFailed(3); //设置重试次数
    producer.send(msg, 1000); //发送消息,并设置消息发送超时时间
    

    上面的代码表示消息在1S内没有发送成功就会触发重试,重试最多3次。
    ##3.2、消费端消息重试
    消费端在收到消息并处理完成会返回ConsumeConcurrentlyStatus.CONSUME_SUCCESS表示消费成功,如果返回了失败或者没返回就会触发重试,即MQ会把消息再发一遍。所以,发生消费端的消息重试有两种情况:1、返回了ConsumeConcurrentlyStatus.RECONSUME_LATER直接表明消费失败;2、长时间没有返回消息处理状态给MQ导致超时。
    消息重复消费
    值得注意的是,当一个消费组有多个消费者时,其中一个消费者处理消息后长时间没返回,那么MQ就会把这条消息进行重试,会发送给同一消费组的另外一个消费者进行消费。要是这时候之前的消费者又把消息处理结果返回了,那就出现了消息重复消费的问题。
    RocketMQ无法避免消息重复,如果业务对消息重复非常敏感,务必要在业务层面去重,这就要求我们一定要做好消费端幂等处理。比如每条消息都有一个唯一编号,每处理完一条消息就记录日志,当消息再来的时候判断一下本条消息是否处理过。需要注意的是,如果消费端处理消息后的结果保存在DB中,那记录日志的操作也一定要保存在这个DB中,这样才能保证事务,其中有一步失败了就会一起回滚。倘若把消息处理后的结果存在mysql里,日志却记录在redis中,然后每次消息再来的时候去redis中查看是否已经处理过,这样是错误的做法,本以为放redis里再去查询的时候速度快,可以提升性能,但是却导致事务的一致性无法保证(比如mysql操作成功了而redis操作失败了那怎么回滚呢),至少目前为止单靠spring的事务管理无法回滚两个数据源的操作,需要增加其他的组件,所以建议都在一个DB中操作。
    #四、集群
    推荐的几种 Broker 集群部署方式,这里的 Slave 不可写,但可读,类似于 Mysql 主备方式。当主节点挂了,就可以访问从节点来获取之前未消费的数据。但是因为Slave是只读的,所以不会接收生产者生产的新数据,新数据只会存储到其他的Broker主备节点上,直到宕机的主节点重新启动了才会接收新数据。至少截止到v3.2.4版本,RocketMQ还未能支持主备自动切换功能。
    ##4.1、单个 Master
    返种方式风险较大,一旦 Broker 重启或者宕机时,会导致整个服务不可用,不建议线上环境使用
    ##4.2、多 Master 模式
    一个集群无 Slave,全是 Master,例如 2 个 Master 或者 3 个 Master
    优点:配置简单,单个 Master 宕机或重启维护对应用无影响,在磁盘配置为 RAID10 时,即使机器宕机不可恢复情况下,由于 RAID10 磁盘非常可靠,消息也不会丢(异步刷盘丢失少量消息,同步刷盘一条不丢)。性能最高。
    缺点:单台机器宕机期间,这台机器上未被消费的消息在机器恢复之前不可订阅,消息实时性会受到影响。
    ##4.3、多 Master 多 Slave 模式,异步复制
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用异步复制方式,主备有短暂消息延迟,毫秒级。
    优点:即使磁盘损坏,消息丢失的非常少,且消息实时性不会受影响,因为 Master 宕机后,消费者仍然可以从 Slave 消费,此过程对应用透明。不需要人工干预。性能同多 Master 模式几乎一样。
    缺点:Master 宕机、磁盘损坏等情况,会丢失少量消息。
    ##4.4、多 Master 多 Slave 模式,同步双写
    每个 Master 配置一个 Slave,有多对 Master-Slave,HA 采用同步双写方式,主备都写成功,才会向应用返回成功。
    优点:数据与服务都无单点,Master 宕机情况下,消息无延迟,服务可用性与数据可用性都非常高
    缺点:性能比异步复制模式略低,大约低 10%左右,发送单个消息的 RT 会略高。目前主宕机后,备机不能自动切换为主机,后续会支持自动切换功能。
    #五、顺序消费
    普通模式下,使用传统的send发送消息即可,比如2.2里的示例代码,但是这种模式下不能保证消息消费顺序的一致性。假如我们在网购的时候,需要下单,那么下单需要有三个顺序,第一、创建订单 ,第二:订单付款,第三:订单完成,也就是这个三个环节要有顺序,这个订单才有意义,这种场景下就需要顺序消费。
    世界上解决一个计算机问题最简单的方法:“恰好”不需要解决它!
    那通过RocketMQ怎么实现顺序消费的呢?
    答:需要顺序消费的消息在生成端必须发送到同一个主题的同一个队列中(一个主题默认4个队列),比如创建订单1、订单1付款,订单1完成这三条消息就需要在同一个队列中,创建订单2、订单2付款,订单2完成这三条消息也需要在同一队列中,但订单1和订单2的队列可以不是同一个队列。然后消费端消费时必须实现MessageListenerOrderly接口以保证一个队列只会被同一个消费端的一个线程所消费,因为队列先进先出的原则,就可以保证顺序消费了。
    比如有1个生产端和2个消费端,要保证顺序消费,示例代码如下:
    ##5.1、生产者

    public class Producer {  
        public static void main(String[] args) {  
            try {  
                DefaultMQProducer producer = new DefaultMQProducer("order_Producer");  
                producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
                producer.start();  
                for (int i = 1; i <= 5; i++) {  
      // 主题:TopicOrderTest,标签:order_1,KEY:"KEY" + i,消息内容:"order_1 " + i
                    Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes());  
      // RocketMQ通过MessageQueueSelector中实现的算法来确定消息发送到哪一个队列上
    		// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
    		// 当然你可以根据业务实现自己的MessageQueueSelector来决定消息按照何种策略发送到消息队列中
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  //arg就是producer.send方法的最后一个参数,这里是0
                            int index = id % mqs.size();  //队列数量没有事先设置那就是4,0%4=0
                            return mqs.get(index);  //返回下标为0的队列,即这5条消息存放在0号队列中
                        }  
                    }, 0);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为1的队列,即这5条消息存放在1号队列中
                        }  
                    }, 1);  
                    System.out.println(sendResult);  
                }  
                for (int i = 1; i <= 5; i++) {  
                    Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes());  
                    SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
                        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                            Integer id = (Integer) arg;  
                            int index = id % mqs.size();  
                            return mqs.get(index);  //返回下标为2的队列,即这5条消息存放在2号队列中
                        }  
                    }, 2);  
                    System.out.println(sendResult);  
                }  
                producer.shutdown();  
            } catch (MQClientException e) {  
                e.printStackTrace();  
            } catch (RemotingException e) {  
                e.printStackTrace();  
            } catch (MQBrokerException e) {  
                e.printStackTrace();  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }
    

    ##5.2、消费者1

    public class Consumer1 {    
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer1 Started.");  
        }  
    }
    

    ##5.3、消费者2

    public class Consumer2 {  
        public static void main(String[] args) throws MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicOrderTest", "*");  
            /** 
             * 实现了MessageListenerOrderly表示一个队列只会被一个线程取到,第二个线程无法访问这个队列 
      * 所以为了保证顺序消费,消费逻辑里不应该有多线程逻辑,比如通过线程池并发消费,这都是不允许的
             */  
            consumer.registerMessageListener(new MessageListenerOrderly() {  
                AtomicLong consumeTimes = new AtomicLong(0);  
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {  
                    // 设置自动提交  
                    context.setAutoCommit(true);  
                    for (MessageExt msg : msgs) {  
                        System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                    }  
                    try {  
                        TimeUnit.SECONDS.sleep(5L);  
                    } catch (InterruptedException e) {  
                        e.printStackTrace();  
                    }  
                    return ConsumeOrderlyStatus.SUCCESS;  
                }  
            });  
            consumer.start();  
            System.out.println("Consumer2 Started.");  
        }  
    }
    

    先启动Consumer1和Consumer2,然后启动Producer,Producer会发送15条消息。
    Consumer1消费情况如图,都按照顺序执行了
    这里写图片描述
    Consumer2消费情况如图,也都按照顺序执行了
    这里写图片描述
    #六、事务消费
    考虑生活中的场景:我们去北京庆丰包子铺吃炒肝,先去营业员那里付款(Action1),拿到小票(Ticket),然后去取餐窗口排队拿炒肝(Action2)。思考2个问题:第一,为什么不在付款的同时,给顾客炒肝?如果这样的话,会增加处理时间,使得后面的顾客等待时间变长,相当于降低了接待顾客的能力(降低了系统的QPS)。第二,付了款,拿到的是Ticket,顾客为什么会接受?从心理上说,顾客相信Ticket会兑现炒肝。事实上也是如此,就算在最后炒肝没了,或者断电断水(系统出现异常),顾客依然可以通过Ticket进行退款操作,这样都不会有什么损失!(虽然这么说,但是实际上包子铺最大化了它的利益,如果炒肝真的没了,浪费了顾客的时间,不过顾客顶多发发牢骚,最后接受)
    生活已经告诉我们处理分布式事务,保证数据最终一致性的思路!这个Ticket(凭证)其实就是消息!
    通过RocketMQ可以实现分布式事务,比如银行A向银行B转账,银行A扣款1000,那银行B一定要加1000才行,通过RocketMQ的执行逻辑如下:
    这里写图片描述
    如上图所示,消息数据独立存储,业务和消息解耦,实质上消息的发送有2次,一条是转账消息,另一条是确认消息。发送转账消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息。
    到这里,我们先来看看基于RocketMQ的代码:
    ##6.1、消费者

    public class Consumer {  
        public static void main(String[] args) throws InterruptedException, MQClientException {  
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer");  
            consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            consumer.setConsumeMessageBatchMaxSize(10);  
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
            consumer.subscribe("TopicTransactionTest", "*");  
            consumer.registerMessageListener(new MessageListenerConcurrently() {  
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
                    try {  
                        for (MessageExt msg : msgs) {  
                            System.out.println(msg + ",内容:" + new String(msg.getBody()));  
                        }  
                    } catch (Exception e) {  
                        e.printStackTrace();  
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重试  
                    }  
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功  
                }  
            }); 
            consumer.start(); 
            System.out.println("transaction_Consumer Started.");  
        }  
    }
    

    ##6.2、生产者
    ###6.2.1、生产者

    public class Producer {  
        public static void main(String[] args) throws MQClientException, InterruptedException {  
            TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();  
            TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer");  
            producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876");  
            // 事务回查最小并发数  
            producer.setCheckThreadPoolMinSize(2);  
            // 事务回查最大并发数  
            producer.setCheckThreadPoolMaxSize(2);  
            // 队列数  
            producer.setCheckRequestHoldMax(2000);  
            producer.setTransactionCheckListener(transactionCheckListener);  
            producer.start();   
            TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();  
            for (int i = 1; i <= 2; i++) {  
                try {  
                    Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i,  
                            ("Hello RocketMQ " + i).getBytes());  
      //发送消息后,消息在MQ的状态是prepared,这时消费者还无法收到这条消息,需等生产者这边的本地事务执行完并发送确认消息后,才能收到这条消息
                    SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);  
                    System.out.println(sendResult);  
                    Thread.sleep(10);  
                } catch (MQClientException e) {  
                    e.printStackTrace();  
                }  
            }  
            for (int i = 0; i < 100000; i++) {  
                Thread.sleep(1000);  
            }  
            producer.shutdown();  
        }  
    }
    

    ###6.2.2、执行本地事务
    TransactionExecuterImpl类用于执行本地事务如下:

    public class TransactionExecuterImpl implements LocalTransactionExecuter {  
        public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {  
            System.out.println("执行本地事务msg = " + new String(msg.getBody()));  
            System.out.println("执行本地事务arg = " + arg);  
            String tags = msg.getTags();  
            if (tags.equals("transaction2")) {  
                System.out.println("======我的操作============,失败了  -进行ROLLBACK");  
                return LocalTransactionState.ROLLBACK_MESSAGE;  //返回失败并发送回滚消息
            }  
            return LocalTransactionState.COMMIT_MESSAGE;  //返回成功并发送确认消息
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    ###6.2.3、针对未决事务,MQ服务器回查客户端
    如果因网络问题最后发送确认消息给MQ失败了或者发送了LocalTransactionState.UNKNOW,那事务就一直没能完成,一直处于prepared状态,针对未决事务,MQ服务器会回查客户端看看到底有没有完成(目前已经被阉割啦),这时会调用TransactionCheckListener接口,所以TransactionCheckListenerImpl类实现了这个接口用于回查,代码如下:

    public class TransactionCheckListenerImpl implements TransactionCheckListener {  
        //在这里,我们可以根据由MQ回传的key去数据库查询,这条数据到底是成功了还是失败了。  
        public LocalTransactionState checkLocalTransactionState(MessageExt msg) {  
            System.out.println("未决事务,服务器回查客户端msg =" + new String(msg.getBody().toString()));  
            // return LocalTransactionState.ROLLBACK_MESSAGE;  
            return LocalTransactionState.COMMIT_MESSAGE;  
            // return LocalTransactionState.UNKNOW;  
        }  
    }
    

    producer端发送数据到MQ,并且处理本地事物,这里模拟了一个成功一个失败。Consumer只会接收到本地事物成功的数据,第二个数据失败了,不会被消费。
    因为MQ回查客户端的功能被阿里去除了,导致即使返回了LocalTransactionState.UNKNOW,TransactionCheckListenerImpl里的代码也不会被触发,所以目前事务回查这部分需要自己设计实现。
    #七、参考文章
    RocketMQ重点原理讲解:https://www.jianshu.com/p/453c6e7ff81c

    此为博主(yjclsx)原创文章,如若转载请标明出处,谢谢!

    展开全文
  • 消息中间件MQ与RabbitMQ面试题(2020最新版)

    万次阅读 多人点赞 2020-03-01 11:11:21
    MQ的优点消息队列有什么优缺点?RabbitMQ有什么优缺点?你们公司生产环境用的是什么消息中间件?Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点?MQ 有哪些常见问题?如何解决这些问题?什么是RabbitMQ?...
  • 消息队列MQ的使用流程

    万次阅读 多人点赞 2018-09-19 19:21:21
    在大型平台的分布式项目中,消息队列MQ具有重要的作用,经常用在边缘业务功能的处理中,比如日志管理【下面将以Bug日志保存为例】,因为像日志保存、新用户注册发送邮件等操作都不是主干业务,可以放在消息队列异步...
  • [转]Redis作为消息队列与RabbitMQ的性能对比

    万次阅读 热门讨论 2014-06-25 21:32:24
    周末测试了一下RabbitMQ的性能,RabbitMQ是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。个人认为,在互联网开发...
  • FreeRTOS消息队列

    千次阅读 2019-01-31 17:55:04
    问题解答 曾经有人问我,FreeRTOS那么多API,到底怎么记住呢? 我想说,其实API不难记,就是有点难找,因为FreeRTOS的API很多都是带参宏,所以跳来跳去的比较麻烦,而且注释也很多,要找还真不是那么容易,不过也...
  • 消息总线VS消息队列

    万次阅读 2015-02-18 17:30:22
    消息队列跟消息总线进行了对比,并说明了对于企业应用,封装消息总线的必要性。
  • 消息队列之 RabbitMQ

    万次阅读 多人点赞 2019-04-30 17:11:00
    关于消息队列,从前年开始断断续续看了些资料,想写很久了,但一直没腾出空,近来分别碰到几个朋友聊这块的技术选型,是时候把这块的知识整理记录一下了。 市面上的消息队列产品有很多,比如老牌的 ActiveMQ、...
  • Linux消息队列

    千次阅读 2011-09-13 16:21:28
     消息队列就是一个消息的链表。可以把消息看作一个记录,具有特定的格式以及特定的优先级。对消息队列有写权限的进程可以向中按照一定的规则添加新消息;有读权限的进程则可以读走消息。读走就没有了。消息队列是随...
  • 天天说队列, 项目请求数据不能及时处理时,就一言...1.比如你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,你可以用消息队列将总共10000个请求压在队列里,后...
  • 关于消息队列的使用

    万次阅读 2019-03-05 13:58:31
    一、消息队列概述 消息队列中间件是分布式系统中重要的组件,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,...
  • 三太子敖丙-帅丙的文章目录

    千次阅读 2020-04-10 09:48:19
    三太子敖丙-帅丙的文章目录
  • 文章目录Rabbitmq - 进阶:延迟消息队列,以及深入探究延迟消息队列的极限超时1、构建一个简单的springboot 应用3、构建rabbitmq 环境2、编写一个非延迟队列3、测试非延迟队列是否能够正常工作4、了解延迟队列以及...
  • 什么是消息队列你了解过么?
  • 常用消息队列对比

    万次阅读 2018-03-05 22:31:26
    作为中间件,消息队列是分布式应用间交换信息的重要组件。消息队列可驻留在内存或磁盘上, 队列可以存储消息直到它们被应用程序读走。通过消息队列,应用程序可以在不知道彼此位置的情况下独立处理消息,或者在处理...
  • 这是网上的一篇教程写的很好,不知原作者是谁,没法注明出处,我看的时候也是别人转载的,这里就注明一下那篇转载的地址:...使用较多的消息队列有ActiveMQ,RabbitMQ
  • 敖丙大佬面试视屏学习(一)

    万次阅读 多人点赞 2020-03-24 16:29:41
    敖丙面试一个大厂一年经验的程序员 看完之后感觉 自己很多方面的积累还不如人家一个一年经验的人 尤其是今天学习 学的好累 感觉自己要炸裂了 感觉不会的太多了 没想到一个视频给我看清醒了 还是要积累和学习 加油 ...
  • 1.关于消息队列 消息队列,外文名Message Queue,简称MQ,是指在消息的传输中保存消息的容器或服务。 消息队列,是分布式系统实现高性能、高可用、可伸缩等高级特效的重要组件,适用多种场景,如:消息通讯、异步...
  • 这篇文章主要讲述如何在springboot中用reids实现消息队列。准备阶段 安装redis,可参考我的另一篇文章,5分钟带你入门Redis。
  • C#操作消息队列

    千次阅读 2007-03-24 15:31:00
    public class QueueManage { /// /// 发送对象到队列中 /// /// 队列名称,因为队列名称在一个应用中应该不改变的,所以大家最好写在配置文件中 /// 要发出去的对象 public static void SendQueue(string QueuePath,...
  • 1、消息队列(以下简称MQ)天生就是处理高并发的有力工具,因为他可以把一个完整的流程拆为多部分,并发进行,或者不是很重要的步骤模块延迟进行。大家所熟悉的是消息队列在大基数用户项目的注册模块和电商项目的...
  • 一、消息队列 1、消息队列提供了一个从一个进程向另外一个进程发送一块数据的方法 2、每个数据块都被认为是有一个类型,接收者进程接收的数据块可以有不同的类型值 3、消息队列与管道不同的是,消息队列是基于消息的...
  • Windows运行机理——消息与消息队列

    千次阅读 2018-03-17 14:10:12
    Windows运行机理这系列文章都是来至于《零基础学Qt4编程》——吴迪,...例如,当用户在窗口中画图的时候,按下鼠标左键,此时操作系统会感知这一事件,于是将这个事件包装成一个消息,投递到应用程序的消息队列中...
  • 消息队列面试题及答案

    千次阅读 2019-11-27 15:48:36
    1、为什么使用消息队列消息队列使用的场景和中间件有很多,但解决的核心问题主要是:异步、解耦、消峰填谷。 2、消息队列的优缺点 异步、解耦、消峰填谷这是消息队列最大的优点,除了这些消息队列还可以会解决...
  • 消息队列手动确认Ack

    千次阅读 2020-01-17 14:42:20
    以RabbitMQ为例,默认情况下 RabbitMQ是自动ACK机制,就意味着 MQ 会在消息发送完毕后,自动帮我们去ACK,然后删除消息的信息。 这样依赖就存在这样一个问题: 如果消费者处理消息需要较长时间,最好的做法是消费端...
  • SpringBoot整合Redis消息队列

    万次阅读 2020-06-28 11:59:32
    队列模型如图所示,它具有以下几个特点,就像我们用微信和好友(群聊除外)聊天一样,微信就是这个队列,我们可以和很多个好友聊天,但是每条消息只能发给一个好友。 只有一个消费者将获得消息 生产者不需要在接收...
  • 前面已经介绍过怎么安装rabbitmq以及要使用的三方库 因此这里直接进入实例 1、发布端代码 # new_task.py import pika # 导入pika import sys # 导入系统模块 ...connection = pika.BlockingConnection(pika....

空空如也

1 2 3 4 5 ... 20
收藏数 70,344
精华内容 28,137
关键字:

消息队列