Spring Cloud Gateway 自定义限流

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取10G资料包与项目实战视频资料

在使用Spring Cloud Gateway限流功能时官网提供的限流中的流速以及桶容量是针对所有策略的,意思是只要配置上那么所有的都是一样的,不能根据不同的类型配置不同的参数,例如:A渠道、B渠道,若配置上replenishRate(流速)和burstCapacity(令牌桶容量),那么不管是A渠道还是B渠道都是这个值,如果修改那么对应的其他渠道也会修改,如何能做到分为不同渠道进行限流呢,A渠道replenishRate:10,burstCapacity:100,B渠道:replenishRate:20,burstCapacity:1000,下面开始分析:
限流方式采用的redis,底层使用的redis的lua脚本实现的,具体可以自行查阅,不做讲解,默认限流类:RedisRateLimiter,本文也是仿照这个进行重写的。
本文是参考“Spring Cloud Gateway 结合配置中心限流”进行简化改造,通过配置的方式进行实现。

引入依赖

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    </dependency>

自定义限流类

参照RedisRateLimiter进行自定义限流类SystemRedisRateLimiter用于渠道限流方式,实现代码如下:

    /** * @author : Erick * @version : 1.0 * @Description : * @time :2018-12-1 */
    public class SystemRedisRateLimiter extends AbstractRateLimiter<SystemRedisRateLimiter.Config> implements ApplicationContextAware {
        //这些变量全部从RedisRateLimiter复制的,都会用到。
        public static final String REPLENISH_RATE_KEY = "replenishRate";

        public static final String BURST_CAPACITY_KEY = "burstCapacity";

        public static final String CONFIGURATION_PROPERTY_NAME = "sys-redis-rate-limiter";
        public static final String REDIS_SCRIPT_NAME = "redisRequestRateLimiterScript";
        public static final String REMAINING_HEADER = "X-RateLimit-Remaining";
        public static final String REPLENISH_RATE_HEADER = "X-RateLimit-Replenish-Rate";
        public static final String BURST_CAPACITY_HEADER = "X-RateLimit-Burst-Capacity";

        //处理速度
        private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
        //容量
        private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";

        private ReactiveRedisTemplate<String, String> redisTemplate;
        private RedisScript<List<Long>> script;
        private AtomicBoolean initialized = new AtomicBoolean(false);

        private String remainingHeader = REMAINING_HEADER;

        /** The name of the header that returns the replenish rate configuration. */
        private String replenishRateHeader = REPLENISH_RATE_HEADER;

        /** The name of the header that returns the burst capacity configuration. */
        private String burstCapacityHeader = BURST_CAPACITY_HEADER;

        private Config defaultConfig;

        public SystemRedisRateLimiter(ReactiveRedisTemplate<String, String> redisTemplate,
                                      RedisScript<List<Long>> script, Validator validator) {
            super(Config.class , CONFIGURATION_PROPERTY_NAME , validator);
            this.redisTemplate = redisTemplate;
            this.script = script;
            initialized.compareAndSet(false,true);
        }

        public SystemRedisRateLimiter(int defaultReplenishRate, int defaultBurstCapacity){
            super(Config.class , CONFIGURATION_PROPERTY_NAME , null);
            defaultConfig = new Config()
                    .setReplenishRate(defaultReplenishRate)
                    .setBurstCapacity(defaultBurstCapacity);

        }
        //具体限流实现,此处调用的是lua脚本
        @Override
        public Mono<RateLimiter.Response> isAllowed(String routeId, String id) {
            if (!this.initialized.get()) {
                throw new IllegalStateException("RedisRateLimiter is not initialized");
            }
            if (ObjectUtils.isEmpty(rateLimiterConf) ){
                throw new IllegalArgumentException("No Configuration found for route " + routeId);
            }
            //获取的是自定义的map
            Map<String , Integer> rateLimitMap = rateLimiterConf.getRateLimitMap();
            //缓存的key
            String replenishRateKey = routeId + "." + id + "." + REPLENISH_RATE_KEY;
            //若map中不存在则采用默认值,存在则取值。
            int replenishRate = ObjectUtils.isEmpty(rateLimitMap.get(replenishRateKey)) ? rateLimitMap.get(DEFAULT_REPLENISHRATE) : rateLimitMap.get(replenishRateKey);
            //容量key
            String burstCapacityKey = routeId + "." + id + "." + BURST_CAPACITY_KEY;
            //若map中不存在则采用默认值,存在则取值。
            int burstCapacity = ObjectUtils.isEmpty(rateLimitMap.get(burstCapacityKey)) ? rateLimitMap.get(DEFAULT_BURSTCAPACITY) : rateLimitMap.get(burstCapacityKey);

            try {
                List<String> keys = getKeys(id);

                List<String> scriptArgs = Arrays.asList(replenishRate + "", burstCapacity + "",
                        Instant.now().getEpochSecond() + "", "1");
                Flux<List<Long>> flux = this.redisTemplate.execute(this.script, keys, scriptArgs);

                return flux.onErrorResume(throwable -> Flux.just(Arrays.asList(1L, -1L)))
                        .reduce(new ArrayList<Long>(), (longs, l) -> {
                            longs.addAll(l);
                            return longs;
                        }) .map(results -> {
                            boolean allowed = results.get(0) == 1L;
                            Long tokensLeft = results.get(1);

                            RateLimiter.Response response = new RateLimiter.Response(allowed, getHeaders(replenishRate , burstCapacity , tokensLeft));

                            return response;
                        });
            } catch (Exception e) {
                e.printStackTrace();
            }

            return Mono.just(new RateLimiter.Response(true, getHeaders(replenishRate , burstCapacity , -1L)));
        }

        private RateLimiterConf rateLimiterConf;
        @Override
        public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
            this.rateLimiterConf = applicationContext.getBean(RateLimiterConf.class);
        }

        public HashMap<String, String> getHeaders(Integer replenishRate, Integer burstCapacity , Long tokensLeft) {
            HashMap<String, String> headers = new HashMap<>();
            headers.put(this.remainingHeader, tokensLeft.toString());
            headers.put(this.replenishRateHeader, String.valueOf(replenishRate));
            headers.put(this.burstCapacityHeader, String.valueOf(burstCapacity));
            return headers;
        }

        static List<String> getKeys(String id) {
            // use `{}` around keys to use Redis Key hash tags
            // this allows for using redis cluster

            // Make a unique key per user.
            //此处可以自定义redis前缀信息
            String prefix = "request_sys_rate_limiter.{" + id;

            // You need two Redis keys for Token Bucket.
            String tokenKey = prefix + "}.tokens";
            String timestampKey = prefix + "}.timestamp";
            return Arrays.asList(tokenKey, timestampKey);
        }

        @Validated
        public static class Config{
            @Min(1)
            private int replenishRate;
            @Min(1)
            private int burstCapacity = 1;

            public int getReplenishRate() {
                return replenishRate;
            }

            public Config setReplenishRate(int replenishRate) {
                this.replenishRate = replenishRate;
                return this;
            }

            public int getBurstCapacity() {
                return burstCapacity;
            }

            public Config setBurstCapacity(int burstCapacity) {
                this.burstCapacity = burstCapacity;
                return this;
            }

            @Override
            public String toString() {
                return "Config{" +
                        "replenishRate=" + replenishRate +
                        ", burstCapacity=" + burstCapacity +
                        '}';
            }
        }
    }

