小Demo

还是先建一个Demo工程,方便我们阅读源码。

新建一个Spring Boot项目,pom中添加以下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
<properties>
<java.version>17</java.version>
<spring-cloud.version>2023.0.1</spring-cloud.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

然后在启动类中加上@EnableEurekaServer注解

1
2
3
4
5
6
7
8
9
@EnableEurekaServer
@SpringBootApplication
public class CloudServerApplication {

public static void main(String[] args) {
SpringApplication.run(CloudServerApplication.class, args);
}

}

最后在配置文件中加一下配置,这里为yml文件为例。

1
2
3
4
5
6
7
8
9
10
11
server:
port: 8761

eureka:
instance:
hostname: localhost
client:
registerWithEureka: false
fetchRegistry: false
serviceUrl:
defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/

启动项目后,可以通过hostname+端口访问控制台了。

一张图介绍下启动流程

EurekaServer的启动流程

启动流程的源码解读

在前面的Demo中,我们看到启动一个简单的EurekaServer还是很简单的,只要引入依赖、添加配置、然后启动类上加上@EnableEurekaServer注解就可以了。那么我们就先从这个最显眼的注解入手。

@EnableEurekaServer的原理

看下这个注解的内部,可以看到它引入了一个类:EurekaServerMarkerConfiguration

1
2
3
4
5
6
7
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}

继续看EurekaServerMarkerConfiguration,可以看到它内部注册了一个Marker的Bean。

1
2
3
4
5
6
7
8
9
10
11
12
13
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

@Bean
public Marker eurekaServerMarkerBean() {
return new Marker();
}

class Marker {

}

}

总结一下,启动类上加上@EnableEurekaServer注解,最终会创建一个Marker类的Bean

自动装配又做了什么

我们知道Spring Boot的自动装配机制也会创建Bean,而Spring Cloud又是基于Spring Boot开发的,那它内部应该也是有自动装配内容的,我们检查一下,果然在spring-cloud-netflix-eureka-server-4.1.1这个jar包中发现了org.springframework.boot.autoconfigure.AutoConfiguration.imports文件,然后文件内部配置了一个类

1
org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration

也就是说Spring Boot的自动装配机制会自动创建这个类的Bean。我们继续看这个类。

探究EurekaServerAutoConfiguration

先看下它内部都加了哪些注解

1
2
3
4
5
6
7
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class,
EurekaProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {}
  • @Configuration 说明这时一个配置类,而且不需要创建CGLIB代理
  • @Import 说明Spring会创建EurekaServerInitializerConfiguration中定义的Bean
  • @ConditionalOnBean 说明只有存在类型为EurekaServerMarkerConfiguration.Marker的bean时,才会创建和管理这个配置类中定义的bean
  • @EnableConfigurationProperties 启用指定的配置属性类Spring将创建这些类的实例,并从配置文件中填充它们的属性。
  • @PropertySource 指定配置文件的位置,这个配置文件将被加载,并用于填充配置属性类的属性。

同时,它EurekaServerAutoConfiguration内部也通过@Bean注解定义了很多类型的Bean,这些Bean也会依次被创建。比如

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// 注册一个PeerAwareInstanceRegistry实例,用于管理Eureka服务的实例注册表。
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs,
EurekaServerHttpClientFactory eurekaServerHttpClientFactory,
EurekaInstanceConfigBean eurekaInstanceConfigBean) {
if (eurekaInstanceConfigBean.isAsyncClientInitialization()) {
if (log.isDebugEnabled()) {
log.debug("Initializing client asynchronously...");
}

ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.submit(() -> {
this.eurekaClient.getApplications();
if (log.isDebugEnabled()) {
log.debug("Asynchronous client initialization done.");
}
});
}
else {
this.eurekaClient.getApplications(); // force initialization
}

return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient,
eurekaServerHttpClientFactory,
this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}
// 创建一个PeerEurekaNodes,用于管理Eureka服务的节点信息
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs,
ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs,
this.applicationInfoManager, replicationClientAdditionalFilters);
}
// 创建EurekaServerContext实例,用于管理Eureka服务器的上下文信息
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry,
PeerEurekaNodes peerEurekaNodes) {
return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes,
this.applicationInfoManager);
}
// 创建EurekaServerBootstrap,用于初始化和启动Eureka服务器
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
EurekaServerContext serverContext) {
return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig,
registry, serverContext);
}

