flink_flinkx - CSDN
flink 订阅
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1] 展开全文
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。Flink以数据并行和流水线方式执行任意流数据程序,Flink的流水线运行时系统可以执行批处理和流处理程序。此外,Flink的运行时本身也支持迭代算法的执行。 [1]
信息
稳定版本
Apache软件基金会 [1]
类    型
1.8.0 [1]
中文名
弗林克
外文名
Flink [1]
flink开发
Apache Flink是由Apache软件基金会内的Apache Flink社区基于Apache许可证2.0开发的,该项目已有超过100位代码提交者和超过460贡献者。 [2]  是由Apache Flink的创始人创建的公司。目前,该公司已聘用了12个Apache Flink的代码提交者。 [3] 
收起全文
精华内容
参与话题
  • 本课程使用Flink入门及进阶教程, 前期讲解Flink API的使用,分析Flink的架构,已经第三方插件的接入。 进阶教程是一个Flnk的电商行为分析项目,使用同学能够对前期的只是进行融会贯通。
  • Flink入门这一篇就够了

    千次阅读 2020-06-17 09:33:37
    Flink入门这一篇就够了 Flink好处在这不再赘述,有想要了解的同学可自行搜索 Linux中安装好JDK1.8 Flink下载(没有windows版本) https://flink.apache.org/downloads.html 通过sftp将下载好的文件传入到linux系统...

    Flink入门这一篇就够了

    Flink好处在这不再赘述,有想要了解的同学可自行搜索

    Linux中安装好JDK1.8

    在这里插入图片描述

    Flink下载(没有windows版本)

    https://flink.apache.org/downloads.html

    在这里插入图片描述通过sftp将下载好的文件传入到linux系统上(连接上虚拟机后 alt + p)
    在这里插入图片描述在这里插入图片描述在/root下可以将文件移动到指定目录中
    [root@hadoop301 ~]# mv flink-1.6.1-bin-hadoop26-scala_2.11.tgz /opt
    解压
    [root@hadoop301 ~]# tar -zxvf flink-1.6.1-bin-hadoop26-scala_2.11.tgz

    进入目录启动
    [root@hadoop301 flink-1.6.1]# ./bin/start-cluster.sh

    点开浏览器输入linuxIP端口为8081打开网页
    在这里插入图片描述

    在IDEA中可以开始写JAVA代码了(也可以使用scala)自行查看
    https://ci.apache.org/projects/flink/flink-docs-release-1.6/quickstart/setup_quickstart.html

    1.创建一个maven工程
    2.pom文件

       <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-clients_2.11</artifactId>
                <version>1.6.1</version>
            </dependency>
        </dependencies>
    

    Word Count 代码-读取本地文件

    //获取环境
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    //读取本地文件-放入到数据集合中
    DataSet<String> text = env.readTextFile("/path/to/file");
    
    DataSet<Tuple2<String, Integer>> counts =
            text.flatMap(new Tokenizer())
            .groupBy(0)
            .sum(1);
    
    counts.writeAsCsv(outputPath, "\n", " ");
    
    
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            //value是文件中一行一行的数据,将每行数据进行切割
            String[] tokens = value.toLowerCase().split("\\W+");
            //token是分割出来的单词
            for (String token : tokens) {
            	//如果切割出来的的单词
                if (token.length() > 0) {
                	//写入集合进行统计
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }   
            }
        }
    }
    
    展开全文
  • 一文弄懂Flink基础理论

    万次阅读 多人点赞 2019-10-22 20:21:05
    文章目录Flink概述Flink生态为什么选择Flink?系统架构JobManager运行架构常用的类型和操作程序结构介绍并行数据流Task and Operator Chains核心原理Window&TimeWindowTimeState状态管理按组织形式的划分按照数据...

    Flink概述

    Apache Flink是一个计算框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

    • DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。

    • DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。

    • Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。

    在这里插入图片描述

    从部署上讲,Flink支持local模式、集群模式(standalone集群或者Yarn集群)、云端部署。Runtime是主要的数据处理引擎,它以JobGraph形式的API接收程序,JobGraph是一个简单的并行数据流,包含一系列的tasks,每个task包含了输入和输出(source和sink例外)。

    Flink生态

    在这里插入图片描述

    为什么选择Flink?

    在这里插入图片描述

    系统架构

    Flink分布式程序包含2个主要的进程:JobManager和TaskManager.当程序运行时,不同的进程就会参与其中,包括Jobmanager、TaskManager和JobClient。
    在这里插入图片描述
    当 Flink 集群启动后,首先会启动一个 JobManger 和一个或多个的 TaskManager。由 Client 提交任务给 JobManager,JobManager 再调度任务到各个 TaskManager 去执行,然后 TaskManager 将心跳和统计信息汇报给 JobManager。TaskManager 之间以流的形式进行数据的传输。上述三者均为独立的 JVM 进程。

    JobManager

    Master进程,负责Job的管理和资源的协调。包括任务调度,检查点管理,失败恢复等。

    当然,对于集群HA模式,可以同时多个master进程,其中一个作为leader,其他作为standby。当leader失败时,会选出一个standby的master作为新的leader(通过zookeeper实现leader选举)。

    JobManager包含了3个重要的组件:

    ###(1)Actor系统

    Flink内部使用Akka模型作为JobManager和TaskManager之间的通信机制。

    Actor系统是个容器,包含许多不同的Actor,这些Actor扮演者不同的角色。Actor系统提供类似于调度、配置、日志等服务,同时包含了所有actors初始化时的线程池。

    所有的Actors存在着层级的关系。新加入的Actor会被分配一个父类的Actor。Actors之间的通信采用一个消息系统,每个Actor都有一个“邮箱”,用于读取消息。如果Actors是本地的,则消息在共享内存中共享;如果Actors是远程的,则消息通过RPC远程调用。

    每个父类的Actor都负责监控其子类Actor,当子类Actor出现错误时,自己先尝试重启并修复错误;如果子类Actor不能修复,则将问题升级并由父类Actor处理。

    在Flink中,actor是一个有状态和行为的容器。Actor的线程持续的处理从“邮箱”中接收到的消息。Actor中的状态和行为则由收到的消息决定。
    在这里插入图片描述

    ###(2)调度
    Flink中的Executors被定义为task slots(线程槽位)。每个Task Manager需要管理一个或多个task slots。
    Flink通过SlotSharingGroup和CoLocationGroup来决定哪些task需要被共享,哪些task需要被单独的slot使用。
    ###(3)检查点

    Flink的检查点机制是保证其一致性容错功能的骨架。它持续的为分布式的数据流和有状态的operator生成一致性的快照。Flink的容错机制持续的构建轻量级的分布式快照,因此负载非常低。通常这些有状态的快照都被放在HDFS中存储(state backend)。程序一旦失败,Flink将停止executor并从最近的完成了的检查点开始恢复(依赖可重发的数据源+快照)。

    参考:三分钟掌握Flink基本概念和原理

    运行架构

    常用的类型和操作

    在这里插入图片描述
    参考:
    Flink 原理与实现:数据流上的类型和操作:http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams
    Flink Stream 算子:https://flink.sojb.cn/dev/stream/operators

    程序结构介绍

    在这里插入图片描述

    Source,它是整个stream的入口。
    Transformation,用于转换一个或多个DataStream从而形成一个新的DataStream对象。
    Sink,它流的数据出口。

    并行数据流

    Flink程序本质上是并行和分布式的。在程序执行期间,一个流会生成一个或者多个stream partition,并且一个operator会生成一个或者多个operator subtask。operator的 subtask 彼此之间是独立的,分别在不同的线程里去执行并且可能分布在不同的机器上或者containers上。
    operator的subtasks的数量等于该操作算子的并行度的数量。流的并行度有总是取决于产生它的操作算子的并行度决定的。同一个flink程序中的不同的operators可能有不同的并行度。
    在这里插入图片描述

    数据流在两个operators之间进行传递的方式有两种:one-to-one 模式 和 redistributing 模式

    • one-to-one 模式
      两个operator用此模式传递的时候,会保持数据的分区数和数据的排序,比如:在下图中Source和map() operators之间的数据传递方式;
    • Redistributing 模式(重新分配模式)
      这种模式会改变数据的分区数;每个一个operator subtask会根据选择transformation把数据发送到不同的目标subtasks,比如keyBy()会通过hashcode重新分区,broadcast()和rebalance()方法会随机重新分区,比如:在下图中map()和keyBy/window ,keyBy/window和Sink之间的数据传递方式;

    Flink每个算子都可以设置并行度,然后就是也可以设置全局并行度。
    api设置.map(new RollingAdditionMapper()).setParallelism(10)
    全局配置在flink-conf.yaml文件中,parallelism.default,默认是1

    Task and Operator Chains

    为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
    在这里插入图片描述
    可以进行Operator chains的条件
    1、上下游的并行度一致
    2、下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
    3、上下游节点都在同一个 slot group 中(下面会解释 slot group)
    4、下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
    5、上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
    6、两个节点间数据分区方式是 forward(参考理解数据流的分区)
    7、用户没有禁用 chain

    核心原理

    Apache Flink 之所以能越来越受欢迎,我们认为离不开它最重要的四个基石:Checkpoint、State、Time、Window。

    Window&Time

    Window

    Flink 中 Window 可以将无限流切分成有限流,是处理有限流的核心组件,现在Flink 中 Window 可以是时间驱动的(Time Window),也可以是数据驱动的(Count Window),如下图所示:
    在这里插入图片描述

    上图中,基于时间的窗口操作,在每个相同的时间间隔对Stream中的记录进行处理,通常各个时间间隔内的窗口操作处理的记录数不固定;而基于数据驱动的窗口操作,可以在Stream中选择固定数量的记录作为一个窗口,对该窗口中的记录进行处理。

    窗口类型:

    • tumbling window(滚动窗口):窗口间的元素无重复

    一个翻滚窗口分配器的每个数据元分配给指定的窗口的窗口大小。翻滚窗具有固定的尺寸,不重叠。例如,如果指定大小为5分钟的翻滚窗口,则将评估当前窗口,并且每五分钟将启动一个新窗口
    在这里插入图片描述

    • sliding window(滑动窗口):窗口间的元素可能重复

    该滑动窗口分配器分配元件以固定长度的窗口。与翻滚窗口分配器类似,窗口大小由窗口大小参数配置。附加的窗口滑动参数控制滑动窗口的启动频率。因此,如果幻灯片小于窗口大小,则滑动窗口可以重叠。在这种情况下,数据元被分配给多个窗口。
    例如,您可以将大小为10分钟的窗口滑动5分钟。有了这个,你每隔5分钟就会得到一个窗口,其中包含过去10分钟内到达的事件
    在这里插入图片描述

    • session window(会话窗口)

    在会话窗口中按活动会话分配器组中的数据元。与翻滚窗口和滑动窗口相比,会话窗口不重叠并且没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到数据元时,即当发生不活动的间隙时,会关闭会话窗口。会话窗口分配器可以配置静态会话间隙或 会话间隙提取器函数,该函数定义不活动时间段的长度。当此期限到期时,当前会话将关闭,后续数据元将分配给新的会话窗口。
    在这里插入图片描述

    • global window(全局窗口)

    一个全局性的窗口分配器分配使用相同的Keys相同的单个的所有数据元全局窗口。此窗口方案仅在您还指定自定义触发器时才有用。否则,将不执行任何计算,因为全局窗口没有我们可以处理聚合数据元的自然结束。
    在这里插入图片描述
    参考:
    window:http://flink.iteblog.com/dev/windows.html

    Time

    Time的分类
    在这里插入图片描述

    • Event-Time :事件时间是每个事件在其生产设备上发生的时间。此时间通常在进入Flink之前嵌入记录中,并且 可以从每个记录中提取该事件时间戳。
    • Ingestion-Time :摄取时间是事件进入Flink的时间。在源算子处,每个记录将源的当前时间作为时间戳,并且基于时间的 算子操作(如时间窗口)引用该时间戳。
    • Processing-Time : 处理时间是指执行相应算子操作的机器的系统时间。

    引入Watermark的背景?

    主要解决延迟数据

    我们可以考虑一个这样的例子:某 App 会记录用户的所有点击行为,并回传日志(在网络不好的情况下,先保存在本地,延后回传)。A 用户在 11:02 对 App 进行操作,B 用户在 11:03 操作了 App,但是 A 用户的网络不太稳定,回传日志延迟了,导致我们在服务端先接受到 B 用户 11:03 的消息,然后再接受到 A 用户 11:02 的消息,消息乱序了。

    那我们怎么保证基于 event-time 的窗口在销毁的时候,已经处理完了所有的数据呢?这就是 watermark 的功能所在。watermark 会携带一个单调递增的时间戳 t,watermark(t) 表示所有时间戳不大于 t 的数据都已经到来了,未来不会再来,因此可以放心的触发和销毁窗口了。

    什么是Watermark?
    Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳,由Apache Flink Source或者自定义的Watermark生成器按照需求Punctuated或者Periodic两种方式生成的一种系统Event,与普通数据流Event一样流转到对应的下游算子,接收到Watermark Event的算子以此不断调整自己管理的EventTime clock。

    乱序流中Watermark的工作示意图:
    在这里插入图片描述
    并行流中的Watermarks的工作示意图:
    在这里插入图片描述
    多并行度的情况下,watermark对齐会取所有channel最小的watermark。
    例如:多输入operator(union、 keyBy、 partition)的当前event time是其输入流event time的最小值。

    设置Time类型
    不设置Time 类型,默认是processingTime。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    

    如果使用EventTime则需要在source之后明确指定Timestamp Assigner & Watermark Generator

    Watermark的产生方式:

    • Punctuated - 数据流中每一个递增的EventTime都会产生一个Watermark。在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
      接口定义AssignerWithPunctuatedWatermarks:Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp);
    • Periodic - 周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。
      接口定义AssignerWithPeriodicWatermarks:Watermark getCurrentWatermark();

    Watermark触发计算时间:
    在基于 Event-Time 的流处理应用中,每个数据有两个必需的信息:

    • 时间戳:事件发生的时间
    • Watermark:算子通过Watermark推断当前的事件时间。Watermark用于通知算子没有比水位更小的时间戳的事件会发生了。

    基于时间的窗口会根据事件时间将一个数据分配给某个窗口。每个时间窗口都有一个 开始时间戳结束时间戳
    所有内置的窗口分配器都会提供一个默认的触发器,一旦时间超过某个窗口的结束时间,触发器就会触发对这个窗口的计算。
    触发过程见示例:https://juejin.im/post/5bf95810e51d452d705fef33

    参考:
    event-time:https://flink.sojb.cn/dev/event_time.html

    flink watermark的原理和实践:https://www.jianshu.com/p/7d524ef8143c

    Flink学习笔记:Time的故事:https://www.cnblogs.com/dajiangtai/p/10697318.html

    State状态管理

    State是指流计算过程中计算节点的中间计算结果或元数据属性,比如 在aggregation过程中要在state中记录中间聚合结果,比如 Apache Kafka 作为数据源时候,我们也要记录已经读取记录的offset,这些State数据在计算过程中会进行持久化(插入或更新)。所以Apache Flink中的State就是与时间相关的,Apache Flink任务的内部数据(计算数据和元数据属性)的快照。

    按组织形式的划分

    • Managed State,这类State的内部结构完全由Flink runtime内部来控制,包括如何将它们编码写入到checkpoint中等等。
    • Raw State,这类State就比较显得灵活一些,它们被保留在操作运行实例内部的数据结构中。从Flink系统角度来观察,在checkpoint时,它只知道的是这些状态数据是以连续字节的形式被写入checkpoint中。等待进行状态恢复时,又从字节数据反序列化为状态对象。

    Managed State可以在所有的data stream相关方法中被使用,官方也是推荐优先使用这类State,因为它能被Flink runtime内部做自动重分布而且能被更好地进行内存管理。

    按照数据的划分和扩张方式

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    参考:
    Apache Flink 漫谈系列 - State:https://www.codercto.com/a/32411.html
    Flink 的状态管理和检查点机制:http://blog.jrwang.me/2017/flink-state-checkpoint

    Checkpoint容错机制

    Checkpoint是Flink实现容错机制最核心的功能,它能够根据配置周期性地基于Stream中各个Operator/task的状态来生成一个轻量级的分布式快照,从而将这些状态数据定期持久化存储下来,当Flink程序一旦意外崩溃时,重新运行程序时可以有选择地从这些快照进行恢复,从而修正因为故障带来的程序数据异常。

    默认情况下,检查点不会保存,仅用于从失败中恢复作业。取消程序时会删除它们。但是,您可以配置要保存的定期检查点。根据配置 ,当作业失败或取消时,不会自动清除这些保存的检查点。这样,如果您的工作失败,您将有一个检查点可以从中恢复。

    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.enableCheckpointing(1000);
    
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    
    

    Flink的失败恢复依赖于“检查点机制+可部分重发的数据源”。

    Flink 实现了一个轻量级的分布式快照机制,其核心点在于 Barrier。 Coordinator 在需要触发检查点的时候要求数据源注入向数据流中注入 barrie, barrier 和正常的数据流中的消息一起向前流动,相当于将数据流中的消息切分到了不同的检查点中。当一个 operator 从它所有的 input channel 中都收到了 barrier,则会触发当前 operator 的快照操作,并向其下游 channel 中发射 barrier。当所有的 sink 都反馈收到了 barrier 后,则当前检查点创建完毕。
    在这里插入图片描述

    在此过程中会涉及到对齐操作,一些 operator 拥有多个 input channel,它往往不会同时从这些 channel 中接收到 barrier。如果 Operator 继续处理 barrier 先到达的 channel 中的消息,那么在所有 channel 的 barrier 都到达时,operator 就会处于一种混杂的状态。在这种情况下,Flink 采用对齐操作来保证 Exactly Once 特性。Operator 会阻塞 barrier 先到达的 channel,通常是将其流入的消息放入缓冲区中,待收到所有 input channel 的 barrier 后,进行快照操作,释放被阻塞的 channel,并向下游发射 barrier。
    在这里插入图片描述
    Barries 对齐过程:
    (1). 一旦operator从输入流接收到快照barrier n,它就不能处理来自该流的任何其他记录,直到它从其他输入接收到barrier n为止。 否则,它会混合属于快照n的记录和属于快照n + 1的记录。
    (2). 包含barrier n的流数据暂时被Operator搁置。 从这些流接收的记录不会被处理,而是放入输入缓冲区。
    (3). 一旦最后一个流接收到屏障n,Operator就会向下一个Operator发出所有挂起的流数据,然后自己发出快照n个屏障。
    (4). 之后,它将继续处理来自所有输入流的记录,在处理来自流的记录之前,会优先处理来自输入缓冲区的记录。
    对齐操作会对流处理造成延时,但通常不会特别明显。如果应用对一致性要求比较宽泛的话,那么也可以选择跳过对齐操作。这意味着快照中会包含一些属于下一个检查点的数据,这样就不能保证 Exactly Once 特性,而只能降级为 At Least Once。

    Checkpoint的执行流程:
    Checkpoint的执行流程是按照Chandy-Lamport算法实现的。

    在这里插入图片描述

    后端状态存储方式

    在有状态的流处理中,当开发人员启用了 Flink 中的 checkpoint 机制,那么状态将会持久化以防止数据的丢失并确保发生故障时能够完全恢复。选择何种状态后端,将决定状态持久化的方式和位置。

    Flink 提供了三种可用的状态后端:MemoryStateBackendFsStateBackendRocksDBStateBackend

    
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    
    

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    参考:

    如何选择状态后端:http://wuchong.me/blog/2018/11/21/flink-tips-how-to-choose-state-backends

    Savepoint保存点

    Savepoint 是命令触发的 Checkpoint,对流式程序做一次完整的快照并将结果写到 State backend,可用于停止、恢复或更新 Flink 程序。整个过程依赖于 Checkpoint 机制。另一个不同之处是,Savepoint 不会自动清除。

    分配算子ID

    Savepoint 中会以 Operator ID 作为 key 保存每个有状态算子的状态。

    Operator ID 用于确定每个算子的状态,只要ID不变,就可以从 Savepoint 中恢复,Operator ID 如果不显示指定会自动生成,生成的ID取决于程序的结构,并且对程序更改很敏感。

    
    DataStream<String> stream = env.
    
      // Stateful source (e.g. Kafka) with ID
    
      .addSource(new StatefulSource())
    
      .uid("source-id") // ID for the source operator
    
      .shuffle()
    
      // Stateful mapper with ID
    
      .map(new StatefulMapper())
    
      .uid("mapper-id") // ID for the mapper
    
      // Stateless printing sink
    
      .print(); // Auto-generated ID
    
    

    Savepoint 包含了两个主要元素:

    • 1、首先,Savepoint 包含了一个目录,其中包含(通常很大的)二进制文件,这些文件表示了整个流应用在 Checkpoint/Savepoint 时的状态。
    • 2、以及一个(相对较小的)元数据文件,包含了指向 Savapoint 各个文件的指针,并存储在所选的分布式文件系统或数据存储中。

    何时使用 Savepoint ?
    虽然流式应用程序处理的数据是持续地生成的(“运动中”的数据),但是存在着想要重新处理之前已经处理过的数据的情况。Savepoint 可以在以下情况下使用:

    • 部署流应用的一个新版本,包括新功能、BUG 修复、或者一个更好的机器学习模型
    • 引入 A/B 测试,使用相同的源数据测试程序的不同版本,从同一时间点开始测试而不牺牲先前的状态
    • 在需要更多资源时扩容应用程序
    • 迁移流应用程序到 Flink 的新版本上,或者迁移到另一个集群

    如何使用?
    savepoint是有用户手动管理的,常用操作包含:

    • 保存Savepoint
      $ bin/flink savepoint :jobId [:targetDirectory]
      这将触发具有ID的作业的保存点:jobId,并返回创建的保存点的路径。您需要此路径来还原和部署保存点。

    • 在yarn 集群中保存Savepoint
      $ bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
      这将触发具有ID :jobId和YARN应用程序ID 的作业的保存点:yarnAppId,并返回创建的保存点的路径。

    • 使用 Savepoint 取消job
      $ bin/flink cancel -s [:targetDirectory] :jobId
      这将以原子方式触发具有ID的作业的保存点:jobid并取消作业。此外,您可以指定目标文件系统目录以存储保存点。该目录需要可由JobManager和TaskManager访问。

    • Resuming Savepoint
      $ bin/flink run -s :savepointPath [:runArgs]
      这将提交作业并指定要从中恢复的保存点。您可以指定保存点目录或_metadata文件的路径。

    • 允许未恢复状态启动

    $ bin/flink run -s :savepointPath -n [:runArgs]

    默认情况下,resume操作将尝试将保存点的所有状态映射回要恢复的程序。如果删除了运算符,则可以通过–allowNonRestoredState(short -n:)选项跳过无法映射到新程序的状态

    • 删除Savepoint

    $ bin/flink savepoint -d :savepointPath

    通过指定路径删除 Savepoint,也可以通过文件系统手动删除 Savepoint 数据,而不会影响其他 Savepoint 或 Checkpoint。

    Savepoint 和 Checkpoint

    在这里插入图片描述

    参考:
    Savepoint 和 Checkpoint 的 3 个不同点:http://wuchong.me/blog/2018/11/25/flink-tips-differences-between-savepoints-and-checkpoints/
    Flink 专题 -2 Checkpoint、Savepoint 机制:https://yq.aliyun.com/articles/665758?spm=a2c4e.11153940.0.0.143e64c01hVKeN

    Flink部署与运行

    Yarn运行Flink作业

    link支持多种部署模式:本地、集群(Standalone/YARN)、云(GCE/EC2)。Standalone部署模式与Spark类似,这里,我们看一下Flink on YARN的部署模式,如下图所示:
    在这里插入图片描述

    实际Flink也实现了满足在YARN集群上运行的各个组件:Flink YARN Client负责与YARN RM通信协商资源请求,Flink JobManager和Flink TaskManager分别申请到Container去运行各自的进程。通过上图可以看到,YARN AM与Flink JobManager在同一个Container中,这样AM可以知道Flink JobManager的地址,从而AM可以申请Container去启动Flink TaskManager。待Flink成功运行在YARN集群上,Flink YARN Client就可以提交Flink Job到Flink JobManager,并进行后续的映射、调度和计算处理。
    在YARN上启动一个Flink主要有两种方式:(1)、启动一个YARN session(Start a long-running Flink cluster on YARN);(2)、直接在YARN上提交运行Flink作业(Run a Flink job on YARN)。
    在这里插入图片描述

    Flink YARN Session

    在这里插入图片描述

    首先,看下yarn-session.sh脚本参数

    yarn-session.sh脚本参数
    用法:
         必须:
              -n,--container <arg> 要分配的YARN容器数(=任务管理器数)
         可选的
              -D <property=value> 使用给定属性的值
              -d,--detached 如果存在,则以分离模式运行作业,不启动客户端进程,不打印YARN返回信息
              -h,--help 
              -id,--applicationId <arg> 附加到正在运行的YARN会话
              -j,--jar <arg> Flink jar文件的路径
              -jm,--jobManagerMemory <arg> 具有可选单元的JobManager容器的内存(默认值:MB)
              -m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到指定地址的JobManager
         配置中的:
              -n,--container <arg> 要分配的YARN容器数(=任务管理器数)
              -nl,--nodeLabel <arg> 为YARN应用程序指定YARN节点标签
              -nm,--name <arg> 在YARN上为应用程序设置自定义名称
              -q,--query 显示可用的YARN资源(内存,内核)
              -qu,--queue <arg> 指定YARN队列
              -s,--slots <arg> 每个TaskManager的槽
              -sae,--shutdownOnAttachedExit 如果作业以附加模式提交,请在CLI突然终止时执行尽力而为的群集关闭,例如,响应用户中断,
                                                                                   例如键入Ctrl + C.
              -st,--streaming 流模式启动flink
              -t,--ship <arg> 在指定目录中发送文件(t用于传输)
              -tm,--taskManagerMemory <arg> 没taskmanager内存数
              -yd,--yarndetached 如果存在,则以分离模式运行作业(不建议使用;请改为使用非YARN特定选项)
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    

    在启动的是可以指定TaskManager的个数以及内存(默认是1G),也可以指定JobManager的内存,但是JobManager的个数只能是一个。好了,我们开启动一个YARN session:
    ./bin/yarn-session.sh -n 10 -tm 8192 -s 32
    上面命令启动了10个TaskManager,每个管理器具有8 GB内存和32个处理插槽(是每个TaskManager,默认是1个核)。
    注:以上命令实际启动了11个容器(即使只请求了10个容器),因为ApplicationMaster和Job Manager还有一个额外的容器。
    上述命令一直在终端中运行着的,此时可以通过停止unix进程(使用CTRL + C)或在客户端输入“stop”来停止yarn session。
    如果想启动一个后台运行的yarn session。使用这个参数:-d 或者 --detached 在这种情况下,flink yarn client将会只提交任务到集群然后关闭自己。
    附着到一个已存在的flink yarn session,可以用./bin/yarn-session.sh -id <applicationId>
    如果关闭一个已存在的flink yarn session,可以用yarn application -kill <applicationId>

    启动了YARN session之后我们如何运行作业呢?很简单,我们可以使用./bin/flink脚本提交作业,同样我们来看看这个脚本支持哪些参数:

    flink 命令脚本参数说明
    
    操作“run”编译并运行程序。
    
       run 操作选项
              -c,--class <classname> 具有程序入口点的类“main”方法或“getPlan()”方法。仅在JAR文件未在其清单中指定类时才需要。
              -C,--classpath <url> 向集群中所有节点上的每个用户代码类加载器添加URL。路径必须指定协议(例如file://)并且可以在所有节点上访问
                                                                                                    (例如,通过NFS共享)。您可以多次使用此选项来指定多个URL。该协议必须由{@link java.net.URLClassLoader}支持。
              -d,--detached 如果存在,则以分离模式运行作业(不启动客户端,集群提交方式,不在客户端打印返回信息)
              -n,--allowNonRestoredState 允许跳过无法恢复的保存点状态。如果在触发保存点时从程序中删除了作为程序一部分的运算符,则需要允许此操作。
              -p,--parallelism <parallelism> 运行程序的并行性。可选标志,用于覆盖配置中指定的默认值。
              -q,--sysoutLogging 如果存在,则将日志记录输出抑制为标准输出
              -s,--fromSavepoint <savepointPath> 保存点的路径,用于从中恢复作业(例如hdfs:///flink/savepoint-1537)。
              -sae,--shutdownOnAttachedExit 如果作业以附加模式提交,请在CLI突然终止时执行尽力而为的群集关闭,例如,响应用户中断,例如键入Ctrl + C.
       YARN集群模式选项:
              -d,--detached 如果存在,则以分离模式运行作业
              -m,--jobmanager <arg> 要连接的JobManager(主站)的地址。使用此标志连接到与配置中指定的JobManager不同的JobManager。
              -yD <property=value> 使用给定属性的值
              -yd,--yarndetached 如果存在,则以分离模式运行作业(不建议使用;请改为使用非YARN特定选项)
              -yh,--yarnhelp yarn session cli帮助(“-yh”不是有效的操作)
              -yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话
              -yj,--yarnjar <arg> Flink jar文件的路径
              -yjm,--yarnjobManagerMemory <arg> 有可选单元的JobManager容器的内存(默认值:MB)
              -yn,--yarncontainer <arg> 要分配的YARN容器数(=任务管理器数)
              ynl,--yarnnodeLabel <arg> 为YARN应用程序指定YARN节点标签
              -ynm,--yarnname <arg> 在YARN上为应用程序设置自定义名称
              -yq,--yarnquery 显示可用的YARN资源(内存,内核)
              -yqu,--yarnqueue <arg> 指定YARN队列
              -ys,--yarnslots <arg> 每个TaskManager的插槽数
              -yst,--yarnstreaming 以流模式启动Flink
              -yt,--yarnship <arg> 在指定目录中发送文件(t用于传输)
              -ytm,--yarntaskManagerMemory <arg> 具有可选单元的每个TaskManager容器的内存(默认值:MB)
              -yz,--yarnzookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
       默认模式的选项:
              -m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置文件中指定的JobManager不同的JobManager。
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    
    操作“info”显示程序的优化执行计划(JSON)。
    
    
    语法:info [OPTIONS] <jar-file> <arguments>
    “info”动作选项: 
              -c,--class <classname> 具有程序入口点的类(“main”方法或“getPlan()”方法。仅在JAR文件未在其清单中指定类时才需要。
    -p,--parallelism <parallelism> 运行程序的并行性。 可选标志,用于覆盖配置中指定的默认值。
    
    
    操作“list”列出了运行和计划的程序。 
    语法: list [OPTIONS]
    "list" 操作选项
              -r,--running 仅显示正在运行的程序及其JobID
              -s,--scheduled Show only scheduled programs and their JobIDs
    yarn-cluster 模式选项
              -m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置中指定的JobManager不同的JobManager。
              -yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    默认模式的选项:
              -m,--jobmanager <arg>
              -z,--zookeeperNamespace <arg>
    
    操作“stop”会停止正在运行的程序(仅限流式处理作业)。
    语法:stop [OPTIONS] <Job ID>
    "stop"操作选项:
    yarn-cluster 模式选项
              -m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址
              -yid,--yarnapplicationId <arg> 追加到指定的yarn容器
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    默认选项
              -m,--jobmanager <arg>
              -z,--zookeeperNamespace <arg>
    
    操作“cancel”取消正在运行的程序。 
    语法:cancel [OPTIONS] <Job ID>
    "cancel" 操作选项
              -s,--withSavepoint <targetDirectory> 触发保存点并取消作业。 目标目录是可选的。 如果未指定目录,则使用配置的缺省目录(state.savepoints.dir)。
              yarn-cluster 模式选项
              -m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址
              -yid,--yarnapplicationId <arg> 追加到指定的yarn容器
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    默认模式的选项:
              -m,--jobmanager <arg>
              -z,--zookeeperNamespace <arg>
    
    
    
    操作"savepoint" 触发正在运行的作业的保存点或处置现有作业。
    语法:savepoint [OPTIONS] <Job ID> [<target directory>]
    "savepoint"操作选项
              -d,--dispose <arg> 处置的保存点的路径。
              -j,--jarfile <jarfile> flink程序jar文件
    yarn-cluster 模式选项
              -m,--jobmanager <arg> 要连接的JobManager(主站)的地址。 使用此标志连接到与配置中指定的JobManager不同的JobManager。
              -yid,--yarnapplicationId <arg> 附加到正在运行的YARN会话
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    默认模式的选项:
              -m,--jobmanager <arg>
              -z,--zookeeperNamespace <arg>
    
    
    操作"modify"修改正在运行的作业(例如,并行性的改变)。
    语法:modify <Job ID> [OPTIONS]
    "modify" 操作选项
              -h,--help 
              -p,--parallelism <newParallelism> 指定作业的新并行性。
              -v,--verbose 不推荐使用此选项。
    yarn-cluster 模式选项
              -m,--jobmanager <arg> 指定需要操作的非默认的jobmanager地址
              -yid,--yarnapplicationId <arg> 追加到指定的yarn容器
              -z,--zookeeperNamespace <arg> 命名空间,用于为高可用性模式创建Zookeeper子路径
    默认模式的选项:
              -m,--jobmanager <arg>
              -z,--zookeeperNamespace <arg>
    

    可以自动获取到YARN session的地址,然后我们以WordCount程序启动程序:

    ./bin/flink run ./examples/batch/WordCount.jar \
                              --input hdfs:///user/iteblog/LICENSE \
                              --output hdfs:///user/iteblog/result.txt  
    

    Run a single Flink job on YARN(推荐)

    在这里插入图片描述
    我们也可以不需要事先启动YARN session,而直接启动一个Flink作业,在这个作业运行完session也就结束了。

    #命令行启动示例:
    ./bin/flink run -m yarn-cluster -yn 2 ./examples/batch/WordCount.jar \
                          --input hdfs:///user/iteblog/LICENSE \
                          --output hdfs:///user/iteblog/result.txt
    

    上面的命令同样会启动一个类似于YARN session启动的页面。其中的-yn是指TaskManager的个数,必须指定。

    Standalone部署

    一般用于开发环境

    在这里插入图片描述

    参考:Flink从入门到放弃(入门篇2)-本地环境搭建&构建第一个Flink应用.md

    Storm、Spark-Streaming和Flink对比

    storm spark streaming flink
    流模型 原生流 微批次 原生流
    延迟 毫秒 毫秒
    消息处理 At least once exactly once exactly once
    消息容错 记录&ack 基于RDD的checkpoint checkpoint(基于分布式快照)
    状态管理 非内置 专有的DStream 带状态的操作
    吞吐量
    API Low level High level High level
    成熟度 高(工业标准) 高(正当时) 低(新兴)
    代码贡献量 378 1400 543
    Beam Runner not support support support

    参考:

    流计算框架 Flink 与 Storm 的性能对比:https://tech.meituan.com/2017/11/17/flink-benchmark.html

    Demo演示(SocketTextStreamWordCount)

    我们使用 Flink 自带的 examples 包中的 SocketTextStreamWordCount,这是一个从 socket 流中统计单词出现次数的例子。(如果仅仅是演示效果,在Standalone模式下即可),假设flink的安装目为FLINK_HOME

    SocketTextStreamWordCount 的具体代码如下:(flink-1.0.x 版本,最新示例点击)

    
    public static void main(String[] args) throws Exception {
    
      // 检查输入
    
      final ParameterTool params = ParameterTool.fromArgs(args);
    
      ...
    
    
    
      // set up the execution environment
    
      final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    
    
      // get input data
    
      DataStream<String> text =
    
          env.socketTextStream(params.get("hostname"), params.getInt("port"), '\n', 0);
    
    
    
      DataStream<Tuple2<String, Integer>> counts =
    
          // split up the lines in pairs (2-tuples) containing: (word,1)
    
          text.flatMap(new Tokenizer())
    
              // group by the tuple field "0" and sum up tuple field "1"
    
              .keyBy(0)
    
              .sum(1);
    
      counts.print();
    
      
    
      // execute program
    
      env.execute("WordCount from SocketTextStream Example");
    
    }
    
    

    1、首先,使用 netcat 启动在终端中输入:

    nc -l 9000

    2、提交Flink作业

    cd FLINK_HOME,直接使用example中的SocketTextStreamWordCount.jar即可。

    ./bin/flink run examples/streaming/SocketTextStreamWordCount.jar --port 9000

    3、输入单词并查看结果

    在第1步的netcat中输入单词(多个单词用空格隔开)

    新开终端,cd FLINK_HOME,执行tail -f flink*.out,可以实时查看执行结果

    另外,日志可可以在web ui上直接查看

    参考:https://flink.sojb.cn/tutorials/local_setup.html

    Fink-Startup

    maven创建初始工程

    Flink Maven Archetype 来创建我们的项目结构和一些初始的默认依赖。在你的工作目录下,运行如下命令来创建项目:

    
    mvn archetype:generate \
    
          -DarchetypeGroupId=org.apache.flink \
    
          -DarchetypeArtifactId=flink-quickstart-java \
    
          -DarchetypeCatalog=https://repository.apache.org/content/repositories/snapshots/ \
    
          -DarchetypeVersion=1.7-SNAPSHOT
    
    

    当然也可以用以下命令

    curl https://flink.apache.org/q/quickstart-SNAPSHOT.sh | bash -s 1.7-SNAPSHOT

    这样一个工程就构建好了

    参考:

    https://flink.sojb.cn/dev/projectsetup/java_api_quickstart.html

    https://flink.sojb.cn/tutorials/datastream_api.html

    https://flink.sojb.cn/dev/projectsetup/dependencies.html

    参考

    Apache Flink官方文档(英文):https://ci.apache.org/projects/flink/flink-docs-stable/
    Ververica【推荐】:https://ververica.cn/
    Apache Flink中文文档:https://flink.sojb.cn
    github:https://github.com/apache/flink
    flink-forward-china-2018: https://github.com/flink-china/flink-forward-china-2018
    flink-training-course:https://github.com/flink-china/flink-training-course
    God-Of-BigData:https://github.com/wangzhiwubigdata/God-Of-BigData/tree/master/Flink
    Flink China:https://zh.ververica.com
    一文了解 Apache Flink 核心技术:http://wuchong.me/blog/2018/11/09/flink-tech-evolution-introduction/
    深入理解Apache Flink核心技术:https://www.toutiao.com/a6254143247988293890
    Apache Flink状态管理和容错机制介绍:https://www.iteblog.com/archives/2417.html
    Flink Improvement Proposals:https://cwiki.apache.org/confluence/display/FLINK/Flink+Improvement+Proposals
    Flink on YARN部署快速入门指南:https://www.iteblog.com/archives/1620.html
    Flink流计算编程–Flink扩容、程序升级前后的思考:https://blog.csdn.net/lmalds/article/details/73457767

    展开全文
  • Flink 基本工作原理

    万次阅读 多人点赞 2018-06-30 18:11:43
    Flink是新的stream计算引擎,用java实现。既可以处理stream data也可以处理batch data,可以同时兼顾Spark以及Spark streaming的功能,与Spark不同的是,Flink本质上只有stream的概念,batch被认为是special stream...

          Flink是新的stream计算引擎,用java实现。既可以处理stream data也可以处理batch data,可以同时兼顾Spark以及Spark streaming的功能,与Spark不同的是,Flink本质上只有stream的概念,batch被认为是special stream。Flink在运行中主要有三个组件组成,JobClient,JobManager 和 TaskManager。主要工作原理如下图   

     

    用户首先提交Flink程序到JobClient,经过JobClient的处理、解析、优化提交到JobManager,最后由TaskManager运行task。

    JobClient

    JobClient是Flink程序和JobManager交互的桥梁,主要负责接收程序、解析程序的执行计划、优化程序的执行计划,然后提交执行计划到JobManager。为了了解Flink的解析过程,需要简单介绍一下Flink的Operator,在Flink主要有三类Operator,

    • Source Operator ,顾名思义这类操作一般是数据来源操作,比如文件、socket、kafka等,一般存在于程序的最开始
    • Transformation Operator 这类操作主要负责数据转换,map,flatMap,reduce等算子都属于Transformation Operator,
    • Sink Operator,意思是下沉操作,这类操作一般是数据落地,数据存储的过程,放在Job最后,比如数据落地到Hdfs、Mysql、Kafka等等。 
    Flink会将程序中每一个算计解析成Operator,然后按照算子之间的关系,将operator组合起来,形成一个Operator组合成的Graph。如下面的代码解析之后形成的执行计划,
    DataStream<String> data = env.addSource(...);
    data.map(x->new Tuple2(x,1)).keyBy(0).timeWindow(Time.seconds(60)).sum(1).addSink(...)
    解析形成执行计划之后,JobClient的任务还没有完,还负责执行计划的优化,这里执行的主要优化是将相邻的Operator融合,形成OperatorChain,因为Flink是分布式运行的,程序中每一个算子,在实际执行中被分隔为多个SubTask,数据流在算子之间的流动,就对应到SubTask之间的数据传递,SubTask之间进行数据传递模式有两种一种是one-to-one的,数据不需要重新分布,也就是数据不需要经过IO,节点本地就能完成,比如上图中的source到map,一种是re-distributed,数据需要通过shuffle过程重新分区,需要经过IO,比如上图中的map到keyBy。显然re-distributed这种模式更加浪费时间,同时影响整个Job的性能。所以,Flink为了提高性能,将one-to-one关系的前后两类subtask,融合形成一个task。而TaskManager中一个task运行一个独立的线程中,同一个线程中的SubTask进行数据传递,不需要经过IO,不需要经过序列化,直接发送数据对象到下一个SubTask,性能得到提升,除此之外,subTask的融合可以减少task的数量,提高taskManager的资源利用率。图1.0中的执行计划,优化结果如下图,Flink的subTask融合规则可以参考官方文档。
    • 值得注意的是,并不是每一个SubTask都可以被融合,对于不能融合的SubTask会独立形成一个Task运行在TaskManager中。
    • 改变operator的并行度,可能会导致不同的优化结果,同时这也是性能调优的一个重要方式,例如不显式设置operator的并行度的时候,默认所有算子的并行度是一样的,所以会有下图中的优化结果。
    我们来分析一下默认情况下可能发生的问题,假如设置作业的并行度为10,source明确为kafka,对应topic只有一个topic,因为source默认会根据topic的分区数,决定自己的分区数,那么10个source subtask只有一个会工作,而且任务比较重。这样会导致后面的map实际也是有一个subTask在工作,处理所有的数据,假如map中的任务比较重,那么会导致数据倾斜,性能低下。在source不能改造的情况下,我们显式减少source的并行度(为了节省资源,设置1),提高map的并行度(增加处理速度,设为20)。第一眼看上去,感觉性能提升了不少,但是在实际情况中却不一定这样。因为调整source和map的并发度,失去了原有one-to-one数据传递的优势,导致subTask不能融合,数据需要reblance,产生大量的IO,所以修改并行度也不一定可以提升性能。修改并行度之后,执行计划的优化结果如下图。所以在实际优化的过程中,还是要注意结合数据分布和执行计划调优,理解Flink执行计划的生成过程很有必要。


    JobManager

    JobManager是一个进程,主要负责申请资源,协调以及控制整个job的执行过程,具体包括,调度任务、处理checkpoint、容错等等,在接收到JobClient提交的执行计划之后,针对收到的执行计划,继续解析,因为JobClient只是形成一个operaor层面的执行计划,所以JobManager继续解析执行计划(根据算子的并发度,划分task),形成一个可以被实际调度的由task组成的拓扑图,如上图被解析之后形成下图的执行计划,最后向集群申请资源,一旦资源就绪,就调度task到TaskManager。




    为了保证高可用,一般会有多个JobManager进程同时存在,它们之间也是采用主从模式,一个进程被选举为Leader,其他进程为follower。Job运行期间,只有Leader在工作,follower在闲置,一旦Leader挂掉,随即引发一次选举,产生新的Leader继续处理Job。JobManager除了调度任务,另外一个主要工作就是容错,主要依靠checkpoint进行容错,checkpoint其实是stream以及executor(TaskManager中的Slot)的快照,一般将checkpoint保存在可靠的存储中(比如hdfs),为了容错Flink会持续建立这类快照。当Flink作业重新启动的时候,会寻找最新可用的checkpoint来恢复执行状态,已达到数据不丢失,不重复,准确被处理一次的语义。一般情况下,都不会用到checkpoint,只有在数据需要积累或处理历史状态的时候,才需要设定checkpoint,比如updateStateByKey这个算子,默认会启用checkpoint,如果没有配置checkpoint目录的话,程序会抛异常。


    TaskManager

    TaskManager是一个进程,及一个JVM(Flink用java实现)。主要作用是接收并执行JobManager发送的task,并且与JobManager通信,反馈任务状态信息,比如任务分执行中,执行完等状态,上文提到的checkpoint的部分信息也是TaskManager反馈给JobManager的。如果说JobManager是master的话,那么TaskManager就是worker主要用来执行任务。在TaskManager内可以运行多个task。多个task运行在一个JVM内有几个好处,首先task可以通过多路复用的方式TCP连接,其次task可以共享节点之间的心跳信息,减少了网络传输。TaskManager并不是最细粒度的概念,每个TaskManager像一个容器一样,包含一个多或多个Slot,如图1.2。



    Slot是TaskManager资源粒度的划分,每个Slot都有自己独立的内存。所有Slot平均分配TaskManger的内存,比如TaskManager分配给Solt的内存为8G,两个Slot,每个Slot的内存为4G,四个Slot,每个Slot的内存为2G,值得注意的是,Slot仅划分内存,不涉及cpu的划分。同时Slot是Flink中的任务执行器(类似Storm中Executor),每个Slot可以运行多个task,而且一个task会以单独的线程来运行。Slot主要的好处有以下几点:
    1. 可以起到隔离内存的作用,防止多个不同job的task竞争内存。
    2. Slot的个数就代表了一个Flink程序的最高并行度,简化了性能调优的过程
    3. 允许多个Task共享Slot,提升了资源利用率,举一个实际的例子,kafka有3个partition,对应flink的source有3个task,而keyBy我们设置的并行度为20,这个时候如果Slot不能共享的话,需要占用23个Slot,如果允许共享的话,那么只需要20个Slot即可(Slot的默认共享规则计算为20个)。
    共享Slot,虽然在flink中允许task共享Slot提升资源利用率,但是如果一个Slot中容纳过多task反而会造成资源低下(比如极端情况下所有task都分布在一个Slot内),在Flink中task需要按照一定规则共享Slot。共享Slot的方式有两种,SlotShardingGroup和CoLocationGroup,CoLocationGroup这种方式目前还没有接触过,如果感兴趣可以查阅官方文档。下面主要介绍一下SlotShardingGroup的用法,这种共享的基本思路就是给operator分组,同一组的不同operator的task,可以共享一个Slot。默认所有的operator属于同一个组“default”,及所有operator的task可以共享一个Slot,可以给operator设置不同的group,防止不合理的共享。Flink在调度task分配Slot的时候有两个重要原则:
    • 同一个job中,同一个group中不同operator的task可以共享一个Slot
    • Flink是按照拓扑顺序从Source依次调度到Sink的
    还拿上述的例子来说明Slot共享以及task调度的原理,如图1.3假设有两个TaskManager(TaskManager1和TaskManager2),每个TaskManager有2个Slot(Slot1和Slot2)。为了方便理解Slot共享的流程需要提前定义operator的并发度,来决定task的调度顺序。假设source/map的并发度为2,keyBy/window/sink的并发度为4,那么调度的顺序依次为source/map[1] ->source/map[2] ->keyBy/window/sink[1]->keyBy/window/sink[2]->keyBy/window/sink[3]->keyBy/window/sink[4]。如图1.3为了便于说明流程,将source/map的并发度设为4,keyBy/window/sink的并发度设为4。那么首先分配task source/map[1],这个时候Slot中还没有task,分配到TaskManager1中,然后分配 source/map[2],根据Slot共享规则,source/map[1]和source/map[2] 属于同一operator的不同task,所以source/map[2]不能共享Slot1,被分配到TaskManager1的Slot2,source/map[3]和source/map[4]同样会依次分配不同的Slot,接下来分配keyBy/window/sink[1],根据Slot共享规则,它可以和source/map[1],共享同一个slot,所以也被分配到TaskManager1的Slot1中,keyBy/window/sink的其他任务依次被分配到不同Slot中。图1.4为并行度分别设置为2和4的分配过程,这里不再展开说明。
       

    总结

           上述内容,主要介绍了,Flink的基本架构以及Flink执行的基本原理,重点说明了Flink实现高性能的一些基本原理,因为写的比较匆忙,如有错误之处,欢迎大家评论指正。

    参考资料

    • https://ci.apache.org/projects/flink/flink-docs-master/internals/job_scheduling.html?spm=a2c4e.11153940.blogcont64819.14.4cc928ce5F2w98
    • https://ci.apache.org/projects/flink/flink-docs-master/concepts/programming-model.html
    • https://yq.aliyun.com/articles/64819
    • https://blog.csdn.net/lisi1129/article/details/54844919
    • Learning Apache Flink
    展开全文
  • Flink大数据项目实战

    千人学习 2019-04-22 15:44:31
    本课程基于某电商公司运营实时...通过本课程的学习,既能获得Flink企业级真实项目经验,也能深入掌握Flink的核心理论知识,还能获得Flink在生产环境中安装、部署、监控的宝贵经验,从而一站式全面、深入掌握Flink技术。
  • Flink原理及架构(一)

    千次阅读 2018-08-16 23:33:37
    Flink核心是一个流式的数据流执行引擎,并且能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用 一.Flink介绍   Flink核心是一个流式的数据流执行引擎,并且能够基于同一个Flink运行时,提供...
      Flink核心是一个流式的数据流执行引擎,并且能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用

    一.Flink介绍

      Flink核心是一个流式的数据流执行引擎,并且能够基于同一个Flink运行时,提供支持流处理和批处理两种类型应用。其针对数据流的分布式计算提供了数据分布,数据通信及容错机制等功能。基于流执行引擎,Flink提供了跟多高抽象层的API便于用户编写分布式任务。下面介绍常见的几种API;
      DataSet API: 对静态数据进行批处理作业,将静态数据抽象成分布式的数据集,用户可以方便的使用Flink提供的各种操作符对分布式数据集进行处理,支持Java,Scala和python;
      DataStream API:对数据流进行流处理作业,将流式的数据抽象成分布式的数据流,用户可以方面的对分布式数据流进行各种操作,支持Java,scala和python;
      Table API:对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过SQL的DSL对关系表进行各种查询操作,支持Java和Scala;
      SQL: SQL查询是使用TableEnvironment的sqlquery()方法执行的,该方法以SQL的形式返回SQL查询的结果。Table可以在后续的SQL和Table API查询中使用,可以转换诶DataSet和DataStream,也可以写入TableSink。SQL和Table API可以无缝的整合,进行整体优化并转换为单个程序。要访问SQL中查询的表,必须在TableEnvironment中注册他,可以从TableSource,Table,DataStream和DataSet注册表,用户也可以在TableEnvironment中注册外部目录以制定数据源的位置。Blink开源后,将使Flink SQL更加完善稳定。
      StateFul Stream Processing:最低级抽象只提供有状态流,通过Process Function嵌入到DataStream API中,它允许用户自由处理来自一个或者多个流的时间,并使用一致的容错状态,此外用户可以注册event time和processing time回调,允许程序实现复杂的计算。

    下面是Flink的基础架构图:
    这里写图片描述
      从上图可知,Flink从另一个角度看待流处理和批处理,将2者统一起来。Flink是完全支持流处理,也就是说流处理看待数据是无界限的,批处理作为流处理的一种特殊情况,数据只是被定义为有界的。
      Flink可以与Hadoop集成,可以方便读取Hadoop项目中的组件数据,如hive,hdfs及Hbase等。以kafka作为流式的数据源,直接可以重用storm代码。

    二.Flink流处理特性

    1 支持高吞吐,低延迟,高性能的流处理;
    2.支持带有事件事件窗口操作;
    3.支持有状态计 算的exactly-once语义;
    4.支持高度灵活的window操作;
    5.支持具有backPressure功能的持续流模型;
    6.支持具有轻量级分布式快照实现的容错;
    7.一个运行时同时支持Batch on streaming和streaming处理;
    8.Flink在JVM内部实现了自己的内存管理;
    9.支持迭代计算;
    10.支持程序自动优化:避免特定情况下Shuffer,排序等昂贵操作,中间结果又必要进行缓存;
    

    三. 批处理与流处理统一

       在大数据领域,批处理任务和流处理任务是2中不同的任务,一个大数据系统一般被设计为只能处理其中的一个任务。如storm,spark streaming只支持流处理。而mapreduce,spark只支持批处理。 而spark streaming采用了micro-batch的架构,所以spark streaming还是基于spark批处理对流式数据进行处理。而storm,flink通过其灵活的执行引擎,且Flink能同时支持批处理任务和流处理任务。

      Flink以固定的缓存块为单位进行网络数据传输,如果缓存块的超时值为0,则Flink的数据传输方式类似于上下文提到的流式处理系统模型,此时系统可以获取低延迟的要求;如果缓存块的超时值无限大,则Flink的数据传输方式类似于上下文提到的批处理系统模型;

      总结的一点是:如果缓存块的阀值越小,那么Flink流处理系统的延迟就越低,吞吐量也越低;反之亦然;

      在统一的流式处理引擎上,Flink同时支持了流计算和批处理,并对系能(延迟,吞吐量等)有所保证,相对于其他原生的流处理和批处理刺痛,并没有因为统一执行引擎而受到影响从而大幅度减轻了用户安装,部署,监控,维护等成本。

    四. Flink流处理的时间窗口

      对于流处理系统而言,流入的消息不存在上限。所以对于聚合和连接操作而言,流处理系统需要对流入的消息进行分段,然后基于每一段消息进行聚合或者连接。消息的分段即为窗口。对于大部分流处理系统而言,时间窗口一般是根据task所在节点的本地时钟进行切分,这种方式实现起来比较容易,不回产生阻塞,但可能无法满足某些场景需求:

    • 消息本身带有时间戳,用户希望按照消息本身的时间进行分段处理;
    • 由于不同节点的时钟可能不同,以及消息在流经各个节点的延迟不同,在某个节点属于同一个时间窗口处理的消息,流到下一个节对于点可能被切分到不同的时间窗口中。产生不符合预期的结果。

    Flink支持3种类型的时间窗口,分别使用与用户时间窗口不同类型的要求:

    • Operrator Time:根据Task所在节点的本地时钟来切分的时间窗口;
    • Event Time:消息自带有时间戳,根据消息的时间戳进行处理,确保时间戳在同一个时间窗口的所有消息一定会被正确处理。由于消息可能乱序流入task,所以task需要缓存当前时间窗口消息处理的状态,直到确认属于该时间窗口的所有消息都被处理,才可以释放。如果乱序的消息延迟很高会影响到分布式系统的吞吐两盒延迟;
    • Ingress Time:有时时间并不带有时间戳,但用户依然希望按照消息而不是节点时钟划分时间窗口;例如避免上面提到的第二个问题,此时可以在消息源流入Flink流处理系统时自动生成增量的时间戳赋予消息,之后处理的流程与Event Time相同,Ingress Time可以看做是Event Time的一个特例,由于在消息源时间戳一定是有序的,所以在流处理系统中,相对于Event Time,其乱序的消息延迟不回很高,因此对于Flink分布式系统的吞吐量很延迟的影响也会更小;

    五.Event Time时间窗口的实现

      Flink借鉴了Google的MillWheel项目,通过WaterMark来支持基于Event Time的时间窗口。当操作符通过基于Event Time的时间窗口来处理数据时,它必须在确定所有属于该时间窗口的消息全部流入此操作符后才能开始数据处理。但是由于消息可能是乱序的,所以操作符无法直接确认何时所有属于该时间窗口的消息全部流入此操作符。WaterMark包含一个时间戳,Flink使用WaterMark标记所有小于该时间戳的消息都已流入,Flink的数据源在确认所有小于某个时间戳的消息都已输出到Flink流处理系统后,会生成一个包含该时间戳的WaterMark,插入到消息流中输出到Flink流处理系统中,Flink操作符按照时间窗口缓存所有流入的消息,当操作符处理到WaterMark时,它对所有小于该WaterMark时间戳的时间窗口数据进行处理并发送到下一个操作符节点,然后也将WaterMark发送到下一个操作符节点。

      为了保证能够处理所有属于某个时间窗口的消息,操作符必须等到大于这个时间窗口的WaterMark之后才能开始对该时间窗口的消息进行处理,相对于基于Operator Time的时间窗口,Flink需要占用更多内存,且会直接影响消息处理的延迟时间。对此,一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息。

      对于基于Event Time时间窗口的操作符来说,流入WaterMark的时间戳与当前节点的时钟一致是最简单理想的状况,但是在实际环境中是不可能的,由于消息的乱序以及前面节点处理效率的不同,总是会有某些消息流入时间大于其本身的时间戳.

    基于时间戳的排序:

      在流处理系统中,由于流入的消息是无限的,所以对消息进行排序基本上被认为是不可行的。但是在Flink流处理系统中,基于WaterMark,Flink实现了基于时间戳的全局排序。排序的实现思路如下:排序操作符缓存所有流入的消息,当其接收到WaterMark时,对时间戳小于该WaterMark的消息进行排序,并发送到下一个节点,在此排序操作符中释放所有时间戳小于该WaterMark的消息,继续缓存流入的消息,等待下一个WaterMark触发下一次排序。

      由于WaterMark保证了在其之后不会出现时间戳比它小的消息,所以可以保证排序的正确性。需要注意的是,如果排序操作符有多个节点,只能保证每个节点的流出消息是有序的,节点之间的消息不能保证有序,要实现全局有序,则只能有一个排序操作符节点。通过支持基于Event Time的消息处理,Flink扩展了其流处理系统的应用范围,使得更多的流处理任务可以通过Flink来执行。

    最后说明一下:
      由于容错机制和内存管理内容较多,在在后续章节中会贴出详细的图示方便读者们更好的理解Flink相关内容,此文章由于各种原因暂时没有很好的呈现给大家,请多谅解!

    展开全文
  • Flink

    千次阅读 2019-08-14 19:43:41
    Flink 一、简介 Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务: ​ ...
  • Flink安装及使用

    千次阅读 2018-12-04 10:06:50
    在官网安装Flink,并解压到/usr/local/flink sudo tar -zxf flink-1.6.2-bin-hadoop27-scala_2.11.tgz -C /usr/local cd /usr/local 修改文件名字,并设置权限 sudo mv ./flink-*/ ./flink sudo chown -...
  • flink实例开发-详细使用指南

    万次阅读 2020-07-03 11:08:35
    Flink入门及实战-上: http://edu.51cto.com/sd/07245 Flink入门及实战-下: http://edu.51cto.com/sd/5845e flink实例开发-详细使用指南 配置一个maven项目 编写一个flink程序 编程实战:编写一个向kafka...
  • Flink之一 Flink基本原理介绍

    万次阅读 多人点赞 2017-05-11 13:41:07
    Flink原理,Flink入门介绍
  • Flink技术原理

    万次阅读 2019-11-15 22:53:25
    Flink简介 Flink概述: Flink是一个批处理和流处理结合的统一计算框架,其核心是一个提供了数据分发以及并发化计算的流数据处理引擎。它的最大亮点是流处理,是业界最顶级的开源流处理引擎。 Flink与Storm...
  • Apache Flink介绍

    千次阅读 2018-07-25 16:21:16
    1、Apache Flink介绍 既然有了Apache Spark,为什么还要使用Apache Flink? 因为Flink是一个纯流式计算引擎,而类似于Spark这种微批的引擎,只是Flink流式引擎的一个特例。其他的不同点之后会陆续谈到。 1.1 历史...
  • flink最新中文文档1.7

    万次阅读 2019-04-08 11:43:53
    最近学习flink,中文文档目前只更新到1.2,英文版阅读速度相对慢很多,参考官方文档和博客又发现了一个新中文版本: V1.7-SNAPSHOT,希望对大家有帮助。 ...
  • Flink日志输出查看方式

    万次阅读 热门讨论 2019-03-08 10:28:52
    在网上查看flink日志查看方式,竟然查询不到,因此写下这篇文章,给有此困惑的小盆友们,也给自己做个总结! 前情提要: 我是通过flink web ui提交的flink任务的,因此通过flink ui查看自身控制台打印输出的。 第一...
  • 精通Apache Flink必读系列文章

    千次阅读 2020-09-03 11:49:51
    Flink是最接近于谷歌Dataflow大数据分析平台的设计的开源分布式计算引擎,其核心设计理念与Spark有很大的不同。 从设计出发点,Flink是一个流计算处理计算引擎,把批处理视为无限流计算的一种特例,Spark是批处理...
  • flink入门到项目(完整教程)

    万次阅读 多人点赞 2019-06-18 19:44:51
    flink知识点进行简单梳理,及每个功能点的代码实现。本地运行只需要更改/resource/conf下的配置信息,可直接运行。路过的大老爷们,点个星呗(你们的赞是我坚持写下去的动力!!) 项目持续更新中。。。 ...
  • 如何停止flink job

    万次阅读 2020-07-03 10:59:17
    两种方式,在standalone和on yarn集群中都是适用的1:在ui界面停止2:在命令行停止在ui界面停止假设已经成功提交到集群一个任务,查看集群webui界面在命令行停止:先查询目前在运行的job任务列表执行bin/flink list...
  • 下载(注:flink需要jdk版本在8.x之上): https://flink.apache.org/downloads.html 下载Apache Flink 1.x.x for Scala 2.11 (asc, sha512),本地解压 运行 进入解压目录的bin目录,运行start-cluster.bat,启动...
  • 使用Maven生成Flink开发项目

    千次阅读 2020-07-18 17:32:54
    mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.9.0 -DgroupId=com.test -DartifactId=flink -Dversion=1.0.0 -Dpackage=...
  • 干货 | Spark Streaming 和 Flink 详细对比

    千次阅读 2018-08-11 00:39:08
    原文详见:https://mp.weixin.qq.com/s/Fb1cW0oN7xYeb1oI2ixtgQ
1 2 3 4 5 ... 20
收藏数 38,440
精华内容 15,376
关键字:

flink