精华内容
下载资源
问答
  • 而当下FlinkSQL的火热程度不用多说,FlinkSQL也为HBase提供了connector,因此HBase与FlinkSQL的结合非常有必要实践实践。当然,本文假设用户有一定的HBase知识基础,不会详细去介绍HBase的架构和原理,本文着重介绍...
  • flink-sql-submit-client 执行Flink SQL 文件的客户端 Flink 版本:flink 1.11.0 其他版本待测试 是在的基础上修改而来 使用简单方便 需要指定FLINK_HOME 下载上面code中的jar包 修改 sql-submit.sh 脚本中jar包的...
  • MicroBatch和MiniBatch都是微批处理,只是微批的触发机制略有不同。原理同样是缓存一定的数据后再触发处理,以减少对State的访问,从而提升吞吐并减少数据的输出量。...MicroBatch是MiniBatch的升级版,主要基于事件...
  • 阿里巴巴将该项目命名为Blink,主要由BlinkRuntime与FlinkSQL组成。BlinkRuntime是阿里巴巴内部高度定制化的计算内核,FlinkSQL则是面向用户的API层,我们完善了部分功能,比如Agg、Join、Windows处理等。今年,我们...
  • FlinkSQL flink sql api编程程序,实现ETL功能,包含读写Mysql,kafka 源Mysql MysqlCDC 下沉Mysql 使用说明 克隆仓库:git clone 使用idea打开项目 maven下载相关依赖 依赖下载完成后运行demo测试
  • flinksql学习笔记

    2020-10-19 22:06:58
    flink基础教程笔记,flinksql教程笔记及flink电商项目实战。git相关操作文档
  • FlinkSQL编程.pdf

    2019-07-17 12:00:23
    Flink 2019峰会 阿里大牛的技术, 在线教程有github:**,第6个文档 简明扼要的讲解FlinkSQL api及 源码分析和运行原理。值得收藏
  • 一。背景 阿里工作的时候是使用Blink进行流数据处理和计算,通过编写sql实现Blink的计算作业,开发...用户输入sql(ddl,query,dml) -> ddl对应为Flink的source和sink -> query/dml的insert into数据处理和计算
  • OPPO 作为手机厂商,基于 Android 定制了自己的 ColorOS 系统,当前日活跃用户超过 2 亿。围绕 ColorOS,OPPO 构建了很多互联网应用,比如应用商店、浏览器、信息流等。在运营这些互联网应用的过程中,OPPO 积累了...
  • flink-sql-cookbook:Apache Flink SQL Cookbook是Apache Flink SQL的示例,模式和用例的精选集合。 许多配方是完全独立的,可以按原样在Ververica Platform中运行
  • 本文整理自FlinkPMCmember云邪在ApacheFlinkMeetup2020·上海站的talk,旨在帮助用户快速了解新...2.详细解读FlinkSQL1.11新功能,E.g.connectors参数简化+动态Table参数减少代码冗余,内置connectors+LIKE语法帮助快速
  • 经过博主使用 flink sql 经验来看,并不是所有的 dwd,dws 聚合场景都适合 flink sql(截止发文阶段来说)!!! 其实这些目前不适合 flink sql 的场景总结下来就是在处理上比 datastream 还是会有一定的损失。 先...

    图片

    感谢您的小爱心(关注  +  点赞 + 再看),对博主的肯定,会督促博主持续的输出更多的优质实战内容!!!

    1.序篇-本文结构

    大数据羊说

    大数据羊说

    用数据提升美好事物发生的概率~

    34篇原创内容

    公众号

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    针对 datastream api 大家都比较熟悉了,还是那句话,在 datastream 中,你写的代码逻辑是什么样的,它最终的执行方式就是什么样的。

    但是对于 flink sql 的执行过程,大家还是不熟悉的。上节使用 ETL,group agg(sum,count等)简单聚合类 query 带大家走进一条 flink sql query 逻辑的世界。帮大家至少能够熟悉在 flink sql 程序运行时知道 flink 程序在干什么。

    此节就是窗口聚合章节的第一篇,以一个最简单、最常用的分钟 tumble window 聚合案例给大家介绍其使用方式和原理。

    由于 flink 1.13 引入了 window tvf,所以 1.13 和 1.12 及之前版本的实现不同。本节先介绍 flink 1.12 及之前的 tumble window 实现。这也是大家在引入 flink sql 能力时最常使用的。

    本节依然从以下几个章节给大家详细介绍 flink sql 的能力。

    1. 目标篇-本文能帮助大家了解 flink sql 什么?
    • 回顾上节的 flink sql 适用场景的结论
    1. 概念篇-先聊聊常见的窗口聚合
    • 窗口竟然拖慢数据产出?

    • 常用的窗口

    1. 实战篇-简单的 tumble window 案例和运行原理
    • 先看一个 datastream 窗口案例

    • flink sql tumble window 的语义

    • tumble window 实际案例

    • GeneratedWatermarkGenerator - flink 1.12.1

    • BinaryRowDataKeySelector - flink 1.12.1

    • AggregateWindowOperator - flink 1.12.1

    1. 总结与展望篇

    先说说结论,以下这些结论已经在上节说过了,此处附上上节文章:

    1. 场景问题:flink sql 很适合简单 ETL,以及基本全部场景下的聚合类指标(本节要介绍的 tumble window 就在聚合类指标的范畴之内)。

    2. 语法问题:flink sql 语法其实是和其他 sql 语法基本一致的。基本不会产生语法问题阻碍使用 flink sql。但是本节要介绍的 tumble window 的语法就是略有不同的那部分。下面详细介绍。

    3. 运行问题:查看 flink sql 任务时的一些技巧,以及其中一些可能会碰到的坑:

    • 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。

    • sql 的 watermark 类型要设置为 TIMESTAMP(3)。

    • 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    2.目标篇-本文能帮助大家了解 flink sql tumble window 什么?

    关于 flink sql tumble window 一般都会有以下问题。本文的目标也是为大家解答这些问题:

    1. 场景问题:场景问题就不必多说,datastream 在 tumble window 场景下的应用很多了,分钟级别聚合等常用场景

    2. 语法问题:flink sql 写 tumble window 任务时是一种与 hive sql 中没有的语法。下文详细介绍。

    3. 运行问题:使用一条简单的 tumble window sql 帮大家从 transformation、runtime 帮大家理解 tumble window 的整体运行机制。

    4. 理解误区:既然是 sql 必然要遵循 sql 语义,sql tumble window 聚合是输入多条,产出一条数据。并不像 datastream 那样可以在窗口 udf 中做到多对多。

    在正式开始聊 tumble window 之前,先看看上节 flink sql 适用场景的结论。让大家先有 flink sql 的一个整体印象以及结论。

    2.1.回顾上节的 flink sql 适用场景的结论

    不装了,我坦白了,flink sql 其实很适合干的活就是 dwd 清洗,dws 聚合。

    此处主要针对实时数仓的场景来说。flink sql 能干 dwd 清洗,dws 聚合,基本上实时数仓的大多数场景都能给覆盖了。

    flink sql 牛逼!!!

    但是!!!

    经过博主使用 flink sql 经验来看,并不是所有的 dwd,dws 聚合场景都适合 flink sql(截止发文阶段来说)!!!

    其实这些目前不适合 flink sql 的场景总结下来就是在处理上比 datastream 还是会有一定的损失。

    先总结下使用场景:

    1. dwd:简单的清洗、复杂的清洗、维度的扩充、各种 udf 的使用

    2. dws:各类聚合

    然后分适合的场景和不适合的场景来说,因为只这一篇不能覆盖所有的内容,所以本文此处先大致给个结论,之后会结合具体的场景详细描述。

    • 适合的场景:
    1. 简单的 dwd 清洗场景

    2. 全场景的 dws 聚合场景

    • 目前不太适合的场景:
    1. 复杂的 dwd 清洗场景:举例比如使用了很多 udf 清洗,尤其是使用很多的 json 类解析清洗

    2. 关联维度场景:举例比如 datastream 中经常会有攒一批数据批量访问外部接口的场景,flink sql 目前对于这种场景虽然有 localcache、异步访问能力,但是依然还是一条一条访问外部缓存,这样相比批量访问还是会有性能差距。

    3.概念篇-先聊聊常见的窗口聚合

    窗口聚合大家都在 datastream api 中很熟悉了,目前在实时数据处理的过程中,窗口计算可以说是最重要、最常用的一种计算方式了。

    但是在抛出窗口概念之前,博主有几个关于窗口的小想法说一下。

    3.1.窗口竟然拖慢数据产出?

    一个小想法。

    先抛结论:窗口会拖慢实时数据的产出,是在目前下游分析引擎能力有限的情况下的一种妥协方案。

    站在数据开发以及需求方的世界中,当然希望所有的数据都是实时来的,实时处理的,实时产出的,实时展现的

    举个例子:如果你要满足一个一分钟窗口聚合的 pv,uv,或者其他聚合需求。

    olap 数据服务引擎 就可以满足上述的实时来的,实时处理的,实时产出的,实时展现的的场景。flink 消费处理明细数据,产出到 kafka,然后直接导入到 olap 引擎中。查询时直接用 olap 做聚合。这其中是没有任何窗口的概念的。但是整个链路中,要保障端对端精确一次,要保障大数据量情况下 olap 引擎能够秒级查询返回,更何况有一些去重类指标的计算,等等场景。把这些压力都放在 olap 引擎的压力是很大的。

    因此在 flink 数据计算引擎中就诞生了窗口的概念。我们可以直接在计算引擎中进行窗口聚合计算,然后等到窗口结束之后直接把结果数据产出。这就出现了博主所说的窗口拖慢了实时数据产出的情况。而且窗口在处理不好的情况下可能会导致数据丢失。

    关于上述两种情况的具体优劣选择,都由大家自行选择。上述只是引出博主一些想法。

    3.2.常用的窗口

    目前已知的窗口分为以下四种。

    1. Tumble Windows2. Hop Windows3. Cumulate Windows****4. Session Windows

    这些窗口的具体描述直接见官网,有详细的说明。此处不赘述。

    https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sql/queries/window-agg/

    此处介绍下 flink 中常常会涉及到的两个容易混淆的概念就是:窗口 + key。这里来形象的说明下。

    • 窗口:时间周期上面的划分。将无限流进行纵向切分,将无限流切分为一个一个的窗口,窗口相当于是无限流中的一段时间内的数据。

    • key:数据类别上面的划分。将无限流进行横向划分,相同 key 的数据会被划分到一组中,这个 key 的数据也是一条无限流。

    如下图所示。

    图片

    1

    4.实战篇-简单的 tumble window 案例和运行原理

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    4.1.先看一个 datastream 窗口案例

    在介绍 sql tumble window 窗口算子执行案例之前,先看一个 datastream 中的窗口算子案例。其逻辑都是相通的。会对我们了解 sql tumble window 算子有帮助。

    我们先看看 datastream 处理逻辑。

    以下面这个为例。

    public class _04_TumbleWindowTest {
    
        public static void main(String[] args) throws Exception {
    
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
    
            env.setParallelism(1);
    
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            env.addSource(new UserDefinedSource())
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple4<String, String, Integer, Long>>(Time.seconds(0)) {
                        @Override
                        public long extractTimestamp(Tuple4<String, String, Integer, Long> element) {
                            return element.f3;
                        }
                    })
                    .keyBy(new KeySelector<Tuple4<String, String, Integer, Long>, String>() {
                        @Override
                        public String getKey(Tuple4<String, String, Integer, Long> row) throws Exception {
                            return row.f0;
                        }
                    })
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .sum(2)
                    .print();
    
            env.execute("1.12.1 DataStream TUMBLE WINDOW 案例");
        }
    
        private static class UserDefinedSource implements SourceFunction<Tuple4<String, String, Integer, Long>> {
    
            private volatile boolean isCancel;
    
            @Override
            public void run(SourceContext<Tuple4<String, String, Integer, Long>> sourceContext) throws Exception {
    
                while (!this.isCancel) {
    
                    sourceContext.collect(Tuple4.of("a", "b", 1, System.currentTimeMillis()));
    
                    Thread.sleep(10L);
                }
    
            }
    
            @Override
            public void cancel() {
                this.isCancel = true;
            }
        }
    }
    
    

    datastream 生产的具体的 transformation 如下图:

    图片

    24

    其中我们只关注最重要的 WindowOperator 算子。

    图片

    25

    其中 WindowOperator 算子包含的重要属性如下图。

    图片

    26

    来看看 WindowOperator 的执行逻辑。窗口执行的整体详细流程可以参考:http://wuchong.me/blog/2016/05/25/flink-internals-window-mechanism/

    图片

    23

    4.2.flink sql tumble window 的语义

    介绍到 tumble window 的语义,总要有对比的去介绍。这里的参照物就是 datastream api。

    在 datastream api 中。tumble window 一般用作以下两种场景。

    1. 业务场景:使用 tumble window 很轻松的计算出窗口内的聚合数据。一般是多条输入数据,窗口结束时一条输出数据。

    2. 优化场景:窗口聚合一批数据然后批量访问外部存储扩充维度、或者有一些自定义的处理逻辑。一般是多条输入数据,窗口结束时多条输出数据。

    但是在 sql api 中。tumble window 是聚合(group by)语义,聚合在 sql 标准中的数据处理逻辑是多条输入,在窗口触发时就输出一条数据的语义。而上面的常常用在 datastream 中的优化场景是多对多的场景。因此和 sql 语义不符合。所以 flink sql tumble window 一般都是用于计算聚合运算值来使用。

    4.3.tumble window 实际案例

    滚动窗口的特性就是会将无限流进行纵向划分成一个一个的窗口,每个窗口都是相同的大小,并且不重叠。

    图片

    22

    本文主要介绍 flink 1.12 及之前版本的实现。下一篇文章介绍 flink 1.13 的实现。

    来,在介绍原理之前,总要先用起来,我们就以下面这个例子展开。

    1.(flink 1.12.1)场景:简单且常见的分维度分钟级别同时在线用户数、总销售额

    数据源表:

    CREATE TABLE source_table (
        -- 维度数据
        dim STRING,
        -- 用户 id
        user_id BIGINT,
        -- 用户
        price BIGINT,
        -- 事件时间戳
        row_time AS cast(CURRENT_TIMESTAMP as timestamp(3)),
        -- watermark 设置
        WATERMARK FOR row_time AS row_time - INTERVAL '5' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '10',
      'fields.dim.length' = '1',
      'fields.user_id.min' = '1',
      'fields.user_id.max' = '100000',
      'fields.price.min' = '1',
      'fields.price.max' = '100000'
    )
    
    

    **Notes - 关于 watermark 容易踩得坑:**sql 的 watermark 类型必须要设置为 TIMESTAMP(3)

    数据汇表:

    CREATE TABLE sink_table (
        dim STRING,
        pv BIGINT,
        sum_price BIGINT,
        max_price BIGINT,
        min_price BIGINT,
        uv BIGINT,
        window_start bigint
    ) WITH (
      'connector' = 'print'
    )
    
    

    数据处理逻辑:

    可以看下下面语法,窗口聚合的写法有专门的 tumble(row_time, interval '1' minute) 写法,这就是与平常我们写的 hive sql,mysql 等不一样的地方。

    insert into sink_table
    select dim,
           sum(bucket_pv) as pv,
           sum(bucket_sum_price) as sum_price,
           max(bucket_max_price) as max_price,
           min(bucket_min_price) as min_price,
           sum(bucket_uv) as uv,
           max(window_start) as window_start
    from (
         select dim,
                count(*) as bucket_pv,
                sum(price) as bucket_sum_price,
                max(price) as bucket_max_price,
                min(price) as bucket_min_price,
                -- 计算 uv 数
                count(distinct user_id) as bucket_uv,
                cast(tumble_start(row_time, interval '1' minute) as bigint) * 1000 as window_start
         from source_table
         group by
                -- 按照用户 id 进行分桶,防止数据倾斜
                mod(user_id, 1024),
                dim,
                tumble(row_time, interval '1' minute)
    )
    group by dim,
             window_start
    
    

    2.运行:可以看到,其实在 flink sql 任务中,其会把对应的处理逻辑给写到算子名称上面。

    **Notes - 观察 flink sql 技巧 1:**这个其实就是我们观察 flink sql 任务的第一个技巧。如果你想知道你的 flink 任务在干啥,第一反应是去 flink webui 看看这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑

    先看一下整个算子图,如下图。从左到右总共分为三个算子。

    1. 第一个算子就是数据源算子

    2. 第二个算子就是分了桶的窗口聚合算子,第一个算子和第二个算子之间 hash 传输就是按照 group key 进行 hash 传输

    3. 第三个算子就是外层进行合桶计算的算子,同样也是 hash 传输,将分桶的数据在一个算子中进行合并计算

    图片

    5

    来看看每一个算子具体做了什么事情。

    第一个算子:

    1. table scan 读取数据源

    2. 从数据源中获取对应的字段(包括源表定义的 rowtime)

    3. 分配 watermark(按照源表定义的 watermark 分配对应的 watermark)

    4. 将一些必要的字段抽取。比如 group by 中的字段。在 hash 时需要使用。

    图片

    6

    第二个算子:

    1. 窗口聚合,计算窗口聚合数据

    2. 将数据按照第一层 select 中的数据进行计算以及格式化

    图片

    7

    第三个算子:

    1. group 聚合合桶计算

    2. 将数据按照第二层 select 中的数据进行计算以及格式化

    3. 将数据 sink 写出

    图片

    8

    3.(flink 1.12.1)结果:

    +I(9,1,32682,32682,32682,1,1631026440000)
    -U(9,1,32682,32682,32682,1,1631026440000)
    +U(9,2,115351,82669,32682,2,1631026440000)
    +I(2,1,76148,76148,76148,1,1631026440000)
    +I(8,1,79321,79321,79321,1,1631026440000)
    +I(a,1,85792,85792,85792,1,1631026440000)
    +I(0,1,12858,12858,12858,1,1631026440000)
    +I(5,1,36753,36753,36753,1,1631026440000)
    +I(3,1,19218,19218,19218,1,1631026440000)
    ...
    
    

    4.(flink 1.12.1)原理:

    关于 sql 开始运行的机制见上一节详述。

    此处只介绍相比前一节新增内容。可以看到上述代码的具体 transformation 如下图。

    图片

    9

    4.4.GeneratedWatermarkGenerator - flink 1.12.1

    按照顺序,首先看看 watermark 算子。同 datastream 的自定义 watermark 分配策略。

    图片

    10

    watermark 生成的具体代码 WatermarkGenerator$6,主要获取 watermark 的逻辑在 currentWatermark 方法中。如下图。

    图片

    11

    4.5.BinaryRowDataKeySelector - flink 1.12.1

    接着就是 group by(同 datastream 中的 keyby)。

    图片

    12

    group by key 生成的具体代码 KeyProjection$19,主要逻辑在 apply 方法中。

    图片

    13

    下一个就是窗口聚合算子。

    4.6.AggregateWindowOperator - flink 1.12.1

    兄弟们!!!兄弟们!!!兄弟们!!!

    本节的重头戏来了。sql 窗口聚合算子解析搞起来了。

    关于 WatermarkGeneratorKeyProjection 没有什么可以详细介绍的,都是输入一条数据,输出一条数据,逻辑很简单。

    但是窗口聚合算子的计算逻辑相比上面两个算子复杂很多。窗口算子又承载了窗口聚合的主要逻辑,所以本文重点介绍窗口算子计算的逻辑。

    先来看看 sql 窗口整体处理流程。其实与 datastream 处理流程基本一致,但只是少了 Evictor。如下图所示。

    图片

    40

    接着来看看上述 sql 生成的窗口聚合算子 AggregateWindowOperator,截图中属性也很清晰。

    图片

    16

    图片

    14

    具体生成的窗口聚合代码 GroupingWindowAggsHandler$59

    图片

    41

    计算逻辑 GroupingWindowAggsHandler$59#accumulate

    图片

    42

    图片

    43

    上面那段都是在 flink 客户端初始化处理的。包括窗口算子的初始化等。

    下面这段处理逻辑是在 flink TM 运行时开始执行的,包括窗口算子资源的初始化以及运行逻辑。就到了正式的数据处理环节了。

    窗口算子 Task 运行。

    图片

    27

    窗口算子 Task 初始化。

    图片

    28

    StreamTask 整体的处理流程。

    图片

    29

    窗口算子 open 初始化。

    图片

    30

    图片

    31

    窗口算子 open 初始化后的结果。如下图,对应的具体组件。

    图片

    32

    初始化完成之后,开始处理具体数据。

    图片

    图片

    循环 loop,一直 run 啊 run。

    图片

    35

    判断记录的具体类型,然后执行不同的逻辑。

    图片

    36

    来看看处理一条数据的 processElement 方法逻辑,进行 acc 处理。代码中的的 windowAggregator 就是之前代码生成的 GroupingWindowAggsHandler$59

    **Notes:**事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    图片

    39

    来看看 watermark 到达并且触发窗口计算时,执行 onEventTime 逻辑。

    图片

    38

    触发窗口计算时,onEventTime -> emitWindowResult,产出具体数据。

    图片

    17

    至此整个 sql tumble window 的处理逻辑也就很清楚了。和 datastream 基本上都是一致的。是不是整个逻辑就理清楚了。

    5.总结与展望篇

    源码公众号后台回复flink sql tumble window 的奇妙解析之路获取。

    大数据羊说

    大数据羊说

    用数据提升美好事物发生的概率~

    34篇原创内容

    公众号

    本文主要介绍了 tumble window 聚合类指标的常见场景案例以及其底层运行原理。

    而且也介绍了在查看 flink sql 任务时的一些技巧:

    1. 去 flink webui 就能看到这个任务目前在做什么。包括算子名称都会给直接展示给我们目前哪个算子在干啥事情,在处理啥逻辑。

    2. sql 的 watermark 类型要设置为 TIMESTAMP(3)。

    3. 事件时间逻辑中,sql api 和 datastream api 对于数据记录时间戳存储逻辑是不一样的。datastream api:每条记录的 rowtime 是放在 StreamRecord 中的时间戳字段中的。sql api:时间戳是每次都从数据中进行获取的。算子中会维护一个下标。可以按照下标从数据中获取时间戳。

    后续文章会基于一些最常见的案例以及原理层面介绍 1.13 版本的 flink sql tumble window(基于最新的 window tvf)。

    希望大家能持续关注。支持博主。喜欢的请关注 + 点赞 + 再看。

    往期推荐

    [

    flink sql 知其所以然(七):不会连最适合 flink sql 的 ETL 和 group agg 场景都没见过吧?

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247489235&idx=1&sn=66c2b95aa3e22069a12b3d53b6d1d9f3&chksm=c1549a2bf623133d7a75732b5cea4bc304bf06a53777963d5cac8406958114873294dc3e4699&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(六)| flink sql 约会 calcite(看这篇就够了)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247489112&idx=1&sn=21e86dab0e20da211c28cd0963b75ee2&chksm=c1549aa0f62313b6674833cd376b2a694752a154a63532ec9446c9c3013ef97f2d57b4e2eb64&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(五)| 自定义 protobuf format

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488994&idx=1&sn=20236350b1c8cfc4ec5055687b35603d&chksm=c154991af623100c46c0ed224a8264be08235ab30c9f191df7400e69a8ee873a3b74859fb0b7&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(四)| sql api 类型系统

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488788&idx=1&sn=0127fd4037788762a0401313b43b0ea5&chksm=c15499ecf62310fa747c530f722e631570a1b0469af2a693e9f48d3a660aa2c15e610653fe8c&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(三)| 自定义 redis 数据汇表(附源码)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488720&idx=1&sn=5695e3691b55a7e40814d0e455dbe92a&chksm=c1549828f623113e9959a382f98dc9033997dd4bdcb127f9fb2fbea046545b527233d4c3510e&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(二)| 自定义 redis 数据维表(附源码)

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488635&idx=1&sn=41817a078ef456fb036e94072b2383ff&chksm=c1549883f623119559c47047c6d2a9540531e0e6f0b58b155ef9da17e37e32a9c486fe50f8e3&scene=21#wechat_redirect)

    [

    flink sql 知其所以然(一)| source\sink 原理

    ](http://mp.weixin.qq.com/s?__biz=MzkxNjA1MzM5OQ==&mid=2247488486&idx=1&sn=b9bdb56e44631145c8cc6354a093e7c0&chksm=c1549f1ef623160834e3c5661c155ec421699fc18c57f2c63ba14d33bab1d37c5930fdce016b&scene=21#wechat_redirect)

    更多 Flink 实时大数据分析相关技术博文,视频。后台回复 “flink” 或者 “flink sql” 获取。

    点个赞+在看,感谢您的肯定 👇
    
    展开全文
  • 3.怎样利用 Flink SQL 做多流 join 后实时同步到 Elasticsearch 中? 1 Flink 1.8 ~ 1.11 社区发展趋势回顾 自 2019 年初阿里巴巴宣布向 Flink 社区贡献 Blink 源码并在同年 4 月发布 Flink 1.8 版本后,Flink ...

    问题导读

    1.Flink 1.11 有哪些新功能?
    2.如何使用 flink-cdc-connectors 捕获 MySQL 和 Postgres 的数据变更?
    3.怎样利用 Flink SQL 做多流 join 后实时同步到 Elasticsearch 中?

     

    1 Flink 1.8 ~ 1.11 社区发展趋势回顾

    自 2019 年初阿里巴巴宣布向 Flink 社区贡献 Blink 源码并在同年 4 月发布 Flink 1.8 版本后,Flink 在社区的活跃程度犹如坐上小火箭般上升,每个版本包含的 git commits 数量以 50% 的增速持续上涨, 吸引了一大批国内开发者和用户参与到社区的生态发展中来,中文用户邮件列表(user-zh@)更是在今年 6 月首次超出英文用户邮件列表(user@),在 7 月超出比例达到了 50%。对比其它 Apache 开源社区如 Spark、Kafka 的用户邮件列表数(每月约 200 封左右)可以看出,整个 Flink 社区的发展依然非常健康和活跃。

     

     

     

    2 Flink SQL 新功能解读

     

    在了解 Flink 整体发展趋势后,我们来看下最近发布的 Flink 1.11 版本在 connectivity 和 simplicity 方面都带来了哪些令人耳目一新的功能。

    FLIP-122:简化 connector 参数

    整个 Flink SQL 1.11 在围绕易用性方面做了很多优化,比如 FLIP-122[1] 。

    优化了 connector 的 property 参数名称冗长的问题。以 Kafka 为例,在 1.11 版本之前用户的 DDL 需要声明成如下方式:

    CREATE TABLE user_behavior (  ...) WITH (  'connector.type'='kafka',  'connector.version'='universal',  'connector.topic'='user_behavior',  'connector.startup-mode'='earliest-offset',  'connector.properties.zookeeper.connect'='localhost:2181',  'connector.properties.bootstrap.servers'='localhost:9092',  'format.type'='json');
    

    而在 Flink SQL 1.11 中则简化为:

    CREATE TABLE user_behavior (  ...) WITH (  'connector'='kafka',  'topic'='user_behavior',  'scan.startup.mode'='earliest-offset',  'properties.zookeeper.connect'='localhost:2181',  'properties.bootstrap.servers'='localhost:9092',  'format'='json');

     

    DDL 表达的信息量丝毫未少,但是看起来清爽许多 :)。Flink 的开发者们为这个优化做了很多讨论,有兴趣可以围观 FLIP-122 Discussion Thread[2]。

    FLINK-16743:内置 connectors

    Flink SQL 1.11 新加入了三种内置的 connectors,如下表所示:

    connector 描述使用场景
    'connector'='datagen'用于生成随机数据的source常用于测试
    'connector'='blackhole'不做任何处理的 sink常用于性能测试
    'connector'='print' 打印到标准输出流(.out文件)的 sink常用于调试

    在外部 connector 环境还没有 ready 时,用户可以选择 datagen source 和 print sink 快速构建 pipeline 熟悉 Flink SQL;对于想要测试 Flink SQL 性能的用户,可以使用 blackhole 作为 sink;对于调试排错场景,print sink 会将计算结果打到标准输出(比如集群环境下就会打到 taskmanager.out 文件),使得定位问题的成本大大降低。

    FLIP-110:LIKE 语法

    Flink SQL 1.11 支持用户从已定义好的 table DDL 中快速 “fork” 自己的版本并进一步修改 watermark 或者 connector 等属性。比如下面这张 base_table 上想加一个 watermark,在 Flink 1.11 版本之前,用户只能重新将表声明一遍,并加入自己的修改,可谓 “牵一发而动全身”。

    -- before Flink SQL 1.11CREATE TABLE base_table (  id BIGINT,  name STRING,  ts TIMESTAMP) WITH (  'connector.type'='kafka',  ...);
    CREATE TABLE derived_table (  id BIGINT,  name STRING,  ts TIMESTAMP,  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) WITH (  'connector.type'='kafka',  ...);

     

    从 Flink 1.11 开始,用户只需要使用 CREATE TABLE LIKE 语法就可以完成之前的操作。

    -- Flink SQL 1.11
    CREATE TABLE base_table (  id BIGINT,  name STRING,  ts TIMESTAMP) WITH (  'connector'='kafka',  ...);
    CREATE TABLE derived_table (  WATERMARK FOR ts AS ts - INTERVAL '5' SECOND) LIKE base_table;

     

    而内置 connector 与 CREATE TABLE LIKE 语法搭配使用则会如下图一般产生“天雷勾地火”的效果,极大提升开发效率。

     

     

    FLIP-113:动态 Table 参数

    对于像 Kafka 这种消息队列,在声明 DDL 时通常会有一个启动点位去指定开始消费数据的时间,如果需要更改启动点位,在老版本上就需要重新声明一遍新点位的 DDL,非常不方便。

     

    CREATE TABLE user_behavior (  user_id BIGINT,  behavior STRING,  ts TIMESTAMP(3)) WITH (  'connector'='kafka',  'topic'='user_behavior',  'scan.startup.mode'='timestamp',  'scan.startup.timestamp-millis'='123456',  'properties.bootstrap.servers'='localhost:9092',  'format'='json');

     

    从 Flink 1.11 开始,用户可以在 SQL client 中按如下方式设置开启 SQL 动态参数(默认是关闭的),如此即可在 DML 里指定具体的启动点位。

     

    SET 'table.dynamic-table-options.enabled' = 'true';
    SELECT user_id, COUNT(DISTINCT behaviro)FROM user_behavior /*+ OPTIONS('scan.startup.timestamp-millis'='1596282223') */GROUP BY user_id;

     

    除启动点位外,动态参数还支持像 sink.partition、scan.startup.mode 等更多运行时参数,感兴趣可移步 FLIP-113[3],获得更多信息。

     

    FLIP-84:重构优化 TableEnvironment 接口 

    Flink SQL 1.11 以前的 TableEnvironment 接口定义和行为有一些不够清晰,比如:

    • TableEnvironment#sqlUpdate() 方法对于 DDL 会立即执行,但对于 INSERT INTO DML 语句却是 buffer 住的,直到调用 TableEnvironment#execute() 才会被执行,所以在用户看起来顺序执行的语句,实际产生的效果可能会不一样。

    • 触发作业提交有两个入口,一个是 TableEnvironment#execute(),另一个是 StreamExecutionEnvironment#execute(),于用户而言很难理解应该使用哪个方法触发作业提交。

    • 单次执行不接受多个 INSERT INTO 语句。

     

    针对这些问题,Flink SQL 1.11 提供了新 API,即 TableEnvironment#executeSql(),它统一了执行 SQL 的行为, 无论接收 DDL、查询 query 还是 INSERT INTO 都会立即执行。针对多 sink 场景提供了 StatementSet 和 TableEnvironment#createStatementSet() 方法,允许用户添加多条 INSERT 语句一起执行。

     

    除此之外,新的 execute 方法都有返回值,用户可以在返回值上执行 print,collect 等方法。

    新旧 API 对比如下表所示:

    Current InterfaceNew Interface
    tEnv.sqlUpdate("CREATE TABLE...”);TableResult result = tEnv.executeSql("CREATE TABLE...”);
    tEnv.sqlUpdate("INSERT INTO...SELECT...”);
    tEnv.execute();
    TableResult result = 
    tEnv.executeSql("INSERT INTO ... SELECT...”);
    tEnv.sqlUpdate("insert into xx ...”); 
    tEnv.sqlUpdate("insert into yy ...”); 
    tEnv.execute();
    StatementSet ss =tEnv.createStatementSet(); 
    ss.addInsertSql("insert into xx ...”); 
    ss.addInsertSql("insert into yy ...”); 
    TableResult result = ss.execute();

     

    对于在 Flink 1.11 上使用新接口遇到的一些常见问题,云邪做了统一解答,可在 Appendix 部分查看。

     

    FLIP-95:TableSource & TableSink 重构

    开发者们在 Flink SQL 1.11 版本花了大量经历对 TableSource 和 TableSink API 进行了重构,核心优化点如下:

    • 移除类型相关接口,简化开发,解决迷惑的类型问题,支持全类型

    • 寻找 Factory 时,更清晰的报错信息

    • 解决找不到 primary key 的问题

    • 统一了流批 source,统一了流批 sink

    • 支持读取 CDC 和输出 CDC

    • 直接高效地生成 Flink SQL 内部数据结构 RowData

     

    新 DynamicTableSink API 去掉了所有类型相关接口,因为所有的类型都是从 DDL 来的,不需要 TableSink 告诉框架是什么类型。而对于用户来说,最直观的体验就是在老版本上遇到各种奇奇怪怪报错的概率降低了很多,比如不支持的精度类型和找不到 primary key / table factory 的诡异报错在新版本上都不复存在了。关于 Flink 1.11 是如何解决这些问题的详细可以在附录部分阅读。

     

    FLIP-123:Hive Dialect

     

    Flink 1.10 版本对 Hive connector 的支持达到了生产可用,但是老版本的 Flink SQL 不支持 Hive DDL 及使用 Hive syntax,这无疑限制了 Flink connectivity。在新版本中,开发者们为支持 HiveQL 引入了新 parser,用户可以在 SQL client 的 yaml 文件中指定是否使用 Hive 语法,也可以在 SQL client 中通过 set table.sql-dialect=hive/default 动态切换。更多信息可以参考 FLIP-123[4]。

     

    以上简要介绍了 Flink 1.11 在减少用户不必要的输入和操作方面对 connectivity 和 simplicity 方面做出的优化。下面会重点介绍在外部系统和数据生态方面对 connectivity 和 simplicity 的两个核心优化,并附上最佳实践介绍。

     

    3 Hive 数仓实时化 & Flink SQL + CDC 最佳实践

     

    Hive 数仓实时化

    下图是一张非常经典的 Lambda 数仓架构,在整个大数据行业从批处理逐步拥抱流计算的许多年里代表“最先进的生产力”。然而随着业务发展和规模扩大,两套单独的架构所带来的开发、运维、计算成本问题已经日益凸显。

     

     

    而 Flink 作为一个流批一体的计算引擎,在最初的设计上就认为“万物本质皆是流”,批处理是流计算的特例,如果能够在自身提供高效批处理能力的同时与现有的大数据生态结合,则能以最小侵入的方式改造现有的数仓架构使其支持流批一体。在新版本中,Flink SQL 提供了开箱即用的 “Hive 数仓同步”功能,即所有的数据加工逻辑由 Flink SQL 以流计算模式执行,在数据写入端,自动将 ODS,DWD 和 DWS 层的已经加工好的数据实时回流到 Hive table。One size (sql) fits for all suites (tables) 的设计,使得在 batch 层不再需要维护任何计算 pipeline。

     

     

    对比传统架构,它带来的好处和解决的问题有哪些呢?

     

    • 计算口径与处理逻辑统一,降低开发和运维成本

    传统架构维护两套数据 pipeline 最大的问题在于需要保持它们处理逻辑的等价性,但由于使用了不同的计算引擎(比如离线使用 Hive,实时使用 Flink 或 Spark Streaming),SQL 往往不能直接套用,存在代码上的差异性,经年累月下来,离线和实时处理逻辑很可能会完全 diverge,有些大的公司甚至会存在两个团队分别去维护实时和离线数仓,人力物力成本非常高。Flink 支持 Hive Streaming Sink 后,实时处理结果可以实时回流到 Hive 表,离线的计算层可以完全去掉,处理逻辑由 Flink SQL 统一维护,离线层只需要使用回流好的 ODS、DWD、DWS 表做进一步 ad-hoc 查询即可。

    • 离线对于“数据漂移”的处理更自然,离线数仓“实时化”

    离线数仓 pipeline 非 data-driven 的调度执行方式,在跨分区的数据边界处理上往往需要很多 trick 来保证分区数据的完整性,而在两套数仓架构并行的情况下,有时会存在对 late event 处理差异导致数据对比不一致的问题。而实时 data-driven 的处理方式和 Flink 对于 event time 的友好支持本身就意味着以业务时间为分区(window),通过 event time + watermark 可以统一定义实时和离线数据的完整性和时效性,Hive Streaming Sink 更是解决了离线数仓同步的“最后一公里问题”。

     

    下面会以一个 Demo 为例,介绍 Hive 数仓实时化的最佳实践。

     

    ■ 实时数据写入 Hive 的最佳实践

    FLIP-105:支持 Change Data Capture (CDC)

    除了对 Hive Streaming Sink 的支持,Flink SQL 1.11 的另一大亮点就是引入了 CDC 机制。CDC 的全称是 Change Data Capture,用于 tracking 数据库表的增删改查操作,是目前非常成熟的同步数据库变更的一种方案。在国内常见的 CDC 工具就是阿里开源的 Canal,在国外比较流行的有 Debezium。Flink SQL 在设计之初就提出了 Dynamic Table 和“流表二象性”的概念,并且在 Flink SQL 内部完整支持了 Changelog 功能,相对于其他开源流计算系统是一个重要优势。本质上 Changelog 就等价于一张一直在变化的数据库的表。Dynamic Table 这个概念是 Flink SQL 的基石, Flink SQL 的各个算子之间传递的就是 Changelog,完整地支持了 Insert、Delete、Update 这几种消息类型。

     

    得益于 Flink SQL 运行时的强大,Flink 与 CDC 对接只需要将外部的数据流转为 Flink 系统内部的 Insert、Delete、Update 消息即可。进入到 Flink 内部后,就可以灵活地应用 Flink 各种 query 语法了。

     

     

    在实际应用中,把 Debezium Kafka Connect Service 注册到 Kafka 集群并带上想同步的数据库表信息,Kafka 则会自动创建 topic 并监听 Binlog,把变更同步到 topic 中。在 Flink 端想要消费带 CDC 的数据也很简单,只需要在 DDL 中声明 format = debezium-json 即可。

     

     

    在 Flink 1.11 上开发者们还做了一些有趣的探索,既然 Flink SQL 运行时能够完整支持 Changelog,那是否有可能不需要 Debezium 或者 Canal 的服务,直接通过 Flink 获取 MySQL 的变更呢?答案当然是可以,Debezium 类库的良好设计使得它的 API 可以被封装为 Flink 的 Source Function,不需要再起额外的 Service,目前这个项目已经开源,支持了 MySQL 和 Postgres 的 CDC 读取,后续也会支持更多类型的数据库,可移步到下方链接解锁更多使用姿势。

    https://github.com/ververica/flink-cdc-connectors

    下面的 Demo 会介绍如何使用 flink-cdc-connectors 捕获 MySQL 和 Postgres 的数据变更,并利用 Flink SQL 做多流 join 后实时同步到 Elasticsearch 中。

     

     

    假设你在一个电商公司,订单和物流是你最核心的数据,你想要实时分析订单的发货情况。因为公司已经很大了,所以商品的信息、订单的信息、物流的信息,都分散在不同的数据库和表中。我们需要创建一个流式 ETL,去实时消费所有数据库全量和增量的数据,并将他们关联在一起,打成一个大宽表。从而方便数据分析师后续的分析。

     

    ■ 使用 Flink SQL CDC 的最佳实践展示

    4 Flink SQL 1.12 未来规划

     

    以上介绍了 Flink SQL 1.11 的核心功能与最佳实践,对于下个版本,云邪也给出了一些 ongoing 的计划,并欢迎大家在社区积极提出意见 & 建议。

    • FLIP-132[5]:Temporal Table DDL (Binlog 模式的维表关联)

    • FLIP-129[6]:重构 Descriptor API (Table API 的 DDL)

    • 支持 Schema Registry Avro 格式

    • CDC 更完善的支持(批处理,upsert 输出到 Kafka 或 Hive)

    • 优化 Streaming File Sink 小文件问题

    • N-ary input operator (Batch 性能提升)

    5 附录

     

    使用新版本 TableEnvironment 遇到的常见报错及原因

    第一个常见报错是 No operators defined in streaming topolog。遇到这个问题的原因是在老版本中执行 INSERT INTO 语句的下面两个方法: 

    TableEnvironment#sqlUpdate()TableEnvironment#execute()

     

    在新版本中没有完全向前兼容(方法还在,执行逻辑变了),如果没有将 Table 转换为 AppendedStream/RetractStream 时(通过StreamExecutionEnvironment#toAppendStream/toRetractStream),上面的代码执行就会出现上述错误;与此同时,一旦做了上述转换,就必须使用 StreamExecutionEnvironment#execute() 来触发作业执行。所以建议用户还是迁移到新版本的 API 上面,语义上也会更清晰一些。

     

    第二个问题是调用新的 TableEnvironemnt#executeSql() 后 print 没有看到返回值,原因是因为目前 print 依赖了 checkpoint 机制,开启 exactly-onece 后就可以了,新版本会优化此问题。

     

     

    老版本的 StreamTableSource、StreamTableSink 常见报错及新版本优化

    第一个常见报错是不支持精度类型,经常出现在 JDBC 或者 HBase 数据源上 ,在新版本上这个问题就不会再出现了。

     

     

    第二个常见报错是 Sink 时找不到 PK,因为老的 StreamSink 需要通过 query 去推导出 PK,当 query 变得复杂时有可能会丢失 PK 信息,但实际上 PK 信息在 DDL 里就可以获取,没有必要通过 query 去推导,所以新版本的 Sink 就不会再出现这个错误啦。

     

     

    第三个常见报错是在解析 Source 和 Sink 时,如果用户少填或者填错了参数,框架返回的报错信息很模糊,“找不到 table factory”,用户也不知道该怎么修改。这是因为老版本 SPI 设计得比较通用,没有对 Source 和 Sink 解析的逻辑做单独处理,当匹配不到完整参数列表的时候框架已经默认当前的 table factory 不是要找的,然后遍历所有的 table factories 发现一个也不匹配,就报了这个错。在新版的加载逻辑里,Flink 会先判断 connector 类型,再匹配剩余的参数列表,这个时候如果必填的参数缺失或填错了,框架就可以精准报错给用户。

     

     

    展开全文
  • flink sql几种Join方式

    2021-09-26 00:29:32
    flink SQL 适合离线处理的两种方式 package com.staywithyou.flink.apitest.tableapi; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment....

    flink SQL 适合离线处理的两种方式

    package com.staywithyou.flink.apitest.tableapi;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    
    public class TableTest3_demoBatch {
    
        public static void main(String[] args) throws Exception{
    
            /**
             *  批批Join
             */
    
            //创建执行环境
            TableEnvironment tEnv= TableEnvironment.create(EnvironmentSettings.newInstance()
                    .useBlinkPlanner().inBatchMode()
                    .build());
    
              tEnv.executeSql( "CREATE TABLE score (\n" +
                    "  s_id int,\n" +
                    "  s_core STRING,\n" +
                    "  s_score int,\n" +
                    "  proc_time AS PROCTIME() --使用维表时需要指定该字段\n" +
                    ") WITH (\n" +
                    "  'connector' = 'jdbc', -- 连接器\n" +
                    "  'driver'='com.mysql.jdbc.Driver',\n" +
                    "  --'hostname' = 'hadoop103',   --mysql地址\n" +
                    "  --'port' = '3306',  -- mysql端口\n" +
                    "  'username' = 'root',  --mysql用户名\n" +
                    "  'password' = 'Hezijiduihua1.',  -- mysql密码\n" +
                    "  --'database-name' = 'cdc', --  数据库名称\n" +
                    "  'table-name' = 'score',\n" +
                    "  'url' = 'jdbc:mysql://192.168.217.31:3306/test'\n" +
                    ")\n");
    
            tEnv.executeSql( "CREATE TABLE student (\n" +
                    "  s_id int,\n" +
                    "  s_name STRING\n" +
                    ") WITH (\n" +
                    "  'connector' = 'jdbc', -- 连接器\n" +
                    "  'driver'='com.mysql.jdbc.Driver',\n" +
                    "  --'hostname' = 'hadoop103',   --mysql地址\n" +
                    "  --'port' = '3306',  -- mysql端口\n" +
                    "  'username' = 'root',  --mysql用户名\n" +
                    "  'password' = 'Hezijiduihua1.',  -- mysql密码\n" +
                    "  --'database-name' = 'cdc', --  数据库名称\n" +
                    "  'table-name' = 'student',\n" +
                    "  'url' = 'jdbc:mysql://192.168.217.31:3306/test'\n" +
                    ")\n");
    
            Table table = tEnv.sqlQuery("select a.s_id," +
                    "a.s_name," +
                    "b.s_core," +
                    "b.s_score " +
                    "from student a " +
                    "left join score b on a.s_id=b.s_id");
    
            TableResult tableResult = table.execute();
            CloseableIterator<Row> collect = tableResult.collect();
            while(collect.hasNext()) {
                Row row = collect.next();
                System.out.println(row);
            }
    
        }
    
    }
    
    
    package com.staywithyou.flink.apitest.tableapi;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    import org.apache.flink.table.factories.DeserializationFormatFactory;
    
    
    public class TableTest4_demoStreamRegular {
        public static void main(String[] args) throws Exception {
            /**
             * regular join 使用场景在小数据量和离线数据场景中使用
             */
            //定义环境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    
            EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);
    
            //打开kafka连接
            //温度传感器内容
            tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                    "    id  STRING COMMENT '传感器唯一ID',\n" +
                    "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                    "    temperature DOUBLE COMMENT '传感器温度',\n" +
                    "    procTime AS PROCTIME(),  " +
                    "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp / 1000)), "+
                    "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                    ") WITH (\n" +
                    "    'connector' = 'kafka',\n" +
                    "    'topic'     = 'flinksinktest',\n" +
                    "    'properties.group.id' = 'gf14'," +
                    "    'properties.bootstrap.servers' = '192.168.217.32:9092',\n" +
                    "    'format'    = 'json'\n" +
                    ")");
    
    
            //打开kafka连接
            //温度传感器类型(维度表) 暂时作为流式数据处理
            tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                    "    id  STRING COMMENT '传感器唯一ID',\n" +
                    "    sensor_type  STRING COMMENT '传感器类型',\n" +
                    "    warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                    "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp / 1000)), "+
                    "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                    ") WITH (\n" +
                    "    'connector' = 'kafka',\n" +
                    "    'topic'     = 'test',\n" +
                    "    'properties.group.id' = 'gf14'," +
                    "    'properties.bootstrap.servers' = '192.168.217.31:9092',\n" +
                    "    'format'    = 'json'\n" +
                    ")");
    
    
            Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a inner join dim_sensorreading b on a.id=b.id where a.temperature>=40.0");
    
            //查看传感器报警信息
            TableResult tableResult = table.execute();
            CloseableIterator<Row> collect = tableResult.collect();
            while(collect.hasNext()) {
                Row row = collect.next();
                System.out.println(row);
            }
    
            env.execute();
    
        }
    }
    
    

    flink SQL 双流驱动 interval join

    package com.staywithyou.flink.apitest.tableapi;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    
    
    public class TableTest4_demoStreamInterval {
        public static void main(String[] args) throws Exception {
            /**
             * interval join 双流join场景
             */
            //定义环境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    
            EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);
    
            //打开kafka连接
            //温度传感器内容
            tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                    "    id  STRING COMMENT '传感器唯一ID',\n" +
                    "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                    "    temperature DOUBLE COMMENT '传感器温度',\n" +
                    "    procTime AS PROCTIME(),  " +
                    "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                    "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                    ") WITH (\n" +
                    "    'connector' = 'kafka',\n" +
                    "    'topic'     = 'flinksinktest',\n" +
                    "    'properties.group.id' = 'gf14'," +
                    "    'properties.bootstrap.servers' = '192.168.217.32:9092',\n" +
                    "    'format'    = 'json'\n" +
                    ")");
    
    
            //打开kafka连接
            //温度传感器类型(维度表) 暂时作为流式数据处理
            tEnv.executeSql("CREATE TABLE dim_sensorreading (\n" +
                    "    id  STRING COMMENT '传感器唯一ID',\n" +
                    "    sensor_type  STRING COMMENT '传感器类型',\n" +
                    "    warn_timestamp BIGINT COMMENT '传感器报警时间',\n" +
                    "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(warn_timestamp)), "+
                    "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                    ") WITH (\n" +
                    "    'connector' = 'kafka',\n" +
                    "    'topic'     = 'test',\n" +
                    "    'properties.group.id' = 'gf14'," +
                    "    'properties.bootstrap.servers' = '192.168.217.31:9092',\n" +
                    "    'format'    = 'json'\n" +
                    ")");
    
    
            Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                    "inner join dim_sensorreading b on a.id=b.id and a.temperature>=40.0 " +
                    "and b.ets between a.ets and a.ets+INTERVAL '15' SECOND");
    
            //查看传感器报警信息
            TableResult tableResult = table.execute();
            CloseableIterator<Row> collect = tableResult.collect();
            while(collect.hasNext()) {
                Row row = collect.next();
                System.out.println(row);
            }
    
            env.execute();
    
        }
    }
    
    

    flink SQL 单流驱动 temproal table join

    package com.staywithyou.flink.apitest.tableapi;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.CloseableIterator;
    
    
    public class TableTest4_demoStreamTemproalTable {
        public static void main(String[] args) throws Exception {
            /**
             * temproal table join 使用场景:维度表 join 场景   (流批join)
             */
            //定义环境
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    
            EnvironmentSettings es= EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tEnv= StreamTableEnvironment.create(env,es);
    
            //打开kafka连接
            //温度传感器内容
            tEnv.executeSql("CREATE TABLE sensorreading (\n" +
                    "    id  STRING COMMENT '传感器唯一ID',\n" +
                    "    up_timestamp BIGINT COMMENT '传感器上抛时间',\n" +
                    "    temperature DOUBLE COMMENT '传感器温度',\n" +
                    "    procTime AS PROCTIME(),  " +
                    "    ets AS TO_TIMESTAMP(FROM_UNIXTIME(up_timestamp)), "+
                    "    WATERMARK FOR ets AS ets - INTERVAL '15' SECOND\n" +
                    ") WITH (\n" +
                    "    'connector' = 'kafka',\n" +
                    "    'topic'     = 'flinksinktest',\n" +
                    "    'properties.group.id' = 'gf14'," +
                    "    'properties.bootstrap.servers' = '192.168.217.32:9092',\n" +
                    "    'format'    = 'json'\n" +
                    ")");
    
    
            //Mysql中批数据
                 tEnv.executeSql( "CREATE TABLE dim_sensorreading (\n" +
                    "  id STRING COMMENT '传感器唯一ID',\n" +
                    "  sensor_type STRING COMMENT '传感器类型',\n" +
                    "  warn_timestamp BIGINT COMMENT '传感器报警时间'\n" +
                    ") WITH (\n" +
                    "  'connector' = 'jdbc', -- 连接器\n" +
                    "  'driver'='com.mysql.jdbc.Driver',\n" +
                    "  'username' = 'root',  --mysql用户名\n" +
                    "  'password' = 'Hezijiduihua1.',  -- mysql密码\n" +
                    "  'table-name' = 'dim_sensorreading',\n" +
                    "  'url' = 'jdbc:mysql://192.168.217.31:3306/test'\n" +
                    "  --'database-name' = 'cdc', --  数据库名称\n" +
                    "  --'hostname' = 'hadoop103',   --mysql地址\n" +
                    "  --'port' = '3306',  -- mysql端口\n" +
                    ")\n");
    
    
            Table table = tEnv.sqlQuery("select a.id,a.up_timestamp,a.temperature,b.sensor_type,b.warn_timestamp from sensorreading a " +
                    "inner join dim_sensorreading FOR SYSTEM_TIME as of a.procTime b on a.id=b.id");
    
    //        Table table=tEnv.sqlQuery("select * from dim_sensorreading");
    
            //查看传感器报警信息
            TableResult tableResult = table.execute();
            CloseableIterator<Row> collect = tableResult.collect();
            while(collect.hasNext()) {
                Row row = collect.next();
                System.out.println(row);
            }
    
            env.execute();
    
        }
    }
    
    

    这里暂时没有时间做解释,先贴上Demo,未来进行更改

    展开全文
  • 对于初学者来说,学习 Flink 可能不是一件容易的事情。看文档是一种学习,更重要的是实践起来。但对于一个初学者来说要把一个 Flink SQL 跑起来还真不容易,要搭各种环境,真心累。很...

    对于初学者来说,学习 Flink 可能不是一件容易的事情。看文档是一种学习,更重要的是实践起来。但对于一个初学者来说要把一个 Flink SQL 跑起来还真不容易,要搭各种环境,真心累。很幸运的是,Flink 生态圈里有这样一款工具可以帮助你更有效率地学习 Flink:Zeppelin。本文不讲 Flink on Zeppelin 相关的内容,更关注于如何用 Zeppelin 来学习 Flink。

    给大家介绍一个可能是迄今为止用户体验最好的 Flink SQL 教程:Flink Sql Cookbook on Zeppelin。你无需写任何代码,只要照着这篇文章轻松几步就能跑各种类型的 Flink SQL 语句。废话不多说,我们开始吧。

    这个教程其实就是 ververica 的 flink-sql-cookbook (https://github.com/ververica/flink-sql-cookbook/ )的改进版。这里所有的例子你都可以在 Zeppelin 里跑起来,而且不用写任何代码。我已经把里面的例子都移植到了 Zeppelin。

    准备环境


    Step 1

    git clone https://github.com/zjffdu/flink-sql-cookbook-on-zeppelin.git
    
    
    

    这个 repo 里是一些 Zeppelin notebook,里面都是 flink-sql-cookbook 里的例子。

    Step 2

    下载 Flink 1.12.1 (我没有试过其他版本的 Flink,有兴趣的同学可以试下),并解压。

    Step 3

    编译 Flink faker,地址:https://github.com/knaufk/flink-faker/。

    把编译出来的 flink-faker-0.2.0.jar 拷贝到 Flink 的 lib 目录下。这个 Flink faker 是一个特制的 table source,用来生成测试数据。我们的很多例子里都会用到这个  Flink faker。

    Step 4

    运行下面的命令启动最新版本的 Zeppelin。

    docker run -p 8081:8081 -p 8080:8080 --rm -v $PWD/logs:/logs -v /Users/jzhang/github/flink-sql-cookbook-on-zeppelin:/notebook -v /Users/jzhang/Java/lib/flink-1.12.1:/flink -e ZEPPELIN_LOG_DIR='/logs' -e ZEPPELIN_NOTEBOOK_DIR='/notebook' --name zeppelin apache/zeppelin:0.9.0
    
    
    

    需要注意的是这里的 2 个目录:

    1. /Users/jzhang/github/flink-sql-cookbook-on-zeppelin(这是Step 1 里clone 下来的 repo 目录)

    2. /Users/jzhang/Java/lib/flink-1.12.1 (这是 Step 2 下载下来并解压之后的 Flink 目录)

    这两个目录是我自己本地目录,请替换成你自己的目录。

    体验 Flink Sql Cookbook 教程

    好了,现在教程环境已经 ready 了,浏览器打开 http://localhost:8080 开始你的  Flink Sql 学习之旅吧。

    这是 Zeppelin 的 UI,里面已经有了一个文件夹 Flink Sql Cookbook,内含所有  Flink Sql 教程。首先我们需要配置下 Flink 解释器,点击右上角的菜单,选择 interpreter,找到 Flink interpreter,修改其中的 FLINK_HOME 为 /flink (也就是上面 docker 命令里我们挂载的 flink),然后点击重启 interpreter。

    如果你碰到如下的错误的话,请往下拉,看 Depenendies 里是不是有个用户名在那里,如果是的话,把它删掉再 save(这是 Zeppelin 的一个前端 bug,社区正在  fix)

    例子1:Filtering Data

    接下来我们就选择其中里的 Foundations/04 Filtering Data 来体验下。

    这里有 2 个段落(Paragraph),第一个段落是创建一个 server_logs 表,第二个段落是用 select where 语句去过滤这张表里的数据,并按时间排序取最新的 10 条数据。下图就是执行完 select 语句的效果,大家可以看到里面的数据每隔 3 秒钟会更新下,并且 status_code 的确永远都是 401 或者 403,验证了我们的 SQL 逻辑。右上角还有一个 FLINK JOB 的链接,点进去之后你还能看个这个 Job 的详细信息。

    例子2:Lateral Table Join

    接下来我们来看一个 Lateral Table Join 的例子,这是 Flink SQL 里的其中一种  Join 类型。初学者看到这个名词第一感觉会有点懵逼,上网查完资料之后也是似懂非懂的感觉,如果这时候有个比较直观的例子给你,应该会对你的理解非常有帮助。这个教程里就自带了这么一个例子,打开 Joins/06 Lateral Table Join,运行之后,你就能看到如下的效果。

    这里我就举这 2 个例子,里面还有很多很多有用的例子(如下图所示),大家可以自己去学习,可以尝试修改下 SQL 再运行看看结果有什么不一样。

    以上是我花了周末 2 天时间整理出来的学习资料,希望对大家学习 Flink 有所帮助,共同进步。不过这个教程还有改进的空间,有兴趣的同学可以一起来改进,目前还有如下 3 个点可以改进:

    • 每个Note里的说明文档都是英文的,可以翻译成中文,让更多人学习起来方便些。

    • 现在每个教程都是文字形式,如果有谁能为每个教程都做个小视频,配合讲解的话,我觉得效果会更好。

    • 增加更多案例教程,现在虽然内容很多,但还有空间增加更多教程。

    有兴趣想为这个教程做贡献的同学请发邮件到这个地址联系我:jeffzhang.zjf@alibaba-inc.com,  对 Flink on Zeppelin 感兴趣的可以加入钉钉群:32803524

    Flink on Zeppelin 这个项目是从 Flink 1.10 开始,目前为止已经支持了 3 个 Flink  的大版本。接下来我们还有很多有挑战的事情要去做,比如 Application Mode 的支持、K8s 的支持、调度的支持等等。Flink on Zeppelin 是我们做的工作的其中一部分,其他开源引擎的支持我们也会去做,我们的目标是做一个用户体验最好的基于开源组件的数据开发平台,有兴趣的同学可以看看下面的招聘详情,欢迎加入我们的数据开发团队。

    我们的主要职责是为阿里云上的各大中小企业客户提供大数据和 AI 的基础服务。你的工作将是围绕  Spark、Flink、Hadoop、Tensorflow、PyTorch 等开源组件构建一个易用的,企业级的大数据和 AI 开放平台。不仅有技术的挑战,也需要做产品的激情。我们采用大量的开源技术(Hadoop、Flink、Spark、Zeppelin、 Kubernetes、Tensorflow、Pytorch 等等),并且致力于回馈到开源社区。

    如果你对开源,大数据或者 AI 感兴趣,这里有最好的土壤。拥有在 Apache Flink、 Apache Kafka、Apache Zeppelin、Apache Beam、Apache Druid、Apache HBase 等诸多开源领域的 Committer & PMC。感兴趣的同学请发简历到:jeffzhang.zjf@alibaba-inc.com。


    ▼ 关注「Flink 中文社区」,获取更多技术干货 ▼

    ????  你也「在看」吗?

    展开全文
  • Flink SQL Demo 为切入,结合调试过程,深入理解 Flink Streaming SQL CodeGen flink 语法扩展 对比 Spark SQL 的执行流程:https://blog.csdn.net/super_wj0820/article/details/100981862 1. Flink...
  • FlinkSQL快速入门

    2021-09-08 16:33:29
    一.FlinkSQL和TableAPI简介 Flink针对流处理和批处理,为我们提供了多种操作API。从图中可知,越上层的API抽象程度越高,门槛越低(大家都熟悉SQL),但也丧失了灵活性。 Table API 是一系列集成在Java或Scala语言...
  • 深入解读 Flink SQL 1.13

    2021-06-24 00:39:00
    摘要:本文由社区志愿者陈政羽整理,Apache Flink 社区在 5 月份发布了 1.13 版本,带来了很多新的变化。文章整理自徐榜江(雪尽)5 月 22 日在北京的 Flink Me...
  • 点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源我们在之前的文章中详细介绍过Zepplin的来龙去脉,很多大厂基于Flink开发了自己的SQL开发平台。更多的公司可能需要选择一种...
  • Flink1.9发布之前字节跳动内部基于master分支进行内部的SQL平台构建。经历了2~3个月的时间字节内部在19年10月份发布了基于Flink1.9的Blinkplanner构建的StreamingSQL平台,并进行内部推广。在这个过程中发现了一些...
  • Flink从1.13版本开始支持在SQL Client从savepoint恢复作业。flink-savepoint介绍 接下来我们从Flink SQL Client构建一个mysql cdc数据经kafka入hudi数据湖的例子。整体流程如下: 在上述第二步中,我们通过手工停止...
  • SQL API 可以处理 SQL 语言编写的查询语句,但是这些查询需要嵌入用 Java 或 Scala 编写的程序中。此外,这些程序在提交到集群前需要用构建工具打包。这或多或少限制了 Java/Scala 程序员对 Flink 的使用。 SQL ...
  • ​ 在 1.9 版本发布之前,Flink SQL 完全借助于 Calcite 的 Schema 接口来管理注册的表,并且提供了 ExternalCatalog 接口,通过 TableDescriptor 定义外部系统数据的来源,从而访问到外部系统的数据。但是 ...
  • Flink SQL & Table简单实例

    千次阅读 2020-09-02 17:04:48
    与传统的SQL查询相比,FlinkSQL是动态表查询,SQL不会中止,会不断的执行; Kafka数据不断的被注入到动态表中,FlinkSQL则会在这张动态表中不断的执行。FLink从0.9版本中支持FLinkSQL,但是目前为止,FlnikSQL和Table...
  • flinksql自定义函数

    2021-03-05 17:55:53
    官网位置 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/ UDF 一进一出的方式 package day07; import bean.SensorReading;...import org.apache.flink.streaming.api.environ
  • Flink SQL中时态表

    2021-03-23 09:44:53
    前言 Flink 1.12正式发布后,带来了很多新的特性,本文重点学习和总结...首先,大家需要明确一个概念,就是传统SQL中表一般表示的都是有界的数据,而直接套用于流计算这样源源不断的数据上是存在问题的,所以在Flink S

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 25,514
精华内容 10,205
关键字:

flinksql