EurekaServerContext类的作用

在它的内部我们看到这样一个方法,它被声明了@PostConstruct注解,意味着它在背实例化后就会自动执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@PostConstruct
@Override
public void initialize() {
logger.info("Initializing ...");
// 启动Eureka节点更新任务,并期更新Eureka节点信息
peerEurekaNodes.start();
try {
// 初始化实例注册表
registry.init(peerEurekaNodes);
} catch (Exception e) {
throw new RuntimeException(e);
}
logger.info("Initialized");
}

先看下peerEurekaNodes.start()代码,它主要是更新Eureka的节点信息,然后在内部启动流程一个每分钟执行一次的定时更新Eureka节点信息的任务。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public void start() {
// 创建一个单线程的定时任务执行器
taskExecutor = Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
thread.setDaemon(true);
return thread;
}
}
);
try {
// 先更新Eureka节点信息
updatePeerEurekaNodes(resolvePeerUrls());
// 创建一个异步更新任务
Runnable peersUpdateTask = new Runnable() {
@Override
public void run() {
try {
updatePeerEurekaNodes(resolvePeerUrls());
} catch (Throwable e) {
logger.error("Cannot update the replica Nodes", e);
}

}
};
// 将异步任务交给调度器执行,默认情况下,延迟1分钟后,然后每隔1分钟执行一次更新任务
taskExecutor.scheduleWithFixedDelay(
peersUpdateTask,
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
TimeUnit.MILLISECONDS
);
} catch (Exception e) {
throw new IllegalStateException(e);
}
for (PeerEurekaNode node : peerEurekaNodes) {
logger.info("Replica node URL: {}", node.getServiceUrl());
}
}

再看下registry.init(peerEurekaNodes)这部分的逻辑,具体是PeerAwareInstanceRegistryImpl#init(),它主要负责的是初始化Eureka的实例注册表,包括启动度量任务、初始化响应缓存、安排更新任务、初始化远程注册表、注册JMX监视器等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
// 启动一个定时任务,用于度量每分钟的复制次数
this.numberOfReplicationsLastMin.start();
this.peerEurekaNodes = peerEurekaNodes;
// 初始化响应缓存
initializedResponseCache();
// 启动一个每隔(15分钟)更新续约阈值的定时任务
scheduleRenewalThresholdUpdateTask();
// 初始化远程区域注册表
initRemoteRegionRegistry();

try {
// 注册JMX监听器
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
}
}

重点看下initializedResponseCache(),因为EurekaServer三级缓存中的两个缓存都是在这一步初始化的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
@Override
public synchronized void initializedResponseCache() {
if (responseCache == null) {
// 构造了一个ResponseCacheImpl实例
responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
}
}
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
this.serverConfig = serverConfig;
this.serverCodecs = serverCodecs;
// 默认为True
this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
this.registry = registry;
// 默认值为30000,即30秒
long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
//
this.readWriteCacheMap =
CacheBuilder.newBuilder()
.initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) // 容量1000
.expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) // 默认过期时间3分钟
.removalListener(new RemovalListener<Key, Value>() { // 设置过期时的监听器
@Override
public void onRemoval(RemovalNotification<Key, Value> notification) {
Key removedKey = notification.getKey();
if (removedKey.hasRegions()) {
Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
}
}
})
.build(new CacheLoader<Key, Value>() { // 设置未命中缓存时的操作,即从注册表中获取信息
@Override
public Value load(Key key) throws Exception {
if (key.hasRegions()) {
Key cloneWithNoRegions = key.cloneWithoutRegions();
regionSpecificKeys.put(cloneWithNoRegions, key);
}
Value value = generatePayload(key);
return value;
}
});

if (shouldUseReadOnlyResponseCache) {
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}

try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
}
}

