當前位置:首頁 » 編程語言 » sparksql分桶優化
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksql分桶優化

發布時間: 2022-10-20 10:48:32

『壹』 詳解 Spark 中的 Bucketing

Bucketing 就是利用 buckets(按列進行分桶)來決定數據分區(partition)的一種優化技術,它可以幫助在計算中避免數據交換(avoid data shuffle)。並行計算的時候shuffle常常會耗費非常多的時間和資源.

Bucketing 的基本原理比較好理解,它會根據你指定的列(可以是一個也可以是多個)計算哈希值,然後具有相同哈希值的數據將會被分到相同的分區。

Bucket的最終目的也是實現分區,但是和Partition的原理不同,當我們根據指定列進行Partition的時候,Spark會根據列的名字對數據進行分區(如果沒有指定列名則會根據一個隨機信息對數據進行分區)。Bucketing的最大不同在於它使用了指定列的哈希值,這樣可以保證具有相同列值的數據被分到相同的分區。

目前在使用 bucketBy 的時候,必須和 sortBy,saveAsTable 一起使用,如下。這個操作其實是將數據保存到了文件中(如果不指定path,也會保存到一個臨時目錄中)。

數據分桶保存之後,我們才能使用它。

在一個SparkSession內,保存之後你可以通過如下命令通過表名獲取其對應的DataFrame.

其中spark是一個SparkSession對象。獲取之後就可以使用DataFrame或者在sql中使用表。

如果你要使用歷史保存的數據,那麼就不能用上述方法了,也不能像讀取常規文件一樣使用 spark.read.parquet() ,這種方式讀進來的數據是不帶bucket信息的。正確的方法是利用CREATE TABLE 語句,詳情可用參考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html

示例如下:

在我們join兩個表的時候,如果兩個表最好按照相同的列劃分成相同的buckets,就可以完全避免shuffle。根據前面所述的hash值計算方法,兩個表具有相同列值的數據會存放在相同的機器上,這樣在進行join操作時就不需要再去和其他機器通訊,直接在本地完成計算即可。假設你有左右兩個表,各有兩個分區,那麼join的時候實際計算就是下圖的樣子,兩個機器進行計算,並且計算後分區還是2.

而當需要shuffle的時候,會是這樣的,

細心的你可能發現了,上面兩個分區對應兩個Executor,下面shuffle之後對應的怎麼成了三個Executor了?沒錯,當數據進行shuffle之後,分區數就不再保持和輸入的數據相同了,實際上也沒有必要保持相同。

我們考慮的是大數據表的連接,本地測試的時候一般使用小的表,所以逆序需要將小表自動廣播的配置關掉。如果開啟小表廣播,那麼兩個小表的join之後分區數是不會變的,例如:

關閉配置的命令如下:

正常情況下join之後分區數會發生變化:

這個200其實就是 "spark.sql.shuffle.partitions" 配置的值,默認就是200. 所以如果在Join過程中出現了shuffle,join之後的分區一定會變,並且變成spark.sql.shuffle.partitions的值。通常你需要根據自己的集群資源修改這個值,從而優化並行度,但是shuffle是不可避免的。

實際測試結果如下:

Spark依然會利用一些Bucekt的信息,但具體怎麼執行目前還不太清楚,還是保持一致的好。

另外,如果你spark job的可用計算核心數小於Bucket值,那麼從文件中讀取之後Bucekt值會變,就是說bucket的數目不會超過你能使用的最大計算核數。

在處理null值的時候,我們可能會用到一些特殊的函數或者符號,如下表所示。但是在使用bucket的時候這里有個坑,一定要躲過。join的時候千萬不要使用 <=> 符號,使用之後spark就會忽略bucket信息,繼續shuffle數據,原因可能和hash計算有關。

原文連接

『貳』 sparksql 內存不釋放

1.1. Spark SQL的前世今生

