之前了解到的 Flink 的心跳服務(wù)都比較淺顯,只知道 在 Flink 中心跳服務(wù)是由 ReourceManager 發(fā)送給 JobMaster 和 TaskExecutor 以及 JobMaster 發(fā)送給 TaskExecutor。 然后 TaskExecutor 返回相關(guān)的Slot等數(shù)據(jù)給 ResouceManager。所以一直以為 心跳服務(wù)是 Akka 的 ask 進(jìn)行傳遞的。 但是查看相關(guān)源碼發(fā)現(xiàn)和我的理解有些出入。并且在最開(kāi)始查看源碼的時(shí)候發(fā)現(xiàn),F(xiàn)link 對(duì)心跳服務(wù)封裝的比較好,定義的接口在很多地方都是匿名的實(shí)現(xiàn),所以一開(kāi)始看的時(shí)候很容易混淆,搞不清楚整個(gè)心跳的流程,下面用ResourceManager和TaskManager的心跳服務(wù) 來(lái)簡(jiǎn)單聊一聊 Flink 中心跳服務(wù)的流程。
下面是心跳服務(wù)類(lèi)的繼承關(guān)系
最核心的類(lèi)就是HeaderbeatManager
接口的實(shí)現(xiàn)類(lèi)HeartbeatManagerImpl
類(lèi)。其中實(shí)現(xiàn)了接收到心跳請(qǐng)求和接受到心跳的代碼。它的子類(lèi)HeartbeatManagerSendImpl
繼承了Runnable接口,用于定期觸發(fā)心跳請(qǐng)求。
在HeartbeatManagerImpl
中有一個(gè)存放HeartbeatMonitor
對(duì)象的 Map 集合。HeartbeatMonitor
類(lèi)主要是記錄心跳的時(shí)間,判斷心跳是否超時(shí)。在構(gòu)造HeartbeatMonitor
的時(shí)候需要傳入一個(gè)HeartbeatTarget
接口的實(shí)現(xiàn)對(duì)象。HeartbeatTarget
接口定義的是接受到心跳請(qǐng)求后的操作和接收到心跳的操作。 該接口的實(shí)現(xiàn)類(lèi)主要在兩個(gè)地方,一個(gè)是在添加 Motitor 時(shí)的匿名對(duì)象,比如在RM添加對(duì) TaskManager 監(jiān)聽(tīng)時(shí)會(huì)傳入一個(gè)實(shí)現(xiàn)了HeartbeatTarget 接口的匿名對(duì)象。一個(gè)是在HeartbeatManagerSendImpl
中的實(shí)現(xiàn)。這個(gè)地方我最開(kāi)始看源碼時(shí)特別容易混淆。HeartbeatManagerSendImpl
中的requestHeartbeat()
方法是接收到心跳請(qǐng)求后的處理,receiveHeartbeat()
是接收到心跳后的處理。 在匿名對(duì)象中的requestHeartbeat()
是發(fā)送心跳請(qǐng)求的動(dòng)作(e.g. RM向TM發(fā)送心跳請(qǐng)求)而receiveHeartbeat()
則是實(shí)現(xiàn)了 接收到心跳請(qǐng)求后發(fā)送心跳的動(dòng)作 (e.g. TM 就收到RM的心跳請(qǐng)求,向RM發(fā)送心跳及需要匯報(bào)的信息)
下面是ResourceManager 和 TaskExecutor 的心跳服務(wù)的流程
在最開(kāi)始,ResourceManager 服務(wù)啟動(dòng)的時(shí)會(huì)創(chuàng)建兩個(gè) 心跳服務(wù)管理對(duì)象, RM用來(lái)管理TaskManager的心跳服務(wù)的對(duì)象名叫taskManagerHeartbeatManager
private void startHeartbeatServices() {taskManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new TaskManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
jobManagerHeartbeatManager =
heartbeatServices.createHeartbeatManagerSender(
resourceId,
new JobManagerHeartbeatListener(),
getMainThreadExecutor(),
log);
}
一個(gè)用來(lái)管理 TaskManager 的心跳通信,一個(gè)用來(lái)管理 JobManager 的心跳通信。這兩個(gè)對(duì)象都是HeartbeatManagerSenderImpl
對(duì)象。在HeartbeatManagerSenderImpl
的構(gòu)造方法中就會(huì)啟動(dòng)定時(shí)任務(wù)。
public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
this.heartbeatPeriod = heartbeatPeriod;
// 開(kāi)始任務(wù)調(diào)度
mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
}
@Override
public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
}
// 設(shè)置新的任務(wù)調(diào)度
getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
}
}
}
mainThreadExecutor
是一個(gè)RpcEndpoint
的靜態(tài)內(nèi)部類(lèi),這里使用它的schedule()
方法來(lái)實(shí)現(xiàn)定時(shí)任務(wù)調(diào)度。schedule()
接受一個(gè)Runnable
接口的對(duì)象,而HeartbeatManagerSenderImpl
就實(shí)現(xiàn)了Runnable
接口,所以,在定時(shí)任務(wù)被觸發(fā)時(shí)就會(huì)執(zhí)行HeartbeatManagerSenderImpl#run()
方法。 在run()
方法中,會(huì)繼續(xù)設(shè)置一個(gè)新的定時(shí)任務(wù),這樣不斷地循環(huán)。這里默認(rèn)的延遲時(shí)間為 10000 毫秒。
schedule()
方法實(shí)現(xiàn)任務(wù)的延遲執(zhí)行主要是通過(guò)給 Actor 發(fā)送一條異步任務(wù)的消息,該消息會(huì)帶上延遲執(zhí)行的時(shí)間。 在這里就是 ResourceManager 給自己的 Acotr 發(fā)送了一條延遲消息。
@Override
public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 計(jì)算任務(wù)調(diào)度的時(shí)間
long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
// 向自己發(fā)送一條 異步任務(wù)處理 的消息
tell(new RunAsync(runnable, atTimeNanos));
}
}
在 ResourceManager 的 Actor 接收到這條消息的時(shí)候,會(huì)判斷任任務(wù)是否需要立即執(zhí)行,如果是延遲執(zhí)行,則會(huì)使用 Akka 的 ActorSystem.scheduler() 來(lái)定時(shí)執(zhí)行該任務(wù)。
private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
final long delayNanos;
// 如果接收到的任務(wù)已經(jīng)到達(dá)任務(wù)的執(zhí)行時(shí)間則立即執(zhí)行
if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
try {runAsync.getRunnable().run();
} catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
}
// 如果沒(méi)有到達(dá)任務(wù)的執(zhí)行時(shí)間,則發(fā)送一條新的延遲消息給自己
} else {// schedule for later. send a new message after the delay, which will then be
// immediately executed
FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);
final Object envelopedSelfMessage = envelopeSelfMessage(message);
getContext()
.system()
.scheduler()
.scheduleOnce(
delay,
getSelf(),
envelopedSelfMessage,
getContext().dispatcher(),
ActorRef.noSender());
}
}
心跳監(jiān)聽(tīng)對(duì)象的添加與觸發(fā)heartbeatMonitor 對(duì)象的添加是在 TaskManger 啟動(dòng)后,向 ResourceManager 注冊(cè)時(shí)調(diào)用HeartbeatManagerSenderImpl#monitorTarget()
方法添加的。 添加的時(shí)候會(huì)傳入一個(gè)HeartbeatTarget 接口的匿名實(shí)現(xiàn)類(lèi)。 該實(shí)現(xiàn)類(lèi)就定義了觸發(fā)心跳請(qǐng)求時(shí)的操作。下面代碼中就定義了RM向TaskManager發(fā)送心跳時(shí)需要怎么做,但是接收心跳請(qǐng)求的方法
private RegistrationResponse registerTaskExecutorInternal(
TaskExecutorGateway taskExecutorGateway,
TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服務(wù) 添加心跳監(jiān)聽(tīng)對(duì)象
taskManagerHeartbeatManager.monitorTarget(
taskExecutorResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
// TaskManager
}
@Override
public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
}
});
}
在HeartbeatManagerSenderImpl
的run()
方法中,會(huì)遍歷所有的正在監(jiān)視的 heartbeatMonitor 對(duì)象,并調(diào)用 在添加監(jiān)視時(shí)傳入的heartbeatTarget
匿名對(duì)象的requestHeartbeat()
方法,就像上面代碼一樣。所以在RM向TaskManager 發(fā)送心跳請(qǐng)求的時(shí)候 是通過(guò) 調(diào)用taskExecutorGateway
的heartbeatFromResourceManager() 發(fā)送了 RPC 請(qǐng)求
在 TM 服務(wù)啟動(dòng)的時(shí)候同樣也會(huì)創(chuàng)建一個(gè)心跳服務(wù)來(lái)管理與RM之間的心跳
this.resourceManagerHeartbeatManager =
createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============
publicHeartbeatManagercreateHeartbeatManager(
ResourceID resourceId,
HeartbeatListenerheartbeatListener,
ScheduledExecutor mainThreadExecutor,
Logger log) {return new HeartbeatManagerImpl<>(
heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
}
在TM中創(chuàng)建的就是HeartbeatManagerImpl
對(duì)象,因?yàn)門(mén)M并不需要發(fā)送心跳請(qǐng)求,所以不是創(chuàng)建HeartbeatManagerSenderImpl
對(duì)象。
TM 向 RM 注冊(cè)成功后,會(huì)添加一個(gè)對(duì) RM 的監(jiān)聽(tīng)對(duì)象
// monitor the resource manager as heartbeat target
resourceManagerHeartbeatManager.monitorTarget(
resourceManagerResourceId,
new HeartbeatTarget() {@Override
public void receiveHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
resourceID, heartbeatPayload);
}
@Override
public void requestHeartbeat(
ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
}
});
在這里,HeartbeatTarget 匿名對(duì)象中,receiveHeartbeat() 就是向RM 發(fā)送心跳并附帶上匯報(bào)信息,而requestHeartbeat 是空的,因?yàn)?TM 不會(huì)向 RM 發(fā)送心跳請(qǐng)求。
TaskManager 接受心跳請(qǐng)求并發(fā)送心跳回到之前,RM 調(diào)用taskExecutorGateway的heartbeatFromResourceManager方法,通過(guò)RPC方式發(fā)送了心跳請(qǐng)求。 在TaskExecutor類(lèi)中的heartbeatFromResourceManager方法就會(huì)被調(diào)用。并傳入了RM 的 resourceID。
@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}
resourceManagerHeartbeatManager 就是 TM 時(shí)創(chuàng)建的HeartbeatManagerImpl
對(duì)象,所以這里調(diào)用的requestHeartbeat() 方法是HeartbeatManagerImpl
中的方法。
public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳請(qǐng)求
@Override
public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
// 匯報(bào)心跳,清除HeartbeatMonitor中的超時(shí)Future
final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);
if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
heartbeatTarget.receiveHeartbeat(
getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
}
}
}
}
在這個(gè)方法中 首先會(huì)調(diào)用reportHeartbeat方法.
HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通過(guò) RM 的reosurceID 找到 TM對(duì)RM的監(jiān)聽(tīng)器
HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
// 重新設(shè)置 監(jiān)聽(tīng)器的超時(shí)時(shí)間
heartbeatMonitor.reportHeartbeat();
return heartbeatMonitor.getHeartbeatTarget();
} else {return null;
}
}
之后就會(huì)調(diào)用之前創(chuàng)建監(jiān)聽(tīng)器時(shí)的匿名對(duì)象的方法來(lái)通過(guò)RPC調(diào)用向RM發(fā)送心跳數(shù)據(jù)。
resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);
之后又回到了RM
RM 接受TM的心跳數(shù)據(jù)在 TM 發(fā)送 RPC 請(qǐng)求后,ResourceManager 類(lèi)中的heartbeatFromTaskManager()
方法會(huì)被調(diào)用。該方法只有一行代碼
@Override
public void heartbeatFromTaskManager(
final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}
所以在這里,會(huì)調(diào)用 RM 管理 TM 的心跳服務(wù)對(duì)象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()
方法。
@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
reportHeartbeat(heartbeatOrigin);
if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
}
}
}
這里首先會(huì)調(diào)用reportHeartbeat()
來(lái)重新設(shè)置 在 RM 中對(duì) TM 的監(jiān)聽(tīng)器的超時(shí)時(shí)間。 然后調(diào)用heartbeatListener來(lái)處理TM 傳過(guò)來(lái)的數(shù)據(jù)。
你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧
標(biāo)題名稱(chēng):Flink心跳服務(wù)流程-創(chuàng)新互聯(lián)
標(biāo)題網(wǎng)址:http://www.rwnh.cn/article36/gdhpg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計(jì)、App設(shè)計(jì)、微信公眾號(hào)、用戶(hù)體驗(yàn)、網(wǎng)站導(dǎo)航
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容