當前位置:首頁 » 編程語言 » flinksql實現行轉列
擴展閱讀
webinf下怎麼引入js 2023-08-31 21:54:13
堡壘機怎麼打開web 2023-08-31 21:54:11

flinksql實現行轉列

發布時間: 2022-10-15 11:17:05

『壹』 flinksql自定義topN函數的代碼

摘要 package day07;

『貳』 Apache Flink現在在大數據處理方面能夠和Apache Spark分庭抗禮么

我們是否還需要另外一個新的數據處理引擎?當我第一次聽到flink的時候這是我是非常懷疑的。在大數據領域,現在已經不缺少數據處理框架了,但是沒有一個框架能夠完全滿足不同的處理需求。自從Apache spark出現後,貌似已經成為當今把大部分的問題解決得最好的框架了,所以我對另外一款解決類似問題的框架持有很強烈的懷疑態度。
不過因為好奇,我花費了數個星期在嘗試了解flink。一開始仔細看了flink的幾個例子,感覺和spark非常類似,心理就傾向於認為flink又是一個模仿spark的框架。但是隨著了解的深入,這些API體現了一些flink的新奇的思路,這些思路還是和spark有著比較明顯的區別的。我對這些思路有些著迷了,所以花費了更多的時間在這上面。
flink中的很多思路,例如內存管理,dataset API都已經出現在spark中並且已經證明 這些思路是非常靠譜的。所以,深入了解flink也許可以幫助我們分布式數據處理的未來之路是怎樣的
在後面的文章里,我會把自己作為一個spark開發者對flink的第一感受寫出來。因為我已經在spark上幹了2年多了,但是只在flink上接觸了2到3周,所以必然存在一些bias,所以大家也帶著懷疑和批判的角度來看這篇文章吧。
Apache Flink是什麼
flink是一款新的大數據處理引擎,目標是統一不同來源的數據處理。這個目標看起來和spark和類似。沒錯,flink也在嘗試解決spark在解決的問題。這兩套系統都在嘗試建立一個統一的平台可以運行批量,流式,互動式,圖處理,機器學習等應用。所以,flink和spark的目標差別並不大,他們最主要的區別在於實現的細節。
後面我會重點從不同的角度對比這兩者。
Apache Spark vs Apache Flink
1.抽象 Abstraction
spark中,對於批處理我們有RDD,對於流式,我們有DStream,不過內部實際還是RDD.所以所有的數據表示本質上還是RDD抽象。
後面我會重點從不同的角度對比這兩者。在flink中,對於批處理有DataSet,對於流式我們有DataStreams。看起來和spark類似,他們的不同點在於:
一)DataSet在運行時是表現為運行計劃(runtime plans)的
在spark中,RDD在運行時是表現為java objects的。通過引入Tungsten,這塊有了些許的改變。但是在flink中是被表現為logical plan(邏輯計劃)的,聽起來很熟悉?沒錯,就是類似於spark中的dataframes。所以在flink中你使用的類Dataframe api是被作為第一優先順序來優化的。但是相對來說在spark RDD中就沒有了這塊的優化了。
flink中的Dataset,對標spark中的Dataframe,在運行前會經過優化。
在spark 1.6,dataset API已經被引入spark了,也許最終會取代RDD 抽象。
二)Dataset和DataStream是獨立的API
在spark中,所有不同的API,例如DStream,Dataframe都是基於RDD抽象的。但是在flink中,Dataset和DataStream是同一個公用的引擎之上兩個獨立的抽象。所以你不能把這兩者的行為合並在一起操作,當然,flink社區目前在朝這個方向努力(https://issues.apache.org/jira/browse/FLINK-2320),但是目前還不能輕易斷言最後的結果。
2.內存管理
一直到1.5版本,spark都是試用java的內存管理來做數據緩存,明顯很容易導致OOM或者gc。所以從1.5開始,spark開始轉向精確的控制內存的使用,這就是tungsten項目了
flink從第一天開始就堅持自己控制內存試用。這個也是啟發了spark走這條路的原因之一。flink除了把數據存在自己管理的內存以外,還直接操作二進制數據。在spark中,從1.5開始,所有的dataframe操作都是直接作用在tungsten的二進制數據上。

3.語言實現
spark是用scala來實現的,它提供了Java,Python和R的編程介面。
flink是java實現的,當然同樣提供了Scala API
所以從語言的角度來看,spark要更豐富一些。因為我已經轉移到scala很久了,所以不太清楚這兩者的java api實現情況。
4.API
spark和flink都在模仿scala的collection API.所以從表面看起來,兩者都很類似。下面是分別用RDD和DataSet API實現的word count

// Spark wordcount
object WordCount {

def main(args: Array[String]) {

val env = new SparkContext("local","wordCount")

val data = List("hi","how are you","hi")

val dataSet = env.parallelize(data)

val words = dataSet.flatMap(value => value.split("\\s+"))

val mappedWords = words.map(value => (value,1))

val sum = mappedWords.receByKey(_+_)

println(sum.collect())

}

}

// Flink wordcount
object WordCount {

def main(args: Array[String]) {

val env = ExecutionEnvironment.getExecutionEnvironment

val data = List("hi","how are you","hi")

val dataSet = env.fromCollection(data)

val words = dataSet.flatMap(value => value.split("\\s+"))

val mappedWords = words.map(value => (value,1))

val grouped = mappedWords.groupBy(0)

val sum = grouped.sum(1)

println(sum.collect())
}

}
不知道是偶然還是故意的,API都長得很像,這樣很方便開發者從一個引擎切換到另外一個引擎。我感覺以後這種Collection API會成為寫data pipeline的標配。
Steaming
spark把streaming看成是更快的批處理,而flink把批處理看成streaming的special case。這裡面的思路決定了各自的方向,其中兩者的差異點有如下這些:

實時 vs 近實時的角度
flink提供了基於每個事件的流式處理機制,所以可以被認為是一個真正的流式計算。它非常像storm的model。
而spark,不是基於事件的粒度,而是用小批量來模擬流式,也就是多個事件的集合。所以spark被認為是近實時的處理系統。

Spark streaming 是更快的批處理,而Flink Batch是有限數據的流式計算。
雖然大部分應用對准實時是可以接受的,但是也還是有很多應用需要event level的流式計算。這些應用更願意選擇storm而非spark streaming,現在,flink也許是一個更好的選擇。

流式計算和批處理計算的表示
spark對於批處理和流式計算,都是用的相同的抽象:RDD,這樣很方便這兩種計算合並起來表示。而flink這兩者分為了DataSet和DataStream,相比spark,這個設計算是一個糟糕的設計。

對 windowing 的支持
因為spark的小批量機制,spark對於windowing的支持非常有限。只能基於process time,且只能對batches來做window。
而Flink對window的支持非常到位,且Flink對windowing API的支持是相當給力的,允許基於process time,data time,record 來做windowing。
我不太確定spark是否能引入這些API,不過到目前為止,Flink的windowing支持是要比spark好的。
Steaming這部分flink勝

SQL interface
目前spark-sql是spark裡面最活躍的組件之一,Spark提供了類似Hive的sql和Dataframe這種DSL來查詢結構化數據,API很成熟,在流式計算中使用很廣,預計在流式計算中也會發展得很快。
至於flink,到目前為止,Flink Table API只支持類似DataFrame這種DSL,並且還是處於beta狀態,社區有計劃增加SQL 的interface,但是目前還不確定什麼時候才能在框架中用上。
所以這個部分,spark勝出。

Data source Integration

Spark的數據源 API是整個框架中最好的,支持的數據源包括NoSql db,parquet,ORC等,並且支持一些高級的操作,例如predicate push down
Flink目前還依賴map/rece InputFormat來做數據源聚合。
這一場spark勝

Iterative processing
spark對機器學習的支持較好,因為可以在spark中利用內存cache來加速機器學習演算法。
但是大部分機器學習演算法其實是一個有環的數據流,但是在spark中,實際是用無環圖來表示的,一般的分布式處理引擎都是不鼓勵試用有環圖的。
但是flink這里又有點不一樣,flink支持在runtime中的有環數據流,這樣表示機器學習演算法更有效而且更有效率。
這一點flink勝出。

Stream as platform vs Batch as Platform
Spark誕生在Map/Rece的時代,數據都是以文件的形式保存在磁碟中,這樣非常方便做容錯處理。
Flink把純流式數據計算引入大數據時代,無疑給業界帶來了一股清新的空氣。這個idea非常類似akka-streams這種。
成熟度
目前的確有一部分吃螃蟹的用戶已經在生產環境中使用flink了,不過從我的眼光來看,Flink還在發展中,還需要時間來成熟。
結論
目前Spark相比Flink是一個更為成熟的計算框架,但是Flink的很多思路很不錯,Spark社區也意識到了這一點,並且逐漸在採用Flink中的好的設計思路,所以學習一下Flink能讓你了解一下Streaming這方面的更迷人的思路。

『叄』 apache flink支持sql嗎

org.apache.jsp.check_005flinkcard_jsp._jspService(org.apache.jsp.check_005flinkcard_jsp:102)可以看出你寫的jsp在運行期遇到空指針錯誤,如果是tomcat可以到apache-tomcat-6.0.16\work\Catalina\localhost\testhttps\org\apache\jsp地方找到check_005flinkcard_jsp.java的102行,查看jsp編譯成java文件的源碼

『肆』 轉載:阿里巴巴為什麼選擇Apache Flink

本文主要整理自阿里巴巴計算平台事業部資深技術專家莫問在雲棲大會的演講。

合抱之木,生於毫末

隨著人工智慧時代的降臨,數據量的爆發,在典型的大數據的業務場景下數據業務最通用的做法是:選用批處理的技術處理全量數據,採用流式計算處理實時增量數據。在絕大多數的業務場景之下,用戶的業務邏輯在批處理和流處理之中往往是相同的。但是,用戶用於批處理和流處理的兩套計算引擎是不同的。

因此,用戶通常需要寫兩套代碼。毫無疑問,這帶來了一些額外的負擔和成本。阿里巴巴的商品數據處理就經常需要面對增量和全量兩套不同的業務流程問題,所以阿里就在想,我們能不能有一套統一的大數據引擎技術,用戶只需要根據自己的業務邏輯開發一套代碼。這樣在各種不同的場景下,不管是全量數據還是增量數據,亦或者實時處理,一套方案即可全部支持, 這就是阿里選擇Flink的背景和初衷

目前開源大數據計算引擎有很多選擇,流計算如Storm,Samza,Flink,Kafka Stream等,批處理如Spark,Hive,Pig,Flink等。而同時支持流處理和批處理的計算引擎,只有兩種選擇:一個是Apache Spark,一個是Apache Flink。

從技術,生態等各方面的綜合考慮。首先,Spark的技術理念是基於批來模擬流的計算。而Flink則完全相反,它採用的是基於流計算來模擬批計算。

從技術發展方向看,用批來模擬流有一定的技術局限性,並且這個局限性可能很難突破。而Flink基於流來模擬批,在技術上有更好的擴展性。從長遠來看,阿里決定用Flink做一個統一的、通用的大數據引擎作為未來的選型。

Flink是一個低延遲、高吞吐、統一的大數據計算引擎。在阿里巴巴的生產環境中,Flink的計算平台可以實現毫秒級的延遲情況下,每秒鍾處理上億次的消息或者事件。同時Flink提供了一個Exactly-once的一致性語義。保證了數據的正確性。這樣就使得Flink大數據引擎可以提供金融級的數據處理能力。

Flink在阿里的現狀

基於Apache Flink在阿里巴巴搭建的平台於2016年正式上線,並從阿里巴巴的搜索和推薦這兩大場景開始實現。目前阿里巴巴所有的業務,包括阿里巴巴所有子公司都採用了基於Flink搭建的實時計算平台。同時Flink計算平台運行在開源的Hadoop集群之上。採用Hadoop的YARN做為資源管理調度,以 HDFS作為數據存儲。因此,Flink可以和開源大數據軟體Hadoop無縫對接。

目前,這套基於Flink搭建的實時計算平台不僅服務於阿里巴巴集團內部,而且通過阿里雲的雲產品API向整個開發者生態提供基於Flink的雲產品支持。

Flink在阿里巴巴的大規模應用,表現如何?

規模: 一個系統是否成熟,規模是重要指標,Flink最初上線阿里巴巴只有數百台伺服器,目前規模已達上萬台,此等規模在全球范圍內也是屈指可數;

狀態數據: 基於Flink,內部積累起來的狀態數據已經是PB級別規模;

Events: 如今每天在Flink的計算平台上,處理的數據已經超過萬億條;

PS: 在峰值期間可以承擔每秒超過4.72億次的訪問,最典型的應用場景是阿里巴巴雙11大屏;

Flink的發展之路

接下來從開源技術的角度,來談一談Apache Flink是如何誕生的,它是如何成長的?以及在成長的這個關鍵的時間點阿里是如何進入的?並對它做出了那些貢獻和支持?

Flink誕生於歐洲的一個大數據研究項目StratoSphere。該項目是柏林工業大學的一個研究性項目。早期,Flink是做Batch計算的,但是在2014年,StratoSphere裡面的核心成員孵化出Flink,同年將Flink捐贈Apache,並在後來成為Apache的頂級大數據項目,同時Flink計算的主流方向被定位為Streaming,即用流式計算來做所有大數據的計算,這就是Flink技術誕生的背景。

2014年Flink作為主攻流計算的大數據引擎開始在開源大數據行業內嶄露頭角。區別於Storm,Spark Streaming以及其他流式計算引擎的是:它不僅是一個高吞吐、低延遲的計算引擎,同時還提供很多高級的功能。比如它提供了有狀態的計算,支持狀態管理,支持強一致性的數據語義以及支持Event Time,WaterMark對消息亂序的處理。

Flink核心概念以及基本理念

Flink最區別於其他流計算引擎的,其實就是狀態管理。

什麼是狀態?例如開發一套流計算的系統或者任務做數據處理,可能經常要對數據進行統計,如Sum,Count,Min,Max,這些值是需要存儲的。因為要不斷更新,這些值或者變數就可以理解為一種狀態。如果數據源是在讀取Kafka,RocketMQ,可能要記錄讀取到什麼位置,並記錄Offset,這些Offset變數都是要計算的狀態。

Flink提供了內置的狀態管理,可以把這些狀態存儲在Flink內部,而不需要把它存儲在外部系統。這樣做的好處是第一降低了計算引擎對外部系統的依賴以及部署,使運維更加簡單;第二,對性能帶來了極大的提升:如果通過外部去訪問,如Redis,HBase它一定是通過網路及RPC。如果通過Flink內部去訪問,它只通過自身的進程去訪問這些變數。同時Flink會定期將這些狀態做Checkpoint持久化,把Checkpoint存儲到一個分布式的持久化系統中,比如HDFS。這樣的話,當Flink的任務出現任何故障時,它都會從最近的一次Checkpoint將整個流的狀態進行恢復,然後繼續運行它的流處理。對用戶沒有任何數據上的影響。

Flink是如何做到在Checkpoint恢復過程中沒有任何數據的丟失和數據的冗餘?來保證精準計算的?

這其中原因是Flink利用了一套非常經典的Chandy-Lamport演算法,它的核心思想是把這個流計算看成一個流式的拓撲,定期從這個拓撲的頭部Source點開始插入特殊的Barries,從上游開始不斷的向下游廣播這個Barries。每一個節點收到所有的Barries,會將State做一次Snapshot,當每個節點都做完Snapshot之後,整個拓撲就算完整的做完了一次Checkpoint。接下來不管出現任何故障,都會從最近的Checkpoint進行恢復。

Flink利用這套經典的演算法,保證了強一致性的語義。這也是Flink與其他無狀態流計算引擎的核心區別。

下面介紹Flink是如何解決亂序問題的。比如星球大戰的播放順序,如果按照上映的時間觀看,可能會發現故事在跳躍。

在流計算中,與這個例子是非常類似的。所有消息到來的時間,和它真正發生在源頭,在線系統Log當中的時間是不一致的。在流處理當中,希望是按消息真正發生在源頭的順序進行處理,不希望是真正到達程序里的時間來處理。Flink提供了Event Time和WaterMark的一些先進技術來解決亂序的問題。使得用戶可以有序的處理這個消息。這是Flink一個很重要的特點。

接下來要介紹的是Flink啟動時的核心理念和核心概念,這是Flink發展的第一個階段;第二個階段時間是2015年和2017年,這個階段也是Flink發展以及阿里巴巴介入的時間。故事源於2015年年中,我們在搜索事業部的一次調研。當時阿里有自己的批處理技術和流計算技術,有自研的,也有開源的。但是,為了思考下一代大數據引擎的方向以及未來趨勢,我們做了很多新技術的調研。

結合大量調研結果,我們最後得出的結論是:解決通用大數據計算需求,批流融合的計算引擎,才是大數據技術的發展方向,並且最終我們選擇了Flink。

但2015年的Flink還不夠成熟,不管是規模還是穩定性尚未經歷實踐。最後我們決定在阿里內部建立一個Flink分支,對Flink做大量的修改和完善,讓其適應阿里巴巴這種超大規模的業務場景。在這個過程當中,我們團隊不僅對Flink在性能和穩定性上做出了很多改進和優化,同時在核心架構和功能上也進行了大量創新和改進,並將其貢獻給社區,例如:Flink新的分布式架構,增量Checkpoint機制,基於Credit-based的網路流控機制和Streaming SQL等。

阿里巴巴對Flink社區的貢獻

我們舉兩個設計案例,第一個是阿里巴巴重構了Flink的分布式架構,將Flink的Job調度和資源管理做了一個清晰的分層和解耦。這樣做的首要好處是Flink可以原生的跑在各種不同的開源資源管理器上。經過這套分布式架構的改進,Flink可以原生地跑在Hadoop Yarn和Kubernetes這兩個最常見的資源管理系統之上。同時將Flink的任務調度從集中式調度改為了分布式調度,這樣Flink就可以支持更大規模的集群,以及得到更好的資源隔離。

另一個是實現了增量的Checkpoint機制,因為Flink提供了有狀態的計算和定期的Checkpoint機制,如果內部的數據越來越多,不停地做Checkpoint,Checkpoint會越來越大,最後可能導致做不出來。提供了增量的Checkpoint後,Flink會自動地發現哪些數據是增量變化,哪些數據是被修改了。同時只將這些修改的數據進行持久化。這樣Checkpoint不會隨著時間的運行而越來越難做,整個系統的性能會非常地平穩,這也是我們貢獻給社區的一個很重大的特性。

經過2015年到2017年對Flink Streaming的能力完善,Flink社區也逐漸成熟起來。Flink也成為在Streaming領域最主流的計算引擎。因為Flink最早期想做一個流批統一的大數據引擎,2018年已經啟動這項工作,為了實現這個目標,阿里巴巴提出了新的統一API架構,統一SQL解決方案,同時流計算的各種功能得到完善後,我們認為批計算也需要各種各樣的完善。無論在任務調度層,還是在數據Shuffle層,在容錯性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享兩點:

● 統一 API Stack

● 統一 SQL方案

先來看下目前Flink API Stack的一個現狀,調研過Flink或者使用過Flink的開發者應該知道。Flink有2套基礎的API,一套是DataStream,一套是DataSet。DataStream API是針對流式處理的用戶提供,DataSet API是針對批處理用戶提供,但是這兩套API的執行路徑是完全不一樣的,甚至需要生成不同的Task去執行。所以這跟得到統一的API是有沖突的,而且這個也是不完善的,不是最終的解法。在Runtime之上首先是要有一個批流統一融合的基礎API層,我們希望可以統一API層。

因此,我們在新架構中將採用一個DAG(有限無環圖)API,作為一個批流統一的API層。對於這個有限無環圖,批計算和流計算不需要涇渭分明的表達出來。只需要讓開發者在不同的節點,不同的邊上定義不同的屬性,來規劃數據是流屬性還是批屬性。整個拓撲是可以融合批流統一的語義表達,整個計算無需區分是流計算還是批計算,只需要表達自己的需求。有了這套API後,Flink的API Stack將得到統一。

除了統一的基礎API層和統一的API Stack外,同樣在上層統一SQL的解決方案。流和批的SQL,可以認為流計算有數據源,批計算也有數據源,我們可以將這兩種源都模擬成數據表。可以認為流數據的數據源是一張不斷更新的數據表,對於批處理的數據源可以認為是一張相對靜止的表,沒有更新的數據表。整個數據處理可以當做SQL的一個Query,最終產生的結果也可以模擬成一個結果表。

對於流計算而言,它的結果表是一張不斷更新的結果表。對於批處理而言,它的結果表是相當於一次更新完成的結果表。從整個SOL語義上表達,流和批是可以統一的。此外,不管是流式SQL,還是批處理SQL,都可以用同一個Query來表達復用。這樣以來流批都可以用同一個Query優化或者解析。甚至很多流和批的運算元都是可以復用的。

Flink的未來方向

首先,阿里巴巴還是要立足於Flink的本質,去做一個全能的統一大數據計算引擎。將它在生態和場景上進行落地。目前Flink已經是一個主流的流計算引擎,很多互聯網公司已經達成了共識:Flink是大數據的未來,是最好的流計算引擎。下一步很重要的工作是讓Flink在批計算上有所突破。在更多的場景下落地,成為一種主流的批計算引擎。然後進一步在流和批之間進行無縫的切換,流和批的界限越來越模糊。用Flink,在一個計算中,既可以有流計算,又可以有批計算。

第二個方向就是Flink的生態上有更多語言的支持,不僅僅是Java,Scala語言,甚至是機器學習下用的Python,Go語言。未來我們希望能用更多豐富的語言來開發Flink計算的任務,來描述計算邏輯,並和更多的生態進行對接。

最後不得不說AI,因為現在很多大數據計算的需求和數據量都是在支持很火爆的AI場景,所以在Flink流批生態完善的基礎上,將繼續往上走,完善上層Flink的Machine Learning演算法庫,同時Flink往上層也會向成熟的機器學習,深度學習去集成。比如可以做Tensorflow On Flink, 讓大數據的ETL數據處理和機器學習的Feature計算和特徵計算,訓練的計算等進行集成,讓開發者能夠同時享受到多種生態給大家帶來的好處。

『伍』 疫情期間是聽鄉政府的還是疾控中心的

疫情期間,肯定是聽政府的啊!疾控也得根據政府的指令做事啊!疫情期間,肯定是要居家隔離的啊!這個具體情況具體對待!
疾病預防控制中心
主管國家疾病預防控制的業務機構
疾病預防控制中心,簡稱疾控中心,如:四川省疾病預防控制中心;沈陽軍區疾控中心。「疾病控制中心」一詞來自美國主管國家疾病預防控制的業務機構。
截至2021年末,全國共有疾病預防控制中心3380個。[1]
中文名
疾病預防控制中心
外文名
Center for Disease Control and Prevention
英文簡稱
CDC
中文簡稱
疾控中心
相關視頻
2萬播放|01:27
不再用YES或NO,美國疾控中心又改了,他們改用「大約」
5006播放|01:38
重大轉變!CDC取消美國醫療機構口罩令,專家表擔憂……
5518播放|00:40
果然視頻|棗庄疾病預防控制中心喬遷新址
5098播放|00:20
8月29日 哈爾濱市疾病預防控制中心發布最新緊急尋人提示
9187播放|05:38
359分上岸中國疾病預防控制中心,學姐是如何跨考逆襲的
6165播放|09:20
100多萬人死於新冠後,美國CDC整改!福奇「跑路」,特朗普喊冤?
9267播放|03:06
酒泉市疾病預防控制中心:為百姓樹起健康屏障
5208播放|01:25
尚矽谷大數據培訓:Flink CDC-剖析DataStream、FlinkSQL-02
1.3萬播放|00:55
疾病預防控制中心有什麼福利
8668播放|02:30
煙癮是病別自己扛 【戒煙時嗜睡頭暈咳黑痰,原來都是戒斷症狀!】 戒煙時嗜睡頭暈咳黑痰是戒斷症狀 2022年5月31日是第35個世界無煙日。為加強控煙科普宣傳,充分發揮醫護人員示範作用,調動醫護人員爭做控煙的傳播者和踐行者,倡導醫護人員以身作則不吸煙,發動身邊人遠離煙草,河南省疾病預防控制中心、河南控煙APP聯合大河網共同組織策劃「煙癮是病 別自己扛」系列短視頻,邀請我省相關醫療衛生機構戒煙門診
查看更多
名稱定義使命職責相關職責科技成果統計數據TA說
名稱
英文全稱:Center for Disease Control and Prevention
如:四川省疾病預防控制中心;沈陽軍區疾控中心;
定義
疾病控制中心一詞來自美國主管國家疾病預防控制的業務機構,現更名為疾病控制與預防中心(center for disease control and prevention,簡稱CDC或CDCP)。目前,我國已建立"中國疾病預防控制中心(China CDC)",並且在各省、自治區、直轄市設立了相應的分支機構。 中國疾病預防控制中心(以下簡稱中國疾控中心),是由政府舉辦的實施國家級疾病預防控制與公共衛生技術管理和服務的公益事業單位。
其使命是通過對疾病、殘疾和傷害的預防控制,創造健康環境,維護社會穩定,保障國家安全,促進人民健康;其宗旨是以科研為依託、以人才為根本、以疾控為中心。在衛生部領導下,發揮技術管理及技術服務職能,圍繞國家疾病預防控制重點任務,加強對疾病預防控制策略與措施的研究,做好各類疾病預防控制工作規劃的組織實施;開展食品安全、職業安全、健康相關產品安全、放射衛生、環境衛生、婦女兒童保健等各項公共衛生業務管理工作,大力開展應用性科學研究,加強對全國疾病預防控制和公共衛生服務的技術指導、培訓和質量控制,在防病、應急、公共衛生信息能力的建設等方面發揮國家隊的作用。
在我國歷史上,傳染病曾經是嚴重威脅人民健康和生命安全的疾病。上世紀50年代,因傳染病和寄生蟲病死亡人數居於全國人口死因中的第一位。經過多年的努力,目前下降到第9位,並在發展中國家中率先消滅了天花和脊髓灰質炎等重大傳染病。我國雖然是一個自然災害頻繁的國家,但多年來成功地實現了大災之後無大疫。2003年戰勝了來勢兇猛的非典疫情,近兩年又成功地控制了禽流感向人類的傳播。目前,全國正在建立健全艾滋病、結核病、血吸蟲病、乙型肝炎等嚴重傳染病的預防控制和醫療救治體系。據2004年30個市和78個縣(縣級市)死因統計,城市居民前十位死因為:①惡性腫瘤126.4/10萬,②腦血管病100.9/10萬,③心臟病99.4/10萬,④呼吸系病69.3/10萬,⑤損傷及中毒31.1/10萬,⑥消化系病17.1/10萬,⑦內分泌、營養和代謝疾病14.9/10萬,⑧泌尿生殖系病9.5/10萬,⑨神經系病4.6/10萬,⑩圍生期病168.5/10萬活產,前十位死因合計占死亡總數的90.1%。農村居民前十位死因為:①惡性腫瘤119.7/10萬,②腦血管病74.9/10萬,③呼吸系病67.2/10萬,④心臟病63.4/10萬,⑤損傷及中毒33.5/10萬,⑥消化系病14.2/10萬,⑦內分泌、營養及代謝疾病12.7/10萬,⑧泌尿生殖系病8.1/10萬,⑨圍生期病363.9/10萬活產,⑩肺結核3.3/10萬。前十位死因合計占死亡總數的79.3%。

『陸』 Hive sql及窗口函數

hive函數:

1、根據指定條件返回結果:case when then else end as

2、基本類型轉換:CAST()

3、nvl:處理空欄位:三個str時,是否為空可以指定返回不同的值

4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp

5、count(1)與COUNT(*):返回行數

如果表沒有主鍵,那麼count(1)比count(*)快;

如果有主鍵,那麼count(主鍵,聯合主鍵)比count(*)快;

count(1)跟count(主鍵)一樣,只掃描主鍵。count(*)跟count(非主鍵)一樣,掃描整個表。明顯前者更快一些。

性能問題:

1.任何情況下SELECT COUNT(*) FROM tablename是最優選擇,(指沒有where的情況);

2.盡量減少SELECT COUNT(*) FROM tablename WHERE COL = 『value』 這種查詢;

3.杜絕SELECT COUNT(COL) FROM tablename WHERE COL2 = 『value』 的出現。

count(expression):查詢 is_reply=0 的數量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;

6、distinct與group by

distinct去重所有distinct之後所有的欄位,如果有一個欄位值不一致就不作為一條

group by是根據某一欄位分組,然後查詢出該條數據的所需欄位,可以搭配 where max(time)或者Row_Number函數使用,求出最大的一條數據

7、使用with 臨時表名 as() 的形式,簡單的臨時表直接嵌套進sql中,復雜的和需要復用的表寫到臨時表中,關聯的時候先找到關聯欄位,過濾條件最好在臨時表中先過濾後關聯

處理json的函數:

split(json_array_string(schools), '\\|\\|') AS schools

get_json_object(school, '$.id') AS school_id,

字元串函數:

1、instr(』源字元串』 , 『目標字元串』 ,』開始位置』,』第幾次出現』)

instr(sourceString,destString,start,appearPosition)

1.sourceString代表源字元串; destString代表要從源字元串中查找的子串;

2.start代表查找的開始位置,這個參數可選的,默認為1;

3.appearPosition代表想從源字元中查找出第幾次出現的destString,這個參數也是可選的, 默認為1

4.如果start的值為負數,則代表從右往左進行查找,但是位置數據仍然從左向右計算。

5.返回值為:查找到的字元串的位置。如果沒有查找到,返回0。

最簡單例子: 在abcd中查找a的位置,從第一個字母開始查,查找第一次出現時的位置

select instr(『abcd』,』a』,1,1) from al; —1

應用於模糊查詢:instr(欄位名/列名, 『查找欄位』)

select code,name,dept,occupation from staff where instr(code, 『001』)> 0;

等同於 select code, name, dept, occupation from staff where code like 『%001%』 ;

應用於判斷包含關系:

select ccn,mas_loc from mas_loc where instr(『FH,FHH,FHM』,ccn)>0;

等同於 select ccn,mas_loc from mas_loc where ccn in (『FH』,』FHH』,』FHM』);

2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣

substr(time,1,8) 表示將time從第1位開始截取,截取的長度為8位

第一種用法:

substr(string A,int start)和 substring(string A,int start),用法一樣

功效:返回字元串A從下標start位置到結尾的字元串

第二種用法:

substr(string A,int start,int len)和 substring(string A,int start,int len),用法一樣

功效:返回字元串A從下標start位置開始,長度為len的字元串

3、get_json_object(form_data,'$.學生姓名') as student_name

json_tuple 函數的作用:用來解析json字元串中的多個欄位

4、split(full_name, '\\.') [5] AS zq;  取的是數組里的第六個

日期(時間)函數:

1、to_date(event_time) 返回日期部分

2、date_sub:返回當前日期的相對時間

當前日期:select curdate() 

當前日期前一天:select  date_sub(curdate(),interval 1 day)

當前日期後一天:select date_sub(curdate(),interval -1 day)

date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14)  將現在的時間總秒數轉為標准格式時間,返回14天之前的時間

