Spring Cloud 源码学习之 HystrixRequestContext

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

欢迎访问陈同学博客原文

前面写过 Spring Cloud 之 Hystrix 跨线程传递数据,写的是跨线程传递数据的表象,其实就是调试了一把,记录一下。今天写下核心:HystrixRequestContext。

HystrixRequestContext 表示 request level的context,用于存储 request level 的数据。与此相对的是 thread level的数据,一般用ThreadLocal来处理。

例举一个场景:Tomcat 工作线程拿到request后,将任务(例如Runnable)交给线程池处理,或创建一个子线程来处理。Tomcat工作线程自然可以获取request数据(利用RequestContextHolder存储在当前线程中呢),但如何在线程池线程中或子线程中获取到request数据呢?

这就是request level context的意义,无论是哪个线程在处理,只要在当前请求的生命周期内,都可以拿到request的数据。

小例子

先看个Demo,使用方式和ThreadLocal基本一模一样。

    @Test
    public void test() throws InterruptedException {
        // 在当前线程下创建一个HystrixRequestContext对象
        HystrixRequestContext.initializeContext();
        // 创建HystrixRequestVariableDefault作为检索数据的key
        final HystrixRequestVariableDefault<String> variableDefault = new HystrixRequestVariableDefault<>();
        // 将<HystrixRequestVariableDefault,kitty>存储到当前线程的HystrixRequestContext中
        variableDefault.set("kitty");
        // HystrixContextRunnable 是核心, 下面将分析源码:
        HystrixContextRunnable runnable =
                new HystrixContextRunnable(() -> System.out.println(variableDefault.get()));
        // 用子线程执行任务
        new Thread(runnable, "subThread").start();
    }

结果自然是输出了: kitty。上面代码并没有显示的将 kitty 从 main线程传递到子线程,也没有利用InheritableThreadLocal,原理且看下文。

HystrixRequestVariable

20191123100238\_1.png

HystrixRequestVariable接口表示request level的属性,仅提供了get()来获取属性。

    public interface HystrixRequestVariable<T> extends HystrixRequestVariableLifecycle<T> {
        public T get();
    }

HystrixRequestVariableDefault是实现类,和ThreadLocal一样,提供了 T get()set(T value) 两个工具方法,在使用时HystrixRequestVariableDefault都是 static final 类型(方便使用),充当一个检索数据的 key

下面是它的get/set 方法:

    // 拿到当前线程的存储结构, 用自己作为key, 存储实际的数据。
    public void set(T value) {
        HystrixRequestContext.getContextForCurrentThread().state.put(this, new LazyInitializer<T>(this, value));
    }

    public T get() {
        // 拿到当前线程的存储结构, 以自己作为key, 来检索数据
        ConcurrentHashMap<HystrixRequestVariableDefault<?>, LazyInitializer<?>> variableMap = HystrixRequestContext.getContextForCurrentThread().state;

        LazyInitializer<?> v = variableMap.get(this);
        if (v != null) {
            return (T) v.get();
        }
        ...
    }

HystrixRequestContext

真正存储数据的是HystrixRequestContext,和ThreadLocal一样,存储数据的不是ThreadLocal,而是Thread本身的数据结构。下面看看它的实现:

    public class HystrixRequestContext implements Closeable {
        // 利用ThreadLocal, 每个线程各有一份HystrixRequestContext,当然,前提是调用了initializeContext()进行初始化
        private static ThreadLocal<HystrixRequestContext> requestVariables = new ThreadLocal<HystrixRequestContext>();

        // 创建一个HystrixRequestContext,并与当前线程关联
        public static HystrixRequestContext initializeContext() {
            HystrixRequestContext state = new HystrixRequestContext();
            requestVariables.set(state);
            return state;
        }

        // 获取当前线程关联的HystrixRequestContext, 用的是ThreadLocal
        public static HystrixRequestContext getContextForCurrentThread() {
            HystrixRequestContext context = requestVariables.get();
            if (context != null && context.state != null) {
                return context;
            } else {
                return null;
            }
        }

        // 为当前线程设置一个已存在的HystrixRequestContext
        public static void setContextOnCurrentThread(HystrixRequestContext state) {
            requestVariables.set(state);
        }

        // 这句单独说 (注意:实际类型不是T,我简化了)
        ConcurrentHashMap<HystrixRequestVariableDefault<?>, T value> state = new ...
    }

ConcurrentHashMap<HystrixRequestVariableDefault<?>, T value> state

这是实际的存储结构,每个线程关联一个HystrixRequestContext,每个HystrixRequestContext有个Map结构存储数据,key就是HystrixRequestVariableDefault。

因此, 初始化 HystrixRequestVariableDefault v1, v2 后,在当前线程执行:

    v1.set("1");
    v2.set("2");

那当前线程对应的HystrixRequestContext存储的数据为:

    <v1, "1">
    <v2, "2">

如何实现request level context?

