精华内容
下载资源
问答
  • MQTT源码解析

    2020-03-20 11:10:59
    一、MQTT的创建和connect流程 1.Android端实现mqtt,首先会new一个MqttAndroidClient,并传入需要的参数。 首先,MqttAndroidClient是如何创建的呢? public MqttAndroidClient(Context context, String ...

     

    一、MQTT的创建和connect流程

    1.Android端实现mqtt,首先会new一个MqttAndroidClient,并传入需要的参数。

    首先,MqttAndroidClient是如何创建的呢?

    public MqttAndroidClient(Context context, String serverURI,
    			String clientId, MqttClientPersistence persistence, Ack ackType) {
    		myContext = context;
    		this.serverURI = serverURI;
    		this.clientId = clientId;
    		this.persistence = persistence;
    		messageAck = ackType;
    	}

    MqttAndroidClient本身是继承Broadcast的子类,他的构造器需要传入上面的参数,必须传入的为

    public MqttAndroidClient(Context context, String serverURI,
    			String clientId) {
    		this(context, serverURI, clientId, null, Ack.AUTO_ACK);
    	}

    2.connect的核心方法:

    @Override
    	public IMqttToken connect(MqttConnectOptions options, Object userContext,
    			IMqttActionListener callback) throws MqttException {
    
    		IMqttToken token = new MqttTokenAndroid(this, userContext,
    				callback);
    
    		connectOptions = options;
    		connectToken = token;
    
    		/*
    		 * The actual connection depends on the service, which we start and bind
    		 * to here, but which we can't actually use until the serviceConnection
    		 * onServiceConnected() method has run (asynchronously), so the
    		 * connection itself takes place in the onServiceConnected() method
    		 */
    		if (mqttService == null) { // First time - must bind to the service
    			Intent serviceStartIntent = new Intent();
    			serviceStartIntent.setClassName(myContext, SERVICE_NAME);
    			Object service = myContext.startService(serviceStartIntent);
    			if (service == null) {
    				IMqttActionListener listener = token.getActionCallback();
    				if (listener != null) {
    					listener.onFailure(token, new RuntimeException(
    							"cannot start service " + SERVICE_NAME));
    				}
    			}
    
    			// We bind with BIND_SERVICE_FLAG (0), leaving us the manage the lifecycle
    			// until the last time it is stopped by a call to stopService()
    			myContext.bindService(serviceStartIntent, serviceConnection,
    					Context.BIND_AUTO_CREATE);
    
    			if (!receiverRegistered) registerReceiver(this);
    		}
    		else {
    			pool.execute(new Runnable() {
    
    				@Override
    				public void run() {
    					doConnect();
    					
    					//Register receiver to show shoulder tap.
    					if (!receiverRegistered) registerReceiver(MqttAndroidClient.this);
    				}
    
    			});
    		}
    
    		return token;
    	}

    返回一个IMqttToken对象,这里首先new一个MqttTokenAndroid对象,然后去判断是否存在MqttService实例。

    没有的话会创建一个,如果创建失败直接调用IMqttActionListener的onFailure回调。

    成功的话绑定一个bindService,传入ServiceConnection参数对象。

    而ServiceConnection的成功回调中有个doConnect()方法,里面会调用MqttService的connect方法从而建立Mqtt的连接。

    而在MqttService的connect方法里面,会创建一个MqttConnection对象,调用他的connect方法。

    而MqttConnection的connect方法里面会判断是否有MqttAsyncClient实例,如果没有会创建他的实例并调用他的connect方法

    // if myClient is null, then create a new connection
    			else {
    				alarmPingSender = new AlarmPingSender(service);
    				myClient = new MqttAsyncClient(serverURI, clientId,
    						persistence, alarmPingSender);
    				myClient.setCallback(this);
    
    				service.traceDebug(TAG,"Do Real connect!");
    				setConnectingState(true);
    				myClient.connect(connectOptions, invocationContext, listener);

    而在MqttAsyncClient的connect方法里面,会调用ConnectActionListener的connect方法;然后调用了ClientCommms的connect方法。

    /**
    	 * Sends a connect message and waits for an ACK or NACK.
    	 * Connecting is a special case which will also start up the
    	 * network connection, receive thread, and keep alive thread.
    	 * @param options The {@link MqttConnectOptions} for the connection
    	 * @param token The {@link MqttToken} to track the connection
    	 * @throws MqttException if an error occurs when connecting
    	 */
    	public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
    		final String methodName = "connect";
    		synchronized (conLock) {
    			if (isDisconnected() && !closePending) {
    				//@TRACE 214=state=CONNECTING
    				log.fine(CLASS_NAME,methodName,"214");
    
    				conState = CONNECTING;
    
    				conOptions = options;
    
                    MqttConnect connect = new MqttConnect(client.getClientId(),
                            conOptions.getMqttVersion(),
                            conOptions.isCleanSession(),
                            conOptions.getKeepAliveInterval(),
                            conOptions.getUserName(),
                            conOptions.getPassword(),
                            conOptions.getWillMessage(),
                            conOptions.getWillDestination());
    
                    this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());
                    this.clientState.setCleanSession(conOptions.isCleanSession());
                    this.clientState.setMaxInflight(conOptions.getMaxInflight());
    
    				tokenStore.open();
    				ConnectBG conbg = new ConnectBG(this, token, connect, executorService);
    				conbg.start();
    			}
    			else {
    				// @TRACE 207=connect failed: not disconnected {0}
    				log.fine(CLASS_NAME,methodName,"207", new Object[] {new Byte(conState)});
    				if (isClosed() || closePending) {
    					throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
    				} else if (isConnecting()) {
    					throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
    				} else if (isDisconnecting()) {
    					throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
    				} else {
    					throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
    				}
    			}
    		}
    	}

    而ClientComms的connect会调用内部类,名为ConnectBG的一个Thread线程子类,后面就是按照Mqtt的报文格式进行发送和接收message的流程了。

    而启动的线程属于之前在MqttAsyncAndroid里创建的线程池的线程,受其管理。

    发送用一个CommsSender的线程管理,接收用一个CommsReceiver的线程管理,锁用一个conLock对象来管理,来确保同时只能进行单一操作。

    Mqtt的报文格式放在MqttWireMessage里面。

    简单看待的话:Mqtt其实就是实现了一种规定协议的通讯方式,通过一个叫MqttService的服务来维护,用Thread线程池来管理通讯。而他的功能只要包括:connect连接,publish订阅,topic主题和disconnect断开连接和close关闭。

    我们普通开发者调用的话主要是依赖MqttAndroidClient这个类里面的方法,而Mqtt实现的主要方法是MqttAsyncClient来控制整个流程。

    需要注意的一点是:

    MqttAndroidClient是一个广播,在connect的时候会注册,但是停止使用的时候理论上需要调用MqttAndroidClient的方法:

    /**
    	 * Unregister receiver which receives intent from MqttService avoids
    	 * IntentReceiver leaks.
    	 */
    	public void unregisterResources(){
    		if(myContext != null && receiverRegistered){
    			synchronized (MqttAndroidClient.this) {
    				LocalBroadcastManager.getInstance(myContext).unregisterReceiver(this);
    				receiverRegistered = false;
    			}
    			if(bindedService){
    				try{
    					myContext.unbindService(serviceConnection);
    					bindedService = false;
    				}catch(IllegalArgumentException e){
    					//Ignore unbind issue.
    				}
    			}
    		}
    	}

    来进行解除注册,防止内存泄漏问题。

    但是你如果直接使用的是MqttClient的话,就不需要考虑这个问题。但是MqttClient的话并没有MqttService的支持,所以你需要考虑后台保活的话,使用MqttAndroidClient就不需要考虑这个问题,Android端的封装会使用起来更加简单。

     

    二、MqttCallbackExtended的回调

    当设置Mqtt自动断线重连的时候,需要调用MqttCallbackExtended回调接口,因为该回调接口当回调成功的时候,回调一个方法connectComplete,而该方法的调用位置是在ConnectActionListener的onSuccess()方法中。

    if(mqttCallbackExtended != null){
        	String serverURI = comms.getNetworkModules()[comms.getNetworkModuleIndex()].getServerURI();
        	mqttCallbackExtended.connectComplete(reconnect, serverURI);
        }

    而自动重连实现的方法为:

    在MqttAsyncClient类中有个内部类:MqttReconnectCallback,并实现了MqttCallbackextended接口

    class MqttReconnectCallback implements MqttCallbackExtended {
    
    		final boolean automaticReconnect;
    
    		MqttReconnectCallback(boolean isAutomaticReconnect) {
    			automaticReconnect = isAutomaticReconnect;
    		}
    
    		public void connectionLost(Throwable cause) {
    			if (automaticReconnect) {
    				// Automatic reconnect is set so make sure comms is in resting
    				// state
    				comms.setRestingState(true);
    				reconnecting = true;
    				startReconnectCycle();
    			}
    		}
    
    		public void messageArrived(String topic, MqttMessage message) throws Exception {
    		}
    
    		public void deliveryComplete(IMqttDeliveryToken token) {
    		}
    
    		public void connectComplete(boolean reconnect, String serverURI) {
    		}
    
    	}

    当断开连接之后,会调用connectionLost方法的startreconnectCycle()方法

    	private void startReconnectCycle() {
    		String methodName = "startReconnectCycle";
    		// @Trace 503=Start reconnect timer for client: {0}, delay: {1}
    		log.fine(CLASS_NAME, methodName, "503", new Object[] { this.clientId, new Long(reconnectDelay) });
    		reconnectTimer = new Timer("MQTT Reconnect: " + clientId);
    		reconnectTimer.schedule(new ReconnectTask(), reconnectDelay);
    	}

    会启动一个内部类ReconnectTask

    	private class ReconnectTask extends TimerTask {
    		private static final String methodName = "ReconnectTask.run";
    
    		public void run() {
    			// @Trace 506=Triggering Automatic Reconnect attempt.
    			log.fine(CLASS_NAME, methodName, "506");
    			attemptReconnect();
    		}
    	}
    	private void attemptReconnect() {
    		final String methodName = "attemptReconnect";
    		// @Trace 500=Attempting to reconnect client: {0}
    		log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId });
    		try {
    			connect(this.connOpts, this.userContext, new MqttReconnectActionListener(methodName));
    		} catch (MqttSecurityException ex) {
    			// @TRACE 804=exception
    			log.fine(CLASS_NAME, methodName, "804", null, ex);
    		} catch (MqttException ex) {
    			// @TRACE 804=exception
    			log.fine(CLASS_NAME, methodName, "804", null, ex);
    		}
    	}

    会调用connect方法尝试连接

    会跳到MqttConnection的connect方法,然后会判断是不是断线重连

    if(connectOptions.isAutomaticReconnect()){
    			//The Automatic reconnect functionality is enabled here
    			Log.i(TAG, "Requesting Automatic reconnect using New Java AC");
    			final Bundle resultBundle = new Bundle();
    			resultBundle.putString(
    					MqttServiceConstants.CALLBACK_ACTIVITY_TOKEN,
    					reconnectActivityToken);
    			resultBundle.putString(
    					MqttServiceConstants.CALLBACK_INVOCATION_CONTEXT, null);
    			resultBundle.putString(MqttServiceConstants.CALLBACK_ACTION,
    					MqttServiceConstants.CONNECT_ACTION);
    			try {
    				myClient.reconnect();
    			} catch (MqttException ex){
    				Log.e(TAG, "Exception occurred attempting to reconnect: " + ex.getMessage());
    				setConnectingState(false);
    				handleException(resultBundle, ex);
    			}
    		} 

    断线重连会执行reconnect方法,

    	public void reconnect() throws MqttException {
    		final String methodName = "reconnect";
    		// @Trace 500=Attempting to reconnect client: {0}
    		log.fine(CLASS_NAME, methodName, "500", new Object[] { this.clientId });
    		// Some checks to make sure that we're not attempting to reconnect an
    		// already connected client
    		if (comms.isConnected()) {
    			throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
    		}
    		if (comms.isConnecting()) {
    			throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
    		}
    		if (comms.isDisconnecting()) {
    			throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
    		}
    		if (comms.isClosed()) {
    			throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
    		}
    		// We don't want to spam the server
    		stopReconnectCycle();
    
    		attemptReconnect();
    	}

    周而复始~直到连接成功

    这里强调一点:每次连接失败之后,延迟执行的时间会更改

    每次会等待之前时间的2倍之后才再次请求,当时间大于128s之后,每隔128秒请求一次

    if (reconnectDelay < 128000) {
    				reconnectDelay = reconnectDelay * 2;
    			}
    			rescheduleReconnectCycle(reconnectDelay);

    三、订阅subscribe

    首先执行MqttAndroidClient的subscribe方法,调用MqttService的subscribe方法

    public IMqttToken subscribe(String topic, int qos, Object userContext,
    			IMqttActionListener callback) throws MqttException {
    		IMqttToken token = new MqttTokenAndroid(this, userContext,
    				callback, new String[]{topic});
    		String activityToken = storeToken(token);
    		mqttService.subscribe(clientHandle, topic, qos, null, activityToken);
    		return token;
    	}

    MqttService中启动一个MqttConnection对象,调用它的subscribe方法

    如果没有connect的话,会创建一个Bundle对象,把参数保存进去,启动一个广播发送出去。

    void callbackToActivity(String clientHandle, Status status,
          Bundle dataBundle) {
        // Don't call traceDebug, as it will try to callbackToActivity leading
        // to recursion.
        Intent callbackIntent = new Intent(
            MqttServiceConstants.CALLBACK_TO_ACTIVITY);
        if (clientHandle != null) {
          callbackIntent.putExtra(
              MqttServiceConstants.CALLBACK_CLIENT_HANDLE, clientHandle);
        }
        callbackIntent.putExtra(MqttServiceConstants.CALLBACK_STATUS, status);
        if (dataBundle != null) {
          callbackIntent.putExtras(dataBundle);
        }
        LocalBroadcastManager.getInstance(this).sendBroadcast(callbackIntent);
      }

    这个广播是之前说的那个动态广播MqttAndroidClient。

    然后在onReceive方法里面,根据

    MqttServiceConstants.CALLBACK_ACTION

    来获取对应的action

    这个action的值在之前Bundle里面存放

    MqttServiceConstants.SUBSCRIBE_ACTION

     

    如果connect连接的话,就调用MqttAsyncClient的subscribe方法

    public IMqttToken subscribe(String[] topicFilters, int[] qos, Object userContext, IMqttActionListener callback)
    			throws MqttException {
    		final String methodName = "subscribe";
    
    		if (topicFilters.length != qos.length) {
    			throw new IllegalArgumentException();
    		}
    
    		// remove any message handlers for individual topics
    		for (int i = 0; i < topicFilters.length; ++i) {
    			this.comms.removeMessageListener(topicFilters[i]);
    		}
    
    		// Only Generate Log string if we are logging at FINE level
    		if (log.isLoggable(Logger.FINE)) {
    			StringBuffer subs = new StringBuffer();
    			for (int i = 0; i < topicFilters.length; i++) {
    				if (i > 0) {
    					subs.append(", ");
    				}
    				subs.append("topic=").append(topicFilters[i]).append(" qos=").append(qos[i]);
    
    				// Check if the topic filter is valid before subscribing
    				MqttTopic.validate(topicFilters[i], true/* allow wildcards */);
    			}
    			// @TRACE 106=Subscribe topicFilter={0} userContext={1} callback={2}
    			log.fine(CLASS_NAME, methodName, "106", new Object[] { subs.toString(), userContext, callback });
    		}
    
    		MqttToken token = new MqttToken(getClientId());
    		token.setActionCallback(callback);
    		token.setUserContext(userContext);
    		token.internalTok.setTopics(topicFilters);
    
    		MqttSubscribe register = new MqttSubscribe(topicFilters, qos);
    
    		comms.sendNoWait(register, token);
    		// @TRACE 109=<
    		log.fine(CLASS_NAME, methodName, "109");
    
    		return token;
    	}

    创建一个MqttSubscribe对象组包,然后调用MqttCommons的sendNoWait发送

    其实sendNoWait做的就是用MqttState的send方法,把数据给一个

    Vector类型的对象,相当于存到发送的队列里面,然后sender出去
    展开全文
  • MQTT源码解析 (C源码解析)

    千次阅读 2019-06-28 14:27:51
    网址:https://blog.csdn.net/bangdingshouji/article/details/52576110
    展开全文
  • mqtt协议解析

    2015-08-06 17:30:04
    mqtt的消息格式解析,你了解解析还怕不会封装么?
  • MQTT源码分析

    2020-06-24 14:06:34
    MQTT介绍 MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做...

    MQTT介绍

    MQTT是个轻量级的消息订阅/发布协议,基于TCP协议,在物联网中应用较广,当然也有的公司拿MQTT协议来做Push或IM。MQTT协议有很多客户端/服务端的实现,如Eclipse Paho就是其中一个。本文不对MQTT协议本身做介绍,而是主要分析下一个Paho MQTT客户端的代码实现。

    Paho MQTT开源项目基本使用

    • 发布端代码案例
    /**
     *发布端
     */
    public class PublishSample {
        public static void main(String[] args) {
    
            String topic = "mqtt/test";
            String content = "hello 哈哈";
            int qos = 1;
            String broker = "tcp://iot.eclipse.org:1883";
            String userName = "test";
            String password = "test";
            String clientId = "pubClient";
            // 内存存储
            MemoryPersistence persistence = new MemoryPersistence();
    
            try {
                // 创建客户端
                MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
                // 创建链接参数
                MqttConnectOptions connOpts = new MqttConnectOptions();
                // 在重新启动和重新连接时记住状态
                connOpts.setCleanSession(false);
                // 设置连接的用户名
                connOpts.setUserName(userName);
                connOpts.setPassword(password.toCharArray());
                // 建立连接
                sampleClient.connect(connOpts);
                // 创建消息
                MqttMessage message = new MqttMessage(content.getBytes());
                // 设置消息的服务质量
                message.setQos(qos);
                // 发布消息
                sampleClient.publish(topic, message);
                // 断开连接
                sampleClient.disconnect();
                // 关闭客户端
                sampleClient.close();
            } catch (MqttException me) {
                System.out.println("reason " + me.getReasonCode());
                System.out.println("msg " + me.getMessage());
                System.out.println("loc " + me.getLocalizedMessage());
                System.out.println("cause " + me.getCause());
                System.out.println("excep " + me);
                me.printStackTrace();
            }
        }
    }
    
    • 订阅端代码案例
    /**
     *订阅端
     */
    public class SubscribeSample {
    
        public static void main(String[] args) throws MqttException {   
            String HOST = "tcp://iot.eclipse.org:1883";
            String TOPIC = "mqtt/test";
            int qos = 1;
            String clientid = "subClient";
            String userName = "test";
            String passWord = "test";
            try {
                // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
                MqttClient client = new MqttClient(HOST, clientid, new MemoryPersistence());
                // MQTT的连接设置
                MqttConnectOptions options = new MqttConnectOptions();
                // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
                options.setCleanSession(true);
                // 设置连接的用户名
                options.setUserName(userName);
                // 设置连接的密码
                options.setPassword(passWord.toCharArray());
                // 设置超时时间 单位为秒
                options.setConnectionTimeout(10);
                // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
                options.setKeepAliveInterval(20);
                // 设置回调函数
                client.setCallback(new MqttCallback() {
    
                    public void connectionLost(Throwable cause) {
                        System.out.println("connectionLost");
                    }
    
                    public void messageArrived(String topic, MqttMessage message) throws Exception {
                        System.out.println("topic:"+topic);
                        System.out.println("Qos:"+message.getQos());
                        System.out.println("message content:"+new String(message.getPayload()));
                        
                    }
    
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        System.out.println("deliveryComplete---------"+ token.isComplete());
                    }
    
                });
                client.connect(options);
                //订阅消息
                client.subscribe(TOPIC, qos);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    应用层源码分析

    Mqtt内部源码中提供了两种外部使用的Client,MqttClient和MqttAsyncClient,接口功能基本相同,一个实现了IMqttClient接口,一个实现了IMqttAsyncClient接口。IMqttAsyncClient相当于IMqttCLient方法参数里增加了操作回调接口,因为是异步的嘛,所以需要接口回调通知。这里可以理解MqttClient是MqttAsyncClient的代理类,IMClient接口和IMqttAsyncClient提供了基本的订阅、解除订阅、连接、断开连接、重新连接、设置回调接口等功能,MqttClient和MqttAsyncClient比较,从名字可以看出来一个是异步的一个是同步的。实际上真正负责干活的是MqttAsyncClient,MqttClient对其MqttAsyncClient的API做了一层阻塞式的包装,我们可以从源码中对MqttClient的注释看出,如下:

    /**
     * Lightweight client for talking to an MQTT server using methods that block until an operation completes.
     * <p>This class implements the blocking {@link IMqttClient} client interface where all actions block until they have completed (or timed out).
     * </p>
     * @see IMqttClient
     */
    public class MqttClient implements IMqttClient {
    
    

    在看下源码对MqttAsyncClient的描述

    /**
     * Lightweight client for talking to an MQTT server using non-blocking methods that allow an operation to run in the background.
     * <p>
     * <p>This class implements the non-blocking {@link IMqttAsyncClient} client interface allowing applications to initiate MQTT actions and then carry on working while the MQTT action completes on a background thread.
     
     * @see IMqttAsyncClient
     */
    public class MqttAsyncClient implements IMqttAsyncClient {
    

    然后可以随便看一个MqttClient的subscribe方法

    public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
    		IMqttToken tok = aClient.subscribe(topicFilters, qos, null, null);
    		tok.waitForCompletion(getTimeToWait());
    		int[] grantedQos = tok.getGrantedQos();
    		for (int i = 0; i < grantedQos.length; ++i) {
    			qos[i] = grantedQos[i];
    		}
    		if (grantedQos.length == 1 && qos[0] == 0x80) {
    			throw new MqttException(MqttException.REASON_CODE_SUBSCRIBE_FAILED);
    		}
    	}
    

    关键代码

    tok.waitForCompletion(getTimeToWait());
    

    可以看出MqttClient确实是对MqttAsyncClient的阻塞式封装。其它connect方法也是一样,就不看了。总的来说,使用MqttAsyncClient的相关api,都是异步的,必须依赖于回调的结果,才知道操作是否是成功的。而MqttClient是阻塞式,能顺序执行结束,不发生异常,则说明操作是成功的。

    客户端操作接口

    • connect
    • disconnect
    • reconnect
    • subscribe
    • unsubscribe

    分析客户端连接服务端做了哪些操作

    MqttClient mqttClient=new MqttClient(host,clientId,persistence);
    //MqttClient的构造函数
    public MqttClient(String serverURI, String clientId, MqttClientPersistence persistence) throws MqttException {
    		aClient = new MqttAsyncClient(serverURI, clientId, persistence);
    	}
    实际上内部其实构造了一个MqttAsyncClient对象,进一步可以说明,MqttClient是MqttAsyncClient的一个代理类
    接下来看下MqttAsyncClient的构造函数
    public MqttAsyncClient(String serverURI, String clientId, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
            ...
            this.comms = new ClientComms(this, this.persistence, pingSender);
            ...
        }
    省略了很多,这里我们可以看到构建了一个ClientComms对象,这也是个核心类
    接下来看ClientComms的构造函数
    public ClientComms(IMqttAsyncClient client, MqttClientPersistence persistence, MqttPingSender pingSender) throws MqttException {
    	    ...
    		this.callback 	= new CommsCallback(this);
    		this.clientState = new ClientState(persistence, tokenStore, this.callback, this, pingSender);
    		callback.setClientState(clientState);
    	}
    这里可以看到构造了一个CommsCallback和ClientState对象,然后CommsCallback持有ClientState的引用
    到此MqttClient基本构造结束,可以说就是构造一些基本的配置
    接下来分析MqttClient的connect方法,连接至服务器,MqttClient调用connect方法最终走到了MqttAsyncClient的connect方法,接着走到ClientComms的connect方法
    public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {
                    ...
    				ConnectBG conbg = new ConnectBG(this, token, connect);
    				conbg.start();
    	}
    省略相关代码,这里直接看到创建了一个ConnectBG的后台任务,走进去这个ConnectBG,看它的run方法
    public void run() {
    		        ...
    				// start the background processing threads before sending the connect
    				// packet.
    				NetworkModule networkModule = networkModules[networkModuleIndex];
    				networkModule.start();
    				receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());
    				receiver.start("MQTT Rec: "+getClient().getClientId());
    				sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());
    				sender.start("MQTT Snd: "+getClient().getClientId());
    				callback.start("MQTT Call: "+getClient().getClientId());				
    				internalSend(conPacket, conToken);
    			} catch (MqttException ex) {
    				//@TRACE 212=connect failed: unexpected exception
    				mqttEx = ex;
    			} catch (Exception ex) {
    				//@TRACE 209=connect failed: unexpected exception
    				mqttEx =  ExceptionHelper.createMqttException(ex);
    			}
    
    			if (mqttEx != null) {
    				shutdownConnection(conToken, mqttEx);
    			}
    		}
    	}
    可以看到创建了NetworkModule、CommsReceiver、CommsSender,然后分别start了
    NetworkModule主要处理底层TCP通信的,Socket啥的,暂且不说底层,CommsReceiver这个用来接受消息。CommsSender用来发送消息。CommsCallabck,这个之前分析过,它的所用主要是作为CommsReceiver接收到消息后和Client的一个转换,通过它将内部接收的消息以回调的形式反馈给客户端。继续分析CommsReceiver,分析它的run方法
    public void run() {
            ...
            while (running && (in != null)) {
               MqttWireMessage message = in.readMqttWireMessage();
                        if (message != null) {
                            // A new message has arrived
                            try {
                                clientState.notifyReceivedMsg(message);
                            } catch (Exception e) {
                                MqttLog.printStackTrace(e);
                            }
                        }
                    }
        }
    省略相关代码,看核心代码,只要线程运行状态并且读消息的流不为空,这里将会发一个通知notifyReceivedMsg通知收到了消息,继续跟进 clientState.notifyReceivedMsg(message);方法
    protected void notifyReceivedMsg(MqttWireMessage message) throws MqttException {
           ...
    		if (callback != null) {
    				callback.messageArrived(send);
    		}
    		...
    		
    	}
    省略相关代码,注意这里的callback就是我们之前说的CommsCallback。如果不为空,这里直接调用CommsCallback的messageArrived()方法,继续跟进这个方法
    public void messageArrived(MqttPublish sendMessage) {
    		final String methodName = "messageArrived";
    		if (mqttCallback != null || callbacks.size() > 0) {
    			// If we already have enough messages queued up in memory, wait
    			// until some more queue space becomes available. This helps 
    			// the client protect itself from getting flooded by messages 
    			// from the server.
    			synchronized (spaceAvailable) {
    				while (running && !quiescing && messageQueue.size() >= INBOUND_QUEUE_SIZE) {
    					try {
    						// @TRACE 709=wait for spaceAvailable
    						spaceAvailable.wait(200);
    					} catch (InterruptedException ex) {
    					}
    				}
    			}
    			if (!quiescing) {
    				messageQueue.addElement(sendMessage);
    				// Notify the CommsCallback thread that there's work to do...
    				synchronized (workAvailable) {
    					// @TRACE 710=new msg avail, notify workAvailable
    					workAvailable.notifyAll();
    				}
    			}
    		}
    	}
    仔细看下代码,其实是一个生产者和消费者模型,CommsCallback内部确实是这种,内部通过一个消息队列messageQueue来存储消息的,继续跟进核心代码messageQueue.addElement(sendMessage);说明直接插入消息到消息队列中了,整理下,CommsReceiver接收线程收到消息后,通知CommsCallabck,直接将消息插入到其内部的消息队列中,我们在看下,CommsCallabck内部如何读取这个队列消息的。
    public void run() {
    		final String methodName = "run";
    		while (running) {
    				    ...
    					MqttPublish message = null;
    					synchronized (messageQueue) {
    					    if (!messageQueue.isEmpty()) {
    						  
    							message = (MqttPublish) messageQueue.elementAt(0);
    							messageQueue.removeElementAt(0);
    					    }
    					}
    					if (null != message) {
    						handleMessage(message);
    					}
    					...
    				}
    		
    		}
    	}
    省略了相关代码,可以看出来内部是一个死循环,不断的从消息队列中读取消息。消息读取之后,从队列删除,接着交给handleMessage方法处理,继续跟进这个handleMessage方法
    private void handleMessage(MqttPublish publishMessage)
    			throws MqttException, Exception {
    		  ...
    		// @TRACE 713=call messageArrived key={0} topic={1}
    		deliverMessage(destName, publishMessage.getMessageId(),
    				publishMessage.getMessage());
    		...
    	}
    省略相关代码,可以看出调用了deliverMessage方法,继续跟进
    protected boolean deliverMessage(String topicName, int messageId, MqttMessage aMessage) throws Exception{		
    		
    		if (mqttCallback != null && !delivered) {
    			aMessage.setId(messageId);
    			mqttCallback.messageArrived(topicName, aMessage);
    			delivered = true;
    		}
    		return delivered;
    	}
    可以清晰的看到,最终回调了我们最熟悉的MqttCallback的messageArrived()方法,客户端开始进行消息处理,流程大概结束。
    

    内部线程梳理

    由以上流程分析,这个开源Mqtt客户端内部大概有以下几种线程

    • Connect mqtt与服务器连接线程,线程名称为 "MQTT Con: "+getClient().getClientId()
    • Sender 消息发送线程,线程名称为 "MQTT Snd: "+getClient().getClientId()
    • Receiver 消息接收线程,线程名称为 "MQTT Rec: "+getClient().getClientId()
    • Callback 消息回调线程,线程名称为 "MQTT Call: "+getClient().getClientId()

    此外还有一个重要的底层角色,NetworkModule,他负责处理底层的Socket TCP通信相关,这个未分析

    注意点

    每个Mqtt客户端的clientId不能一样,要保证唯一性,否则连接时Broker代理服务器会将上一个相同的clientId踢掉断开!并且会出现EOFException!

    展开全文
  • MQTT 协议解析

    千次阅读 2017-07-05 17:44:48
    mqtt帧分析---基于QMQTT

    0. 准备

    1. wireshark安装包:网络抓包工具
    2. qmqtt源码:mqtt客户端源码(Qt版本)
    3. 官网:http://mqtt.org/
    4. 英文版协议说明文档:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
    5. 中文版协议说明文档 https://mcxiaoke.gitbooks.io/mqtt-cn/content/mqtt/01-Introduction.html

    1、帧

    ###1.1帧结构

    Fixed header, present in all MQTT Control Packets
    Variable header, present in some MQTT Control Packets
    Payload, present in some MQTT Control Packets

    帧 = 固定头+可变头+载荷

    ###1.2 帧类型

    namevaluedrirectionDescription
    Reserved0ForbiddenReserved
    CONNECT1Client to ServerClient request to connect to Server
    CONNACK2Server to ClientConnect acknowledgment
    PUBLISH3Client to Server or Server to ClientPublish message
    PUBACK4同上Publish acknowledgment
    PUBREC5同上Publish received (assured delivery part 1)
    PUBREL6同上Publish release (assured delivery part 2)
    PUBCOMP7同上Publish complete (assured delivery part 3)
    SUBSCRIBE8Client to ServerClient subscribe request
    SUBACK0Server to ClientSubscribe acknowledgment
    UNSUBSCRIBE9Client to ServerUnsubscribe request
    UNSUBACKAServer to ClientUnsubscribe acknowledgment
    PINGREQBClient to ServerPING request
    PINGRESPCServer to ClientPING response
    DISCONNECTDClient to ServerClient is disconnecting
    ReservedEForbiddenReserved

    2、 wireshark 使用

    打开wireshark 后主界面会显示网络的连接有那些,如果安装了虚拟机的话会有如下的连接:
    这里写图片描述

    选择 realtek PCIe…
    双击进入如下界面

    这里写图片描述

    选择过滤为 mqtt(见左上角),这样的话wireshark就只会显示MQTT协议的包了。

    3、connect 帧

    与服务器连接时可以使用will topic 和will msg,

    它俩的作用是什么呢?
    就是在这个客户端非正常的退出时,服务器会推送这个topic给定阅了这个topic的客户端。
    这里写图片描述
    这里我们的client id没有填写,而在我们的抓包中(下图)可以看到到一个字串,这是QMQTT生成的一个uuid

    这里写图片描述

    对照协议文档再看wrieshark所抓的包,就可以清楚的明白,帧结构了。

    4、subscribe 帧

    这里我们定阅了一个主题是:guangzhou/huangpu

    这里写图片描述
    抓出来的包如下:
    这里写图片描述

    5、publish 帧

    这里我们发布了一个主题消息:work here
    这里写图片描述
    以下是抓到的包:
    这里写图片描述

    6、总结

    其它的帧可以通过同样的方法可以抓到,如此我们就可以根据数据来分析帧的结构了。(对照文档)

    展开全文
  • 源码分析 总结 前言 在上篇分析了mqtt的基础后,接下来就分析下mqtt协议连接建立流程。在阅读本文之前,需要大家去了解一下Mqtt报文格式,还没有看的同学可以参考下此文:https://blog.csdn.net/l...
  • 解读Paho MQTT源码

    千次阅读 2016-02-21 14:28:56
    这个两天要重点突破一下MQTT的东西, 找到了它的源码,解读一下,作为下一步优化的路标。 Paho是基于socket开做的,本质上还是维持一个长socket。 以TCP socket为例:(org.eclipse.paho.client.mqttv3.internal....
  • 在Eclipse paho mqtt源码中有心跳的接口类:org.eclipse.paho.client.mqttv.MqttPingSender。此接口类的实现有两个,分别是:org.eclipse.paho.client.mqttv3.TimerPingSender和org.eclipse.paho.client.mqttv3....
  • 此篇文章继上一篇物联网协议之MQTT源码分析(一)而写的第二篇MQTT发布消息以及接收Broker消息的源码分析,想看MQTT连接的小伙伴可以去看我上一篇哦。 juejin.im/post/5cd66c… MQTT发布消息 MQTT发布消息是由...
  • 本文是MQTT 嵌入式 C语言 客户端libemqtt源码解析 MQTT协议连接  libemqtt源码下载 源码目录结构:  ├── client:客户端应用代码(订阅与发布)  ├── include:头文件包含  └── src:emqtt...
  • 物联网协议之MQTT源码分析(一)

    千次阅读 2019-05-12 06:32:02
    一直以来只是单纯的使用MQTT连接、发布和接收消息等,也没在乎其Client(Android)端的源码是怎样实现的,要不是最近项目出现一个小问题困扰了很久可能也不会引发我看一看MQTT源码实现。好啦让我们开始了解MQTT的...
  • MQTT 协议解析,java使用

    千次阅读 2018-12-06 10:03:56
    MQ 遥测传输 (MQTT) 是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。该协议的特点有: 使用发布/订阅消息模式,提供一对多的消息发布,解除应用...
  • 这将解析来自433MHz接收器的Oregon Scientific协议v3信号,并将数据发送到MQTT服务器。 硬件使用ESP8266 Sparkfun Thing来直接联系wifi。 GPIO 4是从外部433MHz接收器单元输入的 为了进行编译,您必须在local.c中...
  • 首先说一下为什么要写这篇文章呢,在我发现... 主要记录一下MQTT在原先1.2.0版本使用过程中出现的问题,排查问题到升级1.2.1版本过程中出现的问题,通过源码一步步排查出最后的问题点,直到符合预期目标。   &l...
  • 本文从阿里云源码解析,将阿里云一机一密型中的配置解析出常规的配置信息。 用处在于如果没有设备网关,可以将数据解析后,使用MQTT桌面平台,填入常规mqtt配置信息,进而和阿里云物联网平台进行通信,进行数据发送...
  • MQTT再学习 -- MQTT 客户端源码分析

    万次阅读 2017-08-14 15:25:55
    MQTT 源码分析,搜索了一下发现网络上讲的很少,多是逍遥子的哪几篇。参看:逍遥子_mosquitto源码分析系列参看:MQTT libmosquitto源码分析参看:Mosquitto学习笔记一、目录结构首先我们还是来看一下 mosquitto-...
  • 0. 准备 ...qmqtt源码mqtt客户端源码(Qt版本) 官网:http://mqtt.org/ 英文版协议说明文档:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html 中文版协议说明文档 h...
  • 从DSMR到MQTT 使用DSMRv5协议读取荷兰智能电表,并将某些统计信息发布给mqtt代理。 DSMRv5解析器 为了解析数据电报,我使用了dsmr5板箱的修改版,您可以查看其源代码
  • 开源MQTT服务器源码分析

    千次阅读 2018-07-23 15:11:12
    本文主要描述libmosquitto部分代码架构,实现原理,部分重要代码解析;另外还有针对该代码库的不足和问题分析。   阅读条件 阅读此文,需要了解MQTT协议结构和部分实现。 MQTT简述 MQTT(Message Queuing ...
  • MQTT libmosquitto源码分析

    万次阅读 2016-03-02 14:44:48
    本文主要描述libmosquitto部分代码架构,实现原理,部分重要代码解析;另外还有针对该代码库的不足和问题分析。   阅读条件 阅读此文,需要了解MQTT协议结构和部分实现。 MQTT简述 MQTT(Message Queuing Tel
  • 编码和解码MQTT 3.1.1、5.0包的节点方式。 该库已通过节点v6,v8,v10,v12和v14进行了测试。 支持旧版本节点的最新版本是mqtt-packet@4.1.2。 安装 npm install mqtt-packet --save 例子 产生中 const mqtt = ...
  • 该模块支持流输入(例如,从用于串行端口的TCP套接字),其中推送到解析器的单个缓冲区可能一次包含部分数据包或多个数据包。 安装 npm install mqttsn-packet --save 例子 产生中 var mqttsn = require ( 'mqttsn-...
  • MQTT 客户端源码分析

    2018-02-22 16:04:00
    参看:MQTT libmosquitto源码分析 参看:Mosquitto学习笔记 一、目录结构 首先我们还是来看一下 mosquitto-1.4.14 的源码目录结构 我们主要关注 client、lib、src 这三个目录。其中 src 和 lib 目录下主要放置 ...
  • 节点红色contrib-mqtt-json 解析JSON有效负载的Node-RED MQTT输入节点 执照 麻省理工学院(c)塞巴斯蒂安·拉夫
  • 单片机接入阿里云平台步骤解析1.1 接入阿里云物联网平台步骤1.2 阿里云IOT平台订阅消息处理1.3 本地消息发布2.ESP8266 AT指令驱动2.1 ESP8266_AT.C2.2 ESP8266_AT.H3.MQTT协议实现3.1 ESP8266_MQTT.C3.2 ESP8266_...
  • 将设备通过mqtt上传的数据解析然后插入mysql数据库,同时下发控制命令

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,975
精华内容 790
热门标签
关键字:

mqtt源码解析