内射老阿姨1区2区3区4区_久久精品人人做人人爽电影蜜月_久久国产精品亚洲77777_99精品又大又爽又粗少妇毛片

Spark核心編程-創(chuàng)新互聯(lián)

文章目錄
  • Spark 核心編程
    • 一、RDD
      • 1、什么是 RDD
      • 2、分布式計算模擬
        • (1) 搭建基礎(chǔ)的架子
        • (2) 客戶端向服務(wù)器發(fā)送計算任務(wù)
      • 3、RDD 創(chuàng)建
        • (1) 從集合(內(nèi)存)中創(chuàng)建
        • (2) 從外部存儲(文件)創(chuàng)建RDD
        • (3) 從其他RDD創(chuàng)建
        • (4) 直接創(chuàng)建 RDD (new)
      • 4、RDD 并行度與分區(qū)
        • (1) makeRDD() 基于內(nèi)存創(chuàng)建的RDD的分區(qū)
        • (2) 基于文件創(chuàng)建的RDD 的分區(qū)
        • (3) 數(shù)據(jù)分區(qū)的規(guī)則

創(chuàng)新互聯(lián)建站是一家專業(yè)提供高昌企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站設(shè)計制作、做網(wǎng)站、H5開發(fā)、小程序制作等業(yè)務(wù)。10年已為高昌眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)絡(luò)公司優(yōu)惠進行中。Spark 核心編程

Spark 計算框架為了能夠進行高并發(fā)和高吞吐的數(shù)據(jù)處理,封裝了三大數(shù)據(jù)結(jié)構(gòu),用于處理不同的應(yīng)用場景。三大數(shù)據(jù)結(jié)構(gòu)分別是:
1)RDD:彈性分布式數(shù)據(jù)集
2)累加器:分布式共享只寫變量
3)廣播變量:分布式共享只讀變量
接下來讓我們看看這三大數(shù)據(jù)結(jié)構(gòu)是如何數(shù)據(jù)處理中使用的

一、RDD 1、什么是 RDD

RDD(Resilient Distributed Dataset)叫做彈性分布式數(shù)據(jù)集,是 Spark 中最基本的數(shù)據(jù)處理模型。代碼中是一個抽象類,它代表一個彈性的,不可變,可分區(qū),里面的元素可并行計算的集合。
彈性:
存儲的彈性:內(nèi)存與磁盤的自動切換
容錯的彈性:數(shù)據(jù)丟失可以自動恢復(fù)
計算的彈性:計算出錯重試機制
分片的機制:可根據(jù)需要重新分片
分布式:數(shù)據(jù)存儲在大數(shù)據(jù)集群不同的節(jié)點上
數(shù)據(jù)集:RDD 封裝了計算邏輯,并不保存數(shù)據(jù)
數(shù)據(jù)抽象:RDD 是一個抽象類,需要子類具體實現(xiàn)
不可變:RDD 封裝了計算邏輯,是不可以改變的,想要改變,只能產(chǎn)生新的RDD,在新的RDD里面封裝邏輯計算
可分區(qū),并行計算

2、分布式計算模擬 (1) 搭建基礎(chǔ)的架子

首先分為兩部分,我們把Excuter當(dāng)成服務(wù)器,把Driver當(dāng)成客戶端。然后用客戶端去連接服務(wù)器,然后客戶端發(fā)送數(shù)據(jù)給服務(wù)器。
Excuter (服務(wù)器):
第一步設(shè)置服務(wù)器的端口號,ServerScket(9998)方法,里面的參數(shù)是端口號,這可以隨便寫。然后第二步等待客戶端發(fā)送數(shù)據(jù)過來accept()方法。然后第三步使用getInputStream輸入流接收客戶端發(fā)送過來的數(shù)據(jù),使用輸入流的read()方法,這個就是從客戶端拿到的數(shù)據(jù),然后把這個數(shù)據(jù)給輸出。最后把輸出流,數(shù)據(jù)等待,還有服務(wù)器依次都給關(guān)閉。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.InputStream
import java.net.{ServerSocket, Socket}