Shark是一個為Spark設計的大規模數據倉庫系統,它與Hive兼容。Shark建立在Hive的代碼基礎上,並通過將Hive的部分物理執行計劃交換出來。這個方法使得Shark的用戶可以加速Hive的查詢,但是Shark繼承了Hive的大且復雜的代碼使得Shark很難優化和維護,同時Shark依賴於Spark的版本。隨著我們遇到了性能優化的上限,以及集成SQL的一些復雜的分析功能,我們發現Hive的MapRece設計的框架限制了Shark的發展。在2014年7月1日的Spark Summit上,Databricks宣布終止對Shark的開發,將重點放到Spark SQL上。

1.2. 什麼是Spark SQL

Spark SQL是Spark用來處理結構化數據的一個模塊,它提供了一個編程抽象叫做DataFrame並且作為分布式SQL查詢引擎的作用。

相比於Spark RDD API,Spark SQL包含了對結構化數據和在其上運算的更多信息,Spark SQL使用這些信息進行了額外的優化,使對結構化數據的操作更加高效和方便。

有多種方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但無論是哪種API或者是編程語言,它們都是基於同樣的執行引擎,因此你可以在不同的API之間隨意切換,它們各有各的特點,看你喜歡那種風格。

1.3. 為什麼要學習Spark SQL

我們已經學習了Hive,它是將Hive SQL轉換成MapRece然後提交到集群中去執行,大大簡化了編寫MapRece程序的復雜性,由於MapRece這種計算模型執行效率比較慢,所以Spark SQL應運而生,它是將Spark SQL轉換成RDD,然後提交到集群中去運行,執行效率非常快!

1.易整合

『叄』 Spark連接到MySQL並執行查詢為什麼速度會快

spark mysql 為什麼快
一,SQL查詢優化:指,使用的語句是不是冗餘的,就是有沒有無用的。 你可用用explain 你的語句來比較分板一番。比如:select * from wc where 1;與select * from wc二者的執行時間不一樣的; 二,SQL執行計劃就是用於描述SQL引擎在執行一個sql語

『肆』 spark sql怎麼劃分stage

其實sql就是關系操作。關系操作跟map,rece這些基礎運算元對應起來的(spark其實基礎運算元也是map,rece,只是在此基礎上做了擴展)。比如projection,filter是窄依賴,join,semi join,outer join是寬依賴。
具體流程會比較復雜。首先spark會解析這條sql,生成語法樹(spark2.0會通過antlr4解析),然後經過邏輯優化(dataframe中有logic plan),然後轉換為map rece,生成對應的操作運算元(projection,filter,join等)。有了寬依賴,窄依賴,也就能劃分stage了。

『伍』 spark 跑spark sql時cpu利用率特別高怎麼辦

優化過程中常用到方法
查看查詢的整個運行計劃
scala>query.queryExecution
查看查詢的Unresolved LogicalPlan
scala>query.queryExecution.logical
查看查詢的Analyzed LogicalPlan
scala>query.queryExecution.analyzed
查看優化後的LogicalPlan
scala>query.queryExecution.optimizedPlan
查看物理計劃
scala>query.queryExecution.sparkPlan
查看RDD的轉換過程
scala>query.toDebugString
SparkSQL調優
Spark是一個快速的內存計算框架,同時是一個並行運算的框架,在計算性能調優的時候,除了要考慮廣為人知的木桶原理外,還要考慮平行運算的Amdahl定理。
木桶原理又稱短板理論,其核心思想是:一隻木桶盛水的多少,並不取決於桶壁上最高的那塊木塊,而是取決於桶壁上最短的那塊。將這個理論應用到系統性能優化上,系統的最終性能取決於系統中性能表現最差的組件。例如,即使系統擁有充足的內存資源和CPU資源,但是如果磁碟I/O性能低下,那麼系統的總體性能是取決於當前最慢的磁碟I/O速度,而不是當前最優越的CPU或者內存。在這種情況下,如果需要進一步提升系統性能,優化內存或者CPU資源是毫無用處的。只有提高磁碟I/O性能才能對系統的整體性能進行優化。

SparkSQL作為Spark的一個組件,在調優的時候,也要充分考慮到上面的兩個原理,既要考慮如何充分的利用硬體資源,又要考慮如何利用好分布式系統的並行計算。由於測試環境條件有限,本篇不能做出更詳盡的實驗數據來說明,只能在理論上加以說明。
2.1 並行性
SparkSQL在集群中運行,將一個查詢任務分解成大量的Task分配給集群中的各個節點來運行。通常情況下,Task的數量是大於集群的並行度,shuffle的時候預設的spark.sql.shuffle.partitionsw為200個partition,也就是200個Task:
而實驗的集群環境卻只能並行3個Task,也就是說同時只能有3個Task保持Running:

