當前位置:首頁 » 編程語言 » flinksql開源
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flinksql開源

發布時間: 2023-05-30 16:38:26

1. 數據治理大數據湖倉一體開源框架

數據治坦拍理大數據湖倉一體開源框架分為4部分:

1、數據源

業務庫數據、用戶日誌、系統日賀盯志、爬蟲數據

2、構建集群

Hadoop,HDFS,Yarn

3.1 數據採集

數據採集工具:Sqoop、Flume、Canal、Sparkstreaming

3.2 數據預處理

數據預處理工具:消息系統Kafka,寬表工具Sparksql、FlinkSql

3.3數據存儲

Hbase數據讓拍羨庫集群、Clickhouse

3.4數據挖掘

Spark,Flink

4、數據可視化

FineBI 、PowerBI

開發工具:Intellij IDEA

2. Flink:特性、概念、組件棧、架構及原理分析

簡單之美 | Apache Flink:特性、概念、組件棧、架構及原理分析
http://shiyanjun.cn/archives/1508.html

Apache Flink是一個面向分布式數據流處理和批量數據處理的開源計算平台,它能夠基於同一個Flink運行時(Flink Runtime),提供支持流處理和批處理兩種類型應用的功能。現有的開源計算方案,會把流處理和批處理作為兩種不同的應用類型,因為他們它們所提供的SLA是完全不相同的:流處理一般需要支持低延遲、Exactly-once保證,而批處理需要支持高吞吐、高效處理,所以在實現的時候通常是分別給出兩套實現方法,或者通過一個獨立的開源框架來實現其中每一種處理方案。例如,實現批處理的開源方案有MapRece、Tez、Crunch、Spark,實現流處理的開源方案有Samza、Storm。Flink在實現流處理和批處理時,與傳統的一些方案完全不同,它從另一個視角看待流處理和批處理,將二者統一起來:Flink是完全支持流處理,也就是說作為流處理看待時輸入數據流是無界的;批處理被作為一種特殊的流處理,只是它的輸入數據流被定義為有界的。基於同一個Flink運行時(Flink Runtime),分別提供了流處理和批處理API,而這兩種API也是實現上層面向流處理、批處理類型應用框架的基礎。
基本特性
關於Flink所支持的特性,我這里只是通過分類的方式簡單做一下梳理,涉及到具體的一些概念及其原理會在後面的部分做詳細說明。
流處理特性
支持高吞吐、低延遲、高性能的流處理
支持帶有事件時間的窗口(Window)操作
支持有狀態計算的Exactly-once語義
支持高度靈活的窗口(Window)操作,支持基於time、count、session,以及data-driven的窗口操作
支持具有Backpressure功能的持續流模型
支持基於輕量級分布式快照(Snapshot)實現的容錯
一個運行時同時支持Batch on Streaming處理和Streaming處理
Flink在JVM內部實現了自己的內存管理
支持迭代計算
支持程序自動優化:避免特定情況下Shuffle、排序等昂貴操作,中間結果有必要進行緩存

API支持
對Streaming數據類應用,提供DataStream API
對批處理類應用,提供DataSet API(支持Java/Scala)

Libraries支持
支持機器學習(FlinkML)
支持圖分析(Gelly)
支持關系數據處理(Table)
支持復雜事件處理(CEP)

整合支持
支持Flink on YARN
支持HDFS
支持來自Kafka的輸入數據
支持Apache HBase
支持Hadoop程序
支持Tachyon
支持ElasticSearch
支持RabbitMQ
支持Apache Storm
支持S3
支持XtreemFS

基本概念
Stream & Transformation & Operator
用戶實現的Flink程序是由Stream和Transformation這兩個基本構建塊組成,其中Stream是一個中間結果數據,而Transformation是一個操作,它對一個或多個輸入Stream進行計算處理,輸出一個或多個結果Stream。當一個Flink程序被執行的時候,它會被映射為Streaming Dataflow。一個Streaming Dataflow是由一組Stream和Transformation Operator組成,它類似於一個DAG圖,在啟動的時候從一個或多個Source Operator開始,結束於一個或多個Sink Operator。下面是一個由Flink程序映射為Streaming Dataflow的示意圖,如下所示:

