本文由博主原创,未经博主许可不得转载。
前言
一个mqtt broker要完成哪些任务?
- 作为并发服务器,维护多个客户端的TCP链路
- 处理客户端mqtt connect、disconnect、subscribe、publish、ping等请求
- 处理消息持久化即消息永久保存问题、处理不同QoS消息
为什么选择V0.1版本?
- 就像当初读Linux内核源码也是选一个很老的版本一样。避免“一头扎进去,迷失瀚海中”。
- 这是在http://mosquitto.org/files/source/中能找到的最老版本。
幸运的是,这个版本“麻雀虽小五脏俱全”,没有“特性蔓延”的问题,作为初学者研究非常合适。
Mosquitto-0.1是怎样完成broker要完成的任务的?
- 使用select处理客户端并发
- 使用结构mqtt3_context维护每个客户端信息
- 使用sqlite3创建表客户端clients、订阅sbus、消息messages、持久化retain,来实现对客户端-订阅/发布/持久化-消息的管理
下面进入具体剖析。
-
一、概述
Mosquito V0.1版本,实现了独立、完整的MQTT V3.1协议的服务端(broker)。源码行数约3000行,使用C语言编写,.c文件13个,broker使用其中的10个文件。因为mosquitto基于sqlite3,其编译链接和运行,需要libsqlite3.so。
文件名 |
主要函数 |
描述 |
conf.c |
mqtt3_config_read |
读取并解析配置文件 |
context.c |
mqtt3_context_init mqtt3_context_cleanup |
提供mqtt3_context的初始化和清理接口。 mqtt3_context结构含socket fd,客户端id,最后一次收发时间,保活时间等参数。 |
memory.c |
mqtt3_calloc mqtt3_free等 |
提供内存分配和使用接口 |
database.c |
mqtt3_db_open mqtt3_db_close _mqtt3_db_tables_create _mqtt3_db_statement_prepare mqtt3_db_XXX_insert等 |
提供mqtt相关sqlite3数据库操作接口 |
net.c |
mqtt3_socket_listen |
提供TCP socket接口 |
mosquito.c |
handle_read |
入口函数所在文件 |
raw_send.c |
mqtt3_raw_publish mqtt3_raw_puback等 |
提供mqtt原始报文发送接口 |
raw_send_client.c |
mqtt3_raw_connect mqtt3_raw_disconnect mqtt3_raw_subscribe mqtt3_raw_unsubscribe |
提供客户端mqtt原始报文发送接口,broker不使用该文件 |
raw_send_server.c |
mqtt3_raw_connack |
提供connect ack发送接口 |
read_handle.c |
mqtt3_handle_publish mqtt3_handle_puback mqtt3_handle_pingreq等 |
提供socket上读入数据处理接口 |
read_handle_client.c |
mqtt3_handle_connack mqtt3_handle_suback mqtt3_handle_unsuback |
提供客户端socket上读入数据处理接口,broker不使用该文件 |
read_handle_server.c |
mqtt3_handle_connect mqtt3_handle_disconnect mqtt3_handle_subscribe mqtt3_handle_unsubscribe |
提供mqtt conn/disconn/sub/unsub命令处理接口 |
util.c |
mqtt3_command_to_string |
提供工具,未使用 |
文件调用关系
mosquito.c - - > conf.c
mosquito.c - - > read_handle_server.c - - > context.c|- - > database.c|- - > raw_send.c|- - > raw_send_server.c|- - > net.c
mosquito.c - - > read_handle.c|- - > database.c| - - > net.c
mosquito.c - - > memory.c
mosquito.c - - > database.c
程序的运行,需要libsqlite3.so
编译和使用
略
-
二、数据结构
数据库表
mosquitto启动后共创建5个表:
1、主要用于版本信息的config
CREATE TABLE config ( [key] TEXT PRIMARY KEY, value TEXT);
2、记录客户端信息的clients
CREATE TABLE clients (sock INTEGER,id TEXT PRIMARY KEY,clean_start INTEGER,will INTEGER,will_retain INTEGER,will_qos INTEGER,will_topic TEXT,will_message TEXT,last_mid INTEGER
);
3、订阅信息subs
CREATE TABLE subs ( client_id TEXT, sub TEXT, qos INTEGER);
4、持久化消息retain
CREATE TABLE retain (sub TEXT,qos INTEGER,payloadlen INTEGER,payload BLOB
);
5、客户端消息messages
CREATE TABLE messages (client_id TEXT,timestamp INTEGER,direction INTEGER,status INTEGER,mid INTEGER,dup INTEGER,qos INTEGER,retain INTEGER,sub TEXT,payloadlen INTEGER,payload BLOB
);
mqtt3_config系统全局配置
typedef struct {int port;int msg_timeout;int persistence;char *persistence_location;int sys_interval;char *pid_file;
} mqtt3_config;
分别代表:TCP服务端监听端口; 发出消息无回应超时时间; 数据库是否保存及保存位置(不保存则存在于内存中);系统消息更新的时间间隔;PID文件(后台运行时使用)。
_mqtt3_context客户端上下文
typedef struct _mqtt3_context{int sock;time_t last_msg_in;time_t last_msg_out;uint16_t keepalive;bool clean_start;char *id;
} mqtt3_context;
分别代表:socket fd;在该客户端socket上最近发送时间;最近接收时间;心跳周期;客户端connect flag之clean session;客户端id。
mqtt3_context方法:mqtt3_context_init:初始化,mqtt3_context_cleanup:关闭socket;如果clean_start置上,则还需要删除subs、messages、clients中与对应客户端有关的行
- mqtt3_handle_connect:设置context的client_id
- 其它各处使用都是需要依据context
mqtt3_msg_status消息状态
typedef enum {ms_invalid = 0,ms_publish = 1,ms_publish_puback = 2,ms_wait_puback = 3,ms_publish_pubrec = 4,ms_wait_pubrec = 5,ms_resend_pubrel = 6,ms_wait_pubrel = 7,ms_resend_pubcomp = 8,ms_wait_pubcomp = 9
} mqtt3_msg_status;
-
三、基本函数
mqtt3_context_init
初始化除client_id之外的成员,client_id是应用层属性,只有在mqtt connect之后才可以获取(实际上还包括clean_start,初始化时默认置1)。
使用场景:收到新的客户端TCP连接时。
mqtt3_context_cleanup
关闭该客户端socket。如果clean_start为true,则还需要清理客户端、订阅、消息,即删除subs、messages、clients中有关行。
使用场景:
- 客户端TCP断开、出错
- 因客户端协议等不正确,服务端主动断开连接时
- 服务端结束运行时
mqtt3_db_client_insert
类似的函数mqtt3_db_client_delete等。组合方式为:
mqtt3_db_{TABLE}_{ACTION},顾名思义,这类函数的作用是操作数据库表。
mqtt3_db_retain_find
SELECT qos,payloadlen,payload FROM retain WHERE sub=?
使用关键字sub从retain表中搜索。
mqtt3_db_messages_queue(sub,qos,len,pl,retain)
功能:将指定的消息加入队列,涉及数据库表messages和retain。
- 处理持久化。mqtt3_db_retain_insert (): 如果消息是retain型的,则以sub为关键字插入或更新retain相应表项(内容-qos,payload,payloadlen均由消息自身提供)
- 为推送给订阅者做准备。mqtt3_db_sub_search_start(sub): 根据关键字sub从subs表找到所有匹配项并且调用
mqtt3_db_message_insert():将消息插入messages表,mqtt3_db_message_insert原型如下,其中的client_id来自于subs匹配行的client_id列(订阅者id)
int mqtt3_db_message_insert(const char *client_id, uint16_t mid, mqtt3_msg_direction dir, mqtt3_msg_status status, int retain, const char *sub, int qos, uint32_t payloadlen, const uint8_t *payload)
注意:该函数完成的插入操作,消息类型均为md_out。
mqtt3_socket_close(mqtt3_context *context)
- 依据client_id 和socket fd 重置clients. sock为-1
- 关闭socket
- 客户端context.sock重置为-1
-
四、算法和处理流程
系统初始化
mqtt3_config_read
读取系统配置文件,完成mqtt3_config的初始化(未提供配置时使用默认配置,例如默认监听端口1883)
mqtt3_db_open
- 根据系统配置打开/创建数据库,并调用_mqtt3_db_tables_create使用CREATE TABLE IF NOT EXISTS的方式创建5个表。
- 调用_mqtt3_db_invalidate_sockets初始化客户端socket: UPDATE clients SET sock=-1
mqtt3_socket_listen
开启TCP监听(默认端口1883)。
系统初始化完成后,进入处理循环。
Broker socket新连接处理
收到新的TCP connect之后,accept并且调用mqtt3_context_init进行context初始化。
Broker socket读操作(msg from client to server)及处理
TCP socket读数据在handle_read()中根据mqtt报文类型进行处理。Broker可能收到的mqtt报文有connect、publish、puback、pubrec、pubrel、pubcomp、subscribe、unsubscribe、ping、disconnect共10种(不含错误的报文)。
1.mqtt connect处理(ack)
【协议定义】客户端的connect应该回复ack,一个connect及ack命令示例:
0000 10 20 00 06 4d 51 49 73 64 70 03 02 00 3c 00 12 conn,len=0x20,6,MQIsdp V03,flag=2, kl=0x003c, id_len=18
0010 6d 6f 73 71 75 69 74 74 6f 5f 70 75 62 5f 34 32 34 37 mosquitto_pub_4247
ACK: 20 conn ack02 00 00 len=2, reserved, accept
【处理】mqtt3_handle_connect为处理客户端mqtt connect的函数,最重要的是两个任务:
(1)设置context id域;
(2)将(发起mqtt connect的)客户端信息插入表clients。
context->id = client_id;mqtt3_db_client_insert(context, will, will_retain, will_qos, will_topic, will_message);
mqtt3_raw_connack为回复conn ack的函数。该函数回复:20 02 00 00,4字节发送的方式是多次(4次,实际在TCP上是一次发送-如果Nagle算法未关闭的话),Wireshark抓包工具能够看出内容为一个TCP段mqtt connect ack。
【示例】客户端 connect时,mosquitto运行日志(在源码上增加的调试信息):
查看sqlite3数据库clients表:
2.mqtt disconnect处理(no ack)
【协议定义】disconnect消息:e0 00,不需要应答。
【处理】 mqtt3_handle_disconnect调用mqtt3_socket_close
int mqtt3_socket_close(mqtt3_context *context){……mqtt3_db_client_invalidate_socket(context->id, context->sock);rc = close(context->sock);context->sock = -1;
……
}
- 关闭socket
- 重置context sock
- 根据客户端重置clients表中的相应行中sock列
3.mqtt ping处理(pong)
【协议定义】对客户端的ping request(c0 00)直接回复ping response(d0 00)。理论上broker收不到pong,因此即使收到,就简单丢弃。
4.mqtt subscribe处理(ack,retain pub)
【协议定义】subscribe需要ack,一个例子如下:
REQ
0000 82 08 00 01 00 03 6d 73 67 00 sub+qos=1, msglen=8, mid=1, tlen=3, topic="msg", req_qos=0
ACK
0000 90 03 00 01 00Sub ack, len=3, mid=1, qos=0
【处理】int mqtt3_handle_subscribe(mqtt3_context *context)是处理客户端订阅的函数:
- 调用mqtt3_db_sub_insert将该订阅信息插入subs表
- 调用mqtt3_db_retain_find检查retain表里是否有消息(关键字sub)需要立即推送给该订阅者,并根据结果进行消息推送操作(描述见publish处理)。
在MQTT协议里关于消息的持久化规定对于持久的、最新一条PUBLISH消息,服务器要马上推送给新的订阅者(注:仅最新的一条,不是所有)。
【示例】1:mqtt sub(-t msg)之后:
查看subs有该订阅信息:
【示例】2:有持久消息时,订阅者会收到推送,如下,retain存有主题为/messages/vb的消息:
在订阅(-t /messages/vb)之后立即收到该消息:
该新订阅者除了收到sub ack之外还收到了retain消息(qos=0):ret_vb,如下:
5.mqtt unsubscribe处理(ack)
【协议定义】取消订阅需要ack。
【处理】int mqtt3_handle_unsubscribe(mqtt3_context *context)调用mqtt3_db_sub_delete删除subs表相应行。
注意:订阅者单纯的TCP断开,不会发送unsubscribe消息(因为根本来不及完成这个交互)。
6.mqtt publish处理(QoS=0/1/2)
【协议定义】publish有retain标志;publish根据QoS有不同处理:
1.QoS=0,broker应立即把消息推送给订阅者,不回ack,一个例子如下:
0000 30 0b 00 03 6d 73 67 71 30 5f 6d 73 67Pub+qos=0,len=11,topic_len=3,topic="msg", content:"q0_msg"
2.QoS=1,broker应立即把消息推送给订阅者,并回复ACK,一个例子如下:
REQ:
0000 32 0d 00 03 6d 73 67 00 01 71 31 5f 6d 73 67Pub+qos=1,len=13,topic_len=3,topic="msg",mid=0001, content:"q1_msg"
ACK:
0000 40 02 00 01 Puback, len=2, mid=0001
3.QoS=2,pub-recv-rel-comp,broker首先把消息暂存,然后经过recv-rel-comp握手之后,再把消息推送给订阅者。一个例子如下:
0000 34 0d 00 03 6d 73 67 00 01 71 32 5f 6d 73 67 C to S
0000 34 0d 00 03 6d 73 67 00 01 71 32 5f 6d 73 67 C to SPub+qos=2,len=13,topic_len=3,topic="msg",mid=0001, content:"q2_msg"
0000 50 02 00 01 S to CPub received
0000 62 02 00 01 C to SPub release
0000 70 02 00 01 S to CPub complete
【处理】mqtt3_handle_publish调用mqtt3_db_messages_queue或mqtt3_db_message_insert更新数据库,调用mqtt3_raw_XX回复客户端。
int mqtt3_handle_publish(mqtt3_context *context, uint8_t header)
{
……switch(qos){case 0:if(mqtt3_db_messages_queue(sub, qos, payloadlen, payload, retain)) rc = 1;break;case 1:if(mqtt3_db_messages_queue(sub, qos, payloadlen, payload, retain)) rc = 1;if(mqtt3_raw_puback(context, mid)) rc = 1;break;case 2:if(mqtt3_db_message_insert(context->id, mid, md_in, ms_wait_pubrec, retain, sub, qos, payloadlen, payload)) rc = 1;if(mqtt3_raw_pubrec(context, mid)) rc = 1;break;}
……
}
区分QoS:
- ==0。调用mqtt3_db_messages_queue(),为插入/更新retain表(因为publish消息可以是retain型)和messages表,消息方向md_out。
- ==1。同==0,多一个puback回复。
- ==2。会调用mqtt3_db_message_insert暂存消息并回复pubrec。注意消息方向md_in,状态ms_wait_pubrec。这说明这条消息是暂存的,在握手完成后,立即删除这条暂存消息并把原pub消息进行推送,见pubrel处理。
注意:发布者在发布之后,都会disconnect断连。这一点和订阅者不同,订阅者是和服务端保持长连接的!
7.mqtt puback处理(no ack)
【协议定义】什么情况下broker会收到puback并且需要处理呢?
---在broker向订阅客户publish QoS=1的消息时,收到回复puback表示客户端收到该消息。
【处理】mqtt3_handle_puback调用mqtt3_handle_puback (client_id, mid, dir)从messages中删除相应一条消息:client_id==订阅者的id。表示推送完成不再需要重复推送,因此从数据库中删除。
【示例】1:
客户端 id_sub01订阅:-t topic01
客户端id_pub01发布消息主题为msg,内容为:this is msg from pub01 qos1
根据就低原则(参见下面描述),id_sub01只会收到broker的publish(QoS=0),不需要回复puback:
【示例】2:客户端 id_sub01订阅:-t topic02 –q 1
客户端id_sub01在id_pub01发布QoS=1的消息后收到的是QoS=1的消息
8.mqtt pubrec & pubcomp处理
【协议定义】什么情况下broker会收到pubrec和pubcomp?
---当broker向客户端publish QoS=2的消息时,收到客户端的回应不是puback,而是pubrec,表示客户端收到了publish消息,broker将回复pubrel,客户端收到后回复pubcomp。
【处理】
- mqtt3_handle_pubrec处理broker收到pubrec:
·调用mqtt3_db_message_update,根据参数更新messages中消息,设置消息状态为
ms_wait_pubcomp,同时更新时间戳,SQL语句可以伪码描述为:
UPDATE messages SET status=ms_wait_pubcomp, timestamp=now
WHERE client_id={client_id} AND mid={mid} AND direction=md_out
·调用mqtt3_raw_pubrel给客户端回复pubrel
- mqtt3_handle_pubcomp处理broker收到pubrel:
·调用mqtt3_db_message_delete依据参数删除messages中消息。
DELETE FROM messages WHERE client_id={client_id} AND mid={mid} AND direction=md_out
9.mqtt pubrel处理
【协议定义】什么情况下broker会收到pubrel?
---当客户端向broker publish QoS=2的消息时,broker首先回复pubrec,客户端收到后回复pubrel,broker应回复pubcomp。
【处理】mqtt3_handle_pubrel对收到pubrel进行处理:
·调用mqtt3_db_message_release:找到publish时暂存的md_in消息,取出其内容,并调用mqtt3_db_messages_queue将publish消息插入messages(以及retain如果是持久消息),完成后删除暂存md_in消息;
·调用mqtt3_raw_pubcomp回复pubcomp
10.mqtt unsubscribe处理(ack)
【处理】mqtt3_handle_unsubscribe调用mqtt3_db_sub_delete从subs中删除客户端指定topic的订阅信息,并(直接组织应答报文)回复ack。
Broker socket写操作及处理
mqtt3_db_outgoing_check发送数据准备
int mqtt3_db_outgoing_check(fd_set *writefds, int *sockmax)
使用JION语句
SELECT sock FROM clients JOIN messages ON clients.id=messages.client_id WHERE (messages.status=1 OR messages.status=2 OR messages.status=4 OR messages.status=6 OR messages.status=8) AND messages.direction=1 AND sock<>-1
找出“有哪些messages要发送到哪些客户端”。
这里关键是clients.id=messages.client_id,怎么理解?
- clients.id 是客户端(包括订阅者、发布者,其实只有订阅者会保存在数据库中)的id。如前分析:订阅者在mqtt conn之后提供client id,mosquitto broker将id保存在clients中;
- messages.client_id,如前分析:messages表中client_id列来自于subs匹配行的client_id列(即订阅者id)
由此可见,通过这个sql语句,找到了本条messages有哪些订阅者,下一步的动作是推送给这些订阅者,在mqtt3_db_outgoing_check中的处理是找到这些订阅者的socket fd,并加入FD_SET中(FD_ISSET返回真),在接下来的pselect循环会进行处理。
messages.direction=1指的是md_out,发出消息。
messages.status=1 OR messages.status=2 OR messages.status=4 OR messages.status=6 OR messages.status=8 指的是下列状态:
- ms_publish:消息待推送,在mqtt3_db_message_write中即将完成推送
- ms_publish_puback:消息待推送并且需要收到puback,在mqtt3_db_message_write中即将完成推送并置消息状态为ms_wait_puback
- ms_publish_pubrec:消息待推送并且需要收到pubrec,在mqtt3_db_message_write中即将完成推送并置消息状态为ms_wait_ pubrec
- ms_resend_pubrel:ms_wait_pubrel的消息超时没收到客户端回复pubrel,在mqtt3_db_message_write中即将重发
- ms_resend_pubcomp:ms_wait_ pubcomp的消息超时没收到客户端回复pubcomp,在mqtt3_db_message_write中即将重发
mqtt3_db_message_write发送消息
在经过select及FD_ISSET后,已经进入了某个客户端的发送。mqtt3_db_message_write为完成发送的执行体。
SQL语句select绑定的是client_id,根据messages表中现有待发出的消息的状态,有不同的动作:
switch(status){case ms_publish:if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){mqtt3_db_message_delete_by_oid(OID);}break;case ms_publish_puback:if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){mqtt3_db_message_update(context->id, mid, md_out, ms_wait_puback);}break;case ms_publish_pubrec:if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubrec);}break;case ms_resend_pubrel:if(!mqtt3_raw_pubrel(context, mid)){mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubrel);}break;case ms_resend_pubcomp:if(!mqtt3_raw_pubcomp(context, mid)){mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubcomp);}break;
}
- ms_publish:推送后删除
- ms_publish_puback:推送并置消息状态为ms_wait_puback(在客户端收到并回复puback之后,broker的接收socket处理hand_read会删除推送消息,这样完成了消息状态的闭环)
- ms_publish_pubrec:推送并置消息状态为ms_wait_ pubrec
- ms_resend_pubrel:ms_wait_pubrel的消息超时没收到客户端回复pubrel,重发后恢复ms_wait_pubrel状态
- ms_resend_pubcomp:ms_wait_ pubcomp的消息超时没收到客户端回复pubcomp,重发后恢复ms_wait_ pubcomp状态
Broker socket出错处理
如前所述,客户端断开TCP,不会引起mqtt层相应操作,但broker会完成清理:
删除该订阅者的信息(clients、subs、messages),有遗言的把遗言加入messages(如果遗言持久化消息,还会保存到retain供后续订阅者使用)。
示例1:
- 带持久化遗言的订阅(--will-payload "it's a will msg" --will-retain --will-topic msg)
- TCP断开,broker将遗言保存到retain表中:
- 新的订阅(-t msg)除收到suback外还会收到该主题的遗言
Broker socket超时处理
Mosquitto记录在每个客户端socket上最近一次收到报文时间,按照1.5倍心跳周期内没有收到客户端任何数据作为客户端链路超时的依据,超时发生时主动断开和该客户端TCP连接。
客户端回应消息超时处理
【协议定义】不同于上面的socket链路超时,这里指的是mqtt协议层超时,即客户端可能还会发出ping,但是对broker的推送消息(QoS=1)不予回复ACK。
【处理】mqtt3_db_message_timeout_check使用两条SQL语句
SELECT OID,status FROM messages WHERE timestamp < time(NULL) - timeout
timestamp < time(NULL) – timeout 即time(NULL) - timestamp > timeout <,含义是:当前时间距离消息的时间戳已经超过了timeout。
UPDATE messages SET status=?,dup=1 WHERE OID=?
其中状态status变化为:
ms_wait_puback --> ms_publish_puback
ms_wait_pubrec --> ms_publish_pubrec
ms_wait_pubrel -->ms_resend_pubrel
ms_wait_pubcomp --> ms_resend_pubcomp
处理流程之数据库视角
Cli send |
clients |
subs(key:sub) |
messages |
retain |
connect |
插入/更新 |
无 |
无 |
无 |
disconnect |
依据id和sock,重置sock = -1 |
无 |
无 |
无 |
TCP disconnect |
删除 |
删除 |
1、删除客户端相关联消息; 2、插入客户端的遗言消息 |
插入(订阅/发布)客户的遗言持久化消息 |
subscribe |
无 |
插入 |
依据右边表格(retain sub查询结果)插入mqtt3_db_messages_queue(retain_msg) md_out消息(QoS决定了消息状态) |
1、关键字sub查询,并给左边表格使用 2、带遗言持久化的订阅会插入行 |
unsubscribe |
无 |
删除 |
无 |
无 |
publish |
无 |
查询是否有订阅者,结果供右边表格使用 |
QoS=0/1:依据左边表格结果插入的mid_out消息; QoS=2:先插入md_in, ms_wait_pubrec消息,在握手完成后删除该临时消息,并插入publish消息md_out |
发布的消息是retain型则插入 |
ping |
无 |
无 |
无 |
无 |
处理流程之方向视角
|
方向 |
connect |
上行(即客户端到服务器) |
disconnect |
上行(即客户端到服务器) |
TCP disconnect |
双向 |
subscribe |
上行 |
unsubscribe |
上行 |
publish |
双向。Publish可以发送消息到broker,broker也使用mqtt publish把来自其他客户端的消息推送给subscribe |
ping |
上行 |
|
|
处理流程之消息状态视角
- Broker收到客户端pub时,应把消息暂存在messages中:
- QoS==0/1,直接暂存
- QoS==2,需要broker回复pubrec,消息状态:ms_publish_pubrec,方向:md_in
- Broker转发客户端pub的消息给subscribe时,根据就低原则,选取客户端订阅时QoS和本pub消息的QoS两者小值:
- QoS==0,不需要客户端回复ack,消息状态:ms_publish,方向md_out
- QoS==1,需要客户端回复puback,消息状态:ms_publish_puback,方向md_out
- QoS==2,需要客户端回复pubrec,消息状态:ms_publish_pubrec,方向:md_out
处理流程之综合示例
演示如下模型:
1.客户端sub01订阅主题msg,使用QoS=2;
./sub -i sub01 -t msg -q 2 -h 192.168.122.21 --will-qos 2 --will-retain --will-topic msg --will-payload "this is a will-retain qos=2 msg from sub01"&
2.客户端pub01发布主题msg QoS=2 retain=1消息”this is qos=2 topic=msg from pub01” ,will:QoS=2,retain=1,topic=msg, msg=”this is a will-retain qos=2 msg from pub01”
./pub -i pub01 -t msg -q 2 -r -h 192.168.122.21 --will-topic msg -m "this is qos=2 topic=msg from pub01" --will-qos 2 --will-retain --will-payload "this is a will-retain qos=2 msg from pub01"
3.客户端sub02订阅主题msg,使用QoS=2;
./sub -i sub02 -t msg -q 2 -h 192.168.122.21&
4.异常结束sub01
消息交互流程:
- sub01 connect, broker回复 connect ack
- sub01 subscribe, broker回复subscribe ack
- pub01 connect, broker回复 connect ack
- pub01 publish, broker回复publish received
- pub01 publish release, broker向sub01 publish
- sub01 publish received,broker回复pub01 publish complete
- pub01 disconnect, broker回复sub01 publish release
- sub01 publish complete
- sub02 connect, broker回复 connect ack
- sub02 subscribe, broker回复subscribe ack + publish(pub01的retain publish)
- sub02 publish received,broker回复sub02 publish release
- sub02 publish complete
- sub01异常退出, broker向sub02 publish(sub01的will msg)
- sub02 publish received,broker回复sub02 publish release
- sub02 publish complete
几个特别注意的地方
- 客户端在connect的时候就已经可以带遗言,不是在subscribe或者publish的时候。从遗言的设计角度来理解,遗言是应对链路异常(设备掉电、异常关机、网络异常等原因引起掉线)时通知其他参与方的,当然是在connect时指定了。
- 同时,客户端正常退出即主动发起mqtt disconnect,broker不会保存其遗言的。这真正体现了遗言的作用。
- 上面综合示例可以看出broker并发服务器的特点,即不会对一个客户端处理完成之后再处理另外的客户端(迭代服务器)。
- mqtt3_socket_close在重置clients表sock时,为什么需要使用client_id和sock,只传入一个sock或者client_id不就行了吗?难道这两个不具有唯一性?
- 如果一个客户端conn之后啥也不干,到1.5倍心跳时间时,broker会否断开连接?
- 哪些地方涉及QoS?QoS是和主题相关的,因此发生在:
- 订阅者订阅某主题,指定QoS
- 订阅者在收retain消息时,有retain msg qos和sub qos,按照就低原则选取的QoS=MIN(retain_qos, sub_qos)
- 发布者发布消息时,指定QoS
一些评论
- 使用pselect 机制和sqlite3数据库限制了并发客户端的处理效率和数量,据说V1.5版本改成了epoll
- TCP server尽量开启REUSEADDR socket选项
- 回复时(例如connect ack是分4次write 1B)尽量一次丢给TCP,当然,开启了Nagle算法的时候会进行组合,但还是应用层自己管理更好一些
后语
匆匆完成剖析,感觉意犹未尽。有如下几个问题,值得进一步思考:
- 源码剖析的目的是什么?为此我做了什么?
- 一个mqtt broker需要完成哪些任务?哪些是至关重要的?哪些是和性能有关的?
- Mosquito-v0.1是怎样完成这个任务的?是否体现了协议的所有方面?
- Mosquito-v0.1的性能怎么样?能够处理多少客户端并发?
- Mosquito-v0.1的优缺点是什么?有什么精妙的地方值得学习的?