找回密码
 立即注册
首页 业界区 业界 Nacos源码—6.Nacos升级gRPC分析一

Nacos源码—6.Nacos升级gRPC分析一

圣罩 2025-6-2 23:15:01
大纲
1.Nacos 2.x版本的一些变化
2.客户端升级gRPC发起服务注册
3.服务端进行服务注册时的处理
4.客户端服务发现和服务端处理服务订阅的源码分析
 
1.Nacos 2.x版本的一些变化
变化一:客户端和服务端的交互方式由HTTP升级为gRPC
Nacos 1.x服务端会提供一系列的HTTP接口供客户端请求调用,Nacos 2.x服务端会定义一些列Handler处理类来处理客户端的gRPC请求。
 
Nacos 1.x进行服务注册时使用的是HTTP短连接,Nacos 2.x进行服务注册时使用的是RPC长连接。
 
变化二:注册中心的注册表由双重Map结构变为轻量的一个Map
Nacos 1.x版本中的注册中心是使用双重Map结构来存储注册表的,这样使得注册表还是比较重量级的,在并发高时也需要考虑并发冲突。
 
Nacos 2.x版本则把注册表轻量化了,服务端在处理服务注册时,只是简单地往一个Map记写入客户端连接的ID。
 
变化三:大量使用了事件驱动
Nacos 2.x版本的Nacos里,使用了非常多的事件驱动。比如服务注册、服务销毁、服务变更等都先通过通知中心来发布一个事件,然后通过处理事件来来处理后续的逻辑流程。
 
2.客户端升级gRPC发起服务注册
(1)客户端和服务端的版本选择
(2)Nacos客户端项目启动时自动触发服务实例注册
(3)Nacos客户端通过gRPC方式发起服务实例注册
(4)总结
 
(1)客户端和服务端的版本选择
  1. <properties>
  2.     <java.version>1.8</java.version>
  3.     <spring-cloud.version>Hoxton.SR12</spring-cloud.version>
  4.     <spring-cloud-alibaba.version>2.2.8.RELEASE</spring-cloud-alibaba.version>
  5. </properties>
复制代码
(2)Nacos客户端项目启动时自动触发服务实例注册
spring-cloud-starter-alibaba-nacos-discovery依赖包会自动注册服务。查看这个依赖包中的spring.factories文件,会指定一些Configuration类。Spring Boot启动时会扫描spring.factories文件,然后创建里面的配置类。
 
在spring.pactories文件中,与注册相关的类就是:NacosServiceRegistryAutoConfiguration这个Nacos服务注册自动配置类。
1.png
Nacos服务注册自动配置类NacosServiceRegistryAutoConfiguration如下,该配置类创建了三个Bean:
 
第一个Bean:NacosServiceRegistry
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
 
第二个Bean:NacosRegistration
这个Bean在创建时,会传入加载了yml配置文件内容的类NacosDiscoveryProperties。
 
第三个Bean:NacosAutoServiceRegistration
这个Bean在创建时,会传入NacosServiceRegistry和NacosRegistration两个Bean,然后该Bean继承了AbstractAutoServiceRegistration抽象类。该抽象类实现了ApplicationListener接口,所以项目启动时便是利用了Spring的监听事件来实现自动注册服务的。因为在Spring容器启动的最后会执行finishRefresh()方法,然后会发布一个事件,该事件会触发调用onApplicationEvent()方法。
 
调用AbstractAutoServiceRegistration的onApplicationEvent()方法时,首先会调用AbstractAutoServiceRegistration的bind()方法,然后调用AbstractAutoServiceRegistration的start()方法,接着调用AbstractAutoServiceRegistration的register()方法发起注册,也就是调用this.serviceRegistry的register()方法完成服务注册的具体工作。
 
而AbstractAutoServiceRegistration的serviceRegistry属性,是在服务注册自动配置类NacosServiceRegistryAutoConfiguration,创建第三个Bean—NacosAutoServiceRegistration时,通过传入其创建的第一个Bean—NacosServiceRegistry进行赋值的。
  1. @Configuration(proxyBeanMethods = false)
  2. @EnableConfigurationProperties
  3. @ConditionalOnNacosDiscoveryEnabled
  4. @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
  5. @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
  6. public class NacosServiceRegistryAutoConfiguration {
  7.     @Bean
  8.     public NacosServiceRegistry nacosServiceRegistry(NacosServiceManager nacosServiceManager, NacosDiscoveryProperties nacosDiscoveryProperties) {
  9.         return new NacosServiceRegistry(nacosServiceManager, nacosDiscoveryProperties);
  10.     }
  11.     @Bean
  12.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  13.     public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
  14.         return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
  15.     }
  16.     @Bean
  17.     @ConditionalOnBean(AutoServiceRegistrationProperties.class)
  18.     public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
  19.         return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
  20.     }
  21. }
  22. public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
  23.     ...
  24.     private NacosRegistration registration;
  25.     public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
  26.             AutoServiceRegistrationProperties autoServiceRegistrationProperties,
  27.             NacosRegistration registration) {
  28.         super(serviceRegistry, autoServiceRegistrationProperties);
  29.         this.registration = registration;
  30.     }
  31.     ...
  32. }
  33. public abstract class AbstractAutoServiceRegistration<R extends Registration>
  34.         implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
  35.     ...
  36.     private final ServiceRegistry<R> serviceRegistry;
  37.     private AutoServiceRegistrationProperties properties;
  38.     protected AbstractAutoServiceRegistration(ServiceRegistry<R> serviceRegistry, AutoServiceRegistrationProperties properties) {
  39.         this.serviceRegistry = serviceRegistry;
  40.         this.properties = properties;
  41.     }
  42.     ...
  43.     @Override
  44.     @SuppressWarnings("deprecation")
  45.     public void onApplicationEvent(WebServerInitializedEvent event) {
  46.         bind(event);
  47.     }
  48.     public void bind(WebServerInitializedEvent event) {
  49.         ApplicationContext context = event.getApplicationContext();
  50.         if (context instanceof ConfigurableWebServerApplicationContext) {
  51.             if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
  52.                 return;
  53.             }
  54.         }
  55.         this.port.compareAndSet(0, event.getWebServer().getPort());
  56.         this.start();
  57.     }
  58.     public void start() {
  59.         if (!isEnabled()) {
  60.             if (logger.isDebugEnabled()) {
  61.                 logger.debug("Discovery Lifecycle disabled. Not starting");
  62.             }
  63.             return;
  64.         }
  65.         if (!this.running.get()) {
  66.             this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
  67.             //发起注册
  68.             register();
  69.             if (shouldRegisterManagement()) {
  70.                 registerManagement();
  71.             }
  72.             this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
  73.             this.running.compareAndSet(false, true);
  74.         }
  75.     }
  76.     protected void register() {
  77.         //调用创建NacosAutoServiceRegistration时传入的NacosServiceRegistry实例的register()方法
  78.         this.serviceRegistry.register(getRegistration());
  79.     }
  80.     ...
  81. }
  82. public class NacosServiceRegistry implements ServiceRegistry<Registration> {
  83.     private final NacosDiscoveryProperties nacosDiscoveryProperties;
  84.     public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
  85.         this.nacosDiscoveryProperties = nacosDiscoveryProperties;
  86.     }
  87.     @Override
  88.     public void register(Registration registration) {
  89.         if (StringUtils.isEmpty(registration.getServiceId())) {
  90.             log.warn("No service to register for nacos client...");
  91.             return;
  92.         }
  93.         NamingService namingService = namingService();
  94.         //获取服务ID、分组
  95.         String serviceId = registration.getServiceId();
  96.         String group = nacosDiscoveryProperties.getGroup();
  97.         //创建Instance对象
  98.         Instance instance = getNacosInstanceFromRegistration(registration);
  99.         try {
  100.             //发起服务实例注册
  101.             namingService.registerInstance(serviceId, group, instance);
  102.             log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
  103.         } catch (Exception e) {
  104.             if (nacosDiscoveryProperties.isFailFast()) {
  105.                 log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
  106.                 rethrowRuntimeException(e);
  107.             } else {
  108.                 log.warn("Failfast is false. {} register failed...{},", serviceId, registration.toString(), e);
  109.             }
  110.         }
  111.     }
  112.     ...
  113. }