比如從Source[1]到map()[1],它保持了Source的分區特性(Partitioning)和分區內元素處理的有序性,也就是說map()[1]的Subtask看到數據流中記錄的順序,與Source[1]中看到的記錄順序是一致的。
Redistribution模式

這種模式改變了輸入數據流的分區,比如從map()[1]、map()[2]到keyBy()/window()/apply()[1]、keyBy()/window()/apply()[2],上游的Subtask向下游的多個不同的Subtask發送數據,改變了數據流的分區,這與實際應用所選擇的Operator有關系。另外,Source Operator對應2個Subtask,所以並行度為2,而Sink Operator的Subtask只有1個,故而並行度為1。
Task & Operator Chain
在Flink分布式執行環境中,會將多個Operator Subtask串起來組成一個Operator Chain,實際上就是一個執行鏈,每個執行鏈會在TaskManager上一個獨立的線程中執行,如下圖所示:

在Flink集群啟動的時候,TaskManager會向JobManager注冊,如果注冊成功,則JobManager會向TaskManager回復消息AcknowledgeRegistration。
SubmitJob

Flink程序內部通過Client向JobManager提交Flink Job,其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。
CancelJob

請求取消一個Flink Job的執行,CancelJob消息中包含了Job的ID,如果成功則返回消息CancellationSuccess,失敗則返回消息CancellationFailure。
UpdateTaskExecutionState

TaskManager會向JobManager請求更新ExecutionGraph中的ExecutionVertex的狀態信息,更新成功則返回true。
RequestNextInputSplit

運行在TaskManager上面的Task,請求獲取下一個要處理的輸入Split,成功則返回NextInputSplit。
JobStatusChanged

ExecutionGraph向JobManager發送該消息,用來表示Flink Job的狀態發生的變化,例如:RUNNING、CANCELING、FINISHED等。
TaskManager
TaskManager也是一個Actor,它是實際負責執行計算的Worker,在其上執行Flink Job的一組Task。每個TaskManager負責管理其所在節點上的資源信息,如內存、磁碟、網路,在啟動的時候將資源的狀態向JobManager匯報。TaskManager端可以分成兩個階段:
注冊階段

TaskManager會向JobManager注冊,發送RegisterTaskManager消息,等待JobManager返回AcknowledgeRegistration,然後TaskManager就可以進行初始化過程。
可操作階段

該階段TaskManager可以接收並處理與Task有關的消息,如SubmitTask、CancelTask、FailTask。如果TaskManager無法連接到JobManager,這是TaskManager就失去了與JobManager的聯系,會自動進入「注冊階段」,只有完成注冊才能繼續處理Task相關的消息。
Client
當用戶提交一個Flink程序時,會首先創建一個Client,該Client首先會對用戶提交的Flink程序進行預處理,並提交到Flink集群中處理,所以Client需要從用戶提交的Flink程序配置中獲取JobManager的地址,並建立到JobManager的連接,將Flink Job提交給JobManager。Client會將用戶提交的Flink程序組裝一個JobGraph, 並且是以JobGraph的形式提交的。一個JobGraph是一個Flink Dataflow,它由多個JobVertex組成的DAG。其中,一個JobGraph包含了一個Flink程序的如下信息:JobID、Job名稱、配置信息、一組JobVertex等。
組件棧
Flink是一個分層架構的系統,每一層所包含的組件都提供了特定的抽象,用來服務於上層組件。Flink分層的組件棧如下圖所示:

了解YARN的話,對上圖的原理非常熟悉,實際Flink也實現了滿足在YARN集群上運行的各個組件:Flink YARN Client負責與YARN RM通信協商資源請求,Flink JobManager和Flink TaskManager分別申請到Container去運行各自的進程。通過上圖可以看到,YARN AM與Flink JobManager在同一個Container中,這樣AM可以知道Flink JobManager的地址,從而AM可以申請Container去啟動Flink TaskManager。待Flink成功運行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,並進行後續的映射、調度和計算處理。
Runtime層

