Spring Cloud 源码学习之 Hystrix 请求缓存

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

欢迎访问陈同学博客原文
本文学习了 Hystrix 请求缓存机制。

场景

先用一个小场景演示下请求缓存。

服务A 查询一页数据,共10条,每条都有一个orgId字段,需要根据orgId向 服务B 查询orgName。10条数据中orgId有8条相同,剩余2条相同。

下面写下伪代码:

方式一:循环10次

    for (org : 10条数据) {
        org.setOrgName(向服务B获取orgName);
    }

服务间调用,内网调用,走HTTP的话,即使每个请求50-100ms,10个请求也有0.5到1s,耗时非常久。

方式二:人工缓存

    Map<String, String> organizations = new HashMap<>(10);
    for (org : 10条数据) {
        if (organizations.containsKey(org.getOrgId)) {
            // 从缓存中读取
            org.setOrgName(organizations.get(org.getOrgId));
        } else {
            // 远程调用B服务
            org.setOrgName(向服务B获取orgName);
            // 加入缓存
            organizations.put(org.getOrgId, org.getOrgName);
        }
    }

这样只需要调用2次B服务,耗时在100-200毫秒之间,性能提升5倍。但这样做真的好吗?

微服务中,服务之间的依赖非常多,如果每个方法都自行处理缓存的话,应用中可以想象有多少累赘的缓存代码。

方式三:自动缓存

这属于本文的主题,在请求生命周期内,无论是当前线程,还是其他线程,只要请求相同的数据,就自动做缓存,不侵入业务代码。

ReplaySubject

自动缓存的实现方式有多种,这里介绍 Hystrix 的实现方式。Hystrix 使用了 RxJava 中的 ReplaySubject。

replay 译为重放,Subject 是个合体工具,既可以做数据发射器(被观察者、Observable),也可以做数据消费者(观察者、Observer)。

看个小例子就明白:

    @Test
    public void replaySubject() {
        ReplaySubject<Integer> replaySubject = ReplaySubject.create();
        replaySubject.subscribe(v -> System.out.println("订阅者1:" + v));
        replaySubject.onNext(1);
        replaySubject.onNext(2);

        replaySubject.subscribe(v -> System.out.println("订阅者2:" + v));
        replaySubject.onNext(3);

        replaySubject.subscribe(v -> System.out.println("订阅者3:" + v));
    }

输出结果(换行由手工添加):

    订阅者1:1
    订阅者1:2

    订阅者2:1
    订阅者2:2

    订阅者1:3
    订阅者2:3

    订阅者3:1
    订阅者3:2
    订阅者3:3

可以看出,无论是 replaySubject 多久前发射的数据,新的订阅者都可以收到所有数据。类比一下:一位大V,提供订阅服务,任何人任何时候订阅,大V都会把以前的所有资料发你一份。

请求缓存用的就是 ReplaySubject 这个特性,如果请求相同数据,就把原先的结果发你一份

请求缓存的实现

Spring Cloud 源码学习之 Hystrix 工作原理 一文中,有 Hystrix 的全流程源码介绍。

这是AbstractCommand.toObservable()中关于请求缓存的源码。请求缓存有2个条件,一是启用了请求缓存,二是有cacheKey。

    public Observable<R> toObservable() {
        final AbstractCommand<R> _cmd = this;
        ...
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                ...
                final boolean requestCacheEnabled = isRequestCachingEnabled();
                final String cacheKey = getCacheKey();

                // 启用了requestCache, 则尝试从缓存中获取
                if (requestCacheEnabled) {
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                    if (fromCache != null) {
                        isResponseFromCache = true;
                        // 从缓存中获取数据
                        return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                    }
                }

                Observable<R> hystrixObservable =
                        Observable.defer(applyHystrixSemantics)
                                .map(wrapWithAllOnNextHooks);

                Observable<R> afterCache;

                // 启用缓存而且有cacheKey
                if (requestCacheEnabled && cacheKey != null) {
                    // 使用HystrixCachedObservable来包装Obervable,并且添加到requestCache中
                    HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                    HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                    ...
                    afterCache = toCache.toObservable();
                }
                ...
            }
        });
    }

整个逻辑还是非常简单的,在启用缓存的前提后,有缓存则读缓存,没缓存则缓存结果供下次使用。

再看下HystrixRequestCache,用于缓存的工具。

    Cache that is scoped to the current request as managed by HystrixRequestVariableDefault.
    This is used for short-lived caching of HystrixCommand instances to allow de-duping of command executions within a request.

    缓存仅在请求范围内使用,主要用途是减少HystrixCommand实例的执行次数(缓存结果后执行次数自然少了)

HystrixRequestCache实例的存储是由自身的静态变量搞定,未提供public的构造器,通过 getInstance() 的静态方法来获取与cacheKey对应的实例。

    public class HystrixRequestCache {
        private final static ConcurrentHashMap<RequestCacheKey, HystrixRequestCache> caches = new ConcurrentHashMap<RequestCacheKey, HystrixRequestCache>();
    }

    public static HystrixRequestCache getInstance(HystrixCommandKey key, HystrixConcurrencyStrategy concurrencyStrategy) {
        return getInstance(new RequestCacheKey(key, concurrencyStrategy), concurrencyStrategy);
    }

存储HystrixCachedObservable的数据结构是ConcurrentHashMap,cacheKey作为key,HystrixCachedObservable为value。

    private static final HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>> requestVariableForCache = new HystrixRequestVariableHolder<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>(new HystrixRequestVariableLifecycle<ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>>() {

        @Override
        public ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>> initialValue() {
            return new ConcurrentHashMap<ValueCacheKey, HystrixCachedObservable<?>>();
        }
        ...
    });

