spring-cloud-eureka (二) Client – Server 接口交互(消息发送)源码分析

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取10G资料包与项目实战视频资料

上一篇文章中有介绍spring-cloud-eureka的原因,以及一部分源码分析了服务在启动时是如何加载并运行spring-cloud-eureka的,这一篇文章将从源码的角度来分析spring-cloud-eureka是如何进行服务治理的。

服务注册

服务注册的真正入口在com.netflix.discovery.DiscoveryClient#register()

    public class DiscoveryClient implements EurekaClient {
        ...
        boolean register() throws Throwable {
            logger.info(PREFIX + appPathIdentifier + ": registering service...");
            EurekaHttpResponse<Void> httpResponse;
            try {
                //发送注册信息
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        }
        ...
    }

从代码中可以看到,eureka注册时,是将实例信息传递给eurekaTransport.registrationClient.register(InstanceInfo)方法,eurekaTransport.registrationClient是一个EurekaHttpClient接口,它的注释中说明这是一个底层的eureka http 通信api,也就是说,服务注册消息、服务续约消息(心跳)、服务下线、服务获取等,都是由这个接口发送的。往上翻看源码,发现eurekaTransport.registrationClient是一个com.netflix.discovery.shared.transport.decorator.SessionedEurekaHttpClient实例,也就是注册信息是由该类发送的。

com.netflix.discovery.DiscoveryClient

    public class DiscoveryClient implements EurekaClient {

        ...
        //构造方法
        DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                        Provider<BackupRegistry> backupRegistryProvider) {
        ...
        //创建EurekaTransport,并初始化
        eurekaTransport = new EurekaTransport();
                scheduleServerEndpointTask(eurekaTransport, args);
        ...
        }
        ...
        private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                                AbstractDiscoveryClientOptionalArgs args) {
         ...
            if (clientConfig.shouldRegisterWithEureka()) {
                EurekaHttpClientFactory newRegistrationClientFactory = null;
                EurekaHttpClient newRegistrationClient = null;
                try {
                    //创建注册客户端工厂类
                    newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                            eurekaTransport.bootstrapResolver,
                            eurekaTransport.transportClientFactory,
                            transportConfig
                    );
                    //创建注册客户端
                    newRegistrationClient = newRegistrationClientFactory.newClient();
                } catch (Exception e) {
                    logger.warn("Transport initialization failure", e);
                }
                //引用
                eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
                eurekaTransport.registrationClient = newRegistrationClient;
            }
            ...
        }
    }

com.netflix.discovery.shared.transport.EurekaHttpClients

    public final class EurekaHttpClients {
        ...
        public static EurekaHttpClientFactory registrationClientFactory(ClusterResolver bootstrapResolver,
                                                                        TransportClientFactory transportClientFactory,
                                                                        EurekaTransportConfig transportConfig) {
            return canonicalClientFactory(EurekaClientNames.REGISTRATION, transportConfig, bootstrapResolver, transportClientFactory);
        }

        static EurekaHttpClientFactory canonicalClientFactory(final String name,
                                                              final EurekaTransportConfig transportConfig,
                                                              final ClusterResolver<EurekaEndpoint> clusterResolver,
                                                              final TransportClientFactory transportClientFactory) {
            //一个客户端工厂匿名类
            return new EurekaHttpClientFactory() {
                @Override
                public EurekaHttpClient newClient() {
                    //创建eureka http客户端并返回
                    return new SessionedEurekaHttpClient(
                            name,
                            /* 新建了一个RetryableEurekaHttpClient工厂 */ RetryableEurekaHttpClient.createFactory(
                                    name,
                                    transportConfig,
                                    clusterResolver,
                                    RedirectingEurekaHttpClient.createFactory(transportClientFactory),
                                    ServerStatusEvaluators.legacyEvaluator()),
                            transportConfig.getSessionedClientReconnectIntervalSeconds() * 1000
                    );
                }

                @Override
                public void shutdown() {
                    wrapClosable(clusterResolver).shutdown();
                }
            };
        }
        ...
    }

注册客户端(eurekaTransport.registrationClient)的创建过程是这样的

DiscoveryClient.init()
– scheduleServerEndpointTask()
– – EurekaHttpClients.registrationClientFactory()
– – – canonicalClientFactory()
– – EurekaHttpClientFactory.newClient()

20191123100249\_1.png

发送注册消息

你以为最终的注册消息是由SessionedEurekaHttpClient发送的么?我一开始也是这样以为的,最后发现太天真了,看看调用链,看看这其中包装了一大堆EurekaHttpClient实现类。
20191123100249\_2.png

可以发现,最终调用的是com.netflix.discovery.shared.transport.jersey.JerseyApplicationClient对象。

20191123100249\_3.png

