Nacos Source Code 2: Analysis of Nacos Service Registration and Discovery (Part 2)

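Outline

5. Service discovery: the request path of calls between services

6. How the server maintains unhealthy microservice instances

7. What happens when a service goes offline

8. Summary of service registration and discovery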

 

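5. Service discovery: the request path of calls between services

(1) How a microservice completes a service call through Nacos

(2) Source code of service discovery on the Nacos client

(3) Source code of service instance queries on the Nacos server

(4) Summary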

 

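(1) How a microservice completes a service call through Nacos

Following the example in the Nacos usage introduction: once the order service and the stock service have registered with Nacos, they call each other through Feign. The flow is as follows:

Step 1: Every client keeps a local cache of the microservice list. The cache is refreshed periodically with the latest list from the registry.

Step 2: When order-service needs to call stock-service, it first looks up the instances of that service name in the local cache. Because one service name may map to several instances, a load balancer picks one of them.

Step 3: Finally, the service name is replaced with IP + Port, and Feign issues the HTTP call and returns the result.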
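The three steps above can be sketched in a few lines of Java. This is a simplified illustration, not the Feign or Ribbon implementation: the class LocalCallSketch, the round-robin strategy, and the sample addresses are all assumptions made for the example.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: local cache lookup, load balancing, and URL rewriting.
final class LocalCallSketch {
    // Step 1: local cache of service name -> "ip:port" list (refreshed elsewhere).
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger();

    void refresh(String serviceName, List<String> instances) {
        cache.put(serviceName, instances);
    }

    // Step 2: pick one instance by simple round robin (an assumed strategy).
    String choose(String serviceName) {
        List<String> instances = cache.get(serviceName);
        if (instances == null || instances.isEmpty()) {
            throw new IllegalStateException("no instance for " + serviceName);
        }
        int index = Math.floorMod(counter.getAndIncrement(), instances.size());
        return instances.get(index);
    }

    // Step 3: replace the service name in the URL with the chosen ip:port.
    String resolve(String url, String serviceName) {
        return url.replace("//" + serviceName, "//" + choose(serviceName));
    }
}
```

Calling resolve("http://stock-service/stock/deduct", "stock-service") twice on a cache with two instances yields one URL per instance, which is the effect the load balancer has on consecutive Feign calls.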

 

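(2) Source code of service discovery on the Nacos client

I. nacos-discovery brings in Ribbon for load balancing of service calls

II. How nacos-discovery integrates Ribbon for load balancing of service calls

III. How nacos-client performs service discovery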

 

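I. nacos-discovery brings in Ribbon for load balancing of service calls

A Nacos client is a project that depends on nacos-discovery + nacos-client. Because nacos-discovery integrates Ribbon, Ribbon can call the Nacos server's interface for querying the service instance list. The Nacos client therefore relies on Ribbon for load balancing: Ribbon selects one instance from the service instance list, and the client sends its call to that instance.

The pom.xml of nacos-discovery shows that it declares the Ribbon dependency.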

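II. How nacos-discovery integrates Ribbon for load balancing of service calls

Ribbon defines a ServerList interface, shown below. It is an extension point whose job is to supply the list of servers, and nacos-discovery integrates with Ribbon by implementing it. As the package name shows, com.netflix.loadbalancer belongs to the Ribbon source tree, and LoadBalancer is Ribbon's load balancer. The load balancer applies an IRule load-balancing strategy to select one instance from the service instance list.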

package com.netflix.loadbalancer;

import java.util.List;

//Interface that defines the methods used to obtain the List of Servers
public interface ServerList<T extends Server> {
    public List<T> getInitialListOfServers();

    //Return updated list of servers. This is called say every 30 secs
    public List<T> getUpdatedListOfServers();
}

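When a Nacos client makes a microservice call, Ribbon chooses the target instance. Ribbon calls getUpdatedListOfServers() of NacosServerList to obtain the candidate instances, and its load balancer then selects one of them.

NacosServerList in nacos-discovery extends AbstractServerList and thereby implements both methods of Ribbon's ServerList interface, as shown below: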

public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
    ...
    ...
}

public class NacosServerList extends AbstractServerList<NacosServer> {
    private NacosDiscoveryProperties discoveryProperties;
    private String serviceId;

    public NacosServerList(NacosDiscoveryProperties discoveryProperties) {
        this.discoveryProperties = discoveryProperties;
    }

    @Override
    public List<NacosServer> getInitialListOfServers() {
        return getServers();
    }

    @Override
    public List<NacosServer> getUpdatedListOfServers() {
        return getServers();
    }

    private List<NacosServer> getServers() {
        try {
            //Read the group
            String group = discoveryProperties.getGroup();
            //Pass the service name, the group, and true (healthy instances only)
            //to NacosNamingService.selectInstances() to query the service instance list
            List<Instance> instances = discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
            //Convert each Instance to a NacosServer
            return instancesToServerList(instances);
        } catch (Exception e) {
            throw new IllegalStateException("Can not get service instances from nacos, serviceId=" + serviceId, e);
        }
    }

    private List<NacosServer> instancesToServerList(List<Instance> instances) {
        List<NacosServer> result = new ArrayList<>();
        if (CollectionUtils.isEmpty(instances)) {
            return result;
        }
        for (Instance instance : instances) {
            result.add(new NacosServer(instance));
        }
        return result;
    }

    public String getServiceId() {
        return serviceId;
    }

    @Override
    public void initWithNiwsConfig(IClientConfig iClientConfig) {
        this.serviceId = iClientConfig.getClientName();
    }
}

The core of NacosServerList is its getServers() method, because both ServerList methods implemented by nacos-discovery delegate to it.

Inside getServers(), NacosServerList calls selectInstances() of NacosNamingService in nacos-client to obtain the service instance list.

 

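III. How nacos-client performs service discovery

The selectInstances() method of NacosNamingService in nacos-client works as follows. It first calls getServiceInfo() of HostReactor to obtain the service instance list. getServiceInfo() calls getServiceInfo0() to try the local cache. On a cache miss it calls updateServiceNow(), which delegates to updateService() to query the server and update the cache: updateService() calls queryList() of NamingProxy to fetch the instance list from the server and then processServiceJson() of HostReactor to update the local cache. Finally, getServiceInfo() calls scheduleUpdateIfAbsent() to submit a cache synchronization task.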

 

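getServiceInfo() of HostReactor is therefore the core of service discovery. It first looks for the service instance list in the local cache. If the cache has no data for the service, it queries the server. Once the list is available, it submits a delayed task to the scheduling thread pool, and that delayed task runs the run() method of UpdateTask.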
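The cache-first lookup described above can be reduced to the following sketch. It is not the HostReactor code: CacheFirstLookup, the remoteQuery function standing in for the server call, and the remoteCalls counter are hypothetical names introduced for illustration, and the sketch is single-threaded on purpose.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a cache-first lookup; not the HostReactor implementation.
final class CacheFirstLookup {
    private final Map<String, List<String>> localCache = new ConcurrentHashMap<>();
    // Stands in for the HTTP query to the Nacos server.
    private final Function<String, List<String>> remoteQuery;
    // Counts server queries so the caching effect is observable.
    int remoteCalls = 0;

    CacheFirstLookup(Function<String, List<String>> remoteQuery) {
        this.remoteQuery = remoteQuery;
    }

    List<String> getInstances(String serviceName) {
        List<String> cached = localCache.get(serviceName);
        if (cached == null) {
            // Cache miss: query the server once and store the result locally.
            remoteCalls++;
            cached = remoteQuery.apply(serviceName);
            localCache.put(serviceName, cached);
        }
        // A real client would also make sure a periodic refresh task exists here.
        return cached;
    }
}
```

Two consecutive calls for the same service name trigger only one remote query; the second call is served from the local map.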

 

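The run() method of UpdateTask calls updateService() to query the service instance list and update the local cache. When the task finishes, it submits another delayed task to the scheduling thread pool, so the locally cached service instance list is refreshed over and over.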

public class NacosNamingService implements NamingService {
    private HostReactor hostReactor;
    ...

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy) throws NacosException {
        return selectInstances(serviceName, groupName, healthy, true);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, boolean healthy, boolean subscribe) throws NacosException {
        return selectInstances(serviceName, groupName, new ArrayList<String>(), healthy, subscribe);
    }

    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        //This argument is true by default
        if (subscribe) {
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
    ...
}

public class HostReactor implements Closeable {
    //Local cache of service instance lists
    private final Map<String, ServiceInfo> serviceInfoMap;
    private final Map<String, Object> updatingMap;
    private final NamingProxy serverProxy;
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    private final ScheduledExecutorService executor;
    ...

    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        //First look up the service instance list in the local cache
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

        //If the local cache has no entry for this service
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            //Call the Nacos server's instance list query interface and update the Service data immediately
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            if (UPDATE_HOLD_INTERVAL > 0) {
                //hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //Start the scheduled task that maintains the local cache
        scheduleUpdateIfAbsent(serviceName, clusters);
        //Finally, return the service instance list from the local cache
        return serviceInfoMap.get(serviceObj.getKey());
    }

    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        String key = ServiceInfo.getKey(serviceName, clusters);
        //Read the service instance list from the local cache
        return serviceInfoMap.get(key);
    }

    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }

    //Update service now.
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            //Call the Nacos server's service instance query interface
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            //If the result is not empty, update the local cache
            if (StringUtils.isNotEmpty(result)) {
                //Update the local cache
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    ...

    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            //Submit a delayed task to the scheduling thread pool
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }

    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        //Submit a delayed task to the scheduling thread pool
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }

    public class UpdateTask implements Runnable {
        long lastRefTime = Long.MAX_VALUE;
        private final String clusters;    
        private final String serviceName;
        private int failCount = 0;

        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }

        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }

        private void resetFailCount() {
            failCount = 0;
        }

        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                //If the local cache is empty
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
                //lastRefTime starts as Long.MAX_VALUE
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    refreshOnly(serviceName, clusters);
                }
                lastRefTime = serviceObj.getLastRefTime();
                if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                //Submit another delayed task to the scheduling thread pool to keep synchronizing the local cache
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}

public class NamingProxy implements Closeable {
    ...
    //Send an HTTP request to the Nacos server to query the service instance list
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        //Request the "/nacos/v1/ns/instance/list" endpoint over HTTP
        return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
    }
    ...
}
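The delay that UpdateTask passes to executor.schedule() in its finally block grows with failCount. The arithmetic can be worked through as below. This is a minimal sketch that assumes DEFAULT_DELAY is 1000 ms, because the listing does not show the constant's value, and UpdateDelaySketch is a hypothetical class name.

```java
// Sketch of the retry delay computed in UpdateTask's finally block.
// DEFAULT_DELAY = 1000 ms is an assumption; the listing does not show its value.
final class UpdateDelaySketch {
    static final long DEFAULT_DELAY = 1000L;

    static long nextDelay(long delayTime, int failCount) {
        // Double the delay per consecutive failure, capped at 60 * DEFAULT_DELAY.
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    public static void main(String[] args) {
        // failCount never exceeds 6 because incFailCount() stops at that limit.
        for (int failCount = 0; failCount <= 6; failCount++) {
            System.out.println(failCount + " failures -> " + nextDelay(DEFAULT_DELAY, failCount) + " ms");
        }
    }
}
```

Under that assumption the delay doubles from 1000 ms up to 32000 ms at five failures, and the sixth failure hits the 60000 ms cap. A successful run resets failCount and uses the cacheMillis value of the service instead.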

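(3) Source code of service instance queries on the Nacos server

When the Nacos client asks the server for the service instance list, it calls the HTTP endpoint "/nacos/v1/ns/instance/list", so the request enters the Nacos server at the list() method of InstanceController.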
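As a rough illustration of that request, the sketch below assembles such a GET URL. The server address 127.0.0.1:8848, the literal parameter names, and the grouped service name are assumptions for the example; the real client builds and sends the request inside NamingProxy.reqApi().

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: assemble the GET URL for the instance list endpoint.
final class InstanceListUrlSketch {
    static String build(String serverAddr, Map<String, String> params) {
        StringBuilder url = new StringBuilder("http://" + serverAddr + "/nacos/v1/ns/instance/list");
        char separator = '?';
        for (Map.Entry<String, String> entry : params.entrySet()) {
            url.append(separator)
               .append(encode(entry.getKey()))
               .append('=')
               .append(encode(entry.getValue()));
            separator = '&';
        }
        return url.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        // Assumed sample values; a LinkedHashMap keeps the parameter order stable.
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@stock-service");
        params.put("healthyOnly", "false");
        System.out.println(build("127.0.0.1:8848", params));
    }
}
```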

@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...
    //Get all instance of input service.
    @GetMapping("/list")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
    public ObjectNode list(HttpServletRequest request) throws Exception {
        //Read the request parameters
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        String agent = WebUtils.getUserAgent(request);
        String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
        String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
        int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
        String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
        boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));

        String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
        String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
        boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));

        //Query the instances
        return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant, healthyOnly);
    }

    //Get service full information with instances.
    public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
        ...
        //Get the Service object from the in-memory registry of serviceManager
        Service service = serviceManager.getService(namespaceId, serviceName);
        ...
        //Get the service instance list from the Service object
        srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
        ...
    }
    ...
}

//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model, 
//in which service stores a list of clusters, which contain a list of instances.
//This class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
    ...

    //Get all instance from input clusters.
    public List<Instance> srvIPs(List<String> clusters) {
        if (CollectionUtils.isEmpty(clusters)) {
            clusters = new ArrayList<>();
            clusters.addAll(clusterMap.keySet());
        }
        //Collect the instances of the requested clusters
        return allIPs(clusters);
    }

    //Get all instance from input clusters.
    public List<Instance> allIPs(List<String> clusters) {
        List<Instance> result = new ArrayList<>();
        //Iterate over the requested clusters
        for (String cluster : clusters) {
            Cluster clusterObj = clusterMap.get(cluster);
            if (clusterObj == null) {
                continue;
            }
            //Add every Instance held by the cluster object
            result.addAll(clusterObj.allIPs());
        }
        return result;
    }
    ...
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();
    ...

    public List<Instance> allIPs() {
        //Return both persistent and ephemeral instances
        List<Instance> allInstances = new ArrayList<>();
        allInstances.addAll(persistentInstances);
        allInstances.addAll(ephemeralInstances);
        return allInstances;
    }
    ...
}
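The 'service --> cluster --> instance' lookup performed by srvIPs() and allIPs() above can be condensed into the following sketch. RegistrySketch and its string-based instances are hypothetical simplifications, not the server's data model.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the service -> cluster -> instance lookup.
final class RegistrySketch {
    // Cluster name -> instance addresses of one service.
    private final Map<String, List<String>> clusterMap = new LinkedHashMap<>();

    void register(String cluster, String address) {
        clusterMap.computeIfAbsent(cluster, key -> new ArrayList<>()).add(address);
    }

    // An empty cluster list means "all clusters", mirroring srvIPs() above.
    List<String> instances(List<String> clusters) {
        List<String> wanted = clusters.isEmpty() ? new ArrayList<>(clusterMap.keySet()) : clusters;
        List<String> result = new ArrayList<>();
        for (String cluster : wanted) {
            List<String> found = clusterMap.get(cluster);
            if (found != null) { // unknown clusters are skipped, as in allIPs()
                result.addAll(found);
            }
        }
        return result;
    }
}
```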

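(4) Summary

I. How the microservice list is obtained when microservices call each other

Every client caches the microservice list locally. Before sending a request, the client looks up the instance list by microservice name, selects one instance as the target, and makes the HTTP call. A scheduled task keeps the locally cached list up to date.

II. Source code of service discovery on the Nacos client

A Nacos client is a project that depends on nacos-discovery + nacos-client, where nacos-discovery integrates Ribbon.

Before a microservice call, the Nacos client asks the Nacos server for the service list when the local cache has no entry, caches the result locally, and keeps submitting delayed tasks to maintain the local cache. On the Nacos server, the interface that queries the service instance list reads its data from the in-memory registry.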

 

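6. How the server maintains unhealthy microservice instances

(1) The heartbeat mechanism of Nacos service management

(2) Source code of heartbeat request handling on the server

(3) Source code of the server's scheduled heartbeat health check

(4) Summary of how Nacos maintains the health status of microservice instances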

 

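(1) The heartbeat mechanism of Nacos service management

When a Nacos client registers a service instance, it starts a heartbeat task. The task calls the server's instance heartbeat interface every 5s to tell the server that the instance is still alive. When the server receives a heartbeat request, it locates the Instance by IP + Port, sets the lastBeat property of that Instance to the current time, and returns the response.

When the server receives a service registration request from a client, it also starts a health check task. This task decides whether each Instance is usable by comparing its lastBeat property with the current time. If more than 15s have passed since lastBeat, the instance is marked unhealthy. If more than 30s have passed since lastBeat, Nacos removes the instance automatically.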
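The two thresholds can be expressed as a small classification function. This is a sketch of the rule as stated above, with hypothetical names (BeatTimeoutSketch, State, classify); it is not the server's health check task.

```java
// Hypothetical sketch of the heartbeat timeout rule described above.
final class BeatTimeoutSketch {
    enum State { HEALTHY, UNHEALTHY, REMOVED }

    static final long UNHEALTHY_AFTER_MS = 15_000L; // 15 s without a heartbeat
    static final long REMOVE_AFTER_MS = 30_000L;    // 30 s without a heartbeat

    static State classify(long lastBeatMillis, long nowMillis) {
        // Time elapsed since the last heartbeat was recorded.
        long silence = nowMillis - lastBeatMillis;
        if (silence > REMOVE_AFTER_MS) {
            return State.REMOVED;
        }
        if (silence > UNHEALTHY_AFTER_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }
}
```

With a 5s heartbeat interval, an instance has to miss about three consecutive heartbeats before it is marked unhealthy and about six before it is removed.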

 

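(2) Source code of heartbeat request handling on the server

I. Source code of the client sending heartbeat requests

II. Source code of the server handling heartbeat requests

III. Summary of heartbeat request handling on the server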

 

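I. Source code of the client sending heartbeat requests

When registerInstance() of NacosNamingService registers a service instance, and before it calls registerService() of NamingProxy, it checks whether the instance is ephemeral. If so, it builds the heartbeat information and adds it to beatReactor by calling buildBeatInfo() and addBeatInfo() of BeatReactor.

buildBeatInfo() of BeatReactor sets the heartbeat interval through setPeriod() of BeatInfo. The default is 5 seconds.

addBeatInfo() of BeatReactor starts a delayed task in its second-to-last line. The task is a BeatTask that wraps the BeatInfo. It is handed to the ScheduledExecutorService of BeatReactor, and the delay, read through getPeriod() of BeatInfo, is 5 seconds.

The run() method of BeatTask calls sendBeat() of NamingProxy, which uses reqApi() of NamingProxy to send the heartbeat request to the Nacos server. If the heartbeat response says that the service instance does not exist, the client sends the registration request again. Whatever the response is, a new BeatTask wrapping the same BeatInfo is handed to the ScheduledExecutorService thread pool. It runs after a delay of 5 seconds by default, or after the clientBeatInterval returned by the server when that value is positive.

So while registering a service, the client starts a delayed heartbeat task that runs every 5s. The task calls the instance heartbeat interface provided by Nacos over HTTP.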

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    private final NacosDiscoveryProperties nacosDiscoveryProperties;

    public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    }

    @Override
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
            return;
        }
        NamingService namingService = namingService();
        //Service name
        String serviceId = registration.getServiceId();
        //Service group
        String group = nacosDiscoveryProperties.getGroup();
        //Service instance, including IP, port, and other information
        Instance instance = getNacosInstanceFromRegistration(registration);

        try {
            //Call NacosNamingService.registerInstance() to register this service instance with Nacos
            namingService.registerInstance(serviceId, group, instance);
            log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort());
        } catch (Exception e) {
            log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e);
            rethrowRuntimeException(e);
        }
    }

    private NamingService namingService() {
        return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    }

    private Instance getNacosInstanceFromRegistration(Registration registration) {
        Instance instance = new Instance();
        instance.setIp(registration.getHost());
        instance.setPort(registration.getPort());
        instance.setWeight(nacosDiscoveryProperties.getWeight());
        instance.setClusterName(nacosDiscoveryProperties.getClusterName());
        instance.setEnabled(nacosDiscoveryProperties.isInstanceEnabled());
        instance.setMetadata(registration.getMetadata());
        instance.setEphemeral(nacosDiscoveryProperties.isEphemeral());
        return instance;
    }
    ...
}

public class NacosNamingService implements NamingService {
    private BeatReactor beatReactor;
    private NamingProxy serverProxy;
    ...

    @Override
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        NamingUtils.checkInstanceIsLegal(instance);
        //Build the grouped service name
        String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
        //Check whether the instance to register is ephemeral
        if (instance.isEphemeral()) {
            //For an ephemeral instance, build the heartbeat information
            BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
            //Add the heartbeat information
            beatReactor.addBeatInfo(groupedServiceName, beatInfo);
        }
        //Then call NamingProxy.registerService() to register the service instance
        serverProxy.registerService(groupedServiceName, groupName, instance);
    }
    ...
}

public class BeatReactor implements Closeable {
    ...
    //Build new beat information.
    public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(groupedServiceName);
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        //getInstanceHeartBeatInterval() returns 5000 by default
        beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
        return beatInfo;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Instance implements Serializable {
    ...
    public long getInstanceHeartBeatInterval() {
        //Constants.DEFAULT_HEART_BEAT_INTERVAL is 5000 by default
        return getMetaDataByKeyWithDefault(PreservedMetadataKeys.HEART_BEAT_INTERVAL, Constants.DEFAULT_HEART_BEAT_INTERVAL);
    }
    ...
}

public class BeatReactor implements Closeable {
    private final ScheduledExecutorService executorService;
    private final NamingProxy serverProxy;
    public final Map<String, BeatInfo> dom2Beat = new ConcurrentHashMap<String, BeatInfo>();

    public BeatReactor(NamingProxy serverProxy) {
        this(serverProxy, UtilAndComs.DEFAULT_CLIENT_BEAT_THREAD_COUNT);
    }

    public BeatReactor(NamingProxy serverProxy, int threadCount) {
        this.serverProxy = serverProxy;
        this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.naming.beat.sender");
                return thread;
            }
        });
    }
    ...

    //Add beat information.
    public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
        NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
        String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
        BeatInfo existBeat = null;
        if ((existBeat = dom2Beat.remove(key)) != null) {
            existBeat.setStopped(true);
        }
        dom2Beat.put(key, beatInfo);
        //Start a delayed task; the task is a BeatTask
        executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
        MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
    }
    ...

    class BeatTask implements Runnable {
        BeatInfo beatInfo;
        public BeatTask(BeatInfo beatInfo) {
            this.beatInfo = beatInfo;
        }

        @Override
        public void run() {
            //Check whether the task should stop
            if (beatInfo.isStopped()) {
                return;
            }
            //Delay before the next run, 5s by default
            long nextTime = beatInfo.getPeriod();
            try {
                //Call NamingProxy.sendBeat() to send the heartbeat request to the Nacos server
                JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
                long interval = result.get("clientBeatInterval").asLong();
                boolean lightBeatEnabled = false;
                if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                    lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
                }
                BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
                if (interval > 0) {
                    nextTime = interval;
                }
                //Read the status code returned by the Nacos server
                int code = NamingResponseCode.OK;
                if (result.has(CommonParams.CODE)) {
                    code = result.get(CommonParams.CODE).asInt();
                }
                //If code == RESOURCE_NOT_FOUND, the previously registered instance has been removed by the Nacos server
                if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                    //Rebuild the instance parameters and send the registration request again
                    Instance instance = new Instance();
                    instance.setPort(beatInfo.getPort());
                    instance.setIp(beatInfo.getIp());
                    instance.setWeight(beatInfo.getWeight());
                    instance.setMetadata(beatInfo.getMetadata());
                    instance.setClusterName(beatInfo.getCluster());
                    instance.setServiceName(beatInfo.getServiceName());
                    instance.setInstanceId(instance.getInstanceId());
                    instance.setEphemeral(true);
                    try {
                        //Call NamingProxy.registerService() to send the registration request to the Nacos server
                        serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                    } catch (Exception ignore) {
                    }
                }
            } catch (NacosException ex) {
                NAMING_LOGGER.error("[CLIENT-BEAT] failed to send beat: {}, code: {}, msg: {}", JacksonUtils.toJson(beatInfo), ex.getErrCode(), ex.getErrMsg());
            }
            //Schedule a new BeatTask for the same beatInfo after nextTime (5s by default), so the heartbeat keeps looping
            executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
        }
    }
}

public class NamingProxy implements Closeable {
    ...
    //Send beat.
    public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {    
        if (NAMING_LOGGER.isDebugEnabled()) {
            NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
        }
        Map<String, String> params = new HashMap<String, String>(8);
        Map<String, String> bodyMap = new HashMap<String, String>(2);
        if (!lightBeatEnabled) {
            bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
        }
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
        params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
        params.put("ip", beatInfo.getIp());
        params.put("port", String.valueOf(beatInfo.getPort()));
        String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
        return JacksonUtils.toObj(result);
    }
    ...
}
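The pattern BeatTask relies on, a task that schedules its own next run instead of using a fixed-rate timer, can be isolated as below. This is a minimal sketch: SelfReschedulingBeat and runBeats() are hypothetical names, the counter stands in for the HTTP heartbeat call, and the short period in the usage note is chosen only to keep the demonstration fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of a task that reschedules itself after every run.
final class SelfReschedulingBeat {
    // Sends beatsWanted simulated heartbeats and returns how many were sent, or -1 on timeout.
    static int runBeats(int beatsWanted, long periodMillis) {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger sent = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(beatsWanted);
        Runnable beat = new Runnable() {
            @Override
            public void run() {
                if (done.getCount() == 0) {
                    return; // stopped: do not send or reschedule
                }
                sent.incrementAndGet(); // stands in for the HTTP heartbeat call
                done.countDown();
                // Schedule the next run explicitly, so the delay could change between runs.
                executor.schedule(this, periodMillis, TimeUnit.MILLISECONDS);
            }
        };
        executor.schedule(beat, periodMillis, TimeUnit.MILLISECONDS);
        try {
            boolean finished = done.await(5, TimeUnit.SECONDS);
            return finished ? sent.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdownNow();
        }
    }
}
```

For example, SelfReschedulingBeat.runBeats(3, 10L) sends three simulated heartbeats about 10 ms apart and then shuts the executor down. Rescheduling by hand is what lets BeatTask adopt a new interval returned by the server on the next run.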

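II. Source code of the server handling heartbeat requests

The beat() method of InstanceController on the server handles heartbeat requests from clients. It first tries to get the matching Instance object from the registry of ServiceManager. If the in-memory registry has no matching Instance and the request carries the beat details, it calls registerInstance() of ServiceManager to register the instance again. If the beat details are missing, it returns RESOURCE_NOT_FOUND so that the client registers again.

It then gets the matching Service object from the registry of ServiceManager, so that later changes to the Instance objects in the Service's Clusters are changes to the registry data. Next it calls processClientBeat() of Service, which submits an asynchronous ClientBeatProcessor task to a thread pool. The pool size is half the number of available processors, with a minimum of 1.

The run() method of ClientBeatProcessor first finds all ephemeral instances by cluster name. It then loops over them and compares IP + Port to find the matching Instance. Once found, it sets the lastBeat property of the Instance to the current time and checks whether the Instance is currently unhealthy. If so, it marks the Instance healthy again.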
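The effect of one heartbeat on the registry, as described in the last paragraph, can be sketched as follows. BeatProcessingSketch and its nested Instance class are hypothetical simplifications and do not reproduce the ClientBeatProcessor source.

```java
import java.util.List;

// Hypothetical sketch of what a heartbeat does to the matching instance.
final class BeatProcessingSketch {
    static final class Instance {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;

        Instance(String ip, int port, long lastBeat, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.lastBeat = lastBeat;
            this.healthy = healthy;
        }
    }

    // Returns true when a matching instance was found and refreshed.
    static boolean processBeat(List<Instance> ephemeralInstances, String ip, int port, long now) {
        for (Instance instance : ephemeralInstances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;     // record the heartbeat time
                if (!instance.healthy) {
                    instance.healthy = true; // a beating instance becomes healthy again
                }
                return true;
            }
        }
        return false;
    }
}
```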

//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    ...
    //Create a beat for instance.
    @CanDistro
    @PutMapping("/beat")
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public ObjectNode beat(HttpServletRequest request) throws Exception {
        ObjectNode result = JacksonUtils.createEmptyJsonNode();
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

        String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
        //Read the request parameters, namespaceId, and serviceName
        RsInfo clientBeat = null;
        if (StringUtils.isNotBlank(beat)) {
            clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
        }
        String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME, UtilsAndCommons.DEFAULT_CLUSTER_NAME);
        String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
        int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
        if (clientBeat != null) {
            if (StringUtils.isNotBlank(clientBeat.getCluster())) {
                clusterName = clientBeat.getCluster();
            } else {
                clientBeat.setCluster(clusterName);
            }
            ip = clientBeat.getIp();
            port = clientBeat.getPort();
        }
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);

        //Use the namespace, service name, and other fields to get the Instance object from the in-memory registry of ServiceManager
        Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
        //If no instance is found, register it again through ServiceManager.registerInstance()
        if (instance == null) {
            if (clientBeat == null) {
                result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
                return result;
            }
            Loggers.SRV_LOG.warn("[CLIENT-BEAT] The instance has been removed for health mechanism, " + "perform data compensation operations, beat: {}, serviceName: {}", clientBeat, serviceName);
            instance = new Instance();
            instance.setPort(clientBeat.getPort());
            instance.setIp(clientBeat.getIp());
            instance.setWeight(clientBeat.getWeight());
            instance.setMetadata(clientBeat.getMetadata());
            instance.setClusterName(clusterName);
            instance.setServiceName(serviceName);
            instance.setInstanceId(instance.getInstanceId());
            instance.setEphemeral(clientBeat.isEphemeral());
            //Register the service instance again
            serviceManager.registerInstance(namespaceId, serviceName, instance);
        }

        //Get the Service from the in-memory registry of ServiceManager; later changes to Instances in its Clusters modify the registry
        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.SERVER_ERROR, "service not found: " + serviceName + "@" + namespaceId);
        }
        if (clientBeat == null) {
            clientBeat = new RsInfo();
            clientBeat.setIp(ip);
            clientBeat.setPort(port);
            clientBeat.setCluster(clusterName);
        }
        //Submit the heartbeat processing task for this client instance, which updates lastBeat
        service.processClientBeat(clientBeat);

        result.put(CommonParams.CODE, NamingResponseCode.OK);
        if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
            result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
        }
        result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
        return result;
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    ...
    public void processClientBeat(final RsInfo rsInfo) {
        ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor();
        clientBeatProcessor.setService(this);
        clientBeatProcessor.setRsInfo(rsInfo);
        //Schedule for immediate execution
        HealthCheckReactor.scheduleNow(clientBeatProcessor);
    }
    ...
}

//Health check reactor.
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class HealthCheckReactor {
    ...
    //Schedule client beat check task without a delay.
    public static ScheduledFuture<?> scheduleNow(Runnable task) {
        //Submit the task to the thread pool with zero delay
        return GlobalExecutor.scheduleNamingHealth(task, 0, TimeUnit.MILLISECONDS);
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ? 1 : Runtime.getRuntime().availableProcessors() / 2;

    //The pool size is half the number of available processors (minimum 1)
    private static final ScheduledExecutorService NAMING_HEALTH_EXECUTOR = 
        ExecutorFactory.Managed.newScheduledExecutorService(
            ClassUtils.getCanonicalName(NamingApp.class), 
            DEFAULT_THREAD_COUNT, 
            new NameThreadFactory("com.alibaba.nacos.naming.health")
        );
    ...
    public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long delay, TimeUnit unit) {
        return NAMING_HEALTH_EXECUTOR.schedule(command, delay, unit);
    }
    ...
}

//Thread to update ephemeral instance triggered by client beat.
public class ClientBeatProcessor implements Runnable {
    private RsInfo rsInfo;
    private Service service;
    ...

    @Override
    public void run() {
        Service service = this.service;
        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
        }
        //Read the IP, cluster name and port from the beat
        String ip = rsInfo.getIp();
        String clusterName = rsInfo.getCluster();
        int port = rsInfo.getPort();
        Cluster cluster = service.getClusterMap().get(clusterName);
        //Get all ephemeral instances in the current cluster
        List<Instance> instances = cluster.allIPs(true);

        //Iterate over the ephemeral instances
        for (Instance instance : instances) {
            //Match on IP and port so that only the instance that sent this beat is touched
            if (instance.getIp().equals(ip) && instance.getPort() == port) {
                if (Loggers.EVT_LOG.isDebugEnabled()) {
                    Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
                }
                //Set the instance's last beat time to the current time
                instance.setLastBeat(System.currentTimeMillis());
                if (!instance.isMarked()) {
                    //If the instance was previously unhealthy, mark it healthy again
                    if (!instance.isHealthy()) {
                        instance.setHealthy(true);
                        Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
                        getPushService().serviceChanged(service);
                    }
                }
            }
        }
    }
    ...
}

III. Summary of how the server handles heartbeat requests

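First, the server uses the request parameters to look up the Instance in ServiceManager's in-memory registry. If no matching Instance is found, the instance is registered again. In either case the server then looks up the corresponding Service in the same registry and submits an asynchronous ClientBeatProcessor task through that Service.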

 

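The asynchronous task fetches all ephemeral instances in the same cluster, loops over them, and matches on IP + Port to find the Instance that sent the beat. It then sets that Instance's lastBeat attribute to the current time and checks whether the Instance is healthy; if it is not, it is marked healthy again.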
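The beat handling summarized above can be sketched in a few lines. This is a simplified illustration, not the Nacos implementation: `BeatInstance` and `processBeat` are hypothetical names, and the real ClientBeatProcessor also pushes a service-changed notification when an instance recovers.

```java
import java.util.ArrayList;
import java.util.List;

class BeatRefreshSketch {
    // Hypothetical stand-in for Nacos's Instance; only the fields the beat logic touches.
    static class BeatInstance {
        final String ip;
        final int port;
        long lastBeat;
        boolean healthy;
        boolean marked;

        BeatInstance(String ip, int port, long lastBeat, boolean healthy) {
            this.ip = ip;
            this.port = port;
            this.lastBeat = lastBeat;
            this.healthy = healthy;
        }
    }

    // Refreshes lastBeat for the instance matching ip + port.
    // Returns true when the beat flipped an instance back to healthy,
    // which is the point where the real code would notify subscribers.
    static boolean processBeat(List<BeatInstance> ephemeralInstances, String ip, int port, long now) {
        boolean recovered = false;
        for (BeatInstance instance : ephemeralInstances) {
            if (instance.ip.equals(ip) && instance.port == port) {
                instance.lastBeat = now;
                if (!instance.marked && !instance.healthy) {
                    instance.healthy = true;
                    recovered = true;
                }
            }
        }
        return recovered;
    }

    public static void main(String[] args) {
        List<BeatInstance> instances = new ArrayList<>();
        instances.add(new BeatInstance("10.0.0.1", 8080, 0L, false));
        instances.add(new BeatInstance("10.0.0.2", 8080, 0L, true));
        boolean recovered = processBeat(instances, "10.0.0.1", 8080, 20_000L);
        System.out.println("recovered=" + recovered + ", lastBeat=" + instances.get(0).lastBeat);
    }
}
```

Only the instance whose IP and port match the beat is modified; every other instance in the cluster keeps its old lastBeat.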

 

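A healthy client instance sends a heartbeat request every 5s, whereas an unhealthy client instance no longer sends one every 5s. So how does Nacos detect and handle unhealthy service instances?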

 

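(3) Source code for the server's periodic heartbeat health check

I. What happens when a Service is created

II. Core logic of the run() method of the asynchronous task ClientBeatCheckTask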

 

I. What happens when a Service is created

When ServiceManager's registerInstance() method handles a registration request, it calls ServiceManager's createEmptyService() method to check whether a service needs to be created.

 

In ServiceManager's createEmptyService() method, if a new Service has to be created, a Service object is instantiated first and then ServiceManager's putServiceAndInit() method is called.

 

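ServiceManager's putServiceAndInit() method puts the new Service into the registry and then calls Service's init() method, which submits the asynchronous task ClientBeatCheckTask to a thread pool whose size is half the number of available processors (minimum 1).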

//Service manager: holds every service and manages their registration, removal and modification
@Component
public class ServiceManager implements RecordListener<Service> {
    //The registry: Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    ...
    //Register an instance to a service in AP mode.
    //This method creates service or cluster silently if they don't exist.
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        //1. Create an empty service if it does not exist yet
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        //2. Look up the service by namespace ID and service name; throw if the result is null
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        //3. Add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
    ...

    //1. Create an empty service
    public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
        createServiceIfAbsent(namespaceId, serviceName, local, null);
    }

    //Create service if not exist.
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            //now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) {
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();

            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }

    private void putServiceAndInit(Service service) throws NacosException {
        //Put the Service into the registry (serviceMap)
        putService(service);
        service.init();
        //Add the Service as a listener to consistencyService's listeners
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }

    //Put service into manager.
    public void putService(Service service) {
        if (!serviceMap.containsKey(service.getNamespaceId())) {
            synchronized (putServiceLock) {
                if (!serviceMap.containsKey(service.getNamespaceId())) {
                    serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
                }
            }
        }
        serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
    }
    ...
}

@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    @JsonIgnore
    private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this);
    ...

    public void init() {
        //Schedule clientBeatCheckTask: first run after 5s, then again 5s after each run finishes
        HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
        for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
            entry.getValue().setService(this);
            entry.getValue().init();
        }
    }
    ...
}

public class HealthCheckReactor {
    private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>();
    ...

    //Schedule client beat check task with a delay.
    public static void scheduleCheck(ClientBeatCheckTask task) {
        futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
    }
    ...
}

public class GlobalExecutor {
    public static final int DEFAULT_THREAD_COUNT = Runtime.getRuntime().availableProcessors() <= 1 ? 1 : Runtime.getRuntime().availableProcessors() / 2;
    //The pool size is half the number of available processors (minimum 1)
    private static final ScheduledExecutorService NAMING_HEALTH_EXECUTOR = 
        ExecutorFactory.Managed.newScheduledExecutorService(
            ClassUtils.getCanonicalName(NamingApp.class), 
            DEFAULT_THREAD_COUNT,
            new NameThreadFactory("com.alibaba.nacos.naming.health")
        );
    ...

    public static ScheduledFuture<?> scheduleNamingHealth(Runnable command, long initialDelay, long delay, TimeUnit unit) {
        //Run with a fixed delay: the next run starts only after the previous run has finished (however long it took) and the delay has elapsed
        return NAMING_HEALTH_EXECUTOR.scheduleWithFixedDelay(command, initialDelay, delay, unit);
    }
    ...
}

public final class ExecutorFactory {
    ...
    public static final class Managed {
        private static final ThreadPoolManager THREAD_POOL_MANAGER = ThreadPoolManager.getInstance();
        ...
        //Create a new scheduled executor service with input thread factory and register to manager.
        public static ScheduledExecutorService newScheduledExecutorService(final String group, final int nThreads, final ThreadFactory threadFactory) {
            ScheduledExecutorService executorService = Executors.newScheduledThreadPool(nThreads, threadFactory);
            THREAD_POOL_MANAGER.register(DEFAULT_NAMESPACE, group, executorService);
            return executorService;
        }
    }
    ...
}
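The scheduling behaviour used by Service.init() can be tried out with plain JDK classes. The sketch below is an illustration under assumptions: `FixedDelaySketch`, `poolSize` and `runWithFixedDelay` are hypothetical names, and it uses `Executors.newScheduledThreadPool` directly rather than Nacos's managed ExecutorFactory.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedDelaySketch {
    // Same sizing rule as DEFAULT_THREAD_COUNT: half the processors, but at least one thread.
    static int poolSize(int processors) {
        return processors <= 1 ? 1 : processors / 2;
    }

    // Schedules a task with a fixed delay between the end of one run and the start of the next,
    // waits until it has run at least `runs` times (or 5s pass), and returns the observed run count.
    static int runWithFixedDelay(int runs, long delayMillis) {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(poolSize(Runtime.getRuntime().availableProcessors()));
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledFuture<?> future = executor.scheduleWithFixedDelay(() -> {
            counter.incrementAndGet();
            done.countDown();
        }, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            future.cancel(false);
            executor.shutdownNow();
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("pool size on this machine: " + poolSize(Runtime.getRuntime().availableProcessors()));
        System.out.println("task ran " + runWithFixedDelay(3, 50L) + " times");
    }
}
```

With a fixed delay, a slow health check pushes later runs back instead of letting them pile up, which is presumably why the check task is scheduled this way rather than at a fixed rate.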

II. Core logic of the run() method of the asynchronous task ClientBeatCheckTask

The run() method of ClientBeatCheckTask performs the health check of service instances: it finds the client service instances that are unhealthy and handles them.

 

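The first loop finds the Instances that are unhealthy and sets their healthy attribute to false. The criterion is the Instance's lastBeat attribute. A healthy client sends a heartbeat request every 5s, which keeps refreshing lastBeat; an unhealthy client leaves lastBeat unchanged. Once lastBeat has not changed for more than 15s (the default heartbeat timeout), the scheduled task marks the Instance as unhealthy.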

 

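The second loop finds the Instances that can be deleted. The criterion is again the lastBeat attribute: once lastBeat has not been updated for more than 30s (the default delete timeout), the scheduled task deletes the Instance.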

//Check and update statuses of ephemeral instances, remove them if they have been expired.
public class ClientBeatCheckTask implements Runnable {    
    private Service service;

    public ClientBeatCheckTask(Service service) {
        this.service = service;
    }

    ...
    @Override
    public void run() {
        try {
            //Get all ephemeral instances
            List<Instance> instances = service.allIPs(true);
            //Iterate over every ephemeral instance
            for (Instance instance : instances) {
                //Check: current time - last beat time of the instance > heartbeat timeout
                //instance.getInstanceHeartBeatTimeOut() defaults to 15s
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) {
                    //marked is false by default, so this condition holds
                    if (!instance.isMarked()) {
                        //If the instance is still in the healthy state
                        if (instance.isHealthy()) {
                            //switch it to the unhealthy state
                            instance.setHealthy(false);
                            //Publish the change; subscribers are notified over UDP
                            getPushService().serviceChanged(service);
                            ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance));
                        }
                    }
                }
            }
            //Iterate over all ephemeral instances a second time
            for (Instance instance : instances) {
                if (instance.isMarked()) {
                    continue;
                }
                //Check: current time - last beat time > delete timeout
                //instance.getIpDeleteTimeout() defaults to 30s
                if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) {
                    //Delete the instance from the registry
                    deleteIp(instance);
                }
            }
        } catch (Exception e) {
            Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
        }
    }
    ...
}

(4) Summary of how Nacos maintains the health state of microservice instances

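A Nacos client runs a heartbeat task that sends a heartbeat to the Nacos server every 5s, and the Nacos server updates the lastBeat attribute of the corresponding Instance with the time of that heartbeat.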

 

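In addition, when the Nacos server registers a service instance, it submits one heartbeat health-check task per Service to a thread pool for periodic execution. That task marks an Instance as unhealthy once it has had no heartbeat for more than 15s, and removes an Instance from the registry once it has had no heartbeat for more than 30s.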
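The two thresholds can be condensed into a small decision function. This is a sketch under assumptions: `BeatTimeoutSketch`, `State` and `classify` are hypothetical names, the 15s and 30s values are the defaults quoted above, and the `marked` flag checked by the real ClientBeatCheckTask is left out.

```java
class BeatTimeoutSketch {
    enum State { HEALTHY, UNHEALTHY, EXPIRED }

    // Default thresholds quoted in the text (assumed fixed here for simplicity).
    static final long HEART_BEAT_TIMEOUT_MS = 15_000L;
    static final long IP_DELETE_TIMEOUT_MS = 30_000L;

    // Classifies an instance by how long it has been silent.
    // Both comparisons are strict, matching the '>' used in ClientBeatCheckTask.
    static State classify(long now, long lastBeat) {
        long silence = now - lastBeat;
        if (silence > IP_DELETE_TIMEOUT_MS) {
            return State.EXPIRED;
        }
        if (silence > HEART_BEAT_TIMEOUT_MS) {
            return State.UNHEALTHY;
        }
        return State.HEALTHY;
    }

    public static void main(String[] args) {
        System.out.println(classify(5_000L, 0L));   // HEALTHY
        System.out.println(classify(20_000L, 0L));  // UNHEALTHY
        System.out.println(classify(31_000L, 0L));  // EXPIRED
    }
}
```

Since the check itself only runs every 5s, an instance may stay in its old state for up to roughly 5s after crossing a threshold.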

 

 

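7. What happens when a service goes offline

(1) Source code for taking a service offline on the Nacos client

(2) Source code for handling a service going offline on the Nacos server

(3) Source code for the Nacos server sending service change events to clients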

 

(1) Source code for taking a service offline on the Nacos client

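When the Spring container of a Nacos client is destroyed, the client notifies the Nacos server that the service is going offline. This starts with a call to AbstractAutoServiceRegistration's destroy() method, because that class implements a Spring listener and the method is annotated with @PreDestroy. The effect of @PreDestroy is that Spring calls the annotated method back when the container is destroyed.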

 

The call chain then runs NacosServiceRegistry's deregister() method -> NamingService's deregisterInstance() method -> NamingProxy's deregisterService() method, and finally NamingProxy's reqApi() method sends a DELETE request to the "/nacos/v1/ns/instance" endpoint.

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class, NacosDiscoveryAutoConfiguration.class })
public class NacosServiceRegistryAutoConfiguration {
    @Bean
    public NacosServiceRegistry nacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(ObjectProvider<List<NacosRegistrationCustomizer>> registrationCustomizers, NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) {
        return new NacosRegistration(registrationCustomizers.getIfAvailable(), nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

public class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> {
    ...
    ...
}

public abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> {
    private final ServiceRegistry<R> serviceRegistry;
    ...

    @PreDestroy
    public void destroy() {
        stop();
    }

    public void stop() {
        if (this.getRunning().compareAndSet(true, false) && isEnabled()) {
            deregister();
            if (shouldRegisterManagement()) {
                deregisterManagement();
            }
            this.serviceRegistry.close();
        }
    }

    protected void deregister() {
        //Calls NacosServiceRegistry.deregister()
        this.serviceRegistry.deregister(getRegistration());
    }
    ...
}

public class NacosServiceRegistry implements ServiceRegistry<Registration> {
    ...
    @Override
    public void deregister(Registration registration) {
        ...
        NamingService namingService = namingService();
        String serviceId = registration.getServiceId();
        String group = nacosDiscoveryProperties.getGroup();
        try {
            //Calls NamingService.deregisterInstance()
            namingService.deregisterInstance(serviceId, group, registration.getHost(), registration.getPort(), nacosDiscoveryProperties.getClusterName());
        } catch (Exception e) {
            log.error("ERR_NACOS_DEREGISTER, de-register failed...{},", registration.toString(), e);
        }
        log.info("De-registration finished.");
    }

    private NamingService namingService() {
        return nacosServiceManager.getNamingService(nacosDiscoveryProperties.getNacosProperties());
    }
    ...
}

//The classes above belong to nacos-discovery; the classes below belong to nacos-client
public class NacosNamingService implements NamingService {
    private BeatReactor beatReactor;
    private NamingProxy serverProxy;
    ...

    @Override
    public void deregisterInstance(String serviceName, String groupName, String ip, int port, String clusterName) throws NacosException {
        Instance instance = new Instance();
        instance.setIp(ip);
        instance.setPort(port);
        instance.setClusterName(clusterName);
        deregisterInstance(serviceName, groupName, instance);
    }

    @Override
    public void deregisterInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        if (instance.isEphemeral()) {
            beatReactor.removeBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), instance.getIp(), instance.getPort());
        }
        //Calls NamingProxy.deregisterService()
        serverProxy.deregisterService(NamingUtils.getGroupedName(serviceName, groupName), instance);
    }
    ...
}

public class NamingProxy implements Closeable {
    ...
    public void deregisterService(String serviceName, Instance instance) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.DELETE);
    }
    ...
}
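To make the shape of the final DELETE request concrete, the sketch below assembles the same parameters into a query string. It is an illustration only: `DeregisterRequestSketch`, `buildParams` and `toQuery` are hypothetical helpers, the literal key strings ("namespaceId", "serviceName", "clusterName") and the "@@" group separator are assumptions about the values behind CommonParams and NamingUtils.getGroupedName(), and the real NamingProxy sends the request through its own HTTP client. It requires Java 10+ for `URLEncoder.encode(String, Charset)`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

class DeregisterRequestSketch {
    // Collects the parameters in the same order as NamingProxy.deregisterService().
    static Map<String, String> buildParams(String namespaceId, String groupedServiceName,
                                           String clusterName, String ip, int port, boolean ephemeral) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("namespaceId", namespaceId);        // assumed value of CommonParams.NAMESPACE_ID
        params.put("serviceName", groupedServiceName); // assumed value of CommonParams.SERVICE_NAME
        params.put("clusterName", clusterName);        // assumed value of CommonParams.CLUSTER_NAME
        params.put("ip", ip);
        params.put("port", String.valueOf(port));
        params.put("ephemeral", String.valueOf(ephemeral));
        return params;
    }

    // URL-encodes the parameters into a query string.
    static String toQuery(Map<String, String> params) {
        StringJoiner joiner = new StringJoiner("&");
        for (Map.Entry<String, String> entry : params.entrySet()) {
            joiner.add(URLEncoder.encode(entry.getKey(), StandardCharsets.UTF_8)
                    + "=" + URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        Map<String, String> params =
                buildParams("public", "DEFAULT_GROUP@@stock-service", "DEFAULT", "10.0.0.1", 8080, true);
        System.out.println("DELETE /nacos/v1/ns/instance?" + toQuery(params));
    }
}
```

Note that the heartbeat task for an ephemeral instance is removed on the client before this request is sent, so the server does not re-register the instance from a late beat.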

(2) Source code for handling a service going offline on the Nacos server

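The entry point for handling a service going offline on the Nacos server is InstanceController's deregister() method. It calls ServiceManager's removeInstance() method to remove the instance from the registry, which in turn calls ServiceManager's substractIpAddresses() method. That method passes the remove action to ServiceManager's updateIpAddresses() method, whose result does not contain the instance to be deleted.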

 

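In ServiceManager's updateIpAddresses() method, if the action argument is remove, the corresponding Instance is removed. The in-memory registry is not touched at this point; the Instance is only dropped from the returned result. After that, just as in the registration logic, the registry is modified through an asynchronous task plus an in-memory queue.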

//Instance operation controller.
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
    @Autowired
    private ServiceManager serviceManager;

    ...
    //Deregister instances.
    @CanDistro
    @DeleteMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String deregister(HttpServletRequest request) throws Exception {
        Instance instance = getIpAddress(request);
        String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);

        Service service = serviceManager.getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.warn("remove instance from non-exist service: {}", serviceName);
            return "ok";
        }
        //Remove the Instance from ServiceManager's registry
        serviceManager.removeInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
        return "ok";
    }
    ...
}

//Core manager storing all services in Nacos.
@Component
public class ServiceManager implements RecordListener<Service> {
    //The registry: Map(namespace, Map(group::serviceName, Service)).
    private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();

    @Resource(name = "consistencyDelegate")
    private ConsistencyService consistencyService;
    ...

    //Remove instance from service.
    public void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            //Remove the Instance
            removeInstance(namespaceId, serviceName, ephemeral, service, ips);
        }
    }

    private void removeInstance(String namespaceId, String serviceName, boolean ephemeral, Service service, Instance... ips) throws NacosException {
        //As with registration, build the key first
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        //This instanceList no longer contains the Instance that is being deleted
        List<Instance> instanceList = substractIpAddresses(service, ephemeral, ips);
        //Wrap it in an Instances object
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        //Same path as registration: apply instanceList to the in-memory registry through copy-on-write
        consistencyService.put(key, instances);
    }

    private List<Instance> substractIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
        //Pass UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE as the action
        return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE, ephemeral, ips);
    }

    //Compare and get new instance list.
    public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
        //First fetch all instances already registered in Nacos for the service that the given instances belong to
        Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
        List<Instance> currentIPs = service.allIPs(ephemeral);
        Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
        Set<String> currentInstanceIds = Sets.newHashSet();

        for (Instance instance : currentIPs) {
            //Put the instance into currentInstances, keyed by its address string from toIpAddr()
            currentInstances.put(instance.toIpAddr(), instance);
            //Add the instance's unique ID to currentInstanceIds
            currentInstanceIds.add(instance.getInstanceId());
        }

        //Holds all instances of the service that the given instances belong to
        Map<String, Instance> instanceMap;
        if (datum != null && null != datum.value) {
            instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
        } else {
            instanceMap = new HashMap<>(ips.length);
        }

        for (Instance instance : ips) {
            if (!service.getClusterMap().containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), service);
                cluster.init();
                service.getClusterMap().put(instance.getClusterName(), cluster);
                Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
            }

            if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
                //Remove the Instance
                instanceMap.remove(instance.getDatumKey());
            } else {
                Instance oldInstance = instanceMap.get(instance.getDatumKey());
                if (oldInstance != null) {
                    instance.setInstanceId(oldInstance.getInstanceId());
                } else {
                    instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
                }
                //The instanceMap key is derived from the IP and port
                instanceMap.put(instance.getDatumKey(), instance);
            }
        }
        if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
            throw new IllegalArgumentException("ip list can not be empty, service: " + service.getName() + ", ip list: " + JacksonUtils.toJson(instanceMap.values()));
        }
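        //For an add, instanceMap now contains the newly registered Instance plus any previously registered ones
        //For a remove, it contains the previously registered Instances without the removed one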
        return new ArrayList<>(instanceMap.values());
    }
    ...
}
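The key point of the remove path is that the new instance list is computed on a copy, while the data currently being served is left alone until the later asynchronous update. A minimal sketch of that idea, using plain strings of the form ip:port in place of Instance objects (`SubtractInstancesSketch` and `subtract` are hypothetical names):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SubtractInstancesSketch {
    // Builds the post-removal list from a snapshot; the `registered` list itself is never modified.
    static List<String> subtract(List<String> registered, List<String> toRemove) {
        Map<String, String> byKey = new LinkedHashMap<>();
        for (String address : registered) {
            byKey.put(address, address);
        }
        for (String address : toRemove) {
            byKey.remove(address);
        }
        return new ArrayList<>(byKey.values());
    }

    public static void main(String[] args) {
        List<String> registered = Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080");
        List<String> remaining = subtract(registered, Arrays.asList("10.0.0.1:8080"));
        System.out.println("registry still holds " + registered + ", new list is " + remaining);
    }
}
```

Readers of the registry keep seeing the old list until the new one is swapped in, so lookups never observe a half-updated state.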

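(3) Source code for the Nacos server sending service change events to clients

I. Options for letting clients learn about a registration or deregistration

II. Publishing a service change event when handling a registration or deregistration

III. Listening for the service change event and pushing it to clients over UDP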

 

I. Options for letting clients learn about a registration or deregistration

When a Nacos client registers a service or takes one offline, how do the other Nacos clients find out?

 

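Option 1: the other Nacos clients refresh their local cache through the scheduled task used for service discovery, but this introduces a delay of a few seconds.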

 

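Option 2: when the registry on the Nacos server changes, the server actively notifies the clients. The final step is in fact the same whether the Nacos server is handling a registration or a deregistration: after the registry has been modified through copy-on-write, the server publishes a change event and then notifies each client over UDP, so that clients learn about the change sooner.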

 

II. Publishing a service change event when handling a registration or deregistration

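Both registration and deregistration call ConsistencyService's put() method, which wraps the operation in a Pair object and puts it on a blocking queue. The asynchronous task Notifier then processes the Pair objects in that queue.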

 

When Notifier processes a Pair from the blocking queue, it calls the onChange() method of the Service that the Pair belongs to, and Service's onChange() method in turn calls Service's updateIPs() method.

 

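In Service's updateIPs() method, Cluster's updateIps() method is called first to modify the registry through copy-on-write, and then PushService's serviceChanged() method is called to publish the service change event.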
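The queue that sits between put() and the Notifier can be sketched as follows. This is a simplified illustration, not the Nacos class: `NotifierQueueSketch`, `addTask` and `pollNext` are hypothetical names, the queue carries only the key instead of a Pair of key and action, `pollNext` is non-blocking whereas the real Notifier blocks on take(), and `putIfAbsent` is used for the duplicate check as a design choice of this sketch.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

class NotifierQueueSketch {
    // Keys that already have a change notification waiting in the queue.
    private final ConcurrentHashMap<String, String> pending = new ConcurrentHashMap<>();
    private final BlockingQueue<String> tasks = new ArrayBlockingQueue<>(1024);

    // Enqueues a change for the key unless one is already waiting; returns whether a task was added.
    boolean addTask(String key) {
        if (pending.putIfAbsent(key, "") != null) {
            return false;
        }
        boolean queued = tasks.offer(key);
        if (!queued) {
            pending.remove(key); // queue full: clear the marker so a later change can retry
        }
        return queued;
    }

    // Takes the next key, or null if the queue is empty, and clears its pending marker
    // so that later changes to the same key can be queued again.
    String pollNext() {
        String key = tasks.poll();
        if (key != null) {
            pending.remove(key);
        }
        return key;
    }

    int size() {
        return tasks.size();
    }

    public static void main(String[] args) {
        NotifierQueueSketch notifier = new NotifierQueueSketch();
        notifier.addTask("stock-service");
        notifier.addTask("stock-service"); // coalesced with the task already waiting
        System.out.println("queued tasks: " + notifier.size());
    }
}
```

Coalescing is safe here because the consumer always reads the latest instance list for the key when it handles the task, so one queued notification covers every change made before it runs.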

@DependsOn("ProtocolManager")
@org.springframework.stereotype.Service("distroConsistencyService")
public class DistroConsistencyServiceImpl implements EphemeralConsistencyService, DistroDataProcessor {
    private final GlobalConfig globalConfig;
    private final DistroProtocol distroProtocol;
    private final DataStore dataStore;//Stores the data of all registered service instances
    private Map<String, ConcurrentLinkedQueue<RecordListener>> listeners = new ConcurrentHashMap<>();
    private volatile Notifier notifier = new Notifier();
    ...

    @PostConstruct
    public void init() {
        //After initialization, submit the notifier task to GlobalExecutor for execution
        GlobalExecutor.submitDistroNotifyTask(notifier);
    }

    @Override
    public void put(String key, Record value) throws NacosException {
        //Store the latest instance list, which reflects the current change, in the DataStore
        onPut(key, value);
        //In a cluster deployment, DistroProtocol.sync() synchronizes instance data across the cluster nodes
        distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
    }

    public void onPut(String key, Record value) {
        if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
            //Create a Datum holding the service key and all of the service's Instances
            Datum<Instances> datum = new Datum<>();
            datum.value = (Instances) value;
            datum.key = key;
            datum.timestamp.incrementAndGet();
            //Put it into the DataStore's map
            dataStore.put(key, datum);
        }    
        if (!listeners.containsKey(key)) {
            return;
        }
        //Queue a notification task
        notifier.addTask(key, DataOperation.CHANGE);
    }
    ...

    public class Notifier implements Runnable {
        private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024);
        private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);

        //Add new notify task to queue.
        public void addTask(String datumKey, DataOperation action) {
            if (services.containsKey(datumKey) && action == DataOperation.CHANGE) {
                return;
            }
            if (action == DataOperation.CHANGE) {
                services.put(datumKey, StringUtils.EMPTY);
            }
            //tasks is a blocking queue; wrap the key and action in a Pair and enqueue it
            tasks.offer(Pair.with(datumKey, action));
        }

        public int getTaskSize() {
            return tasks.size();
        }

        @Override
        public void run() {
            Loggers.DISTRO.info("distro notifier started");
            //Loop forever
            for (; ;) {
                try {
                    //Take the next task from the blocking queue
                    Pair<String, DataOperation> pair = tasks.take();
                    //Handle the task
                    handle(pair);
                } catch (Throwable e) {
                    Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
                }
            }
        }

        private void handle(Pair<String, DataOperation> pair) {
            try {
                //Unpack the key and action that were queued by DistroConsistencyServiceImpl.onPut()
                String datumKey = pair.getValue0();
                DataOperation action = pair.getValue1();
                services.remove(datumKey);

                int count = 0;
                if (!listeners.containsKey(datumKey)) {
                    return;
                }
                
                for (RecordListener listener : listeners.get(datumKey)) {
                    count++;
                    try {
                        if (action == DataOperation.CHANGE) {
                            //Write the Instances into the registry; this calls Service.onChange()
                            listener.onChange(datumKey, dataStore.get(datumKey).value);
                            continue;
                        }
                        if (action == DataOperation.DELETE) {
                            listener.onDelete(datumKey);
                            continue;
                        }
                    } catch (Throwable e) {
                        Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e);
                    }
                }
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name());
                }
            } catch (Throwable e) {
                Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e);
            }
        }
    }
}

//Service of Nacos server side
//We introduce a 'service --> cluster --> instance' model, 
//in which service stores a list of clusters, which contain a list of instances.
//This class inherits from Service in API module and stores some fields that do not have to expose to client.
@JsonInclude(Include.NON_NULL)
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> {
    private Map<String, Cluster> clusterMap = new HashMap<>();
    ...

    @Override
    public void onChange(String key, Instances value) throws Exception {
        Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value);
        for (Instance instance : value.getInstanceList()) {
            if (instance == null) {
                //Reject this abnormal instance list:
                throw new RuntimeException("got null instance " + key);
            }
            if (instance.getWeight() > 10000.0D) {
                instance.setWeight(10000.0D);
            }
            if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
                instance.setWeight(0.01D);
            }
        }
        updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
        recalculateChecksum();
    }

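    //Update instances. The instances passed in already reflect the change (for a registration they include the new instance)
    public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
        //clusterMap holds the clusters of this service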
        Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
        for (String clusterName : clusterMap.keySet()) {
            ipMap.put(clusterName, new ArrayList<>());
        }
        //Iterate over all instances, both previously registered and newly registered,
        //and group them by the cluster they belong to
        for (Instance instance : instances) {
            try {
                if (instance == null) {
                    Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null");
                    continue;
                }
                //Check whether the instance sent by the client has a ClusterName
                if (StringUtils.isEmpty(instance.getClusterName())) {
                    //If not, set the instance's ClusterName to DEFAULT
                    instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
                }
                //Check whether a Cluster with this name already exists; if not, a new Cluster object has to be created
                if (!clusterMap.containsKey(instance.getClusterName())) {
                    Loggers.SRV_LOG.warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson());
                    //创建新的Cluster集群对象
                    Cluster cluster = new Cluster(instance.getClusterName(), this);
                    cluster.init();
                    //将新创建的Cluster对象放入到集群clusterMap中
                    getClusterMap().put(instance.getClusterName(), cluster);
                }
                //根据集群名字,从ipMap里面获取集群下的所有实例
                List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
                if (clusterIPs == null) {
                    clusterIPs = new LinkedList<>();
                    ipMap.put(instance.getClusterName(), clusterIPs);
                }
                //将客户端传过来的新注册的instance实例,添加到clusterIPs,也就是ipMap中
                clusterIPs.add(instance);
            } catch (Exception e) {
                Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e);
            }
        }

        //After all instances are grouped, update the registry per ClusterName
        for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
            //entryIPs is the instance list already grouped by ClusterName
            List<Instance> entryIPs = entry.getValue();
            //Call Cluster.updateIps() to update each Cluster object in the registry using copy-on-write
            clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
        }
        setLastModifiedMillis(System.currentTimeMillis());
        //Notify Nacos clients over UDP
        getPushService().serviceChanged(this);
        StringBuilder stringBuilder = new StringBuilder();
        for (Instance instance : allIPs()) {
            stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(",");
        }
        Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString());
    }

    @JsonIgnore
    public PushService getPushService() {
        return ApplicationUtils.getBean(PushService.class);
    }
    ...
}

public class Cluster extends com.alibaba.nacos.api.naming.pojo.Cluster implements Cloneable {
    @JsonIgnore
    private Set<Instance> persistentInstances = new HashSet<>();

    @JsonIgnore
    private Set<Instance> ephemeralInstances = new HashSet<>();

    @JsonIgnore
    private Service service;
    ...

    //Update instance list.
    public void updateIps(List<Instance> ips, boolean ephemeral) {
        //Select the ephemeral or persistent set by the flag; toUpdateInstances initially references the existing (old) instance set
        Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances;
        //Copy the old instance list toUpdateInstances into oldIpMap
        HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size());
        for (Instance ip : toUpdateInstances) {
            oldIpMap.put(ip.getDatumKey(), ip);
        }
        ...
        //Finally, build a new HashSet from the incoming instance list and assign it to toUpdateInstances
        toUpdateInstances = new HashSet<>(ips);
        //Depending on the flag, replace the Cluster's ephemeralInstances or persistentInstances with toUpdateInstances
        if (ephemeral) {
            //Replace the previous instance list with the new one directly
            ephemeralInstances = toUpdateInstances;
        } else {
            //Replace the previous instance list with the new one directly
            persistentInstances = toUpdateInstances;
        }
    }
    ...
}
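
The copy-on-write idea behind Cluster.updateIps() can be shown in isolation. The following is a minimal sketch, not the Nacos implementation: CopyOnWriteCluster is a hypothetical class, instances are reduced to plain "ip:port" strings, and the volatile modifier and the unmodifiable wrapper are additions of this sketch. It only illustrates that a writer builds a new set and swaps the reference, so a reader holding the old set is never affected.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for Cluster; instances are plain "ip:port" strings.
final class CopyOnWriteCluster {
    // volatile is an addition of this sketch so readers see the latest published set
    private volatile Set<String> instances = Collections.emptySet();

    // Build a fresh set from the incoming list, then swap the reference in one step.
    void updateIps(Collection<String> latest) {
        instances = Collections.unmodifiableSet(new HashSet<>(latest));
    }

    // Readers receive a snapshot that later updates never modify.
    Set<String> allIps() {
        return instances;
    }

    public static void main(String[] args) {
        CopyOnWriteCluster cluster = new CopyOnWriteCluster();
        cluster.updateIps(Arrays.asList("10.0.0.1:8080"));
        Set<String> snapshot = cluster.allIps();
        cluster.updateIps(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        // The old snapshot still has one entry; the cluster now exposes two.
        System.out.println(snapshot.size() + " -> " + cluster.allIps().size());
    }
}
```

Because the published set is never mutated, readers need no lock; the cost is one full copy of the set per update.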

III. Listening for service change events and notifying clients over UDP

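PushService's serviceChanged() method publishes a service change event. Because PushService implements ApplicationListener, its onApplicationEvent() method receives the published event. That method schedules a task, delayed by 1000 ms, which calls PushService's udpPush() method for each client to actively notify it over UDP.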

 

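Summary: when the registry on the Nacos server changes, the server actively notifies clients over UDP. UDP is lightweight: it sends datagrams without first establishing a connection. Delivery over UDP is unreliable, but that is acceptable here. The server retransmits a packet until the retry limit (MAX_RETRY_TIMES) is reached, and every client also runs a scheduled task that refreshes its local instance list cache.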

@Component
@SuppressWarnings("PMD.ThreadPoolCreationRule")
public class PushService implements ApplicationContextAware, ApplicationListener<ServiceChangeEvent> {
    private ApplicationContext applicationContext;
    private static DatagramSocket udpSocket;
    private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<>();
    private static volatile ConcurrentMap<String, Receiver.AckEntry> ackMap = new ConcurrentHashMap<>();
    ...

    //Service changed.
    public void serviceChanged(Service service) {
        //merge some change events to reduce the push frequency:
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
            return;
        }
        //Publish the service change event
        this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
    }

    @Override
    public void onApplicationEvent(ServiceChangeEvent event) {
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();

        Future future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                //Get all Nacos clients under this service
                ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty(clients)) {
                    return;
                }

                Map<String, Object> cache = new HashMap<>(16);
                long lastRefTime = System.nanoTime();
                //Iterate over all clients
                for (PushClient client : clients.values()) {
                    ...
                    //Notify the client over UDP
                    udpPush(ackEntry);
                }
            } catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", serviceName, e);
            } finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000, TimeUnit.MILLISECONDS);
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    }

    private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return ackEntry;
        }

        try {
            if (!ackMap.containsKey(ackEntry.key)) {
                totalPush++;
            }
            ackMap.put(ackEntry.key, ackEntry);
            udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());

            Loggers.PUSH.info("send udp packet: " + ackEntry.key);
            //Send the message over UDP
            udpSocket.send(ackEntry.origin);

            ackEntry.increaseRetryTime();
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            return ackEntry;
        } catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", ackEntry.data, ackEntry.origin.getAddress().getHostAddress(), e);
            ackMap.remove(ackEntry.key);
            udpSendTimeMap.remove(ackEntry.key);
            failedPush += 1;
            return null;
        }
    }
    ...
}
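
The futureMap check in serviceChanged() merges bursts of change events so that one delayed push is scheduled per service. The following is a minimal sketch of that coalescing idea, not the Nacos code: CoalescingPusher and all of its members are hypothetical names, the UDP send is replaced by a counter, and computeIfAbsent is used here in place of the separate containsKey/put calls in the listing above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: merges bursts of change events into one delayed push per service.
final class CoalescingPusher {
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final ConcurrentMap<String, Future<?>> pending = new ConcurrentHashMap<>();
    private final AtomicInteger pushes = new AtomicInteger();
    private final long delayMillis;

    CoalescingPusher(long delayMillis) {
        this.delayMillis = delayMillis;
    }

    void serviceChanged(String serviceKey) {
        // Schedule a push only if none is pending for this key.
        pending.computeIfAbsent(serviceKey, key -> executor.schedule(() -> {
            try {
                pushes.incrementAndGet(); // placeholder for the real UDP send
            } finally {
                pending.remove(key); // allow the next change to schedule a new push
            }
        }, delayMillis, TimeUnit.MILLISECONDS));
    }

    // Stop accepting work, let already scheduled pushes run, and report how many ran.
    int shutdownAndCount() {
        executor.shutdown(); // delayed tasks that are already scheduled still run by default
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pushes.get();
    }

    public static void main(String[] args) {
        CoalescingPusher pusher = new CoalescingPusher(300);
        for (int i = 0; i < 3; i++) {
            pusher.serviceChanged("public##stock-service"); // three rapid changes
        }
        System.out.println("pushes = " + pusher.shutdownAndCount());
    }
}
```

The trade-off is latency for volume: clients learn about a change up to one delay interval later, but a burst of registrations costs a single push.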

(4) Summary of service offline handling

 

8. Summary of service registration and discovery

I. Client

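nacos-discovery uses Spring's event listener mechanism to call the service instance registration API provided by the Nacos server when the Spring container starts. When it calls the registration API, the client also starts an asynchronous task that sends heartbeats.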

 

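When the client calls another microservice, nacos-discovery integrates with Ribbon and queries the Nacos server for the service instance list to maintain a local cache. Ribbon then load-balances each call across the cached instances.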
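
As an illustration of choosing an instance from the local cache, here is a minimal round-robin sketch. It is not Ribbon's or nacos-discovery's code: LocalCacheBalancer and its methods are hypothetical, instances are plain "ip:port" strings, and plain round-robin stands in for whichever IRule is actually configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a local instance cache plus round-robin selection.
final class LocalCacheBalancer {
    private volatile List<String> cachedInstances = Collections.emptyList();
    private final AtomicInteger counter = new AtomicInteger();

    // Called by the periodic refresh (or a server push) with the latest instance list.
    void refresh(List<String> latest) {
        cachedInstances = Collections.unmodifiableList(new ArrayList<>(latest));
    }

    // Pick the next instance in rotation; returns null when the cache is empty.
    String choose() {
        List<String> snapshot = cachedInstances;
        if (snapshot.isEmpty()) {
            return null;
        }
        // floorMod keeps the index non-negative even after the counter overflows
        int index = Math.floorMod(counter.getAndIncrement(), snapshot.size());
        return snapshot.get(index);
    }

    public static void main(String[] args) {
        LocalCacheBalancer balancer = new LocalCacheBalancer();
        balancer.refresh(Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"));
        for (int i = 0; i < 3; i++) {
            System.out.println(balancer.choose());
        }
    }
}
```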

 

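When the Spring container shuts down, it triggers the Nacos client's destroy method, which calls the Nacos server's service offline API and thereby completes the offline flow.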

 

II. Server

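The server's core functions are service registration, service query, service offline, and heartbeat health checks. The key points of the registration implementation are an asynchronous task plus an in-memory blocking queue, an in-memory registry, and copy-on-write.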
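
The "asynchronous task plus in-memory blocking queue" pattern can be sketched as follows. This is an assumption-laden illustration, not the Nacos implementation: AsyncRegistry, its methods, and the STOP sentinel are hypothetical, and instances are reduced to "ip:port" strings. Request threads only enqueue, and a single worker thread applies the registrations to the in-memory registry.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: registrations are queued and applied by one background worker.
final class AsyncRegistry {
    private static final String[] STOP = new String[0]; // sentinel that ends the worker loop
    private final BlockingQueue<String[]> tasks = new LinkedBlockingQueue<>();
    private final Map<String, Set<String>> registry = new ConcurrentHashMap<>();
    private final Thread worker = new Thread(this::drain, "registry-worker");

    AsyncRegistry() {
        worker.setDaemon(true);
        worker.start();
    }

    // Called by request threads: enqueue and return immediately.
    void register(String service, String address) {
        tasks.add(new String[] {service, address});
    }

    // Runs on the single worker thread: apply queued registrations in order.
    private void drain() {
        try {
            for (String[] task = tasks.take(); task != STOP; task = tasks.take()) {
                registry.computeIfAbsent(task[0], k -> ConcurrentHashMap.newKeySet()).add(task[1]);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Service queries read the in-memory registry directly.
    Set<String> instancesOf(String service) {
        return registry.getOrDefault(service, Collections.emptySet());
    }

    // Enqueue the sentinel and wait until everything queued before it has been applied.
    void close() {
        tasks.add(STOP);
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AsyncRegistry registry = new AsyncRegistry();
        registry.register("stock-service", "10.0.0.1:8080");
        registry.register("stock-service", "10.0.0.2:8080");
        registry.close();
        System.out.println(registry.instancesOf("stock-service").size());
    }
}
```

Queuing decouples the registration request from the registry update, so a burst of registrations does not block the request threads.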

 

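The server also runs a scheduled heartbeat health check task to detect unhealthy instances. If an Instance has sent no heartbeat for more than 15 seconds, it is marked unhealthy. If it has sent no heartbeat for more than 30 seconds, it is removed.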
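
The two thresholds can be expressed as a small decision function. This is a minimal sketch under the 15-second and 30-second values stated above; HeartbeatCheck, Action, and evaluate() are hypothetical names, not Nacos classes.

```java
// Hypothetical sketch of the heartbeat decision described above.
final class HeartbeatCheck {
    static final long UNHEALTHY_AFTER_MILLIS = 15_000L; // no heartbeat for more than 15 s
    static final long DELETE_AFTER_MILLIS = 30_000L;    // no heartbeat for more than 30 s

    enum Action { KEEP, MARK_UNHEALTHY, DELETE }

    // Decide what to do with an instance given its last heartbeat time and the current time.
    static Action evaluate(long lastBeatMillis, long nowMillis) {
        long silence = nowMillis - lastBeatMillis;
        if (silence > DELETE_AFTER_MILLIS) {
            return Action.DELETE;
        }
        if (silence > UNHEALTHY_AFTER_MILLIS) {
            return Action.MARK_UNHEALTHY;
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(evaluate(now - 5_000L, now));
        System.out.println(evaluate(now - 20_000L, now));
        System.out.println(evaluate(now - 40_000L, now));
    }
}
```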

 

A service query returns the Instance list directly from the in-memory registry.

 

Source: https://www.cnblogs.com/mjunz/p/18854646
