當前位置:首頁 » 服務存儲 » flink狀態存儲多副本
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flink狀態存儲多副本

發布時間: 2022-10-11 13:56:29

① Flink狀態管理和恢復機制

1、什麼是狀態?
2、Flink狀態類型有哪幾種?
3、狀態有什麼作用?
4、如何使用狀態,實現什麼樣的API?
5、什麼是checkpoint與savepoint?
6、如何使用checkpoint與savepoint?
7、checkpoint原理是什麼?
8、checkpint存儲到hdfs上又是什麼意思?

<1> 增量計算
聚合操作、機器學習訓練模型迭代運算時保存當前模型等等
<2> 容錯
Job故障重啟、升級

定義: 某task或者operator 在某一時刻的在內存中的狀態。
而checkpoint是,對於這個中間結果進行一次快照。
作用:State是可以被記錄的,在失敗的情況下可以恢復。
checkpoint則表示了一個Flink Job,在一個特定時刻的一份全局狀態快照,即包含了一個job下所有task/operator某時刻的狀態。

比如任務掛掉的時候或被手動停止的時候,可以從掛掉的點重新繼續消費。
基本類型:Operator state、Keyed state
特殊的 Broadcast State
適用場景:
增量計算:
<1>聚合操作
<2>機器學習訓練模型迭代運算時保存當前模型
等等
容錯:
Job故障重啟

使用狀態,必須使用RichFunction,因為狀態是使用RuntimeContext訪問的,只能在RichFunction中訪問

假設現在存在輸入源數據格式為(EventID,Value)
輸出數據,直接flatMap即可,無狀態。
如果要輸出某EventID最大值/最小值等,HashMap是否可以?
程序一旦Crash,如何恢復?

答案:Flink提供了一套狀態保存的方法,不需要藉助第三方存儲系統來解決狀態存儲問題。

Operator State跟一個特定operator的一個並發實例綁定,整個operator只對應一個state。相比較而言,在一個operator上,可能有很多個key,從而對應多個keyed state。
所以一個並行度為4的source,即有4個實例,那麼就會有4個狀態

舉例:Flink中的Kafka Connector,就使用了operator state。有幾個並行度,就會有幾個connector實例,消費的分區不一樣,它會在每個connector實例中,保存該實例中消費topic的所有(partition,offset)映射。

數據結構:ListState<T>

一般編碼過程:實現CheckpointedFunction介面,必須實現兩個函數,分別是:
initializeState和snapshotState

如何保存狀態?
通常是定義一個private transient ListState<Long> checkPointList;

注意:使用Operator State最好不要在keyBy之後使用,另外不要將太大的state存放到這個裡面。

是基於KeyStream之上的狀態,keyBy之後的Operator State。
那麼,一個並行度為3的keyed Opreator有幾個狀態,這個就不一定是3了,這里有幾個狀態是由keyby之後有幾個key所決定的。

案例:有一個事件流Tuple2[eventId,val],求不同的事件eventId下,相鄰3個val的平均值,事件流如下:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
那麼事件1:8/3=2
那麼事件2:14/3=4

Keyed State的數據結構類型有:
ValueState<T>:update(T)
ListState<T>:add(T)、get(T)和clear(T)
RecingState<T>:add(T)、receFunction()
MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)

FlatMapFunction是無狀態函數;RichFlatMapFunction是有狀態函數

這里沒有實現CheckpointedFunction介面,而是直接調用方法 getRuntimeContext(),然後使用getState方法來獲取狀態值。

特殊場景: 來自一個流的一些數據需要廣播到所有下游任務,在這些任務中,這些數據被本地存儲並且用於處理另一個流上的所有處理元素 。例如:一個低吞吐量流,其中包含一組規則,我們希望對來自另一個流的所有元素按照規則進行計算

典型應用:常規事件流.connect(規則流)
常規事件流.connect(配置流)

<1> 創建常規事件流DataStream或者KeyedDataStream
<2> 創建BroadcastedStream:創建規則流/配置流(低吞吐)並廣播
<3> 連接兩個Stream並實現計算處理
process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )

BroadcastProcessFunction:

processElement(...):負責處理非廣播流中的傳入元素

processBroadcastElement(...):負責處理廣播流中的傳入元素(如規則),一般廣播流的元素添加到狀態里去備用,processElement處理業務數據時就可以使用

ReadOnlyContext和Context:
ReadOnlyContext對Broadcast State只有隻讀許可權,Conetxt有寫許可權

KeyedBroadcastProcessFunction:

注意:
<1> Flink之間沒有跨Task的通信
<2> 每個任務的廣播狀態的元素順序有可能不一樣
<3> Broadcast State保存在內存中(並不在RocksDB)

② 轉載:阿里巴巴為什麼選擇Apache Flink

本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。

合抱之木,生於毫末

隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。

因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷

目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。

從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。

從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。

Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鍾處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。

Flink在阿里的現狀

基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。

目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。

Flink在阿里巴巴的大規模應用,表現如何?

規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球范圍內也是屈指可數;

狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;

Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;

PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;

Flink的發展之路

接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?

Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。

2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。

Flink核心概念以及基本理念

Flink最區別於其他流計算引擎的,其實就是狀態管理。

什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。

Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分布式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。

Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?

這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。

Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。

下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。

在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。

接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。

結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。

但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分布式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。

阿里巴巴對Flink社區的貢獻

我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分布式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分布式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分布式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。

另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。

經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享兩點:

● 統一 API Stack

● 統一 SQL方案

先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有沖突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。

因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。

除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。

對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。

Flink的未來方向

首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。

第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。

最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。

③ Flink 有狀態的流的工作(Working with state)

有狀態的函數和操作在處理各個元素或者事件時存儲數據,使得state稱為任何類型的復雜操作的關鍵構建部件,例如:
當一個應用程序搜索某些特定的事件模式時,狀態會保存截止到目前為止遇到過的事件的順序;
當每分鍾聚合事件時,狀態會保存掛起的聚合
當通過數據點來訓練機器學習模型時,狀態會保存當前版本的模型參數

為了使state容錯,Flink需要識別state並 checkpoint 它, 在許多情況下,Flink還管理著應用程序的狀態,這意味著Flink處理內存管理(如果需要,可能會將內存中的數據溢出到磁碟)來保存非常大的state。

這篇文檔介紹了在開發應用程序時如何使用Flink的state 抽象概念。

在Flink中有兩個基本的state:Keyed state和 Operator state

Keyed State 總是與key相關,並且只能應用於 KeyedStream 的函數和操作中。

你可以認為 Keyed State 是一個已經分區或者劃分的,每個state分區對應一個key的 Operator State , 每個 keyed-state 邏輯上與一個<並行操作實例, 鍵>( <parallel-operator-instance, key> )綁定在一起,由於每個key屬於唯一一個鍵控運算元( keyed operator )的並行實例,我們可以簡單地看作是 <operator, key> 。

Keyed State 可以進一步的組成 Key Group , Key Group 是Flink重新分配 Keyed State 的最小單元,這里有跟定義的最大並行數一樣多的 Key Group ,在運行時 keyed operator 的並行實例與key一起為一個或者多個 Key Group 工作。

使用 Operator State (或者非鍵控的state)的話,每個運算元狀態綁定到一個並行運算元實例中。 Kafka Connector 就是在Flink中使用 Operator State 的一個很好的例子,每個 Kafka consumer 的並行實例保存著一個 topic 分區和偏移量的map作為它的 Operator State 。
當並行數發生變化時, Operator State 介面支持在並行操作實例中進行重新分配,這里有多種方法來進行重分配。

Keyed State和 Operator State存在兩種形式:託管的和原生的。

託管的State( Managed State )由Flink運行時控制的數據結構表示, 例如內部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink運行時會對State編碼並將它們寫入checkpoint中。

原生State( Raw State )是運算元保存它們自己的數據結構的state,當checkpoint時,它們僅僅寫一串byte數組到checkpoint中。Flink並不知道State的數據結構,僅能看到原生的byte數組。

所有的數據流函數都可以使用託管state,但是原生state介面只能在實現operator時才能使用。使用託管State(而不是原生state)被推薦使用是因為使用託管state,當並行度發生變化時,Flink可以自動地重新分配state,同時還能更好地管理內存。

託管的鍵控state介面可以訪問所有當前輸入元素的key范圍內的不同類型的state,這也就意味著這種類型的state只能被通過stream.keyBy(...)創建的KeyedStream使用。
現在我們首先來看一下可用的不同類型的state,然後在看它們是如何在程序中使用的,可用State的原形如下:
ValueState<T>:這里保存了一個可以更新和檢索的值(由上述輸入元素的key所限定,所以一個操作的每個key可能有一個值)。這個值可以使用 update(T) 來更新,使用 T value() 來獲取。

ListState<T>:這個保存了一個元素列表,你可以追加元素以及獲取一個囊括當前所保存的所有元素的 Iterable ,元素可以通過調用 add(T) 來添加,而 Iterable 可以調用 Iterable<T> get() 來獲取。

