中文字幕日韩精品一区二区免费_精品一区二区三区国产精品无卡在_国精品无码专区一区二区三区_国产αv三级中文在线

Spark中Join的用法

這篇文章主要介紹“Spark中Join的用法”,在日常操作中,相信很多人在Spark中Join的用法問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”Spark中Join的用法”的疑惑有所幫助!接下來,請跟著小編一起來學(xué)習(xí)吧!

專注于為中小企業(yè)提供網(wǎng)站制作、成都網(wǎng)站設(shè)計(jì)服務(wù),電腦端+手機(jī)端+微信端的三站合一,更高效的管理,為中小企業(yè)滴道免費(fèi)做網(wǎng)站提供優(yōu)質(zhì)的服務(wù)。我們立足成都,凝聚了一批互聯(lián)網(wǎng)行業(yè)人才,有力地推動了上千余家企業(yè)的穩(wěn)健成長,幫助中小企業(yè)通過網(wǎng)站建設(shè)實(shí)現(xiàn)規(guī)模擴(kuò)充和轉(zhuǎn)變。

在數(shù)據(jù)分析和處理的過程中,我們經(jīng)常會用Join操作來關(guān)聯(lián)兩個(gè)數(shù)據(jù)集,Spark作為一個(gè)通用的分析引擎,能夠支持多種Join的應(yīng)用場景。

Join操作的輸入是兩個(gè)數(shù)據(jù)集,A和B,將數(shù)據(jù)集A中的每一條記錄和數(shù)據(jù)集B中的每一條記錄進(jìn)行比對,每發(fā)現(xiàn)一條符合條件的記錄時(shí),返回一條新的記錄,新記錄中的字段可以只從A中來,也可以只從B中來,也可以分別從A和B中取一部分,因此,Join后的記錄可以表示兩個(gè)數(shù)據(jù)集中記錄的結(jié)合。

影響Spark Join操作的三個(gè)因素

具體到Spark中Join操作的執(zhí)行,有三個(gè)影響較大的因素:輸入數(shù)據(jù)集的大小、Join條件、Join類型。

輸入數(shù)據(jù)集的大小

輸入數(shù)據(jù)集的大小直接影響Join操作的效率和可靠性,不只絕對大小,數(shù)據(jù)集之間的相對大小也對效率和可靠性有影響。

Join條件

Join條件通常是兩個(gè)數(shù)據(jù)集中字段的邏輯比較,一般可以分為等值Join不等值Join。

等值Join可以包含一個(gè)相等條件或多個(gè)需要同時(shí)滿足的相等條件,比如:

  • 一個(gè)相等條件:A.x == B.x

  • 多個(gè)相等條件:A.x == B.x and A.y == B.y

注:x 和 y 是數(shù)據(jù)集A和B中的字段。

不等值Join使用不相等條件或者不能同時(shí)滿足的相等條件,比如:

  • 不相等條件:A.x < B.x

  • 不能同時(shí)滿足的相等條件:A.x == B.x or A.y == B.y

Join類型

Join類型影響Join操作的輸出,大致包括以下幾類:

  • Inner Join:Inner Join只輸出匹配的記錄(滿足Join條件),記錄來自兩個(gè)數(shù)據(jù)集

  • Outer Join:Outer Join除了輸出匹配的記錄,也輸出未匹配的記錄,根據(jù)如何輸出未匹配的記錄,outer Join可以進(jìn)一步分為left out join、right out join和full outer join,記錄來自兩個(gè)數(shù)據(jù)集

  • Semi Join:Semi Join輸出的記錄只來自一個(gè)數(shù)據(jù)集,要么是匹配的記錄,要么是未匹配的記錄。如果輸出的是未匹配的記錄,也叫做Anti Join

  • Cross Join:Cross Join輸出兩個(gè)數(shù)據(jù)集中所有記錄可能的組合,例如,A集合中有m條記錄,B集合中有n條記錄,則結(jié)果為m*n條記錄,Cross Join又稱為笛卡爾積。

根據(jù)上面的三個(gè)因素,Spark會選擇合適的執(zhí)行機(jī)制來完成Join操作。

Spark Join的執(zhí)行機(jī)制

Spark提供了五種執(zhí)行Join操作的機(jī)制,分別是:

  • Shuffle Hash Join

  • Broadcast Hash Join

  • Sort Merge Join

  • Cartesian Join

  • Broadcast Nested Join

Hash Join

