1.创建请求命令
Hystrix命令(HystrixCommand)它用来封装具体的依赖服务调用逻辑。
我们可以用继承的方式实现:
public class HelloCommand extends HystrixCommand<String> {private RestTemplate restTemplate;private Long id;public HelloCommand(Setter setter,RestTemplate restTemplate,long id){super(setter);this.restTemplate=restTemplate;this.id=id;}@Overrideprotected String run() throws Exception {return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();}
}
通过上面实现的HelloCommand我们既可以同步执行也可以异步执行。
同步执行:String s= new HelloCommand(restTemplate,1L).execute();
异步执行:Future<String> future=new HelloCommand(restTemplate,1L).queue();异步执行时,可以通过对返回的future调用get方法来获取结果。
另外可以用@HystrixCommand注解阿里实现Hystrix命令的定义。
@Service
public class HelloService {private final Logger logger=Logger.getLogger(getClass());@AutowiredRestTemplate restTemplate;@HystrixCommand(fallbackMethod = "helloFallback")public String helloService() {long start=System.currentTimeMillis();ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);String body = responseEntity.getBody();long end=System.currentTimeMillis();logger.info("消耗时间:"+(start-end));return body;}@HystrixCommand(fallbackMethod = "helloFallback")public Future<String> helloServiceFuture() {return new AsyncResult<String>(){public String invoke(){System.out.println("future当前时间"+new Date(System.currentTimeMillis()));return restTemplate.getForObject("http://hello-service/hello",String.class);}};}public String helloFallback() {return "error";}
}
这里分别用上了同步和异步两种方式,除了传统的同步和异步执行外,还可以将HystrixCommand通过Observable来实现响应式执行方式。通过调用observe()和toObservable()方法可以返回Observable对象:
Observable<String> ho=new HelloCommand(restTemplate,1L).observe();
Observable<String> co=new HelloCommand(restTemplate,1L).toObservable();
前者返回的是Hot Observable该命令会在observe()调用的时候立即执行,当Observable每次订阅的时候会重放它的行为,后者返回的是一个Cold Observable,toObservable()执行后命令不会立即执行,只有当所有订阅者都订阅它才会执行。
而使用@HystrixCommand来实现的方式是:
@HystrixCommand(fallbackMethod = "helloFallback")public Observable<String> helloServiceObservable() {return Observable.create(new Observable.OnSubscribe<String>() {@Overridepublic void call(Subscriber<? super String> subscriber) {try {if (!subscriber.isUnsubscribed()){String s=restTemplate.getForObject("http://hello-service/hello",String.class);subscriber.onNext(s);subscriber.onCompleted();}}catch (Exception e){subscriber.onError(e);}}});}
在使用@HystrixCommand注解实现响应式命令时,可以通过observableExecutionMode参数来控制是使用observe()还是toObservable()的执行方式:
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.EAGER)
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY)
EAGER表示使用observe()执行方式。
LAZY表示使用toObservable()执行方式。
2.定义服务降级
fallback是Hystrix命令执行失败时使用的后备方法,用来实现服务的降级处理逻辑。而在HystrixCommand中可以通过重载getFallback()方法来实现服务的降级处理,Hystrix会在run()执行中出现错误,超时,线程池拒绝,断路器熔断等情况,执行getFallback()方法内的逻辑可以实现服务降级逻辑。
在HystrixObservableCommand实现的Hystrix命令中,我们可以通过重载resumeWithFallback方法来实现服务降级逻辑。该方法会返回一个Observable对象,当命令执行失败时,Hystrix会将Observable中的结果通知给所有订阅者。
当通过注解的形式进行服务降级只需要使用@HystrixCommand中的fallbackMethod参数来指定服务降级方法:
public String helloFallback() {return "error";}@HystrixCommand(fallbackMethod = "helloFallback")public Future<String> helloServiceFuture() {……}
当使用注解来定义服务降级时,我们需要将具体的Hystrix命令与fallback实现函数定义在同一个类中,并且fallback方法的名字相同。由于必须定义在一个类中,所以对于fallback的访问修饰符没有特定要求。
在实际使用时,我们需要为大多数执行过程中可能会失败的Hystrix命令进行服务降级逻辑,但是也有些情况不需要去实现降级逻辑。
1.执行写操作的命令:当Hystrix命令是用来执行写操作的时候,实现服务降级的意义并不是很大。当写入失败时,这时候通常只需要通知调用者即可。
2.执行批处理或离线计算时:当Hystrix命令是用来执行批处理程序生成一份报告或是进行任何类型的离线计算时,那么通常这些操作只需要将错误传播给调用者,让调用者稍后重试而不是进行降级处理响应。
3.异常处理
3.1异常传播
在HystrixCommand实现的run()方法中跑出异常时,除了HystrixBadRequestException之外,其他异常均会被Hystrix认为命令执行失败并触发服务降级的处理逻辑。
而在注解配置实现Hystrix命令时,它还支持忽略指定异常类型的功能,只需要通过设置@HystrixCommand注解的ignoreException参数,比如:
@HystrixCommand(fallbackMethod = "helloFallback",ignoreExceptions = {NullPointerException.class})
这条注释定义了当方法抛出空指针异常时,Hystrix会将其包装在HystrixBadRequestException中,这样就不会触发后续的fallback逻辑。
3.2异常获取
当Hystrix命令因为异常(除了HystrixBadRequestException的异常)进入服务降级逻辑后需要对其不同的异常进行针对性处理。
在以继承方式实现Hystrix命令中,我们可以用getFallback()方法通过Throwable getExecutionException()方法来获取具体的异常,通过判断来进入不同的处理逻辑。
而通过注解的方式来获取异常可以使用该种方式:
@HystrixCommand(fallbackMethod = "helloFallback")public String exception() {System.out.println("exception");if(1==1)throw new RuntimeException("command failed!!!!!");return "hello";}public String helloFallback(Throwable e) {System.out.println(e.getMessage());assert "command failed".equals(e.getMessage());System.out.println(e.getMessage());return "error";}
(如果你用debug模式需要注意如果长时间未到降级服务方法,它会变成其他异常信息。。。这样就不能获取RuntimeException异常。)
4.命令名称、分组以及线程池划分
当使用@HystrixCommand注解的时候设置命令名、分组以及线程划分,只需要设置注解的commandKey、groupKey以及threadPoolKey属性即可,如:
@HystrixCommand(commandKey = "idKey",groupKey = "Group",threadPoolKey = "Thread")
当使用继承的方式来实现命令组名等信息时,可以用如下的方式:这里通过withGroupKey来设置命令组,然后调用andCommandKey来设置命令名。因为Setter定义中,只有withGroupKey静态函数可以创建Setter的实例,所以GroupKey是每个Setter必须的参数,而CommandKey则是可选的参数。而HystrixThreadPoolKey来对线程池来进行设置,通过它我们可以实现更细粒度的线程池划分。
public HelloCommand() {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));}
5.请求缓存
Hystrix请求缓存的使用非常简单,我们只需要在实现HystrixCommand或HystrixObservableCommand时,通过重载getCacheKey()方法开启请求缓存。
package com.example.eureka.ribbon.dao;import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import org.springframework.web.client.RestTemplate;public class HelloCommand extends HystrixCommand<String> {private RestTemplate restTemplate;private Long id;public HelloCommand(Setter setter, RestTemplate restTemplate, long id) {super(setter);this.restTemplate = restTemplate;this.id = id;}public HelloCommand( RestTemplate restTemplate, long id) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group")));this.restTemplate = restTemplate;this.id = id;}public HelloCommand() {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));}@Overrideprotected String run() throws Exception {return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();}protected String getCacheKey(){return String.valueOf(id);}}
这里通过getCacheKey方法中的返回的请求缓存key值,就能让该请求命令具备缓存功能。通过开启请求缓存可以让我们实现的Hystrix命令具备下面的几项好处:
1.减少重复请求数,降低依赖服务的并发度。
2.在同一用户请求的上下文中,相同依赖服务的返回数据始终保持一致。
3.请求缓存在run()和construct()执行之前生效,所以可以有效减少不必要的线程开销。
清理失效缓存功能
在Hystrix中,我们可以通过HystrixRequestCache.clear()方法来进行缓存的清理:
public class HelloCommand extends HystrixCommand<String> {private static final HystrixCommandKey GETTER_KEY=HystrixCommandKey.Factory.asKey("CommandKey");private RestTemplate restTemplate;private Long id;public HelloCommand(Setter setter, RestTemplate restTemplate, long id) {super(setter);this.restTemplate = restTemplate;this.id = id;}public HelloCommand( RestTemplate restTemplate, long id) {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("Group")).andCommandKey(GETTER_KEY));this.restTemplate = restTemplate;this.id = id;}public HelloCommand() {super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("GroupKey")).andCommandKey(HystrixCommandKey.Factory.asKey("CommandKey")).andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ThreadPoolKey")));}@Overrideprotected String run() throws Exception {return restTemplate.getForEntity("http://hello-service/hello", String.class).getBody();}protected String getCacheKey(){return String.valueOf(id);}public static void flushCache(Long id){HystrixRequestCache.getInstance(GETTER_KEY,HystrixConcurrencyStrategyDefault.getInstance()).clear(String.valueOf(id));}
}
使用注解实现请求缓存
注解 | 描述 | 属性 |
|
该注解用来标记请求命令返回的结果应该被缓存,它必须与@HystrixCommand结合使用 | cacheKeyMethod |
|
该注解用来让请求命令的缓存失效,失效的缓存根据定义的key决定 | commandKey,cacheKeyMethod |
|
该注解用来在请求命令的参数上标记,使其作为缓存的key值,如果没有 标注则会使用所有参数。如果同时使用了@CacheResult和@CacheRemove注解的cacheKeyMethod方法指定缓存Key的生成,那么该注解将不会起作用 | value |
定义缓存Key:当使用注解来定义请求缓存时,如果要为请求命令指定具体的缓存Key生成规则,我们可以使用@CacheResult和@CacheRemove注解的cacheKeyMethod方法来指定具体的生成函数;也可以通过使用@CacheKey注解在方法参数中指定用于组装缓存Key的元素。
使用cacheKeyMethod方法示例如下:
@CacheResult(cacheKeyMethod = "CacheKey")@HystrixCommand(fallbackMethod = "helloFallbacks")public String helloService(int id) {long start=System.currentTimeMillis();ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);String body = responseEntity.getBody();long end=System.currentTimeMillis();logger.info("消耗时间:"+(start-end));return body;}private String CacheKey(int id){System.out.println(id);return String.valueOf(id);}
需要注意的是要在控制器中加入
HystrixRequestContext.initializeContext();
否则他会报:java.lang.IllegalStateException: Request caching is not available. Maybe you need to initialize the HystrixRequestContext?
通过@CacheKey注解实现方式如下:
@CacheResult@HystrixCommand(fallbackMethod = "helloFallbacks")public String helloService(@CacheKey("id") int id) {long start=System.currentTimeMillis();ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);String body = responseEntity.getBody();long end=System.currentTimeMillis();logger.info("消耗时间:"+(start-end));return body;}
@CacheKey("id"),会报java.beans.IntrospectionException: Method not found: isId,目前还不知道什么原因。
缓存清理:这里通过@CacheRemove注解来进行失效缓存的清理,比如下面的例子:
@CacheResult
@HystrixCommand(fallbackMethod = "helloFallbacks")
public String helloService(int id) {long start=System.currentTimeMillis();ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);String body = responseEntity.getBody();long end=System.currentTimeMillis();logger.info("消耗时间:"+(start-end));return body;
}@CacheRemove(commandKey = "helloService")@HystrixCommand(fallbackMethod = "helloFallbacks")public String updateCache(int id) {ResponseEntity<String> responseEntity = restTemplate.getForEntity("http://hello-service/hello", String.class);String body = responseEntity.getBody();return body;}
@RequestMapping("/ribbon-consumer")public String helloConsumer(int id) {System.out.println("同步:当前时间:" + new Date(System.currentTimeMillis()));String s = helloService.helloService(id);System.out.println("同步:当前时间:" + new Date(System.currentTimeMillis()) + "值:" + s);String ss = helloService.helloService(id);String sss= helloService.updateCache(id);System.out.println(s+"====\n"+ss+"=====\n"+sss);return s;}
需要注意的是@CacheRemove注解的commandKey属性必须指定,它用来指明需要使用请求缓存的请求命令,因为只有通过该属性的配置Hystrix才能找到正确的请求命令缓存的位置。
这里我新加了一个过滤器代码:
@WebFilter(filterName = "hystrixContextServletFilter",urlPatterns = "/*",asyncSupported = true)
public class HystrixContextServletFilter implements Filter {@Overridepublic void init(FilterConfig filterConfig) throws ServletException {}@Overridepublic void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException {System.out.println("+++++++++++++++++++ filter +++++++++++++++++++++++");HystrixRequestContext context=HystrixRequestContext.initializeContext();try {chain.doFilter(request,response);}finally {context.shutdown();}}@Overridepublic void destroy() {}
}
并且在主类处开启@ServletComponentScan注解:
@SpringCloudApplication
@ServletComponentScan
public class EurekaRibbonApplication {@Bean@LoadBalancedRestTemplate restTemplate() {return new RestTemplate();}public static void main(String[] args) {SpringApplication.run(EurekaRibbonApplication.class, args);}
}
参考《Spring Cloud微服务实战》