-
2021-06-11 00:33:26
关于Mqtt
Mqtt3.1.1中文文档
这是一个客户端服务端架构的发布/订阅模式的消息传输协议;轻巧、开放、简单、规范,易于实现,国内很多关于Mqtt的文章都是讲推送的,其实Mqtt远远不止这个功能,具体详情可自行文档查阅。
场景说明
在Android上,往往具体的业务逻辑要回到Activity中,而Mqtt连接则是建立在service中(考虑到内存资源及退出应用后续操作),同时Mqtt的消息透传回调也是在service中。
通常做法是要写很多类(通常是Boardcast)来实现确保service同Activity组件之间的通信,就像Eclipse paho的Android service一样。
改进方案
本文基于Eclipse的paho框架的java client端,在Android上引入EventBus来做事件分发,简化了代码量,同时方便实现。
以下是我写的一个简单的工具操作类:
public class MqttManager {
// 单例
private static MqttManager mInstance = null;
// 回调
private MqttCallback mCallback;
// Private instance variables
private MqttClient client;
private MqttConnectOptions conOpt;
private boolean clean = true;
private MqttManager() {
mCallback = new MqttCallbackBus();
}
public static MqttManager getInstance() {
if (null == mInstance) {
mInstance = new MqttManager();
}
return mInstance;
}
/** * 释放单例, 及其所引用的资源 */
public static void release() {
try {
if (mInstance != null) {
mInstance.disConnect();
mInstance = null;
}
} catch (Exception e) {
}
}
/**
* 创建Mqtt 连接
*
* @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
* @param userName 用户名
* @param password 密码
* @param clientId clientId
* @return
*/
public boolean creatConnect(String brokerUrl, String userName, String password, String clientId) {
boolean flag = false;
String tmpDir = System.getProperty("java.io.tmpdir");
MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
try {
// Construct the connection options object that contains connection parameters
// such as cleanSession and LWT
conOpt = new MqttConnectOptions();
conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
conOpt.setCleanSession(clean);
if (password != null) {
conOpt.setPassword(password.toCharArray());
}
if (userName != null) {
conOpt.setUserName(userName);
}
// Construct an MQTT blocking mode client
client = new MqttClient(brokerUrl, clientId, dataStore);
// Set this wrapper as the callback handler
client.setCallback(mCallback);
flag = doConnect();
} catch (MqttException e) {
Logger.e(e.getMessage());
}
return flag;
}
/**
* 建立连接
*
* @return
*/
public boolean doConnect() {
boolean flag = false;
if (client != null) {
try {
client.connect(conOpt);
Logger.d("Connected to " + client.getServerURI() + " with client ID " + client.getClientId());
flag = true;
} catch (Exception e) {
}
}
return flag;
}
/**
* Publish / send a message to an MQTT server
*
* @param topicName the name of the topic to publish to
* @param qos the quality of service to delivery the message at (0,1,2)
* @param payload the set of bytes to send to the MQTT server
* @return boolean
*/
public boolean publish(String topicName, int qos, byte[] payload) {
boolean flag = false;
if (client != null && client.isConnected()) {
Logger.d("Publishing to topic \"" + topicName + "\" qos " + qos);
// Create and configure a message
MqttMessage message = new MqttMessage(payload); message.setQos(qos);
// Send the message to the server, control is not returned until
// it has been delivered to the server meeting the specified
// quality of service.
try {
client.publish(topicName, message);
flag = true;
} catch (MqttException e) {
}
}
return flag;
}
/**
* Subscribe to a topic on an MQTT server
* Once subscribed this method waits for the messages to arrive from the server
* that match the subscription. It continues listening for messages until the enter key is
* pressed.
*
* @param topicName to subscribe to (can be wild carded)
* @param qos the maximum quality of service to receive messages at for this subscription
* @return boolean
*/
public boolean subscribe(String topicName, int qos) {
boolean flag = false;
if (client != null && client.isConnected()) {
// Subscribe to the requested topic
// The QoS specified is the maximum level that messages will be sent to the client at.
// For instance if QoS 1 is specified, any messages originally published at QoS 2 will
// be downgraded to 1 when delivering to the client but messages published at 1 and 0
// will be received at the same level they were published at.
Logger.d("Subscribing to topic \"" + topicName + "\" qos " + qos);
try {
client.subscribe(topicName, qos);
flag = true;
} catch (MqttException e) {
}
}
return flag;
}
/**
* 取消连接
*
* @throws MqttException
*/
public void disConnect() throws MqttException {
if (client != null && client.isConnected()) {
client.disconnect();
}
}
}
在Mqtt Callback方法中发送Evenet事件:
public class MqttCallbackBus implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
Logger.e(cause.getMessage());
}
@Override
public void messageArrived(String topic, MqttMessage message) {
Logger.d(topic + "====" + message.toString());
EventBus.getDefault().post(message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
}
问题
由于EventBus不支持跨进程通信,所以当在service是独立进程时,无法在主进程中接收到EventBus分发的事件。
可以考虑通过广播做中转,即在Mqtt的回调中发送一个广播,在广播中进行事件分发。
连接等操作是较费时的,为了避免ANR,应在子线程中完成操作,也可以考虑使用RxJava来简化。
Demo
更多相关内容 -
java MQTT server ,MQTT client 直接使用java实现,快速连接物联网
2021-09-11 16:07:01支持 MQTT client 客户端。 支持 MQTT server 服务端。 支持 MQTT 遗嘱消息。 支持 MQTT 保留消息。 支持自定义消息(mq)处理转发实现集群。 MQTT 客户端 阿里云 mqtt 连接 demo。 支持 GraalVM 编译成本机... -
RT-Thread之mqttclient软件包
2021-01-06 12:17:57关于mqttclient软件包 一个基于socket API之上的跨平台MQTT客户端 基于socket API的MQTT客户端,拥有非常简洁的API接口,以极少的资源实现QOS2的服务质量,并且无缝衔接了mbedtls加密库。此仓库是专门为RT-Thread做... -
MQTTClient
2020-09-21 10:31:07mqtt 客户端MQTT调试工具,用于MQTT协议联调联试,使用方便,画面简洁清晰,需要键入地址及端口号,ID/用户名及密码后连接,能够用来进行十六进制显示和ASCII显示,可以订阅MQTT topic! -
mqttclient依赖包
2018-05-15 14:20:17mqtt客户端测试需要依赖包:org.eclipse.paho.client.mqttv3-1.0.2.jar -
mqtt client 代码
2018-12-06 10:32:27简单实现了mqtt 发布端和订阅端的功能 mqtt_pub 发布端 mqtt_sub 订阅端 -
基于Android的MQTT客户端代码MQTTClient
2017-06-08 17:08:421、自动连接(重连)服务器,监测是否与服务器连接 2、能够接收(订阅)和发送(发布)消息 3、可以先看app下面的readme说明 -
mqttClient最新的源码
2017-11-23 16:32:17普遍的用于物联网行业的mqttClient源码,可以直接放到工程里编译 -
MQTTClient:具有 TLS 支持的 libmosquitto 的 Objective-C 包装器
2021-06-09 16:52:49MQTTClient 是 libmosquitto 的 Objective-C 包装器。 MQTTClient 支持身份验证和 TLS 加密。 MQTTClient 已经用 libmosquitto 1.4.1 进行了测试。 在此处下载最新的 libmosquitto: : 带有 TLS 的 MQTTClient 将 ... -
mqtt client测试工具
2022-03-18 14:20:46mqtt client测试工具 -
MqttClient.zip
2020-07-03 11:11:38ios mqttcleint 使用,连接,订阅,push, 取消订阅,博客地址: https://blog.csdn.net/dreams_deng/article/details/107100859 -
.net开发的mqttclient客户端工具
2021-09-13 11:31:26.net开发的mqttclient客户端工具 -
Kepware IOT gateway使用教程 - MQTT Client
2020-08-14 20:02:08Kepware中IOT gateway模块功能下MQTT具体使用方法,过程很详细,有详细的图形指引,对研究Kepware通过MQTT和其他Client数据交互很有价值,希望对初入IOT的朋友有很好的帮助 -
MQTT Client
2020-08-30 16:48:11开篇 最近在使用MQTTClient实现一个类似于消息推送的服务,说实话,真没怎么使用过MQTTClient,也不知道这是个啥? 上网了解了一下,...最近在使用MQTTClient实现一个类似于消息推送的服务,说实话,真没怎么使用过MQTTClient,也不知道这是个啥?
上网了解了一下,发现MQTT功能挺强(牛)大(逼),既能做即时通讯,又能搞消息推送等功能。
这里我使用的是消息推送服务,通过和服务器端协商,终于能够与服务器连接,并且能够收发消息了。 所以,简单总结了一下,有了这篇文章。
MQTT介绍
MQTT
MQTT基于订阅者模型架构,客户端如果互相通信,必须在同一订阅主题下,即都订阅了同一个topic,客户端之间是没办法直接通讯的。订阅模型显而易见的好处是群发消息的话只需要发布到topic,所有订阅了这个topic的客户端就可以接收到消息了。
发送消息必须发送到某个topic,重点说明的是不管客户端是否订阅了该topic都可以向topic发送了消息,还有如果客户端订阅了该主题,那么自己发送的消息也会接收到。MQTT特点
- 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP
- 对负载内容屏蔽的消息传输
使用TCP/IP提供网络连接。主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了 - 三种消息传输方式QoS:
- 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
- 1代表“至少一次”,确保消息到达,但消息重复可能会发生。
- 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。 (备注:由于服务端采用Mosca实现,Mosca目前只支持到QoS 1)
如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了(客户端登录的时候要指明支持的QoS级别,同时发送消息的时候也要指明这条消息支持的QoS级别),如果需要客户端保证能接收消息,需要指定QoS为1,如果同时需要加入客户端不在线也要能接收到消息,那么客户端登录的时候要指定session的有效性,接收离线消息需要指定服务端要保留客户端的session状态。
具体MQTT的详细介绍可以戳这里 https://baike.baidu.com/item/MQTT/3618851?fr=aladdin
MQTTClient的使用
iOS 环境下开发 MQTT 客户端程序,一般依赖稳定的第三方 FrameWork,由于涉及网络数据传输,建议选择
Object-c
原生的框架,比如MQTT-Client-Framework
。
现在一般常用的有两个MQTT
1) MQTTKit
2) MQTTClient
不过MQTTKit貌似很长时间不维护了, 使用较多的是MQTTClient。- 集成MQTTClient
MQTT-Client-Framework
- 用cocopod直接, pod ‘MQTTClient’
- GitHub下载,把相对应的文件夹拖进工程即可
MQTT-Client-FrameWork
包提供的客户端类有MQTTSession
和MQTTSessionManager
,建议使用后者维持静态资源,而且已经封装好自动重连等逻辑。初始化时需要传入相关的网络参数。我使用的是第二种, 引入
#import "MQTTClient.h"
#import "MQTTSessionManager.h"
头文件, 遵循MQTTSessionManagerDelegate
协议使用步骤:
- 建立连接
和服务器端确定好MQTT服务器地址,端口号, 用户名, 密码, 订阅主题topic/** host: 服务器地址 port: 服务器端口 tls: 是否使用tls协议,mosca是支持tls的,如果使用了要设置成true keepalive: 心跳时间,单位秒,每隔固定时间发送心跳包, 心跳间隔不得大于120s clean: session是否清除,这个需要注意,如果是false,代表保持登录,如果客户端离线了再次登录就可以接收到离线消息 auth: 是否使用登录验证 user: 用户名 pass: 密码 willTopic: 订阅主题 willMsg: 自定义的离线消息 willQos: 接收离线消息的级别 clientId: 客户端id,需要特别指出的是这个id需要全局唯一,因为服务端是根据这个来区分不同的客户端的,默认情况下一个id登录后,假如有另外的连接以这个id登录,上一个连接会被踢下线, 我使用的设备UUID */ NSString *clientId = [UIDevice currentDevice].identifierForVendor.UUIDString; MQTTSessionManager *sessionManager = [[MQTTSessionManager alloc] init]; [sessionManager connectTo:@"121.199.19.126" port:1883 tls:false keepalive:60 //心跳间隔不得大于120s clean:true auth:true user:@"guest" pass:@"guest" will:false willTopic:nil willMsg:nil willQos:0 willRetainFlag:false withClientId:clientId]; sessionManager.delegate = self; self.sessionManager = sessionManager;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 监控连接状态
连接当前状态,添加对应的回调接口,可以进行相关的业务逻辑处理。
// 添加监听状态观察者 [self.sessionManager addObserver:self forKeyPath:@"state" options:NSKeyValueObservingOptionInitial | NSKeyValueObservingOptionNew context:nil];
- 1
- 2
- 3
- 4
- 5
监听连接状态,进行相应处理。
// 监听当前连接状态 - (void)observeValueForKeyPath:(NSString *)keyPath ofObject:(id)object change:(NSDictionary *)change context:(void *)context { switch (self.sessionManager.state) { case MQTTSessionManagerStateClosed: NSLog(@"连接已经关闭"); break; case MQTTSessionManagerStateClosing: NSLog(@"连接正在关闭"); break; case MQTTSessionManagerStateConnected: NSLog(@"已经连接"); break; case MQTTSessionManagerStateConnecting: NSLog(@"正在连接中"); break; case MQTTSessionManagerStateError: { NSString *errorCode = self.sessionManager.lastErrorCode.localizedDescription; NSLog(@"连接异常 ----- %@",errorCode); } break; case MQTTSessionManagerStateStarting: NSLog(@"开始连接"); break; default: break; } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 接收消息
实现MQTTSessionManagerDelegate
代理方法,处理数据。
// 获取服务器返回数据 - (void)handleMessage:(NSData *)data onTopic:(NSString *)topic retained:(BOOL)retained { NSLog(@"------------->>%@",topic); NSString *dataString = [[NSString alloc] initWithData:data encoding:NSUTF8StringEncoding]; NSLog(@"%@",dataString); // 进行消息处理 }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 订阅和发送消息
// 订阅主题 NSDictionary类型,Object 为 QoS,key 为 Topic self.sessionManager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:MQTTQosLevelExactlyOnce] forKey:@"hello"]; // 发送消息 返回值msgid大于0代表发送成功 NSString *msg = @"hahaha"; UInt16 msgid = [self.sessionManager sendData:[msg dataUsingEncoding:NSUTF8StringEncoding] //要发送的消息体 topic:@"hello" //要往哪个topic发送消息 qos:0 //消息级别 retain:false];
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
如果是使用阿里云的服务器替代自己的服务器,需要在阿里云控制台申请
Topic
,Group ID
等资源。
在建立连接时,传入的参数值也会有所改变,user
和pass
由于服务端需要对客户端进行鉴权,因此需要传入合法的user
和pass
。user
设置为当前用户的AccessKey
,pass
则设置为 MQTT 客户端GroupID
的签名字符串,签名计算方式是使用SecretKey
对GroupID
做HmacSHA1
散列加密。self.manager = [[MQTTSessionManager alloc] init]; self.manager.delegate = self; self.manager.subscriptions = [NSDictionary dictionaryWithObject:[NSNumber numberWithInt:self.qos] forKey:[NSString stringWithFormat:@"%@/#", self.rootTopic]]; //password的计算方式是,使用secretkey对groupId做hmac签名算法,具体实现参考macSignWithText方法 NSString *passWord = [[self class] macSignWithText:self.groupId secretKey:self.secretKey]; [self.manager connectTo:self.mqttSettings[@"host"] port:[self.mqttSettings[@"port"] intValue] tls:[self.mqttSettings[@"tls"] boolValue] keepalive:60 //心跳间隔不得大于120s clean:true auth:true user:self.accessKey pass:passWord will:false willTopic:nil willMsg:nil willQos:0 willRetainFlag:false withClientId:self.clientId];
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
使用
SecretKey
对GroupID
做HmacSHA1
散列加密+ (NSString *)macSignWithText:(NSString *)text secretKey:(NSString *)secretKey { NSData *saltData = [secretKey dataUsingEncoding:NSUTF8StringEncoding]; NSData *paramData = [text dataUsingEncoding:NSUTF8StringEncoding]; NSMutableData* hash = [NSMutableData dataWithLength:CC_SHA1_DIGEST_LENGTH ]; CCHmac(kCCHmacAlgSHA1, saltData.bytes, saltData.length, paramData.bytes, paramData.length, hash.mutableBytes); NSString *base64Hash = [hash base64EncodedStringWithOptions:0]; return base64Hash; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
具体介绍请戳下面帮助链接
MQTT接入环境配置
阿里云接入MTQQ示例
申请MQ资源参考
-
mqttClient
2020-05-29 16:03:49通过eclipse.paho连接mqtt服务器,实现单主题,多主题订阅,并包含断线重连,可以满足业务基本需要 -
ESP32的MQTT Client
2019-01-12 15:59:48红旭出品,欢迎大家加入,相互交流发展。MQTT是最常用的物联网通信方式红旭开源之ESP32开发指南V11 完成元2019年1月11日,由一群不愿透漏姓名的吃瓜群众编写 -
使用MQTTClient.h库进行mqtt通讯【C语言】
2022-04-01 21:40:12先放源代码,还没开始看,等看完再来写注释及讲解。 源代码 #include <stdio.h>...#include "MQTTClient.h" #define MQTT_Uri "localhost" // MQTT服务器的地址和端口号 #define ClientId "ubuntu16MQTT
MQTT是一种轻量级物联网消息推送协议,使用MQTT开发物联网设备将会十分简单方便。
关于MQTT服务器的安装可以看我的这篇文章:搭建MQTT服务器
安装paho.mqtt.c库
安装依赖
sudo apt-get install libssl-dev
安装
git clone https://github.com/eclipse/paho.mqtt.c.git cd paho.mqtt.c make sudo make install
编译及运行
将下面的源代码放到test.c文件。
编译
gcc -o test test.c -l paho-mqtt3c
gcc编译引用的API接口库:参考文章
paho-mqtt3a :异步API接口的库,API函数都是以MQTTAsync开头,API函数定义在MQTTAsync.h。
paho-mqtt3as :异步API接口的库,API函数都是以MQTTAsync开头,API函数定义在MQTTAsync.h,这个库默认带有TLS加密功能,同时也需要openssl库支持,才能使用这个库。
paho-mqtt3c :同步API接口的库,API函数都是以MQTTClient开头,API函数定义在MQTTClient.h。
paho-mqtt3cs :同步API接口的库,API函数都是以MQTTClient开头,API函数定义在MQTTClient.h,这个库默认带有TLS加密功能,同时也需要openssl库支持,才能使用这个库。
运行
./test
源代码
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include "MQTTClient.h" #define MQTT_Uri "localhost" // MQTT服务器的地址和端口号 #define ClientId "ubuntu16" // ClientId需要唯一 #define UserName "pi" // 用户名 #define PassWord "12345678" // 用户名对应的密码 // 失去连接回调函数 void connect_lost(void *context, char *cause) { printf("Connection lost,The reason: %s \n",cause); } // 收到主题信息回调函数 int message_arrived(void *context, char *topicName, int topicLen, MQTTClient_message *message) { printf("Receive topic: %s, message data: \n", topicName); printf("%.*s\n", message->payloadlen, (char*)message->payload); MQTTClient_freeMessage(&message); MQTTClient_free(topicName); return 1; } // 主题发布成功回调函数 void delivery_complete(void *context, MQTTClient_deliveryToken dt) { printf("publish topic success,token = %d \n", dt); } int main(int argc, char* argv[]) { // 1、定义一个MQTT客户端结构体指针 MQTTClient client; // 2、创建一个MQTT客户端 int rc; if ((rc = MQTTClient_create(&client, MQTT_Uri, ClientId, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\n", rc); exit(EXIT_FAILURE); goto exit; } // 3、创建一个MQTT连接配置结构体,并配置其参数 MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; conn_opts.username = UserName; // 用户名 conn_opts.password = PassWord; // 用户名对应的密码 conn_opts.keepAliveInterval = 60; // 心跳时间 conn_opts.cleansession = 1; // 清除会话 // 4、设置MQTT连接时的回调函数 if ((rc = MQTTClient_setCallbacks(client, NULL, connect_lost, message_arrived, delivery_complete)) != MQTTCLIENT_SUCCESS) { printf("Failed to set callbacks, return code %d\n", rc); rc = EXIT_FAILURE; goto destroy_exit; } // 5、开始连接到MQTT服务器 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); goto destroy_exit; } // 6、定义一个主题消息存储结构体 MQTTClient_message pubmsg = MQTTClient_message_initializer; char mag_data[] = "I am Ubuntu16."; pubmsg.payload = mag_data; pubmsg.payloadlen = (int)strlen(mag_data); pubmsg.qos = 1; // qos等级为1 pubmsg.retained = 0; // 服务器不保留消息 MQTTClient_deliveryToken token; // 标记MQTT消息的值,用来检查消息是否发送成功 // 7、发布主题信息 if ((rc = MQTTClient_publishMessage(client, "pubtest", &pubmsg, &token)) != MQTTCLIENT_SUCCESS) { printf("Failed to publish message, return code %d\n", rc); exit(EXIT_FAILURE); } // 8、订阅主题 if ((rc = MQTTClient_subscribe(client, "subtest", 1)) != MQTTCLIENT_SUCCESS) { printf("Failed to subscribe, return code %d\n", rc); rc = EXIT_FAILURE; } // 9、等待输入'Q'或'q'退出 printf("Press Q or q + <Enter> to quit\n\n"); int ch; do { ch = getchar(); } while (ch!='Q' && ch != 'q'); if ((rc = MQTTClient_unsubscribe(client, "ubuntu16_subscribe")) != MQTTCLIENT_SUCCESS) { printf("Failed to unsubscribe, return code %d\n", rc); rc = EXIT_FAILURE; } // 10、断开连接 if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) { printf("Failed to disconnect, return code %d\n", rc); rc = EXIT_FAILURE; } destroy_exit: MQTTClient_destroy(&client); exit: return rc; }
-
c# mqtt client库
2017-05-10 20:12:55implementation of the client half of the MQTT messaging protocol. Please visit: http://github.com/stevenlovegrove/MqttDotNet http://www.doc.ic.ac.uk/~sl203/ for information about MqttDotNet Visit: ... -
paho实现MQTTClient发布消息
2020-10-25 12:09:44paho实现MQTTClient发布消息 接下来会用paho开源的一个项目,实现mqtt客户端发布消息,此文主要参考MQTT Client library for C,Paho给出的创建一个客户端有如下类似的步骤: 1、安装 //从github上下载项目 git ...paho实现MQTTClient发布消息
接下来会用paho开源的一个项目,实现mqtt客户端发布消息,此文主要参考MQTT Client library for C,Paho给出的创建一个客户端有如下类似的步骤:
1、安装
//从github上下载项目 git clone https://github.com/eclipse/paho.mqtt.c.git //进入文件夹 cd paho.mqtt.c //编译 make //安装 sudo make install
2、目录介绍
src:源文件、库文件目录 build:编译过后的动态库及执行文件
2.1、src目录
src中有许多源文件,有发布、订阅、同步、异步文件。
2.2、build目录
build文件是编译过后的执行文件,也是同步、异步、发布、订阅消息等功能
3、实现过程
1.创建一个客户端对象; 2.设置连接MQTT服务器的选项; 3.如果多线程(异步模式)操作被使用则设置回调函数(详见 Asynchronous >vs synchronous client applications); 4.订阅客户端需要接收的任意话题; 5.重复以下操作直到结束: a.发布客户端需要的任意信息; b.处理所有接收到的信息; 6.断开客户端连接; 7.释放客户端使用的所有内存。
4、文件操作
我们直接使用paho自带文件,首先修改src中MQTTClient_publish.c源代码。
4.1、修改信息
//mqtt服务器地址 #define ADDRESS “tcp://m2m.eclipse.org:1883” //客户端号 #define CLIENTID “ExampleClientPub” //主题 #define TOPIC “MQTT Examples” //消息 #define PAYLOAD “Hello World!”
4.2、添加信息
需要添加服务器中用户名和密码
char *username= "test"; //添加的用户名 char *password = "test"; //添加的密码
将其写入客户端选项
conn_opts.username = username; //将用户名写入连接选项中 conn_opts.password = password; //将密码写入连接选项中
5、源码
源码我做了详尽的解释,其中有许多函数需要自己去上面文件自己查看,下面这是同步发布,不是异步,异步的代码也有,大家可以去看上面的网站
/******************************************************************************* * Copyright (c) 2012, 2020 IBM Corp. * * All rights reserved. This program and the accompanying materials * are made available under the terms of the Eclipse Public License v2.0 * and Eclipse Distribution License v1.0 which accompany this distribution. * * The Eclipse Public License is available at * https://www.eclipse.org/legal/epl-2.0/ * and the Eclipse Distribution License is available at * http://www.eclipse.org/org/documents/edl-v10.php. * * Contributors: * Ian Craggs - initial contribution *******************************************************************************/ #include <stdio.h> #include <stdlib.h> #include <string.h> #include "MQTTClient.h" #define ADDRESS "59.110.42.24:1883" #define CLIENTID "0bd981c5-a055-4196-8b7f-efb9f7a4d6ac" #define TOPIC "test" #define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L int main(int argc, char* argv[]) { //声明mqtt客户端 MQTTClient client; //初始化客户端选项 conn_opts MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer; //消息初始化 pubmsg MQTTClient_message pubmsg = MQTTClient_message_initializer; MQTTClient_deliveryToken token; int rc; char *username = "test"; char *password = "test"; if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS) { printf("Failed to create client, return code %d\n", rc); exit(EXIT_FAILURE); } //保持心跳20 conn_opts.keepAliveInterval = 20; //清理会话 conn_opts.cleansession = 1; //客户端的用户名和密码 conn_opts.username = username; conn_opts.password = password; //创建连接 if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS) { printf("Failed to connect, return code %d\n", rc); exit(EXIT_FAILURE); } //消息负载(内容) pubmsg.payload = PAYLOAD; //消息长度 pubmsg.payloadlen = (int)strlen(PAYLOAD); //消息质量分为0:不重要的消息比如温度,可以多次上传丢失一次没事。1:可能会丢失1次 2:永远不会丢失 pubmsg.qos = QOS; //有true和false 判断消息是否保留 pubmsg.retained = 0; //发布消息 token是消息发布后,传递令牌将返回客户端检查令牌是否已成功传递到其目的地 if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS) { printf("Failed to publish message, return code %d\n", rc); exit(EXIT_FAILURE); } printf("Waiting for up to %d seconds for publication of %s\n" "on topic %s for client with ClientID: %s\n", (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID); //阻塞函数,等待消息发布 rc = MQTTClient_waitForCompletion(client, token, TIMEOUT); printf("Message with delivery token %d delivered\n", token); if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS) printf("Failed to disconnect, return code %d\n", rc); //断开连接 MQTTClient_destroy(&client); return rc; }
6、实现效果
服务器中发布
mqtt测试软件订阅的消息就能接受到
-
一个高性能、高稳定性的跨平台MQTT客户端——mqttclient简介与使用
2020-06-14 11:12:04【开源】一个高性能、高稳定性的跨平台MQTT客户端——mqttclient简介与使用 -
KEPServerEX IoT Gateway REST and MQTT Client Agent Demo with Node-RED.docx
2021-03-27 15:41:50KEPServerEX IoT Gateway -
windows下基于QT5.7的mqttClient测试上位机软件源码
2019-01-17 09:43:27mqttclient上位机调用qtmqtt库实现mqtt客户端的远程订阅及发布功能,在调试mqtt通信协议时是非常方便的一个工具,这里用QT实现的源码,可以参考 -
mqtt-client-1.3-API文档-中文版.zip
2022-04-08 19:05:35赠送jar包:mqtt-client-1.3.jar; 赠送原API文档:mqtt-client-1.3-javadoc.jar; 赠送源代码:mqtt-client-1.3-sources.jar; 包含翻译后的API文档:mqtt-client-1.3-javadoc-API文档-中文(简体)版.zip 对应... -
MQTTClient-QTDEMO.rar
2019-06-05 18:35:56MQTT客户端源码+QTDEMO源码,可直接用于QT服务测试,纯源码,无封装 -
MQTTClient_horse3v5_PublicRights_MQ_consistf9z_MQTTClient_
2021-10-02 10:52:35cnmake# All rights reserved. This program and the accompanying materials# are made available under the terms of the Eclipse Public License v1.0# and Eclipse Distribution License v1.0 which accompany ... -
paho mqtt调用MQTTClient_connect失败,返回MQTTCLIENT_BAD_STRUCTURE(-8)
2020-01-19 14:08:56使用编译好的库文件,从githup上下载头文件,运行时MQTTClient_connect调用失败,返回MQTTCLIENT_BAD_STRUCTURE; MQTTCLIENT_BAD_STRUCTURE在MQTTClient.h中是struct_id或struct_version 不正确,其定义为: /** ... -
MQTT Client redis mysql
2022-01-15 20:03:28建立一个mqtt客户端连接,订阅主题,然后处理数据异步保存到数据库,redis,和mysql同时维护。这个应该是个趋势。 -
mqtt.MqttClient
2018-07-11 08:31:311.消息处理handler,实现MqttSimpleCallback,也可以对高级回调接口实现...package cn.smartslim.mqtt.demo.wmqtt; import com.ibm.mqtt.MqttSimpleCallback; //简单回调函数,处理server接收到... -
AndroidMqttClient.apk
2018-02-05 15:39:18自己编写的一款MQTT功能的android客户端,界面清晰,涵盖mqtt的全部功能。软件的简单介绍可查看我发表的一篇博客简介。