精华内容
下载资源
问答
  • MQTT协议MQTT协议简介及协议原理

    万次阅读 多人点赞 2016-02-04 15:34:36
    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以...

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。做为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

    1. MQTT协议特点
    2. MQTT协议原理

     

    1. MQTT协议特点

    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

    MQTT协议当前版本为,2014年发布的MQTT v3.1.1。除标准版外,还有一个简化版MQTT-SN,该协议主要针对嵌入式设备,这些设备一般工作于百TCP/IP网络,如:ZigBee。

    MQTT协议运行在TCP/IP或其他网络协议,提供有序、无损、双向连接。其特点包括:

    1. 使用的发布/订阅消息模式,它提供了一对多消息分发,以实现与应用程序的解耦。
    2. 对负载内容屏蔽的消息传输机制。
    3. 对传输消息有三种服务质量(QoS):
      • 最多一次,这一级别会发生消息丢失或重复,消息发布依赖于底层TCP/IP网络。即:<=1
      • 最少一次,这一级别会确保消息到达,但消息可能会重复。即:>=1
      • 只有一次,确保消息只有一次到达。即:=1。在一些要求比较严格的计费系统中,可以使用此级别
    4. 数据传输和协议交换的最小化(协议头部只有2字节),以减少网络流量
    5. 通知机制,异常中断时通知传输双方

     

    2. MQTT协议原理

    2.1 MQTT协议实现方式

    • 实现MQTT协议需要:客户端服务器端
    • MQTT协议中有三种身份:发布者(Publish)代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者
    • MQTT传输的消息分为:主题(Topic)负载(payload)两部分
      • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload
      • payload,可以理解为消息的内容,是指订阅者具体要使用的内容

     

    2.2 网络传输与应用消息

    MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。

    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

     

    2.3 MQTT客户端

    一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

    • 发布其他客户端可能会订阅的信息
    • 订阅其它客户端发布的消息
    • 退订或删除应用程序的消息
    • 断开与服务器连接

     

    2.4 MQTT服务器

    MQTT服务器以称为“消息代理”(Broker),可以是一个应用程序或一台设备。它是位于消息发布者订阅者之间,它可以:

    • 接受来自客户的网络连接
    • 接受客户发布的应用信息
    • 处理来自客户端的订阅和退订请求
    • 向订阅的客户转发应用程序消息

     

    2.5 MQTT协议中的订阅、主题、会话

    订阅(Subscription)

    订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

    会话(Session)

    每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    主题名(Topic Name)

    连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

    主题筛选器(Topic Filter)

    一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

    负载(Payload)

    消息订阅者所具体接收的内容

     

    2.6 MQTT协议中的方法

    MQTT协议中定义了一些方法(也被称为动作), 来于表示对确定资源所进行操作。 这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。

    Connect,等待与服务器建立连接

    Disconnect,等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话

    Subscribe,等待完成订阅

    UnSubscribe,等待服务器取消客户端的一个或多个topics订阅

    Publish,MQTT客户端发送消息请求,发送完成后返回应用程序线程

    http://itbilu.com/other/relate/4kHBsx_Pg.html

    展开全文
  • mqtt协议

    万次阅读 2018-12-20 16:00:18
    物联网下,物理设备内存CPU有限、4G网络不可靠、网络带宽小等,公司设备准备用MQTT协议实现P/S模式的消息传递,目前有emqttd、mosquitto、activemq等支持mqtt协议。对于点对点的消息传递直接使用一般的通信方式不...

    物联网下,物理设备内存CPU有限、4G网络不可靠、网络带宽小等,公司设备准备用MQTT协议实现P/S模式的消息传递,目前有emqttd、mosquitto、activemq等支持mqtt协议。对于点对点的消息传递直接使用一般的通信方式不使用消息队列就ok的,但是最近出了个需求需要消息广播准备使用发布订阅来实现。rabbitmq是将mqtt协议转换为amqp协议来处理。

    • 1.消息类型

    消息类型比较简单,请求报文也比较简单。

    CONNECT	    1	    C->S	客户端请求与服务端建立连接
    CONNACK	    2	    S->C	服务端确认连接建立
    PUBLISH	    3	    CóS	    发布消息
    PUBACK	    4	    CóS	    收到发布消息确认
    PUBREC	    5	    CóS	    发布消息收到
    PUBREL	    6	    CóS	    发布消息释放
    PUBCOMP	    7	    CóS	    发布消息完成
    SUBSCRIBE	8	    C->S	订阅请求
    SUBACK	    9	    S->C	订阅确认
    UNSUBSCRIBE	10	    C->S	取消订阅
    UNSUBACK	11	    S->C	取消订阅确认
    PING	    12	    C->S	客户端发送PING(连接保活)命令
    PINGRSP	    13	    S->C	PING命令回复
    DISCONNECT	14	    C->S	断开连接
    • 2.rabbitmq开启mqtt
    #开启WEB管理
    rabbitmq-plugins enable rabbitmq_management
    #开启MQTT插件
    rabbitmq-plugins enable rabbitmq_mqtt

    启用的是1883端口:

    mqtt

    • 3.java客户端

    程序都是网上找的,先写了个简单的测试。依赖文件:

    <dependency>
    	<groupId>org.eclipse.paho</groupId>
    	<artifactId>mqtt-client</artifactId>
    	<version>0.4.0</version>
    </dependency>

    publish端:

    //发布客户端
    public class publishClient {
    
        //mqtt服务器地址
        public static final String HOST = "tcp://114.116.48.130:1883";
        //主题
        public static final String TOPIC = "service_login";
        //mqtt 客户机ID
        private static final String clientid = "server";
        private MqttClient client;//客户端实例
        private MqttTopic topic11;//主题实例
        private String userName = "*****";  //非必须
        private String passWord = "*****";  //非必须
        private MqttMessage message;
        //初始化客户端实例
        public publishClient() throws MqttException {
            //MemoryPersistence设置clientid的保存形式,默认为以内存保存
            client = new MqttClient(HOST, clientid, new MemoryPersistence());
            connect();
        }
        //连接服务器
        private void connect() {
            //连接配置
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);//不保存,每次重启新client
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            // 设置超时时间
            options.setConnectionTimeout(10);
            // 设置会话心跳时间
            options.setKeepAliveInterval(20);
            try {
                //设置发布回调
                client.setCallback(new publishCallback());
                client.connect(options);
                topic11 = client.getTopic(TOPIC);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //发布
        public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,
                MqttException {
            MqttDeliveryToken token = topic.publish(message);
            token.waitForCompletion();
            System.out.println("message is published completely! "+ token.isComplete());
        }
        //测试类
        public static void main(String[] args) throws MqttException, InterruptedException {
            //发布客户端
            publishClient server = new publishClient();
            //每隔10s发一条
            for (;;){
                server.message = new MqttMessage();
                server.message.setQos(1);//保证消息能到达一次
                server.message.setRetained(true);//消息保留
                server.message.setPayload("{'key':'value'}".getBytes());//消息内容
                server.publish(server.topic11 , server.message);//发布
                Thread.sleep(10000);
            }
        }
    }

    subscribe端:

    //订阅客户端
    public class subscribeClient {
        //mqtt服务器ip
        public static final String HOST = "tcp://114.116.48.130:1883";
        //主题
        public static final String TOPIC1 = "service_login";
        //mqtt 客户机ID
        private String clientid = "client";
        private MqttClient client;
        private MqttConnectOptions options;
        private String userName = "*****";
        private String passWord = "*****";
        @SuppressWarnings("unused")
        private ScheduledExecutorService scheduler;
        public subscribeClient(String clientid){
            this.clientid = clientid;
        }
        private void start() {
            try {
                // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调
                client.setCallback(new subcribeCallback());
                MqttTopic topic = client.getTopic(TOPIC1);
                //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息
                // 遗嘱
                options.setWill(topic, "close".getBytes(), 2, true);
                client.connect(options);
                //订阅消息
                int[] Qos  = {1};
                String[] topic1 = {TOPIC1};
                client.subscribe(topic1, Qos);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        public static void main(String[] args) throws MqttException {
            //一个设备一个队列
            new subscribeClient("client_1").start();
            new subscribeClient("client_2").start();
            new subscribeClient("client_3").start();
        }
    }

     callback回调:

    public class publishCallback implements MqttCallback {
    
        //在断开连接时调用
        @Override
        public void connectionLost(Throwable cause) {
            // 连接丢失后,一般在这里面进行重连
            System.out.println("连接断开,可以做重连");
        }
    
        //接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            System.out.println("deliveryComplete---------" + token.isComplete());
        }
    
        //接收已经预订的发布
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            // subscribe后得到的消息会执行到这里面
            System.out.println("接收消息主题 : " + topic);
            System.out.println("接收消息Qos : " + message.getQos());
            System.out.println("接收消息内容 : " + new String(message.getPayload()));
        }
    }
    • 4.测试

    首先QoS取值决定了消息质量,消息推送端分别设置不同的消息级别在建立虚拟通道是有差异,先启动推送端十秒一条消息

    0:尽力发送,如果遇到传递失败,TCP传输层保证不会重试,出错会丢失消息
    1:消费者没有接收确认或者确认消息本身丢失,消息发送者会再次发送,可能造成消息重复
    2:保证消息接收者成功接收一次,造成并发性能下降以及消息传递时延增加

    关于rabbitmq的web窗口参数:

    Publish:producter pub消息的速率。
    Publisher confirm:broker确认pub消息的速率。
    Deliver(manual ack):customer手动确认的速率。
    Deliver( auto ack):customer自动确认的速率。
    Consumer ack:customer正在确认的速率。
    Redelivered:正在传递'redelivered'标志集的消息的速率。
    Get (manual ack):响应basic.get而要求确认的消息的传输速率。
    Get (auto ack):响应于basic.get而发送不需要确认的消息的速率。
    Return:将basic.return发送给producter的速率。
    Disk read:queue从磁盘读取消息的速率。
    Disk write:queue从磁盘写入消息的速率。

    server.message.setQos(0):

    级别0

    server.message.setQos(1):

    级别1

    server.message.setQos(2):

    启动会报IO异常,待解决。Qos=2报错,已断开连接 (32109) - java.io.EOFException,查阅资料发现是rabbitmq自身的bug,据说插件升级到mqtt-3.1.1可以解决,还没有尝试。

    本身confirm方式就是用来确保消息成功推送到broker中,这里正好rabbitmqq默认使用confirm实现mqtt协议的QoS=1。rabbitmq的ack来自于发布确认,但是消费者还没有启动所以队列也没有创建,消息从broker传递到队列中之后(不管消息有没有被消费)都会由broker返回确认,启动消费者,查看队列如下:

    rabbitmq实现的mqtt协议在发布订阅模式下,每个消费者都会创建一个队列,创建队列由订阅主题时触发client.subscribe(topic1, Qos),对于rabbitmq来说会收到broker返回的消息发布成功确认消息:

    每个队列都只有一个消费者,如下:

    由于rabbitmq是将mqtt协议转化为amqp协议,在mqtt协议里面是没有交换机、队列概念的,所以这里整个mqtt服务器是利用同一个topic交换机实现的,查看交换机如下:

    这里交换机消息进出一比三,交换机绑定三个队列正好对应:

    如果更换topic路由键,比如login主题增加一个logout主题,交换机中就会新建一个主题,再启动三个消费者去订阅,结果如下:

    • 5.org.eclipse.paho.client.mqttv3使用细节

    Retained :可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送,此操作属于持久化操作,消费端重启服务依然可以收到。mqtt协议消息类型publish有一个redain标记位,broker会储存该topic最后一条消息,新上线的客户端会收到这一条消息,这个消息是本地持久化即使推送端重启。

    CleanSession:是否清除客户端session,清除会使用新身份登入。如果不清除,即使是客户端下线,我这边关掉消费者,mqtt服务器会保留客户端信息如下,点进去发会发现这个队列没有消费者。如果清除的话这里是没有这条记录的。

    ConnectionTimeout:超时时间。

    KeepAliveInterval:会话心跳时间。

    我这边推送端server.message.setRetained(false)设置消息不保留,消费端设置options.setCleanSession(false)客户端身份不清除,按道理消费者重复上线是不会收到保留消息的,可我这里没生效原因不明,重启依旧收到上一条消息的保留值。所以对publish请求抓包,查看一下publish推送请求如下:

    查看subscribe请求抓包如下:

    推送的时候retain是false,订阅到的消息retain是true,应该是rabbitmq给我改了, 估摸是和rabbitmq的消息持久化有关,我现在在web窗口手动publish一条消息(设置非持久化)再对subscribe端抓包发现retain为0,且重启subscribe端是没有重复获取这条消息的,结果如下:

    在web窗口手动向topic交换机publish消息走的是qmqp协议并没有通过rabbitmq自带的mqtt插件所以会造成这种差别,查看rabbitmq-mqtt源码

    #非持久化
    delivery_mode(?QOS_0) -> 1;
    #持久化
    delivery_mode(?QOS_1) -> 2.

    发现qos=1的情况默认持久化消息,所以用rabbitmq-mqtt插件会让qos=1的publicsh消息设置retain=false无效,再次设置qos=0然后测试重启果然没有收到这个消息。

    • 6.关于QPS

    publish端设置qos=0,subcribe端设置cleanSession=false,启动推送端5秒一条消息,启动消费端之后又停止,查看队列中的未消费消息:

    这种情况应该是效率最好的。 对于单队列来说,让生产和消费速率平衡之后测试速率的峰值可以很方便得到QPS,可是我们这里业务场景是生产者速率并不会太高,但是因为要对n多设备进行消息广播所以rabbitmq中在线的队列数量会比我们设备还多,目前还不知道用rabbitmq-mqtt实现广播消息性能怎么样。现在while死循环创建客户端取订阅主题,如图:

    看到默认的socket连接上线是829个,队列达到500的时候就已经占用了一大半了。 

    • 7.关于开放平台mqtt服务器

    中国移动提供了开放的物联网平台,支持多种协议,mqtt就是其中一种。所有服务端交互使用https命令,然后硬件再用对应的协议比如mqtt进行连接。不用自己搭建、不用担心负载、使用简单,就是中国移动要平台维护,几个月一次,每次几分钟,一般的应用可能受不了这5分钟的服务不可用。

    中国移动oneNet物联网开放平台

    • 8.mosquitto

    搭建也比较简单。

    展开全文
  • MQTT协议

    2021-03-25 14:22:03
    1.MQTT协议概念: MQTT是基于Publish/Subscribe(发布订阅)模式的物联网通信协议 特点: 简单易实现 支持Qos(服务质量) 报文小 MQTT协议构建于TCP/IP协议之上 发布订阅模式: 客户端只需要订阅这个主题,当有其他...

    源视频地址

    1.MQTT协议概念:

    MQTT是基于Publish/Subscribe(发布订阅)模式的物联网通信协议
    特点:

    1. 简单易实现
    2. 支持Qos(服务质量)
    3. 报文小
      MQTT协议构建于TCP/IP协议之上

    发布订阅模式:
    在这里插入图片描述
    客户端只需要订阅这个主题,当有其他客户端向这个服务端发布消息时,这个客户端就可以收到这个消息
    请求响应模式
    请求响应模式: 客户端向服务端发送请求,服务端收到请求后,向客户端返回响应

    在这里插入图片描述

    1.1 MQTT简介

    MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为远程连接设备提过实时可靠的消息服务,作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
    MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(loT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。
    在这里插入图片描述

    1.2 MQTT协议设计规范

    • (1) 精简,不添加可有可无的功能;
    • (2) 发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递,解耦Client/Server模式,带来的好处在于不必预先知道对方的存在(ip/port), 不必同时运行
    • (3) 允许用户动态创建主题(不需要预先创建主题),零运维成本;
    • (4) 把传输量降到最低以提高传输效率
    • (5) 把低带宽、高延迟、不稳定的网络等因素考虑在内;
    • (6) 支持连续的会话保持和控制(心跳协议)
    • (7) 理解客户端计算能力可能很低
    • (8) 提供服务质量( quality of service level: QoS)管理:
    • (9) 不强求传输数据的类型与格式,保持灵活性(指的使应用层业务数据)

    主题的概念
    在这里插入图片描述

    1.3 MQTT协议主要特性

    • (1)开放消息协议,简单易实现。
    • (2)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
    • (3)对负载(协议携带的应用数据)内容屏蔽的消息传输。
    • (4)基于TCP/IP网络连接,提供有序,无损,双向连接
      主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。
      由于基于不同的连接方式,优缺点自然也就各有不同了。
    • (5)消息服务质量(QoS)支持,可靠传输保证;有三种消息发布服务质量:
      QoSO:“至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。
      QoS1:“至少—次”,确保消息到达,但消息重复可能会发生。
      QoS2:“只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。
    • (6) 1字节固定报头,2字节心跳报文,最小化传输开销和协议交换,有效减少网络流量。
      这就是为什么在介绍里说它非常适合"在物联网领域,传感器与服务器的通信,信息的收集,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。
    • (7) 在线状态感知:使用Last Will和Testament特性通知有关各方客户端异常中断的机制。
      Last Will:即遗言机制,用于通知同一主题下的其他设备,发送遗言的设备已经断开了连接。
      Testament:遗嘱机制,功能类似于Last Will。

    1.4 MQTT应用领域

    • 物联网M2M通信,物联网大数据采集
    • Android消息推送,WEB消息推送
    • 移动即时消息,例如Facebook Messenger
    • 智能硬件、智能家具、智能电器
    • 车联网通信,电动车站桩采集
    • 智慧城市、远程医疗、远程教育
    • 电力、石油与能源等行业市场

    2. MQTT协议原理

    2.1 MQTT协议实现方式

    实现MQTT协议需要客户端和服务器端通讯完成, 在通讯过程中, MQTT协议中有三种身份: 发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。 其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。
    在这里插入图片描述
    MQTT传输的消息分为: 主题(Topic) 和 负载(payload)两部分:
    - (1) Topic: 可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload)
    - (2) payload: 可以理解为消息的内容,是指订阅者具体要使用的内容

    2.2 网络传输与应用消息

    MQTT会构建底层网络传输: 它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。
    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关联

    2.3 MQTT客户端

    一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

    • (1) 发布其他客户端可能会订阅的信息
    • (2) 订阅其他客户端发布的消息
    • (3) 退定或删除应用程序的消息
    • (4) 断开与服务器的连接

    2.4 MQTT服务器端

    MQTT服务器以称为"消息代理"(Broker), 可以是一个应用程序或一台设备,它是位于消息发布者和订阅者之间,它可以:

    • (1) 接受来自客户的网络连接;
    • (2) 接受客户发布的应用信息
    • (3) 处理来自客户端的订阅和退订请求
    • (4) 向订阅的客户转发应用程序消息

    2.5 发布/订阅、主题、会话

    MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的,与HTTP的请求(Request)/应答(Response)的模式有本质的不同
    订阅者(Subscriber)会向 消息服务器(Broker)订阅一个主题(Topic)。成功订阅后,消息服务器会将该主题下的消息转发给所有订阅者
    主题(Topic)以’/‘为分隔符区分不同的层级,包含通配符’+’ 或 ‘#’的主题又称为主题过滤器(Topic Filters); 不含通配符的成为主题名(Topic Names) 例如:

    chat/ room/1
    
    sensor/10/temperature
    
    sensor/+/temperature
    
    $SYS/broker/metrics/packets/received
    
    $SYS/broker/metrics/#
    
    '+' : 表示通配一个层级, 例如a/+,匹配a/x, a/y
    
    '#' : 表示通配多个层级,例如a/#,匹配a/x, a/b/c/d
    
    注: '+' 通配一个层级,‘#’ 通配多个层级(必须在末尾)
    

    发布者(Publisher)只能向主题名发布消息,订阅者(Subscriber)则可以通过订阅主题过滤器来通配多个主题名称

    会话(Session)
    每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    2.6 MQTT协议中的方法

    MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

    • (1) CONNECT: 客户端连接到服务器
    • (2) CONNACK: 连接确认
    • (3) PUBLISH: 发布消息
    • (4) PUBACK: 发布消息确认
    • (5) PUBREC: 发布的消息已接收
    • (6) PUBREL: 发布的消息已释放
    • (7) PUBCOMP: 发布完成
    • (8) SUBSCRIBE: 订阅请求
    • (9) SUBACK: 订阅确认
    • (10) UNSUBSCRIBE: 取消订阅
    • (11) UNSUBACK: 取消订阅确认
    • (12) PINGREQ: 客户端发送心跳
    • (13) PINGRESP: 服务端心跳响应
    • (14) DISCONNECT: 断开连接
    • (15) AUTH: 认证

    3.MQTT协议数据包结构

    在MQTT协议中,一个MQTT数据包由: 固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。
    MQTT数据包结构如下:
    在这里插入图片描述
    (1) 固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识,如连接,发布,订阅,心跳等。其中固定头是必须的,所有类型的MQTT协议中,都必须包含固定头。
    (2) 可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。可变头部不是可选的意思,而是指这部分在有些协议类型中存在,在有些协议中不存在。
    (3) 消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。与可变头一样,在有些协议类型中有消息内容,有些协议类型中没有消息内容。

    3.1 固定头(Fixed Header)

    在这里插入图片描述
    固定头存在于所有MQTT数据包中
    固定头包含两部分内容:
    首字节(字节1)
    剩余消息报文长度(从第二个字节开始,长度为1-4字节)
    剩余长度是当前包中剩余内容长度的字节数,包括变量头和有效负载中的数据)。剩余长度不包含用来编码剩余长度的字节。
    剩余长度使用了一种可变长度的结构来编码,这种结构使用单一字节表示0-127的值。大于127的值如下处理。每个字节的低7位用来编码数据,最高位用来表示是否还有后续字节。因此每个字节可以编码128个值,再加上一个标识位。剩余长度最多可以用四个字节来表示。

    数据包类型
    位置: 第一个字节(Byte 1)中的7-4个bit被(Bit[7-4]),标识4位无符号值
    在这里插入图片描述
    在这里插入图片描述
    标志位
    位置: 第一个字节中的0-3个bit位(Bit[3-0])。意思是字节位Bit[3-0]用作报文的标识。
    首字节的低4位(bit3-bit0)用来表示某些报文类型的控制字段,实际上只有少数报文类型有控制位,如下图:
    在这里插入图片描述
    (1) : 其中Bit[3]为DUP字段,如果该值为1,表明这个数据包是一条重复的消息;否则该数据包就是第一次发布的消息
    (2) : Bit[2-1]为QoS字段:
    如果Bit 1 和 Bit 2都为0,表示QoS 0: 至多一次;
    如果Bit 1为1, 表示QoS 1: 至少一次;
    如果Bit 2为1,表示QoS 2:只有一次;
    如果同时将Bit 1 和 Bit 2都设置成1,那么客户端或服务器认为这是一条非法的消息,会关闭当前连接。
    MQTT消息QoS
    MQTT发布消息服务质量保证(QoS)不是端到端的,是客户端与服务端之间的。订阅者收到MQTT消息的QoS级别,最终取决于发布消息的QoS和主题订阅的QoS
    QoS: 服务质量是指 客户端和服务端之间的服务质量
    在这里插入图片描述在这里插入图片描述
    QoS消息订阅(至多一次):
    在这里插入图片描述
    QoS1消息发布订阅(至少一次)
    在这里插入图片描述
    QoS2消息发布订阅(只有一次)
    在这里插入图片描述

    3.2 可变头(Variable Header)

    可变头的意思是可变化的消息头部。有些报文类型包含可变头部有些报文则不包含。可变头部在固定头部和消息内容之间,其内容根据报文类型不同而不同
    在这里插入图片描述
    协议版本
    位无符号值表示客户端的版本等级。3.1.1版本
    MQTT会话
    MQTT客户端向服务器发起CONNECT请求时,可以通过’Clean Session’标志设置会话。
    ‘Clean Session’设置为0,表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。
    ‘Clean Session’设置为1,表示创建一个新的临时会话,在客户端断开时,会话自动销毁
    Will Flag /Will QoS/Will Retain
    Will Flag : 遗言标志位
    Will QoS: 遗言的消息质量
    Will Retain: 遗言的保持状态
    Keep Alive timer(心跳时长)
    心跳协议:
    在这里插入图片描述
    Keep Alive timer 最大时间间隔
    在这里插入图片描述

    3.3 消息体(Payload)

    有些报文类型是包含Payload的,Payload的意思是消息载体的意思
    如PUBLISH的Payload就是指消息内容(应用程序发布的消息内容)。而CONNECT的Payload则包含Client Identifier, Will Topic, Will Message, Username, Password等信息
    包含payload的报文类型如下:
    在这里插入图片描述
    Payload是消息内容,也只在某些报文类型中出现,其内容和格式也根据报文类型不同而不同
    具体不同报文类型所包含的不同Payload,请查看官方文档
    官方文档

    展开全文
  • MQTT 协议

    2019-09-06 10:18:11
    MQTT 协议 MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息...MQTT协议的设计原则 由于物联网的环境是非常特别的,所以MQTT遵循以下设计...

    MQTT 协议

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议。

    MQTT协议的设计原则

    由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

    (1)精简,不添加可有可无的功能;

    (2)发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;

    (3)允许用户动态创建主题,零运维成本;

    (4)把传输量降到最低以提高传输效率;

    (5)把低带宽、高延迟、不稳定的网络等因素考虑在内;

    (6)支持连续的会话控制;

    (7)理解客户端计算能力可能很低;

    (8)提供服务质量管理;

    (9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

    MQTT的特点

    MQTT协议是为大量计算能力有限,且工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

    1、使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合;

    2、对负载内容屏蔽的消息传输;

    3、使用 TCP/IP 提供网络连接;

    4、有三种消息发布服务质量:

    • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

    5、小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量;

    6、使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

    Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

    Testament:遗嘱机制,功能类似于Last Will。

    MQTT协议原理

    MQTT协议实现方式

    实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

    MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

    (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

    (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

    【网络传输与应用消息】

    MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。

    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

    【MQTT客户端】

    一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

    (1)发布其他客户端可能会订阅的信息;

    (2)订阅其它客户端发布的消息;

    (3)退订或删除应用程序的消息;

    (4)断开与服务器连接。

    【MQTT服务器】

    MQTT服务器可以称为“消息代理”(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

    (1)接受来自客户的网络连接;

    (2)接受客户发布的应用信息;

    (3)处理来自客户端的订阅和退订请求;

    (4)向订阅的客户转发应用程序消息。

    【MQTT协议中的订阅、主题、会话】

    一、订阅(Subscription)

    订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

    二、会话(Session)

    每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    三、主题名(Topic Name)

    连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

    四、主题筛选器(Topic Filter)

    一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

    五、负载(Payload)

    消息订阅者所具体接收的内容。

    【MQTT协议中的方法】

    MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

    (1)Connect。等待与服务器建立连接。

    (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。

    (3)Subscribe。等待完成订阅。

    (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。

    (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。

    MQTT协议数据包结构

    在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

    (1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

    (2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

    (3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

    【MQTT固定头】

    固定头存在于所有MQTT数据包中,其结构如下:

    MQTT数据包类型

    位置:Byte 1中bits 7-4。

    相于一个4位的无符号值,类型、取值及描述如下:

    标识位

    位置:Byte 1中bits 3-0。

    在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

    (1)DUP:发布消息的副本。用来保证消息的可靠传输,如果设置为1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。

    (2)QoS:发布消息的服务质量,即:保证消息传递的次数。

             Ø00:最多一次,即:<=1

             Ø01:至少一次,即:>=1

             Ø10:一次,即:=1

             Ø11:预留

    (3)RETAIN: 发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它;如果没有,那么推送至当前订阅者后释放。

    剩余长度

    地址:Byte 2。

    固定头的第二字节用来保存变长头部和消息体的总大小,但不是直接保存的。这一字节可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为1时,表示长度不足,需要使用两个字节继续保存。

    【MQTT可变头】

     MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是作为包的标识:

    很多类型数据包中都包括一个2字节的数据包标识字段,这些类型的包有:PUBLISH (QoS > 0)、PUBACK、PUBREC、PUBREL、PUBCOMP、SUBSCRIBE、SUBACK、UNSUBSCRIBE、UNSUBACK。

    【Payload消息体】

     

    Payload消息体位于MQTT数据包的第三部分,包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息:

    (1)CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。

    (2)SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。

    (3)SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。

    (4)UNSUBSCRIBE,消息体内容是要订阅的主题。


    下面说一下为什么IoT要选择MQTT作为它的网络协议:

    为什么 MQTT 是最适合物联网的网络协议?

    物联网 (IoT) 设备必须连接互联网。通过连接到互联网,设备就能相互协作,以及与后端服务协同工作。互联网的基础网络协议是 TCP/IP。MQTT(消息队列遥测传输) 是基于 TCP/IP 协议栈而构建的,已成为 IoT 通信的标准。

    MQTT 最初由 IBM 于上世纪 90 年代晚期发明和开发。它最初的用途是将石油管道上的传感器与卫星相链接。顾名思义,它是一种支持在各方之间异步通信的消息协议。异步消息协议在空间和时间上将消息发送者与接收者分离,因此可以在不可靠的网络环境中进行扩展。虽然叫做消息队列遥测传输,但它与消息队列毫无关系,而是使用了一个发布和订阅的模型。在 2014 年末,它正式成为了一种 OASIS 开放标准,而且在一些流行的编程语言中受到支持(通过使用多种开源实现)。

    【为何选MQTT】

    MQTT 是一种轻量级的、灵活的网络协议,致力于为 IoT 开发人员实现适当的平衡:

    • 这个轻量级协议可在严重受限的设备硬件和高延迟/带宽有限的网络上实现。
    • 它的灵活性使得为 IoT 设备和服务的多样化应用场景提供支持成为可能。

    【为什么不选择其他众多网络协议】

     大多数开发人员已经熟悉 HTTP Web 服务。那么为什么不让 IoT 设备连接到 Web 服务?设备可采用 HTTP 请求的形式发送其数据,并采用 HTTP 响应的形式从系统接收更新。这种请求和响应模式存在一些严重的局限性:

    • HTTP 是一种同步协议。客户端需要等待服务器响应。Web 浏览器具有这样的要求,但它的代价是牺牲了可伸缩性。在 IoT 领域,大量设备以及很可能不可靠或高延迟的网络使得同步通信成为问题。异步消息协议更适合 IoT 应用程序。传感器发送读数,让网络确定将其传送到目标设备和服务的最佳路线和时间。
    • HTTP 是单向的。客户端必须发起连接。在 IoT 应用程序中,设备或传感器通常是客户端,这意味着它们无法被动地接收来自网络的命令。
    • HTTP 是一种 1-1 协议。客户端发出请求,服务器进行响应。将消息传送到网络上的所有设备上,不但很困难,而且成本很高,而这是 IoT 应用程序中的一种常见使用情况。
    • HTTP 是一种有许多标头和规则的重量级协议。它不适合受限的网络。

    出于上述原因,大部分高性能、可扩展的系统都使用异步消息总线来进行内部数据交换,而不使用 Web 服务。事实上,企业中间件系统中使用的最流行的消息协议被称为 AMQP(高级消息排队协议)。但是,在高性能环境中,计算能力和网络延迟通常不是问题。AMQP 致力于在企业应用程序中实现可靠性和互操作性。它拥有庞大的特性集,但不适合资源受限的 IoT 应用程序。

    除了 AMQP 之外,还有其他流行的消息协议。例如,XMPP(Extensible Messaging and Presence Protocol,可扩展消息和状态协议)是一种对等即时消息 (IM) 协议。它高度依赖于支持 IM 用例的特性,比如存在状态和介质连接。与 MQTT 相比,它在设备和网络上需要的资源都要多得多。

    那么,MQTT 为什么如此轻量且灵活?MQTT 协议的一个关键特性是发布和订阅模型。与所有消息协议一样,它将数据的发布者与使用者分离。

    【发布和订阅模型】

    MQTT 协议在网络中定义了两种实体类型:一个消息代理和一些客户端。代理是一个服务器,它从客户端接收所有消息,然后将这些消息路由到相关的目标客户端。客户端是能够与代理交互来发送和接收消息的任何事物。客户端可以是现场的 IoT 传感器,或者是数据中心内处理 IoT 数据的应用程序。

    1. 客户端连接到代理。它可以订阅代理中的任何消息 “主题”。此连接可以是简单的 TCP/IP 连接,也可以是用于发送敏感消息的加密 TLS 连接。
    2. 客户端通过将消息和主题发送给代理,发布某个主题范围内的消息。
    3. 代理然后将消息转发给所有订阅该主题的客户端。

    因为 MQTT 消息是按主题进行组织的,所以应用程序开发人员能灵活地指定某些客户端只能与某些消息交互。例如,传感器将在 “sensor_data” 主题范围内发布读数,并订阅 “config_change” 主题。将传感器数据保存到后端数据库中的数据处理应用程序会订阅 “sensor_data” 主题。管理控制台应用程序能接收系统管理员的命令来调整传感器的配置,比如灵敏度和采样频率,并将这些更改发布到 “config_change” 主题。(参阅图 1。)

    图 1. IoT 传感器的 MQTT 发布和订阅模型

    同时,MQTT 是轻量级的。它有一个用来指定消息类型的简单标头,有一个基于文本的主题,还有一个任意的二进制有效负载。应用程序可对有效负载采用任何数据格式,比如 JSON、XML、加密二进制或 Base64,只要目标客户端能够解析该有效负载。 

    展开全文
  • MQTT协议

    2015-04-15 15:21:54
    中文和英文的mqtt文档,学习mqtt的首选,翻译的还不错.
  • mqtt协议_MQTT协议浅析

    2020-12-05 02:42:41
    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)最早是IBM开发的一个即时通讯协议,MQTT协议是为大量计算能力有限且工作在低带宽、不可靠网络的远程传感器和控制设备通讯而设计的一种协议。MQTT协议...
  • MQTT协议MQTT协议解析(MQTT数据包结构)

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 7,208
精华内容 2,883
关键字:

mqtt协议