flink 订阅
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1] 展开全文
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1]
信息
稳定版本
Apache软件基金会 [1]
类    型
1.8.0 [1]
中文名
弗林克
外文名
Flink [1]
flink开发
Apache Flink是由Apache软件基金会内的Apache Flink社区基于Apache许可证2.0开发的,该项目已有超过100位代码提交者和超过460贡献者。 [2]  是由Apache Flink的创始人创建的公司。目前,该公司已聘用了12个Apache Flink的代码提交者。 [3] 
收起全文
精华内容
下载资源
问答
  • Flink

    万次阅读 2019-08-14 19:43:41
    Flink 一、简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务: ​ ...

    Flink

    一、简介

    Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

    ​ DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。

    ​ DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。

    ​ Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

    此外,Flink还针对特定的应用领域提供了领域库,例如:

    ​ Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。

    ​ Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。

    二、优点

    Flink 是一个开源的分布式流式处理框架:

    ​ ①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

    ​ ②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

    ​ ③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

    Flink处理不同数据分类:

    ​ ①有界数据流–Flink批处理–DataSet–groupBy

    ​ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ​ ②无界数据流–Flink流处理–DataStream–keyBy–流处理默认一条条数据处理

    ​ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ​ ③有/无界数据流–Flink SQL

    Flink & Storm & SparkStreaming 流处理框架的区别:

    ​ •Strom:纯实时处理数据,吞吐量小 --水龙头滴水

    ​ •SparkStreaming : 准实时处理数据,微批处理数据,吞吐量大 --河道中开闸关闸

    ​ •Flink:纯实时处理数据,吞吐量大 --河流远远不断

    三、注意点

    Flink程序的执行过程:
    	获取flink的执行环境(execution environment)
    	加载数据				-- soure
    	对加载的数据进行转换 		-- transformation
    	对结果进行保存或者打印	    -- sink
    	触发flink程序的执行(execute(),count(),collect(),print()),例如:调用						ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。
    	
    
    
    1.创建环境
          批处理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          流处理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          
    2.在批处理中Flink处理的数据对象是DataSet
        在流处理中Flink处理的数据对象时DataStream
    
    3.代码流程必须符合 source ->transformation -> sink
          transformation 都是懒执行,需要最后使用env.execute()触发执行或者使用 print() 					,count(),collect() 触发执行
    
    4.Flink编程不是基于K,V格式的编程,通过某些方式来指定虚拟key
    
    5.Flink中的tuple最多支持25个元素,每个元素是从0开始
    

    四、WordCount

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.*;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> dataSource = env.readTextFile("./data/words");
            FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> collector) throws Exception {
                    String[] split = line.split(" ");
                    for(String word : split){
                        collector.collect(word);
                    }
                }
            });
    
            MapOperator<String, Tuple2<String, Integer>> pairWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
    
            UnsortedGrouping<Tuple2<String, Integer>> groupBy = pairWords.groupBy(0);
            AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
            SortPartitionOperator<Tuple2<String, Integer>> sortResult = result.sortPartition(1, Order.DESCENDING);
    
            sortResult.writeAsCsv("./data/result/r2","\n","&", FileSystem.WriteMode.OVERWRITE);
            env.execute("myflink");
            result.print();
        }
    }
    

    五、排序

    在Flink中,排序是默认在各个分区中进行的排序,所以最终结果在默认情况下是不能进行排序展示的

    这时, ①可以给相关的算子进行设计–sum().setParallelism(1)

    AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
    

    ​ ②全局设计,但是全局设计限制分区数,影响框架运行速度,通常不建议–env. setParallelism(1)

    六、source数据源和Sink源

    1 数据源Source

    ​ Source 是Flink 获取数据的地方。以下source 中和批处理的source 类似,但是以下
    源作为dataStream 流处理时,是一条条处理,最终得到的不是一个总结果,而是每
    次处理后都会得到一个结果。
    ​ 1).socketTextStream – 读取Socket 数据流
    ​ 2). readTextFile() – 逐行读取文本文件获取数据流,每行都返回字符串。
    ​ 3).fromCollection() – 从集合中创建数据流。
    ​ 4).fromElements – 从给定的数据对象创建数据流,所有数据类型要一致。
    ​ 5).addSource – 添加新的源函数,例如从kafka 中读取数据,参见读取kafka 数据案例。

    2 数据写出Sink

    ​ 1).writeAsText() – 以字符串的形式逐行写入文件,调用每个元素的toString()得到写
    入的字符串。
    ​ 2).writeAsCsv() – 将元组写出以逗号分隔的csv 文件。注意:只能作用到元组数据上。
    ​ 3).print() – 控制台直接输出结果,调用对象的toString()方法得到输出结果。
    ​ 4).addSink() – 自定义接收函数。例如将结果保存到kafka 中,参见kafka 案例。

    七、累加器&计数器

    Flink 累加器

    ​ Accumulator即累加器,可以在分布式统计数据,只有在任务结束之后才能获取累加器的最终结果。计数器是累加器的具体实现,有:IntCounter,LongCounter和DoubleCounter。

    累加器注意事项

    ​ 1.需要在算子内部创建累加器对象

    ​ 2.通常在Rich函数中的open方法中注册累加器,指定累加器的名称

    ​ 3.在当前算子内任意位置可以使用累加器

    ​ 4.必须当任务执行结束后,通过env.execute(xxx)执行后的JobExecutionResult对象获取累加器的值。

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<String> dataSource = env.fromElements("a", "b", "c", "d", "e", "f");
    MapOperator<String, String> map = dataSource.map(new RichMapFunction<String, String>(){
    
    //1.创建累加器,在算子中创建累加器对象
    	private IntCounter numLines = new IntCounter();
    
    //2.注册累加器对象,通常在Rich 函数的open 方法中使用
    // getRuntimeContext().addAccumulator("num-lines", this.numLines);注册累加器
        public void open(Configuration parameters) throws Exception {
            getRuntimeContext().addAccumulator("num-lines", this.numLines);
    	}
    
        @Override
        public String map(String s) throws Exception {
        //3.使用累加器,可以在任意操作中使用,包括在open 或者close 方法中
            this.numLines.add(1);
            return s;
        }
        }).setParallelism(8);
        map.writeAsText("./TempResult/result",FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult myJobExecutionResult = env.execute("IntCounterTest");
        
        //4.当作业执行完成之后,在JobExecutionResult 对象中获取累加器的值。
            int accumulatorResult = myJobExecutionResult.getAccumulatorResult("num-lines");
            System.out.println("accumulator value = "+accumulatorResult);
    

    八、指定虚拟Key

    Apache Flink 指定虚拟key

    ​ 1.使用Tuples来指定key

    ​ 2.使用Field Expression来指定key

    ​ 3.使用Key Selector Functions来指定key

    ​ 操作(reduce,groupReduce,Aggregate,Windows)允许数据在处理之前根据key 进行分组。
    在Flink 中数据模型不是基于Key,Value 格式处理的,因此不需将数据处理成键值对的格
    式,key 是“虚拟的”,可以人为的来指定,实际数据处理过程中根据指定的key 来对数
    据进行分组,DataSet 中使用groupBy 来指定key,DataStream 中使用keyBy 来指定key。
    如何指定keys?

    1. 使用Tuples 来指定key
      定义元组来指定key 可以指定tuple 中的第几个元素当做key,或者指定tuple 中的
      联合元素当做key。需要使用org.apache.flink.api.java.tuple.TupleXX 包下的tuple,最多
      支持25 个元素且Tuple 必须new 创建。如果Tuple 是嵌套的格式, 例如:
      DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds,如果指定keyBy(0)则会使
      用整个内部的Tuple2 作为key。如果想要使用内部Tuple2 中的Float 格式当做key,
      就要使用第二种方式Field Expression 来指定key。
    2. 使用Field Expression 来指定key
      可以使用Field Expression 来指定key,一般作用的对象可以是类对象,或者嵌套的
      Tuple 格式的数据。
      使用注意点:
      a) 对于类对象可以使用类中的字段来指定key。
      类对象定义需要注意:
      i. 类的访问级别必须是public
      ii. 必须写出默认的空的构造函数
      iii. 类中所有的字段必须是public 的或者必须有getter,setter 方法。例如类
      中有个字段是foo,那么这个字段的getter,setter 方法为:getFoo() 和
      setFoo().
      iv. Flink 必须支持字段的类型。一般类型都支持
      b) 对于嵌套的Tuple 类型的Tuple 数据可以使用”xx.f0”表示嵌套tuple 中第一个元
      素, 也可以直接使用”xx.0” 来表示第一个元素, 参照案例
      GroupByUseFieldExpressions。
    3. 使用Key Selector Functions 来指定key
      使用key Selector 这种方式选择key,非常方便,可以从数据类型中指定想要的key.

    九、Flink & Kafka

    IDEA代码—MyFlinkCode

    1 Flink消费kafka数据起始offset配置

    ​ \1.flinkKafkaConsumer.setStartFromEarliest()

    从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \2. flinkKafkaConsumer.setStartFromLatest()

    从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \3. flinkKafkaConsumer.setStartFromTimestamp(…)

    从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \4. flinkKafkaConsumer.setStartFromGroupOffsets()

    默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。

    2 Flink消费kafka数据,消费者offset提交配置

    ​ 1.Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前)的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况

    ​ 2.配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。

    3.关闭checkpoint:

    ​ 如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

    4.开启checkpoint:

    ​ 如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

    3 Flink中外部状态实现两阶段提交

    ​ Flink外部状态实现两阶段提交将逻辑封装到TwoPhaseComitSinkFunction类中,需要扩展TwoPhaseCommitSinkFunction来实现仅一次消费数据。若要实现支持exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    在这里插入图片描述

    展开全文
  • Flink 最锋利的武器:Flink SQL 入门和实战

    万次阅读 多人点赞 2019-06-21 00:00:00
    一、Flink SQL 背景Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。自 2015...

    学习路径:《2021年最新从零到大数据专家学习路径指南》

    面      试:《2021年最新版大数据面试题全面开启更新》

    【注意】:Flink1.9版本后的Flink SQL使用看这里:

    Flink 最锋利的武器:Flink SQL 入门和实战(1.9版本及以后)

    一、Flink SQL 背景

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

    Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

    640?wx_fmt=png

    在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

    • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;

    • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;

    • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;

    • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;

    • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。

    二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

    2.1 Flink 1.7.0 新特性

    在 Flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 Flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 Scala 2.12 的支持、一次性 S3 文件接收器、复杂事件处理与流 SQL 的集成等。

    Apache Flink 中对 Scala 2.12 的支持(FLINK-7811)

    Apache Flink 1.7.0 是第一个完全支持 Scala 2.12 的版本。这允许用户使用较新的 Scala 版本编写 Flink 应用程序并利用 Scala 2.12 生态系统。

    状态演进(FLINK-9376)

    许多情况下,由于需求的变化,长期运行的 Flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 Flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 Avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 Avro 的规范来演变国家的架构。虽然 Avro 类型是 Flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 Flink 版本中进一步扩展对其他类型的支持。

    MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

    这是 Apache Flink 1.7.0 的一个重要补充,它为 Flink SQL 提供了 MATCH RECOGNIZE 标准的初始支持。此功能结合了复杂事件处理(CEP)和 SQL,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。

    流式 SQL 中的时态表和时间连接(FLINK-9712)

    时态表是 Apache Flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。

    Streaming SQL 的其他功能

    除了上面提到的主要功能外,Flink 的 Table&SQL API 已经扩展到更多用例。以下内置函数被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 现在支持在环境文件和 CLI 会话中定义视图。此外,CLI 中添加了基本的 SQL 语句自动完成功能。社区添加了一个 Elasticsearch 6 表接收器,允许存储动态表的更新结果。

    Kafka 2.0 连接器(FLINK-10598)

    Apache Flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 Kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 Kafka 2.0。

    本地恢复(FLINK-9635)

    Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,Flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,Flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。

    2.2 Flink 1.8.0 新特性

    Flink 1.8.0 引入对状态的清理

    使用 TTL(生存时间)连续增量清除旧的 Key 状态 Flink 1.8 引入了对 RocksDB 状态后端(FLINK-10471)和堆状态后端(FLINK-10473)的旧数据的连续清理。这意味着旧的数据将(根据 TTL 设置)不断被清理掉。

    新增和删除一些 Table API

    1) 引入新的 CSV 格式符(FLINK-9964)

    此版本为符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能与 Kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系统连接器。

    2) 静态生成器方法在 TableEnvironment(FLINK-11445)上的弃用

    为了将 API 与实际实现分开,TableEnvironment.getTableEnvironment() 不推荐使用静态方法。现在推荐使用 Batch/StreamTableEnvironment.create()。

    3) 表 API Maven 模块中的更改(FLINK-11064)

    之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 Java 还是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

    Kafka Connector 的修改

    引入可直接访问 ConsumerRecord 的新 KafkaDeserializationSchema(FLINK-8354),对于 FlinkKafkaConsumers 推出了一个新的 KafkaDeserializationSchema,可以直接访问 KafkaConsumerRecord。

    三、Flink SQL 的编程模型

    Flink 的编程模型基础构建模块是流(streams)与转换 (transformations),每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。

    640?wx_fmt=png

    相信大家对上面的图已经十分熟悉了,当然基于 Flink SQL 编写的 Flink 程序也离不开读取原始数据,计算逻辑和写入计算结果数据三部分。

    一个完整的 Flink SQL 编写的程序包括如下三部分:

    • Source Operator:Soruce operator 是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现例如 MySQL、Kafka 等;

    • Transformation Operators:算子操作主要完成例如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作;

    • Sink Operator:Sink operator 是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等

    我们通过用一个最经典的 WordCount 程序作为入门,看一下传统的基于 DataSet/DataStream API 开发和基于 SQL 开发有哪些不同?

    • DataStream/DataSetAPI

     
    • Flink SQL

    //省略掉初始化环境等公共代码
    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    我们已经可以直观体会到,SQL 开发的快捷和便利性了。

    四、Flink SQL 的语法和算子

    4.1 Flink SQL 支持的语法

    Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

    那么 Flink 自身支持的 SQL 语法有哪些呢?

    insert:
    INSERT INTO tableReference
    query
    
    query:
    values
      | {
    select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
    
    orderItem:
      expression [ ASC | DESC ]
    
    select:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
    
    selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    
    projectItem:
      expression [ [ AS ] columnAlias ]
      | tableAlias . *
    
    tableExpression:
      tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
    
    joinCondition:
    ON booleanExpression
      | USING '(' column [, column ]* ')'
    
    tableReference:
      tablePrimary
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
    
    tablePrimary:
      [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'
    
    values:
    VALUES expression [, expression ]*
    
    groupItem:
      expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'
    
    windowRef:
        windowName
      | windowSpec
    
    windowSpec:
        [ windowName ]
    '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
    RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
    ')'
    

    上面 SQL 的语法支持也已经表明了 Flink SQL 对算子的支持,接下来我们对 Flink SQL 中最常见的算子语义进行介绍。

    4.2 Flink SQL 常用算子

    SELECT

    SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

    示例:

    SELECT * FROM Table;// 取出表中的所有列
    SELECT name,age FROM Table;// 取出表中 name 和 age 两列
    

    与此同时 SELECT 语句中可以使用函数和别名,例如我们上面提到的 WordCount 中:

    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    WHERE

    WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。

    示例:

    SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
    SELECT * FROM Table WHERE age = 20

    WHERE 是从原数据中进行过滤,那么在 WHERE 条件中,Flink SQL 同样支持 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合 IN、NOT IN 联合使用。举个负责的例子:

    SELECT name, age
    FROM Table
    WHERE name IN (SELECT name FROM Table2)
    

    DISTINCT

    DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。

    示例:

    SELECT DISTINCT name FROM Table;
    

    对于流式查询,计算查询结果所需的 State 可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。

    GROUP BY

    GROUP BY 是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。

    SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
    

    UNION 和 UNION ALL

    UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。

    示例:

    SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
    

    JOIN

    JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:

    • JOIN - INNER JOIN

    • LEFT JOIN - LEFT OUTER JOIN

    • RIGHT JOIN - RIGHT OUTER JOIN

    • FULL JOIN - FULL OUTER JOIN

    这里的 JOIN 的语义和我们在关系型数据库中使用的 JOIN 语义一致。

    示例:

    JOIN(将订单表数据和商品表进行关联)
    SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL 操作。

    示例:

    SELECT *
    FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    Group Window

    根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Window:

    • Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

    • Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

    • Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

    Tumble Window

    Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下:

    640?wx_fmt=png

    Tumble 滚动窗口对应的语法如下:

    SELECT 
        [gk],
        [TUMBLE_START(timeCol, size)], 
        [TUMBLE_END(timeCol, size)], 
        agg1(col1), 
        ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], TUMBLE(timeCol, size)
    

    其中:

    • [gk] 决定了是否需要按照字段进行聚合;

    • TUMBLE_START 代表窗口开始时间;

    • TUMBLE_END 代表窗口结束时间;

    • timeCol 是流表中表示时间字段;

    • size 表示窗口的大小,如 秒、分钟、小时、天。

    举个例子,假如我们要计算每个人每天的订单量,按照 user 进行聚合分组:

    SELECT user, TUMBLE_START(rowtime, INTERVAL1DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL1DAY), user;
    

    Hop Window

    Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠,具体语义如下:

    640?wx_fmt=png

    Hop 滑动窗口对应语法如下:

    SELECT 
        [gk], 
        [HOP_START(timeCol, slide, size)] ,  
        [HOP_END(timeCol, slide, size)],
        agg1(col1), 
        ... 
        aggN(colN) 
    FROM Tab1
    GROUP BY [gk], HOP(timeCol, slide, size)
    

    每次字段的意思和 Tumble 窗口类似:

    • [gk] 决定了是否需要按照字段进行聚合;

    • HOP_START 表示窗口开始时间;

    • HOP_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • slide 表示每次窗口滑动的大小;

    • size 表示整个窗口的大小,如 秒、分钟、小时、天。

    举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:

    SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
    

    Session Window

    会话时间窗口没有固定的持续时间,但它们的界限由 interval 不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。

    640?wx_fmt=png

    Seeeion 会话窗口对应语法如下:

    SELECT 
        [gk], 
        SESSION_START(timeCol, gap) AS winStart,  
        SESSION_END(timeCol, gap) AS winEnd,
        agg1(col1),
         ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], SESSION(timeCol, gap)
    
    • [gk] 决定了是否需要按照字段进行聚合;

    • SESSION_START 表示窗口开始时间;

    • SESSION_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • gap 表示窗口数据非活跃周期的时长。

    例如,我们需要计算每个用户访问时间 12 小时内的订单量:

    SELECT user, SESSION_START(rowtime, INTERVAL12HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL12HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL12HOUR), user
    

    五、Flink SQL 的内置函数

    Flink 提供大量的内置函数供我们直接使用,我们常用的内置函数分类如下:

    • 比较函数

    • 逻辑函数

    • 算术函数

    • 字符串处理函数

    • 时间函数

    我们接下来对每种函数举例进行讲解。

    5.1 比较函数

    比较函数描述
    value1=value2如果 value1 等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1<>value2如果 value1 不等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1>value2如果 value1 大于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1 < value2如果 value1 小于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value IS NULL如果 value 为 NULL,则返回 TRUE
    value IS NOT NULL如果 value 不为 NULL,则返回 TRUE
    string1 LIKE string2如果 string1 匹配模式 string2,则返回 TRUE ; 如果 string1 或 string2 为 NULL,则返回 UNKNOWN
    value1 IN (value2, value3…)如果给定列表中存在 value1 (value2,value3,…),则返回 TRUE 。当(value2,value3,…)包含 NULL,如果可以找到该数据元则返回 TRUE,否则返回 UNKNOWN。如果 value1 为 NULL,则始终返回 UNKNOWN

    5.2 逻辑函数

    逻辑函数描述
    A OR B如果 A 为 TRUE 或 B 为 TRUE,则返回 TRUE
    A AND B如果 A 和 B 都为 TRUE,则返回 TRUE
    NOT boolean如果 boolean 为 FALSE,则返回 TRUE,否则返回 TRUE。如果 boolean 为 TRUE,则返回 FALSE
    A IS TRUE 或 FALSE判断 A 是否为真

    5.3 算术函数

    算术函数描述
    numeric1 ±*/ numeric2分别代表两个数值加减乘除
    ABS(numeric)返回 numeric 的绝对值
    POWER(numeric1, numeric2)返回 numeric1 上升到 numeric2 的幂

    除了上述表中的函数,Flink SQL 还支持种类丰富的函数计算。

    5.4 字符串处理函数

    字符串函数描述
    UPPER/LOWER以大写 / 小写形式返回字符串
    LTRIM(string)返回一个字符串,从去除左空格的字符串, 类似还有 RTRIM
    CONCAT(string1, string2,…)返回连接 string1,string2,…的字符串

    5.5 时间函数

    时间函数描述
    DATE string返回以“yyyy-MM-dd”形式从字符串解析的 SQL 日期
    TIMESTAMP string返回以字符串形式解析的 SQL 时间戳,格式为“yyyy-MM-dd HH:mm:ss [.SSS]”
    CURRENT_DATE返回 UTC 时区中的当前 SQL 日期
    DATE_FORMAT(timestamp, string)返回使用指定格式字符串格式化时间戳的字符串

    六、Flink SQL 实战应用

    上面我们分别介绍了 Flink SQL 的背景、新特性、编程模型和常用算子,这部分我们将模拟一个真实的案例为大家使用 Flink SQL 提供一个完整的 Demo。

    相信这里应该有很多 NBA 的球迷,假设我们有一份数据记录了每个赛季的得分王的数据,包括赛季、球员、出场、首发、时间、助攻、抢断、盖帽、得分等。现在我们要统计获得得分王荣誉最多的三名球员。

    原数据存在 score.csv 文件中,如下:

    17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.4
    16-17,拉塞尔-威斯布鲁克,81,81,34.6,10.4,1.6,0.4,31.6
    15-16,斯蒂芬-库里,79,79,34.2,6.7,2.1,0.2,30.1
    14-15,拉塞尔-威斯布鲁克,67,67,34.4,8.6,2.1,0.2,28.1
    13-14,凯文-杜兰特,81,81,38.5,5.5,1.3,0.7,32
    12-13,卡梅罗-安东尼,67,67,37,2.6,0.8,0.5,28.7
    11-12,凯文-杜兰特,66,66,38.6,3.5,1.3,1.2,28
    10-11,凯文-杜兰特,78,78,38.9,2.7,1.1,1,27.7
    09-10,凯文-杜兰特,82,82,39.5,2.8,1.4,1,30.1
    08-09,德维恩-韦德,79,79,38.6,7.5,2.2,1.3,30.2
    07-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,30
    06-07,科比-布莱恩特,77,77,40.8,5.4,1.4,0.5,31.6
    05-06,科比-布莱恩特,80,80,41,4.5,1.8,0.4,35.4
    04-05,阿伦-艾弗森,75,75,42.3,7.9,2.4,0.1,30.7
    03-04,特雷西·麦克格雷迪,67,67,39.9,5.5,1.4,0.6,28
    02-03,特雷西·麦克格雷迪,75,74,39.4,5.5,1.7,0.8,32.1
    01-02,阿伦-艾弗森,60,59,43.7,5.5,2.8,0.2,31.4
    00-01,阿伦-艾弗森,71,71,42,4.6,2.5,0.3,31.1
    99-00,沙奎尔-奥尼尔,79,79,40,3.8,0.5,3,29.7
    98-99,阿伦-艾弗森,48,48,41.5,4.6,2.3,0.1,26.8
    97-98,迈克尔-乔丹,82,82,38.8,3.5,1.7,0.5,28.7
    96-97,迈克尔-乔丹,82,82,37.9,4.3,1.7,0.5,29.6
    95-96,迈克尔-乔丹,82,82,37.7,4.3,2.2,0.5,30.4
    94-95,沙奎尔-奥尼尔,79,79,37,2.7,0.9,2.4,29.3
    93-94,大卫-罗宾逊,80,80,40.5,4.8,1.7,3.3,29.8
    92-93,迈克尔-乔丹,78,78,39.3,5.5,2.8,0.8,32.6
    91-92,迈克尔-乔丹,80,80,38.8,6.1,2.3,0.9,30.1
    90-91,迈克尔-乔丹,82,82,37,5.5,2.7,1,31.5
    89-90,迈克尔-乔丹,82,82,39,6.3,2.8,0.7,33.6
    88-89,迈克尔-乔丹,81,81,40.2,8,2.9,0.8,32.5
    87-88,迈克尔-乔丹,82,82,40.4,5.9,3.2,1.6,35
    86-87,迈克尔-乔丹,82,82,40,4.6,2.9,1.5,37.1
    85-86,多米尼克-威尔金斯,78,78,39.1,2.6,1.8,0.6,30.3
    84-85,伯纳德-金,55,55,37.5,3.7,1.3,0.3,32.9
    83-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.6
    82-83,阿历克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.4
    81-82,乔治-格文,79,79,35.7,2.4,1,0.6,32.3
    

    首先我们需要创建一个工程,并且在 Maven 中有如下依赖:

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
    </dependency>
    

    第一步,创建上下文环境:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    

    第二步,读取 score.csv 并且作为 source 输入:

     DataSet<String> input = env.readTextFile("score.csv");
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    其中的PlayerData类为自定义类:
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    

    第三步,将 source 数据注册成表:

    Table topScore = tableEnv.fromDataSet(topInput);
    tableEnv.registerTable("score", topScore);
    

    第四步,核心处理逻辑 SQL 的编写:

    Table queryResult = tableEnv.sqlQuery("
    select player, 
    count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
    ");
    

    第五步,输出结果:

    DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
    result.print();
    

    我们直接运行整个程序,观察输出结果:

    ...
    16:28:06,162 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shut down complete.
    16:28:06,162 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
    16:28:06,164 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopped TaskExecutor akka://flink/user/taskmanager_2.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    16:28:06,169 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    16:28:06,177 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
    16:28:06,187 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
    16:28:06,187 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
    16:28:06,188 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:51703
    16:28:06,188 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
    迈克尔-乔丹:10
    凯文-杜兰特:4
    阿伦-艾弗森:4
    

    我们看到控制台已经输出结果了:

    640?wx_fmt=jpeg

    完整的代码如下:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
    public static void main(String[] args) throws Exception{
    
    //1\. 获取上下文环境 table的环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    
    //2\. 读取score.csv
            DataSet<String> input = env.readTextFile("score.csv");
            input.print();
    
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    
    //3\. 注册成内存表
            Table topScore = tableEnv.fromDataSet(topInput);
            tableEnv.registerTable("score", topScore);
    
    //4\. 编写sql 然后提交执行
    //select player, count(season) as num from score group by player order by num desc;
            Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");
    
    //5\. 结果进行打印
            DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
            result.print();
    
        }
    
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    
    public static class Result {
    public String player;
    public Long num;
    
    public Result() {
    super();
            }
    public Result(String player, Long num) {
    this.player = player;
    this.num = num;
            }
    @Override
    public String toString() {
    return player + ":" + num;
            }
        }
    }//
    

    当然我们也可以自定义一个 Sink,将结果输出到一个文件中,例如:

            TableSink sink = new CsvTableSink("/home/result.csv", ",");
    String[] fieldNames = {"name", "num"};
            TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
            tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
            sqlQuery.insertInto("result");
            env.execute();
    

    然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:

    迈克尔-乔丹,10
    凯文-杜兰特,4
    阿伦-艾弗森,4
    

    七、总结

    本篇向大家介绍了 Flink SQL 产生的背景,Flink SQL 大部分核心功能,并且分别介绍了 Flink SQL 的编程模型和常用算子及内置函数。最后以一个完整的示例展示了如何编写 Flink SQL 程序。Flink SQL 的简便易用极大地降低了 Flink 编程的门槛,是我们必需掌握的使用 Flink 解决流式计算问题最锋利的武器!

    640?wx_fmt=gif

    展开全文
  • lcc@lcc ~$ mvn install:install-file -DgroupId=org.apache.flink -DartifactId=flink-fs-hadoop-shaded -Dversion=1.9-SNAPSHOT -Dpackaging=jar -Dfile=/Users/lcc/flink-s3-fs-hadoop-1.9.0.jar [WARNING] ...
  • Flink:Flink-源码

    2021-03-19 00:02:02
    Flink Flink
  • Flink学习 Flink Scala版本 卡夫卡版 1.10.0 2.11+ 0.10+ Maven 模板 : https://www.mvnjar.com/org.apache.flink/flink-quickstart-scala/jar.html <br> 注意 : 当在idea上调试时,先将 es-shade install。...
  • flink集群安装部署 yarn集群模式 Flink入门及实战-上: http://edu.51cto.com/sd/07245 Flink入门及实战-下: http://edu.51cto.com/sd/5845e 快速开始 在yarn上启动一个一直运行的flink集群 在yarn上...

    flink集群安装部署

    yarn集群模式

     

    Flink入门及实战-上:

    http://edu.51cto.com/sd/07245

    Flink入门及实战-下:

    http://edu.51cto.com/sd/5845e

     

    • 快速开始
    1. 在yarn上启动一个一直运行的flink集群
    2. 在yarn上运行一个flink job
    • flink yarn session
    1. 启动flink session
    2. 提交任务到flink
    • 在yarn上运行一个独立的flink job
    1. 用户依赖jar包和classpath
    • flink on yarn的故障恢复
    • 调试一个失败的yarn session
    1. 日志文件
    2. yarn client控制台和web界面
    • 针对指定的hadoop版本构建yarn client
    • 在yarn上运行flink使用防火墙
    • flink on yarn 内部实现

    快速开始

    在yarn上启动一个一直运行的flink集群

    启动一个yarn session使用4个taskmanager(每个节点4GB内存)
    注意:如果自己的虚拟机没有这么大内存的话,可以吧-n设置小一点,对应的后面的内存-jm -tm也设置小一点,否则,如果内存不够用,会导致启动失败。

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

    通过-s参数指定每一个taskmanager分配多少个slots(处理进程)。我们建议设置为每个机器的CPU核数。

    一旦session创建成功,你可以使用./bin/flink工具向集群提交任务。

    在yarn上运行一个flink job

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

     

    flink yarn session

    yarn是一个集群资源管理框架。它运行在集群之上运行各种分布式应用程序。flink像其他程序一样,也可以在yarn上运行。用户不需要设置或者安装任何东西,如果已经有一个安装配置好的yarn。

    必须的依赖

     

    • 至少是hadoop2.2
    • hdfs(或者是其它支持hadoop的分布式文件系统)

    如果你在使用flink yarn client的时候有什么问题,可以到这里查找答案

    启动flink session

    按照下面的步骤来学习如何在yarn集群上启动一个flink session

    一个session将会包含所有必须的flink 服务(jobmanager和taskmanager),这样你就可以向这个集群提交程序了。注意:每个session会话你可以运行多个程序。

    下载flink

    下载hadoop2对应的flink安装包,点此下载。它包含了必须的文件。

    使用下面命令解压:

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/

    启动一个session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    这个命令将会输出下面内容:

    用法:
       必选
         -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)
       可选
         -D <arg>                        动态属性
         -d,--detached                   独立运行
         -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]
         -nm,--name                     在YARN上为一个自定义的应用设置一个名字
         -q,--query                      显示yarn中可用的资源 (内存, cpu核数)
         -qu,--queue <arg>               指定YARN队列.
         -s,--slots <arg>                每个TaskManager使用的slots数量
         -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]
         -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace

    请注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

    经试验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。

    例子:下面的命令会申请10个taskmanager,每个8G内存和32个solt

    ./bin/yarn-session.sh -n 10 -tm 8192 -s 32

    该系统默认会使用这个配置文件:conf/flink-conf.yaml。如果你想修改一些参数,请查看我们的配置指南

     

    flink on yarn模式将会覆盖一些配置文件 jobmanager.rpc.address(因为jobmanager总是分配在不同的机器),taskmanager.tmp.dirs(我们使用yarn提供的临时目录)和parallelism.default 如果solts的数量已经被指定。

    如果你不想修改配置文件去改变参数,有一个选择是通过动态的参数-D 来指定。所以你可以传递参数:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624

    上面的例子将会启动11个容器(即使仅请求10个容器),因为有一个额外的容器来启动ApplicationMaster 和 job manager

    一旦flink在你的yarn集群上部署,它将会显示job manager的连接详细信息。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop。

    Flink on yarn只会启动请求的资源,如果集群资源充足。大多数yarn调度器请求容器的内存,一些也会请求cpu。默认,cpu的核数等于slots的数量,通过-s参数指定。这个参数yarn.containers.vcores的值允许使用一个自定义值来进行覆盖。

     

    后台 yarn session

    如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached

    在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session。

    使用yarn 工具 来停止yarn session

    yarn application -kill <applicationId>

    附着到一个已存在的session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    执行这个命令将会显示下面内容:

    用法:
       必须
         -id,--applicationId <yarnAppId>        YARN集群上的任务id

    正如前面提到的,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量必须是可以读取到YARN和HDFS配置的。

    例如:发出下面命令可以附着到一个运行中的flink yarn session

    ./bin/yarn-session.sh -id application_1463870264508_0029

    附着到一个运行的session使用yarn resourcemanager来确定job Manager 的RPC端口。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop

     

    提交任务到flink

    使用下面的命令提交一个flink程序到yarn集群

    ./bin/flink

    请参考客户端命令行操作文档

     

    这个命令将会向你展示一个这样一个帮助菜单

    "run" 参数可以编译和运行一个程序

    用法: run [OPTIONS] <jar-file> <arguments>
      "run" 操作参数:
         -c,--class <classname>           如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
    
         -m,--jobmanager <host:port>      指定需要连接的jobmanager(主节点)地址
                                          使用这个参数可以指定一个不同于配置文件中的jobmanager
    
         -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。

    使用run 命令向yarn集群提交一个job。客户端可以确定jobmanager的地址。当然,你也可以通过-m参数指定jobmanager。jobmanager的地址在yarn控制台上可以看到。

    例子:【注意:下面的命令官网文档提供的有问题,执行失败】

    wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
    hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
    【注意:下面的命令官网文档提供的有问题,执行失败】
    ./bin/flink run ./examples/batch/WordCount.jar \
            hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
    查看flink源码发现,wordCount.jar可以不提供参数,或者提供参数,提供参数的时候需要使用input和output参数指定:
    上面的命令需要修改为如下格式才能正常执行[传递的两个参数需要使用-input和 -output来指定]
    ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/LICENSE-2.0.txt -output hdfs://hostname:port/wordcount-result.txt
    
    
    

    如果有以下错误,确保所有taskmanager是否已经启动:

    Exception in thread "main" org.apache.flink.compiler.CompilerException:
        Available instances could not be determined from job manager: Connection timed out.

    你可以在jobmanager的web界面上检查taskmanager的数量。这个web界面的地址会打印在yarn session的控制台上。

    如果没有发现taskmanager,你应该通过日志文件来检查问题。

     

    在yarn上运行一个独立的flink job

    这个文档描述了如何在一个hadoop yarn环境中启动flink集群。也可以在yarn中启动只执行单个任务的flink。

    请注意:client期望设置-yn 参数(taskmanager的数量)

    例子:

    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

    yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀

    注意:通过为每个任务设置不同的环境变量 FLINK_CONF_DIR,可以为每个任务使用不同的配置目录。从 Flink 分发包中复制 conf 目录,然后修改配置,例如,每个任务不同的日志设置

     

    用户依赖jar包和classpath

    默认情况下,当运行一个独立的job的时候,这个flink job将包含用户依赖的jar包。可以通过参数yarn.per-job-cluster.include-user-jar来控制。

    当设置为 DISABLED ,flink将会包含用户classpath下面的jar包。

    用户jar包在类路径中的位置可以通过下面参数来控制:

    * ORDER: (默认) 添加jar包到系统类路径下面,按照字典顺序.
    * FIRST: 将jar添加到类路径的前面.
    * LAST: 将jar添加到类路径的最后.

     

    flink on yarn的故障恢复

    flink 的 yarn 客户端通过下面的配置参数来控制容器的故障恢复。这些参数可以通过conf/flink-conf.yaml 或者在启动yarn session的时候通过-D参数来指定。

     

    • yarn.reallocate-failed:这个参数控制了flink是否应该重新分配失败的taskmanager容器。默认是true。
    • yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn session失败。默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。
    • yarn.application-attempts:applicationMaster重试的次数。如果这个值被设置为1(默认就是1),当application master失败的时候,yarn session也会失败。设置一个比较大的值的话,yarn会尝试重启applicationMaster。

     

    调试一个失败的yarn session

     

    一个flink yarn session部署失败可能会有很多原因。一个错误的hadoop配置(hdfs 权限,yarn配置),版本不兼容(使用cdh中的hadoop运行flink),或者其他的错误。

    日志文件

    在某种情况下,flink yarn session 部署失败是由于它自身的原因,用户必须依赖于yarn的日志来进行分析。最有用的就是yarn log aggregation 。启动它,用户必须在yarn-site.xml文件中设置yarn.log-aggregation-enable 属性为true。一旦启用了,用户可以通过下面的命令来查看一个失败的yarn session的所有详细日志。

    yarn logs -applicationId <application ID>

    yarn client控制台和web界面

    flink yarn client也会打印一些错误信息在控制台上,如果错误发生在运行时(例如如果一个taskmanager停止工作了一段时间)

    除此之外,yarn resource manager的web界面(默认端口是8088)。resource manager的端口是通过yarn.resourcemanager.webapp.address参数来配置的。

    它运行在yarn 程序运行的时候查看日志和程序失败的时候查看日志用户查找问题。

     

    针对指定的hadoop版本构建yarn client

    用户可以使用hadoop发行版。例如,hortonworks,CDH或者MapR等版本去构建 flink。请参考构建指南获取详细信息

     

    在yarn上运行flink使用防火墙

    一些yarn 集群使用防火墙来控制集群的网络和其他网络的通信。在这种设置下,flink只能通过集群的网络来提交任务到yarn session。针对生产环境下使用是不可行的,flink允许配置所有相关服务的端口范围,通过这些端口范围的配置,用户也可以透过防火墙来提交flink job。

    目前,两个服务都需要提交任务:

    • jobmanager(yarn中的applicationMaster)
    • jobmanager内部运行的blobserver

    当向flink提交一个任务的时候,blobserver将会把用户的代码分发到所有工作节点(taskManagers)。jobmanager接收任务本身,并触发执行。

    以下两个配置参数可以指定端口:

     

    • yarn.application-master.port
    • blob.server.port

    这两个配置选项接收单一的端口(例如:"50010"),区间("50000-50025"),或者同时指定多个("50010,50011,50020-50025,50050-50075")。

    (hadoop也使用的是类似的机制,例如:yarn.app.mapreduce.am.job.client.port-range)

     

    flink on yarn 内部实现

    本节主要描述flink和yarn是如何交互的

    YARN 客户端需要访问 Hadoop 配置,从而连接 YARN 资源管理器和 HDFS。可以使用下面的策略来决定 Hadoop 配置:

     

    • 测试 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 环境变量是否设置了(按该顺序测试)。如果它们中有一个被设置了,那么它们就会用来读取配置。
    • 如果上面的策略失败了(如果正确安装了 YARN 的话,这不应该会发生),客户端会使用 HADOOP_HOME 环境变量。如果该变量设置了,客户端会尝试访问 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。

    当启动一个新的 Flink YARN Client会话,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传包含了 Flink 配置和 jar文件到 HDFS(步骤 1)。

    客户端的下一步是请求(步骤 2)一个 YARN 容器启动 ApplicationMaster (步骤 3)。因为客户端将配置和jar 文件作为容器的资源注册了,所以运行在特定机器上的 YARN 的 NodeManager 会负责准备容器(例如,下载文件)。一旦这些完成了,ApplicationMaster (AM) 就启动了。

    JobManager 和 AM 运行在同一个容器中。一旦它们成功地启动了,AM 知道 JobManager 的地址(它自己)。它会为 TaskManager 生成一个新的 Flink 配置文件(这样它们才能连上 JobManager)。该文件也同样会上传到 HDFS。另外,AM 容器同时提供了 Flink 的 Web 界面服务。Flink 用来提供服务的端口是由用户 + 应用程序 id 作为偏移配置的。这使得用户能够并行执行多个 Flink YARN 会话。

    之后,AM 开始为 Flink 的 TaskManager 分配容器,这会从 HDFS 下载 jar 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。

     

     

     

    获取更多大数据资料,视频以及技术交流请加群:

     

     

     

    展开全文
  • 书名叫Mastering Apache Flink, Learning Apache Flink, 英文高清版,支持复制
  • flink 资料

    2019-02-26 20:31:57
    风头正劲的,flink 基础学习相关资料, 希望可以有帮助
  • flink实战--flinkSQL入门大全

    万次阅读 2018-11-12 18:07:03
    FlinkSQL概念介绍 Table API & SQL Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API...

     

    扫一扫加入大数据公众号和技术交流群,了解更多大数据技术,还有免费资料等你哦

     

    FlinkSQL概念介绍

    Table API & SQL

                    Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API和SQL接口彼此紧密集成,以及Flink的DataStream和DataSet API。您可以轻松地在基于API构建的所有API和库之间切换。例如,您可以使用CEP库从DataStream中提取模式,然后使用Table API分析模式,或者可以在预处理上

    展开全文
  • Flink 专题 -1 搭建FlinkFlink 简介

    千次阅读 2018-11-06 22:46:28
    文章目录Flink 专题1 : 搭建FlinkFlink 简介Flink 简介Flink 的优势:Flink 安装flink 安装步骤flink 集群模式 结构 :配置文件设置:添加jobManager/TaskManager启动集群1 集群模式启动2. yarn 模式启动 Flink ...
  • flink-template flink代码模板
  • flink sql flink 整合 hive
  • 问题导读 1.Flink CEP是什么?2.Flink CEP可以做哪些事情?3.Flink CEP和流式处理有什么区别?...CEP在Flink未产生以前,已经有CEP,并不是有了Flink才有CEP,我们这里重点是讲Flink CEP。CEP本身的含义
  • Flink 原理与实现

    万次阅读 2020-10-21 20:00:06
    Flink 为流处理和批处理分别提供了 DataStream API 和 DataSet API。正是这种高层的抽象和 flunent API 极大地便利了用户编写大数据应用。 不过很多初学者在看到官方 Streaming 文档中那一大坨的转换时,常常会蒙了...
  • Flink CEP兵书

    2021-06-16 18:50:51
    Flink CEP兵书全面系统的介绍Flink CEP相关知识点以及相关代码讲解。分别讲解了:Flink CEP与流式处理的区别、原理、api讲解、Flink CEP各种模式、跳过策略、模式匹配、水位线的讲解等内容。以及在代码例子中,给...
  • Flink基础系列3-windows安装Flink

    千次阅读 2021-10-13 15:14:11
    文章目录一.Flink下载二.运行Flink2.1 Java安装2.2 运行Flink三.访问 Flink UI四.运行自带的 WordCount 示例参考: 一.Flink下载 本次以Flink 1.9.0版本为例。 下载 flink-1.9.0-bin-scala_2.12.tgz 下载后解压到 D:\...
  • Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。这个学习文档通俗易懂flink知识点几乎全部覆盖,...
  • Flink示例——Flink-CDC

    2021-04-14 13:38:56
    Flink示例——Flink-CDC 版本信息 产品 版本 Flink 1.11.1 flink-cdc-connectors 1.1.0 Java 1.8.0_231 MySQL 5.7.16 注意:官方说目前支持MySQL-5.7和8,但笔者还简单测试过mariadb-10.0.38(对应...
  • Flink基本概念与部署.Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行...
  • Flink教程

    千次阅读 2020-02-22 15:48:46
    Flink教程。Flink 是一个同时具备流数据处理和批数据处理的分布式计算框架。flink代码主要是由 Java 实现,部分代码由 Scala实现。Flink既可以处理有界的批量数据集、也可以处理无界的实时数据集。就业界的使用情况...
  • flink安装.docx

    2019-12-26 08:45:19
    flink 安装 flink 安装 flink 安装 flink 安装 flink 安装 flink 安装
  • Flink简介

    千次阅读 2021-03-03 10:58:39
    序言 大数据运算主要有2个领域:1:流式计算 2:批量计算。在数据操作层面可以看做如下的两类 有限数据集:数据大小有限(固定大小,比如固定...https://flink.apache.org/zh/flink-architecture.html ...
  • Flink基础系列1-Flink介绍

    千次阅读 2021-10-12 12:24:34
    文章目录概述:一.Flink体系结构介绍1.1 处理无界和有界数据1.2 部署应用程序在任何地方1.3 在任何规模上运行应用程序1.4 利用内存性能二.应用程序2.1 流应用程序的构建块2.2 分层的api2.3 库三. 操作3.1 7*24 不间断...
  • Flink技术分析

    2019-04-19 08:30:11
    flink技术分析,在这里,我们解释Flink架构的重要方面,Apache Flink是什么 ,Apache Flink有哪些方面的特性,对flink的全面剖析
  • flink-example:flink的联系项目

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 92,080
精华内容 36,832
关键字:

flink