Broadcast Hash Join和Shuffle Hash Join都基于Hash Join,Hash Join是單機(jī)上的Join操作。想象一道LeetCode算法題,數(shù)據(jù)量分別為m和n的兩個(gè)數(shù)組,怎么找到兩個(gè)數(shù)組的公共元素?第一種方法:對兩個(gè)數(shù)組進(jìn)行嵌套循環(huán)的遍歷,發(fā)現(xiàn)相等元素則輸出。第二種方法:用空間換時(shí)間,將其中一個(gè)數(shù)組轉(zhuǎn)化成集合(Python的set或者Java的HashSet,實(shí)現(xiàn)都基于哈希表),然后遍歷第二個(gè)數(shù)組中的每一個(gè)元素,判斷是否包含在第一個(gè)集合中。Hash Join和第二種方法類似,將較小的數(shù)據(jù)集分區(qū)構(gòu)造成哈希表,用Join的key作為哈希表的key,key所對應(yīng)的記錄作為哈希表的value,然后遍歷較大的數(shù)據(jù)集分區(qū),在哈希表中尋找對應(yīng)的key,找到兩個(gè)分區(qū)key相同的記錄將其輸出。因?yàn)槭褂昧斯1?,所以叫做Hash Join。

根據(jù)進(jìn)行Join的兩個(gè)數(shù)據(jù)集的大小關(guān)系,Spark支持兩種Hash Join。

Broadcast Hash Join

當(dāng)其中一個(gè)數(shù)據(jù)集足夠小時(shí),采用Broadcast Hash Join,較小的數(shù)據(jù)集會被廣播到所有Spark的executor上,并轉(zhuǎn)化為一個(gè)Hash Table,之后較大數(shù)據(jù)集的各個(gè)分區(qū)會在各個(gè)executor上與Hash Table進(jìn)行本地的Join,各分區(qū)Join的結(jié)果合并為最終結(jié)果。

Broadcast Hash Join 沒有Shuffle階段、效率最高。但為了保證可靠性,executor必須有足夠的內(nèi)存能放得下被廣播的數(shù)據(jù)集,所以當(dāng)進(jìn)兩個(gè)數(shù)據(jù)集的大小都超過一個(gè)可配置的閾值之后,Spark不會采用這種Join。控制這個(gè)閾值的參數(shù)為spark.sql.autoBroadcastJoinThreshold,最新版本(3.0.1)中默認(rèn)值為10M。

Shuffle Hash Join

當(dāng)兩個(gè)數(shù)據(jù)集都小于可以使用Broadcast Hash Join的閾值時(shí),采用Shuffle Join,先對兩個(gè)數(shù)據(jù)集進(jìn)行Shuffle,Shuffle是意思是根據(jù)key的哈希值,對兩個(gè)數(shù)據(jù)集進(jìn)行重新分區(qū),使得兩個(gè)數(shù)據(jù)集中key的哈希值相同的記錄會被分配到同一個(gè)executor上,此時(shí)在每個(gè)executor上的分區(qū)都足夠小,各個(gè)executor分別執(zhí)行Hash Join即可。

Shuffle操作會帶來大量的網(wǎng)絡(luò)IO開銷,因此效率會受到影響。同時(shí),在executor的內(nèi)存使用方面,如果executor的數(shù)量足夠多,每個(gè)分區(qū)處理的數(shù)據(jù)量可以控制到比較小。

Sort Merge Join

Sort Merge Join和Shuffle Hash Join類似,會有一個(gè)Shuffle階段,將key相同的記錄重分配同一個(gè)executor上,不同的是,在每個(gè)executor上,不再構(gòu)造哈希表,而是對兩個(gè)分區(qū)進(jìn)行排序,然后用兩個(gè)下標(biāo)同時(shí)遍歷兩個(gè)分區(qū),如果兩個(gè)下標(biāo)指向的記錄key相同,則輸出這兩條記錄,否則移動key較小的下標(biāo)。

Sort Merge Join也有Shuffle階段,因此效率同樣不如Broadcast Hash Join。在內(nèi)存使用方面,因?yàn)椴恍枰獦?gòu)造哈希表,需要的內(nèi)存比Hash Join要少。

Cartesian Join

Cartesian Join機(jī)制專門用來實(shí)現(xiàn)cross join,結(jié)果的分區(qū)數(shù)等于輸入數(shù)據(jù)集的分區(qū)數(shù)之積,結(jié)果中每一個(gè)分區(qū)的數(shù)據(jù)對應(yīng)一個(gè)輸入數(shù)據(jù)集的一個(gè)分區(qū)和另外一個(gè)輸入數(shù)據(jù)集的一個(gè)分區(qū)。

Cartesian Join會產(chǎn)生非常多的分區(qū),但如果要進(jìn)行cross join,別無選擇。

Broadcast Nested Loop Join

Broadcast Nested Join將一個(gè)輸入數(shù)據(jù)集廣播到每個(gè)executor上,然后在各個(gè)executor上,另一個(gè)數(shù)據(jù)集的分區(qū)會和第一個(gè)數(shù)據(jù)集使用嵌套循環(huán)的方式進(jìn)行Join輸出結(jié)果。

Broadcast Nested Join需要廣播數(shù)據(jù)集和嵌套循環(huán),計(jì)算效率極低,對內(nèi)存的需求也極大,因?yàn)椴徽摂?shù)據(jù)集大小,都會有一個(gè)數(shù)據(jù)集被廣播到所有executor上。

Spark如何選擇Join機(jī)制

