深入Spring Cloud源码设计-eureka一篇就够

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取 2000+ 道 Java 面试题

What is eureka ?

Eureka is a REST (Representational State Transfer) based service that is primarily used in the AWS cloud for locating services for the purpose of load balancing and failover of middle-tier servers. We call this service, the Eureka Server. Eureka also comes with a Java-based client component,the Eureka Client, which makes interactions with the service much easier. The client also has a built-in load balancer that does basic round-robin load balancing.

eureka 发音 [juˈri:kə] 伊瑞克

翻译后可简单概括为:

Eureka是一个基于 REST 的服务,用于定位服务,以实现云端中间层服务发现和故障转移的中间件。它有两个重要组成部分,Eureka服务端和基于JAVA的客户端组件。

eureka核心模块

20191123100245\_1.png

由上图可以看出,eureka主要有三种角色:

  1. Eureka Server
    eureka服务端,主要用来做服务的注册与发现
  2. Service Provider
    服务的实际提供方,会将服务注册到Eureka Server上
  3. Service Consumer
    服务消费方,从Eureka Server获取服务列表,向Service Provider发起真实调用请求

TIP:这三个角色是逻辑上的划分,可能在使用时,这几个角色可以是同一个实例;
一个Service Provider既可以是Service Consumer,也可以是Service Provider

20191123100245\_2.png

上图进一步展示了3个角色之间的交互。

  1. ServiceProvider会向Eureka Server做Register(服务注册)、Renew(服务续约)、Cancel(服务下线)等操作。
  2. EurekaServer之间会做注册服务的同步,从而保证状态一致
  3. ServiceConsumer会向Eureka Server获取注册服务列表,并消费服务

eureka源码阅读入口

eureka的服务端

A: EurekaBootStrap.java 实现了ServletContextListener接口,当项目启动时会初始化该类,触发contextInitialized方法的执行。

B: contextInitialized中调用了initEurekaServerContext方法。

C:
initEurekaServerContext方法依次调用了三个方法
PeerAwareInstanceRegistryImpl初始化
PeerEurekaNodes初始化
DefaultEurekaServerContext初始化,该类有@Singleton注解,并含有@PostConstruct注解(构造方法执行后执行)的initialize方法

D:
initialize调用了两个方法
PeerEurekaNodes的start方法
PeerAwareInstanceRegistryImpl的init方法

E:
init方法依次调用三个方法
initializedResponseCache方法,该方法继续调用ResponseCacheImpl类,最后使用的是guava cache
scheduleRenewalThresholdUpdateTask使用Timer做定时任务定时更新新注册服务配置参数更新
initRemoteRegionRegistry初始化注册中心

eureka的客户端

DiscoveryClient.java这个类中含有了client侧的很多操作:
register()
renew()
unregister()
fetchRegistry() 等等

register()方法调用过程

最为主要的register方法,是在DiscoveryClient初始化的过程中被调用,如下DiscoveryClient构造方法中调用了initScheduledTasks()

    @Inject
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
            ...

            if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
                fetchRegistryFromBackup();
            }

            // call and execute the pre registration handler before all background tasks (inc registration) is started
            if (this.preRegistrationHandler != null) {
                this.preRegistrationHandler.beforeRegistration();
            }
            initScheduledTasks();

            try {
                Monitors.registerObject(this);
            } catch (Throwable e) {
                logger.warn("Cannot register timers", e);
            }

            // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
            // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); }

initScheduledTasks 方法中启动了InstanceInfoReplicator线程,
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds())

       private void initScheduledTasks() {
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize

                statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                    @Override
                    public String getId() {
                        return "statusChangeListener";
                    }
              ...
                instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
            } else {
                logger.info("Not registering with Eureka server per configuration");
            }
        }

InstanceInfoReplicator实现了Runnable接口是一个线程,其run方法逻辑如下,可以看到其调用了 discoveryClient.register();

     public void run() {
            try {
                discoveryClient.refreshInstanceInfo();

                Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    discoveryClient.register();
                    instanceInfo.unsetIsDirty(dirtyTimestamp);
                }
            } catch (Throwable t) {
                logger.warn("There was a problem with the instance info replicator", t);
            } finally {
                Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
                scheduledPeriodicRef.set(next);
            }
        }

再来看看DiscoveryClient的register方法

      /** * Register with the eureka service by making the appropriate REST call. */
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }

renew ()方法调用过程

Renew(服务续约)操作由Service Provider定期调用,类似于heartbeat。主要是用来告诉Eureka Server Service Provider还活着,避免服务被剔除掉

      /** * Renew with the eureka service by making the appropriate REST call */
        boolean renew() {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            }
        }

renew方法在HeartbeatThread线程中被调用

       /** * The heartbeat task that renews the lease in the given intervals. */
        private class HeartbeatThread implements Runnable {

            public void run() {
                if (renew()) {
                    lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
                }
            }
        }

HeartbeatThread线程在initScheduledTasks方法中被调用

     private void initScheduledTasks() {
       ...
                // Heartbeat timer
                scheduler.schedule(
                        new TimedSupervisorTask(
                                "heartbeat",
                                scheduler,
                                heartbeatExecutor,
                                renewalIntervalInSecs,
                                TimeUnit.SECONDS,
                                expBackOffBound,
                                new HeartbeatThread()
                        ),
                        renewalIntervalInSecs, TimeUnit.SECONDS);

                // InstanceInfo replicator
                instanceInfoReplicator = new InstanceInfoReplicator(
                        this,
                        instanceInfo,
                        clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                        2); // burstSize
    ...
    }

initScheduledTasks是在DiscoveryClient构造函数初始化过程中被调用。

通过register方法和renew方法的分析,相信大家已经摸索出eureka代码的套路,其他的方法,大家可以自己去深入理解了。

参考资料

http://xujin.org/categories/Spring-Cloud-Eureka/

http://blog.csdn.net/jenny8080/article/details/52448403

https://github.com/Netflix/eureka

https://github.com/spring-cloud/spring-cloud-netflix/tree/v1.2.2.RELEASE

https://springcloud.cc/


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » 深入Spring Cloud源码设计-eureka一篇就够

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