高并发系统限流操作之令牌桶实现可变TPS控制

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

文章首发于:clawhub.club


年前有个需求,批量请求供应商API,要有限流操作,并支持TPS与并发数可配置,那时候简单的查了查资料,任务结束就过去了,最近又有个限流的小需求,所以又翻出了以前的代码。

本次简单记录一下令牌桶的实现:

令牌桶算法的原理是系统会以一个恒定的速度往桶里放入令牌,而如果请求需要被处理,则需要先从桶里获取一个令牌,当桶里没有令牌可取时,则拒绝服务。

2019102910041\_1.png

令牌桶算法.png

实现思路:
用LinkedBlockingQueue作为装令牌的桶,Executors.newSingleThreadScheduledExecutor()作为定时器定时将令牌放入桶中,使用构建者模式的代码风格。忘了以前在哪抄的了,就这样吧。

贴上核心代码:

    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    /**
     * TokenBucket<br>
     */
    public class TokenBucket {

        /**
         * 每秒最多请求数量
         */
        private int maxFlowRate;

        /**
         * 每秒平均请求数量
         */
        private int avgFlowRate;

        /**
         * 队列来缓存桶数量
         */
        private LinkedBlockingQueue<Byte> tokenQueue;

        /**
         * 由定时任务持续生成令牌。这样的问题在于会极大的消耗系统资源,如,某接口需要分别对每个用户做访问频率限制。
         * 假设系统中存在6W用户,则至多需要开启6W个定时任务来维持每个桶中的令牌数,这样的开销是巨大的。
         * 可以做成延迟计算的形式,每次请求令牌的时候,看当前时间是否晚与下一次生成令牌的时间,计算该段时间的令牌数,
         * 加入令牌桶,更新数据。
         */
        private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();

        /**
         * The Mutex do not use directly.
         */
        private volatile Object mutexDoNotUseDirectly = new Object();
        /**
         * The Is start.
         */
        private volatile boolean isStart = false;

        /**
         * The constant A_CHAR.
         */
        private static final Byte A_CHAR = 'a';

        /**
         * Instantiates a new Token bucket.
         */
        private TokenBucket() {
        }

        /**
         * New builder token bucket.
         *
         * @return the token bucket
         */
        public static TokenBucket newBuilder() {
            return new TokenBucket();
        }

        /**
         * 每秒内最大请求数量设置
         *
         * @param maxFlowRate 每秒内最大请求数量
         * @return 当前令牌同
         */
        public TokenBucket maxFlowRate(int maxFlowRate) {
            this.maxFlowRate = maxFlowRate;
            return this;
        }

        /**
         * 每秒平均请求数量设置
         *
         * @param avgFlowRate 每秒平均请求数量
         * @return 当前令牌同
         */
        public TokenBucket avgFlowRate(int avgFlowRate) {
            this.avgFlowRate = avgFlowRate;
            return this;
        }

        /**
         * 构造者模式
         *
         * @return the token bucket
         */
        public TokenBucket build() {
            //初始化
            init();
            //返回当前对象
            return this;
        }

        /**
         * 初始化
         */
        private void init() {
            //初始化桶队列大小
            if (maxFlowRate > 0) {
                tokenQueue = new LinkedBlockingQueue<>(maxFlowRate);
            }
            //初始化令牌生产者
            TokenProducer tokenProducer = new TokenProducer(avgFlowRate, this);
            //每秒执行一次增加令牌操作
            scheduledExecutorService.scheduleAtFixedRate(tokenProducer, 0, 1, TimeUnit.SECONDS);
            //系统启动
            isStart = true;

        }

        /**
         * 停止任务
         */
        public void stop() {
            isStart = false;
            scheduledExecutorService.shutdown();
        }

        /**
         * 查看任务是否执行
         *
         * @return the boolean
         */
        public boolean isStarted() {
            return isStart;
        }

        /**
         * 增加令牌
         *
         * @param tokenNum the token num
         */
        private void addTokens(Integer tokenNum) {
            // 若是桶已经满了,就不再家如新的令牌
            for (int i = 0; i < tokenNum; i++) {
                tokenQueue.offer(A_CHAR);
            }
        }

        /**
         * 获取令牌
         * <p>
         * true:获取到1个令牌,非阻塞
         * <p>
         * false:未获取到令牌,非阻塞
         *
         * @return boolean
         */
        public boolean tryAcquire() {
            synchronized (mutexDoNotUseDirectly) {
                // 否存在足够的桶数量
                if (tokenQueue.size() > 0) {
                    //队列不为空时返回队首值并移除,队列为空时返回null。非阻塞立即返回。
                    Byte poll = tokenQueue.poll();
                    if (poll != null) {
                        //获取到令牌
                        return true;
                    }
                }
            }
            //未获取到令牌
            return false;
        }

        /**
         * 令牌生产者 <br>
         */
        private class TokenProducer implements Runnable {

            /**
             * 每次加入令牌的数量
             */
            private int tokenNum;
            /**
             * 当前令牌桶
             */
            private TokenBucket tokenBucket;

            /**
             * 令牌生产者构造方法
             *
             * @param tokenNum    每次加入令牌的数量
             * @param tokenBucket 当前令牌桶
             */
            private TokenProducer(int tokenNum, TokenBucket tokenBucket) {
                this.tokenNum = tokenNum;
                this.tokenBucket = tokenBucket;
            }

            @Override
            public void run() {
                //增加令牌
                tokenBucket.addTokens(tokenNum);
            }
        }

    }

来源:https://www.jianshu.com/p/347ea7f881f8

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 高并发系统限流操作之令牌桶实现可变TPS控制

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