RecingState<T>:這個保存了表示添加到state的所值的聚合的當個值,這個介面與ListState類似,只是調用add(T)添加的元素使用指定的ReceFunction聚合成一個聚合值。

FoldingState<T, ACC>:這將保存表示添加到狀態的所有值的聚合的單個值,與RecingState相反,聚合的數據類型可能跟添加到State的元素的數據類型不同,這個介面與ListState類似,只是調用add(T)添加的元素使用指定的FoldFunction折疊成一個聚合值。

MapState<T>:這個保存了一個映射列表,你可以添加key-value對到state中並且檢索一個包含所有當前保存的映射的Iterable。映射可以使用 put(UK, UV) 或者 putAll(Map<UK, UV>) 來添加。與key相關的value,可以使用 get(UK) 來獲取,映射的迭代、keys及values可以分別調用 entries() , keys() 和 values() 來獲取。

所有類型的state都有一個 clear() 方法來清除當前活動的key(及輸入元素的key)的State。

注意: FoldingState會在下一個Flink版本中啟用,並在將來的版本中徹底刪除,將提供更加一般的替代方案。

值得注意的是這些State對象僅用於與State進行介面,State並不只保存在內存中,也可能會在磁碟中或者其他地方,第二個需要注意的是從State中獲取的值依賴於輸入元素的key,因此如果涉及的key不同,那麼在一次調用用戶函數中獲得的值可能與另一次調用的值不同。

為了獲得一個State句柄,你需要創建一個 StateDescriptor ,這個 StateDescriptor 保存了state的名稱(接下來我們會講到,你可以創建若干個state,但是它們必須有唯一的值以便你能夠引用它們),State保存的值的類型以及用戶自定義函數如:一個 ReceFunction 。根據你想要檢索的state的類型,你可以創建一個 ValueStateDescriptor , 一個 ListStateDescriptor , 一個 RecingStateDescriptor , 一個 FoldingStateDescriptor 或者一個 MapStateDescriptor

State可以通過 RuntimeContext 來訪問,所以只能在富函數中使用。 RichFunction 中的 RuntimeContext 有以下這些方法來訪問state:
ValueState<T> getState(ValueStateDescriptor<T>)
RecingState<T> getRecingState(ReceingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)

這個 FlatMapFunction 例子展示了所有部件如何組合在一起:

這個例子實現了一個簡單的計數器,我們使用元組的第一個欄位來進行分組(這個例子中,所有的key都是1),這個函數將計數和運行時總和保存在一個ValueState中,一旦計數大於2,就會發出平均值並清理state,因此我們又從0開始。請注意,如果我們在第一個欄位中具有不同值的元組,則這將為每個不同的輸入鍵保持不同的state值。

除了上述介面之外,Scala API還具有快捷方式在KeyedStream上通過有狀態的 map() 或 flatMap() 函數獲取當個ValueState, 用戶定義的Function以一個Option形式來獲取ValueState的當前值,並且必須返回一個更新的值來更新State。

為了使用託管的運算元State,有狀態的函數可以實現更加通用的CheckpointedFunction介面或者ListCheckpoint<T extends Serializable>介面

CheckpointedFunction介面可以通過不同的重分區模式來訪問非鍵控的state,它需要實現兩個方法:

無論何時執行checkpoint, snapshotState() 都會被調用,相應地,每次初始化用戶定義的函數時,都會調用對應的 initializeState() ,當函數首次初始化時,或者當該函數實際上是從較早的檢查點進行恢復時調用的。鑒於此, initializeState() 不僅是不同類型的狀態被初始化的地方,而且還是state恢復邏輯的地方。

目前列表式託管運算元狀態是支持的,State要求是一個可序列化的彼此獨立的列表,因此可以在重新調整後重新分配,換句話說,這些對象是可重新分配的非鍵控state的最小粒度。根據狀態的訪問方法,定義了一下重分配方案:
Even-split redistribution :每個運算元返回一個State元素列表,
Union redistribution :每個運算元返回一個State元素列表,

④ Flink 狀態編程

1.流式計算分為無狀態和有狀態兩種情況。 無狀態的計算觀察每個獨立事件,並根據最後一個事件輸出結果。例如,流處理應用程序從感測器接收水位數據,並在水位超過指定高度時發出警告。有狀態的計算則會基於多個事件輸出結果。以下是一些例子。

(1)所有類型的窗口。例如,計算過去一小時的平均水位,就是有狀態的計算。

(2)所有用於復雜事件處理的狀態機。例如,若在一分鍾內收到兩個相差20cm以上的水位差讀數,則發出警告,這是有狀態的計算。

(3)流與流之間的所有關聯操作,以及流與靜態表或動態表之間的關聯操作,都是有狀態的計算。

2.下圖展示了無狀態流處理和有狀態流處理的主要區別。 無狀態流處理分別接收每條數據記錄(圖中的黑條),然後根據最新輸入的數據生成輸出數據(白條)。有狀態流處理會維護狀態(根據每條輸入記錄進行更新),並基於最新輸入的記錄和當前的狀態值生成輸出記錄(灰條)。

上圖中輸入數據由黑條表示。無狀態流處理每次只轉換一條輸入記錄,並且僅根據最新的輸入記錄輸出結果(白條)。有狀態 流處理維護所有已處理記錄的狀態值,並根據每條新輸入的記錄更新狀態,因此輸出記錄(灰條)反映的是綜合考慮多個事件之後的結果。

3.有狀態的運算元和應用程序

Flink內置的很多運算元,數據源source,數據存儲sink都是有狀態的,流中的數據都是buffer records,會保存一定的元素或者元數據。例如: ProcessWindowFunction會緩存輸入流的數據,ProcessFunction會保存設置的定時器信息等等。

在Flink中,狀態始終與特定運算元相關聯。總的來說,有兩種類型的狀態:

運算元狀態(operator state)

鍵控狀態(keyed state)

4.運算元狀態(operator state)

運算元狀態的作用范圍限定為運算元任務。這意味著由同一並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。運算元狀態不能由相同或不同運算元的另一個任務訪問。

Flink為運算元狀態提供三種基本數據結構:

列表狀態(List state):將狀態表示為一組數據的列表。

聯合列表狀態(Union list state):也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。

廣播狀態(Broadcast state):如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態

5.鍵控狀態(keyed state)

鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的。Flink為每個鍵值維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個運算元任務中,這個任務會維護和處理這個key對應的狀態。當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。因此,具有相同key的所有數據都會訪問相同的狀態。Keyed State很類似於一個分布式的key-value map數據結構,只能用於KeyedStream(keyBy運算元處理之後)。

6.狀態後端(state backend)

每傳入一條數據,有狀態的運算元任務都會讀取和更新狀態。由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問。狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態後端(state backend)

狀態後端主要負責兩件事:

1)本地的狀態管理

2)將檢查點(checkpoint)狀態寫入遠程存儲

狀態後端分類:

(1)MemoryStateBackend

內存級的狀態後端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在TaskManager的JVM堆上;而將checkpoint存儲在JobManager的內存中。

(2)FsStateBackend

將checkpoint存到遠程的持久化文件系統(FileSystem)上。而對於本地狀態,跟MemoryStateBackend一樣,也會存在TaskManager的JVM堆上。

(3)RocksDBStateBackend

將所有狀態序列化後,存入本地的RocksDB中存儲。

7.狀態一致性

當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之後,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數?

1)一致性級別

在流處理中,一致性可以分為3個級別:

(1) at-most-once : 這其實是沒有正確性保障的委婉說法——故障發生之後,計數結果可能丟失。同樣的還有udp。

(2) at-least-once : 這表示計數結果可能大於正確值,但絕不會小於正確值。也就是說,計數程序在發生故障後可能多算,但是絕不會少算。

(3) exactly-once : 這指的是系統保證在發生故障後得到的計數結果與正確值一致。

曾經,at-least-once非常流行。第一代流處理器(如Storm和Samza)剛問世時只保證at-least-once,原因有二。

(1)保證exactly-once的系統實現起來更復雜。這在基礎架構層(決定什麼代表正確,以及exactly-once的范圍是什麼)和實現層都很有挑戰性。

(2)流處理系統的早期用戶願意接受框架的局限性,並在應用層想辦法彌補(例如使應用程序具有冪等性,或者用批量計算層再做一遍計算)。

最先保證exactly-once的系統(Storm Trident和Spark Streaming)在性能和表現力這兩個方面付出了很大的代價。為了保證exactly-once,這些系統無法單獨地對每條記錄運用應用邏輯,而是同時處理多條(一批)記錄,保證對每一批的處理要麼全部成功,要麼全部失敗。這就導致在得到結果前,必須等待一批記錄處理結束。因此,用戶經常不得不使用兩個流處理框架(一個用來保證exactly-once,另一個用來對每個元素做低延遲處理),結果使基礎設施更加復雜。曾經,用戶不得不在保證exactly-once與獲得低延遲和效率之間權衡利弊。Flink避免了這種權衡。

