當前位置:首頁 » 編程語言 » sparksql語法手冊
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sparksql語法手冊

發布時間: 2022-11-20 04:45:48

Ⅰ Spark sql到底支持什麼SQL語句

作者:張寬
鏈接:https://www.hu.com/question/34569764/answer/59217173
來源:知乎
著作權歸作者所有,轉載請聯系作者獲得授權。

scala語言不是很容易懂,但是裡面有解析SQL的方法,可以看出支持的SQL語句,至少關鍵詞是很明確的。

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val ELSE = Keyword("ELSE")
protected val END = Keyword("END")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val TABLE = Keyword("TABLE")
protected val THEN = Keyword("THEN")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")

Ⅱ SQL 語法速成手冊(干貨滿滿,建議收藏!)



SQL 語法結構包括:

例如: SELECT 與 select 、 Select 是相同的。

數據定義語言(Data Definition Language,DDL)是 SQL 語言集中負責數據結構定義與資料庫對象定義的語言。

DDL 的主要功能是 定義資料庫對象

DDL 的核心指令是 CREATE 、 ALTER 、 DROP 。

數據操縱語言(Data Manipulation Language, DML)是用於資料庫操作,對資料庫其中的對象和數據運行訪問工作的編程語句。

DML 的主要功能是 訪問數據 ,因此其語法都是以 讀寫資料庫 為主。

DML 的核心指令是 INSERT 、 UPDATE 、 DELETE 、 SELECT 。這四個指令合稱 CRUD(Create, Read, Update, Delete),即增刪改查。

事務控制語言 (Transaction Control Language, TCL) 用於 管理資料庫中的事務 。這些用於管理由 DML 語句所做的更改。它還允許將語句分組為邏輯事務。

TCL 的核心指令是 COMMIT 、 ROLLBACK 。

數據控制語言 (Data Control Language, DCL) 是一種可對數據訪問權進行控制的指令,它可以控制特定用戶賬戶對數據表、查看錶、預存程序、用戶自定義函數等資料庫對象的控制權。

DCL 的核心指令是 GRANT 、 REVOKE 。

DCL 以 控制用戶的訪問許可權 為主,因此其指令作法並不復雜,可利用 DCL 控制的許可權有: CONNECT 、 SELECT 、 INSERT 、 UPDATE 、 DELETE 、 EXECUTE 、 USAGE 、 REFERENCES 。

根據不同的 DBMS 以及不同的安全性實體,其支持的許可權控制也有所不同。

(以下為 DML 語句用法)


插入完整的行

插入行的一部分

插入查詢出來的數據



刪除表中的指定數據

清空表中的數據


查詢單列

查詢多列

查詢所有列

查詢不同的值

限制查詢結果


子查詢的子查詢



SELECT 語句中的 WHERE 子句

UPDATE 語句中的 WHERE 子句

DELETE 語句中的 WHERE 子句

IN 示例

BETWEEN 示例

AND 示例

OR 示例

NOT 示例

% 示例

_ 示例


組合查詢



其中, SOUNDEX() 可以將一個字元串轉換為描述其語音表示的字母數字模式。








AVG() 會忽略 NULL 行。

使用 DISTINCT 可以讓匯總函數值匯總不同的值。

指定多個列的排序方向

分組

分組後排序

使用 WHERE 和 HAVING 過濾數據

(以下為 DDL 語句用法)

普通創建

根據已有的表創建新表

添加列

刪除列

修改列

添加主鍵

刪除主鍵



創建表時使用約束條件:

(以下為 TCL 語句用法)


(以下為 DCL 語句用法)




可以使用觸發器來進行審計跟蹤,把修改記錄到另外一張表中。

MySQL 不允許在觸發器中使用 CALL 語句 ,也就是不能調用存儲過程。

BEGIN 和 END

當觸發器的觸發條件滿足時,將會執行 BEGIN 和 END 之間的觸發器執行動作。

NEW 和 OLD

CREATE TRIGGER 指令用於創建觸發器。

語法:

說明:

示例:

Ⅲ Spark SQL到底支持什麼SQL語句

試試看看spark\sql\catalyst\src\main\scala\org\apache\spark\sql\catalyst\SQLParser.scala
scala語言不是很容易懂,但是裡面有解析SQL的方法,可以看出支持的SQL語句,至少關鍵詞是很明確的。

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val ELSE = Keyword("ELSE")
protected val END = Keyword("END")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val TABLE = Keyword("TABLE")
protected val THEN = Keyword("THEN")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")

Ⅳ 可能是全網最詳細的 Spark Sql Aggregate 源碼剖析

縱觀 Spark Sql 源碼,聚合的實現是其中較為復雜的部分,本文希望能以例子結合流程圖的方式來說清楚整個過程。這里僅關注 Aggregate 在物理執行計劃相關的內容,之前的 parse、analyze 及 optimize 階段暫不做分析。在 Spark Sql 中,有一個專門的 Aggregation strategy 用來處理聚合,我們先來看看這個策略。

本文暫不討論 distinct Aggregate 的實現(有興趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我們來看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理執行計劃的

創建聚合分為兩個階段:

AggregateExpression 共有以下幾種 mode:

Q:是否支持使用 hash based agg 是如何判斷的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

為了說明最常用也是最復雜的的 hash based agg,本小節暫時將示例 sql 改為

這樣就能進入 HashAggregateExec 的分支

構造函數主要工作就是對 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 進行了初始化

在 enable code gen 的情況下,會調用 HashAggregateExec#inputRDDs 來生成 RDD,為了分析 HashAggregateExec 是如何生成 RDD 的,我們設置 spark.sql.codegen.wholeStage 為 false 來 disable code gen,這樣就會調用 HashAggregateExec#doExecute 來生成 RDD,如下:

可以看到,關鍵的部分就是根據 child.execute() 生成的 RDD 的每一個 partition 的迭代器轉化生成一個新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各個 partition。由於 TungstenAggregationIterator 涉及內容非常多,我們單開一大節來進行介紹。

此迭代器:

註:UnsafeKVExternalSorter 的實現可以參考:

UnsafeRow 是 InternalRow(表示一行記錄) 的 unsafe 實現,由原始內存(byte array)而不是 Java 對象支持,由三個區域組成:

使用 UnsafeRow 的收益:

構造函數的主要流程已在上圖中說明,需要注意的是:當內存不足時(畢竟每個 grouping 對應的 agg buffer 直接佔用內存,如果 grouping 非常多,或者 agg buffer 較大,容易出現內存用盡)會從 hash based aggregate 切換為 sort based aggregate(會 spill 數據到磁碟),後文會進行詳述。先來看看最關鍵的 processInputs 方法的實現

上圖中,需要注意的是:hashMap 中 get 一個 groupingKey 對應的 agg buffer 時,若已經存在該 buffer 則直接返回;若不存在,嘗試申請內存新建一個:

上圖中,用於真正處理一條 row 的 AggregationIterator#processRow 還需進一步展開分析。在此之前,我們先來看看 AggregateFunction 的分類

AggregateFunction 可以分為 DeclarativeAggregate 和 ImperativeAggregate 兩大類,具體的聚合函數均為這兩類的子類。

DeclarativeAggregate 是一類直接由 Catalyst 中的 Expressions 構成的聚合函數,主要邏輯通過調用 4 個表達式完成,分別是:

我們再次以容易理解的 Count 來舉例說明:

通常來講,實現一個基於 Expressions 的 DeclarativeAggregate 函數包含以下幾個重要的組成部分:

再來看看 AggregationIterator#processRow

AggregationIterator#processRow 會調用

生成用於處理一行數據(row)的函數

說白了 processRow 生成了函數才是直接用來接受一條 input row 來更新對應的 agg buffer,具體是根據 mode 及 aggExpression 中的 aggFunction 的類型調用其 updateExpressions 或 mergeExpressions 方法:

比如,對於 aggFunction 為 DeclarativeAggregate 類型的 Partial 下的 Count 來說就是調用其 updateExpressions 方法,即:

對於 Final 的 Count 來說就是調用其 mergeExpressions 方法,即:

對於 aggFunction 為 ImperativeAggregate 類型的 Partial 下的 Collect 來說就是調用其 update 方法,即:

對於 Final 的 Collect 來說就是調用其 merge 方法,即:

我們都知道,讀取一個迭代器的數據,是要不斷調用 hasNext 方法進行 check 是否還有數據,當該方法返回 true 的時候再調用 next 方法取得下一條數據。所以要知道如何讀取 TungstenAggregationIterator 的數據,就得分析其這兩個方法。

分為兩種情況,分別是:

Agg 的實現確實復雜,本文雖然篇幅已經很長,但還有很多方面沒有 cover 到,但基本最核心、最復雜的點都詳細介紹了,如果對於未 cover 的部分有興趣,請自行閱讀源碼進行分析~

Ⅳ spark sql怎麼劃分stage

其實sql就是關系操作。關系操作跟map,rece這些基礎運算元對應起來的(spark其實基礎運算元也是map,rece,只是在此基礎上做了擴展)。比如projection,filter是窄依賴,join,semi join,outer join是寬依賴。
具體流程會比較復雜。首先spark會解析這條sql,生成語法樹(spark2.0會通過antlr4解析),然後經過邏輯優化(dataframe中有logic plan),然後轉換為map rece,生成對應的操作運算元(projection,filter,join等)。有了寬依賴,窄依賴,也就能劃分stage了。

Ⅵ spark 怎麼通過寫sql語句一行一行讀數據

spark 怎麼通過寫sql語句一行一行讀數據
Spark SQL就是shark ,也就是SQL on Spark。如果沒記錯的話,shark的開發利用了hive的API,所以支持讀取HBase。而且Spark的數據類型兼容範圍大於Hadoop,並且包含了Hadoop所支持的任何數據類型。

Ⅶ spark sql 2.3 源碼解讀 - Execute (7)

終於到了最後一步執行了:

最關鍵的兩個函數便是 doPrepare和 doExecute了。

還是以上一章的sql語句為例,其最終生成的sparkplan為:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看沒有exchangeCoordinator的情況,

首先執行:

上面的方法會返回一個ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它決定了每一條InternalRow shuffle後的partition id:

接下來:

返回結果是ShuffledRowRDD:

CoalescedPartitioner的邏輯:

再看有exchangeCoordinator的情況:

同樣返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函數得到了 partitionStartIndices:

有exchangeCoordinator的情況就生成了partitionStartIndices,從而對分區進行了調整。

最後來一個例子:

未開啟exchangeCoordinator的plan:

開啟exchangeCoordinator的plan:

不同之處是 兩個Exchange都帶了coordinator,且都是同一個coordinator。

執行withExchangeCoordinator前:

執行withExchangeCoordinator後:

生成了coordinator,且執行了 doPrepare後,可以看到兩個exchange都向其注冊了。

doExecute後:

原先的numPartitions是200,經過執行後,生成的partitionStartIndices為[1],也就是只有1個partition,顯然在測試數據量很小的情況下,1個partition是更為合理的。這就是ExchangeCoordinator的功勞。

execute 最終的輸出是rdd,剩下的結果便是spark對rdd的運算了。其實 spark sql 最終的目標便也是生成rdd,交給spark core來運算。

spark sql的介紹到這里就結束了。

Ⅷ spark sql 怎樣處理日期類型

json File 日期類型 怎樣處理?怎樣從字元型,轉換為Date或DateTime類型?

json文件如下,有字元格式的日期類型

```
{ "name" : "Andy", "age" : 30, "time" :"2015-03-03T08:25:55.769Z"}
{ "name" : "Justin", "age" : 19, "time" : "2015-04-04T08:25:55.769Z" }
{ "name" : "pan", "age" : 49, "time" : "2015-05-05T08:25:55.769Z" }
{ "name" : "penny", "age" : 29, "time" : "2015-05-05T08:25:55.769Z" }
```
默認推測的Schema:
```
root
|-- _corrupt_record: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- time200: string (nullable = true)
```
測試代碼
```
val fileName = "person.json"
val sc = SparkUtils.getScLocal("json file 測試")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jsonFile = sqlContext.read.json(fileName)
jsonFile.printSchema()
```
##解決方案

### 方案一、json數據 時間為 long 秒或毫秒
### 方案二、自定義schema
```
val fileName = "person.json"
val sc = SparkUtils.getScLocal("json file 測試")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schema: StructType = StructType(mutable.ArraySeq(
StructField("name", StringType, true),
StructField("age", StringType, true),
StructField("time", TimestampType, true)));
val jsonFile = sqlContext.read.schema(schema).json(fileName)
jsonFile.printSchema()
jsonFile.registerTempTable("person")
val now: Timestamp = new Timestamp(System.currentTimeMillis())

val teenagers = sqlContext.sql("SELECT * FROM person WHERE age >= 20 AND age <= 30 AND time <=『" +now+"『")
teenagers.foreach(println)
val dataFrame = sqlContext.sql("SELECT * FROM person WHERE age >= 20 AND age <= 30 AND time <=『2015-03-03 16:25:55.769『")
dataFrame.foreach(println)
```
###方案三、sql建表
創建表sql
```
CREATE TEMPORARY TABLE person IF NOT EXISTS
[(age: long ,name:string ,time:Timestamp)]
USING org.apache.spark.sql.json
OPTIONS ( path 『person.json『)
語法
CREATE [TEMPORARY] TABLE [IF NOT EXISTS]
[(col-name data-type [, …])]
USING [OPTIONS ...]
[AS ]
```

### 方案四、用textfile convert

Ⅸ 如何使用 Spark SQL

一、啟動方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

註:/data/spark-1.4.0-bin-cdh4/為spark的安裝路徑

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看啟動選項

--master MASTER_URL 指定master url
--executor-memory MEM 每個executor的內存,默認為1G
--total-executor-cores NUM 所有executor的總核數
-e <quoted-query-string> 直接執行查詢SQL

-f <filename> 以文件方式批量執行SQL

二、Spark sql對hive支持的功能

1、查詢語句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作運算:
1) 關系運算:= ==, <>, <, >, >=, <=
2) 算術運算:+, -, *, /, %
3) 邏輯運算:AND, &&, OR, ||
4) 復雜的數據結構
5) 數學函數:(sign, ln, cos, etc)
6) 字元串函數:
3、 UDF
4、 UDAF

