android消息推送,使用MQTT协议,谁有用java写过服务端

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了android消息推送,使用MQTT协议,谁有用java写过服务端相关的知识,希望对你有一定的参考价值。

客户端的代码也有还会加分

参考技术A socket编程是吧?
我有。。。
demo也有。。追问

要使用IBM的那个MQTT协议,不是单纯的socket...

追答

这样啊,那倒没试过。。。不好意思啊, 帮不到你。。

追问

呵呵 ,没事,也谢谢你了,到时候没人回答分就给你了

参考技术B package com.zgnet.billing.service;
import org.json.JSONObject;
import android.annotation.SuppressLint;
import android.app.AlarmManager;
import android.app.NotificationManager;
import android.app.PendingIntent;
import android.app.Service;
import android.content.BroadcastReceiver;
import android.content.Context;
import android.content.Intent;
import android.content.IntentFilter;
import android.content.SharedPreferences;
import android.media.MediaPlayer;
import android.net.ConnectivityManager;
import android.net.NetworkInfo;
import android.os.IBinder;
import android.os.StrictMode;
import android.util.Log;
import com.ibm.mqtt.IMqttClient;
import com.ibm.mqtt.MqttClient;
import com.ibm.mqtt.MqttException;
import com.ibm.mqtt.MqttPersistence;
import com.ibm.mqtt.MqttPersistenceException;
import com.ibm.mqtt.MqttSimpleCallback;
import com.zgnet.billing.utils.WebServiceConfig;
/*
* PushService that does all of the work.
* Most of the logic is borrowed from KeepAliveService.
* http://code.google.com/p/android-random/source/browse/trunk/TestKeepAlive/src/org/devtcg/demo/keepalive/KeepAliveService.java?r=219
*/
public class PushService extends Service
// this is the log tag
public static final String TAG = "ZgnetPushService";
// the IP address, where your MQTT broker is running.服务器ip
private static final String MQTT_HOST = "192.168.10.4";
// private static final String MQTT_HOST = "10.10.15.113";
// the port at which the broker is running. 服务器端口
private static int MQTT_BROKER_PORT_NUM = 1883;
// Let's not use the MQTT persistence.
private static MqttPersistence MQTT_PERSISTENCE = null;
// We don't need to remember any state between the connections, so we use a
// clean start.
private static boolean MQTT_CLEAN_START = true;
// Let's set the internal keep alive for MQTT to 15 mins. I haven't tested
// this value much. It could probably be increased.
private static short MQTT_KEEP_ALIVE = 60 * 15;
// Set quality of services to 0 (at most once delivery), since we don't want
// push notifications
// arrive more than once. However, this means that some messages might get
// lost (delivery is not guaranteed)
private static int[] MQTT_QUALITIES_OF_SERVICE = 0 ;
private static int MQTT_QUALITY_OF_SERVICE = 0;
// The broker should not retain any messages.
private static boolean MQTT_RETAINED_PUBLISH = false;
// MQTT client ID, which is given the broker. In this example, I also use
// this for the topic header.
// You can use this to run push notifications for multiple apps with one
// MQTT broker.
public static String MQTT_CLIENT_ID = "zgnet";
// These are the actions for the service (name are descriptive enough)
private static final String ACTION_START = MQTT_CLIENT_ID + ".START";
private static final String ACTION_STOP = MQTT_CLIENT_ID + ".STOP";
private static final String ACTION_KEEPALIVE = MQTT_CLIENT_ID
+ ".KEEP_ALIVE";
private static final String ACTION_RECONNECT = MQTT_CLIENT_ID
+ ".RECONNECT";
// Connection log for the push service. Good for debugging.
// private ConnectionLog mLog;
// Connectivity manager to determining, when the phone loses connection
private ConnectivityManager mConnMan;
// Notification manager to displaying arrived push notifications
@SuppressWarnings("unused")
private NotificationManager mNotifMan;
// Whether or not the service has been started.
private boolean mStarted;
// This the application level keep-alive interval, that is used by the
// AlarmManager
// to keep the connection active, even when the device goes to sleep.
private static final long KEEP_ALIVE_INTERVAL = 1000 * 60 * 28;
// Retry intervals, when the connection is lost.
private static final long INITIAL_RETRY_INTERVAL = 1000 * 10;
private static final long MAXIMUM_RETRY_INTERVAL = 1000 * 60 * 30;
// Preferences instance
private SharedPreferences mPrefs;
// We store in the preferences, whether or not the service has been started
public static final String PREF_STARTED = "isStarted";
// We also store the deviceID (target)
public static final String PREF_DEVICE_ID = "deviceID";
// We store the last retry interval
public static final String PREF_RETRY = "retryInterval";
// Notification title
public static String NOTIF_TITLE = "Tokudu";
// Notification id
@SuppressWarnings("unused")
private static final int NOTIF_CONNECTED = 0;
// This is the instance of an MQTT connection.
private MQTTConnection mConnection;
private long mStartTime;
JSONObject js = null;
@SuppressWarnings("unused")
private MediaPlayer mp;
// Static method to start the service
public static void actionStart(Context ctx)
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_START);
ctx.startService(i);

