1. 【Flink 精选】阐述 Flink 的容错机制,剖析 Checkpoint 实现流程
Flink 容错机制主要是 状态的保存和恢复,涉及 state backends 状态后端、checkpoint 和 savepoint,还有 Job 和 Task 的错误恢复 。
Flink 状态后端是指 保存 Checkpoint 数据的容器 ,其分类有 MemoryStateBackend、FsStateBackend、RocksDBStateBackend ,状态的分类有 operator state 和 keyed state 。
Flink 状态保存和恢复主要依靠 Checkpoint 机制和 Savepoint 机制,两者的区别如下表所示。
快照的概念来源于相片,指照相馆的一种冲洗过程短的照片。在计算机领域, 快照是数据存储的某一时刻的状态记录 。 Flink Snapshot 快照是指作业状态的全局一致记录 。一个完整的快照是包括 source 算子的状态(例如,消费 kafka partition 的 offset)、状态算子的缓存数据和 sink 算子的状态(批量缓存数据、事务数据等)。
Checkpoint 检查点可以自动产生快照,用于Flink 故障恢复 。Checkpoint 具有分布式、异步、增量的特点。
Savepoint 保存点是用户手动触发的,保存全量的作业状态数据 。一般使用场景是作业的升级、作业的并发度缩放、迁移集群等。
Flink 是采用轻量级的分布式异步快照,其实现是采用栅栏 barrier 作为 checkpoint 的传递信号,与业务数据一样无差别地传递下去 ,目的是使得数据流被切分成微批,进行 checkpoint 保存为 snapshot。当 barrier 经过流图节点的时候,Flink 进行 checkpoint 保存状态数据。
如下图所示,checkpoint n 包含每个算子的状态,该状态是指checkpoint n 之前的全部事件,而不包含它之后的所有事件。
针对用户作业出现故障而导致结果丢失或者重复的问题,Flink 提供3种语义:
① At-Least-Once 最少一次 :不会丢失数据,但可能会有重复结果。
② Exactly-Once 精确一次 :checkpoint barrier 对齐机制可以保障精确一次。
① FailureRateRestartStrategy :允许在指定时间间隔内的最大失败次数,同时可以设置重启延时时间。
② FixedDelayRestartStrategy :允许指定的失败次数,同时可以设置重启延时时间。
③ NoRestartStrategy :不需要重启,即 Job 直接失败。
④ ThrowingRestartStrategy :不需要重启,直接抛异常。
Job Restart 策略可以通过 env 设置。
上述策略的父类接口是RestartStrategy,其关键是restart(重启操作)。
① RestartAllStrategy :重启全部 task,默认策略。
② RestartIndivialStrategy :恢复单个 task。如果该 task 没有source,可能导致数据丢失。
③ NoOpFailoverStrategy :不恢复 task。
上述策略的父类接口是FailoverStrategy,其关键是Factory的create(创建 strategy)、onTaskFailure(处理错误)。
如何产生可靠的全局一致性快照是分布式系统的难点,其传统方案是使用的全局时钟,但存在单点故障、数据不一致等可靠性问题 。为了解决该问题, Chandy-Lamport 算法采用 marker 的传播来代替全局时钟 。
① 进程 Pi 记录自己的进程状态,同时生产一个标识信息 marker(与正常 message 不同),通过 ouput channel 发送给系统里面的其他进程。
② 进程 Pi 开始记录所有 input channel 接收到的 message
进程 Pj 从 input channel Ckj 接收到 marker。如果 Pj 还没有记录自己的进程状态,则 Pj 记录自己的进程状态,向 output channel 发送 marker;否则 Pj 正在记录自己的进程状态(该 marker 之前的 message)。
所有的进程都收到 marker 信息并且记录下自己的状态和 channel 的状态(包含的 message)。
Flink 的分布式异步快照实现了Chandy Lamport 算法,其核心思想是 在 source 插入 barrier 代替 Chandy-Lamport 算法中的 marker,通过控制 barrier 的同步来实现 snapshot 的备份和 Exactly-Once 语义 。
Checkpoint Coordinator 向所有 source 节点 trigger Checkpoint。
source task向下游广播barrier。
当source task备份完自己的状态后,会将备份数据的地址(state handle)通知 Checkpoint Coordinator。
map和sink task收集齐上游source的barrier n,执行本地快照。下面例子是RocksDB增量Checkpoint 的流程:首先RocksDB会全量保存到磁盘上(红色大三角表示),然后Flink会从中选择没有上传的文件进行持久化备份(紫色小三角)。
map和sink task在完成Checkpoint 之后,将状态地址state handle返回通知 Coordinator。
当Checkpoint Coordinator收到全部task的state handle,就确定该Checkpoint已完成,并向持久化存储中备份一个Checkpoint Meta(元数据,包括该checkpoint状态数据的备份地址)。
2. Flink HistoryServer配置(简单三步完成)
允许您查询JobManager存档的已完成作业的状态和统计信息。(官网原话)
最适合用于:了解 flink过去完成任务的状态,以及有状态作业的恢复(保存了最后一次的checkpoint地址)
官网地址: https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/historyserver.html
官网配置参数: https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#history-server
修改flink-1.11.2/conf/flink-conf.yaml文件
两张图:
historyserver.web.tmpdir的默认配置图:
historyserver.web.tmpdir的自定义路径配置图:
在hdfs的/flink目录下创建completed-jobs目录(权限可以改成777)
启动/关闭命令:
1、查看启动状态
2、分别启一个per-job任务、sql任务、基于session启的任务,过一会全部cancel掉,都可以在hdfs路径和/tmp下的自定义目录看到相关数据,最后可以在host:8082上面看到你刚才canceled的任务,如下图:
3、访问hdfs路径:
4、访问 http://host:8082 可以查看到历史完成任务状态:
生产中遇到突然这个服务丢失,然后重启任务失败。通过排查任务是historyserver.web.tmpdir: /tmp/flinkhistoryserver/这个路径被删除了。
3. 基于flink sql构建实时数据仓库
根据目前大数据这一块的发展,已经不局限于离线的分析,挖掘数据潜在的价值,数据的时效性最近几年变得刚需,实时处理的框架有storm,spark-streaming,flink等。想要做到实时数据这个方案可行,需要考虑以下几点:1、状态机制 2、精确一次语义 3、高吞吐量 4、可弹性伸缩的应用 5、容错机制,刚好这几点,flink都完美的实现了,并且支持flink sql高级API,减少了开发成本,可用实现快速迭代,易维护等优点。
离线数仓的架构图:
实时数仓架构图:
目前是将实时维度表和DM层数据存于hbase当中,实时公共层都存于kafka当中,并且以写滚动日志的方式写入HDFS(主要是用于校验数据)。其实在这里可以做的工作还有很多,kafka集群,flink集群,hbase集群相互独立,这对整个实时数据仓库的稳定性带来一定的挑战。
一个数据仓库想要成体系,成资产,离不开数据域的划分。所以参考着离线的数据仓库,想着在实时数仓做出这方面的探索,理论上来讲,离线可以实现的,实时也是可以实现的。 并且目前已经取得了成效,目前划分的数据域跟离线大致相同,有流量域,交易域,营销域等等。当然这里面涉及到维表,多事务事实表,累计快照表,周期性快照表的设计,开发,到落地这里就不详述了。
维度表也是整个实时数据仓库不可或缺的部分。从目前整个实时数仓的建设来看,维度表有着数据量大,但是变更少的特点,我们试想过构建全平台的实时商品维度表或者是实时会员维度表,但是这类维度表太过于复杂,所以针对这类维度表下面介绍。还有另外一种就是较为简单的维度表,这类维度可能对应着业务系统单个mysql表,或者只需要几个表进行简单ETL就可以产出的表,这类维表是可以做成实时的。以下有几个实施的关键点:
如下是离线数据同步架构图:
实时数据的接入其实在底层架构是一样的,就是从kafka那边开始不一样,实时用flink的UDTF进行解析,而离线是定时(目前是小时级)用camus拉到HDFS,然后定时load HDFS的数据到hive表里面去,这样来实现离线数据的接入。实时数据的接入是用flink解析kafka的数据,然后在次写入kafka当中去。
由于目前离线数据已经稳定运行了很久,所以实时接入数据的校验可以对比离线数据,但是离线数据是小时级的hive数据,实时数据存于kafka当中,直接比较不了,所以做了相关处理,将kafka的数据使用flink写HDFS滚动日志的形式写入HDFS,然后建立hive表小时级定时去load HDFS中的文件,以此来获取实时数据。
完成以上两点,剩余还需要考虑一点,都是小时级的任务,这个时间卡点使用什么字段呢?首先要确定一点就是离线和实时任务卡点的时间字段必须是一致的,不然肯定会出问题。目前离线使用camus从kafka将数据拉到HDFS上,小时级任务,使用nginx_ts这个时间字段来卡点,这个字段是上报到nginx服务器上记录的时间点。而实时的数据接入是使用flink消费kafka的数据,在以滚动日志的形式写入HDFS的,然后在建立hive表load HDFS文件获取数据,虽然这个hive也是天/小时二级分区,但是离线的表是根据nginx_ts来卡点分区,但是实时的hive表是根据任务启动去load文件的时间点去区分的分区,这是有区别的,直接筛选分区和离线的数据进行对比,会存在部分差异,应当的做法是筛选范围分区,然后在筛选nginx_ts的区间,这样在跟离线做对比才是合理的。
目前实时数据接入层的主要时延是在UDTF函数解析上,实时的UDTF函数是根据上报的日志格式进行开发的,可以完成日志的解析功能。
解析流程图如下:
解析速率图如下:
该图还不是在峰值数据量的时候截的,目前以800记录/second为准,大概一个记录的解析速率为1.25ms。
目前该任务的flink资源配置核心数为1,假设解析速率为1.25ms一条记录,那么峰值只能处理800条/second,如果数据接入速率超过该值就需要增加核心数,保证解析速率。
介绍一下目前离线维度表的情况,就拿商品维度表来说,全线记录数将近一个亿,计算逻辑来自40-50个ods层的数据表,计算逻辑相当复杂,如果实时维度表也参考离线维度表来完成的话,那么开发成本和维护成本非常大,对于技术来讲也是很大的一个挑战,并且目前也没有需求要求维度属性百分百准确。所以目前(伪实时维度表)准备在当天24点产出,当天的维度表给第二天实时公共层使用,即T-1的模式。伪实时维度表的计算逻辑参考离线维度表,但是为了保障在24点之前产出,需要简化一下离线计算逻辑,并且去除一些不常用的字段,保障伪实时维度表可以较快产出。
实时维度表的计算流程图:
目前使用flink作为公司主流的实时计算引擎,使用内存作为状态后端,并且固定30s的间隔做checkpoint,使用HDFS作为checkpoint的存储组件。并且checkpoint也是作为任务restart以后恢复状态的重要依据。熟悉flink的人应该晓得,使用内存作为状态后端,这个内存是JVM的堆内存,毕竟是有限的东西,使用不得当,OOM是常有的事情,下面就介绍一下针对有限的内存,如果完成常规的计算。
4. Flink kafka kerberos的配置
Flink消费集成kerberos认证的kafka集群时,需要做一些配置才可以正常执行。
Flink版本:1.8;kafka版本:2.0.1;Flink模式:Standalone
//指示是否从 Kerberos ticket 缓存中读取
security.kerberos.login.use-ticket-cache: false1
//Kerberos 密钥表文件的绝对路径
security.kerberos.login.keytab: /data/home/keytab/flink.keytab
//认证主体名称
security.kerberos.login.principal: [email protected]
//Kerberos登陆contexts
security.kerberos.login.contexts: Client,KafkaClient
val properties: Properties =new Properties()
properties.setProperty("bootstrap.servers","broker:9092")
properties.setProperty("group.id","testKafka")
properties.setProperty("security.protocol","SASL_PLAINTEXT")
properties.setProperty("sasl.mechanism","GSSAPI")
properties.setProperty("sasl.kerberos.service.name","kafka")
consumer =new FlinkKafkaConsumer[String]("flink",new SimpleStringSchema(), properties)
参数说明 :security.protocol
运行参数可以配置为PLAINTEXT(可不配置)/SASL_PLAINTEXT/SSL/SASL_SSL四种协议,分别对应Fusion Insight Kafka集群的21005/21007/21008/21009端口。 如果配置了SASL,则必须配置sasl.kerberos.service.name为kafka,并在conf/flink-conf.yaml中配置security.kerberos.login相关配置项。如果配置了SSL,则必须配置ssl.truststore.location和ssl.truststore.password,前者表示truststore的位置,后者表示truststore密码。
5. Apache Flink快速入门-基本架构、核心概念和运行流程
Flink是一个基于流计算的分布式引擎,以前的名字叫stratosphere,从2010年开始在德国一所大学里发起,也是有好几年的 历史 了,2014年来借鉴了社区其它一些项目的理念,快速发展并且进入了Apache顶级孵化器,后来更名为Flink。
Flink在德语中是快速和灵敏的意思 ,用来体现流式数据处理速度快和灵活性强等特点。
Flink提供了同时支持高吞吐、低延迟和exactly-once 语义的实时计算能力,另外Flink 还提供了基于流式计算引擎处理批量数据的计算能力,真正意义上实现了流批统一。
Flink 独立于Apache Hadoop,且能在没有任何 Hadoop 依赖的情况下运行。
但是,Flink 可以很好的集成很多 Hadoop 组件,例如 HDFS、YARN 或 HBase。 当与这些组件一起运行时,Flink 可以从 HDFS 读取数据,或写入结果和检查点(checkpoint)/快照(snapshot)数据到 HDFS 。 Flink 还可以通过 YARN 轻松部署,并与 YARN 和 HDFS Kerberos 安全模块集成。
Flink具有先进的架构理念、诸多的优秀特性,以及完善的编程接口。
Flink的具体优势有如下几点:
(1)同时支持高吞吐、低延迟、高性能;
(2)支持事件时间(Event Time)概念;
事件时间的语义使流计算的结果更加精确,尤其在事件到达无序或者延迟的情况下,保持了事件原本产生时的时序性,尽可能避免网络传输或硬件系统的影响。
(3)支持有状态计算;
所谓状态就是在流计算过程中,将算子的中间结果数据保存在内存或者文件系统中,等下一个事件进入算子后,可以从之前的状态中获取中间结果,计算当前的结果,从而无需每次都基于全部的原始数据来统计结果。
(4)支持高度灵活的窗口(Window)操作;
(5)基于轻量级分布式快照(Snapshot)实现的容错;
(6)基于JVM实现独立的内存管理;
(7)Save Points(保存点);
保存点是手动触发的,触发时会将它写入状态后端(State Backends)。Savepoints的实现也是依赖Checkpoint的机制。Flink 程序在执行中会周期性的在worker 节点上进行快照并生成Checkpoint。因为任务恢复的时候只需要最后一个完成的Checkpoint的,所以旧有的Checkpoint会在新的Checkpoint完成时被丢弃。Savepoints和周期性的Checkpoint非常的类似,只是有两个重要的不同。一个是由用户触发,而且不会随着新的Checkpoint生成而被丢弃。
在Flink整个软件架构体系中,统一遵循了分层的架构设计理念,在降低系统耦合度的同时,为上层用户构建Flink应用提供了丰富且友好的接口。
整个Flink的架构体系可以分为三层:
Deployment层: 该层主要涉及了Flink的部署模式,Flink支持多种部署模式:本地、集群(Standalone/YARN),云(GCE/EC2),Kubernetes等。
Runtime层:Runtime层提供了支持Flink计算的全部核心实现,比如:支持分布式Stream处理、JobGraph到ExecutionGraph的映射、调度等等,为上层API层提供基础服务。
API层: 主要实现了面向无界Stream的流处理和面向Batch的批处理API,其中面向流处理对应DataStream API,面向批处理对应DataSet API。
Libraries层:该层也可以称为Flink应用框架层,根据API层的划分,在API层之上构建的满足特定应用的计算框架,也分别对应于面向流处理和面向批处理两类。
核心概念:Job Managers,Task Managers,Clients
Flink也是典型的master-slave分布式架构。Flink的运行时,由两种类型的进程组成:
Client: Client不是运行时和程序执行的一部分,它是用来准备和提交数据流到JobManagers。之后,可以断开连接或者保持连接以获取任务的状态信息。
当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager, JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。 TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。
每个Worker(Task Manager)是一个JVM进程,通常会在单独的线程里执行一个或者多个子任务。为了控制一个Worker能够接受多少个任务,会在Worker上抽象多个Task Slot (至少一个)。
只有一个slot的TaskManager意味着每个任务组运行在一个单独JVM中。 在拥有多个slot的TaskManager上,subtask共用JVM,可以共用TCP连接和心跳消息,同时可以共用一些数据集和数据结构,从而减小任务的开销。
Flink的任务运行其实是多线程的方式,这和MapRece多JVM进程的方式有很大的区别,Flink能够极大提高CPU使用效率,在多个任务之间通过TaskSlot方式共享系统资源,每个TaskManager中通过管理多个TaskSlot资源池对资源进行有效管理。
6. 3.一文搞定:Flink中端到端的状态一致性(理念)
哈喽,大家好。在第二章节中我聊了聊状态后端和检查点相关的内容,如果你仔细看完就能够很清楚的知道,Flink是如何保存自己引以为傲的状态的了。并且我也在上一章节中说了,Flink能够依赖检查点机制来实现自身的精确一致性,所以这篇文章咱们就简单聊聊它所谓的精准一次性是如何实现的。(检查点是根据在数据中插入“分界线”标志来触发的)
从本文的标题能够看出,今天要聊的是端到端的状态一致性,那么首先要先知道这个端到端是什么意思。我们试想一下,Flink处理数据,肯定是要有数据来源和数据发送点的,也就是source和sink。那么,如果要聊端到端的精准一次性,就要对这个两个“端”字进行拆解,分为输入端与Flink之间的精准一次性,和Flink与输出端之间的精准一次性。
输入端的精准一次性比较好理解,你只需要明确一点,只要向Flink发送数据的source端具备数据重放的功能就好。还记得上一小节提到的检查点机制吗?它是对程序运行时的某个时间点所有状态的一次快照,在做快照的时候,source算子会读取数据源的偏移量信息,一起保存在检查点中,这就能够在故障恢复的时候让source算子根据自己保存的这个偏移量信息,去数据源中重新读取数据。所以数据源头这方面,精准一次性比较好搞,因为Flink内部干了很多事情。当然了,这个source端一定要具备数据保存和数据回放的能力,如果不存数的话,就算真记录了偏移量也是白搭。
输入端说完了,接下来就说说输出端。这一块其实就不太好弄了,因为Flink将数据发送到sink方。它的角色就仿若souce到flink一样。(ps:souce->flink === flink -->sink,都是一个东西向另外一个东西发送数据)。但是sink端的外部存储系统没办法像Flink一样,通过保存状态的方式来完成精准一次性。但是在开发过程中,精准一次性很多场景都非常重要,所以为了能够实现Flink到sink端的精准一次性,还是需要对sink端有一些要求的。
主要的要求内容一共有两个,分别是“事务”和“幂等性”:
我们首先来聊一聊幂等,幂等这个东西很神奇。它就像一个不允许重复的集合一样,在幂等的环境下,一个操作可以重复多次,但是次操作所导致的结果就只有一次。有了幂等性,数据无论写多少次,都不会影响到sink端的结果,但是如果想要进行幂等性写出,那就要求sink端需要支持幂等,还是有一些场景上的限制的。
幂等性聊完了就聊聊事务,事务相对于幂等性的要求它的限制要少一些。其实在聊精准一次性之前,大家要明确一个点,那就是sink端面临的问题不是数据会不会丢失,而是数据会不会被重复计算。这是因为即使是数据因为故障丢失,source端也会通过检查点来对数据进行重新读写,所以数据丢失的几率被降得很低。所以重复计算才是sink端的难题。
我们都知道,事务的特性是:如果我本次的活干到一半突然出现故障,那么这次干的内容我会全部收回。当然这是粗俗的说法,如果严格一点,那就是:事务具有四个基本特性:ACID,即原子、一致、隔离、持久这四大特性。通过使用这四大特性,就能够保证如果本次的操作没有完成,那这次操作过程中完成的部分也会被撤销。
那就用这个理念,来套入Flink与sink之间的处理。如果我们写一个事务,用这个事务和检查点绑定在一起,通过这个事务向外部写出数据,当Sink算子触发检查点保存的时候,开启保存状态的同时就开启一个事务,接下来的数据都写在这个事务里面。只有检查点完成了保存,那么事务就可以提交,数据就算是成功写出,可以使用了。如果程序出现故障了,状态保存失败,就会根据检查点中的内容对数据重新读写,事务也肯定是失败了,就会回退。之前的操作也会作废,也就证明了数据没有多余写出了。
为了能够满足Flink与sink端之间的状态精准一次性,也明确了在两种实现理论中事务比较牛,那就再来聊一聊事务实现的两种方式:预写日志、两阶段提交。当然了,这个TMD数据写出端也需要支持事务才行。就像我之前使用到的Clickhouse,它既不支持事务也不支持幂等,只能通过本身的副本合并树来实现虚假的精准一次性(我现阶段就这个水平,可能有的人会手写更牛叉的自定义Sink也说不定)。
预写日志首先需要把数据作为日志状态保存起来,既然它是日志状态了,那就证明程序在触发检查点的时候就能够将它一起以快照的方式存储到外部系统中做持久化存储。当检查点成功保存之后,再把结果一次性写出就好。这么搞是不是就算是完成精准一次性了呢?答案是,是的!但是预写日志有个问题,那就是它是把一个小阶段之间的数据全量写出,虽然在宏观角度去看这一个阶段很小,但是Flink处理读取数据的能力贼强,就这一小阶段就会产生大量数据,一次性写出就会对集群造成一定的压力。就相当于把流处理变成了批处理(spark震怒!!!)
这种方法看起来有点美好,但是实际上它有问题。它把流处理变成了批处理,那必然要知道这次批操作成没成功,所以在数据写出完成之后,会还给sink一个成功的信号,只有明确这个信息是真的完成了,才能证明Checkpoint是成功的,同时也要把这个返回的成功信息做一个保存,用来证明这次批处理的成功。这种情况会面临的问题就是,当数据成功写出了,返回成功信号的时候出事儿了,那故障恢复后Flink确认不了这次成没成功,就会重新发送数据,数据就又会重复发送了。
这个功能DateSteam API提供了一个模板类来帮忙完成。
预写日志说完了,就来谈谈两阶段提交吧。它比预写日志还要牛一点,但是限制也是一大堆,比较难搞。在正式说两阶段提交之前,要明确两个概念。第一个是两阶段是由预提交和正式提交两部分组成,预提交阶段的数据内容是不能够使用的。
当第一条数据来时,或者是sink接收到检查点的“分界线”时,就会开启一个事务,这个事务会代替sink来进行数据的写出操作,但是在这个阶段,所有写出的操作都是不可用的,也就是预提交阶段。而sink这个时候会进入一个等待状态,它需要接收JobManager发送给它的一个检查点成功保存的信号,一旦它接收到这个信息,那么sink端的这个事务就会提交,预提交阶段所有不可用的数据便会变成可使用的了。
Flink同样提供了一个模板sink:
在这篇文章里面,我聊了聊我理解的有关于状态一致性的内容,总的来说得到了一个结论,那就是精准一次性代价颇高,逼事贼多。而我目前用到的clickhouse既不支持事务、也不支持幂等。所以我还没有解除过重写上述两个接口的过程。但是,在进行实时数据处理的过程中,大多都是在Kafka与Flink之间进行数据的来回传递,分层操作也是如此。所以,我们在下一篇文章中就聊聊,有关于 Kafka -> Flink -> Kafka 之间的状态一致性吧。
7. flink配置和内存
1.Hosts and Ports
metrics.internal.query-service.port "0" String Accepts a list of ports (“50100,50101”), ranges(“50100-50200”) or a combination of both.
rest.bind-address (none) String
rest.bind-port "8081" String
taskmanager.data.port 0 Integer 任务管理器的外部端口,用于数据交换操作。
taskmanager.host (none) String
taskmanager.rpc.port "0" String
2.Fault Tolerance
restart-strategy 重启策略 默认空
restart-strategy.fixed-delay.attempts 固定周期重启
restart-strategy.fixed-delay.delay 固定周期重启
restart-strategy.failure-rate.delay 失败率重启
restart-strategy.failure-rate.failure-rate-interval 失败率重启
restart-strategy.failure-rate.max-failures-per-interval 失败率重启
state.backend.incremental 增量checkpoint(仅对rocksdb支持)
state.backend.local-recovery 状态后端配置本地恢复,默认false,只支持键控状态后端
state.checkpoints.num-retained 保留的最大已完成检查点数,默认1
taskmanager.state.local.root-dirs 本地恢复的根目录
3.High Availability
high-availability 默认为NONE To enable high-availability, set this mode to "ZOOKEEPER" or specify FQN of factory class.
high-availability.cluster-id 高可用flink集群ID
high-availability.storageDir 元数据路径
high-availability.zookeeper.path.root 配置ZK路径
high-availability.zookeeper.quorum 配置ZK集群
4.Memory Configuration
在大多数情况下,用户只需要设置值taskmanager.memory.process.size或taskmanager.memory.flink.size(取决于设置方式),并可能通过调整JVM堆与托管内存的比率taskmanager.memory.managed.fraction。
jobmanager.memory.enable-jvm-direct-memory-limit 是否启用jm进程的JVM直接内存限制,默认false
jobmanager.memory.flink.size 默认none。这包括JobManager消耗的所有内存。非容器配置
jobmanager.memory.heap.size 默认none。默认为总内存减去JVM,network,managed等
jobmanager.memory.jvm-metaspace.size jm的元空间大小,默认256mb
jobmanager.memory.jvm-overhead.fraction jm保留总进程内存的比例,默认0.1
jobmanager.memory.jvm-overhead.max 最大JVM开销,默认1gb
jobmanager.memory.jvm-overhead.min 最小JVM开销,默认192mb
jobmanager.memory.off-heap.size jm的堆外内存,默认128mb,如果第一个参数被启用,这个将生效
jobmanager.memory.process.size JobManager的总进程内存大小。容器化配置这个,为容器总大小
taskmanager.memory.flink.size TaskExecutor的总Flink内存大小。默认none,非容器配置
taskmanager.memory.framework.heap.size TaskExecutor的框架堆内存大小。默认128mb
taskmanager.memory.framework.off-heap.size TaskExecutor的框架堆外内存大小。默认128mb
taskmanager.memory.jvm-metaspace.size TaskExecutor的JVM元空间大小。默认256mb
taskmanager.memory.jvm-overhead.fraction 要为JVM开销保留的总进程内存的分数。默认0.1
taskmanager.memory.jvm-overhead.max TaskExecutor的最大JVM开销最大大小,默认1gb
taskmanager.memory.jvm-overhead.min TaskExecutor的最小JVM开销大小。默认192mb
taskmanager.memory.managed.consumer-weights 消费者权重。DATAPROC(用于流式RocksDB状态后端和批量内置算法)和PYTHON(用于Python进程),默认DATAPROC:70,PYTHON:30
taskmanager.memory.managed.fraction 如果未显式指定托管内存大小,则将用作托管内存的Flink总内存的分数,默认0.4
taskmanager.memory.managed.size TaskExecutor的托管内存大小。默认none
taskmanager.memory.network.fraction 用作网络内存的总Flink内存的分数,默认0.1
taskmanager.memory.network.max TaskExecutor的最大网络内存大小。默认1gb
taskmanager.memory.network.min TaskExecutor的最小网络内存大小。默认64mb
taskmanager.memory.process.size TaskExecutor的总进程内存大小。默认none,容器配置
taskmanager.memory.task.heap.size tm内存,默认none,
taskmanager.memory.task.off-heap.size tm堆外内存,默认0
5.Miscellaneous Options
fs.allowed-fallback-filesystems none
fs.default-scheme none
io.tmp.dirs 'LOCAL_DIRS' on Yarn. '_FLINK_TMP_DIR' on Mesos. System.getProperty("java.io.tmpdir") in standalone.
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_setup_tm.html
8. 4.一文搞定:Flink与Kafka之间的精准一次性
在上一篇文章当中,也算是比较详细且通俗的聊了聊Flink是如何通过checkpoint机制来完成数据精准一次性的实现的。并且也在上一章的结尾表示,要在接下来聊一聊Flink与它的铁哥们Kafaka之间,是如何实现数据的精准一次性消费的。
本次的聊法,还是要通过以kafka(source)->Flink,Flink(source)->Kafka来分别展开讨论。
kafka是一个具有数据保存、数据回放能力的消息队列,说白了就是kafka中的每一个数据,都有一个专门的标记作为标识。而在Flink消费kafka传入的数据的时候,source任务就能够将这个偏移量以算子状态的角色进行保存,写入到设定好的检查点中。这样一旦发生故障,Flink中的FlinkKafkaProce连接器就i能够按照自己保存的偏移量,自己去Kafka中重新拉取数据,也正是通过这种方式,就能够确保Kafka到Flink之间的精准一次性。
在上一篇文章当中,已经表明了,如果想要让输出端能够进行精准一次性消费,就需要使用到幂等性或者是事务。而事务中的两阶段提交是所有方案里面最好的实现。
其实Flink到Kafak之间也是采用了这种方式,具体的可以看一下ctrl进到FlinkKafkaProce连接器内部去看一看:
这也就表明了,当数据通过Flink发送给sink端Kafka的时候,是经历了两个阶段的处理的。第一阶段就是Flink向Kafka中插入数据,进入预提交阶段。当JobManager发送的Ckeckpoint保存成功信号过来之后,才会提交事务进行正式的数据发送,也就是让原来不可用的数据可以被使用了。
这个实现过程到目前阶段就很清晰了,它的主体流程无非就是在开启检查点之后,由JobManager向各个阶段的处理逻辑发送有关于检查点的barrier。所有的计算任务接收到之后,就会根据自己当前的状态做一个检查点保存。而当这个barrier来到sink任务的时候,sink就会开启一个事务,然后通过这个事务向外预写数据。直到Jobmanager来告诉它这一次的检查点已经保存完成了,sink就会进行第二次提交,数据也就算是成功写出了。
1.必须要保证检查点被打开了,如果检查点没有打开,那么之前说的一切话都是空谈。因为Flink默认检查点是关着的。
2.在FlinkKafakProcer连接器的构造函数中要传入参数,这个参数就是用来保证状态一致性的。就是在构造函数的最后一个参数输入如下:
3.配置Kafka读取数据的隔离级别
在kafka中有个配置,这个配置用来管理Kafka读取数据的级别。而这个配置默认是能够读取预提交阶段的数据的,所以如果你没改这个配置,那两阶段提交的第一阶段就是白费了。所以需要改一下这个配置,来更换一下隔离级别:
4.事务超时时间
这个配置也很有意思,大家试想一下。如果要进行两阶段提交,就要保证sink端支持事务,Kafka是支持事务的,但是像这个组件对于很多机制都有一个超时时间的概念,也就是说如果时间到了这个界限还没完成工作,那就会默认这个工作失败。Kafka中由这个概念,Flink中同样由这个概念。但是flink默认超时时间是1小时,而Kafka默认是15分钟,这就有可能出现检查点保存东西的时间大于15分钟,假如说是16分钟保存完成然后给sink发送检查点保存陈功可以提交事务的信号,但是这个时候Kafka已经认为事务失败,把之前的数据都扔了。那数据不就是丢失了么。所以说Kafka的超时时间要大于Flink的超时时间才好。
截止到目前为止,基本上把有关于状态维护的一些东西都说完了,有状态后端、有检查点。还通过检查点完成可端到端的数据精准一次性消费。但是想到这我又感觉,如果有学习进度比我差一些的,万一没办法很好的理解怎么办。所以在下一篇文章当中我就聊聊Flink中的“状态”到底是个什么东西,都有什么类型,都怎么去用。
9. Flink 状态编程
1.流式计算分为无状态和有状态两种情况。 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。例如,流处理应用程序从传感器接收水位数据,并在水位超过指定高度时发出警告。有状态的计算则会基于多个事件输出结果。以下是一些例子。
(1)所有类型的窗口。例如,计算过去一小时的平均水位,就是有状态的计算。
(2)所有用于复杂事件处理的状态机。例如,若在一分钟内收到两个相差20cm以上的水位差读数,则发出警告,这是有状态的计算。
(3)流与流之间的所有关联操作,以及流与静态表或动态表之间的关联操作,都是有状态的计算。
2.下图展示了无状态流处理和有状态流处理的主要区别。 无状态流处理分别接收每条数据记录(图中的黑条),然后根据最新输入的数据生成输出数据(白条)。有状态流处理会维护状态(根据每条输入记录进行更新),并基于最新输入的记录和当前的状态值生成输出记录(灰条)。
上图中输入数据由黑条表示。无状态流处理每次只转换一条输入记录,并且仅根据最新的输入记录输出结果(白条)。有状态 流处理维护所有已处理记录的状态值,并根据每条新输入的记录更新状态,因此输出记录(灰条)反映的是综合考虑多个事件之后的结果。
3.有状态的算子和应用程序
Flink内置的很多算子,数据源source,数据存储sink都是有状态的,流中的数据都是buffer records,会保存一定的元素或者元数据。例如: ProcessWindowFunction会缓存输入流的数据,ProcessFunction会保存设置的定时器信息等等。
在Flink中,状态始终与特定算子相关联。总的来说,有两种类型的状态:
算子状态(operator state)
键控状态(keyed state)
4.算子状态(operator state)
算子状态的作用范围限定为算子任务。这意味着由同一并行任务所处理的所有数据都可以访问到相同的状态,状态对于同一任务而言是共享的。算子状态不能由相同或不同算子的另一个任务访问。
Flink为算子状态提供三种基本数据结构:
列表状态(List state):将状态表示为一组数据的列表。
联合列表状态(Union list state):也将状态表示为数据的列表。它与常规列表状态的区别在于,在发生故障时,或者从保存点(savepoint)启动应用程序时如何恢复。
广播状态(Broadcast state):如果一个算子有多项任务,而它的每项任务状态又都相同,那么这种特殊情况最适合应用广播状态
5.键控状态(keyed state)
键控状态是根据输入数据流中定义的键(key)来维护和访问的。Flink为每个键值维护一个状态实例,并将具有相同键的所有数据,都分区到同一个算子任务中,这个任务会维护和处理这个key对应的状态。当任务处理一条数据时,它会自动将状态的访问范围限定为当前数据的key。因此,具有相同key的所有数据都会访问相同的状态。Keyed State很类似于一个分布式的key-value map数据结构,只能用于KeyedStream(keyBy算子处理之后)。
6.状态后端(state backend)
每传入一条数据,有状态的算子任务都会读取和更新状态。由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地维护其状态,以确保快速的状态访问。状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(state backend)
状态后端主要负责两件事:
1)本地的状态管理
2)将检查点(checkpoint)状态写入远程存储
状态后端分类:
(1)MemoryStateBackend
内存级的状态后端,会将键控状态作为内存中的对象进行管理,将它们存储在TaskManager的JVM堆上;而将checkpoint存储在JobManager的内存中。
(2)FsStateBackend
将checkpoint存到远程的持久化文件系统(FileSystem)上。而对于本地状态,跟MemoryStateBackend一样,也会存在TaskManager的JVM堆上。
(3)RocksDBStateBackend
将所有状态序列化后,存入本地的RocksDB中存储。
7.状态一致性
当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数?
1)一致性级别
在流处理中,一致性可以分为3个级别:
(1) at-most-once : 这其实是没有正确性保障的委婉说法——故障发生之后,计数结果可能丢失。同样的还有udp。
(2) at-least-once : 这表示计数结果可能大于正确值,但绝不会小于正确值。也就是说,计数程序在发生故障后可能多算,但是绝不会少算。
(3) exactly-once : 这指的是系统保证在发生故障后得到的计数结果与正确值一致。
曾经,at-least-once非常流行。第一代流处理器(如Storm和Samza)刚问世时只保证at-least-once,原因有二。
(1)保证exactly-once的系统实现起来更复杂。这在基础架构层(决定什么代表正确,以及exactly-once的范围是什么)和实现层都很有挑战性。
(2)流处理系统的早期用户愿意接受框架的局限性,并在应用层想办法弥补(例如使应用程序具有幂等性,或者用批量计算层再做一遍计算)。
最先保证exactly-once的系统(Storm Trident和Spark Streaming)在性能和表现力这两个方面付出了很大的代价。为了保证exactly-once,这些系统无法单独地对每条记录运用应用逻辑,而是同时处理多条(一批)记录,保证对每一批的处理要么全部成功,要么全部失败。这就导致在得到结果前,必须等待一批记录处理结束。因此,用户经常不得不使用两个流处理框架(一个用来保证exactly-once,另一个用来对每个元素做低延迟处理),结果使基础设施更加复杂。曾经,用户不得不在保证exactly-once与获得低延迟和效率之间权衡利弊。Flink避免了这种权衡。
Flink的一个重大价值在于, 它既保证了 exactly-once ,也具有低延迟和高吞吐的处理能力 。
从根本上说,Flink通过使自身满足所有需求来避免权衡,它是业界的一次意义重大的技术飞跃。尽管这在外行看来很神奇,但是一旦了解,就会恍然大悟。
2)端到端(end-to-end)状态一致性
目前我们看到的一致性保证都是由流处理器实现的,也就是说都是在 Flink 流处理器内部保证的;而在真实应用中,流处理应用除了流处理器以外还包含了数据源(例如 Kafka)和输出到持久化系统。
端到端的一致性保证,意味着结果的正确性贯穿了整个流处理应用的始终;每一个组件都保证了它自己的一致性,整个端到端的一致性级别取决于所有组件中一致性最弱的组件。具体可以划分如下:
1)source端 —— 需要外部源可重设数据的读取位置
2)link内部 —— 依赖checkpoint
3)sink端 —— 需要保证从故障恢复时,数据不会重复写入外部系统
而对于sink端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
4)幂等写入
所谓幂等操作,是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。
5)事务写入
需要构建事务来写入外部系统,构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中。
对于事务性写入,具体又有两种实现方式:预写日志(WAL)和两阶段提交(2PC)。
8.检查点(checkpoint)
Flink具体如何保证exactly-once呢? 它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。
9.Flink+Kafka如何实现端到端的exactly-once语义
我们知道,端到端的状态一致性的实现,需要每一个组件都实现,对于Flink + Kafka的数据管道系统(Kafka进、Kafka出)而言,各组件怎样保证exactly-once语义呢?
1)内部 —— 利用checkpoint机制,把状态存盘,发生故障的时候可以恢复,保证内部的状态一致性
2)source —— kafka consumer作为source,可以将偏移量保存下来,如果后续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据,保证一致性
3)sink —— kafka procer作为sink,采用两阶段提交 sink,需要实现一个TwoPhaseCommitSinkFunction
内部的checkpoint机制我们已经有了了解,那source和sink具体又是怎样运行的呢?接下来我们逐步做一个分析。
我们知道Flink由JobManager协调各个TaskManager进行checkpoint存储,checkpoint保存在 StateBackend中,默认StateBackend是内存级的,也可以改为文件级的进行持久化保存。
当checkpoint 启动时,JobManager 会将检查点分界线(barrier)注入数据流;barrier会在算子间传递下去。
每个算子会对当前的状态做个快照,保存到状态后端。对于source任务而言,就会把当前的offset作为状态保存起来。下次从checkpoint恢复时,source任务可以重新提交偏移量,从上次保存的位置开始重新消费数据。
每个内部的transform 任务遇到 barrier 时,都会把状态存到 checkpoint 里。
sink 任务首先把数据写入外部 kafka,这些数据都属于预提交的事务(还不能被消费);当遇到 barrier 时,把状态保存到状态后端,并开启新的预提交事务。
当所有算子任务的快照完成,也就是这次的 checkpoint 完成时,JobManager 会向所有任务发通知,确认这次 checkpoint 完成。
当sink 任务收到确认通知,就会正式提交之前的事务,kafka 中未确认的数据就改为“已确认”,数据就真正可以被消费了。
所以我们看到,执行过程实际上是一个两段式提交,每个算子执行完成,会进行“预提交”,直到执行完sink操作,会发起“确认提交”,如果执行失败,预提交会放弃掉。
具体的两阶段提交步骤总结如下:
1)第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
2)触发 checkpoint 操作,barrier从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知jobmanager
3)sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
4)jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
5)sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
6)外部kafka关闭事务,提交的数据可以正常消费了。
10. 端到端的精确一次保证
Flink 任务 failover 之后,可能会重复写出数据到 Sink 中,你们公司是怎么做到端对端 exactly-once 的?
端对端 exactly-once 有 3 个条件:
⭐ Source 引擎可以重新消费,比如 Kafka 可以重置 offset 进行重新消费
⭐ Flink 任务配置 exactly-once,保证 Flink 任务 State 的 exactly-once
⭐ Sink 算子支持两阶段或者可重入,保证产出结果的 exactly-once
其中前两项一般大多数引擎都支持,我们需要关注的就是第 3 项,目前有两种常用方法:
⭐ Sink 两阶段:由于两阶段提交是随着 Checkpoint 进行的,假设 Checkpoint 是 5min 做一次,那么数据对下游消费方的可见性延迟至少也是 5min,所以会有数据延迟等问题,目前用的比较少。
⭐ Sink 支持可重入:举例:
⭐ Sink 为 MySQL:可以按照 key update 数据
⭐ Sink 为 Druid:聚合类型可以选用 longMax
⭐ Sink 为 ClickHouse:查询时使用 longMax 或者使用 ReplacingMergeTree 表引擎将重复写入的数据去重,这里有小伙伴会担心 ReplacingMergeTree 会有性能问题,但是博主认为其实性能影响不会很大,因为 failover 导致的数据重复其实一般情况下是小概率事件,并且重复的数据量也不会很大,也只是一个 Checkpoint 周期内的数据重复,所以使用 ReplacingMergeTree 是可以接受的)
⭐ Sink 为 Redis:按照 key 更新数据
其他解答:Flink状态一致性、端到端的精确一次保证
状态一致性:当在分布式系统中引入状态时,自然也引入了一致性问题。一致性实际上是"正确性级别"的另一种说法,也就是说在成功处理故障并恢复之后得到的结果,与没有发生任何故障时得到的结果相比,前者到底有多正确?举例来说,假设要对最近一小时登录的用户计数。在系统经历故障之后,计数结果是多少?如果有偏差,是有漏掉的计数还是重复计数? 对于流处理内部来说,所谓的状态一致性,其实就是我们所说的计算结果保证准确。在遇到故障时可以恢复状态,恢复以后的重新计算,结果应该也是完全正确的。 一条数据不应该丢失,也不应该重复计算
一致性可以分为 3 个级别: at-most-once(最多一次):计数结果可能丢失
at-least-once (至少一次):计数程序在发生故障后可能多算,但是绝不会少算。
exactly-once (精确一次):系统保证在发生故障后得到的计数结果与正确值一致。
数据流(DataStream)内部保证exactly-once (精确一次)的方法:Flink 使用了一种轻量级快照机制 ---- 检查点(checkpoint)来保证 exactly-once 语义
有状态应用的一致检查点,其实就是:所有任务的状态,在某个时间点的一份拷贝(一份快照)。而这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。
端到端保证一致性:
内部保证 —— 依赖 checkpoint
source 端 —— 需要外部源可重设数据的读取位置
sink 端 —— 需要保证从故障恢复时,数据不会重复写入外部系统
而对于 sink 端,又有两种具体的实现方式:幂等(Idempotent)写入和事务性(Transactional)写入。
幂等操作:是说一个操作,可以重复执行很多次,但只导致一次结果更改,也就是说,后面再重复执行就不起作用了。 例如Hashmap 的写入插入操作是幂等的操作,重复写入,写入的结果还一样。
事务写入:构建的事务对应着 checkpoint,等到 checkpoint 真正完成的时候,才把所有对应的结果写入 sink 系统中
对于事务性写入,具体又有两种实现方式: 预写日志(WAL)和两阶段提交(2PC) 。DataStream API 提供了 GenericWriteAheadSink 模板类和TwoPhaseCommitSinkFunction 接口,可以方便地实现这两种方式的事务性写入。其中 预写日志(WAL)只能保证至少一次精确。
Flink+Kafka 端到端状态一致性的保证
内部 —— 利用 checkpoint 机制,把状态存盘,发生故障的时候可以恢复, 保证内部的状态一致性
source —— kafka consumer 作为 source,可以将偏移量保存下来,如果后 续任务出现了故障,恢复的时候可以由连接器重置偏移量,重新消费数据, 保证一致性
sink —— kafka procer 作为 sink,采用两阶段提交 sink,需要实现一个 TwoPhaseCommitSinkFunction
由于端到端保证一致性需要用到两阶段提交(2PC)TwoPhaseCommitSinkFunction,我们来了解一下两阶段提交的方式:
第一条数据来了之后,开启一个 kafka 的事务(transaction),正常写入 kafka 分区日志但标记为未提交,这就是“预提交”
jobmanager 触发 checkpoint 操作,barrier 从 source 开始向下传递,遇到 barrier 的算子将状态存入状态后端,并通知 jobmanager
sink 连接器收到 barrier,保存当前状态,存入 checkpoint,通知 jobmanager,并开启下一阶段的事务,用于提交下个检查点的数据
jobmanager 收到所有任务的通知,发出确认信息,表示 checkpoint 完成
sink 任务收到 jobmanager 的确认信息,正式提交这段时间的数据
外部 kafka 关闭事务,提交的数据可以正常消费了。
我们也可以看到,如果宕机需要通过 StateBackend 进行恢复,只能恢复所有确认提交的操作,之于有关后端状态的选择,后面再单独聊聊