當前位置:首頁 » 編程語言 » flink和sql哪個好
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flink和sql哪個好

發布時間: 2022-11-22 01:17:39

㈠ Flink sql 寫入 Hive表的性能問題

寫入Hive表的性能,每秒寫入記錄數,發現性能並不樂觀,上有節點背壓嚴重。

Hive Table DDL:

而寫入HDFS文件的性能,每秒寫入記錄數,性能符合期待。

HDFS文件的DDL:

翻閱Flink的PR,十幾天前,阿里Flink的開發同學已經注意到了這個問題,我們將之吸收到測試環境,編譯替換lib下jar包,重新測試,性能確實up了,單並發升至5W每秒,上游節點才稍微有背壓。
[FLINK-19121][hive] Avoid accessing HDFS frequently in HiveBulkWriterFactory

所以,Flink的新特性從發布到應用線上,穩定性與性能上都不能過於樂觀、聽信於官方宣傳,
司內另一教訓就是過早在熱數據存儲層啟用了Hadoop的糾刪碼,導致問題不斷,被迫退化到副本機制。
這與前期調研、驗證不足,對該特性過於輕信有莫大關系,教訓也是深刻。

底層採用Reed-Solomon(k,m)演算法,RS是一種常用的糾刪碼演算法,通過矩陣運算,可以為k位數據生成m位校驗位,根據k和m的取值不同,可以實現不同程度的容錯能力,是一種比較靈活的糾刪碼演算法。
HDFS糾刪碼技術能夠降低數據存儲的冗餘度,以RS(3,2)為例,其數據冗餘度為67%,相比Hadoop默認的200%大為減少。但是糾刪碼技術存儲數據和數據恢復都需要 消耗cpu進行計算,實際上是一種以時間換空間的選擇,因此比較適用的場景是對冷數據的存儲 。冷數據存儲的數據往往一次寫入之後長時間沒有訪問,這種情況下可以通過糾刪碼技術減少副本數。

㈡ flinksql-core-動態表

普通動態表是FlinkSQL中的一類表,表中的數據與連接的外部數據對等,可以簡單理解為把一張mysql的表放進flink內存中得到的表,並且該表與mysql表有連接關系,即該表可以讀寫mysql表。

需要聲明表的欄位定義和表屬性(連接器屬性)。語法如下:

with關鍵字前面的是欄位定義,with關鍵字後面的是表屬性。其中欄位定義時還可以聲明表主鍵,聲明語法為PARIMARY KEY(myColumn1,...) NOT ENFORCED, 這里的not enforced表示flinksql不會對主鍵做強制的唯一性約束、非空約束,而且目前flinksql中只支持這種類型的主鍵。
表屬性中有若干個屬性欄位需要聲明,具體有哪些屬性欄位取決於使用哪個連接器,如上述聲明中使用的是jdbc連接器,在使用該連接器時需要提供url、username、password等屬性,通過此連接器我們就可以讓該表能連接到對應的mysql表。

我們可以查詢flinksql普通動態表的數據,此數據與連接的外部數據是一致的。語法如下:

tips:在運行時,只會載入一次外部數據到flinksql普通動態表。後續外部數據表有更新時,flinksql的普通動態表不會跟著自動更新。

我們可以把數據寫入到flinksql動態表,從而實現寫入數據到外部系統的目的。語法如下:

㈢ 五種大數據處理架構

