1、前面我们分析了eureka服务端实现原理、客户端实现原理,今天我们来分析一下eureka集群同步的原理,如何搭建以及使用eureka集群我们在前面以及演示过了,此处不在过多累赘。
2、eureka服务端也是客户端:
前面我们分析eurekaServer端的时候提到过,服务端依赖了客户端的实现,且也会在服务端启动之前就构建一个客户端,那么为啥需要这样做呢?????
答:原因就是服务端启动的时候需要去同步其他节点的服务注册表数据。 接下来我们来验证是否是这样是实现的。
首先来查看 EurekaServerAutoConfiguration 这个自动配置类:
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {。 。 。
}
主要查看 @Import(EurekaServerInitializerConfiguration.class),进入 EurekaServerInitializerConfiguration这个类:
@Configuration(proxyBeanMethods = false)
public class EurekaServerInitializerConfigurationimplements ServletContextAware, SmartLifecycle, Ordered {。 。 。@Overridepublic void start() {new Thread(() -> {try {// TODO: is this class even needed now?主要看这里,初始化eurekaServerBootstrapeurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);log.info("Started Eureka Server");publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));EurekaServerInitializerConfiguration.this.running = true;publish(new EurekaServerStartedEvent(getEurekaServerConfig()));}catch (Exception ex) {// Help!log.error("Could not initialize Eureka servlet context", ex);}}).start();}。 。 。
}
初始化的具体实现:
public void contextInitialized(ServletContext context) {try {initEurekaEnvironment();主要看这里,初始化eurekaServer上下文initEurekaServerContext();context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);}catch (Throwable e) {log.error("Cannot bootstrap eureka server :", e);throw new RuntimeException("Cannot bootstrap eureka server :", e);}}
初始化上下文的实现:
protected void initEurekaServerContext() throws Exception {// For backward compatibilityJsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),XStream.PRIORITY_VERY_HIGH);if (isAws(this.applicationInfoManager.getInfo())) {this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,this.eurekaClientConfig, this.registry, this.applicationInfoManager);this.awsBinder.start();}EurekaServerContextHolder.initialize(this.serverContext);log.info("Initialized server context");// Copy registry from neighboring eureka node主要看这里,从其他节点node同步数据刀当前的eurekaServer节点int registryCount = this.registry.syncUp();this.registry.openForTraffic(this.applicationInfoManager, registryCount);// Register all monitoring statistics.EurekaMonitors.registerAllStats();}
同步其他节点的实现(这里很重要):
@Overridepublic int syncUp() {// Copy entire entry from neighboring DS nodeint count = 0;for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {if (i > 0) {try {Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());} catch (InterruptedException e) {logger.warn("Interrupted during registry transfer..");break;}}1、使用eurekaClient获取从其他节点上拉取的服务注册表,这里是从eurekaClient的本地缓存中获取服务注册表,还记得之前讲解过的客户端的缓存刷新任务吗,对,就是使用那个任务进行数据刷新获取数据缓存起来,然后eurekaServer启动的时候去获取这个服务注册表的缓存,然后注册在eurekaServer自己上面。Applications apps = eurekaClient.getApplications();for (Application app : apps.getRegisteredApplications()) {for (InstanceInfo instance : app.getInstances()) {try {if (isRegisterable(instance)) {2、然后将从当前节点的eurekaClient中获取的服务注册表的缓存数据注册到自己这个节点上。register(instance, instance.getLeaseInfo().getDurationInSecs(), true);count++;}} catch (Throwable t) {logger.error("During DS init copy", t);}}}}return count;}
好了,eurekaServer启动的时候同步其他节点node服务注册表数据就是这样实现的。
3、eurekaServer在运行期间是如何同步其他节点的注册数据呢????
我们先来看看服务端提供的实例添加接口:在 ApplicationResource 中:
@POST@Consumes({"application/json", "application/xml"})public Response addInstance(InstanceInfo info,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);// validate that the instanceinfo contains all the necessary required fieldsif (isBlank(info.getId())) {return Response.status(400).entity("Missing instanceId").build();} else if (isBlank(info.getHostName())) {return Response.status(400).entity("Missing hostname").build();} else if (isBlank(info.getIPAddr())) {return Response.status(400).entity("Missing ip address").build();} else if (isBlank(info.getAppName())) {return Response.status(400).entity("Missing appName").build();} else if (!appName.equals(info.getAppName())) {return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();} else if (info.getDataCenterInfo() == null) {return Response.status(400).entity("Missing dataCenterInfo").build();} else if (info.getDataCenterInfo().getName() == null) {return Response.status(400).entity("Missing dataCenterInfo Name").build();}// handle cases where clients may be registering with bad DataCenterInfo with missing dataDataCenterInfo dataCenterInfo = info.getDataCenterInfo();if (dataCenterInfo instanceof UniqueIdentifier) {String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();if (isBlank(dataCenterInfoId)) {boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));if (experimental) {String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";return Response.status(400).entity(entity).build();} else if (dataCenterInfo instanceof AmazonInfo) {AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);if (effectiveId == null) {amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());}} else {logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());}}}调用注册器实例进行注册registry.register(info, "true".equals(isReplication));return Response.status(204).build(); // 204 to be backwards compatible}
来到 PeerAwareInstanceRegistryImpl 的registry方法中:
@Overridepublic void register(final InstanceInfo info, final boolean isReplication) {int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {leaseDuration = info.getLeaseInfo().getDurationInSecs();}super.register(info, leaseDuration, isReplication);主要看这里,复制到其他同等的节点,操作是注册。replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);}
复制到其它节点的实现:
private void replicateToPeers(Action action, String appName, String id,InstanceInfo info /* optional */,InstanceStatus newStatus /* optional */, boolean isReplication) {Stopwatch tracer = action.getTimer().start();try {if (isReplication) {numberOfReplicationsLastMin.increment();}// If it is a replication already, do not replicate again as this will create a poison replicationif (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {return;}1、主要看这里,eurekaServer集群中其他的所有节点,然后遍历。peerEurekaNodes这个Bean会在自动装配阶段进行自动装配。for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {// If the url represents this host, do not replicate to yourself.if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {continue;}2、复制当前服务端的操作到其他同等的节点replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);}} finally {tracer.stop();}}
复制当前实例的操作到某个同等节点的实现:
private void replicateInstanceActionsToPeers(Action action, String appName,String id, InstanceInfo info, InstanceStatus newStatus,PeerEurekaNode node) {try {InstanceInfo infoFromRegistry;CurrentRequestVersion.set(Version.V2);switch (action) {1、如果是 info实例 的下线操作,进行node节点的 info实例 同步下线case Cancel:node.cancel(appName, id);break;2、如果是 info实例 的心跳续约操作,进行node节点的 info实例 同步心跳续约case Heartbeat:InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);break;3、如果是 info实例 的注册操作,那就将info实例同步注册到node节点上。case Register:node.register(info);break;4、如果是 info实例 的状态改变操作,那就将info实例的状态改变同步到node上。case StatusUpdate:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.statusUpdate(appName, id, newStatus, infoFromRegistry);break;5、同步删除状态覆盖case DeleteStatusOverride:infoFromRegistry = getInstanceByAppAndId(appName, id, false);node.deleteStatusOverride(appName, id, infoFromRegistry);break;}} catch (Throwable t) {logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);} finally {CurrentRequestVersion.remove();}}
我们以registry 操作来进行讲解,其他的操作实现都一致:来到 PeerEurekaNode 的 registry(...) 方法:
public void register(final InstanceInfo info) throws Exception {long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);使用一个批量分发器去处理这个registry操作到目标node节点中。此处会将这个registry操作封装为
一个实例复制任务InstanceReplicationTask实例,然后交给batchingDispatcher去处理。batchingDispatcher.process(taskId("register", info),new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {public EurekaHttpResponse<Void> execute() {每一个InstanceReplicationTask的业务逻辑就是发起http请求,将当前的注册的
实例info 以同样的注册方式注册到需要复制的节点上。return replicationClient.register(info);}},expiryTime);}
那么核心流程分析完成了,我们来讲解一下批处理分发器 batchingDispatcher,默认在构建节点实例 PeerEurekaNode 的时候会实例化 一个批量处理分发器以及一个非批量处理分发器:
private final TaskDispatcher<String, ReplicationTask> batchingDispatcher;private final TaskDispatcher<String, ReplicationTask> nonBatchingDispatcher;
public PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl, HttpReplicationClient replicationClient, EurekaServerConfig config) {this(registry, targetHost, serviceUrl, replicationClient, config, BATCH_SIZE, MAX_BATCHING_DELAY_MS, RETRY_SLEEP_TIME_MS, SERVER_UNAVAILABLE_SLEEP_TIME_MS);}/* For testing */ PeerEurekaNode(PeerAwareInstanceRegistry registry, String targetHost, String serviceUrl,HttpReplicationClient replicationClient, EurekaServerConfig config,int batchSize, long maxBatchingDelayMs,long retrySleepTimeMs, long serverUnavailableSleepTimeMs) {this.registry = registry;this.targetHost = targetHost;this.replicationClient = replicationClient;this.serviceUrl = serviceUrl;this.config = config;this.maxProcessingDelayMs = config.getMaxTimeForReplication();String batcherName = getBatcherName();ReplicationTaskProcessor taskProcessor = new ReplicationTaskProcessor(targetHost, replicationClient);构建一个批处理分发器this.batchingDispatcher = TaskDispatchers.createBatchingTaskDispatcher(batcherName,config.getMaxElementsInPeerReplicationPool(),batchSize,config.getMaxThreadsForPeerReplication(),maxBatchingDelayMs,serverUnavailableSleepTimeMs,retrySleepTimeMs,taskProcessor);构建一个非批处理分发器this.nonBatchingDispatcher = TaskDispatchers.createNonBatchingTaskDispatcher(targetHost,config.getMaxElementsInStatusReplicationPool(),config.getMaxThreadsForStatusReplication(),maxBatchingDelayMs,serverUnavailableSleepTimeMs,retrySleepTimeMs,taskProcessor);}
在同步各种Action的时候默认是使用批处理分发器 batchingDispatcher 。为什么要使用批处理分发器呢????
答:原因就是提高效率,在服务端每一个eurekaServer都提供了一 个支持批量同步Action的rest接口:
批量同步接口@Path("batch")@POSTpublic Response batchReplication(ReplicationList replicationList) {try {ReplicationListResponse batchResponse = new ReplicationListResponse();for (ReplicationInstance instanceInfo : replicationList.getReplicationList()) {try {batchResponse.addResponse(dispatch(instanceInfo));} catch (Exception e) {batchResponse.addResponse(new ReplicationInstanceResponse(Status.INTERNAL_SERVER_ERROR.getStatusCode(), null));logger.error("{} request processing failed for batch item {}/{}",instanceInfo.getAction(), instanceInfo.getAppName(), instanceInfo.getId(), e);}}return Response.ok(batchResponse).build();} catch (Throwable e) {logger.error("Cannot execute batch Request", e);return Response.status(Status.INTERNAL_SERVER_ERROR).build();}}
既然批量那就有几个问题需要拉出来聊了,什么时候触发批量的rest接口调用?
答:批量的触发有两个条件:
1、任务数量:如果在一个周期内一旦达到了一定的任务数量(任务数量=同步的Action数量)就触发一次批量的rest接口调用,默认是周期内满250个就触发。
2、周期:等待一个周期,即使没有达到任务数量,也会发起一次批量的rest接口调用,默认的周期时长是500ms。
由谁来发起批量请求呢????
答:batchingDispatcher只负责分发,并不负责发起批量请求,发起批量请求由batchingDispatcher的任务处理器taskProcessor进行执行:源码如下:在 ReplicationTaskProcessor 中:
@Overridepublic ProcessingResult process(List<ReplicationTask> tasks) {ReplicationList list = createReplicationListOf(tasks);try {发起目标节点的批量同步接口。EurekaHttpResponse<ReplicationListResponse> response = replicationClient.submitBatchUpdates(list);int statusCode = response.getStatusCode();if (!isSuccess(statusCode)) {if (statusCode == 503) {logger.warn("Server busy (503) HTTP status code received from the peer {}; rescheduling tasks after delay", peerId);return ProcessingResult.Congestion;} else {// Unexpected error returned from the server. This should ideally never happen.logger.error("Batch update failure with HTTP status code {}; discarding {} replication tasks", statusCode, tasks.size());return ProcessingResult.PermanentError;}} else {handleBatchResponse(tasks, response.getEntity().getResponseList());}} catch (Throwable e) {if (maybeReadTimeOut(e)) {logger.error("It seems to be a socket read timeout exception, it will retry later. if it continues to happen and some eureka node occupied all the cpu time, you should set property 'eureka.server.peer-node-read-timeout-ms' to a bigger value", e);//read timeout exception is more Congestion then TransientError, return Congestion for longer delay return ProcessingResult.Congestion;} else if (isNetworkConnectException(e)) {logNetworkErrorSample(null, e);return ProcessingResult.TransientError;} else {logger.error("Not re-trying this exception because it does not seem to be a network exception", e);return ProcessingResult.PermanentError;}}return ProcessingResult.Success;}
以上就是eurekaServer集群的工作原理。