Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.10) 之 RequestRateLimiterGatewayFilterFactory 请求限流

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-request-rate-limiter/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Spring-Cloud-Gateway 2.0.X M4


1. 概述

本文主要分享 RequestRateLimiterGatewayFilterFactory 的代码实现

《Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.2) 之 GatewayFilterFactory 过滤器工厂》 一文中,我们看到 Spring Cloud Gateway 提供了多种 GatewayFilterFactory 的实现,而 RequestRateLimiterGatewayFilterFactory 也是其中的一种。

通过 RequestRateLimiterGatewayFilterFactory ,可以创建 RequestRateLimiterGatewayFilter ( 实际是内部匿名类,为了表述方便,下面继续这么称呼 ) 。

RequestRateLimiterGatewayFilter 使用 Redis + Lua 实现分布式限流。而限流的粒度,例如 URL / 用户 / IP 等,通过 org.springframework.cloud.gateway.filter.ratelimit.KeyResolver 实现类决定,在 「4. KeyResolver」 详细解析。

这里,笔者一本正经的推荐下自己分享的 《Eureka 源码解析 —— 基于令牌桶算法的 RateLimiter》 ,简直业界良心。


推荐 Spring Cloud 书籍

推荐 Spring Cloud 视频

2. 环境搭建

第一步,以 spring-cloud-gateway-sample 项目为基础,在 pom.xml 文件添加依赖库。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
</dependency>

第二步,在 application.yml 配置一个 RouteDefinition 。