复制代码
总结:
Nacos客户端项目启动时自动触发注册服务实例的流程:Spring监听器调用onApplicationEvent() -> bind() -> start() -> register(),最后register()方法会调用serviceRegistry属性的register()方法进行注册。
 
整个流程具体来说就是:首先通过spring.factories文件,找到一个注册相关的Configuration配置类,这个配置类里面定义了三个Bean对象。创建第三个Bean对象时,需要第一个、第二个Bean对象作为参数传进去。第一个Bean对象里面就有真正进行服务注册的register()方法,并且第一个Bean对象会赋值给第三个Bean对象中的serviceRegistry属性,在第三个Bean对象的父类会实现Spring的监听器方法。所以在Spring容器启动时会发布监听事件,从而触发执行Nacos注册逻辑。
 
(3)Nacos客户端通过gRPC方式发起服务实例注册
nacos-client会提供接口给Nacos客户端调用来进行服务实例注册。
 
在NacosNamingService提供的服务注册接口registerInstance()中,会调用NamingClientProxyDelegate的registerService()方法来注册服务。此时会先调用NamingClientProxyDelegate的getExecuteClientProxy()方法,来判断要注册的服务实例是否为临时实例来获取gRPC代理还是HTTP代理。如果注册的是临时实例,则使用gRPC方式注册,否则用HTTP方式注册,然后再调用NamingGrpcClientProxy的registerService()方法注册服务。
 
在NamingGrpcClientProxy的registerService()方法中,则会调用NamingGrpcClientProxy的doRegisterService()方法执行注册。此时先根据要注册的服务信息创建一个InstanceRequest请求参数对象,然后调用NamingGrpcClientProxy的requestToServer()方法发出请求,也就是通过调用RpcClient的request()方法向Nacos服务端发出gRPC请求。
 
至于RpcClient的request()方法的底层实现,则是通过一个本地存根代理类grpcFutureServiceStub调用gRPC的接口,来实现向Nacos服务端发起RPC调用的。
  1. //Nacos Naming Service.
  2. @SuppressWarnings("PMD.ServiceOrDaoClassShouldEndWithImplRule")
  3. public class NacosNamingService implements NamingService {
  4.     ...
  5.     private NamingClientProxy clientProxy;
  6.     private void init(Properties properties) throws NacosException {
  7.         ...
  8.         //初始化clientProxy属性为NamingClientProxyDelegate对象实例
  9.         this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
  10.     }
  11.     //register a instance to service with specified instance properties.
  12.     @Override
  13.     public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
  14.         NamingUtils.checkInstanceIsLegal(instance);
  15.         //调用NamingClientProxy的注册方法registerService(),其实就是NamingClientProxyDelegate.registerService()方法
  16.         clientProxy.registerService(serviceName, groupName, instance);
  17.     }
  18.     ...
  19. }
  20. //Delegate of naming client proxy.
  21. public class NamingClientProxyDelegate implements NamingClientProxy {
  22.     private final NamingHttpClientProxy httpClientProxy;
  23.     private final NamingGrpcClientProxy grpcClientProxy;
  24.     public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
  25.         ...
  26.         this.httpClientProxy = new NamingHttpClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
  27.         this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
  28.     }
  29.     ...
  30.     @Override
  31.     public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  32.         getExecuteClientProxy(instance).registerService(serviceName, groupName, instance);
  33.     }
  34.     private NamingClientProxy getExecuteClientProxy(Instance instance) {
  35.         return instance.isEphemeral() ? grpcClientProxy : httpClientProxy;
  36.     }
  37.     ...
  38. }
  39. //Naming grpc client proxy.
  40. public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
  41.     private final String namespaceId;
  42.     private final String uuid;
  43.     private final Long requestTimeout;
  44.     private final RpcClient rpcClient;
  45.     private final NamingGrpcRedoService redoService;
  46.     public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  47.         super(securityProxy);
  48.         this.namespaceId = namespaceId;
  49.         this.uuid = UUID.randomUUID().toString();
  50.         this.requestTimeout = Long.parseLong(properties.getProperty(CommonParams.NAMING_REQUEST_TIMEOUT, "-1"));
  51.         Map<String, String> labels = new HashMap<String, String>();
  52.         labels.put(RemoteConstants.LABEL_SOURCE, RemoteConstants.LABEL_SOURCE_SDK);
  53.         labels.put(RemoteConstants.LABEL_MODULE, RemoteConstants.LABEL_MODULE_NAMING);
  54.         //通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
  55.         this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
  56.         this.redoService = new NamingGrpcRedoService(this);
  57.         start(serverListFactory, serviceInfoHolder);
  58.     }
  59.     private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  60.         rpcClient.serverListFactory(serverListFactory);
  61.         rpcClient.registerConnectionListener(redoService);
  62.         rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
  63.         //调用RpcClient.start()方法建立与服务端的连接
  64.         rpcClient.start();
  65.         NotifyCenter.registerSubscriber(this);
  66.     }
  67.     ...
  68.     //Register a instance to service with specified instance properties.
  69.     @Override
  70.     public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
  71.         NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance {}", namespaceId, serviceName, instance);
  72.         redoService.cacheInstanceForRedo(serviceName, groupName, instance);
  73.         //执行服务实例的注册
  74.         doRegisterService(serviceName, groupName, instance);
  75.     }
  76.     //Execute register operation.
  77.     public void doRegisterService(String serviceName, String groupName, Instance instance) throws NacosException {
  78.         //创建请求参数对象InstanceRequest
  79.         InstanceRequest request = new InstanceRequest(namespaceId, serviceName, groupName, NamingRemoteConstants.REGISTER_INSTANCE, instance);
  80.         //向服务端发起请求
  81.         requestToServer(request, Response.class);
  82.         redoService.instanceRegistered(serviceName, groupName);
  83.     }
  84.     private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
  85.         try {
  86.             request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
  87.             //实际会调用RpcClient.request()方法发起gRPC请求
  88.             Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
  89.             if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
  90.                 throw new NacosException(response.getErrorCode(), response.getMessage());
  91.             }
  92.             if (responseClass.isAssignableFrom(response.getClass())) {
  93.                 return (T) response;
  94.             }
  95.             NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
  96.         } catch (Exception e) {
  97.             throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
  98.         }
  99.         throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
  100.     }
  101.     ...
  102. }
  103. //abstract remote client to connect to server.
  104. @SuppressWarnings("PMD.AbstractClassShouldStartWithAbstractNamingRule")
  105. public abstract class RpcClient implements Closeable {
  106.     protected volatile Connection currentConnection;
  107.     ...
  108.     public final void start() throws NacosException {
  109.         ...
  110.         // connect to server, try to connect to server sync RETRY_TIMES times, async starting if failed.
  111.         Connection connectToServer = null;
  112.         rpcClientStatus.set(RpcClientStatus.STARTING);
  113.         int startUpRetryTimes = RETRY_TIMES;
  114.         while (startUpRetryTimes > 0 && connectToServer == null) {
  115.             try {
  116.                 startUpRetryTimes--;
  117.                 ServerInfo serverInfo = nextRpcServer();
  118.                 LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Try to connect to server on start up, server: {}", name, serverInfo);
  119.                 //调用GrpcClient.connectToServer()方法建立和服务端的长连接
  120.                 connectToServer = connectToServer(serverInfo);
  121.             } catch (Throwable e) {
  122.                 LoggerUtils.printIfWarnEnabled(LOGGER, "[{}] Fail to connect to server on start up, error message = {}, start up retry times left: {}", name, e.getMessage(), startUpRetryTimes);
  123.             }
  124.         }
  125.         if (connectToServer != null) {
  126.             LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}", name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
  127.             //currentConnection其实就是一个用于客户端的GrpcConnection对象实例
  128.             this.currentConnection = connectToServer;
  129.             rpcClientStatus.set(RpcClientStatus.RUNNING);
  130.             eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
  131.         } else {
  132.             switchServerAsync();
  133.         }
  134.         ...
  135.     }
  136.     //connect to server.
  137.     public abstract Connection connectToServer(ServerInfo serverInfo) throws Exception;
  138.     ...
  139.     //send request.
  140.     public Response request(Request request) throws NacosException {
  141.         return request(request, DEFAULT_TIMEOUT_MILLS);
  142.     }
  143.     //send request.
  144.     public Response request(Request request, long timeoutMills) throws NacosException {
  145.         int retryTimes = 0;
  146.         Response response;
  147.         Exception exceptionThrow = null;
  148.         long start = System.currentTimeMillis();
  149.         while (retryTimes < RETRY_TIMES && System.currentTimeMillis() < timeoutMills + start) {
  150.             boolean waitReconnect = false;
  151.             try {
  152.                 if (this.currentConnection == null || !isRunning()) {
  153.                     waitReconnect = true;
  154.                     throw new NacosException(NacosException.CLIENT_DISCONNECT, "Client not connected, current status:" + rpcClientStatus.get());
  155.                 }
  156.                 //发起gRPC请求,调用GrpcConnection.request()方法
  157.                 response = this.currentConnection.request(request, timeoutMills);
  158.                 if (response == null) {
  159.                     throw new NacosException(SERVER_ERROR, "Unknown Exception.");
  160.                 }
  161.                 if (response instanceof ErrorResponse) {
  162.                     if (response.getErrorCode() == NacosException.UN_REGISTER) {
  163.                         synchronized (this) {
  164.                             waitReconnect = true;
  165.                             if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
  166.                                 LoggerUtils.printIfErrorEnabled(LOGGER, "Connection is unregistered, switch server, connectionId = {}, request = {}", currentConnection.getConnectionId(), request.getClass().getSimpleName());
  167.                                 switchServerAsync();
  168.                             }
  169.                         }
  170.                     }
  171.                     throw new NacosException(response.getErrorCode(), response.getMessage());
  172.                 }
  173.                 //return response.
  174.                 lastActiveTimeStamp = System.currentTimeMillis();
  175.                 return response;
  176.             } catch (Exception e) {
  177.                 if (waitReconnect) {
  178.                     try {
  179.                         //wait client to reconnect.
  180.                         Thread.sleep(Math.min(100, timeoutMills / 3));
  181.                     } catch (Exception exception) {
  182.                         //Do nothing.
  183.                     }
  184.                 }
  185.                 LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes, e.getMessage());
  186.                 exceptionThrow = e;
  187.             }
  188.             retryTimes++;
  189.         }
  190.         if (rpcClientStatus.compareAndSet(RpcClientStatus.RUNNING, RpcClientStatus.UNHEALTHY)) {
  191.             switchServerAsyncOnRequestFail();
  192.         }
  193.         if (exceptionThrow != null) {
  194.             throw (exceptionThrow instanceof NacosException) ? (NacosException) exceptionThrow : new NacosException(SERVER_ERROR, exceptionThrow);
  195.         } else {
  196.             throw new NacosException(SERVER_ERROR, "Request fail, unknown Error");
  197.         }
  198.     }
  199.     ...
  200. }
  201. //gRPC connection.
  202. public class GrpcConnection extends Connection {
  203.     //stub to send request.
  204.     //grpcFutureServiceStub属性会在NamingGrpcClientProxy初始化 -> 调用RpcClient.start() -> GrpcClient.connectToServer()时,
  205.     //通过GrpcConnection.setGrpcFutureServiceStub()方法进行设置
  206.     protected RequestGrpc.RequestFutureStub grpcFutureServiceStub;
  207.     ...
  208.     @Override
  209.     public Response request(Request request, long timeouts) throws NacosException {
  210.         Payload grpcRequest = GrpcUtils.convert(request);
  211.         //调用gRPC提供的接口发起请求,属于io.grpc包下的内容了
  212.         ListenableFuture<Payload> requestFuture = grpcFutureServiceStub.request(grpcRequest);
  213.         Payload grpcResponse;
  214.         try {
  215.             grpcResponse = requestFuture.get(timeouts, TimeUnit.MILLISECONDS);
  216.         } catch (Exception e) {
  217.             throw new NacosException(NacosException.SERVER_ERROR, e);
  218.         }
  219.         return (Response) GrpcUtils.parse(grpcResponse);
  220.     }
  221.     ...
  222. }
