您当前的位置:首页 > 电脑百科 > 网络技术 > 网络技术

Nacos配置中心集群原理及源码分析

时间:2022-03-30 15:45:24  来源:博客园  作者:马士兵老师

Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?

下面这个图,表示Nacos集群的部署图。

Nacos配置中心集群原理及源码分析

 

Nacos集群工作原理

Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。

Nacos的数据存储分为两部分

/data/program/nacos-1/data/config-data/${GROUP}

在Nacos的设计中,MySQL是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。

这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。

当配置发生变更时:

  1. Nacos会把变更的配置保存到数据库,然后再写入本地文件。
  2. 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。

另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件

配置变更同步入口

当配置发生修改、删除、新增操作时,通过发布一个 notifyConfigChange 事件。

@PostMApping
@Secured(action = ActionTypes.WRITE, parser = ConfigResourceParser.class)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
        @RequestParam(value = "datAId") String dataId, @RequestParam(value = "group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
        @RequestParam(value = "appName", required = false) String appName,
        @RequestParam(value = "src_user", required = false) String srcUser,
        @RequestParam(value = "config_tags", required = false) String configTags,
        @RequestParam(value = "desc", required = false) String desc,
        @RequestParam(value = "use", required = false) String use,
        @RequestParam(value = "effect", required = false) String effect,
        @RequestParam(value = "type", required = false) String type,
        @RequestParam(value = "schema", required = false) String schema) throws NacosException {
    
   //省略..
    if (StringUtils.isBlank(betaIps)) {
        if (StringUtils.isBlank(tag)) {
            persistService.insertOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
            ConfigChangePublisher
                    .notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, time.getTime()));
        } else {
            persistService.insertOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
            ConfigChangePublisher.notifyConfigChange(
                    new ConfigDataChangeEvent(false, dataId, group, tenant, tag, time.getTime()));
        }
    }//省略
    return true;
}

AsyncNotifyService

配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。

@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
    this.memberManager = memberManager;
    
    // Register ConfigDataChangeEvent to NotifyCenter.
    NotifyCenter.registerToPublisher(ConfigDataChangeEvent.class, NotifyCenter.ringBufferSize);
    
    // Register A Subscriber to subscribe ConfigDataChangeEvent.
    NotifyCenter.registerSubscriber(new Subscriber() {
        
        @Override
        public void onEvent(Event event) {
            // Generate ConfigDataChangeEvent concurrently
            if (event instanceof ConfigDataChangeEvent) {
                ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
                long dumpTs = evt.lastModifiedTs;
                String dataId = evt.dataId;
                String group = evt.group;
                String tenant = evt.tenant;
                String tag = evt.tag;
                Collection<Member> ipList = memberManager.allMembers(); //得到集群中的ip列表
                
                // 构建NotifySingleTask,并添加到队列中。
                Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
                for (Member member : ipList) { //遍历集群中的每个节点
                    queue.add(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, member.getAddress(),
                            evt.isBeta));
                }
                //异步执行任务 AsyncTask
                ConfigExecutor.executeAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
            }
        }
        
        @Override
        public Class<? extends Event> subscribeType() {
            return ConfigDataChangeEvent.class;
        }
    });
}

AsyncTask

@Override
public void run() {
    executeAsyncInvoke();
}

private void executeAsyncInvoke() {
    while (!queue.isEmpty()) {//遍历队列中的数据,直到数据为空
        NotifySingleTask task = queue.poll(); //获取task
        String targetIp = task.getTargetIP(); //获取目标ip
        
        if (memberManager.hasMember(targetIp)) { //如果集群中的ip列表包含目标ip
            // start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
            //判断目标ip的健康状态
            boolean unHealthNeedDelay = memberManager.isUnHealth(targetIp); //
            if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。
                // target ip is unhealthy, then put it in the notification list
                ConfigTraceService.logNotifyEvent(task.getDataId(), task.getGroup(), task.getTenant(), null,
                        task.getLastModified(), .NETUtils.getSelfIP(), ConfigTraceService.NOTIFY_EVENT_UNHEALTH,
                        0, task.target);
                // get delay time and set fail count to the task
                asyncTaskExecute(task);
            } else {
                //构建header
                Header header = Header.newInstance();
                header.addParam(NotifyService.NOTIFY_HEADER_LAST_MODIFIED, String.valueOf(task.getLastModified()));
                header.addParam(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP, InetUtils.getSelfIP());
                if (task.isBeta) {
                    header.addParam("isBeta", "true");
                }
                AuthHeaderUtil.addIdentityToHeader(header);
                //通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法
                restTemplate.get(task.url, header, Query.EMPTY, String.class, new AsyncNotifyCallBack(task));
            }
        }
    }
}

目标节点接收请求

数据同步的请求地址为,task.url=
http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP

@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
        @RequestParam("group") String group,
        @RequestParam(value = "tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,
        @RequestParam(value = "tag", required = false) String tag) {
    dataId = dataId.trim();
    group = group.trim();
    String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);
    long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);
    String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);
    String isBetaStr = request.getHeader("isBeta");
    if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {
        dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);
    } else {
        //
        dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);
    }
    return true;
}

