精华内容
下载资源
问答
  • mqtt异步通信的简单复现,采用官网的示例,官网例子:发布端代码 订阅端 二、准备工作 本地的ubuntu系统,云端的centos系统,均安装了paho.mqtt.c以及mosquitto,如何安装,网上资料多,这里不再复述 安装paho....

    一、问题重述

    mqtt异步通信的简单复现,采用官网的示例,官网例子:发布端代码   订阅端

    二、准备工作

    本地的ubuntu系统,云端的centos系统,均安装了paho.mqtt.c 以及mosquitto,如何安装,网上资料多,这里不再复述

    安装paho.mqtt.c后,在/usr/local/lib目录下有相应的库

     异步通信,要使用的是paho-mqtt3a.so库

    这里记录一下启动mosquitto服务器的命令:mosquitto -c /etc/mosquitto/mosquitto.conf -d

    三、编译测试

    编译发布端和订阅端代码,编译命令:gcc -o s   subscribe.c    -L /usr/local/lib   -lpaho-mqtt3a   

    编译成功后,先运行订阅端代码,再运行发布端代码

    代码中需要修改的地方,主要是mqtt服务器地址,其余地方可改可不改。

     

     

     

     

     

     

    展开全文
  • 上一篇文章简单提及了以异步函数和同步函数对比,由于异步函数是非阻塞的,所以性能上比同步函数要稍好些,所以也就常用异步函数来实现MQTT的通讯。 异步函数与同步函数两者的差别就是在连接服务器的connect函数、...

    1、前言

    上一篇文章简单提及了以异步函数和同步函数对比,由于异步函数是非阻塞的,所以性能上比同步函数要稍好些,所以也就常用异步函数来实现MQTT的通讯。

    异步函数与同步函数两者的差别就是在连接服务器的connect函数、loop循环函数。那接下来就简单探究一下loop函数的调用方式:同步函数是调用mosquitto_loop函数来阻塞等待实现的一种通信;而查看源码我们就会发现,异步方式的"loop"函数就是创建了一个线程去完成同步方式中导致阻塞等待的mosquitto_loop函数,其调用过程如下:

    mosquitto_loop_start(mosq);		// 异步方式的loop
    	pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)
    		mosquitto_loop_forever(mosq, 1000*86400, 1);
    			mosquitto_loop(mosq, timeout, max_packets);		// 同步方式的loop
    
    mosquitto_loop_stop(mosq, false);
    	pthread_cancel(mosq->thread_id);
    	pthread_join(mosq->thread_id, NULL);
    

    了解同步异步函数的调用区别之后,我们继续看一下异步方式连接服务器函数mosquitto_connect_async的官方说明:

    Connect to an MQTT broker. This is a non-blocking call. If you use mosquitto_connect_async your client must use the threaded interface mosquitto_loop_start. If you need to use mosquitto_loop, you must use mosquitto_connect to connect the client.
    May be called before or after mosquitto_loop_start.

    现在我们可以根据connect函数与loop函数的匹配关系编写异步方式的通讯程序:


    2、订阅端

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "mosquitto.h"
    
    
    /* 设置打印开关 */
    #define DEBUG_PROCESS   printf
    #define DEBUG_ERROR     printf
    #define DEBUG_MSG       printf
    
    
    // 定义运行标志决定是否需要结束
    static int g_iRunFlag = 1;
    
    
    /* 回调函数里添加自己的额外操作即可,mosquitto_xx函数已实现底层操作 */
    void on_connect(struct mosquitto *mosq, void *obj, int rc);
    void on_disconnect(struct mosquitto *mosq, void *obj, int rc);
    void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos);
    void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg);
    
    
    int main()
    {
            int ret;
            struct mosquitto *mosq;
    
            // 初始化mosquitto库
            ret = mosquitto_lib_init();
            if(ret){
                    DEBUG_ERROR("Init lib error!\n");
                    return -1;
            }
    
            // 创建一个实例
            // 参数:id(不需要则为NULL)、clean_start、用户数据
            mosq =  mosquitto_new("Rookie_sub", true, NULL);
            if(mosq == NULL){
                    DEBUG_ERROR("New error!\n");
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
    
            // 设置连接的用户名与密码:,
            // 参数:句柄、用户名、密码
            ret = mosquitto_username_pw_set(mosq, "user_test", "123456");
            if(ret){
                    DEBUG_ERROR("Set username and password error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            // 设置回调函数
            // 参数:句柄、回调函数
            mosquitto_connect_callback_set(mosq, on_connect);
            mosquitto_disconnect_callback_set(mosq, on_disconnect);
            mosquitto_subscribe_callback_set(mosq, on_subscribe);
            mosquitto_message_callback_set(mosq, on_message);
    
            // 连接至服务器
            // 参数:句柄、ip(host)、端口、心跳
            // ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60);
            ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60);
            if(ret){
                    DEBUG_ERROR("Connect server error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            ret = mosquitto_loop_start(mosq);
            if(ret){
                    DEBUG_ERROR("Start loop error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            // 开始通信:循环执行、直到运行标志g_iRunFlag被改变
            DEBUG_PROCESS("Start!\n");
            while(g_iRunFlag)
            {
                    //mosquitto_loop(mosq, -1, 1);
                    sleep(1);
            }
    
            // 结束后的清理工作
            mosquitto_loop_stop(mosq, false);
            mosquitto_destroy(mosq);
            mosquitto_lib_cleanup();
            DEBUG_PROCESS("End!\n");
    
            return 0;
    }
    
    
    void on_connect(struct mosquitto *mosq, void *obj, int rc)
    {
            DEBUG_PROCESS("Call the function: on_connect\n");
    
            if(rc){
                    // 连接错误,退出程序
                    DEBUG_ERROR("on_connect error!\n");
                    exit(1);
            }else{
                    // 订阅主题
                    // 参数:句柄、id、订阅的主题、qos
                    if(mosquitto_subscribe(mosq, NULL, "test/+", 2)){
                            DEBUG_ERROR("Set the topic error!\n");
                            exit(1);
                    }
            }
    }
    
    void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
    {
            DEBUG_PROCESS("Call the function: on_disconnect\n");
    
            g_iRunFlag = 0;
    }
    
    void on_subscribe(struct mosquitto *mosq, void *obj, int mid, int qos_count, const int *granted_qos)
    {
            DEBUG_PROCESS("Call the function: on_subscribe\n");
    }
    
    void on_message(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
    {
            DEBUG_PROCESS("Call the function: on_message\n");
            DEBUG_MSG("Recieve a message: %s\n", (char *)msg->payload);
    
            if(0 == strcmp(msg->payload, "quit")){
                    mosquitto_disconnect(mosq);
            }
    }
    

    3、发布端

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "mosquitto.h"
    
    #define DEBUG_PROCESS   printf
    #define DEBUG_ERROR     printf
    #define DEBUG_MSG       printf
    
    static int g_iRunFlag = 1;
    
    void on_connect(struct mosquitto *mosq, void *obj, int rc);
    void on_disconnect(struct mosquitto *mosq, void *obj, int rc);
    void on_publish(struct mosquitto *mosq, void *obj, int mid);
    
    
    int main()
    {
            int ret;
            struct mosquitto *mosq;
    
            ret = mosquitto_lib_init();
            if(ret){
                    DEBUG_ERROR("Init lib error!\n");
                    return -1;
            }
    
            mosq =  mosquitto_new("Rookie_pub", true, NULL);
            if(mosq == NULL){
                    DEBUG_ERROR("New error!\n");
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            // 设置MQTT遗言信息
            // 参数:句柄、主题、消息长度、消息内容、qos、是否保留
            ret = mosquitto_will_set(mosq, "test/will", strlen("pub_will_message"), "pub_will_message", 2, false);
            if(ret){
                    DEBUG_ERROR("Set will message error!\n");
                    return -1;
            }
    
            ret = mosquitto_username_pw_set(mosq, "user_test", "123456");
            if(ret){
                    DEBUG_ERROR("Set username and password error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            mosquitto_connect_callback_set(mosq, on_connect);
            mosquitto_disconnect_callback_set(mosq, on_disconnect);
            mosquitto_publish_callback_set(mosq, on_publish);
    
            ret = mosquitto_loop_start(mosq);
            if(ret){
                    DEBUG_ERROR("Start loop error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            // ret = mosquitto_connect(mosq, "127.0.0.1", 2020, 60);
            ret = mosquitto_connect_async(mosq, "127.0.0.1", 2020, 60);
            if(ret){
                    DEBUG_ERROR("Connect server error!\n");
                    mosquitto_destroy(mosq);
                    mosquitto_lib_cleanup();
                    return -1;
            }
    
            DEBUG_PROCESS("Start!\n");
            while(g_iRunFlag)
            {
                    //mosquitto_loop(mosq, -1, 1);
                    sleep(1);
            }
    
            mosquitto_loop_stop(mosq, false);
            mosquitto_destroy(mosq);
            mosquitto_lib_cleanup();
            DEBUG_PROCESS("End!\n");
    
            return 0;
    }
    
    
    void on_connect(struct mosquitto *mosq, void *obj, int rc)
    {
            DEBUG_PROCESS("Call the function: on_connect\n");
    
            if(rc){
                    DEBUG_ERROR("on_connect error!\n");
                    exit(1);
            } else {
                            // 参数:句柄、id、主题、消息长度、消息、Qos、是否保留
                    if(mosquitto_publish(mosq, NULL, "test/common", strlen("hello"), "hello", 2, false)){
                            DEBUG_ERROR("Set the topic error!\n");
                            exit(1);
                    }
            }
    }
    
    void on_disconnect(struct mosquitto *mosq, void *obj, int rc)
    {
            DEBUG_PROCESS("Call the function: on_disconnect\n");
    
            g_iRunFlag = 0;
    }
    
    void on_publish(struct mosquitto *mosq, void *obj, int mid)
    {
            DEBUG_PROCESS("Call the function: on_publish\n");
    
            sleep(2);
    
            mosquitto_disconnect(mosq);
    }
    
    展开全文
  • LinuxMQTT环境搭建

    万次阅读 多人点赞 2018-05-01 21:55:44
    linux上搭建mqtt服务器并不难,主要就是用到了mosquitto这款消息代理服务软件其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中我的linux版本为centos6.7_x861、安装软件输入以下...

    在linux上搭建mqtt服务器并不难,主要就是用到了mosquitto这款消息代理服务软件

    其采用发布/订阅模式传输机制,轻量、简单、开放并易于实现,被广泛应用于物联网之中



    我的linux版本为centos6.7_x86


    1、安装软件

    输入以下指令,挨个安装:

    yum install gcc-c++
    yum install cmake
    yum install openssl-devel

    新建个software文件夹,下载mosquitto,下个不高不低的版本,并解压:

    mkdir software
    cd software
    wget http://mosquitto.org/files/source/mosquitto-1.4.10.tar.gz
    tar -xzvf mosquitto-1.4.10.tar.gz

    但这里还不能编译安装mosquitto

    下面的三款扩展性软件,不安装也不影响mosquitto的使用:

    安装c-areas(支持异步DNS查找的库):

    wget http://c-ares.haxx.se/download/c-ares-1.10.0.tar.gz
    tar xvf c-ares-1.10.0.tar.gz
    cd c-ares-1.10.0
    ./configure
    make
    sudo make install

    安装lib-uuid(支持为每个连接客户端生成唯一uuid)

    yum install libuuid-devel

    安装libwebsockets(支持需使用websocket的应用):

    wget https://github.com/warmcat/libwebsockets/archive/v1.3-chrome37-firefox30.tar.gz
    tar zxvf v1.3-chrome37-firefox30.tar.gz
    cd libwebsockets-1.3-chrome37-firefox30
    mkdir build
    cd build
    cmake .. -DLIB_SUFFIX=64
    make install

    上面的有可能没安装成功,尤其是第三个,不过并无大碍,只不过没有相应的功能


    我们修改一下mosquitto的配置:

    cd mosquitto-1.4.10
    vim config.mk
    将里面的WITH_SRV:=yes和WITH_UUID:=yes都用#号注释掉


    接下来编译安装mosquitto:

    make
    sudo make install

    注意:如果在后续使用过程中找不到libmosquitto.so.1的话,在software下输入以下指令修改一下libmosquitto.so的位置:

    sudo ln -s /usr/local/lib/libmosquitto.so.1 /usr/lib/libmosquitto.so.1
    sudo ldconfig


    2、启动测试

    创建用户:

    sudo groupadd mosquitto
    sudo useradd -g mosquitto mosquitto

    这里要是出现什么存在不存在的问题,一定是你copy错了

    程序配置:

    mv /etc/mosquitto/mosquitto.conf.example /etc/mosquitto/mosquitto.conf

    启动程序:

    mosquitto -c /etc/mosquitto/mosquitto.conf -d

    默认端口为1883


    最后我们再打开一个服务器窗口,在一个(订阅)窗口输入:

    mosquitto_sub -t hello
    另一个(发布)窗口输入:
    mosquitto_pub -t hello -h localhost -m "hello world!"

    程序截图:




    这样,我们就成功订阅了主题为hello的消息了

    此过程我在树莓派上也搭建过,需自己多添加几个sudo便可以成功操作

    如果不行请从make那一步继续无误地操作一遍

    展开全文
  • 异步方式就是先设置对应的结构体,之后就去启动对应的操作,这些操作后台执行完成之后就会调用相结构体里面的成功或失败函数,我们就可以在这些被调用的函数里面执行我们下一步的操作。 2、订阅端 程序一开始就根据...

    1、前言

    异步函数的好处就是以非阻塞的方式去运行,但它相比于同步函数的结构就要稍微复杂一些。异步方式就是先设置对应的结构体,之后就去启动对应的操作,这些操作后台执行完成之后就会调用相结构体里面的成功或失败函数,我们就可以在这些被调用的函数里面执行我们下一步的操作。


    2、订阅端

    程序一开始就根据我们定义的宏去创建一个实例和设置回调函数,接着定义一个“连接”的结构体填充用户密码、函数指针,这个结构体中将文件句柄作为context传进去供onConnectSuccess等函数去获取,因为连接成功的函数里面需要文件句柄去订阅主题。订阅主题之后服务器端就会根据主题发送匹配的消息,消息到来之后就会调用上面设置的回调函数msgarrvd,在该函数里面就可以获得消息内容去执行相应的操作。

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "MQTTAsync.h"
    
    
    #define ADDRESS     "tcp://127.0.0.1:2020"
    #define CLIENTID    "Rookie_sub"
    #define TOPIC       "test/+"
    #define USERNAME	"user_test"
    #define PASSWD		"123456"
    #define QOS         2
    #define KEEPALIVE	20
    
    int g_iRunFlag = 1;
    
    
    void onConnectSuccess(void* context, MQTTAsync_successData* response);
    void onConnectFailure(void* context, MQTTAsync_failureData* response);
    void onSubscribeSuccess(void* context, MQTTAsync_successData* response);
    void onSubscribeFailure(void* context, MQTTAsync_failureData* response);
    int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message);
    void onDisconnectSuccess(void* context, MQTTAsync_successData* response);
    void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
    
    
    int main(int argc, char* argv[])
    {
    	MQTTAsync client;
    	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    	int rc;
    
    	// 创建NQTT实例
    	if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
    			!= MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to create client, return %d\n", rc);
    		MQTTAsync_destroy(&client);
    		return -1;
    	}
    
    
    	// 设置回调函数,参数:句柄、传入内容、连接丢失的回调函数、消息到来的回调函数、传输完毕的回调函数
    	if ((rc = MQTTAsync_setCallbacks(client, &client, NULL, msgarrvd, NULL)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to set callbacks, return %d\n", rc);
    		MQTTAsync_destroy(&client);
    		return -1;
    	}
    	
    
    	// 配置连接参数
    	conn_opts.context = client;		// 提供内容给onConnectxxx回调函数使用
    	conn_opts.username = USERNAME;
    	conn_opts.password = PASSWD;
    	conn_opts.keepAliveInterval = KEEPALIVE;
    	conn_opts.cleansession = 1;
    	conn_opts.automaticReconnect = 1;
    	conn_opts.onSuccess = onConnectSuccess;
    	conn_opts.onFailure = onConnectFailure;
    	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to start connect, return %d\n", rc);
    		MQTTAsync_destroy(&client);
    		return -1;
    	}
    
    	while(g_iRunFlag)
    	{
    		// do something
    		sleep(2);
    	}
    
    	// 销毁MQTT客户端
    	MQTTAsync_destroy(&client);
    	return 0;
    }
    
    
    void onConnectSuccess(void* context, MQTTAsync_successData* response)
    {
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_responseOptions resp_opts = MQTTAsync_responseOptions_initializer;
    	int rc;
    
    	printf("Connect succeeded!\n");
    
    	resp_opts.onSuccess = onSubscribeSuccess;
    	resp_opts.onFailure = onSubscribeFailure;
    	if ((rc = MQTTAsync_subscribe(client, TOPIC, QOS, &resp_opts)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to start subscribe, return %d\n", rc);
    		exit(-1);
    	}
    }
    
    void onConnectFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf("Connect failed, return %d\n", response->code);
    	exit(-1);
    }
    
    void onSubscribeSuccess(void* context, MQTTAsync_successData* response)
    {
    	printf("Subscribe succeeded!\n");
    }
    
    void onSubscribeFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf("Subscribe failed, return %d\n", response->code);
    	exit(-1);
    }
    
    
    int msgarrvd(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
    {
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    	int rc;
    	
        printf("Recieve a message: (topic)%s, (msg)%s\n", topicName, (char*)message->payload);
    	
    	if(0 == strcmp(message->payload, "quit"))
    	{
    		disc_opts.onSuccess = onDisconnectSuccess;
    		disc_opts.onFailure = onDisconnectFailure;
    		if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    		{
    			printf("Failed to start disconnect, return %d\n", rc);
    			exit(-1);
    		}
    	}
    
        MQTTAsync_freeMessage(&message);
        MQTTAsync_free(topicName);
    
        return 1;
    }
    
    void onDisconnectSuccess(void* context, MQTTAsync_successData* response)
    {
    	printf("Disconnect succeeded!\n");
    	g_iRunFlag = 0;
    }
    
    void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf("Disconnect failed, rc %d\n", response->code);
    	exit(-1);
    }
    

    3、发布端

    发布端的程序结构也类似于订阅端,既然都是异步方式,都是先定义结构体填充函数指针再启动相应的操作去后台执行。程序中,创建MQTT实例连接服务器之后发布一条消息就断开连接。

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include "MQTTAsync.h"
    
    
    #define ADDRESS     	"tcp://127.0.0.1:2020"
    #define CLIENTID    	"Rookie_pub"
    #define USERNAME		"user_test"
    #define PASSWD			"123456"
    #define TOPIC       	"test/common"
    #define PAYLOAD     	"hello"
    #define WILL_TOPIC		"test/will"
    #define WILL_PAYLOAD	"pub_will_message"
    #define QOS         	2
    #define KEEPALIVE		20
    
    int g_iRunFlag = 1;
    
    void onConnectSuccess(void* context, MQTTAsync_successData* response);
    void onConnectFailure(void* context, MQTTAsync_failureData* response);
    void onSendSuccess(void* context, MQTTAsync_successData* response);
    void onSendFailure(void* context, MQTTAsync_failureData* response);
    void onDisconnectSuccess(void* context, MQTTAsync_successData* response);
    void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
    
    
    int main(int argc, char* argv[])
    {
    	MQTTAsync client;
    	MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
    	MQTTAsync_willOptions will_opts = MQTTAsync_willOptions_initializer;
    	
    	int rc;
    
    	// 创建NQTT实例
    	if ((rc = MQTTAsync_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL))
    			!= MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to create client, return %d\n", rc);
    		MQTTAsync_destroy(&client);
    		return -1;
    	}
    
    
    	// 配置连接参数
    	conn_opts.context = client;
    	conn_opts.username = USERNAME;
    	conn_opts.password = PASSWD;
    	conn_opts.keepAliveInterval = KEEPALIVE;
    	conn_opts.cleansession = 1;
    	conn_opts.onSuccess = onConnectSuccess;
    	conn_opts.onFailure = onConnectFailure;
    	conn_opts.will = &will_opts;
    	will_opts.topicName = WILL_TOPIC;
    	will_opts.payload.data = WILL_PAYLOAD;
    	will_opts.payload.len = (int)strlen(WILL_PAYLOAD);
    	if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to start connect, return %d\n", rc);
    		MQTTAsync_destroy(&client);
    		return -1;
    	}
    
    
    	while(g_iRunFlag)
    	{
    		// do something
    		sleep(2);
    	}
    
    	MQTTAsync_destroy(&client);
    
    	return 0;
    }
    
    
    void onConnectSuccess(void* context, MQTTAsync_successData* response)
    {
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_message mesg_opts = MQTTAsync_message_initializer;
    	MQTTAsync_responseOptions resp_opts = MQTTAsync_responseOptions_initializer;
    
    	int rc;
    
    	printf("Successful connection\n");
    
    	mesg_opts.payload = PAYLOAD;
    	mesg_opts.payloadlen = (int)strlen(PAYLOAD);
    	mesg_opts.qos = QOS;
    	mesg_opts.retained = 0;
    	resp_opts.context = client;
    	resp_opts.onSuccess = onSendSuccess;
    	resp_opts.onFailure = onSendFailure;
    	if ((rc = MQTTAsync_sendMessage(client, TOPIC, &mesg_opts, &resp_opts)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to start sendMessage, return %d\n", rc);
    		exit(-1);
    	}
    }
    
    void onConnectFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf("Connect failed, return %d\n", response->code);
    	exit(-1);
    }
    
    void onSendSuccess(void* context, MQTTAsync_successData* response)
    {
    	MQTTAsync client = (MQTTAsync)context;
    	MQTTAsync_disconnectOptions disc_opts = MQTTAsync_disconnectOptions_initializer;
    	int rc;
    
    	printf("%d messages have been published\n", response->token);
    	sleep(2);
    
    	disc_opts.onSuccess = onDisconnectSuccess;
    	disc_opts.onFailure = onDisconnectFailure;
    	if ((rc = MQTTAsync_disconnect(client, &disc_opts)) != MQTTASYNC_SUCCESS)
    	{
    		printf("Failed to start disconnect, return %d\n", rc);
    		exit(-1);
    	}
    }
    
    void onSendFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf(" Send message failed, return %d\n", response->code);
    	exit(-1);
    }
    
    void onDisconnectSuccess(void* context, MQTTAsync_successData* response)
    {
    	printf("Disconnect succeeded\n");
    	g_iRunFlag = 0;
    }
    
    void onDisconnectFailure(void* context, MQTTAsync_failureData* response)
    {
    	printf("Disconnect failed, return %d\n", response->code);
    	exit(-1);
    }
    

    4、编译

    与同步函数的编译方法类似,差别就在于所依赖的动态库,异步函数使用的是libpaho-mqtt3a.so。

    gcc 002mysub.c -o 002mysub -I/work/system/paho.mqtt.c-master/src/ -lpaho-mqtt3a -L/work/system/paho.mqtt.c-master/build/output/
    gcc 002mypub.c -o 002mypub -I/work/system/paho.mqtt.c-master/src/ -lpaho-mqtt3a -L/work/system/paho.mqtt.c-master/build/output/
    

    5、配置运行环境

    与同步函数的运行环境一样,都是需要先配置动态库的路径,否则无法执行。

    export LD_LIBRARY_PATH=/work/system/paho.mqtt.c-master/build/output
    
    展开全文
  • linuxmqtt库的编译

    千次阅读 2019-04-27 19:00:38
    linuxmqtt库的编译编译环境eclipse/paho.mqtt.c 库下载编译1,使用make编译,创建动态库2,静态库编译,使用Cmake安装库到ubuntu库的简单测试 编译环境 ubuntu 1804 编译库时需要的环境: 这需要OpenSSL库:apt-...
  • 如果你的应用程序调用了MQTTClient_setCallbacks(),则客户端将会进入异步模式,否则会以同步模式运行。 同步模式下,客户端应用程序运行在单个线程上。信息的发布使用MQTTClient_publish()和MQTTClient_...
  • 1、安装软件 yum install gcc-c++ yum install cmake yum install openssl-devel 2、去usr/loca/目录下操作 mkdir software ...tar -xzvf mosquitto-1.4.10.tar.gz ...安装c-areas(支持异步DNS查找的库): wge
  • 文章目录1、编译 paho.mqtt.c1.1 步骤1.2 说明1.3 同步函数2、订阅端2.1 订阅端——MQTTClient_receive阻塞等待方式2.2 订阅端——MQTTClient_setCallbacks回调的异步方式3、发布端4、编译5、配置运行环境 ...
  • “弹性”异步非阻塞MQTT驱动程序。 一种使用便宜的ESP8266模块将MQTT带到缺少WiFi接口的MicroPython平台的方法。 1.“弹性”驱动程序 这是官方驱动程序的替代方法。 已在以下平台上进行了测试。 ESP8266 ESP32 ...
  • 前面两节讲了MQTT的简单介绍、mosquitto服务器端与客户端的两种搭建方式及其简单测试,但那些都是在命令行里面的操作,而我们需要写程序的时候总不能一直都是system函数来调用吧。 上一篇文章里面使用的就是...
  • paho-mqtt3a-异步(MQTTAsync) paho-mqtt3as-与SSL异步(MQTTAsync) paho-mqtt3c-“经典” /同步(MQTTClient) paho-mqtt3cs-“经典” /与SSL同步(MQTTClient) 用法和API 详细的API文档。 也可以通过在...
  • linux系统 二、同样建立文件和文件夹 这个不说了 三开始编程 #include </home/zzl/3rdlib/include/MQTTAsync.h> //换成你自己的安装路径,不然回报错 #include <stdio.h> #include <unistd.h> #...
  • libumqtt:基于libev的轻量级且完全异步MQTT客户端C库
  • Linux下学习用C语言实现MQTT(三)(异步函数) https://blog.csdn.net/qq_43260665/article/details/88541433 原文是执行一次发送一次消息,然后就断开了, 我把它改成了可以连续发送消息的mqtt实现: Talk is ...
  • MQTT学习

    2019-09-15 11:18:39
    1.Linux下学习用C语言实现...3.Linux下学习用C语言实现MQTT(三)(异步函数) 4.Linux下学习用C语言实现MQTT(四)(setCallback回调函数) 5.MQTT协议以及库函数说明 其他 1.不可重入函数,信号量,可重入函数 ...
  • mongoose:Mongoose嵌入式Web服务器库-具有TCPUDP,HTTP,WebSocket,MQTT内置协议,异步DNS解析器和非阻塞API的多协议嵌入式网络库
  • Linux下学习用C语言实现MQTT(一)(同步函数)

    千次阅读 热门讨论 2019-03-12 23:38:56
    先介绍一下MQTTMQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议,有可能成为物联网的重要组成部分。该协议支持所有平台,几乎可以把所有联网物品和外部连接起来,被...
  • 我是用的secureCRT登录的树莓派,要实现MQTT通信,就需要用到许多关于MQTT的函数,这里我用的是Paho.c库,所以首先下载库: 在git下下载paho C库git clone https://github.com/eclipse/paho.mqtt.c.git cd paho.mqtt...
  • 文章目录mqttclient设计与实现方式设计思想API接口MQTT客户端的核心结构mqttclient实现申请一个mqtt客户端...整体采用分层式设计,代码实现采用异步设计方式,降低耦合。 消息的处理使用回调的方式处理:用户指定订
  • 介绍一下常用的MQTT的C函数。MQTT系列函数有很多,在paho-mqtt.c库中的src路径下有大量函数的定义和声明,这里只介绍比较常用的函数解耦提和结构体: 1.结构体MQTTClient 定义:typedef void* MQTTClient; 含义:...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,341
精华内容 536
关键字:

linuxmqtt异步

linux 订阅