当前位置: 代码迷 >> 综合 >> Soul 源码阅读之 tars 插件
  详细解决方案

Soul 源码阅读之 tars 插件

热度:67   发布时间:2023-12-15 21:32:14.0

Soul 同样支持腾讯的 tars RPC 框架。插件主流程不复杂:

    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    String body = exchange.getAttribute(Constants.TARS_PARAMS);SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;MetaData metaData = exchange.getAttribute(Constants.META_DATA);...// 从缓存中获取代理对象列表TarsInvokePrxList tarsInvokePrxList = ApplicationConfigCache.getInstance().get(metaData.getPath());// 随机选择一个代理int index = RANDOM.nextInt(tarsInvokePrxList.getTarsInvokePrxList().size());Object prx = tarsInvokePrxList.getTarsInvokePrxList().get(index).getInvokePrx();Method method = tarsInvokePrxList.getMethod();CompletableFuture future;try {
    // 执行代理对象的 methodfuture = (CompletableFuture) method.invoke(prx, PrxInfoUtil.getParamArray(tarsInvokePrxList.getParamTypes(), tarsInvokePrxList.getParamNames(), body));} catch (Exception e) {
    ...return WebFluxResultUtils.result(exchange, error);}return Mono.fromFuture(future.thenApply(ret -> {
    if (Objects.isNull(ret)) {
    ret = Constants.TARS_RPC_RESULT_EMPTY;}exchange.getAttributes().put(Constants.TARS_RPC_RESULT, ret);exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());return ret;})).onErrorMap(m -> new SoulException("failed to invoke tars")).then(chain.execute(exchange));}

元数据缓存同样使用了 guava 的 cache,它维护了三个缓存数据:

    private final LoadingCache<String, TarsInvokePrxList> cache = CacheBuilder.newBuilder().maximumWeight(maxCount).weigher((Weigher<String, TarsInvokePrxList>) (string, referenceConfig) -> getSize()).build(new CacheLoader<String, TarsInvokePrxList>() {
    @Overridepublic TarsInvokePrxList load(final String key) {
    return new TarsInvokePrxList(new CopyOnWriteArrayList<>(), null, null, null);}});private final ConcurrentHashMap<String, Class<?>> prxClassCache = new ConcurrentHashMap<>();private final ConcurrentHashMap<String, TarsParamInfo> prxParamCache = new ConcurrentHashMap<>();

其中 cache 是最终会被其他组件使用到的,而 prxClassCache 和 prxParamCache 则是缓存在组装 cache 时的中间结果。

此类的大多数方法都同上篇文章一样,是对 cache 操作的封装,唯一复杂的是 initPrx 方法,大体分为三个部分,第一个部分是一个循环结构,用于组装缓存需要的数据:

		for (; ;) {
    Class<?> prxClass = prxClassCache.get(metaData.getServiceName());try {
    ...} catch (Exception e) {
    log.error("init tars ref ex:{}", e.getMessage());break;}}

第二部分则是根据元数据组装中间结果缓存:

		// 当 prxClassCache 中没有当前 metaData 指向的代理类信息时if (Objects.isNull(prxClass)) {
    assert LOCK != null;// 可重入锁,防止两个 metaData 同步请求重复初始化if (LOCK.tryLock()) {
    try {
    // 动态组装类的实例,并添加注解 @Servant...Class<?> prxClazz = classDefinition.annotateType(AnnotationDescription.Builder.ofType(Servant.class).build()).make().load(Servant.class.getClassLoader(), ClassLoadingStrategy.Default.INJECTION).getLoaded();assert communicator != null;// 放入缓存中prxClassCache.put(metaData.getServiceName(), prxClazz);} finally {
    LOCK.unlock();}}}

最后,将信息放入 cache 中

        else {
    // if object name is same it will return same prx// 使用 tars 提供的类,将上一部分创建好的实例转换成代理类Object prx = communicator.stringToProxy(prxClass, PrxInfoUtil.getObjectName(metaData));TarsInvokePrxList tarsInvokePrxList = cache.get(metaData.getPath());// 使用元数据和上一部分组装好的中间结果,初始化 TarsInvokePrxListif (tarsInvokePrxList.getMethod() == null) {
    TarsParamInfo tarsParamInfo = prxParamCache.get(getClassMethodKey(prxClass.getName(), metaData.getMethodName()));Method method = prx.getClass().getDeclaredMethod(PrxInfoUtil.getMethodName(metaData.getMethodName()), tarsParamInfo.getParamTypes());tarsInvokePrxList.setMethod(method);tarsInvokePrxList.setParamTypes(tarsParamInfo.getParamTypes());tarsInvokePrxList.setParamNames(tarsParamInfo.getParamNames());}tarsInvokePrxList.getTarsInvokePrxList().add(new TarsInvokePrx(prx, metaData.getAppName()));break;}

可以看到,tars RPC 框架的支持,使用了大量的反射技术。