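Heartbeat mechanism between EurekaClient and EurekaServer
In Spring Cloud, heartbeats are implemented by the Eureka client periodically sending a heartbeat (a lease renewal request) to the Eureka server. The mechanism tracks the health of each service instance and keeps the service registry accurate.
Specifically, each Eureka client sends a heartbeat to the Eureka server at a fixed interval (30 seconds by default) to signal that it is still alive. On receiving the heartbeat, the server updates the instance's lease and extends how long the instance remains in the registry.
If the server receives no heartbeat from an instance for a period of time (90 seconds by default, plus a compensation time), it considers the instance failed and removes it from the registry.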
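The renew-or-evict cycle described above can be sketched as a small lease table. This is a toy model under stated assumptions, not Eureka's implementation: the class `ToyLeaseTable` and all of its methods are hypothetical names, the lease duration is passed in by the caller, and time is supplied explicitly so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of a lease table; hypothetical names, not Eureka classes.
class ToyLeaseTable {
    private final long leaseDurationMs;
    // instanceId -> time of the last heartbeat, in milliseconds
    private final Map<String, Long> lastHeartbeat = new HashMap<>();

    ToyLeaseTable(long leaseDurationMs) {
        this.leaseDurationMs = leaseDurationMs;
    }

    void register(String instanceId, long nowMs) {
        lastHeartbeat.put(instanceId, nowMs);
    }

    // Returns false when the instance is unknown (the "not found" case).
    boolean heartbeat(String instanceId, long nowMs) {
        if (!lastHeartbeat.containsKey(instanceId)) {
            return false;
        }
        lastHeartbeat.put(instanceId, nowMs);
        return true;
    }

    // Removes every instance whose last heartbeat is older than the lease duration.
    List<String> evictExpired(long nowMs) {
        List<String> evicted = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs - entry.getValue() > leaseDurationMs) {
                evicted.add(entry.getKey());
                it.remove();
            }
        }
        return evicted;
    }

    boolean contains(String instanceId) {
        return lastHeartbeat.containsKey(instanceId);
    }
}
```

With a 90-second lease, an instance that keeps sending heartbeats every 30 seconds stays in the table, while one that stops is removed the next time `evictExpired` runs after its lease has lapsed.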
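Client side
On startup, the client schedules a task, HeartbeatThread, that runs every 30 seconds by default. Its run() method sends a renewal request to the server and, if the renewal succeeds, updates lastSuccessfulHeartbeatTimestamp.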
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }
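When the client sends a renewal request and the server returns 404, the client's instance information does not exist on the server. In that case the client calls register() to register itself with the server again.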
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                // 404: the server does not know this instance, so register again
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }
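The renew-then-re-register behaviour can be reproduced in isolation with a fake server. The sketch below is an illustration only: `FakeRegistryServer` and `ToyHeartbeatClient` are hypothetical classes, the server is reduced to an in-memory set, and the 204 status for a successful registration is an assumption, since register() is not shown in the excerpt.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the registry server: it only knows a set of instance ids.
class FakeRegistryServer {
    private final Set<String> registered = new HashSet<>();

    // HTTP-like status: 200 if the lease exists, 404 otherwise.
    int heartbeat(String instanceId) {
        return registered.contains(instanceId) ? 200 : 404;
    }

    // Assumed to answer 204 on success.
    int register(String instanceId) {
        registered.add(instanceId);
        return 204;
    }

    // Simulates the server losing the lease (for example after an eviction).
    void forget(String instanceId) {
        registered.remove(instanceId);
    }
}

// Hypothetical client that mirrors the control flow of renew() above.
class ToyHeartbeatClient {
    private final FakeRegistryServer server;
    private final String instanceId;
    int reRegisterCount = 0;

    ToyHeartbeatClient(FakeRegistryServer server, String instanceId) {
        this.server = server;
        this.instanceId = instanceId;
    }

    boolean renew() {
        int status = server.heartbeat(instanceId);
        if (status == 404) {
            // The server does not know us: register again instead of failing.
            reRegisterCount++;
            return server.register(instanceId) == 204;
        }
        return status == 200;
    }
}
```

In a real client the call to `renew()` would be driven by a scheduler, for instance `ScheduledExecutorService.scheduleWithFixedDelay`, rather than invoked by hand.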
Server side
On the server, renewal requests are handled by InstanceResource#renewLease().
    public Response renewLease(
            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
            @QueryParam("overriddenstatus") String overriddenStatus,
            @QueryParam("status") String status,
            @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
        boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

        // No lease found: answer 404 so that the client registers again
        if (!isSuccess) {
            logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
            return Response.status(Status.NOT_FOUND).build();
        }

        Response response;
        if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
            response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
            if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
                    && (overriddenStatus != null)
                    && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
                    && isFromReplicaNode) {
                registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
            }
        } else {
            response = Response.ok().build();
        }
        logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
        return response;
    }
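To make the shape of a renewal request concrete, the sketch below builds (but does not send) an HTTP request carrying the `status` and `lastDirtyTimestamp` query parameters that renewLease() reads. Several things here are assumptions rather than facts taken from the excerpt: the base URL `http://localhost:8761/eureka/`, the `apps/{appName}/{instanceId}` path layout, the use of PUT, and the helper class `HeartbeatRequestSketch` itself, which is hypothetical.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper; it only constructs the request object.
class HeartbeatRequestSketch {
    static HttpRequest buildHeartbeat(String baseUrl, String appName, String instanceId,
                                      String status, long lastDirtyTimestamp) {
        // Assumed layout: {baseUrl}apps/{appName}/{instanceId}?status=...&lastDirtyTimestamp=...
        URI uri = URI.create(baseUrl + "apps/" + appName + "/" + instanceId
                + "?status=" + status
                + "&lastDirtyTimestamp=" + lastDirtyTimestamp);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody()) // a heartbeat carries no body in this sketch
                .build();
    }
}
```

A 404 answer to such a request corresponds to the "Not Found (Renew)" branch above, and a 200 answer to the "Found (Renew)" branch.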
InstanceRegistry#renew()
Publishes a renewal event, then calls the parent class's renew method.
    @Override
    public boolean renew(final String appName, final String serverId, boolean isReplication) {
        log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
        Application application = getApplication(appName);
        if (application != null) {
            InstanceInfo instanceInfo = application.getByInstanceId(serverId);
            if (instanceInfo != null) {
                publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instanceInfo, isReplication));
            }
        }
        return super.renew(appName, serverId, isReplication);
    }
PeerAwareInstanceRegistryImpl#renew()
Calls the parent class's renew method; if it succeeds, notifies the other servers so that they replicate this heartbeat.
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (super.renew(appName, id, isReplication)) {
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }
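The isReplication flag passed to replicateToPeers matters because peers forward heartbeats to each other. The sketch below assumes that a heartbeat which is itself a replication is not forwarded again; that is my understanding of the behaviour, but it is not visible in the excerpt. `ToyPeerNode` and its members are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical server node that renews locally and forwards heartbeats to its peers.
class ToyPeerNode {
    private final Set<String> leases = new HashSet<>();
    private final List<ToyPeerNode> peers = new ArrayList<>();
    int renewCalls = 0;

    void addPeer(ToyPeerNode peer) {
        peers.add(peer);
    }

    void register(String instanceId) {
        leases.add(instanceId);
    }

    boolean renew(String instanceId, boolean isReplication) {
        renewCalls++;
        if (!leases.contains(instanceId)) {
            return false; // nothing to renew, nothing to replicate
        }
        // Assumption: only heartbeats that came from a client are forwarded,
        // otherwise two peers would bounce the same heartbeat back and forth.
        if (!isReplication) {
            for (ToyPeerNode peer : peers) {
                peer.renew(instanceId, true);
            }
        }
        return true;
    }
}
```

With two nodes that list each other as peers, one client heartbeat results in exactly one renew call on each node.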
AbstractInstanceRegistry#renew()
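The actual renewal logic. The last method shown is the lease's own renew(), which is what leaseToRenew.renew() invokes.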
    public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            // No lease for this instance: the caller turns this into a 404
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                            overriddenInstanceStatus.name(),
                            instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
                }
            }
            // Count this heartbeat for the per-minute rate, then extend the lease
            renewsLastMin.increment();
            leaseToRenew.renew();
            return true;
        }
    }

    // The lease's renew(), called by leaseToRenew.renew() above
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
    }
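Note that the lease's renew() stores the current time plus the duration, not the current time alone. The sketch below explores what that implies for expiry. The `renew` arithmetic is taken from the code above; the `isExpired` check is an assumption (a check that adds the duration and the compensation time once more), and `ToyLease` is a hypothetical class with an explicit clock parameter.

```java
// Hypothetical lease with an injectable clock value.
class ToyLease {
    private final long durationMs;
    private long lastUpdateTimestamp;

    ToyLease(long durationMs, long nowMs) {
        this.durationMs = durationMs;
        this.lastUpdateTimestamp = nowMs;
    }

    // Same arithmetic as the renew() shown above: the stored value already includes the duration.
    void renew(long nowMs) {
        lastUpdateTimestamp = nowMs + durationMs;
    }

    // Assumption: the expiry check adds the duration (and any compensation time) again.
    boolean isExpired(long nowMs, long additionalLeaseMs) {
        return nowMs > lastUpdateTimestamp + durationMs + additionalLeaseMs;
    }
}
```

If the expiry check is indeed written this way, a lease that has been renewed at least once would survive for roughly twice the configured duration after its last heartbeat, which is worth keeping in mind when reasoning about how quickly a dead instance disappears.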
[Figure: diagram summarizing the heartbeat mechanism]
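Self-preservation mechanism
Self-preservation covers the case where the Eureka server receives fewer heartbeats than it expects, not because the clients have failed but because a network problem on the server's side prevents clients from reaching it. In that situation the server enters self-preservation mode and does not evict instance information.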
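First, the server needs the expected number of heartbeats. While the Eureka bootstrap class EurekaBootStrap runs, it calls registry.syncUp(), which pulls service information from neighboring server nodes, registers it in the local registry, and returns the number of instances that were synchronized.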
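Next it calls registry.openForTraffic(), and one of the arguments is the number of synchronized instances, which serves as the initial expectedNumberOfClientsSendingRenews. For example, if 40 instances were synchronized, the expected number of heartbeats per minute is numberOfRenewsPerMinThreshold = 40 * (60 / 30) * 0.85 = 68, where 30 is the configurable renewal interval in seconds and 0.85 is the renewal percent threshold. How is expectedNumberOfClientsSendingRenews maintained afterwards? Each time a client registers, the server increments expectedNumberOfClientsSendingRenews by 1 and recalculates numberOfRenewsPerMinThreshold; likewise, when an instance goes offline, the server decrements it by 1 and recalculates. The code that updates numberOfRenewsPerMinThreshold is: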
    protected void updateRenewsPerMinThreshold() {
        this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
                * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
                * serverConfig.getRenewalPercentThreshold());
    }
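The threshold arithmetic can be checked with a small standalone function. `RenewThresholdSketch` is a hypothetical helper that takes the three inputs as plain parameters instead of reading them from the server configuration; the formula itself is the one shown above.

```java
// Hypothetical helper that reproduces the threshold formula with explicit inputs.
class RenewThresholdSketch {
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double renewalPercentThreshold) {
        // heartbeats per client per minute = 60 / interval; the result is truncated to an int
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * renewalPercentThreshold);
    }
}
```

With 40 expected clients, a 30-second interval and a 0.85 threshold this yields 68, matching the example in the text. One more registration (41 clients) gives 69, because 69.7 is truncated rather than rounded.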
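Then super.postInit() is called, which in turn calls renewsLastMin.start(). Once a minute this moves the count accumulated in the current bucket into lastBucket and resets the current bucket to 0, so lastBucket always holds the number of heartbeats received during the previous full minute. Each renewal request increments the current bucket by 1.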
    public synchronized void start() {
        if (!isActive) {
            timer.schedule(new TimerTask() {

                @Override
                public void run() {
                    try {
                        // Publish the finished interval's count and start a new interval at 0
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
            }, sampleInterval, sampleInterval);

            isActive = true;
        }
    }
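The two-bucket counting scheme is easier to follow without the timer. In the sketch below, `ToyMeasuredRate` is a hypothetical class in which the caller triggers the roll-over by hand; the bucket swap is the same `lastBucket.set(currentBucket.getAndSet(0))` step as in the code above.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical two-bucket counter; rollOver() stands in for the timer task.
class ToyMeasuredRate {
    private final AtomicLong currentBucket = new AtomicLong();
    private final AtomicLong lastBucket = new AtomicLong();

    // Called once per received heartbeat.
    void increment() {
        currentBucket.incrementAndGet();
    }

    // Publishes the finished interval and starts a new one at zero.
    void rollOver() {
        lastBucket.set(currentBucket.getAndSet(0));
    }

    // Count for the last completed interval, not the one in progress.
    long getCount() {
        return lastBucket.get();
    }
}
```

One consequence of this design is that `getCount()` never reflects heartbeats from the interval still in progress, so the reported figure lags by up to one sample interval.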
EvictionTask's run method is executed once a minute. It calls evict, which contains the following logic:
    public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");

        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        // ... (the rest of the eviction logic is not shown in this excerpt)
    }

    @Override
    public boolean isLeaseExpirationEnabled() {
        // With self-preservation switched off, expired leases are always evicted
        if (!isSelfPreservationModeEnabled()) {
            return true;
        }
        // Otherwise evict only while last minute's heartbeats exceed the threshold
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

    @Override
    public long getNumOfRenewsInLastMin() {
        return renewsLastMin.getCount();
    }

    public long getCount() {
        return lastBucket.get();
    }
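The decision made by isLeaseExpirationEnabled() can be expressed as a pure function of three inputs. `SelfPreservationSketch` is a hypothetical helper; the condition is copied from the code above, with the configuration flag, the threshold and last minute's heartbeat count passed in as parameters.

```java
// Hypothetical helper: the eviction switch as a pure function.
class SelfPreservationSketch {
    static boolean isLeaseExpirationEnabled(boolean selfPreservationEnabled,
                                            int renewsPerMinThreshold,
                                            long renewsInLastMin) {
        if (!selfPreservationEnabled) {
            return true; // eviction is never suspended
        }
        // Eviction runs only while enough heartbeats are arriving.
        return renewsPerMinThreshold > 0 && renewsInLastMin > renewsPerMinThreshold;
    }
}
```

Using the earlier example of a threshold of 68: if 80 heartbeats arrived in the last minute, eviction proceeds; if only 60 arrived, the server assumes the problem is on its own side and skips eviction. A count of exactly 68 also suspends eviction, since the comparison is strict.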