當前位置:首頁 » 編程語言 » sql建表parquet格式
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

sql建表parquet格式

發布時間: 2022-10-30 05:49:00

① Spark 處理小文件

不論是Hive還是Spark sql在使用過程中都可能會遇到小文件過多的問題。小文件過多最直接的表現是任務執行時間長,查看Spark log會發現大量的數據移動的日誌。我們可以查看log中展現的日誌信息,去對應的路徑下查看文件的大小和個數。

通過上述命令可以查看文件的個數以及大小。count查看出的文件大小單位是B,需要轉換為MB。

在spark官方的推薦文檔中,parquet格式的文件推薦大小是128MB,小於該大小的均可以稱之為小文件,在實際的工作,往往小文件的大小僅僅為幾KB,表現為,可能文件大小為幾百MB,但是文件個數可能到達了幾十萬個。一般來說,我們可以通過簡單相除獲得文件的平均大小,如果文件數目不多,我們也可以通過下述命令獲得每個文件的大小。

1.任務執行時間長

2.真實的文件大小獨佔一個數據存儲塊,存放到DataNode節點中。同時 DataNode一般默認存三份副本,以保障數據安全。同時該文件所存放的位置也寫入到NameNode的內存中,如果有Secondary NameNode高可用節點,也可同時復制一份過去。NameNode的內存數據將會存放到硬碟中,如果HDFS發生重啟,將產生較長時間的元數據從硬碟讀到內存的過程。

3.不論在Hive還是在Spark中,每一個存儲塊都對應一個Map程序,一個Map呈現就需要一個JVM,啟動一個JVM去讀取或者寫小文件是吃力不討好的行為。在實際的生產中,為了更好的管理集群資源,一般會要求程序執行時限制Executor數量和每個Executor的核心數量,需要頻繁創建Executor來讀取寫入。

5.影響磁碟定址時間

小文件合並,本質上就是通過某種操作,將一系列小文件合並成大文件。我們知道,以MapRece為代表的大數據系統,都習慣用K-V鍵值對的形式來處理文件,最後文件落盤,也是一個rece對應一個輸出文件。所以直觀上,我們可以減少rece數量,達到減少文件數量的目的。

從Map到Rece需要一個Shuffle過程,所以我們將小文件合並理解為通過一個Shuffle,合並小文件成一個大文件。基於這樣的思想,我們的策略可以分為兩類:一類是原來的計算已經有Shuffle了,那麼我們可以認為控制輸出文件的數量;二類是強制觸發Shuffle,進行小文件合並。

1-設置參數 (一般用於Hive)

2-distribute by rand()

往動態分區插入數據時,在已經寫好的SQL末尾加上distribute by rand()

該運算元只是起到打散的效果,但是我們還要設置文件的大小,以免打散後仍然有小文件。

表示每個rece的大小,Hive可以數據總量,得到rece個數,假設hive認為會有10個rece,那麼,這里rand()則會為 x % 10

3-group by

我們知道,group by運算元會觸發Shuffle,因此只要我們設置好Shuffle時的文件個數就好,在Spark SQL中,我們可以設置partition個數,因為一個partition會對應一個文件。

上述的操作,會觸發shuffle,因此我們再設置partition個數。

則表示,shuffle後,只會產生10個partition.

4-repartition()

5-coalesce()

需要注意的是,4和5都是spark 2.4以及以後才會支持的。

② Hive優化之Hive的配置參數優化

Hive是大數據領域常用的組件之一,主要用於大數據離線數倉的運算,關於Hive的性能調優在日常工作和面試中是經常涉及的一個點,因此掌握一些Hive調優是必不可少的一項技能。影響Hive效率的主要因素有數據傾斜、數據冗餘、job的IO以及不同底層引擎配置情況和Hive本身參數和HiveSQL的執行等。本文主要從建表配置參數方面對Hive優化進行講解。

1. 創建一個普通表

table test_user1(id int, name string,code string,code_id string ) ROW FORMAT DELIMITED FIELDS TERMINATED  BY ',';

