内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

Flink心跳服務(wù)流程-創(chuàng)新互聯(lián)

之前了解到的 Flink 的心跳服務(wù)都比較淺顯,只知道 在 Flink 中心跳服務(wù)是由 ReourceManager 發(fā)送給 JobMaster 和 TaskExecutor 以及 JobMaster 發(fā)送給 TaskExecutor。 然后 TaskExecutor 返回相關(guān)的Slot等數(shù)據(jù)給 ResouceManager。所以一直以為 心跳服務(wù)是 Akka 的 ask 進(jìn)行傳遞的。 但是查看相關(guān)源碼發(fā)現(xiàn)和我的理解有些出入。并且在最開(kāi)始查看源碼的時(shí)候發(fā)現(xiàn),F(xiàn)link 對(duì)心跳服務(wù)封裝的比較好,定義的接口在很多地方都是匿名的實(shí)現(xiàn),所以一開(kāi)始看的時(shí)候很容易混淆,搞不清楚整個(gè)心跳的流程,下面用ResourceManager和TaskManager的心跳服務(wù) 來(lái)簡(jiǎn)單聊一聊 Flink 中心跳服務(wù)的流程。
下面是心跳服務(wù)類(lèi)的繼承關(guān)系
在這里插入圖片描述

創(chuàng)新互聯(lián)成立10年來(lái),這條路我們正越走越好,積累了技術(shù)與客戶(hù)資源,形成了良好的口碑。為客戶(hù)提供網(wǎng)站設(shè)計(jì)、成都網(wǎng)站制作、網(wǎng)站策劃、網(wǎng)頁(yè)設(shè)計(jì)、域名與空間、網(wǎng)絡(luò)營(yíng)銷(xiāo)、VI設(shè)計(jì)、網(wǎng)站改版、漏洞修補(bǔ)等服務(wù)。網(wǎng)站是否美觀、功能強(qiáng)大、用戶(hù)體驗(yàn)好、性?xún)r(jià)比高、打開(kāi)快等等,這些對(duì)于網(wǎng)站建設(shè)都非常重要,創(chuàng)新互聯(lián)通過(guò)對(duì)建站技術(shù)性的掌握、對(duì)創(chuàng)意設(shè)計(jì)的研究為客戶(hù)提供一站式互聯(lián)網(wǎng)解決方案,攜手廣大客戶(hù),共同發(fā)展進(jìn)步。

最核心的類(lèi)就是HeaderbeatManager接口的實(shí)現(xiàn)類(lèi)HeartbeatManagerImpl類(lèi)。其中實(shí)現(xiàn)了接收到心跳請(qǐng)求和接受到心跳的代碼。它的子類(lèi)HeartbeatManagerSendImpl繼承了Runnable接口,用于定期觸發(fā)心跳請(qǐng)求。
HeartbeatManagerImpl中有一個(gè)存放HeartbeatMonitor對(duì)象的 Map 集合。
HeartbeatMonitor類(lèi)主要是記錄心跳的時(shí)間,判斷心跳是否超時(shí)。在構(gòu)造HeartbeatMonitor的時(shí)候需要傳入一個(gè)HeartbeatTarget接口的實(shí)現(xiàn)對(duì)象。
HeartbeatTarget接口定義的是接受到心跳請(qǐng)求后的操作和接收到心跳的操作。 該接口的實(shí)現(xiàn)類(lèi)主要在兩個(gè)地方,一個(gè)是在添加 Motitor 時(shí)的匿名對(duì)象,比如在RM添加對(duì) TaskManager 監(jiān)聽(tīng)時(shí)會(huì)傳入一個(gè)實(shí)現(xiàn)了HeartbeatTarget 接口的匿名對(duì)象。一個(gè)是在HeartbeatManagerSendImpl中的實(shí)現(xiàn)。這個(gè)地方我最開(kāi)始看源碼時(shí)特別容易混淆。HeartbeatManagerSendImpl中的requestHeartbeat()方法是接收到心跳請(qǐng)求后的處理,receiveHeartbeat()是接收到心跳后的處理。 在匿名對(duì)象中的requestHeartbeat()是發(fā)送心跳請(qǐng)求的動(dòng)作(e.g. RM向TM發(fā)送心跳請(qǐng)求)而receiveHeartbeat()則是實(shí)現(xiàn)了 接收到心跳請(qǐng)求后發(fā)送心跳的動(dòng)作 (e.g. TM 就收到RM的心跳請(qǐng)求,向RM發(fā)送心跳及需要匯報(bào)的信息)

下面是ResourceManager 和 TaskExecutor 的心跳服務(wù)的流程
在這里插入圖片描述

RM 心跳服務(wù)的創(chuàng)建與調(diào)度