Runtime層提供了支持Flink計算的全部核心實現,比如:支持分布式Stream處理、JobGraph到ExecutionGraph的映射、調度等等,為上層API層提供基礎服務。
API層

API層主要實現了面向無界Stream的流處理和面向Batch的批處理API,其中面向流處理對應DataStream API,面向批處理對應DataSet API。
Libraries層

該層也可以稱為Flink應用框架層,根據API層的劃分,在API層之上構建的滿足特定應用的實現計算框架,也分別對應於面向流處理和面向批處理兩類。面向流處理支持:CEP(復雜事件處理)、基於SQL-like的操作(基於Table的關系操作);面向批處理支持:FlinkML(機器學習庫)、Gelly(圖處理)。
內部原理
容錯機制
Flink基於Checkpoint機制實現容錯,它的原理是不斷地生成分布式Streaming數據流Snapshot。在流處理失敗時,通過這些Snapshot可以恢復數據流處理。理解Flink的容錯機制,首先需要了解一下Barrier這個概念:Stream Barrier是Flink分布式Snapshotting中的核心元素,它會作為數據流的記錄被同等看待,被插入到數據流中,將數據流中記錄的進行分組,並沿著數據流的方向向前推進。每個Barrier會攜帶一個Snapshot ID,屬於該Snapshot的記錄會被推向該Barrier的前方。因為Barrier非常輕量,所以並不會中斷數據流。帶有Barrier的數據流,如下圖所示:

接收到Barrier n的Stream被臨時擱置,來自這些Stream的記錄不會被處理,而是被放在一個Buffer中
一旦最後一個Stream接收到Barrier n,Operator會emit所有暫存在Buffer中的記錄,然後向Checkpoint Coordinator發送Snapshot n
繼續處理來自多個Stream的記錄

基於Stream Aligning操作能夠實現Exactly Once語義,但是也會給流處理應用帶來延遲,因為為了排列對齊Barrier,會暫時緩存一部分Stream的記錄到Buffer中,尤其是在數據流並行度很高的場景下可能更加明顯,通常以最遲對齊Barrier的一個Stream為處理Buffer中緩存記錄的時刻點。在Flink中,提供了一個開關,選擇是否使用Stream Aligning,如果關掉則Exactly Once會變成At least once。
調度機制
在JobManager端,會接收到Client提交的JobGraph形式的Flink Job,JobManager會將一個JobGraph轉換映射為一個ExecutionGraph,如下圖所示:

迭代機制
機器學習和圖計算應用,都會使用到迭代計算,Flink通過在迭代Operator中定義Step函數來實現迭代演算法,這種迭代演算法包括Iterate和Delta Iterate兩種類型,在實現上它們反復地在當前迭代狀態上調用Step函數,直到滿足給定的條件才會停止迭代。下面,對Iterate和Delta Iterate兩種類型的迭代演算法原理進行說明:
Iterate

Iterate Operator是一種簡單的迭代形式:每一輪迭代,Step函數的輸入或者是輸入的整個數據集,或者是上一輪迭代的結果,通過該輪迭代計算出下一輪計算所需要的輸入(也稱為Next Partial Solution),滿足迭代的終止條件後,會輸出最終迭代結果,具體執行流程如下圖所示:

Delta Iterate Operator實現了增量迭代,它的實現原理如下圖所示:

另外,Flink還提供了3個參數來配置Backpressure監控行為:
參數名稱
默認值
說明

jobmanager.web.backpressure.refresh-interval
60000
默認1分鍾,表示采樣統計結果刷新時間間隔

jobmanager.web.backpressure.num-samples
100
評估Backpressure狀態,所使用的堆棧跟蹤調用次數

jobmanager.web.backpressure.delay-between-samples
50
默認50毫秒,表示對一個Job的每個Task依次調用的時間間隔

通過上面個定義的Backpressure狀態,以及調整相應的參數,可以確定當前運行的Job的狀態是否正常,並且保證不影響JobManager提供服務。
參考鏈接
http://flink.apache.org/
http://flink.apache.org/features.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/general_arch.html
http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/stream_checkpointing.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/internals/job_scheling.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/event_time.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/yarn_setup.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/setup/jobmanager_high_availability.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/streaming/libs/cep.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/gelly.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/ml/index.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/libs/table.html
https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/connectors.html
http://geek.csdn.net/news/detail/56272
http://samza.apache.org/

