当前位置: 代码迷 >> 综合 >> MQTT 连接建立流程及 paho mqtt 源码分析
  详细解决方案

MQTT 连接建立流程及 paho mqtt 源码分析

热度:91   发布时间:2023-12-15 23:23:11.0

文章转自:https://www.whqft.com/tech/ba0fbbea4fbe52ab5c48a1a3680b38c0.html

目录

目录
前言
连接流程
连接配置
源码分析
总结

前言

在阅读本文之前,需要大家去了解一下Mqtt报文格式,还没有看的同学可以参考下此文:https://blog.csdn.net/l_215851356/article/details/76070304?locationNum=5&fps=1, 后面的内容假设大家已经了解了mqtt报文的格式,也可以下载文件 https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf 查看。

文章MQTT Demo尝试中已经说过,mqtt中所有的报文都是协议正常运行需要的报文,没有多余的,将mqtt报文总结一下,其实可以分为以下三类:

  • 和连接相关的报文
  • 和订阅相关
  • 和消息相关

和连接相关的报文包括CONNECT, CONNACK, PINGREQ, PINGRESP, DISCONNECT,剩余两类报文会在后面进行说明。在和连接相关的报文又可以分为三类:

  • 建立连接
  • 保持连接
  • 断开连接

其中和建立连接相关的报文包括CONNECT和CONNACK。本文将分析建立连接的流程,故本文主要涉及这两个报文。保持连接和断开连接会分别在两篇文章中进行分析。

连接流程


mqtt协议是建立在有序的、可靠的、双向字节流的通信链路上。网络连接建立完成之后,CONNECT数据包必须是Client向Broker发送的第一个数据包,而且在一个网络连接上,Client只能发送一次CONNECT报文,Broker收到第二个CONNECT报文时会将此视为协议违规而断开连接。

在上图中,Client和Broker之间的网络连接建立之后,Client首先向Broker发送CONNECT报文,Broker收到CONNECT报文后,对CONNECT报文进行解析,然后向Client发送CONNACK报文,告诉Client端协议是否可以建立,如果不能建立,将错误信息告知Client。

连接配置

mqtt协议的建立流程比较简单,真正需要我们关注的是连接的配置。以下是CONNECT数据包可变报头的结构:

在构建连接的过程中,我们需要关注的是连接标志ConnectFlags和KeepAlive部分。连接标志ConnectFlags部分定了7个标志,reserved为保留标志,broker会验证此标志是否为0,如果不为0则会断开和客户端的连接。剩余的6个标志解释如下:

  • Clean Session
    此字段指定了会话状态的处理方式,客户端和broker可以保存会话状态,以支持跨网络连接的可靠消息传输。此标志如果为0,broker必须基于当前会话(使用clientId识别)恢复与客户端的通信,如果为1,则必须新建一个会话。
  • Will Flag
    如果mqtt协议连接正常建立,遗嘱消息必须被存储在broker,之后客户端与broker之间的网络异常断开时,服务端必须发布遗嘱消息。
  • Will Qos
    遗嘱消息的qos
  • Will Retain
    遗嘱消息在被发布时是否需要被保留
  • Password Flag
    如果此标志为1,则有效载荷中必须包含密码
  • User Name Flag
    如果此标志为1,则有效载荷中必须包含用户名

KeepAlive指在客户端传输完成一个控制报文的时刻到发送下一个报文的时刻,两者之间允许空闲的最大时间间隔,以秒为单位,可以将此字段理解为心跳包。
以上只是简单说明了字段的功能,字段的具体含义可以参考https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf文档的3.1节。

源码分析

实操部分,我们跟着eclipse paho mqtt来分析一下mqtt协议的建立过程。demo部分的源码可以从https://github.com/rockstore/mqtt_learn获取。为了便于后面的源码分析,先把paho连接流程时序图贴出来。

源码的分析就从MainActivityconnect函数开始。connect函数首先设置了连接选项,然后新建MqttAndroidClient对象mClient, 最后执行mClient.connect函数发起连接请求。

(1)

MqttAndroidClient的connect(connOpt)函数最终会调用MqttAndroidClient的connect(connOpt, userContext, callback)函数,这个函数会启动paho mqtt唯一一个组件MqttService这个组件不支持跨进程调用,如果需要将MqttService放在其他进程,需要将和mqtt相关的调用全部放在同一个进程内,这部分的实现我会在https://github.com/rockstore/mqtt_learn新开一个分支进行说明。

由于需要使用MqttService组件中的函数,需要在启动MqttService后对MqttService进行绑定。如果服务已经启动,则直接执行建立连接操作。这时候建立的连接仅仅是网络连接,不是mqtt协议连接。

MqttService组件启动绑定后,开始执行doConnect函数了。

