精华内容
下载资源
问答
  • Flink

    2020-07-16 16:31:34
    Flink


    简介

    Apache Flink 是一个在无界和有界数据流上进行状态计算的框架和分布式处理引擎。Flink 已经可以在所有常见的集群环境中运行 (YARN/ Mesos/ Kubernetes),并以 in-memory 的速度和任意的规模进行计算。

    • Source:必须是流式数据,eg:MySQL/ SQL Server/ Kafka/ RabbitMQ;
    • Sink:既可以是流式数据,也可以是批式数据,eg: MySQL/ SQL Server/ HDFS/ Hive/ Kafka/ RabbitMQ/ ElasticSearch;

    Flink的用途:

    • 实时数仓。包括对数据的实时清洗、归并、结构化,以及对传统数仓的补充和优化;
    • 实时报表。包括对双11等营销活动的分析大屏、实时的数据化运营;
    • 流数据分析。包括对实时指标的计算,例如金融风控指标等,以及个性化定制、精准推荐等;
    • 实时监控。包括对用户行为进行实时监测和分析、并进行预警等;

    由于Flink项目中很多华人开发者,因此有较多中文版的学习资料,但这些学子资料都是以由深到浅的方式介绍 Flink 的原理、概念,缺乏入门的demo,让初学者一头雾水。这一点上,Flink 的官方文档是不如 Spark 的官方文档的。


    命令

    安装

    # Flink下载
    wget https://mirror-hk.koddos.net/apache/flink/flink-1.10.3/flink-1.10.3-bin-scala_2.11.tgz
    
    # 解压
    tar -zxvf /root/software/flink-1.10.3-bin-scala_2.11.tgz
    
    # 以单节点形式启动Flink;注意,这里不能用 `sh start-cluster.sh` 形式启动,这里是flink的bug
    ./bin/start-cluster.sh
    
    # 查看Flink WebUI:在浏览器访问 8081 端口即可
    
    # 添加Kafka命令到环境变量,方便快速启动
    vim /etc/profile  # 打开配置文件
    export PATH="/root/flink-1.10.3/bin:$PATH"  # 增加一行
    source /etc/profile  # 变量生效
    

    测试

    通过官方自带demo,检查Flink是否可以正常使用

    # 第一步、启动一个Linux会话窗口,在这个会话中手动输入流数据
    nc -l 9999
    
    # 第二步、启动另外一个Linux会话窗口,这样就启动了一个Flink任务
    flink run ./bin/examples/streaming/SocketWindowWordCount.jar --port 9999
    
    # 第三步、登录Flink WebUI查看任务执行状态
    
    # 第四步、在第一个会话窗口中随意输入一些单词
    
    # 第五步、查看结果。在Flink WenUI - Job Manager - Stdout 也可以查看
    vim ./log/flink-root-taskexecutor-0-VM-232-72-centos.log
    

    其它

    # Flink 1.8.0
    # 安装目录 [192.168.102.85] home/devsa_dev/flink-1.8.0/
    ./bin/start-cluster.sh  # 以StandAlone模式启动Flink集群
    ./bin/stop-cluster.sh  # 启动Flink集群
    ./bin/flink --version  # 查看Flink版本
    
    ./bin/flink run examples/streaming/WordCount.jar  # 提交demo任务,WordCount
    ./bin/flink run -d examples/streaming/TopSpeedWindowing.jar  # 提交demo任务,TopSpeedWindowing
    ./bin/sql-client.sh embedded  # 启动Flink-SQL客户端
    ./bin/start-scala-shell.sh  # 启动Flink的Scala shell入口
    
    ./bin/flink -h  # 查看Flink的命令行参数
    ./bin/flink run -h  # 查看run命令的参数
    ./bin/flink list -m 127.0.0.1:8081  # 查看任务列表
    ./bin/flink stop -m 127.0.0.1:8081 d67420e52bd051fae2fddbaa79e046bb  # 停止任务
    
    ./bin/flink run -m yarn-cluster ./example/batch/WordCount.jar  # 以yarn模式提交作业
    

    Flink的API

    Flink 的API可以使用 Java / Scala / Python 来调用。

    Stateful Stream Processing

    Flink 最底层的API。它将通过过程函数(Process Function)嵌入到 DataStream API 中,允许用户可以自由地处理来自一个或多个流数据的事件,并使用一致、容错的状态。除此之外,用户可以注册事件时间和处理事件回调,从而使程序可以实现复杂的计算。
    DataStream API 是 Flink API 中比较底层的一种,在实际的生产中,需要用户较大的开发工作量,但同时它也提供了更为灵活的表达能力,适用于需要精细操作的情况;而绝大多数业务逻辑,都可以用 Flink SQL 来实现。

    DataStream / DataSet API

    Flink 提供的核心 API,DataSet 处理有界的数据集,DataStream 处理有界或者无界的数据流。用户可以通过各种方法(map / flatmap / window / keyby / sum / max / min / avg / join 等)将数据进行转换 / 计算。

    Table API

    Table API 与 Flink SQL 联系非常紧密。
    Flink API 中最顶层的抽象,可以做流批统一处理。Flink SQL 是基于Apache Calcite来实现的标准SQL;它对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。它也是使用起来最简单的 API。
    Table API 是以 表 为中心的声明式 DSL,其中表可能会动态变化(在表达流数据时)。Table API 提供了例如 select、project、join、group-by、aggregate 等操作,使用起来却更加简洁(代码量更少)。你可以在表与 DataStream/DataSet 之间无缝切换,也允许程序将 Table API 与 DataStream 以及 DataSet 混合使用。

    SQL

    Flink 提供的最高层级的抽象是 SQL 。这一层抽象在语法与表达能力上与 Table API 类似,但是是以 SQL查询表达式的形式表现程序。SQL 抽象与 Table API 交互密切,同时 SQL 查询可以直接在 Table API 定义的表上执行。


    Flink SQL

    Flink SQL 使用类似 ANSI-SQL 的语言,操作流式数据,从而实现业务逻辑的实时计算;其本质是调用了封装在 Table API & SQL 中的 Flink 函数,也是 Flink API 中最高级、最简单的形式。通过简单的 SQL 语言就可以实现实时计算,从而实现业务与技术的解耦,这是 Flink 比 Spark Streaming 高明的地方。但是,Flink SQL 并不适用于过于复杂的业务逻辑的实现。

    # Flink中常见的窗口函数
    select  -- 统计每五分钟、每个城市的骑车人数
        city
        , tumble_end(start_time, interval '5' minute) window_end  -- 时间窗口,每5分钟统计一次
        , count(*) cnt
    from t_ride
    WHERE is_del = 0
    group by city
        , tumble(start_time, interal '5' minute)
    having count(*) >= 5
    ;
    
    
    insert into sink_kafka_TenMinPsgCnts  -- 将结果写入Kafka
    select
        tumble_start(ride_time, interval '10' minutes) cnt_start  -- 每10分钟的搭乘的乘客数
        , tumble_end(ride_time, interval '10' minutes) cnt_end
        , cast(sum(psgCnt) as bigint) cnt
    from t_rides
    group by tumble(ride_time, interval '10' minutes)
    ;
    
    
    -- 将结果写入Elasticsearch
    insert into sink_elasticsearch_AreaCnts
    select
        city  -- 每个区域出发的行车数
        , count(*) cnt
    from t_rides
    where is_del = 0
    group by city
    ;
    

    Table API

    依赖

    < !-- Flink Table API的依赖 -- >
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-scala_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    
    < !-- Flink Table API在本地IDE运行的依赖 -- >
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>
    

    DataStream / DataSet API

    Flink 的这不分 API 操作与 Spark Streaming有非常多的相似之处。

    展开全文
  • study_java_flink 学习项目,java语言flink组件开发 flink官网: ://flink.apache.org/
  • Python on Flink & Flink on Zeppelin ) Staff Engineer @Alibaba 04/22/2020 About Me Apache Beam Committer Staff Engineer @ Alibaba Apache Flink ALC Beijing Apache Flink PMC Member 2016.10 2017.7 2019.6
  • 注:本篇章的flink学习均是基于java开发语言 我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢? 前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)? - Batch Analytics,...


    注:本篇章的flink学习均是基于java开发语言

    我们如果要使用flink进行计算开发,一个完整的开发步骤是怎样的呢?

    前情回顾:什么叫有界数据流,什么叫无界数据流(何为流处理,何为批处理)?

    image-20210307225350028

    - Batch Analytics,右边是 Streaming Analytics。批量计算: 统一收集数据->存储到DB->对数据进行批量处理,对数据实时性邀请不高,比如生成离线报表、月汇总,支付宝年度账单(一年结束批处理计算)

    - Streaming Analytics 流式计算,顾名思义,就是对数据流进行处理,如使用流式分析引擎如 Storm,Flink 实时处理分析数据,应用较多的场景如 实时报表、车辆实时报警计算等等。

    (0)开发程序所需依赖

    <properties>
        <encoding>UTF-8</encoding>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <java.version>1.8</java.version>
        <scala.version>2.12</scala.version>
        <flink.version>1.12.2</flink.version>
    </properties>
    <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-java-bridge_2.12</artifactId>
                <version>${flink.version}</version>
            </dependency>
    </dependencies>
        <build>
            <sourceDirectory>src/main/java</sourceDirectory>
            <plugins>
                <!-- 编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.5.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <!--<encoding>${project.build.sourceEncoding}</encoding>-->
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
                <!-- 打包插件(会包含所有依赖) -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <!--
                                            zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- 设置jar包的入口类(可选) -->
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    (1)获取执行环境

    flink程序开发,首要的便是需要获取其执行环境!

    ex:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    

    或者:

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    

    如果使用StreamExecutionEnvironment 默认便是流式处理环境

    但是flink1.12 开始,流批一体,我们可以自己指定当前计算程序的环境模式

    指定为自动模式:AUTOMATIC

    此设置后,flink将会自动识别数据源类型

    有界数据流,则会采用批方式进行数据处理

    无界束流,则会采用流方式进行数据处理

    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
    

    强制指定为批数据处理模式:BATCH

    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    

    强制指定为流数据处理模式:STREAMING

    env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
    

    注意点:

    在flink中,有界与无界数据流都可以强指定为流式运行环境,但是,如果明知一个数据来源为流式数据,就必须设置环境为AUTOMATICSTREAMING,不可以指定为BATCH否则程序会报错!

    (2)加载/创建数据源

    flink,是一个计算框架,在计算的前提,肯定是要有数据来源啊!

    flink可以从多种场景读取加载数据,例如 各类DB 如MysqlSQL SERVERMongoDB、各类MQ 如KafkaRabbitMQ、以及很多常用数据存储场景 如redis文件(本地文件/HDFS)scoket

    我们在加载数据源的时候,便知道,该数据是有界还是无界了!

    ex:

    flink读取rabbitMQ消息,是有界还是无界呢?当然是无界!因为flink程序启动时,能通过连接知道什么时候MQ中有数据,什么时候没有数据吗?不知道,因为本身MQ中是否有消息或者消息有多少就是一个不能肯定确定的因素,因此其不得不保持一个类似于长连接的形式,一直等待MQ中有数据到来,然后处理。


    flink读取指定某个文件中的数据,那么此数据源是有界还是无界呢?当然是有界!因为文件中数据,flink读取会做记录,当文件内容读完了,数据源就相当于没有新的数据来到了嘛!

    ex:

    从集合中读取数据:

    DataStream<String> elementsSource = env.fromElements("java,scala,php,c++","java,scala,php", "java,scala", "java");
    

    那么,这是无界数据还是有界数据呢?很明显,有界数据!因为数据就这么多,当前数据源在读取时不会再凭空产生数据了。

    scoket中读取数据:

     DataStreamSource<String> elementsSource= env.socketTextStream("10.50.40.131", 9999);
    

    这是无界数据还是有界数据呢?很明显,无界数据!因为scoket一旦连接,flink不会知道其数据源什么时候会数据结束,其不得不保持一个类似于长连接的状态,一直等待Scoket中有数据到来,然后处理。

    (3)数据转换处理

    数据转换处理,就是flink使用算子,对从数据源中获取的数据进行数据加工处理(例如 数据转换,计算等等)

    例如:开窗口、低阶处理函数ProcessFuction、各种算子:map(映射,与java8流中Map效果类似),flatmap(元素摊平,与java8流中Map效果类似)等等。

    demo示例:

    DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                                                               "java,scala,php", "java,scala", "java");
    // 数据处理
    DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String element, Collector<String> out) throws Exception {
            String[] wordArr = element.split(",");
            for (String word : wordArr) {
                out.collect(word);
            }
        }
    });
    flatMap.map(new MapFunction<String, String>() {
        @Override
        public String map(String value) throws Exception {
            return value.toUpperCase();
        }
    });
    

    (4)处理后数据放置/输出

    将计算后的数据,进行放置(输出/存储),可以很地方,从什么地方读取数据,自然也可以将计算结果输出到该地点。

    例如:输出到文件,输出到控制台,输出到MQ,输出到DB,输出到scoket

    ex:输出到控制台

    source.print();
    

    (5)执行计算程序

    flink程序需要启动才能执行任务,正如,spring-boot启动程序需要nohup java -jar xxxx.jar & 或者编译器中点击图标按钮启动

    启动示例:

    // 1.准备环境
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // 设置模式 (流、批、自动)
    // 2.加载数据源
    // 3.数据转换
    // 4.数据输出
    // 5.执行程序
    env.execute();
    //或者 env.execute("指定当前计算程序名");
    

    (6)完整示例

    public class FlinkDemo {
        public static void main(String[] args) throws Exception {
            // 1.准备环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 设置运行模式
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
            // 2.加载数据源
            DataStreamSource<String> elementsSource = env.fromElements("java,scala,php,c++",
                    "java,scala,php", "java,scala", "java");
            // 3.数据转换
            DataStream<String> flatMap = elementsSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String element, Collector<String> out) throws Exception {
                    String[] wordArr = element.split(",");
                    for (String word : wordArr) {
                        out.collect(word);
                    }
                }
            });
            //DataStream 下边为DataStream子类
            SingleOutputStreamOperator<String> source = flatMap.map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    return value.toUpperCase();
                }
            });
            // 4.数据输出
            source.print();
            // 5.执行程序
            env.execute("flink-hello-world");
        }
    }
    

    IDEA执行后,输出结果:

    image-20210331222234445

    前边序号可以理解为多线程执行时的线程名字!

    展开全文
  • 搭建Scala、Flink开发环境,使用IDEA开发Scala程序 一、基于IDEA安装scala File——Settings——Plugins搜索Scala安装,安装后重启IDEA使Scala插件生效 二、配置Flink开发环境 <dependencies> <!--...

    IDEA搭建Scala、Flink开发环境,使用Scala语言开发Flink程序

    一、基于IDEA安装scala

    File——Settings——Plugins搜索Scala安装,安装后重启IDEA使Scala插件生效
    在这里插入图片描述

    二、配置Flink开发环境

    使用Scala语言开发Flink程序的时候需要添加下面的配置

        <dependencies>
    
    		<!--配置flin
    展开全文
  • Flink 最锋利的武器:Flink SQL 入门和实战

    万次阅读 多人点赞 2019-06-21 00:00:00
    一、Flink SQL 背景Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。自 2015...

    学习路径:《2021年最新从零到大数据专家学习路径指南》

    面      试:《2021年最新版大数据面试题全面开启更新》

    【注意】:Flink1.9版本后的Flink SQL使用看这里:

    Flink 最锋利的武器:Flink SQL 入门和实战(1.9版本及以后)

    一、Flink SQL 背景

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

    Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

    640?wx_fmt=png

    在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

    • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;

    • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;

    • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;

    • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;

    • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。

    二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

    2.1 Flink 1.7.0 新特性

    在 Flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 Flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 Scala 2.12 的支持、一次性 S3 文件接收器、复杂事件处理与流 SQL 的集成等。

    Apache Flink 中对 Scala 2.12 的支持(FLINK-7811)

    Apache Flink 1.7.0 是第一个完全支持 Scala 2.12 的版本。这允许用户使用较新的 Scala 版本编写 Flink 应用程序并利用 Scala 2.12 生态系统。

    状态演进(FLINK-9376)

    许多情况下,由于需求的变化,长期运行的 Flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 Flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 Avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 Avro 的规范来演变国家的架构。虽然 Avro 类型是 Flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 Flink 版本中进一步扩展对其他类型的支持。

    MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

    这是 Apache Flink 1.7.0 的一个重要补充,它为 Flink SQL 提供了 MATCH RECOGNIZE 标准的初始支持。此功能结合了复杂事件处理(CEP)和 SQL,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。

    流式 SQL 中的时态表和时间连接(FLINK-9712)

    时态表是 Apache Flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。

    Streaming SQL 的其他功能

    除了上面提到的主要功能外,Flink 的 Table&SQL API 已经扩展到更多用例。以下内置函数被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 现在支持在环境文件和 CLI 会话中定义视图。此外,CLI 中添加了基本的 SQL 语句自动完成功能。社区添加了一个 Elasticsearch 6 表接收器,允许存储动态表的更新结果。

    Kafka 2.0 连接器(FLINK-10598)

    Apache Flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 Kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 Kafka 2.0。

    本地恢复(FLINK-9635)

    Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,Flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,Flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。

    2.2 Flink 1.8.0 新特性

    Flink 1.8.0 引入对状态的清理

    使用 TTL(生存时间)连续增量清除旧的 Key 状态 Flink 1.8 引入了对 RocksDB 状态后端(FLINK-10471)和堆状态后端(FLINK-10473)的旧数据的连续清理。这意味着旧的数据将(根据 TTL 设置)不断被清理掉。

    新增和删除一些 Table API

    1) 引入新的 CSV 格式符(FLINK-9964)

    此版本为符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能与 Kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系统连接器。

    2) 静态生成器方法在 TableEnvironment(FLINK-11445)上的弃用

    为了将 API 与实际实现分开,TableEnvironment.getTableEnvironment() 不推荐使用静态方法。现在推荐使用 Batch/StreamTableEnvironment.create()。

    3) 表 API Maven 模块中的更改(FLINK-11064)

    之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 Java 还是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

    Kafka Connector 的修改

    引入可直接访问 ConsumerRecord 的新 KafkaDeserializationSchema(FLINK-8354),对于 FlinkKafkaConsumers 推出了一个新的 KafkaDeserializationSchema,可以直接访问 KafkaConsumerRecord。

    三、Flink SQL 的编程模型

    Flink 的编程模型基础构建模块是流(streams)与转换 (transformations),每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。

    640?wx_fmt=png

    相信大家对上面的图已经十分熟悉了,当然基于 Flink SQL 编写的 Flink 程序也离不开读取原始数据,计算逻辑和写入计算结果数据三部分。

    一个完整的 Flink SQL 编写的程序包括如下三部分:

    • Source Operator:Soruce operator 是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现例如 MySQL、Kafka 等;

    • Transformation Operators:算子操作主要完成例如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作;

    • Sink Operator:Sink operator 是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等

    我们通过用一个最经典的 WordCount 程序作为入门,看一下传统的基于 DataSet/DataStream API 开发和基于 SQL 开发有哪些不同?

    • DataStream/DataSetAPI

     
    • Flink SQL

    //省略掉初始化环境等公共代码
    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    我们已经可以直观体会到,SQL 开发的快捷和便利性了。

    四、Flink SQL 的语法和算子

    4.1 Flink SQL 支持的语法

    Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

    那么 Flink 自身支持的 SQL 语法有哪些呢?

    insert:
    INSERT INTO tableReference
    query
    
    query:
    values
      | {
    select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
    
    orderItem:
      expression [ ASC | DESC ]
    
    select:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
    
    selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    
    projectItem:
      expression [ [ AS ] columnAlias ]
      | tableAlias . *
    
    tableExpression:
      tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
    
    joinCondition:
    ON booleanExpression
      | USING '(' column [, column ]* ')'
    
    tableReference:
      tablePrimary
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
    
    tablePrimary:
      [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'
    
    values:
    VALUES expression [, expression ]*
    
    groupItem:
      expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'
    
    windowRef:
        windowName
      | windowSpec
    
    windowSpec:
        [ windowName ]
    '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
    RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
    ')'
    

    上面 SQL 的语法支持也已经表明了 Flink SQL 对算子的支持,接下来我们对 Flink SQL 中最常见的算子语义进行介绍。

    4.2 Flink SQL 常用算子

    SELECT

    SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

    示例:

    SELECT * FROM Table;// 取出表中的所有列
    SELECT name,age FROM Table;// 取出表中 name 和 age 两列
    

    与此同时 SELECT 语句中可以使用函数和别名,例如我们上面提到的 WordCount 中:

    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    WHERE

    WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。

    示例:

    SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
    SELECT * FROM Table WHERE age = 20

    WHERE 是从原数据中进行过滤,那么在 WHERE 条件中,Flink SQL 同样支持 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合 IN、NOT IN 联合使用。举个负责的例子:

    SELECT name, age
    FROM Table
    WHERE name IN (SELECT name FROM Table2)
    

    DISTINCT

    DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。

    示例:

    SELECT DISTINCT name FROM Table;
    

    对于流式查询,计算查询结果所需的 State 可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。

    GROUP BY

    GROUP BY 是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。

    SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
    

    UNION 和 UNION ALL

    UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。

    示例:

    SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
    

    JOIN

    JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:

    • JOIN - INNER JOIN

    • LEFT JOIN - LEFT OUTER JOIN

    • RIGHT JOIN - RIGHT OUTER JOIN

    • FULL JOIN - FULL OUTER JOIN

    这里的 JOIN 的语义和我们在关系型数据库中使用的 JOIN 语义一致。

    示例:

    JOIN(将订单表数据和商品表进行关联)
    SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL 操作。

    示例:

    SELECT *
    FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    Group Window

    根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Window:

    • Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

    • Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

    • Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

    Tumble Window

    Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下:

    640?wx_fmt=png

    Tumble 滚动窗口对应的语法如下:

    SELECT 
        [gk],
        [TUMBLE_START(timeCol, size)], 
        [TUMBLE_END(timeCol, size)], 
        agg1(col1), 
        ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], TUMBLE(timeCol, size)
    

    其中:

    • [gk] 决定了是否需要按照字段进行聚合;

    • TUMBLE_START 代表窗口开始时间;

    • TUMBLE_END 代表窗口结束时间;

    • timeCol 是流表中表示时间字段;

    • size 表示窗口的大小,如 秒、分钟、小时、天。

    举个例子,假如我们要计算每个人每天的订单量,按照 user 进行聚合分组:

    SELECT user, TUMBLE_START(rowtime, INTERVAL1DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL1DAY), user;
    

    Hop Window

    Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠,具体语义如下:

    640?wx_fmt=png

    Hop 滑动窗口对应语法如下:

    SELECT 
        [gk], 
        [HOP_START(timeCol, slide, size)] ,  
        [HOP_END(timeCol, slide, size)],
        agg1(col1), 
        ... 
        aggN(colN) 
    FROM Tab1
    GROUP BY [gk], HOP(timeCol, slide, size)
    

    每次字段的意思和 Tumble 窗口类似:

    • [gk] 决定了是否需要按照字段进行聚合;

    • HOP_START 表示窗口开始时间;

    • HOP_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • slide 表示每次窗口滑动的大小;

    • size 表示整个窗口的大小,如 秒、分钟、小时、天。

    举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:

    SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
    

    Session Window

    会话时间窗口没有固定的持续时间,但它们的界限由 interval 不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。

    640?wx_fmt=png

    Seeeion 会话窗口对应语法如下:

    SELECT 
        [gk], 
        SESSION_START(timeCol, gap) AS winStart,  
        SESSION_END(timeCol, gap) AS winEnd,
        agg1(col1),
         ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], SESSION(timeCol, gap)
    
    • [gk] 决定了是否需要按照字段进行聚合;

    • SESSION_START 表示窗口开始时间;

    • SESSION_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • gap 表示窗口数据非活跃周期的时长。

    例如,我们需要计算每个用户访问时间 12 小时内的订单量:

    SELECT user, SESSION_START(rowtime, INTERVAL12HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL12HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL12HOUR), user
    

    五、Flink SQL 的内置函数

    Flink 提供大量的内置函数供我们直接使用,我们常用的内置函数分类如下:

    • 比较函数

    • 逻辑函数

    • 算术函数

    • 字符串处理函数

    • 时间函数

    我们接下来对每种函数举例进行讲解。

    5.1 比较函数

    比较函数描述
    value1=value2如果 value1 等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1<>value2如果 value1 不等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1>value2如果 value1 大于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1 < value2如果 value1 小于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value IS NULL如果 value 为 NULL,则返回 TRUE
    value IS NOT NULL如果 value 不为 NULL,则返回 TRUE
    string1 LIKE string2如果 string1 匹配模式 string2,则返回 TRUE ; 如果 string1 或 string2 为 NULL,则返回 UNKNOWN
    value1 IN (value2, value3…)如果给定列表中存在 value1 (value2,value3,…),则返回 TRUE 。当(value2,value3,…)包含 NULL,如果可以找到该数据元则返回 TRUE,否则返回 UNKNOWN。如果 value1 为 NULL,则始终返回 UNKNOWN

    5.2 逻辑函数

    逻辑函数描述
    A OR B如果 A 为 TRUE 或 B 为 TRUE,则返回 TRUE
    A AND B如果 A 和 B 都为 TRUE,则返回 TRUE
    NOT boolean如果 boolean 为 FALSE,则返回 TRUE,否则返回 TRUE。如果 boolean 为 TRUE,则返回 FALSE
    A IS TRUE 或 FALSE判断 A 是否为真

    5.3 算术函数

    算术函数描述
    numeric1 ±*/ numeric2分别代表两个数值加减乘除
    ABS(numeric)返回 numeric 的绝对值
    POWER(numeric1, numeric2)返回 numeric1 上升到 numeric2 的幂

    除了上述表中的函数,Flink SQL 还支持种类丰富的函数计算。

    5.4 字符串处理函数

    字符串函数描述
    UPPER/LOWER以大写 / 小写形式返回字符串
    LTRIM(string)返回一个字符串,从去除左空格的字符串, 类似还有 RTRIM
    CONCAT(string1, string2,…)返回连接 string1,string2,…的字符串

    5.5 时间函数

    时间函数描述
    DATE string返回以“yyyy-MM-dd”形式从字符串解析的 SQL 日期
    TIMESTAMP string返回以字符串形式解析的 SQL 时间戳,格式为“yyyy-MM-dd HH:mm:ss [.SSS]”
    CURRENT_DATE返回 UTC 时区中的当前 SQL 日期
    DATE_FORMAT(timestamp, string)返回使用指定格式字符串格式化时间戳的字符串

    六、Flink SQL 实战应用

    上面我们分别介绍了 Flink SQL 的背景、新特性、编程模型和常用算子,这部分我们将模拟一个真实的案例为大家使用 Flink SQL 提供一个完整的 Demo。

    相信这里应该有很多 NBA 的球迷,假设我们有一份数据记录了每个赛季的得分王的数据,包括赛季、球员、出场、首发、时间、助攻、抢断、盖帽、得分等。现在我们要统计获得得分王荣誉最多的三名球员。

    原数据存在 score.csv 文件中,如下:

    17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.4
    16-17,拉塞尔-威斯布鲁克,81,81,34.6,10.4,1.6,0.4,31.6
    15-16,斯蒂芬-库里,79,79,34.2,6.7,2.1,0.2,30.1
    14-15,拉塞尔-威斯布鲁克,67,67,34.4,8.6,2.1,0.2,28.1
    13-14,凯文-杜兰特,81,81,38.5,5.5,1.3,0.7,32
    12-13,卡梅罗-安东尼,67,67,37,2.6,0.8,0.5,28.7
    11-12,凯文-杜兰特,66,66,38.6,3.5,1.3,1.2,28
    10-11,凯文-杜兰特,78,78,38.9,2.7,1.1,1,27.7
    09-10,凯文-杜兰特,82,82,39.5,2.8,1.4,1,30.1
    08-09,德维恩-韦德,79,79,38.6,7.5,2.2,1.3,30.2
    07-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,30
    06-07,科比-布莱恩特,77,77,40.8,5.4,1.4,0.5,31.6
    05-06,科比-布莱恩特,80,80,41,4.5,1.8,0.4,35.4
    04-05,阿伦-艾弗森,75,75,42.3,7.9,2.4,0.1,30.7
    03-04,特雷西·麦克格雷迪,67,67,39.9,5.5,1.4,0.6,28
    02-03,特雷西·麦克格雷迪,75,74,39.4,5.5,1.7,0.8,32.1
    01-02,阿伦-艾弗森,60,59,43.7,5.5,2.8,0.2,31.4
    00-01,阿伦-艾弗森,71,71,42,4.6,2.5,0.3,31.1
    99-00,沙奎尔-奥尼尔,79,79,40,3.8,0.5,3,29.7
    98-99,阿伦-艾弗森,48,48,41.5,4.6,2.3,0.1,26.8
    97-98,迈克尔-乔丹,82,82,38.8,3.5,1.7,0.5,28.7
    96-97,迈克尔-乔丹,82,82,37.9,4.3,1.7,0.5,29.6
    95-96,迈克尔-乔丹,82,82,37.7,4.3,2.2,0.5,30.4
    94-95,沙奎尔-奥尼尔,79,79,37,2.7,0.9,2.4,29.3
    93-94,大卫-罗宾逊,80,80,40.5,4.8,1.7,3.3,29.8
    92-93,迈克尔-乔丹,78,78,39.3,5.5,2.8,0.8,32.6
    91-92,迈克尔-乔丹,80,80,38.8,6.1,2.3,0.9,30.1
    90-91,迈克尔-乔丹,82,82,37,5.5,2.7,1,31.5
    89-90,迈克尔-乔丹,82,82,39,6.3,2.8,0.7,33.6
    88-89,迈克尔-乔丹,81,81,40.2,8,2.9,0.8,32.5
    87-88,迈克尔-乔丹,82,82,40.4,5.9,3.2,1.6,35
    86-87,迈克尔-乔丹,82,82,40,4.6,2.9,1.5,37.1
    85-86,多米尼克-威尔金斯,78,78,39.1,2.6,1.8,0.6,30.3
    84-85,伯纳德-金,55,55,37.5,3.7,1.3,0.3,32.9
    83-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.6
    82-83,阿历克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.4
    81-82,乔治-格文,79,79,35.7,2.4,1,0.6,32.3
    

    首先我们需要创建一个工程,并且在 Maven 中有如下依赖:

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
    </dependency>
    

    第一步,创建上下文环境:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    

    第二步,读取 score.csv 并且作为 source 输入:

     DataSet<String> input = env.readTextFile("score.csv");
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    其中的PlayerData类为自定义类:
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    

    第三步,将 source 数据注册成表:

    Table topScore = tableEnv.fromDataSet(topInput);
    tableEnv.registerTable("score", topScore);
    

    第四步,核心处理逻辑 SQL 的编写:

    Table queryResult = tableEnv.sqlQuery("
    select player, 
    count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
    ");
    

    第五步,输出结果:

    DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
    result.print();
    

    我们直接运行整个程序,观察输出结果:

    ...
    16:28:06,162 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shut down complete.
    16:28:06,162 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
    16:28:06,164 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopped TaskExecutor akka://flink/user/taskmanager_2.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    16:28:06,169 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    16:28:06,177 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
    16:28:06,187 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
    16:28:06,187 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
    16:28:06,188 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:51703
    16:28:06,188 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
    迈克尔-乔丹:10
    凯文-杜兰特:4
    阿伦-艾弗森:4
    

    我们看到控制台已经输出结果了:

    640?wx_fmt=jpeg

    完整的代码如下:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
    public static void main(String[] args) throws Exception{
    
    //1\. 获取上下文环境 table的环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    
    //2\. 读取score.csv
            DataSet<String> input = env.readTextFile("score.csv");
            input.print();
    
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    
    //3\. 注册成内存表
            Table topScore = tableEnv.fromDataSet(topInput);
            tableEnv.registerTable("score", topScore);
    
    //4\. 编写sql 然后提交执行
    //select player, count(season) as num from score group by player order by num desc;
            Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");
    
    //5\. 结果进行打印
            DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
            result.print();
    
        }
    
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    
    public static class Result {
    public String player;
    public Long num;
    
    public Result() {
    super();
            }
    public Result(String player, Long num) {
    this.player = player;
    this.num = num;
            }
    @Override
    public String toString() {
    return player + ":" + num;
            }
        }
    }//
    

    当然我们也可以自定义一个 Sink,将结果输出到一个文件中,例如:

            TableSink sink = new CsvTableSink("/home/result.csv", ",");
    String[] fieldNames = {"name", "num"};
            TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
            tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
            sqlQuery.insertInto("result");
            env.execute();
    

    然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:

    迈克尔-乔丹,10
    凯文-杜兰特,4
    阿伦-艾弗森,4
    

    七、总结

    本篇向大家介绍了 Flink SQL 产生的背景,Flink SQL 大部分核心功能,并且分别介绍了 Flink SQL 的编程模型和常用算子及内置函数。最后以一个完整的示例展示了如何编写 Flink SQL 程序。Flink SQL 的简便易用极大地降低了 Flink 编程的门槛,是我们必需掌握的使用 Flink 解决流式计算问题最锋利的武器!

    640?wx_fmt=gif

    展开全文
  • Flink入门之WordCount(Scala语言) 流式处理 一.创建一个Maven工程 1.pom.xml文件依赖 <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-...
  • Flink教程

    2020-06-25 19:12:24
    这是一部讲解Flink的通俗教程,语言可能不够优雅,但力争讲的明白,如果您在浏览的时候有什么迷惑的地方,您可以大胆的提出issue,这样我可以及时修改,但是您不要指望讲解的有多么深入,因为这只是一个入门的Flink...
  • 1.valueState ValueState:这将保留一个可以更新和检索的值(作用域为...1.1 java语言描述 valueState功能定义:对输入的数据数量统计,当数量达到2时,输出清零,之后继续统计。 package javaState.valuestate; impo...
  • flink sql flink 整合 hive
  • flink中针对读取csv文件的输出可以有3种格式,都是通过引用inputFormat来控制的,分别为 PojoCsvInputFormat输出类型为pojo, RowCsvInputFormat输出类型为Row, TupleCsvInputFormat输出类型为Tuple。本例子就用...
  • Flink介绍

    2020-11-05 17:49:24
    Flink 实际应用场景 Flink 自从 2019 年初开源以来,迅速成为大数据实时计算领域炙手可热的技术框架。作为 Flink 的主要贡献者阿里巴巴率先将其在全集团进行推广使用,另外由于 Flink 天然的流式特性,更为领先的...
  • flink实战--flinkSQL入门大全

    万次阅读 2018-11-12 18:07:03
    FlinkSQL概念介绍 Table API &...Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet A...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 11,196
精华内容 4,478
关键字:

flink语言