//這個是做計算準(zhǔn)備的,主要是邏輯代碼部分
//這個相當(dāng)于是服務(wù)器,然后Driver相當(dāng)于是客戶端,客戶端連接服務(wù)器就可以直接使用了
class Excuter {}
object Excuter{def main(args: Array[String]): Unit = {//啟動服務(wù)器,接收數(shù)據(jù) 這個端口號是隨便寫的
    val server = new ServerSocket(9998) //這個是網(wǎng)絡(luò)編程的
    println("服務(wù)器啟動,等待接收數(shù)據(jù)")

    //等待客戶端的鏈接
    val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
    val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
    val i = in.read() //這個就是拿到的值
    println("接收到客戶端發(fā)送的數(shù)據(jù):" + i) //把客戶端拿到的數(shù)據(jù)給輸出

    in.close()  //把輸入流給關(guān)閉掉
    client.close()
    server.close() //把服務(wù)器給關(guān)閉掉

  }
}

在這里插入圖片描述
Driver (客戶端):
首先客戶端連接服務(wù)器的端口號Socket("localhost",9998)方法,第一個參數(shù)是連接方式,這里是本地連接,第二個參數(shù)是服務(wù)器的端口號。然后第二步就向服務(wù)器發(fā)送數(shù)據(jù),getOutputStream方法輸出流,然后使用輸出流的write()方法寫出數(shù)據(jù)。然后使用輸出流的flush()方法,flush方法的作用是,刷新此輸出流并強制寫出所有緩沖的輸出字節(jié)。然后用完之后就把輸出流和客戶端給關(guān)閉了。

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.OutputStream
import java.net.Socket

//這個是用來執(zhí)行程序的
class Driver {}
object Driver{def main(args: Array[String]): Unit = {//連接服務(wù)器 本地連接,然后第二個參數(shù)是服務(wù)器定義的端口號
    val client = new Socket("localhost",9998) //這個相當(dāng)于是是客戶端,連接服務(wù)器
    val out: OutputStream = client.getOutputStream //向服務(wù)器發(fā)東西,用getOutputStream()
    out.write(2)
    out.flush()

    out.close() //用完了吧這個輸出流給關(guān)掉
    client.close() //然后把這個客戶端也關(guān)掉
  }
}
(2) 客戶端向服務(wù)器發(fā)送計算任務(wù)

Excuter 類里面是服務(wù)器,Driver是客戶端,Task 里面是準(zhǔn)備數(shù)據(jù)和邏輯操作的,那個Driver 里面創(chuàng)建一個Task 對象然后把Task 用ObjectOutputstream輸出流把對象給輸出到Excuter接收,接收也是使用ObjectIntputstream對象輸入流進行接收,因為輸出的是一個操作邏輯,用字節(jié)流接收肯定不對,所有要用對象。然后Excuter 拿到Task之后,就可以直接使用里面的函數(shù)了。Task里面要混入Serializable特質(zhì),因為在網(wǎng)絡(luò)中肯定是無法直接傳送一個對象過去的,所以要進行序列化。
在這里插入圖片描述7
Excuter 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{InputStream, ObjectInputStream}
import java.net.{ServerSocket, Socket}

