⑴ 到底spark 緩存機制怎麼用
Spark 中一個很重要的能力是將數據persisting持久化(或稱為caching緩存),在多個操作間都可以訪問這些持久化的數據。當持久化一個 RDD 時,每個節點的其它分區都可以使用 RDD 在內存中進行計算,在該數據上的其他 action 操作將直接使用內存中的數據。這樣會讓以後的 action 操作計算速度加快(通常運行速度會加速 10 倍)。緩存是迭代演算法和快速的互動式使用的重要工具。
RDD 可以使用persist()方法或cache()方法進行持久化。數據將會在第一次 action 操作時進行計算,並緩存在節點的內存中。Spark 的緩存具有容錯機制,如果一個緩存的 RDD 的某個分區丟失了,Spark 將按照原來的計算過程,自動重新計算並進行緩存。
另外,每個持久化的 RDD 可以使用不同的storage level存儲級別進行緩存,例如,持久化到磁碟、已序列化的 Java 對象形式持久化到內存(可以節省空間)、跨節點間復制、以 off-heap 的方式存儲在 Tachyon。這些存儲級別通過傳遞一個StorageLevel對象 (Scala,Java,Python) 給persist()方法進行設置。cache()方法是使用默認存儲級別的快捷設置方法,默認的存儲級別是StorageLevel.MEMORY_ONLY(將反序列化的對象存儲到內存中)
⑵ hadoop和spark的區別
1、解決問題的層面不一樣
首先,Hadoop和Apache Spark兩者都是大數據框架,但是各自存在的目的不盡相同。Hadoop實質上更多是一個分布式數據基礎設施:它將巨大的數據集分派到一個由普通計算機組成的集群中的多個節點進行存儲,意味著您不需要購買和維護昂貴的伺服器硬體。
同時,Hadoop還會索引和跟蹤這些數據,讓大數據處理和分析效率達到前所未有的高度。Spark,則是那麼一個專門用來對那些分布式存儲的大數據進行處理的工具,它並不會進行分布式數據的存儲。
2、兩者可合可分
Hadoop除了提供為大家所共識的HDFS分布式數據存儲功能之外,還提供了叫做MapRece的數據處理功能。所以這里我們完全可以拋開Spark,使用Hadoop自身的MapRece來完成數據的處理。
相反,Spark也不是非要依附在Hadoop身上才能生存。但如上所述,畢竟它沒有提供文件管理系統,所以,它必須和其他的分布式文件系統進行集成才能運作。這里我們可以選擇Hadoop的HDFS,也可以選擇其他的基於雲的數據系統平台。但Spark默認來說還是被用在Hadoop上面的,畢竟,大家都認為它們的結合是最好的。
以下是從網上摘錄的對MapRece的最簡潔明了的解析:
我們要數圖書館中的所有書。你數1號書架,我數2號書架。這就是「Map」。我們人越多,數書就更快。
現在我們到一起,把所有人的統計數加在一起。這就是「Rece」。
3、Spark數據處理速度秒殺MapRece
Spark因為其處理數據的方式不一樣,會比MapRece快上很多。MapRece是分步對數據進行處理的: 」從集群中讀取數據,進行一次處理,將結果寫到集群,從集群中讀取更新後的數據,進行下一次的處理,將結果寫到集群,等等…「 Booz Allen Hamilton的數據科學家Kirk Borne如此解析。
反觀Spark,它會在內存中以接近「實時」的時間完成所有的數據分析:「從集群中讀取數據,完成所有必須的分析處理,將結果寫回集群,完成,」 Born說道。Spark的批處理速度比MapRece快近10倍,內存中的數據分析速度則快近100倍。
如果需要處理的數據和結果需求大部分情況下是靜態的,且你也有耐心等待批處理的完成的話,MapRece的處理方式也是完全可以接受的。
但如果你需要對流數據進行分析,比如那些來自於工廠的感測器收集回來的數據,又或者說你的應用是需要多重數據處理的,那麼你也許更應該使用Spark進行處理。
大部分機器學習演算法都是需要多重數據處理的。此外,通常會用到Spark的應用場景有以下方面:實時的市場活動,在線產品推薦,網路安全分析,機器日記監控等。
4、災難恢復
兩者的災難恢復方式迥異,但是都很不錯。因為Hadoop將每次處理後的數據都寫入到磁碟上,所以其天生就能很有彈性的對系統錯誤進行處理。
Spark的數據對象存儲在分布於數據集群中的叫做彈性分布式數據集(RDD: Resilient Distributed Dataset)中。這些數據對象既可以放在內存,也可以放在磁碟,所以RDD同樣也可以提供完成的災難恢復功能。
⑶ 如何理解spark中RDD和DataFrame的結構
RDD就是一個分布式的無序的列表。
RDD中可以存儲任何的單機類型的數據,但是,直接使用RDD在欄位需求明顯時,存在運算元難以復用的缺點。
例如,現在RDD存的數據是一個Person類型的數據,現在要求所有每個年齡段(10年一個年齡段)的人中最高的身高與最大的體重。
使用RDD介面,因為RDD不了解其中存儲的數據的具體結構,數據的結構對它而言是黑盒,於是這就需要用戶自己去寫一個很特化的聚合的函數來完成這樣的功能。
而有了DataFrame,則框架會去了解RDD中的數據是什麼樣的結構的,用戶可以說清楚自己對每一列進行什麼樣的操作,這樣就有可能可以實現一個運算元,用在多個列上,比較容易進行運算元的復用。甚至,未來又要同時求出每個年齡段內不同的姓氏有多少個,則使用RDD介面,之前的函數需要改動很大才能滿足需求,而使用DataFrame介面,則只需要添加對這一個列的處理,原來的max/min的相關列處理都可保持不變。
總而言之,DataFrame相關介面就是RDD的一個擴展,讓RDD了解了RDD中存儲的數據包含哪些列,並可以在列上進行操作。
⑷ rdd的定義
RDD(Resilient Distributed Datasets)的定義是: 彈性分布式數據集, 是分布式內存的一個抽象概念,RDD提供了一種高度受限的共享內存模型,即RDD是只讀的記錄分區的集合,只能通過在其他RDD執行確定的轉換操作(如map、join和group by)而創建,然而這些限制使得實現容錯的開銷很低。對開發者而言,RDD可以看作是Spark的一個對象,它本身運行於內存中,如讀文件是一個RDD,對文件計算是一個RDD,結果集也是一個RDD ,不同的分片、 數據之間的依賴 、key-value類型的map數據都可以看做RDD。
RDD是只讀的、分區記錄的集合。RDD只能基於在穩定物理存儲中的數據集和其他已有的RDD上執行確定性操作來創建。這些確定性操作稱之為轉換,如map、filter、groupBy、join(轉換不是程開發人員在RDD上執行的操作) 。
RDD不需要物化。RDD含有如何從其他RDD衍生(即計算)出本RDD的相關信息(即Lineage),據此可以從物理存儲的數據計算出相應的RDD分區 。
RDD作為數據結構,本質上是一個只讀的分區記錄集合。一個RDD可以包含多個分區,每個分區就是一個dataset片段。RDD可以相互依賴。如果RDD的每個分區最多隻能被一個Child RDD的一個分區使用,則稱之為narrow dependency;若多個Child RDD分區都可以依賴,則稱之為wide dependency。不同的操作依據其特性,可能會產生不同的依賴。例如map操作會產生narrow dependency,而join操作則產生wide dependency。
⑸ 如何學習Spark大數據
大數據技術,只有相互分享才能彼此共同進步,為什麼我們的程序員經常活躍在各大博客和技術論壇?其主要原因是:程序員們並不拒絕分享,甚至是樂於去貢獻代碼。身為一個程序員,特別值得他們驕傲的事情就是自己寫的代碼被別人用到而帶來的成就感。
今天,為我們分享了當今火爆的大數據技術,講解了spark技術的核心,我們可以不從事數據分析行業,但國家的一些技術還是要了解的。
Spark核心概念Resilient Distributed Dataset (RDD)彈性分布數據集
RDD是Spark的基本抽象,是對分布式內存的抽象使用,實現了以操作本地集合的方式來操作分布式數據集的抽象實現。RDD是Spark特別核心的東西,它表示已被分區,不可變的並能夠被並行操作的數據集合,不同的數據集格式對應不同的RDD實現。RDD必須是可序列化的。RDD可以cache到內存中,每次對RDD數據集的操作之後的結果,都可以存放到內存中,下一個操作可以直接從內存中輸入,省去了MapRece大量的磁碟IO操作。這對於迭代運算比較常見的機器學習演算法, 互動式數據挖掘來說,效率提升比較大。
RDD的特點:
1、它是在集群節點上的不可變的、已分區的集合對象。
2、用並行轉換的方式來創建如(map, filter, join, etc)。
3、失敗自動重建。
4、可以控制存儲級別(內存、磁碟等)來進行重用。
5、必須是可序列化的。
5、是靜態類型的。
RDD的好處:
1、RDD只能從持久存儲或經過Transformations操作產生,相比於分布式共享內存(DSM)可以更高效實現容錯,對於丟失部分數據分區只需根據它的lineage就可重新計算出來,而不需要做特定的Checkpoint。
2、RDD的不變性,可以實現類Hadoop MapRece的推測式執行。
3、RDD的數據分區特性,可以用數據的本地性來提高性能,這與Hadoop MapRece是一樣的。
4、RDD都是可序列化的,在內存不足時可自動降級為磁碟存儲,把RDD存儲於磁碟上,這時性能會有大的下降但不會差於現在的MapRece。
RDD的存儲與分區
1、用戶可以選擇不同的存儲級別存儲RDD以便重用。
2、當前RDD默認是存儲於內存,但當內存不足時,RDD會spill到disk。
3、RDD在需要進行分區把數據分布於集群中時會根據每條記錄Key進行分區(如Hash 分區),以此保證兩個數據集在Join時能高效。
RDD的內部表示
在RDD的內部實現中每個RDD都可以使用5個方面的特性來表示:
1、分區列表(數據塊列表)
2、計算每個分片的函數(根據父RDD計算出此RDD)
3、對父RDD的依賴列表
4、對key-value RDD的Partitioner【可選】
5、每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)【可選】
大數據是互聯網發展的方向,大數據人才是未來的高薪貴族。隨著大數據人才的供不應求,大數據人才的薪資待遇也在不斷提升。
⑹ 為什麼說rdd是不變的數據結構存儲
首選你要知道什麼是RDD;
什麼是RDD
RDD的全稱是「彈性分布式數據集」(Resilient Distributed Dataset)。首先,它是一個數據集,就像Scala語言中的Array、List、Tuple、Set、Map也是數據集合一樣,但從操作上看RDD最像Array和List,裡面的數據都是平鋪的,可以順序遍歷。而且Array、List對象擁有的許多操作RDD對象也有,比如flatMap、map、filter、rece、groupBy等。
其次,RDD是分布存儲的。裡面的成員被水平切割成小的數據塊,分散在集群的多個節點上,便於對RDD裡面的數據進行並行計算。
最後,RDD的分布是彈性的,不是固定不變的。RDD的一些操作可以被拆分成對各數據塊直接計算,不涉及其他節點,比如map。這樣的操作一般在數據塊所在的節點上直接進行,不影響RDD的分布,除非某個節點故障需要轉換到其他節點上。但是在有些操作中,只訪問部分數據塊是無法完成的,必須訪問RDD的所有數據塊。比如groupBy,在做groupBy之前完全不知道每個key的分布,必須遍歷RDD的所有數據塊,將具有相同key的元素匯聚在一起,這樣RDD的分布就完全重組,而且數量也可能發生變化。此外,RDD的彈性還表現在高可靠性上。
⑺ scala 中rdd類型用什麼頭文件
1.RDD介紹:
RDD,彈性分布式數據集,即分布式的元素集合。在spark中,對所有數據的操作不外乎是創建RDD、轉化已有的RDD以及調用RDD操作進行求值。在這一切的背後,Spark會自動將RDD中的數據分發到集群中,並將操作並行化。
Spark中的RDD就是一個不可變的分布式對象集合。每個RDD都被分為多個分區,這些分區運行在集群中的不同節點上。RDD可以包含Python,Java,Scala中任意類型的對象,甚至可以包含用戶自定義的對象。
用戶可以使用兩種方法創建RDD:讀取一個外部數據集,或在驅動器程序中分發驅動器程序中的對象集合,比如list或者set。
RDD的轉化操作都是惰性求值的,這意味著我們對RDD調用轉化操作,操作不會立即執行。相反,Spark會在內部記錄下所要求執行的操作的相關信息。我們不應該把RDD看做存放著特定數據的數據集,而最好把每個RDD當做我們通過轉化操作構建出來的、記錄如何計算數據的指令列表。數據讀取到RDD中的操作也是惰性的,數據只會在必要時讀取。轉化操作和讀取操作都有可能多次執行。
2.創建RDD數據集
(1)讀取一個外部數據集
val input=sc.textFile(inputFileDir)
(2)分發對象集合,這里以list為例
val lines =sc.parallelize(List("hello world","this is a test"));
3.RDD操作
(1)轉化操作
實現過濾器轉化操作:
val lines =sc.parallelize(List("error:a","error:b","error:c","test"));
val errors=lines.filter(line => line.contains("error"));
errors.collect().foreach(println);
輸出:
error:a
error:b
error:c
可見,列表list中包含詞語error的表項都被正確的過濾出來了。
(2)合並操作
將兩個RDD數據集合並為一個RDD數據集
接上述程序示例:
val lines =sc.parallelize(List("error:a","error:b","error:c","test","warnings:a"));
val errors=lines.filter(line => line.contains("error"));
val warnings =lines.filter(line => line.contains("warnings"));
val unionLines =errors.union(warnings);
unionLines.collect().foreach(println);
輸出:
error:a
error:b
error:c
warning:a
可見,將原始列表項中的所有error項和warning項都過濾出來了。
(3)獲取RDD數據集中的部分或者全部元素
①獲取RDD數據集中的部分元素 .take(int num) 返回值List<T>
獲取RDD數據集中的前num項。
/**
* Take the first num elements of the RDD. This currently scans the partitions *one by one*, so
* it will be slow if a lot of partitions are required. In that case, use collect() to get the
* whole RDD instead.
*/
def take(num: Int): JList[T]
程序示例:接上
unionLines.take(2).foreach(println);
輸出:
error:a
error:b
可見,輸出了RDD數據集unionLines的前2項
②獲取RDD數據集中的全部元素 .collect() 返回值 List<T>
程序示例:
val all =unionLines.collect();
all.foreach(println);
遍歷輸出RDD數據集unionLines的每一項
4.向spark傳遞函數
在scala中,我們可以把定義的內聯函數、方法的引用或靜態方法傳遞給Spark,就像Scala的其他函數式API一樣。我們還要考慮其他一些細節,必須所傳遞的函數及其引用的數據需要是可序列化的(實現了Java的Serializable介面)。除此之外,與Python類似,傳遞一個對象的方法或者欄位時,會包含對整個對象的引用。我們可以把需要的欄位放在一個局部變數中,來避免包含該欄位的整個對象。
class searchFunctions (val query:String){
def isMatch(s: String): Boolean = {
s.contains(query)
}
def getMatchFunctionReference(rdd: RDD[String]) :RDD[String]={
//問題: isMach表示 this.isMatch ,因此我們需要傳遞整個this
rdd.filter(isMatch)
}
def getMatchesFunctionReference(rdd: RDD[String]) :RDD[String] ={
//問題: query表示 this.query ,因此我們需要傳遞整個this
rdd.flatMap(line => line.split(query))
}
def getMatchesNoReference(rdd:RDD[String]):RDD[String] ={
//安全,只把我們需要的欄位拿出來放入局部變數之中
val query1=this.query;
rdd.flatMap(x =>x.split(query1)
)
}
}
5.針對每個元素的轉化操作:
轉化操作map()接收一個函數,把這個函數用於RDD中的每個元素,將函數的返回結果作為結果RDD中對應的元素。關鍵詞:轉化
轉化操作filter()接受一個函數,並將RDD中滿足該函數的元素放入新的RDD中返回。關鍵詞:過濾
示例圖如下所示:
RDD1.cartesian(RDD2)
返回兩個RDD數據集的笛卡爾集
程序示例:生成RDD集合{1,2} 和{1,2}的笛卡爾集
val rdd1=sc.parallelize(List(1,2));
val rdd2=sc.parallelize(List(1,2));
val rdd=rdd1.cartesian(rdd2);
println(rdd.collect().mkString("
"));
輸出:
(1,1)
(1,2)
(2,1)
(2,2)
7.行動操作
(1)rece操作
rece()接收一個函數作為參數,這個函數要操作兩個RDD的元素類型的數據並返回一個同樣類型的新元素。一個簡單的例子就是函數+,可以用它來對我們的RDD進行累加。使用rece(),可以很方便地計算出RDD中所有元素的總和,元素的個數,以及其他類型的聚合操作。
以下是求RDD數據集所有元素和的程序示例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.rece((x,y) =>x+y);
println(results);
輸出:55
(2)fold()操作
接收一個與rece()接收的函數簽名相同的函數,再加上一個初始值來作為每個分區第一次調用時的結果。你所提供的初始值應當是你提供的操作的單位元素,也就是說,使用你的函數對這個初始值進行多次計算不會改變結果(例如+對應的0,*對應的1,或者拼接操作對應的空列表)。
程序實例:
①計算RDD數據集中所有元素的和:
zeroValue=0;//求和時,初始值為0。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(0)((x,y) =>x+y);
println(results);
②計算RDD數據集中所有元素的積:
zeroValue=1;//求積時,初始值為1。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val results=rdd.fold(1)((x,y) =>x*y);
println(results);
(3)aggregate()操作
aggregate()函數返回值類型不必與所操作的RDD類型相同。
與fold()類似,使用aggregate()時,需要提供我們期待返回的類型的初始值。然後通過一個函數把RDD中的元素合並起來放入累加器。考慮到每個節點是在本地進行累加的,最終,還需要提供第二個函數來將累加器兩兩合並。
以下是程序實例:
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10));
val result=rdd.aggregate((0,0))(
(acc,value) =>(acc._1+value,acc._2+1),
(acc1,acc2) => (acc1._1+acc2._1, acc1._2+acc2._2)
)
val average=result._1/result._2;
println(average)
輸出:5
最終返回的是一個Tuple2<int,int>對象, 他被初始化為(0,0),當遇到一個int值時,將該int數的值加到Tuple2對象的_1中,並將_2值加1,如果遇到一個Tuple2對象時,將這個Tuple2的_1和_2的值歸並到最終返回的Tuple2值中去。
表格:對一個數據為{1,2,3,3}的RDD進行基本的RDD行動操作
函數名 目的 示例 結果
collect() 返回RDD的所有元素 rdd.collect() {1,2,3,3}
count() RDD的元素個數 rdd.count() 4
countByValue() 各元素在RDD中出現的次數 rdd.countByValue() {(1,1),
(2,1),
(3,2)
}
take(num) 從RDD中返回num個元素 rdd.take(2) {1,2}
top(num) 從RDD中返回最前面的num個元素 rdd.takeOrdered(2)(myOrdering) {3,3}
takeOrdered(num)
(ordering) 從RDD中按照提供的順序返回最前面的num個元素
rdd.takeSample(false,1) 非確定的
takeSample(withReplacement,num,[seed]) 從RDD中返回任意一些元素 rdd.takeSample(false,1) 非確定的
rece(func) 並行整合RDD中所有數據 rdd.rece((x,y) => x+y)
9
fold(zero)(func) 和rece()一樣,但是需要提供初始值 rdd.fold(0)((x,y) => x+y)
9
aggregate(zeroValue)(seqOp,combOp) 和rece()相似,但是通常返回不同類型的函數 rdd.aggregate((0,0))
((x,y) =>
(x._1+y,x._2+1),
(x,y)=>
(x._1+y._1,x._2+y._2)
) (9,4)
foreach(func) 對RDD中的每個元素使用給定的函數 rdd.foreach(func) 無
8.持久化緩存
因為Spark RDD是惰性求值的,而有時我們希望能多次使用同一個RDD。如果簡單地對RDD調用行動操作,Spark每次都會重算RDD以及它的所有依賴。這在迭代演算法中消耗格外大,因為迭代演算法常常會多次使用同一組數據。
為了避免多次計算同一個RDD,可以讓Spark對數據進行持久化。當我們讓Spark持久化存儲一個RDD時,計算出RDD的節點會分別保存它們所求出的分區數據。
出於不同的目的,我們可以為RDD選擇不同的持久化級別。默認情況下persist()會把數據以序列化的形式緩存在JVM的堆空間中
不同關鍵字對應的存儲級別表
級別
使用的空間
cpu時間
是否在內存
是否在磁碟
備注
MEMORY_ONLY
高
低
是
否
直接儲存在內存
MEMORY_ONLY_SER
低
高
是
否
序列化後儲存在內存里
MEMORY_AND_DISK
低
中等
部分
部分
如果數據在內存中放不下,溢寫在磁碟上
MEMORY_AND_DISK_SER
低
高
部分
部分
數據在內存中放不下,溢寫在磁碟中。內存中存放序列化的數據。
DISK_ONLY
低
高
否
是
直接儲存在硬碟裡面
程序示例:將RDD數據集持久化在內存中。
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9,10)).persist(StorageLevel.MEMORY_ONLY);
println(rdd.count())
println(rdd.collect().mkString(","));
RDD還有unpersist()方法,調用該方法可以手動把持久化的RDD從緩存中移除。
9.不同的RDD類型
在scala中,將RDD轉為由特定函數的RDD(比如在RDD[Double]上進行數值操作),是由隱式轉換來自動處理的。這些隱式轉換可以隱式地將一個RDD轉為各種封裝類,比如DoubleRDDFunctions(數值數據的RDD)和PairRDDFunctions(鍵值對RDD),這樣我們就有了諸如mean()和variance()之類的額外的函數。
示常式序:
val rdd=sc.parallelize(List(1.0,2.0,3.0,4.0,5.0));
println(rdd.mean());
其實RDD[T]中並沒有mean()函數,只是隱式轉換自動將其轉換為DoubleRDDFunctions。
⑻ 如果中間輸出RDD在內存放不下會怎麼樣
一般來講,對於陌生的名詞,大家的第一個反應都是「What is it?」.
RDD是Spark的核心內容,在Spark的官方文檔中解釋如下:RDD is a fault-tolerant collection of elements that can be operated on in parallel.由此可見,其中有兩個關鍵詞:fault-tolerant & in parallel.首先,容錯性是RDD的一個重要特性;其次,它是並行計算的數據.
RDD的中文解釋為:彈性分布式數據集,全稱Resilient Distributed Datasets.賓語是dataset,即內存中的資料庫.RDD 只讀、可分區,這個數據集的全部或部分可以緩存在內存中,在多次計算間重用.所謂彈性,是指內存不夠時可以與磁碟進行交換.這涉及到了RDD的另一特性:內存計算,就是將數據保存到內存中.同時,為解決內存容量限制問題,Spark為我們提供了最大的自由度,所有數據均可由我們來進行cache的設置,包括是否cache和如何cache.
如果看到這里,你的思維里對RDD還是沒有任何概念的話,或許可以參照我的形象化理RDD,就是一個被武裝起來的數據集.
主體:a、由源數據分割而來,源碼中對應splits變數;
武器有下:b、數據集體內包含了它本身的「血統」信息,即dependencies變數,存儲著它的父RDD及兩者關系;
c、計算函數,即其與父RDD的轉化方式,對應源碼中的iterator(split) & compute函數;
d、一些關於如何分塊以及如何存放位置的元信息,eg:partitioner & preferredLocations.
有了這些武器,RDD的容錯機制也就顯而易見了.容錯,顧名思義就是在存在故障的情況下,計算機系統仍能正常工作.容錯通常有兩種方式 checkpoint 和logging update ,RDD 採用的是 logging update .Checkpoint( 數據檢查點)意味著要在各個機器間復制大數據,花費會很高,這種拷貝操作相當緩慢,而且會消耗大量的存儲資源,因此deserted.Logging update( 記錄更新),僅支持粗顆粒度變換,也就是說,僅記錄在單個塊上執行的單個操作,然後創建某個RDD的變換序列存儲下來,數據丟失時,就可通過「血統」重新計算,恢復數據.Nevertheless,血緣鏈(變換序列)變得很長時,建議用戶此時建立一些數據檢查點加快容錯速度.(saveAstextFile方法手動設置)
⑼ 6何為伯克利數據分析棧BDASMP3
所謂Spark是起源於美國加州大學伯克利分校AMPLab的大數據計算平台,在2011年開源,目前是Apache軟體基金會的頂級項目。隨著Spark在大數據計算領域的暫露頭角,越來越多的企業開始關注和使用。2014年11月,Spark在Daytona Gray Sort 100TB Benchmark競賽中打破了由Hadoop MapRece保持的排序記錄。Spark利用1/10的節點數,把100TB數據的排序時間從72分鍾提高到了23分鍾。
Spark在架構上包括內核部分和4個官方子模塊
Spark SQL
Spark Streaming
機器學習庫MLlib
圖計算庫GraphX
由Spark在伯克利的數據分析軟體棧BDAS(Berkeley Data Analytics Stack)中的位置可見,Spark專注於數據的計算,而數據的存儲在生產環境中往往還是由Hadoop分布式文件系統HDFS承擔。
Spark在BDAS中的位置
Spark被設計成支持多場景的通用大數據計算平台,它可以解決大數據計算中的批處理,交互查詢及流式計算等核心問題。Spark可以從多數據源的讀取數據,並且擁有不斷發展的機器學習庫和圖計算庫供開發者使用。數據和計算在Spark內核及Spark的子模塊中是打通的,這就意味著Spark內核和子模塊之間成為一個整體。Spark的各個子模塊以Spark內核為基礎,進一步支持更多的計算場景,例如使用Spark SQL讀入的數據可以作為機器學習庫MLlib的輸入。以下列舉了一些在Spark平台上的計算場景。
Spark的應用場景舉例
之前在大數據概述的課程中我們提到了Hadoop,大數據工程師都非常了解Hadoop MapRece一個最大的問題是在很多應用場景中速度非常慢,只適合離線的計算任務。這是由於MapRece需要將任務劃分成map和rece兩個階段,map階段產生的中間結果要寫回磁碟,而在這兩個階段之間需要進行shuffle操作。Shuffle操作需要從網路中的各個節點進行數據拷貝,使其往往成為最為耗時的步驟,這也是Hadoop MapRece慢的根本原因之一,大量的時間耗費在網路磁碟IO中而不是用於計算。在一些特定的計算場景中,例如像邏輯回歸這樣的迭代式的計算,MapRece的弊端會顯得更加明顯。
那Spark是如果設計分布式計算的呢?首先我們需要理解Spark中最重要的概念--彈性分布數據集(Resilient Distributed Dataset),也就是RDD。
關鍵詞:彈性分布數據集RDD
RDD是Spark中對數據和計算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可變的並能夠被並行操作的數據集合。對RDD的操作分為兩種transformation和action。Transformation操作是通過轉換從一個或多個RDD生成新的RDD。Action操作是從RDD生成最後的計算結果。在Spark最新的版本中,提供豐富的transformation和action操作,比起MapRece計算模型中僅有的兩種操作,會大大簡化程序開發的難度。
RDD的生成方式只有兩種,一是從數據源讀入,另一種就是從其它RDD通過transformation操作轉換。一個典型的Spark程序就是通過Spark上下文環境(SparkContext)生成一個或多個RDD,在這些RDD上通過一系列的transformation操作生成最終的RDD,最後通過調用最終RDD的action方法輸出結果。
每個RDD都可以用下面5個特性來表示,其中後兩個為可選的:
分片列表(數據塊列表)
計算每個分片的函數
對父RDD的依賴列表
對key-value類型的RDD的分片器(Partitioner)(可選)
每個數據分片的預定義地址列表(如HDFS上的數據塊的地址)(可選)
雖然Spark是基於內存的計算,但RDD不光可以存儲在內存中,根據useDisk、useMemory、useOffHeap, deserialized、replication五個參數的組合Spark提供了12種存儲級別,在後面介紹RDD的容錯機制時,我們會進一步理解。值得注意的是當StorageLevel設置成OFF_HEAP時,RDD實際被保存到Tachyon中。Tachyon是一個基於內存的分布式文件系統,目前正在快速發展,在這里我們就不做詳細介紹啦,可以通過其官方網站進一步了解。
DAG、Stage與任務的生成
Spark的計算發生在RDD的action操作,而對action之前的所有transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。
Spark內核會在需要計算發生的時刻繪制一張關於計算路徑的有向無環圖,也就是DAG。舉個例子,在下圖中,從輸入中邏輯上生成A和C兩個RDD,經過一系列transformation操作,邏輯上生成了F,注意,我們說的是邏輯上,因為這時候計算沒有發生,Spark內核做的事情只是記錄了RDD的生成和依賴關系。當F要進行輸出時,也就是F進行了action操作,Spark會根據RDD的依賴生成DAG,並從起點開始真正的計算。
邏輯上的計算過程:DAG
有了計算的DAG圖,Spark內核下一步的任務就是根據DAG圖將計算劃分成任務集,也就是Stage,這樣可以將任務提交到計算節點進行真正的計算。Spark計算的中間結果默認是保存在內存中的,Spark在劃分Stage的時候會充分考慮在分布式計算中可流水線計算(pipeline)的部分來提高計算的效率,而在這個過程中,主要的根據就是RDD的依賴類型。
根據不同的transformation操作,RDD的依賴可以分為窄依賴(Narrow Dependency)和寬依賴(Wide Dependency,在代碼中為ShuffleDependency)兩種類型。窄依賴指的是生成的RDD中每個partition只依賴於父RDD(s) 固定的partition。寬依賴指的是生成的RDD的每一個partition都依賴於父 RDD(s) 所有partition。窄依賴典型的操作有map, filter, union等,寬依賴典型的操作有groupByKey, sortByKey等。可以看到,寬依賴往往意味著shuffle操作,這也是Spark劃分stage的主要邊界。對於窄依賴,Spark會將其盡量劃分在同一個stage中,因為它們可以進行流水線計算。
RDD的寬依賴和窄依賴
最後我們再通過下圖來詳細解釋一下Spark中的Stage劃分。我們從HDFS中讀入數據生成3個不同的RDD,通過一系列transformation操作後再將計算結果保存回HDFS。可以看到這幅DAG中只有join操作是一個寬依賴,Spark內核會以此為邊界將其前後劃分成不同的Stage. 同時我們可以注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操作可以形成一個流水線操作,通過map操作生成的partition可以不用等待整個RDD計算結束,而是繼續進行union操作,這樣大大提高了計算的效率。
Spark中的Stage劃分