3. 開源的大數據框架有哪些

文件存儲:Hadoop HDFS、Tachyon、KFS離線計算:Hadoop MapRece、Spark流式、實時計算:Storm、Spark Streaming、S4、HeronK-V、NOSQL資料庫:HBase、Redis、MongoDB資源管念埋理:YARN、Mesos日誌收集:Flume、Scribe、Logstash、Kibana消息系統:Kafka、StormMQ、ZeroMQ、RabbitMQ查詢分析:Hive、Impala、Pig、Presto、Phoenix、SparkSQL、Drill、Flink、Kylin、祥豎Druid分布式協調服務:Zookeeper集謹高大群管理與監控:Ambari、Ganglia、Nagios、Cloudera

4. 為什麼Flink會成為下一代大數據處理框架的標准

作者:張利兵

如需轉載請聯系華章 科技

在當前數據量激增傳統的時代,不同的業務場景都有大量的業務數據產生, 對於這些不斷產生的數據應該如何進行有效地處理,成為當下大多數公司所面臨的問題。

隨著雅虎對Hadoop的開源,越來越多的大數據處理技術開始湧入人們的視線,例如目前比較流行大數據處理引擎Apache Spark,基本上已經取代了MapRece成為當前大數據處理的標准。

但隨著數據的不斷增長,新技術的不斷發展,人們逐漸意識到對實時數據處理的重要性,企業需要能夠同時支持高吞吐、低延遲、高性能的流處理技術來處理日益增長的數據。

相對於傳統的數據處理模式,流式數據處理則有著更高的處理效率和成本控制。 Apache Flink就是近年來在開源社區發展不斷發展的能夠支持同時支持高吞吐、低延遲、高性能分布式處理框架。

在2010至2014年間,由柏林工業大學,柏林洪堡大學和哈索普拉特納研究所聯合發起名為「Stratosphere: Information Management on the Cloud」研究項目,該項目在當時的社區逐漸具有一定社區知名度,2014年4月,Stratosphere代碼被貢獻給Apache 軟體基金會,成為Apache基金會孵化器項目。

期初參與該項目的核心成員均來自Stratosphere原來的核心成員,之後團隊的大部分創始成員離開學校,共同創辦了一家名叫Data Artisans的公司,其主要業務便是將Stratosphere,也就是之後的Flink實現商業化。在項目孵化期間,項目Stratosphere改名為Flink。

Flink在德語中是快速和靈敏的意思,用來體現流式數據處理器的速度快和靈活性強等特點,同時使用棕紅色松鼠圖案作為Flink項目的Logo,也是主要藉助於松鼠靈活快速的特點,由此Flink開始正式地進入社區開發者的視線。

在2014年12月,該項目成為Apache 軟體基金會頂級項目,從2015年09月發布第一個穩定版本0.9,到2019年4月已經發布到1.8的版本,更多的社區開發成員也逐步地加入,現在Flink在全球范輪前圍內擁有燃棚350多位的開發人員, 不斷有新的特性被發布。

同時在全球范圍內,越來越多的公司開始使用Flink,在國內比較出名的互聯網公司如Alibaba,美團,滴滴等,都在大規模的使用Flink作為企業的分布式大數據處理引擎。

Flink在近年來逐步被人們所熟知和使用,其主要原因不僅因為提供同時支持高吞吐、低延遲和exactly-once語義的實時計算能力,同時Flink還提供了基於流式計算引擎處理批量數據的計算能力,真正意義實現了批流統一,同時隨著Alibaba對Blink的開源,極大地增強了Flink對批計算領域的支持。

眾多優秀的特性,使得Flink成為開源大數據數據處理框架中的一顆新星,隨著國內社區不斷推動, 越來越多的國內公司開始選擇使用Flink作為實時數據處理的技術 ,在將來不久的時間內,Flink也將會成為企業內部主流的數據處理框架,最終成為下一代大數據數據處理框架的標准。

