补充点额外知识:
CompletableFuture 现在 只需要知道是一个和异步任务相关的东西即可.
AbstractTimerTask 实现的是 TimerTask 接口,先来分析下这个抽象类,因为这个类是其他任务类的父类.
// channel 集合
private final ChannelProvider channelProvider;
// 下次任务执行的间隔
private final Long tick;
// 是否取消
protected volatile boolean cancel = false;
lastRead 会从 channel 中获取读取的时间戳
lastWrite 会从 channel 中获取写的时间戳
now 返回当前时间
cancel 取消任务
reput 方法将任务重新放入调度器中.
run 方法执行任务
子类需要重写抽象方法 doTask,用于实现自己的业务逻辑,AbstractTimerTask 在实现的过程中,已经自动的实现了 reput 的功能.
CloseTimerTask 继承 AbstractTimerTask 类,实现关闭 channel 的功能,如果判定可以关闭了?现在距上次读/写时间超过空闲时间,则关闭 channel.
HeartbeatTimerTask 同样继承 AbstractTimerTask 类,实现心跳检测功能,比如说当前 channel 距上次读/写时间超过心跳时间(60s),就发送一个心跳事件去看下.
ReconnectTimerTask 继承 AbstractTimerTask 类,实现重连功能. 空闲时间必须心跳间隔的 2 倍以上,因为客户端可能会重试. 什么时候会触发重连了?1.channel 没有连接上,2.距上次读超过了空闲时间.
DefaultFuture 继承 CompletableFuture 类,首先关注下该类的重要属性.
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
// put into waiting map.
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
在来关注下 TimeoutCheckTask 这个任务,如果任务顺利做完,则一切 ok,如果没有顺利做完,则肯定有异常,要么服务端超时,要么客户端超时. 最后调用 DefaultFuture.received 方法进行处理. 然后到 doReceived 方法. 其实是不是有一个疑问?如果正常执行完,那么正常的 response 是在哪里设置进去的了?答案是 HeaderExchangeHandler.handleResponse 方法.
先去网上盗个图:
ExchangeChannel 继承 Channel,额外封装了两个方法:request,用于更贴近用户使用习惯.
ExchangeClient 继承 Client 和 ExchangeChannel 接口,这个表达的意思是客户端是 Channel 的具体化.
HeaderExchangeClient 实现 ExchangeClient 接口,同样采用了装饰者模式,HeaderExchangeClient 持有 client 和 channel 属性. client 本身就是从 channel 细化而来,所以 channel 是直接 new HeaderExchangeChannel(client). 其实这一步就是在增强 channel 的功能. 值得注意的是 client 端有重连任务和心跳任务. 我发现 HeaderExchangeClient 实现的方法,都是直接调用 client.xxx 或者 channel.xxx.
下面关注下两个任务.
两个任务的相关内容参考上面.
HeaderExchangeChannel 实现了 ExchangeChannel 接口,该类同样采用了装饰者模式,是功能的增强. 关于该类实现的核心方法,我们重点关注下 send 方法和 request 方法.
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + “, cause: The channel " + this + " is closed!”);
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
是不是感觉很怪异?但是又说不上来哪里怪了?HeaderExchangeChannel 的职责就是用Request对象包装请求体,但是如果之前已经有装饰器对请求体做过包装了,那就没必要再包装一层了. 那么问题来了?
1.啥时候回封装 Response 对象?
2.为啥 String 类型的消息可以直接被发送走?
经过高人指点,发现 HeaderExchangeHandler 中有这么一段处理逻辑:
public void received(Channel channel, Object message) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
final ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
if (message instanceof Request) {
// handle request.
Request request = (Request) message;
if (request.isEvent()) {
handlerEvent(channel, request);
} else {
if (request.isTwoWay()) {
handleRequest(exchangeChannel, request);
} else {
handler.received(exchangeChannel, request.getData());
}
}
} else if (message instanceof Response) {
handleResponse(channel, (Response) message);
} else if (message instanceof String) {
if (isClientSide(channel)) {
Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
logger.error(e.getMessage(), e);
} else {
String echo = handler.telnet(channel, (String) message);
if (echo != null && echo.length() > 0) {
channel.send(echo);
}
}
} else {
handler.received(exchangeChannel, message);
}
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
我们再看下 request 方法:
public CompletableFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + “, cause: The channel " + this + " is closed!”);
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
1.为什么 request 方法不需要判断 request 的类型了?类似与 send 方法那样判断?
我的理解是调用问题吧. send 方法用于单方面的通知(也不一就,但是有一点可以确定的是 send 方法只管发送,发送完后就不管结果了),而 request 类似于请求/应答模式,是有结果的.
我发现,类似于 ExecutionChannelHandler 类,调用的都是 channel.send 方法,而没有使用 request. 所以,request 方法是不需要额外的进行判断的.
ExchangeHandler 继承 ChannelHandler 和 TelnetHandler,该类之定义了一个方法,CompletableFuture reply(ExchangeChannel channel, Object request).
ExchangeHandlerDispatcher 实现 ExchangeHandler 接口,该类运用了组合模式,有三个比较关键的成员变量:
private final ReplierDispatcher replierDispatcher;private final ChannelHandlerDispatcher handlerDispatcher;private final TelnetHandler telnetHandler;
该三个成员变量,分别管不同的方法.
ExchangeHandlerAdapter 主要是为了做适配 TelnetHandlerAdapter 和 ExchangeHandler.
ExchangeServer 继承 Server,扩展了两个和 ExchangeChannel 相关的方法.
HeaderExchangeServer 实现 ExchangeServer,同样采用了装饰者模式,装饰 Server. 需要注意的是,空闲检查在 Server 和 Client 的处理是不一样的,Server 是关闭 Channel,而 Client 是重连.
当服务端关闭时,会发送 readonly 事件,通知客户端服务不可用.
ExchangeServerDelegate 为 ExchangeServer 的代理.
Exchanger 类似 Transporter
HeaderExchangeHandler 和 netty 中 handler 的概念很类似,我们看下它是如何处理事件的.
我们可以看到,connected 的时候,放入了 read 时间戳和 write 时间戳. 同时将 channel 包装成 ExchangeHandler.
public void connected(Channel channel) throws RemotingException {
channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.connected(exchangeChannel);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
}
然后看下 sent 方法:
public void sent(Channel channel, Object message) throws RemotingException {
Throwable exception = null;
try {
channel.setAttribute(KEY_WRITE_TIMESTAMP, System.currentTimeMillis());
ExchangeChannel exchangeChannel = HeaderExchangeChannel.getOrAddChannel(channel);
try {
handler.sent(exchangeChannel, message);
} finally {
HeaderExchangeChannel.removeChannelIfDisconnected(channel);
}
} catch (Throwable t) {
exception = t;
}
// Request 类型的消息,才会有发送. 因为 Request 类型的消息,需要得到结果,所以需要标记已发送.
if (message instanceof Request) {
Request request = (Request) message;
DefaultFuture.sent(channel, request);
}
if (exception != null) {
if (exception instanceof RuntimeException) {
throw (RuntimeException) exception;
} else if (exception instanceof RemotingException) {
throw (RemotingException) exception;
} else {
throw new RemotingException(channel.getLocalAddress(), channel.getRemoteAddress(),
exception.getMessage(), exception);
}
}
}
这个类看的还不是很明白,有待调试…
客户端调用链:
注意:这里的 handler 其实是 NettyClient.
这个 handler 是如何被组装起来的?
首先看下 client,发现调用 wrapChannelHandler 的时候,会增加 MultiMessageHandler 和 HeartbeatHandler 的功能. 同时会进一步使用 dispatcher 获得一个 handler,默认的是 AllChannelHandler.
public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, wrapChannelHandler(url, handler));
}
那 DecodeHandler 是如何被封装进去的了?
答案在 DubboPotocol 类中:
client = Exchangers.connect(url, requestHandler);
然后到 HeaderExchange.connect 方法中:
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
最后的那个 DubboProtocol$1@3481 是 requestHandler
那数据时如何发送的了?通过 ReferenceCountExchangeClient.request -> HeaderExchangeClient.request -> HeaderExchangeChannel.request(将 object 封装成 Request 对象)
Request 对象在传输的过程中,会调用编解码器进行处理.
服务端调用链:
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
// you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
// the handler will be warped: MultiMessageHandler->HeartbeatHandler->handler
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}