源码剖析
一、读入响应数据包
?Mycat 主要采用 Reactor 模式作为通信模型,而此通信模型获取客户端发送到服务端的请求数据包是通过在信道中注册 SelectionKey.OP_READ 读事件。Mycat 也不例外,用名为 NIOREACTOR-i-RW 的线程在双向通道中注册读事件,并接收、处理请求数据包。
package io.mycat.net;public final class NIOReactor {private final class RW implements Runnable {public void run() {for ( ; ; ) {...try {...// 注册 SelectionKey.OP_READ 读事件register(tSelector);...if ( ... ) {...} else {...for (SelectionKey key : keys) {AbstractConnection con = null;try {// 获取当前的附加连接对象,如果没有附件,则为空Object att = key.attachment();if (att != null) {con = (AbstractConnection) att;if (key.isValid() && key.isReadable()) {try {// 同步处理读到的数据包con.asynRead();} catch ( ... ) { ... }}} else {// 请求取消此键的通道到其选择器的注册key.cancel();}}}}} catch ( ... ) { ... }}}}
}
?将字节序列使用同步非阻塞方式从信道读入到指定的缓冲区并得到当前信道的状态,根据该状态值及缓冲区中的字节序列作后续的状态检测及字节序列解码。
package io.mycat.net;public class NIOSocketWR extends SocketWR {public void asynRead() throws IOException {...// 将字节序列从此信道中读入给定的缓冲区int got = channel.read(theBuffer);// 读取可能的 Socket 字节流con.onReadData(got);}
}
?信道状态作为参数,根据信道状态作相应的流程控制。当状态正常时,则继续循环处理缓冲区中的字节序列信息。处理过程中还需要判断是否需要解压字节信息。
package io.mycat.net;public abstract class AbstractConnection implements NIOConnection {public void onReadData(int got) throws IOException {// 当前连接是否已关闭if (isClosed.get()) {return;}// 记录本次读事件时间,将作为连接空闲超时判断依据lastReadTime = TimeUtil.currentTimeMillis();if (got < 0) { // 客户端已主动关闭 Socketthis.close("stream closed");return;} else if (got == 0 && !this.channel.isOpen()) { // 无数据且通道未开放this.close("socket closed");return;}// 累加读入字节大小,用于记录此连接读入字节总大小netInBytes += got;processor.addNetInBytes(got);// 循环处理字节信息int offset = readBufferOffset;int length = 0;int position = readBuffer.position();for ( ; ; ) {// 获取数据包的长度length = getPacketLength(readBuffer, offset);...if (position >= offset + length && readBuffer != null) {// handle this packagereadBuffer.position(offset);byte[] data = new byte[length];readBuffer.get(data, 0, length);// 处理数据包handle(data);...}}}@Overridepublic void handle(byte[] data) {if (isSupportCompress()) { // 是否支持压缩// 解压数据包,同时做分包处理List<byte[]> packs = CompressUtil.decompressMysqlPacket(data, decompressUnfinishedDataQueue);for (byte[] pack : packs) {if (pack.length != 0) {// 处理数据包handler.handle(pack);}}} else {// 处理数据包handler.handle(data);}}
}
package io.mycat.backend.mysql.nio;public class MySQLConnectionHandler extends BackendAsyncHandler {@Overridepublic void handle(byte[] data) {offerData(data, source.getProcessor().getExecutor());}
}
package io.mycat.net.handler;public abstract class BackendAsyncHandler implements NIOHandler {protected void offerData(byte[] data, Executor executor) {handleData(data);}
}
二、数据包分类
?通过判断服务端响应给客户端报文的第 1 个字节值,可将所有的响应报文划分为以下几种类型。
响应报文类型 | 第1个字节取值范围 |
---|---|
OK 响应报文 | 0x00 |
Error 响应报文 | 0xFF |
Result Set Header 报文 | 0x01 - 0xFA |
Field 报文 | 0x01 - 0xFA |
Row Data 报文 | 0x01 - 0xFA |
EOF 报文 | 0xFE |
?将不同的响应报文类型组合起来,能对应出不同的 sql 请求返回结果,下述代码就是通过流程分支组合报文类型,以映射不同的 sql 请求。
package io.mycat.backend.mysql.nio;public class MySQLConnectionHandler extends BackendAsyncHandler {protected void handleData(byte[] data) {switch (resultStatus) {case RESULT_STATUS_INIT:switch (data[4]) { // 判断响应类型case OkPacket.FIELD_COUNT: // 命令执行成功时,返回OK响应报文,值恒为0x00handleOkPacket(data); break;case ErrorPacket.FIELD_COUNT: // 命令执行失败时,返回Error响应报文,值恒为0xFFhandleErrorPacket(data); break;case RequestFilePacket.FIELD_COUNT:handleRequestPacket(data); // load data file 请求文件数据包处理break;default: // 其他类型resultStatus = RESULT_STATUS_HEADER; // 转换状态,准备接收数据header = data;fields = new ArrayList<>((int) ByteUtil.readLength(data, 4));}break;case RESULT_STATUS_HEADER:switch (data[4]) {case ErrorPacket.FIELD_COUNT: // Error响应报文,值恒为0xFFresultStatus = RESULT_STATUS_INIT;handleErrorPacket(data);break;case EOFPacket.FIELD_COUNT: // EOF响应报文,值恒为0xFEresultStatus = RESULT_STATUS_FIELD_EOF; // 数据接收完毕,准备接收命令结束符handleFieldEofPacket(data);break;default:fields.add(data);}break;case RESULT_STATUS_FIELD_EOF:switch (data[4]) {case ErrorPacket.FIELD_COUNT: // Error响应报文,值恒为0xFFresultStatus = RESULT_STATUS_INIT;handleErrorPacket(data);break;case EOFPacket.FIELD_COUNT: // EOF响应报文,值恒为0xFEresultStatus = RESULT_STATUS_INIT; // 命令接收完毕handleRowEofPacket(data); // 行数据包结束处理break;default:handleRowPacket(data); // 行数据包处理}break;default:throw new RuntimeException("unknown status!");}}
}
2.1 变更类型
?变更类型的 sql 请求对应的响应结果是由 OK 报文和 EOF 报文组成。具体的 OK 报文结构如下表所示:
序号 | 字节长度 | 说明 |
---|---|---|
1 | 1 | OK 报文。值恒为 0x00 |
2 | 1-9 | 受影响行数。当执行INSERT /UPDATE /DELETE 语句时所影响的数据行数。 |
3 | 1-9 | 索引 ID 值。 该值为 AUTO_INCREMENT 索引字段生成,如果没有索引字段,则为 0x00。注意:当 INSERT 插入语句为多行数据时,该索引 ID 值为第一个插入的数据行索引值,而非最后一个 |
4 | 2 | 服务器状态。客户端可以通过该值检查命令是否在事务处理中。 |
5 | 2 | 告警计数。告警发生的次数。 |
6 | n | 服务器消息。服务器返回给客户端的消息,一般为简单的描述性字符串,可选字段。 |
2.1.1 转发到单节点
?当原 sql 经解析路由被转发到单节点上执行时,对应的也就仅有一个数据节点会响应结果集,那么仅把收到的 OK 报文和 EOF 报文再转发到前端连接的写缓冲队列中即可。
package io.mycat.backend.mysql.nio.handler;public class SingleNodeHandler implements ResponseHandler, Terminatable, LoadDataResponseHandler {@Overridepublic void okResponse(byte[] data, BackendConnection conn) {this.netOutBytes += data.length; // 记录输出字节大小,供管理端口使用boolean executeResponse = conn.syncAndExcute();if (executeResponse) {ServerConnection source = session.getSource(); // 对应的前端连接OkPacket ok = new OkPacket();ok.read(data); // 解析OK报文boolean isCanClose2Client = !rrs.isCallStatement() || !rrs.getProcedure().isResultSimpleValue();...if (isCanClose2Client) {source.setLastInsertId(ok.insertId); // 设置返回主键if (!errorRepsponsed.get() && !session.closed() && source.canResponse()) {ok.write(source); // 写入前端连接的写缓冲队列([OK 报文])}}... // 记录操作信息,供管理端口查看}}
}
?真正将前端连接写缓冲队列中的数据发送给应用是由对应的 NIOSocketWR 从写队列中读取 ByteBuffer 并返回的。
2.1.2 转发到多节点
?当原 sql 经解析路由被转发到多节点上执行时,则需要先把各数据节点响应的 OK 报文信息汇总成一个新的 OK 报文,再把新 OK 报文和收到的最后一个 EOF 报文写入前端连接的写缓冲队列中。
OK 报文的合并是先同步累加各 OK 报文的受影响行数,当处理最后一个 OK 报文时,将前置的累加结果信息赋值到最后一个 OK 报文中,再写入前端连接的写缓冲队列中。
package io.mycat.backend.mysql.nio.handler;public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {@Overridepublic void okResponse(byte[] data, BackendConnection conn) {this.netOutBytes += data.length;boolean executeResponse = conn.syncAndExcute();LOGGER.debug("received ok response, executeResponse:{} from {}", executeResponse, conn);if (executeResponse) {ServerConnection source = session.getSource();OkPacket ok = new OkPacket();ok.read(data);boolean isCanClose2Client = !rrs.isCallStatement() || !rrs.getProcedure().isResultSimpleValue();if (!isCallProcedure) {if (clearIfSessionClosed(session) || canClose(conn, false)) {return;}}lock.lock(); // 变量值变更需要同步操作try {if (!rrs.isGlobalTable()) { // 非全局表affectedRows += ok.affectedRows; // 累加受影响行数} else { // 全局表affectedRows = ok.affectedRows; // 受影响行数以最后一次执行的为准}if (ok.insertId > 0) {if (rrs.getAutoIncrement()) { // 是否为自增主键insertId = insertId == 0 ? ok.insertId : Math.max(insertId, ok.insertId);} else {insertId = insertId == 0 ? ok.insertId : Math.min(insertId, ok.insertId);}}} finally {lock.unlock(); // 释放同步锁}LOGGER.debug("{} on row okResponse {} {} {}", new Object[]{this, conn, errorRepsponsed.get(), nodeCount});// 是否为最后一个数据节点的数据包boolean isEndPacket = isCallProcedure ? decrementOkCountBy(1) : decrementCountBy(1);if (isEndPacket && isCanClose2Client) {... lock.lock();try {if (rrs.isLoadData()) {byte lastPackId = source.getLoadDataInfileHandler().getLastPackId();ok.packetId = ++lastPackId;// OK_PACKET// 此处信息只是为了控制台给人看的ok.message = ("Records: " + affectedRows + " Deleted: 0 Skipped: 0 Warnings: 0").getBytes();source.getLoadDataInfileHandler().clear();} else {ok.packetId = ++packetId;}ok.affectedRows = affectedRows; // 设置受影响行数ok.serverStatus = source.isAutocommit() ? 2 : 1; // 设置服务器状态if (insertId > 0) {// 设置索引ID值ok.insertId = rrs.getAutoIncrement() ? (insertId - affectedRows + 1) : insertId;source.setLastInsertId(insertId);}if (source.canResponse()) { // 判断是否已经报错返回给前台了ok.write(source); // 写入前端连接的写缓冲队列}} catch (Exception e) {handleDataProcessException(e);} finally {lock.unlock();}}... // 记录操作信息,供管理端口查看}}
}
2.2 查询类型
?当客户端发送查询请求后,在没有错误的情况下,服务器会返回结果集(Result Set)给客户端。Result Set 消息分为五部分,结构如下:
序号 | 结构 | 说明 |
---|---|---|
1 | [Result Set Header] | 列数量 |
2 | [Field] | 列信息(多个) |
3 | [EOF] | 列结束 |
4 | [Row Data] | 行数据(多个) |
5 | [EOF] | 数据结束 |
?Result Set 在 Mycat 中的按照先后顺序对应的流程分支如下:
- RESULT_STATUS_INIT 状态下的 default 分支
Result Set Header 结构类型数据,从中获取出将要返回的列数量,使用该数量初始化一个用于存储列信息的 List 集合。 - RESULT_STATUS_HEADER 状态下的 default 分支
Field 结构类型数据,将从中获取到的列信息存储 List 集合中。 - RESULT_STATUS_HEADER 状态下的 EOFPacket.FIELD_COUNT 分支
Field EOF 结构类型数据,表示列信息结束,将列信息写入前端连接的写缓冲队列中。 - RESULT_STATUS_FIELD_EOF 状态下的 default 分支
Row Data 结构类型数据,客户端的预期数据结果集。 - RESULT_STATUS_FIELD_EOF 状态下的 EOFPacket.FIELD_COUNT 分支
Row EOF 结构类型数据,表示信息返回结束,将标识写入前端连接的写缓冲队列汇中。
2.2.1 转发到单节点
?当原 sql 经解析路由被转发到单节点上执行时,对应的也就仅有一个数据节点会响应结果集,那么仅把收到的各类报文按照收到的先后顺序再转发到前端连接的写缓冲队列中即可。下面简要展示一下在此场景下的各响应报文的处理方式。
- 字段结束包(Field EOF 报文)
此类报文表示将要返回的列个数以及列信息均已返回完毕,即将返回 row 数据信息,其中起到信息分隔的作用,下述为此报文的处理方式。
在处理 Field EOF 报文时,把 Result Set Header 信息、Field 信息和 Field EOF 信息先后写入 ByteBuffer 中,再一次性把该缓冲写入前端连接的缓冲队列中。package io.mycat.backend.mysql.nio.handler;public class SingleNodeHandler implements ResponseHandler, Terminatable, LoadDataResponseHandler {/*** 将列数量和列信息依次写入前端连接的写缓冲队列中(服务端返回列信息结束包时触发)** @param header 列数量报文数据包* @param fields 字段信息集合* @param eof 列结束报文数据包* @param conn 后端连接实例*/public void fieldEofResponse(byte[] header, List<byte[]> fields, byte[] eof, BackendConnection conn) {... // 记录相关信息,供管理端口查看ServerConnection source = session.getSource();// 把将返回的字段个数写入前端连接缓冲队列(Header 报文)buffer = source.writeToBuffer(header, allocBuffer()); for (byte[] field : fields) {field[3] = ++packetId;FieldPacket fieldPk = new FieldPacket();fieldPk.read(field); // 解析Field报文fieldPackets.add(fieldPk);// 把将返回的字段信息写入前端连接缓冲队列(Field 报文)buffer = source.writeToBuffer(field, buffer); }fieldCount = fieldPackets.size();eof[3] = ++packetId;// 把将返回的字段结束符写入前端连接缓冲队列(Field EOF 报文)buffer = source.writeToBuffer(eof, buffer);... // 当请求是 show [full] tables 类型时,填充分片表 row 信息到写缓冲队列中} }
- 行数据包(Row Data 报文)
行数据包中存放的是真正的数据,每个数据包对应一行数据,此处除特殊场景外,无需做任何处理,直接将数据包转发到前端连接的写缓冲队列中即可。package io.mycat.backend.mysql.nio.handler;public class SingleNodeHandler implements ResponseHandler, Terminatable, LoadDataResponseHandler {/*** 将行数据写入缓冲区中(服务端返回行数据时触发)** @param row 行数据(单行)* @param conn 后端连接*/public void rowResponse(byte[] row, BackendConnection conn) {if (errorRepsponsed.get()) {//已经有错误时不再继续处理,直接返回return;}...if (isDefaultNodeShowTable || isDefaultNodeShowFullTable) {RowDataPacket rowDataPacket = new RowDataPacket(1);rowDataPacket.read(row);String table = StringUtil.decode(rowDataPacket.fieldValues.get(0), session.getSource().getCharset());if (shardingTablesSet.contains(table.toUpperCase())) {return;}}row[3] = ++packetId;if (prepared) {...} else {MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();if (null == middlerResultHandler) {// 把将返回的行数据写入前端连接缓冲队列(Header 报文)buffer = session.getSource().writeToBuffer(row, allocBuffer());} else {...}}} }
- 行数据结束包
此类报文表示 row 数据已返回完毕,结束本次响应,直接原封不动写入前端连接的写缓冲队列中即可。
该方法并没有真正处理 row 数据包,而是将其投递到 io.mycat.sqlengine.mpp.package io.mycat.backend.mysql.nio.handler;public class SingleNodeHandler implements ResponseHandler, Terminatable, LoadDataResponseHandler {/*** 将行数集结束标志写入前端连接缓冲队列(行结束标志返回时触发)** @param eof 行数据结束标识* @param conn 后端连接*/public void rowEofResponse(byte[] eof, BackendConnection conn) {this.netOutBytes += eof.length;ServerConnection source = session.getSource();conn.recordSql(source.getHost(), source.getSchema(), node.getStatement());// 判断是调用存储过程的话不能在这里释放链接if (!rrs.isCallStatement() || rrs.getProcedure().isResultSimpleValue()) {session.releaseConnectionIfSafe(conn, LOGGER.isDebugEnabled(), false);endRunning();}eof[3] = ++packetId;// 把将返回的行数据结束标志写入缓冲中buffer = source.writeToBuffer(eof, allocBuffer());...MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();if (middlerResultHandler != null) {middlerResultHandler.secondEexcute();} else {if (!errorRepsponsed.get() && !session.closed() && source.canResponse()) { // 判断能否继续返回// 把将返回的行数据结束标志写入前端连接缓冲队列(EOF 报文)source.write(buffer);}}... // 记录操作信息,供管理端口查看} }
队列中,并交由businessExecutor线程处理
2.2.2 转发到多节点
?当原 sql 经解析路由被转发到多节点上执行时,则需要先把各数据节点响应的 Row Data 报文信息汇总成一个新报文,再把新报文和收到的最后一个 EOF 报文写入前端连接的写缓冲队列中。
- 字段结束包
此处仅处理接收到的首个 Field EOF 报文,而且对于在路由解析模块中未改写过结果集的 sql 请求对应的 Field EOF 报文,处理方式和路由到单节点时保持一致,对于改写过结果集的请求,此处需要再把 Field 信息恢复原貌。package io.mycat.backend.mysql.nio.handler;public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {/*** 将列数量和列信息依次写入前端连接的写缓冲队列中(服务端返回列信息结束包时触发)** @param header 列数量报文数据包* @param fields 字段信息集合* @param eof 列结束报文数据包* @param conn 后端连接实例*/public void fieldEofResponse(byte[] header, List<byte[]> fields, byte[] eof, BackendConnection conn) {if (errorRepsponsed.get() || this.isFail()) {return; // 连接错误时,直接返回}... // 记录操作信息,供管理端口查看ServerConnection source = null;if (fieldsReturned) { // 字段信息是否已返回return;}lock.lock(); // 再入锁try {if (fieldsReturned) {return;}fieldsReturned = true; // 设置为已返回字段信息... // 当原sql是AVG类型时,记录要改写的函数source = session.getSource();ByteBuffer buffer = source.allocate();... // 原sql为avg类型时,改写返回字段信息eof[3] = ++packetId;buffer = source.writeToBuffer(eof, buffer);MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();if (null == middlerResultHandler) {source.write(buffer);}... // } catch (Exception e) {handleDataProcessException(e);} finally {lock.unlock(); // 释放再入锁}} }
Mycat 会在路由解析模块将 AVG() 聚合函数改写成 SUM() 和 COUNT() 聚合函数。
- 行数据包
此方法并没有直接处理返回的 row 数据包,而是将其投递到 io.mycat.sqlengine.mpp.AbstractDataNodeMerge 类里名为 packs 的队列中,交由 businessExecutor 线程去做聚合处理。package io.mycat.backend.mysql.nio.handler;public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {/*** 将行数据写入缓冲区中(服务端返回行数据时触发)** @param row 行数据(单行)* @param conn 后端连接*/public void rowResponse(byte[] row, BackendConnection conn) {if (errorRepsponsed.get() || this.isFail()) {return; // 连接错误时,直接返回}lock.lock(); // 再入锁try {this.selectRows++;RouteResultsetNode rNode = (RouteResultsetNode) conn.getAttachment();String dataNode = rNode.getName();if (dataMergeSvr != null) { // 需要对数据作聚合处理// 将row数据包投递到队列中,并交由businessExecutor线程处理dataMergeSvr.onNewRecord(dataNode, row);MiddlerResultHandler middlerResultHandler = session.getMiddlerResultHandler();if (middlerResultHandler instanceof MiddlerQueryResultHandler) { // 中间结果处理类// 从row数据包中获取首个字段值byte[] rv = ResultSetUtil.getColumnVal(row, fields, 0);String rowValue = rv == null ? "" : new String(rv);middlerResultHandler.add(rowValue);}} else { ... // 非select类型且不需要聚合处理}} catch (Exception e) {handleDataProcessException(e);} finally {lock.unlock();}} }
- 行数据结束包
此方法仅处理接收到的最后一个 Row EOF 数据包,且同样交由 businessExecutor 线程去处理。package io.mycat.backend.mysql.nio.handler;public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {public void rowEofResponse(final byte[] eof, BackendConnection conn) {LOGGER.debug("{} on row end response {} {} {}", new Object[]{this, conn, errorRepsponsed.get(), nodeCount});... // 容错性问题处理if (decrementCountBy(1)) { // 是否为最后一个节点的[Row EOF]包... // 容错性问题处理if (dataMergeSvr != null) {... // 数据合并前如果有中间过程则先执行数据合并再执行下一步try {// 把[row eof]数据包写入前端连接缓冲队列([ROW EOF]报文)(交由businessExecutor线程处理)dataMergeSvr.outputMergeResult(session, eof);} catch (Exception e) {handleDataProcessException(e);}} else {... // 写出非select类型且不需要聚合处理的sql的[ROW EOF]数据包至前端连接缓冲队列}}... // 记录操作信息,供管理端口查看} }
三、数据聚合
?Mycat 的结果集处理内存分为 Direct Memory 和 Heap Memory,而默认使用的是 Heap Memory 进行处理结果集,所以本文仅介绍 Heap Memory 处理方式。
虽然使用 Direct Memory(堆外内存)处理结果集的设计初衷是避免 OOM 问题、减少 Full GC 时间,提高 Mycat 响应速度,但在实际使用中会出现一些莫名的、偏底层的问题,在真实生产环境中并不使用之,所以该功能属于鸡肋功能。
?在上节中先后将 Row Data 报文和 Row EOF 报文封装成 io.mycat.sqlengine.mpp.PackWraper 实例,再投递到 AbstractDataNodeMerge 类里的 paks 属性队列中,并由名为 businessExecutor 的线程负责进行处理及写入到前端连接的缓冲队列中。
package io.mycat.sqlengine.mpp;public abstract class AbstractDataNodeMerge implements Runnable {/*** 添加一个数据包,如果没有运行,可能会唤醒一个业务线程来工作* @param pack 数据包* @return 是否正在运行*/protected final boolean addPack(final PackWraper pack) {packs.add(pack);if (running.get()) {return false;}final MycatServer server = MycatServer.getInstance();// 向businessExecutor线程池提交一个任务server.getBusinessExecutor().execute(this);return true;}
}
?在上述伪代码中,向 businessExecutor 线程池提交的任务是 this,而该 this 指向的是 AbstractDataNodeMerge 类,数据包也是被投递到该类的属性中,而且该类实现了 java.lang.Runnable 接口,所以从该类的 run() 方法开始探索。
package io.mycat.sqlengine.mpp;public class DataMergeService extends AbstractDataNodeMerge {public void run() {if (!running.compareAndSet(false, true)) {return; // 业务线程未启动,不作处理}boolean nulpack = false;try {// loop-on-packsfor ( ; ; ) {final PackWraper pack = packs.poll(); // 从队列中拉取数据if (pack == null) {nulpack = true; // 标记:队列为空break;}if (pack == END_FLAG_PACK) { // 处理Row EOF报文... // 拼装 Row EOF 报文final ServerConnection source = multiQueryHandler.getSession().getSource();final byte[] array = eof.array();// 输出合并后的结果集multiQueryHandler.outputMergeResult(source, array, getResults(array));break;}// 处理row data 报文final RowDataPacket row = new RowDataPacket(fieldCount);row.read(pack.rowData); // 解析row data报文if (grouper != null) {grouper.addRow(row); // 将数据添加到分组器中} else if (sorter != null) {if (!sorter.addRow(row)) { // 将数据添加排序器中canDiscard.put(pack.dataNode, true);}} else {// 无需分组或排序,直接缓存到结果集中result.get(pack.dataNode).add(row); }}} catch (final Exception e) {multiQueryHandler.handleDataProcessException(e);} finally {running.set(false);}if (nulpack && !packs.isEmpty()) {this.run();}}/*** 获取数据聚合后的结果集** @param eof Row Data EOF包* @return 聚合后的结果集*/public List<RowDataPacket> getResults(byte[] eof) {List<RowDataPacket> tmpResult = null;if (this.grouper != null) {tmpResult = grouper.getResult(); // 获取分组后的结果集grouper = null;}if (sorter != null) {if (tmpResult != null) {Iterator<RowDataPacket> itor = tmpResult.iterator();while (itor.hasNext()) {sorter.addRow(itor.next());itor.remove();}}tmpResult = sorter.getSortedResult(); // 获取排序后的结果集sorter = null;}//no grouper and sorterif (tmpResult == null) {tmpResult = new LinkedList<>();// 每次移除dataNode,防止一个dataNode重复发送多次结果集for (RouteResultsetNode node : rrs.getNodes()) {LinkedList<RowDataPacket> remove = result.remove(node.getName());if (remove != null) {tmpResult.addAll(remove);}}}LOGGER.debug("prepare mpp merge result for " + rrs.getStatement());return tmpResult;}
}
?上述方法中存在一个死循环,每次循环都要从队列里拉取一个 Row Data 数据包,直至拉取的数据包为空,方才结束循环。把拉取到的 Row Data 数据包添加到分组器或排序器中进行相应处理,当拉取的数据包是 Row Data EOF 类型时,写出数据结果集。
package io.mycat.backend.mysql.nio.handler;public class MultiNodeQueryHandler extends MultiNodeHandler implements LoadDataResponseHandler {/*** 输出合并结果集** @param source 前端连接* @param eof Row EOF数据包* @param results 合并后的数据结果集*/public void outputMergeResult(ServerConnection source, byte[] eof, List<RowDataPacket> results) {lock.lock();try {ByteBuffer buffer = session.getSource().allocate();final RouteResultset rrs = this.dataMergeSvr.getRrs();// 处理limit语句int start = rrs.getLimitStart();int end = start + rrs.getLimitSize();if (start < 0) {start = 0;}if (rrs.getLimitSize() < 0) {end = results.size();}if (end > results.size()) {end = results.size();}if (prepared) { // 预处理... // 拼装预处理的返回结果集} else {for (int i = start; i < end; i++) { // 遍历处理每个row data包RowDataPacket row = results.get(i);row.packetId = ++packetId;buffer = row.write(buffer, source, true); // 将每个Row Data包写入缓冲}}eof[3] = ++packetId;LOGGER.debug("last packet id:{}", packetId);if (source.canResponse()) { // 判断是否允许输出结果// 将数据包写入前端连接的缓冲队列中([Row Data]报文和[Row EOF]报文)source.write(source.writeToBuffer(eof, buffer));}} catch (Exception e) {handleDataProcessException(e);} finally {lock.unlock();dataMergeSvr.clear();}}
}
?先获取 Limit 限定结果集范围的起止值,再从聚合后的结果集中截取出指定范围的 Row Data 数据包,最后把截取的结果和 Row Data 数据包按先后顺序写入到前端连接的缓冲队列中。具体的分组或排序算法参见 io.mycat.sqlengine.mpp.RowDataPacketGrouper 类和 io.mycat.sqlengine.mpp.tmp.RowDataSorter 类。