本篇文章為大家展示了什么是Flink windows和Time操作,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。
成都創(chuàng)新互聯(lián)公司專注于企業(yè)全網(wǎng)整合營銷推廣、網(wǎng)站重做改版、樂山網(wǎng)站定制設(shè)計、自適應(yīng)品牌網(wǎng)站建設(shè)、H5建站、商城網(wǎng)站定制開發(fā)、集團公司官網(wǎng)建設(shè)、外貿(mào)營銷網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計等建站業(yè)務(wù),價格優(yōu)惠性價比高,為樂山等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
在Flink中常用的Time類型:
處理時間
攝取時間
事件時間
是上圖中,最后一步的處理時間,表示服務(wù)器中執(zhí)行相關(guān)操作的處理時間。例如一些算子操作時間,在服務(wù)器上面的時間。
如果你以處理時間作為流處理的時間處理方式,那么所有的基于時間的操作都會使用服務(wù)器的時間,來運行相關(guān)的操作。例如:一個小時的處理時間窗口,將會包含一個小時內(nèi)的到達服務(wù)器內(nèi)的所有數(shù)據(jù)。例如應(yīng)用程序9:15am開始執(zhí)行,第一個小時的時間處理窗口會包含所有的9:15到10:15內(nèi)的事件數(shù)據(jù),下一個時間窗口是10:15到11:15內(nèi)的所有數(shù)據(jù)。
處理時間是最簡單的事件處理方式,并不需要流和機器的時間協(xié)調(diào)。因此提供了高性能和低延遲。然而在分布式環(huán)境中或者異步環(huán)境中處理時間并不能夠提供準確性(也就是說在處理數(shù)據(jù)時,由于網(wǎng)絡(luò)的抖動在一個處理時間窗口中例如9:15到10:15,很大可能包括9:00的事件數(shù)據(jù))。
事件時間是每一個設(shè)備上每一個單獨事件發(fā)生的時間例如手機登錄APP的日志時間。這個時間就是這條數(shù)據(jù)記錄的時間。每一條數(shù)據(jù)都有一個時間戳表示這條數(shù)據(jù)的事件發(fā)生時間。這個時間取決于每條數(shù)據(jù),而并不會依賴于機器的時間。事件時間處理時必須指定如何獲得Event Time watermarks(用來描述Event Time如何處理)。
按照事件時間處理數(shù)據(jù),處理結(jié)果應(yīng)該是完全一致,也就是說無論處理多少次結(jié)果都是一樣的,這就是所謂的大數(shù)據(jù)處理的冪等性。不管事件到達時間和事件是不是有序到達(在生產(chǎn)環(huán)境中,數(shù)據(jù)往往進入到服務(wù)器中的時間和順序是不一定的,有可能先產(chǎn)生的數(shù)據(jù)后到達服務(wù)器,這取決于很多網(wǎng)絡(luò)因素)
攝取時間表示某個事件數(shù)據(jù)進入到Flink的時間。在source操作中,每條記錄都會得到source的當前時間戳,也就是接收到的數(shù)據(jù)自動會有一個攝取時間,也就是例如時間窗都是基于這個時間來處理的。
攝取時間是處于事件時間和處理時間之間。如上圖所示。攝取時間是有成本的,但是卻是結(jié)果可預(yù)測的。因為攝取時間使用了穩(wěn)定的時間戳(在source端只會分配一次),每一條數(shù)據(jù)的時間戳都是固定的。并且同一攝取時間的數(shù)據(jù)有可能被分配到不同的處理時間窗口中。
Windows使我們處理無限數(shù)據(jù)流(源源不斷的進來)的核心部件。Windows把我們的數(shù)據(jù)流拆成一個個的buckets。我們需要把算子作用到buckets上面去。
第一件事情就是需要指定我們的流數(shù)據(jù)是不是有key,有key和沒有key對應(yīng)的算子是完全不一樣的。
帶keyby,會結(jié)合windows一起使用。輸入的數(shù)據(jù)內(nèi)容中的任意屬性都可以作為一個key。在這個流上可以允許窗口多任務(wù)并行計算,每一個邏輯key都可以被獨立計算,相同的key的數(shù)據(jù)會被發(fā)送到相同的并行任務(wù)中去處理。
通過使用windowAll來指定。原始的數(shù)據(jù)流不會被拆分成多個邏輯任務(wù),所有窗口邏輯都是一個窗口任務(wù)來執(zhí)行,所以并行度是1。
簡而言之,當?shù)谝粋€元素到達對應(yīng)的窗口時,一個windows就會被開始創(chuàng)建。當時間(不管是event時間還是processing時間)達到時間戳范圍,就會移除窗口。另外,每一個窗口都有一個Trigger和window Functions,當數(shù)據(jù)到達窗口后,執(zhí)行的函數(shù)就是window Functions,這個函數(shù)包含了對這個窗口內(nèi)容的所有計算,當Trigger達到一定條件之后,就會觸發(fā)。
在指定流數(shù)據(jù)是否帶key之后,下一步就是定義窗口的分配器(windows assigner),windows assigner的職責(zé)是定義每一個傳入的元素如何分配到窗口內(nèi)。對于keyby使用window()方法,對于non-keyby使用windowAll()方法。
A
WindowAssigner
is responsible for assigning each incoming element to one or more windows.
每個傳入的數(shù)據(jù)分配給一個或多個窗口。
Flink內(nèi)置的window assigner對于大多數(shù)場景來講基本上是夠用的(tumbling windows滾動窗口, sliding windows滑動窗口, session windows會話窗口 and global windows全局窗口)。也可以通過繼承WindowAssigner來自定義一個window assigner。所有的內(nèi)置window assigner(除了全局窗口)都是基于時間(處理時間或事件時間)來分配數(shù)據(jù)的。
基于時間的窗口有一個開始的timestamp(inclusive)和結(jié)束timestamp(exclusive)表示窗口的大小。
Flink中對于窗口的劃分有兩大類,第一大類是基于time(用的最多),第二大類是基于count。
滾動窗口分配器將分配每一個元素到一個指定大小的窗口,這種類型的窗口有一個固定的大小而且不會有重疊的。上面這張圖就是隨著時間流按照指定的時間間隔拆開。
簡單實例代碼:
object WindosApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")).map((_,1)).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1) env.execute("WindosApp") } }
上面的代碼表示監(jiān)聽socket數(shù)據(jù)流,每隔5秒獲取一次數(shù)據(jù)。timeWindow表示根據(jù)時間來劃分窗口,(此外還有countWindow根據(jù)數(shù)量來劃分窗口)。默認時間是processTime處理時間。
public class JavaWindowApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<String, Integer>(token, 1)); } } } }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1); env.execute("JavaWindowApp"); } }
滑動窗口分配器分配每一個元素到一個固定大小的窗口,類似于滾動窗口,窗口大小可以通過配置進行修改,但是滑動窗口還有另外一個附加滑動參數(shù)控制滑動窗口什么時候啟動,所以這個窗口是有可能重疊的。
上面圖的意思是window1的窗口大小是10分鐘,滑動大小是5分鐘,也就是每隔5分鐘產(chǎn)生一個窗口,這個窗口的大小是10分鐘,這個窗口就是window2,然后window2又過5分鐘產(chǎn)生一個窗口,窗口的大小是10分鐘 window3,以此類推。所以滑動窗口處理的數(shù)據(jù)可能會有重疊。一個數(shù)據(jù)元素可能會在多個窗口中進行處理。
使用場景:每個半個小時統(tǒng)計前一個小時的TopN。
object WindosApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")).map((_,1)).keyBy(0) //.timeWindow(Time.seconds(5)) # 滾動窗口 .timeWindow(Time.seconds(10),Time.seconds(5)) .sum(1).print().setParallelism(1) env.execute("WindosApp") } }
每隔5秒統(tǒng)計近10秒的數(shù)據(jù)。所以當服務(wù)器端輸入:
a,a,a,b,b,b a,a,a,b,b,b a,b,a,b,a,a
時,控制臺會打印兩遍結(jié)果:
(a,10) (b,8) (b,8) (a,10)
在定義窗口分配器之后,就需要指定基于每一個窗口的計算方法了(在上面的例子中我們做了一個keyby sum操作)。window function會處理窗口中的每一個元素。window function包括如下幾個:
ReduceFunction
AggregationFunction
FoldFunction
ProcessWindowFunction
ReduceFunction和AggregationFunction的執(zhí)行效率更高,因為Flink會在數(shù)據(jù)到達每一個窗口時首先做一個增量聚合操作。ProcessWindowFunction拿到的是包含在窗口中的所有的元素以及附加信息一個Iterable,是一個全量聚合。因此ProcessWindowFunction的執(zhí)行效率不高,因為Flink會緩存窗口中的所有數(shù)據(jù)。
input中的兩個元素進行結(jié)合產(chǎn)生一個同樣類型的輸出。這里我們舉例,通過傳入的數(shù)據(jù)類型是數(shù)值類型來演示增量效果。
object WindowReduceApp { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("192.168.227.128", 9999) text.flatMap(_.split(",")) .map(x=>(1,x.toInt)) // 1,2,3,4,5 => (1,1) (1,2) (1,3) (1,4) (1,5) .keyBy(0) //因為key都是1, 所以所有的元素都到一個task去執(zhí)行 .timeWindow(Time.seconds(5)) // 滾動窗口 .reduce((v1, v2) => { //// reduce函數(shù)作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數(shù)據(jù)到達之后進行一次性處理,而是數(shù)據(jù)兩兩處理 println(v1 + "....." + v2) (v1._1, v1._2 + v2._2) }) .print().setParallelism(1) env.execute("WindowReduceApp") } }
服務(wù)器端輸入:
1,2,3,4,5
控制臺中輸出如下:
(1,1).....(1,2) (1,3).....(1,3) (1,6).....(1,4) (1,10).....(1,5) (1,15)
reduce函數(shù)作用在窗口之上,就可以完成窗口中的增量操作,不用等所有的數(shù)據(jù)到達之后進行一次性處理,而是數(shù)據(jù)兩兩處理。
public class JavaWindowReduceApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token))); } } } }).keyBy(0).timeWindow(Time.seconds(5)) .reduce(new ReduceFunction<Tuple2<Integer, Integer>>() { @Override public Tuple2<Integer, Integer> reduce(Tuple2<Integer, Integer> value1, Tuple2<Integer, Integer> value2) throws Exception { System.out.println("value1 = [" + value1 + "], value2 = [" + value2 + "]"); return new Tuple2<>(value1.f0,value1.f1 + value2.f1); } }) .print().setParallelism(1); env.execute("JavaWindowApp"); } }
輸出結(jié)果如下:
value1 = [(1,1)], value2 = [(1,2)] value1 = [(1,3)], value2 = [(1,3)] value1 = [(1,6)], value2 = [(1,4)] value1 = [(1,10)], value2 = [(1,5)] (1,15)
ProcessWindowFunction可以拿到一個Iterable,可以拿到窗口中的所有元素,并且有一個上下文對象可以訪問時間和狀態(tài)信息,比reducefunction可以提供更多的功能。但這樣卻可以帶來資源和性能的開銷,因為元素并不能通過增量的方式去聚合,相反,它需要把所有的數(shù)據(jù)都放在一個buffer中。
public class JavaWindowProcessApp { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<String> text = env.socketTextStream("192.168.227.128", 9999); text.flatMap(new FlatMapFunction<String, Tuple2<Integer, Integer>>() { @Override public void flatMap(String value, Collector<Tuple2<Integer, Integer>> out) throws Exception { String[] tokens = value.toLowerCase().split(","); for(String token: tokens) { if(token.length()>0){ out.collect(new Tuple2<Integer, Integer>(1, Integer.parseInt(token))); } } } }).keyBy(0).timeWindow(Time.seconds(5)) .process(new ProcessWindowFunction<Tuple2<Integer, Integer>, Object, Tuple, TimeWindow>() { @Override public void process(Tuple tuple, Context context, Iterable<Tuple2<Integer, Integer>> elements, Collector<Object> out) throws Exception { System.out.println("tuple = [" + tuple + "], context = [" + context + "], elements = [" + elements + "], out = [" + out + "]"); long count = 0; for(Tuple2<Integer, Integer> in:elements) { count++; } out.collect("window:" + context.window() + "count:" + count); } }) .print().setParallelism(1); env.execute("JavaWindowApp"); } }
服務(wù)器輸入:
1,2,3,4,5
控制臺輸出:
tuple = [(1)], context = [org.apache.flink.streaming.runtime.operators.windowing.functions.InternalProcessWindowContext@40e09d6c], elements = [[(1,1), (1,2), (1,3), (1,4), (1,5)]], out = [org.apache.flink.streaming.api.operators.TimestampedCollector@4e277b00] window:TimeWindow{start=1568542160000, end=1568542165000}count:5
只輸出一次,說明是等待所有數(shù)據(jù)都拿到之后才進行處理。
使用場景:窗口內(nèi)的數(shù)據(jù)進行排序。在Reduce中是無法進行排序的。
上述內(nèi)容就是什么是Flink windows和Time操作,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。
新聞標題:什么是Flinkwindows和Time操作
網(wǎng)站鏈接:http://www.rwnh.cn/article34/jgpsse.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供自適應(yīng)網(wǎng)站、靜態(tài)網(wǎng)站、網(wǎng)站營銷、用戶體驗、ChatGPT、建站公司
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)