當前位置:首頁 » 編程語言 » sparksqlorc
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksqlorc

發布時間: 2022-07-04 17:00:51

Ⅰ spark 跑spark sql時cpu利用率特別高怎麼辦

優化過程中常用到方法
查看查詢的整個運行計劃
scala>query.queryExecution
查看查詢的Unresolved LogicalPlan
scala>query.queryExecution.logical
查看查詢的Analyzed LogicalPlan
scala>query.queryExecution.analyzed
查看優化後的LogicalPlan
scala>query.queryExecution.optimizedPlan
查看物理計劃
scala>query.queryExecution.sparkPlan
查看RDD的轉換過程
scala>query.toDebugString
SparkSQL調優
Spark是一個快速的內存計算框架,同時是一個並行運算的框架,在計算性能調優的時候,除了要考慮廣為人知的木桶原理外,還要考慮平行運算的Amdahl定理。
木桶原理又稱短板理論,其核心思想是:一隻木桶盛水的多少,並不取決於桶壁上最高的那塊木塊,而是取決於桶壁上最短的那塊。將這個理論應用到系統性能優化上,系統的最終性能取決於系統中性能表現最差的組件。例如,即使系統擁有充足的內存資源和CPU資源,但是如果磁碟I/O性能低下,那麼系統的總體性能是取決於當前最慢的磁碟I/O速度,而不是當前最優越的CPU或者內存。在這種情況下,如果需要進一步提升系統性能,優化內存或者CPU資源是毫無用處的。只有提高磁碟I/O性能才能對系統的整體性能進行優化。

SparkSQL作為Spark的一個組件,在調優的時候,也要充分考慮到上面的兩個原理,既要考慮如何充分的利用硬體資源,又要考慮如何利用好分布式系統的並行計算。由於測試環境條件有限,本篇不能做出更詳盡的實驗數據來說明,只能在理論上加以說明。
2.1 並行性
SparkSQL在集群中運行,將一個查詢任務分解成大量的Task分配給集群中的各個節點來運行。通常情況下,Task的數量是大於集群的並行度,shuffle的時候預設的spark.sql.shuffle.partitionsw為200個partition,也就是200個Task:
而實驗的集群環境卻只能並行3個Task,也就是說同時只能有3個Task保持Running:

這時大家就應該明白了,要跑完這200個Task就要跑200/3=67批次。如何減少運行的批次呢?那就要盡量提高查詢任務的並行度。查詢任務的並行度由兩方面決定:集群的處理能力和集群的有效處理能力。
l對於Spark Standalone集群來說,集群的處理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES參數、SPARK_WORKER_CORES參數決定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超過物理機器的實際CPU core;
l集群的有效處理能力是指集群中空閑的集群資源,一般是指使用spark-submit或spark-shell時指定的--total-executor-cores,一般情況下,我們不需要指定,這時候,Spark Standalone集群會將所有空閑的core分配給查詢,並且在Task輪詢運行過程中,Standalone集群會將其他spark應用程序運行完後空閑出來的core也分配給正在運行中的查詢。
綜上所述,SparkSQL的查詢並行度主要和集群的core數量相關,合理配置每個節點的core可以提高集群的並行度,提高查詢的效率。
2.2 高效的數據格式
高效的數據格式,一方面是加快了數據的讀入速度,另一方面可以減少內存的消耗。高效的數據格式包括多個方面:
2.2.1 數據本地性
分布式計算系統的精粹在於移動計算而非移動數據,但是在實際的計算過程中,總存在著移動數據的情況,除非是在集群的所有節點上都保存數據的副本。移動數據,將數據從一個節點移動到另一個節點進行計算,不但消耗了網路IO,也消耗了磁碟IO,降低了整個計算的效率。為了提高數據的本地性,除了優化演算法(也就是修改spark內存,難度有點高),就是合理設置數據的副本。設置數據的副本,這需要通過配置參數並長期觀察運行狀態才能獲取的一個經驗值。
下面是Spark webUI監控Stage的一個圖:
lPROCESS_LOCAL是指讀取緩存在本地節點的數據
lNODE_LOCAL是指讀取本地節點硬碟數據
lANY是指讀取非本地節點數據
l通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關。