// Static method to stop the service
public static void actionStop(Context ctx)
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_STOP);
ctx.startService(i);

// Static method to send a keep alive message
public static void actionPing(Context ctx)
Intent i = new Intent(ctx, PushService.class);
i.setAction(ACTION_KEEPALIVE);
ctx.startService(i);

@Override
public void onCreate()
super.onCreate();
log("Creating service");
mStartTime = System.currentTimeMillis();
// try
// mLog = new ConnectionLog();
// Log.i(TAG, "Opened log at " + mLog.getPath());
// catch (IOException e)
// Log.e(TAG, "Failed to open log", e);
//
// Get instances of preferences, connectivity manager and notification
// manager
mPrefs = getSharedPreferences(TAG, MODE_PRIVATE);
mConnMan = (ConnectivityManager) getSystemService(CONNECTIVITY_SERVICE);
mNotifMan = (NotificationManager) getSystemService(NOTIFICATION_SERVICE);
/*
* If our process was reaped by the system for any reason we need to
* restore our state with merely a call to onCreate. We record the last
* "started" value and restore it here if necessary.
*/
handleCrashedService();

// This method does any necessary clean-up need in case the server has been
// destroyed by the system
// and then restarted
private void handleCrashedService()
if (wasStarted() == true)
log("Handling crashed service...");
// stop the keep alives
stopKeepAlives();
// Do a clean start
start();


@Override
public void onDestroy()
log("Service destroyed (started=" + mStarted + ")");
// Stop the services, if it has been started
if (mStarted == true)
stop();

// try
// if (mLog != null)
// mLog.close();
// catch (IOException e)

@SuppressWarnings("deprecation")
@Override
public void onStart(Intent intent, int startId)
super.onStart(intent, startId);
log("Service started with intent=" + intent);
// Do an appropriate action based on the intent.
if (intent.getAction().equals(ACTION_STOP) == true)
stop();
stopSelf();
else if (intent.getAction().equals(ACTION_START) == true)
start();
else if (intent.getAction().equals(ACTION_KEEPALIVE) == true)
keepAlive();
else if (intent.getAction().equals(ACTION_RECONNECT) == true)
if (isNetworkAvailable())
reconnectIfNecessary();



@Override
public IBinder onBind(Intent intent)
return null;

// log helper function
private void log(String message)
System.out.println("----------------------------" + message);

// Reads whether or not the service has been started from the preferences
private boolean wasStarted()
return mPrefs.getBoolean(PREF_STARTED, false);

// Sets whether or not the services has been started in the preferences.
private void setStarted(boolean started)
mPrefs.edit().putBoolean(PREF_STARTED, started).commit();
mStarted = started;

private synchronized void start()
log("Starting service...");
// Do nothing, if the service is already running.
if (mStarted == true)
Log.w(TAG, "Attempt to start connection that is already active");
return;

// Establish an MQTT connection
connect();
// Register a connectivity listener
registerReceiver(mConnectivityChanged, new IntentFilter(
ConnectivityManager.CONNECTIVITY_ACTION));

private synchronized void stop()
// Do nothing, if the service is not running.
if (mStarted == false)
Log.w(TAG, "Attempt to stop connection not active.");
return;

// Save stopped state in the preferences
setStarted(false);
// Remove the connectivity receiver
unregisterReceiver(mConnectivityChanged);
// Any existing reconnect timers should be removed, since we explicitly
// stopping the service.
cancelReconnect();
// Destroy the MQTT connection if there is one
if (mConnection != null)
mConnection.disconnect();
mConnection = null;


