An In-Depth Look at Spring Cloud Source Code: Eureka


1. What Eureka does

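In Spring Cloud, Eureka acts as the service registry. It plays the role that ZooKeeper plays in a Dubbo + ZooKeeper setup, but it is much simpler. ZooKeeper can do far more: distributed locks and distributed queues, for example, are built on its four node types plus the watch mechanism over long-lived connections. Eureka works differently because it is built on HTTP REST. The server keeps service information in a ConcurrentHashMap. When services start, they register into this map and read it back, which links all the services together, and services then call each other over HTTP using that information. Eureka has two parts: the service providers, which are clients from Eureka's point of view, and the server. Each client collects its own instance information and registers it with the server. The server's job is to accept and store the information the clients provide about themselves.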

2. Eureka client source code analysis

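When reading Spring's source, we usually start from the spring.handlers file in the META-INF folder of a Spring jar. From there we jump to the corresponding XXXHandler class and follow how the various tags in Spring XML are parsed. When reading Spring Cloud source, we instead start from spring.factories under META-INF and analyze the classes listed there.
Following that approach, we first open the spring.factories file in the eureka-client (1.4.0) jar and find the EurekaClientAutoConfiguration configuration class. A Eureka client has four essential jobs: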

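  • 2.1 Read this application's IP, instance_id and port, and register them with the server
  • 2.2 Deregister the service (take it offline)
  • 2.3 Heartbeat
  • 2.4 Fetch information about other services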

2.1 Service registration

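With this in mind, we first look for the place where application.properties is read. The eurekaInstanceConfigBean() method binds the configuration into an EurekaInstanceConfigBean object, and @Bean registers that object in the IoC container. EurekaInstanceConfigBean holds the client's IP, instance_id, port and similar information. The following code wraps EurekaInstanceConfigBean: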

    @Bean
    public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) { // the eurekaInstanceConfigBean mentioned above implements EurekaInstanceConfig
        InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
        return new ApplicationInfoManager(config, instanceInfo);
    }

Next comes registration itself, starting from the EurekaClient bean:

            @Bean(destroyMethod = "shutdown")
            public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config, EurekaInstanceConfig instance) {
                manager.getInfo(); // force initialization
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }

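CloudEurekaClient extends DiscoveryClient, so we go straight into the super constructor. Before initScheduledTasks() it creates several thread pools. initScheduledTasks() then starts, among other tasks, the heartbeat task: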

    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                return register();
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
            return false;
        }
    }

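In renew(), a 404 response triggers register(). The heartbeat interval is configurable, and you can find where it is read in the code that sets up the scheduled task. Following the source further, register() has one more caller besides renew(): the run() method of InstanceInfoReplicator. This task is scheduled when the client starts. By default its first run happens 40 seconds after startup, and after that it runs periodically. Its job is to call register() whenever the instance information has changed. Each run calls refreshInstanceInfo() and registers only if isInstanceInfoDirty is true. The instance is marked dirty at startup, so the initial registration also goes through this path. After that, this path no longer calls register() except in special cases: a change of IP/hostname, or a change of certain lease configuration parameters. The following code shows both cases: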

    public void refreshDataCenterInfoIfRequired() {
        String existingAddress = instanceInfo.getHostName();

        String newAddress;
        if (config instanceof RefreshableInstanceConfig) {
            // Refresh data center info, and return up to date address
            newAddress = ((RefreshableInstanceConfig) config).resolveDefaultAddress(true);
        } else {
            newAddress = config.getHostName(true);
        }
        String newIp = config.getIpAddress();

        if (newAddress != null && !newAddress.equals(existingAddress)) {
            logger.warn("The address changed from : {} => {}", existingAddress, newAddress);

            // :( in the legacy code here the builder is acting as a mutator.
            // This is hard to fix as this same instanceInfo instance is referenced elsewhere.
            // We will most likely re-write the client at sometime so not fixing for now.
            InstanceInfo.Builder builder = new InstanceInfo.Builder(instanceInfo);
            builder.setHostName(newAddress).setIPAddr(newIp).setDataCenterInfo(config.getDataCenterInfo());
            instanceInfo.setIsDirty(); // sets isInstanceInfoDirty to true and lastDirtyTimestamp to the current time
        }
    }

    public void refreshLeaseInfoIfRequired() {
        LeaseInfo leaseInfo = instanceInfo.getLeaseInfo();
        if (leaseInfo == null) {
            return;
        }
        int currentLeaseDuration = config.getLeaseExpirationDurationInSeconds();
        int currentLeaseRenewal = config.getLeaseRenewalIntervalInSeconds();
        if (leaseInfo.getDurationInSecs() != currentLeaseDuration || leaseInfo.getRenewalIntervalInSecs() != currentLeaseRenewal) { // the lease configuration has changed
            LeaseInfo newLeaseInfo = LeaseInfo.Builder.newBuilder()
                    .setRenewalIntervalInSecs(currentLeaseRenewal)
                    .setDurationInSecs(currentLeaseDuration)
                    .build();
            instanceInfo.setLeaseInfo(newLeaseInfo);
            instanceInfo.setIsDirty();
        }
    }
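The dirty-flag handshake between setIsDirty() and the replicator can be modelled in isolation. The following is a simplified sketch, not Eureka's code. It assumes a hypothetical `DirtyInstance` class in place of InstanceInfo and a `tick()` method (also hypothetical) standing in for one InstanceInfoReplicator run. The flag carries a timestamp, and the replicator clears it only if no newer change arrived while it was registering.

```java
import java.util.concurrent.atomic.AtomicInteger;

class DirtyFlagSketch {
    // Hypothetical simplified InstanceInfo: only the dirty flag and its timestamp.
    static class DirtyInstance {
        private boolean dirty = false;
        private long lastDirtyTimestamp = 0L;

        synchronized void setIsDirty(long now) {
            dirty = true;
            lastDirtyTimestamp = now;
        }

        // Returns the time of the last change, or null when nothing changed.
        synchronized Long isDirtyWithTime() {
            return dirty ? lastDirtyTimestamp : null;
        }

        // Clear the flag only if no newer change happened after `timestamp`.
        synchronized void unsetIsDirty(long timestamp) {
            if (lastDirtyTimestamp <= timestamp) {
                dirty = false;
            }
        }
    }

    final DirtyInstance instance = new DirtyInstance();
    final AtomicInteger registrations = new AtomicInteger();

    // One (hypothetical) replicator run: register only when the instance info is dirty.
    void tick() {
        Long dirtyAt = instance.isDirtyWithTime();
        if (dirtyAt != null) {
            registrations.incrementAndGet(); // stands in for discoveryClient.register()
            instance.unsetIsDirty(dirtyAt);
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch replicator = new DirtyFlagSketch();
        replicator.instance.setIsDirty(1L); // marked dirty at startup for the initial registration
        replicator.tick();                  // registers
        replicator.tick();                  // nothing changed, no registration
        replicator.instance.setIsDirty(2L); // e.g. the IP address or lease config changed
        replicator.tick();                  // registers again
        System.out.println(replicator.registrations.get()); // prints 2
    }
}
```

Keeping the timestamp, rather than a plain boolean, means a change that lands during a slow register() call is not lost.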

That covers client registration.

2.2 Service deregistration

The EurekaClient interface declares a shutdown() method. Its implementation is annotated with @PreDestroy, so it runs when the servlet container shuts down:

    @PreDestroy
    public synchronized void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            logger.info("Shutting down DiscoveryClient ...");

            if (statusChangeListener != null && applicationInfoManager != null) {
                applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
            }

            cancelScheduledTasks(); // stop the heartbeat, instance-info replicator, cache-refresh and other scheduled tasks

            // If APPINFO was registered
            if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka()) {
                applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); // set the status to DOWN
                unregister(); // tell the server this client is going offline
            }

            logger.info("Completed shut down of DiscoveryClient");
        }
    }
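The `isShutdown.compareAndSet(false, true)` guard makes shutdown idempotent: however many times and from however many threads it is called, the deregistration runs once. Below is a minimal sketch of just that guard. The class and its counters are hypothetical, and `unregisterCalls` stands in for the real unregister() HTTP call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class ShutdownSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    final AtomicInteger unregisterCalls = new AtomicInteger(); // hypothetical counter for the demo
    volatile String status = "UP";

    // Only the first caller wins the compareAndSet; every later call is a no-op.
    void shutdown() {
        if (isShutdown.compareAndSet(false, true)) {
            // cancelScheduledTasks() would stop heartbeat / cache refresh here
            status = "DOWN";                   // like setInstanceStatus(InstanceStatus.DOWN)
            unregisterCalls.incrementAndGet(); // stands in for unregister()
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ShutdownSketch client = new ShutdownSketch();
        Thread t1 = new Thread(client::shutdown);
        Thread t2 = new Thread(client::shutdown);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        client.shutdown();
        System.out.println(client.status + " " + client.unregisterCalls.get()); // prints "DOWN 1"
    }
}
```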

2.3 Heartbeat

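While analyzing registration in 2.1 we already met renew(). A 404 response leads to (re-)registration, while a 200 response means the heartbeat succeeded. By default a heartbeat is sent every 30 seconds.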
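Here is a minimal, self-contained sketch of that heartbeat loop. It assumes a hypothetical in-memory `FakeServer` instead of a real HTTP call, and it shrinks the 30-second period to 100 ms so the demo finishes quickly. It mirrors only the shape of renew(): a 404 means the server does not know the instance, so the client registers again.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class HeartbeatSketch {
    // Hypothetical stand-in for the Eureka server: answers 404 until the instance is registered.
    static class FakeServer {
        boolean registered = false;
        final AtomicInteger registerCalls = new AtomicInteger();

        int sendHeartbeat() {
            return registered ? 200 : 404;
        }

        boolean register() {
            registerCalls.incrementAndGet();
            registered = true;
            return true;
        }
    }

    // Same decision as DiscoveryClient#renew(): 404 -> register(), otherwise succeed only on 200.
    static boolean renew(FakeServer server) {
        int status = server.sendHeartbeat();
        if (status == 404) {
            return server.register();
        }
        return status == 200;
    }

    public static void main(String[] args) throws InterruptedException {
        FakeServer server = new FakeServer();
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        // Eureka's default renewal interval is 30 s; 100 ms keeps the demo short.
        scheduler.scheduleAtFixedRate(() -> renew(server), 0, 100, TimeUnit.MILLISECONDS);
        Thread.sleep(350);
        scheduler.shutdown();
        System.out.println("register calls: " + server.registerCalls.get());
    }
}
```

A single-threaded scheduler is used so that heartbeats never overlap, which keeps the fake server's state simple.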

2.4 Fetching service information

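When a Eureka client starts, it registers with the Eureka server, and other clients need to learn about it so they can read its information. Service information is also obtained through a scheduled task. initScheduledTasks() starts a CacheRefreshThread task, which runs every 30 seconds by default. Its core is fetchRegistry(boolean forceFullRegistryFetch), which supports two modes: a full fetch and an incremental (delta) fetch. The full fetch is implemented in getAndStoreFullRegistry():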

    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) // REST call to the server for all instances
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            // store the result in DiscoveryClient's localRegionApps (an AtomicReference)
            localRegionApps.set(this.filterAndShuffle(apps));
        } else {
            // another thread updated the registry in the meantime: discard this result
        }
    }
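The fetchRegistryGeneration check stops a slow fetch from overwriting a result that a newer fetch has already published. The following sketch isolates that pattern. It assumes a plain `List<String>` of instance ids in place of `Applications`, and a hypothetical `storeFullRegistry` method that receives the generation read before the remote call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

class RegistryCacheSketch {
    final AtomicLong fetchRegistryGeneration = new AtomicLong(0);
    final AtomicReference<List<String>> localRegionApps = new AtomicReference<>(List.of());

    // generationAtStart is read before the (slow) remote call; publish only if nobody updated meanwhile.
    boolean storeFullRegistry(long generationAtStart, List<String> fetched) {
        if (fetched == null) {
            return false; // like "The application is null for some reason"
        }
        if (fetchRegistryGeneration.compareAndSet(generationAtStart, generationAtStart + 1)) {
            localRegionApps.set(fetched);
            return true;
        }
        return false; // another fetch won the race; drop this stale result
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = new RegistryCacheSketch();
        long genA = cache.fetchRegistryGeneration.get(); // fetch A starts
        long genB = cache.fetchRegistryGeneration.get(); // fetch B starts concurrently
        System.out.println(cache.storeFullRegistry(genB, List.of("order-service:8081"))); // true
        System.out.println(cache.storeFullRegistry(genA, List.of("stale-result")));       // false
        System.out.println(cache.localRegionApps.get()); // [order-service:8081]
    }
}
```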

3. Eureka server source code analysis

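When analyzing the client, we saw that it registers and fetches information through HTTP REST requests, so the Eureka server must be structured much like a Spring MVC application. In EurekaServerAutoConfiguration, the jerseyApplication() method puts a Jersey Application object into the container. What jerseyApplication() does resembles Spring's scanning for @Component. It scans for classes annotated with @Path and @Provider, wraps them as BeanDefinitions, and collects them into the Application's set of classes. A filter then intercepts the URLs and maps each request to the corresponding resource class, which plays the role of a controller: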

    @Bean
    public FilterRegistrationBean jerseyFilterRegistration(
            javax.ws.rs.core.Application eurekaJerseyApp) {
        FilterRegistrationBean bean = new FilterRegistrationBean(); // the core is a servlet filter
        bean.setFilter(new ServletContainer(eurekaJerseyApp));
        bean.setOrder(Ordered.LOWEST_PRECEDENCE);
        bean.setUrlPatterns(
                Collections.singletonList(EurekaConstants.DEFAULT_PREFIX + "/*")); // intercept every request starting with /eureka

        return bean;
    }
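Conceptually, the filter maps `/eureka/*` requests to resource methods. The toy dispatcher below illustrates that idea using only the standard library. It is hypothetical and is not Jersey's API. It covers the register/renew/cancel operations listed on the Eureka REST operations wiki page, using the `/eureka` prefix of Spring Cloud instead of `/eureka/v2`.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class EurekaRouteSketch {
    private static final String PREFIX = "/eureka";
    private static final Pattern APP = Pattern.compile(PREFIX + "/apps/([^/]+)");
    private static final Pattern INSTANCE = Pattern.compile(PREFIX + "/apps/([^/]+)/([^/]+)");

    // Returns a description of which resource method would handle the request (toy routing only).
    static String route(String method, String path) {
        if (!path.startsWith(PREFIX + "/")) {
            return "not handled by the Eureka filter"; // outside the /eureka/* URL pattern
        }
        Matcher app = APP.matcher(path);
        if (app.matches() && method.equals("POST")) {
            return "register " + app.group(1); // ApplicationResource#addInstance
        }
        Matcher inst = INSTANCE.matcher(path);
        if (inst.matches()) {
            switch (method) {
                case "PUT":
                    return "renew " + inst.group(1) + "/" + inst.group(2);  // InstanceResource#renewLease
                case "DELETE":
                    return "cancel " + inst.group(1) + "/" + inst.group(2); // InstanceResource#cancelLease
                default:
                    break;
            }
        }
        return "unsupported";
    }

    public static void main(String[] args) {
        System.out.println(route("POST", "/eureka/apps/ORDER-SERVICE"));
        System.out.println(route("PUT", "/eureka/apps/ORDER-SERVICE/host1:order-service:8081"));
        System.out.println(route("GET", "/actuator/health"));
    }
}
```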

That is a first look at Jersey. From the client analysis, we can infer that the server must provide the following features:

  • Receiving requests (via Jersey)
  • Handling client register / heartbeat / deregister requests
  • Evicting expired instances

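The following features serve the high availability of the Eureka server itself:

  • Self-preservation
  • Synchronizing information between server nodes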

3.1 How the server receives requests

Jersey and the way Eureka integrates it were covered above, so they are not repeated here.

3.2 Handling client register / heartbeat / deregister requests

Handling client registration

The Eureka wiki (https://github.com/Netflix/eureka/wiki/Eureka-REST-operations) tells us that registration calls POST /eureka/v2/apps/appID. ApplicationsResource delegates to ApplicationResource#addInstance(), which leads us to register():

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; // default lease duration: 90 seconds
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication); // register the instance locally (shown below)
        // replicate to the other server nodes
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

    // Thread-safe registry: the outer key is the application name from the Spring Cloud project,
    // the inner map is keyed by instanceId with a Lease as the value
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

    public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        read.lock();
        try {
            // appName is derived from spring.application.name in the Spring Cloud configuration
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                // the first instance of an application puts an empty inner map into the registry
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // this id is the instanceId

            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            // store the new lease
            gMap.put(registrant.getId(), lease);

        } finally {
            read.unlock();
        }
    }
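The two-level registry and the Lease wrapper fit in a few lines. The sketch below is simplified. It assumes `String` payloads instead of InstanceInfo and passes the current time explicitly (a hypothetical `now` parameter) so that expiry is testable. It uses the straightforward rule "expired when now > lastUpdate + duration", with the 90-second default duration, and `computeIfAbsent` replaces the get/putIfAbsent sequence of the excerpt.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class RegistrySketch {
    static final long DEFAULT_DURATION_MS = 90_000; // Lease.DEFAULT_DURATION_IN_SECS = 90

    // Simplified Lease: the holder plus the time of its last renewal.
    static class Lease<T> {
        final T holder;
        final long durationMs;
        volatile long lastUpdateTimestamp;

        Lease(T holder, long durationMs, long now) {
            this.holder = holder;
            this.durationMs = durationMs;
            this.lastUpdateTimestamp = now;
        }

        void renew(long now) {
            lastUpdateTimestamp = now;
        }

        boolean isExpired(long now) {
            return now > lastUpdateTimestamp + durationMs;
        }
    }

    // appName -> (instanceId -> lease), the same shape as AbstractInstanceRegistry#registry
    final ConcurrentHashMap<String, Map<String, Lease<String>>> registry = new ConcurrentHashMap<>();

    void register(String appName, String instanceId, long now) {
        // computeIfAbsent atomically creates the inner map for the first instance of an app
        Map<String, Lease<String>> gMap = registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        gMap.put(instanceId, new Lease<>(instanceId, DEFAULT_DURATION_MS, now));
    }

    boolean renew(String appName, String instanceId, long now) {
        Map<String, Lease<String>> gMap = registry.get(appName);
        Lease<String> lease = gMap == null ? null : gMap.get(instanceId);
        if (lease == null) {
            return false; // unknown instance: the client gets a 404 and re-registers
        }
        lease.renew(now);
        return true;
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register("ORDER-SERVICE", "host1:order-service:8081", 0);
        r.renew("ORDER-SERVICE", "host1:order-service:8081", 30_000);
        Lease<String> lease = r.registry.get("ORDER-SERVICE").get("host1:order-service:8081");
        System.out.println(lease.isExpired(100_000)); // false: 100 s <= 30 s + 90 s
        System.out.println(lease.isExpired(121_000)); // true: 121 s > 120 s
    }
}
```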

Handling client renewals (heartbeats)

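The endpoint is InstanceResource#renewLease(). A renewal essentially maintains the instance's status: it refreshes the last-update timestamp and then replicates the renewal to the other server nodes. Here is renew():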

    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // all leases of this application
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id); // the lease of this instance
        }
        if (leaseToRenew == null) { // unknown instance: the client gets a 404 and re-registers
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                ...
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    Object[] args = {
                            instanceInfo.getStatus().name(),
                            instanceInfo.getOverriddenStatus().name(),
                            instanceInfo.getId()
                    };
                    instanceInfo.setStatus(overriddenInstanceStatus); // update the instance status
                }
            }
            renewsLastMin.increment();
            leaseToRenew.renew(); // refresh the last-update timestamp
            return true;
        }
    }
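`renewsLastMin` is a per-minute rate counter. The current minute's renewals accumulate in one bucket, and readers see the total of the last completed minute. The sketch below models that two-bucket idea. The class is hypothetical, and `rollOver()` is a hypothetical stand-in for the periodic timer that closes each one-minute window, so the sketch can be tested without waiting.

```java
import java.util.concurrent.atomic.AtomicLong;

class MeasuredRateSketch {
    private final AtomicLong lastBucket = new AtomicLong(0);    // total of the completed minute
    private final AtomicLong currentBucket = new AtomicLong(0); // minute in progress

    // Called once per renew() request.
    void increment() {
        currentBucket.incrementAndGet();
    }

    // Called once per sample interval (one minute here): publish and reset the current bucket.
    void rollOver() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // What getNumOfRenewsInLastMin() would report.
    long getCount() {
        return lastBucket.get();
    }

    public static void main(String[] args) {
        MeasuredRateSketch renewsLastMin = new MeasuredRateSketch();
        for (int i = 0; i < 18; i++) {
            renewsLastMin.increment();
        }
        System.out.println(renewsLastMin.getCount()); // 0: the first minute is not finished yet
        renewsLastMin.rollOver();
        System.out.println(renewsLastMin.getCount()); // 18
    }
}
```

Reporting the completed minute rather than the running count gives the self-preservation check a stable number that does not drop to zero at the start of each minute.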

Handling client deregistration requests

The deregistration endpoint is InstanceResource#cancelLease(). The core logic is in internalCancel():

    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        read.lock();
        try {
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); // all leases of this application
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id); // remove the deregistered instance from the map
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
                return true;
            }
        } finally {
            read.unlock();
        }
    }

3.3 Evicting expired instances

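When a client has not sent a request to the server for a long time (90 seconds by default), the server considers it down. Anyone who has read the Spring source knows that its most important method is AbstractApplicationContext#refresh(). It covers everything from scanning XML/Java configuration and annotations to dependency injection and bean creation in the IoC container. Near the end it calls finishRefresh(), which runs after Spring has finished all its other work. The call chain continues through DefaultLifecycleProcessor#onRefresh() → startBeans(true) → start() → doStart(this.lifecycleBeans, member.name, this.autoStartupOnly) → start(). This invokes start() on the Lifecycle beans; because autoStartupOnly is true, only SmartLifecycle beans whose isAutoStartup() returns true are started here. EurekaServerAutoConfiguration imports EurekaServerInitializerConfiguration, which implements SmartLifecycle, a sub-interface of Lifecycle. Its start() method initializes a separate EurekaServerContext.
initEurekaServerContext() executes registry.openForTraffic(applicationInfoManager, registryCount). The last statement of that method calls AbstractInstanceRegistry#postInit(), which starts a timer that runs EvictionTask, and thus evict(), every 60 seconds.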

    public void evict(long additionalLeaseMs) {
        if (!isLeaseExpirationEnabled()) { // self-preservation has kicked in: evict nothing (self-preservation is enabled by default)
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease); // expired: collect it in expiredLeases
                    }
                }
            }
        }
        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;

        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false); // remove from the local registry and invalidate the response cache
            }
        }
    }
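The eviction cap is easiest to see with numbers. Suppose 10 instances are registered and renewalPercentThreshold has its default of 0.85. Then registrySizeThreshold = (int)(10 × 0.85) = 8 and evictionLimit = 10 − 8 = 2, so at most 2 expired instances are removed per run, even if more have expired. The sketch below reproduces that arithmetic and the random partial shuffle. It simplifies leases to plain `String` ids, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

class EvictionSketch {
    // Same arithmetic as AbstractInstanceRegistry#evict.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold);
        return registrySize - registrySizeThreshold;
    }

    // Choose min(expired, limit) random entries with a partial Fisher-Yates (Knuth) shuffle.
    static List<String> pickVictims(List<String> expired, int limit, Random random) {
        List<String> copy = new ArrayList<>(expired);
        int toEvict = Math.max(0, Math.min(copy.size(), limit));
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(copy.size() - i);
            Collections.swap(copy, i, next);
        }
        return copy.subList(0, toEvict);
    }

    public static void main(String[] args) {
        int limit = evictionLimit(10, 0.85);
        List<String> expired = List.of("a", "b", "c", "d", "e");
        List<String> victims = pickVictims(expired, limit, new Random(42));
        System.out.println(limit + " " + victims.size()); // prints "2 2"
    }
}
```

Picking victims at random rather than in map order spreads evictions across applications, so one application is not wiped out first.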

3.4 Self-preservation mode

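When clients stop sending renewals (heartbeats), the server by default runs eviction once a minute. The evict() method in 3.3 begins with a call to isLeaseExpirationEnabled():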

    /** Expected (maximum) renewals per minute. Formula: number of registered instances x 2 (one renewal every 30 s) */
    protected volatile int expectedNumberOfRenewsPerMin;
    /** Expected minimum renewals per minute. Formula: expectedNumberOfRenewsPerMin * renewal percentage (eureka.renewalPercentThreshold) */
    protected volatile int numberOfRenewsPerMinThreshold;

    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) { // self-preservation is on by default; setting it to false turns it off
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
        // Expiration is allowed only while the renewals received in the last minute exceed the expected minimum;
        // otherwise the server enters self-preservation and evicts nothing
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
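A worked example of the threshold follows, assuming the default 30-second renewal interval (2 renewals per instance per minute) and renewalPercentThreshold = 0.85. With 10 instances, expectedNumberOfRenewsPerMin = 20 and numberOfRenewsPerMinThreshold = (int)(20 × 0.85) = 17. Suppose a network problem means only 12 renewals arrived in the last minute. Then 12 > 17 is false, lease expiration is disabled, and evict() returns early. The sketch below encodes the same check. The method names are hypothetical, and the per-minute factor is written as 60 / renewalIntervalSecs rather than a hard-coded 2.

```java
class SelfPreservationSketch {
    // Each instance renews every renewalIntervalSecs seconds.
    static int expectedRenewsPerMin(int instances, int renewalIntervalSecs) {
        return (int) (instances * (60.0 / renewalIntervalSecs));
    }

    static int renewsPerMinThreshold(int instances, int renewalIntervalSecs, double renewalPercentThreshold) {
        return (int) (expectedRenewsPerMin(instances, renewalIntervalSecs) * renewalPercentThreshold);
    }

    // Same decision as isLeaseExpirationEnabled(): eviction is allowed only above the threshold.
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled, int threshold, long renewsLastMin) {
        if (!selfPreservationEnabled) {
            return true;
        }
        return threshold > 0 && renewsLastMin > threshold;
    }

    public static void main(String[] args) {
        int threshold = renewsPerMinThreshold(10, 30, 0.85);
        System.out.println(threshold);                                      // 17
        System.out.println(isLeaseExpirationEnabled(true, threshold, 19));  // true: eviction runs
        System.out.println(isLeaseExpirationEnabled(true, threshold, 12));  // false: self-preservation
        System.out.println(isLeaseExpirationEnabled(false, threshold, 12)); // true: protection switched off
    }
}
```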

3.5 Synchronization between server nodes

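For registration, renewal, deregistration and status updates, the server first updates its local state and then calls PeerAwareInstanceRegistryImpl#replicateToPeers() to synchronize the change to the other server nodes.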

     private void replicateToPeers(Action action, String appName, String id,
                                      InstanceInfo info /* optional */,
                                      InstanceStatus newStatus /* optional */, boolean isReplication) {
            Stopwatch tracer = action.getTimer().start();
            try {
                if (isReplication) { // count replicated requests received from peers
                    numberOfReplicationsLastMin.increment();
                }
                // If it is a replication already, do not replicate again as this will create a poison replication
                if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                    return;
                }

                for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                    // If the url represents this host, do not replicate to yourself.
                    if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                        continue;
                    }
                    replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            } finally {
                tracer.stop();
            }
        }
     /** * Replicates all instance changes to peer eureka nodes except for * replication traffic to this node. * */
        private void replicateInstanceActionsToPeers(Action action, String appName,
                                                     String id, InstanceInfo info, InstanceStatus newStatus,
                                                     PeerEurekaNode node) {
            try {
                InstanceInfo infoFromRegistry = null;
                CurrentRequestVersion.set(Version.V2);
                switch (action) {
                    case Cancel:
                        node.cancel(appName, id);
                        break;
                    case Heartbeat:
                        InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                        break;
                    case Register:
                        node.register(info);
                        break;
                    case StatusUpdate:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                        break;
                    case DeleteStatusOverride:
                        infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                        node.deleteStatusOverride(appName, id, infoFromRegistry);
                        break;
                }
            } catch (Throwable t) {
                logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
            }
        }

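The Eureka second-hop propagation problem: suppose server A lists B as its peer, and B lists C. A client registers with server A. Afterwards A and B have the client's information, but C does not.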
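The reason lies in replicateToPeers(). When the client registers with A, A stores the instance and replicates it to B. That replicated request arrives at B with isReplication = true, so B stores the instance but returns early from replicateToPeers() and does not forward it to C.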
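The effect can be reproduced with a toy model of replicateToPeers(). The sketch assumes hypothetical in-memory `Node` objects, each of which knows only the peers in its own configuration. The only rule borrowed from Eureka is the guard that a replicated request is never forwarded again.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class ReplicationSketch {
    // Hypothetical in-memory Eureka server node.
    static class Node {
        final String name;
        final List<Node> peers = new ArrayList<>();   // peers listed in this node's own configuration
        final Set<String> registry = new HashSet<>();

        Node(String name) {
            this.name = name;
        }

        void register(String instanceId, boolean isReplication) {
            registry.add(instanceId);
            replicateToPeers(instanceId, isReplication);
        }

        // Same guard as PeerAwareInstanceRegistryImpl#replicateToPeers: never re-forward a replica.
        private void replicateToPeers(String instanceId, boolean isReplication) {
            if (peers.isEmpty() || isReplication) {
                return;
            }
            for (Node peer : peers) {
                if (peer != this) {                  // like isThisMyUrl(): do not replicate to yourself
                    peer.register(instanceId, true); // replicated requests carry isReplication = true
                }
            }
        }
    }

    public static void main(String[] args) {
        Node a = new Node("A");
        Node b = new Node("B");
        Node c = new Node("C");
        a.peers.add(b); // A -> B
        b.peers.add(c); // B -> C
        a.register("order-service:8081", false); // the client registers with A
        System.out.println(a.registry + " " + b.registry + " " + c.registry);
    }
}
```

Running it leaves C's registry empty. If A also listed C as a peer, C would receive the instance directly from A. This is why every server is usually configured with all of the other peers.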


Source: http://ddrv.cn/a/88268

Copyright belongs to the original author. For reproduction in any form, contact the blogger (daming_90): Java 技术驿站 » An In-Depth Look at Spring Cloud Source Code: Eureka
