1 背景
我们日常开发的系统,对外会提供一些接口。例如提供id可以查询用户个人信息。采用每请求每线程时,系统在处理这类请求时都是来一个请求分派一个处理线程。这些查询都有相似的特性,根据一个同类id查对应的数据。
如果是在一个批量查询的场景,如分页查询,我们通常会一次传一批id去查。
但在查单次请求的场景下,后台每收到一次请求,就会请求一次存储系统。由于用户每次请求只传一个id,因此乍一看没法像批量查那样减少网络连接数量、请求线程数。
受批量查的启发。如果有这种性质相似的单次查询请求。服务端收到了,缓冲一段时间,积攒一批,然后再使用业务上提供的批量查询接口完成查询。查询结束后,再将查询结果和请求相映射,最终返回用户。这样来减少网络请求和线程数。
2 Hystrix中的请求合并
在Hystrix中我们可以将多个相似的HystrixCommand通过HystrixCollapser进行合并。
如前所述,批量查询可以减少网络连接数和线程数。但是由于是将分散的用户请求汇聚,并做批量处理因此会带来一定的延时问题。
其工作原理如下图[1]:
当我们产生了多次调用后,需要实现批量执行操作。这个就需要我们业务上开发相关接口。否则仅在一个线程中迭代执行,最终只会减少线程数,响应反而会慢很多。
如下是通过HystrixCollapser实现请求合并,我们覆盖HystrixCommand实现批量执行命令的创建,并传入批量参数。
在BatchCommand的run方法中实现批量操作,这里通过模拟返回批量数据代替。实际上通过执行批量方法,执行批量操作即可。
如下是官方demo。
public class CollapserTest {
class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>,String,Integer> {
private final Integer key;public CommandCollapserGetValueForKey(Integer key) {
this.key = key;}@Overridepublic Integer getRequestArgument() {
return key;}@Overrideprotected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collection) {
return new BatchCommand(collection);}@Overrideprotected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
int count = 0;for (CollapsedRequest<String,Integer> request : requests) {
request.setResponse(batchResponse.get(count++));}}}private static final class BatchCommand extends HystrixCommand<List<String>> {
private final Collection<HystrixCollapser.CollapsedRequest<String,Integer>> requests;private BatchCommand(Collection<HystrixCollapser.CollapsedRequest<String, Integer>> requests) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));this.requests = requests;}@Overrideprotected List<String> run() throws Exception {
ArrayList<String> response = new ArrayList<>();for (HystrixCollapser.CollapsedRequest<String,Integer> request : requests) {
response.add("ValueForKey: " + request.getArgument());}return response;}}@org.junit.Testpublic void testCollapser() throws Exception {
HystrixRequestContext context = HystrixRequestContext.initializeContext();try {
Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();f1.get();f2.get();f3.get();f4.get();System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];System.out.println(command.getCommandKey().name());System.out.println(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));System.out.println(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));} finally {
context.shutdown();}}
}
总结下来看具有相同结构的独立请求按如下方式合并:
- 等待一段时间汇集批量操作
- 通过实现批量执行方法,执行批量操作
- 将批量操作结果与请求映射,最终返回
当然工具框架实际上还有很多复杂灵活的参数,这里主要学习基本使用和了解其工作工程。
3 参考
[1]RequestCollapsing,https://github.com/Netflix/Hystrix/wiki/How-it-Works#RequestCollapsing
[2]Collapsing,https://github.com/Netflix/Hystrix/wiki/How-To-Use#Collapsing