首页 » 漏洞 » Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

 
文章目录

Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

������关注 微信公众号:【芋道源码】 有福利:

  1. RocketMQ / MyCAT / Sharding-JDBC 所有 源码分析文章列表
  2. RocketMQ / MyCAT / Sharding-JDBC 中文注释源码 GitHub 地址
  3. 您对于源码的疑问每条留言 将得到 认真 回复。 甚至不知道如何读源码也可以请教噢
  4. 新的 源码解析文章 实时 收到通知。 每周更新一篇左右
  5. 认真的 源码交流微信群。

1. 概述

本文主要分享 Eureka-Client 向 Eureka-Server 获取全量注册信息的过程

Eureka-Client 获取注册信息,分成 全量获取增量获取 。默认配置下,Eureka-Client 启动时,首先执行一次 全量 获取进行 本地缓存 注册信息,而后每 30增量 获取刷新 本地缓存 ( 非“ 正常 ”情况下会是全量获取 )。

本文重点在于 全量获取

推荐 Spring Cloud 书籍:

2. Eureka-Client 发起全量获取

本小节调用关系如下:

Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

2.1 初始化全量获取

Eureka-Client 启动时,首先执行一次 全量 获取进行 本地缓存 注册信息,首先代码如下:

// DiscoveryClient.java /** * Applications 在本地的缓存 */ private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();  DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,  Provider<BackupRegistry> backupRegistryProvider) {    // ... 省略无关代码    // 【3.2.5】初始化应用集合在本地的缓存  localRegionApps.set(new Applications());    // ... 省略无关代码    // 【3.2.12】从 Eureka-Server 拉取注册信息  if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {  fetchRegistryFromBackup();  }    // ... 省略无关代码 } 

  • com.netflix.discovery.shared.Applications ,注册的应用集合。较为容易理解,点击 链接 链接查看带中文注释的类,这里就不啰嗦了。Applications 与 InstanceInfo 类关系如下:

    Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

  • 配置 eureka.shouldFetchRegistry = true ,开启从 Eureka-Server 获取注册信息。默认值: true

  • 调用 #fetchRegistry(false) 方法,从 Eureka-Server 全量 获取注册信息,在「2.4 发起获取注册信息」详细解析。

2.2 定时获取

Eureka-Client 在初始化过程中,创建 获取注册信息 线程, 固定间隔 向 Eureka-Server 发起 获取注册信息 ( fetch ), 刷新 本地注册信息 缓存 。实现代码如下:

// DiscoveryClient.java DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,  Provider<BackupRegistry> backupRegistryProvider) {  // ... 省略无关代码    // 【3.2.9】初始化线程池  // default size of 2 - 1 each for heartbeat and cacheRefresh  scheduler = Executors.newScheduledThreadPool(2,  new ThreadFactoryBuilder()  .setNameFormat("DiscoveryClient-%d")  .setDaemon(true)  .build());    cacheRefreshExecutor = new ThreadPoolExecutor(  1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,  new SynchronousQueue<Runnable>(),  new ThreadFactoryBuilder()  .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")  .setDaemon(true)  .build()  ); // use direct handoff    // ... 省略无关代码    // 【3.2.14】初始化定时任务  initScheduledTasks();    // ... 省略无关代码 }  private void initScheduledTasks(){  // 向 Eureka-Server 心跳(续租)执行器  if (clientConfig.shouldFetchRegistry()) {  // registry cache refresh timer  int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();  int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();  scheduler.schedule(  new TimedSupervisorTask(  "cacheRefresh",  scheduler,  cacheRefreshExecutor,  registryFetchIntervalSeconds,  TimeUnit.SECONDS,  expBackOffBound,  new CacheRefreshThread()  ),  registryFetchIntervalSeconds, TimeUnit.SECONDS);  }  // ... 省略无关代码 } 

  • 初始化定时任务代码,和 续租 的定时任务代码类似,在

    《Eureka 源码解析 —— 应用实例注册发现(二)之续租

    有详细解析,这里不重复分享。

  • com.netflix.discovery.DiscoveryClient.CacheRefreshThread ,注册信息缓存刷新任务,实现代码如下:

    class CacheRefreshThread implements Runnable{  public void run(){  refreshRegistry();  } } 

    • 调用 #refreshRegistry(false) 方法,刷新注册信息缓存,在「2.3 刷新注册信息缓存」详细解析。

2.3 刷新注册信息缓存

调用 #refreshRegistry(false) 方法,刷新注册信息缓存,实现代码如下:

// DiscoveryClient.java  1: void refreshRegistry(){  2: try {  3: // TODO 芋艿:TODO[0009]:RemoteRegionRegistry  4: boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();  5:   6: boolean remoteRegionsModified = false;  7: // This makes sure that a dynamic change to remote regions to fetch is honored.  8: String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();  9: if (null != latestRemoteRegions) {  10: String currentRemoteRegions = remoteRegionsToFetch.get();  11: if (!latestRemoteRegions.equals(currentRemoteRegions)) {  12: // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync  13: synchronized (instanceRegionChecker.getAzToRegionMapper()) {  14: if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {  15: String[] remoteRegions = latestRemoteRegions.split(",");  16: remoteRegionsRef.set(remoteRegions);  17: instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);  18: remoteRegionsModified = true;  19: } else {  20: logger.info("Remote regions to fetch modified concurrently," +  21: " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);  22: }  23: }  24: } else {  25: // Just refresh mapping to reflect any DNS/Property change  26: instanceRegionChecker.getAzToRegionMapper().refreshMapping();  27: }  28: }  29:   30: boolean success = fetchRegistry(remoteRegionsModified);  31: if (success) {  32: // 设置 注册信息的应用实例数  33: registrySize = localRegionApps.get().size();  34: // 设置 最后获取注册信息时间  35: lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();  36: }  37:   38: // 打印日志  39: if (logger.isDebugEnabled()) {  40: StringBuilder allAppsHashCodes = new StringBuilder();  41: allAppsHashCodes.append("Local region apps hashcode: ");  42: allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());  43: allAppsHashCodes.append(", is fetching remote regions? ");  44: allAppsHashCodes.append(isFetchingRemoteRegionRegistries);  45: for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {  46: allAppsHashCodes.append(", Remote region: ");  47: allAppsHashCodes.append(entry.getKey());  48: allAppsHashCodes.append(" , apps hashcode: ");  49: allAppsHashCodes.append(entry.getValue().getAppsHashCode());  50: }  51: logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",  52: allAppsHashCodes.toString());  53: }  54: } catch (Throwable e) {  55: logger.error("Cannot fetch registry from server", e);  56: }   57: } 

  • 第 3 至 28 行 :TODO[0009]:RemoteRegionRegistry
  • 第 30 行 :调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息,在「2.4 发起获取注册信息」详细解析。
  • 第 31 至 36 行 :获取注册信息成功,设置注册信息的应用实例数,最后获取注册信息时间。变量代码如下:

    /** * 注册信息的应用实例数 */ private volatile int registrySize = 0; /** * 最后成功从 Eureka-Server 拉取注册信息时间戳 */ private volatile long lastSuccessfulRegistryFetchTimestamp = -1; 

  • 第 38 至 53 行 :打印调试日志。

  • 第 54 至 56 行 :打印 异常 日志。

2.4 发起获取注册信息