复制代码
(4)总结
2.png
 
3.服务端进行服务注册时的处理
(1)服务端处理客户端发起的服务注册请求
(2)服务端对客户端注册事件的处理源码
(3)总结
 
(1)服务端处理客户端发起的服务注册请求
客户端向服务端发起服务注册时,会先根据要注册的服务信息来创建一个InstanceRequest请求参数对象,再调用NamingGrpcClientProxy的requestToServer()方法向服务端发请求,也就是通过调用RpcClient的request()方法来向服务端发出gRPC请求。
 
InstanceRequestHandler的handle()方法就是用来处理服务注册请求的,该方法会继续调用InstanceRequestHandler的registerInstance()方法,根据客户端发过来的请求类型来选择是注册服务还是注销服务。
 
如果客户端发过来的请求类型是注册服务实例,则调用EphemeralClientOperationServiceImpl的registerInstance()方法,在该方法中:
 
一.首先会调用ServiceManager的getSingleton()方法
根据由请求信息创建的Service对象获取一个已注册的Service对象。
 
需要注意:在Nacos 1.x中,ServiceManager使用一个双层Map存放服务和命名空间。在Nacos 2.x中,ServiceManager则使用两个Map存放服务和命名空间。
  1. 在Nacos 1.x中的双层Map是:
  2. Map<String, Map<String, Service>>;
  3. 例如Map(namespace, Map(group::serviceName, Service));
  4. 在Nacos 2.x中的两个Map是:
  5. ConcurrentHashMap<Service, Service>、
  6. ConcurrentHashMap<String_namespace, Set<Service>>;
复制代码
二.然后调用ClientManagerDelegate的getClient()方法
根据请求参数中的connectionId来获取一个IpPortBasedClient对象。在执行ClientManagerDelegate的getClient()方法时,会先根据connectionId选出具体的ClientManager实现类,接着再调用比如EphemeralIpPortClientManager的getClient()方法,从EphemeralIpPortClientManager.clients属性中获取一个Client对象。clients属性是一个ConcurrentMap,key是请求参数中的connectionId,value是继承了实现Client接口的AbstractClient的IpPortBasedClient对象。
 
需要注意:Nacos中的gRPC底层是基于Netty实现的。当客户端和服务端建立长连接后,服务端会生成SocketChannel连接对象,这个SocketChannel连接对象就代表了客户端。Nacos会在这个SocketChannel连接对象的基础上,封装一个Client对象,并且生成一个connectionId将SocketChannel对象与Client对象关联起来。
 
所以当要进行服务注册的客户端和服务端建立好长连接后,服务端就会为客户端创建一个IpPortBasedClient对象,并将该对象存放在EphemeralIpPortClientManager.clients属性里。
3.png
三.接着调用ClientOperationService的getPublishInfo()方法
将请求中的instance实例信息封装为InstancePublishInfo对象。
 
四.然后调用IpPortBasedClient对象的addServiceInstance()方法
往IpPortBasedClient对象里添加Service对象 -> InstancePublishInfo对象。
 
由于IpPortBasedClient继承自实现了Client接口的AbstractClient抽象类,所以实际是调用AbstractClient的addServiceInstance()方法添加服务实例。
 
在AbstractClient抽象类中,有一个名为publishers的属性。它是一个ConcurrentHashMap,用于存储客户端服务注册请求中的Instance信息。就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务,当然这些信息已经被封装为InstancePublishInfo对象了。所以AbstractClient.publishers属性的key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象。
 
在AbstractClient的addServiceInstance()方法中,首先往publishers放入这次要注册的Service对象和客户端Instance实例,然后发布客户端改变事件ClientChangedEvent,用来同步集群间的数据。
 
