Nacos Source Code 2: Analysis of Nacos Service Registration and Discovery (Part 2)
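Outline
5. Service discovery: analysis of the call request chain between services
6. How the server maintains unhealthy microservice instances
7. Processing involved when a service goes offline
8. Summary of service registration and discovery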
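5. Service discovery: analysis of the call request chain between services
(1) The request flow of a service call made through Nacos
(2) Source code of service discovery on the Nacos client
(3) Source code of service queries on the Nacos server
(4) Summary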
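(1) The request flow of a service call made through Nacos
Following the example in the Nacos usage introduction: after the order service and the stock service have registered with Nacos, they call each other through Feign, as shown in the figure below:
Step 1: Each client keeps a local cache of microservice instances. This cache is periodically refreshed with the latest list fetched from the registry.
Step 2: When order-service needs to call stock-service, order-service first looks up the instances for that service name in its local cache. One service name may map to several instances, so load balancing is needed to pick one of them.
Step 3: Finally, the service name is replaced with the chosen IP + Port, and Feign issues the HTTP call and returns the result.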
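The three steps can be sketched in Java as below. This is a minimal, hypothetical model. LocalServiceCache, refresh(), choose() and resolve() are illustrative names, not Nacos, Ribbon or Feign APIs. Round-robin stands in for whatever load-balancing rule is actually configured.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: service name -> "ip:port" addresses kept in a local cache,
// a round-robin pick, and rewriting a service-name URL into an IP + Port URL.
class LocalServiceCache {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    // Step 1: a periodic refresh replaces the cached list with the latest one.
    void refresh(String serviceName, List<String> addresses) {
        cache.put(serviceName, List.copyOf(addresses));
    }

    // Step 2: look up the candidates by service name and pick one of them.
    String choose(String serviceName) {
        List<String> candidates = cache.get(serviceName);
        if (candidates == null || candidates.isEmpty()) {
            throw new IllegalStateException("no instance for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), candidates.size());
        return candidates.get(index);
    }

    // Step 3: replace the service name in a URL of the form http://<serviceName>/<path>.
    String resolve(String url) {
        String rest = url.substring("http://".length());
        int slash = rest.indexOf('/');
        String serviceName = slash < 0 ? rest : rest.substring(0, slash);
        String path = slash < 0 ? "" : rest.substring(slash);
        return "http://" + choose(serviceName) + path;
    }
}
```

Calling resolve("http://stock-service/stock/deduct") twice against two cached addresses yields each address in turn.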
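(2) Source code of service discovery on the Nacos client
I. nacos-discovery brings in Ribbon to load-balance service calls
II. How nacos-discovery integrates Ribbon to load-balance service calls
III. How nacos-client performs service discovery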
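I. nacos-discovery brings in Ribbon to load-balance service calls
A Nacos client is a project that depends on nacos-discovery and nacos-client. nacos-discovery integrates Ribbon, so Ribbon can obtain service instances through the Nacos server's instance-list query interface. The Nacos client therefore relies on Ribbon for load balancing when it calls a service: Ribbon selects one instance from the service instance list for the client to call.
The pom.xml of nacos-discovery shows that it declares a Ribbon dependency: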
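II. How nacos-discovery integrates Ribbon to load-balance service calls
Ribbon defines a ServerList interface, shown below. It is an extension point whose job is to obtain the list of servers. nacos-discovery integrates with Ribbon by implementing this interface. Judging from the package name, com.netflix.loadbalancer belongs to the Ribbon source, and the LoadBalancer in it is Ribbon's load balancer. The load balancer combines with an IRule load-balancing strategy to select one instance from the service instance list.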
package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods used to obtain the List of Servers
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}
When the Nacos client calls a microservice, Ribbon picks the instance to call. It obtains the instance list by calling NacosServerList's getUpdatedListOfServers() method, and its load balancer then selects one instance from that list.
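The split between "who supplies the list" and "who picks from it" can be illustrated with a hypothetical, stripped-down version of the pattern. SimpleServerList, SimpleRule, RoundRobinSimpleRule and SimpleLoadBalancer are illustrative names, not Ribbon classes. Ribbon's ServerList comment mentions polling roughly every 30 seconds, but this sketch simply fetches the list on every call.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the ServerList role (NacosServerList fills it in nacos-discovery).
interface SimpleServerList<T> {
    List<T> getUpdatedListOfServers();
}

// Hypothetical stand-in for the IRule role: choose one server from a list.
interface SimpleRule<T> {
    T choose(List<T> servers);
}

class RoundRobinSimpleRule<T> implements SimpleRule<T> {
    private final AtomicInteger next = new AtomicInteger();

    @Override
    public T choose(List<T> servers) {
        if (servers.isEmpty()) {
            return null;
        }
        return servers.get(Math.floorMod(next.getAndIncrement(), servers.size()));
    }
}

class SimpleLoadBalancer<T> {
    private final SimpleServerList<T> serverList;
    private final SimpleRule<T> rule;

    SimpleLoadBalancer(SimpleServerList<T> serverList, SimpleRule<T> rule) {
        this.serverList = serverList;
        this.rule = rule;
    }

    // Fetch the current list from the pluggable source, then let the rule pick one entry.
    T chooseServer() {
        return rule.choose(serverList.getUpdatedListOfServers());
    }
}
```

Because the list source is an interface, swapping in a Nacos-backed implementation changes where servers come from without touching the selection rule. That is the kind of extension point nacos-discovery uses.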
NacosServerList in nacos-discovery extends AbstractServerList and implements the two methods of Ribbon's ServerList interface, as shown below:
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ...
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            //Read the group
            String group = discoveryProperties.getGroup();
            //Query the instance list via NacosNamingService.selectInstances(),
            //passing the service name, the group and true (healthy instances only)
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //Convert each Instance into a NacosServer
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(instances)) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }
        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}
The core of NacosServerList is its getServers() method, because both Ribbon interface methods that nacos-discovery implements delegate to it.
In its getServers() method, NacosServerList calls the selectInstances() method of nacos-client's NacosNamingService to obtain the service instance list.
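The filtering and conversion around selectInstances() can be sketched as below. SimpleInstance and DiscoverySketch are hypothetical. Two details are assumptions about nacos-client, not code quoted in this article: the "@@" separator in the grouped name, and the healthy/enabled/weight filter.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for com.alibaba.nacos.api.naming.pojo.Instance.
final class SimpleInstance {
    final String ip;
    final int port;
    final boolean healthy;
    final boolean enabled;
    final double weight;

    SimpleInstance(String ip, int port, boolean healthy, boolean enabled, double weight) {
        this.ip = ip;
        this.port = port;
        this.healthy = healthy;
        this.enabled = enabled;
        this.weight = weight;
    }
}

final class DiscoverySketch {
    // Assumption: group and service name are joined with "@@" to form the lookup key.
    static String groupedName(String serviceName, String groupName) {
        return groupName + "@@" + serviceName;
    }

    // Keep instances whose health matches the request and that can take traffic,
    // then convert them to "ip:port" strings (standing in for NacosServer).
    static List<String> toServers(List<SimpleInstance> instances, boolean healthy) {
        List<String> result = new ArrayList<>();
        if (instances == null) {
            return result;
        }
        for (SimpleInstance instance : instances) {
            if (instance.healthy == healthy && instance.enabled && instance.weight > 0) {
                result.add(instance.ip + ":" + instance.port);
            }
        }
        return result;
    }
}
```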
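III. How nacos-client performs service discovery
Inside the selectInstances() method of nacos-client's NacosNamingService, the call chain is as follows:
Step 1: Call HostReactor.getServiceInfo() to obtain the service instance list.
Step 2: getServiceInfo() first calls HostReactor.getServiceInfo0() to try the local cache.
Step 3: On a cache miss, it calls HostReactor.updateServiceNow(), which in turn calls HostReactor.updateService() to query the server and update the cache. updateService() first calls NamingProxy.queryList() to query the server's instance list, then calls HostReactor.processServiceJson() to update the local cache.
Step 4: Finally, it calls HostReactor.scheduleUpdateIfAbsent() to submit the cache synchronization task.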
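HostReactor.getServiceInfo() in nacos-client is therefore the core of service discovery. It first looks up the service instance list in the local cache. If the cache has no data for the service, it queries the server. Once the list is obtained, it submits a delayed task to the scheduled thread pool, at most once per service and cluster key. That task runs UpdateTask.run().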
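A cache-first lookup with "schedule once per key" can be sketched as follows. CachingResolver and its members are hypothetical names. A Function<String, List<String>> stands in for NamingProxy.queryList() plus processServiceJson(). Unlike the real UpdateTask, the scheduled refresh here runs only once.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical, simplified model of HostReactor.getServiceInfo().
class CachingResolver {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> futures = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final Function<String, List<String>> serverQuery; // stands in for the server query
    private final long delayMillis;

    CachingResolver(Function<String, List<String>> serverQuery, long delayMillis) {
        this.serverQuery = serverQuery;
        this.delayMillis = delayMillis;
    }

    List<String> getServiceInfo(String key) {
        // Cache miss: query the server synchronously and store the result
        // (the role played by getServiceInfo0() + updateServiceNow()).
        List<String> instances = cache.computeIfAbsent(key, serverQuery);
        // Like scheduleUpdateIfAbsent(): only the first caller for a key schedules a task.
        futures.computeIfAbsent(key,
                k -> executor.schedule(() -> refresh(k), delayMillis, TimeUnit.MILLISECONDS));
        return instances;
    }

    private void refresh(String key) {
        // The real UpdateTask reschedules itself after this; the sketch refreshes once.
        cache.put(key, serverQuery.apply(key));
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Repeated lookups for the same key hit the server only once, which is the point of keeping the local cache.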
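UpdateTask.run() calls updateService() to query the service instance list and refresh the local cache. When it finishes, it submits itself to the scheduled thread pool again as a new delayed task, so the instance list in the local cache keeps being refreshed. The next delay is the cacheMillis of the latest data, or DEFAULT_DELAY when the update fails or returns no hosts. That value is shifted left by the consecutive failure count and capped at DEFAULT_DELAY * 60.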
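The retry delay from the finally block of UpdateTask.run() can be isolated as below. UpdateBackoff is a hypothetical helper. The defaultDelay parameter plays the role of DEFAULT_DELAY, whose actual value is not shown in the excerpt. The failure limit of 6 mirrors incFailCount().

```java
// Hypothetical helper isolating the UpdateTask retry-delay formula.
final class UpdateBackoff {
    static final int FAIL_LIMIT = 6; // same limit as UpdateTask.incFailCount()

    private int failCount = 0;

    void onFailure() {
        if (failCount < FAIL_LIMIT) {
            failCount++;
        }
    }

    void onSuccess() {
        failCount = 0;
    }

    // Same expression as UpdateTask: double the delay per consecutive failure,
    // never exceeding defaultDelay * 60.
    long nextDelay(long delayMillis, long defaultDelay) {
        return Math.min(delayMillis << failCount, defaultDelay * 60);
    }
}
```

With an assumed base of 1000 ms, three consecutive failures give 8000 ms. From six failures onward the cap of 60000 ms applies, and one success resets the delay to the base.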
public class NacosNamingService implements NamingService {
    private HostReactor hostReactor;
    ...
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        //subscribe is true by default (the overload above passes true)
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    ...
}
public class HostReactor implements Closeable {
    //Local cache of service instance lists
    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private final NamingProxy serverProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    private final ScheduledExecutorService executor;
    ...
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //First look up the service instance list in the local cache
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //If the local cache has no entry for this service
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            //Call the Nacos server's instance-list query API and update the Service data immediately
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                //hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error(" serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //Start the scheduled task that keeps the local cache up to date
        scheduleUpdateIfAbsent(serviceName, clusters);
        //Finally return the service instance list from the local cache
        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        //Read the service instance list from the local cache
        return serviceInfoMap.get(key);
    }

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error(" failed to update serviceName: " + serviceName, e);
        }
    }

    //Update service now.
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //Call the Nacos server's instance query API
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            //If the result is not empty, update the local cache
            if (StringUtils.isNotEmpty(result)) {
                //Update the local cache
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    ...
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //Submit a delayed task to the scheduled thread pool
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        //Submit a delayed task to the scheduled thread pool
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //If the local cache has no entry
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
                //lastRefTime starts at Long.MAX_VALUE, so the first run always queries the server
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn(" failed to update serviceName: " + serviceName, e);
            } finally {
                //Submit another delayed task to the scheduled thread pool to keep syncing the local cache
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}
public class NamingProxy implements Closeable {
    ...
    //Send an HTTP request to the Nacos server to query the service instance list
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //Request the "/nacos/v1/ns/instance/list" API over HTTP
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    ...
}
II. Server-side source code for handling heartbeat requests
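The beat() method of the server's InstanceController handles heartbeat requests sent by clients. It first tries to get the corresponding Instance object from ServiceManager's registry. If the Instance cannot be found in the in-memory registry, it calls ServiceManager.registerInstance() directly to register it again. This happens only when the request carries beat details; otherwise beat() returns RESOURCE_NOT_FOUND.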
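The "re-register on an unknown beat" branch can be modeled with a hypothetical in-memory registry. BeatCompensation, register(), contains() and handleBeat() are illustrative names and not ServiceManager's API. The hasBeatBody flag stands in for whether the request carried a beat payload.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry: serviceName -> ("ip:port" -> cluster name).
final class BeatCompensation {
    private final Map<String, Map<String, String>> registry = new ConcurrentHashMap<>();

    void register(String serviceName, String ip, int port, String cluster) {
        registry.computeIfAbsent(serviceName, s -> new ConcurrentHashMap<>())
                .put(ip + ":" + port, cluster);
    }

    boolean contains(String serviceName, String ip, int port) {
        Map<String, String> instances = registry.get(serviceName);
        return instances != null && instances.containsKey(ip + ":" + port);
    }

    // Mirrors the branch in beat(): an unknown instance is registered again if the beat
    // carries its details; without details the result corresponds to RESOURCE_NOT_FOUND.
    boolean handleBeat(String serviceName, String ip, int port, String cluster, boolean hasBeatBody) {
        if (!contains(serviceName, ip, port)) {
            if (!hasBeatBody) {
                return false;
            }
            register(serviceName, ip, port, cluster);
        }
        return true; // the real code would now hand the beat to processClientBeat()
    }
}
```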
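Whether the Instance was found or has just been re-registered, beat() then takes the corresponding Service object from ServiceManager's registry. Later changes to the Instances in that Service's Clusters therefore modify the registry data itself. Next it calls Service.processClientBeat(), which submits an asynchronous ClientBeatProcessor task to a thread pool whose thread count is half the number of available processors.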
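Submitting a beat task for immediate execution on a scheduled pool can be sketched with the JDK's ScheduledExecutorService. HealthPoolSketch is a hypothetical class. The pool-size formula (half the available processors, at least 1) is an assumption, because the GlobalExecutor line is cut off in the excerpt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of HealthCheckReactor.scheduleNow() on a shared health pool.
final class HealthPoolSketch {
    // Assumption: half the processors, with a floor of one thread.
    static int healthThreadCount(int processors) {
        return Math.max(1, processors / 2);
    }

    private final ScheduledExecutorService pool =
            Executors.newScheduledThreadPool(healthThreadCount(Runtime.getRuntime().availableProcessors()));

    // A delay of 0 ms runs the task as soon as a pool thread is free.
    ScheduledFuture<?> scheduleNow(Runnable task) {
        return pool.schedule(task, 0, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        pool.shutdown();
    }
}
```

Running the beat processing asynchronously lets beat() return to the client right away instead of waiting for the registry update.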
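ClientBeatProcessor.run() first finds all ephemeral instances of the cluster by cluster name. It then loops over them, comparing IP + Port, to find the matching Instance. Once found, it sets the Instance's lastBeat to the current time. If the Instance is currently marked unhealthy, it marks it healthy again.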
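The matching-and-update loop can be sketched as below. EphemeralInstance and BeatProcessorSketch are hypothetical stand-ins, not Nacos classes. The sketch covers only the steps described above: match on IP + Port, refresh lastBeat, and restore the healthy flag.

```java
import java.util.List;

// Hypothetical stand-in for an ephemeral Instance in a cluster.
final class EphemeralInstance {
    final String ip;
    final int port;
    volatile long lastBeat;
    volatile boolean healthy;

    EphemeralInstance(String ip, int port, long lastBeat, boolean healthy) {
        this.ip = ip;
        this.port = port;
        this.lastBeat = lastBeat;
        this.healthy = healthy;
    }
}

final class BeatProcessorSketch {
    // Scan the cluster's ephemeral instances, match on IP + Port, refresh lastBeat,
    // and mark an unhealthy instance healthy again. Returns false if nothing matched.
    static boolean process(List<EphemeralInstance> clusterInstances, String ip, int port, long now) {
        for (EphemeralInstance instance : clusterInstances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.healthy) {
                    instance.healthy = true;
                }
                return true;
            }
        }
        return false;
    }
}
```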
//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...
    //Create a beat for instance.
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());
        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        //Read the request parameters, namespaceId and serviceName
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug(" full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
        //Look up the Instance in ServiceManager's in-memory registry by namespace, service name, cluster, IP and port
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
        //If no Instance is found, register it again through ServiceManager.registerInstance()
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            Loggers.SRV_LOG.warn(" The instance has been removed for health mechanism, "
                    + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            //Register the service instance again
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }
        //Get the Service from ServiceManager's in-memory registry; later changes to the Instances in its Clusters modify the registry directly
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        //Submit the heartbeat task for this client instance, which updates its lastBeat
        service.processClientBeat(clientBeat);
        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener {
    ...
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //Execute immediately
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    ...
}

//Health check reactor.
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class HealthCheckReactor {
    ...
    //Schedule client beat check task without a delay.
    public static ScheduledFuture scheduleNow(Runnable task) {
        //Submit the task to the thread pool for immediate execution
        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors()