当前位置: 代码迷 >> 综合 >> Soul 网关源码阅读之 plugin data 的数据流向
  详细解决方案

Soul 网关源码阅读之 plugin data 的数据流向

热度:37   发布时间:2023-12-15 21:32:55.0

在过去的几篇文章中,我们阅读了 soul 几种数据同步方式的源码,也知道同步的目的是将 admin 的信息同步到 bootstrap 的 jvm 内存中,那么,具体来说,这些数据最终流向了哪里?这次,我们来看一下 plugin data 的目的地。

PluginDataSubscriber

有了之前的基础,我们从 zookeeper 模块入手可以很轻松的找到处理 plugin data 的代码:

	private void watcherPlugin(final String pluginName) {
    String pluginPath = ZkPathConstants.buildPluginPath(pluginName);if (!zkClient.exists(pluginPath)) {
    zkClient.createPersistent(pluginPath, true);}// 更新缓存cachePluginData(zkClient.readData(pluginPath));// 在 zookeeper 中订阅 plugin data 的变化subscribePluginDataChanges(pluginPath, pluginName);}private void cachePluginData(final PluginData pluginData) {
    Optional.ofNullable(pluginData).flatMap(data -> Optional.ofNullable(pluginDataSubscriber)).ifPresent(e -> e.onSubscribe(pluginData));}private void subscribePluginDataChanges(final String pluginPath, final String pluginName) {
    zkClient.subscribeDataChanges(pluginPath, new IZkDataListener() {
    @Overridepublic void handleDataChange(final String dataPath, final Object data) {
    Optional.ofNullable(data).ifPresent(d -> Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.onSubscribe((PluginData) d)));}@Overridepublic void handleDataDeleted(final String dataPath) {
    final PluginData data = new PluginData();data.setName(pluginName);Optional.ofNullable(pluginDataSubscriber).ifPresent(e -> e.unSubscribe(data));}});}

可以看到,不论是 cachePluginData 方法的直接缓存更新,还是 subscribePluginDataChanges 方法的监听器创建,最终都指向了方法 org.dromara.soul.sync.data.api.PluginDataSubscriber.onSubscribe(PluginData pluginData);而此方法只有一个默认实现 CommonPluginDataSubscriber.onSubscribe,并最终调用了 subscribeDataHandler 方法:

			if (data instanceof PluginData) {
    PluginData pluginData = (PluginData) data;if (dataType == DataEventTypeEnum.UPDATE) {
    BaseDataCache.getInstance().cachePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.handlerPlugin(pluginData));} else if (dataType == DataEventTypeEnum.DELETE) {
    BaseDataCache.getInstance().removePluginData(pluginData);Optional.ofNullable(handlerMap.get(pluginData.getName())).ifPresent(handler -> handler.removePlugin(pluginData));}}

我们只截取与本次文章有关的 PluginData 部分,可以看到,数据流向有两处,一处是 BaseDataCache,一处是 PluginDataHandler.handlerPlugin。

BaseDataCache

org.dromara.soul.plugin.base.cache.BaseDataCache 类实现了一个饿汉式的单例模式,在初始化时就直接进行了实例的创建,它维护了三个 ConcurrentHashMap,分别缓存 PluginData,SelectorData 和 RuleData。
点开 obtainPluginData 的调用方:
在这里插入图片描述
可以看到最终有两处使用了此缓存,一处是前文提到的插件调用链中,用于在抽象父类的 execute 方法中获取,判断当前插件是否可用的:

	public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    String pluginName = named();final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);if (pluginData != null && pluginData.getEnabled()) {
    // 具体逻辑,前文中有表达,不再赘述...return doExecute(exchange, chain, selectorData, rule);}return chain.execute(exchange);}

另一处是在默认的鉴权插件实现中判断插件的可用性的,功能与上文相同:

    public Pair<Boolean, String> signVerify(final ServerWebExchange exchange) {
    PluginData signData = BaseDataCache.getInstance().obtainPluginData(PluginEnum.SIGN.getName());if (signData != null && signData.getEnabled()) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;return verify(soulContext, exchange);}return Pair.of(Boolean.TRUE, "");}

PluginDataHandler.handlerPlugin(PluginData pluginData)

查看此方法的实现类:
在这里插入图片描述
使用之前的文章中分析过的防火墙模块举例,,org.dromara.soul.plugin.waf.handler.WafPluginDataHandler:

public void handlerPlugin(final PluginData pluginData) {
    WafConfig wafConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), WafConfig.class);Singleton.INST.single(WafConfig.class, wafConfig);
}

数据在这里落入了一个使用枚举类实现的单例模式代码中,这个枚举类维护了一个 ConcurrentHashMap,这种单例模式的实现方式也是 《Effective Java》推荐的实现方式,值得学习一下:

public enum Singleton {
    /*** Inst singleton.*/INST;/*** The Singles.*/private static final Map<String, Object> SINGLES = new ConcurrentHashMap<>();/*** Single.** @param clazz the clazz* @param o the o*/public void single(final Class clazz, final Object o) {
    SINGLES.put(clazz.getName(), o);}/*** Get t.** @param <T> the type parameter* @param clazz the clazz* @return the t*/@SuppressWarnings("unchecked")public <T> T get(final Class<T> clazz) {
    return (T) SINGLES.get(clazz.getName());}
}

最终,我们可以在 WafPlugin 类中看到,这些数据,也就是插件的属性,在插件被调用时,会使用到:

    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    // 获取防火墙插件的配置属性WafConfig wafConfig = Singleton.INST.get(WafConfig.class);if (Objects.isNull(selector) && Objects.isNull(rule)) {
    // 判断用户配置的属性是黑名单模式还是混合模式if (WafModelEnum.BLACK.getName().equals(wafConfig.getModel())) {
    return chain.execute(exchange);}exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);Object error = SoulResultWrap.error(403, Constants.REJECT_MSG, null);return WebFluxResultUtils.result(exchange, error);}String handle = rule.getHandle();WafHandle wafHandle = GsonUtils.getInstance().fromJson(handle, WafHandle.class);if (Objects.isNull(wafHandle) || StringUtils.isBlank(wafHandle.getPermission())) {
    log.error("waf handler can not configuration:{}", handle);return chain.execute(exchange);}if (WafEnum.REJECT.getName().equals(wafHandle.getPermission())) {
    exchange.getResponse().setStatusCode(HttpStatus.FORBIDDEN);Object error = SoulResultWrap.error(Integer.parseInt(wafHandle.getStatusCode()), Constants.REJECT_MSG, null);return WebFluxResultUtils.result(exchange, error);}return chain.execute(exchange);}

小结

可以看到,plugin data 最终分别流向了两处单例缓存中,并在整个插件链的调用流程中被使用。

  相关解决方案