EurekaClient与EurekaServer的心跳机制

Spring Cloud的心跳机制主要是通过Eureka客户端定期向Eureka服务器发送心跳包来实现的。这个机制用于维护服务实例的健康状态,并保证服务注册表的准确性。

具体来说,每个Eureka客户端会每隔一段时间(默认为30秒)向Eureka服务器发送一个心跳包,告诉服务器它还活着。Eureka服务器在收到心跳包后,会更新服务实例的租约信息,延长其在服务注册表中的存活时间。

如果Eureka服务器在一段时间内(默认为90秒 + 补偿时间)没有收到某个服务实例的心跳包,那么它会认为这个服务实例已经出现故障,将其从服务注册表中移除。

Client端

Client在启动时初始化了一个默认30秒被执行一次的任务:HeartbeatThread,在它内部的run()方法中,它会发起一次对Server端的续约请求,并更新自己的lastSuccessfulHeartbeatTimestamp。

1
2
3
4
5
6
7
8
9
private class HeartbeatThread implements Runnable {

public void run() {
if (renew()) { // 发送续约请求
// 更新时间戳
lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
}
}
}

发虚续约请求时,如果服务端返回了404,说明当前Client端实例信息在Server端是不存在的,此时会调用register()重新向Server端注册自己。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse;
try {
// 发送续约请求
httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { // 实例信息不存在的逻辑
REREGISTER_COUNTER.increment();
logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
long timestamp = instanceInfo.setIsDirtyWithTime();
boolean success = register();
if (success) {
instanceInfo.unsetIsDirty(timestamp);
}
return success;
}
return httpResponse.getStatusCode() == Status.OK.getStatusCode();
} catch (Throwable e) {
logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
return false;
}
}

Server端

Server端处理续约请求的地方为InstanceResource#renewLease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public Response renewLease(
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication,
@QueryParam("overriddenstatus") String overriddenStatus,
@QueryParam("status") String status,
@QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
boolean isFromReplicaNode = "true".equals(isReplication);
// 续约的具体动作
boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode);

// Not found in the registry, immediately ask for a register
if (!isSuccess) { // 如果续约失败,返回404
logger.warn("Not Found (Renew): {} - {}", app.getName(), id);
return Response.status(Status.NOT_FOUND).build();
}
// Check if we need to sync based on dirty time stamp, the client
// instance might have changed some value
Response response;
if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) {
// 校验最后更新时间戳,证服务注册表的准确性,防止因为网络延迟或者其他原因导致的服务实例信息不一致问题
response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode);
// Store the overridden status since the validation found out the node that replicates wins
if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
&& (overriddenStatus != null)
&& !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus))
&& isFromReplicaNode) {
// 重新注册
registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus));
}
} else {
response = Response.ok().build();
}
logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus());
return response;
}

InstanceRegistry#renew()

发布续约事件,调用父类的续约方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Override
public boolean renew(final String appName, final String serverId, boolean isReplication) {
log("renew " + appName + " serverId " + serverId + ", isReplication {}" + isReplication);
Application application = getApplication(appName);
if (application != null) {
InstanceInfo instanceInfo = application.getByInstanceId(serverId);
if (instanceInfo != null) {
// 发布一个事件
publishEvent(new EurekaInstanceRenewedEvent(this, appName, serverId, instanceInfo, isReplication));
}
}
return super.renew(appName, serverId, isReplication);
}

PeerAwareInstanceRegistryImpl#renew()

调用父类续约方法,如果返回成功就通知其它Server复制这个心跳事件。

1
2
3
4
5
6
7
public boolean renew(final String appName, final String id, final boolean isReplication) {
if (super.renew(appName, id, isReplication)) {
replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
return true;
}
return false;
}

AbstractInstanceRegistry#renew()