時間戳>>>>日期:

from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 將現在的時間總秒數轉為標准格式時間

from_unixtime(get_json_object(get_json_object(form_data,'$.挽單時間'),'$.$date')/1000) as retain_time

unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')  --1565858400

日期>>>>時間戳:unix_timestamp()

date_format:yyyy-MM-dd HH:mm:ss 時間轉格式化時間

select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000

1.日期比較函數: datediff語法: datediff(string enddate,string startdate) 

返回值: int 

說明: 返回結束日期減去開始日期的天數。 

舉例:  hive> select datediff('2016-12-30','2016-12-29');  1

2.日期增加函數: date_add語法: date_add(string startdate, intdays) 

返回值: string 

說明: 返回開始日期startdate增加days天後的日期。 

舉例:  hive>select date_add('2016-12-29',10);  2017-01-08

3.日期減少函數: date_sub語法: date_sub (string startdate,int days) 

返回值: string 

說明: 返回開始日期startdate減少days天後的日期。 

舉例:  hive>select date_sub('2016-12-29',10);  2016-12-19

4.查詢近30天的數據

select * from table where datediff(current_timestamp,create_time)<=30;

create_time 為table里的欄位,current_timestamp 返回當前時間 2018-06-01 11:00:00

3、trunc()函數的用法:當前日期的各種第一天,或者對數字進行不四捨五入的截取

