SpringCloudAlibaba:Nacos 实现原理详解
来源:https://blog.csdn.net/cold___play/article/details/108032204
Nacos 架构
Provider APP:服务提供者
Consumer APP:服务消费者
Name Server:通过 VIP(Virtual IP)或 DNS 的方式实现 Nacos 高可用集群的服务路由
Nacos Server:Nacos 服务提供者,里面包含的 Open API 是功能访问入口,Conig Service、Naming Service 是 Nacos 提供的配置服务、命名服务模块。
Consitency Protocol 是一致性协议,用来实现 Nacos 集群节点的数据同步,这里使用的是 Raft 算法(Etcd、Redis 哨兵选举)
Nacos Console:控制台
注册中心的原理
- 服务实例在启动时注册到服务注册表,并在关闭时注销
- 服务消费者查询服务注册表,获得可用实例
- 服务注册中心需要调用服务实例的健康检查 API 来验证它是否能够处理请求
Spring Cloud 完成注册的时机
在 spring-cloud-commons 包中有一个类org.springframework.cloud.client.serviceregistry.ServiceRegistry
, 它是 Spring Cloud 提供的服务注册的标准。集成到 Spring Cloud 中实现服务注册的组件, 都会实现该接口。
该接口有一个实现类是NacoServiceRegistry
。
Spring Cloud 集成 Nacos 的实现过程:
在 spring-cloud-commons 包的 META-INF/spring.factories 中包含自动装配的配置信息如下:
其中 AutoServiceRegistrationAutoConfiguration 就是服务注册相关的配置类:
@Configuration(proxyBeanMethods = false
)
@Import({
AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(value = {
"spring.cloud.service-registry.auto-registration.enabled"},matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
@Autowired(required = false)private AutoServiceRegistration autoServiceRegistration;@Autowiredprivate AutoServiceRegistrationProperties properties;public AutoServiceRegistrationAutoConfiguration() {
}@PostConstructprotected void init() {
if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");}}
}
在 AutoServiceRegistrationAutoConfiguration 配置类中, 可以看到注入了一个 AutoServiceRegistration 实例, 该类的关系图如下所示。
可以看出, AbstractAutoServiceRegistration 抽象类实现了该接口, 并且最重要的是 NacosAutoServiceRegistration 继承了 AbstractAutoServiceRegistration。
看到 EventListener 我们就应该知道,Nacos 是通过 Spring 的事件机制继承到 SpringCloud 中去的。
AbstractAutoServiceRegistration 实现了 onApplicationEvent 抽象方法, 并且监听 WebServerInitializedEvent 事件 (当 Webserver 初始化完成之后) , 调用 this.bind ( event ) 方法。
public void onApplicationEvent(WebServerInitializedEvent event) {
this.bind(event);
}/** @deprecated */
@Deprecated
public void bind(WebServerInitializedEvent event) {
ApplicationContext context = event.getApplicationContext();if (!(context instanceof ConfigurableWebServerApplicationContext) || !"management".equals(((ConfigurableWebServerApplicationContext)context).getServerNamespace())) {
this.port.compareAndSet(0, event.getWebServer().getPort());this.start();}
}
最终会调用 NacosServiceRegistry.register() 方法进行服务注册。
public void register(Registration registration) {
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");} else {
NamingService namingService = this.namingService();String serviceId = registration.getServiceId();String group = this.nacosDiscoveryProperties.getGroup();Instance instance = this.getNacosInstanceFromRegistration(registration);try {
namingService.registerInstance(serviceId, group, instance);log.info("nacos registry, {} {} {}:{} register finished", new Object[]{
group, serviceId, instance.getIp(), instance.getPort()});} catch (Exception var7) {
log.error("nacos registry, {} register failed...{},", new Object[]{
serviceId, registration.toString(), var7});ReflectionUtils.rethrowRuntimeException(var7);}}
}
NacosServiceRegistry 的实现
在 NacosServiceRegistry.registry 方法中, 调用了 Nacos Client SDK 中的 namingService.registerInstance 完成服务的注册。
跟踪 NacosNamingService 的 registerInstance() 方法:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);if (instance.isEphemeral()) {
BeatInfo beatInfo = this.beatReactor.buildBeatInfo(groupedServiceName, instance);this.beatReactor.addBeatInfo(groupedServiceName, beatInfo);}this.serverProxy.registerService(groupedServiceName, groupName, instance);
}
public BeatInfo buildBeatInfo(String groupedServiceName, Instance instance) {
BeatInfo beatInfo = new BeatInfo();beatInfo.setServiceName(groupedServiceName);beatInfo.setIp(instance.getIp());beatInfo.setPort(instance.getPort());beatInfo.setCluster(instance.getClusterName());beatInfo.setWeight(instance.getWeight());beatInfo.setMetadata(instance.getMetadata());beatInfo.setScheduled(false);beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());return beatInfo;
}
通过beatReactor.addBeatInfo()
创建心跳信息实现健康检测, Nacos Server 必须要确保注册的服务实例是健康的, 而心跳检测就是服务健康检测的手段。
serverProxy.registerService()
实现服务注册
心跳机制:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);String key = this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());BeatInfo existBeat = null;if ((existBeat = (BeatInfo)this.dom2Beat.remove(key)) != null) {
existBeat.setStopped(true);}this.dom2Beat.put(key, beatInfo);this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
从上述代码看, 所谓心跳机制就是客户端通过 schedule 定时向服务端发送一个数据包 , 然后启动一个线程不断检测服务端的回应, 如果在设定时间内没有收到服务端的回应, 则认为服务器出现了故障。Nacos 服务端会根据客户端的心跳包不断更新服务的状态。
注册原理:
Nacos 提供了 SDK 和 Open API 两种形式来实现服务注册。
Open API:
SDK:
public void registerInstance(String serviceName, String ip, int port) throws NacosException
这两种形式本质都一样,底层都是基于 HTTP 协议完成请求的。所以注册服务就是发送一个 HTTP 请求:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{
this.namespaceId, serviceName, instance});Map<String, String> params = new HashMap(16);params.put("namespaceId", this.namespaceId);params.put("serviceName", serviceName);params.put("groupName", groupName);params.put("clusterName", instance.getClusterName());params.put("ip", instance.getIp());params.put("port", String.valueOf(instance.getPort()));params.put("weight", String.valueOf(instance.getWeight()));params.put("enable", String.valueOf(instance.isEnabled()));params.put("healthy", String.valueOf(instance.isHealthy()));params.put("ephemeral", String.valueOf(instance.isEphemeral()));params.put("metadata", JacksonUtils.toJson(instance.getMetadata()));this.reqApi(UtilAndComs.nacosUrlInstance, params, "POST");
}
对于 nacos 服务端,对外提供的服务接口请求地址为nacos/v1/ns/instance
,实现代码咋 nacos-naming 模块下的 InstanceController 类中:
- 从请求参数汇总获得 serviceName(服务名)和 namespaceId(命名空间 Id)
- 调用 registerInstance 注册实例
- 创建一个控服务(在 Nacos 控制台 “服务列表” 中展示的服务信息),实际上是初始化一个 serviceMap,它是一个 ConcurrentHashMap 集合
- getService,从 serviceMap 中根据 namespaceId 和 serviceName 得到一个服务对象
- 调用 addInstance 添加服务实例
- 根据 namespaceId、serviceName 从缓存中获取 Service 实例
- 如果 Service 实例为空,则创建并保存到缓存中
- 通过 putService() 方法将服务缓存到内存
- service.init() 建立心跳机制
- consistencyService.listen 实现数据一致性监听
service.init ( ) 方法的如下图所示,它主要通过定时任务不断检测当前服务下所有实例最后发送心跳包的时间。如果超时, 则设置 healthy 为 false 表示服务不健康, 并且发送服务变更事件。在这里请大家思考一一个问题, 服务实例的最后心跳包更新时间是谁来触发的? 实际上前面有讲到, Nacos 客户端注册服务的同时也建立了心跳机制。
putService 方法,它的功能是将 Service 保存到 serviceMap 中:
继续调用 addInstance 方法把当前注册的服务实例保存到 Service 中:
总结:Nacos 客户端通过 Open API 的形式发送服务注册请求,Nacos 服务端收到请求后,做以下三件事:
- 1.构建一个 Service 对象保存到 ConcurrentHashMap 集合中
- 2.使用定时任务对当前服务下的所有实例建立心跳检测机制
- 3.基于数据一致性协议服务数据进行同步
服务提供者地址查询
Open API:
SDK:
InstanceController 中的 list 方法:
- 解析请求参数
- 通过 doSrvIPXT 返回服务列表数据
- 根据 namespaceId、serviceName 获得 Service 实例
- 从 Service 实例中基于 srvIPs 得到所有服务提供者实例
- 遍历组装 JSON 字符串并返回
Nacos 服务地址动态感知原理
可以通过 subscribe 方法来实现监听,其中 serviceName 表示服务名、EventListener 表示监听到的事件:
具体调用方式如下:
或者调用 selectInstance 方法,如果将 subscribe 属性设置为 true,会自动注册监听:
Nacos 客户端中有一个 HostReactor 类,它的功能是实现服务的动态更新,基本原理是:
客户端发起时间订阅后,在 HostReactor 中有一个 UpdateTask 线程,每 10s 发送一次 Pull 请求,获得服务端最新的地址列表
对于服务端,它和服务提供者的实例之间维持了心跳检测,一旦服务提供者出现异常,则会发送一个 Push 消息给 Nacos 客户端,也就是服务端消费者
服务消费者收到请求之后,使用 HostReactor 中提供的 processServiceJSON 解析消息,并更新本地服务地址列表。