spring cloud eureka client源码解析

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

前言

eureka是springcloud 常用的注册中心,这里简单介绍下,eureka client注册逻辑的实现。eureka-client是1.6.2版本。

eureka server api参考 https://blog.csdn.net/qq_30062125/article/details/83829357

这里,eureka client分析主要分三个步骤:

  • eureka client 自动配置类EurekaClientAutoConfiguration是怎么找到的
  • eureka客户端逻辑是怎么触发的
  • eureka client 如何发起注册

1、 自动配置类是怎么找到的

spring boot项目启动类一般会加一个注解@SpringBootApplication,这个注解包含了@EnableAutoConfiguration。@EnableAutoConfiguration是springboot各种自动配置插件的入口。

EnableAutoConfiguration 代码清单

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @AutoConfigurationPackage
    @Import(EnableAutoConfigurationImportSelector.class)
    public @interface EnableAutoConfiguration {

        String ENABLED_OVERRIDE_PROPERTY = "spring.boot.enableautoconfiguration";

        /**
         * Exclude specific auto-configuration classes such that they will never be applied.
         * @return the classes to exclude
         */
        Class<?>[] exclude() default {};

        /**
         * Exclude specific auto-configuration class names such that they will never be
         * applied.
         * @return the class names to exclude
         * @since 1.3.0
         */
        String[] excludeName() default {};

    }

我们可以看到@Import(EnableAutoConfigurationImportSelector.class),进入它的父类AutoConfigurationImportSelector,可以看到引入bean逻辑:

    @Override
        public String[] selectImports(AnnotationMetadata annotationMetadata) {
            if (!isEnabled(annotationMetadata)) {
                return NO_IMPORTS;
            }
            try {
                AutoConfigurationMetadata autoConfigurationMetadata = AutoConfigurationMetadataLoader
                        .loadMetadata(this.beanClassLoader);
                AnnotationAttributes attributes = getAttributes(annotationMetadata);
                // 获取需要加载到spring容器中的bean列表
                List<String> configurations = getCandidateConfigurations(annotationMetadata,
                        attributes);
                configurations = removeDuplicates(configurations);
                configurations = sort(configurations, autoConfigurationMetadata);
                Set<String> exclusions = getExclusions(annotationMetadata, attributes);
                checkExcludedClasses(configurations, exclusions);
                configurations.removeAll(exclusions);
                configurations = filter(configurations, autoConfigurationMetadata);
                fireAutoConfigurationImportEvents(configurations, exclusions);
                return configurations.toArray(new String[configurations.size()]);
            }
            catch (IOException ex) {
                throw new IllegalStateException(ex);
            }
        }

getCandidateConfigurations 逻辑如下:

    protected List<String> getCandidateConfigurations(AnnotationMetadata metadata,
                AnnotationAttributes attributes) {
            List<String> configurations = SpringFactoriesLoader.loadFactoryNames(
                    getSpringFactoriesLoaderFactoryClass(), getBeanClassLoader());
            Assert.notEmpty(configurations,
                    "No auto configuration classes found in META-INF/spring.factories. If you "
                            + "are using a custom packaging, make sure that file is correct.");
            return configurations;
        }

这里会加载所有的META-INF/spring.factories文件,并且会循环找出来org.springframework.boot.autoconfigure.EnableAutoConfiguration对应的自动注册类,返回回去。
我们这里关心的是Eureka client相关的,路径在spring-cloud-netflix-eureka-client/1.3.6.RELEASE/spring-cloud-netflix-eureka-client-1.3.6.RELEASE.jar!/META-INF/spring.factories,其中内容为:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
    org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
    org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration

    org.springframework.cloud.bootstrap.BootstrapConfiguration=\
    org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

    org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

这里加载了org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,这是一个比较重要的类,eureka client的很多逻辑都是从这入口的。

2、 eureka客户端逻辑是怎么触发的

开启EurekaClient需要配置@EnableDiscoveryClient或者@EnableEurekaClient,当前版本,这两个没啥区别。

上面我们分析了如何加载EurekaClientAutoConfiguration,不过如果细心看一下,我们会发现EurekaClientAutoConfiguration依旧需要一些条件才会加载。

    @Configuration
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
            CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
    @AutoConfigureAfter(name = "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration")
    public class EurekaClientAutoConfiguration {
    ......
    }

