诘琅 发表于 2025-6-2 22:44:59

Nacos源码—2.Nacos服务注册发现分析二

大纲
5.服务发现—服务之间的调用请求链路分析
6.服务端如何维护不健康的微服务实例
7.服务下线时涉及的处理
8.服务注册发现总结
 
5.服务发现—服务之间的调用请求链路分析
(1)微服务通过Nacos完成服务调用的请求流程
(2)Nacos客户端进行服务发现的源码
(3)Nacos服务端进行服务查询的源码
(4)总结
 
(1)微服务通过Nacos完成服务调用的请求流程
按照Nacos使用简介里的案例:订单服务和库存服务完成Nacos注册后,会通过Feign来完成服务间的调用。如下图示:
步骤一:首先每个客户端都会有一个微服务本地缓存列表,这个缓存列表会定时从注册中心获取最新的列表来更新本地缓存。
 
步骤二:然后当order-service需要调用stock-service时,order-service会先根据服务名称去本地缓存列表中找对应的微服务实例。但通过服务名称可能会找到多个,所以需要负载均衡选择其中一个。
 
步骤三:最后把服务名称更换为IP + Port,通过Feign发起HTTP调用获取返回结果。
 
(2)Nacos客户端进行服务发现的源码
一.nacos-discovery通过引入Ribbon实现服务调用时的负载均衡
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
三.nacos-client如何进行服务发现
 
一.nacos-discovery通过引入Ribbon实现服务调用时的负载均衡
Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。由于nacos-discovery整合了Ribbon,所以Ribbon可以调用Nacos服务端的服务实例查询列表接口。于是Nacos客户端便借助Ribbon实现了服务调用时的负载均衡,也就是Ribbon会从服务实例列表中选择一个服务实例给客户端进行服务调用。
 
在nacos-discovery的pom.xml中,可以看到它引入了Ribbon依赖:
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
在Ribbon中会有一个ServerList接口,如下所示:这就是一个扩展接口,这个接口的作用就是获取Server列表。然后nacos-discovery会针对这个接口进行实现,从而整合Ribbon。从引入的包来看:loadbalancer是属于Ribbon源码包下的,而LoadBalancer则是Ribbon中的负载均衡器。负载均衡器会结合IRule负载均衡策略,从服务实例列表中选择一个实例。
package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods sed to obtain the List of Servers
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}当Nacos客户端进行微服务调用时,会通过Ribbon来选出一个微服务实例。也就是Ribbon会通过调用NacosServerList的getUpdatedListOfServers()方法选出一个微服务实例。
 
nacos-discovery的NacosServerList类继承了AbstractServerList类,而且实现了Ribbon的ServerList接口的两个方法,如下所示:
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ...
    ...
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
      this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
      return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
      return getServers();
    }

    private List<NacosServer> getServers() {
      try {
            //读取分组
            String group = discoveryProperties.getGroup();
            //通过服务名称、分组、true(表示只需要健康实例),
            //调用NacosNamingService.selectInstances()方法来查询服务实例列表
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //把Instance转换成NacosServer类型
            return instancesToServerList(instances);
      } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
      }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
      List<NacosServer> result = new ArrayList<>();
      if (CollectionUtils.isEmpty(instances)) {
            return result;
      }
      for (Instance instance : instances) {
            result.add(new NacosServer(instance));
      }
      return result;
    }

    public String getServiceId() {
      return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
      this.serviceId = iClientConfig.getClientName();
    }
}NacosServerList的核心方法是NacosServerList的getServers()方法,因为nacos-discovery实现Ribbon的两个接口都调用到了该方法。
 
在nacos-discovery的NacosServerList的getServers()方法中,会调用nacos-client的NacosNamingService的selectInstances()方法,来获取服务实例列表。
 
三.nacos-client如何进行服务发现
在nacos-client的NacosNamingService的selectInstances()方法中:首先会调用HostReactor的getServiceInfo()方法获取服务实例列表,然后调用HostReactor的getServiceInfo0()方法尝试从本地缓存获取,接着调用HostReactor的updateServiceNow()方法查询并更新缓存,也就是调用HostReactor的updateService()方法查询并更新缓存。即先调用NamingProxy的queryList()方法来查询服务端的服务实例列表,再调用HostReactor的processServiceJson()方法更新本地缓存。最后调用HostReactor的scheduleUpdateIfAbsent()方法提交同步缓存任务。
 