這時大家就應該明白了,要跑完這200個Task就要跑200/3=67批次。如何減少運行的批次呢?那就要盡量提高查詢任務的並行度。查詢任務的並行度由兩方面決定:集群的處理能力和集群的有效處理能力。
l對於Spark Standalone集群來說,集群的處理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES參數、SPARK_WORKER_CORES參數決定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超過物理機器的實際CPU core;
l集群的有效處理能力是指集群中空閑的集群資源,一般是指使用spark-submit或spark-shell時指定的--total-executor-cores,一般情況下,我們不需要指定,這時候,Spark Standalone集群會將所有空閑的core分配給查詢,並且在Task輪詢運行過程中,Standalone集群會將其他spark應用程序運行完後空閑出來的core也分配給正在運行中的查詢。
綜上所述,SparkSQL的查詢並行度主要和集群的core數量相關,合理配置每個節點的core可以提高集群的並行度,提高查詢的效率。
2.2 高效的數據格式
高效的數據格式,一方面是加快了數據的讀入速度,另一方面可以減少內存的消耗。高效的數據格式包括多個方面:
2.2.1 數據本地性
分布式計算系統的精粹在於移動計算而非移動數據,但是在實際的計算過程中,總存在著移動數據的情況,除非是在集群的所有節點上都保存數據的副本。移動數據,將數據從一個節點移動到另一個節點進行計算,不但消耗了網路IO,也消耗了磁碟IO,降低了整個計算的效率。為了提高數據的本地性,除了優化演算法(也就是修改spark內存,難度有點高),就是合理設置數據的副本。設置數據的副本,這需要通過配置參數並長期觀察運行狀態才能獲取的一個經驗值。
下面是Spark webUI監控Stage的一個圖:
lPROCESS_LOCAL是指讀取緩存在本地節點的數據
lNODE_LOCAL是指讀取本地節點硬碟數據
lANY是指讀取非本地節點數據
l通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關。

2.2.2 合適的數據類型
對於要查詢的數據,定義合適的數據類型也是非常有必要。對於一個tinyint可以使用的數據列,不需要為了方便定義成int類型,一個tinyint的數據佔用了1個byte,而int佔用了4個byte。也就是說,一旦將這數據進行緩存的話,內存的消耗將增加數倍。在SparkSQL里,定義合適的數據類型可以節省有限的內存資源。
2.2.3 合適的數據列
對於要查詢的數據,在寫SQL語句的時候,盡量寫出要查詢的列名,如Select a,b from tbl,而不是使用Select * from tbl;這樣不但可以減少磁碟IO,也減少緩存時消耗的內存。
2.2.4 優的數據存儲格式
在查詢的時候,最終還是要讀取存儲在文件系統中的文件。採用更優的數據存儲格式,將有利於數據的讀取速度。查看SparkSQL的Stage,可以發現,很多時候,數據讀取消耗佔有很大的比重。對於sqlContext來說,支持 textFiile、SequenceFile、ParquetFile、jsonFile;對於hiveContext來說,支持AvroFile、ORCFile、Parquet File,以及各種壓縮。根據自己的業務需求,測試並選擇合適的數據存儲格式將有利於提高SparkSQL的查詢效率。
2.3 內存的使用
spark應用程序最糾結的地方就是內存的使用了,也是最能體現「細節是魔鬼」的地方。Spark的內存配置項有不少,其中比較重要的幾個是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用節點的內存資源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超過節點本身具備的內存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark應用程序時申請使用的內存數量;不要超過節點的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark應用程序在所申請的內存資源中可用於cache的比例
lspark.shuffle.memoryFraction spark應用程序在所申請的內存資源中可用於shuffle的比例
在實際使用上,對於後兩個參數,可以根據常用查詢的內存消耗情況做適當的變更。另外,在SparkSQL使用上,有幾點建議:
l對於頻繁使用的表或查詢才進行緩存,對於只使用一次的表不需要緩存;
l對於join操作,優先緩存較小的表;
l要多注意Stage的監控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的監控,多思考如何才能Fraction cached的比例更多

