当前位置: 代码迷 >> 综合 >> dubbo使用7----> dubbo spi源码分析
  详细解决方案

dubbo使用7----> dubbo spi源码分析

热度:79   发布时间:2023-10-24 15:29:24.0

1、根据名称获取扩展点的源码分析

      1.1、使用方式案例 org.apache.dubbo.container.Container扩展点       

默认的实现是spring
@SPI("spring")
public interface Container {/*** start method to load the container.*/void start();/*** stop method to unload the container.*/void stop();}

       1.2、spi文件

spring=org.apache.dubbo.container.spring.SpringContainer
log4j=org.apache.dubbo.container.log4j.Log4jContainer
logback=org.apache.dubbo.container.logback.LogbackContainer

      1.3、获取实现的方式

ExtensionLoader<Container> extensionLoader = ExtensionLoader.getExtensionLoader(Container.class);
Container spring = extensionLoader.getExtension("log4j");
System.out.println(sprin

       1.4、实现源码解析

           ①、获取扩展载入器

    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {1、基础校验。if (type == null) {throw new IllegalArgumentException("Extension type == null");}if (!type.isInterface()) {throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");}if (!withExtensionAnnotation(type)) {throw new IllegalArgumentException("Extension type (" + type +") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");}2、先从本地缓存EXTENSION_LOADERS中通过Class类型获取当前Class类型的扩展载入器,EXTENSION_LOADERS是一个ConcurrentHashMap。ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);if (loader == null) {3、如果缓存中没有,就创建一个当前所需Class类型的载入器并添加到本地缓存。EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));4、然后再次从本地缓存中获取。loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);}5、返回构建好的当前Class类的扩展载入器。return loader;}

                 ExtensionLoader构造函数:

    private ExtensionLoader(Class<?> type) {this.type = type;这个objectFactory属性的类型是ExtensionFactory,也就是扩展工厂,这个工厂就是用来根据入参
获取到真正的实现实例。默认会创建一个AdaptiveExtensionFactory实例,这个是一个自适应扩展类,这个类里
面有一个List<ExtensionFactory> factories属性,在创建的时候会将ExtensionFactory的另外两个实现
SpiExtensionFactory、SpringExtensionFactory实例化后添加到factories集合中。objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());}

                  ExtensionFactory接口:

    @SPIpublic interface ExtensionFactory {/*** Get extension.** @param type object type.* @param name object name.* @return object instance.*/ExtensionFactory 接口的作用就是根据类型+扩展实现的key来获取一个实现的实例。<T> T getExtension(Class<T> type, String name);}

                 ExtensionFactory类图:

                 

                 ExtensionLoader<Container> extensionLoader = ExtensionLoader.getExtensionLoader(Container.class)得到的结果

                 

           ②、使用扩展载入器根据扩展的实现名称来获取实现的实例

                   Container spring = extensionLoader.getExtension("log4j");

         public T getExtension(String name) {入参是实现的名称,以及是否需要包装实现,默认是需要。return getExtension(name, true);}

                    上面提到了包装举例说明以Cluster来举例,我么先查看Cluster的spi文件

      后缀名是Wrapper的表示是包装类,其实dubbo里面判断是否是包装类的方式是,查看类中是否有入参是扩展点接口的构造函数,如果有那就是一个包装器,多个人包装器也是有排序的。mock=org.apache.dubbo.rpc.cluster.support.wrapper.MockClusterWrapperfailover=org.apache.dubbo.rpc.cluster.support.FailoverClusterfailfast=org.apache.dubbo.rpc.cluster.support.FailfastClusterfailsafe=org.apache.dubbo.rpc.cluster.support.FailsafeClusterfailback=org.apache.dubbo.rpc.cluster.support.FailbackClusterforking=org.apache.dubbo.rpc.cluster.support.ForkingClusteravailable=org.apache.dubbo.rpc.cluster.support.AvailableClustermergeable=org.apache.dubbo.rpc.cluster.support.MergeableClusterbroadcast=org.apache.dubbo.rpc.cluster.support.BroadcastClusterzone-aware=org.apache.dubbo.rpc.cluster.support.registry.ZoneAwareCluster

               当我们根据名称获取Cluster扩展实现的时候,获取到的就是包装类MockClusterWrapper,MockClusterWrapper包装了我们根据名称找到的实现。

                

                接着主流程走:

    public T getExtension(String name, boolean wrap) {if (StringUtils.isEmpty(name)) {throw new IllegalArgumentException("Extension name == null");}if ("true".equals(name)) {如果传入的名称是“true”那就获取一个默认的扩展点实现实例。return getDefaultExtension();}创建一个Holder实例用于存放我们找到的扩展点实现类的实例。final Holder<Object> holder = getOrCreateHolder(name);Object instance = holder.get();if (instance == null) {synchronized (holder) {instance = holder.get();if (instance == null) {根据名称+是否包装来获取一个扩展点实现的实例。instance = createExtension(name, wrap);holder.set(instance);}}}return (T) instance;}
    private T createExtension(String name, boolean wrap) {1、getExtensionClasses()方法加载当前扩展点所有的实现类,就是去加载路径下的spi文件然后按照key=名称,value=实现.class放在HashMap中返回。那么在这一步我们就能根据名获取到我们所需实现的Class信息。同时也会判断实现的类Class信息是否是包装类、自适应、激活扩展方式。是否是包装                是    缓存到当前ExtensionLoader的cachedWrapperClasses中类级别是否是自适应扩展方式  是    缓存到当前ExtensionLoader的cachedAdaptiveClass中类级别是否是激活扩展方式    是    缓存到当前ExtensionLoader的cachedActivatesClass<?> clazz = getExtensionClasses().get(name);if (clazz == null) {throw findException(name);}try {T instance = (T) EXTENSION_INSTANCES.get(clazz);if (instance == null) {2、通过反射创建一个我们所需要的扩展点实现类的实例EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());instance = (T) EXTENSION_INSTANCES.get(clazz);}3、如果我们所需的实现里面有一些属性,那么在此处进行依赖注入,类似于springDI,这里就会使用我们之前构建好的ExtensionFactory来进行依赖的类实例构建,比如扩展点里面依赖扩展点也是在此处进行依赖注入。injectExtension(instance);4、如果需要包装,默认是需要包装。if (wrap) {List<Class<?>> wrapperClassesList = new ArrayList<>();5、cachedWrapperClasses这个Set缓存会在第1步中载入扩展类所有的实现类Class信息的时候会进行判断以及缓存在cachedWrapperClasses中表示当前扩展点的包装类。if (cachedWrapperClasses != null) {wrapperClassesList.addAll(cachedWrapperClasses);6、正排序包装类Class信息。wrapperClassesList.sort(WrapperComparator.COMPARATOR);7、然后再反转成倒排序。Collections.reverse(wrapperClassesList);}if (CollectionUtils.isNotEmpty(wrapperClassesList)) {for (Class<?> wrapperClass : wrapperClassesList) {Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);if (wrapper == null|| (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {8、使用扩展点实现的实例来构建包装实例,然后再依赖注入包装实例。instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));}}}}9、初始化扩展点实例(此处要么是真正的实现实例,要么是包装类实例)initExtension(instance);10、返回我们根据名称找到的扩店点实现的实例。return instance;} catch (Throwable t) {throw new IllegalStateException("Extension instance (name: " + name + ", class: " +type + ") couldn't be instantiated: " + t.getMessage(), t);}}

            这就是根据名称获取扩展点实现实例的主要实现。

 