在最開(kāi)始,ResourceManager 服務(wù)啟動(dòng)的時(shí)會(huì)創(chuàng)建兩個(gè) 心跳服務(wù)管理對(duì)象, RM用來(lái)管理TaskManager的心跳服務(wù)的對(duì)象名叫taskManagerHeartbeatManager

private void startHeartbeatServices() {taskManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new TaskManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);

    jobManagerHeartbeatManager =
            heartbeatServices.createHeartbeatManagerSender(
                    resourceId,
                    new JobManagerHeartbeatListener(),
                    getMainThreadExecutor(),
                    log);
}

一個(gè)用來(lái)管理 TaskManager 的心跳通信,一個(gè)用來(lái)管理 JobManager 的心跳通信。這兩個(gè)對(duì)象都是HeartbeatManagerSenderImpl對(duì)象。在HeartbeatManagerSenderImpl的構(gòu)造方法中就會(huì)啟動(dòng)定時(shí)任務(wù)。

public class HeartbeatManagerSenderImplextends HeartbeatManagerImplimplements Runnable {
    HeartbeatManagerSenderImpl(ScheduledExecutor mainThreadExecutor, ...) {super(heartbeatTimeout,...mainThreadExecutor);
    
            this.heartbeatPeriod = heartbeatPeriod;
        	// 開(kāi)始任務(wù)調(diào)度
            mainThreadExecutor.schedule(this, 0L, TimeUnit.MILLISECONDS);
        }

    @Override
    public void run() {if (!stopped) {log.debug("Trigger heartbeat request.");
            for (HeartbeatMonitorheartbeatMonitor : getHeartbeatTargets().values()) {requestHeartbeat(heartbeatMonitor);
            }
        	// 設(shè)置新的任務(wù)調(diào)度
            getMainThreadExecutor().schedule(this, heartbeatPeriod, TimeUnit.MILLISECONDS);
        }
    }
}

mainThreadExecutor是一個(gè)RpcEndpoint的靜態(tài)內(nèi)部類(lèi),這里使用它的schedule()方法來(lái)實(shí)現(xiàn)定時(shí)任務(wù)調(diào)度。schedule()接受一個(gè)Runnable接口的對(duì)象,而HeartbeatManagerSenderImpl就實(shí)現(xiàn)了Runnable接口,所以,在定時(shí)任務(wù)被觸發(fā)時(shí)就會(huì)執(zhí)行HeartbeatManagerSenderImpl#run()方法。 在run()方法中,會(huì)繼續(xù)設(shè)置一個(gè)新的定時(shí)任務(wù),這樣不斷地循環(huán)。這里默認(rèn)的延遲時(shí)間為 10000 毫秒。

schedule()方法實(shí)現(xiàn)任務(wù)的延遲執(zhí)行主要是通過(guò)給 Actor 發(fā)送一條異步任務(wù)的消息,該消息會(huì)帶上延遲執(zhí)行的時(shí)間。 在這里就是 ResourceManager 給自己的 Acotr 發(fā)送了一條延遲消息。

@Override
    public void scheduleRunAsync(Runnable runnable, long delayMillis) {if (isLocal) {// 計(jì)算任務(wù)調(diào)度的時(shí)間
            long atTimeNanos = delayMillis == 0 ? 0 : System.nanoTime() + (delayMillis * 1_000_000);
        	// 向自己發(fā)送一條 異步任務(wù)處理 的消息
            tell(new RunAsync(runnable, atTimeNanos));
        } 
    }

在 ResourceManager 的 Actor 接收到這條消息的時(shí)候,會(huì)判斷任任務(wù)是否需要立即執(zhí)行,如果是延遲執(zhí)行,則會(huì)使用 Akka 的 ActorSystem.scheduler() 來(lái)定時(shí)執(zhí)行該任務(wù)。

private void handleRunAsync(RunAsync runAsync) {final long timeToRun = runAsync.getTimeNanos();
        final long delayNanos;
    	// 如果接收到的任務(wù)已經(jīng)到達(dá)任務(wù)的執(zhí)行時(shí)間則立即執(zhí)行
        if (timeToRun == 0 || (delayNanos = timeToRun - System.nanoTime())<= 0) {// run immediately
            try {runAsync.getRunnable().run();
            } catch (Throwable t) {log.error("Caught exception while executing runnable in main thread.", t);
                ExceptionUtils.rethrowIfFatalErrorOrOOM(t);
            }
            // 如果沒(méi)有到達(dá)任務(wù)的執(zhí)行時(shí)間,則發(fā)送一條新的延遲消息給自己
        } else {// schedule for later. send a new message after the delay, which will then be
            // immediately executed
            FiniteDuration delay = new FiniteDuration(delayNanos, TimeUnit.NANOSECONDS);
            RunAsync message = new RunAsync(runAsync.getRunnable(), timeToRun);

            final Object envelopedSelfMessage = envelopeSelfMessage(message);

            getContext()
                    .system()
                    .scheduler()
                    .scheduleOnce(
                            delay,
                            getSelf(),
                            envelopedSelfMessage,
                            getContext().dispatcher(),
                            ActorRef.noSender());
        }
    }
