当前位置: 代码迷 >> 综合 >> Soul 网关源码阅读之 selector data 及 ip 探活机制
  详细解决方案

Soul 网关源码阅读之 selector data 及 ip 探活机制

热度:22   发布时间:2023-12-15 21:32:38.0

今天,我们继续研究 SelectorData 的数据流向,并以 divide 插件为例,看看数据最终的应用场景,比如ip 探活。

DividePluginDataHandler

有了上篇文章的基础,我们直接打开 CommonPluginDataSubscriber.subscribeDataHandler 方法:

    private <T> void subscribeDataHandler(final T classData, final DataEventTypeEnum dataType) {
    Optional.ofNullable(classData).ifPresent(data -> {
    ...else if (data instanceof SelectorData) {
    SelectorData selectorData = (SelectorData) data;if (dataType == DataEventTypeEnum.UPDATE) {
    BaseDataCache.getInstance().cacheSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.handlerSelector(selectorData));} else if (dataType == DataEventTypeEnum.DELETE) {
    BaseDataCache.getInstance().removeSelectData(selectorData);Optional.ofNullable(handlerMap.get(selectorData.getPluginName())).ifPresent(handler -> handler.removeSelector(selectorData));}} ...});}

除去同样会在 BaseDataCache 中缓存一份之外,还会调用 PluginDataHandler.handlerPlugin(SelectorData selectorData); 方法。查看其实现,最终落到了 UpstreamCacheManager 类中:

    public void handlerSelector(final SelectorData selectorData) {
    UpstreamCacheManager.getInstance().submit(selectorData);}

UpstreamCacheManager

针对 divide 插件,SelectorData 信息在 UpstreamCacheManager 类中被解析为一串 DivideUpstream 实例:

    public void submit(final SelectorData selectorData) {
    final List<DivideUpstream> upstreamList = GsonUtils.getInstance().fromList(selectorData.getHandle(), DivideUpstream.class);if (null != upstreamList && upstreamList.size() > 0) {
    UPSTREAM_MAP.put(selectorData.getId(), upstreamList);UPSTREAM_MAP_TEMP.put(selectorData.getId(), upstreamList);} else {
    UPSTREAM_MAP.remove(selectorData.getId());UPSTREAM_MAP_TEMP.remove(selectorData.getId());}}

可以看到,数据最终落入了两个 ConcurrentHashMap 中,继续看此类的源码,可以发现,其中 UPSTREAM_MAP 是用来服务探活的,而 UPSTREAM_MAP_TEMP 是用于存储探活后的 DivideUpstream,并最终被插件所用。
首先,创建定时任务:

    private UpstreamCacheManager() {
    // 判断用户是否打开了定期探活的配置boolean check = Boolean.parseBoolean(System.getProperty("soul.upstream.check", "false"));if (check) {
    // 创建一个定时任务线程池new ScheduledThreadPoolExecutor(1, SoulThreadFactory.create("scheduled-upstream-task", false)).scheduleWithFixedDelay(this::scheduled,30, Integer.parseInt(System.getProperty("soul.upstream.scheduledTime", "30")), TimeUnit.SECONDS);}}

接着定时任务会调用下述逻辑:

    private void scheduled() {
    if (UPSTREAM_MAP.size() > 0) {
    // 遍历 UPSTREAM_MAPUPSTREAM_MAP.forEach((k, v) -> {
    // 进行探活List<DivideUpstream> result = check(v);if (result.size() > 0) {
    // 将探活的结果放入 UPSTREAM_MAP_TEMP 中UPSTREAM_MAP_TEMP.put(k, result);} else {
    UPSTREAM_MAP_TEMP.remove(k);}});}}

探活逻辑如下:

    private List<DivideUpstream> check(final List<DivideUpstream> upstreamList) {
    List<DivideUpstream> resultList = Lists.newArrayListWithCapacity(upstreamList.size());for (DivideUpstream divideUpstream : upstreamList) {
    // 调用探活工具方法final boolean pass = UpstreamCheckUtils.checkUrl(divideUpstream.getUpstreamUrl());// 修改状态if (pass) {
    if (!divideUpstream.isStatus()) {
    divideUpstream.setTimestamp(System.currentTimeMillis());divideUpstream.setStatus(true);log.info("UpstreamCacheManager detect success the url: {}, host: {} ", divideUpstream.getUpstreamUrl(), divideUpstream.getUpstreamHost());}resultList.add(divideUpstream);} else {
    divideUpstream.setStatus(false);log.error("check the url={} is fail ", divideUpstream.getUpstreamUrl());}}return resultList;}// org.dromara.soul.common.utils.UpstreamCheckUtils.checkUrlpublic static boolean checkUrl(final String url) {
    if (StringUtils.isBlank(url)) {
    return false;}// 使用 Pattern 判断 URL 格式是否合法if (checkIP(url)) {
    String[] hostPort;if (url.startsWith(HTTP)) {
    final String[] http = StringUtils.split(url, "\\/\\/");hostPort = StringUtils.split(http[1], Constants.COLONS);} else {
    hostPort = StringUtils.split(url, Constants.COLONS);}// 判断 host 的可连接性return isHostConnector(hostPort[0], Integer.parseInt(hostPort[1]));} else {
    // 判断 host 的可达性return isHostReachable(url);}}private static boolean isHostConnector(final String host, final int port) {
    // 创建一个新的 Sockettry (Socket socket = new Socket()) {
    // 使用 socket 尝试连接相应的端口socket.connect(new InetSocketAddress(host, port));} catch (IOException e) {
    return false;}return true;}private static boolean isHostReachable(final String host) {
    try {
    // 无端口号的情况下,则直接针对 host 进行可达性检测return InetAddress.getByName(host).isReachable(1000);} catch (IOException ignored) {
    }return false;}

小结

至此,我们通过实际的 divide 插件作为例子,看到了 SelectorData 的数据流向,并以此做引,看到了 divide 插件的探活机制。在最后,作为补充,探活后的 DivideUpstream 结果,会在每次进行权重获取时被用到:

	// org.dromara.soul.plugin.divide.balance.spi.AbstractLoadBalanceprotected int getWeight(final DivideUpstream upstream) {
    // 如果 DivideUpstream 未存活,则权重为 0if (!upstream.isStatus()) {
    return 0;}int weight = getWeight(upstream.getTimestamp(), getWarmup(upstream.getWarmup(), Constants.DEFAULT_WARMUP), upstream.getWeight());return weight;}
  相关解决方案