SyncRequestProcessor,該處理器將請求存入磁盤,其將請求批量的存入磁盤以提高效率,請求在寫入磁盤之前是不會被轉(zhuǎn)發(fā)到下個處理器的。
創(chuàng)新互聯(lián)為您提適合企業(yè)的網(wǎng)站設(shè)計?讓您的網(wǎng)站在搜索引擎具有高度排名,讓您的網(wǎng)站具備超強(qiáng)的網(wǎng)絡(luò)競爭力!結(jié)合企業(yè)自身,進(jìn)行網(wǎng)站設(shè)計及把握,最后結(jié)合企業(yè)文化和具體宗旨等,才能創(chuàng)作出一份性化解決方案。從網(wǎng)站策劃到網(wǎng)站設(shè)計、成都網(wǎng)站制作, 我們的網(wǎng)頁設(shè)計師為您提供的解決方案。
SyncRequestProcessor維護(hù)了ZooKeeperServer實例,其用于獲取ZooKeeper的數(shù)據(jù)庫和其他信息;維護(hù)了一個處理請求的隊列,其用于存放請求;維護(hù)了一個處理快照的線程,用于處理快照;維護(hù)了一個running標(biāo)識,標(biāo)識SyncRequestProcessor是否在運行;同時還維護(hù)了一個等待被刷新到磁盤的請求隊列。
// Zookeeper服務(wù)器
private final ZooKeeperServer zks;
// 請求隊列
private final LinkedBlockingQueue<Request> queuedRequests =
new LinkedBlockingQueue<Request>();
// 下個處理器
private final RequestProcessor nextProcessor;
// 快照處理線程
private Thread snapInProcess = null;
// 是否在運行中
volatile private boolean running;
/**
* Transactions that have been written and are waiting to be flushed to
* disk. Basically this is the list of SyncItems whose callbacks will be
* invoked after flush returns successfully.
*/
// 等待被刷新到磁盤的請求隊列
private final LinkedList<Request> toFlush = new LinkedList<Request>();
// 隨機(jī)數(shù)生成器
private final Random r = new Random();
/**
* The number of log entries to log before starting a snapshot
*/
// 快照個數(shù)
private static int snapCount = ZooKeeperServer.getSnapCount();
// 結(jié)束請求標(biāo)識
private final Request requestOfDeath = Request.requestOfDeath;
構(gòu)造函數(shù)首先會調(diào)用父類的構(gòu)造函數(shù),然后根據(jù)構(gòu)造函數(shù)參數(shù)給類的屬性賦值,其中會確定下個處理器,并會設(shè)置該處理器正在運行的標(biāo)識。
public SyncRequestProcessor(ZooKeeperServer zks,
RequestProcessor nextProcessor) {
super("SyncThread:" + zks.getServerId(), zks
.getZooKeeperServerListener());
this.zks = zks;
this.nextProcessor = nextProcessor;
running = true;
}
@Override
public void run() {
try {
// 寫日志數(shù)量初始化為0
int logCount = 0;
// we do this in an attempt to ensure that not all of the servers
// in the ensemble take a snapshot at the same time
// 防止集群中所有機(jī)器在同一時刻進(jìn)行數(shù)據(jù)快照,對是否進(jìn)行數(shù)據(jù)快照增加隨機(jī)因素
int randRoll = r.nextInt(snapCount/2);
while (true) {
Request si = null;
// 沒有需要刷新到磁盤的請求
if (toFlush.isEmpty()) {
// 從請求隊列中取出一個請求,若queuedRequests隊列為空會阻塞
si = queuedRequests.take();
} else {
// 從請求隊列中取出一個請求,若queuedRequests隊列為空,則返回空,不會阻塞
si = queuedRequests.poll();
// 取出的請求為空
if (si == null) {
// 刷新數(shù)據(jù)磁盤
flush(toFlush);
continue;
}
}
// 在關(guān)閉處理器之后,會添加requestOfDeath請求到queuedRequests隊列,表示關(guān)閉后不再處理請求
if (si == requestOfDeath) {
break;
}
// 請求不為空,處理請求
if (si != null) {
// track the number of records written to the log
// 將寫請求添加至事務(wù)日志文件 FileTxnSnapLog.append(si)
if (zks.getZKDatabase().append(si)) {
// 日志寫入,logCount加1
logCount++;
//確定是否需要進(jìn)行數(shù)據(jù)快照
if (logCount > (snapCount / 2 + randRoll)) {
randRoll = r.nextInt(snapCount/2);
// roll the log
// 滾動日志,從當(dāng)前日志文件滾到下一個日志文件,不是回滾
zks.getZKDatabase().rollLog();
// take a snapshot
if (snapInProcess != null && snapInProcess.isAlive()) { // 正在進(jìn)行快照
LOG.warn("Too busy to snap, skipping");
} else {
// 創(chuàng)建線程來處理快照
snapInProcess = new ZooKeeperThread("Snapshot Thread") {
public void run() {
try {
// 進(jìn)行快照
zks.takeSnapshot();
} catch(Exception e) {
LOG.warn("Unexpected exception", e);
}
}
};
// 開始快照線程處理
snapInProcess.start();
}
// 重置為0
logCount = 0;
}
} else if (toFlush.isEmpty()) {// 讀請求會走到這里,查看此時toFlush是否為空,如果為空,說明近段時間讀多寫少,直接響應(yīng)
// optimization for read heavy workloads
// iff this is a read, and there are no pending
// flushes (writes), then just pass this to the next
// processor
if (nextProcessor != null) {
// 下個處理器開始處理請求
nextProcessor.proce***equest(si);
// 處理器是Flushable的,刷新數(shù)據(jù)到磁盤
if (nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
continue;
}
// 將請求添加至被刷新至磁盤隊列
toFlush.add(si);
if (toFlush.size() > 1000) {// 隊列大小大于1000,直接刷新到磁盤
flush(toFlush);
}
}
}
} catch (Throwable t) {
handleException(this.getName(), t);
} finally{
running = false;
}
LOG.info("SyncRequestProcessor exited!");
}
flush將toFlush隊列中的請求刷新到磁盤中。
private void flush(LinkedList<Request> toFlush)
throws IOException, RequestProcessorException
{
if (toFlush.isEmpty())
return;
// 提交事務(wù)至ZK數(shù)據(jù)庫
zks.getZKDatabase().commit();
while (!toFlush.isEmpty()) {
// 從隊列移除請求
Request i = toFlush.remove();
// 下個處理器開始處理請求
if (nextProcessor != null) {
nextProcessor.proce***equest(i);
}
}
if (nextProcessor != null && nextProcessor instanceof Flushable) {
((Flushable)nextProcessor).flush();
}
}
函數(shù)用于關(guān)閉SyncRequestProcessor處理器,其首先會在queuedRequests隊列中添加一個結(jié)束請求requestOfDeath,然后再判斷SyncRequestProcessor是否還在運行,若是,則會等待其結(jié)束;之后判斷toFlush隊列是否為空,若不為空,則刷新到磁盤中
public void shutdown() {
LOG.info("Shutting down");
// 添加結(jié)束請求請求至隊列
queuedRequests.add(requestOfDeath);
try {
// 還在運行
if(running){
this.join();// 等待該線程終止
}
if (!toFlush.isEmpty()) {// 隊列不為空,刷新到磁盤
flush(toFlush);
}
} catch(InterruptedException e) {
LOG.warn("Interrupted while wating for " + this + " to finish");
} catch (IOException e) {
LOG.warn("Got IO exception during shutdown");
} catch (RequestProcessorException e) {
LOG.warn("Got request processor exception during shutdown");
}
if (nextProcessor != null) {
nextProcessor.shutdown();
}
}
當(dāng)前名稱:zookeeper(12)源碼分析-請求處理鏈(2)
本文來源:http://www.rwnh.cn/article48/jdciep.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站設(shè)計、用戶體驗、品牌網(wǎng)站制作、網(wǎng)站維護(hù)、營銷型網(wǎng)站建設(shè)、搜索引擎優(yōu)化
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)