當前位置:首頁 » 編程語言 » flinksql半實時採集
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flinksql半實時採集

發布時間: 2022-09-02 19:31:54

❶ 基於flink sql構建實時數據倉庫

根據目前大數據這一塊的發展,已經不局限於離線的分析,挖掘數據潛在的價值,數據的時效性最近幾年變得剛需,實時處理的框架有storm,spark-streaming,flink等。想要做到實時數據這個方案可行,需要考慮以下幾點:1、狀態機制 2、精確一次語義 3、高吞吐量 4、可彈性伸縮的應用 5、容錯機制,剛好這幾點,flink都完美的實現了,並且支持flink sql高級API,減少了開發成本,可用實現快速迭代,易維護等優點。

離線數倉的架構圖:

實時數倉架構圖:

目前是將實時維度表和DM層數據存於hbase當中,實時公共層都存於kafka當中,並且以寫滾動日誌的方式寫入HDFS(主要是用於校驗數據)。其實在這里可以做的工作還有很多,kafka集群,flink集群,hbase集群相互獨立,這對整個實時數據倉庫的穩定性帶來一定的挑戰。

一個數據倉庫想要成體系,成資產,離不開數據域的劃分。所以參考著離線的數據倉庫,想著在實時數倉做出這方面的探索,理論上來講,離線可以實現的,實時也是可以實現的。 並且目前已經取得了成效,目前劃分的數據域跟離線大致相同,有流量域,交易域,營銷域等等。當然這裡面涉及到維表,多事務事實表,累計快照表,周期性快照表的設計,開發,到落地這里就不詳述了。

維度表也是整個實時數據倉庫不可或缺的部分。從目前整個實時數倉的建設來看,維度表有著數據量大,但是變更少的特點,我們試想過構建全平台的實時商品維度表或者是實時會員維度表,但是這類維度表太過於復雜,所以針對這類維度表下面介紹。還有另外一種就是較為簡單的維度表,這類維度可能對應著業務系統單個mysql表,或者只需要幾個表進行簡單ETL就可以產出的表,這類維表是可以做成實時的。以下有幾個實施的關鍵點:

如下是離線數據同步架構圖:

實時數據的接入其實在底層架構是一樣的,就是從kafka那邊開始不一樣,實時用flink的UDTF進行解析,而離線是定時(目前是小時級)用camus拉到HDFS,然後定時load HDFS的數據到hive表裡面去,這樣來實現離線數據的接入。實時數據的接入是用flink解析kafka的數據,然後在次寫入kafka當中去。

由於目前離線數據已經穩定運行了很久,所以實時接入數據的校驗可以對比離線數據,但是離線數據是小時級的hive數據,實時數據存於kafka當中,直接比較不了,所以做了相關處理,將kafka的數據使用flink寫HDFS滾動日誌的形式寫入HDFS,然後建立hive表小時級定時去load HDFS中的文件,以此來獲取實時數據。

完成以上兩點,剩餘還需要考慮一點,都是小時級的任務,這個時間卡點使用什麼欄位呢?首先要確定一點就是離線和實時任務卡點的時間欄位必須是一致的,不然肯定會出問題。目前離線使用camus從kafka將數據拉到HDFS上,小時級任務,使用nginx_ts這個時間欄位來卡點,這個欄位是上報到nginx伺服器上記錄的時間點。而實時的數據接入是使用flink消費kafka的數據,在以滾動日誌的形式寫入HDFS的,然後在建立hive表load HDFS文件獲取數據,雖然這個hive也是天/小時二級分區,但是離線的表是根據nginx_ts來卡點分區,但是實時的hive表是根據任務啟動去load文件的時間點去區分的分區,這是有區別的,直接篩選分區和離線的數據進行對比,會存在部分差異,應當的做法是篩選范圍分區,然後在篩選nginx_ts的區間,這樣在跟離線做對比才是合理的。

目前實時數據接入層的主要時延是在UDTF函數解析上,實時的UDTF函數是根據上報的日誌格式進行開發的,可以完成日誌的解析功能。

解析流程圖如下:

解析速率圖如下:

該圖還不是在峰值數據量的時候截的,目前以800記錄/second為准,大概一個記錄的解析速率為1.25ms。
目前該任務的flink資源配置核心數為1,假設解析速率為1.25ms一條記錄,那麼峰值只能處理800條/second,如果數據接入速率超過該值就需要增加核心數,保證解析速率。