五.最后发布客户端注册服务实例事件和服务实例元数据事件
客户端注册服务实例事件是ClientRegisterServiceEvent,服务实例元数据事件是InstanceMetadataEvent。
  1. //Instance request handler.
  2. @Component
  3. public class InstanceRequestHandler extends RequestHandler<InstanceRequest, InstanceResponse> {
  4.     private final EphemeralClientOperationServiceImpl clientOperationService;
  5.     public InstanceRequestHandler(EphemeralClientOperationServiceImpl clientOperationService) {
  6.         this.clientOperationService = clientOperationService;
  7.     }
  8.     @Override
  9.     @Secured(action = ActionTypes.WRITE)
  10.     public InstanceResponse handle(InstanceRequest request, RequestMeta meta) throws NacosException {
  11.         //根据请求信息创建一个Service对象,里面包含了:命名空间、分组名、服务名
  12.         Service service = Service.newService(request.getNamespace(), request.getGroupName(), request.getServiceName(), true);
  13.         switch (request.getType()) {
  14.             case NamingRemoteConstants.REGISTER_INSTANCE:
  15.                 //注册实例
  16.                 return registerInstance(service, request, meta);
  17.             case NamingRemoteConstants.DE_REGISTER_INSTANCE:
  18.                 //注销实例
  19.                 return deregisterInstance(service, request, meta);
  20.             default:
  21.                 throw new NacosException(NacosException.INVALID_PARAM, String.format("Unsupported request type %s", request.getType()));
  22.         }
  23.     }
  24.     private InstanceResponse registerInstance(Service service, InstanceRequest request, RequestMeta meta) {
  25.         //调用EphemeralClientOperationServiceImpl的注册方法registerInstance(),这里需要注意如下参数;
  26.         //参数service:根据请求信息创建的一个Service对象,里面有命名空间、分组名、服务名
  27.         //参数request.getInstance():这个参数就对应了客户端的实例对象,里面包含IP、端口等信息
  28.         //参数meta.getConnectionId():这个参数很关键,它是连接ID
  29.         clientOperationService.registerInstance(service, request.getInstance(), meta.getConnectionId());
  30.         return new InstanceResponse(NamingRemoteConstants.REGISTER_INSTANCE);
  31.     }
  32.     private InstanceResponse deregisterInstance(Service service, InstanceRequest request, RequestMeta meta) {
  33.         //调用EphemeralClientOperationServiceImpl的注销方法deregisterInstance()
  34.         clientOperationService.deregisterInstance(service, request.getInstance(), meta.getConnectionId());
  35.         return new InstanceResponse(NamingRemoteConstants.DE_REGISTER_INSTANCE);
  36.     }
  37. }
  38. //Operation service for ephemeral clients and services.
  39. @Component("ephemeralClientOperationService")
  40. public class EphemeralClientOperationServiceImpl implements ClientOperationService {
  41.     private final ClientManager clientManager;
  42.     public EphemeralClientOperationServiceImpl(ClientManagerDelegate clientManager) {
  43.         this.clientManager = clientManager;
  44.     }
  45.     @Override
  46.     public void registerInstance(Service service, Instance instance, String clientId) {
  47.         //1.从ServiceManager中根据由请求信息创建的Service对象获取一个已注册的Service对象
  48.         Service singleton = ServiceManager.getInstance().getSingleton(service);
  49.         if (!singleton.isEphemeral()) {
  50.             throw new NacosRuntimeException(NacosException.INVALID_PARAM, String.format("Current service %s is persistent service, can't register ephemeral instance.", singleton.getGroupedServiceName()));
  51.         }
  52.         //2.从ClientManagerDelegate中根据请求参数中的connectionId获取一个Client对象,即IpPortBasedClient对象
  53.         Client client = clientManager.getClient(clientId);
  54.         if (!clientIsLegal(client, clientId)) {
  55.             return;
  56.         }
  57.         //3.将请求中的instance实例信息封装为InstancePublishInfo对象
  58.         InstancePublishInfo instanceInfo = getPublishInfo(instance);
  59.         //4.往Client对象里添加已注册的服务对象Service,调用的是IpPortBasedClient对象的父类AbstractClient的addServiceInstance()方法
  60.         client.addServiceInstance(singleton, instanceInfo);
  61.         //设置IpPortBasedClient对象的lastUpdatedTime属性为最新时间
  62.         client.setLastUpdatedTime();
  63.         //5.发布客户端注册服务实例的事件
  64.         NotifyCenter.publishEvent(new ClientOperationEvent.ClientRegisterServiceEvent(singleton, clientId));
  65.         //5.发布服务实例元数据的事件
  66.         NotifyCenter.publishEvent(new MetadataEvent.InstanceMetadataEvent(singleton, instanceInfo.getMetadataId(), false));
  67.     }
  68.     ...
  69. }
  70. public interface ClientOperationService {
  71.     ...   
  72.     //get publish info.
  73.     default InstancePublishInfo getPublishInfo(Instance instance) {
  74.         InstancePublishInfo result = new InstancePublishInfo(instance.getIp(), instance.getPort());
  75.         Map<String, Object> extendDatum = result.getExtendDatum();
  76.         if (null != instance.getMetadata() && !instance.getMetadata().isEmpty()) {
  77.             extendDatum.putAll(instance.getMetadata());
  78.         }
  79.         if (StringUtils.isNotEmpty(instance.getInstanceId())) {
  80.             extendDatum.put(Constants.CUSTOM_INSTANCE_ID, instance.getInstanceId());
  81.         }
  82.         if (Constants.DEFAULT_INSTANCE_WEIGHT != instance.getWeight()) {
  83.             extendDatum.put(Constants.PUBLISH_INSTANCE_WEIGHT, instance.getWeight());
  84.         }
  85.         if (!instance.isEnabled()) {
  86.             extendDatum.put(Constants.PUBLISH_INSTANCE_ENABLE, instance.isEnabled());
  87.         }
  88.         String clusterName = StringUtils.isBlank(instance.getClusterName()) ? UtilsAndCommons.DEFAULT_CLUSTER_NAME : instance.getClusterName();
  89.         result.setHealthy(instance.isHealthy());
  90.         result.setCluster(clusterName);
  91.         return result;
  92.     }
  93. }
  94. //Nacos service manager for v2.
  95. public class ServiceManager {
  96.     private static final ServiceManager INSTANCE = new ServiceManager();
  97.     //key是根据请求参数创建的Service对象,value是已经注册的Service对象
  98.     private final ConcurrentHashMap<Service, Service> singletonRepository;
  99.     //key是命名空间,value是相同命名空间的已注册的Service对象集合
  100.     private final ConcurrentHashMap<String, Set<Service>> namespaceSingletonMaps;
  101.     private ServiceManager() {
  102.         singletonRepository = new ConcurrentHashMap<>(1 << 10);
  103.         namespaceSingletonMaps = new ConcurrentHashMap<>(1 << 2);
  104.     }
  105.     public static ServiceManager getInstance() {
  106.         return INSTANCE;
  107.     }
  108.     public Set<Service> getSingletons(String namespace) {
  109.         return namespaceSingletonMaps.getOrDefault(namespace, new HashSet<>(1));
  110.     }
  111.     //Get singleton service. Put to manager if no singleton.
  112.     //@param service new service
  113.     //@return if service is exist, return exist service, otherwise return new service
  114.     public Service getSingleton(Service service) {
  115.         //往singletonRepository这个ConcurrentHashMap中添加一个Service对象,如果存在就不添加
  116.         singletonRepository.putIfAbsent(service, service);
  117.         //从这个ConcurrentHashMap中把已注册的Service对象取出来
  118.         Service result = singletonRepository.get(service);
  119.         //将已注册的Service对象添加到namespaceSingletonMaps中
  120.         //由于namespaceSingletonMaps会按命名空间对已注册的Service对象进行分类
  121.         //所以namespaceSingletonMaps的key是命名空间,value是相同命名空间的已注册的Service对象集合
  122.         namespaceSingletonMaps.computeIfAbsent(result.getNamespace(), (namespace) -> new ConcurrentHashSet<>());
  123.         namespaceSingletonMaps.get(result.getNamespace()).add(result);
  124.         return result;
  125.     }
  126.     ...
  127. }
  128. //Client manager delegate.
  129. @Component("clientManager")
  130. public class ClientManagerDelegate implements ClientManager {
  131.     private final ConnectionBasedClientManager connectionBasedClientManager;
  132.     private final EphemeralIpPortClientManager ephemeralIpPortClientManager;   
  133.     private final PersistentIpPortClientManager persistentIpPortClientManager;
  134.     public ClientManagerDelegate(ConnectionBasedClientManager connectionBasedClientManager, EphemeralIpPortClientManager ephemeralIpPortClientManager, PersistentIpPortClientManager persistentIpPortClientManager) {
  135.         this.connectionBasedClientManager = connectionBasedClientManager;
  136.         this.ephemeralIpPortClientManager = ephemeralIpPortClientManager;
  137.         this.persistentIpPortClientManager = persistentIpPortClientManager;
  138.     }
  139.     ...
  140.     @Override
  141.     public Client getClient(String clientId) {
  142.         //通过请求参数中的connectionId获取一个Client对象,比如调用EphemeralIpPortClientManager.getClient()方法
  143.         return getClientManagerById(clientId).getClient(clientId);
  144.     }
  145.     private ClientManager getClientManagerById(String clientId) {
  146.         if (isConnectionBasedClient(clientId)) {
  147.             return connectionBasedClientManager;
  148.         }
  149.         return clientId.endsWith(ClientConstants.PERSISTENT_SUFFIX) ? persistentIpPortClientManager : ephemeralIpPortClientManager;
  150.     }
  151.     ...
  152. }
  153. //The manager of {@code IpPortBasedClient} and ephemeral.
  154. @Component("ephemeralIpPortClientManager")
  155. public class EphemeralIpPortClientManager implements ClientManager {
  156.     //key是请求参数中的connectionId即clientId,value是一个继承了实现Client接口的AbstractClient的IpPortBasedClient对象
  157.     private final ConcurrentMap<String, IpPortBasedClient> clients = new ConcurrentHashMap<>();
  158.     private final DistroMapper distroMapper;
  159.     private final ClientFactory<IpPortBasedClient> clientFactory;
  160.     public EphemeralIpPortClientManager(DistroMapper distroMapper, SwitchDomain switchDomain) {
  161.         this.distroMapper = distroMapper;
  162.         GlobalExecutor.scheduleExpiredClientCleaner(new ExpiredClientCleaner(this, switchDomain), 0, Constants.DEFAULT_HEART_BEAT_INTERVAL, TimeUnit.MILLISECONDS);
  163.         clientFactory = ClientFactoryHolder.getInstance().findClientFactory(ClientConstants.EPHEMERAL_IP_PORT);
  164.     }
  165.     @Override
  166.     public boolean clientConnected(String clientId, ClientAttributes attributes) {
  167.         return clientConnected(clientFactory.newClient(clientId, attributes));
  168.     }
  169.     @Override
  170.     public boolean clientConnected(final Client client) {
  171.         clients.computeIfAbsent(client.getClientId(), s -> {
  172.             Loggers.SRV_LOG.info("Client connection {} connect", client.getClientId());
  173.             IpPortBasedClient ipPortBasedClient = (IpPortBasedClient) client;
  174.             ipPortBasedClient.init();
  175.             return ipPortBasedClient;
  176.         });
  177.         return true;
  178.     }
  179.     @Override
  180.     public boolean syncClientConnected(String clientId, ClientAttributes attributes) {
  181.         return clientConnected(clientFactory.newSyncedClient(clientId, attributes));
  182.     }
  183.     @Override
  184.     public Client getClient(String clientId) {
  185.         //客户端和服务端建立长连接时,就会通过EphemeralIpPortClientManager.clientConnected()方法将clientId放入到clients
  186.         return clients.get(clientId);
  187.     }
  188.     ...
  189. }
  190. public class IpPortBasedClient extends AbstractClient {
  191.     ...
  192.     ...
  193. }
  194. public abstract class AbstractClient implements Client {
  195.     //publishers其实就是记录该客户端提供的服务和服务实例,一个客户端可提供多个服务;
  196.     //存储客户端发送过来的请求中的Instance信息,当然这些信息已封装为InstancePublishInfo对象
  197.     //key为已注册的Service,value是根据请求中的Instance实例信息封装的InstancePublishInfo对象
  198.     protected final ConcurrentHashMap<Service, InstancePublishInfo> publishers = new ConcurrentHashMap<>(16, 0.75f, 1);
  199.     ...
  200.     @Override
  201.     public boolean addServiceInstance(Service service, InstancePublishInfo instancePublishInfo) {
  202.         //服务注册时,如果是第一次put进去Service对象,会返回null
  203.         if (null == publishers.put(service, instancePublishInfo)) {
  204.             //监视器记录
  205.             MetricsMonitor.incrementInstanceCount();
  206.         }
  207.         //发布客户端改变事件ClientChangedEvent,用于处理集群间的数据同步
  208.         NotifyCenter.publishEvent(new ClientEvent.ClientChangedEvent(this));
  209.         Loggers.SRV_LOG.info("Client change for service {}, {}", service, getClientId());
  210.         return true;
  211.     }
  212.     ...
  213. }