2.2.2 合適的數據類型
對於要查詢的數據,定義合適的數據類型也是非常有必要。對於一個tinyint可以使用的數據列,不需要為了方便定義成int類型,一個tinyint的數據佔用了1個byte,而int佔用了4個byte。也就是說,一旦將這數據進行緩存的話,內存的消耗將增加數倍。在SparkSQL里,定義合適的數據類型可以節省有限的內存資源。
2.2.3 合適的數據列
對於要查詢的數據,在寫SQL語句的時候,盡量寫出要查詢的列名,如Select a,b from tbl,而不是使用Select * from tbl;這樣不但可以減少磁碟IO,也減少緩存時消耗的內存。
2.2.4 優的數據存儲格式
在查詢的時候,最終還是要讀取存儲在文件系統中的文件。採用更優的數據存儲格式,將有利於數據的讀取速度。查看SparkSQL的Stage,可以發現,很多時候,數據讀取消耗佔有很大的比重。對於sqlContext來說,支持 textFiile、SequenceFile、ParquetFile、jsonFile;對於hiveContext來說,支持AvroFile、ORCFile、Parquet File,以及各種壓縮。根據自己的業務需求,測試並選擇合適的數據存儲格式將有利於提高SparkSQL的查詢效率。
2.3 內存的使用
spark應用程序最糾結的地方就是內存的使用了,也是最能體現「細節是魔鬼」的地方。Spark的內存配置項有不少,其中比較重要的幾個是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用節點的內存資源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超過節點本身具備的內存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark應用程序時申請使用的內存數量;不要超過節點的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark應用程序在所申請的內存資源中可用於cache的比例
lspark.shuffle.memoryFraction spark應用程序在所申請的內存資源中可用於shuffle的比例
在實際使用上,對於後兩個參數,可以根據常用查詢的內存消耗情況做適當的變更。另外,在SparkSQL使用上,有幾點建議:
l對於頻繁使用的表或查詢才進行緩存,對於只使用一次的表不需要緩存;
l對於join操作,優先緩存較小的表;
l要多注意Stage的監控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的監控,多思考如何才能Fraction cached的比例更多

2.4 合適的Task
對於SparkSQL,還有一個比較重要的參數,就是shuffle時候的Task數量,通過spark.sql.shuffle.partitions來調節。調節的基礎是spark集群的處理能力和要處理的數據量,spark的默認值是200。Task過多,會產生很多的任務啟動開銷,Task多少,每個Task的處理時間過長,容易straggle。
2.5 其他的一些建議
優化的方面的內容很多,但大部分都是細節性的內容,下面就簡單地提提:
l 想要獲取更好的表達式查詢速度,可以將spark.sql.codegen設置為Ture;
l 對於大數據集的計算結果,不要使用collect() ,collect()就結果返回給driver,很容易撐爆driver的內存;一般直接輸出到分布式文件系統中;
l 對於Worker傾斜,設置spark.speculation=true 將持續不給力的節點去掉;
l 對於數據傾斜,採用加入部分中間步驟,如聚合後cache,具體情況具體分析;
l 適當的使用序化方案以及壓縮方案;
l 善於利用集群監控系統,將集群的運行狀況維持在一個合理的、平穩的狀態;
l 善於解決重點矛盾,多觀察Stage中的Task,查看最耗時的Task,查找原因並改善;

Ⅱ spark SQL和hive到底什麼關系

Hive是一種基於HDFS的數據倉庫,並且提供了基於SQL模型的,針對存儲了大數據的數據倉庫,進行分布式交互查詢的查詢引擎。

SparkSQL並不能完全替代Hive,它替代的是Hive的查詢引擎,SparkSQL由於其底層基於Spark自身的基於內存的特點,因此速度是Hive查詢引擎的數倍以上,Spark本身是不提供存儲的,所以不可能替代Hive作為數據倉庫的這個功能。

SparkSQL相較於Hive的另外一個優點,是支持大量不同的數據源,包括hive、json、parquet、jdbc等等。SparkSQL由於身處Spark技術堆棧內,基於RDD來工作,因此可以與Spark的其他組件無縫整合使用,配合起來實現許多復雜的功能。比如SparkSQL支持可以直接針對hdfs文件執行sql語句。

Ⅲ spark的sqlcontext怎麼show databases

spark是一個分布式計算框架, 從他的作業調度可以看到,它的資源分配粒度很粗,CPU的核數進行分配的,集群的CPU資源是有限的
同時spark sql資源計算時需要把大量數據載入到內存中,需要消耗集群大量的內存資源,再做shuffle的時候,又需要消耗大量的網路IO和磁碟IO, 如果同時多個job執行,那麼每個job獲得資源要麼少,要麼需要排隊。而不能像關系型資料庫那麼提供高並發的服務。

Ⅳ spark Provider org.apache.spark.sql.hive.orc.DefaultSource could not be instantiated

解決辦法:在程序上
去掉程序中的setMaster("local")
這條語句並不是在集群中提交job
Fei joe
val conf = new SparkConf().setAppName("Map").setMaster("local")

Ⅳ spark sql支持哪些sql操作

支持Shark和sparkSQL 。

但是,隨著Spark的發展,其中sparkSQL作為Spark生態的一員繼續發展,而不再受限於hive,只是兼容hive;而hive on
spark是一個hive的發展計劃,該計劃將spark作為hive的底層引擎之一,也就是說,hive將不再受限於一個引擎,可以採用map-
rece、Tez、spark等引擎。