spring:
  cloud:
    gateway:
      routes:
      # =====================================
      - id: default_path_to_httpbin
        uri: http://127.0.0.1:8081
        order: 10000
        predicates:
        - Path=/**
        filters:
        - RequestRateLimiter=10, 20, #{@principalNameKeyResolver}
  • - RequestRateLimiter=10, 20, #{@principalNameKeyResolver} ,配置 RequestRateLimiterGatewayFilterFactory 。
    • 默认情况下,基于令牌桶算法实现限流。
    • 第一个参数,burstCapacity ,令牌桶上限 。
    • 第二个参数,replenishRate ,令牌桶填充平均速率,单位:秒。
    • 第三个参数,keyResolver ,限流解析器 Bean 对象名字,根据 #{@beanName} ,使用 SpEL 表达式,从 Spring 容器中获取 Bean 对象,详细参见 RouteDefinitionRouteLocator#getTuple(ArgumentHints, Map<String, String>, SpelExpressionParser, BeanFactory) 处的代码。另外,这里有一个 BUG :在 YAML 里,# 代表注释,所以第三个参数无法正确被读取,需要等待官方修复。如果比较着急使用,可以考虑将此处的 # 修改成 \# ,并修改部分相关代码以解决该 BUG 。

第三步,配置完成,启动 spring-cloud-gateway-sample 项目。

友情提示,RequestRateLimiterGatewayFilter 使用了 RedisTemplate ,生产环境请配置。

3. RequestRateLimiterGatewayFilterFactory

org.springframework.cloud.gateway.filter.factory.RequestRateLimiterGatewayFilterFactory ,请求限流网关过滤器工厂类。代码如下 :

  1: public class RequestRateLimiterGatewayFilterFactory implements GatewayFilterFactory {
  2:
  3:    public static final String KEY_RESOLVER_KEY = "keyResolver";
  4:
  5:    private final RateLimiter rateLimiter;
  6:    private final KeyResolver defaultKeyResolver;
  7:
  8:    public RequestRateLimiterGatewayFilterFactory(RateLimiter rateLimiter,
  9:            KeyResolver defaultKeyResolver) {
 10:        this.rateLimiter = rateLimiter;
 11:        this.defaultKeyResolver = defaultKeyResolver;
 12:    }
 13:
 14:    @Override
 15:    public List<String> argNames() {
 16:         return Arrays.asList(
 17:                 RedisRateLimiter.REPLENISH_RATE_KEY,
 18:                 RedisRateLimiter.BURST_CAPACITY_KEY,
 19:                 KEY_RESOLVER_KEY
 20:         );
 21:    }
 22:
 23:    @Override
 24:    public boolean validateArgs() {
 25:        return false;
 26:    }
 27:
 28:    @SuppressWarnings("unchecked")
 29:    @Override
 30:    public GatewayFilter apply(Tuple args) {
 31:         validateMin(2, args);
 32:
 33:         // 获得 KeyResolver
 34:        KeyResolver keyResolver;
 35:        if (args.hasFieldName(KEY_RESOLVER_KEY)) {
 36:            keyResolver = args.getValue(KEY_RESOLVER_KEY, KeyResolver.class);
 37:        } else {
 38:            keyResolver = defaultKeyResolver;
 39:        }
 40:
 41:        return (exchange, chain) -> keyResolver.resolve(exchange).flatMap(key ->
 42:             // TODO: if key is empty?
 43:             rateLimiter.isAllowed(key, args).flatMap(response -> {
 44:                 // TODO: set some headers for rate, tokens left
 45:
 46:                 // 允许访问
 47:                 if (response.isAllowed()) {
 48:                     return chain.filter(exchange);
 49:                 }
 50:
 51:                 // 被限流,不允许访问
 52:                 exchange.getResponse().setStatusCode(HttpStatus.TOO_MANY_REQUESTS);
 53:                 return exchange.getResponse().setComplete();
 54:             }));
 55:    }
 56:
 57: }
  • rateLimiter 属性,限流器。默认情况下,使用 RedisRateLimiter 。
  • defaultKeyResolver 属性,默认限流解析器。默认情况下,使用 PrincipalNameKeyResolver 。
  • #argNames() 方法,定义了 Tuple 参数的 Key 为 replenishRate / burstCapacity / keyResolver
  • #validateArgs() 方法,定义在 RouteDefinitionRouteLocator#getTuple(ArgumentHints, Map<String, String>, SpelExpressionParser, BeanFactory) 无需校验 Tuple 结果。因为 keyResolver 非必填项,在 #apply() 方法,创建 RequestRateLimiterGatewayFilter 时校验
  • #apply() 方法,创建 RequestRateLimiterGatewayFilter 对象。
  • 第 31 行 :校验 Tuple 参数至少有两个元素,即 replenishRateburstCapacity 。而 keyResolver选填,为空时,使用默认值 defaultKeyResolver
  • 第 34 至 39 行 :获得 keyResolver 。通过它,获得请求的限流,例如URL / 用户 / IP 等。
  • ——— 第 41 至 54 行 :创建 RequestRateLimiterGatewayFilter 对象并返回
  • 第 41 行 :调用 KeyResolver#resolve(ServerWebExchange) 方法,获得请求的限流
    • 注意下,这里未处理限流为空的情况( TODO: if key is empty? )。所以,当限流为空时,过滤器链不会继续向下执行,也就是说,不会请求后端 Http / Websocket 服务,并且最终返回客户端 200 状态码,内容为
  • 第 43 至 54 行 :调用 RateLimiter#isAllowed(ServerWebExchange, Tuple) 方法,判断是否被限流。
    • 第 47 至 49 行 :被限流,允许访问,提交过滤器链继续过滤。
    • 第 52 至 53 行 :被限流, 允许访问,设置响应 429 状态码,并回写客户端响应( exchange.getResponse().setComplete() ) 。

4. KeyResolver

org.springframework.cloud.gateway.filter.ratelimit.KeyResolver ,请求解析器接口。代码如下 :

public interface KeyResolver {
    Mono<String> resolve(ServerWebExchange exchange);
}
  • KeyResolver#resolve(ServerWebExchange) 方法,获得请求的限流

通过实现 KeyResolver 接口,实现获得不同的请求的限流,例如URL / 用户 / IP 等。

目前版本,Spring Cloud Gateway 提供的 KeyResolver 实现类只有 PrincipalNameKeyResolver 。据官方说法,在未来的里程碑版本中,将会有一些 KeyResolver 具体实现类。

4.1 PrincipalNameKeyResolver

org.springframework.cloud.gateway.filter.ratelimit.PrincipalNameKeyResolver ,使用请求认证的 java.security.Principal 作为限流。代码如下 :

public class PrincipalNameKeyResolver implements KeyResolver {

    public static final String BEAN_NAME = "principalNameKeyResolver";

    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return exchange.getPrincipal().map(Principal::getName).switchIfEmpty(Mono.empty());
    }
}

4.2 自定义 KeyResolver

通过实现 KeyResolver 接口,实现自定义 KeyResolver 。下面我们实现一个使用请求 IP 作为限流的 KeyResolver 。

第一步,创建 RemoteAddrKeyResolver 类,代码如下 :

public class RemoteAddrKeyResolver implements KeyResolver {

    public static final String BEAN_NAME = "remoteAddrKeyResolver";

    @Override
    public Mono<String> resolve(ServerWebExchange exchange) {
        return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
    }

}

第二步,配置 RemoteAddrKeyResolver Bean 对象,代码如下 :

@Bean(name = RemoteAddrKeyResolver.BEAN_NAME)
@ConditionalOnBean(RateLimiter.class)
public RemoteAddrKeyResolver remoteAddrKeyResolver() {
    return new RemoteAddrKeyResolver();
}

第三步,配置 RouteDefinition 路由配置,配置如下 :

spring:
  cloud:
    gateway:
      routes:
      # =====================================
      - id: default_path_to_httpbin
        uri: http://127.0.0.1:8081
        order: 10000
        predicates:
        - Path=/**
        filters:
        - RequestRateLimiter=10, 20, #{@remoteAddrKeyResolver}

第四步,大功告成,启动 Spring Cloud Gateway 即可。

另外,推荐 《周立 —— Spring Cloud限流详解(附源码)》,里面有一些限流维度的分析。

5. RateLimiter

org.springframework.cloud.gateway.filter.ratelimit.RateLimiter ,限流器接口。代码如下 :

public interface RateLimiter {

    Mono<Response> isAllowed(String id, Tuple args);

}
  • #isAllowed(String id, Tuple args) 方法,判断是否被限流。
  • Response 类,代码如下 :

    class Response {
        /**
         * 是否允许访问( 未被限流 )
         */
        private final boolean allowed;
        /**
         * 令牌桶剩余数量
         */
        private final long tokensRemaining;
    
        public Response(boolean allowed, long tokensRemaining) {
            this.allowed = allowed;
            this.tokensRemaining = tokensRemaining;
        }
    }
    