//
private synchronized void connect()
log("Connecting...");
// fetch the device ID from the preferences.
String deviceID = mPrefs.getString(PREF_DEVICE_ID, null);
// Create a new connection only if the device id is not NULL
if (deviceID == null)
log("Device ID not found.");
else
try
mConnection = new MQTTConnection(MQTT_HOST,
new String[] "billing"
+ WebServiceConfig.DEPARTMENTID );
catch (MqttException e)
// Schedule a reconnect, if we failed to connect
log("MqttException: "
+ (e.getMessage() != null ? e.getMessage() : "NULL"));
if (isNetworkAvailable())
scheduleReconnect(mStartTime);


setStarted(true);


private synchronized void keepAlive()
try
// Send a keep alive, if there is a connection.
if (mStarted == true && mConnection != null)
mConnection.sendKeepAlive();

catch (MqttException e)
mConnection.disconnect();
mConnection = null;
cancelReconnect();


// Schedule application level keep-alives using the AlarmManager
private void startKeepAlives()
Intent i = new Intent();
i.setClass(this, PushService.class);
i.setAction(ACTION_KEEPALIVE);
PendingIntent pi = PendingIntent.getService(this, 0, i, 0);
AlarmManager alarmMgr = (AlarmManager) getSystemService(ALARM_SERVICE);
alarmMgr.setRepeating(AlarmManager.RTC_WAKEUP,
System.currentTimeMillis() + KEEP_ALIVE_INTERVAL,
KEEP_ALIVE_INTERVAL, pi);

// Remove all scheduled keep alives
参考技术C 看起来很麻烦的样子。我用xmpp貌似没这么麻烦。有个openfire已经帮我们实现。

MQTT与Mosquitto服务器搭建以及Android推送MQTT简介

文章钢要:

对MQTT协议有一定认识

对MQTT运行原理有一定了解

 

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输)是IBM开发的一个即时通讯协议。
国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。
MQTT由于开放源代码,耗电量小等特点,将会在移动消息推送领域会有更多的贡献,在物联网领域,传感器与服务器的通信,信息的收集,MQTT都可以作为考虑的方案之一。在未来MQTT会进入到我们生活的各各方面。
 

二、MQTT特点

是轻量级基于代理的发布/订阅的消息传输协议,设计思想是开放、简单、轻量、易于实现。这些特点使它适用于受限环境。例如,但不仅限于此:

  • 网络代价昂贵,带宽低、不可靠。
  • 在嵌入设备中运行,处理器和内存资源有限。

该协议的特点有:

  • 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
  • 对负载内容屏蔽的消息传输。
  • 使用 TCP/IP 提供网络连接。

有三种消息发布服务质量:

  • "至多一次",消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
  • "至少一次",确保消息到达,但消息重复可能会发生。
  • "只有一次",确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
  • 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
  • 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

在未来几年,MQTT的应用会越来越广,值得关注。

通过MQTT协议,目前已经扩展出了数十个MQTT服务器端程序,可以通过PHP,JAVA,Python,C,C#等系统语言来向MQTT发送相关消息。

此外,国内很多企业都广泛使用MQTT作为Android手机客户端与服务器端推送消息的协议。其中Sohu,Cmstop手机客户端中均有使用到MQTT作为消息推送消息。

 

三、在Android推送方面与其他协议对比

1、使用XMPP协议(Openfire + Spark + Smack)
简介:基于XML协议的通讯协议,前身是Jabber,目前已由IETF国际标准化组织完成了标准化工作。
优点:协议成熟、强大、可扩展性强、目前主要应用于许多聊天系统中,且已有开源的Java版的开发实例androidpn。
缺点:协议较复杂、冗余(基于XML)、费流量、费电,部署硬件成本高。

