精华内容
下载资源
问答
  • JetLinks物联网基础平台-设备消息协议解析SDK
    千次阅读 多人点赞
    2020-03-31 15:18:43

    设备消息协议解析SDK

    平台封装了网络通信,但是具体的数据由消息协议进行解析.协议(ProtocolSupport)主要由认证器(Authenticator),
    消息编解码器(DeviceMessageCodec),消息发送拦截器(DeviceMessageSenderInterceptor)以及配置元数据(ConfigMetadata)组成.

    认证器

    认证器(Authenticator)是用于在收到设备请求(例如MQTT)时,对客户端进行认证时使用,不同的网络协议(Transport)使用不同的认证器.

    接口定义:

    public interface Authenticator {
        Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
                                                  @Nonnull DeviceOperator device);
    }
    

    参数AuthenticationRequest为认证请求参数,不同的网络类型请求类型也不同,请根据实际情况转换为对应的类型,例如:
    MqttAuthenticationRequest mqttRequest = (MqttAuthenticationRequest)request;

    参数DeviceOperator为对应的设备操作接口,可通过此接口获取设备的配置,例如:device.getConfig("mqttUsername").

    返回值Mono<AuthenticationResponse>为认证结果.

    例:

       Authenticator mqttAuthenticator =  (request, device) -> {
                MqttAuthenticationRequest mqttRequest = ((MqttAuthenticationRequest) request);
                return device.getConfigs("username", "password") //获取设备的配置信息,由配置元数据定义,在设备型号中进行配置.
                        .flatMap(values -> {
                            String username = values.getValue("username").map(Value::asString).orElse(null);
                            String password = values.getValue("password").map(Value::asString).orElse(null);
                            if (mqttRequest.getUsername().equals(username) && mqttRequest.getPassword().equals(password)) {
                                return Mono.just(AuthenticationResponse.success());
                            } else {
                                return Mono.just(AuthenticationResponse.error(400, "密码错误"));
                            }
                        });
            }
    

    消息编解码器

    用于将平台统一的消息(Message)设备端能处理的消息(EncodedMessage)进行相互转换.

    接口(DeviceMessageCodec)定义:

      //此编解码器支持的网络协议,如: DefaultTransport.MQTT
      Transport getSupportTransport();
      //将平台发往设备的消息编码为设备端对消息
      Publisher<? extends EncodedMessage> encode(MessageEncodeContext context);
      //将设备发往平台的消息解码为平台统一的消息
      Publisher<? extends Message> decode(MessageDecodeContext context);
    

    编码: 可以从上下文MessageEncodeContext中获取当前设备操作接口DeviceOperator以及平台统一的设备消息Message.根据设备侧定义的协议转换为对应的EncodedMessage.

    tip 注意: 不同的网络协议需要转换为不同的EncodedMessage类型.比如,MQTT需要转换为MqttMessage.

    大部分情况下:MessageDecodeContext可转为FromDeviceMessageContext,可获取到当前设备的连接会话DeviceSession,通过会话可以直接发送消息到设备.

    解码: 可以从上下文MessageDecodeContext中获取设备操作接口DeviceOperator以及设备消息EncodedMessage,然后将消息转换为平台统一的消息.

    平台统一消息定义

    平台将设备抽象为由属性(property),功能(function),事件(event)组成.
    平台接入设备之前,应该先将设备的属性``功能``事件设计好.

    消息组成

    消息主要由deviceId,messageId,headers组成.

    deviceId为设备的唯一标识,messageId为消息的唯一标识,headers为消息头,通常用于对自定义消息处理的行为,如是否异步消息,
    是否分片消息等.

    常用的(Headers)[https://github.com/jetlinks/jetlinks-core/blob/master/src/main/java/org/jetlinks/core/message/Headers.java]:

    1. aysnc 是否异步,boolean类型.
    2. timeout 指定超时时间. 毫秒.
    3. frag_msg_id 分片主消息ID,为下发消息的messageId
    4. frag_num 分片总数
    5. frag_part 当前分片索引
    6. frag_last 是否为最后一个分片,当无法确定分片数量的时候,可以将分片设置到足够大,最后一个分片设置:frag_last=true来完成返回.
    7. keepOnline 与DeviceOnlineMessage配合使用,在TCP短链接,保持设备一直在线状态,连接断开不会设置设备离线.

    tip:messageId通常由平台自动生成,如果设备不支持消息id,可在自定义协议中通过Map的方式来做映射,将设备返回的消息与平台的messageId进行绑定.

    属性相关消息

    1. 获取设备属性(ReadPropertyMessage)对应设备回复的消息ReadPropertyMessageReply.
    2. 修改设备属性(WritePropertyMessage)对应设备回复的消息WritePropertyMessageReply.
    3. 设备上报属性(ReportPropertyMessage) 由设备上报.

    注意:设备回复的消息是通过messageId进行绑定,messageId应该注意要全局唯一,如果设备无法做到,可以在编解码时通过添加前缀等方式实现.

    消息定义:

    ReadPropertyMessage{
        String deviceId; 
        String messageId;
        List<String> properties;//可读取多个属性
    }
    
    ReadPropertyMessageReply{
        String deviceId;
        String messageId;
        long timestamp;
        boolean success;
        Map<String,Object> properties;//属性键值对
    }
    
    WritePropertyMessage{
        String deviceId; 
        String messageId;
        Map<String,Object> properties;
    }
    
    WritePropertyMessageReply{
        String deviceId;
        String messageId;
        long timestamp;
        boolean success;
        Map<String,Object> properties; //回复被修改的属性最新值
    }
    
    ReportPropertyMessage{
        String deviceId;
        String messageId;
        long timestamp;
        Map<String,Object> properties;
    }
    

    功能相关消息

    调用设备功能到消息(FunctionInvokeMessage)由平台发往设备,对应到返回消息FunctionInvokeMessageReply.

    消息定义:

    FunctionInvokeMessage{
        String functionId;//功能标识,在元数据中定义.
        String deviceId;
        String messageId;
        List<FunctionParameter> inputs;//输入参数
    }
    
    FunctionParameter{
        String name;
        Object value;
    }
    
    FunctionInvokeMessageReply{
        String deviceId;
        String messageId;
        long timestamp;
        boolean success;
        Object output; //输出值,需要与元数据定义中的类型一致
    }
    

    事件消息

    事件消息EventMessage由设备端发往平台.

    消息定义:

    EventMessage{
        String event; //事件标识,在元数据中定义
        Object data;  //与元数据中定义的类型一致,如果是对象类型,请转为java.util.HashMap,禁止使用自定义类型.
        long timestamp;
    }
    

    其他消息

    1. DeviceOnlineMessage 设备上线消息,通常用于网关代理的子设备的上线操作.
    2. DeviceOfflineMessage 设备上线消息,通常用于网关代理的子设备的下线操作.
    3. ChildrenDeviceMessage 子设备消息,通常用于网关代理的子设备的消息.
    4. ChildrenDeviceMessageReply 子设备消息回复,用于平台向网关代理的子设备发送消息后设备回复给平台的结果.

    消息定义:

    DeviceOnlineMessage{
        String deviceId;
        long timestamp;
    }
    
    DeviceOfflineMessage{
        String deviceId;
        long timestamp;
    }
    
    
    ChildDeviceMessage{
        String deviceId;
        String childDeviceId;
        Message childDeviceMessage; //子设备消息
    }
    

    父子设备消息处理请看这里

    消息发送拦截器

    使用拦截器可以拦截消息发送和返回的动作,通过修改参数等操作实现自定义逻辑,如: 当设备离线时,将消息缓存到设备配置中,等设备上线时再重发.

    DeviceMessageSenderInterceptor{
         //发送前
          Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message);
    
         //发送后
          <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply);
    }
    

    在发送前,可以将参数DeviceMessage转为其他消息.

    发送后,会将返回结果流Flux<R>传入,通过对该数据流对操作以实现自定义行为,如:忽略错误等.

    配置元数据

    配置元数据用于告诉平台,在使用此协议的时候,需要添加一些自定义配置到设备配置(DeviceOperator.setConfig)中.
    在其他地方可以通过DeviceOperator.getConfig获取这些配置.

    例如:

    CompositeProtocolSupport support = new CompositeProtocolSupport();
    support.setId("demo-v1");
    support.setName("演示协议v1");
    support.setDescription("演示协议");
    support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); //固定为JetLinksDeviceMetadataCodec,请勿修改.
    
    DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
                "MQTT认证配置"
                , "")
                .add("username", "username", "MQTT用户名", new StringType())
                .add("password", "password", "MQTT密码", new PasswordType());
    //设置MQTT所需要到配置
     support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);
    
    

    完整例子

    演示协议

    更多相关内容
  • 基本功:消息协议

    万次阅读 2019-04-17 22:58:52
    消息是信息交换的主体,简单的讲,就是两个进程约定一个协议格式。消息表示指的是序列化后的消息字节流在直观上的表现...接下来将带你了解 RPC 的消息协议背后有哪些需要考虑的基本点。 目录 1. 消息边界 1.1 ...

    消息是信息交换的主体,简单的讲,就是两个进程约定一个协议格式。消息表示指的是序列化后的消息字节流在直观上的表现形式,它看起来是对人类友好还是对计算机友好。文本形式对人类友好,二进制形式对计算机友好。每个消息都有其内部字段结构,结构构成了消息内部的逻辑规则,程序要按照结构规则来决定字段序列化的顺序。接下来将带你了解 RPC 的消息协议背后有哪些需要考虑的基本点。

    目录

    1. 消息边界

    1.1 特殊分割符法

    1.2 长度前缀法

    2. 消息表示

    2.1 文本消息

    2.2 二进制消息

    2.3 序列化协议考虑的因素

    2.4 混合模式-HTTP 协议

    3. 消息的结构

    4. 消息压缩


    消息协议的基本原理

    1. 消息边界

    RPC 需要在一条 TCP 链接上进行多次消息传递。基于 TCP 链接之上的单条消息如果过大,就会被网络协议栈拆分为多个数据包进行传送。如果消息过小,网络协议栈可能会将多个消息组合成一个数据包进行发送。

    问题:对于接收端来说它看到的只是一串串的字节数组,如果没有明确的消息边界规则,接收端是无从知道这一串字节数组究竟是包含多条消息还是只是某条消息的一部分?

    比较常用的两种分割方式是特殊分割符法和长度前缀法。

    1.1 特殊分割符法

    消息发送端在每条消息的末尾追加一个特殊的分割符,并且保证消息中间的数据不能包含特殊分割符。比如最为常见的分割符是\r\n。当接收端遍历字节数组时发现了\r\n,就立即可以断定\r\n 之前的字节数组是一条完整的消息。HTTP 和 Redis 协议就大量使用了\r\n 分割符。此种消息一般要求消息体的内容是文本消息

    优点

    消息的可读性比较强,可以直接看到消息的文本内容。

    缺点

    不适合传递二进制消息,因为二进制的字节数组里面很容易就冒出连续的两个字节内容正好就是\r\n 分割符的 ascii 值。如果需要传递的话,一般是对二进制进行 base64 编码转变成普通文本消息再进行传送。

    1.2 长度前缀法

    消息发送端在每条消息的开头增加一个 4 字节长度的整数值,标记消息体的长度。这样消息接受者首先读取到长度信息,然后再读取相应长度的字节数组就可以将一个完整的消息分离出来。此种消息比较常用于二进制消息

    基于长度前缀法的优点和缺点同特殊分割符法正好是相反的。长度前缀法因为适用于二进制协议,所以可读性很差。但是对传递的内容本身没有特殊限制,文本和内容皆可以传输,不需要进行特殊处理。HTTP 协议的 Content-Length 头信息用来标记消息体的长度,这个也可以看成是长度前缀法的一种应用。

    2. 消息表示

    二进制消息和文本消息的表示方式就是我们熟悉的序列化反序列化

    使用“对象”来进行数据的操纵:

    class User{
             std::String user_name;
             uint64_t user_id;
             uint32_t user_age;
    };

    User u = new User(“shenjian”);
    u.setUid(123);
    u.setAge(35);

    但当需要对数据进行存储或者传输时,“对象”就不这么好用了,往往需要把数据转化成连续空间的“二进制字节流”,一些典型的场景是:

    • 数据库索引的磁盘存储:数据库的索引在内存里是b+树,但这个格式是不能够直接存储到磁盘上的,所以需要把b+树转化为连续空间的二进制字节流,才能存储到磁盘上
    • 缓存的KV存储:redis/memcache是KV类型的缓存,缓存存储的value必须是连续空间的二进制字节流,而不能够是User对象
    • 数据的网络传输:socket发送的数据必须是连续空间的二进制字节流,也不能是对象

    所谓序列化(Serialization),就是将“对象”形态的数据转化为“连续空间二进制字节流”形态数据的过程。这个过程的逆过程叫做反序列化。

    这是一个非常细节的问题,要是让你来把“对象”转化为字节流,你会怎么做?

    2.1 文本消息

    很容易想到的就是xml(或者json)这类具有自描述特性的标记性语言:规定好转换规则,发送方很容易把User类的一个对象序列化为xml进行信息的交换

    <class name=”User”>
    <element name=”user_name” type=”std::String” value=”shenjian” />
    <element name=”user_id” type=”uint64_t” value=”123” />
    <element name=”user_age” type=”uint32_t” value=”35” />
    </class>

    2.2 二进制消息

    以上面的User对象为例,你可以设计一个二进制消息方式来进行序列化:整个二进制字节流共12+29+27+24=92字节。

    • 第一行:序号4个字节(设0表示类名),类名长度4个字节(长度为4),接下来4个字节是类名(”User”),共12字节

    • 第二行:序号4个字节(1表示第一个属性),属性长度4个字节(长度为9),接下来9个字节是属性名(”user_name”),属性值长度4个字节(长度为8),属性值8个字节(值为”shenjian”),共29字节

    • 第三行:序号4个字节(2表示第二个属性),属性长度4个字节(长度为7),接下来7个字节是属性名(”user_id”),属性值长度4个字节(长度为8),属性值8个字节(值为123),共27字节

    • 第四行:序号4个字节(3表示第三个属性),属性长度4个字节(长度为8),接下来8个字节是属性名(”user_name”),属性值长度4个字节(长度为4),属性值4个字节(值为35),共24字节

    实际的序列化协议要考虑的细节远比这个多,例如:强类型的语言不仅要还原属性名,属性值,还要还原属性类型;复杂的对象不仅要考虑普通类型,还要考虑对象嵌套类型等。无论如何,序列化的思路都是类似的。

    2.3 序列化协议考虑的因素

    不管使用成熟协议xml/json,还是自定义二进制协议来序列化对象,序列化协议设计时都需要考虑以下这些因素。

    • 解析效率:这个应该是序列化协议应该首要考虑的因素,像xml/json解析起来比较耗时,需要解析doom树,二进制自定义协议解析起来效率就很高

    • 压缩率,传输有效性:同样一个对象,xml/json传输起来有大量的xml标签,信息有效性低,二进制自定义协议占用的空间相对来说就小多了

    • 扩展性与兼容性:是否能够方便的增加字段,增加字段后旧版客户端是否需要强制升级,都是需要考虑的问题,xml/json和上面的二进制协议都能够方便的扩展

    • 可读性与可调试性:这个很好理解,xml/json的可读性就比二进制协议好很多

    • 跨语言:上面的两个协议都是跨语言的,有些序列化协议是与开发语言紧密相关的,例如dubbo的序列化协议就只能支持Java的RPC调用

    • 通用性:xml/json非常通用,都有很好的第三方解析库,各个语言解析起来都十分方便,上面自定义的二进制协议虽然能够跨语言,但每个语言都要写一个简易的协议客户端

    有哪些常见的序列化方式?

    • xml/json:解析效率,压缩率都较差,扩展性、可读性、通用性较好

    • thrift

    • protobuf:Google出品,必属精品,各方面都不错,强烈推荐,属于二进制协议,可读性差了点,但也有类似的to-string协议帮助调试问题

    • Avro

    • CORBA

    2.4 混合模式-HTTP 协议

    HTTP 协议是一种基于特殊分割符和长度前缀法的混合型协议。比如 HTTP 的消息头采用的是纯文本外加\r\n 分割符,而消息体则是通过消息头中的 Content-Type 的值来决定长度。HTTP 协议虽然被称之为文本传输协议,但是也可以在消息体中传输二进制数据数据的,例如音视频图像,所以 HTTP 协议被称之为「超文本」传输协议。

    3. 消息的结构

    每条消息都有它包含的语义结构信息,有些消息协议的结构信息是显式的,还有些是隐式的。

    显式

    比如 json 消息,它的结构就可以直接通过它的内容体现出来,所以它是一种显式结构的消息协议。json 这种直观的消息协议的可读性非常棒,但是它的缺点也很明显,有太多的冗余信息。

    隐式

    消息的隐式结构一般是指那些结构信息由代码来约定的消息协议,在 RPC 交互的消息数据中只是纯粹的二进制数据,由代码来确定相应位置的二进制是属于哪个字段。

    消息的结构在同一条消息通道上是可以复用的,比如在建立链接的开始 RPC 客户端和服务器之间先交流协商一下消息的结构,后续发送消息时只需要发送一系列消息的 value 值,接收端会自动将 value 值和相应位置的 key 关联起来,形成一个完成的结构消息。在 Hadoop 系统中广泛使用的 avro 消息协议就是通过这种方式实现的,在 RPC 链接建立之处就开始交流消息的结构,后续消息的传递就可以节省很多流量。

    如果纯粹看消息内容是无法知道节点消息内容中的哪些字节的含义,它的消息结构是通过代码的结构顺序来确定的。这种隐式的消息的优点就在于节省传输流量,它完全不需要传输结构信息。

    4. 消息压缩

    如果消息的内容太大,就要考虑对消息进行压缩处理,这可以减轻网络带宽压力。但是这同时也会加重 CPU 的负担,因为压缩算法是 CPU 计算密集型操作,会导致操作系统的负载加重。所以,最终是否进行消息压缩,一定要根据业务情况加以权衡。

    更多关于压缩之前也讲过。

    展开全文
  • 消息协议则是指用于实现消息队列功能时所涉及的协议。按照是否向行业开放消息规范文 档,可以将消息协议分为开放协议和私有协议。常见的开放协议有 AMQP, MQTT STOMP,XMPP 等。有些特殊框架(如 Red Kafka ZeroMQ )...

    前言

    简单理解
    就是需要大家都遵守的 套规 。在计算机领域中,只要涉及不同的计算机之间要共同完成一
    事情的时候,就肯定会有协议的存在,就像我们说话用某种语言一样,不 同的计算机之间必
    须使用相同的语 才能进行通信
    在这里插入图片描述
    消息协议则是指用于实现消息队列功能时所涉及的协议。按照是否向行业开放消息规范文
    档,可以将消息协议分为开放协议和私有协议。常见的开放协议有 AMQP, MQTT STOMP,XMPP 等。有些特殊框架(如 Red Kafka ZeroMQ )根据自身需要未严格遵循 MQ 规范,而
    是基于 TCP IP 自行封装了 协议,通过网 Socket 接口进行传输,实现了 MQ 的功能。这
    的协议可以简 地理解成对双方通信 个约定,比如传过来 衍流数据,其中第
    节表示什么, 字节表示什么,类似这样的约定。本章主要介绍常见的几种开放协议,
    并且主要围绕每种协议约定的数据格式来阐述,包括每种协议中的基本概念, 及约定的互相
    通信的消息数据格式等。

    AMQP协议

    amqp的三个部分

    1. 基本概念:基本概念是指 AMQP 内部定义的各组件及组件的功能说明
    2. 功能命令:是指该协议所定义的 系列命令,应用程序可以基于这些命令来实现相应的功能
    3. 传输层协议:是一个网络级协议,它定义了数据的 传输格式,消息队列的客户端可以基于这个协议与消息代理和 AMQP 的相模型进行交互通信,该协议的内容包括数据, 吭处理、信道 用、内容编码、心跳检测、数据表示和错误处理等。

    主要概念

    • Message (消息):消息服务器所处理数据的原子单元。消息可以携带内容,从格式上看,消息包括 个内容头、 组属性和 个内容体这里所说的消息可以对应到许多不同应用程序的实体,比如 个应用程序级消息、 个传输文件、 个数据流帧等。消息可以被保存到磁盘上这样即使发生严重的网络故障、服务器崩溃 可确保投递。消息可以有优先级,高优先级的消息会在等待同 个消息队列时在低优先级的消息之前发送,当消息必须被丢弃以确保消息服务器的服务质量时,服务器将会优先丢弃低优先级的消息 。消息服务器不能修改所接收到的井将传递给消费者应用程序的消息容体。消息服务器可以在内容头中添加额外信息,但不能删除或修改现有信息。
    • Publisher (消息生产者)也是 个向交换器发布消息的客户端应用程序 对于java来说就是发送请求的那一段代码
    • Exchange (交换器):用来接收消息生产者所发送的消息井将这些消息路由给服务器中的队列。
    • Binding (绑定) 用于消息队列和交换器之间的关联。 个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以以将交换器理解成 个由绑定构成的路由表
    • Virtual Host (虚拟主机):它是消息队列以及相关对象的集合,是共享同一个身份验证和加密环境的独 服务器域。每个虚拟主本质上都是 mini 版的消息服务器,拥有自己的队列、交换器、绑定和权限机制。
    • Broker (消息代理) 表示消息队列服务器实体,接受客户端连接,实现 AMQP 消息队列和路由功能的过程
    • Routing Key (路由规则〉:虚拟机可用它来确定如何路由一个特定消息。
    • Queue (消息队列):用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可被投入 个或多个队中息一直在队列里面,等待消费者连接到这个队列将其取走。
    • Connection (连接〉 可以理解成客户端和消息队列服务器之间的 TCP 连接
    • Channel (信道):仅仅当创建了连接后,若客户端还是不能发送消息,则需要为连接创建一个信道。信道是 条独立的双向数流通道,它是建立在真实的 TCP 连接内的虚拟连接, AMQP 令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,它们都通过信道完成。 个连接可以包含多个信道,之所以需要信道,==是因为TCP 接的建立和释放都是 分昂贵的,==如果客户端的每 个线程都需要与消息服务器交互,如果每 个线程都建立了 TCP 连接,暂且不考虑 TCP 接是否?浪费,就算操作系统也无法承受每秒建立如此多的 TCP 连接

    核心组件的生命周期

    在这里插入图片描述
    (1 )消息的生命周期

    生产者( Publisher )在发布消息时可 给消息指定各种消息属性( message meta-data ),其中有些属性有可能会被消息代理( Broker )所使用,而其他属性则是完全不透明的, 们只能被接收消息的应用所使用。当消息到达服务器时,交换器通常会将消息路由到服务器上的消息队列中,如果消息不能路由,则交换器会将消息丢弃或者将其返回给生产者,这样生产者可以选择如何来处理未路由的消息。单条消息可存在于多个消息队列中 ,消息代理可以采用 复制消息等多种方式进行处理。但是当 条消息被路由到多个消息队列中时,它在每个消息队列中都是一样 。当消息到达消息队列时,==消息队列会 尝试将消息传递给消息消费者。如果传递不成功 ,则消息队列会存储消息(按生产者要求存在者 内存或磁盘中),并 待消 费者准备好。==如果没有消费者,则消息队列通过 AMQP 将消息返回给生产者( 如果需要的话) 。当消息队列把消息传递给消费者后 ,它 从内部 中删除消息,删除动作可能是 发生的, 可能在消费者应答 己成功处理之后再删除。消息消费者可选择如何及何时来应答消息,同 消费者也可以拒绝消息 (一个否定应答〉。

    (2)交换器的生命周期
    每台 AMQP 务器都预先 建了许多交换器实例,它们在服务器启动时就存在并且不能被销毁 如果你的应用程序有特殊要求,则可以选择自己创建交换器,并在完成工作后进行销毁
    (3)队列的生命周期
    这里主要有两种消 队列的生命周期,即持久化消息队列和临时消息队列。持久化消息队列可被 个消费者共享 不管是否有消费者接收,它们都可 以独立存 临时消 队列对消费者是私有的,只能绑定到此消费者,当消费者断开连接时,该消息队列将被删除。

    MQTT 协议

    它是一个基于TCP IP 协议、可提供发布/订阅消息模式、十分轻量级的通信协议。除标准版外,还有 个简化版本MQTT SN ,它基于非 TCP IP 协议(如 ZigBee 协议),该协议主要为嵌入式设备提供消息通信。这里主要介绍标准版 MQTT 3.1.1 ,该协议是一个基于客户端-服务器的消息发布/订阅传输协议,其特点是轻量、简单、开放和易于实现。正因为这些特点,使它常应用于很多机器计算能力有限、低带宽、网络不可 的远程通信应用场景中。
    目前有很多 MQT 消息中间件服务器,如下都是 MQTT 协议的服务器端的实现
    IBM WebSphere MQ Teleme IBM MessageSig Mosquitto clipse ho emqttd Xively
    m2m.io webMethods Nirvana Messa ing == RabbitMQ pache ActiveMQ == Apache Apollo
    Moque仗队 HiveMQ Mosca Litmus Automation Loop JoramMQ hingMQ VemeMQ

    主要概念

    所有基于 网络连接的应用都会有客户端( Client )和服务器( Server ),而在 MQTT 协议使用者有 种身份: 发布者( Publisher )、代理( Broker )和订阅者( Subscriber )。其中消息的发布者和 订阅者都是客户端,消息代理是服务器 消息发布者可以同时是订阅者。

    一条消息 流转过程是这样的 先由消息发布者发布消息到代理服务器,在消息中会包含
    主题( Topic ),之后消息订阅者如果订阅了该主题的消息,将会收到代理服务器推送的消息

    在这里插入图片描述

    MQTT 协议中的 组件

    • 网络连接( Network Connection ):网络连接 客户端连接到服务器时所使 有的底层传输协议,由该连接来负责提供有序的、可靠的基于字节流的双向传输
    • 应用消息( ppli cat ion Message ):应用消息指通过网络所传输的应用数据,该数据 般包括 题和负载两部分。
    • 主题( Topic):主题题相当于应用消息的类型 ,消息订阅者订阅后,就会收到该主题的消 内容
    • 负载( Payload): 负载指消息订阅者具体接收的内容
    • 客户端(Client)客户端指使用 MQTT 程序或设备 客户端总是通过网络连接到服务端,它可以发布应用消息给其他相关的客端、订阅消息用以请求接收相关的应用消息、取消订阅应用消息、从服务器断开连接等。
    • 服务器( Server):服务器也是指程序或设备,它作为发送消息的客户端束 请求订阅的客户端之间的中介。服务器的功能包括接收来自客户端的网络连接、接收客户端发布的应用消息、处理客户端的订阅和取消订阅的请求、转发应用消息给相应的客户端等
    • 会话( Sess on):客户端与服务器建立连接之后就是 个会话,客户端和服务器之间通过会话来进行状态交互。会话存在于一个网络连接之间,也可能会跨越多个连续的网络连接。会话主要用于客户端和服务器之间的逻辑层面的通信
    • 订阅( Su scripti on):订阅一般与 个会话关联,会话可以包含多于 个的订阅。订阅包含 个主题过滤器和个服务质量( QoS )等级。会话的每个订阅都有 个不同的主题过滤器。
    • 主题名( opic Name):主题名是附加在消息上的一个标签,该标签与服务器的订阅相匹配,服务器会根据该标签将消息发送给与订阅所匹配的每个客户端
    • 主题过滤器( pi Filter):主题过滤器是订阅中包含的一个表达式,用于表示相关联的一个或多个主题。主题过滤器可以使用通配衍。
    • MQTT 控制拇文 CMQT Control Packet):MQT 控制报文实际上就是通过网络连接发送的信息数据包。

    STOMP 协议

    STOMP ( Streaming Text Orientated Messaging Protocol 流文本定向消息协议〉是 个简单
    的文本消息传输协议, 它提供了 种可互操作的连接格式,允许客户端与任意消息服务器
    Broker )进行交互 如下都 STOMP
    协议的服务器端实现。
    Apache Apollo Apach ActiveMQ RabbitMQ Horn tQ Sta mpy StompServer

    主要概念

    与其他消 协议相 同, STOMP 同样包含客户端和服务器,这里的客户端既可以是消息生产者,也可以是消息梢费者,而服务器就是消息数据的目的地,所有消息都会被发送到服务器
    在这里插入图片描述
    STOMP 客户端和服务器之间的通信是基于 来实现的 每一 都包括一 表示命令
    符串、一系列可选的 头条目和帧的数据内容。帧的数据格式如下
    在这里插入图片描述

    JMS 协议

    JMS CJava Message Service) ep Java 消息服务应用程序接口,是 Ja 平台中面向消息中间
    件的 套规范的 JavaAP 接口,用于在两个应用程序之间或分布式系统中发送消息,进行异步
    通信 这套规范由 IB 提出
    jms并不是消息队列协议的 种,更不是消息队列产品,它是与具体平台无关 API 目前市
    面上的绝大多数消息中间件厂商都支持 接口规范 换句话说 你可 使用 JMSAPI 来连接
    支持 AMQP STOMP 等协议的消息中间件产品(比如 Act veM RabbitMQ 等),在这一点上
    它与 Java 中的 IDB 的作用很像,我们可以用 IDB CAPI 来访问具体的数据库产品( Oracle
    My 等)。

    体系架构

    JMS 之前,大部分消息队列产品都支持点对点 发布 订阅两种方式来传递消息。基于
    此, JMS 将这两种消息模型抽象成两类规范,它们相互独立,由 JMS 的提供商(即消息队列产
    品的具体厂商〉自己选择实现其中的 种还是两 模型。 JMS 的作用是提供通用接口保证基于
    JMS PI 编写 程序适用于任何 种模型 使得在更换消息队列提供商 况下 程序
    代码也不需要做太大 改动。

    (1 ) 点对点模型在点对点( Point to int )模型中,应用程序由队列( Queue )、发送者( Sender )和接
    (Receiver )组成。每条消息都被发送到一个特定的队列中,接收者从队列中获取消息( )。
    队列中 直保留着消息,直到它们被接收或超时。点对点模型的特点如下:

    • 每条消息只有一个接收者,消息一旦被接收就不再保留在消息队列中了。
    • 发送者 接收者之间在时间上没有依赖。也就是说,当消息被发送之后 不管接收者有没有在运行,都不会影响消息被发送到队列中。
    • 每条消息仅会被传送给 个接收者 也就是说, 个队列中可能会有多个接收者听,但是消息只能被队列中的 个接收者接收
    • 消息存在先后顺序 。一个队列会按照消息服务器将消息放入队列中的顺序把它们传送给接收者。当消息己经被接收时就会从队列头部将它们删除(除非使用了消息优先级)
    • 当接收者收 消息时,会发送确认收到通知

    所以,一般情况下,如果希望所发送的每条消息都能被成功处理,则需要使用点对点模型

    在这里插入图片描述
    (2 )发布/订阅模型
    在发布/订阅( Pub Sub )模型中,应用程序由主题( Topic )、发布者( Publi )和订阅者
    Subsc )组成。发布者发布 条消息,该消息通过 题传递给所有的订阅者
    在这种模型中,发布者和订阅者彼此不知道对方,它们是匿名的井且可以动态发布和订阅主题。
    主题用于保存和传递消息,并且会 直保存消息直到消息被传递给订阅者。
    在这里插入图片描述
    发布/订阅模型的特点如下:

    • 每条消息可以有多个订阅者
    • 发布者和订阅者之间有时间上的依赖 一般情况下,某个主题的订阅者需要在创建了订阅之后才能接收到消息,而且为了接收消息订阅者必须保持运行的状态。
    • JMS 允许订阅者创建 可持久化的订阅,这样即使订阅者没有运行也能接收到所阅的消息
    • 每条消息都会传送给该主题下的所有 订阅者
    • 通常发布者不会知道 意识不到哪一个订阅者正在接收消息所以,如果希望所发送的消息不被做任何处理或者被 个或多个订阅者 ,则可以使用发布/订阅模型。

    基本概念

    按照、JMS 规范中所说的, 一个JMS 应用由如下几个部分组成

    • JMS 客户端( JMS Client): 指发送和接收消息的 Java 程序
    • 非JMS 客户端(Non JMS Client ):指使用消息系统原生 客户端 API 代替 JMS 的客户端。 果应用程序在 JMS 规范前就己存在, 它可能同时包 JMS 客户端 JMS客户端。
    • 消息( Message) 每个应用都定义了一组消息,用于多 客户端之间的消息通信
    • JMS 提供商( JMS Provider ):指实现了 JMS API 实际消息系统。
    • 受管对象( Administered Obect :指由 管理员创建, 并预先 好给客户端使用的血但对象。 JMS 中的受管对象分为两种,即 ConnectionFactory (客户端使用这个对象来建到提供者的连接)和 Destination (客户端使用这个对象来指定发送或接收消息的目
      的地)。

    具体到 JMS 应用程序, 主要涉及以 基本概念

    • 生产者( Producer 建并发送消息 JMS 客户端,在点对点模型中就是发送者 ,在发布/订阅模型中就是发布者
    • 消费者( Consumer ): 接收消息的 JMS 客户端 在点对点模型中就是接收者 在发订阅模型中就是订阅者。
    • 客户端( lient ):生产或消费消息的基于 Jav 应用程序或对象。
    • 队列( Queue 个容纳被发送的等待阅读的消息的区域。它是点对点模型中的队列。
    • 主题( Topic ):一种支持发送消息给多个订阅者的机制。它是发布/订阅模型中的主题。
    • 在 JMS 客户端之间传递的数据对象。 JMS 消息又包括消息头、属性和消息体 部分。消息头是指所有消息都支持的相同的头字段集,它包含了客户端JMS 提供商都要使用的用于标识和路由消息的值 属性是指除标准的头宇段外,消息接口还 含了 支持属性值的内建机制 实际上,这 是为消息提供了 加可选的消息头字段的机制。消息属性包括应用专有属性、标准属性、提供商专有属性
      JMS 定义了几种消息体类型,这些类型覆盖了当前使用的大部分消息风格。消息体就是指实际的消息内容。

    编程接口

    ConnectionFactory 接口(连接工厂〉
    ConnectionFactory onnection 工厂,根据不同的消息类型用户可选择用队列
    连接工厂或者主题连接 厂,分别对应 QueueConnecti onF actory TopicConnectionFactory
    以通过 JNDI 来查找 ConnectionFactory 对象。

    Destination 接口(目的地)
    Destination 是一 包装了消息目的地标识符的受管对象 消息目的地是指消息发布和接收的地点,消息目的地要么是队列要么是主题。对于消息生产者来说 ,它的 Destination 是某个队列或某个主题:对于消息消费者来说,它的 Destination 是某个队列或主题( 即消息来源)。所以 Destination 实际上就是两种类型的对象 Queue Topic 可以通过 JNDI 来查找 Destination

    Connection 接口(连接)
    Connection 表示在客户端和 JMS 系统之 建立的连接(实际上是对 TCP IP Socket 的包装)。
    Connection 可以产生 个或多个 Session ,跟 ConnectionFactory 样, onnection 有两 类型
    QueueConnection TopicConnection

    Session 接口(会话)
    Session 是实际操作消息的接口,表示一 单线程的上下文,用于发送和接 消息。因为会
    话是单线程的,所以消息是按照发送的顺序 个个接收的 。可 以通过 Session 创建生产者、消费
    者、消息等。在规范 Session 还提供了事务的功能。 Session 分为两种类型 QueueSession
    军日 TopicSession

    MessageProducer 接口(消息生产者〉
    消息生产者由 Session 创建并用于将消息发送到 Destination 消费者可以 步(阻塞模式)
    或异步(非阻塞模式)接收队列和主题类型的消息。消息生产者有两种类型 QueueSender
    TopicPublisher

    MessageConsumer 接口(消息消费者)
    消息消费者由 Session 建,用于接收被发送到 Destination 的消息。消息、消费者有两种类刑
    QueueReceiver TopicSubscriber

    Message 接口(消息〉
    消息是在 费者 生产者之 传送 对象,即将消 应用程序发送 应用程序。

    MessageListener (消息监 器)
    如果果注册 消息监听 那么当消息 将自动调用监 onMessage 方法。

    总结

    在使用消息队列时肯定需要了JMS ,该规范实际上是对 AMQPMQTT STOMP XMPP 等消息通信协议的更高 层的抽象 从消息队列使用者的角度来看,JMS 对所需要 理的常规 题都已经提供了相关支持
    节选自-------《分布式消息中间件实践》 可以在脚本之家下载

    展开全文
  • C++使用protobuf 作为网络消息协议

    热门讨论 2015-03-25 18:35:16
    一个c++使用protobuf作为消息协议的一个小demo,从这个demo里你可以很好地理解进行socket编程中的数据包的设计以及数据的打包和解包。
  • java socket通信自定义消息协议

    热门讨论 2010-05-26 16:26:34
    java socket通信自定义消息协议,socket以字节码的方式通信传递,客户端与服务器端分别进行转换与解析的过程实现简单的消息协议
  • 首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和RabbitMQ,它们是现在业务系统中应用广泛的消息队列软件。包括...

    1、概述

    从本文开始,我们介绍另一类型的系统间通讯及输:MQ消息队列。首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和RabbitMQ,它们是现在业务系统中应用广泛的消息队列软件。包括他们的安装、运行、支持协议、集群化和调用方式。

    当然,在这个过程中我们还会提到其他的消息队列协议(或者实现),例如微软JBossMQ、MSMQ、商业化产品WebSphere MQ、Oracle高级队列(AQ)等。我们还会讨论这些眼花缭乱的协议、软件、程序库之间的关系

    随后我们会花一些篇幅,讨论现在新兴的消息队列Kafka和ZeroMQ。它们的应用越来越广泛,尤其在大数据的采集方面。最后我们将使用消息队列搭建一个高性能的日志采集系统,作为实战。

    2、基本概念

    2-1、消息

    首先有三个基本概念在开篇前我们需要进行讨论:消息、消息协议、消息队列。消息既是信息的载体 这个描述相信各位读者都能够明白。为了让消息发送者和消息接收者都能够明白消息所承载的信息(消息发送者需要知道如何构造消息;消息接收者需要知道如何解析消息),它们就需要按照一种统一的格式描述消息,这种统一的格式称之为消息协议。所以,有效的消息一定具有某一种格式;而没有格式的消息是没有意义的

    而消息从发送者到接收者的方式也有两种。一种我们可以称为即时消息通讯,也就是说消息从一端发出后(消息发送者)立即就可以达到另一端(消息接收者),这种方式的具体实现就是我们已经介绍过的RPC(当然单纯的http通讯也满足这个定义);另一种方式称为延迟消息通讯,即消息从某一端发出后,首先进入一个容器进行临时存储,当达到某种条件后,再由这个容器发送给另一端。 这个容器的一种具体实现就是消息队列

    2-2、知识结构

    消息队列和已经介绍过的RPC相同的是:无论是RPC也好,消息队列也好他们都建立在网络IO模型基础上(我们已经介绍过多种网络IO模型)。先进的网络IO模型将赋予MQ协议优异的性能表现(当然,性能也不仅仅取决于网络IO模型)。

    这里写图片描述

    从上图可以看到,某一种消息通讯软件(或者叫做程序库)的实现都建立在“协议”基础上:RMI程序库建立在RMI协议上(RMI协议是JAVA规范协议的一部分) ,属于一种“即时消息通讯”;RabbitMQ和Qpid消息通讯软件的设计依据是AMQP协议,属于一种“延迟消息通讯”。

    虽然消息协议存在“私有协议”和“开放协议”之分(是否向行业开放消息规范文档、是否允许某个组织更改协议),虽然某一个软件(程序库)不一定只支持一种协议(例如ActiveMQ实现了多种消息协议),虽然某一种协议也不一定只有一种软件(程序库)实现(例如能够支持webservice协议的程序库就有Codehaus XFire、Apache CXF、Jboss RESTEasy等),但是这并不影响“某一种消息通讯软件(或者叫做程序库)的实现都建立在“协议”基础上”的概念,反而是这个基本概念加强了。

    3、消息协议

    那么要理解消息队列,我们就应该从这些支持“延迟消息通讯”的消息协议开始讨论。这个小节我们首先为各位读者介绍几种使用的消息协议,他们是XMPP、Stomp和AMQP。为了承接后文我们讲解的MQ软件,这三个协议中我们又着重讲解AMQP协议。

    3-1、XMPP协议

    3-1-1、定义

    XMPP is the Extensible Messaging and Presence Protocol, a set of open technologies for instant messaging, presence, multi-party chat, voice and video calls, collaboration, lightweight middleware, content syndication, and generalized routing of XML data.

    以上内容引用自XMPP官网,这个定义已经可以清楚表明XMPP协议的用途和特性。XMPP的前身是Jabber,一个开源形式组织制定的网络即时通信协议。XMPP目前被IETF国际标准组织完成了标准化工作。

    XMPP基于XML,用于IM系统的开发。国内比较流行的XMPP服务器叫做Openfire,它使用MINA作为下层的网络IO框架(不是MINA2是MINA1);国外用的比较多的XMPP服务器叫做Tigase,它的官网号称单节点可以支撑50万用户在线,集群可以支持100万用户在线:(http://projects.tigase.org/

    Cluster with over 1mn online users . 500k online users on a single machine

    当然如果读者所在公司需要开发IM系统,除了使用现成的XMPP服务器以外,还需要实现了XMPP协议的客户端或者开发包(以便进行扩展开发)。您可以在XMPP官网查看到XMPP官方推荐的开发包,各种语言的支持基本上都有:https://xmpp.org/software/libraries/

    笔者曾参与过某几款IM系统的开发(包括自己创业的项目),总的来说XMPP协议本身是不错的选择,但是学习起来会耗费相当的时间,并且某些XMPP客户端、服务器端或者程序库并没有这些开发团队宣传的那么稳定好用。所以如果您的公司需要进行IM系统的开发,那么创立私有的消息协议也会是一个不错的选择

    3-1-2、协议通讯过程示例

    为了让各位读者对XMPP协议有一个感性认识,这里我们给出一个XMPP协议处理“IM用户登录”操作的过程(XMPP的登录方式分为有用户密码和无用户密码两种方式,这里我们介绍无密码登录方式)。

    XMPP协议本身细节比较丰富,这里我们只讨论登录操作,如果读者有兴趣可以下载全套的XMPP官方规范文档进行研究(XMPP | The universal messaging standard):

    这里写图片描述

    通过上图可以看到,XMPP协议中的xml片段。这里出现了几个XMPP协议中的关键信息,例如:

    • stream标记:通讯流标记,是指XMPP的客户端或者服务器端向对方发起的通讯请求(或者响应)。通讯流并不携带正真的内容信息,指示表明客户端和服务器端发生了一次交互。stream的属性包括:to、from、id、xml:lang、version等。

    • iq标记:iq标记是Info/Query的简称(你可以理解成查询信息请求),一般是一组的形式出现,由客户端发起查询请求,由服务器端返回查询结果。由于查询请求的类型不一样,iq标记中可以嵌入的子标记就有很多。例如,可以嵌入bind标记,表明某个用户和jid的绑定关系;可以嵌入多个item标记,表明查询得到的这个用户的好友信息(如下)。

    <iq to='somenode@example.com/someresource' type='result' id='roster'>  
        <query xmlns='jabber:iq:roster'>  
            <item jid='friend1@example.com' name='someone1'/>  
            <item jid='friend2@example.com' name='someone2'/>  
        </query>  
    </iq>
    • jid标记:jid(JabberID)是XMPP协议中标示,它用来标示XMPP网络中的各个XMPP实体(实体可以是某一个用户、某一个服务器、某一个聊天室),规范格式如下:
    jid = [ node "@" ] domain [ "/" resource ] 
    • 还有未出现的message、presence标记:message是实体内容标记,记录了聊天的真实内容;presence标记表示了XMPP用户的服务状态(离线,在线、忙碌等)。示例如下:
    <message to="somenode@example.com/someresource" type="chat"> 
        <body>helloword。。。</body> 
    </message> 

    3-2、Stomp协议

    3-2-1、定义

    Stomp协议,英文全名Streaming Text Orientated Message Protocol,中文名称为 ‘流文本定向消息协议’。是一种以纯文本为载体的协议(以文本为载体的意思是它的消息格式规范中没有类似XMPP协议那样的xml格式要求,你可以将它看作‘半结构化数据’)。目前Stomp协议有两个版本:V1.1和V1.2。

    一个标准的Stomp协议包括以下部分:命令/信息关键字、头信息、文本内容。如下图所示:

    这里写图片描述

    以下为一段简单的协议信息示例:

    CONNECT
    accept-version:1.2
    someparam1:value1
    someparam2:value2
    
    this is conntecon ^@

    上面的示例中,我们使用了Stomp协议的CONNECT命令,它的意思为连接到Stomp代理端,并且携带了要求代理端的版本信息和两个自定义的K-V信息(请注意’^@’符号,STOMP协议中用它来表示NULL)。

    Stomp协议中有两个重要的角色:STOMP客户端与任意STOMP消息代理(Broker)。如下图所示:

    这里写图片描述

    看了上面的示意图后有的读者可能会问:为什么称为Stomp消息代理,而不称为Stomp消息服务?因为Stomp Broker只是负责接受和存储客户端发来的消息、只是按照客户端要求的路径转发消息,只是管理客户端连接和订阅:它并不负责根据消息内容做任何业务处理。所以将它称为消息代理端更贴切。

    由于Stomp协议的结构如此简单,以至于任何理解Stomp协议命令格式的技术人员都可以开发Stomp的代理端或者Stomp的客户端,并将自己满足Stomp协议的系统轻松接入另一个同样满足Stomp协议的第三方系统(例如activeMQ)

    3-2-2、基本命令/返回信息

    和介绍XMPP协议的方式类似,为了让读者对Stomp协议有进一步的认识,本小节我们介绍Stomp协议的基本命令和代理端返回的信息种类,并且列举一些实例进行使用讲解。

    在Stomp协议中,主要有以下命令/返回信息(有的文章中也称一个完整的信息为帧)。这些命令/返回信息构成了Stomp协议的主体,并能够支持您的Stomp客户端和Stomp代理端完成连接、发送、订阅、事务、响应的整个操作过程。这些命令/返回是:

    • CONNECT/STOMP命令: 客户端通过使用CONNECT命令,连接到Stomp代理端。如果使用STOMP命令,那么Stomp代理端的版本必须是1.2。

    • CONNECTED信息:当Stomp代理端收到客户端发送来的Connect命令并且处理成功后,将向这个客户端返回CONNECTED状态信息;如果这个过程中出现任何问题,还可能返回ERROR信息

    • SEND 发送命令:客户端使用SEND命令,向某个指定位置(代理端上的一个虚拟路径)发送内容。这样在这个路径上订阅了消息事件的其它客户端,将能够收到这个消息。

    • SUBSCRIBE 订阅命令:客户端使用SUBSCRIBE订阅命令,向Stomp服务代理订阅某一个虚拟路径上的监听。这样当其它客户端使用SEND命令发送内容到这个路径上时,这个客户端就可以收到这个消息。在使用SUBSCRIBE时,有一个重要的ACK属性。这个ACK属性说明了Stomp服务代理端发送给这个客户端的消息是否需要收到一个ACK命令,才认为这个消息处理成功了。如下所示:

    SUBSCRIBE
    id:XXXXXXXXX
    destination:/test
    ack:client
    
    ^@

    以上SUBSCRIBE命令信息说明,客户端订阅的虚拟位置是test。且命令信息中ack属性为client,说明当客户端收到消息时,必须向代理端发送ack命令,代理端才认为这个消息处理成功了(ack的值只有三种:auto(默认)、client和client-individual)。

    • UNSUBSCRIBE 退订命令:客户端使用这个命令,取消对某个路径上消息事件的监听。如果客户端给出的路径之前就没有被这个客户端订阅,那么这个命令执行无效。

    • MESSAGE 信息:当客户端在某个订阅的位置收到消息时,这个消息将通过MESSAGE关键字进行描述。类似以下信息就是从代理端拿到的消息描述:

    MESSAGE
    redelivered:true
    message-id:ID:localhost-34450-1457321490460-4:24:-1:1:1
    destination:/test
    timestamp:1457331607873
    expires:0
    priority:4
    
    2345431457331607861
    • BEGIN 开始事务命令: Stomp协议支持事务模式,在这种模式下,使用Send命令从某个客户端发出的消息,在没有使用COMMIT正式提交前,这些消息是不会真正发送给Stomp代理端的。BEGIN命令就是用于开启事务。注意,一个事务中可以有一条消息,也可以有多条消息

    • COMMIT 提交命令: 当完成事务中的信息定义后,使用该命令提交事务。只有使用COMMIT命令后,在某一个事务中的一条或者多条消息才会进入Stomp代理端的队列(订阅了事件的其它客户端才能收到这些消息)。

    • ABORT 取消/终止事务命令:很明显,这个命令用于取消/终止当前还没有执行COMMIT命令的事务。

    • ACK 确认命令:当客户端使用SUBSCRIBE命令进行订阅时,如果在SUBSCRIBE命令中制定ack属性为client,那么这个客户端在收到某条消息(id为XXXX)后,必须向Stomp代理端发送ACK命令,这样代理端才会认为消息处理成功了;如果Stomp客户端在断开连接之前都没有发送ACK命令,那么Stomp代理端将在这个客户端断开连接后,将这条消息发送给其它客户端

    ACK
    id:MESSAGE ID
    
    ^@

    请注意head部分的id属性,传递的id属性是之前收到的MESSAGE信息的id标示。

    • NACK 不确认命令:同样是以上的SUBSCRIBE命令的状态下,如果这时Stomp客户端向Stomp代理端发送NACK信息,证明这条消息在这个客户端处理失败。Stomp代理端将会把这条消息发送给另一个客户端(无论当前的客户端是否断开连接)

    • DISCONNECT 断开命令:这个命令将断开Stomp客户端与Stomp代理端的连接。

    上篇文章中我们重点讨论了“协议”的重要性,并为各位读者介绍了Stomp协议和XMPP协议。这两种协议是消息队列中两种不同使用场景下的典型代表。本文主要接续上文的篇幅,继续讨论消息队列中另一种典型协议:AMQP协议。

    3-3、AMQP协议

    AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和 IEC 国际化标准。目前支持AMQP的软件厂商包括:

    这里写图片描述

    3-3-1、协议概览

    在网络上讲解AQMP协议的文章已经有很多了,您可以在百度、Google、必应上搜索关键字‘AMQP’,就会出现很多相关文章。虽然文章数量比较多,但是却鲜有质量过硬的文章(当然除了AMQP官网 Home | AMQP 的协议说明文档)。本小节的内容我试图在能力所及的范围内,为各位读者将AMQP协议的核心要点讲清楚。为了达到这个目的,首先将AMQP协议的原理用下图进行一个全面呈现,然后在详细讲解图中的每一个要点:

    这里写图片描述

    从上图我们可以看到AMQP协议的各个组成部分:

    • AMQP协议中的元素包括:Message(消息体)、Producer(消息生产者)、Consumer(消息消费者)、Virtual Host(虚拟节点)、Exchange(交换机)、Queue(队列)等

    • 由Producer(消息生产者)和Consumer(消息消费者)构成了AMQP的客户端,他们是发送消息和接收消息的主体。AMQP服务端称为Broker,一个Broker中一定包含完整的Virtual Host(虚拟主机)、 Exchange(交换机)、Queue(队列)定义。

    • 一个Broker可以创建多个Virtual Host(虚拟主机),我们将讨论的Exchange和Queue都是虚拟机中的工作元素(还有User元素)。注意,如果AMQP是由多个Broker构成的集群提供服务,那么一个Virtual Host也可以由多个Broker共同构成。

    • Connection是由Producer(消息生产者)和Consumer(消息消费者)创建的连接,连接到Broker物理节点上。但是有了Connection后客户端还不能和服务器通信,在Connection之上客户端会创建Channel,连接到Virtual Host或者Queue上,这样客户端才能向Exchange发送消息或者从Queue接受消息。一个Connection上允许存在多个Channel,只有Channel中能够发送/接受消息

    • Exchange元素是AMQP协议中的交换机,Exchange可以绑定多个Queue也可以同时绑定其他Exchange。消息通过Exchange时,会按照Exchange中设置的Routing(路由)规则,将消息发送到符合的Queue或者Exchange中

    那么AMQP消息在这个结构中是如何通过Producer发出,又经过Broker最后到达Consumer的呢?请看下图:

    这里写图片描述

    1. 在Producer(消息生产者)客户端建立了Channel后,就建立了到Broker上Virtual Host的连接。接下来Producer就可以向这个Virtual Host中的Exchange发送消息了。

    2. Exchange(交换机)能够处理消息的前提是:它至少已经和某个Queue或者另外的Exchange形成了绑定关系,并设置好了到这些Queue和Excahnge的Routing(路由规则)。Excahnge中的Routing有三种模式,我们随后会讲到。在Exchange收到消息后,会根据设置的Routing(路由规则),将消息发送到符合要求的Queue或者Exchange中(路由规则还会和Message中的Routing Key属性配合使用)。

    3. Queue收到消息后,可能会进行如下的处理:如果当前没有Consumer的Channel连接到这个Queue,那么Queue将会把这条消息进行存储直到有Channel被创建(AMQP协议的不同实现产品中,存储方式又不尽相同);如果已经有Channel连接到这个Queue,那么消息将会按顺序被发送给这个Channel。

    4. Consumer收到消息后,就可以进行消息的处理了。但是整个消息传递的过程还没有完成:视设置情况,Consumer在完成某一条消息的处理后,将需要手动的发送一条ACK消息给对应的Queue(当然您可以设置为自动发送,或者无需发送)。Queue在收到这条ACK信息后,才会认为这条消息处理成功,并将这条消息从Queue中移除;如果在对应的Channel断开后,Queue都没有这条消息的ACK信息,这条消息将会重新被发送给另外的Channel。当然,您还可以发送NACK信息,这样这条消息将会立即归队,并发送给另外的Channel

    3-3-2、Message(消息体)

    通过上一小节的描述,我们可以看到AMQP协议中消息的处理规则和Stomp协议中消息的处理规则有类似之处,比如对ACK、NACK的使用。但明显不同的地方还是很多,例如AMQP中Exchange元素提供的Routing路由规则,这显然比Stomp协议中直接发送给Queue要灵活得多。

    为了支持AMQP协议中的这些规则,AMQP协议中的消息也必须有特定的格式,实际上AMQP协议要比Stomp协议复杂得多。下面我们就根据ISO/IEC发布的AMQP Version 1.0标准文档,来讨论一下AMQP协议中的消息格式。

    首先要说明的是目前国内多个技术站点,详细介绍AMQP消息格式的文章本来就不多(不包括那些聊聊几笔的转发),而且基本上都没有详细讲解格式本身,只是粗略说明了AMQP消息采用二进制格式(任何应用层协议在网络上进行传输,都是使用二进制流进行的,所以这个说法当然没错)。

    有的文章还向读者传递了错误的信息。例如说AMQP消息格式包括两部分:消息头和消息正文。 这是完全错误的,虽然AMQP消息格式确实包括Header和Body部分,但是绝对不止这两个部分。(如果真是这样,ISO/IEC组织就不需要使用125页的文档篇幅来进行说明了)

    首先我们需要说明的是,作为一种网络通讯协议,AMQP工作在七层/五层网络模型的应用层,是一个典型的应用层协议;另外,由于AMQP协议存在多种元素定义,且这些元素定义工作在不同的领域。例如Channel的定义是为了基于网络连接记录会话状态;Queue等元素帮助AMQP完成路由规则,这些元素在Message消息记录中都需要有所体现。

    所以AMQP协议首先要记录网络状态和会话状态,格式如下(AMQP帧的定义在《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》文档的第38页):

    这里写图片描述

    其中非PAYLOAD部分,在网络协议的应用层说明Channel的工作状态(当然还有说明整个AMQP消息的长度区域:SIZE),我们真正需要的内容存在PAYLOAD区域。PAYLOAD区域(译文称为‘交付区’)的格式如下(可以在《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》文档的第3部分:messaging第82页找到详细说明):

    这里写图片描述

    在PAYLAOD区域一共包含7个数据区域:header、delivery-annotations、message-annotations、properties、application-properties、application-data、footer。这些元素的作用如下:

    • header:header部分记录了AMQP消息的在‘支持AMQP的中间件’中的交互状态。例如该条消息在节点间被交互的总次数、优先级、TTL(Time To Live)值等信息。

    • delivery-annotations:在header部分只能传递规范的、标准的、经过ISO/IEC组织定义的属性。那么如果需要在header部分传递一些非标准信息怎么办呢?这就是delivery-annotations数据区域存在的意义:用来记录那些’非标’的header信息。

    • message-annotations:这个数据区域,用于存储一些自定义的辅助属性。和delivery-annotations区域的非标准信息不同,这里的自定义属性主要用于消息的转换。例如AMQP-JMS信息转换过程中将依据这个数据区域的“x-opt-jms-type”、“x-opt-to-type”、“x-opt-reply-type”和“name”属性进行JMS规范中对应的“JMSType”、“Type of the JMSDestination”、“Type of the JMSReplyTo”和“JMS_AMQP_MA_name”属相的转换。

    • properties:从整个AMQP消息的properties属性开始,到AMQP消息的application-data部分结束,才是AMQP消息的正文内容(译文称为‘裸消息’)。Properties属性记录了AMQP消息正文的那些‘不可变’属性。在properties部分只能传递规范的、标准的、经过ISO/IEC组织定义的属性。例如:消息id、分组id、发送者id、内容编码等。以下是AMQP协议文档中对Properties部分属性的描述(只能包含这些信息):

    <type name="properties" class="composite" source="list" provides="section">
        <descriptor name="amqp:properties:list" code="0x00000000:0x00000073"/>
        <field name="message-id" type="*" requires="message-id"/>
        <field name="user-id" type="binary"/>
        <field name="to" type="*" requires="address"/>
        <field name="subject" type="string"/>
        <field name="reply-to" type="*" requires="address"/>
        <field name="correlation-id" type="*" requires="message-id"/>
        <field name="content-type" type="symbol"/>
        <field name="content-encoding" type="symbol"/>
        <field name="absolute-expiry-time" type="timestamp"/>
        <field name="creation-time" type="timestamp"/>
        <field name="group-id" type="string"/>
        <field name="group-sequence" type="sequence-no"/>
        <field name="reply-to-group-id" type="string"/>
    </type>
    • application-properties:‘应用数据’属性,在这部分数据中主要记录和应用有关的数据,AMQP的实现产品(例如RabbitMQ)需要用这部分数据决定其处理逻辑。例如:送入哪一个Exchange、消息的Routing值是什么、是否进行持久化等。

    • application-data:使用二进制格式描述的AMQP消息的用户部分内容。既是我们发送出去的真实内容

    • footer:一般在这个数据区域存储辅助内容,例如消息的哈希值,HMAC,签名或者加密细节。

    以上才是一个AMQP消息的完整结构。当然由于篇幅限制,在某一个数据区域的‘标准’属性就没有再细讲了,例如Properties数据区域定义的creation-time属性、Header数据区域定义的durable属性。有兴趣的朋友可以参考《OASIS Advanced Message Queueing Protocol
    (AMQP) Version 1.0》,这个文档我已经上传到我的下载列表中,供大家免费下载^_^(OASISAdvancedMessageQueueingProtocol(AMQP)Version1.0-其它文档类资源-CSDN下载)。

    3-3-3、Exchange(交换机)路由规则

    Exchange交换机在AMQP协议中主要负责按照一定的规则,将收到的消息转发到已经和它事先绑定好的Queue或者另外的Exchange中。Excahnge交换机的这个处理过程称之为Routing(路由)。目前流行的AMQP路由实现一共有三种:分别是Direct Exchange、Fanout Exchange和Topic Exchange。需要特别注意的是:Exhange需要具备怎样的‘路由’规则,并没有在AMQP标准协议进行强行规定,目前流行的AMQP转发规则都是AMQP实现产品自行开发的(这也是为什么AMQP消息中和路由、过滤规则相关的属性是存放在application-properties区域的原因)。

    A、Direct路由

    direct模式从字面上的理解应该是‘引导’、‘直接’的含义。direct模式下Exchange将使用AMQP消息中所携带的Routing-Key和Queue中的Routing Key进行比较。如果两者完全匹配,就会将这条消息发送到这个Queue中。如下图所示:

    这里写图片描述

    B、Fanout路由

    Fanout路由模式不需要Routing Key。当设置为Fanout模式的Exchange收到AMQP消息后,将会将这个AMQP消息复制多份,分别发送到和自己绑定的各个Queue中。如下图所示:

    这里写图片描述

    C、Topic路由

    Topic模式是Routing Key的匹配模式。Exchange将支持使用‘#’和‘ * ’通配符进行Routing Key的匹配查找(‘#’表示0个或若干个关键词,‘ * ’表示一个关键词,注意是关键词不是字母)。如下图所示:

    这里写图片描述

    为了方便各位读者的理解,这里我们再举几个通配符匹配的示例:

    • “param.#”,可以匹配“param”、“param.test”、“param.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param1.test”、“param2.test”、“param3.test”。以为param这个关键词和param1这个关键词不相同。

    • “param.*.* ”,可以匹配“param.test.test”、“param.test.value”、“param.test.child”等等AMQP消息的Routing Key;但是不能匹配诸如“param”、“param.test”、“parm.child”等等Routing Key。

    • “param.*.value”,可以匹配“param.value.value”、“param.test.value”等Routing Key;但是不能匹配诸如“param.value”、“param.value.child”等Routing Key。

    注意,以上介绍的Direct 路由模式和Topic 路由模式中,如果Exchange交换机没有找到任何匹配Routing Key的Queue,那么这条AMQP消息会被丢弃。(只有Queue有保存消息的功能,但是Exchange并不负责保存消息)

    4、不得不提的JMS规范

    JMS不是消息队列,更不是某种消息队列协议。**JMS是Java消息服务接口,是一套规范的JAVA API 接口。这套规范接口由SUN提出,并在2002年发布JMS规范的Version 1.1版本。**JMS和消息中间件厂商无关,既然是一套接口规范,就代表这它需要各个厂商进行实现。好消息是,大部分消息中间件产品都支持JMS 接口规范。也就是说,您可以使用JMS API来连接Stomp协议的产品(例如ActiveMQ)。就像您可以使用JDBC API来连接ORACLE或者MYSQL一样。

    部分网络上的资料都介绍JMS是一个消息队列,这个说法是错误的,会误导读者。难道你能说JDBC是数据库?

    当然,这些具体实现JMS规范的JAVA API都是由具体的中间件厂商提供的。下面一段代码演示了如何使用JMS建立与ActiveMQ的连接:

    package com.yinwenjie.test.testActivemq.jms;
    
    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
    
    /**
     * 测试使用JMS API连接ActiveMQ
     * @author yinwenjie
     */
    public class JMSProducer {
        /**
         * 由于是测试代码,这里忽略了异常处理。
         * 正是代码可不能这样做
         * @param args
         * @throws RuntimeException
         */
        public static void main (String[] args) throws Exception {
            // 定义JMS-ActiveMQ连接信息
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616", "username", "password");
            Session session = null;
            Destination sendQueue;
            Connection connection = null;
    
            //进行连接
            connection = connectionFactory.createConnection();
            connection.start();
    
            //建立会话
            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            //建立queue(当然如果有了就不会重复建立)
            sendQueue = session.createQueue("");
            //建立消息发送者对象
            MessageProducer sender = session.createProducer(sendQueue);
            TextMessage outMessage = session.createTextMessage();
            outMessage.setText("这是发送的消息内容");
    
            //发送(JMS是支持事务的)
            sender.send(outMessage);
            session.commit();
    
            //关闭
            sender.close();
            connection.close();
            connectionFactory.close();
        }
    }

    这里,再给各位读者一个官方文档。这个官方文档用于描述ActiveMQ消息中间件中实现的AMQP协议信息转换为JMS服务接口能够识别的数据信息(请仔细理解这句话黑体字部分的描述)。http://activemq.apache.org/amqp.html

    5、后文介绍

    通过两篇文章的篇幅,我们介绍了典型的消息队列协议。当然还有很多具体的消息队列协议没有讲到,但是通过介绍XMPP、AMQP、Stomp协议可以起到一个管中窥豹可见一斑的效果。另外我们还说明了JMS规范的具体含义,以便帮助读者纠正一些不正确的观点。

    下一篇文章开始,我们将讲解两个典型的消息队列中间件:ActiveMQ和RabbitMQ。最后我们还会列举一个实际场景,然后通过消息队列技术一起搭建一个高性能的业务处理方案。

    展开全文
  • MQ--消息协议

    千次阅读 2019-06-14 20:29:26
    消息的传递依赖于协议,没有协议消息无法准确的读取,没有协议消息的传递也不安全。 在计算机网络中,网络有7层协议。 web开发由http、https协议。 tcp/udp.......
  • 底层协议(例如 TCP)是被设计用来将一个消息从一个发送者(sender)传递给一个接收者(receiver)。他们并不关系消息本身应该如何构建,也不关系消息的请求、获取、存储以及如何保证安全可靠。 像 WebSockets 这样...
  • 首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和RabbitMQ,它们是现在业务系统中应用广泛的消息队列软件。包括...
  • Socket通信几本协议: 首先解释下为什么Socket通信需要一定的协议才能理解消息的内容 1. 安全性, 协议中有判断内容安全的字段(比如报文的长度), 这样可以进行验证,如果被网络连接和篡改,这样的消息就是...消息协议
  • ActiveMQ系列—消息协议(Stomp协议)

    千次阅读 2017-09-18 21:50:20
    1、定义Stomp协议,英文全名Streaming Text Orientated Message Protocol,中文名称为 ‘流文本定向消息协议’。是一种以纯文本为载体的协议(以文本为载体的意思是它的消息格式规范中没有类似XMPP协议那样的xml格式...
  • 首先我们将讨论几种常用消息队列协议的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然后在这个基础上介绍两款MQ产品:ActiveMQ和RabbitMQ,它们是现在业务系统中应用广泛的消息队列软件。包括...
  • 互联网控制消息协议(英语:InternetControlMessageProtocol,缩写:ICMP)是互联网协议族的核心协议之一。它用于TCP/IP网络中发送控制消息,提供可能发生在通信环境中的各种问题反馈,通过这些信息,使管理者可以对...
  • MQTT协议

    千次阅读 2021-03-25 14:22:03
    客户端只需要订阅这个主题,当有其他客户端向这个服务端发布消息时,这个客户端就可以收到这个消息 请求响应模式 请求响应模式: 客户端向服务端发送请求,服务端收到请求后,向客户端返回响应 1.1 MQTT简介 MQTT...
  • ActiveMQ系列—消息协议(AMQP协议)

    千次阅读 2017-09-18 22:24:50
    AMQP协议的全称是:Advanced Message Queuing Protocol(高级消息队列协议)。目前AMQP协议的版本为 Version 1.0,这个协议标准在2014年通过了国际标准组织 (ISO) 和国际电工委员会 (IEC) 的投票,成为了新的 ISO 和...
  • MQTT协议介绍

    千次阅读 2022-04-02 12:56:15
    MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少...
  • 选择你的消息协议 AMQP, MQTT,STOMP

    千次阅读 2019-01-28 15:46:34
    当我讨论软件架构主题时,我被问的最常多的一个问题就是是当前存在的各种应用程序消息传递协议之间的区别 - 诸如协议如何和为什么出现,以及在特定应用程序中应该使用哪一个的问题。 他们的问题是...
  • 消息中间件常用协议

    千次阅读 2021-12-10 12:32:24
    消息中间件常用协议
  • SOAP协议解析

    万次阅读 多人点赞 2018-08-31 11:29:34
    一、SOAP协议简介 1、SOAP简介  SOAP(Simple Object Accrss ... SOAP基于XML语言和XSD标准,其定义了一套编码规则,编码规则定义如何将数据表示为消息,以及怎样通过HTTP协议来传输SOAP消息,由四部分组成: ...
  • 微信协议

    千次阅读 2019-10-18 13:31:09
    微信 ipad sdk,微信ipad协议,微信web版接口api,微信网页版接口,微信电脑版sdk,微信开发sdk,微信开发API,微信协议,微信接口文档sdk,替代微信ipad协议的api接口,网页个人微信api分享1、基础消息类型1、...
  • MQTT协议规范

    千次阅读 2021-11-18 10:48:09
    MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间...
  • 所以说,消息队列也是一样,想要互相通信,就要使用同一种协议。 每个协议下的消息队列,都有着不同的角色定义。 简单说下常见的消息队列协议: 1.AMQP(Advance Message Queuing Protocol) Message(消息):消息...
  • * Q3解决方案:不同业务定义不同的协议,比如心跳协议,业务协议; 另外一种方案就是实用json数据格式进行传输 * @Link(http://blog.csdn.net/ljl157011/article/details/19291611) * 短链接:建立完一次通信后将被...
  • 拜占庭将军问题之口头协议

    千次阅读 2022-03-26 16:23:23
    解决拜占庭将军问题之口头协议
  • 微信消息推送协议简单分析

    千次阅读 2015-10-10 08:51:30
    2) 有新消息时,微信服务器A通知Android微信客户端;后者和微信服务器B建立新的TCP短连接,并获得数据; 3) 心跳间隔比较长,约300秒左右;     【协议分析】   分析涉及到两个微信服务器: 服务器一:183...
  • 网络协议之:WebSocket的消息格式

    万次阅读 2021-09-24 10:23:35
    文章目录简介WebSocket的握手流程webSocket的消息格式Extensions和Subprotocols总结 简介 我们知道WebSocket是建立在TCP协议基础上的一种网络协议,用来进行客户端和服务器端的实时通信。非常的好用。最简单的使用...
  • 实时发布订阅协议(RTPS)DDS互操作网络协议规范-中文翻译_001 关键字:OMG,RTPS,DDS The Real-time Publish-Subscribe Protocol (RTPS) DDS Interoperability Wire Protocol Specification,Version 2.2,...
  • 实时消息传输协议(RTMP)详解

    千次阅读 2017-03-30 22:31:34
    RTMP提供了一套全双工的可靠的多路复用消息服务,类似于TCP协议[RFC0793],用来在一对结点之间并行传输带时间戳的音频流,视频流,数据流。通常情况下,不同类型的消息会被分配不同的优先级,当网络传输能力受限时,...
  • 1.MQTT协议介绍

    千次阅读 2022-02-26 21:08:59
    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。 MQTT最大优点在于,可以...
  • 【5G核心网】 NAS消息

    万次阅读 2020-09-02 10:10:37
    2.3 PDU session identity Table 11.2.3.1c.1: PDU session identity 2.4 Procedure transaction identity L3 协议可以定义包含过程事务标识(PTI) 的标准 L3 消息协议。对于给定的 PD 和给定的 SAP,PTI 多...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 669,371
精华内容 267,748
关键字:

消息协议

友情链接: LoginWithMySQL2.rar