5.1 GatewayRedisAutoConfiguration

org.springframework.cloud.gateway.config.GatewayRedisAutoConfiguration ,Redis 相关配置类,代码如下 :

  1: @Configuration
  2: @AutoConfigureAfter(RedisReactiveAutoConfiguration.class)
  3: @AutoConfigureBefore(GatewayAutoConfiguration.class)
  4: @ConditionalOnBean(ReactiveRedisTemplate.class)
  5: @ConditionalOnClass({RedisTemplate.class, DispatcherHandler.class})
  6: class GatewayRedisAutoConfiguration {
  7:
  8:    @Bean
  9:    @SuppressWarnings("unchecked")
 10:    public RedisScript redisRequestRateLimiterScript() {
 11:        DefaultRedisScript redisScript = new DefaultRedisScript<>();
 12:        redisScript.setScriptSource(new ResourceScriptSource(new ClassPathResource("META-INF/scripts/request_rate_limiter.lua")));
 13:        redisScript.setResultType(List.class);
 14:        return redisScript;
 15:    }
 16:
 17:    @Bean
 18:    //TODO: replace with ReactiveStringRedisTemplate in future
 19:    public ReactiveRedisTemplate<String, String> stringReactiveRedisTemplate(
 20:            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory,
 21:            ResourceLoader resourceLoader) {
 22:        RedisSerializer<String> serializer = new StringRedisSerializer();
 23:        RedisSerializationContext<String , String> serializationContext = RedisSerializationContext
 24:                .<String, String>newSerializationContext()
 25:                .key(serializer)
 26:                .value(serializer)
 27:                .hashKey(serializer)
 28:                .hashValue(serializer)
 29:                .build();
 30:        return new ReactiveRedisTemplate<>(reactiveRedisConnectionFactory,
 31:                serializationContext);
 32:    }
 33:
 34:    @Bean
 35:    public RedisRateLimiter redisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
 36:                                             @Qualifier("redisRequestRateLimiterScript") RedisScript<List<Long>> redisScript) {
 37:        return new RedisRateLimiter(redisTemplate, redisScript);
 38:    }
 39:
 40: }
  • 第 8 至 15 行 :创建 org.springframework.data.redis.core.script.RedisScript Bean 对象,加载 META-INF/scripts/request_rate_limiter.lua 路径下的 Redis Lua 脚本。该脚本使用 Redis 基于令牌桶算法实现限流。在本文 「Redis Lua 脚本」 详细解析。
  • 第 17 至 32 行 :创建 org.springframework.data.redis.core.ReactiveRedisTemplate Bean 对象。
  • 第 34 至 38 行 :使用 RedisScript 和 ReactiveRedisTemplate Bean 对象,创建 RedisRateLimiter Bean 对象。

5.2 RedisRateLimiter

org.springframework.cloud.gateway.filter.ratelimit.RedisRateLimiter ,基于 Redis 的分布式限流器实现类

构造方法,代码如下 :

public class RedisRateLimiter implements RateLimiter {
    public static final String REPLENISH_RATE_KEY = "replenishRate";
    public static final String BURST_CAPACITY_KEY = "burstCapacity";

    private final ReactiveRedisTemplate<String, String> redisTemplate;
    private final RedisScript<List<Long>> script;

