Spring Cloud源码分析(二)Ribbon

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

在之前介绍使用Ribbon进行服务消费的时候,我们用到了RestTemplate,但是熟悉Spring的同学们是否产生过这样的疑问:RestTemplate不是Spring自己就有的吗?跟Ribbon的客户端负载均衡又有什么关系呢?下面在本文,我们来看RestTemplateRibbon是如何联系起来并实现客户端负载均衡的。

首先,回顾一下之前的消费者示例:我们是如何实现客户端负载均衡的?仔细观察一下代码之前的代码,我们可以发现在消费者的例子中,可能就是这个注解@LoadBalanced是之前没有接触过的,并且从命名上来看也与负载均衡相关。我们不妨以此为线索来看看源码实现的机制。

@LoadBalanced注解源码的注释中,我们可以知道该注解用来给RestTemplate标记,以使用负载均衡的客户端(LoadBalancerClient)来配置它。

通过搜索LoadBalancerClient,我们可以发现这是Spring Cloud中定义的一个接口:

123456789`|`publicinterfaceLoadBalancerClient{ServiceInstancechoose(StringserviceId);<T>Texecute(StringserviceId,LoadBalancerRequest<T>request)throwsIOException;URIreconstructURI(ServiceInstanceinstance,URIoriginal);}
123456789` | `publicinterfaceLoadBalancerClient{ServiceInstancechoose(StringserviceId);<T>Texecute(StringserviceId,LoadBalancerRequest<T>request)throwsIOException;URIreconstructURI(ServiceInstanceinstance,URIoriginal);}

从该接口中,我们可以通过定义的抽象方法来了解到客户端负载均衡器中应具备的几种能力:

  • ServiceInstance choose(String serviceId):根据传入的服务名serviceId,从负载均衡器中挑选一个对应服务的实例。
  • T execute(String serviceId, LoadBalancerRequest request) throws IOException:使用从负载均衡器中挑选出的服务实例来执行请求内容。
  • URI reconstructURI(ServiceInstance instance, URI original):为系统构建一个合适的“host:port”形式的URI。在分布式系统中,我们使用逻辑上的服务名称作为host来构建URI(替代服务实例的“host:port”形式)进行请求,比如:http://myservice/path/to/service。在该操作的定义中,前者ServiceInstance对象是带有host和port的具体服务实例,而后者URI对象则是使用逻辑服务名定义为host的URI,而返回的URI内容则是通过ServiceInstance的服务实例详情拼接出的具体“host:post”形式的请求地址。

顺着LoadBalancerClient接口的所属包org.springframework.cloud.client.loadbalancer,我们对其内容进行整理,可以得出如下图的关系:

20191123100208\_1.png

从类的命名上我们初步判断LoadBalancerAutoConfiguration为实现客户端负载均衡器的自动化配置类。通过查看源码,我们可以验证这一点假设:

12345678910111213141516171819202122232425262728293031323334353637383940414243444546`|`@Configuration@ConditionalOnClass(RestTemplate.class)@ConditionalOnBean(LoadBalancerClient.class)publicclassLoadBalancerAutoConfiguration{@LoadBalanced@Autowired(required=false)privateList<RestTemplate>restTemplates=Collections.emptyList();@BeanpublicSmartInitializingSingletonloadBalancedRestTemplateInitializer(finalList<RestTemplateCustomizer>customizers){returnnewSmartInitializingSingleton(){@OverridepublicvoidafterSingletonsInstantiated(){for(RestTemplaterestTemplate:LoadBalancerAutoConfiguration.this.restTemplates){for(RestTemplateCustomizercustomizer:customizers){customizer.customize(restTemplate);}}}};}@Bean@ConditionalOnMissingBeanpublicRestTemplateCustomizerrestTemplateCustomizer(finalLoadBalancerInterceptorloadBalancerInterceptor){returnnewRestTemplateCustomizer(){@Overridepublicvoidcustomize(RestTemplaterestTemplate){List<ClientHttpRequestInterceptor>list=newArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);}};}@BeanpublicLoadBalancerInterceptorribbonInterceptor(LoadBalancerClientloadBalancerClient){returnnewLoadBalancerInterceptor(loadBalancerClient);}}
12345678910111213141516171819202122232425262728293031323334353637383940414243444546` | `@Configuration@ConditionalOnClass(RestTemplate.class)@ConditionalOnBean(LoadBalancerClient.class)publicclassLoadBalancerAutoConfiguration{@LoadBalanced@Autowired(required=false)privateList<RestTemplate>restTemplates=Collections.emptyList();@BeanpublicSmartInitializingSingletonloadBalancedRestTemplateInitializer(finalList<RestTemplateCustomizer>customizers){returnnewSmartInitializingSingleton(){@OverridepublicvoidafterSingletonsInstantiated(){for(RestTemplaterestTemplate:LoadBalancerAutoConfiguration.this.restTemplates){for(RestTemplateCustomizercustomizer:customizers){customizer.customize(restTemplate);}}}};}@Bean@ConditionalOnMissingBeanpublicRestTemplateCustomizerrestTemplateCustomizer(finalLoadBalancerInterceptorloadBalancerInterceptor){returnnewRestTemplateCustomizer(){@Overridepublicvoidcustomize(RestTemplaterestTemplate){List<ClientHttpRequestInterceptor>list=newArrayList<>(restTemplate.getInterceptors());list.add(loadBalancerInterceptor);restTemplate.setInterceptors(list);}};}@BeanpublicLoadBalancerInterceptorribbonInterceptor(LoadBalancerClientloadBalancerClient){returnnewLoadBalancerInterceptor(loadBalancerClient);}}

LoadBalancerAutoConfiguration类头上的注解可以知道Ribbon实现的负载均衡自动化配置需要满足下面两个条件:

  • @ConditionalOnClass(RestTemplate.class)RestTemplate类必须存在于当前工程的环境中。
  • @ConditionalOnBean(LoadBalancerClient.class):在Spring的Bean工程中有必须有LoadBalancerClient的实现Bean。

在该自动化配置类中,主要做了下面三件事:

  • 创建了一个LoadBalancerInterceptor的Bean,用于实现对客户端发起请求时进行拦截,以实现客户端负载均衡。
  • 创建了一个RestTemplateCustomizer的Bean,用于给RestTemplate增加LoadBalancerInterceptor拦截器。
  • 维护了一个被@LoadBalanced注解修饰的RestTemplate对象列表,并在这里进行初始化,通过调用RestTemplateCustomizer的实例来给需要客户端负载均衡的RestTemplate增加LoadBalancerInterceptor拦截器。

接下来,我们看看LoadBalancerInterceptor拦截器是如何将一个普通的RestTemplate变成客户端负载均衡的:

123456789101112131415161718192021222324252627282930313233343536373839404142`|`publicclassLoadBalancerInterceptorimplementsClientHttpRequestInterceptor{privateLoadBalancerClientloadBalancer;publicLoadBalancerInterceptor(LoadBalancerClientloadBalancer){this.loadBalancer=loadBalancer;}@OverridepublicClientHttpResponseintercept(finalHttpRequestrequest,finalbyte[]body,finalClientHttpRequestExecutionexecution)throwsIOException{finalURIoriginalUri=request.getURI();StringserviceName=originalUri.getHost();returnthis.loadBalancer.execute(serviceName,newLoadBalancerRequest<ClientHttpResponse>(){@OverridepublicClientHttpResponseapply(finalServiceInstanceinstance)throwsException{HttpRequestserviceRequest=newServiceRequestWrapper(request,instance);returnexecution.execute(serviceRequest,body);}});}privateclassServiceRequestWrapperextendsHttpRequestWrapper{privatefinalServiceInstanceinstance;publicServiceRequestWrapper(HttpRequestrequest,ServiceInstanceinstance){super(request);this.instance=instance;}@OverridepublicURIgetURI(){URIuri=LoadBalancerInterceptor.this.loadBalancer.reconstructURI(this.instance,getRequest().getURI());returnuri;}}}
123456789101112131415161718192021222324252627282930313233343536373839404142` | `publicclassLoadBalancerInterceptorimplementsClientHttpRequestInterceptor{privateLoadBalancerClientloadBalancer;publicLoadBalancerInterceptor(LoadBalancerClientloadBalancer){this.loadBalancer=loadBalancer;}@OverridepublicClientHttpResponseintercept(finalHttpRequestrequest,finalbyte[]body,finalClientHttpRequestExecutionexecution)throwsIOException{finalURIoriginalUri=request.getURI();StringserviceName=originalUri.getHost();returnthis.loadBalancer.execute(serviceName,newLoadBalancerRequest<ClientHttpResponse>(){@OverridepublicClientHttpResponseapply(finalServiceInstanceinstance)throwsException{HttpRequestserviceRequest=newServiceRequestWrapper(request,instance);returnexecution.execute(serviceRequest,body);}});}privateclassServiceRequestWrapperextendsHttpRequestWrapper{privatefinalServiceInstanceinstance;publicServiceRequestWrapper(HttpRequestrequest,ServiceInstanceinstance){super(request);this.instance=instance;}@OverridepublicURIgetURI(){URIuri=LoadBalancerInterceptor.this.loadBalancer.reconstructURI(this.instance,getRequest().getURI());returnuri;}}}

通过源码以及之前的自动化配置类,我们可以看到在拦截器中注入了LoadBalancerClient的实现。当一个被@LoadBalanced注解修饰的RestTemplate对象向外发起HTTP请求时,会被LoadBalancerInterceptor类的intercept函数所拦截。由于我们在使用RestTemplate时候采用了服务名作为host,所以直接从HttpRequest的URI对象中通过getHost()就可以拿到服务名,然后调用execute函数去根据服务名来选择实例并发起实际的请求。

分析到这里,LoadBalancerClient还只是一个抽象的负载均衡器接口,所以我们还需要找到它的具体实现类来进一步分析。通过查看ribbon的源码,我们可以很容易的在org.springframework.cloud.netflix.ribbon包下找到对应的实现类:RibbonLoadBalancerClient

12345678910111213141516171819202122232425262728`|`public<T>Texecute(StringserviceId,LoadBalancerRequest<T>request)throwsIOException{ILoadBalancerloadBalancer=getLoadBalancer(serviceId);Serverserver=getServer(loadBalancer);if(server==null){thrownewIllegalStateException("Noinstancesavailablefor"+serviceId);}RibbonServerribbonServer=newRibbonServer(serviceId,server,isSecure(server,serviceId),serverIntrospector(serviceId).getMetadata(server));RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);RibbonStatsRecorderstatsRecorder=newRibbonStatsRecorder(context,server);try{TreturnVal=request.apply(ribbonServer);statsRecorder.recordStats(returnVal);returnreturnVal;}catch(IOExceptionex){statsRecorder.recordStats(ex);throwex;}catch(Exceptionex){statsRecorder.recordStats(ex);ReflectionUtils.rethrowRuntimeException(ex);}returnnull;}
12345678910111213141516171819202122232425262728` | `public<T>Texecute(StringserviceId,LoadBalancerRequest<T>request)throwsIOException{ILoadBalancerloadBalancer=getLoadBalancer(serviceId);Serverserver=getServer(loadBalancer);if(server==null){thrownewIllegalStateException("Noinstancesavailablefor"+serviceId);}RibbonServerribbonServer=newRibbonServer(serviceId,server,isSecure(server,serviceId),serverIntrospector(serviceId).getMetadata(server));RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);RibbonStatsRecorderstatsRecorder=newRibbonStatsRecorder(context,server);try{TreturnVal=request.apply(ribbonServer);statsRecorder.recordStats(returnVal);returnreturnVal;}catch(IOExceptionex){statsRecorder.recordStats(ex);throwex;}catch(Exceptionex){statsRecorder.recordStats(ex);ReflectionUtils.rethrowRuntimeException(ex);}returnnull;}

可以看到,在execute函数的实现中,第一步做的就是通过getServer根据传入的服务名serviceId去获得具体的服务实例:

123456`|`protectedServergetServer(ILoadBalancerloadBalancer){if(loadBalancer==null){returnnull;}returnloadBalancer.chooseServer("default");}
123456` | `protectedServergetServer(ILoadBalancerloadBalancer){if(loadBalancer==null){returnnull;}returnloadBalancer.chooseServer("default");}

通过getServer函数的实现源码,我们可以看到这里获取具体服务实例的时候并没有使用LoadBalancerClient接口中的choose函数,而是使用了ribbon自身的ILoadBalancer接口中定义的chooseServer函数。

我们先来认识一下ILoadBalancer接口:

123456789101112`|`publicinterfaceILoadBalancer{publicvoidaddServers(List<Server>newServers);publicServerchooseServer(Objectkey);publicvoidmarkServerDown(Serverserver);publicList<Server>getReachableServers();publicList<Server>getAllServers();}
123456789101112` | `publicinterfaceILoadBalancer{publicvoidaddServers(List<Server>newServers);publicServerchooseServer(Objectkey);publicvoidmarkServerDown(Serverserver);publicList<Server>getReachableServers();publicList<Server>getAllServers();}

