小Demo
还是先建一个Demo工程,方便我们阅读源码。
新建一个Spring Boot项目,pom中添加以下信息:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| <properties> <java.version>17</java.version> <spring-cloud.version>2023.0.1</spring-cloud.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
|
然后在启动类中加上@EnableEurekaServer注解
1 2 3 4 5 6 7 8 9
| @EnableEurekaServer @SpringBootApplication public class CloudServerApplication {
public static void main(String[] args) { SpringApplication.run(CloudServerApplication.class, args); }
}
|
最后在配置文件中加一下配置,这里为yml文件为例。
1 2 3 4 5 6 7 8 9 10 11
| server: port: 8761
eureka: instance: hostname: localhost client: registerWithEureka: false fetchRegistry: false serviceUrl: defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
|
启动项目后,可以通过hostname+端口访问控制台了。
一张图介绍下启动流程

启动流程的源码解读
在前面的Demo中,我们看到启动一个简单的EurekaServer还是很简单的,只要引入依赖、添加配置、然后启动类上加上@EnableEurekaServer注解就可以了。那么我们就先从这个最显眼的注解入手。
@EnableEurekaServer的原理
看下这个注解的内部,可以看到它引入了一个类:EurekaServerMarkerConfiguration
1 2 3 4 5 6 7
| @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer {
}
|
继续看EurekaServerMarkerConfiguration,可以看到它内部注册了一个Marker的Bean。
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Configuration(proxyBeanMethods = false) public class EurekaServerMarkerConfiguration {
@Bean public Marker eurekaServerMarkerBean() { return new Marker(); }
class Marker {
}
}
|
总结一下,启动类上加上@EnableEurekaServer注解,最终会创建一个Marker类的Bean
自动装配又做了什么
我们知道Spring Boot的自动装配机制也会创建Bean,而Spring Cloud又是基于Spring Boot开发的,那它内部应该也是有自动装配内容的,我们检查一下,果然在spring-cloud-netflix-eureka-server-4.1.1这个jar包中发现了org.springframework.boot.autoconfigure.AutoConfiguration.imports文件,然后文件内部配置了一个类
1
| org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
|
也就是说Spring Boot的自动装配机制会自动创建这个类的Bean。我们继续看这个类。
探究EurekaServerAutoConfiguration
先看下它内部都加了哪些注解
1 2 3 4 5 6 7
| @Configuration(proxyBeanMethods = false) @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class, EurekaProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration implements WebMvcConfigurer {}
|
- @Configuration 说明这时一个配置类,而且不需要创建CGLIB代理
- @Import 说明Spring会创建EurekaServerInitializerConfiguration中定义的Bean
- @ConditionalOnBean 说明只有存在类型为
EurekaServerMarkerConfiguration.Marker
的bean时,才会创建和管理这个配置类中定义的bean
- @EnableConfigurationProperties 启用指定的配置属性类Spring将创建这些类的实例,并从配置文件中填充它们的属性。
- @PropertySource 指定配置文件的位置,这个配置文件将被加载,并用于填充配置属性类的属性。
同时,它EurekaServerAutoConfiguration内部也通过@Bean注解定义了很多类型的Bean,这些Bean也会依次被创建。比如
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs, EurekaServerHttpClientFactory eurekaServerHttpClientFactory, EurekaInstanceConfigBean eurekaInstanceConfigBean) { if (eurekaInstanceConfigBean.isAsyncClientInitialization()) { if (log.isDebugEnabled()) { log.debug("Initializing client asynchronously..."); }
ExecutorService executorService = Executors.newSingleThreadExecutor(); executorService.submit(() -> { this.eurekaClient.getApplications(); if (log.isDebugEnabled()) { log.debug("Asynchronous client initialization done."); } }); } else { this.eurekaClient.getApplications(); }
return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, eurekaServerHttpClientFactory, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); }
@Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) { return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters); }
@Bean @ConditionalOnMissingBean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); }
@Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); }
|
EurekaServerContext类的作用
在它的内部我们看到这样一个方法,它被声明了@PostConstruct注解,意味着它在被实例化后就会自动执行。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @PostConstruct @Override public void initialize() { logger.info("Initializing ..."); peerEurekaNodes.start(); try { registry.init(peerEurekaNodes); } catch (Exception e) { throw new RuntimeException(e); } logger.info("Initialized"); }
|
先看下peerEurekaNodes.start()代码,它主要是更新Eureka的节点信息,然后在内部启动流程一个每分钟执行一次的定时更新Eureka节点信息的任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); }
} }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } }
|
再看下registry.init(peerEurekaNodes)这部分的逻辑,具体是PeerAwareInstanceRegistryImpl#init(),它主要负责的是初始化Eureka的实例注册表,包括启动度量任务、初始化响应缓存、安排更新任务、初始化远程注册表、注册JMX监视器等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Override public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; initializedResponseCache(); scheduleRenewalThresholdUpdateTask(); initRemoteRegionRegistry();
try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } }
|
重点看下initializedResponseCache(),因为EurekaServer三级缓存中的两个缓存都是在这一步初始化的。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| @Override public synchronized void initializedResponseCache() { if (responseCache == null) { responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this); } } ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); this.readWriteCacheMap = CacheBuilder.newBuilder() .initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } });
if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); }
try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e); } }
|
可以看到它显示构建了一个类型为CacheBuilder的readWriteCacheMap,即所谓的二级缓存,它被设置为了拥有以下能力
- 设定容量为1000,内部元素的过期时间为180秒
- 设置一个移除监听器,当缓存中的条目超过180秒没有被访问或修改从而被移除时,触发这个监听器。如果被移除的键有区域信息,那么它会从regionSpecificKeys中移除一个没有区域信息的键
- 设置一个缓存加载器,当尝试获取缓存中不存在的条目时,这个加载器会被触发。它会处理区域信息,然后生成并返回新值。
一级缓存的初始化操作
一级缓存(readOnlyCacheMap)是一个ConcurrentHashMap类,我们看下它的代码。可以看到它声明了一个定时器,这个定时器会在第一次执行后,每隔30秒钟执行一次。
1 2 3 4 5 6 7
| if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); }
|
现在我们看下它的具体任务是什么,总的来说就是它会遍历一级缓存,然后将一级缓存中的值与二级缓存中的值做比较,如果两边值不相等就将二级缓存中的值覆盖一级缓存。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| private TimerTask getCacheUpdateTask() { return new TimerTask() { @Override public void run() { logger.debug("Updating the client cache from response cache"); for (Key key : readOnlyCacheMap.keySet()) { if (logger.isDebugEnabled()) { logger.debug("Updating the client cache from response cache for key : {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType()); } try { CurrentRequestVersion.set(key.getVersion()); Value cacheValue = readWriteCacheMap.get(key); Value currentCacheValue = readOnlyCacheMap.get(key); if (cacheValue != currentCacheValue) { readOnlyCacheMap.put(key, cacheValue); } } catch (Throwable th) { logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); } finally { CurrentRequestVersion.remove(); } } } }; }
|
最后看下scheduleRenewalThresholdUpdateTask(),它就是启动一个定时任务,每隔15分钟更新一下续约阈值。
1 2 3 4 5 6 7 8 9
| private void scheduleRenewalThresholdUpdateTask() { timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); }
|
EurekaServerInitializerConfiguration类的作用
先看下它的内部结构,它实现了SmartLifecycle接口,并且isAutoStartup()返回的是True,那么它内部的start()方法将会被执行掉。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Override public void start() { new Thread(() -> { try { eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); EurekaServerInitializerConfiguration.this.running = true; publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) { log.error("Could not initialize Eureka servlet context", ex); } }).start(); }
@Override public boolean isAutoStartup() { return true; }
|
重点看下initEurekaServerContext方法,因为注册表复制与Eureka的故障感知和自动保护机制都在这里设置。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| protected void initEurekaServerContext() throws Exception { JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH);
if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } EurekaServerContextHolder.initialize(this.serverContext);
log.info("Initialized server context");
int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount);
EurekaMonitors.registerAllStats(); }
|