上面是EurekaHttpClient的类图结构,最终真正发消息的是由AbstractJerseyEurekaHttpClient发送的,它有两个子类,分别是JerseyApplicationClient和JerseyReplicationClient,JerseyReplicationClient类是用于服务同步的,后面再说,先看JerseyApplicationClient类。

    package com.netflix.discovery.shared.transport.jersey;
    ...
    public class JerseyApplicationClient extends AbstractJerseyEurekaHttpClient {

        private final Map<String, String> additionalHeaders;

        public JerseyApplicationClient(Client jerseyClient, String serviceUrl, Map<String, String> additionalHeaders) {
            super(jerseyClient, serviceUrl);
            this.additionalHeaders = additionalHeaders;
        }

        @Override
        protected void addExtraHeaders(Builder webResource) {
            if (additionalHeaders != null) {
                for (String key : additionalHeaders.keySet()) {
                    webResource.header(key, additionalHeaders.get(key));
                }
            }
        }
    }

在JerseyReplicationClient 类中并没有发送消息的逻辑,那么就是在AbstractJerseyEurekaHttpClient类中了,查看该类的源码,可以看到相关的方法。

    package com.netflix.discovery.shared.transport.jersey;
    ...
    public abstract class AbstractJerseyEurekaHttpClient implements EurekaHttpClient {
        ...
        //发送注册消息
        @Override
        public EurekaHttpResponse<Void> register(InstanceInfo info) {
            String urlPath = "apps/" + info.getAppName();
            ClientResponse response = null;
            try {
                Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
                addExtraHeaders(resourceBuilder);
                response = resourceBuilder
                        .header("Accept-Encoding", "gzip")
                        .type(MediaType.APPLICATION_JSON_TYPE)
                        .accept(MediaType.APPLICATION_JSON)
                        .post(ClientResponse.class, info);
                return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
            } finally {
                if (logger.isDebugEnabled()) {
                    logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                            response == null ? "N/A" : response.getStatus());
                }
                if (response != null) {
                    response.close();
                }
            }
        }
        ...
    }

AbstractJerseyEurekaHttpClient 类中方法对应的功能

method description
register(InstanceInfo) 服务注册
cancel(String,String) 服务下线
sendHeartBeat(String,String,InstanceInfo,InstanceStatus) 服务续约
statusUpdate(String,String,InstanceStatus,InstanceInfo) 服务状态更新

假设服务实例的eureka配置是这样的

    spring:
      application:
        name: eureka-client
    server:
      port: 10101
    eureka:
      client:
        register-with-eureka: true
        fetch-registry: true
        serviceUrl:
          defaultZone: http://localhost:10002/eureka/

那么请求的路径就是http://localhost:10002/eureka/apps/EUREKA-CLIENT,并以post的方法将InstanceInfo对象提交过去,然后将注册中心响应的状态码和head信息包装起来返回。

注册中心接收注册消息

Eureka Server 通过jersey发布了一些web接口,资源类在eureka-core-{version}.jar!com.netflix.eureka.resources包下,服务注册的代码就在com.netflix.eureka.resources.ApplicationResource中。

com.netflix.eureka.resources.ApplicationResource

    package com.netflix.eureka.resources;
    ...
    @Produces({"application/xml", "application/json"})
    public class ApplicationResource {
        ...
        @POST
        @Consumes({"application/json", "application/xml"})
        public Response addInstance(InstanceInfo info,
                                    @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
            ...

            //注册中心服务登记的入口,isReplication判断是否是其它Eureka Server广播过来的。
            registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();  // 204 to be backwards compatible
        }
        ...
    }

这些接口包括:

url method class description
/apps/{appId} POST ApplicationResource 服务注册
/apps/{appId} GET ApplicationResource 获取服务信息
/instances/{id} GET InstancesResource 获取服务实例信息
/apps/{appId}/{id} GET InstanceResource 获取服务实例信息
/apps/{appId}/{id} PUT InstanceResource 服务实例续约
/apps/{appId}/{id} DELETE InstanceResource 服务实例下线
/apps/{appId}/{id}/status PUT InstanceResource 服务实例状态变更

上一章spring-cloud-eureka (一) 原理分析 主要分析服务治理的功能以及Eureka Client和Eureka Server在启动时做的处理,这一章分析了Eureka Client与 Eureka Server接口交互的逻辑,还有会下一章分析注册中的服务注册、续约、下线、以及服务剔除的逻辑。

下面几张图描述了服务注册、服务续约、服务下线的调用链

服务注册
20191123100249\_4.png

服务续约
20191123100249\_5.png

服务下线
20191123100249\_6.png

参考资料
《spring cloud 微服务实战》
深度剖析服务发现组件Netflix Eureka


来源:http://ddrv.cn/a/88268

赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » spring-cloud-eureka (二) Client – Server 接口交互(消息发送)源码分析

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