精华内容
下载资源
问答
  • 包启Mqtt发布与订阅两个VS2010工程,实现了Mqtt的发布与订阅功能
  • linuxMQTT介绍和开发

    2021-06-02 16:40:38
    MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布MQTT最大优点在于,可以以...

    一、简述

     MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

     MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

    在这里插入图片描述

    二、linux下MQTT的交叉编译

    https://blog.csdn.net/u012478275/article/details/117472966

    三、设计规范

    由于物联网的环境是非常特别的,所以MQTT遵循以下设计原则:

    (1)精简,不添加可有可无的功能;

    (2)发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;

    (3)允许用户动态创建主题,零运维成本;

    (4)把传输量降到最低以提高传输效率;

    (5)把低带宽、高延迟、不稳定的网络等因素考虑在内;

    (6)支持连续的会话控制;

    (7)理解客户端计算能力可能很低;

    (8)提供服务质量管理;

    (9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。

    四、主要特性

        MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

    (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

     这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。

    (2)对负载内容屏蔽的消息传输。

    (3)使用TCP/IP提供网络连接。

     主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

    (4)有三种消息发布服务质量:

     “至多一次”,消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

     “至少一次”,确保消息到达,但消息重复可能会发生。

     “只有一次”,确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

    (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

     这就是为什么在介绍里说它非常适合“在物联网领域,传感器与服务器的通信,信息的收集”,要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

    (6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

    Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

    Testament:遗嘱机制,功能类似于Last Will。
     

    MQTT协议工作在低带宽、不可靠的网络的远程传感器和控制设备通讯而设计的协议,它具有以下主要的几项特性:

    • (1)使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。

      这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。

    • (2)对负载内容屏蔽的消息传输。

    • (3)使用TCP/IP提供网络连接。

      主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了。

    • (4)有三种消息发布服务质量:

      "至多一次",消息发布完全依赖底层TCP/IP网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。这一种方式主要普通APP的推送,倘若你的智能设备在消息推送时未联网,推送过去没收到,再次联网也就收不到了。

      "至少一次",确保消息到达,但消息重复可能会发生。

      "只有一次",确保消息到达一次。在一些要求比较严格的计费系统中,可以使用此级别。在计费系统中,消息重复或丢失会导致不正确的结果。这种最高质量的消息发布服务还可以用于即时通讯类的APP的推送,确保用户收到且只会收到一次。

    • (5)小型传输,开销很小(固定长度的头部是2字节),协议交换最小化,以降低网络流量。

      这就是为什么在介绍里说它非常适合"在物联网领域,传感器与服务器的通信,信息的收集",要知道嵌入式设备的运算能力和带宽都相对薄弱,使用这种协议来传递消息再适合不过了。

    • (6)使用Last Will和Testament特性通知有关各方客户端异常中断的机制。

      Last Will:即遗言机制,用于通知同一主题下的其他设备发送遗言的设备已经断开了连接。

      Testament:遗嘱机制,功能类似于Last Will。

    五、MQTT协议原理

    5.1 MQTT协议实现方式

        实现MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

    MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

    (1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

    (2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

    5.2 网络传输与应用消息

        MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。

    当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

    5.3 MQTT客户端

        一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

    (1)发布其他客户端可能会订阅的信息;

    (2)订阅其它客户端发布的消息;

    (3)退订或删除应用程序的消息;

    (4)断开与服务器连接。

    5.4 MQTT服务器

        MQTT服务器以称为“消息代理”(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

    (1)接受来自客户的网络连接;

    (2)接受客户发布的应用信息;

    (3)处理来自客户端的订阅和退订请求;

    (4)向订阅的客户转发应用程序消息。

    5.5 MQTT协议中的订阅、主题、会话

    一、订阅(Subscription)

     订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

    二、会话(Session)

     每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    三、主题名(Topic Name)

     连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

    四、主题筛选器(Topic Filter)

     一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

    五、负载(Payload)

     消息订阅者所具体接收的内容。

    5.6 MQTT协议中的方法

        MQTT协议中定义了一些方法(也被称为动作),来于表示对确定资源所进行操作。这个资源可以代表预先存在的数据或动态生成数据,这取决于服务器的实现。通常来说,资源指服务器上的文件或输出。主要方法有:

    (1)Connect。等待与服务器建立连接。

    (2)Disconnect。等待MQTT客户端完成所做的工作,并与服务器断开TCP/IP会话。

    (3)Subscribe。等待完成订阅。

    (4)UnSubscribe。等待服务器取消客户端的一个或多个topics订阅。

    (5)Publish。MQTT客户端发送消息请求,发送完成后返回应用程序线程。
     

    六、MQTT协议数据包结构

        在MQTT协议中,一个MQTT数据包由:固定头(Fixed header)、可变头(Variable header)、消息体(payload)三部分构成。MQTT数据包结构如下:

    (1)固定头(Fixed header)。存在于所有MQTT数据包中,表示数据包类型及数据包的分组类标识。

    (2)可变头(Variable header)。存在于部分MQTT数据包中,数据包类型决定了可变头是否存在及其具体内容。

    (3)消息体(Payload)。存在于部分MQTT数据包中,表示客户端收到的具体内容。

    6.1 MQTT固定头

        固定报头,所有的MQTT控制报文都包含,可变报头与有效载荷是部分MQTT控制报文包含。 固定报头占据两字节的空间,具体见:

    在这里插入图片描述

        固定报头的第一个字节分为控制报文的类型(4bit),以及控制报文类型的标志位,控制类型共有14种,其中0与15被系统保留出来,其他的类型具体见:

        固定报头的bit0-bit3为标志位,依照报文类型有不同的含义,事实上,除了PUBLISH类型报文以外,其他报文的标志位均为系统保留,PUBLISH报文的第一字节bit3是控制报文的重复分发标志(DUP),bit1-bit2是服务质量等级,bit0是PUBLISH报文的保留标志,用于标识PUBLISH是否保留,当客户端发送一个PUBLISH消息到服务器,如果保留标识位置1,那么服务器应该保留这条消息,当一个新的订阅者订阅这个主题的时候,最后保留的主题消息应被发送到新订阅的用户。

     固定报头的第二个字节开始是剩余长度字段,是用于记录剩余报文长度的,表示当前的消息剩余的字节数,包括可变报头和有效载荷区域(如果存在),但剩余长度不包括用于编码剩余长度字段本身的字节数。

     剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码,而对于更大的数值则按下面的方式处理:每个字节的低7位用于编码数据长度,最高位(bit7)用于标识剩余长度字段是否有更多的字节,且按照大端模式进行编码,因此每个字节可以编码128个数值和一个延续位,剩余长度字段最大可拥有4个字节。

    当剩余长度使用1个字节存储时,其取值范围为0(0x00)~127(0x7f)。

    当使用2个字节时,其取值范围为128(0x80,0x01)~16383(0Xff,0x7f)。

    当使用3个字节时,其取值范围为16384(0x80,0x80,0x01)~2097151(0xFF,0xFF,0x7F)。

    当使用4个字节时,其取值范围为2097152(0x80,0x80,0x80,0x01)~268435455(0xFF,0xFF,0xFF,0x7F)。

    总的来说,MQTT报文理论上可以发送最大256M的报文,当然,这种情况是非常少的。

    固定头存在于所有MQTT数据包中,其结构如下:

    6.1.1 MQTT数据包类型

    位置:Byte 1中bits 7-4。

    相于一个4位的无符号值,类型、取值及描述如下:

    6.1.2 标识位

    位置:Byte 1中bits 3-0。

     在不使用标识位的消息类型中,标识位被作为保留位。如果收到无效的标志时,接收端必须关闭网络连接:

    (1)DUP:发布消息的副本。用来在保证消息的可靠传输,如果设置为1,则在下面的变长中增加MessageId,并且需要回复确认,以保证消息传输完成,但不能用于检测消息重复发送。

    (2)QoS:发布消息的服务质量,即:保证消息传递的次数

    (3)RETAIN: 发布保留标识,表示服务器要保留这次推送的信息,如果有新的订阅者出现,就把这消息推送给它,如果设有那么推送至当前订阅者后释放。

    6.1.3 剩余长度(Remaining Length)

    地址:Byte 2。

     固定头的第二字节用来保存变长头部和消息体的总大小的,但不是直接保存的。这一字节是可以扩展,其保存机制,前7位用于保存长度,后一部用做标识。当最后一位为1时,表示长度不足,需要使用二个字节继续保存。例如:计算出后面的大小为0

    6.2 MQTT可变头

     MQTT数据包中包含一个可变头,它驻位于固定的头和负载之间。可变头的内容因数据包类型而不同,较常的应用是作为包的标识。

    只有某些报文才拥有可变报头,它在固定报头和有效负载之间,可变报头的内容会根据报文类型的不同而有所不同,但可变报头的报文标识符(Packet Identifier)字段存在于在多个类型的报文里,而有一些报文又没有报文标识符字段,具体见表格,报文标识符结构具体见图:

    6.3 Payload消息体

        Payload消息体位MQTT数据包的第三部分,包含CONNECT、SUBSCRIBE、SUBACK、UNSUBSCRIBE四种类型的消息:

    (1)CONNECT,消息体内容主要是:客户端的ClientID、订阅的Topic、Message以及用户名和密码。

    (2)SUBSCRIBE,消息体内容是一系列的要订阅的主题以及QoS。

    (3)SUBACK,消息体内容是服务器对于SUBSCRIBE所申请的主题及QoS进行确认和回复。

    (4)UNSUBSCRIBE,消息体内容是要订阅的主题。

    展开全文
  • linux mqtt客户端

    2019-07-10 14:09:52
    实现功能: ...(3)定时 PERIOD_TIME 发布 自身订阅的主题 信息,即循环PERIOD_TIME 发啥收啥。 说明: (1)主要根据 庆科的MiCO_A_v3.2.0/demos/net/mqtt_client 的 stm32 freeRTOS 移植到 li...

    实现功能:

    (1)定时30s发送心跳包;

    (2)接收 mqtt 数据包,解析函数是 user_recv_handle_cb;

    (3)定时  PERIOD_TIME 发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥。

    说明:

    (1)主要根据  庆科的MiCO_A_v3.2.0/demos/net/mqtt_client 的 stm32  freeRTOS  移植到 linux 平台。

    (2)实现方式:select、queue 、pthread。

    核心源码:

    /*************************************** 描述***********************
    作者: lee
    日期: 2019/7/2
    文件名:mqtt_client.c
    功能描述:
        1.定时30s发送心跳包
        2.接收 mqtt 数据包,解析函数是user_recv_handle_cb
        3.定时  PERIOD_TIME   发布 自身订阅的主题 信息,即循环 PERIOD_TIME 发啥收啥
    
    **********************************************************************/
    #include "./libraries/protocols/mqtt/MQTTClient.h"
    #include "/usr/local/include/uv.h"
    #include "pthread.h"
    #include "sys/select.h"
    #include "sys/queue.h"
    
    /*********************************
     *              Macros
     ***********************************************/
    #define app_log(M, ...) custom_log("APP", M, ##__VA_ARGS__)
    #define mqtt_log(M, ...) custom_log("MQTT", M, ##__VA_ARGS__)
    
    #define MQTT_CMD_TIMEOUT 5000 // 5s
    
    #define MAX_MQTT_TOPIC_SIZE  (256)
    #define MAX_MQTT_DATA_SIZE   (1024)
    
    #define MQTT_SERVER "127.0.0.1"
    //#define MQTT_SERVER "test.mosquitto.org"
    #define MQTT_SERVER_PORT 1883
    
    #define PERIOD_TIME   2000  // 2s
    
    /***********************************************
     *              Constants
     ***********************************************/
    #define MQTT_CLIENT_ID  "MiCO_MQTT_Client"
    #define MQTT_CLIENT_USERNAME NULL
    #define MQTT_CLIENT_PASSWORD NULL
    #define MQTT_CLIENT_KEEPALIVE 30
    #define MQTT_CLIENT_SUB_TOPIC "mico/test/send" // loop msg
    #define MQTT_CLIENT_PUB_TOPIC "mico/test/send"
    #define MQTT_YIELD_TMIE 5000 // 5s
    #define MQTT_CLIENT_PUB_MSG "mico_mqtt_client_test_data_1234567890"
    
    /***********************************************
     *              Structures
     ***********************************************/
    typedef struct {   
        char topic[MAX_MQTT_TOPIC_SIZE];
        char qos;
        char retained;
    
        uint8_t data[MAX_MQTT_DATA_SIZE];
        uint32_t datalen;
    } s_MQTT_Data_Packet_Info;
    
    struct node{
        STAILQ_ENTRY(node) next;
        void (*fp) (void*);  
        void *data;
    };
    
    /***********************************************
     *              Function Declarations
     ***********************************************/
    void user_send_cb(void* data);
    
    
    
    /***********************************************
     *              Variables Definitions
     ***********************************************/
     uv_req_t mqtt_client_recv_handle, mqtt_client_send_handle;
    
    volatile static bool no_mqtt_msg_exchange = true;
    
    Client c; // mqtt client object
    Network n; // socket network for mqtt client
    
    STAILQ_HEAD(head, node);
    struct head *lhead = 0;
    
    void user_recv_handle_cb(void* data){
        s_MQTT_Data_Packet_Info *p_recv_msg = (s_MQTT_Data_Packet_Info *)data;
    
        if (p_recv_msg)
        {
            实际工程中替换
            app_log("\t\t\t\t\tuser get data success! from_topic=[%s], msg=[%ld][%s].\r\n", p_recv_msg->topic, p_recv_msg->datalen, p_recv_msg->data);  
    
            free(p_recv_msg);
            p_recv_msg = NULL;
        }          
    }
    
    // call back, msg received from mqtt server
    static void messageArrived(MessageData* md){
        s_MQTT_Data_Packet_Info* p_recv_msg = NULL; 
    
        MQTTMessage* pMessage = md->message;
    
        p_recv_msg = (s_MQTT_Data_Packet_Info *)calloc(1, sizeof(s_MQTT_Data_Packet_Info));
        if (p_recv_msg == NULL)  
        {
            mqtt_log("malloc内存分配不足");
            return;
        }
        p_recv_msg->datalen = pMessage->payloadlen;
        p_recv_msg->qos = pMessage->qos;
        p_recv_msg->retained = pMessage->retained;
        strncpy(p_recv_msg->topic, md->topicName->lenstring.data, md->topicName->lenstring.len);
        memcpy(p_recv_msg->data, pMessage->payload, p_recv_msg->datalen + 1);  // lee:  !!!!!!!!!!!!!!!!!!!!   p_recv_msg->datalen + 1  不加1,会出现段错误 
    
        // mqtt_client_recv_handle.data = &p_recv_msg;
        // uv_queue_work(loop, &mqtt_client_recv_handle, user_recv_handle_cb, NULL);    //  lee !!!!!!!!!!!!!!!!!!!! 发现  libuv中的工作队列不能和select混用
        
        struct node process_func;
        process_func.data = p_recv_msg;
        process_func.fp = user_recv_handle_cb;
        STAILQ_INSERT_TAIL(lhead, &process_func, next); 
    
        p_recv_msg = NULL; 
    }
    
    static OSStatus mqtt_client_release(Client *c, Network *n){
        OSStatus err = kNoErr;
        
        if (c->isconnected) MQTTDisconnect(c);
    
        n->disconnect(n);// close connection
    
        if (MQTT_SUCCESS != MQTTClientDeinit(c)){
            app_log("MQTTClientDeinit failed!");
            err = kDeletedErr;
        }
    
        return err;
    }
    
    void* work_thread(void* arg){
        s_MQTT_Data_Packet_Info* p_send_msg = NULL;
    
        Timer period_timer;
        InitTimer(&period_timer);
        countdown_ms(&period_timer, PERIOD_TIME);
    
        while (1)
        {
            if (!STAILQ_EMPTY(lhead)){
                struct node* pfirst_node = STAILQ_FIRST(lhead);
                pfirst_node->fp(pfirst_node->data);
                STAILQ_REMOVE_HEAD(lhead, next);
            } 
            
            while (expired(&period_timer))
            {
                if (c.isconnected){
                    
                    p_send_msg = (s_MQTT_Data_Packet_Info*) calloc(1, sizeof(s_MQTT_Data_Packet_Info));                
                    if (p_send_msg == NULL) {
                        mqtt_log("没有内存可用");
                        continue;
                    }
                    
                    p_send_msg->qos = 0;
                    p_send_msg->retained = 0;
                    p_send_msg->datalen = strlen(MQTT_CLIENT_PUB_MSG);
                    memcpy(p_send_msg->data, MQTT_CLIENT_PUB_MSG, p_send_msg->datalen);
                    strncpy(p_send_msg->topic, MQTT_CLIENT_PUB_TOPIC, MAX_MQTT_TOPIC_SIZE);
    
                    struct node process_func;
                    process_func.data = p_send_msg;
                    process_func.fp = user_send_cb;
                    STAILQ_INSERT_TAIL(lhead, &process_func, next); 
                
                    p_send_msg = NULL;
                }
                else
                {
                    mqtt_log("MQTT client does not init ok");
                }
                
                countdown_ms(&period_timer, PERIOD_TIME);   
            } 
        }
        pthread_exit(NULL);   
    }
    
    
    void* mqtt_client_thread(void* arg){
        OSStatus err = kUnknownErr;
    
        int rc = -1;
        fd_set readfds;
        struct timeval t = { 0, MQTT_YIELD_TMIE * 1000};
    
        ssl_opts ssl_settings;
        MQTTPacket_connectData connectData = MQTTPacket_connectData_initializer;
    
        memset(&c, 0, sizeof(c));
        memset(&n, 0, sizeof(n));
    
    MQTT_start:
        // create network connection
        ssl_settings.ssl_enable = false;
    
        mqtt_log("enter into mqtt client thread.");
        while (1){
            rc = NewNetwork(&n, MQTT_SERVER, MQTT_SERVER_PORT, ssl_settings);
            if (rc == MQTT_SUCCESS) break;
            mqtt_log("ERROR: MQTT network connection err=%d, reconnect after 3s...", rc);
            sleep(3);
        }
    
        mqtt_log("MQTT network connection success!");
    
        // 2.init mqtt client
        rc = MQTTClientInit(&c, &n, MQTT_CMD_TIMEOUT);
        require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client init err.");
    
        mqtt_log("MQTT client init success!");
    
        // 3.create mqtt client connection
        connectData.willFlag = 0;
        connectData.MQTTVersion = 4; // 3: 3.1, 4: v3.1.1
        connectData.clientID.cstring = MQTT_CLIENT_ID;
        connectData.username.cstring = MQTT_CLIENT_USERNAME;
        connectData.password.cstring = MQTT_CLIENT_PASSWORD;
        connectData.keepAliveInterval = MQTT_CLIENT_KEEPALIVE;
        connectData.cleansession = 1;
    
        rc = MQTTConnect(&c, &connectData);
        require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client connect err.");
    
        mqtt_log("MQTT client connect success!");
    
        // 4.mqtt client subscribe
        rc = MQTTSubscribe(&c, MQTT_CLIENT_SUB_TOPIC, QOS0, messageArrived);
        require_noerr_string(rc, MQTT_reconnect, "ERROR: MQTT client subscribe err.");
    
        mqtt_log("MQTT client subscribe success! recv_topic=[%s].", MQTT_CLIENT_SUB_TOPIC);
    
        // 5. client loop for recv msg && keepalive
        while (1){
            no_mqtt_msg_exchange = true;
            FD_ZERO(&readfds);
            FD_SET(c.ipstack->my_socket, &readfds);
            select(c.ipstack->my_socket + 1, &readfds, NULL, NULL, &t);
            
            // recv msg from server
            if (FD_ISSET( c.ipstack->my_socket, &readfds)){
                rc = MQTTYield(&c, (int)MQTT_YIELD_TMIE);
                require_noerr(rc, MQTT_reconnect);
                no_mqtt_msg_exchange = false;
            }
        
            //if no msg exchange, we need to check ping msg to keep alive
            if (no_mqtt_msg_exchange){
                rc = keepalive(&c);
                    require_noerr_string(rc, MQTT_reconnect, "ERROR: keep alive err");
            }
                
        }  
    
    MQTT_reconnect:
        mqtt_log("Disconnect MQTT client, and reconnect after 5s, reason: mqtt_rc = %d, err = %d", rc, err);
        mqtt_client_release(&c, &n);
        sleep(5);
        goto MQTT_start;
    
    exit:
        mqtt_log("EXIT: MQTT client exit with err = %d.", err);
        mqtt_client_release(&c, &n);
        pthread_exit(NULL);
    }
    
    static OSStatus mqtt_msg_publish(Client *c, const char* topic, char qos, char retained,
                                    const unsigned char* msg,
                                    uint32_t msg_len){
        OSStatus err = kUnknownErr;
        int ret = 0;
        MQTTMessage publishData = MQTTMessage_publishData_initializer;
    
        require(topic && msg_len && msg, exit);
    
        //upload data qos0
        publishData.qos = qos;
        publishData.retained = retained;
        publishData.payload = (void*)msg;
        publishData.payloadlen = msg_len;
    
        ret = MQTTPublish(c, topic, &publishData);
    
        if (MQTT_SUCCESS == ret){
            err = kNoErr;
        }else{
            err = kUnknownErr;
        }
    
    exit:
        return err;
    }
    
    void user_send_cb(void* data){
        OSStatus err = kNoErr;
    
        s_MQTT_Data_Packet_Info* p_send_msg = (s_MQTT_Data_Packet_Info*)data;
        require_noerr_string((p_send_msg == NULL), exit, "没有内存可用");
       
        err = mqtt_msg_publish(&c, p_send_msg->topic, p_send_msg->qos, p_send_msg->retained,
                            p_send_msg->data,
                            p_send_msg->datalen);
        require_noerr_string(err, exit, "publish失败");
    
        mqtt_log("MQTT publish data success! send_topic=[%s], msg=[%ld][%s].\r\n", p_send_msg->topic, p_send_msg->datalen, p_send_msg->data);
    
        no_mqtt_msg_exchange = false; // 在当前情况下,多发一次或少发一次,无关紧要,无需用互斥锁
                  
    exit:
        if (p_send_msg != NULL){
            free(p_send_msg);  
            p_send_msg = NULL;
        }
    
    }
    
    
    int main(void){
        // void *rval;
        
        OSStatus err = kNoErr;
    
        lhead = (struct head*)malloc(sizeof(struct head));
        STAILQ_INIT(lhead);
        
        pthread_t mqtt_client_handle, work_thread_Handle/*, timer_thread_Handle*/;
        
        // 默认堆栈大小为8M, 嵌入式里太大,重新设置
        pthread_attr_t attr;
        err = pthread_attr_init(&attr);
        require_noerr_string(err, exit, "ERROR: Unable to init thread attr.");
    
        err = pthread_attr_setstacksize(&attr, 16384);// 堆栈大小不能小于16384Byte 
        require_noerr_string(err, exit, "ERROR: Unable to set thread size.");
    
        err = pthread_create(&mqtt_client_handle, &attr, mqtt_client_thread, NULL);
        require_noerr_string(err, exit, "ERROR: Unable to start the mqtt client thread.");
    
        err = pthread_create(&work_thread_Handle, &attr, work_thread, NULL);
        require_noerr_string(err, exit, "ERROR: Unable to start the work thread.");
    
        // err = pthread_create(&timer_thread_Handle, NULL, timer_thread, NULL);
        // require_noerr_string(err, exit, "ERROR: Unable to start the timer thread.");
        // loop = uv_default_loop();
        
        // uv_timer_t period_timer;
        // uv_timer_init(loop, &period_timer);
        // uv_timer_start(&period_timer, user_send_handler, 0, 2000);
    
        // err = uv_run(loop, UV_RUN_DEFAULT); 
        // require_noerr_string(err, exit, "ERROR: Unable to run uv loop.");
        // struct timerval interval;
        // struct itimerval timer;
    
        pthread_join(work_thread_Handle, NULL);
        pthread_join(mqtt_client_handle, NULL);   重点,当主线程没有其他可执行的循环时,一定要加此句
    
        pthread_attr_destroy(&attr);
    
    exit :
        if (err != kNoErr){
            app_log("ERROR: app thread exit err: %d", err);   
        }
    
        free(lhead);
        lhead  = NULL;
    
        return err;
    }

    整个工程源码:

    链接: https://pan.baidu.com/s/10w8a9X_7prtYyHsmMUj7Sw   

    提取码: 48aa 

    参考资料:

    linux c MQTT客户端实现

    https://www.jianshu.com/p/d309de966379

     

    展开全文
  • 这里写自定义目录标题mosquitto命令参数说明mosquitto_pub(发布)命令参数说明mosquitto_sub(订阅)命令参数说明重启或关闭命令简单的使用这些命令 mosquitto命令参数说明 -c 配置文件 从文件中加载配置参数 如果...

    mosquitto命令参数说明

    -c 配置文件
    从文件中加载配置参数 如果没有指定默认路径为/etc/mosquitto/mosquitto.conf

    -d 表示以后台守护进程的形式启动

    -p 指定监听端口 默认为1883

    -v 监控日志 类似于在配置文件中把log_type 设置为all

    mosquitto_pub(发布)命令参数说明

    1. -d 打印debug信息
    2. -f 将指定文件的内容作为发送消息的内容
    3. -h 指定要连接的域名 默认为localhost
    4. -i 指定要给哪个clientId的用户发送消息
    5. -I 指定给哪个clientId前缀的用户发送消息
    6. -m 消息内容
    7. -n 发送一个空(null)消息
    8. -p 连接端口号
    9. -q 指定QoS的值(0,1,2)
    10. -t 指定topic
    11. -u 指定broker访问用户
    12. -P 指定broker访问密码
    13. -V 指定MQTT协议版本
    14. –will-payload 指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与–will-topic一起使用
    15. –will-qos Will的QoS值。该参数需要与–will-topic一起使用
    16. –will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与–will-topic一起使用
    17. –will-topic 用户发送Will消息的topic

    mosquitto_sub(订阅)命令参数说明

    1. -c 设定‘clean session’为无效状态,这样一直保持订阅状态,即便是已经失去连接,如果再次连接仍旧能够接收的断开期间发送的消息。
    2. -d 打印debug信息
    3. -h 指定要连接的域名 默认为localhost
    4. -i 指定clientId
    5. -I 指定clientId前缀
    6. -k keepalive 每隔一段时间,发PING消息通知broker,仍处于连接状态。 默认为60秒。
    7. -q 指定希望接收到QoS为什么的消息 默认QoS为0
    8. -R 不显示陈旧的消息
    9. -t 订阅topic
    10. -v 打印消息
    11. –will-payload 指定一个消息,该消息当客户端与broker意外断开连接时发出。该参数需要与–will-topic一起使用
    12. –will-qos Will的QoS值。该参数需要与–will-topic一起使用
    13. –will-retain 指定Will消息被当做一个retain消息(即消息被广播后,该消息被保留起来)。该参数需要与–will-topic一起使用
    14. –will-topic 用户发送Will消息的topic

    重启或关闭命令

    ps -aux| grep mosquitto

    kill -9 xxx
    然后kill停止,然后重新启动

    有时会遇到这种情况:在这里插入图片描述
    然后就需要用到这个命令了。

    简单的使用这些命令

    mqtt的工作方式如下:
    在这里插入图片描述
    第一步:先要先要启动代理服务
    使用命令 mosquitto -v 或 mosquitto -c mosquitto.conf
    在这里插入图片描述

    第二步:再开启一个服务器窗口,sub一个主题
    使用命令:
    mosquitto_sub -h localhost -t test -d,加-d表示后台运行( localhost 和 127.0.0.1 代表本地IP的意思 )

    在这里插入图片描述
    在broker(代理)上也可以看到相应的信息
    在这里插入图片描述

    第三步:再开启一个服务器窗口,pub一个消息到刚才订阅的主题上
    使用命令:
    mosquitto_pub -h localhost -m “学习mqtt很快乐” -t test -d

    在这里插入图片描述

    对应在订阅者上可以看到刚才发布的消息
    在这里插入图片描述

    展开全文
  • linux c MQTT客户端实现

    千次阅读 2020-03-24 11:20:04
    mqtt协议是轻量级的消息订阅和发布(publish/subscribe)协议,建立在TCP/IP协议之上,在物联网应用中广泛使用。 二、源码下载: 链接:https://pan.baidu.com/s/1S1pT_ZZURg21DF5mIBg3pw 密码:tqgh 二使用说明: 1...

    一、前言:
    mqtt协议是轻量级的消息订阅和发布(publish/subscribe)协议,建立在TCP/IP协议之上,在物联网应用中广泛使用。

    二、源码下载:
    链接:https://pan.baidu.com/s/1S1pT_ZZURg21DF5mIBg3pw 密码:tqgh

    二使用说明:
    1.下载解压出来进入mqtt文件夹内容如图src.png所示:
    在这里插入图片描述
    2.将mqtt文件夹拷贝linux ununtu下面:
    在ununtu终端下根据如下步骤执行命令:
    1)make clean //清理项目
    2)vim mqtt.c //打开mqtt.c文件将如图:ip.png所示格式修改为自己的服务器ip地址、端口、用户名和密码:
    在这里插入图片描述
    ip.png

    wq保存退出;
    3)make //编译项目
    ./mqtt_demo//运行成功如图data.png所示(运行之前需要运行windows paho帮助测试,在本文下面有介绍)
    在这里插入图片描述
    data.png

    ①:订阅主题:2017/my/todev
    ②:发布主题:2017/my/toapp
    ③:接收到数据打印:asdfafs
    ④:ctrl + c//结束运行

    3.运行windows paho来测试mqtt_demo(没有安装可以安装一下连接进行安装:https://www.jianshu.com/p/48c36b72fec2):
    1)首先根据图:login.png所示:点击左上角绿色+号->点击选项->输入用户名和密码->点击MQTT返回主主界面
    在这里插入图片描述
    login.png

    2)图msg.png步骤解析如下:
    ①:输入服务器ip地址和端口号
    ②:点击连接
    ③:点击绿色小+号订阅主题
    ④:输入主题名字
    ⑤:把需要订阅的主题勾上
    ⑥:点击订阅
    ⑦:输入发布主题
    ⑧:输入发布的内容
    ⑨:点击发布
    ⑩:接收到订阅该主题发布出来的内容(linux 下面客户端程序发出来的)
    ⑪:自己发布出去的内容
    ⑫:linux 客户端掉线发布出来的遗嘱
    在这里插入图片描述

    展开全文
  • paho实现MQTTClient发布消息

    千次阅读 2020-10-25 12:09:44
    paho实现MQTTClient发布消息 接下来会用paho开源的一个项目,实现mqtt客户端发布消息,此文主要参考MQTT Client library for C,Paho给出的创建一个客户端有如下类似的步骤: 1、安装 //从github上下载项目 git ...
  • linux环境 MQTT测试与使用

    千次阅读 2019-06-27 10:06:59
    目录 1.在命令窗口测试 ...5.linux 环境 测试代码 1.在命令窗口测试 1)订阅端 mosquitto_sub -h localhost -t test -v 用 -h 参数指定服务器 IP ,用 -t 参数指定订阅的话题。( localhost 和 127.0....
  • MQTTMQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它...
  • 说明:工程分为两个。一个是Linux C语言编写的MQTT客户端,另一个是websocket编写的MQTT客户端,先运行Linux的,再运行websocket就出实验现象了。(发布的主要是温湿度数据、继电器控制状态、GPS定位系统等等)
  • 首次使用MQTT,入了很多坑,搞了一下午才配置完成,在网上找了很多教程,大多数都没有成功,经过提炼、摸索,终于实现了发送和接收,特记录次博客,以便更多人学习。有不对的,欢迎指教! 1、安装Centos下的必要软件...
  • LinuxMQTT环境搭建

    万次阅读 多人点赞 2018-05-01 21:55:44
    linux上搭建mqtt服务器并不难,主要就是用到了mosquitto这款消息代理服务软件其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中我的linux版本为centos6.7_x861、安装软件输入以下...
  • 基于请求响应的单对单同步通信协议 -- HTTP 已经难以满足大规模物联网设备的通信需求,因此需要为物联网通信设计一种新的消息传递机制,MQTT 实现的订阅-发布消息模型就可以满足物联网通信需求。什么是订阅-发布消息...
  • MQTT与Eclipse Paho 使用Java的简单发布者和订阅者。 它在给定主题上发布“静态”温度值,并收听该主题。
  • 嵌入式Linux硬件很多,在网上可以买到很多款,我采用了一款带4G和SDK开发环境的HJ8300硬件,采用MIPS处理,580Mhz的主频,128M内存,作为MQTT的开发已经足够。 HJ8300已经集成了GCC、GDB和LIB等编译调试工具,用SSH...
  • linux中安装mqtt服务(mosquitto): 以centOS8为例子: 一、准备工作: 1、查看系统的版本 cat /etc/redhat-release 2、查看防火墙的状态: 可参考陆详细内容:...
  • 安装mqtt服务实现mqtt通讯 ...最近在开发mqtt相关的硬件通讯,自己用虚拟机在linux环境搭了一个mqtt服务器,实现简单的通讯演示 一、MQTT通讯简介 1.结构模式图 mqtt通讯有服务端和客户端,服务端统一收集和分发信息。
  • libmosquitto客户端编程(发布消息,订阅消息) 在安装mosquitto成功后,可以在/usr/local/lib文件夹中有libmosquitto.so.1等文件,如下图所示:      7. mosquitto使用 进入命令行界面,用cd ...
  • 在节点上,它创建一个TUN接口,分配一个IP地址,并将所有数据包发布MQTT主题mqttip / [目标IP地址]。 同时,它订阅了MQTT主题mqttip / [自己的IP地址]。 这将建立一个完全连接的IP网络。 “切换”是通过MQTT代理...
  • LinuxMQTT安装及测试

    千次阅读 2019-10-24 23:06:15
    物联网老师上课的时候突然让用MQTT写一个通信程序,实现命令控制,他说的是客户端输入ls,那边就得返回目录信息,不是很懂,查了一大堆也不会,今天有点时间把第一步走了。 MQTT程序安装 千里之行,始于足下 这...
  • 简单说就是要往一款产品上用MQTT,而目前产品的开发板还没拿到,所以先在现有的HI3516DV300上移植一个MQTT并且用起来 所以真的是标题党 目标: <1>在windows主机上搭建MQTT服务器,用来调试 <2>移植...
  • 首先我们需要一个MQTT的服务器作为消息分发处理的核心,这里使用开源的EMQX---https://github.com/emqx/emqx EMQX使用指南:https://docs.emqx.io/broker/v3/cn/ 这个网址里面的是其官方的指导手册,内容非常齐全...
  • MQTT--linux安装部署

    千次阅读 2017-10-24 01:06:49
    "mqtt"为主题名,假如有客户端发布了主题为"mqtt"的消息,这个终端将会收到消息的内容。 mosquitto_sub -t mqtt 终端二:  这里就是发布一个主题为"mqtt",内容为"hello mqtt"的消息。 mosquitto_pub -h ...
  • LINUX--ubuntu移植MQTT并测试成功

    千次阅读 2019-03-26 09:44:45
    MQTT 协议的中心是 broker( 服务器/代理) ,客户端通过订阅消息和发布消息进行数据交互 ,如下图所示: 2,Mosquitto安装 (1)mosquitto下载 https://mosquitto.org/download/ 进入下载页面,选择source中...
  • linux配置mqtt支持websocket并配置用户名和密码记录一次老大叫我安装mqtt1、安装依赖包2、安装libwebsockets3、安装mosquitto4、增加webScokets支持5、创建用户名和密码、打开命令窗口 键入如下命令6、配置权限7、...
  • 前面我们讲到,MQTT协议提供了一种消息负载小的publish/subscribe模型,使得它很适合物联网(Internet of Things)通信,比如低功耗传感器或手机、嵌入式设备。 而Mosquitto是一个开源 的使用MQTT协议的消息代理软件...
  • MQTT.fx 是目前主流的mqtt客户端,可以快速验证是否可以与IoT Hub 服务交流发布或订阅消息。设备将当前所处的状态作为MQTT主题发送给IoT Hub,每个MQTT主题topic具有不同等级的名称,如“建筑/楼层/温度。” MQTT...
  • MQTT开源库mosquitto

    2018-05-22 14:00:37
    MQTT(MQ Telemetry Transport),消息队列遥测传输协议,轻量级的发布/订阅协议,适用于一些条件比较苛刻的环境,进行低带宽、不可靠或间歇性的通信。目前已经是物联网消息通信事实上的标准协议了。
  • mosquitto库实现发布与订阅一、下载安装mosquitto二、mosquitto发布与订阅三、mosquitto运行实例 一、下载安装mosquitto (1)下载地址:https://mosquitto.org/download/ Linux命令行输入 wget ...
  • 发布保留的消息 发布消息 订阅命名主题 清洁/不清洁的会议 手动和自动客户端ID生成 显示带有通配符订阅的主题名称 预定义的主题ID和简短的主题名称 根据MQTT-SN协议规范v1.2的转发器封装。 局限性 封包的长度必须...
  • MQTT(消息队列遥测传输)是ISO 标准(ISO/IEC PRF 20922)下基于发布/订阅范式的消息协议。它工作在 TCP/IP协议族上,是为硬件性能低下的远程设备以及网络状况糟糕的情况下而设计的发布/订阅型消息协议,为此,它需要一...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,979
精华内容 1,191
关键字:

linuxmqtt发布消息

linux 订阅