ratelimiter
soul 网关使用 redis 实现了一个限流器。
RateLimiterPluginDataHandler
当插件配置被修改时(位于 admin 系统->系统管理->插件管理),数据将流向此类的 handlerPlugin 方法,soul 使用这些信息组装 redis 连接:
public void handlerPlugin(final PluginData pluginData) {
if (Objects.nonNull(pluginData) && pluginData.getEnabled()) {
//init redis// 获取限流器配置RateLimiterConfig rateLimiterConfig = GsonUtils.getInstance().fromJson(pluginData.getConfig(), RateLimiterConfig.class);//spring data redisTemplateif (Objects.isNull(Singleton.INST.get(ReactiveRedisTemplate.class))|| Objects.isNull(Singleton.INST.get(RateLimiterConfig.class))|| !rateLimiterConfig.equals(Singleton.INST.get(RateLimiterConfig.class))) {
// 使用配置生成 Lettuce 连接工厂实例LettuceConnectionFactory lettuceConnectionFactory = createLettuceConnectionFactory(rateLimiterConfig);lettuceConnectionFactory.afterPropertiesSet();RedisSerializer<String> serializer = new StringRedisSerializer();RedisSerializationContext<String, String> serializationContext =RedisSerializationContext.<String, String>newSerializationContext().key(serializer).value(serializer).hashKey(serializer).hashValue(serializer).build();// 生成 redis 连接实例并放入缓存ReactiveRedisTemplate<String, String> reactiveRedisTemplate = new SoulReactiveRedisTemplate<>(lettuceConnectionFactory, serializationContext);Singleton.INST.single(ReactiveRedisTemplate.class, reactiveRedisTemplate);Singleton.INST.single(RateLimiterConfig.class, rateLimiterConfig);}}}
RedisRateLimiter
RedisRateLimiter 类维护了实现限流功能的关键 lua 脚本:
private RedisScript<List<Long>> redisScript() {
DefaultRedisScript redisScript = new DefaultRedisScript<>();// 加载 lua 脚本,放入 java 内存redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("/META-INF/scripts/request_rate_limiter.lua")));redisScript.setResultType(List.class);return redisScript;}
脚本关键逻辑如下:
...// 获取上一次的数量
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil thenlast_tokens = capacity
end
--redis.log(redis.LOG_WARNING, "last_tokens " .. last_tokens)// 获取上一次调用的时间
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil thenlast_refreshed = 0
end
--redis.log(redis.LOG_WARNING, "last_refreshed " .. last_refreshed)local delta = math.max(0, now-last_refreshed)
// 根据现在距离上一次调用的时间和流速配置,计算出当前的数量
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
// 计算是否超限
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed thennew_tokens = filled_tokens - requestedallowed_num = 1
end...
// 重新设置缓存并刷新过期时间
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)return {
allowed_num, new_tokens }
同时使用 isAllowed 方法封装了 lua 脚本的调用过程:
public Mono<RateLimiterResponse> isAllowed(final String id, final double replenishRate, final double burstCapacity) {
... // 从 Singleton 中获取 RateLimiterPluginDataHandler 类缓存的 ReactiveRedisTemplateFlux<List<Long>> resultFlux = Singleton.INST.get(ReactiveRedisTemplate.class).execute(this.script, keys, scriptArgs);// 执行脚本并返回结果return resultFlux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L))).reduce(new ArrayList<Long>(), (longs, l) -> {
longs.addAll(l);return longs;}).map(results -> {
boolean allowed = results.get(0) == 1L;Long tokensLeft = results.get(1);RateLimiterResponse rateLimiterResponse = new RateLimiterResponse(allowed, tokensLeft);log.info("RateLimiter response:{}", rateLimiterResponse.toString());return rateLimiterResponse;}).doOnError(throwable -> log.error("Error determining if user allowed from redis:{}", throwable.getMessage()));}
RateLimiterPlugin
最终 RateLimiterPlugin 的 doExecute 方法只需要调用 RedisRateLimiter 实例的 isAllowed 方法即可。
RewritePlugin
RewritePlugin 提供了 url 的重写功能,代码比较简单,将用户配置的 rewriteURI 写入上下文:
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
...// 设置 RewriteURIexchange.getAttributes().put(Constants.REWRITE_URI, rewriteHandle.getRewriteURI());return chain.execute(exchange);}
然后在 DividePlugin 的逻辑中,会在组装实际 URL 的时候判断此参数:
private String buildRealURL(final String domain, final SoulContext soulContext, final ServerWebExchange exchange) {
String path = domain;final String rewriteURI = (String) exchange.getAttributes().get(Constants.REWRITE_URI);// 如果设置了 rewriteURI,则以 rewriteURI 为准if (StringUtils.isNoneBlank(rewriteURI)) {
path = path + rewriteURI;} else {
final String realUrl = soulContext.getRealUrl();if (StringUtils.isNoneBlank(realUrl)) {
path = path + realUrl;}}...return path;}
ContextPathMappingPlugin
ContextPathMappingPlugin 提供了 contextPath 的重写功能,比如请求路径为/soul/http/order, 配置的contextPath为’/soul/http’,那么真正请求的url为’/order’。
代码也相对简单,进行匹配替换即可:
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
...//check the context path illegal// 如果 contextPath 并非以用户设置的替换规则开始,则报错if (!soulContext.getPath().startsWith(contextMappingHandle.getContextPath())) {
Object error = SoulResultWrap.error(SoulResultEnum.CONTEXT_PATH_ERROR.getCode(), SoulResultEnum.CONTEXT_PATH_ERROR.getMsg(), null);return WebFluxResultUtils.result(exchange, error);}// 替换路径 urlthis.buildContextPath(soulContext, contextMappingHandle);return chain.execute(exchange);}private void buildContextPath(final SoulContext context, final ContextMappingHandle handle) {
...// 分词Optional<String> optional = Arrays.stream(context.getPath().split(handle.getContextPath())).reduce((first, last) -> last);// 设置 RealUrloptional.ifPresent(context::setRealUrl);}
最终,会在上文提到的 DividePlugin 插件中,以低于 rewrite 插件配置的优先级进行 URL 组装。