可以看到它显示构建了一个类型为CacheBuilder的readWriteCacheMap,即所谓的二级缓存,它被设置为了拥有以下能力

  • 设定容量为1000,内部元素的过期时间为180秒
  • 设置一个移除监听器,当缓存中的条目超过180秒没有被访问或修改从而被移除时,触发这个监听器。如果被移除的键有区域信息,那么它会从regionSpecificKeys中移除一个没有区域信息的键
  • 设置一个缓存加载器,当尝试获取缓存中不存在的条目时,这个加载器会被触发。它会处理区域信息,然后生成并返回新值。

一级缓存的初始化操作

一级缓存(readOnlyCacheMap)是一个ConcurrentHashMap类,我们看下它的代码。可以看到它声明了一个定时器,这个定时器会在第一次执行后,每隔30秒钟执行一次。

1
2
3
4
5
6
7
if (shouldUseReadOnlyResponseCache) {
// responseCacheUpdateIntervalMs的默认值为3000
timer.schedule(getCacheUpdateTask(),
new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
+ responseCacheUpdateIntervalMs),
responseCacheUpdateIntervalMs);
}

现在我们看下它的具体任务是什么,总的来说就是它会遍历一级缓存,然后将一级缓存中的值与二级缓存中的值做比较,如果两边值不相等就将二级缓存中的值覆盖一级缓存。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private TimerTask getCacheUpdateTask() {
return new TimerTask() {
@Override
public void run() {
logger.debug("Updating the client cache from response cache");
for (Key key : readOnlyCacheMap.keySet()) {
if (logger.isDebugEnabled()) {
logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
key.getEntityType(), key.getName(), key.getVersion(), key.getType());
}
try {
CurrentRequestVersion.set(key.getVersion());
Value cacheValue = readWriteCacheMap.get(key);
Value currentCacheValue = readOnlyCacheMap.get(key);
// 如果值不相等,设置一级缓存
if (cacheValue != currentCacheValue) {
readOnlyCacheMap.put(key, cacheValue);
}
} catch (Throwable th) {
logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
} finally {
CurrentRequestVersion.remove();
}
}
}
};
}

最后看下scheduleRenewalThresholdUpdateTask(),它就是启动一个定时任务,每隔15分钟更新一下续约阈值。

1
2
3
4
5
6
7
8
9
private void scheduleRenewalThresholdUpdateTask() {
timer.schedule(new TimerTask() {
@Override
public void run() {
updateRenewalThreshold();
}
}, serverConfig.getRenewalThresholdUpdateIntervalMs(),
serverConfig.getRenewalThresholdUpdateIntervalMs());
}

EurekaServerInitializerConfiguration类的作用

先看下它的内部结构,它实现了SmartLifecycle接口,并且isAutoStartup()返回的是True,那么它内部的start()方法将会被执行掉。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

@Override
public void start() {
new Thread(() -> {
try {
// 初始化Eureka服务器上下文,内部有很多逻辑,主要是同步或更新注册表的信息,是集群高可用的保障措施之一。
eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
// 表示Eureka注册表已经可用
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
EurekaServerInitializerConfiguration.this.running = true;
// 表示Eureka服务器已启动
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
}
catch (Exception ex) {
// Help!
log.error("Could not initialize Eureka servlet context", ex);
}
}).start();
}

@Override
public boolean isAutoStartup() {
return true;
}


重点看下initEurekaServerContext方法,因为注册表复制与Eureka的故障感知和自动保护机制都在这里设置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
protected void initEurekaServerContext() throws Exception {
// For backward compatibility
JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);

if (isAws(this.applicationInfoManager.getInfo())) {
this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry,
this.applicationInfoManager);
this.awsBinder.start();
}
// 将EurekaServer的上下文设置到EurekaServerContextHolder中
EurekaServerContextHolder.initialize(this.serverContext);

log.info("Initialized server context");

// 处理从其他Server端同步过来的注册表信息
int registryCount = this.registry.syncUp();
// 故障感知与自动保护机制是在这设置的
this.registry.openForTraffic(this.applicationInfoManager, registryCount);

// Register all monitoring statistics.
EurekaMonitors.registerAllStats();
}