private void doConnect() {if (clientHandle == null) {clientHandle = mqttService.getClient(serverURI, clientId,myContext.getApplicationInfo().packageName,persistence);}String activityToken = storeToken(connectToken);try {mqttService.connect(clientHandle, connectOptions, null,activityToken);}catch (MqttException e) {IMqttActionListener listener = connectToken.getActionCallback();if (listener != null) {listener.onFailure(connectToken, e);}}
}

突然出现了activityToken, connectToken, clientHandle,不要慌,一个一个分析。执行connect(connOpt, userContext, callback)函数时,会生成connectToken,具体生成如下:

IMqttToken token = new MqttTokenAndroid(this, userContext,callback);connectToken = token;

token机制在paho mqtt实现中扮演着十分重要的角色,负责消息各种回调的实现,后面会单独分析paho mqtt的token机制。
看下mqttService.getClient函数,

public String getClient(String serverURI, String clientId, String contextId, MqttClientPersistence persistence) {String clientHandle = serverURI + ":" + clientId+":"+contextId;if (!connections.containsKey(clientHandle)) {MqttConnection client = new MqttConnection(this, serverURI,clientId, persistence, clientHandle);connections.put(clientHandle, client);}return clientHandle;

}

clientHandleserverURI + ":" + clientId+":"+contextId组合形成的字符串,contextId是应用包名。MqttService内部使用connections记录每一个连接实例。说完了clientHandle,接下来就说下activityToke,看下storeToke函数:

private synchronized String storeToken(IMqttToken token) {tokenMap.put(tokenNumber, token);return Integer.toString(tokenNumber++);
}

MqttAndroidToken内部使用tokenMap记录每每次调用生成的token, 将tokenNumber返回。

###(2)
执行完doConnect函数后,函数调用到了MqttService组件中

public void connect(String clientHandle, MqttConnectOptions connectOptions,String invocationContext, String activityToken)throws MqttSecurityException, MqttException {MqttConnection client = getConnection(clientHandle);client.connect(connectOptions, null, activityToken);
}

是不是很熟悉,在上一步,将生成的MqttConnection实例保存起来,这一步通过getConnection重新获取。接下来看下client.connect