有狀態流計算將會隨著技術的發展,逐步成為企業作為構建數據平台的架構模式,而這種技術實現的開源方案目前從社區來看,能夠滿足的就是Apache Flink。 Flink通過實現Google Dataflow流式計算模型實現了高吞吐,低延遲,高性能兼具實時流式計算框架。

同時Flink支持高效容錯的狀態管理,Flink能夠將其狀態維護在內存或RockDB資料庫中,為了防止狀態在計算過程中因為系統異常而出現丟失,Flink周期性的通過分布式快照技術CheckPoints實現狀態的持久化維護,使得在系統即使在停機或者異常的情況下都能正確的進行狀態恢復,從而保證皮桐則在任何時間都能計算出正確的結果。

數據架構的演變過程,伴隨著技術的不斷迭代更新,Flink具有先進的架構理念,以及諸多的優秀特性,以及完善的編程介面,而Flink也在每一次的Release版本中,不斷推出新的特性。

例如Queryable State功能的提出,將直接容許用戶通過遠程的方式直接獲取流式計算任務的狀態信息,也就是說數據不需要落地資料庫就能直接從流式應用中直接查詢出,對於實時互動式的查詢業務可以直接從Flink的狀態中查詢最新的結果,當然這個功能目前還屬於Beta版本,但是相信在不久的未來,會變得越來越完善,那時Flink將不僅作為實時流式處理的框架,更多的可能會成為一套實時的存儲引擎, 會讓更多的用戶從有狀態計算的技術中獲取收益。

Flink是一套集高吞吐,低延遲,高性能三者於一身的分布式流式數據處理框架。

非常成熟的計算框架Apache Spark也只能兼顧高吞吐和高性能特性,在Spark Streaming流式計算中無法做到低延遲保障;而Apache Storm只能支持低延遲和高性能特性,但是無法滿足高吞吐的要求。而對於滿足高吞吐,低延遲,高性能這三個目標對分布式流式計算框架是非常重要的。

在流式計算領域中,窗口計算的地位舉足輕重,但目前大多數計算框架窗口計算所採用的都是系統時間(Process Time),也是事件傳輸到計算框架處理時,系統主機的當前時間,Flink能夠支持基於 事件時間 (Event Time)語義的進行窗口計算,就是使用事件產生的時間,這種時間機制使得事件即使無序到達甚至延遲到達,數據流都能夠計算出精確的結果,同時保持了事件原本產生時的在時間維度的特點,而不受網路傳輸或者計算框架的影響。

Flink在1.4版本中實現了狀態管理,所謂狀態就是在流式計算過程中將運算元的中間結果數據的保存在內存或者DB中,等下一個事件進入接著從狀態中獲取中間結果進行計算,從而無需基於全部的原始數據統計結果,這種做法極大地提升了系統的性能,同時也降低了計算過程的耗時。

對於數據量非常大且邏輯運算非常復雜的流式運算,基於狀態的流式計算則顯得非常使用。

在流處理應用中,數據是連續不斷的,需要通過窗口的方式對流數據進行一定范圍的聚合計算,例如統計在過去的1分鍾內有多少用戶點擊了某一網頁,在這種情況下,我們必須定義一個窗口,用來收集最近一分鍾內的數據,並對這個窗口內的數據再進行計算。

Flink將窗口劃分為基於Time、Count、Session,以及Data-driven等類型的窗口操作,窗口能夠用靈活的觸發條件定製化從而達到對復雜的流傳輸模式的支持,不同的窗口操作應用能夠反饋出真實事件產生的情況,用戶可以定義不同的窗口觸發機制來滿足不同的需求。

Flink能夠分布式運行在上千個節點之上,將一個大型計算的流程拆解成小的計算過程,然後將計算過程分布到單台並行節點上進行處理。

在任務執行過程中,能夠自動的發現事件處理過程中的錯誤而導致數據不一致的問題,常見的錯誤類型例如:節點宕機,或者網路傳輸問題,或是由於用戶因為升級或修復問題而導致計算服務重啟等。