五種大數據處理架構
大數據是收集、整理、處理大容量數據集,並從中獲得見解所需的非傳統戰略和技術的總稱。雖然處理數據所需的計算能力或存儲容量早已超過一台計算機的上限,但這種計算類型的普遍性、規模,以及價值在最近幾年才經歷了大規模擴展。
本文將介紹大數據系統一個最基本的組件:處理框架。處理框架負責對系統中的數據進行計算,例如處理從非易失存儲中讀取的數據,或處理剛剛攝入到系統中的數據。數據的計算則是指從大量單一數據點中提取信息和見解的過程。
下文將介紹這些框架:
· 僅批處理框架:
Apache Hadoop
· 僅流處理框架:
Apache Storm
Apache Samza
· 混合框架:
Apache Spark
Apache Flink
大數據處理框架是什麼?
處理框架和處理引擎負責對數據系統中的數據進行計算。雖然「引擎」和「框架」之間的區別沒有什麼權威的定義,但大部分時候可以將前者定義為實際負責處理數據操作的組件,後者則可定義為承擔類似作用的一系列組件。
例如Apache Hadoop可以看作一種以MapRece作為默認處理引擎的處理框架。引擎和框架通常可以相互替換或同時使用。例如另一個框架Apache Spark可以納入Hadoop並取代MapRece。組件之間的這種互操作性是大數據系統靈活性如此之高的原因之一。
雖然負責處理生命周期內這一階段數據的系統通常都很復雜,但從廣義層面來看它們的目標是非常一致的:通過對數據執行操作提高理解能力,揭示出數據蘊含的模式,並針對復雜互動獲得見解。
為了簡化這些組件的討論,我們會通過不同處理框架的設計意圖,按照所處理的數據狀態對其進行分類。一些系統可以用批處理方式處理數據,一些系統可以用流方式處理連續不斷流入系統的數據。此外還有一些系統可以同時處理這兩類數據。
在深入介紹不同實現的指標和結論之前,首先需要對不同處理類型的概念進行一個簡單的介紹。
批處理系統
批處理在大數據世界有著悠久的歷史。批處理主要操作大容量靜態數據集,並在計算過程完成後返回結果。
批處理模式中使用的數據集通常符合下列特徵…
· 有界:批處理數據集代表數據的有限集合
· 持久:數據通常始終存儲在某種類型的持久存儲位置中
· 大量:批處理操作通常是處理極為海量數據集的唯一方法
批處理非常適合需要訪問全套記錄才能完成的計算工作。例如在計算總數和平均數時,必須將數據集作為一個整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計算進行過程中數據維持自己的狀態。
需要處理大量數據的任務通常最適合用批處理操作進行處理。無論直接從持久存儲設備處理數據集,或首先將數據集載入內存,批處理系統在設計過程中就充分考慮了數據的量,可提供充足的處理資源。由於批處理在應對大量持久數據方面的表現極為出色,因此經常被用於對歷史數據進行分析。
大量數據的處理需要付出大量時間,因此批處理不適合對處理時間要求較高的場合。
Apache Hadoop
Apache Hadoop是一種專用於批處理的處理框架。Hadoop是首個在開源社區獲得極大關注的大數據框架。基於谷歌有關海量數據處理所發表的多篇論文與經驗的Hadoop重新實現了相關演算法和組件堆棧,讓大規模批處理技術變得更易用。
新版Hadoop包含多個組件,即多個層,通過配合使用可處理批數據:
· HDFS:HDFS是一種分布式文件系統層,可對集群節點間的存儲和復制進行協調。HDFS確保了無法避免的節點故障發生後數據依然可用,可將其用作數據來源,可用於存儲中間態的處理結果,並可存儲計算的最終結果。
· YARN:YARN是Yet Another Resource Negotiator(另一個資源管理器)的縮寫,可充當Hadoop堆棧的集群協調組件。該組件負責協調並管理底層資源和調度作業的運行。通過充當集群資源的介面,YARN使得用戶能在Hadoop集群中使用比以往的迭代方式運行更多類型的工作負載。
· MapRece:MapRece是Hadoop的原生批處理引擎。
批處理模式
Hadoop的處理功能來自MapRece引擎。MapRece的處理技術符合使用鍵值對的map、shuffle、rece演算法要求。基本處理過程包括:
· 從HDFS文件系統讀取數據集
· 將數據集拆分成小塊並分配給所有可用節點
· 針對每個節點上的數據子集進行計算(計算的中間態結果會重新寫入HDFS)
· 重新分配中間態結果並按照鍵進行分組
· 通過對每個節點計算的結果進行匯總和組合對每個鍵的值進行「Recing」
· 將計算而來的最終結果重新寫入 HDFS
優勢和局限
由於這種方法嚴重依賴持久存儲,每個任務需要多次執行讀取和寫入操作,因此速度相對較慢。但另一方面由於磁碟空間通常是伺服器上最豐富的資源,這意味著MapRece可以處理非常海量的數據集。同時也意味著相比其他類似技術,Hadoop的MapRece通常可以在廉價硬體上運行,因為該技術並不需要將一切都存儲在內存中。MapRece具備極高的縮放潛力,生產環境中曾經出現過包含數萬個節點的應用。
MapRece的學習曲線較為陡峭,雖然Hadoop生態系統的其他周邊技術可以大幅降低這一問題的影響,但通過Hadoop集群快速實現某些應用時依然需要注意這個問題。
圍繞Hadoop已經形成了遼闊的生態系統,Hadoop集群本身也經常被用作其他軟體的組成部件。很多其他處理框架和引擎通過與Hadoop集成也可以使用HDFS和YARN資源管理器。
總結
Apache Hadoop及其MapRece處理引擎提供了一套久經考驗的批處理模型,最適合處理對時間要求不高的非常大規模數據集。通過非常低成本的組件即可搭建完整功能的Hadoop集群,使得這一廉價且高效的處理技術可以靈活應用在很多案例中。與其他框架和引擎的兼容與集成能力使得Hadoop可以成為使用不同技術的多種工作負載處理平台的底層基礎。
流處理系統
流處理系統會對隨時進入系統的數據進行計算。相比批處理模式,這是一種截然不同的處理方式。流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作。
· 流處理中的數據集是「無邊界」的,這就產生了幾個重要的影響:
· 完整數據集只能代表截至目前已經進入到系統中的數據總量。
· 工作數據集也許更相關,在特定時間只能代表某個單一數據項。
處理工作是基於事件的,除非明確停止否則沒有「盡頭」。處理結果立刻可用,並會隨著新數據的抵達繼續更新。
流處理系統可以處理幾乎無限量的數據,但同一時間只能處理一條(真正的流處理)或很少量(微批處理,Micro-batch Processing)數據,不同記錄間只維持最少量的狀態。雖然大部分系統提供了用於維持某些狀態的方法,但流處理主要針對副作用更少,更加功能性的處理(Functional processing)進行優化。
功能性操作主要側重於狀態或副作用有限的離散步驟。針對同一個數據執行同一個操作會或略其他因素產生相同的結果,此類處理非常適合流處理,因為不同項的狀態通常是某些困難、限制,以及某些情況下不需要的結果的結合體。因此雖然某些類型的狀態管理通常是可行的,但這些框架通常在不具備狀態管理機制時更簡單也更高效。
此類處理非常適合某些類型的工作負載。有近實時處理需求的任務很適合使用流處理模式。分析、伺服器或應用程序錯誤日誌,以及其他基於時間的衡量指標是最適合的類型,因為對這些領域的數據變化做出響應對於業務職能來說是極為關鍵的。流處理很適合用來處理必須對變動或峰值做出響應,並且關注一段時間內變化趨勢的數據。
Apache Storm
Apache Storm是一種側重於極低延遲的流處理框架,也許是要求近實時處理的工作負載的最佳選擇。該技術可處理非常大量的數據,通過比其他解決方案更低的延遲提供結果。
流處理模式
Storm的流處理可對框架中名為Topology(拓撲)的DAG(Directed Acyclic Graph,有向無環圖)進行編排。這些拓撲描述了當數據片段進入系統後,需要對每個傳入的片段執行的不同轉換或步驟。
拓撲包含:
· Stream:普通的數據流,這是一種會持續抵達系統的無邊界數據。
· Spout:位於拓撲邊緣的數據流來源,例如可以是API或查詢等,從這里可以產生待處理的數據。
· Bolt:Bolt代表需要消耗流數據,對其應用操作,並將結果以流的形式進行輸出的處理步驟。Bolt需要與每個Spout建立連接,隨後相互連接以組成所有必要的處理。在拓撲的尾部,可以使用最終的Bolt輸出作為相互連接的其他系統的輸入。
Storm背後的想法是使用上述組件定義大量小型的離散操作,隨後將多個組件組成所需拓撲。默認情況下Storm提供了「至少一次」的處理保證,這意味著可以確保每條消息至少可以被處理一次,但某些情況下如果遇到失敗可能會處理多次。Storm無法確保可以按照特定順序處理消息。
為了實現嚴格的一次處理,即有狀態處理,可以使用一種名為Trident的抽象。嚴格來說不使用Trident的Storm通常可稱之為Core Storm。Trident會對Storm的處理能力產生極大影響,會增加延遲,為處理提供狀態,使用微批模式代替逐項處理的純粹流處理模式。
為避免這些問題,通常建議Storm用戶盡可能使用Core Storm。然而也要注意,Trident對內容嚴格的一次處理保證在某些情況下也比較有用,例如系統無法智能地處理重復消息時。如果需要在項之間維持狀態,例如想要計算一個小時內有多少用戶點擊了某個鏈接,此時Trident將是你唯一的選擇。盡管不能充分發揮框架與生俱來的優勢,但Trident提高了Storm的靈活性。
Trident拓撲包含:
· 流批(Stream batch):這是指流數據的微批,可通過分塊提供批處理語義。
· 操作(Operation):是指可以對數據執行的批處理過程。
優勢和局限
目前來說Storm可能是近實時處理領域的最佳解決方案。該技術可以用極低延遲處理數據,可用於希望獲得最低延遲的工作負載。如果處理速度直接影響用戶體驗,例如需要將處理結果直接提供給訪客打開的網站頁面,此時Storm將會是一個很好的選擇。
Storm與Trident配合使得用戶可以用微批代替純粹的流處理。雖然藉此用戶可以獲得更大靈活性打造更符合要求的工具,但同時這種做法會削弱該技術相比其他解決方案最大的優勢。話雖如此,但多一種流處理方式總是好的。
Core Storm無法保證消息的處理順序。Core Storm為消息提供了「至少一次」的處理保證,這意味著可以保證每條消息都能被處理,但也可能發生重復。Trident提供了嚴格的一次處理保證,可以在不同批之間提供順序處理,但無法在一個批內部實現順序處理。
在互操作性方面,Storm可與Hadoop的YARN資源管理器進行集成,因此可以很方便地融入現有Hadoop部署。除了支持大部分處理框架,Storm還可支持多種語言,為用戶的拓撲定義提供了更多選擇。
總結
對於延遲需求很高的純粹的流處理工作負載,Storm可能是最適合的技術。該技術可以保證每條消息都被處理,可配合多種編程語言使用。由於Storm無法進行批處理,如果需要這些能力可能還需要使用其他軟體。如果對嚴格的一次處理保證有比較高的要求,此時可考慮使用Trident。不過這種情況下其他流處理框架也許更適合。
Apache Samza
Apache Samza是一種與Apache Kafka消息系統緊密綁定的流處理框架。雖然Kafka可用於很多流處理系統,但按照設計,Samza可以更好地發揮Kafka獨特的架構優勢和保障。該技術可通過Kafka提供容錯、緩沖,以及狀態存儲。
Samza可使用YARN作為資源管理器。這意味著默認情況下需要具備Hadoop集群(至少具備HDFS和YARN),但同時也意味著Samza可以直接使用YARN豐富的內建功能。
流處理模式
Samza依賴Kafka的語義定義流的處理方式。Kafka在處理數據時涉及下列概念:
· Topic(話題):進入Kafka系統的每個數據流可稱之為一個話題。話題基本上是一種可供消耗方訂閱的,由相關信息組成的數據流。
· Partition(分區):為了將一個話題分散至多個節點,Kafka會將傳入的消息劃分為多個分區。分區的劃分將基於鍵(Key)進行,這樣可以保證包含同一個鍵的每條消息可以劃分至同一個分區。分區的順序可獲得保證。
· Broker(代理):組成Kafka集群的每個節點也叫做代理。
· Procer(生成方):任何向Kafka話題寫入數據的組件可以叫做生成方。生成方可提供將話題劃分為分區所需的鍵。
· Consumer(消耗方):任何從Kafka讀取話題的組件可叫做消耗方。消耗方需要負責維持有關自己分支的信息,這樣即可在失敗後知道哪些記錄已經被處理過了。
由於Kafka相當於永恆不變的日誌,Samza也需要處理永恆不變的數據流。這意味著任何轉換創建的新數據流都可被其他組件所使用,而不會對最初的數據流產生影響。
優勢和局限
乍看之下,Samza對Kafka類查詢系統的依賴似乎是一種限制,然而這也可以為系統提供一些獨特的保證和功能,這些內容也是其他流處理系統不具備的。
例如Kafka已經提供了可以通過低延遲方式訪問的數據存儲副本,此外還可以為每個數據分區提供非常易用且低成本的多訂閱者模型。所有輸出內容,包括中間態的結果都可寫入到Kafka,並可被下游步驟獨立使用。
這種對Kafka的緊密依賴在很多方面類似於MapRece引擎對HDFS的依賴。雖然在批處理的每個計算之間對HDFS的依賴導致了一些嚴重的性能問題,但也避免了流處理遇到的很多其他問題。
Samza與Kafka之間緊密的關系使得處理步驟本身可以非常鬆散地耦合在一起。無需事先協調,即可在輸出的任何步驟中增加任意數量的訂閱者,對於有多個團隊需要訪問類似數據的組織,這一特性非常有用。多個團隊可以全部訂閱進入系統的數據話題,或任意訂閱其他團隊對數據進行過某些處理後創建的話題。這一切並不會對資料庫等負載密集型基礎架構造成額外的壓力。
直接寫入Kafka還可避免回壓(Backpressure)問題。回壓是指當負載峰值導致數據流入速度超過組件實時處理能力的情況,這種情況可能導致處理工作停頓並可能丟失數據。按照設計,Kafka可以將數據保存很長時間,這意味著組件可以在方便的時候繼續進行處理,並可直接重啟動而無需擔心造成任何後果。
Samza可以使用以本地鍵值存儲方式實現的容錯檢查點系統存儲數據。這樣Samza即可獲得「至少一次」的交付保障,但面對由於數據可能多次交付造成的失敗,該技術無法對匯總後狀態(例如計數)提供精確恢復。
Samza提供的高級抽象使其在很多方面比Storm等系統提供的基元(Primitive)更易於配合使用。目前Samza只支持JVM語言,這意味著它在語言支持方面不如Storm靈活。
總結
對於已經具備或易於實現Hadoop和Kafka的環境,Apache Samza是流處理工作負載一個很好的選擇。Samza本身很適合有多個團隊需要使用(但相互之間並不一定緊密協調)不同處理階段的多個數據流的組織。Samza可大幅簡化很多流處理工作,可實現低延遲的性能。如果部署需求與當前系統不兼容,也許並不適合使用,但如果需要極低延遲的處理,或對嚴格的一次處理語義有較高需求,此時依然適合考慮。
混合處理系統:批處理和流處理
一些處理框架可同時處理批處理和流處理工作負載。這些框架可以用相同或相關的組件和API處理兩種類型的數據,藉此讓不同的處理需求得以簡化。
如你所見,這一特性主要是由Spark和Flink實現的,下文將介紹這兩種框架。實現這樣的功能重點在於兩種不同處理模式如何進行統一,以及要對固定和不固定數據集之間的關系進行何種假設。
雖然側重於某一種處理類型的項目會更好地滿足具體用例的要求,但混合框架意在提供一種數據處理的通用解決方案。這種框架不僅可以提供處理數據所需的方法,而且提供了自己的集成項、庫、工具,可勝任圖形分析、機器學習、互動式查詢等多種任務。
Apache Spark
Apache Spark是一種包含流處理能力的下一代批處理框架。與Hadoop的MapRece引擎基於各種相同原則開發而來的Spark主要側重於通過完善的內存計算和處理優化機制加快批處理工作負載的運行速度。
Spark可作為獨立集群部署(需要相應存儲層的配合),或可與Hadoop集成並取代MapRece引擎。
批處理模式
與MapRece不同,Spark的數據處理工作全部在內存中進行,只在一開始將數據讀入內存,以及將最終結果持久存儲時需要與存儲層交互。所有中間態的處理結果均存儲在內存中。
雖然內存中處理方式可大幅改善性能,Spark在處理與磁碟有關的任務時速度也有很大提升,因為通過提前對整個任務集進行分析可以實現更完善的整體式優化。為此Spark可創建代表所需執行的全部操作,需要操作的數據,以及操作和數據之間關系的Directed Acyclic Graph(有向無環圖),即DAG,藉此處理器可以對任務進行更智能的協調。
為了實現內存中批計算,Spark會使用一種名為Resilient Distributed Dataset(彈性分布式數據集),即RDD的模型來處理數據。這是一種代表數據集,只位於內存中,永恆不變的結構。針對RDD執行的操作可生成新的RDD。每個RDD可通過世系(Lineage)回溯至父級RDD,並最終回溯至磁碟上的數據。Spark可通過RDD在無需將每個操作的結果寫回磁碟的前提下實現容錯。
流處理模式
流處理能力是由Spark Streaming實現的。Spark本身在設計上主要面向批處理工作負載,為了彌補引擎設計和流處理工作負載特徵方面的差異,Spark實現了一種叫做微批(Micro-batch)*的概念。在具體策略方面該技術可以將數據流視作一系列非常小的「批」,藉此即可通過批處理引擎的原生語義進行處理。
Spark Streaming會以亞秒級增量對流進行緩沖,隨後這些緩沖會作為小規模的固定數據集進行批處理。這種方式的實際效果非常好,但相比真正的流處理框架在性能方面依然存在不足。
優勢和局限
使用Spark而非Hadoop MapRece的主要原因是速度。在內存計算策略和先進的DAG調度等機制的幫助下,Spark可以用更快速度處理相同的數據集。
Spark的另一個重要優勢在於多樣性。該產品可作為獨立集群部署,或與現有Hadoop集群集成。該產品可運行批處理和流處理,運行一個集群即可處理不同類型的任務。
除了引擎自身的能力外,圍繞Spark還建立了包含各種庫的生態系統,可為機器學習、互動式查詢等任務提供更好的支持。相比MapRece,Spark任務更是「眾所周知」地易於編寫,因此可大幅提高生產力。
為流處理系統採用批處理的方法,需要對進入系統的數據進行緩沖。緩沖機制使得該技術可以處理非常大量的傳入數據,提高整體吞吐率,但等待緩沖區清空也會導致延遲增高。這意味著Spark Streaming可能不適合處理對延遲有較高要求的工作負載。
由於內存通常比磁碟空間更貴,因此相比基於磁碟的系統,Spark成本更高。然而處理速度的提升意味著可以更快速完成任務,在需要按照小時數為資源付費的環境中,這一特性通常可以抵消增加的成本。
Spark內存計算這一設計的另一個後果是,如果部署在共享的集群中可能會遇到資源不足的問題。相比HadoopMapRece,Spark的資源消耗更大,可能會對需要在同一時間使用集群的其他任務產生影響。從本質來看,Spark更不適合與Hadoop堆棧的其他組件共存一處。
總結
Spark是多樣化工作負載處理任務的最佳選擇。Spark批處理能力以更高內存佔用為代價提供了無與倫比的速度優勢。對於重視吞吐率而非延遲的工作負載,則比較適合使用Spark Streaming作為流處理解決方案。
Apache Flink
Apache Flink是一種可以處理批處理任務的流處理框架。該技術可將批處理數據視作具備有限邊界的數據流,藉此將批處理任務作為流處理的子集加以處理。為所有處理任務採取流處理為先的方法會產生一系列有趣的副作用。
這種流處理為先的方法也叫做Kappa架構,與之相對的是更加被廣為人知的Lambda架構(該架構中使用批處理作為主要處理方法,使用流作為補充並提供早期未經提煉的結果)。Kappa架構中會對一切進行流處理,藉此對模型進行簡化,而這一切是在最近流處理引擎逐漸成熟後才可行的。
流處理模型
Flink的流處理模型在處理傳入數據時會將每一項視作真正的數據流。Flink提供的DataStream API可用於處理無盡的數據流。Flink可配合使用的基本組件包括:
· Stream(流)是指在系統中流轉的,永恆不變的無邊界數據集
· Operator(操作方)是指針對數據流執行操作以產生其他數據流的功能
· Source(源)是指數據流進入系統的入口點
· Sink(槽)是指數據流離開Flink系統後進入到的位置,槽可以是資料庫或到其他系統的連接器
為了在計算過程中遇到問題後能夠恢復,流處理任務會在預定時間點創建快照。為了實現狀態存儲,Flink可配合多種狀態後端系統使用,具體取決於所需實現的復雜度和持久性級別。
此外Flink的流處理能力還可以理解「事件時間」這一概念,這是指事件實際發生的時間,此外該功能還可以處理會話。這意味著可以通過某種有趣的方式確保執行順序和分組。
批處理模型
Flink的批處理模型在很大程度上僅僅是對流處理模型的擴展。此時模型不再從持續流中讀取數據,而是從持久存儲中以流的形式讀取有邊界的數據集。Flink會對這些處理模型使用完全相同的運行時。
Flink可以對批處理工作負載實現一定的優化。例如由於批處理操作可通過持久存儲加以支持,Flink可以不對批處理工作負載創建快照。數據依然可以恢復,但常規處理操作可以執行得更快。
另一個優化是對批處理任務進行分解,這樣即可在需要的時候調用不同階段和組件。藉此Flink可以與集群的其他用戶更好地共存。對任務提前進行分析使得Flink可以查看需要執行的所有操作、數據集的大小,以及下游需要執行的操作步驟,藉此實現進一步的優化。
優勢和局限
Flink目前是處理框架領域一個獨特的技術。雖然Spark也可以執行批處理和流處理,但Spark的流處理採取的微批架構使其無法適用於很多用例。Flink流處理為先的方法可提供低延遲,高吞吐率,近乎逐項處理的能力。
Flink的很多組件是自行管理的。雖然這種做法較為罕見,但出於性能方面的原因,該技術可自行管理內存,無需依賴原生的Java垃圾回收機制。與Spark不同,待處理數據的特徵發生變化後Flink無需手工優化和調整,並且該技術也可以自行處理數據分區和自動緩存等操作。
Flink會通過多種方式對工作進行分許進而優化任務。這種分析在部分程度上類似於SQL查詢規劃器對關系型資料庫所做的優化,可針對特定任務確定最高效的實現方法。該技術還支持多階段並行執行,同時可將受阻任務的數據集合在一起。對於迭代式任務,出於性能方面的考慮,Flink會嘗試在存儲數據的節點上執行相應的計算任務。此外還可進行「增量迭代」,或僅對數據中有改動的部分進行迭代。
在用戶工具方面,Flink提供了基於Web的調度視圖,藉此可輕松管理任務並查看系統狀態。用戶也可以查看已提交任務的優化方案,藉此了解任務最終是如何在集群中實現的。對於分析類任務,Flink提供了類似SQL的查詢,圖形化處理,以及機器學習庫,此外還支持內存計算。
Flink能很好地與其他組件配合使用。如果配合Hadoop 堆棧使用,該技術可以很好地融入整個環境,在任何時候都只佔用必要的資源。該技術可輕松地與YARN、HDFS和Kafka 集成。在兼容包的幫助下,Flink還可以運行為其他處理框架,例如Hadoop和Storm編寫的任務。
目前Flink最大的局限之一在於這依然是一個非常「年幼」的項目。現實環境中該項目的大規模部署尚不如其他處理框架那麼常見,對於Flink在縮放能力方面的局限目前也沒有較為深入的研究。隨著快速開發周期的推進和兼容包等功能的完善,當越來越多的組織開始嘗試時,可能會出現越來越多的Flink部署
總結
Flink提供了低延遲流處理,同時可支持傳統的批處理任務。Flink也許最適合有極高流處理需求,並有少量批處理任務的組織。該技術可兼容原生Storm和Hadoop程序,可在YARN管理的集群上運行,因此可以很方便地進行評估。快速進展的開發工作使其值得被大家關注。
結論
大數據系統可使用多種處理技術。
對於僅需要批處理的工作負載,如果對時間不敏感,比其他解決方案實現成本更低的Hadoop將會是一個好選擇。
對於僅需要流處理的工作負載,Storm可支持更廣泛的語言並實現極低延遲的處理,但默認配置可能產生重復結果並且無法保證順序。Samza與YARN和Kafka緊密集成可提供更大靈活性,更易用的多團隊使用,以及更簡單的復制和狀態管理。
對於混合型工作負載,Spark可提供高速批處理和微批處理模式的流處理。該技術的支持更完善,具備各種集成庫和工具,可實現靈活的集成。Flink提供了真正的流處理並具備批處理能力,通過深度優化可運行針對其他平台編寫的任務,提供低延遲的處理,但實際應用方面還為時過早。
最適合的解決方案主要取決於待處理數據的狀態,對處理所需時間的需求,以及希望得到的結果。具體是使用全功能解決方案或主要側重於某種項目的解決方案,這個問題需要慎重權衡。隨著逐漸成熟並被廣泛接受,在評估任何新出現的創新型解決方案時都需要考慮類似的問題。

