精华内容
下载资源
问答
  • 为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后...

    1 检查点机制

    1.1 CheckPoints

    为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

    检查点机制

    1.2 开启检查点

    默认情况下,检查点机制是关闭的,需要在程序中进行开启:

    // 开启检查点机制,并指定状态检查点之间的时间间隔
    env.enableCheckpointing(1000); 
    
    // 其他可选配置如下:
    // 设置语义
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    // 设置两个检查点之间的最小时间间隔
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
    // 设置执行Checkpoint操作时的超时时间
    env.getCheckpointConfig().setCheckpointTimeout(60000);
    // 设置最大并发执行的检查点的数量
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
    // 将检查点持久化到外部存储
    env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    // 如果有更近的保存点时,是否将作业回退到该检查点
    env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
    

    1.3 保存点机制

    保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许你通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失。

    1.4 RichFunction 检查点实战

    public class OperatorWarning implements CheckpointedFunction {
        // 非正常数据
        private List<Tuple2<String, Long>> bufferedData;
        // checkPointedState
        private transient ListState<Tuple2<String, Long>> checkPointedState;
       
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // 注意这里获取的是OperatorStateStore
            checkPointedState = context.getOperatorStateStore().
                    getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
            // 如果发生重启,则需要从快照中将状态进行恢复
            if (context.isRestored()) {
                for (Tuple2<String, Long> element : checkPointedState.get()) {
                    bufferedData.add(element);
                }
            }
        }
    
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // 在进行快照时,将数据存储到checkPointedState
            checkPointedState.clear();
            for (Tuple2<String, Long> element : bufferedData) {
                checkPointedState.add(element);
            }
        }
    }
    

    2 状态管理

    2.1 算子状态

    算子状态 (Operator State):顾名思义,状态是和算子进行绑定的,一个算子的状态不能被其他算子所访问到。官方文档上对 Operator State 的解释是:each operator state is bound to one parallel operator instance,所以更为确切的说一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态:
    算子状态

    2.2 键控状态

    键控状态 (Keyed State) :是一种特殊的算子状态,即状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。如下图所示,每个颜色代表不同 key 值,对应四个不同的状态实例。需要注意的是键控状态只能在 KeyedStream 上进行使用,我们可以通过 stream.keyBy(…) 来得到 KeyedStream 。
    键控状态

    2.3 监控状态编程

    Flink 提供了以下数据格式来管理和存储键控状态 (Keyed State):

    • ValueState:存储单值类型的状态。可以使用 update(T) 进行更新,并通过 T value() 进行检索。
    • ListState:存储列表类型的状态。可以使用 add(T) 或 addAll(List) 添加元素;并通过 get() 获得整个列表。
    • ReducingState:用于存储经过 ReduceFunction 计算后的结果,使用 add(T) 增加元素。
    • AggregatingState:用于存储经过 AggregatingState 计算后的结果,使用 add(IN) 添加元素。
    • FoldingState:已被标识为废弃,会在未来版本中移除,官方推荐使用 AggregatingState 代替。
    • MapState:维护 Map 类型的状态。
     @Override
        public void open(Configuration parameters) {
            StateTtlConfig ttlConfig = StateTtlConfig
                    // 设置有效期为 10 秒
                    .newBuilder(Time.seconds(10))
                    // 设置有效期更新规则,这里设置为当创建和写入时,都重置其有效期到规定的10秒
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    /*设置只要值过期就不可见,另外一个可选值是ReturnExpiredIfNotCleanedUp,
                     代表即使值过期了,但如果还没有被物理删除,就是可见的*/
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    .build();
            ListStateDescriptor<Long> descriptor = new ListStateDescriptor<>("abnormalData", Long.class);
            descriptor.enableTimeToLive(ttlConfig);
            abnormalData = getRuntimeContext().getListState(descriptor);
        }
    
        @Override
        public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception {
            Long inputValue = value.f1;
            // 如果输入值超过阈值,则记录该次不正常的数据信息
            if (inputValue >= threshold) {
                abnormalData.add(inputValue);
            }
    
            ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator());
            // 如果不正常的数据出现达到一定次数,则输出报警信息
            if (list.size() >= numberOfTimes) {
                out.collect(Tuple2.of(value.f0 + " 超过指定阈值 ", list));
                // 报警信息输出后,清空状态
                abnormalData.clear();
            }
        }
    

    2.4 算子状态编程

    相比于键控状态,算子状态目前支持的存储类型只有以下三种:

    • ListState:存储列表类型的状态。
    • UnionListState:存储列表类型的状态,与 ListState 的区别在于:如果并行度发生变化,ListState 会将该算子的所有并发的状态实例进行汇总,然后均分给新的 Task;而 UnionListState 只是将所有并发的状态实例汇总起来,具体的划分行为则由用户进行定义。
    • BroadcastState:用于广播的算子状态。
     @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // 注意这里获取的是OperatorStateStore
            checkPointedState = context.getOperatorStateStore().
                    getListState(new ListStateDescriptor<>("abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));
            // 如果发生重启,则需要从快照中将状态进行恢复
            if (context.isRestored()) {
                for (Tuple2<String, Long> element : checkPointedState.get()) {
                    bufferedData.add(element);
                }
            }
        }
    

    备注:一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态

    3 状态后端

    3.1 状态管理实现方式

    状态管理的实现方式

    • MemoryStateBackend
      默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

    • FsStateBackend
      基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

    • RocksDBStateBackend
      RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

    3.2 配置方式

    • 基于代码方式进行配置,只对当前作业生效:
    // 配置 FsStateBackend
    env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    // 配置 RocksDBStateBackend
    env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));
    
    // 配置 RocksDBStateBackend 时,需要额外导入下面的依赖:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
        <version>1.12</version>
    </dependency>
    
    • 基于 flink-conf.yaml 配置文件的方式进行配置,对所有部署在该集群上的作业都生效:
    state.backend: filesystem
    state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints
    
    展开全文
  • flink 检查点Apache Flink is a popular real-time data processing framework. It’s gaining more and more popularity thanks to its low-latency processing at extremely high throughput in a fault-tolerant ...

    flink 检查点

    Apache Flink is a popular real-time data processing framework. It’s gaining more and more popularity thanks to its low-latency processing at extremely high throughput in a fault-tolerant manner.

    Apache Flink是一种流行的实时数据处理框架。 它以容错的方式以极高的吞吐量进行低延迟处理,因此越来越受欢迎。

    While there is a good documentation provided by Flink it took me some time to get to understand the various mechanics that come together to make Flink Check pointing and Recovery work end to end. In this article I will explain the key steps one need to perform at various operator levels to create a fault tolerant Flink Job. Flink basic operators are Source, Process and Sink. Process operators could be of various flavors.

    尽管Flink提供了很好的文档,但是我花了一些时间来理解使Flink Check Pointing和Recovery工作端到端结合在一起的各种机制。 在本文中,我将解释在各种操作员级别上创建容错Flink Job所需执行的关键步骤。 Flink的基本运算符是Source,Process和Sink。 过程操作员可能具有多种口味。

    So let’s get started on what you need to do to enable check pointing and making all operators Checkpoint aware.

    因此,让我们开始您需要做的事情,以启用检查点并使所有操作员都知道Checkpoint。

    Flink环境配置(检查指向) (Flink Environment Configuration (Check pointing))

    Flink Job Configuration for Check pointing
    用于检查点的Flink作业配置

    源运营商检查点 (Source Operator Checkpointing)

    Source operator is the one which fetches data from the source. I wrote a simple SQL continuous query based source operator and kept track of the timestamp till the data has been queried. This information is what will be stored as part of check pointing process by flink. State of the source is saved by flink at the Job Operator level. CheckPointedFunction interface or ListCheckpointed interface should be implemented by the Source function as follows:

    源运算符是从源获取数据的运算符。 我编写了一个简单的基于SQL连续查询的源运算符,并跟踪时间戳,直到查询完数据为止。 该信息将作为flink在检查点过程中存储的信息。 源的状态通过flink在作业操作员级别保存。 CheckPointedFunction接口或ListCheckpointed接口应该由Source函数实现,如下所示:

    snapshotState method will be called by the Flink Job Operator every 30 seconds as configured. Method should return the value to be saved in state backend

    Flink作业操作员将按配置每30秒调用一次snapshotState方法。 方法应返回要保存在状态后端的值

    restoreState method is called when the operator is restarting and this method is the handler method to set the last stored timestamp (state) during a checkpoint

    当操作员重新启动时将调用restoreState方法,并且该方法是在检查点期间设置最后存储的时间戳(状态)的处理程序方法

    过程功能检查点 (Process Function Checkpointing)

    Flink supports saving state per key via KeyedProcessFunction. ProcessWindowFunction can also save the state of windows on per key basis in case of Event Time processing

    Flink支持通过KeyedProcessFunction保存每个键的状态。 在事件时间处理的情况下, ProcessWindowFunction还可以按键保存窗口的状态

    For KeyedProcessFunction, ValueState need to be stored per key as follows:

    对于KeyedProcessFunction ,需要按以下方式存储每个键的ValueState

    ValueState is just one of the examples. There are other ways to save the state as well. ProcessWindowFunction automatically saves the window state and no variable need to be set.

    ValueState只是示例之一。 还有其他保存状态的方法。 ProcessWindowFunction自动保存窗口状态,无需设置任何变量。

    接收器功能检查点 (Sink Function Checkpointing)

    Sink function check pointing works similar to Source Function check pointing and state is saved at the Job Operator level. I have implemented Sink function for Postgres DB. There could be multiple approaches to make sink function fault tolerant and robust considering performance and efficiency. I have taken a simplistic approach and will improve upon it in future.

    接收器功能检查指向的工作方式类似于源功能检查指向,并且状态保存在作业操作员级别。 我已经为Postgres DB实现了Sink功能。 考虑到性能和效率,可以有多种方法使接收器功能具有容错性和鲁棒性。 我采用了一种简单的方法,将来会对其进行改进。

    By committing statement in snapshotState method I’m ensuring that all pending data is flushed and committed as part of checkpointing trigger.

    通过在snapshotState方法中提交语句,我确保将所有未决数据刷新并作为检查点触发器的一部分提交。

    可以了,好了 (All Set)

    Finally, you need to run your job and you can try to cancel it in between of processing and try to rerun it by providing the checkpoint location as follows. You will need to pass the latest checkpoint yourself, pay attention to -s parameter.

    最后,您需要运行您的作业,您可以尝试在处理之间取消它,并通过提供以下检查点位置来尝试重新运行它。 您将需要自己通过最新的检查点,请注意-s参数。

    .\flink.bat run -m localhost:8081 -s D:\flink-checkpoints\1d96f28886b693452ab1c88ab72a35c8\chk-10 -c <Job class Name> <Path to Jar file>

    结论 (Conclusion)

    This is a basic approach toward checkpointing and failure recovey and might need more improvements depending upon each use case. Feel free to provide me your feedback. Happy Reading!!

    这是进行检查点和故障重新报告的基本方法,并且可能需要根据每个用例进行更多的改进。 随时向我提供您的反馈。 阅读愉快!

    Repository Link to codebase:

    仓库链接到代码库:

    翻译自: https://towardsdatascience.com/flink-checkpointing-and-recovery-7e59e76c2d45

    flink 检查点

    展开全文
  • Flink检查点问题

    千次阅读 2019-04-28 19:18:26
    Flink检查点问题问题简介扫盲什么是检查点如何配置检查点路径如何启用检查点如何使用检查点解决思路小结 问题简介 Cannot find meta data file '_metadata' in directory 'hdfs://nn-HA-service/flink/flink-...

    问题简介

    Cannot find meta data file '_metadata' in directory 'hdfs://nn-HA-service/flink/flink-checkpoints'
    

    上面是本人实际工作中遇到的一个问题,于是便百度了一下问题,查无结果,可能是很低级的错误吧,毕竟本人对flink也仅限于会用的地步。但还是打算总结一下这个问题,希望能帮助到各位读者。

    扫盲

    首先我们来扫盲一下相关知识点。

    什么是检查点

    官网是这样定义的:检查点通过允许恢复状态和相应的流位置使Flink中的状态容错,从而为应用程序提供与无故障执行相同的语义。

    检查点的主要目:在意外的作业失败时提供恢复机制。
    Checkpoint的生命周期由Flink管理,即Flink创建,拥有和发布Checkpoint - 无需用户交互。作为一种恢复和定期触发的方法,Checkpoint实现的两个主要设计目标是:
    i)创建轻量级。
    ii)尽可能快地恢复。

    如何配置检查点路径

    有两种方式:

    1. 通过配置文件全局配置
      这种配置方式是全局的,即默认检查点路径。
      搭建过集群的同学应该知道,配置文件一般都在conf目录下。没错,flink也是一样,我们需要修改的就是 flink/conf/flink-conf.yaml 这个文件。
      在配置文件里面将下面这个参数配置上:
       state.checkpoints.dir: hdfs://nn-HA-service/flink/flink-checkpoints
    
    1. 通过在作业里配置
      就是在每个作业里,自定义检查点路径,优先级高于默认检查点路径。
      如果没有定义,就走默认检查点路径。
    	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    	env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/flink-checkpoints/");
    

    如何启用检查点

    flink默认是不会启用检查点的,这需要我们在代码里面设置。
    代码如下:

    			StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
               
                //设置检查点间隔时间
                env.enableCheckpointing(60000);
    

    对,我们只需要设置下检查点的时间间隔,单位是ms,就说明启动了了一个一分钟完成一次的检查点策略了。

    如何使用检查点

    使用检查点,即在意外的作业失败后恢复数据。
    这时,我们只需要指定检查点路径重启任务即可。
    官网给的示例:

    $ bin/flink run -s :checkpointMetaDataPath [:runArgs]
    

    checkpointMetaDataPath : 这个是检查点元数据路径,并不简单是所配置的检查点的路径。笔者遇到的问题关键就在于此。

    解决思路

    由报出来的错误我们不难看出,flink是希望找到_metadata这个元数据文件,可我并没有指定此文件,所以报错了。
    先介绍下我用的启动命令:

    flink run -d -c com.FlinkCheckpointTest \
    	-s hdfs://nn-HA-service/flink/flink-checkpoints/ \
    	-m yarn-cluster \
    	-yjm 1024m -ytm 1024m -ynm FlinkCheckpointTest \
    	/home/jar/flink_test.jar
    

    想必这个命令知道flink的同学都很熟悉,不熟悉的可以参考下Flink官网(https://flink.apache.org/)。
    因此我去检查点目录下查找了一番,终于让我找到了 _metadata 文件。
    如下图所示:
    检查点目录
    我们仔细看下这个目录:

    /flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/chk-2836
    

    前面两层目录是我们设置的检查点目录,后面一串像是id。
    最后那个chk-2836应该就是实际检查点的目录,2836代表当前作业已经生成过2836次检查点了。最新的检查点覆盖之前的检查点。
    /flink/flink-checkpoints/a9fcbc1efb8124ae998d0cbea62e7eae/ 该目录下面就一个chk开头的目录,即始终保存最新的目录,节省hdfs资源。
    中间那一串id经过仔细查阅官网得知是jobID
    好,那么现在主要的问题就是如何寻找对应作业的jobID
    有两种情况:

    1. 作业正在运行
      这种情况,可以直接去flink的管理界面找到。如下图:
      在这里插入图片描述
    2. 作业已经停止
      这种情况,就需要去看此作业的详细日志了。如下图:
      在这里插入图片描述
      jobmanager.log 日志里面多次出现 作业名和 jobID

    实际工作中,用到检查点的时候肯定是因为作业异常中断,所以大多数情况都是通过第二种方式去寻找作业对应的 jobID
    修改后的提交作业命令:

    flink run -d -c com.FlinkCheckpointTest \
    	-s hdfs://nn-HA-service/flink/flink-checkpoints/541eaf031c6f8ed50916342b2baafe98/chk-13 \
    	-m yarn-cluster \
    	-yjm 1024m -ytm 1024m -ynm FlinkCheckpointTest \
    	/home/jar/flink_test.jar
    

    运行了一段时间,作业异常中断,发现是内存溢出错误,故适当调节内存,最终提交命令如下:

    flink run -d -c com.FlinkCheckpointTest \
    	-s hdfs://nn-HA-service/flink/flink-checkpoints/541eaf031c6f8ed50916342b2baafe98/chk-13 \
    	-m yarn-cluster \
    	-yjm 4096m -ytm 4096m -ynm FlinkCheckpointTest \
    	/home/jar/flink_test.jar
    

    好了,至此,问题圆满解决。

    小结

    这里顺便总结下flink作业异常中断的操作流程。

    1. 找出作业对应的jobID
    2. 进入hdfs对应目录,找到目录下面最新的检查点目录
    3. 通过指定检查点目录的方式重新启动作业
    4. 观察作业运行情况,如果出现内存溢出异常断开,加大内存重新启动。如果出现其他异常,欢迎留言给我,共同学习。
    5. 待作业运行稳定,查看作业最初异常中断的原因,记录下来并总结思考如何解决和避免。
    展开全文
  • Flink 故障恢复机制的核心,就是应用状态的一致性检查点 有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的...

    一致性检查点(Checkpoints)

    1. Flink 故障恢复机制的核心,就是应用状态的一致性检查点

    2. 有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候(如5这个数据虽然进了奇数流但是偶数流也应该做快照,因为属于同一个相同数据,只是没有被他处理)

    3. 在JobManager中也有个Chechpoint的指针,指向了仓库的状态快照的一个拓扑图,为以后的数据故障恢复做准备

    在这里插入图片描述

    从检查点恢复状态

    1. 在执行流应用程序期间,Flink 会定期保存状态的一致检查点
    2. 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程(如图中所示,7这个数据被source读到了,准备传给奇数流时,奇数流宕机了,数据传输发生中断

    在这里插入图片描述

    1. 遇到故障之后,第一步就是重启应用(重启后的流都是空的)

    在这里插入图片描述

    1. 第二步是从 checkpoint 中读取状态,将状态重置(读取在远程仓库(Storage,这里的仓库指状态后端保存数据指定的三种方式之一)保存的状态),从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同

    在这里插入图片描述

    1. 第三步:开始消费并处理检查点到发生故障之间的所有数据
    2. 这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

    在这里插入图片描述

    Chandy-Lamport 算法

    在上图所示的数据7,同样被Source读取后,在传向奇数流时,奇数流宕机了,那么这个数据7在开始已经Source读取了,但是由于宕机,奇数流又没有处理到这个数据7,那么当检查点恢复后,这个数据7是否还会重新从输入队列中读取,如果不重新读取则数据将发生丢失,为了防止这种情况Flink做了改进实现,这种实现叫Chandy-Lamport 算法的分布式快照,这样做的好处是将检查点的保存和数据处理分离开,不暂停整个应用,简单来说就是Source读取一个数据,自己就做一份CheckPoint保存,不用管其他数据流是否读取,其他数据流依然如此,读取一个数据保存一份数据

    Flink 检查点算法

    Flink 检查点算法

    1. Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,用来把一条流上数据按照不同的检查点分开
    2. 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中

    算法操作解析

    1. 现在是一个有两个输入流的应用程序,用并行的两个 Source 任务来读取
    2. 两条自然数数据流,蓝色数据流已经输出完蓝3了,黄色数据流输出完黄4
    3. 在Souce端 Source1 接收到了数据蓝3 正在往下游发向一个数据蓝2 和 蓝3; Source2 接受到了数据黄4,且往下游发送数据黄4
    4. 偶数流已经处理完黄2 所以后面显示为2, 奇数流处理完蓝1 和 黄1 黄3 所以为5 并分别往下游发送每次聚合后的结果给Sink

    在这里插入图片描述

    1. JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点,这个带有新检查点ID的东西为barrier,图中三角型表示,2只是ID

    在这里插入图片描述

    1. 在Source端接受到barrier后,将自己此身的3 和 4 的数据,将它们的状态写入检查点,且向JobManager发送checkpoint成功的消息(状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会向 JobManager 确认检查点完),然后向下游分别发出一个检查点 barrier
    2. 可以看出在Source接受barrier时,数据流也在不断的处理,不会进行中断,
    3. 此时的偶数流已经处理完蓝2变成了4,但是还没处理到黄4,只是下游发送了一个次数的数据4,而奇数流已经处理完蓝3变成了8,并向下游发送了8
    4. 此时barrier都还未到奇数流和偶数流

    在这里插入图片描述

    1. 此时蓝色流的barrier先一步抵达了偶数流,黄色的barrier还未到,但是因为数据的不中断一直处理,此时的先到的蓝色的barrier会将此时的偶数流的数据4进行缓存处理,流接着处理接下来的数据等待着黄色的barrier的到来,而黄色barrier之前的数据将会对缓存的数据相加
    2. 这次处理的总结:分界线对齐barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达,对于barrier已经到达的分区,继续到达的数据会被缓存。而barrier尚未到达的分区,数据会被正常处理

    在这里插入图片描述

    1. 当蓝色的barrier和黄色的barrier(所有分区的)都到达后,进行状态保存到远程仓库,然后对JobManager发送消息,说自己的检查点保存完毕了
    2. 此时的偶数流和奇数流都为8
    3. 当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中,然后将 barrier 继续向下游转发

    在这里插入图片描述

    1. 向下游转发检查点 barrier 后,任务继续正常的数据处理

    在这里插入图片描述

    1. Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕
    2. 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

    在这里插入图片描述

    保存点(Savepoints)

    简而言之,CheckPoint为自动保存,SavePoint为手动保存
    在这里插入图片描述

    CheckPoint配置

     env.enableCheckpointing(1000L) // 开启 触发时间间隔为1000毫秒
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE) // 语义 默认EXACTLY_ONCE
        env.getCheckpointConfig.setCheckpointTimeout(60000L) // 超时时间
        env.getCheckpointConfig.setMaxConcurrentCheckpoints(2) // 最大允许同时出现几个CheckPoint
        env.getCheckpointConfig.setMinPauseBetweenCheckpoints(500L) // 最小得间隔时间
        env.getCheckpointConfig.setPreferCheckpointForRecovery(true) // 是否倾向于用CheckPoint做故障恢复
        env.getCheckpointConfig.setTolerableCheckpointFailureNumber(3) // 容忍多少次CheckPoint失败
         // 重启策略
        // 固定时间重启
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L))
        // 失败率重启
        env.setRestartStrategy(RestartStrategies.failureRateRestart(5, Time.of(5, TimeUnit.MINUTES), Time.of(10, TimeUnit.SECONDS)))
    
    

    在这里插入图片描述

    展开全文
  • 在执行流应用程序期间,Flink 会定期保存状态的一致检查点,如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。 遇到故障之后,第一步就是重启应用 第二步是从 ...
  • Flink检查点失败问题-汇总

    千次阅读 2019-03-21 23:49:41
    flink检查点中恢复的问题 从检查点恢复的时候,命令 ./flink run -s hdfs://192.xxx.xxx.xx:port/data1/flink/checkpoint1 -c com.mymain.MyTestMain 报出以下错误: java.util.concurrent.CompletionException...
  • Flink 检查点(checkpoint)

    千次阅读 2020-03-25 17:35:00
    它使用一种被称为"检查点"(checkpoint)的特性,在出现故障时将系统重置回正确状态。下面通过简单的类比来解释检查点的作用。 假设你和两位朋友正在数项链上有多少颗珠子,如下图所示。你捏住珠子,边数边拨,每拨...
  • 前言 之前文章有讲过flink的有状态算子。有状态算子就是讲算子产生的中间...所以,检查点就是定期保存任务状态的机制。但是状态又保存到哪里了呢?之前说过的状态后端。 检查点是用于错误恢复的,但这是有条件的:
  • flink检查点checkpoint失败问题总结-2

    千次阅读 2019-03-21 15:08:53
    检查点checkpoint失败问题总结(2): 问题描述:检查点刚开始是可以的做checkpoint的,后期越来越不能够做checkpoint的情况总结: 一.反压问题 1.什么是反压(如下图1所示)? 图2-1 部分算子反压表现(web ui) 2....
  • Flink Checkpoints检查点

    2021-01-06 10:20:14
    Flink 故障恢复机制的核心, 就是应用状态的一致性检查点 有状态流应用的一 致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照) ;这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的...
  • Flink 故障恢复机制的核心,就是应用状态的一致性检查点 有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候 以上...
  • checkpoint和savepoint是Flink为我们提供的作业快照机制,它们都包含有作业状态的持久化副本。官方文档这样描述checkpoint: Checkpoints make state in Flink fault tolerant by allowing state and the ...
  • Flink检查点本质上是通过异步屏障快照(asychronous barrier snapshot, ABS)算法产生的全局状态快照,一般是存储在分布式文件系统(如HDFS)上。但是,如果状态空间超大(比如key非常多或者窗口区间很长),检查点...
  • Flink 状态管理与检查点机制

    千次阅读 2020-01-20 00:00:00
    大数据技术与架构点击右侧关注,大数据开发领域最强公众号!暴走大数据点击右侧关注,暴走大数据!一、状态分类相对于其他流计算框架,Flink 一个比较重要的特性就是其支持有状态计算。即你可以...
  • 1. Flink 状态管理 什么是有状态的计算? 首先输入数据源源不断输入到Task里面 当计算的时候通过Getstate 从State容器里读取历史的状态 经过一系列处理又更新到State容器里面 将处理后的结果发送到下游 1.1 状态...
  • Flink之所以能够做到高效而准确的有状态流式处理,...Flink检查点本质上是通过异步屏障快照(asychronous barrier snapshot, ABS)算法产生的全局状态快照,一般是存储在分布式文件系统(如HDFS)上。但是,如果状...
  • Flink检查点算法的正式名称是异步分界线快照(asynchronous barrier snapshotting)。该算法大致基于Chandy-Lamport分布式快照算法。 Flink检查点的核心作用是确保状态正确,即使遇到程序中断,也要正确。记住这一...
  • flink中,定义了操作状态和分组状态两种状态,且定义了检查点机制来定时触发检查点,触发检查点会将flink状态保存到statebackends中,所谓statebackends就是定义触发检查点后,将状态数据保存到哪里,默认是保存到...
  • Flink检查点机制与流和状态的持久存储交互。通常,它要求: 持久(或耐用,可以重放某个时间段内一定量的记录)的数据源。此类消息源是持久消息队列(例如,Apache Kafka,RabbitMQ,Amazon Ki
  • Flink Checkpoint(检查点

    千次阅读 2019-07-03 16:31:32
    Flink Checkpoint(检查点Flink中的每个函数和运算符都可以是有状态的(如果是Keyed Stream,使用ValueState、ListState等状态,如果是Operator State,实现CheckpointedFunction或CheckpointedList接口。使用...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,872
精华内容 1,548
关键字:

flink检查点机制