精华内容
下载资源
问答
  • 2020-10-02 08:47:28

    Retained Message:

    保留消息定义

      如果PUBLISH消息的RETAIN标记位被设置为1,则称该消息为“保留消息”;
      Broker会存储每个Topic的最后一条保留消息及其Qos,当订阅该Topic的客户端上线后,Broker需要将该消息投递给它。

    A retained message is a normal MQTT message with the retained flag set to true. The broker will store the last retained message and the corresponding QoS for that topic Each client that subscribes to a topic pattern, which matches the topic of the retained message, will receive the message immediately after subscribing. For each topic only one retained message will be stored by the broker.

    保留消息作用

      可以让新订阅的客户端得到发布方的最新的状态值,而不必要等待发送。

    A retained message makes sense, when newly connected subscribers should receive messages immediately and shouldn’t have to wait until a publishing client sends the next message. This is extremely helpful when for status updates of components or devices on individual topics. For example the status of device1 is on the topic myhome/devices/device1/status, a new subscriber to the topic will get the status (online/offline) of the device immediately after subscribing when retained messages are used. The same is true for clients, which send data in intervals, temperature, GPS coordinates and other data. Without retained messages new subscribers are kept in the dark between publish intervals. So using retained messages helps to provide the last good value to a connecting client immediately.

    保留消息的删除

    1. 方式1:发送空消息体的保留消息;
    2. 方式2:发送最新的保留消息覆盖之前的(推荐);

    让我们来看一下这个场景:

    你有一个温度传感器,它每三个小时向一个 Topic 发布当前的温度。那么问题来了,有一个新的订阅者在它刚刚发布了当前温度之后订阅了这个主题,那么这个订阅端什么时候能才能收到温度消息?

    对的,它必须等到三个小时以后,温度传感器再次发布消息的时候才能收到。在这之前,这个新的订阅者对传感器的温度数据一无所知。

    怎么来解决这个问题呢?

    这个时候就轮到 Retained 消息出场解决这个问题了。Retained 消息是指在 PUBLISH 数据包中 Retain 标识设为 1 的消息,Broker 收到这样的 PUBLISH 包以后,将保存这个消息,当有一个新的订阅者订阅相应主题的时候,Broker 会马上将这个消息发送给订阅者。

    Retain 消息有以下一些特点:

    • 一个 Topic 只能有 1 条 Retained 消息,发布新的 Retained 消息将覆盖老的 Retained 消息;
    • 如果订阅者使用通配符订阅主题,它会收到所有匹配的主题上的 Retained 消息;
    • 只有新的订阅者才会收到 Retained 消息,如果订阅者重复订阅一个主题,也会被当做新的订阅者,然后收到 Retained 消息;
    • Retained 消息发送到订阅者时,消息的 Retain 标识仍然是 1,订阅者可以判断这个消息是否是 Retained 消息,以做相应的处理。

    注意:Retained 消息和持久性会话没有任何关系,Retained 消息是 Broker 为每一个 Topic 单独存储的,而持久性会话是 Broker 为每一个 Client 单独存储的。

    如果你想删除一个 Retained 消息也很简单,只要向这个主题发布一个 Payload 长度为 0 的 Retained 消息就可以了。

    那么开头我们提到的那个场景的解决方案就很简单了,温度传感器每 3 个小时发布当前的温度的 Retained 消息,那么无论新的订阅者什么时候进行订阅,它都能收到温度传感器上一次发布的数据。

    8.2 代码实践:发布和接收 Retained 消息

    在这里我们编写一个发布 Retained 消息的发布者,和一个接受消息的订阅端,订阅端在接收消息的时候将消息的 Retain 标识和内容打印出来。

    完整的代码 publish_retained.js 如下:

    var mqtt = require('mqtt')
    var client = mqtt.connect('mqtt://iot.eclipse.org', {
        clientId: "mqtt_sample_publisher_1",
        clean: false
    })
    
    client.on('connect', function (connack) {
        if(connack.returnCode == 0){
            client.publish("home/2ndfloor/201/temperature", JSON.stringify({current: 25}), {qos: 0, retain: 1}, function (err) {
                if(err == undefined) {
                    console.log("Publish finished")
                    client.end()
                }else{
                    console.log("Publish failed")
                }
            })
        }else{
            console.log(`Connection failed: ${connack.returnCode}`)
        }
    
    })

    完整的代码 subscribe_retained.js 如下:

    var mqtt = require('mqtt')
    var client = mqtt.connect('mqtt://iot.eclipse.org', {
        clientId: "mqtt_sample_subscriber_id_chapter_8",
        clean: false
    })
    
    client.on('connect', function (connack) {
        if(connack.returnCode == 0) {
            if (connack.sessionPresent == false) {
                console.log("subscribing")
                client.subscribe("home/2ndfloor/201/temperature", {
                    qos: 0
                }, function (err, granted) {
                    if (err != undefined) {
                        console.log("subscribe failed")
                    } else {
                        console.log(`subscribe succeeded with ${granted[0].topic}, qos: ${granted[0].qos}`)
                    }
                })
            }
        }else {
            console.log(`Connection failed: ${connack.returnCode}`)
        }
    })
    
    
    client.on("message", function (_, message, packet) {
        var jsonPayload = JSON.parse(message.toString())
        console.log(`retained: ${packet.retain}, temperature: ${jsonPayload.current}`)
    })

    我们首先运行 node publish_retained.js,再运行 node subscribe_retained.js,会得到以下输出:

    retained: true, temperature: 25

    可见我们在 Publisher 发布之后再订阅主题也能收到 Retained 消息。

    然后我们再运行一次 node publish_retained.js,在运行 subscribe_retained.js 的终端会有以下输出:

    retained: false, temperature: 25

    Broker 收到 Retained 消息以后,只是保存这个消息,然后按照正常的转发逻辑转发给订阅者,所以对订阅者来说,这个是一个普通的 MQTT 消息,所以 Retain 标识为 0。

    然后 Ctrl+C 关闭掉 subscribe_retained.js,然后重新运行,终端不会有任何输出,可见 Retained 消息只对新订阅的订阅者有效。

    更多相关内容
  • EMQ-保留消息 概述和案例

    千次阅读 2021-03-04 11:48:29
    服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外,保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被...

    简介

    服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息除了被正常转发以外,保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被替换

    当客户端建立订阅时,如果服务端存在主题匹配的保留消息,则这些保留消息将被立即发送给该客户端。借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下非常重要的。

    EMQ X 默认开启保留消息的功能,可以在 etc/emqx.conf 中修改 mqtt.retain_available为 false 以禁用保留消息功能。如果 EMQ X 在保留消息功能被禁用的情况下依然收到了保留消息,那么将返回原因码为 0x9A(不支持保留消息)的 DISCONNECT 报文。

    配置

    EMQ X 的保留消息功能是由 emqx_retainer 插件实现该插件默认开启,通过修改 emqx_retainer 插件的配置,可以调整 EMQ X 储存保留消息的位置,限制接收保留消息数量和 Payload 最大长度,以及调整保留消息的过期时间。

    emqx_retainer 插件默认开启,插件的配置路径为 etc/plugins/emqx_retainer.conf

    配置项类型可取值默认值说明
    retainer.storage_typeenumramdiscdisc_onlyramram:仅储存在内存中;
    disc:储存在内存和硬盘中;
    disc_only:仅储存在硬盘中。
    retainer.max_retained_messagesinteger>= 00保留消息的最大数量,0 表示没有限制。保留消息数量超出最大值限制后,可以替换已存在的保留消息,但不能为新的主题储存保留消息。
    retainer.max_payload_sizebytesize 1MB保留消息的最大 Payload 值。Payload 大小超出最大值后 EMQ X 消息服务器会把收到的保留消息作为普通消息处理。
    retainer.expiry_intervalduration 保留消息的过期时间,0 表示永不过期。如果 PUBLISH 报文中设置了消息过期间隔,那么以 PUBLISH 报文中的消息过期间隔为准。


    普通消息 与 保留消息 对比 案例

    普通消息 未订阅前


    普通消息 订阅后,看右侧的消息窗口,没有收到订阅,没变化



    保留消息 未订阅前


    保留消息 订阅后,注意看右侧的消息窗口,有收到最后发的一条保留消息

     

    展开全文
  • 前言 发布订阅模式虽然让消息的发布者与订阅者充分解耦,但也出现了一个隐含的问题,即订阅者无法主动向发布者请求消息,订阅者何时收到消息完全依赖于...因此 MQTT 引入了保留消息保留消息 当服务端收到 Ret...

    前言

    发布订阅模式虽然让消息的发布者与订阅者充分解耦,但也出现了一个隐含的问题,即订阅者无法主动向发布者请求消息,订阅者何时收到消息完全依赖于发布者何时发布消息,这在某些场景中就产生了不便。例如,某设备定期发布自身 GPS 坐标,但对于订阅者而言,从它发起订阅到第一次收到数据可能需要几秒钟,也可能需要十几分钟甚至更多,这样并不友好。因此 MQTT 引入了保留消息。

    保留消息

    在这里插入图片描述

    当服务端收到 Retain 标志为 1 的 PUBLISH 报文时,它将进行以下操作:

    1. 如果存在匹配此主题名的订阅者,则按正常逻辑进行转发,并在转发前清除 Retain 标志。MQTT v3.1.1 协议中 Retain 标志必须被清除,而 MQTT v5.0 协议则在订阅选项中新增了一个 Retain As Publish 字段,由客户端自行指示服务端在转发前是否需要清除 Retain 标志。
    2. 如果 Payload 非空,存储此应用消息,如果此主题名下已经存在保留消息则进行替换。如果 Payload 为空,服务端不会存储此应用消息,同时清除此主题名下已经存在的保留消息。

    而每当有订阅者建立订阅时,服务端就会查找是否存在匹配该订阅的保留消息,如果保留消息存在,就会立即转发给订阅者。当保留消息在这种情况下被转发给订阅者时,它的 Retain 标志必须保持为 1。相比 MQTT v3.1.1,MQTT v5.0 对于订阅建立时是否发送保留消息做了更细致的划分,并在订阅选项中提供了 Retain Handling 字段。例如某些客户端可能仅希望在首次订阅时接收保留消息,又或者不希望在订阅建立时接收保留消息,都可以通过 Retain Handling 选项调整。

    保留消息虽然存储在服务端中,但它并不属于会话的一部分。也就是说,即便发布这个保留消息的会话终结,保留消息也不会被删除。删除保留消息只有两种方式:

    1. 前文已经提到过的,客户端往某个主题发送一个 Payload 为空的保留消息,服务端就会删除这个主题下的保留消息。
    2. 消息过期间隔属性在保留消息中同样适用,如果客户端设置了这一属性,那么保留消息在服务端存储超过过期时间后就会被删除。

    借助保留消息,新的订阅者能够立即获取最近的状态,而不需要等待无法预期的时间,这在很多场景下很非常重要的。

    展开全文
  • 【如何构建商业级别聊天系统】 MQTT 篇(四)MQTT 特性之 持久会话、保留消息、遗嘱 本篇将介绍 MQTT 的一些我们应该关注的特性 关注不迷路!! 我是 dying 搁浅 神秘地址 1. 持久会话 为什么需要持久会话? 为了...

    【如何构建商业级别聊天系统】 MQTT 篇(四)MQTT 特性之 持久会话、保留消息、遗嘱

    本篇将介绍 MQTT 的一些我们应该关注的特性
    关注不迷路!! 我是 dying 搁浅 神秘地址

    1. 持久会话

    为什么需要持久会话?

    为了接收 MQTT broker 的消息,客户端在连接 broker 时会创建其感兴趣主题的订阅。当客户端和代理的连接在非持久会话中断开时,这些主题将丢失,这意味着客户端在重新连接需要重新订阅,这对于资源受限的客户端来说是一笔很高的消耗。同时我们在大多数业务场景下都需要保存一个持久的会话来记录客户端的状态(如保存在 DB 中)。那么将会话的状态等信息保存到代理 broker 中就是一个很好的选择。在客户端与服务代理建立连接时根据唯一的标识来提供会话的信息(如登录凭证或者 clientId)

    持久会话存储什么?
    • session 信息,客户端凭证
    • 客户端所有订阅信息
    • 所有客户端未确认的 QoS 级别为 1 或者 2 的消息
    • 客户端在离线时所有错过的 QoS 级别为 1 或者 2 的消息
    • 所有从客户端收到,尚未完全确认的 QoS 2 的消息
    那么如何开始或者结束一个持久会话?

    客户端可以通过 cleanSession 进行标记,来告诉 broker 代理自己需要怎样的会话,在与代理建立连接时可以选择请求持久会话。

    cleanSession = true 非持久会话 如果客户端请求非持久会话,那么当客户端与代理断连时其前一个持久会话的所有排队消息都将丢失。

    cleanSession = false 持久会话 如果客户端请求持久会话,代理服务端将保存会话的所有信息

    最佳实战

    这里我们将会话分为: persistent session 持久会话clean session 清洁会话

    何时使用 持久会话

    • 客户端必须从某个主题获取所有消息,即使其离线,也想通过 broker 进行消息排队,以便重新连接后立即传送他们。
    • 客户端资源有限,希望通过代理存储其订阅消息,并快速恢复中断的通信。
    • 客户端需要在重新连接后恢复所有 qos 1 或者 2 的消息。

    何时使用 清洁会话

    • 客户端只需要发布消息,不需要订阅主题。客户端不希望 broker 存储消息或者重试 qos 1 或者 2 的传输。
    • 客户端不需要获取离线错过的消息。

    2. 保留消息

    为什么需要保留消息?

    对于 消息发布者 和 主题订阅者,双方对于相互的状态是无感知的,因为我们的 broker 代理 代理了这一切,那么消息发布者只能确保消息正确的发送到了 broker 代理,而 broker 代理 和 主题订阅者之间同样如此。而 消息发布者 并不确定何时向对应的主题发布消息,那在这期间,新订阅主题的客户端对该主题的状态一无所知。这个时候,保留消息就起到的它的作用。

    什么是保留消息?

    保留消息 是 一条普通的 MQTT 消息 其 retained flag 设置为 true,broker 代理为主题存储其最后一条保留消息以及其 Qos 级别。

    那么每个订阅匹配该主题的客户端在订阅后将立即收到该条 保留消息。代理仅存储每个主题的一条保留消息。

    订阅客户端 可以通过 retained flag 来识别该条消息是否为 保留消息,以便决定如何处理它。

    保留消息的作用

    保留消息可以帮助 新订阅的客户端 立即获得该主题的状态更新,其消除了等待 发布客户端 发布下一条消息的时间间隔。

    最佳实战

    啊,那么我们什么时候应该使用保留消息呢?当你希望新订阅的客户端立即获取主题消息时,保留消息是有意义的

    这对于单个组件主题的状态更新非常有用,例如 /my/temperature 获取温度状态,如果使用保留消息,新订阅者在订阅时将立即获取最后的温度状态。如果没有使用,那么在发布者发布下一条消息期间,新订阅者将处于黑暗状态

    3.遗嘱

    为什么需要遗嘱?

    MQTT 经常构建于不可靠的网络场景,由于连接丢失,电量耗尽,可能会发生异常断开的情况。了解客户端是 正常断开(MQTT DISCONNECT 消息) 还是 异常断开 有助于我们进行正确的响应,这里遗嘱消息就发挥了作用。

    LWT 最后的遗嘱 Last Will and Testament

    在 MQTT 中使用 LWT 最后的遗嘱来通知 其他客户端 异常断开的情况。每个客户端在连接到代理时,可以指定 LWT

    LWT 是一条普通的 MQTT 消息带有 主题、保留消息标志、QoS 和 payload。代理保存该消息直到客户端异常断开,此时代理会将 LWT 消息发送给每个订阅该主题的客户端。当客户端正常断开时,LWT 消息会被代理直接丢弃。

    如何指定 LWT?

    在客户端 CONNECT 时,客户端可以指定一条 LWT 消息

    在这里插入图片描述

    代理应该在何时发送 LWT 消息?

    根据 MQTT 规范,borker 代理 必须在以下情况分发 客户端 LWT 消息:

    • 代理检测到 I/O 错误或者网络故障
    • 客户端无法再定义的 Keep Alive 时间内进行通信
    • 客户端在关闭网络连接之前不会发送 DISCONNECT 数据包
    • 由于协议错误,代理关闭连接

    最佳实战

    最后的遗嘱 LWT 是通知订阅客户端,发布客户端异常断开的有效手段。其在生产上经常和 保留消息一起配合使用,以存储客户端在特定主题上的状态。

    举个例子:client1 首先向 broker 发送 CONNECT 数据包其中

    lastWillMessage 将 ”offline“ 作为 payload

    lastWillRetain 设置为 true

    lastWillTopic 设置为 client1/status

    连接之后 client1 再向主题发送一条 PUBLISH 消息,其 payload 为 ”online“ 并将其 retain flag 设置为 true。

    此时 只要 client1保持连接,那么所有新订阅主题的客户端都会收到一条 ”online“ 消息。

    如果 client1 异常断开,broker 会给所有订阅客户端发送 ”offline“ 的 LWT 消息,同时 LWT 消息会成为新的保留消息发送给新的订阅者。

    这种特定的保留消息模式,可以让其他客户端知道,client1 在特定主题上的连接状态。

    展开全文
  • 服务端收到 Retain 标志为 1 的 PUBLISH 报文时,会将该报文视为保留消息,除了被正常转发以外,保留消息会被存储在服务端,每个主题下只能存在一份保留消息,因此如果已经存在相同主题的保留消息,则该保留消息被...
  • BusCast:使用城市公交车灵活灵活地保留消息传递
  • #5之后,pub r=1的空消息,sub收到空消息,sub重新上线,不会再收到保留消息 #6之后,pub r=0,sub上线,不会收到消息 Clean Session Flag 订阅QoS=0,clean session=0,订阅topic1,连接之后,掉线...
  • MQTT入门(8)- 保留消息和最后遗嘱

    千次阅读 2018-01-05 14:59:19
    [b](1)保留消息Retained Messages[/b] MQTT中,无论是发布还是订阅都不会有任何触发事件。 1个Topic只有唯一的retain消息,Broker会保存每个Topic的最后一条retain消息。 每个Client订阅Topic后会立即读取...
  • Parameter for asynchronous messages – xiadapter.inbound.persistDuration.default (confi gured to default 1 day) xiadapter.outbound.persistDuration.default (confi gured to default 1 day) ...
  • 个topic的客户端将会收到该消息,这是正常流程,如果客户端A刚刚发布消息, 此时有一个新的客户端B订阅该topic,也就是“订阅”是在“发布”后,这个时候 客户端B将接收不到该消息。 Retain 功能就是为了解决这一...
  • 当客户端连接时将 Retained 为 true ,Broker 会存储每个 Topic 的最后一条保留消息及其 Qos,当订阅该 Topic 的客户端上线后,Broker 需要将该消息投递给它。 保留消息作用: 可以让新订阅的客户端得到发布方的...
  • 伴随天工物联网核心套件IoT Core在众多领域的广泛落地,百度智能云于近日为其带来重大能力升级——保留消息(Retain)功能,这也是国内公有云厂商中首个支持此特性的物联网服务。 随着物联网技术的广泛应用,数以...
  • 翻译:...Broker对保留消息的处理 Broker会存储每个Topic的最后一条保留消息及其Qos,当订阅该Topic的客户端上线后,Broker需要将该消息投递给它。
  • 最近在做IM(及时通信)相关的应用,长连接(消息收发)采用的Mqtt5, 由于mqtt支持会话保存(cleanStart)、会话超时(sessionExpire)、QOS等设置,可以实现移动端弱网络(或网络断开后再重连)的离线消息接收。 ...
  • Kafka消息保留策略

    千次阅读 2018-11-01 17:24:02
    Kafka Broker默认的消息保留策略是:要么保留一定时间,要么保留消息达到一定大小的字节数。 当消息达到设置的条件上限时,旧消息就会过期并被删除,所以,在任何时刻,可用消息的总量都不会超过配置参数所指定的...
  • 初识emqx消息服务器

    千次阅读 2019-04-16 18:15:17
    EMQ X R3.0 (Erlang/Enterprise/Elastic MQTT Broker) 是基于 Erlang/OTP 语言平台开发,支持大规模连接和分布式集群,发布订阅模式的开源 MQTT 消息服务器。 MQTT是什么 MQTT是一个由IBM主导开发的物联网传输协议,...
  • Kafka消息保留-清理策略

    千次阅读 2020-11-10 00:51:05
    Kafka Broker默认的消息保留策略是:要么保留一定时间,要么保留消息达到一定大小的字节数。 当消息达到设置的条件上限时,旧消息就会过期并被删除,所以,在任何时刻,可用消息的总量都不会超过配置参数所指定的...
  • Rabbitmq Docker容器重启后,MQ内的残留消息会丢失(K8s环境) 丢失消息的服务类型配置 修复配置为 原因
  • Kafka 消息保留机制

    千次阅读 2019-04-26 14:57:43
    Kafka通常根据时间决定数据可以保留多久。默认使用log.retention.hours参数配置时间,默认值是168小时,也就是一周。除此之外,还有其他两个参数,log.retention.minutes和log.retention.ms,这三个参数作用是一样的...
  • 默认情况: 立即删除每个订阅上已确认...保留策略 持久存储已经消耗并确认的消息消息不论消费和未消费,都至少保存X小时。 设置已确认消费消息最大保存时间 pulsar-admin namespaces set-retention tenant/namespa...
  • 我们在看文章的时候,大多看到的都是要怎么打开...1.关闭不保留活动这个其实很少有朋友用过这个功能,开启这个功能后可以提升手机的运行,而且还可以省电,但是它的功能就是不能进行后台的运行,也就是说当你从第...
  • 2.mqtt服务端发送时defaultRetained设置为true保留消息,false不保留消息,即便是重启mqtt服务器消息仍会保留 3.我将defaultRetained设置为false后,或者不设置,因为默认时false,发现再发送消息确实不会保留了,...
  • MQTT推送消息订阅端重复接收问题。 (背景)订阅端断开的时候,发布端联系...该值很多文章上只说了是 消息保留机制,若设置为true,mqtt服务器会保留每次发布的消息;较少提到 若订阅某主题的客户端重启,则会把...
  • Kafka消息保留机制 log.retention

    千次阅读 2021-06-02 19:32:41
    目录 背景 介绍 运行机制 参数 保留参数 log.retention.bytes log.retention.hours log.retention.minutes ...因为在windows环境运行,遭遇了文件重写异常,于是研究了一下kafka的消息保留...
  • 什么是分布式消息中间件? 消息中间件的作用是什么? 消息中间件的使用场景是什么? 消息中间件选型? 初识 Kafka Kafka知识树 1 Why Kafka 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你...
  • 分布式系统的现代消息传递

    千次阅读 2019-06-13 15:29:27
    论文《分布式系统的现代消息传递》Modern Messaging for Distributed Sytems L Magnoni 通过IOP出版有限公司出版许可物理学学报:会议系列,608卷,第1会议 作者电子邮件 luca.magnoni@cern.ch 作者隶属...
  • “消息队列”是在消息的传输过程中保存消息的容器。 “消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;...如果发送消息时接收者不可用,消息队列会保留消息,直到可以成功地传递它...
  • 数据集成-8-消息中间件

    千次阅读 2022-03-15 09:04:14
    数据集成-8-消息中间件

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 308,965
精华内容 123,586
关键字:

保留消息