其中关键的一点是 @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
我们来看下这个bean是怎么加载的。
这得从@EnableDiscoveryClient开始说起:

EnableDiscoveryClient代码:

    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @Documented
    @Inherited
    @Import(EnableDiscoveryClientImportSelector.class)
    public @interface EnableDiscoveryClient {

        /**
         * If true, the ServiceRegistry will automatically register the local server.
         */
        boolean autoRegister() default true;
    }

原理和上面类似,也是通过@Import注入bean。
我们看下EnableDiscoveryClientImportSelector的父类方法 org.springframework.cloud.commons.util.SpringFactoryImportSelector#selectImports:

    @Override
        public String[] selectImports(AnnotationMetadata metadata) {
            if (!isEnabled()) {
                return new String[0];
            }
            AnnotationAttributes attributes = AnnotationAttributes.fromMap(
                    metadata.getAnnotationAttributes(this.annotationClass.getName(), true));

            Assert.notNull(attributes, "No " + getSimpleName() + " attributes found. Is "
                    + metadata.getClassName() + " annotated with @" + getSimpleName() + "?");

            // 获取需要加载的bean列表
            // Find all possible auto configuration classes, filtering duplicates
            List<String> factories = new ArrayList<>(new LinkedHashSet<>(SpringFactoriesLoader
                    .loadFactoryNames(this.annotationClass, this.beanClassLoader)));

            if (factories.isEmpty() && !hasDefaultFactory()) {
                throw new IllegalStateException("Annotation @" + getSimpleName()
                        + " found, but there are no implementations. Did you forget to include a starter?");
            }

            if (factories.size() > 1) {
                // there should only ever be one DiscoveryClient, but there might be more than
                // one factory
                log.warn("More than one implementation " + "of @" + getSimpleName()
                        + " (now relying on @Conditionals to pick one): " + factories);
            }

            return factories.toArray(new String[factories.size()]);
        }

和上面自动注入逻辑类似,也是从META-INF/spring.factories中查找当前注解对应的加载类。
这里找的是

    org.springframework.cloud.client.discovery.EnableDiscoveryClient=\
    org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

查看EurekaDiscoveryClientConfiguration代码

    @Configuration
    @EnableConfigurationProperties
    @ConditionalOnClass(EurekaClientConfig.class)
    @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
    @CommonsLog
    public class EurekaDiscoveryClientConfiguration {

        class Marker {}

        @Bean
        public Marker eurekaDiscoverClientMarker() {
            return new Marker();
        }

     ......
    }

最上面就是Marker bean的声明,也就是前面@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)依赖的bean,所以只有加上@EnableDiscoveryClient,eureka客户端逻辑才会生效。

3、 eureka client 如何发起注册

首先看org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration#eurekaInstanceConfigBean,eureka client的客户端配置基本都是通过它来加载的。

    @Bean
        @ConditionalOnMissingBean(value = EurekaInstanceConfig.class, search = SearchStrategy.CURRENT)
        public EurekaInstanceConfigBean eurekaInstanceConfigBean(InetUtils inetUtils) {
            RelaxedPropertyResolver relaxedPropertyResolver = new RelaxedPropertyResolver(env, "eureka.instance.");
            String hostname = relaxedPropertyResolver.getProperty("hostname");
            boolean preferIpAddress = Boolean.parseBoolean(relaxedPropertyResolver.getProperty("preferIpAddress"));
            EurekaInstanceConfigBean instance = new EurekaInstanceConfigBean(inetUtils);
            instance.setNonSecurePort(this.nonSecurePort);
            instance.setInstanceId(getDefaultInstanceId(this.env));
            instance.setPreferIpAddress(preferIpAddress);

            if (this.managementPort != this.nonSecurePort && this.managementPort != 0) {
                if (StringUtils.hasText(hostname)) {
                    instance.setHostname(hostname);
                }
                String statusPageUrlPath = relaxedPropertyResolver.getProperty("statusPageUrlPath");
                String healthCheckUrlPath = relaxedPropertyResolver.getProperty("healthCheckUrlPath");
                if (StringUtils.hasText(statusPageUrlPath)) {
                    instance.setStatusPageUrlPath(statusPageUrlPath);
                }
                if (StringUtils.hasText(healthCheckUrlPath)) {
                    instance.setHealthCheckUrlPath(healthCheckUrlPath);
                }
                String scheme = instance.getSecurePortEnabled() ? "https" : "http";
                instance.setStatusPageUrl(scheme + "://" + instance.getHostname() + ":"
                        + this.managementPort + instance.getStatusPageUrlPath());
                instance.setHealthCheckUrl(scheme + "://" + instance.getHostname() + ":"
                        + this.managementPort + instance.getHealthCheckUrlPath());
            }
            return instance;
        }

