当前位置: 代码迷 >> 综合 >> Hystrix并发请求合并(Request Collapsing)模式学习
  详细解决方案

Hystrix并发请求合并(Request Collapsing)模式学习

热度:91   发布时间:2024-02-24 07:27:47.0

1 背景

我们日常开发的系统,对外会提供一些接口。例如提供id可以查询用户个人信息。采用每请求每线程时,系统在处理这类请求时都是来一个请求分派一个处理线程。这些查询都有相似的特性,根据一个同类id查对应的数据。

如果是在一个批量查询的场景,如分页查询,我们通常会一次传一批id去查。

在这里插入图片描述

但在查单次请求的场景下,后台每收到一次请求,就会请求一次存储系统。由于用户每次请求只传一个id,因此乍一看没法像批量查那样减少网络连接数量、请求线程数。

在这里插入图片描述

受批量查的启发。如果有这种性质相似的单次查询请求。服务端收到了,缓冲一段时间,积攒一批,然后再使用业务上提供的批量查询接口完成查询。查询结束后,再将查询结果和请求相映射,最终返回用户。这样来减少网络请求和线程数。

2 Hystrix中的请求合并

在Hystrix中我们可以将多个相似的HystrixCommand通过HystrixCollapser进行合并。

如前所述,批量查询可以减少网络连接数和线程数。但是由于是将分散的用户请求汇聚,并做批量处理因此会带来一定的延时问题。

其工作原理如下图[1]:
在这里插入图片描述
当我们产生了多次调用后,需要实现批量执行操作。这个就需要我们业务上开发相关接口。否则仅在一个线程中迭代执行,最终只会减少线程数,响应反而会慢很多。

如下是通过HystrixCollapser实现请求合并,我们覆盖HystrixCommand实现批量执行命令的创建,并传入批量参数。

在BatchCommand的run方法中实现批量操作,这里通过模拟返回批量数据代替。实际上通过执行批量方法,执行批量操作即可。

如下是官方demo。

public class CollapserTest {
    class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>,String,Integer> {
    private final Integer key;public CommandCollapserGetValueForKey(Integer key) {
    this.key = key;}@Overridepublic Integer getRequestArgument() {
    return key;}@Overrideprotected HystrixCommand<List<String>> createCommand(Collection<CollapsedRequest<String, Integer>> collection) {
    return new BatchCommand(collection);}@Overrideprotected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
    int count = 0;for (CollapsedRequest<String,Integer> request : requests) {
    request.setResponse(batchResponse.get(count++));}}}private static final class BatchCommand extends HystrixCommand<List<String>> {
    private final Collection<HystrixCollapser.CollapsedRequest<String,Integer>> requests;private BatchCommand(Collection<HystrixCollapser.CollapsedRequest<String, Integer>> requests) {
    super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));this.requests = requests;}@Overrideprotected List<String> run() throws Exception {
    ArrayList<String> response = new ArrayList<>();for (HystrixCollapser.CollapsedRequest<String,Integer> request : requests) {
    response.add("ValueForKey: " + request.getArgument());}return response;}}@org.junit.Testpublic void testCollapser() throws Exception {
    HystrixRequestContext context = HystrixRequestContext.initializeContext();try {
    Future<String> f1 = new CommandCollapserGetValueForKey(1).queue();Future<String> f2 = new CommandCollapserGetValueForKey(2).queue();Future<String> f3 = new CommandCollapserGetValueForKey(3).queue();Future<String> f4 = new CommandCollapserGetValueForKey(4).queue();f1.get();f2.get();f3.get();f4.get();System.out.println(HystrixRequestLog.getCurrentRequest().getExecutedCommands().size());HystrixCommand<?> command = HystrixRequestLog.getCurrentRequest().getExecutedCommands().toArray(new HystrixCommand<?>[1])[0];System.out.println(command.getCommandKey().name());System.out.println(command.getExecutionEvents().contains(HystrixEventType.COLLAPSED));System.out.println(command.getExecutionEvents().contains(HystrixEventType.SUCCESS));} finally {
    context.shutdown();}}
}

总结下来看具有相同结构的独立请求按如下方式合并:

  1. 等待一段时间汇集批量操作
  2. 通过实现批量执行方法,执行批量操作
  3. 将批量操作结果与请求映射,最终返回

当然工具框架实际上还有很多复杂灵活的参数,这里主要学习基本使用和了解其工作工程。

3 参考

[1]RequestCollapsing,https://github.com/Netflix/Hystrix/wiki/How-it-Works#RequestCollapsing
[2]Collapsing,https://github.com/Netflix/Hystrix/wiki/How-To-Use#Collapsing

  相关解决方案