//這個是做計算準(zhǔn)備的,主要是邏輯代碼部分
//這個相當(dāng)于是服務(wù)器,然后Driver相當(dāng)于是客戶端,客戶端連接服務(wù)器就可以直接使用了
class Excuter {}
object Excuter{//要混入序列化的特征,不然不能那個傳一個對象過去
  def main(args: Array[String]): Unit = {//啟動服務(wù)器,接收數(shù)據(jù) 這個端口號是隨便寫的
    val server = new ServerSocket(9998) //這個是網(wǎng)絡(luò)編程的
    println("服務(wù)器啟動,等待接收數(shù)據(jù)")

    //等待客戶端的鏈接
    val client: Socket = server.accept() //等待客戶端發(fā)送過來的數(shù)據(jù),accept()方法
    val in: InputStream = client.getInputStream //輸入流接收數(shù)據(jù)
    val objin: ObjectInputStream = new ObjectInputStream(in) //輸出流失obj那么接收也應(yīng)該是obj
    val task: Task = objin.readObject().asInstanceOf[Task] //這個就是拿到的值 ,但是這里不應(yīng)該是AnyRef,所以要進行轉(zhuǎn)換
    val ints = task.compute() //上面已經(jīng)拿到了傳過來的操作了,所以可以直接使用里面定義的函數(shù)了
    println("計算節(jié)點的計算結(jié)果為:" + ints) //把客戶端拿到的數(shù)據(jù)給輸出

    objin.close()  //把輸入流給關(guān)閉掉
    client.close()
    server.close() //把服務(wù)器給關(guān)閉掉

  }
}

Driver 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

import java.io.{ObjectOutputStream, OutputStream}
import java.net.Socket

//這個是用來執(zhí)行程序的
class Driver {}
object Driver {def main(args: Array[String]): Unit = {//連接服務(wù)器 本地連接,然后第二個參數(shù)是服務(wù)器定義的端口號
    val client = new Socket("localhost",9998) //這個相當(dāng)于是是客戶端,連接服務(wù)器
    val out: OutputStream = client.getOutputStream //向服務(wù)器發(fā)東西,用getOutputStream()

    val objout = new ObjectOutputStream(out) //定義這個Object的輸出,因為上面那個是輸出字節(jié)的不能傳輸對象

    val task:Task = new Task() //然后創(chuàng)建一個task
    objout.writeObject(task) //把task 傳入給objout 對象輸出流
    objout.flush()

    objout.close() //用完了吧這個輸出流給關(guān)掉
    client.close() //然后把這個客戶端也關(guān)掉
    println("客戶端發(fā)送數(shù)據(jù)完畢")
  }
}

Task 代碼:

package com.atguigu.bigdata.spark.core.wc.test2

class Task extends Serializable {//要混入序列化的特征,不然不能那個傳一個對象過去
  val datas = List(1,2,3,4)  //這個是數(shù)據(jù)

  val logic = (num:Int) =>{num * 2} //匿名函數(shù)  這個是邏輯

  //計算
  def compute() = {datas.map(logic)  //莫logic 上面定義的邏輯操作傳入進去

  }

}
3、RDD 創(chuàng)建

在 Spark 中創(chuàng)建 RDD 的創(chuàng)建方式可以分為四種: 一般就是用前兩種就行了,一般前兩種用的比較多。

(1) 從集合(內(nèi)存)中創(chuàng)建

從集合中創(chuàng)建RDD,Spark主要提供了兩個方法:parallelizemakeRDD
parallelize 是并行的意思,makeRDD 的底層則完全就是調(diào)用了parallelize方法,因為這個單詞字面意思不大好理解,所以都用makeRDD就行了。
注意:local[*]里面加上*的意思是可以模擬多核多線程,要是不加的話那么就是模擬單線程,從內(nèi)存中創(chuàng)建makeRDD()方法要傳一個集合進去

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.api.java.JavaSparkContext.fromSparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
//在內(nèi)存(集合)中創(chuàng)建RDD
class Spark01_RDD_Memory {}
object Spark01_RDD_Memory{def main(args: Array[String]): Unit = {//TODO 準(zhǔn)備環(huán)境
    //這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //從內(nèi)存中創(chuàng)建RDD,將內(nèi)存中集合的數(shù)據(jù)作為處理的數(shù)據(jù)
    val seq: Seq[Int] = Seq(1, 2, 3, 4)
    //parallelize 并行
    //val sc: RDD[Int] = context.parallelize(seq) //這里面?zhèn)魅氲膮?shù)是一個集合,當(dāng)做數(shù)據(jù)源,
    val sc: RDD[Int] = context.makeRDD(seq) //makeRDD方法和parallelize方法是一樣的
    sc.collect().foreach(println) //只有觸發(fā)collect方法,才會執(zhí)行我們的應(yīng)用程序