5、 用戶定義的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查詢: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分區表
12、 視圖
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的數據類型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客戶端編程方式進行查詢數據
1、啟動spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、編寫程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有數據:df.show()
查看錶結構:df.printSchema()
只看name列:df.select("name").show()
對數據運算:df.select(df("name"), df("age") + 1).show()
過濾數據:df.filter(df("age") > 21).show()

分組統計:df.groupBy("age").count().show()

1、查詢txt數據
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查詢結果數據
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet「)

四、Spark sql性能調優

緩存數據表:sqlContext.cacheTable("tableName")

取消緩存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue當設置為true時,Spark SQL將為基於數據統計信息的每列自動選擇一個壓縮演算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱狀緩存的批數據大小。更大的批數據可以提高內存的利用率以及壓縮效率,但有OOMs的風險

Ⅹ Spark SQL(十):Hive On Spark

Hive是目前大數據領域,事實上的SQL標准。其底層默認是基於MapRece實現的,但是由於MapRece速度實在比較慢,因此這幾年,陸續出來了新的SQL查詢引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL與Hive On Spark是不一樣的。Spark SQL是Spark自己研發出來的針對各種數據源,包括Hive、JSON、Parquet、JDBC、RDD等都可以執行查詢的,一套基於Spark計算引擎的查詢引擎。因此它是Spark的一個項目,只不過提供了針對Hive執行查詢的工功能而已,適合在一些使用Spark技術棧的大數據應用類系統中使用。