介紹一下目前離線維度表的情況,就拿商品維度表來說,全線記錄數將近一個億,計算邏輯來自40-50個ods層的數據表,計算邏輯相當復雜,如果實時維度表也參考離線維度表來完成的話,那麼開發成本和維護成本非常大,對於技術來講也是很大的一個挑戰,並且目前也沒有需求要求維度屬性百分百准確。所以目前(偽實時維度表)准備在當天24點產出,當天的維度表給第二天實時公共層使用,即T-1的模式。偽實時維度表的計算邏輯參考離線維度表,但是為了保障在24點之前產出,需要簡化一下離線計算邏輯,並且去除一些不常用的欄位,保障偽實時維度表可以較快產出。

實時維度表的計算流程圖:

目前使用flink作為公司主流的實時計算引擎,使用內存作為狀態後端,並且固定30s的間隔做checkpoint,使用HDFS作為checkpoint的存儲組件。並且checkpoint也是作為任務restart以後恢復狀態的重要依據。熟悉flink的人應該曉得,使用內存作為狀態後端,這個內存是JVM的堆內存,畢竟是有限的東西,使用不得當,OOM是常有的事情,下面就介紹一下針對有限的內存,如果完成常規的計算。

❷ 基於Flink的實時計算平台的構建

一、系統架構

1. 接入層

Canal、Flume、Kafka

針對業務系統數據,Canal監控Binlog日誌,發送至kafka;

針對日誌數據,由Flume來進行統一收集,並發送至kafka。

消息隊列的數據既是離線數倉的原始數據,也是實時計算的原始數據,這樣可以保證實時和離線的原始數據是統一的。

2. 計算層

Flink

有了源數據,在 計算層 經過Flink實時計算引擎做一些加工處理,然後落地到存儲層中不同存儲介質當中。

3. 存儲層

HBase、Kafka、ES、Mysql、Hive、Redis

不同的 存儲介質 是通過不同的應用場景來選擇。

4. 數據應用層

風控、模型、圖譜、大屏展示

通過存儲層應用於不同的 數據應用 ,數據應用可能是我們的正式產品或者直接的業務系統

二、技術實現

1. 計算引擎

實時計算引擎的功能要求

提供高級 API,支持常見的數據操作比如關聯聚合,最好是能支持 SQL

具有狀態管理和自動支持久化方案,減少對存儲的依賴

可靠的容錯機制,低延時,最好能夠保證Exactly-once

Flink的優勢

Flink的API、容錯機制與狀態管理都滿足實時數倉計算引擎的需求

Flink高吞吐、低延時的特性

端到端的Exactly-once

WaterMark&Event Time的支持

Flink 不僅支持了大量常用的 SQL 語句,還有豐富的數據類型、內置函數以及靈活的自定義函數,基本覆蓋了我們的開發場景

2. 存儲引擎

根據不同的業務場景,使用最適合的存儲引擎:

Kafka主要用於中間數據表的存儲

ES主要針對日誌數據的存儲和分析

HBase、Redis可用於維表存儲

Hive用於數據校驗

Mysql可以用於指標計算結果的存儲

三、數據分層

數據源:目前數據源主要是Binlog,通過Canal監控各個業務系統的Mysql,將binlog發送至kafka。

ODS層:主要將Binlog數據存儲至Kafka,這一層不對數據進行任何操作,存儲最原始的數據,Binlog 日誌在這一層為庫級別,即:一個庫的變更數據存放在同一個 Kafka Topic 中。

DWD層:主要對數據進行簡單的清洗。拆分主題,將庫級別的主題拆分為表級別;打平數據,將data數組格式打平。

DWS層:主要根據不同的業務的需求,將該需求所涉及到的表進行join所得。

APP層:根據指標計算需求,對數據進行處理後,存儲HBase,為了方便模型查詢,主要將表存儲為索引表和明細表,直接對數據進行指標計算後,將計算結果存儲到HBase。

四、數據監控及校驗

1. 數據監控

目前數據的監控的架構是pushgateway + Prometheus + Grafana

數據監控主要是接入Flink的Metric,通過Grafana對Flink系統指標及自定義指標進行圖形化界面的展示,對關鍵指標進行監控報警