2. 查看這張表的信息

DESCRIBE FORMATTED  test_user1;

我們從該表的描述信息介紹建表時的一些可優化點。

2.1 表的文件數

numFiles表示表中含有的文件數,當文件數過多時可能意味著該表的小文件過多,這時候我們可以針對小文件的問題進行一些優化,HDFS本身提供了解決方案:

(1)Hadoop Archive/HAR:將小文件打包成大文件。

(2)SEQUENCEFILE格式:將大量小文件壓縮成一個SEQUENCEFILE文件。

(3)CombineFileInputFormat:在map和rece處理之前組合小文件。

(4)HDFS Federation:HDFS聯盟,使用多個namenode節點管理文件。

除此之外,我們還可以通過設置hive的參數來合並小文件。

(1)輸入階段合並

需要更改Hive的輸入文件格式,即參數hive.input.format,默認值是org.apache.hadoop.hive.ql.io.HiveInputFormat,我們改成org.apache.hadoop.hive.ql.io.CombineHiveInputFormat。這樣比起上面對mapper數的調整,會多出兩個參數,分別是mapred.min.split.size.per.node和mapred.min.split.size.per.rack,含義是單節點和單機架上的最小split大小。如果發現有split大小小於這兩個值(默認都是100MB),則會進行合並。具體邏輯可以參看Hive源碼中的對應類。

(2)輸出階段合並

直接將hive.merge.mapfiles和hive.merge.mapredfiles都設為true即可,前者表示將map-only任務的輸出合並,後者表示將map-rece任務的輸出合並,Hive會額外啟動一個mr作業將輸出的小文件合並成大文件。另外,hive.merge.size.per.task可以指定每個task輸出後合並文件大小的期望值,hive.merge.size.smallfiles.avgsize可以指定所有輸出文件大小的均值閾值,默認值都是1GB。如果平均大小不足的話,就會另外啟動一個任務來進行合並。

2.2 表的存儲格式

通過InputFormat和OutputFormat可以看出表的存儲格式是TEXT類型,Hive支持TEXTFILE, SEQUENCEFILE, AVRO, RCFILE, ORC,以及PARQUET文件格式,可以通過兩種方式指定表的文件格式:

(1)CREATE TABLE ... STORE AS <file_format>:在建表時指定文件格式,默認是TEXTFILE

(2)ALTER TABLE ... [PARTITION partition_spec] SET FILEFORMAT <file_format>:修改具體表的文件格式

如果要改變創建表的默認文件格式,可以使用set

hive.default.fileformat=<file_format>進行配置,適用於所有表。同時也可以使用set

hive.default.fileformat.managed = <file_format>進行配置,僅適用於內部表或外部表。

擴展:不同存儲方式的情況

TEXT,

SEQUENCE和

AVRO文件是面向行的文件存儲格式,不是最佳的文件格式,因為即便只查詢一列數據,使用這些存儲格式的表也需要讀取完整的一行數據。另一方面,面向列的存儲格式(RCFILE,

ORC, PARQUET)可以很好地解決上面的問題。關於每種文件格式的說明,如下:

(1)TEXTFILE

創建表時的默認文件格式,數據被存儲成文本格式。文本文件可以被分割和並行處理,也可以使用壓縮,比如GZip、LZO或者Snappy。然而大部分的壓縮文件不支持分割和並行處理,會造成一個作業只有一個mapper去處理數據,使用壓縮的文本文件要確保文件不要過大,一般接近兩個HDFS塊的大小。

(2)SEQUENCEFILE

key/value對的二進制存儲格式,sequence文件的優勢是比文本格式更好壓縮,sequence文件可以被壓縮成塊級別的記錄,塊級別的壓縮是一個很好的壓縮比例。如果使用塊壓縮,需要使用下面的配置:set

hive.exec.compress.output=true; set io.seqfile.compression.type=BLOCK

(3)AVRO

二進制格式文件,除此之外,avro也是一個序列化和反序列化的框架。avro提供了具體的數據schema。

