mqtt消息推送_mqtt 消息推送出现乱码 - CSDN
精华内容
参与话题
  • 使用Apollo搭建的服务端,后台采用java编写,使用Spring MVC 框架搭建的后台; 客户端使用Android,验证并接收后台传递的消息
  • MQTT消息推送

    2018-10-22 09:58:20
    Mqtt之服务质量等级Qos 分为3级,从低到高为:Qos0、Qos1、Qos2

    Mqtt之服务质量等级Qos

    分为3级,从低到高为:Qos0、Qos1、Qos2

    代理(broker)

    • 主要职责是接受发布者发布的所有消息,并将其过滤后分发给不同的消息订阅者。
    展开全文
  • MQTT实现消息推送

    万次阅读 热门讨论 2012-07-24 14:33:21
    MQTT实现消息推送   MQTT实现消息接收(接收消息需实现MqttSimpleCallback接口并实现它的publishArrived方法)必须注册接收消息方法 mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息...

     

    MQTT实现消息推送

     

    MQTT实现消息接收(接收消息需实现MqttSimpleCallback接口并实现它的publishArrived方法)必须注册接收消息方法

    mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法
    

    和订阅接主题

    mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题
    

     

     

    服务端:

    package com.gmcc.kuchuan.business;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    import com.ibm.mqtt.MqttClient;
    import com.ibm.mqtt.MqttException;
    import com.ibm.mqtt.MqttSimpleCallback;
    
    /**
     * MQTT消息发送与接收
     * @author Join
     *
     */
    public class MqttBroker {
    	private final static Log logger = LogFactory.getLog(MqttBroker.class);// 日志对象
    	// 连接参数
    	private final static String CONNECTION_STRING = "tcp://localhost:9901";
    	private final static boolean CLEAN_START = true;
    	private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
    	private final static String CLIENT_ID = "master";// 客户端标识
    	private final static int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别
    	private final static String[] TOPICS = { "Test/TestTopics/Topic1",
    			"Test/TestTopics/Topic2", "Test/TestTopics/Topic3",
    			"client/keepalive" };
    	private static MqttBroker instance = new MqttBroker();
    
    	private MqttClient mqttClient;
    
    	/**
    	 * 返回实例对象
    	 * 
    	 * @return
    	 */
    	public static MqttBroker getInstance() {
    		return instance;
    	}
    
    	/**
    	 * 重新连接服务
    	 */
    	private void connect() throws MqttException {
    		logger.info("connect to mqtt broker.");
    		mqttClient = new MqttClient(CONNECTION_STRING);
    		logger.info("***********register Simple Handler***********");
    		SimpleCallbackHandler simpleCallbackHandler = new SimpleCallbackHandler();
    		mqttClient.registerSimpleHandler(simpleCallbackHandler);// 注册接收消息方法
    		mqttClient.connect(CLIENT_ID, CLEAN_START, KEEP_ALIVE);
    		logger.info("***********subscribe receiver topics***********");
    		mqttClient.subscribe(TOPICS, QOS_VALUES);// 订阅接主题
    
    		logger.info("***********CLIENT_ID:" + CLIENT_ID);
    		/**
    		 * 完成订阅后,可以增加心跳,保持网络通畅,也可以发布自己的消息
    		 */
    		mqttClient.publish("keepalive", "keepalive".getBytes(), QOS_VALUES[0],
    				true);// 增加心跳,保持网络通畅
    	}
    
    	/**
    	 * 发送消息
    	 * 
    	 * @param clientId
    	 * @param messageId
    	 */
    	public void sendMessage(String clientId, String message) {
    		try {
    			if (mqttClient == null || !mqttClient.isConnected()) {
    				connect();
    			}
    
    			logger.info("send message to " + clientId + ", message is "
    					+ message);
    			// 发布自己的消息
    			mqttClient.publish("GMCC/client/" + clientId, message.getBytes(),
    					0, false);
    		} catch (MqttException e) {
    			logger.error(e.getCause());
    			e.printStackTrace();
    		}
    	}
    
    	/**
    	 * 简单回调函数,处理server接收到的主题消息
    	 * 
    	 * @author Join
    	 * 
    	 */
    	class SimpleCallbackHandler implements MqttSimpleCallback {
    
    		/**
    		 * 当客户机和broker意外断开时触发 可以再此处理重新订阅
    		 */
    		@Override
    		public void connectionLost() throws Exception {
    			// TODO Auto-generated method stub
    			System.out.println("客户机和broker已经断开");
    		}
    
    		/**
    		 * 客户端订阅消息后,该方法负责回调接收处理消息
    		 */
    		@Override
    		public void publishArrived(String topicName, byte[] payload, int Qos,
    				boolean retained) throws Exception {
    			// TODO Auto-generated method stub
    			System.out.println("订阅主题: " + topicName);
    			System.out.println("消息数据: " + new String(payload));
    			System.out.println("消息级别(0,1,2): " + Qos);
    			System.out.println("是否是实时发送的消息(false=实时,true=服务器上保留的最后消息): "
    					+ retained);
    		}
    
    	}
    
    	public static void main(String[] args) {
    		new MqttBroker().sendMessage("client", "message");
    	}
    }


     

     

    Android客户端:

    核心代码:MQTTConnection内部类

    import java.io.ByteArrayInputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.RandomAccessFile;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Timer;
    import java.util.TimerTask;
    
    import android.app.AlarmManager;
    import android.app.Notification;
    import android.app.NotificationManager;
    import android.app.PendingIntent;
    import android.app.Service;
    import android.content.BroadcastReceiver;
    import android.content.Context;
    import android.content.Intent;
    import android.content.IntentFilter;
    import android.content.SharedPreferences;
    import android.database.Cursor;
    import android.net.ConnectivityManager;
    import android.net.NetworkInfo;
    import android.os.Binder;
    import android.os.Bundle;
    import android.os.IBinder;
    import android.provider.ContactsContract;
    import android.util.Log;
    //此部分项目导包已被删除
    import com.ibm.mqtt.IMqttClient;
    import com.ibm.mqtt.MqttClient;
    import com.ibm.mqtt.MqttException;
    import com.ibm.mqtt.MqttPersistence;
    import com.ibm.mqtt.MqttPersistenceException;
    import com.ibm.mqtt.MqttSimpleCallback;
    
    /* 
     * PushService that does all of the work.
     * Most of the logic is borrowed from KeepAliveService.
     * http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
     */
    public class PushService extends Service {
    	private MyBinder mBinder = new MyBinder();
    	// this is the log tag
    	public static final String TAG = "PushService";
    
    	// the IP address, where your MQTT broker is running.
    	private static final String MQTT_HOST = "120.197.230.53"; // "209.124.50.174";//
    	// the port at which the broker is running.
    	private static int MQTT_BROKER_PORT_NUM = 9901;
    	// Let's not use the MQTT persistence.
    	private static MqttPersistence MQTT_PERSISTENCE = null;
    	// We don't need to remember any state between the connections, so we use a
    	// clean start.
    	private static boolean MQTT_CLEAN_START = true;
    	// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
    	// this value much. It could probably be increased.
    	private static short MQTT_KEEP_ALIVE = 60 * 15;
    	// Set quality of services to 0 (at most once delivery), since we don't want
    	// push notifications
    	// arrive more than once. However, this means that some messages might get
    	// lost (delivery is not guaranteed)
    	private static int[] MQTT_QUALITIES_OF_SERVICE = { 0 };
    	private static int MQTT_QUALITY_OF_SERVICE = 0;
    	// The broker should not retain any messages.
    	private static boolean MQTT_RETAINED_PUBLISH = false;
    
    	// MQTT client ID, which is given the broker. In this example, I also use
    	// this for the topic header.
    	// You can use this to run push notifications for multiple apps with one
    	// MQTT broker.
    	public static String MQTT_CLIENT_ID = "client";
    
    	// These are the actions for the service (name are descriptive enough)
    	public static final String ACTION_START = MQTT_CLIENT_ID + ".START";
    	private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
    	private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
    			+ ".KEEP_ALIVE";
    	private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
    			+ ".RECONNECT";
    
    	// Connection log for the push service. Good for debugging.
    	private ConnectionLog mLog;
    
    	// Connectivity manager to determining, when the phone loses connection
    	private ConnectivityManager mConnMan;
    	// Notification manager to displaying arrived push notifications
    	private NotificationManager mNotifMan;
    
    	// Whether or not the service has been started.
    	private boolean mStarted;
    
    	// This the application level keep-alive interval, that is used by the
    	// AlarmManager
    	// to keep the connection active, even when the device goes to sleep.
    	private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;
    
    	// Retry intervals, when the connection is lost.
    	private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
    	private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;
    
    	// Preferences instance
    	private SharedPreferences mPrefs;
    	// We store in the preferences, whether or not the service has been started
    	public static final String PREF_STARTED = "isStarted";
    	// We also store the deviceID (target)
    	public static final String PREF_DEVICE_ID = "deviceID";
    	// We store the last retry interval
    	public static final String PREF_RETRY = "retryInterval";
    
    	// Notification title
    	public static String NOTIF_TITLE = "client";
    	// Notification id
    	private static final int NOTIF_CONNECTED = 0;
    
    	// This is the instance of an MQTT connection.
    	private MQTTConnection mConnection;
    	private long mStartTime;
    	boolean mShowFlag = true;// 是否显示通知
    	public static Context ctx;
    	private boolean mRunFlag = true;// 是否向服务器发送心跳
    	Timer mTimer = new Timer();
    
    	// Static method to start the service
    	public static void actionStart(Context ctx) {
    		Intent i = new Intent(ctx, PushService.class);
    		i.setAction(ACTION_START);
    		ctx.startService(i);
    		PushService.ctx = ctx;
    	}
    
    	// Static method to stop the service
    	public static void actionStop(Context ctx) {
    		Intent i = new Intent(ctx, PushService.class);
    		i.setAction(ACTION_STOP);
    		ctx.startService(i);
    	}
    
    	// Static method to send a keep alive message
    	public static void actionPing(Context ctx) {
    		Intent i = new Intent(ctx, PushService.class);
    		i.setAction(ACTION_KEEPALIVE);
    		ctx.startService(i);
    	}
    
    	@Override
    	public void onCreate() {
    		super.onCreate();
    
    		log("Creating service");
    		mStartTime = System.currentTimeMillis();
    
    		try {
    			mLog = new ConnectionLog();
    			Log.i(TAG, "Opened log at " + mLog.getPath());
    		} catch (IOException e) {
    			Log.e(TAG, "Failed to open log", e);
    		}
    
    		// Get instances of preferences, connectivity manager and notification
    		// manager
    		mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
    		mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
    		mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
    
    		/*
    		 * If our process was reaped by the system for any reason we need to
    		 * restore our state with merely a call to onCreate. We record the last
    		 * "started" value and restore it here if necessary.
    		 */
    		handleCrashedService();
    	}
    
    	// This method does any necessary clean-up need in case the server has been
    	// destroyed by the system
    	// and then restarted
    	private void handleCrashedService() {
    		if (wasStarted() == true) {
    			log("Handling crashed service...");
    			// stop the keep alives
    			stopKeepAlives();
    
    			// Do a clean start
    			start();
    		}
    	}
    
    	@Override
    	public void onDestroy() {
    		log("Service destroyed (started=" + mStarted + ")");
    
    		// Stop the services, if it has been started
    		if (mStarted == true) {
    			stop();
    		}
    
    		try {
    			if (mLog != null)
    				mLog.close();
    		} catch (IOException e) {
    		}
    	}
    
    	@Override
    	public void onStart(Intent intent, int startId) {
    		super.onStart(intent, startId);
    		log("Service started with intent=" + intent);
    		if (intent == null) {
    			return;
    		}
    		// Do an appropriate action based on the intent.
    		if (intent.getAction().equals(ACTION_STOP) == true) {
    			stop();
    			stopSelf();
    		} else if (intent.getAction().equals(ACTION_START) == true) {
    			start();
    
    		} else if (intent.getAction().equals(ACTION_KEEPALIVE) == true) {
    			keepAlive();
    		} else if (intent.getAction().equals(ACTION_RECONNECT) == true) {
    			if (isNetworkAvailable()) {
    				reconnectIfNecessary();
    			}
    		}
    	}
    
    	public class MyBinder extends Binder {
    		public PushService getService() {
    			return PushService.this;
    		}
    	}
    
    	@Override
    	public IBinder onBind(Intent intent) {
    		return mBinder;
    	}
    
    	// log helper function
    	private void log(String message) {
    		log(message, null);
    	}
    
    	private void log(String message, Throwable e) {
    		if (e != null) {
    			Log.e(TAG, message, e);
    
    		} else {
    			Log.i(TAG, message);
    		}
    
    		if (mLog != null) {
    			try {
    				mLog.println(message);
    			} catch (IOException ex) {
    			}
    		}
    	}
    
    	// Reads whether or not the service has been started from the preferences
    	private boolean wasStarted() {
    		return mPrefs.getBoolean(PREF_STARTED, false);
    	}
    
    	// Sets whether or not the services has been started in the preferences.
    	private void setStarted(boolean started) {
    		mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
    		mStarted = started;
    	}
    
    	private synchronized void start() {
    		log("Starting service...");
    
    		// Do nothing, if the service is already running.
    		if (mStarted == true) {
    			Log.w(TAG, "Attempt to start connection that is already active");
    			return;
    		}
    
    		// Establish an MQTT connection
    
    		connect();
    
    		// 向服务器定时发送心跳,一分钟一次
    		mRunFlag = true;
    		mTimer.schedule(new TimerTask() {
    			@Override
    			public void run() {
    				if (!mRunFlag) {
    					// this.cancel();
    					// PushService.this.stopSelf();
    					return;
    				}
    				System.out.println("run");
    				try {
    					if (isNetworkAvailable()) {
    						SharedPreferences pref = getSharedPreferences(
    								"client", 0);
    						String MOBILE_NUM = pref.getString("MOBILE_NUM", "");
    						HttpUtil.post(Constants.KEEPALIVE + "&mobile="
    								+ MOBILE_NUM + "&online_flag=1");
    					}
    				} catch (Exception e) {
    					e.printStackTrace();
    					// TODO: handle exception
    				}
    			}
    		}, 0, 60 * 1000);
    		// Register a connectivity listener
    		registerReceiver(mConnectivityChanged, new IntentFilter(
    				ConnectivityManager.CONNECTIVITY_ACTION));
    	}
    
    	private synchronized void stop() {
    		// Do nothing, if the service is not running.
    		if (mStarted == false) {
    			Log.w(TAG, "Attempt to stop connection not active.");
    			return;
    		}
    
    		// Save stopped state in the preferences
    		setStarted(false);
    
    		// Remove the connectivity receiver
    		unregisterReceiver(mConnectivityChanged);
    		// Any existing reconnect timers should be removed, since we explicitly
    		// stopping the service.
    		cancelReconnect();
    
    		// Destroy the MQTT connection if there is one
    		if (mConnection != null) {
    			mConnection.disconnect();
    			mConnection = null;
    		}
    	}
    
    	//
    	private synchronized void connect() {
    		log("Connecting...");
    		// Thread t = new Thread() {
    		// @Override
    		// public void run() {
    		// fetch the device ID from the preferences.
    		String deviceID = "GMCC/client/"
    				+ mPrefs.getString(PREF_DEVICE_ID, null);
    		
    		// Create a new connection only if the device id is not NULL
    		try {
    			mConnection = new MQTTConnection(MQTT_HOST, deviceID);
    		} catch (MqttException e) {
    			// Schedule a reconnect, if we failed to connect
    			log("MqttException: "
    					+ (e.getMessage() != null ? e.getMessage() : "NULL"));
    			if (isNetworkAvailable()) {
    				scheduleReconnect(mStartTime);
    			}
    		}
    		setStarted(true);
    		// }
    		// };
    		// t.start();
    		// 向服务器定时发送心跳,一分钟一次
    		mRunFlag = true;
    	}
    
    	private synchronized void keepAlive() {
    		try {
    			// Send a keep alive, if there is a connection.
    			if (mStarted == true && mConnection != null) {
    				mConnection.sendKeepAlive();
    			}
    		} catch (MqttException e) {
    			log("MqttException: "
    					+ (e.getMessage() != null ? e.getMessage() : "NULL"), e);
    
    			mConnection.disconnect();
    			mConnection = null;
    			cancelReconnect();
    		}
    	}
    
    	// Schedule application level keep-alives using the AlarmManager
    	private void startKeepAlives() {
    		Intent i = new Intent();
    		i.setClass(this, PushService.class);
    		i.setAction(ACTION_KEEPALIVE);
    		PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
    		AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
    		alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
    				System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
    				KEEP_ALIVE_INTERVAL, pi);
    	}
    
    	// Remove all scheduled keep alives
    	private void stopKeepAlives() {
    		Intent i = new Intent();
    		i.setClass(this, PushService.class);
    		i.setAction(ACTION_KEEPALIVE);
    		PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
    		AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
    		alarmMgr.cancel(pi);
    	}
    
    	// We schedule a reconnect based on the starttime of the service
    	public void scheduleReconnect(long startTime) {
    		// the last keep-alive interval
    		long interval = mPrefs.getLong(PREF_RETRY, INITIAL_RETRY_INTERVAL);
    
    		// Calculate the elapsed time since the start
    		long now = System.currentTimeMillis();
    		long elapsed = now - startTime;
    
    		// Set an appropriate interval based on the elapsed time since start
    		if (elapsed < interval) {
    			interval = Math.min(interval * 4, MAXIMUM_RETRY_INTERVAL);
    		} else {
    			interval = INITIAL_RETRY_INTERVAL;
    		}
    
    		log("Rescheduling connection in " + interval + "ms.");
    
    		// Save the new internval
    		mPrefs.edit().putLong(PREF_RETRY, interval).commit();
    
    		// Schedule a reconnect using the alarm manager.
    		Intent i = new Intent();
    		i.setClass(this, PushService.class);
    		i.setAction(ACTION_RECONNECT);
    		PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
    		AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
    		alarmMgr.set(AlarmManager.RTC_WAKEUP, now + interval, pi);
    	}
    
    	// Remove the scheduled reconnect
    	public void cancelReconnect() {
    		Intent i = new Intent();
    		i.setClass(PushService.this, PushService.class);
    		i.setAction(ACTION_RECONNECT);
    		PendingIntent pi = PendingIntent.getService(PushService.this, 0, i, 0);
    		AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
    		alarmMgr.cancel(pi);
    	}
    
    	private synchronized void reconnectIfNecessary() {
    		log("mStarted" + mStarted);
    		log("mConnection" + mConnection);
    		if (mStarted == true && mConnection == null) {
    			log("Reconnecting...");
    			connect();
    		}
    	}
    
    	// This receiver listeners for network changes and updates the MQTT
    	// connection
    	// accordingly
    	private BroadcastReceiver mConnectivityChanged = new BroadcastReceiver() {
    		@Override
    		public void onReceive(Context context, final Intent intent) {
    			// Get network info
    			// Thread mReconnect = new Thread(){
    			// public void run() {
    			NetworkInfo info = (NetworkInfo) intent
    					.getParcelableExtra(ConnectivityManager.EXTRA_NETWORK_INFO);
    			// Is there connectivity?
    			boolean hasConnectivity = (info != null && info.isConnected()) ? true
    					: false;
    
    			log("Connectivity changed: connected=" + hasConnectivity);
    
    			if (hasConnectivity) {
    				reconnectIfNecessary();
    			} else if (mConnection != null) {
    				// Thread cancelConn = new Thread(){
    				// public void run() {
    				// // if there no connectivity, make sure MQTT connection is
    				// destroyed
    				log("cancelReconnect");
    				mConnection.disconnect();
    				mConnection = null;
    				log("cancelReconnect" + mConnection);
    				cancelReconnect();
    				// }
    				// };
    				// cancelConn.start();
    			}
    			// };
    			//
    			// };
    			// mReconnect.start();
    		}
    	};
    
    	// Display the topbar notification
    	private void showNotification(String text, Request request) {
    
    		Notification n = new Notification();
    		n.flags |= Notification.FLAG_SHOW_LIGHTS;
    		n.flags |= Notification.FLAG_AUTO_CANCEL;
    		n.defaults = Notification.DEFAULT_ALL;
    		n.icon = R.drawable.ico;
    		n.when = System.currentTimeMillis();
    		Intent intent = new Intent();
    		Bundle bundle = new Bundle();
    		bundle.putSerializable("request", request);
    		bundle.putString("currentTab", "1");
    		intent.putExtras(bundle);
    		intent.setClass(this, MainActivity.class);
    		intent.setAction(Intent.ACTION_MAIN);
    		intent.addCategory(Intent.CATEGORY_LAUNCHER);
    		intent.setFlags(Intent.FLAG_ACTIVITY_NEW_TASK
    				| Intent.FLAG_ACTIVITY_RESET_TASK_IF_NEEDED);
    		// Simply open the parent activity
    		PendingIntent pi = PendingIntent.getActivity(this, 0, intent, 0);
    
    		// Change the name of the notification here
    		n.setLatestEventInfo(this, NOTIF_TITLE, text, pi);
    		mNotifMan.notify(NOTIF_CONNECTED, n);
    	}
    
    	// Check if we are online
    	private boolean isNetworkAvailable() {
    		NetworkInfo info = mConnMan.getActiveNetworkInfo();
    		if (info == null) {
    			return false;
    		}
    		return info.isConnected();
    	}
    
    	// This inner class is a wrapper on top of MQTT client.
    	private class MQTTConnection implements MqttSimpleCallback {
    		IMqttClient mqttClient = null;
    
    		// Creates a new connection given the broker address and initial topic
    		public MQTTConnection(String brokerHostName, String initTopic)
    				throws MqttException {
    			// Create connection spec
    			String mqttConnSpec = "tcp://" + brokerHostName + "@"
    					+ MQTT_BROKER_PORT_NUM;
    			// Create the client and connect
    			mqttClient = MqttClient.createMqttClient(mqttConnSpec,
    					MQTT_PERSISTENCE);
    			String clientID = MQTT_CLIENT_ID + "/"
    					+ mPrefs.getString(PREF_DEVICE_ID, "");
    			Log.d(TAG, "mqttConnSpec:" + mqttConnSpec + "  clientID:"
    					+ clientID);
    			mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
    
    			// register this client app has being able to receive messages
    			mqttClient.registerSimpleHandler(this);
    
    			// Subscribe to an initial topic, which is combination of client ID
    			// and device ID.
    			// initTopic = MQTT_CLIENT_ID + "/" + initTopic;
    			subscribeToTopic(initTopic);
    
    			log("Connection established to " + brokerHostName + " on topic "
    					+ initTopic);
    
    			// Save start time
    			mStartTime = System.currentTimeMillis();
    			// Star the keep-alives
    			startKeepAlives();
    		}
    
    		// Disconnect
    		public void disconnect() {
    			// try {
    			stopKeepAlives();
    			log("stopKeepAlives");
    			Thread t = new Thread() {
    				public void run() {
    					try {
    						mqttClient.disconnect();
    						log("mqttClient.disconnect();");
    					} catch (MqttPersistenceException e) {
    						log("MqttException"
    								+ (e.getMessage() != null ? e.getMessage()
    										: " NULL"), e);
    					}
    				};
    			};
    			t.start();
    			// } catch (MqttPersistenceException e) {
    			// log("MqttException"
    			// + (e.getMessage() != null ? e.getMessage() : " NULL"),
    			// e);
    			// }
    		}
    
    		/*
    		 * Send a request to the message broker to be sent messages published
    		 * with the specified topic name. Wildcards are allowed.
    		 */
    		private void subscribeToTopic(String topicName) throws MqttException {
    
    			if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
    				// quick sanity check - don't try and subscribe if we don't have
    				// a connection
    				log("Connection error" + "No connection");
    			} else {
    				String[] topics = { topicName };
    				mqttClient.subscribe(topics, MQTT_QUALITIES_OF_SERVICE);
    			}
    		}
    
    		/*
    		 * Sends a message to the message broker, requesting that it be
    		 * published to the specified topic.
    		 */
    		private void publishToTopic(String topicName, String message)
    				throws MqttException {
    			if ((mqttClient == null) || (mqttClient.isConnected() == false)) {
    				// quick sanity check - don't try and publish if we don't have
    				// a connection
    				log("No connection to public to");
    			} else {
    				mqttClient.publish(topicName, message.getBytes(),
    						MQTT_QUALITY_OF_SERVICE, MQTT_RETAINED_PUBLISH);
    			}
    		}
    
    		/*
    		 * Called if the application loses it's connection to the message
    		 * broker.
    		 */
    		public void connectionLost() throws Exception {
    			log("Loss of connection" + "connection downed");
    			stopKeepAlives();
    			// 取消定时发送心跳
    			mRunFlag = false;
    			// 向服务器发送请求,更改在线状态
    			// SharedPreferences pref = getSharedPreferences("client",0);
    			// String MOBILE_NUM=pref.getString("MOBILE_NUM", "");
    			// HttpUtil.post(Constants.KEEPALIVE + "&mobile="
    			// + MOBILE_NUM+"&online_flag=0");
    			// null itself
    			mConnection = null;
    			if (isNetworkAvailable() == true) {
    				reconnectIfNecessary();
    			}
    		}
    
    		/*
    		 * Called when we receive a message from the message broker.
    		 */
    		public void publishArrived(String topicName, byte[] payload, int qos,
    				boolean retained) throws MqttException {
    			// Show a notification
    			// synchronized (lock) {
    			String s = new String(payload);
    			Request request = null;
    			try {// 解析服务端推送过来的消息
    				request = XmlPaserTool.getMessage(new ByteArrayInputStream(s
    						.getBytes()));
    				// request=Constants.request;
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    			final Request mRequest = request;
    			DownloadInfo down = new DownloadInfo(mRequest);
    			down.setDownLoad(down);
    			downloadInfos.add(down);
    			sendUpdateBroast();
    			down.start();
    			showNotification("您有一条新的消息!", mRequest);
    			Log.d(PushService.TAG, s);
    			Log.d(PushService.TAG, mRequest.getMessageId());
    			// 再向服务端推送消息
    			new AdvancedCallbackHandler().sendMessage(MQTT_CLIENT_ID
    					+ "/keepalive", "***********send message**********");
    		}
    
    		public void sendKeepAlive() throws MqttException {
    			log("Sending keep alive");
    			// publish to a keep-alive topic
    			publishToTopic(MQTT_CLIENT_ID + "/keepalive",
    					mPrefs.getString(PREF_DEVICE_ID, ""));
    		}
    	}
    
    	class AdvancedCallbackHandler {
    		IMqttClient mqttClient = null;
    		public final int[] QOS_VALUES = { 0, 0, 2, 0 };// 对应主题的消息级别
    
    		/**
    		 * 重新连接服务
    		 */
    		private void connect() throws MqttException {
    			String mqttConnSpec = "tcp://" + MQTT_HOST + "@"
    					+ MQTT_BROKER_PORT_NUM;
    			// Create the client and connect
    			mqttClient = MqttClient.createMqttClient(mqttConnSpec,
    					MQTT_PERSISTENCE);
    			String clientID = MQTT_CLIENT_ID + "/"
    					+ mPrefs.getString(PREF_DEVICE_ID, "");
    			mqttClient.connect(clientID, MQTT_CLEAN_START, MQTT_KEEP_ALIVE);
    			Log.d(TAG, "连接服务器,推送消息");
    			Log.d(TAG, "**mqttConnSpec:" + mqttConnSpec + "  clientID:"
    					+ clientID);
    			Log.d(TAG, MQTT_CLIENT_ID + "/keepalive");
    			// 增加心跳,保持网络通畅
    			mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
    					"keepalive".getBytes(), QOS_VALUES[0], true);
    		}
    
    		/**
    		 * 发送消息
    		 * 
    		 * @param clientId
    		 * @param messageId
    		 */
    		public void sendMessage(String clientId, String message) {
    			try {
    				if (mqttClient == null || !mqttClient.isConnected()) {
    					connect();
    				}
    
    				Log.d(TAG, "send message to " + clientId + ", message is "
    						+ message);
    				// 发布自己的消息
    				// mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
    				// message.getBytes(), 0, false);
    				mqttClient.publish(MQTT_CLIENT_ID + "/keepalive",
    						message.getBytes(), 0, false);
    			} catch (MqttException e) {
    				Log.d(TAG, e.getCause() + "");
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public String getPeople(String phone_number) {
    		String name = "";
    		String[] projection = { ContactsContract.PhoneLookup.DISPLAY_NAME,
    				ContactsContract.CommonDataKinds.Phone.NUMBER };
    		Log.d(TAG, "getPeople ---------");
    		// 将自己添加到 msPeers 中
    		Cursor cursor = this.getContentResolver().query(
    				ContactsContract.CommonDataKinds.Phone.CONTENT_URI,
    				projection, // Which columns to return.
    				ContactsContract.CommonDataKinds.Phone.NUMBER + " = '"
    						+ phone_number + "'", // WHERE clause.
    				null, // WHERE clause value substitution
    				null); // Sort order.
    
    		if (cursor == null) {
    			Log.d(TAG, "getPeople null");
    			return "";
    		}
    		Log.d(TAG, "getPeople cursor.getCount() = " + cursor.getCount());
    		if (cursor.getCount() > 0) {
    			cursor.moveToPosition(0);
    
    			// 取得联系人名字
    			int nameFieldColumnIndex = cursor
    					.getColumnIndex(ContactsContract.PhoneLookup.DISPLAY_NAME);
    			name = cursor.getString(nameFieldColumnIndex);
    			Log.i("Contacts", "" + name + " .... " + nameFieldColumnIndex); // 这里提示
    																			// force
    																			// close
    			System.out.println("联系人姓名:" + name);
    			return name;
    		}
    		return phone_number;
    	}
    
    	public void sendUpdateBroast() {
    		Intent intent = new Intent();
    		intent.setAction("update");
    		sendBroadcast(intent);
    	}
    
    	public void sendUpdateFinishBroast() {
    		Intent intent = new Intent();
    		intent.setAction("updateFinishList");
    		sendBroadcast(intent);
    	}
    
    	public class DownloadInfo extends Thread {
    		boolean runflag = true;
    		Request mRequest;
    		public float progress;
    		public MessageBean bean = null;
    		DownloadInfo download = null;
    		MessageDetailDao dao = new MessageDetailDao(
    				PushService.this.getApplicationContext());
    
    		public synchronized void stopthread() {
    			runflag = false;
    		}
    
    		public synchronized boolean getrunflag() {
    			return runflag;
    		}
    
    		DownloadInfo(Request mRequest) {
    			this.mRequest = mRequest;
    
    		}
    
    		public void setDownLoad(DownloadInfo download) {
    			this.download = download;
    		}
    
    		@Override
    		public void run() {
    			try {
    
    				File dir = new File(Constants.DOWNLOAD_PATH);
    				if (!dir.exists()) {
    					dir.mkdirs();
    				}
    				String filePath = Constants.DOWNLOAD_PATH
    						+ mRequest.getMessageId() + "." + mRequest.getExt();
    				bean = new MessageBean();
    				bean.setPath(filePath);
    				bean.setStatus(0);
    				bean.setDate(mRequest.getTimestamp());
    				bean.setLayoutID(R.layout.list_say_he_item);
    				bean.setPhotoID(R.drawable.receive_ico);
    				bean.setMessage_key(mRequest.getMessageId());
    				bean.setPhone_number(mRequest.getReceiver());
    				bean.setAction(1);
    				String name = getPeople(mRequest.getSender());
    				bean.setName(name);
    				bean.setFileType(Integer.parseInt(mRequest.getCommand()));
    				if (mRequest.getCommand().equals(Request.TYPE_MUSIC)) {
    					bean.setMsgIco(R.drawable.music_temp);
    					bean.setText(name + "给你发送了音乐");
    					mRequest.setBody(Base64.encodeToString(bean.getText()
    							.getBytes(), Base64.DEFAULT));
    				} else if (mRequest.getCommand().equals(Request.TYPE_CARD)) {
    					bean.setMsgIco(R.drawable.card_temp);
    					bean.setText(new String(Base64.decode(mRequest.getBody(),
    							Base64.DEFAULT)));
    					mRequest.setBody(Base64.encodeToString(bean.getText()
    							.getBytes(), Base64.DEFAULT));
    				} else if (mRequest.getCommand().equals(Request.TYPE_LBS)) {
    					bean.setMsgIco(R.drawable.address_temp);
    					bean.setText(new String(Base64.decode(mRequest.getBody(),
    							Base64.DEFAULT)));
    					mRequest.setBody(Base64.encodeToString(bean.getText()
    							.getBytes(), Base64.DEFAULT));
    				} else if (mRequest.getCommand().equals(Request.TYPE_PHOTO)) {
    					bean.setText(name + "向你发送了照片");
    					bean.setMsgIco(-1);
    				} else if (mRequest.getCommand().equals(Request.TYPE_PIC)) {
    					bean.setText(name + "向你发送了图片");
    					bean.setMsgIco(-1);
    				} else if (mRequest.getCommand().equals(Request.TYPE_SMS)) {
    					bean.setFileType(0);
    				}
    
    				if (!mRequest.getCommand().equals(Request.TYPE_CARD)
    						&& !mRequest.getCommand().equals(Request.TYPE_SMS)) {
    					String path = Constants.FILE_DOWNLOAD_URL
    							+ mRequest.getMessageId();
    					URL url = new URL(path);
    					HttpURLConnection hurlconn = (HttpURLConnection) url
    							.openConnection();// 基于HTTP协议的连接对象
    					hurlconn.setConnectTimeout(5000);// 请求超时时间 5s
    					hurlconn.setRequestMethod("GET");// 请求方式
    					hurlconn.connect();
    					long fileSize = hurlconn.getContentLength();
    					InputStream instream = hurlconn.getInputStream();
    					byte[] buffer = new byte[1024];
    					int len = 0;
    					int number = 0;
    					RandomAccessFile rasf = new RandomAccessFile(filePath,
    							"rwd");
    					while ((len = instream.read(buffer)) != -1) {// 开始下载文件
    						if (getrunflag() && progress < 100) {
    							rasf.seek(number);
    							number += len;
    							rasf.write(buffer, 0, len);
    							progress = (((float) number) / fileSize) * 100;
    							// 发送广播,修改进度条进度
    							sendUpdateBroast();
    						} else {
    							this.interrupt();
    							if (number != fileSize) {// 取消下载,将已经缓存的未下载完成的文件删除
    								File file = new File(filePath);
    								if (file.exists())
    									file.delete();
    							}
    							PushService.downloadInfos.remove(download);
    							sendUpdateBroast();
    							return;
    						}
    					}
    					instream.close();
    					PushService.downloadInfos.remove(download);
    					sendUpdateBroast();
    				} else {// 收到消息,将信息保存到数据库
    
    					PushService.downloadInfos.remove(download);
    					sendUpdateBroast();
    				}
    				// 将文件信息保存到数据库
    				dao.create(bean);
    				sendUpdateFinishBroast();
    
    			} catch (Exception e) {
    				PushService.downloadInfos.remove(download);
    				sendUpdateBroast();
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public static ArrayList<DownloadInfo> downloadInfos = new ArrayList<DownloadInfo>();
    
    	public ArrayList<DownloadInfo> getDownloadInfos() {
    		return PushService.downloadInfos;
    	}
    
    	public void setDownloadInfos(ArrayList<DownloadInfo> downloadInfos) {
    		PushService.downloadInfos = downloadInfos;
    	}
    }


     ps:

    接收者必须订阅发送者的TOPICS才能收到消息

     

    
    
    
    
    
    
    
    
    展开全文
  • 关于MQTT协议实现消息推送系统

    万次阅读 2018-01-06 23:21:44
    MQTT协议 官网是最好的老师:http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html 国内一些人的总结:https://zhuanlan.zhihu.com/p/20888181 概要 为物联网而生,物联网(Internet of Things...

    MQTT协议

    国内一些人的总结:https://zhuanlan.zhihu.com/p/20888181

    概要

    为物联网而生,物联网(Internet of Things,IoT)最近曝光率越来越高。虽然HTTP是网页的事实标准,不过机器之间(Machine-to-Machine,M2M)的大规模沟通需要不同的模式:之前的请求/回答(Request/Response)模式不再合适,取而代之的是发布/订阅(Publish/Subscribe)模式。这就是轻量级、可扩展的MQTT(Message Queuing Telemetry Transport)可以施展拳脚的舞台。
    适用背景,MQTT是基于二进制消息的发布/订阅编程模式的消息协议,最早由IBM提出的,如今已经成为OASIS规范。由于规范很简单,非常适合需要低功耗和网络带宽有限的IoT场景,比如:遥感数据;汽车;智能家居;智慧城市;医疗医护。
    采用pub/sub模式,与请求/回答这种同步模式不同,发布/定义模式解耦了发布消息的客户(发布者)与订阅消息的客户(订阅者)之间的关系,这意味着发布者和订阅者之间并不需要直接建立联系。
    主题,MQTT是通过主题对消息进行分类的,本质上就是一个UTF-8的字符串,不过可以通过反斜杠表示多个层级关系。主题并不需要创建,直接使用就是了。主题还可以通过通配符进行过滤。其中,+可以过滤一个层级,而*只能出现在主题最后表示过滤任意级别的层级。举个例子:building-b/floor-5:代表B楼5层的设备;+/floor-5:代表任何一个楼的5层的设备;building-b/*:代表B楼所有的设备。注意,MQTT允许使用通配符订阅主题,但是并不允许使用通配符广播
    服务质量,为了满足不同的场景,MQTT支持三种不同级别的服务质量(Quality of Service,QoS)为不同场景提供消息可靠性:级别0:尽力而为,消息发送者会想尽办法发送消息,但是遇到意外并不会重试(TCP保证);级别1:至少一次,消息接收者如果没有知会或者知会本身丢失(消息推送系统中海量消息推送,这个搞不好会带来,推送速率锐减,一条消息可能重试多次直到接收端回复接受正常?),消息发送者会再次发送以保证消息接收者至少会收到一次,当然可能造成重复消息;级别2:恰好一次。保证这种语义肯待会减少并发或者增加延时,不过丢失或者重复消息是不可接受的时候,级别2是最合适的。
    服务质量是个老话题了。级别2所提供的不重不丢很多情况下是最理想的,不过往返多次的确认一定对并发和延迟带来影响。级别1提供的至少一次语义在日志处理这种场景下是完全OK的,所以像Kafka这类的系统利用这一特点减少确认从而大大提高了并发。级别0适合鸡肋数据场景,食之无味弃之可惜,就这么着吧。

    报文

    1.我们
    一共四个字节固定报头,剩下是数据包长度。

    2.MQTT(最短7字节数据报头)
    由  两个字节的固定包头 + 可变报头 + 有效载荷(消息体)组成。MQTT拥有14种不同的消息类型其中,不管长度有限的消息类型,推送消息是通过PUBLISH类型消息推送到接收端的。PUBLISH控制报文是指从客户端向服务端或者服务端向客户端传输一个应用消息。所以,看一下其组成。
    2.1固定报头(2字节长度):

    2.2可变报头(qos0为5字节,QoS1/2为7字节):
    其顺序包含 主题名  和 报文标识符
    主题名:用于识别有效载荷数据应该被发布到哪一个信息通道
    报文标识符:只有当QoS等级是1或2时,报文标识符(Packet Identifier)字段才能出现在PUBLISH报文
    中。

    2.3有效载荷
    有效载荷包含将被发布的应用消息。数据的内容和格式是应用特定的。有效载荷的长度这样计算: 用固定报头中的剩余长度字段的值减去可变报头的长度。包含零长度有效载荷的,PUBLISH报文是合法的

    3.总结
    所以就按照报文大小来说,我们包更加的小。

    性能

    1.我们
    长连接,在线推送,对于报文有重试机制
    2.MQTT
    2.1也是长连接,在线推送,对于报文QOS1级别,需要的保证机制,估计比我们的重试机制要重;
    2.2为了提供服务质量保证,客户端和服务端有必要存储会话状态。在整个会话期间,客户端和服务端都必须存储会话状态。会话必须至少持续和它的活跃网络连接同样长的时间 
    2.3支持tcp/ip,TLS,  WebSocket协议
    2.4分发协议关注的是从单个发送者到单个接收者的应用消息。服务端分发应用消息给多个客户端时,每个客户端独立处理。分发给客户端的出站应用消息和入站应用消息的QoS等级可能是不同的。

    服务端的保留消息不是会话状态的组成部分, 服务端应该保留那种消息直到客户端删除它。MQTT用户应该评估MQTT客户端和服务端实现的存储容量,确保能满足需求。

    关于性能测试的例子:

    3.ActiveMQ性能测试参考

    测试环境:



     硬盘:1T,5400  (效果不佳)

     

     

    得出了一个异样的测试结果:

     

    持久: 

    插入200000条JSON,共消耗:25.175 s

    平均:7944.389275074478 条/秒

     

    插入200000条JSON,共消耗:34.47 s

    平均:5802.146794313896 条/秒

     

    插入200000条JSON,共消耗:29.937 s数量:1400000

    平均:6680.696128536593 条/秒

     

    插入200000条JSON,共消耗:29.094 s

    平均:6874.269608854059 条/秒

     

     

     

    非持久:

    插入200000条JSON,共消耗:11.35 s数量:1800000

    平均:17621.14537444934 条/秒

     

    插入200000条JSON,共消耗:10.714 s

    平均:18667.16445771887 条/秒

     

    插入200000条JSON,共消耗:11.153 s

    平均:17932.394871335066 条/秒

     

    插入200000条JSON,共消耗:10.717 s数量:2400000

    平均:18661.93897545955 条/秒

     

     

    主要在自己本地测试,最终祸首是硬盘不给力啊;

    在进行持久化操作时,ActiveMQ默认是kahadb管理

    log的默认大小是32MB,当超过之后会新建一个新的log文件,完成操作后,activeMQ又将旧的log删除了。

     



     

     

    代码贴上:

    Java代码  收藏代码
    1. public class Sender {  
    2.   
    3.     static int size = 200000;  
    4.     static Session session;  
    5.     static MessageProducer producer;  
    6.     static Topic topic;  
    7.     static Connection connection;  
    8.     static String str = "[{'flag':'1','value':'8854c92e92404b188e63c4031db0eac9','label':'交换机(虚机)'},{'flag':'1','value':'3f367296c2174b7981342dc6fcb39d64','label':'防火墙'},{'flag':'1','value':'8a3e05eeedf54f8cbed37c6fb38c6385','label':'负载均衡'},{'flag':'1','value':'4f0ebc601dfc40ed854e08953f0cdce8','label':'其他设备'},{'flag':'1','value':'6','label':'路由器'},{'flag':'1','value':'4','label':'交换机'},{'flag':'1','value':'b216ca1af7ec49e6965bac19aadf66da','label':'服务器'},{'flag':'1','value':'7','label':'安全设备'},{'flag':'1','value':'cd8b768a300a4ce4811f5deff91ef700','label':'DWDM\\SDH'},{'flag':'1','value':'5','label':'防火墙(模块)'},{'flag':'1','value':'01748963956649e589a11c644d6c09b5','label':'机箱'}]";  
    9.   
    10.     public static void init_connection() throws Exception {  
    11.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
    12.         connection = factory.createConnection();  
    13.         connection.start();  
    14.         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    15.         topic = session.createTopic("java.activemq.tps");  
    16.         producer = session.createProducer(topic);  
    17.         producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
    18.     }  
    19.   
    20.     public static void sendMessage(String msg) {  
    21.         TextMessage message;  
    22.         try {  
    23.             message = session.createTextMessage();  
    24.             message.setText(str);  
    25.             producer.send(message);  
    26.         } catch (JMSException e) {  
    27.             e.printStackTrace();  
    28.         }  
    29.     }  
    30.   
    31.     public static void close() throws Exception {  
    32.         connection.close();  
    33.     }  
    34.   
    35.     public static void main(String[] arg) throws Exception {  
    36.         long start = System.currentTimeMillis();  
    37.         ExecutorService es = Executors.newFixedThreadPool(10);  
    38.         final CountDownLatch cdl = new CountDownLatch(size);  
    39.         init_connection();  
    40.         for (int a = 0; a < size; a++) {  
    41.             es.execute(new Runnable() {  
    42.                 @Override  
    43.                 public void run() {  
    44.                     sendMessage(str);  
    45.                     cdl.countDown();  
    46.                 }  
    47.             });  
    48.         }  
    49.         cdl.await();  
    50.         es.shutdown();  
    51.         long time = System.currentTimeMillis() - start;  
    52.         System.out.println("插入" + size + "条JSON,共消耗:" + (double)time / 1000 + " s");  
    53.         System.out.println("平均:" + size / ((double)time/1000) + " 条/秒");  
    54.         close();  
    55.     }  
    56. }  


    消息持久化

    最重要的是消息持久化,这个时候再写磁盘或者数据库,就有点晚了(一般都有海量设备不在线),效率肯定要低下来,集群应该可以挽救,但是担心集群复杂度更高,成本比较大。后面有一些已有的集群方案作比较。

    MQTT实现推送收集

    原始设想


    流行MQTT服务端


    emqttd 收费  Erlang开发   支持集群  开源版稳定性可靠性差
    RabbitMQ 免费 Erlang开发  不确定是否支持集群,可能需要自己定制    开源性能都说不好
    mosquitto 收费      单线程不支持集群和负载均衡开源
    Moquette           java     集群自己定制(借助redis等)  开源问题不少http://blog.csdn.net/educast/article/details/78352953
    ActiveMQ   免费java 支持集群 开源

    关于架构

    关于点对点:http://blog.csdn.net/flonny/article/details/78521634(借助广播主题,并遍历server上链接的client,找到对应的clientid)
    http://blog.csdn.net/vsddvsd/article/details/54632913(貌似是按照客户端数量建立queue,跪了)

    还得考虑消息持久化(要考虑集群的因素,需要加数据库,加上同步保存集群session),加上上面的点对点,等系统成型,性能不敢保证,下面是分布式集群的方案和测试:

    影响qctiveMQ性能的几个要素

    影响ActiveMQ性能的几个重要因素

    Queue
    1、Send/dispatch Async 影响非常大

         同步异步的发送和投递,都非常影响吞吐量。另外,SystemUsage和PFC流控对同步发送有直接影响。
    2、Not transacted 去掉了记录redo日志
    3、Auto_ACK/Optim_ACK 优化确认

         减少交互次数
    4、Non-persistence 持久化消息,跟下面几点有关

        持久化和非持久化,也是数量级的影响,毕竟为了提高可靠性,使用数据库或文件来存消息,开销非常大。
    5、pendingQueuePolicy/vmQueueCursor 决定了消息存储+发送模式,影响很大

        内存最快,文件和jdbc方式更安全,但是非常慢。。。
    6、producerFlowControl/memoryLimit  可能会直接block掉producer

          vmCursor+非持久时,直接变成一个内存MQ,为了不爆掉jvm,在消息积压到指定数量的时候,PFC会阻止生产消息。
    7、fast/slow consumer      决定了消息处理模式

         跟上面几点有关系。

    8、在connection或connectionFactory上关闭掉 copyMessageOnSend

    <!--StartFragment -->
    根据JMS规范,消息是不可变的。send的时候,会自动的添加一些属性。有时候,可能会重用,或者多线程处理。为了不影响消息的不可变性,发送的时候,先复制一份,这样,发送时处理的消息对象和你的代码持有的消息对象,是两个不同对象了。相互之间就不会互相影响了。
    一般情况下,这个选项可以关闭,从而获得一定的性能提升。
    9、consumer端,获取消息时候的prefetchSize设置。 一定范围情况下,一次预获取越大,总体性能越好。






    展开全文
  • 基于MQTT消息推送

    千次阅读 2016-12-09 11:45:17
    这段时间学习了推送技术,对xmpp和mqtt 协议做了下比较。 xmpp基于xml信息传递,所以传输信息量比较大,在保持长链接情况下功耗会比较大。 可能还是比较适合用来做聊天之类的通讯应用,而对于智能和物联低功耗设备...

    这段时间学习了推送技术,对xmpp和mqtt 协议做了下比较。

    xmpp基于xml信息传递,所以传输信息量比较大,在保持长链接情况下功耗会比较大。

    可能还是比较适合用来做聊天之类的通讯应用,而对于智能和物联低功耗设备的推送来说,感觉比较笨重。

    而mqtt协议就是针对网络带宽低,高延时,通信不稳定的环境设计的,特别适合物联设备。低通讯量连接保持,简约轻便。

    • 提供了发布/订阅模式,只要订阅了,即使发布时客户端离线,等再次上线时还能收到消息。
    • 提供了发布反馈,客户端收到消息反馈等机制。
    • 提供了发布质量,比如至多一次,至少一次,只有一次。可以根据不同业务要求进行选择。
    • 提供了心跳机制,可自行设置心跳
    • 提供了鉴权机制

    可见,提供的功能已经很完整了。


    mqtt 详细协议可见:

    http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

    中文版的:

    http://wenku.baidu.com/link?url=eyG24CVjmmA_J5KbR_Y8xzDOkq5YsrChshcHxcZQzav2OcVE9KNxukXr1uix1ACPTw36QIKwHRTmLqjM1Bm7qqPwVQYG9BzIv_BdJvPoU8_


    现在有很多基于mqtt的开源实现,包含客户端和服务器端(或者说中间件)。

    我做了一个demo,客户选用paho 提供client-mqtt 和 android server库,中间件是apollo。

    apollo 下载地址:

    http://activemq.apache.org/apollo/download.html

    下载完解压按照里面的readme 建立和运行自己的broker,依赖java运行环境。


    client-mqtt 和 android server库地址:

    可以直接把jar文件导入工程,但是我选择的是把源码放入,因为这样可以根据自己的需要对源码做修改。
    android studio上:
    1.把jar放入很简单:放到model libs目录下,右键 As libraries就可以了。
    2. 源码放入,把sources.jar 分别解压,放到对应java目录下。



    3. client.mqtt3 里包含了java的.properties 文件,它们是针对多语言的,这点和android的设计不一样,所以android studio编译时是不能直接把它们打包到class文件里面的。需要单独把它们打包成jar放到lib目录下,这样在运行过程就不会报找不到它们了。

      打包很简单,新建org\eclipse\paho\client\mqttv3\internal\nls 目录,把它们都放到这个目录下。

     然后在org 目录外执行 jar -cvf  properties.jar ./*  , 之后把properties.jar 放入libs下。


      4. 在androidManifest 里面添加

    <service android:name="org.eclipse.paho.android.service.MqttService">
    </service>
    5. 在自己的activity或者service里面通过调用paho.android.service MqttAndroidClient 类实现发布和订阅。

    package com.tww.test;
    
    import android.content.ComponentName;
    import android.content.Context;
    import android.content.ServiceConnection;
    import android.os.Bundle;
    import android.os.IBinder;
    import android.os.PowerManager;
    import android.support.v7.app.AppCompatActivity;
    import android.util.Log;
    import android.view.View;
    import android.widget.Button;
    
    import org.eclipse.paho.android.service.MqttAndroidClient;
    import org.eclipse.paho.android.service.MqttTraceHandler;
    import org.eclipse.paho.client.mqttv3.IMqttActionListener;
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.IMqttToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.MqttSecurityException;
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
    
    import java.io.IOException;
    import java.io.InputStream;
    
    import javax.net.ssl.SSLSocketFactory;
    
    public class MainActivity extends AppCompatActivity  implements View.OnClickListener,ServiceConnection, MqttCallback, IMqttActionListener,MqttTraceHandler {
        private static final String TAG = "MainActivity";
        private Button mButton;
        private MqttAndroidClient mMqttAndroidClient;
        private String host = "tcp://192.168.43.224:61613";
        private String userName = "admin";
        private String passWord = "password";
        private IMqttToken mConnectToken;
        private PowerManager.WakeLock mWakeLock;
        @Override
        protected void onCreate(Bundle savedInstanceState) {
            super.onCreate(savedInstanceState);
            setContentView(R.layout.activity_main);
            mButton = (Button) findViewById(R.id.button);
            mButton.setOnClickListener(this);
            mMqttAndroidClient = new MqttAndroidClient(this,host,"123456789",new MqttDefaultFilePersistence(getCacheDir().getAbsolutePath()));
            mMqttAndroidClient.setCallback(this);
            mMqttAndroidClient.setTraceEnabled(true);
            mMqttAndroidClient.setTraceCallback( this);
    
            Log.d(TAG,"onCreate");
            MqttConnectOptions options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setUserName(userName);
            options.setPassword(passWord.toCharArray());
            options.setConnectionTimeout(60);
            options.setKeepAliveInterval(2*60);
            options.setMqttVersion(3);
            try {
                InputStream caInput = getResources().getAssets().open("keystore.bks");
                if(caInput!=null){
                    Log.d(TAG,"do setSocketFactory");
                    SSLSocketFactory sslSocketFactory = mMqttAndroidClient.getSSLSocketFactory(caInput,"password");
                    options.setSocketFactory(sslSocketFactory);
                }
            } catch (IOException e) {
                e.printStackTrace();
                Log.e(TAG,"do connect IOException:"+e);
            }catch (MqttSecurityException e) {
                e.printStackTrace();
                Log.e(TAG,"do connect MqttSecurityException:"+e);
            }
            try {
                Log.d(TAG,"do connect");
                mConnectToken = mMqttAndroidClient.connect(options,this);
            } catch (MqttException e) {
                Log.e(TAG,"connect MqttException:"+e);
                e.printStackTrace();
            }
            PowerManager powerManager = (PowerManager)getSystemService(Context.POWER_SERVICE);
            mWakeLock = powerManager.newWakeLock(PowerManager.FULL_WAKE_LOCK, TAG);
            mWakeLock.setReferenceCounted(false);
            mWakeLock.acquire();
    
        }
        @Override
        protected void onDestroy(){
            super.onDestroy();
            if(mMqttAndroidClient != null){
                try {
                    mMqttAndroidClient.disconnect();
                } catch (MqttException e) {
                    e.printStackTrace();
                }
                mMqttAndroidClient.close();
            }
            mWakeLock.release();
        }
    
        @Override
        public void onClick(View v) {
            if(v.getId()==R.id.button){
    
            }
        }
    
        @Override
        public void onServiceConnected(ComponentName name, IBinder service) {
            Log.d(TAG,"onServiceConnected");
        }
    
        @Override
        public void onServiceDisconnected(ComponentName name) {
            Log.d(TAG,"onServiceDisconnected");
        }
    
        @Override
        public void connectionLost(Throwable throwable) {
            Log.d(TAG,"connectionLost");
        }
    
        @Override
        public void messageArrived(String s, MqttMessage mqttMessage) throws Exception {
            Log.d(TAG,"messageArrived:"+mqttMessage.toString());
        }
    
        @Override
        public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
            Log.d(TAG,"deliveryComplete");
        }
    
        @Override
        public void onSuccess(IMqttToken iMqttToken) {
            if(mConnectToken.equals(iMqttToken)){
                try {
                    Log.d(TAG,"connect success, do subscribe");
                    mMqttAndroidClient.subscribe("test",1);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    
        @Override
        public void onFailure(IMqttToken iMqttToken, Throwable throwable) {
            if(mConnectToken.equals(iMqttToken)){
               Log.d(TAG,"onFailure success:"+throwable.toString());
    
            }
        }
    
        @Override
        public void traceDebug(String s, String s1) {
            Log.d(s,s1);
        }
    
        @Override
        public void traceError(String s, String s1) {
            Log.e(s,s1);
        }
    
        @Override
        public void traceException(String s, String s1, Exception e) {
            Log.e(s,s1,e);
        }
    }
    


    展开全文
  • MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器的理想解决方案。  实现MQTT协议的中间件有很多,我用的是Apollo服务器,如何搭建MQTT服务器,请查阅其他资料。这里,主要介绍...
  • 酷炫MQTT实现消息推送

    万次阅读 热门讨论 2017-06-24 22:50:38
    首先在实现本功能之前我们需要储备一下预备知识,大家可以看我的前两篇文章以及官网,了解MQTT的基本常识: MQTT入门篇 MQTT服务器Mosquitto安装及使用 MQTT官网 在步入正题之前先给大家发放个福利,介绍一款MQTT...
  • 使用EMQ实现消息推送

    千次阅读 2019-05-06 10:44:52
    笔者希望能为一些选择了EMQ作为消息推送服务的同学启发,并将使用EMQ过程中笔者遇到的问题暴露出来,当然也希望其他使用EMQ的同学能够给笔者更好的建议。本文的食用人群为对EMQ做过调研或者有相关实践经验的同学,...
  • springboot如何集成mqtt消息推送

    万次阅读 热门讨论 2018-06-04 16:12:16
    1.需求分析 近期笔者项目需要用到mqtt实现消息推送,笔者选择emq作为mqtt服务器载体,上篇笔者讲解了如何在linux中安装mqtt服务,安装链接:https://blog.csdn.net/zhangxing52077/article/details/80567270,接下来...
  • SpringBoot2.0集成MQTT消息推送功能

    万次阅读 多人点赞 2018-06-06 14:13:39
    这几天在弄后端管理系统向指定的Android客户端推送消息的功能模块,查阅了网上很多博客介绍的许多方式,最终选择基于MQTT协议来实现,MQTT是一个轻量级的消息发布/订阅协议,它是实现基于手机客户端的消息推送服务器...
  • MQTT 协议是Android 系统中消息推送的实现技术之一, 由于其具有低功耗、节省流量和可扩展性强的优点, 目前已得到了众多应用. 同时, RocketMQ 作为一种分布式消息队列, 在服务器分布式部署上具有很大优势, 具有高...
  • Android MQTT消息推送实践

    万次阅读 2017-01-12 17:35:44
    大概的消息推送过程就是这样,手机端订阅(Subscribe)一个Topic,当服务器有新消息的时候就发布(public)到所有的订阅者哪里去.MQTT全称Message Queuing Telemetry Transport,消息队列遥测传输本文记录Android客户端的...
  • 基于MQTT消息推送[https://mosquitto.org] 目的 * 要解决的问题是:给Android手机做消息推送 * 不使用第三方的SDK,如极光推送,百度云推送,信鸽 这些都不能用 * 自已搭建消息推送后台 模块 * 代理端(中转...
  • Android 实现XMP协议,基于 mqtt 消息推送通知指南
  • mqtt消息推送

    2018-05-21 19:15:04
    这是一个基于spring boot的mqtt消息推送项目。里面有代码和相关资料,代码中有我相关的包,加载后可直接运行。资料中有mqtt服务环境搭建,可做参考或网上百度。这是我研究了一个星期搞懂的,分享给大家。
  • 第一次用android,要用mqtt实现消息推送,在网上下载了源码,但找不到详细的使用方法, 不知道如何使用,想知道具体如何实现,求助!
  • 最近在研究vue+webAPI进行前后端分离,在一些如前端定时循环请求后台接口判断状态等应用场景用使用mqtt进行主动的消息推送能够很大程度的减小服务端接口的压力,提高系统的效率,而且可以利用mqtt消息通知建立一个...
  • [Android实例] MQTT消息推送,即时通讯

    热门讨论 2016-01-04 16:37:30
    博客地址:http://blog.csdn.net/krubo1/article/details/50456847
  • mqtt消息推送-推送服务简单实现

    千次阅读 2017-07-05 16:33:17
    一个上传推送消息的API接口。连接mosquito并推送消息的服务。 简单的例子可以使用springboot快速开发,使用默认配置即可。 新建maven项目mqtt-server,pom.xml配置文件如下:...
  • MQTT是面向M2M和物联网的连接协议,采用轻量级发布和订阅消息传输机制。 大家可以直接上GitHub下载MQQT服务的源码,源码地址:https://github.com/mqtt/mqtt.github.io/wiki/libraries 主要内容: 官方文档翻译:...
1 2 3 4 5 ... 20
收藏数 6,302
精华内容 2,520
关键字:

mqtt消息推送