flink 订阅
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1] 展开全文
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1]
信息
稳定版本
Apache软件基金会 [1]
类    型
1.8.0 [1]
中文名
弗林克
外文名
Flink [1]
flink开发
Apache Flink是由Apache软件基金会内的Apache Flink社区基于Apache许可证2.0开发的,该项目已有超过100位代码提交者和超过460贡献者。 [2]  是由Apache Flink的创始人创建的公司。目前,该公司已聘用了12个Apache Flink的代码提交者。 [3] 
收起全文
精华内容
下载资源
问答
  • Flink

    千次阅读 2019-08-14 19:43:41
    Flink 一、简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务: ​ ...

    Flink

    一、简介

    Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

    ​ DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。

    ​ DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。

    ​ Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

    此外,Flink还针对特定的应用领域提供了领域库,例如:

    ​ Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。

    ​ Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。

    二、优点

    Flink 是一个开源的分布式流式处理框架:

    ​ ①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

    ​ ②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

    ​ ③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

    Flink处理不同数据分类:

    ​ ①有界数据流–Flink批处理–DataSet–groupBy

    ​ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ​ ②无界数据流–Flink流处理–DataStream–keyBy–流处理默认一条条数据处理

    ​ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ​ ③有/无界数据流–Flink SQL

    Flink & Storm & SparkStreaming 流处理框架的区别:

    ​ •Strom:纯实时处理数据,吞吐量小 --水龙头滴水

    ​ •SparkStreaming : 准实时处理数据,微批处理数据,吞吐量大 --河道中开闸关闸

    ​ •Flink:纯实时处理数据,吞吐量大 --河流远远不断

    三、注意点

    Flink程序的执行过程:
    	获取flink的执行环境(execution environment)
    	加载数据				-- soure
    	对加载的数据进行转换 		-- transformation
    	对结果进行保存或者打印	    -- sink
    	触发flink程序的执行(execute(),count(),collect(),print()),例如:调用						ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。
    	
    
    
    1.创建环境
          批处理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          流处理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          
    2.在批处理中Flink处理的数据对象是DataSet
        在流处理中Flink处理的数据对象时DataStream
    
    3.代码流程必须符合 source ->transformation -> sink
          transformation 都是懒执行,需要最后使用env.execute()触发执行或者使用 print() 					,count(),collect() 触发执行
    
    4.Flink编程不是基于K,V格式的编程,通过某些方式来指定虚拟key
    
    5.Flink中的tuple最多支持25个元素,每个元素是从0开始
    

    四、WordCount

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.*;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> dataSource = env.readTextFile("./data/words");
            FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> collector) throws Exception {
                    String[] split = line.split(" ");
                    for(String word : split){
                        collector.collect(word);
                    }
                }
            });
    
            MapOperator<String, Tuple2<String, Integer>> pairWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
    
            UnsortedGrouping<Tuple2<String, Integer>> groupBy = pairWords.groupBy(0);
            AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
            SortPartitionOperator<Tuple2<String, Integer>> sortResult = result.sortPartition(1, Order.DESCENDING);
    
            sortResult.writeAsCsv("./data/result/r2","\n","&", FileSystem.WriteMode.OVERWRITE);
            env.execute("myflink");
            result.print();
        }
    }
    

    五、排序

    在Flink中,排序是默认在各个分区中进行的排序,所以最终结果在默认情况下是不能进行排序展示的

    这时, ①可以给相关的算子进行设计–sum().setParallelism(1)

    AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
    

    ​ ②全局设计,但是全局设计限制分区数,影响框架运行速度,通常不建议–env. setParallelism(1)

    六、source数据源和Sink源

    1 数据源Source

    ​ Source 是Flink 获取数据的地方。以下source 中和批处理的source 类似,但是以下
    源作为dataStream 流处理时,是一条条处理,最终得到的不是一个总结果,而是每
    次处理后都会得到一个结果。
    ​ 1).socketTextStream – 读取Socket 数据流
    ​ 2). readTextFile() – 逐行读取文本文件获取数据流,每行都返回字符串。
    ​ 3).fromCollection() – 从集合中创建数据流。
    ​ 4).fromElements – 从给定的数据对象创建数据流,所有数据类型要一致。
    ​ 5).addSource – 添加新的源函数,例如从kafka 中读取数据,参见读取kafka 数据案例。

    2 数据写出Sink

    ​ 1).writeAsText() – 以字符串的形式逐行写入文件,调用每个元素的toString()得到写
    入的字符串。
    ​ 2).writeAsCsv() – 将元组写出以逗号分隔的csv 文件。注意:只能作用到元组数据上。
    ​ 3).print() – 控制台直接输出结果,调用对象的toString()方法得到输出结果。
    ​ 4).addSink() – 自定义接收函数。例如将结果保存到kafka 中,参见kafka 案例。

    七、累加器&计数器

    Flink 累加器

    ​ Accumulator即累加器,可以在分布式统计数据,只有在任务结束之后才能获取累加器的最终结果。计数器是累加器的具体实现,有:IntCounter,LongCounter和DoubleCounter。

    累加器注意事项

    ​ 1.需要在算子内部创建累加器对象

    ​ 2.通常在Rich函数中的open方法中注册累加器,指定累加器的名称

    ​ 3.在当前算子内任意位置可以使用累加器

    ​ 4.必须当任务执行结束后,通过env.execute(xxx)执行后的JobExecutionResult对象获取累加器的值。

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<String> dataSource = env.fromElements("a", "b", "c", "d", "e", "f");
    MapOperator<String, String> map = dataSource.map(new RichMapFunction<String, String>(){
    
    //1.创建累加器,在算子中创建累加器对象
    	private IntCounter numLines = new IntCounter();
    
    //2.注册累加器对象,通常在Rich 函数的open 方法中使用
    // getRuntimeContext().addAccumulator("num-lines", this.numLines);注册累加器
        public void open(Configuration parameters) throws Exception {
            getRuntimeContext().addAccumulator("num-lines", this.numLines);
    	}
    
        @Override
        public String map(String s) throws Exception {
        //3.使用累加器,可以在任意操作中使用,包括在open 或者close 方法中
            this.numLines.add(1);
            return s;
        }
        }).setParallelism(8);
        map.writeAsText("./TempResult/result",FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult myJobExecutionResult = env.execute("IntCounterTest");
        
        //4.当作业执行完成之后,在JobExecutionResult 对象中获取累加器的值。
            int accumulatorResult = myJobExecutionResult.getAccumulatorResult("num-lines");
            System.out.println("accumulator value = "+accumulatorResult);
    

    八、指定虚拟Key

    Apache Flink 指定虚拟key

    ​ 1.使用Tuples来指定key

    ​ 2.使用Field Expression来指定key

    ​ 3.使用Key Selector Functions来指定key

    ​ 操作(reduce,groupReduce,Aggregate,Windows)允许数据在处理之前根据key 进行分组。
    在Flink 中数据模型不是基于Key,Value 格式处理的,因此不需将数据处理成键值对的格
    式,key 是“虚拟的”,可以人为的来指定,实际数据处理过程中根据指定的key 来对数
    据进行分组,DataSet 中使用groupBy 来指定key,DataStream 中使用keyBy 来指定key。
    如何指定keys?

    1. 使用Tuples 来指定key
      定义元组来指定key 可以指定tuple 中的第几个元素当做key,或者指定tuple 中的
      联合元素当做key。需要使用org.apache.flink.api.java.tuple.TupleXX 包下的tuple,最多
      支持25 个元素且Tuple 必须new 创建。如果Tuple 是嵌套的格式, 例如:
      DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds,如果指定keyBy(0)则会使
      用整个内部的Tuple2 作为key。如果想要使用内部Tuple2 中的Float 格式当做key,
      就要使用第二种方式Field Expression 来指定key。
    2. 使用Field Expression 来指定key
      可以使用Field Expression 来指定key,一般作用的对象可以是类对象,或者嵌套的
      Tuple 格式的数据。
      使用注意点:
      a) 对于类对象可以使用类中的字段来指定key。
      类对象定义需要注意:
      i. 类的访问级别必须是public
      ii. 必须写出默认的空的构造函数
      iii. 类中所有的字段必须是public 的或者必须有getter,setter 方法。例如类
      中有个字段是foo,那么这个字段的getter,setter 方法为:getFoo() 和
      setFoo().
      iv. Flink 必须支持字段的类型。一般类型都支持
      b) 对于嵌套的Tuple 类型的Tuple 数据可以使用”xx.f0”表示嵌套tuple 中第一个元
      素, 也可以直接使用”xx.0” 来表示第一个元素, 参照案例
      GroupByUseFieldExpressions。
    3. 使用Key Selector Functions 来指定key
      使用key Selector 这种方式选择key,非常方便,可以从数据类型中指定想要的key.

    九、Flink & Kafka

    IDEA代码—MyFlinkCode

    1 Flink消费kafka数据起始offset配置

    ​ \1.flinkKafkaConsumer.setStartFromEarliest()

    从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \2. flinkKafkaConsumer.setStartFromLatest()

    从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \3. flinkKafkaConsumer.setStartFromTimestamp(…)

    从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \4. flinkKafkaConsumer.setStartFromGroupOffsets()

    默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。

    2 Flink消费kafka数据,消费者offset提交配置

    ​ 1.Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前)的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况

    ​ 2.配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。

    3.关闭checkpoint:

    ​ 如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

    4.开启checkpoint:

    ​ 如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

    3 Flink中外部状态实现两阶段提交

    ​ Flink外部状态实现两阶段提交将逻辑封装到TwoPhaseComitSinkFunction类中,需要扩展TwoPhaseCommitSinkFunction来实现仅一次消费数据。若要实现支持exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    在这里插入图片描述

    展开全文
  • Flink 最锋利的武器:Flink SQL 入门和实战

    万次阅读 多人点赞 2019-06-21 00:00:00
    一、Flink SQL 背景Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。自 2015...

     

    《2021年最新版大数据面试题全面开启更新》

    《2021年最新版大数据面试题全面开启更新》

     

    一、Flink SQL 背景

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

    Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

    640?wx_fmt=png

    在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

    • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;

    • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;

    • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;

    • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;

    • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。

    二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

    2.1 Flink 1.7.0 新特性

    在 Flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 Flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 Scala 2.12 的支持、一次性 S3 文件接收器、复杂事件处理与流 SQL 的集成等。

    Apache Flink 中对 Scala 2.12 的支持(FLINK-7811)

    Apache Flink 1.7.0 是第一个完全支持 Scala 2.12 的版本。这允许用户使用较新的 Scala 版本编写 Flink 应用程序并利用 Scala 2.12 生态系统。

    状态演进(FLINK-9376)

    许多情况下,由于需求的变化,长期运行的 Flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 Flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 Avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 Avro 的规范来演变国家的架构。虽然 Avro 类型是 Flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 Flink 版本中进一步扩展对其他类型的支持。

    MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

    这是 Apache Flink 1.7.0 的一个重要补充,它为 Flink SQL 提供了 MATCH RECOGNIZE 标准的初始支持。此功能结合了复杂事件处理(CEP)和 SQL,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。

    流式 SQL 中的时态表和时间连接(FLINK-9712)

    时态表是 Apache Flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。

    Streaming SQL 的其他功能

    除了上面提到的主要功能外,Flink 的 Table&SQL API 已经扩展到更多用例。以下内置函数被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 现在支持在环境文件和 CLI 会话中定义视图。此外,CLI 中添加了基本的 SQL 语句自动完成功能。社区添加了一个 Elasticsearch 6 表接收器,允许存储动态表的更新结果。

    Kafka 2.0 连接器(FLINK-10598)

    Apache Flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 Kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 Kafka 2.0。

    本地恢复(FLINK-9635)

    Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,Flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,Flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。

    2.2 Flink 1.8.0 新特性

    Flink 1.8.0 引入对状态的清理

    使用 TTL(生存时间)连续增量清除旧的 Key 状态 Flink 1.8 引入了对 RocksDB 状态后端(FLINK-10471)和堆状态后端(FLINK-10473)的旧数据的连续清理。这意味着旧的数据将(根据 TTL 设置)不断被清理掉。

    新增和删除一些 Table API

    1) 引入新的 CSV 格式符(FLINK-9964)

    此版本为符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能与 Kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系统连接器。

    2) 静态生成器方法在 TableEnvironment(FLINK-11445)上的弃用

    为了将 API 与实际实现分开,TableEnvironment.getTableEnvironment() 不推荐使用静态方法。现在推荐使用 Batch/StreamTableEnvironment.create()。

    3) 表 API Maven 模块中的更改(FLINK-11064)

    之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 Java 还是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

    Kafka Connector 的修改

    引入可直接访问 ConsumerRecord 的新 KafkaDeserializationSchema(FLINK-8354),对于 FlinkKafkaConsumers 推出了一个新的 KafkaDeserializationSchema,可以直接访问 KafkaConsumerRecord。

    三、Flink SQL 的编程模型

    Flink 的编程模型基础构建模块是流(streams)与转换 (transformations),每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。

    640?wx_fmt=png

    相信大家对上面的图已经十分熟悉了,当然基于 Flink SQL 编写的 Flink 程序也离不开读取原始数据,计算逻辑和写入计算结果数据三部分。

    一个完整的 Flink SQL 编写的程序包括如下三部分:

    • Source Operator:Soruce operator 是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现例如 MySQL、Kafka 等;

    • Transformation Operators:算子操作主要完成例如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作;

    • Sink Operator:Sink operator 是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等

    我们通过用一个最经典的 WordCount 程序作为入门,看一下传统的基于 DataSet/DataStream API 开发和基于 SQL 开发有哪些不同?

    • DataStream/DataSetAPI

    
     
    • Flink SQL

    //省略掉初始化环境等公共代码
    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    我们已经可以直观体会到,SQL 开发的快捷和便利性了。

    四、Flink SQL 的语法和算子

    4.1 Flink SQL 支持的语法

    Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

    那么 Flink 自身支持的 SQL 语法有哪些呢?

    insert:
    INSERT INTO tableReference
    query
    
    query:
    values
      | {
    select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
    
    orderItem:
      expression [ ASC | DESC ]
    
    select:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
    
    selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    
    projectItem:
      expression [ [ AS ] columnAlias ]
      | tableAlias . *
    
    tableExpression:
      tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
    
    joinCondition:
    ON booleanExpression
      | USING '(' column [, column ]* ')'
    
    tableReference:
      tablePrimary
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
    
    tablePrimary:
      [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'
    
    values:
    VALUES expression [, expression ]*
    
    groupItem:
      expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'
    
    windowRef:
        windowName
      | windowSpec
    
    windowSpec:
        [ windowName ]
    '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
    RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
    ')'
    

    上面 SQL 的语法支持也已经表明了 Flink SQL 对算子的支持,接下来我们对 Flink SQL 中最常见的算子语义进行介绍。

    4.2 Flink SQL 常用算子

    SELECT

    SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

    示例:

    SELECT * FROM Table;// 取出表中的所有列
    SELECT name,age FROM Table;// 取出表中 name 和 age 两列
    

    与此同时 SELECT 语句中可以使用函数和别名,例如我们上面提到的 WordCount 中:

    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    WHERE

    WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。

    示例:

    SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
    SELECT * FROM Table WHERE age = 20

    WHERE 是从原数据中进行过滤,那么在 WHERE 条件中,Flink SQL 同样支持 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合 IN、NOT IN 联合使用。举个负责的例子:

    SELECT name, age
    FROM Table
    WHERE name IN (SELECT name FROM Table2)
    

    DISTINCT

    DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。

    示例:

    SELECT DISTINCT name FROM Table;
    

    对于流式查询,计算查询结果所需的 State 可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。

    GROUP BY

    GROUP BY 是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。

    SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
    

    UNION 和 UNION ALL

    UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。

    示例:

    SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
    

    JOIN

    JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:

    • JOIN - INNER JOIN

    • LEFT JOIN - LEFT OUTER JOIN

    • RIGHT JOIN - RIGHT OUTER JOIN

    • FULL JOIN - FULL OUTER JOIN

    这里的 JOIN 的语义和我们在关系型数据库中使用的 JOIN 语义一致。

    示例:

    JOIN(将订单表数据和商品表进行关联)
    SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL 操作。

    示例:

    SELECT *
    FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    Group Window

    根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Window:

    • Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

    • Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

    • Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

    Tumble Window

    Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下:

    640?wx_fmt=png

    Tumble 滚动窗口对应的语法如下:

    SELECT 
        [gk],
        [TUMBLE_START(timeCol, size)], 
        [TUMBLE_END(timeCol, size)], 
        agg1(col1), 
        ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], TUMBLE(timeCol, size)
    

    其中:

    • [gk] 决定了是否需要按照字段进行聚合;

    • TUMBLE_START 代表窗口开始时间;

    • TUMBLE_END 代表窗口结束时间;

    • timeCol 是流表中表示时间字段;

    • size 表示窗口的大小,如 秒、分钟、小时、天。

    举个例子,假如我们要计算每个人每天的订单量,按照 user 进行聚合分组:

    SELECT user, TUMBLE_START(rowtime, INTERVAL1DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL1DAY), user;
    

    Hop Window

    Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠,具体语义如下:

    640?wx_fmt=png

    Hop 滑动窗口对应语法如下:

    SELECT 
        [gk], 
        [HOP_START(timeCol, slide, size)] ,  
        [HOP_END(timeCol, slide, size)],
        agg1(col1), 
        ... 
        aggN(colN) 
    FROM Tab1
    GROUP BY [gk], HOP(timeCol, slide, size)
    

    每次字段的意思和 Tumble 窗口类似:

    • [gk] 决定了是否需要按照字段进行聚合;

    • HOP_START 表示窗口开始时间;

    • HOP_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • slide 表示每次窗口滑动的大小;

    • size 表示整个窗口的大小,如 秒、分钟、小时、天。

    举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:

    SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
    

    Session Window

    会话时间窗口没有固定的持续时间,但它们的界限由 interval 不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。

    640?wx_fmt=png

    Seeeion 会话窗口对应语法如下:

    SELECT 
        [gk], 
        SESSION_START(timeCol, gap) AS winStart,  
        SESSION_END(timeCol, gap) AS winEnd,
        agg1(col1),
         ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], SESSION(timeCol, gap)
    
    • [gk] 决定了是否需要按照字段进行聚合;

    • SESSION_START 表示窗口开始时间;

    • SESSION_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • gap 表示窗口数据非活跃周期的时长。

    例如,我们需要计算每个用户访问时间 12 小时内的订单量:

    SELECT user, SESSION_START(rowtime, INTERVAL12HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL12HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL12HOUR), user
    

    五、Flink SQL 的内置函数

    Flink 提供大量的内置函数供我们直接使用,我们常用的内置函数分类如下:

    • 比较函数

    • 逻辑函数

    • 算术函数

    • 字符串处理函数

    • 时间函数

       

    我们接下来对每种函数举例进行讲解。

    5.1 比较函数

    比较函数 描述
    value1=value2 如果 value1 等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1<>value2 如果 value1 不等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1>value2 如果 value1 大于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1 < value2 如果 value1 小于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value IS NULL 如果 value 为 NULL,则返回 TRUE
    value IS NOT NULL 如果 value 不为 NULL,则返回 TRUE
    string1 LIKE string2 如果 string1 匹配模式 string2,则返回 TRUE ; 如果 string1 或 string2 为 NULL,则返回 UNKNOWN
    value1 IN (value2, value3…) 如果给定列表中存在 value1 (value2,value3,…),则返回 TRUE 。当(value2,value3,…)包含 NULL,如果可以找到该数据元则返回 TRUE,否则返回 UNKNOWN。如果 value1 为 NULL,则始终返回 UNKNOWN

    5.2 逻辑函数

    逻辑函数 描述
    A OR B 如果 A 为 TRUE 或 B 为 TRUE,则返回 TRUE
    A AND B 如果 A 和 B 都为 TRUE,则返回 TRUE
    NOT boolean 如果 boolean 为 FALSE,则返回 TRUE,否则返回 TRUE。如果 boolean 为 TRUE,则返回 FALSE
    A IS TRUE 或 FALSE 判断 A 是否为真

    5.3 算术函数

    算术函数 描述
    numeric1 ±*/ numeric2 分别代表两个数值加减乘除
    ABS(numeric) 返回 numeric 的绝对值
    POWER(numeric1, numeric2) 返回 numeric1 上升到 numeric2 的幂

    除了上述表中的函数,Flink SQL 还支持种类丰富的函数计算。

    5.4 字符串处理函数

    字符串函数 描述
    UPPER/LOWER 以大写 / 小写形式返回字符串
    LTRIM(string) 返回一个字符串,从去除左空格的字符串, 类似还有 RTRIM
    CONCAT(string1, string2,…) 返回连接 string1,string2,…的字符串

    5.5 时间函数

    时间函数 描述
    DATE string 返回以“yyyy-MM-dd”形式从字符串解析的 SQL 日期
    TIMESTAMP string 返回以字符串形式解析的 SQL 时间戳,格式为“yyyy-MM-dd HH:mm:ss [.SSS]”
    CURRENT_DATE 返回 UTC 时区中的当前 SQL 日期
    DATE_FORMAT(timestamp, string) 返回使用指定格式字符串格式化时间戳的字符串

    六、Flink SQL 实战应用

    上面我们分别介绍了 Flink SQL 的背景、新特性、编程模型和常用算子,这部分我们将模拟一个真实的案例为大家使用 Flink SQL 提供一个完整的 Demo。

    相信这里应该有很多 NBA 的球迷,假设我们有一份数据记录了每个赛季的得分王的数据,包括赛季、球员、出场、首发、时间、助攻、抢断、盖帽、得分等。现在我们要统计获得得分王荣誉最多的三名球员。

    原数据存在 score.csv 文件中,如下:

    17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.4
    16-17,拉塞尔-威斯布鲁克,81,81,34.6,10.4,1.6,0.4,31.6
    15-16,斯蒂芬-库里,79,79,34.2,6.7,2.1,0.2,30.1
    14-15,拉塞尔-威斯布鲁克,67,67,34.4,8.6,2.1,0.2,28.1
    13-14,凯文-杜兰特,81,81,38.5,5.5,1.3,0.7,32
    12-13,卡梅罗-安东尼,67,67,37,2.6,0.8,0.5,28.7
    11-12,凯文-杜兰特,66,66,38.6,3.5,1.3,1.2,28
    10-11,凯文-杜兰特,78,78,38.9,2.7,1.1,1,27.7
    09-10,凯文-杜兰特,82,82,39.5,2.8,1.4,1,30.1
    08-09,德维恩-韦德,79,79,38.6,7.5,2.2,1.3,30.2
    07-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,30
    06-07,科比-布莱恩特,77,77,40.8,5.4,1.4,0.5,31.6
    05-06,科比-布莱恩特,80,80,41,4.5,1.8,0.4,35.4
    04-05,阿伦-艾弗森,75,75,42.3,7.9,2.4,0.1,30.7
    03-04,特雷西·麦克格雷迪,67,67,39.9,5.5,1.4,0.6,28
    02-03,特雷西·麦克格雷迪,75,74,39.4,5.5,1.7,0.8,32.1
    01-02,阿伦-艾弗森,60,59,43.7,5.5,2.8,0.2,31.4
    00-01,阿伦-艾弗森,71,71,42,4.6,2.5,0.3,31.1
    99-00,沙奎尔-奥尼尔,79,79,40,3.8,0.5,3,29.7
    98-99,阿伦-艾弗森,48,48,41.5,4.6,2.3,0.1,26.8
    97-98,迈克尔-乔丹,82,82,38.8,3.5,1.7,0.5,28.7
    96-97,迈克尔-乔丹,82,82,37.9,4.3,1.7,0.5,29.6
    95-96,迈克尔-乔丹,82,82,37.7,4.3,2.2,0.5,30.4
    94-95,沙奎尔-奥尼尔,79,79,37,2.7,0.9,2.4,29.3
    93-94,大卫-罗宾逊,80,80,40.5,4.8,1.7,3.3,29.8
    92-93,迈克尔-乔丹,78,78,39.3,5.5,2.8,0.8,32.6
    91-92,迈克尔-乔丹,80,80,38.8,6.1,2.3,0.9,30.1
    90-91,迈克尔-乔丹,82,82,37,5.5,2.7,1,31.5
    89-90,迈克尔-乔丹,82,82,39,6.3,2.8,0.7,33.6
    88-89,迈克尔-乔丹,81,81,40.2,8,2.9,0.8,32.5
    87-88,迈克尔-乔丹,82,82,40.4,5.9,3.2,1.6,35
    86-87,迈克尔-乔丹,82,82,40,4.6,2.9,1.5,37.1
    85-86,多米尼克-威尔金斯,78,78,39.1,2.6,1.8,0.6,30.3
    84-85,伯纳德-金,55,55,37.5,3.7,1.3,0.3,32.9
    83-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.6
    82-83,阿历克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.4
    81-82,乔治-格文,79,79,35.7,2.4,1,0.6,32.3
    

    首先我们需要创建一个工程,并且在 Maven 中有如下依赖:

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
    </dependency>
    

    第一步,创建上下文环境:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    

    第二步,读取 score.csv 并且作为 source 输入:

     DataSet<String> input = env.readTextFile("score.csv");
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    其中的PlayerData类为自定义类:
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    

    第三步,将 source 数据注册成表:

    Table topScore = tableEnv.fromDataSet(topInput);
    tableEnv.registerTable("score", topScore);
    

    第四步,核心处理逻辑 SQL 的编写:

    Table queryResult = tableEnv.sqlQuery("
    select player, 
    count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
    ");
    

    第五步,输出结果:

    DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
    result.print();
    

    我们直接运行整个程序,观察输出结果:

    ...
    16:28:06,162 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shut down complete.
    16:28:06,162 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
    16:28:06,164 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopped TaskExecutor akka://flink/user/taskmanager_2.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    16:28:06,169 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    16:28:06,177 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
    16:28:06,187 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
    16:28:06,187 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
    16:28:06,188 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:51703
    16:28:06,188 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
    迈克尔-乔丹:10
    凯文-杜兰特:4
    阿伦-艾弗森:4
    

    我们看到控制台已经输出结果了:

    640?wx_fmt=jpeg

    完整的代码如下:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
    public static void main(String[] args) throws Exception{
    
    //1\. 获取上下文环境 table的环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    
    //2\. 读取score.csv
            DataSet<String> input = env.readTextFile("score.csv");
            input.print();
    
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    
    //3\. 注册成内存表
            Table topScore = tableEnv.fromDataSet(topInput);
            tableEnv.registerTable("score", topScore);
    
    //4\. 编写sql 然后提交执行
    //select player, count(season) as num from score group by player order by num desc;
            Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");
    
    //5\. 结果进行打印
            DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
            result.print();
    
        }
    
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    
    public static class Result {
    public String player;
    public Long num;
    
    public Result() {
    super();
            }
    public Result(String player, Long num) {
    this.player = player;
    this.num = num;
            }
    @Override
    public String toString() {
    return player + ":" + num;
            }
        }
    }//
    

    当然我们也可以自定义一个 Sink,将结果输出到一个文件中,例如:

            TableSink sink = new CsvTableSink("/home/result.csv", ",");
    String[] fieldNames = {"name", "num"};
            TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
            tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
            sqlQuery.insertInto("result");
            env.execute();
    

    然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:

    迈克尔-乔丹,10
    凯文-杜兰特,4
    阿伦-艾弗森,4
    

    七、总结

    本篇向大家介绍了 Flink SQL 产生的背景,Flink SQL 大部分核心功能,并且分别介绍了 Flink SQL 的编程模型和常用算子及内置函数。最后以一个完整的示例展示了如何编写 Flink SQL 程序。Flink SQL 的简便易用极大地降低了 Flink 编程的门槛,是我们必需掌握的使用 Flink 解决流式计算问题最锋利的武器!

    640?wx_fmt=gif

    640?wx_fmt=jpeg

    展开全文
  • flink集群安装部署 yarn集群模式 Flink入门及实战-上: http://edu.51cto.com/sd/07245 Flink入门及实战-下: http://edu.51cto.com/sd/5845e 快速开始 在yarn上启动一个一直运行的flink集群 在yarn上...

    flink集群安装部署

    yarn集群模式

     

    Flink入门及实战-上:

    http://edu.51cto.com/sd/07245

    Flink入门及实战-下:

    http://edu.51cto.com/sd/5845e

     

    • 快速开始
    1. 在yarn上启动一个一直运行的flink集群
    2. 在yarn上运行一个flink job
    • flink yarn session
    1. 启动flink session
    2. 提交任务到flink
    • 在yarn上运行一个独立的flink job
    1. 用户依赖jar包和classpath
    • flink on yarn的故障恢复
    • 调试一个失败的yarn session
    1. 日志文件
    2. yarn client控制台和web界面
    • 针对指定的hadoop版本构建yarn client
    • 在yarn上运行flink使用防火墙
    • flink on yarn 内部实现

    快速开始

    在yarn上启动一个一直运行的flink集群

    启动一个yarn session使用4个taskmanager(每个节点4GB内存)
    注意:如果自己的虚拟机没有这么大内存的话,可以吧-n设置小一点,对应的后面的内存-jm -tm也设置小一点,否则,如果内存不够用,会导致启动失败。

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

    通过-s参数指定每一个taskmanager分配多少个slots(处理进程)。我们建议设置为每个机器的CPU核数。

    一旦session创建成功,你可以使用./bin/flink工具向集群提交任务。

    在yarn上运行一个flink job

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

     

    flink yarn session

    yarn是一个集群资源管理框架。它运行在集群之上运行各种分布式应用程序。flink像其他程序一样,也可以在yarn上运行。用户不需要设置或者安装任何东西,如果已经有一个安装配置好的yarn。

    必须的依赖

     

    • 至少是hadoop2.2
    • hdfs(或者是其它支持hadoop的分布式文件系统)

    如果你在使用flink yarn client的时候有什么问题,可以到这里查找答案

    启动flink session

    按照下面的步骤来学习如何在yarn集群上启动一个flink session

    一个session将会包含所有必须的flink 服务(jobmanager和taskmanager),这样你就可以向这个集群提交程序了。注意:每个session会话你可以运行多个程序。

    下载flink

    下载hadoop2对应的flink安装包,点此下载。它包含了必须的文件。

    使用下面命令解压:

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/

    启动一个session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    这个命令将会输出下面内容:

    用法:
       必选
         -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)
       可选
         -D <arg>                        动态属性
         -d,--detached                   独立运行
         -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]
         -nm,--name                     在YARN上为一个自定义的应用设置一个名字
         -q,--query                      显示yarn中可用的资源 (内存, cpu核数)
         -qu,--queue <arg>               指定YARN队列.
         -s,--slots <arg>                每个TaskManager使用的slots数量
         -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]
         -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace

    请注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

    经试验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。

    例子:下面的命令会申请10个taskmanager,每个8G内存和32个solt

    ./bin/yarn-session.sh -n 10 -tm 8192 -s 32

    该系统默认会使用这个配置文件:conf/flink-conf.yaml。如果你想修改一些参数,请查看我们的配置指南

     

    flink on yarn模式将会覆盖一些配置文件 jobmanager.rpc.address(因为jobmanager总是分配在不同的机器),taskmanager.tmp.dirs(我们使用yarn提供的临时目录)和parallelism.default 如果solts的数量已经被指定。

    如果你不想修改配置文件去改变参数,有一个选择是通过动态的参数-D 来指定。所以你可以传递参数:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624

    上面的例子将会启动11个容器(即使仅请求10个容器),因为有一个额外的容器来启动ApplicationMaster 和 job manager

    一旦flink在你的yarn集群上部署,它将会显示job manager的连接详细信息。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop。

    Flink on yarn只会启动请求的资源,如果集群资源充足。大多数yarn调度器请求容器的内存,一些也会请求cpu。默认,cpu的核数等于slots的数量,通过-s参数指定。这个参数yarn.containers.vcores的值允许使用一个自定义值来进行覆盖。

     

    后台 yarn session

    如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached

    在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session。

    使用yarn 工具 来停止yarn session

    yarn application -kill <applicationId>

    附着到一个已存在的session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    执行这个命令将会显示下面内容:

    用法:
       必须
         -id,--applicationId <yarnAppId>        YARN集群上的任务id

    正如前面提到的,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量必须是可以读取到YARN和HDFS配置的。

    例如:发出下面命令可以附着到一个运行中的flink yarn session

    ./bin/yarn-session.sh -id application_1463870264508_0029

    附着到一个运行的session使用yarn resourcemanager来确定job Manager 的RPC端口。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop

     

    提交任务到flink

    使用下面的命令提交一个flink程序到yarn集群

    ./bin/flink

    请参考客户端命令行操作文档

     

    这个命令将会向你展示一个这样一个帮助菜单

    "run" 参数可以编译和运行一个程序

    用法: run [OPTIONS] <jar-file> <arguments>
      "run" 操作参数:
         -c,--class <classname>           如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
    
         -m,--jobmanager <host:port>      指定需要连接的jobmanager(主节点)地址
                                          使用这个参数可以指定一个不同于配置文件中的jobmanager
    
         -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。

    使用run 命令向yarn集群提交一个job。客户端可以确定jobmanager的地址。当然,你也可以通过-m参数指定jobmanager。jobmanager的地址在yarn控制台上可以看到。

    例子:【注意:下面的命令官网文档提供的有问题,执行失败】

    wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
    hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
    【注意:下面的命令官网文档提供的有问题,执行失败】
    ./bin/flink run ./examples/batch/WordCount.jar \
            hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
    查看flink源码发现,wordCount.jar可以不提供参数,或者提供参数,提供参数的时候需要使用input和output参数指定:
    上面的命令需要修改为如下格式才能正常执行[传递的两个参数需要使用-input和 -output来指定]
    ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/LICENSE-2.0.txt -output hdfs://hostname:port/wordcount-result.txt
    
    
    

    如果有以下错误,确保所有taskmanager是否已经启动:

    Exception in thread "main" org.apache.flink.compiler.CompilerException:
        Available instances could not be determined from job manager: Connection timed out.

    你可以在jobmanager的web界面上检查taskmanager的数量。这个web界面的地址会打印在yarn session的控制台上。

    如果没有发现taskmanager,你应该通过日志文件来检查问题。

     

    在yarn上运行一个独立的flink job

    这个文档描述了如何在一个hadoop yarn环境中启动flink集群。也可以在yarn中启动只执行单个任务的flink。

    请注意:client期望设置-yn 参数(taskmanager的数量)

    例子:

    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

    yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀

    注意:通过为每个任务设置不同的环境变量 FLINK_CONF_DIR,可以为每个任务使用不同的配置目录。从 Flink 分发包中复制 conf 目录,然后修改配置,例如,每个任务不同的日志设置

     

    用户依赖jar包和classpath

    默认情况下,当运行一个独立的job的时候,这个flink job将包含用户依赖的jar包。可以通过参数yarn.per-job-cluster.include-user-jar来控制。

    当设置为 DISABLED ,flink将会包含用户classpath下面的jar包。

    用户jar包在类路径中的位置可以通过下面参数来控制:

    * ORDER: (默认) 添加jar包到系统类路径下面,按照字典顺序.
    * FIRST: 将jar添加到类路径的前面.
    * LAST: 将jar添加到类路径的最后.

     

    flink on yarn的故障恢复

    flink 的 yarn 客户端通过下面的配置参数来控制容器的故障恢复。这些参数可以通过conf/flink-conf.yaml 或者在启动yarn session的时候通过-D参数来指定。

     

    • yarn.reallocate-failed:这个参数控制了flink是否应该重新分配失败的taskmanager容器。默认是true。
    • yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn session失败。默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。
    • yarn.application-attempts:applicationMaster重试的次数。如果这个值被设置为1(默认就是1),当application master失败的时候,yarn session也会失败。设置一个比较大的值的话,yarn会尝试重启applicationMaster。

     

    调试一个失败的yarn session

     

    一个flink yarn session部署失败可能会有很多原因。一个错误的hadoop配置(hdfs 权限,yarn配置),版本不兼容(使用cdh中的hadoop运行flink),或者其他的错误。

    日志文件

    在某种情况下,flink yarn session 部署失败是由于它自身的原因,用户必须依赖于yarn的日志来进行分析。最有用的就是yarn log aggregation 。启动它,用户必须在yarn-site.xml文件中设置yarn.log-aggregation-enable 属性为true。一旦启用了,用户可以通过下面的命令来查看一个失败的yarn session的所有详细日志。

    yarn logs -applicationId <application ID>

    yarn client控制台和web界面

    flink yarn client也会打印一些错误信息在控制台上,如果错误发生在运行时(例如如果一个taskmanager停止工作了一段时间)

    除此之外,yarn resource manager的web界面(默认端口是8088)。resource manager的端口是通过yarn.resourcemanager.webapp.address参数来配置的。

    它运行在yarn 程序运行的时候查看日志和程序失败的时候查看日志用户查找问题。

     

    针对指定的hadoop版本构建yarn client

    用户可以使用hadoop发行版。例如,hortonworks,CDH或者MapR等版本去构建 flink。请参考构建指南获取详细信息

     

    在yarn上运行flink使用防火墙

    一些yarn 集群使用防火墙来控制集群的网络和其他网络的通信。在这种设置下,flink只能通过集群的网络来提交任务到yarn session。针对生产环境下使用是不可行的,flink允许配置所有相关服务的端口范围,通过这些端口范围的配置,用户也可以透过防火墙来提交flink job。

    目前,两个服务都需要提交任务:

    • jobmanager(yarn中的applicationMaster)
    • jobmanager内部运行的blobserver

    当向flink提交一个任务的时候,blobserver将会把用户的代码分发到所有工作节点(taskManagers)。jobmanager接收任务本身,并触发执行。

    以下两个配置参数可以指定端口:

     

    • yarn.application-master.port
    • blob.server.port

    这两个配置选项接收单一的端口(例如:"50010"),区间("50000-50025"),或者同时指定多个("50010,50011,50020-50025,50050-50075")。

    (hadoop也使用的是类似的机制,例如:yarn.app.mapreduce.am.job.client.port-range)

     

    flink on yarn 内部实现

    本节主要描述flink和yarn是如何交互的

    YARN 客户端需要访问 Hadoop 配置,从而连接 YARN 资源管理器和 HDFS。可以使用下面的策略来决定 Hadoop 配置:

     

    • 测试 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 环境变量是否设置了(按该顺序测试)。如果它们中有一个被设置了,那么它们就会用来读取配置。
    • 如果上面的策略失败了(如果正确安装了 YARN 的话,这不应该会发生),客户端会使用 HADOOP_HOME 环境变量。如果该变量设置了,客户端会尝试访问 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。

    当启动一个新的 Flink YARN Client会话,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传包含了 Flink 配置和 jar文件到 HDFS(步骤 1)。

    客户端的下一步是请求(步骤 2)一个 YARN 容器启动 ApplicationMaster (步骤 3)。因为客户端将配置和jar 文件作为容器的资源注册了,所以运行在特定机器上的 YARN 的 NodeManager 会负责准备容器(例如,下载文件)。一旦这些完成了,ApplicationMaster (AM) 就启动了。

    JobManager 和 AM 运行在同一个容器中。一旦它们成功地启动了,AM 知道 JobManager 的地址(它自己)。它会为 TaskManager 生成一个新的 Flink 配置文件(这样它们才能连上 JobManager)。该文件也同样会上传到 HDFS。另外,AM 容器同时提供了 Flink 的 Web 界面服务。Flink 用来提供服务的端口是由用户 + 应用程序 id 作为偏移配置的。这使得用户能够并行执行多个 Flink YARN 会话。

    之后,AM 开始为 Flink 的 TaskManager 分配容器,这会从 HDFS 下载 jar 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。

     

     

     

    获取更多大数据资料,视频以及技术交流请加群:

     

     

     

    展开全文
  • flink实战--flinkSQL入门大全

    万次阅读 2018-11-12 18:07:03
    FlinkSQL概念介绍 Table API & SQL Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API...

     

    扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

     

    FlinkSQL概念介绍

    Table API & SQL

                    Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上运行Gelly图算法之前使用SQL查询扫描,过滤和聚合批处理表数据。

    TableEnvironment

    TableEnvironment是Table API和SQL集成的核心概念。它负责:

    1. 在内部目录中注册一个表
    2. 注册外部目录
    3. 执行SQL查询
    4. 注册用户定义的(标量,表或聚合)函数
    5. 把一个DataStreamDataSet转换为一个表Table
    6. 持有对ExecutionEnvironmentStreamExecutionEnvironment的引用

    创建一个TableEnvironment

               一个TableEnvironment是通过调用静态创建TableEnvironment.getTableEnvironment()用的方法StreamExecutionEnvironmentExecutionEnvironment与可选的TableConfig。该TableConfig可用于配置TableEnvironment或定制查询优化和翻译过程

    //获取table
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    TableEnvironment中注册表

    TableEnvironment维护一个表的目录,这些表是按名称注册的。有两种类型的表、输入表和输出表。输入表可以在表API和SQL查询中引用,并提供输入数据。输出表可以用来将表API或SQL查询的结果发送到外部系统。可以从各种来源注册输入表:

    • 现有Table对象,通常是Table API或SQL查询的结果。
    • TableSource,访问外部数据,例如文件,数据库或消息传递系统
    •  DataStreamDataSet来自DataStream或DataSet程序。注册一个DataStreamDataSet

    一个输出表可以被注册使用TableSink

    val tableEnv = TableEnvironment.getTableEnvironment(env)
    
    val projTable: Table = tableEnv.scan("X").select(...)
    //注册表
    tableEnv.registerTable("projectedTable", projTable)

    注册一个TableSink

    一个已注册的表可以用来将表API或SQL查询的结果发送到外部存储系统,比如数据库、键值存储、消息队列或文件系统(在不同的编码中,例如CSV、Apache Parquet、Avro……)。

    说白了就是:table sink的作用就是如何将flink sql查询的数据保存到外部系统,如hdfs或者本地文件,数据库,hbase等。

    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // create a TableSink
    val csvSink: TableSink = new CsvTableSink("/path/to/file", ...)
    // define the field names and types
    val fieldNames: Array[String] = Array("a", "b", "c")
    val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG)
    // register the TableSink as table "CsvSinkTable"
    tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink)

    sql环境需要的依赖

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-java-bridge_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- or... -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    此外,如果要在IDE中本地运行Table API和SQL程序,则必须添加以下一组模块之一,具体取决于要使用的计划程序:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>
    <!-- or.. (for the new Blink planner) -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-blink_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    在内部,表生态系统的一部分在Scala中实现。因此,请确保为批处理和流应用程序都添加以下依赖项:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-scala_2.11</artifactId>
      <version>1.9.0</version>
      <scope>provided</scope>
    </dependency>

    Flink1.9之后如何使用 Blink Planner

          在IDE环境里,只需要引入两个 Blink Planner 的相关依赖,就可以启用 Blink Planner。

    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.11</artifactId>
    <version>1.9.0</version>
    </dependency>
    

    对于流计算作业和批处理作业的配置非常类似,只需要在 EnvironmentSettings 中设置 StreamingMode 或 BatchMode 即可,流计算作业的设置如下:

    // **********************
    // BLINK STREAMING QUERY
    // **********************
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    // or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
    
    bsTableEnv.sqlUpdate(…);
    bsTableEnv.execute();
    

    批处理作业的设置如下 :

    // ******************
    // BLINK BATCH QUERY
    // ******************
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    
    EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
    TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
    bbTableEnv.sqlUpdate(…)
    bbTableEnv.execute()
    

    如果作业需要运行在集群环境,打包时将 Blink Planner 相关依赖的 scope 设置为 provided,表示这些依赖由集群环境提供。这是因为 Flink 在编译打包时, 已经将 Blink Planner 相关的依赖打包,不需要再次引入,避免冲突。

    社区长远计划

                            目前,TableAPI & SQL 已经成为 Flink API 的一等公民,社区也将投入更大的精力在这个模块。在不远的将来,待 Blink Planner 稳定之后,将会作为默认的 Planner ,而 Old Planner 也将会在合适的时候退出历史的舞台。目前社区也在努力赋予 DataStream 批处理的能力,从而统一流批技术栈,届时 DataSet API 也将退出历史的舞台

    SQL 语句

           FlinkSQL,它实现了SQL标准。SQL查询被指定为常规字符串。SQL文档描述了Flink对流式和批处理表的SQL支持。

    主要包括:sqlQuery和sqlUpdate 但是flink1.11做了Api的改变,使用executeSql代替sqlUpdate

    1. sqlQuery:主要用于sql查询
    2. sqlUpdate/executeSql:用于删除,更新,DDL等操作

    SQL API 改进

            随着 Flink SQL 支持的语句越来越丰富,老的 API 容易引起一些困惑:

    1. 原先的 sqlUpdate() 方法传递 DDL 语句会立即执行,而 INSERT INTO 语句在调用 execute 方法时才会执行

    2. Table 程序的执行入口不够清晰,像 TableEnvironment.execute() 和 StreamExecutionEnvironment.execute() 都可以触发 table 程序执行

    3. execute 方法没有返回值。像 SHOW TABLES 这样的语句没有很好地方式返回结果。另外,sqlUpdate 方法加入了越来越多的语句导致接口定义不清晰,sqlUpdate 可以执行 SHOW TABLES 就是一个反例

    4. 在 Blink planner 一直提供多 sink 优化执行的能力,但是在 API 层没有体现出来

    1.11 重新梳理了 TableEnv 上的 sql 相关接口,提供了更清晰的执行语义,同时执行任意 sql 语句现在都有返回值,用户可以通过新的 API 灵活的组织多行 sql 语句一起执行。

    更清晰的执行语义

               新的接口 TableEnvironment#executeSql 统一返回抽象 TableResult,用户可以迭代 TableResult 拿到执行结果。根据执行语句的不同,返回结果的数据结构也有变化,比如 SELECT 语句会返回查询结果,而 INSERT 语句会异步提交作业到集群。

    组织多条语句一起执行

    新的接口 TableEnvironment#createStatementSet 允许用户添加多条 INSERT 语句并一起执行,在多 sink 场景,Blink planner 会针对性地对执行计划做优化。

    新旧 API 对比

    sqlUpdate vs executeSql

    Current Interface

    New Interface

    tEnv.sqlUpdate("CREATE TABLE ...");

    TableResult result = tEnv.executeSql("CREATE TABLE ...");

    tEnv.sqlUpdate("INSERT INTO ... SELECT ...");

    tEnv.execute("test");

    TableResult result = tEnv.executeSql("INSERT INTO ... SELECT ...");

    execute vs createStatementSet

    Current Interface

    New Interface

    tEnv.sqlUpdate("insert into xx ...")

    tEnv.sqlUpdate("insert into yy ...")

    tEnv.execute("test")

    StatementSet ss = tEnv.createStatementSet();

    ss.addInsertSql("insert into xx ...");

    ss.addInsertSql("insert into yy ...");

    TableResult result = ss.execute();

     

    tEnv.insertInto("sink1", table1)

    tEnv.insertInto("sink2", table2)

    tEnv.execute("test")

    StatementSet ss = tEnv.createStatementSet();

    ss.addInsert("sink1", table1);

    ss.addInsert("sink2", table2);

    TableResult result = ss.execute()

    案例一:如何指定一个查询并将结果作为一张表返回

     val settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build()
    
     val tEnv = StreamTableEnvironment.create(env, settings)
    val revenue = tableEnv.sqlQuery("""
      |SELECT cID, cName, SUM(revenue) AS revSum
      |FROM Orders
      |WHERE cCountry = 'FRANCE'
      |GROUP BY cID, cName
      """.stripMargin)
    

     

    FlinkSQL执行计划

    表API和SQL查询将转换为DataStreamDataSet程序,具体取决于它们的输入是流式还是批量输入。查询在内部表示为逻辑查询计划,并分为两个阶段:

    1. 优化逻辑计划,
    2. 转换为DataStream或DataSet程序。

    table与DataStream和DataSet API集成

                    表API和SQL查询可以轻松集成并嵌入到DataStreamDataSet程序中。例如,可以查询外部表(例如来自RDBMS),进行一些预处理,例如过滤,预测,聚合或加入元数据,然后使用DataStream或进一步处理数据。相反,Table API或SQL查询也可以应用于DataStream或DataSet程序的结果。这种相互作用可以通过将一个DataStream或DataSet转换为一个Table来实现,反之亦然。

    Scala的隐式转换

                      scala表API功能的隐式转换DataSetDataStream以及Table类。org.apache.flink.table.api.scala._除了org.apache.flink.api.scala._    Scala DataStream API 之外,还可以通过导入包来启用这些转换

    注:flink编程必须导入import org.apache.flink.api.scala._,flinkSQL编程必须导入import org.apache.flink.table.api._

    将DataStream或DataSet转换为表

    我们可以通过TableEnvironment将获得数据源的DataStream或DataSet转化成Table,在使用flinkSQL的时候这样将会十分便捷。

     EnvironmentSettings setting = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .useBlinkPlanner()
                    .build();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,setting);
    DataStream<Tuple2<Long, String>> stream = ...
    Table table1 = tableEnv.fromDataStream(stream);
    Table table2 = tableEnv.fromDataStream(stream, "myLong, myString");
    //注册流表
    tableEnv.registerDataStream("userTable", table1);
    

    将Table转换为DataStream或DataSet

    一个Table可以转换为DataStreamDataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义DataStream或DataSet程序。

    当转换一个TableDataStreamDataSet,需要指定将所得的数据类型DataStreamDataSet,即,数据类型到其中的行Table是要被转换。通常最方便的转换类型是Row。以下列表概述了不同选项的功能:

    1. ROW:字段按位置,任意数量的字段映射,支持null值,无类型安全访问。
    2. POJO:字段按名称映射(POJO字段必须命名为Table字段),任意数量的字段,支持null值,类型安全访问。
    3. 样例Case Class:字段按位置映射,不支持null值,类型安全访问。
    4. 元组:字段按位置映射,限制为22(Scala)或25(Java)字段,不支持null值,类型安全访问。
    5. 原子类型Table必须具有单个字段,不支持null值,类型安全访问。

    将表转换为DataStream

    一个Table是流媒体查询的结果将动态更新,即它正在改变,因为新记录的查询的输入流到达。因此,DataStream转换这种动态查询需要对表的更新进行编码。

    一个Table转换为一个DataStream有两种模式:

    1. 追加模式:只有在动态Table仅通过INSERT更改修改时才能使用此模式,即它仅附加并且以前发出的结果永远不会更新。
    2. 缩进模式:始终可以使用此模式。它用标志编码INSERTDELETE改变boolean
    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Table with two fields (String name, Integer age)
    val table: Table = ...
    // convert the Table into an append DataStream of Row
    val dsRow: DataStream[Row] = tableEnv.toAppendStream[Row](table)
    // convert the Table into an append DataStream of Tuple2[String, Int]
    val dsTuple: DataStream[(String, Int)] dsTuple = 
      tableEnv.toAppendStream[(String, Int)](table)
    val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStream[Row](table)

    将表转换为DataSet

    val tableEnv = TableEnvironment.getTableEnvironment(env)
    // Table with two fields (String name, Integer age)
    val table: Table = ...
    // convert the Table into a DataSet of Row
    val dsRow: DataSet[Row] = tableEnv.toDataSet[Row](table)
    // convert the Table into a DataSet of Tuple2[String, Int]
    val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet[(String, Int)](table)

    FlinkSQL语法

                SQL查询是使用sqlQuery()方法指定的TableEnvironment。该方法返回SQL查询的结果为Table。A Table可以在后续的SQL和Table API查询中使用,可以转换为DataSet或DataStream,也可以写入TableSink)。要访问SQL查询中的表,必须在TableEnvironment中注册它。可以从TableSourceTableDataStream或DataSet 注册表。或者,用户还可以在TableEnvironment中注册外部目录以指定数据源的位置。

    注意: Flink的SQL支持尚未完成。包含不受支持的SQL功能的查询会导致a TableException。以下部分列出了批处理和流表上SQL的受支持功能。

    FlinkSql  DDL功能

                 Flink1.9.x版本中,社区版本的 Flink 新增 了一个 SQL DDL 的新特性,但是暂时还不支持流式的一些概念的定义DDL是使用的sqlUpdate()方法指定的TableEnvironment。对于成功创建表,该方法不返回任何内容。一个Table可以注册到目录与一个CREATE TABLE声明,然后就可以在SQL查询的方法引用sqlQuery()TableEnvironment

    注意: Flink的DDL支持尚未完成。包含不受支持的SQL功能的查询会导致TableException。以下各节列出了批处理表和流表上SQL DDL的受支持功能。

    创建表

    CREATE TABLE [catalog_name.][db_name.]table_name
      [(col_name1 col_type1 [COMMENT col_comment1], ...)]
      [COMMENT table_comment]
      [PARTITIONED BY (col_name1, col_name2, ...)]
      WITH (key1=val1, key2=val2, ...)

    创建具有给定表属性的表。如果数据库中已经存在具有相同名称的表,则会引发异常。

    分区

    按指定的列对创建的表进行分区。如果将此表用作文件系统接收器,则会为每个分区创建一个目录。

    选项

    用于创建表源/接收器的表属性。这些属性通常用于查找和创建基础连接器。expression的键和值key1=val1都应为字符串文字。有关不同连接器的所有受支持表属性,

    注意:

    表名可以采用三种格式:

    1. catalog_name.db_name.table_name
    2. db_name.table_name
    3. table_name.

    对于catalog_name.db_name.table_name,该表将被注册到名为“ catalog_name”的目录和名为“ db_name”的数据库的元存储中;对于db_name.table_name,该表将被注册到执行表环境和名为“ db_name”的数据库的当前目录中;对于table_name,该表将被注册到执行表环境的当前目录和数据库中。

    注意:CREATE TABLE statement 注册的表既可以用作表源,也可以用作表接收器,在DML中引用它之前,我们无法确定是将其用作源还是接收器。

    Flink 1.11 DDL新写法

                       Flink 1.11 为了向前兼容性,依然保留了老 Source & Sink,使用 “connector.type” 的 Key,即可 Fallback 到老 Source & Sink 上。新 Source & Sink 使用标准姿势 (详见官方文档):

    CREATE TABLE kafka_table (
    ...
    ) WITH (
    'connector' = 'kafka-0.10',
    'topic' = 'test-topic',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json',
    'json.fail-on-missing-field' = 'false'
    );

    DDL案例:使用DDL语句创建kafka源表

    kafka中的数据格式:

    {"name":"张三","age":"18"}
    {"name":"李四","age":"20"}

    案例代码

    public class DDL {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings setting = EnvironmentSettings.newInstance()
                    .inStreamingMode()
                    .useBlinkPlanner()
                    .build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,setting);
            String createTable=
                    "CREATE TABLE user_info (" +
                            " name VARCHAR COMMENT '姓名'," +
                            " age VARCHAR COMMENT '年龄')" +
                            " WITH ( " +
                            " 'connector.type' = 'kafka'," +
                            " 'connector.version' = '2.11'," +
                            " 'connector.topic' = 'test',"+
                            " 'connector.startup-mode' = 'latest-offset',"+
                            " 'connector.properties.1.key' = 'bootstrap.servers',"+
                            " 'connector.properties.1.value' = '127.0.0.1',"+
                            " 'update-mode' = 'append',"+
                            " 'format.type' = 'json',"+
                            " 'format.derive-schema' = 'true'"+
                            ") ";
            tableEnv.sqlUpdate(createTable);
            String query="SELECT name,age FROM user_info ";
            Table table = tableEnv.sqlQuery(query);
            DataStream<Row> rowDataStream = tableEnv.toAppendStream(table, Row.class);
            rowDataStream.print();
            env.execute("test");
        }
    }

    案例二:使用flinkSQL,获取文本中的用户的姓名

    数据准备:创建一个person.txt,内容如下

    kebe men
    wede men
    baby wemen
    james men

    代码

    
    object sql_test {
      def main(args: Array[String]): Unit = {
        //获取执行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //获取table
         val settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build()
        val tEnv = StreamTableEnvironment.create(env, settings)
        //读取数据源
        val source1 = env.readTextFile("C:/flink_data/person.txt")
        val source2: DataStream[Person1] = source1.map(x=>{
          val split = x.split(" ")
          ( Person1(split(0),split(1)))
        })
        //将DataStream转化成Table
        val table1 = tableEnv.fromDataStream(source2)
        //注册表,表名为:person
        tableEnv.registerTable("person",table1)
        //获取表中所有信息
        val rs: Table = tableEnv.sqlQuery("select *  from person ")
        val stream: DataStream[String] = rs
        //过滤获取name这一列的数据
          .select("name")
          //将表转化成DataStream
          .toAppendStream[String]
         stream.print()
        env.execute("flinkSQL")
      }
    }
    
    /**
      * 定义样例类封装数据
      */
    case class  Person1(name:String ,score:String)

    案例三:双流join

    数据准备

    数据一

    2016-07-28 13:00:01.820,000001,10.2
    2016-07-28 13:00:01.260,000001,10.2
    2016-07-28 13:00:02.980,000001,10.1
    2016-07-28 13:00:04.330,000001,10.0
    2016-07-28 13:00:05.570,000001,10.0
    2016-07-28 13:00:05.990,000001,10.0
    2016-07-28 13:00:14.000,000001,10.1
    2016-07-28 13:00:20.000,000001,10.2

    数据二

    2016-07-28 13:00:01.000,000001,10.2
    2016-07-28 13:00:04.000,000001,10.1
    2016-07-28 13:00:07.000,000001,10.0
    2016-07-28 13:00:16.000,000001,10.1

    代码

    object JoinDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        //获取接口传送的数据
        val dataStream1 = env.readTextFile("C:/flink_data/scoket1.txt")
        val dataStream2 = env.readTextFile("C:/flink_data/scoket2.txt")
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
       //使用样例类StockTransaction封装获取的数据
        val dataStreamMap1 = dataStream1.map(f => {
          val tokens1 = f.split(",")
          StockTransaction(tokens1(0), tokens1(1), tokens1(2).toDouble)
        })
          .assignAscendingTimestamps(f => format.parse(f.tx_time).getTime)
        //使用样例类StockSnapshot封装获取的数据
        val dataStreamMap2 = dataStream2.map(f => {
          val tokens2 = f.split(",")
          StockSnapshot(tokens2(0), tokens2(1), tokens2(2).toDouble)
        })
          .assignAscendingTimestamps(f => format.parse(f.md_time).getTime)
    
        /**
          * 进行双流join
          * 限定范围是:3秒钟的Event time时间窗口
          */
    
        val joinStream = dataStreamMap1.coGroup(dataStreamMap2)
          .where(_.tx_code)
          .equalTo(_.md_code)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    
          val innerJoinStream = joinStream.apply(new InnerJoinFunction)
         innerJoinStream.name("innerJoin").print()
        print("===================== end =========================")
        env.execute("join demo")
      }
    
    }
    case class StockTransaction(tx_time:String, tx_code:String,tx_value:Double)
    case class StockSnapshot(md_time:String, md_code:String,md_value:Double)
    class InnerJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)]{
      override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double, Double,String)]): Unit = {
    
        /**
          * 将Java中的Iterable对象转换为Scala的Iterable
          * scala的集合操作效率高,简洁
          */
        import scala.collection.JavaConverters._
        val scalaT1 = T1.asScala.toList
        val scalaT2 = T2.asScala.toList
    
        /**
          * Inner Join要比较的是同一个key下,同一个时间窗口内的数据
          */
        if(scalaT1.nonEmpty && scalaT2.nonEmpty){
          for(transaction <- scalaT1){
            for(snapshot <- scalaT2){
              out.collect(transaction.tx_code,transaction.tx_time, snapshot.md_time,transaction.tx_value,snapshot.md_value,"Inner Join Test")
            }
          }
        }
      }
    }
    
    class LeftJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
      override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
        /**
          * 将Java中的Iterable对象转换为Scala的Iterable
          * scala的集合操作效率高,简洁
          */
        import scala.collection.JavaConverters._
        val scalaT1 = T1.asScala.toList
        val scalaT2 = T2.asScala.toList
    
        /**
          * Left Join要比较的是同一个key下,同一个时间窗口内的数据
          */
        if(scalaT1.nonEmpty && scalaT2.isEmpty){
          for(transaction <- scalaT1){
            out.collect(transaction.tx_code,transaction.tx_time, "",transaction.tx_value,0,"Left Join Test")
          }
        }
      }
    }
    class RightJoinFunction extends CoGroupFunction[StockTransaction,StockSnapshot,(String,String,String,Double,Double,String)] {
      override def coGroup(T1: java.lang.Iterable[StockTransaction], T2: java.lang.Iterable[StockSnapshot], out: Collector[(String, String, String, Double,Double,String)]): Unit = {
        /**
          * 将Java中的Iterable对象转换为Scala的Iterable
          * scala的集合操作效率高,简洁
          */
        import scala.collection.JavaConverters._
        val scalaT1 = T1.asScala.toList
        val scalaT2 = T2.asScala.toList
    
        /**
          * Right Join要比较的是同一个key下,同一个时间窗口内的数据
          */
        if(scalaT1.isEmpty && scalaT2.nonEmpty){
          for(snapshot <- scalaT2){
            out.collect(snapshot.md_code, "",snapshot.md_time,0,snapshot.md_value,"Right Join Test")
          }
        }
      }
    }
    
    

    运行结果

     

     

     

     

     

     

     

     

     

    展开全文
  • 阿龙学堂-Flink简介

    万次阅读 多人点赞 2018-03-29 19:57:18
    1.Flink的引入 这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有Hadoop、Storm,以及后来的Spark,他们都有着各自专注的应用场景。Spark掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的...
  • Flink 专题 -1 搭建FlinkFlink 简介

    千次阅读 2018-11-06 22:46:28
    文章目录Flink 专题1 : 搭建FlinkFlink 简介Flink 简介Flink 的优势:Flink 安装flink 安装步骤flink 集群模式 结构 :配置文件设置:添加jobManager/TaskManager启动集群1 集群模式启动2. yarn 模式启动 Flink ...
  • Flink 快速入门

    千人学习 2019-01-02 11:26:54
    通过本课程的学习,既能获得Flink企业级真实项目经验,也能深入掌握Flink的核心理论知识,还能获得Flink在生产环境中安装、部署、监控的宝贵经验,从而一站式全面、深入掌握Flink技术。 任务作业: 1.使用Eclipse...
  • Flink教程

    千次阅读 2020-02-27 15:39:05
    Flink 学习路线系列 Flink笔记(一):Flink介绍 Flink笔记(二):Flink环境搭建(standalone模式) Flink笔记(三):Flink 提交任务的两种方式 Flink笔记(四):Java 编写Flink实时任务(WordCount 示例) Flink笔记(五):...
  • flink实战--flink整合kafka

    万次阅读 2018-12-26 22:06:30
    Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。 Maven 依赖 支持到的版本 生产者和...
  • Flink示例——Flink-CDC

    千次阅读 2020-09-18 23:28:39
    文章目录Flink示例——State、Checkpoint、Savepoint版本信息Mavan依赖主从同步配置、数据准备使用Flink-CDC Flink示例——State、Checkpoint、Savepoint 版本信息 产品 版本 Flink 1.11.1 flink-cdc-...
  • Flink:Flink-SQL开发

    千次阅读 2020-07-27 18:08:40
    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对...
  • 最近也是由于电脑出了一点问题,就没有更新,今天主要来介绍一下Flink的web ui的使用,我们先提交一个job上去(我的集群是on yarn的),命令如下: flink run -m yarn-cluster -c flink.window.FlinkWindowDemo -yn 2 -ys ...
  • Flink SQL Demo 为切入,结合调试过程,深入理解 Flink Streaming SQL flink 语法扩展 Flink SQL 引擎:Calcite 简述 Flink Table/SQL 执行流程 以 Flink SQL Demo 为切入,结合调试过程,深入理...
  • 快速入门Flink (1) —— Flink的简介与架构体系

    千次阅读 多人点赞 2020-07-16 16:30:54
    文章目录一、Flink 的简介1.1 Flink的引入1.2 什么是 Flink1.3 Flink 流处理特性1.4 Flink 基石1.5 批处理与流处理 一、Flink 的简介 1.1 Flink的引入         这几年...
  • flink实战--flink集群的搭建与部署

    万次阅读 2019-01-17 16:44:09
    flink实战案例一:flink集群的搭建与部署 1.下载Flink压缩包 下载地址:http://flink.apache.org/downloads.html 根据集群环境的情况下载相应的版本的flink压缩包 hadoop2.6,Scala2.11,所以下载:flink-1.5.0-bin-...
  • 这篇文章主要用来记录一下Flink中的常见的报错以及解决方案(以后会持续更新) 1,Table is not an append-only table. Use the toRetractStream() in order to handle add and retract messages. 这个是因为动态表...
  • 一文弄懂Flink基础理论

    万次阅读 多人点赞 2019-10-22 20:03:58
    文章目录Flink概述Flink生态为什么选择Flink?系统架构JobManager运行架构常用的类型和操作程序结构介绍并行数据流Task and Operator Chains核心原理Window&TimeWindowTimeState状态管理按组织形式的划分按照数据...
  • Flink 原理与实现

    万次阅读 2020-10-21 20:00:06
    Flink 为流处理和批处理分别提供了 DataStream API 和 DataSet API。正是这种高层的抽象和 flunent API 极大地便利了用户编写大数据应用。 不过很多初学者在看到官方 Streaming 文档中那一大坨的转换时,常常会蒙了...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 50,862
精华内容 20,344
关键字:

flink