Hystrix
在微服务架构中, 我们将系统拆分成了很多服务单元, 各单元的应用间通过服务注册与订阅的方式互相依赖。 由于每个单元都在不同的进程中运行, 依赖通过远程调用的方式执行, 这样就有可能因为网络原因或是依赖服务自身问题出现调用故障或延迟, 而这些问题会直接导致调用方的对外服务也出现延迟, 若此时调用方的请求不断增加,最后就会因等待出现故障的依赖方响应形成任务积压,最终导致自身服务的瘫痪。
举个例子, 在 一 个电商网站中, 我们可能会将系统拆分成用户、 订单、 库存、 积分、评论等 一 系列服务单元。 用户创建 一 个订单的时候, 客户端将调用订单服务的创建订单接口,此时创建订单接口又会向库存服务来请求出货(判断是否有足够库存来出货)。 此时若库存服务因自身处理逻辑等原因造成响应缓慢, 会直接导致创建订单服务的线程被挂起,以等待库存申请服务的响应, 在漫长的等待之后用户会因为请求库存失败而得到创建订单失败的结果。 如果在高并发情况之下, 因这些挂起的线程在等待库存服务的响应而未能释放, 使得后续到来的创建订单请求被阻塞,最终导致订单服务也不可用。
在微服务架构中, 存在着那么多的服务单元, 若一个单元出现故障,就很容易因依赖关系而引发故障的蔓延,最终导致整个系统的瘫痪,这样的架构相较传统架构更加不稳定。为了解决这样的问题, 产生了断路器等一系列的服务保护机制。
断路器模式源于 Martin Fowler 的 Circuit Breaker一 文。“断路器 ” 本身是 一 种开关装置,用于在电路上保护线路过载,当线路中有电器发生短路时, "WT 路器”能够及时切断故障电路, 防止发生过载、 发热甚至起火等严重后果。在分布式架构中, 断路器模式的作用也是类似的, 当某个服务单元发生故障(类似用电器发生短路) 之后, 通过断路器的故障监控(类似熔断保险丝), 向调用方返回一 个错误响应, 而不是长时间的等待。 这样就不会使得线程因调用故障服务被长时间占用不释放,避免了故障在分布式系统中的蔓延。针对上述问题, Spring Cloud Hystrix实现了断路器、 线程隔离等一系列服务保护功能。它也是基于Netflix的开源框架Hystrix实现的, 该框架的目标在于通过控制那些访问远程系统、 服务和第三方库的节点, 从而对延迟和故障提供更强大的容错能力。Hystrix具备服务降级、 服务熔断、 线程和信号隔离、 请求缓存、 请求合并以及服务监控等强大功能。
服务降级
比如某些app提示的"网络开小差了"之类的,优先核心服务,非核心服务不可用或者弱可用。 通过HystrixCommand注解指定fallbackMethod(回退函数)中具体实现降级逻辑
不使用Hystrix组件时,用订单访问商品服务,返回查询信息。
但是,如果此时关闭了商品服务,那么接口直接提示500
订单服务-使用Hystrix组件,触发服务降级
1.引入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix</artifactId></dependency>
2.启动类添加 @EnableCircuitBreaker 注解,但是还有个注解@SpringCloudApplication里面包含了:@SpringBootApplication、@EnableDiscoveryClient、@EnableCircuitBreaker所以可以直接再启动类添加@SpringCloudApplication来替换这三个。
3.在需要降级的接口添加@HystrixCommand,当访问的服务不可用时,触发降级
HystrixCommand: 用在依赖的服务返回单个操作结果的时候
HystrixObservableCommand: 用在依赖的服务返回多个操作结果的时候。
//同步执行@HystrixCommand(fallbackMethod = "fallback")@GetMapping("/getPorductInfoList")public String getProductInfoList(){RestTemplate restTemplate=new RestTemplate();return restTemplate.postForObject("http://localhost:8088/product/listForOrder",Arrays.asList("157875196366160022"),String.class);}//异步执行@HystrixCommand(fallbackMethod = "fallback")@GetMapping("/getPorductInfoList")public Future<String> getProductInfoList(){return new AsyncResult<String>() {@Overridepublic String invoke() {RestTemplate restTemplate=new RestTemplate();return restTemplate.postForObject("http://localhost:8088/product/listForOrder",Arrays.asList("157875196366160022"),String.class);}}}public String fallback(){return "太拥挤,请稍少再试";}
服务降级时异常处理
那么如果是抛出了一个异常,那么会怎样呢?修改接口
@HystrixCommand(fallbackMethod = "fallback")@GetMapping("/getPorductInfoList")public String getProductInfoList(){
// RestTemplate restTemplate=new RestTemplate();
// return restTemplate.postForObject("http://localhost:8088/product/listForOrder",
// Arrays.asList("157875196366160022"),
// String.class);throw new RuntimeException("发送异常了");}
结果还是返回 “太拥挤,请稍少再试”,以此我们也可以实现服务内部的降级
在 HystrixComrnand 实现的 run() 方法中抛出异常时, 除了 HystrixBadRequest-Exception 之外,其他异常均会被 Hystrix 认为命令执行失败并触发服务降级的处理逻辑,所以当需要在命令执行中抛出不触发服务降级的异常时来使用它。
而在使用注册配置实现 Hystrix 命令时,它还支持忽略指定异常类型功能, 只需要通过
设置 @HystrixComrnand 注解的 ignoreExceptions 参数, 比如:
@HystrixCommand(ignoreExceptions = {BadRequestException.class})
public User getUserByid(Long id) {return restTemplate.getForObject("http://USER-SERVICE/users/{1}", User.class,id);
}
如上面代码的定义, 当 getUserByld 方法抛出了类型为 BadRequestException的异常时, Hystrix 会将它包装在 HystrixBadRequestException 中抛出, 这样就不会触发后续的 fallback 逻辑。
当 Hystrix 命令因为异常(除了 HystrixBadRequestException 的异常)进入服务降级逻辑之后, 往往需要对不同异常做针对性的处理, 那么我们如何来获取当前抛出的异常呢?
注解配置方式实现异常的获取。 只需要在 fallback 实现方法的参数中增加 Throwable e 对象的定义, 这样在方法内部就可以获取触发服务降级的具体异常内容了, 比如:
@HystrixCommand(fallbackMethod = "fallbackl")
User getUserByid(String id) {throw new RuntimeException("getUserByid command failed");
}
User fallbackl{String id, Throwable e) {assert "getUserByid command failed". equals {e.getMessage {}));
}
默认降级
也可以在Controller添加默认的降级@DefaultProperties(defaultFallback = “defaultFallback”)。如果添加了默认的那么方法上的@HystrixCommand可以不写任何参数,然后就会使用默认的。
public String defaultFallback(){return "默认提示:太拥挤,请稍少再试";}
超时时间设置
1.比如在服务方,也就是商品的查询接口设置等待时间为2秒
Thread.sleep(2000);
那么访问的时候,永远都走入了降级。原因HystrixCommand默认的时间为1秒而接口访问时间已经超过1秒。服务方等待了2秒,调用方等不及了所以直接走入了降级。面对这样的情况,那么可以给服务降级来设置一个超时时间,以此保证服务间的正常调用。
2.重新设置默认时间,访问成功
@HystrixCommand(commandProperties =
@HystrixProperty(name = “execution.isolation.thread.timeoutInMilliseconds”,value = “3000”))
怎么去看默认的值呢?
进入@HystrixProperty 然后进入hystrix的源码
找到HystrixCommandProperties类 里面就是它的一些属性和默认值
搜索下时间的属性可以看到,在这个类中设置了默认属性。这个代码的意思是如果没有设置"execution.isolation.thread.timeoutInMilliseconds",那么使用默认值,所以我们上面将timeoutInMilliseconds重新设置
this.executionTimeoutInMilliseconds = getProperty(propertyPrefix, key, "execution.isolation.thread.timeoutInMilliseconds", builder.getExecutionIsolationThreadTimeoutInMilliseconds(), default_executionTimeoutInMilliseconds);
断路器模式
断路器模式设计:有三种状态
Closed 关闭:调用失败次数累计到一定次数或者达到一定比例就会启动
open 打开:对服务器都返回错误
Half Open 半熔断状态:允许定量服务请求,如果调用都成功或者一定比例就会关闭熔断器,否则认为没好,就打开断路器。
熔断实现
//超时配置
// @HystrixCommand(commandProperties =
// @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds",value = "3000"))@HystrixCommand(commandProperties ={@HystrixProperty(name = "circuitBreaker.enabled",value = "true"),//设置熔断开启@HystrixProperty(name = "circuitBreaker.requestVolumeThreshold",value = "10"),@HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds",value = "10000"),@HystrixProperty(name = "circuitBreaker.errorThresholdPercentage",value = "60")})//设置 错误百分比@GetMapping("/getPorductInfoList")public String getProductInfoList(@RequestParam("num")Integer num){if (num % 2 == 0 ){return "success";}RestTemplate restTemplate=new RestTemplate();return restTemplate.postForObject("http://localhost:8088/product/listForOrder",Arrays.asList("157875196366160022"),String.class);}
关闭超时时间,也就是说默认超时时间是1秒此时访问商品接口那么必然会返回"默认提示:太拥挤,请稍少再试"。分别使用num为1的数字参数和num为2的参数请求接口,num为1的结果应该返回"默认提示:太拥挤,请稍少再试",而num为2的结果返回"success"。但是如果不停的使用num为1的参数去访问,当错误率达到60%那么此时再使用num为2的参数去访问返回的也是"默认提示:太拥挤,请稍少再试",这个就是熔断保护。
等过一小会重新访问就又好了
使用配置项实现:
设置全局的超时时间,方法上@HystrixCommand注解的所有配置都可以去掉。但是必须添加@HystrixCommand注解,这个是启动容错必须的
hystrix:command:default:execution:isolation:thread:timeoutInMilliseconds: 3000
设置单个方法的超时时间
hystrix:command:default:execution:isolation:thread:timeoutInMilliseconds: 1000#如果@HystrixCommand注解没设置commandKey这个配置,那么该配置默认的值为方法名称。getProductInfoList为@HystrixCommand注解的commandKeygetProductInfoList:execution:isolation:thread:timeoutInMilliseconds: 3000
可以看到,全局的超时时间为1秒,本来访问接口应该返回"默认提示:太拥挤,请稍少再试",但是我们给该方法设置了超时时间,所以访问接口的返回值是正确的。
请求缓存
当系统用户不断增长时, 每个微服务需要承受的并发压力也越来越大。 在分布式环境下, 通常压力来自于对依赖服务的调用, 因为请求依赖服务的资源需要通过通信来实现,这样的依赖方式比起进程内的调用方式会引起一部分的性能损失, 同时HTTP相比其他高性能的通信协议在速度上没有任何优势, 所以它有些类似于对数据库这样的外部资源进行读写操作, 在高并发的情况下可能会成为系统的瓶颈。 既然如此, 我们很容易地可以联想到, 类似数据访问的缓存保护是否也可以应用到依赖服务的调用上呢?答案显而易见, 在高并发的场景之下, Hystrix 中提供了请求缓存的功能, 我们可以方便地开启和使用请求缓存来优化系统, 达到减轻高并发时的请求线程消耗、 降低请求响应时间的效果。
继承方式
public class UserCommand extends HystrixCommand<User> {private RestTemplate restTemplate;private Long id;public UserCommand(RestTemplate restTemplate, Long id) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("UserGroup")));this.restTemplate= restTemplate;this.id= id;}@Overrideprotected User run() {return restTemplate.getForObject("http://USER-SERVICE/users/{1}",User.class, id);}@Overrideprotected String getCacheKey() {return String.valueOf(id);}
}
在上面的例子中, 我们通过在 getCacheKey 方法中返回的请求缓存 key 值(使用了传入的获取 User 对象的 id 值), 就能让该请求命令具备缓存功能。 此时, 当不同的外部请求处理逻辑调用了同 一 个依赖服务时, Hystrix 会根据 getCacheKey 方法返回的值来区分是否是重复的请求 (getCacheKey在run方法之前执行),如果它们的 cacheKey 相同, 那么该依赖服务只会在第一个请求到达时被真实地调用 一 次, 另外 一 个请求则是直接从请求缓存中返回结果, 所以通过开启请求缓存可以让我们实现的 Hystrix 命令具备下面几项好处:
? 减少重复的请求数, 降低依赖服务的并发度。
? 在同一 用户请求的上下文中, 相同依赖服务的返回数据始终保持一致。
? 请求缓存在 run() 和 construct ()执行之前生效, 所以可以有效减少不必要的线程开销。
注解方式 启动请求缓存
请求缓存清理
使用请求缓存时, 如果只是读操作, 那么不需要考虑缓存内容是否正确的问题, 但是如果请求命令中还有更新数据的写操作, 那么缓存中的数据就需要我们在进行写操作时进行及时处理, 以防止读操作的请求命令获取到了失效的数据。 我们可以通过 HystrixRequestCache.clear() 方法来进行缓存的清理, 具体示例如下:
private s七atic final HystrixCommandKey GETTER_ KEY = HystrixCommandKey. Factory.asKey("CommandKey");//刷新缓存,根据id进行清理
HystrixRequestCache.ge七Instance(GETTER_KEY,HystrixConcurrencyStrategyDefault.getinstance()) .clear(String.valueOf(id));
通过 HystrixRequestCache.getinstance(GETTER-KEY, HystrixConcurrency?StrategyDefault.getinstance())方法从默认的 Hystrix 并发策略中根据GETTER-KEY 获取到该命令的请求缓存对象 HystrixRequestCache 的实例, 然后再调用该请求缓存对象实例的 clear 方法。
请求合并
微服务架构中的依赖通常通过远程调用实现, 而远程调用中最常见的问题就是通信消耗与连接数占用。 在高并发的情况之下, 因通信次数的增加, 总的通信时间消耗将会变得不那么理想。 同时, 因为依赖服务的线程池资源有限,将出现排队等待与响应延迟的清况。为了优化这两个问题, Hystrix 提供了 HystrixCollapser 来实现请求的合并,以减少通信消耗和线程数的占用。
HystrixCollapser 实现 了在 HystrixCommand 之前放置一个合并处理器, 将处于一个很短的时间窗(默认10毫秒内对同一依赖服务的多个请求进行整合并以批量方式发起请求的功能(服务提供方也需要提供相应的批量实现接口)。通过
HystrixCollapser 的封装, 开发者不需要关注线程合并的细节过程,只需关注批量化服务和处理。
接下来, 我们通过 一 个简单的示例来直观理解实现请求合并的过程。
假设当前微服务 USER-SERVICE 提供了两个获取 User 的接口。
? /users/{id}: 根据 id 返回 User 对象的 GET 请求接口。
? /users?ids={ids}: 根据 ids 返回 User 对象列表的 GET 请求接口, 其中 ids 为以逗号分隔的 id 集合。
而在服务消费端, 已经为这两个远程接口通过 RestTemplate 实现了简单的调用,具体如下所示:
@Service
public class UserServiceimpl implements UserService {@Autowiredprivate RestTemplate restTemplate;@Overridepublic User find(Long id) {return restTemplate.getForObject("http://USER-SERVICE/users/{1} ",User.class, id);}@Overridepublic List<User> findAll(List<Long> ids) {return restTemplate.getForObject{"http://USER-SERVICE/users?ids={1} ",List. class, StringUtils. join (ids, ", ")) ;}
}
接着, 我们实现将短时间内多个获取单一User 对象的请求命令进行合并。
? 第 一 步,为请求合并的实现准备一个批量请求命令的实现, 具体如下:
public class UserBatchCommand extends HystrixCommand<List<User>> {UserService userService;List<Long> userids;public UserBatchCommand(UserService userService, List<Long> userids) {super(Setter.withGroupKey(asKey("userServiceCommand")));this.userids = userids;this.userService = userService;}@Overrideprotected List<User> run() throws Exception {return userService.findAll(userids);}
}
批量请求命令实际上就是 一 个简单的HystrixCommand实现, 从上面的实现中可以看到它通过调用 userService.findAll方法来访问/users?ids = {ids}接口以返回User的列表结果。
? 第二步, 通过继承HystrixCollapser实现请求合并器:
public class UserCollapseCommand extends HystrixCollapser<List<User>, User,Long> {private UserService userService;private Long userid;public UserCollapseCommand(UserService userService, Long userid) {super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("userCollapseCommand")) .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter() .withTimerDelayinMilliseconds(100)));this.userService = userService;this.userid = userid;}@Overridepublic Long getRequestArgument () {return userid;}@Overrideprotected HystrixComrnand<List<User>> createComrnand(Collection<CollapsedRequest<User, Long>> collapsedRequests) {List<Long> userids = new ArrayList<>(collapsedRequests.size());userids.addAll(collapsedRequests.stream() .map(CollapsedRequest::getArgumen七) .calleet(Collectors.toList()));return new UserBatchCommand(userService, userids);}@Overrideprotected void mapResponseToRequests(List<User> batchResponse,Collection<CollapsedRequest<User, Long>> collapsedRequests) {int count = O;for (CollapsedRequest<User, Long> collapsedRequest : collapsedRequests) {User user = batchResponse.get(count++);collapsedRequest.setResponse(user);}
}
在上面的构造函数中, 我们为请求合并器设置 了时间延迟属性, 合并器会在该时间窗内收集获取单个User的请求并在时间窗结束时进行合并组装成单个批量请求。getRequestArgument 方法返回给定的单个请求参数userId, 而createCommand和
mapResponseToRequests是请求合并器的两个核心。
? ereateCommand: 该方法的 collapsedRequests参数中保存了延迟时间窗中收集到的所有获取单个User的请求。通过获取这些请求的参数来组织上面我们准备的批量请求命令UserBatchCommand实例。
? mapResponseToRequests: 在批量请求命令UserBatchCommand实例被触发执行完成之后 , 该 方 法开始执行, 其中batchResponse 参数保存了createCommand中组织的批量请求命令的返回结果, 而 collapsedRequests参数则代表了每个被合并的请求。 在这里我们通过遍历批量结果batchResponse对象, 为 collapsedRequests中 每个合并前的单个请求设置返回结果, 以此完成批量结果到单个请求结果的转换。
下图展示了在未使用HystrixCollapser请求合并器之前的线程使用情况。 可以看到, 当服务消费者同时对USER-SERVICE的 /users/{id}接口发起了5个请求时, 会向该依赖服务的独立线程池中申请5个线程来完成各自的请求操作
而在使用了HystrixCollapser请求合并器之后, 相同情况下的线程占用如下图所示。由于同一时间发生的5个请求处于请求合并器的 一 个时间窗内,这些发向/users/{id}接口的请求被请求合并器拦截下来, 并在合并器中进行组合, 然后将这些请求合并成 一 个请求发向 USER-SERVICE 的批量接口 /users?ids = {ids} 。在获取到批量请求结果之后,通过请求合并器再将批量结果拆分并分配给每个被合并的请求。 从图中我们可以看到, 通过使用请求合并器有效减少了对线程池中资源的占用。 所以在资源有效并且短时间内会产生高并发请求的时候, 为避免连接不够用而引起的延迟可以考虑使用请求合并器的方式来
处理和优化。
使用注解实现请求合并器
@Service
public class UserService {@Autowiredprivate RestTemplate restTemplate;@HystrixCollapser(batchMethod= "findAll", collapserProperties = {@HystrixProperty(name="timerDelayinMilliseconds", value = "100")})public User find(Long id)return null;}@HystrixCommandpublic List<User> findAll(List<Long> ids) {return restTemplate.getForObject("http://USER-SERVICE/users?ids = {1}",List. class, StringUtils. join (ids, ", ")) ;}
}
我们之前已经介绍过@HystrixCommand了,可以看到,这里通过它定义了一 个用于请求/users?ids={ids}接口。而在请求/users?ids={ids}接口的方法上通过@HystrixCollapser注解为其创建了合并请求器, 通过batchMethod 属性指定了批量请求的实现方法为findAll方法(即请求/users?ids = {ids}接口的命令),同时通过collapserProperties属性为合并请求器设置了相关属性这里使用@HystrixProperty(name=“timerDelayinMilliseconds”,value=“100”)将合并时间窗设置为100毫秒。 这样通过@HystirxCollapser注解简单而又优雅地实现了在/users/{id}依赖服务之前设置了 一 个批量请求合并器。
请求合并的额外开销
虽然通过请求合并可以减少请求的数量以缓解依赖服务线程池的资源, 但是在使用的时候也需要注意它所带来的额外开销: 用于请求合并的延迟时间窗会使得依赖服务的请求延迟增高。 比如, 某个请求不通过请求合并器访问的平均耗时为5ms, 请求合并的延迟时间窗为lOms (默认值), 那么当该请求设置了请求合并器之后, 最坏情况下(在延迟时间窗结束时才发起请求)该请求需要 15ms才能完成。由千请求合并器的延迟时间窗会带来额外开销, 所以我们是否使用请求合并器需要 根
据依赖服务调用的实际情况来选择, 主要考虑下面两个方面。
? 请求命令本身的延迟。 如果依赖服务的请求命令本身是 一 个高延迟的命令, 那么可以使用请求合并器, 因为延迟时间
窗的时间消耗显得微不足道了。
? 延迟时间窗内的并发量。 如果 一 个时间窗内只有1-2个请求, 那么这样的依赖服务不适合使用请求合并器。 这种情况不
但不能提升系统性能, 反而会成为系统瓶颈,因为每个请求都需要多消耗 一 个时间窗才响应。 相反, 如果 一 个时间窗
内具有很高的并发量, 并且服务提供方也实现了批量处理接口, 那么使用请求合并器可以有效减少网络连接数量并极大
提升系统吞吐量, 此时延迟时间窗所增加的消耗就可以忽略不计了。
Feign-Hystrix的使用
1.在Feign中本身就有一个Hystrix组件,所以我们只需要在客户端配置中开启就好
feign:hystrix:enabled: true
2.修改服务端接口,提供服务降级
修改商品Client服务(该服务提供了查询,修改库存等方法供订单服务调用。订单服务中,引用了该服务的pom文件)
修改前代码:
@FeignClient(name = "product")
public interface ProductClient {@GetMapping("/msg")String productMst();@PostMapping("/product/listForOrder")List<ProductInfoOutPut> listForOrder(@RequestBody List<String> productIdList);@PostMapping("/product/decreaseStock")void decreaseStock(@RequestBody List<DecreaseStockInput> cartDTOList);}
修改后,使用fallback定义降级处理,如果发生服务降级那么将调用ProductClientFallback类中对应方法。
@FeignClient(name = "product",fallback = ProductClient.ProductClientFallback.class)
public interface ProductClient {@GetMapping("/msg")String productMst();@PostMapping("/product/listForOrder")List<ProductInfoOutPut> listForOrder(@RequestBody List<String> productIdList);@PostMapping("/product/decreaseStock")void decreaseStock(@RequestBody List<DecreaseStockInput> cartDTOList);@Componentstatic class ProductClientFallback implements ProductClient{@Overridepublic String productMst() {return null;}@Overridepublic List<ProductInfoOutPut> listForOrder(List<String> productIdList) {return null;}@Overridepublic void decreaseStock(List<DecreaseStockInput> cartDTOList) {}}
}
在不启动商品服务的时候,直接调用Client是调用不通的所以进入降级处理返回商品信息为null
注意的点:
1.配置需要配对
2.order(客户端)使用了product(商品服务)中的降级,所以需要扫描到ProductClientFallback 类否则会报错,在启动类添加注解@ComponentScan(basePackages = “com.imooc”)
3.product(商品服务)ProductClientFallback 类上要添加@Component注解
Hystrix-dashboard(可视化组件)
1.引入依赖
<dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-netflix-hystrix-dashboard</artifactId></dependency>
2.启动类添加注解
@EnableHystrixDashboard
3.访问Hystrix Dashboard
输入地址、Delay(时间)、title(服务名称)然后Monitor Stream。这个时候,有可能出现一些坑,根据https://www.cnblogs.com/x1mercy/p/9291348.html解决就好。
当有请求访问的时候 该页面就会出现相应变化
这个数字颜色和上面是对应的,比如success就是绿色。后面那个灰色的0.0%是错误率。这个线是一段时间内相应的流量变化。host:0.0/s是请求频率。Circuit是熔断状态。
这个圆代表着流量,流量越大它越大,越红越表示越不健康,当然不一定是红色。