Spark根據(jù)以下的因素選擇實(shí)際執(zhí)行Join的機(jī)制:

  • 參數(shù)配置

  • hint參數(shù)

  • 輸入數(shù)據(jù)集大小

  • Join類型

  • Join條件

其中,hint參數(shù)是一種在join時(shí)手動指定join機(jī)制的方法,例如:

df1.hint("broadcast").join(df2, ...)

下面介紹在什么情況下使用何種Join機(jī)制。

何時(shí)使用Broadcast Hash Join

必需條件:

  • 只用于等值Join

  • 不能用于Full Outer Join

以下條件需要滿足一個(gè):

  • 左邊的數(shù)據(jù)集使用了broadcast hint,Join類型是Right Outer,Right Semi或Inner

  • 沒使用hint,但左邊的數(shù)據(jù)集小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Right Outer,Right Semi或Inner

  • 右邊的數(shù)據(jù)集使用了broadcast hint,Join類型是Left Outer,Left Semi或Inner

  • 沒使用hint,但右邊的數(shù)據(jù)集小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Left Outer,Left Semi或Inner

  • 兩個(gè)數(shù)據(jù)集都使用了broadcast hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

  • 沒使用hint,但兩個(gè)數(shù)據(jù)集都小于spark.sql.autoBroadcastJoinThreshold參數(shù),Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

何時(shí)使用Shuffle Hash Join

必需條件:

  • 只用于等值Join

  • 不能用于Full Outer Join

  • spark.sql.join.prefersortmergeJoin 參數(shù)默認(rèn)值為true,設(shè)置為false

以下條件需要滿足一個(gè):

  • 左邊的數(shù)據(jù)集使用了shuffle_hash hint,Join類型是Right Outer,Right Semi或Inner

  • 沒使用hint,但左邊的數(shù)據(jù)集比右邊的數(shù)據(jù)集顯著小,Join類型是Right Outer,Right Semi或Inner

  • 右邊的數(shù)據(jù)集使用了shuffle_hash hint,Join類型是Left Outer,Left Semi或Inner

  • 沒使用hint,但右邊的數(shù)據(jù)集比左邊的數(shù)據(jù)集顯著小,Join類型是Left Outer,Left Semi或Inner

  • 兩邊的數(shù)據(jù)集都使用了shuffle_hash hint,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

  • 沒使用hint,兩個(gè)數(shù)據(jù)集都比較小,Join類型是Left Outer,Left Semi,Right Outer,Right Semi或Inner

何時(shí)使用Sort Merge Join

必需條件:

  • 只用于等值Join

  • Join條件中的key是可排序的

  • spark.sql.join.prefersortmergeJoin 參數(shù)默認(rèn)值為true,設(shè)置為true

以下條件需要滿足一個(gè):

  • 有一個(gè)數(shù)據(jù)集使用了merge hint,Join類型任意

  • 沒有使用merge hint,Join類型任意

何時(shí)使用Cartesian Join

必需條件:

  • Cross Join

以下條件需要滿足一個(gè):

  • 使用了shuffle_replicate_nl hint,是等值或不等值Join均可

  • 沒有使用hint,等值或不等值Join均可

何時(shí)Broadcast Nested Loop Join

Broadcast Nested Loop Join是默認(rèn)的Join機(jī)制,當(dāng)沒有選用其他Join機(jī)制被選擇時(shí),用它來進(jìn)行任意條件任意類型的Join。

當(dāng)有多種Join機(jī)制可用時(shí),選擇的優(yōu)先級為Broadcast Hash Join > Sort Merge Join > Shuffle Hash Join > Cartesian Join。

在進(jìn)行Inner Join和不等值Join時(shí),如果有一個(gè)數(shù)據(jù)集可以被廣播,Broadcast Nested Loop Join的優(yōu)先級比Cartesian Join優(yōu)先級高。

到此,關(guān)于“Spark中Join的用法”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識,請繼續(xù)關(guān)注創(chuàng)新互聯(lián)網(wǎng)站,小編會繼續(xù)努力為大家?guī)砀鄬?shí)用的文章!

新聞標(biāo)題:Spark中Join的用法
文章位置:http://www.rwnh.cn/article0/ghcjio.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供企業(yè)建站、電子商務(wù)、軟件開發(fā)、服務(wù)器托管、品牌網(wǎng)站建設(shè)、網(wǎng)站設(shè)計(jì)公司

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)頁設(shè)計(jì)公司
兴山县| 佳木斯市| 上蔡县| 滨州市| 墨玉县| 两当县| 江油市| 东明县| 沁水县| 麟游县| 金昌市| 漾濞| 田林县| 上犹县| 房山区| 玉山县| 班玛县| 抚州市| 黑河市| 印江| 密山市| 衡南县| 台安县| 阿尔山市| 南宫市| 沁水县| 建阳市| 宜城市| 天峻县| 田阳县| 沙坪坝区| 彭水| 太和县| 沙河市| 海丰县| 长乐市| 和田市| 平山县| 嘉义市| 舟曲县| 古丈县|