Nacos配置中心集群原理及源码分析-
Nacos作为配置中心,必然需要保证服务节点的高可用性,那么Nacos是如何实现集群的呢?
下面这个图,表示Nacos集群的部署图。
Nacos集群工作原理
Nacos作为配置中心的集群结构中,是一种无中心化节点的设计,由于没有主从节点,也没有选举机制,所以为了能够实现热备,就需要增加虚拟IP(VIP)。
Nacos的数据存储分为两部分
- Mysql数据库存储,所有Nacos节点共享同一份数据,数据的副本机制由Mysql本身的主从方案来解决,从而保证数据的可靠性。
- 每个节点的本地磁盘,会保存一份全量数据,具体路径:
/data/program/nacos-1/data/config-data/${GROUP}
.
在Nacos的设计中,Mysql是一个中心数据仓库,且认为在Mysql中的数据是绝对正确的。 除此之外,Nacos在启动时会把Mysql中的数据写一份到本地磁盘。
这么设计的好处是可以提高性能,当客户端需要请求某个配置项时,服务端会想Ian从磁盘中读取对应文件返回,而磁盘的读取效率要比数据库效率高。
当配置发生变更时:
- Nacos会把变更的配置保存到数据库,然后再写入本地文件。
- 接着发送一个HTTP请求,给到集群中的其他节点,其他节点收到事件后,从Mysql中dump刚刚写入的数据到本地文件中。
另外,NacosServer启动后,会同步启动一个定时任务,每隔6小时,会dump一次全量数据到本地文件
配置变更同步入口
当配置发生修改、删除、新增操作时,通过发布一个notifyConfigChange
事件。
@PostMapping
@Secured(action = 域名E, parser = 域名s)
public Boolean publishConfig(HttpServletRequest request, HttpServletResponse response,
@RequestParam(value = "dataId") String dataId, @RequestParam(value = "group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = 域名Y) String tenant,
@RequestParam(value = "content") String content, @RequestParam(value = "tag", required = false) String tag,
@RequestParam(value = "appName", required = false) String appName,
@RequestParam(value = "src_user", required = false) String srcUser,
@RequestParam(value = "config_tags", required = false) String configTags,
@RequestParam(value = "desc", required = false) String desc,
@RequestParam(value = "use", required = false) String use,
@RequestParam(value = "effect", required = false) String effect,
@RequestParam(value = "type", required = false) String type,
@RequestParam(value = "schema", required = false) String schema) throws NacosException {
//省略..
if (域名ank(betaIps)) {
if (域名ank(tag)) {
域名rtOrUpdate(srcIp, srcUser, configInfo, time, configAdvanceInfo, true);
ConfigChangePublisher
.notifyConfigChange(new ConfigDataChangeEvent(false, dataId, group, tenant, 域名ime()));
} else {
域名rtOrUpdateTag(configInfo, tag, srcIp, srcUser, time, true);
域名fyConfigChange(
new ConfigDataChangeEvent(false, dataId, group, tenant, tag, 域名ime()));
}
}//省略
return true;
}
AsyncNotifyService
配置数据变更事件,专门有一个监听器AsyncNotifyService,它会处理数据变更后的同步事件。
@Autowired
public AsyncNotifyService(ServerMemberManager memberManager) {
域名erManager = memberManager;
// Register ConfigDataChangeEvent to NotifyCenter.
域名sterToPublisher(域名s, 域名BufferSize);
// Register A Subscriber to subscribe ConfigDataChangeEvent.
域名sterSubscriber(new Subscriber() {
@Override
public void onEvent(Event event) {
// Generate ConfigDataChangeEvent concurrently
if (event instanceof ConfigDataChangeEvent) {
ConfigDataChangeEvent evt = (ConfigDataChangeEvent) event;
long dumpTs = 域名ModifiedTs;
String dataId = 域名Id;
String group = 域名p;
String tenant = 域名nt;
String tag = 域名;
Collection<Member> ipList = 域名embers(); //得到集群中的ip列表
// 构建NotifySingleTask,并添加到队列中。
Queue<NotifySingleTask> queue = new LinkedList<NotifySingleTask>();
for (Member member : ipList) { //遍历集群中的每个节点
域名(new NotifySingleTask(dataId, group, tenant, tag, dumpTs, 域名ddress(),
域名ta));
}
//异步执行任务 AsyncTask
域名uteAsyncNotify(new AsyncTask(nacosAsyncRestTemplate, queue));
}
}
@Override
public Class<? extends Event> subscribeType() {
return 域名s;
}
});
}
AsyncTask
@Override
public void run() {
executeAsyncInvoke();
}
private void executeAsyncInvoke() {
while (!域名pty()) {//遍历队列中的数据,直到数据为空
NotifySingleTask task = 域名(); //获取task
String targetIp = 域名argetIP(); //获取目标ip
if (域名ember(targetIp)) { //如果集群中的ip列表包含目标ip
// start the health check and there are ips that are not monitored, put them directly in the notification queue, otherwise notify
//判断目标ip的健康状态
boolean unHealthNeedDelay = 域名Health(targetIp); //
if (unHealthNeedDelay) { //如果目标服务是非健康,则继续添加到队列中,延后再执行。
// target ip is unhealthy, then put it in the notification list
域名otifyEvent(域名ataId(), 域名roup(), 域名enant(), null,
域名astModified(), 域名elfIP(), 域名FY_EVENT_UNHEALTH,
0, 域名et);
// get delay time and set fail count to the task
asyncTaskExecute(task);
} else {
//构建header
Header header = 域名nstance();
域名aram(域名FY_HEADER_LAST_MODIFIED, 域名eOf(域名astModified()));
域名aram(域名FY_HEADER_OP_HANDLE_IP, 域名elfIP());
if (域名ta) {
域名aram("isBeta", "true");
}
域名dentityToHeader(header);
//通过restTemplate发起远程调用,如果调用成功,则执行AsyncNotifyCallBack的回调方法
域名(域名, header, 域名Y, 域名s, new AsyncNotifyCallBack(task));
}
}
}
}
目标节点接收请求
数据同步的请求地址为,域名=http://192.域名:8848/nacos/v1/cs/communication/dataChange?dataId=域名&group=DEFAULT_GROUP
@GetMapping("/dataChange")
public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,
@RequestParam("group") String group,
@RequestParam(value = "tenant", required = false, defaultValue = 域名Y) String tenant,
@RequestParam(value = "tag", required = false) String tag) {
dataId = 域名();
group = 域名();
String lastModified = 域名eader(域名FY_HEADER_LAST_MODIFIED);
long lastModifiedTs = 域名pty(lastModified) ? -1 : 域名eLong(lastModified);
String handleIp = 域名eader(域名FY_HEADER_OP_HANDLE_IP);
String isBetaStr = 域名eader("isBeta");
if (域名tBlank(isBetaStr) && 域名ls(isBetaStr)) {
域名(dataId, group, tenant, lastModifiedTs, handleIp, true);
} else {
//
域名(dataId, group, tenant, tag, lastModifiedTs, handleIp);
}
return true;
}
域名用来实现配置的更新,代码如下
当前任务会被添加到DumpTaskMgr中管理。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,
boolean isBeta) {
String groupKey = 域名ey(dataId, group, tenant);
String taskKey = 域名("+", dataId, group, tenant, 域名eOf(isBeta), tag);
域名ask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));
域名("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);
}
域名ask, 先调用父类去完成任务添加。
@Override
public void addTask(Object key, AbstractDelayTask newTask) {
域名ask(key, newTask);
域名umpTaskMonitor().set(域名());
}
在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行。
NacosDelayTaskExecuteEngine
TaskManager的父类是NacosDelayTaskExecuteEngine,
这个类中有一个成员属性protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks;
,专门来保存延期执行的任务类型AbstractDelayTask.
在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {
super(logger);
tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);
processingExecutor = 域名ingleScheduledExecutorService(new NameThreadFactory(name));
processingExecutor
.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, 域名ISECONDS);
}
ProcessRunnable
private class ProcessRunnable implements Runnable {
@Override
public void run() {
try {
processTasks();
} catch (Throwable e) {
getEngineLog().error(域名ring(), e);
}
}
}
processTasks
protected void processTasks() {
//获取所有的任务
Collection<Object> keys = getAllTaskKeys();
for (Object taskKey : keys) {
AbstractDelayTask task = removeTask(taskKey);
if (null == task) {
continue;
}
//获取任务处理器,这里返回的是DumpProcessor
NacosTaskProcessor processor = getProcessor(taskKey);
if (null == processor) {
getEngineLog().error("processor not found for task, so discarded. " + task);
continue;
}
try {
// ReAdd task if process failed
//执行具体任务
if (!域名ess(task)) {
retryFailedTask(taskKey, task);
}
} catch (Throwable e) {
getEngineLog().error("Nacos task execute error : " + 域名ring(), e);
retryFailedTask(taskKey, task);
}
}
}
域名ess
读取数据库的最新数据,然后更新本地缓存和磁盘。
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!