复制代码
(2)服务端对客户端注册事件的处理源码
一.服务端处理客户端的服务注册请求时的主要工作
往三个ConcurrentHashMap里放入内容 + 发布三个事件。
 
三个ConcurrentHashMap分别是:第一个是ServiceManager中的,key是根据请求参数创建的Service对象,value是已经注册的Service对象。第二个是ServiceManager中的,key是命名空间,value是相同命名空间的已注册的Service对象集合。第三个是AbstractClient中的,key是注册的Service服务对象,value是根据请求中的Instance实例封装的InstancePublishInfo对象。
 
三个事件分别是:第一个是在AbstractClient的addServiceInstance()方法中,发布的ClientChangedEvent客户端改变事件。第二个是在EphemeralClientOperationService的registerInstance()方法中,发布的ClientRegisterServiceEvent客户端注册事件。第三个是在EphemeralClientOperationService的registerInstance()方法中,发布的InstanceMetadataEvent服务实例元数据事件。
 
二.服务端对客户端注册事件ClientRegisterServiceEvent的处理
当执行EphemeralClientOperationService的registerInstance()方法,发布一个ClientRegisterServiceEvent客户端注册事件时,便会触发执行ClientServiceIndexesManager的onEvent()方法,然后执行ClientServiceIndexesManager的handleClientOperation()方法,最终调用ClientServiceIndexesManager的addPublisherIndexes()方法。
 
其中addPublisherIndexes()方法会把clientId放入到publisherIndexes中。publisherIndexes是一个ConcurrentMap,它的key是要注册的服务实例所属的服务Service对象,它的value是某服务Service对象下的所有clientId即connectionId。由于connectionId代表了一个Client对象,也就是一个客户端,所以publisherIndexes的value可理解为服务Service下的所有客户端实例。
  1. //Client and service index manager.
  2. @Component
  3. public class ClientServiceIndexesManager extends SmartSubscriber {
  4.     //注册表(服务提供者),一个Service服务对象,对应多个服务实例的clientId
  5.     private final ConcurrentMap<Service, Set<String>> publisherIndexes = new ConcurrentHashMap<>();
  6.     ...
  7.     //可以处理客户端注册事件ClientRegisterServiceEvent
  8.     @Override
  9.     public void onEvent(Event event) {
  10.         if (event instanceof ClientEvent.ClientDisconnectEvent) {
  11.             handleClientDisconnect((ClientEvent.ClientDisconnectEvent) event);
  12.         } else if (event instanceof ClientOperationEvent) {
  13.             handleClientOperation((ClientOperationEvent) event);
  14.         }
  15.     }
  16.     private void handleClientOperation(ClientOperationEvent event) {
  17.         Service service = event.getService();
  18.         String clientId = event.getClientId();
  19.         if (event instanceof ClientOperationEvent.ClientRegisterServiceEvent) {
  20.             //处理客户端注册事件ClientRegisterServiceEvent
  21.             addPublisherIndexes(service, clientId);
  22.         } else if (event instanceof ClientOperationEvent.ClientDeregisterServiceEvent) {
  23.             //处理客户端注销事件ClientDeregisterServiceEvent
  24.             removePublisherIndexes(service, clientId);
  25.         } else if (event instanceof ClientOperationEvent.ClientSubscribeServiceEvent) {
  26.             //处理客户端订阅服务事件ClientSubscribeServiceEvent
  27.             addSubscriberIndexes(service, clientId);
  28.         } else if (event instanceof ClientOperationEvent.ClientUnsubscribeServiceEvent) {
  29.             //处理客户端取消订阅事件ClientUnsubscribeServiceEvent
  30.             removeSubscriberIndexes(service, clientId);
  31.         }
  32.     }
  33.     private void addPublisherIndexes(Service service, String clientId) {
  34.         //判断注册表是否存在该Service,不存在则创建一个空的ConcurrentHashSet
  35.         publisherIndexes.computeIfAbsent(service, (key) -> new ConcurrentHashSet<>());
  36.         //把clientId放入到对应的Service中
  37.         publisherIndexes.get(service).add(clientId);
  38.         //发布服务改变事件
  39.         NotifyCenter.publishEvent(new ServiceEvent.ServiceChangedEvent(service, true));
  40.     }
  41.     ...
  42. }