Flink的一個重大價值在於, 它既保證了 exactly-once ,也具有低延遲和高吞吐的處理能力 。

從根本上說,Flink通過使自身滿足所有需求來避免權衡,它是業界的一次意義重大的技術飛躍。盡管這在外行看來很神奇,但是一旦了解,就會恍然大悟。

2)端到端(end-to-end)狀態一致性

目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在 Flink 流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如 Kafka)和輸出到持久化系統。

端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決於所有組件中一致性最弱的組件。具體可以劃分如下:

1)source端 —— 需要外部源可重設數據的讀取位置

2)link內部 —— 依賴checkpoint

3)sink端 —— 需要保證從故障恢復時,數據不會重復寫入外部系統

而對於sink端,又有兩種具體的實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入。

4)冪等寫入

所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,後面再重復執行就不起作用了。

5)事務寫入

需要構建事務來寫入外部系統,構建的事務對應著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。

對於事務性寫入,具體又有兩種實現方式:預寫日誌(WAL)和兩階段提交(2PC)。

8.檢查點(checkpoint)

Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性,在出現故障時將系統重置回正確狀態。下面通過簡單的類比來解釋檢查點的作用。

9.Flink+Kafka如何實現端到端的exactly-once語義

我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?

1)內部 —— 利用checkpoint機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性

2)source —— kafka consumer作為source,可以將偏移量保存下來,如果後續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性

3)sink —— kafka procer作為sink,採用兩階段提交 sink,需要實現一個TwoPhaseCommitSinkFunction

內部的checkpoint機制我們已經有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析。

我們知道Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。

當checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在運算元間傳遞下去。

每個運算元會對當前的狀態做個快照,保存到狀態後端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。

每個內部的transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。

sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier 時,把狀態保存到狀態後端,並開啟新的預提交事務。

當所有運算元任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。

當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為「已確認」,數據就真正可以被消費了。

所以我們看到,執行過程實際上是一個兩段式提交,每個運算元執行完成,會進行「預提交」,直到執行完sink操作,會發起「確認提交」,如果執行失敗,預提交會放棄掉。

具體的兩階段提交步驟總結如下:

1)第一條數據來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日誌但標記為未提交,這就是「預提交」

2)觸發 checkpoint 操作,barrier從 source 開始向下傳遞,遇到 barrier 的運算元將狀態存入狀態後端,並通知jobmanager

3)sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據

4)jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成

5)sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據

6)外部kafka關閉事務,提交的數據可以正常消費了。

⑤ Flink的類載入器解析

在運行 Flink 應用程序時,JVM 會隨著時間的推移載入各種類。 這些類可以根據它們的來源分為三組:

作為一般規則,無論何時您先啟動 Flink 進程然後再提交作業,作業的類都會動態載入。 如果 Flink 進程與作業/應用程序一起啟動,或者如果應用程序產生 Flink 組件(JobManager、TaskManager 等),那麼所有作業的類都在 Java 類路徑中。

插件組件中的代碼由每個插件的專用類載入器動態載入一次。

以下是有關不同部署模式的更多詳細信息:

當作為獨立會話啟動 Flink 集群時,JobManagers 和 TaskManagers 使用 Java 類路徑中的 Flink 框架類啟動。 針對會話(通過 REST / CLI)提交的所有作業/應用程序中的類都是動態載入的。

Docker / Kubernetes 設置首先啟動一組 JobManagers / TaskManagers,然後通過 REST 或 CLI 提交作業/應用程序,其行為類似於獨立會話:Flink 的代碼位於 Java 類路徑中,插件組件和作業代碼在啟動時動態載入。

YARN 類載入在單個作業部署和會話之間有所不同:

當直接向 YARN 提交 Flink 作業/應用程序時(通過 bin/flink run -m yarn-cluster ...),將為該作業啟動專用的 TaskManager 和 JobManager。 這些 JVM 在 Java 類路徑中具有用戶代碼類。 這意味著在這種情況下,作業不涉及動態類載入。

當啟動一個 YARN 會話時,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架類啟動的。 針對會話提交的所有作業的類都是動態載入的。

在涉及動態類載入的設置中(插件組件、會話設置中的 Flink 作業),通常有兩個類載入器的層次結構:(1)Java 的應用程序類載入器,它包含類路徑中的所有類,以及(2)動態插件/ 用戶代碼類載入器。 用於從插件或用戶代碼 jar 載入類。 動態 ClassLoader 將應用程序類載入器作為其父級。

默認情況下,Flink 反轉類載入順序,這意味著它首先查看動態類載入器,如果類不是動態載入代碼的一部分,則僅查看父類(應用程序類載入器)。

反向類載入的好處是插件和作業可以使用與 Flink 核心本身不同的庫版本,這在不同版本的庫不兼容時非常有用。 該機制有助於避免常見的依賴沖突錯誤,如 IllegalAccessError 或 NoSuchMethodError。 代碼的不同部分只是具有單獨的類副本(Flink 的核心或其依賴項之一可以使用與用戶代碼或插件代碼不同的副本)。 在大多數情況下,這運行良好,不需要用戶進行額外配置。

但是,在某些情況下,反向類載入會導致問題(請參閱下文,「X cannot be cast to X」)。 對於用戶代碼類載入,您可以通過在 Flink 配置中通過 classloader.resolve-order 將 ClassLoader 解析順序配置為 parent-first(從 Flink 的默認 child-first)來恢復到 Java 的默認模式。

請注意,某些類總是以父級優先的方式解析(首先通過父類載入器),因為它們在 Flink 的核心和插件/用戶代碼或面向插件/用戶代碼的 API 之間共享。 這些類的包是通過 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父級優先載入的新包,請設置 classloader.parent-first-patterns-additional 配置選項。

所有組件(JobManger、TaskManager、Client、ApplicationMaster 等)在啟動時記錄它們的類路徑設置。 它們可以作為日誌開頭的環境信息的一部分找到。

當運行 JobManager 和 TaskManagers 專用於一項特定作業的設置時,可以將用戶代碼 JAR 文件直接放入 /lib 文件夾中,以確保它們是類路徑的一部分而不是動態載入。

通常將作業的 JAR 文件放入 /lib 目錄中。 JAR 將成為類路徑(AppClassLoader)和動態類載入器(FlinkUserCodeClassLoader)的一部分。 因為 AppClassLoader 是 FlinkUserCodeClassLoader 的父級(並且 Java 載入父級,默認情況下),這應該導致類只載入一次。

對於無法將作業的 JAR 文件放入 /lib 文件夾的設置(例如因為安裝程序是由多個作業使用的會話),仍然可以將公共庫放入 /lib 文件夾,並避免動態為那些類進行載入。

在某些情況下,轉換函數、源或接收器需要手動載入類(通過反射動態載入)。 為此,它需要能夠訪問作業類的類載入器。

在這種情況下,函數(或源或接收器)可以成為 RichFunction(例如 RichMapFunction 或 RichWindowFunction)並通過 getRuntimeContext().getUserCodeClassLoader() 訪問用戶代碼類載入器。

在使用動態類載入的設置中,您可能會看到 com.foo.X cannot be cast to com.foo.X 樣式中的異常。 這意味著 com.foo.X 類的多個版本已被不同的類載入器載入,並且該類的類型試圖相互分配。

一個常見的原因是庫與 Flink 的反向類載入方法不兼容。 您可以關閉反向類載入來驗證這一點(在 Flink 配置中設置 classloader.resolve-order: parent-first)或從反向類載入中排除庫(在 Flink 配置中設置 classloader.parent-first-patterns-additional)。

另一個原因可能是緩存對象實例,如 Apache Avro 之類的某些庫或通過注冊(例如通過 Guava 的 Interners)生成的對象實例。 這里的解決方案是要麼在沒有任何動態類載入的情況下進行設置,要麼確保相應的庫完全是動態載入代碼的一部分。 後者意味著該庫不能被添加到 Flink 的 /lib 文件夾中,而必須是應用程序的 fat-jar/uber-jar 的一部分

所有涉及動態用戶代碼類載入(會話)的場景都依賴於再次卸載類。 類卸載意味著垃圾收集器發現類中不存在任何對象,因此刪除該類(代碼、靜態變數、元數據等)。

每當 TaskManager 啟動(或重新啟動)一個任務時,它將載入該特定任務的代碼。 除非可以卸載類,否則這將成為內存泄漏,因為載入了新版本的類,並且載入的類總數會隨著時間的推移而累積。 這通常通過 OutOfMemoryError: Metaspace 表現出來。

類泄漏的常見原因和建議的修復:

卸載動態載入類的一個有用工具是用戶代碼類載入器釋放鉤子。 這些是在卸載類載入器之前執行的鉤子。 通常建議關閉和卸載資源作為常規函數生命周期的一部分(通常是 close() 方法)。 但在某些情況下(例如對於靜態欄位),最好在不再需要類載入器時卸載。

類載入器釋放鉤子可以通過 RuntimeContext.() 方法注冊。

