① Spark 处理小文件
不论是Hive还是Spark sql在使用过程中都可能会遇到小文件过多的问题。小文件过多最直接的表现是任务执行时间长,查看Spark log会发现大量的数据移动的日志。我们可以查看log中展现的日志信息,去对应的路径下查看文件的大小和个数。
通过上述命令可以查看文件的个数以及大小。count查看出的文件大小单位是B,需要转换为MB。
在spark官方的推荐文档中,parquet格式的文件推荐大小是128MB,小于该大小的均可以称之为小文件,在实际的工作,往往小文件的大小仅仅为几KB,表现为,可能文件大小为几百MB,但是文件个数可能到达了几十万个。一般来说,我们可以通过简单相除获得文件的平均大小,如果文件数目不多,我们也可以通过下述命令获得每个文件的大小。
1.任务执行时间长
2.真实的文件大小独占一个数据存储块,存放到DataNode节点中。同时 DataNode一般默认存三份副本,以保障数据安全。同时该文件所存放的位置也写入到NameNode的内存中,如果有Secondary NameNode高可用节点,也可同时复制一份过去。NameNode的内存数据将会存放到硬盘中,如果HDFS发生重启,将产生较长时间的元数据从硬盘读到内存的过程。
3.不论在Hive还是在Spark中,每一个存储块都对应一个Map程序,一个Map呈现就需要一个JVM,启动一个JVM去读取或者写小文件是吃力不讨好的行为。在实际的生产中,为了更好的管理集群资源,一般会要求程序执行时限制Executor数量和每个Executor的核心数量,需要频繁创建Executor来读取写入。
5.影响磁盘寻址时间
小文件合并,本质上就是通过某种操作,将一系列小文件合并成大文件。我们知道,以MapRece为代表的大数据系统,都习惯用K-V键值对的形式来处理文件,最后文件落盘,也是一个rece对应一个输出文件。所以直观上,我们可以减少rece数量,达到减少文件数量的目的。
从Map到Rece需要一个Shuffle过程,所以我们将小文件合并理解为通过一个Shuffle,合并小文件成一个大文件。基于这样的思想,我们的策略可以分为两类:一类是原来的计算已经有Shuffle了,那么我们可以认为控制输出文件的数量;二类是强制触发Shuffle,进行小文件合并。
1-设置参数 (一般用于Hive)
2-distribute by rand()
往动态分区插入数据时,在已经写好的SQL末尾加上distribute by rand()
该算子只是起到打散的效果,但是我们还要设置文件的大小,以免打散后仍然有小文件。
表示每个rece的大小,Hive可以数据总量,得到rece个数,假设hive认为会有10个rece,那么,这里rand()则会为 x % 10
3-group by
我们知道,group by算子会触发Shuffle,因此只要我们设置好Shuffle时的文件个数就好,在Spark SQL中,我们可以设置partition个数,因为一个partition会对应一个文件。
上述的操作,会触发shuffle,因此我们再设置partition个数。
则表示,shuffle后,只会产生10个partition.
4-repartition()
5-coalesce()
需要注意的是,4和5都是spark 2.4以及以后才会支持的。
② Hive优化之Hive的配置参数优化
Hive是大数据领域常用的组件之一,主要用于大数据离线数仓的运算,关于Hive的性能调优在日常工作和面试中是经常涉及的一个点,因此掌握一些Hive调优是必不可少的一项技能。影响Hive效率的主要因素有数据倾斜、数据冗余、job的IO以及不同底层引擎配置情况和Hive本身参数和HiveSQL的执行等。本文主要从建表配置参数方面对Hive优化进行讲解。
1. 创建一个普通表
table test_user1(id int, name string,code string,code_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';
2. 查看这张表的信息
DESCRIBE FORMATTED test_user1;
我们从该表的描述信息介绍建表时的一些可优化点。
2.1 表的文件数
numFiles表示表中含有的文件数,当文件数过多时可能意味着该表的小文件过多,这时候我们可以针对小文件的问题进行一些优化,HDFS本身提供了解决方案:
(1)Hadoop Archive/HAR:将小文件打包成大文件。
(2)SEQUENCEFILE格式:将大量小文件压缩成一个SEQUENCEFILE文件。
(3)CombineFileInputFormat:在map和rece处理之前组合小文件。
(4)HDFS Federation:HDFS联盟,使用多个namenode节点管理文件。
除此之外,我们还可以通过设置hive的参数来合并小文件。
(1)输入阶段合并
需要更改Hive的输入文件格式,即参数hive.input.format,默认值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我们改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。这样比起上面对mapper数的调整,会多出两个参数,分别是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含义是单节点和单机架上的最小split大小。如果发现有split大小小于这两个值(默认都是100MB),则会进行合并。具体逻辑可以参看Hive源码中的对应类。
(2)输出阶段合并
直接将hive.merge.mapfiles和hive.merge.mapredfiles都设为true即可,前者表示将map-only任务的输出合并,后者表示将map-rece任务的输出合并,Hive会额外启动一个mr作业将输出的小文件合并成大文件。另外,hive.merge.size.per.task可以指定每个task输出后合并文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有输出文件大小的均值阈值,默认值都是1GB。如果平均大小不足的话,就会另外启动一个任务来进行合并。
2.2 表的存储格式
通过InputFormat和OutputFormat可以看出表的存储格式是TEXT类型,Hive支持TEXTFILE, SEQUENCEFILE, AVRO, RCFILE, ORC,以及PARQUET文件格式,可以通过两种方式指定表的文件格式:
(1)CREATE TABLE ... STORE AS <file_format>:在建表时指定文件格式,默认是TEXTFILE
(2)ALTER TABLE ... [PARTITION partition_spec] SET FILEFORMAT <file_format>:修改具体表的文件格式
如果要改变创建表的默认文件格式,可以使用set
hive.default.fileformat=<file_format>进行配置,适用于所有表。同时也可以使用set
hive.default.fileformat.managed = <file_format>进行配置,仅适用于内部表或外部表。
扩展:不同存储方式的情况
TEXT,
SEQUENCE和
AVRO文件是面向行的文件存储格式,不是最佳的文件格式,因为即便只查询一列数据,使用这些存储格式的表也需要读取完整的一行数据。另一方面,面向列的存储格式(RCFILE,
ORC, PARQUET)可以很好地解决上面的问题。关于每种文件格式的说明,如下:
(1)TEXTFILE
创建表时的默认文件格式,数据被存储成文本格式。文本文件可以被分割和并行处理,也可以使用压缩,比如GZip、LZO或者Snappy。然而大部分的压缩文件不支持分割和并行处理,会造成一个作业只有一个mapper去处理数据,使用压缩的文本文件要确保文件不要过大,一般接近两个HDFS块的大小。
(2)SEQUENCEFILE
key/value对的二进制存储格式,sequence文件的优势是比文本格式更好压缩,sequence文件可以被压缩成块级别的记录,块级别的压缩是一个很好的压缩比例。如果使用块压缩,需要使用下面的配置:set
hive.exec.compress.output=true; set io.seqfile.compression.type=BLOCK
(3)AVRO
二进制格式文件,除此之外,avro也是一个序列化和反序列化的框架。avro提供了具体的数据schema。
(4)RCFILE
全称是Record Columnar File,首先将表分为几个行组,对每个行组内的数据进行按列存储,每一列的数据都是分开存储,即先水平划分,再垂直划分。
(5)ORC
全称是Optimized Row Columnar,从hive0.11版本开始支持,ORC格式是RCFILE格式的一种优化的格式,提供了更大的默认块(256M)
(6)PARQUET
另外一种列式存储的文件格式,与ORC非常类似,与ORC相比,Parquet格式支持的生态更广,比如低版本的impala不支持ORC格式。
配置同样数据同样字段的两张表,以常见的TEXT行存储和ORC列存储两种存储方式为例,对比执行速度。
TEXT存储方式
总结: 从上图中可以看出列存储在对指定列进行查询时,速度更快, 建议在建表时设置列存储的存储方式 。
2.3 表的压缩
对Hive表进行压缩是常见的优化手段,一些存储方式自带压缩选择,比如SEQUENCEFILE支持三种压缩选择:NONE,RECORD,BLOCK。Record压缩率低,一般建议使用BLOCK压缩;
ORC支持三种压缩选择:NONE,ZLIB,SNAPPY。我们以TEXT存储方式和ORC存储方式为例,查看表的压缩情况。
配置同样数据同样字段的四张表,一张TEXT存储方式,另外三张分别是默认压缩方式的ORC存储、SNAPPY压缩方式的ORC存储和NONE压缩方式的ORC存储,查看在hdfs上的存储情况:
TEXT存储方式
默认压缩ORC存储方式
SNAPPY压缩的ORC存储方式
NONE压缩的ORC存储方式
总结 :可以看到ORC存储方式将数据存放为两个block,默认压缩大小加起来134.69M,SNAPPY压缩大小加起来196.67M,NONE压缩大小加起来247.55M,TEXT存储方式的文件大小为366.58M,且默认block两种存储方式分别为256M和128M,ORC默认的压缩方式比SNAPPY压缩得到的文件还小,原因是ORZ默认的ZLIB压缩方式采用的是deflate压缩算法,比Snappy压缩算法得到的压缩比高,压缩的文件更小。 ORC不同压缩方式之间的执行速度,经过多次测试发现三种压缩方式的执行速度差不多,所以建议采用ORC默认的存储方式进行存储数据。
2.4 分桶分区
Num Buckets表示桶的数量,我们可以通过分桶和分区操作对Hive表进行优化:
对于一张较大的表,可以将它设计成分区表,如果不设置成分区表,数据是全盘扫描的,设置成分区表后,查询时只在指定的分区中进行数据扫描,提升查询效率。要注意尽量避免多级分区,一般二级分区足够使用。常见的分区字段:
(1)日期或者时间,比如year、month、day或者hour,当表中存在时间或者日期字段时,可以使用些字段。
(2)地理位置,比如国家、省份、城市等
(3)业务逻辑,比如部门、销售区域、客户等等
与分区表类似,分桶表的组织方式是将HDFS上的一张大表文件分割成多个文件。分桶是相对分区进行更细粒度的划分,分桶将整个数据内容按照分桶字段属性值得hash值进行区分,分桶可以加快数据采样,也可以提升join的性能(join的字段是分桶字段),因为分桶可以确保某个key对应的数据在一个特定的桶内(文件),所以巧妙地选择分桶字段可以大幅度提升join的性能。通常情况下,分桶字段可以选择经常用在过滤操作或者join操作的字段。
创建分桶表
create
table test_user_bucket(id int, name string,code string,code_id string )
clustered by(id) into 3 buckets ROW FORMAT DELIMITED FIELDS TERMINATED
BY ',';
查看描述信息
DESCRIBE FORMATTED test_user_bucket
多出了如下信息
查看该表的hdfs
同样的数据查看普通表和分桶表查询效率
普通表
分桶表
普通表是全表扫描,分桶表在按照分桶字段的hash值分桶后,根据join字段或者where过滤字段在特定的桶中进行扫描,效率提升。
本文首发于: 数栈研习社
数栈是云原生—站式数据中台PaaS,我们在github上有一个有趣的开源项目: FlinkX
FlinkX是一个基于Flink的批流统一的数据同步工具,既可以采集静态的数据,比如MySQL,HDFS等,也可以采集实时变化的数据,比如MySQL
binlog,Kafka等,是全域、异构、批流一体的数据同步引擎,大家如果有兴趣,欢迎来github社区找我们玩~
③ Spark SQL怎么创建编程创建DataFrame
创建DataFrame在Spark SQL中,开发者可以非常便捷地将各种内、外部的单机、分布式数据转换为DataFrame。以下Python示例代码充分体现了Spark SQL 1.3.0中DataFrame数据源的丰富多样和简单易用:
# 从Hive中的users表构造DataFrame
users = sqlContext.table("users")
# 加载S3上的JSON文件
logs = sqlContext.load("s3n://path/to/data.json", "json")
# 加载HDFS上的Parquet文件
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")
# 通过JDBC访问MySQL
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")
# 将普通RDD转变为DataFrame
rdd = sparkContext.textFile("article.txt") \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.receByKey(lambda a, b: a + b) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])
# 将本地数据容器转变为DataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])
# 将Pandas DataFrame转变为Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)
④ sparksql 表定义 存储在哪
Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的关系型查询表达式。它的核心组件是一个新增的RDD类型SchemaRDD,它把行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。它可以从原有的RDD创建,也可以是Parquet文件,最重要的是它可以支持用HiveQL从hive里面读取数据。
下面是一些案例,可以在Spark shell当中运行。
首先我们要创建一个熟悉的Context,熟悉spark的人都知道吧,有了Context我们才可以进行各种操作。
⑤ 怎么创建一个hive的parquet的数据文档
怎么创建一个hive的parquet的数据文档
用access建立一个数据库例子如下:
单击“开始”→”所有程序“→”Microsoft Office“→”Microsoft Access 2010“,打开Microsoft Access 2010软件
在打开的Microsoft Access 2010软件中选中“空数据库”双击即可创建。或者打击“创建”命令按钮创建,其中文件名处可以选择指定具体的文件名和路径。
创建空白数据库后可以看到一个新建了一个名为“表1”的数据表,界面右侧显示了它的字段。若要添加字段,可以单击“单击以添加”旁的倒三角箭头,选择要添加的字段类型。
此时光标会定位在字段名称上,可以对字段名称进行重命名,重命名字段名称后按下回车键,将继续下一个字段的添加操作。
通过以上的步骤就可以创建一个包含单个数据表的简易的空白数据库了。
⑥ 使用Hive SQL插入动态分区的Parquet表OOM异常分析
1.异常描述
当运行“INSERT ... SELECT”语句向 Parquet 或者 ORC 格式的表中插入数据时,如果启用了动态分区,你可能会碰到以下错误,而导致作业无法正常执行。
Hive 客户端:
(可左右滑动)
YARN 的 8088 中查看具体 map task 报错:
(可左右滑动)
2.异常分析
Parquet 和 ORC 是列式批处理文件格式。这些格式要求在写入文件之前将批次的行(batches of rows)缓存在内存中。在执行 INSERT 语句时,动态分区目前的实现是:至少为每个动态分区目录打开一个文件写入器(file writer)。由于这些缓冲区是按分区维护的,因此在运行时所需的内存量随着分区数量的增加而增加。所以经常会导致 mappers 或 recers 的 OOM,具体取决于打开的文件写入器(file writer)的数量。
通过 INSERT 语句插入数据到动态分区表中,也可能会超过 HDFS 同时打开文件数的限制。
如果没有 join 或聚合,INSERT ... SELECT 语句会被转换为只有 map 任务的作业。mapper 任务会读取输入记录然后将它们发送到目标分区目录。在这种情况下,每个 mapper 必须为遇到的每个动态分区创建一个新的文件写入器(file writer)。mapper 在运行时所需的内存量随着它遇到的分区数量的增加而增加。
3.异常重现与解决
3.1.生成动态分区的几个参数说明
hive.exec.dynamic.partition
默认值:false
是否开启动态分区功能,默认 false 关闭。
使用动态分区时候,该参数必须设置成 true;
hive.exec.dynamic.partition.mode
默认值:strict
动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。
一般需要设置为 nonstrict
hive.exec.max.dynamic.partitions.pernode
默认值:100
在每个执行 MR 的节点上,最大可以创建多少个动态分区。
该参数需要根据实际的数据来设定。
比如:源数据中包含了一年的数据,即 day 字段有 365 个值,那么该参数就需要设置成大于 365,如果使用默认值 100,则会报错。
hive.exec.max.dynamic.partitions
默认值:1000
在所有执行 MR 的节点上,最大一共可以创建多少个动态分区。
同上参数解释。
hive.exec.max.created.files
默认值:100000
整个 MR Job 中,最大可以创建多少个 HDFS 文件。
一般默认值足够了,除非你的数据量非常大,需要创建的文件数大于 100000,可根据实际情况加以调整。
maprece.map.memory.mb
map 任务的物理内存分配值,常见设置为 1GB,2GB,4GB 等。
maprece.map.java.opts
map 任务的 Java 堆栈大小设置,一般设置为小于等于上面那个值的 75%,这样可以保证 map 任务有足够的堆栈外内存空间。
maprece.input.fileinputformat.split.maxsize
maprece.input.fileinputformat.split.minsize
这个两个参数联合起来用,主要是为了方便控制 maprece 的 map 数量。比如我设置为 1073741824,就是为了让每个 map 处理 1GB 的文件。
3.2.一个例子
Fayson 在前两天给人调一个使用 Hive SQL 插入动态分区的 Parquet 表时,总是报错 OOM,也是折腾了很久。以下我们来看看整个过程。
1.首先我们看看执行脚本的内容,基本其实就是使用 Hive 的 insert 语句将文本数据表插入到另外一张 parquet 表中,当然使用了动态分区。
2.我们看看原始数据文件,是文本文件,一共 120 个,每个 30GB 大小,总共差不多 3.6TB。
3.我们看看报错
4.因为是一个只有 map 的 maprece 任务,当我们从 YARN 的 8088 观察这个作业时可以发现,基本没有一个 map 能够执行成功,全部都是失败的。报上面的错误。
5.把 maprece.map.memory.mb 从 2GB 增大到 4GB,8GB,16GB,相应 maprece.map.java.opts 增大到 3GB,6GB,12GB。依旧报错 OOM。
6.后面又将 maprece.input.fileinputformat.split.maxsize 从 1GB,减少为 512MB,256MB,从而增大 map 数量,缩小单个 map 处理文件的大小。依旧报错 OOM。
7.最后启用 hive.optimize.sort.dynamic.partition,增加 rece 过程,作业执行成功。
8.最后查看结果文件大约 1.2TB,约为输入文件的三分之一。一共 1557 个分区,最大的分区文件为 2GB。
4.异常总结
对于这个异常,我们建议有以下三种方式来处理:
1.启用 hive.optimize.sort.dynamic.partition,将其设置为 true。通过这个优化,这个只有 map 任务的 maprece 会引入 rece 过程,这样动态分区的那个字段比如日期在传到 recer 时会被排序。由于分区字段是排序的,因此每个 recer 只需要保持一个文件写入器(file writer)随时处于打开状态,在收到来自特定分区的所有行后,关闭记录写入器(record writer),从而减小内存压力。这种优化方式在写 parquet 文件时使用的内存要相对少一些,但代价是要对分区字段进行排序。
2.第二种方式就是增加每个 mapper 的内存分配,即增大 maprece.map.memory.mb 和 maprece.map.java.opts,这样所有文件写入器(filewriter)缓冲区对应的内存会更充沛。
3.将查询分解为几个较小的查询,以减少每个查询创建的分区数量。这样可以让每个 mapper 打开较少的文件写入器(file writer)。
备注:
默认情况下,Hive 为每个打开的 Parquet 文件缓冲区(file buffer)分配 128MB。这个 buffer 大小由参数 parquet.block.size 控制。为获得最佳性能,parquet 的 buffer size 需要与 HDFS 的 block size 保持对齐(比如相等),从而使每个 parquet 文件在单个 HDFS 的块中,以便每个 I/O 请求都可以读取整个数据文件,而无需通过网络传输访问后续的 block。
参考:
https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties
http://blog.cloudera.com/blog/2014/03/how-to-use-parquet-with-impala-hive-pig-maprece/
https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_parquet.html
https://issues.cloudera.org/browse/IMPALA-2521
https://issues.apache.org/jira/browse/HIVE-6455
http://blog.csdn.net/qq_26937525/article/details/54946281
⑦ spark parquet只能用于spark sql么
1)过去整个业界对大数据的分析的技术栈的Pipeline一般分为以下两种方式:
a)Data Source -> HDFS -> MR/Hive/Spark(相当于ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通过JDBC/ODBC来作为数据服务使用);
上述的第二种方式完全可以通过Kafka+Spark Streaming+Spark SQL(内部也强烈建议采用Parquet的方式来存储数据)的方式取代
2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parq
⑧ DB2时间戳数据导入到SparkSql时,怎样转换为SparkSql中的timestamp
Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的关系型查询表达式。
它的核心组件是一个新增的RDD类型SchemaRDD,它把行对象用一个Schema来描述行里面的所有列的数据类型,它就像是关系型数据库里面的一张表。
它可以从原有的RDD创建,也可以是Parquet文件
最重要的是它可以支持用HiveQL从hive里面读取数据。
⑨ 请问用SQl语言怎么把Date型的日期减两个月
SELECT DATEADD(month,-2,你的日期)
⑩ 怎么看spark sql表的存储格式是text还是parquet
标签:hive和sparksql计算引擎在text导入parquet格式的hive存储引擎分片数量机制
表的hive导入:
create table XXXXXXX201512 (N多字段构成)STORED AS PARQUETFILE;
insert into XXXXXXX201512 select * from XXXXXXX20151231;