在這些情況下,通過基於分布式快照技術的 Checkpoints ,將執行過程中的任務信息進行持久化存儲,一旦任務出現異常宕機,Flink能夠進行任務的自動恢復,從而確保數據在處理過程中的一致性。

內存管理是每套計算框架需要重點考慮的領域,尤其對於計算量比較大的計算場景,數據在內存中該如何進行管理,針對內存管理這塊,Flink實現了自身管理內存的機制,盡可能減少Full GC對系統的影響。

另外通過自定義序列化/反序列化方法將所有的對象轉換成二進制在內存中存儲,降低數據存儲的大小,更加有效的對內存空間進行利用,降低GC所帶來的性能下降或者任務停止的風險,同時提升了分布式處理過數據傳輸的性能。

因此Flink較其他分布式處理的框架則會顯得更加穩定,不會因為JVM GC等問題而導致整個應用宕機的問題。

對於7*24小時運行的流式應用,數據源源不斷的接入,在一段時間內應用的終止都有可能導致數據的丟失或者計算結果的不準確性,例如進行版本的升級,停機運維操作等,都能導致這種情況發生。

然而值得一提的是Flink通過其Save Points技術能夠將任務執行的快照(Snapshot)保存在存儲介質上,等待任務重啟的時候可以直接從實現保存的Save Points恢復原有的計算狀態,使得任務繼續按照停機之前的狀態繼續運行,Save Points技術可以讓用戶更好的管理和運維實時流式應用。

同時Flink除了上述的特性之外也具有其他非常優秀的特性,可以讓用戶有更多選擇。 Flink具備非常多的優秀特性,這不僅讓Flink在社區的知名度越來越高,也吸引了眾多的企業參與研發和使用Flink這項技術。

關於作者:張利兵,資深架構師,流式計算領域專家,第四範式華東區AI項目架構師,原明略數據華東區大數據架構師。有多年大數據、流式計算方面的開發經驗,對Hadoop、Spark、Flink等大數據計算引擎有著非常深入的理解,積累了豐富的項目實踐經驗。

推薦語: 從功能、原理、實戰和調優4個維度循序漸進講解利用Flink進行分布式流式應用開發,指導讀者從零基礎入門到進階。適讀人群:流計算開發工程師、大數據架構工程師、大數據開發工程師、數據挖掘工程師、高校研究生以及高年級本科生。

5. 什麼是大數據的主流框架

市場上有許多可用的框架。其中一些更受歡迎,例如Spark,Hadoop,Hive和Storm。Presto在效用指數上得分很高,而Flink具有巨大的潛力。
1. Apache Hadoop
Hadoop是基於Java的平台。這是一個開放源代碼框架,可跨集群排列的一組硬體機器提供批處理數據處理和數據存儲服務。Hadoop同樣適用於可靠,可擴展和分布式的計算。但是,它也可以用作通用文件存儲。它可以存儲和處理PB的信息。Hadoop由三個主要組件組成。
2. Apache Spark
Spark框架由加利福尼亞大學伯克利分校成立。它是具有改進的數據流處理的批處理框架。藉助完整的內存計算以及處理優化,它保證了極其快速的集群計算系統。
3.Apache Storm
Apache Storm是另一個引人注目的解決方案,專注於處理巨大的實時數據流。Storm的主要亮點是可伸縮性和停機後的迅速恢復能力。
4. Apache Flink
Apache Flink是一個開源框架,同樣適用於批處理和流數據處理。它最適合於集群環境。該框架基於轉換–流概念。它也是大數據的4G。它比Hadoop – Map Rece快100倍。
5. Presto
Presto是最適合較小數據集的開源分布式SQL工具。Presto配備了協調員以及各種工人。當客戶提交查詢時,將對這些查詢進行解析,分析,計劃執行並分配給協調員在工作人員之間進行處理。
6. Samza
Apache Samza是有狀態的流,准備與Kafka共同開發的大數據系統。Kafka提供數據服務,緩沖和容錯能力。

6. Flink SQL 知其所以然(五)| 自定義 protobuf format

protobuf 作為目前各大公司中最廣泛使用的高效的協議數據交換格式工具庫,會大量作為流式數據傳輸的序列化方式,所以在 flink sql 中如果能實現 protobuf 的 format 會非常有用( 目前社區已經有對應的實現,不過目前還沒有 merge,預計在 1.14 系列版本中能 release )。