具体的续约动作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public boolean renew(String appName, String id, boolean isReplication) {
RENEW.increment(isReplication);
// 从注册表中获取实例信息
Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
Lease<InstanceInfo> leaseToRenew = null;
if (gMap != null) {
leaseToRenew = gMap.get(id);
}
if (leaseToRenew == null) {
RENEW_NOT_FOUND.increment(isReplication);
logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
return false;
} else {
InstanceInfo instanceInfo = leaseToRenew.getHolder();
if (instanceInfo != null) {
// touchASGCache(instanceInfo.getASGName());
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
instanceInfo, leaseToRenew, isReplication);
if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
+ "; re-register required", instanceInfo.getId());
RENEW_NOT_FOUND.increment(isReplication);
return false;
}
if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
logger.info(
"The instance status {} is different from overridden instance status {} for instance {}. "
+ "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
overriddenInstanceStatus.name(),
instanceInfo.getId());
instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

}
}
// 将最近一分钟收到的心跳数量加,用于监控Eureka服务器的负载情况
renewsLastMin.increment();
// 更新实例的续约信息,可以看到是在当前时间上加上了duration(默认90秒)
leaseToRenew.renew();
return true;
}
}
// Lease#renew(),这里有问题,多加了duration,会影响EvictionTask中的过期算法。
public void renew() {
lastUpdateTimestamp = System.currentTimeMillis() + duration;
}

一张图总结下心跳机制

SpringCloud心跳机制

自我保护机制

自我保护机制就是说,有时候Eureka Server接受到的心跳数比期望的少,但这并不是Clinet的问题,是Server自己的网络出问题了,导致Clinet通讯不了,这种情况下,Server会进入自动保护机制,不会清除实例信息。

  1. 首先,要获取期望的心跳数。在Eureka启动类EurekaBootStrap执行的过程中,会执行一段registry.syncUp()的逻辑,这段逻辑会从近邻的Server节点将服务信息同步过来,同时注册到本地注册表,返回一个同步实例的数量

  2. 然后执行registry.openForTrafic()方法,入参之一就是同步实例的数量,比如同步了40个实例,那么每分钟期望的心跳数(numberOfRenewsPerMinThreshold) = 40 * (60 /30(可配置) ) * 0.85 = 68。那么这个expectedNumberOfClientsSendingRenews又是怎么维护的呢?每次Client来注册,Server都会将expectedNumberOfClientsSendingRenews + 1,并重新计算numberOfRenewPerMinThreshold;同理,当服务下线时,会将expectedNumberOfClientsSendingRenews - 1,再计算numberOfRenewPerMinThreshold值。我们可以看下更新numberOfRenewPerMinThreshold的代码:

    1
    2
    3
    4
    5
    protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
    * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
    * serverConfig.getRenewalPercentThreshold());
    }
  3. 然后执行super.postInit()方法,里面会执行一段renewsLastMin.start()方法,会每隔一分钟将上一分钟接收到的心跳数置为0,即只统计这一分钟的心跳数。每次接收到续约请求,会将这个数目累加1。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    public synchronized void start() {
    if (!isActive) {
    timer.schedule(new TimerTask() {

    @Override
    public void run() {
    try {
    // 重置为0
    lastBucket.set(currentBucket.getAndSet(0));
    } catch (Throwable e) {
    logger.error("Cannot reset the Measured Rate", e);
    }
    }
    }, sampleInterval, sampleInterval);

    isActive = true;
    }
    }
  4. 每隔一分钟执行一次EvictionTask的run方法,这个方法会执行一个evict方法,这个方法内有这样一段逻辑:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // 判断是否启用自动保护机制,启用的话不会再执行服务摘除逻辑
    // 以下两种情况会执行摘除动作:1、关闭自动保护机制; 2、开了自动保护机制但是上一分钟的心跳数大于期望心跳数
    if (!isLeaseExpirationEnabled()) {
    logger.debug("DS: lease expiration is currently disabled.");
    return;
    }
    // 忽略以下逻辑
    }
    // 如果关闭自动保护机制,返回True;如果开了自动保护机制但是上一分钟的心跳数大于期望心跳数,也返回True。
    @Override
    public boolean isLeaseExpirationEnabled() {
    if (!isSelfPreservationModeEnabled()) { // 如果没有开,返回True,并终止方法
    // The self preservation mode is disabled, hence allowing the instances to expire.
    return true;
    }
    //
    return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }
    // 下面两个方法为获取最近1分钟的累计心跳数,可以看到就是取的lastBucket的值。
    @Override
    public long getNumOfRenewsInLastMin() {
    return renewsLastMin.getCount();
    }

    public long getCount() {
    return lastBucket.get();
    }