心跳監(jiān)聽(tīng)對(duì)象的添加與觸發(fā)

heartbeatMonitor 對(duì)象的添加是在 TaskManger 啟動(dòng)后,向 ResourceManager 注冊(cè)時(shí)調(diào)用HeartbeatManagerSenderImpl#monitorTarget()方法添加的。 添加的時(shí)候會(huì)傳入一個(gè)HeartbeatTarget 接口的匿名實(shí)現(xiàn)類(lèi)。 該實(shí)現(xiàn)類(lèi)就定義了觸發(fā)心跳請(qǐng)求時(shí)的操作。下面代碼中就定義了RM向TaskManager發(fā)送心跳時(shí)需要怎么做,但是接收心跳請(qǐng)求的方法

private RegistrationResponse registerTaskExecutorInternal(
        TaskExecutorGateway taskExecutorGateway,
		TaskExecutorRegistration taskExecutorRegistration) {// 向 RM的TaskManager心跳管理服務(wù) 添加心跳監(jiān)聽(tīng)對(duì)象
    taskManagerHeartbeatManager.monitorTarget(
    taskExecutorResourceId,
    new HeartbeatTarget() {@Override
        public void receiveHeartbeat(ResourceID resourceID, Void payload) {// the ResourceManager will always send heartbeat requests to the
            // TaskManager
        }

        @Override
        public void requestHeartbeat(ResourceID resourceID, Void payload) {taskExecutorGateway.heartbeatFromResourceManager(resourceID);
        }
    });
}

HeartbeatManagerSenderImplrun()方法中,會(huì)遍歷所有的正在監(jiān)視的 heartbeatMonitor 對(duì)象,并調(diào)用 在添加監(jiān)視時(shí)傳入的heartbeatTarget匿名對(duì)象的requestHeartbeat()方法,就像上面代碼一樣。所以在RM向TaskManager 發(fā)送心跳請(qǐng)求的時(shí)候 是通過(guò) 調(diào)用taskExecutorGateway的heartbeatFromResourceManager() 發(fā)送了 RPC 請(qǐng)求

TaskManager 心跳服務(wù)創(chuàng)建與監(jiān)聽(tīng)對(duì)象添加

在 TM 服務(wù)啟動(dòng)的時(shí)候同樣也會(huì)創(chuàng)建一個(gè)心跳服務(wù)來(lái)管理與RM之間的心跳

  this.resourceManagerHeartbeatManager =
                createResourceManagerHeartbeatManager(heartbeatServices, resourceId);
// ============

    publicHeartbeatManagercreateHeartbeatManager(
            ResourceID resourceId,
            HeartbeatListenerheartbeatListener,
            ScheduledExecutor mainThreadExecutor,
            Logger log) {return new HeartbeatManagerImpl<>(
                heartbeatTimeout, resourceId, heartbeatListener, mainThreadExecutor, log);
    }

在TM中創(chuàng)建的就是HeartbeatManagerImpl對(duì)象,因?yàn)門(mén)M并不需要發(fā)送心跳請(qǐng)求,所以不是創(chuàng)建HeartbeatManagerSenderImpl對(duì)象。

TM 向 RM 注冊(cè)成功后,會(huì)添加一個(gè)對(duì) RM 的監(jiān)聽(tīng)對(duì)象

// monitor the resource manager as heartbeat target
        resourceManagerHeartbeatManager.monitorTarget(
                resourceManagerResourceId,
                new HeartbeatTarget() {@Override
                    public void receiveHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {resourceManagerGateway.heartbeatFromTaskManager(
                                resourceID, heartbeatPayload);
                    }

                    @Override
                    public void requestHeartbeat(
                            ResourceID resourceID, TaskExecutorHeartbeatPayload heartbeatPayload) {// the TaskManager won't send heartbeat requests to the ResourceManager
                    }
                });

在這里,HeartbeatTarget 匿名對(duì)象中,receiveHeartbeat() 就是向RM 發(fā)送心跳并附帶上匯報(bào)信息,而requestHeartbeat 是空的,因?yàn)?TM 不會(huì)向 RM 發(fā)送心跳請(qǐng)求。

TaskManager 接受心跳請(qǐng)求并發(fā)送心跳

回到之前,RM 調(diào)用taskExecutorGateway的heartbeatFromResourceManager方法,通過(guò)RPC方式發(fā)送了心跳請(qǐng)求。 在TaskExecutor類(lèi)中的heartbeatFromResourceManager方法就會(huì)被調(diào)用。并傳入了RM 的 resourceID。