调用 #fetchRegistry(false) 方法,从 Eureka-Server 获取注册信息( 根据条件判断,可能是 全量 ,也可能是 增量 ),实现代码如下:

 1: private boolean fetchRegistry(boolean forceFullRegistryFetch){  2: Stopwatch tracer = FETCH_REGISTRY_TIMER.start();  3:   4: try {  5: // 获取 本地缓存的注册的应用实例集合  6: // If the delta is disabled or if it is the first time, get all  7: // applications  8: Applications applications = getApplications();  9:  10: // 全量获取 11: if (clientConfig.shouldDisableDelta() // 禁用增量获取 12: || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) 13: || forceFullRegistryFetch 14: || (applications == null) // 空 15: || (applications.getRegisteredApplications().size() == 0) // 空 16: || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta 17: { 18: logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); 19: logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); 20: logger.info("Force full registry fetch : {}", forceFullRegistryFetch); 21: logger.info("Application is null : {}", (applications == null)); 22: logger.info("Registered Applications size is zero : {}", 23: (applications.getRegisteredApplications().size() == 0)); 24: logger.info("Application version is -1: {}", (applications.getVersion() == -1)); 25: // 执行 全量获取 26: getAndStoreFullRegistry(); 27: } else { 28: // 执行 增量获取 29: getAndUpdateDelta(applications); 30: } 31: // 设置 应用集合 hashcode 32: applications.setAppsHashCode(applications.getReconcileHashCode()); 33: // 打印 本地缓存的注册的应用实例数量 34: logTotalInstances(); 35: } catch (Throwable e) { 36: logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e); 37: return false; 38: } finally { 39: if (tracer != null) { 40: tracer.stop(); 41: } 42: } 43:  44: // Notify about cache refresh before updating the instance remote status 45: onCacheRefreshed(); 46:  47: // Update remote status based on refreshed data held in the cache 48: updateInstanceRemoteStatus(); 49:  50: // registry was fetched successfully, so return true 51: return true; 52: } 

  • 第 5 至 8 行 :获取本地缓存的注册的应用实例集合,实现代码如下:

    public Applications getApplications(){  return localRegionApps.get(); } 

  • 第 10 至 26 行 : 全量 获取注册信息。

    • 第 11 行 :配置 eureka.disableDelta = true ,禁用 增量 获取注册信息。默认值: false
    • 第 12 行 :TODO[0010]:getRegistryRefreshSingleVipAddress
    • 第 13 行 :方法参数 forceFullRegistryFetch 强制 全量 获取注册信息。
    • 第 14 至 15 行 :本地缓存为空。
    • 第 25 至 26 行 :调用 #getAndStoreFullRegistry() 方法, 全量 获取注册信息,并设置到本地缓存。下文详细解析。
  • 第 27 至 30 行 : 增量 获取注册信息,并刷新本地缓存,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 31 至 32 行 :计算应用集合 hashcode 。该变量用于校验 增量 获取的注册信息和 Eureka-Server 全量 的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。
  • 第 33 至 34 行 :打印调试日志,输出本地缓存的注册的应用实例数量。实现代码如下:

    private void logTotalInstances(){  if (logger.isDebugEnabled()) {  int totInstances = 0;  for (Application application : getApplications().getRegisteredApplications()) {  totInstances += application.getInstancesAsIsFromEureka().size();  }  logger.debug("The total number of all instances in the client now is {}", totInstances);  } } 

  • 第 44 至 45 行 :触发 CacheRefreshedEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。

    • #onCacheRefreshed() 方法,实现代码如下:

      /** * Eureka 事件监听器 */ private final CopyOnWriteArraySet<EurekaEventListener> eventListeners = new CopyOnWriteArraySet<>();  protected void onCacheRefreshed(){  fireEvent(new CacheRefreshedEvent()); }  protected void fireEvent(final EurekaEvent event){  for (EurekaEventListener listener : eventListeners) {  listener.onEvent(event);  } } 

    • 笔者的YY:你可以实现自定义的事件监听器监听 CacheRefreshedEvent 事件,以达到 持久化 最新的注册信息到存储器( 例如,本地文件 ),通过这样的方式,配合实现 BackupRegistry 接口读取存储器。BackupRegistry 接口调用如下:

      // 【3.2.12】从 Eureka-Server 拉取注册信息 if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {  fetchRegistryFromBackup(); } 

  • 第47 至 48 行 :更新 本地缓存 的当前应用实例在 Eureka-Server 的状态。

     1: private volatile InstanceInfo.InstanceStatus lastRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN;   2:   3: private synchronized void updateInstanceRemoteStatus(){  4: // Determine this instance's status for this app and set to UNKNOWN if not found  5: InstanceInfo.InstanceStatus currentRemoteInstanceStatus = null;  6: if (instanceInfo.getAppName() != null) {  7: Application app = getApplication(instanceInfo.getAppName());  8: if (app != null) {  9: InstanceInfo remoteInstanceInfo = app.getByInstanceId(instanceInfo.getId()); 10: if (remoteInstanceInfo != null) { 11: currentRemoteInstanceStatus = remoteInstanceInfo.getStatus(); 12: } 13: } 14: } 15: if (currentRemoteInstanceStatus == null) { 16: currentRemoteInstanceStatus = InstanceInfo.InstanceStatus.UNKNOWN; 17: } 18:  19: // Notify if status changed 20: if (lastRemoteInstanceStatus != currentRemoteInstanceStatus) { 21: onRemoteStatusChanged(lastRemoteInstanceStatus, currentRemoteInstanceStatus); 22: lastRemoteInstanceStatus = currentRemoteInstanceStatus; 23: } 24: } 

    • 第 4 至 14 行 :从注册信息中获取当前应用在 Eureka-Server 的状态。
    • 第 19 至 23 行 :对比 本地缓存最新的 的当前应用实例在 Eureka-Server 的状态,若不同,更新 本地缓存 ( 注意,只更新该缓存变量,不更新本地当前应用实例的状态( instanceInfo.status ) ),触发 StatusChangeEvent 事件,事件监听器执行。目前 Eureka 未提供默认的该事件监听器。 #onRemoteStatusChanged(...) 实现代码如下:

      protected void onRemoteStatusChanged(InstanceInfo.InstanceStatus oldStatus, InstanceInfo.InstanceStatus newStatus){  fireEvent(new StatusChangeEvent(oldStatus, newStatus)); } 

    • TODO[0024] :client和server不同状态的原因

2.4.1 全量获取注册信息,并设置到本地缓存

调用 #getAndStoreFullRegistry() 方法, 全量 获取注册信息,并设置到本地缓存。下实现代码如下:

 1: private void getAndStoreFullRegistry() throws Throwable{  2: long currentUpdateGeneration = fetchRegistryGeneration.get();  3:   4: logger.info("Getting all instance registry info from the eureka server");  5:   6: // 全量获取注册信息  7: Applications apps = null;  8: EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null  9: ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 10: : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); 11: if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { 12: apps = httpResponse.getEntity(); 13: } 14: logger.info("The response status is {}", httpResponse.getStatusCode()); 15:  16: // 设置到本地缓存 17: if (apps == null) { 18: logger.error("The application is null for some reason. Not storing this information"); 19: } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { 20: localRegionApps.set(this.filterAndShuffle(apps)); 21: logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode()); 22: } else { 23: logger.warn("Not updating applications as another thread is updating it already"); 24: } 25: } 

  • 第 6 至 14 行 : 全量 获取注册信息,实现代码如下:

    // AbstractJerseyEurekaHttpClient.java @Override public EurekaHttpResponse<Applications> getApplications(String... regions){  return getApplicationsInternal("apps/", regions); }  private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions){  ClientResponse response = null;  String regionsParamValue = null;  try {  WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);  if (regions != null && regions.length > 0) {  regionsParamValue = StringUtil.join(regions);  webResource = webResource.queryParam("regions", regionsParamValue);  }  Builder requestBuilder = webResource.getRequestBuilder();  addExtraHeaders(requestBuilder);  response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); // JSON   Applications applications = null;  if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {  applications = response.getEntity(Applications.class);  }  return anEurekaHttpResponse(response.getStatus(), Applications.class)  .headers(headersOf(response))  .entity(applications)  .build();  } finally {  if (logger.isDebugEnabled()) {  logger.debug("Jersey HTTP GET {}/{}?{}; statusCode={}",  serviceUrl, urlPath,  regionsParamValue == null ? "" : "regions=" + regionsParamValue,  response == null ? "N/A" : response.getStatus()  );  }  if (response != null) {  response.close();  }  } } 

    • 调用 AbstractJerseyEurekaHttpClient#getApplications(...) 方法,GET 请求 Eureka-Server 的 apps/ 接口,参数为 regions ,返回格式为 JSON ,实现 全量获取注册信息
  • 第 16 至 24 行 :设置到本地注册信息 缓存

    • 第 19 行 :TODO[0025] :并发更新的情况???
    • 第 20 行 :调用 #filterAndShuffle(...) 方法,根据配置 eureka.shouldFilterOnlyUpInstances = true ( 默认值 : true ) 过滤只保留状态为开启( UP )的应用实例,并 随机打乱 应用实例顺序。打乱后,实现调用应用服务的随机性。代码比较易懂,点击 链接 查看方法实现。

3. Eureka-Server 接收全量获取

3.1 接收全量获取请求

com.netflix.eureka.resources.ApplicationsResource ,处理 所有 应用的请求操作的 Resource ( Controller )。

注册应用实例信息的请求,映射 ApplicationsResource#getContainers() 方法,实现代码如下:

 1: @GET  2: public Response getContainers(@PathParam("version") String version, 3: @HeaderParam(HEADER_ACCEPT) String acceptHeader, 4: @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, 5: @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, 6: @Context UriInfo uriInfo, 7: @Nullable @QueryParam("regions") String regionsStr){  8: // TODO[0009]:RemoteRegionRegistry  9: boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); 10: String[] regions = null; 11: if (!isRemoteRegionRequested) { 12: EurekaMonitors.GET_ALL.increment(); 13: } else { 14: regions = regionsStr.toLowerCase().split(","); 15: Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. 16: EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); 17: } 18:  19: // 判断是否可以访问 20: // Check if the server allows the access to the registry. The server can 21: // restrict access if it is not 22: // ready to serve traffic depending on various reasons. 23: if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { 24: return Response.status(Status.FORBIDDEN).build(); 25: } 26:  27: // API 版本 28: CurrentRequestVersion.set(Version.toEnum(version)); 29:  30: // 返回数据格式 31: KeyType keyType = Key.KeyType.JSON; 32: String returnMediaType = MediaType.APPLICATION_JSON; 33: if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { 34: keyType = Key.KeyType.XML; 35: returnMediaType = MediaType.APPLICATION_XML; 36: } 37:  38: // 响应缓存键( KEY ) 39: Key cacheKey = new Key(Key.EntityType.Application, 40: ResponseCacheImpl.ALL_APPS, 41: keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions 42: ); 43:  44: // 45: Response response; 46: if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { 47: response = Response.ok(responseCache.getGZIP(cacheKey)) 48: .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) 49: .header(HEADER_CONTENT_TYPE, returnMediaType) 50: .build(); 51: } else { 52: response = Response.ok(responseCache.get(cacheKey)) 53: .build(); 54: } 55: return response; 56: } 

  • 第 8 至 17 行 :TODO[0009]:RemoteRegionRegistry
  • 第 19 至 25 行 :Eureka-Server 启动完成,但是未处于就绪( Ready )状态,不接受请求全量应用注册信息的请求,例如,Eureka-Server 启动时,未能从其他 Eureka-Server 集群的节点获取到应用注册信息。
  • 第 27 至 28 行 :设置 API 版本号。 默认 最新 API 版本为 V2。实现代码如下:

    public enum Version {  V1, V2;   public static Version toEnum(String v){  for (Version version : Version.values()) {  if (version.name().equalsIgnoreCase(v)) {  return version;  }  }  //Defaults to v2  return V2;  } } 

  • 第 30 至 36 行 :设置返回数据格式,默认 JSON 。

  • 第 38 至 42 行 :创建响应缓存( ResponseCache ) 的键( KEY ),在详细解析。
  • 第 44 至 55 行 :从响应缓存读取 全量 注册信息,在详细解析。

3.2 响应缓存 ResponseCache

com.netflix.eureka.registry.ResponseCache ,响应缓存 接口 ,接口代码如下:

public interface ResponseCache{   String get(Key key);    byte[] getGZIP(Key key);    void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress);   AtomicLong getVersionDelta();    AtomicLong getVersionDeltaWithRegions();  } 

  • 其中, #getVersionDelta()#getVersionDeltaWithRegions() 已经废弃。这里保留的原因主要是考虑兼容性。判断依据来自如下代码:

    // Applications.java @Deprecated public void setVersion(Long version){  this.versionDelta = version; }  // AbstractInstanceRegistry.java public Applications getApplicationDeltas(){  // ... 省略其它无关代码  apps.setVersion(responseCache.getVersionDelta().get()); // 唯一调用到 ResponseCache#getVersionDelta() 方法的地方  // ... 省略其它无关代码 } 

  • #get() :获得缓存。

  • #getGZIP() :获得缓存,并 GZIP 。
  • #invalidate() :过期缓存。

3.2.1 缓存键

com.netflix.eureka.registry.Key ,缓存键。实现代码如下:

public class Key{   public enum KeyType {  JSON, XML  }   /** * An enum to define the entity that is stored in this cache for this key. */  public enum EntityType {  Application, VIP, SVIP  }   /** * 实体名 */  private final String entityName;  /** * TODO[0009]:RemoteRegionRegistry */  private final String[] regions;  /** * 请求参数类型 */  private final KeyType requestType;  /** * 请求 API 版本号 */  private final Version requestVersion;  /** * hashKey */  private final String hashKey;  /** * 实体类型 * * {@link EntityType} */  private final EntityType entityType;  /** * {@link EurekaAccept} */  private final EurekaAccept eurekaAccept;    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions){  this.regions = regions;  this.entityType = entityType;  this.entityName = entityName;  this.requestType = type;  this.requestVersion = v;  this.eurekaAccept = eurekaAccept;  hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")  + requestType.name() + requestVersion.name() + this.eurekaAccept.name();  }    public Key(EntityType entityType, String entityName, KeyType type, Version v, EurekaAccept eurekaAccept, @Nullable String[] regions){  this.regions = regions;  this.entityType = entityType;  this.entityName = entityName;  this.requestType = type;  this.requestVersion = v;  this.eurekaAccept = eurekaAccept;  hashKey = this.entityType + this.entityName + (null != this.regions ? Arrays.toString(this.regions) : "")  + requestType.name() + requestVersion.name() + this.eurekaAccept.name();  }    @Override  public int hashCode(){  String hashKey = getHashKey();  return hashKey.hashCode();  }   @Override  public boolean equals(Object other){  if (other instanceof Key) {  return getHashKey().equals(((Key) other).getHashKey());  } else {  return false;  }  }   } 

3.2.2 响应缓存实现类

com.netflix.eureka.registry.ResponseCacheImpl ,响应缓存实现类。

在 ResponseCacheImpl 里,将缓存拆分成两层 :

  • 只读缓存 ( readOnlyCacheMap )
  • 固定过期 + 固定大小读写缓存 ( readWriteCacheMap )

默认配置下, 缓存读取策略 如下:

Eureka 源码解析 —— 应用实例注册发现(六)之全量获取

缓存过期策略如下:

  • 应用实例注册、下线、过期时, 只只只 过期 readWriteCacheMap
  • readWriteCacheMap 写入一段时间( 可配置 )后自动过期。
  • 定时 任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

注意:应用实例注册、下线、过期时,不会很快刷新到 readWriteCacheMap 缓存里。默认配置下,最大延迟在 30 秒。

为什么可以使用缓存?

CAP 的选择上,Eureka 选择了 AP ,不同于 Zookeeper 选择了 CP 。

推荐阅读:

3.3 缓存读取

调用 ResponseCacheImpl#get(...) 方法( #getGzip(...) 类似 ),读取缓存,实现代码如下:

 1: private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();  2:   3: private final LoadingCache<Key, Value> readWriteCacheMap;  4:   5: public String get(final Key key){  6: return get(key, shouldUseReadOnlyResponseCache);  7: }  8:   9: String get(final Key key, boolean useReadOnlyCache){ 10: Value payload = getValue(key, useReadOnlyCache); 11: if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { 12: return null; 13: } else { 14: return payload.getPayload(); 15: } 16: } 17:  18: Value getValue(final Key key, boolean useReadOnlyCache){ 19: Value payload = null; 20: try { 21: if (useReadOnlyCache) { 22: final Value currentPayload = readOnlyCacheMap.get(key); 23: if (currentPayload != null) { 24: payload = currentPayload; 25: } else { 26: payload = readWriteCacheMap.get(key); 27: readOnlyCacheMap.put(key, payload); 28: } 29: } else { 30: payload = readWriteCacheMap.get(key); 31: } 32: } catch (Throwable t) { 33: logger.error("Cannot get value for key :" + key, t); 34: } 35: return payload; 36: } 

  • 第 5 至 7 行 :调用 #get(key, useReadOnlyCache) 方法,读取缓存。其中 shouldUseReadOnlyResponseCache 通过配置 eureka.shouldUseReadOnlyResponseCache = true (默认值 : true ) 开启只读缓存。如果你对数据的一致性有相对高的要求,可以关闭这个开关,当然因为少了 readOnlyCacheMap ,性能会有一定的下降。
  • 第 9 至 16 行 :调用 getValue(key, useReadOnlyCache) 方法,读取缓存。从 readOnlyCacheMapreadWriteCacheMap 变量可以看到缓存值的类为 com.netflix.eureka.registry.ResponseCacheImpl.Value ,实现代码如下:

    public class Value{   /** * 原始值 */  private final String payload;  /** * GZIP 压缩后的值 */  private byte[] gzipped;   public Value(String payload){  this.payload = payload;  if (!EMPTY_PAYLOAD.equals(payload)) {  // ... 省略 GZIP 压缩代码  gzipped = bos.toByteArray();  } else {  gzipped = null;  }  }   public String getPayload(){  return payload;  }   public byte[] getGzipped() {  return gzipped;  }  } 

  • 第 21 至 31 行 :读取缓存。

    • 第 21 至 28 行 :先读取 readOnlyCacheMap 。读取不到,读取 readWriteCacheMap ,并设置到 readOnlyCacheMap
    • 第 29 至 31 行 :读取 readWriteCacheMap
    • readWriteCacheMap 实现代码如下:

      this.readWriteCacheMap =  CacheBuilder.newBuilder().initialCapacity(1000)  .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)  .removalListener(new RemovalListener<Key, Value>() {  @Override  public void onRemoval(RemovalNotification<Key, Value> notification){  // TODO[0009]:RemoteRegionRegistry  Key removedKey = notification.getKey();  if (removedKey.hasRegions()) {  Key cloneWithNoRegions = removedKey.cloneWithoutRegions();  regionSpecificKeys.remove(cloneWithNoRegions, removedKey);  }  }  })  .build(new CacheLoader<Key, Value>() {  @Override  public Value load(Key key) throws Exception{  // // TODO[0009]:RemoteRegionRegistry  if (key.hasRegions()) {  Key cloneWithNoRegions = key.cloneWithoutRegions();  regionSpecificKeys.put(cloneWithNoRegions, key);  }  Value value = generatePayload(key);  return value;  }  }); 

      • readWriteCacheMap 最大缓存数量为 1000 。
      • 调用 #generatePayload(key) 方法,生成缓存值。
  • #generatePayload(key) 方法,实现代码如下:

     1: private Value generatePayload(Key key){  2: Stopwatch tracer = null;  3: try {  4: String payload;  5: switch (key.getEntityType()) {  6: case Application:  7: boolean isRemoteRegionRequested = key.hasRegions();  8:   9: if (ALL_APPS.equals(key.getName())) { 10: if (isRemoteRegionRequested) { // TODO[0009]:RemoteRegionRegistry 11: tracer = serializeAllAppsWithRemoteRegionTimer.start(); 12: payload = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions())); 13: } else { 14: tracer = serializeAllAppsTimer.start(); 15: payload = getPayLoad(key, registry.getApplications()); 16: } 17: } else if (ALL_APPS_DELTA.equals(key.getName())) { 18: // ... 省略增量获取相关的代码 19: } else { 20: tracer = serializeOneApptimer.start(); 21: payload = getPayLoad(key, registry.getApplication(key.getName())); 22: } 23: break; 24: // ... 省略部分代码 25: } 26: return new Value(payload); 27: } finally { 28: if (tracer != null) { 29: tracer.stop(); 30: } 31: } 32: } 

    • 第 10 至 12 行 :TODO[0009]:RemoteRegionRegistry
    • 第 13 至 16 行 :调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合。后调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值。�� 这两个方法代码较多,下面详细解析。
    • 第 17 至 18 行 :获取增量注册信息的缓存值,在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.1 获得注册的应用集合

调用 AbstractInstanceRegistry#getApplications() 方法,获得注册的应用集合,实现代码如下:

 1: // AbstractInstanceRegistry.java  2:   3: private static final String[] EMPTY_STR_ARRAY = new String[0];  4:   5: public Applications getApplications(){  6: boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();  7: if (disableTransparentFallback) { // TODO[0009]:RemoteRegionRegistry  8: return getApplicationsFromLocalRegionOnly();  9: } else { 10: return getApplicationsFromAllRemoteRegions(); // Behavior of falling back to remote region can be disabled. 11: } 12: } 13:  14: public Applications getApplicationsFromLocalRegionOnly(){ 15: return getApplicationsFromMultipleRegions(EMPTY_STR_ARRAY); 16: } 

  • 第 6 至 8 行 :TODO[0009]:RemoteRegionRegistry
  • 第 9 至 16 行 :调用 #getApplicationsFromMultipleRegions(...) 方法,获得注册的应用集合,实现代码如下:

     1: public Applications getApplicationsFromMultipleRegions(String[] remoteRegions){  2: // TODO[0009]:RemoteRegionRegistry  3: boolean includeRemoteRegion = null != remoteRegions && remoteRegions.length != 0;  4: logger.debug("Fetching applications registry with remote regions: {}, Regions argument {}",  5: includeRemoteRegion, Arrays.toString(remoteRegions));  6: if (includeRemoteRegion) {  7: GET_ALL_WITH_REMOTE_REGIONS_CACHE_MISS.increment();  8: } else {  9: GET_ALL_CACHE_MISS.increment(); 10: } 11: // 获得获得注册的应用集合 12: Applications apps = new Applications(); 13: apps.setVersion(1L); 14: for (Entry<String, Map<String, Lease<InstanceInfo>>> entry : registry.entrySet()) { 15: Application app = null; 16:  17: if (entry.getValue() != null) { 18: for (Entry<String, Lease<InstanceInfo>> stringLeaseEntry : entry.getValue().entrySet()) { 19: Lease<InstanceInfo> lease = stringLeaseEntry.getValue(); 20: if (app == null) { 21: app = new Application(lease.getHolder().getAppName()); 22: } 23: app.addInstance(decorateInstanceInfo(lease)); 24: } 25: } 26: if (app != null) { 27: apps.addApplication(app); 28: } 29: } 30: // TODO[0009]:RemoteRegionRegistry 31: if (includeRemoteRegion) { 32: for (String remoteRegion : remoteRegions) { 33: RemoteRegionRegistry remoteRegistry = regionNameVSRemoteRegistry.get(remoteRegion); 34: if (null != remoteRegistry) { 35: Applications remoteApps = remoteRegistry.getApplications(); 36: for (Application application : remoteApps.getRegisteredApplications()) { 37: if (shouldFetchFromRemoteRegistry(application.getName(), remoteRegion)) { 38: logger.info("Application {} fetched from the remote region {}", 39: application.getName(), remoteRegion); 40:  41: Application appInstanceTillNow = apps.getRegisteredApplications(application.getName()); 42: if (appInstanceTillNow == null) { 43: appInstanceTillNow = new Application(application.getName()); 44: apps.addApplication(appInstanceTillNow); 45: } 46: for (InstanceInfo instanceInfo : application.getInstances()) { 47: appInstanceTillNow.addInstance(instanceInfo); 48: } 49: } else { 50: logger.debug("Application {} not fetched from the remote region {} as there exists a " 51: + "whitelist and this app is not in the whitelist.", 52: application.getName(), remoteRegion); 53: } 54: } 55: } else { 56: logger.warn("No remote registry available for the remote region {}", remoteRegion); 57: } 58: } 59: } 60: // 设置 应用集合 hashcode 61: apps.setAppsHashCode(apps.getReconcileHashCode()); 62: return apps; 63: } 

    • 第 2 至 第 10 行 :TODO[0009]:RemoteRegionRegistry
    • 第 11 至 29 行 :获得获得注册的应用集合。
    • 第 30 至 59 行 :TODO[0009]:RemoteRegionRegistry
    • 第 61 行 :计算应用集合 hashcode 。该变量用于校验 增量 获取的注册信息和 Eureka-Server 全量 的注册信息是否一致( 完整 ),在 《Eureka 源码解析 —— 应用实例注册发现 (七)之增量获取》 详细解析。

3.3.2 转换成缓存值

调用 #getPayLoad() 方法,将注册的应用集合转换成缓存值,实现代码如下:

/** * Generate pay load with both JSON and XML formats for all applications. */ private String getPayLoad(Key key, Applications apps){  // 获得编码器  EncoderWrapper encoderWrapper = serverCodecs.getEncoder(key.getType(), key.getEurekaAccept());  String result;  try {  // 编码  result = encoderWrapper.encode(apps);  } catch (Exception e) {  logger.error("Failed to encode the payload for all apps", e);  return "";  }  if(logger.isDebugEnabled()) {  logger.debug("New application cache entry {} with apps hashcode {}", key.toStringCompact(), apps.getAppsHashCode());  }  return result; } 

3.4 主动过期读写缓存

应用实例注册、下线、过期时,调用 ResponseCacheImpl#invalidate() 方法,主动过期读写缓存( readWriteCacheMap ),实现代码如下:

public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress){  for (Key.KeyType type : Key.KeyType.values()) {  for (Version v : Version.values()) {  invalidate(  new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full),  new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact),  new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full),  new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact),  new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full),  new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact)  );  if (null != vipAddress) {  invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full));  }  if (null != secureVipAddress) {  invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full));  }  }  } } 

  • 调用 #invalidate(keys) 方法,逐个过期每个缓存键值,实现代码如下:

    public void invalidate(Key... keys){  for (Key key : keys) {  logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());  // 过期读写缓存  readWriteCacheMap.invalidate(key);  // TODO[0009]:RemoteRegionRegistry  Collection<Key> keysWithRegions = regionSpecificKeys.get(key);  if (null != keysWithRegions && !keysWithRegions.isEmpty()) {  for (Key keysWithRegion : keysWithRegions) {  logger.debug("Invalidating the response cache key : {} {} {} {} {}",  key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept());  readWriteCacheMap.invalidate(keysWithRegion);  }  }  } } 

3.5 被动过期读写缓存

读写缓存( readWriteCacheMap ) 写入后,一段时间自动过期,实现代码如下:

expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds()) 

  • 配置 eureka.responseCacheAutoExpirationInSeconds ,设置写入过期时长。默认值 :180 秒。

3.6 定时刷新只读缓存

定时任务对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。实现代码如下:

 1: ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {  2: // ... 省略无关代码  3:   4: long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();  5: // ... 省略无关代码  6:   7: if (shouldUseReadOnlyResponseCache) {  8: timer.schedule(getCacheUpdateTask(),  9: new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) 10: + responseCacheUpdateIntervalMs), 11: responseCacheUpdateIntervalMs); 12: } 13:  14: // ... 省略无关代码 15: } 16:  17: private TimerTask getCacheUpdateTask(){ 18: return new TimerTask() { 19: @Override 20: public void run(){ 21: logger.debug("Updating the client cache from response cache"); 22: for (Key key : readOnlyCacheMap.keySet()) { // 循环 readOnlyCacheMap 的缓存键 23: if (logger.isDebugEnabled()) { 24: Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()}; 25: logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args); 26: } 27: try { 28: CurrentRequestVersion.set(key.getVersion()); 29: Value cacheValue = readWriteCacheMap.get(key); 30: Value currentCacheValue = readOnlyCacheMap.get(key); 31: if (cacheValue != currentCacheValue) { // 不一致时,进行替换 32: readOnlyCacheMap.put(key, cacheValue); 33: } 34: } catch (Throwable th) { 35: logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th); 36: } 37: } 38: } 39: }; 40: } 

  • 第 7 至 12 行 :初始化定时任务。配置 eureka.responseCacheUpdateIntervalMs ,设置任务执行频率,默认值 :30 * 1000 毫秒。
  • 第 17 至 39 行 :创建定时任务。
    • 第 22 行 :循环 readOnlyCacheMap 的缓存键。 为什么不循环 readWriteCacheMapreadOnlyCacheMap 的缓存过期依赖 readWriteCacheMap ,因此缓存键会更多。
    • 第 28 行 至 33 行 :对比 readWriteCacheMapreadOnlyCacheMap 的缓存值,若不一致,以前者为主。通过这样的方式,实现了 readOnlyCacheMap 的定时过期。

666. 彩蛋

比预期,比想想,长老多老多的一篇文章。细思极恐。

估计下一篇增量获取会简介很多。

胖友,分享我的公众号( 芋道源码 ) 给你的胖友可好?

原文链接:Eureka 源码解析 —— 应用实例注册发现(六)之全量获取,转载请注明来源!

0