2、使用MQTT协议(更多信息见:http://mqtt.org/
简介:轻量级的、基于代理的“发布/订阅”模式的消息传输协议。
优点:协议简洁、小巧、可扩展性强、省流量、省电,目前已经应用到企业领域(参考:http://mqtt.org/software)。
缺点:不够成熟、实现较复杂、服务端组件rsmb不开源,部署硬件成本较高。

3、使用HTTP轮循方式
简介:定时向HTTP服务端接口(Web Service API)获取最新消息。
优点:实现简单、可控性强,部署硬件成本低。
缺点:实时性差。

4、使用GCM服务(Google Cloud Messaging)
简介:Google推出的云消息服务,即第二代的C2DM。
优点:Google提供的服务、原生、简单,无需实现和部署服务端。
缺点:Android版本限制(必须大于2.2版本),该服务在国内不够稳定、需要用户绑定Google帐号,受限于Google。

 

四、详细解释MQTT协议

固定头部

固定头部,使用两个字节,共16位:

bit76543210
byte 1 Message Type DUP flag QoS level RETAIN
byte 2 Remaining Length

第一个字节(byte 1)

消息类型(4-7),使用4位二进制表示,可代表16种消息类型:

MnemonicEnumerationDescription
Reserved 0 Reserved
CONNECT 1 Client request to connect to Server
CONNACK 2 Connect Acknowledgment
PUBLISH 3 Publish message
PUBACK 4 Publish Acknowledgment
PUBREC 5 Publish Received (assured delivery part 1)
PUBREL 6 Publish Release (assured delivery part 2)
PUBCOMP 7 Publish Complete (assured delivery part 3)
SUBSCRIBE 8 Client Subscribe request
SUBACK 9 Subscribe Acknowledgment
UNSUBSCRIBE 10 Client Unsubscribe request
UNSUBACK 11 Unsubscribe Acknowledgment
PINGREQ 12 PING Request
PINGRESP 13 PING Response
DISCONNECT 14 Client is Disconnecting
Reserved 15 Reserved

除去0和15位置属于保留待用,共14种消息事件类型。

DUP flag(打开标志)

保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:

 当QoS > 0
 消息需要回复确认

此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。

QoS(Quality of Service,服务质量)

使用两个二进制表示PUBLISH类型消息:

QoS valuebit 2bit 1Description
0 0 0 至多一次 发完即丢弃 <=1
1 0 1 至少一次 需要确认回复 >=1
2 1 0 只有一次 需要确认回复 =1
3 1 1 待用,保留位置

RETAIN(保持)

仅针对PUBLISH消息。不同值,不同含义:

1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。

备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。

0:仅仅为当前订阅者推送此消息。

假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。

Remaining Length(剩余长度)

在当前消息中剩余的byte(字节)数,包含可变头部和负荷(内容)。

单个字节最大值:01111111,16进制:0x7F,10进制为127。

MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。

MQTT协议最多允许4个字节表示剩余长度。最大长度为:0xFF,0xFF,0xFF,0x7F,二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455 byte=261120KB=256MB=0.25GB 四个字节之间值的范围:

DigitsFromTo
1 0 (0x00) 127 (0x7F)
2 128 (0x80, 0x01) 16 383 (0xFF, 0x7F)
3 16 384 (0x80, 0x80, 0x01) 2 097 151 (0xFF, 0xFF, 0x7F)
4 2 097 152 (0x80, 0x80, 0x80, 0x01) 268 435 455 (0xFF, 0xFF, 0xFF, 0x7F)

其实换个方式理解:第1字节的基数是1,而第2字节的基数:128,以此类推,第三字节的基数是:128*128=2的14次方,第四字节是:128*128*128=2的21次方;

例如,需要表达321=2*128+65.(2字节):10100001 0000 0011.

(和我们理解的低位运算放置顺序不一样,第一个字节是低位,后续字节是高位,但字节内部本身是低位右边,高位左边)。

可变头部

固定头部仅定义了消息类型和一些标志位,一些消息的元数据,需要放入可变头部中。可变头部内容字节长度 + Playload/负荷字节长度 = 剩余长度。

可变头部,包含了协议名称,版本号,连接标志,用户授权,心跳时间等内容。

可变头部居于固定头部和payload中间。

可变剩余长度(remaing length)不是可变头部的一部分,当然该长度值也是从可变头部开始计算,包含可变头部的长度+payload的长度。

可变头部的字段如下:

协议名称: MQTT CONNECT message. UTF编码:如 MQIsdp, capitalized.

协议版本:8位无符号,当前使用:3 (0x03),如下:

bit76543210
  Protocol Version
  0 0 0 0 0 0 1 1

Connect flags

Clean session, Will, Will QoS, Retain flags 该字段的设置

一个字节表示,除了第1位是保留未使用,其它7位都具有不同含义。

业务上很重要,对消息总体流程影响很大,需要牢记。

 

Clean session flag

Position: bit 1 ,连接标志.

0:server需要存储client的订阅。包括存储Qos 1和2的订阅主题(当client重连时能将消息发送);当连接丢失的时候 服务器必须维护正在发送的消息的状态直到客户端重新连接到服务器。

1:server MUST忽略之前维护关于client的信息,并且将该connection当成clean的。server MUST 忽略任何client断开的状态。

 

原文翻译不能,所以参考了下一位大牛的表达:

0,表示如果订阅的客户机断线了,要保存为其要推送的消息(QoS为1和QoS为2),若其重新连接时,需将这些消息推送(若客户端长时间不连接,需要设置一个过期值)。 
1,断线服务器即清理相关信息,重新连接上来之后,会再次订阅。

Will Flag

定义了客户端(没有主动发送DISCONNECT消息)出现网络异常导致连接中断的情况下,服务器需要做的一些措施。

简而言之,就是客户端预先定义好,在自己异常断开的情况下,所留下的最后遗愿(Last Will),也称之为遗嘱(Testament)。 这个遗嘱就是一个由客户端预先定义好的主题和对应消息,附加在CONNECT的可变头部中,在客户端连接出现异常的情况下,由服务器主动发布此消息。

只有在Will Flag位为1时,Will Qos和Will Retain才会被读取,此时消息体Playload中要出现Will Topic和Will Message具体内容,否则,Will QoS和Will Retain值会被忽略掉。

Will Qos

两位表示,和PUBLISH消息固定头部的QoS level含义一样。

若标识了Will Flag值为1,那么Will QoS就会生效,否则会被忽略掉。

Will RETAIN

如果设置Will Flag,Will Retain标志就是有效的,否则它将被忽略。

当客户端意外断开服务器发布其Will Message之后,服务器是否应该继续保存。这个属性和PUBLISH固定头部的RETAIN标志含义一样,这里先掠过。

User name 和 password Flag:

用于授权,两者要么为0要么为1,否则都是无效。都为0,表示客户端可自由连接/订阅,都为1,表示连接/订阅需要授权。

 

bit76543210
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
  x x x x x x   x

 

Playload/消息体/负荷

消息体主要是为配合固定/可变头部命令(比如CONNECT可变头部User name标记若为1则需要在消息体中附加用户名称字符串)而存在。

CONNECT/SUBSCRIBE/SUBACK/PUBLISH等消息有消息体。PUBLISH的消息体以二进制形式对待。

MQTT协议只允许在PUBLISH类型消息体中使用自定义特性,在固定/可变头部想加入自定义私有特性是不允许的。

这也是为了协议免于流于形式,变得很分裂也为了兼顾现有客户端等。比如支持压缩等,那就可以在Playload中定义数据支持,在应用中进行读取处理。

这部分会在后面详细论述。

消息标识符/消息ID

固定头中的QoS level标志值为1或2时才会在:PUBLISH,PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE,SUBACK,UNSUBSCRIBE,UNSUBACK等消息的可变头中出现。

一个16位无符号位的short类型值(值不能为 0,0做保留作为无效的消息ID),仅仅要求在一个特定方向(服务器发往客户端为一个方向,客户端发送到服务器端为另一个方向)的通信消息中必须唯一。比如客户端发往服务器,有可能存在服务器发往客户端会同时存在重复,但不碍事。

可变头部中,需要两个字节的顺序是MSB(Most Significant Bit) LSB(Last/Least Significant Bit),翻译成中文就是,最高有效位,最低有效位。最高有效位在最低有效位左边/上面,表示这是一个大端字节/网络字节序,符合人的阅读习惯,高位在最左边。

bit76543210
  Message Identifier MSB
  Message Identifier LSB

最大长度可为: 65535

UTF-8编码

有关字符串,MQTT采用的是修改版的UTF-8编码,一般形式为如下:

bit76543210
byte 1 String Length MSB
byte 2 String Length LSB
bytes 3 ... Encoded Character Data

 

 最后的结构如下:

 

 Description76543210
Fixed header/固定头部
    Message Type(1) DUP flag QoS level RETAIN
byte 1
  0 0 0 1 x x x x
byte 2 Remaining Length
Variable header/可变头部
Protocol Name
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (6) 0 0 0 0 0 1 1 0
byte 3 ‘M‘ 0 1 0 0 1 1 0 1
byte 4 ‘Q‘ 0 1 0 1 0 0 0 1
byte 5 ‘I‘ 0 1 0 0 1 0 0 1
byte 6 ‘s‘ 0 1 1 1 0 0 1 1
byte 7 ‘d‘ 0 1 1 0 0 1 0 0
byte 8 ‘p‘ 0 1 1 1 0 0 0 0
Protocol Version Number
byte 9 Version (3) 0 0 0 0 0 0 1 1
Connect Flags
  User Name Flag Password Flag Will Retain Will QoS Will Flag Clean Session Reserved
byte 10
1 1 0 0 1 1 1 x
Keep Alive timer
byte 11 Keep Alive MSB (0) 0 0 0 0 0 0 0 0
byte 12 Keep Alive LSB (10) 0 0 0 0 1 0 1 0
Playload/消息体

Client Identifier(客户端ID)

1-23个字符长度,客户端到服务器的全局唯一标志,如果客户端ID超出23个字符长度,服务器需要返回码为2,标识符被拒绝响应的CONNACK消息。
处理QoS级别1和2的消息ID中,可以使用到。
必填项。

Will Topic

Will Flag值为1,这里便是Will Topic的内容。QoS级别通过Will QoS字段定义,RETAIN值通过Will RETAIN标识,都定义在可变头里面。

Will Message

Will Flag若设为1,这里便是Will Message定义消息的内容,对应的主题为Will Topic。如果客户端意外的断开触发服务器PUBLISH此消息。
长度有可能为0。
在CONNECT消息中的Will Message是UTF-8编码的,当被服务器发布时则作为二进制的消息体。

User Name

如果设置User Name标识,可以在此读取用户名称。一般可用于身份验证。协议建议用户名为不多于12个字符,不是必须。

Password

如果设置Password标识,便可读取用户密码。建议密码为12个字符或者更少,但不是必须。

 

心跳时间(Keep Alive timer)

以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。

Will Message编码

Will Message在CONNECT Payload/消息体中,使用UTF-8编码。假设内容为“abcd”,大概如下:

 Description76543210
byte 1 Length MSB (0) 0 0 0 0 0 0 0 0
byte 2 Length LSB (4) 0 0 0 0 0 1 0 0
byte 3 ‘a‘ (0x61) 0 1 1 0 0 0 0 1
byte 4 ‘b‘ (0x62) 0 1 1 0 0 0 1 0
byte 5 ‘c‘ (0x63) 0 1 1 0 0 0 1 1
byte 6 ‘d‘ (0x64) 0 1 1 0 0 1 0 0

有一点需要记住,PUBLISH的Payload/消息体中以二进制编码保存。

某刻客户端异常关闭触发服务器会PUBLISH此消息。那么服务器会直接把byte3-byte6之间字符取出,保存为二进制,附加到PUBLISH消息体中,大概存储如下:

 Description76543210
byte 1 ‘a‘ (0x61) 0 1 1 0 0 0 0 1
byte 2 ‘b‘ (0x62) 0 1 1 0 0 0 1 0
byte 3 ‘c‘ (0x63) 0 1 1 0 0 0 1 1
byte 4 ‘d‘ (0x64) 0 1 1 0 0 1 0 0

另外,MQTT 3.1协议对Will message的说明很容易引起误解,3.1.1草案已经得到修正。

相关说明:

http://mqtt.org/wiki/doku.php/willmessageutf8_support

https://tools.oasis-open.org/issues/browse/MQTT-2

连接异常中断通知机制

CONNECT消息一旦设置在可变头部设置了Will flag标记,那就启用了Last-Will-And-Testament特性,此特性很赞。

一旦客户端出现异常中断,便会触发服务器发布Will Message消息到Will Topic主题上去,通知Will Topic订阅者,对方因异常退出。

接收CONNECT后的响应动作

接收到CONNECT消息之后,服务器应该返回一个CONNACK消息作为响应:

    1. 若客户端绕过CONNECT消息直接发送其它类型消息,服务器应关闭此非法连接 若客户端发送CONNECT之后未收到CONNACT,需要关闭当前连接,然后重新连接
    2. 相同Client ID客户端已连接到服务器,先前客户端必须断开连接后,服务器才能完成新的客户端CONNECT连接 客户端发送无效非法CONNECT消息,服务器需要关闭

CONNACK

一个完整的CONNACK消息大致如下:

 Description76543210
Fixed header/固定头部
byte 1   Message type (2) DUP flag QoS flags RETAIN
    0 0 1 0 x x x x
byte 2   Remaining Length (2)
    0 0 0 0 0 0 1 0
Variable header/可变头部
Topic Name Compression Response
byte 1 Reserved values. Not used. x x x x x x x x
Connect Return Code
byte 2 Return Code                

可变头部第一个字节为保留,无甚用处。第二个字节为连接握手返回码:

返回值 16进制 含义
0 0x00 Connection Accepted
1 0x01 Connection Refused: unacceptable protocol version
2 0x02 Connection Refused: identifier rejected
3 0x03 Connection Refused: server unavailable
4 0x04 Connection Refused: bad user name or password
5 0x05 Connection Refused: not authorized
6-255   Reserved for future use

只有0-5目前被使用到,其他值有待日后使用。一般返回值为0x00,表示连接建立。非法的请求,需要返回相应的数值。

从上面看出,一个CONNACT,四个字节表示。一个正常的CONNACT消息实际内容可能如下: 0x20 0x02 0x00 0x00

若是在私有协议中,两个字节就足够了。

很多时候,客户端和服务器端在没有消息传递时,会一直保持着连接。虽然不能依靠TCP心跳机制(比如SO_KEEPALIVE选项),业务层面定义心跳机制,会让连接状态检测、控制更为直观。

 

PINGREQ

由客户端发送到服务器端,证明自己还在一直连接着呢。两个字节,固定值。

 Description76543210
Fixed header/固定头部
byte 1   Message type (12) DUP flag QoS flags RETAIN
    1 1 0 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。

心跳频率在CONNECT可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

PINGRESP

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

 Description76543210
Fixed header/固定头部
byte 1   Message type (13) DUP flag QoS flags RETAIN
    1 1 0 1 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器一般若在1.5倍的心跳周期内接收不到客户端发送的PINGREQ,可考虑关闭客户端的连接描述符。此时的关闭连接的行为和接收到客户端发送DISCONNECT消息的处理行为一致,但对客户端的订阅不会产生影响(不会清除客户端订阅数据),这个需要牢记。

若客户端发送PINGREQ之后的一个心跳周期内接收不到PINGRESP消息,可考虑关闭TCP/IP套接字连接。

DISCONNECT

客户端主动发送到服务器端,表明即将关闭TCP/IP连接。此时要求服务器要完整、干净的进行断开处理,不能仅仅类似于关闭连接描述符类似草草处理之。 需要两个字节,值固定:

 Description76543210
Fixed header/固定头部
byte 1   Message type (14) DUP flag QoS flags RETAIN
    1 1 1 0 x x x x
byte 2   Remaining Length (0)
    0 0 0 0 0 0 0 0

服务器要根据先前此客户端在发送CONNECT消息可变头部Connect flag中的“Clean session flag”所设置值,再次复习一下:

1、值为0,服务器必须在客户端断开之后继续存储/保持客户端的订阅状态。这些状态包括:

  • 存储订阅的消息QoS1和QoS2消息
  • 正在发送消息期间连接丢失导致发送失败的消息
  • 以便当客户端重新连接时以上消息可以被重新传递。

2、值为1,服务器需要立刻清理连接状态数据。

有一点需要牢记,服务器在接收到客户端发送的DISCONNECT消息之后,需要主动关闭TCP/IP连接。

 

参考大神博客:http://www.blogjava.net/yongboy/archive/2014/02/09/409630.html
























以上是关于android消息推送,使用MQTT协议,谁有用java写过服务端的主要内容,如果未能解决你的问题,请参考以下文章

MQTT与Mosquitto服务器搭建以及Android推送MQTT简介

android消息推送GCM、XMPP、MQTT三种方案的优劣,越仔细越好,有具体分析更好!

移动互联网消息推送原理:长连接+心跳机制(MQTT协议)

Android 项目必备APP 消息推送

Android 项目必备(三十八)-->APP 消息推送

MQTT协议及推送服务