Spring Cloud 源码分析(一)—— 服务治理

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】

前言

Spring Cloud Eureka是Spring Cloud Netflix微服务套件中的一部分,主要负责实现微服务架构中的服务治理功能,包括服务注册和服务发现。本文从源码的角度,分析Eureka的服务治理机制。

Eureka服务治理机制

在Eureka服务治理架构中有三种角色,分别是:

  • 服务注册中心
    Eureka提供的服务端,提供服务的注册和发现功能
  • 服务提供者
    提供服务的应用,需要将自己提供的服务注册到Eureka,以供其他应用发现
  • 服务消费者
    消费者应用,从服务注册中心获取服务列表,从而知道从何处调用需要的服务

DiscoveryClient类

一般我们将Spring boot应用注册到Eureka Server或者需要从Eureka Server中获取服务列表时,需要做如下两件事情:

  • 在应用主类中配置@EnableDiscoveryClient注解
  • 在application.properties中用eureka.client.serviceUrl.defaultZone参数指定服务注册中心地址

先从@EnableDiscoveryClient注解开始,看一下源码:
20191017100285\_1.png

从图中红框标注的注释可以得知,此注解用来开启DiscoveryClient的实例。搜索DiscoveryClient,可以发现一个类和一个接口,先来看一下类图:
20191017100285\_2.png

从DiscoveryClient(com.netflix.discovery.DiscoveryClient)的类注释中,我们可以知道,该类负责向Eureka Server注册服务实例、向Eureka Server服务租约、当服务关闭时,向Eureka Server取消租约、查询Eureka Server中的服务实例列表。

服务注册

查看DiscoveryClient的构造方法,可以看到调用了如下方法:

        private void initScheduledTasks() {
            ……

            if (clientConfig.shouldRegisterWithEureka()) {
                ……
                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

               ……

                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

可以看到,如果当前应用需要被注册到Eureka中,则创建一个InstanceInfoReplicator类的实例,它会执行一个定时任务,

        public void start(int initialDelayMs) {
            if (started.compareAndSet(false, true)) {
                instanceInfo.setIsDirty();  // for initial register
                Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

该任务的具体执行内容可以查看该类的run方法,

        public void run() {
            try {
                discoveryClient.refreshInstanceInfo();

                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }

看到这里,我们应该能猜出来,注册操作其实就是通过REST请求的方式进行的,传入的参数为InstanceInfo对象,内部保存的就是关于服务的元数据。

服务获取和服务续约

刚才的initScheduledTasks方法中还有两个定时任务,分别是“服务获取”和“服务续约”,

        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // InstanceInfo replicator
                ……
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

initScheduledTasks方法会启动HeartbeatThread线程定时完成服务续约,默认续约的时间间隔为30秒,默认的服务信息保留时间为90秒,可以通过application.properties中启动参数设置。

HeartbeatThread线程通过调用renew方法,直接以REST请求的方式实现服务续约。

        private class HeartbeatThread implements Runnable {

            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }

        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            }
        }

再来看服务获取逻辑,initScheduledTasks方法会启动CacheRefreshThread线程定时完成服务获取,默认的时间间隔为30秒,也可以通过application.properties中启动参数设置。

        class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
        }

        @VisibleForTesting
        void refreshRegistry() {
            try {
                ……
                boolean success = fetchRegistry(remoteRegionsModified);
                ……
            } catch (Throwable e) {
                logger.error("Cannot fetch registry from server", e);
            }        
        }

        private boolean fetchRegistry(boolean forceFullRegistryFetch) {
            Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

            try {
                // If the delta is disabled or if it is the first time, get all
                // applications
                Applications applications = getApplications();

                if (clientConfig.shouldDisableDelta()
                        || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                        || forceFullRegistryFetch
                        || (applications == null)
                        || (applications.getRegisteredApplications().size() == 0)
                        || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
                {
                    ……
                    getAndStoreFullRegistry();
                } else {
                    getAndUpdateDelta(applications);
                }
                applications.setAppsHashCode(applications.getReconcileHashCode());
                logTotalInstances();
            } 
            ……

            // Notify about cache refresh before updating the instance remote status
            onCacheRefreshed();

            // Update remote status based on refreshed data held in the cache
            updateInstanceRemoteStatus();

            // registry was fetched successfully, so return true
            return true;
        }

        private void getAndStoreFullRegistry() throws Throwable {
            long currentUpdateGeneration = fetchRegistryGeneration.get();

            Applications apps = null;
            EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                    ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                    : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
            if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
                apps = httpResponse.getEntity();
            }
            logger.info("The response status is {}", httpResponse.getStatusCode());

            ……
        }

可以看到服务获取逻辑相对复杂一点,会根据是否是第一次获取发起不同的REST请求和相应的处理,处理逻辑又分全量获取和差量获取。

服务注册中心处理

通过上述分析,我们得知所有的交互都通过REST请求来发起的,那服务注册中心又是如何处理这些请求的呢?Eureka Server对于各类REST请求的定义都位于com.netflix.eureka.resources包中,以处理服务注册为例,

        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            ……
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }

        public void register(InstanceInfo info, boolean isReplication) {
            this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
            super.register(info, isReplication);
        }

        private void handleRegistration(InstanceInfo info, int leaseDuration, boolean isReplication) {
            ……
            this.publishEvent(new EurekaInstanceRegisteredEvent(this, info, leaseDuration, isReplication));
        }

服务注册的过程大致如下,先调用publishEvent方法,将该新服务注册的事件传播出去,再调用父类方法注册实现,将InstanceInfo中的元数据保存在一个ConcurrentHashMap对象中。注册中心存储了两层Map结构,第一层key为存储的服务名称,value为InstanceInfo中的appName属性,第二层key为实例名称,value为InstanceInfo中的instanceId属性。

总结

本文从源码的角度介绍了Spring Cloud Eureka的服务治理机制,但真正的服务治理机制绝非如此简单,本文关注了其最重要的几个方面,希望能对读者有所帮助。


来源:[]()

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring Cloud 源码分析(一)—— 服务治理

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