复制代码
(3)总结
服务端处理服务实例注册时,会使用多个Map来存储微服务实例的信息。在注册表也就是ClientServiceIndexesManager.publisherIndexes属性中,只是简单记录每个Service服务对象下包含的clientId字符串集合。通过clientId可以在clients属性中获取到IpPortBasedClient对象,IpPortBasedClient的父类AbstractClient会存储对应的Instance实例信息,所以这样的注册表是可记录每个Service服务包含的所有Instance实例的。
4.png
 
4.客户端服务发现和服务端处理服务订阅的源码分析
(1)Nacos客户端进行服务发现的源码
(2)Nacos服务端处理服务订阅请求的源码
 
(1)Nacos客户端进行服务发现的源码
一.nacos-discovery引入Ribbon实现服务调用时的负载均衡
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
三.nacos-client如何进行服务发现
 
一.nacos-discovery引入Ribbon实现服务调用时的负载均衡
Nacos客户端就是引入了nacos-discovery + nacos-client依赖的项目。由于nacos-discovery整合了Ribbon,所以Ribbon可以调用Nacos服务端的服务实例查询列表接口。于是Nacos客户端便借助Ribbon实现了服务调用时的负载均衡,即Ribbon会从服务实例列表中选择一个服务实例给客户端进行服务调用。
 
在nacos-discovery的pom.xml中,可以看到它引入了Ribbon依赖:
5.png
二.nacos-discovery如何整合Ribbon实现服务调用时的负载均衡
在Ribbon中会有一个ServerList接口,如下所示:ServerList就是一个扩展接口,这个接口的作用就是获取Server列表。然后nacos-discovery会针对这个接口进行实现,从而整合Ribbon。
 
从引入的包来看,loadbalancer是属于Ribbon源码包下的。而LoadBalancer则是Ribbon中的负载均衡器。负载均衡器会结合IRule负载均衡策略,从服务实例列表中选择一个实例。
  1. package com.netflix.loadbalancer;
  2. import java.util.List;
  3. //Interface that defines the methods sed to obtain the List of Servers
  4. public interface ServerList<T extends Server> {
  5.     public List<T> getInitialListOfServers();
  6.     //Return updated list of servers. This is called say every 30 secs
  7.     public List<T> getUpdatedListOfServers();
  8. }
复制代码
当Nacos客户端进行微服务调用时,会通过Ribbon来选出一个服务实例,此时Ribbon会调用NacosServerList的getUpdatedListOfServers()方法获取服务实例列表。
 
nacos-discovery的NacosServerList类继承了AbstractServerList类,而且实现了Ribbon的ServerList接口的两个方法,如下所示:
  1. public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
  2.     ...
  3.     ...
  4. }
  5. public class NacosServerList extends AbstractServerList<NacosServer> {
  6.     private NacosDiscoveryProperties discoveryProperties;
  7.     private String serviceId;
  8.     public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
  9.         this.discoveryProperties = discoveryProperties;
  10.     }
  11.     @Override
  12.     public List<NacosServer> getInitialListOfServers() {
  13.         return getServers();
  14.     }
  15.     @Override
  16.     public List<NacosServer> getUpdatedListOfServers() {
  17.         return getServers();
  18.     }
  19.     private List<NacosServer> getServers() {
  20.         try {
  21.             //读取分组
  22.             String group = discoveryProperties.getGroup();
  23.             //通过服务名称、分组、true(表示只需要健康实例),
  24.             //调用NacosNamingService.selectInstances()方法来查询服务实例列表
  25.             List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
  26.             //把Instance转换成NacosServer类型
  27.             return instancesToServerList(instances);
  28.         } catch (Exception e) {
  29.             throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
  30.         }
  31.     }
  32.     private List<NacosServer> instancesToServerList(List<Instance> instances) {
  33.         List<NacosServer> result = new ArrayList<>();
  34.         if (CollectionUtils.isEmpty(instances)) {
  35.             return result;
  36.         }
  37.         for (Instance instance : instances) {
  38.             result.add(new NacosServer(instance));
  39.         }
  40.         return result;
  41.     }
  42.     public String getServiceId() {
  43.         return serviceId;
  44.     }
  45.     @Override
  46.     public void initWithNiwsConfig(IClientConfig iClientConfig) {
  47.         this.serviceId = iClientConfig.getClientName();
  48.     }
  49. }
复制代码
NacosServerList的核心方法是getServers(),因为nacos-discovery实现Ribbon的两个接口都调用到了该方法。
 
在nacos-discovery的NacosServerList的getServers()方法中,会调用nacos-client的NacosNamingService的selectInstances()方法,来获取服务实例列表。
 
三.nacos-client如何进行服务发现
在nacos-client的NacosNamingService的selectInstances()方法中,首先会调用ServiceInfoHolder的getServiceInfo()方法从本地缓存获取数据。ServiceInfoHolder的serviceInfoMap中的value是一个ServiceInfo对象,在ServiceInfo对象中会有一个Listhosts属性来存放实例数据。
 
如果ServiceInfoHolder中的本地缓存没有对应的ServiceInfo对象,那么就会调用NamingClientProxyDelegate的subscribe()方法。该方法首先会开启一个查询服务实例列表的延时执行的任务,然后通过Client对象发送订阅请求,去服务端实时获取服务实例数据。
 