日期:

1.select trunc(sysdate) from al  --2011-3-18  今天的日期為2011-3-18

2.select trunc(sysdate, 'mm')   from   al  --2011-3-1    返回當月第一天.

上月1號    trunc(add_months(current_date(),-1),'MM')

3.select trunc(sysdate,'yy') from al  --2011-1-1       返回當年第一天

4.select trunc(sysdate,'dd') from al  --2011-3-18    返回當前年月日

5.select trunc(sysdate,'yyyy') from al  --2011-1-1   返回當年第一天

6.select trunc(sysdate,'d') from al  --2011-3-13 (星期天)返回當前星期的第一天

7.select trunc(sysdate, 'hh') from al   --2011-3-18 14:00:00   當前時間為14:41  

8.select trunc(sysdate, 'mi') from al  --2011-3-18 14:41:00   TRUNC()函數沒有秒的精確

數字:TRUNC(number,num_digits) Number 需要截尾取整的數字。Num_digits 的默認值為 0。TRUNC()函數截取時不進行四捨五入

11.select trunc(123.458,1) from al --123.4

12.select trunc(123.458,-1) from al --120

4、round():四捨五入:

select round(1.455, 2)  #結果是:1.46,即四捨五入到十分位,也就是保留兩位小數

select round(1.5)  #默認四捨五入到個位,結果是:2

