【Spring Cloud】源码-Eureka客户端的服务注册、服务获取与服务续约

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

在看源码之前,先说一下标题中提到的三个概念:

  1. 服务注册:

服务提供者(eureka客户端)在启动后,如果参数eureka.client.register-with-eureka为true,那么会将自己注册到服务注册中心中,注册的动作会将自己的元数据发送给注册中心,注册中心将接受的元数据保存在一个注册列表中,该列表是一个双层Map结构,具体为:Map<服务名, Map<实例名,服务实例>>

  1. 服务续约:

成功注册的eureka服务(eureka客户端)会在注册之后维护一个心跳来告诉注册中心“我还活着”,这样注册中心就不会从注册列表中将这个服务实例剔除,关于这个心跳机制涉及到两个配置参数:

eureka.instance.lease-renewal-interval-in-seconds(默认30):心跳间隔时间

eureka.instance.lease-expiration-duration-in-seconds(默认90):定义服务失效时间

  1. 服务获取:

前两个概念是针对服务提供者,而服务获取是针对服务消费者(也属于eureka客户端),即调用服务方才需要获取服务列表以便选择调用哪一个服务实例。在服务消费者启动后,会向服务注册中心请求一份服务清单,该清单记录了已经注册到服务中心的服务实例。该请求动作不会仅限于启动的时候,因为消费者需要访问正确的、健康的服务实例,因此会定时发送请求。间隔时间通过配置参数:

eureka.instance.registry-fetch-interval-seconds(默认30)

那么接下来我们来简单看一下源码:

首先eureka客户端最重要的功能实现类就是org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

这个类是对Eureka发现法务的封装,而SpringCloudEureka本身就是对NetflixEureka的功能封装,因此,EurekaDiscoveryClient类会持有一个com.netflix.discovery.EurekaClient.EurekaClient对象引用(为组合关系,具体实现是NetflixEureka的DiscoveryClient类),而SpringCloudEureka本身有一个对发现服务的常用方法的抽象,这就是org.springframework.cloud.netflix.eureka.DiscoveryClient接口,EurekaDiscoveryClient实现了该接口(为继承关系),他们的关系大概如下图所示:

20191123100359\_1.png

其中左边的两个是SpringCloudEureka的,右面的两个是NetflixEureka的。

既然服务发现的方法主要在Netflix的DiscoveryClient类中,可以看一下这个类的注释,主要告诉我们DiscoveryClient的主要功能:Eureka客户端的注册、续约、取消租约(服务关闭)、获取服务。

因此首先研究的就是它,在构造器中,DiscoveryClient会调用一个initScheduledTasks()方法,从命名就可以看出这是一个初始化方法,那么我们可以从这个方法入手,源码如下:

    /**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {    // #1
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

            if (clientConfig.shouldRegisterWithEureka()) {    //#2
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);    // #3

                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(    // #4
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }

                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };

                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }

                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

在这段代码中,出现两个重要的if判断,首先看#1处的if判断:

if(clientConfig.shouldFetchRegistry()),顾名思义,获取配置中参数eureka.client.fetch-registery(是否获取服务列表)的值,如果是服务消费者,那么这个值就为true,进入判断代码块后,首先会从配置中获取一个参数:

registeryFetchIntervalSeconds:对应配置参数eureka.client.registry-fetch-interval-seconds,即从服务注册中心获取服务列表的间隔时间,默认是30。之后会执行定时任务CacheRefreshThread(),为什么叫CacheRefreshThread呢?因为客户端所持有的服务列表会缓存起来,到了一定时间(即上面的eureka.client.registry-fetch-interval-seconds)后会更新缓存并重新从注册中心获取新的服务列表,因此有一个“Cache”开头的方法。这个方法的代码如下:

    /**
         * The task that fetches the registry information at specified intervals.
         *
         */
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }

run()方法中执行refreshRegistry(),方法代码如下:

    @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }

                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }

                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes.toString());
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }        
        }

该段代码没研究…

接下来我们看#2处的if判断:

if(clientConfig.shouldRegisterWithEureka()),可以看出,此处判断参数eureka.client.register-with-eureka(是否将自己注册到服务注册中心,默认true),如果为true,那么就意味着需要做两件事:服务注册与服务续约。那么我们看代码进入#2的if判断之后,首先看#3处的代码,很明显这是一个定时任务,从renewalIntervalInSecs变量可以看出,这是心跳机制的定时任务,renewalIntervalInSecs变量则是从配置文件参数eureka.instance.lease-renewal-interval-in-seconds的值(默认为30),含义是心跳间隔时间,而HeartbeatThread()方法就是续约方法,

HeartbeatThread()方法代码如下:

    /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {

            public void run() {
                if (renew()) {    // #3.1
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }

#3.1调用方法renew():

    /**
         * Renew with the eureka service by making the appropriate REST call
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            }
        }

该方法就是发送Rest请求给注册中心,若返回404,则调用register()方法,这个方法就是将自己的元数据重新注册到服务中心,具体代码不贴上来了。

之后在#4处的代码会创建一个InstanceInfoReplicator对象,这个对象也会做一个定时任务,具体run()方法代码如下:

    public void run() {
            try {
                discoveryClient.refreshInstanceInfo();

                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();    // #4.1
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

很明显,#4.1处的代码行,register()方法正是服务注册方法,其实代码如下:

    /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);    // #4.1.1
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }

#4.1.1处的代码就是具体的注册动作,该方法会发送一个Rest请求给服务注册中心,同时传递一个instanceInfo对象,之前我们说过,注册动作会将客户端自己的元数据传递给服务注册中心,那么这个instanceInfo对象就是客户端的元数据。

至此,#3、#4两处代码分别是 服务的注册与续约功能实现。由于服务注册与续约均需要参数eureka.client.register-with-eureka为true,因此两个功能写入一个if判断中,而服务发现需要参数eureka.client.fetch-registery为true,因此单独在一个if判断中。


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 【Spring Cloud】源码-Eureka客户端的服务注册、服务获取与服务续约

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