2.4 合適的Task
對於SparkSQL,還有一個比較重要的參數,就是shuffle時候的Task數量,通過spark.sql.shuffle.partitions來調節。調節的基礎是spark集群的處理能力和要處理的數據量,spark的默認值是200。Task過多,會產生很多的任務啟動開銷,Task多少,每個Task的處理時間過長,容易straggle。
2.5 其他的一些建議
優化的方面的內容很多,但大部分都是細節性的內容,下面就簡單地提提:
l 想要獲取更好的表達式查詢速度,可以將spark.sql.codegen設置為Ture;
l 對於大數據集的計算結果,不要使用collect() ,collect()就結果返回給driver,很容易撐爆driver的內存;一般直接輸出到分布式文件系統中;
l 對於Worker傾斜,設置spark.speculation=true 將持續不給力的節點去掉;
l 對於數據傾斜,採用加入部分中間步驟,如聚合後cache,具體情況具體分析;
l 適當的使用序化方案以及壓縮方案;
l 善於利用集群監控系統,將集群的運行狀況維持在一個合理的、平穩的狀態;
l 善於解決重點矛盾,多觀察Stage中的Task,查看最耗時的Task,查找原因並改善;

『陸』 spark sql啟動後執行越來越慢是為什麼

Shark為了實現Hive兼容,在HQL方面重用了Hive中HQL的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認為僅將物理執行計劃從MR作業替換成了Spark作業(輔以內存列式存儲等各種和Hive關系不大的優化);同時還依賴HiveMetastore和HiveSe

『柒』 「每日一道大數據面試題系列」spark如何調優

如果面試時被問到spark任務如何調優,我們該如何回答呢?

下面我們從四大方面回答這個問題,保證吊打面試官。

一、spark性能調優

1、分配更多的資源

比如增加執行器個數(num_executor)、增加執行器個數(executor_cores)、增加執行器內存(executor_memory)

2、調節並行度

spark.default.parallelism

3、重構RDD架構以及RDD持久化

盡量去復用RDD,差不多的RDD可以抽取成一個共同的RDD,公共RDD一定要實現持久化

4、廣播變數

SparkContext.broadcast方法創建一個對象,通過value方法訪問

5、使用kryo序列化

SparkConf中設置屬性:spark.serializer: org.apache.spark.serializer.kryoSerializer

6、使用fastutil優化數據格式(代替java中的Array、List、Set、Map)

7、調節數據本地化等待時長

調節參數: spark.locality.wait

二、JVM調優

降低cache操作的內存佔比 1.6版本之前使用的是靜態內存管理

spark中堆內存被劃分為兩塊:

一塊是專門來給RDD作cachepersist持久化的 StorageMemory,另一塊是給spark運算元函數運行使用的,存放函數中自己創建的對象。

1.6版本之後採用統一內存管理機制

storage和execution各佔50%,若己方不足對方空餘可佔用對方空間

可嘗試調節executor堆外內存

spark.yarn.executor.memoryOverhead = 2048m

調節連接等待時長

spark.core.connection.ack.wait.timeout = 300

三、shuffle數據傾斜調優

1、預聚合源數據,對hive源表提前進行聚合操作,在hive聚合之後,spark任務再去讀取

2、檢查傾斜的key是否是臟數據,可以提前過濾

3、提高shuffle操作rece的並行度

4、使用隨機key實現雙重聚合

5、將rece端 join轉換成map端 join

6、sample采樣傾斜key,單獨進行join後在union

7、使用隨機數以及擴容表進行join

四、運算元調優

1、使用mapPartition提升map類操作的性能

2、filter過後使用coalesce減少分區數量

3、使用foreachPartition優化寫數據性能

4、使用repartition解決sparkSql低並行度的性能問題

5、receByKey替換groupByKey實現map讀預聚合


『捌』 spark sql啟動後執行越來越慢是為什麼

