1. The core abstraction for service discovery in the spring-cloud ecosystem:
The main interface is DiscoveryClient, together with the annotation @EnableDiscoveryClient.
2. How EurekaDiscoveryClient is implemented:
EurekaDiscoveryClient implements the DiscoveryClient interface defined by spring-cloud. Its core fields are Netflix's EurekaClient and Netflix's EurekaClientConfig.
2.1. How EurekaDiscoveryClient is auto-configured. EurekaDiscoveryClientConfiguration contains the following source:
@Bean
@ConditionalOnMissingBean
public EurekaDiscoveryClient discoveryClient(EurekaClient client,
        EurekaClientConfig clientConfig) {
    // Instantiate an EurekaDiscoveryClient from a Netflix EurekaClient plus the client configuration class.
    return new EurekaDiscoveryClient(client, clientConfig);
}
That raises the question: how are Netflix's EurekaClient and EurekaClientConfig themselves auto-configured? The EurekaClientAutoConfiguration class contains the following source:
// Registers an EurekaClientConfigBean. EurekaClientConfigBean implements the EurekaClientConfig
// interface, so it is also a Netflix EurekaClientConfig instance.
@Bean
@ConditionalOnMissingBean(value = EurekaClientConfig.class,
        search = SearchStrategy.CURRENT)
public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
    return new EurekaClientConfigBean();
}

// EurekaClient without dynamic refresh: registers a Netflix EurekaClient bean, by default a
// CloudEurekaClient instance. This bean is not refreshable. Which variant is used is controlled
// by the eureka.client.refresh.enable property.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config) {
    return new CloudEurekaClient(manager, config, this.optionalArgs,
            this.context);
}

// Accordingly, there is also a refreshable variant of the Netflix EurekaClient bean:
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance,
        @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    // If we use the proxy of the ApplicationInfoManager we could run into a
    // problem
    // when shutdown is called on the CloudEurekaClient where the
    // ApplicationInfoManager bean is
    // requested but wont be allowed because we are shutting down. To avoid this
    // we use the
    // object directly.
    ApplicationInfoManager appManager;
    if (AopUtils.isAopProxy(manager)) {
        appManager = ProxyUtils.getTargetObject(manager);
    }
    else {
        appManager = manager;
    }
    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
            config, this.optionalArgs, this.context);
    cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    return cloudEurekaClient;
}
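By default the refreshable CloudEurekaClient is the one configured, because eureka.client.refresh.enable defaults to true.
That completes the auto-configuration of the EurekaDiscoveryClient instance.
2.2. How the Netflix EurekaClient inside EurekaDiscoveryClient is initialized. This is worth a section of its own because EurekaClient does a lot of work during initialization. It starts here: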
CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,config, this.optionalArgs, this.context);
We step into the CloudEurekaClient constructor:
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager,
        EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args,
        ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
            "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}
Then into the constructors of the parent class (Netflix's DiscoveryClient):
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
    this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (IllegalAccessException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    } catch (ClassNotFoundException e) {
                        logger.error("Error instantiating BackupRegistry.", e);
                    }
                }
                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }
            return backupRegistryInstance;
        }
    }, randomizer);
}
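Finally we reach the this(...) constructor they delegate to. It is buried deep, but this is where the core work happens: optional registration, creation of the heartbeat executor and task, and creation of the task that refreshes the local registry cache.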
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
        Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }

    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {
        logger.warn("Setting instanceInfo to a passed in null value");
    }

    this.backupRegistryProvider = backupRegistryProvider;
    this.endpointRandomizer = endpointRandomizer;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    // If the client has to fetch the registry from the server
    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    // If the client has to register itself with the eureka server
    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

    // If the client neither fetches the registry from the eureka server nor registers with it,
    // no heartbeat or cache-refresh tasks are started and the constructor returns early.
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        initRegistrySize = this.getApplications().size();
        registrySize = initRegistrySize;
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, initRegistrySize);

        return;  // no need to setup up an network tasks and we are done
    }

    // Otherwise (the client fetches the registry, registers itself, or both),
    // create the executors that the scheduled tasks will use.
    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        // 1. Create the scheduler (a scheduled thread pool).
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());

        // 2. Create the heartbeat executor.
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        // 3. Create the cache-refresh executor. The "cache" is the registry data the client
        //    fetched from the server; it is re-fetched periodically to keep it fresh.
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry()) {
        try {
            boolean primaryFetchRegistryResult = fetchRegistry(false);
            if (!primaryFetchRegistryResult) {
                logger.info("Initial registry fetch from primary servers failed");
            }
            boolean backupFetchRegistryResult = true;
            if (!primaryFetchRegistryResult && !fetchRegistryFromBackup()) {
                backupFetchRegistryResult = false;
                logger.info("Initial registry fetch from backup servers failed");
            }
            if (!primaryFetchRegistryResult && !backupFetchRegistryResult && clientConfig.shouldEnforceFetchRegistryAtInit()) {
                throw new IllegalStateException("Fetch registry error at startup. Initial fetch failed.");
            }
        } catch (Throwable th) {
            logger.error("Fetch registry error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    // If registration is enforced during initialization, register now.
    // If that registration fails, the application fails to start.
    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {
            logger.error("Registration error at startup: {}", th.getMessage());
            throw new IllegalStateException(th);
        }
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    // Finally, initialize the scheduled tasks: cluster resolution, heartbeat,
    // registry fetch for cache refresh, and so on.
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register timers", e);
    }

    // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
    // to work with DI'd DiscoveryClient
    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
    initRegistrySize = this.getApplications().size();
    registrySize = initRegistrySize;
    logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
            initTimestampMs, initRegistrySize);
}
The scheduled tasks are initialized as follows:
private void initScheduledTasks() {
    // 1. If the client has to fetch the registry from the server
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        // 2. Read the configured interval between registry fetches (30 seconds by default).
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 3. Wrap the scheduler and the cache-refresh executor in a TimedSupervisorTask.
        //    The wrapper exists so that retries after a failure back off progressively.
        cacheRefreshTask = new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        );
        // 4. Start the cache-refresh task; by default it runs every 30 s.
        scheduler.schedule(
                cacheRefreshTask,
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 5. If the client has to register itself with the server
    if (clientConfig.shouldRegisterWithEureka()) {
        // 6. Read the lease renewal interval (30 seconds by default).
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        // 7. Wrap the scheduler and the heartbeat executor in a heartbeat task. It works the
        //    same way as the cache refresh, including the retry back-off.
        heartbeatTask = new TimedSupervisorTask(
                "heartbeat",
                scheduler,
                heartbeatExecutor,
                renewalIntervalInSecs,
                TimeUnit.SECONDS,
                expBackOffBound,
                new HeartbeatThread()
        );
        // 8. Start the heartbeat task.
        scheduler.schedule(
                heartbeatTask,
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        // 9. Build the instance info replicator. Its job is to push the instance info to the
        //    server when it changes. Its interval also defaults to 30 seconds.
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        // 10. Build a status change listener that is notified when the status of the
        //     current application changes.
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        // 11. If on-demand updates on status change are enabled, register the listener built
        //     above with the applicationInfoManager.
        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 12. Start the scheduled replication of the instance info to the server,
        //     after the configured initial delay.
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
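The "retry back-off" mentioned for TimedSupervisorTask can be illustrated with a small stand-alone sketch. It assumes a simplified policy: after a timeout the delay doubles, capped at the base interval multiplied by expBackOffBound, and after a successful run it falls back to the base interval. The class and method names below (BackoffSketch, nextDelay) are hypothetical and are not part of the eureka code base.

```java
/** Hypothetical, simplified model of the delay policy of a supervised periodic task. */
final class BackoffSketch {

    /**
     * Computes the delay before the next run.
     *
     * @param baseDelayMs    configured interval, e.g. 30_000 for 30 s
     * @param currentDelayMs delay that was used before the run that just finished
     * @param backOffBound   multiplier that caps the delay at baseDelayMs * backOffBound
     * @param timedOut       whether the last run timed out
     */
    static long nextDelay(long baseDelayMs, long currentDelayMs, int backOffBound, boolean timedOut) {
        if (!timedOut) {
            return baseDelayMs; // success: go back to the base interval
        }
        long maxDelayMs = baseDelayMs * backOffBound;
        return Math.min(maxDelayMs, currentDelayMs * 2); // timeout: double the delay, up to the cap
    }

    public static void main(String[] args) {
        long delay = 30_000L;
        for (int i = 1; i <= 5; i++) {
            delay = nextDelay(30_000L, delay, 10, true);
            System.out.println("after timeout " + i + ": " + delay + " ms");
        }
        System.out.println("after success: " + nextDelay(30_000L, delay, 10, false) + " ms");
    }
}
```

With a 30 s base interval and a bound of 10, repeated timeouts in this model give 60 s, 120 s, 240 s and then stay at 300 s until a run succeeds.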
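3. Once every class that needs to be instantiated and initialized is ready, the real work begins. How? spring-cloud-eureka-client provides an implementation of a lifecycle interface, EurekaAutoServiceRegistration, whose class diagram is as follows:
Here AutoServiceRegistration is the interface spring-cloud provides for automatic service registration.
The method to look at in EurekaAutoServiceRegistration is start():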
@Override
public void start() {
    // only set the port if the nonSecurePort or securePort is 0 and this.port != 0
    if (this.port.get() != 0) {
        if (this.registration.getNonSecurePort() == 0) {
            this.registration.setNonSecurePort(this.port.get());
        }
        if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
            this.registration.setSecurePort(this.port.get());
        }
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
        // Register through the service registry, which here is EurekaServiceRegistry.
        this.serviceRegistry.register(this.registration);

        this.context.publishEvent(new InstanceRegisteredEvent<>(this,
                this.registration.getInstanceConfig()));
        this.running.set(true);
    }
}
Next comes the register method of the service registry, in EurekaServiceRegistry. This class implements ServiceRegistry, the service registration interface of spring-cloud:
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);

    if (log.isInfoEnabled()) {
        log.info("Registering application "
                + reg.getApplicationInfoManager().getInfo().getAppName()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }

    // First set the status of the current InstanceInfo held by ApplicationInfoManager to the
    // initial status, which is UP (InstanceStatus.UP). Before this call the status is STARTING
    // (InstanceStatus.STARTING). Setting the status triggers the notify method of the
    // StatusChangeListener.
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

    // If a health check handler is available, register it with the CloudEurekaClient.
    // By default there is none.
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
            .getEurekaClient().registerHealthCheck(healthCheckHandler));
}
Now look at the setInstanceStatus(InstanceStatus status) method of ApplicationInfoManager:
public synchronized void setInstanceStatus(InstanceStatus status) {
    // 1. Map to the new status. For the call stack above this is UP.
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    // 2. Set the new status and get the previous one, which is STARTING.
    InstanceStatus prev = instanceInfo.setStatus(next);
    // 3. Notify every status change listener that the status went from STARTING --> UP.
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}
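The notification pattern in setInstanceStatus can be reproduced in isolation. The following is a minimal sketch using only the JDK; StatusManagerSketch and its members are hypothetical names, and the sketch assumes that listeners fire only on an actual status change and that one failing listener must not prevent the others from being notified.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical stand-in for a status holder that notifies registered listeners. */
final class StatusManagerSketch {
    enum Status { STARTING, UP, DOWN }

    // Listener id -> callback receiving (previous status, new status).
    private final Map<String, BiConsumer<Status, Status>> listeners = new ConcurrentHashMap<>();
    private Status current = Status.STARTING;

    void registerListener(String id, BiConsumer<Status, Status> listener) {
        listeners.put(id, listener);
    }

    synchronized void setStatus(Status next) {
        if (next == null || next == current) {
            return; // nothing changed, so nobody is notified
        }
        Status prev = current;
        current = next;
        for (BiConsumer<Status, Status> listener : listeners.values()) {
            try {
                listener.accept(prev, next);
            } catch (RuntimeException e) {
                // A failing listener is only reported; the remaining listeners still run.
                System.err.println("listener failed: " + e.getMessage());
            }
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        StatusManagerSketch manager = new StatusManagerSketch();
        manager.registerListener("recorder", (prev, next) -> events.add(prev + "->" + next));
        manager.setStatus(Status.UP);   // STARTING -> UP, listener fires
        manager.setStatus(Status.UP);   // no change, listener does not fire
        manager.setStatus(Status.DOWN); // UP -> DOWN, listener fires
        System.out.println(events);
    }
}
```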
Recall that initScheduledTasks(), called from the constructor of CloudEurekaClient's parent class, builds an anonymous StatusChangeListener. With that in mind the rest is straightforward. Here again is the core code that builds the StatusChangeListener and registers it with ApplicationInfoManager:
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
            // log at warn level if DOWN was involved
            logger.warn("Saw local status change event {}", statusChangeEvent);
        } else {
            logger.info("Saw local status change event {}", statusChangeEvent);
        }
        instanceInfoReplicator.onDemandUpdate();
    }
};

if (clientConfig.shouldOnDemandUpdateStatusChange()) {
    applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
So the call naturally lands in the notify method of this anonymous object:
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
    // If the new status or the previous status is DOWN, log at warn level; otherwise log at info level.
    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
        // log at warn level if DOWN was involved
        logger.warn("Saw local status change event {}", statusChangeEvent);
    } else {
        logger.info("Saw local status change event {}", statusChangeEvent);
    }
    // Then use the instance info replicator to push the status change to the server.
    instanceInfoReplicator.onDemandUpdate();
}
That leads to InstanceInfoReplicator. This class implements the Runnable interface, so it is itself a task:
// The core of onDemandUpdate() is to submit a task to the scheduler that immediately runs
// InstanceInfoReplicator's run() method, subject to a rate limiter.
public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}
Which brings us to the run() method:
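public void run() {
    try {
        // 1. Refresh the instance info first. This mainly re-reads the current status through the
        //    HealthCheckHandler and updates the status stored in InstanceInfo.
        discoveryClient.refreshInstanceInfo();

        // 2. Read the time at which the instanceInfo became dirty; null means it is not dirty.
        //    "Dirty" means the local instance info has changed but the server has not been told yet.
        //    If it is still dirty after this run, the next scheduled replication picks it up.
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            // 3. Register through the discovery client. This is the key call.
            discoveryClient.register();
            // 4. After registering, mark the instance as no longer dirty.
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // 5. Whether or not registration succeeded, schedule the next periodic replication of
        //    the instance to the server; by default it runs every 30 seconds.
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}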
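Why unsetIsDirty takes the timestamp that was read before registering is easier to see in a stand-alone model. The sketch below is a simplified assumption of that idea, with hypothetical names (DirtyFlagSketch and its methods): the flag is cleared only if no newer change happened while the registration request was in flight.

```java
/** Hypothetical model of a "dirty" flag that is only cleared by an up-to-date sync. */
final class DirtyFlagSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;

    /** Marks the state as changed at the given time and returns that timestamp. */
    synchronized long setDirty(long nowMs) {
        dirty = true;
        lastDirtyTimestamp = nowMs;
        return nowMs;
    }

    /** Returns the timestamp of the latest change, or null when nothing needs to be synced. */
    synchronized Long dirtyTimestampOrNull() {
        return dirty ? lastDirtyTimestamp : null;
    }

    /** Clears the flag only if the synced snapshot is at least as new as the latest change. */
    synchronized void unsetDirty(long syncedTimestamp) {
        if (lastDirtyTimestamp <= syncedTimestamp) {
            dirty = false;
        }
    }

    public static void main(String[] args) {
        DirtyFlagSketch info = new DirtyFlagSketch();
        info.setDirty(100L);
        Long snapshot = info.dirtyTimestampOrNull(); // replicator reads 100 and starts registering
        info.setDirty(200L);                         // a second change arrives in the meantime
        info.unsetDirty(snapshot);                   // must NOT clear: change 200 is not synced yet
        System.out.println("still dirty: " + (info.dirtyTimestampOrNull() != null));
        info.unsetDirty(200L);                       // the next run syncs the newer change
        System.out.println("still dirty: " + (info.dirtyTimestampOrNull() != null));
    }
}
```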
From there we reach the register() method of discoveryClient, which here is the CloudEurekaClient:
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        // Send the registration through the registration client of the eureka transport. The
        // argument is the instanceInfo to register. By default the HTTP request is built and
        // sent by AbstractJerseyEurekaHttpClient.
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
And from there the register(InstanceInfo info) method of AbstractJerseyEurekaHttpClient:
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
    String urlPath = "apps/" + info.getAppName();
    ClientResponse response = null;
    try {
        // Build the request URL, e.g. http://localhost:9090/eureka/apps/{app name in upper case}
        Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
        addExtraHeaders(resourceBuilder);
        // Send the request and read the response. This calls the addInstance(...) endpoint of
        // ApplicationResource on the server, which completes the registration.
        response = resourceBuilder
                .header("Accept-Encoding", "gzip")
                .type(MediaType.APPLICATION_JSON_TYPE)
                .accept(MediaType.APPLICATION_JSON)
                .post(ClientResponse.class, info);
        return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                    response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
4. That covers registration. Next, the heartbeat. When CloudEurekaClient is initialized, the constructor of its parent class builds the heartbeat task (via initScheduledTasks()). Here is that code again:
// Heartbeat timer
heartbeatTask = new TimedSupervisorTask(
        "heartbeat",
        scheduler,
        heartbeatExecutor,
        renewalIntervalInSecs,
        TimeUnit.SECONDS,
        expBackOffBound,
        new HeartbeatThread()
);
scheduler.schedule(
        heartbeatTask,
        renewalIntervalInSecs, TimeUnit.SECONDS);
What the heartbeat actually does lives in the HeartbeatThread task:
private class HeartbeatThread implements Runnable {

    public void run() {
        // Renew the lease; on success, record the time of the last successful heartbeat.
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}
The renewal is implemented as follows:
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        // Send the renewal request. A 200 response means the renewal succeeded;
        // a 404 response means the server does not know this instance, so it is registered again.
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
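The decision logic of renew() boils down to three outcomes. The sketch below models it without any HTTP, under the assumption that the heartbeat call yields a plain status code; RenewSketch and its parameters are hypothetical names used for illustration only.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

/** Hypothetical model of the renew decision: 200 = renewed, 404 = re-register, else failure. */
final class RenewSketch {

    /**
     * @param sendHeartbeat returns the HTTP status code of the heartbeat request
     * @param register      performs a fresh registration and reports whether it succeeded
     */
    static boolean renew(IntSupplier sendHeartbeat, BooleanSupplier register) {
        try {
            int status = sendHeartbeat.getAsInt();
            if (status == 404) {
                // The server has no lease for this instance, so fall back to registering again.
                return register.getAsBoolean();
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport problem counts as a failed heartbeat; the next tick retries.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("200 -> " + renew(() -> 200, () -> false));
        System.out.println("404 + successful re-register -> " + renew(() -> 404, () -> true));
        System.out.println("500 -> " + renew(() -> 500, () -> true));
    }
}
```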
Sending the heartbeat is implemented as follows:
@Override
public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) {
    String urlPath = "apps/" + appName + '/' + id;
    ClientResponse response = null;
    try {
        // Build the heartbeat HTTP request, e.g.
        // url=http://localhost:9090/eureka/apps/CONSUMER/DESKTOP-IHK6B18:consumer:8080?status=UP&lastDirtyTimestamp=1608777683806
        // This calls the renewLease(...) REST endpoint of InstanceResource on the server.
        WebResource webResource = jerseyClient.resource(serviceUrl)
                .path(urlPath)
                .queryParam("status", info.getStatus().toString())
                .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString());
        if (overriddenStatus != null) {
            webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name());
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        response = requestBuilder.put(ClientResponse.class);
        EurekaHttpResponseBuilder<InstanceInfo> eurekaResponseBuilder = anEurekaHttpResponse(response.getStatus(), InstanceInfo.class).headers(headersOf(response));
        if (response.hasEntity() &&
                !HTML.equals(response.getType().getSubtype())) { //don't try and deserialize random html errors from the server
            eurekaResponseBuilder.entity(response.getEntity(InstanceInfo.class));
        }
        return eurekaResponseBuilder.build();
    } finally {
        if (logger.isDebugEnabled()) {
            logger.debug("Jersey HTTP PUT {}/{}; statusCode={}", serviceUrl, urlPath, response == null ? "N/A" : response.getStatus());
        }
        if (response != null) {
            response.close();
        }
    }
}
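To make the shape of the heartbeat request concrete, here is a small sketch that only assembles the URL described above, using plain string concatenation and java.net.URI. HeartbeatUriSketch and heartbeatUri are hypothetical helpers, not eureka APIs, and the sketch assumes the values need no URL encoding (as in the example URL from the text).

```java
import java.net.URI;

/** Hypothetical helper that assembles the heartbeat (PUT) URL for one instance. */
final class HeartbeatUriSketch {

    static URI heartbeatUri(String serviceUrl, String appName, String instanceId,
                            String status, long lastDirtyTimestamp, String overriddenStatus) {
        // Make sure the base URL ends with exactly one slash before appending the path.
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        StringBuilder url = new StringBuilder(base)
                .append("apps/").append(appName).append('/').append(instanceId)
                .append("?status=").append(status)
                .append("&lastDirtyTimestamp=").append(lastDirtyTimestamp);
        if (overriddenStatus != null) {
            // Only sent when an overridden status is supplied.
            url.append("&overriddenstatus=").append(overriddenStatus);
        }
        return URI.create(url.toString());
    }

    public static void main(String[] args) {
        System.out.println(heartbeatUri("http://localhost:9090/eureka/", "CONSUMER",
                "DESKTOP-IHK6B18:consumer:8080", "UP", 1608777683806L, null));
    }
}
```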
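5. With the heartbeat covered, we turn to how the local cache is refreshed. By default eurekaClient fetches the full service list and caches it locally, then uses a scheduled task to fetch the latest data and update the local cache. This is the job of the cache-refresh task. As seen during the initialization of CloudEurekaClient, the constructor of its parent class builds and starts this task:
cacheRefreshTask = new TimedSupervisorTask(
        "cacheRefresh",
        scheduler,
        cacheRefreshExecutor,
        registryFetchIntervalSeconds,
        TimeUnit.SECONDS,
        expBackOffBound,
        new CacheRefreshThread()
);
scheduler.schedule(
        cacheRefreshTask,
        registryFetchIntervalSeconds, TimeUnit.SECONDS);
The refresh logic lives in CacheRefreshThread:
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}
Refreshing the service list is implemented as follows: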
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently," +
                                " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }

        // 1. This is the key call: fetch the service list from the eureka server.
        boolean success = fetchRegistry(remoteRegionsModified);
        // 2. On success, update the recorded number of services. How the service data itself is
        //    updated is answered inside fetchRegistry.
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                    allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
Which leads to fetchRegistry(boolean forceFullRegistryFetch):
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        // 1. Read the locally cached applications first.
        Applications applications = getApplications();

        // 2. Do a full fetch if delta fetching is disabled, a single VIP address is configured,
        //    a full fetch is forced (forceFullRegistryFetch), the local cache is null or empty,
        //    or its version is -1.
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            // 3. Fetch the full registry and store it in the cache.
            getAndStoreFullRegistry();
        } else {
            // 4. Otherwise fetch only the delta and apply it to the local cache.
            getAndUpdateDelta(applications);
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();

    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();

    // registry was fetched successfully, so return true
    return true;
}
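The condition that chooses between a full fetch and a delta fetch can be written as a pure function, which makes the individual triggers easier to test. This is a sketch with hypothetical names (FetchModeSketch, needsFullFetch); a null cachedAppCount stands in for "applications == null".

```java
/** Hypothetical, side-effect-free version of the full-versus-delta decision. */
final class FetchModeSketch {

    /**
     * @param deltaDisabled    delta fetching is switched off in the configuration
     * @param singleVipAddress configured single VIP address, may be null or empty
     * @param forceFull        caller explicitly requests a full fetch
     * @param cachedAppCount   number of cached applications, or null if there is no cache yet
     * @param cachedVersion    version of the cached registry, -1 meaning "delta not supported"
     */
    static boolean needsFullFetch(boolean deltaDisabled, String singleVipAddress, boolean forceFull,
                                  Integer cachedAppCount, long cachedVersion) {
        return deltaDisabled
                || (singleVipAddress != null && !singleVipAddress.isEmpty())
                || forceFull
                || cachedAppCount == null
                || cachedAppCount == 0
                || cachedVersion == -1;
    }

    public static void main(String[] args) {
        // First fetch after startup: the cache is empty, so a full fetch is needed.
        System.out.println("empty cache -> full fetch: " + needsFullFetch(false, null, false, 0, 0L));
        // Later refreshes with a populated cache can use the delta.
        System.out.println("populated cache -> full fetch: " + needsFullFetch(false, "", false, 3, 1L));
    }
}
```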
First, the implementation of the full fetch and store:
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    // 1. Fetch the registry data of all applications. This calls the
    //    ApplicationsResource.getContainers(...) REST endpoint on the server.
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    // 2. If the request succeeded, take the registered applications from the response.
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    // 3. If there is no application data, log an error.
    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    // 4. Otherwise store the data in the local cache, provided no other thread has updated
    //    the cache in the meantime (checked through the generation counter).
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}
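The generation counter used above is a common way to discard results of a fetch that was overtaken by another update. A minimal sketch of that guard, using only java.util.concurrent.atomic, could look as follows; GenerationGuardSketch is a hypothetical class and a list of names stands in for the Applications object.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical cache that accepts a fetched result only if nobody else updated it meanwhile. */
final class GenerationGuardSketch {
    private final AtomicLong generation = new AtomicLong(0);
    private final AtomicReference<List<String>> cache = new AtomicReference<>(List.of());

    /** Read this before starting the (slow) fetch. */
    long currentGeneration() {
        return generation.get();
    }

    /** Stores the result only if the generation is still the one observed before the fetch. */
    boolean store(long observedGeneration, List<String> fetched) {
        if (generation.compareAndSet(observedGeneration, observedGeneration + 1)) {
            cache.set(List.copyOf(fetched));
            return true;
        }
        return false; // another update won the race, so this result is stale
    }

    List<String> cached() {
        return cache.get();
    }

    public static void main(String[] args) {
        GenerationGuardSketch registry = new GenerationGuardSketch();
        long seenByA = registry.currentGeneration();
        long seenByB = registry.currentGeneration();
        System.out.println("A stored: " + registry.store(seenByA, List.of("CONSUMER", "PROVIDER")));
        System.out.println("B stored: " + registry.store(seenByB, List.of("STALE")));
        System.out.println("cache: " + registry.cached());
    }
}
```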
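The delta fetch follows much the same flow, so its source is not shown here.
6. Registration, replication of status changes to the server, the heartbeat, and the local cache refresh have all been covered. Next comes the DOWN status. We stay in EurekaAutoServiceRegistration, which implements the SmartLifecycle interface, and this time look at its stop() method. Note that stop() only means the EurekaAutoServiceRegistration bean has stopped working; it does not mean the whole application has shut down. So the instance is only marked DOWN, and the operation consists of pushing that DOWN status to the server. The real deregistration happens in the shutdown() method of CloudEurekaClient. The source of stop():
@Override
public void stop() {
    this.serviceRegistry.deregister(this.registration);
    this.running.set(false);
}
The code that sets the status to DOWN when the EurekaAutoServiceRegistration bean stops: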
@Override
public void deregister(EurekaRegistration reg) {
    if (reg.getApplicationInfoManager().getInfo() != null) {

        if (log.isInfoEnabled()) {
            log.info("Unregistering application "
                    + reg.getApplicationInfoManager().getInfo().getAppName()
                    + " with eureka with status DOWN");
        }

        // Set the instance status to DOWN. This again triggers the StatusChangeListener, which
        // again ends up in the run() method of the instance info replicator: the instance status
        // is refreshed and then registered, only this time with status DOWN.
        reg.getApplicationInfoManager()
                .setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

        // shutdown of eureka client should happen with EurekaRegistration.close()
        // auto registration will create a bean which will be properly disposed
        // manual registrations will need to call close()
    }
}

// The run() method of InstanceInfoReplicator, already discussed for registration. Because the
// instance status is now DOWN, the instance info is replicated to the server, that is,
// registered again.
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}
7. Deregistration was mentioned briefly above: the service is taken offline when the CloudEurekaClient bean is destroyed. The destroyMethod attribute is defined where the CloudEurekaClient bean is configured:
// destroyMethod names the method that is called when the bean is destroyed.
@Bean(destroyMethod = "shutdown")
@ConditionalOnMissingBean(value = EurekaClient.class,
        search = SearchStrategy.CURRENT)
@org.springframework.cloud.context.config.annotation.RefreshScope
@Lazy
public EurekaClient eurekaClient(ApplicationInfoManager manager,
        EurekaClientConfig config, EurekaInstanceConfig instance,
        @Autowired(required = false) HealthCheckHandler healthCheckHandler) {
    // If we use the proxy of the ApplicationInfoManager we could run into a
    // problem
    // when shutdown is called on the CloudEurekaClient where the
    // ApplicationInfoManager bean is
    // requested but wont be allowed because we are shutting down. To avoid this
    // we use the
    // object directly.
    ApplicationInfoManager appManager;
    if (AopUtils.isAopProxy(manager)) {
        appManager = ProxyUtils.getTargetObject(manager);
    }
    else {
        appManager = manager;
    }
    CloudEurekaClient cloudEurekaClient = new CloudEurekaClient(appManager,
            config, this.optionalArgs, this.context);
    cloudEurekaClient.registerHealthCheck(healthCheckHandler);
    return cloudEurekaClient;
}
Now to the destroy method itself:
@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");
        // Remove the status change listener registered earlier
        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
        // Cancel the scheduled tasks
        cancelScheduledTasks();
        // If APPINFO was registered
        if (applicationInfoManager != null
                && clientConfig.shouldRegisterWithEureka()
                && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            // Send the deregistration request for this instance to the server
            unregister();
        }
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();
        Monitors.unregisterObject(this);
        logger.info("Completed shut down of DiscoveryClient");
    }
}
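Two things in shutdown() are worth isolating: the compareAndSet guard that makes a second call a no-op, and the order of the teardown steps. The sketch below models only that. The class name, the unregisterOnShutdown flag, and the step strings are hypothetical; the real method does the actual work where this one only records a label.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model of an idempotent shutdown; each step is only recorded, not performed.
class ShutdownGuardSketch {
    private final AtomicBoolean isShutdown = new AtomicBoolean(false);
    private final List<String> steps = new ArrayList<>();
    private final boolean unregisterOnShutdown;

    ShutdownGuardSketch(boolean unregisterOnShutdown) {
        this.unregisterOnShutdown = unregisterOnShutdown;
    }

    synchronized void shutdown() {
        // Only the first caller flips false -> true; later calls skip the whole body.
        if (isShutdown.compareAndSet(false, true)) {
            steps.add("remove status listener");
            steps.add("cancel scheduled tasks");
            if (unregisterOnShutdown) {
                steps.add("set status DOWN");
                steps.add("send cancel request");
            }
            steps.add("close transport");
        }
    }

    static List<String> demo(boolean unregisterOnShutdown) {
        ShutdownGuardSketch client = new ShutdownGuardSketch(unregisterOnShutdown);
        client.shutdown();
        client.shutdown(); // second call changes nothing
        return client.steps;
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```

The listener is removed before the status is set to DOWN, so in this path the status change does not go through the replicator; the cancel request is sent directly instead.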
The deregistration request is implemented as follows:
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if (eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            // Call the server's cancel endpoint: a DELETE request to
            // "apps/" + appName + '/' + id, which is handled by the server's
            // InstanceResource.cancelLease(...) REST endpoint. This takes the
            // instance offline. That is the graceful path; after an abrupt
            // shutdown the instance is removed by the server's eviction task
            // instead (by default it runs every 60s and evicts instances that
            // have not renewed their lease within 90s).
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(
                    instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}
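To make the shape of the cancel call concrete, here is a small sketch that builds (but does not send) a DELETE request for the apps/{appName}/{id} path using the JDK's java.net.http API. Eureka uses its own transport client, not this API; the base URL, application name, and instance id below are made-up example values.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical illustration of the cancel request; Eureka's own transport is not used here.
class CancelRequestSketch {

    // Builds the DELETE request for "apps/" + appName + '/' + id without sending it.
    static HttpRequest buildCancel(String serviceUrl, String appName, String id) {
        String base = serviceUrl.endsWith("/") ? serviceUrl : serviceUrl + "/";
        String path = "apps/" + appName + '/' + id;
        return HttpRequest.newBuilder(URI.create(base + path)).DELETE().build();
    }

    public static void main(String[] args) {
        // Assumed example values, not taken from the article.
        HttpRequest request = buildCancel("http://localhost:8761/eureka", "ORDER-SERVICE", "order-1");
        System.out.println(request.method() + " " + request.uri());
    }
}
```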
8. How Ribbon integrates with the EurekaClient:
How does Ribbon get hold of the service list that the EurekaClient caches locally? We know the EurekaClient caches service data in the localRegionApps field of DiscoveryClient:
private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
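This cache is refreshed by a scheduled task, so how does Ribbon read it? Ribbon relies on Netflix's ServerList interface to update its list of service providers, so the question is whether Eureka provides an implementation of it. Looking at the ServerList class hierarchy, Eureka is not the only one: the ZooKeeper and Consul integrations also implement it to look up service providers.
At the Spring Cloud level the implementation class is org.springframework.cloud.netflix.ribbon.eureka.DomainExtractingServerList. Its source is as follows: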
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {

    // The real server list implementation is DiscoveryEnabledNIWSServerList from ribbon-eureka
    private ServerList<DiscoveryEnabledServer> list;
    private final RibbonProperties ribbon;
    private boolean approximateZoneFromHostname;

    public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
            IClientConfig clientConfig, boolean approximateZoneFromHostname) {
        this.list = list;
        this.ribbon = RibbonProperties.from(clientConfig);
        this.approximateZoneFromHostname = approximateZoneFromHostname;
    }

    @Override
    public List<DiscoveryEnabledServer> getInitialListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list.getInitialListOfServers());
        return servers;
    }

    @Override
    public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
        List<DiscoveryEnabledServer> servers = setZones(this.list.getUpdatedListOfServers());
        return servers;
    }
    . . .
}
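DomainExtractingServerList is a decorator: it delegates to the wrapped list and post-processes the result. The sketch below shows only that pattern. ServerSource is a hypothetical stand-in for ServerList, servers are plain strings, and the zone lookup function is an assumption; it does not reproduce what setZones actually does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical decorator sketch; ServerSource stands in for Ribbon's ServerList.
class ZoneDecoratorSketch {

    interface ServerSource {
        List<String> getUpdatedListOfServers();
    }

    // Wraps another source and tags every server it returns with a zone.
    static class ZoneTagging implements ServerSource {
        private final ServerSource delegate;
        private final Function<String, String> zoneOf; // assumed zone lookup

        ZoneTagging(ServerSource delegate, Function<String, String> zoneOf) {
            this.delegate = delegate;
            this.zoneOf = zoneOf;
        }

        @Override
        public List<String> getUpdatedListOfServers() {
            return delegate.getUpdatedListOfServers().stream()
                    .map(server -> server + "@" + zoneOf.apply(server))
                    .collect(Collectors.toList());
        }
    }

    static List<String> demo() {
        ServerSource discovery = () -> Arrays.asList("10.0.0.1:8080", "10.0.1.1:8080");
        Function<String, String> zoneOf = s -> s.startsWith("10.0.0.") ? "zone-a" : "zone-b";
        return new ZoneTagging(discovery, zoneOf).getUpdatedListOfServers();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```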
That brings us to DiscoveryEnabledNIWSServerList, where the method of interest is its implementation of getUpdatedListOfServers() from the ServerList interface:
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
    return this.obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
    List<DiscoveryEnabledServer> serverList = new ArrayList();
    if (this.eurekaClientProvider != null && this.eurekaClientProvider.get() != null) {
        // 1. Obtain an EurekaClient
        EurekaClient eurekaClient = (EurekaClient)this.eurekaClientProvider.get();
        if (this.vipAddresses != null) {
            String[] var3 = this.vipAddresses.split(",");
            int var4 = var3.length;
            for(int var5 = 0; var5 < var4; ++var5) {
                String vipAddress = var3[var5];
                // 2. Fetch the instance list; this is the core step
                List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, this.isSecure, this.targetRegion);
                Iterator var8 = listOfInstanceInfo.iterator();
                while(var8.hasNext()) {
                    InstanceInfo ii = (InstanceInfo)var8.next();
                    if (ii.getStatus().equals(InstanceStatus.UP)) {
                        if (this.shouldUseOverridePort) {
                            if (logger.isDebugEnabled()) {
                                logger.debug("Overriding port on client name: " + this.clientName + " to " + this.overridePort);
                            }
                            InstanceInfo copy = new InstanceInfo(ii);
                            if (this.isSecure) {
                                ii = (new Builder(copy)).setSecurePort(this.overridePort).build();
                            } else {
                                ii = (new Builder(copy)).setPort(this.overridePort).build();
                            }
                        }
                        DiscoveryEnabledServer des = this.createServer(ii, this.isSecure, this.shouldUseIpAddr);
                        serverList.add(des);
                    }
                }
                if (serverList.size() > 0 && this.prioritizeVipAddressBasedServers) {
                    break;
                }
            }
        }
        // 3. Return the server list
        return serverList;
    } else {
        logger.warn("EurekaClient has not been initialized yet, returning an empty list");
        return new ArrayList();
    }
}
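Stripped of the decompiler noise, obtainServersViaDiscovery() does three things: it walks the comma-separated VIP addresses in order, keeps only instances whose status is UP, and stops at the first VIP that yields servers when prioritization is on. Here is a minimal sketch of that control flow. The Instance class, the map-based registry, and the sample hosts are hypothetical; port overriding is left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the lookup loop; a plain map stands in for the EurekaClient.
class VipLookupSketch {

    static class Instance {
        final String host;
        final int port;
        final boolean up;

        Instance(String host, int port, boolean up) {
            this.host = host;
            this.port = port;
            this.up = up;
        }
    }

    static List<String> obtainServers(String vipAddresses,
            Map<String, List<Instance>> registry, boolean prioritizeVipAddressBasedServers) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vipAddress : vipAddresses.split(",")) {
            List<Instance> instances = registry.getOrDefault(vipAddress, Collections.emptyList());
            for (Instance instance : instances) {
                if (instance.up) { // instances that are not UP are skipped
                    servers.add(instance.host + ":" + instance.port);
                }
            }
            // Earlier VIP addresses win: stop as soon as one of them produced servers.
            if (!servers.isEmpty() && prioritizeVipAddressBasedServers) {
                break;
            }
        }
        return servers;
    }

    static List<String> demo(boolean prioritize) {
        Map<String, List<Instance>> registry = new HashMap<>();
        registry.put("vip-a", Arrays.asList(
                new Instance("a1", 8080, true), new Instance("a2", 8080, false)));
        registry.put("vip-b", Arrays.asList(new Instance("b1", 9090, true)));
        return obtainServers("vip-a,vip-b", registry, prioritize);
    }

    public static void main(String[] args) {
        System.out.println(demo(true));
        System.out.println(demo(false));
    }
}
```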
Here is how instances are looked up by VIP address:
@Override
public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure,
        @Nullable String region) {
    if (vipAddress == null) {
        throw new IllegalArgumentException("Supplied VIP Address cannot be null");
    }
    Applications applications;
    if (instanceRegionChecker.isLocalRegion(region)) {
        // This is the key line: the service data cached in the current EurekaClient
        // is read directly. The EurekaClient here is the CloudEurekaClient configured
        // earlier, so the list returned is the data that was pulled from the server
        // and cached.
        applications = this.localRegionApps.get();
    } else {
        applications = remoteRegionVsApps.get(region);
        if (null == applications) {
            logger.debug("No applications are defined for region {}, so returning an empty instance list for vip "
                    + "address {}.", region, vipAddress);
            return Collections.emptyList();
        }
    }
    if (!secure) {
        return applications.getInstancesByVirtualHostName(vipAddress);
    } else {
        return applications.getInstancesBySecureVirtualHostName(vipAddress);
    }
}
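The lookup above reads from an AtomicReference that the refresh task replaces wholesale, so readers always see a complete snapshot. The sketch below models that with plain maps. The class, the refreshLocal method, the region names, and the rule that a null region means "local" are assumptions made for illustration, not Eureka's code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of a snapshot cache read by region; maps stand in for Applications.
class RegionCacheSketch {
    private final String localRegion;
    private final AtomicReference<Map<String, List<String>>> localRegionApps =
            new AtomicReference<>(Collections.emptyMap());
    private final Map<String, Map<String, List<String>>> remoteRegionVsApps = new ConcurrentHashMap<>();

    RegionCacheSketch(String localRegion) {
        this.localRegion = localRegion;
    }

    // What a refresh task would do: swap in a whole new snapshot atomically.
    void refreshLocal(Map<String, List<String>> snapshot) {
        localRegionApps.set(snapshot);
    }

    List<String> getInstancesByVipAddress(String vipAddress, String region) {
        if (vipAddress == null) {
            throw new IllegalArgumentException("Supplied VIP Address cannot be null");
        }
        Map<String, List<String>> applications;
        if (region == null || region.equals(localRegion)) { // assumption: null means local
            applications = localRegionApps.get();
        } else {
            applications = remoteRegionVsApps.get(region);
            if (applications == null) {
                return Collections.emptyList();
            }
        }
        return applications.getOrDefault(vipAddress, Collections.emptyList());
    }

    static List<List<String>> demo() {
        RegionCacheSketch cache = new RegionCacheSketch("us-east-1");
        List<List<String>> results = new ArrayList<>();
        cache.refreshLocal(Collections.singletonMap("orders", Collections.singletonList("10.0.0.1:8080")));
        results.add(cache.getInstancesByVipAddress("orders", null));        // served from the local cache
        results.add(cache.getInstancesByVipAddress("orders", "eu-west-1")); // unknown remote region
        cache.refreshLocal(Collections.singletonMap("orders", Collections.singletonList("10.0.0.2:8080")));
        results.add(cache.getInstancesByVipAddress("orders", "us-east-1")); // sees the new snapshot
        return results;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

No network call happens on the read path in this model, which mirrors the point made in the source comment: Ribbon is served from whatever the last refresh cached.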
That concludes the analysis of how the whole eureka-client works.