Spring Cloud Ribbon 全解 (3) – 基本组件实现源码(1)

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

本文基于SpringCloud-Dalston.SR5

上一篇我们了解到Ribbon主要由如下几个组件组成:

  1. 所有Ribbon负载均衡器需要实现的接口IClient
  2. 服务实例列表维护机制实现的接口ServerList
  3. 负载均衡数据记录LoadBalancerStats
  4. 负责选取Server的接口ILoadBalancer
  5. 负载均衡选取规则实现的接口IRule
  6. 检查实例是否存活实现的接口IPing
  7. 服务实例列表更新机制实现的接口ServerListUpdater
  8. 服务实例列表过滤机制ServerListFilter

我们会逐个分析

1. 所有Ribbon负载均衡器需要实现的接口IClient

对于这个IClient,之前我们说到执行器逻辑,例如重试还有异常处理,都在这里处理。我们看他的默认抽象类实现AbstractLoadBalancerAwareClient:

AbstractLoadBalancerAwareClient.java

    public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
        //获取重试处理器,这个由其他实现类动态实现
        RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, requestConfig);
        //构造LoadBalancerCommand,RxJava风格
        LoadBalancerCommand<T> command = LoadBalancerCommand.<T>builder()
                .withLoadBalancerContext(this)
                .withRetryHandler(handler)
                .withLoadBalancerURI(request.getUri())
                .build();

        try {

            return command.submit(
                new ServerOperation<T>() {
                    @Override
                    public Observable<T> call(Server server) {
                        //修改原始url为实际的url
                        URI finalUri = reconstructURIWithServer(server, request.getUri());
                        S requestForServer = (S) request.replaceUri(finalUri);
                        try {
                            //执行请求
                            return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                        } 
                        catch (Exception e) {
                            return Observable.error(e);
                        }
                    }
                })
                .toBlocking()
                .single();
        } catch (Exception e) {
            Throwable t = e.getCause();
            if (t instanceof ClientException) {
                throw (ClientException) t;
            } else {
                throw new ClientException(e);
            }
        }

    }

    public abstract RequestSpecificRetryHandler getRequestSpecificRetryHandler(S request, IClientConfig requestConfig);

这个构造的LoadBalancerCommand是一个RxJava风格的,它包含了重试和异常处理机制:

LoadBalancerCommand.java

    //返回一个只包含一个Server的Observable,但是每次从负载均衡器中获取一个
    private Observable<Server> selectServer() {
        return Observable.create(new OnSubscribe<Server>() {
            @Override
            public void call(Subscriber<? super Server> next) {
                try {
                    Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);   
                    next.onNext(server);
                    next.onCompleted();
                } catch (Exception e) {
                    next.onError(e);
                }
            }
        });
    }
    public Observable<T> submit(final ServerOperation<T> operation) {
        final ExecutionInfoContext context = new ExecutionInfoContext();

        if (listenerInvoker != null) {
            try {
                listenerInvoker.onExecutionStart();
            } catch (AbortExecutionException e) {
                return Observable.error(e);
            }
        }

        //获取在每个服务实例重试的的次数
        final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
        //最多尝试几个服务实例
        final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();

        //对于每个服务实例的调用逻辑
        //默认field server是null,通过selectServer()方法获取一个Server
        Observable<T> o = 
                (server == null ? selectServer() : Observable.just(server))
                .concatMap(new Func1<Server, Observable<T>>() {
                    @Override
                    //对于每个Server,按顺序映射为对于每个Server包含重试逻辑的请求调用
                    public Observable<T> call(Server server) {
                        //设置上下文
                        context.setServer(server);
                        final ServerStats stats = loadBalancerContext.getServerStats(server);

                        //每个Server包含重试逻辑的请求调用
                        Observable<T> o = Observable
                                .just(server)
                                .concatMap(new Func1<Server, Observable<T>>() {
                                    @Override
                                    public Observable<T> call(final Server server) {
                                        context.incAttemptCount();
                                        //增加Server正在处理的请求计数
                                        loadBalancerContext.noteOpenConnection(stats);

                                        //监听器
                                        if (listenerInvoker != null) {
                                            try {
                                                listenerInvoker.onStartWithServer(context.toExecutionInfo());
                                            } catch (AbortExecutionException e) {
                                                return Observable.error(e);
                                            }
                                        }

                                        //计时器
                                        final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
                                        //operation.call(server)就是刚刚分析的AbstractLoadBalancerAwareClient传过来的ServerOperation,就是直接对这个Server调用请求
                                        //doOnEach的操作就是记录请求前后的一些数据用于负载均衡数据统计
                                        return operation.call(server).doOnEach(new Observer<T>() {
                                            private T entity;
                                            @Override
                                            public void onCompleted() {
                                                //记录请求完成
                                                recordStats(tracer, stats, entity, null);
                                            }

                                            @Override
                                            public void onError(Throwable e) {
                                                //记录请求结束
                                                recordStats(tracer, stats, null, e);
                                                logger.debug("Got error {} when executed on server {}", e, server);
                                                //发生了错误,通知listener
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
                                                }
                                            }

                                            @Override
                                            public void onNext(T entity) {
                                                //因为只有调用请求成功只有一个结果(只有一个请求), 这里的entity就是结果,只要收到结果就代表请求成功
                                                this.entity = entity;
                                                if (listenerInvoker != null) {
                                                    listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
                                                }
                                            }                            

                                            private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
                                                tracer.stop();
                                                loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
                                            }
                                        });
                                    }
                                });

                        if (maxRetrysSame > 0)
                            //是否retry
                            o = o.retry(retryPolicy(maxRetrysSame, true));
                        return o;
                    }
                });

        if (maxRetrysNext > 0 && server == null)
            //是否retry,如果retry回调用selectServer()返回下一个Server
            o = o.retry(retryPolicy(maxRetrysNext, false));

        //异常处理
        return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
            @Override
            public Observable<T> call(Throwable e) {
                if (context.getAttemptCount() > 0) {
                    //如果超过重试次数,则抛异常
                    if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED,
                                "Number of retries on next server exceeded max " + maxRetrysNext
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                    else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) {
                        e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED,
                                "Number of retries exceeded max " + maxRetrysSame
                                + " retries, while making a call for: " + context.getServer(), e);
                    }
                }
                if (listenerInvoker != null) {
                    listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo());
                }
                return Observable.error(e);
            }
        });
    }

2. 服务实例列表维护机制实现的接口ServerList

2019102020025\_1.png

AbstractServerList.java

其实这个抽象类一是在实现ServerList接口的同时,实现了IClientConfigAware这个接口,代表是可配置的。
同时,提供了一个生成默认ServerListFilter(这个Filter的实现类是由NIWSServerListFilterClassName这个配置决定,默认是ZoneAffinityServerListFilter)的方法

    public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {   

        public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
            try {
                String niwsServerListFilterClassName = niwsClientConfig
                        .getProperty(
                                CommonClientConfigKey.NIWSServerListFilterClassName,
                                ZoneAffinityServerListFilter.class.getName())
                        .toString();

                AbstractServerListFilter<T> abstractNIWSServerListFilter = 
                        (AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
                return abstractNIWSServerListFilter;
            } catch (Throwable e) {
                throw new ClientException(
                        ClientException.ErrorType.CONFIGURATION,
                        "Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
                                + niwsClientConfig
                                        .getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
            }
        }
    }

ConfigurationBasedServerList.java

这个是默认的实现,如果没有特殊配置,ServerList的实现类就是ConfigurationBasedServerList;这个实际上就是从配置中读取ServerList,这个配置可以是动态配置,例如是Archaius

    public class ConfigurationBasedServerList extends AbstractServerList<Server> {

        private IClientConfig clientConfig;

        @Override
        public List<Server> getInitialListOfServers() {
            return getUpdatedListOfServers();
        }

        @Override
        public List<Server> getUpdatedListOfServers() {
            String listOfServers = clientConfig.get(CommonClientConfigKey.ListOfServers);
            return derive(listOfServers);
        }

        @Override
        public void initWithNiwsConfig(IClientConfig clientConfig) {
            this.clientConfig = clientConfig;
        }
        //可以看出这个配置就是以逗号分隔的字符串
        private List<Server> derive(String value) {
            List<Server> list = Lists.newArrayList();
            if (!Strings.isNullOrEmpty(value)) {
                for (String s: value.split(",")) {
                    list.add(new Server(s.trim()));
                }
            }
            return list;
        }
    }

DiscoveryEnabledNIWSServerList.java

这个就是从Eureka上面获取Server列表的类,构造的时候需要传入相关配置以及最重要的EurekaClient的Provider来获取合适的EurekaClient以便于获取Server列表。

实现ServerList接口的方法都是基于obtainServersViaDiscovery这个方法:

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers(){
        return obtainServersViaDiscovery();
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
        return obtainServersViaDiscovery();
    }

    private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
        List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();

        //如果EurekaClient没有被初始化,则日志报警并返回空的列表
        if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
            logger.warn("EurekaClient has not been initialized yet, returning an empty list");
            return new ArrayList<DiscoveryEnabledServer>();
        }
        EurekaClient eurekaClient = eurekaClientProvider.get();

        //这里的vipAddresses其实就是微服务名称的各种形式,但是注意,它们代表的是同一个微服务
        if (vipAddresses!=null){
            for (String vipAddress : vipAddresses.split(",")) {
                // if targetRegion is null, it will be interpreted as the same region of client
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
                for (InstanceInfo ii : listOfInstanceInfo) {
                    if (ii.getStatus().equals(InstanceStatus.UP)) {

                        //是否覆盖port
                        if(shouldUseOverridePort){
                            if(logger.isDebugEnabled()){
                                logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
                            }

                            //这里复制一份是因为不希望其他的地方修改原有的实例信息
                            InstanceInfo copy = new InstanceInfo(ii);

                            if(isSecure){
                                ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
                            }else{
                                ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
                            }
                        }

                        DiscoveryEnabledServer des = new DiscoveryEnabledServer(ii, isSecure, shouldUseIpAddr);
                        des.setZone(DiscoveryClient.getZone(ii));
                        serverList.add(des);
                    }
                }

                //如果有一个vipAddress有服务列表,我们就不用获取剩余的了
                if (serverList.size()>0 && prioritizeVipAddressBasedServers){
                    break; 
                }
            }
        }
        return serverList;
    }

到这里我们可以看出,Ribbon和Eureka的配合其实就是Ribbon从Eureka中利用微服务名称获取Server列表;那么这个列表是如何更新的呢,在Eureka的章节我们提到过,Ribbon定时从EurekaClient获取服务实例列表更新,这就涉及到了下一个我们要讲到的Ribbon元素 – 服务实例列表更新机制实现的接口ServerListUpdater

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud Ribbon 全解 (3) – 基本组件实现源码(1)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