具体来说就是先调用ServiceInfoUpdateService的scheduleUpdateIfAbsent()方法,开启一个延迟执行查询服务实例列表的UpdateTask任务,然后再次调用ServiceInfoHolder的getServiceInfoMap()方法查询本地缓存。如果本地缓存为空,则向服务端发起gRPC请求获取服务实例数据,也就是通过调用NamingGrpcClientProxy的subscribe()方法,触发调用NamingGrpcClientProxy的doSubscribe()方法,再触发调用NamingGrpcClientProxy的requestToServer()方法,接着调用RpcClient的request()方法发送gRPC请求给服务端,最后调用ServiceInfoHolder的processServiceInfo()方法更新本地缓存。
  1. public class NacosNamingService implements NamingService {
  2.     private ServiceInfoHolder serviceInfoHolder;
  3.     private NamingClientProxy clientProxy;
  4.     ...
  5.     private void init(Properties properties) throws NacosException {
  6.         ...
  7.         this.serviceInfoHolder = new ServiceInfoHolder(namespace, properties);
  8.         this.clientProxy = new NamingClientProxyDelegate(this.namespace, serviceInfoHolder, properties, changeNotifier);
  9.     }
  10.     @Override
  11.     public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
  12.         ServiceInfo serviceInfo;
  13.         String clusterString = StringUtils.join(clusters, ",");
  14.         //判断是否需要订阅,默认为true
  15.         if (subscribe) {
  16.             //查询Nacos本地缓存数据
  17.             serviceInfo = serviceInfoHolder.getServiceInfo(serviceName, groupName, clusterString);
  18.             if (null == serviceInfo) {
  19.                 //如果本地缓存数据为空,则调用Client代理NamingClientProxyDelegate的订阅方法subscribe(),通过Client对象请求服务端获取数据
  20.                 serviceInfo = clientProxy.subscribe(serviceName, groupName, clusterString);
  21.             }
  22.         } else {
  23.             serviceInfo = clientProxy.queryInstancesOfService(serviceName, groupName, clusterString, 0, false);
  24.         }
  25.         //返回数据
  26.         return selectInstances(serviceInfo, healthy);
  27.     }
  28.     ...
  29. }
  30. //Naming client service information holder.
  31. public class ServiceInfoHolder implements Closeable {
  32.     private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  33.     ...
  34.     public ServiceInfo getServiceInfo(final String serviceName, final String groupName, final String clusters) {
  35.         NAMING_LOGGER.debug("failover-mode: {}", failoverReactor.isFailoverSwitch());
  36.         //获取key
  37.         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  38.         String key = ServiceInfo.getKey(groupedServiceName, clusters);
  39.         if (failoverReactor.isFailoverSwitch()) {
  40.             return failoverReactor.getService(key);
  41.         }
  42.         //通过key从本地缓存中获取数据
  43.         return serviceInfoMap.get(key);
  44.     }
  45.     ...
  46. }
  47. public class NamingClientProxyDelegate implements NamingClientProxy {
  48.     private final ServiceInfoUpdateService serviceInfoUpdateService;
  49.     private final ServiceInfoHolder serviceInfoHolder;
  50.     private final NamingGrpcClientProxy grpcClientProxy;
  51.     ...
  52.     public NamingClientProxyDelegate(String namespace, ServiceInfoHolder serviceInfoHolder, Properties properties, InstancesChangeNotifier changeNotifier) throws NacosException {
  53.         ...
  54.         this.serviceInfoUpdateService = new ServiceInfoUpdateService(properties, serviceInfoHolder, this, changeNotifier);
  55.         this.serviceInfoHolder = serviceInfoHolder;
  56.         this.grpcClientProxy = new NamingGrpcClientProxy(namespace, securityProxy, serverListManager, properties, serviceInfoHolder);
  57.     }
  58.     ...
  59.     @Override
  60.     public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
  61.         NAMING_LOGGER.info("[SUBSCRIBE-SERVICE] service:{}, group:{}, clusters:{} ", serviceName, groupName, clusters);
  62.         String serviceNameWithGroup = NamingUtils.getGroupedName(serviceName, groupName);
  63.         String serviceKey = ServiceInfo.getKey(serviceNameWithGroup, clusters);
  64.         //1.调用ServiceInfoUpdateService.scheduleUpdateIfAbsent()方法开启一个查询服务实例列表的定时任务
  65.         serviceInfoUpdateService.scheduleUpdateIfAbsent(serviceName, groupName, clusters);
  66.         //再次查询本地的缓存数据
  67.         ServiceInfo result = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  68.         if (null == result || !isSubscribed(serviceName, groupName, clusters)) {
  69.             //2.如果本地缓存还是为空,则使用gRPC来请求服务端,也就是调用NamingGrpcClientProxy.subscribe()方法
  70.             result = grpcClientProxy.subscribe(serviceName, groupName, clusters);
  71.         }
  72.         //更新本地缓存
  73.         serviceInfoHolder.processServiceInfo(result);
  74.         return result;
  75.     }
  76.     ...
  77. }
  78. public class NamingGrpcClientProxy extends AbstractNamingClientProxy {
  79.     ...
  80.     private final RpcClient rpcClient;
  81.     private final NamingGrpcRedoService redoService;
  82.     public NamingGrpcClientProxy(String namespaceId, SecurityProxy securityProxy, ServerListFactory serverListFactory, Properties properties, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  83.         ...
  84.         //通过RpcClientFactory.createClient()方法创建一个GrpcSdkClient对象实例,然后赋值给rpcClient属性
  85.         this.rpcClient = RpcClientFactory.createClient(uuid, ConnectionType.GRPC, labels);
  86.         this.redoService = new NamingGrpcRedoService(this);
  87.         start(serverListFactory, serviceInfoHolder);
  88.     }
  89.     private void start(ServerListFactory serverListFactory, ServiceInfoHolder serviceInfoHolder) throws NacosException {
  90.         rpcClient.serverListFactory(serverListFactory);
  91.         rpcClient.registerConnectionListener(redoService);
  92.         rpcClient.registerServerRequestHandler(new NamingPushRequestHandler(serviceInfoHolder));
  93.         rpcClient.start();
  94.         NotifyCenter.registerSubscriber(this);
  95.     }
  96.     ...
  97.     @Override
  98.     public ServiceInfo subscribe(String serviceName, String groupName, String clusters) throws NacosException {
  99.         if (NAMING_LOGGER.isDebugEnabled()) {
  100.             NAMING_LOGGER.debug("[GRPC-SUBSCRIBE] service:{}, group:{}, cluster:{} ", serviceName, groupName, clusters);
  101.         }
  102.         redoService.cacheSubscriberForRedo(serviceName, groupName, clusters);
  103.         //执行订阅
  104.         return doSubscribe(serviceName, groupName, clusters);
  105.     }
  106.     //Execute subscribe operation.
  107.     public ServiceInfo doSubscribe(String serviceName, String groupName, String clusters) throws NacosException {
  108.         //构建订阅服务请求——SubscribeServiceRequest对象
  109.         SubscribeServiceRequest request = new SubscribeServiceRequest(namespaceId, groupName, serviceName, clusters, true);
  110.         //向服务端发送SubscribeServiceRequest类型的请求
  111.         SubscribeServiceResponse response = requestToServer(request, SubscribeServiceResponse.class);
  112.         redoService.subscriberRegistered(serviceName, groupName, clusters);
  113.         return response.getServiceInfo();
  114.     }
  115.     private <T extends Response> T requestToServer(AbstractNamingRequest request, Class<T> responseClass) throws NacosException {
  116.         try {
  117.             request.putAllHeader(getSecurityHeaders(request.getNamespace(), request.getGroupName(), request.getServiceName()));
  118.             //实际会调用RpcClient.request()方法发起gRPC请求
  119.             Response response = requestTimeout < 0 ? rpcClient.request(request) : rpcClient.request(request, requestTimeout);
  120.             if (ResponseCode.SUCCESS.getCode() != response.getResultCode()) {
  121.                 throw new NacosException(response.getErrorCode(), response.getMessage());
  122.             }
  123.             if (responseClass.isAssignableFrom(response.getClass())) {
  124.                 return (T) response;
  125.             }
  126.             NAMING_LOGGER.error("Server return unexpected response '{}', expected response should be '{}'", response.getClass().getName(), responseClass.getName());
  127.         } catch (Exception e) {
  128.             throw new NacosException(NacosException.SERVER_ERROR, "Request nacos server failed: ", e);
  129.         }
  130.         throw new NacosException(NacosException.SERVER_ERROR, "Server return invalid response");
  131.     }
  132.     ...
  133. }
复制代码
其中UpdateTask任务的run()方法会先调用NamingClientProxy.queryInstancesOfService()方法,然后调用ServiceInfoHolder的processServiceInfo()方法向服务端查询服务实例列表以及更新本地服务实例缓存。当该任务执行完毕时,会继续向调度线程池提交一个延迟6s执行的任务,从而实现不断更新本地缓存的服务实例列表。
 