2. 數據校驗

目前數據的監控的架構是Grafana + Mysql

Grafana用於監控指標的展示及相關閾值數據的報警,Mysql主要用於監控數據的存儲

將每個服務的source收到的數據、sink發出的數據,根據表的不同將數據關鍵欄位寫入mysql中,通過統計各個階段各個表中的數據條數,對數據完整性進行監控校驗,若出現數據缺時,先查找原因,然後指定時間戳重啟服務

五、系統管理

元數據管理

表,欄位元數據管理,實時感知元數據的變化,大幅度降低使用數據的成本。

系統配置

對應用啟動參數及相關配置參數的管理,對任務進行靈活配置及管理。

血緣管理

主要是梳理實時計算平台中數據依賴關系,以及實時任務的依賴關系,從底層ODS到DWD再到DWS,以及APP層用到哪些數據,將整個鏈度串聯起來。

六、問題及解決方案

1. 數據傾斜

由於要拆分主題,要以table為key對數據進行keyBy,但是由於每個表的數據量相差較大,會出現數據傾斜

解決方案:

加鹽,給key加前綴

前綴不能隨便加,為了保證同一id的數據在相同的分區中,所以根據id_table進行keyBy

2. 數據重復

任務在進行自動或手動重啟時,為了保證數據不丟失,數據會出現重復計算的問題,如果下游只是對數據進行HBase存儲的話,由於冪等性,這種重復可以解。但是,如果下游要對數據進行聚合,這樣會導致數據被計算多次,影響計算結果的准確性

解決方案:

上游在對數據進行發送時,對kafka procer 進行 exactly once的設置

在對數據統計時進行數據去重

3. 數據延時

由於所處理的數據表的大小不一樣,處理大表時,會出現數據延時的問題。

解決方案:

針對大表數據增加並行度

4.數據亂序

由於Flink kafka procer默認是根據hash對數據進行隨機分區,kafka consumer在對數據進行消費時,每個分區消費速度不同,這樣最終在存儲數據時,就會出現亂序即相同的id會出現老數據覆蓋新數據的問題

解決方案:

對kafka每個階段進行自定義分區,將id相同的數據分到同一個分區,保證同一id的數據的有序性

由於整個數據處理過程中可能會出現shuffle,導數數據重新亂序,所以在對數據存儲前對數據進行排序

對數據進行排序的關鍵點時要保證每條數據的唯一性,即要有標記數據先後順序的欄位

5 . 數據唯一標記(很重要)

由於要對數據進行去重或者排序,所以要保證數據的唯一性

解決辦法:

使用時間戳不可以,因為數據量很大的情況下,同一時間會處理上百條數據

在最初發出數據的時候,為數據打上標記,使用 partition + offset + idx 的組合來確認數據的唯一性及順序性

6. 數據可靠性

我們對服務重啟或對服務升級時,可能會出現數據的丟失

解決方案:

結合Flink 的checkpoint及savepoint機制保證數據的可靠性

開啟Flink的checkpoint機制,服務進行自動重啟時,會自動讀取上次保存在checkpoint中offset,或者我們指定offset進行數據消費

對服務進行升級時,先將服務的狀態保存至savepoint中,重啟時指定savepoint進行服務啟動,保證數據不丟失

7. 無感升級

由於我們目前數據量比較龐大,且在對服務進行升級時,耗時較長,會影響調用方的使用。

解決辦法:

在對服務進行升級時,將數據寫入備用庫,等數據追上且服務穩定運行後,再將存儲庫進行切換

❸ 《Flink基礎教程》pdf下載在線閱讀,求百度網盤雲資源

《Flink基礎教程》([美] 埃倫•弗里德曼)電子書網盤下載免費在線閱讀

鏈接:https://pan..com/s/1tm7Vs-V-SUnv7jA3MMLF0Q

密碼:4jf8

書名:Flink基礎教程

作者:[美] 埃倫•弗里德曼

譯者:王紹翾

豆瓣評分:6.0

出版社:人民郵電出版社

出版年份:2018-8

頁數:96

內容簡介:

作為新一代的開源流處理器,Flink是眾多大數據處理框架中一顆冉冉升起的新星。它以同一種技術支持流處理和批處理,並能同時滿足高吞吐、低延遲和容錯的需求。本書由Flink項目核心成員執筆,系統闡釋Flink的適用場景、設計理念、功能、用途和性能優勢。

