十、Spring cloud服务短路(Hystrix)之源码解析

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题
  • Spring Cloud Hystrix 源码解读
  • Nertflix Hystrix 源码解读
  • RxJava 基础

一、Spring Cloud Hystrix 源码解读

1、@EnableCircuitBreaker

(1)职责:
激活 Circuit Breaker

(2)调用链路:

        @EnableCircuitBreaker 
        <!-- 通过 EnableCircuitBreaker 注解上的注解 @Import(EnableCircuitBreakerImportSelector.class) 可知 -->
        -> EnableCircuitBreakerImportSelector 
        <!-- EnableCircuitBreakerImportSelector 继承 SpringFactoryImportSelector<EnableCircuitBreaker> ,通过探寻 发现,SpringFactoryImportSelector 类下的 selectImports() 方法中的 List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader .loadFactoryNames(this.annotationClass, this.beanClassLoader))); 可知,这里是以 EnableCircuitBreaker 全限定名为key,找到对应的默认实现。通过 搜索 org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker 找到 spring cloud 框架下的 spring.factories 文件中,找到了其默认实现: org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration -->
        -> HystrixCircuitBreakerConfiguration

2、HystrixCircuitBreakerConfiguration

(1)初始化组件

  • HystrixCommandAspect
  • HystrixShutdownHook
  • HasFeatures

二、Nertflix Hystrix 源码解读

1、HystrixCommandAspect

(1)依赖组件

  • MetaHolderFactory:生成拦截方法元信息
  • HystrixCommandFactory:生成 HystrixInvokable
  • HystrixInvokable
    • CommandCollapser
    • GenericObservableCommand
    • GenericCommand

2、Future 来实现超时熔断

    /**
     * 通过 {@link Future} 实现 服务熔断
     * @author 咸鱼
     * @date 2018/11/14 20:12
     */
    public class FutureCircuitBreakerDeo {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //初始化线程池
            ExecutorService executorService = Executors.newSingleThreadExecutor();

            RandomCommand randomCommand = new RandomCommand();

            Future<String> future = executorService.submit(() -> {
                //获取 run 方法计算结果
                return randomCommand.run();
            });

            String result = null;
            // 100 ms 超时时间
            try {
                result = future.get(100, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                // fallback 方法调用
                result = randomCommand.fallback();
            }

            System.out.println(result);

            executorService.shutdown();
        }

        /**
         * 随即对象
         */
        private static final Random RANDOM = new Random();

        /**
         * 随机事件执行命令
         */
        static class RandomCommand implements Command<String> {
            @Override
            public String run() throws InterruptedException {
                long executeTime = RANDOM.nextInt(200);

                System.out.println("execute time : " + executeTime + "ms");

                Thread.sleep(executeTime);

                return "hello";
            }

            @Override
            public String fallback() {
                return "fallback";
            }
        }

        public static interface Command<T> {
            /**
             * 正常执行,并且返回结果
             * @return T
             */
            T run() throws InterruptedException;

            /**
             * 错误时,返回容错结果
             * @return T
             */
            T fallback();
        }
    }

三、RxJava 基础

1、单数据:Single API

    //仅能发布单个数据
            Single.just("Hello,World!")
                    //在I/O线程执行
                    .subscribeOn(Schedulers.io())
                    //订阅并且消费数据
                    .subscribe(RxJavaDemo::println);
                    Thread.sleep(100);

2、多数据:Observable API

    List<Integer> values = Arrays.asList(1,2,3,4,5,6,7,8);
            //发布多个数据
            Observable.from(values)
                    .subscribeOn(Schedulers.computation())
                    //订阅并且消费数据
                    .subscribe(RxJavaDemo::println);
            Thread.sleep(100);

3、使用标准 Reactive 模式:

    public static void demoStandardReactive() throws InterruptedException {
            List<Integer> values = Arrays.asList(1,2,3);
            //发布多个数据
            Observable.from(values)
                    .subscribeOn(Schedulers.newThread())
                    //subscribe(final Action1<? super T> onNext, final Action1<Throwable> onError, final Action0 onCompleted)
                    //接口 Action1:需实现call(T t)
                    //接口 Action0:需实现call()
                    .subscribe(
                            //参数一:消费数据
                            value -> {
                                if (value > 2) {
                                    throw new IllegalStateException("数据不容许大于2");
                                }
                                println("消费数据:" + value);
                            },
                            //参数二:当发生异常时,中断执行
                            e -> println("发生异常:" + e.getMessage()),
                            //参数三:当逻辑执行完毕时
                            () -> println("逻辑执行完毕"))
            ;
            //上面是异步执行,需要休眠等待其执行完毕
            Thread.sleep(100);
        }

来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 十、Spring cloud服务短路(Hystrix)之源码解析

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