可以看到,在该接口中定义了一个软负载均衡器需要的一系列抽象操作(未例举过期函数):

  • addServers:向负载均衡器中维护的实例列表增加服务实例。
  • chooseServer:通过某种策略,从负载均衡器中挑选出一个具体的服务实例。
  • markServerDown:用来通知和标识负载均衡器中某个具体实例已经停止服务,不然负载均衡器在下一次获取服务实例清单前都会认为服务实例均是正常服务的。
  • getReachableServers:获取当前正常服务的实例列表。
  • getAllServers:获取所有已知的服务实例列表,包括正常服务和停止服务的实例。

在该接口定义中涉及到的Server对象定义的是一个传统的服务端节点,在该类中存储了服务端节点的一些元数据信息,包括:host、port以及一些部署信息等。

20191123100208\_2.png

而对于该接口的实现,我们可以整理出如上图所示的结构。我们可以看到BaseLoadBalancer类实现了基础的负载均衡,而DynamicServerListLoadBalancerZoneAwareLoadBalancer在负载均衡的策略上做了一些功能的扩展。

那么在整合Ribbon的时候Spring Cloud默认采用了哪个具体实现呢?我们通过RibbonClientConfiguration配置类,可以知道在整合时默认采用了ZoneAwareLoadBalancer来实现负载均衡器。

1234567891011`|`@Bean@ConditionalOnMissingBeanpublicILoadBalancerribbonLoadBalancer(IClientConfigconfig,ServerList<Server>serverList,ServerListFilter<Server>serverListFilter,IRulerule,IPingping){ZoneAwareLoadBalancer<Server>balancer=LoadBalancerBuilder.newBuilder().withClientConfig(config).withRule(rule).withPing(ping).withServerListFilter(serverListFilter).withDynamicServerList(serverList).buildDynamicServerListLoadBalancer();returnbalancer;}
1234567891011` | `@Bean@ConditionalOnMissingBeanpublicILoadBalancerribbonLoadBalancer(IClientConfigconfig,ServerList<Server>serverList,ServerListFilter<Server>serverListFilter,IRulerule,IPingping){ZoneAwareLoadBalancer<Server>balancer=LoadBalancerBuilder.newBuilder().withClientConfig(config).withRule(rule).withPing(ping).withServerListFilter(serverListFilter).withDynamicServerList(serverList).buildDynamicServerListLoadBalancer();returnbalancer;}

下面,我们再回到RibbonLoadBalancerClientexecute函数逻辑,在通过ZoneAwareLoadBalancerchooseServer函数获取了负载均衡策略分配到的服务实例对象Server之后,将其内容包装成RibbonServer对象(该对象除了存储了服务实例的信息之外,还增加了服务名serviceId、是否需要使用HTTPS等其他信息),然后使用该对象再回调LoadBalancerInterceptor请求拦截器中LoadBalancerRequestapply(final ServiceInstance instance)函数,向一个实际的具体服务实例发起请求,从而实现一开始以服务名为host的URI请求,到实际访问host:post形式的具体地址的转换。

apply(final ServiceInstance instance)函数中传入的ServiceInstance接口是对服务实例的抽象定义。在该接口中暴露了服务治理系统中每个服务实例需要提供的一些基本信息,比如:serviceId、host、port等,具体定义如下:

1234567891011121314`|`publicinterfaceServiceInstance{StringgetServiceId();StringgetHost();intgetPort();booleanisSecure();URIgetUri();Map<String,String>getMetadata();}
1234567891011121314` | `publicinterfaceServiceInstance{StringgetServiceId();StringgetHost();intgetPort();booleanisSecure();URIgetUri();Map<String,String>getMetadata();}

而上面提到的具体包装Server服务实例的RibbonServer对象就是ServiceInstance接口的实现,可以看到它除了包含了Server对象之外,还存储了服务名、是否使用https标识以及一个Map类型的元数据集合。

