文章转自:https://www.whqft.com/tech/ba0fbbea4fbe52ab5c48a1a3680b38c0.html
目录
目录
前言
连接流程
连接配置
源码分析
总结
前言
在阅读本文之前,需要大家去了解一下Mqtt报文格式,还没有看的同学可以参考下此文:https://blog.csdn.net/l_215851356/article/details/76070304?locationNum=5&fps=1, 后面的内容假设大家已经了解了mqtt报文的格式,也可以下载文件 https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf 查看。
文章MQTT Demo尝试中已经说过,mqtt中所有的报文都是协议正常运行需要的报文,没有多余的,将mqtt报文总结一下,其实可以分为以下三类:
- 和连接相关的报文
- 和订阅相关
- 和消息相关
和连接相关的报文包括CONNECT, CONNACK, PINGREQ, PINGRESP, DISCONNECT,剩余两类报文会在后面进行说明。在和连接相关的报文又可以分为三类:
- 建立连接
- 保持连接
- 断开连接
其中和建立连接相关的报文包括CONNECT和CONNACK。本文将分析建立连接的流程,故本文主要涉及这两个报文。保持连接和断开连接会分别在两篇文章中进行分析。
连接流程
mqtt协议是建立在有序的、可靠的、双向字节流的通信链路上。网络连接建立完成之后,CONNECT数据包必须是Client向Broker发送的第一个数据包,而且在一个网络连接上,Client只能发送一次CONNECT报文,Broker收到第二个CONNECT报文时会将此视为协议违规而断开连接。
在上图中,Client和Broker之间的网络连接建立之后,Client首先向Broker发送CONNECT报文,Broker收到CONNECT报文后,对CONNECT报文进行解析,然后向Client发送CONNACK报文,告诉Client端协议是否可以建立,如果不能建立,将错误信息告知Client。
连接配置
mqtt协议的建立流程比较简单,真正需要我们关注的是连接的配置。以下是CONNECT数据包可变报头的结构:
在构建连接的过程中,我们需要关注的是连接标志ConnectFlags和KeepAlive部分。连接标志ConnectFlags部分定了7个标志,reserved为保留标志,broker会验证此标志是否为0,如果不为0则会断开和客户端的连接。剩余的6个标志解释如下:
- Clean Session
此字段指定了会话状态的处理方式,客户端和broker可以保存会话状态,以支持跨网络连接的可靠消息传输。此标志如果为0,broker必须基于当前会话(使用clientId识别)恢复与客户端的通信,如果为1,则必须新建一个会话。 - Will Flag
如果mqtt协议连接正常建立,遗嘱消息必须被存储在broker,之后客户端与broker之间的网络异常断开时,服务端必须发布遗嘱消息。 - Will Qos
遗嘱消息的qos - Will Retain
遗嘱消息在被发布时是否需要被保留 - Password Flag
如果此标志为1,则有效载荷中必须包含密码 - User Name Flag
如果此标志为1,则有效载荷中必须包含用户名
KeepAlive指在客户端传输完成一个控制报文的时刻到发送下一个报文的时刻,两者之间允许空闲的最大时间间隔,以秒为单位,可以将此字段理解为心跳包。
以上只是简单说明了字段的功能,字段的具体含义可以参考https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf文档的3.1节。
源码分析
实操部分,我们跟着eclipse paho mqtt来分析一下mqtt协议的建立过程。demo部分的源码可以从https://github.com/rockstore/mqtt_learn获取。为了便于后面的源码分析,先把paho连接流程时序图贴出来。
源码的分析就从MainActivity
的connect
函数开始。connect函数首先设置了连接选项,然后新建MqttAndroidClient
对象mClient
, 最后执行mClient.connect
函数发起连接请求。
(1)
MqttAndroidClient的connect(connOpt)
函数最终会调用MqttAndroidClient的connect(connOpt, userContext, callback)
函数,这个函数会启动paho mqtt唯一一个组件MqttService
,这个组件不支持跨进程调用,如果需要将MqttService
放在其他进程,需要将和mqtt相关的调用全部放在同一个进程内,这部分的实现我会在https://github.com/rockstore/mqtt_learn新开一个分支进行说明。
由于需要使用MqttService
组件中的函数,需要在启动MqttService
后对MqttService
进行绑定。如果服务已经启动,则直接执行建立连接操作。这时候建立的连接仅仅是网络连接,不是mqtt协议连接。
MqttService
组件启动绑定后,开始执行doConnect
函数了。
private void doConnect() {if (clientHandle == null) {clientHandle = mqttService.getClient(serverURI, clientId,myContext.getApplicationInfo().packageName,persistence);}String activityToken = storeToken(connectToken);try {mqttService.connect(clientHandle, connectOptions, null,activityToken);}catch (MqttException e) {IMqttActionListener listener = connectToken.getActionCallback();if (listener != null) {listener.onFailure(connectToken, e);}}
}
突然出现了activityToken, connectToken, clientHandle,不要慌,一个一个分析。执行connect(connOpt, userContext, callback)
函数时,会生成connectToken,具体生成如下:
IMqttToken token = new MqttTokenAndroid(this, userContext,callback);connectToken = token;
token机制在paho mqtt实现中扮演着十分重要的角色,负责消息各种回调的实现,后面会单独分析paho mqtt的token机制。
看下mqttService.getClient
函数,
public String getClient(String serverURI, String clientId, String contextId, MqttClientPersistence persistence) {String clientHandle = serverURI + ":" + clientId+":"+contextId;if (!connections.containsKey(clientHandle)) {MqttConnection client = new MqttConnection(this, serverURI,clientId, persistence, clientHandle);connections.put(clientHandle, client);}return clientHandle;
}
clientHandle
是serverURI + ":" + clientId+":"+contextId
组合形成的字符串,contextId
是应用包名。MqttService
内部使用connections
记录每一个连接实例。说完了clientHandle
,接下来就说下activityToke
,看下storeToke
函数:
private synchronized String storeToken(IMqttToken token) {tokenMap.put(tokenNumber, token);return Integer.toString(tokenNumber++);
}
MqttAndroidToken
内部使用tokenMap
记录每每次调用生成的token, 将tokenNumber
返回。
###(2)
执行完doConnect
函数后,函数调用到了MqttService
组件中
public void connect(String clientHandle, MqttConnectOptions connectOptions,String invocationContext, String activityToken)throws MqttSecurityException, MqttException {MqttConnection client = getConnection(clientHandle);client.connect(connectOptions, null, activityToken);
}
是不是很熟悉,在上一步,将生成的MqttConnection
实例保存起来,这一步通过getConnection
重新获取。接下来看下client.connect
public void connect(MqttConnectOptions options, String invocationContext,String activityToken) {connectOptions = options;reconnectActivityToken = activityToken;if (options != null) {cleanSession = options.isCleanSession();}if (connectOptions.isCleanSession()) { // if it's a clean session,// discard old dataservice.messageStore.clearArrivedMessages(clientHandle);}service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");......try {......IMqttActionListener listener = new MqttConnectionListener(resultBundle) {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {......}@Overridepublic void onFailure(IMqttToken asyncActionToken,Throwable exception) {......}};if (myClient != null) {...... service.traceDebug(TAG, "myClient != null and the client is not connected");service.traceDebug(TAG,"Do Real connect!");setConnectingState(true);myClient.connect(connectOptions, invocationContext, listener);......}// if myClient is null, then create a new connectionelse {alarmPingSender = new AlarmPingSender(service);myClient = new MqttAsyncClient(serverURI, clientId,persistence, alarmPingSender);myClient.setCallback(this);service.traceDebug(TAG,"Do Real connect!");setConnectingState(true);myClient.connect(connectOptions, invocationContext, listener);}} catch (Exception e) {service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage());setConnectingState(false);handleException(resultBundle, e);}
}
函数体比较长,我去掉了回调和持久化部分的代码,回调部分的代码会在分析token机制的时候进行说明。这段代码的主要作用就是新建了MqttAsyncClient
对象,然后注册了回调函数,然后去执行connect
函数,同时将状态置为正在连接状态。接下来就分析下MqttAsyncClient.connect
函数
###(3)
public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {final String methodName = "connect";// 进行状态判断if (comms.isConnected()) {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);}if (comms.isConnecting()) {throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);}if (comms.isDisconnecting()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);}if (comms.isClosed()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);}if (options == null) {options = new MqttConnectOptions();}this.connOpts = options;this.userContext = userContext;final boolean automaticReconnect = options.isAutomaticReconnect();// 设置网络连接comms.setNetworkModules(createNetworkModules(serverURI, options));// 设置重连回调comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));// Insert our own callback to iterate through the URIs till the connect// succeedsMqttToken userToken = new MqttToken(getClientId());ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken, userContext, callback, reconnecting);userToken.setActionCallback(connectActionListener);userToken.setUserContext(this);// If we are using the MqttCallbackExtended, set it on the// connectActionListenerif (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);}comms.setNetworkModuleIndex(0);connectActionListener.connect();return userToken;
}
MqttAsyncClient.connect
函数的主要作用是设置了网络连接模块,设置重连回调,最后执行connectActionListener.connect
函数。先分析一下网络连接的设置createNetworkModules
函数:
protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)throws MqttException, MqttSecurityException {NetworkModule[] networkModules = null;String[] serverURIs = options.getServerURIs();String[] array = null;if (serverURIs == null) {array = new String[] { address };} else if (serverURIs.length == 0) {array = new String[] { address };} else {array = serverURIs;}networkModules = new NetworkModule[array.length];for (int i = 0; i < array.length; i++) {networkModules[i] = createNetworkModule(array[i], options);}return networkModules;
}
options
实例在建立连接的过程中,我们也仅仅是设置了和连接相关的一些状态,并没有设置serverURI
,故options.getServerURIS
返回为null
。NetworkModule
为paho定义的接口,规定了网络模块需要实现的方法。目前paho定义的网络连接模块有TCPNetworkModule
,SSLNetworkModule
,WebsocketNetworkModule
,WebSocketSecureNetworkModule
,可以看下createNetworkModule
根据uri使用的协议类型创建对应的NetworkModule
,大家可以看下createNetwokModule
函数的实现。
创建完所有的NetworkModule
后,执行comms.setNetworknModule(0)
,先使用第一个NetworkModule
进行连接。comms
是ClientComms
类型的实例,在paho的实现中占有非常重要的地位,后序部分会进行分析。
###(4)
接下来就要分析ConnectActionListener.connect()
函数了
public void connect() throws MqttPersistenceException {MqttToken token = new MqttToken(client.getClientId());token.setActionCallback(this);token.setUserContext(this);// 打开持久化存储persistence.open(client.getClientId(), client.getServerURI());if (options.isCleanSession()) {persistence.clear();}// 设置版本if (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_DEFAULT) {options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);}try {// 开始连接comms.connect(options, token);}catch (MqttException e) {onFailure(token, e);}
}
这段代码已经添加注释,执行完这个函数后,就要进入核心函数ClientComms.connect
了。
public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {final String methodName = "connect";synchronized (conLock) {if (isDisconnected() && !closePending) {//@TRACE 214=state=CONNECTINGlog.fine(CLASS_NAME,methodName,"214");// 设置连接状态conState = CONNECTING;conOptions = options;// 构建CONNECT数据包MqttConnect connect = new MqttConnect(client.getClientId(),conOptions.getMqttVersion(),conOptions.isCleanSession(),conOptions.getKeepAliveInterval(),conOptions.getUserName(),conOptions.getPassword(),conOptions.getWillMessage(),conOptions.getWillDestination());// 设置clientState属性this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());this.clientState.setCleanSession(conOptions.isCleanSession());this.clientState.setMaxInflight(conOptions.getMaxInflight());tokenStore.open();ConnectBG conbg = new ConnectBG(this, token, connect, executorService);conbg.start();}else {.....}}
}
函数的逻辑很清晰,首先设置了conState,然后构建了CONNECT数据包。clientState
对象负责在receiver
和sender
之间处理消息,receiver
和sender
会在ConnectBG
中说明。
(5)
终于到核心中的核心了,ConnectBG
是一个Runnable类型,看下run
函数的实现。
public void run() {......try {......// Save the connect token in tokenStore as failure can occur before sendtokenStore.saveToken(conToken,conPacket);// 启动网络模块,发起网络连接NetworkModule networkModule = networkModules[networkModuleIndex];networkModule.start();// 连接完成后,启动receiver,负责从broker接收消息receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);// 连接完成后,启动sender,负责向broker发送消息sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());sender.start("MQTT Snd: "+getClient().getClientId(), executorService);callback.start("MQTT Call: "+getClient().getClientId(), executorService);// 向broker发送CONNECT数据包internalSend(conPacket, conToken);} ......}
}
代码已经注释的很清楚了,在run
函数内部,首先建立客户端到broker之间的网络连接,然后分别启动receiver和sender,接下来就是mqtt协议建立的过程。执行internalSend(conPacket, conToken)
,向broker发送CONNECT
数据包。
void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {......try {// Persist if needed and send the messagethis.clientState.send(message, token);} catch(MqttException e) {......}
}
前面说过,clientState负责在receiver和sender之间进行消息处理,可以将sender看做是clientState的消费者, receiver负责接收来自broker的消息。看下ClientState.send
函数
public void send(MqttWireMessage message, MqttToken token) throws MqttException {final String methodName = "send";......if (token != null ) {try {token.internalTok.setMessageID(message.getMessageId());} catch (Exception e) {}}if (message instanceof MqttPublish) {......} else {//@TRACE 615=pending send key={0} message {1}log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});if (message instanceof MqttConnect) {synchronized (queueLock) {// Add the connect action at the head of the pending queue ensuring it jumps// ahead of any of other pending actions.tokenStore.saveToken(token, message);pendingFlows.insertElementAt(message,0);queueLock.notifyAll();}} else {......}}
}
send函数将消息体添加到pendingFlows中,等待sender的调度并发送。sender时Runnable实例,看下sender是如何调度发送的,以下是sender的run
函数:
try {while (running && (out != null)) {try {message = clientState.get();if (message != null) {if (message instanceof MqttAck) {out.write(message);out.flush();} else {MqttToken token = tokenStore.getToken(message);......if (token != null) {synchronized (token) {out.write(message);try {out.flush();} catch (IOException ex) {......}clientState.notifySent(message);}}}} else { // null message......}
sender不断循环从clientState获取待发送的消息,clientState.get
函数大家可以自行分析。
前面说过,客户端向broker发送了CONNECT数据包后,broker会向客户端发送CONACK数据包,告诉客户端mqtt协议连接是否可以建立。receiver负责从broker接收消息,receiver是Runnable类型,看下receiver的run
函数实现:
public void run() {......while (running && (in != null)) {try {......receiving = in.available() > 0;MqttWireMessage message = in.readMqttWireMessage();receiving = false;// instanceof checks if message is nullif (message instanceof MqttAck) {token = tokenStore.getToken(message);if (token!=null) {synchronized (token) {......clientState.notifyReceivedAck((MqttAck)message);}} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {......}}}.....
}
receiver收到消息后,由于CONACK
数据包是MqttAck
类型,且token不为null,故会执行clientState.notifyReceivedAck
函数。
protected void notifyReceivedAck(MqttAck ack) throws MqttException {MqttToken token = tokenStore.getToken(ack);MqttException mex = null;if (token == null) {// @TRACE 662=no message found for ack id={0}log.fine(CLASS_NAME, methodName, "662", new Object[] {new Integer(ack.getMessageId())});} else if (ack instanceof MqttPubRec) {......} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {...... } else if (ack instanceof MqttPingResp) {......} else if (ack instanceof MqttConnack) {int rc = ((MqttConnack) ack).getReturnCode();if (rc == 0) {synchronized (queueLock) {if (cleanSession) {clearState();// Add the connect token back in so that users can be // notified when connect completes.tokenStore.saveToken(token,ack);}inFlightPubRels = 0;actualInFlight = 0;restoreInflightMessages();connected();}} else {mex = ExceptionHelper.createMqttException(rc);throw mex;}clientComms.connectComplete((MqttConnack) ack, mex);notifyResult(ack, token, mex);tokenStore.removeToken(ack);// Notify the sender thread that there maybe work for it to do nowsynchronized (queueLock) {queueLock.notifyAll();}} else {......}checkQuiesceLock();
}
ok,终于看到mqtt协议连接建立了,根据CONACK数据包中的返回码判断协议连接是否已经建立,0表示服务端接受连接,协议正常建立。
总结
mqtt协议的建立过程其实比较简单,需要我们理解的是CONNECT报文的选项设置。尤其是cleanSession和keepAliveInterval的设置,这两项设置需要根据具体的环境进行配置,cleanSession如果设置为0,则broker和客户端需要保存会话状态,且需要保持同步;对于一些非手持设备而言,网络断线的概率较小,此时可以将keepAliveInterval的时间设置长一些,比如60或者90秒。
文中一些对mqtt协议相关的解释限于篇幅,未做详细解释,大家可以参考https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf文档详细查看。
源码的分析可能有疏漏,欢迎大家指正!!