Service Registration

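A Client registers itself with the Server in the following situations:

  1. At startup: once the client application has started and initialized the Spring application context, it registers itself with the server automatically, provided the context contains a DiscoveryClient instance (for example, EurekaDiscoveryClient). This is done by a method on DiscoveryClient annotated with @PostConstruct.
  2. When the service status changes: if the client application's status changes (for example, from DOWN to UP) and a StatusChangeListener observes the change, the client may register itself with the server again. This is done through StatusChangeListener#notify().
  3. After a periodic heartbeat fails: the client sends heartbeats to the server at a fixed interval to keep its registration alive. If a heartbeat fails (for example, because of a network problem or a server fault), the client may try to register itself with the server again.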

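We take the registration triggered at startup as the example and follow the flow on both sides, Client and Server.

Client Side

While the Client starts, Spring Boot's auto-configuration mechanism registers EurekaClientAutoConfiguration as a Bean, and that class in turn registers EurekaAutoServiceRegistration as a Bean. EurekaAutoServiceRegistration implements the SmartLifecycle interface and its isAutoStartup() method returns true, so its start() method is executed. That call chain ends with a StatusChangeEvent being delivered to the registered listeners.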

// EurekaAutoServiceRegistration
@Override
public void start() {
    /* code omitted */
    if (!running.get() && registration.getNonSecurePort() > 0) {
        context.publishEvent(new InstancePreRegisteredEvent(this, registration));
        // Perform the registration
        serviceRegistry.register(registration);

        context.publishEvent(new InstanceRegisteredEvent<>(this, registration.getInstanceConfig()));
        running.set(true);
    }
}

// EurekaServiceRegistry
@Override
public void register(EurekaRegistration reg) {
    /* code omitted */
    // The call to follow is setInstanceStatus()
    reg.getApplicationInfoManager().setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    /* code omitted */
}

// ApplicationInfoManager
public synchronized void setInstanceStatus(InstanceStatus status) {
    InstanceStatus next = instanceStatusMapper.map(status);
    if (next == null) {
        return;
    }

    InstanceStatus prev = instanceInfo.setStatus(next);
    if (prev != null) {
        for (StatusChangeListener listener : listeners.values()) {
            try {
                // Deliver a StatusChangeEvent to every registered StatusChangeListener
                listener.notify(new StatusChangeEvent(prev, next));
            } catch (Exception e) {
                logger.warn("failed to notify listener: {}", listener.getId(), e);
            }
        }
    }
}

As it happens, a StatusChangeListener was already added when the DiscoveryClient class was initialized, so the next thing to look at is onDemandUpdate().

statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
    @Override
    public String getId() {
        return "statusChangeListener";
    }

    @Override
    public void notify(StatusChangeEvent statusChangeEvent) {
        logger.info("Saw local status change event {}", statusChangeEvent);
        instanceInfoReplicator.onDemandUpdate();
    }
};
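
To make the notification path easier to follow, here is a minimal, self-contained sketch of the same idea: a manager keeps a map of listeners and calls them only when the status really changes. The class name StatusNotifierSketch, the addListener signature, and the Status enum are hypothetical and are not part of Eureka's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical, simplified stand-in for the status-change notification path.
class StatusNotifierSketch {
    enum Status { STARTING, UP, DOWN }

    // Listener id -> callback receiving (previous, current) status.
    private final Map<String, BiConsumer<Status, Status>> listeners = new ConcurrentHashMap<>();
    private Status status = Status.STARTING;

    void addListener(String id, BiConsumer<Status, Status> listener) {
        listeners.put(id, listener);
    }

    synchronized void setStatus(Status next) {
        Status prev = status;
        if (prev == next) {
            return; // nothing changed, so nobody is notified
        }
        status = next;
        for (Map.Entry<String, BiConsumer<Status, Status>> entry : listeners.entrySet()) {
            try {
                entry.getValue().accept(prev, next);
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from running.
                System.err.println("failed to notify listener: " + entry.getKey());
            }
        }
    }

    synchronized Status getStatus() {
        return status;
    }
}
```

In the excerpt above, the callback plays the role of the listener that calls instanceInfoReplicator.onDemandUpdate().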

Inside, it submits an asynchronous task to a scheduler, so we continue with that task's run method.

public boolean onDemandUpdate() {
    if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
        if (!scheduler.isShutdown()) {
            scheduler.submit(new Runnable() {
                @Override
                public void run() {
                    logger.debug("Executing on-demand update of local InstanceInfo");

                    Future latestPeriodic = scheduledPeriodicRef.get();
                    if (latestPeriodic != null && !latestPeriodic.isDone()) {
                        logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                        latestPeriodic.cancel(false);
                    }

                    InstanceInfoReplicator.this.run();
                }
            });
            return true;
        } else {
            logger.warn("Ignoring onDemand update due to stopped scheduler");
            return false;
        }
    } else {
        logger.warn("Ignoring onDemand update due to rate limiter");
        return false;
    }
}

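Inside InstanceInfoReplicator#run, the registration itself is performed by sending an HTTP request. Note that the finally block also schedules the next run 30 seconds later; on that run the instance is registered again only if its information has been marked dirty in the meantime.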

// InstanceInfoReplicator
public void run() {
    try {
        // Mainly refreshes the application's data center info and lease info
        discoveryClient.refreshInstanceInfo();
        // Timestamp at which the instance info was last marked dirty, or null if it is clean
        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            discoveryClient.register();
            // Passing the timestamp back guards against clearing a newer, concurrent change
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        // Schedule the next run after replicationIntervalSeconds (30 seconds)
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

// DiscoveryClient
boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}
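
The dirty-timestamp check in run() is easy to miss, so the following sketch isolates it. It is a simplified model rather than Eureka's code: DirtyFlagReplicatorSketch and all of its members are hypothetical names, the HTTP call is replaced by a counter, and the caller supplies the timestamps.

```java
// Hypothetical model of "register only when dirty, clear the flag only if nothing newer arrived".
class DirtyFlagReplicatorSketch {
    private boolean dirty;
    private long lastDirtyTimestamp;
    private int registerCalls;

    // Called whenever the local instance information changes.
    synchronized void markDirty(long nowMillis) {
        dirty = true;
        lastDirtyTimestamp = nowMillis;
    }

    // Returns the dirty timestamp, or null when there is nothing to push.
    synchronized Long dirtyTimestampOrNull() {
        return dirty ? lastDirtyTimestamp : null;
    }

    // Clears the flag only if no newer change happened after seenTimestamp.
    synchronized void unsetDirty(long seenTimestamp) {
        if (lastDirtyTimestamp <= seenTimestamp) {
            dirty = false;
        }
    }

    // One replication pass; returns true if a registration was sent.
    synchronized boolean runOnce() {
        Long seen = dirtyTimestampOrNull();
        if (seen == null) {
            return false;
        }
        registerCalls++; // stand-in for the HTTP registration request
        unsetDirty(seen);
        return true;
    }

    synchronized int registerCalls() {
        return registerCalls;
    }
}
```

With this model, a pass that finds no dirty timestamp sends nothing, which is why the periodic run does not by itself produce a new registration every 30 seconds.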

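Server Side

The server handles registration requests in ApplicationResource#addInstance. On receiving the request, it first publishes an EurekaInstanceRegisteredEvent, then stores the instance information in the registry, and finally clears the cache. The flow is roughly as follows:

  1. Publish an EurekaInstanceRegisteredEvent to signal that a client has triggered a registration.
  2. Build a Lease object as the value, keyed by the ID of the InstanceInfo object, and put this key-value pair into the three-level cache (registry). Other collections are updated at the same time: a Pair is built and added to recentRegisteredQueue, and a RecentlyChangedItem is built and added to recentlyChangedQueue. Finally, the cache is cleared.
  3. Notify the other Servers so that they apply the node change.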
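
Step 2 can be sketched with plain collections. This is an illustration under assumptions, not the server's implementation: RegistrySketch, Lease, ChangedItem, and the two-level map keyed by application name and then instance ID are hypothetical, the cache is reduced to an invalidation counter, and steps 1 and 3 (the event and peer notification) are left out.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch of the bookkeeping done when an instance registers.
class RegistrySketch {
    static final class Lease {
        final String instanceId;
        final long registeredAtMillis;

        Lease(String instanceId, long registeredAtMillis) {
            this.instanceId = instanceId;
            this.registeredAtMillis = registeredAtMillis;
        }
    }

    static final class ChangedItem {
        final Lease lease;
        final long lastUpdateTime;

        ChangedItem(Lease lease, long lastUpdateTime) {
            this.lease = lease;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    // Assumed layout: application name -> (instance id -> lease).
    private final Map<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();
    private final Queue<ChangedItem> recentlyChanged = new ConcurrentLinkedQueue<>();
    private int cacheInvalidations;

    synchronized void register(String appName, String instanceId, long nowMillis) {
        Lease lease = new Lease(instanceId, nowMillis);
        // Store (or overwrite) the lease under its instance id.
        registry.computeIfAbsent(appName, key -> new ConcurrentHashMap<>()).put(instanceId, lease);
        // Record the change so that it can later be served as part of a delta.
        recentlyChanged.add(new ChangedItem(lease, nowMillis));
        // Stand-in for clearing the response cache.
        cacheInvalidations++;
    }

    int instanceCount(String appName) {
        Map<String, Lease> leases = registry.get(appName);
        return leases == null ? 0 : leases.size();
    }

    int recentlyChangedSize() {
        return recentlyChanged.size();
    }

    synchronized int cacheInvalidations() {
        return cacheInvalidations;
    }
}
```

Registering the same instance ID twice overwrites the lease but still records two change items, which is the behavior a delta consumer would need in order to see both updates.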

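Service Discovery

We create one more Client service to observe how it discovers other services.

Full Fetch

At startup, the Client immediately triggers a fetch and pulls the full registry from the Server. On receiving the request, the Server reads the full registry from its cache and compresses it with GZIP for transmission over the network. After receiving the data, the Client stores it in its local cache, localRegionApps.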

[Figure: Service discovery, full fetch]
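
As a rough illustration of the compression step, the sketch below round-trips a payload through GZIP using only the JDK. GzipPayloadSketch and its method names are hypothetical; how the server actually serializes and compresses the registry is not shown in this article.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper showing a GZIP round trip for a text payload.
class GzipPayloadSketch {
    // What a sender would do before writing the payload to the network.
    static byte[] compress(String payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPOutputStream gzip = new GZIPOutputStream(buffer)) {
            gzip.write(payload.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // What a receiver would do before parsing the payload.
    static String decompress(byte[] compressed) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            byte[] chunk = new byte[4096];
            int read;
            while ((read = gzip.read(chunk)) != -1) {
                buffer.write(chunk, 0, read);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return new String(buffer.toByteArray(), StandardCharsets.UTF_8);
    }
}
```

A full registry is highly repetitive text (the same field names for every instance), which is the kind of payload where this compression tends to pay off.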

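Incremental Fetch

How does the Client pull incremental data?

At startup, the Client initializes a scheduled task, CacheRefreshThread, which by default pulls incremental registry updates from the Server every 30 seconds. After obtaining the delta, the Client computes a hash value; if it does not match the one returned by the Server, the Client fetches the full registry and overwrites its local registry.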

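[Figure: Service discovery, incremental fetch]

Now for the Client-side handling in detail. To keep the Client's data consistent with the Server's, it computes a hash value and compares it with the Server's, which is one of the technical highlights of the design.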

private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    // First ask the Server for the delta
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) { // The delta request did not succeed, so fall back to a full fetch
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        // Delta fetched; the CAS on the generation counter lets only one thread apply it
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) { // A ReentrantLock additionally protects the update
            try {
                updateDelta(delta); // Apply the delta to the local registry
                reconcileHashCode = getReconcileHashCode(applications); // Recompute the hash value
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            // The hashes differ, so the Client and Server registries are out of sync:
            // fetch the full registry again and update the local registry
            reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}
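
The hash comparison can be illustrated with a small model. The format used here (instance counts per status, sorted by status name and joined as STATUS_count_) is an assumption chosen for the example; this article does not show how getReconcileHashCode builds its value. ReconcileHashSketch and its methods are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: summarize a registry as per-status instance counts and compare summaries.
class ReconcileHashSketch {
    // Builds a deterministic summary such as "DOWN_1_UP_2_" from instance statuses.
    static String hashOf(List<String> instanceStatuses) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted keys give a stable order
        for (String status : instanceStatuses) {
            counts.merge(status, 1, Integer::sum);
        }
        StringBuilder summary = new StringBuilder();
        for (Map.Entry<String, Integer> entry : counts.entrySet()) {
            summary.append(entry.getKey()).append('_').append(entry.getValue()).append('_');
        }
        return summary.toString();
    }

    // True when the local view disagrees with the summary sent by the server.
    static boolean needsFullFetch(List<String> localStatuses, String serverHash) {
        return !hashOf(localStatuses).equals(serverHash);
    }
}
```

A summary of this kind is cheap to compute and compare, but it only detects differences in the counts it covers; two registries with the same counts and different instances would look identical to it.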

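Next, how the registry is actually updated. There are three kinds of actions:

  1. Added: the instance is added directly to the local registry.
  2. Modified: the new data overwrites the old data.
  3. Deleted: the old data is removed from the local registry.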
private void updateDelta(Applications delta) {
    int deltaCount = 0;
    for (Application app : delta.getRegisteredApplications()) { // Iterate over the delta registry
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
            if (ActionType.ADDED.equals(instance.getActionType())) { // Handle added instances
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.MODIFIED.equals(instance.getActionType())) { // Handle modified instances
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } else if (ActionType.DELETED.equals(instance.getActionType())) { // Handle deleted instances
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    existingApp.removeInstance(instance);
                    /*
                     * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                     * if instance list is empty, we remove the application.
                     */
                    if (existingApp.size() == 0) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    getApplications().setVersion(delta.getVersion());
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());

    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}
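
Stripped of regions, versions, and shuffling, the three actions reduce to a few map operations. The following sketch shows that reduced form; DeltaApplySketch, Change, and the nested-map layout are hypothetical and only model the added, modified, and deleted branches described above.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of applying a delta to a local registry held in nested maps.
class DeltaApplySketch {
    enum Action { ADDED, MODIFIED, DELETED }

    static final class Change {
        final Action action;
        final String appName;
        final String instanceId;
        final String status;

        Change(Action action, String appName, String instanceId, String status) {
            this.action = action;
            this.appName = appName;
            this.instanceId = instanceId;
            this.status = status;
        }
    }

    // Application name -> (instance id -> status).
    private final Map<String, Map<String, String>> apps = new HashMap<>();

    void apply(List<Change> delta) {
        for (Change change : delta) {
            switch (change.action) {
                case ADDED:
                case MODIFIED: {
                    // Both cases create the application if needed and overwrite the instance.
                    apps.computeIfAbsent(change.appName, key -> new HashMap<>())
                            .put(change.instanceId, change.status);
                    break;
                }
                case DELETED: {
                    Map<String, String> instances = apps.get(change.appName);
                    if (instances != null) {
                        instances.remove(change.instanceId);
                        if (instances.isEmpty()) {
                            apps.remove(change.appName); // drop applications with no instances left
                        }
                    }
                    break;
                }
            }
        }
    }

    String statusOf(String appName, String instanceId) {
        Map<String, String> instances = apps.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }

    boolean hasApp(String appName) {
        return apps.containsKey(appName);
    }
}
```

Note that the added and modified branches end up doing the same thing, which matches the excerpt: both call addInstance on the application after making sure it exists.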

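And how does the Server produce the incremental data?

The Server keeps a queue, recentlyChangedQueue, that retains only the last three minutes of data. Every registration, modification, or deletion of a service appends an element to it, and producing the delta simply means iterating over recentlyChangedQueue and assembling the result into the delta that is returned.

The queue only retains the last three minutes of elements because, at startup, the Server initializes a scheduled task that runs every 30 seconds and evicts elements that have been in the queue for more than 3 minutes.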

this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
        serverConfig.getDeltaRetentionTimerIntervalInMs(), // 30 seconds by default
        serverConfig.getDeltaRetentionTimerIntervalInMs());

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {
        // serverConfig.getRetentionTimeInMSInDeltaQueue() = 3 * MINUTES
        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    // Elements are in insertion order, so the first fresh one ends the scan
                    break;
                }
            }
        }
    };
}
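
The eviction loop relies on the queue being ordered by update time, so it can stop at the first element that is still fresh. The sketch below shows that idea in isolation, with the clock passed in as a parameter so the behavior is easy to check. DeltaQueueSketch and its members are hypothetical names, and the retention value in the usage note simply mirrors the three minutes mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch of a time-bounded change queue with head-only eviction.
class DeltaQueueSketch {
    static final class Item {
        final String instanceId;
        final long lastUpdateTime;

        Item(String instanceId, long lastUpdateTime) {
            this.instanceId = instanceId;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    private final Deque<Item> queue = new ArrayDeque<>();
    private final long retentionMillis;

    DeltaQueueSketch(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    // Items are expected to be added in non-decreasing time order.
    synchronized void add(String instanceId, long nowMillis) {
        queue.addLast(new Item(instanceId, nowMillis));
    }

    // Removes expired items from the head and returns how many were removed.
    synchronized int evictExpired(long nowMillis) {
        int removed = 0;
        while (!queue.isEmpty() && queue.peekFirst().lastUpdateTime < nowMillis - retentionMillis) {
            queue.pollFirst();
            removed++;
        }
        return removed;
    }

    synchronized int size() {
        return queue.size();
    }
}
```

For example, with new DeltaQueueSketch(180_000) and items added at 0, 60_000, and 170_000 milliseconds, calling evictExpired(200_000) removes only the first item, because the second one is younger than the cutoff of 20_000 and stops the scan.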