在继承AbstractRateLimiter泛型使用的是自定义类中的config:SystemRedisRateLimiter.Config

配置类

配置类主要用于初始化map参数。

    /** * @author : Erick * @version : 1.0 * @Description : * @time :2018-12-1 */
    @Component
    //使用配置文件的方式进行初始化
    @ConfigurationProperties(prefix = "ratelimiter-conf")
    public class RateLimiterConf {
        //处理速度
        private static final String DEFAULT_REPLENISHRATE="default.replenishRate";
        //容量
        private static final String DEFAULT_BURSTCAPACITY="default.burstCapacity";

        private Map<String , Integer> rateLimitMap = new ConcurrentHashMap<String , Integer>(){
            {
                put(DEFAULT_REPLENISHRATE , 10);
                put(DEFAULT_BURSTCAPACITY , 100);
            }
        };

        public Map<String, Integer> getRateLimitMap() {
            return rateLimitMap;
        }

        public void setRateLimitMap(Map<String, Integer> rateLimitMap) {
            this.rateLimitMap = rateLimitMap;
        }
    }

配置文件主要采用配置文件的方式进行初始化,若配置则进行添加,若没有配置则采用默认值。

配置文件

主要用于配置各个渠道的限流阀值,例如文章开头举例的A渠道和B渠道,配置如下:

    //与配置类RateLimiterConf保持一致
    ratelimiter-conf:
      #配置限流参数与RateLimiterConf类映射
      rateLimitMap:
        #格式为:routeid(gateway配置routes时指定的).系统名称.replenishRate(流速)/burstCapacity令牌桶大小
        service.A.replenishRate: 10
        service.A.burstCapacity: 100
        service.B.replenishRate: 20
        service.B.burstCapacity: 1000

到此配置限流相关的代码已经完成,需要在启动类和bootstrap.yml中进行配置才能够真正的使用。

启动类中声名

在启动类中声明使用的策略,指定自定义限流类。

    @Bean
     KeyResolver sysKeyResolver(){
          //从请求地址中截取sys值,进行限流。
         return exchange -> Mono.just(exchange.getRequest().getQueryParams().getFirst("sys"));
     }

    @Bean
    @Primary
    //使用自己定义的限流类
    SystemRedisRateLimiter systemRedisRateLimiter(
            ReactiveRedisTemplate<String, String> redisTemplate,
            @Qualifier(SystemRedisRateLimiter.REDIS_SCRIPT_NAME) RedisScript<List<Long>> script,
            Validator validator){
        return new SystemRedisRateLimiter(redisTemplate , script , validator);
    }

采用的是从请求地址中获取sys参数对应的值,当然也可以设置为其他值,再声名自定义的限流类。

配置限流

    spring:
      cloud:
        gateway:
          routes:
            - id: service //id名称需要与配置限流的保持一致
              uri: lb://serviceA
              predicates:
                - Path=/service/**/ //最后的/可以去掉,在本文中特意添加的如果去掉会把filters当成注释内容。
              filters:
                - name: RequestRateLimiter
                  args:
                    //需要与上边的方法名保持一致
                    rate-limiter: "#{@systemRedisRateLimiter}"
                    //需要与策略类的方法名保持一致。
                    key-resolver: "#{@sysKeyResolver}"

至此自定义限流代码全部完成,如有什么不妥支持请指正,同时也希望对其他人有帮助。实例代码已上传到码云可以点击查看。

20191123100328\_1.png


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud Gateway 自定义限流

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