select round(255, -1)  #結果是:260,即四捨五入到十位,此時個位是5會進位

floor():地板數

ceil()天花板數

5、

6.日期轉年函數: year語法:   year(string date) 

返回值: int

說明: 返回日期中的年。

舉例:

hive>   select year('2011-12-08 10:03:01') from al;

2011

hive>   select year('2012-12-08') fromal;

2012

7.日期轉月函數: month語法: month   (string date) 

返回值: int

說明: 返回日期中的月份。

舉例:

hive>   select month('2011-12-08 10:03:01') from al;

12

hive>   select month('2011-08-08') fromal;

8

8.日期轉天函數: day語法: day   (string date) 

返回值: int

說明: 返回日期中的天。

舉例:

hive>   select day('2011-12-08 10:03:01') from al;

8

hive>   select day('2011-12-24') fromal;

24

9.日期轉小時函數: hour語法: hour   (string date) 

返回值: int

說明: 返回日期中的小時。

舉例:

hive>   select hour('2011-12-08 10:03:01') from al;

10

10.日期轉分鍾函數: minute語法: minute   (string date) 

返回值: int

說明: 返回日期中的分鍾。

舉例:

hive>   select minute('2011-12-08 10:03:01') from al;

3

11.日期轉秒函數: second語法: second   (string date) 

