精华内容
下载资源
问答
  • AMQP协议的理解

    2019-06-03 23:41:32
    1、什么是AMQP? AMQP,即高级消息队列协议,是为了弥补当前应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合...AMQP协议是一个二进制协议,AMQP通常被划分为...

    一、什么是AMQP?

    AMQP,即高级消息队列协议,是为了弥补当前应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本等缺点。AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同开发语言等条件的限制。

    AMQP协议是一个二进制协议,AMQP通常被划分为三层,如下:
    在这里插入图片描述
    对上图作如下解释:

    这种分层架构类似于OSI网络协议,可替换各层实现而不影响与其它层的交互。AMQP定义了合适的服务器端域模型,用于规范服务器的行为(AMQP服务器端可称为broker)。在这里Model层决定这些基本域模型所产生的行为,这种行为在AMQP中用”command”表示,在后文中会着重来分析这些域模型。Session层定义客户端与broker之间的通信(通信双方都是一个peer,可互称做partner),为command的可靠传输提供保障。Transport层专注于数据传送,并与Session保持交互,接受上层的数据,组装成二进制流,传送到receiver后再解析数据,交付给Session层。Session层需要Transport层完成网络异常情况的汇报,顺序传送command等工作。

    消息中间件的主要功能是消息的路由(Routing)和缓存(Buffering)。

    VBvNcD.jpg
    Exchange接收消息生产者(Message Producer)发送的消息根据不同的路由算法将消息发送往Message queue。Message queue会在消息不能被正常消费时缓存这些消息,具体的缓存策略由实现者决定,当message queue与消息消费者(Message consumer)之间的连接通畅时,Message queue有将消息转发到consumer的责任。
    Message是当前模型中所操纵的基本单位,它由Producer产生,经过Broker被Consumer所消费。它的基本结构有两部分: Header和Body。Header是由Producer添加上的各种属性的集合,这些属性有控制Message是否可被缓存,接收的queue是哪个,优先级是多少等。Body是真正需要传送的数据,它是对Broker不可见的二进制数据流,在传输过程中不应该受到影响。
    一个broker中会存在多个Message queue,Exchange怎样知道它要把消息发送到哪个Message queue中去呢? 这就是上图中所展示Binding的作用。Message queue的创建是由client application控制的,在创建Message queue后需要确定它来接收并保存哪个Exchange路由的结果。Binding是用来关联Exchange与Message queue的域模型。Client application控制Exchange与某个特定Message queue关联,并将这个queue接受哪种消息的条件绑定到Exchange,这个条件也叫Binding key或是 Criteria。
    在与多个Message queue关联后,Exchange中就会存在一个路由表,这个表中存储着每个Message queue所需要消息的限制条件。Exchange就会检查它接受到的每个Message的Header及Body信息,来决定将Message路由到哪个queue中去。Message的Header中应该有个属性叫Routing Key,它由Message发送者产生,提供给Exchange路由这条Message的标准。Exchange根据不同路由算法有不同的Exchange Type,比如有Direct类似,需要Binding key等于Routing key;也有Binding key与Routing key符合一个模式关系;也有根据Message包含的某些属性来判断。一些基础的路由算法由AMQP所提供,client application也可以自定义各种自己的扩展路由算法。那么一个Message的处理流程类似于这样:   
    在这里插入图片描述
    在这里有个新名词需要介绍: Virtual Host。一个Virtual Host可持有一些Exchange和Message queue。它是一个虚拟概念,一个Virtual Host可以是一台服务器,也可以是由多台服务器组成的集群。同步扩展下,Exchange与Message queue的部署也可以是一台或是多台服务器上。

    总结下AMQP的工作过程如下:

    发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

    交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

    最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    深入理解

    1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

    2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

    3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。

    4、在某些情况下,例如当一个消息无法被成功 路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

    二、Exchange交换机

    介绍Exchange之前先看写基本概念:

    • ConnectionFactory(连接工厂): 生产Connection的的工厂

    • Connection(连接): 是RabbitMQ的socket的长链接,它封装了socket协议相关部分逻辑

    • Channel(频道|信道): 是建立在Connection连接之上的一种轻量级的连接,我们大部分的业务操作是在Channel这个接口中完成的,包括定义队列的声明queueDeclare、交换机的声明exchangeDeclare、队列的绑定queueBind、发布消息basicPublish、消费消息basicConsume等。如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个Connection上可以创建任意数量的Channel。

    • Producer(生产者): 生产者用于发布消息

    • Exchange(交换机): 生产者会将消息发送到交换机,然后交换机通过路由策略(规则)将消息路由到匹配的队列中去

    • Routing Key(路由键): 一个String值,用于定义路由规则,在队列绑定的时候需要指定路由键,在生产者发布消息的时候需要指定路由键,当消息的路由键和队列绑定的路由键匹配时,消息就会发送到该队列。

    • Queue(队列): 用于存储消息的容器,可以看成一个有序的数组,生产者生产的消息会发送到交换机中,最终交换机将消息存储到某个或某些队列中,队列可被消费者订阅,消费者从订阅的队列中获取消息。

    • Binding(绑定): Binding并不是一个概念,而是一种操作,RabbitMQ中通过绑定,以路由键作为桥梁将Exchange与Queue关联起来(Exchange—>Routing Key—>Queue),这样RabbitMQ就知道如何正确地将消息路由到指定的队列了,通过queueBind方法将Exchange、Routing Key、Queue绑定起来

    • Consumer(消费者): 用于从队列中获取消息,消费者只需关注队列即可,不需要关注交换机和路由键,消费者可以通过basicConsume(订阅模式可以从队列中一直持续的自动的接收消息)或者basicGet(先订阅消息,然后获取单条消息,再然后取消订阅,也就是说basicGet一次只能获取一条消息,如果还想再获取下一条还要再次调用basicGet)来从队列中获取消息

    • vhost(虚拟主机): RabbitMQ 通过虚拟主机(virtual host)来分发消息, 拥有自己独立的权限控制,不同的vhost之间是隔离的,单独的。vhost是权限控制的基本单位,用户只能访问与之绑定的vhost,默认vhost:”/” ,默认用户”guest” 密码“guest”,来访问默认的vhost。

    -如下图所示:
    V6HdgO.png

    交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息之后将它路由给一个或零个队列。上面提到Exchange根据不同路由算法有不同的Exchange Type,

    AMQP 0-9-1 的代理提供了四种交换机:
    在这里插入图片描述
    交换机可以有两个状态:持久(durable)、暂存(transient)。

    持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。

    并不是所有的应用场景都需要持久化的交换机。

    2.1、默认交换机

    默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

    它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

    举个栗子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。因此,当携带着名为 “search-indexing-online” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “search-indexing-online” 的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。

    2.2、直连交换机
    直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

    1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
    2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。

    直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论,在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

    直连型交换机图例:
    在这里插入图片描述
    当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

    如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。

    2.3、扇型交换机

    扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

    因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

    大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
    体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
    分发系统使用它来广播各种状态和配置更新
    在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)
    扇型交换机图例:
    在这里插入图片描述
    上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。

    2.4、主题交换机

    前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.

    而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

    它的约定是:

    1)binding key 中可以存在两种特殊字符 “ * ” 与“#”,用于做模糊匹配,其中 “ * ” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    binding key 与 routing key 一样也是句点号 “.” 分隔的字符串

    主题交换机图例:
    在这里插入图片描述
    当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。

    主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

    使用案例:

    分发有关于特定地理位置的数据,例如销售点
    由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
    股票价格更新(以及其他类型的金融数据更新)
    涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
    云端的不同种类服务的协调
    分布式架构 / 基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。

    2.4、头交换机
    headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

    头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。工作流程:

    1)绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
    2)传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。

    交换机小结

    类型名称路由规则
    Default自动命名的直交换机
    DirectRouting Key==Binding Key,严格匹配
    Fanout把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
    TopicRouting Key==Binding Key,模糊匹配
    Headers根据发送的消息内容中的 headers 属性进行匹配

    三、相关概念

    1、Queue队列

    AMQP 中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。

    1.1、队列属性
    队列跟交换机共享某些属性,但是队列也有一些另外的属性。

    Name
    Durable(消息代理重启后,队列依旧存在)
    Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
    Auto-delete(当最后一个消费者退订后即被删除)
    Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)
    1.2、队列创建
    队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。

    1.3、队列持久化
    持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

    持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

    2、Consumer消费者
    消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在 AMQP 0-9-1 模型中,有两种途径可以达到此目的:

    1)将消息投递给应用 (“push API”)
    2)应用根据需要主动获取消息 (“pull API”)

    使用 push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

    每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

    3、消息机制

    3.1、消息确认
    消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:

    1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
    2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

    如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

    3.2、拒绝消息
    当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。

    当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

    当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

    在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

    3.3、预取消息
    在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生产应用每分钟才发送一条消息,这说明处理工作尚在运行。)

    注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。

    3.4、消息属性
    AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于 AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:

    Content type(内容类型)
    Content encoding(内容编码)
    Routing key(路由键)
    Delivery mode (persistent or not)
    投递模式(持久化 或 非持久化)
    Message priority(消息优先权)
    Message publishing timestamp(消息发布的时间戳)
    Expiration period(消息有效期)
    Publisher application id(发布应用的 ID)

    有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟 HTTP 协议的 X-Headers 很相似。消息属性需要在消息被发布的时候定义。

    3.5、消息主体
    AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

    消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

    3.6、消息持久化
    消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

    简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。

    4、连接
    AMQP 连接通常是长连接。AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。

    5、通道
    有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。

    在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。

    一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

    6、虚拟主机
    为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。

    AMQP 是可扩展的
    AMQP 0-9-1 拥有多个扩展点:

    1)定制化交换机类型:可以让开发者们实现一些开箱即用的交换机类型尚未很好覆盖的路由方案。例如 geodata-based routing。)
    2)交换机和队列的声明中可以包含一些消息代理能够用到的额外属性。例如 RabbitMQ 中的 per-queue message TTL 即是使用该方式实现。)
    3)特定消息代理的协议扩展。例如 RabbitMQ 所实现的扩展。
    新的 AMQP 0-9-1 方法类可被引入。)
    4)消息代理可以被其他的插件扩展,例如 RabbitMQ 的管理前端 和 已经被插件化的 HTTP API。

    这些特性使得 AMQP 0-9-1 模型更加灵活,并且能够适用于解决更加宽泛的问题。

    AMQP 0-9-1 客户端生态系统
    AMQP 0-9-1 拥有众多的适用于各种流行语言和框架的客户端。其中一部分严格遵循 AMQP 规范,提供 AMQP 方法的实现。另一部分提供了额外的技术,方便使用的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些将这两者同时实现。有些客户端支持 “供应商的特定扩展”(例如 RabbitMQ 的特定扩展)。

    因为 AMQP 的主要目标之一就是实现交互性,所以对于开发者来讲,了解协议的操作方法而不是只停留在弄懂特定客户端的库就显得十分重要。这样一来,开发者使用不同类型的库与协议进行沟通时就会容易的多。

    转载自:

    AMQP协议
    深入理解AMQP协议

    推荐阅读:
    RabbitMQ基础知识

    展开全文
  • AMQP协议介绍

    2019-10-17 08:42:47
    AMQP协议本身包括三层: Module Layer:协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅...

    AMQP协议本身包括三层:

    • Module Layer:协议最高层,主要定义了一些供客户端调用的命令,客户端可以利用这些命令实现自己的业务逻辑。例如,客户端可以使用Queue.Declare命令声明一个队列或者使用Basic.Consume订阅消费一个队列中的消息
    • Session Layer:中间层,主要负责将客户端的命令发送给服务器,再将服务端的应答返回给客户端,主要为客户端和服务端之间的通信提供可靠的同步机制和错误处理。
    • Transport Layer:最底层,主要传输二进制流,提供帧的处理、信道复用、错误检测和数据表示等

    AMQP生产者流转过程

    Connection connection = factory.newConnection();//创建连接
    Channel channel = connection.createChannel();//创建信道
    String message = "Hello World";
    channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
    channel.close();
    connection.close();
    

    在这里插入图片描述

    AMQP消费者流转过程

    Connection connection = factory.newConnection(addresses);//创建连接
    final Channel channel = Connection.createChannel();//创建信道
    Consumer consumer = new DefaultConsumer(channel){} //...省略实现
    channel.basicQos(64);
    channel.basicConsume(QUEUE_NAME,consumer);
    //等待回调函数执行完毕后,关闭资源
    TimeUnit.SECONDS.sleep(5);
    channel.close();
    connection.close();
    

    在这里插入图片描述
    如果在消费之前调用了channel.basicQos(int prefetchCount) 的方法来设置消费者客户端最大能保持的未确认的消息数,那么协议流转会涉及Basic.Qos/.Qos-Ok这两个AMQP命令。
    在真正消费之前,消费者客户端需要向Broker发送Basic.Consume命令(即调用Basic.basicConsume方法)将channel设置为接收模式,之后Broker回执Basic.Consume-Ok以告诉消费者客户端准备好消费消息。紧接着Broker向消费者客户端推送消息,即Basic.Deliver命令,和Basic.Publish命令一样会携带Content Header 和Content Body。
    消费者接收到消息并正确消费之后,向Broker发送确认,即Basic.Ack命令

    展开全文
  • 深入理解AMQP协议

    万次阅读 多人点赞 2018-10-22 12:32:16
    AMQP(高级消息队列协议)是一个网络协议。它支持符合要求的客户端应用(application)和消息中间件代理(messaging middleware broker)之间进行通信。 AMQP模型 工作过程 发布者(Publisher)发布消息...

    一、AMQP 是什么

    AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个进程间传递异步消息网络协议

    二、AMQP模型

    在这里插入图片描述

    工作过程

    发布者(Publisher)发布消息(Message),经由交换机(Exchange)。

    交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。

    最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。

    深入理解

    1、发布者、交换机、队列、消费者都可以有多个。同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。

    2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其他的属性则是完全不透明的,它们只能被接收消息的应用所使用。

    3、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程中意外挂掉,这样没有处理成功的消息就会丢失。基于此原因,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会立即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才完全从队列中删除。

    4、在某些情况下,例如当一个消息无法被成功路由时(无法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,如果消息代理执行了延期操作,消息会被放入一个所谓的死信队列中。此时,消息发布者可以选择某些参数来处理这些特殊情况。

    三、Exchange交换机

    交换机是用来发送消息的 AMQP 实体。

    交换机拿到一个消息之后将它路由给一个或零个队列。

    它使用哪种路由算法是由交换机类型绑定(Bindings)规则所决定的。

    AMQP 0-9-1 的代理提供了四种交换机:
    在这里插入图片描述
    交换机可以有两个状态:持久(durable)、暂存(transient)。

    持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。

    并不是所有的应用场景都需要持久化的交换机。

    在正式介绍五种交换机(包括默认交换机)前,在这里重申一下,发布者生产的消息中包含了交换机类型。消息中声明的交换机类型不同,路由规则也就不同,也就会采取不同的规则将消息投入队列。

    默认交换机

    默认交换机(default exchange)实际上是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

    它有一个特殊的属性使得它对于简单应用特别有用处:那就是每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

    举个栗子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。因此,当携带着名为 “search-indexing-online” 的路由键的消息被发送到默认交换机的时候,此消息会被默认交换机路由至名为 “search-indexing-online” 的队列中。换句话说,默认交换机看起来貌似能够直接将消息投递给队列,尽管技术上并没有做相关的操作。

    直连交换机

    直连型交换机(direct exchange)是根据消息携带的路由键(routing key)将消息投递给对应绑定键的队列。直连交换机用来处理消息的单播路由(unicast routing)(尽管它也可以处理多播路由)。下边介绍它是如何工作的:

    1)将一个队列绑定到某个交换机上时,赋予该绑定一个绑定键(Binding Key),假设为R;
    2)当一个携带着路由键(Routing Key)为R的消息被发送给直连交换机时,交换机会把它路由给绑定键为R的队列。

    直连交换机的队列通常是循环分发任务给多个消费者(我们称之为轮询)。比如说有3个消费者,4个任务。分别分发每个消费者一个任务后,第4个任务又分发给了第一个消费者。综上,我们很容易得出一个结论,在 AMQP 0-9-1 中,消息的负载均衡是发生在消费者(consumer)之间的,而不是队列(queue)之间。

    直连型交换机图例:
    在这里插入图片描述
    当生产者(P)发送消息时 Rotuing key=booking 时,这时候将消息传送给 Exchange,Exchange 获取到生产者发送过来消息后,会根据自身的规则进行与匹配相应的 Queue,这时发现 Queue1 和 Queue2 都符合,就会将消息传送给这两个队列。

    如果我们以 Rotuing key=create 和 Rotuing key=confirm 发送消息时,这时消息只会被推送到 Queue2 队列中,其他 Routing Key 的消息将会被丢弃。

    扇型交换机

    扇型交换机(funout exchange)将消息路由给绑定到它身上的所有队列,而不理会绑定的路由键。如果 N 个队列绑定到某个扇型交换机上,当有消息发送给此扇型交换机时,交换机会将消息的拷贝分别发送给这所有的 N 个队列。扇型用来交换机处理消息的广播路由(broadcast routing)。

    因为扇型交换机投递消息的拷贝到所有绑定到它的队列,所以他的应用案例都极其相似:

    • 大规模多用户在线(MMO)游戏可以使用它来处理排行榜更新等全局事件
    • 体育新闻网站可以用它来近乎实时地将比分更新分发给移动客户端
    • 分发系统使用它来广播各种状态和配置更新
    • 在群聊的时候,它被用来分发消息给参与群聊的用户。(AMQP 没有内置 presence 的概念,因此 XMPP 可能会是个更好的选择)

    扇型交换机图例:
    在这里插入图片描述
    上图所示,生产者(P)生产消息 1 将消息 1 推送到 Exchange,由于 Exchange Type=fanout 这时候会遵循 fanout 的规则将消息推送到所有与它绑定 Queue,也就是图上的两个 Queue 最后两个消费者消费。

    主题交换机

    前面提到的 direct 规则是严格意义上的匹配,换言之 Routing Key 必须与 Binding Key 相匹配的时候才将消息传送给 Queue.

    而Topic 的路由规则是一种模糊匹配,可以通过通配符满足一部分规则就可以传送。

    它的约定是:

    1)binding key 中可以存在两种特殊字符 “” 与“#”,用于做模糊匹配,其中 “” 用于匹配一个单词,“#”用于匹配多个单词(可以是零个)

    2)routing key 为一个句点号 “.” 分隔的字符串(我们将被句点号 “. ” 分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
    binding key 与 routing key 一样也是句点号 “.” 分隔的字符串

    主题交换机图例:
    在这里插入图片描述
    当生产者发送消息 Routing Key=F.C.E 的时候,这时候只满足 Queue1,所以会被路由到 Queue 中,如果 Routing Key=A.C.E 这时候会被同是路由到 Queue1 和 Queue2 中,如果 Routing Key=A.F.B 时,这里只会发送一条消息到 Queue2 中。

    主题交换机拥有非常广泛的用户案例。无论何时,当一个问题涉及到那些想要有针对性的选择需要接收消息的 多消费者 / 多应用(multiple consumers/applications) 的时候,主题交换机都可以被列入考虑范围。

    使用案例:

    • 分发有关于特定地理位置的数据,例如销售点
    • 由多个工作者(workers)完成的后台任务,每个工作者负责处理某些特定的任务
    • 股票价格更新(以及其他类型的金融数据更新)
    • 涉及到分类或者标签的新闻更新(例如,针对特定的运动项目或者队伍)
    • 云端的不同种类服务的协调
    • 分布式架构 / 基于系统的软件封装,其中每个构建者仅能处理一个特定的架构或者系统。
    头交换机

    headers 类型的 Exchange 不依赖于 routing key 与 binding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

    头交换机可以视为直连交换机的另一种表现形式。但直连交换机的路由键必须是一个字符串,而头属性值则没有这个约束,它们甚至可以是整数或者哈希值(字典)等。灵活性更强(但实际上我们很少用到头交换机)。工作流程:

    1)绑定一个队列到头交换机上时,会同时绑定多个用于匹配的头(header)。
    2)传来的消息会携带header,以及会有一个 “x-match” 参数。当 “x-match” 设置为 “any” 时,消息头的任意一个值被匹配就可以满足条件,而当 “x-match” 设置为 “all” 的时候,就需要消息头的所有值都匹配成功。

    交换机小结
    类型名称路由规则
    Default自动命名的直交换机
    DirectRouting Key==Binding Key,严格匹配
    Fanout把发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中
    TopicRouting Key==Binding Key,模糊匹配
    Headers根据发送的消息内容中的 headers 属性进行匹配

    四、Queue队列

    AMQP 中的队列(queue)跟其他消息队列或任务队列中的队列是很相似的:它们存储着即将被应用消费掉的消息。

    队列属性

    队列跟交换机共享某些属性,但是队列也有一些另外的属性。

    • Name
    • Durable(消息代理重启后,队列依旧存在)
    • Exclusive(只被一个连接(connection)使用,而且当连接关闭后队列即被删除)
    • Auto-delete(当最后一个消费者退订后即被删除)
    • Arguments(一些消息代理用他来完成类似与 TTL 的某些额外功能)
    队列创建

    队列在声明(declare)后才能被使用。如果一个队列尚不存在,声明一个队列会创建它。如果声明的队列已经存在,并且属性完全相同,那么此次声明不会对原有队列产生任何影响。如果声明中的属性与已存在队列的属性有差异,那么一个错误代码为 406 的通道级异常就会被抛出。

    队列持久化

    持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。并不是所有的场景和案例都需要将队列持久化。

    持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

    五、Consumer消费者

    消息如果只是存储在队列里是没有任何用处的。被应用消费掉,消息的价值才能够体现。在 AMQP 0-9-1 模型中,有两种途径可以达到此目的:

    1)将消息投递给应用 (“push API”)
    2)应用根据需要主动获取消息 (“pull API”)

    使用 push API,应用(application)需要明确表示出它在某个特定队列里所感兴趣的,想要消费的消息。如是,我们可以说应用注册了一个消费者,或者说订阅了一个队列。一个队列可以注册多个消费者,也可以注册一个独享的消费者(当独享消费者存在时,其他消费者即被排除在外)。

    每个消费者(订阅者)都有一个叫做消费者标签的标识符。它可以被用来退订消息。消费者标签实际上是一个字符串。

    六、消息机制

    消息确认

    消费者应用(Consumer applications) - 用来接受和处理消息的应用 - 在处理消息的时候偶尔会失败或者有时会直接崩溃掉。而且网络原因也有可能引起各种问题。这就给我们出了个难题,AMQP 代理在什么时候删除消息才是正确的?AMQP 0-9-1 规范给我们两种建议:

    1)自动确认模式:当消息代理(broker)将消息发送给应用后立即删除。(使用 AMQP 方法:basic.deliver 或 basic.get-ok))
    2)显式确认模式:待应用(application)发送一个确认回执(acknowledgement)后再删除消息。(使用 AMQP 方法:basic.ack)

    如果一个消费者在尚未发送确认回执的情况下挂掉了,那 AMQP 代理会将消息重新投递给另一个消费者。如果当时没有可用的消费者了,消息代理会死等下一个注册到此队列的消费者,然后再次尝试投递。

    拒绝消息

    当一个消费者接收到某条消息后,处理过程有可能成功,有可能失败。应用可以向消息代理表明,本条消息由于 “拒绝消息(Rejecting Messages)” 的原因处理失败了(或者未能在此时完成)。

    当拒绝某条消息时,应用可以告诉消息代理如何处理这条消息——销毁它或者重新放入队列。

    当此队列只有一个消费者时,请确认不要由于拒绝消息并且选择了重新放入队列的行为而引起消息在同一个消费者身上无限循环的情况发生。

    在 AMQP 中,basic.reject 方法用来执行拒绝消息的操作。但 basic.reject 有个限制:你不能使用它决绝多个带有确认回执(acknowledgements)的消息。但是如果你使用的是 RabbitMQ,那么你可以使用被称作 negative acknowledgements(也叫 nacks)的 AMQP 0-9-1 扩展来解决这个问题。

    预取消息

    在多个消费者共享一个队列的案例中,明确指定在收到下一个确认回执前每个消费者一次可以接受多少条消息是非常有用的。这可以在试图批量发布消息的时候起到简单的负载均衡和提高消息吞吐量的作用。For example, if a producing application sends messages every minute because of the nature of the work it is doing.(???例如,如果生产应用每分钟才发送一条消息,这说明处理工作尚在运行。)

    注意,RabbitMQ 只支持通道级的预取计数,而不是连接级的或者基于大小的预取。

    消息属性

    AMQP 模型中的消息(Message)对象是带有属性(Attributes)的。有些属性及其常见,以至于 AMQP 0-9-1 明确的定义了它们,并且应用开发者们无需费心思思考这些属性名字所代表的具体含义。例如:

    • Content type(内容类型)
    • Content encoding(内容编码)
    • Routing key(路由键)
    • Delivery mode (persistent or not)
    • 投递模式(持久化 或 非持久化)
    • Message priority(消息优先权)
    • Message publishing timestamp(消息发布的时间戳)
    • Expiration period(消息有效期)
    • Publisher application id(发布应用的 ID)

    有些属性是被 AMQP 代理所使用的,但是大多数是开放给接收它们的应用解释器用的。有些属性是可选的也被称作消息头(headers)。他们跟 HTTP 协议的 X-Headers 很相似。消息属性需要在消息被发布的时候定义。

    消息主体

    AMQP 的消息除属性外,也含有一个有效载荷 - Payload(消息实际携带的数据),它被 AMQP 代理当作不透明的字节数组来对待。

    消息代理不会检查或者修改有效载荷。消息可以只包含属性而不携带有效载荷。它通常会使用类似 JSON 这种序列化的格式数据,为了节省,协议缓冲器和 MessagePack 将结构化数据序列化,以便以消息的有效载荷的形式发布。AMQP 及其同行者们通常使用 “content-type” 和 “content-encoding” 这两个字段来与消息沟通进行有效载荷的辨识工作,但这仅仅是基于约定而已。

    消息持久化

    消息能够以持久化的方式发布,AMQP 代理会将此消息存储在磁盘上。如果服务器重启,系统会确认收到的持久化消息未丢失。

    简单地将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。

    七、其他

    连接

    AMQP 连接通常是长连接。AMQP 是一个使用 TCP 提供可靠投递的应用层协议。AMQP 使用认证机制并且提供 TLS(SSL)保护。当一个应用不再需要连接到 AMQP 代理的时候,需要优雅的释放掉 AMQP 连接,而不是直接将 TCP 连接关闭。

    通道

    有些应用需要与 AMQP 代理建立多个连接。无论怎样,同时开启多个 TCP 连接都是不合适的,因为这样做会消耗掉过多的系统资源并且使得防火墙的配置更加困难。AMQP 0-9-1 提供了通道(channels)来处理多连接,可以把通道理解成共享一个 TCP 连接的多个轻量化连接。

    在涉及多线程 / 进程的应用中,为每个线程 / 进程开启一个通道(channel)是很常见的,并且这些通道不能被线程 / 进程共享。

    一个特定通道上的通讯与其他通道上的通讯是完全隔离的,因此每个 AMQP 方法都需要携带一个通道号,这样客户端就可以指定此方法是为哪个通道准备的。

    虚拟主机

    为了在一个单独的代理上实现多个隔离的环境(用户、用户组、交换机、队列 等),AMQP 提供了一个虚拟主机(virtual hosts - vhosts)的概念。这跟 Web servers 虚拟主机概念非常相似,这为 AMQP 实体提供了完全隔离的环境。当连接被建立的时候,AMQP 客户端来指定使用哪个虚拟主机。

    AMQP 是可扩展的

    AMQP 0-9-1 拥有多个扩展点:

    1)定制化交换机类型:可以让开发者们实现一些开箱即用的交换机类型尚未很好覆盖的路由方案。例如 geodata-based routing。)
    2)交换机和队列的声明中可以包含一些消息代理能够用到的额外属性。例如 RabbitMQ 中的 per-queue message TTL 即是使用该方式实现。)
    3)特定消息代理的协议扩展。例如 RabbitMQ 所实现的扩展。
    新的 AMQP 0-9-1 方法类可被引入。)
    4)消息代理可以被其他的插件扩展,例如 RabbitMQ 的管理前端 和 已经被插件化的 HTTP API。

    这些特性使得 AMQP 0-9-1 模型更加灵活,并且能够适用于解决更加宽泛的问题。

    AMQP 0-9-1 客户端生态系统

    AMQP 0-9-1 拥有众多的适用于各种流行语言和框架的客户端。其中一部分严格遵循 AMQP 规范,提供 AMQP 方法的实现。另一部分提供了额外的技术,方便使用的方法和抽象。有些客户端是异步的(非阻塞的),有些是同步的(阻塞的),有些将这两者同时实现。有些客户端支持 “供应商的特定扩展”(例如 RabbitMQ 的特定扩展)。

    因为 AMQP 的主要目标之一就是实现交互性,所以对于开发者来讲,了解协议的操作方法而不是只停留在弄懂特定客户端的库就显得十分重要。这样一来,开发者使用不同类型的库与协议进行沟通时就会容易的多。

    参考资料

    [1]:RabbitMQ中文文档
    [2]:https://www.cnblogs.com/dwlsxj/p/RabbitMQ.html

    展开全文
  • AMQP 协议详解

    万次阅读 2017-11-28 16:39:11
    一、AMQP 历史​ 消息队列(Message Queue)起源于一位来自 MIT 的硬件设计教育工作者 Vivek Ranadivé 设想了一种通用软件总线,就像主板上的总线那样,供其他应用程序接入。Vivek在1983年成立了 Teknekron,高盛等...

    一、AMQP 历史

    ​ 消息队列(Message Queue)起源于一位来自 MIT 的硬件设计教育工作者 Vivek Ranadivé 设想了一种通用软件总线,就像主板上的总线那样,供其他应用程序接入。Vivek在1983年成立了 Teknekron,高盛等公司作为第一批用户再金融交易中采用了 Teknekron的软件,同时还诞生了第一代消息队列软件:Teknekron 的 The Information Bus(TIB)。

    ​ Teknekron 的 TIB 允许应用开发者建立一系列规则去描述消息内容,只要消息按照这些规则发布出去,任何消费者应用都可以订阅感兴趣的内容,信息的生产者和消费者完全解耦,并且可以再传输过程中灵活混合。这个特性引起了电信特别是新闻机构的注意。1994年路透社收购了 Teknekron 。

    ​ 由于消息队列再金融交易中应用的反响,BIM 在1990年也开始研发自己的消息队列软件(BIM MQ),并且逐步演化成 WebSphere MQ 并统治着商业消息队列平台市场。同时微软开发了Microsoft Message Queue(MSMQ)。然而这些商业MQ问题在供应商壁垒,各个厂商的 MQ 之间无法互通。为了解决这个问题,Java Message Service(JMS)在2001年诞生了,试图通过提供公共 Java API的方式隐藏MQ各个供应商提供的实际接口,从而跨越壁垒和解决互通问题,但是由于使用单独的标准化接口来胶合众多不同的接口使应用程序反而变得更加脆弱。

    ​ 2004年 JPMorgan Chase 和 iMatix 公司一起合作开发 Advanced Message Queuing Protocol (AMQP,高级消息队列协议),从一开始就设计成为开放标准,任何人都可以执行这一标准,针对该标准任何人都可以和任何 AMQP 供应商提供的 MQ 服务器进行交互。

    二、AMQP 协议

    ​ AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

    AMQP协议这种降低耦合的机制是基于与上层产品,语言无关的协议。是一种二进制协议,提供客户端应用与消息中间件之间多通道、协商、异步、安全、中立和高效地交互。从整体来看,AMQP协议可划分为两层: 
    

    Functional Layer

    ​ 功能层,位于协议上层主要定义了一组命令(基于功能的逻辑分类),用于应用程序调用实现自身所需的业务逻辑。例如:应用程序可以通过功能层定义队列名称,生产消息到指定队列,消费指定队列消息等基于(Message queues 模型)

    Transport Layer

    ​ 传输层,基于二进制数据流传输,用于将应用程序调用的指令传回服务器,并返回结果,同时可以处理信道复用,帧处理,内容编码,心跳传输,数据传输和异常处理。

    ​ 传输层可以被任意传输替换,只要不改变应用可见的功能层相关协议,也可以使用相同的传输层,同时使用不同的高级协议

    AMQ 模型设计驱动基于如下要求:

    ​ 1.保证基于模型实现的应用之间相互可以联通;

    ​ 2.提供对服务质量的可靠控制;

    ​ 3.命名规划,要求命名明确且保持一致;

    ​ 4.允许通过协议配置服务器连接;

    ​ 5.功能层命名能够简单的映射到应用程序级别的 API;

    ​ 6.职责单一明确,每个操作只做一件事情。

    AMQP 传输层设计驱动基于如下要求:

    ​ 1.使用二进制数据流压缩和解压,提高效率;

    ​ 2.可以处理任意大小的消息,且不做任何限制;

    ​ 3.单个连接支持多个通信通道;

    ​ 4.客户端和服务端基于长链接实现,且无特殊限制;

    ​ 5.允许异步指令基于管道通信;

    ​ 6.易扩展,基于新的需求和变化支持扩展;

    ​ 7.新版本向下兼容老版本;

    ​ 8.基于断言模型,异常可以快速定位修复;

    ​ 9.对编程语言保持中立;

    ​ 10.适应代码发展演变;

    三、AMQP 通用组件

    1.AMQ Model 架构

    ​ AMQ 作为中间层服务,把消息生产和消费分隔开来,当消息生产者出现异常,不影响消费者对消息的消费,当消费者异常时,生产者生产的消息可以存放到服务的内存或者磁盘,不会影响到消费的速率,同时,消息也可以基于路由的规则可以投递到指定的消费者消费。

    ​ AMQ 基于模块化通过 Exchange 和 Message Queue 两个组建组合实现消息路由分发:

    Exchange:

    基于消息生产者和路由规则可以将消息投递到指定的 Message Queue;

    Message Queue:

    能够将发送过来的消息进行存储,同时将消息转发给消费者;

    ​ Exchange 和 Message Queue之间存在绑定关系,消息到了 Exchange 后基于路由策略可以将消息投递到已绑定且符合路由策略的 Message Queue。

    1.1 模型重要组件
    Message Queue

    ​ 消息队列会将消息存储到内存或者磁盘中,并将这些消息按照一定顺序转发给一个或者多个消费者,每个消息队列都是独立隔离的,相互不影响。

    ​ 消息队列具有不同的属性:私有,共享,持久化,临时,客户端定义 或者服务端定义等,可以基于实际需求选择对应的类型,以 RabbitMQ 队列特性为例:

    共享持久化消息队列:将发送的消息存储到磁盘,然后将消息转发给订阅该队列的所有消费者;

    私有临时消息队列:RabbitMQ 支持 rpc 调用,再调用过程中消费者都会临时生成一个消息队列,只有当前消费者可见,且由服务端生成,调用完就会销毁队列。

    Exchange

    ​ 交换机收到生产者投递的消息,基于路由规则及队列绑定关系匹配到投递对应的交换机或者队列进行分发,交换机不存储消息,只做转发。

    AMQP定义了许多标准交换类型,基本涵盖了消息传递所需的路由类型,一般 AMQP 服务器都会提供默认的交换机基于交换机类型命名,AMQP 的应用程序也可以创建自己的交换机用于绑定指定的消息队列发布消息。

    1.2 消息的流转过程

    消息生命周期

    ​ 消息主要由属性及消息内容组成,生产者创建消息时可以给消息设置属性及消息内容,同时也可以标记路由信息在消息上,可以将消息发送到指定交换机。

    ​ 当消息到达交换机时,交换机会基于路由规则判断消息能否转发,如果不能转发会丢弃消息同时反馈给生产者。

    ​ 交换机基于路由规则可以将消息投递到一个或者多个消息队列,服务器通过复制或者计数器的方式将消息保存到不同队列中,每个队列中的消息内容是相同的,但是操作是隔离的,相互不影响。

    ​ 当消息到达消息队列后,消息队列会基于 AMQP 协议投递给消费者,如果无法投递给消费者或者没有消费者,消息将在内存或者磁盘中存储,等待消费者。

    ​ 当消息队列可以将消息传递给消费者时,消息将从其内部缓冲区中删除。 删除操作可能立刻执行也可以再消费者确认消息消费后再执行,删除策略消费者可以选择。

    生产消息投递确认和消费消息消费确认可以作为两个事务,然后提交或者回滚事务。

    2.AMQP 指令架构

    2.1 协议指令(类和方法)

    ​ 作为消息中间件传统的 API 定义的操作非常复杂,为了解决这个问题 AMQP 基于传统 API 的功能,定义方法来对应实现 API 的操作每个方法只完成一件事,通过方法之间的组合来实现完整的功能,所以AMQP 形成了一个非常庞大的指令集,但是指令集中的方法都是便于理解的。

    ​ AMQP 指令集中指令,基于对应的特定功能域被划分为不同的类,其中有一些类作为特定类的支持类,属于可选的。

    有如下两个场景:

    同步请求:

    ​ 一边等待对方发送请求,一边等待对方发送回复。适用于对性能要求不高的场景。

    异步请求:

    ​ 发送请求后不等待回复,使用场景对性能要求比较高的场景。

    ​ 为了简化指令处理,我们给每个同步请求定义不同的回复指令,也就是说同一个回复指令不可能返回给2个不同的请求。这也意味着发送同步请求的发送方可以接受和处理回复的指令,知道获得有效的同步回复指令为止。这种方式可以将 AMQP 与传统的 RPC 协议区分开来。

    ​ 一条指令可以被定义为同步请求,同步回复(针对特定请求)或者异步回复,但是每种指令真正再被定义是在客户端(即服务器到客户端)或者服务端(即户端到服务器)。

    2.2 AMQP 映射到中间层 API

    AMQP 映射到中间层 API,这个映射过程并不是所有方法和参数完全映射,因为有部分方法或者参数对应用程序没有意义。同时映射规则也是固定的,基于已定的一些规则,所有方法按照这个规则映射,不需要人工干预。

    例如:队列声明方法:

    Queue.Declare 
        queue=my.queue
        auto-delete=TRUE 
        exclusive=FALSE

    可以作为一条线性记录

    +--------+---------+----------+-----------+-----------+ 
    |  Queue | Declare | my.queue |     1     |     0     | 
    +--------+---------+----------+-----------+-----------+ 
       class    method     name    auto-delete  exclusive

    也可以作为高级 API

    queue_declare (session, "my.queue", TRUE, FALSE);

    ​ 对于大多数应用程序来说,中间层(指令层)隐藏再技术层面,应用程序实际使用的 API 功能对比中间层相对会较少。

    3.AMQP 传输层架构

    3.1 简要概述

    ​ AMQP 传输基于二进制协议,传输的信息被组织成各种类型的帧,帧携带协议方法和其他相关信息,所有的帧具有相同个格式:帧头,有效内容,帧尾。帧的有效内容格式取决于帧的类型。

    假设再一个可靠的面向流的网络传输层(例如:TCP / IP)

    ​ 再一个 Socket 连接中,可以有多个独立的线程访问,这种情况就是上文中提到的 Channel(通道),每个帧都有一个属于自己的通道号码,再同一个连接中所有的帧混合在一起,不同的通道共享连接,但是针对每个通道自身的帧都是按照严格的顺序运行。

    ​ 由于帧的有效内容都是由帧头和帧尾包装,所以对应帧数据的解析是相当简单便捷的,同时基于协议规范生成帧数据也是非常容易。

    3.2 数据类型

    AMQP 使用的数据类型如下:

    • Integers(数值范围1-8, 8个字节):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。
    • Bits(统一为8个字节):用于表示开/关值。
    • Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
    • Long strings:用于保存二进制数据块。
    • Field tables:包含键值对,字段值一般为字符串,整数等。

    3.3 协议协商

    ​ AMQP 客户端和服务器存在协商协议。这意味着当客户端连接时,服务端会提出一些客户端可以接受或者修改的选项,如果双方达成一致,连接继续,基于协商协议,可以设定好一些先决条件。

    在AMQP中,协商协议的一些具体方面:

    • 实际的协议和版本。服务器可以在同一端口上监听多个协议。

    • 加密参数和双方的身份验证。

    • 最大帧大小,通道数量和其他操作限制。

    展开全文
  • AMQP协议

    2021-03-06 23:31:01
    当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。...AMQP协议是一种二进制协议,提供客户端应用与消息中间件之间...
  • AMQP协议详解

    千次阅读 2019-05-15 15:26:44
    AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/...
  • AMQP协议 中文版

    千次阅读 2018-11-10 08:05:00
    AMQP协议 中文版
  • ActiveMQ系列—消息协议(AMQP协议

    千次阅读 2017-09-18 22:24:50
    AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和...
  • 理解 AMQP协议

    千次阅读 2016-09-14 13:58:02
    当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。...AMQP协议是一种二进制协议,提供客户端应用与消息
  • MQ - AMQP 协议详解

    千次阅读 2018-07-05 16:59:40
    一、AMQP 历史​ 消息队列(Message Queue)起源于一位来自 MIT 的硬件设计教育工作者 Vivek Ranadivé 设想了一种通用软件总线,就像主板上的总线那样,供其他应用程序接入。Vivek在1983年成立了 Teknekron,高盛等...
  • RabbitMQ学习(二)——AMQP协议

    千次阅读 多人点赞 2018-04-21 17:30:33
    AMQP协议介绍 RabbitMQ中常用的基本术语 RabbitMQ的工作流程介绍 RabbitMQ是消息传输的中间者,可以把它当做是一个消息代理,你把消息传送给它,它再把消息发送给具体的接收人。 这就像是邮局一样,你把邮件放入...
  • AMQP协议探索

    2021-03-03 15:14:07
    1.什么是AMQP协议(Advanced Message Queuing Protocol高级消息队列协议)我们知道HTTP、TCP/IP等等网络通信协议。HTTP就是超文本传输协议,其作...
  • 什么是AMQP高级协议AMQP核心概念是什么? RabbitMQ整体架构模型是什么样子的? RabbitMQ消息是如何流转的? RabbitMQ安装与使用 命令行与管控台 RabbitMQ消费生产与消费 RabbitMQ交换机详解 RabbitMQ队列...
  • 详解AMQP协议

    2020-01-06 21:07:52
    同时因为 AMQP 是一个网络协议,所以这个过程中的发布者,消费者,消息代理 可以分别存在于不同的设备上。 2、发布者发布消息时可以给消息指定各种消息属性(Message Meta-data)。有些属性有可能会被消息代理...
  • php amqp消息队列教程1-了解amqp协议

    千次阅读 2014-02-12 14:44:01
    当前各种应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一致使应用与中间件之间的耦合限制产品的选择,并增加维护成本。AMQP是一个提供统一消息服务的应用...AMQP协议是一种二进制协议,提供
  • AMQP 协议介绍

    2018-10-01 17:48:00
    RabbitMQ 是遵从AMQP 协议的, 换句话说, RabbitMQ 就是AMQP 协议的Erlang 的实现(当然RabbitMQ 还支持STOMP2 、MQTT3 等协议) 0 AMQP 的模型架构和RabbitMQ 的模型架构是一样的,生产者将消息发送给交换器,交换器...
  • AMQP协议简介

    2019-09-05 20:29:14
    1.应用标准高级消息队列协议 2.应用层协议的一个开放标准 3.二进制协议 4.提供统一消息服务 5.为消息中间件设计 6.一种规范 7.也是ActiveMQ的规范 1.Message   消息。消息是不具名的,它由消息头消息体...
  • RabbitMQ入门-AMQP协议

    2020-03-11 11:14:46
       RabbitMQ和AMQP    AMQP生产者流转过程    AMQP消费者流转过程  ...RabbitMQ遵从AMQP协议,AMQP的模型架构和RabbitMQ的模型架构是一样的,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 8,060
精华内容 3,224
关键字:

amqp协议三层