從應用程序開發人員的角度解決依賴沖突的一種方法是通過隱藏它們來避免暴露依賴關系。

Apache Maven 提供了 maven-shade-plugin,它允許在編譯後更改類的包(因此您編寫的代碼不受陰影影響)。 例如,如果您的用戶代碼 jar 中有來自 aws sdk 的 com.amazonaws 包,則 shade 插件會將它們重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代碼調用您的 aws sdk 版本。

注意 Flink 的大部分依賴,比如 guava、netty、jackson 等,都被 Flink 的維護者屏蔽掉了,所以用戶通常不用擔心。

⑥ Flink基礎系列28-Flink容錯機制

在執行流應用程序期間,Flink 會定期保存狀態的一致檢查點

如果發生故障, Flink 將會使用最近的檢查點來一致恢復應用程序的狀態,並重新啟動處理流程

(如圖中所示,7這個數據被source讀到了,准備傳給奇數流時,奇數流宕機了,數據傳輸發生中斷)

遇到故障之後,第一步就是重啟應用

(重啟後,起初流都是空的)

第二步是從 checkpoint 中讀取狀態,將狀態重置

(讀取在遠程倉庫(Storage,這里的倉庫指狀態後端保存數據指定的三種方式之一)保存的狀態)

從檢查點重新啟動應用程序後,其內部狀態與檢查點完成時的狀態完全相同

第三步:開始消費並處理檢查點到發生故障之間的所有數據

這種檢查點的保存和恢復機制可以為應用程序狀態提供「精確一次」(exactly-once)的一致性,因為所有運算元都會保存檢查點並恢復其所有狀態,這樣一來所有的輸入流就都會被重置到檢查點完成時的位置

(這里要求source源也能記錄狀態,回退到讀取數據7的狀態,kafka有相應的偏移指針能完成該操作)

概述
checkpoint和Watermark一樣,都會以廣播的形式告訴所有下游。

具體講解

JobManager 會向每個 source 任務發送一條帶有新檢查點 ID 的消息,通過這種方式來啟動檢查點

(這個帶有新檢查點ID的東西為barrier,由圖中三角型表示,數值2隻是ID)

數據源將它們的狀態寫入檢查點,並發出一個檢查點barrier
狀態後端在狀態存入檢查點之後,會返回通知給source任務,source任務就會向JobManager確認檢查點完成
上圖,在Source端接受到barrier後,將自己此身的3 和 4 的數據的狀態寫入檢查點,且向JobManager發送checkpoint成功的消息,然後向下游分別發出一個檢查點 barrier

可以看出在Source接受barrier時,數據流也在不斷的處理,不會進行中斷

此時的偶數流已經處理完藍2變成了4,但是還沒處理到黃4,只是下游sink發送了一個數據4,而奇數流已經處理完藍3變成了8(黃1+藍1+黃3+藍3),並向下游sink發送了8

此時檢查點barrier都還未到Sum_odd奇數流和Sum_even偶數流

分界線對齊:barrier向下游傳遞,sum任務會等待所有輸入分區的barrier到達
對於barrier已經達到的分區,繼續到達的數據會被緩存
而barrier尚未到達的分區,數據會被正常處理
此時藍色流的barrier先一步抵達了偶數流,黃色的barrier還未到,但是因為數據的不中斷一直處理,此時的先到的藍色的barrier會將此時的偶數流的數據4進行緩存處理,流接著處理接下來的數據等待著黃色的barrier的到來,而黃色barrier之前的數據將會對緩存的數據相加

這次處理的總結:分界線對齊:barrier 向下游傳遞,sum 任務會等待所有輸入分區的 barrier 到達,對於barrier已經到達的分區,繼續到達的數據會被緩存。而barrier尚未到達的分區,數據會被正常處理

當收到所有輸入分區的 barrier 時,任務就將其狀態保存到狀態後端的檢查點中,然後將 barrier 繼續向下游轉發
當藍色的barrier和黃色的barrier(所有分區的)都到達後,進行狀態保存到遠程倉庫,然後對JobManager發送消息,說自己的檢查點保存完畢了

此時的偶數流和奇數流都為8

向下游轉發檢查點 barrier 後,任務繼續正常的數據處理

Sink 任務向 JobManager 確認狀態保存到 checkpoint 完畢
當所有任務都確認已成功將狀態保存到檢查點時,檢查點就真正完成了

CheckPoint為自動保存,SavePoint為手動保存

有狀態的流處理,內部每個運算元任務都可以有自己的狀態

對於流處理器內部來說,所謂的狀態一致性,其實就是我們所說的計算結果要保證准確。

一條數據不應該丟失,也不應該重復計算

在遇到故障時可以恢復狀態,恢復以後的重新計算,結果應該也是完全正確的。

Flink的一個重大價值在於,它既保證了exactly-once,也具有低延遲和高吞吐的處理能力。

Flink使用了一種輕量級快照機制——檢查點(checkpoint)來保證exactly-once語義
有狀態流應用的一致檢查點,其實就是:所有任務的狀態,在某個時間點的一份備份(一份快照)。而這個時間點,應該是所有任務都恰好處理完一個相同的輸入數據的時間。
應用狀態的一致檢查點,是Flink故障恢復機制的核心

端到端(end-to-end)狀態一致性
目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在Flink流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如Kafka)和輸出到持久化系統

端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性

整個端到端的一致性級別取決於所有組件中一致性最弱的組件

端到端 exactly-once

冪等寫入
所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,後面再重復執行就不起作用了。

(中間可能會存在不正確的情況,只能保證最後結果正確。比如5=>10=>15=>5=>10=>15,雖然最後是恢復到了15,但是中間有個恢復的過程,如果這個過程能夠被讀取,就會出問題。)

事務寫入

預寫日誌(Write-Ahead-Log,WAL)
把結果數據先當成狀態保存,然後在收到checkpoint完成的通知時,一次性寫入sink系統
簡單易於實現,由於數據提前在狀態後端中做了緩存,所以無論什麼sink系統,都能用這種方式一批搞定
DataStream API提供了一個模版類:GenericWriteAheadSink,來實現這種事務性sink

兩階段提交(Two-Phase-Commit,2PC)
對於每個checkpoint,sink任務會啟動一個事務,並將接下來所有接收到的數據添加到事務里
然後將這些數據寫入外部sink系統,但不提交它們——這時只是"預提交"
這種方式真正實現了exactly-once,它需要一個提供事務支持的外部sink系統。Flink提供了TwoPhaseCommitSinkFunction介面

不同Source和Sink的一致性保證

內部——利用checkpoint機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性
source——kafka consumer作為source,可以將偏移量保存下來,如果後續任務出現了故障,恢復的時候可以由連接器重製偏移量,重新消費數據,保證一致性
sink——kafka procer作為sink,採用兩階段提交sink,需要實現一個TwoPhaseCommitSinkFunction

Exactly-once 兩階段提交

JobManager 協調各個 TaskManager 進行 checkpoint 存儲
checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存

當 checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流
barrier會在運算元間傳遞下去

每個運算元會對當前的狀態做個快照,保存到狀態後端
checkpoint 機制可以保證內部的狀態一致性

每個內部的 transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里

sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務;遇到 barrier 時,把狀態保存到狀態後端,並開啟新的預提交事務

(barrier之前的數據還是在之前的事務中沒關閉事務,遇到barrier後的數據另外新開啟一個事務)

當所有運算元任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成
sink 任務收到確認通知,正式提交之前的事務,kafka 中未確認數據改為「已確認」

Exactly-once 兩階段提交步驟總結

⑦ 五種大數據處理架構

五種大數據處理架構
大數據是收集、整理、處理大容量數據集,並從中獲得見解所需的非傳統戰略和技術的總稱。雖然處理數據所需的計算能力或存儲容量早已超過一台計算機的上限,但這種計算類型的普遍性、規模,以及價值在最近幾年才經歷了大規模擴展。
本文將介紹大數據系統一個最基本的組件:處理框架。處理框架負責對系統中的數據進行計算,例如處理從非易失存儲中讀取的數據,或處理剛剛攝入到系統中的數據。數據的計算則是指從大量單一數據點中提取信息和見解的過程。
下文將介紹這些框架:
· 僅批處理框架:
Apache Hadoop
· 僅流處理框架:
Apache Storm
Apache Samza
· 混合框架:
Apache Spark
Apache Flink
大數據處理框架是什麼?
處理框架和處理引擎負責對數據系統中的數據進行計算。雖然「引擎」和「框架」之間的區別沒有什麼權威的定義,但大部分時候可以將前者定義為實際負責處理數據操作的組件,後者則可定義為承擔類似作用的一系列組件。
例如Apache Hadoop可以看作一種以MapRece作為默認處理引擎的處理框架。引擎和框架通常可以相互替換或同時使用。例如另一個框架Apache Spark可以納入Hadoop並取代MapRece。組件之間的這種互操作性是大數據系統靈活性如此之高的原因之一。
雖然負責處理生命周期內這一階段數據的系統通常都很復雜,但從廣義層面來看它們的目標是非常一致的:通過對數據執行操作提高理解能力,揭示出數據蘊含的模式,並針對復雜互動獲得見解。
為了簡化這些組件的討論,我們會通過不同處理框架的設計意圖,按照所處理的數據狀態對其進行分類。一些系統可以用批處理方式處理數據,一些系統可以用流方式處理連續不斷流入系統的數據。此外還有一些系統可以同時處理這兩類數據。
在深入介紹不同實現的指標和結論之前,首先需要對不同處理類型的概念進行一個簡單的介紹。
批處理系統
批處理在大數據世界有著悠久的歷史。批處理主要操作大容量靜態數據集,並在計算過程完成後返回結果。
批處理模式中使用的數據集通常符合下列特徵…
· 有界:批處理數據集代表數據的有限集合
· 持久:數據通常始終存儲在某種類型的持久存儲位置中
· 大量:批處理操作通常是處理極為海量數據集的唯一方法
批處理非常適合需要訪問全套記錄才能完成的計算工作。例如在計算總數和平均數時,必須將數據集作為一個整體加以處理,而不能將其視作多條記錄的集合。這些操作要求在計算進行過程中數據維持自己的狀態。
需要處理大量數據的任務通常最適合用批處理操作進行處理。無論直接從持久存儲設備處理數據集,或首先將數據集載入內存,批處理系統在設計過程中就充分考慮了數據的量,可提供充足的處理資源。由於批處理在應對大量持久數據方面的表現極為出色,因此經常被用於對歷史數據進行分析。
大量數據的處理需要付出大量時間,因此批處理不適合對處理時間要求較高的場合。
Apache Hadoop
Apache Hadoop是一種專用於批處理的處理框架。Hadoop是首個在開源社區獲得極大關注的大數據框架。基於谷歌有關海量數據處理所發表的多篇論文與經驗的Hadoop重新實現了相關演算法和組件堆棧,讓大規模批處理技術變得更易用。
新版Hadoop包含多個組件,即多個層,通過配合使用可處理批數據:
· HDFS:HDFS是一種分布式文件系統層,可對集群節點間的存儲和復制進行協調。HDFS確保了無法避免的節點故障發生後數據依然可用,可將其用作數據來源,可用於存儲中間態的處理結果,並可存儲計算的最終結果。
· YARN:YARN是Yet Another Resource Negotiator(另一個資源管理器)的縮寫,可充當Hadoop堆棧的集群協調組件。該組件負責協調並管理底層資源和調度作業的運行。通過充當集群資源的介面,YARN使得用戶能在Hadoop集群中使用比以往的迭代方式運行更多類型的工作負載。
· MapRece:MapRece是Hadoop的原生批處理引擎。
批處理模式
Hadoop的處理功能來自MapRece引擎。MapRece的處理技術符合使用鍵值對的map、shuffle、rece演算法要求。基本處理過程包括:
· 從HDFS文件系統讀取數據集
· 將數據集拆分成小塊並分配給所有可用節點
· 針對每個節點上的數據子集進行計算(計算的中間態結果會重新寫入HDFS)
· 重新分配中間態結果並按照鍵進行分組
· 通過對每個節點計算的結果進行匯總和組合對每個鍵的值進行「Recing」
· 將計算而來的最終結果重新寫入 HDFS
優勢和局限
由於這種方法嚴重依賴持久存儲,每個任務需要多次執行讀取和寫入操作,因此速度相對較慢。但另一方面由於磁碟空間通常是伺服器上最豐富的資源,這意味著MapRece可以處理非常海量的數據集。同時也意味著相比其他類似技術,Hadoop的MapRece通常可以在廉價硬體上運行,因為該技術並不需要將一切都存儲在內存中。MapRece具備極高的縮放潛力,生產環境中曾經出現過包含數萬個節點的應用。
MapRece的學習曲線較為陡峭,雖然Hadoop生態系統的其他周邊技術可以大幅降低這一問題的影響,但通過Hadoop集群快速實現某些應用時依然需要注意這個問題。
圍繞Hadoop已經形成了遼闊的生態系統,Hadoop集群本身也經常被用作其他軟體的組成部件。很多其他處理框架和引擎通過與Hadoop集成也可以使用HDFS和YARN資源管理器。
總結
Apache Hadoop及其MapRece處理引擎提供了一套久經考驗的批處理模型,最適合處理對時間要求不高的非常大規模數據集。通過非常低成本的組件即可搭建完整功能的Hadoop集群,使得這一廉價且高效的處理技術可以靈活應用在很多案例中。與其他框架和引擎的兼容與集成能力使得Hadoop可以成為使用不同技術的多種工作負載處理平台的底層基礎。
流處理系統
流處理系統會對隨時進入系統的數據進行計算。相比批處理模式,這是一種截然不同的處理方式。流處理方式無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作。
· 流處理中的數據集是「無邊界」的,這就產生了幾個重要的影響:
· 完整數據集只能代表截至目前已經進入到系統中的數據總量。
· 工作數據集也許更相關,在特定時間只能代表某個單一數據項。
處理工作是基於事件的,除非明確停止否則沒有「盡頭」。處理結果立刻可用,並會隨著新數據的抵達繼續更新。
流處理系統可以處理幾乎無限量的數據,但同一時間只能處理一條(真正的流處理)或很少量(微批處理,Micro-batch Processing)數據,不同記錄間只維持最少量的狀態。雖然大部分系統提供了用於維持某些狀態的方法,但流處理主要針對副作用更少,更加功能性的處理(Functional processing)進行優化。
功能性操作主要側重於狀態或副作用有限的離散步驟。針對同一個數據執行同一個操作會或略其他因素產生相同的結果,此類處理非常適合流處理,因為不同項的狀態通常是某些困難、限制,以及某些情況下不需要的結果的結合體。因此雖然某些類型的狀態管理通常是可行的,但這些框架通常在不具備狀態管理機制時更簡單也更高效。
此類處理非常適合某些類型的工作負載。有近實時處理需求的任務很適合使用流處理模式。分析、伺服器或應用程序錯誤日誌,以及其他基於時間的衡量指標是最適合的類型,因為對這些領域的數據變化做出響應對於業務職能來說是極為關鍵的。流處理很適合用來處理必須對變動或峰值做出響應,並且關注一段時間內變化趨勢的數據。
Apache Storm
Apache Storm是一種側重於極低延遲的流處理框架,也許是要求近實時處理的工作負載的最佳選擇。該技術可處理非常大量的數據,通過比其他解決方案更低的延遲提供結果。
流處理模式
Storm的流處理可對框架中名為Topology(拓撲)的DAG(Directed Acyclic Graph,有向無環圖)進行編排。這些拓撲描述了當數據片段進入系統後,需要對每個傳入的片段執行的不同轉換或步驟。
拓撲包含:
· Stream:普通的數據流,這是一種會持續抵達系統的無邊界數據。
· Spout:位於拓撲邊緣的數據流來源,例如可以是API或查詢等,從這里可以產生待處理的數據。
· Bolt:Bolt代表需要消耗流數據,對其應用操作,並將結果以流的形式進行輸出的處理步驟。Bolt需要與每個Spout建立連接,隨後相互連接以組成所有必要的處理。在拓撲的尾部,可以使用最終的Bolt輸出作為相互連接的其他系統的輸入。
Storm背後的想法是使用上述組件定義大量小型的離散操作,隨後將多個組件組成所需拓撲。默認情況下Storm提供了「至少一次」的處理保證,這意味著可以確保每條消息至少可以被處理一次,但某些情況下如果遇到失敗可能會處理多次。Storm無法確保可以按照特定順序處理消息。
為了實現嚴格的一次處理,即有狀態處理,可以使用一種名為Trident的抽象。嚴格來說不使用Trident的Storm通常可稱之為Core Storm。Trident會對Storm的處理能力產生極大影響,會增加延遲,為處理提供狀態,使用微批模式代替逐項處理的純粹流處理模式。
為避免這些問題,通常建議Storm用戶盡可能使用Core Storm。然而也要注意,Trident對內容嚴格的一次處理保證在某些情況下也比較有用,例如系統無法智能地處理重復消息時。如果需要在項之間維持狀態,例如想要計算一個小時內有多少用戶點擊了某個鏈接,此時Trident將是你唯一的選擇。盡管不能充分發揮框架與生俱來的優勢,但Trident提高了Storm的靈活性。
Trident拓撲包含:
· 流批(Stream batch):這是指流數據的微批,可通過分塊提供批處理語義。
· 操作(Operation):是指可以對數據執行的批處理過程。
優勢和局限
目前來說Storm可能是近實時處理領域的最佳解決方案。該技術可以用極低延遲處理數據,可用於希望獲得最低延遲的工作負載。如果處理速度直接影響用戶體驗,例如需要將處理結果直接提供給訪客打開的網站頁面,此時Storm將會是一個很好的選擇。
Storm與Trident配合使得用戶可以用微批代替純粹的流處理。雖然藉此用戶可以獲得更大靈活性打造更符合要求的工具,但同時這種做法會削弱該技術相比其他解決方案最大的優勢。話雖如此,但多一種流處理方式總是好的。
Core Storm無法保證消息的處理順序。Core Storm為消息提供了「至少一次」的處理保證,這意味著可以保證每條消息都能被處理,但也可能發生重復。Trident提供了嚴格的一次處理保證,可以在不同批之間提供順序處理,但無法在一個批內部實現順序處理。
在互操作性方面,Storm可與Hadoop的YARN資源管理器進行集成,因此可以很方便地融入現有Hadoop部署。除了支持大部分處理框架,Storm還可支持多種語言,為用戶的拓撲定義提供了更多選擇。
總結
對於延遲需求很高的純粹的流處理工作負載,Storm可能是最適合的技術。該技術可以保證每條消息都被處理,可配合多種編程語言使用。由於Storm無法進行批處理,如果需要這些能力可能還需要使用其他軟體。如果對嚴格的一次處理保證有比較高的要求,此時可考慮使用Trident。不過這種情況下其他流處理框架也許更適合。
Apache Samza
Apache Samza是一種與Apache Kafka消息系統緊密綁定的流處理框架。雖然Kafka可用於很多流處理系統,但按照設計,Samza可以更好地發揮Kafka獨特的架構優勢和保障。該技術可通過Kafka提供容錯、緩沖,以及狀態存儲。
Samza可使用YARN作為資源管理器。這意味著默認情況下需要具備Hadoop集群(至少具備HDFS和YARN),但同時也意味著Samza可以直接使用YARN豐富的內建功能。
流處理模式
Samza依賴Kafka的語義定義流的處理方式。Kafka在處理數據時涉及下列概念:
· Topic(話題):進入Kafka系統的每個數據流可稱之為一個話題。話題基本上是一種可供消耗方訂閱的,由相關信息組成的數據流。
· Partition(分區):為了將一個話題分散至多個節點,Kafka會將傳入的消息劃分為多個分區。分區的劃分將基於鍵(Key)進行,這樣可以保證包含同一個鍵的每條消息可以劃分至同一個分區。分區的順序可獲得保證。
· Broker(代理):組成Kafka集群的每個節點也叫做代理。
· Procer(生成方):任何向Kafka話題寫入數據的組件可以叫做生成方。生成方可提供將話題劃分為分區所需的鍵。
· Consumer(消耗方):任何從Kafka讀取話題的組件可叫做消耗方。消耗方需要負責維持有關自己分支的信息,這樣即可在失敗後知道哪些記錄已經被處理過了。
由於Kafka相當於永恆不變的日誌,Samza也需要處理永恆不變的數據流。這意味著任何轉換創建的新數據流都可被其他組件所使用,而不會對最初的數據流產生影響。
優勢和局限
乍看之下,Samza對Kafka類查詢系統的依賴似乎是一種限制,然而這也可以為系統提供一些獨特的保證和功能,這些內容也是其他流處理系統不具備的。
例如Kafka已經提供了可以通過低延遲方式訪問的數據存儲副本,此外還可以為每個數據分區提供非常易用且低成本的多訂閱者模型。所有輸出內容,包括中間態的結果都可寫入到Kafka,並可被下游步驟獨立使用。
這種對Kafka的緊密依賴在很多方面類似於MapRece引擎對HDFS的依賴。雖然在批處理的每個計算之間對HDFS的依賴導致了一些嚴重的性能問題,但也避免了流處理遇到的很多其他問題。
Samza與Kafka之間緊密的關系使得處理步驟本身可以非常鬆散地耦合在一起。無需事先協調,即可在輸出的任何步驟中增加任意數量的訂閱者,對於有多個團隊需要訪問類似數據的組織,這一特性非常有用。多個團隊可以全部訂閱進入系統的數據話題,或任意訂閱其他團隊對數據進行過某些處理後創建的話題。這一切並不會對資料庫等負載密集型基礎架構造成額外的壓力。
直接寫入Kafka還可避免回壓(Backpressure)問題。回壓是指當負載峰值導致數據流入速度超過組件實時處理能力的情況,這種情況可能導致處理工作停頓並可能丟失數據。按照設計,Kafka可以將數據保存很長時間,這意味著組件可以在方便的時候繼續進行處理,並可直接重啟動而無需擔心造成任何後果。
Samza可以使用以本地鍵值存儲方式實現的容錯檢查點系統存儲數據。這樣Samza即可獲得「至少一次」的交付保障,但面對由於數據可能多次交付造成的失敗,該技術無法對匯總後狀態(例如計數)提供精確恢復。
Samza提供的高級抽象使其在很多方面比Storm等系統提供的基元(Primitive)更易於配合使用。目前Samza只支持JVM語言,這意味著它在語言支持方面不如Storm靈活。
總結
對於已經具備或易於實現Hadoop和Kafka的環境,Apache Samza是流處理工作負載一個很好的選擇。Samza本身很適合有多個團隊需要使用(但相互之間並不一定緊密協調)不同處理階段的多個數據流的組織。Samza可大幅簡化很多流處理工作,可實現低延遲的性能。如果部署需求與當前系統不兼容,也許並不適合使用,但如果需要極低延遲的處理,或對嚴格的一次處理語義有較高需求,此時依然適合考慮。
混合處理系統:批處理和流處理
一些處理框架可同時處理批處理和流處理工作負載。這些框架可以用相同或相關的組件和API處理兩種類型的數據,藉此讓不同的處理需求得以簡化。
如你所見,這一特性主要是由Spark和Flink實現的,下文將介紹這兩種框架。實現這樣的功能重點在於兩種不同處理模式如何進行統一,以及要對固定和不固定數據集之間的關系進行何種假設。
雖然側重於某一種處理類型的項目會更好地滿足具體用例的要求,但混合框架意在提供一種數據處理的通用解決方案。這種框架不僅可以提供處理數據所需的方法,而且提供了自己的集成項、庫、工具,可勝任圖形分析、機器學習、互動式查詢等多種任務。
Apache Spark
Apache Spark是一種包含流處理能力的下一代批處理框架。與Hadoop的MapRece引擎基於各種相同原則開發而來的Spark主要側重於通過完善的內存計算和處理優化機制加快批處理工作負載的運行速度。
Spark可作為獨立集群部署(需要相應存儲層的配合),或可與Hadoop集成並取代MapRece引擎。
批處理模式
與MapRece不同,Spark的數據處理工作全部在內存中進行,只在一開始將數據讀入內存,以及將最終結果持久存儲時需要與存儲層交互。所有中間態的處理結果均存儲在內存中。
雖然內存中處理方式可大幅改善性能,Spark在處理與磁碟有關的任務時速度也有很大提升,因為通過提前對整個任務集進行分析可以實現更完善的整體式優化。為此Spark可創建代表所需執行的全部操作,需要操作的數據,以及操作和數據之間關系的Directed Acyclic Graph(有向無環圖),即DAG,藉此處理器可以對任務進行更智能的協調。
為了實現內存中批計算,Spark會使用一種名為Resilient Distributed Dataset(彈性分布式數據集),即RDD的模型來處理數據。這是一種代表數據集,只位於內存中,永恆不變的結構。針對RDD執行的操作可生成新的RDD。每個RDD可通過世系(Lineage)回溯至父級RDD,並最終回溯至磁碟上的數據。Spark可通過RDD在無需將每個操作的結果寫回磁碟的前提下實現容錯。
流處理模式
流處理能力是由Spark Streaming實現的。Spark本身在設計上主要面向批處理工作負載,為了彌補引擎設計和流處理工作負載特徵方面的差異,Spark實現了一種叫做微批(Micro-batch)*的概念。在具體策略方面該技術可以將數據流視作一系列非常小的「批」,藉此即可通過批處理引擎的原生語義進行處理。
Spark Streaming會以亞秒級增量對流進行緩沖,隨後這些緩沖會作為小規模的固定數據集進行批處理。這種方式的實際效果非常好,但相比真正的流處理框架在性能方面依然存在不足。
優勢和局限
使用Spark而非Hadoop MapRece的主要原因是速度。在內存計算策略和先進的DAG調度等機制的幫助下,Spark可以用更快速度處理相同的數據集。
Spark的另一個重要優勢在於多樣性。該產品可作為獨立集群部署,或與現有Hadoop集群集成。該產品可運行批處理和流處理,運行一個集群即可處理不同類型的任務。
除了引擎自身的能力外,圍繞Spark還建立了包含各種庫的生態系統,可為機器學習、互動式查詢等任務提供更好的支持。相比MapRece,Spark任務更是「眾所周知」地易於編寫,因此可大幅提高生產力。
為流處理系統採用批處理的方法,需要對進入系統的數據進行緩沖。緩沖機制使得該技術可以處理非常大量的傳入數據,提高整體吞吐率,但等待緩沖區清空也會導致延遲增高。這意味著Spark Streaming可能不適合處理對延遲有較高要求的工作負載。
由於內存通常比磁碟空間更貴,因此相比基於磁碟的系統,Spark成本更高。然而處理速度的提升意味著可以更快速完成任務,在需要按照小時數為資源付費的環境中,這一特性通常可以抵消增加的成本。
Spark內存計算這一設計的另一個後果是,如果部署在共享的集群中可能會遇到資源不足的問題。相比HadoopMapRece,Spark的資源消耗更大,可能會對需要在同一時間使用集群的其他任務產生影響。從本質來看,Spark更不適合與Hadoop堆棧的其他組件共存一處。
總結
Spark是多樣化工作負載處理任務的最佳選擇。Spark批處理能力以更高內存佔用為代價提供了無與倫比的速度優勢。對於重視吞吐率而非延遲的工作負載,則比較適合使用Spark Streaming作為流處理解決方案。
Apache Flink
Apache Flink是一種可以處理批處理任務的流處理框架。該技術可將批處理數據視作具備有限邊界的數據流,藉此將批處理任務作為流處理的子集加以處理。為所有處理任務採取流處理為先的方法會產生一系列有趣的副作用。
這種流處理為先的方法也叫做Kappa架構,與之相對的是更加被廣為人知的Lambda架構(該架構中使用批處理作為主要處理方法,使用流作為補充並提供早期未經提煉的結果)。Kappa架構中會對一切進行流處理,藉此對模型進行簡化,而這一切是在最近流處理引擎逐漸成熟後才可行的。
流處理模型
Flink的流處理模型在處理傳入數據時會將每一項視作真正的數據流。Flink提供的DataStream API可用於處理無盡的數據流。Flink可配合使用的基本組件包括:
· Stream(流)是指在系統中流轉的,永恆不變的無邊界數據集
· Operator(操作方)是指針對數據流執行操作以產生其他數據流的功能
· Source(源)是指數據流進入系統的入口點
· Sink(槽)是指數據流離開Flink系統後進入到的位置,槽可以是資料庫或到其他系統的連接器
為了在計算過程中遇到問題後能夠恢復,流處理任務會在預定時間點創建快照。為了實現狀態存儲,Flink可配合多種狀態後端系統使用,具體取決於所需實現的復雜度和持久性級別。
此外Flink的流處理能力還可以理解「事件時間」這一概念,這是指事件實際發生的時間,此外該功能還可以處理會話。這意味著可以通過某種有趣的方式確保執行順序和分組。
批處理模型
Flink的批處理模型在很大程度上僅僅是對流處理模型的擴展。此時模型不再從持續流中讀取數據,而是從持久存儲中以流的形式讀取有邊界的數據集。Flink會對這些處理模型使用完全相同的運行時。
Flink可以對批處理工作負載實現一定的優化。例如由於批處理操作可通過持久存儲加以支持,Flink可以不對批處理工作負載創建快照。數據依然可以恢復,但常規處理操作可以執行得更快。
另一個優化是對批處理任務進行分解,這樣即可在需要的時候調用不同階段和組件。藉此Flink可以與集群的其他用戶更好地共存。對任務提前進行分析使得Flink可以查看需要執行的所有操作、數據集的大小,以及下游需要執行的操作步驟,藉此實現進一步的優化。
優勢和局限
Flink目前是處理框架領域一個獨特的技術。雖然Spark也可以執行批處理和流處理,但Spark的流處理採取的微批架構使其無法適用於很多用例。Flink流處理為先的方法可提供低延遲,高吞吐率,近乎逐項處理的能力。
Flink的很多組件是自行管理的。雖然這種做法較為罕見,但出於性能方面的原因,該技術可自行管理內存,無需依賴原生的Java垃圾回收機制。與Spark不同,待處理數據的特徵發生變化後Flink無需手工優化和調整,並且該技術也可以自行處理數據分區和自動緩存等操作。
Flink會通過多種方式對工作進行分許進而優化任務。這種分析在部分程度上類似於SQL查詢規劃器對關系型資料庫所做的優化,可針對特定任務確定最高效的實現方法。該技術還支持多階段並行執行,同時可將受阻任務的數據集合在一起。對於迭代式任務,出於性能方面的考慮,Flink會嘗試在存儲數據的節點上執行相應的計算任務。此外還可進行「增量迭代」,或僅對數據中有改動的部分進行迭代。
在用戶工具方面,Flink提供了基於Web的調度視圖,藉此可輕松管理任務並查看系統狀態。用戶也可以查看已提交任務的優化方案,藉此了解任務最終是如何在集群中實現的。對於分析類任務,Flink提供了類似SQL的查詢,圖形化處理,以及機器學習庫,此外還支持內存計算。
Flink能很好地與其他組件配合使用。如果配合Hadoop 堆棧使用,該技術可以很好地融入整個環境,在任何時候都只佔用必要的資源。該技術可輕松地與YARN、HDFS和Kafka 集成。在兼容包的幫助下,Flink還可以運行為其他處理框架,例如Hadoop和Storm編寫的任務。
目前Flink最大的局限之一在於這依然是一個非常「年幼」的項目。現實環境中該項目的大規模部署尚不如其他處理框架那麼常見,對於Flink在縮放能力方面的局限目前也沒有較為深入的研究。隨著快速開發周期的推進和兼容包等功能的完善,當越來越多的組織開始嘗試時,可能會出現越來越多的Flink部署
總結
Flink提供了低延遲流處理,同時可支持傳統的批處理任務。Flink也許最適合有極高流處理需求,並有少量批處理任務的組織。該技術可兼容原生Storm和Hadoop程序,可在YARN管理的集群上運行,因此可以很方便地進行評估。快速進展的開發工作使其值得被大家關注。
結論
大數據系統可使用多種處理技術。
對於僅需要批處理的工作負載,如果對時間不敏感,比其他解決方案實現成本更低的Hadoop將會是一個好選擇。
對於僅需要流處理的工作負載,Storm可支持更廣泛的語言並實現極低延遲的處理,但默認配置可能產生重復結果並且無法保證順序。Samza與YARN和Kafka緊密集成可提供更大靈活性,更易用的多團隊使用,以及更簡單的復制和狀態管理。
對於混合型工作負載,Spark可提供高速批處理和微批處理模式的流處理。該技術的支持更完善,具備各種集成庫和工具,可實現靈活的集成。Flink提供了真正的流處理並具備批處理能力,通過深度優化可運行針對其他平台編寫的任務,提供低延遲的處理,但實際應用方面還為時過早。
最適合的解決方案主要取決於待處理數據的狀態,對處理所需時間的需求,以及希望得到的結果。具體是使用全功能解決方案或主要側重於某種項目的解決方案,這個問題需要慎重權衡。隨著逐漸成熟並被廣泛接受,在評估任何新出現的創新型解決方案時都需要考慮類似的問題。

⑧ Flink如何管理Kafka 消費位點(譯文)

Checkpointing 是 Flink 故障恢復的內部機制。一個 checkpoint 就是 Flink應用程序產生的狀態的一個副本。如果 Flink 任務發生故障,它會從 checkpoint 中載入之前的狀態來恢復任務,就好像故障沒有發生一樣。

Checkpoints 是 Flink 容錯的基礎,並且確保了 Flink 流式應用在失敗時的完整性。Checkpoints 可以通過 Flink 設置定時觸發。

Flink Kafka consumer 使用 Flink 的 checkpoint 機制來存儲 Kafka 每個分區的位點到 state。當 Flink 執行 checkpoint 時,Kafka 的每個分區的位點都被存儲到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 機制確保了所有任務運算元的狀態是一致的,也就是說這些狀態具有相同的數據輸入。當所有的任務運算元成功存儲他們自己的狀態後,代表一次 checkpoint 的完成。因此,當任務從故障中恢復時,Flink 保證了exactly-once。

下面將一步一步的演示 Flink 是如何通過 checkpoint 來管理 Kafka 的 offset 的。

下面的例子從兩個分區的 Kafka topic 中讀取數據,每個分區的數據是 「A」, 「B」, 「C」, 」D」, 「E」。假設每個分區都是從 0 開始讀取。

假設 Flink Kafka consumer 從分區 0 開始讀取數據 「A」,那麼此時第一個 consumer 的位點從 0 變成 1。如下圖所示。

此時數據 「A」 到達 Flink Job 中的 Map Task。兩個 consumer 繼續讀取數據 (從分區 0 讀取數據 「B」 ,從分區 1 讀取數據 「A」)。 offsets 分別被更新成 2 和 1。與此同時,假設 Flink 從 source 端開始執行 checkpoint。

到這里,Flink Kafka consumer tasks 已經執行了一次快照,offsets也保存到了 state 中(「offset = 2, 1」) 。此時 source tasks 在 數據 「B」 和 「A」 後面,向下游發送一個 checkpoint barrier。checkpoint barriers 是 Flink 用來對齊每個任務運算元的 checkpoint,以確保整個 checkpoint 的一致性。分區 1 的數據 「A」 到達 Flink Map Task, 與此同時分區 0 的 consumer 繼續讀取下一個消息(message 「C」)。

Flink Map Task 收到上游兩個 source 的 checkpoint barriers 然後開始執行 checkpoint ,把 state 保存到 filesystem。同時,消費者繼續從Kafka分區讀取更多事件。

假設 Flink Map Task 是 Flink Job 的最末端,那麼當它完成 checkpoint 後,就會立馬通知 Flink Job Master。當 job 的所有 task 都確認其 state 已經 「checkpointed」,Job Master將完成這次的整個 checkpoint。 之後,checkpoint 可以用於故障恢復。

