Spring Cloud Eureka Source Code Analysis

Preface

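Before looking at the source code, let us review what we built in the earlier article "Service Governance: A Detailed Introductory Example of Spring Cloud Eureka". Of the three main elements (the service registry, the service provider, and the service consumer), the latter two (that is, the Eureka clients) actively initiate most of the communication in the overall mechanism, while the registry mainly receives and handles requests. We can therefore use the Eureka client as the entry point and see how it carries out these communication behaviors.
When we register an ordinary Spring Boot application with Eureka Server, or fetch the service list from Eureka Server, we mainly do two things: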

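  1. Add the @EnableDiscoveryClient annotation to the application's main class
  2. Use the eureka.client.serviceUrl.defaultZone parameter in application.properties to specify the location of the service registry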

1. The @EnableDiscoveryClient annotation

Let us now look at the source of @EnableDiscoveryClient:

    /**
     * Annotation to enable a DiscoveryClient implementation.
     * @author Spencer Gibb
     */
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {

        /**
         * If true, the ServiceRegistry will automatically register the local server.
         */
        boolean autoRegister() default true;
    }

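From the comment we can see that its main purpose is to enable a DiscoveryClient implementation. Next, let us look at the classes and interfaces related to DiscoveryClient.
[Figure: class diagram of the classes and interfaces related to DiscoveryClient]
On the left, org.springframework.cloud.client.discovery.DiscoveryClient is a Spring Cloud interface. It defines the common abstract methods for discovering services, and programming against it effectively hides the details of the service governance implementation. org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient implements that interface and wraps Eureka's service discovery. The interfaces and classes on the right all come from the com.netflix.discovery package. EurekaDiscoveryClient depends on Netflix Eureka's com.netflix.discovery.EurekaClient interface, and EurekaClient extends the LookupService interface. Both belong to Netflix's open-source package and mainly define the abstract methods for Eureka service discovery, while the class that actually implements service discovery is com.netflix.discovery.DiscoveryClient in the Netflix package.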

1.1 The com.netflix.discovery.DiscoveryClient class

The header comment of the com.netflix.discovery.DiscoveryClient class tells us the following:

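    This class is used to interact with Eureka Server.
    Eureka Client is responsible for the following tasks:

     - Registering the service instance with Eureka Server
     - Renewing the lease with Eureka Server
     - Cancelling the lease with Eureka Server during shutdown
     - Querying the list of service instances registered with Eureka Server

     Eureka Client also needs to be configured with a list of Eureka Server URLs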

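Before looking at the tasks Eureka Client carries out, let us first see where the list of Eureka Server URLs is configured. Starting from the property name we configured, eureka.client.serviceUrl.defaultZone, a search for serviceUrl leads to the methods that load this property. In the SR5 release, however, they are all marked @Deprecated and point via @link to the replacement class com.netflix.discovery.endpoint.EndpointUtils, so we look for the method in that class: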

    /**
         * Get the list of all eureka service urls from properties file for the eureka client to talk to.
         * @param clientConfig the clientConfig to use
         * @param instanceZone The zone in which the client resides
         * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise
         * @return The list of all eureka service urls for the eureka client to talk to
         */
        public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
            List<String> orderedUrls = new ArrayList<String>();
            // key part
            String region = getRegion(clientConfig);
            String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());

            if (availZones == null || availZones.length == 0) {
                availZones = new String[1];
                availZones[0] = DEFAULT_ZONE;
            }
            logger.debug("The availability zone for the given region {} are {}", region, availZones);

            // key part
            int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
            List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);

            if (serviceUrls != null) {
                orderedUrls.addAll(serviceUrls);
            }
            int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1);
            while (currentOffset != myZoneOffset) {
                serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]);
                if (serviceUrls != null) {
                    orderedUrls.addAll(serviceUrls);
                }
                if (currentOffset == (availZones.length - 1)) {
                    currentOffset = 0;
                } else {
                    currentOffset++;
                }
            }

            if (orderedUrls.size() < 1) {
                throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!");
            }
            return orderedUrls;
        }
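
The while loop in getServiceUrlsFromConfig walks the zones in a ring, starting at the client's own zone. The following is a minimal sketch of just that ordering, written against plain arrays rather than EurekaClientConfig; the class name ZoneOrderSketch, the method orderedZones, and the zone names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only; ZoneOrderSketch and orderedZones are hypothetical names.
final class ZoneOrderSketch {

    // Returns the zones in the order they would be visited: the zone at
    // myZoneOffset first, then the following zones, wrapping around at the end.
    static List<String> orderedZones(String[] availZones, int myZoneOffset) {
        List<String> ordered = new ArrayList<String>();
        for (int i = 0; i < availZones.length; i++) {
            ordered.add(availZones[(myZoneOffset + i) % availZones.length]);
        }
        return ordered;
    }

    public static void main(String[] args) {
        String[] zones = {"zone-a", "zone-b", "zone-c"};
        // The client sits in zone-b (offset 1), so zone-b is visited first.
        System.out.println(orderedZones(zones, 1)); // [zone-b, zone-c, zone-a]
    }
}
```

The service URLs of each zone would then be appended in this order, which is why the client's own zone ends up at the front of the returned list.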

1.2 Region and Zone

In the function above, the client loads two things in turn: first the Region, then the Zone.

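  • The getRegion function reads a Region from the configuration and returns it, so a microservice application can belong to only one Region. If none is configured, getRegion falls back to DEFAULT_REGION (default). To set it ourselves, we can define it via the eureka.client.region property.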
    /**
         * Get the region that this particular instance is in.
         * @return - The region in which the particular instance belongs to.
         */
        public static String getRegion(EurekaClientConfig clientConfig) {
            String region = clientConfig.getRegion();
            if (region == null) {
                region = DEFAULT_REGION;
            }
            region = region.trim().toLowerCase();
            return region;
        }
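  • The getAvailabilityZones function shows that when no Zone is configured for the Region, defaultZone is used; this is where the eureka.client.serviceUrl.defaultZone parameter we configured earlier comes from. To assign Zones to an application, set the eureka.client.availability-zones property. From what getAvailabilityZones returns, we can see that several Zones can be configured, separated by commas, so we can conclude that Region and Zone have a one-to-many relationship.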
    @Override
        public String[] getAvailabilityZones(String region) {
            String value = this.availabilityZones.get(region);
            if (value == null) {
                value = DEFAULT_ZONE;
            }
            return value.split(",");
        }
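
To make the one-to-many relationship between Region and Zone concrete, here is a small stand-alone sketch that mirrors the lookup above with a plain map. AvailabilityZonesSketch, setZones, zonesFor, and the region and zone names are hypothetical; only the fallback to defaultZone and the comma split are taken from the method shown.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only; the class and method names are hypothetical.
final class AvailabilityZonesSketch {
    static final String DEFAULT_ZONE = "defaultZone";

    // One Region maps to one comma-separated string of Zones.
    private final Map<String, String> availabilityZones = new HashMap<String, String>();

    void setZones(String region, String commaSeparatedZones) {
        availabilityZones.put(region, commaSeparatedZones);
    }

    // Falls back to the single default Zone when the Region has no entry.
    String[] zonesFor(String region) {
        String value = availabilityZones.get(region);
        if (value == null) {
            value = DEFAULT_ZONE;
        }
        return value.split(",");
    }

    public static void main(String[] args) {
        AvailabilityZonesSketch config = new AvailabilityZonesSketch();
        config.setZones("region-1", "zone-a,zone-b");
        System.out.println(config.zonesFor("region-1").length); // 2
        System.out.println(config.zonesFor("region-2")[0]);     // defaultZone
    }
}
```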

1.3 serviceUrls

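Once the Region and Zone information has been obtained, the actual Eureka Server addresses are loaded. Based on the arguments passed in, an algorithm decides which Zone's configured serviceUrls to load first: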

     int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones);
     List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]);

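For how the serviceUrls are actually obtained, we can look at EurekaClientConfigBean, the class that implements getEurekaServerServiceUrls. It implements the EurekaClientConfig and EurekaConstants interfaces and loads the contents of the configuration file.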

    @Override
        public List<String> getEurekaServerServiceUrls(String myZone) {
            String serviceUrls = this.serviceUrl.get(myZone);
            if (serviceUrls == null || serviceUrls.isEmpty()) {
                serviceUrls = this.serviceUrl.get(DEFAULT_ZONE);
            }
            if (!StringUtils.isEmpty(serviceUrls)) {
                final String[] serviceUrlsSplit = StringUtils.commaDelimitedListToStringArray(serviceUrls);
                List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length);
                for (String eurekaServiceUrl : serviceUrlsSplit) {
                    if (!endsWithSlash(eurekaServiceUrl)) {
                        eurekaServiceUrl += "/";
                    }
                    eurekaServiceUrls.add(eurekaServiceUrl);
                }
                return eurekaServiceUrls;
            }

            return new ArrayList<>();
        }
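
The same lookup can be tried out without Spring on the classpath. The sketch below is an assumption-laden stand-in: ServiceUrlSketch and urlsFor are hypothetical names, the localhost URLs are made-up sample values, and String.split replaces StringUtils.commaDelimitedListToStringArray (with an added trim, which the original does not do).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; not the Spring Cloud implementation.
final class ServiceUrlSketch {
    static final String DEFAULT_ZONE = "defaultZone";

    // Looks up the URLs configured for a Zone, falls back to defaultZone,
    // and makes sure every URL ends with a slash.
    static List<String> urlsFor(Map<String, String> serviceUrl, String zone) {
        String raw = serviceUrl.get(zone);
        if (raw == null || raw.isEmpty()) {
            raw = serviceUrl.get(DEFAULT_ZONE);
        }
        List<String> result = new ArrayList<String>();
        if (raw == null || raw.isEmpty()) {
            return result;
        }
        for (String url : raw.split(",")) {
            String trimmed = url.trim(); // assumption: tolerate spaces after commas
            if (trimmed.isEmpty()) {
                continue;
            }
            result.add(trimmed.endsWith("/") ? trimmed : trimmed + "/");
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> serviceUrl = new HashMap<String, String>();
        serviceUrl.put(DEFAULT_ZONE, "http://localhost:8761/eureka,http://localhost:8762/eureka/");
        // "zone-x" has no entry, so the defaultZone URLs are returned.
        System.out.println(urlsFor(serviceUrl, "zone-x"));
    }
}
```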

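When we use Ribbon for service invocation in a microservice application, the Zone setting enables zone affinity during load balancing: Ribbon's default strategy prefers server instances in the same Zone as the client, and only turns to instances in other Zones when no server instance is available in the same Zone.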
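
The zone-affinity idea can be illustrated with a deliberately simplified filter. This is not Ribbon's code: ZoneAffinitySketch, candidates, and the host and zone names are hypothetical, and Ribbon's actual rules take more factors into account than the plain "same Zone first, otherwise everything" choice shown here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only; this is not Ribbon's code, and all names are hypothetical.
final class ZoneAffinitySketch {

    // hostToZone maps each server instance to the Zone it runs in.
    // Returns the instances in the client's Zone, or every instance
    // when the client's Zone has none.
    static List<String> candidates(Map<String, String> hostToZone, String clientZone) {
        List<String> sameZone = new ArrayList<String>();
        for (Map.Entry<String, String> entry : hostToZone.entrySet()) {
            if (entry.getValue().equals(clientZone)) {
                sameZone.add(entry.getKey());
            }
        }
        if (sameZone.isEmpty()) {
            return new ArrayList<String>(hostToZone.keySet());
        }
        return sameZone;
    }

    public static void main(String[] args) {
        Map<String, String> servers = new LinkedHashMap<String, String>();
        servers.put("host-1", "zone-a");
        servers.put("host-2", "zone-b");
        System.out.println(candidates(servers, "zone-b")); // [host-2]
        System.out.println(candidates(servers, "zone-c")); // [host-1, host-2]
    }
}
```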

2. Service Registration

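Now that we understand how the information for multiple service registries is loaded, let us go back and see how the DiscoveryClient class implements "service registration", starting with its constructor.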

2.1 The DiscoveryClient constructor

    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
            if (args != null) {
                this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
                this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
                this.eventListeners.addAll(args.getEventListeners());
                this.preRegistrationHandler = args.preRegistrationHandler;
            } else {
                this.healthCheckCallbackProvider = null;
                this.healthCheckHandlerProvider = null;
                this.preRegistrationHandler = null;
            }

            this.applicationInfoManager = applicationInfoManager;
            InstanceInfo myInfo = applicationInfoManager.getInfo();

            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }

            this.backupRegistryProvider = backupRegistryProvider;

            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            localRegionApps.set(new Applications());

            fetchRegistryGeneration = new AtomicLong(0);

            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

            if (config.shouldFetchRegistry()) {
                this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }

            if (config.shouldRegisterWithEureka()) {
                this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
            } else {
                this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
            }

            logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

            if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
                logger.info("Client configured to neither register nor query for data.");
                scheduler = null;
                heartbeatExecutor = null;
                cacheRefreshExecutor = null;
                eurekaTransport = null;
                instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

                // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
                // to work with DI'd DiscoveryClient
                DiscoveryManager.getInstance().setDiscoveryClient(this);
                DiscoveryManager.getInstance().setEurekaClientConfig(config);

                initTimestampMs = System.currentTimeMillis();
                logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                        initTimestampMs, this.getApplications().size());

                return;  // no need to setup up an network tasks and we are done
            }

            try {
                // default size of 2 - 1 each for heartbeat and cacheRefresh
                scheduler = Executors.newScheduledThreadPool(2,
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-%d")
                                .setDaemon(true)
                                .build());

                heartbeatExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff

                cacheRefreshExecutor = new ThreadPoolExecutor(
                        1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                        new SynchronousQueue<Runnable>(),
                        new ThreadFactoryBuilder()
                                .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                                .setDaemon(true)
                                .build()
                );  // use direct handoff

                eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);

                AzToRegionMapper azToRegionMapper;
                if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                    azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
                } else {
                    azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
                }
                if (null != remoteRegionsToFetch.get()) {
                    azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
                }
                instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            } catch (Throwable e) {
                throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
            }

            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }

            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }

            if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                try {
                    if (!register() ) {
                        throw new IllegalStateException("Registration error at startup. Invalid server response.");
                    }
                } catch (Throwable th) {
                    logger.error("Registration error at startup: {}", th.getMessage());
                    throw new IllegalStateException(th);
                }
            }
            // key part
            // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
            initScheduledTasks();

            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient
            DiscoveryManager.getInstance().setDiscoveryClient(this);
            DiscoveryManager.getInstance().setEurekaClientConfig(config);

            initTimestampMs = System.currentTimeMillis();
            logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                    initTimestampMs, this.getApplications().size());
        }

As the source above shows, after a series of property checks the constructor finally calls **initScheduledTasks()**. Here is the implementation of that method:

    private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }

                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };

                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }

                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

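In the function above there is a check related to service registration, if (clientConfig.shouldRegisterWithEureka()). Inside that branch an InstanceInfoReplicator instance is created, which runs a scheduled task. What the task actually does can be seen in the class's run() function: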

    public void run() {
            try {
                discoveryClient.refreshInstanceInfo();

                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }
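
The run() method registers only when the instance info is marked dirty, and clears the flag using the timestamp it read earlier. Below is a minimal sketch of that dirty-flag handshake, assuming that the flag must stay set when a newer change arrived while the registration was in flight. DirtyFlagSketch and its members are hypothetical names, and a counter stands in for discoveryClient.register().

```java
// Illustrative sketch only; DirtyFlagSketch is a hypothetical stand-in for
// the dirty-flag bookkeeping used around InstanceInfo.
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;
    int registerCalls; // counts how often the simulated register() ran

    // Marks the instance info as changed at the given time.
    synchronized void markDirty(long timestamp) {
        dirty = true;
        lastDirtyTimestamp = timestamp;
    }

    // Returns the timestamp of the last change, or null when nothing changed.
    synchronized Long dirtyTimestamp() {
        return dirty ? Long.valueOf(lastDirtyTimestamp) : null;
    }

    // Clears the flag only if no newer change happened in the meantime.
    synchronized void clearIfNotNewer(long timestamp) {
        if (lastDirtyTimestamp <= timestamp) {
            dirty = false;
        }
    }

    // One replication pass: re-register only when the info is dirty.
    void replicateOnce() {
        Long timestamp = dirtyTimestamp();
        if (timestamp != null) {
            registerCalls++; // stands in for discoveryClient.register()
            clearIfNotNewer(timestamp);
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch replicator = new DirtyFlagSketch();
        replicator.markDirty(100L);
        replicator.replicateOnce(); // registers once
        replicator.replicateOnce(); // nothing to do, flag already cleared
        System.out.println(replicator.registerCalls); // 1
    }
}
```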

You have probably spotted the **discoveryClient.register()** line: this is where registration is actually triggered. Let us continue with the implementation of register():

    /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }

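Registration with the eureka service is performed through a REST request. The registration request carries a com.netflix.appinfo.InstanceInfo object, which is the service metadata the client hands to the server at registration time.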

3. Service Fetching and Lease Renewal

Continuing with DiscoveryClient's initScheduledTasks function, it is easy to see that it contains two more scheduled tasks: "service fetch" and "lease renewal".

    if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

    if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);
                        ......
        }

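The code above shows that the "service fetch" task is more independent than the "lease renewal" and "service registration" tasks. "Lease renewal" and "service registration" sit in the same if block: after a service registers with Eureka Server, it needs a heartbeat to renew its lease so that it is not evicted, which is why the two appear together.
The "service fetch" logic lives in its own if block, controlled by the eureka.client.fetch-registry=true parameter, which defaults to true. To keep the client's service list up to date, so that the client reaches healthy service instances, the "service fetch" request is not limited to startup; it is a scheduled task. In the source, the registryFetchIntervalSeconds parameter used by the task corresponds to the eureka.client.registry-fetch-interval-seconds=30 configuration parameter, whose default is 30 seconds.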
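
Both scheduled tasks pass an interval and an expBackOffBound to TimedSupervisorTask. A plausible reading of those two arguments, sketched here under stated assumptions rather than copied from the library, is that the delay doubles after each timed-out run, is capped at interval times expBackOffBound, and resets after a successful run. BackoffSketch, onTimeout, onSuccess, and the bound of 10 are illustrative choices.

```java
// Illustrative sketch only; a simplified model of a capped exponential
// backoff, not the TimedSupervisorTask source.
final class BackoffSketch {
    private final long baseDelayMillis;
    private final long maxDelayMillis;
    private long delayMillis;

    BackoffSketch(long intervalSeconds, int expBackOffBound) {
        this.baseDelayMillis = intervalSeconds * 1000L;
        this.maxDelayMillis = baseDelayMillis * expBackOffBound;
        this.delayMillis = baseDelayMillis;
    }

    // A run timed out: double the delay, but never exceed the cap.
    long onTimeout() {
        delayMillis = Math.min(maxDelayMillis, delayMillis * 2);
        return delayMillis;
    }

    // A run succeeded: go back to the configured interval.
    long onSuccess() {
        delayMillis = baseDelayMillis;
        return delayMillis;
    }

    public static void main(String[] args) {
        BackoffSketch backoff = new BackoffSketch(30, 10);
        for (int i = 0; i < 4; i++) {
            System.out.println(backoff.onTimeout()); // 60000, 120000, 240000, 300000
        }
        System.out.println(backoff.onSuccess());     // 30000
    }
}
```

With a 30-second interval and a bound of 10, the delay in this model never grows beyond 300 seconds, however many consecutive runs time out.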

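The "lease renewal" implementation is fairly simple: it renews directly through a REST request. Following the **new HeartbeatThread()** argument into the class, we find the following implementation: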

    /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {

            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }

        /**
         * Renew with the eureka service by making the appropriate REST call
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                    long timestamp = instanceInfo.setIsDirtyWithTime();
                    boolean success = register();
                    if (success) {
                        instanceInfo.unsetIsDirty(timestamp);
                    }
                    return success;
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
                return false;
            }
        }
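
The decision logic of renew() can be isolated from the HTTP transport. In the sketch below the heartbeat and the registration are passed in as functions so the three outcomes can be exercised directly; RenewSketch and its parameter names are hypothetical, and only the handling of status codes 200 and 404 is taken from the method shown above.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Illustrative sketch only; the transport is replaced by two suppliers.
final class RenewSketch {

    // sendHeartbeat returns the HTTP status code of the heartbeat call.
    // register returns whether a fresh registration succeeded.
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        int status = sendHeartbeat.getAsInt();
        if (status == 404) {
            // The server no longer knows this instance: register again.
            return register.getAsBoolean();
        }
        return status == 200;
    }

    public static void main(String[] args) {
        System.out.println(renew(() -> 200, () -> false)); // true: lease renewed
        System.out.println(renew(() -> 404, () -> true));  // true: re-registered
        System.out.println(renew(() -> 500, () -> true));  // false: renewal failed
    }
}
```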

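"Service fetch" is somewhat more complex: it issues different REST requests and handles them differently depending on whether this is the first fetch. Again we start from the last argument of TimedSupervisorTask, **new CacheRefreshThread()**: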

    /**
         * The task that fetches the registry information at specified intervals.
         *
         */
        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }

        @VisibleForTesting
        void refreshRegistry() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }
                // key part
                boolean success = fetchRegistry(remoteRegionsModified);

                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }

                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes);
                }
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }        
        }

    /**
         * Fetches the registry information.
         * This method tries to get only deltas after the first fetch unless there is an issue
         * in reconciling eureka server and client registry information.
         */
        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

            try {
                // If the delta is disabled or if it is the first time, get all applications
                Applications applications = getApplications();

                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                    logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                    logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                    logger.info("Application is null : {}", (applications == null));
                    logger.info("Registered Applications size is zero : {}",
                            (applications.getRegisteredApplications().size() == 0));
                    logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                    // first fetch, or delta cannot be used: fetch the full registry
                    getAndStoreFullRegistry();
                } else {
                    // later fetches: fetch only the delta
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } catch (Throwable e) {
                logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
                return false;
            } finally {
                if (tracer != null) {
                    tracer.stop();
                }
            }

            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();

            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();

            // registry was fetched successfully, so return true
            return true;
        }
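
The long condition in fetchRegistry boils down to one question: can the client safely ask for a delta, or does it need the full registry? The following sketch restates that condition as a pure function. FetchModeSketch and needsFullFetch are hypothetical names, and the locally cached registry is reduced to an app count (null when no cache exists) and a version number.

```java
// Illustrative sketch only; the cached Applications object is reduced
// to a nullable app count and a version number.
final class FetchModeSketch {

    static boolean needsFullFetch(boolean deltaDisabled,
                                  String singleVipAddress,
                                  boolean forceFull,
                                  Integer cachedAppCount,
                                  long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null   // nothing cached yet
                || cachedAppCount == 0      // cache is empty, e.g. first fetch
                || cachedVersion == -1;     // cache does not support delta
    }

    public static void main(String[] args) {
        // First fetch: the local cache is still empty.
        System.out.println(needsFullFetch(false, null, false, 0, 1L)); // true
        // Later fetch with a populated cache: a delta is enough.
        System.out.println(needsFullFetch(false, null, false, 3, 1L)); // false
    }
}
```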

    /**
         * Gets the full registry information from the eureka server and stores it locally.
         */
    private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();

            logger.info("Getting all instance registry info from the eureka server");

            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());

            if (apps == null) {
                logger.error("The application is null for some reason. Not storing this information");
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                localRegionApps.set(this.filterAndShuffle(apps));
                logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
            } else {
                logger.warn("Not updating applications as another thread is updating it already");
            }
        }

        private void getAndUpdateDelta(Applications applications) throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();

            Applications delta = null;
            EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                delta = httpResponse.getEntity();
            }

            if (delta == null) {
                logger.warn("The server does not allow the delta revision to be applied because it is not safe. Hence got the full registry.");
                // No usable delta was returned, so fall back to fetching the full registry over REST
                getAndStoreFullRegistry();
            } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
                logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
                String reconcileHashCode = "";
                if (fetchRegistryUpdateLock.tryLock()) {
                    try {
                        updateDelta(delta);
                        reconcileHashCode = getReconcileHashCode(applications);
                    } finally {
                        fetchRegistryUpdateLock.unlock();
                    }
                } else {
                    logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
                }
                // There is a diff in number of instances for some reason
                if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                    reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
                }
            } else {
                logger.warn("Not updating application delta as another thread is updating it already");
                logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
            }
        }
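
The two methods above rely on two ideas: a generation counter that lets a fetch discard its result when another thread has already updated the cache, and a status-count summary that is compared with the one sent by the server after a delta is applied. The following is a minimal sketch of both ideas under simplifying assumptions, not Eureka's implementation: `RegistryCacheSketch` and its methods are hypothetical names, instances are reduced to an `instanceId -> status` map, and the summary format (`DOWN_1_UP_2_`) is only modelled on the hash code strings seen in the log messages.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified model of a client-side registry cache; not Eureka's code.
class RegistryCacheSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final Map<String, String> instances = new HashMap<>(); // instanceId -> status

    // Snapshot of the generation, taken before a (slow) remote call starts.
    long beginFetch() {
        return generation.get();
    }

    // Replaces the cache unless another update advanced the generation in the meantime.
    synchronized boolean storeFull(long generationAtStart, Map<String, String> full) {
        if (!generation.compareAndSet(generationAtStart, generationAtStart + 1)) {
            return false; // stale result, discard it
        }
        instances.clear();
        instances.putAll(full);
        return true;
    }

    // Applies a delta (a null status means "deleted") and returns true when the
    // resulting summary matches the one reported by the server.
    synchronized boolean applyDelta(Map<String, String> delta, String serverHash) {
        for (Map.Entry<String, String> e : delta.entrySet()) {
            if (e.getValue() == null) {
                instances.remove(e.getKey());
            } else {
                instances.put(e.getKey(), e.getValue());
            }
        }
        return reconcileHash().equals(serverHash);
    }

    // Summary built from status counts in sorted order, e.g. "DOWN_1_UP_2_".
    synchronized String reconcileHash() {
        Map<String, Integer> counts = new TreeMap<>();
        for (String status : instances.values()) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder sb = new StringBuilder();
        counts.forEach((status, n) -> sb.append(status).append('_').append(n).append('_'));
        return sb.toString();
    }

    public static void main(String[] args) {
        RegistryCacheSketch cache = new RegistryCacheSketch();
        long slow = cache.beginFetch();
        long fast = cache.beginFetch();
        Map<String, String> full = new HashMap<>();
        full.put("a", "UP");
        full.put("b", "UP");
        System.out.println("fast fetch stored: " + cache.storeFull(fast, full));
        System.out.println("slow fetch stored: " + cache.storeFull(slow, new HashMap<>()));
        System.out.println("summary: " + cache.reconcileHash());
    }
}
```

In this model a `false` result from `applyDelta` corresponds to the case where the client would log the difference and fetch the full registry again.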

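4. Request handling in the service registry

The source analysis above shows that every interaction is initiated as a REST request. Next, let us look at how the service registry handles these requests. Eureka Server defines its handlers for the various REST requests in the com.netflix.eureka.resources package.
Take the "service registration" request as an example (the addInstance method of ApplicationResource):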

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
            // validate that the instanceinfo contains all the necessary required fields
            if (isBlank(info.getId())) {
                return Response.status(400).entity("Missing instanceId").build();
            } else if (isBlank(info.getHostName())) {
                return Response.status(400).entity("Missing hostname").build();
            } else if (isBlank(info.getIPAddr())) {
                return Response.status(400).entity("Missing ip address").build();
            } else if (isBlank(info.getAppName())) {
                return Response.status(400).entity("Missing appName").build();
            } else if (!appName.equals(info.getAppName())) {
                return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
            } else if (info.getDataCenterInfo() == null) {
                return Response.status(400).entity("Missing dataCenterInfo").build();
            } else if (info.getDataCenterInfo().getName() == null) {
                return Response.status(400).entity("Missing dataCenterInfo Name").build();
            }

            // handle cases where clients may be registering with bad DataCenterInfo with missing data
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
                if (isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    } else if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                        String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }

            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }

After this series of checks on the registration data, the handler calls the void **register(final InstanceInfo info, final boolean isReplication)** method of the org.springframework.cloud.netflix.eureka.server.InstanceRegistry object:

        @Override
        public void register(final InstanceInfo info, final boolean isReplication) {
            handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }

        private void handleRegistration(InstanceInfo info, int leaseDuration,
                boolean isReplication) {
            log("register " + info.getAppName() + ", vip " + info.getVIPAddress()
                    + ", leaseDuration " + leaseDuration + ", isReplication "
                    + isReplication);
            publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration,
                    isReplication));
        }

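The register method first calls publishEvent to broadcast the new registration event, and then calls the registration implementation in the parent class com.netflix.eureka.registry.AbstractInstanceRegistry, which stores the metadata carried by InstanceInfo in a ConcurrentHashMap. The registry keeps a two-level Map structure: the first-level key is the service name (the appName property of InstanceInfo), and the second-level key is the instance name (the instanceId property of InstanceInfo).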
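
The two-level structure can be pictured with a small sketch. This is an illustration under simplifying assumptions, not the AbstractInstanceRegistry code: `TwoLevelRegistrySketch` is a hypothetical class, and a plain String stands in for the instance information that the real registry stores.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a two-level registry map; not Eureka's code.
class TwoLevelRegistrySketch {
    // First level: service name (appName). Second level: instance name (instanceId).
    private final ConcurrentHashMap<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    // Adds or overwrites one instance under its service name.
    void register(String appName, String instanceId, String info) {
        registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>()).put(instanceId, info);
    }

    // Returns the stored description, or null if the instance is unknown.
    String lookup(String appName, String instanceId) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }

    // Number of registered instances for one service.
    int instanceCount(String appName) {
        Map<String, String> instances = registry.get(appName);
        return instances == null ? 0 : instances.size();
    }

    public static void main(String[] args) {
        TwoLevelRegistrySketch registry = new TwoLevelRegistrySketch();
        registry.register("HELLO-SERVICE", "host1:hello-service:8081", "http://host1:8081/");
        registry.register("HELLO-SERVICE", "host1:hello-service:8082", "http://host1:8082/");
        System.out.println(registry.instanceCount("HELLO-SERVICE"));
    }
}
```

Keying the outer map by service name keeps all instances of one service together, which is what a consumer needs when it asks for the instance list of a service.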

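5. Configuration in detail

Eureka's service governance system has two roles, server and client: the server is the service registry, and the clients are the microservice applications that expose interfaces.
Eureka client configuration falls into two main areas:

  • Service registration settings, including the address of the service registry, the interval for fetching services, availability zones, and so on.
  • Service instance settings, including the instance name, IP address, port, health check path, and so on.

The Eureka server, by contrast, is closer to an off-the-shelf product, and in most cases its configuration does not need to be changed. The server-side options are defined in org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean, and all of them use the eureka.server prefix. The client-side options are defined in org.springframework.cloud.netflix.eureka.EurekaClientConfigBean, which the rest of this section covers.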

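5.1 Service registration configuration

5.1.1 Specifying the registry

The example in the earlier article "Service Governance: Spring Cloud Eureka Getting-Started Example Explained" showed how to bring a Spring Boot application into Eureka's service governance system. Apart from adding the Eureka dependency, the only step is to specify the registry in the configuration file, which is done through the eureka.client.serviceUrl parameter. The parameter is defined as follows: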

        private Map<String, String> serviceUrl = new HashMap<>();
        {
            // DEFAULT_ZONE = "defaultZone", DEFAULT_URL = "http://localhost:8761/eureka/"
            this.serviceUrl.put(DEFAULT_ZONE, DEFAULT_URL);
        }

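The value is stored in a HashMap with one default entry, whose key is defaultZone and whose value is http://localhost:8761/eureka/
In our earlier example the service registries listened on ports 1110 and 1111, so in the service registration settings we set eureka.client.serviceUrl.defaultZone=http://localhost:1110/eureka/,http://localhost:1111/eureka
The parameter accepts a single registry or, for a highly available registry cluster, several registries separated by commas.
In addition, to secure the service registry, credentials can be included in the URL when configuring serviceUrl, for example
http://<username>:<password>@localhost:1110/eureka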
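
As an illustration of the comma-separated form, the sketch below turns a defaultZone value into a list of registry URLs. It is a minimal sketch, not the code Eureka uses to read the property: `ServiceUrlSketch` and `split` are hypothetical names, and trimming and skipping empty entries are assumptions made here for robustness.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that splits a comma-separated defaultZone value; not Eureka's code.
class ServiceUrlSketch {
    static List<String> split(String defaultZone) {
        List<String> urls = new ArrayList<>();
        if (defaultZone == null) {
            return urls;
        }
        for (String part : defaultZone.split(",")) {
            String url = part.trim();
            if (!url.isEmpty()) {
                urls.add(url); // keep each registry address in the configured order
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        String value = "http://localhost:1110/eureka/,http://localhost:1111/eureka";
        for (String url : split(value)) {
            System.out.println(url);
        }
    }
}
```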

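5.1.2 Other settings

Listed below are the commonly used parameters defined in org.springframework.cloud.netflix.eureka.EurekaClientConfigBean, with a description and default value for each. All of them use the eureka.client prefix.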

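| Parameter | Description | Default |
| --- | --- | --- |
| enabled | Enables the Eureka client | true |
| registryFetchIntervalSeconds | Interval, in seconds, for fetching registry information from the Eureka server | 30 |
| instanceInfoReplicationIntervalSeconds | Interval, in seconds, for replicating instance information changes to the Eureka server | 30 |
| initialInstanceInfoReplicationIntervalSeconds | Initial delay, in seconds, before instance information is first replicated to the Eureka server | 40 |
| eurekaServiceUrlPollIntervalSeconds | Interval, in seconds, for polling for changes to the Eureka server addresses. This parameter matters when Spring Cloud Config is used to refresh the Eureka serviceUrl dynamically | 300 |
| eurekaServerReadTimeoutSeconds | Timeout, in seconds, for reading from the Eureka server | 8 |
| eurekaServerConnectTimeoutSeconds | Timeout, in seconds, for connecting to the Eureka server | 5 |
| eurekaServerTotalConnections | Total number of connections from the Eureka client to all Eureka servers | 200 |
| eurekaServerTotalConnectionsPerHost | Total number of connections from the Eureka client to each Eureka server host | 50 |
| eurekaConnectionIdleTimeoutSeconds | Idle time, in seconds, after which connections to the Eureka server are closed | 30 |
| heartbeatExecutorThreadPoolSize | Initial number of threads in the heartbeat thread pool | 2 |
| heartbeatExecutorExponentialBackOffBound | Maximum multiplier for the retry delay after a heartbeat timeout | 10 |
| cacheRefreshExecutorThreadPoolSize | Initial number of threads in the cache refresh thread pool | 2 |
| cacheRefreshExecutorExponentialBackOffBound | Maximum multiplier for the retry delay of a cache refresh | 10 |
| useDnsForFetchingServiceUrls | Whether to use DNS to obtain the serviceUrl of the Eureka server | false |
| registerWithEureka | Whether to register this instance's information with the Eureka server | true |
| preferSameZoneEureka | Whether to prefer Eureka servers in the same zone | true |
| filterOnlyUpInstances | Whether to filter fetched instances so that only those in the UP state are kept | true |
| fetchRegistry | Whether to fetch registry information from the Eureka server | true |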
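
The two ExponentialBackOffBound parameters are easier to picture with numbers. The sketch below uses a simple model that is an assumption made for illustration, not something stated in this article: after a timed-out attempt the delay doubles, and it is capped at the base interval multiplied by the bound. `BackOffSketch` and `nextDelay` are hypothetical names.

```java
// Hypothetical model of a capped exponential back-off; not Eureka's code.
class BackOffSketch {
    // Assumed rule: double the current delay, but never exceed baseIntervalMs * bound.
    static long nextDelay(long currentDelayMs, long baseIntervalMs, int bound) {
        long maxDelayMs = baseIntervalMs * bound;
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L; // e.g. a 30 second interval
        long delay = base;
        for (int attempt = 1; attempt <= 5; attempt++) {
            delay = nextDelay(delay, base, 10);
            System.out.println("after timeout " + attempt + ": " + delay + " ms");
        }
    }
}
```

Under this model, a bound of 10 with a 30 second interval means the delay stops growing once it reaches 300 seconds.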

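5.2 Service instance configuration

The service instance settings can be found in the source of org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean; all of them use the eureka.instance prefix.

5.2.1 Metadata

Part of the configuration in org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean describes service instance metadata. Metadata is the object a Eureka client uses to describe its own service when it sends a registration request to the service registry. It contains standardized metadata that matters for service governance, such as the service name, instance name, instance IP, and instance port, as well as custom metadata used by load-balancing strategies or for other special purposes.
With Spring Cloud Eureka, all configuration is loaded through org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean, but when the service is actually registered it is still wrapped in a com.netflix.appinfo.InstanceInfo object and sent to the Eureka server.
In InstanceInfo, the metadata field holds the custom metadata, while the other member variables are the standardized metadata.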

        @XStreamAlias("metadata")
        private volatile Map<String, String> metadata;

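Standardized metadata can be configured directly in the form eureka.instance.<properties>=<value>, where <properties> is the name of a member variable of EurekaInstanceConfigBean. Custom metadata is configured in the form eureka.instance.metadataMap.<key>=<value>, for example: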

    eureka.instance.metadataMap.zone=hangzhou
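
To make the naming convention concrete, the sketch below collects every property under the eureka.instance.metadataMap. prefix into a map. In a real application Spring Boot performs this binding itself; `MetadataSketch` and `extract` are hypothetical names used only to show how the key after the prefix becomes the metadata key.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical illustration of the metadataMap naming convention; not Spring's binder.
class MetadataSketch {
    static final String PREFIX = "eureka.instance.metadataMap.";

    // Copies every property that starts with PREFIX, using the remainder as the key.
    static Map<String, String> extract(Properties props) {
        Map<String, String> metadata = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            if (name.startsWith(PREFIX)) {
                metadata.put(name.substring(PREFIX.length()), props.getProperty(name));
            }
        }
        return metadata;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("eureka.instance.metadataMap.zone", "hangzhou");
        props.setProperty("eureka.instance.preferIpAddress", "true");
        System.out.println(extract(props));
    }
}
```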

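5.2.2 Instance name

The instance name is the instanceId parameter of InstanceInfo, the unique identifier that distinguishes instances of the same service. Native Netflix Eureka uses the host name as the default instance name, which makes it impossible to start several instances of the same service on one host. Spring Cloud Eureka therefore extends the default naming to support multiple instances on the same host, using the following default rule: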

    ${spring.cloud.client.hostname}:${spring.application.name}:${spring.application.instance_id:${server.port}}
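
The rule reads as "host name, then application name, then the configured instance_id or, if none is set, the server port". A minimal sketch of that fallback in plain Java follows; `InstanceIdSketch` and `defaultInstanceId` are hypothetical names, and in a real application Spring resolves the placeholders itself.

```java
// Hypothetical rendering of the default instance-name rule; not Spring Cloud's code.
class InstanceIdSketch {
    // Uses instanceId when it is set, otherwise falls back to the server port.
    static String defaultInstanceId(String hostname, String appName, String instanceId, int serverPort) {
        String suffix = (instanceId == null || instanceId.isEmpty())
                ? String.valueOf(serverPort)
                : instanceId;
        return hostname + ":" + appName + ":" + suffix;
    }

    public static void main(String[] args) {
        System.out.println(defaultInstanceId("host1", "hello-service", null, 8081));
        System.out.println(defaultInstanceId("host1", "hello-service", "blue", 8081));
    }
}
```

Because the port is part of the name, two instances of the same service on one host get different instance names as long as they listen on different ports.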

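5.2.3 Endpoint configuration

InstanceInfo contains several URL settings, such as homePageUrl, statusPageUrl, and healthCheckUrl, which are the URLs of the application's home page, status page, and health check. In Spring Cloud Eureka, the status page and health check URLs default to the /info and /health endpoints provided by the spring-boot-actuator module.
In most cases these URLs do not need to be changed. Some setups do require it, though: when a context-path such as management.context-path is configured, every spring-boot-actuator monitoring endpoint gains a prefix, so the same prefix has to be added to the /info and /health paths: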

    management.context-path=/hello
    eureka.instance.statusPageUrlPath=${management.context-path}/info
    eureka.instance.healthCheckUrlPath=${management.context-path}/health

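The example above uses relative paths. By default the Eureka service registry accesses and exposes these endpoints over HTTP, so relative paths are not enough when the client application exposes its service and monitoring endpoints over HTTPS. In that case, use the parameters that take absolute URLs: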

    eureka.instance.statusPageUrl=https://${eureka.instance.hostname}/info
    eureka.instance.healthCheckUrl=https://${eureka.instance.hostname}/health
    eureka.instance.homePageUrl=https://${eureka.instance.hostname}/
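
The difference between the relative and absolute forms can be sketched as follows. This is an illustrative model under stated assumptions, not the code Spring Cloud uses: `EndpointUrlSketch` and `resolve` are hypothetical names, and the rule assumed here is that an explicitly configured absolute URL wins, while a relative path is appended to an http:// address built from host and port.

```java
// Hypothetical sketch of choosing between an absolute URL and a relative path.
class EndpointUrlSketch {
    static String resolve(String explicitUrl, String hostname, int port, String urlPath) {
        if (explicitUrl != null && !explicitUrl.isEmpty()) {
            return explicitUrl; // absolute form, e.g. an https:// address
        }
        return "http://" + hostname + ":" + port + urlPath; // relative form
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, "myhost", 8080, "/hello/info"));
        System.out.println(resolve("https://myhost/info", "myhost", 8080, "/info"));
    }
}
```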

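5.2.4 Health checks

By default, Eureka does not use the /health endpoint of spring-boot-actuator to check the health of service instances; it relies on client heartbeats to keep instances alive. Under Eureka's lease renewal and eviction mechanism, a client is in the UP (ready to receive traffic) state from the moment it registers, until its heartbeats have stopped for some time and the registry evicts it. The default heartbeat approach reliably detects whether the client process is running, but it cannot guarantee that the client application is actually able to serve requests.
In Spring Cloud Eureka, a small amount of configuration hands the client's health check over to the /health endpoint of spring-boot-actuator, giving a more complete view of health status. The steps are:

  • Add the spring-boot-starter-actuator dependency to pom.xml
  • Add eureka.client.healthcheck.enabled=true to application.properties
  • In application.properties, configure a health check endpoint that the service registry can reach correctly (see Endpoint configuration)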
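
The contrast between the two modes can be sketched in a few lines. This is a conceptual model only: `HealthStatusSketch`, its `Status` enum, and `statusToReport` are hypothetical names, and the application-level check is reduced to a boolean supplier.

```java
import java.util.function.BooleanSupplier;

// Hypothetical model of heartbeat-only versus health-aware status reporting.
class HealthStatusSketch {
    enum Status { UP, DOWN }

    // With the health check disabled, a running process always reports UP.
    // With it enabled, the reported status follows the application-level check.
    static Status statusToReport(BooleanSupplier applicationHealthy, boolean healthCheckEnabled) {
        if (!healthCheckEnabled) {
            return Status.UP;
        }
        return applicationHealthy.getAsBoolean() ? Status.UP : Status.DOWN;
    }

    public static void main(String[] args) {
        BooleanSupplier databaseUnavailable = () -> false; // the process runs but cannot serve requests
        System.out.println(statusToReport(databaseUnavailable, false));
        System.out.println(statusToReport(databaseUnavailable, true));
    }
}
```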

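5.2.5 Other settings

| Parameter | Description | Default |
| --- | --- | --- |
| preferIpAddress | Whether to prefer the IP address over the host name as the host identifier | false |
| leaseRenewalIntervalInSeconds | Interval, in seconds, at which the Eureka client sends heartbeats to the server | 30 |
| leaseExpirationDurationInSeconds | Maximum time, in seconds, that the Eureka server waits after the last heartbeat. Once it is exceeded, the server removes the instance from the service list so that no further requests are routed to it | 90 |
| nonSecurePortEnabled | Whether to enable the non-secure communication port | true |
| securePortEnabled | Whether to enable the secure communication port | |
| nonSecurePort | Non-secure communication port | 80 |
| securePort | Secure communication port | 443 |
| appname | Service name. Defaults to the value of spring.application.name, or unknown if that is not set | |
| hostname | Host name. When not configured, it is taken from the operating system's host name | |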
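
The relationship between leaseRenewalIntervalInSeconds and leaseExpirationDurationInSeconds can be sketched as a lease that is renewed by each heartbeat and expires when no heartbeat arrives within the configured duration. This is a simplified model with hypothetical names (`LeaseSketch`, `renew`, `isExpired`), not the server's lease implementation; times are passed in explicitly so the behaviour is easy to follow.

```java
// Hypothetical, simplified lease model; not Eureka's code.
class LeaseSketch {
    private final long durationMs;
    private long lastRenewalMs;

    LeaseSketch(int leaseExpirationDurationInSeconds, long registeredAtMs) {
        this.durationMs = leaseExpirationDurationInSeconds * 1000L;
        this.lastRenewalMs = registeredAtMs;
    }

    // Called for every heartbeat received from the client.
    void renew(long nowMs) {
        lastRenewalMs = nowMs;
    }

    // True once more than the lease duration has passed since the last heartbeat.
    boolean isExpired(long nowMs) {
        return nowMs > lastRenewalMs + durationMs;
    }

    public static void main(String[] args) {
        LeaseSketch lease = new LeaseSketch(90, 0L);
        lease.renew(30_000L);                         // heartbeat after 30 seconds
        System.out.println(lease.isExpired(100_000L)); // still within 90 seconds of the last heartbeat
        System.out.println(lease.isExpired(130_000L)); // more than 90 seconds without a heartbeat
    }
}
```

With the defaults of 30 and 90 seconds, an instance has to miss roughly three consecutive heartbeats before its lease runs out in this model.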

Reference: 《Spring Cloud微服务实战》 (Spring Cloud Microservices in Action), by 翟永超 (Zhai Yongchao)

Source: http://ddrv.cn/a/88268
