当前位置:首页 » 编程语言 » sparksql语法手册
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

sparksql语法手册

发布时间: 2022-11-20 04:45:48

Ⅰ Spark sql到底支持什么SQL语句

作者:张宽
链接:https://www.hu.com/question/34569764/answer/59217173
来源:知乎
着作权归作者所有,转载请联系作者获得授权。

scala语言不是很容易懂,但是里面有解析SQL的方法,可以看出支持的SQL语句,至少关键词是很明确的。

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val ELSE = Keyword("ELSE")
protected val END = Keyword("END")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val TABLE = Keyword("TABLE")
protected val THEN = Keyword("THEN")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")

Ⅱ SQL 语法速成手册(干货满满,建议收藏!)



SQL 语法结构包括:

例如: SELECT 与 select 、 Select 是相同的。

数据定义语言(Data Definition Language,DDL)是 SQL 语言集中负责数据结构定义与数据库对象定义的语言。

DDL 的主要功能是 定义数据库对象

DDL 的核心指令是 CREATE 、 ALTER 、 DROP 。

数据操纵语言(Data Manipulation Language, DML)是用于数据库操作,对数据库其中的对象和数据运行访问工作的编程语句。

DML 的主要功能是 访问数据 ,因此其语法都是以 读写数据库 为主。

DML 的核心指令是 INSERT 、 UPDATE 、 DELETE 、 SELECT 。这四个指令合称 CRUD(Create, Read, Update, Delete),即增删改查。

事务控制语言 (Transaction Control Language, TCL) 用于 管理数据库中的事务 。这些用于管理由 DML 语句所做的更改。它还允许将语句分组为逻辑事务。

TCL 的核心指令是 COMMIT 、 ROLLBACK 。

数据控制语言 (Data Control Language, DCL) 是一种可对数据访问权进行控制的指令,它可以控制特定用户账户对数据表、查看表、预存程序、用户自定义函数等数据库对象的控制权。

DCL 的核心指令是 GRANT 、 REVOKE 。

DCL 以 控制用户的访问权限 为主,因此其指令作法并不复杂,可利用 DCL 控制的权限有: CONNECT 、 SELECT 、 INSERT 、 UPDATE 、 DELETE 、 EXECUTE 、 USAGE 、 REFERENCES 。

根据不同的 DBMS 以及不同的安全性实体,其支持的权限控制也有所不同。

(以下为 DML 语句用法)


插入完整的行

插入行的一部分

插入查询出来的数据



删除表中的指定数据

清空表中的数据


查询单列

查询多列

查询所有列

查询不同的值

限制查询结果


子查询的子查询



SELECT 语句中的 WHERE 子句

UPDATE 语句中的 WHERE 子句

DELETE 语句中的 WHERE 子句

IN 示例

BETWEEN 示例

AND 示例

OR 示例

NOT 示例

% 示例

_ 示例


组合查询



其中, SOUNDEX() 可以将一个字符串转换为描述其语音表示的字母数字模式。








AVG() 会忽略 NULL 行。

使用 DISTINCT 可以让汇总函数值汇总不同的值。

指定多个列的排序方向

分组

分组后排序

使用 WHERE 和 HAVING 过滤数据

(以下为 DDL 语句用法)

普通创建

根据已有的表创建新表

添加列

删除列

修改列

添加主键

删除主键



创建表时使用约束条件:

(以下为 TCL 语句用法)


(以下为 DCL 语句用法)




可以使用触发器来进行审计跟踪,把修改记录到另外一张表中。

MySQL 不允许在触发器中使用 CALL 语句 ,也就是不能调用存储过程。

BEGIN 和 END

当触发器的触发条件满足时,将会执行 BEGIN 和 END 之间的触发器执行动作。

NEW 和 OLD

CREATE TRIGGER 指令用于创建触发器。

语法:

说明:

示例:

Ⅲ Spark SQL到底支持什么SQL语句

试试看看spark\sql\catalyst\src\main\scala\org\apache\spark\sql\catalyst\SQLParser.scala
scala语言不是很容易懂,但是里面有解析SQL的方法,可以看出支持的SQL语句,至少关键词是很明确的。

protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
protected val ELSE = Keyword("ELSE")
protected val END = Keyword("END")
protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
protected val HAVING = Keyword("HAVING")
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
protected val LEFT = Keyword("LEFT")
protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
protected val ORDER = Keyword("ORDER")
protected val SORT = Keyword("SORT")
protected val OUTER = Keyword("OUTER")
protected val OVERWRITE = Keyword("OVERWRITE")
protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
protected val TABLE = Keyword("TABLE")
protected val THEN = Keyword("THEN")
protected val TRUE = Keyword("TRUE")
protected val UNION = Keyword("UNION")
protected val WHEN = Keyword("WHEN")
protected val WHERE = Keyword("WHERE")
protected val WITH = Keyword("WITH")

Ⅳ 可能是全网最详细的 Spark Sql Aggregate 源码剖析

纵观 Spark Sql 源码,聚合的实现是其中较为复杂的部分,本文希望能以例子结合流程图的方式来说清楚整个过程。这里仅关注 Aggregate 在物理执行计划相关的内容,之前的 parse、analyze 及 optimize 阶段暂不做分析。在 Spark Sql 中,有一个专门的 Aggregation strategy 用来处理聚合,我们先来看看这个策略。

本文暂不讨论 distinct Aggregate 的实现(有兴趣的可以看看另一篇博文 https://www.jianshu.com/p/77e0a70db8cd ),我们来看看 AggUtils#planAggregateWithoutDistinct 是如何生成聚合的物理执行计划的

创建聚合分为两个阶段:

AggregateExpression 共有以下几种 mode:

Q:是否支持使用 hash based agg 是如何判断的?

摘自我另一篇文章: https://www.jianshu.com/p/77e0a70db8cd

为了说明最常用也是最复杂的的 hash based agg,本小节暂时将示例 sql 改为

这样就能进入 HashAggregateExec 的分支

构造函数主要工作就是对 groupingExpressions、aggregateExpressions、aggregateAttributes、resultExpressions 进行了初始化

在 enable code gen 的情况下,会调用 HashAggregateExec#inputRDDs 来生成 RDD,为了分析 HashAggregateExec 是如何生成 RDD 的,我们设置 spark.sql.codegen.wholeStage 为 false 来 disable code gen,这样就会调用 HashAggregateExec#doExecute 来生成 RDD,如下:

可以看到,关键的部分就是根据 child.execute() 生成的 RDD 的每一个 partition 的迭代器转化生成一个新的 TungstenAggregationIterator ,即 HashAggregateExec 生成的 RDD 的各个 partition。由于 TungstenAggregationIterator 涉及内容非常多,我们单开一大节来进行介绍。

此迭代器:

注:UnsafeKVExternalSorter 的实现可以参考:

UnsafeRow 是 InternalRow(表示一行记录) 的 unsafe 实现,由原始内存(byte array)而不是 Java 对象支持,由三个区域组成:

使用 UnsafeRow 的收益:

构造函数的主要流程已在上图中说明,需要注意的是:当内存不足时(毕竟每个 grouping 对应的 agg buffer 直接占用内存,如果 grouping 非常多,或者 agg buffer 较大,容易出现内存用尽)会从 hash based aggregate 切换为 sort based aggregate(会 spill 数据到磁盘),后文会进行详述。先来看看最关键的 processInputs 方法的实现

上图中,需要注意的是:hashMap 中 get 一个 groupingKey 对应的 agg buffer 时,若已经存在该 buffer 则直接返回;若不存在,尝试申请内存新建一个:

上图中,用于真正处理一条 row 的 AggregationIterator#processRow 还需进一步展开分析。在此之前,我们先来看看 AggregateFunction 的分类

AggregateFunction 可以分为 DeclarativeAggregate 和 ImperativeAggregate 两大类,具体的聚合函数均为这两类的子类。

DeclarativeAggregate 是一类直接由 Catalyst 中的 Expressions 构成的聚合函数,主要逻辑通过调用 4 个表达式完成,分别是:

我们再次以容易理解的 Count 来举例说明:

通常来讲,实现一个基于 Expressions 的 DeclarativeAggregate 函数包含以下几个重要的组成部分:

再来看看 AggregationIterator#processRow

AggregationIterator#processRow 会调用

生成用于处理一行数据(row)的函数

说白了 processRow 生成了函数才是直接用来接受一条 input row 来更新对应的 agg buffer,具体是根据 mode 及 aggExpression 中的 aggFunction 的类型调用其 updateExpressions 或 mergeExpressions 方法:

比如,对于 aggFunction 为 DeclarativeAggregate 类型的 Partial 下的 Count 来说就是调用其 updateExpressions 方法,即:

对于 Final 的 Count 来说就是调用其 mergeExpressions 方法,即:

对于 aggFunction 为 ImperativeAggregate 类型的 Partial 下的 Collect 来说就是调用其 update 方法,即:

对于 Final 的 Collect 来说就是调用其 merge 方法,即:

我们都知道,读取一个迭代器的数据,是要不断调用 hasNext 方法进行 check 是否还有数据,当该方法返回 true 的时候再调用 next 方法取得下一条数据。所以要知道如何读取 TungstenAggregationIterator 的数据,就得分析其这两个方法。

分为两种情况,分别是:

Agg 的实现确实复杂,本文虽然篇幅已经很长,但还有很多方面没有 cover 到,但基本最核心、最复杂的点都详细介绍了,如果对于未 cover 的部分有兴趣,请自行阅读源码进行分析~

Ⅳ spark sql怎么划分stage

其实sql就是关系操作。关系操作跟map,rece这些基础算子对应起来的(spark其实基础算子也是map,rece,只是在此基础上做了扩展)。比如projection,filter是窄依赖,join,semi join,outer join是宽依赖。
具体流程会比较复杂。首先spark会解析这条sql,生成语法树(spark2.0会通过antlr4解析),然后经过逻辑优化(dataframe中有logic plan),然后转换为map rece,生成对应的操作算子(projection,filter,join等)。有了宽依赖,窄依赖,也就能划分stage了。

Ⅵ spark 怎么通过写sql语句一行一行读数据

spark 怎么通过写sql语句一行一行读数据
Spark SQL就是shark ,也就是SQL on Spark。如果没记错的话,shark的开发利用了hive的API,所以支持读取HBase。而且Spark的数据类型兼容范围大于Hadoop,并且包含了Hadoop所支持的任何数据类型。

Ⅶ spark sql 2.3 源码解读 - Execute (7)

终于到了最后一步执行了:

最关键的两个函数便是 doPrepare和 doExecute了。

还是以上一章的sql语句为例,其最终生成的sparkplan为:

看一下SortExec的doPrepare 和 doExecute方法:

下面看child也就是ShuffleExchangeExec:

先看没有exchangeCoordinator的情况,

首先执行:

上面的方法会返回一个ShuffleDependency,ShuffleDependency中最重要的是rddWithPartitionIds,它决定了每一条InternalRow shuffle后的partition id:

接下来:

返回结果是ShuffledRowRDD:

CoalescedPartitioner的逻辑:

再看有exchangeCoordinator的情况:

同样返回的是ShuffledRowRDD:

再看doEstimationIfNecessary:

estimatePartitionStartIndices 函数得到了 partitionStartIndices:

有exchangeCoordinator的情况就生成了partitionStartIndices,从而对分区进行了调整。

最后来一个例子:

未开启exchangeCoordinator的plan:

开启exchangeCoordinator的plan:

不同之处是 两个Exchange都带了coordinator,且都是同一个coordinator。

执行withExchangeCoordinator前:

执行withExchangeCoordinator后:

生成了coordinator,且执行了 doPrepare后,可以看到两个exchange都向其注册了。

doExecute后:

原先的numPartitions是200,经过执行后,生成的partitionStartIndices为[1],也就是只有1个partition,显然在测试数据量很小的情况下,1个partition是更为合理的。这就是ExchangeCoordinator的功劳。

execute 最终的输出是rdd,剩下的结果便是spark对rdd的运算了。其实 spark sql 最终的目标便也是生成rdd,交给spark core来运算。

spark sql的介绍到这里就结束了。

Ⅷ spark sql 怎样处理日期类型

json File 日期类型 怎样处理?怎样从字符型,转换为Date或DateTime类型?

json文件如下,有字符格式的日期类型

```
{ "name" : "Andy", "age" : 30, "time" :"2015-03-03T08:25:55.769Z"}
{ "name" : "Justin", "age" : 19, "time" : "2015-04-04T08:25:55.769Z" }
{ "name" : "pan", "age" : 49, "time" : "2015-05-05T08:25:55.769Z" }
{ "name" : "penny", "age" : 29, "time" : "2015-05-05T08:25:55.769Z" }
```
默认推测的Schema:
```
root
|-- _corrupt_record: string (nullable = true)
|-- age: long (nullable = true)
|-- name: string (nullable = true)
|-- time200: string (nullable = true)
```
测试代码
```
val fileName = "person.json"
val sc = SparkUtils.getScLocal("json file 测试")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val jsonFile = sqlContext.read.json(fileName)
jsonFile.printSchema()
```
##解决方案

### 方案一、json数据 时间为 long 秒或毫秒
### 方案二、自定义schema
```
val fileName = "person.json"
val sc = SparkUtils.getScLocal("json file 测试")
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val schema: StructType = StructType(mutable.ArraySeq(
StructField("name", StringType, true),
StructField("age", StringType, true),
StructField("time", TimestampType, true)));
val jsonFile = sqlContext.read.schema(schema).json(fileName)
jsonFile.printSchema()
jsonFile.registerTempTable("person")
val now: Timestamp = new Timestamp(System.currentTimeMillis())

val teenagers = sqlContext.sql("SELECT * FROM person WHERE age >= 20 AND age <= 30 AND time <=‘" +now+"‘")
teenagers.foreach(println)
val dataFrame = sqlContext.sql("SELECT * FROM person WHERE age >= 20 AND age <= 30 AND time <=‘2015-03-03 16:25:55.769‘")
dataFrame.foreach(println)
```
###方案三、sql建表
创建表sql
```
CREATE TEMPORARY TABLE person IF NOT EXISTS
[(age: long ,name:string ,time:Timestamp)]
USING org.apache.spark.sql.json
OPTIONS ( path ‘person.json‘)
语法
CREATE [TEMPORARY] TABLE [IF NOT EXISTS]
[(col-name data-type [, …])]
USING [OPTIONS ...]
[AS ]
```

### 方案四、用textfile convert

Ⅸ 如何使用 Spark SQL

一、启动方法
/data/spark-1.4.0-bin-cdh4/bin/spark-sql --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2

注:/data/spark-1.4.0-bin-cdh4/为spark的安装路径

/data/spark-1.4.0-bin-cdh4/bin/spark-sql –help 查看启动选项

--master MASTER_URL 指定master url
--executor-memory MEM 每个executor的内存,默认为1G
--total-executor-cores NUM 所有executor的总核数
-e <quoted-query-string> 直接执行查询SQL

-f <filename> 以文件方式批量执行SQL

二、Spark sql对hive支持的功能

1、查询语句:SELECT GROUP BY ORDER BY CLUSTER BY SORT BY
2、hive操作运算:
1) 关系运算:= ==, <>, <, >, >=, <=
2) 算术运算:+, -, *, /, %
3) 逻辑运算:AND, &&, OR, ||
4) 复杂的数据结构
5) 数学函数:(sign, ln, cos, etc)
6) 字符串函数:
3、 UDF
4、 UDAF