如果發生故障(例如,worker 掛掉),則所有任務將重啟,並且它們的狀態將被重置為最近一次的 checkpoint 的狀態。 如下圖所示。

source 任務將分別從 offset 2 和 1 開始消費。當任務重啟完成, 將會正常運行,就像之前沒發生故障一樣。

PS: 文中提到的 checkpoint 對齊,我說下我的理解,假設一個 Flink Job 有 Source -> Map -> Sink,其中Sink有多個輸入。那麼當一次checkpoint的 barrier從source發出時,到sink這里,多個輸入需要等待其它的輸入的barrier已經到達,經過對齊後,sink才會繼續處理消息。這里就是exactly-once和at-least-once的區別。

The End
原文鏈接: How Apache Flink manages Kafka consumer offsets

⑨ 【Flink 精選】闡述 Flink 的容錯機制,剖析 Checkpoint 實現流程

Flink 容錯機制主要是 狀態的保存和恢復,涉及 state backends 狀態後端、checkpoint 和 savepoint,還有 Job 和 Task 的錯誤恢復

Flink 狀態後端是指 保存 Checkpoint 數據的容器 ,其分類有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,狀態的分類有 operator state 和 keyed state

Flink 狀態保存和恢復主要依靠 Checkpoint 機制和 Savepoint 機制,兩者的區別如下表所示。

快照的概念來源於相片,指照相館的一種沖洗過程短的照片。在計算機領域, 快照是數據存儲的某一時刻的狀態記錄 Flink Snapshot 快照是指作業狀態的全局一致記錄 。一個完整的快照是包括 source 運算元的狀態(例如,消費 kafka partition 的 offset)、狀態運算元的緩存數據和 sink 運算元的狀態(批量緩存數據、事務數據等)。

Checkpoint 檢查點可以自動產生快照,用於Flink 故障恢復 。Checkpoint 具有分布式、非同步、增量的特點。

Savepoint 保存點是用戶手動觸發的,保存全量的作業狀態數據 。一般使用場景是作業的升級、作業的並發度縮放、遷移集群等。

Flink 是採用輕量級的分布式非同步快照,其實現是採用柵欄 barrier 作為 checkpoint 的傳遞信號,與業務數據一樣無差別地傳遞下去 ,目的是使得數據流被切分成微批,進行 checkpoint 保存為 snapshot。當 barrier 經過流圖節點的時候,Flink 進行 checkpoint 保存狀態數據。
如下圖所示,checkpoint n 包含每個運算元的狀態,該狀態是指checkpoint n 之前的全部事件,而不包含它之後的所有事件。

針對用戶作業出現故障而導致結果丟失或者重復的問題,Flink 提供3種語義:
At-Least-Once 最少一次 :不會丟失數據,但可能會有重復結果。
Exactly-Once 精確一次 :checkpoint barrier 對齊機制可以保障精確一次。

FailureRateRestartStrategy :允許在指定時間間隔內的最大失敗次數,同時可以設置重啟延時時間。
FixedDelayRestartStrategy :允許指定的失敗次數,同時可以設置重啟延時時間。
NoRestartStrategy :不需要重啟,即 Job 直接失敗。
ThrowingRestartStrategy :不需要重啟,直接拋異常。
Job Restart 策略可以通過 env 設置。

上述策略的父類介面是RestartStrategy,其關鍵是restart(重啟操作)。

RestartAllStrategy :重啟全部 task,默認策略。
RestartIndivialStrategy :恢復單個 task。如果該 task 沒有source,可能導致數據丟失。
NoOpFailoverStrategy :不恢復 task。
上述策略的父類介面是FailoverStrategy,其關鍵是Factory的create(創建 strategy)、onTaskFailure(處理錯誤)。

如何產生可靠的全局一致性快照是分布式系統的難點,其傳統方案是使用的全局時鍾,但存在單點故障、數據不一致等可靠性問題 。為了解決該問題, Chandy-Lamport 演算法採用 marker 的傳播來代替全局時鍾

① 進程 Pi 記錄自己的進程狀態,同時生產一個標識信息 marker(與正常 message 不同),通過 ouput channel 發送給系統裡面的其他進程。
② 進程 Pi 開始記錄所有 input channel 接收到的 message

進程 Pj 從 input channel Ckj 接收到 marker。如果 Pj 還沒有記錄自己的進程狀態,則 Pj 記錄自己的進程狀態,向 output channel 發送 marker;否則 Pj 正在記錄自己的進程狀態(該 marker 之前的 message)。

所有的進程都收到 marker 信息並且記錄下自己的狀態和 channel 的狀態(包含的 message)。

Flink 的分布式非同步快照實現了Chandy Lamport 演算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 演算法中的 marker,通過控制 barrier 的同步來實現 snapshot 的備份和 Exactly-Once 語義

Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。

source task向下游廣播barrier。

當source task備份完自己的狀態後,會將備份數據的地址(state handle)通知 Checkpoint Coordinator。

map和sink task收集齊上游source的barrier n,執行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB會全量保存到磁碟上(紅色大三角表示),然後Flink會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。

map和sink task在完成Checkpoint 之後,將狀態地址state handle返回通知 Coordinator。

當Checkpoint Coordinator收到全部task的state handle,就確定該Checkpoint已完成,並向持久化存儲中備份一個Checkpoint Meta(元數據,包括該checkpoint狀態數據的備份地址)。

⑩ Flink——Exactly-Once

Apache Flink是目前市場最受關注的流計算處理引擎,相較於Spark Streaming的依託Spark Core實現的微批處理模型,Flink是一個純粹的流處理引擎,其基於操作符的連續流模型,可以達到微秒級別的延遲。

Flink實現了流批一體化模式,實現按照事件處理和無序處理兩種形式,基於內存計算。強大高效的反壓機制和內存管理,基於輕量級分布式快照checkpoint機制,從而自動實現了Exactly-Once一致性語義。

1. 數據源端

支持可靠的數據源(如kafka), 數據可重讀

Apache Flink內置FlinkKafkaConsumer010類,不依賴於 kafka 內置的消費組offset管理,在內部自行記錄和維護 consumer 的offset。

2. Flink消費端

輕量級快照機制: 一致性checkpoint檢查點

Flink採用了一種輕量級快照機制(檢查點checkpoint)來保障Exactly-Once的一致性語義。所謂的一致檢查點,即在某個時間點上所有任務狀態的一份拷貝(快照)。該時間點是所有任務剛好處理完一個相同數據的時間。

間隔時間自動執行分布式一致性檢查點(Checkpoints)程序,非同步插入barrier檢查點分界線,內存狀態自動存儲為cp進程文件。保證數據Exactly Oncey精確一次處理。

(1) 從source(Input)端開始,JobManager會向每個source(Input)發送檢查點barrier消息,啟動檢查點。在保證所有的source(Input)數據都處理完成後,Flink開始保存具體的一致性檢查點checkpoints,並在過程中啟用barrier檢查點分界線。
(2) 接收數據和barrier消息,兩個過程非同步進行。在所有的source(Input)數據都處理完成後,開始將自己的檢查點(checkpoints)保存到狀態後(StateBackend)中,並通知JobManager將Barrier分發到下游
(3) barrier向下游傳遞時,會進行barrier對齊確認。待barrier都到齊後才進行checkpoints檢查點保存。
(4) 重復以上操作,直到整個流程完成。

3. 輸出端

與上文Spark的輸出端Exactly-Once一致性上實現類似,除了目標源需要滿足一定條件以外,Flink內置的二階段提交機制也變相實現了事務一致性。**支持冪等寫入、事務寫入機制(二階段提交) **

這一塊和上文Spark的冪寫入特性內容一致,即相同Key/ID 更新寫入,數據不變。藉助支持主鍵唯一性約束的存儲系統,實現冪等性寫入數據,此處將不再繼續贅述。

Flink在處理完source端數據接收和operator運算元計算過程,待過程中所有的checkpoint都完成後,准備發送數據到sink端,此時啟動事務。其中存在兩種方式: (1) WAL預寫日誌: 將計算結果先寫入到日誌緩存(狀態後端/WAL)中,等checkpoint確認完成後一次性寫入到sink。(2) 二階段提交: 對於每個checkpoint創建事務,先預提交數據到sink中,然後等所有的checkpoint全部完成後再真正提交請求到sink, 並把狀態改為已確認。
整體思想: 為checkpoint創建事務,等到所有的checkpoint全部真正的完成後,才把計算結果寫入到sink中。