(4)RCFILE

全稱是Record Columnar File,首先將表分為幾個行組,對每個行組內的數據進行按列存儲,每一列的數據都是分開存儲,即先水平劃分,再垂直劃分。

(5)ORC

全稱是Optimized Row Columnar,從hive0.11版本開始支持,ORC格式是RCFILE格式的一種優化的格式,提供了更大的默認塊(256M)

(6)PARQUET

另外一種列式存儲的文件格式,與ORC非常類似,與ORC相比,Parquet格式支持的生態更廣,比如低版本的impala不支持ORC格式。

配置同樣數據同樣欄位的兩張表,以常見的TEXT行存儲和ORC列存儲兩種存儲方式為例,對比執行速度。

TEXT存儲方式

總結: 從上圖中可以看出列存儲在對指定列進行查詢時,速度更快, 建議在建表時設置列存儲的存儲方式 。

2.3 表的壓縮

對Hive表進行壓縮是常見的優化手段,一些存儲方式自帶壓縮選擇,比如SEQUENCEFILE支持三種壓縮選擇:NONE,RECORD,BLOCK。Record壓縮率低,一般建議使用BLOCK壓縮;

ORC支持三種壓縮選擇:NONE,ZLIB,SNAPPY。我們以TEXT存儲方式和ORC存儲方式為例,查看錶的壓縮情況。

配置同樣數據同樣欄位的四張表,一張TEXT存儲方式,另外三張分別是默認壓縮方式的ORC存儲、SNAPPY壓縮方式的ORC存儲和NONE壓縮方式的ORC存儲,查看在hdfs上的存儲情況:

TEXT存儲方式

默認壓縮ORC存儲方式

SNAPPY壓縮的ORC存儲方式

NONE壓縮的ORC存儲方式

總結 :可以看到ORC存儲方式將數據存放為兩個block,默認壓縮大小加起來134.69M,SNAPPY壓縮大小加起來196.67M,NONE壓縮大小加起來247.55M,TEXT存儲方式的文件大小為366.58M,且默認block兩種存儲方式分別為256M和128M,ORC默認的壓縮方式比SNAPPY壓縮得到的文件還小,原因是ORZ默認的ZLIB壓縮方式採用的是deflate壓縮演算法,比Snappy壓縮演算法得到的壓縮比高,壓縮的文件更小。 ORC不同壓縮方式之間的執行速度,經過多次測試發現三種壓縮方式的執行速度差不多,所以建議採用ORC默認的存儲方式進行存儲數據。

2.4 分桶分區

Num Buckets表示桶的數量,我們可以通過分桶和分區操作對Hive表進行優化:

對於一張較大的表,可以將它設計成分區表,如果不設置成分區表,數據是全盤掃描的,設置成分區表後,查詢時只在指定的分區中進行數據掃描,提升查詢效率。要注意盡量避免多級分區,一般二級分區足夠使用。常見的分區欄位:

(1)日期或者時間,比如year、month、day或者hour,當表中存在時間或者日期欄位時,可以使用些欄位。

(2)地理位置,比如國家、省份、城市等

(3)業務邏輯,比如部門、銷售區域、客戶等等

與分區表類似,分桶表的組織方式是將HDFS上的一張大表文件分割成多個文件。分桶是相對分區進行更細粒度的劃分,分桶將整個數據內容按照分桶欄位屬性值得hash值進行區分,分桶可以加快數據采樣,也可以提升join的性能(join的欄位是分桶欄位),因為分桶可以確保某個key對應的數據在一個特定的桶內(文件),所以巧妙地選擇分桶欄位可以大幅度提升join的性能。通常情況下,分桶欄位可以選擇經常用在過濾操作或者join操作的欄位。

創建分桶表

create

table test_user_bucket(id int, name string,code string,code_id string )

clustered by(id) into 3 buckets ROW FORMAT DELIMITED FIELDS TERMINATED 

BY ',';

查看描述信息

DESCRIBE FORMATTED test_user_bucket

