① Flink状态管理和恢复机制
1、什么是状态?
2、Flink状态类型有哪几种?
3、状态有什么作用?
4、如何使用状态,实现什么样的API?
5、什么是checkpoint与savepoint?
6、如何使用checkpoint与savepoint?
7、checkpoint原理是什么?
8、checkpint存储到hdfs上又是什么意思?
<1> 增量计算
聚合操作、机器学习训练模型迭代运算时保存当前模型等等
<2> 容错
Job故障重启、升级
定义: 某task或者operator 在某一时刻的在内存中的状态。
而checkpoint是,对于这个中间结果进行一次快照。
作用:State是可以被记录的,在失败的情况下可以恢复。
checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了一个job下所有task/operator某时刻的状态。
比如任务挂掉的时候或被手动停止的时候,可以从挂掉的点重新继续消费。
基本类型:Operator state、Keyed state
特殊的 Broadcast State
适用场景:
增量计算:
<1>聚合操作
<2>机器学习训练模型迭代运算时保存当前模型
等等
容错:
Job故障重启
使用状态,必须使用RichFunction,因为状态是使用RuntimeContext访问的,只能在RichFunction中访问
假设现在存在输入源数据格式为(EventID,Value)
输出数据,直接flatMap即可,无状态。
如果要输出某EventID最大值/最小值等,HashMap是否可以?
程序一旦Crash,如何恢复?
答案:Flink提供了一套状态保存的方法,不需要借助第三方存储系统来解决状态存储问题。
Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能有很多个key,从而对应多个keyed state。
所以一个并行度为4的source,即有4个实例,那么就会有4个状态
举例:Flink中的Kafka Connector,就使用了operator state。有几个并行度,就会有几个connector实例,消费的分区不一样,它会在每个connector实例中,保存该实例中消费topic的所有(partition,offset)映射。
数据结构:ListState<T>
一般编码过程:实现CheckpointedFunction接口,必须实现两个函数,分别是:
initializeState和snapshotState
如何保存状态?
通常是定义一个private transient ListState<Long> checkPointList;
注意:使用Operator State最好不要在keyBy之后使用,另外不要将太大的state存放到这个里面。
是基于KeyStream之上的状态,keyBy之后的Operator State。
那么,一个并行度为3的keyed Opreator有几个状态,这个就不一定是3了,这里有几个状态是由keyby之后有几个key所决定的。
案例:有一个事件流Tuple2[eventId,val],求不同的事件eventId下,相邻3个val的平均值,事件流如下:
(1,4),(2,3),(3,1),(1,2),(3,2),(1,2),(2,2),(2,9)
那么事件1:8/3=2
那么事件2:14/3=4
Keyed State的数据结构类型有:
ValueState<T>:update(T)
ListState<T>:add(T)、get(T)和clear(T)
RecingState<T>:add(T)、receFunction()
MapState<UK,UV>:put(UK,UV)、putAll(Map<UK,UV>)、get(UK)
FlatMapFunction是无状态函数;RichFlatMapFunction是有状态函数
这里没有实现CheckpointedFunction接口,而是直接调用方法 getRuntimeContext(),然后使用getState方法来获取状态值。
特殊场景: 来自一个流的一些数据需要广播到所有下游任务,在这些任务中,这些数据被本地存储并且用于处理另一个流上的所有处理元素 。例如:一个低吞吐量流,其中包含一组规则,我们希望对来自另一个流的所有元素按照规则进行计算
典型应用:常规事件流.connect(规则流)
常规事件流.connect(配置流)
<1> 创建常规事件流DataStream或者KeyedDataStream
<2> 创建BroadcastedStream:创建规则流/配置流(低吞吐)并广播
<3> 连接两个Stream并实现计算处理
process(可以是BroadcastProcessFunction 或者 KeyedBroadcastProcessFunction )
BroadcastProcessFunction:
processElement(...):负责处理非广播流中的传入元素
processBroadcastElement(...):负责处理广播流中的传入元素(如规则),一般广播流的元素添加到状态里去备用,processElement处理业务数据时就可以使用
ReadOnlyContext和Context:
ReadOnlyContext对Broadcast State只有只读权限,Conetxt有写权限
KeyedBroadcastProcessFunction:
注意:
<1> Flink之间没有跨Task的通信
<2> 每个任务的广播状态的元素顺序有可能不一样
<3> Broadcast State保存在内存中(并不在RocksDB)
② 转载:阿里巴巴为什么选择Apache Flink
本文主要整理自阿里巴巴计算平台事业部资深技术专家莫问在云栖大会的演讲。
合抱之木,生于毫末
随着人工智能时代的降临,数据量的爆发,在典型的大数据的业务场景下数据业务最通用的做法是:选用批处理的技术处理全量数据,采用流式计算处理实时增量数据。在绝大多数的业务场景之下,用户的业务逻辑在批处理和流处理之中往往是相同的。但是,用户用于批处理和流处理的两套计算引擎是不同的。
因此,用户通常需要写两套代码。毫无疑问,这带来了一些额外的负担和成本。阿里巴巴的商品数据处理就经常需要面对增量和全量两套不同的业务流程问题,所以阿里就在想,我们能不能有一套统一的大数据引擎技术,用户只需要根据自己的业务逻辑开发一套代码。这样在各种不同的场景下,不管是全量数据还是增量数据,亦或者实时处理,一套方案即可全部支持, 这就是阿里选择Flink的背景和初衷 。
目前开源大数据计算引擎有很多选择,流计算如Storm,Samza,Flink,Kafka Stream等,批处理如Spark,Hive,Pig,Flink等。而同时支持流处理和批处理的计算引擎,只有两种选择:一个是Apache Spark,一个是Apache Flink。
从技术,生态等各方面的综合考虑。首先,Spark的技术理念是基于批来模拟流的计算。而Flink则完全相反,它采用的是基于流计算来模拟批计算。
从技术发展方向看,用批来模拟流有一定的技术局限性,并且这个局限性可能很难突破。而Flink基于流来模拟批,在技术上有更好的扩展性。从长远来看,阿里决定用Flink做一个统一的、通用的大数据引擎作为未来的选型。
Flink是一个低延迟、高吞吐、统一的大数据计算引擎。在阿里巴巴的生产环境中,Flink的计算平台可以实现毫秒级的延迟情况下,每秒钟处理上亿次的消息或者事件。同时Flink提供了一个Exactly-once的一致性语义。保证了数据的正确性。这样就使得Flink大数据引擎可以提供金融级的数据处理能力。
Flink在阿里的现状
基于Apache Flink在阿里巴巴搭建的平台于2016年正式上线,并从阿里巴巴的搜索和推荐这两大场景开始实现。目前阿里巴巴所有的业务,包括阿里巴巴所有子公司都采用了基于Flink搭建的实时计算平台。同时Flink计算平台运行在开源的Hadoop集群之上。采用Hadoop的YARN做为资源管理调度,以 HDFS作为数据存储。因此,Flink可以和开源大数据软件Hadoop无缝对接。
目前,这套基于Flink搭建的实时计算平台不仅服务于阿里巴巴集团内部,而且通过阿里云的云产品API向整个开发者生态提供基于Flink的云产品支持。
Flink在阿里巴巴的大规模应用,表现如何?
规模: 一个系统是否成熟,规模是重要指标,Flink最初上线阿里巴巴只有数百台服务器,目前规模已达上万台,此等规模在全球范围内也是屈指可数;
状态数据: 基于Flink,内部积累起来的状态数据已经是PB级别规模;
Events: 如今每天在Flink的计算平台上,处理的数据已经超过万亿条;
PS: 在峰值期间可以承担每秒超过4.72亿次的访问,最典型的应用场景是阿里巴巴双11大屏;
Flink的发展之路
接下来从开源技术的角度,来谈一谈Apache Flink是如何诞生的,它是如何成长的?以及在成长的这个关键的时间点阿里是如何进入的?并对它做出了那些贡献和支持?
Flink诞生于欧洲的一个大数据研究项目StratoSphere。该项目是柏林工业大学的一个研究性项目。早期,Flink是做Batch计算的,但是在2014年,StratoSphere里面的核心成员孵化出Flink,同年将Flink捐赠Apache,并在后来成为Apache的顶级大数据项目,同时Flink计算的主流方向被定位为Streaming,即用流式计算来做所有大数据的计算,这就是Flink技术诞生的背景。
2014年Flink作为主攻流计算的大数据引擎开始在开源大数据行业内崭露头角。区别于Storm,Spark Streaming以及其他流式计算引擎的是:它不仅是一个高吞吐、低延迟的计算引擎,同时还提供很多高级的功能。比如它提供了有状态的计算,支持状态管理,支持强一致性的数据语义以及支持Event Time,WaterMark对消息乱序的处理。
Flink核心概念以及基本理念
Flink最区别于其他流计算引擎的,其实就是状态管理。
什么是状态?例如开发一套流计算的系统或者任务做数据处理,可能经常要对数据进行统计,如Sum,Count,Min,Max,这些值是需要存储的。因为要不断更新,这些值或者变量就可以理解为一种状态。如果数据源是在读取Kafka,RocketMQ,可能要记录读取到什么位置,并记录Offset,这些Offset变量都是要计算的状态。
Flink提供了内置的状态管理,可以把这些状态存储在Flink内部,而不需要把它存储在外部系统。这样做的好处是第一降低了计算引擎对外部系统的依赖以及部署,使运维更加简单;第二,对性能带来了极大的提升:如果通过外部去访问,如Redis,HBase它一定是通过网络及RPC。如果通过Flink内部去访问,它只通过自身的进程去访问这些变量。同时Flink会定期将这些状态做Checkpoint持久化,把Checkpoint存储到一个分布式的持久化系统中,比如HDFS。这样的话,当Flink的任务出现任何故障时,它都会从最近的一次Checkpoint将整个流的状态进行恢复,然后继续运行它的流处理。对用户没有任何数据上的影响。
Flink是如何做到在Checkpoint恢复过程中没有任何数据的丢失和数据的冗余?来保证精准计算的?
这其中原因是Flink利用了一套非常经典的Chandy-Lamport算法,它的核心思想是把这个流计算看成一个流式的拓扑,定期从这个拓扑的头部Source点开始插入特殊的Barries,从上游开始不断的向下游广播这个Barries。每一个节点收到所有的Barries,会将State做一次Snapshot,当每个节点都做完Snapshot之后,整个拓扑就算完整的做完了一次Checkpoint。接下来不管出现任何故障,都会从最近的Checkpoint进行恢复。
Flink利用这套经典的算法,保证了强一致性的语义。这也是Flink与其他无状态流计算引擎的核心区别。
下面介绍Flink是如何解决乱序问题的。比如星球大战的播放顺序,如果按照上映的时间观看,可能会发现故事在跳跃。
在流计算中,与这个例子是非常类似的。所有消息到来的时间,和它真正发生在源头,在线系统Log当中的时间是不一致的。在流处理当中,希望是按消息真正发生在源头的顺序进行处理,不希望是真正到达程序里的时间来处理。Flink提供了Event Time和WaterMark的一些先进技术来解决乱序的问题。使得用户可以有序的处理这个消息。这是Flink一个很重要的特点。
接下来要介绍的是Flink启动时的核心理念和核心概念,这是Flink发展的第一个阶段;第二个阶段时间是2015年和2017年,这个阶段也是Flink发展以及阿里巴巴介入的时间。故事源于2015年年中,我们在搜索事业部的一次调研。当时阿里有自己的批处理技术和流计算技术,有自研的,也有开源的。但是,为了思考下一代大数据引擎的方向以及未来趋势,我们做了很多新技术的调研。
结合大量调研结果,我们最后得出的结论是:解决通用大数据计算需求,批流融合的计算引擎,才是大数据技术的发展方向,并且最终我们选择了Flink。
但2015年的Flink还不够成熟,不管是规模还是稳定性尚未经历实践。最后我们决定在阿里内部建立一个Flink分支,对Flink做大量的修改和完善,让其适应阿里巴巴这种超大规模的业务场景。在这个过程当中,我们团队不仅对Flink在性能和稳定性上做出了很多改进和优化,同时在核心架构和功能上也进行了大量创新和改进,并将其贡献给社区,例如:Flink新的分布式架构,增量Checkpoint机制,基于Credit-based的网络流控机制和Streaming SQL等。
阿里巴巴对Flink社区的贡献
我们举两个设计案例,第一个是阿里巴巴重构了Flink的分布式架构,将Flink的Job调度和资源管理做了一个清晰的分层和解耦。这样做的首要好处是Flink可以原生的跑在各种不同的开源资源管理器上。经过这套分布式架构的改进,Flink可以原生地跑在Hadoop Yarn和Kubernetes这两个最常见的资源管理系统之上。同时将Flink的任务调度从集中式调度改为了分布式调度,这样Flink就可以支持更大规模的集群,以及得到更好的资源隔离。
另一个是实现了增量的Checkpoint机制,因为Flink提供了有状态的计算和定期的Checkpoint机制,如果内部的数据越来越多,不停地做Checkpoint,Checkpoint会越来越大,最后可能导致做不出来。提供了增量的Checkpoint后,Flink会自动地发现哪些数据是增量变化,哪些数据是被修改了。同时只将这些修改的数据进行持久化。这样Checkpoint不会随着时间的运行而越来越难做,整个系统的性能会非常地平稳,这也是我们贡献给社区的一个很重大的特性。
经过2015年到2017年对Flink Streaming的能力完善,Flink社区也逐渐成熟起来。Flink也成为在Streaming领域最主流的计算引擎。因为Flink最早期想做一个流批统一的大数据引擎,2018年已经启动这项工作,为了实现这个目标,阿里巴巴提出了新的统一API架构,统一SQL解决方案,同时流计算的各种功能得到完善后,我们认为批计算也需要各种各样的完善。无论在任务调度层,还是在数据Shuffle层,在容错性,易用性上,都需要完善很多工作。
篇幅原因,下面主要和大家分享两点:
● 统一 API Stack
● 统一 SQL方案
先来看下目前Flink API Stack的一个现状,调研过Flink或者使用过Flink的开发者应该知道。Flink有2套基础的API,一套是DataStream,一套是DataSet。DataStream API是针对流式处理的用户提供,DataSet API是针对批处理用户提供,但是这两套API的执行路径是完全不一样的,甚至需要生成不同的Task去执行。所以这跟得到统一的API是有冲突的,而且这个也是不完善的,不是最终的解法。在Runtime之上首先是要有一个批流统一融合的基础API层,我们希望可以统一API层。
因此,我们在新架构中将采用一个DAG(有限无环图)API,作为一个批流统一的API层。对于这个有限无环图,批计算和流计算不需要泾渭分明的表达出来。只需要让开发者在不同的节点,不同的边上定义不同的属性,来规划数据是流属性还是批属性。整个拓扑是可以融合批流统一的语义表达,整个计算无需区分是流计算还是批计算,只需要表达自己的需求。有了这套API后,Flink的API Stack将得到统一。
除了统一的基础API层和统一的API Stack外,同样在上层统一SQL的解决方案。流和批的SQL,可以认为流计算有数据源,批计算也有数据源,我们可以将这两种源都模拟成数据表。可以认为流数据的数据源是一张不断更新的数据表,对于批处理的数据源可以认为是一张相对静止的表,没有更新的数据表。整个数据处理可以当做SQL的一个Query,最终产生的结果也可以模拟成一个结果表。
对于流计算而言,它的结果表是一张不断更新的结果表。对于批处理而言,它的结果表是相当于一次更新完成的结果表。从整个SOL语义上表达,流和批是可以统一的。此外,不管是流式SQL,还是批处理SQL,都可以用同一个Query来表达复用。这样以来流批都可以用同一个Query优化或者解析。甚至很多流和批的算子都是可以复用的。
Flink的未来方向
首先,阿里巴巴还是要立足于Flink的本质,去做一个全能的统一大数据计算引擎。将它在生态和场景上进行落地。目前Flink已经是一个主流的流计算引擎,很多互联网公司已经达成了共识:Flink是大数据的未来,是最好的流计算引擎。下一步很重要的工作是让Flink在批计算上有所突破。在更多的场景下落地,成为一种主流的批计算引擎。然后进一步在流和批之间进行无缝的切换,流和批的界限越来越模糊。用Flink,在一个计算中,既可以有流计算,又可以有批计算。
第二个方向就是Flink的生态上有更多语言的支持,不仅仅是Java,Scala语言,甚至是机器学习下用的Python,Go语言。未来我们希望能用更多丰富的语言来开发Flink计算的任务,来描述计算逻辑,并和更多的生态进行对接。
最后不得不说AI,因为现在很多大数据计算的需求和数据量都是在支持很火爆的AI场景,所以在Flink流批生态完善的基础上,将继续往上走,完善上层Flink的Machine Learning算法库,同时Flink往上层也会向成熟的机器学习,深度学习去集成。比如可以做Tensorflow On Flink, 让大数据的ETL数据处理和机器学习的Feature计算和特征计算,训练的计算等进行集成,让开发者能够同时享受到多种生态给大家带来的好处。
③ Flink 有状态的流的工作(Working with state)
有状态的函数和操作在处理各个元素或者事件时存储数据,使得state称为任何类型的复杂操作的关键构建部件,例如:
当一个应用程序搜索某些特定的事件模式时,状态会保存截止到目前为止遇到过的事件的顺序;
当每分钟聚合事件时,状态会保存挂起的聚合
当通过数据点来训练机器学习模型时,状态会保存当前版本的模型参数
为了使state容错,Flink需要识别state并 checkpoint 它, 在许多情况下,Flink还管理着应用程序的状态,这意味着Flink处理内存管理(如果需要,可能会将内存中的数据溢出到磁盘)来保存非常大的state。
这篇文档介绍了在开发应用程序时如何使用Flink的state 抽象概念。
在Flink中有两个基本的state:Keyed state和 Operator state
Keyed State 总是与key相关,并且只能应用于 KeyedStream 的函数和操作中。
你可以认为 Keyed State 是一个已经分区或者划分的,每个state分区对应一个key的 Operator State , 每个 keyed-state 逻辑上与一个<并行操作实例, 键>( <parallel-operator-instance, key> )绑定在一起,由于每个key属于唯一一个键控算子( keyed operator )的并行实例,我们可以简单地看作是 <operator, key> 。
Keyed State 可以进一步的组成 Key Group , Key Group 是Flink重新分配 Keyed State 的最小单元,这里有跟定义的最大并行数一样多的 Key Group ,在运行时 keyed operator 的并行实例与key一起为一个或者多个 Key Group 工作。
使用 Operator State (或者非键控的state)的话,每个算子状态绑定到一个并行算子实例中。 Kafka Connector 就是在Flink中使用 Operator State 的一个很好的例子,每个 Kafka consumer 的并行实例保存着一个 topic 分区和偏移量的map作为它的 Operator State 。
当并行数发生变化时, Operator State 接口支持在并行操作实例中进行重新分配,这里有多种方法来进行重分配。
Keyed State和 Operator State存在两种形式:托管的和原生的。
托管的State( Managed State )由Flink运行时控制的数据结构表示, 例如内部哈希表或者RocksDB,例子是"ValueSate", "ListState"等。Flink运行时会对State编码并将它们写入checkpoint中。
原生State( Raw State )是算子保存它们自己的数据结构的state,当checkpoint时,它们仅仅写一串byte数组到checkpoint中。Flink并不知道State的数据结构,仅能看到原生的byte数组。
所有的数据流函数都可以使用托管state,但是原生state接口只能在实现operator时才能使用。使用托管State(而不是原生state)被推荐使用是因为使用托管state,当并行度发生变化时,Flink可以自动地重新分配state,同时还能更好地管理内存。
托管的键控state接口可以访问所有当前输入元素的key范围内的不同类型的state,这也就意味着这种类型的state只能被通过stream.keyBy(...)创建的KeyedStream使用。
现在我们首先来看一下可用的不同类型的state,然后在看它们是如何在程序中使用的,可用State的原形如下:
ValueState<T>:这里保存了一个可以更新和检索的值(由上述输入元素的key所限定,所以一个操作的每个key可能有一个值)。这个值可以使用 update(T) 来更新,使用 T value() 来获取。
ListState<T>:这个保存了一个元素列表,你可以追加元素以及获取一个囊括当前所保存的所有元素的 Iterable ,元素可以通过调用 add(T) 来添加,而 Iterable 可以调用 Iterable<T> get() 来获取。
RecingState<T>:这个保存了表示添加到state的所值的聚合的当个值,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的ReceFunction聚合成一个聚合值。
FoldingState<T, ACC>:这将保存表示添加到状态的所有值的聚合的单个值,与RecingState相反,聚合的数据类型可能跟添加到State的元素的数据类型不同,这个接口与ListState类似,只是调用add(T)添加的元素使用指定的FoldFunction折叠成一个聚合值。
MapState<T>:这个保存了一个映射列表,你可以添加key-value对到state中并且检索一个包含所有当前保存的映射的Iterable。映射可以使用 put(UK, UV) 或者 putAll(Map<UK, UV>) 来添加。与key相关的value,可以使用 get(UK) 来获取,映射的迭代、keys及values可以分别调用 entries() , keys() 和 values() 来获取。
所有类型的state都有一个 clear() 方法来清除当前活动的key(及输入元素的key)的State。
注意: FoldingState会在下一个Flink版本中启用,并在将来的版本中彻底删除,将提供更加一般的替代方案。
值得注意的是这些State对象仅用于与State进行接口,State并不只保存在内存中,也可能会在磁盘中或者其他地方,第二个需要注意的是从State中获取的值依赖于输入元素的key,因此如果涉及的key不同,那么在一次调用用户函数中获得的值可能与另一次调用的值不同。
为了获得一个State句柄,你需要创建一个 StateDescriptor ,这个 StateDescriptor 保存了state的名称(接下来我们会讲到,你可以创建若干个state,但是它们必须有唯一的值以便你能够引用它们),State保存的值的类型以及用户自定义函数如:一个 ReceFunction 。根据你想要检索的state的类型,你可以创建一个 ValueStateDescriptor , 一个 ListStateDescriptor , 一个 RecingStateDescriptor , 一个 FoldingStateDescriptor 或者一个 MapStateDescriptor
State可以通过 RuntimeContext 来访问,所以只能在富函数中使用。 RichFunction 中的 RuntimeContext 有以下这些方法来访问state:
ValueState<T> getState(ValueStateDescriptor<T>)
RecingState<T> getRecingState(ReceingStateDescriptor<T>)
ListState<T> getListState(ListStateDescriptor<T>)
FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC>)
MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV>)
这个 FlatMapFunction 例子展示了所有部件如何组合在一起:
这个例子实现了一个简单的计数器,我们使用元组的第一个字段来进行分组(这个例子中,所有的key都是1),这个函数将计数和运行时总和保存在一个ValueState中,一旦计数大于2,就会发出平均值并清理state,因此我们又从0开始。请注意,如果我们在第一个字段中具有不同值的元组,则这将为每个不同的输入键保持不同的state值。
除了上述接口之外,Scala API还具有快捷方式在KeyedStream上通过有状态的 map() 或 flatMap() 函数获取当个ValueState, 用户定义的Function以一个Option形式来获取ValueState的当前值,并且必须返回一个更新的值来更新State。
为了使用托管的算子State,有状态的函数可以实现更加通用的CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口
CheckpointedFunction接口可以通过不同的重分区模式来访问非键控的state,它需要实现两个方法:
无论何时执行checkpoint, snapshotState() 都会被调用,相应地,每次初始化用户定义的函数时,都会调用对应的 initializeState() ,当函数首次初始化时,或者当该函数实际上是从较早的检查点进行恢复时调用的。鉴于此, initializeState() 不仅是不同类型的状态被初始化的地方,而且还是state恢复逻辑的地方。
目前列表式托管算子状态是支持的,State要求是一个可序列化的彼此独立的列表,因此可以在重新调整后重新分配,换句话说,这些对象是可重新分配的非键控state的最小粒度。根据状态的访问方法,定义了一下重分配方案:
Even-split redistribution :每个算子返回一个State元素列表,
Union redistribution :每个算子返回一个State元素列表,
④ Flink 状态编程
1.流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。
(1)所有类型的窗口。例如,计算过去一小时的平均水位,就是有状态的计算。
(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。
(3)流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
2.下图展示了无状态流处理和有状态流处理的主要区别。 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
3.有状态的算子和应用程序
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
算子状态(operator state)
键控状态(keyed state)
4.算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink为算子状态提供三种基本数据结构:
列表状态(List state):将状态表示为一组数据的列表。
联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
5.键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
6.状态后端(state backend)
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
1)本地的状态管理
2)将检查点(checkpoint)状态写入远程存储
状态后端分类:
(1)MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
(2)FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
(3)RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
7.状态一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
1)一致性级别
在流处理中,一致性可以分为3个级别:
(1) at-most-once : 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。
(2) at-least-once : 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
(3) exactly-once : 这指的是系统保证在发生故障后得到的计数结果与正确值一致。
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二。
(1)保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。
(2)流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。
最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
Flink的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐的处理能力 。
从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。
2)端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:
1)source端 —— 需要外部源可重设数据的读取位置
2)link内部 —— 依赖checkpoint
3)sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统
而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
4)幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
5)事务写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。
8.检查点(checkpoint)
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
9.Flink+Kafka如何实现端到端的exactly-once语义
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
1)内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
2)source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
3)sink —— kafka procer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。
我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。
每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
每个内部的transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。
所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
具体的两阶段提交步骤总结如下:
1)第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
2)触发 checkpoint 操作,barrier从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知jobmanager
3)sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
4)jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
5)sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
6)外部kafka关闭事务,提交的数据可以正常消费了。
⑤ Flink的类加载器解析
在运行 Flink 应用程序时,JVM 会随着时间的推移加载各种类。 这些类可以根据它们的来源分为三组:
作为一般规则,无论何时您先启动 Flink 进程然后再提交作业,作业的类都会动态加载。 如果 Flink 进程与作业/应用程序一起启动,或者如果应用程序产生 Flink 组件(JobManager、TaskManager 等),那么所有作业的类都在 Java 类路径中。
插件组件中的代码由每个插件的专用类加载器动态加载一次。
以下是有关不同部署模式的更多详细信息:
当作为独立会话启动 Flink 集群时,JobManagers 和 TaskManagers 使用 Java 类路径中的 Flink 框架类启动。 针对会话(通过 REST / CLI)提交的所有作业/应用程序中的类都是动态加载的。
Docker / Kubernetes 设置首先启动一组 JobManagers / TaskManagers,然后通过 REST 或 CLI 提交作业/应用程序,其行为类似于独立会话:Flink 的代码位于 Java 类路径中,插件组件和作业代码在启动时动态加载。
YARN 类加载在单个作业部署和会话之间有所不同:
当直接向 YARN 提交 Flink 作业/应用程序时(通过 bin/flink run -m yarn-cluster ...),将为该作业启动专用的 TaskManager 和 JobManager。 这些 JVM 在 Java 类路径中具有用户代码类。 这意味着在这种情况下,作业不涉及动态类加载。
当启动一个 YARN 会话时,JobManagers 和 TaskManagers 是用 classpath 中的 Flink 框架类启动的。 针对会话提交的所有作业的类都是动态加载的。
在涉及动态类加载的设置中(插件组件、会话设置中的 Flink 作业),通常有两个类加载器的层次结构:(1)Java 的应用程序类加载器,它包含类路径中的所有类,以及(2)动态插件/ 用户代码类加载器。 用于从插件或用户代码 jar 加载类。 动态 ClassLoader 将应用程序类加载器作为其父级。
默认情况下,Flink 反转类加载顺序,这意味着它首先查看动态类加载器,如果类不是动态加载代码的一部分,则仅查看父类(应用程序类加载器)。
反向类加载的好处是插件和作业可以使用与 Flink 核心本身不同的库版本,这在不同版本的库不兼容时非常有用。 该机制有助于避免常见的依赖冲突错误,如 IllegalAccessError 或 NoSuchMethodError。 代码的不同部分只是具有单独的类副本(Flink 的核心或其依赖项之一可以使用与用户代码或插件代码不同的副本)。 在大多数情况下,这运行良好,不需要用户进行额外配置。
但是,在某些情况下,反向类加载会导致问题(请参阅下文,“X cannot be cast to X”)。 对于用户代码类加载,您可以通过在 Flink 配置中通过 classloader.resolve-order 将 ClassLoader 解析顺序配置为 parent-first(从 Flink 的默认 child-first)来恢复到 Java 的默认模式。
请注意,某些类总是以父级优先的方式解析(首先通过父类加载器),因为它们在 Flink 的核心和插件/用户代码或面向插件/用户代码的 API 之间共享。 这些类的包是通过 classloader.parent-first-patterns-default 和 classloader.parent-first-patterns-additional 配置的。 要添加父级优先加载的新包,请设置 classloader.parent-first-patterns-additional 配置选项。
所有组件(JobManger、TaskManager、Client、ApplicationMaster 等)在启动时记录它们的类路径设置。 它们可以作为日志开头的环境信息的一部分找到。
当运行 JobManager 和 TaskManagers 专用于一项特定作业的设置时,可以将用户代码 JAR 文件直接放入 /lib 文件夹中,以确保它们是类路径的一部分而不是动态加载。
通常将作业的 JAR 文件放入 /lib 目录中。 JAR 将成为类路径(AppClassLoader)和动态类加载器(FlinkUserCodeClassLoader)的一部分。 因为 AppClassLoader 是 FlinkUserCodeClassLoader 的父级(并且 Java 加载父级,默认情况下),这应该导致类只加载一次。
对于无法将作业的 JAR 文件放入 /lib 文件夹的设置(例如因为安装程序是由多个作业使用的会话),仍然可以将公共库放入 /lib 文件夹,并避免动态为那些类进行加载。
在某些情况下,转换函数、源或接收器需要手动加载类(通过反射动态加载)。 为此,它需要能够访问作业类的类加载器。
在这种情况下,函数(或源或接收器)可以成为 RichFunction(例如 RichMapFunction 或 RichWindowFunction)并通过 getRuntimeContext().getUserCodeClassLoader() 访问用户代码类加载器。
在使用动态类加载的设置中,您可能会看到 com.foo.X cannot be cast to com.foo.X 样式中的异常。 这意味着 com.foo.X 类的多个版本已被不同的类加载器加载,并且该类的类型试图相互分配。
一个常见的原因是库与 Flink 的反向类加载方法不兼容。 您可以关闭反向类加载来验证这一点(在 Flink 配置中设置 classloader.resolve-order: parent-first)或从反向类加载中排除库(在 Flink 配置中设置 classloader.parent-first-patterns-additional)。
另一个原因可能是缓存对象实例,如 Apache Avro 之类的某些库或通过注册(例如通过 Guava 的 Interners)生成的对象实例。 这里的解决方案是要么在没有任何动态类加载的情况下进行设置,要么确保相应的库完全是动态加载代码的一部分。 后者意味着该库不能被添加到 Flink 的 /lib 文件夹中,而必须是应用程序的 fat-jar/uber-jar 的一部分
所有涉及动态用户代码类加载(会话)的场景都依赖于再次卸载类。 类卸载意味着垃圾收集器发现类中不存在任何对象,因此删除该类(代码、静态变量、元数据等)。
每当 TaskManager 启动(或重新启动)一个任务时,它将加载该特定任务的代码。 除非可以卸载类,否则这将成为内存泄漏,因为加载了新版本的类,并且加载的类总数会随着时间的推移而累积。 这通常通过 OutOfMemoryError: Metaspace 表现出来。
类泄漏的常见原因和建议的修复:
卸载动态加载类的一个有用工具是用户代码类加载器释放钩子。 这些是在卸载类加载器之前执行的钩子。 通常建议关闭和卸载资源作为常规函数生命周期的一部分(通常是 close() 方法)。 但在某些情况下(例如对于静态字段),最好在不再需要类加载器时卸载。
类加载器释放钩子可以通过 RuntimeContext.() 方法注册。
从应用程序开发人员的角度解决依赖冲突的一种方法是通过隐藏它们来避免暴露依赖关系。
Apache Maven 提供了 maven-shade-plugin,它允许在编译后更改类的包(因此您编写的代码不受阴影影响)。 例如,如果您的用户代码 jar 中有来自 aws sdk 的 com.amazonaws 包,则 shade 插件会将它们重新定位到 org.myorg.shaded.com.amazonaws 包中,以便您的代码调用您的 aws sdk 版本。
注意 Flink 的大部分依赖,比如 guava、netty、jackson 等,都被 Flink 的维护者屏蔽掉了,所以用户通常不用担心。
⑥ Flink基础系列28-Flink容错机制
在执行流应用程序期间,Flink 会定期保存状态的一致检查点
如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程
(如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断)
遇到故障之后,第一步就是重启应用
(重启后,起初流都是空的)
第二步是从 checkpoint 中读取状态,将状态重置
(读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态)
从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同
第三步:开始消费并处理检查点到发生故障之间的所有数据
这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置
(这里要求source源也能记录状态,回退到读取数据7的状态,kafka有相应的偏移指针能完成该操作)
概述
checkpoint和Watermark一样,都会以广播的形式告诉所有下游。
具体讲解
JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点
(这个带有新检查点ID的东西为barrier,由图中三角型表示,数值2只是ID)
数据源将它们的状态写入检查点,并发出一个检查点barrier
状态后端在状态存入检查点之后,会返回通知给source任务,source任务就会向JobManager确认检查点完成
上图,在Source端接受到barrier后,将自己此身的3 和 4 的数据的状态写入检查点,且向JobManager发送checkpoint成功的消息,然后向下游分别发出一个检查点 barrier
可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断
此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游sink发送了一个数据4,而奇数流已经处理完蓝3变成了8(黄1+蓝1+黄3+蓝3),并向下游sink发送了8
此时检查点barrier都还未到Sum_odd奇数流和Sum_even偶数流
分界线对齐:barrier向下游传递,sum任务会等待所有输入分区的barrier到达
对于barrier已经达到的分区,继续到达的数据会被缓存
而barrier尚未到达的分区,数据会被正常处理
此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加
这次处理的总结:分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理
当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发
当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了
此时的偶数流和奇数流都为8
向下游转发检查点 barrier 后,任务继续正常的数据处理
Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了
CheckPoint为自动保存,SavePoint为手动保存
有状态的流处理,内部每个算子任务都可以有自己的状态
对于流处理器内部来说,所谓的状态一致性,其实就是我们所说的计算结果要保证准确。
一条数据不应该丢失,也不应该重复计算
在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。
Flink的一个重大价值在于,它既保证了exactly-once,也具有低延迟和高吞吐的处理能力。
Flink使用了一种轻量级快照机制——检查点(checkpoint)来保证exactly-once语义
有状态流应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份备份(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时间。
应用状态的一致检查点,是Flink故障恢复机制的核心
端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在Flink流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如Kafka)和输出到持久化系统
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性
整个端到端的一致性级别取决于所有组件中一致性最弱的组件
端到端 exactly-once
幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
(中间可能会存在不正确的情况,只能保证最后结果正确。比如5=>10=>15=>5=>10=>15,虽然最后是恢复到了15,但是中间有个恢复的过程,如果这个过程能够被读取,就会出问题。)
事务写入
预写日志(Write-Ahead-Log,WAL)
把结果数据先当成状态保存,然后在收到checkpoint完成的通知时,一次性写入sink系统
简单易于实现,由于数据提前在状态后端中做了缓存,所以无论什么sink系统,都能用这种方式一批搞定
DataStream API提供了一个模版类:GenericWriteAheadSink,来实现这种事务性sink
两阶段提交(Two-Phase-Commit,2PC)
对于每个checkpoint,sink任务会启动一个事务,并将接下来所有接收到的数据添加到事务里
然后将这些数据写入外部sink系统,但不提交它们——这时只是"预提交"
这种方式真正实现了exactly-once,它需要一个提供事务支持的外部sink系统。Flink提供了TwoPhaseCommitSinkFunction接口
不同Source和Sink的一致性保证
内部——利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
source——kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重制偏移量,重新消费数据,保证一致性
sink——kafka procer作为sink,采用两阶段提交sink,需要实现一个TwoPhaseCommitSinkFunction
Exactly-once 两阶段提交
JobManager 协调各个 TaskManager 进行 checkpoint 存储
checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存
当 checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流
barrier会在算子间传递下去
每个算子会对当前的状态做个快照,保存到状态后端
checkpoint 机制可以保证内部的状态一致性
每个内部的 transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务;遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务
(barrier之前的数据还是在之前的事务中没关闭事务,遇到barrier后的数据另外新开启一个事务)
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成
sink 任务收到确认通知,正式提交之前的事务,kafka 中未确认数据改为“已确认”
Exactly-once 两阶段提交步骤总结
⑦ 五种大数据处理架构
五种大数据处理架构
大数据是收集、整理、处理大容量数据集,并从中获得见解所需的非传统战略和技术的总称。虽然处理数据所需的计算能力或存储容量早已超过一台计算机的上限,但这种计算类型的普遍性、规模,以及价值在最近几年才经历了大规模扩展。
本文将介绍大数据系统一个最基本的组件:处理框架。处理框架负责对系统中的数据进行计算,例如处理从非易失存储中读取的数据,或处理刚刚摄入到系统中的数据。数据的计算则是指从大量单一数据点中提取信息和见解的过程。
下文将介绍这些框架:
· 仅批处理框架:
Apache Hadoop
· 仅流处理框架:
Apache Storm
Apache Samza
· 混合框架:
Apache Spark
Apache Flink
大数据处理框架是什么?
处理框架和处理引擎负责对数据系统中的数据进行计算。虽然“引擎”和“框架”之间的区别没有什么权威的定义,但大部分时候可以将前者定义为实际负责处理数据操作的组件,后者则可定义为承担类似作用的一系列组件。
例如Apache Hadoop可以看作一种以MapRece作为默认处理引擎的处理框架。引擎和框架通常可以相互替换或同时使用。例如另一个框架Apache Spark可以纳入Hadoop并取代MapRece。组件之间的这种互操作性是大数据系统灵活性如此之高的原因之一。
虽然负责处理生命周期内这一阶段数据的系统通常都很复杂,但从广义层面来看它们的目标是非常一致的:通过对数据执行操作提高理解能力,揭示出数据蕴含的模式,并针对复杂互动获得见解。
为了简化这些组件的讨论,我们会通过不同处理框架的设计意图,按照所处理的数据状态对其进行分类。一些系统可以用批处理方式处理数据,一些系统可以用流方式处理连续不断流入系统的数据。此外还有一些系统可以同时处理这两类数据。
在深入介绍不同实现的指标和结论之前,首先需要对不同处理类型的概念进行一个简单的介绍。
批处理系统
批处理在大数据世界有着悠久的历史。批处理主要操作大容量静态数据集,并在计算过程完成后返回结果。
批处理模式中使用的数据集通常符合下列特征…
· 有界:批处理数据集代表数据的有限集合
· 持久:数据通常始终存储在某种类型的持久存储位置中
· 大量:批处理操作通常是处理极为海量数据集的唯一方法
批处理非常适合需要访问全套记录才能完成的计算工作。例如在计算总数和平均数时,必须将数据集作为一个整体加以处理,而不能将其视作多条记录的集合。这些操作要求在计算进行过程中数据维持自己的状态。
需要处理大量数据的任务通常最适合用批处理操作进行处理。无论直接从持久存储设备处理数据集,或首先将数据集载入内存,批处理系统在设计过程中就充分考虑了数据的量,可提供充足的处理资源。由于批处理在应对大量持久数据方面的表现极为出色,因此经常被用于对历史数据进行分析。
大量数据的处理需要付出大量时间,因此批处理不适合对处理时间要求较高的场合。
Apache Hadoop
Apache Hadoop是一种专用于批处理的处理框架。Hadoop是首个在开源社区获得极大关注的大数据框架。基于谷歌有关海量数据处理所发表的多篇论文与经验的Hadoop重新实现了相关算法和组件堆栈,让大规模批处理技术变得更易用。
新版Hadoop包含多个组件,即多个层,通过配合使用可处理批数据:
· HDFS:HDFS是一种分布式文件系统层,可对集群节点间的存储和复制进行协调。HDFS确保了无法避免的节点故障发生后数据依然可用,可将其用作数据来源,可用于存储中间态的处理结果,并可存储计算的最终结果。
· YARN:YARN是Yet Another Resource Negotiator(另一个资源管理器)的缩写,可充当Hadoop堆栈的集群协调组件。该组件负责协调并管理底层资源和调度作业的运行。通过充当集群资源的接口,YARN使得用户能在Hadoop集群中使用比以往的迭代方式运行更多类型的工作负载。
· MapRece:MapRece是Hadoop的原生批处理引擎。
批处理模式
Hadoop的处理功能来自MapRece引擎。MapRece的处理技术符合使用键值对的map、shuffle、rece算法要求。基本处理过程包括:
· 从HDFS文件系统读取数据集
· 将数据集拆分成小块并分配给所有可用节点
· 针对每个节点上的数据子集进行计算(计算的中间态结果会重新写入HDFS)
· 重新分配中间态结果并按照键进行分组
· 通过对每个节点计算的结果进行汇总和组合对每个键的值进行“Recing”
· 将计算而来的最终结果重新写入 HDFS
优势和局限
由于这种方法严重依赖持久存储,每个任务需要多次执行读取和写入操作,因此速度相对较慢。但另一方面由于磁盘空间通常是服务器上最丰富的资源,这意味着MapRece可以处理非常海量的数据集。同时也意味着相比其他类似技术,Hadoop的MapRece通常可以在廉价硬件上运行,因为该技术并不需要将一切都存储在内存中。MapRece具备极高的缩放潜力,生产环境中曾经出现过包含数万个节点的应用。
MapRece的学习曲线较为陡峭,虽然Hadoop生态系统的其他周边技术可以大幅降低这一问题的影响,但通过Hadoop集群快速实现某些应用时依然需要注意这个问题。
围绕Hadoop已经形成了辽阔的生态系统,Hadoop集群本身也经常被用作其他软件的组成部件。很多其他处理框架和引擎通过与Hadoop集成也可以使用HDFS和YARN资源管理器。
总结
Apache Hadoop及其MapRece处理引擎提供了一套久经考验的批处理模型,最适合处理对时间要求不高的非常大规模数据集。通过非常低成本的组件即可搭建完整功能的Hadoop集群,使得这一廉价且高效的处理技术可以灵活应用在很多案例中。与其他框架和引擎的兼容与集成能力使得Hadoop可以成为使用不同技术的多种工作负载处理平台的底层基础。
流处理系统
流处理系统会对随时进入系统的数据进行计算。相比批处理模式,这是一种截然不同的处理方式。流处理方式无需针对整个数据集执行操作,而是对通过系统传输的每个数据项执行操作。
· 流处理中的数据集是“无边界”的,这就产生了几个重要的影响:
· 完整数据集只能代表截至目前已经进入到系统中的数据总量。
· 工作数据集也许更相关,在特定时间只能代表某个单一数据项。
处理工作是基于事件的,除非明确停止否则没有“尽头”。处理结果立刻可用,并会随着新数据的抵达继续更新。
流处理系统可以处理几乎无限量的数据,但同一时间只能处理一条(真正的流处理)或很少量(微批处理,Micro-batch Processing)数据,不同记录间只维持最少量的状态。虽然大部分系统提供了用于维持某些状态的方法,但流处理主要针对副作用更少,更加功能性的处理(Functional processing)进行优化。
功能性操作主要侧重于状态或副作用有限的离散步骤。针对同一个数据执行同一个操作会或略其他因素产生相同的结果,此类处理非常适合流处理,因为不同项的状态通常是某些困难、限制,以及某些情况下不需要的结果的结合体。因此虽然某些类型的状态管理通常是可行的,但这些框架通常在不具备状态管理机制时更简单也更高效。
此类处理非常适合某些类型的工作负载。有近实时处理需求的任务很适合使用流处理模式。分析、服务器或应用程序错误日志,以及其他基于时间的衡量指标是最适合的类型,因为对这些领域的数据变化做出响应对于业务职能来说是极为关键的。流处理很适合用来处理必须对变动或峰值做出响应,并且关注一段时间内变化趋势的数据。
Apache Storm
Apache Storm是一种侧重于极低延迟的流处理框架,也许是要求近实时处理的工作负载的最佳选择。该技术可处理非常大量的数据,通过比其他解决方案更低的延迟提供结果。
流处理模式
Storm的流处理可对框架中名为Topology(拓扑)的DAG(Directed Acyclic Graph,有向无环图)进行编排。这些拓扑描述了当数据片段进入系统后,需要对每个传入的片段执行的不同转换或步骤。
拓扑包含:
· Stream:普通的数据流,这是一种会持续抵达系统的无边界数据。
· Spout:位于拓扑边缘的数据流来源,例如可以是API或查询等,从这里可以产生待处理的数据。
· Bolt:Bolt代表需要消耗流数据,对其应用操作,并将结果以流的形式进行输出的处理步骤。Bolt需要与每个Spout建立连接,随后相互连接以组成所有必要的处理。在拓扑的尾部,可以使用最终的Bolt输出作为相互连接的其他系统的输入。
Storm背后的想法是使用上述组件定义大量小型的离散操作,随后将多个组件组成所需拓扑。默认情况下Storm提供了“至少一次”的处理保证,这意味着可以确保每条消息至少可以被处理一次,但某些情况下如果遇到失败可能会处理多次。Storm无法确保可以按照特定顺序处理消息。
为了实现严格的一次处理,即有状态处理,可以使用一种名为Trident的抽象。严格来说不使用Trident的Storm通常可称之为Core Storm。Trident会对Storm的处理能力产生极大影响,会增加延迟,为处理提供状态,使用微批模式代替逐项处理的纯粹流处理模式。
为避免这些问题,通常建议Storm用户尽可能使用Core Storm。然而也要注意,Trident对内容严格的一次处理保证在某些情况下也比较有用,例如系统无法智能地处理重复消息时。如果需要在项之间维持状态,例如想要计算一个小时内有多少用户点击了某个链接,此时Trident将是你唯一的选择。尽管不能充分发挥框架与生俱来的优势,但Trident提高了Storm的灵活性。
Trident拓扑包含:
· 流批(Stream batch):这是指流数据的微批,可通过分块提供批处理语义。
· 操作(Operation):是指可以对数据执行的批处理过程。
优势和局限
目前来说Storm可能是近实时处理领域的最佳解决方案。该技术可以用极低延迟处理数据,可用于希望获得最低延迟的工作负载。如果处理速度直接影响用户体验,例如需要将处理结果直接提供给访客打开的网站页面,此时Storm将会是一个很好的选择。
Storm与Trident配合使得用户可以用微批代替纯粹的流处理。虽然借此用户可以获得更大灵活性打造更符合要求的工具,但同时这种做法会削弱该技术相比其他解决方案最大的优势。话虽如此,但多一种流处理方式总是好的。
Core Storm无法保证消息的处理顺序。Core Storm为消息提供了“至少一次”的处理保证,这意味着可以保证每条消息都能被处理,但也可能发生重复。Trident提供了严格的一次处理保证,可以在不同批之间提供顺序处理,但无法在一个批内部实现顺序处理。
在互操作性方面,Storm可与Hadoop的YARN资源管理器进行集成,因此可以很方便地融入现有Hadoop部署。除了支持大部分处理框架,Storm还可支持多种语言,为用户的拓扑定义提供了更多选择。
总结
对于延迟需求很高的纯粹的流处理工作负载,Storm可能是最适合的技术。该技术可以保证每条消息都被处理,可配合多种编程语言使用。由于Storm无法进行批处理,如果需要这些能力可能还需要使用其他软件。如果对严格的一次处理保证有比较高的要求,此时可考虑使用Trident。不过这种情况下其他流处理框架也许更适合。
Apache Samza
Apache Samza是一种与Apache Kafka消息系统紧密绑定的流处理框架。虽然Kafka可用于很多流处理系统,但按照设计,Samza可以更好地发挥Kafka独特的架构优势和保障。该技术可通过Kafka提供容错、缓冲,以及状态存储。
Samza可使用YARN作为资源管理器。这意味着默认情况下需要具备Hadoop集群(至少具备HDFS和YARN),但同时也意味着Samza可以直接使用YARN丰富的内建功能。
流处理模式
Samza依赖Kafka的语义定义流的处理方式。Kafka在处理数据时涉及下列概念:
· Topic(话题):进入Kafka系统的每个数据流可称之为一个话题。话题基本上是一种可供消耗方订阅的,由相关信息组成的数据流。
· Partition(分区):为了将一个话题分散至多个节点,Kafka会将传入的消息划分为多个分区。分区的划分将基于键(Key)进行,这样可以保证包含同一个键的每条消息可以划分至同一个分区。分区的顺序可获得保证。
· Broker(代理):组成Kafka集群的每个节点也叫做代理。
· Procer(生成方):任何向Kafka话题写入数据的组件可以叫做生成方。生成方可提供将话题划分为分区所需的键。
· Consumer(消耗方):任何从Kafka读取话题的组件可叫做消耗方。消耗方需要负责维持有关自己分支的信息,这样即可在失败后知道哪些记录已经被处理过了。
由于Kafka相当于永恒不变的日志,Samza也需要处理永恒不变的数据流。这意味着任何转换创建的新数据流都可被其他组件所使用,而不会对最初的数据流产生影响。
优势和局限
乍看之下,Samza对Kafka类查询系统的依赖似乎是一种限制,然而这也可以为系统提供一些独特的保证和功能,这些内容也是其他流处理系统不具备的。
例如Kafka已经提供了可以通过低延迟方式访问的数据存储副本,此外还可以为每个数据分区提供非常易用且低成本的多订阅者模型。所有输出内容,包括中间态的结果都可写入到Kafka,并可被下游步骤独立使用。
这种对Kafka的紧密依赖在很多方面类似于MapRece引擎对HDFS的依赖。虽然在批处理的每个计算之间对HDFS的依赖导致了一些严重的性能问题,但也避免了流处理遇到的很多其他问题。
Samza与Kafka之间紧密的关系使得处理步骤本身可以非常松散地耦合在一起。无需事先协调,即可在输出的任何步骤中增加任意数量的订阅者,对于有多个团队需要访问类似数据的组织,这一特性非常有用。多个团队可以全部订阅进入系统的数据话题,或任意订阅其他团队对数据进行过某些处理后创建的话题。这一切并不会对数据库等负载密集型基础架构造成额外的压力。
直接写入Kafka还可避免回压(Backpressure)问题。回压是指当负载峰值导致数据流入速度超过组件实时处理能力的情况,这种情况可能导致处理工作停顿并可能丢失数据。按照设计,Kafka可以将数据保存很长时间,这意味着组件可以在方便的时候继续进行处理,并可直接重启动而无需担心造成任何后果。
Samza可以使用以本地键值存储方式实现的容错检查点系统存储数据。这样Samza即可获得“至少一次”的交付保障,但面对由于数据可能多次交付造成的失败,该技术无法对汇总后状态(例如计数)提供精确恢复。
Samza提供的高级抽象使其在很多方面比Storm等系统提供的基元(Primitive)更易于配合使用。目前Samza只支持JVM语言,这意味着它在语言支持方面不如Storm灵活。
总结
对于已经具备或易于实现Hadoop和Kafka的环境,Apache Samza是流处理工作负载一个很好的选择。Samza本身很适合有多个团队需要使用(但相互之间并不一定紧密协调)不同处理阶段的多个数据流的组织。Samza可大幅简化很多流处理工作,可实现低延迟的性能。如果部署需求与当前系统不兼容,也许并不适合使用,但如果需要极低延迟的处理,或对严格的一次处理语义有较高需求,此时依然适合考虑。
混合处理系统:批处理和流处理
一些处理框架可同时处理批处理和流处理工作负载。这些框架可以用相同或相关的组件和API处理两种类型的数据,借此让不同的处理需求得以简化。
如你所见,这一特性主要是由Spark和Flink实现的,下文将介绍这两种框架。实现这样的功能重点在于两种不同处理模式如何进行统一,以及要对固定和不固定数据集之间的关系进行何种假设。
虽然侧重于某一种处理类型的项目会更好地满足具体用例的要求,但混合框架意在提供一种数据处理的通用解决方案。这种框架不仅可以提供处理数据所需的方法,而且提供了自己的集成项、库、工具,可胜任图形分析、机器学习、交互式查询等多种任务。
Apache Spark
Apache Spark是一种包含流处理能力的下一代批处理框架。与Hadoop的MapRece引擎基于各种相同原则开发而来的Spark主要侧重于通过完善的内存计算和处理优化机制加快批处理工作负载的运行速度。
Spark可作为独立集群部署(需要相应存储层的配合),或可与Hadoop集成并取代MapRece引擎。
批处理模式
与MapRece不同,Spark的数据处理工作全部在内存中进行,只在一开始将数据读入内存,以及将最终结果持久存储时需要与存储层交互。所有中间态的处理结果均存储在内存中。
虽然内存中处理方式可大幅改善性能,Spark在处理与磁盘有关的任务时速度也有很大提升,因为通过提前对整个任务集进行分析可以实现更完善的整体式优化。为此Spark可创建代表所需执行的全部操作,需要操作的数据,以及操作和数据之间关系的Directed Acyclic Graph(有向无环图),即DAG,借此处理器可以对任务进行更智能的协调。
为了实现内存中批计算,Spark会使用一种名为Resilient Distributed Dataset(弹性分布式数据集),即RDD的模型来处理数据。这是一种代表数据集,只位于内存中,永恒不变的结构。针对RDD执行的操作可生成新的RDD。每个RDD可通过世系(Lineage)回溯至父级RDD,并最终回溯至磁盘上的数据。Spark可通过RDD在无需将每个操作的结果写回磁盘的前提下实现容错。
流处理模式
流处理能力是由Spark Streaming实现的。Spark本身在设计上主要面向批处理工作负载,为了弥补引擎设计和流处理工作负载特征方面的差异,Spark实现了一种叫做微批(Micro-batch)*的概念。在具体策略方面该技术可以将数据流视作一系列非常小的“批”,借此即可通过批处理引擎的原生语义进行处理。
Spark Streaming会以亚秒级增量对流进行缓冲,随后这些缓冲会作为小规模的固定数据集进行批处理。这种方式的实际效果非常好,但相比真正的流处理框架在性能方面依然存在不足。
优势和局限
使用Spark而非Hadoop MapRece的主要原因是速度。在内存计算策略和先进的DAG调度等机制的帮助下,Spark可以用更快速度处理相同的数据集。
Spark的另一个重要优势在于多样性。该产品可作为独立集群部署,或与现有Hadoop集群集成。该产品可运行批处理和流处理,运行一个集群即可处理不同类型的任务。
除了引擎自身的能力外,围绕Spark还建立了包含各种库的生态系统,可为机器学习、交互式查询等任务提供更好的支持。相比MapRece,Spark任务更是“众所周知”地易于编写,因此可大幅提高生产力。
为流处理系统采用批处理的方法,需要对进入系统的数据进行缓冲。缓冲机制使得该技术可以处理非常大量的传入数据,提高整体吞吐率,但等待缓冲区清空也会导致延迟增高。这意味着Spark Streaming可能不适合处理对延迟有较高要求的工作负载。
由于内存通常比磁盘空间更贵,因此相比基于磁盘的系统,Spark成本更高。然而处理速度的提升意味着可以更快速完成任务,在需要按照小时数为资源付费的环境中,这一特性通常可以抵消增加的成本。
Spark内存计算这一设计的另一个后果是,如果部署在共享的集群中可能会遇到资源不足的问题。相比HadoopMapRece,Spark的资源消耗更大,可能会对需要在同一时间使用集群的其他任务产生影响。从本质来看,Spark更不适合与Hadoop堆栈的其他组件共存一处。
总结
Spark是多样化工作负载处理任务的最佳选择。Spark批处理能力以更高内存占用为代价提供了无与伦比的速度优势。对于重视吞吐率而非延迟的工作负载,则比较适合使用Spark Streaming作为流处理解决方案。
Apache Flink
Apache Flink是一种可以处理批处理任务的流处理框架。该技术可将批处理数据视作具备有限边界的数据流,借此将批处理任务作为流处理的子集加以处理。为所有处理任务采取流处理为先的方法会产生一系列有趣的副作用。
这种流处理为先的方法也叫做Kappa架构,与之相对的是更加被广为人知的Lambda架构(该架构中使用批处理作为主要处理方法,使用流作为补充并提供早期未经提炼的结果)。Kappa架构中会对一切进行流处理,借此对模型进行简化,而这一切是在最近流处理引擎逐渐成熟后才可行的。
流处理模型
Flink的流处理模型在处理传入数据时会将每一项视作真正的数据流。Flink提供的DataStream API可用于处理无尽的数据流。Flink可配合使用的基本组件包括:
· Stream(流)是指在系统中流转的,永恒不变的无边界数据集
· Operator(操作方)是指针对数据流执行操作以产生其他数据流的功能
· Source(源)是指数据流进入系统的入口点
· Sink(槽)是指数据流离开Flink系统后进入到的位置,槽可以是数据库或到其他系统的连接器
为了在计算过程中遇到问题后能够恢复,流处理任务会在预定时间点创建快照。为了实现状态存储,Flink可配合多种状态后端系统使用,具体取决于所需实现的复杂度和持久性级别。
此外Flink的流处理能力还可以理解“事件时间”这一概念,这是指事件实际发生的时间,此外该功能还可以处理会话。这意味着可以通过某种有趣的方式确保执行顺序和分组。
批处理模型
Flink的批处理模型在很大程度上仅仅是对流处理模型的扩展。此时模型不再从持续流中读取数据,而是从持久存储中以流的形式读取有边界的数据集。Flink会对这些处理模型使用完全相同的运行时。
Flink可以对批处理工作负载实现一定的优化。例如由于批处理操作可通过持久存储加以支持,Flink可以不对批处理工作负载创建快照。数据依然可以恢复,但常规处理操作可以执行得更快。
另一个优化是对批处理任务进行分解,这样即可在需要的时候调用不同阶段和组件。借此Flink可以与集群的其他用户更好地共存。对任务提前进行分析使得Flink可以查看需要执行的所有操作、数据集的大小,以及下游需要执行的操作步骤,借此实现进一步的优化。
优势和局限
Flink目前是处理框架领域一个独特的技术。虽然Spark也可以执行批处理和流处理,但Spark的流处理采取的微批架构使其无法适用于很多用例。Flink流处理为先的方法可提供低延迟,高吞吐率,近乎逐项处理的能力。
Flink的很多组件是自行管理的。虽然这种做法较为罕见,但出于性能方面的原因,该技术可自行管理内存,无需依赖原生的Java垃圾回收机制。与Spark不同,待处理数据的特征发生变化后Flink无需手工优化和调整,并且该技术也可以自行处理数据分区和自动缓存等操作。
Flink会通过多种方式对工作进行分许进而优化任务。这种分析在部分程度上类似于SQL查询规划器对关系型数据库所做的优化,可针对特定任务确定最高效的实现方法。该技术还支持多阶段并行执行,同时可将受阻任务的数据集合在一起。对于迭代式任务,出于性能方面的考虑,Flink会尝试在存储数据的节点上执行相应的计算任务。此外还可进行“增量迭代”,或仅对数据中有改动的部分进行迭代。
在用户工具方面,Flink提供了基于Web的调度视图,借此可轻松管理任务并查看系统状态。用户也可以查看已提交任务的优化方案,借此了解任务最终是如何在集群中实现的。对于分析类任务,Flink提供了类似SQL的查询,图形化处理,以及机器学习库,此外还支持内存计算。
Flink能很好地与其他组件配合使用。如果配合Hadoop 堆栈使用,该技术可以很好地融入整个环境,在任何时候都只占用必要的资源。该技术可轻松地与YARN、HDFS和Kafka 集成。在兼容包的帮助下,Flink还可以运行为其他处理框架,例如Hadoop和Storm编写的任务。
目前Flink最大的局限之一在于这依然是一个非常“年幼”的项目。现实环境中该项目的大规模部署尚不如其他处理框架那么常见,对于Flink在缩放能力方面的局限目前也没有较为深入的研究。随着快速开发周期的推进和兼容包等功能的完善,当越来越多的组织开始尝试时,可能会出现越来越多的Flink部署
总结
Flink提供了低延迟流处理,同时可支持传统的批处理任务。Flink也许最适合有极高流处理需求,并有少量批处理任务的组织。该技术可兼容原生Storm和Hadoop程序,可在YARN管理的集群上运行,因此可以很方便地进行评估。快速进展的开发工作使其值得被大家关注。
结论
大数据系统可使用多种处理技术。
对于仅需要批处理的工作负载,如果对时间不敏感,比其他解决方案实现成本更低的Hadoop将会是一个好选择。
对于仅需要流处理的工作负载,Storm可支持更广泛的语言并实现极低延迟的处理,但默认配置可能产生重复结果并且无法保证顺序。Samza与YARN和Kafka紧密集成可提供更大灵活性,更易用的多团队使用,以及更简单的复制和状态管理。
对于混合型工作负载,Spark可提供高速批处理和微批处理模式的流处理。该技术的支持更完善,具备各种集成库和工具,可实现灵活的集成。Flink提供了真正的流处理并具备批处理能力,通过深度优化可运行针对其他平台编写的任务,提供低延迟的处理,但实际应用方面还为时过早。
最适合的解决方案主要取决于待处理数据的状态,对处理所需时间的需求,以及希望得到的结果。具体是使用全功能解决方案或主要侧重于某种项目的解决方案,这个问题需要慎重权衡。随着逐渐成熟并被广泛接受,在评估任何新出现的创新型解决方案时都需要考虑类似的问题。
⑧ Flink如何管理Kafka 消费位点(译文)
Checkpointing 是 Flink 故障恢复的内部机制。一个 checkpoint 就是 Flink应用程序产生的状态的一个副本。如果 Flink 任务发生故障,它会从 checkpoint 中载入之前的状态来恢复任务,就好像故障没有发生一样。
Checkpoints 是 Flink 容错的基础,并且确保了 Flink 流式应用在失败时的完整性。Checkpoints 可以通过 Flink 设置定时触发。
Flink Kafka consumer 使用 Flink 的 checkpoint 机制来存储 Kafka 每个分区的位点到 state。当 Flink 执行 checkpoint 时,Kafka 的每个分区的位点都被存储到 checkpoint 指定的 filesystem 中。Flink 的 checkpoint 机制确保了所有任务算子的状态是一致的,也就是说这些状态具有相同的数据输入。当所有的任务算子成功存储他们自己的状态后,代表一次 checkpoint 的完成。因此,当任务从故障中恢复时,Flink 保证了exactly-once。
下面将一步一步的演示 Flink 是如何通过 checkpoint 来管理 Kafka 的 offset 的。
下面的例子从两个分区的 Kafka topic 中读取数据,每个分区的数据是 “A”, “B”, “C”, ”D”, “E”。假设每个分区都是从 0 开始读取。
假设 Flink Kafka consumer 从分区 0 开始读取数据 “A”,那么此时第一个 consumer 的位点从 0 变成 1。如下图所示。
此时数据 “A” 到达 Flink Job 中的 Map Task。两个 consumer 继续读取数据 (从分区 0 读取数据 “B” ,从分区 1 读取数据 “A”)。 offsets 分别被更新成 2 和 1。与此同时,假设 Flink 从 source 端开始执行 checkpoint。
到这里,Flink Kafka consumer tasks 已经执行了一次快照,offsets也保存到了 state 中(“offset = 2, 1”) 。此时 source tasks 在 数据 “B” 和 “A” 后面,向下游发送一个 checkpoint barrier。checkpoint barriers 是 Flink 用来对齐每个任务算子的 checkpoint,以确保整个 checkpoint 的一致性。分区 1 的数据 “A” 到达 Flink Map Task, 与此同时分区 0 的 consumer 继续读取下一个消息(message “C”)。
Flink Map Task 收到上游两个 source 的 checkpoint barriers 然后开始执行 checkpoint ,把 state 保存到 filesystem。同时,消费者继续从Kafka分区读取更多事件。
假设 Flink Map Task 是 Flink Job 的最末端,那么当它完成 checkpoint 后,就会立马通知 Flink Job Master。当 job 的所有 task 都确认其 state 已经 “checkpointed”,Job Master将完成这次的整个 checkpoint。 之后,checkpoint 可以用于故障恢复。
如果发生故障(例如,worker 挂掉),则所有任务将重启,并且它们的状态将被重置为最近一次的 checkpoint 的状态。 如下图所示。
source 任务将分别从 offset 2 和 1 开始消费。当任务重启完成, 将会正常运行,就像之前没发生故障一样。
PS: 文中提到的 checkpoint 对齐,我说下我的理解,假设一个 Flink Job 有 Source -> Map -> Sink,其中Sink有多个输入。那么当一次checkpoint的 barrier从source发出时,到sink这里,多个输入需要等待其它的输入的barrier已经到达,经过对齐后,sink才会继续处理消息。这里就是exactly-once和at-least-once的区别。
The End
原文链接: How Apache Flink manages Kafka consumer offsets
⑨ 【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程
Flink 容错机制主要是 状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复 。
Flink 状态后端是指 保存 Checkpoint 数据的容器 ,其分类有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,状态的分类有 operator state 和 keyed state 。
Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。
快照的概念来源于相片,指照相馆的一种冲洗过程短的照片。在计算机领域, 快照是数据存储的某一时刻的状态记录 。 Flink Snapshot 快照是指作业状态的全局一致记录 。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。
Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复 。Checkpoint 具有分布式、异步、增量的特点。
Savepoint 保存点是用户手动触发的,保存全量的作业状态数据 。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。
Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去 ,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。
如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后的所有事件。
针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:
① At-Least-Once 最少一次 :不会丢失数据,但可能会有重复结果。
② Exactly-Once 精确一次 :checkpoint barrier 对齐机制可以保障精确一次。
① FailureRateRestartStrategy :允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。
② FixedDelayRestartStrategy :允许指定的失败次数,同时可以设置重启延时时间。
③ NoRestartStrategy :不需要重启,即 Job 直接失败。
④ ThrowingRestartStrategy :不需要重启,直接抛异常。
Job Restart 策略可以通过 env 设置。
上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。
① RestartAllStrategy :重启全部 task,默认策略。
② RestartIndivialStrategy :恢复单个 task。如果该 task 没有source,可能导致数据丢失。
③ NoOpFailoverStrategy :不恢复 task。
上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。
如何产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题 。为了解决该问题, Chandy-Lamport 算法采用 marker 的传播来代替全局时钟 。
① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。
② 进程 Pi 开始记录所有 input channel 接收到的 message
进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。
所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。
Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义 。
Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。
source task向下游广播barrier。
当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。
map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。
map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。
当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。
⑩ Flink——Exactly-Once
Apache Flink是目前市场最受关注的流计算处理引擎,相较于Spark Streaming的依托Spark Core实现的微批处理模型,Flink是一个纯粹的流处理引擎,其基于操作符的连续流模型,可以达到微秒级别的延迟。
Flink实现了流批一体化模式,实现按照事件处理和无序处理两种形式,基于内存计算。强大高效的反压机制和内存管理,基于轻量级分布式快照checkpoint机制,从而自动实现了Exactly-Once一致性语义。
1. 数据源端
支持可靠的数据源(如kafka), 数据可重读
Apache Flink内置FlinkKafkaConsumer010类,不依赖于 kafka 内置的消费组offset管理,在内部自行记录和维护 consumer 的offset。
2. Flink消费端
轻量级快照机制: 一致性checkpoint检查点
Flink采用了一种轻量级快照机制(检查点checkpoint)来保障Exactly-Once的一致性语义。所谓的一致检查点,即在某个时间点上所有任务状态的一份拷贝(快照)。该时间点是所有任务刚好处理完一个相同数据的时间。
间隔时间自动执行分布式一致性检查点(Checkpoints)程序,异步插入barrier检查点分界线,内存状态自动存储为cp进程文件。保证数据Exactly Oncey精确一次处理。
(1) 从source(Input)端开始,JobManager会向每个source(Input)发送检查点barrier消息,启动检查点。在保证所有的source(Input)数据都处理完成后,Flink开始保存具体的一致性检查点checkpoints,并在过程中启用barrier检查点分界线。
(2) 接收数据和barrier消息,两个过程异步进行。在所有的source(Input)数据都处理完成后,开始将自己的检查点(checkpoints)保存到状态后(StateBackend)中,并通知JobManager将Barrier分发到下游
(3) barrier向下游传递时,会进行barrier对齐确认。待barrier都到齐后才进行checkpoints检查点保存。
(4) 重复以上操作,直到整个流程完成。
3. 输出端
与上文Spark的输出端Exactly-Once一致性上实现类似,除了目标源需要满足一定条件以外,Flink内置的二阶段提交机制也变相实现了事务一致性。**支持幂等写入、事务写入机制(二阶段提交) **
这一块和上文Spark的幂写入特性内容一致,即相同Key/ID 更新写入,数据不变。借助支持主键唯一性约束的存储系统,实现幂等性写入数据,此处将不再继续赘述。
Flink在处理完source端数据接收和operator算子计算过程,待过程中所有的checkpoint都完成后,准备发送数据到sink端,此时启动事务。其中存在两种方式: (1) WAL预写日志: 将计算结果先写入到日志缓存(状态后端/WAL)中,等checkpoint确认完成后一次性写入到sink。(2) 二阶段提交: 对于每个checkpoint创建事务,先预提交数据到sink中,然后等所有的checkpoint全部完成后再真正提交请求到sink, 并把状态改为已确认。
整体思想: 为checkpoint创建事务,等到所有的checkpoint全部真正的完成后,才把计算结果写入到sink中。