dumpService.dump用来实现配置的更新,代码如下

当前任务会被添加到DumpTaskMgr中管理。

public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
        boolean isBeta) {
    String groupKey = GroupKey2.getKey(dataId, group, tenant);
    String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);
    dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
    DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}

TaskManager.addTask, 先调用父类去完成任务添加。

@Override
public void addTask(Object key, AbstractDelayTask newTask) {
    super.addTask(key, newTask);
    MetricsMonitor.getDumpTaskMonitor().set(tasks.size());
}

在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。

NacosDelayTaskExecuteEngine

TaskManager的父类是
NacosDelayTaskExecuteEngine,

这个类中有一个成员属性 protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks; ,专门来保存延期执行的任务类型AbstractDelayTask.

在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.

public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
    super(logger);
    tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
    processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));
    processingExecutor
            .scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);
}

ProcessRunnable

private class ProcessRunnable implements Runnable {
    
    @Override
    public void run() {
        try {
            processTasks();
        } catch (Throwable e) {
            getEngineLog().error(e.toString(), e);
        }
    }
}

processTasks

protected void processTasks() {
    //获取所有的任务
    Collection<Object> keys = getAllTaskKeys();
    for (Object taskKey : keys) {
        AbstractDelayTask task = removeTask(taskKey);
        if (null == task) {
            continue;
        }
        //获取任务处理器,这里返回的是DumpProcessor
        NacosTaskProcessor processor = getProcessor(taskKey);
        if (null == processor) {
            getEngineLog().error("processor not found for task, so discarded. " + task);
            continue;
        }
        try {
            // ReAdd task if process failed
            //执行具体任务
            if (!processor.process(task)) {
                retryFailedTask(taskKey, task);
            }
        } catch (Throwable e) {
            getEngineLog().error("Nacos task execute error : " + e.toString(), e);
            retryFailedTask(taskKey, task);
        }
    }
}

DumpProcessor.process

读取数据库的最新数据,然后更新本地缓存和磁盘。

原文
https://www.cnblogs.com/mic112/p/16071963.html



