精华内容
下载资源
问答
  • Dubbo-容错机制
    2019-09-03 23:13:50

    所谓容错机制,举个简单例子,我们在使用电脑的某个程序时,常常会遇到“程序无反应”或“程序未响应”的情况发生,此时这个程序便不能在进行下去,但经常会在过了几秒钟后恢复到正常使用的状态。这种“无反应”或“未响应”几秒钟的错误状态,我们便称之为“容错”。

    在分布式系统中常常各个系统之间是一个链路的调用过程,如果链路中的某个节点出现故障,很可能会发生雪崩效应。

    在这里插入图片描述

    比如如果Node3节点发生故障会导致整个分布式系统不可用,这个是不能容忍的。首先设计的时候应用系统之间肯定是要进行隔离,同时也需要有相应的容错机制。

    Dubbo一共提供了六种容错机制,可以在Client端调用的时候进行设置:

    Failover Cluster(缺省)

    失败自动切换,当出现失败,重试其它服务器。通常用于读操作,但重试会带来更长延迟。可通过 retries=“2” 来设置重试次数(不含第一次,缺省为2次)。

    重试次数配置如下:

    <dubbo:service retries="2" />
    

    <dubbo:reference retries="2" />
    

    <dubbo:reference>
        <dubbo:method name="findFoo" retries="2" />
    </dubbo:reference>
    

    Failfast Cluster

    快速失败,只发起一次调用,失败立即报错。通常用于非幂等性的写操作,比如新增记录。

    Failsafe Cluster

    失败安全,出现异常时,直接忽略。通常用于写入审计日志等操作。

    Failback Cluster

    失败自动恢复,后台记录失败请求,定时重发。通常用于消息通知操作。

    Forking Cluster

    并行调用多个服务器,只要一个成功即返回。通常用于实时性要求较高的读操作,但需要浪费更多服务资源。可通过 forks=“2” 来设置最大并行数。

    Broadcast Cluster

    广播调用所有提供者,逐个调用,任意一台报错则报错。通常用于通知所有提供者更新缓存或日志等本地资源信息。

    一般查询操作会使用Failover,事务请求会使用Failfast。

    配置方式很简单:

    在这里插入图片描述

    更多相关内容
  • Flink 容错机制

    千次阅读 2022-03-27 10:30:53
    Flink 容错机制

    Flink 容错机制主要包括:

    • 一致性检查点(checkpoint)
    • 从检查点恢复状态
    • Flink 检查点算法
    • 保存点(save points)

    一致性检查点(checkpoint)

    下图为flink程序处理一个数据流内部的完整过程,我们对改图的处理过程做一个大致的描述就是,有一批数据,比如从1 ~100的数据源不断的被flink的source读取进来;

    假如当前的程序要做的事情是,将读取到的奇数和偶数通过两个不同的task进行分别的计算,在某个时刻,flink程序所在环境因为某种原因突然断掉了,试想如果没有某种机制能够保障服务恢复后能够重新按照之前的这个点继续的话,这将是一个很糟糕的事情;

    假如flink程序正好是读到5的这个时间点的时候服务挂掉的,设想下,当前的两个并行的task任务的状态,对于奇数的task来说,在source读到5这个数据的时候,已经计算的结果是: 2 + 4 = 6 了,对于偶数任务的task来说,准备计算的结果是: 1 +  3 + 5 = 9;

    如果在这个时候,服务挂掉,如果需要某种机制能够保证服务恢复后,继续当前的这个任务状态的话,那么对于这种计算机制来说,需要保存5这个数据之前已经计算好的时刻的状态,这个状态的待保存的点即为“检查点”,俗称一致性检查点(Checkpoints)

    有了这个检查点,故障恢复之后,就能通过读取检查点中的数据,重新继续之前的任务计算状态继续后续的任务了;

    对Flink的一致性检查点做个简单的总结如下:

    • Flink 故障恢复机制的核心,就是应用状态的一致性检查点;
    • 有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份拷贝(一份快照);这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候;

    从检查点恢复状态

    参照上面这个示意图,设想下flink是如何从检查点恢复状态呢?

    • 在执行流应用程序期间,Flink 会定期保存状态的一致检查点;
    • 如果发生故障, Flink 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程

    1、遇到故障之后,第一步就是重启应用

    2、第二步是从 checkpoint 中读取状态,将状态重置

    从检查点重新启动应用程序后,其内部状态与检查点完成时的状态完全相同

     3、第三步:开始消费并处理检查点到发生故障之间的所有数据

    这种检查点的保存和恢复机制可以为应用程序状态提供“精确一次”(exactly-once)的一致性,因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

    检查点的实现算法

    1、一种简单的想法
    暂停应用,保存状态到检查点,再重新恢复应用
    2、 Flink 的改进实现
    1、基于 Chandy-Lamport 算法的分布式快照;
    2、将检查点的保存和数据处理分离开,不暂停整个应用;

    Flink 检查点算法

    检查点分界线(Checkpoint Barrier)

    • Flink 的检查点算法用到了一种称为分界线(barrier)的特殊数据形式,
      用来把一条流上数据按照不同的检查点分开;
    • 分界线之前到来的数据导致的状态更改,都会被包含在当前分界线所属的检查点中;而基于分界线之后的数据导致的所有更改,就会被包含在之后的检查点中;

    •  现在是一个有两个输入流的应用程序,用并行的两个 Source 任务来读取

    • JobManager 会向每个 source 任务发送一条带有新检查点 ID 的消息,通过这种方式来启动检查点;

    • 数据源将它们的状态写入检查点,并发出一个检查点 barrier
    • 状态后端在状态存入检查点之后,会返回通知给 source 任务,source 任务就会 向 JobManager 确认检查点完成;

    • 分界线对齐:barrier 向下游传递,sum 任务会等待所有输入分区的 barrier 到达;
    • 对于barrier已经到达的分区,继续到达的数据会被缓存;
    • 而barrier尚未到达的分区,数据会被正常处理;

    • 当收到所有输入分区的 barrier 时,任务就将其状态保存到状态后端的检查点中, 然后将 barrier 继续向下游转发;

    • 向下游转发检查点 barrier 后,任务继续正常的数据处理;

    • Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕;
    • 当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

    保存点(Savepoints)

    • Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints);
    • 原则上,创建保存点使用的算法与检查点完全相同,因此保存点可以认为就是具有一些额外元数据的检查点;
    • Flink不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作;
    • 保存点是一个强大的功能。除了故障恢复外,保存点可以用于:有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用,等等;

    示例案例代码展示

    import com.congge.source.SensorReading;
    import org.apache.flink.api.common.restartstrategy.RestartStrategies;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    public class FaultTolerance1 {
    
        public static void main(String[] args) throws Exception{
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            // 1. 状态后端配置
            env.setStateBackend( new MemoryStateBackend());
            env.setStateBackend( new FsStateBackend(""));
            env.setStateBackend( new RocksDBStateBackend(""));
    
            // 2. 检查点配置
            env.enableCheckpointing(300);
    
            // 高级选项
            env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);  //模式选择设置
            env.getCheckpointConfig().setCheckpointTimeout(60000L);
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);   //检查点个数设置
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(100L);
            env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
            env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0);
    
            // 3. 重启策略配置
            // 固定延迟重启
            env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L));
            // 失败率重启
            env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1)));
    
            // socket文本流
            DataStream<String> inputStream = env.socketTextStream("localhost", 7777);
    
            // 转换成SensorReading类型
            DataStream<SensorReading> dataStream = inputStream.map(line -> {
                String[] fields = line.split(",");
                return new SensorReading(fields[0], new Long(fields[1]), new Double(fields[2]));
            });
    
            dataStream.print();
            env.execute();
    
        }
    
    }
    

    展开全文
  • Flink中的容错机制

    千次阅读 2020-08-01 23:27:24
       Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。   在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中...

    1 checkpoint

       Flink 故障恢复机制的核心,就是应用状态的一致性检查点checkpoint。

      在Spark Streaming中仅仅是针对driver的故障恢复做了数据和元数据的Checkpoint,处理的是当前时间点所有分区当前数据的状态。在Flink中不能把当前所有分区的数据直接存下来,因为是有状态的流式计算所以除了当前处理的数据之外还应该有当前的状态。因为在状态编程中,我们可能会自定义状态,所以直接保存当前的数据和他的状态是不行的,还要知道在具体的操作流程里面到底执行到哪了,这样的话太复杂了,做不到。其实核心的一点就是要知道当前数据到底处理完没有。Flink提出的是不要保存当前所有的数据了,不管当前处理的数据是什么(如果要考虑就要考虑对应的每一个状态到底改变过没有),就考虑同一个数据,所有任务都处理完之后把那个状态取出来。

      在Spark中是针对RDD做存盘,里面就是数据,现在是怎样的数据全部存到硬盘,故障恢复把数据拿出来重新算一遍,这个想法非常简单,因为Spark是批处理,数据全存下来,恢复的时候全做一遍,这是基于批处理的一种简单实现。在流处理FLink中要想存数据的话要么存全量,要么直接重置偏移量到最开始全部回滚,这个效率太低了。所以把之前做到某一步的状态保存下来。

      有状态流应用的一致检查点,其实就是所有任务的状态,在某个时间点的一份快照;这个时间点,应该是所有任务都恰好处理完一个相同的输入数据的时候。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-QpJk3zNR-1596295448157)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596201467669.png)]

      假设输入数据是由自然数构成的数据流,source读取之后,按照奇偶分区,sum求和。source里面的状态是保存当前读到哪个数,也就是偏移量,现在是8表示已经处理完8这个数了(有可能在处理第6个数,但是状态是5),此时sum也已经把8这个数处理完了。现在可以做一个快照把这3个状态存起来,存到定义好的状态后端,JobManager进行管理了,会保存当前checkpoint的id,元信息(source对应哪个,sum对应哪个),这里source的状态是当前处理的偏移量,sum状态是之前所有处理完数据之后的累加和。

      为什么不存储数据,而是存储所有任务的状态?假设如果要存储数据的话,有可能source在处理9,8还在去sum的路上,sum有可能还在处理6(也有可能处理完),如果把6,4,3这三个数据存储,这样最后这样恢复的话首先没有状态,source没有偏移量要消费的数据丢掉了,sum之前累加的结果没有记录不行。那么连着状态存起来,有个问题怎么知道当前数据到底要不要重新处理,有可能处理到3状态已经改了也有可能没有改。另外还在等待sum的5恢复后会丢失,所以按照数据和状态去存会有很多问题。所以Flink不要存当前正在处理的数据而是保证所有的操作把同一个数据处理完。

    2 检查点恢复状态

      在执行流应用程序期间,Flink 会定期保存状态的一致检性查点。如果发生故障, 将会使用最近的检查点来一致恢复应用程序的状态,并重新启动处理流程。就要从之前存盘的检查点恢复,从新读取数据处理,流程如下:

      (1)首先重启应用,外部系统不知道,所有状态都是空的

    在这里插入图片描述

      (2)然后从checkpoint恢复状态重置,这个是8处理完的时间点,source那里保存了偏移量,需要给数据源那里重新提交偏移量。

    在这里插入图片描述

      (3)开始消费并处理检查点到发生故障之间的所有数据,这样就好像错误没有发生,这就是Flink的checkpoint检查点机制保证了,内存状态的精确一次。因为所有算子都会保存检查点并恢复其所有状态,这样一来所有的输入流就都会被重置到检查点完成时的位置

    在这里插入图片描述

    3 Flink 检查点算法

      上面说到Flink的checkpoint保存的是所有任务状态的快照,这个状态要求所以逇任务都处理完同一个数据之后的状态。

      处理的流程怎样保证它保存状态的时候都保证他是处理完同一份数据呢?在前面例子source完之后就分区了比如8在奇数分区求和是知道数据读进来了,偶数求和分区怎么能知道5进来了呢。假如来的数据有同样的数据怎么判断当前数据是我要处理的数据呢。假设source保存了8的状态,后面的任务怎么知道读完8之后要保存呢?这就需要告诉他哪个数据读完了接下来要保存了,后面需要一个标记,要告诉后面的任务到底什么时候触发状态的保存。

      Flink中假设8读完数据之后,在偏移量为8和9的数据之间插入一个标记,现在这个标记就是要让5做完操作之后的状态保存下来,只要看到这个标记就保存下来。我们把这个标记插入到流式处理的过程中,就像Watermark一样当做特殊的数据结构,后面的任务看到这个特殊的数据结构就做保存。

      检查点算法的实现一般有两种想法:一是暂停应用,保存状态到检查点再重新恢复应用;二是将检查点的保存和数据处理分开,不暂停应用。Flink的实现是基于Chandy-Lamport算法的分布式快照。

      Flink检查点算法的核心就是检查点分界线(Checkpoint Barrier),Barrier可以认为是在source里面要做checkpoint的时候插入的一个特殊数据结构,用来把一条流上的数据按照不同的检查点分开。分界线之前的数据的状态更改会包含在当前分界线所属的检查点,barrier之后的数据状态的更改属于之后的检查点。每个任务遇到Barrier保存自己的状态。如果有并行任务Barrier就广播出去。如果上游也不只一个分区,那就有多个Barrier,要等到所有的Barrier到齐,才能保证之前该去处理保存的数据状态都保存进去了,所以有个Barrier对齐的概念。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UTkZ1J24-1596295448173)(C:\资料\flink\笔记\7 状态编程和容错机制\assets\1596202432713.png)]

      检查点算法流程:

      (1)JobManager触发Checkpoint,发出一个标记,这个标记会带着一个数,他发送给所有的source任务,source接收到消息之后就会在当前的数据流里面插入Barrier。

      (2)source任务见到了barrier就把当前刚处理完的状态(偏移量)保存了,然后把barrier广播往下游发送,同时向JobManager确认检查点已经保存好了

      (3)如果上游其中一个分区的barrier到了,接下来要做的是Barrier的对齐,没到齐之前,已经来了barrier的流新的数据又来了不能直接做计算,而是先缓存起来,而barrier还没有到达的上游分区来的数据会被正常处理

      (4)上游所有输入分区的barrier都到齐时,任务就将其状态保存到状态后端的检查点中,将barrier继续向下游转发

      (5)直到Sink 任务向 JobManager 确认状态保存到 checkpoint 完毕。当所有任务都确认已成功将状态保存到检查点时,检查点就真正完成了

    4 保存点

      保存点(savepoints)是Flink 提供的可以自定义的镜像保存功能,不会自动创建保存点,因此用户(或者外部调度程序)必须明确地触发创建操作 。

      算法和checkpoint一样,比checkpoint多了一点额外的元数据,可以认为是具有额外元数据的checkpoint,区别在于checkpoint是自动创建的,保存点是用户手动触发的

      保存点除了可以用于故障恢复外,还可以用于有计划的手动备份,更新应用程序,版本迁移,暂停和重启应用等。

      手动创建保存点:

    bin/flink savepoint :jobId [:targetDirectory]
    

      创建yarn上的保存点

    bin/flink savepoint :jobId [:targetDirectory] -yid :yarnAppId
    

      取消保存点

    bin/flink cancel -s [:targetDirectory] :jobId
    

      保存点恢复,和检查点恢复方法一样的

    bin/flink run -s :savepointPath [:runArgs]
    

      检查点(checkpoint)的目录是依赖JobID的,每次运行任务都是一个唯一的JobID,所以要找到上一次任务的JobID才能找到检查点。保存点(savepoint)需要手动触发,并且在指定目录下还生成一个唯一的子目录。根据JobID可以在任务失败后,简单的重新执行任务即可恢复到失败前的检查点。

    展开全文
  • 一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。 面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多...

    引入

    一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。 
    面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。 
    因此,Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错)记录下来,以便恢复丢失的分区。 
    Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

    Lineage机制

    Lineage简介

    相比其他系统的细颗粒度的内存数据更新级别的备份或者LOG机制,RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。因为这种粗颗粒的数据模型,限制了Spark的运用场合,所以Spark并不适用于所有高性能要求的场景,但同时相比细颗粒度的数据模型,也带来了性能的提升。

    两种依赖关系

    RDD在Lineage依赖方面分为两种:窄依赖(Narrow Dependencies)与宽依赖(Wide Dependencies,源码中称为Shuffle 
    Dependencies),用来解决数据容错的高效性。

    • 窄依赖是指父RDD的每一个分区最多被一个子RDD的分区所用,表现为一个父RDD的分区对应于一个子RDD的分区 
      或多个父RDD的分区对应于一个子RDD的分区,也就是说一个父RDD的一个分区不可能对应一个子RDD的多个分区。 
      1个父RDD分区对应1个子RDD分区,这其中又分两种情况:1个子RDD分区对应1个父RDD分区(如map、filter等算子),1个子RDD分区对应N个父RDD分区(如co-paritioned(协同划分)过的Join)。
    • 宽依赖是指子RDD的分区依赖于父RDD的多个分区或所有分区,即存在一个父RDD的一个分区对应一个子RDD的多个分区。 
      1个父RDD分区对应多个子RDD分区,这其中又分两种情况:1个父RDD对应所有子RDD分区(未经协同划分的Join)或者1个父RDD对应非全部的多个RDD分区(如groupByKey)。 
       

    本质理解:根据父RDD分区是对应1个还是多个子RDD分区来区分窄依赖(父分区对应一个子分区)和宽依赖(父分区对应多个子分 
    区)。如果对应多个,则当容错重算分区时,因为父分区数据只有一部分是需要重算子分区的,其余数据重算就造成了冗余计算。

    对于宽依赖,Stage计算的输入和输出在不同的节点上,对于输入节点完好,而输出节点死机的情况,通过重新计算恢复数据这种情况下,这种方法容错是有效的,否则无效,因为无法重试,需要向上追溯其祖先看是否可以重试(这就是lineage,血统的意思),窄依赖对于数据的重算开销要远小于宽依赖的数据重算开销。

    窄依赖和宽依赖的概念主要用在两个地方:一个是容错中相当于Redo日志的功能;另一个是在调度中构建DAG作为不同Stage的划分点。

    依赖关系的特性

    第一,窄依赖可以在某个计算节点上直接通过计算父RDD的某块数据计算得到子RDD对应的某块数据;宽依赖则要等到父RDD所有数据都计算完成之后,并且父RDD的计算结果进行hash并传到对应节点上之后才能计算子RDD。 
    第二,数据丢失时,对于窄依赖只需要重新计算丢失的那一块数据来恢复;对于宽依赖则要将祖先RDD中的所有数据块全部重新计算来恢复。所以在长“血统”链特别是有宽依赖的时候,需要在适当的时机设置数据检查点。也是这两个特性要求对于不同依赖关系要采取不同的任务调度机制和容错恢复机制。

    容错原理

    在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。

    Checkpoint机制

    通过上述分析可以看出在以下两种情况下,RDD需要加检查点。

    1. DAG中的Lineage过长,如果重算,则开销太大(如在PageRank中)。
    2. 在宽依赖上做Checkpoint获得的收益更大。

    由于RDD是只读的,所以Spark的RDD计算中一致性不是主要关心的内容,内存相对容易管理,这也是设计者很有远见的地方,这样减少了框架的复杂性,提升了性能和可扩展性,为以后上层框架的丰富奠定了强有力的基础。 
    在RDD计算中,通过检查点机制进行容错,传统做检查点有两种方式:通过冗余数据和日志记录更新操作。在RDD中的doCheckPoint方法相当于通过冗余数据来缓存数据,而之前介绍的血统就是通过相当粗粒度的记录更新操作来实现容错的。

    检查点(本质是通过将RDD写入Disk做检查点)是为了通过lineage做容错的辅助,lineage过长会造成容错成本过高,这样就不如在中间阶段做检查点容错,如果之后有节点出现问题而丢失分区,从做检查点的RDD开始重做Lineage,就会减少开销。

    一个 Streaming Application 往往需要7*24不间断的跑,所以需要有抵御意外的能力(比如机器或者系统挂掉,JVM crash等)。为了让这成为可能,Spark Streaming需要 checkpoint 足够多信息至一个具有容错设计的存储系统才能让 Application 从失败中恢复。Spark Streaming 会 checkpoint 两种类型的数据。

    • Metadata(元数据) checkpointing - 保存定义了 Streaming 计算逻辑至类似 HDFS 的支持容错的存储系统。用来恢复 driver,元数据包括:
      • 配置 - 用于创建该 streaming application 的所有配置
      • DStream 操作 - DStream 一些列的操作
      • 未完成的 batches - 那些提交了 job 但尚未执行或未完成的 batches
    • Data checkpointing - 保存已生成的RDDs至可靠的存储。这在某些 stateful 转换中是需要的,在这种转换中,生成 RDD 需要依赖前面的 batches,会导致依赖链随着时间而变长。为了避免这种没有尽头的变长,要定期将中间生成的 RDDs 保存到可靠存储来切断依赖链

    具体来说,metadata checkpointing主要还是从drvier失败中恢复,而Data Checkpoing用于对有状态的transformation操作进行checkpointing

    Checkpointing具体的使用方式时通过下列方法:

    //checkpointDirectory为checkpoint文件保存目录
    streamingContext.checkpoint(checkpointDirectory)

    什么时候需要启用 checkpoint?

    什么时候该启用 checkpoint 呢?满足以下任一条件:

    • 使用了 stateful 转换 - 如果 application 中使用了updateStateByKeyreduceByKeyAndWindow等 stateful 操作,必须提供 checkpoint 目录来允许定时的 RDD checkpoint
    • 希望能从意外中恢复 driver

    如果 streaming app 没有 stateful 操作,也允许 driver 挂掉后再次重启的进度丢失,就没有启用 checkpoint的必要了。

    如何使用 checkpoint?

    启用 checkpoint,需要设置一个支持容错 的、可靠的文件系统(如 HDFS、s3 等)目录来保存 checkpoint 数据。通过调用 streamingContext.checkpoint(checkpointDirectory) 来完成。另外,如果你想让你的 application 能从 driver 失败中恢复,你的 application 要满足:

    • 若 application 为首次重启,将创建一个新的 StreamContext 实例
    • 如果 application 是从失败中重启,将会从 checkpoint 目录导入 checkpoint 数据来重新创建 StreamingContext 实例

    通过 StreamingContext.getOrCreate 可以达到目的:

    复制代码

    // Function to create and setup a new StreamingContext
    def functionToCreateContext(): StreamingContext = {
        val ssc = new StreamingContext(...)   // new context
        val lines = ssc.socketTextStream(...) // create DStreams
        ...
        ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
        ssc
    }
    
    // Get StreamingContext from checkpoint data or create a new one
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    // Do additional setup on context that needs to be done,
    // irrespective of whether it is being started or restarted
    context. ...
    
    // Start the context
    context.start()
    context.awaitTermination()

    复制代码

    如果 checkpointDirectory 存在,那么 context 将导入 checkpoint 数据。如果目录不存在,函数 functionToCreateContext 将被调用并创建新的 context

    除调用 getOrCreate 外,还需要你的集群模式支持 driver 挂掉之后重启之。例如,在 yarn 模式下,driver 是运行在 ApplicationMaster 中,若 ApplicationMaster 挂掉,yarn 会自动在另一个节点上启动一个新的 ApplicationMaster。

    需要注意的是,随着 streaming application 的持续运行,checkpoint 数据占用的存储空间会不断变大。因此,需要小心设置checkpoint 的时间间隔。设置得越小,checkpoint 次数会越多,占用空间会越大;如果设置越大,会导致恢复时丢失的数据和进度越多。一般推荐设置为 batch duration 的5~10倍。

    导出 checkpoint 数据

    上文提到,checkpoint 数据会定时导出到可靠的存储系统,那么

    1. 在什么时机进行 checkpoint
    2. checkpoint 的形式是怎么样的

    checkpoint 的时机

    在 Spark Streaming 中,JobGenerator 用于生成每个 batch 对应的 jobs,它有一个定时器,定时器的周期即初始化 StreamingContext 时设置的 batchDuration。这个周期一到,JobGenerator 将调用generateJobs方法来生成并提交 jobs,这之后调用 doCheckpoint 方法来进行 checkpoint。doCheckpoint 方法中,会判断当前时间与 streaming application start 的时间之差是否是 checkpoint duration 的倍数,只有在是的情况下才进行 checkpoint。

    checkpoint 的形式

    最终 checkpoint 的形式是将类 Checkpoint的实例序列化后写入外部存储,值得一提的是,有专门的一条线程来做将序列化后的 checkpoint 写入外部存储。类 Checkpoint 包含以下数据

    除了 Checkpoint 类,还有 CheckpointWriter 类用来导出 checkpoint,CheckpointReader 用来导入 checkpoint

    Checkpoint 的局限

    Spark Streaming 的 checkpoint 机制看起来很美好,却有一个硬伤。上文提到最终刷到外部存储的是类 Checkpoint 对象序列化后的数据。那么在 Spark Streaming application 重新编译后,再去反序列化 checkpoint 数据就会失败。这个时候就必须新建 StreamingContext。

    针对这种情况,在我们结合 Spark Streaming + kafka 的应用中,我们自行维护了消费的 offsets,这样一来及时重新编译 application,还是可以从需要的 offsets 来消费数据,这里只是举个例子,不详细展开了。

    展开全文
  • 高可用设计-容错机制

    千次阅读 2021-12-13 22:06:13
    高可用设计之容错机制的介绍
  • Flink:状态管理和容错机制

    千次阅读 2020-06-17 23:59:36
    Keyed State 和 Operator State 原始状态和托管状态 如何使用Managed Keyed State 状态的生命周期(TTL) 如何使用Managed Operator State 容错机制 什么是checkpoint checkpoint算法 如何使用checkpoint 启用...
  • 【总结】Spark容错机制

    万次阅读 多人点赞 2017-06-23 10:57:12
    对于一个大的集群系统来说,机器故障、网络异常等都是很常见的,Spark这样的大型分布式计算集群提供了很多的容错机制来提高整个系统的可用性。 一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的...
  • 代码行为异常容错机制与自我调节

    千次阅读 2020-03-29 21:32:22
    1.5、代码的容错机制与自我调节 2、设计观与方法论 2.1 设计观与代码容错机制、自我调节 2.2 问题是否能够被解决 2.2.1 意识行为是否具有虚拟性 2.2.2 思维是否具有方向性 2.3 问题与问题解决 2.4 软件与问题...
  •  Apache Flink 提供了可以恢复数据流应用到一致状态的容错机制。确保在发生故障时,程序的每条记录只会作用于状态一次(exactly-once),当然也可以降级为至少一次(at-least-once)。  容错机制通过持续创建...
  • 状态编程和容错机制(一) 我们知道Flink是一个流式计算引擎,也就是数据上来一条处理一条。那Flink是怎样完成窗口计算的呢?比如:我要取5秒钟之内传感器温度最大值。如果只是来一条数据处理一条这显然无法完成上述...
  • 1、图解Elasticsearch容错机制:master选举,replica容错,数据恢复 (1)还是之前9个 shard,3个 node的例子 (2)master node宕机,自动master选举,cluster status为red P0、P1、R22这三个shard丢失,master ...
  • Dubbo使用之容错机制

    千次阅读 2018-11-19 21:08:47
    所谓容错机制,举个简单例子,我们在使用电脑的某个程序时,常常会遇到“程序无反应”或“程序未响应”的情况发生,此时这个程序便不能在进行下去,但经常会在过了几秒钟后恢复到正常使用的状态。这种“无反应”或...
  • 容错机制指的是某中系统控制在一定范围的一种允许或包容犯错情况的发生,举个简单的例子,我们在电脑上运行一个程序,有时候会出现无响应的情况,然后系统回弹出一个提示框让我们选择,是立即结束还是继续等待,然后...
  • nginx upstream failover 容错机制

    千次阅读 2017-05-28 11:34:34
    在上面的例子中如果每次发生7个请求,5个请求将被发送到backend1.example.com,其他两台将分别得到一个请求,如果有一台服务器不可用,那么请求将被转发到下一台服务器,直到所有的服务器检查都通过。如果所有的...
  • Flink状态容错机制

    2019-12-02 19:44:54
    Flink状态容错机制 1、前言 ​ 谈一些自己对Flink状态容错机制的理解,没有很详细的解释什么是状态或者是状态的作用,主要的观点支持来源于Flink中文官网:https://ververica.cn 2、状态 2.1、什么是状态 ​ 首先,...
  • 高性能web服务容错机制

    千次阅读 2016-04-11 18:11:41
    一、 重试机制 最容易也最简单被人想到的容错方式,当然就是“失败重试”,总而言之,简单粗暴!简单是指它的实现通常很简单,粗暴则是指使用不当,很可能会带来系统“雪崩”的风险,因为重试意味着对后端服务的...
  • Apache Flink 提供了可以恢复数据流应用到一致状态的容错机制。确保在发生故障时,程序恢复时,数据流的每一条记录只会被处理一次(exactly-once),当然也可以降级为至少处理一次(at-least-once)。 容错机制通过...
  • 1.failover:失效转移 失效转移(failover)是一种备份操作模式,当主要组件由于...失效转移能应用于系统的许多方面:举个例子,在个人电脑内部,失效转移也许是保护故障的处理机的一种机制;在网络内部,失效转移能够
  • nginx upstream 容错机制

    千次阅读 2013-12-13 10:19:50
    在上面的例子中如果每次发生 7 个请求, 5 个请求将被发送到 backend1.example.com ,其他两台将分别得到一个请求,如果有一台服务器不可用,那么请求将被转发到下一台服务器,直到所有的服务器...
  • 上篇我们大体讲了Flink容错机制的处理方法,和产生checkpoint的机制;这次主要讲一些补充 一,Flink的checkpoint形式 checkpoint有两种特殊形式: Savepoint:是一种特殊的checkpoint,只不过不像checkpoint定期的...
  • Storm入门与实践(4)Storm的容错机制

    千次阅读 2017-08-24 11:17:37
    本文详细解释了 Storm 如何实现这种保障机制,以及作为用户如何使用好 Storm 的可靠性机制。消息的“完整性处理”是什么意思一个从 spout 中发送出的 tuple 会产生上千个基于它创建的 tuples。例如,有这样一个 word...
  • Sink Processors是作用在sink组件上的容错机制。通过调度Sink Groups(就是分了组的Sinks),可以做到负载均衡(load_balance Processors)和类似HDFS中Namenode高可用(Failover Processors )那样的目的。 Failover ...
  • flink容错机制(翻译官网英文文档)

    千次阅读 2018-07-17 11:14:49
    flink提供了能够保持一致地恢复数据流应用的状态的一种容错机制,这种机制保证即使在故障持续发生的情况下,程序的状态最终依然会从数据流中产生并且保证exactly once,即正好一次的语义。 容错机制持续不断地从...
  • 异常处理及重启机制 1.对于chunk类型的Step,spring batch为我们提供了用于管理它的状态 2.状态的管理是通过ItemStream接口来实现的 3.ItemStream接口: (1)open():每一次step执行会调用 (2)Update():每一个...
  • 数据和计算系统如何容错

    千次阅读 2021-12-13 16:02:56
    容错是大规模数据系统和计算系统的必备功能,不能容错的分布式系统基本没有可用性。大家可能觉得高质量的系统错误率没有那么高,实质上系统的故障率总是随着系统规模和复杂程度增加。笔者读书的时候曾经听过一位参与...
  • 容错恢复机制 应用程序开发中常见的功能 程序运行的时候,正常情况下应该按某种方式来做,如果按照某种方式来做发生错误的话,系统并不会崩溃,而是继续运行,能提供出错后的备用方案。 日志记录的例子 把...
  • 本文重点讨论软件容错的问题。一个完整的系统在内部是由很多小服务构成,服务之间以及服务与资源之间会存在远程调用,每个系统的可用性不可能达到100%,所以容错处理显得尤为重要!容错的目标是降低...
  • 群集容错机制 Failover Mode 失败自动切换,当出现失败,重试其它服务器。 通常用于读操作,但重试会带来更长延迟。 可通过retries="2"来设置重试次数(不含第一次)。 Failfast Mode 快速失败,只...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 34,589
精华内容 13,835
关键字:

容错机制的例子

友情链接: 菜单的使用.rar