㈣ Apache Flink和Apache Spark有什麼異同它們的發展前景分別怎樣

1、Spark在SQL上的優化,尤其是DataFrame到DataSet其實是借鑒的Flink的。Flink最初一開始對SQL支持得就更好。
2、Spark的cache in memory在Flink中是由框架自己判斷的,而不是用戶來指定的,因為Flink對數據的處理不像Spark以RDD為單位,就是一種細粒度的處理,對內存的規劃更好。
3、Flink原來用Java寫確實很難看,現在也在向Spark靠攏,Scala的支持也越來越好。不管怎麼說,二者目前都是在相互吸收。

㈤ 基於flink sql構建實時數據倉庫

根據目前大數據這一塊的發展,已經不局限於離線的分析,挖掘數據潛在的價值,數據的時效性最近幾年變得剛需,實時處理的框架有storm,spark-streaming,flink等。想要做到實時數據這個方案可行,需要考慮以下幾點:1、狀態機制 2、精確一次語義 3、高吞吐量 4、可彈性伸縮的應用 5、容錯機制,剛好這幾點,flink都完美的實現了,並且支持flink sql高級API,減少了開發成本,可用實現快速迭代,易維護等優點。

離線數倉的架構圖:

實時數倉架構圖:

目前是將實時維度表和DM層數據存於hbase當中,實時公共層都存於kafka當中,並且以寫滾動日誌的方式寫入HDFS(主要是用於校驗數據)。其實在這里可以做的工作還有很多,kafka集群,flink集群,hbase集群相互獨立,這對整個實時數據倉庫的穩定性帶來一定的挑戰。