实现的秘密就在HystrixContextRunnable和HystrixContextCallable中,这里以前者为例:

    // HystrixContextRunnable是个Runnable,一个可用于执行的任务
    public class HystrixContextRunnable implements Runnable {

        private final Callable<Void> actual;
        private final HystrixRequestContext parentThreadState;

        public HystrixContextRunnable(Runnable actual) {
            this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
        }

        public HystrixContextRunnable(HystrixConcurrencyStrategy concurrencyStrategy, final Runnable actual) {
            // 获取当前线程的HystrixRequestContext(如文首的main线程)
            this(concurrencyStrategy, HystrixRequestContext.getContextForCurrentThread(), actual);
        }

        // 关键的构造器
        public HystrixContextRunnable(final HystrixConcurrencyStrategy concurrencyStrategy, final HystrixRequestContext hystrixRequestContext, final Runnable actual) {

            // 将原始任务Runnable包装成Callable, 创建了一个新的callable
            this.actual = concurrencyStrategy.wrapCallable(new Callable<Void>() {
                @Override
                public Void call() throws Exception {
                    actual.run();
                    return null;
                }
            });
            // 存储当前线程的hystrixRequestContext
            this.parentThreadState = hystrixRequestContext;
        }

        @Override
        public void run() {
            // 运行实际的Runnable之前先保存当前线程已有的HystrixRequestContext
            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
            try {
                // 设置当前线程的HystrixRequestContext,来自上一级线程,因此两个线程是同一个HystrixRequestContext
                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                try {
                    actual.call();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } finally {
                // 还原当前线程的HystrixRequestContext
                HystrixRequestContext.setContextOnCurrentThread(existingState);
            }
        }
    }

so,Hystrix 的思路是包装Runnable,在执行实际的任务之前,先拿当前线程的HystrixRequestContext初始化实际执行任务的线程的HystrixRequestContext。

因此,回顾下文首的小例子,伪代码变成:

    1.main 线程创建了HystrixRequestContext context, HystrixRequestVariableDefault key
    2.main 线程在context的Map中存储<key, "kitty">
    3.main 线程 HystrixContextRunnable task, 并将context作为它的属性
    4.main 线程创建 subThread, 用它来执行task (将Task交给线程池的效果是一样的)
    5.subThread 执行HystrixContextRunnable.run()时, 先用task的context来初始化subThread
    此时,main线程和subThread线程拥有的是同一个context, 因此在两个线程中执行 key.get() 拿到的都是"kitty"

因此,在不同线程之间传递信息的载体是HystrixContextRunnable,也就是任务本身。独立个体之间需要建立联系,总得有个桥梁,现实中的推荐信和HystrixContextRunnable的作用是一样的。

HystrixRequestVariableDefault 和ThreadLocal的一些区别

  • 它使用前需要用 HystrixRequestContext.initializeContext() 进行初始化
  • 它结束时需使用 **HystrixRequestContext.shutdown()**进行清理
  • 它有一个生命周期方法 shutdown()用来清理资源
  • 它会以传引用的方式(线程之间使用的是相同的HystrixRequestVariables)拷贝到下一个线程,主要通过HystrixRequestContext.getContextForCurrentThread()和HystrixRequestContext.setContextOnCurrentThread()两个方法
  • 父线程调用shutdown时,子线程的HystrixRequestVariables也会被清理(因为就是一个对象,传的是引用)。

因Thread中的存储结构是:ThreadLocal.ThreadLocalMap threadLocals,threadLocals在创建线程时会初始化。

而HystrixRequestContext并不是每个线程都需要的,因此需要根据需要自行进行初始化。

小结

本文抽象出来就是 跨主体信息传递,跨主体算是我YY的名词。Hystrix 传递信息的思路是值得借鉴的,也许在某些场景下,我们需要设计一个在特定范围内传递信息的模型。

YY个无聊的例子:一场神奇运动会,N个运动员在田径场进行接力赛,每分钟换一个运动员,比赛开始时点燃一柱香,这根香是接力棒,香上贴着一个数字,运动员拿到香后要记住数字。

需求是:运动员之间不能交流,在香燃尽时,参与接力赛的运动员都要知道这个数字。

这个例子就是在N个主体之间传递信息:

  • 生命周期(scope):香开始点着到燃尽的时间内
  • 主体:N个运动员
  • 信息载体:香本身。如果运行员将香传给下一个人是Task,那香就属于Task属性的一部分。

现实生活中的信息载体又有哪些呢?文字?语言?空气?…

附录

HystrixContextCallable

HystrixContextCallable做的事情和HystrixContextRunnable是一样的,只不过它实现了Callable。

    public class HystrixContextCallable<K> implements Callable<K> {

        private final Callable<K> actual;
        private final HystrixRequestContext parentThreadState;

        public HystrixContextCallable(Callable<K> actual) {
            this(HystrixPlugins.getInstance().getConcurrencyStrategy(), actual);
        }

        public HystrixContextCallable(HystrixConcurrencyStrategy concurrencyStrategy, Callable<K> actual) {
            this.actual = concurrencyStrategy.wrapCallable(actual);
            this.parentThreadState = HystrixRequestContext.getContextForCurrentThread();
        }

        @Override
        public K call() throws Exception {
            HystrixRequestContext existingState = HystrixRequestContext.getContextForCurrentThread();
            try {
                // set the state of this thread to that of its parent
                HystrixRequestContext.setContextOnCurrentThread(parentThreadState);
                // execute actual Callable with the state of the parent
                return actual.call();
            } finally {
                // restore this thread back to its original state
                HystrixRequestContext.setContextOnCurrentThread(existingState);
            }
        }

    }

欢迎关注陈同学的公众号,一起学习,一起成长

20191123100238\_2.png


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud 源码学习之 HystrixRequestContext

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