由于EurekaInstanceConfigBean类上面配置了@ConfigurationProperties,所以生成bean的过程中,在ConfigurationPropertiesBindingPostProcessor逻辑中,会注入配置文件的配置参数。

    @Data
    @ConfigurationProperties("eureka.instance")
    public class EurekaInstanceConfigBean implements CloudEurekaInstanceConfig, EnvironmentAware {

EurekaInstanceConfigBean这个bean为ApplicationInfoManager依赖,而ApplicationInfoManager被EurekaClient依赖。

    @Bean
            @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
            public ApplicationInfoManager eurekaApplicationInfoManager(
                    EurekaInstanceConfig config) {
                InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
                return new ApplicationInfoManager(config, instanceInfo);
            }

eurekaClient bean是client的入口,注入逻辑
主要在
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.EurekaClientConfiguration#eurekaClient

org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration.RefreshableEurekaClientConfiguration#eurekaClient
两个是针对不同场景的client注入。

EurekaClient bean 声明

    @Bean(destroyMethod = "shutdown")
            @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
            public EurekaClient eurekaClient(ApplicationInfoManager manager,
                    EurekaClientConfig config) {
                return new CloudEurekaClient(manager, config, this.optionalArgs,
                        this.context);
            }

CloudEurekaClient 构造函数

    public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
                                 EurekaClientConfig config,
                                 DiscoveryClientOptionalArgs args,
                                 ApplicationEventPublisher publisher) {
            super(applicationInfoManager, config, args);
            this.applicationInfoManager = applicationInfoManager;
            this.publisher = publisher;
            this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport");
            ReflectionUtils.makeAccessible(this.eurekaTransportField);
        }

查看CloudEurekaClient父类DiscoveryClient声明。
com.netflix.discovery.DiscoveryClient#DiscoveryClient

    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
            ......
            initScheduledTasks();
            .....
        }

注册主要通过initScheduledTasks实现

    /**
         * Initializes all scheduled tasks.
         */
        private void initScheduledTasks() {
            if (clientConfig.shouldFetchRegistry()) {
                // registry cache refresh timer
                int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
                int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "cacheRefresh",
                                scheduler,
                                cacheRefreshExecutor,
                                registryFetchIntervalSeconds,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new CacheRefreshThread()
                        ),
                        registryFetchIntervalSeconds, TimeUnit.SECONDS);
            }

            if (clientConfig.shouldRegisterWithEureka()) {
                int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
                int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
                logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }

                    @Override
                    public void notify(StatusChangeEvent statusChangeEvent) {
                        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                            // log at warn level if DOWN was involved
                            logger.warn("Saw local status change event {}", statusChangeEvent);
                        } else {
                            logger.info("Saw local status change event {}", statusChangeEvent);
                        }
                        instanceInfoReplicator.onDemandUpdate();
                    }
                };

                if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
                }

                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

这里我们看下heartbeat定时任务,通过new HeartbeatThread()实现。

     /**
         * The heartbeat task that renews the lease in the given intervals.
         */
        private class HeartbeatThread implements Runnable {

            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }

renew()逻辑,向注册中心发送心跳返回404,就会发起注册。

    /**
         * Renew with the eureka service by making the appropriate REST call
         */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            }
        }

注册服务到注册中心

    /**
         * Register with the eureka service by making the appropriate REST call.
         */
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                // 实际注册逻辑
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }

最终调用com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient#register

    @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            // 这个地方发起post请求,url类似http://localhost:8761/eureka/apps/SERVICES2
            // 传递的对象是 InstanceInfo
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }

到这里 服务已经注册到注册中心了。


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » spring cloud eureka client源码解析

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