多出了如下信息

查看該表的hdfs

同樣的數據查看普通表和分桶表查詢效率

普通表

分桶表

普通表是全表掃描,分桶表在按照分桶欄位的hash值分桶後,根據join欄位或者where過濾欄位在特定的桶中進行掃描,效率提升。

本文首發於: 數棧研習社

數棧是雲原生—站式數據中台PaaS,我們在github上有一個有趣的開源項目: FlinkX

FlinkX是一個基於Flink的批流統一的數據同步工具,既可以採集靜態的數據,比如MySQL,HDFS等,也可以採集實時變化的數據,比如MySQL

binlog,Kafka等,是全域、異構、批流一體的數據同步引擎,大家如果有興趣,歡迎來github社區找我們玩~

③ Spark SQL怎麼創建編程創建DataFrame

創建DataFrame在Spark SQL中,開發者可以非常便捷地將各種內、外部的單機、分布式數據轉換為DataFrame。以下Python示例代碼充分體現了Spark SQL 1.3.0中DataFrame數據源的豐富多樣和簡單易用:
# 從Hive中的users表構造DataFrame
users = sqlContext.table("users")

# 載入S3上的JSON文件
logs = sqlContext.load("s3n://path/to/data.json", "json")

# 載入HDFS上的Parquet文件
clicks = sqlContext.load("hdfs://path/to/data.parquet", "parquet")

# 通過JDBC訪問MySQL
comments = sqlContext.jdbc("jdbc:mysql://localhost/comments", "user")

# 將普通RDD轉變為DataFrame
rdd = sparkContext.textFile("article.txt") \
.flatMap(lambda line: line.split()) \
.map(lambda word: (word, 1)) \
.receByKey(lambda a, b: a + b) \
wordCounts = sqlContext.createDataFrame(rdd, ["word", "count"])

# 將本地數據容器轉變為DataFrame
data = [("Alice", 21), ("Bob", 24)]
people = sqlContext.createDataFrame(data, ["name", "age"])

# 將Pandas DataFrame轉變為Spark DataFrame(Python API特有功能)
sparkDF = sqlContext.createDataFrame(pandasDF)

④ sparksql 表定義 存儲在哪

Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的關系型查詢表達式。它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行裡面的所有列的數據類型,它就像是關系型資料庫裡面的一張表。它可以從原有的RDD創建,也可以是Parquet文件,最重要的是它可以支持用HiveQL從hive裡面讀取數據。
下面是一些案例,可以在Spark shell當中運行。
首先我們要創建一個熟悉的Context,熟悉spark的人都知道吧,有了Context我們才可以進行各種操作。

⑤ 怎麼創建一個hive的parquet的數據文檔

怎麼創建一個hive的parquet的數據文檔
用access建立一個資料庫例子如下:
單擊「開始」→」所有程序「→」Microsoft Office「→」Microsoft Access 2010「,打開Microsoft Access 2010軟體
在打開的Microsoft Access 2010軟體中選中「空資料庫」雙擊即可創建。或者打擊「創建」命令按鈕創建,其中文件名處可以選擇指定具體的文件名和路徑。
創建空白資料庫後可以看到一個新建了一個名為「表1」的數據表,界面右側顯示了它的欄位。若要添加欄位,可以單擊「單擊以添加」旁的倒三角箭頭,選擇要添加的欄位類型。
此時游標會定位在欄位名稱上,可以對欄位名稱進行重命名,重命名欄位名稱後按下回車鍵,將繼續下一個欄位的添加操作。
通過以上的步驟就可以創建一個包含單個數據表的簡易的空白資料庫了。

⑥ 使用Hive SQL插入動態分區的Parquet表OOM異常分析

1.異常描述

當運行「INSERT ... SELECT」語句向 Parquet 或者 ORC 格式的表中插入數據時,如果啟用了動態分區,你可能會碰到以下錯誤,而導致作業無法正常執行。

Hive 客戶端:

(可左右滑動)

YARN 的 8088 中查看具體 map task 報錯:

(可左右滑動)

2.異常分析

Parquet 和 ORC 是列式批處理文件格式。這些格式要求在寫入文件之前將批次的行(batches of rows)緩存在內存中。在執行 INSERT 語句時,動態分區目前的實現是:至少為每個動態分區目錄打開一個文件寫入器(file writer)。由於這些緩沖區是按分區維護的,因此在運行時所需的內存量隨著分區數量的增加而增加。所以經常會導致 mappers 或 recers 的 OOM,具體取決於打開的文件寫入器(file writer)的數量。

通過 INSERT 語句插入數據到動態分區表中,也可能會超過 HDFS 同時打開文件數的限制。

如果沒有 join 或聚合,INSERT ... SELECT 語句會被轉換為只有 map 任務的作業。mapper 任務會讀取輸入記錄然後將它們發送到目標分區目錄。在這種情況下,每個 mapper 必須為遇到的每個動態分區創建一個新的文件寫入器(file writer)。mapper 在運行時所需的內存量隨著它遇到的分區數量的增加而增加。

3.異常重現與解決

3.1.生成動態分區的幾個參數說明

hive.exec.dynamic.partition

默認值:false

是否開啟動態分區功能,默認 false 關閉。

使用動態分區時候,該參數必須設置成 true;

hive.exec.dynamic.partition.mode

默認值:strict

動態分區的模式,默認 strict,表示必須指定至少一個分區為靜態分區,nonstrict 模式表示允許所有的分區欄位都可以使用動態分區。

一般需要設置為 nonstrict

hive.exec.max.dynamic.partitions.pernode

默認值:100

在每個執行 MR 的節點上,最大可以創建多少個動態分區。

該參數需要根據實際的數據來設定。

比如:源數據中包含了一年的數據,即 day 欄位有 365 個值,那麼該參數就需要設置成大於 365,如果使用默認值 100,則會報錯。

hive.exec.max.dynamic.partitions

默認值:1000

在所有執行 MR 的節點上,最大一共可以創建多少個動態分區。

同上參數解釋。

hive.exec.max.created.files

默認值:100000

整個 MR Job 中,最大可以創建多少個 HDFS 文件。

一般默認值足夠了,除非你的數據量非常大,需要創建的文件數大於 100000,可根據實際情況加以調整。

maprece.map.memory.mb

map 任務的物理內存分配值,常見設置為 1GB,2GB,4GB 等。

maprece.map.java.opts

map 任務的 Java 堆棧大小設置,一般設置為小於等於上面那個值的 75%,這樣可以保證 map 任務有足夠的堆棧外內存空間。

maprece.input.fileinputformat.split.maxsize

maprece.input.fileinputformat.split.minsize

這個兩個參數聯合起來用,主要是為了方便控制 maprece 的 map 數量。比如我設置為 1073741824,就是為了讓每個 map 處理 1GB 的文件。

3.2.一個例子

Fayson 在前兩天給人調一個使用 Hive SQL 插入動態分區的 Parquet 表時,總是報錯 OOM,也是折騰了很久。以下我們來看看整個過程。

1.首先我們看看執行腳本的內容,基本其實就是使用 Hive 的 insert 語句將文本數據表插入到另外一張 parquet 表中,當然使用了動態分區。

2.我們看看原始數據文件,是文本文件,一共 120 個,每個 30GB 大小,總共差不多 3.6TB。

3.我們看看報錯

4.因為是一個只有 map 的 maprece 任務,當我們從 YARN 的 8088 觀察這個作業時可以發現,基本沒有一個 map 能夠執行成功,全部都是失敗的。報上面的錯誤。

5.把 maprece.map.memory.mb 從 2GB 增大到 4GB,8GB,16GB,相應 maprece.map.java.opts 增大到 3GB,6GB,12GB。依舊報錯 OOM。

6.後面又將 maprece.input.fileinputformat.split.maxsize 從 1GB,減少為 512MB,256MB,從而增大 map 數量,縮小單個 map 處理文件的大小。依舊報錯 OOM。