@Override
public void heartbeatFromResourceManager(ResourceID resourceID) {resourceManagerHeartbeatManager.requestHeartbeat(resourceID, null);
}

resourceManagerHeartbeatManager 就是 TM 時(shí)創(chuàng)建的HeartbeatManagerImpl對(duì)象,所以這里調(diào)用的requestHeartbeat() 方法是HeartbeatManagerImpl中的方法。

public class HeartbeatManagerImplimplements HeartbeatManager{// 接受到RM的心跳請(qǐng)求
	@Override
    public void requestHeartbeat(final ResourceID requestOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat request from {}.", requestOrigin);
            // 匯報(bào)心跳,清除HeartbeatMonitor中的超時(shí)Future
            final HeartbeatTargetheartbeatTarget = reportHeartbeat(requestOrigin);

            if (heartbeatTarget != null) {if (heartbeatPayload != null) {heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
                }

                heartbeatTarget.receiveHeartbeat(
                        getOwnResourceID(), heartbeatListener.retrievePayload(requestOrigin));
            }
        }
    }
}

在這個(gè)方法中 首先會(huì)調(diào)用reportHeartbeat方法.

HeartbeatTargetreportHeartbeat(ResourceID resourceID) {if (heartbeatTargets.containsKey(resourceID)) {// 通過(guò) RM 的reosurceID 找到 TM對(duì)RM的監(jiān)聽(tīng)器
            HeartbeatMonitorheartbeatMonitor = heartbeatTargets.get(resourceID);
            // 重新設(shè)置 監(jiān)聽(tīng)器的超時(shí)時(shí)間
            heartbeatMonitor.reportHeartbeat();

            return heartbeatMonitor.getHeartbeatTarget();
        } else {return null;
        }
    }

之后就會(huì)調(diào)用之前創(chuàng)建監(jiān)聽(tīng)器時(shí)的匿名對(duì)象的方法來(lái)通過(guò)RPC調(diào)用向RM發(fā)送心跳數(shù)據(jù)。

resourceManagerGateway.heartbeatFromTaskManager(resourceID, heartbeatPayload);

之后又回到了RM

RM 接受TM的心跳數(shù)據(jù)

在 TM 發(fā)送 RPC 請(qǐng)求后,ResourceManager 類(lèi)中的heartbeatFromTaskManager()方法會(huì)被調(diào)用。該方法只有一行代碼

@Override
public void heartbeatFromTaskManager(
        final ResourceID resourceID, final TaskExecutorHeartbeatPayload heartbeatPayload) {taskManagerHeartbeatManager.receiveHeartbeat(resourceID, heartbeatPayload);
}

所以在這里,會(huì)調(diào)用 RM 管理 TM 的心跳服務(wù)對(duì)象(HeartbeatManagerSenderImpl) 的receiveHeartbeat()方法。

@Override
public void receiveHeartbeat(ResourceID heartbeatOrigin, I heartbeatPayload) {if (!stopped) {log.debug("Received heartbeat from {}.", heartbeatOrigin);
        reportHeartbeat(heartbeatOrigin);

        if (heartbeatPayload != null) {heartbeatListener.reportPayload(heartbeatOrigin, heartbeatPayload);
        }
    }
}

這里首先會(huì)調(diào)用reportHeartbeat()來(lái)重新設(shè)置 在 RM 中對(duì) TM 的監(jiān)聽(tīng)器的超時(shí)時(shí)間。 然后調(diào)用heartbeatListener來(lái)處理TM 傳過(guò)來(lái)的數(shù)據(jù)。

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機(jī)房具備T級(jí)流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級(jí)服務(wù)器適合批量采購(gòu),新人活動(dòng)首月15元起,快前往官網(wǎng)查看詳情吧

標(biāo)題名稱(chēng):Flink心跳服務(wù)流程-創(chuàng)新互聯(lián)
標(biāo)題網(wǎng)址:http://www.rwnh.cn/article36/gdhpg.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計(jì)、App設(shè)計(jì)、微信公眾號(hào)、用戶(hù)體驗(yàn)、網(wǎng)站導(dǎo)航

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶(hù)投稿、用戶(hù)轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

外貿(mào)網(wǎng)站制作
琼结县| 定边县| 临海市| 绩溪县| 云和县| 车险| 新竹市| 崇州市| 讷河市| 乌审旗| 崇明县| 彩票| 姜堰市| 萨嘎县| 新建县| 朝阳县| 安陆市| 黎城县| 开封县| 黎城县| 赤水市| 灵山县| 和龙市| 库尔勒市| 高台县| 连山| 遵义县| 乐平市| 元朗区| 栾川县| 山东省| 敖汉旗| 冕宁县| 新沂市| 逊克县| 突泉县| 陆丰市| 宜川县| 余江县| 清苑县| 汕尾市|