作者簡介:

埃倫·弗里德曼(Ellen Friedman)

解決方案咨詢師,知名大數據相關技術佈道師,在流處理架構和大數據處理框架等方面有多部著作。

科斯塔斯·宙馬斯(Kostas Tzoumas)

Flink項目核心成員,data Artisans公司聯合創始人兼首席執行官,在流處理和數據科學領域經驗豐富。

譯者介紹

王紹翾

阿里巴巴資深技術專家,Apache Flink Committer,淘寶花名「大沙」。畢業於北京大學信息科學技術學院,後取得加州大學聖地亞哥分校計算機工程博士學位。目前就職於阿里巴巴計算平台事業部,負責Flink SQL引擎及機器學習的相關開發。加入阿里巴巴之前,在Facebook開發分布式圖存儲系統TAO。曾多次拜訪由Flink創始團隊創辦的公司data Artisans,並與其首席執行官科斯塔斯·宙馬斯(本書作者之一)以及首席技術官斯蒂芬·尤恩有著廣泛的合作。

❹ 實時計算組件有哪些

實時計算的組件有很多,數據採集組件及中間件:Flume、Sqoop、Kafka、Logstash、Splunk等。大數據集群核心組件:Hadoop、Hive、Impala、HBase、Spark(Core、SQL、Streaming、MLlib)、Flink、Zookeeper等,大概如下:
數據從底層的數據源開始,經過Kafka、Flume等數據組件進行收集,然後分成兩條線進行計算:
一條線是進入流式計算平台(例如 Storm、Flink或者SparkStreaming),去計算實時的一些指標;
另一條線進入批量數據處理離線計算平台(例如Maprece、Hive,Spark SQL),去計算T+1的相關業務指標,這些指標需要隔日才能看見。
這就是實時計算所需的組件了。

❺ flink 1.10 1.12區別

flink 1.10 1.12區別在於Flink 1.12 支持了 Flink SQL Kafka upsert connector 。

因為在 Flink 1.10 中,當前這類任務開發對於用戶來說,還是不夠友好,需要很多代碼,同時也會造成 Flink SQL 冗長。

Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,這也是我們公司內部業務方對實時平台提出的需求。

收益:便利用戶有這種需要從 kafka 取最新記錄操作的實時任務開發,比如這種 binlog -> kafka,然後用戶聚合操作,這種場景還是非常多的,這能提升實時作業開發效率,同時 1.12 做了優化,性能會比單純的 last_value 性能要好。

Flink Yarn 作業 On k8s 的生產級別能力是:

Flink Jar 作業已經全部 K8s 化,Flink SQL 作業由於是推廣初期,還是在 Yarn 上面進行運行,為了將實時計算 Flink 全部K8s化。

所以我們 Flink SQL 作業也需要遷移到 K8s,目前 Flink 1.12 已經滿足生產級別的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社區的 On k8s 能力。

風險:雖然和社區的人溝通,Flink 1.12 on k8s 沒有什麼問題,但是具體功能還是需要先 POC 驗證一下,同時可能社區 Flink on k8s 的能力。

可能會限制我們這邊一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,這里可能需要改底層源碼來進行快速支持(社區有相關 JIRA 要做)。

❻ flink窗口的種類及詳述

flink窗口的種類及詳述:

滾動窗口(tumblingwindow)將事件分配到長度固定且互不重疊的桶中。

實際案例:簡單且常見的分維度分鍾級別同時在線用戶數、總銷售額

Java設置語句:window(TumblingProcessingTimeWindows.of(Time.seconds(5)))

該語句為設置滾動窗口的窗口時長為5秒鍾

sql設置語句:FROM TABLE(TUMBLE(

        TABLE source_table

        , DESCRIPTOR(row_time)

        , INTERVAL '60' SECOND))

Windowing TVF 滾動窗口的寫法就是把 tumble window 的聲明寫在了數據源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分參數。

第一個參數 TABLE source_table 聲明數據源表;第二個參數 DESCRIPTOR(row_time) 聲明數據源的時間戳;第三個參數 INTERVAL '60' SECOND 聲明滾動窗口大小為 1 min