issue 見: https://issues.apache.org/jira/browse/FLINK-18202?filter=-4&jql=project%20%3D%20FLINK%20AND%20issuetype%20%3D%20%22New%20Feature%22%20AND%20text%20~%20protobuf%20order%20by%20created%20DESC

pr 見: https://github.com/apache/flink/pull/14376

這一節主要介紹 flink sql 中怎麼自定義實現 format ,其中以最常使用的 protobuf 作為案例來介紹。

如果想在本地直接測試下:

關於為什麼選擇 protobuf 可以看這篇文章,寫的很詳細:

http://hengyunabc.github.io/thinking-about-grpc-protobuf/?utm_source=tuicool&utm_medium=referral

在實時計算的領域中,為了可讀性會選擇 json ,為了效率以及一些已經依賴了 grpc 的公司會選擇 protobuf 來做數據序列化,那麼自然而然,日誌的序列化方式也會選擇 protobuf 。

而官方目前已經 release 的版本中是沒有提供 flink sql api 的 protobuf format 的。如下圖,基於 1.13 版本。

https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/overview/

因此本文在介紹怎樣自定義一個 format 的同時,實現一個 protobuf format 來給大家使用。

預期效果是先實現幾種最基本的數據類型,包括 protobuf 中的 message (自定義 model)、 map (映射)、 repeated (列表)、其他基本數據類型等,這些都是我們最常使用的類型。

預期 protobuf message 定義如下:

測試數據源數據如下,博主把 protobuf 的數據轉換為 json,以方便展示,如下圖:

預期 flink sql:

數據源表 DDL:

數據匯表 DDL:

Transform 執行邏輯:

下面是我在本地跑的結果:

可以看到列印的結果,數據是正確的被反序列化讀入,並且最終輸出到 console。

目前業界可以參考的實現如下: https://github.com/maosuhan/flink-pb , 也就是這位哥們負責目前 flink protobuf 的 format。

這種實現的具體使用方式如下:

其實現有幾個特點:

[圖片上傳失敗...(image-66c35b-1644940704671)]

其實上節已經詳細描述了 flink sql 對於 sourcesinkformat 的載入機制。

如圖 serde format 是通過 TableFactoryHelper.discoverDecodingFormat 和 TableFactoryHelper.discoverEncodingFormat 創建的

所有通過 SPI 的 sourcesinkformt 插件都繼承自 Factory 。

整體創建 format 方法的調用鏈如下圖。

最終實現如下,涉及到了幾個實現類:

具體流程:

上述實現類的具體關系如下:

介紹完流程,進入具體實現方案細節:

ProtobufFormatFactory 主要創建 format 的邏輯:

resourcesMETA-INF 文件:

主要實現反序列化的邏輯:

可以注意到上述反序列化的主要邏輯就集中在 runtimeConverter 上,即 ProtobufToRowDataConverters.ProtobufToRowDataConverter 。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 就是在 ProtobufToRowDataConverters 中定義的。

ProtobufToRowDataConverters.ProtobufToRowDataConverter 其實就是一個 convertor 介面:

其作用就是將 protobuf message 中的每一個欄位轉換成為 RowData 中的每一個欄位。

ProtobufToRowDataConverters 中就定義了具體轉換邏輯,如截圖所示,每一個 LogicalType 都定義了 protobuf message 欄位轉換為 flink 數據類型的邏輯:

源碼後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取。

本文主要是針對 flink sql protobuf format 進行了原理解釋以及對應的實現。
如果你正好需要這么一個 format,直接後台回復 flink sql 知其所以然(五)| 自定義 protobuf format 獲取源碼吧。

當然上述只是 protobuf format 一個基礎的實現,用於生產環境還有很多方面可以去擴展的。

7. flinksql-core-動態表

普通動態表是FlinkSQL中的一類表,表中的數據與連接的外部數據對等,可以簡單理解為把一張mysql的表放進flink內存中得到的表,並且該表與mysql表有連接關系,即該表可以讀寫mysql表。