5、 用户定义的序列化格式
6、join操作:JOIN {LEFT|RIGHT|FULL} OUTER JOIN LEFT SEMI JOIN CROSS JOIN
7、 unions操作:
8、 子查询: SELECT col FROM ( SELECT a + b AS col from t1) t2
9、Sampling
10、 Explain
11、 分区表
12、 视图
13、 hive ddl功能:CREATE TABLE、CREATE TABLE AS SELECT、ALTER TABLE

14、 支持的数据类型:TINYINT SMALLINT INT BIGINT BOOLEAN FLOAT DOUBLE STRING BINARY TIMESTAMPDATE ARRAY MAP STRUCT

三、Spark sql 在客户端编程方式进行查询数据
1、启动spark-shell
./spark-shell --master spark://master:7077 --total-executor-cores 10 --executor-memory 1g --executor-cores 2
2、编写程序
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val df = sqlContext.read.json("../examples/src/main/resources/people.json")
查看所有数据:df.show()
查看表结构:df.printSchema()
只看name列:df.select("name").show()
对数据运算:df.select(df("name"), df("age") + 1).show()
过滤数据:df.filter(df("age") > 21).show()

分组统计:df.groupBy("age").count().show()

1、查询txt数据
import sqlContext.implicits._
case class Person(name: String, age: Int)
val people = sc.textFile("../examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
people.registerTempTable("people")
val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
2、parquet文件
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")
3、hdfs文件

val df = sqlContext.read.load("hdfs://namenode.Hadoop:9000/user/hive/warehouse/spark_test.db/test_parquet/part-r-00001.gz.parquet")
4、保存查询结果数据
val df = sqlContext.read.load("../examples/src/main/resources/users.parquet")

df.select("name", "favorite_color").write.save("namesAndFavColors.parquet“)

四、Spark sql性能调优

缓存数据表:sqlContext.cacheTable("tableName")

取消缓存表:sqlContext.uncacheTable("tableName")

spark.sql.inMemoryColumnarStorage.compressedtrue当设置为true时,Spark SQL将为基于数据统计信息的每列自动选择一个压缩算法。
spark.sql.inMemoryColumnarStorage.batchSize10000柱状缓存的批数据大小。更大的批数据可以提高内存的利用率以及压缩效率,但有OOMs的风险

Ⅹ Spark SQL(十):Hive On Spark

Hive是目前大数据领域,事实上的SQL标准。其底层默认是基于MapRece实现的,但是由于MapRece速度实在比较慢,因此这几年,陆续出来了新的SQL查询引擎,包括Spark SQL,Hive On Tez,Hive On Spark等。

Spark SQL与Hive On Spark是不一样的。Spark SQL是Spark自己研发出来的针对各种数据源,包括Hive、JSON、Parquet、JDBC、RDD等都可以执行查询的,一套基于Spark计算引擎的查询引擎。因此它是Spark的一个项目,只不过提供了针对Hive执行查询的工功能而已,适合在一些使用Spark技术栈的大数据应用类系统中使用。

而Hive On Spark,是Hive的一个项目,它是将Spark作为底层的查询引擎(不通过MapRece作为唯一的查询引擎)。Hive On Spark,只适用于Hive,在可预见的未来,很有可能Hive默认的底层引擎就从MapRece切换为Spark了;适合于将原有的Hive数据仓库以及数据统计分析替换为Spark引擎,作为全公司通用的大数据统计分析引擎。

Hive On Spark做了一些优化:
1、Map Join
Spark SQL默认对join是支持使用broadcast机制将小表广播到各个节点上,以进行join的。但是问题是,这会给Driver和Worker带来很大的内存开销。因为广播的数据要一直保留在Driver内存中。所以目前采取的是,类似乎MapRece的Distributed Cache机制,即提高HDFS replica factor的复制因子,以让数据在每个计算节点上都有一个备份,从而可以在本地进行数据读取。

2、Cache Table
对于某些需要对一张表执行多次操作的场景,Hive On Spark内部做了优化,即将要多次操作的表cache到内存中,以便于提升性能。但是这里要注意,并不是对所有的情况都会自动进行cache。所以说,Hive On Spark还有很多不完善的地方。

Hive QL语句 =>
语法分析 => AST =>
生成逻辑执行计划 => Operator Tree =>
优化逻辑执行计划 => Optimized Operator Tree =>
生成物理执行计划 => Task Tree =>
优化物理执行计划 => Optimized Task Tree =>
执行优化后的Optimized Task Tree