当前位置: 代码迷 >> 综合 >> Soul 网关源码阅读之 apache dubbo plugin
  详细解决方案

Soul 网关源码阅读之 apache dubbo plugin

热度:48   发布时间:2023-12-15 21:32:27.0

Soul 提供了 apache 和 alibaba 两种 dubbo 的支持。本篇先来看 apache dubbo 插件的源码。

ApacheDubboPlugin

插件类本身的代码非常简单,只是判断一些上下文需要的对象是否可以正常使用,最终将处理逻辑交给了 ApacheDubboProxyService 类:

    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    String body = exchange.getAttribute(Constants.DUBBO_PARAMS);SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;// 获取元数据MetaData metaData = exchange.getAttribute(Constants.META_DATA);if (!checkMetaData(metaData)) {
    assert metaData != null;log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 判断必须的对象是否处于可用状态if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
    exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 交给实际的逻辑处理final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);return result.then(chain.execute(exchange));}

ApacheDubboProxyService

从名字就可以看出来,这个类代理 dubbo 的实际 API 对外提供接口,其 genericInvoker 方法完成了上下文的组装和最终的 RPC 调用。

    public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
    String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
    RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);}// 从缓存中获取 ReferenceConfig ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
    // ReferenceConfig 为空时,使缓存失效ApplicationConfigCache.getInstance().invalidate(metaData.getPath());// 并通过 metaData 重建reference = ApplicationConfigCache.getInstance().initRef(metaData);}// 获取 GenericServiceGenericService genericService = reference.get();Pair<String[], Object[]> pair;if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
    pair = new ImmutablePair<>(new String[]{
    }, new Object[]{
    });} else {
    pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());}// 发出请求CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());return Mono.fromFuture(future.thenApply(ret -> {
    if (Objects.isNull(ret)) {
    ret = Constants.DUBBO_RPC_RESULT_EMPTY;}exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());return ret;})).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));}

可以看到,ApacheDubboProxyService.genericInvoker 实际上是代理了 GenericService.$invokeAsync 方法,承担与缓存组件的交互和请求参数的组装。

ApplicationConfigCache

接下来,我们具体的看一下 genericInvoker 方法中频繁出现的缓存操作。
整体上,这一部分的缓存使用了 guava 提供的 cache,整个类是单例的,使用静态内部类实现。至此,结合前面的几篇文章,我们已经在 soul 中看到了三种不同的单例模式实现方式,值得参考学习。

    /*** Gets instance.** @return the instance*/public static ApplicationConfigCache getInstance() {
    return ApplicationConfigCacheInstance.INSTANCE;}/*** The type Application config cache instance.*/static class ApplicationConfigCacheInstance {
    /*** The Instance.*/static final ApplicationConfigCache INSTANCE = new ApplicationConfigCache();}

除去缓存外,ApplicationConfigCache 类还维护了两个成员变量,分别是应用信息 ApplicationConfig,和注册中心信息 RegistryConfig,它们在 init 方法中初始化:

    /*** Init.** @param dubboRegisterConfig the dubbo register config*/public void init(final DubboRegisterConfig dubboRegisterConfig) {
    if (applicationConfig == null) {
    applicationConfig = new ApplicationConfig("soul_proxy");}if (registryConfig == null) {
    registryConfig = new RegistryConfig();registryConfig.setProtocol(dubboRegisterConfig.getProtocol());registryConfig.setId("soul_proxy");registryConfig.setRegister(false);registryConfig.setAddress(dubboRegisterConfig.getRegister());Optional.ofNullable(dubboRegisterConfig.getGroup()).ifPresent(registryConfig::setGroup);}}

而这个方法会在 PluginData 变更时调用,即 ApacheDubboPluginDataHandler.handlerPlugin:

    public void handlerPlugin(final PluginData pluginData) {
    if (null != pluginData && pluginData.getEnabled()) {
    DubboRegisterConfig dubboRegisterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), DubboRegisterConfig.class);DubboRegisterConfig exist = Singleton.INST.get(DubboRegisterConfig.class);if (Objects.isNull(dubboRegisterConfig)) {
    return;}if (Objects.isNull(exist) || !dubboRegisterConfig.equals(exist)) {
    // If it is null, initialize it// 当 admin 模块操作变更了 PluginData 时,初始化配置信息ApplicationConfigCache.getInstance().init(dubboRegisterConfig);// 同时让缓存失效ApplicationConfigCache.getInstance().invalidateAll();}Singleton.INST.single(DubboRegisterConfig.class, dubboRegisterConfig);}}

接下来是 initRef 方法,直接看注释即可:

    public ReferenceConfig<GenericService> initRef(final MetaData metaData) {
    try {
    // 根据路径获取缓存ReferenceConfig<GenericService> referenceConfig = cache.get(metaData.getPath());if (StringUtils.isNoneBlank(referenceConfig.getInterface())) {
    return referenceConfig;}} catch (ExecutionException e) {
    log.error("init dubbo ref ex:{}", e.getMessage());}// 获取失败时,重新构建缓存return build(metaData);}

构建缓存的过程:

    public ReferenceConfig<GenericService> build(final MetaData metaData) {
    ReferenceConfig<GenericService> reference = new ReferenceConfig<>();reference.setGeneric(true);reference.setApplication(applicationConfig);reference.setRegistry(registryConfig);reference.setInterface(metaData.getServiceName());reference.setProtocol("dubbo");String rpcExt = metaData.getRpcExt();// 根据 MetaData 组装 ReferenceConfigDubboParamExtInfo dubboParamExtInfo = GsonUtils.getInstance().fromJson(rpcExt, DubboParamExtInfo.class);if (Objects.nonNull(dubboParamExtInfo)) {
    if (StringUtils.isNoneBlank(dubboParamExtInfo.getVersion())) {
    reference.setVersion(dubboParamExtInfo.getVersion());}if (StringUtils.isNoneBlank(dubboParamExtInfo.getGroup())) {
    reference.setGroup(dubboParamExtInfo.getGroup());}if (StringUtils.isNoneBlank(dubboParamExtInfo.getLoadbalance())) {
    final String loadBalance = dubboParamExtInfo.getLoadbalance();// 组装不同的负载均衡方式,buildLoadBalanceName 方法仅做了字符串转换reference.setLoadbalance(buildLoadBalanceName(loadBalance));}if (StringUtils.isNoneBlank(dubboParamExtInfo.getUrl())) {
    reference.setUrl(dubboParamExtInfo.getUrl());}Optional.ofNullable(dubboParamExtInfo.getTimeout()).ifPresent(reference::setTimeout);Optional.ofNullable(dubboParamExtInfo.getRetries()).ifPresent(reference::setRetries);}try {
    Object obj = reference.get();if (obj != null) {
    log.info("init apache dubbo reference success there meteData is :{}", metaData.toString());// 将初始化好的 ReferenceConfig 放入缓存,key 为路径信息cache.put(metaData.getPath(), reference);}} catch (Exception e) {
    log.error("init apache dubbo reference ex:{}", e.getMessage());}return reference;}

后面的三个方法 get,invalidate,invalidateAll 仅是对 LoadingCache 对应方法的封装调用,比较简单,不再赘述。

  相关解决方案