再看看缓存的结果HystrixCachedObservable,这个就用到了上面提过的ReplaySubject。将一个普通的Observable包装成了HystrixCachedObservable。

    public class HystrixCachedObservable<R> {
        protected final Subscription originalSubscription;
        protected final Observable<R> cachedObservable;
        private volatile int outstandingSubscriptions = 0;

        protected HystrixCachedObservable(final Observable<R> originalObservable) {
            ReplaySubject<R> replaySubject = ReplaySubject.create();
            // 订阅普通的Observable, 拿到其中的数据
            this.originalSubscription = originalObservable
                    .subscribe(replaySubject);

            this.cachedObservable = replaySubject...
        }
        ...

        // 将cachedObservable作为数据源提供出去, 完成普通Observable向ReplaySubject的转换
        public Observable<R> toObservable() {
            return cachedObservable;
        }
    }

因此,command执行一次拿到结果来自于ReplaySubject。后续无论有多少次订阅(即执行command),都把已有的结果推送一次,从而达到缓存请求结果的效果。

如何使用缓存的结果

以HystrixCommand的 queue() 方法为例:

    public Future<R> queue() {
        // 调用 toObservable 拿到数据源
        final Future<R> delegate = toObservable().toBlocking().toFuture();
        ...
     }   

在toFuture()中会订阅这个数据源:

    public static <T> Future<T> toFuture(Observable<? extends T> that) {

        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<T>();
        final AtomicReference<Throwable> error = new AtomicReference<Throwable>();

        // 首先,通过single()确保从Observable中拿到单个结果. 然后订阅数据源
        @SuppressWarnings("unchecked")
        final Subscription s = ((Observable<T>)that).single().subscribe(new Subscriber<T>() {

            @Override
            public void onNext(T v) {
                // 拿到执行的结果后放到AtomicReference中
                value.set(v);
            }
        });

        return new Future<T>() {
            private volatile boolean cancelled;

            // 返回执行结果
            @Override
            public T get() throws InterruptedException, ExecutionException {
                finished.await();
                return getValue();
            }
        };
    }

由于toObservable()拿到的是一个ReplaySubject,下次命令再次执行时,订阅ReplaySubject后,可以直接拿到之前已有的结果。

缓存的生命周期

缓存是request scope,在同一个请求范围内,所有线程都可以使用相同cacheKey已缓存的结果。

为什么是request scope呢?在数据动态变化的情况下,即使参数相同,多次请求访问时,缓存也没有意义。所以只在同一个请求下使用。

下面是个小例子:

    public class HystrixCommandCacheTest extends HystrixCommand<String> {
        private final String value;

        public HystrixCommandCacheTest(String value) {
            super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
            this.value = value;
        }

        // 将 value 参数作为key, 模拟请求的参数
        @Override
        protected String getCacheKey() {
            return value;
        }

        @Override
        protected String run() throws Exception {
            return "hello," + value;
        }

        public static void main(String[] args) {
            // 第一个请求环境
            HystrixRequestContext context1 = HystrixRequestContext.initializeContext();
            HystrixCommandCacheTest cmd1 = new HystrixCommandCacheTest("kitty");
            System.out.println("cmd1结果:" + cmd1.execute() + ";使用缓存:" + cmd1.isResponseFromCache());

            // 模拟10个相同请求参数的命令执行
            for (int i = 0; i < 10; i++) {
                HystrixCommandCacheTest tempCmd = new HystrixCommandCacheTest("kitty");
                System.out.println("第" + i + " 次执行:" + tempCmd.execute() + ";使用缓存:" + tempCmd.isResponseFromCache());
            }
            context1.shutdown();

            // 第二个请求环境
            HystrixRequestContext context2 = HystrixRequestContext.initializeContext();
            HystrixCommandCacheTest cmd2 = new HystrixCommandCacheTest("kitty");
            System.out.println("cmd2结果:" + cmd2.execute() + ";使用缓存:" + cmd2.isResponseFromCache());
        }
    }

输出结果如下:

    cmd1结果:hello,kitty;使用缓存:false
    第0 次执行:hello,kitty;使用缓存:true
    第1 次执行:hello,kitty;使用缓存:true
    第2 次执行:hello,kitty;使用缓存:true
    第3 次执行:hello,kitty;使用缓存:true
    第4 次执行:hello,kitty;使用缓存:true
    第5 次执行:hello,kitty;使用缓存:true
    第6 次执行:hello,kitty;使用缓存:true
    第7 次执行:hello,kitty;使用缓存:true
    第8 次执行:hello,kitty;使用缓存:true
    第9 次执行:hello,kitty;使用缓存:true
    cmd2结果:hello,kitty;使用缓存:false

第一次没有缓存,后面10次执行都用了第一次的执行结果。第二次请求时没有缓冲可用。

小结

利用缓存可以极大的提升性能,“天下武功,唯快不破”。

如何练就一门快功夫呢?方式有多种,举两个小例子:

  • 速度再快比不上近水楼台,直接用应用缓存肯定比网络通讯获取数据快得多
  • 利用各类缓存”神器”,比如Redis,人家就是快。

为了提升性能,从用户发起请求的那一刻起,链路上的各类角色就在各显神通了,例如:

  • 浏览器缓存静态资源;提供LocalStorage这种缓存结构,单页面应用可直接使用
  • 请求进入网络后,利用CDN,优先从地理位置较近的地方拉取资源
  • 请求到达目表网络后,可以从代理中读取缓存数据(如nginx缓存)
  • 请求达到应用后,应用直接从内存中获取数据,如:Map、Guava等
  • 分布式缓存,例如使用Redis提供缓存,减少对DB的直接访问

欢迎关注陈同学的公众号,一起学习,一起成长

20191123100237\_1.png


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud 源码学习之 Hystrix 请求缓存

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