    public RedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
            RedisScript<List<Long>> script) {
        this.redisTemplate = redisTemplate;
        this.script = script;
    }
}
  • redisTemplate 属性,RedisTemplate 。
  • script 属性,Lua 脚本。

#isAllowed(id, Tuple) ,代码如下 :

  1: public Mono<Response> isAllowed(String id, Tuple args) {
  2:    // How many requests per second do you want a user to be allowed to do?
  3:    int replenishRate = args.getInt(REPLENISH_RATE_KEY);
  4:
  5:    // How much bursting do you want to allow?
  6:    int burstCapacity;
  7:    if (args.hasFieldName(BURST_CAPACITY_KEY)) {
  8:        burstCapacity = args.getInt(BURST_CAPACITY_KEY);
  9:    } else {
 10:        burstCapacity = 0;
 11:    }
 12:
 13:    try {
 14:        // Make a unique key per user.
 15:        String prefix = "request_rate_limiter." + id;
 16:
 17:        // You need two Redis keys for Token Bucket.
 18:        List<String> keys = Arrays.asList(prefix + ".tokens", prefix + ".timestamp");
 19:
 20:        // The arguments to the LUA script. time() returns unixtime in seconds.
 21:        List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
 22:                Instant.now().getEpochSecond() + "", "1");
 23:        // allowed, tokens_left = redis.eval(SCRIPT, keys, args)
 24:        Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);
 25:        // .log("redisratelimiter", Level.FINER);
 26:        return flux
 27:                // Throwable => Flux.just(Arrays.asList(1L, -1L)) 。
 28:                .onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
 29:                // Flux<List<Long>> => Mono<List<Long>>
 30:                .reduce(new ArrayList<Long>(), (longs, l) -> {
 31:                    longs.addAll(l);
 32:                    return longs;
 33:                })
 34:                // Mono<List<Long>> => Mono<Response>
 35:                .map(results -> {
 36:                    boolean allowed = results.get(0) == 1L;
 37:                    Long tokensLeft = results.get(1);
 38:
 39:                    Response response = new Response(allowed, tokensLeft);
 40:
 41:                    if (log.isDebugEnabled()) {
 42:                        log.debug("response: " + response);
 43:                    }
 44:                    return response;
 45:                });
 46:    }
 47:    catch (Exception e) {
 48:        /*
 49:         * We don't want a hard dependency on Redis to allow traffic. Make sure to set
 50:         * an alert so you know if this is happening too much. Stripe's observed
 51:         * failure rate is 0.01%.
 52:         */
 53:        log.error("Error determining if user allowed from redis", e);
 54:    }
 55:    return Mono.just(new Response(true, -1));
 56: }
  • id 方法参数,令牌桶编号。一个令牌桶编号对应令牌桶。
    • 在本文场景中为请求限流
  • 第 3 行 :获得 burstCapacity 令牌桶上限。
  • 第 5 至 11 行 :获得 replenishRate ,令牌桶填充平均速率,单位:秒。
  • 第 15 行 :获得令牌桶前缀,request_rate_limiter.${id}
  • 第 18 行 :获得令牌桶键数组 :
    • request_rate_limiter.${id}.tokens :令牌桶剩余令牌数。
    • request_rate_limiter.${id}.timestamp :令牌桶最后填充令牌时间,单位:秒。
  • 第 21 至 22 行 :获得 Lua 脚本参数 :

    • 第一个参数 :replenishRate
    • 第二个参数 :burstCapacity
    • 第三个参数 :得到从 1970-01-01 00:00:00 开始的秒数。为什么在 Java 代码里获取,而不使用 Lua 在 Reids 里获取

      FROM 《亿级流量网站架构核心技术》
      因为 Redis 的限制( Lua中有写操作不能使用带随机性质的读操作,如TIME )不能在 Redis Lua中 使用 TIME 获取时间戳,因此只好从应用获取然后传入,在某些极端情况下(机器时钟不准的情况下),限流会存在一些小问题。

      • 涛哥这本书非常不错,推荐购买。
    • 第四个参数 :消耗令牌数量,默认 1 。

  • 第 24 行 :调用 ReactiveRedisTemplate#execute(RedisScript<T>, List<K>, List<?>) 方法,执行 Redis Lua 脚本,获取令牌。返回结果为 [是否获取令牌成功, 剩余令牌数] ,其中,1 代表获取令牌成功0 代表令牌获取失败

  • 第 25 行 :当 Redis Lua 脚本过程中发生异常,忽略异常,返回 Flux.just(Arrays.asList(1L, -1L)) ,即认为获取令牌成功。为什么?在 Redis 发生故障时,我们不希望限流器对 Reids 是强依赖,并且 Redis 发生故障的概率本身就很低。

    We don’t want a hard dependency on Redis to allow traffic.
    Make sure to set an alert so you know if this is happening too much. Stripe’s observed failure rate is 0.01%.

  • 第 30 至 33 行 :调用 Flux#reduce(A, BiFunction<A, ? super T, A>) 方法,将 Flux<List<Long>> 转换成 Mono<List<Long>> 。因为 ReactiveRedisTemplate#execute(RedisScript<T>, List<K>, List<?>) 方法的执行结果为 Flux ( 多次 ),实际在当前场景里,自行 Redis Lua 脚本只会返回一次数组,所以转换成 Mono (一次)。

  • 第 35 至 45 行 :调用 Mono#map(Function<? super T, ? extends R>) 方法,将 Mono<List<Long>> => Mono<Response>
  • 第 47 至 55 行 :当【第 15 至 24 行】代码部分执行发生异常时,例如 Redis 挂了,返回 Flux.just(Arrays.asList(1L, -1L)) ,即认为获取令牌成功

