‘壹’ 详解 Spark 中的 Bucketing
Bucketing 就是利用 buckets(按列进行分桶)来决定数据分区(partition)的一种优化技术,它可以帮助在计算中避免数据交换(avoid data shuffle)。并行计算的时候shuffle常常会耗费非常多的时间和资源.
Bucketing 的基本原理比较好理解,它会根据你指定的列(可以是一个也可以是多个)计算哈希值,然后具有相同哈希值的数据将会被分到相同的分区。
Bucket的最终目的也是实现分区,但是和Partition的原理不同,当我们根据指定列进行Partition的时候,Spark会根据列的名字对数据进行分区(如果没有指定列名则会根据一个随机信息对数据进行分区)。Bucketing的最大不同在于它使用了指定列的哈希值,这样可以保证具有相同列值的数据被分到相同的分区。
目前在使用 bucketBy 的时候,必须和 sortBy,saveAsTable 一起使用,如下。这个操作其实是将数据保存到了文件中(如果不指定path,也会保存到一个临时目录中)。
数据分桶保存之后,我们才能使用它。
在一个SparkSession内,保存之后你可以通过如下命令通过表名获取其对应的DataFrame.
其中spark是一个SparkSession对象。获取之后就可以使用DataFrame或者在sql中使用表。
如果你要使用历史保存的数据,那么就不能用上述方法了,也不能像读取常规文件一样使用 spark.read.parquet() ,这种方式读进来的数据是不带bucket信息的。正确的方法是利用CREATE TABLE 语句,详情可用参考 https://docs.databricks.com/spark/latest/spark-sql/language-manual/create-table.html
示例如下:
在我们join两个表的时候,如果两个表最好按照相同的列划分成相同的buckets,就可以完全避免shuffle。根据前面所述的hash值计算方法,两个表具有相同列值的数据会存放在相同的机器上,这样在进行join操作时就不需要再去和其他机器通讯,直接在本地完成计算即可。假设你有左右两个表,各有两个分区,那么join的时候实际计算就是下图的样子,两个机器进行计算,并且计算后分区还是2.
而当需要shuffle的时候,会是这样的,
细心的你可能发现了,上面两个分区对应两个Executor,下面shuffle之后对应的怎么成了三个Executor了?没错,当数据进行shuffle之后,分区数就不再保持和输入的数据相同了,实际上也没有必要保持相同。
我们考虑的是大数据表的连接,本地测试的时候一般使用小的表,所以逆序需要将小表自动广播的配置关掉。如果开启小表广播,那么两个小表的join之后分区数是不会变的,例如:
关闭配置的命令如下:
正常情况下join之后分区数会发生变化:
这个200其实就是 "spark.sql.shuffle.partitions" 配置的值,默认就是200. 所以如果在Join过程中出现了shuffle,join之后的分区一定会变,并且变成spark.sql.shuffle.partitions的值。通常你需要根据自己的集群资源修改这个值,从而优化并行度,但是shuffle是不可避免的。
实际测试结果如下:
Spark依然会利用一些Bucekt的信息,但具体怎么执行目前还不太清楚,还是保持一致的好。
另外,如果你spark job的可用计算核心数小于Bucket值,那么从文件中读取之后Bucekt值会变,就是说bucket的数目不会超过你能使用的最大计算核数。
在处理null值的时候,我们可能会用到一些特殊的函数或者符号,如下表所示。但是在使用bucket的时候这里有个坑,一定要躲过。join的时候千万不要使用 <=> 符号,使用之后spark就会忽略bucket信息,继续shuffle数据,原因可能和hash计算有关。
原文连接
‘贰’ sparksql 内存不释放
1.1. Spark SQL的前世今生
Shark是一个为Spark设计的大规模数据仓库系统,它与Hive兼容。Shark建立在Hive的代码基础上,并通过将Hive的部分物理执行计划交换出来。这个方法使得Shark的用户可以加速Hive的查询,但是Shark继承了Hive的大且复杂的代码使得Shark很难优化和维护,同时Shark依赖于Spark的版本。随着我们遇到了性能优化的上限,以及集成SQL的一些复杂的分析功能,我们发现Hive的MapRece设计的框架限制了Shark的发展。在2014年7月1日的Spark Summit上,Databricks宣布终止对Shark的开发,将重点放到Spark SQL上。
1.2. 什么是Spark SQL
Spark SQL是Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。
相比于Spark RDD API,Spark SQL包含了对结构化数据和在其上运算的更多信息,Spark SQL使用这些信息进行了额外的优化,使对结构化数据的操作更加高效和方便。
有多种方式去使用Spark SQL,包括SQL、DataFrames API和Datasets API。但无论是哪种API或者是编程语言,它们都是基于同样的执行引擎,因此你可以在不同的API之间随意切换,它们各有各的特点,看你喜欢那种风格。
1.3. 为什么要学习Spark SQL
我们已经学习了Hive,它是将Hive SQL转换成MapRece然后提交到集群中去执行,大大简化了编写MapRece程序的复杂性,由于MapRece这种计算模型执行效率比较慢,所以Spark SQL应运而生,它是将Spark SQL转换成RDD,然后提交到集群中去运行,执行效率非常快!
1.易整合
‘叁’ Spark连接到MySQL并执行查询为什么速度会快
spark mysql 为什么快
一,SQL查询优化:指,使用的语句是不是冗余的,就是有没有无用的。 你可用用explain 你的语句来比较分板一番。比如:select * from wc where 1;与select * from wc二者的执行时间不一样的; 二,SQL执行计划就是用于描述SQL引擎在执行一个sql语
‘肆’ spark sql怎么划分stage
其实sql就是关系操作。关系操作跟map,rece这些基础算子对应起来的(spark其实基础算子也是map,rece,只是在此基础上做了扩展)。比如projection,filter是窄依赖,join,semi join,outer join是宽依赖。
具体流程会比较复杂。首先spark会解析这条sql,生成语法树(spark2.0会通过antlr4解析),然后经过逻辑优化(dataframe中有logic plan),然后转换为map rece,生成对应的操作算子(projection,filter,join等)。有了宽依赖,窄依赖,也就能划分stage了。
‘伍’ spark 跑spark sql时cpu利用率特别高怎么办
优化过程中常用到方法
查看查询的整个运行计划
scala>query.queryExecution
查看查询的Unresolved LogicalPlan
scala>query.queryExecution.logical
查看查询的Analyzed LogicalPlan
scala>query.queryExecution.analyzed
查看优化后的LogicalPlan
scala>query.queryExecution.optimizedPlan
查看物理计划
scala>query.queryExecution.sparkPlan
查看RDD的转换过程
scala>query.toDebugString
SparkSQL调优
Spark是一个快速的内存计算框架,同时是一个并行运算的框架,在计算性能调优的时候,除了要考虑广为人知的木桶原理外,还要考虑平行运算的Amdahl定理。
木桶原理又称短板理论,其核心思想是:一只木桶盛水的多少,并不取决于桶壁上最高的那块木块,而是取决于桶壁上最短的那块。将这个理论应用到系统性能优化上,系统的最终性能取决于系统中性能表现最差的组件。例如,即使系统拥有充足的内存资源和CPU资源,但是如果磁盘I/O性能低下,那么系统的总体性能是取决于当前最慢的磁盘I/O速度,而不是当前最优越的CPU或者内存。在这种情况下,如果需要进一步提升系统性能,优化内存或者CPU资源是毫无用处的。只有提高磁盘I/O性能才能对系统的整体性能进行优化。
SparkSQL作为Spark的一个组件,在调优的时候,也要充分考虑到上面的两个原理,既要考虑如何充分的利用硬件资源,又要考虑如何利用好分布式系统的并行计算。由于测试环境条件有限,本篇不能做出更详尽的实验数据来说明,只能在理论上加以说明。
2.1 并行性
SparkSQL在集群中运行,将一个查询任务分解成大量的Task分配给集群中的各个节点来运行。通常情况下,Task的数量是大于集群的并行度,shuffle的时候缺省的spark.sql.shuffle.partitionsw为200个partition,也就是200个Task:
而实验的集群环境却只能并行3个Task,也就是说同时只能有3个Task保持Running:
这时大家就应该明白了,要跑完这200个Task就要跑200/3=67批次。如何减少运行的批次呢?那就要尽量提高查询任务的并行度。查询任务的并行度由两方面决定:集群的处理能力和集群的有效处理能力。
l对于Spark Standalone集群来说,集群的处理能力是由conf/spark-env中的SPARK_WORKER_INSTANCES参数、SPARK_WORKER_CORES参数决定的;而SPARK_WORKER_INSTANCES*SPARK_WORKER_CORES不能超过物理机器的实际CPU core;
l集群的有效处理能力是指集群中空闲的集群资源,一般是指使用spark-submit或spark-shell时指定的--total-executor-cores,一般情况下,我们不需要指定,这时候,Spark Standalone集群会将所有空闲的core分配给查询,并且在Task轮询运行过程中,Standalone集群会将其他spark应用程序运行完后空闲出来的core也分配给正在运行中的查询。
综上所述,SparkSQL的查询并行度主要和集群的core数量相关,合理配置每个节点的core可以提高集群的并行度,提高查询的效率。
2.2 高效的数据格式
高效的数据格式,一方面是加快了数据的读入速度,另一方面可以减少内存的消耗。高效的数据格式包括多个方面:
2.2.1 数据本地性
分布式计算系统的精粹在于移动计算而非移动数据,但是在实际的计算过程中,总存在着移动数据的情况,除非是在集群的所有节点上都保存数据的副本。移动数据,将数据从一个节点移动到另一个节点进行计算,不但消耗了网络IO,也消耗了磁盘IO,降低了整个计算的效率。为了提高数据的本地性,除了优化算法(也就是修改spark内存,难度有点高),就是合理设置数据的副本。设置数据的副本,这需要通过配置参数并长期观察运行状态才能获取的一个经验值。
下面是Spark webUI监控Stage的一个图:
lPROCESS_LOCAL是指读取缓存在本地节点的数据
lNODE_LOCAL是指读取本地节点硬盘数据
lANY是指读取非本地节点数据
l通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关。
2.2.2 合适的数据类型
对于要查询的数据,定义合适的数据类型也是非常有必要。对于一个tinyint可以使用的数据列,不需要为了方便定义成int类型,一个tinyint的数据占用了1个byte,而int占用了4个byte。也就是说,一旦将这数据进行缓存的话,内存的消耗将增加数倍。在SparkSQL里,定义合适的数据类型可以节省有限的内存资源。
2.2.3 合适的数据列
对于要查询的数据,在写SQL语句的时候,尽量写出要查询的列名,如Select a,b from tbl,而不是使用Select * from tbl;这样不但可以减少磁盘IO,也减少缓存时消耗的内存。
2.2.4 优的数据存储格式
在查询的时候,最终还是要读取存储在文件系统中的文件。采用更优的数据存储格式,将有利于数据的读取速度。查看SparkSQL的Stage,可以发现,很多时候,数据读取消耗占有很大的比重。对于sqlContext来说,支持 textFiile、SequenceFile、ParquetFile、jsonFile;对于hiveContext来说,支持AvroFile、ORCFile、Parquet File,以及各种压缩。根据自己的业务需求,测试并选择合适的数据存储格式将有利于提高SparkSQL的查询效率。
2.3 内存的使用
spark应用程序最纠结的地方就是内存的使用了,也是最能体现“细节是魔鬼”的地方。Spark的内存配置项有不少,其中比较重要的几个是:
lSPARK_WORKER_MEMORY,在conf/spark-env.sh中配置SPARK_WORKER_MEMORY 和SPARK_WORKER_INSTANCES,可以充分的利用节点的内存资源,SPARK_WORKER_INSTANCES*SPARK_WORKER_MEMORY不要超过节点本身具备的内存容量;
lexecutor-memory,在spark-shell或spark-submit提交spark应用程序时申请使用的内存数量;不要超过节点的SPARK_WORKER_MEMORY;
lspark.storage.memoryFraction spark应用程序在所申请的内存资源中可用于cache的比例
lspark.shuffle.memoryFraction spark应用程序在所申请的内存资源中可用于shuffle的比例
在实际使用上,对于后两个参数,可以根据常用查询的内存消耗情况做适当的变更。另外,在SparkSQL使用上,有几点建议:
l对于频繁使用的表或查询才进行缓存,对于只使用一次的表不需要缓存;
l对于join操作,优先缓存较小的表;
l要多注意Stage的监控,多思考如何才能更多的Task使用PROCESS_LOCAL;
l要多注意Storage的监控,多思考如何才能Fraction cached的比例更多
2.4 合适的Task
对于SparkSQL,还有一个比较重要的参数,就是shuffle时候的Task数量,通过spark.sql.shuffle.partitions来调节。调节的基础是spark集群的处理能力和要处理的数据量,spark的默认值是200。Task过多,会产生很多的任务启动开销,Task多少,每个Task的处理时间过长,容易straggle。
2.5 其他的一些建议
优化的方面的内容很多,但大部分都是细节性的内容,下面就简单地提提:
l 想要获取更好的表达式查询速度,可以将spark.sql.codegen设置为Ture;
l 对于大数据集的计算结果,不要使用collect() ,collect()就结果返回给driver,很容易撑爆driver的内存;一般直接输出到分布式文件系统中;
l 对于Worker倾斜,设置spark.speculation=true 将持续不给力的节点去掉;
l 对于数据倾斜,采用加入部分中间步骤,如聚合后cache,具体情况具体分析;
l 适当的使用序化方案以及压缩方案;
l 善于利用集群监控系统,将集群的运行状况维持在一个合理的、平稳的状态;
l 善于解决重点矛盾,多观察Stage中的Task,查看最耗时的Task,查找原因并改善;
‘陆’ spark sql启动后执行越来越慢是为什么
Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖HiveMetastore和HiveSe
‘柒’ “每日一道大数据面试题系列”spark如何调优
如果面试时被问到spark任务如何调优,我们该如何回答呢?
下面我们从四大方面回答这个问题,保证吊打面试官。
一、spark性能调优
1、分配更多的资源
比如增加执行器个数(num_executor)、增加执行器个数(executor_cores)、增加执行器内存(executor_memory)
2、调节并行度
spark.default.parallelism
3、重构RDD架构以及RDD持久化
尽量去复用RDD,差不多的RDD可以抽取成一个共同的RDD,公共RDD一定要实现持久化
4、广播变量
SparkContext.broadcast方法创建一个对象,通过value方法访问
5、使用kryo序列化
SparkConf中设置属性:spark.serializer: org.apache.spark.serializer.kryoSerializer
6、使用fastutil优化数据格式(代替java中的Array、List、Set、Map)
7、调节数据本地化等待时长
调节参数: spark.locality.wait
二、JVM调优
降低cache操作的内存占比 1.6版本之前使用的是静态内存管理
spark中堆内存被划分为两块:
一块是专门来给RDD作cachepersist持久化的 StorageMemory,另一块是给spark算子函数运行使用的,存放函数中自己创建的对象。
1.6版本之后采用统一内存管理机制
storage和execution各占50%,若己方不足对方空余可占用对方空间
可尝试调节executor堆外内存
spark.yarn.executor.memoryOverhead = 2048m
调节连接等待时长
spark.core.connection.ack.wait.timeout = 300
三、shuffle数据倾斜调优
1、预聚合源数据,对hive源表提前进行聚合操作,在hive聚合之后,spark任务再去读取
2、检查倾斜的key是否是脏数据,可以提前过滤
3、提高shuffle操作rece的并行度
4、使用随机key实现双重聚合
5、将rece端 join转换成map端 join
6、sample采样倾斜key,单独进行join后在union
7、使用随机数以及扩容表进行join
四、算子调优
1、使用mapPartition提升map类操作的性能
2、filter过后使用coalesce减少分区数量
3、使用foreachPartition优化写数据性能
4、使用repartition解决sparkSql低并行度的性能问题
5、receByKey替换groupByKey实现map读预聚合
‘捌’ spark sql启动后执行越来越慢是为什么
Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题,第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支(至于为何相关修改没有合并到Hive主线,我也不太清楚)。
‘玖’ Spark SQL 到底怎么搭建起来
一般spark sql用于访问hive集群的表数据吧?
我们的spark是访问hive集群的,步骤还是很简单的,大致如下:
1)安装spark时需要将hive-site.xml,yarn-site.xml,hdfs-site.xml都拷贝到spark/conf中(yarn-site.xml是因为我们是spark on yarn)
2)编程时用HiveContext,调用sql(...)就好了,如:
val hc = new HiveContext(sc)
hc.sql( "select ..." ) 这里的sql语句自己发挥吧~
不过spark sql稳定性不高,写复杂语句时partition和优化策略不太合理,小数据量玩一下就好(如spark streaming中使用也还可以),大数据量暂时不建议用~
‘拾’ Spark SQL 和 Shark 在架构上有哪些区别将来会合并吗
Shark为了实现Hive兼容,在HQL方面重用了Hive中HQL的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从MR作业替换成了Spark作业(辅以内存列式存储等各种和Hive关系不大的优化);同时还依赖Hive Metastore和Hive SerDe(用于兼容现有的各种Hive存储格式)。这一策略导致了两个问题,第一是执行计划优化完全依赖于Hive,不方便添加新的优化策略;二是因为MR是进程级并行,写代码的时候不是很注意线程安全问题,导致Shark不得不使用另外一套独立维护的打了补丁的Hive源码分支。
Spark SQL解决了这两个问题。第一,Spark SQL在Hive兼容层面仅依赖HQL parser、Hive Metastore和Hive SerDe。也就是说,从HQL被解析成抽象语法树(AST)起,就全部由Spark SQL接管了。执行计划生成和优化都由Catalyst负责。借助Scala的模式匹配等函数式语言特性,利用Catalyst开发执行计划优化策略比Hive要简洁得多。去年Spark summit上Catalyst的作者Michael Armbrust对Catalyst做了一个简要介绍:2013 | Spark Summit(知乎竟然不能自定义链接的文字?)。第二,相对于Shark,由于进一步削减了对Hive的依赖,Spark SQL不再需要自行维护打了patch的Hive分支。Shark后续将全面采用Spark SQL作为引擎,不仅仅是查询优化方面。
此外,除了兼容HQL、加速现有Hive数据的查询分析以外,Spark SQL还支持直接对原生RDD对象进行关系查询。同时,除了HQL以外,Spark SQL还内建了一个精简的SQL parser,以及一套Scala DSL。也就是说,如果只是使用Spark SQL内建的SQL方言或Scala DSL对原生RDD对象进行关系查询,用户在开发Spark应用时完全不需要依赖Hive的任何东西。
能够对原生RDD对象进行关系查询,个人认为大大降低了用户门槛。一方面当然是因为熟悉SQL的人比熟悉Spark API的人多,另一方面是因为Spark SQL之下有Catalyst驱动的查询计划优化引擎。虽然在很多方面Spark的性能完爆Hadoop MapRece好几条街,但Spark的运行时模型也比MapRece复杂不少,使得Spark应用的性能调优比较tricky。虽然从代码量上来看,Spark应用往往是对等的MR应用的好几分之一,但裸用Spark API开发高效Spark应用还是需要花些心思的。这就体现出Spark SQL的优势了:即便用户写出的查询不那么高效,Catalyst也可以自动应用一系列常见优化策略。
。。