    //TODO 關(guān)閉環(huán)境
    context.stop()

  }
}
(2) 從外部存儲(文件)創(chuàng)建RDD

由外部存儲系統(tǒng)的數(shù)據(jù)集創(chuàng)建RDD 包括:本地的文件系統(tǒng),所有Hadoop支持的數(shù)據(jù)集,比如HDFS,HBase 等。
注意:這個文件的路徑,可以是項目目錄下,可以洗本地環(huán)境目錄下,或者說hdfs 的路徑下都是可以的。在文件中創(chuàng)建RDD,就要用textFile()方法將文件的路徑給導(dǎo)入進去。或者讀取數(shù)據(jù)的時候用wholeTextFiles()方法可以看到里面的數(shù)據(jù)來源,具體是來自于哪一份文件。
textFile:以行為單位來讀取數(shù)據(jù),讀取的數(shù)據(jù)都是字符串
wholeTextFIles:以文件為單位讀取數(shù)據(jù),讀取的結(jié)果表示為元組,第一個元素表示文件路徑,第二個元素表示文件內(nèi)容
在這里插入圖片描述

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.{SparkConf, SparkContext}

//從文件中創(chuàng)建RDD
class Spark02_RDD_File {}
object Spark02_RDD_File{def main(args: Array[String]): Unit = {//TODO 準(zhǔn)備環(huán)境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //從文件中創(chuàng)建RDD,將文件中的數(shù)據(jù)作為處理的數(shù)據(jù)源
    //path路徑默認以當(dāng)前環(huán)境的根路徑為基準(zhǔn),可以寫絕對路徑,也可以寫相對路徑,
    //還可以hdfs路徑也是可以的,例如:hdfs://master:9080/test.txt
    val file = context.textFile("datas/*")
    file.collect().foreach(println)


    //TODO 關(guān)閉環(huán)境
    context.stop()
  }
}
(3) 從其他RDD創(chuàng)建

主要是通過一個RDD運算完后,再產(chǎn)生新的RDD。

(4) 直接創(chuàng)建 RDD (new)

使用new的方式直接構(gòu)造 RDD,一般由 Spark 框架自身使用。

4、RDD 并行度與分區(qū)

默認情況下,Spark 可以將一個作業(yè)切分多個任務(wù)后,發(fā)送給Executor 節(jié)點并行計算,而能夠并行計算的任務(wù)數(shù)量我們稱之為并行度。這個數(shù)量可以在構(gòu)建RDD時指定。記住,這里的并行執(zhí)行的任務(wù)數(shù)量,并不是指的切分任務(wù)的數(shù)量,不要混淆了。

(1) makeRDD() 基于內(nèi)存創(chuàng)建的RDD的分區(qū)

注意:makeRDD()方法,第二個參數(shù)是個隱式參數(shù),是分區(qū)的數(shù)量,如果不傳的話那么默認分區(qū)跟本地環(huán)境的核有關(guān)。比如我的電腦是4核,那么分區(qū)就是分為四個,并行計算。saveAsTextFile()方法 將處理的數(shù)據(jù)保存成分區(qū)文件,里面的參數(shù)是要創(chuàng)建的文件名。然后輸出之后會自動生成一個這個名字的目錄,下面的文件是分區(qū)文件。

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
//RDD 并行度
class Spark01_RDD_Memory_Par {}
object Spark01_RDD_Memory_Par{def main(args: Array[String]): Unit = {//TODO 準(zhǔn)備環(huán)境
    //這個 local[*] 里面加上*的意思是,可以模擬多核多線程,不加的話就是模擬的單線程
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //RDD的并行度 & 分區(qū)
    //makeRDD 方法可以傳入第二個參數(shù),第二個參數(shù)是分區(qū)的數(shù)量
    //第二個參數(shù)是可以不傳的,因為是隱式參數(shù),如果不傳默認分區(qū)就是按照內(nèi)核數(shù)量決定的,我的內(nèi)核是4個,所以分區(qū)是4
    val rdd:RDD[Int] = context.makeRDD(List(1, 2, 3,4,5),3)  //里面的第一個參數(shù)是一個集合,第二個參數(shù)是分區(qū)的數(shù)量,分為幾個區(qū)
    //saveAsTextFile方法 將處理的數(shù)據(jù)保存成分區(qū)文件
    rdd.saveAsTextFile("output")//saveAsTextFile方法