在ServiceInfoHolder的processServiceInfo()方法更新本地服务实例缓存中,会判断服务实例是否发生改变。如果有改变,那么客户端会先发布一个服务实例改变事件InstancesChangeEvent,然后把新的服务实例数据写入本地磁盘。
  1. public class ServiceInfoUpdateService implements Closeable {
  2.     private final NamingClientProxy namingClientProxy;
  3.     private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
  4.     ...
  5.     //Schedule update if absent.
  6.     public void scheduleUpdateIfAbsent(String serviceName, String groupName, String clusters) {
  7.         //生成一个serverKey
  8.         String serviceKey = ServiceInfo.getKey(NamingUtils.getGroupedName(serviceName, groupName), clusters);
  9.         //判断当前serviceKey是否已经开启了对应的定时任务,如果已经开启就不开启了
  10.         if (futureMap.get(serviceKey) != null) {
  11.             return;
  12.         }
  13.         //加一把同步锁,避免并发冲突
  14.         synchronized (futureMap) {
  15.             //加锁之后再进行一次判断,双重检测Double Check
  16.             if (futureMap.get(serviceKey) != null) {
  17.                 return;
  18.             }
  19.             //添加UpdateTask任务到executor延迟执行
  20.             ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, groupName, clusters));
  21.             futureMap.put(serviceKey, future);
  22.         }
  23.     }
  24.     private synchronized ScheduledFuture<?> addTask(UpdateTask task) {
  25.         return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
  26.     }
  27.     ...
  28.     public class UpdateTask implements Runnable {
  29.         long lastRefTime = Long.MAX_VALUE;
  30.         private boolean isCancel;
  31.         private final String serviceName;
  32.         private final String groupName;
  33.         private final String clusters;
  34.         private final String groupedServiceName;
  35.         private final String serviceKey;
  36.         //the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
  37.         private int failCount = 0;
  38.         public UpdateTask(String serviceName, String groupName, String clusters) {
  39.             this.serviceName = serviceName;
  40.             this.groupName = groupName;
  41.             this.clusters = clusters;
  42.             this.groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  43.             this.serviceKey = ServiceInfo.getKey(groupedServiceName, clusters);
  44.         }
  45.         @Override
  46.         public void run() {
  47.             long delayTime = DEFAULT_DELAY;
  48.             try {
  49.                 if (!changeNotifier.isSubscribed(groupName, serviceName, clusters) && !futureMap.containsKey(serviceKey)) {
  50.                     NAMING_LOGGER.info("update task is stopped, service:{}, clusters:{}", groupedServiceName, clusters);
  51.                     isCancel = true;
  52.                     return;
  53.                 }
  54.                 //获取本地缓存
  55.                 ServiceInfo serviceObj = serviceInfoHolder.getServiceInfoMap().get(serviceKey);
  56.                 if (serviceObj == null) {
  57.                     //如果本地缓存为空,则通过gRPC去查询服务端的服务实例数据
  58.                     serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
  59.                     //更新本地缓存
  60.                     serviceInfoHolder.processServiceInfo(serviceObj);
  61.                     //更新UpdateTask的lastRefTime属性,即更新UpdateTask任务的数据获取时间
  62.                     lastRefTime = serviceObj.getLastRefTime();
  63.                     return;
  64.                 }
  65.                 //如果本地缓存不为空,则判断本地缓存最后一次刷新时间 是否小于等于 最后一次UpdateTask任务的数据获取时间
  66.                 if (serviceObj.getLastRefTime() <= lastRefTime) {
  67.                     //如果小于等于,则重新查询服务端的服务实例数据
  68.                     serviceObj = namingClientProxy.queryInstancesOfService(serviceName, groupName, clusters, 0, false);
  69.                     //更新本地缓存
  70.                     serviceInfoHolder.processServiceInfo(serviceObj);
  71.                 }
  72.                 lastRefTime = serviceObj.getLastRefTime();
  73.                 if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
  74.                     incFailCount();
  75.                     return;
  76.                 }
  77.                 //TODO multiple time can be configured.
  78.                 //计算下一次定时任务执行的时间,这个时间默认是6s
  79.                 delayTime = serviceObj.getCacheMillis() * DEFAULT_UPDATE_CACHE_TIME_MULTIPLE;
  80.                 //请求成功后,重置错误次数为0
  81.                 resetFailCount();
  82.             } catch (Throwable e) {
  83.                 //记录请求失败的次数
  84.                 incFailCount();
  85.                 NAMING_LOGGER.warn("[NA] failed to update serviceName: {}", groupedServiceName, e);
  86.             } finally {
  87.                 if (!isCancel) {
  88.                     //根据请求失败的次数,动态调整重新提交的UpdateTask定时任务的执行时间
  89.                     executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
  90.                 }
  91.             }
  92.         }
  93.         private void incFailCount() {
  94.             int limit = 6;
  95.             if (failCount == limit) {
  96.                 return;
  97.             }
  98.             failCount++;
  99.         }
  100.         private void resetFailCount() {
  101.             failCount = 0;
  102.         }
  103.     }
  104.     ...
  105. }
  106. //Naming client service information holder.
  107. public class ServiceInfoHolder implements Closeable {
  108.     private final ConcurrentMap<String, ServiceInfo> serviceInfoMap;
  109.     ...
  110.     //Process service info.
  111.     public ServiceInfo processServiceInfo(ServiceInfo serviceInfo) {
  112.         String serviceKey = serviceInfo.getKey();
  113.         if (serviceKey == null) {
  114.             return null;
  115.         }
  116.         //获取本地缓存中的服务实例
  117.         ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
  118.         if (isEmptyOrErrorPush(serviceInfo)) {
  119.             return oldService;
  120.         }
  121.         //更新本地缓存中的服务实例
  122.         serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
  123.         //判断服务实例是否有改变
  124.         boolean changed = isChangedServiceInfo(oldService, serviceInfo);
  125.         if (StringUtils.isBlank(serviceInfo.getJsonFromServer())) {
  126.             serviceInfo.setJsonFromServer(JacksonUtils.toJson(serviceInfo));
  127.         }
  128.         MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
  129.         if (changed) {
  130.             NAMING_LOGGER.info("current ips:({}) service: {} -> {}", serviceInfo.ipCount(), serviceInfo.getKey(), JacksonUtils.toJson(serviceInfo.getHosts()));
  131.             //发布服务实例改变事件
  132.             NotifyCenter.publishEvent(new InstancesChangeEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), serviceInfo.getHosts()));
  133.             //将服务实例信息写入本地磁盘
  134.             DiskCache.write(serviceInfo, cacheDir);
  135.         }
  136.         return serviceInfo;
  137.     }
  138.     ...
  139. }
复制代码
ServiceStorage的getData()方法在读取缓存时,获取要查询的Service服务对象下的全部Instance实例会分三步:一是从注册表中获取要查询的Service对象下的全部clientId,二是根据clientId获取对应的Client对象,三是根据Client对象获取对应的Instance信息。
  1. //Handler to handle subscribe service.
  2. @Component
  3. public class SubscribeServiceRequestHandler extends RequestHandler<SubscribeServiceRequest, SubscribeServiceResponse> {
  4.     private final ServiceStorage serviceStorage;
  5.     private final EphemeralClientOperationServiceImpl clientOperationService;
  6.     ...
  7.     //假设order-service需要调用stock-service的接口,那么order-service(Nacos客户端)就要向服务端订阅stock-service服务
  8.     //也就是order-service需要从服务端获取到(查询出)stock-service的所有服务实例
  9.     @Override
  10.     @Secured(action = ActionTypes.READ)
  11.     public SubscribeServiceResponse handle(SubscribeServiceRequest request, RequestMeta meta) throws NacosException {
  12.         String namespaceId = request.getNamespace();
  13.         String serviceName = request.getServiceName();
  14.         String groupName = request.getGroupName();
  15.         String app = request.getHeader("app", "unknown");
  16.         String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
  17.         //构建要查询的Service服务对象,对应的是stock-serivce
  18.         Service service = Service.newService(namespaceId, groupName, serviceName, true);
  19.         //构建要订阅Service服务的订阅者,对应的是order-service
  20.         Subscriber subscriber = new Subscriber(meta.getClientIp(), meta.getClientVersion(), app, meta.getClientIp(), namespaceId, groupedServiceName, 0, request.getClusters());
  21.         //1.调用ServiceStorage.getData()方法读取缓存
  22.         ServiceInfo serviceInfo = ServiceUtil.selectInstancesWithHealthyProtection(serviceStorage.getData(service), metadataManager.getServiceMetadata(service).orElse(null), subscriber);
  23.         if (request.isSubscribe()) {
  24.             //2.添加订阅者,如果订阅的服务有变动,则需要通知订阅者
  25.             clientOperationService.subscribeService(service, subscriber, meta.getConnectionId());
  26.         } else {
  27.             clientOperationService.unsubscribeService(service, subscriber, meta.getConnectionId());
  28.         }
  29.         return new SubscribeServiceResponse(ResponseCode.SUCCESS.getCode(), "success", serviceInfo);
  30.     }
  31. }
复制代码
二.添加订阅者即Subscriber对象
调用的方法是EphemeralClientOperationServiceImpl.subscribeService(),添加订阅者其实就是先根据clientId找出对应的客户端Client对象,然后往AbstractClient.subscribers属性放入服务对象和对应的订阅者对象,最后再发布一个客户端订阅服务事件ClientSubscribeServiceEvent。
 
这个事件会被ClientServiceIndexesManager的onEvent()方法处理,即调用ClientServiceIndexesManager的addSubscriberIndexes()方法,该方法会继续发布一个服务订阅事件ServiceSubscribedEvent。
  1. -> serviceDataIndexes.get()
  2. -> ServiceStorage.getPushData()方法
  3. -> ServiceStorage.emptyServiceInfo()方法
  4. -> ServiceStorage.getAllInstancesFromIndex()方法
  5. -> ClientServiceIndexesManager.getAllClientsRegisteredService()方法
  6. -> ServiceStorage.getInstanceInfo()方法根据clientId获取Instance对象
  7. -> EphemeralIpPortClientManager.getClient()方法
  8. -> AbstractClient.getInstancePublishInfo()方法
  9. -> ServiceStorage.parseInstance()方法
  10. -> serviceDataIndexes.put() + serviceClusterIndex.put()
复制代码
(3)总结
6.png
 

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册