返回值: int

說明: 返回日期中的秒。

舉例:

hive>   select second('2011-12-08 10:03:01') from al;

1

12.日期轉周函數: weekofyear語法:   weekofyear (string date) 

返回值: int

說明: 返回日期在當前的周數。

舉例:

hive>   select weekofyear('2011-12-08 10:03:01') from al;

49

查看hive表在hdfs中的位置:show create table 表名;

在hive中hive2hive,hive2hdfs:

HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;

Hive ----> Hdfs、本地:使用:insert overwrite | local

網站訪問量統計:

uv:每用戶訪問次數

ip:每ip(可能很多人)訪問次數

PV:是指頁面的瀏覽次數

VV:是指你訪問網站的次數

sql:

基本函數:

count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正則)

and、or、not、in   

where、group by、having、{ join on 、full join}  、order by(desc降序)

sort by需要與distribut by集合結合使用:

hive (default)> set maprece.job.reces=3;  //先設置rece的數量 

insert overwrite local directory '/opt/mole/datas/distribute-by'

row format delimited fields terminated by '\t'

先按照部門編號分區,再按照員工編號降序排序。

select * from emp distribute by deptno sort by empno desc;

外部表  create external table if not exists dept

分區表:create table dept_partition ( deptno int, dname string, loc string )  partitioned by ( month string )

