❶ 基于flink sql构建实时数据仓库
根据目前大数据这一块的发展,已经不局限于离线的分析,挖掘数据潜在的价值,数据的时效性最近几年变得刚需,实时处理的框架有storm,spark-streaming,flink等。想要做到实时数据这个方案可行,需要考虑以下几点:1、状态机制 2、精确一次语义 3、高吞吐量 4、可弹性伸缩的应用 5、容错机制,刚好这几点,flink都完美的实现了,并且支持flink sql高级API,减少了开发成本,可用实现快速迭代,易维护等优点。
离线数仓的架构图:
实时数仓架构图:
目前是将实时维度表和DM层数据存于hbase当中,实时公共层都存于kafka当中,并且以写滚动日志的方式写入HDFS(主要是用于校验数据)。其实在这里可以做的工作还有很多,kafka集群,flink集群,hbase集群相互独立,这对整个实时数据仓库的稳定性带来一定的挑战。
一个数据仓库想要成体系,成资产,离不开数据域的划分。所以参考着离线的数据仓库,想着在实时数仓做出这方面的探索,理论上来讲,离线可以实现的,实时也是可以实现的。 并且目前已经取得了成效,目前划分的数据域跟离线大致相同,有流量域,交易域,营销域等等。当然这里面涉及到维表,多事务事实表,累计快照表,周期性快照表的设计,开发,到落地这里就不详述了。
维度表也是整个实时数据仓库不可或缺的部分。从目前整个实时数仓的建设来看,维度表有着数据量大,但是变更少的特点,我们试想过构建全平台的实时商品维度表或者是实时会员维度表,但是这类维度表太过于复杂,所以针对这类维度表下面介绍。还有另外一种就是较为简单的维度表,这类维度可能对应着业务系统单个mysql表,或者只需要几个表进行简单ETL就可以产出的表,这类维表是可以做成实时的。以下有几个实施的关键点:
如下是离线数据同步架构图:
实时数据的接入其实在底层架构是一样的,就是从kafka那边开始不一样,实时用flink的UDTF进行解析,而离线是定时(目前是小时级)用camus拉到HDFS,然后定时load HDFS的数据到hive表里面去,这样来实现离线数据的接入。实时数据的接入是用flink解析kafka的数据,然后在次写入kafka当中去。
由于目前离线数据已经稳定运行了很久,所以实时接入数据的校验可以对比离线数据,但是离线数据是小时级的hive数据,实时数据存于kafka当中,直接比较不了,所以做了相关处理,将kafka的数据使用flink写HDFS滚动日志的形式写入HDFS,然后建立hive表小时级定时去load HDFS中的文件,以此来获取实时数据。
完成以上两点,剩余还需要考虑一点,都是小时级的任务,这个时间卡点使用什么字段呢?首先要确定一点就是离线和实时任务卡点的时间字段必须是一致的,不然肯定会出问题。目前离线使用camus从kafka将数据拉到HDFS上,小时级任务,使用nginx_ts这个时间字段来卡点,这个字段是上报到nginx服务器上记录的时间点。而实时的数据接入是使用flink消费kafka的数据,在以滚动日志的形式写入HDFS的,然后在建立hive表load HDFS文件获取数据,虽然这个hive也是天/小时二级分区,但是离线的表是根据nginx_ts来卡点分区,但是实时的hive表是根据任务启动去load文件的时间点去区分的分区,这是有区别的,直接筛选分区和离线的数据进行对比,会存在部分差异,应当的做法是筛选范围分区,然后在筛选nginx_ts的区间,这样在跟离线做对比才是合理的。
目前实时数据接入层的主要时延是在UDTF函数解析上,实时的UDTF函数是根据上报的日志格式进行开发的,可以完成日志的解析功能。
解析流程图如下:
解析速率图如下:
该图还不是在峰值数据量的时候截的,目前以800记录/second为准,大概一个记录的解析速率为1.25ms。
目前该任务的flink资源配置核心数为1,假设解析速率为1.25ms一条记录,那么峰值只能处理800条/second,如果数据接入速率超过该值就需要增加核心数,保证解析速率。
介绍一下目前离线维度表的情况,就拿商品维度表来说,全线记录数将近一个亿,计算逻辑来自40-50个ods层的数据表,计算逻辑相当复杂,如果实时维度表也参考离线维度表来完成的话,那么开发成本和维护成本非常大,对于技术来讲也是很大的一个挑战,并且目前也没有需求要求维度属性百分百准确。所以目前(伪实时维度表)准备在当天24点产出,当天的维度表给第二天实时公共层使用,即T-1的模式。伪实时维度表的计算逻辑参考离线维度表,但是为了保障在24点之前产出,需要简化一下离线计算逻辑,并且去除一些不常用的字段,保障伪实时维度表可以较快产出。
实时维度表的计算流程图:
目前使用flink作为公司主流的实时计算引擎,使用内存作为状态后端,并且固定30s的间隔做checkpoint,使用HDFS作为checkpoint的存储组件。并且checkpoint也是作为任务restart以后恢复状态的重要依据。熟悉flink的人应该晓得,使用内存作为状态后端,这个内存是JVM的堆内存,毕竟是有限的东西,使用不得当,OOM是常有的事情,下面就介绍一下针对有限的内存,如果完成常规的计算。
❷ 基于Flink的实时计算平台的构建
一、系统架构
1. 接入层
Canal、Flume、Kafka
针对业务系统数据,Canal监控Binlog日志,发送至kafka;
针对日志数据,由Flume来进行统一收集,并发送至kafka。
消息队列的数据既是离线数仓的原始数据,也是实时计算的原始数据,这样可以保证实时和离线的原始数据是统一的。
2. 计算层
Flink
有了源数据,在 计算层 经过Flink实时计算引擎做一些加工处理,然后落地到存储层中不同存储介质当中。
3. 存储层
HBase、Kafka、ES、Mysql、Hive、Redis
不同的 存储介质 是通过不同的应用场景来选择。
4. 数据应用层
风控、模型、图谱、大屏展示
通过存储层应用于不同的 数据应用 ,数据应用可能是我们的正式产品或者直接的业务系统
二、技术实现
1. 计算引擎
实时计算引擎的功能要求
提供高级 API,支持常见的数据操作比如关联聚合,最好是能支持 SQL
具有状态管理和自动支持久化方案,减少对存储的依赖
可靠的容错机制,低延时,最好能够保证Exactly-once
Flink的优势
Flink的API、容错机制与状态管理都满足实时数仓计算引擎的需求
Flink高吞吐、低延时的特性
端到端的Exactly-once
WaterMark&Event Time的支持
Flink 不仅支持了大量常用的 SQL 语句,还有丰富的数据类型、内置函数以及灵活的自定义函数,基本覆盖了我们的开发场景
2. 存储引擎
根据不同的业务场景,使用最适合的存储引擎:
Kafka主要用于中间数据表的存储
ES主要针对日志数据的存储和分析
HBase、Redis可用于维表存储
Hive用于数据校验
Mysql可以用于指标计算结果的存储
三、数据分层
数据源:目前数据源主要是Binlog,通过Canal监控各个业务系统的Mysql,将binlog发送至kafka。
ODS层:主要将Binlog数据存储至Kafka,这一层不对数据进行任何操作,存储最原始的数据,Binlog 日志在这一层为库级别,即:一个库的变更数据存放在同一个 Kafka Topic 中。
DWD层:主要对数据进行简单的清洗。拆分主题,将库级别的主题拆分为表级别;打平数据,将data数组格式打平。
DWS层:主要根据不同的业务的需求,将该需求所涉及到的表进行join所得。
APP层:根据指标计算需求,对数据进行处理后,存储HBase,为了方便模型查询,主要将表存储为索引表和明细表,直接对数据进行指标计算后,将计算结果存储到HBase。
四、数据监控及校验
1. 数据监控
目前数据的监控的架构是pushgateway + Prometheus + Grafana
数据监控主要是接入Flink的Metric,通过Grafana对Flink系统指标及自定义指标进行图形化界面的展示,对关键指标进行监控报警
2. 数据校验
目前数据的监控的架构是Grafana + Mysql
Grafana用于监控指标的展示及相关阈值数据的报警,Mysql主要用于监控数据的存储
将每个服务的source收到的数据、sink发出的数据,根据表的不同将数据关键字段写入mysql中,通过统计各个阶段各个表中的数据条数,对数据完整性进行监控校验,若出现数据缺时,先查找原因,然后指定时间戳重启服务
五、系统管理
元数据管理
表,字段元数据管理,实时感知元数据的变化,大幅度降低使用数据的成本。
系统配置
对应用启动参数及相关配置参数的管理,对任务进行灵活配置及管理。
血缘管理
主要是梳理实时计算平台中数据依赖关系,以及实时任务的依赖关系,从底层ODS到DWD再到DWS,以及APP层用到哪些数据,将整个链度串联起来。
六、问题及解决方案
1. 数据倾斜
由于要拆分主题,要以table为key对数据进行keyBy,但是由于每个表的数据量相差较大,会出现数据倾斜
解决方案:
加盐,给key加前缀
前缀不能随便加,为了保证同一id的数据在相同的分区中,所以根据id_table进行keyBy
2. 数据重复
任务在进行自动或手动重启时,为了保证数据不丢失,数据会出现重复计算的问题,如果下游只是对数据进行HBase存储的话,由于幂等性,这种重复可以解。但是,如果下游要对数据进行聚合,这样会导致数据被计算多次,影响计算结果的准确性
解决方案:
上游在对数据进行发送时,对kafka procer 进行 exactly once的设置
在对数据统计时进行数据去重
3. 数据延时
由于所处理的数据表的大小不一样,处理大表时,会出现数据延时的问题。
解决方案:
针对大表数据增加并行度
4.数据乱序
由于Flink kafka procer默认是根据hash对数据进行随机分区,kafka consumer在对数据进行消费时,每个分区消费速度不同,这样最终在存储数据时,就会出现乱序即相同的id会出现老数据覆盖新数据的问题
解决方案:
对kafka每个阶段进行自定义分区,将id相同的数据分到同一个分区,保证同一id的数据的有序性
由于整个数据处理过程中可能会出现shuffle,导数数据重新乱序,所以在对数据存储前对数据进行排序
对数据进行排序的关键点时要保证每条数据的唯一性,即要有标记数据先后顺序的字段
5 . 数据唯一标记(很重要)
由于要对数据进行去重或者排序,所以要保证数据的唯一性
解决办法:
使用时间戳不可以,因为数据量很大的情况下,同一时间会处理上百条数据
在最初发出数据的时候,为数据打上标记,使用 partition + offset + idx 的组合来确认数据的唯一性及顺序性
6. 数据可靠性
我们对服务重启或对服务升级时,可能会出现数据的丢失
解决方案:
结合Flink 的checkpoint及savepoint机制保证数据的可靠性
开启Flink的checkpoint机制,服务进行自动重启时,会自动读取上次保存在checkpoint中offset,或者我们指定offset进行数据消费
对服务进行升级时,先将服务的状态保存至savepoint中,重启时指定savepoint进行服务启动,保证数据不丢失
7. 无感升级
由于我们目前数据量比较庞大,且在对服务进行升级时,耗时较长,会影响调用方的使用。
解决办法:
在对服务进行升级时,将数据写入备用库,等数据追上且服务稳定运行后,再将存储库进行切换
❸ 《Flink基础教程》pdf下载在线阅读,求百度网盘云资源
《Flink基础教程》([美] 埃伦•弗里德曼)电子书网盘下载免费在线阅读
链接:https://pan..com/s/1tm7Vs-V-SUnv7jA3MMLF0Q
书名:Flink基础教程
作者:[美] 埃伦•弗里德曼
译者:王绍翾
豆瓣评分:6.0
出版社:人民邮电出版社
出版年份:2018-8
页数:96
内容简介:
作为新一代的开源流处理器,Flink是众多大数据处理框架中一颗冉冉升起的新星。它以同一种技术支持流处理和批处理,并能同时满足高吞吐、低延迟和容错的需求。本书由Flink项目核心成员执笔,系统阐释Flink的适用场景、设计理念、功能、用途和性能优势。
作者简介:
埃伦·弗里德曼(Ellen Friedman)
解决方案咨询师,知名大数据相关技术布道师,在流处理架构和大数据处理框架等方面有多部着作。
科斯塔斯·宙马斯(Kostas Tzoumas)
Flink项目核心成员,data Artisans公司联合创始人兼首席执行官,在流处理和数据科学领域经验丰富。
译者介绍
王绍翾
阿里巴巴资深技术专家,Apache Flink Committer,淘宝花名“大沙”。毕业于北京大学信息科学技术学院,后取得加州大学圣地亚哥分校计算机工程博士学位。目前就职于阿里巴巴计算平台事业部,负责Flink SQL引擎及机器学习的相关开发。加入阿里巴巴之前,在Facebook开发分布式图存储系统TAO。曾多次拜访由Flink创始团队创办的公司data Artisans,并与其首席执行官科斯塔斯·宙马斯(本书作者之一)以及首席技术官斯蒂芬·尤恩有着广泛的合作。
❹ 实时计算组件有哪些
实时计算的组件有很多,数据采集组件及中间件:Flume、Sqoop、Kafka、Logstash、Splunk等。大数据集群核心组件:Hadoop、Hive、Impala、HBase、Spark(Core、SQL、Streaming、MLlib)、Flink、Zookeeper等,大概如下:
数据从底层的数据源开始,经过Kafka、Flume等数据组件进行收集,然后分成两条线进行计算:
一条线是进入流式计算平台(例如 Storm、Flink或者SparkStreaming),去计算实时的一些指标;
另一条线进入批量数据处理离线计算平台(例如Maprece、Hive,Spark SQL),去计算T+1的相关业务指标,这些指标需要隔日才能看见。
这就是实时计算所需的组件了。
❺ flink 1.10 1.12区别
flink 1.10 1.12区别在于Flink 1.12 支持了 Flink SQL Kafka upsert connector 。
因为在 Flink 1.10 中,当前这类任务开发对于用户来说,还是不够友好,需要很多代码,同时也会造成 Flink SQL 冗长。
Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,这也是我们公司内部业务方对实时平台提出的需求。
收益:便利用户有这种需要从 kafka 取最新记录操作的实时任务开发,比如这种 binlog -> kafka,然后用户聚合操作,这种场景还是非常多的,这能提升实时作业开发效率,同时 1.12 做了优化,性能会比单纯的 last_value 性能要好。
Flink Yarn 作业 On k8s 的生产级别能力是:
Flink Jar 作业已经全部 K8s 化,Flink SQL 作业由于是推广初期,还是在 Yarn 上面进行运行,为了将实时计算 Flink 全部K8s化。
所以我们 Flink SQL 作业也需要迁移到 K8s,目前 Flink 1.12 已经满足生产级别的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社区的 On k8s 能力。
风险:虽然和社区的人沟通,Flink 1.12 on k8s 没有什么问题,但是具体功能还是需要先 POC 验证一下,同时可能社区 Flink on k8s 的能力。
可能会限制我们这边一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,这里可能需要改底层源码来进行快速支持(社区有相关 JIRA 要做)。
❻ flink窗口的种类及详述
flink窗口的种类及详述:
滚动窗口(tumblingwindow)将事件分配到长度固定且互不重叠的桶中。
实际案例:简单且常见的分维度分钟级别同时在线用户数、总销售额
Java设置语句:window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
该语句为设置滚动窗口的窗口时长为5秒钟
sql设置语句:FROM TABLE(TUMBLE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND))
Windowing TVF 滚动窗口的写法就是把 tumble window 的声明写在了数据源的 Table 子句中,即 TABLE(TUMBLE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND)),包含三部分参数。
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明滚动窗口大小为 1 min
滑动窗口:分配器将每个元素分配给固定窗口大小的窗口。与滚动窗口分配器类似,窗口的大小由 window size 参数配置。还有一个window slide参数用来控制滑动窗口的滑动大小。因此,如果滑动大小小于窗口大小,则滑动窗口会重叠。在这种情况下,一个元素会被分配到多个窗口中。
实际案例:简单且常见的分维度分钟级别同时在线用户数,1 分钟输出一次,计算最近 5 分钟的数据
java设置语句:window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
window size :窗口大小为 10秒钟
window slide:窗口间隔为5秒钟
sql设置语句: hop(row_time, interval '1' minute, interval '5' minute)
第一个参数为事件时间的时间戳;第二个参数为滑动窗口的滑动步长;第三个参数为滑动窗口大小。
会话窗口:分配器通过活动会话对元素进行分组。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。当会话窗口在一段时间内没有接收到元素时会关闭。会话窗口分配器需要配置一个会话间隙,定义了所需的不活动时长。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
实际案例:计算每个用户在活跃期间(一个 Session)总共购买的商品数量,如果用户 5 分钟没有活动则视为 Session 断开
设置语句:基于事件时间的会话窗口window(EventTimeSessionWindows.withGap(Time.minutes(10)))
基于处理时间的会话窗口
Java设置:window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
会话间隙,不活动时长为10秒钟
sql设置:session(row_time, interval '5' minute)
Group Window Aggregation 中 Session 窗口的写法就是把 session window 的声明写在了 group by 子句中
Session 窗口即支持 处理时间 也支持 事件时间。但是处理时间只支持在 Streaming 任务中运行,Batch 任务不支持。
渐进式窗口:在其实就是 固定窗口间隔内提前触发的的滚动窗口,其实就是 Tumble Window + early-fire 的一个事件时间的版本。例如,从每日零点到当前这一分钟绘制累积 UV,其中 10:00 时的 UV 表示从 00:00 到 10:00 的 UV 总数。渐进式窗口可以认为是首先开一个最大窗口大小的滚动窗口,然后根据用户设置的触发的时间间隔将这个滚动窗口拆分为多个窗口,这些窗口具有相同的窗口起点和不同的窗口终点。
应用场景:周期内累计 PV,UV 指标(如每天累计到当前这一分钟的 PV,UV)。这类指标是一段周期内的累计状态,对分析师来说更具统计分析价值,而且几乎所有的复合指标都是基于此类指标的统计(不然离线为啥都要累计一天的数据,而不要一分钟累计的数据呢)。
实际案例:每天的截止当前分钟的累计 money(sum(money)),去重 id 数(count(distinct id))。每天代表渐进式窗口大小为 1 天,分钟代表渐进式窗口移动步长为分钟级别
sql设置:FROM TABLE(CUMULATE(
TABLE source_table
, DESCRIPTOR(row_time)
, INTERVAL '60' SECOND
, INTERVAL '1' DAY))
Windowing TVF 滚动窗口的写法就是把 cumulate window 的声明写在了数据源的 Table 子句中,即 TABLE(CUMULATE(TABLE source_table, DESCRIPTOR(row_time), INTERVAL '60' SECOND, INTERVAL '1' DAY)),其中包含四部分参数:
第一个参数 TABLE source_table 声明数据源表;第二个参数 DESCRIPTOR(row_time) 声明数据源的时间戳;第三个参数 INTERVAL '60' SECOND 声明渐进式窗口触发的渐进步长为 1 min。第四个参数 INTERVAL '1' DAY 声明整个渐进式窗口的大小为 1 天,到了第二天新开一个窗口重新累计
全局窗口:分配器将具有相同 key 的所有元素分配给同一个全局窗口。仅当我们指定自定义触发器时,窗口才起作用。否则,不会执行任何计算,因为全局窗口没有我们可以处理聚合元素的自然结束的点(译者注:即本身自己不知道窗口的大小,计算多长时间的元素)
window(GlobalWindows.create())
平时滑动窗口用得比较多,其次是滚动窗口
❼ flink sql 知其所以然(十三):流 join问题解决
本节是 flink sql 流 join 系列的下篇,上篇的链接如下:
废话不多说,咱们先直接上本文的目录和结论,小伙伴可以先看结论快速了解博主期望本文能给小伙伴们带来什么帮助:
书接上文,上文介绍了曝光流在关联点击流时,使用 flink sql regular join 存在的 retract 问题。
本文介绍怎么使用 flink sql interval join 解决这些问题。
flink sql 知其所以然(十二):流 join 很难嘛???(上)
看看上节的实际案例,来看看在具体输入值的场景下,输出值应该长啥样。
场景:即常见的曝光日志流(show_log)通过 log_id 关联点击日志流(click_log),将数据的关联结果进行下发。
来一波输入数据:
曝光数据:
点击数据:
预期输出数据如下:
上节的 flink sql regular join 解决方案如下:
上节说道,flink sql left join 在流数据到达时,如果左表流(show_log)join 不到右表流(click_log) ,则不会等待右流直接输出(show_log,null),在后续右表流数据代打时,会将(show_log,null)撤回,发送(show_log,click_log)。这就是为什么产生了 retract 流,从而导致重复写入 kafka。
对此,我们也是提出了对应的解决思路,既然 left join 中左流不会等待右流,那么能不能让左流强行等待右流一段时间,实在等不到在数据关联不到的数据即可。
当当当!!!
本文的 flink sql interval join 登场,它就能等。
大家先通过下面这句话和图简单了解一下 interval join 的作用(熟悉 DataStream 的小伙伴萌可能已经使用过了),后续会详细介绍原理。
interval join 就是用一个流的数据去关联另一个流的一段时间区间内的数据。关联到就下发关联到的数据,关联不到且在超时后就根据是否是 outer join(left join,right join,full join)下发没关联到的数据。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">interval join</figcaption>
来看看上述案例的 flink sql interval join sql 怎么写:
这里设置了 show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE 代表 show_log 表中的数据会和 click_log 表中的 row_time 在前后 10 分钟之内的数据进行关联。
运行结果如下:
如上就是我们期望的正确结果了。
flink web ui 算子图如下:
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">flink web ui</figcaption>
那么此时你可能有一个问题,结果中的前两条数据 join 到了输出我是理解的,那当 show_log join 不到 click_log 时为啥也输出了?原理是啥?
博主带你们来定位到具体的实现源码。先看一下 transformations。
<figcaption style="margin: 5px 0px 0px; padding: 0px; outline: 0px; max-width: 100%; box-sizing: border-box !important; overflow-wrap: break-word !important; text-align: center; color: rgb(136, 136, 136); font-size: 12px; font-family: PingFangSC-Light;">transformations</figcaption>
可以看到事件时间下 interval join 的具体 operator 是 org.apache.flink.table.runtime.operators.join. 。
其核心逻辑就集中在 processElement1 和 processElement2 中,在 processElement1 和 processElement2 中使用 org.apache.flink.table.runtime.operators.join.interval.RowTimeIntervalJoin 来处理具体 join 逻辑。 RowTimeIntervalJoin 重要方法如下图所示。
TimeIntervalJoin
下面详细给大家解释一下。
join 时,左流和右流会在 interval 时间之内相互等待,如果等到了则输出数据[+(show_log,click_log)],如果等不到,并且另一条流的时间已经推进到当前这条数据在也不可能 join 到另一条流的数据时,则直接输出[+(show_log,null)],[+(null,click_log)]。
举个例子, show_log.row_time BETWEEN click_log.row_time - INTERVAL ཆ' MINUTE AND click_log.row_time + INTERVAL ཆ' MINUTE , 当 click_log 的时间推进到 2021-11-01 11:00:00 时,这时 show_log 来一条 2021-11-01 02:00:00 的数据, 那这条 show_log 必然不可能和 click_log 中的数据 join 到了,因为 click_log 中 2021-11-01 01:50:00 到 2021-11-01 02:10:00 之间的数据以及过期删除了。则 show_log 直接输出 [+(show_log,null)]
以上面案例的 show_log(左表) interval join click_log(右表) 为例(不管是 inner interval join,left interval join,right interval join 还是 full interval join,都会按照下面的流程执行):
上面只是左流 show_log 数据到达时的执行流程(即 ProcessElement1 ),当右流 click_log 到达时也是完全类似的执行流程(即 ProcessElement2 )。
小伙伴萌在使用 interval join 需要注意的两点事项:
本文主要介绍了 flink sql interval 是怎么避免出现 flink regular join 存在的 retract 问题的,并通过解析其实现说明了运行原理,博主期望你读完本文之后能了解到:
❽ flinksql自定义topN函数的代码
摘要 当前 Flink 有如下几种函数:
❾ flink sql 近3天登录次数
flink sql 近3天登录次数如下
1、获取最近七天活跃的用户,并对用户活跃日期进行排序。
2、计算用户活跃日期与排名的差值。
3、对用户及差值进行分组。
4、统计差值个数取出差值个数大于3的数据(即连续登陆三天以上的用户)。
5、对数据进行去重。
❿ flinksql自定义topN函数的代码
摘要 package day07;