一個數據倉庫想要成體系,成資產,離不開數據域的劃分。所以參考著離線的數據倉庫,想著在實時數倉做出這方面的探索,理論上來講,離線可以實現的,實時也是可以實現的。 並且目前已經取得了成效,目前劃分的數據域跟離線大致相同,有流量域,交易域,營銷域等等。當然這裡面涉及到維表,多事務事實表,累計快照表,周期性快照表的設計,開發,到落地這里就不詳述了。

維度表也是整個實時數據倉庫不可或缺的部分。從目前整個實時數倉的建設來看,維度表有著數據量大,但是變更少的特點,我們試想過構建全平台的實時商品維度表或者是實時會員維度表,但是這類維度表太過於復雜,所以針對這類維度表下面介紹。還有另外一種就是較為簡單的維度表,這類維度可能對應著業務系統單個mysql表,或者只需要幾個表進行簡單ETL就可以產出的表,這類維表是可以做成實時的。以下有幾個實施的關鍵點:

如下是離線數據同步架構圖:

實時數據的接入其實在底層架構是一樣的,就是從kafka那邊開始不一樣,實時用flink的UDTF進行解析,而離線是定時(目前是小時級)用camus拉到HDFS,然後定時load HDFS的數據到hive表裡面去,這樣來實現離線數據的接入。實時數據的接入是用flink解析kafka的數據,然後在次寫入kafka當中去。