7.最後啟用 hive.optimize.sort.dynamic.partition,增加 rece 過程,作業執行成功。

8.最後查看結果文件大約 1.2TB,約為輸入文件的三分之一。一共 1557 個分區,最大的分區文件為 2GB。

4.異常總結

對於這個異常,我們建議有以下三種方式來處理:

1.啟用 hive.optimize.sort.dynamic.partition,將其設置為 true。通過這個優化,這個只有 map 任務的 maprece 會引入 rece 過程,這樣動態分區的那個欄位比如日期在傳到 recer 時會被排序。由於分區欄位是排序的,因此每個 recer 只需要保持一個文件寫入器(file writer)隨時處於打開狀態,在收到來自特定分區的所有行後,關閉記錄寫入器(record writer),從而減小內存壓力。這種優化方式在寫 parquet 文件時使用的內存要相對少一些,但代價是要對分區欄位進行排序。

2.第二種方式就是增加每個 mapper 的內存分配,即增大 maprece.map.memory.mb 和 maprece.map.java.opts,這樣所有文件寫入器(filewriter)緩沖區對應的內存會更充沛。

3.將查詢分解為幾個較小的查詢,以減少每個查詢創建的分區數量。這樣可以讓每個 mapper 打開較少的文件寫入器(file writer)。

備註:

默認情況下,Hive 為每個打開的 Parquet 文件緩沖區(file buffer)分配 128MB。這個 buffer 大小由參數 parquet.block.size 控制。為獲得最佳性能,parquet 的 buffer size 需要與 HDFS 的 block size 保持對齊(比如相等),從而使每個 parquet 文件在單個 HDFS 的塊中,以便每個 I/O 請求都可以讀取整個數據文件,而無需通過網路傳輸訪問後續的 block。

參考:

https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

http://blog.cloudera.com/blog/2014/03/how-to-use-parquet-with-impala-hive-pig-maprece/

https://www.cloudera.com/documentation/enterprise/latest/topics/cdh_ig_parquet.html

https://issues.cloudera.org/browse/IMPALA-2521

https://issues.apache.org/jira/browse/HIVE-6455

http://blog.csdn.net/qq_26937525/article/details/54946281

⑦ spark parquet只能用於spark sql么

1)過去整個業界對大數據的分析的技術棧的Pipeline一般分為以下兩種方式:

a)Data Source -> HDFS -> MR/Hive/Spark(相當於ETL)-> HDFS Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通過JDBC/ODBC來作為數據服務使用);

b)Data Source -> Real timeupdate data to HBase/DB -> Export to Parquet -> Spark SQL/Impala -> ResultService(可以放在DB中,也有可能被通過JDBC/ODBC來作為數據服務使用);
上述的第二種方式完全可以通過Kafka+Spark Streaming+Spark SQL(內部也強烈建議採用Parquet的方式來存儲數據)的方式取代

2)期待的方式:DataSource -> Kafka -> Spark Streaming -> Parq

⑧ DB2時間戳數據導入到SparkSql時,怎樣轉換為SparkSql中的timestamp

Spark SQL是支持在Spark中使用Sql、HiveSql、Scala中的關系型查詢表達式。
它的核心組件是一個新增的RDD類型SchemaRDD,它把行對象用一個Schema來描述行裡面的所有列的數據類型,它就像是關系型資料庫裡面的一張表。
它可以從原有的RDD創建,也可以是Parquet文件
最重要的是它可以支持用HiveQL從hive裡面讀取數據。

⑨ 請問用SQl語言怎麼把Date型的日期減兩個月

SELECT DATEADD(month,-2,你的日期)

⑩ 怎麼看spark sql表的存儲格式是text還是parquet

標簽:hive和sparksql計算引擎在text導入parquet格式的hive存儲引擎分片數量機制
表的hive導入:
create table XXXXXXX201512 (N多欄位構成)STORED AS PARQUETFILE;
insert into XXXXXXX201512 select * from XXXXXXX20151231;