而Hive On Spark,是Hive的一個項目,它是將Spark作為底層的查詢引擎(不通過MapRece作為唯一的查詢引擎)。Hive On Spark,只適用於Hive,在可預見的未來,很有可能Hive默認的底層引擎就從MapRece切換為Spark了;適合於將原有的Hive數據倉庫以及數據統計分析替換為Spark引擎,作為全公司通用的大數據統計分析引擎。

Hive On Spark做了一些優化:
1、Map Join
Spark SQL默認對join是支持使用broadcast機制將小表廣播到各個節點上,以進行join的。但是問題是,這會給Driver和Worker帶來很大的內存開銷。因為廣播的數據要一直保留在Driver內存中。所以目前採取的是,類似乎MapRece的Distributed Cache機制,即提高HDFS replica factor的復制因子,以讓數據在每個計算節點上都有一個備份,從而可以在本地進行數據讀取。

2、Cache Table
對於某些需要對一張表執行多次操作的場景,Hive On Spark內部做了優化,即將要多次操作的表cache到內存中,以便於提升性能。但是這里要注意,並不是對所有的情況都會自動進行cache。所以說,Hive On Spark還有很多不完善的地方。

Hive QL語句 =>
語法分析 => AST =>
生成邏輯執行計劃 => Operator Tree =>
優化邏輯執行計劃 => Optimized Operator Tree =>
生成物理執行計劃 => Task Tree =>
優化物理執行計劃 => Optimized Task Tree =>
執行優化後的Optimized Task Tree