Hystrix 源码解析 —— 命令执行(三)之执行超时

摘要: 原创出处 http://www.iocoder.cn/Hystrix/command-execute-third-timeout/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Hystrix 1.5.X 版本


1. 概述

本文主要分享 Hystrix 命令执行(三)之执行超时

建议 :对 RxJava 已经有一定的了解的基础上阅读本文。

开启执行超时功能,需要配置 :

  • HystrixCommandProperties.executionTimeoutEnabled :执行命令超时功能开关。
    • 值 :Boolean
    • 默认值 :true
  • HystrixCommandProperties.executionTimeoutInMilliseconds :执行命令超时时长。
    • 值 :Integer
    • 单位 :毫秒
    • 默认值 :1000 毫秒

《Hystrix 源码解析 —— 命令执行(一)之正常执行逻辑》「4. #executeCommandAndObserve(…)」 中,#executeCommandAndObserve(...) 方法的第 75 行 lift(new HystrixObservableTimeoutOperator<R>(_cmd)) ,实现了对执行命令超时的监控。


推荐 Spring Cloud 书籍

推荐 Spring Cloud 视频

2. HystrixObservableTimeoutOperator

HystrixObservableTimeoutOperator 类,代码如下 :

  1: private static class HystrixObservableTimeoutOperator<R> implements Operator<R, R> {
  2:
  3:     final AbstractCommand<R> originalCommand;
  4:
  5:     public HystrixObservableTimeoutOperator(final AbstractCommand<R> originalCommand) {
  6:         this.originalCommand = originalCommand;
  7:     }
  8:
  9:     @Override
 10:     public Subscriber<? super R> call(final Subscriber<? super R> child) {
 11:         // 创建 订阅
 12:         final CompositeSubscription s = new CompositeSubscription();
 13:         // 添加 订阅
 14:         // if the child unsubscribes we unsubscribe our parent as well
 15:         child.add(s);
 16:
 17:         //capture the HystrixRequestContext upfront so that we can use it in the timeout thread later
 18:         final HystrixRequestContext hystrixRequestContext = HystrixRequestContext.getContextForCurrentThread();
 19:
 20:         TimerListener listener = new TimerListener() {
 21:
 22:             @Override
 23:             public void tick() {
 24:                 // if we can go from NOT_EXECUTED to TIMED_OUT then we do the timeout codepath
 25:                 // otherwise it means we lost a race and the run() execution completed or did not start
 26:                 if (originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.TIMED_OUT)) {
 27:                     // report timeout failure
 28:                     originalCommand.eventNotifier.markEvent(HystrixEventType.TIMEOUT, originalCommand.commandKey);
 29:
 30:                     // shut down the original request
 31:                     s.unsubscribe();
 32:
 33:                     final HystrixContextRunnable timeoutRunnable = new HystrixContextRunnable(originalCommand.concurrencyStrategy, hystrixRequestContext, new Runnable() {
 34:
 35:                         @Override
 36:                         public void run() {
 37:                             child.onError(new HystrixTimeoutException());
 38:                         }
 39:                     });
 40:
 41:                     timeoutRunnable.run();
 42:                     //if it did not start, then we need to mark a command start for concurrency metrics, and then issue the timeout
 43:                 }
 44:             }
 45:
 46:             @Override
 47:             public int getIntervalTimeInMilliseconds() {
 48:                 return originalCommand.properties.executionTimeoutInMilliseconds().get();
 49:             }
 50:         };
 51:
 52:         final Reference<TimerListener> tl = HystrixTimer.getInstance().addTimerListener(listener);
 53:
 54:         // set externally so execute/queue can see this
 55:         originalCommand.timeoutTimer.set(tl);
 56:
 57:         /**
 58:          * If this subscriber receives values it means the parent succeeded/completed
 59:          */
 60:         Subscriber<R> parent = new Subscriber<R>() {
 61:
 62:             @Override
 63:             public void onCompleted() {
 64:                 if (isNotTimedOut()) {
 65:                     // stop timer and pass notification through
 66:                     tl.clear();
 67:                     // 完成
 68:                     child.onCompleted();
 69:                 } else {
 70:                     System.out.println("timeout: " + "onCompleted"); // 笔者调试用
 71:                 }
 72:             }
 73:
 74:             @Override
 75:             public void onError(Throwable e) {
 76:                 if (isNotTimedOut()) {
 77:                     // stop timer and pass notification through
 78:                     tl.clear();
 79:                     // 异常
 80:                     child.onError(e);
 81:                 } else {
 82:                     System.out.println("timeout: " + "onError"); // 笔者调试用
 83:                 }
 84:             }
 85:
 86:             @Override
 87:             public void onNext(R v) {
 88:                 if (isNotTimedOut()) {
 89:                     // 继续执行
 90:                     child.onNext(v);
 91:                 } else {
 92:                     System.out.println("timeout: " + "onNext"); // 笔者调试用
 93:                 }
 94:             }
 95:
 96:             /**
 97:              * 通过 CAS 判断是否超时
 98:              *
 99:              * @return 是否超时
100:              */
101:             private boolean isNotTimedOut() {
102:                 // if already marked COMPLETED (by onNext) or succeeds in setting to COMPLETED
103:                 return originalCommand.isCommandTimedOut.get() == TimedOutStatus.COMPLETED ||
104:                         originalCommand.isCommandTimedOut.compareAndSet(TimedOutStatus.NOT_EXECUTED, TimedOutStatus.COMPLETED);
105:             }
106:
107:         };
108:
109:         // 添加 订阅
110:         // if s is unsubscribed we want to unsubscribe the parent
111:         s.add(parent);
112:
113:         return parent;
114:     }
115:
116: }
  • 第 12 行 :创建订阅 s
  • 第 15 行 :添加订阅 schild 的订阅。
  • 第 18 行 :获得 HystrixRequestContext 。因为下面 listener 的执行不在当前线程,HystrixRequestContext 基于 ThreadLocal 实现。
  • 第 20 至 50 行 :创建执行命令超时监听器 listener ( TimerListener ) 。当超过执行命令的时长( TimerListener#getIntervalTimeInMilliseconds() )时,TimerListener#tick() 方法触发调用。
    • 第 26 行 :通过 AbstractCommand.isCommandTimedOut 变量 CAS 操作,保证和下面第 60 行parent 有且只有一方操作成功。TimedOutStatus 状态变迁如下图 :
    • 第 28 行 :TODO 【2011】【Hystrix 事件机制】
    • 第 31 行 :取消订阅 s注意 :不同执行隔离策略此处的表现不同
    • 第 34 至 41 行 :执行 child#onError(e) 【Subscriber#onError(Throwable)】 方法,处理 HystrixTimeoutException 异常。该异常会被 handleFallback 处理,点击 链接 查看,在 《Hystrix 源码解析 —— 请求执行(四)之失败回退逻辑》 详细解析。
      • HystrixContextRunnable ,设置第 18 行获得的 HystrixRequestContext 到 Callable#run() 所在线程的 HystrixRequestContext ,并继续执行。点击 链接 查看。另外,HystrixContextRunnable 只有此处使用,独立成类的原因是测试用例使用到。
  • 第 52 行 :使用 TimerListener 到定时器,监听命令的超时执行。
  • 第 55 行 :设置 TimerListener 到 AbstractCommand.timeoutTimer 属性。用于执行超时等等场景下的 TimerListener 的清理( tl#clear() )。如下方法有通过该属性对 TimerListener 的清理 :
  • 第 60 至 107 行 :创建的 Subscriber ( parent )。在传参的 child 的基础上,增加了对是否执行超时的判断( #isNotTimedOut() )和TimerListener的清理。

  • 第 111 行 :添加添加订阅 parents 的订阅。整体订阅关系如下 :
    • 这里看起来 s 有些“多余” ?因为 parentlistener 存在互相引用的情况,通过 s 解决。
  • 第 113 行 :返回 parent注意。如果不能理解,建议阅读下 《RxJava 源码解析 —— Observable#lift(Operator)》

3. HystrixTimer

<

p>com.netflix.hystrix.util.HystrixTimer ,Hystrix 定时器。

目前有如下场景使用 :

HystrixTimer 构造方法,代码如下 :

public class HystrixTimer {

    /**
     * 单例
     */
    private static HystrixTimer INSTANCE = new HystrixTimer();

    /* package */ AtomicReference<ScheduledExecutor> executor = new AtomicReference<ScheduledExecutor>();

    private HystrixTimer() {
        // private to prevent public instantiation
    }

    public static HystrixTimer getInstance() {
        return INSTANCE;
    }

}
  • INSTANCE 静态属性,单例。
  • executor 属性,定时任务执行器( ScheduledExecutor )。

调用 HystrixTimer#addTimerListener(TimerListener) 方法,提交定时监听器,生成定时任务,代码如下 :

  1: public Reference<TimerListener> addTimerListener(final TimerListener listener) {
  2:     startThreadIfNeeded();
  3:     // add the listener
  4:
  5:     Runnable r = new Runnable() {
  6:
  7:         @Override
  8:         public void run() {
  9:             try {
 10:                 listener.tick();
 11:             } catch (Exception e) {
 12:                 logger.error("Failed while ticking TimerListener", e);
 13:             }
 14:         }
 15:     };
 16:
 17:     ScheduledFuture<?> f = executor.get().getThreadPool().scheduleAtFixedRate(r, listener.getIntervalTimeInMilliseconds(), listener.getIntervalTimeInMilliseconds(), TimeUnit.MILLISECONDS);
 18:     return new TimerReference(listener, f);
 19: }
  • 第 2 行 :调用 #startThreadIfNeeded() 方法,保证 executor 延迟初始化已完成。
  • 第 5 至 15 行 :创建定时任务 Runnable 。在 Runnable#run() 方法里,调用 TimerListener#tick() 方法。在 「3.2 TimerListener」 详细解析。
  • 第 17 行 :提交定时监听器,生成定时任务 f ( ScheduledFuture )。
  • 第 18 行 :使用 listener + f 创建 TimerReference 返回。在 「3.3 TimerReference」 详细解析。

3.1 ScheduledExecutor

com.netflix.hystrix.util.HystrixTimer.ScheduledExecutor ,Hystrix 定时任务执行器。代码如下 :

/* package */ static class ScheduledExecutor {
    /**
    * 定时任务线程池执行器
    */
    /* package */ volatile ScheduledThreadPoolExecutor executor;
    /**
     * 是否初始化
     */
    private volatile boolean initialized;

    /**
     * We want this only done once when created in compareAndSet so use an initialize method
     */
    public void initialize() {
        // coreSize
        HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
        int coreSize = propertiesStrategy.getTimerThreadPoolProperties().getCorePoolSize().get();

        // 创建 ThreadFactory
        ThreadFactory threadFactory = null;
        if (!PlatformSpecific.isAppEngineStandardEnvironment()) {
            threadFactory = new ThreadFactory() {
                final AtomicInteger counter = new AtomicInteger();

                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "HystrixTimer-" + counter.incrementAndGet());
                    thread.setDaemon(true);
                    return thread;
                }

            };
        } else {
            threadFactory = PlatformSpecific.getAppEngineThreadFactory();
        }

        // 创建 ScheduledThreadPoolExecutor
        executor = new ScheduledThreadPoolExecutor(coreSize, threadFactory);

        // 已初始化
        initialized = true;
   }

   public ScheduledThreadPoolExecutor getThreadPool() {
       return executor;
   }

   public boolean isInitialized() {
       return initialized;
   }
}
  • 线程池大小( coreSize ),通过 HystrixTimerThreadPoolProperties.corePoolSize 配置。

3.2 TimerListener

com.netflix.hystrix.util.HystrixTimer.TimerListener ,Hystrix 定时任务监听器****接口。代码如下 :

public static interface TimerListener {

   /**
    * The 'tick' is called each time the interval occurs.
    * <p>
    * This method should NOT block or do any work but instead fire its work asynchronously to perform on another thread otherwise it will prevent the Timer from functioning.
    * <p>
    * This contract is used to keep this implementation single-threaded and simplistic.
    * <p>
    * If you need a ThreadLocal set, you can store the state in the TimerListener, then when tick() is called, set the ThreadLocal to your desired value.
    */
   void tick();

   /**
    * How often this TimerListener should 'tick' defined in milliseconds.
    */
   int getIntervalTimeInMilliseconds();
}
  • #tick() 方法 :时间到达( 超时 )执行的逻辑。
  • #getIntervalTimeInMilliseconds() 方法 :返回到达( 超时 )时间时长。

3.3 TimerReference

com.netflix.hystrix.util.HystrixTimer.TimerReference ,Hystrix 定时任务引用。代码如下 :

private static class TimerReference extends SoftReference<TimerListener> {

    private final ScheduledFuture<?> f;

    TimerReference(TimerListener referent, ScheduledFuture<?> f) {
        super(referent);
        this.f = f;
    }

    @Override
    public void clear() {
        super.clear();
        // stop this ScheduledFuture from any further executions
        f.cancel(false); // 非强制
    }

}
  • 通过 #clear() 方法,可以取消定时任务的执行。

666. 彩蛋

顺畅~刚开始看 Hystrix 执行命令超时逻辑,一直想不通。现在整理干净了。

喵了个咪~

胖友,分享一波朋友圈可好!

赞(0) 打赏

如未加特殊说明,此网站文章均为原创,转载必须注明出处。Java 技术驿站 » Hystrix 源码解析 —— 命令执行(三)之执行超时
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

Java 技术驿站 | 致力打造 Java 精品博客

联系作者优质文章

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