所以nacos-client的HostReactor的getServiceInfo()方法是服务发现的核心,它会先到本地缓存中去查询对应的服务实例列表。如果本地缓存查不到对应的服务数据,则到服务端去查询服务实例列表。当获取完服务实例列表后,会向调度线程池提交一个延迟执行的任务,在延迟任务中会执行UpdateTask任务的run()方法。
 
UpdateTask任务的run()方法:会调用updateService()方法查询服务实例列表并更新本地缓存。当该任务执行完毕时,会继续向调度线程池提交一个延迟执行的任务,从而实现不断重复地更新本地缓存的服务实例列表。
public class NacosNamingService implements NamingService {
    private HostReactor hostReactor;
    ...

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
      return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
      return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
      ServiceInfo serviceInfo;
      //这个参数传入默认就是true
      if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
      } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
      }
      return selectInstances(serviceInfo, healthy);
    }
    ...
}

public class HostReactor implements Closeable {
    //服务实例列表的本地缓存
    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private final NamingProxy serverProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    private final ScheduledExecutorService executor;
    ...

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
      NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
      String key = ServiceInfo.getKey(serviceName, clusters);
      if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
      }
      //先查询本地缓存中的服务实例列表
      ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

      //如果本地缓存实例列表为空
      if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            //调用Nacos服务端的服务实例列表查询接口,立即更新Service数据
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
      } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                //hold a moment waiting for update finish
                synchronized (serviceObj) {
                  try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                  } catch (InterruptedException e) {
                        NAMING_LOGGER.error(" serviceName:" + serviceName + ", clusters:" + clusters, e);
                  }
                }
            }
      }
      //开启定时任务,维护本地缓存
      scheduleUpdateIfAbsent(serviceName, clusters);
      //最后从本地缓存中,获取服务实例列表数据
      return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
      String key = ServiceInfo.getKey(serviceName, clusters);
      //从本地缓存中获取服务实例列表
      return serviceInfoMap.get(key);
    }

    private void updateServiceNow(String serviceName, String clusters) {
      try {
            updateService(serviceName, clusters);
      } catch (NacosException e) {
            NAMING_LOGGER.error(" failed to update serviceName: " + serviceName, e);
      }
    }

    //Update service now.
    public void updateService(String serviceName, String clusters) throws NacosException {
      ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
      try {
            //调用Nacos服务端的服务实例查询接口
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            //如果结果不为空,则更新本地缓存
            if (StringUtils.isNotEmpty(result)) {
                //更新本地缓存
                processServiceJson(result);
            }
      } finally {
            if (oldService != null) {
                synchronized (oldService) {
                  oldService.notifyAll();
                }
            }
      }
    }
    ...

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
      if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
      }
      synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //向调度线程池提交一个延迟执行的任务
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
      }
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
      //向调度线程池提交一个延迟执行的任务
      return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public class UpdateTask implements Runnable {
      long lastRefTime = Long.MAX_VALUE;
      private final String clusters;   
      private final String serviceName;
      private int failCount = 0;

      public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
      }

      private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
      }

      private void resetFailCount() {
            failCount = 0;
      }

      @Override
      public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //如果本地缓存为空
                if (serviceObj == null) {
                  updateService(serviceName, clusters);
                  return;
                }
                //lastRefTime是最大的Long型
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                  updateService(serviceName, clusters);
                  serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                  refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                  NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                  return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                  incFailCount();
                  return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn(" failed to update serviceName: " + serviceName, e);
            } finally {
                //向调度线程池继续提交一个延迟执行的任务继续同步本地缓存
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
      }
    }
}

public class NamingProxy implements Closeable {
    ...
    //向Nacos服务端发起HTTP形式的服务实例列表查询请求
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
      final Map<String, String> params = new HashMap<String, String>(8);
      params.put(CommonParams.NAMESPACE_ID, namespaceId);
      params.put(CommonParams.SERVICE_NAME, serviceName);
      params.put("clusters", clusters);
      params.put("udpPort", String.valueOf(udpPort));
      params.put("clientIP", NetUtils.localIP());
      params.put("healthyOnly", String.valueOf(healthyOnly));
      //通过HTTP的方式,请求"/nacos/v1/ns/instance/list"接口
      return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    ...
}二.服务端处理心跳请求的源码
服务端的InstanceController的beat()方法,会处理客户端发来的心跳请求。首先会尝试从ServiceManager的注册表中获取对应的Instance实例对象。如果在内存注册表中找不到对应的Instance实例对象,则直接调用ServiceManager的registerInstance()方法进行服务注册。
 