2、自适应扩展点实现的源码分析

      2.1、应用方式:

        这里获取扩展点加载器跟之前的根据名称获取扩展点的实现是一样的ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);获取自适应扩展点的自适应实现实例Protocol protocol = extensionLoader.getAdaptiveExtension();System.out.println(protocol);

       2.2、核心方法getAdaptiveExtension()方法

    public T getAdaptiveExtension() {1、先获取缓存,看看是否存在,默认情况下,类级别的自适应扩展点实现只有一个,如果被创建了就会缓存在构建一个Holder实例缓存在cachedAdaptiveInstance中Object instance = cachedAdaptiveInstance.get();if (instance == null) {if (createAdaptiveInstanceError != null) {throw new IllegalStateException("Failed to create adaptive instance: " +createAdaptiveInstanceError.toString(),createAdaptiveInstanceError);}synchronized (cachedAdaptiveInstance) {instance = cachedAdaptiveInstance.get();if (instance == null) {try {2、创建一个自适应扩展点的实现实例instance = createAdaptiveExtension();3、将获取到的自适应扩展点的实现实例缓存在cachedAdaptiveInstance中。cachedAdaptiveInstance.set(instance);} catch (Throwable t) {createAdaptiveInstanceError = t;throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);}}}}4、返回自适应扩展点的实现实例。return (T) instance;}

           核心方法createAdaptiveExtension()创建自适应扩展实现实例

    private T createAdaptiveExtension() {try {获取自适应扩展的实现类Class信息,分为两种:1、在实现类上面使用@Adaptive注解的时候,就直接返回类的Class信息。2、在扩展点接口的方法上面标注@Adaptive注解的时候,这个时候dubbo会动态生成一个实现类的源码code,然后获取一个编译器Compiler进行编译后返回。获取到自适应扩展点接口的实现类的Class后,反射实例化,然后再对实例进行依赖注入。return injectExtension((T) getAdaptiveExtensionClass().newInstance());} catch (Exception e) {throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);}}

           核心方法getAdaptiveExtensionClass():

    private Class<?> getAdaptiveExtensionClass() {1、先去获取扩展点接口的spi文件,然后将所有的实现的Class信息载入并缓存,这一步跟使用名称获取扩展点实现的时候是一样的处理方式。getExtensionClasses();2、如果当前自适应扩展点有类级别的实现,那就直接返回其Class信息。if (cachedAdaptiveClass != null) {return cachedAdaptiveClass;}3、如果当前扩展点没有类级别的自适应扩展实现,那就动态创建一个自适应实现,这个动态的实现只会实现被@Adaptive标注的扩展点接口方法。return cachedAdaptiveClass = createAdaptiveExtensionClass();}

           核心方法 createAdaptiveExtensionClass():

    private Class<?> createAdaptiveExtensionClass() {1、先去生成自适应扩展点的实现代码code,如果扩展点接口中没有被@Adaptive标注的方法的话,将会抛出IllegalStateException(No adaptive method exist on extensionString code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();2、获取ExtensionLoader.class类的类装载器。ClassLoader classLoader = findClassLoader();3、获取一个代码编译器进行代码编译从而得到动态生成的自适应扩展点实现类的Class信息。Compiler也是自适应扩展点,有一个类级别的自适应实现AdaptiveCompiler这个实现也就是一个适配器,里面会选择实现,默认会选择JavassistCompiler作为编译器。org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();使用编译器来便器动态生成的自适应扩展点的源码code,从而得到Class信息。return compiler.compile(code, classLoader);}

           比如Protocol在获取自适应扩展点的时候动态生成的实现的源码code案例:

类名为扩展点接口simpleName&Adaptive,这些代码全是在允许期间动态生成的,然后会在运行期间编译。
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {public void destroy() {throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public int getDefaultPort() {throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}public Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");if (arg0.getUrl() == null)throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");URL url = arg0.getUrl();String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.export(arg0);}public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {if (arg1 == null) throw new IllegalArgumentException("url == null");org.apache.dubbo.common.URL url = arg1;String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());if (extName == null)throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url (" + url.toString() + ") use keys([protocol])");org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);return extension.refer(arg0, arg1);}public java.util.List getServers() {throw new UnsupportedOperationException("The method public default java.util.List org.apache.dubbo.rpc.Protocol.getServers() of interface org.apache.dubbo.rpc.Protocol is not adaptive method!");}}

           @Adaptive注解知识点:此注解可标注在类上或者方法上,此注解有一个成员属性value,类型是String[],含义是当前扩展点的参数名称,意思就是在服务的url中使用哪个参数名称来指定具体扩展点的实现,不配置默认是扩展点接口的SimpleName首字母小写,如指定协议时候使用protocol参数。

以上就是自适应扩展点是实现方式。

 

3、激活扩展点实现的源码分析

      3.1、@Activate 注解

               ①、用于标注在扩展点的实现类上面。

               ②、注解的成员属性列表:

                      String[] group() default {} :指定分组,表示在指定分组下才能激活当前的扩展实现,没有指定分组将会默认是匹配。此处的分组跟我们服务的分组不是一个东西,这里的分组表示的是提供者还是消费者,比如Filter扩展点下面有很多实现,但是有些只能在提供者端使用,有的只能在消费者端使用,就是使用这个group来进行控制。

                      String[] value() default {}:指定dubbo接口的URL中的参数名称,如果配置的话就需要去URL中根据配置的名称去获取参数值,如果URL中没有此参数就不会被激活。如果不配置默认表示激活。例如某实现类上有注解@Activate("cache, validation")就表示如果url中能够找到cache参数或者有validation参数就激活当前扩展点实现。

                      int order() default 0:当前激活的扩展点实现如过被激活,将会在什么位置排序。

     3.2、激活扩展点的源码实现

    key表示url中配置参数的名称,比如我们在服务提供方配置:filter=filter="cache,token,-timeout" 在服务提供者解析阶段会往URL中添加参数service.filter=cache,token,-timeout 且在服务发布过程中会调用  List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), "service.filter", "provider");方法去获取到需要激活的Filter。url表示dubbo服务的url。group表示在什么分组下激活,是激活扩展点的过滤条件。public List<T> getActivateExtension(URL url, String key, String group) {1、通过key从Url中获取参数的value值。String value = url.getParameter(key);2、将value值进行split(",")后,去获取符合条件的激活扩展点列表。return getActivateExtension(url, StringUtils.isEmpty(value) ? null : COMMA_SPLIT_PATTERN.split(value), group);}
    public List<T> getActivateExtension(URL url, String[] values, String group) {入参说明:url    :dubbo服务的urlvalues :url中按某参数名称获取到的value值split(",")的数组,一般情况下是激活扩展点实现的名称,如上面说的["cache","token","-timeout"]-timeout表示一处==移除这个激活扩展点的实现。group  :获取激活扩展点的传入的分组,如provider、consumerList<T> activateExtensions = new ArrayList<>();1、使用一个names的集合来存放values。List<String> names = values == null ? new ArrayList<>(0) : asList(values);2、如果values中不包含"-default",也就是说这个if成立的化应该是去加载默认的激活扩展点。而"-default"就表示移除默认的激活扩展点。默认的激活扩展点就是我们没有显示配置的激活扩展点。if (!names.contains(REMOVE_VALUE_PREFIX + DEFAULT_KEY)) {3、这里不用多说了吧,在载入当前扩展点接口所有的实现类,将激活扩展的实现Class信息缓存到cachedActivates中。getExtensionClasses();4、循环当前扩展点的所有激活扩展的实现类。for (Map.Entry<String, Object> entry : cachedActivates.entrySet()) {5、获取当前激活扩展实现的nameString name = entry.getKey();6、获取当前激活扩展实现的@Activate注解描述实实例,没错cachedActivates缓存中放的key=实现的name  value=@Activate的注解描述实例。Object activate = entry.getValue();String[] activateGroup, activateValue;7、这一步就是获取@Activate注解的group、value属性。if (activate instanceof Activate) {activateGroup = ((Activate) activate).group();activateValue = ((Activate) activate).value();} else if (activate instanceof com.alibaba.dubbo.common.extension.Activate) {activateGroup = ((com.alibaba.dubbo.common.extension.Activate) activate).group();activateValue = ((com.alibaba.dubbo.common.extension.Activate) activate).value();} else {continue;}8、判断当前循环的激活扩展点实现是否是本次获取激活扩展点所需要的,判断的条件:条件1-分组匹配:如果传入的group与当前激活扩展点实现类上标注的@Activate中的group是匹配的,如果传入的group为空就默认表示匹配,如果传入group不为空那就查看当前循环的实现是支持当前分组。条件2-当前循环的扩展点实现的名称不在配置的valus数组中这个if里面是处理默认的激活扩展点,也就是判断不在我们配置的范围内的激活扩展实现,这样做的好处就是必要的激活扩展点的实现不需要我们显示的指定。在names集合中的也就是我们显示配置的激活扩展点的实现在下面会将其加入到我们所需要的激活扩展点的列表中。条件3-当循环的激活扩展点的实现是否是我们配置需要移除的实现比如我们上面配置的filter="cache","token","-timeout" 我们要移除name=timeout的扩展点实现。条件4-当前循环的激活扩展点实现是否需要激活判断依据:1、如果当前激活扩展点实现类上的@Activate注解的成员属性value数组为空,那就表示默认需要激活。2、如果当前激活扩展点实现类上的@Activate注解的成员属性value数组不为空,那就遍历URL的参数列表,如果存在数组中的任何一个参数名称,那就表示需要激活。举例如下:假设当前循环的扩展点实现标注了@Active(value={"cache","xxx"}),那就表示如果当前的URL参数中只要有cache、或者xxx参数,那么就表示当前的激活扩展点实现是需要激活的。                if (isMatchGroup(group, activateGroup)&& !names.contains(name)&& !names.contains(REMOVE_VALUE_PREFIX + name)&& isActive(activateValue, url)) {9、上面4个条件都满足后,那么默认的激活扩展点是实现列表就添加完成。activateExtensions.add(getExtension(name));}}10、排序满足当前环境的默认的激活扩展点是实现的列表。activateExtensions.sort(ActivateComparator.COMPARATOR);}11、创建一个空集合用存存放我们显示配置的激活扩展点实现列表。List<T> loadedExtensions = new ArrayList<>();12、处理我们显示配置的激活扩展点,names=values也就是我们显示配置的激活扩展点,如filter="cache","token","-timeout" 就表示我们显示的配置3个激活扩展点。for (int i = 0; i < names.size(); i++) {String name = names.get(i);13、如果是不需要移除的扩展点实现那就加入到返回结果集合里面。if (!name.startsWith(REMOVE_VALUE_PREFIX)&& !names.contains(REMOVE_VALUE_PREFIX + name)) {if (DEFAULT_KEY.equals(name)) {if (!loadedExtensions.isEmpty()) {activateExtensions.addAll(0, loadedExtensions);loadedExtensions.clear();}} else {loadedExtensions.add(getExtension(name));}}}14、如果显示配置的激活扩展点实现不为空,那就加入到结果集activateExtensions集合中,从此步可以看出,显示配置的激活扩展点实现会放在默认的激活扩展点实现的后面。if (!loadedExtensions.isEmpty()) {activateExtensions.addAll(loadedExtensions);}15、返回本次获取激活扩展点实现的列表。return activateExtensions;}

           这就是dubbo激活扩展点的实现原理。