今天,我们继续研究 SelectorData 的数据流向,并以 divide 插件为例,看看数据最终的应用场景,比如ip 探活。
DividePluginDataHandler
有了上篇文章的基础,我们直接打开 CommonPluginDataSubscriber.subscribeDataHandler 方法:
private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
Optional.ofNullable(classData).ifPresent(data -> {
...else if (data instanceof SelectorData) {
SelectorData selectorData = (SelectorData) data;if (dataType == DataEventTypeEnum.UPDATE) {
BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));} else if (dataType == DataEventTypeEnum.DELETE) {
BaseDataCache.getInstance().removeSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));}} ...});}
除去同样会在 BaseDataCache 中缓存一份之外,还会调用 PluginDataHandler.handlerPlugin(SelectorData selectorData); 方法。查看其实现,最终落到了 UpstreamCacheManager 类中:
public void handlerSelector(final SelectorData selectorData) {
UpstreamCacheManager.getInstance().submit(selectorData);}
UpstreamCacheManager
针对 divide 插件,SelectorData 信息在 UpstreamCacheManager 类中被解析为一串 DivideUpstream 实例:
public void submit(final SelectorData selectorData) {
final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);if (null != upstreamList && upstreamList.size() > 0) {
UPSTREAM_MAP.put(selectorData.getId(), upstreamList);UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);} else {
UPSTREAM_MAP.remove(selectorData.getId());UPSTREAM_MAP_TEMP.remove(selectorData.getId());}}
可以看到,数据最终落入了两个 ConcurrentHashMap 中,继续看此类的源码,可以发现,其中 UPSTREAM_MAP 是用来服务探活的,而 UPSTREAM_MAP_TEMP 是用于存储探活后的 DivideUpstream,并最终被插件所用。
首先,创建定时任务:
private UpstreamCacheManager() {
// 判断用户是否打开了定期探活的配置boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));if (check) {
// 创建一个定时任务线程池new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false)).scheduleWithFixedDelay(this::scheduled,30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);}}
接着定时任务会调用下述逻辑:
private void scheduled() {
if (UPSTREAM_MAP.size() > 0) {
// 遍历 UPSTREAM_MAPUPSTREAM_MAP.forEach((k, v) -> {
// 进行探活List<DivideUpstream> result = check(v);if (result.size() > 0) {
// 将探活的结果放入 UPSTREAM_MAP_TEMP 中UPSTREAM_MAP_TEMP.put(k, result);} else {
UPSTREAM_MAP_TEMP.remove(k);}});}}
探活逻辑如下:
private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {
List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());for (DivideUpstream divideUpstream : upstreamList) {
// 调用探活工具方法final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());// 修改状态if (pass) {
if (!divideUpstream.isStatus()) {
divideUpstream.setTimestamp(System.currentTimeMillis());divideUpstream.setStatus(true);log.info("UpstreamCacheManager detect success the url: {}, host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());}resultList.add(divideUpstream);} else {
divideUpstream.setStatus(false);log.error("check the url={} is fail ", divideUpstream.getUpstreamUrl());}}return resultList;}// org.dromara.soul.common.utils.UpstreamCheckUtils.checkUrlpublic static boolean checkUrl(final String url) {
if (StringUtils.isBlank(url)) {
return false;}// 使用 Pattern 判断 URL 格式是否合法if (checkIP(url)) {
String[] hostPort;if (url.startsWith(HTTP)) {
final String[] http = StringUtils.split(url, "\\/\\/");hostPort = StringUtils.split(http[1], Constants.COLONS);} else {
hostPort = StringUtils.split(url, Constants.COLONS);}// 判断 host 的可连接性return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));} else {
// 判断 host 的可达性return isHostReachable(url);}}private static boolean isHostConnector(final String host, final int port) {
// 创建一个新的 Sockettry (Socket socket = new Socket()) {
// 使用 socket 尝试连接相应的端口socket.connect(new InetSocketAddress(host, port));} catch (IOException e) {
return false;}return true;}private static boolean isHostReachable(final String host) {
try {
// 无端口号的情况下,则直接针对 host 进行可达性检测return InetAddress.getByName(host).isReachable(1000);} catch (IOException ignored) {
}return false;}
小结
至此,我们通过实际的 divide 插件作为例子,看到了 SelectorData 的数据流向,并以此做引,看到了 divide 插件的探活机制。在最后,作为补充,探活后的 DivideUpstream 结果,会在每次进行权重获取时被用到:
// org.dromara.soul.plugin.divide.balance.spi.AbstractLoadBalanceprotected int getWeight(final DivideUpstream upstream) {
// 如果 DivideUpstream 未存活,则权重为 0if (!upstream.isStatus()) {
return 0;}int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());return weight;}