load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809'); 

 alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');

多分區聯合查詢:union

select * from dept_partition2 where month='201809' and day='10';

show partitions dept_partition;

desc formatted dept_partition;

二級分區表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';

分桶抽樣查詢:分區針對的是數據的存儲路徑;分桶針對的是數據文件

create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';

設置開啟分桶與rece為1:

set hive.enforce.bucketing=true;

set maprece.job.reces=-1;

分桶抽樣:select * from stu_bucktablesample(bucket x out of y on id);

抽取,桶數/y,x是從哪個桶開始抽取,y越大 抽樣數越少,y與抽樣數成反比,x必須小於y

給空欄位賦值:

如果員工的comm為NULL,則用-1代替或用其他欄位代替  :select nvl(comm,-1) from emp;

case when:如何符合記為1,用於統計、分組統計

select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;

用於組合歸類匯總(行轉列):UDAF:多轉一

concat:拼接查詢結果

collect_set(col):去重匯總,產生array類型欄位,類似於distinct

select t.base, concat_ws('|',collect_set(t.name))   from (select concat_ws(',',xingzuo,blood_type) base,name  from person_info) t group by t.base;

解釋:先第一次查詢得到一張沒有按照(星座血型)分組的表,然後分組,使用collect_set將名字組合成數組,然後使用concat將數組變成字元串

用於拆分數據:(列轉行):UDTF:一轉多

explode(col):將hive一列中復雜的array或者map結構拆分成多行。

lateral view  側面顯示:用於和UDTF一對多函數搭配使用

用法:lateral view udtf(expression) tablealias as cate

cate:炸開之後的列別名

temptable :臨時表表名

解釋:用於和split, explode等UDTF一起使用,它能夠將一列數據拆成多行數據,在此基礎上可以對拆分後的數據進行聚合。

開窗函數:

Row_Number,Rank,Dense_Rank  over:針對統計查詢使用

Row_Number:返回從1開始的序列

Rank:生成分組中的排名序號,會在名詞s中留下空位。3 3 5

dense_rank:生成分組中的排名序號,不會在名詞中留下空位。3 3 4

over:主要是分組排序,搭配窗口函數使用

結果:

SUM、AVG、MIN、MAX、count

preceding:往前

following:往後

current row:當前行

unbounded:unbounded preceding 從前面的起點, unbounded following:到後面的終點

sum:直接使用sum是總的求和,結合over使用可統計至每一行的結果、總的結果、當前行+之前多少行/之後多少行、當前行到往後所有行的求和。

over(rowsbetween 3/current )  當前行到往後所有行的求和

ntile:分片,結合over使用,可以給數據分片,返回分片號

使用場景:統計出排名前百分之或n分之一的數據。

lead,lag,FIRST_VALUE,LAST_VALUE

lag與lead函數可以返回上下行的數據

lead(col,n,dafault) 用於統計窗口內往下第n行值

第一個參數為列名,第二個參數為往下第n行(可選,默認為1),第三個參數為默認值(當往下第n行為NULL時候,取默認值,如不指定,則為NULL)

LAG(col,n,DEFAULT) 用於統計窗口內往上第n行值

第一個參數為列名,第二個參數為往上第n行(可選,默認為1),第三個參數為默認值(當往上第n行為NULL時候,取默認值,如不指定,則為NULL)

使用場景:通常用於統計某用戶在某個網頁上的停留時間

FIRST_VALUE:取分組內排序後,截止到當前行,第一個值

LAST_VALUE:取分組內排序後,截止到當前行,最後一個值

范圍內求和: https://blog.csdn.net/happyrocking/article/details/105369558

cume_dist,percent_rank

–CUME_DIST :小於等於當前值的 行數 / 分組內總行數

–比如,統計小於等於當前薪水的人數,占總人數的比例

percent_rank:分組內當前行的RANK值-1/分組內總行數-1

總結:

在Spark中使用spark sql與hql一致,也可以直接使用sparkAPI實現。

HiveSql窗口函數主要應用於求TopN,分組排序TopN、TopN求和,前多少名前百分之幾。

與Flink窗口函數不同。

Flink中的窗口是用於將無線數據流切分為有限塊處理的手段。

window分類:

CountWindow:按照指定的數據條數生成一個 Window,與時間無關。

TimeWindow:按照時間生成 Window。

1. 滾動窗口(Tumbling Windows):時間對齊,窗口長度固定,不重疊::常用於時間段內的聚合計算

2.滑動窗口(Sliding Windows):時間對齊,窗口長度固定,可以有重疊::適用於一段時間內的統計(某介面最近 5min 的失敗率來報警)

3. 會話窗口(Session Windows)無時間對齊,無長度,不重疊::設置session間隔,超過時間間隔則窗口關閉。

