flink 订阅
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1] 展开全文
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1]
信息
稳定版本
Apache软件基金会 [1]
类    型
1.8.0 [1]
中文名
弗林克
外文名
Flink [1]
flink开发
Apache Flink是由Apache软件基金会内的Apache Flink社区基于Apache许可证2.0开发的,该项目已有超过100位代码提交者和超过460贡献者。 [2]  是由Apache Flink的创始人创建的公司。目前,该公司已聘用了12个Apache Flink的代码提交者。 [3] 
收起全文
精华内容
下载资源
问答
  • Flink

    千次阅读 2019-08-14 19:43:41
    Flink 一、简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务: ​ ...

    Flink

    一、简介

    Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

    ​ DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。

    ​ DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。

    ​ Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

    此外,Flink还针对特定的应用领域提供了领域库,例如:

    ​ Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。

    ​ Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。

    二、优点

    Flink 是一个开源的分布式流式处理框架:

    ​ ①提供准确的结果,甚至在出现无序或者延迟加载的数据的情况下。

    ​ ②它是状态化的容错的,同时在维护一次完整的的应用状态时,能无缝修复错误。

    ​ ③大规模运行,在上千个节点运行时有很好的吞吐量和低延迟。

    Flink处理不同数据分类:

    ​ ①有界数据流–Flink批处理–DataSet–groupBy

    ​ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    ​ ②无界数据流–Flink流处理–DataStream–keyBy–流处理默认一条条数据处理

    ​ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ​ ③有/无界数据流–Flink SQL

    Flink & Storm & SparkStreaming 流处理框架的区别:

    ​ •Strom:纯实时处理数据,吞吐量小 --水龙头滴水

    ​ •SparkStreaming : 准实时处理数据,微批处理数据,吞吐量大 --河道中开闸关闸

    ​ •Flink:纯实时处理数据,吞吐量大 --河流远远不断

    三、注意点

    Flink程序的执行过程:
    	获取flink的执行环境(execution environment)
    	加载数据				-- soure
    	对加载的数据进行转换 		-- transformation
    	对结果进行保存或者打印	    -- sink
    	触发flink程序的执行(execute(),count(),collect(),print()),例如:调用						ExecutionEnvironment或者StreamExecutionEnvironment的execute()方法。
    	
    
    
    1.创建环境
          批处理:ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
          流处理:StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
          
    2.在批处理中Flink处理的数据对象是DataSet
        在流处理中Flink处理的数据对象时DataStream
    
    3.代码流程必须符合 source ->transformation -> sink
          transformation 都是懒执行,需要最后使用env.execute()触发执行或者使用 print() 					,count(),collect() 触发执行
    
    4.Flink编程不是基于K,V格式的编程,通过某些方式来指定虚拟key
    
    5.Flink中的tuple最多支持25个元素,每个元素是从0开始
    

    四、WordCount

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.operators.Order;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.operators.*;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.FileSystem;
    import org.apache.flink.util.Collector;
    
    public class WordCount {
        public static void main(String[] args) throws Exception{
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSource<String> dataSource = env.readTextFile("./data/words");
            FlatMapOperator<String, String> words = dataSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> collector) throws Exception {
                    String[] split = line.split(" ");
                    for(String word : split){
                        collector.collect(word);
                    }
                }
            });
    
            MapOperator<String, Tuple2<String, Integer>> pairWords = words.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String word) throws Exception {
                    return new Tuple2<>(word, 1);
                }
            });
    
            UnsortedGrouping<Tuple2<String, Integer>> groupBy = pairWords.groupBy(0);
            AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
            SortPartitionOperator<Tuple2<String, Integer>> sortResult = result.sortPartition(1, Order.DESCENDING);
    
            sortResult.writeAsCsv("./data/result/r2","\n","&", FileSystem.WriteMode.OVERWRITE);
            env.execute("myflink");
            result.print();
        }
    }
    

    五、排序

    在Flink中,排序是默认在各个分区中进行的排序,所以最终结果在默认情况下是不能进行排序展示的

    这时, ①可以给相关的算子进行设计–sum().setParallelism(1)

    AggregateOperator<Tuple2<String, Integer>> result = groupBy.sum(1).setParallelism(1);
    

    ​ ②全局设计,但是全局设计限制分区数,影响框架运行速度,通常不建议–env. setParallelism(1)

    六、source数据源和Sink源

    1 数据源Source

    ​ Source 是Flink 获取数据的地方。以下source 中和批处理的source 类似,但是以下
    源作为dataStream 流处理时,是一条条处理,最终得到的不是一个总结果,而是每
    次处理后都会得到一个结果。
    ​ 1).socketTextStream – 读取Socket 数据流
    ​ 2). readTextFile() – 逐行读取文本文件获取数据流,每行都返回字符串。
    ​ 3).fromCollection() – 从集合中创建数据流。
    ​ 4).fromElements – 从给定的数据对象创建数据流,所有数据类型要一致。
    ​ 5).addSource – 添加新的源函数,例如从kafka 中读取数据,参见读取kafka 数据案例。

    2 数据写出Sink

    ​ 1).writeAsText() – 以字符串的形式逐行写入文件,调用每个元素的toString()得到写
    入的字符串。
    ​ 2).writeAsCsv() – 将元组写出以逗号分隔的csv 文件。注意:只能作用到元组数据上。
    ​ 3).print() – 控制台直接输出结果,调用对象的toString()方法得到输出结果。
    ​ 4).addSink() – 自定义接收函数。例如将结果保存到kafka 中,参见kafka 案例。

    七、累加器&计数器

    Flink 累加器

    ​ Accumulator即累加器,可以在分布式统计数据,只有在任务结束之后才能获取累加器的最终结果。计数器是累加器的具体实现,有:IntCounter,LongCounter和DoubleCounter。

    累加器注意事项

    ​ 1.需要在算子内部创建累加器对象

    ​ 2.通常在Rich函数中的open方法中注册累加器,指定累加器的名称

    ​ 3.在当前算子内任意位置可以使用累加器

    ​ 4.必须当任务执行结束后,通过env.execute(xxx)执行后的JobExecutionResult对象获取累加器的值。

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    DataSource<String> dataSource = env.fromElements("a", "b", "c", "d", "e", "f");
    MapOperator<String, String> map = dataSource.map(new RichMapFunction<String, String>(){
    
    //1.创建累加器,在算子中创建累加器对象
    	private IntCounter numLines = new IntCounter();
    
    //2.注册累加器对象,通常在Rich 函数的open 方法中使用
    // getRuntimeContext().addAccumulator("num-lines", this.numLines);注册累加器
        public void open(Configuration parameters) throws Exception {
            getRuntimeContext().addAccumulator("num-lines", this.numLines);
    	}
    
        @Override
        public String map(String s) throws Exception {
        //3.使用累加器,可以在任意操作中使用,包括在open 或者close 方法中
            this.numLines.add(1);
            return s;
        }
        }).setParallelism(8);
        map.writeAsText("./TempResult/result",FileSystem.WriteMode.OVERWRITE);
        JobExecutionResult myJobExecutionResult = env.execute("IntCounterTest");
        
        //4.当作业执行完成之后,在JobExecutionResult 对象中获取累加器的值。
            int accumulatorResult = myJobExecutionResult.getAccumulatorResult("num-lines");
            System.out.println("accumulator value = "+accumulatorResult);
    

    八、指定虚拟Key

    Apache Flink 指定虚拟key

    ​ 1.使用Tuples来指定key

    ​ 2.使用Field Expression来指定key

    ​ 3.使用Key Selector Functions来指定key

    ​ 操作(reduce,groupReduce,Aggregate,Windows)允许数据在处理之前根据key 进行分组。
    在Flink 中数据模型不是基于Key,Value 格式处理的,因此不需将数据处理成键值对的格
    式,key 是“虚拟的”,可以人为的来指定,实际数据处理过程中根据指定的key 来对数
    据进行分组,DataSet 中使用groupBy 来指定key,DataStream 中使用keyBy 来指定key。
    如何指定keys?

    1. 使用Tuples 来指定key
      定义元组来指定key 可以指定tuple 中的第几个元素当做key,或者指定tuple 中的
      联合元素当做key。需要使用org.apache.flink.api.java.tuple.TupleXX 包下的tuple,最多
      支持25 个元素且Tuple 必须new 创建。如果Tuple 是嵌套的格式, 例如:
      DataStream<Tuple3<Tuple2<Integer, Float>,String,Long>> ds,如果指定keyBy(0)则会使
      用整个内部的Tuple2 作为key。如果想要使用内部Tuple2 中的Float 格式当做key,
      就要使用第二种方式Field Expression 来指定key。
    2. 使用Field Expression 来指定key
      可以使用Field Expression 来指定key,一般作用的对象可以是类对象,或者嵌套的
      Tuple 格式的数据。
      使用注意点:
      a) 对于类对象可以使用类中的字段来指定key。
      类对象定义需要注意:
      i. 类的访问级别必须是public
      ii. 必须写出默认的空的构造函数
      iii. 类中所有的字段必须是public 的或者必须有getter,setter 方法。例如类
      中有个字段是foo,那么这个字段的getter,setter 方法为:getFoo() 和
      setFoo().
      iv. Flink 必须支持字段的类型。一般类型都支持
      b) 对于嵌套的Tuple 类型的Tuple 数据可以使用”xx.f0”表示嵌套tuple 中第一个元
      素, 也可以直接使用”xx.0” 来表示第一个元素, 参照案例
      GroupByUseFieldExpressions。
    3. 使用Key Selector Functions 来指定key
      使用key Selector 这种方式选择key,非常方便,可以从数据类型中指定想要的key.

    九、Flink & Kafka

    IDEA代码—MyFlinkCode

    1 Flink消费kafka数据起始offset配置

    ​ \1.flinkKafkaConsumer.setStartFromEarliest()

    从topic的最早offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \2. flinkKafkaConsumer.setStartFromLatest()

    从topic的最新offset位置开始处理数据,如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \3. flinkKafkaConsumer.setStartFromTimestamp(…)

    从指定的时间戳(毫秒)开始消费数据,Kafka中每个分区中数据大于等于设置的时间戳的数据位置将被当做开始消费的位置。如果kafka中保存有消费者组的消费位置将被忽略。

    ​ \4. flinkKafkaConsumer.setStartFromGroupOffsets()

    默认的设置。根据代码中设置的group.id设置的消费者组,去kafka中或者zookeeper中找到对应的消费者offset位置消费数据。如果没有找到对应的消费者组的位置,那么将按照auto.offset.reset设置的策略读取offset。

    2 Flink消费kafka数据,消费者offset提交配置

    ​ 1.Flink提供了消费kafka数据的offset如何提交给Kafka或者zookeeper(kafka0.8之前)的配置。注意,Flink并不依赖提交给Kafka或者zookeeper中的offset来保证容错。提交的offset只是为了外部来查询监视kafka数据消费的情况

    ​ 2.配置offset的提交方式取决于是否为job设置开启checkpoint。可以使用env.enableCheckpointing(5000)来设置开启checkpoint。

    3.关闭checkpoint:

    ​ 如何禁用了checkpoint,那么offset位置的提交取决于Flink读取kafka客户端的配置,enable.auto.commit ( auto.commit.enable【Kafka 0.8】)配置是否开启自动提交offset, auto.commit.interval.ms决定自动提交offset的周期。

    4.开启checkpoint:

    ​ 如果开启了checkpoint,那么当checkpoint保存状态完成后,将checkpoint中保存的offset位置提交到kafka。这样保证了Kafka中保存的offset和checkpoint中保存的offset一致,可以通过配置setCommitOffsetsOnCheckpoints(boolean)来配置是否将checkpoint中的offset提交到kafka中(默认是true)。如果使用这种方式,那么properties中配置的kafka offset自动提交参数enable.auto.commit和周期提交参数auto.commit.interval.ms参数将被忽略。

    3 Flink中外部状态实现两阶段提交

    ​ Flink外部状态实现两阶段提交将逻辑封装到TwoPhaseComitSinkFunction类中,需要扩展TwoPhaseCommitSinkFunction来实现仅一次消费数据。若要实现支持exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    exactly-once语义的自定义sink,需要实现以下4个方法:

    \1. beginTransaction:开启一个事务,创建一个临时文件,将数据写入到临时文件中

    \2. preCommit:在pre-commit阶段,flush缓存数据到磁盘,然后关闭这个文件,确保不会 有新的数据写入到这个文件,同时开启一个新事务执行属于下一个checkpoint的写入 操作。

    \3. commit:在commit阶段,我们以原子性的方式将上一阶段的文件写入真正的文件目 录下。【注意:数据有延时,不是实时的】。

    \4. abort:一旦异常终止事务,程序如何处理。这里要清除临时文件。

    在这里插入图片描述

    展开全文
  • Flink 最锋利的武器:Flink SQL 入门和实战

    万次阅读 多人点赞 2019-06-21 00:00:00
    一、Flink SQL 背景Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。自 2015...

     

    《2021年最新版大数据面试题全面开启更新》

    《2021年最新版大数据面试题全面开启更新》

     

    一、Flink SQL 背景

    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。

    自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对 Flink 存在的不足进行优化和改进,并且在 2019 年初将最终代码开源,也就是我们熟知的 Blink。Blink 在原来的 Flink 基础上最显著的一个贡献就是 Flink SQL 的实现。

    Flink SQL 是面向用户的 API 层,在我们传统的流式计算领域,比如 Storm、Spark Streaming 都会提供一些 Function 或者 Datastream API,用户通过 Java 或 Scala 写业务逻辑,这种方式虽然灵活,但有一些不足,比如具备一定门槛且调优较难,随着版本的不断更新,API 也出现了很多不兼容的地方。

    640?wx_fmt=png

    在这个背景下,毫无疑问,SQL 就成了我们最佳选择,之所以选择将 SQL 作为核心 API,是因为其具有几个非常重要的特点:

    • SQL 属于设定式语言,用户只要表达清楚需求即可,不需要了解具体做法;

    • SQL 可优化,内置多种查询优化器,这些查询优化器可为 SQL 翻译出最优执行计划;

    • SQL 易于理解,不同行业和领域的人都懂,学习成本较低;

    • SQL 非常稳定,在数据库 30 多年的历史中,SQL 本身变化较少;

    • 流与批的统一,Flink 底层 Runtime 本身就是一个流与批统一的引擎,而 SQL 可以做到 API 层的流与批统一。

    二、Flink 的最新特性(1.7.0 和 1.8.0 更新)

    2.1 Flink 1.7.0 新特性

    在 Flink 1.7.0 中,我们更接近实现快速数据处理和以无缝方式为 Flink 社区构建数据密集型应用程序的目标。最新版本包括一些新功能和改进,例如对 Scala 2.12 的支持、一次性 S3 文件接收器、复杂事件处理与流 SQL 的集成等。

    Apache Flink 中对 Scala 2.12 的支持(FLINK-7811)

    Apache Flink 1.7.0 是第一个完全支持 Scala 2.12 的版本。这允许用户使用较新的 Scala 版本编写 Flink 应用程序并利用 Scala 2.12 生态系统。

    状态演进(FLINK-9376)

    许多情况下,由于需求的变化,长期运行的 Flink 应用程序需要在其生命周期内发展。在不失去当前应用程序进度状态的情况下更改用户状态是应用程序发展的关键要求。使用 Flink 1.7.0,社区添加了状态演变,允许您灵活地调整长时间运行的应用程序的用户状态模式,同时保持与以前保存点的兼容性。通过状态演变,可以在状态模式中添加或删除列,以便更改应用程序部署后应用程序捕获的业务功能。现在,使用 Avro 生成时,状态模式演变现在可以立即使用作为用户状态的类,这意味着可以根据 Avro 的规范来演变国家的架构。虽然 Avro 类型是 Flink 1.7 中唯一支持模式演变的内置类型,但社区仍在继续致力于在未来的 Flink 版本中进一步扩展对其他类型的支持。

    MATCH RECOGNIZE Streaming SQL 支持(FLINK-6935)

    这是 Apache Flink 1.7.0 的一个重要补充,它为 Flink SQL 提供了 MATCH RECOGNIZE 标准的初始支持。此功能结合了复杂事件处理(CEP)和 SQL,可以轻松地对数据流进行模式匹配,从而实现一整套新的用例。此功能目前处于测试阶段,因此我们欢迎社区提供任何反馈和建议。

    流式 SQL 中的时态表和时间连接(FLINK-9712)

    时态表是 Apache Flink 中的一个新概念,它为表的更改历史提供(参数化)视图,并在特定时间点返回表的内容。例如,我们可以使用具有历史货币汇率的表格。随着时间的推移,这种表格不断增长/发展,并且增加了新的更新汇率。时态表是一种视图,可以将这些汇率的实际状态返回到任何给定的时间点。使用这样的表,可以使用正确的汇率将不同货币的订单流转换为通用货币。时间联接允许使用不断变化/更新的表来进行内存和计算有效的流数据连接。

    Streaming SQL 的其他功能

    除了上面提到的主要功能外,Flink 的 Table&SQL API 已经扩展到更多用例。以下内置函数被添加到 API:TO_BASE64、LOG2、LTRIM、REPEAT、REPLACE、COSH、SINH、TANH SQL Client 现在支持在环境文件和 CLI 会话中定义视图。此外,CLI 中添加了基本的 SQL 语句自动完成功能。社区添加了一个 Elasticsearch 6 表接收器,允许存储动态表的更新结果。

    Kafka 2.0 连接器(FLINK-10598)

    Apache Flink 1.7.0 继续添加更多连接器,使其更容易与更多外部系统进行交互。在此版本中,社区添加了 Kafka 2.0 连接器,该连接器允许通过一次性保证读取和写入 Kafka 2.0。

    本地恢复(FLINK-9635)

    Apache Flink 1.7.0 通过扩展 Flink 的调度来完成本地恢复功能,以便在恢复时考虑以前的部署位置。如果启用了本地恢复,Flink 将保留最新检查点的本地副本任务运行的机器。通过将任务调度到以前的位置,Flink 将通过从本地磁盘读取检查点状态来最小化恢复状态的网络流量。此功能大大提高了恢复速度。

    2.2 Flink 1.8.0 新特性

    Flink 1.8.0 引入对状态的清理

    使用 TTL(生存时间)连续增量清除旧的 Key 状态 Flink 1.8 引入了对 RocksDB 状态后端(FLINK-10471)和堆状态后端(FLINK-10473)的旧数据的连续清理。这意味着旧的数据将(根据 TTL 设置)不断被清理掉。

    新增和删除一些 Table API

    1) 引入新的 CSV 格式符(FLINK-9964)

    此版本为符合 RFC4180 的 CSV 文件引入了新的格式符。新描述符可以使用 org.apache.flink.table.descriptors.Csv。目前,只能与 Kafka 一起使用。旧描述符 org.apache.flink.table.descriptors.OldCsv 用于文件系统连接器。

    2) 静态生成器方法在 TableEnvironment(FLINK-11445)上的弃用

    为了将 API 与实际实现分开,TableEnvironment.getTableEnvironment() 不推荐使用静态方法。现在推荐使用 Batch/StreamTableEnvironment.create()。

    3) 表 API Maven 模块中的更改(FLINK-11064)

    之前具有 flink-table 依赖关系的用户需要更新其依赖关系 flink-table-planner,以及正确的依赖关系 flink-table-api-*,具体取决于是使用 Java 还是 Scala: flink-table-api-java-bridge 或者 flink-table-api-scala-bridge。

    Kafka Connector 的修改

    引入可直接访问 ConsumerRecord 的新 KafkaDeserializationSchema(FLINK-8354),对于 FlinkKafkaConsumers 推出了一个新的 KafkaDeserializationSchema,可以直接访问 KafkaConsumerRecord。

    三、Flink SQL 的编程模型

    Flink 的编程模型基础构建模块是流(streams)与转换 (transformations),每一个数据流起始于一个或多个 source,并终止于一个或多个 sink。

    640?wx_fmt=png

    相信大家对上面的图已经十分熟悉了,当然基于 Flink SQL 编写的 Flink 程序也离不开读取原始数据,计算逻辑和写入计算结果数据三部分。

    一个完整的 Flink SQL 编写的程序包括如下三部分:

    • Source Operator:Soruce operator 是对外部数据源的抽象, 目前 Apache Flink 内置了很多常用的数据源实现例如 MySQL、Kafka 等;

    • Transformation Operators:算子操作主要完成例如查询、聚合操作等,目前 Flink SQL 支持了 Union、Join、Projection、Difference、Intersection 及 window 等大多数传统数据库支持的操作;

    • Sink Operator:Sink operator 是对外结果表的抽象,目前 Apache Flink 也内置了很多常用的结果表的抽象,比如 Kafka Sink 等

    我们通过用一个最经典的 WordCount 程序作为入门,看一下传统的基于 DataSet/DataStream API 开发和基于 SQL 开发有哪些不同?

    • DataStream/DataSetAPI

    
     
    • Flink SQL

    //省略掉初始化环境等公共代码
    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    我们已经可以直观体会到,SQL 开发的快捷和便利性了。

    四、Flink SQL 的语法和算子

    4.1 Flink SQL 支持的语法

    Flink SQL 核心算子的语义设计参考了 1992、2011 等 ANSI-SQL 标准,Flink 使用 Apache Calcite 解析 SQL ,Calcite 支持标准的 ANSI SQL。

    那么 Flink 自身支持的 SQL 语法有哪些呢?

    insert:
    INSERT INTO tableReference
    query
    
    query:
    values
      | {
    select
          | selectWithoutFrom
          | query UNION [ ALL ] query
          | query EXCEPT query
          | query INTERSECT query
        }
        [ ORDER BY orderItem [, orderItem ]* ]
        [ LIMIT { count | ALL } ]
        [ OFFSET start { ROW | ROWS } ]
        [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]
    
    orderItem:
      expression [ ASC | DESC ]
    
    select:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    FROM tableExpression
      [ WHERE booleanExpression ]
      [ GROUP BY { groupItem [, groupItem ]* } ]
      [ HAVING booleanExpression ]
      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]
    
    selectWithoutFrom:
    SELECT [ ALL | DISTINCT ]
      { * | projectItem [, projectItem ]* }
    
    projectItem:
      expression [ [ AS ] columnAlias ]
      | tableAlias . *
    
    tableExpression:
      tableReference [, tableReference ]*
      | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]
    
    joinCondition:
    ON booleanExpression
      | USING '(' column [, column ]* ')'
    
    tableReference:
      tablePrimary
      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]
    
    tablePrimary:
      [ TABLE ] [ [ catalogName . ] schemaName . ] tableName
      | LATERAL TABLE '(' functionName '(' expression [, expression ]* ')' ')'
      | UNNEST '(' expression ')'
    
    values:
    VALUES expression [, expression ]*
    
    groupItem:
      expression
      | '(' ')'
      | '(' expression [, expression ]* ')'
      | CUBE '(' expression [, expression ]* ')'
      | ROLLUP '(' expression [, expression ]* ')'
      | GROUPING SETS '(' groupItem [, groupItem ]* ')'
    
    windowRef:
        windowName
      | windowSpec
    
    windowSpec:
        [ windowName ]
    '('
        [ ORDER BY orderItem [, orderItem ]* ]
        [ PARTITION BY expression [, expression ]* ]
        [
    RANGE numericOrIntervalExpression {PRECEDING}
          | ROWS numericExpression {PRECEDING}
        ]
    ')'
    

    上面 SQL 的语法支持也已经表明了 Flink SQL 对算子的支持,接下来我们对 Flink SQL 中最常见的算子语义进行介绍。

    4.2 Flink SQL 常用算子

    SELECT

    SELECT 用于从 DataSet/DataStream 中选择数据,用于筛选出某些列。

    示例:

    SELECT * FROM Table;// 取出表中的所有列
    SELECT name,age FROM Table;// 取出表中 name 和 age 两列
    

    与此同时 SELECT 语句中可以使用函数和别名,例如我们上面提到的 WordCount 中:

    SELECT word, COUNT(word) FROM table GROUP BY word;
    

    WHERE

    WHERE 用于从数据集/流中过滤数据,与 SELECT 一起使用,用于根据某些条件对关系做水平分割,即选择符合条件的记录。

    示例:

    SELECT name,age FROM Table where name LIKE ‘% 小明 %’;
    SELECT * FROM Table WHERE age = 20

    WHERE 是从原数据中进行过滤,那么在 WHERE 条件中,Flink SQL 同样支持 =、<、>、<>、>=、<=,以及 AND、OR 等表达式的组合,最终满足过滤条件的数据会被选择出来。并且 WHERE 可以结合 IN、NOT IN 联合使用。举个负责的例子:

    SELECT name, age
    FROM Table
    WHERE name IN (SELECT name FROM Table2)
    

    DISTINCT

    DISTINCT 用于从数据集/流中去重根据 SELECT 的结果进行去重。

    示例:

    SELECT DISTINCT name FROM Table;
    

    对于流式查询,计算查询结果所需的 State 可能会无限增长,用户需要自己控制查询的状态范围,以防止状态过大。

    GROUP BY

    GROUP BY 是对数据进行分组操作。例如我们需要计算成绩明细表中,每个学生的总分。

    SELECT name, SUM(score) as TotalScore FROM Table GROUP BY name;
    

    UNION 和 UNION ALL

    UNION 用于将两个结果集合并起来,要求两个结果集字段完全一致,包括字段类型、字段顺序。不同于 UNION ALL 的是,UNION 会对结果数据去重。

    示例:

    SELECT * FROM T1 UNION (ALL) SELECT * FROM T2;
    

    JOIN

    JOIN 用于把来自两个表的数据联合起来形成结果表,Flink 支持的 JOIN 类型包括:

    • JOIN - INNER JOIN

    • LEFT JOIN - LEFT OUTER JOIN

    • RIGHT JOIN - RIGHT OUTER JOIN

    • FULL JOIN - FULL OUTER JOIN

    这里的 JOIN 的语义和我们在关系型数据库中使用的 JOIN 语义一致。

    示例:

    JOIN(将订单表数据和商品表进行关联)
    SELECT * FROM Orders INNER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    LEFT JOIN 与 JOIN 的区别是当右表没有与左边相 JOIN 的数据时候,右边对应的字段补 NULL 输出,RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN 相当于 RIGHT JOIN 和 LEFT JOIN 之后进行 UNION ALL 操作。

    示例:

    SELECT *
    FROM Orders LEFT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders RIGHT JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    
    SELECT *
    FROM Orders FULL OUTER JOIN Product ON Orders.productId = [Product.id](http://product.id/)
    

    Group Window

    根据窗口数据划分的不同,目前 Apache Flink 有如下 3 种 Bounded Window:

    • Tumble,滚动窗口,窗口数据有固定的大小,窗口数据无叠加;

    • Hop,滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;

    • Session,会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。

    Tumble Window

    Tumble 滚动窗口有固定大小,窗口数据不重叠,具体语义如下:

    640?wx_fmt=png

    Tumble 滚动窗口对应的语法如下:

    SELECT 
        [gk],
        [TUMBLE_START(timeCol, size)], 
        [TUMBLE_END(timeCol, size)], 
        agg1(col1), 
        ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], TUMBLE(timeCol, size)
    

    其中:

    • [gk] 决定了是否需要按照字段进行聚合;

    • TUMBLE_START 代表窗口开始时间;

    • TUMBLE_END 代表窗口结束时间;

    • timeCol 是流表中表示时间字段;

    • size 表示窗口的大小,如 秒、分钟、小时、天。

    举个例子,假如我们要计算每个人每天的订单量,按照 user 进行聚合分组:

    SELECT user, TUMBLE_START(rowtime, INTERVAL1DAY) as wStart, SUM(amount) FROM Orders GROUP BY TUMBLE(rowtime, INTERVAL1DAY), user;
    

    Hop Window

    Hop 滑动窗口和滚动窗口类似,窗口有固定的 size,与滚动窗口不同的是滑动窗口可以通过 slide 参数控制滑动窗口的新建频率。因此当 slide 值小于窗口 size 的值的时候多个滑动窗口会重叠,具体语义如下:

    640?wx_fmt=png

    Hop 滑动窗口对应语法如下:

    SELECT 
        [gk], 
        [HOP_START(timeCol, slide, size)] ,  
        [HOP_END(timeCol, slide, size)],
        agg1(col1), 
        ... 
        aggN(colN) 
    FROM Tab1
    GROUP BY [gk], HOP(timeCol, slide, size)
    

    每次字段的意思和 Tumble 窗口类似:

    • [gk] 决定了是否需要按照字段进行聚合;

    • HOP_START 表示窗口开始时间;

    • HOP_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • slide 表示每次窗口滑动的大小;

    • size 表示整个窗口的大小,如 秒、分钟、小时、天。

    举例说明,我们要每过一小时计算一次过去 24 小时内每个商品的销量:

    SELECT product, SUM(amount) FROM Orders GROUP BY HOP(rowtime, INTERVAL '1' HOUR, INTERVAL '1' DAY), product
    

    Session Window

    会话时间窗口没有固定的持续时间,但它们的界限由 interval 不活动时间定义,即如果在定义的间隙期间没有出现事件,则会话窗口关闭。

    640?wx_fmt=png

    Seeeion 会话窗口对应语法如下:

    SELECT 
        [gk], 
        SESSION_START(timeCol, gap) AS winStart,  
        SESSION_END(timeCol, gap) AS winEnd,
        agg1(col1),
         ... 
        aggn(colN)
    FROM Tab1
    GROUP BY [gk], SESSION(timeCol, gap)
    
    • [gk] 决定了是否需要按照字段进行聚合;

    • SESSION_START 表示窗口开始时间;

    • SESSION_END 表示窗口结束时间;

    • timeCol 表示流表中表示时间字段;

    • gap 表示窗口数据非活跃周期的时长。

    例如,我们需要计算每个用户访问时间 12 小时内的订单量:

    SELECT user, SESSION_START(rowtime, INTERVAL12HOUR) AS sStart, SESSION_ROWTIME(rowtime, INTERVAL12HOUR) AS sEnd, SUM(amount) FROM Orders GROUP BY SESSION(rowtime, INTERVAL12HOUR), user
    

    五、Flink SQL 的内置函数

    Flink 提供大量的内置函数供我们直接使用,我们常用的内置函数分类如下:

    • 比较函数

    • 逻辑函数

    • 算术函数

    • 字符串处理函数

    • 时间函数

       

    我们接下来对每种函数举例进行讲解。

    5.1 比较函数

    比较函数 描述
    value1=value2 如果 value1 等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1<>value2 如果 value1 不等于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1>value2 如果 value1 大于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value1 < value2 如果 value1 小于 value2,则返回 TRUE ; 如果 value1 或 value2 为 NULL,则返回 UNKNOWN
    value IS NULL 如果 value 为 NULL,则返回 TRUE
    value IS NOT NULL 如果 value 不为 NULL,则返回 TRUE
    string1 LIKE string2 如果 string1 匹配模式 string2,则返回 TRUE ; 如果 string1 或 string2 为 NULL,则返回 UNKNOWN
    value1 IN (value2, value3…) 如果给定列表中存在 value1 (value2,value3,…),则返回 TRUE 。当(value2,value3,…)包含 NULL,如果可以找到该数据元则返回 TRUE,否则返回 UNKNOWN。如果 value1 为 NULL,则始终返回 UNKNOWN

    5.2 逻辑函数

    逻辑函数 描述
    A OR B 如果 A 为 TRUE 或 B 为 TRUE,则返回 TRUE
    A AND B 如果 A 和 B 都为 TRUE,则返回 TRUE
    NOT boolean 如果 boolean 为 FALSE,则返回 TRUE,否则返回 TRUE。如果 boolean 为 TRUE,则返回 FALSE
    A IS TRUE 或 FALSE 判断 A 是否为真

    5.3 算术函数

    算术函数 描述
    numeric1 ±*/ numeric2 分别代表两个数值加减乘除
    ABS(numeric) 返回 numeric 的绝对值
    POWER(numeric1, numeric2) 返回 numeric1 上升到 numeric2 的幂

    除了上述表中的函数,Flink SQL 还支持种类丰富的函数计算。

    5.4 字符串处理函数

    字符串函数 描述
    UPPER/LOWER 以大写 / 小写形式返回字符串
    LTRIM(string) 返回一个字符串,从去除左空格的字符串, 类似还有 RTRIM
    CONCAT(string1, string2,…) 返回连接 string1,string2,…的字符串

    5.5 时间函数

    时间函数 描述
    DATE string 返回以“yyyy-MM-dd”形式从字符串解析的 SQL 日期
    TIMESTAMP string 返回以字符串形式解析的 SQL 时间戳,格式为“yyyy-MM-dd HH:mm:ss [.SSS]”
    CURRENT_DATE 返回 UTC 时区中的当前 SQL 日期
    DATE_FORMAT(timestamp, string) 返回使用指定格式字符串格式化时间戳的字符串

    六、Flink SQL 实战应用

    上面我们分别介绍了 Flink SQL 的背景、新特性、编程模型和常用算子,这部分我们将模拟一个真实的案例为大家使用 Flink SQL 提供一个完整的 Demo。

    相信这里应该有很多 NBA 的球迷,假设我们有一份数据记录了每个赛季的得分王的数据,包括赛季、球员、出场、首发、时间、助攻、抢断、盖帽、得分等。现在我们要统计获得得分王荣誉最多的三名球员。

    原数据存在 score.csv 文件中,如下:

    17-18,詹姆斯-哈登,72,72,35.4,8.8,1.8,0.7,30.4
    16-17,拉塞尔-威斯布鲁克,81,81,34.6,10.4,1.6,0.4,31.6
    15-16,斯蒂芬-库里,79,79,34.2,6.7,2.1,0.2,30.1
    14-15,拉塞尔-威斯布鲁克,67,67,34.4,8.6,2.1,0.2,28.1
    13-14,凯文-杜兰特,81,81,38.5,5.5,1.3,0.7,32
    12-13,卡梅罗-安东尼,67,67,37,2.6,0.8,0.5,28.7
    11-12,凯文-杜兰特,66,66,38.6,3.5,1.3,1.2,28
    10-11,凯文-杜兰特,78,78,38.9,2.7,1.1,1,27.7
    09-10,凯文-杜兰特,82,82,39.5,2.8,1.4,1,30.1
    08-09,德维恩-韦德,79,79,38.6,7.5,2.2,1.3,30.2
    07-08,勒布朗-詹姆斯,75,74,40.4,7.2,1.8,1.1,30
    06-07,科比-布莱恩特,77,77,40.8,5.4,1.4,0.5,31.6
    05-06,科比-布莱恩特,80,80,41,4.5,1.8,0.4,35.4
    04-05,阿伦-艾弗森,75,75,42.3,7.9,2.4,0.1,30.7
    03-04,特雷西·麦克格雷迪,67,67,39.9,5.5,1.4,0.6,28
    02-03,特雷西·麦克格雷迪,75,74,39.4,5.5,1.7,0.8,32.1
    01-02,阿伦-艾弗森,60,59,43.7,5.5,2.8,0.2,31.4
    00-01,阿伦-艾弗森,71,71,42,4.6,2.5,0.3,31.1
    99-00,沙奎尔-奥尼尔,79,79,40,3.8,0.5,3,29.7
    98-99,阿伦-艾弗森,48,48,41.5,4.6,2.3,0.1,26.8
    97-98,迈克尔-乔丹,82,82,38.8,3.5,1.7,0.5,28.7
    96-97,迈克尔-乔丹,82,82,37.9,4.3,1.7,0.5,29.6
    95-96,迈克尔-乔丹,82,82,37.7,4.3,2.2,0.5,30.4
    94-95,沙奎尔-奥尼尔,79,79,37,2.7,0.9,2.4,29.3
    93-94,大卫-罗宾逊,80,80,40.5,4.8,1.7,3.3,29.8
    92-93,迈克尔-乔丹,78,78,39.3,5.5,2.8,0.8,32.6
    91-92,迈克尔-乔丹,80,80,38.8,6.1,2.3,0.9,30.1
    90-91,迈克尔-乔丹,82,82,37,5.5,2.7,1,31.5
    89-90,迈克尔-乔丹,82,82,39,6.3,2.8,0.7,33.6
    88-89,迈克尔-乔丹,81,81,40.2,8,2.9,0.8,32.5
    87-88,迈克尔-乔丹,82,82,40.4,5.9,3.2,1.6,35
    86-87,迈克尔-乔丹,82,82,40,4.6,2.9,1.5,37.1
    85-86,多米尼克-威尔金斯,78,78,39.1,2.6,1.8,0.6,30.3
    84-85,伯纳德-金,55,55,37.5,3.7,1.3,0.3,32.9
    83-84,阿德里安-丹特利,79,79,37.8,3.9,0.8,0.1,30.6
    82-83,阿历克斯-英格利什,82,82,36.4,4.8,1.4,1.5,28.4
    81-82,乔治-格文,79,79,35.7,2.4,1,0.6,32.3
    

    首先我们需要创建一个工程,并且在 Maven 中有如下依赖:

    <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.7.1</flink.version>
    <slf4j.version>1.7.7</slf4j.version>
    <log4j.version>1.2.17</log4j.version>
    <scala.binary.version>2.11</scala.binary.version>
    </properties>
    <dependencies>
    <!-- Apache Flink dependencies -->
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table_2.11</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>1.7.1</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>${slf4j.version}</version>
    </dependency>
    <dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>${log4j.version}</version>
    </dependency>
    

    第一步,创建上下文环境:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    

    第二步,读取 score.csv 并且作为 source 输入:

     DataSet<String> input = env.readTextFile("score.csv");
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    其中的PlayerData类为自定义类:
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    

    第三步,将 source 数据注册成表:

    Table topScore = tableEnv.fromDataSet(topInput);
    tableEnv.registerTable("score", topScore);
    

    第四步,核心处理逻辑 SQL 的编写:

    Table queryResult = tableEnv.sqlQuery("
    select player, 
    count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
    ");
    

    第五步,输出结果:

    DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
    result.print();
    

    我们直接运行整个程序,观察输出结果:

    ...
    16:28:06,162 INFO  org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint    - Shut down complete.
    16:28:06,162 INFO  org.apache.flink.runtime.taskexecutor.JobLeaderService        - Stop job leader service.
    16:28:06,164 INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor            - Stopped TaskExecutor akka://flink/user/taskmanager_2.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down remote daemon.
    16:28:06,166 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon shut down; proceeding with flushing remote transports.
    16:28:06,169 INFO  akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut down.
    16:28:06,177 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka RPC service.
    16:28:06,187 INFO  org.apache.flink.runtime.blob.PermanentBlobCache              - Shutting down BLOB cache
    16:28:06,187 INFO  org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down BLOB cache
    16:28:06,188 INFO  org.apache.flink.runtime.blob.BlobServer                      - Stopped BLOB server at 0.0.0.0:51703
    16:28:06,188 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopped Akka RPC service.
    迈克尔-乔丹:10
    凯文-杜兰特:4
    阿伦-艾弗森:4
    

    我们看到控制台已经输出结果了:

    640?wx_fmt=jpeg

    完整的代码如下:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    
    public class TableSQL {
    
    public static void main(String[] args) throws Exception{
    
    //1\. 获取上下文环境 table的环境
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
    
    //2\. 读取score.csv
            DataSet<String> input = env.readTextFile("score.csv");
            input.print();
    
            DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
    @Override
    public PlayerData map(String s) throws Exception {
                    String[] split = s.split(",");
    
    return new PlayerData(String.valueOf(split[0]),
                            String.valueOf(split[1]),
                            String.valueOf(split[2]),
                            Integer.valueOf(split[3]),
                            Double.valueOf(split[4]),
                            Double.valueOf(split[5]),
                            Double.valueOf(split[6]),
                            Double.valueOf(split[7]),
                            Double.valueOf(split[8])
                    );
                }
            });
    
    //3\. 注册成内存表
            Table topScore = tableEnv.fromDataSet(topInput);
            tableEnv.registerTable("score", topScore);
    
    //4\. 编写sql 然后提交执行
    //select player, count(season) as num from score group by player order by num desc;
            Table queryResult = tableEnv.sqlQuery("select player, count(season) as num from score group by player order by num desc limit 3");
    
    //5\. 结果进行打印
            DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
            result.print();
    
        }
    
    public static class PlayerData {
    /**
             * 赛季,球员,出场,首发,时间,助攻,抢断,盖帽,得分
             */
    public String season;
    public String player;
    public String play_num;
    public Integer first_court;
    public Double time;
    public Double assists;
    public Double steals;
    public Double blocks;
    public Double scores;
    
    public PlayerData() {
    super();
            }
    
    public PlayerData(String season,
                              String player,
                              String play_num,
                              Integer first_court,
                              Double time,
                              Double assists,
                              Double steals,
                              Double blocks,
                              Double scores
                              ) {
    this.season = season;
    this.player = player;
    this.play_num = play_num;
    this.first_court = first_court;
    this.time = time;
    this.assists = assists;
    this.steals = steals;
    this.blocks = blocks;
    this.scores = scores;
            }
        }
    
    public static class Result {
    public String player;
    public Long num;
    
    public Result() {
    super();
            }
    public Result(String player, Long num) {
    this.player = player;
    this.num = num;
            }
    @Override
    public String toString() {
    return player + ":" + num;
            }
        }
    }//
    

    当然我们也可以自定义一个 Sink,将结果输出到一个文件中,例如:

            TableSink sink = new CsvTableSink("/home/result.csv", ",");
    String[] fieldNames = {"name", "num"};
            TypeInformation[] fieldTypes = {Types.STRING, Types.INT};
            tableEnv.registerTableSink("result", fieldNames, fieldTypes, sink);
            sqlQuery.insertInto("result");
            env.execute();
    

    然后我们运行程序,可以看到 /home 目录下生成的 result.csv,查看结果:

    迈克尔-乔丹,10
    凯文-杜兰特,4
    阿伦-艾弗森,4
    

    七、总结

    本篇向大家介绍了 Flink SQL 产生的背景,Flink SQL 大部分核心功能,并且分别介绍了 Flink SQL 的编程模型和常用算子及内置函数。最后以一个完整的示例展示了如何编写 Flink SQL 程序。Flink SQL 的简便易用极大地降低了 Flink 编程的门槛,是我们必需掌握的使用 Flink 解决流式计算问题最锋利的武器!

    640?wx_fmt=gif

    640?wx_fmt=jpeg

    展开全文
  • flink集群安装部署 yarn集群模式 Flink入门及实战-上: http://edu.51cto.com/sd/07245 Flink入门及实战-下: http://edu.51cto.com/sd/5845e 快速开始 在yarn上启动一个一直运行的flink集群 在yarn上...

    flink集群安装部署

    yarn集群模式

     

    Flink入门及实战-上:

    http://edu.51cto.com/sd/07245

    Flink入门及实战-下:

    http://edu.51cto.com/sd/5845e

     

    • 快速开始
    1. 在yarn上启动一个一直运行的flink集群
    2. 在yarn上运行一个flink job
    • flink yarn session
    1. 启动flink session
    2. 提交任务到flink
    • 在yarn上运行一个独立的flink job
    1. 用户依赖jar包和classpath
    • flink on yarn的故障恢复
    • 调试一个失败的yarn session
    1. 日志文件
    2. yarn client控制台和web界面
    • 针对指定的hadoop版本构建yarn client
    • 在yarn上运行flink使用防火墙
    • flink on yarn 内部实现

    快速开始

    在yarn上启动一个一直运行的flink集群

    启动一个yarn session使用4个taskmanager(每个节点4GB内存)
    注意:如果自己的虚拟机没有这么大内存的话,可以吧-n设置小一点,对应的后面的内存-jm -tm也设置小一点,否则,如果内存不够用,会导致启动失败。

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096

    通过-s参数指定每一个taskmanager分配多少个slots(处理进程)。我们建议设置为每个机器的CPU核数。

    一旦session创建成功,你可以使用./bin/flink工具向集群提交任务。

    在yarn上运行一个flink job

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/
    ./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar

     

    flink yarn session

    yarn是一个集群资源管理框架。它运行在集群之上运行各种分布式应用程序。flink像其他程序一样,也可以在yarn上运行。用户不需要设置或者安装任何东西,如果已经有一个安装配置好的yarn。

    必须的依赖

     

    • 至少是hadoop2.2
    • hdfs(或者是其它支持hadoop的分布式文件系统)

    如果你在使用flink yarn client的时候有什么问题,可以到这里查找答案

    启动flink session

    按照下面的步骤来学习如何在yarn集群上启动一个flink session

    一个session将会包含所有必须的flink 服务(jobmanager和taskmanager),这样你就可以向这个集群提交程序了。注意:每个session会话你可以运行多个程序。

    下载flink

    下载hadoop2对应的flink安装包,点此下载。它包含了必须的文件。

    使用下面命令解压:

    tar xvzf flink-1.4.2-bin-hadoop2.tgz
    cd flink-1.4.2/

    启动一个session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    这个命令将会输出下面内容:

    用法:
       必选
         -n,--container <arg>   分配多少个yarn容器 (=taskmanager的数量)
       可选
         -D <arg>                        动态属性
         -d,--detached                   独立运行
         -jm,--jobManagerMemory <arg>    JobManager的内存 [in MB]
         -nm,--name                     在YARN上为一个自定义的应用设置一个名字
         -q,--query                      显示yarn中可用的资源 (内存, cpu核数)
         -qu,--queue <arg>               指定YARN队列.
         -s,--slots <arg>                每个TaskManager使用的slots数量
         -tm,--taskManagerMemory <arg>   每个TaskManager的内存 [in MB]
         -z,--zookeeperNamespace <arg>   针对HA模式在zookeeper上创建NameSpace

    请注意:client必须要设置YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量,通过这个环境变量来读取YARN和HDFS的配置信息,否则启动会失败。

    经试验发现,其实如果配置的有HADOOP_HOME环境变量的话也是可以的。HADOOP_HOME ,YARN_CONF_DIR,HADOOP_CONF_DIR 只要配置的有任何一个即可。

    例子:下面的命令会申请10个taskmanager,每个8G内存和32个solt

    ./bin/yarn-session.sh -n 10 -tm 8192 -s 32

    该系统默认会使用这个配置文件:conf/flink-conf.yaml。如果你想修改一些参数,请查看我们的配置指南

     

    flink on yarn模式将会覆盖一些配置文件 jobmanager.rpc.address(因为jobmanager总是分配在不同的机器),taskmanager.tmp.dirs(我们使用yarn提供的临时目录)和parallelism.default 如果solts的数量已经被指定。

    如果你不想修改配置文件去改变参数,有一个选择是通过动态的参数-D 来指定。所以你可以传递参数:-Dfs.overwrite-files=true -Dtaskmanager.network.memory.min=536346624

    上面的例子将会启动11个容器(即使仅请求10个容器),因为有一个额外的容器来启动ApplicationMaster 和 job manager

    一旦flink在你的yarn集群上部署,它将会显示job manager的连接详细信息。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop。

    Flink on yarn只会启动请求的资源,如果集群资源充足。大多数yarn调度器请求容器的内存,一些也会请求cpu。默认,cpu的核数等于slots的数量,通过-s参数指定。这个参数yarn.containers.vcores的值允许使用一个自定义值来进行覆盖。

     

    后台 yarn session

    如果你不希望flink yarn client一直运行,也可以启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached

    在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。注意:在这种情况下,无法使用flink停止yarn session。

    使用yarn 工具 来停止yarn session

    yarn application -kill <applicationId>

    附着到一个已存在的session

    使用下面命令启动一个session

    ./bin/yarn-session.sh

    执行这个命令将会显示下面内容:

    用法:
       必须
         -id,--applicationId <yarnAppId>        YARN集群上的任务id

    正如前面提到的,YARN_CONF_DIR或者HADOOP_CONF_DIR环境变量必须是可以读取到YARN和HDFS配置的。

    例如:发出下面命令可以附着到一个运行中的flink yarn session

    ./bin/yarn-session.sh -id application_1463870264508_0029

    附着到一个运行的session使用yarn resourcemanager来确定job Manager 的RPC端口。

    停止yarn session通过停止unix进程(使用CTRL+C)或者在client中输入stop

     

    提交任务到flink

    使用下面的命令提交一个flink程序到yarn集群

    ./bin/flink

    请参考客户端命令行操作文档

     

    这个命令将会向你展示一个这样一个帮助菜单

    "run" 参数可以编译和运行一个程序

    用法: run [OPTIONS] <jar-file> <arguments>
      "run" 操作参数:
         -c,--class <classname>           如果没有在jar包中指定入口类,则需要在这里通过这个参数指定
    
         -m,--jobmanager <host:port>      指定需要连接的jobmanager(主节点)地址
                                          使用这个参数可以指定一个不同于配置文件中的jobmanager
    
         -p,--parallelism <parallelism>   指定程序的并行度。可以覆盖配置文件中的默认值。

    使用run 命令向yarn集群提交一个job。客户端可以确定jobmanager的地址。当然,你也可以通过-m参数指定jobmanager。jobmanager的地址在yarn控制台上可以看到。

    例子:【注意:下面的命令官网文档提供的有问题,执行失败】

    wget -O LICENSE-2.0.txt http://www.apache.org/licenses/LICENSE-2.0.txt
    hadoop fs -copyFromLocal LICENSE-2.0.txt hdfs:/// ...
    【注意:下面的命令官网文档提供的有问题,执行失败】
    ./bin/flink run ./examples/batch/WordCount.jar \
            hdfs:///..../LICENSE-2.0.txt hdfs:///.../wordcount-result.txt
    查看flink源码发现,wordCount.jar可以不提供参数,或者提供参数,提供参数的时候需要使用input和output参数指定:
    上面的命令需要修改为如下格式才能正常执行[传递的两个参数需要使用-input和 -output来指定]
    ./bin/flink run ./examples/batch/WordCount.jar -input hdfs://hostname:port/LICENSE-2.0.txt -output hdfs://hostname:port/wordcount-result.txt
    
    
    

    如果有以下错误,确保所有taskmanager是否已经启动:

    Exception in thread "main" org.apache.flink.compiler.CompilerException:
        Available instances could not be determined from job manager: Connection timed out.

    你可以在jobmanager的web界面上检查taskmanager的数量。这个web界面的地址会打印在yarn session的控制台上。

    如果没有发现taskmanager,你应该通过日志文件来检查问题。

     

    在yarn上运行一个独立的flink job

    这个文档描述了如何在一个hadoop yarn环境中启动flink集群。也可以在yarn中启动只执行单个任务的flink。

    请注意:client期望设置-yn 参数(taskmanager的数量)

    例子:

    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

    yarn session命令行的选项也可以使用./bin/flink 工具获得。它们都有一个y或者yarn的前缀

    注意:通过为每个任务设置不同的环境变量 FLINK_CONF_DIR,可以为每个任务使用不同的配置目录。从 Flink 分发包中复制 conf 目录,然后修改配置,例如,每个任务不同的日志设置

     

    用户依赖jar包和classpath

    默认情况下,当运行一个独立的job的时候,这个flink job将包含用户依赖的jar包。可以通过参数yarn.per-job-cluster.include-user-jar来控制。

    当设置为 DISABLED ,flink将会包含用户classpath下面的jar包。

    用户jar包在类路径中的位置可以通过下面参数来控制:

    * ORDER: (默认) 添加jar包到系统类路径下面,按照字典顺序.
    * FIRST: 将jar添加到类路径的前面.
    * LAST: 将jar添加到类路径的最后.

     

    flink on yarn的故障恢复

    flink 的 yarn 客户端通过下面的配置参数来控制容器的故障恢复。这些参数可以通过conf/flink-conf.yaml 或者在启动yarn session的时候通过-D参数来指定。

     

    • yarn.reallocate-failed:这个参数控制了flink是否应该重新分配失败的taskmanager容器。默认是true。
    • yarn.maximum-failed-containers:applicationMaster可以接受的容器最大失败次数,达到这个参数,就会认为yarn session失败。默认这个次数和初始化请求的taskmanager数量相等(-n 参数指定的)。
    • yarn.application-attempts:applicationMaster重试的次数。如果这个值被设置为1(默认就是1),当application master失败的时候,yarn session也会失败。设置一个比较大的值的话,yarn会尝试重启applicationMaster。

     

    调试一个失败的yarn session

     

    一个flink yarn session部署失败可能会有很多原因。一个错误的hadoop配置(hdfs 权限,yarn配置),版本不兼容(使用cdh中的hadoop运行flink),或者其他的错误。

    日志文件

    在某种情况下,flink yarn session 部署失败是由于它自身的原因,用户必须依赖于yarn的日志来进行分析。最有用的就是yarn log aggregation 。启动它,用户必须在yarn-site.xml文件中设置yarn.log-aggregation-enable 属性为true。一旦启用了,用户可以通过下面的命令来查看一个失败的yarn session的所有详细日志。

    yarn logs -applicationId <application ID>

    yarn client控制台和web界面

    flink yarn client也会打印一些错误信息在控制台上,如果错误发生在运行时(例如如果一个taskmanager停止工作了一段时间)

    除此之外,yarn resource manager的web界面(默认端口是8088)。resource manager的端口是通过yarn.resourcemanager.webapp.address参数来配置的。

    它运行在yarn 程序运行的时候查看日志和程序失败的时候查看日志用户查找问题。

     

    针对指定的hadoop版本构建yarn client

    用户可以使用hadoop发行版。例如,hortonworks,CDH或者MapR等版本去构建 flink。请参考构建指南获取详细信息

     

    在yarn上运行flink使用防火墙

    一些yarn 集群使用防火墙来控制集群的网络和其他网络的通信。在这种设置下,flink只能通过集群的网络来提交任务到yarn session。针对生产环境下使用是不可行的,flink允许配置所有相关服务的端口范围,通过这些端口范围的配置,用户也可以透过防火墙来提交flink job。

    目前,两个服务都需要提交任务:

    • jobmanager(yarn中的applicationMaster)
    • jobmanager内部运行的blobserver

    当向flink提交一个任务的时候,blobserver将会把用户的代码分发到所有工作节点(taskManagers)。jobmanager接收任务本身,并触发执行。

    以下两个配置参数可以指定端口:

     

    • yarn.application-master.port
    • blob.server.port

    这两个配置选项接收单一的端口(例如:"50010"),区间("50000-50025"),或者同时指定多个("50010,50011,50020-50025,50050-50075")。

    (hadoop也使用的是类似的机制,例如:yarn.app.mapreduce.am.job.client.port-range)

     

    flink on yarn 内部实现

    本节主要描述flink和yarn是如何交互的

    YARN 客户端需要访问 Hadoop 配置,从而连接 YARN 资源管理器和 HDFS。可以使用下面的策略来决定 Hadoop 配置:

     

    • 测试 YARN_CONF_DIR, HADOOP_CONF_DIR 或 HADOOP_CONF_PATH 环境变量是否设置了(按该顺序测试)。如果它们中有一个被设置了,那么它们就会用来读取配置。
    • 如果上面的策略失败了(如果正确安装了 YARN 的话,这不应该会发生),客户端会使用 HADOOP_HOME 环境变量。如果该变量设置了,客户端会尝试访问 $HADOOP_HOME/etc/hadoop (Hadoop 2) 和 $HADOOP_HOME/conf(Hadoop 1)。

    当启动一个新的 Flink YARN Client会话,客户端首先会检查所请求的资源(容器和内存)是否可用。之后,它会上传包含了 Flink 配置和 jar文件到 HDFS(步骤 1)。

    客户端的下一步是请求(步骤 2)一个 YARN 容器启动 ApplicationMaster (步骤 3)。因为客户端将配置和jar 文件作为容器的资源注册了,所以运行在特定机器上的 YARN 的 NodeManager 会负责准备容器(例如,下载文件)。一旦这些完成了,ApplicationMaster (AM) 就启动了。

    JobManager 和 AM 运行在同一个容器中。一旦它们成功地启动了,AM 知道 JobManager 的地址(它自己)。它会为 TaskManager 生成一个新的 Flink 配置文件(这样它们才能连上 JobManager)。该文件也同样会上传到 HDFS。另外,AM 容器同时提供了 Flink 的 Web 界面服务。Flink 用来提供服务的端口是由用户 + 应用程序 id 作为偏移配置的。这使得用户能够并行执行多个 Flink YARN 会话。

    之后,AM 开始为 Flink 的 TaskManager 分配容器,这会从 HDFS 下载 jar 文件和修改过的配置文件。一旦这些步骤完成了,Flink 就安装完成并准备接受任务了。

     

     

     

    获取更多大数据资料,视频以及技术交流请加群:

     

     

     

    展开全文
  • 阿龙学堂-Flink简介

    万次阅读 多人点赞 2018-03-29 19:57:18
    1.Flink的引入 这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有Hadoop、Storm,以及后来的Spark,他们都有着各自专注的应用场景。Spark掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的...

    1. Flink的引入

    这几年大数据的飞速发展,出现了很多热门的开源社区,其中著名的有 Hadoop、Storm,以及后来的 Spark,他们都有着各自专注的应用场景。Spark 掀开了内存计算的先河,也以内存为赌注,赢得了内存计算的飞速发展。Spark 的火热或多或少的掩盖了其他分布式计算的系统身影。就像 Flink,也就在这个时候默默的发展着。

    在国外一些社区,有很多人将大数据的计算引擎分成了 4 代,当然,也有很多人不会认同。我们先姑且这么认为和讨论。

    首先第一代的计算引擎,无疑就是 Hadoop 承载的 MapReduce。这里大家应该都不会对 MapReduce 陌生,它将计算分为两个阶段,分别为 Map 和 Reduce。对于上层应用来说,就不得不想方设法去拆分算法,甚至于不得不在上层应用实现多个 Job 的串联,以完成一个完整的算法,例如迭代计算。

    由于这样的弊端,催生了支持 DAG 框架的产生。因此,支持 DAG 的框架被划分为第二代计算引擎。如 Tez 以及更上层的 Oozie。这里我们不去细究各种 DAG 实现之间的区别,不过对于当时的 Tez 和 Oozie 来说,大多还是批处理的任务。

    接下来就是以 Spark 为代表的第三代的计算引擎。第三代计算引擎的特点主要是 Job 内部的 DAG 支持(不跨越Job),以及强调的实时计算。在这里,很多人也会认为第三代计算引擎也能够很好的运行批处理的 Job。

    随着第三代计算引擎的出现,促进了上层应用快速发展,例如各种迭代计算的性能以及对流计算和 SQL 等的支持。Flink 的诞生就被归在了第四代。这应该主要表现在 Flink 对流计算的支持,以及更一步的实时性上面。当然Flink 也可以支持 Batch 的任务,以及 DAG 的运算。

    首先,我们可以通过下面的性能测试初步了解两个框架的性能区别,它们都可以基于内存计算框架进行实时计算,所以都拥有非常好的计算性能。经过测试,Flink计算性能上略好。 

     

    测试环境: 

    1.CPU:7000个; 

    2.内存:单机128GB; 

    3.版本:Hadoop 2.3.0,Spark 1.4,Flink 0.9 

    4.数据:800MB,8GB,8TB; 

    5.算法:K-means:以空间中K个点为中心进行聚类,对最靠近它们的对象归类。通过迭代的方法,逐次更新各聚类中心的值,直至得到最好的聚类结果。 

    6.迭代:K=10,3组数据 

     

    迭代次数(纵坐标是秒,横坐标是次数)

    Spark和Flink全部都运行在Hadoop YARN上,性能为Flink > Spark > Hadoop(MR),迭代次数越多越明显,性能上,Flink优于Spark和Hadoop最主要的原因是Flink支持增量迭代,具有对迭代自动优化的功能。 

    2. Flink简介

    很多人可能都是在 2015 年才听到 Flink 这个词,其实早在 2008 年,Flink 的前身已经是柏林理工大学一个研究性项目, 在 2014 被 Apache 孵化器所接受,然后迅速地成为了 ASF(Apache Software Foundation)的顶级项目之一。Flink 的最新版本目前已经更新到了 0.10.0 了,在很多人感慨 Spark 的快速发展的同时,或许我们也该为 Flink的发展速度点个赞。

    Flink 是一个针对流数据和批数据的分布式处理引擎。它主要是由 Java 代码实现。目前主要还是依靠开源社区的贡献而发展。对 Flink 而言,其所要处理的主要场景就是流数据,批数据只是流数据的一个极限特例而已。再换句话说,Flink 会把所有任务当成流来处理,这也是其最大的特点。

    Flink 可以支持本地的快速迭代,以及一些环形的迭代任务。并且 Flink 可以定制化内存管理。在这点,如果要对比 Flink 和 Spark 的话,Flink 并没有将内存完全交给应用层。这也是为什么 Spark 相对于 Flink,更容易出现 OOM的原因(out of memory)。就框架本身与应用场景来说,Flink 更相似与 Storm。如果之前了解过 Storm 或者Flume 的读者,可能会更容易理解 Flink 的架构和很多概念。下面让我们先来看下 Flink 的架构图。

     

     

    我们可以了解到 Flink 几个最基础的概念,Client、JobManager 和 TaskManager。Client 用来提交任务给JobManager,JobManager 分发任务给 TaskManager 去执行,然后 TaskManager 会心跳的汇报任务状态。看到这里,有的人应该已经有种回到 Hadoop 一代的错觉。确实,从架构图去看,JobManager 很像当年的 JobTracker,TaskManager 也很像当年的 TaskTracker。然而有一个最重要的区别就是 TaskManager 之间是是流(Stream)。其次,Hadoop 一代中,只有 Map 和 Reduce 之间的 Shuffle,而对 Flink 而言,可能是很多级,并且在 TaskManager内部和 TaskManager 之间都会有数据传递,而不像 Hadoop,是固定的 Map 到 Reduce。

    3. 技术的特点(可选)

    关于Flink所支持的特性,我这里只是通过分类的方式简单做一下梳理,涉及到具体的一些概念及其原理会在后面的部分做详细说明。

    3.1. 流处理特性

    支持高吞吐、低延迟、高性能的流处理

    支持带有事件时间的窗口(Window)操作

    支持有状态计算的Exactly-once语义

    支持高度灵活的窗口(Window)操作,支持基于time、count、session,以及data-driven的窗口操作

    支持具有Backpressure功能的持续流模型

    支持基于轻量级分布式快照(Snapshot)实现的容错

    一个运行时同时支持Batch on Streaming处理和Streaming处理

    Flink在JVM内部实现了自己的内存管理

    支持迭代计算

    支持程序自动优化:避免特定情况下Shuffle、排序等昂贵操作,中间结果有必要进行缓存

    3.2. API支持

    对Streaming数据类应用,提供DataStream API

    对批处理类应用,提供DataSet API(支持Java/Scala)

    3.3. Libraries支持

    支持机器学习(FlinkML)

    支持图分析(Gelly)

    支持关系数据处理(Table)

    支持复杂事件处理(CEP)

    3.4. 整合支持

    支持Flink on YARN

    支持HDFS

    支持来自Kafka的输入数据

    支持Apache HBase

    支持Hadoop程序

    支持Tachyon

    支持ElasticSearch

    支持RabbitMQ

    支持Apache Storm

    支持S3

    支持XtreemFS

    3.5. Flink生态圈

    一个计算框架要有长远的发展,必须打造一个完整的 Stack。不然就跟纸上谈兵一样,没有任何意义。只有上层有了具体的应用,并能很好的发挥计算框架本身的优势,那么这个计算框架才能吸引更多的资源,才会更快的进步。所以 Flink 也在努力构建自己的 Stack。

    Flink 首先支持了 Scala 和 Java 的 API,Python 也正在测试中。Flink 通过 Gelly 支持了图操作,还有机器学习的FlinkML。Table 是一种接口化的 SQL 支持,也就是 API 支持,而不是文本化的 SQL 解析和执行。对于完整的 Stack我们可以参考下图。

     

    Flink 为了更广泛的支持大数据的生态圈,其下也实现了很多 Connector 的子项目。最熟悉的,当然就是与Hadoop HDFS 集成。其次,Flink 也宣布支持了 Tachyon、S3 以及 MapRFS。不过对于 Tachyon 以及 S3 的支持,都是通过 Hadoop HDFS 这层包装实现的,也就是说要使用 Tachyon 和 S3,就必须有 Hadoop,而且要更改 Hadoop的配置(core-site.xml)。如果浏览 Flink 的代码目录,我们就会看到更多 Connector 项目,例如 Flume 和 Kafka。

    4. 安装

    Flink 有三种部署模式,分别是 Local、Standalone Cluster 和 Yarn Cluster。

    4.1. Local模式

    对于 Local 模式来说,JobManager 和 TaskManager 会公用一个 JVM 来完成 Workload。如果要验证一个简单的应用,Local 模式是最方便的。实际应用中大多使用 Standalone 或者 Yarn Cluster,而local模式只是将安装包解压启动(./bin/start-local.sh)即可,在这里不在演示。

    4.2. Standalone 模式

    4.2.1. 下载

    安装包下载地址:http://flink.apache.org/downloads.html

    快速入门教程地址:

    https://ci.apache.org/projects/flink/flink-docs-release-1.3/quickstart/setup_quickstart.html

     

     

     

    4.2.2. 上传安装包到linux系统

    使用rz命令

    4.2.3. 解压

    tar –zxvf flink-1.3.2-bin-hadoop26-scala_2.10.tgz

    4.2.4. 重命名

    mv flink-1.3.2 flink

    4.2.5. 修改环境变量

    切换到root用户配置

    export FLINK_HOME=/home/hadoop/flink
    export PATH=$PATH:$FLINK_HOME/bin

    配置结束后切换会普通用户

    source /etc/profile

    4.2.6. 修改配置文件

    修改flink/conf/masters

    master1:8081

    修改flink/conf/slaves

    master1ha
    
    master2
    
    master2ha

    修改flink/conf/flink-conf.yaml

    taskmanager.numberOfTaskSlots: 2
    
    jobmanager.rpc.address: master1

    4.2.7. 启动flink

    /home/Hadoop/flink/bin/start-cluster.sh

     

    4.2.8. Flink 的 Rest API

    Flink 和其他大多开源的框架一样,提供了很多有用的 Rest API。不过 Flink 的 RestAPI,目前还不是很强大,只能支持一些 Monitor 的功能。Flink Dashboard 本身也是通过其 Rest 来查询各项的结果数据。在 Flink RestAPI 基础上,可以比较容易的将 Flink 的 Monitor 功能和其他第三方工具相集成,这也是其设计的初衷。

    在 Flink 的进程中,是由 JobManager 来提供 Rest API 的服务。因此在调用 Rest 之前,要确定 JobManager 是否处于正常的状态。正常情况下,在发送一个 Rest 请求给 JobManager 之后,Client 就会收到一个 JSON 格式的返回结果。由于目前 Rest 提供的功能还不多,需要增强这块功能的读者可以在子项目 flink-runtime-web 中找到对应的代码。其中最关键一个类 WebRuntimeMonitor,就是用来对所有的 Rest 请求做分流的,如果需要添加一个新类型的请求,就需要在这里增加对应的处理代码。下面我例举几个常用 Rest API。

    1.查询 Flink 集群的基本信息: /overview。示例命令行格式以及返回结果如下:

    $ curl http://localhost:8081/overview

    {"taskmanagers":1,"slots-total":16,
    
    "slots-available":16,"jobs-running":0,"jobs-finished":0,"jobs-cancelled":0,"jobs-failed":0}

    2.查询当前 Flink 集群中的 Job 信息:/jobs。示例命令行格式以及返回结果如下:

    $ curl http://localhost:8081/jobs

    {"jobs-running":[],"jobs-finished":
    
    ["f91d4dd4fdf99313d849c9c4d29f8977"],"jobs-cancelled":[],"jobs-failed":[]}

    3.查询一个指定的 Job 信息: /jobs/jobid。这个查询的结果会返回特别多的详细的内容,这是我在浏览器中进行的测试,如下图:

    想要了解更多 Rest 请求内容的读者,可以去 Apache Flink 的页面中查找。

    4.2.9. 运行测试任务

    ./bin/flink run -m master1:8082 ./examples/batch/WordCount.jar --input hdfs://master1:9000/words.txt --output hdfs://master1:9000/clinkout

     

     

    4.3. Flink 的 HA

    首先,我们需要知道 Flink 有两种部署的模式,分别是 Standalone 以及 Yarn Cluster 模式。对于 Standalone 来说,Flink 必须依赖于 Zookeeper 来实现 JobManager 的 HA(Zookeeper 已经成为了大部分开源框架 HA 必不可少的模块)。在 Zookeeper 的帮助下,一个 Standalone 的 Flink 集群会同时有多个活着的 JobManager,其中只有一个处于工作状态,其他处于 Standby 状态。当工作中的 JobManager 失去连接后(如宕机或 Crash),Zookeeper 会从Standby 中选举新的 JobManager 来接管 Flink 集群。

    对于 Yarn Cluaster 模式来说,Flink 就要依靠 Yarn 本身来对 JobManager 做 HA 了。其实这里完全是 Yarn 的机制。对于 Yarn Cluster 模式来说,JobManager 和 TaskManager 都是被 Yarn 启动在 Yarn 的 Container 中。此时的JobManager,其实应该称之为 Flink Application Master。也就说它的故障恢复,就完全依靠着 Yarn 中的ResourceManager(和 MapReduce 的 AppMaster 一样)。由于完全依赖了 Yarn,因此不同版本的 Yarn 可能会有细微的差异。这里不再做深究。

    4.3.1. 修改配置文件

    修改flink-conf.yaml

    state.backend: filesystem
    
    state.backend.fs.checkpointdir: hdfs://master1:9000/flink-checkpoints
    
    high-availability: zookeeper
    
    high-availability.storageDir: hdfs://master1:9000/flink/ha/
    
    high-availability.zookeeper.quorum: master1ha:2181,master2:2181,master2ha:2181
    
    high-availability.zookeeper.client.acl: open

    修改conf

    server.1=master1ha:2888:3888
    
    server.2=master2:2888:3888
    
    server.3=master2ha:2888:3888

    修改masters

    master1:8082
    
    master1ha:8082

    修改slaves

    master1ha
    
    master2
    
    master2ha

    4.3.2. 启动

    /home/Hadoop/flink/bin/start-cluster.sh

     

     

     

    4.4. Yarn Cluster 模式

    4.4.1. 引入

    在一个企业中,为了最大化的利用集群资源,一般都会在一个集群中同时运行多种类型的 Workload。因此 Flink也支持在 Yarn 上面运行。首先,让我们通过下图了解下 Yarn 和 Flink 的关系。

    在图中可以看出,Flink 与 Yarn 的关系与 MapReduce 和 Yarn 的关系是一样的。Flink 通过 Yarn 的接口实现了自己的 App Master。当在 Yarn 中部署了 Flink,Yarn 就会用自己的 Container 来启动 Flink 的 JobManager(也就是App Master)和 TaskManager。

    4.4.2. 修改环境变量

    export HADOOP_CONF_DIR= /home/hadoop/hadoop/etc/hadoop

    4.4.3. 部署启动

    yarn-session.sh -d -s 2 -tm 800 -n 2

    上面的命令的意思是,同时向Yarn申请3个container,其中 2 个 Container 启动 TaskManager(-n 2),每个TaskManager 拥有两个 Task Slot(-s 2),并且向每个 TaskManager 的 Container 申请 800M 的内存,以及一个ApplicationMaster(Job Manager)。

    Flink部署到Yarn Cluster后,会显示Job Manager的连接细节信息。

    Flink on Yarn会覆盖下面几个参数,如果不希望改变配置文件中的参数,可以动态的通过-D选项指定,如

    -Dfs.overwrite-files=true -Dtaskmanager.network.numberOfBuffers=16368
    jobmanager.rpc.address:因为JobManager会经常分配到不同的机器上
    
    taskmanager.tmp.dirs:使用Yarn提供的tmp目录
    
    parallelism.default:如果有指定slot个数的情况下
    
    yarn-session.sh会挂起进程,所以可以通过在终端使用CTRL+C或输入stop停止yarn-session。

    如果不希望Flink Yarn client长期运行,Flink提供了一种detached YARN session,启动时候加上参数-d或—detached

    在上面的命令成功后,我们就可以在 Yarn Application 页面看到 Flink 的纪录。如下图。

    如果在虚拟机中测试,可能会遇到错误。这里需要注意内存的大小,Flink 向 Yarn 会申请多个 Container,但是Yarn 的配置可能限制了 Container 所能申请的内存大小,甚至 Yarn 本身所管理的内存就很小。这样很可能无法正常启动 TaskManager,尤其当指定多个 TaskManager 的时候。因此,在启动 Flink 之后,需要去 Flink 的页面中检查下 Flink 的状态。这里可以从 RM 的页面中,直接跳转(点击 Tracking UI)。这时候 Flink 的页面如图

     

    yarn-session.sh启动命令参数如下:

    Usage:  
    
       Required  
         -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers) 
       Optional  
         -D <arg>                        Dynamic properties  
         -d,--detached                   Start detached  
         -jm,--jobManagerMemory <arg>    Memory for JobManager Container [in MB]  
         -nm,--name                      Set a custom name for the application on YARN  
         -q,--query                      Display available YARN resources (memory, cores)  
         -qu,--queue <arg>               Specify YARN queue.  
         -s,--slots <arg>                Number of slots per TaskManager  
         -st,--streaming                 Start Flink in streaming mode  
         -tm,--taskManagerMemory <arg>   Memory per TaskManager Container [in MB]  

    4.4.4. 提交任务

    之后,我们可以通过这种方式提交我们的任务

    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar

    以上命令在参数前加上y前缀,-yn表示TaskManager个数。

    在这个模式下,同样可以使用-m yarn-cluster提交一个"运行后即焚"的detached yarn(-yd)作业到yarn cluster。

     

    4.4.5. 停止yarn cluster

    yarn application -kill application_1507603745315_0001

    5. 技术的使用

    5.1. Flink开发标准流程

    1. 获取execution environment,
    final StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();
    1. 加载/创建初始化数据
    DataStream<String> text = env.readTextFile("file:///path/to/file");
    1. 指定 transformations 作用在数据上
    val mapped = input.map { x => x.toInt }
    1. 存储结果集
    writeAsText(String path)
    print()
    1. 触发程序执行

    在local模式下执行程序

    execute()

    将程序达成jar运行在线上

    ./bin/flink run \
    -m master1:8082 \
    ./examples/batch/WordCount.jar \
    --input hdfs://master1:9000/words.txt \
    --output hdfs://master1:9000/clinkout \

    5.2. Wordcount

    5.2.1. Scala代码

    object SocketWindowWordCount {
        def main(args: Array[String]) : Unit = {
            // the port to connect to
            val port: Int = try {
                ParameterTool.fromArgs(args).getInt("port")
            } catch {
                case e: Exception => {
                    System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                    return
                }
            }
    
            // get the execution environment
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
            // get input data by connecting to the socket
            val text = env.socketTextStream("localhost", port, '\n')
            // parse the data, group it, window it, and aggregate the counts
            val windowCounts = text
                .flatMap { w => w.split("\\s") }
                .map { w => WordWithCount(w, 1) }
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum("count")
    
            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1)
            env.execute("Socket Window WordCount")
        }
        // Data type for words with count
        case class WordWithCount(word: String, count: Long)
    }

    5.2.2. Java代码

    public class SocketWindowWordCount {
     public static void main(String[] args) throws Exception {
    
            // the port to connect to
            final int port;
            try {
                final ParameterTool params = ParameterTool.fromArgs(args);
                port = params.getInt("port");
            } catch (Exception e) {
                System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'");
                return;
            }
    
            // get the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // get input data by connecting to the socket
            DataStream<String> text = env.socketTextStream("localhost", port, "\n");
            // parse the data, group it, window it, and aggregate the counts
            DataStream<WordWithCount> windowCounts = text
                .flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String value, Collector<WordWithCount> out) {
                        for (String word : value.split("\\s")) {
                            out.collect(new WordWithCount(word, 1L));
                        }
    
                    }
    
                })
    
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .reduce(new ReduceFunction<WordWithCount>() {
                    @Override
                    public WordWithCount reduce(WordWithCount a, WordWithCount b) {
                        return new WordWithCount(a.word, a.count + b.count);
                    }
    
                });
            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1);
            env.execute("Socket Window WordCount");
        }
        // Data type for words with count
        public static class WordWithCount {
            public String word;
            public long count;
            public WordWithCount() {}
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }

    5.2.3. 运行

    l 启动nc发送消息

    $ nc -l 9000

    l 启动flink程序

    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000

     

     

    5.2.4. 测试

    l 输入

    $ nc -l 9000
    
    lorem ipsum
    ipsum ipsum ipsum
    bye

    l 输出

    $ tail -f log/flink-*-jobmanager-*.out
    
    lorem : 1
    bye : 1
    ipsum : 4

    5.3. 使用IDEA开发离线程序

    Dataset是flink的常用程序,数据集通过source进行初始化,例如读取文件或者序列化集合,然后通过transformation(filtering、mapping、joining、grouping)将数据集转成,然后通过sink进行存储,既可以写入hdfs这种分布式文件系统,也可以打印控制台,flink可以有很多种运行方式,如local、flink集群、yarn等

    5.3.1. Pom

    <properties>
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.version>2.10.2</scala.version>
            <scala.compat.version>2.10</scala.compat.version>
            <hadoop.version>2.6.2</hadoop.version>
            <flink.version>1.3.2</flink.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.22</version>
            </dependency>
        </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-make:transitive</arg>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </exludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>org.apache.spark.WordCount</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    <properties>
    
            <maven.compiler.source>1.7</maven.compiler.source>
            <maven.compiler.target>1.7</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.version>2.10.2</scala.version>
            <scala.compat.version>2.10</scala.compat.version>
            <hadoop.version>2.6.2</hadoop.version>
            <flink.version>1.3.2</flink.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.10</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
            </dependency>
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.22</version>
            </dependency>
        </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <roupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configration>
                                <args>
                                    <arg>-make:transitive</arg>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <version>2.18.1</version>
                    <configuration>
                        <useFile>false</useFile>
                        <disableXmlReport>true</disableXmlReport>
                        <includes>
                            <include>**/*Test.*</include>
                            <include>**/*Suite.*</include>
                        </includes>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>org.apache.spark.WordCount</mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>

    5.3.2. 程序

     Java

    public class WordCountExample {
        public static void main(String[] args) throws Exception {
            //构建环境
            final ExecutionEnvironment env =
    ExecutionEnvironment.getExecutionEnvironment();
            //通过字符串构建数据集
            DataSet<String> text = env.fromElements(
                "Who's there?",
                "I think I hear them. Stand, ho! Who's there?");
            //分割字符串、按照key进行分组、统计相同的key个数
            DataSet<Tuple2<String, Integer>> wordCounts = text
                .flatMap(new LineSplitter())
                .groupBy(0)
                .sum(1);
            //打印
            wordCounts.print();
        }
    
        //分割符串的方法
        public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                for (String word : line.split(" ")) {
                    out.collect(new Tuple2<String, Integer>(word, 1));
                }
            }
        }
    }

    Scala

    import org.apache.flink.api.scala._
    
    
    object WordCount {
      def main(args: Array[String]) {
        //初始化环
        val env = ExecutionEnvironment.getExecutionEnvironment
        //从字符串中加载数据
        val text = env.fromElements(
          "Who's there?",
          "I think I hear them. Stand, ho! Who's there?")
        //分割字符串、汇总tuple、按照key进行分组、统计分组后word个数
        val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }
          .map { (_, 1) }
          .groupBy(0)
          .sum(1)
        //打印
        counts.print()
      }
    }

     

    5.3.3. 运行

    本地

    直接runas即可

    线上

    1、 打包
    
    2、 上传
    
    3、 执行命令:flink run -m master1:8082 -c org.apache.flink.WordCount original-Flink-1.0-SNAPSHOT.jar

     

    展开全文
  • flink实战--flinkSQL入门大全

    万次阅读 2018-11-12 18:07:03
    FlinkSQL概念介绍 Table API & SQL Apache Flink具有两个关系API - 表API和SQL - 用于统一流和批处理。Table API是Scala和Java的语言集成查询API,允许以非常直观的方式组合来自关系运算符的查询,Table API...
  • ambari HDP2.6.5 安装FLINK1.9

    万次阅读 2020-01-19 18:11:34
    ambari HDP2.6.5 安装FLINK1.9 ambari HDP2.6.5 安装FLINK1.9 要下载Flink服务文件夹,请运行以下命令 VERSION=`hdp-select status hadoop-client | sed 's/hadoop-client - \([0-9]\.[0-9]\).*/\1/'` sudo git ...
  • Flink 专题 -1 搭建FlinkFlink 简介

    千次阅读 2018-11-06 22:46:28
    文章目录Flink 专题1 : 搭建FlinkFlink 简介Flink 简介Flink 的优势:Flink 安装flink 安装步骤flink 集群模式 结构 :配置文件设置:添加jobManager/TaskManager启动集群1 集群模式启动2. yarn 模式启动 Flink ...
  • flink实战--flink整合kafka

    万次阅读 2018-12-26 22:06:30
    Flink Kafka Consumer集成了Flink的检查点机制,可提供一次性处理语义。为实现这一目标,Flink并不完全依赖kafka的消费者群体偏移跟踪,而是在内部跟踪和检查这些偏移。 Maven 依赖 支持到的版本 生产者和...
  • Flink 快速入门

    千人学习 2019-01-02 11:26:54
    通过本课程的学习,既能获得Flink企业级真实项目经验,也能深入掌握Flink的核心理论知识,还能获得Flink在生产环境中安装、部署、监控的宝贵经验,从而一站式全面、深入掌握Flink技术。 任务作业: 1.使用Eclipse...
  • Flink教程

    千次阅读 2020-02-27 15:39:05
    Flink 学习路线系列 Flink笔记(一):Flink介绍 Flink笔记(二):Flink环境搭建(standalone模式) Flink笔记(三):Flink 提交任务的两种方式 Flink笔记(四):Java 编写Flink实时任务(WordCount 示例) Flink笔记(五):...
  • 最近也是由于电脑出了一点问题,就没有更新,今天主要来介绍一下Flink的web ui的使用,我们先提交一个job上去(我的集群是on yarn的),命令如下: flink run -m yarn-cluster -c flink.window.FlinkWindowDemo -yn 2 -ys ...
  • Flink:Flink-SQL开发

    千次阅读 2020-07-27 18:08:40
    Flink SQL 是 Flink 实时计算为简化计算模型,降低用户使用实时计算门槛而设计的一套符合标准 SQL 语义的开发语言。 自 2015 年开始,阿里巴巴开始调研开源流计算引擎,最终决定基于 Flink 打造新一代计算引擎,针对...
  • Flink SQL Demo 为切入,结合调试过程,深入理解 Flink Streaming SQL flink 语法扩展 Flink SQL 引擎:Calcite 简述 Flink Table/SQL 执行流程 以 Flink SQL Demo 为切入,结合调试过程,深入理...
  • Flink示例——Flink-CDC

    千次阅读 2020-09-18 23:28:39
    文章目录Flink示例——State、Checkpoint、Savepoint版本信息Mavan依赖主从同步配置、数据准备使用Flink-CDC Flink示例——State、Checkpoint、Savepoint 版本信息 产品 版本 Flink 1.11.1 flink-cdc-...
  • 快速入门Flink (1) —— Flink的简介与架构体系

    千次阅读 多人点赞 2020-07-16 16:30:54
    文章目录一、Flink 的简介1.1 Flink的引入1.2 什么是 Flink1.3 Flink 流处理特性1.4 Flink 基石1.5 批处理与流处理 一、Flink 的简介 1.1 Flink的引入         这几年...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 48,510
精华内容 19,404
关键字:

flink