当前位置:首页 » 编程语言 » flinksql实现行转列
扩展阅读
webinf下怎么引入js 2023-08-31 21:54:13
堡垒机怎么打开web 2023-08-31 21:54:11

flinksql实现行转列

发布时间: 2022-10-15 11:17:05

‘壹’ flinksql自定义topN函数的代码

摘要 package day07;

‘贰’ Apache Flink现在在大数据处理方面能够和Apache Spark分庭抗礼么

我们是否还需要另外一个新的数据处理引擎?当我第一次听到flink的时候这是我是非常怀疑的。在大数据领域,现在已经不缺少数据处理框架了,但是没有一个框架能够完全满足不同的处理需求。自从Apache spark出现后,貌似已经成为当今把大部分的问题解决得最好的框架了,所以我对另外一款解决类似问题的框架持有很强烈的怀疑态度。
不过因为好奇,我花费了数个星期在尝试了解flink。一开始仔细看了flink的几个例子,感觉和spark非常类似,心理就倾向于认为flink又是一个模仿spark的框架。但是随着了解的深入,这些API体现了一些flink的新奇的思路,这些思路还是和spark有着比较明显的区别的。我对这些思路有些着迷了,所以花费了更多的时间在这上面。
flink中的很多思路,例如内存管理,dataset API都已经出现在spark中并且已经证明 这些思路是非常靠谱的。所以,深入了解flink也许可以帮助我们分布式数据处理的未来之路是怎样的
在后面的文章里,我会把自己作为一个spark开发者对flink的第一感受写出来。因为我已经在spark上干了2年多了,但是只在flink上接触了2到3周,所以必然存在一些bias,所以大家也带着怀疑和批判的角度来看这篇文章吧。
Apache Flink是什么
flink是一款新的大数据处理引擎,目标是统一不同来源的数据处理。这个目标看起来和spark和类似。没错,flink也在尝试解决spark在解决的问题。这两套系统都在尝试建立一个统一的平台可以运行批量,流式,交互式,图处理,机器学习等应用。所以,flink和spark的目标差别并不大,他们最主要的区别在于实现的细节。
后面我会重点从不同的角度对比这两者。
Apache Spark vs Apache Flink
1.抽象 Abstraction
spark中,对于批处理我们有RDD,对于流式,我们有DStream,不过内部实际还是RDD.所以所有的数据表示本质上还是RDD抽象。
后面我会重点从不同的角度对比这两者。在flink中,对于批处理有DataSet,对于流式我们有DataStreams。看起来和spark类似,他们的不同点在于:
一)DataSet在运行时是表现为运行计划(runtime plans)的
在spark中,RDD在运行时是表现为java objects的。通过引入Tungsten,这块有了些许的改变。但是在flink中是被表现为logical plan(逻辑计划)的,听起来很熟悉?没错,就是类似于spark中的dataframes。所以在flink中你使用的类Dataframe api是被作为第一优先级来优化的。但是相对来说在spark RDD中就没有了这块的优化了。
flink中的Dataset,对标spark中的Dataframe,在运行前会经过优化。
在spark 1.6,dataset API已经被引入spark了,也许最终会取代RDD 抽象。
二)Dataset和DataStream是独立的API
在spark中,所有不同的API,例如DStream,Dataframe都是基于RDD抽象的。但是在flink中,Dataset和DataStream是同一个公用的引擎之上两个独立的抽象。所以你不能把这两者的行为合并在一起操作,当然,flink社区目前在朝这个方向努力(https://issues.apache.org/jira/browse/FLINK-2320),但是目前还不能轻易断言最后的结果。
2.内存管理
一直到1.5版本,spark都是试用java的内存管理来做数据缓存,明显很容易导致OOM或者gc。所以从1.5开始,spark开始转向精确的控制内存的使用,这就是tungsten项目了
flink从第一天开始就坚持自己控制内存试用。这个也是启发了spark走这条路的原因之一。flink除了把数据存在自己管理的内存以外,还直接操作二进制数据。在spark中,从1.5开始,所有的dataframe操作都是直接作用在tungsten的二进制数据上。

3.语言实现
spark是用scala来实现的,它提供了Java,Python和R的编程接口。
flink是java实现的,当然同样提供了Scala API
所以从语言的角度来看,spark要更丰富一些。因为我已经转移到scala很久了,所以不太清楚这两者的java api实现情况。
4.API
spark和flink都在模仿scala的collection API.所以从表面看起来,两者都很类似。下面是分别用RDD和DataSet API实现的word count

// Spark wordcount
object WordCount {

def main(args: Array[String]) {

val env = new SparkContext("local","wordCount")

val data = List("hi","how are you","hi")

val dataSet = env.parallelize(data)

val words = dataSet.flatMap(value => value.split("\\s+"))

val mappedWords = words.map(value => (value,1))

val sum = mappedWords.receByKey(_+_)

println(sum.collect())

}

}

// Flink wordcount
object WordCount {

def main(args: Array[String]) {

val env = ExecutionEnvironment.getExecutionEnvironment

val data = List("hi","how are you","hi")

val dataSet = env.fromCollection(data)

val words = dataSet.flatMap(value => value.split("\\s+"))

val mappedWords = words.map(value => (value,1))

val grouped = mappedWords.groupBy(0)

val sum = grouped.sum(1)

println(sum.collect())
}

}
不知道是偶然还是故意的,API都长得很像,这样很方便开发者从一个引擎切换到另外一个引擎。我感觉以后这种Collection API会成为写data pipeline的标配。
Steaming
spark把streaming看成是更快的批处理,而flink把批处理看成streaming的special case。这里面的思路决定了各自的方向,其中两者的差异点有如下这些:

实时 vs 近实时的角度
flink提供了基于每个事件的流式处理机制,所以可以被认为是一个真正的流式计算。它非常像storm的model。
而spark,不是基于事件的粒度,而是用小批量来模拟流式,也就是多个事件的集合。所以spark被认为是近实时的处理系统。

Spark streaming 是更快的批处理,而Flink Batch是有限数据的流式计算。
虽然大部分应用对准实时是可以接受的,但是也还是有很多应用需要event level的流式计算。这些应用更愿意选择storm而非spark streaming,现在,flink也许是一个更好的选择。

流式计算和批处理计算的表示
spark对于批处理和流式计算,都是用的相同的抽象:RDD,这样很方便这两种计算合并起来表示。而flink这两者分为了DataSet和DataStream,相比spark,这个设计算是一个糟糕的设计。

对 windowing 的支持
因为spark的小批量机制,spark对于windowing的支持非常有限。只能基于process time,且只能对batches来做window。
而Flink对window的支持非常到位,且Flink对windowing API的支持是相当给力的,允许基于process time,data time,record 来做windowing。
我不太确定spark是否能引入这些API,不过到目前为止,Flink的windowing支持是要比spark好的。
Steaming这部分flink胜

SQL interface
目前spark-sql是spark里面最活跃的组件之一,Spark提供了类似Hive的sql和Dataframe这种DSL来查询结构化数据,API很成熟,在流式计算中使用很广,预计在流式计算中也会发展得很快。
至于flink,到目前为止,Flink Table API只支持类似DataFrame这种DSL,并且还是处于beta状态,社区有计划增加SQL 的interface,但是目前还不确定什么时候才能在框架中用上。
所以这个部分,spark胜出。

Data source Integration

Spark的数据源 API是整个框架中最好的,支持的数据源包括NoSql db,parquet,ORC等,并且支持一些高级的操作,例如predicate push down
Flink目前还依赖map/rece InputFormat来做数据源聚合。
这一场spark胜

Iterative processing
spark对机器学习的支持较好,因为可以在spark中利用内存cache来加速机器学习算法。
但是大部分机器学习算法其实是一个有环的数据流,但是在spark中,实际是用无环图来表示的,一般的分布式处理引擎都是不鼓励试用有环图的。
但是flink这里又有点不一样,flink支持在runtime中的有环数据流,这样表示机器学习算法更有效而且更有效率。
这一点flink胜出。

Stream as platform vs Batch as Platform
Spark诞生在Map/Rece的时代,数据都是以文件的形式保存在磁盘中,这样非常方便做容错处理。
Flink把纯流式数据计算引入大数据时代,无疑给业界带来了一股清新的空气。这个idea非常类似akka-streams这种。
成熟度
目前的确有一部分吃螃蟹的用户已经在生产环境中使用flink了,不过从我的眼光来看,Flink还在发展中,还需要时间来成熟。
结论
目前Spark相比Flink是一个更为成熟的计算框架,但是Flink的很多思路很不错,Spark社区也意识到了这一点,并且逐渐在采用Flink中的好的设计思路,所以学习一下Flink能让你了解一下Streaming这方面的更迷人的思路。

‘叁’ apache flink支持sql吗

org.apache.jsp.check_005flinkcard_jsp._jspService(org.apache.jsp.check_005flinkcard_jsp:102)可以看出你写的jsp在运行期遇到空指针错误,如果是tomcat可以到apache-tomcat-6.0.16\work\Catalina\localhost\testhttps\org\apache\jsp地方找到check_005flinkcard_jsp.java的102行,查看jsp编译成java文件的源码

‘肆’ 转载:阿里巴巴为什么选择Apache Flink

本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。

合抱之木,生于毫末

随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。

因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想,我们能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持, 这就是阿里选择Flink的背景和初衷

目前开源大数据计算引擎有很多选择,流计算如Storm,Samza,Flink,Kafka Stream等,批处理如Spark,Hive,Pig,Flink等。而同时支持流处理和批处理的计算引擎,只有两种选择:一个是Apache Spark,一个是Apache Flink。

从技术,生态等各方面的综合考虑。首先,Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。

从技术发展方向看,用批来模拟流有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流来模拟批,在技术上有更好的扩展性。从长远来看,阿里决定用Flink做一个统一的、通用的大数据引擎作为未来的选型。

Flink是一个低延迟、高吞吐、统一的大数据计算引擎。在阿里巴巴的生产环境中,Flink的计算平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时Flink提供了一个Exactly-once的一致性语义。保证了数据的正确性。这样就使得Flink大数据引擎可以提供金融级的数据处理能力。

Flink在阿里的现状

基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。采用Hadoop的YARN做为资源管理调度,以 HDFS作为数据存储。因此,Flink可以和开源大数据软件Hadoop无缝对接。

目前,这套基于Flink搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品API向整个开发者生态提供基于Flink的云产品支持。

Flink在阿里巴巴的大规模应用,表现如何?

规模: 一个系统是否成熟,规模是重要指标,Flink最初上线阿里巴巴只有数百台服务器,目前规模已达上万台,此等规模在全球范围内也是屈指可数;

状态数据: 基于Flink,内部积累起来的状态数据已经是PB级别规模;

Events: 如今每天在Flink的计算平台上,处理的数据已经超过万亿条;

PS: 在峰值期间可以承担每秒超过4.72亿次的访问,最典型的应用场景是阿里巴巴双11大屏;

Flink的发展之路

接下来从开源技术的角度,来谈一谈Apache Flink是如何诞生的,它是如何成长的?以及在成长的这个关键的时间点阿里是如何进入的?并对它做出了那些贡献和支持?

Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定位为Streaming,即用流式计算来做所有大数据的计算,这就是Flink技术诞生的背景。

2014年Flink作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理。

Flink核心概念以及基本理念

Flink最区别于其他流计算引擎的,其实就是状态管理。

什么是状态?例如开发一套流计算的系统或者任务做数据处理,可能经常要对数据进行统计,如Sum,Count,Min,Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理解为一种状态。如果数据源是在读取Kafka,RocketMQ,可能要记录读取到什么位置,并记录Offset,这些Offset变量都是要计算的状态。

Flink提供了内置的状态管理,可以把这些状态存储在Flink内部,而不需要把它存储在外部系统。这样做的好处是第一降低了计算引擎对外部系统的依赖以及部署,使运维更加简单;第二,对性能带来了极大的提升:如果通过外部去访问,如Redis,HBase它一定是通过网络及RPC。如果通过Flink内部去访问,它只通过自身的进程去访问这些变量。同时Flink会定期将这些状态做Checkpoint持久化,把Checkpoint存储到一个分布式的持久化系统中,比如HDFS。这样的话,当Flink的任务出现任何故障时,它都会从最近的一次Checkpoint将整个流的状态进行恢复,然后继续运行它的流处理。对用户没有任何数据上的影响。

Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?

这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。

Flink利用这套经典的算法,保证了强一致性的语义。这也是Flink与其他无状态流计算引擎的核心区别。

下面介绍Flink是如何解决乱序问题的。比如星球大战的播放顺序,如果按照上映的时间观看,可能会发现故事在跳跃。

在流计算中,与这个例子是非常类似的。所有消息到来的时间,和它真正发生在源头,在线系统Log当中的时间是不一致的。在流处理当中,希望是按消息真正发生在源头的顺序进行处理,不希望是真正到达程序里的时间来处理。Flink提供了Event Time和WaterMark的一些先进技术来解决乱序的问题。使得用户可以有序的处理这个消息。这是Flink一个很重要的特点。

接下来要介绍的是Flink启动时的核心理念和核心概念,这是Flink发展的第一个阶段;第二个阶段时间是2015年和2017年,这个阶段也是Flink发展以及阿里巴巴介入的时间。故事源于2015年年中,我们在搜索事业部的一次调研。当时阿里有自己的批处理技术和流计算技术,有自研的,也有开源的。但是,为了思考下一代大数据引擎的方向以及未来趋势,我们做了很多新技术的调研。

结合大量调研结果,我们最后得出的结论是:解决通用大数据计算需求,批流融合的计算引擎,才是大数据技术的发展方向,并且最终我们选择了Flink。

但2015年的Flink还不够成熟,不管是规模还是稳定性尚未经历实践。最后我们决定在阿里内部建立一个Flink分支,对Flink做大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。在这个过程当中,我们团队不仅对Flink在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进,并将其贡献给社区,例如:Flink新的分布式架构,增量Checkpoint机制,基于Credit-based的网络流控机制和Streaming SQL等。

阿里巴巴对Flink社区的贡献

我们举两个设计案例,第一个是阿里巴巴重构了Flink的分布式架构,将Flink的Job调度和资源管理做了一个清晰的分层和解耦。这样做的首要好处是Flink可以原生的跑在各种不同的开源资源管理器上。经过这套分布式架构的改进,Flink可以原生地跑在Hadoop Yarn和Kubernetes这两个最常见的资源管理系统之上。同时将Flink的任务调度从集中式调度改为了分布式调度,这样Flink就可以支持更大规模的集群,以及得到更好的资源隔离。

另一个是实现了增量的Checkpoint机制,因为Flink提供了有状态的计算和定期的Checkpoint机制,如果内部的数据越来越多,不停地做Checkpoint,Checkpoint会越来越大,最后可能导致做不出来。提供了增量的Checkpoint后,Flink会自动地发现哪些数据是增量变化,哪些数据是被修改了。同时只将这些修改的数据进行持久化。这样Checkpoint不会随着时间的运行而越来越难做,整个系统的性能会非常地平稳,这也是我们贡献给社区的一个很重大的特性。

经过2015年到2017年对Flink Streaming的能力完善,Flink社区也逐渐成熟起来。Flink也成为在Streaming领域最主流的计算引擎。因为Flink最早期想做一个流批统一的大数据引擎,2018年已经启动这项工作,为了实现这个目标,阿里巴巴提出了新的统一API架构,统一SQL解决方案,同时流计算的各种功能得到完善后,我们认为批计算也需要各种各样的完善。无论在任务调度层,还是在数据Shuffle层,在容错性,易用性上,都需要完善很多工作。

篇幅原因,下面主要和大家分享两点:

● 统一 API Stack

● 统一 SQL方案

先来看下目前Flink API Stack的一个现状,调研过Flink或者使用过Flink的开发者应该知道。Flink有2套基础的API,一套是DataStream,一套是DataSet。DataStream API是针对流式处理的用户提供,DataSet API是针对批处理用户提供,但是这两套API的执行路径是完全不一样的,甚至需要生成不同的Task去执行。所以这跟得到统一的API是有冲突的,而且这个也是不完善的,不是最终的解法。在Runtime之上首先是要有一个批流统一融合的基础API层,我们希望可以统一API层。

因此,我们在新架构中将采用一个DAG(有限无环图)API,作为一个批流统一的API层。对于这个有限无环图,批计算和流计算不需要泾渭分明的表达出来。只需要让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性。整个拓扑是可以融合批流统一的语义表达,整个计算无需区分是流计算还是批计算,只需要表达自己的需求。有了这套API后,Flink的API Stack将得到统一。

除了统一的基础API层和统一的API Stack外,同样在上层统一SQL的解决方案。流和批的SQL,可以认为流计算有数据源,批计算也有数据源,我们可以将这两种源都模拟成数据表。可以认为流数据的数据源是一张不断更新的数据表,对于批处理的数据源可以认为是一张相对静止的表,没有更新的数据表。整个数据处理可以当做SQL的一个Query,最终产生的结果也可以模拟成一个结果表。

对于流计算而言,它的结果表是一张不断更新的结果表。对于批处理而言,它的结果表是相当于一次更新完成的结果表。从整个SOL语义上表达,流和批是可以统一的。此外,不管是流式SQL,还是批处理SQL,都可以用同一个Query来表达复用。这样以来流批都可以用同一个Query优化或者解析。甚至很多流和批的算子都是可以复用的。

Flink的未来方向

首先,阿里巴巴还是要立足于Flink的本质,去做一个全能的统一大数据计算引擎。将它在生态和场景上进行落地。目前Flink已经是一个主流的流计算引擎,很多互联网公司已经达成了共识:Flink是大数据的未来,是最好的流计算引擎。下一步很重要的工作是让Flink在批计算上有所突破。在更多的场景下落地,成为一种主流的批计算引擎。然后进一步在流和批之间进行无缝的切换,流和批的界限越来越模糊。用Flink,在一个计算中,既可以有流计算,又可以有批计算。

第二个方向就是Flink的生态上有更多语言的支持,不仅仅是Java,Scala语言,甚至是机器学习下用的Python,Go语言。未来我们希望能用更多丰富的语言来开发Flink计算的任务,来描述计算逻辑,并和更多的生态进行对接。

最后不得不说AI,因为现在很多大数据计算的需求和数据量都是在支持很火爆的AI场景,所以在Flink流批生态完善的基础上,将继续往上走,完善上层Flink的Machine Learning算法库,同时Flink往上层也会向成熟的机器学习,深度学习去集成。比如可以做Tensorflow On Flink, 让大数据的ETL数据处理和机器学习的Feature计算和特征计算,训练的计算等进行集成,让开发者能够同时享受到多种生态给大家带来的好处。

‘伍’ 疫情期间是听乡政府的还是疾控中心的

疫情期间,肯定是听政府的啊!疾控也得根据政府的指令做事啊!疫情期间,肯定是要居家隔离的啊!这个具体情况具体对待!
疾病预防控制中心
主管国家疾病预防控制的业务机构
疾病预防控制中心,简称疾控中心,如:四川省疾病预防控制中心;沈阳军区疾控中心。“疾病控制中心”一词来自美国主管国家疾病预防控制的业务机构。
截至2021年末,全国共有疾病预防控制中心3380个。[1]
中文名
疾病预防控制中心
外文名
Center for Disease Control and Prevention
英文简称
CDC
中文简称
疾控中心
相关视频
2万播放|01:27
不再用YES或NO,美国疾控中心又改了,他们改用“大约”
5006播放|01:38
重大转变!CDC取消美国医疗机构口罩令,专家表担忧……
5518播放|00:40
果然视频|枣庄疾病预防控制中心乔迁新址
5098播放|00:20
8月29日 哈尔滨市疾病预防控制中心发布最新紧急寻人提示
9187播放|05:38
359分上岸中国疾病预防控制中心,学姐是如何跨考逆袭的
6165播放|09:20
100多万人死于新冠后,美国CDC整改!福奇“跑路”,特朗普喊冤?
9267播放|03:06
酒泉市疾病预防控制中心:为百姓树起健康屏障
5208播放|01:25
尚硅谷大数据培训:Flink CDC-剖析DataStream、FlinkSQL-02
1.3万播放|00:55
疾病预防控制中心有什么福利
8668播放|02:30
烟瘾是病别自己扛 【戒烟时嗜睡头晕咳黑痰,原来都是戒断症状!】 戒烟时嗜睡头晕咳黑痰是戒断症状 2022年5月31日是第35个世界无烟日。为加强控烟科普宣传,充分发挥医护人员示范作用,调动医护人员争做控烟的传播者和践行者,倡导医护人员以身作则不吸烟,发动身边人远离烟草,河南省疾病预防控制中心、河南控烟APP联合大河网共同组织策划“烟瘾是病 别自己扛”系列短视频,邀请我省相关医疗卫生机构戒烟门诊
查看更多
名称定义使命职责相关职责科技成果统计数据TA说
名称
英文全称:Center for Disease Control and Prevention
如:四川省疾病预防控制中心;沈阳军区疾控中心;
定义
疾病控制中心一词来自美国主管国家疾病预防控制的业务机构,现更名为疾病控制与预防中心(center for disease control and prevention,简称CDC或CDCP)。目前,我国已建立"中国疾病预防控制中心(China CDC)",并且在各省、自治区、直辖市设立了相应的分支机构。 中国疾病预防控制中心(以下简称中国疾控中心),是由政府举办的实施国家级疾病预防控制与公共卫生技术管理和服务的公益事业单位。
其使命是通过对疾病、残疾和伤害的预防控制,创造健康环境,维护社会稳定,保障国家安全,促进人民健康;其宗旨是以科研为依托、以人才为根本、以疾控为中心。在卫生部领导下,发挥技术管理及技术服务职能,围绕国家疾病预防控制重点任务,加强对疾病预防控制策略与措施的研究,做好各类疾病预防控制工作规划的组织实施;开展食品安全、职业安全、健康相关产品安全、放射卫生、环境卫生、妇女儿童保健等各项公共卫生业务管理工作,大力开展应用性科学研究,加强对全国疾病预防控制和公共卫生服务的技术指导、培训和质量控制,在防病、应急、公共卫生信息能力的建设等方面发挥国家队的作用。
在我国历史上,传染病曾经是严重威胁人民健康和生命安全的疾病。上世纪50年代,因传染病和寄生虫病死亡人数居于全国人口死因中的第一位。经过多年的努力,目前下降到第9位,并在发展中国家中率先消灭了天花和脊髓灰质炎等重大传染病。我国虽然是一个自然灾害频繁的国家,但多年来成功地实现了大灾之后无大疫。2003年战胜了来势凶猛的非典疫情,近两年又成功地控制了禽流感向人类的传播。目前,全国正在建立健全艾滋病、结核病、血吸虫病、乙型肝炎等严重传染病的预防控制和医疗救治体系。据2004年30个市和78个县(县级市)死因统计,城市居民前十位死因为:①恶性肿瘤126.4/10万,②脑血管病100.9/10万,③心脏病99.4/10万,④呼吸系病69.3/10万,⑤损伤及中毒31.1/10万,⑥消化系病17.1/10万,⑦内分泌、营养和代谢疾病14.9/10万,⑧泌尿生殖系病9.5/10万,⑨神经系病4.6/10万,⑩围生期病168.5/10万活产,前十位死因合计占死亡总数的90.1%。农村居民前十位死因为:①恶性肿瘤119.7/10万,②脑血管病74.9/10万,③呼吸系病67.2/10万,④心脏病63.4/10万,⑤损伤及中毒33.5/10万,⑥消化系病14.2/10万,⑦内分泌、营养及代谢疾病12.7/10万,⑧泌尿生殖系病8.1/10万,⑨围生期病363.9/10万活产,⑩肺结核3.3/10万。前十位死因合计占死亡总数的79.3%。

‘陆’ Hive sql及窗口函数

hive函数:

1、根据指定条件返回结果:case when then else end as

2、基本类型转换:CAST()

3、nvl:处理空字段:三个str时,是否为空可以指定返回不同的值

4、sql通配符: https://www.w3school.com.cn/sql/sql_wildcards.asp

5、count(1)与COUNT(*):返回行数

如果表没有主键,那么count(1)比count(*)快;

如果有主键,那么count(主键,联合主键)比count(*)快;

count(1)跟count(主键)一样,只扫描主键。count(*)跟count(非主键)一样,扫描整个表。明显前者更快一些。

性能问题:

1.任何情况下SELECT COUNT(*) FROM tablename是最优选择,(指没有where的情况);

2.尽量减少SELECT COUNT(*) FROM tablename WHERE COL = ‘value’ 这种查询;

3.杜绝SELECT COUNT(COL) FROM tablename WHERE COL2 = ‘value’ 的出现。

count(expression):查询 is_reply=0 的数量: SELECT COUNT(IF(is_reply=0,1,NULL)) count FROM t_iov_help_feedback;

6、distinct与group by

distinct去重所有distinct之后所有的字段,如果有一个字段值不一致就不作为一条

group by是根据某一字段分组,然后查询出该条数据的所需字段,可以搭配 where max(time)或者Row_Number函数使用,求出最大的一条数据

7、使用with 临时表名 as() 的形式,简单的临时表直接嵌套进sql中,复杂的和需要复用的表写到临时表中,关联的时候先找到关联字段,过滤条件最好在临时表中先过滤后关联

处理json的函数:

split(json_array_string(schools), '\\|\\|') AS schools

get_json_object(school, '$.id') AS school_id,

字符串函数:

1、instr(’源字符串’ , ‘目标字符串’ ,’开始位置’,’第几次出现’)

instr(sourceString,destString,start,appearPosition)

1.sourceString代表源字符串; destString代表要从源字符串中查找的子串;

2.start代表查找的开始位置,这个参数可选的,默认为1;

3.appearPosition代表想从源字符中查找出第几次出现的destString,这个参数也是可选的, 默认为1

4.如果start的值为负数,则代表从右往左进行查找,但是位置数据仍然从左向右计算。

5.返回值为:查找到的字符串的位置。如果没有查找到,返回0。

最简单例子: 在abcd中查找a的位置,从第一个字母开始查,查找第一次出现时的位置

select instr(‘abcd’,’a’,1,1) from al; —1

应用于模糊查询:instr(字段名/列名, ‘查找字段’)

select code,name,dept,occupation from staff where instr(code, ‘001’)> 0;

等同于 select code, name, dept, occupation from staff where code like ‘%001%’ ;

应用于判断包含关系:

select ccn,mas_loc from mas_loc where instr(‘FH,FHH,FHM’,ccn)>0;

等同于 select ccn,mas_loc from mas_loc where ccn in (‘FH’,’FHH’,’FHM’);

2、substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

substr(time,1,8) 表示将time从第1位开始截取,截取的长度为8位

第一种用法:

substr(string A,int start)和 substring(string A,int start),用法一样

功效:返回字符串A从下标start位置到结尾的字符串

第二种用法:

substr(string A,int start,int len)和 substring(string A,int start,int len),用法一样

功效:返回字符串A从下标start位置开始,长度为len的字符串

3、get_json_object(form_data,'$.学生姓名') as student_name

json_tuple 函数的作用:用来解析json字符串中的多个字段

4、split(full_name, '\\.') [5] AS zq;  取的是数组里的第六个

日期(时间)函数:

1、to_date(event_time) 返回日期部分

2、date_sub:返回当前日期的相对时间

当前日期:select curdate() 

当前日期前一天:select  date_sub(curdate(),interval 1 day)

当前日期后一天:select date_sub(curdate(),interval -1 day)

date_sub(from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss'), 14)  将现在的时间总秒数转为标准格式时间,返回14天之前的时间

时间戳>>>>日期:

from_unixtime(unix_timestamp(), 'yyyy-MM-dd HH:mm:ss') 将现在的时间总秒数转为标准格式时间

from_unixtime(get_json_object(get_json_object(form_data,'$.挽单时间'),'$.$date')/1000) as retain_time

unix_timestamp('2019-08-15 16:40:00','yyyy-MM-dd HH:mm:ss')  --1565858400

日期>>>>时间戳:unix_timestamp()

date_format:yyyy-MM-dd HH:mm:ss 时间转格式化时间

select date_format('2019-10-07 13:24:20', 'yyyyMMdd000000')-- 20191007000000select date_format('2019-10-07', 'yyyyMMdd000000')-- 20191007000000

1.日期比较函数: datediff语法: datediff(string enddate,string startdate) 

返回值: int 

说明: 返回结束日期减去开始日期的天数。 

举例:  hive> select datediff('2016-12-30','2016-12-29');  1

2.日期增加函数: date_add语法: date_add(string startdate, intdays) 

返回值: string 

说明: 返回开始日期startdate增加days天后的日期。 

举例:  hive>select date_add('2016-12-29',10);  2017-01-08

3.日期减少函数: date_sub语法: date_sub (string startdate,int days) 

返回值: string 

说明: 返回开始日期startdate减少days天后的日期。 

举例:  hive>select date_sub('2016-12-29',10);  2016-12-19

4.查询近30天的数据

select * from table where datediff(current_timestamp,create_time)<=30;

create_time 为table里的字段,current_timestamp 返回当前时间 2018-06-01 11:00:00

3、trunc()函数的用法:当前日期的各种第一天,或者对数字进行不四舍五入的截取

日期:

1.select trunc(sysdate) from al  --2011-3-18  今天的日期为2011-3-18

2.select trunc(sysdate, 'mm')   from   al  --2011-3-1    返回当月第一天.

上月1号    trunc(add_months(current_date(),-1),'MM')

3.select trunc(sysdate,'yy') from al  --2011-1-1       返回当年第一天

4.select trunc(sysdate,'dd') from al  --2011-3-18    返回当前年月日

5.select trunc(sysdate,'yyyy') from al  --2011-1-1   返回当年第一天

6.select trunc(sysdate,'d') from al  --2011-3-13 (星期天)返回当前星期的第一天

7.select trunc(sysdate, 'hh') from al   --2011-3-18 14:00:00   当前时间为14:41  

8.select trunc(sysdate, 'mi') from al  --2011-3-18 14:41:00   TRUNC()函数没有秒的精确

数字:TRUNC(number,num_digits) Number 需要截尾取整的数字。Num_digits 的默认值为 0。TRUNC()函数截取时不进行四舍五入

11.select trunc(123.458,1) from al --123.4

12.select trunc(123.458,-1) from al --120

4、round():四舍五入:

select round(1.455, 2)  #结果是:1.46,即四舍五入到十分位,也就是保留两位小数

select round(1.5)  #默认四舍五入到个位,结果是:2

select round(255, -1)  #结果是:260,即四舍五入到十位,此时个位是5会进位

floor():地板数

ceil()天花板数

5、

6.日期转年函数: year语法:   year(string date) 

返回值: int

说明: 返回日期中的年。

举例:

hive>   select year('2011-12-08 10:03:01') from al;

2011

hive>   select year('2012-12-08') fromal;

2012

7.日期转月函数: month语法: month   (string date) 

返回值: int

说明: 返回日期中的月份。

举例:

hive>   select month('2011-12-08 10:03:01') from al;

12

hive>   select month('2011-08-08') fromal;

8

8.日期转天函数: day语法: day   (string date) 

返回值: int

说明: 返回日期中的天。

举例:

hive>   select day('2011-12-08 10:03:01') from al;

8

hive>   select day('2011-12-24') fromal;

24

9.日期转小时函数: hour语法: hour   (string date) 

返回值: int

说明: 返回日期中的小时。

举例:

hive>   select hour('2011-12-08 10:03:01') from al;

10

10.日期转分钟函数: minute语法: minute   (string date) 

返回值: int

说明: 返回日期中的分钟。

举例:

hive>   select minute('2011-12-08 10:03:01') from al;

3

11.日期转秒函数: second语法: second   (string date) 

返回值: int

说明: 返回日期中的秒。

举例:

hive>   select second('2011-12-08 10:03:01') from al;

1

12.日期转周函数: weekofyear语法:   weekofyear (string date) 

返回值: int

说明: 返回日期在当前的周数。

举例:

hive>   select weekofyear('2011-12-08 10:03:01') from al;

49

查看hive表在hdfs中的位置:show create table 表名;

在hive中hive2hive,hive2hdfs:

HDFS、本地、hive -----> Hive:使用 insert into | overwrite、loaddata local inpath "" into table student;

Hive ----> Hdfs、本地:使用:insert overwrite | local

网站访问量统计:

uv:每用户访问次数

ip:每ip(可能很多人)访问次数

PV:是指页面的浏览次数

VV:是指你访问网站的次数

sql:

基本函数:

count、max、min、sum、avg、like、rlike('2%'、'_2%'、%2%'、'[2]')(java正则)

and、or、not、in   

where、group by、having、{ join on 、full join}  、order by(desc降序)

sort by需要与distribut by集合结合使用:

hive (default)> set maprece.job.reces=3;  //先设置rece的数量 

insert overwrite local directory '/opt/mole/datas/distribute-by'

row format delimited fields terminated by '\t'

先按照部门编号分区,再按照员工编号降序排序。

select * from emp distribute by deptno sort by empno desc;

外部表  create external table if not exists dept

分区表:create table dept_partition ( deptno int, dname string, loc string )  partitioned by ( month string )

load data local inpath '/opt/mole/datas/dept.txt' into table default.dept_partition partition(month='201809'); 

 alter table dept_partition add/drop partition(month='201805') ,partition(month='201804');

多分区联合查询:union

select * from dept_partition2 where month='201809' and day='10';

show partitions dept_partition;

desc formatted dept_partition;

二级分区表:create table dept_partition2 ( deptno int, dname string, loc string ) partitioned by (month string, day string) row format delimited fields terminated by '\t';

分桶抽样查询:分区针对的是数据的存储路径;分桶针对的是数据文件

create table stu_buck(id int, name string) clustered by(id) into 4 bucketsrow format delimited fields terminated by '\t';

设置开启分桶与rece为1:

set hive.enforce.bucketing=true;

set maprece.job.reces=-1;

分桶抽样:select * from stu_bucktablesample(bucket x out of y on id);

抽取,桶数/y,x是从哪个桶开始抽取,y越大 抽样数越少,y与抽样数成反比,x必须小于y

给空字段赋值:

如果员工的comm为NULL,则用-1代替或用其他字段代替  :select nvl(comm,-1) from emp;

case when:如何符合记为1,用于统计、分组统计

select dept_id, sum(case sex when '男' then 1 else 0 end) man , sum(case sex when '女' then 1 else 0 end) woman from emp_sex group by dept_id;

用于组合归类汇总(行转列):UDAF:多转一

concat:拼接查询结果

collect_set(col):去重汇总,产生array类型字段,类似于distinct

select t.base, concat_ws('|',collect_set(t.name))   from (select concat_ws(',',xingzuo,blood_type) base,name  from person_info) t group by t.base;

解释:先第一次查询得到一张没有按照(星座血型)分组的表,然后分组,使用collect_set将名字组合成数组,然后使用concat将数组变成字符串

用于拆分数据:(列转行):UDTF:一转多

explode(col):将hive一列中复杂的array或者map结构拆分成多行。

lateral view  侧面显示:用于和UDTF一对多函数搭配使用

用法:lateral view udtf(expression) tablealias as cate

cate:炸开之后的列别名

temptable :临时表表名

解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。

开窗函数:

Row_Number,Rank,Dense_Rank  over:针对统计查询使用

Row_Number:返回从1开始的序列

Rank:生成分组中的排名序号,会在名词s中留下空位。3 3 5

dense_rank:生成分组中的排名序号,不会在名词中留下空位。3 3 4

over:主要是分组排序,搭配窗口函数使用

结果:

SUM、AVG、MIN、MAX、count

preceding:往前

following:往后

current row:当前行

unbounded:unbounded preceding 从前面的起点, unbounded following:到后面的终点

sum:直接使用sum是总的求和,结合over使用可统计至每一行的结果、总的结果、当前行+之前多少行/之后多少行、当前行到往后所有行的求和。

over(rowsbetween 3/current )  当前行到往后所有行的求和

ntile:分片,结合over使用,可以给数据分片,返回分片号

使用场景:统计出排名前百分之或n分之一的数据。

lead,lag,FIRST_VALUE,LAST_VALUE

lag与lead函数可以返回上下行的数据

lead(col,n,dafault) 用于统计窗口内往下第n行值

第一个参数为列名,第二个参数为往下第n行(可选,默认为1),第三个参数为默认值(当往下第n行为NULL时候,取默认值,如不指定,则为NULL)

LAG(col,n,DEFAULT) 用于统计窗口内往上第n行值

第一个参数为列名,第二个参数为往上第n行(可选,默认为1),第三个参数为默认值(当往上第n行为NULL时候,取默认值,如不指定,则为NULL)

使用场景:通常用于统计某用户在某个网页上的停留时间

FIRST_VALUE:取分组内排序后,截止到当前行,第一个值

LAST_VALUE:取分组内排序后,截止到当前行,最后一个值

范围内求和: https://blog.csdn.net/happyrocking/article/details/105369558

cume_dist,percent_rank

–CUME_DIST :小于等于当前值的 行数 / 分组内总行数

–比如,统计小于等于当前薪水的人数,占总人数的比例

percent_rank:分组内当前行的RANK值-1/分组内总行数-1

总结:

在Spark中使用spark sql与hql一致,也可以直接使用sparkAPI实现。

HiveSql窗口函数主要应用于求TopN,分组排序TopN、TopN求和,前多少名前百分之几。

与Flink窗口函数不同。

Flink中的窗口是用于将无线数据流切分为有限块处理的手段。

window分类:

CountWindow:按照指定的数据条数生成一个 Window,与时间无关。

TimeWindow:按照时间生成 Window。

1. 滚动窗口(Tumbling Windows):时间对齐,窗口长度固定,不重叠::常用于时间段内的聚合计算

2.滑动窗口(Sliding Windows):时间对齐,窗口长度固定,可以有重叠::适用于一段时间内的统计(某接口最近 5min 的失败率来报警)

3. 会话窗口(Session Windows)无时间对齐,无长度,不重叠::设置session间隔,超过时间间隔则窗口关闭。

‘柒’ flink 1.10 1.12区别

flink 1.10 1.12区别在于Flink 1.12 支持了 Flink SQL Kafka upsert connector 。

因为在 Flink 1.10 中,当前这类任务开发对于用户来说,还是不够友好,需要很多代码,同时也会造成 Flink SQL 冗长。

Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,这也是我们公司内部业务方对实时平台提出的需求。

收益:便利用户有这种需要从 kafka 取最新记录操作的实时任务开发,比如这种 binlog -> kafka,然后用户聚合操作,这种场景还是非常多的,这能提升实时作业开发效率,同时 1.12 做了优化,性能会比单纯的 last_value 性能要好。

Flink Yarn 作业 On k8s 的生产级别能力是:

Flink Jar 作业已经全部 K8s 化,Flink SQL 作业由于是推广初期,还是在 Yarn 上面进行运行,为了将实时计算 Flink 全部K8s化。

所以我们 Flink SQL 作业也需要迁移到 K8s,目前 Flink 1.12 已经满足生产级别的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社区的 On k8s 能力。

风险:虽然和社区的人沟通,Flink 1.12 on k8s 没有什么问题,但是具体功能还是需要先 POC 验证一下,同时可能社区 Flink on k8s 的能力。

可能会限制我们这边一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,这里可能需要改底层源码来进行快速支持(社区有相关 JIRA 要做)。

‘捌’ flinksql自定义topN函数的代码

摘要 当前 Flink 有如下几种函数:

‘玖’ 流批一体不只有Flink,还有实时数据模型

通常来讲,数据仓库的建设,都是以离线作为主要的密报,下游的应用,不论是报表还是接口,所提供的数据也大多是T-1时效性。

但伴随着业务的变化,当离线做到没什么可以继续做的时候,实时就会被拿出来,作为新一个阶段的目标进行攻克。

在流批一体建设之前,这种实时诉求通常会开发成分钟级的任务,通过近实时的方案来解决业务的问题,但分钟级会带来诸如任务过多、资源挤占较大、无法支持复杂逻辑等问题。

因此专门支持实时计算的框架,比如早期的Storm,能够尝试从纯实时的角度解决业务问题,就被拿出来作为尝试。然而Storm的局限性也很大,因为那会的任务开发只能通过Java的方式来进行,与Hive所推崇的纯SQL方案相比,上手难度大了不少,同时两套代码的逻辑几乎没有可比性,这种方案也就一直没有什么声音。

尽管实时技术有各种缺陷,但作为一种能够很容易讲清楚价值的项目,同时又非常便于向上汇报的技术方案,实时技术还是被或多或少的做了起来。在大多数的公司里,实时和离线就会有不同的团队进行维护,或者是同一个团队,但分成不同的项目来执行。这个阶段,优先高效的把业务做起来,哪怕场景再简单,但能够证明实时有价值和前景,这个阶段的目标就算完成了。

以上的各种方案,难免会带来三个特别难以解决的问题:

(1)数据的口径上,实时和离线很容易不统一;
(2)数据模型的规范上,实时和离线也往往是分开建设;
(3)即便是同一种口径和同一种规范,实时和离线也要分成两套代码来维护。

这三个问题短时间内会被高速发展掩盖掉,但当业务对实时的诉求越来越多、压力越来越大的时候,口径和代码的不统一,就会越来越成为阻碍敏捷开发的障碍,需要有方案进行解决。

后来Flink出现了,带来了流批一体的全新方案,这个问题便出现了解决的曙光,这也比较接近我们对于实时计算的理想方案,因为其意义堪比Hive,也成为了各个大厂面试的标配问题。

然而,仅仅学会Flink是不够的,因为流批一体带来的并不仅仅是技术方案或者是框架的改变,同样带来了数据模型的改变,这就要求我们从数据模型上,而不是技术方案上,来制定我们的实时方案。

那么我们如何理解“实时数据模型”这件事情呢?

通常而言,我们关心的内容,包括如下几个方面:

(1)实时数据源与离线数据源存在差异,导致相同的字段,取值或者类型会存在不相等的情况;
(2)实时和离线由于底层执行机制的不同,通常需要维护两套代码,会带来诸如口径不统一、质量检测难的问题;
(3)产品逻辑变化较快时,离线模型修改相对容易,但实时模型需要考虑压测、削峰、重启等技术问题,维护成本非常高昂。

数据仓库之所以能够普及并被业务接受,正是因为其模型能够屏蔽掉底层差异的问题,并且有相对可靠的数据质量监控方法,并且变更成本非常低。而实时数仓如果想要替代掉离线数仓,以上的问题通常是需要一些模型设计甚至是平台工具的来解决,这些问题解决的重要性,并不比Flink弱。

我们先从比较可控的模型层面说起。

在离线的概念里,数仓模型设计成了DWD/DWS/ADS三个层级,原本的概念是DWD面向事实表的构建,DWS面向公共指标的统一,ADS负责灵活的口径变化问题。

在离线的概念里,DWD/DWS/ADS三个层级需要保留,但负责的目标会有一些变化,同时还需要增加存储统一层,也就是以TiDB/Holo为代表的数据库,来承担服务分析一体化的诉求。

让我们先看DWD层,DWD承担了屏蔽实时离线链路差异的问题,最重要的作用是保证表结构的统一及字段内容的对齐。DWD最重要的意义,是保证离线表和实时表,其表结构和字段概念是相同的。

为什么这么强调?试想一下,在离线场景下,我们可以在DWD上灵活的增加各种统计标签,或者是将维度退化到事实表,都是一些left join或者是服务端直接打标可以解决的事情。但在实时场景下,这会变成多流join或者是缓存等更复杂的技术场景,导致这些信息并不能有效的记录到DWD,因此DWD的设计就要产生一些变化,有一些内容在实时场景下无法准确记录,这一类信息需要标识到对应的字段描述上,下游使用时才不会出错。

同时,实时和离线存储数据的介质,也必然有一些区别。例如离线可以存在HDFS上,实时则可能视情况保存在数据库、HDFS甚至是内存中,这时候对于字段格式、读取方式都会有差异,设计表时其约束条件也会更多。

因而,DWD更多承担了逻辑统一的职责,依旧以事实表为基础,但约束条款要比离线更多。

再看一下DWS层,离线上DWS是负责口径统一的重要一环,将通用的维度和口径计算方法抽象出现,以提供跨数据域的灵活使用。但在实时场景下,这一类的维护收益通常都比较低,不仅因为实时只看当天的数据,也是因为实时本身的维度难度就较大,多一层模型其收益会急速下降,因而大多数时候会忽略掉DWS的建设,ADS直接引用DWD进行统计。

然而,DWS毕竟存储的内容要比DWD少很多,因此如果计算资源瓶颈非常明显,或者是业务场景不需要分析实时明细数据的情况下,或者是DWD的下游引用过多时,DWS可以承担削峰的重任,通过减少数据量以应对大促等场景,还是有一定意义的。

接下来就是最重要的ADS层,在这一层上,逻辑统一、口径统一、大促削峰在前置模型上都得到了一定程度的解决,ADS则像离线一样承担了应对需求变化的重任。

但ADS所面临的情况和离线还是有所不同的,因为ADS的任务启动,不仅要启动一个离线的跑批任务,还要同时启动一个实时的流式任务,而ADS往往会同时统计离线+实时的结果,以应对同比、环比等场景。

这时候很多具体Case要具体分析了,因为特定场景的坑会非常多。例如最常见的“同比”,要对比今年和去年的结果变化,离线往往会统计分小时的结果,但实时会累计起始时刻到当期时刻的结果,因而当一个小时没有结束的时候,这个同比的波动变化会非常大,给人一种“数据是错误的”印象,新手很容易踩这个坑,从而被业务质疑。

因此,针对累计统计指标,从代码设计上就要考虑到这种情况,都根据时间字段统计起始到当前时刻的结果的,在代码逻辑上会要求一些统计技巧。

很多时候,因为业务指标变化太快,改实时代码是来不及的,这时候一部分的工作量甚至需要报表工具的数据集来解决,改动查询sql,要比改动任务来的快捷多了。但这部分的能力,其实是依赖于存储工具的,个人认为可以分到存储统一层来解决。

最后是存储统一层,因为一些特殊的场景,比如实时分析明细数据,或者是不确定时间周期的多天统计结果,如果依赖Flink SQL来解决是有些不现实的,因而这部分的压力需要数据库来承担。

简单讲,就是将明细做轻度的汇总后,直接写到数据库,实时更新,下游自定义条件,并直接读库统计结果。这种场景既要求数据库有OLAP的计算能力,也要有OLTP的稳定特点,因而TiDB和Holo这一类HTAP的引擎就变得非常重要。

因为多了实时的部分,因此过去面向离线的开发工具,也需要有一些特定的改造,以适应实时的开发和运维诉求。

对于开发工具而言,其目标集中在四个场景上:元数据定义与获取、数据建模、开发与测试、运维与监控。

其次讲数据建模,因为建模的理论已经稳定了有些年头了,绝大多数场景下都是按照既定的方案来执行。过去离线当道时,规范执行的弱一些不是什么大问题,但流批一体当道的年代,规范是需要强约束的,这就对了开发工具提出了一定的要求,是否能够从平台层面上对规范进行内置,并以此来约束开发的同学,降低不规范模型对后期维护带来的压力。

这种建模能力的代表有两种,一种是规范表的命名,填写相应的分层+主题域+数据域+统计刷新方式,从源头上规范表的目标和作用;一种是规范指标的定义和使用,例如原子指标还是派生指标,统计周期多少,业务限定用语如何规范,统计粒度怎么填写。

在实际开发中,通过工具的限制,如果规范可以做的好,代码是可以自动生成出来的。当然,以上的功能,都属于通过牺牲开发效率,来提升数据质量的范畴,使用时需要根据团队的情况来限定。

再次是开发和测试,这是平台提供的最重要的能力。在开发层面,就是代码的预编译能力+发布功能。预编译不仅要检查代码的逻辑是否正确,同时对于代码中依赖的其他数据源,获取到的元数据信息是否准确,至少字段的命名不会有大的问题。当代码预编译通过,发布上线后,还需要检测当前是否有资源支持任务启动,并且上游的消息队列是否是启动的状态。

实时的测试一直都是比较大的问题,它不像离线可以启动一个SQL任务看看结果,实时在每个阶段的输入和输出,是需要通过平台支持的日志打印功能来进行辅助的。很多时候我们会新建一个测试专用的topic来测试结果,但对于流量较大的线上任务而言,这种方式无法像离线区分Dev环境一样,能够对资源进行隔离,因而如果能够支持圈定数据的输入和打印输出,对于测试的效率而言无疑是最佳的。

最后要提到的是运维与监控能力。运维能力是指根据输入的RPS,或者是cu使用情况,或者是任务的整体延迟,提供相应的参数调优能力,通过参数来调整任务的执行情况。并且能够根据以上指标的变化,自定义相应的阈值,提供相应的告警能力,通过短信或者是消息工具的方式触达任务维护者。

实时与离线有一些不同的是,离线可以通过增加一个监控节点的方式,通过group by判断数据是否重复,而实时任务则非常依赖Flink自身的一致性能力,因而发现和解决问题的成本更高。

其实做到运维这个环节,对人的要求其实是更高的。因为流批一体在运维上会带来一个好处,即实时任务和离线任务能够错峰执行,实时在白天压力大,而离线在晚上压力大。但同样的,这种方式对于维护者而言更加痛苦,因为不仅晚上要熬夜值班,白天同样不能休息,在大促期间甚至需要轮班来维护任务,可以说是“汇报一时爽,痛苦长相伴”。

从远处来看,流任务和批任务,在自身的机制上就存在非常大的差异,批程序面上的是特定时间内相对静态的数据,而流程序处理的则是change-log,虽然有可能数据在表结构层面,通过数据模型的设计来保持一致,但是在语义层面,其根本还是不一样的。这一点可能是最制约批流一体发展的问题,也是最难实现统一或者永远也不可能统一的。

综上,对于实时模型,开发工具需要将监控实时部分的能力进行补全,就像DWD层需要分别维护实时和离线两套架构一样,开发工具也需要分别维护两套架构的结果,因而现阶段的实时开发,还做不到降低维护和开发的成本,只能减轻其中部分环节的工作量。

以上讲了很长时间的实时模型,但从实际的效果上看,业务并不会感知到多么明显的技术变化,相反会有一种“面子工程”的感觉在里面。

当然,我并不否认实时的价值,在“搜广推”这三个技术占主导的领域内,作用还是很大的。但实时毕竟要比离线的内容,更加的难以理解,出现问题的排查成本也更高。这种复杂性使得我们在应对变化时,往往做不出有效的应对,就会变得特别被动。

因而,说一句事后的话,就是“实时的价值取决于业务方,而不是技术方”。只有业务对实时痛点强烈的场景下,我们做如此复杂的研究和应对,才能体现出自己的价值,更多的时候,是在“王婆卖瓜,自卖自夸”。有这种投入,还不如多招几个分析师更靠谱和实在。

本人之前的文章《天下数据,唯快不破》,重点强调了一个“快”字。但“天下熙熙皆为利来,天下攘攘皆为利往”,这个快更多的是在讲应对“变化”的快,而不是“技术”自己的快。

所以,为了以后的职业发展,我们要跟进实时技术的变化,但从自身的工作角度出发,如何应对业务的变化,才是自己要关心的课题。