publicbooleanonDemandUpdate() { if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { if (!scheduler.isShutdown()) { scheduler.submit(newRunnable() { @Override publicvoidrun() { logger.debug("Executing on-demand update of local InstanceInfo");
FuturelatestPeriodic= scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false); }
InstanceInfoReplicator.this.run(); } }); returntrue; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); returnfalse; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); returnfalse; } }
if (delta == null) { // 如果Server端返回请求失败,就执行全量拉取到逻辑 logger.warn("The server does not allow the delta revision to be applied because it is not safe. " + "Hence got the full registry."); getAndStoreFullRegistry(); } elseif (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { // 拉取增量成功,CAS竞争锁 logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); StringreconcileHashCode=""; if (fetchRegistryUpdateLock.tryLock()) { // 再通过ReetrantLock保证数据安全 try { updateDelta(delta); // 更新注册表信息 reconcileHashCode = getReconcileHashCode(applications); // 重新计算HASH值 } finally { fetchRegistryUpdateLock.unlock(); } } else { logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta"); } // There is a diff in number of instances for some reason if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { // HASH值不一样,说明Client端和Server端的注册表数据不一致,重新获取全量注册表并更新本地注册表 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else { logger.warn("Not updating application delta as another thread is updating it already"); logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode()); } }
} elseif (ActionType.DELETED.equals(instance.getActionType())) { // 处理删除的数据 ApplicationexistingApp= applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); /* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. */ if (existingApp.size() == 0) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);