当前位置: 代码迷 >> 综合 >> Soul 源码阅读之熔断器
  详细解决方案

Soul 源码阅读之熔断器

热度:24   发布时间:2023-12-15 21:31:59.0

这篇文章开始,我们来探究 soul 对熔断的支持。
Soul 一共支持了三款主流的熔断实现,分别是 hystrix,sentinel,resilience4j 三种。本篇从 hystrix 开始。

熔断器

分布式系统中常见的一个组件,类似于物理层面的熔断器,当电流超过规定值一定时间后,自身断开,起到保护用电器的作用。放回到分布式系统中,当流量达到一定程度后,采取快速失败等策略,从而提升整体服务的可用性。

hystrix

hystrix 是 netflix 开源的熔断工具,提供了两种实现方式,线程池和信号量,可以灵活的配置触发限流的条件如超过多少百分比的失败,也提供了诸如熔断后恢复的功能。

HystrixPlugin

soul hystrix 插件的源码主要就是 hystrix 的使用过程:

    protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    ...// 熔断器相关配置是规则级别的,所以相关的上下文将在 rule 中获取final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class);...// 获取 CommandCommand command = fetchCommand(hystrixHandle, exchange, chain);...}private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) {
    if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) {
    // 信号量的方式熔断return new HystrixCommand(HystrixBuilder.build(hystrixHandle),exchange, chain, hystrixHandle.getCallBackUri());}// 线程池的方式熔断return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle),exchange, chain, hystrixHandle.getCallBackUri());}

可以看到,soul 分别使用 HystrixCommand 和 HystrixCommandOnThread 实现了 com.netflix.hystrix 包下的 HystrixObservableCommand 和 HystrixCommand。同时,使用了 HystrixBuilder 类来构建相应的 Setter,即一些熔断相关的指标配置,这里需要注意的是,以线程池的方式进行熔断时,没有对最大并发度的配置,这是 hystrix 决定的。

Resilience4J

Resilience4j是一个受Netflix Hystrix启发的轻量级容错库,但专为Java 8和函数式编程而设计。轻量级,因为这个库只使用Vavr,而Vavr没有任何其他外部库依赖。相比之下,Netflix Hystrix对Archaius有编译依赖,Archaius有更多的外部库依赖,如Guava和Apache Commons Configuration。
Resilience4JRegistryFactory
Resilience4JRegistryFactory 维护了两个 resilience4j 的注册器,分别用于限流和熔断功能的注册:

/*** RateLimiter registry.*/
private static final RateLimiterRegistry RATE_LIMITER_REGISTRY = RateLimiterRegistry.ofDefaults();/*** CircuitBreaker registry.*/
private static final CircuitBreakerRegistry CIRCUIT_BREAKER_REGISTRY = CircuitBreakerRegistry.ofDefaults();
Resilience4JBuilder
Resilience4JBuilder 创建了两个 Resilience4JConf 实例,用于维护两个 conf:
public static Resilience4JConf build(final RuleData ruleData) {
    // 从规则配置中获取相应的配置Resilience4JHandle handle = GsonUtils.getGson().fromJson(ruleData.getHandle(), Resilience4JHandle.class);CircuitBreakerConfig circuitBreakerConfig = null;// 组装 CircuitBreakerConfigif (handle.getCircuitEnable() == 1) {
    ...}// 组装 TimeLimiterConfigTimeLimiterConfig timeLimiterConfig = TimeLimiterConfig.custom().timeoutDuration(Duration.ofSeconds(handle.getTimeoutDuration() / 1000)).build();// 组装 RateLimiterConfigRateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom().limitForPeriod(handle.getLimitForPeriod()).timeoutDuration(Duration.ofSeconds(handle.getTimeoutDurationRate() / 1000)).limitRefreshPeriod(Duration.ofNanos(handle.getLimitRefreshPeriod() * 1000000)).build();return new Resilience4JConf(Resilience4JHandler.getResourceName(ruleData), handle.getFallbackUri(), rateLimiterConfig, timeLimiterConfig, circuitBreakerConfig);
}

最终,这些 Config 会被用于注册相应的 CircuitBreaker 或 RateLimiter。

Executor

这个接口的两个实现类 RateLimiterExecutor 和 CombinedExecutor,最终使用 Resilience4JRegistryFactory 和相应的 Config,构建了 Resilience4J 的实例并调用,以 CombinedExecutor 为例:

@Override
public <T> Mono<T> run(final Mono<T> run, final Function<Throwable, Mono<T>> fallback, final Resilience4JConf resilience4JConf) {
    // 使用 factory 和 Conf 生成 RateLimiter 实例RateLimiter rateLimiter = Resilience4JRegistryFactory.rateLimiter(resilience4JConf.getId(), resilience4JConf.getRateLimiterConfig());// 使用 factory 和 Conf 生成 CircuitBreaker 实例CircuitBreaker circuitBreaker = Resilience4JRegistryFactory.circuitBreaker(resilience4JConf.getId(), resilience4JConf.getCircuitBreakerConfig());// 最终调用 Resilience4J 实现熔断功能Mono<T> to = run.transformDeferred(CircuitBreakerOperator.of(circuitBreaker)).transformDeferred(RateLimiterOperator.of(rateLimiter)).timeout(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration()).doOnError(TimeoutException.class, t -> circuitBreaker.onError(resilience4JConf.getTimeLimiterConfig().getTimeoutDuration().toMillis(),TimeUnit.MILLISECONDS,t));if (fallback != null) {
    to = to.onErrorResume(fallback);}return to;
}

Resilience4JPlugin

有了前文的基础,实际上 Resilience4JPlugin 就比较好理解了,仅仅是根据用户的配置,最终调用不同的 Executor 实例:

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);assert soulContext != null;Resilience4JHandle resilience4JHandle = GsonUtils.getGson().fromJson(rule.getHandle(), Resilience4JHandle.class);if (resilience4JHandle.getCircuitEnable() == 1) {
    // 调用 CombinedExecutor 实例return combined(exchange, chain, rule);}// 调用 RateLimiterExecutor 实例return rateLimiter(exchange, chain, rule);
}

Sentinel

Sentinel 是阿里开源的项目,以“flow”为突破口,涵盖流量控制、并发限制、断路、自适应系统保护等多个领域的强大流量控制组件,保证微服务的可靠性。

SentinelRuleHandle

SentinelRuleHandle 类实现了 PluginDataHandler 接口,接收 RuleData 数据。其使用 Sentinel 提供的两个 Manager 实现熔断器配置的加载:

@Override
public void handlerRule(final RuleData ruleData) {
    SentinelHandle sentinelHandle = GsonUtils.getInstance().fromJson(ruleData.getHandle(), SentinelHandle.class);List<FlowRule> flowRules = FlowRuleManager.getRules().stream().filter(r -> !r.getResource().equals(getResourceName(ruleData))).collect(Collectors.toList());if (sentinelHandle.getFlowRuleEnable() == Constants.SENTINEL_ENABLE_FLOW_RULE) {
    // 使用 ruleData 信息作为资源名称创建 FlowRuleFlowRule rule = new FlowRule(getResourceName(ruleData));rule.setCount(sentinelHandle.getFlowRuleCount());rule.setGrade(sentinelHandle.getFlowRuleGrade());rule.setControlBehavior(sentinelHandle.getFlowRuleControlBehavior());flowRules.add(rule);}// 加载 FlowRuleFlowRuleManager.loadRules(flowRules);List<DegradeRule> degradeRules = DegradeRuleManager.getRules().stream().filter(r -> !r.getResource().equals(getResourceName(ruleData))).collect(Collectors.toList());if (sentinelHandle.getDegradeRuleEnable() == Constants.SENTINEL_ENABLE_DEGRADE_RULE) {
    // 使用 ruleData 信息作为资源名称创建 DegradeRule DegradeRule rule = new DegradeRule(getResourceName(ruleData));rule.setCount(sentinelHandle.getDegradeRuleCount());rule.setGrade(sentinelHandle.getDegradeRuleGrade());rule.setTimeWindow(sentinelHandle.getDegradeRuleTimeWindow());degradeRules.add(rule);}// 加载 DegradeRuleDegradeRuleManager.loadRules(degradeRules);
}

SentinelPlugin

@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    ...// 使用 RuleData 信息作为资源名称,使用 Sentinel 维护的内存中的规则配置,获取 reactor 实例作为最终执行逻辑return chain.execute(exchange).transform(new SentinelReactorTransformer<>(resourceName)).doOnSuccess(v -> {
    if (exchange.getResponse().getStatusCode() != HttpStatus.OK) {
    HttpStatus status = exchange.getResponse().getStatusCode();exchange.getResponse().setStatusCode(null);throw new SentinelFallbackException(status);}}).onErrorResume(throwable -> sentinelFallbackHandler.fallback(exchange, UriUtils.createUri(sentinelHandle.getFallbackUri()), throwable));
}