Shark為了實現Hive兼容,在HQL方面重用了Hive中HQL的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認為僅將物理執行計劃從MR作業替換成了Spark作業(輔以內存列式存儲等各種和Hive關系不大的優化);同時還依賴Hive Metastore和Hive SerDe(用於兼容現有的各種Hive存儲格式)。這一策略導致了兩個問題,第一是執行計劃優化完全依賴於Hive,不方便添加新的優化策略;二是因為MR是進程級並行,寫代碼的時候不是很注意線程安全問題,導致Shark不得不使用另外一套獨立維護的打了補丁的Hive源碼分支(至於為何相關修改沒有合並到Hive主線,我也不太清楚)。

『玖』 Spark SQL 到底怎麼搭建起來

一般spark sql用於訪問hive集群的表數據吧?
我們的spark是訪問hive集群的,步驟還是很簡單的,大致如下:
1)安裝spark時需要將hive-site.xml,yarn-site.xml,hdfs-site.xml都拷貝到spark/conf中(yarn-site.xml是因為我們是spark on yarn)
2)編程時用HiveContext,調用sql(...)就好了,如:
val hc = new HiveContext(sc)
hc.sql( "select ..." ) 這里的sql語句自己發揮吧~
不過spark sql穩定性不高,寫復雜語句時partition和優化策略不太合理,小數據量玩一下就好(如spark streaming中使用也還可以),大數據量暫時不建議用~

『拾』 Spark SQL 和 Shark 在架構上有哪些區別將來會合並嗎

Shark為了實現Hive兼容,在HQL方面重用了Hive中HQL的解析、邏輯執行計劃翻譯、執行計劃優化等邏輯,可以近似認為僅將物理執行計劃從MR作業替換成了Spark作業(輔以內存列式存儲等各種和Hive關系不大的優化);同時還依賴Hive Metastore和Hive SerDe(用於兼容現有的各種Hive存儲格式)。這一策略導致了兩個問題,第一是執行計劃優化完全依賴於Hive,不方便添加新的優化策略;二是因為MR是進程級並行,寫代碼的時候不是很注意線程安全問題,導致Shark不得不使用另外一套獨立維護的打了補丁的Hive源碼分支。

Spark SQL解決了這兩個問題。第一,Spark SQL在Hive兼容層面僅依賴HQL parser、Hive Metastore和Hive SerDe。也就是說,從HQL被解析成抽象語法樹(AST)起,就全部由Spark SQL接管了。執行計劃生成和優化都由Catalyst負責。藉助Scala的模式匹配等函數式語言特性,利用Catalyst開發執行計劃優化策略比Hive要簡潔得多。去年Spark summit上Catalyst的作者Michael Armbrust對Catalyst做了一個簡要介紹:2013 | Spark Summit(知乎竟然不能自定義鏈接的文字?)。第二,相對於Shark,由於進一步削減了對Hive的依賴,Spark SQL不再需要自行維護打了patch的Hive分支。Shark後續將全面採用Spark SQL作為引擎,不僅僅是查詢優化方面。

此外,除了兼容HQL、加速現有Hive數據的查詢分析以外,Spark SQL還支持直接對原生RDD對象進行關系查詢。同時,除了HQL以外,Spark SQL還內建了一個精簡的SQL parser,以及一套Scala DSL。也就是說,如果只是使用Spark SQL內建的SQL方言或Scala DSL對原生RDD對象進行關系查詢,用戶在開發Spark應用時完全不需要依賴Hive的任何東西。

能夠對原生RDD對象進行關系查詢,個人認為大大降低了用戶門檻。一方面當然是因為熟悉SQL的人比熟悉Spark API的人多,另一方面是因為Spark SQL之下有Catalyst驅動的查詢計劃優化引擎。雖然在很多方面Spark的性能完爆Hadoop MapRece好幾條街,但Spark的運行時模型也比MapRece復雜不少,使得Spark應用的性能調優比較tricky。雖然從代碼量上來看,Spark應用往往是對等的MR應用的好幾分之一,但裸用Spark API開發高效Spark應用還是需要花些心思的。這就體現出Spark SQL的優勢了:即便用戶寫出的查詢不那麼高效,Catalyst也可以自動應用一系列常見優化策略。
。。