A. Spark RDD,DataFrame和DataSet的區別
RDD、DataFrame和DataSet是容易產生混淆的概念,必須對其相互之間對比,才可以知道其中異同。
RDD和DataFrame
RDD-DataFrame
上圖直觀地體現了DataFrame和RDD的區別。左側的RDD[Person]雖然以Person為類型參數,但Spark框架本身不了解
Person類的內部結構。而右側的DataFrame卻提供了詳細的結構信息,使得Spark
sql可以清楚地知道該數據集中包含哪些列,每列的名稱和類型各是什麼。DataFrame多了數據的結構信息,即schema。RDD是分布式的
Java對象的集合。DataFrame是分布式的Row對象的集合。DataFrame除了提供了比RDD更豐富的運算元以外,更重要的特點是提升執行效
率、減少數據讀取以及執行計劃的優化,比如filter下推、裁剪等。
提升執行效率
RDD
API是函數式的,強調不變性,在大部分場景下傾向於創建新對象而不是修改老對象。這一特點雖然帶來了干凈整潔的API,卻也使得Spark應用程序在運
行期傾向於創建大量臨時對象,對GC造成壓力。在現有RDD
API的基礎之上,我們固然可以利用mapPartitions方法來重載RDD單個分片內的數據創建方式,用復用可變對象的方式來減小對象分配和GC的
開銷,但這犧牲了代碼的可讀性,而且要求開發者對Spark運行時機制有一定的了解,門檻較高。另一方面,Spark
SQL在框架內部已經在各種可能的情況下盡量重用對象,這樣做雖然在內部會打破了不變性,但在將數據返回給用戶時,還會重新轉為不可變數據。利用
DataFrame API進行開發,可以地享受到這些優化效果。
減少數據讀取
分析大數據,最快的方法就是 ——忽略它。這里的「忽略」並不是熟視無睹,而是根據查詢條件進行恰當的剪枝。
上文討論分區表時提到的分區剪 枝便是其中一種——當查詢的過濾條件中涉及到分區列時,我們可以根據查詢條件剪掉肯定不包含目標數據的分區目錄,從而減少IO。
對於一些「智能」數據格 式,Spark
SQL還可以根據數據文件中附帶的統計信息來進行剪枝。簡單來說,在這類數據格式中,數據是分段保存的,每段數據都帶有最大值、最小值、null值數量等
一些基本的統計信息。當統計信息表名某一數據段肯定不包括符合查詢條件的目標數據時,該數據段就可以直接跳過(例如某整數列a某段的最大值為100,而查
詢條件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存儲格式的優勢,僅掃描查詢真正涉及的列,忽略其餘列的數據。
執行優化
人口數據分析示例
為了說明查詢優化,我們來看上圖展示的人口數據分析的示例。圖中構造了兩個DataFrame,將它們join之後又做了一次filter操作。如
果原封不動地執行這個執行計劃,最終的執行效率是不高的。因為join是一個代價較大的操作,也可能會產生一個較大的數據集。如果我們能將filter
下推到 join下方,先對DataFrame進行過濾,再join過濾後的較小的結果集,便可以有效縮短執行時間。而Spark
SQL的查詢優化器正是這樣做的。簡而言之,邏輯查詢計劃優化就是一個利用基於關系代數的等價變換,將高成本的操作替換為低成本操作的過程。
得到的優化執行計劃在轉換成物 理執行計劃的過程中,還可以根據具體的數據源的特性將過濾條件下推至數據源內。最右側的物理執行計劃中Filter之所以消失不見,就是因為溶入了用於執行最終的讀取操作的表掃描節點內。
對於普通開發者而言,查詢優化 器的意義在於,即便是經驗並不豐富的程序員寫出的次優的查詢,也可以被盡量轉換為高效的形式予以執行。
RDD和DataSet
DataSet以Catalyst邏輯執行計劃表示,並且數據以編碼的二進制形式被存儲,不需要反序列化就可以執行sorting、shuffle等操作。
DataSet創立需要一個顯式的Encoder,把對象序列化為二進制,可以把對象的scheme映射為SparkSQl類型,然而RDD依賴於運行時反射機制。
通過上面兩點,DataSet的性能比RDD的要好很多。
DataFrame和DataSet
Dataset可以認為是DataFrame的一個特例,主要區別是Dataset每一個record存儲的是一個強類型值而不是一個Row。因此具有如下三個特點:
DataSet可以在編譯時檢查類型
並且是面向對象的編程介面。用wordcount舉例:
//DataFrame
// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first
後面版本DataFrame會繼承DataSet,DataFrame是面向Spark SQL的介面。
//DataSet,完全使用scala編程,不要切換到DataFrame
val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
DataFrame和DataSet可以相互轉化, df.as[ElementType] 這樣可以把DataFrame轉化為DataSet, ds.toDF() 這樣可以把DataSet轉化為DataFrame。
B. 2019-03-05 SparkSQL集群性能調優 CheatSheet
0.買高性能機器,增加節點
1.設置磁碟文件預讀值大小為16384,使用linux命令:
echo 16384 > /sys/block/{磁碟名}/queue/read_ahead_kb
2. Spark 任務序列化只支持JavaSerializer,數據序列化支持JavaSerializer和 KryoSerializer 。KryoSerializer能達到JavaSerializer的十倍。
3.在spark.driver.extraJavaOptions和spark.executor.extraJavaOptions配置項中添加參數:" -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps ",如果頻繁出現Full GC,需要優化GC。把RDD做Cache操作,通過日誌查看RDD在內存中的大小,如果數據太大,需要改變RDD的存儲級別來優化。
4.一般並行度設置為集群CPU總和的2-3倍
5.大表和小表做join操作時可以把小表Broadcast到各個節點,從而就可以把join操作轉變成普通的操作,減少了shuffle操作。
6. 合理設計DAG,減少shuffle //TODO
7.使用 mapPartitions 可以更靈活地操作數據,例如對一個很大的數據求TopN,當N不是很大時,可以先使用mapPartitions對每個partition求TopN,collect結果到本地之後再做排序取TopN。這樣相比直接對全量數據做排序取TopN效率要高很多。
8.當之前的操作有很多filter時,使用 coalesce 減少空運行的任務數量
9.當任務數過大時候Shuffle壓力太大導致程序掛住不動,或者出現linux資源受限的問題。此時需要對數據重新進行分區,使用 repartition 。
10.配置多個磁碟給 localDir ,shuffle時寫入數據速度增快
11. 別collect大數據量,數據會回到driver端,容易OOM。非要collect,請配置 spark.sql.bigdata.thriftServer.useHdfsCollect 為true,會存在hdfs再讀
12.盡量用receByKey,會在Map端做本地聚合
13. broadcase set/map而不是Iterator, set/map 查詢效率O(1) ,iteratorO(n)
14. 數據發生傾斜,repartition大法 ,查出key,salt it
15.使用Hash Shuffle時,通過設置 spark.shuffle.consolidateFiles 為true,來合並shuffle中間文件,減少shuffle文件的數量,減少文件IO操作以提升性能
16.Spark SQL 小表join,把小表broadcast出去。配置 spark.sql.autoBroadcastJoinThreshold 和 spark.sql.bigdata.useExecutorBroadcast 。小表在join 右端。
17.SparkSQL數據傾斜,配置 spark.sql.planner.skewJoin 和 spark.sql.planner.skewJoin.threshold
18. SparkSQL 小文件,配置 spark.sql.small.file.combine 和 spark.sql.small.file.split.size
C. spark 跑spark sql時cpu利用率特別高怎麼辦
優化過程中常用到方法
查看查詢的整個運行計劃
scala>query.queryExecution
查看查詢的Unresolved LogicalPlan
scala>query.queryExecution.logical
查看查詢的Analyzed LogicalPlan
scala>query.queryExecution.analyzed
查看優化後的LogicalPlan
scala>query.queryExecution.optimizedPlan
查看物理計劃
scala>query.queryExecution.sparkPlan
查看RDD的轉換過程
scala>query.toDebugString
SparkSQL調優
Spark是一個快速的內存計算框架,同時是一個並行運算的框架,在計算性能調優的時候,除了要考慮廣為人知的木桶原理外,還要考慮平行運算的Amdahl定理。
木桶原理又稱短板理論,其核心思想是:一隻木桶盛水的多少,並不取決於桶壁上最高的那塊木塊,而是取決於桶壁上最短的那塊。將這個理論應用到系統性能優化上,系統的最終性能取決於系統中性能表現最差的組件。例如,即使系統擁有充足的內存資源和CPU資源,但是如果磁碟I/O性能低下,那麼系統的總體性能是取決於當前最慢的磁碟I/O速度,而不是當前最優越的CPU或者內存。在這種情況下,如果需要進一步提升系統性能,優化內存或者CPU資源是毫無用處的。只有提高磁碟I/O性能才能對系統的整體性能進行優化。
SparkSQL作為Spark的一個組件,在調優的時候,也要充分考慮到上面的兩個原理,既要考慮如何充分的利用硬體資源,又要考慮如何利用好分布式系統的並行計算。由於測試環境條件有限,本篇不能做出更詳盡的實驗數據來說明,只能在理論上加以說明。
2.1 並行性
SparkSQL在集群中運行,將一個查詢任務分解成大量的Task分配給集群中的各個節點來運行。通常情況下,Task的數量是大於集群的並行度,shuffle的時候預設的spark.sql.shuffle.partitionsw為200個partition,也就是200個Task:
而實驗的集群環境卻只能並行3個Task,也就是說同時只能有3個Task保持Running:
這時大家就應該明白了,要跑完這200個Task就要跑200/3=67批次。如何減少運行的批次呢?那就要盡量提高查詢任務的並行度。查詢任務的並行度由兩方面決定:集群的處理能力和集群的有效處理能力。
l對於Spark Standalone集群來說,集群的處理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES參數、SPARK_WORKER_CORES參數決定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超過物理機器的實際CPU core;
l集群的有效處理能力是指集群中空閑的集群資源,一般是指使用spark-submit或spark-shell時指定的--total-executor-cores,一般情況下,我們不需要指定,這時候,Spark Standalone集群會將所有空閑的core分配給查詢,並且在Task輪詢運行過程中,Standalone集群會將其他spark應用程序運行完後空閑出來的core也分配給正在運行中的查詢。
綜上所述,SparkSQL的查詢並行度主要和集群的core數量相關,合理配置每個節點的core可以提高集群的並行度,提高查詢的效率。
2.2 高效的數據格式
高效的數據格式,一方面是加快了數據的讀入速度,另一方面可以減少內存的消耗。高效的數據格式包括多個方面:
2.2.1 數據本地性
分布式計算系統的精粹在於移動計算而非移動數據,但是在實際的計算過程中,總存在著移動數據的情況,除非是在集群的所有節點上都保存數據的副本。移動數據,將數據從一個節點移動到另一個節點進行計算,不但消耗了網路IO,也消耗了磁碟IO,降低了整個計算的效率。為了提高數據的本地性,除了優化演算法(也就是修改spark內存,難度有點高),就是合理設置數據的副本。設置數據的副本,這需要通過配置參數並長期觀察運行狀態才能獲取的一個經驗值。
下面是Spark webUI監控Stage的一個圖:
lPROCESS_LOCAL是指讀取緩存在本地節點的數據
lNODE_LOCAL是指讀取本地節點硬碟數據
lANY是指讀取非本地節點數據
l通常讀取數據PROCESS_LOCAL>NODE_LOCAL>ANY,盡量使數據以PROCESS_LOCAL或NODE_LOCAL方式讀取。其中PROCESS_LOCAL還和cache有關。
2.2.2 合適的數據類型
對於要查詢的數據,定義合適的數據類型也是非常有必要。對於一個tinyint可以使用的數據列,不需要為了方便定義成int類型,一個tinyint的數據佔用了1個byte,而int佔用了4個byte。也就是說,一旦將這數據進行緩存的話,內存的消耗將增加數倍。在SparkSQL里,定義合適的數據類型可以節省有限的內存資源。
2.2.3 合適的數據列
對於要查詢的數據,在寫SQL語句的時候,盡量寫出要查詢的列名,如Select a,b from tbl,而不是使用Select * from tbl;這樣不但可以減少磁碟IO,也減少緩存時消耗的內存。
2.2.4 優的數據存儲格式
在查詢的時候,最終還是要讀取存儲在文件系統中的文件。採用更優的數據存儲格式,將有利於數據的讀取速度。查看SparkSQL的Stage,可以發現,很多時候,數據讀取消耗佔有很大的比重。對於sqlContext來說,支持 textFiile、SequenceFile、ParquetFile、jsonFile;對於hiveContext來說,支持AvroFile、ORCFile、Parquet File,以及各種壓縮。根據自己的業務需求,測試並選擇合適的數據存儲格式將有利於提高SparkSQL的查詢效率。
2.3 內存的使用
spark應用程序最糾結的地方就是內存的使用了,也是最能體現「細節是魔鬼」的地方。Spark的內存配置項有不少,其中比較重要的幾個是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用節點的內存資源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超過節點本身具備的內存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark應用程序時申請使用的內存數量;不要超過節點的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark應用程序在所申請的內存資源中可用於cache的比例
lspark.shuffle.memoryFraction spark應用程序在所申請的內存資源中可用於shuffle的比例
在實際使用上,對於後兩個參數,可以根據常用查詢的內存消耗情況做適當的變更。另外,在SparkSQL使用上,有幾點建議:
l對於頻繁使用的表或查詢才進行緩存,對於只使用一次的表不需要緩存;
l對於join操作,優先緩存較小的表;
l要多注意Stage的監控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的監控,多思考如何才能Fraction cached的比例更多
2.4 合適的Task
對於SparkSQL,還有一個比較重要的參數,就是shuffle時候的Task數量,通過spark.sql.shuffle.partitions來調節。調節的基礎是spark集群的處理能力和要處理的數據量,spark的默認值是200。Task過多,會產生很多的任務啟動開銷,Task多少,每個Task的處理時間過長,容易straggle。
2.5 其他的一些建議
優化的方面的內容很多,但大部分都是細節性的內容,下面就簡單地提提:
l 想要獲取更好的表達式查詢速度,可以將spark.sql.codegen設置為Ture;
l 對於大數據集的計算結果,不要使用collect() ,collect()就結果返回給driver,很容易撐爆driver的內存;一般直接輸出到分布式文件系統中;
l 對於Worker傾斜,設置spark.speculation=true 將持續不給力的節點去掉;
l 對於數據傾斜,採用加入部分中間步驟,如聚合後cache,具體情況具體分析;
l 適當的使用序化方案以及壓縮方案;
l 善於利用集群監控系統,將集群的運行狀況維持在一個合理的、平穩的狀態;
l 善於解決重點矛盾,多觀察Stage中的Task,查看最耗時的Task,查找原因並改善;
D. hive處理數據的效率怎樣可以比肩spark
1、為了讓Spark能夠連接到Hive的原有數據倉庫,我們需要將Hive中的hive-site.xml文件拷貝到Spark的conf目錄下,這樣就可以通過這個配置文件找到Hive的元數據以及數據存放。
在這里由於我的Spark是自動安裝和部署的,因此需要知道CDH將hive-site.xml放在哪裡。經過摸索。該文件默認所在的路徑是:/etc/hive/conf 下。
同理,spark的conf也是在/etc/spark/conf。
此時,如上所述,將對應的hive-site.xml拷貝到spark/conf目錄下即可
如果Hive的元數據存放在Mysql中,我們還需要准備好Mysql相關驅動,比如:mysql-connector-java-5.1.22-bin.jar。
2、編寫測試代碼
val conf=new SparkConf().setAppName("Spark-Hive").setMaster("local")
val sc=new SparkContext(conf)
//create hivecontext
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' ") //這里需要注意數據的間隔符
sqlContext.sql("LOAD DATA INPATH '/user/liujiyu/spark/kv1.txt' INTO TABLE src ");
sqlContext.sql(" SELECT * FROM jn1").collect().foreach(println)
sc.stop()
3、下面列舉一下出現的問題:
(1)如果沒有將hive-site.xml拷貝到spark/conf目錄下,會出現:
分析:從錯誤提示上面就知道,spark無法知道hive的元數據的位置,所以就無法實例化對應的client。
解決的辦法就是必須將hive-site.xml拷貝到spark/conf目錄下
(2)測試代碼中沒有加sc.stop會出現如下錯誤:
ERROR scheler.LiveListenerBus: Listener EventLoggingListener threw an exception
java.lang.reflect.InvocationTargetException
在代碼最後一行添加sc.stop()解決了該問題。
E. 源碼級解讀如何解決Spark-sql讀取hive分區表執行效率低問題
問題描述
在開發過程中使用spark去讀取hive分區表的過程中(或者使用hive on spark、nodepad開發工具),部分開發人員未注意添加分區屬性過濾導致在執行過程中載入了全量數據,引起任務執行效率低、磁碟IO大量損耗等問題。
解決辦法
1、自定義規則CheckPartitionTable類,實現Rule,通過以下方式創建SparkSession。
2、自定義規則CheckPartitionTable類,實現Rule,將規則類追加至Optimizer.batches: Seq[Batch]中,如下。
規則內容實現
1、CheckPartitionTable規則執行類,需要通過引入sparkSession從而獲取到引入conf;需要繼承Rule[LogicalPlan];
2、通過splitPredicates方法,分離分區謂詞,得到分區謂詞表達式。在sql解析過程中將謂詞解析為TreeNode,此處採用遞歸的方式獲取分區謂詞。
3、判斷是否是分區表,且是否添加分區欄位。
4、實現Rule的apply方法
大數據和雲計算的關系
大數據JUC面試題
大數據之Kafka集群部署
大數據logstsh架構
大數據技術kafka的零拷貝
F. 如何使用 Spark SQL
一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑
/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項
--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL
-f <filename> 以文件方式批量執行SQL
二、Spark sql對hive支持的功能
1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF
5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE
14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT
三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()
分組統計:df.groupBy("age").count().show()
1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件
val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)
四、Spark sql性能調優
緩存數據表:sqlContext.cacheTable("tableName")
取消緩存表:sqlContext.uncacheTable("tableName")
spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險
G. 為什麼SparkSQL不支持子查詢
執行效率低 例:select * from table where a not in(select a from tableb) 如果子查詢包括一個比較大的結果集,就不建議使用。
H. spark core程序會比spark sql效率更好么
json File 日期類型 怎樣處理?怎樣從字元型,轉換為Date或DateTime類型?
json文件如下,有字元格式的日期類型
```
{ "name" : "Andy", "age" : 30, "time" :"2015-03-03T08:25:55.769Z"}
{ "name" : "Justin", "age" : 19, "time" : "2015-04-04T08:25:55.769Z" }
{ "name" : "pan", "age" : 49, "time" : "2015-05-05T08:25:55.769Z" }
{ "name" : "penny", "age" : 29, "time" : "2015-05-05T08:25:55.769Z" }
```
默認推測的Schema:
```
root
|-- _corrupt_record: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- time200: string (nullable = true)
```
測試代碼
```
val fileName = "person.json"
val sc = SparkUtils.getScLocal("json file 測試")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jsonFile = sqlContext.read.json(fileName)
jsonFile.printSchema()
I. 基於spark SQL之上的檢索與排序對比性能測試
之前做過一年的spark研發,之前在阿里與騰訊也做了很久的hive,所以對這方面比較了解。
第一:其實快多少除了跟spark與hive本身的技術實現外,也跟機器性能,底層操作系統的參數優化息息相關,不能一概而論。
第二:hive 目前應該還是業界的主流,畢竟快與慢很多時候並非是至關重要的,對於一個生產系統來說,更重要的應該是穩定性,spark畢竟還算是比較新興的事務,快確實快,但是穩定性上距離hive相差甚遠。關於spark我們也修復了很多關於內存泄露的BUG,因為您問的是性能,所以不過多介紹(可以跟我要YDB編程指南,裡面有我對這些BUG的修正)
第三:關於性能,我測試的可能不夠全面,只能在排序與檢索過濾上提供我之前的基於YDB的BLOCK sort測試報告供您參考(網路上貼word太費勁,您可以跟我要 word文檔)。
排序可以說是很多日誌系統的硬指標(如按照時間逆序排序),如果一個大數據系統不能進行排序,基本上是這個系統屬於不可用狀態,排序算得上是大數據系統的一個「剛需」,無論大數據採用的是hadoop,還是spark,還是impala,hive,總之排序是必不可少的,排序的性能測試也是必不可少的。
有著計算奧運會之稱的Sort Benchmark全球排序每年都會舉行一次,每年巨頭都會在排序上進行巨大的投入,可見排序速度的高低有多麼重要!但是對於大多數企業來說,動輒上億的硬體投入,實在劃不來、甚至遠遠超出了企業的項目預算。相比大數據領域的暴力排序有沒有一種更廉價的實現方式?
在這里,我們為大家介紹一種新的廉價排序方法,我們稱為blockSort。
500G的數據300億條數據,只使用4台 16核,32G內存,千兆網卡的虛擬機即可實現 2~15秒的 排序 (可以全表排序,也可以與任意篩選條件篩選後排序)。
一、基本的思想是這樣的,如下圖所示:
1.將數據按照大小預先劃分好,如劃分成 大、中、小三個塊(block)。
2.如果想找最大的數據,那麼只需要在最大的那個塊里去找就可以了。
3.這個快還是有層級結構的,如果每個塊內的數據量很多,可以到下面的子快內進行繼續查找,可以分多個層進行排序。
4.採用這種方法,一個億萬億級別的數據(如long類型),最壞最壞的極端情況也就進行2048次文件seek就可以篩選到結果。
五、哪些用戶適合使用YDB?
1.傳統關系型數據,已經無法容納更多的數據,查詢效率嚴重受到影響的用戶。
2.目前在使用SOLR、ES做全文檢索,覺得solr與ES提供的分析功能太少,無法完成復雜的業務邏輯,或者數據量變多後SOLR與ES變得不穩定,在掉片與均衡中不斷惡性循環,不能自動恢復服務,運維人員需經常半夜起來重啟集群的情況。
3.基於對海量數據的分析,但是苦於現有的離線計算平台的速度和響應時間無滿足業務要求的用戶。
4.需要對用戶畫像行為類數據做多維定向分析的用戶。
5.需要對大量的UGC(User Generate Content)數據進行檢索的用戶。
6.當你需要在大數據集上面進行快速的,互動式的查詢時。
7.當你需要進行數據分析,而不只是簡單的鍵值對存儲時。
8.當你想要分析實時產生的數據時。
ps:說了一大堆,說白了最適合的還是蹤跡分析因為數據量大,數據還要求實時,查詢還要求快。這才是關鍵。
J. Spark-sql讀取hive分區表限制分區過濾條件及限制分區數量
在開發過程中使用spark去讀取hive分區表的過程中(或者使用hive on spark、nodepad開發工具),部分開發人員未注意添加分區屬性過濾導致在執行過程中載入了全量數據,引起任務執行效率低、磁碟IO大量損耗等問題
1、自定義規則CheckPartitionTable類,實現Rule
然後通過此種方法創建SparkSession
2、自定義規則CheckPartitionTable類,實現Rule,將規則類追加致Optimizer.batches: Seq[Batch]中
1、CheckPartitionTable規則執行類,需要通過引入sparkSession從而獲取到引入conf;需要繼承Rule[LogicalPlan];
2、通過splitPredicates方法,分離分區謂詞,得到分區謂詞表達式
在sql解析過程中將謂詞解析為TreeNode,此處採用遞歸的方式獲取分區謂詞
3、判斷是否是分區表,且是否添加分區欄位
4、實現Rule的apply方法
關於spark-sql的主要執行流程及預備知識,可參照我同學的這篇博文 https://www.jianshu.com/p/4cc6797fb9ce