如果在内存注册表中可以找到对应的Instance实例对象,那么就从ServiceManager的注册表中取出对应的Service服务对象,这样后续对Service的Cluster的Instance进行修改时,就会修改到注册表数据。接着执行Service的processClientBeat()方法,该方法会提交一个异步任务ClientBeatProcessor给线程池,其中线程池的线程数是可用线程数的一半。
 
在ClientBeatProcessor的run()方法中:会先通过集群名找到所有的临时实例列表。然后通过for循环对这些临时实例进行IP + Port判断,找出对应的Instance实例对象。找出对应的Instance后,接着就会把Instance的lastBeat属性修改成当前时间,然后再判断当前Instance的状态是否不健康,若是则重新标记成健康状态。
//Instance operation controller.@RestController@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")public class InstanceController {    ...    //Create a beat for instance.    @CanDistro    @PutMapping("/beat")    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)    public ObjectNode beat(HttpServletRequest request) throws Exception {      ObjectNode result = JacksonUtils.createEmptyJsonNode();      result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());      String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);      //获取请求参数、namespaceId、serviceName      RsInfo clientBeat = null;      if (StringUtils.isNotBlank(beat)) {            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);      }      String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);      String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);      int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));      if (clientBeat != null) {            if (StringUtils.isNotBlank(clientBeat.getCluster())) {                clusterName = clientBeat.getCluster();            } else {                clientBeat.setCluster(clusterName);            }            ip = clientBeat.getIp();            port = clientBeat.getPort();      }      String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);      String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);      NamingUtils.checkServiceNameFormat(serviceName);      Loggers.SRV_LOG.debug(" full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);      //通过命令空间、服务名等信息,从ServiceManager内存注册表中获取instance实例对象      Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);      //如果获取实例为空,则会重新调用服务注册的方法ServiceManager.registerInstance()      if (instance == null) {            if (clientBeat == null) {                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);                return result;            }            Loggers.SRV_LOG.warn(" The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);            instance = new Instance();            instance.setPort(clientBeat.getPort());            instance.setIp(clientBeat.getIp());            instance.setWeight(clientBeat.getWeight());            instance.setMetadata(clientBeat.getMetadata());            instance.setClusterName(clusterName);            instance.setServiceName(serviceName);            instance.setInstanceId(instance.getInstanceId());            instance.setEphemeral(clientBeat.isEphemeral());            //重新注册服务实例            serviceManager.registerInstance(namespaceId, serviceName, instance);      }      //从ServiceManager内存注册表中获取服务Service,后续对Service中的Cluster的Instance修改,便会修改到注册表      Service service = serviceManager.getService(namespaceId, serviceName);      if (service == null) {            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);      }      if (clientBeat == null) {            clientBeat = new RsInfo();            clientBeat.setIp(ip);            clientBeat.setPort(port);            clientBeat.setCluster(clusterName);      }      //提交客户端服务实例的心跳健康检查任务,更改lastBeat属性      service.processClientBeat(clientBeat);      result.put(CommonParams.CODE, NamingResponseCode.OK);      if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());      }      result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());      return result;    }    ...}@JsonInclude(Include.NON_NULL)public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener {    ...    public void processClientBeat(final RsInfo rsInfo) {      ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();      clientBeatProcessor.setService(this);      clientBeatProcessor.setRsInfo(rsInfo);      //立即执行      HealthCheckReactor.scheduleNow(clientBeatProcessor);    }    ...}//Health check reactor.@SuppressWarnings("PMD.ThreadPoolCreationRule")public class HealthCheckReactor {    ...    //Schedule client beat check task without a delay.    public static ScheduledFuture scheduleNow(Runnable task) {      //提交任务到线程池立即执行      return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);    }    ...}public class GlobalExecutor {    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors()

县挫伪 发表于 2025-10-30 04:51:19

分享、互助 让互联网精神温暖你我

艾曼语 发表于 2025-12-1 01:35:27

这个好,看起来很实用

何书艺 发表于 2025-12-3 23:20:20

yyds。多谢分享

煅圆吧 发表于 4 小时前

收藏一下   不知道什么时候能用到
页: [1]
查看完整版本: Nacos源码—2.Nacos服务注册发现分析二