由於目前離線數據已經穩定運行了很久,所以實時接入數據的校驗可以對比離線數據,但是離線數據是小時級的hive數據,實時數據存於kafka當中,直接比較不了,所以做了相關處理,將kafka的數據使用flink寫HDFS滾動日誌的形式寫入HDFS,然後建立hive表小時級定時去load HDFS中的文件,以此來獲取實時數據。

完成以上兩點,剩餘還需要考慮一點,都是小時級的任務,這個時間卡點使用什麼欄位呢?首先要確定一點就是離線和實時任務卡點的時間欄位必須是一致的,不然肯定會出問題。目前離線使用camus從kafka將數據拉到HDFS上,小時級任務,使用nginx_ts這個時間欄位來卡點,這個欄位是上報到nginx伺服器上記錄的時間點。而實時的數據接入是使用flink消費kafka的數據,在以滾動日誌的形式寫入HDFS的,然後在建立hive表load HDFS文件獲取數據,雖然這個hive也是天/小時二級分區,但是離線的表是根據nginx_ts來卡點分區,但是實時的hive表是根據任務啟動去load文件的時間點去區分的分區,這是有區別的,直接篩選分區和離線的數據進行對比,會存在部分差異,應當的做法是篩選范圍分區,然後在篩選nginx_ts的區間,這樣在跟離線做對比才是合理的。

目前實時數據接入層的主要時延是在UDTF函數解析上,實時的UDTF函數是根據上報的日誌格式進行開發的,可以完成日誌的解析功能。

解析流程圖如下:

解析速率圖如下:

該圖還不是在峰值數據量的時候截的,目前以800記錄/second為准,大概一個記錄的解析速率為1.25ms。
目前該任務的flink資源配置核心數為1,假設解析速率為1.25ms一條記錄,那麼峰值只能處理800條/second,如果數據接入速率超過該值就需要增加核心數,保證解析速率。

介紹一下目前離線維度表的情況,就拿商品維度表來說,全線記錄數將近一個億,計算邏輯來自40-50個ods層的數據表,計算邏輯相當復雜,如果實時維度表也參考離線維度表來完成的話,那麼開發成本和維護成本非常大,對於技術來講也是很大的一個挑戰,並且目前也沒有需求要求維度屬性百分百准確。所以目前(偽實時維度表)准備在當天24點產出,當天的維度表給第二天實時公共層使用,即T-1的模式。偽實時維度表的計算邏輯參考離線維度表,但是為了保障在24點之前產出,需要簡化一下離線計算邏輯,並且去除一些不常用的欄位,保障偽實時維度表可以較快產出。

實時維度表的計算流程圖:

目前使用flink作為公司主流的實時計算引擎,使用內存作為狀態後端,並且固定30s的間隔做checkpoint,使用HDFS作為checkpoint的存儲組件。並且checkpoint也是作為任務restart以後恢復狀態的重要依據。熟悉flink的人應該曉得,使用內存作為狀態後端,這個內存是JVM的堆內存,畢竟是有限的東西,使用不得當,OOM是常有的事情,下面就介紹一下針對有限的內存,如果完成常規的計算。

㈥ 轉載:阿里巴巴為什麼選擇Apache Flink

本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。

合抱之木,生於毫末

隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。

因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷

目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。

從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。

從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。

Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鍾處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。

Flink在阿里的現狀

基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。

目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。

Flink在阿里巴巴的大規模應用,表現如何?

規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球范圍內也是屈指可數;

狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;

Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;

PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;

Flink的發展之路

接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?

Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。

2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。

Flink核心概念以及基本理念

Flink最區別於其他流計算引擎的,其實就是狀態管理。

什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。

Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分布式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。

Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?

這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。

Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。

下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。

在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。

接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。

結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。

但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分布式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。

阿里巴巴對Flink社區的貢獻

我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分布式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分布式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分布式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。

另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。

經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享兩點:

● 統一 API Stack

● 統一 SQL方案

先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有沖突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。

因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。

除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。

對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。

Flink的未來方向

首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。

第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。

最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。

㈦ 技術選型 - OLAP大數據技術哪家強

Lambda架構的核心理念是「流批一體化」,因為隨著機器性能和數據框架的不斷完善,用戶其實不關心底層是如何運行的,批處理也好,流式處理也罷,能按照統一的模型返回結果就可以了,這就是Lambda架構誕生的原因。現在很多應用,例如Spark和Flink,都支持這種結構,也就是數據進入平台後,可以選擇批處理運行,也可以選擇流式處理運行,但不管怎樣,一致性都是相同的。

Kylin

Kylin的主要特點是預計算,提前計算好各個cube,這樣的優點是查詢快速,秒級延遲;缺點也非常明顯,靈活性不足,無法做一些 探索 式的,關聯性的數據分析。

適合的場景也是比較固定的,場景清晰的地方。

ClickHouse

Clickhouse由俄羅斯yandex公司開發。專為在線數據分析而設計。

Clickhouse最大的特點首先是快 ,為了快採用了列式儲存,列式儲存更好的支持壓縮,壓縮後的數據傳輸量變小,所以更快;同時支持分片,支持分布式執行,支持SQL。

ClickHouse很輕量級,支持數據壓縮和最終數據一致性,其數據量級在PB級別。

另外Clickhouse不是為關聯分析而生,所以多表關聯支持的不太好。

同樣Clickhouse不能修改或者刪除數據,僅能用於批量刪除或修改。沒有完整的事務支持,不支持二級索引等等,缺點也非常明顯。

與Kylin相比ClickHouse更加的靈活,sql支持的更好,但是相比Kylin,ClickHouse不支持大並發,也就是不能很多訪問同時在線。

總之ClickHouse用於在線數據分析,支持功能簡單。CPU 利用率高,速度極快。最好的場景用於行為統計分析。

Hive

Hive這個工具,大家一定很熟悉,大數據倉庫的首選工具。可以將結構化的數據文件映射為一張資料庫表,並提供完整的sql查詢功能。

主要功能是可以將sql語句轉換為相對應的MapRece任務進行運行,這樣可能處理海量的數據批量,

Hive與HDFS結合緊密,在大數據開始初期,提供一種直接使用sql就能訪問HDFS的方案,擺脫了寫MapRece任務的方式,極大的降低了大數據的門檻。

當然Hive的缺點非常明顯,定義的是分鍾級別的查詢延遲,估計都是在比較理想的情況。 但是作為數據倉庫的每日批量工具,的確是一個穩定合格的產品。

Presto

Presto極大的改進了Hive的查詢速度,而且Presto 本身並不存儲數據,但是可以接入多種數據源,並且支持跨數據源的級聯查詢,支持包括復雜查詢、聚合、連接等等。

Presto沒有使用MapRece,它是通過一個定製的查詢和執行引擎來完成的。它的所有的查詢處理是在內存中,這也是它的性能很高的一個主要原因。

Presto由於是基於內存的,缺點可能是多張大表關聯操作時易引起內存溢出錯誤。

另外Presto不支持OLTP的場景,所以不要把Presto當做資料庫來使用。

Presto相比ClickHouse優點主要是多表join效果好。相比ClickHouse的支持功能簡單,場景支持單一,Presto支持復雜的查詢,應用范圍更廣。

Impala

Impala是Cloudera 公司推出,提供對 HDFS、Hbase 數據的高性能、低延遲的互動式 SQL 查詢功能。

Impala 使用 Hive的元數據, 完全在內存中計算。是CDH 平台首選的 PB 級大數據實時查詢分析引擎。

Impala 的缺點也很明顯,首先嚴重依賴Hive,而且穩定性也稍差,元數據需要單獨的mysql/pgsql來存儲,對數據源的支持比較少,很多nosql是不支持的。但是,估計是cloudera的國內市場推廣做的不錯,Impala在國內的市場不錯。

SparkSQL

SparkSQL的前身是Shark,它將 SQL 查詢與 Spark 程序無縫集成,可以將結構化數據作為 Spark 的 RDD 進行查詢。

SparkSQL後續不再受限於Hive,只是兼容Hive。

SparkSQL提供了sql訪問和API訪問的介面。

支持訪問各式各樣的數據源,包括Hive, Avro, Parquet, ORC, JSON, and JDBC。

Drill

Drill好像國內使用的很少,根據定義,Drill是一個低延遲的分布式海量數據互動式查詢引擎,支持多種數據源,包括hadoop,NoSQL存儲等等。

除了支持多種的數據源,Drill跟BI工具集成比較好。

Druid

Druid是專為海量數據集上的做高性能 OLAP而設計的數據存儲和分析系統。

Druid 的架構是 Lambda 架構,分成實時層和批處理層。

Druid的核心設計結合了數據倉庫,時間序列資料庫和搜索系統的思想,以創建一個統一的系統,用於針對各種用例的實時分析。Druid將這三個系統中每個系統的關鍵特徵合並到其接收層,存儲格式,查詢層和核心體系結構中。

目前 Druid 的去重都是非精確的,Druid 適合處理星型模型的數據,不支持關聯操作。也不支持數據的更新。

Druid最大的優點還是支持實時與查詢功能,解約了很多開發工作。

Ku

ku是一套完全獨立的分布式存儲引擎,很多設計概念上借鑒了HBase,但是又跟HBase不同,不需要HDFS,通過raft做數據復制;分片策略支持keyrange和hash等多種。

數據格式在parquet基礎上做了些修改,支持二級索引,更像一個列式存儲,而不是HBase schema-free的kv方式。

ku也是cloudera主導的項目,跟Impala結合比較好,通過impala可以支持update操作。

ku相對於原有parquet和ORC格式主要還是做增量更新的。

Hbase

Hbase使用的很廣,更多的是作為一個KV資料庫來使用,查詢的速度很快。

Hawq

Hawq是一個Hadoop原生大規模並行SQL分析引擎,Hawq採用 MPP 架構,改進了針對 Hadoop 的基於成本的查詢優化器。

除了能高效處理本身的內部數據,還可通過 PXF 訪問 HDFS、Hive、HBase、JSON 等外部數據源。HAWQ全面兼容 SQL 標准,還可用 SQL 完成簡單的數據挖掘和機器學習。無論是功能特性,還是性能表現,HAWQ 都比較適用於構建 Hadoop 分析型數據倉庫應用。

㈧ Flink SQL 知其所以然(五)| 自定義 protobuf format

protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。

issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 見: https://github.com/apache/flink/pull/14376

這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。

如果想在本地直接測試下:

關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。

而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。

預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。

預期 protobuf message 定義如下:

測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:

預期 flink sql:

數據源表 DDL:

數據匯表 DDL:

Transform 執行邏輯:

下面是我在本地跑的結果:

可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。

目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。

這種實現的具體使用方式如下:

其實現有幾個特點:

[圖片上傳失敗...(image-66c35b-1644940704671)]

其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。

如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的

所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。

整體創建 format 方法的調用鏈如下圖。

最終實現如下,涉及到了幾個實現類:

具體流程:

上述實現類的具體關系如下:

介紹完流程,進入具體實現方案細節:

ProtobufFormatFactory 主要創建 format 的邏輯:

resourcesMETA-INF 文件:

主要實現反序列化的邏輯:

可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:

其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。

ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:

源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。

本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。

當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。

㈨ flink 1.10 1.12區別

flink 1.10 1.12區別在於Flink 1.12 支持了 Flink SQL Kafka upsert connector 。

因為在 Flink 1.10 中,當前這類任務開發對於用戶來說,還是不夠友好,需要很多代碼,同時也會造成 Flink SQL 冗長。

Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,這也是我們公司內部業務方對實時平台提出的需求。

收益:便利用戶有這種需要從 kafka 取最新記錄操作的實時任務開發,比如這種 binlog -> kafka,然後用戶聚合操作,這種場景還是非常多的,這能提升實時作業開發效率,同時 1.12 做了優化,性能會比單純的 last_value 性能要好。

Flink Yarn 作業 On k8s 的生產級別能力是:

Flink Jar 作業已經全部 K8s 化,Flink SQL 作業由於是推廣初期,還是在 Yarn 上面進行運行,為了將實時計算 Flink 全部K8s化。

所以我們 Flink SQL 作業也需要遷移到 K8s,目前 Flink 1.12 已經滿足生產級別的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社區的 On k8s 能力。

風險:雖然和社區的人溝通,Flink 1.12 on k8s 沒有什麼問題,但是具體功能還是需要先 POC 驗證一下,同時可能社區 Flink on k8s 的能力。

可能會限制我們這邊一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,這里可能需要改底層源碼來進行快速支持(社區有相關 JIRA 要做)。

㈩ 流批一體不只有Flink,還有實時數據模型

通常來講,數據倉庫的建設,都是以離線作為主要的密報,下游的應用,不論是報表還是介面,所提供的數據也大多是T-1時效性。

但伴隨著業務的變化,當離線做到沒什麼可以繼續做的時候,實時就會被拿出來,作為新一個階段的目標進行攻克。

在流批一體建設之前,這種實時訴求通常會開發成分鍾級的任務,通過近實時的方案來解決業務的問題,但分鍾級會帶來諸如任務過多、資源擠占較大、無法支持復雜邏輯等問題。

因此專門支持實時計算的框架,比如早期的Storm,能夠嘗試從純實時的角度解決業務問題,就被拿出來作為嘗試。然而Storm的局限性也很大,因為那會的任務開發只能通過Java的方式來進行,與Hive所推崇的純SQL方案相比,上手難度大了不少,同時兩套代碼的邏輯幾乎沒有可比性,這種方案也就一直沒有什麼聲音。

盡管實時技術有各種缺陷,但作為一種能夠很容易講清楚價值的項目,同時又非常便於向上匯報的技術方案,實時技術還是被或多或少的做了起來。在大多數的公司里,實時和離線就會有不同的團隊進行維護,或者是同一個團隊,但分成不同的項目來執行。這個階段,優先高效的把業務做起來,哪怕場景再簡單,但能夠證明實時有價值和前景,這個階段的目標就算完成了。

以上的各種方案,難免會帶來三個特別難以解決的問題:

(1)數據的口徑上,實時和離線很容易不統一;
(2)數據模型的規范上,實時和離線也往往是分開建設;
(3)即便是同一種口徑和同一種規范,實時和離線也要分成兩套代碼來維護。

這三個問題短時間內會被高速發展掩蓋掉,但當業務對實時的訴求越來越多、壓力越來越大的時候,口徑和代碼的不統一,就會越來越成為阻礙敏捷開發的障礙,需要有方案進行解決。

後來Flink出現了,帶來了流批一體的全新方案,這個問題便出現了解決的曙光,這也比較接近我們對於實時計算的理想方案,因為其意義堪比Hive,也成為了各個大廠面試的標配問題。

然而,僅僅學會Flink是不夠的,因為流批一體帶來的並不僅僅是技術方案或者是框架的改變,同樣帶來了數據模型的改變,這就要求我們從數據模型上,而不是技術方案上,來制定我們的實時方案。

那麼我們如何理解「實時數據模型」這件事情呢?

通常而言,我們關心的內容,包括如下幾個方面:

(1)實時數據源與離線數據源存在差異,導致相同的欄位,取值或者類型會存在不相等的情況;
(2)實時和離線由於底層執行機制的不同,通常需要維護兩套代碼,會帶來諸如口徑不統一、質量檢測難的問題;
(3)產品邏輯變化較快時,離線模型修改相對容易,但實時模型需要考慮壓測、削峰、重啟等技術問題,維護成本非常高昂。

數據倉庫之所以能夠普及並被業務接受,正是因為其模型能夠屏蔽掉底層差異的問題,並且有相對可靠的數據質量監控方法,並且變更成本非常低。而實時數倉如果想要替代掉離線數倉,以上的問題通常是需要一些模型設計甚至是平台工具的來解決,這些問題解決的重要性,並不比Flink弱。

我們先從比較可控的模型層面說起。

在離線的概念里,數倉模型設計成了DWD/DWS/ADS三個層級,原本的概念是DWD面向事實表的構建,DWS面向公共指標的統一,ADS負責靈活的口徑變化問題。

在離線的概念里,DWD/DWS/ADS三個層級需要保留,但負責的目標會有一些變化,同時還需要增加存儲統一層,也就是以TiDB/Holo為代表的資料庫,來承擔服務分析一體化的訴求。

讓我們先看DWD層,DWD承擔了屏蔽實時離線鏈路差異的問題,最重要的作用是保證表結構的統一及欄位內容的對齊。DWD最重要的意義,是保證離線表和實時表,其表結構和欄位概念是相同的。

為什麼這么強調?試想一下,在離線場景下,我們可以在DWD上靈活的增加各種統計標簽,或者是將維度退化到事實表,都是一些left join或者是服務端直接打標可以解決的事情。但在實時場景下,這會變成多流join或者是緩存等更復雜的技術場景,導致這些信息並不能有效的記錄到DWD,因此DWD的設計就要產生一些變化,有一些內容在實時場景下無法准確記錄,這一類信息需要標識到對應的欄位描述上,下游使用時才不會出錯。

同時,實時和離線存儲數據的介質,也必然有一些區別。例如離線可以存在HDFS上,實時則可能視情況保存在資料庫、HDFS甚至是內存中,這時候對於欄位格式、讀取方式都會有差異,設計表時其約束條件也會更多。

因而,DWD更多承擔了邏輯統一的職責,依舊以事實表為基礎,但約束條款要比離線更多。

再看一下DWS層,離線上DWS是負責口徑統一的重要一環,將通用的維度和口徑計算方法抽象出現,以提供跨數據域的靈活使用。但在實時場景下,這一類的維護收益通常都比較低,不僅因為實時只看當天的數據,也是因為實時本身的維度難度就較大,多一層模型其收益會急速下降,因而大多數時候會忽略掉DWS的建設,ADS直接引用DWD進行統計。

然而,DWS畢竟存儲的內容要比DWD少很多,因此如果計算資源瓶頸非常明顯,或者是業務場景不需要分析實時明細數據的情況下,或者是DWD的下游引用過多時,DWS可以承擔削峰的重任,通過減少數據量以應對大促等場景,還是有一定意義的。

接下來就是最重要的ADS層,在這一層上,邏輯統一、口徑統一、大促削峰在前置模型上都得到了一定程度的解決,ADS則像離線一樣承擔了應對需求變化的重任。

但ADS所面臨的情況和離線還是有所不同的,因為ADS的任務啟動,不僅要啟動一個離線的跑批任務,還要同時啟動一個實時的流式任務,而ADS往往會同時統計離線+實時的結果,以應對同比、環比等場景。

這時候很多具體Case要具體分析了,因為特定場景的坑會非常多。例如最常見的「同比」,要對比今年和去年的結果變化,離線往往會統計分小時的結果,但實時會累計起始時刻到當期時刻的結果,因而當一個小時沒有結束的時候,這個同比的波動變化會非常大,給人一種「數據是錯誤的」印象,新手很容易踩這個坑,從而被業務質疑。

因此,針對累計統計指標,從代碼設計上就要考慮到這種情況,都根據時間欄位統計起始到當前時刻的結果的,在代碼邏輯上會要求一些統計技巧。

很多時候,因為業務指標變化太快,改實時代碼是來不及的,這時候一部分的工作量甚至需要報表工具的數據集來解決,改動查詢sql,要比改動任務來的快捷多了。但這部分的能力,其實是依賴於存儲工具的,個人認為可以分到存儲統一層來解決。

最後是存儲統一層,因為一些特殊的場景,比如實時分析明細數據,或者是不確定時間周期的多天統計結果,如果依賴Flink SQL來解決是有些不現實的,因而這部分的壓力需要資料庫來承擔。

簡單講,就是將明細做輕度的匯總後,直接寫到資料庫,實時更新,下游自定義條件,並直接讀庫統計結果。這種場景既要求資料庫有OLAP的計算能力,也要有OLTP的穩定特點,因而TiDB和Holo這一類HTAP的引擎就變得非常重要。

因為多了實時的部分,因此過去面向離線的開發工具,也需要有一些特定的改造,以適應實時的開發和運維訴求。

對於開發工具而言,其目標集中在四個場景上:元數據定義與獲取、數據建模、開發與測試、運維與監控。

其次講數據建模,因為建模的理論已經穩定了有些年頭了,絕大多數場景下都是按照既定的方案來執行。過去離線當道時,規范執行的弱一些不是什麼大問題,但流批一體當道的年代,規范是需要強約束的,這就對了開發工具提出了一定的要求,是否能夠從平台層面上對規范進行內置,並以此來約束開發的同學,降低不規范模型對後期維護帶來的壓力。

這種建模能力的代表有兩種,一種是規范表的命名,填寫相應的分層+主題域+數據域+統計刷新方式,從源頭上規范表的目標和作用;一種是規范指標的定義和使用,例如原子指標還是派生指標,統計周期多少,業務限定用語如何規范,統計粒度怎麼填寫。

在實際開發中,通過工具的限制,如果規范可以做的好,代碼是可以自動生成出來的。當然,以上的功能,都屬於通過犧牲開發效率,來提升數據質量的范疇,使用時需要根據團隊的情況來限定。

再次是開發和測試,這是平台提供的最重要的能力。在開發層面,就是代碼的預編譯能力+發布功能。預編譯不僅要檢查代碼的邏輯是否正確,同時對於代碼中依賴的其他數據源,獲取到的元數據信息是否准確,至少欄位的命名不會有大的問題。當代碼預編譯通過,發布上線後,還需要檢測當前是否有資源支持任務啟動,並且上游的消息隊列是否是啟動的狀態。

實時的測試一直都是比較大的問題,它不像離線可以啟動一個SQL任務看看結果,實時在每個階段的輸入和輸出,是需要通過平台支持的日誌列印功能來進行輔助的。很多時候我們會新建一個測試專用的topic來測試結果,但對於流量較大的線上任務而言,這種方式無法像離線區分Dev環境一樣,能夠對資源進行隔離,因而如果能夠支持圈定數據的輸入和列印輸出,對於測試的效率而言無疑是最佳的。

最後要提到的是運維與監控能力。運維能力是指根據輸入的RPS,或者是cu使用情況,或者是任務的整體延遲,提供相應的參數調優能力,通過參數來調整任務的執行情況。並且能夠根據以上指標的變化,自定義相應的閾值,提供相應的告警能力,通過簡訊或者是消息工具的方式觸達任務維護者。

實時與離線有一些不同的是,離線可以通過增加一個監控節點的方式,通過group by判斷數據是否重復,而實時任務則非常依賴Flink自身的一致性能力,因而發現和解決問題的成本更高。

其實做到運維這個環節,對人的要求其實是更高的。因為流批一體在運維上會帶來一個好處,即實時任務和離線任務能夠錯峰執行,實時在白天壓力大,而離線在晚上壓力大。但同樣的,這種方式對於維護者而言更加痛苦,因為不僅晚上要熬夜值班,白天同樣不能休息,在大促期間甚至需要輪班來維護任務,可以說是「匯報一時爽,痛苦長相伴」。

從遠處來看,流任務和批任務,在自身的機制上就存在非常大的差異,批程序面上的是特定時間內相對靜態的數據,而流程序處理的則是change-log,雖然有可能數據在表結構層面,通過數據模型的設計來保持一致,但是在語義層面,其根本還是不一樣的。這一點可能是最制約批流一體發展的問題,也是最難實現統一或者永遠也不可能統一的。

綜上,對於實時模型,開發工具需要將監控實時部分的能力進行補全,就像DWD層需要分別維護實時和離線兩套架構一樣,開發工具也需要分別維護兩套架構的結果,因而現階段的實時開發,還做不到降低維護和開發的成本,只能減輕其中部分環節的工作量。

以上講了很長時間的實時模型,但從實際的效果上看,業務並不會感知到多麼明顯的技術變化,相反會有一種「面子工程」的感覺在裡面。

當然,我並不否認實時的價值,在「搜廣推」這三個技術佔主導的領域內,作用還是很大的。但實時畢竟要比離線的內容,更加的難以理解,出現問題的排查成本也更高。這種復雜性使得我們在應對變化時,往往做不出有效的應對,就會變得特別被動。

因而,說一句事後的話,就是「實時的價值取決於業務方,而不是技術方」。只有業務對實時痛點強烈的場景下,我們做如此復雜的研究和應對,才能體現出自己的價值,更多的時候,是在「王婆賣瓜,自賣自誇」。有這種投入,還不如多招幾個分析師更靠譜和實在。

本人之前的文章《天下數據,唯快不破》,重點強調了一個「快」字。但「天下熙熙皆為利來,天下攘攘皆為利往」,這個快更多的是在講應對「變化」的快,而不是「技術」自己的快。

所以,為了以後的職業發展,我們要跟進實時技術的變化,但從自身的工作角度出發,如何應對業務的變化,才是自己要關心的課題。