這篇文章主要介紹了從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理是什么,具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。
創(chuàng)新互聯(lián)建站主要從事做網(wǎng)站、成都做網(wǎng)站、網(wǎng)頁(yè)設(shè)計(jì)、企業(yè)做網(wǎng)站、公司建網(wǎng)站等業(yè)務(wù)。立足成都服務(wù)美蘭,十多年網(wǎng)站建設(shè)經(jīng)驗(yàn),價(jià)格優(yōu)惠、服務(wù)專業(yè),歡迎來(lái)電咨詢建站服務(wù):13518219792
今天我們一起從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理;整理這篇文章時(shí)候,也參照學(xué)習(xí)了部分大佬的博客,這里致謝;
在開(kāi)始閱讀文章之前,有些思路我按我的理解先闡述一些,方便大家更快理清思路,不對(duì)的地方還請(qǐng)大家批評(píng)指正;
Nacos客戶端會(huì)在在本地緩存服務(wù)端配置文件,防止服務(wù)器奔潰情況下,導(dǎo)致服務(wù)不可用;
本地緩存類在代碼中的體現(xiàn)就是我們下面提到的CacheData,我們知道對(duì)應(yīng)服務(wù)端一個(gè)配置,肯定可以同時(shí)被多個(gè)客戶端所使用,當(dāng)這個(gè)配置發(fā)生變更,如何去通知到每一個(gè)客戶端?
客戶端啟動(dòng)之后,回去注冊(cè)監(jiān)視器,監(jiān)視器最終會(huì)被保存到CacheData類中CopyOnWriteArrayList
長(zhǎng)輪詢左右主要就是刷新配置,保持服務(wù)端配置和本地緩存配置保持一致;
首先,我們來(lái)看看Nacos官網(wǎng)給出的Nacos地圖,我們可以清楚的看到,動(dòng)態(tài)配置服務(wù)是 Nacos 的三大功能之一;
這里借用官網(wǎng)的描述,一起來(lái)看看Nacos 為我們帶來(lái)什么黑科技?
動(dòng)態(tài)配置服務(wù)可以讓您以中心化、外部化和動(dòng)態(tài)化的方式管理所有環(huán)境的應(yīng)用配置和服務(wù)配置。動(dòng)態(tài)配置消除了配置變更時(shí)重新部署應(yīng)用和服務(wù)的需要,讓配置管理變得更加高效和敏捷。配置中心化管理讓實(shí)現(xiàn)無(wú)狀態(tài)服務(wù)變得更簡(jiǎn)單,讓服務(wù)按需彈性擴(kuò)展變得更容易。
所以,有了Nacos ,可能我們以前上線打包弄錯(cuò)配置文件,改配置需要重啟服務(wù)等一系列問(wèn)題,都會(huì)顯著改觀
下面我將來(lái)和大家一起來(lái)了解下 Nacos 的動(dòng)態(tài)配置的能力,看看 Nacos 是如何以簡(jiǎn)單、優(yōu)雅、高效的方式管理配置,實(shí)現(xiàn)配置的動(dòng)態(tài)變更的。
我們用一個(gè)簡(jiǎn)單的例子來(lái)了解下 Nacos 的動(dòng)態(tài)配置的功能。
首先,我們需要搭建一個(gè)Nacos 服務(wù)端,由于官網(wǎng)的quick-start已經(jīng)對(duì)此做了詳細(xì)的解讀,我們這里就不在贅述
https://nacos.io/zh-cn/docs/quick-start.html
安裝完成之后啟動(dòng),我們就可以訪問(wèn) Nacos 的控制臺(tái)了,如下圖所示:
Nacos控制臺(tái)做了簡(jiǎn)單的權(quán)限控制,默認(rèn)的賬號(hào)和密碼都是 nacos。
登錄進(jìn)去之后,是這樣的:
接下來(lái)我們?cè)诳刂婆_(tái)上創(chuàng)建一個(gè)簡(jiǎn)單的配置項(xiàng),如下圖所示:
Nacos支持導(dǎo)入配置,可以直接將配置文件壓縮包導(dǎo)入,這里我們以人人開(kāi)源的微服務(wù)項(xiàng)目為例
下面我以自己搭建的子服務(wù)為例,一起來(lái)看看Nacos配置中心的使用
首先我們需要配置一下,大家只需關(guān)注config節(jié)點(diǎn)配置就可以,discovery節(jié)點(diǎn)可以忽略
cloud: nacos: discovery: metadata: management: context-path: ${server.servlet.context-path}/actuator server-addr: ${nacos-host:nacos-host}:${nacos-port:8848} #nacos的命名空間ID,默認(rèn)是public namespace: ${nacos-namespace:} service: ets-web config: server-addr: ${spring.cloud.nacos.discovery.server-addr} namespace: ${spring.cloud.nacos.discovery.namespace} group: RENREN_CLOUD_GROUP file-extension: yaml #指定共享配置,且支持動(dòng)態(tài)刷新 extension-configs: - data-id: datasource.yaml group: ${spring.cloud.nacos.config.group} refresh: true - data-id: common.yaml group: ${spring.cloud.nacos.config.group} refresh: true
其實(shí)extension-configs節(jié)點(diǎn)的配置信息對(duì)應(yīng)的是下面的類
接下來(lái)我們啟動(dòng)服務(wù),來(lái)看看控制臺(tái)日志
接下來(lái)我們?cè)?Nacos 的控制臺(tái)上將我們的配置信息改為如下圖所示:
修改完配置,點(diǎn)擊 “發(fā)布” 按鈕后,客戶端將會(huì)收到最新的數(shù)據(jù),如下圖所示:
至此一個(gè)簡(jiǎn)單的動(dòng)態(tài)配置管理功能已經(jīng)講完了,刪除配置和更新配置操作類似,這里不再贅述。
通過(guò)上面的小案例,我們大概了解了Nacos動(dòng)態(tài)配置的服務(wù)的使用方法,Nacos服務(wù)端將配置信息保存到其配置文件所配置的數(shù)據(jù)庫(kù)中,客戶端連接到服務(wù)端之后,根據(jù) dataID,Group可以獲取到具體的配置信息,當(dāng)服務(wù)端的配置發(fā)生變更時(shí),客戶端會(huì)收到通知。當(dāng)客戶端拿到變更后的最新配置信息后,就可以做自己的處理了,這非常有用,所有需要使用配置的場(chǎng)景都可以通過(guò) Nacos 來(lái)進(jìn)行管理。
現(xiàn)在我們了解了 Nacos 的動(dòng)態(tài)配置服務(wù)的功能了,但是有一個(gè)問(wèn)題我們需要弄明白,那就是 Nacos 客戶端是怎么實(shí)時(shí)獲取到 Nacos 服務(wù)端的最新數(shù)據(jù)的。
其實(shí)客戶端和服務(wù)端之間的數(shù)據(jù)交互,無(wú)外乎兩種情況:
服務(wù)端推數(shù)據(jù)給客戶端
客戶端從服務(wù)端拉數(shù)據(jù)
那到底是推還是拉呢,從 Nacos 客戶端通過(guò) Listener 來(lái)接收最新數(shù)據(jù)的這個(gè)做法來(lái)看,感覺(jué)像是服務(wù)端推的數(shù)據(jù),但是不能想當(dāng)然,要想知道答案,最快最準(zhǔn)確的方法就是從源碼中去尋找。
try { // 傳遞配置 String serverAddr = "{serverAddr}"; String dataId = "{dataId}"; String group = "{group}"; Properties properties = new Properties(); properties.put("serverAddr", serverAddr); // 新建 configService ConfigService configService = NacosFactory.createConfigService(properties); String content = configService.getConfig(dataId, group, 5000); System.out.println(content); // 注冊(cè)監(jiān)聽(tīng)器 configService.addListener(dataId, group, new Listener() { @Override public void receiveConfigInfo(String configInfo) { System.out.println("recieve1:" + configInfo); } @Override public Executor getExecutor() { return null; } }); } catch (NacosException e) { // TODO -generated catch block e.printStackTrace(); }
當(dāng)我們引包結(jié)束以后,會(huì)發(fā)現(xiàn)下面三個(gè)關(guān)于Nacos的包
從我的理解來(lái)說(shuō),api包會(huì)調(diào)用client包的能力來(lái)和Nacos服務(wù)端進(jìn)行交互.那再交互時(shí)候,主要就會(huì)用到我們接下來(lái)分析的實(shí)現(xiàn)了ConfigService接口的NacosConfigService 類
現(xiàn)在我們來(lái)看下 NacosConfigService 的構(gòu)造方法,看看 ConfigService 是怎么實(shí)例化的,如下圖所示:
public class NacosConfigService implements ConfigService {
private static final Logger LOGGER = LogUtils.logger(NacosConfigService.class);
private static final long POST_TIMEOUT = 3000L;
/**
* http agent.
*/
private final HttpAgent agent;
/**
* long polling. 這里是長(zhǎng)輪詢
*/
private final ClientWorker worker;
private String namespace;
private final String encode;
//省略其他代碼
//構(gòu)造方法 ic NacosConfigService(Properties properties) throws NacosException { ValidatorUtils.checkInitParam(properties); String encodeTmp = properties.getProperty(PropertyKeyConst.ENCODE); if (StringUtils.isBlank(encodeTmp)) { this.encode = Constants.ENCODE; } else { this.encode = encodeTmp.trim(); } initNamespace(properties); //對(duì)象1 this.agent = new MetricsHttpAgent(new ServerHttpAgent(properties)); this.agent.start(); //對(duì)象2 this.worker = new ClientWorker(this.agent, this.configFilterChainManager, properties); }
實(shí)例化時(shí)主要是初始化了兩個(gè)對(duì)象,他們分別是:
HttpAgent
ClientWorker
其中 agent 是通過(guò)裝飾器模式實(shí)現(xiàn)的,ServerHttpAgent 是實(shí)際工作的類,MetricsHttpAgent 在內(nèi)部也是調(diào)用了 ServerHttpAgent 的方法,另外加上了一些統(tǒng)計(jì)操作,所以我們只需要關(guān)心 ServerHttpAgent 的功能就可以了。
不熟悉的同學(xué),可以看菜鳥(niǎo)教程對(duì)裝飾器模式的解讀
agent 實(shí)際是在 ClientWorker 中發(fā)揮能力的,而 ClientWorker 也是真正的打工人,下面我們來(lái)看下 ClientWorker 類。
以下是 ClientWorker 的構(gòu)造方法,如下圖所示:
public ClientWorker(final HttpAgent agent, final ConfigFilterChainManager configFilterChainManager, final Properties properties) { this.agent = agent; this.configFilterChainManager = configFilterChainManager; // Initialize the timeout parameter init(properties); //創(chuàng)建了一個(gè)定時(shí)任務(wù)的線程池 this.executor = Executors.newScheduledThreadPool(1, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker." + agent.getName()); t.setDaemon(true); return t; } }); //創(chuàng)建了一個(gè)保持長(zhǎng)輪詢的線程池 this.executorService = Executors .newScheduledThreadPool(Runtime.getRuntime().availableProcessors(), new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); t.setName("com.alibaba.nacos.client.Worker.longPolling." + agent.getName()); t.setDaemon(true); return t; } }); //創(chuàng)建了一個(gè)延遲任務(wù)線程池來(lái)每隔10ms來(lái)檢查配置信息的線程池 this.executor.scheduleWithFixedDelay(new Runnable() { @Override public void run() { try { checkConfigInfo(); } catch (Throwable e) { LOGGER.error("[" + agent.getName() + "] [sub-check] rotate check error", e); } } }, 1L, 10L, TimeUnit.MILLISECONDS); }
可以看到 ClientWorker 除了將 HttpAgent 維持在自己內(nèi)部,還創(chuàng)建了兩個(gè)線程池:
final ScheduledExecutorService executor; final ScheduledExecutorService executorService;
第一個(gè)線程池負(fù)責(zé)與配置中心進(jìn)行數(shù)據(jù)的交互,并且啟動(dòng)后延遲1ms,之后每隔10ms對(duì)配置信息進(jìn)行定時(shí)檢查
第二個(gè)線程池則是負(fù)責(zé)保持一個(gè)長(zhǎng)輪詢鏈接
接下來(lái)讓我們來(lái)看下 executor 每 10ms 執(zhí)行的方法到底做了什么工作,如下圖所示:
/**
* groupKey -> cacheData.
*/
private final AtomicReference<Map<String, CacheData>> cacheMap = new AtomicReference<Map<String, CacheData>>(
new HashMap<String, CacheData>());
/** * Check config info. 檢查配置信息 */ public void checkConfigInfo() { // 分任務(wù)(解決大數(shù)據(jù)量的傳輸問(wèn)題) int listenerSize = cacheMap.get().size(); // 向上取整為批數(shù),分批次進(jìn)行檢查 //ParamUtil.getPerTaskConfigSize() =3000 int longingTaskCount = (int) Math.ceil(listenerSize / ParamUtil.getPerTaskConfigSize()); if (longingTaskCount > currentLongingTaskCount) { for (int i = (int) currentLongingTaskCount; i < longingTaskCount; i++) { // 要判斷任務(wù)是否在執(zhí)行 這塊需要好好想想。 任務(wù)列表現(xiàn)在是無(wú)序的。變化過(guò)程可能有問(wèn)題 executorService.execute(new LongPollingRunnable(i)); } currentLongingTaskCount = longingTaskCount; } }
這里主要是先去拿緩存中 Map
現(xiàn)在我們來(lái)看看 LongPollingRunnable 做了什么,主要分為兩部分,
第一部分是檢查本地的配置信息,
第二部分是獲取服務(wù)端的配置信息然后更新到本地。
首先取出與該 taskId 相關(guān)的 CacheData,然后對(duì) CacheData 進(jìn)行檢查,包括本地配置檢查和緩存數(shù)據(jù)的 md5 檢查,本地檢查主要是做一個(gè)故障容錯(cuò),當(dāng)服務(wù)端掛掉后,Nacos 客戶端可以從本地的文件系統(tǒng)中獲取相關(guān)的配置信息,如下圖所示:
public void run() {
List<CacheData> cacheDatas = new ArrayList<CacheData>();
List<String> inInitializingCacheList = new ArrayList<String>();
try {
//
for (CacheData cacheData : cacheMap.get().values()) {
if (cacheData.getTaskId() == taskId) {
cacheDatas.add(cacheData);
try {
//執(zhí)行檢查本地配置
checkLocalConfig(cacheData);
if (cacheData.isUseLocalConfigInfo()) {
//緩存數(shù)據(jù)的md5的檢查
cacheData.checkListenerMd5();
}
} catch (Exception e) {
LOGGER.error("get local config info error", e);
}
}
}
}
//檢查本地配置 private void checkLocalConfig(CacheData cacheData) { final String dataId = cacheData.dataId; final String group = cacheData.group; final String tenant = cacheData.tenant; //本地緩存文件 File path = LocalConfigInfoProcessor.getFailoverFile(agent.getName(), dataId, group, tenant); //不使用本地配置,但是持久化文件存在,需要讀取文件加載至內(nèi)存 if (!cacheData.isUseLocalConfigInfo() && path.exists()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file created. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); return; } // 有 -> 沒(méi)有。不通知業(yè)務(wù)監(jiān)聽(tīng)器,從server拿到配置后通知。 //使用本地配置,但是持久化文件不存在 if (cacheData.isUseLocalConfigInfo() && !path.exists()) { cacheData.setUseLocalConfigInfo(false); LOGGER.warn("[{}] [failover-change] failover file deleted. dataId={}, group={}, tenant={}", agent.getName(), dataId, group, tenant); return; } // 有變更 //使用本地配置,持久化文件存在,緩存跟文件最后修改時(shí)間不一致 if (cacheData.isUseLocalConfigInfo() && path.exists() && cacheData.getLocalConfigInfoVersion() != path .lastModified()) { String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); final String md5 = MD5Utils.md5Hex(content, Constants.ENCODE); cacheData.setUseLocalConfigInfo(true); cacheData.setLocalConfigInfoVersion(path.lastModified()); cacheData.setContent(content); LOGGER.warn( "[{}] [failover-change] failover file changed. dataId={}, group={}, tenant={}, md5={}, content={}", agent.getName(), dataId, group, tenant, md5, ContentUtils.truncateContent(content)); } }
本地檢查主要是通過(guò)是否使用本地配置,繼而尋找持久化緩存文件,再通過(guò)判斷文件的最后修改事件與本地緩存的版本是否一致來(lái)判斷是否由變更
通過(guò)跟蹤 checkLocalConfig 方法,可以看到 Nacos 將緩存配置信息保存在了
~/nacos/config/fixed-{address}_8848_nacos/snapshot/DEFAULT_GROUP/{dataId}
這個(gè)文件中,我們看下這個(gè)文件中保存的內(nèi)容,如下圖所示:
然后通過(guò) checkUpdateDataIds() 方法從服務(wù)端獲取值變化的 dataId 列表,
通過(guò) getServerConfig 方法,根據(jù) dataId 到服務(wù)端獲取最新的配置信息,接著將最新的配置信息保存到 CacheData 中。
最后調(diào)用 CacheData 的 checkListenerMd5 方法,可以看到該方法在第一部分也被調(diào)用過(guò),我們需要重點(diǎn)關(guān)注一下。
// 檢查服務(wù)器配置 List<String> changedGroupKeys = checkUpdateDataIds(cacheDatas, inInitializingCacheList); if (!CollectionUtils.isEmpty(changedGroupKeys)) { LOGGER.info("get changedGroupKeys:" + changedGroupKeys); } for (String groupKey : changedGroupKeys) { String[] key = GroupKey.parseKey(groupKey); String dataId = key[0]; String group = key[1]; String tenant = null; if (key.length == 3) { tenant = key[2]; } try { //從服務(wù)器端獲取相關(guān)id的最新配置 String[] ct = getServerConfig(dataId, group, tenant, 3000L); CacheData cache = cacheMap.get().get(GroupKey.getKeyTenant(dataId, group, tenant)); cache.setContent(ct[0]); if (null != ct[1]) { cache.setType(ct[1]); } LOGGER.info("[{}] [data-received] dataId={}, group={}, tenant={}, md5={}, content={}, type={}", agent.getName(), dataId, group, tenant, cache.getMd5(), ContentUtils.truncateContent(ct[0]), ct[1]); } catch (NacosException ioe) { String message = String .format("[%s] [get-update] get changed config exception. dataId=%s, group=%s, tenant=%s", agent.getName(), dataId, group, tenant); LOGGER.error(message, ioe); } } for (CacheData cacheData : cacheDatas) { if (!cacheData.isInitializing() || inInitializingCacheList .contains(GroupKey.getKeyTenant(cacheData.dataId, cacheData.group, cacheData.tenant))) { //校驗(yàn)MD5值 cacheData.checkListenerMd5(); cacheData.setInitializing(false); } } inInitializingCacheList.clear(); executorService.execute(this); catch (Throwable e) { // If the rotation training task is abnormal, the next execution time of the task will be punished LOGGER.error("longPolling error : ", e); executorService.schedule(this, taskPenaltyTime, TimeUnit.MILLISECONDS);
這里大家也發(fā)現(xiàn),當(dāng)客戶端從服務(wù)器拉去配置文件之后,會(huì)將配置文件在本地進(jìn)行緩存,所以,一般會(huì)優(yōu)先使用本地配置,如果本地文件不存在或者內(nèi)容為空,則再通過(guò) HTTP GET 方法從遠(yuǎn)端拉取配置,并保存到本地緩存中
private String getConfigInner(String tenant, String dataId, String group, long timeoutMs) throws NacosException { group = null2defaultGroup(group); ParamUtils.checkKeyParam(dataId, group); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setTenant(tenant); cr.setGroup(group); // 優(yōu)先使用本地配置 String content = LocalConfigInfoProcessor.getFailover(agent.getName(), dataId, group, tenant); if (content != null) { LOGGER.warn("[{}] [get-config] get failover ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)); cr.setContent(content); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } try { String[] ct = worker.getServerConfig(dataId, group, tenant, timeoutMs); cr.setContent(ct[0]); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; } catch (NacosException ioe) { if (NacosException.NO_RIGHT == ioe.getErrCode()) { throw ioe; } LOGGER.warn("[{}] [get-config] get from server error, dataId={}, group={}, tenant={}, msg={}", agent.getName(), dataId, group, tenant, ioe.toString()); } LOGGER.warn("[{}] [get-config] get snapshot ok, dataId={}, group={}, tenant={}, config={}", agent.getName(), dataId, group, tenant, ContentUtils.truncateContent(content)); content = LocalConfigInfoProcessor.getSnapshot(agent.getName(), dataId, group, tenant); cr.setContent(content); configFilterChainManager.doFilter(null, cr); content = cr.getContent(); return content; }
好了現(xiàn)在我們可以為 ConfigService 來(lái)添加一個(gè) Listener 了,最終是調(diào)用了 ClientWorker 的 addTenantListeners 方法,如下圖所示:
/** * Add listeners for tenant. * * @param dataId dataId of data * @param group group of data * @param listeners listeners * @throws NacosException nacos exception */ public void addTenantListeners(String dataId, String group, List<? extends Listener> listeners) throws NacosException { //設(shè)置默認(rèn)組 group = null2defaultGroup(group); String tenant = agent.getTenant(); CacheData cache = addCacheDataIfAbsent(dataId, group, tenant); for (Listener listener : listeners) { cache.addListener(listener); } }
該方法分為兩個(gè)部分,首先根據(jù) dataId,group 和tenant獲取一個(gè) CacheData 對(duì)象,然后將當(dāng)前要添加的 listener 對(duì)象添加到 CacheData 中去。
接下來(lái),我們要重點(diǎn)關(guān)注下 CacheData 類了。
首先讓我們來(lái)看一下 CacheData 中的成員變量,如下圖所示:
private final String name; private final ConfigFilterChainManager configFilterChainManager; public final String dataId; public final String group; public final String tenant; //監(jiān)聽(tīng)器 private final CopyOnWriteArrayList<ManagerListenerWrap> listeners; private volatile String md5; /** * whether use local config. */ private volatile boolean isUseLocalConfig = false; /** * last modify time. */ private volatile long localConfigLastModified; private volatile String content; private int taskId; private volatile boolean isInitializing = true; private String type;
我們可以看到,成員變量包括tenant ,dataId,group,content,taskId等,還有兩個(gè)值得我們關(guān)注的:
listeners
md5
listeners 是該 CacheData 所關(guān)聯(lián)的所有 listener,不過(guò)不是保存的原始的 Listener對(duì)象,而是包裝后的 ManagerListenerWrap 對(duì)象,該對(duì)象除了持有 Listener 對(duì)象,還持有了一個(gè) lastCallMd5 和lastContent屬性。
private static class ManagerListenerWrap { final Listener listener; //關(guān)注 String lastCallMd5 = CacheData.getMd5String(null); String lastContent = null; ManagerListenerWrap(Listener listener) { this.listener = listener; } ManagerListenerWrap(Listener listener, String md5) { this.listener = listener; this.lastCallMd5 = md5; } ManagerListenerWrap(Listener listener, String md5, String lastContent) { this.listener = listener; this.lastCallMd5 = md5; this.lastContent = lastContent; } }
另外一個(gè)屬性 md5 就是根據(jù)當(dāng)前對(duì)象的 content 計(jì)算出來(lái)的 md5 值。
現(xiàn)在我們對(duì) ConfigService 有了大致的了解了,現(xiàn)在剩下最后一個(gè)重要的問(wèn)題還沒(méi)有答案,那就是 ConfigService 的 Listener 是在什么時(shí)候觸發(fā)回調(diào)方法 receiveConfigInfo 的。
現(xiàn)在讓我們回過(guò)頭來(lái)想一下,在 ClientWorker 中的定時(shí)任務(wù)中,啟動(dòng)了一個(gè)長(zhǎng)輪詢的任務(wù):LongPollingRunnable,該任務(wù)多次執(zhí)行了 cacheData.checkListenerMd5() 方法,那現(xiàn)在就讓我們來(lái)看下這個(gè)方法到底做了些什么,如下圖所示:
void checkListenerMd5() { for (ManagerListenerWrap wrap : listeners) { if (!md5.equals(wrap.lastCallMd5)) { safeNotifyListener(dataId, group, content, type, md5, wrap); } } }
到這里應(yīng)該就比較清晰了,該方法會(huì)檢查 CacheData 當(dāng)前的 md5 與 CacheData 持有的所有 Listener 中保存的 md5 的值是否一致,如果不一致,就執(zhí)行一個(gè)安全的監(jiān)聽(tīng)器的通知方法:safeNotifyListener,通知什么呢?我們可以大膽的猜一下,應(yīng)該是通知 Listener 的使用者,該 Listener 所關(guān)注的配置信息已經(jīng)發(fā)生改變了?,F(xiàn)在讓我們來(lái)看一下 safeNotifyListener 方法,如下圖所示:
private void safeNotifyListener(final String dataId, final String group, final String content, final String type, final String md5, final ManagerListenerWrap listenerWrap) { final Listener listener = listenerWrap.listener; Runnable job = new Runnable() { @Override public void run() { ClassLoader myClassLoader = Thread.currentThread().getContextClassLoader(); ClassLoader appClassLoader = listener.getClass().getClassLoader(); try { if (listener instanceof AbstractSharedListener) { AbstractSharedListener adapter = (AbstractSharedListener) listener; adapter.fillContext(dataId, group); LOGGER.info("[{}] [notify-context] dataId={}, group={}, md5={}", name, dataId, group, md5); } // 執(zhí)行回調(diào)之前先將線程classloader設(shè)置為具體webapp的classloader,以免回調(diào)方法中調(diào)用spi接口是出現(xiàn)異常或錯(cuò)用(多應(yīng)用部署才會(huì)有該問(wèn)題)。 Thread.currentThread().setContextClassLoader(appClassLoader); ConfigResponse cr = new ConfigResponse(); cr.setDataId(dataId); cr.setGroup(group); cr.setContent(content); //重點(diǎn)關(guān)注,在這里調(diào)用 //重點(diǎn)關(guān)注,在這里調(diào)用 //重點(diǎn)關(guān)注,在這里調(diào)用 configFilterChainManager.doFilter(null, cr); String contentTmp = cr.getContent(); listener.receiveConfigInfo(contentTmp); // compare lastContent and content if (listener instanceof AbstractConfigChangeListener) { Map data = ConfigChangeHandler.getInstance() .parseChangeData(listenerWrap.lastContent, content, type); ConfigChangeEvent event = new ConfigChangeEvent(data); ((AbstractConfigChangeListener) listener).receiveConfigChange(event); listenerWrap.lastContent = content; } listenerWrap.lastCallMd5 = md5; LOGGER.info("[{}] [notify-ok] dataId={}, group={}, md5={}, listener={} ", name, dataId, group, md5, listener); } catch (NacosException ex) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} errCode={} errMsg={}", name, dataId, group, md5, listener, ex.getErrCode(), ex.getErrMsg()); } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} tx={}", name, dataId, group, md5, listener, t.getCause()); } finally { Thread.currentThread().setContextClassLoader(myClassLoader); } } }; final long startNotify = System.currentTimeMillis(); try { if (null != listener.getExecutor()) { listener.getExecutor().execute(job); } else { job.run(); } } catch (Throwable t) { LOGGER.error("[{}] [notify-error] dataId={}, group={}, md5={}, listener={} throwable={}", name, dataId, group, md5, listener, t.getCause()); } final long finishNotify = System.currentTimeMillis(); LOGGER.info("[{}] [notify-listener] time cost={}ms in ClientWorker, dataId={}, group={}, md5={}, listener={} ", name, (finishNotify - startNotify), dataId, group, md5, listener); }
可以看到在 safeNotifyListener 方法中,重點(diǎn)關(guān)注下紅框中的三行代碼:獲取最新的配置信息,調(diào)用 Listener 的回調(diào)方法,將最新的配置信息作為參數(shù)傳入,這樣 Listener 的使用者就能接收到變更后的配置信息了,最后更新 ListenerWrap 的 md5 值。和我們猜測(cè)的一樣, Listener 的回調(diào)方法就是在該方法中觸發(fā)的。
那 CacheData 的 md5 值是何時(shí)發(fā)生改變的呢?我們可以回想一下,在上面的 LongPollingRunnable 所執(zhí)行的任務(wù)中,在獲取服務(wù)端發(fā)生變更的配置信息時(shí),將最新的 content 數(shù)據(jù)寫(xiě)入了 CacheData 中,我們可以看下該方法如下:
public void setContent(String content) { this.content = content; this.md5 = getMd5String(this.content); }
可以看到是在長(zhǎng)輪詢的任務(wù)中,當(dāng)服務(wù)端配置信息發(fā)生變更時(shí),客戶端將最新的數(shù)據(jù)獲取下來(lái)之后,保存在了 CacheData 中,同時(shí)更新了該 CacheData 的 md5 值,所以當(dāng)下次執(zhí)行 checkListenerMd5 方法時(shí),就會(huì)發(fā)現(xiàn)當(dāng)前 listener 所持有的 md5 值已經(jīng)和 CacheData 的 md5 值不一樣了,也就意味著服務(wù)端的配置信息發(fā)生改變了,這時(shí)就需要將最新的數(shù)據(jù)通知給 Listener 的持有者。
至此配置中心的完整流程已經(jīng)分析完畢了,可以發(fā)現(xiàn),Nacos 并不是通過(guò)推的方式將服務(wù)端最新的配置信息發(fā)送給客戶端的,而是客戶端維護(hù)了一個(gè)長(zhǎng)輪詢的任務(wù),定時(shí)去拉取發(fā)生變更的配置信息,然后將最新的數(shù)據(jù)推送給 Listener 的持有者。
客戶端拉取服務(wù)端的數(shù)據(jù)與服務(wù)端推送數(shù)據(jù)給客戶端相比,優(yōu)勢(shì)在哪呢,為什么 Nacos 不設(shè)計(jì)成主動(dòng)推送數(shù)據(jù),而是要客戶端去拉取呢?如果用推的方式,服務(wù)端需要維持與客戶端的長(zhǎng)連接,這樣的話需要耗費(fèi)大量的資源,并且還需要考慮連接的有效性,例如需要通過(guò)心跳來(lái)維持兩者之間的連接。而用拉取的方式,客戶端只需要通過(guò)一個(gè)無(wú)狀態(tài)的 http 請(qǐng)求即可獲取到服務(wù)端的數(shù)據(jù)。
現(xiàn)在,我們來(lái)簡(jiǎn)單復(fù)盤(pán)一下Nacos客戶端視角下的配置中心實(shí)現(xiàn)原理
首先我們假設(shè)Nacos服務(wù)端一切正常,Nacos客戶端啟動(dòng)以后
第一步是根據(jù)我們配置的服務(wù)端信息,新建 ConfigService 實(shí)例,它的實(shí)現(xiàn)就是我們文中提到的NacosConfigService;
第二步可以通過(guò)相應(yīng)的接口獲取配置和注冊(cè)配置監(jiān)聽(tīng)器,
考慮到服務(wù)端故障的問(wèn)題,客戶端將最新數(shù)據(jù)獲取后會(huì)保存在本地的 緩存文件中,以后會(huì)優(yōu)先從文件中獲取配置信息的值,如果獲取不到,會(huì)直接從服務(wù)器拉去,并保存到緩存中;
其實(shí)真正干活的就是ClientWorker類;客戶端是通過(guò)一個(gè)定時(shí)的長(zhǎng)輪詢來(lái)檢查自己監(jiān)聽(tīng)的配置項(xiàng)的數(shù)據(jù)的,一旦服務(wù)端的數(shù)據(jù)發(fā)生變化時(shí),會(huì)從服務(wù)端獲取到dataID的列表,
客戶端根據(jù)dataID列表從服務(wù)端獲取到最新的數(shù)據(jù),并將最新的數(shù)據(jù)保存在一個(gè) CacheData 對(duì)象中,在輪詢過(guò)程中,如果決定使用本地配置,就會(huì)比較當(dāng)前CacheData 的MD5值是否和所有監(jiān)聽(tīng)者所持有的MD5值相等,如果不相等,,此時(shí)就會(huì)對(duì)該 CacheData 所綁定的 Listener 觸發(fā) receiveConfigInfo 回調(diào),來(lái)通知使用者此配置信息已經(jīng)變更;
感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理是什么”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!
網(wǎng)站標(biāo)題:從Nacos客戶端視角來(lái)看看配置中心實(shí)現(xiàn)原理是什么
URL地址:http://www.rwnh.cn/article32/jsdhsc.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站維護(hù)、商城網(wǎng)站、網(wǎng)站收錄、微信小程序、網(wǎng)站內(nèi)鏈、網(wǎng)站改版
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)