public void connect(MqttConnectOptions options, String invocationContext,String activityToken) {connectOptions = options;reconnectActivityToken = activityToken;if (options != null) {cleanSession = options.isCleanSession();}if (connectOptions.isCleanSession()) { // if it's a clean session,// discard old dataservice.messageStore.clearArrivedMessages(clientHandle);}service.traceDebug(TAG, "Connecting {" + serverURI + "} as {" + clientId + "}");......try {......IMqttActionListener listener = new MqttConnectionListener(resultBundle) {@Overridepublic void onSuccess(IMqttToken asyncActionToken) {......}@Overridepublic void onFailure(IMqttToken asyncActionToken,Throwable exception) {......}};if (myClient != null) {......				service.traceDebug(TAG, "myClient != null and the client is not connected");service.traceDebug(TAG,"Do Real connect!");setConnectingState(true);myClient.connect(connectOptions, invocationContext, listener);......}// if myClient is null, then create a new connectionelse {alarmPingSender = new AlarmPingSender(service);myClient = new MqttAsyncClient(serverURI, clientId,persistence, alarmPingSender);myClient.setCallback(this);service.traceDebug(TAG,"Do Real connect!");setConnectingState(true);myClient.connect(connectOptions, invocationContext, listener);}} catch (Exception e) {service.traceError(TAG, "Exception occurred attempting to connect: " + e.getMessage());setConnectingState(false);handleException(resultBundle, e);}
}

函数体比较长,我去掉了回调和持久化部分的代码,回调部分的代码会在分析token机制的时候进行说明。这段代码的主要作用就是新建了MqttAsyncClient对象,然后注册了回调函数,然后去执行connect函数,同时将状态置为正在连接状态。接下来就分析下MqttAsyncClient.connect函数
###(3)

public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {final String methodName = "connect";// 进行状态判断if (comms.isConnected()) {throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);}if (comms.isConnecting()) {throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);}if (comms.isDisconnecting()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);}if (comms.isClosed()) {throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);}if (options == null) {options = new MqttConnectOptions();}this.connOpts = options;this.userContext = userContext;final boolean automaticReconnect = options.isAutomaticReconnect();// 设置网络连接comms.setNetworkModules(createNetworkModules(serverURI, options));// 设置重连回调comms.setReconnectCallback(new MqttReconnectCallback(automaticReconnect));// Insert our own callback to iterate through the URIs till the connect// succeedsMqttToken userToken = new MqttToken(getClientId());ConnectActionListener connectActionListener = new ConnectActionListener(this, persistence, comms, options,userToken, userContext, callback, reconnecting);userToken.setActionCallback(connectActionListener);userToken.setUserContext(this);// If we are using the MqttCallbackExtended, set it on the// connectActionListenerif (this.mqttCallback instanceof MqttCallbackExtended) {connectActionListener.setMqttCallbackExtended((MqttCallbackExtended) this.mqttCallback);}comms.setNetworkModuleIndex(0);connectActionListener.connect();return userToken;
}

MqttAsyncClient.connect函数的主要作用是设置了网络连接模块,设置重连回调,最后执行connectActionListener.connect函数。先分析一下网络连接的设置createNetworkModules函数:

protected NetworkModule[] createNetworkModules(String address, MqttConnectOptions options)throws MqttException, MqttSecurityException {NetworkModule[] networkModules = null;String[] serverURIs = options.getServerURIs();String[] array = null;if (serverURIs == null) {array = new String[] { address };} else if (serverURIs.length == 0) {array = new String[] { address };} else {array = serverURIs;}networkModules = new NetworkModule[array.length];for (int i = 0; i < array.length; i++) {networkModules[i] = createNetworkModule(array[i], options);}return networkModules;
}

options实例在建立连接的过程中,我们也仅仅是设置了和连接相关的一些状态,并没有设置serverURI,故options.getServerURIS返回为nullNetworkModule为paho定义的接口,规定了网络模块需要实现的方法。目前paho定义的网络连接模块有TCPNetworkModuleSSLNetworkModuleWebsocketNetworkModuleWebSocketSecureNetworkModule,可以看下createNetworkModule根据uri使用的协议类型创建对应的NetworkModule,大家可以看下createNetwokModule函数的实现。
创建完所有的NetworkModule后,执行comms.setNetworknModule(0),先使用第一个NetworkModule进行连接。commsClientComms类型的实例,在paho的实现中占有非常重要的地位,后序部分会进行分析。
###(4)
接下来就要分析ConnectActionListener.connect()函数了

public void connect() throws MqttPersistenceException {MqttToken token = new MqttToken(client.getClientId());token.setActionCallback(this);token.setUserContext(this);// 打开持久化存储persistence.open(client.getClientId(), client.getServerURI());if (options.isCleanSession()) {persistence.clear();}// 设置版本if (options.getMqttVersion() == MqttConnectOptions.MQTT_VERSION_DEFAULT) {options.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);}try {// 开始连接comms.connect(options, token);}catch (MqttException e) {onFailure(token, e);}
}

这段代码已经添加注释,执行完这个函数后,就要进入核心函数ClientComms.connect了。

public void connect(MqttConnectOptions options, MqttToken token) throws MqttException {final String methodName = "connect";synchronized (conLock) {if (isDisconnected() && !closePending) {//@TRACE 214=state=CONNECTINGlog.fine(CLASS_NAME,methodName,"214");// 设置连接状态conState = CONNECTING;conOptions = options;// 构建CONNECT数据包MqttConnect connect = new MqttConnect(client.getClientId(),conOptions.getMqttVersion(),conOptions.isCleanSession(),conOptions.getKeepAliveInterval(),conOptions.getUserName(),conOptions.getPassword(),conOptions.getWillMessage(),conOptions.getWillDestination());// 设置clientState属性this.clientState.setKeepAliveSecs(conOptions.getKeepAliveInterval());this.clientState.setCleanSession(conOptions.isCleanSession());this.clientState.setMaxInflight(conOptions.getMaxInflight());tokenStore.open();ConnectBG conbg = new ConnectBG(this, token, connect, executorService);conbg.start();}else {.....}}
}

函数的逻辑很清晰,首先设置了conState,然后构建了CONNECT数据包。clientState对象负责在receiversender之间处理消息,receiversender会在ConnectBG中说明。

(5)

终于到核心中的核心了,ConnectBG是一个Runnable类型,看下run函数的实现。

public void run() {......try {......// Save the connect token in tokenStore as failure can occur before sendtokenStore.saveToken(conToken,conPacket);// 启动网络模块,发起网络连接NetworkModule networkModule = networkModules[networkModuleIndex];networkModule.start();// 连接完成后,启动receiver,负责从broker接收消息receiver = new CommsReceiver(clientComms, clientState, tokenStore, networkModule.getInputStream());receiver.start("MQTT Rec: "+getClient().getClientId(), executorService);// 连接完成后,启动sender,负责向broker发送消息sender = new CommsSender(clientComms, clientState, tokenStore, networkModule.getOutputStream());sender.start("MQTT Snd: "+getClient().getClientId(), executorService);callback.start("MQTT Call: "+getClient().getClientId(), executorService);// 向broker发送CONNECT数据包internalSend(conPacket, conToken);} ......}
}

代码已经注释的很清楚了,在run函数内部,首先建立客户端到broker之间的网络连接,然后分别启动receiver和sender,接下来就是mqtt协议建立的过程。执行internalSend(conPacket, conToken),向broker发送CONNECT数据包。

void internalSend(MqttWireMessage message, MqttToken token) throws MqttException {......try {// Persist if needed and send the messagethis.clientState.send(message, token);} catch(MqttException e) {......}
}

前面说过,clientState负责在receiver和sender之间进行消息处理,可以将sender看做是clientState的消费者, receiver负责接收来自broker的消息。看下ClientState.send函数

public void send(MqttWireMessage message, MqttToken token) throws MqttException {final String methodName = "send";......if (token != null ) {try {token.internalTok.setMessageID(message.getMessageId());} catch (Exception e) {}}if (message instanceof MqttPublish) {......} else {//@TRACE 615=pending send key={0} message {1}log.fine(CLASS_NAME,methodName,"615", new Object[]{new Integer(message.getMessageId()), message});if (message instanceof MqttConnect) {synchronized (queueLock) {// Add the connect action at the head of the pending queue ensuring it jumps// ahead of any of other pending actions.tokenStore.saveToken(token, message);pendingFlows.insertElementAt(message,0);queueLock.notifyAll();}} else {......}}
}

send函数将消息体添加到pendingFlows中,等待sender的调度并发送。sender时Runnable实例,看下sender是如何调度发送的,以下是sender的run函数:

try {while (running && (out != null)) {try {message = clientState.get();if (message != null) {if (message instanceof MqttAck) {out.write(message);out.flush();} else {MqttToken token = tokenStore.getToken(message);......if (token != null) {synchronized (token) {out.write(message);try {out.flush();} catch (IOException ex) {......}clientState.notifySent(message);}}}} else { // null message......}

sender不断循环从clientState获取待发送的消息,clientState.get函数大家可以自行分析。

前面说过,客户端向broker发送了CONNECT数据包后,broker会向客户端发送CONACK数据包,告诉客户端mqtt协议连接是否可以建立。receiver负责从broker接收消息,receiver是Runnable类型,看下receiver的run函数实现:

  public void run() {......while (running && (in != null)) {try {......receiving = in.available() > 0;MqttWireMessage message = in.readMqttWireMessage();receiving = false;// instanceof checks if message is nullif (message instanceof MqttAck) {token = tokenStore.getToken(message);if (token!=null) {synchronized (token) {......clientState.notifyReceivedAck((MqttAck)message);}} else if(message instanceof MqttPubRec || message instanceof MqttPubComp || message instanceof MqttPubAck) {......}}}.....
}  

receiver收到消息后,由于CONACK数据包是MqttAck类型,且token不为null,故会执行clientState.notifyReceivedAck函数。

protected void notifyReceivedAck(MqttAck ack) throws MqttException {MqttToken token = tokenStore.getToken(ack);MqttException mex = null;if (token == null) {// @TRACE 662=no message found for ack id={0}log.fine(CLASS_NAME, methodName, "662", new Object[] {new Integer(ack.getMessageId())});} else if (ack instanceof MqttPubRec) {......} else if (ack instanceof MqttPubAck || ack instanceof MqttPubComp) {...... } else if (ack instanceof MqttPingResp) {......} else if (ack instanceof MqttConnack) {int rc = ((MqttConnack) ack).getReturnCode();if (rc == 0) {synchronized (queueLock) {if (cleanSession) {clearState();// Add the connect token back in so that users can be  // notified when connect completes.tokenStore.saveToken(token,ack);}inFlightPubRels = 0;actualInFlight = 0;restoreInflightMessages();connected();}} else {mex = ExceptionHelper.createMqttException(rc);throw mex;}clientComms.connectComplete((MqttConnack) ack, mex);notifyResult(ack, token, mex);tokenStore.removeToken(ack);// Notify the sender thread that there maybe work for it to do nowsynchronized (queueLock) {queueLock.notifyAll();}} else {......}checkQuiesceLock();
}

ok,终于看到mqtt协议连接建立了,根据CONACK数据包中的返回码判断协议连接是否已经建立,0表示服务端接受连接,协议正常建立。

总结

mqtt协议的建立过程其实比较简单,需要我们理解的是CONNECT报文的选项设置。尤其是cleanSession和keepAliveInterval的设置,这两项设置需要根据具体的环境进行配置,cleanSession如果设置为0,则broker和客户端需要保存会话状态,且需要保持同步;对于一些非手持设备而言,网络断线的概率较小,此时可以将keepAliveInterval的时间设置长一些,比如60或者90秒。

文中一些对mqtt协议相关的解释限于篇幅,未做详细解释,大家可以参考https://github.com/rockstore/mqtt_learn/blob/master/docs/mqtt-cn.pdf文档详细查看。

源码的分析可能有疏漏,欢迎大家指正!!