5.3 Redis Lua 脚本

META-INF/scripts/request_rate_limiter.lua ,Redis Lua 脚本,实现基于令牌桶算法实现限流。代码如下 :

  1: local tokens_key = KEYS[1]
  2: local timestamp_key = KEYS[2]
  3:
  4: local rate = tonumber(ARGV[1])
  5: local capacity = tonumber(ARGV[2])
  6: local now = tonumber(ARGV[3])
  7: local requested = tonumber(ARGV[4])
  8:
  9: local fill_time = capacity/rate
 10: local ttl = math.floor(fill_time*2)
 11:
 12: local last_tokens = tonumber(redis.call("get", tokens_key))
 13: if last_tokens == nil then
 14:   last_tokens = capacity
 15: end
 16:
 17: local last_refreshed = tonumber(redis.call("get", timestamp_key))
 18: if last_refreshed == nil then
 19:   last_refreshed = 0
 20: end
 21:
 22: local delta = math.max(0, now-last_refreshed)
 23: local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
 24: local allowed = filled_tokens >= requested
 25: local new_tokens = filled_tokens
 26: local allowed_num = 0
 27: if allowed then
 28:   new_tokens = filled_tokens - requested
 29:   allowed_num = 1
 30: end
 31:
 32: redis.call("setex", tokens_key, ttl, new_tokens)
 33: redis.call("setex", timestamp_key, ttl, now)
 34:
 35: return { allowed_num, new_tokens }
  • 第 1 至 2 行 :KEYS 方法参数 :
    • 第一个参数 :request_rate_limiter.${id}.tokens ,令牌桶剩余令牌数。
    • 第二个参数 :request_rate_limiter.${id}.timestamp ,令牌桶最后填充令牌时间,单位:秒。
  • 第 4 至 7 行 :ARGV 方法参数 :

    • 第一个参数 :replenishRate
    • 第二个参数 :burstCapacity
    • 第三个参数 :得到从 1970-01-01 00:00:00 开始的秒数。
    • 第四个参数 :消耗令牌数量,默认 1 。
  • 第 9 行 :计算令牌桶填充令牌需要多久时间,单位:秒。

  • 第 10 行 :计算 request_rate_limiter.${id}.tokens / request_rate_limiter.${id}.timestampttl* 2 保证时间充足。
  • 第 12 至 20 行 :调用 get 命令,获得令牌桶剩余令牌数( last_tokens ) ,令牌桶最后填充令牌时间(last_refreshed) 。
  • 第 22 至 23 行 :填充令牌,计算的令牌桶剩余令牌数( filled_tokens )。填充不超过令牌桶令牌上限
  • 第 24 至 30 行 :获取令牌是否成功。

    • 成功,令牌桶剩余令牌数(new_tokens) 消耗令牌数( requested ),并设置获取成功( allowed_num = 1 ) 。
    • 失败,设置获取失败( allowed_num = 0 ) 。
  • 第 32 至 33 行 :设置令牌桶剩余令牌数( new_tokens ) ,令牌桶最后填充令牌时间(now) 。

  • 第 35 行 :返回数组结果,[是否获取令牌成功, 剩余令牌数]

Redis Lua 脚本不会有并发问题么

FROM 《亿级流量网站架构核心技术》
因 Redis 是单线程模型,因此是线程安全的。

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.10) 之 RequestRateLimiterGatewayFilterFactory 请求限流
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 技术驿站 | 致力打造 Java 精品博客

联系作者优质文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