滑動窗口:分配器將每個元素分配給固定窗口大小的窗口。與滾動窗口分配器類似,窗口的大小由 window size 參數配置。還有一個window slide參數用來控制滑動窗口的滑動大小。因此,如果滑動大小小於窗口大小,則滑動窗口會重疊。在這種情況下,一個元素會被分配到多個窗口中。

實際案例:簡單且常見的分維度分鍾級別同時在線用戶數,1 分鍾輸出一次,計算最近 5 分鍾的數據

java設置語句:window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))

window size :窗口大小為 10秒鍾

window slide:窗口間隔為5秒鍾

sql設置語句: hop(row_time, interval '1' minute, interval '5' minute) 

第一個參數為事件時間的時間戳;第二個參數為滑動窗口的滑動步長;第三個參數為滑動窗口大小。

會話窗口:分配器通過活動會話對元素進行分組。與滾動窗口和滑動窗口相比,會話窗口不會重疊,也沒有固定的開始和結束時間。當會話窗口在一段時間內沒有接收到元素時會關閉。會話窗口分配器需要配置一個會話間隙,定義了所需的不活動時長。當此時間段到期時,當前會話關閉,後續元素被分配到新的會話窗口。

實際案例:計算每個用戶在活躍期間(一個 Session)總共購買的商品數量,如果用戶 5 分鍾沒有活動則視為 Session 斷開

設置語句:基於事件時間的會話窗口window(EventTimeSessionWindows.withGap(Time.minutes(10)))

基於處理時間的會話窗口

Java設置:window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))

會話間隙,不活動時長為10秒鍾

sql設置:session(row_time, interval '5' minute)

Group Window Aggregation 中 Session 窗口的寫法就是把 session window 的聲明寫在了 group by 子句中

Session 窗口即支持 處理時間 也支持 事件時間。但是處理時間只支持在 Streaming 任務中運行,Batch 任務不支持。

漸進式窗口:在其實就是 固定窗口間隔內提前觸發的的滾動窗口,其實就是 Tumble Window + early-fire 的一個事件時間的版本。例如,從每日零點到當前這一分鍾繪制累積 UV,其中 10:00 時的 UV 表示從 00:00 到 10:00 的 UV 總數。漸進式窗口可以認為是首先開一個最大窗口大小的滾動窗口,然後根據用戶設置的觸發的時間間隔將這個滾動窗口拆分為多個窗口,這些窗口具有相同的窗口起點和不同的窗口終點。

 應用場景:周期內累計 PV,UV 指標(如每天累計到當前這一分鍾的 PV,UV)。這類指標是一段周期內的累計狀態,對分析師來說更具統計分析價值,而且幾乎所有的復合指標都是基於此類指標的統計(不然離線為啥都要累計一天的數據,而不要一分鍾累計的數據呢)。

實際案例:每天的截止當前分鍾的累計 money(sum(money)),去重 id 數(count(distinct id))。每天代表漸進式窗口大小為 1 天,分鍾代表漸進式窗口移動步長為分鍾級別

sql設置:FROM TABLE(CUMULATE(

       TABLE source_table

       , DESCRIPTOR(row_time)

       , INTERVAL '60' SECOND

       , INTERVAL '1' DAY))

Windowing TVF 滾動窗口的寫法就是把 cumulate window 的聲明寫在了數據源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分參數:

第一個參數 TABLE source_table 聲明數據源表;第二個參數 DESCRIPTOR(row_time) 聲明數據源的時間戳;第三個參數 INTERVAL '60' SECOND 聲明漸進式窗口觸發的漸進步長為 1 min。第四個參數 INTERVAL '1' DAY 聲明整個漸進式窗口的大小為 1 天,到了第二天新開一個窗口重新累計

全局窗口:分配器將具有相同 key 的所有元素分配給同一個全局窗口。僅當我們指定自定義觸發器時,窗口才起作用。否則,不會執行任何計算,因為全局窗口沒有我們可以處理聚合元素的自然結束的點(譯者註:即本身自己不知道窗口的大小,計算多長時間的元素)

window(GlobalWindows.create())

平時滑動窗口用得比較多,其次是滾動窗口

❼ flink sql 知其所以然(十三):流 join問題解決

本節是 flink sql 流 join 系列的下篇,上篇的鏈接如下:

廢話不多說,咱們先直接上本文的目錄和結論,小夥伴可以先看結論快速了解博主期望本文能給小夥伴們帶來什麼幫助:

書接上文,上文介紹了曝光流在關聯點擊流時,使用 flink sql regular join 存在的 retract 問題。

本文介紹怎麼使用 flink sql interval join 解決這些問題。

flink sql 知其所以然(十二):流 join 很難嘛???(上)

看看上節的實際案例,來看看在具體輸入值的場景下,輸出值應該長啥樣。

場景:即常見的曝光日誌流(show_log)通過 log_id 關聯點擊日誌流(click_log),將數據的關聯結果進行下發。

來一波輸入數據:

曝光數據:

點擊數據:

預期輸出數據如下:

上節的 flink sql regular join 解決方案如下:

上節說道,flink sql left join 在流數據到達時,如果左表流(show_log)join 不到右表流(click_log) ,則不會等待右流直接輸出(show_log,null),在後續右表流數據代打時,會將(show_log,null)撤回,發送(show_log,click_log)。這就是為什麼產生了 retract 流,從而導致重復寫入 kafka。

對此,我們也是提出了對應的解決思路,既然 left join 中左流不會等待右流,那麼能不能讓左流強行等待右流一段時間,實在等不到在數據關聯不到的數據即可。

當當當!!!

本文的 flink sql interval join 登場,它就能等。

大家先通過下面這句話和圖簡單了解一下 interval join 的作用(熟悉 DataStream 的小夥伴萌可能已經使用過了),後續會詳細介紹原理。

interval join 就是用一個流的數據去關聯另一個流的一段時間區間內的數據。關聯到就下發關聯到的數據,關聯不到且在超時後就根據是否是 outer join(left join,right join,full join)下發沒關聯到的數據。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>

來看看上述案例的 flink sql interval join sql 怎麼寫:

這里設置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的數據會和 click_log 表中的 row_time 在前後 10 分鍾之內的數據進行關聯。

運行結果如下:

如上就是我們期望的正確結果了。

flink web ui 運算元圖如下:

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>

那麼此時你可能有一個問題,結果中的前兩條數據 join 到了輸出我是理解的,那當 show_log join 不到 click_log 時為啥也輸出了?原理是啥?

博主帶你們來定位到具體的實現源碼。先看一下 transformations。

<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>

可以看到事件時間下 interval join 的具體 operator 是 org.apache.flink.table.runtime.operators.join. 。

其核心邏輯就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 來處理具體 join 邏輯。 RowTimeIntervalJoin 重要方法如下圖所示。

TimeIntervalJoin

下面詳細給大家解釋一下。

join 時,左流和右流會在 interval 時間之內相互等待,如果等到了則輸出數據[+(show_log,click_log)],如果等不到,並且另一條流的時間已經推進到當前這條數據在也不可能 join 到另一條流的數據時,則直接輸出[+(show_log,null)],[+(null,click_log)]。

舉個例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 當 click_log 的時間推進到 2021-11-01 11:00:00 時,這時 show_log 來一條 2021-11-01 02:00:00 的數據, 那這條 show_log 必然不可能和 click_log 中的數據 join 到了,因為 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之間的數據以及過期刪除了。則 show_log 直接輸出 [+(show_log,null)]

以上面案例的 show_log(左表) interval join click_log(右表) 為例(不管是 inner interval join,left interval join,right interval join 還是 full interval join,都會按照下面的流程執行):

上面只是左流 show_log 數據到達時的執行流程(即 ProcessElement1 ),當右流 click_log 到達時也是完全類似的執行流程(即 ProcessElement2 )。

小夥伴萌在使用 interval join 需要注意的兩點事項:

本文主要介紹了 flink sql interval 是怎麼避免出現 flink regular join 存在的 retract 問題的,並通過解析其實現說明了運行原理,博主期望你讀完本文之後能了解到:

❽ flinksql自定義topN函數的代碼

摘要 當前 Flink 有如下幾種函數:

❾ flink sql 近3天登錄次數

flink sql 近3天登錄次數如下
1、獲取最近七天活躍的用戶,並對用戶活躍日期進行排序。
2、計算用戶活躍日期與排名的差值。
3、對用戶及差值進行分組。
4、統計差值個數取出差值個數大於3的數據(即連續登陸三天以上的用戶)。
5、對數據進行去重。

❿ flinksql自定義topN函數的代碼

摘要 package day07;