12345678910111213141516171819202122`|`protectedstaticclassRibbonServerimplementsServiceInstance{privatefinalStringserviceId;privatefinalServerserver;privatefinalbooleansecure;privateMap<String,String>metadata;protectedRibbonServer(StringserviceId,Serverserver){this(serviceId,server,false,Collections.<String,String>emptyMap());}protectedRibbonServer(StringserviceId,Serverserver,booleansecure,Map<String,String>metadata){this.serviceId=serviceId;this.server=server;this.secure=secure;this.metadata=metadata;}//省略实现ServiceInstance的一些获取Server信息的get函数...}
12345678910111213141516171819202122` | `protectedstaticclassRibbonServerimplementsServiceInstance{privatefinalStringserviceId;privatefinalServerserver;privatefinalbooleansecure;privateMap<String,String>metadata;protectedRibbonServer(StringserviceId,Serverserver){this(serviceId,server,false,Collections.<String,String>emptyMap());}protectedRibbonServer(StringserviceId,Serverserver,booleansecure,Map<String,String>metadata){this.serviceId=serviceId;this.server=server;this.secure=secure;this.metadata=metadata;}//省略实现ServiceInstance的一些获取Server信息的get函数...}

那么apply(final ServiceInstance instance)函数,在接收到了具体ServiceInstance实例后,是如何通过LoadBalancerClient接口中的reconstructURI操作来组织具体请求地址的呢?

123456`|`@OverridepublicClientHttpResponseapply(finalServiceInstanceinstance)throwsException{HttpRequestserviceRequest=newServiceRequestWrapper(request,instance);returnexecution.execute(serviceRequest,body);}
123456` | `@OverridepublicClientHttpResponseapply(finalServiceInstanceinstance)throwsException{HttpRequestserviceRequest=newServiceRequestWrapper(request,instance);returnexecution.execute(serviceRequest,body);}

apply的实现中,我们可以看到它具体执行的时候,还传入了ServiceRequestWrapper对象,该对象继承了HttpRequestWrapper并重写了getURI函数,重写后的getURI会通过调用LoadBalancerClient接口的reconstructURI函数来重新构建一个URI来进行访问。

12345678910111213`|`privateclassServiceRequestWrapperextendsHttpRequestWrapper{privatefinalServiceInstanceinstance;...@OverridepublicURIgetURI(){URIuri=LoadBalancerInterceptor.this.loadBalancer.reconstructURI(this.instance,getRequest().getURI());returnuri;}}
12345678910111213` | `privateclassServiceRequestWrapperextendsHttpRequestWrapper{privatefinalServiceInstanceinstance;...@OverridepublicURIgetURI(){URIuri=LoadBalancerInterceptor.this.loadBalancer.reconstructURI(this.instance,getRequest().getURI());returnuri;}}

LoadBalancerInterceptor拦截器中,ClientHttpRequestExecution的实例具体执行execution.execute(serviceRequest, body)时,会调用InterceptingClientHttpRequestInterceptingRequestExecution类的execute函数,具体实现如下:

1234567891011121314`|`publicClientHttpResponseexecute(HttpRequestrequest,byte[]body)throwsIOException{if(this.iterator.hasNext()){ClientHttpRequestInterceptornextInterceptor=this.iterator.next();returnnextInterceptor.intercept(request,body,this);}else{ClientHttpRequestdelegate=requestFactory.createRequest(request.getURI(),request.getMethod());delegate.getHeaders().putAll(request.getHeaders());if(body.length>0){StreamUtils.copy(body,delegate.getBody());}returndelegate.execute();}}
1234567891011121314` | `publicClientHttpResponseexecute(HttpRequestrequest,byte[]body)throwsIOException{if(this.iterator.hasNext()){ClientHttpRequestInterceptornextInterceptor=this.iterator.next();returnnextInterceptor.intercept(request,body,this);}else{ClientHttpRequestdelegate=requestFactory.createRequest(request.getURI(),request.getMethod());delegate.getHeaders().putAll(request.getHeaders());if(body.length>0){StreamUtils.copy(body,delegate.getBody());}returndelegate.execute();}}

可以看到在创建请求的时候requestFactory.createRequest(request.getURI(), request.getMethod());,这里request.getURI()会调用之前介绍的ServiceRequestWrapper对象中重写的getURI函数。此时,它就会使用RibbonLoadBalancerClient中实现的reconstructURI来组织具体请求的服务实例地址。

12345678910111213`|`publicURIreconstructURI(ServiceInstanceinstance,URIoriginal){Assert.notNull(instance,"instancecannotbenull");StringserviceId=instance.getServiceId();RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);Serverserver=newServer(instance.getHost(),instance.getPort());booleansecure=isSecure(server,serviceId);URIuri=original;if(secure){uri=UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();}returncontext.reconstructURIWithServer(server,uri);}
12345678910111213` | `publicURIreconstructURI(ServiceInstanceinstance,URIoriginal){Assert.notNull(instance,"instancecannotbenull");StringserviceId=instance.getServiceId();RibbonLoadBalancerContextcontext=this.clientFactory.getLoadBalancerContext(serviceId);Serverserver=newServer(instance.getHost(),instance.getPort());booleansecure=isSecure(server,serviceId);URIuri=original;if(secure){uri=UriComponentsBuilder.fromUri(uri).scheme("https").build().toUri();}returncontext.reconstructURIWithServer(server,uri);}

reconstructURI函数中,我们可以看到,它通过ServiceInstance实例对象的serviceId,从SpringClientFactory类的clientFactory对象中获取对应serviceId的负载均衡器的上下文RibbonLoadBalancerContext对象。然后根据ServiceInstance中的信息来构建具体服务实例信息的Server对象,并使用RibbonLoadBalancerContext对象的reconstructURIWithServer函数来构建服务实例的URI。

为了帮助理解,简单介绍一下上面提到的SpringClientFactoryRibbonLoadBalancerContext

  • SpringClientFactory类是一个用来创建客户端负载均衡器的工厂类,该工厂会为每一个不同名的ribbon客户端生成不同的Spring上下文。
  • RibbonLoadBalancerContext类是LoadBalancerContext的子类,该类用于存储一些被负载均衡器使用的上下文内容和Api操作(reconstructURIWithServer就是其中之一)。

reconstructURIWithServer的实现中我们可以看到,它同reconstructURI的定义类似。只是reconstructURI的第一个保存具体服务实例的参数使用了Spring Cloud定义的ServiceInstance,而reconstructURIWithServer中使用了Netflix中定义的Server,所以在RibbonLoadBalancerClient实现reconstructURI时候,做了一次转换,使用ServiceInstance的host和port信息来构建了一个Server对象来给reconstructURIWithServer使用。从reconstructURIWithServer的实现逻辑中,我们可以看到,它从Server对象中获取host和port信息,然后根据以服务名为host的URI对象original中获取其他请求信息,将两者内容进行拼接整合,形成最终要访问的服务实例的具体地址。

123456789101112131415161718192021222324252627282930313233343536373839404142`|`publicclassLoadBalancerContextimplementsIClientConfigAware{...publicURIreconstructURIWithServer(Serverserver,URIoriginal){Stringhost=server.getHost();intport=server.getPort();if(host.equals(original.getHost())&&port==original.getPort()){returnoriginal;}Stringscheme=original.getScheme();if(scheme==null){scheme=deriveSchemeAndPortFromPartialUri(original).first();}try{StringBuildersb=newStringBuilder();sb.append(scheme).append("://");if(!Strings.isNullOrEmpty(original.getRawUserInfo())){sb.append(original.getRawUserInfo()).append("@");}sb.append(host);if(port>=0){sb.append(":").append(port);}sb.append(original.getRawPath());if(!Strings.isNullOrEmpty(original.getRawQuery())){sb.append("?").append(original.getRawQuery());}if(!Strings.isNullOrEmpty(original.getRawFragment())){sb.append("#").append(original.getRawFragment());}URInewURI=newURI(sb.toString());returnnewURI;}catch(URISyntaxExceptione){thrownewRuntimeException(e);}}...}
123456789101112131415161718192021222324252627282930313233343536373839404142` | `publicclassLoadBalancerContextimplementsIClientConfigAware{...publicURIreconstructURIWithServer(Serverserver,URIoriginal){Stringhost=server.getHost();intport=server.getPort();if(host.equals(original.getHost())&&port==original.getPort()){returnoriginal;}Stringscheme=original.getScheme();if(scheme==null){scheme=deriveSchemeAndPortFromPartialUri(original).first();}try{StringBuildersb=newStringBuilder();sb.append(scheme).append("://");if(!Strings.isNullOrEmpty(original.getRawUserInfo())){sb.append(original.getRawUserInfo()).append("@");}sb.append(host);if(port>=0){sb.append(":").append(port);}sb.append(original.getRawPath());if(!Strings.isNullOrEmpty(original.getRawQuery())){sb.append("?").append(original.getRawQuery());}if(!Strings.isNullOrEmpty(original.getRawFragment())){sb.append("#").append(original.getRawFragment());}URInewURI=newURI(sb.toString());returnnewURI;}catch(URISyntaxExceptione){thrownewRuntimeException(e);}}...}

另外,从RibbonLoadBalancerClientexecute的函数逻辑中,我们还能看到在回调拦截器中,执行具体的请求之后,ribbon还通过RibbonStatsRecorder对象对服务的请求还进行了跟踪记录,这里不再展开说明,有兴趣的读者可以继续研究。

分析到这里,我们已经可以大致理清Spring Cloud中使用Ribbon实现客户端负载均衡的基本脉络。了解了它是如何通过LoadBalancerInterceptor拦截器对RestTemplate的请求进行拦截,并利用Spring Cloud的负载均衡器LoadBalancerClient将以逻辑服务名为host的URI转换成具体的服务实例的过程。同时通过分析LoadBalancerClient的Ribbon实现RibbonLoadBalancerClient,可以知道在使用Ribbon实现负载均衡器的时候,实际使用的还是Ribbon中定义的ILoadBalancer接口的实现,自动化配置会采用ZoneAwareLoadBalancer的实例来进行客户端负载均衡实现。

负载均衡器

通过之前的分析,我们已经对Spring Cloud如何使用Ribbon有了基本的了解。虽然Spring Cloud中定义了LoadBalancerClient为负载均衡器的接口,并且针对Ribbon实现了RibbonLoadBalancerClient,但是它在具体实现客户端负载均衡时,则是通过Ribbon的ILoadBalancer接口实现。在上一节分析时候,我们对该接口的实现结构已经做了一些简单的介绍,下面我们根据ILoadBalancer接口的实现类逐个看看它都是如何实现客户端负载均衡的。

AbstractLoadBalancer

AbstractLoadBalancerILoadBalancer接口的抽象实现。在该抽象类中定义了一个关于服务实例的分组枚举类ServerGroup,它包含了三种不同类型:ALL-所有服务实例、STATUS_UP-正常服务的实例、STATUS_NOT_UP-停止服务的实例;实现了一个chooseServer()函数,该函数通过调用接口中的chooseServer(Object key)实现,其中参数key为null,表示在选择具体服务实例时忽略key的条件判断;最后还定义了两个抽象函数,getServerList(ServerGroup serverGroup)定义了根据分组类型来获取不同的服务实例列表,getLoadBalancerStats()定义了获取LoadBalancerStats对象的方法,LoadBalancerStats对象被用来存储负载均衡器中各个服务实例当前的属性和统计信息,这些信息非常有用,我们可以利用这些信息来观察负载均衡器的运行情况,同时这些信息也是用来制定负载均衡策略的重要依据。

12345678910111213141516`|`publicabstractclassAbstractLoadBalancerimplementsILoadBalancer{publicenumServerGroup{ALL,STATUS_UP,STATUS_NOT_UP}publicServerchooseServer(){returnchooseServer(null);}publicabstractList<Server>getServerList(ServerGroupserverGroup);publicabstractLoadBalancerStatsgetLoadBalancerStats();}
12345678910111213141516` | `publicabstractclassAbstractLoadBalancerimplementsILoadBalancer{publicenumServerGroup{ALL,STATUS_UP,STATUS_NOT_UP}publicServerchooseServer(){returnchooseServer(null);}publicabstractList<Server>getServerList(ServerGroupserverGroup);publicabstractLoadBalancerStatsgetLoadBalancerStats();}

BaseLoadBalancer

BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义很多关于均衡负载器相关的基础内容:

  • 定义并维护了两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单,一个用于存储正常服务的实例清单。

    123456`|`@Monitor(name=PREFIX+"AllServerList",type=DataSourceType.INFORMATIONAL)protectedvolatileList<Server>allServerList=Collections.synchronizedList(newArrayList<Server>());@Monitor(name=PREFIX+"UpServerList",type=DataSourceType.INFORMATIONAL)protectedvolatileList<Server>upServerList=Collections.synchronizedList(newArrayList<Server>());
    123456` | `@Monitor(name=PREFIX+"AllServerList",type=DataSourceType.INFORMATIONAL)protectedvolatileList<Server>allServerList=Collections.synchronizedList(newArrayList<Server>());@Monitor(name=PREFIX+"UpServerList",type=DataSourceType.INFORMATIONAL)protectedvolatileList<Server>upServerList=Collections.synchronizedList(newArrayList<Server>());
  • 定义了之前我们提到的用来存储负载均衡器各服务实例属性和统计信息的LoadBalancerStats对象。

  • 定义了检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现。

  • 定义了检查服务实例操作的执行策略对象IPingStrategy,在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现。根据源码,我们可以看到该策略采用线性遍历ping服务实例的方式实现检查。该策略在当我们实现的IPing速度不理想,或是Server列表过大时,可能变的不是很为理想,这时候我们需要通过实现IPingStrategy接口并实现pingServers(IPing ping, Server[] servers)函数去扩展ping的执行策略。

12345678910111213141516171819202122232425`|`privatestaticclassSerialPingStrategyimplementsIPingStrategy{@Overridepublicboolean[]pingServers(IPingping,Server[]servers){intnumCandidates=servers.length;boolean[]results=newboolean[numCandidates];if(logger.isDebugEnabled()){logger.debug("LoadBalancer:PingTaskexecuting["+numCandidates+"]serversconfigured");}for(inti=0;i<numCandidates;i++){results[i]=false;try{if(ping!=null){results[i]=ping.isAlive(servers[i]);}}catch(Throwablet){logger.error("ExceptionwhilepingingServer:"+servers[i],t);}}returnresults;}}
12345678910111213141516171819202122232425` | `privatestaticclassSerialPingStrategyimplementsIPingStrategy{@Overridepublicboolean[]pingServers(IPingping,Server[]servers){intnumCandidates=servers.length;boolean[]results=newboolean[numCandidates];if(logger.isDebugEnabled()){logger.debug("LoadBalancer:PingTaskexecuting["+numCandidates+"]serversconfigured");}for(inti=0;i<numCandidates;i++){results[i]=false;try{if(ping!=null){results[i]=ping.isAlive(servers[i]);}}catch(Throwablet){logger.error("ExceptionwhilepingingServer:"+servers[i],t);}}returnresults;}}
  • 定义了负载均衡的处理规则IRule对象,从BaseLoadBalancerchooseServer(Object key)的实现源码,我们可以知道负载均衡器实际进行服务实例选择任务是委托给了IRule实例中的choose函数来实现。而在这里,默认初始化了RoundRobinRuleIRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡规则。
123456789101112131415`|`publicServerchooseServer(Objectkey){if(counter==null){counter=createCounter();}counter.increment();if(rule==null){returnnull;}else{try{returnrule.choose(key);}catch(Throwablet){returnnull;}}}
123456789101112131415` | `publicServerchooseServer(Objectkey){if(counter==null){counter=createCounter();}counter.increment();if(rule==null){returnnull;}else{try{returnrule.choose(key);}catch(Throwablet){returnnull;}}}
  • 启动ping任务:在BaseLoadBalancer的默认构造函数中,会直接启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔为:10秒。
  • 实现了ILoadBalancer接口定义的负载均衡器应具备的一系列基本操作:

    • addServers(List newServers):向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新的优化都是对setServersList函数的重写来实现的。

      123456789101112`|`publicvoidaddServers(List<Server>newServers){if(newServers!=null&&newServers.size()>0){try{ArrayList<Server>newList=newArrayList<Server>();newList.addAll(allServerList);newList.addAll(newServers);setServersList(newList);}catch(Exceptione){logger.error("ExceptionwhileaddingServers",e);}}}
      123456789101112` | `publicvoidaddServers(List<Server>newServers){if(newServers!=null&&newServers.size()>0){try{ArrayList<Server>newList=newArrayList<Server>();newList.addAll(allServerList);newList.addAll(newServers);setServersList(newList);}catch(Exceptione){logger.error("ExceptionwhileaddingServers",e);}}}
    • chooseServer(Object key):挑选一个具体的服务实例,在上面介绍IRule的时候,已经做了说明,这里不再赘述。

    • markServerDown(Server server):标记某个服务实例暂停服务。

      123456789101112`|`publicvoidmarkServerDown(Serverserver){if(server==null){return;}if(!server.isAlive()){return;}logger.error("LoadBalancer:markServerDowncalledon["+server.getId()+"]");server.setAlive(false);notifyServerStatusChangeListener(singleton(server));}
      123456789101112` | `publicvoidmarkServerDown(Serverserver){if(server==null){return;}if(!server.isAlive()){return;}logger.error("LoadBalancer:markServerDowncalledon["+server.getId()+"]");server.setAlive(false);notifyServerStatusChangeListener(singleton(server));}
    • getReachableServers():获取可用的服务实例列表。由于BaseLoadBalancer中单独维护了一个正常服务的实例清单,所以直接返回即可。

      123`|`publicList<Server>getReachableServers(){returnCollections.unmodifiableList(upServerList);}
      123` | `publicList<Server>getReachableServers(){returnCollections.unmodifiableList(upServerList);}
    • getAllServers():获取所有的服务实例列表。由于BaseLoadBalancer中单独维护了一个所有服务的实例清单,所以也直接返回它即可。

      123`|`publicList<Server>getAllServers(){returnCollections.unmodifiableList(allServerList);}
      123` | `publicList<Server>getAllServers(){returnCollections.unmodifiableList(allServerList);}

DynamicServerListLoadBalancer

DynamicServerListLoadBalancer类继承于BaseLoadBalancer类,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单的在运行期的动态更新能力;同时,它还具备了对服务实例清单的过滤功能,也就是说我们可以通过过滤器来选择性的获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容:

ServerList

DynamicServerListLoadBalancer的成员定义中,我们马上可以发现新增了一个关于服务列表的操作对象:ServerList<T> serverListImpl。其中泛型T从类名中对于T的限定DynamicServerListLoadBalancer<T extends Server>可以获知它是一个Server的子类,即代表了一个具体的服务实例的扩展类。而ServerList接口定义如下所示:

123456`|`publicinterfaceServerList<TextendsServer>{publicList<T>getInitialListOfServers();publicList<T>getUpdatedListOfServers();}
123456` | `publicinterfaceServerList<TextendsServer>{publicList<T>getInitialListOfServers();publicList<T>getUpdatedListOfServers();}

它定义了两个抽象方法:getInitialListOfServers用于获取初始化的服务实例清单,而getUpdatedListOfServers用于获取更新的服务实例清单。那么该接口的实现有哪些呢?通过搜索源码,我们可以整出如下图的结构:

20191123100208\_3.png

从图中我们可以看到有很多个ServerList的实现类,那么在DynamicServerListLoadBalancer中的ServerList默认配置到底使用了哪个具体实现呢?既然在该负载均衡器中需要实现服务实例的动态更新,那么势必需要ribbon具备访问eureka来获取服务实例的能力,所以我们从Spring Cloud整合ribbon与eureka的包org.springframework.cloud.netflix.ribbon.eureka下探索,可以找到配置类EurekaRibbonClientConfiguration,在该类中可以找到看到下面创建ServerList实例的内容:

123456789`|`@Bean@ConditionalOnMissingBeanpublicServerList<?>ribbonServerList(IClientConfigconfig){DiscoveryEnabledNIWSServerListdiscoveryServerList=newDiscoveryEnabledNIWSServerList(config);DomainExtractingServerListserverList=newDomainExtractingServerList(discoveryServerList,config,this.approximateZoneFromHostname);returnserverList;}
123456789` | `@Bean@ConditionalOnMissingBeanpublicServerList<?>ribbonServerList(IClientConfigconfig){DiscoveryEnabledNIWSServerListdiscoveryServerList=newDiscoveryEnabledNIWSServerList(config);DomainExtractingServerListserverList=newDomainExtractingServerList(discoveryServerList,config,this.approximateZoneFromHostname);returnserverList;}

可以看到,这里创建的是一个DomainExtractingServerList实例,从下面它的源码中我们可以看到在它内部还定义了一个ServerList list。同时,DomainExtractingServerList类中对getInitialListOfServersgetUpdatedListOfServers的具体实现,其实委托给了内部定义的ServerList list对象,而该对象是通过创建DomainExtractingServerList时候,由构造函数传入的DiscoveryEnabledNIWSServerList实现的。

12345678910111213141516171819202122232425262728`|`publicclassDomainExtractingServerListimplementsServerList<DiscoveryEnabledServer>{privateServerList<DiscoveryEnabledServer>list;privateIClientConfigclientConfig;privatebooleanapproximateZoneFromHostname;publicDomainExtractingServerList(ServerList<DiscoveryEnabledServer>list,IClientConfigclientConfig,booleanapproximateZoneFromHostname){this.list=list;this.clientConfig=clientConfig;this.approximateZoneFromHostname=approximateZoneFromHostname;}@OverridepublicList<DiscoveryEnabledServer>getInitialListOfServers(){List<DiscoveryEnabledServer>servers=setZones(this.list.getInitialListOfServers());returnservers;}@OverridepublicList<DiscoveryEnabledServer>getUpdatedListOfServers(){List<DiscoveryEnabledServer>servers=setZones(this.list.getUpdatedListOfServers());returnservers;}...}
12345678910111213141516171819202122232425262728` | `publicclassDomainExtractingServerListimplementsServerList<DiscoveryEnabledServer>{privateServerList<DiscoveryEnabledServer>list;privateIClientConfigclientConfig;privatebooleanapproximateZoneFromHostname;publicDomainExtractingServerList(ServerList<DiscoveryEnabledServer>list,IClientConfigclientConfig,booleanapproximateZoneFromHostname){this.list=list;this.clientConfig=clientConfig;this.approximateZoneFromHostname=approximateZoneFromHostname;}@OverridepublicList<DiscoveryEnabledServer>getInitialListOfServers(){List<DiscoveryEnabledServer>servers=setZones(this.list.getInitialListOfServers());returnservers;}@OverridepublicList<DiscoveryEnabledServer>getUpdatedListOfServers(){List<DiscoveryEnabledServer>servers=setZones(this.list.getUpdatedListOfServers());returnservers;}...}

那么DiscoveryEnabledNIWSServerList是如何实现这两个服务实例的获取的呢?我们从源码中可以看到这两个方法都是通过该类中的一个私有函数obtainServersViaDiscovery来通过服务发现机制来实现服务实例的获取。

123456789`|`@OverridepublicList<DiscoveryEnabledServer>getInitialListOfServers(){returnobtainServersViaDiscovery();}@OverridepublicList<DiscoveryEnabledServer>getUpdatedListOfServers(){returnobtainServersViaDiscovery();}
123456789` | `@OverridepublicList<DiscoveryEnabledServer>getInitialListOfServers(){returnobtainServersViaDiscovery();}@OverridepublicList<DiscoveryEnabledServer>getUpdatedListOfServers(){returnobtainServersViaDiscovery();}

obtainServersViaDiscovery的实现逻辑如下,主要依靠EurekaClient从服务注册中心中获取到具体的服务实例InstanceInfo列表(EurekaClient的具体实现,我们在分析Eureka的源码时已经做了详细的介绍,这里传入的vipAddress可以理解为逻辑上的服务名,比如“USER-SERVICE”)。接着,对这些服务实例进行遍历,将状态为“UP”(正常服务)的实例转换成DiscoveryEnabledServer对象,最后将这些实例组织成列表返回。

12345678910111213141516171819202122232425262728`|`privateList<DiscoveryEnabledServer>obtainServersViaDiscovery(){List<DiscoveryEnabledServer>serverList=newArrayList<DiscoveryEnabledServer>();if(eurekaClientProvider==null||eurekaClientProvider.get()==null){logger.warn("EurekaClienthasnotbeeninitializedyet,returninganemptylist");returnnewArrayList<DiscoveryEnabledServer>();}EurekaClienteurekaClient=eurekaClientProvider.get();if(vipAddresses!=null){for(StringvipAddress:vipAddresses.split(",")){List<InstanceInfo>listOfInstanceInfo=eurekaClient.getInstancesByVipAddress(vipAddress,isSecure,targetRegion);for(InstanceInfoii:listOfInstanceInfo){if(ii.getStatus().equals(InstanceStatus.UP)){//省略了一些实例信息的加工逻辑DiscoveryEnabledServerdes=newDiscoveryEnabledServer(ii,isSecure,shouldUseIpAddr);des.setZone(DiscoveryClient.getZone(ii));serverList.add(des);}}if(serverList.size()>0&&prioritizeVipAddressBasedServers){break;}}}returnserverList;}
12345678910111213141516171819202122232425262728` | `privateList<DiscoveryEnabledServer>obtainServersViaDiscovery(){List<DiscoveryEnabledServer>serverList=newArrayList<DiscoveryEnabledServer>();if(eurekaClientProvider==null||eurekaClientProvider.get()==null){logger.warn("EurekaClienthasnotbeeninitializedyet,returninganemptylist");returnnewArrayList<DiscoveryEnabledServer>();}EurekaClienteurekaClient=eurekaClientProvider.get();if(vipAddresses!=null){for(StringvipAddress:vipAddresses.split(",")){List<InstanceInfo>listOfInstanceInfo=eurekaClient.getInstancesByVipAddress(vipAddress,isSecure,targetRegion);for(InstanceInfoii:listOfInstanceInfo){if(ii.getStatus().equals(InstanceStatus.UP)){//省略了一些实例信息的加工逻辑DiscoveryEnabledServerdes=newDiscoveryEnabledServer(ii,isSecure,shouldUseIpAddr);des.setZone(DiscoveryClient.getZone(ii));serverList.add(des);}}if(serverList.size()>0&&prioritizeVipAddressBasedServers){break;}}}returnserverList;}

DiscoveryEnabledNIWSServerList中通过EurekaClient从服务注册中心获取到最新的服务实例清单后,返回的List到了DomainExtractingServerList类中,将继续通过setZones函数进行处理,而这里的处理具体内容如下,主要完成将DiscoveryEnabledNIWSServerList返回的List列表中的元素,转换成内部定义的DiscoveryEnabledServer的子类对象DomainExtractingServer,在该对象的构造函数中将为服务实例对象设置一些必要的属性,比如id、zone、isAliveFlag、readyToServe等信息。

123456789101112`|`privateList<DiscoveryEnabledServer>setZones(List<DiscoveryEnabledServer>servers){List<DiscoveryEnabledServer>result=newArrayList<>();booleanisSecure=this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.IsSecure,Boolean.TRUE);booleanshouldUseIpAddr=this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.UseIPAddrForServer,Boolean.FALSE);for(DiscoveryEnabledServerserver:servers){result.add(newDomainExtractingServer(server,isSecure,shouldUseIpAddr,this.approximateZoneFromHostname));}returnresult;}
123456789101112` | `privateList<DiscoveryEnabledServer>setZones(List<DiscoveryEnabledServer>servers){List<DiscoveryEnabledServer>result=newArrayList<>();booleanisSecure=this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.IsSecure,Boolean.TRUE);booleanshouldUseIpAddr=this.clientConfig.getPropertyAsBoolean(CommonClientConfigKey.UseIPAddrForServer,Boolean.FALSE);for(DiscoveryEnabledServerserver:servers){result.add(newDomainExtractingServer(server,isSecure,shouldUseIpAddr,this.approximateZoneFromHostname));}returnresult;}
ServerListUpdater

通过上面的分析我们已经知道了Ribbon与Eureka整合后,如何实现从Eureka Server中获取服务实例清单。那么它又是如何触发向Eureka Server去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看DynamicServerListLoadBalancer中的实现内容,我们可以很容易的找到下面定义的关于ServerListUpdater的内容:

12345678`|`protectedfinalServerListUpdater.UpdateActionupdateAction=newServerListUpdater.UpdateAction(){@OverridepublicvoiddoUpdate(){updateListOfServers();}};protectedvolatileServerListUpdaterserverListUpdater;
12345678` | `protectedfinalServerListUpdater.UpdateActionupdateAction=newServerListUpdater.UpdateAction(){@OverridepublicvoiddoUpdate(){updateListOfServers();}};protectedvolatileServerListUpdaterserverListUpdater;

根据该接口的命名,我们基本就能猜到,这个对象实现的是对ServerList的更新,所以可以称它为“服务更新器”,从下面的源码中可以看到,在ServerListUpdater内部还定义了一个UpdateAction接口,上面定义的updateAction对象就是以匿名内部类的方式创建了一个它的具体实现,其中doUpdate实现的内容就是对ServerList的具体更新操作。除此之外,“服务更新器”中还定义了一系列控制它和获取它一些信息的操作。

123456789101112131415161718192021222324`|`publicinterfaceServerListUpdater{publicinterfaceUpdateAction{voiddoUpdate();}//启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。voidstart(UpdateActionupdateAction);//停止服务更新器voidstop();//获取最近的更新时间戳StringgetLastUpdate();//获取上一次更新到现在的时间间隔,单位为毫秒longgetDurationSinceLastUpdateMs();//获取错过的更新周期数intgetNumberMissedCycles();//获取核心线程数intgetCoreThreads();}
123456789101112131415161718192021222324` | `publicinterfaceServerListUpdater{publicinterfaceUpdateAction{voiddoUpdate();}//启动服务更新器,传入的UpdateAction对象为更新操作的具体实现。voidstart(UpdateActionupdateAction);//停止服务更新器voidstop();//获取最近的更新时间戳StringgetLastUpdate();//获取上一次更新到现在的时间间隔,单位为毫秒longgetDurationSinceLastUpdateMs();//获取错过的更新周期数intgetNumberMissedCycles();//获取核心线程数intgetCoreThreads();}

ServerListUpdater的实现类不多,具体下图所示。

20191123100208\_4.png

根据两个类的注释,我们可以很容易的知道它们的作用分别是:

  • PollingServerListUpdater:动态服务列表更新的默认策略,也就是说DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新。
  • EurekaNotificationServerListUpdater:该更新器也可服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制与PollingServerListUpdater不同,它需要利用Eureka的事件监听器来驱动服务列表的更新操作。

下面我们来详细看看它默认实现的PollingServerListUpdater。先从用于启动“服务更新器”的start函数源码看起,具体如下。我们可以看到start函数的实现内容验证了之前提到的:以定时任务的方式进行服务列表的更新。它先创建了一个Runnable的线程实现,在该实现中调用了上面提到的具体更新服务实例列表的方法updateAction.doUpdate(),最后再为这个Runnable的线程实现启动了一个定时任务来执行。

12345678910111213141516171819202122232425262728293031`|`@Overridepublicsynchronizedvoidstart(finalUpdateActionupdateAction){if(isActive.compareAndSet(false,true)){finalRunnablewrapperRunnable=newRunnable(){@Overridepublicvoidrun(){if(!isActive.get()){if(scheduledFuture!=null){scheduledFuture.cancel(true);}return;}try{updateAction.doUpdate();lastUpdated=System.currentTimeMillis();}catch(Exceptione){logger.warn("Failedoneupdatecycle",e);}}};scheduledFuture=getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);}else{logger.info("Alreadyactive,no-op");}}
12345678910111213141516171819202122232425262728293031` | `@Overridepublicsynchronizedvoidstart(finalUpdateActionupdateAction){if(isActive.compareAndSet(false,true)){finalRunnablewrapperRunnable=newRunnable(){@Overridepublicvoidrun(){if(!isActive.get()){if(scheduledFuture!=null){scheduledFuture.cancel(true);}return;}try{updateAction.doUpdate();lastUpdated=System.currentTimeMillis();}catch(Exceptione){logger.warn("Failedoneupdatecycle",e);}}};scheduledFuture=getRefreshExecutor().scheduleWithFixedDelay(wrapperRunnable,initialDelayMs,refreshIntervalMs,TimeUnit.MILLISECONDS);}else{logger.info("Alreadyactive,no-op");}}

继续看PollingServerListUpdater的其他内容,我们可以找到用于启动定时任务的2个重要参数initialDelayMsrefreshIntervalMs的默认定义分别为1000和30*1000,单位为毫秒。也就是说更新服务实例在初始化之后延迟1秒后开始执行,并以30秒为周期重复执行。除了这些内容之外,我们还能看到它还会记录最后更新时间、是否存活等信息,同时也实现了ServerListUpdater中定义的一些其他操作内容,这些操作相对比较简单,这里不再具体说明,有兴趣的读者可以自己查看源码了解其实现原理。

ServerListFilter

在了解了更新服务实例的定时任务是如何启动的之后,我们继续回到updateAction.doUpdate()调用的具体实现位置,在DynamicServerListLoadBalancer中,它的实际实现委托给了updateListOfServers函数,具体实现如下:

123456789101112131415`|`publicvoidupdateListOfServers(){List<T>servers=newArrayList<T>();if(serverListImpl!=null){servers=serverListImpl.getUpdatedListOfServers();LOGGER.debug("ListofServersfor{}obtainedfromDiscoveryclient:{}",getIdentifier(),servers);if(filter!=null){servers=filter.getFilteredListOfServers(servers);LOGGER.debug("FilteredListofServersfor{}obtainedfromDiscoveryclient:{}",getIdentifier(),servers);}}updateAllServerList(servers);}
123456789101112131415` | `publicvoidupdateListOfServers(){List<T>servers=newArrayList<T>();if(serverListImpl!=null){servers=serverListImpl.getUpdatedListOfServers();LOGGER.debug("ListofServersfor{}obtainedfromDiscoveryclient:{}",getIdentifier(),servers);if(filter!=null){servers=filter.getFilteredListOfServers(servers);LOGGER.debug("FilteredListofServersfor{}obtainedfromDiscoveryclient:{}",getIdentifier(),servers);}}updateAllServerList(servers);}

可以看到,这里终于用到了我们之前提到的ServerListgetUpdatedListOfServers,通过之前的介绍我们已经可以知道这一步实现了从Eureka Server中获取服务可用实例的列表。在获得了服务实例列表之后,这里又将引入一个新的对象filter,追朔该对象的定义,我们可以找到它是ServerListFilter定义的。

ServerListFilter接口非常简单,该接口中之定义了一个方法List getFilteredListOfServers(List servers),主要用于实现对服务实例列表的过滤,通过传入的服务实例清单,根据一些规则返回过滤后的服务实例清单。该接口的实现如下图所示:

20191123100208\_5.png

其中,除了ZonePreferenceServerListFilter的实现是Spring Cloud Netflix中对Ribbon的扩展实现外,其他均是Netflix Ribbon中的实现类。我们可以分别看看这些过滤器实现都有什么特点:

  • AbstractServerListFilter:这是一个抽象过滤器,在这里定义了过滤时需要的一个重要依据对象LoadBalancerStats,我们在之前介绍过的,该对象存储了关于负载均衡器的一些属性和统计信息等。
123456789101112`|`publicabstractclassAbstractServerListFilter<TextendsServer>implementsServerListFilter<T>{privatevolatileLoadBalancerStatsstats;publicvoidsetLoadBalancerStats(LoadBalancerStatsstats){this.stats=stats;}publicLoadBalancerStatsgetLoadBalancerStats(){returnstats;}}
123456789101112` | `publicabstractclassAbstractServerListFilter<TextendsServer>implementsServerListFilter<T>{privatevolatileLoadBalancerStatsstats;publicvoidsetLoadBalancerStats(LoadBalancerStatsstats){this.stats=stats;}publicLoadBalancerStatsgetLoadBalancerStats(){returnstats;}}
  • ZoneAffinityServerListFilter:该过滤器基于“区域感知(Zone Affinity)”的方式实现服务实例的过滤,也就是说它会根据提供服务的实例所处区域(Zone)与消费者自身的所处区域(Zone)进行比较,过滤掉那些不是同处一个区域的实例。
123456789101112`|`publicList<T>getFilteredListOfServers(List<T>servers){if(zone!=null&&(zoneAffinity||zoneExclusive)&&servers!=null&&servers.size()>0){List<T>filteredServers=Lists.newArrayList(Iterables.filter(servers,this.zoneAffinityPredicate.getServerOnlyPredicate()));if(shouldEnableZoneAffinity(filteredServers)){returnfilteredServers;}elseif(zoneAffinity){overrideCounter.increment();}}returnservers;}
123456789101112` | `publicList<T>getFilteredListOfServers(List<T>servers){if(zone!=null&&(zoneAffinity||zoneExclusive)&&servers!=null&&servers.size()>0){List<T>filteredServers=Lists.newArrayList(Iterables.filter(servers,this.zoneAffinityPredicate.getServerOnlyPredicate()));if(shouldEnableZoneAffinity(filteredServers)){returnfilteredServers;}elseif(zoneAffinity){overrideCounter.increment();}}returnservers;}

从上面的源码中我们可以看到,对于服务实例列表的过滤是通过Iterables.filter(servers, this.zoneAffinityPredicate.getServerOnlyPredicate())来实现的,其中判断依据由ZoneAffinityPredicate实现服务实例与消费者的Zone比较。而在过滤之后,这里并不会马上返回过滤的结果,而是通过shouldEnableZoneAffinity函数来判断是否要启用“区域感知”的功能,从下面shouldEnableZoneAffinity的实现中,我们可以看到,它使用了LoadBalancerStatsgetZoneSnapshot方法来获取这些过滤后的同区域实例的基础指标(包含了:实例数量、断路器断开数、活动请求数、实例平均负载等),根据一系列的算法求出下面的几个评价值并与设置的阈值对比(下面的为默认值),若有一个条件符合,就不启用“区域感知”过滤的服务实例清单。这一算法实现对于集群出现区域故障时,依然可以依靠其他区域的实例进行正常服务提供了完善的高可用保障。同时,通过这里的介绍,我们也可以关联着来理解之前介绍Eureka的时候提到的对于区域分配设计来保证跨区域故障的高可用问题。

  • blackOutServerPercentage:故障实例百分比(断路器断开数 / 实例数量) >= 0.8
  • activeReqeustsPerServer:实例平均负载 >= 0.6
  • availableServers:可用实例数(实例数量 – 断路器断开数) < 2
123456789101112131415161718192021222324252627`|`privatebooleanshouldEnableZoneAffinity(List<T>filtered){if(!zoneAffinity&&!zoneExclusive){returnfalse;}if(zoneExclusive){returntrue;}LoadBalancerStatsstats=getLoadBalancerStats();if(stats==null){returnzoneAffinity;}else{logger.debug("Determiningifzoneaffinityshouldbeenabledwithgivenserverlist:{}",filtered);ZoneSnapshotsnapshot=stats.getZoneSnapshot(filtered);doubleloadPerServer=snapshot.getLoadPerServer();intinstanceCount=snapshot.getInstanceCount();intcircuitBreakerTrippedCount=snapshot.getCircuitTrippedCount();if(((double)circuitBreakerTrippedCount)/instanceCount>=blackOutServerPercentageThreshold.get()||loadPerServer>=activeReqeustsPerServerThreshold.get()||(instanceCount-circuitBreakerTrippedCount)<availableServersThreshold.get()){logger.debug("zoneAffinityisoverriden.blackOutServerPercentage:{},activeReqeustsPerServer:{},availableServers:{}",newObject[]{(double)circuitBreakerTrippedCount/instanceCount,loadPerServer,instanceCount-circuitBreakerTrippedCount});returnfalse;}else{returntrue;}}}
123456789101112131415161718192021222324252627` | `privatebooleanshouldEnableZoneAffinity(List<T>filtered){if(!zoneAffinity&&!zoneExclusive){returnfalse;}if(zoneExclusive){returntrue;}LoadBalancerStatsstats=getLoadBalancerStats();if(stats==null){returnzoneAffinity;}else{logger.debug("Determiningifzoneaffinityshouldbeenabledwithgivenserverlist:{}",filtered);ZoneSnapshotsnapshot=stats.getZoneSnapshot(filtered);doubleloadPerServer=snapshot.getLoadPerServer();intinstanceCount=snapshot.getInstanceCount();intcircuitBreakerTrippedCount=snapshot.getCircuitTrippedCount();if(((double)circuitBreakerTrippedCount)/instanceCount>=blackOutServerPercentageThreshold.get()||loadPerServer>=activeReqeustsPerServerThreshold.get()||(instanceCount-circuitBreakerTrippedCount)<availableServersThreshold.get()){logger.debug("zoneAffinityisoverriden.blackOutServerPercentage:{},activeReqeustsPerServer:{},availableServers:{}",newObject[]{(double)circuitBreakerTrippedCount/instanceCount,loadPerServer,instanceCount-circuitBreakerTrippedCount});returnfalse;}else{returntrue;}}}
  • DefaultNIWSServerListFilter:该过滤器完全继承自ZoneAffinityServerListFilter,是默认的NIWS(Netflix Internal Web Service)过滤器。
  • ServerListSubsetFilter:该过滤器也继承自ZoneAffinityServerListFilter,它非常适用于拥有大规模服务器集群(上百或更多)的系统。因为它可以产生一个“区域感知”结果的子集列表,同时它还能够通过比较服务实例的通信失败数量和并发连接数来判定该服务是否健康来选择性的从服务实例列表中剔除那些相对不够健康的实例。该过滤器的实现主要分为三步:

    1. 获取“区域感知”的过滤结果,来作为候选的服务实例清单
    2. 从当前消费者维护的服务实例子集中剔除那些相对不够健康的实例(同时也将这些实例从候选清单中剔除,防止第三步的时候又被选入),不够健康的标准如下:
      a. 服务实例的并发连接数超过客户端配置的值,默认为0,配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationConnectionThresold
      b. 服务实例的失败数超过客户端配置的值,默认为0,配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.eliminationFailureThresold
      c. 如果按符合上面任一规则的服务实例剔除后,剔除比例小于客户端默认配置的百分比,默认为0.1(10%),配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.forceEliminatePercent。那么就先对剩下的实例列表进行健康排序,再开始从最不健康实例进行剔除,直到达到配置的剔除百分比。
    3. 在完成剔除后,清单已经少了至少10%(默认值)的服务实例,最后通过随机的方式从候选清单中选出一批实例加入到清单中,以保持服务实例子集与原来的数量一致,而默认的实例子集数量为20,其配置参数为:<clientName>.<nameSpace>.ServerListSubsetFilter.size
  • ZonePreferenceServerListFilter:Spring Cloud整合时新增的过滤器。若使用Spring Cloud整合Eureka和Ribbon时会默认使用该过滤器。它实现了通过配置或者Eureka实例元数据的所属区域(Zone)来过滤出同区域的服务实例。如下面的源码所示,它的实现非常简单,首先通过父类ZoneAffinityServerListFilter的过滤器来获得“区域感知”的服务实例列表,然后遍历这个结果取出根据消费者配置预设的区域Zone来进行过滤,如果过滤的结果是空的就直接返回父类获取的结果,如果不为空就返回通过消费者配置的Zone过滤后的结果。
12345678910111213141516`|`@OverridepublicList<Server>getFilteredListOfServers(List<Server>servers){List<Server>output=super.getFilteredListOfServers(servers);if(this.zone!=null&&output.size()==servers.size()){List<Server>local=newArrayList<Server>();for(Serverserver:output){if(this.zone.equalsIgnoreCase(server.getZone())){local.add(server);}}if(!local.isEmpty()){returnlocal;}}returnoutput;}
12345678910111213141516` | `@OverridepublicList<Server>getFilteredListOfServers(List<Server>servers){List<Server>output=super.getFilteredListOfServers(servers);if(this.zone!=null&&output.size()==servers.size()){List<Server>local=newArrayList<Server>();for(Serverserver:output){if(this.zone.equalsIgnoreCase(server.getZone())){local.add(server);}}if(!local.isEmpty()){returnlocal;}}returnoutput;}

ZoneAwareLoadBalancer

ZoneAwareLoadBalancer负载均衡器是对DynamicServerListLoadBalancer的扩展。在DynamicServerListLoadBalancer中,我们可以看到它并没有重写选择具体服务实例的chooseServer函数,所以它依然会采用在BaseLoadBalancer中实现的算法,使用RoundRobinRule规则,以线性轮询的方式来选择调用的服务实例,该算法实现简单并没有区域(Zone)的概念,所以它会把所有实例视为一个Zone下的节点来看待,这样就会周期性的产生跨区域(Zone)访问的情况,由于跨区域会产生更高的延迟,这些实例主要以防止区域性故障实现高可用为目的而不能作为常规访问的实例,所以在多区域部署的情况下会有一定的性能问题,而该负载均衡器则可以避免这样的问题。那么它是如何实现的呢?

首先,在ZoneAwareLoadBalancer中,我们可以发现,它并没有重写setServersList,说明实现服务实例清单的更新主逻辑没有修改。但是我们可以发现它重写了这个函数:setServerListForZones(Map<String, List<Server>> zoneServersMap)。看到这里可能会有一些陌生,因为它并不是接口中定义的必备函数,所以我们不妨去父类DynamicServerListLoadBalancer中寻找一下该函数,我们可以找到下面的定义了:

123456789101112`|`publicvoidsetServersList(Listlsrv){super.setServersList(lsrv);List<T>serverList=(List<T>)lsrv;Map<String,List<Server>>serversInZones=newHashMap<String,List<Server>>();...setServerListForZones(serversInZones);}protectedvoidsetServerListForZones(Map<String,List<Server>>zoneServersMap){LOGGER.debug("Settingserverlistforzones:{}",zoneServersMap);getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);}
123456789101112` | `publicvoidsetServersList(Listlsrv){super.setServersList(lsrv);List<T>serverList=(List<T>)lsrv;Map<String,List<Server>>serversInZones=newHashMap<String,List<Server>>();...setServerListForZones(serversInZones);}protectedvoidsetServerListForZones(Map<String,List<Server>>zoneServersMap){LOGGER.debug("Settingserverlistforzones:{}",zoneServersMap);getLoadBalancerStats().updateZoneServerMapping(zoneServersMap);}

setServerListForZones函数的调用位于更新服务实例清单函数setServersList的最后,同时从其实现内容来看,它在父类DynamicServerListLoadBalancer中的作用是根据按区域Zone分组的实例列表,为负载均衡器中的LoadBalancerStats对象创建ZoneStats并放入Map zoneStatsMap集合中,每一个区域Zone会对应一个ZoneStats,它用于存储每个Zone的一些状态和统计信息。

ZoneAwareLoadBalancer中对setServerListForZones的重写如下:

123456789101112131415`|`protectedvoidsetServerListForZones(Map<String,List<Server>>zoneServersMap){super.setServerListForZones(zoneServersMap);if(balancers==null){balancers=newConcurrentHashMap<String,BaseLoadBalancer>();}for(Map.Entry<String,List<Server>>entry:zoneServersMap.entrySet()){Stringzone=entry.getKey().toLowerCase();getLoadBalancer(zone).setServersList(entry.getValue());}for(Map.Entry<String,BaseLoadBalancer>existingLBEntry:balancers.entrySet()){if(!zoneServersMap.keySet().contains(existingLBEntry.getKey())){existingLBEntry.getValue().setServersList(Collections.emptyList());}}}
123456789101112131415` | `protectedvoidsetServerListForZones(Map<String,List<Server>>zoneServersMap){super.setServerListForZones(zoneServersMap);if(balancers==null){balancers=newConcurrentHashMap<String,BaseLoadBalancer>();}for(Map.Entry<String,List<Server>>entry:zoneServersMap.entrySet()){Stringzone=entry.getKey().toLowerCase();getLoadBalancer(zone).setServersList(entry.getValue());}for(Map.Entry<String,BaseLoadBalancer>existingLBEntry:balancers.entrySet()){if(!zoneServersMap.keySet().contains(existingLBEntry.getKey())){existingLBEntry.getValue().setServersList(Collections.emptyList());}}}

可以看到,在该实现中创建了一个ConcurrentHashMap()类型的balancers对象,它将用来存储每个Zone区域对应的负载均衡器,而具体的负载均衡器的创建则是通过下面的第一个循环中调用getLoadBalancer函数来完成,同时在创建负载均衡器的时候会创建它的规则(如果当前实现中没有IRULE的实例,就创建一个AvailabilityFilteringRule规则;如果已经有具体实例,就clone一个),在创建完负载均衡器后又马上调用setServersList函数为其设置对应Zone区域的实例清单。而第二个循环则是对Zone区域中实例清单的检查,看看是否有Zone区域下已经没有实例了,是的话就将balancers中对应Zone区域的实例列表清空,该操作的作用是为了后续选择节点时,防止过时的Zone区域统计信息干扰具体实例的选择算法。

在了解了该负载均衡器是如何扩展服务实例清单的实现后,我们来具体看看它是如何挑选服务实例,来实现对区域的识别的:

12345678910111213141516171819202122232425262728293031`|`publicServerchooseServer(Objectkey){if(!ENABLED.get()||getLoadBalancerStats().getAvailableZones().size()<=1){logger.debug("Zoneawarelogicdisabledorthereisonlyonezone");returnsuper.chooseServer(key);}Serverserver=null;try{LoadBalancerStatslbStats=getLoadBalancerStats();Map<String,ZoneSnapshot>zoneSnapshot=ZoneAvoidanceRule.createSnapshot(lbStats);logger.debug("Zonesnapshots:{}",zoneSnapshot);...Set<String>availableZones=ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,triggeringLoad.get(),triggeringBlackoutPercentage.get());logger.debug("Availablezones:{}",availableZones);if(availableZones!=null&&availableZones.size()<zoneSnapshot.keySet().size()){Stringzone=ZoneAvoidanceRule.randomChooseZone(zoneSnapshot,availableZones);logger.debug("Zonechosen:{}",zone);if(zone!=null){BaseLoadBalancerzoneLoadBalancer=getLoadBalancer(zone);server=zoneLoadBalancer.chooseServer(key);}}}catch(Throwablee){logger.error("Unexpectedexceptionwhenchoosingserverusingzoneawarelogic",e);}if(server!=null){returnserver;}else{logger.debug("Zoneavoidancelogicisnotinvoked.");returnsuper.chooseServer(key);}}
12345678910111213141516171819202122232425262728293031` | `publicServerchooseServer(Objectkey){if(!ENABLED.get()||getLoadBalancerStats().getAvailableZones().size()<=1){logger.debug("Zoneawarelogicdisabledorthereisonlyonezone");returnsuper.chooseServer(key);}Serverserver=null;try{LoadBalancerStatslbStats=getLoadBalancerStats();Map<String,ZoneSnapshot>zoneSnapshot=ZoneAvoidanceRule.createSnapshot(lbStats);logger.debug("Zonesnapshots:{}",zoneSnapshot);...Set<String>availableZones=ZoneAvoidanceRule.getAvailableZones(zoneSnapshot,triggeringLoad.get(),triggeringBlackoutPercentage.get());logger.debug("Availablezones:{}",availableZones);if(availableZones!=null&&availableZones.size()<zoneSnapshot.keySet().size()){Stringzone=ZoneAvoidanceRule.randomChooseZone(zoneSnapshot,availableZones);logger.debug("Zonechosen:{}",zone);if(zone!=null){BaseLoadBalancerzoneLoadBalancer=getLoadBalancer(zone);server=zoneLoadBalancer.chooseServer(key);}}}catch(Throwablee){logger.error("Unexpectedexceptionwhenchoosingserverusingzoneawarelogic",e);}if(server!=null){returnserver;}else{logger.debug("Zoneavoidancelogicisnotinvoked.");returnsuper.chooseServer(key);}}

从源码中我们可以看到,只有当负载均衡器中维护的实例所属Zone区域个数大于1的时候才会执行这里的选择策略,否则还是将使用父类的实现。当Zone区域个数大于1个的时候,它的实现步骤主要如下:

  • 调用ZoneAvoidanceRule中的静态方法createSnapshot(lbStats),为当前负载均衡器中所有的Zone区域分别创建快照,保存在Map zoneSnapshot中,这些快照中的数据将用于后续的算法。
  • 调用ZoneAvoidanceRule中的静态方法getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get()),来获取可用的Zone区域集合,在该函数中会通过Zone区域快照中的统计数据来实现可用区的挑选。

    • 首先它会剔除符合这些规则的Zone区域:所属实例数为零的Zone区域;Zone区域内实例平均负载小于零,或者实例故障率(断路器断开次数/实例数)大于等于阈值(默认为0.99999)。
    • 然后根据Zone区域的实例平均负载来计算出最差的Zone区域,这里的最差指的是实例平均负载最高的Zone区域。
    • 如果在上面的过程中没有符合剔除要求的区域,同时实例最大平均负载小于阈值(默认为20%),就直接返回所有Zone区域为可用区域。否则,从最坏Zone区域集合中随机的选择一个,将它从可用Zone区域集合中剔除。
  • 当获得的可用Zone区域集合不为空,并且个数小于Zone区域总数,就随机的选择一个Zone区域。
  • 在确定了某个Zone区域后,则获取对应Zone区域的服务均衡器,并调用chooseServer来选择具体的服务实例,而在chooseServer中将使用IRule接口的choose函数来选择具体的服务实例。在这里IRule接口的实现会使用ZoneAvoidanceRule来挑选出具体的服务实例。

负载均衡策略

通过上面的源码解读,我们已经对Ribbon实现的负载均衡器以及其中包含的服务实例过滤器、服务实例信息的存储对象、区域的信息快照等都有了深入的认识和理解,但是对于负载均衡器中的服务实例选择策略只是讲解了几个默认实现的内容,而对于IRule的其他实现还没有详细的解读,下面我们来看看在Ribbon中共提供了那些负载均衡的策略实现。

20191123100208\_6.png

如上图所示,我们可以看到在Ribbon中实现了非常多的选择策略,其中也包含了我们在前面内容中提到过的:RoundRobinRuleZoneAvoidanceRule。下面我们来详细的解读一下IRule接口的各个实现。

AbstractLoadBalancerRule

负载均衡策略的抽象类,在该抽象类中定义了负载均衡器ILoadBalancer对象,该对象能够在具体实现选择服务策略时,获取到一些负载均衡器中维护的信息来作为分配依据,并以此设计一些算法来实现针对特定场景的高效策略。

1234567891011121314`|`publicabstractclassAbstractLoadBalancerRuleimplementsIRule,IClientConfigAware{privateILoadBalancerlb;@OverridepublicvoidsetLoadBalancer(ILoadBalancerlb){this.lb=lb;}@OverridepublicILoadBalancergetLoadBalancer(){returnlb;}}
1234567891011121314` | `publicabstractclassAbstractLoadBalancerRuleimplementsIRule,IClientConfigAware{privateILoadBalancerlb;@OverridepublicvoidsetLoadBalancer(ILoadBalancerlb){this.lb=lb;}@OverridepublicILoadBalancergetLoadBalancer(){returnlb;}}

RandomRule

该策略实现了从服务实例清单中随机选择一个服务实例的功能。它的具体实现如下,可以看到IRule接口的choose(Object key)函数实现,委托给了该类中的choose(ILoadBalancer lb, Object key),该方法增加了一个负载均衡器对象的参数。从具体的实现上看,它会使用传入的负载均衡器来获得可用实例列表upList和所有实例列表allList,并通过rand.nextInt(serverCount)函数来获取一个随机数,并将该随机数作为upList的索引值来返回具体实例。同时,具体的选择逻辑在一个while (server == null)循环之内,而根据选择逻辑的实现,正常情况下每次选择都应该能够选出一个服务实例,如果出现死循环获取不到服务实例时,则很有可能存在并发的Bug。

1234567891011121314151617181920212223242526272829303132`|`@OverridepublicServerchoose(Objectkey){returnchoose(getLoadBalancer(),key);}publicServerchoose(ILoadBalancerlb,Objectkey){...Serverserver=null;while(server==null){if(Thread.interrupted()){returnnull;}List<Server>upList=lb.getReachableServers();List<Server>allList=lb.getAllServers();intserverCount=allList.size();if(serverCount==0){returnnull;}intindex=rand.nextInt(serverCount);server=upList.get(index);if(server==null){Thread.yield();continue;}if(server.isAlive()){return(server);}server=null;Thread.yield();}returnserver;}
1234567891011121314151617181920212223242526272829303132` | `@OverridepublicServerchoose(Objectkey){returnchoose(getLoadBalancer(),key);}publicServerchoose(ILoadBalancerlb,Objectkey){...Serverserver=null;while(server==null){if(Thread.interrupted()){returnnull;}List<Server>upList=lb.getReachableServers();List<Server>allList=lb.getAllServers();intserverCount=allList.size();if(serverCount==0){returnnull;}intindex=rand.nextInt(serverCount);server=upList.get(index);if(server==null){Thread.yield();continue;}if(server.isAlive()){return(server);}server=null;Thread.yield();}returnserver;}

RoundRobinRule

该策略实现了按照线性轮询的方式依次选择每个服务实例的功能。它的具体实现如下,其详细结构与RandomRule非常类似。除了循环条件不同外,就是从可用列表中获取所谓的逻辑不同。从循环条件中,我们可以看到增加了一个count计数变量,该变量会在每次循环之后累加,也就是说如果一直选择不到server超过10次,那么就会结束尝试,并打印一个警告信息No available alive servers after 10 tries from load balancer: ...。而线性轮询的实现则是通过AtomicInteger nextServerCyclicCounter对象实现,每次进行实例选择时通过调用incrementAndGetModulo函数实现递增。

123456789101112131415161718192021222324252627282930`|`publicServerchoose(ILoadBalancerlb,Objectkey){...Serverserver=null;intcount=0;while(server==null&&count++<10){List<Server>reachableServers=lb.getReachableServers();List<Server>allServers=lb.getAllServers();intupCount=reachableServers.size();intserverCount=allServers.size();if((upCount==0)||(serverCount==0)){log.warn("Noupserversavailablefromloadbalancer:"+lb);returnnull;}intnextServerIndex=incrementAndGetModulo(serverCount);server=allServers.get(nextServerIndex);if(server==null){Thread.yield();continue;}if(server.isAlive()&&(server.isReadyToServe())){return(server);}server=null;}if(count>=10){log.warn("Noavailablealiveserversafter10triesfromloadbalancer:"+lb);}returnserver;}
123456789101112131415161718192021222324252627282930` | `publicServerchoose(ILoadBalancerlb,Objectkey){...Serverserver=null;intcount=0;while(server==null&&count++<10){List<Server>reachableServers=lb.getReachableServers();List<Server>allServers=lb.getAllServers();intupCount=reachableServers.size();intserverCount=allServers.size();if((upCount==0)||(serverCount==0)){log.warn("Noupserversavailablefromloadbalancer:"+lb);returnnull;}intnextServerIndex=incrementAndGetModulo(serverCount);server=allServers.get(nextServerIndex);if(server==null){Thread.yield();continue;}if(server.isAlive()&&(server.isReadyToServe())){return(server);}server=null;}if(count>=10){log.warn("Noavailablealiveserversafter10triesfromloadbalancer:"+lb);}returnserver;}

RetryRule

该策略实现了一个具备重试机制的实例选择功能。从下面的实现中我们可以看到,在其内部还定义了一个IRule对象,默认使用了RoundRobinRule实例。而在choose方法中的则实现了对内部定义的策略进行反复尝试的策略,若期间能够选择到具体的服务实例就返回,若选择不到就根据设置的尝试结束时间为阈值(maxRetryMillis参数定义的值 +choose方法开始执行的时间戳),当超过该阈值后就返回null。

1234567891011121314151617181920212223242526272829303132`|`publicclassRetryRuleextendsAbstractLoadBalancerRule{IRulesubRule=newRoundRobinRule();longmaxRetryMillis=500;...publicServerchoose(ILoadBalancerlb,Objectkey){longrequestTime=System.currentTimeMillis();longdeadline=requestTime+maxRetryMillis;Serveranswer=null;answer=subRule.choose(key);if(((answer==null)||(!answer.isAlive()))&&(System.currentTimeMillis()<deadline)){InterruptTasktask=newInterruptTask(deadline-System.currentTimeMillis());while(!Thread.interrupted()){answer=subRule.choose(key);if(((answer==null)||(!answer.isAlive()))&&(System.currentTimeMillis()<deadline)){Thread.yield();}else{break;}}task.cancel();}if((answer==null)||(!answer.isAlive())){returnnull;}else{returnanswer;}}...}
1234567891011121314151617181920212223242526272829303132` | `publicclassRetryRuleextendsAbstractLoadBalancerRule{IRulesubRule=newRoundRobinRule();longmaxRetryMillis=500;...publicServerchoose(ILoadBalancerlb,Objectkey){longrequestTime=System.currentTimeMillis();longdeadline=requestTime+maxRetryMillis;Serveranswer=null;answer=subRule.choose(key);if(((answer==null)||(!answer.isAlive()))&&(System.currentTimeMillis()<deadline)){InterruptTasktask=newInterruptTask(deadline-System.currentTimeMillis());while(!Thread.interrupted()){answer=subRule.choose(key);if(((answer==null)||(!answer.isAlive()))&&(System.currentTimeMillis()<deadline)){Thread.yield();}else{break;}}task.cancel();}if((answer==null)||(!answer.isAlive())){returnnull;}else{returnanswer;}}...}

WeightedResponseTimeRule

该策略是对RoundRobinRule的扩展,增加了根据实例的运行情况来计算权重,并根据权重来挑选实例,以达到更优的分配效果,它的实现主要有三个核心内容:

定时任务

WeightedResponseTimeRule策略在初始化的时候会通过serverWeightTimer.schedule(new DynamicServerWeightTask(), 0, serverWeightTaskTimerInterval)启动一个定时任务,用来为每个服务实例计算权重,该任务默认30秒执行一次。

12345678910`|`classDynamicServerWeightTaskextendsTimerTask{publicvoidrun(){ServerWeightserverWeight=newServerWeight();try{serverWeight.maintainWeights();}catch(Throwablet){logger.error("ThrowablecaughtwhilerunningDynamicServerWeightTaskfor"+name,t);}}}
12345678910` | `classDynamicServerWeightTaskextendsTimerTask{publicvoidrun(){ServerWeightserverWeight=newServerWeight();try{serverWeight.maintainWeights();}catch(Throwablet){logger.error("ThrowablecaughtwhilerunningDynamicServerWeightTaskfor"+name,t);}}}

权重计算

在源码中我们可以轻松找到用于存储权重的对象:List<Double> accumulatedWeights = new ArrayList<Double>(),该List中每个权重值所处的位置对应了负载均衡器维护的服务实例清单中所有实例在清单中的位置。

维护实例权重的计算过程通过maintainWeights函数实现,具体如下源码所示:

12345678910111213141516171819202122232425262728293031`|`publicvoidmaintainWeights(){ILoadBalancerlb=getLoadBalancer();...try{logger.info("Weightadjustingjobstarted");AbstractLoadBalancernlb=(AbstractLoadBalancer)lb;LoadBalancerStatsstats=nlb.getLoadBalancerStats();...//计算所有实例的平均响应时间的总和:totalResponseTimedoubletotalResponseTime=0;for(Serverserver:nlb.getAllServers()){//thiswillautomaticallyloadthestatsifnotincacheServerStatsss=stats.getSingleServerStat(server);totalResponseTime+=ss.getResponseTimeAvg();}//逐个计算每个实例的权重:weightSoFar+totalResponseTime-实例的平均响应时间DoubleweightSoFar=0.0;List<Double>finalWeights=newArrayList<Double>();for(Serverserver:nlb.getAllServers()){ServerStatsss=stats.getSingleServerStat(server);doubleweight=totalResponseTime-ss.getResponseTimeAvg();weightSoFar+=weight;finalWeights.add(weightSoFar);}setWeights(finalWeights);}catch(Throwablet){logger.error("Exceptionwhiledynamicallycalculatingserverweights",t);}finally{serverWeightAssignmentInProgress.set(false);}}
12345678910111213141516171819202122232425262728293031` | `publicvoidmaintainWeights(){ILoadBalancerlb=getLoadBalancer();...try{logger.info("Weightadjustingjobstarted");AbstractLoadBalancernlb=(AbstractLoadBalancer)lb;LoadBalancerStatsstats=nlb.getLoadBalancerStats();...//计算所有实例的平均响应时间的总和:totalResponseTimedoubletotalResponseTime=0;for(Serverserver:nlb.getAllServers()){//thiswillautomaticallyloadthestatsifnotincacheServerStatsss=stats.getSingleServerStat(server);totalResponseTime+=ss.getResponseTimeAvg();}//逐个计算每个实例的权重:weightSoFar+totalResponseTime-实例的平均响应时间DoubleweightSoFar=0.0;List<Double>finalWeights=newArrayList<Double>();for(Serverserver:nlb.getAllServers()){ServerStatsss=stats.getSingleServerStat(server);doubleweight=totalResponseTime-ss.getResponseTimeAvg();weightSoFar+=weight;finalWeights.add(weightSoFar);}setWeights(finalWeights);}catch(Throwablet){logger.error("Exceptionwhiledynamicallycalculatingserverweights",t);}finally{serverWeightAssignmentInProgress.set(false);}}

该函数的实现主要分为两个步骤:

  • 根据LoadBalancerStats中记录的每个实例的统计信息,累加所有实例的平均响应时间,得到总平均响应时间totalResponseTime,该值会用于后续的计算。
  • 为负载均衡器中维护的实例清单逐个计算权重(从第一个开始),计算规则为:weightSoFar + totalResponseTime - 实例的平均响应时间,其中weightSoFar初始化为零,并且每计算好一个权重需要累加到weightSoFar上供下一次计算使用。totalResponseTime则的上计算结果。

举个简单的例子来理解这个计算过程:假设有4个实例A、B、C、D,他们的平均响应时间为:10、40、80、100,所以总响应时间是10 + 40 + 80 + 100 = 230,每个实例的权重为总响应时间与实例自身的平均响应时间的差的累积获得,所以实例A、B、C、D的权重分别为:

  • 实例A:230 – 10 = 220
  • 实例B:220 + (230 – 40)= 410
  • 实例C:410 + (230 – 80)= 560
  • 实例D:560 + (230 – 100)= 690

需要注意的是,这里的权重值只是表示了各实例权重区间的上限,并非某个实例的优先级,所以不是数值越大被选中的概率就越大。那么什么是权重区间呢?以上面例子的计算结果为例,它实际上是为这4个实例构建了4个不同的区间,每个实例的区间下限是上一个实例的区间上限,而每个实例的区间上限则是我们上面计算并存储于List accumulatedWeights中的权重值,其中第一个实例的下限默认为零。所以,根据上面示例的权重计算结果,我们可以得到每个实例的权重区间:

  • 实例A:[0, 220]
  • 实例B:(220, 410]
  • 实例C:(410, 560]
  • 实例D:(560,690)

我们不难发现,实际上每个区间的宽度就是:总的平均响应时间 – 实例的平均响应时间,所以实例的平均响应时间越短、权重区间的宽度越大,而权重区间的宽度越大被选中的概率就越高。可能很多读者会问,这些区间边界的开闭是如何确定的呢?为什么不那么规则?下面我们会通过实例选择算法的解读来解释。

实例选择

WeightedResponseTimeRule选择实例的实现与之前介绍的算法结构类似,下面是它主体的算法(省略了循环体和一些判断等处理):

1234567891011121314151617181920212223242526272829303132333435`|`publicServerchoose(ILoadBalancerlb,Objectkey){...List<Double>currentWeights=accumulatedWeights;...List<Server>allList=lb.getAllServers();intserverCount=allList.size();if(serverCount==0){returnnull;}intserverIndex=0;//获取最后一个实例的权重doublemaxTotalWeight=currentWeights.size()==0?0:currentWeights.get(currentWeights.size()-1);if(maxTotalWeight<0.001d){//如果最后一个实例的权重值小于0.001,则采用父类实现的线性轮询的策略server=super.choose(getLoadBalancer(),key);if(server==null){returnserver;}}else{//如果最后一个实例的权重值大于等于0.001,就产生一个[0,maxTotalWeight)的随机数doublerandomWeight=random.nextDouble()*maxTotalWeight;intn=0;for(Doubled:currentWeights){//遍历维护的权重清单,若权重大于等于随机得到的数值,就选择这个实例if(d>=randomWeight){serverIndex=n;break;}else{n++;}}server=allList.get(serverIndex);}...returnserver;}
1234567891011121314151617181920212223242526272829303132333435` | `publicServerchoose(ILoadBalancerlb,Objectkey){...List<Double>currentWeights=accumulatedWeights;...List<Server>allList=lb.getAllServers();intserverCount=allList.size();if(serverCount==0){returnnull;}intserverIndex=0;//获取最后一个实例的权重doublemaxTotalWeight=currentWeights.size()==0?0:currentWeights.get(currentWeights.size()-1);if(maxTotalWeight<0.001d){//如果最后一个实例的权重值小于0.001,则采用父类实现的线性轮询的策略server=super.choose(getLoadBalancer(),key);if(server==null){returnserver;}}else{//如果最后一个实例的权重值大于等于0.001,就产生一个[0,maxTotalWeight)的随机数doublerandomWeight=random.nextDouble()*maxTotalWeight;intn=0;for(Doubled:currentWeights){//遍历维护的权重清单,若权重大于等于随机得到的数值,就选择这个实例if(d>=randomWeight){serverIndex=n;break;}else{n++;}}server=allList.get(serverIndex);}...returnserver;}

从源码中,我们可以看到,选择实例的核心过程就两步:

  • 生产一个[0, 最大权重值)区间内的随机数。
  • 遍历权重列表,比较权重值与随机数的大小,如果权重值大于等于随机数,就拿当前权重列表的索引值去服务实例列表中获取具体实例。这就是在上一节中提到的服务实例会根据权重区间挑选的原理,而权重区间边界的开闭原则根据算法,正常应该每个区间为(x, y]的形式,但是第一个实例和最后一个实例为什么不同呢?由于随机数的最小取值可以为0,所以第一个实例的下限是闭区间,同时随机数的最大值取不到最大权重值,所以最后一个实例的上限是开区间。

若继续以上面的数据为例,进行服务实例的选择,则该方法会从[0, 690)区间中选出一个随机数,比如选出的随机数为230,由于该值位于第二个区间,所以此时就会选择实例B来进行请求。

ClientConfigEnabledRoundRobinRule

该策略较为特殊,我们一般不直接使用它。因为它本身并没有实现什么特殊的处理逻辑,正如下面的源码所示,在它的内部定义了一个RoundRobinRule策略,而choose函数的实现也正是使用了RoundRobinRule的线性轮询机制,所以它实现的功能实际上与RoundRobinRule相同,那么定义它有什么特殊的用处呢?

虽然我们不会直接使用该策略,但是通过继承该策略,那么默认的choose就实现了线性轮询机制,在子类中做一些高级策略时通常都有可能会存在一些无法实施的情况,那么就可以通过父类的实现作为备选。在后文中我们将继续介绍的高级策略均是基于ClientConfigEnabledRoundRobinRule的扩展。

1234567891011121314`|`publicclassClientConfigEnabledRoundRobinRuleextendsAbstractLoadBalancerRule{RoundRobinRuleroundRobinRule=newRoundRobinRule();...@OverridepublicServerchoose(Objectkey){if(roundRobinRule!=null){returnroundRobinRule.choose(key);}else{thrownewIllegalArgumentException("ThisclasshasnotbeeninitializedwiththeRoundRobinRuleclass");}}}
1234567891011121314` | `publicclassClientConfigEnabledRoundRobinRuleextendsAbstractLoadBalancerRule{RoundRobinRuleroundRobinRule=newRoundRobinRule();...@OverridepublicServerchoose(Objectkey){if(roundRobinRule!=null){returnroundRobinRule.choose(key);}else{thrownewIllegalArgumentException("ThisclasshasnotbeeninitializedwiththeRoundRobinRuleclass");}}}

BestAvailableRule

该策略继承自ClientConfigEnabledRoundRobinRule,在实现中它注入了负载均衡器的统计对象:LoadBalancerStats,同时在具体的choose算法中利用LoadBalancerStats保存的实例统计信息来选择满足要求的实例。从如下源码中我们可以看到,它通过遍历负载均衡器中维护的所有服务实例,会过滤掉故障的实例,并找出并发请求数最小的一个,所以该策略的特性是选出最空闲的实例。

123456789101112131415161718192021222324`|`publicServerchoose(Objectkey){if(loadBalancerStats==null){returnsuper.choose(key);}List<Server>serverList=getLoadBalancer().getAllServers();intminimalConcurrentConnections=Integer.MAX_VALUE;longcurrentTime=System.currentTimeMillis();Serverchosen=null;for(Serverserver:serverList){ServerStatsserverStats=loadBalancerStats.getSingleServerStat(server);if(!serverStats.isCircuitBreakerTripped(currentTime)){intconcurrentConnections=serverStats.getActiveRequestsCount(currentTime);if(concurrentConnections<minimalConcurrentConnections){minimalConcurrentConnections=concurrentConnections;chosen=server;}}}if(chosen==null){returnsuper.choose(key);}else{returnchosen;}}
123456789101112131415161718192021222324` | `publicServerchoose(Objectkey){if(loadBalancerStats==null){returnsuper.choose(key);}List<Server>serverList=getLoadBalancer().getAllServers();intminimalConcurrentConnections=Integer.MAX_VALUE;longcurrentTime=System.currentTimeMillis();Serverchosen=null;for(Serverserver:serverList){ServerStatsserverStats=loadBalancerStats.getSingleServerStat(server);if(!serverStats.isCircuitBreakerTripped(currentTime)){intconcurrentConnections=serverStats.getActiveRequestsCount(currentTime);if(concurrentConnections<minimalConcurrentConnections){minimalConcurrentConnections=concurrentConnections;chosen=server;}}}if(chosen==null){returnsuper.choose(key);}else{returnchosen;}}

同时,由于该算法的核心依据是统计对象loadBalancerStats,当其为空的时候,该策略是无法执行的。所以从源码中我们可以看到,当loadBalancerStats为空的时候,它会采用父类的线性轮询策略,正如我们在介绍ClientConfigEnabledRoundRobinRule时那样,它的子类在无法满足实现高级策略时候,可以使用线性轮询策略的特性。后面将要介绍的策略因为也都继承自ClientConfigEnabledRoundRobinRule,所以他们都会具有这样的特性。

PredicateBasedRule

这是一个抽象策略,它也继承了ClientConfigEnabledRoundRobinRule,从其命名中可以猜出他是一个基于Predicate实现的策略,Predicate是Google Guava Collection工具对集合进行过滤的条件接口。

如下源码所示,它定义了一个抽象函数getPredicate来获取AbstractServerPredicate对象的实现,而在choose函数中,通过AbstractServerPredicatechooseRoundRobinAfterFiltering函数来选出具体的服务实例。从该函数的命名我们也大致能猜出它的基础逻辑:先通过子类中实现的Predicate逻辑来过滤一部分服务实例,然后再以线性轮询的方式从过滤后的实例清单中选出一个。

123456789101112131415`|`publicabstractclassPredicateBasedRuleextendsClientConfigEnabledRoundRobinRule{publicabstractAbstractServerPredicategetPredicate();@OverridepublicServerchoose(Objectkey){ILoadBalancerlb=getLoadBalancer();Optional<Server>server=getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(),key);if(server.isPresent()){returnserver.get();}else{returnnull;}}}
123456789101112131415` | `publicabstractclassPredicateBasedRuleextendsClientConfigEnabledRoundRobinRule{publicabstractAbstractServerPredicategetPredicate();@OverridepublicServerchoose(Objectkey){ILoadBalancerlb=getLoadBalancer();Optional<Server>server=getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(),key);if(server.isPresent()){returnserver.get();}else{returnnull;}}}

通过下面AbstractServerPredicate的源码片段,可以证实我们上面所做的猜测。在上面choose函数中调用的chooseRoundRobinAfterFiltering方法先通过内部定义的getEligibleServers函数来获取备选的实例清单(实现了过滤),如果返回的清单为空,则用Optional.absent()来表示不存在,反之则以线性轮询的方式从备选清单中获取一个实例。

1234567891011121314151617181920212223242526`|`publicabstractclassAbstractServerPredicateimplementsPredicate<PredicateKey>{...publicOptional<Server>chooseRoundRobinAfterFiltering(List<Server>servers,ObjectloadBalancerKey){List<Server>eligible=getEligibleServers(servers,loadBalancerKey);if(eligible.size()==0){returnOptional.absent();}returnOptional.of(eligible.get(nextIndex.getAndIncrement()%eligible.size()));}publicList<Server>getEligibleServers(List<Server>servers,ObjectloadBalancerKey){if(loadBalancerKey==null){returnImmutableList.copyOf(Iterables.filter(servers,this.getServerOnlyPredicate()));}else{List<Server>results=Lists.newArrayList();for(Serverserver:servers){if(this.apply(newPredicateKey(loadBalancerKey,server))){results.add(server);}}returnresults;}}}
1234567891011121314151617181920212223242526` | `publicabstractclassAbstractServerPredicateimplementsPredicate<PredicateKey>{...publicOptional<Server>chooseRoundRobinAfterFiltering(List<Server>servers,ObjectloadBalancerKey){List<Server>eligible=getEligibleServers(servers,loadBalancerKey);if(eligible.size()==0){returnOptional.absent();}returnOptional.of(eligible.get(nextIndex.getAndIncrement()%eligible.size()));}publicList<Server>getEligibleServers(List<Server>servers,ObjectloadBalancerKey){if(loadBalancerKey==null){returnImmutableList.copyOf(Iterables.filter(servers,this.getServerOnlyPredicate()));}else{List<Server>results=Lists.newArrayList();for(Serverserver:servers){if(this.apply(newPredicateKey(loadBalancerKey,server))){results.add(server);}}returnresults;}}}

在了解了整体逻辑之后,我们来详细看看实现过滤功能的getEligibleServers函数。从源码上看,它的实现结构非常简单清晰,通过遍历服务清单,使用this.apply方法来判断实例是否需要保留,是就添加到结果列表中。

可能到这里,不熟悉Google Guava Collections集合工具的读者会比较困惑,这个applyAbstractServerPredicate中并找不到它的定义,那么它是如何实现过滤的呢?实际上,AbstractServerPredicate实现了com.google.common.base.Predicate接口,而apply方法是该接口中的定义,主要用来实现过滤条件的判断逻辑,它输入的参数则是过滤条件需要用到的一些信息(比如源码中的new PredicateKey(loadBalancerKey, server)),它传入了关于实例的统计信息和负载均衡器的选择算法传递过来的key)。既然在AbstractServerPredicate中我们未能找到apply的实现,所以这里的chooseRoundRobinAfterFiltering函数只是定义了一个模板策略:“先过滤清单,再轮询选择”。对于如何过滤,则需要我们在AbstractServerPredicate的子类去实现apply方法来确定具体的过滤策略了。

后面我们将要介绍的两个策略就是基于此抽象策略实现,只是它们使用了不同的Predicate实现来完成过滤逻辑以达到不同的实例选择效果。

Google Guava Collections是一个对Java Collections Framework增强和扩展的一个开源项目。虽然Java Collections Framework已经能够 满足了我们大多数情况下使用集合的要求,但是当遇到一些特殊的情况我们的代码会比较冗长且容易出错。Guava Collections 可以帮助我们的让集合操作代码更为简短精炼并大大增强代码的可读性。

AvailabilityFilteringRule

该策略继承自上面介绍的抽象策略PredicateBasedRule,所以它也继承了“先过滤清单,再轮询选择”的基本处理逻辑,其中过滤条件使用了AvailabilityPredicate

1234567891011121314151617181920`|`publicclassAvailabilityPredicateextendsAbstractServerPredicate{...publicbooleanapply(@NullablePredicateKeyinput){LoadBalancerStatsstats=getLBStats();if(stats==null){returntrue;}return!shouldSkipServer(stats.getSingleServerStat(input.getServer()));}privatebooleanshouldSkipServer(ServerStatsstats){if((CIRCUIT_BREAKER_FILTERING.get()&&stats.isCircuitBreakerTripped())||stats.getActiveRequestsCount()>=activeConnectionsLimit.get()){returntrue;}returnfalse;}}
1234567891011121314151617181920` | `publicclassAvailabilityPredicateextendsAbstractServerPredicate{...publicbooleanapply(@NullablePredicateKeyinput){LoadBalancerStatsstats=getLBStats();if(stats==null){returntrue;}return!shouldSkipServer(stats.getSingleServerStat(input.getServer()));}privatebooleanshouldSkipServer(ServerStatsstats){if((CIRCUIT_BREAKER_FILTERING.get()&&stats.isCircuitBreakerTripped())||stats.getActiveRequestsCount()>=activeConnectionsLimit.get()){returntrue;}returnfalse;}}

从上述源码中,我们可以知道它的主要过滤逻辑位于shouldSkipServer方法中,它主要判断服务实例的两项内容:

  • 是否故障,即断路器是否生效已断开
  • 实例的并发请求数大于阈值,默认值为$2^{31}$ – 1,该配置我们可通过参数..ActiveConnectionsLimit来修改
    其中只要有一个满足apply就返回false(代表该节点可能存在故障或负载过高),都不满足就返回true。

在该策略中,除了实现了上面的过滤方法之外,对于choose的策略也做了一些改进优化,所以父类的实现对于它来说只是一个备用选项,其具体实现如下:

1234567891011`|`publicServerchoose(Objectkey){intcount=0;Serverserver=roundRobinRule.choose(key);while(count++<=10){if(predicate.apply(newPredicateKey(server))){returnserver;}server=roundRobinRule.choose(key);}returnsuper.choose(key);}
1234567891011` | `publicServerchoose(Objectkey){intcount=0;Serverserver=roundRobinRule.choose(key);while(count++<=10){if(predicate.apply(newPredicateKey(server))){returnserver;}server=roundRobinRule.choose(key);}returnsuper.choose(key);}

可以看到,它并没有像父类中那样,先遍历所有的节点进行过滤,然后在过滤后的集合中选择实例。而是先线性的方式选择一个实例,接着用过滤条件来判断该实例是否满足要求,若满足就直接使用该实例,若不满足要求就再选择下一个实例,并检查是否满足要求,如此循环进行,当这个过程重复了10次还是没有找到符合要求的实例,就采用父类的实现方案。

简单的说,该策略通过线性抽样的方式直接尝试寻找可用且较空闲的实例来使用,优化了父类每次都要遍历所有实例的开销。

ZoneAvoidanceRule

该策略我们在介绍负载均衡器ZoneAwareLoadBalancer时已经提到过了,它也是PredicateBasedRule的具体实现类。在之前的介绍中主要针对ZoneAvoidanceRule中用于选择Zone区域策略的一些静态函数,比如:createSnapshotgetAvailableZones。在这里我们将详细的看看ZoneAvoidanceRule作为服务实例过滤条件的实现原理。从下面ZoneAvoidanceRule的源码片段中我们可以看到,它使用了CompositePredicate来进行服务实例清单的过滤。这是一个组合过滤条件,在其构造函数中,它以ZoneAvoidancePredicate为主过滤条件,AvailabilityPredicate为次过滤条件初始化了组合过滤条件的实例。

12345678910111213`|`publicclassZoneAvoidanceRuleextendsPredicateBasedRule{...privateCompositePredicatecompositePredicate;publicZoneAvoidanceRule(){super();ZoneAvoidancePredicatezonePredicate=newZoneAvoidancePredicate(this);AvailabilityPredicateavailabilityPredicate=newAvailabilityPredicate(this);compositePredicate=createCompositePredicate(zonePredicate,availabilityPredicate);}...}
12345678910111213` | `publicclassZoneAvoidanceRuleextendsPredicateBasedRule{...privateCompositePredicatecompositePredicate;publicZoneAvoidanceRule(){super();ZoneAvoidancePredicatezonePredicate=newZoneAvoidancePredicate(this);AvailabilityPredicateavailabilityPredicate=newAvailabilityPredicate(this);compositePredicate=createCompositePredicate(zonePredicate,availabilityPredicate);}...}

ZoneAvoidanceRule在实现的时候并没有像AvailabilityFilteringRule那样重写choose函数来优化,所以它完全遵循了父类的过滤主逻辑:“先过滤清单,再轮询选择”。其中过滤清单的条件就是我们上面提到的以ZoneAvoidancePredicate为主过滤条件、AvailabilityPredicate为次过滤条件的组合过滤条件CompositePredicate。从CompositePredicate的源码片段中,我们可以看到它定义了一个主过滤条件AbstractServerPredicate delegate以及一组次过滤条件列表List fallbacks,所以它的次过滤列表是可以拥有多个的,并且由于它采用了List存储所以次过滤条件是按顺序执行的。

123456789101112131415161718192021`|`publicclassCompositePredicateextendsAbstractServerPredicate{privateAbstractServerPredicatedelegate;privateList<AbstractServerPredicate>fallbacks=Lists.newArrayList();privateintminimalFilteredServers=1;privatefloatminimalFilteredPercentage=0;@OverridepublicList<Server>getEligibleServers(List<Server>servers,ObjectloadBalancerKey){List<Server>result=super.getEligibleServers(servers,loadBalancerKey);Iterator<AbstractServerPredicate>i=fallbacks.iterator();while(!(result.size()>=minimalFilteredServers&&result.size()>(int)(servers.size()*minimalFilteredPercentage))&&i.hasNext()){AbstractServerPredicatepredicate=i.next();result=predicate.getEligibleServers(servers,loadBalancerKey);}returnresult;}}
123456789101112131415161718192021` | `publicclassCompositePredicateextendsAbstractServerPredicate{privateAbstractServerPredicatedelegate;privateList<AbstractServerPredicate>fallbacks=Lists.newArrayList();privateintminimalFilteredServers=1;privatefloatminimalFilteredPercentage=0;@OverridepublicList<Server>getEligibleServers(List<Server>servers,ObjectloadBalancerKey){List<Server>result=super.getEligibleServers(servers,loadBalancerKey);Iterator<AbstractServerPredicate>i=fallbacks.iterator();while(!(result.size()>=minimalFilteredServers&&result.size()>(int)(servers.size()*minimalFilteredPercentage))&&i.hasNext()){AbstractServerPredicatepredicate=i.next();result=predicate.getEligibleServers(servers,loadBalancerKey);}returnresult;}}

再来看看获取过滤结果的实现函数getEligibleServers中,它的处理逻辑如下:

  • 使用主过滤条件对所有实例过滤并返回过滤后的实例清单
  • 依次使用次过滤条件列表中的过滤条件对主过滤条件的结果进行过滤
  • 每次过滤之后(包括主过滤条件和次过滤条件),都需要判断下面两个条件,只要有一个符合就不再进行过滤,将当前结果返回供线性轮询算法选择:

    • 过滤后的实例总数 >= 最小过滤实例数(minimalFilteredServers,默认为1)
    • 过滤后的实例比例 > 最小过滤百分比(minimalFilteredPercentage,默认为0)

来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud源码分析(二)Ribbon

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