Tags:Nacos   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
低代码开发:Nacos配置详解,如何确保平台跳转正常运作
在当今低代码开发的时代,平台的易用性和灵活性非常重要。右上角平台跳转作为用户界面中常见的交互元素,对于提高用户体验具有举足轻重的地位。然而,有时候我们会遇到跳转失效的...【详细内容】
2024-01-25  Search: Nacos  点击:(55)  评论:(0)  加入收藏
如何基于Kubernetes运行Nacos高可用集群
Nacos(Namings and Configuration Management)是阿里巴巴开源的一个易于构建云原生应用的动态服务发现、配置管理和服务管理平台。以下是Nacos的一些主要功能和特点: 服务发现...【详细内容】
2023-12-18  Search: Nacos  点击:(74)  评论:(0)  加入收藏
Nacos配置中心的Pull原理,附源码
在单体服务时代,关于配置信息,管理一套配置文件即可。而拆分成微服务之后,每一个系统都会有自己的配置,并且都各不相同,有些配置还需要动态改变,以达到动态降级、切流量、扩缩容等...【详细内容】
2023-11-17  Search: Nacos  点击:(265)  评论:(0)  加入收藏
Nacos有几种负载均衡策略?
Nacos 作为目前主流的微服务中间件,包含了两个顶级的微服务功能:配置中心和注册中心。1、配置中心扫盲配置中心是一种集中化管理配置的服务,通俗易懂的说就是将本地配置文件“...【详细内容】
2023-11-01  Search: Nacos  点击:(220)  评论:(0)  加入收藏
分布式配置中心Nacos和Apollo如何选择?
为什么需要配置中心?随着分布式业务的发展,分布式节点会越来越多,就会出现各种各样的问题。 业务功能随着需求逐渐变复杂了,就导致应用程序所需要的配置内容越来越多,例如业务开...【详细内容】
2023-09-11  Search: Nacos  点击:(108)  评论:(0)  加入收藏
Dubbo + Nacos这么玩就失去高可用的能力了
我们常用的微服务框架是SpringCloud那一套,在服务远程调用和注册中心的选型上也有不少方案。在服务远程调用上常用的有:Feign、Dubbo等,在注册中心上常用的有:Nacos、Zookeeper...【详细内容】
2023-09-08  Search: Nacos  点击:(267)  评论:(0)  加入收藏
教你用 Python 驾驭 Nacos 配置中心
大家好,我是安果!Nacos 是阿里巴巴开源的项目,用于构建云原生应用的动态服务发现、配置管理和服务管理平台核心特征包含:服务发现、服务健康监测、动态配置服务、动态 DNS 服务...【详细内容】
2023-08-03  Search: Nacos  点击:(319)  评论:(0)  加入收藏
Nacos 核心原理解读+高性能微服务系统实战无密径须沽取对君酌
Nacos 核心原理解读+高性能微服务系统download:https://www.sisuoit.com/itkecheng随着云计算和微服务架构的普及,注册中心作为微服务架构的重要组成部分,也变得越来越重要。Na...【详细内容】
2023-04-27  Search: Nacos  点击:(484)  评论:(0)  加入收藏
Spring Boot+Nacos+gRPC,一个区别于 OpenFeign 的微服务通信方案!
gRPC 的基础知识前面跟小伙伴们分享了很多了,今天再写一篇给这个小小的系列收个尾。我们前面介绍的都是 gRPC 的基本用法,最终目的当然是希望能够在 Spring Boot 中用上这个...【详细内容】
2023-04-04  Search: Nacos  点击:(233)  评论:(0)  加入收藏
一篇打通微服务架构,Nacos + Gateway + Redis + MySQL + Docker
基本组件Nginx、Gateway、Nacos、Sentinel、Ribbon、Feign、Seata、Redis、RabbitMQ、MySQL、docker、Vue。大家好,我是哪吒。今天分享一篇一站式微服务架构,读哪吒编程,品技术...【详细内容】
2023-02-27  Search: Nacos  点击:(188)  评论:(0)  加入收藏
▌简易百科推荐
手机就可以修改WiFi密码,进行网络提速,还能防止别人蹭网
随着网络的普及和使用频率的增加,很多人可能遇到了一些网络管理上的问题,比如忘记了WiFi密码、网络速度缓慢、或者发现有不明设备在家中蹭网。相信朋友们也曾遇到过吧?但是,你知...【详细内容】
2024-04-03  老毛桃    Tags:WiFi密码   点击:(9)  评论:(0)  加入收藏
手机WiFi信号满格却接收消息延迟?这里有妙招帮你解决!
在现代社会,手机已经成为了我们生活中不可或缺的一部分。无论是工作、学习还是娱乐,手机都扮演着重要的角色。然而,有时我们会遇到一些令人烦恼的问题,比如明明手机WiFi信号满格...【详细内容】
2024-04-03  蔡前进    Tags:手机WiFi   点击:(8)  评论:(0)  加入收藏
SASE技术应用落地的五个关键趋势
在Gartner 最新发布的《2023网络技术成熟度曲线》报告中认为,SASE技术已经开始走出最初的技术炒作期,将逐步迈向新一轮的实用落地阶段。在Gartner发布的《Hype Cycle for Ente...【详细内容】
2024-04-01    安全牛  Tags:SASE   点击:(12)  评论:(0)  加入收藏
提示“该网站安全证书存在问题,连接可能不安全”如何解决
在你输入网址并浏览网页时,如果你的浏览器弹出一个警告,提示“网站的安全证书存在问题”,或是显示一个红色的锁标志,这些都是网站不安全的警示。这些提示通常是由HTTPS协议中的S...【详细内容】
2024-03-18  倏然间    Tags:网站安全证书   点击:(10)  评论:(0)  加入收藏
如何有效排除CAN总线错误
控制器局域网(CAN)控制器局域网(CAN)是现代车辆中电子元件无缝运行的基础。在远程信息处理领域,CAN总线系统的效率至关重要,其能够实现支撑当今汽车技术的复杂功能。然而,CAN总...【详细内容】
2024-02-20    千家网  Tags:CAN   点击:(52)  评论:(0)  加入收藏
网络连接受限或无连接怎么办?这里提供几个修复办法
可能错误提示 连接受限或无连接:连接具有有限的连接或无连接。你可能无法访问Internet或某些网络资源。 连接受限。排除和解决“连接受限或无连接”错误此错误可能由计算机上...【详细内容】
2024-02-06  驾驭信息纵横科技    Tags:网络连接受限   点击:(50)  评论:(0)  加入收藏
如何将Mac连接到以太网?这里有详细步骤
在Wi-Fi成为最流行、最简单的互联网连接方式之前,每台Mac和电脑都使用以太网电缆连接。这是Mac可用端口的标准功能。如何将Mac连接到以太网如果你的Mac有以太网端口,则需要以...【详细内容】
2024-02-03  驾驭信息纵横科技    Tags:Mac   点击:(67)  评论:(0)  加入收藏
简易百科之什么是端口映射
端口映射,也称为端口转发,是一种网络通信中的技术手段,通过将内网中的一个端口上的数据流量转发到另一个端口,使得外部网络能够访问到内部网络中的特定服务。在实现上,端口映射通...【详细内容】
2024-01-26    简易百科  Tags:端口映射   点击:(163)  评论:(0)  加入收藏
ip因频繁登陆已被禁止访问 无法显示图片 怎么办
首先,我们要明白,部分网站为了有效遏制数据爬取和非法攻击,保证访问速度和普通用户查询,会在系统中增加网络安全设备,加强安全防护机制,并提前设置安全访问规则。因此,一旦用户的行...【详细内容】
2024-01-20  何福意思    Tags:ip   点击:(68)  评论:(0)  加入收藏
电脑连上wifi却上不了网怎么办
当电脑连接上 WiFi 却无法上网时,可能会让人感到困惑和沮丧。这个问题通常会有多种可能的原因,包括网络配置问题、路由器故障、无线适配器问题等。在面对这个问题时,可以尝试以...【详细内容】
2024-01-16  编程资料站    Tags:wifi   点击:(72)  评论:(0)  加入收藏
站内最新
站内热门
站内头条