精华内容
下载资源
问答
  • 了解MQTT协议以及利用Netty搭建MQTT服务器MQTT协议协议简介设计原则灵活的发布订阅和主题设计带宽消耗最小化三个可选的 QoS 等级会话保持在线状态感知开源 MQTT 服务器MQTT 发布订阅模式介绍发布订阅模式消息路由...

    MQTT协议协议简介

    MQTT,Message Queuing Telemetry Transport,消息队列遥测传输协议
    基于发布/订阅(publish/subscribe)模式
    构建于TCP/IP协议上

    MQTT协议包含3种角色:
    发布者:Publish
    代理:Broker,也就是MQTT服务器
    订阅者:Subscribe

    设计原则

    简单容易实现,支持 QoS(设备网络环境复杂),轻量且省带宽(因为那时候带宽很贵),数据无关(不关心 Payload 数据格式),有持续地会话感知能力(时刻知道设备是否在线)。

    灵活的发布订阅和主题设计

    发布订阅模式是传统 Client/Server 模式的一种解耦方案。发布者通过 Broker 与消费者之间通信,Broker 的作用是将收到的消息通过某种过滤规则,正确地发送给消费者。发布/订阅模式 相对于 客户端/服务器模式 的好处在于:

    • 发布者和消费者之间不必预先知道对方的存在,比如不需要预先沟通对方的 IP Address 和 Port
    • 发布者和消费者之间不必同时运行。因为 Broker 是一直运行的。

    发布订阅模式的优点还在于发布者与订阅者的解耦,这种解耦表现在以下两个方面:

    • 空间解耦,订阅者与发布者不需要建立直接连接,新的订阅者想要加入网络时不需要修改发布者的行为。
    • 时间解耦,订阅者和发布者不需要同时在线,即便不存在订阅者也不影响发布者发布消息。

    在 MQTT 协议里,上面提到的 过滤规则Topic。比如:所有发布到 news 这个 Topic 的消息,都会被 Broker 转发给已经订阅了news 的订阅者:
    在这里插入图片描述
    上图中订阅者预先订阅了 news,然后发布者向 Broker 发布了一条消息 “some msg” 并指定发布到 news 主题,Broker 通过 Topic 匹配,决定将这条消息转发给订阅者。

    MQTT与消息队列的区别:

    • MQTT 并不要求发布或者订阅之前显式地创建主题,唯一可能造成的不良影响是客户端可能使用错误的主题而不自知,但显然灵活部署带来的收益更高
    • 消息队列主要用于服务端应用之间的消息存储与转发,这类场景往往数据量大但接入量少,而 MQTT 面向的是 IoT 领域和移动互联网领域,这类场景的侧重点是海量的设备接入、管理与消息传输

    代理作为发布订阅模式的关键角色,它需要准确、高效地向订阅者转发其期望的消息,一般来说,比较常用的有以下两种方式:

    • 根据主题。订阅者向代理订阅自己感兴趣的主题,发布者发布的所有消息中都会包含自己的主题,代理根据消息的主题判断需要将消息转发给哪些订阅者。
    • 根据消息内容。订阅者定义其感兴趣的消息的条件,只有当消息的属性或内容满足订阅者定义的条件时,消息才会被投递到该订阅者。严格来讲,主题也可以算是消息内容的一种。

    MQTT 的 Topic 有层级结构,并且支持通配符 +#:

    • +是匹配单层的通配符。比如 news/+ 可以匹配 news/sportsnews/+/basketball 可匹配到 news/sports/basketball
    • # 是一到多层的通配符。比如 news/# 可以匹配 newsnews/sportsnews/sports/basketball 以及 news/sports/basketball/x 等等

    MQTT 的主题是不要预先创建的,发布者发送消息到某个主题、或者订阅者订阅某个主题的时候,Broker 就会自动创建这个主题。

    带宽消耗最小化

    MQTT 协议将协议本身占用的额外消耗最小化,消息头部最小只需要占用 2 个字节。

    MQTT 的消息格式分三部分:

    • 固定长度头部,2 个字节,所有消息类型里都有
    • 可变长度头部,只有某些消息类型里有
    • Payload,只有某些消息类型里有

    MQTT 的主要消息类型有:

    项目报文流动方向描述
    CONNECT1客户端到服务端客户端请求连接服务端
    CONNACK2服务端到客户端连接报文确认
    PUBLISH3两个方向都允许发布消息
    PUBACK4两个方向都允许QoS 1消息发布收到确认
    PUBREC5两个方向都允许发布收到(保证交付第一步)
    PUBREL6两个方向都允许发布释放(保证交付第二步)
    PUBCOMP7两个方向都允许QoS 2消息发布完成(保证交互第三步)
    SUBSCRIBE8客户端到服务端客户端订阅请求
    SUBACK9服务端到客户端订阅请求报文确认
    UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
    UNSUBACK11服务端到客户端取消订阅报文确认
    PINGREQ12客户端到服务端心跳请求
    PINGRESP13服务端到客户端心跳响应
    DISCONNECT14客户端到服务端客户端断开连接

    其中0、15作为保留值, PINGREQ / PINGRESP 和 DISCONNECT 报文是不需要可变头部的,也没有 Payload,也就是说它们的报文大小仅仅消耗 2 个字节。

    在 CONNECT 报文的可变长度头部里,有个 Protocol Version 的字段。为了节省空间,只有一个字节。所以版本号不是按照字符串 “3.1.1” 存放的,而是使用数字 4 来表示 3.1.1 版本。

    QoS 等级

    为适应设备不同的网络环境,MQTT 设计了 3 个 QoS 等级,0, 1, 2:

    • At most once (0)
    • At least once (1)
    • Exactly once (2)

    QoS 0 是一种 “fire and forget” 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。

    QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。

    QoS 2 设计了略微复杂的重发和重复消息发现机制,保证消息到达对方并且严格只到达一次。

    会话保持

    MQTT 没有假设设备或 Broker 使用了 TCP 的保活机制,而是设计了协议层的保活机制:在 CONNECT 报文里可设置 Keepalive 字段,来设置保活心跳包 PINGREQ/PINGRESP 的发送时间间隔。当长时间无法收到设备的 PINGREQ 的时候,Broker 就会认为设备已经下线。

    总的来说,Keepalive 有两个作用:

    • 发现对端死亡或者网络中断
    • 在长时间无消息交互的情况下,保持连接不被网络设备断开

    对于那些想要在重新上线后,重新收到离线期间错过的消息的设备,MQTT 设计了持久化连接:在 CONNECT 报文里可设置 CleanSession 字段为 False,则 Broker 会为终端存储:

    • 设备所有的订阅
    • 还未被设备确认的 QoS1 和 QoS 消息
    • 设备离线时错过的消息

    在线状态感知

    MQTT 设计了遗愿(Last Will) 消息,让 Broker 在发现设备异常下线的情况下,帮助设备发布一条遗愿消息到指定的主题。

    开源 MQTT 服务器

    到目前为止,比较流行的开源 MQTT 服务器有几个:

    1. Eclipse Mosquitto

      使用 C 语言实现的 MQTT 服务器。Eclipse 组织还还包含了大量的 MQTT 客户端项目:https://www.eclipse.org/paho/#

    2. EMQ X

      使用 Erlang 语言开发的 MQTT 服务器,内置强大的规则引擎,支持许多其他 IoT 协议比如 MQTT-SN、 CoAP、LwM2M 等。

    3. Mosca

      使用 Node.JS 开发的 MQTT 服务器,简单易用。

    4. VerneMQ

      同样使用 Erlang 开发的 MQTT 服务器.

    MQTT 主题特性

    主题简介

    MQTT 协议 通过网络传输应用消息,应用消息通过 MQTT 传输时,它们有关联的服务质量(QoS)和主题(Topic)。主题本质上是一个字符串,MQTT 协议规定主题是 UTF-8 编码的字符串,这意味着,主题过滤器和主题名的比较可以通过比较编码后的 UTF-8 字节或解码后的 Unicode 字符。

    主题名和主题过滤器

    • 主题名 附加在应用消息上的一个标签,服务端已知且与订阅匹配。服务端发送应用消息的一个副本给每一个匹配的客户端订阅。
    • 主题过滤器 订阅中包含的一个表达式,用于表示相关的一个或多个主题。主题过滤器可以使用通配符。

    如果订阅的主题过滤器与消息的主题名匹配,应用消息会被发送给每一个匹配的客户端订阅。主题资源可以是管理员在服务端预先定义好的,也可以是服务端收到第一个订阅或使用那个主题名的应用消息时动态添加的。服务端可以使用一个安全组件有选择地授权客户端使用某个主题资源。

    主题和主题过滤器命名的规则

    • 所有的主题名和主题过滤器必须至少包含一个字符。
    • 主题名和主题过滤器是大小写敏感的。ACCOUNTSAccounts 是不同的主题名。
    • 主题名和主题过滤器可以包含空格字符。Accounts payable 是合法的主题名
    • 主题名或主题过滤器以前置或后置斜杠 / 区分。/financefinance 是不同的。
    • 只包含斜杠 / 的主题名或主题过滤器是合法的。
    • 主题名和主题过滤器不能包含null 字符(Unicode U+0000)。
    • 主题名和主题过滤器是 UTF-8 编码字符串,除了不能超过 UTF-8 编码字符串的长度限制之外,主题名或主题过滤器的层级数量没有其它限制。

    主题层级

    主题层级分隔符

    斜杠(“/” U+002F)用于分割主题的每个层级,为主题名提供一个分层结构。分隔符用于将结构化引入主题名。如果存在分隔符,它将主题名分割为多个主题层级,是消息主题层级设计中很重要的符号。 比方说:aaa/bbbaaa/bbb/cccaaa/bbb/ccc/ddd 这样的消息主题格式,是一个层层递进的关系,可通过多层通配符同时匹配两者,或者单层通配符只匹配一个。 这在现实场景中,可以应用到:公司的部门层级推送、国家城市层级推送等包含层级关系的场景。

    MQTT 订阅报文包含一个主题过滤器(Topic Filter)和一个最大的服务质量(QoS)等级。订阅的主题过滤器可以包含特殊的通配符,允许客户端一次订阅多个主题。当客户端订阅指定的主题过滤器包含两种通配符时,主题层级分隔符就很有用了。主题层级分隔符可以出现在主题过滤器或主题名字的任何位置。相邻的主题层次分隔符表示一个零长度的主题层级。

    主题过滤器中可以使用通配符,但是主题名不能使用通配符。单层通配符和多层通配符只能用于订阅 (subscribe) 消息而不能用于发布
    (publish) 消息,层级分隔符两种情况下均可使用。

    多层通配符

    井字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级。

    例如,如果客户端订阅主题 sport/tennis/player1/#,它会收到使用下列主题名发布的消息:

    • sport/tennis/player1
    • sport/tennis/player1/ranking
    • sport/tennis/player1/score/wimbledon

    因为多层通配符包括它自己的父级,所以 sport/# 也匹配单独的 sport 主题名,sport/tennis/player1/# 也可以匹配 sport/tennis/player1

    单独的多层通配符 # 是有效的,它会收到所有的应用消息。

    多层通配符必须单独指定,或者跟在主题层级分隔符后面。多层通配符必须是主题过滤器的最后一个字符。因此,sport/tennis#sport/tennis/#/ranking 都是无效的多层通配符。

    单层通配符

    加号 (“+” U+002B) 是只能用于单个主题层级匹配的通配符。例如,sport/tennis/+ 匹配 sport/tennis/player1sport/tennis/player2 ,但是不匹配 sport/tennis/player1/ranking。同时,由于单层通配符只能匹配一个层级,sport/+ 不匹配 sport 但是却匹配 sport/

    在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级,可以在主题过滤器中的多个层级中使用它,也可以和多层通配符一起使用,++/tennis/#sport/+/player1 都有有效的。在使用单层通配符时,单层通配符占据过滤器的整个层级,sport+ 是无效的。

    以 $ 开头的主题

    服务端不能将$ 字符开头的主题名匹配通配符 (#+) 开头的主题过滤器, 订阅 # 的客户端不会收到任何发布到以 $ 开头主题的消息,订阅 +/monitor/Clients 的客户端也不会收到任何发布到 $SYS/monitor/Clients 的消息。服务端应该阻止客户端使用这种主题名与其他客户端交换消息,客户端注意不能使用 $ 字符开头的主题。

    服务端实现可以将 $ 开头的主题名用作其他目的。,例如 $SYS/ 被广泛用作包含服务器特定信息或控制接口的主题的前缀。订阅 $SYS/# 的客户端会收到发布到以 $SYS/ 开头主题的消息,订阅 $SYS/monitor/+ 的客户端会收到发布到 $SYS/monitor/Clients 主题的消息,如果客户端想同时接受以 $SYS/ 开头主题的消息和不以 $ 开头主题的消息,它需要同时订阅 # 和 $SYS/#。

    QoS(服务质量)

    MQTT 协议 中规定了消息服务质量(Quality of Service),它保证了在不同的网络环境下消息传递的可靠性,QoS 的设计是 MQTT 协议里的重点。作为专为物联网场景设计的协议,MQTT 的运行场景不仅仅是 PC,而是更广泛的窄带宽网络和低功耗设备,如果能在协议层解决传输质量的问题,将为物联网应用的开发提供极大便利。

    MQTT QoS 等级

    MQTT 设计了 3 个 QoS 等级。

    • QoS 0:消息最多传递一次,如果当时客户端不可用,则会丢失该消息。
    • QoS 1:消息传递至少 1 次。
    • QoS 2:消息仅传送一次。

    QoS 0 是一种 “fire and forget” 的消息发送模式:Sender (可能是 Publisher 或者 Broker) 发送一条消息之后,就不再关心它有没有发送到对方,也不设置任何重发机制。

    QoS 1 包含了简单的重发机制,Sender 发送消息之后等待接收者的 ACK,如果没收到 ACK 则重新发送消息。这种模式能保证消息至少能到达一次,但无法保证消息重复。

    QoS 2 设计了重发和重复消息发现机制,保证消息到达对方并且严格只到达一次。

    工作原理

    QoS 0 - 最多分发一次
    当 QoS 为 0 时,消息的分发依赖于底层网络的能力。发布者只会发布一次消息,接收者不会应答消息,发布者也不会储存和重发消息。消息在这个等级下具有最高的传输效率,但可能送达一次也可能根本没送达。
    在这里插入图片描述
    Qos 1 - 至少分发一次
    当 QoS 为 1 时,可以保证消息至少送达一次。MQTT 通过简单的 ACK 机制来保证 QoS 1。发布者会发布消息,并等待接收者的 PUBACK 报文的应答,如果在规定的时间内没有收到 PUBACK 的应答,发布者会将消息的 DUP 置为 1 并重发消息。接收者接收到 QoS 为 1 的消息时应该回应 PUBACK 报文,接收者可能会多次接受同一个消息,无论 DUP 标志如何,接收者都会将收到的消息当作一个新的消息并发送 PUBACK 报文应答。

    在这里插入图片描述
    QoS 2 - 只分发一次
    当 QoS 为 2 时,发布者和订阅者通过两次会话来保证消息只被传递一次,这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。

    发布者发布 QoS 为 2 的消息之后,会将发布的消息储存起来并等待接收者回复 PUBREC 的消息,发送者收到 PUBREC 消息后,它就可以安全丢弃掉之前的发布消息,因为它已经知道接收者成功收到了消息。发布者会保存 PUBREC 消息并应答一个 PUBREL,等待接收者回复 PUBCOMP 消息,当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。

    当接收者接收到一条 QoS 为 2 的 PUBLISH 消息时,他会处理此消息并返回一条 PUBREC 进行应答。当接收者收到 PUBREL 消息之后,它会丢弃掉所有已保存的状态,并回复 PUBCOMP。

    无论在传输过程中何时出现丢包,发送端都负责重发上一条消息。不管发送端是 Publisher 还是 Broker,都是如此。因此,接收端也需要对每一条命令消息都进行应答。
    在这里插入图片描述

    QoS 在发布与订阅中的区别

    MQTT 发布与订阅操作中的 QoS 代表了不同的含义,发布时的 QoS 表示消息发送到服务端时使用的 QoS,订阅时的 QoS 表示服务端向自己转发消息时可以使用的最大 QoS。

    • 当客户端 A 的发布 QoS 大于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 B 的订阅 QoS。
    • 当客户端 A 的发布 QoS 小于客户端 B 的订阅 QoS 时,服务端向客户端 B 转发消息时使用的 QoS 为客户端 A 的发布 QoS。

    不同情况下客户端收到的消息 QoS 可参考下表:

    发布消息的 QoS主题订阅的 QoS接收消息的 QoS
    000
    010
    020
    100
    111
    121
    200
    211
    222

    如何选择 MQTT QoS 等级

    QoS 级别越高,流程越复杂,系统资源消耗越大。应用程序可以根据自己的网络场景和业务需求,选择合适的 QoS 级别。

    以下情况下可以选择 QoS 0

    • 可以接受消息偶尔丢失。
    • 在同一个子网内部的服务间的消息交互,或其他客户端与服务端网络非常稳定的场景。

    以下情况下可以选择 QoS 1

    • 对系统资源消耗较为关注,希望性能最优化。
    • 消息不能丢失,但能接受并处理重复的消息。

    以下情况下可以选择 QoS 2

    • 不能忍受消息丢失(消息的丢失会造成生命或财产的损失),且不希望收到重复的消息。
    • 数据完整性与及时性要求较高的银行、消防、航空等行业。

    MQTT 会话

    什么是会话?

    我们将从客户端向服务端发起 MQTT 连接请求开始,到连接中断直到会话过期为止的消息收发序列称之为会话。因此,会话可能仅持续一个网络连接,也可能跨越多个网络连接存在,如果客户端能在会话过期之前重新建立了连接的话。

    在 MQTT v5 中会话过期时间由 Session Expiry Interval 字段决定,早前版本的协议没有限制会话过期时间,但通常由 MQTT 服务端决定。

    什么是会话状态?

    MQTT 要求客户端与服务端在会话有效期内存储一系列与客户端标识相关联的状态,称之为会话状态。

    客户端需要存储以下会话状态:

    • 已发送给服务端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
    • 从服务端收到的,但是还没有完成确认的 QoS 2 消息。

    服务端需要存储以下会话状态:

    • 会话是否存在,即使会话状态其余部分为空。
    • 客户端订阅信息,包括任何订阅标识符。
    • 已发送给客户端,但是还没有完成确认的 QoS 1 与 QoS 2 消息。
    • 等待传输给客户端的 QoS 0 消息(可选),QoS 1 与 QoS 2 消息。
    • 从客户端收到的,但是还没有完成确认的 QoS 2 消息,遗嘱消息和遗嘱延时间隔。
    • 会话过期时间。

    会话状态的使用

    如果客户端因为网络波动等原因导致连接短暂中断,但在会话过期前重新与服务端建立了连接,那么就可以沿用上次连接建立的订阅关系,不需要重新订阅一遍。在低带宽、不稳定的网络场景下,网络中断可能会发生得很频繁,保存会话状态的方式避免了每次连接都需要重新订阅,降低了重连时客户端和服务端的资源消耗。服务端在客户端脱机期间为其保留未完成确认的以及后续到达的消息,客户端重新连接时再一并转发,既可以避免消息丢失,也能够降低某些场景下用户对网络变化的感知度。

    会话的开始与结束

    MQTT v5.0 与 v3.1.1 在会话上有着较为显著的变化。MQTT v3.1.1 只有一个 Clean Session 字段,由客户端在连接时指定,为 1 表示客户端和服务器必须丢弃任何先前的会话并创建一个新的会话,且这个会话的生命周期与网络连接保持一致;为 0 则表示服务端必须使用与 Client ID 关联的会话来恢复与客户端的通信(除非会话不存在),客户端和服务器在断开连接后必须存储会话的状态。

    MQTT v3.1.1 没有规定持久会话应该在什么时候过期,如果仅从协议层面理解的话,这个持久会话应该永久存在。但在实际场景中这并不现实,因为它会非常占用服务端的资源,所以服务端通常不会遵循协议来实现,而是向用户提供一个全局配置来限制会话过期时间。

    而到了 MQTT 5.0,这个问题得到了妥善的解决,Clean Session 字段被拆分成了 Clean Start 字段与 Session Expiry Interval 字段。Clean Start 字段指定是否需要全新的会话,Session Expiry Interval 字段指定会话过期时间,它们在连接时指定,但 Session Expiry Interval 字段可以在客户端断开连接时被更新。因此我们可以很轻易地实现客户端网络连接异常断开时会话被保留,客户端正常下线时会话则随着连接关闭而终结的功能。

    客户端如何知道这是被恢复的会话?

    显而易见的是,当客户端以期望从先前建立的会话恢复状态的方式发起连接,它需要知道服务端是否存在相应的会话,才能决定在连接建立后是否需要重复一遍订阅操作。关于这一点,MQTT 协议从 v3.1.1 开始,就为 CONNACK 报文设计了 Session Present 字段,用于表示当前连接使用的是否是一个全新会话,客户端可以根据这个字段的值进行判断。

    使用建议

    开发者需要特别注意 ClientID 与会话之间的联系,如果某些场景下同一个 ClientID 会被不同的应用或者用户多次使用,即每次连接都会有完全不同的行为,那么就需要确保每次连接时都请求了全新的会话。合理地评估是否需要持久会话,如非必要可以在正常离线时将会话设置为立即过期减少服务端资源占用。设置合适的会话过期时间,设置过短,可能会失去存储会话状态的意义,设置过长,可能会过多地占用服务端资源。

    保留消息

    发布订阅模式虽然让消息的发布者与订阅者充分解耦,但也出现了一个隐含的问题,即订阅者无法主动向发布者请求消息,订阅者何时收到消息完全依赖于发布者何时发布消息,这在某些场景中就产生了不便。例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。

    在这里插入图片描述
    当服务端收到 Retain 标志为 1 的 PUBLISH 报文时,它将进行以下操作:

    1. 如果存在匹配此主题名的订阅者,则按正常逻辑进行转发,并在转发前清除 Retain 标志。MQTT v3.1.1 协议中 Retain 标志必须被清除,而 MQTT v5.0 协议则在订阅选项中新增了一个 Retain As Publish 字段,由客户端自行指示服务端在转发前是否需要清除 Retain 标志。
    2. 如果 Payload 非空,存储此应用消息,如果此主题名下已经存在保留消息则进行替换。如果 Payload 为空,服务端不会存储此应用消息,同时清除此主题名下已经存在的保留消息。

    而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。当保留消息在这种情况下被转发给订阅者时,它的 Retain 标志必须保持为 1。相比 MQTT v3.1.1,MQTT v5.0 对于订阅建立时是否发送保留消息做了更细致的划分,并在订阅选项中提供了 Retain Handling 字段。例如某些客户端可能仅希望在首次订阅时接收保留消息,又或者不希望在订阅建立时接收保留消息,都可以通过 Retain Handling 选项调整。

    保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。删除保留消息只有两种方式:

    1. 前文已经提到过的,客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息。
    2. 消息过期间隔属性在保留消息中同样适用,如果客户端设置了这一属性,那么保留消息在服务端存储超过过期时间后就会被删除。

    借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下很非常重要的。

    遗嘱消息(Will Message)

    当客户端断开连接时,发送给相关的订阅者的遗嘱消息。以下情况下会发送 Will Message:

    • 服务端发生了I/O 错误或者网络失败;
    • 客户端在定义的心跳时期失联;
    • 客户端在发送下线包之前关闭网络连接;
    • 服务端在收到下线包之前关闭网络连接。

    遗嘱消息一般通过在客户端 CONNECT 的时候指定。如下所示,在连接的时候通过调用 MqttConnectOptions 实例的 setWill 方法来设定。任何订阅了下面的主题的客户端都可以收到该遗嘱消息。

    参考:
    emq官网
    Netty实现高性能IOT服务器(Groza)之手撕MQTT协议篇上

    展开全文
  • 后面无意中发现了Netty框架,一个实现了大量网络协议的框架,于是就基于Netty实现了mqtt客户端。 环境搭建 要开发一个mqtt客户端,我们首先就需要搭建一个完整的mqtt通讯环境。 服务器:EMQX 下载windows压缩包并...

    前言

    在Android开发中,之前一直使用org.eclipse.paho.client.mqttv3包来开发mqtt客户端,随之就遇到了线程不回收的难搞问题,还伴随其他一些莫名其妙的问题。事实教育我,mqttv3虽然用的人多,却并不稳定。后面无意中发现了Netty框架,一个实现了大量网络协议的框架,于是就基于Netty实现了mqtt客户端。

    环境搭建

    要开发一个mqtt客户端,我们首先就需要搭建一个完整的mqtt通讯环境。

    服务器:EMQX

    下载windows压缩包并解压:
    https://www.emqx.com/en/downloads?product=broker

    启动指令:
    ./bin/emqx start

    停止指令:
    ./bin/emqx stop

    启动服务后台:
    http://127.0.0.1:18083 用户名:admin 密码:public

    配置连接认证:
    –打开文件:
    \etc\plugins\emqx_auth_mnesia.conf
    –修改匹配算法:
    auth.mnesia.password_hash = plain
    –添加认证用户:
    auth.user.1.username = testuser
    auth.user.1.password = 123456
    –启动插件:
    重启服务,进入服务后台,插件–>找到emqx_auth_mnesia–>启动
    –参考文档:
    https://docs.emqx.cn/broker/v4.3/

    客户端:MQTT X

    下载:https://mqttx.app/zh

    展开全文
  • Netty是业界最流行的nio框架之一,结合springboot可以满足快速开发 MQTT(Message Queuing ...Netty也可以实现MQTT协议,他的内部封装了MQTT协议的相关对象。 使用Netty+SpringBoot方式可以快速地开发一套基于MQT

    Netty是业界最流行的nio框架之一,结合springboot可以满足快速开发

    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上的。MQTT协议的可以用在物联网、小型设备、还有移动应用上。

    Netty也可以实现MQTT协议,他的内部封装了MQTT协议的相关对象。

    使用Netty+SpringBoot方式可以快速地开发一套基于MQTT协议(主要是MQTT3.1和MQTT3.1.1)的服务端程序

    SpringBoot+Netty创建,pom.xml文件导入依赖包

    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
     
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.1.6.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
     
    	<dependencies>
    	
    		<!--web模块的启动器 -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<!-- netty依赖 springboot2.x自动导入版本 -->
    		<dependency>
    			<groupId>io.netty</groupId>
    			<artifactId>netty-all</artifactId>
    		</dependency>
    		
    	</dependencies>

     Springboot启动类,直接在main里面启动netty的MQTT服务(也包含web应用的)

    package boot.example.mqtt.server;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    import boot.example.mqtt.server.server.BootNettyServer;
    
    
    
    @SpringBootApplication
    public class BootNettyApplication 
    {
        public static void main( String[] args )
        {
    		SpringApplication app = new SpringApplication(BootNettyApplication.class);
    		app.run(args);
    		// 启动  1883
            new BootNettyServer().startup();
        }
    }

    Netty的MQTT启动类

    package boot.example.mqtt.server.server;
    
    
    import boot.example.mqtt.server.adapter.BootChannelInboundHandler;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.PooledByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.mqtt.MqttDecoder;
    import io.netty.handler.codec.mqtt.MqttEncoder;
    import io.netty.handler.timeout.IdleStateHandler;
    
    
    public class BootNettyServer {
    
    	private int port = 1883;
    	
    	private NioEventLoopGroup bossGroup;
    
    	private NioEventLoopGroup workGroup;
    
    	/**
    	 * 	启动服务
    	 * @throws InterruptedException 
    	 */
    	public void startup() {
    
    		try {
    			bossGroup = new NioEventLoopGroup(1);
    			workGroup = new NioEventLoopGroup();
    
    			ServerBootstrap bootstrap = new ServerBootstrap();
    			bootstrap.group(bossGroup, workGroup);
    			bootstrap.channel(NioServerSocketChannel.class);
    
    			bootstrap.option(ChannelOption.SO_REUSEADDR, true)
    					.option(ChannelOption.SO_BACKLOG, 1024)
    					.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
    					.option(ChannelOption.SO_RCVBUF, 10485760);
    
    			bootstrap.childOption(ChannelOption.TCP_NODELAY, true)
    					.childOption(ChannelOption.SO_KEEPALIVE, true)
    					.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    
    			bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    						protected void initChannel(SocketChannel ch) {
    							ChannelPipeline channelPipeline = ch.pipeline();
    							// 设置读写空闲超时时间
    							channelPipeline.addLast(new IdleStateHandler(600, 600, 1200));
    							channelPipeline.addLast("encoder", MqttEncoder.INSTANCE);
    							channelPipeline.addLast("decoder", new MqttDecoder());
    							channelPipeline.addLast(new BootChannelInboundHandler());
    						}
    					});
    			ChannelFuture f = bootstrap.bind(port).sync();
    			f.channel().closeFuture().sync();
    			
    		} catch (Exception e) {
    			System.out.println("start exception"+e.toString());
    		}
    
    	}
    
    	/**
    	 * 	关闭服务
    	 */
    	public void shutdown() throws InterruptedException {
    		if (workGroup != null && bossGroup != null) {
    			bossGroup.shutdownGracefully();
    			workGroup.shutdownGracefully();
    			System.out.println("shutdown success");
    		}
    	}
    
    }
    

    MQTT服务端I/O数据读写处理类

    package boot.example.mqtt.server.adapter;
    
    
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandler;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.codec.mqtt.*;
    import java.io.IOException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    @ChannelHandler.Sharable
    public class BootChannelInboundHandler extends ChannelInboundHandlerAdapter {
    
    	private Logger log =  LoggerFactory.getLogger(this.getClass());
    	
        /**
         * 	客户端与服务端第一次建立连接时执行 在channelActive方法之前执行
         */
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            super.channelRegistered(ctx);
        }
        
        /**
         * 	客户端与服务端 断连时执行 channelInactive方法之后执行
         */
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            super.channelUnregistered(ctx);
        }
    
        /**
         * 	从客户端收到新的数据时,这个方法会在收到消息时被调用
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {
        	if (null != msg) {
                MqttMessage mqttMessage = (MqttMessage) msg;
                log.info("info--"+mqttMessage.toString());
                MqttFixedHeader mqttFixedHeader = mqttMessage.fixedHeader();
                Channel channel = ctx.channel();
    
                if(mqttFixedHeader.messageType().equals(MqttMessageType.CONNECT)){
                	//	在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接
                	//	to do 建议connect消息单独处理,用来对客户端进行认证管理等 这里直接返回一个CONNACK消息
                	BootMqttMsgBack.connack(channel, mqttMessage);
                }
    
                switch (mqttFixedHeader.messageType()){
                    case PUBLISH:		//	客户端发布消息
                    	//	PUBACK报文是对QoS 1等级的PUBLISH报文的响应
                    	System.out.println("123");
                    	BootMqttMsgBack.puback(channel, mqttMessage);
                        break;
                    case PUBREL:		//	发布释放
                    	//	PUBREL报文是对PUBREC报文的响应
                    	//	to do
                    	BootMqttMsgBack.pubcomp(channel, mqttMessage);
                        break;
                    case SUBSCRIBE:		//	客户端订阅主题
                    	//	客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题。
                    	//	为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端。
                    	//	SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端
                    	// 	to do
                    	BootMqttMsgBack.suback(channel, mqttMessage);
                        break;
                    case UNSUBSCRIBE:	//	客户端取消订阅
                    	//	客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题
                    	//	to do
                    	BootMqttMsgBack.unsuback(channel, mqttMessage);
                        break;
                    case PINGREQ:		//	客户端发起心跳
                    	//	客户端发送PINGREQ报文给服务端的
                    	//	在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着
                    	//	请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开
                    	BootMqttMsgBack.pingresp(channel, mqttMessage);
                        break;
                    case DISCONNECT:	//	客户端主动断开连接
                    	//	DISCONNECT报文是客户端发给服务端的最后一个控制报文, 服务端必须验证所有的保留位都被设置为0
                    	//	to do
                        break;
                    default:
                        break;
                }
        	}
        }
    
        /**
         * 	从客户端收到新的数据、读取完成时调用
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws IOException {
        }
    
        /**
         * 	当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            super.exceptionCaught(ctx, cause);  
            ctx.close();
        }
    
        /**
         * 	客户端与服务端第一次建立连接时执行
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            super.channelActive(ctx);
        }
    
        /**
         * 	客户端与服务端 断连时执行
         */
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {
            super.channelInactive(ctx);
        }
        
        /**
         * 	服务端 当读超时时 会调用这个方法
         */
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception, IOException {
            super.userEventTriggered(ctx, evt);
            ctx.close();
        }
    
        
        @Override
        public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
            super.channelWritabilityChanged(ctx);
        }
    
    }
    

    对MQTT客户端发送消息后,处理的返回消息,基于MQTT协议的,需要MQTT协议的主要内容

    package boot.example.mqtt.server.adapter;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import io.netty.channel.Channel;
    import io.netty.handler.codec.mqtt.MqttConnAckMessage;
    import io.netty.handler.codec.mqtt.MqttConnAckVariableHeader;
    import io.netty.handler.codec.mqtt.MqttConnectMessage;
    import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
    import io.netty.handler.codec.mqtt.MqttConnectVariableHeader;
    import io.netty.handler.codec.mqtt.MqttFixedHeader;
    import io.netty.handler.codec.mqtt.MqttMessage;
    import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
    import io.netty.handler.codec.mqtt.MqttMessageType;
    import io.netty.handler.codec.mqtt.MqttPubAckMessage;
    import io.netty.handler.codec.mqtt.MqttPublishMessage;
    import io.netty.handler.codec.mqtt.MqttQoS;
    import io.netty.handler.codec.mqtt.MqttSubAckMessage;
    import io.netty.handler.codec.mqtt.MqttSubAckPayload;
    import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
    import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
    
    
    public class BootMqttMsgBack {
    
    	private static Logger log =  LoggerFactory.getLogger(BootMqttMsgBack.class);
    	
    	/**
    	 * 	确认连接请求
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void connack (Channel channel, MqttMessage mqttMessage) {
    		MqttConnectMessage mqttConnectMessage = (MqttConnectMessage) mqttMessage;
    		MqttFixedHeader mqttFixedHeaderInfo = mqttConnectMessage.fixedHeader();
    		MqttConnectVariableHeader mqttConnectVariableHeaderInfo = mqttConnectMessage.variableHeader();
    		
    		//	构建返回报文, 可变报头
    		MqttConnAckVariableHeader mqttConnAckVariableHeaderBack = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED, mqttConnectVariableHeaderInfo.isCleanSession());
    		//	构建返回报文, 固定报头
    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.CONNACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
    		//	构建CONNACK消息体
    		MqttConnAckMessage connAck = new MqttConnAckMessage(mqttFixedHeaderBack, mqttConnAckVariableHeaderBack);
    		log.info("back--"+connAck.toString());
    		channel.writeAndFlush(connAck);
    	}
    	
    	/**
    	 * 	根据qos发布确认
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void puback (Channel channel, MqttMessage mqttMessage) {
    		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
    		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
    		MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
            byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
            mqttPublishMessage.payload().readBytes(headBytes);
            String data = new String(headBytes);
            System.out.println("publish data--"+data);
    
            switch (qos) {
    	        case AT_MOST_ONCE: 		//	至多一次
    	            break;
    	        case AT_LEAST_ONCE:		//	至少一次
    	    		//	构建返回报文, 可变报头
    	    		MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
    	    		//	构建返回报文, 固定报头
    	    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBACK,mqttFixedHeaderInfo.isDup(), MqttQoS.AT_MOST_ONCE, mqttFixedHeaderInfo.isRetain(), 0x02);
    	    		//	构建PUBACK消息体
    	    		MqttPubAckMessage pubAck = new MqttPubAckMessage(mqttFixedHeaderBack, mqttMessageIdVariableHeaderBack);
    	    		log.info("back--"+pubAck.toString());
    	    		channel.writeAndFlush(pubAck);
    	            break;
    	        case EXACTLY_ONCE:		//	刚好一次
    	            //	构建返回报文, 固定报头
    	        	MqttFixedHeader mqttFixedHeaderBack2 = new MqttFixedHeader(MqttMessageType.PUBREC,false, MqttQoS.AT_LEAST_ONCE,false,0x02);
    	            //	构建返回报文, 可变报头
    	            MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack2 = MqttMessageIdVariableHeader.from(mqttPublishMessage.variableHeader().packetId());
    	            MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack2,mqttMessageIdVariableHeaderBack2);
    	    		log.info("back--"+mqttMessageBack.toString());
    	    		channel.writeAndFlush(mqttMessageBack);
    	            break;
    			default:
    				break;
            }
    	}
    	
    	/**
    	 * 	发布完成 qos2 
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void pubcomp (Channel channel, MqttMessage mqttMessage) {
            MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader();       
            //	构建返回报文, 固定报头
        	MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.PUBCOMP,false, MqttQoS.AT_MOST_ONCE,false,0x02);
            //	构建返回报文, 可变报头
            MqttMessageIdVariableHeader mqttMessageIdVariableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
            MqttMessage mqttMessageBack = new MqttMessage(mqttFixedHeaderBack,mqttMessageIdVariableHeaderBack);
    		log.info("back--"+mqttMessageBack.toString());
    		channel.writeAndFlush(mqttMessageBack);
    	}
    	
    	/**
    	 * 	订阅确认
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void suback(Channel channel, MqttMessage mqttMessage) {
    		MqttSubscribeMessage mqttSubscribeMessage = (MqttSubscribeMessage) mqttMessage;
    		MqttMessageIdVariableHeader messageIdVariableHeader = mqttSubscribeMessage.variableHeader(); 
    		//	构建返回报文, 可变报头
    		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
    		Set<String> topics = mqttSubscribeMessage.payload().topicSubscriptions().stream().map(mqttTopicSubscription -> mqttTopicSubscription.topicName()).collect(Collectors.toSet());
    		//log.info(topics.toString());
    		List<Integer> grantedQoSLevels = new ArrayList<>(topics.size());
    		for (int i = 0; i < topics.size(); i++) {
    			grantedQoSLevels.add(mqttSubscribeMessage.payload().topicSubscriptions().get(i).qualityOfService().value());
    		}
    		//	构建返回报文	有效负载
    		MqttSubAckPayload payloadBack = new MqttSubAckPayload(grantedQoSLevels);
    		//	构建返回报文	固定报头
    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2+topics.size());
    		//	构建返回报文	订阅确认
    		MqttSubAckMessage subAck = new MqttSubAckMessage(mqttFixedHeaderBack,variableHeaderBack, payloadBack);
    		log.info("back--"+subAck.toString());
    		channel.writeAndFlush(subAck);
    	}
    	
    	/**
    	 * 	取消订阅确认
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void unsuback(Channel channel, MqttMessage mqttMessage) {
    		MqttMessageIdVariableHeader messageIdVariableHeader = (MqttMessageIdVariableHeader) mqttMessage.variableHeader(); 
    		//	构建返回报文	可变报头
    		MqttMessageIdVariableHeader variableHeaderBack = MqttMessageIdVariableHeader.from(messageIdVariableHeader.messageId());
    		//	构建返回报文	固定报头
    		MqttFixedHeader mqttFixedHeaderBack = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 2);
    		//	构建返回报文	取消订阅确认
    		MqttUnsubAckMessage unSubAck = new MqttUnsubAckMessage(mqttFixedHeaderBack,variableHeaderBack);
    		log.info("back--"+unSubAck.toString());
    		channel.writeAndFlush(unSubAck);
    	}
    	
    	/**
    	 * 	心跳响应
    	 * @param channel
    	 * @param mqttMessage
    	 */
    	public static void pingresp (Channel channel, MqttMessage mqttMessage) {
    		//	心跳响应报文	11010000 00000000  固定报文
    		MqttFixedHeader fixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0);
    		MqttMessage mqttMessageBack = new MqttMessage(fixedHeader);
    		log.info("back--"+mqttMessageBack.toString());
    		channel.writeAndFlush(mqttMessageBack);
    	}
    	
    	
    }
    

    使用springboot+netty很容易在极短的时间内搭建基于MQTT协议的服务端,简单地DEMO应用仅仅只需要四个类就能满足要求,这个Demo应用是基于Netty4.x的。

    使用Netty来搭建MQTT协议的服务端DEMO搭建好了,需要一个MQTT客户端来做测试,我使用的是eclipse的paho工具来测试的,支持window客户端版本,我使用的是(org.eclipse.paho.ui.app-1.1.1-win32.win32.x86_64.zip)  UI的意思是可以桌面用的,当然需要java的jdk支持的,此工具官网有的。

    测试效果截图

    DEMO仅仅只是DEMO,他只是单方面的支持客户端到服务端创建连接,订阅,取消订阅,发布消息等,最主要地还是要看到MQTT创建过程和数据

    在类中,使用了log日志,使用springboot默认的log日志配置,便可以得到MQTT创建过程中的数据

     

    MQTT协议主要由三部分组成

    • 固定头(MqttFixedHeader):所有的 MQTT 数据包都有,用于表示数据包类型及对应标识,还有数据包的大小
    • 可变头(variableHeader):部分的 MQTT 数据包中有,需要根据协议中具体类型来决定
    • 消息体(payload):部分的 MQTT 数据包中有,具体数据信息(关键真正业务用到的数据哦)

    具体的MQTT协议需要参考文档

     

    我们使用的是netty封装的mqtt类,在(io.netty.handler.codec.mqtt)里toString()方法得到报文信息参考如下

    1.连接服务器(CONNECT)和确认连接请求(CONNACK)

    客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文

    MqttConnectMessage[
    fixedHeader=MqttFixedHeader[messageType=CONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=35], 
    variableHeader=MqttConnectVariableHeader[name=MQTT, version=4, hasUserName=false, hasPassword=false, isWillRetain=false, isWillFlag=false, isCleanSession=true, keepAliveTimeSeconds=20],
    payload=MqttConnectPayload[clientIdentifier=paho1614153936872000000, willTopic=null, willMessage=null, userName=null, password=null]
    ]

    从CONNECT报文中,我们可以看到很多的信息,协议标识,协议级别,会话,遗嘱,用户,密码等,我这里抓取的报文只是一个基础参考

     

    服务端发送CONNACK报文响应从客户端收到的CONNECT报文,服务端发送给客户端的第一个报文必须是CONNACK,这里也只是一个参考,具体需要根据CONNECT来返回报文

    MqttConnAckMessage[
    fixedHeader=MqttFixedHeader[messageType=CONNACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttConnAckVariableHeader[connectReturnCode=CONNECTION_ACCEPTED, sessionPresent=true], 
    payload=]

     

    2.订阅主题(SUBSCRIBE)和确认订阅(SUBACK)

    客户端向服务端发送SUBSCRIBE报文用于创建一个或多个订阅,每个订阅注册客户端关心的一个或多个主题,为了将应用消息转发给与那些订阅匹配的主题,服务端发送PUBLISH报文给客户端,SUBSCRIBE报文也(为每个订阅)指定了最大的QoS等级,服务端根据这个发送应用消息给客户端,具体的应用,DEMO里只是体现了订阅主题的过程,实际业务并不是如此简单,订阅主题的报文参考如下

    MqttSubscribeMessage[
    fixedHeader=MqttFixedHeader[messageType=SUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=56], 
    variableHeader=MqttMessageIdVariableHeader[messageId=1], 
    payload=MqttSubscribePayload[
    MqttTopicSubscription[topicFilter=test/netty/post, qualityOfService=AT_MOST_ONCE], 
    MqttTopicSubscription[topicFilter=test/netty/get, qualityOfService=AT_LEAST_ONCE], 
    MqttTopicSubscription[topicFilter=test/netty/event, qualityOfService=EXACTLY_ONCE]
    ]]

    服务端发送SUBACK报文给客户端,用于确认它已收到并且正在处理SUBSCRIBE报文,SUBACK报文包含一个返回码清单,它们指定了SUBSCRIBE请求的每个订阅被授予的最大QoS等级,确认订阅的报文参考如下

    MqttSubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=SUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=5], 
    variableHeader=MqttMessageIdVariableHeader[messageId=1], 
    payload=MqttSubAckPayload[grantedQoSLevels=[0, 1, 2]]
    ]

     

    3.取消订阅(UNSUBSCRIBE)和取消订阅确认(UNSUBACK)

    客户端发送UNSUBSCRIBE报文给服务端,用于取消订阅主题,参考报文如下

    MqttUnsubscribeMessage[
    fixedHeader=MqttFixedHeader[messageType=UNSUBSCRIBE, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=53], 
    variableHeader=MqttMessageIdVariableHeader[messageId=2], 
    payload=MqttUnsubscribePayload[topicName = test/netty/post, topicName = test/netty/get, topicName = test/netty/event]
    ]

    服务端发送UNSUBACK报文给客户端用于确认收到UNSUBSCRIBE报文,参考报文如下

    MqttUnsubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=UNSUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttMessageIdVariableHeader[messageId=2], 
    payload=
    ]

     

    4.心跳请求(PINGREQ)和心跳响应(PINGRESP)

    客户端发送PINGREQ报文给服务端的,在没有任何其它控制报文从客户端发给服务的时,告知服务端客户端还活着,请求服务端发送 响应确认它还活着,使用网络以确认网络连接没有断开,参考报文如下

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PINGREQ, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
    variableHeader=, 
    payload=
    ]

    服务端发送PINGRESP报文响应客户端的PINGREQ报文,表示服务端还活着

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PINGRESP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
    variableHeader=, 
    payload=
    ]

     

    5.断开连接(DISCONNECT)客户端主动断开连接

    DISCONNECT报文是客户端发给服务端的最后一个控制报文,表示客户端正常断开连接,而服务端不需要返回消息了,处理业务逻辑便可。

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=DISCONNECT, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=0], 
    variableHeader=, 
    payload=
    ]

    6.发布和订阅,根据(QoS等级)

    • 发布消息(PUBLISH): PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息 
    • 发布确认(PUBACK): PUBACK报文是对QoS 1等级的PUBLISH报文的响应
    • 发布收到(PUBREC): PUBREC报文是对QoS等级2的PUBLISH报文的响应,它是QoS 2等级协议交换的第二个报文 
    • 发布释放(PUBREL): PUBREL报文是对PUBREC报文的响应,它是QoS 2等级协议交换的第三个报文
    • 发布完成(PUBCOMP): PUBCOMP报文是对PUBREL报文的响应,它是QoS 2等级协议交换的第四个也是最后一个报文

     

    1).QoS0-至多一次,最多一次

    客户端->服务端  PUBLISH  服务端无需向客户端发送确认消息,这就是最多一次消息,参考报文

    MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=25], 
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=-1], 
    payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 27, widx: 27, cap: 496))
    ]

    其中playload的数据可以用下面代码获取

    		MqttPublishMessage mqttPublishMessage = (MqttPublishMessage) mqttMessage;
    		MqttFixedHeader mqttFixedHeaderInfo = mqttPublishMessage.fixedHeader();
    		MqttQoS qos = (MqttQoS) mqttFixedHeaderInfo.qosLevel();
            byte[] headBytes = new byte[mqttPublishMessage.payload().readableBytes()];
            mqttPublishMessage.payload().readBytes(headBytes);
            String data = new String(headBytes);

    2).QoS1-至少一次,服务器下发确认消息

    客户端->服务端 PUBLISH 参考报文

    MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=26], 
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/get, packetId=4], 
    payload=PooledSlicedByteBuf(ridx: 0, widx: 8, cap: 8/8, unwrapped: PooledUnsafeDirectByteBuf(ridx: 28, widx: 28, cap: 480))]

    服务端->客户端 PUBACK 参考报文

    MqttPubAckMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBACK, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttMessageIdVariableHeader[messageId=4], 
    payload=]

    3).QoS2-刚好一次(共四个报文)

    客户端->服务端 PUBLISH 第一个报文

    MqttPublishMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBLISH, isDup=false, qosLevel=EXACTLY_ONCE, isRetain=false, remainingLength=28], 
    variableHeader=MqttPublishVariableHeader[topicName=test/netty/post, packetId=5], 
    payload=PooledSlicedByteBuf(ridx: 0, widx: 9, cap: 9/9, unwrapped: PooledUnsafeDirectByteBuf(ridx: 30, widx: 30, cap: 496))]

    服务端->客户端 PUBREC 第二个报文

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBREC, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttMessageIdVariableHeader[messageId=5], 
    payload=]

    客户端->服务端 PUBREL 第三个报文

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBREL, isDup=false, qosLevel=AT_LEAST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttMessageIdVariableHeader[messageId=5], 
    payload=]

    服务端->客户端 PUBCOMP 第四个报文

    MqttMessage[
    fixedHeader=MqttFixedHeader[messageType=PUBCOMP, isDup=false, qosLevel=AT_MOST_ONCE, isRetain=false, remainingLength=2], 
    variableHeader=MqttMessageIdVariableHeader[messageId=5], 
    payload=]

    这仅仅只是一个DEMO,不涉及任何业务,主要参考了(https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html)MQTT中文版协议

    展开全文
  • web单体式应用,Netty应用,Mqtt应用的代码总结 模块说明(/ src目录) phynos └── phynos - front -- 通讯前置机 ├── phynos - front - mqtt -- mqtt前置机 ├── phynos - front - raw -- netty前置...
  • MQTT服务器搭建--Apollo

    千次阅读 2017-04-06 14:41:29
    MQTT服务器搭建--Apollo 1.Apollo下载 下载地址:http://activemq.apache.org/apollo/download.html 直接下载apache-apollo-1.7.1-windows的安装包:...

    MQTT服务器搭建--Apollo

    1.Apollo下载

    下载地址:http://activemq.apache.org/apollo/download.html

    直接下载apache-apollo-1.7.1-windows的安装包:http://apache.fayea.com/activemq/activemq-apollo/1.7.1/apache-apollo-1.7.1-windows-distro.zip


    2.Apollo安装

    将下载的压缩包解压后:


    在命令窗口进入apache-apollo-1.7.1的目录里面,


    运行bin\apollo.cmd


    创建apollo的服务器应用实例


    创建服务器实例的文件夹


    服务器实例的配置信息


    进入服务器实例目录


    4.服务器实例运行


    在服务列表找到服务项


    启动服务后,在浏览器上面输入:https://127.0.0.1:61681/http://127.0.0.1:61680/


    使用默认的用户名和密码登录(admin;password)


    见到上图说明apollo运行成功了!也可以去日志目录看一下apollo日志信息


    展开全文
  • mqtt服务器搭建以及mqtt客户端测试工具安装使用mqtt 服务器选择安装mqtt 发布、订阅介绍mqtt 客户端下载mqtt客户端使用案例 mqtt 服务器选择安装 mqtt 服务器选择 emq , 版本:xxx mqtt 发布、订阅介绍 发布、订阅 ...
  • 搭建服务器工具:activeMQ微信截图_20190527083335.png解压完成后,选择一个目标文件夹,然后调用命令D:\apache-activemq-5.15.9\bin\activemq.bat create mybroker然后进入到文件夹mybrokercd mybroker运行mqtt...
  • 做者使用的是docker版elipse-mosquitto(mqtt),没有使用docker版的mqtt配置与本文相同,进入正题:**请确保你的ca证书所在目录与做者相同!!!!**做者证书制做及所在目录请看自制CA证书,自制客户端,服务端证书1....
  • Springboot+Netty搭建基于TCP协议的服务端(一)

    万次阅读 多人点赞 2019-06-02 19:37:20
    Netty是业界最流行的nio框架之一,它具有功能强大、性能优异、可定制性和可扩展性的优点 Netty的优点: 1.API使用简单,开发入门门槛低。 2.功能十分强大,预置多种编码解码功能,支持多种主流协议。 3.可定制、...
  • 为了满足消息推送的需求和增强推送系统的性能,采用Netty网络编程框架并搭建消息推送服务器集群,使用TCP链接发送心跳包,以保持和维护连接状态进行消息推送。通过性能测试,结果表明服务器集群可以分散链接压力,...
  • MQTT】Java SSM开发MQTT,一篇就够了:服务器搭建+SSM框架容器+web端mqtt.js+arduino ESP8266开发接入 文章目录【MQTT】Java SSM开发MQTT,一篇就够了:服务器搭建+SSM框架容器+web端mqtt.js+arduino ESP8266开发...
  • netty网关,支持百万客户端连接,压力测试ing…,并优化了与服务端集群通信,以往轮询往多个服务器发消息,看似消息发送很平均,其实大大影响了效率,本次对平均算法做了优化,本次上传代码添加了很多功能,摒弃了...
  • 在上一篇文章《【原创】MQTT客户端搭建-最清晰的MQTT协议架构》中提到了MQTT.fx工具作为客户端使用时,使用了该工具官方提供的服务器m2m.eclipse.org: 1883,现在准备在本地主机搭建一个MQTT服务器。   在GitHub...
  • netty 物联网iot中mqtt 服务器端开发,主要是使用技术springboot+mqtt3.1.1+netty
  • 本博文记录自己的一个完整的无线终端应用平台的搭建过程,持续更新! 使用到的技术:NettyMQTT,JAVA 使用到的硬件:Lora网关,无线烟感设备 文章目录1、技术讲解与难点1.1 Netty1.2 MQTT2、起步 1、技术讲解与...
  • 现在我有一个项目需求, 老大叫我把dtu数据采集仪,通过串口,来发送数据,我们需要展示数据,代表我们需要开发能接受这玩意的服务器,这台设备有许多种方式来接入:tcp/mqtt 我今天选择是mqtt,这玩意借个图来表达我...
  • camel与activemq搭建netty4框架

    千次阅读 2017-07-27 16:52:30
    笔者现在参与的项目是有设备或安全保往服务器发送数据,APP端/WEB端请求服务器的数据,笔者感觉这样转发起来太麻烦,累,所以就尝试一下用camel和activemq搭建netty4,别忘了,你还要搭起activemq服务器,当然了,...
  • SpringBoot整合Mqtt

    万次阅读 2020-08-27 17:49:34
    订阅发布,服务器可以使用Emq,官方网址是https://www.emqx.io/ (实在不想在搭建Emq服务器了太求没意思了。故此通过springboot整合Mqtt来倒推Emq的功能cuiyaonan2000@163.com。) 参考资料: ...
  • MQTT协议详解及开发教程(二)MQTT服务器EMQx搭建 MQTT协议详解及开发教程(三)MQTT Client工具软件选择及简单测试 MQTT协议详解及开发教程(四)MQTT协议报文格式 MQTT协议详解及开发教程(五)CONNECT/CONNACK...
  • MQTT 大消息失败原因排查

    千次阅读 2019-09-13 12:57:27
    小组内使用 MQTT 协议搭建了一个聊天服务器,前天在测大消息(超过5000汉字)时,连接直接变得不可用,后续发送的消息全部都收不到回复。 服务器环境: Netty :4.1.32.Final 使用的是 Netty 包中自带的 MqttDecoder...
  • Netty浅析

    2019-10-14 14:49:48
    Netty浅析 - 1. 基础 https://www.jianshu.com/p/5e8e9d458c5c 前言 在了解一个事物之前,最好能对它的基本属性和相关概念有个基本的认知,所以学习Netty之前,也有必要了解与Netty相关的基础概念知识;本篇将对...
  • 1.什么是MQTT 2.MQTT协议实现方式 一.什么是MQTT 什么是快乐星球。。不对。。什么是MQTT MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式...
  • 基于MQTT+WCF 实现Web 接口转即时服务

    千次阅读 2015-11-03 20:19:03
    -MQTT Client -MQTT Broker -WCF(Base Winform) -WebControler (Base Asp.net MVC) ...通过使用MQTT Broker的快速搭建,开发即时服务器应用 SocketAPP1,Android/IOS 其他应用使用MQTT连接服务器,只处理
  • 基于Netty的物联网应用

    万次阅读 热门讨论 2018-05-22 20:38:41
    本项目是基于局域网和即将到来的5G为信息载体,以终端节点(EndNodes)、网关(Gateway)、云服务器(LoRaWAN Server)和客户端(Client)组成。用于监测温室,大棚等局部环境变化。做到实时监控,提前预防。 先让我们一起...
  • JetLinks物联网基础平台-通过第三方MQTT服务接入设备

    千次阅读 多人点赞 2020-03-17 16:43:00
    在某些场景,设备不是直接接入平台,而是通过第三方MQTT服务,如:emqtt. 消息编解码与MQTT服务一样,从消息协议中使用DefaultTransport.MQTT来获取消息编解码器. 本文使用mqtt.fx为设备端,通过emqtt接入平台。 创建...
  • 主要给大家介绍了关于Spring Boot集成netty实现客户端服务端交互的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧
  • 3.1NETTY自定义协议的TCP服务器 3.1.1使用原因 为什么要使用自定义的协议呢,原因有三: ①常规的物联网系统是连接到大公司搭好的平台上,如使用mqtt连接到中国移动的onenet平台,但缺点就是公司可以掌握你的所有...
  • netty实现paho(一)

    千次阅读 2017-07-27 19:21:25
    netty实现paho(一)前言在我们实际的项目中,使用了开源MQTT协议的实现mosquitto,使用mosquitto作为server,负责消息订阅,消息推送,长连接管理和检测等功能,同时使用了paho作为客户端进行对接,paho也是一款...
  • Springboot整合mybatisPlus+mysql+druid+swaggerUI+ mqtt 整合mqtt 整合druid 整合mybatis-plus 完整pom 完整yml 整合swaggerUi 整合log4j MQTT 物联网系统基本架构 本物联网系列 mqtt) 整合mqtt org.spring...

空空如也

空空如也

1 2 3 4 5 ... 13
收藏数 252
精华内容 100
关键字:

netty搭建mqtt服务器