Ⅰ spark 跑spark sql时cpu利用率特别高怎么办
优化过程中常用到方法
查看查询的整个运行计划
scala>query.queryExecution
查看查询的Unresolved LogicalPlan
scala>query.queryExecution.logical
查看查询的Analyzed LogicalPlan
scala>query.queryExecution.analyzed
查看优化后的LogicalPlan
scala>query.queryExecution.optimizedPlan
查看物理计划
scala>query.queryExecution.sparkPlan
查看RDD的转换过程
scala>query.toDebugString
SparkSQL调优
Spark是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。
木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。
SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。
2.1 并行性
SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度,shuffle的时候缺省的spark.sql.shuffle.partitionsw为200个partition,也就是200个Task:
而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running:
这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。
l对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;
l集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的--total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。
综上所述,SparkSQL的查询并行度主要和集群的core数量相关,合理配置每个节点的core可以提高集群的并行度,提高查询的效率。
2.2 高效的数据格式
高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:
2.2.1 数据本地性
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
下面是Spark webUI监控Stage的一个图:
lPROCESS_LOCAL是指读取缓存在本地节点的数据
lNODE_LOCAL是指读取本地节点硬盘数据
lANY是指读取非本地节点数据
l通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。
2.2.2 合适的数据类型
对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。
2.2.3 合适的数据列
对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。
2.2.4 优的数据存储格式
在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看SparkSQL的Stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持 textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高SparkSQL的查询效率。
2.3 内存的使用
spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
lspark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例
在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:
l对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
l对于join操作,优先缓存较小的表;
l要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的监控,多思考如何才能Fraction cached的比例更多
2.4 合适的Task
对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。
2.5 其他的一些建议
优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提:
l 想要获取更好的表达式查询速度,可以将spark.sql.codegen设置为Ture;
l 对于大数据集的计算结果,不要使用collect() ,collect()就结果返回给driver,很容易撑爆driver的内存;一般直接输出到分布式文件系统中;
l 对于Worker倾斜,设置spark.speculation=true 将持续不给力的节点去掉;
l 对于数据倾斜,采用加入部分中间步骤,如聚合后cache,具体情况具体分析;
l 适当的使用序化方案以及压缩方案;
l 善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态;
l 善于解决重点矛盾,多观察Stage中的Task,查看最耗时的Task,查找原因并改善;
Ⅱ spark SQL和hive到底什么关系
Hive是一种基于HDFS的数据仓库,并且提供了基于SQL模型的,针对存储了大数据的数据仓库,进行分布式交互查询的查询引擎。
SparkSQL并不能完全替代Hive,它替代的是Hive的查询引擎,SparkSQL由于其底层基于Spark自身的基于内存的特点,因此速度是Hive查询引擎的数倍以上,Spark本身是不提供存储的,所以不可能替代Hive作为数据仓库的这个功能。
SparkSQL相较于Hive的另外一个优点,是支持大量不同的数据源,包括hive、json、parquet、jdbc等等。SparkSQL由于身处Spark技术堆栈内,基于RDD来工作,因此可以与Spark的其他组件无缝整合使用,配合起来实现许多复杂的功能。比如SparkSQL支持可以直接针对hdfs文件执行sql语句。
Ⅲ spark的sqlcontext怎么show databases
spark是一个分布式计算框架, 从他的作业调度可以看到,它的资源分配粒度很粗,CPU的核数进行分配的,集群的CPU资源是有限的
同时spark sql资源计算时需要把大量数据加载到内存中,需要消耗集群大量的内存资源,再做shuffle的时候,又需要消耗大量的网络IO和磁盘IO, 如果同时多个job执行,那么每个job获得资源要么少,要么需要排队。而不能像关系型数据库那么提供高并发的服务。
Ⅳ spark Provider org.apache.spark.sql.hive.orc.DefaultSource could not be instantiated
解决办法:在程序上
去掉程序中的setMaster("local")
这条语句并不是在集群中提交job
Fei joe
val conf = new SparkConf().setAppName("Map").setMaster("local")
Ⅳ spark sql支持哪些sql操作
支持Shark和sparkSQL 。
但是,随着Spark的发展,其中sparkSQL作为Spark生态的一员继续发展,而不再受限于hive,只是兼容hive;而hive on
spark是一个hive的发展计划,该计划将spark作为hive的底层引擎之一,也就是说,hive将不再受限于一个引擎,可以采用map-
rece、Tez、spark等引擎。
Ⅵ Spark SQL到底支持什么SQL语句
Spark SQL到底支持什么SQL语句
scala语言不是很容易懂,但是里面有解析SQL的方法,可以看出支持的SQL语句,至少关键词是很明确的。
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
protected val CASE = Keyword("CASE")
protected val CAST = Keyword("CAST")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
Ⅶ sparkSQL和spark有什么区别
Spark为结构化数据处理引入了一个称为Spark SQL的编程模块。简而言之,sparkSQL是Spark的前身,是在Hadoop发展过程中,为了给熟悉RDBMS但又不理解MapRece的技术人员提供快速上手的工具。
sparkSQL提供了一个称为DataFrame(数据框)的编程抽象,DF的底层仍然是RDD,并且可以充当分布式SQL查询引擎。
SparkSql有哪些特点呢?
1)引入了新的RDD类型SchemaRDD,可以像传统数据库定义表一样来定义SchemaRDD。
2)在应用程序中可以混合使用不同来源的数据,如可以将来自HiveQL的数据和来自SQL的数据进行Join操作。
3)内嵌了查询优化框架,在把SQL解析成逻辑执行计划之后,最后变成RDD的计算。
Ⅷ hive和sparksql的区别
历史上存在的原理,以前都是使用hive来构建数据仓库,所以存在大量对hive所管理的数据查询的需求。而hive、shark、sparlSQL都可以进行hive的数据查询。shark是使用了hive的sql语法解析器和优化器,修改了执行器,使之物理执行过程是跑在spark上;而sparkSQL是使用了自身的语法解析器、优化器和执行器,同时sparkSQL还扩展了接口,不单单支持hive数据的查询,可以进行多种数据源的数据查询。
Ⅸ Spark RDD,DataFrame和DataSet的区别
RDD、DataFrame和DataSet是容易产生混淆的概念,必须对其相互之间对比,才可以知道其中异同。
RDD和DataFrame
RDD-DataFrame
上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解
Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark
SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的
Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效
率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。
提升执行效率
RDD
API是函数式的,强调不变性,在大部分场景下倾向于创建新对象而不是修改老对象。这一特点虽然带来了干净整洁的API,却也使得Spark应用程序在运
行期倾向于创建大量临时对象,对GC造成压力。在现有RDD
API的基础之上,我们固然可以利用mapPartitions方法来重载RDD单个分片内的数据创建方式,用复用可变对象的方式来减小对象分配和GC的
开销,但这牺牲了代码的可读性,而且要求开发者对Spark运行时机制有一定的了解,门槛较高。另一方面,Spark
SQL在框架内部已经在各种可能的情况下尽量重用对象,这样做虽然在内部会打破了不变性,但在将数据返回给用户时,还会重新转为不可变数据。利用
DataFrame API进行开发,可以免费地享受到这些优化效果。
减少数据读取
分析大数据,最快的方法就是 ——忽略它。这里的“忽略”并不是熟视无睹,而是根据查询条件进行恰当的剪枝。
上文讨论分区表时提到的分区剪 枝便是其中一种——当查询的过滤条件中涉及到分区列时,我们可以根据查询条件剪掉肯定不包含目标数据的分区目录,从而减少IO。
对于一些“智能”数据格 式,Spark
SQL还可以根据数据文件中附带的统计信息来进行剪枝。简单来说,在这类数据格式中,数据是分段保存的,每段数据都带有最大值、最小值、null值数量等
一些基本的统计信息。当统计信息表名某一数据段肯定不包括符合查询条件的目标数据时,该数据段就可以直接跳过(例如某整数列a某段的最大值为100,而查
询条件要求a > 200)。
此外,Spark SQL也可以充分利用RCFile、ORC、Parquet等列式存储格式的优势,仅扫描查询真正涉及的列,忽略其余列的数据。
执行优化
人口数据分析示例
为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。如
果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter
下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark
SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。
得到的优化执行计划在转换成物 理执行计划的过程中,还可以根据具体的数据源的特性将过滤条件下推至数据源内。最右侧的物理执行计划中Filter之所以消失不见,就是因为溶入了用于执行最终的读取操作的表扫描节点内。
对于普通开发者而言,查询优化 器的意义在于,即便是经验并不丰富的程序员写出的次优的查询,也可以被尽量转换为高效的形式予以执行。
RDD和DataSet
DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。
DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为SparkSQl类型,然而RDD依赖于运行时反射机制。
通过上面两点,DataSet的性能比RDD的要好很多。
DataFrame和DataSet
Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:
DataSet可以在编译时检查类型
并且是面向对象的编程接口。用wordcount举例:
//DataFrame
// Load a text file and interpret each line as a java.lang.String
val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
val result = ds
.flatMap(_.split(" ")) // Split on whitespace
.filter(_ != "") // Filter empty words
.toDF() // Convert to DataFrame to perform aggregation / sorting
.groupBy($"value") // Count number of occurences of each word
.agg(count("*") as "numOccurances")
.orderBy($"numOccurances" desc) // Show most common words first
后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。
//DataSet,完全使用scala编程,不要切换到DataFrame
val wordCount =
ds.flatMap(_.split(" "))
.filter(_ != "")
.groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
.count()
DataFrame和DataSet可以相互转化, df.as[ElementType] 这样可以把DataFrame转化为DataSet, ds.toDF() 这样可以把DataSet转化为DataFrame。