『柒』 flink 1.10 1.12區別

flink 1.10 1.12區別在於Flink 1.12 支持了 Flink SQL Kafka upsert connector 。

因為在 Flink 1.10 中,當前這類任務開發對於用戶來說,還是不夠友好,需要很多代碼,同時也會造成 Flink SQL 冗長。

Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,這也是我們公司內部業務方對實時平台提出的需求。

收益:便利用戶有這種需要從 kafka 取最新記錄操作的實時任務開發,比如這種 binlog -> kafka,然後用戶聚合操作,這種場景還是非常多的,這能提升實時作業開發效率,同時 1.12 做了優化,性能會比單純的 last_value 性能要好。

Flink Yarn 作業 On k8s 的生產級別能力是:

Flink Jar 作業已經全部 K8s 化,Flink SQL 作業由於是推廣初期,還是在 Yarn 上面進行運行,為了將實時計算 Flink 全部K8s化。

所以我們 Flink SQL 作業也需要遷移到 K8s,目前 Flink 1.12 已經滿足生產級別的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社區的 On k8s 能力。

風險:雖然和社區的人溝通,Flink 1.12 on k8s 沒有什麼問題,但是具體功能還是需要先 POC 驗證一下,同時可能社區 Flink on k8s 的能力。

可能會限制我們這邊一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,這里可能需要改底層源碼來進行快速支持(社區有相關 JIRA 要做)。

『捌』 flinksql自定義topN函數的代碼

摘要 當前 Flink 有如下幾種函數:

『玖』 流批一體不只有Flink,還有實時數據模型

通常來講,數據倉庫的建設,都是以離線作為主要的密報,下游的應用,不論是報表還是介面,所提供的數據也大多是T-1時效性。

但伴隨著業務的變化,當離線做到沒什麼可以繼續做的時候,實時就會被拿出來,作為新一個階段的目標進行攻克。

在流批一體建設之前,這種實時訴求通常會開發成分鍾級的任務,通過近實時的方案來解決業務的問題,但分鍾級會帶來諸如任務過多、資源擠占較大、無法支持復雜邏輯等問題。

因此專門支持實時計算的框架,比如早期的Storm,能夠嘗試從純實時的角度解決業務問題,就被拿出來作為嘗試。然而Storm的局限性也很大,因為那會的任務開發只能通過Java的方式來進行,與Hive所推崇的純SQL方案相比,上手難度大了不少,同時兩套代碼的邏輯幾乎沒有可比性,這種方案也就一直沒有什麼聲音。

盡管實時技術有各種缺陷,但作為一種能夠很容易講清楚價值的項目,同時又非常便於向上匯報的技術方案,實時技術還是被或多或少的做了起來。在大多數的公司里,實時和離線就會有不同的團隊進行維護,或者是同一個團隊,但分成不同的項目來執行。這個階段,優先高效的把業務做起來,哪怕場景再簡單,但能夠證明實時有價值和前景,這個階段的目標就算完成了。

以上的各種方案,難免會帶來三個特別難以解決的問題:

(1)數據的口徑上,實時和離線很容易不統一;
(2)數據模型的規范上,實時和離線也往往是分開建設;
(3)即便是同一種口徑和同一種規范,實時和離線也要分成兩套代碼來維護。

這三個問題短時間內會被高速發展掩蓋掉,但當業務對實時的訴求越來越多、壓力越來越大的時候,口徑和代碼的不統一,就會越來越成為阻礙敏捷開發的障礙,需要有方案進行解決。

後來Flink出現了,帶來了流批一體的全新方案,這個問題便出現了解決的曙光,這也比較接近我們對於實時計算的理想方案,因為其意義堪比Hive,也成為了各個大廠面試的標配問題。

然而,僅僅學會Flink是不夠的,因為流批一體帶來的並不僅僅是技術方案或者是框架的改變,同樣帶來了數據模型的改變,這就要求我們從數據模型上,而不是技術方案上,來制定我們的實時方案。

那麼我們如何理解「實時數據模型」這件事情呢?

通常而言,我們關心的內容,包括如下幾個方面:

(1)實時數據源與離線數據源存在差異,導致相同的欄位,取值或者類型會存在不相等的情況;
(2)實時和離線由於底層執行機制的不同,通常需要維護兩套代碼,會帶來諸如口徑不統一、質量檢測難的問題;
(3)產品邏輯變化較快時,離線模型修改相對容易,但實時模型需要考慮壓測、削峰、重啟等技術問題,維護成本非常高昂。

數據倉庫之所以能夠普及並被業務接受,正是因為其模型能夠屏蔽掉底層差異的問題,並且有相對可靠的數據質量監控方法,並且變更成本非常低。而實時數倉如果想要替代掉離線數倉,以上的問題通常是需要一些模型設計甚至是平台工具的來解決,這些問題解決的重要性,並不比Flink弱。

我們先從比較可控的模型層面說起。

在離線的概念里,數倉模型設計成了DWD/DWS/ADS三個層級,原本的概念是DWD面向事實表的構建,DWS面向公共指標的統一,ADS負責靈活的口徑變化問題。

在離線的概念里,DWD/DWS/ADS三個層級需要保留,但負責的目標會有一些變化,同時還需要增加存儲統一層,也就是以TiDB/Holo為代表的資料庫,來承擔服務分析一體化的訴求。

讓我們先看DWD層,DWD承擔了屏蔽實時離線鏈路差異的問題,最重要的作用是保證表結構的統一及欄位內容的對齊。DWD最重要的意義,是保證離線表和實時表,其表結構和欄位概念是相同的。

為什麼這么強調?試想一下,在離線場景下,我們可以在DWD上靈活的增加各種統計標簽,或者是將維度退化到事實表,都是一些left join或者是服務端直接打標可以解決的事情。但在實時場景下,這會變成多流join或者是緩存等更復雜的技術場景,導致這些信息並不能有效的記錄到DWD,因此DWD的設計就要產生一些變化,有一些內容在實時場景下無法准確記錄,這一類信息需要標識到對應的欄位描述上,下游使用時才不會出錯。

同時,實時和離線存儲數據的介質,也必然有一些區別。例如離線可以存在HDFS上,實時則可能視情況保存在資料庫、HDFS甚至是內存中,這時候對於欄位格式、讀取方式都會有差異,設計表時其約束條件也會更多。

因而,DWD更多承擔了邏輯統一的職責,依舊以事實表為基礎,但約束條款要比離線更多。

再看一下DWS層,離線上DWS是負責口徑統一的重要一環,將通用的維度和口徑計算方法抽象出現,以提供跨數據域的靈活使用。但在實時場景下,這一類的維護收益通常都比較低,不僅因為實時只看當天的數據,也是因為實時本身的維度難度就較大,多一層模型其收益會急速下降,因而大多數時候會忽略掉DWS的建設,ADS直接引用DWD進行統計。

然而,DWS畢竟存儲的內容要比DWD少很多,因此如果計算資源瓶頸非常明顯,或者是業務場景不需要分析實時明細數據的情況下,或者是DWD的下游引用過多時,DWS可以承擔削峰的重任,通過減少數據量以應對大促等場景,還是有一定意義的。

接下來就是最重要的ADS層,在這一層上,邏輯統一、口徑統一、大促削峰在前置模型上都得到了一定程度的解決,ADS則像離線一樣承擔了應對需求變化的重任。

但ADS所面臨的情況和離線還是有所不同的,因為ADS的任務啟動,不僅要啟動一個離線的跑批任務,還要同時啟動一個實時的流式任務,而ADS往往會同時統計離線+實時的結果,以應對同比、環比等場景。

這時候很多具體Case要具體分析了,因為特定場景的坑會非常多。例如最常見的「同比」,要對比今年和去年的結果變化,離線往往會統計分小時的結果,但實時會累計起始時刻到當期時刻的結果,因而當一個小時沒有結束的時候,這個同比的波動變化會非常大,給人一種「數據是錯誤的」印象,新手很容易踩這個坑,從而被業務質疑。

因此,針對累計統計指標,從代碼設計上就要考慮到這種情況,都根據時間欄位統計起始到當前時刻的結果的,在代碼邏輯上會要求一些統計技巧。

很多時候,因為業務指標變化太快,改實時代碼是來不及的,這時候一部分的工作量甚至需要報表工具的數據集來解決,改動查詢sql,要比改動任務來的快捷多了。但這部分的能力,其實是依賴於存儲工具的,個人認為可以分到存儲統一層來解決。

最後是存儲統一層,因為一些特殊的場景,比如實時分析明細數據,或者是不確定時間周期的多天統計結果,如果依賴Flink SQL來解決是有些不現實的,因而這部分的壓力需要資料庫來承擔。

簡單講,就是將明細做輕度的匯總後,直接寫到資料庫,實時更新,下游自定義條件,並直接讀庫統計結果。這種場景既要求資料庫有OLAP的計算能力,也要有OLTP的穩定特點,因而TiDB和Holo這一類HTAP的引擎就變得非常重要。

因為多了實時的部分,因此過去面向離線的開發工具,也需要有一些特定的改造,以適應實時的開發和運維訴求。

對於開發工具而言,其目標集中在四個場景上:元數據定義與獲取、數據建模、開發與測試、運維與監控。

其次講數據建模,因為建模的理論已經穩定了有些年頭了,絕大多數場景下都是按照既定的方案來執行。過去離線當道時,規范執行的弱一些不是什麼大問題,但流批一體當道的年代,規范是需要強約束的,這就對了開發工具提出了一定的要求,是否能夠從平台層面上對規范進行內置,並以此來約束開發的同學,降低不規范模型對後期維護帶來的壓力。

這種建模能力的代表有兩種,一種是規范表的命名,填寫相應的分層+主題域+數據域+統計刷新方式,從源頭上規范表的目標和作用;一種是規范指標的定義和使用,例如原子指標還是派生指標,統計周期多少,業務限定用語如何規范,統計粒度怎麼填寫。

在實際開發中,通過工具的限制,如果規范可以做的好,代碼是可以自動生成出來的。當然,以上的功能,都屬於通過犧牲開發效率,來提升數據質量的范疇,使用時需要根據團隊的情況來限定。

再次是開發和測試,這是平台提供的最重要的能力。在開發層面,就是代碼的預編譯能力+發布功能。預編譯不僅要檢查代碼的邏輯是否正確,同時對於代碼中依賴的其他數據源,獲取到的元數據信息是否准確,至少欄位的命名不會有大的問題。當代碼預編譯通過,發布上線後,還需要檢測當前是否有資源支持任務啟動,並且上游的消息隊列是否是啟動的狀態。

實時的測試一直都是比較大的問題,它不像離線可以啟動一個SQL任務看看結果,實時在每個階段的輸入和輸出,是需要通過平台支持的日誌列印功能來進行輔助的。很多時候我們會新建一個測試專用的topic來測試結果,但對於流量較大的線上任務而言,這種方式無法像離線區分Dev環境一樣,能夠對資源進行隔離,因而如果能夠支持圈定數據的輸入和列印輸出,對於測試的效率而言無疑是最佳的。

最後要提到的是運維與監控能力。運維能力是指根據輸入的RPS,或者是cu使用情況,或者是任務的整體延遲,提供相應的參數調優能力,通過參數來調整任務的執行情況。並且能夠根據以上指標的變化,自定義相應的閾值,提供相應的告警能力,通過簡訊或者是消息工具的方式觸達任務維護者。

實時與離線有一些不同的是,離線可以通過增加一個監控節點的方式,通過group by判斷數據是否重復,而實時任務則非常依賴Flink自身的一致性能力,因而發現和解決問題的成本更高。

其實做到運維這個環節,對人的要求其實是更高的。因為流批一體在運維上會帶來一個好處,即實時任務和離線任務能夠錯峰執行,實時在白天壓力大,而離線在晚上壓力大。但同樣的,這種方式對於維護者而言更加痛苦,因為不僅晚上要熬夜值班,白天同樣不能休息,在大促期間甚至需要輪班來維護任務,可以說是「匯報一時爽,痛苦長相伴」。

從遠處來看,流任務和批任務,在自身的機制上就存在非常大的差異,批程序面上的是特定時間內相對靜態的數據,而流程序處理的則是change-log,雖然有可能數據在表結構層面,通過數據模型的設計來保持一致,但是在語義層面,其根本還是不一樣的。這一點可能是最制約批流一體發展的問題,也是最難實現統一或者永遠也不可能統一的。

綜上,對於實時模型,開發工具需要將監控實時部分的能力進行補全,就像DWD層需要分別維護實時和離線兩套架構一樣,開發工具也需要分別維護兩套架構的結果,因而現階段的實時開發,還做不到降低維護和開發的成本,只能減輕其中部分環節的工作量。

以上講了很長時間的實時模型,但從實際的效果上看,業務並不會感知到多麼明顯的技術變化,相反會有一種「面子工程」的感覺在裡面。

當然,我並不否認實時的價值,在「搜廣推」這三個技術佔主導的領域內,作用還是很大的。但實時畢竟要比離線的內容,更加的難以理解,出現問題的排查成本也更高。這種復雜性使得我們在應對變化時,往往做不出有效的應對,就會變得特別被動。

因而,說一句事後的話,就是「實時的價值取決於業務方,而不是技術方」。只有業務對實時痛點強烈的場景下,我們做如此復雜的研究和應對,才能體現出自己的價值,更多的時候,是在「王婆賣瓜,自賣自誇」。有這種投入,還不如多招幾個分析師更靠譜和實在。

本人之前的文章《天下數據,唯快不破》,重點強調了一個「快」字。但「天下熙熙皆為利來,天下攘攘皆為利往」,這個快更多的是在講應對「變化」的快,而不是「技術」自己的快。

所以,為了以後的職業發展,我們要跟進實時技術的變化,但從自身的工作角度出發,如何應對業務的變化,才是自己要關心的課題。