1. 【Flink 精選】闡述 Flink 的容錯機制,剖析 Checkpoint 實現流程
Flink 容錯機制主要是 狀態的保存和恢復,涉及 state backends 狀態後端、checkpoint 和 savepoint,還有 Job 和 Task 的錯誤恢復 。
Flink 狀態後端是指 保存 Checkpoint 數據的容器 ,其分類有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,狀態的分類有 operator state 和 keyed state 。
Flink 狀態保存和恢復主要依靠 Checkpoint 機制和 Savepoint 機制,兩者的區別如下表所示。
快照的概念來源於相片,指照相館的一種沖洗過程短的照片。在計算機領域, 快照是數據存儲的某一時刻的狀態記錄 。 Flink Snapshot 快照是指作業狀態的全局一致記錄 。一個完整的快照是包括 source 運算元的狀態(例如,消費 kafka partition 的 offset)、狀態運算元的緩存數據和 sink 運算元的狀態(批量緩存數據、事務數據等)。
Checkpoint 檢查點可以自動產生快照,用於Flink 故障恢復 。Checkpoint 具有分布式、非同步、增量的特點。
Savepoint 保存點是用戶手動觸發的,保存全量的作業狀態數據 。一般使用場景是作業的升級、作業的並發度縮放、遷移集群等。
Flink 是採用輕量級的分布式非同步快照,其實現是採用柵欄 barrier 作為 checkpoint 的傳遞信號,與業務數據一樣無差別地傳遞下去 ,目的是使得數據流被切分成微批,進行 checkpoint 保存為 snapshot。當 barrier 經過流圖節點的時候,Flink 進行 checkpoint 保存狀態數據。
如下圖所示,checkpoint n 包含每個運算元的狀態,該狀態是指checkpoint n 之前的全部事件,而不包含它之後的所有事件。
針對用戶作業出現故障而導致結果丟失或者重復的問題,Flink 提供3種語義:
① At-Least-Once 最少一次 :不會丟失數據,但可能會有重復結果。
② Exactly-Once 精確一次 :checkpoint barrier 對齊機制可以保障精確一次。
① FailureRateRestartStrategy :允許在指定時間間隔內的最大失敗次數,同時可以設置重啟延時時間。
② FixedDelayRestartStrategy :允許指定的失敗次數,同時可以設置重啟延時時間。
③ NoRestartStrategy :不需要重啟,即 Job 直接失敗。
④ ThrowingRestartStrategy :不需要重啟,直接拋異常。
Job Restart 策略可以通過 env 設置。
上述策略的父類介面是RestartStrategy,其關鍵是restart(重啟操作)。
① RestartAllStrategy :重啟全部 task,默認策略。
② RestartIndivialStrategy :恢復單個 task。如果該 task 沒有source,可能導致數據丟失。
③ NoOpFailoverStrategy :不恢復 task。
上述策略的父類介面是FailoverStrategy,其關鍵是Factory的create(創建 strategy)、onTaskFailure(處理錯誤)。
如何產生可靠的全局一致性快照是分布式系統的難點,其傳統方案是使用的全局時鍾,但存在單點故障、數據不一致等可靠性問題 。為了解決該問題, Chandy-Lamport 演算法採用 marker 的傳播來代替全局時鍾 。
① 進程 Pi 記錄自己的進程狀態,同時生產一個標識信息 marker(與正常 message 不同),通過 ouput channel 發送給系統裡面的其他進程。
② 進程 Pi 開始記錄所有 input channel 接收到的 message
進程 Pj 從 input channel Ckj 接收到 marker。如果 Pj 還沒有記錄自己的進程狀態,則 Pj 記錄自己的進程狀態,向 output channel 發送 marker;否則 Pj 正在記錄自己的進程狀態(該 marker 之前的 message)。
所有的進程都收到 marker 信息並且記錄下自己的狀態和 channel 的狀態(包含的 message)。
Flink 的分布式非同步快照實現了Chandy Lamport 演算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 演算法中的 marker,通過控制 barrier 的同步來實現 snapshot 的備份和 Exactly-Once 語義 。
Checkpoint Coordinator 向所有 source 節點 trigger Checkpoint。
source task向下游廣播barrier。
當source task備份完自己的狀態後,會將備份數據的地址(state handle)通知 Checkpoint Coordinator。
map和sink task收集齊上游source的barrier n,執行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB會全量保存到磁碟上(紅色大三角表示),然後Flink會從中選擇沒有上傳的文件進行持久化備份(紫色小三角)。
map和sink task在完成Checkpoint 之後,將狀態地址state handle返回通知 Coordinator。
當Checkpoint Coordinator收到全部task的state handle,就確定該Checkpoint已完成,並向持久化存儲中備份一個Checkpoint Meta(元數據,包括該checkpoint狀態數據的備份地址)。
2. Flink HistoryServer配置(簡單三步完成)
允許您查詢JobManager存檔的已完成作業的狀態和統計信息。(官網原話)
最適合用於:了解 flink過去完成任務的狀態,以及有狀態作業的恢復(保存了最後一次的checkpoint地址)
官網地址: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/historyserver.html
官網配置參數: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#history-server
修改flink-1.11.2/conf/flink-conf.yaml文件
兩張圖:
historyserver.web.tmpdir的默認配置圖:
historyserver.web.tmpdir的自定義路徑配置圖:
在hdfs的/flink目錄下創建completed-jobs目錄(許可權可以改成777)
啟動/關閉命令:
1、查看啟動狀態
2、分別啟一個per-job任務、sql任務、基於session啟的任務,過一會全部cancel掉,都可以在hdfs路徑和/tmp下的自定義目錄看到相關數據,最後可以在host:8082上面看到你剛才canceled的任務,如下圖:
3、訪問hdfs路徑:
4、訪問 http://host:8082 可以查看到歷史完成任務狀態:
生產中遇到突然這個服務丟失,然後重啟任務失敗。通過排查任務是historyserver.web.tmpdir: /tmp/flinkhistoryserver/這個路徑被刪除了。
3. 基於flink sql構建實時數據倉庫
根據目前大數據這一塊的發展,已經不局限於離線的分析,挖掘數據潛在的價值,數據的時效性最近幾年變得剛需,實時處理的框架有storm,spark-streaming,flink等。想要做到實時數據這個方案可行,需要考慮以下幾點:1、狀態機制 2、精確一次語義 3、高吞吐量 4、可彈性伸縮的應用 5、容錯機制,剛好這幾點,flink都完美的實現了,並且支持flink sql高級API,減少了開發成本,可用實現快速迭代,易維護等優點。
離線數倉的架構圖:
實時數倉架構圖:
目前是將實時維度表和DM層數據存於hbase當中,實時公共層都存於kafka當中,並且以寫滾動日誌的方式寫入HDFS(主要是用於校驗數據)。其實在這里可以做的工作還有很多,kafka集群,flink集群,hbase集群相互獨立,這對整個實時數據倉庫的穩定性帶來一定的挑戰。
一個數據倉庫想要成體系,成資產,離不開數據域的劃分。所以參考著離線的數據倉庫,想著在實時數倉做出這方面的探索,理論上來講,離線可以實現的,實時也是可以實現的。 並且目前已經取得了成效,目前劃分的數據域跟離線大致相同,有流量域,交易域,營銷域等等。當然這裡面涉及到維表,多事務事實表,累計快照表,周期性快照表的設計,開發,到落地這里就不詳述了。
維度表也是整個實時數據倉庫不可或缺的部分。從目前整個實時數倉的建設來看,維度表有著數據量大,但是變更少的特點,我們試想過構建全平台的實時商品維度表或者是實時會員維度表,但是這類維度表太過於復雜,所以針對這類維度表下面介紹。還有另外一種就是較為簡單的維度表,這類維度可能對應著業務系統單個mysql表,或者只需要幾個表進行簡單ETL就可以產出的表,這類維表是可以做成實時的。以下有幾個實施的關鍵點:
如下是離線數據同步架構圖:
實時數據的接入其實在底層架構是一樣的,就是從kafka那邊開始不一樣,實時用flink的UDTF進行解析,而離線是定時(目前是小時級)用camus拉到HDFS,然後定時load HDFS的數據到hive表裡面去,這樣來實現離線數據的接入。實時數據的接入是用flink解析kafka的數據,然後在次寫入kafka當中去。
由於目前離線數據已經穩定運行了很久,所以實時接入數據的校驗可以對比離線數據,但是離線數據是小時級的hive數據,實時數據存於kafka當中,直接比較不了,所以做了相關處理,將kafka的數據使用flink寫HDFS滾動日誌的形式寫入HDFS,然後建立hive表小時級定時去load HDFS中的文件,以此來獲取實時數據。
完成以上兩點,剩餘還需要考慮一點,都是小時級的任務,這個時間卡點使用什麼欄位呢?首先要確定一點就是離線和實時任務卡點的時間欄位必須是一致的,不然肯定會出問題。目前離線使用camus從kafka將數據拉到HDFS上,小時級任務,使用nginx_ts這個時間欄位來卡點,這個欄位是上報到nginx伺服器上記錄的時間點。而實時的數據接入是使用flink消費kafka的數據,在以滾動日誌的形式寫入HDFS的,然後在建立hive表load HDFS文件獲取數據,雖然這個hive也是天/小時二級分區,但是離線的表是根據nginx_ts來卡點分區,但是實時的hive表是根據任務啟動去load文件的時間點去區分的分區,這是有區別的,直接篩選分區和離線的數據進行對比,會存在部分差異,應當的做法是篩選范圍分區,然後在篩選nginx_ts的區間,這樣在跟離線做對比才是合理的。
目前實時數據接入層的主要時延是在UDTF函數解析上,實時的UDTF函數是根據上報的日誌格式進行開發的,可以完成日誌的解析功能。
解析流程圖如下:
解析速率圖如下:
該圖還不是在峰值數據量的時候截的,目前以800記錄/second為准,大概一個記錄的解析速率為1.25ms。
目前該任務的flink資源配置核心數為1,假設解析速率為1.25ms一條記錄,那麼峰值只能處理800條/second,如果數據接入速率超過該值就需要增加核心數,保證解析速率。
介紹一下目前離線維度表的情況,就拿商品維度表來說,全線記錄數將近一個億,計算邏輯來自40-50個ods層的數據表,計算邏輯相當復雜,如果實時維度表也參考離線維度表來完成的話,那麼開發成本和維護成本非常大,對於技術來講也是很大的一個挑戰,並且目前也沒有需求要求維度屬性百分百准確。所以目前(偽實時維度表)准備在當天24點產出,當天的維度表給第二天實時公共層使用,即T-1的模式。偽實時維度表的計算邏輯參考離線維度表,但是為了保障在24點之前產出,需要簡化一下離線計算邏輯,並且去除一些不常用的欄位,保障偽實時維度表可以較快產出。
實時維度表的計算流程圖:
目前使用flink作為公司主流的實時計算引擎,使用內存作為狀態後端,並且固定30s的間隔做checkpoint,使用HDFS作為checkpoint的存儲組件。並且checkpoint也是作為任務restart以後恢復狀態的重要依據。熟悉flink的人應該曉得,使用內存作為狀態後端,這個內存是JVM的堆內存,畢竟是有限的東西,使用不得當,OOM是常有的事情,下面就介紹一下針對有限的內存,如果完成常規的計算。
4. Flink kafka kerberos的配置
Flink消費集成kerberos認證的kafka集群時,需要做一些配置才可以正常執行。
Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone
//指示是否從 Kerberos ticket 緩存中讀取
security.kerberos.login.use-ticket-cache: false1
//Kerberos 密鑰表文件的絕對路徑
security.kerberos.login.keytab: /data/home/keytab/flink.keytab
//認證主體名稱
security.kerberos.login.principal: [email protected]
//Kerberos登陸contexts
security.kerberos.login.contexts: Client,KafkaClient
val properties: Properties =new Properties()
properties.setProperty("bootstrap.servers","broker:9092")
properties.setProperty("group.id","testKafka")
properties.setProperty("security.protocol","SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism","GSSAPI")
properties.setProperty("sasl.kerberos.service.name","kafka")
consumer =new FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)
參數說明 :security.protocol
運行參數可以配置為PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四種協議,分別對應Fusion Insight Kafka集群的21005/21007/21008/21009埠。 如果配置了SASL,則必須配置sasl.kerberos.service.name為kafka,並在conf/flink-conf.yaml中配置security.kerberos.login相關配置項。如果配置了SSL,則必須配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,後者表示truststore密碼。
5. Apache Flink快速入門-基本架構、核心概念和運行流程
Flink是一個基於流計算的分布式引擎,以前的名字叫stratosphere,從2010年開始在德國一所大學里發起,也是有好幾年的 歷史 了,2014年來借鑒了社區其它一些項目的理念,快速發展並且進入了Apache頂級孵化器,後來更名為Flink。
Flink在德語中是快速和靈敏的意思 ,用來體現流式數據處理速度快和靈活性強等特點。
Flink提供了同時支持高吞吐、低延遲和exactly-once 語義的實時計算能力,另外Flink 還提供了基於流式計算引擎處理批量數據的計算能力,真正意義上實現了流批統一。
Flink 獨立於Apache Hadoop,且能在沒有任何 Hadoop 依賴的情況下運行。
但是,Flink 可以很好的集成很多 Hadoop 組件,例如 HDFS、YARN 或 HBase。 當與這些組件一起運行時,Flink 可以從 HDFS 讀取數據,或寫入結果和檢查點(checkpoint)/快照(snapshot)數據到 HDFS 。 Flink 還可以通過 YARN 輕松部署,並與 YARN 和 HDFS Kerberos 安全模塊集成。
Flink具有先進的架構理念、諸多的優秀特性,以及完善的編程介面。
Flink的具體優勢有如下幾點:
(1)同時支持高吞吐、低延遲、高性能;
(2)支持事件時間(Event Time)概念;
事件時間的語義使流計算的結果更加精確,尤其在事件到達無序或者延遲的情況下,保持了事件原本產生時的時序性,盡可能避免網路傳輸或硬體系統的影響。
(3)支持有狀態計算;
所謂狀態就是在流計算過程中,將運算元的中間結果數據保存在內存或者文件系統中,等下一個事件進入運算元後,可以從之前的狀態中獲取中間結果,計算當前的結果,從而無需每次都基於全部的原始數據來統計結果。
(4)支持高度靈活的窗口(Window)操作;
(5)基於輕量級分布式快照(Snapshot)實現的容錯;
(6)基於JVM實現獨立的內存管理;
(7)Save Points(保存點);
保存點是手動觸發的,觸發時會將它寫入狀態後端(State Backends)。Savepoints的實現也是依賴Checkpoint的機制。Flink 程序在執行中會周期性的在worker 節點上進行快照並生成Checkpoint。因為任務恢復的時候只需要最後一個完成的Checkpoint的,所以舊有的Checkpoint會在新的Checkpoint完成時被丟棄。Savepoints和周期性的Checkpoint非常的類似,只是有兩個重要的不同。一個是由用戶觸發,而且不會隨著新的Checkpoint生成而被丟棄。
在Flink整個軟體架構體系中,統一遵循了分層的架構設計理念,在降低系統耦合度的同時,為上層用戶構建Flink應用提供了豐富且友好的介面。
整個Flink的架構體系可以分為三層:
Deployment層: 該層主要涉及了Flink的部署模式,Flink支持多種部署模式:本地、集群(Standalone/YARN),雲(GCE/EC2),Kubernetes等。
Runtime層:Runtime層提供了支持Flink計算的全部核心實現,比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,為上層API層提供基礎服務。
API層: 主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。
Libraries層:該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的計算框架,也分別對應於面向流處理和面向批處理兩類。
核心概念:Job Managers,Task Managers,Clients
Flink也是典型的master-slave分布式架構。Flink的運行時,由兩種類型的進程組成:
Client: Client不是運行時和程序執行的一部分,它是用來准備和提交數據流到JobManagers。之後,可以斷開連接或者保持連接以獲取任務的狀態信息。
當 Flink 集群啟動後,首先會啟動一個 JobManger 和一個或多個的 TaskManager。由 Client 提交任務給 JobManager, JobManager 再調度任務到各個 TaskManager 去執行,然後 TaskManager 將心跳和統計信息匯報給 JobManager。 TaskManager 之間以流的形式進行數據的傳輸。上述三者均為獨立的 JVM 進程。
每個Worker(Task Manager)是一個JVM進程,通常會在單獨的線程里執行一個或者多個子任務。為了控制一個Worker能夠接受多少個任務,會在Worker上抽象多個Task Slot (至少一個)。
只有一個slot的TaskManager意味著每個任務組運行在一個單獨JVM中。 在擁有多個slot的TaskManager上,subtask共用JVM,可以共用TCP連接和心跳消息,同時可以共用一些數據集和數據結構,從而減小任務的開銷。
Flink的任務運行其實是多線程的方式,這和MapRece多JVM進程的方式有很大的區別,Flink能夠極大提高CPU使用效率,在多個任務之間通過TaskSlot方式共享系統資源,每個TaskManager中通過管理多個TaskSlot資源池對資源進行有效管理。
6. 3.一文搞定:Flink中端到端的狀態一致性(理念)
哈嘍,大家好。在第二章節中我聊了聊狀態後端和檢查點相關的內容,如果你仔細看完就能夠很清楚的知道,Flink是如何保存自己引以為傲的狀態的了。並且我也在上一章節中說了,Flink能夠依賴檢查點機制來實現自身的精確一致性,所以這篇文章咱們就簡單聊聊它所謂的精準一次性是如何實現的。(檢查點是根據在數據中插入「分界線」標志來觸發的)
從本文的標題能夠看出,今天要聊的是端到端的狀態一致性,那麼首先要先知道這個端到端是什麼意思。我們試想一下,Flink處理數據,肯定是要有數據來源和數據發送點的,也就是source和sink。那麼,如果要聊端到端的精準一次性,就要對這個兩個「端」字進行拆解,分為輸入端與Flink之間的精準一次性,和Flink與輸出端之間的精準一次性。
輸入端的精準一次性比較好理解,你只需要明確一點,只要向Flink發送數據的source端具備數據重放的功能就好。還記得上一小節提到的檢查點機制嗎?它是對程序運行時的某個時間點所有狀態的一次快照,在做快照的時候,source運算元會讀取數據源的偏移量信息,一起保存在檢查點中,這就能夠在故障恢復的時候讓source運算元根據自己保存的這個偏移量信息,去數據源中重新讀取數據。所以數據源頭這方面,精準一次性比較好搞,因為Flink內部幹了很多事情。當然了,這個source端一定要具備數據保存和數據回放的能力,如果不存數的話,就算真記錄了偏移量也是白搭。
輸入端說完了,接下來就說說輸出端。這一塊其實就不太好弄了,因為Flink將數據發送到sink方。它的角色就仿若souce到flink一樣。(ps:souce->flink === flink -->sink,都是一個東西向另外一個東西發送數據)。但是sink端的外部存儲系統沒辦法像Flink一樣,通過保存狀態的方式來完成精準一次性。但是在開發過程中,精準一次性很多場景都非常重要,所以為了能夠實現Flink到sink端的精準一次性,還是需要對sink端有一些要求的。
主要的要求內容一共有兩個,分別是「事務」和「冪等性」:
我們首先來聊一聊冪等,冪等這個東西很神奇。它就像一個不允許重復的集合一樣,在冪等的環境下,一個操作可以重復多次,但是次操作所導致的結果就只有一次。有了冪等性,數據無論寫多少次,都不會影響到sink端的結果,但是如果想要進行冪等性寫出,那就要求sink端需要支持冪等,還是有一些場景上的限制的。
冪等性聊完了就聊聊事務,事務相對於冪等性的要求它的限制要少一些。其實在聊精準一次性之前,大家要明確一個點,那就是sink端面臨的問題不是數據會不會丟失,而是數據會不會被重復計算。這是因為即使是數據因為故障丟失,source端也會通過檢查點來對數據進行重新讀寫,所以數據丟失的幾率被降得很低。所以重復計算才是sink端的難題。
我們都知道,事務的特性是:如果我本次的活干到一半突然出現故障,那麼這次乾的內容我會全部收回。當然這是粗俗的說法,如果嚴格一點,那就是:事務具有四個基本特性:ACID,即原子、一致、隔離、持久這四大特性。通過使用這四大特性,就能夠保證如果本次的操作沒有完成,那這次操作過程中完成的部分也會被撤銷。
那就用這個理念,來套入Flink與sink之間的處理。如果我們寫一個事務,用這個事務和檢查點綁定在一起,通過這個事務向外部寫出數據,當Sink運算元觸發檢查點保存的時候,開啟保存狀態的同時就開啟一個事務,接下來的數據都寫在這個事務裡面。只有檢查點完成了保存,那麼事務就可以提交,數據就算是成功寫出,可以使用了。如果程序出現故障了,狀態保存失敗,就會根據檢查點中的內容對數據重新讀寫,事務也肯定是失敗了,就會回退。之前的操作也會作廢,也就證明了數據沒有多餘寫出了。
為了能夠滿足Flink與sink端之間的狀態精準一次性,也明確了在兩種實現理論中事務比較牛,那就再來聊一聊事務實現的兩種方式:預寫日誌、兩階段提交。當然了,這個TMD數據寫出端也需要支持事務才行。就像我之前使用到的Clickhouse,它既不支持事務也不支持冪等,只能通過本身的副本合並樹來實現虛假的精準一次性(我現階段就這個水平,可能有的人會手寫更牛叉的自定義Sink也說不定)。
預寫日誌首先需要把數據作為日誌狀態保存起來,既然它是日誌狀態了,那就證明程序在觸發檢查點的時候就能夠將它一起以快照的方式存儲到外部系統中做持久化存儲。當檢查點成功保存之後,再把結果一次性寫出就好。這么搞是不是就算是完成精準一次性了呢?答案是,是的!但是預寫日誌有個問題,那就是它是把一個小階段之間的數據全量寫出,雖然在宏觀角度去看這一個階段很小,但是Flink處理讀取數據的能力賊強,就這一小階段就會產生大量數據,一次性寫出就會對集群造成一定的壓力。就相當於把流處理變成了批處理(spark震怒!!!)
這種方法看起來有點美好,但是實際上它有問題。它把流處理變成了批處理,那必然要知道這次批操作成沒成功,所以在數據寫出完成之後,會還給sink一個成功的信號,只有明確這個信息是真的完成了,才能證明Checkpoint是成功的,同時也要把這個返回的成功信息做一個保存,用來證明這次批處理的成功。這種情況會面臨的問題就是,當數據成功寫出了,返回成功信號的時候出事兒了,那故障恢復後Flink確認不了這次成沒成功,就會重新發送數據,數據就又會重復發送了。
這個功能DateSteam API提供了一個模板類來幫忙完成。
預寫日誌說完了,就來談談兩階段提交吧。它比預寫日誌還要牛一點,但是限制也是一大堆,比較難搞。在正式說兩階段提交之前,要明確兩個概念。第一個是兩階段是由預提交和正式提交兩部分組成,預提交階段的數據內容是不能夠使用的。
當第一條數據來時,或者是sink接收到檢查點的「分界線」時,就會開啟一個事務,這個事務會代替sink來進行數據的寫出操作,但是在這個階段,所有寫出的操作都是不可用的,也就是預提交階段。而sink這個時候會進入一個等待狀態,它需要接收JobManager發送給它的一個檢查點成功保存的信號,一旦它接收到這個信息,那麼sink端的這個事務就會提交,預提交階段所有不可用的數據便會變成可使用的了。
Flink同樣提供了一個模板sink:
在這篇文章裡面,我聊了聊我理解的有關於狀態一致性的內容,總的來說得到了一個結論,那就是精準一次性代價頗高,逼事賊多。而我目前用到的clickhouse既不支持事務、也不支持冪等。所以我還沒有解除過重寫上述兩個介面的過程。但是,在進行實時數據處理的過程中,大多都是在Kafka與Flink之間進行數據的來回傳遞,分層操作也是如此。所以,我們在下一篇文章中就聊聊,有關於 Kafka -> Flink -> Kafka 之間的狀態一致性吧。
7. flink配置和內存
1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (「50100,50101」), ranges(「50100-50200」) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任務管理器的外部埠,用於數據交換操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重啟策略 默認空
restart-strategy.fixed-delay.attempts 固定周期重啟
restart-strategy.fixed-delay.delay 固定周期重啟
restart-strategy.failure-rate.delay 失敗率重啟
restart-strategy.failure-rate.failure-rate-interval 失敗率重啟
restart-strategy.failure-rate.max-failures-per-interval 失敗率重啟
state.backend.incremental 增量checkpoint(僅對rocksdb支持)
state.backend.local-recovery 狀態後端配置本地恢復,默認false,只支持鍵控狀態後端
state.checkpoints.num-retained 保留的最大已完成檢查點數,默認1
taskmanager.state.local.root-dirs 本地恢復的根目錄
3.High Availability
high-availability 默認為NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元數據路徑
high-availability.zookeeper.path.root 配置ZK路徑
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多數情況下,用戶只需要設置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取決於設置方式),並可能通過調整JVM堆與託管內存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否啟用jm進程的JVM直接內存限制,默認false
jobmanager.memory.flink.size 默認none。這包括JobManager消耗的所有內存。非容器配置
jobmanager.memory.heap.size 默認none。默認為總內存減去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空間大小,默認256mb
jobmanager.memory.jvm-overhead.fraction jm保留總進程內存的比例,默認0.1
jobmanager.memory.jvm-overhead.max 最大JVM開銷,默認1gb
jobmanager.memory.jvm-overhead.min 最小JVM開銷,默認192mb
jobmanager.memory.off-heap.size jm的堆外內存,默認128mb,如果第一個參數被啟用,這個將生效
jobmanager.memory.process.size JobManager的總進程內存大小。容器化配置這個,為容器總大小
taskmanager.memory.flink.size TaskExecutor的總Flink內存大小。默認none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆內存大小。默認128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外內存大小。默認128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空間大小。默認256mb
taskmanager.memory.jvm-overhead.fraction 要為JVM開銷保留的總進程內存的分數。默認0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM開銷最大大小,默認1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM開銷大小。默認192mb
taskmanager.memory.managed.consumer-weights 消費者權重。DATAPROC(用於流式RocksDB狀態後端和批量內置演算法)和PYTHON(用於Python進程),默認DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未顯式指定託管內存大小,則將用作託管內存的Flink總內存的分數,默認0.4
taskmanager.memory.managed.size TaskExecutor的託管內存大小。默認none
taskmanager.memory.network.fraction 用作網路內存的總Flink內存的分數,默認0.1
taskmanager.memory.network.max TaskExecutor的最大網路內存大小。默認1gb
taskmanager.memory.network.min TaskExecutor的最小網路內存大小。默認64mb
taskmanager.memory.process.size TaskExecutor的總進程內存大小。默認none,容器配置
taskmanager.memory.task.heap.size tm內存,默認none,
taskmanager.memory.task.off-heap.size tm堆外內存,默認0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
8. 4.一文搞定:Flink與Kafka之間的精準一次性
在上一篇文章當中,也算是比較詳細且通俗的聊了聊Flink是如何通過checkpoint機制來完成數據精準一次性的實現的。並且也在上一章的結尾表示,要在接下來聊一聊Flink與它的鐵哥們Kafaka之間,是如何實現數據的精準一次性消費的。
本次的聊法,還是要通過以kafka(source)->Flink,Flink(source)->Kafka來分別展開討論。
kafka是一個具有數據保存、數據回放能力的消息隊列,說白了就是kafka中的每一個數據,都有一個專門的標記作為標識。而在Flink消費kafka傳入的數據的時候,source任務就能夠將這個偏移量以運算元狀態的角色進行保存,寫入到設定好的檢查點中。這樣一旦發生故障,Flink中的FlinkKafkaProce連接器就i能夠按照自己保存的偏移量,自己去Kafka中重新拉取數據,也正是通過這種方式,就能夠確保Kafka到Flink之間的精準一次性。
在上一篇文章當中,已經表明了,如果想要讓輸出端能夠進行精準一次性消費,就需要使用到冪等性或者是事務。而事務中的兩階段提交是所有方案裡面最好的實現。
其實Flink到Kafak之間也是採用了這種方式,具體的可以看一下ctrl進到FlinkKafkaProce連接器內部去看一看:
這也就表明了,當數據通過Flink發送給sink端Kafka的時候,是經歷了兩個階段的處理的。第一階段就是Flink向Kafka中插入數據,進入預提交階段。當JobManager發送的Ckeckpoint保存成功信號過來之後,才會提交事務進行正式的數據發送,也就是讓原來不可用的數據可以被使用了。
這個實現過程到目前階段就很清晰了,它的主體流程無非就是在開啟檢查點之後,由JobManager向各個階段的處理邏輯發送有關於檢查點的barrier。所有的計算任務接收到之後,就會根據自己當前的狀態做一個檢查點保存。而當這個barrier來到sink任務的時候,sink就會開啟一個事務,然後通過這個事務向外預寫數據。直到Jobmanager來告訴它這一次的檢查點已經保存完成了,sink就會進行第二次提交,數據也就算是成功寫出了。
1.必須要保證檢查點被打開了,如果檢查點沒有打開,那麼之前說的一切話都是空談。因為Flink默認檢查點是關著的。
2.在FlinkKafakProcer連接器的構造函數中要傳入參數,這個參數就是用來保證狀態一致性的。就是在構造函數的最後一個參數輸入如下:
3.配置Kafka讀取數據的隔離級別
在kafka中有個配置,這個配置用來管理Kafka讀取數據的級別。而這個配置默認是能夠讀取預提交階段的數據的,所以如果你沒改這個配置,那兩階段提交的第一階段就是白費了。所以需要改一下這個配置,來更換一下隔離級別:
4.事務超時時間
這個配置也很有意思,大家試想一下。如果要進行兩階段提交,就要保證sink端支持事務,Kafka是支持事務的,但是像這個組件對於很多機制都有一個超時時間的概念,也就是說如果時間到了這個界限還沒完成工作,那就會默認這個工作失敗。Kafka中由這個概念,Flink中同樣由這個概念。但是flink默認超時時間是1小時,而Kafka默認是15分鍾,這就有可能出現檢查點保存東西的時間大於15分鍾,假如說是16分鍾保存完成然後給sink發送檢查點保存陳功可以提交事務的信號,但是這個時候Kafka已經認為事務失敗,把之前的數據都扔了。那數據不就是丟失了么。所以說Kafka的超時時間要大於Flink的超時時間才好。
截止到目前為止,基本上把有關於狀態維護的一些東西都說完了,有狀態後端、有檢查點。還通過檢查點完成可端到端的數據精準一次性消費。但是想到這我又感覺,如果有學習進度比我差一些的,萬一沒辦法很好的理解怎麼辦。所以在下一篇文章當中我就聊聊Flink中的「狀態」到底是個什麼東西,都有什麼類型,都怎麼去用。
9. Flink 狀態編程
1.流式計算分為無狀態和有狀態兩種情況。 無狀態的計算觀察每個獨立事件,並根據最後一個事件輸出結果。例如,流處理應用程序從感測器接收水位數據,並在水位超過指定高度時發出警告。有狀態的計算則會基於多個事件輸出結果。以下是一些例子。
(1)所有類型的窗口。例如,計算過去一小時的平均水位,就是有狀態的計算。
(2)所有用於復雜事件處理的狀態機。例如,若在一分鍾內收到兩個相差20cm以上的水位差讀數,則發出警告,這是有狀態的計算。
(3)流與流之間的所有關聯操作,以及流與靜態表或動態表之間的關聯操作,都是有狀態的計算。
2.下圖展示了無狀態流處理和有狀態流處理的主要區別。 無狀態流處理分別接收每條數據記錄(圖中的黑條),然後根據最新輸入的數據生成輸出數據(白條)。有狀態流處理會維護狀態(根據每條輸入記錄進行更新),並基於最新輸入的記錄和當前的狀態值生成輸出記錄(灰條)。
上圖中輸入數據由黑條表示。無狀態流處理每次只轉換一條輸入記錄,並且僅根據最新的輸入記錄輸出結果(白條)。有狀態 流處理維護所有已處理記錄的狀態值,並根據每條新輸入的記錄更新狀態,因此輸出記錄(灰條)反映的是綜合考慮多個事件之後的結果。
3.有狀態的運算元和應用程序
Flink內置的很多運算元,數據源source,數據存儲sink都是有狀態的,流中的數據都是buffer records,會保存一定的元素或者元數據。例如: ProcessWindowFunction會緩存輸入流的數據,ProcessFunction會保存設置的定時器信息等等。
在Flink中,狀態始終與特定運算元相關聯。總的來說,有兩種類型的狀態:
運算元狀態(operator state)
鍵控狀態(keyed state)
4.運算元狀態(operator state)
運算元狀態的作用范圍限定為運算元任務。這意味著由同一並行任務所處理的所有數據都可以訪問到相同的狀態,狀態對於同一任務而言是共享的。運算元狀態不能由相同或不同運算元的另一個任務訪問。
Flink為運算元狀態提供三種基本數據結構:
列表狀態(List state):將狀態表示為一組數據的列表。
聯合列表狀態(Union list state):也將狀態表示為數據的列表。它與常規列表狀態的區別在於,在發生故障時,或者從保存點(savepoint)啟動應用程序時如何恢復。
廣播狀態(Broadcast state):如果一個運算元有多項任務,而它的每項任務狀態又都相同,那麼這種特殊情況最適合應用廣播狀態
5.鍵控狀態(keyed state)
鍵控狀態是根據輸入數據流中定義的鍵(key)來維護和訪問的。Flink為每個鍵值維護一個狀態實例,並將具有相同鍵的所有數據,都分區到同一個運算元任務中,這個任務會維護和處理這個key對應的狀態。當任務處理一條數據時,它會自動將狀態的訪問范圍限定為當前數據的key。因此,具有相同key的所有數據都會訪問相同的狀態。Keyed State很類似於一個分布式的key-value map數據結構,只能用於KeyedStream(keyBy運算元處理之後)。
6.狀態後端(state backend)
每傳入一條數據,有狀態的運算元任務都會讀取和更新狀態。由於有效的狀態訪問對於處理數據的低延遲至關重要,因此每個並行任務都會在本地維護其狀態,以確保快速的狀態訪問。狀態的存儲、訪問以及維護,由一個可插入的組件決定,這個組件就叫做狀態後端(state backend)
狀態後端主要負責兩件事:
1)本地的狀態管理
2)將檢查點(checkpoint)狀態寫入遠程存儲
狀態後端分類:
(1)MemoryStateBackend
內存級的狀態後端,會將鍵控狀態作為內存中的對象進行管理,將它們存儲在TaskManager的JVM堆上;而將checkpoint存儲在JobManager的內存中。
(2)FsStateBackend
將checkpoint存到遠程的持久化文件系統(FileSystem)上。而對於本地狀態,跟MemoryStateBackend一樣,也會存在TaskManager的JVM堆上。
(3)RocksDBStateBackend
將所有狀態序列化後,存入本地的RocksDB中存儲。
7.狀態一致性
當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之後,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數?
1)一致性級別
在流處理中,一致性可以分為3個級別:
(1) at-most-once : 這其實是沒有正確性保障的委婉說法——故障發生之後,計數結果可能丟失。同樣的還有udp。
(2) at-least-once : 這表示計數結果可能大於正確值,但絕不會小於正確值。也就是說,計數程序在發生故障後可能多算,但是絕不會少算。
(3) exactly-once : 這指的是系統保證在發生故障後得到的計數結果與正確值一致。
曾經,at-least-once非常流行。第一代流處理器(如Storm和Samza)剛問世時只保證at-least-once,原因有二。
(1)保證exactly-once的系統實現起來更復雜。這在基礎架構層(決定什麼代表正確,以及exactly-once的范圍是什麼)和實現層都很有挑戰性。
(2)流處理系統的早期用戶願意接受框架的局限性,並在應用層想辦法彌補(例如使應用程序具有冪等性,或者用批量計算層再做一遍計算)。
最先保證exactly-once的系統(Storm Trident和Spark Streaming)在性能和表現力這兩個方面付出了很大的代價。為了保證exactly-once,這些系統無法單獨地對每條記錄運用應用邏輯,而是同時處理多條(一批)記錄,保證對每一批的處理要麼全部成功,要麼全部失敗。這就導致在得到結果前,必須等待一批記錄處理結束。因此,用戶經常不得不使用兩個流處理框架(一個用來保證exactly-once,另一個用來對每個元素做低延遲處理),結果使基礎設施更加復雜。曾經,用戶不得不在保證exactly-once與獲得低延遲和效率之間權衡利弊。Flink避免了這種權衡。
Flink的一個重大價值在於, 它既保證了 exactly-once ,也具有低延遲和高吞吐的處理能力 。
從根本上說,Flink通過使自身滿足所有需求來避免權衡,它是業界的一次意義重大的技術飛躍。盡管這在外行看來很神奇,但是一旦了解,就會恍然大悟。
2)端到端(end-to-end)狀態一致性
目前我們看到的一致性保證都是由流處理器實現的,也就是說都是在 Flink 流處理器內部保證的;而在真實應用中,流處理應用除了流處理器以外還包含了數據源(例如 Kafka)和輸出到持久化系統。
端到端的一致性保證,意味著結果的正確性貫穿了整個流處理應用的始終;每一個組件都保證了它自己的一致性,整個端到端的一致性級別取決於所有組件中一致性最弱的組件。具體可以劃分如下:
1)source端 —— 需要外部源可重設數據的讀取位置
2)link內部 —— 依賴checkpoint
3)sink端 —— 需要保證從故障恢復時,數據不會重復寫入外部系統
而對於sink端,又有兩種具體的實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入。
4)冪等寫入
所謂冪等操作,是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,後面再重復執行就不起作用了。
5)事務寫入
需要構建事務來寫入外部系統,構建的事務對應著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中。
對於事務性寫入,具體又有兩種實現方式:預寫日誌(WAL)和兩階段提交(2PC)。
8.檢查點(checkpoint)
Flink具體如何保證exactly-once呢? 它使用一種被稱為"檢查點"(checkpoint)的特性,在出現故障時將系統重置回正確狀態。下面通過簡單的類比來解釋檢查點的作用。
9.Flink+Kafka如何實現端到端的exactly-once語義
我們知道,端到端的狀態一致性的實現,需要每一個組件都實現,對於Flink + Kafka的數據管道系統(Kafka進、Kafka出)而言,各組件怎樣保證exactly-once語義呢?
1)內部 —— 利用checkpoint機制,把狀態存檔,發生故障的時候可以恢復,保證內部的狀態一致性
2)source —— kafka consumer作為source,可以將偏移量保存下來,如果後續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據,保證一致性
3)sink —— kafka procer作為sink,採用兩階段提交 sink,需要實現一個TwoPhaseCommitSinkFunction
內部的checkpoint機制我們已經有了了解,那source和sink具體又是怎樣運行的呢?接下來我們逐步做一個分析。
我們知道Flink由JobManager協調各個TaskManager進行checkpoint存儲,checkpoint保存在 StateBackend中,默認StateBackend是內存級的,也可以改為文件級的進行持久化保存。
當checkpoint 啟動時,JobManager 會將檢查點分界線(barrier)注入數據流;barrier會在運算元間傳遞下去。
每個運算元會對當前的狀態做個快照,保存到狀態後端。對於source任務而言,就會把當前的offset作為狀態保存起來。下次從checkpoint恢復時,source任務可以重新提交偏移量,從上次保存的位置開始重新消費數據。
每個內部的transform 任務遇到 barrier 時,都會把狀態存到 checkpoint 里。
sink 任務首先把數據寫入外部 kafka,這些數據都屬於預提交的事務(還不能被消費);當遇到 barrier 時,把狀態保存到狀態後端,並開啟新的預提交事務。
當所有運算元任務的快照完成,也就是這次的 checkpoint 完成時,JobManager 會向所有任務發通知,確認這次 checkpoint 完成。
當sink 任務收到確認通知,就會正式提交之前的事務,kafka 中未確認的數據就改為「已確認」,數據就真正可以被消費了。
所以我們看到,執行過程實際上是一個兩段式提交,每個運算元執行完成,會進行「預提交」,直到執行完sink操作,會發起「確認提交」,如果執行失敗,預提交會放棄掉。
具體的兩階段提交步驟總結如下:
1)第一條數據來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日誌但標記為未提交,這就是「預提交」
2)觸發 checkpoint 操作,barrier從 source 開始向下傳遞,遇到 barrier 的運算元將狀態存入狀態後端,並通知jobmanager
3)sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
4)jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
5)sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
6)外部kafka關閉事務,提交的數據可以正常消費了。
10. 端到端的精確一次保證
Flink 任務 failover 之後,可能會重復寫出數據到 Sink 中,你們公司是怎麼做到端對端 exactly-once 的?
端對端 exactly-once 有 3 個條件:
⭐ Source 引擎可以重新消費,比如 Kafka 可以重置 offset 進行重新消費
⭐ Flink 任務配置 exactly-once,保證 Flink 任務 State 的 exactly-once
⭐ Sink 運算元支持兩階段或者可重入,保證產出結果的 exactly-once
其中前兩項一般大多數引擎都支持,我們需要關注的就是第 3 項,目前有兩種常用方法:
⭐ Sink 兩階段:由於兩階段提交是隨著 Checkpoint 進行的,假設 Checkpoint 是 5min 做一次,那麼數據對下游消費方的可見性延遲至少也是 5min,所以會有數據延遲等問題,目前用的比較少。
⭐ Sink 支持可重入:舉例:
⭐ Sink 為 MySQL:可以按照 key update 數據
⭐ Sink 為 Druid:聚合類型可以選用 longMax
⭐ Sink 為 ClickHouse:查詢時使用 longMax 或者使用 ReplacingMergeTree 表引擎將重復寫入的數據去重,這里有小夥伴會擔心 ReplacingMergeTree 會有性能問題,但是博主認為其實性能影響不會很大,因為 failover 導致的數據重復其實一般情況下是小概率事件,並且重復的數據量也不會很大,也只是一個 Checkpoint 周期內的數據重復,所以使用 ReplacingMergeTree 是可以接受的)
⭐ Sink 為 Redis:按照 key 更新數據
其他解答:Flink狀態一致性、端到端的精確一次保證
狀態一致性:當在分布式系統中引入狀態時,自然也引入了一致性問題。一致性實際上是"正確性級別"的另一種說法,也就是說在成功處理故障並恢復之後得到的結果,與沒有發生任何故障時得到的結果相比,前者到底有多正確?舉例來說,假設要對最近一小時登錄的用戶計數。在系統經歷故障之後,計數結果是多少?如果有偏差,是有漏掉的計數還是重復計數? 對於流處理內部來說,所謂的狀態一致性,其實就是我們所說的計算結果保證准確。在遇到故障時可以恢復狀態,恢復以後的重新計算,結果應該也是完全正確的。 一條數據不應該丟失,也不應該重復計算
一致性可以分為 3 個級別: at-most-once(最多一次):計數結果可能丟失
at-least-once (至少一次):計數程序在發生故障後可能多算,但是絕不會少算。
exactly-once (精確一次):系統保證在發生故障後得到的計數結果與正確值一致。
數據流(DataStream)內部保證exactly-once (精確一次)的方法:Flink 使用了一種輕量級快照機制 ---- 檢查點(checkpoint)來保證 exactly-once 語義
有狀態應用的一致檢查點,其實就是:所有任務的狀態,在某個時間點的一份拷貝(一份快照)。而這個時間點,應該是所有任務都恰好處理完一個相同的輸入數據的時候。
端到端保證一致性:
內部保證 —— 依賴 checkpoint
source 端 —— 需要外部源可重設數據的讀取位置
sink 端 —— 需要保證從故障恢復時,數據不會重復寫入外部系統
而對於 sink 端,又有兩種具體的實現方式:冪等(Idempotent)寫入和事務性(Transactional)寫入。
冪等操作:是說一個操作,可以重復執行很多次,但只導致一次結果更改,也就是說,後面再重復執行就不起作用了。 例如Hashmap 的寫入插入操作是冪等的操作,重復寫入,寫入的結果還一樣。
事務寫入:構建的事務對應著 checkpoint,等到 checkpoint 真正完成的時候,才把所有對應的結果寫入 sink 系統中
對於事務性寫入,具體又有兩種實現方式: 預寫日誌(WAL)和兩階段提交(2PC) 。DataStream API 提供了 GenericWriteAheadSink 模板類和TwoPhaseCommitSinkFunction 介面,可以方便地實現這兩種方式的事務性寫入。其中 預寫日誌(WAL)只能保證至少一次精確。
Flink+Kafka 端到端狀態一致性的保證
內部 —— 利用 checkpoint 機制,把狀態存檔,發生故障的時候可以恢復, 保證內部的狀態一致性
source —— kafka consumer 作為 source,可以將偏移量保存下來,如果後 續任務出現了故障,恢復的時候可以由連接器重置偏移量,重新消費數據, 保證一致性
sink —— kafka procer 作為 sink,採用兩階段提交 sink,需要實現一個 TwoPhaseCommitSinkFunction
由於端到端保證一致性需要用到兩階段提交(2PC)TwoPhaseCommitSinkFunction,我們來了解一下兩階段提交的方式:
第一條數據來了之後,開啟一個 kafka 的事務(transaction),正常寫入 kafka 分區日誌但標記為未提交,這就是「預提交」
jobmanager 觸發 checkpoint 操作,barrier 從 source 開始向下傳遞,遇到 barrier 的運算元將狀態存入狀態後端,並通知 jobmanager
sink 連接器收到 barrier,保存當前狀態,存入 checkpoint,通知 jobmanager,並開啟下一階段的事務,用於提交下個檢查點的數據
jobmanager 收到所有任務的通知,發出確認信息,表示 checkpoint 完成
sink 任務收到 jobmanager 的確認信息,正式提交這段時間的數據
外部 kafka 關閉事務,提交的數據可以正常消費了。
我們也可以看到,如果宕機需要通過 StateBackend 進行恢復,只能恢復所有確認提交的操作,之於有關後端狀態的選擇,後面再單獨聊聊