Ⅵ Spark SQL到底支持什麼SQL語句

Spark SQL到底支持什麼SQL語句
scala語言不是很容易懂,但是裡面有解析SQL的方法,可以看出支持的SQL語句,至少關鍵詞是很明確的。
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")

Ⅶ sparkSQL和spark有什麼區別

Spark為結構化數據處理引入了一個稱為Spark SQL的編程模塊。簡而言之,sparkSQL是Spark的前身,是在Hadoop發展過程中,為了給熟悉RDBMS但又不理解MapRece的技術人員提供快速上手的工具。
sparkSQL提供了一個稱為DataFrame(數據框)的編程抽象,DF的底層仍然是RDD,並且可以充當分布式SQL查詢引擎。

SparkSql有哪些特點呢?

1)引入了新的RDD類型SchemaRDD,可以像傳統資料庫定義表一樣來定義SchemaRDD。

2)在應用程序中可以混合使用不同來源的數據,如可以將來自HiveQL的數據和來自SQL的數據進行Join操作。

3)內嵌了查詢優化框架,在把SQL解析成邏輯執行計劃之後,最後變成RDD的計算。

Ⅷ hive和sparksql的區別

歷史上存在的原理,以前都是使用hive來構建數據倉庫,所以存在大量對hive所管理的數據查詢的需求。而hive、shark、sparlSQL都可以進行hive的數據查詢。shark是使用了hive的sql語法解析器和優化器,修改了執行器,使之物理執行過程是跑在spark上;而sparkSQL是使用了自身的語法解析器、優化器和執行器,同時sparkSQL還擴展了介面,不單單支持hive數據的查詢,可以進行多種數據源的數據查詢。

Ⅸ Spark RDD,DataFrame和DataSet的區別

RDD、DataFrame和DataSet是容易產生混淆的概念,必須對其相互之間對比,才可以知道其中異同。

RDD和DataFrame

RDD-DataFrame

上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解
Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark
SQL可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什麼。DataFrame多了數據的結構信息,即schema。RDD是分布式的
Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效
率、減少數據讀取以及執行計劃的優化,比如filter下推、裁剪等。

提升執行效率

RDD
API是函數式的,強調不變性,在大部分場景下傾向於創建新對象而不是修改老對象。這一特點雖然帶來了干凈整潔的API,卻也使得Spark應用程序在運
行期傾向於創建大量臨時對象,對GC造成壓力。在現有RDD
API的基礎之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內的數據創建方式,用復用可變對象的方式來減小對象分配和GC的
開銷,但這犧牲了代碼的可讀性,而且要求開發者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark
SQL在框架內部已經在各種可能的情況下盡量重用對象,這樣做雖然在內部會打破了不變性,但在將數據返回給用戶時,還會重新轉為不可變數據。利用
DataFrame API進行開發,可以免費地享受到這些優化效果。

減少數據讀取

分析大數據,最快的方法就是 ——忽略它。這里的「忽略」並不是熟視無睹,而是根據查詢條件進行恰當的剪枝。

上文討論分區表時提到的分區剪 枝便是其中一種——當查詢的過濾條件中涉及到分區列時,我們可以根據查詢條件剪掉肯定不包含目標數據的分區目錄,從而減少IO。

對於一些「智能」數據格 式,Spark
SQL還可以根據數據文件中附帶的統計信息來進行剪枝。簡單來說,在這類數據格式中,數據是分段保存的,每段數據都帶有最大值、最小值、null值數量等

一些基本的統計信息。當統計信息表名某一數據段肯定不包括符合查詢條件的目標數據時,該數據段就可以直接跳過(例如某整數列a某段的最大值為100,而查
詢條件要求a > 200)。

此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其餘列的數據。

執行優化

人口數據分析示例

為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如
果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter
下推到 join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark
SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基於關系代數的等價變換,將高成本的操作替換為低成本操作的過程。

得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。

對於普通開發者而言,查詢優化 器的意義在於,即便是經驗並不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。

RDD和DataSet

DataSet以Catalyst邏輯執行計劃表示,並且數據以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting、shuffle等操作。
DataSet創立需要一個顯式的Encoder,把對象序列化為二進制,可以把對象的scheme映射為SparkSQl類型,然而RDD依賴於運行時反射機制。

通過上面兩點,DataSet的性能比RDD的要好很多。

DataFrame和DataSet

Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record存儲的是一個強類型值而不是一個Row。因此具有如下三個特點:

DataSet可以在編譯時檢查類型

並且是面向對象的編程介面。用wordcount舉例:
//DataFrame

// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first

後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的介面。
//DataSet,完全使用scala編程,不要切換到DataFrame

val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()

DataFrame和DataSet可以相互轉化, df.as[ElementType] 這樣可以把DataFrame轉化為DataSet, ds.toDF() 這樣可以把DataSet轉化為DataFrame。