    //TODO 關(guān)閉環(huán)境
    context.stop()

  }
}
(2) 基于文件創(chuàng)建的RDD 的分區(qū)

它分區(qū)分配數(shù)據(jù)的方式和Hadoop的分區(qū)的方式是一樣的。和上面的基于內(nèi)存的分配數(shù)據(jù)的方式不一樣。

package com.atguigu.bigdata.spark.core.wc.create_RDD

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

class Spark02_RDD_File_Par {}
object Spark02_RDD_File_Par{def main(args: Array[String]): Unit = {//TODO 準(zhǔn)備環(huán)境
    val conf = new SparkConf().setMaster("local[*]").setAppName("create_RDD_File2")
    val context = new SparkContext(conf)

    //TODO 創(chuàng)建RDD
    //textFile 可以將文件作為數(shù)據(jù)處理的數(shù)據(jù)源,默認也可以設(shè)定分區(qū)
    // minPartitions:最小分區(qū)數(shù)量
    //默認分區(qū)是兩個,如果不想使用默認的分區(qū)數(shù)量那么,可以通過第二個參數(shù)指定分區(qū)數(shù)
    val rdd: RDD[String] = context.textFile("datas/one.txt",3)
    rdd.saveAsTextFile("output")


    //TODO 關(guān)閉環(huán)境
    context.stop()
  }
}
(3) 數(shù)據(jù)分區(qū)的規(guī)則

首先看字節(jié),可以看到這個文件一共是14個字節(jié),加上回車符
在這里插入圖片描述
然后我們分兩個區(qū),14 / 2 = 7,一個區(qū)是7個字節(jié),再用 14 / 7 = 2 可以看到剛好是2沒有余數(shù),所以沒有問題剛剛好。首先是要計算行偏移量,計算出第一行的行偏移量是多少,計算出第二行是多少,然后計算行偏移量的范圍就可以算出每個分區(qū)得到的數(shù)據(jù)是什么了。
在這里插入圖片描述
查看結(jié)果
在這里插入圖片描述

你是否還在尋找穩(wěn)定的海外服務(wù)器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準(zhǔn)確流量調(diào)度確保服務(wù)器高可用性,企業(yè)級服務(wù)器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧

新聞名稱:Spark核心編程-創(chuàng)新互聯(lián)
文章鏈接:http://www.rwnh.cn/article28/ccesjp.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站建設(shè)App開發(fā)、定制開發(fā)企業(yè)建站、建站公司、關(guān)鍵詞優(yōu)化

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)
榆中县| 东山县| 社旗县| 基隆市| 镇安县| 甘南县| 南陵县| 呼和浩特市| 西贡区| 文昌市| 柳河县| 扶余县| 五原县| 邵东县| 乌兰察布市| 铁岭县| 平安县| 错那县| 江口县| 枣强县| 杭锦旗| 晋江市| 吉安市| 湄潭县| 称多县| 桂东县| 根河市| 天峻县| 通山县| 韶山市| 平乐县| 青岛市| 祁阳县| 昌黎县| 咸阳市| 蚌埠市| 子长县| 兰州市| 逊克县| 昭通市| 凤山县|