怎么實(shí)踐Spark,針對這個問題,這篇文章詳細(xì)介紹了相對應(yīng)的分析和解答,希望可以幫助更多想解決這個問題的小伙伴找到更簡單易行的方法。
創(chuàng)新互聯(lián)建站是一家以重慶網(wǎng)站建設(shè)公司、網(wǎng)頁設(shè)計、品牌設(shè)計、軟件運(yùn)維、營銷推廣、小程序App開發(fā)等移動開發(fā)為一體互聯(lián)網(wǎng)公司。已累計為成都花箱等眾行業(yè)中小客戶提供優(yōu)質(zhì)的互聯(lián)網(wǎng)建站和軟件開發(fā)服務(wù)。
隨著項(xiàng)目的運(yùn)營,收集了很多的用戶數(shù)據(jù)。最近業(yè)務(wù)上想做些社交圖譜相關(guān)的產(chǎn)品,但因?yàn)閿?shù)據(jù)很多、很雜,傳統(tǒng)的數(shù)據(jù)庫查詢已經(jīng)滿足不了業(yè)務(wù)的需求。 試著用Spark
來做,權(quán)當(dāng)練練手了。
因?yàn)橛?code>Scala的開發(fā)經(jīng)驗(yàn),所以就不用官方提供的二進(jìn)制包了,自編譯scala 2.11
版本。
下載Spark:http://ftp.cuhk.edu.hk/pub/packages/apache.org/spark/spark-1.5.0/spark-1.5.0.tgz
tar zxf spark-1.5.0.tgz cd spark-1.5.0 ./dev/change-scala-version.sh 2.11 mvn -Pyarn -Phadoop-2.6 -Dscala-2.11 -DskipTests clean package
以上命令完成Spark
基于scala 2.11
版本的編譯。可以運(yùn)行自帶的一個示例程序來驗(yàn)證安裝是否成功。
./bin/run-example SparkPi
使用sbt
來構(gòu)建一個可提交的簡單Spark
程序,功能是計算每個用戶加入的群組,并把結(jié)果保存下來。project/Build.scala
配置文件如下:
import _root_.sbt.Keys._ import _root_.sbt._ import sbtassembly.AssemblyKeys._ object Build extends Build { override lazy val settings = super.settings :+ { shellPrompt := (s => Project.extract(s).currentProject.id + " > ") } lazy val root = Project("spark-MongoDB", file(".")) .settings( scalaVersion := "2.11.7", assemblyJarName in assembly := "spark-mongodb.jar", assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false), libraryDependencies ++= Seq( "org.apache.spark" %% "spark-core" % verSpark % "scopeProvidedTest, "org.mongodb.mongo-hadoop" % "mongo-hadoop-core" % "1.4.0" excludeAll( ExclusionRule(organization = "javax.servlet"), ExclusionRule(organization = "commons-beanutils"), ExclusionRule(organization = "org.apache.hadoop"))) ) private val scopeProvidedTest = "provided,test" private val verSpark = "1.5.0" }
數(shù)據(jù)存儲在MongoDB
數(shù)據(jù)庫中,所以我們還需要使用mongo-hadoop
連接器來訪問MongoDB
數(shù)據(jù)庫。
示例程序非常的簡單,把數(shù)據(jù)從數(shù)據(jù)庫里全部讀出,使用map
來把每條記錄里用戶ID對應(yīng)加入的群組ID轉(zhuǎn)換成一個Set
,再使用 reduceByKey
來把相同用戶ID的set
合并到一起,存入數(shù)據(jù)庫即可。
import com.mongodb.BasicDBObject import com.mongodb.hadoop.{MongoInputFormat, MongoOutputFormat} import org.apache.hadoop.conf.Configuration import org.apache.spark.{SparkConf, SparkContext} import org.bson.BSONObject import scala.collection.JavaConverters._ object QQGroup { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("QQGroup") val sc = new SparkContext(sparkConf) val inputConfig = new Configuration() inputConfig.set("mongo.input.uri", "mongodb://192.168.31.121:27017/db.userGroup") inputConfig.set("mongo.input.fields", """{"userId":1, "groupId":1, "_id":0}""") inputConfig.set("mongo.input.noTimeout", "true") val documentRDD = sc.newAPIHadoopRDD( inputConfig, classOf[MongoInputFormat], classOf[Object], classOf[BSONObject]) val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), getValue(doc, "groupId")) }.reduceByKey(_ ++ _) val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o } val outputConfig = new Configuration() outputConfig.set("mongo.output.uri", "mongodb://192.168.31.121:27017/db_result.userGroup") resultRDD.saveAsNewAPIHadoopFile( "file://this-is-completely-unused", classOf[Object], classOf[BSONObject], classOf[MongoOutputFormat[Object, BSONObject]], outputConfig) } def getValue(dbo: BSONObject, key: String) = { val value = dbo.get(key) if (value eq null) "" else value.asInstanceOf[String] } }
MongoDB
官方提供了Hadoop
連接器,Spark
可以使用mongo-hadoop
連接器來讀、寫MongoDB
數(shù)據(jù)庫。 主要的輸入配置薦有:
mongo.input.uri: MongoDB的連接URI
mongo.input.fields: 指定返回哪些數(shù)據(jù),與db.query
里的第2個參數(shù)功能一樣
mongo.input.query: MongoDB的查詢參數(shù)
相應(yīng)的MongoDB
也提供了一系列的輸出參數(shù),如:
mongo.output.uri: MongoDB的連接URI
sc.newAPIHadoopRDD()
方法有4個參數(shù),分別為:配置、輸入格式化類、待映射數(shù)據(jù)主鍵類型、待映射數(shù)據(jù)類型。
主要的操作代碼:
val userRDD = documentRDD.map { case (_, doc) => (getValue(doc, "userId"), Set(getValue(doc, "groupId"))) }.reduceByKey(_ ++ _) val resultRDD = userRDD.map { case (userId, groupIds) => val o = new BasicDBObject() o.put("groupIds", groupIds.asJava) userId -> o }
先使用map
方法獲取userId
和groupId
,并把groupId
轉(zhuǎn)換為一個Set
。
在把數(shù)據(jù)轉(zhuǎn)換成Tuple2
,就是一個KV的形式以后,我們就可以調(diào)用一系列的轉(zhuǎn)換方法來對RDD
進(jìn)行操作,這里使用reduceByKey
方法來將同一個userId
的所以value
都合并在一起。這樣我們就有了所有用戶對應(yīng)加入的群組 的一個RDD集了。
(RDD上有兩種類型的操作。一種是“變換”,它只是描述了待進(jìn)行的操作指令,并不會觸發(fā)實(shí)際的計算;另一種是“動作”, 它將觸發(fā)實(shí)際的計算動作,這時候系統(tǒng)才會實(shí)際的從數(shù)據(jù)源讀入數(shù)據(jù),操作內(nèi)存,保存數(shù)據(jù)等)
最后使用resultRDD.saveAsNewAPIHadoopFile()
方法來把計算結(jié)果存入MongoDB
,這里的一個參數(shù):用于指定 HDFS的存儲位置并不會使用到,因?yàn)?code>mongo-hadoop將會使用mongo.output.uri
指定的存儲URI連接地址來保存數(shù)據(jù)。
關(guān)于怎么實(shí)踐Spark問題的解答就分享到這里了,希望以上內(nèi)容可以對大家有一定的幫助,如果你還有很多疑惑沒有解開,可以關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道了解更多相關(guān)知識。
文章標(biāo)題:怎么實(shí)踐Spark
當(dāng)前路徑:http://www.rwnh.cn/article46/jdcheg.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供ChatGPT、營銷型網(wǎng)站建設(shè)、網(wǎng)站排名、移動網(wǎng)站建設(shè)、電子商務(wù)、網(wǎng)站收錄
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)