需要聲明表的欄位定義和表屬性(連接器屬性)。語法如下:

with關鍵字前面的是欄位定義,with關鍵字後面的是表屬性。其中欄位定義時還可以聲明表主鍵,聲明語法為PARIMARY KEY(myColumn1,...) NOT ENFORCED, 這里的not enforced表示flinksql不會對主鍵做強制的唯一性約束、非空約束,而且目前flinksql中只支持這種類型的主鍵。
表屬性中有若干個屬性欄位需要聲明,具體有哪些屬性欄位取決於使用哪個連接器,如上述聲明中使用的是jdbc連接器,在使用該連接器時需要提供url、username、password等屬性,通過此連接器我們就可以讓該表能連接到對應的mysql表。

我們可以查詢flinksql普通動態表的數據,此數據與連接的外部數據是一致的。語法如下:

tips:在運行時,只會載入一次外部數據到flinksql普通動態表。後續外部數據表有更新時,flinksql的普通動態表不會跟著自動更新。

我們可以把數據寫入到flinksql動態表,從而實現寫入數據到外部系統的目的。語法如下:

8. Flink SQL實戰演練之自定義Clickhouse Connector

簡介:實時數倉目前的架構是flink+clickhouse,社區目前jdbc connector不支持clickhouse的方言,所以決定自定義clickhouse connector實現flink sql寫入數據到clickhouse。

目前想要實現flink sql數據落地到ck,可以修改jdbc connector的源碼,增加ck方言,或者採用阿里提供的ck connector包,為了更好的理解flink connector的原理,這里自定義connector實現。

目前支持Flink寫入Clickhouse的依賴哭比較多,如果數據格式固定,可以CSV的方式寫入,如果不固定,可採用Json的方式寫入。

9. 轉載:阿里巴巴為什麼選擇Apache Flink

本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。

合抱之木,生於毫末

隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。

因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷

目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。

從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。

從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。

Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鍾處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。

Flink在阿里的現狀

基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。

目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。

Flink在阿里巴巴的大規模應用,表現如何?

規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球范圍內也是屈指可數;

狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;

Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;

PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;

Flink的發展之路

接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?

Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。

2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。

Flink核心概念以及基本理念

Flink最區別於其他流計算引擎的,其實就是狀態管理。

什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。

Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分布式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。

Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?

這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。

Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。

下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。

在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。

接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。

結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。

但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分布式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。

阿里巴巴對Flink社區的貢獻

我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分布式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分布式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分布式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。

另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。

經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享兩點:

● 統一 API Stack

● 統一 SQL方案

先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有沖突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。

因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。

除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。

對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。

Flink的未來方向

首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。

第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。

最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。

10. flink sql 知其所以然(十三):流 join問題解決

本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:

廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麼幫助:

書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。

本文介紹怎麼使用 flink sql interval join 解決這些問題。

flink sql 知其所以然(十二):流 join 很難嘛???(上)

看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。

場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。

來一波輸入數據:

曝光數據:

點擊數據:

預期輸出數據如下:

上節的 flink sql regular join 解決方案如下:

上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麼產生了 retract 流,從而導致重復寫入 kafka。

對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麼能不能讓左流強行等待右流一段時間,實在等不到在數據關聯不到的數據即可。

當當當!!!

本文的 flink sql interval join 登場,它就能等。

大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。

interval join 就是用一個流的數據去關聯另一個流的一段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>

來看看上述案例的 flink sql interval join sql 怎麼寫:

這里設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鍾之內的數據進行關聯。

運行結果如下:

如上就是我們期望的正確結果了。

flink web ui 運算元圖如下:

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>

那麼此時你可能有一個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?

博主帶你們來定位到具體的實現源碼。先看一下 transformations。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>

可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join. 。

其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。

TimeIntervalJoin

下面詳細給大家解釋一下。

join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另一條流的時間已經推進到當前這條數據在也不可能 join 到另一條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。

舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]

以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):

上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。

小夥伴萌在使用 interval join 需要注意的兩點事項:

本文主要介紹了 flink sql interval 是怎麼避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望你讀完本文之後能了解到: