精华内容
下载资源
问答
  • emtumble:使用C ++编写的Turing Tumble模拟器
  • 常用的窗口 实战篇-简单的 tumble window 案例和运行原理 先看一个 datastream 窗口案例 flink sql tumble window 的语义 tumble window 实际案例 GeneratedWatermarkGenerator - flink 1.12.1 ...

    图片

    感谢您的小爱心(关注  +  点赞 + 再看),对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!

    1.序篇-本文结构

    大数据羊说

    大数据羊说

    用数据提升美好事物发生的概率~

    34篇原创内容

    公众号

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    针对 datastream api 大家都比较熟悉了,还是那句话,在 datastream 中,你写的代码逻辑是什么样的,它最终的执行方式就是什么样的。

    但是对于 flink sql 的执行过程,大家还是不熟悉的。上节使用 ETL,group agg(sum,count等)简单聚合类 query 带大家走进一条 flink sql query 逻辑的世界。帮大家至少能够熟悉在 flink sql 程序运行时知道 flink 程序在干什么。

    此节就是窗口聚合章节的第一篇,以一个最简单、最常用的分钟 tumble window 聚合案例给大家介绍其使用方式和原理。

    由于 flink 1.13 引入了 window tvf,所以 1.13 和 1.12 及之前版本的实现不同。本节先介绍 flink 1.12 及之前的 tumble window 实现。这也是大家在引入 flink sql 能力时最常使用的。

    本节依然从以下几个章节给大家详细介绍 flink sql 的能力。

    1. 目标篇-本文能帮助大家了解 flink sql 什么?
    • 回顾上节的 flink sql 适用场景的结论
    1. 概念篇-先聊聊常见的窗口聚合
    • 窗口竟然拖慢数据产出?

    • 常用的窗口

    1. 实战篇-简单的 tumble window 案例和运行原理
    • 先看一个 datastream 窗口案例

    • flink sql tumble window 的语义

    • tumble window 实际案例

    • GeneratedWatermarkGenerator - flink 1.12.1

    • BinaryRowDataKeySelector - flink 1.12.1

    • AggregateWindowOperator - flink 1.12.1

    1. 总结与展望篇

    先说说结论,以下这些结论已经在上节说过了,此处附上上节文章:

    1. 场景问题:flink sql 很适合简单 ETL,以及基本全部场景下的聚合类指标(本节要介绍的 tumble window 就在聚合类指标的范畴之内)。

    2. 语法问题:flink sql 语法其实是和其他 sql 语法基本一致的。基本不会产生语法问题阻碍使用 flink sql。但是本节要介绍的 tumble window 的语法就是略有不同的那部分。下面详细介绍。

    3. 运行问题:查看 flink sql 任务时的一些技巧,以及其中一些可能会碰到的坑:

    • 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。

    • sql 的 watermark 类型要设置为 TIMESTAMP(3)。

    • 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    2.目标篇-本文能帮助大家了解 flink sql tumble window 什么?

    关于 flink sql tumble window 一般都会有以下问题。本文的目标也是为大家解答这些问题:

    1. 场景问题:场景问题就不必多说,datastream 在 tumble window 场景下的应用很多了,分钟级别聚合等常用场景

    2. 语法问题:flink sql 写 tumble window 任务时是一种与 hive sql 中没有的语法。下文详细介绍。

    3. 运行问题:使用一条简单的 tumble window sql 帮大家从 transformation、runtime 帮大家理解 tumble window 的整体运行机制。

    4. 理解误区:既然是 sql 必然要遵循 sql 语义,sql tumble window 聚合是输入多条,产出一条数据。并不像 datastream 那样可以在窗口 udf 中做到多对多。

    在正式开始聊 tumble window 之前,先看看上节 flink sql 适用场景的结论。让大家先有 flink sql 的一个整体印象以及结论。

    2.1.回顾上节的 flink sql 适用场景的结论

    不装了,我坦白了,flink sql 其实很适合干的活就是 dwd 清洗,dws 聚合。

    此处主要针对实时数仓的场景来说。flink sql 能干 dwd 清洗,dws 聚合,基本上实时数仓的大多数场景都能给覆盖了。

    flink sql 牛逼!!!

    但是!!!

    经过博主使用 flink sql 经验来看,并不是所有的 dwd,dws 聚合场景都适合 flink sql(截止发文阶段来说)!!!

    其实这些目前不适合 flink sql 的场景总结下来就是在处理上比 datastream 还是会有一定的损失。

    先总结下使用场景:

    1. dwd:简单的清洗、复杂的清洗、维度的扩充、各种 udf 的使用

    2. dws:各类聚合

    然后分适合的场景和不适合的场景来说,因为只这一篇不能覆盖所有的内容,所以本文此处先大致给个结论,之后会结合具体的场景详细描述。

    • 适合的场景:
    1. 简单的 dwd 清洗场景

    2. 全场景的 dws 聚合场景

    • 目前不太适合的场景:
    1. 复杂的 dwd 清洗场景:举例比如使用了很多 udf 清洗,尤其是使用很多的 json 类解析清洗

    2. 关联维度场景:举例比如 datastream 中经常会有攒一批数据批量访问外部接口的场景,flink sql 目前对于这种场景虽然有 localcache、异步访问能力,但是依然还是一条一条访问外部缓存,这样相比批量访问还是会有性能差距。

    3.概念篇-先聊聊常见的窗口聚合

    窗口聚合大家都在 datastream api 中很熟悉了,目前在实时数据处理的过程中,窗口计算可以说是最重要、最常用的一种计算方式了。

    但是在抛出窗口概念之前,博主有几个关于窗口的小想法说一下。

    3.1.窗口竟然拖慢数据产出?

    一个小想法。

    先抛结论:窗口会拖慢实时数据的产出,是在目前下游分析引擎能力有限的情况下的一种妥协方案。

    站在数据开发以及需求方的世界中,当然希望所有的数据都是实时来的,实时处理的,实时产出的,实时展现的

    举个例子:如果你要满足一个一分钟窗口聚合的 pv,uv,或者其他聚合需求。

    olap 数据服务引擎 就可以满足上述的实时来的,实时处理的,实时产出的,实时展现的的场景。flink 消费处理明细数据,产出到 kafka,然后直接导入到 olap 引擎中。查询时直接用 olap 做聚合。这其中是没有任何窗口的概念的。但是整个链路中,要保障端对端精确一次,要保障大数据量情况下 olap 引擎能够秒级查询返回,更何况有一些去重类指标的计算,等等场景。把这些压力都放在 olap 引擎的压力是很大的。

    因此在 flink 数据计算引擎中就诞生了窗口的概念。我们可以直接在计算引擎中进行窗口聚合计算,然后等到窗口结束之后直接把结果数据产出。这就出现了博主所说的窗口拖慢了实时数据产出的情况。而且窗口在处理不好的情况下可能会导致数据丢失。

    关于上述两种情况的具体优劣选择,都由大家自行选择。上述只是引出博主一些想法。

    3.2.常用的窗口

    目前已知的窗口分为以下四种。

    1. Tumble Windows2. Hop Windows3. Cumulate Windows****4. Session Windows

    这些窗口的具体描述直接见官网,有详细的说明。此处不赘述。

    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/

    此处介绍下 flink 中常常会涉及到的两个容易混淆的概念就是:窗口 + key。这里来形象的说明下。

    • 窗口:时间周期上面的划分。将无限流进行纵向切分,将无限流切分为一个一个的窗口,窗口相当于是无限流中的一段时间内的数据。

    • key:数据类别上面的划分。将无限流进行横向划分,相同 key 的数据会被划分到一组中,这个 key 的数据也是一条无限流。

    如下图所示。

    图片

    1

    4.实战篇-简单的 tumble window 案例和运行原理

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    4.1.先看一个 datastream 窗口案例

    在介绍 sql tumble window 窗口算子执行案例之前,先看一个 datastream 中的窗口算子案例。其逻辑都是相通的。会对我们了解 sql tumble window 算子有帮助。

    我们先看看 datastream 处理逻辑。

    以下面这个为例。

    public class _04_TumbleWindowTest {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    
            env.setParallelism(1);
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.addSource(new UserDefinedSource())
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
                            return element.f3;
                        }
                    })
                    .keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
                        @Override
                        public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
                            return row.f0;
                        }
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .sum(2)
                    .print();
    
            env.execute("1.12.1 DataStream TUMBLE WINDOW 案例");
        }
    
        private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {
    
            private volatile boolean isCancel;
    
            @Override
            public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {
    
                while (!this.isCancel) {
    
                    sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));
    
                    Thread.sleep(10L);
                }
    
            }
    
            @Override
            public void cancel() {
                this.isCancel = true;
            }
        }
    }
    
    

    datastream 生产的具体的 transformation 如下图:

    图片

    24

    其中我们只关注最重要的 WindowOperator 算子。

    图片

    25

    其中 WindowOperator 算子包含的重要属性如下图。

    图片

    26

    来看看 WindowOperator 的执行逻辑。窗口执行的整体详细流程可以参考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

    图片

    23

    4.2.flink sql tumble window 的语义

    介绍到 tumble window 的语义,总要有对比的去介绍。这里的参照物就是 datastream api。

    在 datastream api 中。tumble window 一般用作以下两种场景。

    1. 业务场景:使用 tumble window 很轻松的计算出窗口内的聚合数据。一般是多条输入数据,窗口结束时一条输出数据。

    2. 优化场景:窗口聚合一批数据然后批量访问外部存储扩充维度、或者有一些自定义的处理逻辑。一般是多条输入数据,窗口结束时多条输出数据。

    但是在 sql api 中。tumble window 是聚合(group by)语义,聚合在 sql 标准中的数据处理逻辑是多条输入,在窗口触发时就输出一条数据的语义。而上面的常常用在 datastream 中的优化场景是多对多的场景。因此和 sql 语义不符合。所以 flink sql tumble window 一般都是用于计算聚合运算值来使用。

    4.3.tumble window 实际案例

    滚动窗口的特性就是会将无限流进行纵向划分成一个一个的窗口,每个窗口都是相同的大小,并且不重叠。

    图片

    22

    本文主要介绍 flink 1.12 及之前版本的实现。下一篇文章介绍 flink 1.13 的实现。

    来,在介绍原理之前,总要先用起来,我们就以下面这个例子展开。

    1.(flink 1.12.1)场景:简单且常见的分维度分钟级别同时在线用户数、总销售额

    数据源表:

    CREATE TABLE source_table (
        -- 维度数据
        dim STRING,
        -- 用户 id
        user_id BIGINT,
        -- 用户
        price BIGINT,
        -- 事件时间戳
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        -- watermark 设置
        WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.dim.length' = '1',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100000',
      'fields.price.min' = '1',
      'fields.price.max' = '100000'
    )
    
    

    **Notes - 关于 watermark 容易踩得坑:**sql 的 watermark 类型必须要设置为 TIMESTAMP(3)

    数据汇表:

    CREATE TABLE sink_table (
        dim STRING,
        pv BIGINT,
        sum_price BIGINT,
        max_price BIGINT,
        min_price BIGINT,
        uv BIGINT,
        window_start bigint
    ) WITH (
      'connector' = 'print'
    )
    
    

    数据处理逻辑:

    可以看下下面语法,窗口聚合的写法有专门的 tumble(row_time, interval '1' minute) 写法,这就是与平常我们写的 hive sql,mysql 等不一样的地方。

    insert into sink_table
    select dim,
           sum(bucket_pv) as pv,
           sum(bucket_sum_price) as sum_price,
           max(bucket_max_price) as max_price,
           min(bucket_min_price) as min_price,
           sum(bucket_uv) as uv,
           max(window_start) as window_start
    from (
         select dim,
                count(*) as bucket_pv,
                sum(price) as bucket_sum_price,
                max(price) as bucket_max_price,
                min(price) as bucket_min_price,
                -- 计算 uv 数
                count(distinct user_id) as bucket_uv,
                cast(tumble_start(row_time, interval '1' minute) as bigint) * 1000 as window_start
         from source_table
         group by
                -- 按照用户 id 进行分桶,防止数据倾斜
                mod(user_id, 1024),
                dim,
                tumble(row_time, interval '1' minute)
    )
    group by dim,
             window_start
    
    

    2.运行:可以看到,其实在 flink sql 任务中,其会把对应的处理逻辑给写到算子名称上面。

    **Notes - 观察 flink sql 技巧 1:**这个其实就是我们观察 flink sql 任务的第一个技巧。如果你想知道你的 flink 任务在干啥,第一反应是去 flink webui 看看这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑

    先看一下整个算子图,如下图。从左到右总共分为三个算子。

    1. 第一个算子就是数据源算子

    2. 第二个算子就是分了桶的窗口聚合算子,第一个算子和第二个算子之间 hash 传输就是按照 group key 进行 hash 传输

    3. 第三个算子就是外层进行合桶计算的算子,同样也是 hash 传输,将分桶的数据在一个算子中进行合并计算

    图片

    5

    来看看每一个算子具体做了什么事情。

    第一个算子:

    1. table scan 读取数据源

    2. 从数据源中获取对应的字段(包括源表定义的 rowtime)

    3. 分配 watermark(按照源表定义的 watermark 分配对应的 watermark)

    4. 将一些必要的字段抽取。比如 group by 中的字段。在 hash 时需要使用。

    图片

    6

    第二个算子:

    1. 窗口聚合,计算窗口聚合数据

    2. 将数据按照第一层 select 中的数据进行计算以及格式化

    图片

    7

    第三个算子:

    1. group 聚合合桶计算

    2. 将数据按照第二层 select 中的数据进行计算以及格式化

    3. 将数据 sink 写出

    图片

    8

    3.(flink 1.12.1)结果:

    +I(9,1,32682,32682,32682,1,1631026440000)
    -U(9,1,32682,32682,32682,1,1631026440000)
    +U(9,2,115351,82669,32682,2,1631026440000)
    +I(2,1,76148,76148,76148,1,1631026440000)
    +I(8,1,79321,79321,79321,1,1631026440000)
    +I(a,1,85792,85792,85792,1,1631026440000)
    +I(0,1,12858,12858,12858,1,1631026440000)
    +I(5,1,36753,36753,36753,1,1631026440000)
    +I(3,1,19218,19218,19218,1,1631026440000)
    ...
    
    

    4.(flink 1.12.1)原理:

    关于 sql 开始运行的机制见上一节详述。

    此处只介绍相比前一节新增内容。可以看到上述代码的具体 transformation 如下图。

    图片

    9

    4.4.GeneratedWatermarkGenerator - flink 1.12.1

    按照顺序,首先看看 watermark 算子。同 datastream 的自定义 watermark 分配策略。

    图片

    10

    watermark 生成的具体代码 WatermarkGenerator$6,主要获取 watermark 的逻辑在 currentWatermark 方法中。如下图。

    图片

    11

    4.5.BinaryRowDataKeySelector - flink 1.12.1

    接着就是 group by(同 datastream 中的 keyby)。

    图片

    12

    group by key 生成的具体代码 KeyProjection$19,主要逻辑在 apply 方法中。

    图片

    13

    下一个就是窗口聚合算子。

    4.6.AggregateWindowOperator - flink 1.12.1

    兄弟们!!!兄弟们!!!兄弟们!!!

    本节的重头戏来了。sql 窗口聚合算子解析搞起来了。

    关于 WatermarkGeneratorKeyProjection 没有什么可以详细介绍的,都是输入一条数据,输出一条数据,逻辑很简单。

    但是窗口聚合算子的计算逻辑相比上面两个算子复杂很多。窗口算子又承载了窗口聚合的主要逻辑,所以本文重点介绍窗口算子计算的逻辑。

    先来看看 sql 窗口整体处理流程。其实与 datastream 处理流程基本一致,但只是少了 Evictor。如下图所示。

    图片

    40

    接着来看看上述 sql 生成的窗口聚合算子 AggregateWindowOperator,截图中属性也很清晰。

    图片

    16

    图片

    14

    具体生成的窗口聚合代码 GroupingWindowAggsHandler$59

    图片

    41

    计算逻辑 GroupingWindowAggsHandler$59#accumulate

    图片

    42

    图片

    43

    上面那段都是在 flink 客户端初始化处理的。包括窗口算子的初始化等。

    下面这段处理逻辑是在 flink TM 运行时开始执行的,包括窗口算子资源的初始化以及运行逻辑。就到了正式的数据处理环节了。

    窗口算子 Task 运行。

    图片

    27

    窗口算子 Task 初始化。

    图片

    28

    StreamTask 整体的处理流程。

    图片

    29

    窗口算子 open 初始化。

    图片

    30

    图片

    31

    窗口算子 open 初始化后的结果。如下图,对应的具体组件。

    图片

    32

    初始化完成之后,开始处理具体数据。

    图片

    图片

    循环 loop,一直 run 啊 run。

    图片

    35

    判断记录的具体类型,然后执行不同的逻辑。

    图片

    36

    来看看处理一条数据的 processElement 方法逻辑,进行 acc 处理。代码中的的 windowAggregator 就是之前代码生成的 GroupingWindowAggsHandler$59

    **Notes:**事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    图片

    39

    来看看 watermark 到达并且触发窗口计算时,执行 onEventTime 逻辑。

    图片

    38

    触发窗口计算时,onEventTime -> emitWindowResult,产出具体数据。

    图片

    17

    至此整个 sql tumble window 的处理逻辑也就很清楚了。和 datastream 基本上都是一致的。是不是整个逻辑就理清楚了。

    5.总结与展望篇

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    大数据羊说

    大数据羊说

    用数据提升美好事物发生的概率~

    34篇原创内容

    公众号

    本文主要介绍了 tumble window 聚合类指标的常见场景案例以及其底层运行原理。

    而且也介绍了在查看 flink sql 任务时的一些技巧:

    1. 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。

    2. sql 的 watermark 类型要设置为 TIMESTAMP(3)。

    3. 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    后续文章会基于一些最常见的案例以及原理层面介绍 1.13 版本的 flink sql tumble window(基于最新的 window tvf)。

    希望大家能持续关注。支持博主。喜欢的请关注 + 点赞 + 再看。

    往期推荐

    [

    flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247489235&idx=1&sn=66c2b95aa3e22069a12b3d53b6d1d9f3&chksm=c1549a2bf623133d7a75732b5cea4bc304bf06a53777963d5cac8406958114873294dc3e4699&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247489112&idx=1&sn=21e86dab0e20da211c28cd0963b75ee2&chksm=c1549aa0f62313b6674833cd376b2a694752a154a63532ec9446c9c3013ef97f2d57b4e2eb64&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(五)| 自定义 protobuf format

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488994&idx=1&sn=20236350b1c8cfc4ec5055687b35603d&chksm=c154991af623100c46c0ed224a8264be08235ab30c9f191df7400e69a8ee873a3b74859fb0b7&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(四)| sql api 类型系统

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488788&idx=1&sn=0127fd4037788762a0401313b43b0ea5&chksm=c15499ecf62310fa747c530f722e631570a1b0469af2a693e9f48d3a660aa2c15e610653fe8c&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488720&idx=1&sn=5695e3691b55a7e40814d0e455dbe92a&chksm=c1549828f623113e9959a382f98dc9033997dd4bdcb127f9fb2fbea046545b527233d4c3510e&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488635&idx=1&sn=41817a078ef456fb036e94072b2383ff&chksm=c1549883f623119559c47047c6d2a9540531e0e6f0b58b155ef9da17e37e32a9c486fe50f8e3&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(一)| source\sink 原理

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

    更多 Flink 实时大数据分析相关技术博文,视频。后台回复 “flink” 或者 “flink sql” 获取。

    点个赞+在看,感谢您的肯定 👇
    
    展开全文
  • Tumble Pop-crx插件

    2021-04-07 11:23:15
    穿越邪教经典Tumble Pop难度越来越高的关卡 在Tumble Pop New Tab Pages with Wallpapers中与视频游戏史上最著名的英雄一起玩。 使用方法-只需点击添加到Chrome,它就会被自动安装添加。 -您可以在网络上搜索更多...
  • Error resuming tumble

    2020-12-29 08:38:12
    <div><p>I started a tumble via the UI. I lost power and resumed. It sat for several days and never completed. I have three steps left on the schedule. Here is the error I am seeing. I've removed ...
  • flink学习详细笔记

    2020-10-28 16:38:28
    包含了基础概念 数据源读取和数据源处理算子详解,时间窗口,cep算子和详解,让你从0步入数据分析!!!!!!
  • flink动态表中的窗口

    2021-03-05 16:27:52
    TUMBLE_START(time_attr, interval) TUMBLE_END(time_attr, interval) TUMBLE_ROWTIME(time_attr, interval) TUMBLE_PROCTIME(time_attr, interval) 接下来就是具体的例子 Group Windows (1)ProcessTime 1)...

    flink动态表中的窗口

    Flink Window作用
    GroupWindow对window中的数据按照字段进行分组
    OverWindow在整个Window窗口的条件下,对数据进行统计操作等

    Table API和SQL需要引入的依赖有两个:planner和bridge。

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.12</artifactId>
        <version>1.10.1</version>
    </dependency>
    

    滚动窗口

    TableAPI方式

    滚动窗口(Tumbling windows)要用Tumble类来定义,另外还有三个方法:
    over:定义窗口长度
    on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    as:别名,必须出现在后面的groupBy中

    // Tumbling Event-time Window
    .window(Tumble.over("10.minutes").on("rowtime").as("w"))
    // Tumbling Processing-time Window
    .window(Tumble.over("10.minutes").on("proctime").as("w"))
    // Tumbling Row-count Window
    .window(Tumble.over("10.rows").on("proctime").as("w"))
    

    滑动窗口

    TableAPI方式

    滑动窗口(Sliding windows)要用Slide类来定义,另外还有四个方法:
    over:定义窗口长度
    every:定义滑动步长
    on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    as:别名,必须出现在后面的groupBy中

    // Sliding Event-time Window
    .window(Slide.over("10.minutes").every("5.minutes").on("rowtime").as("w"))
    // Sliding Processing-time window 
    .window(Slide.over("10.minutes").every("5.minutes").on("proctime").as("w"))
    // Sliding Row-count window
    .window(Slide.over("10.rows").every("5.rows").on("proctime").as("w"))
    

    会话窗口

    TableAPI方式

    会话窗口(Session windows)要用Session类来定义,另外还有三个方法:
    withGap:会话时间间隔
    on:用来分组(按时间间隔)或者排序(按行数)的时间字段
    as:别名,必须出现在后面的groupBy中

    // Session Event-time Window
    .window(Session.withGap.("10.minutes").on("rowtime").as("w"))
    // Session Processing-time Window
    .window(Session.withGap.("10.minutes").on("proctime").as("w"))
    

    FlinkSQL方式

    我们已经了解了在Table API里window的调用方式,同样,我们也可以在SQL中直接加入窗口的定义和使用。Group Windows在SQL查询的Group BY子句中定义。与使用常规GROUP BY子句的查询一样,使用GROUP BY子句的查询会计算每个组的单个结果行。
    SQL支持以下Group窗口函数:
    TUMBLE(time_attr, interval)
    定义一个滚动窗口,第一个参数是时间字段,第二个参数是窗口长度。
    HOP(time_attr, interval, interval)
    定义一个滑动窗口,第一个参数是时间字段,第二个参数是窗口滑动步长,第三个是窗口长度。
    SESSION(time_attr, interval)
    定义一个会话窗口,第一个参数是时间字段,第二个参数是窗口间隔(Gap)。
    另外还有一些辅助函数,可以用来选择Group Window的开始和结束时间戳,以及时间属性。
    这里只写TUMBLE_*,滑动和会话窗口是类似的(HOP_*,SESSION_*)。
    TUMBLE_START(time_attr, interval)
    TUMBLE_END(time_attr, interval)
    TUMBLE_ROWTIME(time_attr, interval)
    TUMBLE_PROCTIME(time_attr, interval)
    

    接下来就是具体的例子

    Group Windows

    (1)ProcessTime

    1)TimeWindow

    1>Tumble
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL09_ProcessTime_GroupWindow_Tumble {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
            
            
         //----TableApi的形式 -----------------------------------------------------  
            
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
            //4.基于时间的滚动窗口TableAPI
    
            // over("10.seconds").on("pt").as("tw"),
            // over("10.seconds"),表示的是窗口的大小为10s,注意是seconds
            // on("pt")基于时间字段pt,
            // as("tw"),是一个别名,tw。
    
            Table tableResult = table.window(Tumble.over("10.seconds").on("pt").as("tw"))
                    //根据处理时间和id进行分组
                    .groupBy("tw,id")
                    .select("id,id.count,tw.start"); //可以拿到当前事件的开始时间tw.start,。
    
    
            
            //----sql的形式---------------------------------------------------------
            
            
            //5.基于时间的滚动窗口SQL API
            tableEnv.createTemporaryView("sensor", table); //这里只能从table中读取数据建表,因为table中有pt这个字段。
    
    
            //这里是second,不是复数。可以获取窗口的开始时间或者是结束时间--tumble_end()、tumble_start()
            //也就是窗口信息也是可以拿到的。
            Table sqlResult = tableEnv.sqlQuery("select id,count(id),tumble_end(pt,interval '10' second) " +
                    "from sensor " +
                    "group by id,tumble(pt,interval '10' second)");
    
    
            //6.转换为流进行输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行任务
            env.execute();
    
        }
    }
    

    在这里插入图片描述

    2>Slide
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL10_ProcessTime_GroupWindow_Slide {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
    
    
    
            //4.基于时间的滚动窗口TableAPI
    
            // over("10.seconds").every("5.seconds")
            // over("10.seconds"),表示窗口的大小为10s。
            //every("5.seconds"),表示滑动的步长为5s。
    
            //滑动窗口不能在select后获取窗口的信息
            Table tableResult = table.window(Slide.over("10.seconds").every("5.seconds").on("pt").as("sw"))
                    .groupBy("sw,id")
                    .select("id,id.count");
    
    
            //5.基于时间的滚动窗口SQL API
    
            //hop(pt,interval '5' second,interval '10' second),用hop来表示滑动窗口的有关信息
            //注意第一个是滑动步长,第二个参数是窗口的大小。
            //滑动窗口不能在select后获取窗口的信息
            tableEnv.createTemporaryView("sensor", table);
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) " +
                    "from sensor " +
                    "group by id,hop(pt,interval '5' second,interval '10' second)");
    
    
            //6.转换为流进行输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行任务
            env.execute();
    
    
        }
    }
    
    3>Session
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Session;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL11_ProcessTime_GroupWindow_Session {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
    
            //4.基于时间的滚动窗口TableAPI
            //select后面不能得到会话信息
            //以5s作为一个会话窗口,按照处理时间和id进行分组。所以就是一个id一个窗口,一直输入的话看不见结果的,停止输入,等5s,可以看见结果。
            Table tableResult = table.window(Session.withGap("5.seconds").on("pt").as("sw"))
                    .groupBy("sw,id")
                    .select("id,id.count");
    
            //5.基于时间的滚动窗口SQL API
            //select后面不能得到会话信息
            tableEnv.createTemporaryView("sensor", table);
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) " +
                    "from sensor " +
                    "group by id,session(pt,interval '5' second)");
    
            //6.转换为流进行输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行任务
            env.execute();
    
    
    
            //以5s作为一个会话窗口,按照处理时间和id进行分组。所以就是一个id一个窗口,一直输入的话看不见结果的,停止输入,等5s,可以看见结果。
    
        }
    }
    

    2) CountWindow(计数窗口没有sql写法)

    1>Tumble_Count
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL12_ProcessTime_GroupWindow_Tumble_Count {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
            //4.基于时间的滚动窗口TableAPI
    
            //窗口的大小为5行。
            Table tableResult = table.window(Tumble.over("5.rows").on("pt").as("tw"))
                    .groupBy("tw,id")
                    .select("id,id.count");
    
            //5.转换为流进行输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
    
            //6.执行任务
            env.execute();
    
    
        }
    }
    
    2>Slide_Count
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL13_ProcessTime_GroupWindow_Slide_Count {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
            //4.基于时间的滚动窗口TableAPI
    
            //.over("5.rows").every("2.rows").on("pt").as("sw")
            //over("5.rows"),5行数据作为一个窗口。
            //滑动步长为2行。
    
            Table tableResult = table.window(Slide.over("5.rows").every("2.rows").on("pt").as("sw"))
                    .groupBy("sw,id")
                    .select("id,id.count");
    
            //5.转换为流进行输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
    
            //7.执行任务
            env.execute();
    
        }
    }
    

    在这里插入图片描述

    (2)EventTime

    1)TimeWindow

    1>Tumble
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL14_EventTime_GroupWindow_Tumble {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                        @Override
                        public long extractTimestamp(SensorReading element) {
                            return element.getTs() * 1000L;
                        }
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,rt.rowtime");
    
            //4.TableAPI
    //        Table tableResult = table.window(Tumble.over("10.seconds").on("rt").as("tw"))
    //                .groupBy("tw,id")
    //                .select("id,id.count");
    
            Table tableResult = table.window(Tumble.over("10.seconds").on("rt").as("tw"))
                    .groupBy("tw,id")
                    .select("id,id.count");
    
            //5.SQL
            tableEnv.createTemporaryView("sensor", table); //如果没有时间字段的话,可以用sensorDS创建临时表,但是流中没有时间字段,所以从table表中获取数据,来创建临时表。
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) " +
                    "from sensor " +
                    "group by id,tumble(rt,interval '10' second)");
    
            //6.转换为流进行打印输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行
            env.execute();
    
    
    
    
            //------视频15
    
            //窗口是10s,延迟2s.
            //199,202有结果
    
            //seneor_1,11111199,35.8
            //seneor_6,11111202,35.8
    
            //为什么用sensor_6,事件时间为202的时候,也能输出 seneor_1,1 呢?
            //watermark会往下游传。watermark到了200,窗口就得关。和key没有关系,只有会话窗口会看key.
            //和watermark传递有关系,向下广播。
    
    
            //和并行度有点关系。
        }
    }
    
    2>Slide
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL15_EventTime_GroupWindow_Slide {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                        @Override
                        public long extractTimestamp(SensorReading element) {
                            return element.getTs() * 1000L;
                        }
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,rt.rowtime");
    
            //4.TableAPI
            Table tableResult = table.window(Slide.over("10.seconds").every("2.seconds").on("rt").as("sw"))
                    .groupBy("sw,id")
                    .select("id,id.count");
    
            //5.SQL
            tableEnv.createTemporaryView("sensor", table);
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) " +
                    "from sensor " +
                    "group by id,hop(rt,interval '2' second,interval '10' second)");
    
            //6.转换为流进行打印输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行
            env.execute();
    
    
           //10,2。每个时间处于5个窗口
    
           //只要是事件时间,统一都看watermark。
    
            //  sensor_1,1547777199,35.8
            //  sensor_6,1547777202,45
            //因为并行度是1,所以会输出  sensor_1,1
    
        }
    }
    
    3>Session
    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Session;
    import org.apache.flink.table.api.Slide;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL16_EventTime_GroupWindow_Session {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    })
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<SensorReading>(Time.seconds(2)) {
                        @Override
                        public long extractTimestamp(SensorReading element) {
                            return element.getTs() * 1000L;
                        }
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,rt.rowtime");
    
            //4.TableAPI
            Table tableResult = table.window(Session.withGap("5.seconds").on("rt").as("sw"))
                    .groupBy("sw,id")
                    .select("id,id.count");
    
            //5.SQL
            tableEnv.createTemporaryView("sensor", table);
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) " +
                    "from sensor " +
                    "group by id,session(rt,interval '5' second)");
    
            //6.转换为流进行打印输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行
            env.execute();
    
    
            //5s的窗口,2s的延迟。
            //基于事件时间
            //  sensor_1,1547777199,35.8
            //  sensor_6,1547777206,45
            //因为并行度是1,所以会输出  sensor_1,1
    
            //只要是事件时间,统一都看watermark。
    
        }
    }
    

    2)CountWindow

     EventTime目前没有计数窗口
    

    overwindow(没有滚动、滑动那一套)

    package bean;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    
    public class SensorReading {
    
        private String id;
        private  Long ts;
        private  Double temp;
    }
    

    (1)ProcessTime

    package day07;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Over;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL17_ProcessTime_OverWindow {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    });
    
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,pt.proctime");
    
            //4.基于时间的滚动窗口TableAPI
            //---orderBy是必须选项。
            //Over.partitionBy("id").orderBy("pt").as("ow"),这个是开窗,下面的select中的字段可以用  over 别名的方式,对字段进行开窗。
            //例如:id.count over ow,这个就是id在开窗之后,进行计数,什么样的开窗,就是ow的那种方式。
    
            Table tableResult = table.window(Over.partitionBy("id").orderBy("pt").as("ow"))
                    .select("id,id.count over ow,temp.max over ow");
    
            //5.基于时间的滚动窗口SQL API
            //over中没有倒序这一说,数据还没有来,谁是第一条呢?
            tableEnv.createTemporaryView("sensor", table);
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) over(partition by id order by pt) ct " +
                    "from sensor");
    
            //6.转换为流进行输出
            //给每一个数据都开了一个独立的窗口。使用append流即可,因为不可能撤回数据。
            tableEnv.toRetractStream(tableResult, Row.class).print("Table");
            tableEnv.toRetractStream(sqlResult, Row.class).print("SQL");
    
            //7.执行任务
            env.execute();
            
        }
    }
    

    (2)EventTime

    package day07_exercise;
    
    import bean.SensorReading;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.table.api.Over;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.Tumble;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    
    public class FlinkSQL18_EventTime_OverWindow {
    
        public static void main(String[] args) throws Exception {
    
            //1.获取执行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            //获取TableAPI执行环境
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
            //2.读取端口数据转换为JavaBean
            SingleOutputStreamOperator<SensorReading> sensorDS = env.socketTextStream("hadoop102", 9999)
                    .map(line -> {
                        String[] fields = line.split(",");
                        return new SensorReading(fields[0],
                                Long.parseLong(fields[1]),
                                Double.parseDouble(fields[2]));
                    })
                
                //这里watermark,我们使用自动增加的类型。
                    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<SensorReading>() {
                        @Override
                        public long extractAscendingTimestamp(SensorReading element) {
                            return element.getTs();
                        }
                    });
    
            //3.将流转换为表并指定处理时间字段
            Table table = tableEnv.fromDataStream(sensorDS, "id,ts,temp,rt.rowtime");
    
            //4.TableAPI
            Table tableResult = table.window(Over.partitionBy("id").orderBy("rt").as("ow"))
                    .select("id,id.count over ow");
    
            //5.SQL
            tableEnv.createTemporaryView("sensor", table); //常规注意,这里的数据来源只能是上面的table,因为上面的流中没有rt那个字段。
            
            Table sqlResult = tableEnv.sqlQuery("select id,count(id) over(partition by id order by rt) ct " +
                    "from sensor");
    
            //6.转换为流进行打印输出
            tableEnv.toAppendStream(tableResult, Row.class).print("Table");
            tableEnv.toAppendStream(sqlResult, Row.class).print("SQL");
    
            //7.执行
            env.execute();
    
        }
    }
    

    需要注意的是:
    我们用的是自增的watermark,源码里面是给事件事件减去了1ms,用来防止相同的数据过来的时候,第一相同数据就把窗口关闭了,以后的相同的数据无法进行计算。
    相同的时间戳的数据放在一个窗口里面。

    比如说数据是1、2、2、3、4,这些数据过来的时候。
    

    在这里插入图片描述

    在这里插入图片描述

    EventTime类型,只看watermark,不看key。watermark推动事件的进展
    
    展开全文
  • Apache Flink SQL概览

    2019-12-17 10:12:25
    TUMBLE_START/TUMBLE_END ·  HOP_START/HOP_END ·  SESSION_START/SESSION_END   这些辅助函数如何使用,请参考如下完整示例的使用方式。 完整的 SQL Job 案例   上面我们介绍了Apache ...

    Apache Flink SQL Job的组成

    我们做任何数据计算都离不开读取原始数据,计算逻辑和写入计算结果数据三部分,当然基于Apache Flink SQL编写的计算Job也离不开这个三部分,如下所所示:

    如上所示,一个完整的Apache Flink SQL Job 由如下三部分:

    · Source Operator – Soruce operator是对外部数据源的抽象, 目前Apache Flink内置了很多常用的数据源实现,比如上图提到的Kafka。

    · Query Operators – 查询算子主要完成如图的Query Logic,目前支持了Union,Join,Projection,Difference, Intersection以及window等大多数传统数据库支持的操作。

    · Sink Operator – Sink operator 是对外结果表的抽象,目前Apache Flink也内置了很多常用的结果表的抽象,比如上图提到的Kafka。

    Apache Flink SQL 核心算子

    SQL是Structured Quevy Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Language](结构化英语查询语言),旨在操纵和检索存储在IBM原始准关系数据库管理系统System R中的数据。直到1986年,ANSI和ISO标准组正式采用了标准的”数据库语言SQL”语言定义。Apache Flink SQL 核心算子的语义设计也参考了1992 2011等ANSI-SQL标准。接下来我们将简单为大家介绍Apache Flink SQL 每一个算子的语义。

     

    SELECT

    SELECT 用于从数据集/流中选择数据,语法遵循ANSI-SQL标准,语义是关系代数中的投影(Projection),对关系进行垂直分割,消去某些列。

     

    一个使用Select的语句如下:

     
    SELECT ColA, ColC FROME tab ;

     

    WHERE

    WHERE 用于从数据集/流中过滤数据,与SELECT一起使用,语法遵循ANSI-SQL标准,语义是关系代数的Selection,根据某些条件对关系做水平分割,即选择符合条件的记录,如下所示:

    对应的SQL语句如下:

    SELECT * FROM tab WHERE ColA <> 'a2' ;

    GROUP BY

    GROUP BY 是对数据进行分组的操作,比如我需要分别计算一下一个学生表里面女生和男生的人数分别是多少,如下:

    对应的SQL语句如下:

    SELECT sex, COUNT(name) AS count FROM tab GROUP BY sex ;

    UNION ALL

    UNION ALL 将两个表合并起来,要求两个表的字段完全一致,包括字段类型、字段顺序,语义对应关系代数的Union,只是关系代数是Set集合操作,会有去重复操作,UNION ALL 不进行去重,如下所示:

    对应的SQL语句如下:

    SELECT * FROM T1 UNION ALL SELECT * FROM T2

    UNION

     

    UNION 将两个流给合并起来,要求两个流的字段完全一致,包括字段类型、字段顺序,并其UNION 不同于UNION ALL,UNION会对结果数据去重,与关系代数的Union语义一致,如下:

    对应的SQL语句如下:

     
    SELECT * FROM T1 UNION SELECT * FROM T2

     

    JOIN

    JOIN 用于把来自两个表的行联合起来形成一个宽表,Apache Flink支持的JOIN类型:

    · JOIN – INNER JOIN

    · LEFT JOIN – LEFT OUTER JOIN

    · RIGHT JOIN – RIGHT OUTER JOIN

    · FULL JOIN – FULL OUTER JOIN

     

     

    JOIN与关系代数的Join语义相同,具体如下:

     

    对应的SQL语句如下(INNER JOIN):

     
    SELECT ColA, ColB, T2.ColC, ColE FROM TI JOIN T2 ON T1.ColC = T2.ColC ;

    LEFT JOIN与INNER JOIN的区别是当右表没有与左边相JOIN的数据时候,右边对应的字段补NULL输出,如下:

    对应的SQL语句如下(LEFT JOIN):

    SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;

    说明:

    · 细心的读者可能发现上面T2.ColC是添加了前缀T2了,这里需要说明一下,当两张表有字段名字一样的时候,我需要指定是从那个表里面投影的。

    · RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN相当于 RIGHT JOIN 和 LEFT JOIN 之后进行UNION ALL操作。

    Window

    在Apache Flink中有2种类型的Window,一种是OverWindow,即传统数据库的标准开窗,每一个元素都对应一个窗口。一种是GroupWindow,目前在SQL中GroupWindow都是基于时间进行窗口划分的。

    OverWindow

    OVER Window 目前支持由如下三个元素组合的8中类型:

    · 时间 – Processing Time 和 EventTime

    · 数据集 – Bounded 和 UnBounded

    · 划分方式 – ROWS 和 RANGE 我们以的Bounded ROWS 和 Bounded RANGE 两种常用类型,想大家介绍Over Window的语义

    Bounded ROWS Over Window

    Bounded ROWS OVER Window 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。

    语法

     

    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         ROWS 
         BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName, 
    ... 
    FROM Tab1

    FROM Tab1

    · value_expression – 进行分区的字表达式;

    · timeCol – 用于元素排序的时间字段;

    · rowCount – 是定义根据当前行开始向前追溯几行元素;

    语义

    我们以3个元素(2 PRECEDING)的窗口为例,如下图:

    上图所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,虽然有元素都是同一时刻到达,但是他们仍然是在不同的窗口,这一点有别于RANGE OVER Window.

    Bounded RANGE Over Window

    Bounded RANGE OVER Window 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口;

    语法

    Bounded RANGE OVER Window的语法如下:

     

    SELECT 
        agg1(col1) OVER(
         [PARTITION BY (value_expression1,..., value_expressionN)] 
         ORDER BY timeCol
         RANGE 
         BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName, 
    ... 
    FROM Tab1

    · value_expression – 进行分区的字表达式;

    · timeCol – 用于元素排序的时间字段;

    · timeInterval – 是定义根据当前行开始向前追溯指定时间的元素行;

     

    语义

    我们以3秒中数据(INTERVAL ‘2’ SECOND)的窗口为例,如下图:

    注意: 上图所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一时刻到达,他们是在同一个窗口,这一点有别于ROWS OVER Window.

    GroupWindow

    根据窗口数据划分的不同,目前Apache Flink有如下3种Bounded Winodw:

    · Tumble – 滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

    · Hop – 滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

    · Session – 会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加;

    说明:

     Aapche Flink 还支持UnBounded的 Group Window,也就是全局Window,流上所有数据都在一个窗口里面,语义非常简单,这里不做详细介绍了。

    GroupWindow的语法如下:

    SELECT 
        [gk], 
        agg1(col1),
         ... 
        aggN(colN)
    FROM Tab1
    GROUP BY [WINDOW(definition)], [gk]

    · [WINDOW(definition)] – 在具体窗口语义介绍中介绍。

     

    Tumble Window

    Tumble 滚动窗口有固定size,窗口数据不重叠,具体语义如下:

    假设我们要写一个2分钟大小的Tumble,示例SQL如下:

    SELECT gk, COUNT(*) AS pv 
      FROM tab 
        GROUP BY TUMBLE(rowtime, INTERVAL '2' MINUTE), gk

    Hop Window

    Hop 滑动窗口和滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当slide值小于窗口size的值的时候多个滑动窗口会重叠,具体语义如下:

    假设我们要写一个每5分钟统计近10分钟的页面访问量(PV).

     

    SELECT gk, COUNT(*) AS pv 
      FROM tab 
        GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE), gk

     

    Session Window

    Session 会话窗口 是没有固定大小的窗口,通过session的活跃度分组元素。不同于滚动窗口和滑动窗口,会话窗口不重叠,也没有固定的起止时间。一个会话窗口在一段时间内没有接收到元素时,即当出现非活跃间隙时关闭。一个会话窗口 分配器通过配置session gap来指定非活跃周期的时长,具体语义如下:

    假设我们要写一个统计连续的两个访问用户之间的访问时间间隔不超过3分钟的的页面访问量(PV).

     

    SELECT gk, COUNT(*) AS pv  
      FROM pageAccessSession_tab
        GROUP BY SESSION(rowtime, INTERVAL '3' MINUTE), gk

    说明: 

    很多场景用户需要获得Window的开始和结束时间,上面的GroupWindow的SQL示例中没有体现,那么窗口的开始和结束时间应该怎样获取呢? Apache Flink 我们提供了如下辅助函数:

    · TUMBLE_START/TUMBLE_END

    · HOP_START/HOP_END

    · SESSION_START/SESSION_END

     

    这些辅助函数如何使用,请参考如下完整示例的使用方式。

    完整的 SQL Job 案例

     

    上面我们介绍了Apache Flink SQL核心算子的语法及语义,这部分将选取Bounded EventTime Tumble Window为例为大家编写一个完整的包括Source和Sink定义的Apache Flink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户ID和访问时间。我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV). 具体数据如下:

     

     

    region

    userId

    accessTime

    ShangHaiU00102017-11-11 10:01:00
    BeiJingU10012017-11-11 10:01:00
    BeiJingU20322017-11-11 10:10:00
    BeiJingU11002017-11-11 10:11:00
    ShangHaiU00112017-11-11 12:10:00

     

    Source 定义

     

    自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。出于代码篇幅问题,我们如下只介绍核心部分,完整代码 请查看: EventTimeTumbleWindowDemo.scala

     

    Source Function定义

     

    支持接收携带EventTime的数据集合,Either的数据结构Right是WaterMark,Left是元数据:

    class MySourceFunction[T](dataList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] {
      override def run(ctx: SourceContext[T]): Unit = {
        dataList.foreach {
          case Left(t) => ctx.collectWithTimestamp(t._2, t._1) 
          case Right(w) => ctx.emitWatermark(new Watermark(w)) // emit watermark
        }
      }
    }

    定义 StreamTableSource

    我们自定义的Source要携带我们测试的数据,以及对应的WaterMark数据,具体如下:

    class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {
    
      // 页面访问表数据 rows with timestamps and watermarks
      val data = Seq(
         // Data
         Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
         // Watermark
         Right(1510365660000L),
        ..
      )
      
      val fieldNames = Array("accessTime", "region", "userId")
      val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
      val rowType = new RowTypeInfo(
        Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
        fieldNames)
      
      override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
        // 添加数据源实现
        execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
      }
      ...
    }

    Sink 定义

    我们简单的将计算结果写入到Apache Flink内置支持的CSVSink中,定义Sink如下:

    def getCsvTableSink: TableSink[Row] = {
        val tempFile = ...
        new CsvTableSink(tempFile.getAbsolutePath).configure(
          Array[String]("region", "winStart", "winEnd", "pv"),
          Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
      }

    构建主程序

    主程序包括执行环境的定义,Source/Sink的注册以及统计查SQL的执行,具体如下:

    def main(args: Array[String]): Unit = {
        // Streaming 环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val tEnv = TableEnvironment.getTableEnvironment(env)
    
        // 设置EventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        //方便我们查出输出数据
        env.setParallelism(1)
    
        val sourceTableName = "mySource"
        // 创建自定义source数据结构
        val tableSource = new MyTableSource
    
        val sinkTableName = "csvSink"
        // 创建CSV sink 数据结构
        val tableSink = getCsvTableSink
    
        // 注册source
        tEnv.registerTableSource(sourceTableName, tableSource)
        // 注册sink
        tEnv.registerTableSink(sinkTableName, tableSink)
    
        val sql =
          "SELECT  " +
          "  region, " +
          "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
          "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
          " FROM mySource " +
          " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"
    
        tEnv.sqlQuery(sql).insertInto(sinkTableName);
        env.execute()
      }

    执行并查看运行结果

    执行主程序后我们会在控制台得到Sink的文件路径,如下:

    Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem

    Cat 方式查看计算结果,如下:

    jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
    ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
    BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
    BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
    ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1
    展开全文
  • TUMBLE(, ) < size-interval>: INTERVAL 'string' timeUnit 参数必须是时间流中的一个合法的时间属性字段 标识函数 使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合 窗口标识函数 ...

     

    前言

    Flink SQL的窗口操作在straming的实现上增添了不少东西。是用起来越来越方便。本篇主要内容:

    • Watermark

    • 滚动窗口

    • 滑动窗口

    • 累积窗口函数

    • 窗口分组聚合GROUPING SETS

    • Clue幂集函数

    • Over函数

    Watermark

    引入

    由于实时计算的输入数据是持续不断的,因此我们需要一个有效的进度指标,来帮助我们确定关闭时间窗口的正确时间点,保证关闭窗口后不会再有数据进入该窗口,可以安全输出这个窗口的聚合结果。

    而Watermark就是一种衡量Event Time进展的有效机制。随着时间的推移,最早流入实时计算的数据会被处理完成,之后流入的数据处于正在处理状态。处于正在处理部分的和已处理部分的交界的时间戳,可以被定义为Watermark,代表在此之前的事件已经被处理完成并输出。

    针对乱序的流,Watermark也至关重要,即使部分事件延迟到达,也不会影响窗口计算的正确性。此外,并行数据流中,当算子(Operator)有多个输入流时,算子的Event Time以最小流Event Time为准。

    具体可参考Flink Straming的原理介绍:

    Flink的窗口、时间语义,Watermark机制,多代码案例详解,Flink学习入门(三)

    watermark策略

    Flink SQL提供了几种常用的watermark策略。

    1. 严格意义上递增的时间戳,发出到目前为止已观察到的最大时间戳的水印。时间戳小于最大时间戳的行不会迟到。watermark for rowtime_column as rowtime_column

    2. 递增的时间戳,发出到目前为止已观察到的最大时间戳为负1的水印。时间戳等于或小于最大时间戳的行不会迟到。watermark for rowtime_column as rowtime_column - INTERVAL '1' SECOND.

    3. 有界时间戳(乱序)发出水印,它是观察到的最大时间戳减去指定的延迟, 例如,watermark for rowtime_column as rowtime_column - INTERVAL'5'SECOND是5秒的延迟水印策略。watermark for rowtime_column as rowtime_column - INTERVAL 'string' timeUnit.

    现常用的语法

    watermark [watermarkName] for <rowtime_field> as withOffset(<rowtime_field>, offset)

    参数是否必填说明
    watermarkName标识Watermark的名字。
    <rowtime_field><rowtime_field>必须是表中已定义的一列(当前仅支持TIMESTAMP类型),基于该列生成Watermark,并且标识该列为Event Time列。您可以使用<rowtime_field>在作业代码中定义窗口。
    withOffsetWatermark的生成策略,根据<rowtime_field> - offset生成Watermark的值。withOffset的第一个参数必须是<rowtime_field>。
    offsetWatermark值与Event Time值的偏移量,单位为毫秒。

    窗口函数

    滚动窗口

    滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常,滚动窗口有一个固定的大小,并且不会出现重叠。例如,如果指定了一个5分钟大小的滚动窗口,无限流的数据会根据时间划分为[0:00, 0:05)、[0:05, 0:10)、[0:10, 0:15)等窗口。

    语法

    TUMBLE函数用在GROUP BY子句中,用来定义滚动窗口。TUMBLE(< time-attr>, < size-interval>) < size-interval>: INTERVAL 'string' timeUnit

    < time-attr>参数必须是时间流中的一个合法的时间属性字段

    标识函数

    使用标识函数选出窗口的起始时间或者结束时间,窗口的时间属性用于下级Window的聚合

    窗口标识函数返回类型描述
    TUMBLE_START(time-attr, size-interval)TIMESTAMP返回窗口的起始时间(包含边界)。例如[00:10,00:15)窗口,返回00:10
    TUMBLE_END(time-attr, size-interval)TIMESTAMP返回窗口的结束时间(包含边界)。例如[00:00, 00:15]窗口,返回00:15。
    TUMBLE_ROWTIME(time-attr, size-interval)TIMESTAMP(rowtime-attr)返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个rowtime attribute,即可以基于该字段做时间属性的操作,例如,级联窗口只能用在基于Event Time的Window上,详情请参见级联窗口。
    TUMBLE_PROCTIME(time-attr, size-interval)TIMESTAMP(rowtime-attr)返回窗口的结束时间(不包含边界)。例如[00:00, 00:15]窗口,返回00:14:59.999。返回值是一个Proctime Attribute,即可以基于该字段做时间属性的操作。例如,级联窗口只能用在基于Processing Time的Window上,详情请参见级联窗口。

    新版本语法(简洁)

    TUMBLE(TABLE data, DESCRIPTOR(timecol), size)

    • data: 是一个表参数。

    • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

    • size: 是一个持续时间指定窗口的宽度。

    滑动窗口

    滑动窗口(HOP),也被称作Sliding Window。不同于滚动窗口,滑动窗口的窗口可以重叠。

    通常,大部分元素符合多个窗口情景,窗口是重叠的。因此,滑动窗口在计算移动平均数(moving averages)时很实用。例如,计算过去5分钟数据的平均值,每10秒钟更新一次,可以设置slide为10秒,size为5分钟。下图为您展示间隔为30秒,窗口大小为1分钟的滑动窗口。

    语法

    下面不再介绍之前版本的写法,按1.13版本来 HOP(TABLE data, DESCRIPTOR(timecol), slide, size [, offset ])

    • data: 是一个表参数,数据表。

    • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

    • slide: 滑动时间。

    • size: 是一个持续时间指定窗口的宽度。

    累积窗口函数

    累积窗口在某些情况下非常有用,例如在固定的窗口间隔内早期触发的滚动窗口。例如,每日从00:00到每分钟累计计算UV值,10:00的UV值表示00:00到10:00的UV总数。这可以通过累积窗口轻松有效地实现。

    累积函数将元素分配给窗口,这些窗口在初始步长间隔内覆盖行,并在每一步扩展到一个更多的步长(保持窗口开始固定),直到最大窗口大小。你可以把累积函数看作是先应用具有最大窗口大小的滚动窗口,然后把每个滚动窗口分成几个窗口,每个窗口的开始和结束都有相同的步长差。所以累积窗口确实有重叠,而且没有固定的大小

    例如,你可以对1小时的步长和1天的最大长度有一个累积窗口,你就会得到窗口 [00:00, 01:00), [00:00, 02:00), [00:00, 03:00), …, [00:00, 24:00)

    语法

    CUMULATE(TABLE data, DESCRIPTOR(timecol), step, size)

    • data: 是一个表参数,数据表。

    • timecol: 是一个列描述符指示应该映射到哪个时间的属性列。

    • step: 步长

    • size: 是一个持续时间指定窗口的宽度!

    以此为例:数据:goods

    timepriceitem
    2020-04-15 08:054.00C
    2020-04-15 08:072.00A
    2020-04-15 08:095.00D
    2020-04-15 08:113.00B
    2020-04-15 08:131.00E
    2020-04-15 08:176.00F
    SELECT window_start, window_end, SUM(price)
      FROM TABLE(
        CUMULATE(TABLE goods, DESCRIPTOR(time), INTERVAL '2' MINUTES, INTERVAL '10' MINUTES))
      GROUP BY window_start, window_end
    
    window_startwindow_endprice
    2020-04-15 08:002020-04-15 08:064.00
    2020-04-15 08:002020-04-15 08:086.00
    2020-04-15 08:002020-04-15 08:1011.00
    2020-04-15 08:102020-04-15 08:123.00
    2020-04-15 08:102020-04-15 08:144.00
    2020-04-15 08:102020-04-15 08:164.00
    2020-04-15 08:102020-04-15 08:1810.00
    2020-04-15 08:102020-04-15 08:2010.00

    窗口分组聚合GROUPING SETS

    窗口聚合也支持分组集语法。分组集允许更复杂的分组标准比描述所GROUP by操作。指定行分别分组,每个分组集和聚合计算每组一样简单的group by子句 与下面相同 ROLLUP 汇总是一种速记符号用于指定一个常见类型的分组集。它代表给定的表达式和所有前缀列表的列表,包括空列表。      窗口与汇总需要聚合window_start和window_end列必须在GROUP BY子句,但不是在ROLLUP中条款。

    val kafka_sql=
          """
            |CREATE TABLE goods (
            |  item_id VARCHAR,
            |  item_type VARCHAR,
            |  event_time varchar,
            |  on_sell_time AS TO_TIMESTAMP(event_time),
            |  price DOUBLE,
            |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
            |) WITH (
            |  'connector' = 'kafka',
            |  'topic' = 'testtopic',
            |  'properties.bootstrap.servers' = 'xxxxxx',
            |  'format' = 'json'
            |)
            |""".stripMargin
        tableEnv.executeSql(kafka_sql)
    
        val query=
          """
            |SELECT window_start, window_end, item_type, SUM(price) as price
            |  FROM TABLE(
            |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
            |  GROUP BY window_start, window_end, GROUPING SETS ((item_type), ())
            |""".stripMargin
            
            或者
           
          val query=
          """
            |SELECT window_start, window_end, item_type, SUM(price) as price
            |  FROM TABLE(
            |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
            |  GROUP BY window_start, window_end, ROLLUP (item_type)
            |""".stripMargin
    
    
    tableEnv.executeSql(query).print()
            
    

    数据源write(item)方法后发送kafka的消息

    def test_over_window(): Array[AnyRef] ={
        Array(
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:01:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM002",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:02:00",
            "price"->50
          ),
          Map(
            "item_id"->"ITEM003",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:03:00",
            "price"->30
          ),
          Map(
            "item_id"->"ITEM004",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:04:00",
            "price"->60
          ),
          Map(
            "item_id"->"ITEM005",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:05:00",
            "price"->40
          ),
          Map(
            "item_id"->"ITEM006",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:06:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM007",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:07:00",
            "price"->10
          ),
          Map(
            "item_id"->"ITEM008",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:08:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM009",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:09:00",
            "price"->21
          ),
          Map(
            "item_id"->"ITEM010",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:10:00",
            "price"->22
          ),
          Map(
            "item_id"->"ITEM011",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:11:00",
            "price"->26
          )
        )
      }
    

    结果:

    window_startwindow_enditem_typeprice
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000apple160.0
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000(NULL)160.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000apple70.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000orange41.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000(NULL)111.0

    CUBE

    CUBE是一种用于指定公共分组集类型的简写符号。它表示给定的列表及其所有可能的子集——幂集

    def cube(): Unit ={
        val kafka_sql=
          """
            |CREATE TABLE goods (
            |  item_id VARCHAR,
            |  item_type VARCHAR,
            |  event_time varchar,
            |  on_sell_time AS TO_TIMESTAMP(event_time),
            |  price DOUBLE,
            |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
            |) WITH (
            |  'connector' = 'kafka',
            |  'topic' = 'testtopic',
            |  'properties.bootstrap.servers' = 'xxxxx',
            |  'format' = 'json'
            |)
            |""".stripMargin
        tableEnv.executeSql(kafka_sql)
    
        val query=
          """
            |SELECT window_start, window_end,item_id, item_type, SUM(price) as price
            |  FROM TABLE(
            |    TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
            |  GROUP BY window_start, window_end, CUBE(item_id,item_type)
            |""".stripMargin
    
        tableEnv.executeSql(query).print()
        
        }
    

    数据源write(item)方法后发送kafka的消息

    def test_clue(): Array[AnyRef] ={
        Array(
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:01:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:02:00",
            "price"->50
          ),
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:03:00",
            "price"->30
          ),
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:04:00",
            "price"->60
          ),
          Map(
            "item_id"->"ITEM001",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:05:00",
            "price"->40
          ),
          Map(
            "item_id"->"ITEM002",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:06:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM002",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:07:00",
            "price"->10
          ),
          Map(
            "item_id"->"ITEM002",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:08:00",
            "price"->20
          ),
          Map(
            "item_id"->"ITEM002",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:09:00",
            "price"->21
          ),
          Map(
            "item_id"->"ITEM02",
            "item_type"->"apple",
            "event_time"->"2020-04-15 10:10:00",
            "price"->22
          ),
          Map(
            "item_id"->"ITEM02",
            "item_type"->"orange",
            "event_time"->"2020-04-15 10:11:00",
            "price"->26
          )
        )
      }
    
    

    结果:

    window_startwindow_enditem_iditem_typeprice
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000(NULL)apple140.0
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000ITEM001apple140.0
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000ITEM001(NULL)140.0
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000(NULL)(NULL)140.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000(NULL)apple70.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000ITEM001apple40.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000ITEM001(NULL)40.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000ITEM002orange41.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000(NULL)orange41.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000ITEM002(NULL)71.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000(NULL)(NULL)111.0
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000ITEM002apple30.0

    over函数

    以窗口Top N为例子

     def top_n: Unit ={
        val kafka_sql=
          """
            |CREATE TABLE goods (
            |  item_id VARCHAR,
            |  item_type VARCHAR,
            |  event_time varchar,
            |  on_sell_time AS TO_TIMESTAMP(event_time),
            |  price DOUBLE,
            |  WATERMARK FOR on_sell_time AS on_sell_time - INTERVAL '1' SECOND
            |) WITH (
            |  'connector' = 'kafka',
            |  'topic' = 'testtopic',
            |  'properties.bootstrap.servers' = 'xxxxx',
            |  'format' = 'json'
            |)
            |""".stripMargin
        tableEnv.executeSql(kafka_sql)
    
        val query=
          """
            |SELECT *
            |  FROM (
            |    SELECT *, ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY price DESC) as rownum
            |    FROM (
            |      SELECT window_start, window_end, item_type, SUM(price) as price, COUNT(*) as cnt
            |      FROM TABLE(
            |        TUMBLE(TABLE goods, DESCRIPTOR(on_sell_time), INTERVAL '5' MINUTES))
            |      GROUP BY window_start, window_end, item_type
            |    )
            |  ) WHERE rownum <= 2
            |""".stripMargin
        tableEnv.executeSql(query).print()
        
        }
    

    数据源也就是上面的test_over_window通过write(item)方法后发送kafka的消息

    结果:

    window_startwindow_enditem_typepricecntrownum
    2020-04-15 10:00:00.0002020-04-15 10:05:00.000apple160.041
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000apple70.031
    2020-04-15 10:05:00.0002020-04-15 10:10:00.000orange41.022

    总结:

    在Flink SQL窗口中:累积窗口函数, 窗口分组聚合GROUPING SETS,Clue幂集函数, Over函数这些给人眼前一亮,的确是减少了开发工作量。但是需要开发工作者灵活运行才会发挥价值。知识无止境,人生也无止境。

      

    大数据左右手

    技术如同手中的水有了生命似的,汇聚在了一起。作为大数据开发工作者,致力于大数据技术的学习与工作,分享大数据原理、架构、实时、离线、面试与总结,分享生活思考与读书见解。总有适合你的那一篇。

    关注公众号!!!

    和我联系吧,加群交流大数据知识,一起成长~~~

    展开全文
  • //group by TUMBLE(proctime,INTERVAL '10' SECOND) ,deviceId) Table rs = bsTableEnv.sqlQuery("select count(*),deviceId,TUMBLE_START(rowtime, INTERVAL '10' SECOND) from deviceInfo GROUP BY TUMBLE(row...
  • 一、Flink SQL 背景 Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定... Tumble Window ...
  • 深入解读 Flink SQL 1.13

    2021-06-25 11:06:47
    ■ 1.1 Window TVF 语法 在 1.13 版本前,window 的实现是通过一个特殊的 SqlGroupedWindowFunction: SELECT TUMBLE_START(bidtime,INTERVAL '10' MINUTE), TUMBLE_END(bidtime,INTERVAL '10' MINUTE), TUMBLE_ROW...
  • flink 技术学习分享

    千次阅读 2018-12-13 17:53:33
    在flink中常用的窗口有TumbleWindow、SlidingWindow、sessionWindow。 滚动窗口(TUMBLE)将每个元素分配到一个指定大小的窗口中。通常滚动窗口有一个固定的大小,并且不会出现重叠。 例如:如果指定了一个5分钟...
  • scoring them having a set of scissors to lessen the chance of having a tumble. Louboutin Outlet Shoes Christian Louboutin also came up with popular and famous shiny red lacquer soled shoes that ...
  • import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.ScalarFunction; import org.apache.flink.types....
  • 这里只写TUMBLE_*,滑动和会话窗口是类似的(HOP_*,SESSION_*) TUMBLE_START(time_attr, interval) TUMBLE_END(time_attr, interval) TUMBLE_ROWTIME(time_attr, interval) TUMBLE_PROCTIME(time_attr, interval) ...
  • --------------- 伪代码 --------------- INSERT INTO kafka_sink_table SELECT -- 窗口开始时间 CAST( TUMBLE_START(proctime, INTERVAL '1' DAY) AS bigint ) AS window_start, -- 当前记录处理的时间 cast(max...
  • Flink:Flink-SQL开发

    千次阅读 2020-07-27 18:08:40
    Tumble Window Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下: Tumble 滚动窗口对应的语法如下: SELECT [gk], [TUMBLE_START(timeCol, size)], [TUMBLE_END(timeCol, size)], agg1(col1), ......
  • Flink实战-快速开始

    2020-11-20 23:44:05
    // define operator sql Table statisticTable = tableEnv.sqlQuery("SELECT deviceId, TUMBLE_START(`rowtime`, INTERVAL '5' MINUTE) as window_start, TUMBLE_END(`rowtime`, INTERVAL '5' MINUTE) as window_...
  • TUMBLE_START(time_attr, interval)  TUMBLE_END(time_attr, interval)  TUMBLE_ROWTIME(time_attr, interval)  TUMBLE_PROCTIME(time_attr, interval) 代码 //获取表执行环境 StreamTableEnvironment ...
  • TUMBLE_START(time_attr, interval) TUMBLE_END(time_attr, interval) TUMBLE_ROWTIME(time_attr, interval) TUMBLE_PROCTIME(time_attr, interval) 案例: val table2: TableResult = bsTableEnv.executeSql( """ ...
  • 1.2.1 滚动窗口 滚动窗口(Tumbling windows)要用Tumble类来定义,另外还有三个方法: over:定义窗口长度 on:用来分组(按时间间隔)或者排序(按行数)的时间字段 as:别名,必须出现在后面的groupBy中 代码...
  • // Table result=TableEnv.sqlQuery( // "SELECT SUM(CAST(ORIG_PRCP AS double)) AS AMT FROM cmis_loan WHERE LAST_INT_ACC_DT=CURRENT_DATE AND dataEventType='insert' GROUP BY TUMBLE(canalTimeStamp ,...
  • INSERT INTO Sink_TenMinPsgCnts SELECT TUMBLE_START(rideTime, INTERVAL '10' MINUTE) AS cntStart, TUMBLE_END(rideTime, INTERVAL '10' MINUTE) AS cntEnd, CAST(SUM(psgCnt) AS BIGINT) AS cnt FROM Rides ...
  • Flink源码分析--(二)FLink SQL Window功能

    千次阅读 2019-05-31 10:28:59
    DATE_FORMAT(TUMBLE_ROWTIME(ctime, INTERVAL '5' MINUTE), 'yyyy-MM-dd-HH-mm-ss:SSS'), --这里TUMBLE_ROWTIME为TUMBLE_END-1ms,一般用于后续窗口级联聚合 categoryName, SUM(price) FROM ...
  • Flink学习笔记

    千次阅读 2019-05-04 13:55:26
    insert into Sink_TenMinPsgCnts select tumble_start(rideTime, interval '10' minute), tumble_end(rideTime, interval '10' minute), cast(sum(psgCnt) as bigint) as cnt from rides group by tumble(ride...
  • ■ 2.2 Enhanced Tumble Window 有如下需求,用户希望在 Tumble Window 触发之后,不去丢弃迟到的数据,而是再次触发窗口计算。如果使用 DataStream API,使用 SideOutput 就可以完成需求。但是对于 SQL,目前是没...
  • Flink基础学习(五)

    2020-09-18 10:38:06
    Tumble Window Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下: Hop Window Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通 过 slide 参数控制滑动窗口的新建频率...
  • TUMBLE_START(time_attr, interval) TUMBLE_END(time_attr, interval) TUMBLE_ROWTIME(time_attr, interval) TUMBLE_PROCTIME(time_attr, interval) Over Windows 由于 Over 本来就是 SQL 内置支持的语法,所以这在...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,589
精华内容 635
关键字:

tumble