1. In the previous articles we mainly demonstrated how to use Eureka. Today we will look at Eureka's self-preservation mechanism.
2. What is Eureka's self-preservation mechanism?
By default, Eureka Server runs an eviction task at a fixed interval (60 seconds by default) that removes any instance that has not renewed its lease within the past 90 seconds. When a network partition occurs, however, microservices can no longer communicate with Eureka Server even though the services themselves are still running. In that situation the instances should not be removed, which is why the self-preservation mechanism was introduced.
The official Spring definition: self-preservation mode is a safety measure against abnormal network fluctuations; with self-preservation enabled, a Eureka cluster runs in a more robust and stable manner.
What does the self-preservation mechanism look like in practice?
If the number of heartbeats Eureka Server actually receives falls below 85% of the number it expects (evaluated over a 15-minute window by default), it concludes that there is a network problem between the clients and the registry and automatically enters self-preservation mode. At that point the following happens:
1. Eureka Server no longer removes instances from the registry that would otherwise expire because no heartbeat has been received for a long time.
2. Eureka Server still accepts registration and query requests for new services, but they are not replicated to other nodes; the current node stays available on its own.
3. When the network stabilizes, the new registrations on the current Eureka Server are synchronized to the other nodes. Eureka Server therefore copes well with losing contact with some nodes due to network failures, unlike ZooKeeper, where losing more than half of the nodes renders the entire cluster unavailable.
3. Self-preservation is a mechanism on the Eureka Server side and is enabled by default. It can be turned off with the configuration property eureka.server.enable-self-preservation=false.
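For reference, here is a minimal configuration sketch. The property names below are the ones used by Spring Cloud Netflix (verify them against your version); the client-side lease settings are what produce the 30-second heartbeat and the 90-second expiry mentioned above.

    # application.properties on the Eureka server
    eureka.server.enable-self-preservation=false
    # the 85% threshold used by self-preservation (default: 0.85)
    eureka.server.renewal-percent-threshold=0.85
    # interval of the eviction task (default: 60000 ms)
    eureka.server.eviction-interval-timer-in-ms=60000

    # application.properties on a Eureka client
    # heartbeat interval (default: 30 s)
    eureka.instance.lease-renewal-interval-in-seconds=30
    # expire the instance if no renewal arrives within this window (default: 90 s)
    eureka.instance.lease-expiration-duration-in-seconds=90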
4. How the self-preservation mechanism is implemented:
Given the behavior described above, how is the mechanism actually implemented?
The implementation boils down to two values: the renewal threshold (threshold) and the actual number of renewals in the last minute; self-preservation is decided by comparing them.
Both values are defined in the class AbstractInstanceRegistry:
// Counter for the number of renewals received in the last minute
private final MeasuredRate renewsLastMin;

// The renewal threshold per minute
protected volatile int numberOfRenewsPerMinThreshold;

// The expected number of clients sending renewals; since registrations change
// constantly, this value is updated on register and cancel
protected volatile int expectedNumberOfClientsSendingRenews;
Let's first look at how the renewal threshold is computed:
protected void updateRenewsPerMinThreshold() {
    // expected client count (expectedNumberOfClientsSendingRenews)
    //   * renewals per minute (default renewal interval 30 s, i.e. 2 per minute)
    //   * renewal percent threshold (default 0.85)
    // e.g. with 3 services the threshold is (int) (3 * 2 * 0.85) = 5
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
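As a quick sanity check on the arithmetic, here is a tiny self-contained demo of the same formula. ThresholdDemo and renewsPerMinThreshold are hypothetical names for illustration, not Eureka API:

    public class ThresholdDemo {

        // Same formula as updateRenewsPerMinThreshold(), with the inputs made explicit
        static int renewsPerMinThreshold(int expectedClients,
                                         int renewalIntervalSeconds,
                                         double renewalPercentThreshold) {
            return (int) (expectedClients
                    * (60.0 / renewalIntervalSeconds)
                    * renewalPercentThreshold);
        }

        public static void main(String[] args) {
            // 3 clients renewing every 30 s with an 85% threshold: (int) (3 * 2 * 0.85) = 5
            System.out.println(renewsPerMinThreshold(3, 30, 0.85));   // 5
            // 10 clients: (int) (10 * 2 * 0.85) = 17
            System.out.println(renewsPerMinThreshold(10, 30, 0.85));  // 17
        }
    }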
Now, when does expectedNumberOfClientsSendingRenews change?
1. When a service comes online (registers).
2. When a service actively goes offline (cancels).
3. When an unhealthy instance is evicted.
Let's look at each case in the source code:
1. Registration:
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        read.lock();
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // Retain the last dirty timestamp without overwriting it, if there is already a lease
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
            // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
            // InstanceInfo instead of the server local copy.
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                        " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = existingLease.getHolder();
            }
        } else {
            // The lease does not exist and hence it is a new registration
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews:
                    // increment the expected client count by 1, then recompute the renewal threshold
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
            logger.debug("No previous lease information found; it is new registration");
        }
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        gMap.put(registrant.getId(), lease);
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }
        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        read.unlock();
    }
}
2. Active deregistration (cancel):
protected boolean internalCancel(String appName, String id, boolean isReplication) {
    try {
        read.lock();
        CANCEL.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToCancel = null;
        if (gMap != null) {
            leaseToCancel = gMap.remove(id);
        }
        recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
        InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
        if (instanceStatus != null) {
            logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
        }
        if (leaseToCancel == null) {
            CANCEL_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
            return false;
        } else {
            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
        }
    } finally {
        read.unlock();
    }
    synchronized (lock) {
        if (this.expectedNumberOfClientsSendingRenews > 0) {
            // Since the client wants to cancel it, reduce the number of clients to send renews:
            // decrement the expected client count by 1, then recompute the renewal threshold
            this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
            updateRenewsPerMinThreshold();
        }
    }
    return true;
}
3. Eviction of unhealthy instances. On the Eureka Server side, eviction is driven by a scheduled task:
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // Schedule the eviction task; the default interval is one minute
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
The EvictionTask source:
class EvictionTask extends TimerTask {

    private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l);

    @Override
    public void run() {
        try {
            long compensationTimeMs = getCompensationTimeMs();
            logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
            // Evict expired instances
            evict(compensationTimeMs);
        } catch (Throwable e) {
            logger.error("Could not run the evict task", e);
        }
    }

    /**
     * compute a compensation time defined as the actual time this task was executed since the prev iteration,
     * vs the configured amount of time for execution. This is useful for cases where changes in time (due to
     * clock skew or gc for example) causes the actual eviction task to execute later than the desired time
     * according to the configured cycle.
     */
    long getCompensationTimeMs() {
        long currNanos = getCurrentTimeNano();
        long lastNanos = lastExecutionNanosRef.getAndSet(currNanos);
        if (lastNanos == 0l) {
            return 0l;
        }
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos);
        long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs();
        return compensationTime <= 0l ? 0l : compensationTime;
    }

    long getCurrentTimeNano() {  // for testing
        return System.nanoTime();
    }
}
The eviction method itself:
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    // If self-preservation mode is currently active, do not evict anything
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Eviction ultimately delegates to the cancel path
            internalCancel(appName, id, false);
        }
    }
}
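Note the rate limiting built into evict(): per run, at most registrySize - (int) (registrySize * 0.85) instances can be removed, no matter how many leases have expired. A small hypothetical demo of that cap (illustration only, not Eureka code):

    public class EvictionLimitDemo {
        public static void main(String[] args) {
            double renewalPercentThreshold = 0.85;  // default
            int registrySize = 10;                  // current local registry size

            int registrySizeThreshold = (int) (registrySize * renewalPercentThreshold); // 8
            int evictionLimit = registrySize - registrySizeThreshold;                   // 2

            int expired = 5;                        // leases that expired this cycle
            int toEvict = Math.min(expired, evictionLimit);
            // Prints 2: the other 3 expired leases wait for the next run,
            // by which time self-preservation may already have kicked in
            System.out.println("toEvict = " + toEvict);
        }
    }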
So expectedNumberOfClientsSendingRenews is kept up to date from these three places.
Next, how does Eureka compute the actual number of renewals in the last minute?
It is handled by one of the three core fields introduced earlier, the last-minute renewal counter:
private final MeasuredRate renewsLastMin;
protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    // Create the counter with a sample interval of one minute; note this is hard-coded
    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}
It is then started from the init method:
protected void postInit() {
    // Start the last-minute renewal counter
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
The start() method:
public synchronized void start() {
    if (!isActive) {
        // sampleInterval here is the 1000 * 60 * 1 (one minute) set in the constructor
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                try {
                    // Zero out the current bucket.
                    lastBucket.set(currentBucket.getAndSet(0));
                } catch (Throwable e) {
                    logger.error("Cannot reset the Measured Rate", e);
                }
            }
        }, sampleInterval, sampleInterval);
        isActive = true;
    }
}
The full MeasuredRate source:
public class MeasuredRate {
    private static final Logger logger = LoggerFactory.getLogger(MeasuredRate.class);

    // Renewal count from the last completed sample interval, across all instances
    private final AtomicLong lastBucket = new AtomicLong(0);
    // Renewal count accumulated during the current sample interval;
    // each call to increment() adds 1 (how much is added depends on how increment is used)
    private final AtomicLong currentBucket = new AtomicLong(0);

    private final long sampleInterval;
    private final Timer timer;

    private volatile boolean isActive;

    /**
     * @param sampleInterval in milliseconds
     */
    public MeasuredRate(long sampleInterval) {
        this.sampleInterval = sampleInterval;
        this.timer = new Timer("Eureka-MeasureRateTimer", true);
        this.isActive = false;
    }

    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        // Zero out the current bucket:
                        // move the current count into lastBucket and reset it to 0
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);
            isActive = true;
        }
    }

    public synchronized void stop() {
        if (isActive) {
            timer.cancel();
            isActive = false;
        }
    }

    /**
     * Returns the count in the last sample interval.
     */
    public long getCount() {
        return lastBucket.get();
    }

    /**
     * Increments the count in the current sample interval.
     * Called from the renewal path: each renewal bumps the counter by 1.
     */
    public void increment() {
        currentBucket.incrementAndGet();
    }
}
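Two more pieces close the loop: each successful heartbeat increments this counter, and the eviction path consults it before removing anything. The snippets below are paraphrased from memory of AbstractInstanceRegistry.renew(...) and PeerAwareInstanceRegistryImpl; treat them as a sketch and verify against the Eureka version you use:

    // In AbstractInstanceRegistry.renew(...), a successful renewal ends with:
    //     renewsLastMin.increment();
    //     leaseToRenew.renew();

    // In PeerAwareInstanceRegistryImpl, evict() is short-circuited by this check:
    @Override
    public boolean isLeaseExpirationEnabled() {
        if (!isSelfPreservationModeEnabled()) {
            // Self-preservation is disabled, so instances may always expire
            return true;
        }
        // Expire leases only while last-minute renewals stay above the threshold;
        // dropping below it is exactly the self-preservation condition
        return numberOfRenewsPerMinThreshold > 0
                && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }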
5. We have now covered the renewal threshold and the last-minute renewal count. If the server has entered self-preservation mode, how does it get back to normal?
Digging through the source, the answer is once again a scheduled task. See the init(...) method of PeerAwareInstanceRegistryImpl:
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    initializedResponseCache();
    // Start the renewal threshold update task
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
The scheduling method:
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
        @Override
        public void run() {
            // Recompute the renewal threshold
            updateRenewalThreshold();
        }
    // serverConfig.getRenewalThresholdUpdateIntervalMs() defaults to 15 minutes
    }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

@Override
public int getRenewalThresholdUpdateIntervalMs() {
    return configInstance.getIntProperty(namespace + "renewalThresholdUpdateIntervalMs",
            (15 * 60 * 1000)).get();
}
And the implementation of updateRenewalThreshold():
private void updateRenewalThreshold() {
    try {
        // Fetch all registered applications
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            // Count the registerable instances of each application
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // Update threshold only if the threshold is greater than the
            // current expected threshold or if self preservation is disabled.
            // In other words: refresh the expected client count only when the actual
            // instance count is above 85% of the old expectation, or when
            // self-preservation is disabled. Running every 15 minutes, this acts as
            // a periodic correction and a second layer of protection.
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                this.expectedNumberOfClientsSendingRenews = count;
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}
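The guard condition is worth a second look: the expected client count is only refreshed when the actual instance count is above 85% of the old expectation (or self-preservation is disabled), so a sudden mass loss of instances cannot silently lower the threshold. A hypothetical numeric illustration (RenewalThresholdUpdateDemo and maybeUpdate are made-up names):

    public class RenewalThresholdUpdateDemo {
        public static void main(String[] args) {
            double renewalPercentThreshold = 0.85;  // default
            boolean selfPreservationEnabled = true;
            int expected = 20;  // expectedNumberOfClientsSendingRenews before the task runs

            // Case 1: normal churn, 18 of 20 instances counted in the registry.
            // 18 > 20 * 0.85 = 17.0, so the expectation is corrected to 18.
            expected = maybeUpdate(expected, 18, renewalPercentThreshold, selfPreservationEnabled);
            System.out.println(expected); // 18

            // Case 2: mass loss, only 12 instances counted.
            // 12 > 18 * 0.85 = 15.3 is false, so the expectation (and therefore
            // the threshold) is kept, preserving the self-preservation guarantee.
            expected = maybeUpdate(expected, 12, renewalPercentThreshold, selfPreservationEnabled);
            System.out.println(expected); // still 18
        }

        // Mirrors the guard inside updateRenewalThreshold()
        static int maybeUpdate(int expected, int count, double threshold, boolean selfPreservationEnabled) {
            if (count > threshold * expected || !selfPreservationEnabled) {
                return count;
            }
            return expected;
        }
    }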
6. Where the data on the Eureka dashboard comes from
Go to the spring-cloud-netflix-eureka-server package and open navbar.ftlh:
<h1>System Status</h1>
<div class="row"><div class="col-md-6"><table id='instances' class="table table-condensed table-striped table-hover"><#if amazonInfo??><tr><td>EUREKA SERVER</td><td>AMI: ${amiId!}</td></tr><tr><td>Zone</td><td>${availabilityZone!}</td></tr><tr><td>instance-id</td><td>${instanceId!}</td></tr></#if><tr><td>Environment</td><td>${environment!}</td></tr><tr><td>Data center</td><td>${datacenter!}</td></tr></table></div><div class="col-md-6"><table id='instances' class="table table-condensed table-striped table-hover"><tr><td>Current time</td><td>${currentTime}</td></tr><tr><td>Uptime</td><td>${upTime}</td></tr><tr><td>Lease expiration enabled</td><td>${registry.leaseExpirationEnabled?c}</td></tr><tr>这里是续约的阈值。<td>Renews threshold</td><td>${registry.numOfRenewsPerMinThreshold}</td></tr><tr>这里就是获取到的所有节点最后一分钟续约的数量<td>Renews (last min)</td><td>${registry.numOfRenewsInLastMin}</td></tr></table></div>
</div><#if isBelowRenewThresold><#if !registry.selfPreservationModeEnabled><h4 id="uptime"><font size="+1" color="red"><b>RENEWALS ARE LESSER THAN THE THRESHOLD. THE SELF PRESERVATION MODE IS TURNED OFF. THIS MAY NOT PROTECT INSTANCE EXPIRY IN CASE OF NETWORK/OTHER PROBLEMS.</b></font></h4><#else><h4 id="uptime"><font size="+1" color="red"><b>EMERGENCY! EUREKA MAY BE INCORRECTLY CLAIMING INSTANCES ARE UP WHEN THEY'RE NOT. RENEWALS ARE LESSER THAN THRESHOLD AND HENCE THE INSTANCES ARE NOT BEING EXPIRED JUST TO BE SAFE.</b></font></h4></#if>
<#elseif !registry.selfPreservationModeEnabled><h4 id="uptime"><font size="+1" color="red"><b>THE SELF PRESERVATION MODE IS TURNED OFF. THIS MAY NOT PROTECT INSTANCE EXPIRY IN CASE OF NETWORK/OTHER PROBLEMS.</b></font></h4>
</#if><h1>DS Replicas</h1>
<ul class="list-group"><#list replicas as replica><li class="list-group-item"><a href="${replica.value}">${replica.key}</a></li></#list>
</ul>
The data on this page is produced by the status handler in EurekaController:
@RequestMapping(method = RequestMethod.GET)
public String status(HttpServletRequest request, Map<String, Object> model) {
    populateBase(request, model);
    populateApps(model);
    StatusInfo statusInfo;
    try {
        statusInfo = new StatusResource().getStatusInfo();
    } catch (Exception e) {
        statusInfo = StatusInfo.Builder.newBuilder().isHealthy(false).build();
    }
    model.put("statusInfo", statusInfo);
    populateInstanceInfo(model, statusInfo);
    filterReplicas(model, statusInfo);
    return "eureka/status";
}
Among the entries placed into the model is the registry object itself; that is where the template reads numOfRenewsPerMinThreshold and numOfRenewsInLastMin from.
And that is how the self-preservation mechanism on the Eureka Server side works.